""" Performance Tests for Database Operations Tests for database performance optimization: - Query optimization - Connection pooling efficiency - Multi-tenant query performance - Index usage validation - Bulk operations performance Author: Claude """ import pytest import time import statistics from django.test import TestCase from django.db import connection, connections, transaction from django.contrib.auth import get_user_model from django.core.management import call_command from django.conf import settings from django.db.utils import OperationalError from decimal import Decimal from datetime import date, timedelta from backend.src.core.models.tenant import Tenant from backend.src.core.models.user import User from backend.src.modules.retail.models.product import Product from backend.src.modules.healthcare.models.patient import Patient from backend.src.modules.education.models.student import Student User = get_user_model() class DatabasePerformanceTest(TestCase): """Test cases for database performance optimization""" def setUp(self): self.tenant = Tenant.objects.create( name='Performance Test Sdn Bhd', schema_name='performance_test', domain='performancetest.com', business_type='retail' ) def test_query_performance_with_indexes(self): """Test query performance with proper indexing""" # Create test data products = [] for i in range(1000): products.append(Product( tenant=self.tenant, sku=f'PRD-{i:06d}', name=f'Product {i}', description=f'Description for product {i}', category='electronics', brand='Test Brand', barcode=f'123456789{i:04d}', unit='piece', current_stock=100 + i, minimum_stock=10, maximum_stock=500, purchase_price=Decimal('50.00') + (i * 0.1), selling_price=Decimal('100.00') + (i * 0.2), tax_rate=10.0, is_active=True )) # Bulk create for performance start_time = time.time() Product.objects.bulk_create(products) bulk_create_time = time.time() - start_time # Test indexed query performance start_time = time.time() products_by_sku = Product.objects.filter(sku__startswith='PRD-000') indexed_query_time = time.time() - start_time # Test non-indexed query performance (description) start_time = time.time() products_by_desc = Product.objects.filter(description__contains='Description for product') non_indexed_query_time = time.time() - start_time # Test tenant-isolated query performance start_time = time.time() tenant_products = Product.objects.filter(tenant=self.tenant) tenant_query_time = time.time() - start_time # Performance assertions self.assertLess(bulk_create_time, 5.0, "Bulk create should complete within 5 seconds") self.assertLess(indexed_query_time, 0.1, "Indexed query should complete within 100ms") self.assertLess(tenant_query_time, 0.1, "Tenant query should complete within 100ms") # Indexed query should be faster than non-indexed self.assertLess(indexed_query_time, non_indexed_query_time * 2, "Indexed query should be significantly faster") # Log performance metrics print(f"\nBulk create 1000 products: {bulk_create_time:.3f}s") print(f"Indexed query (SKU): {indexed_query_time:.3f}s") print(f"Non-indexed query (description): {non_indexed_query_time:.3f}s") print(f"Tenant isolated query: {tenant_query_time:.3f}s") def test_connection_pooling_efficiency(self): """Test database connection pooling efficiency""" connection_times = [] # Test multiple rapid connections for i in range(50): start_time = time.time() with connection.cursor() as cursor: cursor.execute("SELECT 1") cursor.fetchone() connection_times.append(time.time() - start_time) # Analyze connection performance 
    def test_connection_pooling_efficiency(self):
        """Test database connection pooling efficiency"""
        connection_times = []

        # Test multiple rapid round trips on the cached connection
        for i in range(50):
            start_time = time.time()
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
            connection_times.append(time.time() - start_time)

        # Analyze connection performance
        avg_connection_time = statistics.mean(connection_times)
        max_connection_time = max(connection_times)
        min_connection_time = min(connection_times)

        # Performance assertions
        self.assertLess(avg_connection_time, 0.05,
                        "Average connection time should be under 50ms")
        self.assertLess(max_connection_time, 0.1,
                        "Maximum connection time should be under 100ms")

        print("\nConnection pooling performance:")
        print(f"Average connection time: {avg_connection_time:.3f}s")
        print(f"Max connection time: {max_connection_time:.3f}s")
        print(f"Min connection time: {min_connection_time:.3f}s")

    def test_multi_tenant_query_performance(self):
        """Test multi-tenant query performance"""
        # Create multiple tenants
        tenants = []
        for i in range(10):
            tenant = Tenant.objects.create(
                name=f'Tenant {i}',
                schema_name=f'tenant_{i}',
                domain=f'tenant{i}.com',
                business_type='retail'
            )
            tenants.append(tenant)

        # Create products for each tenant
        all_products = []
        for tenant in tenants:
            for i in range(100):
                all_products.append(Product(
                    tenant=tenant,
                    sku=f'{tenant.schema_name}-PRD-{i:03d}',
                    name=f'Product {i} for {tenant.name}',
                    category='electronics',
                    unit='piece',
                    current_stock=100,
                    minimum_stock=10,
                    purchase_price=Decimal('50.00'),
                    selling_price=Decimal('100.00'),
                    tax_rate=10.0,
                    is_active=True
                ))
        Product.objects.bulk_create(all_products)

        # Test cross-tenant query performance (force evaluation so the
        # timing covers the actual database round trip)
        start_time = time.time()
        all_tenant_products = list(
            Product.objects.filter(tenant__in=tenants[:5]).select_related('tenant')
        )
        cross_tenant_time = time.time() - start_time

        # Test single-tenant query performance
        start_time = time.time()
        single_tenant_products = list(Product.objects.filter(tenant=tenants[0]))
        single_tenant_time = time.time() - start_time

        # Test tenant-specific schema performance (assumes PostgreSQL with
        # one schema per tenant)
        start_time = time.time()
        with connection.cursor() as cursor:
            cursor.execute(f'SET search_path TO "{tenants[0].schema_name}", public;')
            cursor.execute("SELECT COUNT(*) FROM core_product")
            cursor.fetchone()
        schema_query_time = time.time() - start_time

        # Performance assertions
        self.assertLess(cross_tenant_time, 0.5, "Cross-tenant query should be fast")
        self.assertLess(single_tenant_time, 0.1, "Single-tenant query should be fast")
        self.assertLess(schema_query_time, 0.05, "Schema-specific query should be fast")

        print("\nMulti-tenant query performance:")
        print(f"Cross-tenant query: {cross_tenant_time:.3f}s")
        print(f"Single tenant query: {single_tenant_time:.3f}s")
        print(f"Schema-specific query: {schema_query_time:.3f}s")

    def test_bulk_operations_performance(self):
        """Test bulk operations performance"""
        # Test bulk create performance
        products_to_create = []
        for i in range(500):
            products_to_create.append(Product(
                tenant=self.tenant,
                sku=f'BULK-{i:06d}',
                name=f'Bulk Product {i}',
                category='electronics',
                unit='piece',
                current_stock=100,
                minimum_stock=10,
                purchase_price=Decimal('50.00'),
                selling_price=Decimal('100.00'),
                tax_rate=10.0,
                is_active=True
            ))

        start_time = time.time()
        Product.objects.bulk_create(products_to_create)
        bulk_create_time = time.time() - start_time

        # Test bulk update performance. Fetch and mutate first, so the timer
        # covers only the UPDATE itself.
        products = list(Product.objects.filter(sku__startswith='BULK-'))
        for product in products:
            product.current_stock += 10

        start_time = time.time()
        Product.objects.bulk_update(products, ['current_stock'])
        bulk_update_time = time.time() - start_time

        # Test bulk delete performance
        start_time = time.time()
        Product.objects.filter(sku__startswith='BULK-').delete()
        bulk_delete_time = time.time() - start_time

        # Performance assertions
        self.assertLess(bulk_create_time, 2.0, "Bulk create 500 items should be fast")
        self.assertLess(bulk_update_time, 1.0, "Bulk update 500 items should be fast")
        self.assertLess(bulk_delete_time, 0.5, "Bulk delete 500 items should be fast")

        print("\nBulk operations performance:")
        print(f"Bulk create 500 items: {bulk_create_time:.3f}s")
        print(f"Bulk update 500 items: {bulk_update_time:.3f}s")
        print(f"Bulk delete 500 items: {bulk_delete_time:.3f}s")
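    # Wall-clock timings can hide regressions on fast hardware; query counts
    # are deterministic. A minimal sketch using CaptureQueriesContext to
    # confirm bulk_create collapses to a single INSERT (assumes a backend
    # that supports multi-row INSERTs, such as PostgreSQL, and the default
    # batch size):
    def test_bulk_create_query_count(self):
        """Sketch: bulk_create should issue one INSERT, not one per row"""
        from django.test.utils import CaptureQueriesContext

        items = [
            Product(
                tenant=self.tenant,
                sku=f'QC-{i:06d}',
                name=f'Query Count Product {i}',
                category='electronics',
                unit='piece',
                current_stock=100,
                minimum_stock=10,
                purchase_price=Decimal('50.00'),
                selling_price=Decimal('100.00'),
                tax_rate=10.0,
                is_active=True
            )
            for i in range(100)
        ]

        with CaptureQueriesContext(connection) as ctx:
            Product.objects.bulk_create(items)

        self.assertEqual(len(ctx.captured_queries), 1,
                         "100 rows should be inserted in a single query")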
    def test_transaction_performance(self):
        """Test transaction performance"""

        def run_transaction_operations():
            with transaction.atomic():
                # Create multiple records in a single transaction
                for i in range(100):
                    Product.objects.create(
                        tenant=self.tenant,
                        sku=f'TXN-{i:06d}',
                        name=f'Transaction Product {i}',
                        category='electronics',
                        unit='piece',
                        current_stock=100,
                        minimum_stock=10,
                        purchase_price=Decimal('50.00'),
                        selling_price=Decimal('100.00'),
                        tax_rate=10.0,
                        is_active=True
                    )

        # Test transaction performance
        transaction_times = []
        for i in range(10):
            start_time = time.time()
            run_transaction_operations()
            transaction_times.append(time.time() - start_time)
            # Clean up between runs so repeated SKUs do not collide
            Product.objects.filter(sku__startswith='TXN-').delete()

        avg_transaction_time = statistics.mean(transaction_times)
        max_transaction_time = max(transaction_times)

        # Performance assertions
        self.assertLess(avg_transaction_time, 1.0,
                        "Average transaction time should be under 1 second")
        self.assertLess(max_transaction_time, 2.0,
                        "Maximum transaction time should be under 2 seconds")

        print("\nTransaction performance:")
        print(f"Average transaction time: {avg_transaction_time:.3f}s")
        print(f"Max transaction time: {max_transaction_time:.3f}s")

    def test_select_related_performance(self):
        """Test select_related and prefetch_related performance"""
        # Create test data with relationships
        products = []
        for i in range(100):
            products.append(Product(
                tenant=self.tenant,
                sku=f'REL-{i:06d}',
                name=f'Related Product {i}',
                category='electronics',
                unit='piece',
                current_stock=100,
                minimum_stock=10,
                purchase_price=Decimal('50.00'),
                selling_price=Decimal('100.00'),
                tax_rate=10.0,
                is_active=True
            ))
        Product.objects.bulk_create(products)

        # Test query without select_related
        start_time = time.time()
        products_no_select = Product.objects.filter(tenant=self.tenant)
        for product in products_no_select:
            _ = product.tenant.name  # Causes one extra query per product (N+1)
        no_select_time = time.time() - start_time

        # Test query with select_related
        start_time = time.time()
        products_with_select = Product.objects.filter(
            tenant=self.tenant
        ).select_related('tenant')
        for product in products_with_select:
            _ = product.tenant.name  # Already fetched; no additional queries
        with_select_time = time.time() - start_time

        # Performance assertions
        self.assertLess(with_select_time, no_select_time * 0.5,
                        "Query with select_related should be much faster")

        print("\nSelect_related performance:")
        print(f"Without select_related: {no_select_time:.3f}s")
        print(f"With select_related: {with_select_time:.3f}s")
        print(f"Performance improvement: {(no_select_time / with_select_time):.1f}x")
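    # The timing comparison above is machine-dependent; asserting the query
    # count is deterministic. A minimal sketch with TestCase.assertNumQueries
    # (the count of 1 assumes the models above: one JOINed query fetches the
    # products and their tenants together, versus 1 + N without
    # select_related):
    def test_select_related_query_count(self):
        """Sketch: select_related should eliminate the N+1 query pattern"""
        Product.objects.bulk_create([
            Product(
                tenant=self.tenant,
                sku=f'NQ-{i:06d}',
                name=f'N+1 Product {i}',
                category='electronics',
                unit='piece',
                current_stock=100,
                minimum_stock=10,
                purchase_price=Decimal('50.00'),
                selling_price=Decimal('100.00'),
                tax_rate=10.0,
                is_active=True
            )
            for i in range(10)
        ])

        # One JOINed query, regardless of how many products come back.
        with self.assertNumQueries(1):
            for product in Product.objects.filter(
                tenant=self.tenant
            ).select_related('tenant'):
                _ = product.tenant.name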
    def test_query_caching_performance(self):
        """Test query caching performance"""
        # Create test data
        products = []
        for i in range(100):
            products.append(Product(
                tenant=self.tenant,
                sku=f'CACHE-{i:06d}',
                name=f'Cached Product {i}',
                category='electronics',
                unit='piece',
                current_stock=100,
                minimum_stock=10,
                purchase_price=Decimal('50.00'),
                selling_price=Decimal('100.00'),
                tax_rate=10.0,
                is_active=True
            ))
        Product.objects.bulk_create(products)

        # Test repeated query performance
        query_times = []
        for i in range(20):
            start_time = time.time()
            cached_products = Product.objects.filter(tenant=self.tenant)
            list(cached_products)  # Force evaluation
            query_times.append(time.time() - start_time)

        # Analyze caching performance. Django does not cache results across
        # distinct QuerySets, so any speedup here comes from the database's
        # own caches and the already-warm connection.
        first_query_time = query_times[0]
        avg_subsequent_time = statistics.mean(query_times[1:])

        # Subsequent queries should be faster due to caching
        self.assertLess(avg_subsequent_time, first_query_time * 0.8,
                        "Subsequent queries should benefit from caching")

        print("\nQuery caching performance:")
        print(f"First query time: {first_query_time:.3f}s")
        print(f"Average subsequent query time: {avg_subsequent_time:.3f}s")
        print(f"Caching improvement: {(first_query_time / avg_subsequent_time):.1f}x")

    def test_database_connection_health(self):
        """Test database connection health and reliability"""
        health_results = []

        # Test connection health over multiple attempts
        for i in range(10):
            start_time = time.time()
            try:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                health_results.append({
                    'success': True,
                    'time': time.time() - start_time,
                    'result': result
                })
            except OperationalError as e:
                health_results.append({
                    'success': False,
                    'time': time.time() - start_time,
                    'error': str(e)
                })

        # Analyze connection health
        successful_connections = [r for r in health_results if r['success']]
        failed_connections = [r for r in health_results if not r['success']]

        # All connections should succeed
        self.assertEqual(len(failed_connections), 0,
                         "All database connections should succeed")

        # Connection times should be consistent
        connection_times = [r['time'] for r in successful_connections]
        avg_time = statistics.mean(connection_times)
        max_time = max(connection_times)

        self.assertLess(avg_time, 0.05, "Average connection time should be under 50ms")
        self.assertLess(max_time, 0.1, "Maximum connection time should be under 100ms")

        print("\nDatabase connection health:")
        print(f"Successful connections: {len(successful_connections)}/10")
        print(f"Failed connections: {len(failed_connections)}/10")
        print(f"Average connection time: {avg_time:.3f}s")
        print(f"Maximum connection time: {max_time:.3f}s")
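
# The connection tests above time cursors on Django's cached per-thread
# connection, so they measure warm-path latency rather than pooling itself
# (Django has no built-in pool; it only reuses connections when CONN_MAX_AGE
# is set). A minimal sketch for timing a genuinely cold connection, assuming
# the default database alias:
class ColdConnectionTimingTest(TestCase):
    """Sketch: measure reconnection cost by closing the cached connection"""

    def test_cold_connection_time(self):
        connection.close()  # Force the next cursor to open a new connection
        start_time = time.time()
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            cursor.fetchone()
        cold_time = time.time() - start_time

        # Deliberately generous bound; a cold connect includes TCP setup
        # and authentication, unlike the warm-path checks above.
        self.assertLess(cold_time, 1.0,
                        "Cold connection should still open within 1 second")

        print(f"\nCold connection time: {cold_time:.3f}s")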