
Centralized Error Tracking & Alert Center System Design

14.1 Overview

The Centralized Error Tracking & Alert Center is the operational monitoring component that provides unified error tracking, real-time alerting, and incident response across all microservices in the quantitative trading system. It gives operators comprehensive error visibility, proactive alerting, and systematic incident management to support institutional-grade operational excellence.

🎯 Core Capabilities

Capability | Description
Unified Error Collection | Centralized error collection from all microservices
Real-time Alerting | Instant notifications via Telegram, email, and Slack
Error Classification | Automatic error categorization and severity assessment
Trend Analysis | Error pattern analysis and trend monitoring
Incident Response | Systematic incident tracking and resolution
Performance Monitoring | Service health and performance tracking

14.2 System Architecture

14.2.1 Error Monitor Center Microservice Design

New Microservice: error-monitor-center

services/error-monitor-center/
├── src/
│   ├── main.py                            # FastAPI application entry point
│   ├── collector/
│   │   ├── error_listener.py              # Error event listener and collector
│   │   ├── error_parser.py                # Error parsing and classification
│   │   ├── error_validator.py             # Error validation and filtering
│   │   └── error_enricher.py              # Error enrichment with context
│   ├── storage/
│   │   ├── error_storage.py               # Error data storage and retrieval
│   │   ├── error_indexer.py               # Error indexing for fast queries
│   │   ├── error_aggregator.py            # Error aggregation and statistics
│   │   └── error_cleanup.py               # Error data cleanup and retention
│   ├── notifier/
│   │   ├── telegram_notifier.py           # Telegram alert notifications
│   │   ├── email_notifier.py              # Email alert notifications
│   │   ├── slack_notifier.py              # Slack alert notifications
│   │   ├── alert_manager.py               # Alert routing and management
│   │   └── alert_template.py              # Alert message templates
│   ├── analyzer/
│   │   ├── error_analyzer.py              # Error pattern analysis
│   │   ├── trend_analyzer.py              # Error trend analysis
│   │   ├── impact_analyzer.py             # Error impact assessment
│   │   └── correlation_analyzer.py        # Error correlation analysis
│   ├── api/
│   │   ├── error_api.py                   # Error query and management endpoints
│   │   ├── alert_api.py                   # Alert configuration endpoints
│   │   ├── dashboard_api.py               # Dashboard data endpoints
│   │   └── incident_api.py                # Incident management endpoints
│   ├── models/
│   │   ├── error_model.py                 # Error data models
│   │   ├── alert_model.py                 # Alert configuration models
│   │   ├── incident_model.py              # Incident tracking models
│   │   └── dashboard_model.py             # Dashboard data models
│   ├── config.py                          # Configuration management
│   └── requirements.txt                   # Python dependencies
├── Dockerfile                             # Container definition
└── docker-compose.yml                     # Local development setup
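
A minimal sketch of how src/main.py could wire these components together. The module paths, broker and database URLs, and startup details are assumptions based on the layout above, not a final implementation; ErrorListener, ErrorStorage, and AlertManager are the classes defined in Section 14.3.

# main.py -- illustrative wiring only; URLs and import paths are assumed
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

from collector.error_listener import ErrorListener   # assumed import path
from storage.error_storage import ErrorStorage        # assumed import path
from notifier.alert_manager import AlertManager       # assumed import path

storage = ErrorStorage("postgresql://user:pass@localhost/error_monitor")  # placeholder DSN
alert_manager = AlertManager()
listener = ErrorListener()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize storage, route every received error to storage and alerting,
    # then start consuming from the error queue in the background.
    await storage.initialize()
    listener.add_handler(storage.save_error)
    listener.add_handler(alert_manager.process_error)
    consumer = asyncio.create_task(listener.start_listening())
    yield
    consumer.cancel()

app = FastAPI(title="error-monitor-center", lifespan=lifespan)

@app.get("/health")
async def health():
    return {"status": "ok", "errors_processed": listener.get_error_stats()["total_errors"]}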

14.2.2 Error Monitoring Architecture Layers

Layer 1: Error Collection
  • Error Capture: Standardized error capture across microservices
  • Error Parsing: Error parsing and classification
  • Error Validation: Error validation and filtering
  • Error Enrichment: Error context enrichment

Layer 2: Error Storage
  • Error Storage: Centralized error data storage
  • Error Indexing: Fast error query indexing
  • Error Aggregation: Error statistics and aggregation
  • Data Retention: Error data retention and cleanup

Layer 3: Alert Management
  • Alert Detection: Real-time alert detection
  • Alert Routing: Multi-channel alert routing
  • Alert Templates: Standardized alert templates
  • Alert History: Alert history and tracking

Layer 4: Analysis & Response
  • Error Analysis: Error pattern and trend analysis
  • Impact Assessment: Error impact assessment
  • Incident Management: Systematic incident tracking
  • Performance Monitoring: Service health monitoring

14.3 Core Components Design

14.3.1 Error Collection Module

Purpose: Standardized error capture and collection across all microservices

Key Functions:
  • Error Capture: Standardized error capture mechanism
  • Error Classification: Automatic error categorization
  • Error Enrichment: Context enrichment for errors
  • Error Validation: Error validation and filtering

Standardized Error Capture Implementation:

import logging
import traceback
import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
import asyncio
import aio_pika

class ErrorCapture:
    def __init__(self, service_name: str, error_queue: str = "error.events"):
        self.service_name = service_name
        self.error_queue = error_queue
        self.logger = logging.getLogger(f"error_capture_{service_name}")
        self.default_context: Dict[str, Any] = {}  # default context merged into every captured error

        # Error severity levels
        self.severity_levels = {
            "CRITICAL": 1,
            "ERROR": 2,
            "WARNING": 3,
            "INFO": 4,
            "DEBUG": 5
        }

    async def capture_exception(self, exception: Exception, context: Dict = None, 
                               severity: str = "ERROR") -> Dict[str, Any]:
        """Capture exception with context and send to error monitor"""
        error_info = {
            "service": self.service_name,
            "error_type": type(exception).__name__,
            "message": str(exception),
            "traceback": traceback.format_exc(),
            "severity": severity,
            "severity_level": self.severity_levels.get(severity, 2),
            "timestamp": datetime.utcnow().isoformat(),
            "context": context or {},
            "error_id": self._generate_error_id()
        }

        # Log error locally
        self.logger.error(f"Error captured: {error_info['error_type']} - {error_info['message']}")

        # Send to error monitor
        await self._send_error_to_monitor(error_info)

        return error_info

    async def capture_error(self, error_type: str, message: str, 
                           severity: str = "ERROR", context: Dict = None) -> Dict[str, Any]:
        """Capture custom error without exception"""
        error_info = {
            "service": self.service_name,
            "error_type": error_type,
            "message": message,
            "traceback": None,
            "severity": severity,
            "severity_level": self.severity_levels.get(severity, 2),
            "timestamp": datetime.utcnow().isoformat(),
            "context": context or {},
            "error_id": self._generate_error_id()
        }

        # Log error locally
        self.logger.error(f"Error captured: {error_type} - {message}")

        # Send to error monitor
        await self._send_error_to_monitor(error_info)

        return error_info

    async def _send_error_to_monitor(self, error_info: Dict[str, Any]):
        """Send error to centralized error monitor"""
        try:
            # Connect to the message broker (RabbitMQ via aio_pika; URL should come from configuration)
            connection = await aio_pika.connect_robust("amqp://localhost/")
            channel = await connection.channel()

            # Publish error to queue
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=json.dumps(error_info).encode(),
                    content_type="application/json"
                ),
                routing_key=self.error_queue
            )

            await connection.close()

        except Exception as e:
            self.logger.error(f"Failed to send error to monitor: {e}")

    def _generate_error_id(self) -> str:
        """Generate unique error ID"""
        return str(uuid.uuid4())

    def set_context(self, **kwargs):
        """Set default context for error capture"""
        self.default_context = kwargs

    async def capture_performance_issue(self, metric: str, value: float, 
                                       threshold: float, context: Dict = None):
        """Capture performance-related issues"""
        if value > threshold:
            await self.capture_error(
                error_type="PERFORMANCE_THRESHOLD_EXCEEDED",
                message=f"Performance threshold exceeded: {metric} = {value} (threshold: {threshold})",
                severity="WARNING",
                context=context or {}
            )

    async def capture_health_check_failure(self, service: str, endpoint: str, 
                                          status_code: int, response_time: float):
        """Capture health check failures"""
        await self.capture_error(
            error_type="HEALTH_CHECK_FAILURE",
            message=f"Health check failed for {service} at {endpoint}",
            severity="ERROR" if status_code >= 500 else "WARNING",
            context={
                "service": service,
                "endpoint": endpoint,
                "status_code": status_code,
                "response_time": response_time
            }
        )
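
Usage sketch: how a microservice such as the trading engine might wrap an operation with ErrorCapture. The service name, context fields, and fetch_order_book call are hypothetical.

# Illustrative usage inside a microservice; fetch_order_book is a hypothetical call
error_capture = ErrorCapture(service_name="trading-engine")

async def refresh_order_book(symbol: str):
    try:
        return await fetch_order_book(symbol)   # hypothetical market-data call
    except Exception as exc:
        # Forward the exception to the error monitor with trading context attached
        await error_capture.capture_exception(
            exc,
            context={"symbol": symbol, "operation": "refresh_order_book"},
            severity="ERROR",
        )
        raise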

14.3.2 Error Listener Module

Purpose: Listen and collect errors from all microservices

Key Functions:
  • Error Reception: Receive errors from all microservices
  • Error Parsing: Parse and validate error messages
  • Error Classification: Classify errors by type and severity
  • Error Routing: Route errors to appropriate handlers

Error Listener Implementation:

import asyncio
import json
import aio_pika
from typing import Dict, List, Callable
from datetime import datetime
import logging

class ErrorListener:
    def __init__(self, queue_name: str = "error.events"):
        self.queue_name = queue_name
        self.logger = logging.getLogger("error_listener")
        self.error_handlers: List[Callable] = []
        self.error_buffer: List[Dict] = []
        self.max_buffer_size = 1000

        # Error statistics
        self.error_stats = {
            "total_errors": 0,
            "errors_by_service": {},
            "errors_by_type": {},
            "errors_by_severity": {},
            "last_error_time": None
        }

    async def start_listening(self):
        """Start listening for error events"""
        try:
            # Connect to message queue
            connection = await aio_pika.connect_robust("amqp://localhost/")
            channel = await connection.channel()

            # Declare queue
            queue = await channel.declare_queue(self.queue_name, durable=True)

            # Start consuming messages
            await queue.consume(self._process_error_message)

            self.logger.info(f"Started listening for errors on queue: {self.queue_name}")

            # Keep connection alive
            try:
                await asyncio.Future()  # Run forever
            finally:
                await connection.close()

        except Exception as e:
            self.logger.error(f"Error in error listener: {e}")

    async def _process_error_message(self, message: aio_pika.IncomingMessage):
        """Process incoming error message"""
        async with message.process():
            try:
                # Parse error message
                error_data = json.loads(message.body.decode())

                # Validate error data
                if self._validate_error_data(error_data):
                    # Update statistics
                    self._update_error_stats(error_data)

                    # Add to buffer
                    self.error_buffer.append(error_data)

                    # Trim buffer if needed
                    if len(self.error_buffer) > self.max_buffer_size:
                        self.error_buffer = self.error_buffer[-self.max_buffer_size:]

                    # Notify handlers
                    await self._notify_handlers(error_data)

                    self.logger.info(f"Processed error from {error_data.get('service', 'unknown')}")
                else:
                    self.logger.warning(f"Invalid error data received: {error_data}")

            except Exception as e:
                self.logger.error(f"Error processing message: {e}")

    def _validate_error_data(self, error_data: Dict) -> bool:
        """Validate error data structure"""
        required_fields = ["service", "error_type", "message", "timestamp", "severity"]

        for field in required_fields:
            if field not in error_data:
                return False

        return True

    def _update_error_stats(self, error_data: Dict):
        """Update error statistics"""
        self.error_stats["total_errors"] += 1
        self.error_stats["last_error_time"] = datetime.utcnow()

        # Update service statistics
        service = error_data.get("service", "unknown")
        if service not in self.error_stats["errors_by_service"]:
            self.error_stats["errors_by_service"][service] = 0
        self.error_stats["errors_by_service"][service] += 1

        # Update error type statistics
        error_type = error_data.get("error_type", "unknown")
        if error_type not in self.error_stats["errors_by_type"]:
            self.error_stats["errors_by_type"][error_type] = 0
        self.error_stats["errors_by_type"][error_type] += 1

        # Update severity statistics
        severity = error_data.get("severity", "unknown")
        if severity not in self.error_stats["errors_by_severity"]:
            self.error_stats["errors_by_severity"][severity] = 0
        self.error_stats["errors_by_severity"][severity] += 1

    async def _notify_handlers(self, error_data: Dict):
        """Notify all registered error handlers"""
        for handler in self.error_handlers:
            try:
                await handler(error_data)
            except Exception as e:
                self.logger.error(f"Error in handler: {e}")

    def add_handler(self, handler: Callable):
        """Add error handler"""
        self.error_handlers.append(handler)

    def get_error_stats(self) -> Dict:
        """Get error statistics"""
        return self.error_stats.copy()

    def get_recent_errors(self, limit: int = 100) -> List[Dict]:
        """Get recent errors"""
        return self.error_buffer[-limit:]

    def get_errors_by_service(self, service: str) -> List[Dict]:
        """Get errors for specific service"""
        return [e for e in self.error_buffer if e.get("service") == service]

    def get_errors_by_severity(self, severity: str) -> List[Dict]:
        """Get errors by severity level"""
        return [e for e in self.error_buffer if e.get("severity") == severity]
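
Usage sketch: registering a handler and reading the in-memory buffer. The handler below is a hypothetical example that reacts only to CRITICAL errors; in the full service, ErrorStorage.save_error and AlertManager.process_error would typically be registered the same way.

# Illustrative handler registration -- handler logic is an assumption
listener = ErrorListener()

async def critical_error_handler(error_data: dict):
    # Example handler: surface CRITICAL errors immediately
    if error_data.get("severity") == "CRITICAL":
        print(f"CRITICAL error in {error_data['service']}: {error_data['message']}")

listener.add_handler(critical_error_handler)

# The buffer and counters can be read elsewhere, e.g. from an API endpoint
recent_errors = listener.get_recent_errors(limit=20)
stats = listener.get_error_stats()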

14.3.3 Error Storage Module

Purpose: Store and manage error data with efficient querying

Key Functions:
  • Error Storage: Store error data in database
  • Error Indexing: Index errors for fast queries
  • Error Aggregation: Aggregate error statistics
  • Data Retention: Manage error data retention

Error Storage Implementation:

import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import asyncpg
import logging

class ErrorStorage:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.logger = logging.getLogger("error_storage")
        self.pool = None

    async def initialize(self):
        """Initialize database connection and create tables"""
        try:
            self.pool = await asyncpg.create_pool(self.database_url)

            # Create error table
            await self._create_error_table()
            await self._create_error_indexes()

            self.logger.info("Error storage initialized successfully")

        except Exception as e:
            self.logger.error(f"Failed to initialize error storage: {e}")
            raise

    async def _create_error_table(self):
        """Create error table"""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS errors (
                    id SERIAL PRIMARY KEY,
                    error_id VARCHAR(255) UNIQUE NOT NULL,
                    service VARCHAR(100) NOT NULL,
                    error_type VARCHAR(100) NOT NULL,
                    message TEXT NOT NULL,
                    traceback TEXT,
                    severity VARCHAR(20) NOT NULL,
                    severity_level INTEGER NOT NULL,
                    context JSONB,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
                )
            """)

    async def _create_error_indexes(self):
        """Create indexes for fast queries"""
        async with self.pool.acquire() as conn:
            # Index on service
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_errors_service ON errors(service)
            """)

            # Index on error_type
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_errors_type ON errors(error_type)
            """)

            # Index on severity
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_errors_severity ON errors(severity)
            """)

            # Index on timestamp
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp)
            """)

            # Composite index for common queries
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_errors_service_severity 
                ON errors(service, severity)
            """)

    async def save_error(self, error_data: Dict[str, Any]) -> bool:
        """Save error to database"""
        try:
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO errors (
                        error_id, service, error_type, message, traceback,
                        severity, severity_level, context, timestamp
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                """, 
                error_data["error_id"],
                error_data["service"],
                error_data["error_type"],
                error_data["message"],
                error_data.get("traceback"),
                error_data["severity"],
                error_data["severity_level"],
                json.dumps(error_data.get("context", {})),
                datetime.fromisoformat(error_data["timestamp"])
                )

                return True

        except Exception as e:
            self.logger.error(f"Failed to save error: {e}")
            return False

    async def get_errors(self, service: Optional[str] = None, 
                        error_type: Optional[str] = None,
                        severity: Optional[str] = None,
                        limit: int = 100,
                        offset: int = 0) -> List[Dict]:
        """Get errors with filters"""
        try:
            query = "SELECT * FROM errors WHERE 1=1"
            params = []
            param_count = 0

            if service:
                param_count += 1
                query += f" AND service = ${param_count}"
                params.append(service)

            if error_type:
                param_count += 1
                query += f" AND error_type = ${param_count}"
                params.append(error_type)

            if severity:
                param_count += 1
                query += f" AND severity = ${param_count}"
                params.append(severity)

            query += " ORDER BY timestamp DESC LIMIT $%d OFFSET $%d" % (param_count + 1, param_count + 2)
            params.extend([limit, offset])

            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query, *params)

                return [dict(row) for row in rows]

        except Exception as e:
            self.logger.error(f"Failed to get errors: {e}")
            return []

    async def get_error_stats(self, hours: int = 24) -> Dict:
        """Get error statistics for the last N hours"""
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            async with self.pool.acquire() as conn:
                # Total errors
                total = await conn.fetchval("""
                    SELECT COUNT(*) FROM errors WHERE timestamp >= $1
                """, since)

                # Errors by service
                services = await conn.fetch("""
                    SELECT service, COUNT(*) as count 
                    FROM errors 
                    WHERE timestamp >= $1 
                    GROUP BY service 
                    ORDER BY count DESC
                """, since)

                # Errors by severity
                severities = await conn.fetch("""
                    SELECT severity, COUNT(*) as count 
                    FROM errors 
                    WHERE timestamp >= $1 
                    GROUP BY severity 
                    ORDER BY count DESC
                """, since)

                # Errors by type
                types = await conn.fetch("""
                    SELECT error_type, COUNT(*) as count 
                    FROM errors 
                    WHERE timestamp >= $1 
                    GROUP BY error_type 
                    ORDER BY count DESC
                """, since)

                return {
                    "total_errors": total,
                    "errors_by_service": {row["service"]: row["count"] for row in services},
                    "errors_by_severity": {row["severity"]: row["count"] for row in severities},
                    "errors_by_type": {row["error_type"]: row["count"] for row in types},
                    "time_period_hours": hours
                }

        except Exception as e:
            self.logger.error(f"Failed to get error stats: {e}")
            return {}

    async def get_error_trends(self, days: int = 7) -> List[Dict]:
        """Get error trends over the last N days"""
        try:
            since = datetime.utcnow() - timedelta(days=days)

            async with self.pool.acquire() as conn:
                rows = await conn.fetch("""
                    SELECT 
                        DATE(timestamp) as date,
                        COUNT(*) as total_errors,
                        COUNT(CASE WHEN severity = 'CRITICAL' THEN 1 END) as critical_errors,
                        COUNT(CASE WHEN severity = 'ERROR' THEN 1 END) as error_errors,
                        COUNT(CASE WHEN severity = 'WARNING' THEN 1 END) as warning_errors
                    FROM errors 
                    WHERE timestamp >= $1 
                    GROUP BY DATE(timestamp) 
                    ORDER BY date
                """, since)

                return [dict(row) for row in rows]

        except Exception as e:
            self.logger.error(f"Failed to get error trends: {e}")
            return []

    async def cleanup_old_errors(self, days: int = 30):
        """Clean up errors older than N days"""
        try:
            cutoff = datetime.utcnow() - timedelta(days=days)

            async with self.pool.acquire() as conn:
                deleted = await conn.execute("""
                    DELETE FROM errors WHERE timestamp < $1
                """, cutoff)

                self.logger.info(f"Cleaned up old errors: {deleted}")

        except Exception as e:
            self.logger.error(f"Failed to cleanup old errors: {e}")

14.3.4 Alert Manager Module

Purpose: Manage and route alerts to appropriate channels

Key Functions:
  • Alert Detection: Detect when alerts should be triggered
  • Alert Routing: Route alerts to appropriate channels
  • Alert Templates: Standardized alert message templates
  • Alert History: Track alert history and delivery status

Alert Manager Implementation:

import asyncio
import json
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import logging
import aiohttp

class AlertManager:
    def __init__(self):
        self.logger = logging.getLogger("alert_manager")
        self.notifiers = {}
        self.alert_rules = []
        self.alert_history = []

        # Alert severity thresholds
        self.severity_thresholds = {
            "CRITICAL": 1,
            "ERROR": 2,
            "WARNING": 3,
            "INFO": 4
        }

    def add_notifier(self, name: str, notifier):
        """Add notification channel"""
        self.notifiers[name] = notifier

    def add_alert_rule(self, rule: Dict):
        """Add alert rule"""
        self.alert_rules.append(rule)

    async def process_error(self, error_data: Dict):
        """Process error and trigger alerts if needed"""
        try:
            # Check alert rules
            for rule in self.alert_rules:
                if self._should_trigger_alert(error_data, rule):
                    await self._trigger_alert(error_data, rule)

        except Exception as e:
            self.logger.error(f"Error processing alert: {e}")

    def _should_trigger_alert(self, error_data: Dict, rule: Dict) -> bool:
        """Check if alert should be triggered based on rule"""
        # Check severity
        if "min_severity" in rule:
            error_severity_level = self.severity_thresholds.get(error_data["severity"], 5)
            min_severity_level = self.severity_thresholds.get(rule["min_severity"], 5)
            if error_severity_level > min_severity_level:
                return False

        # Check service
        if "services" in rule and error_data["service"] not in rule["services"]:
            return False

        # Check error type
        if "error_types" in rule and error_data["error_type"] not in rule["error_types"]:
            return False

        # Check rate limiting
        if "rate_limit" in rule:
            recent_alerts = self._get_recent_alerts_for_rule(rule, minutes=rule["rate_limit"])
            if len(recent_alerts) >= rule.get("max_alerts", 1):
                return False

        return True

    def _get_recent_alerts_for_rule(self, rule: Dict, minutes: int) -> List[Dict]:
        """Get recent alerts for a specific rule"""
        since = datetime.utcnow() - timedelta(minutes=minutes)
        return [
            alert for alert in self.alert_history
            if alert["rule_id"] == rule.get("id") and alert["timestamp"] >= since
        ]

    async def _trigger_alert(self, error_data: Dict, rule: Dict):
        """Trigger alert based on rule"""
        try:
            # Create alert message
            alert_message = self._create_alert_message(error_data, rule)

            # Send to configured channels
            channels = rule.get("channels", ["telegram"])

            for channel in channels:
                if channel in self.notifiers:
                    try:
                        await self.notifiers[channel].send_alert(alert_message)

                        # Record alert
                        self._record_alert(error_data, rule, channel, True)

                    except Exception as e:
                        self.logger.error(f"Failed to send alert to {channel}: {e}")
                        self._record_alert(error_data, rule, channel, False, str(e))
                else:
                    self.logger.warning(f"Unknown notification channel: {channel}")

        except Exception as e:
            self.logger.error(f"Error triggering alert: {e}")

    def _create_alert_message(self, error_data: Dict, rule: Dict) -> str:
        """Create alert message from error data"""
        template = rule.get("template", "default")

        if template == "default":
            return self._create_default_alert_message(error_data)
        elif template == "detailed":
            return self._create_detailed_alert_message(error_data)
        else:
            return self._create_custom_alert_message(error_data, template)

    def _create_default_alert_message(self, error_data: Dict) -> str:
        """Create default alert message"""
        return f"""
🚨 **Error Alert** 🚨

**Service:** {error_data['service']}
**Type:** {error_data['error_type']}
**Severity:** {error_data['severity']}
**Message:** {error_data['message']}
**Time:** {error_data['timestamp']}

ID: {error_data['error_id']}
        """.strip()

    def _create_detailed_alert_message(self, error_data: Dict) -> str:
        """Create detailed alert message"""
        message = f"""
🚨 **Detailed Error Alert** 🚨

**Service:** {error_data['service']}
**Type:** {error_data['error_type']}
**Severity:** {error_data['severity']}
**Message:** {error_data['message']}
**Time:** {error_data['timestamp']}
**ID:** {error_data['error_id']}

**Context:**
{json.dumps(error_data.get('context', {}), indent=2)}

**Traceback:**
{error_data.get('traceback', 'No traceback available')}
        """.strip()

        return message

    def _create_custom_alert_message(self, error_data: Dict, template: str) -> str:
        """Create custom alert message using template"""
        # Simple template substitution
        message = template
        for key, value in error_data.items():
            if isinstance(value, str):
                message = message.replace(f"{{{key}}}", value)

        return message

    def _record_alert(self, error_data: Dict, rule: Dict, channel: str, 
                     success: bool, error_message: str = None):
        """Record alert in history"""
        alert_record = {
            "rule_id": rule.get("id"),
            "error_id": error_data["error_id"],
            "channel": channel,
            "success": success,
            "error_message": error_message,
            "timestamp": datetime.utcnow()
        }

        self.alert_history.append(alert_record)

        # Keep only recent history
        if len(self.alert_history) > 1000:
            self.alert_history = self.alert_history[-1000:]

    def get_alert_history(self, hours: int = 24) -> List[Dict]:
        """Get alert history for the last N hours"""
        since = datetime.utcnow() - timedelta(hours=hours)
        return [
            alert for alert in self.alert_history
            if alert["timestamp"] >= since
        ]

    def get_alert_stats(self, hours: int = 24) -> Dict:
        """Get alert statistics"""
        recent_alerts = self.get_alert_history(hours)

        stats = {
            "total_alerts": len(recent_alerts),
            "successful_alerts": len([a for a in recent_alerts if a["success"]]),
            "failed_alerts": len([a for a in recent_alerts if not a["success"]]),
            "alerts_by_channel": {},
            "alerts_by_rule": {}
        }

        for alert in recent_alerts:
            # By channel
            channel = alert["channel"]
            if channel not in stats["alerts_by_channel"]:
                stats["alerts_by_channel"][channel] = 0
            stats["alerts_by_channel"][channel] += 1

            # By rule
            rule_id = alert["rule_id"]
            if rule_id not in stats["alerts_by_rule"]:
                stats["alerts_by_rule"][rule_id] = 0
            stats["alerts_by_rule"][rule_id] += 1

        return stats
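
AlertManager calls each notifier's async send_alert(message) method. A minimal Telegram notifier sketch and a rule registration matching the Alert Rule Model in Section 14.4.1 might look like this; the bot token, chat ID, and rule values are placeholders.

import aiohttp

class TelegramNotifier:
    """Minimal sketch of the notifier interface AlertManager expects."""

    def __init__(self, bot_token: str, chat_id: str):
        self.bot_token = bot_token
        self.chat_id = chat_id

    async def send_alert(self, message: str):
        # Telegram Bot API sendMessage call
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        async with aiohttp.ClientSession() as session:
            await session.post(url, json={"chat_id": self.chat_id, "text": message})

alert_manager = AlertManager()
alert_manager.add_notifier("telegram", TelegramNotifier("<bot-token>", "<chat-id>"))
alert_manager.add_alert_rule({
    "id": "rule_001",
    "min_severity": "ERROR",
    "services": ["trading-engine", "order-executor"],
    "channels": ["telegram"],
    "template": "default",
    "rate_limit": 5,    # minutes
    "max_alerts": 3,
})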

14.4 Data Architecture

14.4.1 Error Data Models

Error Event Model:

{
  "error_id": "err_12345",
  "service": "trading-engine",
  "error_type": "ConnectionError",
  "message": "Failed to connect to market data feed",
  "traceback": "Traceback (most recent call last):...",
  "severity": "ERROR",
  "severity_level": 2,
  "context": {
    "account_id": "acc_123",
    "strategy_id": "strat_456",
    "market": "BTCUSD",
    "attempt": 3
  },
  "timestamp": "2024-12-20T10:30:15.123Z"
}

Alert Rule Model:

{
  "id": "rule_001",
  "name": "Critical Trading Errors",
  "description": "Alert on critical trading system errors",
  "min_severity": "ERROR",
  "services": ["trading-engine", "order-executor"],
  "error_types": ["ConnectionError", "OrderError", "RiskError"],
  "channels": ["telegram", "email"],
  "template": "detailed",
  "rate_limit": 5,
  "max_alerts": 3,
  "enabled": true
}

Alert History Model:

{
  "rule_id": "rule_001",
  "error_id": "err_12345",
  "channel": "telegram",
  "success": true,
  "error_message": null,
  "timestamp": "2024-12-20T10:30:16.456Z"
}
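
In models/error_model.py and models/alert_model.py these payloads could be expressed as Pydantic models; the sketch below mirrors the JSON shapes above, with field optionality and defaults as assumptions.

from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel

class ErrorEvent(BaseModel):
    error_id: str
    service: str
    error_type: str
    message: str
    traceback: Optional[str] = None
    severity: str
    severity_level: int
    context: Dict[str, Any] = {}
    timestamp: datetime

class AlertRule(BaseModel):
    id: str
    name: str
    description: Optional[str] = None
    min_severity: str = "ERROR"
    services: List[str] = []
    error_types: List[str] = []
    channels: List[str] = ["telegram"]
    template: str = "default"
    rate_limit: int = 5      # minutes between repeated alerts
    max_alerts: int = 3
    enabled: bool = True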

14.4.2 Real-time Data Flow

Microservice Error → Error Capture → Message Queue → Error Listener
    ↓
Error Storage → Alert Detection → Alert Manager → Notification Channels
    ↓
Real-time Dashboard → Error Analytics → Trend Analysis → Incident Response

14.5 API Interface Design

14.5.1 Error Monitor Endpoints

Error Management:

GET    /api/v1/errors                           # Get errors with filters
GET    /api/v1/errors/{error_id}                # Get specific error
GET    /api/v1/errors/stats                     # Get error statistics
GET    /api/v1/errors/trends                    # Get error trends
DELETE /api/v1/errors/cleanup                   # Cleanup old errors
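
A sketch of how the error management endpoints could be exposed in api/error_api.py; the router prefix matches the paths above, and storage is assumed to be a shared ErrorStorage instance provided by the application.

from typing import Optional
from fastapi import APIRouter

router = APIRouter(prefix="/api/v1/errors")

@router.get("")
async def list_errors(service: Optional[str] = None,
                      error_type: Optional[str] = None,
                      severity: Optional[str] = None,
                      limit: int = 100, offset: int = 0):
    # Delegates to ErrorStorage.get_errors with the same filter semantics
    return await storage.get_errors(service=service, error_type=error_type,
                                    severity=severity, limit=limit, offset=offset)

@router.get("/stats")
async def error_stats(hours: int = 24):
    return await storage.get_error_stats(hours=hours)

@router.get("/trends")
async def error_trends(days: int = 7):
    return await storage.get_error_trends(days=days)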

Alert Management:

GET    /api/v1/alerts/rules                     # Get alert rules
POST   /api/v1/alerts/rules                     # Create alert rule
PUT    /api/v1/alerts/rules/{rule_id}           # Update alert rule
DELETE /api/v1/alerts/rules/{rule_id}           # Delete alert rule
GET    /api/v1/alerts/history                   # Get alert history
GET    /api/v1/alerts/stats                     # Get alert statistics

Dashboard Data:

GET    /api/v1/dashboard/overview               # Get dashboard overview
GET    /api/v1/dashboard/errors-by-service      # Get errors by service
GET    /api/v1/dashboard/errors-by-severity     # Get errors by severity
GET    /api/v1/dashboard/error-trends           # Get error trends
GET    /api/v1/dashboard/alert-stats            # Get alert statistics

14.5.2 Real-time Updates

WebSocket Endpoints:

/ws/errors/stream                                # Real-time error stream
/ws/alerts/stream                                # Real-time alert stream
/ws/dashboard/updates                            # Real-time dashboard updates
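
A sketch of the real-time error stream, assuming each connected client gets its own asyncio queue and that broadcast_error is registered as an ErrorListener handler.

import asyncio
import json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
subscribers: set[asyncio.Queue] = set()

async def broadcast_error(error_data: dict):
    # Registered via listener.add_handler; pushes each error to every subscriber
    for queue in list(subscribers):
        queue.put_nowait(error_data)

@app.websocket("/ws/errors/stream")
async def error_stream(websocket: WebSocket):
    await websocket.accept()
    queue: asyncio.Queue = asyncio.Queue()
    subscribers.add(queue)
    try:
        while True:
            error_data = await queue.get()
            await websocket.send_text(json.dumps(error_data))
    except WebSocketDisconnect:
        pass
    finally:
        subscribers.discard(queue)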

14.6 Frontend Integration

14.6.1 Error Monitor Dashboard Components

Error Overview Panel:
  • Error Summary: Total errors, critical errors, error rate
  • Service Health: Error count by service
  • Severity Distribution: Errors by severity level
  • Recent Errors: Latest error events

Error Details Panel:
  • Error List: Detailed error list with filters
  • Error Details: Full error information and context
  • Error Search: Search errors by various criteria
  • Error Actions: Error acknowledgment and resolution

Alert Management Panel:
  • Alert Rules: Configure and manage alert rules
  • Alert History: Alert delivery history
  • Alert Statistics: Alert success/failure rates
  • Notification Channels: Configure notification channels

Analytics Panel:
  • Error Trends: Error trends over time
  • Service Performance: Service error rates
  • Error Patterns: Error pattern analysis
  • Impact Analysis: Error impact assessment

14.6.2 Interactive Features

Visualization Tools:
  • Error Charts: Error count and trend charts
  • Service Heatmap: Service health heatmap
  • Alert Timeline: Alert history timeline
  • Performance Dashboard: System performance metrics

Analysis Tools:
  • Error Analysis: Detailed error analysis
  • Pattern Detection: Error pattern detection
  • Correlation Analysis: Error correlation analysis
  • Root Cause Analysis: Error root cause analysis

14.7 Performance Characteristics

14.7.1 Error Monitor Performance Metrics

Metric | Target | Measurement
Error Processing | <10 ms | Error processing time
Alert Delivery | <5 s | Alert delivery time
Query Response | <100 ms | Error query response time
Data Retention | 30 days | Error data retention period

14.7.2 Alert System Quality

Requirement | Implementation
Real-time Alerts | Immediate alert detection and delivery
Multi-channel | Support for Telegram, email, and Slack
Rate Limiting | Configurable alert rate limiting
Delivery Tracking | Alert delivery status tracking

14.8 Integration with Existing System

14.8.1 Microservice Integration

Standardized Integration:

All Microservices → Error Capture → Error Monitor → Alert System

Real-time Monitoring:
  • Error Collection: Real-time error collection from all services
  • Health Monitoring: Service health and performance monitoring
  • Alert Integration: Integrated alerting with existing systems
  • Dashboard Integration: Integration with existing dashboards

14.8.2 Incident Response Integration

Incident Management Integration:
  • Incident Creation: Automatic incident creation for critical errors
  • Escalation Rules: Configurable escalation rules
  • Response Tracking: Incident response tracking
  • Resolution Workflow: Systematic resolution workflow

14.9 Implementation Roadmap

14.9.1 Phase 1: Foundation (Weeks 1-2)

  • Basic Error Collection: Basic error collection from microservices
  • Simple Storage: Basic error storage and retrieval
  • Basic Alerts: Simple alert notifications
  • Basic Dashboard: Basic error dashboard

14.9.2 Phase 2: Advanced Features (Weeks 3-4)

  • Advanced Error Analysis: Error pattern and trend analysis
  • Multi-channel Alerts: Support for multiple alert channels
  • Alert Rules: Configurable alert rules
  • Error Classification: Advanced error classification

14.9.3 Phase 3: Analytics (Weeks 5-6)

  • Error Analytics: Comprehensive error analytics
  • Performance Monitoring: Service performance monitoring
  • Trend Analysis: Advanced trend analysis
  • Impact Assessment: Error impact assessment

14.9.4 Phase 4: Production Ready (Weeks 7-8)

  • High Availability: Redundant error monitoring infrastructure
  • Performance Optimization: High-performance error processing
  • Advanced Analytics: Comprehensive error analytics
  • Enterprise Features: Institutional-grade error monitoring

14.10 Business Value

14.10.1 Operational Excellence

Benefit | Impact
Proactive Monitoring | Early detection and response to issues
Reduced Downtime | Faster incident response and resolution
Improved Reliability | Better system reliability and stability
Operational Visibility | Complete operational visibility

14.10.2 Risk Management

Advantage | Business Value
Risk Mitigation | Proactive risk identification and mitigation
Compliance | Better compliance with operational requirements
Quality Assurance | Improved system quality and reliability
Professional Operations | Institutional-grade operational standards

Document Information
Type: Centralized Error Tracking & Alert Center Design | Audience: Technical Leadership & Engineering Teams
Version: 1.0 | Date: December 2024
Focus: Error Monitoring & Alerting | Implementation: Detailed technical specifications for centralized error monitoring and alerting