"""
|
|
Prometheus exporters for various system and application metrics.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
import threading
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, timedelta
|
|
from django.db import connection, connections
|
|
from django.core.cache import cache
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.db.models import Count, Q, Avg
|
|
from django.utils import timezone
|
|
from django_tenants.utils import get_tenant_model, get_tenant_schema_name
|
|
from prometheus_client import Gauge, Counter, Histogram, Info, start_http_server
|
|
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
|
|
import psutil
|
|
import redis
|
|
|
|
from .middleware import (
|
|
DATABASE_QUERIES, CACHE_HITS, CACHE_MISSES, MALAYSIAN_OPERATIONS,
|
|
TENANT_METRICS, BUSINESS_METRICS, ERROR_EVENTS
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
User = get_user_model()
|
|
TenantModel = get_tenant_model()
|
|
|
|
class DatabaseExporter:
    """Exporter for database metrics."""

    def __init__(self):
        self.metrics = {
            'database_size': Gauge(
                'database_size_bytes',
                'Database size in bytes',
                ['database', 'tenant']
            ),
            'database_connections': Gauge(
                'database_connections_current',
                'Current database connections',
                ['state', 'tenant']
            ),
            'database_transactions': Counter(
                'database_transactions_total',
                'Database transactions',
                ['type', 'tenant']
            ),
            'database_query_time': Histogram(
                'database_query_duration_seconds',
                'Database query duration',
                ['query_type', 'tenant']
            ),
            'database_deadlocks': Counter(
                'database_deadlocks_total',
                'Database deadlocks',
                ['tenant']
            ),
            'database_cache_hit_ratio': Gauge(
                'database_cache_hit_ratio',
                'Database cache hit ratio',
                ['tenant']
            ),
        }

    def collect_metrics(self):
        """Collect database metrics."""
        try:
            self._collect_database_size()
            self._collect_connection_metrics()
            self._collect_transaction_metrics()
            self._collect_performance_metrics()
            self._collect_deadlock_metrics()
        except Exception as e:
            logger.error(f"Error collecting database metrics: {e}")

    def _collect_database_size(self):
        """Collect database size metrics."""
        try:
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT datname, pg_database_size(datname) AS size
                    FROM pg_database
                    WHERE datistemplate = false
                """)
                for db_name, size in cursor.fetchall():
                    self.metrics['database_size'].labels(
                        database=db_name,
                        tenant='all'
                    ).set(size)
        except Exception as e:
            logger.error(f"Error collecting database size: {e}")

    def _collect_connection_metrics(self):
        """Collect connection metrics."""
        try:
            with connection.cursor() as cursor:
                # Current connections by state
                cursor.execute("""
                    SELECT state, COUNT(*)
                    FROM pg_stat_activity
                    WHERE pid <> pg_backend_pid()
                    GROUP BY state
                """)
                for state, count in cursor.fetchall():
                    self.metrics['database_connections'].labels(
                        state=state or 'idle',
                        tenant='all'
                    ).set(count)

                # Configured connection limit
                cursor.execute("SHOW max_connections")
                max_connections = cursor.fetchone()[0]
                self.metrics['database_connections'].labels(
                    state='max',
                    tenant='all'
                ).set(max_connections)
        except Exception as e:
            logger.error(f"Error collecting connection metrics: {e}")

    def _collect_transaction_metrics(self):
        """Collect transaction metrics."""
        try:
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT datname, xact_commit, xact_rollback
                    FROM pg_stat_database
                """)
                for db_name, commits, rollbacks in cursor.fetchall():
                    # PostgreSQL already accumulates these totals, so the
                    # counters are synchronised through prometheus_client's
                    # private `_value` handle instead of being incremented.
                    self.metrics['database_transactions'].labels(
                        type='commit',
                        tenant=db_name
                    )._value.set(commits)
                    self.metrics['database_transactions'].labels(
                        type='rollback',
                        tenant=db_name
                    )._value.set(rollbacks)
        except Exception as e:
            logger.error(f"Error collecting transaction metrics: {e}")

    def _collect_performance_metrics(self):
        """Collect performance metrics."""
        try:
            with connection.cursor() as cursor:
                # Query performance. Requires the pg_stat_statements extension;
                # note that PostgreSQL 13+ renamed these columns to
                # total_exec_time / mean_exec_time.
                cursor.execute("""
                    SELECT query, calls, total_time, mean_time, rows
                    FROM pg_stat_statements
                    ORDER BY total_time DESC
                    LIMIT 100
                """)
                for query, calls, total_time, mean_time, rows in cursor.fetchall():
                    query_type = self._classify_query(query)
                    self.metrics['database_query_time'].labels(
                        query_type=query_type,
                        tenant='all'
                    ).observe(mean_time / 1000)  # pg_stat_statements reports milliseconds

                # Cache hit ratio; NULLIF avoids division by zero on idle clusters.
                cursor.execute("""
                    SELECT sum(blks_hit)::float
                           / NULLIF(sum(blks_hit) + sum(blks_read), 0) AS hit_ratio
                    FROM pg_stat_database
                """)
                hit_ratio = cursor.fetchone()[0]
                if hit_ratio is not None:
                    self.metrics['database_cache_hit_ratio'].labels(
                        tenant='all'
                    ).set(hit_ratio * 100)
        except Exception as e:
            logger.error(f"Error collecting performance metrics: {e}")

    def _collect_deadlock_metrics(self):
        """Collect deadlock metrics."""
        try:
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT datname, deadlocks
                    FROM pg_stat_database
                """)
                for db_name, deadlocks in cursor.fetchall():
                    if deadlocks > 0:
                        self.metrics['database_deadlocks'].labels(
                            tenant=db_name
                        )._value.set(deadlocks)
        except Exception as e:
            logger.error(f"Error collecting deadlock metrics: {e}")

    def _classify_query(self, query: str) -> str:
        """Classify an SQL query by its leading keyword."""
        query_upper = query.lstrip().upper()
        if query_upper.startswith('SELECT'):
            return 'select'
        elif query_upper.startswith('INSERT'):
            return 'insert'
        elif query_upper.startswith('UPDATE'):
            return 'update'
        elif query_upper.startswith('DELETE'):
            return 'delete'
        elif query_upper.startswith(('CREATE', 'ALTER', 'DROP')):
            return 'ddl'
        else:
            return 'other'


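# A cleaner alternative to the `._value.set()` mirroring used above would be a
# custom collector that re-reads PostgreSQL's cumulative totals on every
# scrape, which is what the `prometheus_client.core` imports support. A
# minimal sketch, not wired in: registering it alongside the eagerly created
# Counter of the same name would clash, so one of the two has to go.
class PgStatTransactionCollector:
    """Yield pg_stat_database commit/rollback totals at scrape time."""

    def collect(self):
        family = CounterMetricFamily(
            'database_transactions',
            'Database transactions',
            labels=['type', 'tenant'],
        )
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT datname, xact_commit, xact_rollback FROM pg_stat_database"
            )
            for db_name, commits, rollbacks in cursor.fetchall():
                family.add_metric(['commit', db_name], commits)
                family.add_metric(['rollback', db_name], rollbacks)
        yield family

# Enabling it would be: REGISTRY.register(PgStatTransactionCollector())

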
class CacheExporter:
    """Exporter for cache metrics."""

    def __init__(self):
        self.metrics = {
            'cache_size': Gauge(
                'cache_size_bytes',
                'Cache size in bytes',
                ['cache_type', 'tenant']
            ),
            'cache_items': Gauge(
                'cache_items_total',
                'Total items in cache',
                ['cache_type', 'tenant']
            ),
            'cache_operations': Counter(
                'cache_operations_total',
                'Cache operations',
                ['operation', 'cache_type', 'tenant']
            ),
            'cache_hit_ratio': Gauge(
                'cache_hit_ratio_percent',
                'Cache hit ratio percentage',
                ['cache_type', 'tenant']
            ),
            'cache_evictions': Counter(
                'cache_evictions_total',
                'Cache evictions',
                ['cache_type', 'tenant']
            ),
            'cache_memory_usage': Gauge(
                'cache_memory_usage_bytes',
                'Cache memory usage',
                ['cache_type', 'tenant']
            ),
        }

    def collect_metrics(self):
        """Collect cache metrics."""
        try:
            self._collect_redis_metrics()
            self._collect_django_cache_metrics()
        except Exception as e:
            logger.error(f"Error collecting cache metrics: {e}")

    def _collect_redis_metrics(self):
        """Collect Redis metrics."""
        try:
            redis_client = redis.Redis.from_url(settings.REDIS_URL)
            info = redis_client.info()

            # Memory usage
            self.metrics['cache_memory_usage'].labels(
                cache_type='redis',
                tenant='all'
            ).set(info['used_memory'])

            # Key count (keys in the configured database)
            self.metrics['cache_items'].labels(
                cache_type='redis',
                tenant='all'
            ).set(redis_client.dbsize())

            # Hit ratio
            hits = info.get('keyspace_hits', 0)
            misses = info.get('keyspace_misses', 0)
            total = hits + misses
            if total > 0:
                self.metrics['cache_hit_ratio'].labels(
                    cache_type='redis',
                    tenant='all'
                ).set((hits / total) * 100)

            # Lookups (cumulative on the server, mirrored via the private API)
            self.metrics['cache_operations'].labels(
                operation='get',
                cache_type='redis',
                tenant='all'
            )._value.set(total)

        except Exception as e:
            logger.error(f"Error collecting Redis metrics: {e}")

    def _collect_django_cache_metrics(self):
        """Collect Django cache metrics."""
        try:
            # get_stats() only exists on the memcached backends and returns a
            # list of (server, stats) pairs; other backends are skipped.
            if not hasattr(cache, 'get_stats'):
                return

            for server, stats in cache.get_stats():
                hits = int(stats.get('get_hits', 0))
                misses = int(stats.get('get_misses', 0))
                total = hits + misses
                if total > 0:
                    self.metrics['cache_hit_ratio'].labels(
                        cache_type='django',
                        tenant='all'
                    ).set((hits / total) * 100)

                    self.metrics['cache_operations'].labels(
                        operation='get',
                        cache_type='django',
                        tenant='all'
                    )._value.set(total)

        except Exception as e:
            logger.error(f"Error collecting Django cache metrics: {e}")


class SystemExporter:
    """Exporter for system metrics."""

    def __init__(self):
        self.metrics = {
            'system_cpu_usage': Gauge(
                'system_cpu_usage_percent',
                'System CPU usage percentage',
                ['cpu']
            ),
            'system_memory_usage': Gauge(
                'system_memory_usage_bytes',
                'System memory usage',
                ['type']
            ),
            'system_memory_usage_percent': Gauge(
                'system_memory_usage_percent',
                'System memory usage percentage',
                ['type']
            ),
            'system_disk_usage': Gauge(
                'system_disk_usage_bytes',
                'System disk usage',
                ['device', 'mountpoint']
            ),
            'system_disk_usage_percent': Gauge(
                'system_disk_usage_percent',
                'System disk usage percentage',
                ['device', 'mountpoint']
            ),
            'system_disk_io_bytes': Counter(
                'system_disk_io_bytes_total',
                'System disk I/O',
                ['direction']
            ),
            'system_network_bytes': Counter(
                'system_network_bytes_total',
                'System network traffic',
                ['direction', 'interface']
            ),
            'system_load_average': Gauge(
                'system_load_average',
                'System load average',
                ['period']
            ),
            'system_uptime': Gauge(
                'system_uptime_seconds',
                'System uptime in seconds'
            ),
        }

    def collect_metrics(self):
        """Collect system metrics."""
        try:
            self._collect_cpu_metrics()
            self._collect_memory_metrics()
            self._collect_disk_metrics()
            self._collect_network_metrics()
            self._collect_load_metrics()
        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")

    def _collect_cpu_metrics(self):
        """Collect CPU metrics."""
        try:
            # One blocking sample yields a list of per-core percentages;
            # cpu_percent(percpu=True) returns floats, not (id, value) pairs.
            per_cpu = psutil.cpu_percent(interval=1, percpu=True)
            self.metrics['system_cpu_usage'].labels(cpu='all').set(
                sum(per_cpu) / len(per_cpu)
            )
            for i, percent in enumerate(per_cpu):
                self.metrics['system_cpu_usage'].labels(cpu=f'cpu_{i}').set(percent)

        except Exception as e:
            logger.error(f"Error collecting CPU metrics: {e}")

    def _collect_memory_metrics(self):
        """Collect memory metrics."""
        try:
            memory = psutil.virtual_memory()
            self.metrics['system_memory_usage'].labels(type='virtual').set(memory.used)
            self.metrics['system_memory_usage_percent'].labels(type='virtual').set(memory.percent)

            # Swap memory
            swap = psutil.swap_memory()
            self.metrics['system_memory_usage'].labels(type='swap').set(swap.used)
            self.metrics['system_memory_usage_percent'].labels(type='swap').set(swap.percent)

        except Exception as e:
            logger.error(f"Error collecting memory metrics: {e}")

    def _collect_disk_metrics(self):
        """Collect disk metrics."""
        try:
            disk_usage = psutil.disk_usage('/')
            self.metrics['system_disk_usage'].labels(
                device='root',
                mountpoint='/'
            ).set(disk_usage.used)
            self.metrics['system_disk_usage_percent'].labels(
                device='root',
                mountpoint='/'
            ).set((disk_usage.used / disk_usage.total) * 100)

            # Disk I/O (cumulative kernel counters, mirrored via the private API)
            disk_io = psutil.disk_io_counters()
            if disk_io:
                self.metrics['system_disk_io_bytes'].labels(
                    direction='read'
                )._value.set(disk_io.read_bytes)
                self.metrics['system_disk_io_bytes'].labels(
                    direction='write'
                )._value.set(disk_io.write_bytes)

        except Exception as e:
            logger.error(f"Error collecting disk metrics: {e}")

    def _collect_network_metrics(self):
        """Collect network metrics."""
        try:
            net_io = psutil.net_io_counters()
            if net_io:
                self.metrics['system_network_bytes'].labels(
                    direction='recv',
                    interface='all'
                )._value.set(net_io.bytes_recv)
                self.metrics['system_network_bytes'].labels(
                    direction='sent',
                    interface='all'
                )._value.set(net_io.bytes_sent)

        except Exception as e:
            logger.error(f"Error collecting network metrics: {e}")

    def _collect_load_metrics(self):
        """Collect load average metrics."""
        try:
            load_avg = psutil.getloadavg()
            self.metrics['system_load_average'].labels(period='1min').set(load_avg[0])
            self.metrics['system_load_average'].labels(period='5min').set(load_avg[1])
            self.metrics['system_load_average'].labels(period='15min').set(load_avg[2])

            # System uptime
            self.metrics['system_uptime'].set(time.time() - psutil.boot_time())

        except Exception as e:
            logger.error(f"Error collecting load metrics: {e}")


class BusinessExporter:
    """Exporter for business metrics."""

    def __init__(self):
        self.metrics = {
            'active_users': Gauge(
                'business_active_users',
                'Number of active users',
                ['tenant', 'industry_type']
            ),
            'user_registrations': Counter(
                'business_user_registrations_total',
                'User registrations',
                ['tenant', 'period']
            ),
            'revenue': Counter(
                'business_revenue_myr_total',
                'Revenue in Malaysian Ringgit',
                ['tenant', 'industry_type']
            ),
            'transactions': Counter(
                'business_transactions_total',
                'Business transactions',
                ['status', 'tenant', 'payment_method']
            ),
            'tenant_resource_usage': Gauge(
                'business_tenant_resource_usage_percent',
                'Tenant resource usage percentage',
                ['tenant', 'resource_type']
            ),
            'malaysian_specific': Counter(
                'business_malaysian_operations_total',
                'Malaysian-specific operations',
                ['operation', 'state', 'tenant']
            ),
        }

    def collect_metrics(self):
        """Collect business metrics."""
        try:
            self._collect_user_metrics()
            self._collect_revenue_metrics()
            self._collect_transaction_metrics()
            self._collect_tenant_metrics()
            self._collect_malaysian_metrics()
        except Exception as e:
            logger.error(f"Error collecting business metrics: {e}")

    def _collect_user_metrics(self):
        """Collect user metrics."""
        try:
            # Active users (logged in within the last 5 minutes)
            five_minutes_ago = timezone.now() - timedelta(minutes=5)
            active_count = User.objects.filter(
                last_login__gte=five_minutes_ago,
                is_active=True
            ).count()

            self.metrics['active_users'].labels(
                tenant='all',
                industry_type='all'
            ).set(active_count)

            # User registrations by period
            today = timezone.now().date()
            week_ago = today - timedelta(days=7)
            month_ago = today - timedelta(days=30)

            registrations_today = User.objects.filter(
                date_joined__date=today
            ).count()
            registrations_week = User.objects.filter(
                date_joined__date__gte=week_ago
            ).count()
            registrations_month = User.objects.filter(
                date_joined__date__gte=month_ago
            ).count()

            # These are rolling snapshots recomputed from the database, so they
            # are written through the private `_value` handle rather than
            # incremented.
            self.metrics['user_registrations'].labels(
                tenant='all',
                period='today'
            )._value.set(registrations_today)
            self.metrics['user_registrations'].labels(
                tenant='all',
                period='week'
            )._value.set(registrations_week)
            self.metrics['user_registrations'].labels(
                tenant='all',
                period='month'
            )._value.set(registrations_month)

        except Exception as e:
            logger.error(f"Error collecting user metrics: {e}")

    def _collect_revenue_metrics(self):
        """Collect revenue metrics."""
        try:
            from core.models import Transaction

            # Aggregate completed transactions from the payment models.
            today = timezone.now().date()
            today_revenue = Transaction.objects.filter(
                created_at__date=today,
                status='completed'
            ).aggregate(total=Sum('amount'))['total'] or 0

            self.metrics['revenue'].labels(
                tenant='all',
                industry_type='all'
            )._value.set(today_revenue)

        except Exception as e:
            logger.error(f"Error collecting revenue metrics: {e}")

    def _collect_transaction_metrics(self):
        """Collect transaction metrics."""
        try:
            from core.models import Transaction

            # Transaction counts by status
            status_counts = Transaction.objects.values('status').annotate(
                count=Count('id')
            )

            for item in status_counts:
                self.metrics['transactions'].labels(
                    status=item['status'],
                    tenant='all',
                    payment_method='all'
                )._value.set(item['count'])

        except Exception as e:
            logger.error(f"Error collecting transaction metrics: {e}")

    def _collect_tenant_metrics(self):
        """Collect tenant metrics."""
        try:
            for tenant in TenantModel.objects.all():
                # Tenant resource usage (placeholder until real accounting exists)
                self.metrics['tenant_resource_usage'].labels(
                    tenant=tenant.name,
                    resource_type='storage'
                ).set(50)  # Placeholder value

                # Tenant active users (last 30 minutes)
                active_users = User.objects.filter(
                    tenant=tenant,
                    is_active=True,
                    last_login__gte=timezone.now() - timedelta(minutes=30)
                ).count()

                self.metrics['active_users'].labels(
                    tenant=tenant.name,
                    industry_type=getattr(tenant, 'industry_type', 'general')
                ).set(active_users)

        except Exception as e:
            logger.error(f"Error collecting tenant metrics: {e}")

    def _collect_malaysian_metrics(self):
        """Collect Malaysian-specific metrics."""
        try:
            from core.models import MalaysianICValidation, SSTCalculation

            # IC validations by state
            ic_validations = MalaysianICValidation.objects.values(
                'state'
            ).annotate(count=Count('id'))

            for item in ic_validations:
                self.metrics['malaysian_specific'].labels(
                    operation='ic_validation',
                    state=item['state'],
                    tenant='all'
                )._value.set(item['count'])

            # SST calculations
            sst_calculations = SSTCalculation.objects.count()
            self.metrics['malaysian_specific'].labels(
                operation='sst_calculation',
                state='all',
                tenant='all'
            )._value.set(sst_calculations)

        except Exception as e:
            logger.error(f"Error collecting Malaysian metrics: {e}")


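# Most tenant labels above default to 'all'. A per-tenant pass is sketched
# below, assuming django_tenants' `schema_context` helper and the standard
# `schema_name` field on the tenant model; nothing in this module calls it
# yet.
def collect_for_each_tenant(collect):
    """Run `collect(tenant)` inside each tenant's schema."""
    from django_tenants.utils import schema_context

    for tenant in TenantModel.objects.all():
        with schema_context(tenant.schema_name):
            collect(tenant)

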
class MetricsCollector:
    """Main metrics collector that runs all exporters."""

    def __init__(self):
        self.exporters = {
            'database': DatabaseExporter(),
            'cache': CacheExporter(),
            'system': SystemExporter(),
            'business': BusinessExporter(),
        }
        self.running = False
        self.thread = None

    def start_collection(self, interval: int = 30):
        """Start metrics collection in background thread."""
        if not self.running:
            self.running = True
            self.thread = threading.Thread(target=self._collect_loop, args=(interval,))
            self.thread.daemon = True
            self.thread.start()
            logger.info("Metrics collection started")

    def stop_collection(self):
        """Stop metrics collection."""
        self.running = False
        if self.thread:
            self.thread.join()
        logger.info("Metrics collection stopped")

    def _collect_loop(self, interval: int):
        """Main collection loop."""
        while self.running:
            try:
                for name, exporter in self.exporters.items():
                    logger.debug(f"Collecting {name} metrics...")
                    exporter.collect_metrics()

                time.sleep(interval)
            except Exception as e:
                logger.error(f"Error in metrics collection loop: {e}")
                time.sleep(interval)

    def collect_once(self):
        """Collect metrics once (for testing)."""
        for name, exporter in self.exporters.items():
            try:
                logger.debug(f"Collecting {name} metrics...")
                exporter.collect_metrics()
            except Exception as e:
                logger.error(f"Error collecting {name} metrics: {e}")


# Global metrics collector instance
metrics_collector = MetricsCollector()
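

# Example wiring (a minimal sketch; the port and interval defaults below are
# assumptions, not values fixed anywhere in this module). Call this from an
# AppConfig.ready() hook or a management command to expose the default
# registry over HTTP and start background collection.
def run_exporter(port: int = 8001, interval: int = 30):
    """Serve /metrics on `port` and collect every `interval` seconds."""
    start_http_server(port)  # serves the global prometheus_client registry
    metrics_collector.start_collection(interval=interval)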