#!/usr/bin/env python3 """ Deployment prerequisites checker for Malaysian SME SaaS platform. Validates that all prerequisites are met before deployment. """ import json import os import sys import subprocess import yaml import argparse from typing import Dict, List, Any, Optional from pathlib import Path class PrerequisiteChecker: """Comprehensive deployment prerequisite checker.""" def __init__(self, environment: str): self.environment = environment self.errors = [] self.warnings = [] self.checks_passed = 0 self.checks_failed = 0 self.checks_total = 0 def check_all(self) -> Dict[str, Any]: """Run all prerequisite checks.""" print(f"šŸ” Checking deployment prerequisites for {self.environment} environment...") print("=" * 60) # Environment and configuration checks self._check_environment_variables() self._check_configuration_files() self._check_database_connectivity() self._check_redis_connectivity() # Build and deployment checks self._check_docker_availability() self._check_docker_compose_availability() self._check_required_images() self._check_disk_space() # Security checks self._check_ssl_certificates() self._check_firewall_configuration() self._check_backup_availability() # Application-specific checks self._check_python_dependencies() self._check_node_dependencies() self._check_migrations_pending() self._check_static_files() # Performance and monitoring self._check_monitoring_tools() self._check_logging_configuration() self._check_resource_limits() # Generate report return self._generate_report() def _check_environment_variables(self) -> None: """Check required environment variables.""" print("\nšŸ“‹ Checking environment variables...") required_vars = { 'production': [ 'SECRET_KEY', 'DATABASE_URL', 'REDIS_URL', 'ALLOWED_HOSTS', 'CSRF_TRUSTED_ORIGINS', 'SENTRY_DSN', 'ROLLBAR_ACCESS_TOKEN', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_S3_BUCKET', 'SSL_CERT_PATH', 'SSL_KEY_PATH', ], 'staging': [ 'SECRET_KEY', 'DATABASE_URL', 'REDIS_URL', 'ALLOWED_HOSTS', 'SENTRY_DSN', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_S3_BUCKET', ] } required = required_vars.get(self.environment, []) self.checks_total += len(required) missing_vars = [] for var in required: if not os.getenv(var): missing_vars.append(var) self.errors.append(f"Missing required environment variable: {var}") else: self.checks_passed += 1 if missing_vars: self.checks_failed += len(missing_vars) print(f"āŒ Missing environment variables: {', '.join(missing_vars)}") else: print("āœ… All required environment variables are set") # Check sensitive variables sensitive_vars = ['SECRET_KEY', 'DATABASE_PASSWORD', 'REDIS_PASSWORD'] for var in sensitive_vars: value = os.getenv(var) if value and len(value) < 32: self.warnings.append(f"Sensitive variable {var} appears to be weak (length < 32)") def _check_configuration_files(self) -> None: """Check required configuration files.""" print("\nšŸ“„ Checking configuration files...") required_files = [ 'docker-compose.prod.yml', 'docker-compose.staging.yml', 'nginx.prod.conf', '.env', 'backend/.env', 'frontend/.env', 'backend/settings/production.py', 'backend/settings/staging.py', ] self.checks_total += len(required_files) missing_files = [] for file_path in required_files: if not Path(file_path).exists(): missing_files.append(file_path) self.errors.append(f"Missing configuration file: {file_path}") else: self.checks_passed += 1 if missing_files: self.checks_failed += len(missing_files) print(f"āŒ Missing configuration files: {', '.join(missing_files)}") else: print("āœ… All required configuration files exist") # Validate configuration file syntax self._validate_yaml_files() self._validate_env_files() def _validate_yaml_files(self) -> None: """Validate YAML configuration files.""" yaml_files = [ 'docker-compose.prod.yml', 'docker-compose.staging.yml', ] for yaml_file in yaml_files: if Path(yaml_file).exists(): try: with open(yaml_file, 'r') as f: yaml.safe_load(f) self.checks_passed += 1 print(f"āœ… {yaml_file} is valid YAML") except yaml.YAMLError as e: self.errors.append(f"Invalid YAML in {yaml_file}: {e}") self.checks_failed += 1 print(f"āŒ {yaml_file} has invalid YAML syntax") self.checks_total += len(yaml_files) def _validate_env_files(self) -> None: """Validate environment file syntax.""" env_files = [ '.env', 'backend/.env', 'frontend/.env', ] for env_file in env_files: if Path(env_file).exists(): try: with open(env_file, 'r') as f: for line_num, line in enumerate(f, 1): line = line.strip() if line and not line.startswith('#'): if '=' not in line: raise ValueError(f"Invalid format at line {line_num}") self.checks_passed += 1 print(f"āœ… {env_file} is valid") except Exception as e: self.errors.append(f"Invalid format in {env_file}: {e}") self.checks_failed += 1 print(f"āŒ {env_file} has invalid format") self.checks_total += len(env_files) def _check_database_connectivity(self) -> None: """Check database connectivity.""" print("\nšŸ—„ļø Checking database connectivity...") self.checks_total += 1 try: import psycopg2 from urllib.parse import urlparse db_url = os.getenv('DATABASE_URL') if not db_url: self.errors.append("DATABASE_URL environment variable not set") self.checks_failed += 1 print("āŒ DATABASE_URL not configured") return parsed = urlparse(db_url) conn = psycopg2.connect( host=parsed.hostname, port=parsed.port or 5432, database=parsed.path[1:], user=parsed.username, password=parsed.password, connect_timeout=10 ) # Check database version with conn.cursor() as cursor: cursor.execute("SELECT version()") version = cursor.fetchone()[0] print(f"āœ… Database connected: PostgreSQL {version}") # Check database size with conn.cursor() as cursor: cursor.execute("SELECT pg_size_pretty(pg_database_size(current_database()))") size = cursor.fetchone()[0] print(f"šŸ“Š Database size: {size}") conn.close() self.checks_passed += 1 except ImportError: self.warnings.append("psycopg2 not available - skipping database check") print("āš ļø psycopg2 not available - skipping database check") except Exception as e: self.errors.append(f"Database connectivity failed: {e}") self.checks_failed += 1 print(f"āŒ Database connectivity failed: {e}") def _check_redis_connectivity(self) -> None: """Check Redis connectivity.""" print("\nšŸ”“ Checking Redis connectivity...") self.checks_total += 1 try: import redis from urllib.parse import urlparse redis_url = os.getenv('REDIS_URL') if not redis_url: self.errors.append("REDIS_URL environment variable not set") self.checks_failed += 1 print("āŒ REDIS_URL not configured") return parsed = urlparse(redis_url) r = redis.Redis( host=parsed.hostname, port=parsed.port or 6379, password=parsed.password, socket_timeout=10 ) # Test Redis connection info = r.info() print(f"āœ… Redis connected: version {info.get('redis_version', 'unknown')}") # Check memory usage used_memory = info.get('used_memory_human', 'unknown') print(f"šŸ“Š Redis memory usage: {used_memory}") r.close() self.checks_passed += 1 except ImportError: self.warnings.append("redis not available - skipping Redis check") print("āš ļø redis not available - skipping Redis check") except Exception as e: self.errors.append(f"Redis connectivity failed: {e}") self.checks_failed += 1 print(f"āŒ Redis connectivity failed: {e}") def _check_docker_availability(self) -> None: """Check Docker availability.""" print("\n🐳 Checking Docker availability...") self.checks_total += 1 try: result = subprocess.run(['docker', '--version'], capture_output=True, text=True) if result.returncode == 0: version = result.stdout.strip() print(f"āœ… Docker available: {version}") self.checks_passed += 1 # Check Docker daemon try: subprocess.run(['docker', 'info'], capture_output=True, timeout=10) print("āœ… Docker daemon is running") except subprocess.TimeoutExpired: self.warnings.append("Docker daemon response time is slow") except Exception as e: self.errors.append(f"Docker daemon not running: {e}") self.checks_failed += 1 return else: self.errors.append(f"Docker not available: {result.stderr}") self.checks_failed += 1 print("āŒ Docker not available") except FileNotFoundError: self.errors.append("Docker not found in PATH") self.checks_failed += 1 print("āŒ Docker not found in PATH") except Exception as e: self.errors.append(f"Docker check failed: {e}") self.checks_failed += 1 print(f"āŒ Docker check failed: {e}") def _check_docker_compose_availability(self) -> None: """Check Docker Compose availability.""" print("\n🐳 Checking Docker Compose availability...") self.checks_total += 1 # Try docker-compose first, then docker compose compose_cmd = None for cmd in ['docker-compose', 'docker compose']: try: result = subprocess.run([cmd, '--version'], capture_output=True, text=True) if result.returncode == 0: compose_cmd = cmd version = result.stdout.strip() print(f"āœ… Docker Compose available: {version}") self.checks_passed += 1 break except (FileNotFoundError, subprocess.SubprocessError): continue if not compose_cmd: self.errors.append("Docker Compose not available") self.checks_failed += 1 print("āŒ Docker Compose not available") def _check_required_images(self) -> None: """Check required Docker images.""" print("\nšŸ“¦ Checking required Docker images...") required_images = [ 'postgres:15', 'redis:7-alpine', 'nginx:alpine', ] self.checks_total += len(required_images) for image in required_images: try: result = subprocess.run(['docker', 'images', '-q', image], capture_output=True, text=True) if result.stdout.strip(): print(f"āœ… {image} is available locally") self.checks_passed += 1 else: print(f"āš ļø {image} not available locally (will be pulled during deployment)") self.warnings.append(f"Docker image {image} not available locally") except Exception as e: self.errors.append(f"Failed to check image {image}: {e}") self.checks_failed += 1 print(f"āŒ Failed to check image {image}") def _check_disk_space(self) -> None: """Check available disk space.""" print("\nšŸ’¾ Checking disk space...") self.checks_total += 1 try: result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True) if result.returncode == 0: lines = result.stdout.strip().split('\n') if len(lines) > 1: parts = lines[1].split() if len(parts) >= 5: usage_percent = parts[4].replace('%', '') available = parts[3] print(f"šŸ“Š Disk usage: {usage_percent}%, Available: {available}") if int(usage_percent) > 80: self.warnings.append(f"Disk usage is high: {usage_percent}%") elif int(usage_percent) > 90: self.errors.append(f"Disk usage is critical: {usage_percent}%") self.checks_failed += 1 return self.checks_passed += 1 else: self.errors.append(f"Failed to check disk space: {result.stderr}") self.checks_failed += 1 except Exception as e: self.errors.append(f"Disk space check failed: {e}") self.checks_failed += 1 def _check_ssl_certificates(self) -> None: """Check SSL certificates.""" print("\nšŸ”’ Checking SSL certificates...") cert_path = os.getenv('SSL_CERT_PATH') key_path = os.getenv('SSL_KEY_PATH') if cert_path and key_path: self.checks_total += 2 if Path(cert_path).exists(): print(f"āœ… SSL certificate found: {cert_path}") self.checks_passed += 1 else: self.errors.append(f"SSL certificate not found: {cert_path}") self.checks_failed += 1 if Path(key_path).exists(): print(f"āœ… SSL key found: {key_path}") self.checks_passed += 1 else: self.errors.append(f"SSL key not found: {key_path}") self.checks_failed += 1 else: self.warnings.append("SSL certificate paths not configured") print("āš ļø SSL certificate paths not configured") def _check_firewall_configuration(self) -> None: """Check firewall configuration.""" print("\nšŸ›”ļø Checking firewall configuration...") self.checks_total += 1 required_ports = [80, 443, 5432, 6379] open_ports = [] try: # Check if ufw is available result = subprocess.run(['ufw', 'status'], capture_output=True, text=True) if result.returncode == 0: if 'Status: active' in result.stdout: print("āœ… UFW firewall is active") self.checks_passed += 1 else: self.warnings.append("UFW firewall is not active") print("āš ļø UFW firewall is not active") else: # Try checking iptables result = subprocess.run(['iptables', '-L'], capture_output=True, text=True) if result.returncode == 0: print("āœ… iptables is available") self.checks_passed += 1 else: self.warnings.append("Could not verify firewall configuration") print("āš ļø Could not verify firewall configuration") except FileNotFoundError: self.warnings.append("Firewall management tools not found") print("āš ļø Firewall management tools not found") except Exception as e: self.errors.append(f"Firewall check failed: {e}") self.checks_failed += 1 def _check_backup_availability(self) -> None: """Check backup availability.""" print("\nšŸ’¾ Checking backup availability...") self.checks_total += 1 backup_dir = Path('./backups') if backup_dir.exists(): backup_files = list(backup_dir.glob('*.sql')) if backup_files: latest_backup = max(backup_files, key=lambda x: x.stat().st_mtime) print(f"āœ… Found {len(backup_files)} backup files") print(f"šŸ“„ Latest backup: {latest_backup.name}") self.checks_passed += 1 else: self.warnings.append("No backup files found") print("āš ļø No backup files found") else: self.warnings.append("Backup directory not found") print("āš ļø Backup directory not found") def _check_python_dependencies(self) -> None: """Check Python dependencies.""" print("\nšŸ Checking Python dependencies...") self.checks_total += 1 requirements_files = [ 'requirements.txt', 'requirements-dev.txt', ] missing_files = [] for req_file in requirements_files: if not Path(req_file).exists(): missing_files.append(req_file) if missing_files: self.errors.append(f"Missing requirements files: {', '.join(missing_files)}") self.checks_failed += 1 print(f"āŒ Missing requirements files: {', '.join(missing_files)}") else: print("āœ… All Python requirements files exist") self.checks_passed += 1 def _check_node_dependencies(self) -> None: """Check Node.js dependencies.""" print("\nšŸ“¦ Checking Node.js dependencies...") self.checks_total += 1 if Path('frontend/package.json').exists(): if Path('frontend/package-lock.json').exists(): print("āœ… Node.js dependencies are locked") self.checks_passed += 1 else: self.warnings.append("package-lock.json not found") print("āš ļø package-lock.json not found") else: self.warnings.append("Frontend package.json not found") print("āš ļø Frontend package.json not found") def _check_migrations_pending(self) -> None: """Check for pending database migrations.""" print("\nšŸ”„ Checking for pending migrations...") self.checks_total += 1 try: result = subprocess.run([ 'python', 'backend/manage.py', 'showmigrations', '--plan' ], capture_output=True, text=True) if result.returncode == 0: migrations = [line for line in result.stdout.split('\n') if '[ ]' in line] if migrations: print(f"āš ļø Found {len(migrations)} pending migrations") self.warnings.append(f"Found {len(migrations)} pending database migrations") else: print("āœ… No pending migrations") self.checks_passed += 1 else: self.errors.append(f"Failed to check migrations: {result.stderr}") self.checks_failed += 1 except Exception as e: self.errors.append(f"Migration check failed: {e}") self.checks_failed += 1 def _check_static_files(self) -> None: """Check static files configuration.""" print("\nšŸ“„ Checking static files...") self.checks_total += 1 static_dirs = [ 'backend/static', 'frontend/build/static', ] existing_dirs = [] for static_dir in static_dirs: if Path(static_dir).exists(): existing_dirs.append(static_dir) if existing_dirs: print(f"āœ… Found static directories: {', '.join(existing_dirs)}") self.checks_passed += 1 else: self.warnings.append("No static directories found") print("āš ļø No static directories found") def _check_monitoring_tools(self) -> None: """Check monitoring tools configuration.""" print("\nšŸ“Š Checking monitoring tools...") self.checks_total += 3 # Check Prometheus prometheus_config = Path('./monitoring/prometheus.yml') if prometheus_config.exists(): print("āœ… Prometheus configuration found") self.checks_passed += 1 else: self.warnings.append("Prometheus configuration not found") print("āš ļø Prometheus configuration not found") # Check Grafana grafana_dir = Path('./monitoring/grafana') if grafana_dir.exists(): print("āœ… Grafana configuration found") self.checks_passed += 1 else: self.warnings.append("Grafana configuration not found") print("āš ļø Grafana configuration not found") # Check logging configuration log_config = Path('./monitoring/logstash/pipeline') if log_config.exists(): print("āœ… Logging configuration found") self.checks_passed += 1 else: self.warnings.append("Logging configuration not found") print("āš ļø Logging configuration not found") def _check_logging_configuration(self) -> None: """Check logging configuration.""" print("\nšŸ“ Checking logging configuration...") self.checks_total += 1 log_dir = Path('./logs') if not log_dir.exists(): try: log_dir.mkdir(parents=True) print("āœ… Created logs directory") self.checks_passed += 1 except Exception as e: self.errors.append(f"Failed to create logs directory: {e}") self.checks_failed += 1 else: print("āœ… Logs directory exists") self.checks_passed += 1 def _check_resource_limits(self) -> None: """Check system resource limits.""" print("\n⚔ Checking system resource limits...") self.checks_total += 2 # Check file descriptor limit try: import resource fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0] print(f"šŸ“Š File descriptor limit: {fd_limit}") if fd_limit < 65536: self.warnings.append(f"File descriptor limit is low: {fd_limit}") else: self.checks_passed += 1 except ImportError: self.warnings.append("resource module not available - skipping file descriptor check") print("āš ļø resource module not available - skipping file descriptor check") # Check memory limits try: with open('/proc/meminfo', 'r') as f: meminfo = f.read() total_mem = None for line in meminfo.split('\n'): if 'MemTotal:' in line: total_mem = int(line.split()[1]) break if total_mem: total_mem_gb = total_mem / 1024 / 1024 print(f"šŸ“Š Total memory: {total_mem_gb:.1f} GB") if total_mem_gb < 4: self.warnings.append(f"Available memory is low: {total_mem_gb:.1f} GB") else: self.checks_passed += 1 except FileNotFoundError: self.warnings.append("Could not read memory information") print("āš ļø Could not read memory information") def _generate_report(self) -> Dict[str, Any]: """Generate comprehensive report.""" print("\n" + "=" * 60) print("šŸ“‹ DEPLOYMENT PREREQUISITES REPORT") print("=" * 60) # Summary statistics total_checks = self.checks_passed + self.checks_failed success_rate = (self.checks_passed / total_checks * 100) if total_checks > 0 else 0 print(f"\nšŸ“Š Summary:") print(f"āœ… Passed: {self.checks_passed}/{total_checks} ({success_rate:.1f}%)") print(f"āŒ Failed: {self.checks_failed}") print(f"āš ļø Warnings: {len(self.warnings)}") # Detailed errors if self.errors: print(f"\nāŒ Errors ({len(self.errors)}):") for i, error in enumerate(self.errors, 1): print(f" {i}. {error}") # Warnings if self.warnings: print(f"\nāš ļø Warnings ({len(self.warnings)}):") for i, warning in enumerate(self.warnings, 1): print(f" {i}. {warning}") # Overall assessment print(f"\nšŸŽÆ Overall Assessment:") if self.checks_failed == 0: if success_rate >= 90: print("āœ… Ready for deployment") deployment_ready = True else: print("āš ļø Mostly ready - consider addressing warnings") deployment_ready = True else: print("āŒ Not ready for deployment - fix errors first") deployment_ready = False # Generate report data report = { 'environment': self.environment, 'timestamp': str(datetime.now()), 'summary': { 'total_checks': total_checks, 'passed': self.checks_passed, 'failed': self.checks_failed, 'warnings': len(self.warnings), 'success_rate': success_rate, }, 'deployment_ready': deployment_ready, 'errors': self.errors, 'warnings': self.warnings, } # Save report to file report_file = f'deployment-prerequisites-{self.environment}-{datetime.now().strftime("%Y%m%d-%H%M%S")}.json' with open(report_file, 'w') as f: json.dump(report, f, indent=2) print(f"\nšŸ“„ Report saved to: {report_file}") return report def main(): """Main function.""" parser = argparse.ArgumentParser(description='Check deployment prerequisites') parser.add_argument('--environment', '-e', required=True, choices=['staging', 'production'], help='Target environment') parser.add_argument('--output', '-o', help='Output file for report') parser.add_argument('--quiet', '-q', action='store_true', help='Quiet mode') args = parser.parse_args() if not args.quiet: print("šŸš€ Malaysian SME SaaS Platform - Deployment Prerequisites Checker") print("=" * 70) checker = PrerequisiteChecker(args.environment) report = checker.check_all() if args.output: with open(args.output, 'w') as f: json.dump(report, f, indent=2) # Exit with appropriate code sys.exit(0 if report['deployment_ready'] else 1) if __name__ == '__main__': from datetime import datetime main()