# utils/functions.py import phonenumbers from phonenumbers import geocoder, carrier import re import requests import os import json from datetime import datetime def get_phone_info(phone_number): """ Weryfikuje numer telefonu i zwraca kraj oraz operatora. """ try: parsed_number = phonenumbers.parse(phone_number, None) country = geocoder.description_for_number(parsed_number, 'pl') operator = carrier.name_for_number(parsed_number, 'pl') return country, operator except phonenumbers.NumberParseException: return None, None def simple_checks(message): """ Wykonuje podstawowe sprawdzenia heurystyczne wiadomości SMS. """ warnings = [] scam_keywords = ['pieniądze', 'przelew', 'hasło', 'kod', 'nagroda', 'wygrana', 'pilne', 'pomoc', 'opłata'] if any(keyword in message.lower() for keyword in scam_keywords): warnings.append("Wiadomość zawiera słowa kluczowe związane z potencjalnym oszustwem.") if re.search(r'http[s]?://', message): warnings.append("Wiadomość zawiera link.") if re.search(r'\b(podaj|prześlij|udostępnij)\b.*\b(hasło|kod|dane osobowe|numer konta)\b', message.lower()): warnings.append("Wiadomość zawiera prośbę o poufne informacje.") return warnings def analyze_message(message, phone_number, additional_info, api_key): """ Analizuje wiadomość SMS za pomocą API SambaNova. """ url = "https://api.sambanova.ai/v1/chat/completions" headers = { "Authorization": f"Bearer {api_key}" } system_prompt = """ Jesteś zaawansowanym asystentem AI specjalizującym się w identyfikacji fałszywych wiadomości SMS. Twoim zadaniem jest przeprowadzenie szczegółowej analizy wiadomości, wykorzystując głęboki proces myślenia i dostarczając kompleksową ocenę. Twoja odpowiedź powinna być podzielona na trzy sekcje: **Analiza Treści Wiadomości:** - Przeprowadź szczegółową analizę treści wiadomości, identyfikując potencjalne czerwone flagi, takie jak błędy językowe, prośby o dane osobowe, pilne prośby o kontakt itp. - Opisz kontekst językowy i kulturowy wiadomości. - Zidentyfikuj wszelkie elementy, które mogą sugerować, że wiadomość jest próbą wyłudzenia informacji lub pieniędzy. **Ocena Ryzyka Oszustwa:** - Na podstawie analizy treści i dostępnych informacji oceń prawdopodobieństwo, że wiadomość jest oszustwem. Użyj skali od 1 do 10, gdzie 1 oznacza bardzo niskie ryzyko, a 10 bardzo wysokie ryzyko. - Wyjaśnij, jakie czynniki wpływają na tę ocenę. **Zalecenia dla Użytkownika:** - Podaj jasne i konkretne zalecenia dotyczące dalszych kroków, które użytkownik powinien podjąć. - Uwzględnij sugestie dotyczące bezpieczeństwa, takie jak blokowanie nadawcy, zgłaszanie wiadomości do odpowiednich instytucji, czy też ignorowanie wiadomości. - Jeśli to możliwe, zasugeruj dodatkowe środki ostrożności, które użytkownik może podjąć, aby chronić swoje dane osobowe i finansowe. Twoja odpowiedź powinna być sformatowana dokładnie w powyższy sposób, używając znaczników , i . Upewnij się, że każda sekcja jest wypełniona kompletnie i szczegółowo. """ user_prompt = f"""Przeanalizuj następującą wiadomość pod kątem potencjalnego oszustwa: Wiadomość: "{message}" Numer telefonu nadawcy: {phone_number}" Dodatkowe informacje: {additional_info} Podaj swoją analizę i wnioski zgodnie z powyższymi wytycznymi.""" payload = { "model": "Meta-Llama-3.1-8B-Instruct", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "max_tokens": 1000, "temperature": 0.2, "top_p": 0.9, "stop": ["<|eot_id|>"] } try: response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: data = response.json() ai_response = data['choices'][0]['message']['content'] analysis = re.search(r'(.*?)', ai_response, re.DOTALL) risk_assessment = re.search(r'(.*?)', ai_response, re.DOTALL) recommendations = re.search(r'(.*?)', ai_response, re.DOTALL) analysis_text = analysis.group(1).strip() if analysis else "Brak analizy." risk_text = risk_assessment.group(1).strip() if risk_assessment else "Brak oceny ryzyka." recommendations_text = recommendations.group(1).strip() if recommendations else "Brak zaleceń." return analysis_text, risk_text, recommendations_text else: return f"Błąd API: {response.status_code} - {response.text}", "Błąd analizy.", "Błąd analizy." except Exception as e: return f"Błąd połączenia z API: {e}", "Błąd analizy.", "Błąd analizy." def init_stats_file(): """ Inicjalizuje plik statystyk, jeśli nie istnieje. """ stats_file = 'stats.json' if not os.path.exists(stats_file): with open(stats_file, 'w') as f: json.dump({"total_analyses": 0, "total_frauds_detected": 0}, f) def update_stats(fraud_detected=False): """ Aktualizuje statystyki analizy. """ stats_file = 'stats.json' try: with open(stats_file, 'r') as f: stats = json.load(f) except (json.JSONDecodeError, FileNotFoundError): stats = {"total_analyses": 0, "total_frauds_detected": 0} stats["total_analyses"] += 1 if fraud_detected: stats["total_frauds_detected"] += 1 with open(stats_file, 'w') as f: json.dump(stats, f, indent=4) def get_stats(): """ Pobiera aktualne statystyki analizy. """ stats_file = 'stats.json' try: with open(stats_file, 'r') as f: stats = json.load(f) return stats except (json.JSONDecodeError, FileNotFoundError): return {"total_analyses": 0, "total_frauds_detected": 0} ```