Files
antiscam-pro/api/endpoints.py
2025-10-17 12:25:12 +02:00

236 lines
9.4 KiB
Python

# api/endpoints.py
import re
import os
import requests
import base64
from .utils import load_keywords, load_numbers, load_domains, load_shorteners, resolve_redirect
from .validators import Validators
from transformers import pipeline
import logging
logger = logging.getLogger(__name__)
# Import scanning function from a separate virustotal.py
from .virustotal import scan_url_with_virustotal
# VirusTotal API key is not needed here since the function in virustotal.py uses it
# Initialize AI model
try:
    # Build the Hugging Face text-classification pipeline used by
    # check_message(). Only the call that can actually fail sits in the try.
    spam_classifier = pipeline(
        "text-classification",
        model="mrm8488/bert-tiny-finetuned-sms-spam-detection",
    )
except Exception as e:
    # Deliberately broad: any failure while loading the model must not break
    # module import. With spam_classifier set to None, check_message() relies
    # on the keyword check alone.
    logger.warning(f"Failed to load AI model: {e}")
    spam_classifier = None
else:
    logger.info("AI spam detection model loaded successfully.")
# check_message - LOGIC FUNCTION
def check_message(text: str) -> dict:
    """Check a text message for suspicious keywords and AI-detected spam.

    Args:
        text: The message body to analyse.

    Returns:
        A dict with three keys:

        * ``"suspicious_words"``: keywords returned by ``load_keywords()``
          (stripped, blanks dropped) that occur in ``text`` as a
          case-insensitive substring.
        * ``"ai_result"``: ``None`` when no classifier is loaded; otherwise
          ``{"label": ..., "confidence": ...}`` where the label is ``"SPAM"``,
          ``"HAM"`` or ``"ERROR"`` (classification raised).
        * ``"is_suspicious"``: ``True`` when at least one keyword matched, or
          when the classifier returned the spam label with a confidence of
          at least 0.6.
    """
    max_words = 512   # coarse pre-trim on whitespace-separated words
    max_tokens = 512  # hard cap enforced by the tokenizer (see below)

    # Lower-case the message once instead of once per keyword.
    lowered_text = text.lower()
    keywords = [kw.strip() for kw in load_keywords()]
    found_keywords = [kw for kw in keywords if kw and kw.lower() in lowered_text]

    # The keyword verdict is the baseline; the AI result can only raise it.
    # This covers the "no model loaded" and "model failed" paths as well.
    result = {
        "suspicious_words": found_keywords,
        "ai_result": None,
        "is_suspicious": bool(found_keywords),
    }

    if spam_classifier:
        try:
            words = text.split()
            if len(words) > max_words:
                text_for_ai = " ".join(words[:max_words])
                logger.warning(f"Message trimmed to {max_words} words for AI analysis.")
            else:
                text_for_ai = text

            # Words are not tokens: a message with fewer than 512 words can
            # still tokenize to more pieces than the model accepts. Let the
            # tokenizer truncate so such messages are classified instead of
            # raising and being reported as "ERROR".
            prediction = spam_classifier(
                text_for_ai, truncation=True, max_length=max_tokens
            )[0]
            confidence = round(prediction["score"], 4)
            is_spam = prediction["label"] == "LABEL_1"  # LABEL_1 assumed to be SPAM
            result["ai_result"] = {
                "label": "SPAM" if is_spam else "HAM",
                "confidence": confidence,
            }
            if is_spam and confidence >= 0.6:
                result["is_suspicious"] = True
        except Exception as ai_e:
            # Best effort: a classifier failure leaves the keyword verdict
            # in place and is reported through the "ERROR" label.
            logger.error(f"Error during AI message classification: {ai_e}")
            result["ai_result"] = {"label": "ERROR", "confidence": 0}

    logger.info(f"check_message result: {result}")
    return result
# check_phone - LOGIC FUNCTION
def check_phone(number: str) -> dict:
    """Validate a phone number and look it up among the known scam numbers.

    Args:
        number: The phone number as supplied by the caller.

    Returns:
        A dict with ``"is_valid"`` (the result of
        ``Validators.is_valid_phone(number)``) and ``"is_suspicious"``
        (``True`` when the number, without leading ``'+'`` characters, is one
        of the entries returned by ``load_numbers()``).
    """
    # Normalise the stored numbers: trim whitespace and skip blank entries.
    scam_numbers = set()
    for entry in load_numbers():
        cleaned = entry.strip()
        if cleaned:
            scam_numbers.add(cleaned)

    # The lookup is done on the number with any leading '+' removed.
    bare_number = number.lstrip('+')

    result = {
        "is_valid": Validators.is_valid_phone(number),
        "is_suspicious": bare_number in scam_numbers,
    }
    logger.info(f"check_phone result for {number}: {result}")
    return result
# check_link - LOGIC FUNCTION
def check_link(link: str) -> dict:
    """Check a URL with local heuristics and, when those fire, VirusTotal.

    Steps, in order:

    1. Resolve redirects with ``resolve_redirect``. A failure here is
       reported as an invalid, suspicious link.
    2. Validate the resolved URL with ``Validators.is_valid_url``. An invalid
       URL is reported as invalid but not suspicious.
    3. Run the local checks: hard-coded suspicious substrings, entries from
       ``load_domains()`` (both against the resolved URL) and entries from
       ``load_shorteners()`` (against the original link).
    4. Only when a local check fired, scan the resolved URL with
       ``scan_url_with_virustotal`` and append a summary of its result.

    Args:
        link: The URL as supplied by the caller.

    Returns:
        A dict with keys ``"is_valid"``, ``"is_suspicious"``, ``"details"``
        (a list of dicts with ``"text"``, ``"data-pl"`` and ``"data-en"``
        messages) and ``"source"`` (``"local"``, ``"combined"``,
        ``"virustotal"``, ``"none"`` or ``"unknown"``).
    """
    logger.info(f"Checking link: {link}")
    try:
        link_resolved = resolve_redirect(link)
    except Exception as resolve_e:
        logger.error(f"Error resolving link {link}: {resolve_e}")
        message = f"Error resolving redirect: {resolve_e}"
        return {
            "is_valid": False,
            "is_suspicious": True,
            "details": [_link_detail(
                message,
                f"Błąd rozwiązywania przekierowania: {resolve_e}",
                message,
            )],
            "source": "local",
        }
    logger.info(f"Resolved link: {link_resolved}")

    is_valid = Validators.is_valid_url(link_resolved)
    if not is_valid:
        logger.warning(f"Resolved link is not a valid URL: {link_resolved}")
        return {
            "is_valid": False,
            "is_suspicious": False,
            "details": [_link_detail(
                "Invalid resolved URL format",
                "Niepoprawny format rozwiązanego URL",
                "Invalid resolved URL format",
            )],
            "source": "local",
        }

    suspicious_reasons = _collect_local_link_findings(link, link_resolved)
    local_is_suspicious = bool(suspicious_reasons)
    combined_details = list(suspicious_reasons)

    # VirusTotal is consulted only when a local check fired. A detected
    # shortener always adds a local finding, so it needs no separate flag.
    vt_detected = False
    if local_is_suspicious:
        logger.info(f"Local suspicious or short link -> checking VirusTotal for: {link_resolved}")
        vt_result = _scan_link_with_virustotal(link_resolved)
        logger.info(f"VirusTotal result for {link_resolved}: {vt_result}")
        vt_detected = vt_result.get("detected", False)
        combined_details.append(_virustotal_link_detail(vt_result))
    else:
        logger.info(f"VirusTotal scan skipped for link: {link_resolved} (local analysis clean and not shortened).")

    final_is_suspicious = local_is_suspicious or vt_detected

    if not combined_details:
        combined_details.append(_link_detail(
            "No suspicious activity detected based on available checks.",
            "Nie wykryto podejrzanej aktywności na podstawie dostępnych sprawdzeń.",
            "No suspicious activity detected based on available checks.",
        ))
        source = "none"
    elif local_is_suspicious and vt_detected:
        source = "combined"
    elif vt_detected:
        source = "virustotal"
    elif local_is_suspicious:
        source = "local"
    else:
        source = "unknown"

    result = {
        "is_valid": is_valid,
        "is_suspicious": final_is_suspicious,
        "details": combined_details,
        "source": source,
    }
    logger.info(f"Final result for check_link ({link}): {result}")
    return result


def _link_detail(text: str, pl: str, en: str) -> dict:
    """Build one entry of the ``details`` list returned by ``check_link``."""
    return {"text": text, "data-pl": pl, "data-en": en}


def _collect_local_link_findings(link: str, link_resolved: str) -> list:
    """Run the local URL checks and return one detail entry per finding.

    Findings are ordered: suspicious substrings, scam domains, shorteners.
    """
    link_clean = link_resolved.lower()
    findings = []

    # Suspicious substrings anywhere in the resolved URL.
    suspicious_patterns = ["free", "gift", "login", "verify", "paypal", "paypa1", "bank", ".ru", ".cn"]
    for pattern in suspicious_patterns:
        if pattern in link_clean:
            findings.append(_link_detail(
                f"Suspicious pattern found in URL: {pattern}",
                f"Podejrzany wzorzec '{pattern}' znaleziony w adresie URL.",
                f"Suspicious pattern '{pattern}' found in the URL.",
            ))
            logger.info(f"Suspicious pattern '{pattern}' found in link: {link_resolved}")

    # Scam domains from the database, matched as substrings of the resolved URL.
    scam_domains = [entry.strip() for entry in load_domains()]
    for domain in scam_domains:
        if domain and domain in link_clean:
            findings.append(_link_detail(
                f"Domain '{domain}' marked as suspicious in our database.",
                f"Domena '{domain}' oznaczona jako podejrzana w naszej bazie danych.",
                f"Domain '{domain}' marked as suspicious in our database.",
            ))
            logger.info(f"Suspicious domain '{domain}' found in link: {link_resolved}")

    # Shorteners are looked for in the ORIGINAL link, because the resolved
    # URL no longer points at the shortening service.
    link_lower = link.lower()
    shorteners = [entry.strip() for entry in load_shorteners()]
    for short in shorteners:
        if short and short in link_lower:
            findings.append(_link_detail(
                f"Link shortener detected in original link: {short}",
                f"Wykryto usługę skracania URL w oryginalnym linku: {short}",
                f"Link shortener detected in original link: {short}",
            ))
            logger.info(f"URL shortener '{short}' detected in original link: {link}")

    return findings


def _scan_link_with_virustotal(url: str) -> dict:
    """Scan ``url`` with VirusTotal, turning any failure into an error result.

    A raised exception or a non-dict return value must not discard the local
    findings already collected, so both are converted into a dict carrying
    an ``"error"`` message.
    """
    try:
        vt_result = scan_url_with_virustotal(url)
    except Exception as vt_e:
        logger.error(f"Error during VirusTotal scan for {url}: {vt_e}")
        return {"detected": False, "error": str(vt_e)}
    if not isinstance(vt_result, dict):
        logger.error(f"Unexpected VirusTotal result for {url}: {vt_result!r}")
        return {"detected": False, "error": "Unexpected VirusTotal response"}
    return vt_result


def _virustotal_link_detail(vt_result: dict) -> dict:
    """Summarise a VirusTotal scan result as one ``details`` entry."""
    vt_error = vt_result.get("error")
    if vt_error:
        message = f"VirusTotal: Error - {vt_error}"
        return _link_detail(message, f"VirusTotal: Błąd - {vt_error}", message)

    vt_positives = vt_result.get("positives", 0)
    vt_total = vt_result.get("total", 0)
    summary_en = f"VirusTotal: {vt_positives} / {vt_total} engines flagged the link."
    summary_pl = f"VirusTotal: {vt_positives} / {vt_total} silników oznaczyło link."

    vt_scan_date = vt_result.get("scan_date", "N/A")
    if vt_scan_date and vt_scan_date != "N/A":
        summary_en += f" Scan date: {vt_scan_date}."
        # The Polish message previously carried the English fragment.
        summary_pl += f" Data skanowania: {vt_scan_date}."

    return _link_detail(summary_en, summary_pl, summary_en)