""" Malaysian Personal Data Protection Act (PDPA) compliance features. """ import json import logging from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Tuple from django.conf import settings from django.contrib.auth import get_user_model from django.core.cache import cache from django.db import models, transaction from django.utils import timezone from django.utils.crypto import get_random_string from django.core.mail import send_mail from django.template.loader import render_to_string from django.http import HttpRequest, HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from rest_framework import status from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.exceptions import ValidationError import hashlib import hmac import secrets from enum import Enum import io import csv import xlsxwriter import PyPDF2 from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC import base64 logger = logging.getLogger(__name__) User = get_user_model() class PDPAConsentType(Enum): """Types of PDPA consent.""" MARKETING = "marketing" ANALYTICS = "analytics" PERSONALIZATION = "personalization" THIRD_PARTY = "third_party" DATA_SHARING = "data_sharing" AUTOMATED_DECISION = "automated_decision" class PDPARetentionPeriod(Enum): """Data retention periods.""" MINIMAL = "minimal" STANDARD = "standard" EXTENDED = "extended" LEGAL_HOLD = "legal_hold" class PDPADataCategory(Enum): """Categories of personal data under PDPA.""" IDENTIFICATION = "identification" CONTACT = "contact" FINANCIAL = "financial" HEALTH = "health" EMPLOYMENT = "employment" EDUCATION = "education" LOCATION = "location" BEHAVIORAL = "behavioral" BIOMETRIC = "biometric" GENEALOGICAL = "genealogical" OPINION = "opinion" class PDPAConsentManager: """ Manager for PDPA consent tracking and management. """ def __init__(self): self.logger = logging.getLogger('security.pdpa.consent') def record_consent(self, user_id: int, consent_type: PDPAConsentType, consent_given: bool, metadata: Dict = None) -> bool: """ Record user consent for data processing. """ try: from .models import PDPAConsentRecord # Create consent record consent_record = PDPAConsentRecord.objects.create( user_id=user_id, consent_type=consent_type.value, consent_given=consent_given, metadata=metadata or {}, ip_address="127.0.0.1", # Would get from request user_agent="Mozilla/5.0", # Would get from request ) # Update cache cache_key = f"pdpa_consent_{user_id}" self._update_consent_cache(user_id, consent_type, consent_given) # Log consent self.logger.info(f"Consent recorded: user={user_id}, type={consent_type.value}, given={consent_given}") return True except Exception as e: self.logger.error(f"Consent recording error: {e}") return False def check_consent(self, user_id: int, consent_type: PDPAConsentType) -> bool: """ Check if user has given consent for specific processing. """ try: # Check cache first cache_key = f"pdpa_consent_{user_id}" cached_consents = cache.get(cache_key, {}) if consent_type.value in cached_consents: return cached_consents[consent_type.value] # Check database from .models import PDPAConsentRecord latest_consent = PDPAConsentRecord.objects.filter( user_id=user_id, consent_type=consent_type.value, ).order_by('-created_at').first() consent_given = latest_consent.consent_given if latest_consent else False # Update cache self._update_consent_cache(user_id, consent_type, consent_given) return consent_given except Exception as e: self.logger.error(f"Consent check error: {e}") return False def get_user_consents(self, user_id: int) -> Dict[str, Dict]: """ Get all consent records for a user. """ try: from .models import PDPAConsentRecord # Get all consent records records = PDPAConsentRecord.objects.filter(user_id=user_id).order_by('-created_at') # Group by consent type consents = {} for record in records: if record.consent_type not in consents: consents[record.consent_type] = { 'latest': None, 'history': [] } consents[record.consent_type]['latest'] = { 'consent_given': record.consent_given, 'timestamp': record.created_at.isoformat(), 'metadata': record.metadata, } consents[record.consent_type]['history'].append({ 'consent_given': record.consent_given, 'timestamp': record.created_at.isoformat(), 'metadata': record.metadata, }) return consents except Exception as e: self.logger.error(f"Consent retrieval error: {e}") return {} def _update_consent_cache(self, user_id: int, consent_type: PDPAConsentType, consent_given: bool): """ Update consent cache. """ cache_key = f"pdpa_consent_{user_id}" cached_consents = cache.get(cache_key, {}) cached_consents[consent_type.value] = consent_given cache.set(cache_key, cached_consents, timeout=3600) # 1 hour class PDPADataProcessor: """ PDPA-compliant data processing and anonymization. """ def __init__(self): self.logger = logging.getLogger('security.pdpa.data') self.encryption_key = self._get_encryption_key() def anonymize_data(self, data: Dict, data_category: PDPADataCategory) -> Dict: """ Anonymize personal data according to PDPA requirements. """ try: anonymized_data = data.copy() # Apply anonymization based on data category if data_category == PDPADataCategory.IDENTIFICATION: anonymized_data = self._anonymize_identification_data(anonymized_data) elif data_category == PDPADataCategory.CONTACT: anonymized_data = self._anonymize_contact_data(anonymized_data) elif data_category == PDPADataCategory.FINANCIAL: anonymized_data = self._anonymize_financial_data(anonymized_data) elif data_category == PDPADataCategory.HEALTH: anonymized_data = self._anonymize_health_data(anonymized_data) elif data_category == PDPADataCategory.LOCATION: anonymized_data = self._anonymize_location_data(anonymized_data) return anonymized_data except Exception as e: self.logger.error(f"Data anonymization error: {e}") return data def encrypt_sensitive_data(self, data: str, user_id: int) -> str: """ Encrypt sensitive data with user-specific key. """ try: # Generate user-specific key user_key = self._generate_user_encryption_key(user_id) # Create cipher cipher = Fernet(user_key) # Encrypt data encrypted_data = cipher.encrypt(data.encode()) return base64.b64encode(encrypted_data).decode() except Exception as e: self.logger.error(f"Data encryption error: {e}") return data def decrypt_sensitive_data(self, encrypted_data: str, user_id: int) -> str: """ Decrypt sensitive data with user-specific key. """ try: # Generate user-specific key user_key = self._generate_user_encryption_key(user_id) # Create cipher cipher = Fernet(user_key) # Decrypt data decoded_data = base64.b64decode(encrypted_data.encode()) decrypted_data = cipher.decrypt(decoded_data) return decrypted_data.decode() except Exception as e: self.logger.error(f"Data decryption error: {e}") return encrypted_data def _anonymize_identification_data(self, data: Dict) -> Dict: """ Anonymize identification data. """ anonymized = data.copy() # Anonymize IC number (keep last 4 digits) if 'ic_number' in anonymized: ic = anonymized['ic_number'] if len(ic) > 4: anonymized['ic_number'] = '*' * (len(ic) - 4) + ic[-4:] # Anonymize passport number if 'passport_number' in anonymized: passport = anonymized['passport_number'] if len(passport) > 3: anonymized['passport_number'] = '*' * (len(passport) - 3) + passport[-3:] # Anonymize full name (keep initials) if 'full_name' in anonymized: name = anonymized['full_name'] names = name.split() anonymized_name = [] for n in names: if len(n) > 1: anonymized_name.append(n[0] + '*') else: anonymized_name.append(n) anonymized['full_name'] = ' '.join(anonymized_name) return anonymized def _anonymize_contact_data(self, data: Dict) -> Dict: """ Anonymize contact data. """ anonymized = data.copy() # Anonymize email (keep domain) if 'email' in anonymized: email = anonymized['email'] if '@' in email: local_part, domain = email.split('@', 1) anonymized_email = local_part[0] + '*' * (len(local_part) - 1) + '@' + domain anonymized['email'] = anonymized_email # Anonymize phone number if 'phone_number' in anonymized: phone = anonymized['phone_number'] digits = ''.join(c for c in phone if c.isdigit()) if len(digits) > 4: anonymized_phone = '*' * (len(digits) - 4) + digits[-4:] anonymized['phone_number'] = anonymized_phone return anonymized def _anonymize_financial_data(self, data: Dict) -> Dict: """ Anonymize financial data. """ anonymized = data.copy() # Anonymize bank account number if 'bank_account_number' in anonymized: account = anonymized['bank_account_number'] if len(account) > 4: anonymized['bank_account_number'] = '*' * (len(account) - 4) + account[-4:] # Anonymize credit card number if 'credit_card_number' in anonymized: card = anonymized['credit_card_number'] digits = ''.join(c for c in card if c.isdigit()) if len(digits) > 4: anonymized['credit_card_number'] = '*' * (len(digits) - 4) + digits[-4:] return anonymized def _anonymize_health_data(self, data: Dict) -> Dict: """ Anonymize health data (redact most information). """ anonymized = data.copy() # Redact sensitive health information sensitive_fields = ['medical_history', 'diagnosis', 'treatment', 'medication'] for field in sensitive_fields: if field in anonymized: anonymized[field] = '[REDACTED]' return anonymized def _anonymize_location_data(self, data: Dict) -> Dict: """ Anonymize location data. """ anonymized = data.copy() # Anonymize specific address (keep city/state) if 'address' in anonymized: address = anonymized['address'] anonymized['address'] = '[ADDRESS REDACTED]' # Keep only city and state for field in ['city', 'state']: if field in anonymized: anonymized[field] = anonymized[field] return anonymized def _get_encryption_key(self) -> bytes: """ Get or create encryption key. """ try: # Try to get key from cache cache_key = 'pdpa_encryption_key' key = cache.get(cache_key) if key: return base64.b64decode(key) # Generate new key key = Fernet.generate_key() cache.set(cache_key, base64.b64encode(key).decode(), timeout=None) return key except Exception as e: self.logger.error(f"Encryption key generation error: {e}") # Fallback to hardcoded key (not recommended for production) return b'your-secret-key-here' def _generate_user_encryption_key(self, user_id: int) -> bytes: """ Generate user-specific encryption key. """ try: # Use PBKDF2 to derive key from user ID and master key password = str(user_id).encode() salt = b'malaysian_pdpa_salt' # Should be properly secured kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(password)) return key except Exception as e: self.logger.error(f"User key generation error: {e}") return self._get_encryption_key() class PDPADataRetentionManager: """ Manager for PDPA data retention and deletion policies. """ def __init__(self): self.logger = logging.getLogger('security.pdpa.retention') def apply_retention_policies(self): """ Apply data retention policies across all user data. """ try: # Get users with data retention policies from .models import PDPADataRetention retention_policies = PDPADataRetention.objects.all() for policy in retention_policies: self._apply_user_retention_policy(policy) self.logger.info(f"Applied retention policies to {retention_policies.count()} users") except Exception as e: self.logger.error(f"Retention policy application error: {e}") def _apply_user_retention_policy(self, policy): """ Apply retention policy for a specific user. """ try: user_id = policy.user_id retention_period = policy.retention_period # Calculate cutoff date cutoff_date = self._calculate_cutoff_date(retention_period) # Apply retention to different data types self._apply_activity_retention(user_id, cutoff_date) self._apply_log_retention(user_id, cutoff_date) self._apply_session_retention(user_id, cutoff_date) self._apply_consent_retention(user_id, cutoff_date) except Exception as e: self.logger.error(f"User retention policy error for user {user_id}: {e}") def _calculate_cutoff_date(self, retention_period: str) -> datetime: """ Calculate cutoff date based on retention period. """ now = timezone.now() if retention_period == PDPARetentionPeriod.MINIMAL.value: return now - timedelta(days=30) elif retention_period == PDPARetentionPeriod.STANDARD.value: return now - timedelta(days=365) elif retention_period == PDPARetentionPeriod.EXTENDED.value: return now - timedelta(days=1825) # 5 years elif retention_period == PDPARetentionPeriod.LEGAL_HOLD.value: return now - timedelta(days=3650) # 10 years else: return now - timedelta(days=365) # Default to standard def _apply_activity_retention(self, user_id: int, cutoff_date: datetime): """ Apply retention to user activity data. """ try: from django.contrib.auth.models import LogEntry # Delete old log entries LogEntry.objects.filter( user_id=user_id, action_time__lt=cutoff_date ).delete() except Exception as e: self.logger.error(f"Activity retention error for user {user_id}: {e}") def _apply_log_retention(self, user_id: int, cutoff_date: datetime): """ Apply retention to user log data. """ try: # Delete old security logs from monitoring.models import SecurityLog SecurityLog.objects.filter( user_id=user_id, timestamp__lt=cutoff_date ).delete() except Exception as e: self.logger.error(f"Log retention error for user {user_id}: {e}") def _apply_session_retention(self, user_id: int, cutoff_date: datetime): """ Apply retention to user session data. """ try: from django.contrib.sessions.models import Session # Delete old sessions Session.objects.filter( expire_date__lt=cutoff_date ).delete() except Exception as e: self.logger.error(f"Session retention error for user {user_id}: {e}") def _apply_consent_retention(self, user_id: int, cutoff_date: datetime): """ Apply retention to consent data (keep only latest). """ try: from .models import PDPAConsentRecord # Get all consent records records = PDPAConsentRecord.objects.filter( user_id=user_id, created_at__lt=cutoff_date ) # Group by consent type and keep only latest for consent_type in PDPAConsentType: type_records = records.filter(consent_type=consent_type.value) if type_records.count() > 1: # Keep only the latest record latest = type_records.order_by('-created_at').first() type_records.exclude(id=latest.id).delete() except Exception as e: self.logger.error(f"Consent retention error for user {user_id}: {e}") class PDPADataExporter: """ PDPA-compliant data export for data subject requests. """ def __init__(self): self.logger = logging.getLogger('security.pdpa.export') self.data_processor = PDPADataProcessor() def export_user_data(self, user_id: int, format: str = 'json') -> HttpResponse: """ Export user data in PDPA-compliant format. """ try: # Collect user data user_data = self._collect_user_data(user_id) # Apply anonymization if needed user_data = self._apply_export_anonymization(user_data) # Generate export if format == 'json': return self._export_json(user_data) elif format == 'csv': return self._export_csv(user_data) elif format == 'xlsx': return self._export_xlsx(user_data) elif format == 'pdf': return self._export_pdf(user_data) else: raise ValidationError(f"Unsupported export format: {format}") except Exception as e: self.logger.error(f"Data export error for user {user_id}: {e}") return JsonResponse( {'error': 'Export failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def _collect_user_data(self, user_id: int) -> Dict: """ Collect all user data for export. """ try: from .models import PDPAConsentRecord, PDPADataRetention data = { 'user_profile': self._get_user_profile(user_id), 'consent_records': self._get_consent_records(user_id), 'retention_policy': self._get_retention_policy(user_id), 'business_data': self._get_business_data(user_id), 'activity_logs': self._get_activity_logs(user_id), 'security_logs': self._get_security_logs(user_id), 'export_metadata': { 'export_date': timezone.now().isoformat(), 'export_type': 'pdpa_compliant', 'data_categories': ['identification', 'contact', 'activity'], } } return data except Exception as e: self.logger.error(f"Data collection error for user {user_id}: {e}") return {} def _get_user_profile(self, user_id: int) -> Dict: """ Get user profile data. """ try: user = User.objects.get(id=user_id) return { 'id': user.id, 'username': user.username, 'email': user.email, 'first_name': user.first_name, 'last_name': user.last_name, 'date_joined': user.date_joined.isoformat(), 'last_login': user.last_login.isoformat() if user.last_login else None, 'is_active': user.is_active, 'profile': { field.name: getattr(user, field.name) for field in user._meta.fields if field.name not in ['id', 'password', 'username', 'email', 'first_name', 'last_name', 'date_joined', 'last_login', 'is_active'] } } except User.DoesNotExist: return {} except Exception as e: self.logger.error(f"User profile retrieval error for user {user_id}: {e}") return {} def _get_consent_records(self, user_id: int) -> List[Dict]: """ Get consent records for user. """ try: from .models import PDPAConsentRecord records = PDPAConsentRecord.objects.filter(user_id=user_id) return [ { 'id': record.id, 'consent_type': record.consent_type, 'consent_given': record.consent_given, 'created_at': record.created_at.isoformat(), 'metadata': record.metadata, } for record in records ] except Exception as e: self.logger.error(f"Consent records retrieval error for user {user_id}: {e}") return [] def _get_retention_policy(self, user_id: int) -> Dict: """ Get retention policy for user. """ try: from .models import PDPADataRetention policy = PDPADataRetention.objects.filter(user_id=user_id).first() if policy: return { 'retention_period': policy.retention_period, 'created_at': policy.created_at.isoformat(), 'updated_at': policy.updated_at.isoformat(), } return {'retention_period': 'standard'} except Exception as e: self.logger.error(f"Retention policy retrieval error for user {user_id}: {e}") return {'retention_period': 'standard'} def _get_business_data(self, user_id: int) -> Dict: """ Get business-related data for user. """ try: user = User.objects.get(id=user_id) business_data = {} # Get business registration if exists if hasattr(user, 'business_registration'): business_data['registration'] = { 'registration_number': user.business_registration.registration_number, 'business_name': user.business_registration.business_name, 'business_type': user.business_registration.business_type, 'state': user.business_registration.state, 'registration_date': user.business_registration.registration_date.isoformat(), } # Get Malaysian services data business_data['malaysian_services'] = { 'ic_validations': list(user.malaysian_ic_validation.all().values( 'id', 'ic_number', 'validation_result', 'created_at' )), 'sst_calculations': list(user.sst_calculation.all().values( 'id', 'amount', 'sst_amount', 'state', 'created_at' )), 'postcode_lookups': list(user.postcode_lookup.all().values( 'id', 'postcode', 'result', 'created_at' )), } return business_data except Exception as e: self.logger.error(f"Business data retrieval error for user {user_id}: {e}") return {} def _get_activity_logs(self, user_id: int) -> List[Dict]: """ Get activity logs for user. """ try: from django.contrib.auth.models import LogEntry logs = LogEntry.objects.filter(user_id=user_id).order_by('-action_time')[:100] return [ { 'id': log.id, 'action_time': log.action_time.isoformat(), 'action_flag': log.action_flag, 'change_message': log.change_message, } for log in logs ] except Exception as e: self.logger.error(f"Activity logs retrieval error for user {user_id}: {e}") return [] def _get_security_logs(self, user_id: int) -> List[Dict]: """ Get security logs for user. """ try: from monitoring.models import SecurityLog logs = SecurityLog.objects.filter(user_id=user_id).order_by('-timestamp')[:100] return [ { 'id': log.id, 'timestamp': log.timestamp.isoformat(), 'event_type': log.event_type, 'severity': log.severity, 'description': log.description, } for log in logs ] except Exception as e: self.logger.error(f"Security logs retrieval error for user {user_id}: {e}") return [] def _apply_export_anonymization(self, user_data: Dict) -> Dict: """ Apply anonymization for export. """ # Apply anonymization to sensitive data if 'user_profile' in user_data: user_data['user_profile'] = self.data_processor.anonymize_data( user_data['user_profile'], PDPADataCategory.IDENTIFICATION ) if 'business_data' in user_data: user_data['business_data'] = self.data_processor.anonymize_data( user_data['business_data'], PDPADataCategory.FINANCIAL ) return user_data def _export_json(self, data: Dict) -> HttpResponse: """ Export data as JSON. """ response = HttpResponse( json.dumps(data, indent=2, default=str), content_type='application/json' ) response['Content-Disposition'] = 'attachment; filename="user_data_export.json"' return response def _export_csv(self, data: Dict) -> HttpResponse: """ Export data as CSV. """ output = io.StringIO() writer = csv.writer(output) # Write user profile writer.writerow(['User Profile']) if 'user_profile' in data: for key, value in data['user_profile'].items(): writer.writerow([key, value]) writer.writerow([]) # Empty row # Write consent records writer.writerow(['Consent Records']) if 'consent_records' in data: writer.writerow(['Consent Type', 'Given', 'Date']) for record in data['consent_records']: writer.writerow([ record['consent_type'], record['consent_given'], record['created_at'] ]) response = HttpResponse(output.getvalue(), content_type='text/csv') response['Content-Disposition'] = 'attachment; filename="user_data_export.csv"' return response def _export_xlsx(self, data: Dict) -> HttpResponse: """ Export data as Excel file. """ output = io.BytesIO() workbook = xlsxwriter.Workbook(output) # User profile sheet if 'user_profile' in data: profile_sheet = workbook.add_worksheet('User Profile') row = 0 for key, value in data['user_profile'].items(): profile_sheet.write(row, 0, key) profile_sheet.write(row, 1, str(value)) row += 1 # Consent records sheet if 'consent_records' in data: consent_sheet = workbook.add_worksheet('Consent Records') consent_sheet.write_row(0, 0, ['Consent Type', 'Given', 'Date']) row = 1 for record in data['consent_records']: consent_sheet.write_row(row, 0, [ record['consent_type'], record['consent_given'], record['created_at'] ]) row += 1 workbook.close() response = HttpResponse(output.getvalue(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') response['Content-Disposition'] = 'attachment; filename="user_data_export.xlsx"' return response def _export_pdf(self, data: Dict) -> HttpResponse: """ Export data as PDF. """ output = io.BytesIO() # Create simple PDF report from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas p = canvas.Canvas(output, pagesize=letter) p.drawString(100, 750, "User Data Export Report") p.drawString(100, 730, f"Generated: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}") y_position = 700 if 'user_profile' in data: p.drawString(100, y_position, "User Profile:") y_position -= 20 for key, value in data['user_profile'].items(): p.drawString(120, y_position, f"{key}: {str(value)}") y_position -= 15 p.save() response = HttpResponse(output.getvalue(), content_type='application/pdf') response['Content-Disposition'] = 'attachment; filename="user_data_export.pdf"' return response class PDPARequestHandler: """ Handler for PDPA data subject requests. """ def __init__(self): self.logger = logging.getLogger('security.pdpa.requests') self.consent_manager = PDPAConsentManager() self.data_exporter = PDPADataExporter() self.retention_manager = PDPADataRetentionManager() def handle_data_access_request(self, user_id: int, format: str = 'json') -> HttpResponse: """ Handle data subject access request. """ try: # Log the request self._log_access_request(user_id) # Export user data return self.data_exporter.export_user_data(user_id, format) except Exception as e: self.logger.error(f"Data access request error for user {user_id}: {e}") return JsonResponse( {'error': 'Access request failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def handle_data_deletion_request(self, user_id: int, confirmation: bool = False) -> JsonResponse: """ Handle data subject deletion request. """ try: if not confirmation: return JsonResponse({ 'message': 'Confirmation required for data deletion', 'requires_confirmation': True }) # Log the request self._log_deletion_request(user_id) # Apply retention policies self.retention_manager.apply_retention_policies() # Anonymize user data self._anonymize_user_data(user_id) return JsonResponse({ 'message': 'Data deletion request processed successfully', 'user_id': user_id, 'timestamp': timezone.now().isoformat() }) except Exception as e: self.logger.error(f"Data deletion request error for user {user_id}: {e}") return JsonResponse( {'error': 'Deletion request failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def handle_consent_withdrawal(self, user_id: int, consent_type: str) -> JsonResponse: """ Handle consent withdrawal request. """ try: # Validate consent type try: consent_enum = PDPAConsentType(consent_type) except ValueError: return JsonResponse( {'error': 'Invalid consent type'}, status=status.HTTP_400_BAD_REQUEST ) # Withdraw consent success = self.consent_manager.record_consent( user_id=user_id, consent_type=consent_enum, consent_given=False, metadata={'action': 'withdrawal'} ) if success: return JsonResponse({ 'message': f'Consent withdrawn for {consent_type}', 'user_id': user_id, 'consent_type': consent_type, 'timestamp': timezone.now().isoformat() }) else: return JsonResponse( {'error': 'Consent withdrawal failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) except Exception as e: self.logger.error(f"Consent withdrawal error for user {user_id}: {e}") return JsonResponse( {'error': 'Consent withdrawal failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def _log_access_request(self, user_id: int): """ Log data access request. """ try: self.logger.info(f"Data access request: user={user_id}") # Store in audit log from monitoring.models import SecurityLog SecurityLog.objects.create( user_id=user_id, event_type='data_access_request', severity='info', description=f'User requested data access', metadata={'request_type': 'access', 'timestamp': timezone.now().isoformat()} ) except Exception as e: self.logger.error(f"Access request logging error: {e}") def _log_deletion_request(self, user_id: int): """ Log data deletion request. """ try: self.logger.info(f"Data deletion request: user={user_id}") # Store in audit log from monitoring.models import SecurityLog SecurityLog.objects.create( user_id=user_id, event_type='data_deletion_request', severity='warning', description=f'User requested data deletion', metadata={'request_type': 'deletion', 'timestamp': timezone.now().isoformat()} ) except Exception as e: self.logger.error(f"Deletion request logging error: {e}") def _anonymize_user_data(self, user_id: int): """ Anonymize user data. """ try: user = User.objects.get(id=user_id) # Anonymize profile data user.first_name = "ANONYMIZED" user.last_name = "USER" user.email = f"anon_{user_id}@example.com" user.username = f"anon_user_{user_id}" user.save() self.logger.info(f"User data anonymized: user={user_id}") except Exception as e: self.logger.error(f"User anonymization error: {e}") # PDPA Views @api_view(['POST']) @permission_classes([IsAuthenticated]) def pdpa_consent_view(request): """ Record PDPA consent. """ try: consent_type = request.data.get('consent_type') consent_given = request.data.get('consent_given', False) if not consent_type: return JsonResponse( {'error': 'Consent type required'}, status=status.HTTP_400_BAD_REQUEST ) # Validate consent type try: consent_enum = PDPAConsentType(consent_type) except ValueError: return JsonResponse( {'error': 'Invalid consent type'}, status=status.HTTP_400_BAD_REQUEST ) # Record consent consent_manager = PDPAConsentManager() success = consent_manager.record_consent( user_id=request.user.id, consent_type=consent_enum, consent_given=consent_given, metadata=request.data.get('metadata', {}) ) if success: return JsonResponse({ 'message': 'Consent recorded successfully', 'consent_type': consent_type, 'consent_given': consent_given }) else: return JsonResponse( {'error': 'Consent recording failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) except Exception as e: logger.error(f"PDPA consent error: {e}") return JsonResponse( {'error': 'Consent recording failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['GET']) @permission_classes([IsAuthenticated]) def pdpa_consents_view(request): """ Get user's PDPA consents. """ try: consent_manager = PDPAConsentManager() consents = consent_manager.get_user_consents(request.user.id) return JsonResponse(consents) except Exception as e: logger.error(f"PDPA consents retrieval error: {e}") return JsonResponse( {'error': 'Consents retrieval failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['POST']) @permission_classes([IsAuthenticated]) def pdpa_data_access_view(request): """ Request data access. """ try: format_type = request.data.get('format', 'json') request_handler = PDPARequestHandler() return request_handler.handle_data_access_request(request.user.id, format_type) except Exception as e: logger.error(f"PDPA data access error: {e}") return JsonResponse( {'error': 'Data access request failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['POST']) @permission_classes([IsAuthenticated]) def pdpa_data_deletion_view(request): """ Request data deletion. """ try: confirmation = request.data.get('confirmation', False) request_handler = PDPARequestHandler() return request_handler.handle_data_deletion_request(request.user.id, confirmation) except Exception as e: logger.error(f"PDPA data deletion error: {e}") return JsonResponse( {'error': 'Data deletion request failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['POST']) @permission_classes([IsAuthenticated]) def pdpa_consent_withdrawal_view(request): """ Withdraw consent. """ try: consent_type = request.data.get('consent_type') if not consent_type: return JsonResponse( {'error': 'Consent type required'}, status=status.HTTP_400_BAD_REQUEST ) request_handler = PDPARequestHandler() return request_handler.handle_consent_withdrawal(request.user.id, consent_type) except Exception as e: logger.error(f"PDPA consent withdrawal error: {e}") return JsonResponse( {'error': 'Consent withdrawal failed'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) # PDPA Management Commands class PDPAManagementCommand: """ Management commands for PDPA compliance. """ def apply_retention_policies(self): """ Apply data retention policies. """ try: retention_manager = PDPADataRetentionManager() retention_manager.apply_retention_policies() print("Retention policies applied successfully") except Exception as e: print(f"Error applying retention policies: {e}") def audit_consent_records(self): """ Audit consent records for compliance. """ try: from .models import PDPAConsentRecord # Get consent records without recent updates outdated_records = PDPAConsentRecord.objects.filter( created_at__lt=timezone.now() - timedelta(days=365) ) print(f"Found {outdated_records.count()} outdated consent records") # Get users without consent records users_without_consent = User.objects.exclude( id__in=PDPAConsentRecord.objects.values('user_id') ).count() print(f"Found {users_without_consent} users without consent records") except Exception as e: print(f"Error auditing consent records: {e}") def generate_compliance_report(self): """ Generate PDPA compliance report. """ try: from .models import PDPAConsentRecord, PDPADataRetention report = { 'generated_at': timezone.now().isoformat(), 'total_users': User.objects.count(), 'users_with_consent': PDPAConsentRecord.objects.values('user_id').distinct().count(), 'users_with_retention_policy': PDPADataRetention.objects.count(), 'consent_records': { 'marketing': PDPAConsentRecord.objects.filter(consent_type='marketing').count(), 'analytics': PDPAConsentRecord.objects.filter(consent_type='analytics').count(), 'personalization': PDPAConsentRecord.objects.filter(consent_type='personalization').count(), 'third_party': PDPAConsentRecord.objects.filter(consent_type='third_party').count(), }, 'retention_policies': { 'minimal': PDPADataRetention.objects.filter(retention_period='minimal').count(), 'standard': PDPADataRetention.objects.filter(retention_period='standard').count(), 'extended': PDPADataRetention.objects.filter(retention_period='extended').count(), 'legal_hold': PDPADataRetention.objects.filter(retention_period='legal_hold').count(), } } print("PDPA Compliance Report:") print(json.dumps(report, indent=2)) return report except Exception as e: print(f"Error generating compliance report: {e}") return {}