Source code for email_deliverability.resource_manager

"""Resource management for email deliverability data."""
import os
import json
import requests
import time
import logging
from datetime import datetime
import threading

# Set up logging: root logger at INFO, mirrored to a log file in the current
# working directory and to the default stream handler.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler("email_deliverability.log"),
    logging.StreamHandler(),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

# Module-level logger used by everything in this file
logger = logging.getLogger('email_deliverability.resources')


class ResourceManager:
    """Manage external resources like blacklists and IP reputation data.

    The class is a process-wide singleton: every ``ResourceManager()`` call
    returns the same object, and only the first call runs the initialisation.
    A ``cache_dir`` passed to any later call is therefore ignored.
    """

    _instance = None  # Singleton instance
    _initialized = False  # True once the shared instance has been set up

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(ResourceManager, cls).__new__(cls)
        return cls._instance

    def __init__(self, cache_dir=None):
        """
        Initialize the resource manager.

        Args:
            cache_dir (str): Directory to store cached resources. Defaults to
                ``~/.email_deliverability``.
        """
        # Only initialize once (singleton pattern)
        if ResourceManager._initialized:
            return

        self.cache_dir = cache_dir or os.path.join(os.path.expanduser("~"), ".email_deliverability")
        os.makedirs(self.cache_dir, exist_ok=True)

        # Prevent multiple threads from updating resources simultaneously
        self._lock = threading.RLock()

        # Track the last download time for each resource
        self.last_download_time = {}
        self._last_download_file = os.path.join(self.cache_dir, "last_download.json")
        self._load_download_times()

        # Scheduler state: running flag, worker thread, and the event used to
        # wake the worker up when it is asked to stop
        self._scheduler_running = False
        self._scheduler_thread = None
        self._scheduler_stop_event = None

        # List of registered resources
        self.resources = {
            "disposable_domains": {
                "url": "https://raw.githubusercontent.com/disposable-email-domains/disposable-email-domains/master/disposable_email_blocklist.conf",
                "processor": lambda data: data.strip().split('\n')
            },
            "dnsbl_list": {
                # Instead of relying on external sources that might disappear,
                # use a comprehensive built-in list
                "fallback": [
                    # Spamhaus blocklists
                    "zen.spamhaus.org",
                    "sbl.spamhaus.org",
                    "xbl.spamhaus.org",
                    "pbl.spamhaus.org",
                    "sbl-xbl.spamhaus.org",
                    "dbl.spamhaus.org",
                    # SpamCop
                    "bl.spamcop.net",
                    # Barracuda
                    "b.barracudacentral.org",
                    # SORBS
                    "dnsbl.sorbs.net",
                    "spam.dnsbl.sorbs.net",
                    "web.dnsbl.sorbs.net",
                    "zombie.dnsbl.sorbs.net",
                    "dul.dnsbl.sorbs.net",
                    "smtp.dnsbl.sorbs.net",
                    "new.spam.dnsbl.sorbs.net",
                    # URIBL
                    "multi.uribl.com",
                    "black.uribl.com",
                    "red.uribl.com",
                    "uribl.spameatingmonkey.net",
                    # Other popular DNSBLs
                    "dnsbl-1.uceprotect.net",
                    "dnsbl-2.uceprotect.net",
                    "dnsbl-3.uceprotect.net",
                    "dnsbl.dronebl.org",
                    "cbl.abuseat.org",
                    "bl.deadbeef.com",
                    "bl.emailbasura.org",
                    "bl.spamcannibal.org",
                    "blackholes.mail-abuse.org",
                    "bogons.cymru.com",
                    "combined.abuse.ch",
                    "db.wpbl.info",
                    "rbl.interserver.net",
                    "relays.mail-abuse.org",
                    "truncate.gbudb.net",
                    "psbl.surriel.com",
                    "mailspike.net"
                ]
            },
            "tld_list": {
                "url": "https://data.iana.org/TLD/tlds-alpha-by-domain.txt",
                "processor": lambda data: [line.strip().lower() for line in data.split('\n')
                                           if line.strip() and not line.startswith('#')]
            },
            "ip_reputation_providers": {
                # Use built-in list of reputation providers instead of relying
                # on external sources
                "fallback": {
                    "providers": [
                        {"name": "Spamhaus", "url": "https://www.spamhaus.org/"},
                        {"name": "SpamCop", "url": "https://www.spamcop.net/"},
                        {"name": "Barracuda", "url": "https://www.barracuda.com/"},
                        {"name": "SORBS", "url": "http://www.sorbs.net/"},
                        {"name": "URIBL", "url": "https://uribl.com/"},
                        {"name": "SURBL", "url": "https://www.surbl.org/"},
                        {"name": "SpamRats", "url": "https://www.spamrats.com/"},
                        {"name": "MailSpike", "url": "https://mailspike.org/"},
                        {"name": "Invaluement", "url": "https://www.invaluement.com/"},
                        {"name": "Passive Spam Block List", "url": "https://psbl.org/"},
                        {"name": "Composite Blocking List", "url": "https://www.abuseat.org/"},
                        {"name": "Proofpoint IP Reputation", "url": "https://www.proofpoint.com/"},
                        {"name": "Cloudmark", "url": "https://www.cloudmark.com/"},
                        {"name": "TrustedSource", "url": "https://www.trustedsource.org/"}
                    ]
                }
            }
        }

        ResourceManager._initialized = True

    def _load_download_times(self):
        """Load last download times from file.

        Anything other than a JSON object (unreadable file, invalid JSON, or
        valid JSON of another type) resets the table to an empty dict, so
        later item assignments on it cannot fail.
        """
        try:
            if os.path.exists(self._last_download_file):
                with open(self._last_download_file, 'r') as f:
                    loaded = json.load(f)
                self.last_download_time = loaded if isinstance(loaded, dict) else {}
        except (json.JSONDecodeError, IOError):
            self.last_download_time = {}

    def _save_download_times(self):
        """Save last download times to file."""
        try:
            with open(self._last_download_file, 'w') as f:
                json.dump(self.last_download_time, f)
        except IOError:
            logger.error("Failed to save resource download times", exc_info=True)

    def _write_cache(self, resource_name, data, indent=None):
        """Write resource data to its cache file and return the file path.

        Dicts and lists are stored as JSON; anything else is stored as its
        ``str()`` form. Errors from opening or writing the file propagate to
        the caller.
        """
        resource_path = os.path.join(self.cache_dir, f"{resource_name}.json")
        with open(resource_path, 'w') as f:
            if isinstance(data, (dict, list)):
                json.dump(data, f, indent=indent)
            else:
                f.write(str(data))
        return resource_path

    def needs_update(self, resource_name, max_age_hours=24):
        """
        Check if a resource needs updating.

        Args:
            resource_name (str): Name of the resource
            max_age_hours (int): Maximum age in hours before update needed

        Returns:
            bool: True if the resource needs updating
        """
        last_time = self.last_download_time.get(resource_name)

        # A missing or non-numeric timestamp (e.g. a hand-edited or corrupted
        # last_download.json) is treated as "never downloaded".
        if isinstance(last_time, bool) or not isinstance(last_time, (int, float)):
            return True

        age_in_seconds = int(time.time()) - last_time
        return age_in_seconds > (max_age_hours * 3600)

    def download_resource(self, resource_name, url=None, processor=None, force=False):
        """
        Download and cache an external resource.

        Args:
            resource_name (str): Name of the resource
            url (str): URL to download the resource from
            processor (callable): Function to process the downloaded data
            force (bool): Force download even if not needed

        Returns:
            object: The downloaded resource data. When the download fails, or
            the resource has no URL, the cached or fallback data is returned
            instead (None if neither exists).
        """
        # Use registered resource info if not provided
        resource_info = self.resources.get(resource_name, {})
        if not url:
            url = resource_info.get("url")
        if not processor:
            processor = resource_info.get("processor")

        # For resources without URLs (like dnsbl_list), use fallback directly
        if not url:
            logger.info(f"No URL provided for resource: {resource_name}, using fallback")
            return self._use_fallback_or_cached(resource_name, resource_info)

        with self._lock:
            if not force and not self.needs_update(resource_name):
                # Read the cache directly. Going through load_resource() here
                # would call back into this method and recurse forever when
                # the timestamp is fresh but the cache file has been removed.
                cached_data = self._load_cached_resource(resource_name)
                if cached_data is not None:
                    return cached_data
                logger.info(f"Cached copy of {resource_name} is missing, downloading it again")

            logger.info(f"Downloading resource: {resource_name}")

            try:
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data = response.text

                # Process the data if a processor function is provided
                if processor and callable(processor):
                    data = processor(data)
            except Exception as e:
                logger.error(f"Failed to download resource {resource_name}: {str(e)}", exc_info=True)
                # Return cached version or fallback data
                return self._use_fallback_or_cached(resource_name, resource_info)

            # Caching is best effort: a failed write must not throw away data
            # that was downloaded successfully. The timestamp is only updated
            # when the cache file was actually written.
            try:
                self._write_cache(resource_name, data)
            except (OSError, TypeError, ValueError):
                logger.error(f"Failed to cache resource {resource_name}", exc_info=True)
            else:
                self.last_download_time[resource_name] = int(time.time())
                self._save_download_times()

            return data

    def _use_fallback_or_cached(self, resource_name, resource_info):
        """Use fallback data or cached data for a resource.

        The cached copy wins unless it looks stale (empty provider list, or a
        DNSBL list shorter than the built-in one). Otherwise the built-in
        fallback is returned and written to the cache. Returns None when
        neither is available.
        """
        # Try cached version first
        cached_data = self._load_cached_resource(resource_name)

        # Special checks for cached data validity
        if cached_data is not None:
            if resource_name == "ip_reputation_providers":
                if (isinstance(cached_data, dict) and "providers" in cached_data
                        and len(cached_data["providers"]) == 0):
                    logger.info(f"Cached {resource_name} has empty providers list, using fallback instead")
                    cached_data = None  # Force using fallback instead
            elif resource_name == "dnsbl_list":
                # Ensure DNSBL list has all our blacklists
                if (isinstance(cached_data, list)
                        and len(cached_data) < len(resource_info.get("fallback", []))):
                    logger.info(f"Cached {resource_name} is missing items, using fallback instead")
                    cached_data = None  # Force using fallback

        if cached_data is not None:
            logger.info(f"Using cached data for {resource_name}")
            return cached_data

        # Use fallback data if available
        if resource_info and "fallback" in resource_info:
            fallback_data = resource_info["fallback"]
            logger.info(f"Using fallback data for {resource_name}: {type(fallback_data)}")

            # Save fallback data to cache (pretty-printed for easier debugging)
            try:
                resource_path = self._write_cache(resource_name, fallback_data, indent=2)
                if isinstance(fallback_data, (dict, list)):
                    logger.info(f"Saved fallback data to {resource_path}")

                # Update the last download time to prevent immediate
                # re-download attempts
                self.last_download_time[resource_name] = int(time.time())
                self._save_download_times()
            except IOError as e:
                logger.error(f"Failed to save fallback data for {resource_name}: {str(e)}")
            return fallback_data

        return None

    def _load_cached_resource(self, resource_name):
        """Load a resource from cache.

        Returns the parsed JSON content, the raw text when the file is not
        valid JSON, or None when the file is missing or cannot be read.
        """
        resource_path = os.path.join(self.cache_dir, f"{resource_name}.json")

        if not os.path.exists(resource_path):
            return None

        try:
            with open(resource_path, 'r') as f:
                try:
                    data = json.load(f)
                    logger.debug(f"Loaded JSON data for {resource_name}: {type(data)}")
                    return data
                except json.JSONDecodeError:
                    # Not a JSON file, return as text
                    f.seek(0)
                    data = f.read()
                    logger.debug(f"Loaded text data for {resource_name}")
                    return data
        except (IOError, UnicodeDecodeError):
            # An undecodable cache file is treated like an unreadable one
            logger.error(f"Failed to load resource {resource_name} from cache", exc_info=True)
            return None

    def load_resource(self, resource_name):
        """
        Load a cached resource.

        Args:
            resource_name (str): Name of the resource

        Returns:
            object: The cached resource data or None if not available
        """
        # Try to load from cache
        cached_data = self._load_cached_resource(resource_name)
        if cached_data is not None:
            return cached_data

        # If not in cache, try to download if resource is registered.
        # download_resource() already falls back to the built-in data when
        # the download is impossible or fails.
        if resource_name in self.resources:
            return self.download_resource(resource_name)

        return None

    def _count_items(self, name, data, info):
        """Return the number of items in a resource's data, for status reports."""
        if name == "dnsbl_list":
            if isinstance(data, list):
                return len(data)
            # If it's not a list, log and use fallback length
            logger.warning(f"Expected list for dnsbl_list but got {type(data)}")
            return len(info.get("fallback", []))

        if name == "ip_reputation_providers":
            # This is a special case with nested structure
            if isinstance(data, dict) and "providers" in data:
                logger.info(f"Found providers key with {len(data['providers'])} items")
                return len(data["providers"])

            # If structure doesn't match what we expect, log details
            logger.warning(f"Expected dict with 'providers' key for {name}, got: {data}")
            # Try to use fallback length
            try:
                item_count = len(info.get("fallback", {}).get("providers", []))
                logger.info(f"Using fallback length: {item_count}")
                return item_count
            except (TypeError, AttributeError) as e:
                logger.error(f"Error getting fallback length: {e}")
                return 0

        # Standard resources: anything sized counts its elements
        try:
            return len(data)
        except TypeError:
            logger.warning(f"Could not determine item count for {name}")
            return 0

    def update_all_resources(self):
        """Update all registered resources.

        Returns:
            dict: Maps each resource name to a dict with "status" ("updated"
            or "failed"), a UTC "timestamp", and for updated resources the
            number of "items".
        """
        results = {}

        for name, info in self.resources.items():
            logger.info(f"Updating resource: {name}")
            data = self.download_resource(name, force=True)

            # Debug log to see what we're working with
            logger.info(f"Downloaded data type for {name}: {type(data)}")
            if isinstance(data, dict):
                logger.info(f"Dictionary keys: {list(data.keys())}")

            if data:
                item_count = self._count_items(name, data, info)

                # Log the final count for debugging
                logger.info(f"Final item count for {name}: {item_count}")

                # Use UTC time for timestamps (time.gmtime avoids the
                # deprecated datetime.utcnow)
                results[name] = {
                    "status": "updated",
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()),
                    "items": item_count
                }
            else:
                results[name] = {
                    "status": "failed",
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime())
                }

        logger.info(f"Resource update completed: {json.dumps(results)}")
        return results

    def start_scheduler(self, update_time="00:00"):
        """
        Start a background scheduler to update resources daily.

        Args:
            update_time (str): Time to update resources daily (HH:MM format)

        Returns:
            bool: True if the scheduler was started, False if it is already
            running or could not be set up (``schedule`` is not importable or
            ``update_time`` is rejected).
        """
        if self._scheduler_running:
            logger.warning("Resource update scheduler is already running")
            return False

        try:
            # Import here to avoid circular imports
            import schedule

            # Use a private scheduler rather than the module-level default so
            # that a stop/start cycle does not register the daily job twice.
            scheduler = schedule.Scheduler()
            scheduler.every().day.at(update_time).do(self.update_all_resources)
        except Exception:
            # Fail in the caller's thread instead of silently inside the
            # worker, where the manager would stay marked as running.
            logger.exception("Failed to set up the resource update scheduler")
            return False

        stop_event = threading.Event()

        def _run_scheduler():
            while not stop_event.is_set():
                try:
                    scheduler.run_pending()
                except Exception:
                    # Keep the scheduler alive if a single update run fails
                    logger.exception("Scheduled resource update failed")
                # Check every minute; returns early once stop is requested
                stop_event.wait(60)

        self._scheduler_stop_event = stop_event
        self._scheduler_running = True
        self._scheduler_thread = threading.Thread(target=_run_scheduler, daemon=True)
        self._scheduler_thread.start()

        logger.info(f"Resource update scheduler started, updates at {update_time} daily")
        return True

    def stop_scheduler(self):
        """Stop the background resource update scheduler.

        Returns:
            bool: True if a running scheduler was asked to stop, False if no
            scheduler was running.
        """
        if not self._scheduler_running:
            logger.warning("Resource update scheduler is not running")
            return False

        self._scheduler_running = False

        # Wake the worker out of its one-minute wait so it exits promptly
        stop_event = getattr(self, "_scheduler_stop_event", None)
        if stop_event is not None:
            stop_event.set()

        # The worker may still be in the middle of an update, so only wait
        # briefly; it is a daemon thread and ends once the update returns.
        if self._scheduler_thread and self._scheduler_thread.is_alive():
            self._scheduler_thread.join(timeout=2)

        logger.info("Resource update scheduler stopped")
        return True
# Create a function to update resources on demand
def update_deliverability_resources():
    """
    Refresh every registered email deliverability resource.

    Suitable both for a direct call and as the target of a periodic job.

    Returns:
        dict: Status of each resource update
    """
    return ResourceManager().update_all_resources()
# Create a function to start the background scheduler
def start_resource_updater(update_time="03:00"):
    """
    Launch the background updater that refreshes resource data once a day.

    Args:
        update_time (str): Time to update resources daily (HH:MM format)

    Returns:
        bool: True if scheduler started successfully
    """
    return ResourceManager().start_scheduler(update_time)
# Add a debugging function to examine resource data
def debug_resource(resource_name):
    """
    Debug a specific resource by examining its data structure.

    Args:
        resource_name (str): Name of the resource to debug

    Returns:
        dict: Debug information for the resource. Always contains
        "resource_name", "type" and "status" ("found" or "not_found"); the
        remaining keys depend on the type of the loaded data.
    """
    manager = ResourceManager()
    data = manager.load_resource(resource_name)

    result = {
        "resource_name": resource_name,
        "type": str(type(data))
    }

    if data is None:
        result["status"] = "not_found"
        return result

    result["status"] = "found"

    if isinstance(data, dict):
        result["keys"] = list(data.keys())
        result["sample"] = {k: data[k] for k in list(data.keys())[:2]} if data else {}

        # Special handling for nested structures
        if "providers" in data and isinstance(data["providers"], list):
            result["providers_count"] = len(data["providers"])
            result["provider_sample"] = data["providers"][0] if data["providers"] else None
    elif isinstance(data, list):
        result["length"] = len(data)
        result["sample"] = data[:2] if data else []
    else:
        try:
            result["length"] = len(data)
            result["preview"] = str(data)[:100]
        except TypeError:
            # Unsized values (e.g. numbers) have no length to report
            result["note"] = "Could not determine length or preview"

    return result