"""
|
|
Database Index Management Module
|
|
|
|
This module provides comprehensive index management utilities for the multi-tenant SaaS platform,
|
|
including index creation, monitoring, optimization, and maintenance specifically designed for
|
|
PostgreSQL with multi-tenant architecture and Malaysian market requirements.
|
|
"""
|
|
|
|

import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

from django.db import connection
from django.utils import timezone

logger = logging.getLogger(__name__)


class IndexType(Enum):
    """Types of database indexes."""
    BTREE = "btree"
    HASH = "hash"
    GIST = "gist"
    GIN = "gin"
    BRIN = "brin"
    SPGIST = "spgist"
    PARTIAL = "partial"
    EXPRESSION = "expression"
    UNIQUE = "unique"
    COMPOSITE = "composite"


class IndexStatus(Enum):
    """Status of database indexes."""
    ACTIVE = "active"
    INACTIVE = "inactive"
    INVALID = "invalid"
    CREATING = "creating"
    DROPPING = "dropping"
    REBUILDING = "rebuilding"


@dataclass
class IndexInfo:
    """Information about a database index."""
    name: str
    table_name: str
    column_names: List[str]
    index_type: IndexType
    status: IndexStatus
    is_unique: bool
    is_primary: bool
    size_bytes: int
    usage_count: int
    last_used: Optional[timezone.datetime]
    create_statement: str
    tenant_schema: str


@dataclass
class IndexRecommendation:
    """Recommendation for index management."""
    action: str  # 'create', 'drop', 'rebuild', 'modify'
    index_name: Optional[str]
    table_name: str
    columns: List[str]
    index_type: IndexType
    reason: str
    impact: str
    priority: str  # 'low', 'medium', 'high', 'critical'
    estimated_benefit: str


class IndexManager:
    """
    Comprehensive index management system for the multi-tenant SaaS platform.

    Features:
    - Automatic index creation and management
    - Performance monitoring and analysis
    - Multi-tenant index optimization
    - Malaysian market-specific indexing
    - Index maintenance and cleanup
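
    Example (a minimal sketch; the tenant schema name is illustrative):

        manager = IndexManager(tenant_schema="tenant_acme")
        stats = manager.get_index_statistics()
        print(stats['total_size_formatted'])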
    """

    def __init__(self, tenant_schema: Optional[str] = None):
        self.tenant_schema = tenant_schema
        self.index_cache = {}
        self.last_analysis = None
        self.stats = {
            'indexes_managed': 0,
            'indexes_created': 0,
            'indexes_dropped': 0,
            'indexes_rebuilt': 0,
            'performance_improvement': 0.0
        }

    def get_all_indexes(self, refresh: bool = False) -> List[IndexInfo]:
        """
        Get all indexes in the database.

        Args:
            refresh: Force refresh from database

        Returns:
            List of IndexInfo objects
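
        Example (illustrative):

            for idx in manager.get_all_indexes(refresh=True):
                print(idx.name, idx.table_name, idx.usage_count)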
        """
        cache_key = f"all_indexes_{self.tenant_schema or 'public'}"

        if not refresh and cache_key in self.index_cache:
            return self.index_cache[cache_key]

        indexes = []

        with connection.cursor() as cursor:
            # Get basic index information
            cursor.execute("""
                SELECT
                    i.relname AS index_name,
                    t.relname AS table_name,
                    am.amname AS index_type,
                    idx.indisunique AS is_unique,
                    idx.indisprimary AS is_primary,
                    pg_get_indexdef(idx.indexrelid) AS definition,
                    pg_relation_size(idx.indexrelid) AS size_bytes,
                    n.nspname AS schemaname
                FROM pg_index idx
                JOIN pg_class i ON i.oid = idx.indexrelid
                JOIN pg_class t ON t.oid = idx.indrelid
                JOIN pg_namespace n ON n.oid = t.relnamespace
                JOIN pg_am am ON am.oid = i.relam
                WHERE n.nspname = %s
                ORDER BY t.relname, i.relname
            """, [self.tenant_schema or 'public'])

            results = cursor.fetchall()

            for row in results:
                index_name, table_name, index_type_str, is_unique, is_primary, definition, size_bytes, schema = row

                # Extract column names from definition
                column_names = self._extract_column_names(definition)

                # Get usage statistics
                usage_info = self._get_index_usage(cursor, index_name, schema)

                # Determine index type
                index_type = self._determine_index_type(definition, index_type_str)

                # Get index status
                status = self._get_index_status(cursor, index_name, schema)

                index_info = IndexInfo(
                    name=index_name,
                    table_name=table_name,
                    column_names=column_names,
                    index_type=index_type,
                    status=status,
                    is_unique=is_unique,
                    is_primary=is_primary,
                    size_bytes=size_bytes or 0,
                    usage_count=usage_info.get('usage_count', 0),
                    last_used=usage_info.get('last_used'),
                    create_statement=definition,
                    tenant_schema=schema
                )

                indexes.append(index_info)

        self.index_cache[cache_key] = indexes
        self.last_analysis = timezone.now()
        return indexes

    def _extract_column_names(self, definition: str) -> List[str]:
        """Extract column names from an index definition."""
        # pg_get_indexdef() output looks like:
        # CREATE INDEX name ON schema.table USING btree (col1, col2)
        match = re.search(r'ON\s+\S+(?:\s+USING\s+\w+)?\s*\(([^)]+)\)', definition)
        if match:
            columns_part = match.group(1)
            # Split on commas and strip quoting/whitespace
            return [col.strip().strip('"') for col in columns_part.split(',')]
        return []

    def _get_index_usage(self, cursor, index_name: str, schema: str) -> Dict[str, Any]:
        """Get index usage statistics."""
        try:
            cursor.execute("""
                SELECT
                    idx_scan AS usage_count,
                    idx_tup_read AS tuples_read,
                    idx_tup_fetch AS tuples_fetched
                FROM pg_stat_user_indexes
                WHERE schemaname = %s AND indexrelname = %s
            """, [schema, index_name])

            result = cursor.fetchone()
            if result:
                usage_count = result[0] or 0
                return {
                    'usage_count': usage_count,
                    'tuples_read': result[1] or 0,
                    'tuples_fetched': result[2] or 0,
                    # PostgreSQL does not record when an index was last scanned
                    # (before last_idx_scan in v16), so approximate: any usage
                    # at all is treated as "recently used".
                    'last_used': timezone.now() if usage_count > 0 else None,
                }
        except Exception as e:
            logger.error(f"Error getting index usage for {index_name}: {e}")

        return {'usage_count': 0, 'tuples_read': 0, 'tuples_fetched': 0}

    def _determine_index_type(self, definition: str, am_name: str) -> IndexType:
        """Determine index type from definition and access method."""
        if am_name == "btree":
            # Check for special cases
            if "UNIQUE" in definition.upper():
                return IndexType.UNIQUE
            if "WHERE" in definition.upper():
                return IndexType.PARTIAL
            # Check whether the indexed expression is more than plain columns.
            # Identifiers may contain underscores, which str.isalnum() would
            # reject, so match word characters instead.
            content_between_parens = re.search(r'\(([^)]+)\)', definition)
            if content_between_parens:
                content = content_between_parens.group(1)
                if not all(re.fullmatch(r'[\w"]+', col.strip()) for col in content.split(',')):
                    return IndexType.EXPRESSION
            return IndexType.BTREE
        if am_name == "hash":
            return IndexType.HASH
        if am_name == "gist":
            return IndexType.GIST
        if am_name == "gin":
            return IndexType.GIN
        if am_name == "brin":
            return IndexType.BRIN
        if am_name == "spgist":
            return IndexType.SPGIST

        return IndexType.BTREE

    def _get_index_status(self, cursor, index_name: str, schema: str) -> IndexStatus:
        """Get current status of an index."""
        try:
            cursor.execute("""
                SELECT indisvalid
                FROM pg_index
                WHERE indexrelid = (
                    SELECT oid FROM pg_class
                    WHERE relname = %s AND relnamespace = (
                        SELECT oid FROM pg_namespace WHERE nspname = %s
                    )
                )
            """, [index_name, schema])

            result = cursor.fetchone()
            if result:
                return IndexStatus.ACTIVE if result[0] else IndexStatus.INVALID
        except Exception as e:
            logger.error(f"Error getting index status for {index_name}: {e}")

        return IndexStatus.ACTIVE

    def analyze_index_performance(self) -> Dict[str, Any]:
        """
        Analyze index performance and generate recommendations.

        Returns:
            Dictionary with performance analysis and recommendations
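
        Example (illustrative):

            analysis = manager.analyze_index_performance()
            for rec in analysis['recommendations']:
                print(rec.priority, rec.action, rec.table_name, rec.reason)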
        """
        indexes = self.get_all_indexes(refresh=True)
        recommendations = []

        # Analyze unused indexes
        unused_indexes = [
            idx for idx in indexes
            if idx.usage_count == 0 and not idx.is_primary
        ]

        for idx in unused_indexes:
            recommendations.append(IndexRecommendation(
                action="drop",
                index_name=idx.name,
                table_name=idx.table_name,
                columns=idx.column_names,
                index_type=idx.index_type,
                reason=f"Index {idx.name} has never been used",
                impact="Reduces storage and maintenance overhead",
                priority="medium",
                estimated_benefit=f"Save {self._format_bytes(idx.size_bytes)}"
            ))

        # Analyze duplicate indexes
        recommendations.extend(self._find_duplicate_indexes(indexes))

        # Analyze missing indexes
        recommendations.extend(self._find_missing_indexes())

        # Analyze fragmented indexes
        recommendations.extend(self._analyze_fragmentation(indexes))

        return {
            'total_indexes': len(indexes),
            'unused_indexes': len(unused_indexes),
            'total_index_size': sum(idx.size_bytes for idx in indexes),
            'recommendations': recommendations,
            'high_priority_count': len([
                r for r in recommendations if r.priority in ('high', 'critical')
            ]),
            'analysis_timestamp': timezone.now()
        }

    def _find_duplicate_indexes(self, indexes: List[IndexInfo]) -> List[IndexRecommendation]:
        """Find duplicate or redundant indexes."""
        recommendations = []
        index_groups = {}

        # Group indexes by table and column set
        for idx in indexes:
            key = (idx.table_name, tuple(sorted(idx.column_names)))
            index_groups.setdefault(key, []).append(idx)

        for (table, columns), group in index_groups.items():
            if len(group) > 1:
                # Sort by usage and keep the most used
                group.sort(key=lambda x: x.usage_count, reverse=True)
                keep_idx = group[0]

                for drop_idx in group[1:]:
                    recommendations.append(IndexRecommendation(
                        action="drop",
                        index_name=drop_idx.name,
                        table_name=table,
                        columns=list(columns),
                        index_type=drop_idx.index_type,
                        reason=f"Duplicate index (redundant with {keep_idx.name})",
                        impact="Reduces storage and write overhead",
                        priority="low",
                        estimated_benefit=f"Save {self._format_bytes(drop_idx.size_bytes)}"
                    ))

        return recommendations

    def _find_missing_indexes(self) -> List[IndexRecommendation]:
        """Find potentially missing indexes based on query patterns."""
        recommendations = []

        with connection.cursor() as cursor:
            # Flag large tables that are read mostly via sequential scans
            cursor.execute("""
                SELECT
                    schemaname,
                    relname,
                    seq_scan,
                    seq_tup_read,
                    pg_total_relation_size(relid) AS table_size
                FROM pg_stat_user_tables
                WHERE seq_scan > 1000
                  AND pg_total_relation_size(relid) > 100 * 1024 * 1024
                ORDER BY seq_scan DESC
                LIMIT 10
            """)

            for row in cursor.fetchall():
                schema, table, seq_scan, seq_tup_read, table_size = row

                recommendations.append(IndexRecommendation(
                    action="create",
                    index_name=None,
                    table_name=table,
                    columns=["id"],  # Placeholder; real columns require query-level analysis
                    index_type=IndexType.BTREE,
                    reason=f"Table {table} has {seq_scan} sequential scans",
                    impact="Improve query performance for large table",
                    priority="high",
                    estimated_benefit=f"Reduce sequential scans by ~{int(seq_scan * 0.8)}"
                ))

        return recommendations

    def _analyze_fragmentation(self, indexes: List[IndexInfo]) -> List[IndexRecommendation]:
        """Analyze index fragmentation and recommend rebuilding."""
        recommendations = []

        for idx in indexes:
            # Simplified bloat heuristic: large (> 10 MB), heavily used
            # indexes are the most likely to benefit from a rebuild.
            if idx.size_bytes > 10 * 1024 * 1024 and idx.usage_count > 1000:
                recommendations.append(IndexRecommendation(
                    action="rebuild",
                    index_name=idx.name,
                    table_name=idx.table_name,
                    columns=idx.column_names,
                    index_type=idx.index_type,
                    reason=f"Large index {idx.name} with high usage may be fragmented",
                    impact="Improve query performance and reduce storage",
                    priority="medium",
                    estimated_benefit="Optimize read performance"
                ))

        return recommendations

    def create_index(self, table_name: str, columns: List[str],
                     index_type: IndexType = IndexType.BTREE,
                     unique: bool = False,
                     partial_condition: Optional[str] = None,
                     concurrently: bool = True) -> str:
        """
        Create a new index.

        Args:
            table_name: Name of the table
            columns: List of column names to index
            index_type: Type of index to create
            unique: Whether to create a unique index
            partial_condition: WHERE clause for a partial index
            concurrently: Create the index concurrently (takes weaker locks,
                but cannot run inside a transaction block)

        Returns:
            Name of the created index
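
        Example (a sketch; table and column names are illustrative):

            manager.create_index(
                "core_user", ["tenant_id", "email"],
                unique=True, partial_condition="is_active"
            )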
        """
        # Generate index name
        prefix = "unq" if unique else "idx"
        index_name = f"{prefix}_{table_name}_{'_'.join(columns)}"

        # Build the CREATE INDEX statement. Keyword order matters:
        # CREATE [UNIQUE] INDEX [CONCURRENTLY] name ON table ...
        sql_parts = ["CREATE"]

        if unique:
            sql_parts.append("UNIQUE")

        sql_parts.append("INDEX")

        if concurrently:
            sql_parts.append("CONCURRENTLY")

        sql_parts.append(index_name)
        sql_parts.append("ON")
        sql_parts.append(table_name)

        # Add a USING clause for non-default access methods. Only real access
        # methods qualify; PARTIAL/EXPRESSION/UNIQUE/COMPOSITE are descriptive
        # enum members, not access methods.
        access_methods = {IndexType.HASH, IndexType.GIST, IndexType.GIN,
                          IndexType.BRIN, IndexType.SPGIST}
        if index_type in access_methods:
            sql_parts.append(f"USING {index_type.value}")

        # Add column list
        sql_parts.append(f"({', '.join(columns)})")

        # Add partial condition if specified
        if partial_condition:
            sql_parts.append(f"WHERE {partial_condition}")

        create_sql = " ".join(sql_parts) + ";"

        try:
            with connection.cursor() as cursor:
                cursor.execute(create_sql)

            logger.info(f"Created index {index_name} on {table_name}")
            self.stats['indexes_created'] += 1
            self.stats['indexes_managed'] += 1

            # Clear cache
            self.index_cache.clear()

            return index_name

        except Exception as e:
            logger.error(f"Failed to create index {index_name}: {e}")
            raise

    def drop_index(self, index_name: str, concurrently: bool = True) -> bool:
        """
        Drop an existing index.

        Args:
            index_name: Name of the index to drop
            concurrently: Drop the index concurrently (cannot run inside a
                transaction block)

        Returns:
            True if successful, False otherwise
        """
        try:
            with connection.cursor() as cursor:
                keyword = "CONCURRENTLY " if concurrently else ""
                cursor.execute(f"DROP INDEX {keyword}{index_name};")

            logger.info(f"Dropped index {index_name}")
            self.stats['indexes_dropped'] += 1
            self.stats['indexes_managed'] += 1

            # Clear cache
            self.index_cache.clear()

            return True

        except Exception as e:
            logger.error(f"Failed to drop index {index_name}: {e}")
            return False

    def rebuild_index(self, index_name: str) -> bool:
        """
        Rebuild an existing index (REINDEX).

        Args:
            index_name: Name of the index to rebuild

        Returns:
            True if successful, False otherwise
        """
        try:
            with connection.cursor() as cursor:
                cursor.execute(f"REINDEX INDEX {index_name};")

            logger.info(f"Rebuilt index {index_name}")
            self.stats['indexes_rebuilt'] += 1
            self.stats['indexes_managed'] += 1

            # Clear cache
            self.index_cache.clear()

            return True

        except Exception as e:
            logger.error(f"Failed to rebuild index {index_name}: {e}")
            return False

    def create_malaysian_indexes(self) -> List[str]:
        """
        Create indexes specifically for Malaysian market requirements.

        Returns:
            List of created index names
        """
        created_indexes = []

        # Malaysian-specific indexes
        malaysian_indexes = [
            {
                'table': 'core_user',
                'columns': ['ic_number'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Malaysian IC validation and lookup'
            },
            {
                'table': 'core_address',
                'columns': ['postcode'],
                'type': IndexType.BTREE,
                'reason': 'Malaysian postcode lookups'
            },
            {
                'table': 'core_address',
                'columns': ['state'],
                'type': IndexType.BTREE,
                'reason': 'Malaysian state filtering'
            },
            {
                'table': 'core_business',
                'columns': ['registration_number'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Business registration number lookup'
            },
            {
                'table': 'core_sstrate',
                'columns': ['rate'],
                'type': IndexType.BTREE,
                'reason': 'SST rate queries'
            },
            {
                'table': 'retail_product',
                'columns': ['barcode'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Product barcode scanning'
            },
            {
                'table': 'healthcare_patient',
                'columns': ['ic_number'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Patient IC number lookup'
            },
            {
                'table': 'education_student',
                'columns': ['ic_number'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Student IC number lookup'
            },
            {
                'table': 'logistics_vehicle',
                'columns': ['registration_number'],
                'type': IndexType.BTREE,
                'unique': True,
                'reason': 'Vehicle registration lookup'
            }
        ]

        for index_config in malaysian_indexes:
            try:
                index_name = self.create_index(
                    table_name=index_config['table'],
                    columns=index_config['columns'],
                    index_type=index_config['type'],
                    unique=index_config.get('unique', False)
                )
                created_indexes.append(index_name)
                logger.info(f"Created Malaysian index: {index_name} - {index_config['reason']}")

            except Exception as e:
                logger.warning(f"Failed to create Malaysian index for {index_config['table']}: {e}")

        return created_indexes

    def create_multi_tenant_indexes(self) -> List[str]:
        """
        Create indexes optimized for multi-tenant architecture.

        Returns:
            List of created index names
        """
        created_indexes = []

        # Multi-tenant optimization indexes
        tenant_indexes = [
            {
                'table': 'core_user',
                'columns': ['tenant_id', 'is_active'],
                'type': IndexType.BTREE,
                'reason': 'Tenant-scoped user queries with status'
            },
            {
                'table': 'core_transaction',
                'columns': ['tenant_id', 'created_at'],
                'type': IndexType.BTREE,
                'reason': 'Tenant transaction history by date'
            },
            {
                'table': 'core_subscription',
                'columns': ['tenant_id', 'status'],
                'type': IndexType.BTREE,
                'reason': 'Tenant subscription status queries'
            },
            {
                'table': 'core_auditlog',
                'columns': ['tenant_id', 'created_at'],
                'type': IndexType.BTREE,
                'reason': 'Tenant audit log queries'
            },
            {
                'table': 'core_notification',
                'columns': ['tenant_id', 'status'],
                'type': IndexType.BTREE,
                'reason': 'Tenant notification status queries'
            }
        ]

        for index_config in tenant_indexes:
            try:
                index_name = self.create_index(
                    table_name=index_config['table'],
                    columns=index_config['columns'],
                    index_type=index_config['type']
                )
                created_indexes.append(index_name)
                logger.info(f"Created multi-tenant index: {index_name} - {index_config['reason']}")

            except Exception as e:
                logger.warning(f"Failed to create multi-tenant index for {index_config['table']}: {e}")

        return created_indexes

    def get_index_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive index statistics.

        Returns:
            Dictionary with index statistics
        """
        indexes = self.get_all_indexes()
        total_size = sum(idx.size_bytes for idx in indexes)

        stats = {
            'total_indexes': len(indexes),
            'total_size_bytes': total_size,
            'total_size_formatted': self._format_bytes(total_size),
            'index_types': {},
            'status_distribution': {},
            'unused_count': len([idx for idx in indexes if idx.usage_count == 0]),
            'high_usage_count': len([idx for idx in indexes if idx.usage_count > 1000]),
            'large_indexes': [idx.name for idx in indexes if idx.size_bytes > 100 * 1024 * 1024],  # > 100 MB
            'management_stats': self.stats.copy()
        }

        # Count by index type and by status
        for idx in indexes:
            idx_type = idx.index_type.value
            stats['index_types'][idx_type] = stats['index_types'].get(idx_type, 0) + 1
            status = idx.status.value
            stats['status_distribution'][status] = stats['status_distribution'].get(status, 0) + 1

        return stats

    def _format_bytes(self, bytes_value: float) -> str:
        """Format a byte count as a human-readable string."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"

    def execute_recommendations(self, recommendations: List[IndexRecommendation],
                                dry_run: bool = False) -> Dict[str, Any]:
        """
        Execute index recommendations.

        Args:
            recommendations: List of index recommendations
            dry_run: If True, only report what would be done

        Returns:
            Dictionary with execution results
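
        Example (illustrative):

            analysis = manager.analyze_index_performance()
            results = manager.execute_recommendations(
                analysis['recommendations'], dry_run=True
            )
            print(results['executed'], results['failed'], results['skipped'])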
        """
        results = {
            'executed': 0,
            'failed': 0,
            'skipped': 0,
            'details': []
        }

        for rec in recommendations:
            try:
                if dry_run:
                    results['details'].append(f"[DRY RUN] Would {rec.action} index for {rec.table_name}")
                    results['skipped'] += 1
                    continue

                if rec.action == "create":
                    index_name = self.create_index(
                        table_name=rec.table_name,
                        columns=rec.columns,
                        index_type=rec.index_type
                    )
                    results['details'].append(f"Created index {index_name}")
                    results['executed'] += 1

                elif rec.action == "drop":
                    if rec.index_name:
                        success = self.drop_index(rec.index_name)
                        if success:
                            results['details'].append(f"Dropped index {rec.index_name}")
                            results['executed'] += 1
                        else:
                            results['details'].append(f"Failed to drop index {rec.index_name}")
                            results['failed'] += 1

                elif rec.action == "rebuild":
                    if rec.index_name:
                        success = self.rebuild_index(rec.index_name)
                        if success:
                            results['details'].append(f"Rebuilt index {rec.index_name}")
                            results['executed'] += 1
                        else:
                            results['details'].append(f"Failed to rebuild index {rec.index_name}")
                            results['failed'] += 1

                else:
                    results['details'].append(f"Unsupported action '{rec.action}' for {rec.table_name}")
                    results['skipped'] += 1

            except Exception as e:
                error_msg = f"Failed to execute recommendation for {rec.table_name}: {e}"
                results['details'].append(error_msg)
                results['failed'] += 1
                logger.error(error_msg)

        return results

    def maintenance_mode(self, actions: List[str]) -> Dict[str, Any]:
        """
        Perform index maintenance operations.

        Args:
            actions: List of maintenance actions to perform; supported values
                are "analyze", "reindex_all", "cleanup_unused", and "update_stats"

        Returns:
            Dictionary with maintenance results
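
        Example (illustrative):

            results = manager.maintenance_mode(["analyze", "update_stats"])
            print(results['summary'])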
        """
        results = {
            'actions_completed': 0,
            'errors': [],
            'summary': {}
        }

        for action in actions:
            try:
                if action == "analyze":
                    self._run_analyze()
                    results['summary']['analyze'] = "Completed"

                elif action == "reindex_all":
                    self._reindex_all()
                    results['summary']['reindex_all'] = "Completed"

                elif action == "cleanup_unused":
                    unused_count = self._cleanup_unused_indexes()
                    results['summary']['cleanup_unused'] = f"Removed {unused_count} unused indexes"

                elif action == "update_stats":
                    self._update_statistics()
                    results['summary']['update_stats'] = "Completed"

                else:
                    results['errors'].append(f"Unknown maintenance action: {action}")
                    continue

                results['actions_completed'] += 1

            except Exception as e:
                error_msg = f"Failed to perform {action}: {e}"
                results['errors'].append(error_msg)
                logger.error(error_msg)

        return results

    def _run_analyze(self):
        """Run ANALYZE on all tables."""
        with connection.cursor() as cursor:
            cursor.execute("ANALYZE VERBOSE")
        logger.info("Database analyze completed")

    def _reindex_all(self):
        """Reindex all indexes in the database."""
        # Older PostgreSQL versions require the database name after
        # REINDEX DATABASE, so pass it explicitly.
        db_name = connection.settings_dict['NAME']
        with connection.cursor() as cursor:
            cursor.execute(f'REINDEX DATABASE "{db_name}"')
        logger.info("Database reindex completed")

    def _cleanup_unused_indexes(self) -> int:
        """Remove unused indexes."""
        performance_analysis = self.analyze_index_performance()
        unused_recommendations = [
            r for r in performance_analysis['recommendations']
            if r.action == "drop"
        ]

        if unused_recommendations:
            results = self.execute_recommendations(unused_recommendations)
            return len([r for r in results['details'] if "Dropped" in r])

        return 0

    def _update_statistics(self):
        """Update database statistics."""
        # VACUUM cannot run inside a transaction block; this relies on the
        # connection being in autocommit mode (Django's default).
        with connection.cursor() as cursor:
            cursor.execute("VACUUM ANALYZE")
        logger.info("Database statistics updated")


# Export main classes and functions
__all__ = [
    'IndexManager',
    'IndexType',
    'IndexStatus',
    'IndexInfo',
    'IndexRecommendation',
]
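

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch (an assumption: Django settings are already
    # configured and a PostgreSQL connection is available).
    manager = IndexManager()
    analysis = manager.analyze_index_performance()
    print(f"{analysis['total_indexes']} indexes, "
          f"{analysis['unused_indexes']} unused, "
          f"{len(analysis['recommendations'])} recommendations")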