rafaldembski commited on
Commit
f1769b8
1 Parent(s): 24dc9b4

Update utils/functions.py

Browse files
Files changed (1) hide show
  1. utils/functions.py +48 -41
utils/functions.py CHANGED
@@ -6,27 +6,26 @@ import os
6
  from datetime import datetime
7
  import logging
8
  import json
9
- from PIL import Image # Make sure you have installed this library: pip install pillow
10
- import pytesseract # Make sure you have installed this library: pip install pytesseract
11
 
12
- # Configure logging
13
  logging.basicConfig(
14
  filename='app.log',
15
  level=logging.INFO,
16
  format='%(asctime)s %(levelname)s:%(message)s'
17
  )
18
 
19
- # Define paths to JSON files
20
  BASE_DIR = os.path.dirname(os.path.abspath(__file__))
21
  DATA_DIR = os.path.join(BASE_DIR, '..', 'data')
22
  FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
23
  HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
24
  STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
25
 
26
- # Utility functions
27
-
28
  def load_json(file_path):
29
- """Loads data from a JSON file. Returns an empty list or default object if the file does not exist."""
30
  if not os.path.exists(file_path):
31
  if file_path.endswith('stats.json'):
32
  return {"total_analyses": 0, "total_frauds_detected": 0}
@@ -37,43 +36,43 @@ def load_json(file_path):
37
  data = json.load(file)
38
  return data
39
  except json.JSONDecodeError:
40
- logging.error(f"Cannot load data from {file_path}. The file is corrupted.")
41
  if file_path.endswith('stats.json'):
42
  return {"total_analyses": 0, "total_frauds_detected": 0}
43
  return []
44
 
45
  def save_json(file_path, data):
46
- """Saves data to a JSON file."""
47
  with open(file_path, 'w', encoding='utf-8') as file:
48
  json.dump(data, file, ensure_ascii=False, indent=4)
49
- logging.info(f"Data has been saved to {file_path}.")
50
 
51
  def add_fake_number(phone_number):
52
- """Adds a phone number to the fake_numbers.json file as fake if it does not already exist."""
53
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
54
  if phone_number not in fake_numbers:
55
  fake_numbers.append(phone_number)
56
  save_json(FAKE_NUMBERS_FILE, fake_numbers)
57
- logging.info(f"Number {phone_number} has been successfully added to fake_numbers.json.")
58
  return True
59
  else:
60
- logging.info(f"Number {phone_number} already exists in fake_numbers.json.")
61
  return False
62
 
63
  def is_fake_number(phone_number):
64
- """Checks if a given phone number is marked as fake in the fake_numbers.json file."""
65
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
66
  exists = phone_number in fake_numbers
67
- logging.info(f"Checking number {phone_number}: {'found' if exists else 'not found'}.")
68
  return exists
69
 
70
  def get_fake_numbers():
71
- """Retrieves the list of fake numbers from the fake_numbers.json file."""
72
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
73
  return fake_numbers
74
 
75
  def add_to_history(message, phone_number, analysis, risk, recommendations):
76
- """Adds an entry to the analysis history in history.json."""
77
  history = load_json(HISTORY_FILE)
78
  history.append({
79
  "timestamp": datetime.now().isoformat(),
@@ -84,46 +83,52 @@ def add_to_history(message, phone_number, analysis, risk, recommendations):
84
  "recommendations": recommendations
85
  })
86
  save_json(HISTORY_FILE, history)
87
- logging.info(f"Added entry to history.json for number {phone_number}.")
 
 
 
 
 
 
88
 
89
  def update_stats(fraud_detected=False):
90
- """Updates analysis statistics in stats.json."""
91
  stats = load_json(STATS_FILE)
92
  stats["total_analyses"] += 1
93
  if fraud_detected:
94
  stats["total_frauds_detected"] += 1
95
  save_json(STATS_FILE, stats)
96
- logging.info(f"Statistics have been updated: Analyses {stats['total_analyses']}, Frauds {stats['total_frauds_detected']}.")
97
 
98
  def get_stats():
99
- """Retrieves analysis statistics from stats.json."""
100
  stats = load_json(STATS_FILE)
101
- logging.info("Statistics have been successfully retrieved.")
102
  return stats
103
 
104
  def get_statistics():
105
- """Returns detailed analysis statistics from stats.json."""
106
  stats = load_json(STATS_FILE)
107
  return stats
108
 
109
  def get_phone_info(phone_number):
110
- """Verifies the phone number and returns information about the country and operator."""
111
  try:
112
  parsed_number = phonenumbers.parse(phone_number, None)
113
- country = geocoder.description_for_number(parsed_number, 'pl') # Change to 'pl' for Polish
114
- operator = carrier.name_for_number(parsed_number, 'pl') # Change to 'pl' for Polish
115
  if not country:
116
- country = "Unknown"
117
  if not operator:
118
- operator = "Unknown"
119
- logging.info(f"Number {phone_number} - Country: {country}, Operator: {operator}.")
120
  return country, operator
121
  except phonenumbers.NumberParseException as e:
122
- logging.error(f"Could not process phone number {phone_number}: {e}")
123
- return "Unknown", "Unknown"
124
 
125
  def simple_checks(message, language):
126
- """Performs simple heuristic checks on SMS messages."""
127
  warnings = []
128
  scam_keywords = {
129
  'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
@@ -135,20 +140,20 @@ def simple_checks(message, language):
135
  message_lower = message.lower()
136
 
137
  if any(keyword.lower() in message_lower for keyword in selected_keywords):
138
- warnings.append("The message contains keywords associated with potential fraud.")
139
  if re.search(r'http[s]?://', message):
140
- warnings.append("The message contains a link.")
141
  if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower):
142
- warnings.append("The message contains a request for confidential information.")
143
  return warnings
144
 
145
  def analyze_message(message, phone_number, additional_info, api_key, language):
146
- """Analyzes the SMS message using the SambaNova API."""
147
  if not api_key:
148
- logging.error("Missing API key.")
149
- return "Missing API key.", "Missing API key.", "Missing API key."
150
 
151
- url = "https://api.sambanova.ai/v1/chat/completions" # Make sure this is the correct API URL
152
  headers = {
153
  "Authorization": f"Bearer {api_key}",
154
  "Content-Type": "application/json"
@@ -280,10 +285,12 @@ You are an advanced AI assistant specializing in identifying fake SMS messages.
280
  - Do you have access to other tools that can help you assess this message?
281
  - What actions can you take to enhance your security in the future?
282
  </recommendations>
 
 
283
  """
284
  }
285
 
286
- system_prompt = system_prompts.get(language, system_prompts['English']) # Default to English if language is unsupported
287
 
288
  user_prompt = f"""Analyze the following message for potential fraud:
289
 
@@ -296,7 +303,7 @@ Additional Information:
296
  Provide your analysis and conclusions following the guidelines above."""
297
 
298
  payload = {
299
- "model": "Meta-Llama-3.1-8B-Instruct", # Ensure this is the correct API model
300
  "messages": [
301
  {"role": "system", "content": system_prompt},
302
  {"role": "user", "content": user_prompt}
 
6
  from datetime import datetime
7
  import logging
8
  import json
9
+ from PIL import Image # Upewnij się, że zainstalowałeś bibliotekę: pip install pillow
10
+ import pytesseract # Upewnij się, że zainstalowałeś bibliotekę: pip install pytesseract
11
 
12
+ # Konfiguracja logowania
13
  logging.basicConfig(
14
  filename='app.log',
15
  level=logging.INFO,
16
  format='%(asctime)s %(levelname)s:%(message)s'
17
  )
18
 
19
+ # Definiowanie ścieżek do plików JSON
20
  BASE_DIR = os.path.dirname(os.path.abspath(__file__))
21
  DATA_DIR = os.path.join(BASE_DIR, '..', 'data')
22
  FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
23
  HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
24
  STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
25
 
26
+ # Funkcje pomocnicze
 
27
  def load_json(file_path):
28
+ """Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt."""
29
  if not os.path.exists(file_path):
30
  if file_path.endswith('stats.json'):
31
  return {"total_analyses": 0, "total_frauds_detected": 0}
 
36
  data = json.load(file)
37
  return data
38
  except json.JSONDecodeError:
39
+ logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.")
40
  if file_path.endswith('stats.json'):
41
  return {"total_analyses": 0, "total_frauds_detected": 0}
42
  return []
43
 
44
  def save_json(file_path, data):
45
+ """Zapisuje dane do pliku JSON."""
46
  with open(file_path, 'w', encoding='utf-8') as file:
47
  json.dump(data, file, ensure_ascii=False, indent=4)
48
+ logging.info(f"Dane zostały zapisane do {file_path}.")
49
 
50
  def add_fake_number(phone_number):
51
+ """Dodaje numer telefonu do pliku fake_numbers.json jako fałszywy, jeśli jeszcze go tam nie ma."""
52
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
53
  if phone_number not in fake_numbers:
54
  fake_numbers.append(phone_number)
55
  save_json(FAKE_NUMBERS_FILE, fake_numbers)
56
+ logging.info(f"Numer {phone_number} został pomyślnie dodany do fake_numbers.json.")
57
  return True
58
  else:
59
+ logging.info(f"Numer {phone_number} już istnieje w fake_numbers.json.")
60
  return False
61
 
62
  def is_fake_number(phone_number):
63
+ """Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w pliku fake_numbers.json."""
64
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
65
  exists = phone_number in fake_numbers
66
+ logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.")
67
  return exists
68
 
69
  def get_fake_numbers():
70
+ """Pobiera listę fałszywych numerów z pliku fake_numbers.json."""
71
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
72
  return fake_numbers
73
 
74
  def add_to_history(message, phone_number, analysis, risk, recommendations):
75
+ """Dodaje wpis do historii analiz w pliku history.json."""
76
  history = load_json(HISTORY_FILE)
77
  history.append({
78
  "timestamp": datetime.now().isoformat(),
 
83
  "recommendations": recommendations
84
  })
85
  save_json(HISTORY_FILE, history)
86
+ logging.info(f"Dodano wpis do history.json dla numeru {phone_number}.")
87
+
88
+ def get_history():
89
+ """Pobiera historię analiz z pliku history.json jako listę słowników."""
90
+ history = load_json(HISTORY_FILE)
91
+ logging.info("Historia analiz została pobrana pomyślnie.")
92
+ return history
93
 
94
  def update_stats(fraud_detected=False):
95
+ """Aktualizuje statystyki analiz w pliku stats.json."""
96
  stats = load_json(STATS_FILE)
97
  stats["total_analyses"] += 1
98
  if fraud_detected:
99
  stats["total_frauds_detected"] += 1
100
  save_json(STATS_FILE, stats)
101
+ logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total_analyses']}, Oszustw {stats['total_frauds_detected']}.")
102
 
103
  def get_stats():
104
+ """Pobiera statystyki analiz z pliku stats.json."""
105
  stats = load_json(STATS_FILE)
106
+ logging.info("Statystyki zostały pobrane pomyślnie.")
107
  return stats
108
 
109
  def get_statistics():
110
+ """Zwraca szczegółowe statystyki analiz z pliku stats.json."""
111
  stats = load_json(STATS_FILE)
112
  return stats
113
 
114
  def get_phone_info(phone_number):
115
+ """Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze."""
116
  try:
117
  parsed_number = phonenumbers.parse(phone_number, None)
118
+ country = geocoder.description_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
119
+ operator = carrier.name_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
120
  if not country:
121
+ country = "Nieznany"
122
  if not operator:
123
+ operator = "Nieznany"
124
+ logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
125
  return country, operator
126
  except phonenumbers.NumberParseException as e:
127
+ logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
128
+ return "Nieznany", "Nieznany"
129
 
130
  def simple_checks(message, language):
131
+ """Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS."""
132
  warnings = []
133
  scam_keywords = {
134
  'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
 
140
  message_lower = message.lower()
141
 
142
  if any(keyword.lower() in message_lower for keyword in selected_keywords):
143
+ warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
144
  if re.search(r'http[s]?://', message):
145
+ warnings.append("Wiadomość zawiera link.")
146
  if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower):
147
+ warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
148
  return warnings
149
 
150
  def analyze_message(message, phone_number, additional_info, api_key, language):
151
+ """Analizuje wiadomość SMS za pomocą API SambaNova."""
152
  if not api_key:
153
+ logging.error("Brak klucza API.")
154
+ return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
155
 
156
+ url = "https://api.sambanova.ai/v1/chat/completions" # Upewnij się, że to poprawny URL
157
  headers = {
158
  "Authorization": f"Bearer {api_key}",
159
  "Content-Type": "application/json"
 
285
  - Do you have access to other tools that can help you assess this message?
286
  - What actions can you take to enhance your security in the future?
287
  </recommendations>
288
+
289
+ Your response should be formatted exactly as specified above, using the <analysis>, <risk_assessment>, and <recommendations> tags. Ensure that each section is thoroughly and comprehensively filled out.
290
  """
291
  }
292
 
293
+ system_prompt = system_prompts.get(language, system_prompts['English']) # Domyślnie angielski, jeśli język nie jest obsługiwany
294
 
295
  user_prompt = f"""Analyze the following message for potential fraud:
296
 
 
303
  Provide your analysis and conclusions following the guidelines above."""
304
 
305
  payload = {
306
+ "model": "Meta-Llama-3.1-8B-Instruct", # Upewnij się, że to poprawny model API
307
  "messages": [
308
  {"role": "system", "content": system_prompt},
309
  {"role": "user", "content": user_prompt}