Some checks failed
System Monitoring / Health Checks (push) Has been cancelled
System Monitoring / Performance Monitoring (push) Has been cancelled
System Monitoring / Database Monitoring (push) Has been cancelled
System Monitoring / Cache Monitoring (push) Has been cancelled
System Monitoring / Log Monitoring (push) Has been cancelled
System Monitoring / Resource Monitoring (push) Has been cancelled
System Monitoring / Uptime Monitoring (push) Has been cancelled
System Monitoring / Backup Monitoring (push) Has been cancelled
System Monitoring / Security Monitoring (push) Has been cancelled
System Monitoring / Monitoring Dashboard (push) Has been cancelled
System Monitoring / Alerting (push) Has been cancelled
Security Scanning / Dependency Scanning (push) Has been cancelled
Security Scanning / Code Security Scanning (push) Has been cancelled
Security Scanning / Secrets Scanning (push) Has been cancelled
Security Scanning / Container Security Scanning (push) Has been cancelled
Security Scanning / Compliance Checking (push) Has been cancelled
Security Scanning / Security Dashboard (push) Has been cancelled
Security Scanning / Security Remediation (push) Has been cancelled
391 lines
14 KiB
Python
391 lines
14 KiB
Python
"""
|
|
Integration test for user authentication flow.
|
|
This test MUST fail before implementation.
|
|
"""
|
|
|
|
import pytest
|
|
from django.test import TestCase
|
|
from django.urls import reverse
|
|
from rest_framework.test import APIClient
|
|
from rest_framework import status
|
|
import json
|
|
import time
|
|
|
|
|
|
class UserAuthenticationIntegrationTest(TestCase):
|
|
def setUp(self):
|
|
self.client = APIClient()
|
|
|
|
# Test user credentials
|
|
self.test_user = {
|
|
'email': 'test.user@example.com',
|
|
'password': 'SecurePassword123!',
|
|
'name': 'Test User',
|
|
'role': 'TENANT_ADMIN'
|
|
}
|
|
|
|
def test_complete_authentication_flow(self):
|
|
"""Test complete authentication flow from login to logout."""
|
|
# Step 1: User login (should fail before implementation)
|
|
login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': self.test_user['password']
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert login_response.status_code == status.HTTP_200_OK
|
|
login_data = login_response.json()
|
|
|
|
# Verify token structure
|
|
assert 'access_token' in login_data
|
|
assert 'refresh_token' in login_data
|
|
assert 'user' in login_data
|
|
assert 'expires_in' in login_data
|
|
|
|
access_token = login_data['access_token']
|
|
refresh_token = login_data['refresh_token']
|
|
user_info = login_data['user']
|
|
|
|
# Verify user information
|
|
assert user_info['email'] == self.test_user['email']
|
|
assert user_info['name'] == self.test_user['name']
|
|
assert user_info['role'] == self.test_user['role']
|
|
assert 'tenant_id' in user_info
|
|
|
|
# Step 2: Use access token for authenticated requests
|
|
auth_header = {'HTTP_AUTHORIZATION': f'Bearer {access_token}'}
|
|
|
|
# Test accessing protected resource
|
|
protected_response = self.client.get(
|
|
'/api/v1/users/',
|
|
**auth_header
|
|
)
|
|
|
|
assert protected_response.status_code == status.HTTP_200_OK
|
|
|
|
# Step 3: Test token refresh
|
|
refresh_response = self.client.post(
|
|
'/api/v1/auth/refresh/',
|
|
data=json.dumps({
|
|
'refresh_token': refresh_token
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert refresh_response.status_code == status.HTTP_200_OK
|
|
refresh_data = refresh_response.json()
|
|
|
|
# Verify new tokens
|
|
assert 'access_token' in refresh_data
|
|
assert 'refresh_token' in refresh_data
|
|
|
|
# New access token should be different (rotation)
|
|
new_access_token = refresh_data['access_token']
|
|
assert new_access_token != access_token
|
|
|
|
# New refresh token should also be different (rotation)
|
|
new_refresh_token = refresh_data['refresh_token']
|
|
assert new_refresh_token != refresh_token
|
|
|
|
# Step 4: Test new access token works
|
|
new_auth_header = {'HTTP_AUTHORIZATION': f'Bearer {new_access_token}'}
|
|
|
|
new_protected_response = self.client.get(
|
|
'/api/v1/users/',
|
|
**new_auth_header
|
|
)
|
|
|
|
assert new_protected_response.status_code == status.HTTP_200_OK
|
|
|
|
# Step 5: Test old refresh token is invalidated
|
|
old_refresh_response = self.client.post(
|
|
'/api/v1/auth/refresh/',
|
|
data=json.dumps({
|
|
'refresh_token': refresh_token # Old token
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert old_refresh_response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
# Step 6: Test logout/blacklist tokens
|
|
logout_response = self.client.post(
|
|
'/api/v1/auth/logout/',
|
|
**new_auth_header
|
|
)
|
|
|
|
assert logout_response.status_code == status.HTTP_200_OK
|
|
logout_data = logout_response.json()
|
|
|
|
assert 'message' in logout_data
|
|
assert logout_data['message'] == 'Successfully logged out'
|
|
|
|
# Step 7: Test token is blacklisted (cannot be used after logout)
|
|
blacklisted_response = self.client.get(
|
|
'/api/v1/users/',
|
|
**new_auth_header
|
|
)
|
|
|
|
assert blacklisted_response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_multi_factor_authentication_flow(self):
|
|
"""Test multi-factor authentication flow."""
|
|
# Step 1: Initial login with MFA enabled user
|
|
mfa_login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': 'mfa.user@example.com',
|
|
'password': 'SecurePassword123!'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
# Should return MFA challenge instead of full token
|
|
assert mfa_login_response.status_code == status.HTTP_200_OK
|
|
mfa_data = mfa_login_response.json()
|
|
|
|
assert 'mfa_required' in mfa_data
|
|
assert mfa_data['mfa_required'] is True
|
|
assert 'mfa_methods' in mfa_data
|
|
assert 'temp_token' in mfa_data
|
|
|
|
# Step 2: Complete MFA with TOTP
|
|
mfa_verify_response = self.client.post(
|
|
'/api/v1/auth/mfa/verify/',
|
|
data=json.dumps({
|
|
'temp_token': mfa_data['temp_token'],
|
|
'method': 'TOTP',
|
|
'code': '123456' # Mock TOTP code
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert mfa_verify_response.status_code == status.HTTP_200_OK
|
|
mfa_verify_data = mfa_verify_response.json()
|
|
|
|
assert 'access_token' in mfa_verify_data
|
|
assert 'refresh_token' in mfa_verify_data
|
|
|
|
def test_authentication_error_scenarios(self):
|
|
"""Test various authentication error scenarios."""
|
|
# Test invalid credentials
|
|
invalid_credentials_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': 'wrongpassword'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert invalid_credentials_response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
# Test missing credentials
|
|
missing_credentials_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email']
|
|
# Missing password
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert missing_credentials_response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
# Test invalid refresh token
|
|
invalid_refresh_response = self.client.post(
|
|
'/api/v1/auth/refresh/',
|
|
data=json.dumps({
|
|
'refresh_token': 'invalid_refresh_token'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert invalid_refresh_response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
# Test missing refresh token
|
|
missing_refresh_response = self.client.post(
|
|
'/api/v1/auth/refresh/',
|
|
data=json.dumps({}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert missing_refresh_response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
def test_token_expiry_handling(self):
|
|
"""Test handling of expired tokens."""
|
|
# This test would need to simulate token expiration
|
|
# For now, we'll test the structure
|
|
pass
|
|
|
|
def test_concurrent_session_management(self):
|
|
"""Test concurrent session management."""
|
|
# Login first device
|
|
device1_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': self.test_user['password']
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert device1_response.status_code == status.HTTP_200_OK
|
|
device1_token = device1_response.json()['access_token']
|
|
|
|
# Login second device
|
|
device2_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': self.test_user['password']
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert device2_response.status_code == status.HTTP_200_OK
|
|
device2_token = device2_response.json()['access_token']
|
|
|
|
# Both tokens should work (assuming concurrent sessions are allowed)
|
|
device1_auth = {'HTTP_AUTHORIZATION': f'Bearer {device1_token}'}
|
|
device2_auth = {'HTTP_AUTHORIZATION': f'Bearer {device2_token}'}
|
|
|
|
device1_protected = self.client.get('/api/v1/users/', **device1_auth)
|
|
device2_protected = self.client.get('/api/v1/users/', **device2_auth)
|
|
|
|
assert device1_protected.status_code == status.HTTP_200_OK
|
|
assert device2_protected.status_code == status.HTTP_200_OK
|
|
|
|
def test_permission_based_access_control(self):
|
|
"""Test permission-based access control."""
|
|
# Login as regular user
|
|
user_login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': 'regular.user@example.com',
|
|
'password': 'SecurePassword123!'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert user_login_response.status_code == status.HTTP_200_OK
|
|
user_token = user_login_response.json()['access_token']
|
|
user_auth = {'HTTP_AUTHORIZATION': f'Bearer {user_token}'}
|
|
|
|
# Regular user should not be able to access admin-only endpoints
|
|
admin_endpoint_response = self.client.get('/api/v1/tenants/', **user_auth)
|
|
assert admin_endpoint_response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
# But should be able to access user endpoints
|
|
user_endpoint_response = self.client.get('/api/v1/users/', **user_auth)
|
|
assert user_endpoint_response.status_code == status.HTTP_200_OK
|
|
|
|
def test_tenant_isolation_in_authentication(self):
|
|
"""Test that authentication tokens include tenant isolation."""
|
|
# Login as tenant admin
|
|
tenant_admin_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': 'tenant.admin@tenant1.com',
|
|
'password': 'SecurePassword123!'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert tenant_admin_response.status_code == status.HTTP_200_OK
|
|
tenant_admin_data = tenant_admin_response.json()
|
|
|
|
# Token should include tenant information
|
|
assert 'tenant_id' in tenant_admin_data['user']
|
|
tenant1_id = tenant_admin_data['user']['tenant_id']
|
|
|
|
# Login as different tenant admin
|
|
tenant2_admin_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': 'tenant.admin@tenant2.com',
|
|
'password': 'SecurePassword123!'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert tenant2_admin_response.status_code == status.HTTP_200_OK
|
|
tenant2_admin_data = tenant2_admin_response.json()
|
|
|
|
# Should have different tenant ID
|
|
assert 'tenant_id' in tenant2_admin_data['user']
|
|
tenant2_id = tenant2_admin_data['user']['tenant_id']
|
|
|
|
assert tenant1_id != tenant2_id
|
|
|
|
def test_authentication_rate_limiting(self):
|
|
"""Test authentication rate limiting."""
|
|
# Test multiple failed login attempts
|
|
for i in range(5):
|
|
failed_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': 'wrongpassword'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
# Should still allow attempts but may implement rate limiting
|
|
assert failed_response.status_code in [status.HTTP_401_UNAUTHORIZED, status.HTTP_429_TOO_MANY_REQUESTS]
|
|
|
|
def test_password_change_flow(self):
|
|
"""Test password change flow with authentication."""
|
|
# Login first
|
|
login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': self.test_user['password']
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert login_response.status_code == status.HTTP_200_OK
|
|
access_token = login_response.json()['access_token']
|
|
auth_header = {'HTTP_AUTHORIZATION': f'Bearer {access_token}'}
|
|
|
|
# Change password
|
|
password_change_response = self.client.post(
|
|
'/api/v1/auth/change-password/',
|
|
data=json.dumps({
|
|
'current_password': self.test_user['password'],
|
|
'new_password': 'NewSecurePassword456!'
|
|
}),
|
|
content_type='application/json',
|
|
**auth_header
|
|
)
|
|
|
|
assert password_change_response.status_code == status.HTTP_200_OK
|
|
|
|
# Test login with new password
|
|
new_login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': 'NewSecurePassword456!'
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert new_login_response.status_code == status.HTTP_200_OK
|
|
|
|
# Test old password no longer works
|
|
old_login_response = self.client.post(
|
|
'/api/v1/auth/login/',
|
|
data=json.dumps({
|
|
'email': self.test_user['email'],
|
|
'password': self.test_user['password']
|
|
}),
|
|
content_type='application/json'
|
|
)
|
|
|
|
assert old_login_response.status_code == status.HTTP_401_UNAUTHORIZED |