""" Advanced caching strategies for Malaysian SME SaaS platform. Implements various caching patterns and optimizations. """ import json import logging import threading import time from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Union, Callable, Tuple from functools import wraps from django.core.cache import cache from django.db import connection, transaction from django.conf import settings from django.utils import timezone from django.http import HttpRequest, HttpResponse from django.contrib.auth import get_user_model from rest_framework.response import Response from rest_framework.decorators import api_view from django.views.decorators.cache import cache_page from django.views.decorators.vary import vary_on_headers, vary_on_cookie from .cache_manager import CacheManager, MalaysianDataCache, QueryCache from .config import CacheConfig logger = logging.getLogger(__name__) User = get_user_model() class CacheStrategy: """Base class for caching strategies.""" def __init__(self, cache_manager: CacheManager): self.cache = cache_manager self.hits = 0 self.misses = 0 self.evictions = 0 def get(self, key: str, default: Any = None) -> Any: """Get value from cache.""" result = self.cache.get(key, default) if result == default: self.misses += 1 else: self.hits += 1 return result def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool: """Set value in cache.""" return self.cache.set(key, value, timeout) def get_stats(self) -> Dict[str, Any]: """Get strategy statistics.""" return { "hits": self.hits, "misses": self.misses, "hit_rate": self.hits / (self.hits + self.misses) if (self.hits + self.misses) > 0 else 0, "evictions": self.evictions } class WriteThroughCache(CacheStrategy): """Write-through caching pattern.""" def write_through(self, key: str, value: Any, db_operation: Callable, timeout: Optional[int] = None) -> Any: """Write through cache and database.""" try: # Write to database result = db_operation() # Write to cache self.set(key, result, timeout) return result except Exception as e: logger.error(f"Write-through cache error: {e}") raise class WriteBehindCache(CacheStrategy): """Write-behind caching pattern with async writing.""" def __init__(self, cache_manager: CacheManager, batch_size: int = 10): super().__init__(cache_manager) self.batch_size = batch_size self.write_queue = [] self.write_lock = threading.Lock() self.writer_thread = threading.Thread(target=self._batch_writer, daemon=True) self.writer_thread.start() def write_behind(self, key: str, value: Any, db_operation: Callable) -> bool: """Write to cache and queue for database.""" try: # Write to cache immediately self.set(key, value) # Queue for database write with self.write_lock: self.write_queue.append((key, value, db_operation)) return True except Exception as e: logger.error(f"Write-behind cache error: {e}") return False def _batch_writer(self): """Background thread for batch database writes.""" while True: time.sleep(5) # Write every 5 seconds if not self.write_queue: continue batch = [] with self.write_lock: batch = self.write_queue[:self.batch_size] self.write_queue = self.write_queue[self.batch_size:] for key, value, db_operation in batch: try: db_operation(value) except Exception as e: logger.error(f"Batch write error for key {key}: {e}") class ReadThroughCache(CacheStrategy): """Read-through caching pattern.""" def read_through(self, key: str, db_operation: Callable, timeout: Optional[int] = None) -> Any: """Read through cache with fallback to database.""" result = self.get(key) if result is not None: return result try: # Read from database result = db_operation() # Cache the result if result is not None: self.set(key, result, timeout) return result except Exception as e: logger.error(f"Read-through cache error: {e}") raise class RefreshAheadCache(CacheStrategy): """Refresh-ahead caching pattern.""" def __init__(self, cache_manager: CacheManager, refresh_interval: int = 300): super().__init__(cache_manager) self.refresh_interval = refresh_interval self.refresh_queue = set() self.refresh_lock = threading.Lock() self.refresh_thread = threading.Thread(target=self._refresh_worker, daemon=True) self.refresh_thread.start() def get_or_refresh(self, key: str, db_operation: Callable, timeout: Optional[int] = None) -> Any: """Get from cache and queue for refresh if needed.""" result = self.get(key) if result is not None: # Queue for refresh with self.refresh_lock: self.refresh_queue.add((key, db_operation, timeout)) return result # Cache miss - get from database try: result = db_operation() if result is not None: self.set(key, result, timeout) return result except Exception as e: logger.error(f"Refresh-ahead cache error: {e}") raise def _refresh_worker(self): """Background thread for cache refresh.""" while True: time.sleep(self.refresh_interval) if not self.refresh_queue: continue items_to_refresh = [] with self.refresh_lock: items_to_refresh = list(self.refresh_queue) self.refresh_queue.clear() for key, db_operation, timeout in items_to_refresh: try: result = db_operation() if result is not None: self.set(key, result, timeout) except Exception as e: logger.error(f"Refresh error for key {key}: {e}") class CacheAsidePattern: """Cache-aside pattern implementation.""" def __init__(self, cache_manager: CacheManager): self.cache = cache_manager def get_or_set(self, key: str, db_operation: Callable, timeout: Optional[int] = None) -> Any: """Get from cache or set if not exists.""" result = self.cache.get(key) if result is not None: return result try: result = db_operation() if result is not None: self.cache.set(key, result, timeout) return result except Exception as e: logger.error(f"Cache-aside pattern error: {e}") raise def invalidate(self, key: str) -> bool: """Invalidate cache key.""" return self.cache.delete(key) class MultiLevelCache: """Multi-level caching with L1 and L2 caches.""" def __init__(self, l1_cache: CacheManager, l2_cache: CacheManager): self.l1_cache = l1_cache self.l2_cache = l2_cache self.l1_hits = 0 self.l2_hits = 0 self.misses = 0 def get(self, key: str) -> Optional[Any]: """Get from multi-level cache.""" # Try L1 cache first result = self.l1_cache.get(key) if result is not None: self.l1_hits += 1 return result # Try L2 cache result = self.l2_cache.get(key) if result is not None: self.l2_hits += 1 # Promote to L1 cache self.l1_cache.set(key, result) return result self.misses += 1 return None def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool: """Set in both cache levels.""" l1_success = self.l1_cache.set(key, value, timeout) l2_success = self.l2_cache.set(key, value, timeout) return l1_success and l2_success def get_stats(self) -> Dict[str, Any]: """Get multi-level cache statistics.""" return { "l1_hits": self.l1_hits, "l2_hits": self.l2_hits, "misses": self.misses, "total_requests": self.l1_hits + self.l2_hits + self.misses, "l1_hit_rate": self.l1_hits / (self.l1_hits + self.l2_hits + self.misses) if (self.l1_hits + self.l2_hits + self.misses) > 0 else 0, "overall_hit_rate": (self.l1_hits + self.l2_hits) / (self.l1_hits + self.l2_hits + self.misses) if (self.l1_hits + self.l2_hits + self.misses) > 0 else 0 } class MalaysianCacheStrategies: """Malaysian-specific caching strategies.""" def __init__(self, cache_manager: CacheManager): self.cache = cache_manager self.malaysian_cache = MalaysianDataCache(cache_manager) self.query_cache = QueryCache(cache_manager) def cache_ic_validation(self, ic_number: str, validation_func: Callable) -> Dict[str, Any]: """Cache IC validation results with TTL.""" cached_result = self.malaysian_cache.get_cached_ic_validation(ic_number) if cached_result: return cached_result result = validation_func(ic_number) self.malaysian_cache.set_cached_ic_validation(ic_number, result) return result def cache_sst_calculation(self, calculation_key: str, calculation_func: Callable) -> float: """Cache SST calculations.""" key = f"sst_calc_{calculation_key}" cached_result = self.cache.get(key) if cached_result: return cached_result result = calculation_func() self.cache.set(key, result, timeout=3600) # 1 hour return result def cache_postcode_lookup(self, postcode: str, lookup_func: Callable) -> Dict[str, Any]: """Cache postcode lookups with extended TTL.""" cached_result = self.malaysian_cache.get_cached_postcode_data(postcode) if cached_result: return cached_result result = lookup_func(postcode) self.malaysian_cache.set_cached_postcode_data(postcode, result) return result # Decorators for easy caching def cache_view_response(timeout: int = 300, key_prefix: str = ""): """Decorator to cache view responses.""" def decorator(view_func): @wraps(view_func) def _wrapped_view(request, *args, **kwargs): cache_key = f"{key_prefix}_{request.path}_{request.user.id if request.user.is_authenticated else 'anonymous'}" response = cache.get(cache_key) if response: return response response = view_func(request, *args, **kwargs) if isinstance(response, HttpResponse): cache.set(cache_key, response, timeout) return response return _wrapped_view return decorator def cache_query_results(timeout: int = 300, key_func: Optional[Callable] = None): """Decorator to cache query results.""" def decorator(query_func): @wraps(query_func) def _wrapped_query(*args, **kwargs): cache_key = key_func(*args, **kwargs) if key_func else f"query_{query_func.__name__}_{hash(str(args) + str(kwargs))}" result = cache.get(cache_key) if result: return result result = query_func(*args, **kwargs) cache.set(cache_key, result, timeout) return result return _wrapped_query return decorator def invalidate_cache_on_save(model): """Decorator to invalidate cache when model is saved.""" def decorator(save_method): @wraps(save_method) def _wrapped_save(self, *args, **kwargs): result = save_method(self, *args, **kwargs) # Invalidate cache for this model cache_key = f"{model.__name__}_{self.id}" cache.delete(cache_key) return result return _wrapped_save return decorator class CacheEvictionPolicy: """Advanced cache eviction policies.""" def __init__(self, cache_manager: CacheManager): self.cache = cache_manager self.access_times = {} self.access_counts = {} def record_access(self, key: str): """Record key access for eviction policies.""" now = time.time() self.access_times[key] = now self.access_counts[key] = self.access_counts.get(key, 0) + 1 def lru_eviction(self, keys: List[str], count: int = 1) -> List[str]: """Least Recently Used eviction.""" sorted_keys = sorted(keys, key=lambda k: self.access_times.get(k, 0)) return sorted_keys[:count] def lfu_eviction(self, keys: List[str], count: int = 1) -> List[str]: """Least Frequently Used eviction.""" sorted_keys = sorted(keys, key=lambda k: self.access_counts.get(k, 0)) return sorted_keys[:count] def fifo_eviction(self, keys: List[str], count: int = 1) -> List[str]: """First In First Out eviction.""" return keys[:count]