""" Performance Tests for API Endpoints Tests for API performance optimization: - Response time optimization - Concurrency handling - Rate limiting efficiency - Caching strategies - Payload size optimization Author: Claude """ import pytest import time import statistics import threading import requests from django.test import TestCase, Client from django.urls import reverse from django.contrib.auth import get_user_model from django.core.cache import cache from django.conf import settings from decimal import Decimal from datetime import date from backend.src.core.models.tenant import Tenant from backend.src.core.models.user import User from backend.src.modules.retail.models.product import Product User = get_user_model() class APIPerformanceTest(TestCase): """Test cases for API performance optimization""" def setUp(self): self.client = Client() # Create test tenant and user self.tenant = Tenant.objects.create( name='API Performance Test', schema_name='api_perf_test', domain='apiperf.com', business_type='retail' ) self.user = User.objects.create_user( username='testuser', email='test@example.com', password='test123', tenant=self.tenant, role='admin' ) # Create test data self.products = [] for i in range(100): product = Product.objects.create( tenant=self.tenant, sku=f'API-TEST-{i:06d}', name=f'API Test Product {i}', description=f'Description for API test product {i}', category='electronics', brand='Test Brand', barcode=f'123456789{i:04d}', unit='piece', current_stock=100 + i, minimum_stock=10, maximum_stock=500, purchase_price=Decimal('50.00') + (i * 0.1), selling_price=Decimal('100.00') + (i * 0.2), tax_rate=10.0, is_active=True ) self.products.append(product) def test_api_response_time_optimization(self): """Test API response time optimization""" # Test various API endpoints endpoints = [ ('api:tenant-list', 'GET', {}), ('api:user-list', 'GET', {}), ('api:product-list', 'GET', {}), ('api:tenant-detail', 'GET', {'pk': self.tenant.id}), ('api:user-detail', 'GET', {'pk': self.user.id}), ('api:product-detail', 'GET', {'pk': self.products[0].id}), ] response_times = {} for endpoint_name, method, params in endpoints: times = [] # Warm up cache for _ in range(3): if method == 'GET': self.client.get(reverse(endpoint_name, kwargs=params)) elif method == 'POST': self.client.post(reverse(endpoint_name, kwargs=params)) # Measure response times for _ in range(10): start_time = time.time() if method == 'GET': response = self.client.get(reverse(endpoint_name, kwargs=params)) elif method == 'POST': response = self.client.post(reverse(endpoint_name, kwargs=params)) response_time = time.time() - start_time times.append(response_time) # Verify response is successful self.assertEqual(response.status_code, 200) avg_time = statistics.mean(times) max_time = max(times) min_time = min(times) response_times[endpoint_name] = { 'avg': avg_time, 'max': max_time, 'min': min_time, 'times': times } # Performance assertions self.assertLess(avg_time, 0.5, f"Average response time for {endpoint_name} should be under 500ms") self.assertLess(max_time, 1.0, f"Maximum response time for {endpoint_name} should be under 1s") # Log performance metrics print(f"\nAPI Response Time Performance:") for endpoint, metrics in response_times.items(): print(f"{endpoint}: avg={metrics['avg']:.3f}s, max={metrics['max']:.3f}s, min={metrics['min']:.3f}s") def test_concurrent_request_handling(self): """Test concurrent request handling""" def make_request(request_id, results): start_time = time.time() try: response = self.client.get(reverse('api:product-list')) response_time = time.time() - start_time results.append({ 'request_id': request_id, 'success': response.status_code == 200, 'response_time': response_time, 'status_code': response.status_code }) except Exception as e: results.append({ 'request_id': request_id, 'success': False, 'error': str(e), 'response_time': time.time() - start_time }) # Test with different concurrency levels concurrency_levels = [10, 25, 50] for concurrency in concurrency_levels: results = [] threads = [] # Create concurrent requests for i in range(concurrency): thread = threading.Thread( target=make_request, args=(i, results) ) threads.append(thread) # Start all threads start_time = time.time() for thread in threads: thread.start() # Wait for all threads to complete for thread in threads: thread.join() total_time = time.time() - start_time # Analyze results successful_requests = [r for r in results if r['success']] failed_requests = [r for r in results if not r['success']] success_rate = len(successful_requests) / len(results) * 100 avg_response_time = statistics.mean([r['response_time'] for r in results]) # Performance assertions self.assertGreaterEqual(success_rate, 95.0, f"Success rate should be at least 95% for {concurrency} concurrent requests") self.assertLess(total_time, 5.0, f"Total time for {concurrency} concurrent requests should be under 5s") print(f"\nConcurrency Test ({concurrency} requests):") print(f"Success rate: {success_rate:.1f}%") print(f"Total time: {total_time:.3f}s") print(f"Average response time: {avg_response_time:.3f}s") print(f"Failed requests: {len(failed_requests)}") def test_rate_limiting_efficiency(self): """Test rate limiting efficiency""" # This test assumes rate limiting is implemented # Make rapid requests to test rate limiting request_results = [] for i in range(100): start_time = time.time() response = self.client.get(reverse('api:product-list')) response_time = time.time() - start_time request_results.append({ 'request_number': i, 'status_code': response.status_code, 'response_time': response_time, 'timestamp': time.time() }) # Analyze rate limiting effectiveness successful_requests = [r for r in request_results if r['status_code'] == 200] rate_limited_requests = [r for r in request_results if r['status_code'] == 429] print(f"\nRate Limiting Test:") print(f"Total requests: {len(request_results)}") print(f"Successful requests: {len(successful_requests)}") print(f"Rate limited requests: {len(rate_limited_requests)}") # If rate limiting is implemented, some requests should be limited if len(rate_limited_requests) > 0: print(f"Rate limiting is working - {len(rate_limited_requests)} requests were limited") # Response times should remain consistent even under load response_times = [r['response_time'] for r in successful_requests] if response_times: avg_response_time = statistics.mean(response_times) max_response_time = max(response_times) self.assertLess(avg_response_time, 0.5, "Average response time should remain under 500ms even with rate limiting") print(f"Average response time for successful requests: {avg_response_time:.3f}s") def test_caching_strategies(self): """Test caching strategies performance""" # Clear cache before testing cache.clear() # Test cache hit/miss performance endpoint = reverse('api:product-list') # First request (cache miss) start_time = time.time() response1 = self.client.get(endpoint) cache_miss_time = time.time() - start_time # Second request (cache hit) start_time = time.time() response2 = self.client.get(endpoint) cache_hit_time = time.time() - start_time # Multiple cache hits cache_hit_times = [] for _ in range(10): start_time = time.time() response = self.client.get(endpoint) cache_hit_times.append(time.time() - start_time) avg_cache_hit_time = statistics.mean(cache_hit_times) # Performance assertions self.assertLess(cache_miss_time, 1.0, "Cache miss should complete within 1s") self.assertLess(cache_hit_time, 0.1, "Cache hit should complete within 100ms") self.assertLess(avg_cache_hit_time, 0.05, "Average cache hit should be under 50ms") # Cache hit should be faster than cache miss self.assertLess(avg_cache_hit_time, cache_miss_time * 0.5, "Cache hit should be significantly faster than cache miss") print(f"\nCaching Strategy Performance:") print(f"Cache miss time: {cache_miss_time:.3f}s") print(f"First cache hit time: {cache_hit_time:.3f}s") print(f"Average cache hit time: {avg_cache_hit_time:.3f}s") print(f"Cache improvement: {(cache_miss_time / avg_cache_hit_time):.1f}x") def test_payload_size_optimization(self): """Test payload size optimization""" # Test different payload sizes test_sizes = [10, 50, 100, 500] for size in test_sizes: # Create test data test_products = [] for i in range(size): test_products.append({ 'sku': f'PAYLOAD-{i:06d}', 'name': f'Payload Test Product {i}', 'description': 'A' * 100, # Long description 'category': 'electronics', 'brand': 'Test Brand', 'current_stock': 100, 'purchase_price': '50.00', 'selling_price': '100.00' }) # Test different response formats # Full payload start_time = time.time() response = self.client.get(reverse('api:product-list')) full_payload_time = time.time() - start_time full_payload_size = len(response.content) # Paginated payload (assuming pagination is implemented) start_time = time.time() response = self.client.get(reverse('api:product-list') + '?page=1&page_size=20') paginated_time = time.time() - start_time paginated_size = len(response.content) # Fields-limited payload start_time = time.time() response = self.client.get(reverse('api:product-list') + '?fields=id,name,sku') fields_limited_time = time.time() - start_time fields_limited_size = len(response.content) # Performance assertions self.assertLess(full_payload_time, 2.0, f"Full payload request for {size} items should complete within 2s") self.assertLess(paginated_time, 0.5, f"Paginated request should be faster") self.assertLess(fields_limited_time, 0.3, f"Fields-limited request should be fastest") # Size assertions self.assertLess(paginated_size, full_payload_size * 0.3, f"Paginated payload should be much smaller for {size} items") self.assertLess(fields_limited_size, full_payload_size * 0.2, f"Fields-limited payload should be smallest") print(f"\nPayload Optimization Test ({size} items):") print(f"Full payload: {full_payload_time:.3f}s, {full_payload_size} bytes") print(f"Paginated: {paginated_time:.3f}s, {paginated_size} bytes") print(f"Fields limited: {fields_limited_time:.3f}s, {fields_limited_size} bytes") def test_database_query_optimization(self): """Test database query optimization in API calls""" # Test N+1 query problems # First, test without optimization start_time = time.time() response = self.client.get(reverse('api:product-list')) unoptimized_time = time.time() - start_time # Test with select_related (assuming optimization is implemented) start_time = time.time() response = self.client.get(reverse('api:product-list') + '?select_related=tenant') optimized_time = time.time() - start_time # Test with pagination start_time = time.time() response = self.client.get(reverse('api:product-list') + '?page=1&page_size=10') paginated_time = time.time() - start_time # Performance assertions self.assertLess(unoptimized_time, 1.0, "Unoptimized query should complete within 1s") self.assertLess(optimized_time, unoptimized_time * 0.8, "Optimized query should be faster") self.assertLess(paginated_time, unoptimized_time * 0.3, "Paginated query should be much faster") print(f"\nDatabase Query Optimization:") print(f"Unoptimized query: {unoptimized_time:.3f}s") print(f"Optimized query: {optimized_time:.3f}s") print(f"Paginated query: {paginated_time:.3f}s") def test_memory_usage_optimization(self): """Test memory usage optimization""" import psutil import os process = psutil.Process(os.getpid()) # Test memory usage with large datasets initial_memory = process.memory_info().rss / 1024 / 1024 # MB # Make multiple requests with large payloads for i in range(10): response = self.client.get(reverse('api:product-list')) # Process response to simulate real usage data = response.json() peak_memory = process.memory_info().rss / 1024 / 1024 # MB memory_increase = peak_memory - initial_memory # Performance assertions self.assertLess(memory_increase, 50, "Memory increase should be under 50MB for large dataset processing") print(f"\nMemory Usage Optimization:") print(f"Initial memory: {initial_memory:.1f} MB") print(f"Peak memory: {peak_memory:.1f} MB") print(f"Memory increase: {memory_increase:.1f} MB") def test_authentication_performance(self): """Test authentication performance""" # Test login performance login_data = { 'username': 'testuser', 'password': 'test123' } login_times = [] for _ in range(10): start_time = time.time() response = self.client.post(reverse('api:login'), login_data) login_time = time.time() - start_time login_times.append(login_time) self.assertEqual(response.status_code, 200) avg_login_time = statistics.mean(login_times) # Test authenticated request performance self.client.login(username='testuser', password='test123') auth_request_times = [] for _ in range(10): start_time = time.time() response = self.client.get(reverse('api:product-list')) auth_request_time = time.time() - start_time auth_request_times.append(auth_request_time) self.assertEqual(response.status_code, 200) avg_auth_request_time = statistics.mean(auth_request_times) # Performance assertions self.assertLess(avg_login_time, 0.5, "Average login time should be under 500ms") self.assertLess(avg_auth_request_time, 0.2, "Average authenticated request time should be under 200ms") print(f"\nAuthentication Performance:") print(f"Average login time: {avg_login_time:.3f}s") print(f"Average authenticated request time: {avg_auth_request_time:.3f}s")