rafaldembski commited on
Commit
24dc9b4
1 Parent(s): b7074eb

Update utils/functions.py

Browse files
Files changed (1) hide show
  1. utils/functions.py +40 -42
utils/functions.py CHANGED
@@ -6,27 +6,27 @@ import os
6
  from datetime import datetime
7
  import logging
8
  import json
9
- from PIL import Image # Upewnij się, że zainstalowałeś bibliotekę: pip install pillow
10
- import pytesseract # Upewnij się, że zainstalowałeś bibliotekę: pip install pytesseract
11
 
12
- # Konfiguracja logowania
13
  logging.basicConfig(
14
  filename='app.log',
15
  level=logging.INFO,
16
  format='%(asctime)s %(levelname)s:%(message)s'
17
  )
18
 
19
- # Definiowanie ścieżek do plików JSON
20
  BASE_DIR = os.path.dirname(os.path.abspath(__file__))
21
  DATA_DIR = os.path.join(BASE_DIR, '..', 'data')
22
  FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
23
  HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
24
  STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
25
 
26
- # Funkcje pomocnicze
27
 
28
  def load_json(file_path):
29
- """Ładuje dane z pliku JSON. Jeśli plik nie istnieje, zwraca pustą listę lub domyślny obiekt."""
30
  if not os.path.exists(file_path):
31
  if file_path.endswith('stats.json'):
32
  return {"total_analyses": 0, "total_frauds_detected": 0}
@@ -37,43 +37,43 @@ def load_json(file_path):
37
  data = json.load(file)
38
  return data
39
  except json.JSONDecodeError:
40
- logging.error(f"Nie można załadować danych z {file_path}. Plik jest uszkodzony.")
41
  if file_path.endswith('stats.json'):
42
  return {"total_analyses": 0, "total_frauds_detected": 0}
43
  return []
44
 
45
  def save_json(file_path, data):
46
- """Zapisuje dane do pliku JSON."""
47
  with open(file_path, 'w', encoding='utf-8') as file:
48
  json.dump(data, file, ensure_ascii=False, indent=4)
49
- logging.info(f"Dane zostały zapisane do {file_path}.")
50
 
51
  def add_fake_number(phone_number):
52
- """Dodaje numer telefonu do pliku fake_numbers.json jako fałszywy, jeśli jeszcze go tam nie ma."""
53
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
54
  if phone_number not in fake_numbers:
55
  fake_numbers.append(phone_number)
56
  save_json(FAKE_NUMBERS_FILE, fake_numbers)
57
- logging.info(f"Numer {phone_number} został pomyślnie dodany do fake_numbers.json.")
58
  return True
59
  else:
60
- logging.info(f"Numer {phone_number} już istnieje w fake_numbers.json.")
61
  return False
62
 
63
  def is_fake_number(phone_number):
64
- """Sprawdza, czy dany numer telefonu jest oznaczony jako fałszywy w pliku fake_numbers.json."""
65
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
66
  exists = phone_number in fake_numbers
67
- logging.info(f"Sprawdzanie numeru {phone_number}: {'znaleziony' if exists else 'nie znaleziony'}.")
68
  return exists
69
 
70
  def get_fake_numbers():
71
- """Pobiera listę fałszywych numerów z pliku fake_numbers.json."""
72
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
73
  return fake_numbers
74
 
75
  def add_to_history(message, phone_number, analysis, risk, recommendations):
76
- """Dodaje wpis do historii analiz w pliku history.json."""
77
  history = load_json(HISTORY_FILE)
78
  history.append({
79
  "timestamp": datetime.now().isoformat(),
@@ -84,46 +84,46 @@ def add_to_history(message, phone_number, analysis, risk, recommendations):
84
  "recommendations": recommendations
85
  })
86
  save_json(HISTORY_FILE, history)
87
- logging.info(f"Dodano wpis do history.json dla numeru {phone_number}.")
88
 
89
  def update_stats(fraud_detected=False):
90
- """Aktualizuje statystyki analiz w pliku stats.json."""
91
  stats = load_json(STATS_FILE)
92
  stats["total_analyses"] += 1
93
  if fraud_detected:
94
  stats["total_frauds_detected"] += 1
95
  save_json(STATS_FILE, stats)
96
- logging.info(f"Statystyki zostały zaktualizowane: Analiz {stats['total_analyses']}, Oszustw {stats['total_frauds_detected']}.")
97
 
98
  def get_stats():
99
- """Pobiera statystyki analiz z pliku stats.json."""
100
  stats = load_json(STATS_FILE)
101
- logging.info("Statystyki zostały pobrane pomyślnie.")
102
  return stats
103
 
104
  def get_statistics():
105
- """Zwraca szczegółowe statystyki analiz z pliku stats.json."""
106
  stats = load_json(STATS_FILE)
107
  return stats
108
 
109
  def get_phone_info(phone_number):
110
- """Weryfikuje numer telefonu i zwraca informacje o kraju i operatorze."""
111
  try:
112
  parsed_number = phonenumbers.parse(phone_number, None)
113
- country = geocoder.description_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
114
- operator = carrier.name_for_number(parsed_number, 'pl') # Zmiana na 'pl' dla polskiego
115
  if not country:
116
- country = "Nieznany"
117
  if not operator:
118
- operator = "Nieznany"
119
- logging.info(f"Numer {phone_number} - Kraj: {country}, Operator: {operator}.")
120
  return country, operator
121
  except phonenumbers.NumberParseException as e:
122
- logging.error(f"Nie udało się przetworzyć numeru telefonu {phone_number}: {e}")
123
- return "Nieznany", "Nieznany"
124
 
125
  def simple_checks(message, language):
126
- """Przeprowadza proste sprawdzenia heurystyczne wiadomości SMS."""
127
  warnings = []
128
  scam_keywords = {
129
  'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
@@ -135,20 +135,20 @@ def simple_checks(message, language):
135
  message_lower = message.lower()
136
 
137
  if any(keyword.lower() in message_lower for keyword in selected_keywords):
138
- warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.")
139
  if re.search(r'http[s]?://', message):
140
- warnings.append("Wiadomość zawiera link.")
141
  if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower):
142
- warnings.append("Wiadomość zawiera prośbę o poufne informacje.")
143
  return warnings
144
 
145
  def analyze_message(message, phone_number, additional_info, api_key, language):
146
- """Analizuje wiadomość SMS za pomocą API SambaNova."""
147
  if not api_key:
148
- logging.error("Brak klucza API.")
149
- return "Brak klucza API.", "Brak klucza API.", "Brak klucza API."
150
 
151
- url = "https://api.sambanova.ai/v1/chat/completions" # Upewnij się, że to poprawny URL
152
  headers = {
153
  "Authorization": f"Bearer {api_key}",
154
  "Content-Type": "application/json"
@@ -283,7 +283,7 @@ You are an advanced AI assistant specializing in identifying fake SMS messages.
283
  """
284
  }
285
 
286
- system_prompt = system_prompts.get(language, system_prompts['English']) # Domyślnie angielski, jeśli język nie jest obsługiwany
287
 
288
  user_prompt = f"""Analyze the following message for potential fraud:
289
 
@@ -296,7 +296,7 @@ Additional Information:
296
  Provide your analysis and conclusions following the guidelines above."""
297
 
298
  payload = {
299
- "model": "Meta-Llama-3.1-8B-Instruct", # Upewnij się, że to poprawny model API
300
  "messages": [
301
  {"role": "system", "content": system_prompt},
302
  {"role": "user", "content": user_prompt}
@@ -308,9 +308,7 @@ Provide your analysis and conclusions following the guidelines above."""
308
  }
309
 
310
  try:
311
- logging.info(f"Wysyłanie żądania do API z payload: {payload}")
312
  response = requests.post(url, headers=headers, json=payload)
313
- logging.info(f"Odpowiedź API: {response.status_code} - {response.text}")
314
  if response.status_code == 200:
315
  data = response.json()
316
  ai_response = data['choices'][0]['message']['content']
 
6
  from datetime import datetime
7
  import logging
8
  import json
9
+ from PIL import Image # Make sure you have installed this library: pip install pillow
10
+ import pytesseract # Make sure you have installed this library: pip install pytesseract
11
 
12
+ # Configure logging
13
  logging.basicConfig(
14
  filename='app.log',
15
  level=logging.INFO,
16
  format='%(asctime)s %(levelname)s:%(message)s'
17
  )
18
 
19
+ # Define paths to JSON files
20
  BASE_DIR = os.path.dirname(os.path.abspath(__file__))
21
  DATA_DIR = os.path.join(BASE_DIR, '..', 'data')
22
  FAKE_NUMBERS_FILE = os.path.join(DATA_DIR, 'fake_numbers.json')
23
  HISTORY_FILE = os.path.join(DATA_DIR, 'history.json')
24
  STATS_FILE = os.path.join(DATA_DIR, 'stats.json')
25
 
26
+ # Utility functions
27
 
28
  def load_json(file_path):
29
+ """Loads data from a JSON file. Returns an empty list or default object if the file does not exist."""
30
  if not os.path.exists(file_path):
31
  if file_path.endswith('stats.json'):
32
  return {"total_analyses": 0, "total_frauds_detected": 0}
 
37
  data = json.load(file)
38
  return data
39
  except json.JSONDecodeError:
40
+ logging.error(f"Cannot load data from {file_path}. The file is corrupted.")
41
  if file_path.endswith('stats.json'):
42
  return {"total_analyses": 0, "total_frauds_detected": 0}
43
  return []
44
 
45
  def save_json(file_path, data):
46
+ """Saves data to a JSON file."""
47
  with open(file_path, 'w', encoding='utf-8') as file:
48
  json.dump(data, file, ensure_ascii=False, indent=4)
49
+ logging.info(f"Data has been saved to {file_path}.")
50
 
51
  def add_fake_number(phone_number):
52
+ """Adds a phone number to the fake_numbers.json file as fake if it does not already exist."""
53
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
54
  if phone_number not in fake_numbers:
55
  fake_numbers.append(phone_number)
56
  save_json(FAKE_NUMBERS_FILE, fake_numbers)
57
+ logging.info(f"Number {phone_number} has been successfully added to fake_numbers.json.")
58
  return True
59
  else:
60
+ logging.info(f"Number {phone_number} already exists in fake_numbers.json.")
61
  return False
62
 
63
  def is_fake_number(phone_number):
64
+ """Checks if a given phone number is marked as fake in the fake_numbers.json file."""
65
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
66
  exists = phone_number in fake_numbers
67
+ logging.info(f"Checking number {phone_number}: {'found' if exists else 'not found'}.")
68
  return exists
69
 
70
  def get_fake_numbers():
71
+ """Retrieves the list of fake numbers from the fake_numbers.json file."""
72
  fake_numbers = load_json(FAKE_NUMBERS_FILE)
73
  return fake_numbers
74
 
75
  def add_to_history(message, phone_number, analysis, risk, recommendations):
76
+ """Adds an entry to the analysis history in history.json."""
77
  history = load_json(HISTORY_FILE)
78
  history.append({
79
  "timestamp": datetime.now().isoformat(),
 
84
  "recommendations": recommendations
85
  })
86
  save_json(HISTORY_FILE, history)
87
+ logging.info(f"Added entry to history.json for number {phone_number}.")
88
 
89
  def update_stats(fraud_detected=False):
90
+ """Updates analysis statistics in stats.json."""
91
  stats = load_json(STATS_FILE)
92
  stats["total_analyses"] += 1
93
  if fraud_detected:
94
  stats["total_frauds_detected"] += 1
95
  save_json(STATS_FILE, stats)
96
+ logging.info(f"Statistics have been updated: Analyses {stats['total_analyses']}, Frauds {stats['total_frauds_detected']}.")
97
 
98
  def get_stats():
99
+ """Retrieves analysis statistics from stats.json."""
100
  stats = load_json(STATS_FILE)
101
+ logging.info("Statistics have been successfully retrieved.")
102
  return stats
103
 
104
  def get_statistics():
105
+ """Returns detailed analysis statistics from stats.json."""
106
  stats = load_json(STATS_FILE)
107
  return stats
108
 
109
  def get_phone_info(phone_number):
110
+ """Verifies the phone number and returns information about the country and operator."""
111
  try:
112
  parsed_number = phonenumbers.parse(phone_number, None)
113
+ country = geocoder.description_for_number(parsed_number, 'pl') # Change to 'pl' for Polish
114
+ operator = carrier.name_for_number(parsed_number, 'pl') # Change to 'pl' for Polish
115
  if not country:
116
+ country = "Unknown"
117
  if not operator:
118
+ operator = "Unknown"
119
+ logging.info(f"Number {phone_number} - Country: {country}, Operator: {operator}.")
120
  return country, operator
121
  except phonenumbers.NumberParseException as e:
122
+ logging.error(f"Could not process phone number {phone_number}: {e}")
123
+ return "Unknown", "Unknown"
124
 
125
  def simple_checks(message, language):
126
+ """Performs simple heuristic checks on SMS messages."""
127
  warnings = []
128
  scam_keywords = {
129
  'Polish': ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata', 'bank', 'karta', 'konto', 'logowanie', 'transakcja', 'weryfikacja', 'dane osobowe', 'szybka płatność', 'blokada konta', 'powiadomienie'],
 
135
  message_lower = message.lower()
136
 
137
  if any(keyword.lower() in message_lower for keyword in selected_keywords):
138
+ warnings.append("The message contains keywords associated with potential fraud.")
139
  if re.search(r'http[s]?://', message):
140
+ warnings.append("The message contains a link.")
141
  if re.search(r'\b(podaj|prześlij|udostępnij|sende|übermittle|teile|send|provide|share)\b.*\b(hasło|kod|dane osobowe|numer konta|Passwort|Code|persönliche Daten|Kontonummer|password|code|personal information|account number)\b', message_lower):
142
+ warnings.append("The message contains a request for confidential information.")
143
  return warnings
144
 
145
  def analyze_message(message, phone_number, additional_info, api_key, language):
146
+ """Analyzes the SMS message using the SambaNova API."""
147
  if not api_key:
148
+ logging.error("Missing API key.")
149
+ return "Missing API key.", "Missing API key.", "Missing API key."
150
 
151
+ url = "https://api.sambanova.ai/v1/chat/completions" # Make sure this is the correct API URL
152
  headers = {
153
  "Authorization": f"Bearer {api_key}",
154
  "Content-Type": "application/json"
 
283
  """
284
  }
285
 
286
+ system_prompt = system_prompts.get(language, system_prompts['English']) # Default to English if language is unsupported
287
 
288
  user_prompt = f"""Analyze the following message for potential fraud:
289
 
 
296
  Provide your analysis and conclusions following the guidelines above."""
297
 
298
  payload = {
299
+ "model": "Meta-Llama-3.1-8B-Instruct", # Ensure this is the correct API model
300
  "messages": [
301
  {"role": "system", "content": system_prompt},
302
  {"role": "user", "content": user_prompt}
 
308
  }
309
 
310
  try:
 
311
  response = requests.post(url, headers=headers, json=payload)
 
312
  if response.status_code == 200:
313
  data = response.json()
314
  ai_response = data['choices'][0]['message']['content']