# multitenetsaas/backend/tests/load/test_multi_tenant_load.py

"""
Load Testing for Multi-Tenant Scenarios
Comprehensive load testing for:
- Concurrent tenant operations
- Database connection pooling under load
- Schema isolation performance
- Resource usage optimization
- Scalability testing
Author: Claude
"""
import time
import threading
import statistics
import queue
import random
from datetime import datetime, timedelta
from decimal import Decimal
from django.test import TransactionTestCase
from django.db import connection, transaction
from django.contrib.auth import get_user_model
from django.utils import timezone
from backend.src.core.models.tenant import Tenant
from backend.src.core.models.subscription import Subscription
from backend.src.modules.retail.models.product import Product
from backend.src.modules.healthcare.models.patient import Patient
from backend.src.modules.education.models.student import Student
from backend.src.modules.logistics.models.shipment import Shipment
from backend.src.modules.beauty.models.client import Client
User = get_user_model()
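
# Every worker below scopes its queries by issuing SET search_path directly.
# A minimal context-manager sketch of that pattern (a hypothetical helper; the
# tests inline the statement rather than importing this):
from contextlib import contextmanager

@contextmanager
def tenant_schema(cursor, schema_name):
    """Run queries against one tenant's schema, then restore the default path."""
    cursor.execute(f'SET search_path TO "{schema_name}", public;')
    try:
        yield cursor
    finally:
        cursor.execute('SET search_path TO public;')

# Usage sketch:
#   with connection.cursor() as c:
#       with tenant_schema(c, tenant.schema_name):
#           c.execute("SELECT COUNT(*) FROM auth_user;")
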
class MultiTenantLoadTest(TransactionTestCase):
    """Load testing for multi-tenant scenarios.

    TransactionTestCase (rather than TestCase) is used so that data committed
    in setUp is visible to the worker threads, each of which opens its own
    database connection outside the test runner's implicit transaction.
    """
def setUp(self):
"""Set up test environment for load testing"""
# Create base tenants for load testing
self.tenants = []
for i in range(20):
tenant = Tenant.objects.create(
name=f'Load Test Tenant {i:03d}',
schema_name=f'load_test_{i:03d}',
domain=f'loadtest{i:03d}.com',
business_type=random.choice(['retail', 'healthcare', 'education', 'logistics', 'beauty']),
registration_number=f'202401{i:06d}',
tax_id=f'MY123456{i:04d}',
contact_email=f'contact{i:03d}@loadtest.com',
contact_phone=f'+6012345{i:04d}',
address=f'{i+1} Load Test Street',
city='Kuala Lumpur',
state='KUL',
postal_code='50000'
)
self.tenants.append(tenant)
# Create users for each tenant
self.users = []
for tenant in self.tenants:
for i in range(5): # 5 users per tenant
user = User.objects.create_user(
username=f'user_{tenant.schema_name}_{i}',
email=f'user{i}@{tenant.domain}',
password='test123',
tenant=tenant,
role=random.choice(['admin', 'staff', 'user']),
first_name=f'User{i}',
last_name=f'From {tenant.name}'
)
self.users.append(user)
# Create subscriptions for tenants
self.subscriptions = []
for tenant in self.tenants:
subscription = Subscription.objects.create(
tenant=tenant,
plan=random.choice(['basic', 'premium', 'enterprise']),
status='active',
start_date=datetime.now().date(),
end_date=datetime.now().date() + timedelta(days=30),
                amount=Decimal(random.choice(['99.00', '299.00', '999.00'])),
currency='MYR',
billing_cycle='monthly',
auto_renew=True
)
self.subscriptions.append(subscription)
# Create test data for different modules
self.create_test_data()
def create_test_data(self):
"""Create test data for different modules"""
# Products for retail tenants
self.products = []
retail_tenants = [t for t in self.tenants if t.business_type == 'retail']
for tenant in retail_tenants:
for i in range(50):
product = Product.objects.create(
tenant=tenant,
sku=f'{tenant.schema_name}_PRD_{i:04d}',
name=f'Product {i} for {tenant.name}',
description=f'Description for product {i}',
category=random.choice(['electronics', 'clothing', 'food', 'books']),
brand='Test Brand',
barcode=f'123456789{i:04d}',
unit='piece',
current_stock=random.randint(10, 1000),
minimum_stock=10,
maximum_stock=1000,
                    # Round via str() so Decimal gets exactly two places,
                    # not the full binary expansion of a float
                    purchase_price=Decimal(str(round(random.uniform(10, 100), 2))),
                    selling_price=Decimal(str(round(random.uniform(20, 200), 2))),
tax_rate=6.0,
is_active=True
)
self.products.append(product)
# Patients for healthcare tenants
self.patients = []
healthcare_tenants = [t for t in self.tenants if t.business_type == 'healthcare']
for tenant in healthcare_tenants:
for i in range(30):
patient = Patient.objects.create(
tenant=tenant,
patient_id=f'{tenant.schema_name}_PAT_{i:04d}',
first_name=f'Patient{i}',
last_name=f'Test{i}',
                    ic_number=f'{random.randint(500101, 991231):06d}-{random.randint(10, 99):02d}-{random.randint(1000, 9999):04d}',
                    gender=random.choice(['male', 'female']),
                    date_of_birth=(datetime.now() - timedelta(days=random.randint(365*18, 365*70))).date(),
blood_type=random.choice(['A+', 'A-', 'B+', 'B-', 'O+', 'O-', 'AB+', 'AB-']),
email=f'patient{i}@{tenant.domain}',
phone=f'+6012345{i:04d}',
address=f'{i+1} Patient Street',
city='Kuala Lumpur',
state='KUL',
postal_code='50000',
is_active=True
)
self.patients.append(patient)
# Students for education tenants
self.students = []
education_tenants = [t for t in self.tenants if t.business_type == 'education']
for tenant in education_tenants:
for i in range(100):
student = Student.objects.create(
tenant=tenant,
student_id=f'{tenant.schema_name}_STU_{i:04d}',
first_name=f'Student{i}',
last_name=f'Test{i}',
                    ic_number=f'{random.randint(500101, 991231):06d}-{random.randint(10, 99):02d}-{random.randint(1000, 9999):04d}',
                    gender=random.choice(['male', 'female']),
                    date_of_birth=(datetime.now() - timedelta(days=random.randint(365*6, 365*18))).date(),
                    email=f'student{i}@{tenant.domain}',
                    phone=f'+6012345{i:04d}',
                    current_grade=random.choice(['Form 1', 'Form 2', 'Form 3', 'Form 4', 'Form 5']),
                    stream=random.choice(['science', 'arts', 'commerce']),
                    admission_date=(datetime.now() - timedelta(days=random.randint(30, 365))).date(),
status='active',
is_active=True
)
self.students.append(student)
# Shipments for logistics tenants
self.shipments = []
logistics_tenants = [t for t in self.tenants if t.business_type == 'logistics']
for tenant in logistics_tenants:
for i in range(25):
shipment = Shipment.objects.create(
tenant=tenant,
tracking_number=f'{tenant.schema_name}_TRK_{i:04d}',
order_number=f'ORD_{i:06d}',
sender_name=f'Sender {i}',
receiver_name=f'Receiver {i}',
sender_phone=f'+6012345{i:04d}',
receiver_phone=f'+6012345{i:04d}',
origin_state=random.choice(['KUL', 'PNG', 'JHR', 'KDH']),
destination_state=random.choice(['KUL', 'PNG', 'JHR', 'KDH']),
service_type=random.choice(['express', 'standard', 'economy']),
package_type=random.choice(['document', 'parcel', 'freight']),
                    weight=Decimal(str(round(random.uniform(0.5, 50), 2))),
                    length=Decimal(str(round(random.uniform(10, 100), 2))),
                    width=Decimal(str(round(random.uniform(10, 100), 2))),
                    height=Decimal(str(round(random.uniform(10, 100), 2))),
                    shipping_cost=Decimal(str(round(random.uniform(5, 200), 2))),
status=random.choice(['processing', 'in_transit', 'delivered']),
priority=random.choice(['normal', 'urgent'])
)
self.shipments.append(shipment)
# Clients for beauty tenants
self.clients = []
beauty_tenants = [t for t in self.tenants if t.business_type == 'beauty']
for tenant in beauty_tenants:
for i in range(40):
client = Client.objects.create(
tenant=tenant,
client_number=f'{tenant.schema_name}_CLI_{i:04d}',
first_name=f'Client{i}',
last_name=f'Test{i}',
                    ic_number=f'{random.randint(500101, 991231):06d}-{random.randint(10, 99):02d}-{random.randint(1000, 9999):04d}',
                    gender=random.choice(['male', 'female']),
                    date_of_birth=(datetime.now() - timedelta(days=random.randint(365*18, 365*70))).date(),
                    email=f'client{i}@{tenant.domain}',
                    phone=f'+6012345{i:04d}',
                    membership_tier=random.choice(['basic', 'silver', 'gold', 'platinum']),
                    loyalty_points=random.randint(0, 1000),
                    total_spent=Decimal(str(round(random.uniform(0, 10000), 2))),
visit_count=random.randint(0, 50),
is_active=True
)
self.clients.append(client)
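
    # Per matching tenant, the fixtures above create 50 products, 30 patients,
    # 100 students, 25 shipments, or 40 clients, so each business type brings a
    # differently sized schema to the load tests below.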
def test_concurrent_tenant_operations(self):
"""Test concurrent operations across multiple tenants"""
results = queue.Queue()
errors = queue.Queue()
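        # queue.Queue is used for result collection because it is thread-safe;
        # workers can report metrics without an explicit lock around a shared list.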
def tenant_worker(tenant_id, worker_id):
"""Worker function for tenant operations"""
start_time = time.time()
operations_completed = 0
try:
tenant = self.tenants[tenant_id]
# Perform various operations
for i in range(20): # 20 operations per worker
operation_type = random.choice(['read', 'write', 'update'])
if operation_type == 'read':
# Read operations
users = User.objects.filter(tenant=tenant)
subscription = Subscription.objects.filter(tenant=tenant).first()
operations_completed += 2
elif operation_type == 'write':
# Write operations (create new records)
if tenant.business_type == 'retail':
Product.objects.create(
tenant=tenant,
sku=f'LOAD_{worker_id}_{i:04d}',
name=f'Load Test Product {worker_id}-{i}',
category='electronics',
unit='piece',
current_stock=100,
minimum_stock=10,
purchase_price=Decimal('50.00'),
selling_price=Decimal('100.00'),
tax_rate=6.0,
is_active=True
)
elif tenant.business_type == 'healthcare':
Patient.objects.create(
tenant=tenant,
patient_id=f'LOAD_{worker_id}_{i:04d}',
first_name=f'Load Patient {worker_id}-{i}',
                                ic_number=f'{random.randint(500101, 991231):06d}-{random.randint(10, 99):02d}-{random.randint(1000, 9999):04d}',
                                gender='male',
                                date_of_birth=(datetime.now() - timedelta(days=365*30)).date(),
email=f'load{worker_id}-{i}@{tenant.domain}',
phone=f'+6012345{worker_id:02d}{i:02d}',
is_active=True
)
operations_completed += 1
elif operation_type == 'update':
# Update operations
tenant.name = f'Updated Tenant {tenant_id} at {time.time()}'
tenant.save()
# Update user data
users = User.objects.filter(tenant=tenant)
                        for user in users[:5]:  # Update first 5 users
                            user.last_login = timezone.now()  # timezone-aware, safe under USE_TZ
                            user.save(update_fields=['last_login'])
operations_completed += len(users[:5]) + 1
# Small delay to simulate real usage
time.sleep(0.01)
end_time = time.time()
results.put({
'worker_id': worker_id,
'tenant_id': tenant_id,
'operations_completed': operations_completed,
'time_taken': end_time - start_time,
'success': True
})
            except Exception as e:
                errors.put({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'error': str(e),
                    'time_taken': time.time() - start_time,
                    'success': False
                })
            finally:
                # Each thread gets its own thread-local connection; close it so
                # worker connections do not accumulate on the database server.
                connection.close()
# Start concurrent workers
start_time = time.time()
threads = []
# Create workers for different tenants (concurrency level)
concurrency_level = 15
for i in range(concurrency_level):
tenant_id = i % len(self.tenants)
thread = threading.Thread(
target=tenant_worker,
args=(tenant_id, i)
)
threads.append(thread)
# Start all threads
for thread in threads:
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
total_time = time.time() - start_time
# Collect results
successful_operations = []
while not results.empty():
successful_operations.append(results.get())
failed_operations = []
while not errors.empty():
failed_operations.append(errors.get())
# Analyze results
total_operations = sum(op['operations_completed'] for op in successful_operations)
operations_per_second = total_operations / total_time
success_rate = len(successful_operations) / (len(successful_operations) + len(failed_operations)) * 100
# Performance assertions
self.assertGreaterEqual(success_rate, 95.0,
"Success rate should be at least 95% for concurrent operations")
self.assertGreater(operations_per_second, 10,
"Should handle at least 10 operations per second")
# Log performance metrics
print(f"\nConcurrent Tenant Operations Results:")
print(f"Total time: {total_time:.2f}s")
print(f"Total operations: {total_operations}")
print(f"Operations per second: {operations_per_second:.1f}")
print(f"Success rate: {success_rate:.1f}%")
print(f"Successful workers: {len(successful_operations)}")
print(f"Failed workers: {len(failed_operations)}")
if failed_operations:
print(f"\nFailed operations:")
for failure in failed_operations:
print(f" Worker {failure['worker_id']}: {failure['error']}")
def test_database_connection_pooling_under_load(self):
"""Test database connection pooling under heavy load"""
connection_metrics = []
def connection_test_worker(worker_id, operations):
"""Worker to test database connections"""
worker_metrics = {
'worker_id': worker_id,
'connections': [],
'success_count': 0,
'error_count': 0
}
for i in range(operations):
start_time = time.time()
try:
with connection.cursor() as cursor:
# Execute query with tenant isolation
tenant = self.tenants[worker_id % len(self.tenants)]
cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')
cursor.execute("SELECT COUNT(*) FROM auth_user;")
count = cursor.fetchone()[0]
connection_time = time.time() - start_time
worker_metrics['connections'].append(connection_time)
worker_metrics['success_count'] += 1
# Small delay to simulate real usage
time.sleep(0.001)
except Exception as e:
worker_metrics['error_count'] += 1
connection_time = time.time() - start_time
worker_metrics['connections'].append(connection_time)
            connection.close()  # Release this thread's connection; 100 workers would otherwise hold 100 server connections
            return worker_metrics
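        # threading.Thread discards its target's return value, so the threads
        # below wrap this worker in a lambda that pushes the result onto a queue.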
# Test with different load levels
load_levels = [10, 25, 50, 100]
for load_level in load_levels:
print(f"\nTesting connection pooling with {load_level} concurrent connections:")
threads = []
results = queue.Queue()
# Create worker threads
for i in range(load_level):
thread = threading.Thread(
target=lambda q, wid: q.put(connection_test_worker(wid, 20)),
args=(results, i)
)
threads.append(thread)
# Start all threads
start_time = time.time()
for thread in threads:
thread.start()
# Wait for completion
for thread in threads:
thread.join()
total_time = time.time() - start_time
# Collect and analyze results
all_metrics = []
while not results.empty():
all_metrics.append(results.get())
total_connections = sum(m['success_count'] + m['error_count'] for m in all_metrics)
successful_connections = sum(m['success_count'] for m in all_metrics)
            connection_times = [t for m in all_metrics for t in m['connections']]
if connection_times:
avg_connection_time = statistics.mean(connection_times)
max_connection_time = max(connection_times)
min_connection_time = min(connection_times)
connections_per_second = total_connections / total_time
success_rate = successful_connections / total_connections * 100
# Performance assertions
self.assertLess(avg_connection_time, 0.05,
f"Average connection time should be under 50ms at {load_level} connections")
self.assertLess(max_connection_time, 0.2,
f"Maximum connection time should be under 200ms at {load_level} connections")
self.assertGreaterEqual(success_rate, 98.0,
f"Success rate should be at least 98% at {load_level} connections")
print(f" Average connection time: {avg_connection_time:.3f}s")
print(f" Max connection time: {max_connection_time:.3f}s")
print(f" Connections per second: {connections_per_second:.1f}")
print(f" Success rate: {success_rate:.1f}%")
def test_schema_isolation_performance(self):
"""Test performance of schema isolation under load"""
isolation_metrics = []
def schema_isolation_worker(tenant_id, worker_id):
"""Worker to test schema isolation"""
start_time = time.time()
operations_completed = 0
try:
tenant = self.tenants[tenant_id]
# Test schema-specific operations
with connection.cursor() as cursor:
# Switch to tenant schema
cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')
# Perform operations in tenant schema
for i in range(10):
# Count users in tenant schema
cursor.execute("SELECT COUNT(*) FROM auth_user;")
user_count = cursor.fetchone()[0]
# Get tenant-specific data
if tenant.business_type == 'retail':
cursor.execute("SELECT COUNT(*) FROM core_product;")
product_count = cursor.fetchone()[0]
elif tenant.business_type == 'healthcare':
cursor.execute("SELECT COUNT(*) FROM healthcare_patient;")
patient_count = cursor.fetchone()[0]
operations_completed += 1
# Small delay
time.sleep(0.001)
end_time = time.time()
isolation_metrics.append({
'worker_id': worker_id,
'tenant_id': tenant_id,
'operations_completed': operations_completed,
'time_taken': end_time - start_time,
'success': True
})
            except Exception as e:
                isolation_metrics.append({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'error': str(e),
                    'time_taken': time.time() - start_time,
                    'success': False
                })
            finally:
                connection.close()  # Avoid leaking one server connection per worker thread
# Test schema isolation with concurrent access
threads = []
for i in range(30): # 30 concurrent workers
tenant_id = i % len(self.tenants)
thread = threading.Thread(
target=schema_isolation_worker,
args=(tenant_id, i)
)
threads.append(thread)
start_time = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
total_time = time.time() - start_time
# Analyze isolation performance
successful_ops = [m for m in isolation_metrics if m['success']]
failed_ops = [m for m in isolation_metrics if not m['success']]
total_operations = sum(op['operations_completed'] for op in successful_ops)
success_rate = len(successful_ops) / len(isolation_metrics) * 100
operations_per_second = total_operations / total_time
if successful_ops:
avg_time_per_op = statistics.mean([op['time_taken'] / op['operations_completed'] for op in successful_ops])
# Performance assertions
self.assertLess(avg_time_per_op, 0.01,
"Average time per schema operation should be under 10ms")
self.assertGreaterEqual(success_rate, 95.0,
"Schema isolation success rate should be at least 95%")
self.assertGreater(operations_per_second, 50,
"Should handle at least 50 schema operations per second")
print(f"\nSchema Isolation Performance:")
print(f"Total time: {total_time:.2f}s")
print(f"Total operations: {total_operations}")
print(f"Operations per second: {operations_per_second:.1f}")
print(f"Success rate: {success_rate:.1f}%")
if successful_ops:
print(f"Average time per operation: {avg_time_per_op:.4f}s")
def test_resource_usage_optimization(self):
"""Test resource usage optimization under multi-tenant load"""
import psutil
import os
process = psutil.Process(os.getpid())
# Monitor resource usage during load test
def resource_monitor_worker(duration, results_queue):
"""Worker to monitor resource usage"""
            start_time = time.time()
            memory_samples = []
            cpu_samples = []
            process.cpu_percent()  # Prime the counter; psutil's first cpu_percent() call always returns 0.0
while time.time() - start_time < duration:
memory_info = process.memory_info()
cpu_percent = process.cpu_percent()
memory_samples.append(memory_info.rss / 1024 / 1024) # MB
cpu_samples.append(cpu_percent)
time.sleep(0.1) # Sample every 100ms
results_queue.put({
'memory_samples': memory_samples,
'cpu_samples': cpu_samples,
'duration': duration
})
def load_worker(worker_id, operations):
"""Load generation worker"""
for i in range(operations):
try:
# Random tenant operations
tenant = random.choice(self.tenants)
# Perform random database operations
with connection.cursor() as cursor:
cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')
cursor.execute("SELECT COUNT(*) FROM auth_user;")
# Small delay
time.sleep(0.005)
                except Exception as e:
                    print(f"Worker {worker_id} error: {e}")
            connection.close()  # Drop this thread's connection once its operations finish
# Start resource monitoring
monitor_results = queue.Queue()
monitor_thread = threading.Thread(
target=resource_monitor_worker,
args=(10, monitor_results) # Monitor for 10 seconds
)
monitor_thread.start()
# Start load generation
start_time = time.time()
threads = []
# Create load workers
for i in range(50): # 50 concurrent workers
thread = threading.Thread(
target=load_worker,
args=(i, 100) # Each worker performs 100 operations
)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
total_time = time.time() - start_time
# Wait for monitoring to complete
monitor_thread.join()
resource_data = monitor_results.get()
# Analyze resource usage
memory_samples = resource_data['memory_samples']
cpu_samples = resource_data['cpu_samples']
avg_memory = statistics.mean(memory_samples)
max_memory = max(memory_samples)
avg_cpu = statistics.mean(cpu_samples)
max_cpu = max(cpu_samples)
total_operations = 50 * 100 # 50 workers * 100 operations each
operations_per_second = total_operations / total_time
# Performance assertions
self.assertLess(avg_memory, 1000, # 1GB
"Average memory usage should be under 1GB")
self.assertLess(max_memory, 1500, # 1.5GB
"Peak memory usage should be under 1.5GB")
self.assertLess(avg_cpu, 80, # 80%
"Average CPU usage should be under 80%")
self.assertGreater(operations_per_second, 25,
"Should handle at least 25 operations per second under load")
print(f"\nResource Usage Optimization Results:")
print(f"Total operations: {total_operations}")
print(f"Operations per second: {operations_per_second:.1f}")
print(f"Average memory usage: {avg_memory:.1f} MB")
print(f"Peak memory usage: {max_memory:.1f} MB")
print(f"Average CPU usage: {avg_cpu:.1f}%")
print(f"Peak CPU usage: {max_cpu:.1f}%")
def test_scalability_benchmark(self):
"""Test scalability with increasing load"""
scalability_results = []
# Test with different tenant counts
tenant_counts = [5, 10, 15, 20]
for tenant_count in tenant_counts:
print(f"\nTesting scalability with {tenant_count} tenants:")
# Use subset of tenants
test_tenants = self.tenants[:tenant_count]
def scalability_worker(operations):
"""Worker for scalability testing"""
for i in range(operations):
try:
tenant = random.choice(test_tenants)
# Perform tenant-specific operations
with connection.cursor() as cursor:
cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')
cursor.execute("SELECT COUNT(*) FROM auth_user;")
# Simulate processing time
time.sleep(0.01)
                    except Exception:
                        # Swallow per-operation errors; this worker only drives load
                        continue
                connection.close()  # Release this thread's connection
# Run test with increasing concurrency
concurrency_levels = [5, 10, 20]
for concurrency in concurrency_levels:
start_time = time.time()
threads = []
# Create worker threads
for i in range(concurrency):
thread = threading.Thread(
target=scalability_worker,
args=(20,) # 20 operations per worker
)
threads.append(thread)
# Start and wait for completion
for thread in threads:
thread.start()
for thread in threads:
thread.join()
total_time = time.time() - start_time
total_operations = concurrency * 20
operations_per_second = total_operations / total_time
scalability_results.append({
'tenant_count': tenant_count,
'concurrency': concurrency,
'total_time': total_time,
'operations_per_second': operations_per_second
})
print(f" Concurrency {concurrency}: {operations_per_second:.1f} ops/sec")
# Analyze scalability
print(f"\nScalability Analysis:")
for result in scalability_results:
throughput = result['operations_per_second']
tenant_count = result['tenant_count']
concurrency = result['concurrency']
# Calculate throughput per tenant
throughput_per_tenant = throughput / tenant_count
print(f" {tenant_count} tenants, {concurrency} concurrent: "
f"{throughput:.1f} ops/sec ({throughput_per_tenant:.1f} per tenant)")
        # Performance assertions for scalability:
        # throughput should not collapse as tenant count and concurrency grow.
        # Compare the worst observed run against the baseline; using max() here
        # would be vacuous, since the baseline is itself one of the candidates.
        baseline_throughput = scalability_results[0]['operations_per_second']
        min_throughput = min(r['operations_per_second'] for r in scalability_results)
        self.assertGreater(min_throughput, baseline_throughput * 0.5,
                           "Throughput should not degrade by more than 50% under load")
def test_multi_tenant_transaction_performance(self):
"""Test transaction performance across multiple tenants"""
transaction_metrics = []
def transaction_worker(tenant_id, worker_id):
"""Worker for transaction testing"""
start_time = time.time()
try:
tenant = self.tenants[tenant_id]
# Perform transactions in tenant schema
with transaction.atomic():
with connection.cursor() as cursor:
cursor.execute(f'SET search_path TO "{tenant.schema_name}", public;')
                        # Create multiple user rows in one transaction. The column
                        # list assumes the custom user table keeps Django's standard
                        # NOT NULL auth columns; without them this insert would fail.
                        for i in range(5):
                            cursor.execute(
                                "INSERT INTO auth_user "
                                "(username, email, password, tenant_id, is_active, "
                                "first_name, last_name, is_staff, is_superuser, date_joined) "
                                "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) "
                                "RETURNING id;",
                                [f'tx_user_{worker_id}_{i}', f'user{i}@{tenant.domain}',
                                 'hash', tenant.id, True, 'Tx', 'User', False, False]
                            )
# Update tenant stats
cursor.execute(
"UPDATE core_tenant SET name = %s WHERE id = %s;",
[f'Updated at {time.time()}', tenant.id]
)
end_time = time.time()
transaction_metrics.append({
'worker_id': worker_id,
'tenant_id': tenant_id,
'time_taken': end_time - start_time,
'success': True
})
            except Exception as e:
                transaction_metrics.append({
                    'worker_id': worker_id,
                    'tenant_id': tenant_id,
                    'error': str(e),
                    'time_taken': time.time() - start_time,
                    'success': False
                })
            finally:
                connection.close()  # Release this thread's connection when the worker exits
# Test concurrent transactions
threads = []
for i in range(40): # 40 concurrent transaction workers
tenant_id = i % len(self.tenants)
thread = threading.Thread(
target=transaction_worker,
args=(tenant_id, i)
)
threads.append(thread)
start_time = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
total_time = time.time() - start_time
# Analyze transaction performance
successful_tx = [m for m in transaction_metrics if m['success']]
failed_tx = [m for m in transaction_metrics if not m['success']]
success_rate = len(successful_tx) / len(transaction_metrics) * 100
if successful_tx:
avg_tx_time = statistics.mean([tx['time_taken'] for tx in successful_tx])
transactions_per_second = len(successful_tx) / total_time
# Performance assertions
self.assertLess(avg_tx_time, 0.1,
"Average transaction time should be under 100ms")
self.assertGreaterEqual(success_rate, 95.0,
"Transaction success rate should be at least 95%")
self.assertGreater(transactions_per_second, 20,
"Should handle at least 20 transactions per second")
print(f"\nMulti-Tenant Transaction Performance:")
print(f"Total time: {total_time:.2f}s")
print(f"Total transactions: {len(successful_tx)}")
print(f"Transactions per second: {len(successful_tx) / total_time:.1f}")
print(f"Success rate: {success_rate:.1f}%")
if successful_tx:
print(f"Average transaction time: {avg_tx_time:.3f}s")