Files
multitenetsaas/backend/monitoring/middleware.py
AHMET YILMAZ b3fff546e9
Some checks failed
System Monitoring / Health Checks (push) Has been cancelled
System Monitoring / Performance Monitoring (push) Has been cancelled
System Monitoring / Database Monitoring (push) Has been cancelled
System Monitoring / Cache Monitoring (push) Has been cancelled
System Monitoring / Log Monitoring (push) Has been cancelled
System Monitoring / Resource Monitoring (push) Has been cancelled
System Monitoring / Uptime Monitoring (push) Has been cancelled
System Monitoring / Backup Monitoring (push) Has been cancelled
System Monitoring / Security Monitoring (push) Has been cancelled
System Monitoring / Monitoring Dashboard (push) Has been cancelled
System Monitoring / Alerting (push) Has been cancelled
Security Scanning / Dependency Scanning (push) Has been cancelled
Security Scanning / Code Security Scanning (push) Has been cancelled
Security Scanning / Secrets Scanning (push) Has been cancelled
Security Scanning / Container Security Scanning (push) Has been cancelled
Security Scanning / Compliance Checking (push) Has been cancelled
Security Scanning / Security Dashboard (push) Has been cancelled
Security Scanning / Security Remediation (push) Has been cancelled
project initialization
2025-10-05 02:37:33 +08:00

512 lines
17 KiB
Python

"""
Django middleware for application monitoring and metrics collection.
"""
import logging
import platform
import re
import time
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

import django
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import connection
from django.http import HttpRequest, HttpResponse
from django.utils import timezone
from django_tenants.utils import get_tenant_model
from prometheus_client import REGISTRY, Counter, Histogram, Gauge, Info, CollectorRegistry, generate_latest
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# User and tenant model classes, resolved once when this module is imported.
# NOTE(review): neither name is referenced in the visible part of this file;
# confirm whether they are used elsewhere before removing them.
User = get_user_model()
TenantModel = get_tenant_model()
# Prometheus metrics
# -- HTTP traffic -----------------------------------------------------------
# Request counter, broken down by method, endpoint, status and tenant.
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    labelnames=['method', 'endpoint', 'status', 'tenant'],
)

# Request latency in seconds, with explicit bucket boundaries.
REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    labelnames=['method', 'endpoint', 'tenant'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

# -- Users and database connections ------------------------------------------
# Unlabelled gauge for the current number of active users.
ACTIVE_USERS = Gauge('active_users_total', 'Number of active users')

# Database connection gauge, one child per connection state.
DATABASE_CONNECTIONS = Gauge(
    'database_connections_total',
    'Database connections',
    labelnames=['state'],
)
# Counter of executed database queries, labelled by query type, table and
# tenant.  The call is now properly closed: the missing ")" previously made
# the whole module fail to parse.
DATABASE_QUERIES = Counter(
    'database_queries_total',
    'Database queries executed',
    ['type', 'table', 'tenant']
)
# -- Cache -------------------------------------------------------------------
# All cache operations, labelled by operation, cache type and tenant.
CACHE_OPERATIONS = Counter(
    'cache_operations_total',
    'Cache operations',
    labelnames=['operation', 'cache_type', 'tenant'],
)

# Cache hits and misses share the same label set.
CACHE_HITS = Counter(
    'cache_hits_total',
    'Cache hits',
    labelnames=['cache_type', 'tenant'],
)
CACHE_MISSES = Counter(
    'cache_misses_total',
    'Cache misses',
    labelnames=['cache_type', 'tenant'],
)

# -- Malaysian-specific operations -------------------------------------------
MALAYSIAN_OPERATIONS = Counter(
    'malaysian_operations_total',
    'Malaysian-specific operations',
    labelnames=['operation', 'type'],
)
# Per-tenant gauge.  The original passed a second list positionally, which
# prometheus_client would receive as the ``namespace`` argument and fail on
# at import time.  The two lists are merged into a single label set.
# NOTE(review): merging assumes 'tenant_name' and 'industry_type' were meant
# to be labels — confirm against the code that sets this gauge.
TENANT_METRICS = Gauge(
    'tenant_metrics',
    'Tenant-specific metrics',
    ['tenant_id', 'metric_type', 'tenant_name', 'industry_type']
)
# -- Authentication, errors and business events ------------------------------
# Authentication events, labelled by event type, outcome and tenant.
AUTH_EVENTS = Counter(
    'auth_events_total',
    'Authentication events',
    labelnames=['event_type', 'result', 'tenant'],
)

# Application errors, labelled by error class, severity and tenant.
ERROR_EVENTS = Counter(
    'error_events_total',
    'Application errors',
    labelnames=['error_type', 'severity', 'tenant'],
)

# Business-level events, labelled by event type and tenant.
BUSINESS_METRICS = Counter(
    'business_events_total',
    'Business events',
    labelnames=['event_type', 'tenant'],
)

# -- Service level objectives -------------------------------------------------
# Histogram of SLO scores; uses the library's default buckets.
SLO_METRICS = Histogram(
    'slo_metrics',
    'Service Level Objective metrics',
    labelnames=['slo_name', 'tenant'],
)
# Application info
# Static build/runtime facts exposed as a Prometheus Info metric.
APP_INFO = Info('application_info', 'Application information')
APP_INFO.info({
    # Application version and environment come from optional project
    # settings; str() keeps every value a string as Info label values are.
    'version': str(getattr(settings, 'VERSION', '1.0.0')),
    'environment': str(getattr(settings, 'ENVIRONMENT', 'development')),
    # Report the real framework/interpreter versions.  The original read
    # settings.VERSION (the *application* version) and
    # settings.PYTHON_VERSION (not a Django setting), which raises
    # AttributeError at import when those settings are absent.
    'django_version': django.get_version(),
    'python_version': platform.python_version(),
    'malaysian_sme_platform': 'true',
})
class MonitoringMiddleware:
    """Middleware for comprehensive application monitoring.

    For each request it assigns a request ID, times the downstream handler,
    records request / business / SLO metrics, logs slow requests and adds
    monitoring headers to the response.
    """

    # Requests slower than this (seconds) are logged as slow; the latency
    # SLO score is 1.0 up to this duration.
    SLOW_REQUEST_THRESHOLD_SECONDS = 2.0
    # Beyond the threshold the latency SLO score falls linearly to 0.0 over
    # this many additional seconds.
    LATENCY_SLO_DECAY_SECONDS = 8.0
    # Path prefixes treated as Malaysian-specific endpoints.
    MALAYSIAN_ENDPOINT_PREFIXES = (
        '/api/malaysian/',
        '/api/sst/',
        '/api/ic-validation/',
        '/api/postcode/',
    )
    # Matches the first token after a FROM keyword, case-insensitively and
    # across any whitespace (including newlines).
    _FROM_TABLE_RE = re.compile(r'\bFROM\s+(\S+)', re.IGNORECASE)

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response
        # Kept for backward compatibility; not used by the methods below.
        self.registry = CollectorRegistry()

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Handle one request: tag it, time it, record metrics, add headers."""
        # Generate request ID for tracing
        request_id = str(uuid.uuid4())
        request.request_id = request_id

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump if the system clock is adjusted mid-request.
        start_time = time.perf_counter()

        tenant_info = self._get_tenant_info(request)
        self._log_request_start(request, tenant_info)

        response = self.get_response(request)

        duration = time.perf_counter() - start_time
        endpoint = self._get_endpoint(request)

        self._record_request_metrics(request, response, duration, endpoint, tenant_info)
        self._record_business_metrics(request, response, tenant_info)
        self._record_slo_metrics(request, response, duration, tenant_info)

        self._add_monitoring_headers(response, request_id, duration)
        return response

    def process_exception(self, request: HttpRequest, exception: Exception) -> Optional[HttpResponse]:
        """Count and log an unhandled exception.

        Always returns None so normal exception handling continues.
        """
        tenant_info = self._get_tenant_info(request)
        ERROR_EVENTS.labels(
            error_type=exception.__class__.__name__,
            severity='error',
            tenant=tenant_info.get('name', 'unknown')
        ).inc()
        # Fall back gracefully if the request never passed through __call__.
        request_id = getattr(request, 'request_id', 'unknown')
        logger.error("Request %s failed: %s", request_id, exception, exc_info=True)
        return None

    def process_template_response(self, request: HttpRequest, response: HttpResponse) -> HttpResponse:
        """Inject performance and tenant metrics into the template context.

        Only applied when the request carries a ``tenant`` attribute.
        """
        if hasattr(request, 'tenant'):
            response.context_data = response.context_data or {}
            response.context_data.update({
                'performance_metrics': self._get_performance_metrics(request),
                'tenant_metrics': self._get_tenant_metrics(request),
            })
        return response

    def _get_tenant_info(self, request: HttpRequest) -> Dict[str, Any]:
        """Return a dict describing the request's tenant.

        Falls back to a 'public' placeholder when the request has no tenant.
        """
        tenant = getattr(request, 'tenant', None)
        if not tenant:
            return {'id': None, 'name': 'public', 'schema': 'public'}
        return {
            'id': tenant.id,
            'name': tenant.name,
            'schema': tenant.schema_name,
            'domain': getattr(tenant, 'domain_url', ''),
            'industry_type': getattr(tenant, 'industry_type', 'general'),
        }

    def _log_request_start(self, request: HttpRequest, tenant_info: Dict[str, Any]):
        """Log method, path, request ID, tenant and user for a new request."""
        # request.user only exists once an authentication middleware has run;
        # treat a missing user as anonymous instead of raising.
        user = getattr(request, 'user', None)
        if user is not None and user.is_authenticated:
            user_repr = user
        else:
            user_repr = 'anonymous'
        logger.info(
            "Request started: %s %s (ID: %s, Tenant: %s, User: %s)",
            request.method,
            request.path,
            request.request_id,
            tenant_info['name'],
            user_repr,
        )

    def _get_endpoint(self, request: HttpRequest) -> str:
        """Return the resolved view name, or the raw path if unavailable."""
        resolver_match = getattr(request, 'resolver_match', None)
        view_name = getattr(resolver_match, 'view_name', None)
        return view_name or request.path

    def _record_request_metrics(self, request: HttpRequest, response: HttpResponse,
                                duration: float, endpoint: str, tenant_info: Dict[str, Any]):
        """Record request count, duration and per-query metrics; log slow requests."""
        tenant_name = tenant_info.get('name', 'unknown')

        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=endpoint,
            status=str(response.status_code),
            tenant=tenant_name
        ).inc()
        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=endpoint,
            tenant=tenant_name
        ).observe(duration)

        # NOTE(review): connection.queries is presumably only populated when
        # Django query logging is enabled (e.g. DEBUG) — confirm.
        for query in getattr(connection, 'queries', None) or ():
            sql = query['sql']
            is_select = sql.lstrip().upper().startswith('SELECT')
            DATABASE_QUERIES.labels(
                type='select' if is_select else 'other',
                table=self._extract_table_name(sql),
                tenant=tenant_name
            ).inc()

        if duration > self.SLOW_REQUEST_THRESHOLD_SECONDS:
            logger.warning(
                "Slow request detected: %s %s (Duration: %.2fs, Tenant: %s)",
                request.method,
                request.path,
                duration,
                tenant_name,
            )

    def _record_business_metrics(self, request: HttpRequest, response: HttpResponse,
                                 tenant_info: Dict[str, Any]):
        """Count user activity, Malaysian-specific calls and API calls."""
        tenant_name = tenant_info.get('name', 'unknown')

        # Track user activity (a request without a user counts as anonymous).
        user = getattr(request, 'user', None)
        if user is not None and user.is_authenticated:
            BUSINESS_METRICS.labels(
                event_type='user_activity',
                tenant=tenant_name
            ).inc()

        # Track Malaysian-specific operations
        if self._is_malaysian_endpoint(request.path):
            BUSINESS_METRICS.labels(
                event_type='malaysian_operation',
                tenant=tenant_name
            ).inc()

        # Track API calls
        if request.path.startswith('/api/'):
            BUSINESS_METRICS.labels(
                event_type='api_call',
                tenant=tenant_name
            ).inc()

    def _record_slo_metrics(self, request: HttpRequest, response: HttpResponse,
                            duration: float, tenant_info: Dict[str, Any]):
        """Observe availability and latency SLO scores in the range 0.0-1.0."""
        tenant_name = tenant_info.get('name', 'unknown')

        # Availability: any non-5xx response counts as available.
        SLO_METRICS.labels(
            slo_name='availability',
            tenant=tenant_name
        ).observe(1.0 if response.status_code < 500 else 0.0)

        # Latency: full score up to the threshold, then linear decay to 0.
        threshold = self.SLOW_REQUEST_THRESHOLD_SECONDS
        if duration <= threshold:
            latency_slo_value = 1.0
        else:
            overshoot = (duration - threshold) / self.LATENCY_SLO_DECAY_SECONDS
            latency_slo_value = max(0.0, 1.0 - overshoot)
        SLO_METRICS.labels(
            slo_name='latency',
            tenant=tenant_name
        ).observe(latency_slo_value)

    def _add_monitoring_headers(self, response: HttpResponse, request_id: str, duration: float):
        """Add request ID, response time and timestamp headers to the response."""
        response['X-Request-ID'] = request_id
        response['X-Response-Time'] = f"{duration:.3f}s"
        response['X-Monitoring-Timestamp'] = timezone.now().isoformat()

    def _extract_table_name(self, sql: str) -> str:
        """Return the first token after FROM in ``sql``, or 'unknown'.

        Surrounding double quotes, square brackets and backticks are
        stripped.  The identifier keeps the case it has in the query; the
        previous implementation returned it upper-cased and missed FROM
        followed by a newline.
        """
        match = self._FROM_TABLE_RE.search(sql or '')
        if match is None:
            return 'unknown'
        table_name = match.group(1).strip('"[]`')
        return table_name or 'unknown'

    def _is_malaysian_endpoint(self, path: str) -> bool:
        """Return True if ``path`` starts with a Malaysian-specific prefix."""
        return path.startswith(self.MALAYSIAN_ENDPOINT_PREFIXES)

    def _get_performance_metrics(self, request: HttpRequest) -> Dict[str, Any]:
        """Build the performance metrics dict exposed to templates.

        Values not present on the request default to 0.
        """
        return {
            'response_time': getattr(request, 'response_time', 0),
            'database_queries': len(getattr(connection, 'queries', [])),
            'cache_hits': getattr(request, 'cache_hits', 0),
            'cache_misses': getattr(request, 'cache_misses', 0),
        }

    def _get_tenant_metrics(self, request: HttpRequest) -> Dict[str, Any]:
        """Build the tenant metrics dict exposed to templates.

        Returns an empty dict when the request has no tenant.
        """
        tenant = getattr(request, 'tenant', None)
        if not tenant:
            return {}
        return {
            'tenant_name': tenant.name,
            'tenant_users': tenant.users.count(),
            'tenant_industry': getattr(tenant, 'industry_type', 'general'),
            'tenant_domain': getattr(tenant, 'domain_url', ''),
        }
class DatabaseMonitoringMiddleware:
    """Middleware that counts the database queries issued per request."""

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Run the request and add its query count to DATABASE_QUERIES.

        The count is the growth of ``connection.queries`` across the
        downstream call; it is only recorded when the request has a tenant.
        """
        # Snapshot the query log length (nothing is reset here).
        initial_queries = len(getattr(connection, 'queries', []))

        response = self.get_response(request)

        final_queries = len(getattr(connection, 'queries', []))
        # Clamp at zero: if the query log was cleared mid-request the
        # difference is negative, and Counter.inc() rejects negative amounts.
        queries_executed = max(0, final_queries - initial_queries)

        # The tenant attribute can be missing or None; skip in both cases.
        tenant = getattr(request, 'tenant', None)
        if tenant is not None:
            DATABASE_QUERIES.labels(
                type='total',
                table='all',
                tenant=tenant.name
            ).inc(queries_executed)
        return response
class CacheMonitoringMiddleware:
    """Middleware that publishes per-request cache hit/miss counts."""

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Run the request and record the cache counters stored on it.

        ``request.cache_hits`` and ``request.cache_misses`` start at 0; this
        class never increments them itself, so downstream code is expected
        to do so.  Counters are only recorded when the request has a tenant.
        """
        # Initialize cache metrics
        request.cache_hits = 0
        request.cache_misses = 0

        response = self.get_response(request)

        # The tenant attribute can be missing or None; skip in both cases.
        tenant = getattr(request, 'tenant', None)
        if tenant is not None:
            tenant_name = tenant.name
            CACHE_HITS.labels(
                cache_type='django',
                tenant=tenant_name
            ).inc(request.cache_hits)
            CACHE_MISSES.labels(
                cache_type='django',
                tenant=tenant_name
            ).inc(request.cache_misses)
        return response
class SecurityMonitoringMiddleware:
    """Middleware for security monitoring.

    Flags suspicious user agents before the request is handled, and records
    login outcomes and SQL-injection-looking parameters afterwards.
    """

    # Substrings (lower-case) that mark a User-Agent as suspicious.
    SUSPICIOUS_USER_AGENTS = ('sqlmap', 'nikto', 'nmap', 'curl', 'wget')
    # Substrings that mark a parameter value as a possible SQL injection.
    # Matching is case-insensitive.
    SQL_INJECTION_PATTERNS = (
        "' OR '1'='1",
        "DROP TABLE",
        "UNION SELECT",
        "INSERT INTO",
        "UPDATE SET",
        "DELETE FROM",
        "--",
        "/*",
        "*/",
    )

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Run pre-request checks, the request itself, then post-response monitoring."""
        # Security checks before request
        self._check_security_headers(request)
        self._check_rate_limits(request)

        response = self.get_response(request)

        # Security monitoring after response
        self._monitor_security_events(request, response)
        return response

    def _get_tenant_name(self, request: HttpRequest) -> str:
        """Return the request's tenant name, or 'unknown' if unavailable.

        The tenant is read as an object attribute.  The previous code called
        ``.get('name')`` on it, which only works for dicts and raised
        AttributeError for a tenant object.
        """
        tenant = getattr(request, 'tenant', None)
        return getattr(tenant, 'name', None) or 'unknown'

    def _check_security_headers(self, request: HttpRequest):
        """Count requests whose User-Agent contains a suspicious substring."""
        user_agent = request.META.get('HTTP_USER_AGENT', '').lower()
        if any(agent in user_agent for agent in self.SUSPICIOUS_USER_AGENTS):
            ERROR_EVENTS.labels(
                error_type='suspicious_user_agent',
                severity='warning',
                tenant=self._get_tenant_name(request)
            ).inc()

    def _check_rate_limits(self, request: HttpRequest):
        """Placeholder for rate limiting; currently does nothing.

        TODO: implement rate limiting keyed on ``self._get_client_ip(request)``
        (the original comment suggests tracking request rates in Redis).
        """

    def _monitor_security_events(self, request: HttpRequest, response: HttpResponse):
        """Record login attempts and suspected SQL injection attempts."""
        tenant_name = self._get_tenant_name(request)

        # Monitor for authentication attempts
        if '/login' in request.path:
            # NOTE(review): a 200 is treated as a successful login.  Form
            # based logins often redirect on success and re-render with 200
            # on failure — confirm this matches the login endpoints in use.
            result = 'success' if response.status_code == 200 else 'failed'
            AUTH_EVENTS.labels(
                event_type='login_attempt',
                result=result,
                tenant=tenant_name
            ).inc()

        # Monitor for SQL injection attempts
        if self._detect_sql_injection(request):
            ERROR_EVENTS.labels(
                error_type='sql_injection_attempt',
                severity='critical',
                tenant=tenant_name
            ).inc()

    def _get_client_ip(self, request: HttpRequest) -> Optional[str]:
        """Return the client IP address, or None if it cannot be determined.

        Uses the first entry of X-Forwarded-For when present, otherwise
        REMOTE_ADDR.  X-Forwarded-For is client-supplied unless a trusted
        proxy overwrites it, so the value must not be trusted blindly.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma separated and usually followed by a space.
            first_hop = x_forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')

    def _detect_sql_injection(self, request: HttpRequest) -> bool:
        """Return True if any GET or POST value contains a known pattern.

        This is a coarse substring heuristic: patterns such as ``--`` can
        also occur in legitimate input.
        """
        # Upper-case the patterns once instead of once per inspected value.
        patterns = tuple(pattern.upper() for pattern in self.SQL_INJECTION_PATTERNS)

        def looks_malicious(value) -> bool:
            if not isinstance(value, str):
                return False
            candidate = value.upper()
            return any(pattern in candidate for pattern in patterns)

        # Check GET parameters
        if any(looks_malicious(value) for value in request.GET.values()):
            return True

        # Check POST data
        post_data = getattr(request, 'POST', None)
        if post_data is not None:
            return any(looks_malicious(value) for value in post_data.values())
        return False
class MetricsView:
    """View helper for exposing Prometheus metrics."""

    def __init__(self):
        """Bind to the default Prometheus registry.

        The metrics in this module are created without a ``registry=``
        argument.  The original used a fresh ``CollectorRegistry()``, which
        contains none of them, so its exposition output was always empty.
        """
        self.registry = REGISTRY

    @staticmethod
    def _samples(metric) -> list:
        """Return all current samples of ``metric`` via its public collect()."""
        return [sample for family in metric.collect() for sample in family.samples]

    def get_metrics(self) -> str:
        """Return every metric in ``self.registry`` in Prometheus text format."""
        return generate_latest(self.registry).decode('utf-8')

    def get_health_metrics(self) -> Dict[str, Any]:
        """Return a small health snapshot as a plain dict.

        Keys:
            application_info: labels of the APP_INFO sample ({} if none).
            active_users: current ACTIVE_USERS value (0.0 if no sample).
            database_connections: mapping of 'state' label -> gauge value.
            timestamp: current time in ISO 8601 format.
        """
        # APP_INFO.info is the *setter* method, so the stored values are
        # read back from the collected sample's labels instead.
        info_samples = self._samples(APP_INFO)
        application_info = dict(info_samples[0].labels) if info_samples else {}

        user_samples = self._samples(ACTIVE_USERS)
        active_users = user_samples[0].value if user_samples else 0.0

        # DATABASE_CONNECTIONS is labelled by 'state', so the parent metric
        # has no single value; report one value per state child.
        database_connections = {
            sample.labels.get('state', ''): sample.value
            for sample in self._samples(DATABASE_CONNECTIONS)
        }

        return {
            'application_info': application_info,
            'active_users': active_users,
            'database_connections': database_connections,
            'timestamp': timezone.now().isoformat(),
        }