Files
antiscam-pro/api/virustotal.py
2025-10-17 12:25:12 +02:00

116 lines
5.3 KiB
Python

# api/virustotal.py
import requests
import os
import base64
import logging
from datetime import datetime # Only import datetime
logger = logging.getLogger(__name__)
# Read the VirusTotal API key from the VT_API_KEY environment variable.
# No fallback is passed, so the value is None when the variable is unset.
VIRUSTOTAL_API_KEY = os.environ.get("VT_API_KEY")

# Warn once at import time when the key is absent, empty, or still the
# "YOUR_API_KEY" placeholder. The module still loads; callers could instead
# raise here or disable VirusTotal checks entirely if a key is mandatory.
if VIRUSTOTAL_API_KEY in (None, "", "YOUR_API_KEY"):
    logger.warning("VIRUSTOTAL_API_KEY is not set in environment variables or default value was not changed.")

# NOTE: the pytz / tzlocal imports and the SERVER_LOCAL_TZ detection logic
# were removed from this module.
def _vt_result(detected=False, positives=0, total=0, scan_date="N/A", error=None, **extra) -> dict:
    """Build a scan-result dict that always carries the five documented keys.

    Any additional keyword arguments (e.g. ``message``) are merged in as
    extra keys after the standard ones.
    """
    result = {
        "detected": detected,
        "positives": positives,
        "total": total,
        "scan_date": scan_date,
        "error": error,
    }
    result.update(extra)
    return result


def _format_scan_date(timestamp) -> str:
    """Format an epoch-seconds timestamp as ``'YYYY-MM-DD HH:MM:SS UTC'``.

    Returns ``"N/A"`` for a missing/falsy timestamp and
    ``"Invalid Date Format"`` when the value cannot be converted.
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone

    if not timestamp:
        return "N/A"
    try:
        # Timezone-aware conversion; `datetime.utcfromtimestamp` is deprecated
        # since Python 3.12. The rendered string is the same.
        utc_dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return utc_dt.strftime("%Y-%m-%d %H:%M:%S") + " UTC"
    except (OverflowError, OSError, ValueError, TypeError) as date_err:
        logger.error("Error formatting VirusTotal scan date timestamp %s: %s", timestamp, date_err)
        return "Invalid Date Format"


def scan_url_with_virustotal(url: str, *, timeout: float = 10.0) -> dict:
    """
    Look up the existing VirusTotal (API v3) report for a URL.

    Only an existing report is fetched; no new scan is submitted when
    VirusTotal has no record of the URL.

    Args:
        url (str): URL to look up.
        timeout (float): Seconds to wait for the HTTP request before giving
            up. Keyword-only; defaults to 10 seconds.

    Returns:
        dict: Always contains the following keys:
            {
                "detected": bool,   # True if malicious + suspicious > 0
                "positives": int,   # malicious + suspicious
                "total": int,       # harmless + malicious + suspicious
                                    # + undetected + timeout + failure
                "scan_date": str,   # "YYYY-MM-DD HH:MM:SS UTC", "N/A",
                                    # or "Invalid Date Format"
                "error": str|None   # error text, or None when no error
            }
        When the URL is unknown to VirusTotal (HTTP 404) the dict also
        carries a "message" key and "error" is None.

    This function does not raise: request failures and unexpected errors
    are reported through the "error" key.
    """
    if not VIRUSTOTAL_API_KEY or VIRUSTOTAL_API_KEY == "YOUR_API_KEY":
        return _vt_result(error="API key missing or default")

    try:
        # VirusTotal API v3 identifies a URL by its unpadded base64url encoding.
        # https://developers.virustotal.com/v3.0/reference/#urls-id
        encoded_url = base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
        analysis_url = f"https://www.virustotal.com/api/v3/urls/{encoded_url}"

        logger.info("Querying VirusTotal for URL: %s", url)
        # Explicit timeout: without one, requests may block indefinitely.
        response = requests.get(
            analysis_url,
            headers={"x-apikey": VIRUSTOTAL_API_KEY},
            timeout=timeout,
        )
        status = response.status_code

        if status == 404:
            logger.info("VirusTotal: URL not found in database: %s", url)
            # Not an error: the URL simply has no report yet.
            return _vt_result(message="URL not found in VT database")
        if status == 401:
            logger.error("VirusTotal API error: Invalid API key")
            return _vt_result(error="Invalid API key")
        if status == 429:
            logger.warning("VirusTotal API error: Rate limit exceeded")
            return _vt_result(error="Rate limit exceeded")
        if status >= 400:  # Any other 4xx/5xx HTTP error
            error_message = f"HTTP error {status}"
            try:
                error_data = response.json()
                error_message += f": {error_data.get('error', {}).get('message', 'Unknown VT error')}"
            except (ValueError, AttributeError):
                # Body is not JSON or not shaped as expected; keep the bare
                # status-code message.
                pass
            logger.error("VirusTotal HTTP error for %s: %s", url, error_message)
            return _vt_result(error=error_message)

        # Success path: extract the analysis statistics from the report.
        data = response.json()
        attributes = data.get("data", {}).get("attributes", {})
        stats = attributes.get("last_analysis_stats", {})

        positives = stats.get("malicious", 0) + stats.get("suspicious", 0)
        # Total engines = flagged verdicts plus every non-flagging category.
        total_engines = positives + sum(
            stats.get(category, 0)
            for category in ("harmless", "undetected", "timeout", "failure")
        )

        return _vt_result(
            detected=positives > 0,
            positives=positives,
            total=total_engines,
            scan_date=_format_scan_date(attributes.get("last_analysis_date")),
            error=None,
        )
    except requests.exceptions.RequestException as req_err:
        logger.error("VirusTotal request failed for %s: %s", url, req_err)
        return _vt_result(error=f"Request failed: {req_err}")
    except Exception as e:
        # Last-resort boundary so a malformed response never crashes the
        # caller; log with traceback to keep the failure diagnosable.
        logger.exception("An unexpected error occurred during VirusTotal check for %s: %s", url, e)
        return _vt_result(error=f"Unexpected error: {e}")