Initial commit

This commit is contained in:
zv
2025-10-17 12:25:12 +02:00
commit 7f6e59eb0e
17 changed files with 185219 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

235
api/endpoints.py Normal file
View File

@@ -0,0 +1,235 @@
# api/endpoints.py
import re
import os
import requests
import base64
from .utils import load_keywords, load_numbers, load_domains, load_shorteners, resolve_redirect
from .validators import Validators
from transformers import pipeline
import logging
logger = logging.getLogger(__name__)
# Import scanning function from a separate virustotal.py
from .virustotal import scan_url_with_virustotal
# VirusTotal API key is not needed here since the function in virustotal.py uses it
# Initialize AI model
try:
# suppress_warnings=True silences warnings during model loading
spam_classifier = pipeline("text-classification", model="mrm8488/bert-tiny-finetuned-sms-spam-detection")
logger.info("AI spam detection model loaded successfully.")
except Exception as e:
logger.warning(f"Failed to load AI model: {e}")
spam_classifier = None
# check_message - LOGIC FUNCTION
def check_message(text: str) -> dict:
"""Checks a text message for spam and suspicious words."""
result = {
"suspicious_words": [],
"ai_result": None,
"is_suspicious": False
}
keywords = load_keywords() # Assume load_keywords() works correctly
keywords = [kw.strip() for kw in keywords if kw.strip()] # Filter empty keywords
# Check for suspicious keywords (case-insensitive)
found_keywords = [kw for kw in keywords if kw.lower() in text.lower()]
result["suspicious_words"] = found_keywords
if spam_classifier:
try:
# Trim message to first 512 tokens for BERT
max_len = 512
if len(text.split()) > max_len:
text_for_ai = " ".join(text.split()[:max_len])
logger.warning(f"Message trimmed to {max_len} words for AI analysis.")
else:
text_for_ai = text
prediction = spam_classifier(text_for_ai)[0]
label = prediction["label"]
confidence = round(prediction["score"], 4)
result["ai_result"] = {
"label": "SPAM" if label == "LABEL_1" else "HAM", # LABEL_1 assumed to be SPAM
"confidence": confidence
}
# Suspicion if AI >= 0.6 OR keywords found
result["is_suspicious"] = (label == "LABEL_1" and confidence >= 0.6) or bool(result["suspicious_words"])
except Exception as ai_e:
logger.error(f"Error during AI message classification: {ai_e}")
result["ai_result"] = {"label": "ERROR", "confidence": 0}
# If AI fails, use only keyword check
result["is_suspicious"] = bool(result["suspicious_words"])
else:
result["is_suspicious"] = bool(result["suspicious_words"])
logger.info(f"check_message result: {result}")
return result
# check_phone - LOGIC FUNCTION
def check_phone(number: str) -> dict:
"""Checks a phone number for valid format and known scams."""
known_scams = load_numbers()
known_scams = {num.strip() for num in known_scams if num.strip()}
is_valid = Validators.is_valid_phone(number)
number_for_check = number.lstrip('+') # Remove '+' for database check
is_suspicious = number_for_check in known_scams
result = {
"is_valid": is_valid,
"is_suspicious": is_suspicious
}
logger.info(f"check_phone result for {number}: {result}")
return result
# check_link - LOGIC FUNCTION
def check_link(link: str) -> dict:
"""Checks a URL for suspicious patterns, known scam domains, shorteners, and scans with VirusTotal."""
logger.info(f"Checking link: {link}")
try:
link_resolved = resolve_redirect(link)
logger.info(f"Resolved link: {link_resolved}")
except Exception as resolve_e:
logger.error(f"Error resolving link {link}: {resolve_e}")
return {
"is_valid": False,
"is_suspicious": True,
"details": [{
"text": f"Error resolving redirect: {resolve_e}",
"data-pl": f"Błąd rozwiązywania przekierowania: {resolve_e}",
"data-en": f"Error resolving redirect: {resolve_e}"
}],
"source": "local"
}
link_clean = link_resolved.lower()
suspicious_reasons = []
is_valid = Validators.is_valid_url(link_resolved)
if not is_valid:
logger.warning(f"Resolved link is not a valid URL: {link_resolved}")
return {
"is_valid": False,
"is_suspicious": False,
"details": [{
"text": "Invalid resolved URL format",
"data-pl": "Niepoprawny format rozwiązanego URL",
"data-en": "Invalid resolved URL format"
}],
"source": "local"
}
# Local analysis - suspicious patterns in URL
suspicious_patterns = ["free", "gift", "login", "verify", "paypal", "paypa1", "bank", ".ru", ".cn"]
for pattern in suspicious_patterns:
if pattern in link_clean:
suspicious_reasons.append({
"text": f"Suspicious pattern found in URL: {pattern}",
"data-pl": f"Podejrzany wzorzec '{pattern}' znaleziony w adresie URL.",
"data-en": f"Suspicious pattern '{pattern}' found in the URL."
})
logger.info(f"Suspicious pattern '{pattern}' found in link: {link_resolved}")
# Local analysis - scam domains from database
scam_domains = load_domains()
scam_domains = [domain.strip() for domain in scam_domains if domain.strip()]
for domain in scam_domains:
if domain and domain in link_clean:
suspicious_reasons.append({
"text": f"Domain '{domain}' marked as suspicious in our database.",
"data-pl": f"Domena '{domain}' oznaczona jako podejrzana w naszej bazie danych.",
"data-en": f"Domain '{domain}' marked as suspicious in our database."
})
logger.info(f"Suspicious domain '{domain}' found in link: {link_resolved}")
# Local analysis - URL shorteners (check original link)
shorteners = load_shorteners()
shorteners = [short.strip() for short in shorteners if short.strip()]
is_shortened = False
for short in shorteners:
if short and short in link.lower():
is_shortened = True
suspicious_reasons.append({
"text": f"Link shortener detected in original link: {short}",
"data-pl": f"Wykryto usługę skracania URL w oryginalnym linku: {short}",
"data-en": f"Link shortener detected in original link: {short}"
})
logger.info(f"URL shortener '{short}' detected in original link: {link}")
local_is_suspicious = bool(suspicious_reasons)
# VirusTotal scan: only if local suspicious findings or short link
vt_result = {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": "Not scanned locally"}
if local_is_suspicious or is_shortened:
logger.info(f"Local suspicious or short link -> checking VirusTotal for: {link_resolved}")
vt_result = scan_url_with_virustotal(link_resolved)
logger.info(f"VirusTotal result for {link_resolved}: {vt_result}")
vt_detected = vt_result.get("detected", False)
vt_positives = vt_result.get("positives", 0)
vt_total = vt_result.get("total", 0)
vt_scan_date = vt_result.get("scan_date", "N/A")
vt_error = vt_result.get("error")
combined_details = []
if local_is_suspicious:
combined_details.extend(suspicious_reasons)
if vt_result.get('error') != "Not scanned locally":
if vt_error and vt_error != "Not scanned locally":
vt_summary_text = f"VirusTotal: Error - {vt_error}"
vt_summary_pl = f"VirusTotal: Błąd - {vt_error}"
vt_summary_en = f"VirusTotal: Error - {vt_error}"
else:
vt_summary_text = f"VirusTotal: {vt_positives} / {vt_total} engines flagged the link."
vt_summary_pl = f"VirusTotal: {vt_positives} / {vt_total} silników oznaczyło link."
vt_summary_en = f"VirusTotal: {vt_positives} / {vt_total} engines flagged the link."
if vt_scan_date and vt_scan_date != "N/A":
vt_summary_text += f" Scan date: {vt_scan_date}."
vt_summary_pl += f" Scan date: {vt_scan_date}."
vt_summary_en += f" Scan date: {vt_scan_date}."
combined_details.append({
"text": vt_summary_text,
"data-pl": vt_summary_pl,
"data-en": vt_summary_en
})
else:
logger.info(f"VirusTotal scan skipped for link: {link_resolved} (local analysis clean and not shortened).")
final_is_suspicious = local_is_suspicious or vt_detected
if not combined_details:
combined_details.append({
"text": "No suspicious activity detected based on available checks.",
"data-pl": "Nie wykryto podejrzanej aktywności na podstawie dostępnych sprawdzeń.",
"data-en": "No suspicious activity detected based on available checks."
})
source = "none"
elif local_is_suspicious and vt_detected:
source = "combined"
elif vt_detected:
source = "virustotal"
elif local_is_suspicious:
source = "local"
else:
source = "unknown"
result = {
"is_valid": is_valid,
"is_suspicious": final_is_suspicious,
"details": combined_details,
"source": source
}
logger.info(f"Final result for check_link ({link}): {result}")
return result

53
api/utils.py Normal file
View File

@@ -0,0 +1,53 @@
# api/utils.py
import requests
def load_keywords():
"""Load scam-related keywords from file."""
try:
with open("data/scam_keywords.txt", "r", encoding="utf-8") as f:
# Ensure keywords are converted to lowercase
return [line.strip().lower() for line in f]
except FileNotFoundError:
print("File 'scam_keywords.txt' not found.")
return []
def load_numbers():
"""Load scam phone numbers from file."""
try:
with open("data/scam_numbers.txt", "r", encoding="utf-8") as f:
return [line.strip() for line in f]
except FileNotFoundError:
print("File 'scam_numbers.txt' not found.")
return []
def load_domains():
"""Load known scam domains from file."""
try:
with open("data/scam_domains.txt", "r", encoding="utf-8") as f:
return [line.strip().lower() for line in f]
except FileNotFoundError:
print("File 'scam_domains.txt' not found.")
return []
def load_shorteners():
"""Load known URL shorteners from file."""
try:
with open("data/url_shorteners.txt", "r", encoding="utf-8") as f:
return [line.strip().lower() for line in f]
except FileNotFoundError:
print("File 'url_shorteners.txt' not found.")
return []
def resolve_redirect(url: str) -> str:
"""
Resolves redirects for shortened URLs like bit.ly, tinyurl, etc.
Returns the final URL or the original if redirect fails.
"""
try:
response = requests.head(url, allow_redirects=True, timeout=5)
return response.url
except Exception as e:
print(f"[WARN] Failed to resolve redirect: {e}")
return url

102
api/validators.py Normal file
View File

@@ -0,0 +1,102 @@
import re
from typing import Optional
class Validators:
@staticmethod
def is_valid_phone(number: str) -> bool:
"""
Validate phone number format (E.164 with optional +)
Allowed formats:
+48123456789
48123456789
123456789
"""
return re.match(r"^\+?\d[\d\s-]{8,14}\d$", number) is not None
@staticmethod
def is_valid_email(email: str) -> bool:
"""
Validate email format
"""
return re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email) is not None
@staticmethod
def is_valid_password(password: str, min_length: int = 8) -> bool:
"""
Validate password:
- Minimum length
- At least one digit
- At least one uppercase
- At least one lowercase
"""
if len(password) < min_length:
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
return True
@staticmethod
def is_valid_username(username: str, min_length: int = 3, max_length: int = 20) -> bool:
"""
Validate username:
- Only alphanumeric and underscores
- Length between min and max
"""
return (re.match(r"^[a-zA-Z0-9_]+$", username) is not None and
min_length <= len(username) <= max_length)
@staticmethod
def is_valid_postal_code(code: str, country: str = 'PL') -> bool:
"""
Validate postal code format for different countries
Default: Polish format (00-000)
"""
if country == 'PL':
return re.match(r"^\d{2}-\d{3}$", code) is not None
# Add other country formats as needed
return False
@staticmethod
def is_valid_url(url: str) -> bool:
"""
Validate URL format
"""
return re.match(
r"^(https?://)?(www\.)?[a-z0-9-]+(\.[a-z]{2,}){1,}(/.*)?$",
url,
re.IGNORECASE
) is not None
@staticmethod
def is_length_valid(text: str, min_len: int = 0, max_len: Optional[int] = None) -> bool:
"""
Validate text length
"""
if max_len is None:
return len(text) >= min_len
return min_len <= len(text) <= max_len
@staticmethod
def is_numeric(text: str) -> bool:
"""
Check if text contains only digits
"""
return text.isdigit()
@staticmethod
def is_alpha(text: str) -> bool:
"""
Check if text contains only letters
"""
return text.isalpha()
@staticmethod
def is_alphanumeric(text: str) -> bool:
"""
Check if text contains only letters and digits
"""
return text.isalnum()

115
api/virustotal.py Normal file
View File

@@ -0,0 +1,115 @@
# api/virustotal.py
import requests
import os
import base64
import logging
from datetime import datetime # Only import datetime
logger = logging.getLogger(__name__)
# Retrieve API key from ENV or use default (should be changed!)
VIRUSTOTAL_API_KEY = os.getenv("VT_API_KEY")
# Warning if the key is not set
if not VIRUSTOTAL_API_KEY or VIRUSTOTAL_API_KEY == "YOUR_API_KEY":
logger.warning("VIRUSTOTAL_API_KEY is not set in environment variables or default value was not changed.")
# Optionally, you might want to raise an error or disable VT checks entirely if the key is missing.
# Removed imports of pytz and tzlocal and SERVER_LOCAL_TZ detection logic
def scan_url_with_virustotal(url: str) -> dict:
"""
Scans a URL using VirusTotal API v3.
Retrieves an existing scan report for the given URL.
(Does not initiate a new scan if the report does not exist)
Args:
url (str): URL to scan.
Returns:
dict: Dictionary containing VirusTotal scan results.
{
"detected": bool, # True if malicious or suspicious > 0
"positives": int, # Sum of malicious and suspicious counts
"total": int, # Sum of counts for harmless, malicious, suspicious, undetected, timeout, failure
"scan_date": str, # Formatted scan date string in UTC or "N/A"
"error": str # Error message if request fails
}
"""
if not VIRUSTOTAL_API_KEY or VIRUSTOTAL_API_KEY == "YOUR_API_KEY":
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": "API key missing or default"}
api_url_base = "https://www.virustotal.com/api/v3/urls/"
headers = {"x-apikey": VIRUSTOTAL_API_KEY}
try:
# VirusTotal API v3 requires base64url encoded URL without padding
# https://developers.virustotal.com/v3.0/reference/#urls-id
encoded_url = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
# URL to fetch analysis
analysis_url = f"{api_url_base}{encoded_url}"
logger.info(f"Querying VirusTotal for URL: {url}")
response = requests.get(analysis_url, headers=headers)
# Check response status
if response.status_code == 404:
logger.info(f"VirusTotal: URL not found in database: {url}")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "message": "URL not found in VT database"}
elif response.status_code == 401:
logger.error("VirusTotal API error: Invalid API key")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": "Invalid API key"}
elif response.status_code == 429:
logger.warning("VirusTotal API error: Rate limit exceeded")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": "Rate limit exceeded"}
elif response.status_code >= 400: # Handle other 4xx/5xx HTTP errors
error_message = f"HTTP error {response.status_code}"
try:
error_data = response.json()
error_message += f": {error_data.get('error', {}).get('message', 'Unknown VT error')}"
except:
pass # Ignore if JSON parsing fails
logger.error(f"VirusTotal HTTP error for {url}: {error_message}")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": error_message}
# If status 200 OK, process data
data = response.json()
attributes = data.get("data", {}).get("attributes", {})
stats = attributes.get("last_analysis_stats", {})
malicious = stats.get("malicious", 0)
suspicious = stats.get("suspicious", 0)
# Calculate total engines based on the stats provided
total_engines = stats.get("harmless", 0) + malicious + suspicious + stats.get("undetected", 0) + stats.get("timeout", 0) + stats.get("failure", 0)
scan_date_ts = attributes.get("last_analysis_date") # Timestamp (seconds since epoch UTC)
scan_date_str = "N/A"
if scan_date_ts:
try:
# Convert timestamp to naive UTC datetime object
utc_dt = datetime.utcfromtimestamp(scan_date_ts)
# Format date and time, append " UTC"
scan_date_str = utc_dt.strftime('%Y-%m-%d %H:%M:%S') + ' UTC'
except Exception as date_e:
logger.error(f"Error formatting VirusTotal scan date timestamp {scan_date_ts}: {date_e}")
scan_date_str = "Invalid Date Format"
return {
"detected": (malicious + suspicious) > 0,
"positives": malicious + suspicious,
"total": total_engines,
"scan_date": scan_date_str, # Return formatted date string in UTC
"error": None # No error
}
except requests.exceptions.RequestException as req_err:
logger.error(f"VirusTotal request failed for {url}: {req_err}")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": f"Request failed: {req_err}"}
except Exception as e:
logger.error(f"An unexpected error occurred during VirusTotal check for {url}: {e}")
return {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A", "error": f"Unexpected error: {e}"}