""" Security headers and Content Security Policy (CSP) management for enhanced security. """ from django.conf import settings from django.http import HttpResponse from django.middleware.security import SecurityMiddleware as DjangoSecurityMiddleware from django.utils.deprecation import MiddlewareMixin from django.core.exceptions import MiddlewareNotUsed import re import json from typing import Dict, List, Optional, Set, Tuple from urllib.parse import urlparse, urlunparse import logging logger = logging.getLogger(__name__) class SecurityHeadersMiddleware(MiddlewareMixin): """ Enhanced security middleware with comprehensive security headers and CSP policies. """ def __init__(self, get_response): super().__init__(get_response) # Initialize CSP configuration self.csp_config = getattr(settings, 'CSP_CONFIG', self._get_default_csp_config()) # Initialize security headers self.security_headers = getattr(settings, 'SECURITY_HEADERS', self._get_default_security_headers()) # Initialize allowed domains self.allowed_domains = set(getattr(settings, 'ALLOWED_DOMAINS', [])) # Initialize nonce generator self.nonce_generator = CSPNonceGenerator() def process_response(self, request, response): """ Add security headers to the response. """ # Skip for static files and media if self._should_skip_headers(request): return response # Add security headers for header, value in self.security_headers.items(): response[header] = value # Add CSP header csp_value = self._generate_csp_header(request) if csp_value: response['Content-Security-Policy'] = csp_value # Add Report-Only CSP in development if settings.DEBUG and getattr(settings, 'CSP_REPORT_ONLY', False): response['Content-Security-Policy-Report-Only'] = csp_value # Add feature policy feature_policy = self._generate_feature_policy() if feature_policy: response['Feature-Policy'] = feature_policy # Add permissions policy permissions_policy = self._generate_permissions_policy() if permissions_policy: response['Permissions-Policy'] = permissions_policy # Add HSTS header in production if not settings.DEBUG and getattr(settings, 'SECURE_HSTS_SECONDS', 0): response['Strict-Transport-Security'] = self._generate_hsts_header() return response def _should_skip_headers(self, request) -> bool: """ Determine if security headers should be skipped for this request. """ # Skip for static files if request.path.startswith(settings.STATIC_URL): return True # Skip for media files if request.path.startswith(settings.MEDIA_URL): return True # Skip for health checks if request.path.startswith('/health/'): return True # Skip for metrics if request.path.startswith('/metrics/'): return True return False def _get_default_csp_config(self) -> Dict[str, List[str]]: """ Get default CSP configuration. """ return { 'default-src': ["'self'"], 'script-src': ["'self'", "'unsafe-inline'", "'unsafe-eval'"], 'style-src': ["'self'", "'unsafe-inline'"], 'img-src': ["'self'", "data:", "https:"], 'font-src': ["'self'", "data:"], 'connect-src': ["'self'"], 'media-src': ["'self'"], 'object-src': ["'none'"], 'frame-src': ["'self'"], 'frame-ancestors': ["'self'"], 'form-action': ["'self'"], 'base-uri': ["'self'"], 'manifest-src': ["'self'"], 'worker-src': ["'self'"], 'child-src': ["'self'"], 'prefetch-src': ["'self'"], 'require-trusted-types-for': ["'script'"], 'trusted-types': ["'default'"], 'upgrade-insecure-requests': [], 'block-all-mixed-content': [], 'report-uri': ['/csp-report-endpoint/'], 'report-to': ['csp-endpoint'], } def _get_default_security_headers(self) -> Dict[str, str]: """ Get default security headers. """ return { 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'DENY', 'X-XSS-Protection': '1; mode=block', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'X-Permitted-Cross-Domain-Policies': 'none', 'Clear-Site-Data': '"cache", "cookies", "storage"', 'Cross-Origin-Opener-Policy': 'same-origin', 'Cross-Origin-Embedder-Policy': 'require-corp', 'Cross-Origin-Resource-Policy': 'same-origin', } def _generate_csp_header(self, request) -> str: """ Generate CSP header value based on configuration. """ directives = [] for directive, sources in self.csp_config.items(): if sources: # Add nonce for script and style directives if directive in ['script-src', 'style-src'] and "'unsafe-inline'" in sources: nonce = self.nonce_generator.get_nonce() sources.remove("'unsafe-inline'") sources.append(f"'nonce-{nonce}'") # Join sources source_list = ' '.join(sources) directives.append(f"{directive} {source_list}") return '; '.join(directives) def _generate_feature_policy(self) -> str: """ Generate Feature Policy header. """ policies = [ 'camera none', 'microphone none', 'geolocation none', 'payment none', 'usb none', 'magnetometer none', 'gyroscope none', 'accelerometer none', 'fullscreen self', 'document-domain none', 'sync-xhr self', 'usb none', ] return ', '.join(policies) def _generate_permissions_policy(self) -> str: """ Generate Permissions Policy header. """ policies = [ 'camera=()', 'microphone=()', 'geolocation=()', 'payment=()', 'usb=()', 'magnetometer=()', 'gyroscope=()', 'accelerometer=()', 'fullscreen=(self)', 'document-domain=()', 'sync-xhr=(self)', 'usb=()', ] return ', '.join(policies) def _generate_hsts_header(self) -> str: """ Generate HSTS header. """ max_age = getattr(settings, 'SECURE_HSTS_SECONDS', 31536000) include_subdomains = getattr(settings, 'SECURE_HSTS_INCLUDE_SUBDOMAINS', True) preload = getattr(settings, 'SECURE_HSTS_PRELOAD', False) header = f'max-age={max_age}' if include_subdomains: header += '; includeSubDomains' if preload: header += '; preload' return header class CSPNonceGenerator: """ Generator for CSP nonces. """ def __init__(self): self._nonces = set() self._max_nonces = 1000 # Prevent memory leaks def get_nonce(self) -> str: """ Generate a new nonce. """ import secrets # Clean up old nonces if we have too many if len(self._nonces) > self._max_nonces: self._nonces.clear() # Generate new nonce nonce = secrets.token_urlsafe(16) self._nonces.add(nonce) return nonce def is_valid_nonce(self, nonce: str) -> bool: """ Check if a nonce is valid. """ return nonce in self._nonces def clear_nonces(self): """ Clear all nonces. """ self._nonces.clear() class CSPReportHandler: """ Handler for CSP violation reports. """ def __init__(self): self.logger = logging.getLogger('security.csp') def handle_report(self, report_data: Dict): """ Handle CSP violation report. """ try: # Log the violation self.logger.warning( f"CSP Violation: {report_data.get('document-uri')} - " f"{report_data.get('violated-directive')} - " f"{report_data.get('blocked-uri')}" ) # Send to monitoring system self._send_to_monitoring(report_data) # Store for analysis self._store_violation(report_data) except Exception as e: self.logger.error(f"Error handling CSP report: {e}") def _send_to_monitoring(self, report_data: Dict): """ Send violation report to monitoring system. """ try: from monitoring.alerts import alert_manager from monitoring.alerts import Alert, AlertSeverity, AlertCategory alert = Alert( title="CSP Violation", description=f"CSP violation detected: {report_data.get('violated-directive')}", severity=AlertSeverity.WARNING, category=AlertCategory.SECURITY, metadata={ 'document_uri': report_data.get('document-uri'), 'violated_directive': report_data.get('violated-directive'), 'blocked_uri': report_data.get('blocked-uri'), 'line_number': report_data.get('line-number'), 'column_number': report_data.get('column-number'), } ) alert_manager.trigger_alert(alert) except Exception as e: self.logger.error(f"Error sending CSP report to monitoring: {e}") def _store_violation(self, report_data: Dict): """ Store violation for analysis. """ try: from django.core.cache import cache # Store recent violations cache_key = f"csp_violations_{report_data.get('document-uri', 'unknown')}" violations = cache.get(cache_key, []) violations.append({ 'timestamp': report_data.get('timestamp'), 'violated_directive': report_data.get('violated-directive'), 'blocked_uri': report_data.get('blocked-uri'), 'line_number': report_data.get('line-number'), 'column_number': report_data.get('column-number'), }) # Keep only last 100 violations if len(violations) > 100: violations = violations[-100:] cache.set(cache_key, violations, timeout=86400) # 24 hours except Exception as e: self.logger.error(f"Error storing CSP violation: {e}") class SecurityHeaderValidator: """ Validator for security headers. """ def __init__(self): self.logger = logging.getLogger('security.headers') def validate_headers(self, response: HttpResponse) -> Dict[str, bool]: """ Validate security headers in response. """ results = {} # Validate CSP header results['csp'] = self._validate_csp_header(response) # Validate HSTS header results['hsts'] = self._validate_hsts_header(response) # Validate other security headers results['x_content_type_options'] = self._validate_x_content_type_options(response) results['x_frame_options'] = self._validate_x_frame_options(response) results['x_xss_protection'] = self._validate_x_xss_protection(response) results['referrer_policy'] = self._validate_referrer_policy(response) return results def _validate_csp_header(self, response: HttpResponse) -> bool: """ Validate CSP header. """ csp_header = response.get('Content-Security-Policy') if not csp_header: self.logger.warning("Missing CSP header") return False # Check for required directives required_directives = ['default-src', 'script-src', 'style-src'] for directive in required_directives: if f"{directive} " not in csp_header: self.logger.warning(f"Missing required CSP directive: {directive}") return False return True def _validate_hsts_header(self, response: HttpResponse) -> bool: """ Validate HSTS header. """ hsts_header = response.get('Strict-Transport-Security') if not hsts_header: self.logger.warning("Missing HSTS header") return False # Check for max-age if 'max-age=' not in hsts_header: self.logger.warning("HSTS header missing max-age") return False return True def _validate_x_content_type_options(self, response: HttpResponse) -> bool: """ Validate X-Content-Type-Options header. """ header = response.get('X-Content-Type-Options') if header != 'nosniff': self.logger.warning("Invalid X-Content-Type-Options header") return False return True def _validate_x_frame_options(self, response: HttpResponse) -> bool: """ Validate X-Frame-Options header. """ header = response.get('X-Frame-Options') if header not in ['DENY', 'SAMEORIGIN']: self.logger.warning("Invalid X-Frame-Options header") return False return True def _validate_x_xss_protection(self, response: HttpResponse) -> bool: """ Validate X-XSS-Protection header. """ header = response.get('X-XSS-Protection') if header != '1; mode=block': self.logger.warning("Invalid X-XSS-Protection header") return False return True def _validate_referrer_policy(self, response: HttpResponse) -> bool: """ Validate Referrer-Policy header. """ header = response.get('Referrer-Policy') valid_policies = [ 'no-referrer', 'no-referrer-when-downgrade', 'origin', 'origin-when-cross-origin', 'same-origin', 'strict-origin', 'strict-origin-when-cross-origin', 'unsafe-url' ] if header not in valid_policies: self.logger.warning("Invalid Referrer-Policy header") return False return True class SecurityHeaderMiddleware(SecurityHeadersMiddleware): """ Enhanced security middleware with Malaysian-specific security considerations. """ def __init__(self, get_response): super().__init__(get_response) # Initialize Malaysian-specific security headers self.malaysian_headers = getattr(settings, 'MALAYSIAN_SECURITY_HEADERS', { 'X-Malaysian-Data-Protection': 'PDPA-Compliant', 'X-Malaysian-Privacy-Policy': '/privacy-policy/', 'X-Malaysian-Contact': '/contact/', }) # Initialize validator self.validator = SecurityHeaderValidator() # Initialize report handler self.report_handler = CSPReportHandler() def process_response(self, request, response): """ Add security headers with Malaysian-specific considerations. """ # Call parent method response = super().process_response(request, response) # Add Malaysian-specific headers for header, value in self.malaysian_headers.items(): response[header] = value # Validate headers if settings.DEBUG: validation_results = self.validator.validate_headers(response) for header, is_valid in validation_results.items(): if not is_valid: self.validator.logger.warning(f"Invalid {header} header") return response