Some checks failed
System Monitoring / Health Checks (push) Has been cancelled
System Monitoring / Performance Monitoring (push) Has been cancelled
System Monitoring / Database Monitoring (push) Has been cancelled
System Monitoring / Cache Monitoring (push) Has been cancelled
System Monitoring / Log Monitoring (push) Has been cancelled
System Monitoring / Resource Monitoring (push) Has been cancelled
System Monitoring / Uptime Monitoring (push) Has been cancelled
System Monitoring / Backup Monitoring (push) Has been cancelled
System Monitoring / Security Monitoring (push) Has been cancelled
System Monitoring / Monitoring Dashboard (push) Has been cancelled
System Monitoring / Alerting (push) Has been cancelled
Security Scanning / Dependency Scanning (push) Has been cancelled
Security Scanning / Code Security Scanning (push) Has been cancelled
Security Scanning / Secrets Scanning (push) Has been cancelled
Security Scanning / Container Security Scanning (push) Has been cancelled
Security Scanning / Compliance Checking (push) Has been cancelled
Security Scanning / Security Dashboard (push) Has been cancelled
Security Scanning / Security Remediation (push) Has been cancelled
504 lines
16 KiB
Python
504 lines
16 KiB
Python
"""
|
|
Security headers and Content Security Policy (CSP) management for enhanced security.
|
|
"""
|
|
|
|
from django.conf import settings
|
|
from django.http import HttpResponse
|
|
from django.middleware.security import SecurityMiddleware as DjangoSecurityMiddleware
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
from django.core.exceptions import MiddlewareNotUsed
|
|
import re
|
|
import json
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
from urllib.parse import urlparse, urlunparse
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecurityHeadersMiddleware(MiddlewareMixin):
    """
    Enhanced security middleware with comprehensive security headers and CSP policies.

    On every response that is not skipped (see ``_should_skip_headers``) the
    middleware sets, in order:

    * the static headers from ``settings.SECURITY_HEADERS`` (or the defaults),
    * ``Content-Security-Policy`` built from ``settings.CSP_CONFIG`` (or the
      defaults), plus ``Content-Security-Policy-Report-Only`` when both
      ``settings.DEBUG`` and ``settings.CSP_REPORT_ONLY`` are truthy,
    * ``Feature-Policy`` and ``Permissions-Policy``,
    * ``Strict-Transport-Security`` when ``settings.DEBUG`` is falsy and
      ``settings.SECURE_HSTS_SECONDS`` is truthy.
    """

    # CSP directives that are written without a source list. They are emitted
    # when configured with an empty list; every other directive with an empty
    # source list is omitted.
    _VALUELESS_CSP_DIRECTIVES = frozenset({
        'upgrade-insecure-requests',
        'block-all-mixed-content',
    })

    # Fixed path prefixes that never receive security headers.
    _SKIPPED_PATH_PREFIXES = ('/health/', '/metrics/')

    def __init__(self, get_response):
        """
        Read the middleware configuration from Django settings.

        Args:
            get_response: The next callable in the middleware chain.
        """
        super().__init__(get_response)

        # Initialize CSP configuration
        self.csp_config = getattr(settings, 'CSP_CONFIG', self._get_default_csp_config())

        # Initialize security headers
        self.security_headers = getattr(settings, 'SECURITY_HEADERS', self._get_default_security_headers())

        # Initialize allowed domains
        self.allowed_domains = set(getattr(settings, 'ALLOWED_DOMAINS', []))

        # Initialize nonce generator
        self.nonce_generator = CSPNonceGenerator()

    def process_response(self, request, response):
        """
        Add security headers to the response.

        Args:
            request: The request being answered.
            response: The response to decorate; it is modified in place.

        Returns:
            The same ``response`` object.
        """
        # Skip for static files and media
        if self._should_skip_headers(request):
            return response

        # Add security headers
        for header, value in self.security_headers.items():
            response[header] = value

        # Add CSP header
        csp_value = self._generate_csp_header(request)
        if csp_value:
            response['Content-Security-Policy'] = csp_value

            # Add Report-Only CSP in development. Nested under the check above
            # so an empty policy never produces an empty Report-Only header.
            # NOTE(review): the enforcing header is still sent alongside the
            # Report-Only one, as before — confirm that is intended.
            if settings.DEBUG and getattr(settings, 'CSP_REPORT_ONLY', False):
                response['Content-Security-Policy-Report-Only'] = csp_value

        # Add feature policy
        feature_policy = self._generate_feature_policy()
        if feature_policy:
            response['Feature-Policy'] = feature_policy

        # Add permissions policy
        permissions_policy = self._generate_permissions_policy()
        if permissions_policy:
            response['Permissions-Policy'] = permissions_policy

        # Add HSTS header in production
        if not settings.DEBUG and getattr(settings, 'SECURE_HSTS_SECONDS', 0):
            response['Strict-Transport-Security'] = self._generate_hsts_header()

        return response

    def _should_skip_headers(self, request) -> bool:
        """
        Determine if security headers should be skipped for this request.

        Headers are skipped for paths under ``settings.STATIC_URL``,
        ``settings.MEDIA_URL``, ``/health/`` and ``/metrics/``.

        Unset or empty URL settings are ignored: an empty prefix would match
        every path (disabling the middleware entirely) and ``None`` would make
        ``str.startswith`` raise ``TypeError``.
        """
        configured = (
            getattr(settings, 'STATIC_URL', None),
            getattr(settings, 'MEDIA_URL', None),
        )
        prefixes = tuple(str(prefix) for prefix in configured if prefix)
        return request.path.startswith(prefixes + self._SKIPPED_PATH_PREFIXES)

    def _get_default_csp_config(self) -> Dict[str, List[str]]:
        """
        Get default CSP configuration.

        Returns:
            A mapping of CSP directive name to its list of sources. An empty
            list marks a directive that is written without sources.
        """
        return {
            'default-src': ["'self'"],
            'script-src': ["'self'", "'unsafe-inline'", "'unsafe-eval'"],
            'style-src': ["'self'", "'unsafe-inline'"],
            'img-src': ["'self'", "data:", "https:"],
            'font-src': ["'self'", "data:"],
            'connect-src': ["'self'"],
            'media-src': ["'self'"],
            'object-src': ["'none'"],
            'frame-src': ["'self'"],
            'frame-ancestors': ["'self'"],
            'form-action': ["'self'"],
            'base-uri': ["'self'"],
            'manifest-src': ["'self'"],
            'worker-src': ["'self'"],
            'child-src': ["'self'"],
            'prefetch-src': ["'self'"],
            'require-trusted-types-for': ["'script'"],
            'trusted-types': ["'default'"],
            'upgrade-insecure-requests': [],
            'block-all-mixed-content': [],
            'report-uri': ['/csp-report-endpoint/'],
            'report-to': ['csp-endpoint'],
        }

    def _get_default_security_headers(self) -> Dict[str, str]:
        """
        Get default security headers.

        ``Clear-Site-Data`` is deliberately not part of the defaults: sending
        it on every response tells the browser to wipe the site's cookies,
        storage and cache each time, which destroys sessions and CSRF cookies.
        Set it explicitly on logout responses instead.

        Returns:
            A mapping of header name to header value.
        """
        return {
            'X-Content-Type-Options': 'nosniff',
            # NOTE(review): DENY is stricter than the default CSP
            # ``frame-ancestors 'self'`` — confirm which one is intended.
            'X-Frame-Options': 'DENY',
            'X-XSS-Protection': '1; mode=block',
            'Referrer-Policy': 'strict-origin-when-cross-origin',
            'X-Permitted-Cross-Domain-Policies': 'none',
            'Cross-Origin-Opener-Policy': 'same-origin',
            'Cross-Origin-Embedder-Policy': 'require-corp',
            'Cross-Origin-Resource-Policy': 'same-origin',
        }

    def _generate_csp_header(self, request) -> str:
        """
        Generate CSP header value based on configuration.

        For ``script-src`` and ``style-src``, ``'unsafe-inline'`` is replaced
        by a per-response nonce. The configured source lists are never
        modified, so every response gets a fresh nonce and the shared
        configuration stays intact across requests.

        Args:
            request: The current request; used to share the nonce (see
                ``_get_request_nonce``).

        Returns:
            The directives joined with ``'; '``, or an empty string when no
            directive is configured.
        """
        directives = []
        nonce = None  # created lazily, at most once per response

        for directive, sources in self.csp_config.items():
            if not sources:
                # Directives such as upgrade-insecure-requests carry no
                # sources; None/False still disables them.
                if (directive in self._VALUELESS_CSP_DIRECTIVES
                        and sources is not None and sources is not False):
                    directives.append(directive)
                continue

            # Tolerate a single source given as a plain string.
            if isinstance(sources, str):
                sources = [sources]

            # Add nonce for script and style directives
            if directive in ('script-src', 'style-src') and "'unsafe-inline'" in sources:
                if nonce is None:
                    nonce = self._get_request_nonce(request)
                # Build a new list instead of mutating the configuration.
                sources = [source for source in sources if source != "'unsafe-inline'"]
                sources.append(f"'nonce-{nonce}'")

            # Join sources
            directives.append(f"{directive} {' '.join(sources)}")

        return '; '.join(directives)

    def _get_request_nonce(self, request) -> str:
        """
        Return the CSP nonce for this request, creating one if needed.

        An existing truthy ``request.csp_nonce`` is reused; otherwise a new
        nonce is generated and stored on the request under that name.

        NOTE(review): this runs while the response is being finalized, so
        content rendered earlier can only carry the nonce if
        ``request.csp_nonce`` was populated beforehand — confirm how inline
        scripts and styles obtain it.
        """
        nonce = getattr(request, 'csp_nonce', None)
        if not nonce:
            nonce = self.nonce_generator.get_nonce()
            if request is not None:
                request.csp_nonce = nonce
        return str(nonce)

    def _generate_feature_policy(self) -> str:
        """
        Generate Feature Policy header.

        Allowlist keywords are single-quoted and policies are separated by
        ``'; '``, as the header syntax requires.
        """
        policies = [
            "camera 'none'",
            "microphone 'none'",
            "geolocation 'none'",
            "payment 'none'",
            "usb 'none'",
            "magnetometer 'none'",
            "gyroscope 'none'",
            "accelerometer 'none'",
            "fullscreen 'self'",
            "document-domain 'none'",
            "sync-xhr 'self'",
        ]

        return '; '.join(policies)

    def _generate_permissions_policy(self) -> str:
        """
        Generate Permissions Policy header.

        Returns:
            The feature entries joined with ``', '``; each feature appears once.
        """
        policies = [
            'camera=()',
            'microphone=()',
            'geolocation=()',
            'payment=()',
            'usb=()',
            'magnetometer=()',
            'gyroscope=()',
            'accelerometer=()',
            'fullscreen=(self)',
            'document-domain=()',
            'sync-xhr=(self)',
        ]

        return ', '.join(policies)

    def _generate_hsts_header(self) -> str:
        """
        Generate HSTS header.

        Reads ``SECURE_HSTS_SECONDS``, ``SECURE_HSTS_INCLUDE_SUBDOMAINS`` and
        ``SECURE_HSTS_PRELOAD`` from settings.

        Returns:
            ``max-age=<seconds>``, followed by ``; includeSubDomains`` and/or
            ``; preload`` when the corresponding setting is truthy.
        """
        max_age = getattr(settings, 'SECURE_HSTS_SECONDS', 31536000)
        include_subdomains = getattr(settings, 'SECURE_HSTS_INCLUDE_SUBDOMAINS', True)
        preload = getattr(settings, 'SECURE_HSTS_PRELOAD', False)

        header = f'max-age={max_age}'

        if include_subdomains:
            header += '; includeSubDomains'

        if preload:
            header += '; preload'

        return header
|
|
|
|
|
|
class CSPNonceGenerator:
    """
    Generator for CSP nonces.

    Issued nonces are remembered so they can be checked with
    ``is_valid_nonce``. At most ``_max_nonces`` are kept; when the limit is
    exceeded the oldest nonces are evicted first, so recently issued nonces
    stay valid.
    """

    def __init__(self):
        from collections import OrderedDict

        # Used as an insertion-ordered set (values are unused) so the oldest
        # nonce can be evicted first.
        self._nonces = OrderedDict()
        self._max_nonces = 1000  # Prevent memory leaks

    def get_nonce(self) -> str:
        """
        Generate a new nonce.

        Returns:
            A fresh URL-safe random token, recorded as valid.
        """
        import secrets

        # Generate new nonce
        nonce = secrets.token_urlsafe(16)
        self._nonces[nonce] = None

        # Evict the oldest nonces once the cap is exceeded instead of wiping
        # everything, which would invalidate nonces issued a moment ago.
        while len(self._nonces) > self._max_nonces:
            self._nonces.popitem(last=False)

        return nonce

    def is_valid_nonce(self, nonce: str) -> bool:
        """
        Check if a nonce is valid.

        Returns:
            True when ``nonce`` was issued by this generator and has not been
            evicted or cleared.
        """
        return nonce in self._nonces

    def clear_nonces(self):
        """
        Clear all nonces.
        """
        self._nonces.clear()
|
|
|
|
|
|
class CSPReportHandler:
    """
    Handler for CSP violation reports.

    A report is logged on the ``security.csp`` logger, forwarded to the
    monitoring system and stored in the Django cache. Failures in any step are
    logged and never propagate to the caller.
    """

    def __init__(self):
        self.logger = logging.getLogger('security.csp')

    def handle_report(self, report_data: Dict):
        """
        Handle CSP violation report.

        Args:
            report_data: The violation fields (``document-uri``,
                ``violated-directive``, ``blocked-uri``, ...), either as a
                flat dict or wrapped as ``{"csp-report": {...}}`` the way
                browsers post ``report-uri`` payloads.
        """
        try:
            report = self._unwrap_report(report_data)

            # Log the violation
            self.logger.warning(
                f"CSP Violation: {report.get('document-uri')} - "
                f"{report.get('violated-directive')} - "
                f"{report.get('blocked-uri')}"
            )

            # Send to monitoring system
            self._send_to_monitoring(report)

            # Store for analysis
            self._store_violation(report)

        except Exception as e:
            # Best effort: a malformed report must not break the caller.
            self.logger.error(f"Error handling CSP report: {e}")

    @staticmethod
    def _unwrap_report(report_data: Dict) -> Dict:
        """
        Return the inner report of a ``{"csp-report": {...}}`` envelope.

        Anything that is not such an envelope is returned unchanged.
        """
        if isinstance(report_data, dict):
            inner = report_data.get('csp-report')
            if isinstance(inner, dict):
                return inner
        return report_data

    def _send_to_monitoring(self, report_data: Dict):
        """
        Send violation report to monitoring system.

        Builds a WARNING-severity security alert from the report fields and
        triggers it through ``monitoring.alerts.alert_manager``. Any error,
        including the monitoring package being unavailable, is logged.
        """
        try:
            # Imported lazily so the handler works without the monitoring package.
            from monitoring.alerts import alert_manager
            from monitoring.alerts import Alert, AlertSeverity, AlertCategory

            alert = Alert(
                title="CSP Violation",
                description=f"CSP violation detected: {report_data.get('violated-directive')}",
                severity=AlertSeverity.WARNING,
                category=AlertCategory.SECURITY,
                metadata={
                    'document_uri': report_data.get('document-uri'),
                    'violated_directive': report_data.get('violated-directive'),
                    'blocked_uri': report_data.get('blocked-uri'),
                    'line_number': report_data.get('line-number'),
                    'column_number': report_data.get('column-number'),
                }
            )

            alert_manager.trigger_alert(alert)

        except Exception as e:
            self.logger.error(f"Error sending CSP report to monitoring: {e}")

    def _store_violation(self, report_data: Dict):
        """
        Store violation for analysis.

        Appends the violation to a per-document list in the Django cache
        (key ``csp_violations_<document-uri>``), keeps the last 100 entries
        and stores the list for 24 hours. Any error is logged.
        """
        try:
            from django.core.cache import cache

            # Store recent violations
            # NOTE(review): the key embeds the reported document URI verbatim;
            # confirm the cache backend accepts arbitrary key characters/lengths.
            cache_key = f"csp_violations_{report_data.get('document-uri', 'unknown')}"
            violations = cache.get(cache_key, [])

            violations.append({
                'timestamp': report_data.get('timestamp'),
                'violated_directive': report_data.get('violated-directive'),
                'blocked_uri': report_data.get('blocked-uri'),
                'line_number': report_data.get('line-number'),
                'column_number': report_data.get('column-number'),
            })

            # Keep only last 100 violations
            if len(violations) > 100:
                violations = violations[-100:]

            cache.set(cache_key, violations, timeout=86400)  # 24 hours

        except Exception as e:
            self.logger.error(f"Error storing CSP violation: {e}")
|
|
|
|
|
|
class SecurityHeaderValidator:
    """
    Validator for security headers.

    Each check reads one header from a response, logs a warning on the
    ``security.headers`` logger when it is missing or unexpected, and reports
    the outcome as a boolean.
    """

    def __init__(self):
        self.logger = logging.getLogger('security.headers')

    def validate_headers(self, response: HttpResponse) -> Dict[str, bool]:
        """
        Run every header check against ``response``.

        Returns:
            A dict mapping ``csp``, ``hsts``, ``x_content_type_options``,
            ``x_frame_options``, ``x_xss_protection`` and ``referrer_policy``
            to True (valid) or False (missing/invalid).
        """
        # Order matters only for the sequence of log messages and dict keys.
        checks = (
            ('csp', self._validate_csp_header),
            ('hsts', self._validate_hsts_header),
            ('x_content_type_options', self._validate_x_content_type_options),
            ('x_frame_options', self._validate_x_frame_options),
            ('x_xss_protection', self._validate_x_xss_protection),
            ('referrer_policy', self._validate_referrer_policy),
        )
        return {name: check(response) for name, check in checks}

    def _validate_csp_header(self, response: HttpResponse) -> bool:
        """
        Check that a CSP header exists and names the required directives.

        The required directives are ``default-src``, ``script-src`` and
        ``style-src``; each is looked up as the text ``"<directive> "`` in the
        header value. Only the first missing directive is reported.
        """
        policy = response.get('Content-Security-Policy')
        if not policy:
            self.logger.warning("Missing CSP header")
            return False

        for name in ('default-src', 'script-src', 'style-src'):
            if f"{name} " in policy:
                continue
            self.logger.warning(f"Missing required CSP directive: {name}")
            return False

        return True

    def _validate_hsts_header(self, response: HttpResponse) -> bool:
        """
        Check that an HSTS header exists and contains ``max-age=``.
        """
        hsts_value = response.get('Strict-Transport-Security')
        if not hsts_value:
            self.logger.warning("Missing HSTS header")
            return False

        if 'max-age=' in hsts_value:
            return True

        self.logger.warning("HSTS header missing max-age")
        return False

    def _validate_x_content_type_options(self, response: HttpResponse) -> bool:
        """
        Check that X-Content-Type-Options is exactly ``nosniff``.
        """
        if response.get('X-Content-Type-Options') == 'nosniff':
            return True

        self.logger.warning("Invalid X-Content-Type-Options header")
        return False

    def _validate_x_frame_options(self, response: HttpResponse) -> bool:
        """
        Check that X-Frame-Options is ``DENY`` or ``SAMEORIGIN``.
        """
        if response.get('X-Frame-Options') in ('DENY', 'SAMEORIGIN'):
            return True

        self.logger.warning("Invalid X-Frame-Options header")
        return False

    def _validate_x_xss_protection(self, response: HttpResponse) -> bool:
        """
        Check that X-XSS-Protection is exactly ``1; mode=block``.
        """
        if response.get('X-XSS-Protection') == '1; mode=block':
            return True

        self.logger.warning("Invalid X-XSS-Protection header")
        return False

    def _validate_referrer_policy(self, response: HttpResponse) -> bool:
        """
        Check that Referrer-Policy is one of the accepted policy names.
        """
        accepted = (
            'no-referrer',
            'no-referrer-when-downgrade',
            'origin',
            'origin-when-cross-origin',
            'same-origin',
            'strict-origin',
            'strict-origin-when-cross-origin',
            'unsafe-url',
        )

        if response.get('Referrer-Policy') in accepted:
            return True

        self.logger.warning("Invalid Referrer-Policy header")
        return False
|
|
|
|
|
|
class SecurityHeaderMiddleware(SecurityHeadersMiddleware):
    """
    Enhanced security middleware with Malaysian-specific security considerations.

    Extends ``SecurityHeadersMiddleware`` by adding the headers from
    ``settings.MALAYSIAN_SECURITY_HEADERS`` (or built-in defaults) to every
    response and, when ``settings.DEBUG`` is truthy, validating the resulting
    headers and logging the ones that fail.
    """

    def __init__(self, get_response):
        """
        Set up the parent middleware, the extra headers and the helpers.

        Args:
            get_response: The next callable in the middleware chain.
        """
        super().__init__(get_response)

        # Fallback used when MALAYSIAN_SECURITY_HEADERS is not configured.
        fallback_headers = {
            'X-Malaysian-Data-Protection': 'PDPA-Compliant',
            'X-Malaysian-Privacy-Policy': '/privacy-policy/',
            'X-Malaysian-Contact': '/contact/',
        }
        self.malaysian_headers = getattr(settings, 'MALAYSIAN_SECURITY_HEADERS', fallback_headers)

        # Helpers: header validation (DEBUG only) and CSP report handling.
        self.validator = SecurityHeaderValidator()
        self.report_handler = CSPReportHandler()

    def process_response(self, request, response):
        """
        Add the parent's security headers plus the Malaysian-specific ones.

        Returns:
            The response returned by the parent ``process_response``, with the
            extra headers set on it.
        """
        response = super().process_response(request, response)

        # These are set on every response, including ones the parent skipped.
        for name, content in self.malaysian_headers.items():
            response[name] = content

        # Header validation is a development aid only.
        if not settings.DEBUG:
            return response

        # NOTE(review): the validator already logs a specific warning for each
        # failed check, so the loop below logs each failure a second time.
        outcome = self.validator.validate_headers(response)
        failed = [name for name, passed in outcome.items() if not passed]
        for name in failed:
            self.validator.logger.warning(f"Invalid {name} header")

        return response