62. Global Task Scheduling and Batch Processing System

Overview

The Global Task Scheduling and Batch Processing System provides unified orchestration for all asynchronous tasks across the trading system. This enterprise-grade workflow management platform enables centralized control of backtesting jobs, strategy deployments, account settlements, risk scans, data synchronization, and other critical operations.

Architecture Design

Microservice Structure

services/global-scheduler-center/
├── src/
│   ├── main.py
│   ├── registry/
│   │   ├── __init__.py
│   │   ├── task_registry.py
│   │   └── task_definition.py
│   ├── scheduler/
│   │   ├── __init__.py
│   │   ├── task_scheduler.py
│   │   ├── cron_scheduler.py
│   │   └── event_scheduler.py
│   ├── executor/
│   │   ├── __init__.py
│   │   ├── task_executor.py
│   │   ├── worker_pool.py
│   │   └── task_queue.py
│   ├── controller/
│   │   ├── __init__.py
│   │   ├── retry_controller.py
│   │   └── priority_controller.py
│   ├── monitor/
│   │   ├── __init__.py
│   │   ├── task_monitor.py
│   │   ├── performance_tracker.py
│   │   └── alert_manager.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── scheduler_api.py
│   │   ├── task_management_api.py
│   │   └── monitoring_api.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── task_models.py
│   │   └── execution_models.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── cron_parser.py
│   │   └── task_validator.py
│   ├── config.py
│   └── requirements.txt
├── tests/
│   ├── test_task_registry.py
│   ├── test_scheduler.py
│   ├── test_executor.py
│   └── test_api.py
├── Dockerfile
└── docker-compose.yml

Core Components

1. Task Registry (registry/task_registry.py)

from typing import Dict, Callable, Optional
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class TaskDefinition:
    name: str
    handler: Callable
    description: str
    timeout: int = 300
    retry_count: int = 3
    priority: int = 5
    tags: Optional[list] = None

class TaskRegistry:
    def __init__(self):
        self.task_handlers: Dict[str, TaskDefinition] = {}
        self.task_metadata: Dict[str, dict] = {}

    def register_task(self, task_name: str, handler: Callable, 
                     description: str = "", timeout: int = 300,
                     retry_count: int = 3, priority: int = 5,
                     tags: Optional[list] = None) -> None:
        """Register a new task with the scheduler"""
        task_def = TaskDefinition(
            name=task_name,
            handler=handler,
            description=description,
            timeout=timeout,
            retry_count=retry_count,
            priority=priority,
            tags=tags or []
        )
        self.task_handlers[task_name] = task_def

    def get_task(self, task_name: str) -> Optional[TaskDefinition]:
        """Retrieve task definition"""
        return self.task_handlers.get(task_name)

    def list_tasks(self) -> Dict[str, TaskDefinition]:
        """List all registered tasks"""
        return self.task_handlers.copy()

    def unregister_task(self, task_name: str) -> bool:
        """Remove task from registry"""
        if task_name in self.task_handlers:
            del self.task_handlers[task_name]
            return True
        return False

# Global registry instance
task_registry = TaskRegistry()
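
As a usage sketch (the handler name and parameters below are illustrative, not part of the design above), a backtesting job could be registered with the global registry like this:

# Hypothetical usage: registering a backtest handler with the global registry
from registry.task_registry import task_registry

async def run_backtest(payload: dict) -> dict:
    # payload is assumed to carry strategy and date-range parameters
    return {"strategy": payload.get("strategy"), "status": "completed"}

task_registry.register_task(
    "backtest_run",
    run_backtest,
    description="Run a single backtesting job",
    timeout=1800,
    retry_count=2,
    priority=3,
    tags=["backtesting"]
)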

2. Task Scheduler (scheduler/task_scheduler.py)

import asyncio
import heapq
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field

@dataclass
class ScheduledTask:
    task_name: str
    payload: Dict[str, Any]
    trigger_time: datetime
    priority: int = 5
    task_id: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    tags: List[str] = field(default_factory=list)

    def __lt__(self, other):
        return (self.trigger_time, -self.priority) < (other.trigger_time, -other.priority)

class TaskScheduler:
    def __init__(self):
        self.scheduled_tasks: List[ScheduledTask] = []
        self.event_triggers: Dict[str, List[ScheduledTask]] = {}
        self.running_tasks: Dict[str, ScheduledTask] = {}
        self.task_history: List[Dict] = []

    async def schedule_task(self, task_name: str, payload: Dict[str, Any],
                          trigger_time: Optional[datetime] = None,
                          priority: int = 5,
                          tags: Optional[List[str]] = None) -> str:
        """Schedule a task for execution"""
        task_id = f"{task_name}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        scheduled_task = ScheduledTask(
            task_name=task_name,
            payload=payload,
            trigger_time=trigger_time or datetime.now(),
            priority=priority,
            task_id=task_id,
            tags=tags or []
        )

        heapq.heappush(self.scheduled_tasks, scheduled_task)
        return task_id

    async def schedule_cron_task(self, task_name: str, cron_expression: str,
                               payload: Dict[str, Any], priority: int = 5) -> str:
        """Schedule a recurring task using cron expression"""
        next_run = self._parse_cron_expression(cron_expression)
        return await self.schedule_task(task_name, payload, next_run, priority)

    async def schedule_event_trigger(self, task_name: str, event_type: str,
                                   payload: Dict[str, Any], priority: int = 5) -> str:
        """Register a task to be triggered by a specific event type"""
        # Event-triggered tasks are only registered here; they are pushed onto the
        # schedule when the event actually fires (see trigger_event)
        task_id = f"{task_name}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        if event_type not in self.event_triggers:
            self.event_triggers[event_type] = []

        self.event_triggers[event_type].append(
            ScheduledTask(task_name=task_name, payload=payload,
                         trigger_time=datetime.now(), priority=priority,
                         task_id=task_id)
        )
        return task_id

    async def trigger_event(self, event_type: str, event_data: Dict[str, Any]) -> List[str]:
        """Trigger all tasks associated with an event"""
        triggered_tasks = []
        for template in self.event_triggers.get(event_type, []):
            # Schedule a fresh task per event so the registered template is not mutated
            payload = {**template.payload, **event_data}
            task_id = await self.schedule_task(
                template.task_name, payload, priority=template.priority
            )
            triggered_tasks.append(task_id)
        return triggered_tasks

    def get_next_task(self) -> Optional[ScheduledTask]:
        """Get the next task to execute"""
        if self.scheduled_tasks and self.scheduled_tasks[0].trigger_time <= datetime.now():
            return heapq.heappop(self.scheduled_tasks)
        return None

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a scheduled task; returns True only if something was removed"""
        before = len(self.scheduled_tasks) + sum(len(v) for v in self.event_triggers.values())

        # Remove from scheduled tasks
        self.scheduled_tasks = [t for t in self.scheduled_tasks if t.task_id != task_id]
        heapq.heapify(self.scheduled_tasks)

        # Remove from event triggers
        for event_type in self.event_triggers:
            self.event_triggers[event_type] = [
                t for t in self.event_triggers[event_type] if t.task_id != task_id
            ]

        after = len(self.scheduled_tasks) + sum(len(v) for v in self.event_triggers.values())
        return after < before

    def _parse_cron_expression(self, cron_expression: str) -> datetime:
        """Parse cron expression and return next execution time"""
        # Simplified cron parser - in production use croniter
        parts = cron_expression.split()
        if len(parts) != 5:
            raise ValueError("Invalid cron expression")

        # Calculate next run time based on current time
        now = datetime.now()
        # This is a simplified implementation
        return now + timedelta(minutes=1)

# Global scheduler instance
task_scheduler = TaskScheduler()
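
The _parse_cron_expression stub above defers real cron handling to croniter. A minimal sketch of what utils/cron_parser.py could look like, assuming the croniter package is available:

# utils/cron_parser.py - minimal sketch built on the croniter package (assumed dependency)
from datetime import datetime
from croniter import croniter

def next_run_time(cron_expression: str, base_time: datetime = None) -> datetime:
    """Return the next execution time after base_time for a 5-field cron expression."""
    base_time = base_time or datetime.now()
    if not croniter.is_valid(cron_expression):
        raise ValueError(f"Invalid cron expression: {cron_expression}")
    return croniter(cron_expression, base_time).get_next(datetime)

# Example: every weekday at 17:30
# next_run_time("30 17 * * 1-5")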

3. Task Executor (executor/task_executor.py)

import asyncio
import logging
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from registry.task_registry import task_registry

class TaskExecutor:
    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.running_tasks: Dict[str, asyncio.Task] = {}
        self.task_info: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)

    async def execute_task(self, scheduled_task) -> Dict[str, Any]:
        """Execute a scheduled task"""
        task_id = scheduled_task.task_id
        task_name = scheduled_task.task_name

        try:
            # Get task handler from registry
            task_def = task_registry.get_task(task_name)
            if not task_def:
                raise ValueError(f"Task {task_name} not found in registry")

            # Mark task as running and record metadata for monitoring
            self.running_tasks[task_id] = asyncio.current_task()
            self.task_info[task_id] = {"task_name": task_name, "started_at": datetime.now()}

            # Execute task
            start_time = datetime.now()
            if asyncio.iscoroutinefunction(task_def.handler):
                result = await task_def.handler(scheduled_task.payload)
            else:
                # Run sync handlers in the thread pool so they do not block the event loop
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    self.thread_pool, task_def.handler, scheduled_task.payload
                )

            execution_time = (datetime.now() - start_time).total_seconds()

            return {
                "task_id": task_id,
                "task_name": task_name,
                "status": "success",
                "result": result,
                "execution_time": execution_time,
                "completed_at": datetime.now()
            }

        except Exception as e:
            self.logger.error(f"Task {task_id} failed: {str(e)}")
            return {
                "task_id": task_id,
                "task_name": task_name,
                "status": "failed",
                "error": str(e),
                "payload": scheduled_task.payload,
                "retry_count": scheduled_task.retry_count,
                "completed_at": datetime.now()
            }
        finally:
            self.running_tasks.pop(task_id, None)
            self.task_info.pop(task_id, None)

    async def execute_batch(self, tasks: list) -> list:
        """Execute multiple tasks in parallel"""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def execute_with_semaphore(task):
            async with semaphore:
                return await self.execute_task(task)

        return await asyncio.gather(*[execute_with_semaphore(task) for task in tasks])

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task"""
        if task_id in self.running_tasks:
            self.running_tasks[task_id].cancel()
            return True
        return False

    def get_running_tasks(self) -> Dict[str, Any]:
        """Get information about currently running tasks"""
        return {task_id: dict(info) for task_id, info in self.task_info.items()}

# Global executor instance
task_executor = TaskExecutor()
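
As an illustrative usage (task names and payloads below are hypothetical), a group of parameter-sweep backtests could be fanned out through execute_batch:

# Hypothetical example: fan out a parameter sweep of backtests as one batch
from datetime import datetime

from scheduler.task_scheduler import ScheduledTask
from executor.task_executor import task_executor

async def run_parameter_sweep():
    batch = [
        ScheduledTask(
            task_name="backtest_run",   # assumed to be registered in the task registry
            payload={"strategy": "mean_reversion", "lookback": lookback},
            trigger_time=datetime.now(),
            task_id=f"sweep_lookback_{lookback}"
        )
        for lookback in (10, 20, 50)
    ]
    results = await task_executor.execute_batch(batch)
    return [r["status"] for r in results]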

4. Retry Controller (controller/retry_controller.py)

import heapq
from datetime import datetime, timedelta
from typing import Dict, Any

from scheduler.task_scheduler import ScheduledTask, task_scheduler

class RetryController:
    def __init__(self):
        self.retry_policies: Dict[str, Dict] = {}
        self.failed_tasks: Dict[str, Dict] = {}

    def set_retry_policy(self, task_name: str, max_retries: int = 3,
                        backoff_factor: float = 2.0, 
                        initial_delay: int = 60) -> None:
        """Set retry policy for a specific task type"""
        self.retry_policies[task_name] = {
            "max_retries": max_retries,
            "backoff_factor": backoff_factor,
            "initial_delay": initial_delay
        }

    async def handle_failed_task(self, task_result: Dict[str, Any]) -> bool:
        """Handle failed task and determine if retry is needed"""
        task_id = task_result["task_id"]
        task_name = task_result.get("task_name", "unknown")

        # Get retry policy
        policy = self.retry_policies.get(task_name, {
            "max_retries": 3,
            "backoff_factor": 2.0,
            "initial_delay": 60
        })

        # Check if task should be retried; prefer the retry count carried on the task result,
        # falling back to the local tracking table
        current_retries = task_result.get(
            "retry_count",
            self.failed_tasks.get(task_id, {}).get("retry_count", 0)
        )

        if current_retries < policy["max_retries"]:
            # Calculate delay with exponential backoff
            delay = policy["initial_delay"] * (policy["backoff_factor"] ** current_retries)

            # Schedule retry
            retry_time = datetime.now() + timedelta(seconds=delay)

            # Create retry task
            retry_task = ScheduledTask(
                task_name=task_name,
                payload=task_result.get("payload", {}),
                trigger_time=retry_time,
                task_id=f"{task_id}_retry_{current_retries + 1}",
                retry_count=current_retries + 1,
                max_retries=policy["max_retries"]
            )

            # Add to scheduler
            heapq.heappush(task_scheduler.scheduled_tasks, retry_task)

            # Update failed tasks tracking
            self.failed_tasks[task_id] = {
                "retry_count": current_retries + 1,
                "last_failure": datetime.now(),
                "error": task_result.get("error", "Unknown error")
            }

            return True
        else:
            # Max retries exceeded, mark as permanently failed
            self.failed_tasks[task_id] = {
                "retry_count": current_retries,
                "status": "permanently_failed",
                "last_failure": datetime.now(),
                "error": task_result.get("error", "Unknown error")
            }
            return False

    def get_failed_tasks(self) -> Dict[str, Dict]:
        """Get information about failed tasks"""
        return self.failed_tasks.copy()

    def clear_failed_task(self, task_id: str) -> bool:
        """Clear failed task from tracking"""
        if task_id in self.failed_tasks:
            del self.failed_tasks[task_id]
            return True
        return False

# Global retry controller instance
retry_controller = RetryController()
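
A short usage sketch (task names and values are illustrative) showing how per-task retry policies could be configured:

# Hypothetical example: per-task retry policies
from controller.retry_controller import retry_controller

retry_controller.set_retry_policy("data_sync", max_retries=5, backoff_factor=2.0, initial_delay=30)
retry_controller.set_retry_policy("account_settlement", max_retries=2, backoff_factor=3.0, initial_delay=300)
# With these settings a failing data_sync task is retried after roughly 30s, 60s, 120s,
# 240s and 480s before being marked permanently_failed.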

5. Task Monitor (monitor/task_monitor.py)

import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Any
from collections import defaultdict

class TaskMonitor:
    def __init__(self):
        self.task_history: List[Dict] = []
        self.performance_metrics: Dict[str, List] = defaultdict(list)
        self.alerts: List[Dict] = []
        self.max_history_size = 10000

    async def track_task_execution(self, task_result: Dict[str, Any]) -> None:
        """Track task execution result"""
        # Add timestamp if not present
        if "timestamp" not in task_result:
            task_result["timestamp"] = datetime.now()

        # Add to history
        self.task_history.append(task_result)

        # Maintain history size
        if len(self.task_history) > self.max_history_size:
            self.task_history = self.task_history[-self.max_history_size:]

        # Update performance metrics
        task_name = task_result.get("task_name", "unknown")
        if "execution_time" in task_result:
            self.performance_metrics[task_name].append(task_result["execution_time"])

        # Check for alerts
        await self._check_alerts(task_result)

    async def _check_alerts(self, task_result: Dict[str, Any]) -> None:
        """Check if task result triggers any alerts"""
        # Long execution time alert
        if task_result.get("execution_time", 0) > 300:  # 5 minutes
            self.alerts.append({
                "type": "long_execution",
                "task_id": task_result["task_id"],
                "task_name": task_result.get("task_name", "unknown"),
                "execution_time": task_result["execution_time"],
                "timestamp": datetime.now()
            })

        # Failed task alert
        if task_result.get("status") == "failed":
            self.alerts.append({
                "type": "task_failure",
                "task_id": task_result["task_id"],
                "task_name": task_result.get("task_name", "unknown"),
                "error": task_result.get("error", "Unknown error"),
                "timestamp": datetime.now()
            })

    def get_task_history(self, task_name: str = None, 
                        status: str = None, 
                        limit: int = 100) -> List[Dict]:
        """Get task execution history with filters"""
        filtered_history = self.task_history

        if task_name:
            filtered_history = [t for t in filtered_history if t.get("task_name") == task_name]

        if status:
            filtered_history = [t for t in filtered_history if t.get("status") == status]

        return filtered_history[-limit:]

    def get_performance_metrics(self, task_name: str = None) -> Dict[str, Any]:
        """Get performance metrics for tasks"""
        if task_name:
            metrics = self.performance_metrics.get(task_name, [])
        else:
            metrics = {name: data for name, data in self.performance_metrics.items()}

        if not metrics:
            return {}

        if isinstance(metrics, list):
            return {
                "count": len(metrics),
                "avg_execution_time": sum(metrics) / len(metrics),
                "min_execution_time": min(metrics),
                "max_execution_time": max(metrics)
            }
        else:
            return {
                name: {
                    "count": len(data),
                    "avg_execution_time": sum(data) / len(data),
                    "min_execution_time": min(data),
                    "max_execution_time": max(data)
                }
                for name, data in metrics.items()
            }

    def get_alerts(self, alert_type: str = None, 
                  since: datetime = None) -> List[Dict]:
        """Get alerts with optional filtering"""
        filtered_alerts = self.alerts

        if alert_type:
            filtered_alerts = [a for a in filtered_alerts if a["type"] == alert_type]

        if since:
            filtered_alerts = [a for a in filtered_alerts if a["timestamp"] >= since]

        return filtered_alerts

    def clear_alerts(self, alert_type: str = None) -> int:
        """Clear alerts and return count of cleared alerts"""
        if alert_type:
            original_count = len(self.alerts)
            self.alerts = [a for a in self.alerts if a["type"] != alert_type]
            return original_count - len(self.alerts)
        else:
            cleared_count = len(self.alerts)
            self.alerts.clear()
            return cleared_count

# Global monitor instance
task_monitor = TaskMonitor()
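
The components above are tied together by a dispatch loop. A minimal sketch of what the loop in src/main.py could look like (this wiring is an assumption of the design; it is not specified elsewhere in this document):

# Dispatch loop - minimal sketch of how src/main.py could wire the components together
import asyncio

from scheduler.task_scheduler import task_scheduler
from executor.task_executor import task_executor
from controller.retry_controller import retry_controller
from monitor.task_monitor import task_monitor

async def dispatch_loop(poll_interval: float = 1.0):
    """Poll the scheduler, execute due tasks, record results, and hand failures to the retry controller."""
    while True:
        scheduled_task = task_scheduler.get_next_task()
        if scheduled_task is None:
            # Nothing due yet - back off briefly before polling again
            await asyncio.sleep(poll_interval)
            continue

        # Tasks run one at a time here; a production loop would fan out with
        # asyncio.create_task or execute_batch for higher throughput
        result = await task_executor.execute_task(scheduled_task)
        await task_monitor.track_task_execution(result)

        if result.get("status") == "failed":
            await retry_controller.handle_failed_task(result)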

API Design

Scheduler API (api/scheduler_api.py)

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
from datetime import datetime

from scheduler.task_scheduler import task_scheduler
from executor.task_executor import task_executor
from controller.retry_controller import retry_controller
from monitor.task_monitor import task_monitor

router = APIRouter(prefix="/api/v1/scheduler", tags=["Task Scheduler"])

class TaskRequest(BaseModel):
    task_name: str
    payload: Dict[str, Any] = {}
    trigger_time: Optional[datetime] = None
    priority: int = 5
    tags: List[str] = []

class CronTaskRequest(BaseModel):
    task_name: str
    cron_expression: str
    payload: Dict[str, Any] = {}
    priority: int = 5

class EventTriggerRequest(BaseModel):
    task_name: str
    event_type: str
    payload: Dict[str, Any] = {}
    priority: int = 5

@router.post("/task/submit")
async def submit_task(request: TaskRequest):
    """Submit a task for immediate or scheduled execution"""
    try:
        task_id = await task_scheduler.schedule_task(
            request.task_name,
            request.payload,
            request.trigger_time,
            request.priority,
            request.tags
        )
        return {"task_id": task_id, "status": "scheduled"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/task/cron")
async def submit_cron_task(request: CronTaskRequest):
    """Submit a recurring task using cron expression"""
    try:
        task_id = await task_scheduler.schedule_cron_task(
            request.task_name,
            request.cron_expression,
            request.payload,
            request.priority
        )
        return {"task_id": task_id, "status": "scheduled"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/task/event-trigger")
async def submit_event_trigger(request: EventTriggerRequest):
    """Submit a task to be triggered by specific events"""
    try:
        task_id = await task_scheduler.schedule_event_trigger(
            request.task_name,
            request.event_type,
            request.payload,
            request.priority
        )
        return {"task_id": task_id, "status": "registered"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.post("/event/trigger/{event_type}")
async def trigger_event(event_type: str, event_data: Dict[str, Any]):
    """Trigger all tasks associated with an event"""
    try:
        triggered_tasks = await task_scheduler.trigger_event(event_type, event_data)
        return {"triggered_tasks": triggered_tasks, "count": len(triggered_tasks)}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.get("/task/status/{task_id}")
async def get_task_status(task_id: str):
    """Get status of a specific task"""
    # Check running tasks
    if task_id in task_executor.running_tasks:
        return {"task_id": task_id, "status": "running"}

    # Check history
    history = task_monitor.get_task_history(limit=1000)
    for record in history:
        if record.get("task_id") == task_id:
            return record

    raise HTTPException(status_code=404, detail="Task not found")

@router.delete("/task/cancel/{task_id}")
async def cancel_task(task_id: str):
    """Cancel a scheduled or running task"""
    # Cancel from scheduler (pending tasks) and executor (running tasks)
    scheduler_cancelled = task_scheduler.cancel_task(task_id)
    executor_cancelled = task_executor.cancel_task(task_id)

    if scheduler_cancelled or executor_cancelled:
        return {"task_id": task_id, "status": "cancelled"}

    raise HTTPException(status_code=404, detail="Task not found")

@router.get("/tasks/running")
async def get_running_tasks():
    """Get all currently running tasks"""
    return task_executor.get_running_tasks()

@router.get("/tasks/scheduled")
async def get_scheduled_tasks():
    """Get all scheduled tasks"""
    return [{"task_id": t.task_id, "task_name": t.task_name, 
             "trigger_time": t.trigger_time, "priority": t.priority}
            for t in task_scheduler.scheduled_tasks]

@router.get("/tasks/failed")
async def get_failed_tasks():
    """Get all failed tasks"""
    return retry_controller.get_failed_tasks()

@router.get("/tasks/history")
async def get_task_history(task_name: Optional[str] = None, status: Optional[str] = None, limit: int = 100):
    """Get task execution history"""
    return task_monitor.get_task_history(task_name, status, limit)

@router.get("/metrics/performance")
async def get_performance_metrics(task_name: Optional[str] = None):
    """Get performance metrics for tasks"""
    return task_monitor.get_performance_metrics(task_name)

@router.get("/alerts")
async def get_alerts(alert_type: Optional[str] = None, since: Optional[str] = None):
    """Get system alerts"""
    since_dt = None
    if since:
        since_dt = datetime.fromisoformat(since)
    return task_monitor.get_alerts(alert_type, since_dt)

@router.delete("/alerts/clear")
async def clear_alerts(alert_type: str = None):
    """Clear alerts"""
    cleared_count = task_monitor.clear_alerts(alert_type)
    return {"cleared_count": cleared_count}

Frontend Integration

Global Scheduler Dashboard (frontend/src/components/GlobalSchedulerView.tsx)

import React, { useState, useEffect } from 'react';
import {
  Box,
  Grid,
  Card,
  CardContent,
  Typography,
  Button,
  TextField,
  Select,
  MenuItem,
  FormControl,
  InputLabel,
  Table,
  TableBody,
  TableCell,
  TableContainer,
  TableHead,
  TableRow,
  Paper,
  Chip,
  Alert,
  Dialog,
  DialogTitle,
  DialogContent,
  DialogActions,
  IconButton,
  Tooltip
} from '@mui/material';
import {
  PlayArrow,
  Stop,
  Schedule,
  History,
  TrendingUp,
  Warning,
  Refresh,
  Add,
  Delete
} from '@mui/icons-material';
import { DataGrid, GridColDef } from '@mui/x-data-grid';

interface Task {
  task_id: string;
  task_name?: string;
  status?: string;
  trigger_time?: string;
  priority?: number;
  execution_time?: number;
  error?: string;
  timestamp?: string;
}

interface TaskRequest {
  task_name: string;
  payload: Record<string, any>;
  trigger_time?: string;
  priority: number;
  tags: string[];
}

const GlobalSchedulerView: React.FC = () => {
  const [tasks, setTasks] = useState<Task[]>([]);
  const [runningTasks, setRunningTasks] = useState<Task[]>([]);
  const [scheduledTasks, setScheduledTasks] = useState<Task[]>([]);
  const [failedTasks, setFailedTasks] = useState<Task[]>([]);
  const [taskHistory, setTaskHistory] = useState<Task[]>([]);
  const [performanceMetrics, setPerformanceMetrics] = useState<any>({});
  const [alerts, setAlerts] = useState<any[]>([]);
  const [loading, setLoading] = useState(false);
  const [submitDialogOpen, setSubmitDialogOpen] = useState(false);
  const [cronDialogOpen, setCronDialogOpen] = useState(false);
  const [newTask, setNewTask] = useState<TaskRequest>({
    task_name: '',
    payload: {},
    priority: 5,
    tags: []
  });

  // Task columns for DataGrid
  const taskColumns: GridColDef[] = [
    { field: 'task_id', headerName: 'Task ID', width: 200 },
    { field: 'task_name', headerName: 'Task Name', width: 150 },
    { field: 'status', headerName: 'Status', width: 120,
      renderCell: (params) => (
        <Chip
          label={params.value}
          color={
            params.value === 'success' ? 'success' :
            params.value === 'failed' ? 'error' :
            params.value === 'running' ? 'warning' : 'default'
          }
          size="small"
        />
      )
    },
    { field: 'priority', headerName: 'Priority', width: 100 },
    { field: 'execution_time', headerName: 'Execution Time (s)', width: 150,
      renderCell: (params) => params.value ? `${params.value.toFixed(2)}s` : '-'
    },
    { field: 'timestamp', headerName: 'Timestamp', width: 200,
      renderCell: (params) => params.value ? new Date(params.value).toLocaleString() : '-'
    },
    { field: 'actions', headerName: 'Actions', width: 120,
      renderCell: (params) => (
        <Box>
          {params.row.status === 'running' && (
            <Tooltip title="Cancel Task">
              <IconButton
                size="small"
                onClick={() => cancelTask(params.row.task_id)}
                color="error"
              >
                <Stop />
              </IconButton>
            </Tooltip>
          )}
        </Box>
      )
    }
  ];

  // Load data
  const loadData = async () => {
    setLoading(true);
    try {
      const [running, scheduled, failed, history, metrics, alertsData] = await Promise.all([
        fetch('/api/v1/scheduler/tasks/running').then(r => r.json()),
        fetch('/api/v1/scheduler/tasks/scheduled').then(r => r.json()),
        fetch('/api/v1/scheduler/tasks/failed').then(r => r.json()),
        fetch('/api/v1/scheduler/tasks/history').then(r => r.json()),
        fetch('/api/v1/scheduler/metrics/performance').then(r => r.json()),
        fetch('/api/v1/scheduler/alerts').then(r => r.json())
      ]);

      // Running and failed tasks arrive keyed by task_id; flatten them so task_id stays on each row
      setRunningTasks(
        Object.entries(running).map(([task_id, info]: [string, any]) => ({
          task_id,
          status: 'running',
          ...info
        }))
      );
      setScheduledTasks(scheduled);
      setFailedTasks(
        Object.entries(failed).map(([task_id, info]: [string, any]) => ({
          task_id,
          ...info
        }))
      );
      setTaskHistory(history);
      setPerformanceMetrics(metrics);
      setAlerts(alertsData);
    } catch (error) {
      console.error('Error loading data:', error);
    } finally {
      setLoading(false);
    }
  };

  // Submit task
  const submitTask = async (taskData: TaskRequest) => {
    try {
      const response = await fetch('/api/v1/scheduler/task/submit', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(taskData)
      });

      if (response.ok) {
        setSubmitDialogOpen(false);
        setNewTask({ task_name: '', payload: {}, priority: 5, tags: [] });
        loadData();
      }
    } catch (error) {
      console.error('Error submitting task:', error);
    }
  };

  // Cancel task
  const cancelTask = async (taskId: string) => {
    try {
      await fetch(`/api/v1/scheduler/task/cancel/${taskId}`, {
        method: 'DELETE'
      });
      loadData();
    } catch (error) {
      console.error('Error cancelling task:', error);
    }
  };

  // Trigger event
  const triggerEvent = async (eventType: string, eventData: Record<string, any>) => {
    try {
      await fetch(`/api/v1/scheduler/event/trigger/${eventType}`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(eventData)
      });
      loadData();
    } catch (error) {
      console.error('Error triggering event:', error);
    }
  };

  useEffect(() => {
    loadData();
    const interval = setInterval(loadData, 5000); // Refresh every 5 seconds
    return () => clearInterval(interval);
  }, []);

  return (
    <Box sx={{ p: 3 }}>
      <Box sx={{ display: 'flex', justifyContent: 'space-between', mb: 3 }}>
        <Typography variant="h4">Global Task Scheduler</Typography>
        <Box>
          <Button
            variant="contained"
            startIcon={<Add />}
            onClick={() => setSubmitDialogOpen(true)}
            sx={{ mr: 1 }}
          >
            Submit Task
          </Button>
          <Button
            variant="outlined"
            startIcon={<Schedule />}
            onClick={() => setCronDialogOpen(true)}
            sx={{ mr: 1 }}
          >
            Cron Task
          </Button>
          <Button
            variant="outlined"
            startIcon={<Refresh />}
            onClick={loadData}
          >
            Refresh
          </Button>
        </Box>
      </Box>

      {/* Alerts */}
      {alerts.length > 0 && (
        <Box sx={{ mb: 3 }}>
          {alerts.map((alert, index) => (
            <Alert key={index} severity="warning" sx={{ mb: 1 }}>
              {alert.type}: {alert.task_name} - {alert.error || 'Task failed'}
            </Alert>
          ))}
        </Box>
      )}

      {/* Performance Metrics */}
      <Grid container spacing={3} sx={{ mb: 3 }}>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Performance Metrics
              </Typography>
              <Box sx={{ display: 'flex', justifyContent: 'space-between' }}>
                <Typography>Total Tasks: {taskHistory.length}</Typography>
                <Typography>Running: {runningTasks.length}</Typography>
                <Typography>Scheduled: {scheduledTasks.length}</Typography>
                <Typography>Failed: {failedTasks.length}</Typography>
              </Box>
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                System Status
              </Typography>
              <Box sx={{ display: 'flex', justifyContent: 'space-between' }}>
                <Chip label="Scheduler Active" color="success" />
                <Chip label={`${runningTasks.length} Running`} color="warning" />
                <Chip label={`${alerts.length} Alerts`} color="error" />
              </Box>
            </CardContent>
          </Card>
        </Grid>
      </Grid>

      {/* Task Tables */}
      <Grid container spacing={3}>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Running Tasks
              </Typography>
              <DataGrid
                rows={runningTasks}
                columns={taskColumns}
                getRowId={(row) => row.task_id}
                pageSize={5}
                autoHeight
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Scheduled Tasks
              </Typography>
              <DataGrid
                rows={scheduledTasks}
                columns={taskColumns}
                getRowId={(row) => row.task_id}
                pageSize={5}
                autoHeight
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Task History
              </Typography>
              <DataGrid
                rows={taskHistory}
                columns={taskColumns}
                getRowId={(row) => row.task_id}
                pageSize={10}
                autoHeight
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
      </Grid>

      {/* Submit Task Dialog */}
      <Dialog open={submitDialogOpen} onClose={() => setSubmitDialogOpen(false)} maxWidth="md" fullWidth>
        <DialogTitle>Submit New Task</DialogTitle>
        <DialogContent>
          <Grid container spacing={2} sx={{ mt: 1 }}>
            <Grid item xs={12}>
              <TextField
                fullWidth
                label="Task Name"
                value={newTask.task_name}
                onChange={(e) => setNewTask({ ...newTask, task_name: e.target.value })}
              />
            </Grid>
            <Grid item xs={12}>
              <TextField
                fullWidth
                multiline
                rows={4}
                label="Payload (JSON)"
                value={JSON.stringify(newTask.payload, null, 2)}
                onChange={(e) => {
                  try {
                    setNewTask({ ...newTask, payload: JSON.parse(e.target.value) });
                  } catch (error) {
                    // Handle invalid JSON
                  }
                }}
              />
            </Grid>
            <Grid item xs={6}>
              <TextField
                fullWidth
                type="datetime-local"
                label="Trigger Time (Optional)"
                InputLabelProps={{ shrink: true }}
                onChange={(e) => setNewTask({ ...newTask, trigger_time: e.target.value })}
              />
            </Grid>
            <Grid item xs={6}>
              <FormControl fullWidth>
                <InputLabel>Priority</InputLabel>
                <Select
                  value={newTask.priority}
                  onChange={(e) => setNewTask({ ...newTask, priority: e.target.value as number })}
                >
                  <MenuItem value={1}>High (1)</MenuItem>
                  <MenuItem value={3}>Medium (3)</MenuItem>
                  <MenuItem value={5}>Normal (5)</MenuItem>
                  <MenuItem value={7}>Low (7)</MenuItem>
                  <MenuItem value={9}>Very Low (9)</MenuItem>
                </Select>
              </FormControl>
            </Grid>
          </Grid>
        </DialogContent>
        <DialogActions>
          <Button onClick={() => setSubmitDialogOpen(false)}>Cancel</Button>
          <Button onClick={() => submitTask(newTask)} variant="contained">
            Submit Task
          </Button>
        </DialogActions>
      </Dialog>
    </Box>
  );
};

export default GlobalSchedulerView;

Implementation Roadmap

Phase 1: Core Infrastructure (Weeks 1-2)

  • Set up microservice architecture
  • Implement task registry and basic scheduler
  • Create simple task executor with worker pool
  • Basic API endpoints for task submission and status

Phase 2: Advanced Features (Weeks 3-4)

  • Implement cron expression parser
  • Add event-driven task triggers
  • Build retry mechanism with exponential backoff
  • Create task monitoring and metrics collection

Phase 3: Reliability & Performance (Weeks 5-6)

  • Implement priority queues and task prioritization
  • Add distributed task execution capabilities
  • Build comprehensive error handling and alerting
  • Performance optimization and load balancing

Phase 4: Frontend & Integration (Weeks 7-8)

  • Develop comprehensive dashboard UI
  • Integrate with existing trading system modules
  • Add real-time monitoring and notifications
  • Performance testing and optimization

System Integration

Integration Points

  1. Backtesting Engine: Schedule backtesting jobs with different parameters
  2. Risk Management: Trigger risk scans and portfolio rebalancing
  3. Data Pipeline: Schedule data synchronization and processing tasks
  4. Strategy Deployment: Orchestrate strategy deployment workflows
  5. Account Management: Schedule settlement and reconciliation tasks
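
For the recurring jobs among these integration points (settlement, data synchronization), tasks can be submitted through the cron scheduler. A short sketch follows; task names and schedules are illustrative:

# Hypothetical example: recurring integration jobs
from scheduler.task_scheduler import task_scheduler

async def register_recurring_jobs():
    # Nightly account settlement on weekdays at 23:30
    await task_scheduler.schedule_cron_task(
        "account_settlement", "30 23 * * 1-5",
        {"scope": "all_accounts"}, priority=2
    )
    # Historical data synchronization every 15 minutes
    await task_scheduler.schedule_cron_task(
        "data_sync", "*/15 * * * *",
        {"source": "historical_feed"}, priority=4
    )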

Event Integration

# Example: Integration with trading system events
async def register_trading_events():
    # Market data events
    await task_scheduler.schedule_event_trigger(
        "data_sync", "market_data_update", 
        {"source": "real_time_feed"}, priority=1
    )

    # Risk events
    await task_scheduler.schedule_event_trigger(
        "risk_scan", "position_change", 
        {"threshold": 1000000}, priority=2
    )

    # Performance events
    await task_scheduler.schedule_event_trigger(
        "performance_report", "daily_close", 
        {"report_type": "daily_summary"}, priority=3
    )

Business Value

Operational Efficiency

  • Centralized Control: Unified management of all system tasks
  • Automated Workflows: Reduce manual intervention in routine operations
  • Resource Optimization: Efficient allocation of computing resources
  • Error Reduction: Automated retry mechanisms reduce manual error handling

Scalability & Reliability

  • Horizontal Scaling: Support for distributed task execution
  • Fault Tolerance: Automatic recovery from failures
  • Load Balancing: Intelligent distribution of task load
  • High Availability: Redundant execution paths

Monitoring & Compliance

  • Complete Audit Trail: Full visibility into task execution history
  • Performance Monitoring: Real-time metrics and alerting
  • Compliance Reporting: Detailed logs for regulatory requirements
  • Operational Transparency: Clear view of system operations

Development Productivity

  • Rapid Deployment: Quick setup of new automated workflows
  • Flexible Scheduling: Support for complex timing requirements
  • Event-Driven Architecture: Reactive system responses
  • Developer-Friendly: Simple API for task integration

Technical Value

Architecture Benefits

  • Microservice Integration: Seamless integration with existing services
  • Event-Driven Design: Reactive and scalable architecture
  • Priority Management: Intelligent task prioritization
  • Resource Management: Efficient use of system resources

Performance Features

  • Low Latency: Fast task scheduling and execution
  • High Throughput: Support for thousands of concurrent tasks
  • Memory Efficiency: Optimized data structures and caching
  • Network Optimization: Minimal overhead for distributed execution

Reliability Features

  • Automatic Recovery: Self-healing from failures
  • Data Consistency: ACID properties for critical operations
  • Backup & Recovery: Comprehensive disaster recovery
  • Monitoring & Alerting: Proactive issue detection

This Global Task Scheduling and Batch Processing System provides the foundation for reliable, scalable, and efficient orchestration of all trading system operations, enabling the platform to handle complex workflows with enterprise-grade reliability and performance.