"""
Django views for monitoring and metrics endpoints.
"""

import json
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

from django.http import JsonResponse, HttpResponse
from django.views.generic import TemplateView
from django.contrib.auth.mixins import LoginRequiredMixin
from django.db import connection
from django.utils import timezone
from django.conf import settings
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from prometheus_client import generate_latest, REGISTRY, CONTENT_TYPE_LATEST
from prometheus_client.parser import text_string_to_metric_families

# NOTE(review): the name imported here is rebound by the MetricsView class
# defined further down in this module; confirm whether the import is needed.
from .middleware import MetricsView
from .exporters import metrics_collector
from .alerts import alert_manager, Alert, AlertSeverity, AlertCategory

logger = logging.getLogger(__name__)


def _local_today():
    """Return today's date in the current time zone.

    ``__date`` lookups are evaluated in the current time zone when ``USE_TZ``
    is enabled, so the comparison value must be a local date, not the UTC date
    that ``timezone.now().date()`` yields. With ``USE_TZ`` disabled
    ``timezone.now()`` is naive and its date is used as-is.
    """
    now = timezone.now()
    if timezone.is_aware(now):
        return timezone.localtime(now).date()
    return now.date()


class MetricsView(APIView):
    """View for Prometheus metrics endpoint."""

    def get(self, request):
        """Collect current metrics and return them in Prometheus text format.

        Returns a JSON error body with HTTP 500 if collection or rendering
        fails.
        """
        try:
            # Refresh collector state before rendering the registry.
            metrics_collector.collect_once()

            metrics_data = generate_latest(REGISTRY)
            return HttpResponse(metrics_data, content_type=CONTENT_TYPE_LATEST)
        except Exception as e:
            logger.exception("Failed to generate metrics")
            return JsonResponse(
                {'error': f'Failed to generate metrics: {str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )


class HealthCheckView(APIView):
    """Health check endpoint."""

    def get(self, request):
        """Run all health checks and report the aggregate status.

        Responds with HTTP 200 when every check is ``healthy``, HTTP 503 when
        at least one check is not, and HTTP 500 if the health check itself
        fails unexpectedly.
        """
        try:
            health_status = {
                'status': 'healthy',
                'timestamp': timezone.now().isoformat(),
                'version': getattr(settings, 'VERSION', '1.0.0'),
                'environment': getattr(settings, 'ENVIRONMENT', 'development'),
                'checks': {}
            }
            checks = health_status['checks']

            # Each probe runs exactly once, under the timer.
            checks['database'] = self._run_timed_check(self._check_database)
            checks['cache'] = self._run_timed_check(self._check_cache)

            # _check_storage reports its own failures as a status dict.
            checks['storage'] = self._check_storage()

            checks['external_services'] = self._check_external_services()
            checks['malaysian_services'] = self._check_malaysian_services()

            individual_results = [
                checks['database'],
                checks['cache'],
                checks['storage'],
                *checks['external_services'].values(),
                *checks['malaysian_services'].values(),
            ]
            if any(result['status'] != 'healthy' for result in individual_results):
                health_status['status'] = 'degraded'

            if health_status['status'] == 'healthy':
                http_status = status.HTTP_200_OK
            else:
                http_status = status.HTTP_503_SERVICE_UNAVAILABLE

            return Response(health_status, status=http_status)

        except Exception as e:
            logger.exception("Health check failed")
            return Response(
                {'status': 'unhealthy', 'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def _run_timed_check(self, check) -> Dict[str, Any]:
        """Run ``check`` once and convert its outcome into a status dict.

        Returns ``{'status': 'healthy', 'response_time': <seconds>}`` when the
        callable completes, or ``{'status': 'unhealthy', 'error': <message>}``
        when it raises.
        """
        try:
            elapsed = self._measure_response_time(check)
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
        return {'status': 'healthy', 'response_time': elapsed}

    def _measure_response_time(self, func) -> float:
        """Call ``func`` and return the elapsed time in seconds.

        Exceptions raised by ``func`` propagate to the caller.
        """
        # perf_counter is monotonic, so the result is unaffected by system
        # clock adjustments.
        start_time = time.perf_counter()
        func()
        return time.perf_counter() - start_time

    def _check_database(self):
        """Check database connectivity by executing a trivial query."""
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            cursor.fetchone()

    def _check_cache(self):
        """Check cache functionality with a set/get round trip.

        Raises:
            Exception: if the value read back differs from the value written.
        """
        from django.core.cache import cache

        cache.set('health_check', 'test', 1)
        result = cache.get('health_check')
        if result != 'test':
            raise Exception("Cache functionality failed")

    def _check_storage(self) -> Dict[str, Any]:
        """Check that the media directory exists, is writable and has space.

        Returns a status dict; never raises. Status is ``degraded`` when less
        than 10% of the disk is free and ``unhealthy`` on any failure.
        """
        try:
            media_path = getattr(settings, 'MEDIA_ROOT', '/media')
            if not os.path.exists(media_path):
                return {'status': 'unhealthy', 'error': 'Media directory not found'}

            # A uniquely named temporary file avoids collisions between
            # concurrent health checks and is removed even if the write fails.
            try:
                with tempfile.NamedTemporaryFile(
                    mode='w',
                    dir=media_path,
                    prefix='health_check_',
                    suffix='.tmp',
                ) as probe:
                    probe.write('test')
                    probe.flush()
            except Exception as e:
                return {'status': 'unhealthy', 'error': f'Write permission error: {str(e)}'}

            # shutil.disk_usage works on all platforms; on Unix it reports the
            # space available to unprivileged users, like f_bavail.
            usage = shutil.disk_usage(media_path)
            free_space_percent = usage.free / usage.total * 100

            if free_space_percent < 10:
                return {
                    'status': 'degraded',
                    'error': f'Low disk space: {free_space_percent:.1f}% free'
                }

            return {'status': 'healthy', 'free_space_percent': free_space_percent}

        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

    def _check_external_services(self) -> Dict[str, Dict[str, Any]]:
        """Check external services health.

        Returns a mapping of service name to status dict. Email is always
        checked; Redis only when ``settings.REDIS_URL`` is set; each entry of
        ``settings.EXTERNAL_APIS`` is probed through its ``health_url``.
        """
        services = {}

        # Check email service. The local is deliberately not named
        # ``connection`` so it does not shadow the database connection.
        try:
            from django.core.mail import get_connection

            mail_connection = get_connection()
            mail_connection.open()
            mail_connection.close()
            services['email'] = {'status': 'healthy'}
        except Exception as e:
            services['email'] = {'status': 'unhealthy', 'error': str(e)}

        # Check Redis only if configured, so deployments without Redis are
        # not reported as degraded.
        redis_url = getattr(settings, 'REDIS_URL', None)
        if redis_url:
            try:
                import redis

                redis_client = redis.from_url(redis_url)
                redis_client.ping()
                services['redis'] = {'status': 'healthy'}
            except Exception as e:
                services['redis'] = {'status': 'unhealthy', 'error': str(e)}

        # Check external APIs (if configured)
        external_apis = getattr(settings, 'EXTERNAL_APIS', {})
        for api_name, api_config in external_apis.items():
            try:
                import requests

                response = requests.get(
                    api_config['health_url'],
                    timeout=api_config.get('timeout', 5)
                )
                if response.status_code == 200:
                    services[api_name] = {'status': 'healthy'}
                else:
                    services[api_name] = {
                        'status': 'unhealthy',
                        'error': f'HTTP {response.status_code}'
                    }
            except Exception as e:
                services[api_name] = {'status': 'unhealthy', 'error': str(e)}

        return services

    def _check_malaysian_services(self) -> Dict[str, Dict[str, Any]]:
        """Check Malaysian-specific services.

        Each service is exercised with a fixed sample input; an exception
        marks it ``unhealthy`` and an empty result marks it ``degraded``.
        """
        services = {}

        # Check Malaysian postcode service
        try:
            from core.services.malaysian_services import MalaysianPostcodeService

            postcode_service = MalaysianPostcodeService()
            result = postcode_service.lookup_postcode('50000')
            services['postcode_service'] = {
                'status': 'healthy' if result else 'degraded'
            }
        except Exception as e:
            services['postcode_service'] = {'status': 'unhealthy', 'error': str(e)}

        # Check SST calculation service
        try:
            from core.services.malaysian_services import SSTCalculationService

            sst_service = SSTCalculationService()
            result = sst_service.calculate_sst(100, 'standard', 'Johor')
            services['sst_service'] = {
                'status': 'healthy' if result is not None else 'degraded'
            }
        except Exception as e:
            services['sst_service'] = {'status': 'unhealthy', 'error': str(e)}

        # Check IC validation service
        try:
            from core.services.malaysian_services import ICValidationService

            ic_service = ICValidationService()
            result = ic_service.validate_ic('1234567890')
            services['ic_validation_service'] = {
                'status': 'healthy' if result is not None else 'degraded'
            }
        except Exception as e:
            services['ic_validation_service'] = {'status': 'unhealthy', 'error': str(e)}

        return services


class AlertsView(APIView):
    """Alerts management endpoint."""

    def get(self, request):
        """Get alerts.

        Query parameters:
            severity: optional severity filter.
            category: optional category filter.
            status: ``active`` (default) for active alerts; any other value
                returns alert history.
            hours: history window in hours (default 24); must be an integer.
        """
        try:
            # Check for new alerts
            alert_manager.check_rules()

            params = request.query_params
            severity = params.get('severity')
            category = params.get('category')
            # Must not be named ``status``: that would shadow the
            # rest_framework ``status`` module used for the response codes.
            alert_status = params.get('status', 'active')

            try:
                hours = int(params.get('hours', 24))
            except (TypeError, ValueError):
                return Response(
                    {'error': "Invalid 'hours' parameter: must be an integer"},
                    status=status.HTTP_400_BAD_REQUEST
                )

            if alert_status == 'active':
                alerts = alert_manager.get_active_alerts(severity=severity, category=category)
            else:
                alerts = alert_manager.get_alert_history(hours=hours)
                if severity:
                    alerts = [a for a in alerts if a.severity == severity]
                if category:
                    alerts = [a for a in alerts if a.category == category]

            response_data = {
                'alerts': [alert.to_dict() for alert in alerts],
                'summary': self._get_alerts_summary(alerts),
                'timestamp': timezone.now().isoformat()
            }

            return Response(response_data)

        except Exception as e:
            logger.exception("Failed to get alerts")
            return Response(
                {'error': f'Failed to get alerts: {str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def post(self, request):
        """Create manual alert.

        Requires ``title`` and ``description`` in the request body; responds
        with HTTP 400 when either is missing.
        """
        try:
            data = request.data

            missing = [field for field in ('title', 'description') if field not in data]
            if missing:
                return Response(
                    {'error': f"Missing required field(s): {', '.join(missing)}"},
                    status=status.HTTP_400_BAD_REQUEST
                )

            alert = Alert(
                title=data['title'],
                description=data['description'],
                severity=data.get('severity', AlertSeverity.INFO),
                category=data.get('category', AlertCategory.SYSTEM),
                metadata=data.get('metadata', {})
            )

            alert_manager.trigger_alert(alert)

            return Response(
                {'message': 'Alert created successfully', 'alert_id': alert.id},
                status=status.HTTP_201_CREATED
            )

        except Exception as e:
            logger.exception("Failed to create alert")
            return Response(
                {'error': f'Failed to create alert: {str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def _get_alerts_summary(self, alerts) -> Dict[str, Any]:
        """Get alerts summary statistics.

        Returns the total count plus per-severity, per-category and
        per-status counts for ``alerts``.
        """
        summary = {
            'total': len(alerts),
            'by_severity': {},
            'by_category': {},
            'by_status': {}
        }
        by_severity = summary['by_severity']
        by_category = summary['by_category']
        by_status = summary['by_status']

        for alert in alerts:
            by_severity[alert.severity] = by_severity.get(alert.severity, 0) + 1
            by_category[alert.category] = by_category.get(alert.category, 0) + 1

            alert_status = alert.get_status()
            by_status[alert_status] = by_status.get(alert_status, 0) + 1

        return summary


class AlertActionView(APIView):
    """Alert management actions."""

    # Maps the URL action to (alert_manager method name, past-tense verb used
    # in the response message).
    _ACTIONS = {
        'acknowledge': ('acknowledge_alert', 'acknowledged'),
        'resolve': ('resolve_alert', 'resolved'),
    }

    def post(self, request, alert_id: str, action: str):
        """Perform alert actions.

        Supported actions are ``acknowledge`` and ``resolve``; any other
        action yields HTTP 400. Unauthenticated callers are recorded as
        ``api_user``.
        """
        try:
            if action not in self._ACTIONS:
                return Response(
                    {'error': f'Unknown action: {action}'},
                    status=status.HTTP_400_BAD_REQUEST
                )

            method_name, verb = self._ACTIONS[action]
            user = request.user.username if request.user.is_authenticated else 'api_user'
            getattr(alert_manager, method_name)(alert_id, user)
            return Response({'message': f'Alert {alert_id} {verb}'})

        except Exception as e:
            logger.exception("Failed to perform alert action")
            return Response(
                {'error': f'Failed to perform action {action} on alert {alert_id}: {str(e)}'},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )


class MonitoringDashboardView(LoginRequiredMixin, TemplateView):
    """Monitoring dashboard template view."""

    template_name = 'monitoring/dashboard.html'

    def get_context_data(self, **kwargs):
        """Get dashboard context data."""
        context = super().get_context_data(**kwargs)

        # Get current alerts
        context['active_alerts'] = alert_manager.get_active_alerts()
        context['alert_summary'] = self._get_alerts_summary(context['active_alerts'])

        # Get system metrics
        context['system_metrics'] = self._get_system_metrics()

        # Get business metrics
        context['business_metrics'] = self._get_business_metrics()

        # Malaysian-specific metrics
        context['malaysian_metrics'] = self._get_malaysian_metrics()

        return context

    def _get_alerts_summary(self, alerts) -> Dict[str, Any]:
        """Get the total alert count and the count per severity level."""

        def count(severity) -> int:
            return sum(1 for a in alerts if a.severity == severity)

        return {
            'total': len(alerts),
            'critical': count(AlertSeverity.CRITICAL),
            'error': count(AlertSeverity.ERROR),
            'warning': count(AlertSeverity.WARNING),
            'info': count(AlertSeverity.INFO),
        }

    def _get_system_metrics(self) -> Dict[str, Any]:
        """Get system metrics via psutil.

        Returns an empty dict (and logs the failure) if psutil is unavailable
        or a reading fails, so the dashboard still renders.
        """
        try:
            import psutil

            return {
                # interval=1 blocks for one second to sample CPU usage.
                'cpu_usage': psutil.cpu_percent(interval=1),
                'memory_usage': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('/').percent,
                'load_average': psutil.getloadavg()[0],
                'uptime': datetime.now() - datetime.fromtimestamp(psutil.boot_time()),
            }
        except Exception:
            logger.exception("Failed to collect system metrics")
            return {}

    def _get_business_metrics(self) -> Dict[str, Any]:
        """Get business metrics.

        Counts users who logged in within the last 30 minutes and completed
        transactions created today. Returns an empty dict (and logs the
        failure) on error.
        """
        try:
            from django.contrib.auth import get_user_model
            from core.models import Transaction

            User = get_user_model()

            # Active users
            active_users = User.objects.filter(
                is_active=True,
                last_login__gte=timezone.now() - timedelta(minutes=30)
            ).count()

            # Today's transactions (local calendar day)
            today_transactions = Transaction.objects.filter(
                created_at__date=_local_today(),
                status='completed'
            ).count()

            return {
                'active_users': active_users,
                'today_transactions': today_transactions,
            }
        except Exception:
            logger.exception("Failed to collect business metrics")
            return {}

    def _get_malaysian_metrics(self) -> Dict[str, Any]:
        """Get Malaysian-specific metrics.

        Counts IC validations and SST calculations created today. Returns an
        empty dict (and logs the failure) on error.
        """
        try:
            from core.models import MalaysianICValidation, SSTCalculation

            today = _local_today()
            return {
                'ic_validations_today': MalaysianICValidation.objects.filter(
                    created_at__date=today
                ).count(),
                'sst_calculations_today': SSTCalculation.objects.filter(
                    created_at__date=today
                ).count(),
            }
        except Exception:
            logger.exception("Failed to collect Malaysian metrics")
            return {}


class MetricsDashboardView(LoginRequiredMixin, TemplateView):
    """Metrics dashboard template view."""

    template_name = 'monitoring/metrics_dashboard.html'