62. Global Task Scheduling and Batch Processing System¶
Overview¶
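The Global Task Scheduling and Batch Processing System provides unified orchestration for all asynchronous tasks across the trading system. This enterprise-grade workflow management platform enables centralized control of backtesting jobs, strategy deployments, account settlements, risk scans, data synchronization, and other critical operations. The reference implementation below runs in a single process and keeps all scheduler, retry and monitoring state in memory; persistence and distributed execution are roadmap items (Phase 3).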
Architecture Design¶
Microservice Structure¶
services/global-scheduler-center/
├── src/
│ ├── main.py
│ ├── registry/
│ │ ├── __init__.py
│ │ ├── task_registry.py
│ │ └── task_definition.py
│ ├── scheduler/
│ │ ├── __init__.py
│ │ ├── task_scheduler.py
│ │ ├── cron_scheduler.py
│ │ └── event_scheduler.py
│ ├── executor/
│ │ ├── __init__.py
│ │ ├── task_executor.py
│ │ ├── worker_pool.py
│ │ └── task_queue.py
│ ├── controller/
│ │ ├── __init__.py
│ │ ├── retry_controller.py
│ │ └── priority_controller.py
│ ├── monitor/
│ │ ├── __init__.py
│ │ ├── task_monitor.py
│ │ ├── performance_tracker.py
│ │ └── alert_manager.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── scheduler_api.py
│ │ ├── task_management_api.py
│ │ └── monitoring_api.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── task_models.py
│ │ └── execution_models.py
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── cron_parser.py
│ │ └── task_validator.py
│ ├── config.py
│ └── requirements.txt
├── tests/
│ ├── test_task_registry.py
│ ├── test_scheduler.py
│ ├── test_executor.py
│ └── test_api.py
├── Dockerfile
└── docker-compose.yml
Core Components¶
1. Task Registry (registry/task_registry.py)¶
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class TaskStatus(Enum):
    """Task lifecycle states, each valued by its lowercase name.

    Members (in definition order): PENDING, RUNNING, SUCCESS, FAILED,
    CANCELLED.
    """

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class TaskDefinition:
    """Static description of a task type that the scheduler can run.

    Attributes:
        name: Registry key for the task.
        handler: Callable invoked with the task payload.
        description: Free-text description.
        timeout: Execution time budget; presumably seconds (the monitor's
            300 "5 minutes" threshold suggests so) -- TODO confirm.
        retry_count: Retry budget for this task type.
        priority: Scheduling priority.
        tags: Labels for the task. ``None`` (the default) is normalized to a
            fresh empty list, so every instance owns its own list and
            ``tags`` is never ``None`` after construction -- matching what
            ``TaskRegistry.register_task`` stores.
    """

    name: str
    handler: Callable
    description: str
    timeout: int = 300
    retry_count: int = 3
    priority: int = 5
    tags: Optional[List[str]] = None

    def __post_init__(self) -> None:
        # A literal [] default is forbidden for dataclasses, and leaving None
        # forces every reader to None-check; give each instance its own list.
        if self.tags is None:
            self.tags = []
class TaskRegistry:
    """In-memory mapping from task name to its ``TaskDefinition``."""

    def __init__(self):
        self.task_handlers: Dict[str, "TaskDefinition"] = {}
        # Not written by any method below except cleanup in unregister_task;
        # kept for external callers that may populate it.
        self.task_metadata: Dict[str, dict] = {}

    def register_task(self, task_name: str, handler: Callable,
                      description: str = "", timeout: int = 300,
                      retry_count: int = 3, priority: int = 5,
                      tags: list = None) -> None:
        """Register ``task_name``, replacing any existing definition.

        Args:
            task_name: Registry key.
            handler: Callable run with the task payload.
            description, timeout, retry_count, priority: Stored on the
                resulting ``TaskDefinition``.
            tags: Optional labels; copied so later changes to the caller's
                list do not leak into the registry.

        Raises:
            TypeError: If ``handler`` is not callable (otherwise the error
                would only surface much later, when the task executes).
        """
        if not callable(handler):
            raise TypeError(
                f"handler for task {task_name!r} must be callable, "
                f"got {type(handler).__name__}"
            )
        self.task_handlers[task_name] = TaskDefinition(
            name=task_name,
            handler=handler,
            description=description,
            timeout=timeout,
            retry_count=retry_count,
            priority=priority,
            tags=list(tags) if tags else [],
        )

    def get_task(self, task_name: str) -> Optional["TaskDefinition"]:
        """Return the definition registered as ``task_name``, or ``None``."""
        return self.task_handlers.get(task_name)

    def list_tasks(self) -> Dict[str, "TaskDefinition"]:
        """Return a shallow copy of the name -> definition mapping."""
        return self.task_handlers.copy()

    def unregister_task(self, task_name: str) -> bool:
        """Remove ``task_name``; return True if it was registered."""
        if task_name not in self.task_handlers:
            return False
        del self.task_handlers[task_name]
        self.task_metadata.pop(task_name, None)
        return True


# Global registry instance
task_registry = TaskRegistry()
2. Task Scheduler (scheduler/task_scheduler.py)¶
import asyncio
import heapq
from datetime import datetime, timedelta
from typing import List, Dict, Any
from dataclasses import dataclass, field
@dataclass
class ScheduledTask:
    """A unit of work waiting in the scheduler's heap.

    Heap ordering is by ``trigger_time`` first; tasks due at the same instant
    are ordered by ``priority``, where a LOWER number is MORE urgent
    (1 = high ... 9 = very low). This matches the dashboard's priority labels
    and the event-integration examples (market data uses priority 1).
    """

    task_name: str
    payload: Dict[str, Any]
    trigger_time: datetime
    priority: int = 5
    task_id: str = None  # assigned by TaskScheduler; None until then
    retry_count: int = 0
    max_retries: int = 3
    tags: List[str] = field(default_factory=list)

    def __lt__(self, other):
        # Earlier trigger time first; ties go to the smaller priority number.
        return (self.trigger_time, self.priority) < (other.trigger_time, other.priority)


class TaskScheduler:
    """In-memory scheduler for time-based and event-triggered tasks.

    ``scheduled_tasks`` is a ``heapq`` min-heap of ``ScheduledTask`` that are
    waiting for their trigger time. ``event_triggers`` maps an event type to
    subscription templates; a template is only copied into the heap when
    ``trigger_event`` fires for that event type.
    """

    def __init__(self):
        self.scheduled_tasks: List[ScheduledTask] = []
        self.event_triggers: Dict[str, List[ScheduledTask]] = {}
        # Not populated by this class; kept for callers that read them.
        self.running_tasks: Dict[str, ScheduledTask] = {}
        self.task_history: List[Dict] = []
        # Per-instance counter appended to ids: the timestamp alone can repeat
        # when two tasks are created within one clock tick.
        self._id_sequence = 0

    def _new_task_id(self, task_name: str) -> str:
        """Return a task id that is unique within this scheduler instance."""
        self._id_sequence += 1
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        return f"{task_name}_{stamp}_{self._id_sequence}"

    async def schedule_task(self, task_name: str, payload: Dict[str, Any],
                            trigger_time: datetime = None,
                            priority: int = 5,
                            tags: List[str] = None) -> str:
        """Queue ``task_name`` to run at ``trigger_time`` (default: now).

        Returns:
            The new task's id.
        """
        task_id = self._new_task_id(task_name)
        heapq.heappush(self.scheduled_tasks, ScheduledTask(
            task_name=task_name,
            payload=payload,
            trigger_time=trigger_time or datetime.now(),
            priority=priority,
            task_id=task_id,
            tags=tags or [],
        ))
        return task_id

    async def schedule_cron_task(self, task_name: str, cron_expression: str,
                                 payload: Dict[str, Any], priority: int = 5) -> str:
        """Queue ``task_name`` at the next run time of ``cron_expression``.

        NOTE: this schedules a single run; nothing here re-schedules the task
        after it executes, and the cron parser is a placeholder (see
        ``_parse_cron_expression``).

        Raises:
            ValueError: If the expression does not have five fields.
        """
        next_run = self._parse_cron_expression(cron_expression)
        return await self.schedule_task(task_name, payload, next_run, priority)

    async def schedule_event_trigger(self, task_name: str, event_type: str,
                                     payload: Dict[str, Any], priority: int = 5) -> str:
        """Subscribe ``task_name`` to ``event_type`` without running it now.

        The task is only queued when ``trigger_event(event_type, ...)`` is
        called (previously it was also queued immediately on registration).

        Returns:
            The subscription's task id; every firing is queued under this id,
            so ``cancel_task`` with it removes pending firings and the
            subscription.
        """
        task_id = self._new_task_id(task_name)
        self.event_triggers.setdefault(event_type, []).append(
            ScheduledTask(task_name=task_name, payload=dict(payload),
                          trigger_time=datetime.now(), priority=priority,
                          task_id=task_id)
        )
        return task_id

    async def trigger_event(self, event_type: str, event_data: Dict[str, Any]) -> List[str]:
        """Queue one run of every task subscribed to ``event_type``.

        Each run gets a fresh ``ScheduledTask`` whose payload is the
        subscription payload overlaid with ``event_data``; the subscription
        template itself is never mutated, so data from one event does not
        leak into the next.

        Returns:
            The task ids that were queued.
        """
        triggered_tasks = []
        now = datetime.now()
        for template in self.event_triggers.get(event_type, []):
            heapq.heappush(self.scheduled_tasks, ScheduledTask(
                task_name=template.task_name,
                payload={**template.payload, **event_data},
                trigger_time=now,
                priority=template.priority,
                task_id=template.task_id,
                max_retries=template.max_retries,
                tags=list(template.tags),
            ))
            triggered_tasks.append(template.task_id)
        return triggered_tasks

    def get_next_task(self) -> ScheduledTask:
        """Pop and return the next due task, or ``None`` if none is due yet."""
        if self.scheduled_tasks and self.scheduled_tasks[0].trigger_time <= datetime.now():
            return heapq.heappop(self.scheduled_tasks)
        return None

    def cancel_task(self, task_id: str) -> bool:
        """Drop queued runs and event subscriptions with ``task_id``.

        Returns:
            True if anything was removed, False if the id was unknown.
        """
        remaining = [t for t in self.scheduled_tasks if t.task_id != task_id]
        removed = len(remaining) != len(self.scheduled_tasks)
        if removed:
            # In-place so code holding a reference to the heap list
            # (e.g. the retry controller) keeps seeing the live queue.
            self.scheduled_tasks[:] = remaining
            heapq.heapify(self.scheduled_tasks)
        for event_type, subscriptions in self.event_triggers.items():
            kept = [t for t in subscriptions if t.task_id != task_id]
            if len(kept) != len(subscriptions):
                self.event_triggers[event_type] = kept
                removed = True
        return removed

    def _parse_cron_expression(self, cron_expression: str) -> datetime:
        """Return the next run time for ``cron_expression``.

        Placeholder: only checks that there are five whitespace-separated
        fields and then returns "one minute from now" regardless of their
        content. In production use a real parser (e.g. croniter).

        Raises:
            ValueError: If the expression does not have five fields.
        """
        parts = cron_expression.split()
        if len(parts) != 5:
            raise ValueError("Invalid cron expression")
        return datetime.now() + timedelta(minutes=1)


# Global scheduler instance
task_scheduler = TaskScheduler()
3. Task Executor (executor/task_executor.py)¶
import asyncio
import logging
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
class TaskExecutor:
    """Run scheduled tasks by resolving their handlers from a task registry.

    Coroutine-function handlers are awaited on the running event loop; any
    other callable runs in this executor's ``ThreadPoolExecutor``. Each run is
    bounded by the task definition's ``timeout``.
    """

    def __init__(self, max_workers: int = 10, registry: Any = None):
        """Create an executor.

        Args:
            max_workers: Thread-pool size for sync handlers, and the
                concurrency limit used by ``execute_batch``.
            registry: Object exposing ``get_task(name)``. ``None`` (the
                default) means the module-level ``task_registry``, looked up
                at execution time.
        """
        self.max_workers = max_workers
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        # task_id -> asyncio.Task currently running execute_task (cancel target).
        self.running_tasks: Dict[str, asyncio.Task] = {}
        # task_id -> {"task_name", "started_at"}. asyncio.Task objects carry
        # neither attribute, so get_running_tasks reads them from here.
        self._task_info: Dict[str, Dict[str, Any]] = {}
        self.registry = registry
        self.logger = logging.getLogger(__name__)

    async def execute_task(self, scheduled_task) -> Dict[str, Any]:
        """Run one scheduled task and return its result record.

        Args:
            scheduled_task: Object with ``task_id``, ``task_name`` and
                ``payload`` attributes (e.g. ``ScheduledTask``).

        Returns:
            A dict with ``task_id``, ``task_name``, ``payload``, ``status``
            ("success" or "failed") and ``completed_at``. Successful runs add
            ``result`` and ``execution_time`` (seconds); failed runs add
            ``error``. ``task_name`` and ``payload`` are included because the
            retry controller and the monitor read them from this record.

        Unknown tasks, handler exceptions and timeouts are reported in the
        returned dict, not raised. A ``cancel_task`` call propagates as
        ``asyncio.CancelledError``.
        """
        task_id = scheduled_task.task_id
        task_name = scheduled_task.task_name
        payload = scheduled_task.payload
        timeout = None
        try:
            registry = self.registry if self.registry is not None else task_registry
            task_def = registry.get_task(task_name)
            if not task_def:
                raise ValueError(f"Task {task_name} not found in registry")

            start_time = datetime.now()
            self.running_tasks[task_id] = asyncio.current_task()
            self._task_info[task_id] = {"task_name": task_name, "started_at": start_time}

            # A missing or zero timeout means "no limit".
            timeout = getattr(task_def, "timeout", None) or None
            if asyncio.iscoroutinefunction(task_def.handler):
                pending = task_def.handler(payload)
            else:
                loop = asyncio.get_running_loop()
                pending = loop.run_in_executor(self.thread_pool, task_def.handler, payload)
            # NOTE: on timeout a sync handler's thread keeps running to
            # completion; only the await is abandoned.
            result = await asyncio.wait_for(pending, timeout=timeout)

            execution_time = (datetime.now() - start_time).total_seconds()
            return {
                "task_id": task_id,
                "task_name": task_name,
                "payload": payload,
                "status": "success",
                "result": result,
                "execution_time": execution_time,
                "completed_at": datetime.now(),
            }
        except asyncio.TimeoutError as e:
            if timeout is not None:
                error = f"Task {task_name} timed out after {timeout}s"
            else:
                error = str(e) or type(e).__name__
            return self._failure_record(task_id, task_name, payload, error)
        except Exception as e:
            return self._failure_record(task_id, task_name, payload, str(e))
        finally:
            self.running_tasks.pop(task_id, None)
            self._task_info.pop(task_id, None)

    def _failure_record(self, task_id, task_name, payload, error: str) -> Dict[str, Any]:
        """Log a failed run and build its result record."""
        self.logger.error("Task %s failed: %s", task_id, error)
        return {
            "task_id": task_id,
            "task_name": task_name,
            "payload": payload,
            "status": "failed",
            "error": error,
            "completed_at": datetime.now(),
        }

    async def execute_batch(self, tasks: list) -> list:
        """Run ``tasks`` concurrently, at most ``max_workers`` at a time.

        Returns the result records in the order of ``tasks``. If one member
        is cancelled, its ``CancelledError`` propagates out of this call.
        """
        semaphore = asyncio.Semaphore(self.max_workers)

        async def execute_with_semaphore(task):
            async with semaphore:
                return await self.execute_task(task)

        return await asyncio.gather(*[execute_with_semaphore(task) for task in tasks])

    def cancel_task(self, task_id: str) -> bool:
        """Request cancellation of a running task; True if it was running."""
        task = self.running_tasks.get(task_id)
        if task is None:
            return False
        task.cancel()
        return True

    def get_running_tasks(self) -> Dict[str, Any]:
        """Return ``{task_id: {"task_name", "started_at"}}`` for running tasks."""
        snapshot = {}
        for task_id in self.running_tasks:
            info = self._task_info.get(task_id, {})
            snapshot[task_id] = {
                "task_name": info.get("task_name"),
                "started_at": info.get("started_at"),
            }
        return snapshot


# Global executor instance
task_executor = TaskExecutor()
4. Retry Controller (controller/retry_controller.py)¶
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any
class RetryController:
    """Decide whether failed task runs are retried, and schedule the retries.

    Failures are tracked per *root* task id. A retry of ``X`` is scheduled as
    ``X_retry_<n>``, and a failure of ``X_retry_<n>`` counts against ``X``, so
    ``max_retries`` bounds the whole retry chain. (Previously each retry id
    started again from zero, so a persistently failing task retried forever.)
    """

    # Policy for task names without an explicit set_retry_policy call.
    _DEFAULT_POLICY = {"max_retries": 3, "backoff_factor": 2.0, "initial_delay": 60}

    def __init__(self):
        self.retry_policies: Dict[str, Dict] = {}
        # root task id -> {"retry_count", "last_failure", "error"[, "status"]}
        self.failed_tasks: Dict[str, Dict] = {}

    def set_retry_policy(self, task_name: str, max_retries: int = 3,
                         backoff_factor: float = 2.0,
                         initial_delay: int = 60) -> None:
        """Set the retry policy for ``task_name``.

        The delay before retry ``n`` (1-based) is
        ``initial_delay * backoff_factor ** (n - 1)`` seconds.
        """
        self.retry_policies[task_name] = {
            "max_retries": max_retries,
            "backoff_factor": backoff_factor,
            "initial_delay": initial_delay,
        }

    @staticmethod
    def _split_retry_id(task_id: str):
        """Return ``(root_id, attempt)`` for ids shaped ``<root>_retry_<n>``.

        Any other id is its own root with attempt 0.
        """
        root, sep, attempt = task_id.rpartition("_retry_")
        if sep and attempt.isdecimal():
            return root, int(attempt)
        return task_id, 0

    async def handle_failed_task(self, task_result: Dict[str, Any]) -> bool:
        """Schedule a retry for a failed run if its retry budget allows.

        Args:
            task_result: Failed result record; reads ``task_id`` (required)
                and, if present, ``task_name``, ``payload``, ``error`` and
                ``retry_count``.

        Returns:
            True if a retry was scheduled, False if the chain is now
            permanently failed.
        """
        task_id = task_result["task_id"]
        task_name = task_result.get("task_name", "unknown")
        error = task_result.get("error", "Unknown error")
        root_id, attempt = self._split_retry_id(task_id)
        policy = self.retry_policies.get(task_name, self._DEFAULT_POLICY)

        # Use the highest attempt count any source reports: tracked state,
        # the id suffix, or the record itself.
        current_retries = max(
            self.failed_tasks.get(root_id, {}).get("retry_count", 0),
            attempt,
            task_result.get("retry_count") or 0,
        )

        if current_retries >= policy["max_retries"]:
            self.failed_tasks[root_id] = {
                "retry_count": current_retries,
                "status": "permanently_failed",
                "last_failure": datetime.now(),
                "error": error,
            }
            return False

        import heapq  # not imported at the top of this module

        next_attempt = current_retries + 1
        delay = policy["initial_delay"] * (policy["backoff_factor"] ** current_retries)
        # NOTE(review): ScheduledTask and task_scheduler live in the scheduler
        # module and are not imported in this snippet -- confirm the real
        # module imports them.
        retry_task = ScheduledTask(
            task_name=task_name,
            payload=task_result.get("payload", {}),
            trigger_time=datetime.now() + timedelta(seconds=delay),
            task_id=f"{root_id}_retry_{next_attempt}",
            retry_count=next_attempt,
            max_retries=policy["max_retries"],
        )
        heapq.heappush(task_scheduler.scheduled_tasks, retry_task)

        self.failed_tasks[root_id] = {
            "retry_count": next_attempt,
            "last_failure": datetime.now(),
            "error": error,
        }
        return True

    def get_failed_tasks(self) -> Dict[str, Dict]:
        """Return a shallow copy of the per-root failure tracking."""
        return self.failed_tasks.copy()

    def clear_failed_task(self, task_id: str) -> bool:
        """Stop tracking ``task_id`` (a root id or any of its retry ids)."""
        root_id, _ = self._split_retry_id(task_id)
        if root_id in self.failed_tasks:
            del self.failed_tasks[root_id]
            return True
        return False


# Global retry controller instance
retry_controller = RetryController()
5. Task Monitor (monitor/task_monitor.py)¶
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Any
from collections import defaultdict
class TaskMonitor:
    """Keep bounded in-memory task history, timing samples and alerts."""

    def __init__(self, max_history_size: int = 10000,
                 max_metric_samples: int = 1000,
                 long_execution_threshold: float = 300):
        """Create a monitor.

        Args:
            max_history_size: Most recent result records kept.
            max_metric_samples: Most recent ``execution_time`` samples kept
                per task name (previously unbounded).
            long_execution_threshold: ``execution_time`` above which a
                ``long_execution`` alert is raised (300 = 5 minutes when
                times are in seconds, as TaskExecutor reports them).
        """
        self.task_history: List[Dict] = []
        self.performance_metrics: Dict[str, List] = defaultdict(list)
        self.alerts: List[Dict] = []
        self.max_history_size = max_history_size
        self.max_metric_samples = max_metric_samples
        self.long_execution_threshold = long_execution_threshold

    async def track_task_execution(self, task_result: Dict[str, Any]) -> None:
        """Record a task result: timestamp it, store it, sample its timing, alert.

        ``task_result`` is stored as-is (not copied) and gains a
        ``timestamp`` key if it has none.
        """
        task_result.setdefault("timestamp", datetime.now())

        self.task_history.append(task_result)
        overflow = len(self.task_history) - self.max_history_size
        if overflow > 0:
            del self.task_history[:overflow]

        # Skip None so min/max/avg never see a non-number.
        execution_time = task_result.get("execution_time")
        if execution_time is not None:
            samples = self.performance_metrics[task_result.get("task_name", "unknown")]
            samples.append(execution_time)
            excess = len(samples) - self.max_metric_samples
            if excess > 0:
                del samples[:excess]

        await self._check_alerts(task_result)

    async def _check_alerts(self, task_result: Dict[str, Any]) -> None:
        """Append alerts for slow runs and for failed runs."""
        task_id = task_result.get("task_id")
        task_name = task_result.get("task_name", "unknown")

        execution_time = task_result.get("execution_time")
        if execution_time is not None and execution_time > self.long_execution_threshold:
            self.alerts.append({
                "type": "long_execution",
                "task_id": task_id,
                "task_name": task_name,
                "execution_time": execution_time,
                "timestamp": datetime.now(),
            })

        if task_result.get("status") == "failed":
            self.alerts.append({
                "type": "task_failure",
                "task_id": task_id,
                "task_name": task_name,
                "error": task_result.get("error", "Unknown error"),
                "timestamp": datetime.now(),
            })

    def get_task_history(self, task_name: str = None,
                         status: str = None,
                         limit: int = 100) -> List[Dict]:
        """Return the newest ``limit`` history records matching the filters.

        A falsy ``task_name``/``status`` disables that filter. ``limit <= 0``
        returns an empty list (``[-0:]`` used to return everything).
        """
        if limit <= 0:
            return []
        matching = [
            record for record in self.task_history
            if (not task_name or record.get("task_name") == task_name)
            and (not status or record.get("status") == status)
        ]
        return matching[-limit:]

    @staticmethod
    def _summarize(samples: List[float]) -> Dict[str, Any]:
        """Count/avg/min/max of a non-empty list of execution times."""
        return {
            "count": len(samples),
            "avg_execution_time": sum(samples) / len(samples),
            "min_execution_time": min(samples),
            "max_execution_time": max(samples),
        }

    def get_performance_metrics(self, task_name: str = None) -> Dict[str, Any]:
        """Return timing stats for ``task_name``, or ``{name: stats}`` for all.

        Returns ``{}`` when there are no samples.
        """
        if task_name:
            samples = self.performance_metrics.get(task_name, [])
            return self._summarize(samples) if samples else {}
        return {
            name: self._summarize(samples)
            for name, samples in self.performance_metrics.items()
            if samples
        }

    def get_alerts(self, alert_type: str = None,
                   since: datetime = None) -> List[Dict]:
        """Return a new list of alerts, optionally filtered by type and time."""
        return [
            alert for alert in self.alerts
            if (not alert_type or alert["type"] == alert_type)
            and (not since or alert["timestamp"] >= since)
        ]

    def clear_alerts(self, alert_type: str = None) -> int:
        """Remove alerts (all, or only ``alert_type``); return how many."""
        if alert_type:
            original_count = len(self.alerts)
            self.alerts = [a for a in self.alerts if a["type"] != alert_type]
            return original_count - len(self.alerts)
        cleared_count = len(self.alerts)
        self.alerts.clear()
        return cleared_count


# Global monitor instance
task_monitor = TaskMonitor()
API Design¶
Scheduler API (api/scheduler_api.py)¶
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
from datetime import datetime
# All scheduler endpoints are mounted under /api/v1/scheduler.
router = APIRouter(prefix="/api/v1/scheduler", tags=["Task Scheduler"])


# Request body for POST /task/submit. trigger_time=None is passed through to
# task_scheduler.schedule_task, which treats it as "now".
# NOTE(review): the {} / [] defaults rely on pydantic copying mutable field
# defaults per instance -- confirm for the pinned pydantic version.
class TaskRequest(BaseModel):
    task_name: str
    payload: Dict[str, Any] = {}
    trigger_time: Optional[datetime] = None
    priority: int = 5
    tags: List[str] = []


# Request body for POST /task/cron. cron_expression must have five
# whitespace-separated fields, or the scheduler raises ValueError (-> 400).
class CronTaskRequest(BaseModel):
    task_name: str
    cron_expression: str
    payload: Dict[str, Any] = {}
    priority: int = 5


# Request body for POST /task/event-trigger: subscribes task_name to event_type.
class EventTriggerRequest(BaseModel):
    task_name: str
    event_type: str
    payload: Dict[str, Any] = {}
    priority: int = 5
@router.post("/task/submit")
async def submit_task(request: TaskRequest):
    """Queue a task to run now or at ``request.trigger_time``.

    Any scheduler error is reported as HTTP 400 with the error text.
    """
    try:
        new_task_id = await task_scheduler.schedule_task(
            request.task_name,
            request.payload,
            trigger_time=request.trigger_time,
            priority=request.priority,
            tags=request.tags,
        )
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"task_id": new_task_id, "status": "scheduled"}
@router.post("/task/cron")
async def submit_cron_task(request: CronTaskRequest):
    """Queue a task at the run time derived from ``request.cron_expression``.

    Any scheduler error (including a malformed expression) is reported as
    HTTP 400 with the error text.
    """
    try:
        new_task_id = await task_scheduler.schedule_cron_task(
            request.task_name,
            cron_expression=request.cron_expression,
            payload=request.payload,
            priority=request.priority,
        )
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"task_id": new_task_id, "status": "scheduled"}
@router.post("/task/event-trigger")
async def submit_event_trigger(request: EventTriggerRequest):
    """Subscribe ``request.task_name`` to ``request.event_type``.

    Any scheduler error is reported as HTTP 400 with the error text.
    """
    try:
        subscription_id = await task_scheduler.schedule_event_trigger(
            request.task_name,
            event_type=request.event_type,
            payload=request.payload,
            priority=request.priority,
        )
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"task_id": subscription_id, "status": "registered"}
@router.post("/event/trigger/{event_type}")
async def trigger_event(event_type: str, event_data: Dict[str, Any]):
    """Fire ``event_type``, passing the JSON body as event data.

    Responds with the ids of the tasks queued and their count; scheduler
    errors become HTTP 400.
    """
    try:
        fired_ids = await task_scheduler.trigger_event(event_type, event_data)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"triggered_tasks": fired_ids, "count": len(fired_ids)}
@router.get("/task/status/{task_id}")
async def get_task_status(task_id: str):
    """Report the status of ``task_id``.

    Checked in order:
      1. currently executing -> ``{"task_id", "status": "running"}``;
      2. still waiting in the scheduler queue -> ``{"task_id",
         "status": "scheduled", "trigger_time"}``. Previously a freshly
         submitted task returned 404 until it had run;
      3. the most recent matching record among the last 1000 history
         entries.

    Raises:
        HTTPException: 404 if none of the above knows the id.
    """
    if task_id in task_executor.running_tasks:
        return {"task_id": task_id, "status": "running"}

    for queued in task_scheduler.scheduled_tasks:
        if queued.task_id == task_id:
            return {"task_id": task_id, "status": "scheduled",
                    "trigger_time": queued.trigger_time}

    # Newest first: an id that ran more than once (e.g. repeated event
    # firings) reports its latest run rather than its first.
    for record in reversed(task_monitor.get_task_history(limit=1000)):
        if record.get("task_id") == task_id:
            return record

    raise HTTPException(status_code=404, detail="Task not found")
@router.delete("/task/cancel/{task_id}")
async def cancel_task(task_id: str):
    """Cancel a scheduled and/or running task.

    Both the scheduler queue and the executor are asked to cancel.

    Raises:
        HTTPException: 400 if either cancel call errors; 404 if neither knew
            the id. (The 404 used to be raised inside the ``try`` and was
            re-wrapped by ``except Exception`` into a 400.)
    """
    try:
        scheduler_cancelled = task_scheduler.cancel_task(task_id)
        executor_cancelled = task_executor.cancel_task(task_id)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

    if not (scheduler_cancelled or executor_cancelled):
        raise HTTPException(status_code=404, detail="Task not found")
    return {"task_id": task_id, "status": "cancelled"}
@router.get("/tasks/running")
async def get_running_tasks():
    """Return the executor's ``{task_id: info}`` map of running tasks."""
    running = task_executor.get_running_tasks()
    return running
@router.get("/tasks/scheduled")
async def get_scheduled_tasks():
    """List queued tasks in the order the scheduler will dispatch them.

    The underlying list is a binary heap, whose array order is not dispatch
    order, so it is sorted with ``ScheduledTask``'s own ordering first.
    """
    return [
        {
            "task_id": t.task_id,
            "task_name": t.task_name,
            "trigger_time": t.trigger_time,
            "priority": t.priority,
        }
        for t in sorted(task_scheduler.scheduled_tasks)
    ]
@router.get("/tasks/failed")
async def get_failed_tasks():
    """Return the retry controller's failure tracking, keyed by task id."""
    failures = retry_controller.get_failed_tasks()
    return failures
@router.get("/tasks/history")
async def get_task_history(task_name: str = None, status: str = None, limit: int = 100):
    """Return up to ``limit`` recent history records, optionally filtered."""
    return task_monitor.get_task_history(task_name=task_name, status=status, limit=limit)
@router.get("/metrics/performance")
async def get_performance_metrics(task_name: str = None):
    """Return timing stats for one task name, or for every task if omitted."""
    return task_monitor.get_performance_metrics(task_name=task_name)
@router.get("/alerts")
async def get_alerts(alert_type: str = None, since: str = None):
    """Return alerts, optionally filtered by type and by an ISO-8601 ``since``.

    Alert timestamps are naive local times (``datetime.now()``), so a
    timezone-aware ``since`` is converted to naive local time before
    comparison. Previously that comparison raised TypeError (HTTP 500).

    Raises:
        HTTPException: 400 if ``since`` is not a valid ISO-8601 timestamp
            (previously an unhandled ValueError -> HTTP 500).
    """
    since_dt = None
    if since:
        try:
            since_dt = datetime.fromisoformat(since)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid 'since' timestamp {since!r}; expected ISO 8601",
            ) from None
        if since_dt.tzinfo is not None:
            since_dt = since_dt.astimezone().replace(tzinfo=None)
    return task_monitor.get_alerts(alert_type, since_dt)
@router.delete("/alerts/clear")
async def clear_alerts(alert_type: str = None):
    """Clear all alerts, or only those of ``alert_type``; report how many."""
    return {"cleared_count": task_monitor.clear_alerts(alert_type)}
Frontend Integration¶
Global Scheduler Dashboard (frontend/src/components/GlobalSchedulerView.tsx)¶
import React, { useState, useEffect } from 'react';
import {
Box,
Grid,
Card,
CardContent,
Typography,
Button,
TextField,
Select,
MenuItem,
FormControl,
InputLabel,
Table,
TableBody,
TableCell,
TableContainer,
TableHead,
TableRow,
Paper,
Chip,
Alert,
Dialog,
DialogTitle,
DialogContent,
DialogActions,
IconButton,
Tooltip
} from '@mui/material';
import {
PlayArrow,
Stop,
Schedule,
History,
TrendingUp,
Warning,
Refresh,
Add,
Delete
} from '@mui/icons-material';
import { DataGrid, GridColDef } from '@mui/x-data-grid';
/**
 * A task row as returned by the scheduler endpoints. Only `task_id` and
 * `status` are common to every source: running entries carry
 * `task_name`/`started_at`, failed-task entries carry
 * `retry_count`/`error` but no `task_name` or `priority`, so those are optional.
 */
interface Task {
  task_id: string;
  task_name?: string;
  status: string;
  trigger_time?: string;
  priority?: number;
  execution_time?: number;
  error?: string;
  timestamp?: string;
  started_at?: string;
  retry_count?: number;
}
/** Body of POST /api/v1/scheduler/task/submit (mirrors the backend TaskRequest model). */
interface TaskRequest {
  /** Registered task name to run. */
  task_name: string;
  /** Arbitrary JSON object handed to the task handler. */
  payload: Record<string, any>;
  /** Optional run time; omitted means "now". */
  trigger_time?: string;
  /** Scheduling priority (1 = high ... 9 = very low in the dashboard). */
  priority: number;
  /** Free-form labels. */
  tags: string[];
}
/** A Task as rendered by the DataGrids, which require a unique `id` per row. */
type TaskRow = Task & { id: string };

type ChipColor = 'default' | 'success' | 'error' | 'warning' | 'info';

/** Priority choices shared by both dialogs (lower number = more urgent). */
const PRIORITY_OPTIONS: Array<{ value: number; label: string }> = [
  { value: 1, label: 'High (1)' },
  { value: 3, label: 'Medium (3)' },
  { value: 5, label: 'Normal (5)' },
  { value: 7, label: 'Low (7)' },
  { value: 9, label: 'Very Low (9)' },
];

/** Fresh (unshared) blank submit form. */
const emptyTaskRequest = (): TaskRequest => ({ task_name: '', payload: {}, priority: 5, tags: [] });

/** Fresh blank cron form. */
const emptyCronRequest = () => ({ task_name: '', cron_expression: '', priority: 5 });

/** GET `url` as JSON; rejects on non-2xx so error bodies never land in table state. */
const fetchJson = async (url: string): Promise<any> => {
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`GET ${url} failed with HTTP ${response.status}`);
  }
  return response.json();
};

/**
 * Attach a unique DataGrid `id` to each row. The index is part of the id
 * because one task_id can occur in several rows (e.g. repeated history
 * records for an event-triggered task).
 */
const withRowIds = (rows: unknown, prefix: string): TaskRow[] =>
  (Array.isArray(rows) ? rows : []).map((row: any, index: number) => ({
    ...row,
    id: `${prefix}-${row?.task_id ?? 'unknown'}-${index}`,
  }));

/**
 * The running and failed endpoints return `{ [task_id]: info }` maps whose
 * values omit the task_id; turn them back into rows that carry it.
 */
const idMapToRows = (byId: unknown, prefix: string, defaultStatus: string): TaskRow[] =>
  withRowIds(
    Object.entries((byId ?? {}) as Record<string, any>).map(([task_id, info]) => ({
      ...info,
      task_id,
      status: info?.status ?? defaultStatus,
      timestamp: info?.timestamp ?? info?.started_at ?? info?.last_failure,
    })),
    prefix
  );

/** Chip color for a task status string. */
const statusColor = (status: string): ChipColor => {
  switch (status) {
    case 'success':
      return 'success';
    case 'failed':
    case 'permanently_failed':
      return 'error';
    case 'running':
    case 'retrying':
      return 'warning';
    case 'scheduled':
      return 'info';
    default:
      return 'default';
  }
};

/** Human-readable text for an alert record. */
const describeAlert = (alert: any): string => {
  if (alert.type === 'long_execution') {
    return `${alert.type}: ${alert.task_name} - ran for ${Number(alert.execution_time).toFixed(1)}s`;
  }
  return `${alert.type}: ${alert.task_name} - ${alert.error || 'Task failed'}`;
};

const GlobalSchedulerView: React.FC = () => {
  const [runningTasks, setRunningTasks] = useState<TaskRow[]>([]);
  const [scheduledTasks, setScheduledTasks] = useState<TaskRow[]>([]);
  const [failedTasks, setFailedTasks] = useState<TaskRow[]>([]);
  const [taskHistory, setTaskHistory] = useState<TaskRow[]>([]);
  const [performanceMetrics, setPerformanceMetrics] = useState<any>({});
  const [alerts, setAlerts] = useState<any[]>([]);
  const [loading, setLoading] = useState(false);
  const [loadError, setLoadError] = useState<string | null>(null);
  const [submitDialogOpen, setSubmitDialogOpen] = useState(false);
  const [cronDialogOpen, setCronDialogOpen] = useState(false);
  const [newTask, setNewTask] = useState<TaskRequest>(emptyTaskRequest);
  // Raw payload editor text, parsed only on submit. Re-serialising state on
  // every keystroke (the old approach) discarded any not-yet-valid JSON, so
  // the field was effectively uneditable.
  const [payloadText, setPayloadText] = useState('{}');
  const [submitError, setSubmitError] = useState<string | null>(null);
  const [cronTask, setCronTask] = useState(emptyCronRequest);
  const [cronError, setCronError] = useState<string | null>(null);

  // Load every dashboard panel in parallel. Memoised so the polling effect
  // below is set up once.
  const loadData = React.useCallback(async () => {
    setLoading(true);
    try {
      const [running, scheduled, failed, history, metrics, alertsData] = await Promise.all([
        fetchJson('/api/v1/scheduler/tasks/running'),
        fetchJson('/api/v1/scheduler/tasks/scheduled'),
        fetchJson('/api/v1/scheduler/tasks/failed'),
        fetchJson('/api/v1/scheduler/tasks/history'),
        fetchJson('/api/v1/scheduler/metrics/performance'),
        fetchJson('/api/v1/scheduler/alerts'),
      ]);
      setRunningTasks(idMapToRows(running, 'running', 'running'));
      setScheduledTasks(
        withRowIds(
          (Array.isArray(scheduled) ? scheduled : []).map((t: any) => ({ ...t, status: t.status ?? 'scheduled' })),
          'scheduled'
        )
      );
      // Entries without a status are still being retried.
      setFailedTasks(idMapToRows(failed, 'failed', 'retrying'));
      setTaskHistory(withRowIds(history, 'history'));
      setPerformanceMetrics(metrics ?? {});
      setAlerts(Array.isArray(alertsData) ? alertsData : []);
      setLoadError(null);
    } catch (error) {
      console.error('Error loading data:', error);
      setLoadError('Failed to load scheduler data; showing the last successful snapshot.');
    } finally {
      setLoading(false);
    }
  }, []);

  // Cancel a scheduled or running task, then refresh.
  const cancelTask = async (taskId: string) => {
    try {
      const response = await fetch(`/api/v1/scheduler/task/cancel/${encodeURIComponent(taskId)}`, {
        method: 'DELETE',
      });
      if (!response.ok) {
        console.error(`Cancelling ${taskId} failed with HTTP ${response.status}`);
      }
    } catch (error) {
      console.error('Error cancelling task:', error);
    } finally {
      loadData();
    }
  };

  const closeSubmitDialog = () => {
    setSubmitDialogOpen(false);
    setSubmitError(null);
  };

  const closeCronDialog = () => {
    setCronDialogOpen(false);
    setCronError(null);
  };

  // Validate the payload text, then submit a one-off task.
  const submitTask = async (taskData: TaskRequest) => {
    let payload: unknown;
    try {
      payload = JSON.parse(payloadText.trim() || '{}');
    } catch (error) {
      setSubmitError('Payload is not valid JSON.');
      return;
    }
    if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
      setSubmitError('Payload must be a JSON object.');
      return;
    }
    try {
      const response = await fetch('/api/v1/scheduler/task/submit', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          ...taskData,
          payload,
          // An empty datetime-local value means "run now"; omit it rather than send "".
          trigger_time: taskData.trigger_time || undefined,
        }),
      });
      if (!response.ok) {
        setSubmitError(`Submission failed (HTTP ${response.status}).`);
        return;
      }
      closeSubmitDialog();
      setNewTask(emptyTaskRequest());
      setPayloadText('{}');
      loadData();
    } catch (error) {
      console.error('Error submitting task:', error);
      setSubmitError('Network error while submitting the task.');
    }
  };

  // Submit a cron-scheduled task.
  const submitCronTask = async () => {
    try {
      const response = await fetch('/api/v1/scheduler/task/cron', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ ...cronTask, payload: {} }),
      });
      if (!response.ok) {
        setCronError(`Submission failed (HTTP ${response.status}).`);
        return;
      }
      closeCronDialog();
      setCronTask(emptyCronRequest());
      loadData();
    } catch (error) {
      console.error('Error submitting cron task:', error);
      setCronError('Network error while submitting the cron task.');
    }
  };

  // Columns shared by all task tables.
  const taskColumns: GridColDef[] = [
    { field: 'task_id', headerName: 'Task ID', width: 200 },
    { field: 'task_name', headerName: 'Task Name', width: 150 },
    {
      field: 'status',
      headerName: 'Status',
      width: 140,
      renderCell: (params) => <Chip label={params.value} color={statusColor(String(params.value))} size="small" />,
    },
    { field: 'priority', headerName: 'Priority', width: 100 },
    {
      field: 'execution_time',
      headerName: 'Execution Time (s)',
      width: 150,
      // typeof check so a genuine 0-second run still renders instead of '-'.
      renderCell: (params) => (typeof params.value === 'number' ? `${params.value.toFixed(2)}s` : '-'),
    },
    {
      field: 'timestamp',
      headerName: 'Timestamp',
      width: 200,
      renderCell: (params) => (params.value ? new Date(params.value).toLocaleString() : '-'),
    },
    {
      field: 'actions',
      headerName: 'Actions',
      width: 120,
      sortable: false,
      renderCell: (params) => (
        <Box>
          {(params.row.status === 'running' || params.row.status === 'scheduled') && (
            <Tooltip title="Cancel Task">
              <IconButton size="small" onClick={() => cancelTask(params.row.task_id)} color="error">
                <Stop />
              </IconButton>
            </Tooltip>
          )}
        </Box>
      ),
    },
  ];

  useEffect(() => {
    loadData();
    const interval = setInterval(loadData, 5000); // Refresh every 5 seconds
    return () => clearInterval(interval);
  }, [loadData]);

  return (
    <Box sx={{ p: 3 }}>
      <Box sx={{ display: 'flex', justifyContent: 'space-between', mb: 3 }}>
        <Typography variant="h4">Global Task Scheduler</Typography>
        <Box>
          <Button variant="contained" startIcon={<Add />} onClick={() => setSubmitDialogOpen(true)} sx={{ mr: 1 }}>
            Submit Task
          </Button>
          <Button variant="outlined" startIcon={<Schedule />} onClick={() => setCronDialogOpen(true)} sx={{ mr: 1 }}>
            Cron Task
          </Button>
          <Button variant="outlined" startIcon={<Refresh />} onClick={loadData} disabled={loading}>
            Refresh
          </Button>
        </Box>
      </Box>

      {loadError && (
        <Alert severity="error" sx={{ mb: 3 }}>
          {loadError}
        </Alert>
      )}

      {/* Alerts */}
      {alerts.length > 0 && (
        <Box sx={{ mb: 3 }}>
          {alerts.map((alert, index) => (
            <Alert key={index} severity="warning" sx={{ mb: 1 }}>
              {describeAlert(alert)}
            </Alert>
          ))}
        </Box>
      )}

      {/* Performance Metrics */}
      <Grid container spacing={3} sx={{ mb: 3 }}>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Performance Metrics
              </Typography>
              <Box sx={{ display: 'flex', justifyContent: 'space-between' }}>
                <Typography>Total Tasks: {taskHistory.length}</Typography>
                <Typography>Running: {runningTasks.length}</Typography>
                <Typography>Scheduled: {scheduledTasks.length}</Typography>
                <Typography>Failed: {failedTasks.length}</Typography>
                <Typography>Task Types: {Object.keys(performanceMetrics ?? {}).length}</Typography>
              </Box>
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                System Status
              </Typography>
              <Box sx={{ display: 'flex', justifyContent: 'space-between' }}>
                <Chip
                  label={loadError ? 'Scheduler Unreachable' : 'Scheduler Active'}
                  color={loadError ? 'error' : 'success'}
                />
                <Chip label={`${runningTasks.length} Running`} color="warning" />
                <Chip label={`${alerts.length} Alerts`} color="error" />
              </Box>
            </CardContent>
          </Card>
        </Grid>
      </Grid>

      {/* Task Tables */}
      <Grid container spacing={3}>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Running Tasks
              </Typography>
              <DataGrid
                rows={runningTasks}
                columns={taskColumns}
                pageSize={5}
                autoHeight
                loading={loading}
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12} md={6}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Scheduled Tasks
              </Typography>
              <DataGrid
                rows={scheduledTasks}
                columns={taskColumns}
                pageSize={5}
                autoHeight
                loading={loading}
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
        <Grid item xs={12}>
          <Card>
            <CardContent>
              <Typography variant="h6" gutterBottom>
                Task History
              </Typography>
              <DataGrid
                rows={taskHistory}
                columns={taskColumns}
                pageSize={10}
                autoHeight
                loading={loading}
                disableSelectionOnClick
              />
            </CardContent>
          </Card>
        </Grid>
      </Grid>

      {/* Submit Task Dialog */}
      <Dialog open={submitDialogOpen} onClose={closeSubmitDialog} maxWidth="md" fullWidth>
        <DialogTitle>Submit New Task</DialogTitle>
        <DialogContent>
          <Grid container spacing={2} sx={{ mt: 1 }}>
            <Grid item xs={12}>
              <TextField
                fullWidth
                label="Task Name"
                value={newTask.task_name}
                onChange={(e) => setNewTask({ ...newTask, task_name: e.target.value })}
              />
            </Grid>
            <Grid item xs={12}>
              <TextField
                fullWidth
                multiline
                rows={4}
                label="Payload (JSON)"
                value={payloadText}
                onChange={(e) => setPayloadText(e.target.value)}
              />
            </Grid>
            <Grid item xs={6}>
              <TextField
                fullWidth
                type="datetime-local"
                label="Trigger Time (Optional)"
                InputLabelProps={{ shrink: true }}
                value={newTask.trigger_time ?? ''}
                onChange={(e) => setNewTask({ ...newTask, trigger_time: e.target.value || undefined })}
              />
            </Grid>
            <Grid item xs={6}>
              <FormControl fullWidth>
                <InputLabel id="submit-priority-label">Priority</InputLabel>
                <Select
                  labelId="submit-priority-label"
                  label="Priority"
                  value={newTask.priority}
                  onChange={(e) => setNewTask({ ...newTask, priority: Number(e.target.value) })}
                >
                  {PRIORITY_OPTIONS.map((option) => (
                    <MenuItem key={option.value} value={option.value}>
                      {option.label}
                    </MenuItem>
                  ))}
                </Select>
              </FormControl>
            </Grid>
            {submitError && (
              <Grid item xs={12}>
                <Alert severity="error">{submitError}</Alert>
              </Grid>
            )}
          </Grid>
        </DialogContent>
        <DialogActions>
          <Button onClick={closeSubmitDialog}>Cancel</Button>
          <Button onClick={() => submitTask(newTask)} variant="contained" disabled={!newTask.task_name.trim()}>
            Submit Task
          </Button>
        </DialogActions>
      </Dialog>

      {/* Cron Task Dialog */}
      <Dialog open={cronDialogOpen} onClose={closeCronDialog} maxWidth="sm" fullWidth>
        <DialogTitle>Submit Cron Task</DialogTitle>
        <DialogContent>
          <Grid container spacing={2} sx={{ mt: 1 }}>
            <Grid item xs={12}>
              <TextField
                fullWidth
                label="Task Name"
                value={cronTask.task_name}
                onChange={(e) => setCronTask({ ...cronTask, task_name: e.target.value })}
              />
            </Grid>
            <Grid item xs={12}>
              <TextField
                fullWidth
                label="Cron Expression"
                placeholder="*/5 * * * *"
                helperText="Five space-separated fields: minute hour day-of-month month day-of-week"
                value={cronTask.cron_expression}
                onChange={(e) => setCronTask({ ...cronTask, cron_expression: e.target.value })}
              />
            </Grid>
            <Grid item xs={12}>
              <FormControl fullWidth>
                <InputLabel id="cron-priority-label">Priority</InputLabel>
                <Select
                  labelId="cron-priority-label"
                  label="Priority"
                  value={cronTask.priority}
                  onChange={(e) => setCronTask({ ...cronTask, priority: Number(e.target.value) })}
                >
                  {PRIORITY_OPTIONS.map((option) => (
                    <MenuItem key={option.value} value={option.value}>
                      {option.label}
                    </MenuItem>
                  ))}
                </Select>
              </FormControl>
            </Grid>
            {cronError && (
              <Grid item xs={12}>
                <Alert severity="error">{cronError}</Alert>
              </Grid>
            )}
          </Grid>
        </DialogContent>
        <DialogActions>
          <Button onClick={closeCronDialog}>Cancel</Button>
          <Button
            onClick={submitCronTask}
            variant="contained"
            disabled={!cronTask.task_name.trim() || !cronTask.cron_expression.trim()}
          >
            Submit Cron Task
          </Button>
        </DialogActions>
      </Dialog>
    </Box>
  );
};

export default GlobalSchedulerView;
Implementation Roadmap¶
Phase 1: Core Infrastructure (Weeks 1-2)¶
- Set up microservice architecture
- Implement task registry and basic scheduler
- Create simple task executor with worker pool
- Basic API endpoints for task submission and status
Phase 2: Advanced Features (Weeks 3-4)¶
- Implement cron expression parser
- Add event-driven task triggers
- Build retry mechanism with exponential backoff
- Create task monitoring and metrics collection
Phase 3: Reliability & Performance (Weeks 5-6)¶
- Implement priority queues and task prioritization
- Add distributed task execution capabilities
- Build comprehensive error handling and alerting
- Performance optimization and load balancing
Phase 4: Frontend & Integration (Weeks 7-8)¶
- Develop comprehensive dashboard UI
- Integrate with existing trading system modules
- Add real-time monitoring and notifications
- Performance testing and optimization
System Integration¶
Integration Points¶
- Backtesting Engine: Schedule backtesting jobs with different parameters
- Risk Management: Trigger risk scans and portfolio rebalancing
- Data Pipeline: Schedule data synchronization and processing tasks
- Strategy Deployment: Orchestrate strategy deployment workflows
- Account Management: Schedule settlement and reconciliation tasks
Event Integration¶
# Example: Integration with trading system events
async def register_trading_events():
    """Subscribe the standard trading-system tasks to their trigger events.

    ``schedule_event_trigger`` is a coroutine, so each call must be awaited.
    The previous version called it without ``await``, which only created
    coroutine objects that never ran, so no subscription was registered.

    Returns:
        The subscription task ids, in registration order.
    """
    subscriptions = [
        # Market data events
        ("data_sync", "market_data_update", {"source": "real_time_feed"}, 1),
        # Risk events
        ("risk_scan", "position_change", {"threshold": 1000000}, 2),
        # Performance events
        ("performance_report", "daily_close", {"report_type": "daily_summary"}, 3),
    ]
    subscription_ids = []
    for task_name, event_type, payload, priority in subscriptions:
        subscription_ids.append(
            await task_scheduler.schedule_event_trigger(
                task_name, event_type, payload, priority=priority
            )
        )
    return subscription_ids
Business Value¶
Operational Efficiency¶
- Centralized Control: Unified management of all system tasks
- Automated Workflows: Reduce manual intervention in routine operations
- Resource Optimization: Efficient allocation of computing resources
- Error Reduction: Automated retry mechanisms reduce manual error handling
Scalability & Reliability¶
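- Horizontal Scaling: Distributed task execution (planned for Phase 3; the reference implementation runs in a single process)
- Fault Tolerance: Automatic recovery from failures via retries with exponential backoff
- Load Balancing: Intelligent distribution of task load (planned for Phase 3)
- High Availability: Redundant execution paths (requires the planned distributed deployment)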
Monitoring & Compliance¶
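- Audit Trail: Visibility into task execution history (the in-memory monitor keeps the most recent 10,000 records; a complete, durable audit trail requires persistent storage)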
- Performance Monitoring: Real-time metrics and alerting
- Compliance Reporting: Detailed logs for regulatory requirements
- Operational Transparency: Clear view of system operations
Development Productivity¶
- Rapid Deployment: Quick setup of new automated workflows
- Flexible Scheduling: Support for complex timing requirements
- Event-Driven Architecture: Reactive system responses
- Developer-Friendly: Simple API for task integration
Technical Value¶
Architecture Benefits¶
- Microservice Integration: Seamless integration with existing services
- Event-Driven Design: Reactive and scalable architecture
- Priority Management: Intelligent task prioritization
- Resource Management: Efficient use of system resources
Performance Features¶
- Low Latency: Fast task scheduling and execution
- High Throughput: Designed to scale to thousands of concurrent tasks; concurrency in the reference executor is bounded by `max_workers` (default 10)
- Memory Efficiency: Optimized data structures and caching
- Network Optimization: Minimal overhead for distributed execution
Reliability Features¶
- Automatic Recovery: Self-healing from failures
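- Data Consistency: ACID guarantees for critical operations (a target requirement; the reference implementation keeps all state in memory and provides no transactional storage)
- Backup & Recovery: Comprehensive disaster recovery (requires the persistent storage layer planned for later phases)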
- Monitoring & Alerting: Proactive issue detection
This Global Task Scheduling and Batch Processing System provides the foundation for reliable, scalable, and efficient orchestration of all trading system operations, enabling the platform to handle complex workflows with enterprise-grade reliability and performance.