"""Look up existing VirusTotal (API v3) analysis reports for URLs."""
# api/virustotal.py
import base64
import logging
import os
from datetime import datetime, timezone

import requests

logger = logging.getLogger(__name__)

# The VirusTotal API key is read from the VT_API_KEY environment variable.
# There is no built-in fallback: the value is None when the variable is unset,
# and the literal "YOUR_API_KEY" is treated as an unfilled placeholder.
VIRUSTOTAL_API_KEY = os.getenv("VT_API_KEY")

# Warn once at import time if the key is missing or still the placeholder.
if not VIRUSTOTAL_API_KEY or VIRUSTOTAL_API_KEY == "YOUR_API_KEY":
    logger.warning("VIRUSTOTAL_API_KEY is not set in environment variables or default value was not changed.")
    # Optionally, you might want to raise an error or disable VT checks
    # entirely if the key is missing.

# NOTE: pytz / tzlocal imports and the SERVER_LOCAL_TZ detection logic were
# removed from this module.


def _empty_vt_result(**extra) -> dict:
    """Build the "nothing detected" result dict, extended with *extra* keys."""
    result = {"detected": False, "positives": 0, "total": 0, "scan_date": "N/A"}
    result.update(extra)
    return result


def _format_vt_scan_date(timestamp) -> str:
    """Render an epoch timestamp as 'YYYY-MM-DD HH:MM:SS UTC'.

    Returns "N/A" for a missing/zero timestamp and "Invalid Date Format" when
    the value cannot be converted.
    """
    if not timestamp:
        return "N/A"
    try:
        # Timezone-aware conversion; datetime.utcfromtimestamp() is deprecated
        # since Python 3.12 and produces the same wall-clock value.
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.strftime("%Y-%m-%d %H:%M:%S") + " UTC"
    except (TypeError, ValueError, OverflowError, OSError) as date_err:
        logger.error(
            "Error formatting VirusTotal scan date timestamp %s: %s",
            timestamp,
            date_err,
        )
        return "Invalid Date Format"


def scan_url_with_virustotal(url: str, timeout: float = 15.0) -> dict:
    """
    Retrieve the existing VirusTotal (API v3) report for a URL.

    Only an existing report is fetched; no new scan is submitted when
    VirusTotal has no record of the URL.

    Args:
        url (str): URL to look up.
        timeout (float): Seconds to wait for the HTTP request before giving
            up. A timeout is reported through the "error" key like any other
            request failure.

    Returns:
        dict: Dictionary containing the lookup result.
        {
            "detected": bool,  # True if malicious + suspicious > 0
            "positives": int,  # Sum of malicious and suspicious counts
            "total": int,      # Sum of harmless, malicious, suspicious,
                               # undetected, timeout and failure counts
            "scan_date": str,  # "YYYY-MM-DD HH:MM:SS UTC", "N/A", or
                               # "Invalid Date Format"
            "error": str|None  # Error message, or None on success / 404
        }
        When the URL is unknown to VirusTotal (HTTP 404) the dict also
        carries a "message" key explaining that.
    """
    if not VIRUSTOTAL_API_KEY or VIRUSTOTAL_API_KEY == "YOUR_API_KEY":
        return _empty_vt_result(error="API key missing or default")

    api_url_base = "https://www.virustotal.com/api/v3/urls/"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}

    try:
        # VirusTotal API v3 identifies a URL by its base64url encoding
        # without the trailing "=" padding.
        # https://developers.virustotal.com/v3.0/reference/#urls-id
        encoded_url = base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
        analysis_url = f"{api_url_base}{encoded_url}"

        logger.info("Querying VirusTotal for URL: %s", url)
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(analysis_url, headers=headers, timeout=timeout)

        status = response.status_code
        if status == 404:
            logger.info("VirusTotal: URL not found in database: %s", url)
            # Not an error: keep "message" for existing callers and add the
            # documented "error" key so every result has the same shape.
            return _empty_vt_result(message="URL not found in VT database", error=None)
        if status == 401:
            logger.error("VirusTotal API error: Invalid API key")
            return _empty_vt_result(error="Invalid API key")
        if status == 429:
            logger.warning("VirusTotal API error: Rate limit exceeded")
            return _empty_vt_result(error="Rate limit exceeded")
        if status >= 400:  # Any other 4xx/5xx HTTP error
            error_message = f"HTTP error {status}"
            try:
                error_data = response.json()
                error_message += f": {error_data.get('error', {}).get('message', 'Unknown VT error')}"
            except (ValueError, AttributeError):
                # Body is not JSON or not shaped as expected: report the
                # bare status code only.
                pass
            logger.error("VirusTotal HTTP error for %s: %s", url, error_message)
            return _empty_vt_result(error=error_message)

        # Successful response: summarise the last analysis statistics.
        data = response.json()
        attributes = data.get("data", {}).get("attributes", {})
        stats = attributes.get("last_analysis_stats", {})

        malicious = stats.get("malicious", 0)
        suspicious = stats.get("suspicious", 0)
        positives = malicious + suspicious

        # Total engines is the sum of every verdict bucket present in stats.
        total_engines = (
            stats.get("harmless", 0)
            + malicious
            + suspicious
            + stats.get("undetected", 0)
            + stats.get("timeout", 0)
            + stats.get("failure", 0)
        )

        return {
            "detected": positives > 0,
            "positives": positives,
            "total": total_engines,
            "scan_date": _format_vt_scan_date(attributes.get("last_analysis_date")),
            "error": None,  # No error
        }

    except requests.exceptions.RequestException as req_err:
        logger.error("VirusTotal request failed for %s: %s", url, req_err)
        return _empty_vt_result(error=f"Request failed: {req_err}")

    except Exception as e:
        # Last-resort boundary: callers always get a result dict back;
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(
            "An unexpected error occurred during VirusTotal check for %s: %s", url, e
        )
        return _empty_vt_result(error=f"Unexpected error: {e}")