Some checks failed
System Monitoring / Health Checks (push) Has been cancelled
System Monitoring / Performance Monitoring (push) Has been cancelled
System Monitoring / Database Monitoring (push) Has been cancelled
System Monitoring / Cache Monitoring (push) Has been cancelled
System Monitoring / Log Monitoring (push) Has been cancelled
System Monitoring / Resource Monitoring (push) Has been cancelled
System Monitoring / Uptime Monitoring (push) Has been cancelled
System Monitoring / Backup Monitoring (push) Has been cancelled
System Monitoring / Security Monitoring (push) Has been cancelled
System Monitoring / Monitoring Dashboard (push) Has been cancelled
System Monitoring / Alerting (push) Has been cancelled
Security Scanning / Dependency Scanning (push) Has been cancelled
Security Scanning / Code Security Scanning (push) Has been cancelled
Security Scanning / Secrets Scanning (push) Has been cancelled
Security Scanning / Container Security Scanning (push) Has been cancelled
Security Scanning / Compliance Checking (push) Has been cancelled
Security Scanning / Security Dashboard (push) Has been cancelled
Security Scanning / Security Remediation (push) Has been cancelled
846 lines
35 KiB
Python
846 lines
35 KiB
Python
"""
|
|
Load Testing for Multi-Tenant Scenarios
|
|
|
|
Comprehensive load testing for:
|
|
- Concurrent tenant operations
|
|
- Database connection pooling under load
|
|
- Schema isolation performance
|
|
- Resource usage optimization
|
|
- Scalability testing
|
|
|
|
Author: Claude
|
|
"""
|
|
|
|
import queue
import random
import statistics
import threading
import time
from datetime import datetime, timedelta
from decimal import Decimal

import pytest
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.db import connection, connections, transaction
from django.test import TestCase, TransactionTestCase
from django.utils import timezone

from backend.src.core.models.tenant import Tenant
from backend.src.core.models.user import User
from backend.src.core.models.subscription import Subscription
from backend.src.modules.retail.models.product import Product
from backend.src.modules.healthcare.models.patient import Patient
from backend.src.modules.education.models.student import Student
from backend.src.modules.logistics.models.shipment import Shipment
from backend.src.modules.beauty.models.client import Client
|
|
|
|
# Rebind ``User`` to the model returned by ``get_user_model()``. This replaces
# the ``User`` name imported from ``backend.src.core.models.user`` above, so
# every later reference in this module uses the configured user model.
User = get_user_model()
|
|
|
|
|
|
class MultiTenantLoadTest(TransactionTestCase):
    """Load tests for multi-tenant scenarios.

    Every test fans work out to threads, and Django gives each thread its own
    database connection. ``TestCase`` wraps each test in a transaction that is
    never committed, so fixture rows created in ``setUp`` would be invisible
    to (and could block) the worker connections. ``TransactionTestCase``
    commits fixture data and cleans the tables after each test instead, which
    lets the workers operate on the rows ``setUp`` created.
    """

    TENANT_COUNT = 20
    USERS_PER_TENANT = 5
    # Assigned round-robin so every module always has tenants to exercise;
    # a purely random pick could leave a business type with no tenant at all.
    BUSINESS_TYPES = ('retail', 'healthcare', 'education', 'logistics', 'beauty')

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _random_money(low, high):
        """Return a random amount in [low, high] with exactly two decimal places.

        Built from a formatted string because ``Decimal(float)`` would carry
        the float's full binary expansion into the value.
        """
        return Decimal(f'{random.uniform(low, high):.2f}')

    @staticmethod
    def _random_ic_number():
        """Return a random identity-card style string (``NNNNNN-NN-NNNN``)."""
        return (
            f'{random.randint(500101, 991231):02d}-'
            f'{random.randint(10, 99):02d}-'
            f'{random.randint(1000, 9999):04d}'
        )

    @staticmethod
    def _set_search_path(cursor, tenant):
        """Point ``cursor``'s session at the tenant's schema, then ``public``.

        The schema name comes from the fixtures created in ``setUp``, not from
        external input, so it is interpolated directly.
        """
        cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')

    @staticmethod
    def _db_thread(target, *args):
        """Build a thread that runs ``target(*args)`` and then closes the
        database connections that thread opened, so they are not leaked."""
        def runner():
            try:
                target(*args)
            finally:
                connections.close_all()

        return threading.Thread(target=runner)

    @staticmethod
    def _run_threads(threads):
        """Start every thread, wait for all of them, return elapsed seconds."""
        started = time.perf_counter()
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return time.perf_counter() - started

    @staticmethod
    def _drain(source_queue):
        """Return every item currently in ``source_queue`` as a list."""
        items = []
        while True:
            try:
                items.append(source_queue.get_nowait())
            except queue.Empty:
                return items

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    def setUp(self):
        """Create the tenants, users, subscriptions and module data used by every test."""
        # Create base tenants for load testing
        self.tenants = []
        for i in range(self.TENANT_COUNT):
            tenant = Tenant.objects.create(
                name=f'Load Test Tenant {i:03d}',
                schema_name=f'load_test_{i:03d}',
                domain=f'loadtest{i:03d}.com',
                business_type=self.BUSINESS_TYPES[i % len(self.BUSINESS_TYPES)],
                registration_number=f'202401{i:06d}',
                tax_id=f'MY123456{i:04d}',
                contact_email=f'contact{i:03d}@loadtest.com',
                contact_phone=f'+6012345{i:04d}',
                address=f'{i+1} Load Test Street',
                city='Kuala Lumpur',
                state='KUL',
                postal_code='50000',
            )
            self.tenants.append(tenant)

        # Create users for each tenant
        self.users = []
        for tenant in self.tenants:
            for i in range(self.USERS_PER_TENANT):
                user = User.objects.create_user(
                    username=f'user_{tenant.schema_name}_{i}',
                    email=f'user{i}@{tenant.domain}',
                    password='test123',
                    tenant=tenant,
                    role=random.choice(['admin', 'staff', 'user']),
                    first_name=f'User{i}',
                    last_name=f'From {tenant.name}',
                )
                self.users.append(user)

        # Create subscriptions for tenants. The date is read once so start and
        # end can never straddle midnight.
        today = datetime.now().date()
        self.subscriptions = []
        for tenant in self.tenants:
            subscription = Subscription.objects.create(
                tenant=tenant,
                plan=random.choice(['basic', 'premium', 'enterprise']),
                status='active',
                start_date=today,
                end_date=today + timedelta(days=30),
                amount=Decimal(random.choice(['99.00', '299.00', '999.00'])),
                currency='MYR',
                billing_cycle='monthly',
                auto_renew=True,
            )
            self.subscriptions.append(subscription)

        # Create test data for different modules
        self.create_test_data()

    def create_test_data(self):
        """Create module-specific records for each tenant, by business type."""
        def tenants_of(business_type):
            return [t for t in self.tenants if t.business_type == business_type]

        # Products for retail tenants
        self.products = []
        for tenant in tenants_of('retail'):
            for i in range(50):
                product = Product.objects.create(
                    tenant=tenant,
                    sku=f'{tenant.schema_name}_PRD_{i:04d}',
                    name=f'Product {i} for {tenant.name}',
                    description=f'Description for product {i}',
                    category=random.choice(['electronics', 'clothing', 'food', 'books']),
                    brand='Test Brand',
                    barcode=f'123456789{i:04d}',
                    unit='piece',
                    current_stock=random.randint(10, 1000),
                    minimum_stock=10,
                    maximum_stock=1000,
                    purchase_price=self._random_money(10, 100),
                    selling_price=self._random_money(20, 200),
                    tax_rate=6.0,
                    is_active=True,
                )
                self.products.append(product)

        # Patients for healthcare tenants
        self.patients = []
        for tenant in tenants_of('healthcare'):
            for i in range(30):
                patient = Patient.objects.create(
                    tenant=tenant,
                    patient_id=f'{tenant.schema_name}_PAT_{i:04d}',
                    first_name=f'Patient{i}',
                    last_name=f'Test{i}',
                    ic_number=self._random_ic_number(),
                    gender=random.choice(['male', 'female']),
                    date_of_birth=datetime.now() - timedelta(days=random.randint(365*18, 365*70)),
                    blood_type=random.choice(['A+', 'A-', 'B+', 'B-', 'O+', 'O-', 'AB+', 'AB-']),
                    email=f'patient{i}@{tenant.domain}',
                    phone=f'+6012345{i:04d}',
                    address=f'{i+1} Patient Street',
                    city='Kuala Lumpur',
                    state='KUL',
                    postal_code='50000',
                    is_active=True,
                )
                self.patients.append(patient)

        # Students for education tenants
        self.students = []
        for tenant in tenants_of('education'):
            for i in range(100):
                student = Student.objects.create(
                    tenant=tenant,
                    student_id=f'{tenant.schema_name}_STU_{i:04d}',
                    first_name=f'Student{i}',
                    last_name=f'Test{i}',
                    ic_number=self._random_ic_number(),
                    gender=random.choice(['male', 'female']),
                    date_of_birth=datetime.now() - timedelta(days=random.randint(365*6, 365*18)),
                    email=f'student{i}@{tenant.domain}',
                    phone=f'+6012345{i:04d}',
                    current_grade=random.choice(['Form 1', 'Form 2', 'Form 3', 'Form 4', 'Form 5']),
                    stream=random.choice(['science', 'arts', 'commerce']),
                    admission_date=datetime.now() - timedelta(days=random.randint(30, 365)),
                    status='active',
                    is_active=True,
                )
                self.students.append(student)

        # Shipments for logistics tenants
        self.shipments = []
        for tenant in tenants_of('logistics'):
            for i in range(25):
                shipment = Shipment.objects.create(
                    tenant=tenant,
                    tracking_number=f'{tenant.schema_name}_TRK_{i:04d}',
                    order_number=f'ORD_{i:06d}',
                    sender_name=f'Sender {i}',
                    receiver_name=f'Receiver {i}',
                    sender_phone=f'+6012345{i:04d}',
                    receiver_phone=f'+6012345{i:04d}',
                    origin_state=random.choice(['KUL', 'PNG', 'JHR', 'KDH']),
                    destination_state=random.choice(['KUL', 'PNG', 'JHR', 'KDH']),
                    service_type=random.choice(['express', 'standard', 'economy']),
                    package_type=random.choice(['document', 'parcel', 'freight']),
                    weight=self._random_money(0.5, 50),
                    length=self._random_money(10, 100),
                    width=self._random_money(10, 100),
                    height=self._random_money(10, 100),
                    shipping_cost=self._random_money(5, 200),
                    status=random.choice(['processing', 'in_transit', 'delivered']),
                    priority=random.choice(['normal', 'urgent']),
                )
                self.shipments.append(shipment)

        # Clients for beauty tenants
        self.clients = []
        for tenant in tenants_of('beauty'):
            for i in range(40):
                client = Client.objects.create(
                    tenant=tenant,
                    client_number=f'{tenant.schema_name}_CLI_{i:04d}',
                    first_name=f'Client{i}',
                    last_name=f'Test{i}',
                    ic_number=self._random_ic_number(),
                    gender=random.choice(['male', 'female']),
                    date_of_birth=datetime.now() - timedelta(days=random.randint(365*18, 365*70)),
                    email=f'client{i}@{tenant.domain}',
                    phone=f'+6012345{i:04d}',
                    membership_tier=random.choice(['basic', 'silver', 'gold', 'platinum']),
                    loyalty_points=random.randint(0, 1000),
                    total_spent=self._random_money(0, 10000),
                    visit_count=random.randint(0, 50),
                    is_active=True,
                )
                self.clients.append(client)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_concurrent_tenant_operations(self):
        """Run mixed read/write/update work for several tenants concurrently."""
        results = queue.Queue()
        errors = queue.Queue()

        def create_load_record(tenant, worker_id, i):
            """Create one module record for ``tenant``; return True if one was written."""
            if tenant.business_type == 'retail':
                Product.objects.create(
                    tenant=tenant,
                    sku=f'LOAD_{worker_id}_{i:04d}',
                    name=f'Load Test Product {worker_id}-{i}',
                    category='electronics',
                    unit='piece',
                    current_stock=100,
                    minimum_stock=10,
                    purchase_price=Decimal('50.00'),
                    selling_price=Decimal('100.00'),
                    tax_rate=6.0,
                    is_active=True,
                )
                return True
            if tenant.business_type == 'healthcare':
                Patient.objects.create(
                    tenant=tenant,
                    patient_id=f'LOAD_{worker_id}_{i:04d}',
                    first_name=f'Load Patient {worker_id}-{i}',
                    ic_number=self._random_ic_number(),
                    gender='male',
                    date_of_birth=datetime.now() - timedelta(days=365*30),
                    email=f'load{worker_id}-{i}@{tenant.domain}',
                    phone=f'+6012345{worker_id:02d}{i:02d}',
                    is_active=True,
                )
                return True
            return False

        def tenant_worker(tenant_id, worker_id):
            """Perform 20 random operations against one tenant and report the outcome."""
            started = time.perf_counter()
            operations_completed = 0

            try:
                tenant = self.tenants[tenant_id]

                for i in range(20):  # 20 operations per worker
                    operation_type = random.choice(['read', 'write', 'update'])

                    if operation_type == 'read':
                        # list() forces the query to run; a bare QuerySet is
                        # lazy and would never reach the database.
                        list(User.objects.filter(tenant=tenant))
                        Subscription.objects.filter(tenant=tenant).first()
                        operations_completed += 2

                    elif operation_type == 'write':
                        # Only count the write when a record was really created.
                        if create_load_record(tenant, worker_id, i):
                            operations_completed += 1

                    else:  # update
                        tenant.name = f'Updated Tenant {tenant_id} at {time.time()}'
                        tenant.save()

                        # Fetch the first 5 users once and reuse the list.
                        users = list(User.objects.filter(tenant=tenant)[:5])
                        for user in users:
                            user.last_login = timezone.now()
                            user.save()
                        operations_completed += len(users) + 1

                    # Small delay to simulate real usage
                    time.sleep(0.01)

                results.put({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'operations_completed': operations_completed,
                    'time_taken': time.perf_counter() - started,
                    'success': True,
                })

            except Exception as e:
                # Thread boundary: record the failure so the main thread can report it.
                errors.put({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'error': str(e),
                    'time_taken': time.perf_counter() - started,
                    'success': False,
                })

        concurrency_level = 15
        threads = [
            self._db_thread(tenant_worker, i % len(self.tenants), i)
            for i in range(concurrency_level)
        ]
        total_time = self._run_threads(threads)

        successful_operations = self._drain(results)
        failed_operations = self._drain(errors)

        total_operations = sum(op['operations_completed'] for op in successful_operations)
        operations_per_second = total_operations / total_time
        # Divide by the number of workers started so a worker that never
        # reported counts as a failure.
        success_rate = len(successful_operations) / concurrency_level * 100

        # Log metrics before asserting so they are visible when a check fails.
        print("\nConcurrent Tenant Operations Results:")
        print(f"Total time: {total_time:.2f}s")
        print(f"Total operations: {total_operations}")
        print(f"Operations per second: {operations_per_second:.1f}")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Successful workers: {len(successful_operations)}")
        print(f"Failed workers: {len(failed_operations)}")

        if failed_operations:
            print("\nFailed operations:")
            for failure in failed_operations:
                print(f"  Worker {failure['worker_id']}: {failure['error']}")

        self.assertGreaterEqual(success_rate, 95.0,
                                "Success rate should be at least 95% for concurrent operations")
        self.assertGreater(operations_per_second, 10,
                           "Should handle at least 10 operations per second")

    def test_database_connection_pooling_under_load(self):
        """Measure query latency and success rate at increasing thread counts."""
        operations_per_worker = 20

        def connection_test_worker(worker_id, operations, results_queue):
            """Run ``operations`` timed queries for one tenant and queue the metrics."""
            tenant = self.tenants[worker_id % len(self.tenants)]
            worker_metrics = {
                'worker_id': worker_id,
                'connections': [],
                'success_count': 0,
                'error_count': 0,
                'errors': [],
            }

            for _ in range(operations):
                started = time.perf_counter()
                try:
                    with connection.cursor() as cursor:
                        # Execute query with tenant isolation
                        self._set_search_path(cursor, tenant)
                        cursor.execute("SELECT COUNT(*) FROM auth_user;")
                        cursor.fetchone()
                except Exception as e:
                    worker_metrics['error_count'] += 1
                    worker_metrics['errors'].append(str(e))
                    worker_metrics['connections'].append(time.perf_counter() - started)
                else:
                    worker_metrics['connections'].append(time.perf_counter() - started)
                    worker_metrics['success_count'] += 1
                    # Small delay to simulate real usage
                    time.sleep(0.001)

            results_queue.put(worker_metrics)

        for load_level in [10, 25, 50, 100]:
            with self.subTest(load_level=load_level):
                print(f"\nTesting connection pooling with {load_level} concurrent connections:")

                results = queue.Queue()
                threads = [
                    self._db_thread(connection_test_worker, i, operations_per_worker, results)
                    for i in range(load_level)
                ]
                total_time = self._run_threads(threads)

                all_metrics = self._drain(results)
                self.assertEqual(len(all_metrics), load_level,
                                 "Every worker should report its metrics")

                total_connections = sum(m['success_count'] + m['error_count'] for m in all_metrics)
                successful_connections = sum(m['success_count'] for m in all_metrics)
                connection_times = [elapsed for m in all_metrics for elapsed in m['connections']]

                avg_connection_time = statistics.mean(connection_times)
                max_connection_time = max(connection_times)
                connections_per_second = total_connections / total_time
                success_rate = successful_connections / total_connections * 100

                print(f"  Average connection time: {avg_connection_time:.3f}s")
                print(f"  Max connection time: {max_connection_time:.3f}s")
                print(f"  Connections per second: {connections_per_second:.1f}")
                print(f"  Success rate: {success_rate:.1f}%")
                for message in sorted({err for m in all_metrics for err in m['errors']}):
                    print(f"  Error: {message}")

                self.assertLess(avg_connection_time, 0.05,
                                f"Average connection time should be under 50ms at {load_level} connections")
                self.assertLess(max_connection_time, 0.2,
                                f"Maximum connection time should be under 200ms at {load_level} connections")
                self.assertGreaterEqual(success_rate, 98.0,
                                        f"Success rate should be at least 98% at {load_level} connections")

    def test_schema_isolation_performance(self):
        """Measure per-schema query performance with concurrent workers."""
        worker_count = 30
        outcomes = queue.Queue()

        def schema_isolation_worker(tenant_id, worker_id):
            """Run 10 rounds of counting queries inside one tenant's schema."""
            started = time.perf_counter()
            operations_completed = 0

            try:
                tenant = self.tenants[tenant_id]

                with connection.cursor() as cursor:
                    # Switch to tenant schema
                    self._set_search_path(cursor, tenant)

                    for _ in range(10):
                        # Count users in tenant schema
                        cursor.execute("SELECT COUNT(*) FROM auth_user;")
                        cursor.fetchone()

                        # Get tenant-specific data
                        if tenant.business_type == 'retail':
                            cursor.execute("SELECT COUNT(*) FROM core_product;")
                            cursor.fetchone()
                        elif tenant.business_type == 'healthcare':
                            cursor.execute("SELECT COUNT(*) FROM healthcare_patient;")
                            cursor.fetchone()

                        operations_completed += 1

                        # Small delay
                        time.sleep(0.001)

                outcomes.put({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'operations_completed': operations_completed,
                    'time_taken': time.perf_counter() - started,
                    'success': True,
                })

            except Exception as e:
                outcomes.put({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'error': str(e),
                    'time_taken': time.perf_counter() - started,
                    'success': False,
                })

        threads = [
            self._db_thread(schema_isolation_worker, i % len(self.tenants), i)
            for i in range(worker_count)
        ]
        total_time = self._run_threads(threads)

        isolation_metrics = self._drain(outcomes)
        successful_ops = [m for m in isolation_metrics if m['success']]
        failed_ops = [m for m in isolation_metrics if not m['success']]

        total_operations = sum(op['operations_completed'] for op in successful_ops)
        success_rate = len(successful_ops) / worker_count * 100
        operations_per_second = total_operations / total_time

        print("\nSchema Isolation Performance:")
        print(f"Total time: {total_time:.2f}s")
        print(f"Total operations: {total_operations}")
        print(f"Operations per second: {operations_per_second:.1f}")
        print(f"Success rate: {success_rate:.1f}%")
        for failure in failed_ops:
            print(f"  Worker {failure['worker_id']}: {failure['error']}")

        # Checked unconditionally: if every worker fails the test must fail,
        # not skip its assertions.
        self.assertGreaterEqual(success_rate, 95.0,
                                "Schema isolation success rate should be at least 95%")

        avg_time_per_op = statistics.mean(
            op['time_taken'] / op['operations_completed'] for op in successful_ops
        )
        print(f"Average time per operation: {avg_time_per_op:.4f}s")

        self.assertLess(avg_time_per_op, 0.01,
                        "Average time per schema operation should be under 10ms")
        self.assertGreater(operations_per_second, 50,
                           "Should handle at least 50 schema operations per second")

    def test_resource_usage_optimization(self):
        """Sample process memory and CPU while 50 workers query the database."""
        import os

        try:
            import psutil
        except ImportError:
            self.skipTest("psutil is required for resource monitoring")

        process = psutil.Process(os.getpid())
        worker_count = 50
        operations_per_worker = 100

        def resource_monitor_worker(stop_event, results_queue):
            """Sample RSS (MB) and CPU% every 100ms until ``stop_event`` is set."""
            memory_samples = []
            cpu_samples = []

            def take_sample():
                memory_samples.append(process.memory_info().rss / 1024 / 1024)  # MB
                cpu_samples.append(process.cpu_percent())

            # The first cpu_percent() call only establishes the reference
            # point for later calls, so its value is discarded.
            process.cpu_percent()

            while not stop_event.wait(0.1):
                take_sample()
            # One final sample guarantees the lists are never empty.
            take_sample()

            results_queue.put({
                'memory_samples': memory_samples,
                'cpu_samples': cpu_samples,
            })

        def load_worker(worker_id, operations, outcomes):
            """Run ``operations`` tenant queries and queue the number that succeeded."""
            completed = 0
            for _ in range(operations):
                try:
                    # Random tenant operations
                    tenant = random.choice(self.tenants)

                    with connection.cursor() as cursor:
                        self._set_search_path(cursor, tenant)
                        cursor.execute("SELECT COUNT(*) FROM auth_user;")

                    # Small delay
                    time.sleep(0.005)
                    completed += 1

                except Exception as e:
                    print(f"Worker {worker_id} error: {e}")
            outcomes.put(completed)

        # Monitor for exactly as long as the load runs rather than a fixed
        # duration, so the samples describe the loaded process.
        stop_monitoring = threading.Event()
        monitor_results = queue.Queue()
        monitor_thread = threading.Thread(
            target=resource_monitor_worker,
            args=(stop_monitoring, monitor_results),
        )
        monitor_thread.start()

        outcomes = queue.Queue()
        try:
            threads = [
                self._db_thread(load_worker, i, operations_per_worker, outcomes)
                for i in range(worker_count)
            ]
            total_time = self._run_threads(threads)
        finally:
            stop_monitoring.set()
            monitor_thread.join()

        resource_data = monitor_results.get()
        memory_samples = resource_data['memory_samples']
        cpu_samples = resource_data['cpu_samples']

        avg_memory = statistics.mean(memory_samples)
        max_memory = max(memory_samples)
        avg_cpu = statistics.mean(cpu_samples)
        max_cpu = max(cpu_samples)

        attempted_operations = worker_count * operations_per_worker
        total_operations = sum(self._drain(outcomes))  # successful operations only
        operations_per_second = total_operations / total_time

        print("\nResource Usage Optimization Results:")
        print(f"Total operations: {total_operations}")
        print(f"Failed operations: {attempted_operations - total_operations}")
        print(f"Operations per second: {operations_per_second:.1f}")
        print(f"Average memory usage: {avg_memory:.1f} MB")
        print(f"Peak memory usage: {max_memory:.1f} MB")
        print(f"Average CPU usage: {avg_cpu:.1f}%")
        print(f"Peak CPU usage: {max_cpu:.1f}%")

        self.assertLess(avg_memory, 1000,  # 1GB
                        "Average memory usage should be under 1GB")
        self.assertLess(max_memory, 1500,  # 1.5GB
                        "Peak memory usage should be under 1.5GB")
        self.assertLess(avg_cpu, 80,  # 80%
                        "Average CPU usage should be under 80%")
        self.assertGreater(operations_per_second, 25,
                           "Should handle at least 25 operations per second under load")

    def test_scalability_benchmark(self):
        """Compare throughput across tenant counts and concurrency levels."""
        operations_per_worker = 20
        tenant_counts = [5, 10, 15, 20]
        concurrency_levels = [5, 10, 20]

        def scalability_worker(tenants, operations, outcomes):
            """Run ``operations`` queries against random ``tenants``; queue the success count."""
            completed = 0
            for _ in range(operations):
                tenant = random.choice(tenants)
                try:
                    # Perform tenant-specific operations
                    with connection.cursor() as cursor:
                        self._set_search_path(cursor, tenant)
                        cursor.execute("SELECT COUNT(*) FROM auth_user;")

                    # Simulate processing time
                    time.sleep(0.01)
                except Exception:
                    # A failed operation is left out of the throughput figure.
                    continue
                completed += 1
            outcomes.put(completed)

        scalability_results = []

        for tenant_count in tenant_counts:
            print(f"\nTesting scalability with {tenant_count} tenants:")

            # Use subset of tenants
            test_tenants = self.tenants[:tenant_count]

            for concurrency in concurrency_levels:
                outcomes = queue.Queue()
                threads = [
                    self._db_thread(scalability_worker, test_tenants, operations_per_worker, outcomes)
                    for _ in range(concurrency)
                ]
                total_time = self._run_threads(threads)

                attempted = concurrency * operations_per_worker
                total_operations = sum(self._drain(outcomes))
                operations_per_second = total_operations / total_time

                scalability_results.append({
                    'tenant_count': tenant_count,
                    'concurrency': concurrency,
                    'total_time': total_time,
                    'failed_operations': attempted - total_operations,
                    'operations_per_second': operations_per_second,
                })

                print(f"  Concurrency {concurrency}: {operations_per_second:.1f} ops/sec")

        # Analyze scalability
        print("\nScalability Analysis:")
        for result in scalability_results:
            throughput = result['operations_per_second']
            throughput_per_tenant = throughput / result['tenant_count']

            print(f"  {result['tenant_count']} tenants, {result['concurrency']} concurrent: "
                  f"{throughput:.1f} ops/sec ({throughput_per_tenant:.1f} per tenant)")

        # Throughput should not decrease significantly with more tenants.
        # Compare like with like: for each concurrency level, the largest
        # tenant count against the smallest. (Comparing the overall maximum
        # with the first result can never fail, since the first result is
        # itself a candidate for the maximum.)
        throughput_by_run = {
            (r['tenant_count'], r['concurrency']): r['operations_per_second']
            for r in scalability_results
        }
        fewest_tenants, most_tenants = tenant_counts[0], tenant_counts[-1]

        for concurrency in concurrency_levels:
            baseline_throughput = throughput_by_run[(fewest_tenants, concurrency)]
            loaded_throughput = throughput_by_run[(most_tenants, concurrency)]

            self.assertGreater(baseline_throughput, 0,
                               f"No operation succeeded at concurrency {concurrency}")
            self.assertGreater(loaded_throughput, baseline_throughput * 0.5,
                               "Throughput should not degrade by more than 50% under load")

    def test_multi_tenant_transaction_performance(self):
        """Run multi-statement transactions for many tenants concurrently."""
        worker_count = 40
        outcomes = queue.Queue()

        def transaction_worker(tenant_id, worker_id):
            """Insert 5 users and rename the tenant inside a single transaction."""
            started = time.perf_counter()
            outcome = {'worker_id': worker_id, 'tenant_id': tenant_id}

            try:
                tenant = self.tenants[tenant_id]

                # Perform transactions in tenant schema
                with transaction.atomic():
                    with connection.cursor() as cursor:
                        self._set_search_path(cursor, tenant)

                        # Create multiple records in a transaction
                        for i in range(5):
                            cursor.execute(
                                "INSERT INTO auth_user (username, email, password, tenant_id, is_active) "
                                "VALUES (%s, %s, %s, %s, %s) RETURNING id;",
                                [f'tx_user_{worker_id}_{i}', f'user{i}@{tenant.domain}', 'hash', tenant.id, True]
                            )

                        # Update tenant stats
                        cursor.execute(
                            "UPDATE core_tenant SET name = %s WHERE id = %s;",
                            [f'Updated at {time.time()}', tenant.id]
                        )
            except Exception as e:
                outcome['error'] = str(e)
                outcome['success'] = False
            else:
                outcome['success'] = True

            outcome['time_taken'] = time.perf_counter() - started
            outcomes.put(outcome)

        threads = [
            self._db_thread(transaction_worker, i % len(self.tenants), i)
            for i in range(worker_count)
        ]
        total_time = self._run_threads(threads)

        transaction_metrics = self._drain(outcomes)
        successful_tx = [m for m in transaction_metrics if m['success']]
        failed_tx = [m for m in transaction_metrics if not m['success']]

        success_rate = len(successful_tx) / worker_count * 100
        transactions_per_second = len(successful_tx) / total_time

        print("\nMulti-Tenant Transaction Performance:")
        print(f"Total time: {total_time:.2f}s")
        print(f"Total transactions: {len(successful_tx)}")
        print(f"Transactions per second: {transactions_per_second:.1f}")
        print(f"Success rate: {success_rate:.1f}%")
        for failure in failed_tx:
            print(f"  Worker {failure['worker_id']}: {failure['error']}")

        # Checked unconditionally: if every transaction fails the test must
        # fail, not skip its assertions.
        self.assertGreaterEqual(success_rate, 95.0,
                                "Transaction success rate should be at least 95%")

        avg_tx_time = statistics.mean(tx['time_taken'] for tx in successful_tx)
        print(f"Average transaction time: {avg_tx_time:.3f}s")

        self.assertLess(avg_tx_time, 0.1,
                        "Average transaction time should be under 100ms")
        self.assertGreater(transactions_per_second, 20,
                           "Should handle at least 20 transactions per second")