Files
multitenetsaas/backend/tests/performance/test_database_performance.py
AHMET YILMAZ b3fff546e9
Some checks failed
System Monitoring / Health Checks (push) Has been cancelled
System Monitoring / Performance Monitoring (push) Has been cancelled
System Monitoring / Database Monitoring (push) Has been cancelled
System Monitoring / Cache Monitoring (push) Has been cancelled
System Monitoring / Log Monitoring (push) Has been cancelled
System Monitoring / Resource Monitoring (push) Has been cancelled
System Monitoring / Uptime Monitoring (push) Has been cancelled
System Monitoring / Backup Monitoring (push) Has been cancelled
System Monitoring / Security Monitoring (push) Has been cancelled
System Monitoring / Monitoring Dashboard (push) Has been cancelled
System Monitoring / Alerting (push) Has been cancelled
Security Scanning / Dependency Scanning (push) Has been cancelled
Security Scanning / Code Security Scanning (push) Has been cancelled
Security Scanning / Secrets Scanning (push) Has been cancelled
Security Scanning / Container Security Scanning (push) Has been cancelled
Security Scanning / Compliance Checking (push) Has been cancelled
Security Scanning / Security Dashboard (push) Has been cancelled
Security Scanning / Security Remediation (push) Has been cancelled
project initialization
2025-10-05 02:37:33 +08:00

418 lines
16 KiB
Python

"""
Performance Tests for Database Operations
Tests for database performance optimization:
- Query optimization
- Connection pooling efficiency
- Multi-tenant query performance
- Index usage validation
- Bulk operations performance
Author: Claude
"""
import pytest
import time
import statistics
from django.test import TestCase
from django.db import connection, connections, transaction
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.conf import settings
from django.db.utils import OperationalError
from decimal import Decimal
from datetime import date, timedelta
from backend.src.core.models.tenant import Tenant
from backend.src.core.models.user import User
from backend.src.modules.retail.models.product import Product
from backend.src.modules.healthcare.models.patient import Patient
from backend.src.modules.education.models.student import Student
# Rebind User to the project's active user model; this replaces the User
# name imported directly from backend.src.core.models.user above.
User = get_user_model()
class DatabasePerformanceTest(TestCase):
    """Test cases for database performance optimization.

    Each test creates its own rows, times an operation with
    ``time.perf_counter`` and asserts the elapsed wall-clock time against a
    fixed budget. The measured values are printed for inspection.
    """

    def setUp(self):
        """Create the tenant that owns the products built by the tests."""
        self.tenant = Tenant.objects.create(
            name='Performance Test Sdn Bhd',
            schema_name='performance_test',
            domain='performancetest.com',
            business_type='retail'
        )

    @staticmethod
    def _elapsed(operation):
        """Return the wall-clock seconds spent calling ``operation()``."""
        started = time.perf_counter()
        operation()
        return time.perf_counter() - started

    @staticmethod
    def _ratio(baseline, improved):
        """Return ``baseline / improved``, or ``inf`` when ``improved`` is 0."""
        # A very fast operation can measure as exactly 0.0 seconds.
        return baseline / improved if improved > 0 else float('inf')

    def _product_fields(self, sku, name, tenant=None, **overrides):
        """Return the Product keyword arguments shared by the tests.

        Args:
            sku: Value for the ``sku`` field.
            name: Value for the ``name`` field.
            tenant: Owning tenant; defaults to ``self.tenant``.
            **overrides: Additional or replacement field values.

        Returns:
            A dict suitable for ``Product(**fields)`` or
            ``Product.objects.create(**fields)``.
        """
        fields = {
            'tenant': self.tenant if tenant is None else tenant,
            'sku': sku,
            'name': name,
            'category': 'electronics',
            'unit': 'piece',
            'current_stock': 100,
            'minimum_stock': 10,
            'purchase_price': Decimal('50.00'),
            'selling_price': Decimal('100.00'),
            'tax_rate': 10.0,
            'is_active': True,
        }
        fields.update(overrides)
        return fields

    def test_query_performance_with_indexes(self):
        """Test query performance with proper indexing"""
        # Create test data
        products = [
            Product(**self._product_fields(
                sku=f'PRD-{i:06d}',
                name=f'Product {i}',
                description=f'Description for product {i}',
                brand='Test Brand',
                barcode=f'123456789{i:04d}',
                current_stock=100 + i,
                maximum_stock=500,
                # Decimal + float raises TypeError, so the per-item step is
                # a Decimal scaled by the integer index.
                purchase_price=Decimal('50.00') + Decimal('0.1') * i,
                selling_price=Decimal('100.00') + Decimal('0.2') * i,
            ))
            for i in range(1000)
        ]

        # Bulk create for performance
        bulk_create_time = self._elapsed(
            lambda: Product.objects.bulk_create(products)
        )

        # QuerySets are lazy: list() forces the SQL to run inside the timed
        # section, otherwise only QuerySet construction would be measured.
        # NOTE(review): the test presumes sku is indexed and description is
        # not - confirm against the Product model.
        indexed_query_time = self._elapsed(
            lambda: list(Product.objects.filter(sku__startswith='PRD-000'))
        )
        non_indexed_query_time = self._elapsed(
            lambda: list(Product.objects.filter(
                description__contains='Description for product'
            ))
        )
        # Test tenant-isolated query performance
        tenant_query_time = self._elapsed(
            lambda: list(Product.objects.filter(tenant=self.tenant))
        )

        # Performance assertions
        self.assertLess(bulk_create_time, 5.0, "Bulk create should complete within 5 seconds")
        self.assertLess(indexed_query_time, 0.1, "Indexed query should complete within 100ms")
        self.assertLess(tenant_query_time, 0.1, "Tenant query should complete within 100ms")
        # Indexed query should be faster than non-indexed
        self.assertLess(indexed_query_time, non_indexed_query_time * 2,
                        "Indexed query should be significantly faster")

        # Log performance metrics
        print(f"\nBulk create 1000 products: {bulk_create_time:.3f}s")
        print(f"Indexed query (SKU): {indexed_query_time:.3f}s")
        print(f"Non-indexed query (description): {non_indexed_query_time:.3f}s")
        print(f"Tenant isolated query: {tenant_query_time:.3f}s")

    def test_connection_pooling_efficiency(self):
        """Test database connection pooling efficiency"""
        def round_trip():
            # Opens a cursor on Django's default connection and runs a
            # trivial statement.
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()

        # Test multiple rapid connections
        connection_times = [self._elapsed(round_trip) for _ in range(50)]

        # Analyze connection performance
        avg_connection_time = statistics.mean(connection_times)
        max_connection_time = max(connection_times)
        min_connection_time = min(connection_times)

        # Performance assertions
        self.assertLess(avg_connection_time, 0.05,
                        "Average connection time should be under 50ms")
        self.assertLess(max_connection_time, 0.1,
                        "Maximum connection time should be under 100ms")

        print("\nConnection pooling performance:")
        print(f"Average connection time: {avg_connection_time:.3f}s")
        print(f"Max connection time: {max_connection_time:.3f}s")
        print(f"Min connection time: {min_connection_time:.3f}s")

    def test_multi_tenant_query_performance(self):
        """Test multi-tenant query performance"""
        # Create multiple tenants
        tenants = [
            Tenant.objects.create(
                name=f'Tenant {i}',
                schema_name=f'tenant_{i}',
                domain=f'tenant{i}.com',
                business_type='retail'
            )
            for i in range(10)
        ]

        # Create products for each tenant
        all_products = [
            Product(**self._product_fields(
                sku=f'{tenant.schema_name}-PRD-{i:03d}',
                name=f'Product {i} for {tenant.name}',
                tenant=tenant,
            ))
            for tenant in tenants
            for i in range(100)
        ]
        Product.objects.bulk_create(all_products)

        # list() forces evaluation so the timings include the actual SQL.
        cross_tenant_time = self._elapsed(
            lambda: list(
                Product.objects.filter(
                    tenant__in=tenants[:5]
                ).select_related('tenant')
            )
        )
        single_tenant_time = self._elapsed(
            lambda: list(Product.objects.filter(tenant=tenants[0]))
        )

        def count_in_tenant_schema():
            # NOTE(review): the search_path is not restored afterwards and
            # the table name core_product is taken as given - confirm both
            # against the tenant schema setup.
            with connection.cursor() as cursor:
                cursor.execute(f'SET search_path TO "{tenants[0].schema_name}", public;')
                cursor.execute("SELECT COUNT(*) FROM core_product")
                cursor.fetchone()

        # Test tenant-specific schema performance
        schema_query_time = self._elapsed(count_in_tenant_schema)

        # Performance assertions
        self.assertLess(cross_tenant_time, 0.5, "Cross-tenant query should be fast")
        self.assertLess(single_tenant_time, 0.1, "Single tenant query should be fast")
        self.assertLess(schema_query_time, 0.05, "Schema-specific query should be fast")

        print("\nMulti-tenant query performance:")
        print(f"Cross-tenant query: {cross_tenant_time:.3f}s")
        print(f"Single tenant query: {single_tenant_time:.3f}s")
        print(f"Schema-specific query: {schema_query_time:.3f}s")

    def test_bulk_operations_performance(self):
        """Test bulk operations performance"""
        # Test bulk create performance
        products_to_create = [
            Product(**self._product_fields(
                sku=f'BULK-{i:06d}',
                name=f'Bulk Product {i}',
            ))
            for i in range(500)
        ]
        bulk_create_time = self._elapsed(
            lambda: Product.objects.bulk_create(products_to_create)
        )

        # Materialise the rows once so the instances mutated below are the
        # same objects that are handed to bulk_update().
        products = list(Product.objects.filter(sku__startswith='BULK-'))
        for product in products:
            product.current_stock += 10

        # Test bulk update performance
        bulk_update_time = self._elapsed(
            lambda: Product.objects.bulk_update(products, ['current_stock'])
        )

        # Test bulk delete performance
        bulk_delete_time = self._elapsed(
            lambda: Product.objects.filter(sku__startswith='BULK-').delete()
        )

        # Performance assertions
        self.assertLess(bulk_create_time, 2.0, "Bulk create 500 items should be fast")
        self.assertLess(bulk_update_time, 1.0, "Bulk update 500 items should be fast")
        self.assertLess(bulk_delete_time, 0.5, "Bulk delete 500 items should be fast")

        print("\nBulk operations performance:")
        print(f"Bulk create 500 items: {bulk_create_time:.3f}s")
        print(f"Bulk update 500 items: {bulk_update_time:.3f}s")
        print(f"Bulk delete 500 items: {bulk_delete_time:.3f}s")

    def test_transaction_performance(self):
        """Test transaction performance"""
        batch_size = 100

        def create_batch(run):
            # Create multiple records in a single transaction. The SKU
            # number is offset by the run index so repeated runs never
            # reuse a SKU, whether or not sku is declared unique.
            with transaction.atomic():
                for i in range(batch_size):
                    Product.objects.create(**self._product_fields(
                        sku=f'TXN-{run * batch_size + i:06d}',
                        name=f'Transaction Product {i}',
                    ))

        # Test transaction performance
        transaction_times = [
            self._elapsed(lambda run=run: create_batch(run))
            for run in range(10)
        ]

        # Clean up
        Product.objects.filter(sku__startswith='TXN-').delete()

        avg_transaction_time = statistics.mean(transaction_times)
        max_transaction_time = max(transaction_times)

        # Performance assertions
        self.assertLess(avg_transaction_time, 1.0,
                        "Average transaction time should be under 1 second")
        self.assertLess(max_transaction_time, 2.0,
                        "Maximum transaction time should be under 2 seconds")

        print("\nTransaction performance:")
        print(f"Average transaction time: {avg_transaction_time:.3f}s")
        print(f"Max transaction time: {max_transaction_time:.3f}s")

    def test_select_related_performance(self):
        """Test select_related and prefetch_related performance"""
        # Create test data with relationships
        Product.objects.bulk_create([
            Product(**self._product_fields(
                sku=f'REL-{i:06d}',
                name=f'Related Product {i}',
            ))
            for i in range(100)
        ])

        def touch_tenants(queryset):
            # Reading product.tenant triggers a per-row lookup unless the
            # tenant was loaded together with the product.
            for product in queryset:
                _ = product.tenant.name

        # Test query without select_related
        no_select_time = self._elapsed(
            lambda: touch_tenants(Product.objects.filter(tenant=self.tenant))
        )
        # Test query with select_related
        with_select_time = self._elapsed(
            lambda: touch_tenants(
                Product.objects.filter(
                    tenant=self.tenant
                ).select_related('tenant')
            )
        )

        # Deterministic check of what select_related is for: the tenant is
        # fetched by the same single query as the products.
        with self.assertNumQueries(1):
            touch_tenants(
                Product.objects.filter(
                    tenant=self.tenant
                ).select_related('tenant')
            )

        # Performance assertions
        self.assertLess(with_select_time, no_select_time * 0.5,
                        "Query with select_related should be much faster")

        print("\nSelect_related performance:")
        print(f"Without select_related: {no_select_time:.3f}s")
        print(f"With select_related: {with_select_time:.3f}s")
        print(f"Performance improvement: {self._ratio(no_select_time, with_select_time):.1f}x")

    def test_query_caching_performance(self):
        """Test query caching performance"""
        # Create test data
        Product.objects.bulk_create([
            Product(**self._product_fields(
                sku=f'CACHE-{i:06d}',
                name=f'Cached Product {i}',
            ))
            for i in range(100)
        ])

        # Test repeated query performance; list() forces evaluation.
        query_times = [
            self._elapsed(
                lambda: list(Product.objects.filter(tenant=self.tenant))
            )
            for _ in range(20)
        ]

        # Analyze caching performance
        first_query_time = query_times[0]
        avg_subsequent_time = statistics.mean(query_times[1:])

        # NOTE(review): every iteration builds a fresh QuerySet, so none of
        # them reuses another's result cache; any speed-up comes from lower
        # layers. Confirm the 0.8 factor is stable in CI.
        self.assertLess(avg_subsequent_time, first_query_time * 0.8,
                        "Subsequent queries should benefit from caching")

        print("\nQuery caching performance:")
        print(f"First query time: {first_query_time:.3f}s")
        print(f"Average subsequent query time: {avg_subsequent_time:.3f}s")
        print(f"Caching improvement: {self._ratio(first_query_time, avg_subsequent_time):.1f}x")

    def test_database_connection_health(self):
        """Test database connection health and reliability"""
        attempts = 10
        health_results = []

        # Test connection health over multiple attempts
        for _ in range(attempts):
            started = time.perf_counter()
            try:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
            except OperationalError as exc:
                health_results.append({
                    'success': False,
                    'time': time.perf_counter() - started,
                    'error': str(exc)
                })
            else:
                health_results.append({
                    'success': True,
                    'time': time.perf_counter() - started,
                    'result': result
                })

        # Analyze connection health
        successful_connections = [r for r in health_results if r['success']]
        failed_connections = [r for r in health_results if not r['success']]

        # All connections should succeed; this also guarantees the timing
        # list below is non-empty before statistics.mean() is called.
        self.assertEqual(len(failed_connections), 0,
                         "All database connections should succeed")

        # Connection times should be consistent
        connection_times = [r['time'] for r in successful_connections]
        avg_time = statistics.mean(connection_times)
        max_time = max(connection_times)
        self.assertLess(avg_time, 0.05, "Average connection time should be under 50ms")
        self.assertLess(max_time, 0.1, "Maximum connection time should be under 100ms")

        print("\nDatabase connection health:")
        print(f"Successful connections: {len(successful_connections)}/{attempts}")
        print(f"Failed connections: {len(failed_connections)}/{attempts}")
        print(f"Average connection time: {avg_time:.3f}s")
        print(f"Maximum connection time: {max_time:.3f}s")