46. Cross-Market Arbitrage Engine
Overview
The Cross-Market Arbitrage Engine monitors prices for the same or related assets across multiple exchanges and markets in real time and executes on the discrepancies it finds. It identifies price differences between markets, estimates profit net of transaction costs and slippage, and places coordinated buy and sell orders to capture low-risk arbitrage profits. It supports single-asset arbitrage, statistical arbitrage, and multi-exchange execution, with risk controls applied before any order is placed.
Core Capabilities
- Multi-Market Monitoring: Real-time price monitoring across multiple exchanges
- Arbitrage Detection: Automatic identification of profitable arbitrage opportunities
- Simultaneous Execution: Coordinated buy/sell orders across markets
- Risk Management: Comprehensive risk controls for arbitrage operations
- Cost Analysis: Transaction cost and slippage estimation
- Performance Tracking: Real-time arbitrage performance monitoring
System Architecture
Microservice: arbitrage-engine
services/arbitrage-engine/
├── src/
│   ├── main.py
│   ├── market/
│   │   ├── market_syncer.py
│   │   ├── exchange_connector.py
│   │   └── data_validator.py
│   ├── calculator/
│   │   ├── spread_calculator.py
│   │   ├── profit_calculator.py
│   │   └── cost_analyzer.py
│   ├── detector/
│   │   ├── arbitrage_detector.py
│   │   ├── opportunity_filter.py
│   │   └── threshold_manager.py
│   ├── executor/
│   │   ├── arbitrage_executor.py
│   │   ├── order_coordinator.py
│   │   └── execution_strategy.py
│   ├── monitor/
│   │   ├── execution_monitor.py
│   │   ├── position_tracker.py
│   │   └── profit_tracker.py
│   ├── risk/
│   │   ├── arbitrage_risk_manager.py
│   │   ├── liquidity_checker.py
│   │   └── exposure_controller.py
│   ├── api/
│   │   └── arbitrage_api.py
│   ├── config.py
│   └── requirements.txt
├── Dockerfile
└── tests/
Core Components
1. Market Syncer
Synchronizes real-time market data across exchanges:
import asyncio
import logging
from datetime import datetime
from typing import Dict, List

logger = logging.getLogger(__name__)


class MarketSyncer:
    def __init__(self, exchange_connectors: Dict):
        self.exchange_connectors = exchange_connectors
        self.market_data = {}
        self.last_update = {}
        self.data_quality = {}

    async def fetch_market_prices(self, symbols: List[str],
                                  exchanges: List[str]) -> Dict:
        """Fetch real-time prices from multiple exchanges"""
        prices = {}
        # One key per exchange-symbol combination
        keys = [(exchange, symbol) for exchange in exchanges for symbol in symbols]
        # Run all requests concurrently; failures are returned instead of raised
        results = await asyncio.gather(
            *(self.fetch_single_price(exchange, symbol) for exchange, symbol in keys),
            return_exceptions=True
        )
        for (exchange, symbol), result in zip(keys, results):
            if isinstance(result, BaseException):
                logger.warning("Error fetching %s from %s: %s", symbol, exchange, result)
                # Use cached data if available
                if (exchange, symbol) in self.market_data:
                    prices[(exchange, symbol)] = self.market_data[(exchange, symbol)]
            else:
                prices[(exchange, symbol)] = result
                self.last_update[(exchange, symbol)] = datetime.now()
        # Update market data cache
        self.market_data.update(prices)
        return prices

    async def fetch_single_price(self, exchange: str, symbol: str) -> Dict:
        """Fetch price data from a single exchange"""
        if exchange not in self.exchange_connectors:
            raise ValueError(f"Exchange {exchange} not configured")
        connector = self.exchange_connectors[exchange]
        # Fetch order book and ticker data
        order_book = await connector.get_order_book(symbol)
        ticker = await connector.get_ticker(symbol)
        # Best bid/ask from the top of the book, falling back to the last trade
        best_bid = order_book['bids'][0]['price'] if order_book['bids'] else ticker['last']
        best_ask = order_book['asks'][0]['price'] if order_book['asks'] else ticker['last']
        # Calculate mid price
        mid_price = (best_bid + best_ask) / 2
        price_data = {
            "exchange": exchange,
            "symbol": symbol,
            "timestamp": datetime.now().isoformat(),
            "mid_price": mid_price,
            "best_bid": best_bid,
            "best_ask": best_ask,
            "bid_size": order_book['bids'][0]['size'] if order_book['bids'] else 0,
            "ask_size": order_book['asks'][0]['size'] if order_book['asks'] else 0,
            "last_price": ticker['last'],
            "volume_24h": ticker.get('volume_24h', 0),
            # Relative bid-ask spread (fraction of mid price)
            "spread": (best_ask - best_bid) / mid_price if mid_price > 0 else 0
        }
        # Validate data quality
        self.validate_price_data(price_data)
        return price_data

    def validate_price_data(self, price_data: Dict):
        """Validate price data quality"""
        # Check for reasonable price values
        if price_data['mid_price'] <= 0:
            raise ValueError(f"Invalid mid price: {price_data['mid_price']}")
        # Check for reasonable spread
        if price_data['spread'] > 0.1:  # 10% spread is suspicious
            logger.warning("Large spread detected: %s", price_data['spread'])
        # Check for stale data
        if 'timestamp' in price_data:
            update_time = datetime.fromisoformat(price_data['timestamp'])
            if (datetime.now() - update_time).total_seconds() > 30:
                logger.warning("Stale data from %s", price_data['exchange'])

    def get_data_quality_score(self, exchange: str, symbol: str) -> float:
        """Calculate data quality score for an exchange-symbol pair"""
        if (exchange, symbol) not in self.last_update:
            return 0.0
        # Check data freshness
        time_since_update = (datetime.now() - self.last_update[(exchange, symbol)]).total_seconds()
        freshness_score = max(0, 1 - time_since_update / 60)  # Decay over 1 minute
        # Check spread quality
        if (exchange, symbol) in self.market_data:
            spread = self.market_data[(exchange, symbol)].get('spread', 1.0)
            spread_score = max(0, 1 - spread * 10)  # Better score for tighter spreads
        else:
            spread_score = 0.0
        # Combined quality score: 70% freshness, 30% spread tightness
        quality_score = (freshness_score * 0.7 + spread_score * 0.3)
        return quality_score
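The concurrent fan-out across exchange-symbol pairs can be sketched with `asyncio.gather`. This is a minimal illustration, not the service's connector layer: `FakeConnector`, its `get_ticker` method, the exchange names, and the prices are hypothetical stand-ins chosen so the example runs without network access.

```python
import asyncio
from typing import Dict, List, Tuple


class FakeConnector:
    """Hypothetical stand-in for an exchange connector (not a real API)."""

    def __init__(self, last_price: float, latency: float = 0.05, fail: bool = False):
        self.last_price = last_price
        self.latency = latency
        self.fail = fail

    async def get_ticker(self, symbol: str) -> Dict:
        await asyncio.sleep(self.latency)  # simulate a network round trip
        if self.fail:
            raise ConnectionError("exchange unavailable")
        return {"symbol": symbol, "last": self.last_price}


async def fetch_all(connectors: Dict[str, FakeConnector],
                    symbols: List[str]) -> Dict[Tuple[str, str], Dict]:
    """Fetch every (exchange, symbol) pair concurrently, skipping failed requests."""
    keys = [(ex, sym) for ex in connectors for sym in symbols]
    results = await asyncio.gather(
        *(connectors[ex].get_ticker(sym) for ex, sym in keys),
        return_exceptions=True,  # one failing exchange must not abort the others
    )
    return {key: res for key, res in zip(keys, results)
            if not isinstance(res, BaseException)}


def run_demo() -> Dict[Tuple[str, str], Dict]:
    connectors = {
        "exchange_a": FakeConnector(100.0),
        "exchange_b": FakeConnector(100.4),
        "exchange_c": FakeConnector(0.0, fail=True),  # always errors
    }
    return asyncio.run(fetch_all(connectors, ["BTC-USD", "ETH-USD"]))


if __name__ == "__main__":
    for key, ticker in sorted(run_demo().items()):
        print(key, ticker["last"])
```

Because the requests overlap, total wall-clock time is close to the slowest single request rather than the sum of all of them, which matters when the latency budget is measured in tens of milliseconds.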
2. Spread Calculator
Calculates price spreads between markets:
from datetime import datetime
from itertools import combinations
from typing import Dict


class SpreadCalculator:
    def __init__(self, min_spread_threshold=0.001, max_spread_threshold=0.1):
        self.min_spread_threshold = min_spread_threshold
        self.max_spread_threshold = max_spread_threshold
        self.spread_history = {}

    def calculate_spreads(self, prices: Dict) -> Dict:
        """Calculate spreads between all exchange pairs for each symbol"""
        spreads = {}
        # Group prices by symbol: {symbol: {exchange: price_data}}
        by_symbol = {}
        for (exchange, symbol), price_data in prices.items():
            by_symbol.setdefault(symbol, {})[exchange] = price_data
        for symbol, symbol_prices in by_symbol.items():
            # Calculate spreads between all exchange pairs
            for ex1, ex2 in combinations(symbol_prices, 2):
                spread_data = self.calculate_pair_spread(
                    symbol_prices[ex1], symbol_prices[ex2]
                )
                if spread_data:
                    spreads[(ex1, ex2, symbol)] = spread_data
        return spreads

    def calculate_pair_spread(self, price1: Dict, price2: Dict) -> Dict:
        """Calculate spread between two exchanges for the same symbol"""
        # Calculate bid-ask spreads (fraction of mid price)
        spread1 = (price1['best_ask'] - price1['best_bid']) / price1['mid_price']
        spread2 = (price2['best_ask'] - price2['best_bid']) / price2['mid_price']
        # Calculate arbitrage spreads (fraction of the buy price)
        # Buy on exchange1, sell on exchange2
        spread_buy1_sell2 = (price2['best_bid'] - price1['best_ask']) / price1['best_ask']
        # Buy on exchange2, sell on exchange1
        spread_buy2_sell1 = (price1['best_bid'] - price2['best_ask']) / price2['best_ask']
        # Determine profitable direction
        if spread_buy1_sell2 > spread_buy2_sell1 and spread_buy1_sell2 > 0:
            profitable_direction = "buy1_sell2"
            profitable_spread = spread_buy1_sell2
        elif spread_buy2_sell1 > 0:
            profitable_direction = "buy2_sell1"
            profitable_spread = spread_buy2_sell1
        else:
            profitable_direction = None
            profitable_spread = 0
        spread_data = {
            "symbol": price1['symbol'],
            "exchange1": price1['exchange'],
            "exchange2": price2['exchange'],
            "timestamp": datetime.now().isoformat(),
            "price1_mid": price1['mid_price'],
            "price2_mid": price2['mid_price'],
            "price1_bid": price1['best_bid'],
            "price1_ask": price1['best_ask'],
            "price2_bid": price2['best_bid'],
            "price2_ask": price2['best_ask'],
            # Top-of-book sizes, read by ArbitrageDetector.estimate_slippage
            "price1_bid_size": price1.get('bid_size', 0),
            "price1_ask_size": price1.get('ask_size', 0),
            "price2_bid_size": price2.get('bid_size', 0),
            "price2_ask_size": price2.get('ask_size', 0),
            "spread1": spread1,
            "spread2": spread2,
            "spread_buy1_sell2": spread_buy1_sell2,
            "spread_buy2_sell1": spread_buy2_sell1,
            "profitable_direction": profitable_direction,
            "profitable_spread": profitable_spread,
            "is_profitable": profitable_spread > self.min_spread_threshold
        }
        # Store spread history
        key = (price1['exchange'], price2['exchange'], price1['symbol'])
        if key not in self.spread_history:
            self.spread_history[key] = []
        self.spread_history[key].append(spread_data)
        # Keep only recent history
        if len(self.spread_history[key]) > 1000:
            self.spread_history[key] = self.spread_history[key][-1000:]
        return spread_data
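The two directional spreads are easier to check with concrete numbers. The sketch below restates the same formulas as a standalone function; the function name and the quotes are made up for illustration.

```python
from typing import Tuple


def directional_spreads(bid1: float, ask1: float,
                        bid2: float, ask2: float) -> Tuple[float, float]:
    """Return (buy on 1 / sell on 2, buy on 2 / sell on 1) as fractions of the buy price."""
    buy1_sell2 = (bid2 - ask1) / ask1  # pay ask on exchange 1, receive bid on exchange 2
    buy2_sell1 = (bid1 - ask2) / ask2  # pay ask on exchange 2, receive bid on exchange 1
    return buy1_sell2, buy2_sell1


# Illustrative quotes: exchange 1 at 100.00 / 100.10, exchange 2 at 100.60 / 100.70
b12, b21 = directional_spreads(bid1=100.00, ask1=100.10, bid2=100.60, ask2=100.70)

if __name__ == "__main__":
    print(f"buy 1, sell 2: {b12:.4%}")
    print(f"buy 2, sell 1: {b21:.4%}")
```

Here buying on exchange 1 and selling on exchange 2 yields roughly 0.5% before costs, while the reverse direction is negative. As long as neither order book is crossed (each bid is at or below its own ask), at most one of the two directions can be positive.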
3. Arbitrage Detector
Detects profitable arbitrage opportunities:
from datetime import datetime
from typing import Dict, List


class ArbitrageDetector:
    def __init__(self, min_profit_threshold=0.002, max_position_size=1000):
        self.min_profit_threshold = min_profit_threshold
        self.max_position_size = max_position_size
        self.opportunities = []
        self.opportunity_history = []

    def detect_opportunities(self, spreads: Dict,
                             transaction_costs: Dict) -> List[Dict]:
        """Detect profitable arbitrage opportunities"""
        opportunities = []
        for (ex1, ex2, symbol), spread_data in spreads.items():
            if not spread_data['is_profitable']:
                continue
            # Calculate net profit after transaction costs
            net_profit = self.calculate_net_profit(spread_data, transaction_costs)
            if net_profit > self.min_profit_threshold:
                opportunity = self.create_opportunity(spread_data, net_profit)
                opportunities.append(opportunity)
        # Sort opportunities by profit, best first
        opportunities.sort(key=lambda x: x['net_profit'], reverse=True)
        # Update current opportunities
        self.opportunities = opportunities
        return opportunities

    def calculate_net_profit(self, spread_data: Dict,
                             transaction_costs: Dict) -> float:
        """Calculate net profit (as a fraction of the buy price) after transaction costs"""
        gross_spread = spread_data['profitable_spread']
        # Get transaction costs for both exchanges (default 0.1% commission)
        ex1 = spread_data['exchange1']
        ex2 = spread_data['exchange2']
        cost1 = transaction_costs.get(ex1, {}).get('commission', 0.001)
        cost2 = transaction_costs.get(ex2, {}).get('commission', 0.001)
        # Estimate slippage costs; pass the exchange names that estimate_slippage compares against
        slippage1 = self.estimate_slippage(spread_data, ex1)
        slippage2 = self.estimate_slippage(spread_data, ex2)
        # Total transaction costs
        total_costs = cost1 + cost2 + slippage1 + slippage2
        # Net profit
        net_profit = gross_spread - total_costs
        return net_profit

    def estimate_slippage(self, spread_data: Dict, exchange: str) -> float:
        """Estimate slippage cost for an exchange"""
        # Sizes default to 0 when the spread data carries no depth information
        if exchange == spread_data['exchange1']:
            bid_size = spread_data.get('price1_bid_size', 0)
            ask_size = spread_data.get('price1_ask_size', 0)
        else:
            bid_size = spread_data.get('price2_bid_size', 0)
            ask_size = spread_data.get('price2_ask_size', 0)
        # Simple slippage model based on order book depth
        min_size = min(bid_size, ask_size)
        if min_size > 0:
            # Slippage shrinks as top-of-book depth grows
            slippage = min(0.001, 1.0 / min_size)  # Cap at 0.1%
        else:
            slippage = 0.001  # Default slippage
        return slippage

    def create_opportunity(self, spread_data: Dict, net_profit: float) -> Dict:
        """Create arbitrage opportunity object"""
        buy_on_1 = spread_data['profitable_direction'] == "buy1_sell2"
        opportunity = {
            "id": f"{spread_data['exchange1']}_{spread_data['exchange2']}_{spread_data['symbol']}_{int(datetime.now().timestamp())}",
            "symbol": spread_data['symbol'],
            "buy_exchange": spread_data['exchange1'] if buy_on_1 else spread_data['exchange2'],
            "sell_exchange": spread_data['exchange2'] if buy_on_1 else spread_data['exchange1'],
            "buy_price": spread_data['price1_ask'] if buy_on_1 else spread_data['price2_ask'],
            "sell_price": spread_data['price2_bid'] if buy_on_1 else spread_data['price1_bid'],
            "gross_spread": spread_data['profitable_spread'],
            "net_profit": net_profit,
            "timestamp": datetime.now().isoformat(),
            "status": "detected",
            "execution_status": "pending"
        }
        # Store in history
        self.opportunity_history.append(opportunity)
        # Keep only recent history
        if len(self.opportunity_history) > 10000:
            self.opportunity_history = self.opportunity_history[-10000:]
        return opportunity

    def get_opportunity_stats(self) -> Dict:
        """Get arbitrage opportunity statistics"""
        if not self.opportunity_history:
            return {
                "total_opportunities": 0,
                "executed_opportunities": 0,
                "avg_profit": 0.0,
                "max_profit": 0.0,
                "success_rate": 0.0
            }
        total_opportunities = len(self.opportunity_history)
        executed_opportunities = [opp for opp in self.opportunity_history if opp['status'] == 'executed']
        if executed_opportunities:
            avg_profit = sum(opp['net_profit'] for opp in executed_opportunities) / len(executed_opportunities)
            max_profit = max(opp['net_profit'] for opp in executed_opportunities)
            success_rate = len(executed_opportunities) / total_opportunities
        else:
            avg_profit = 0.0
            max_profit = 0.0
            success_rate = 0.0
        return {
            "total_opportunities": total_opportunities,
            "executed_opportunities": len(executed_opportunities),
            "avg_profit": avg_profit,
            "max_profit": max_profit,
            "success_rate": success_rate
        }
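A worked example helps show how quickly costs consume a gross spread. The sketch below mirrors the detector's arithmetic (gross spread minus commission and slippage on both legs). `LegCosts`, `net_spread`, `expected_profit`, and all numbers are illustrative assumptions rather than part of the service.

```python
from dataclasses import dataclass


@dataclass
class LegCosts:
    """Per-leg costs as fractions of notional (hypothetical helper type)."""
    commission: float
    slippage: float

    @property
    def total(self) -> float:
        return self.commission + self.slippage


def net_spread(gross_spread: float, buy_leg: LegCosts, sell_leg: LegCosts) -> float:
    """Gross spread minus the costs of both legs, all as fractions."""
    return gross_spread - (buy_leg.total + sell_leg.total)


def expected_profit(net: float, size: float, buy_price: float) -> float:
    """Convert a fractional net spread into quote-currency profit."""
    return net * size * buy_price


# Illustrative inputs: 0.50% gross spread, 0.10% commission and 0.04% slippage per leg
leg = LegCosts(commission=0.001, slippage=0.0004)
net = net_spread(0.005, leg, leg)
profit = expected_profit(net, size=10, buy_price=100.10)

if __name__ == "__main__":
    print(f"net spread: {net:.4%}")
    print(f"expected profit on 10 units: {profit:.4f}")
```

With these inputs more than half of the gross spread goes to costs, leaving about 0.22%, which only just clears the default `min_profit_threshold` of 0.002.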
4. Arbitrage Executor
Executes arbitrage trades across exchanges:
import asyncio
from datetime import datetime
from typing import Dict


class ArbitrageExecutor:
    def __init__(self, exchange_connectors: Dict, risk_manager):
        self.exchange_connectors = exchange_connectors
        self.risk_manager = risk_manager
        self.active_trades = {}
        self.execution_history = []

    async def execute_arbitrage(self, opportunity: Dict) -> Dict:
        """Execute arbitrage trade"""
        # Size the order first so the risk manager validates the actual size
        order_size = self.calculate_optimal_size(opportunity)
        opportunity['order_size'] = order_size
        # Validate opportunity with risk manager
        if not self.risk_manager.validate_opportunity(opportunity):
            return {
                "status": "rejected",
                "reason": "Risk validation failed",
                "opportunity_id": opportunity['id']
            }
        try:
            # Place both legs concurrently to shorten the window in which one leg is unhedged
            buy_order, sell_order = await asyncio.gather(
                self.place_order(
                    opportunity['buy_exchange'],
                    opportunity['symbol'],
                    'buy',
                    order_size,
                    opportunity['buy_price']
                ),
                self.place_order(
                    opportunity['sell_exchange'],
                    opportunity['symbol'],
                    'sell',
                    order_size,
                    opportunity['sell_price']
                ),
                return_exceptions=True
            )
            errors = [leg for leg in (buy_order, sell_order) if isinstance(leg, BaseException)]
            if errors:
                # One leg may have been accepted while the other failed;
                # the surviving order still has to be cancelled or hedged by the caller
                raise errors[0]
            # Create execution record
            execution = {
                "opportunity_id": opportunity['id'],
                "buy_order": buy_order,
                "sell_order": sell_order,
                "order_size": order_size,
                "timestamp": datetime.now().isoformat(),
                "status": "executed",
                # net_profit is a fraction of the buy price, so scale by notional
                "expected_profit": opportunity['net_profit'] * order_size * opportunity['buy_price']
            }
            # Store execution
            self.execution_history.append(execution)
            self.active_trades[opportunity['id']] = execution
            # Update opportunity status
            opportunity['status'] = 'executed'
            opportunity['execution_id'] = execution['opportunity_id']
            return {
                "status": "success",
                "execution": execution
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "opportunity_id": opportunity['id']
            }

    async def place_order(self, exchange: str, symbol: str,
                          side: str, size: float, price: float) -> Dict:
        """Place a limit order on an exchange"""
        if exchange not in self.exchange_connectors:
            raise ValueError(f"Exchange {exchange} not configured")
        connector = self.exchange_connectors[exchange]
        order = await connector.place_order(
            symbol=symbol,
            side=side,
            size=size,
            price=price,
            order_type='limit'
        )
        return {
            "exchange": exchange,
            "order_id": order['order_id'],
            "symbol": symbol,
            "side": side,
            "size": size,
            "price": price,
            "status": order['status'],
            "timestamp": datetime.now().isoformat()
        }

    def calculate_optimal_size(self, opportunity: Dict) -> float:
        """Calculate optimal order size for arbitrage"""
        # Get available liquidity on both sides
        buy_liquidity = self.get_liquidity(opportunity['buy_exchange'], opportunity['symbol'], 'ask')
        sell_liquidity = self.get_liquidity(opportunity['sell_exchange'], opportunity['symbol'], 'bid')
        # Use minimum of available liquidity
        max_size = min(buy_liquidity, sell_liquidity)
        # Apply position size limits
        optimal_size = min(max_size, self.risk_manager.max_position_size)
        return optimal_size

    def get_liquidity(self, exchange: str, symbol: str, side: str) -> float:
        """Get available liquidity for an exchange-symbol-side combination"""
        # Placeholder: a real implementation would query the order book
        return 100.0  # Default liquidity
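The `get_liquidity` stub returns a constant. One way it might be filled in is to sum order book depth up to a limit price, as sketched below. The level layout (`price` and `size` keys, best level first) follows the order book shape used by the Market Syncer; the function name, the limit prices, and the sample book are assumptions for illustration.

```python
from typing import Dict, List


def available_liquidity(levels: List[Dict[str, float]], side: str,
                        limit_price: float) -> float:
    """Sum the size available at prices no worse than limit_price.

    levels must be sorted best-first: ascending prices for asks,
    descending prices for bids.
    """
    if side not in ("ask", "bid"):
        raise ValueError(f"Unknown side: {side}")
    total = 0.0
    for level in levels:
        too_expensive = side == "ask" and level["price"] > limit_price
        too_cheap = side == "bid" and level["price"] < limit_price
        if too_expensive or too_cheap:
            break  # levels are sorted, so every later level is worse
        total += level["size"]
    return total


# Illustrative books for the buy venue (asks) and the sell venue (bids)
asks = [{"price": 100.10, "size": 2.0}, {"price": 100.15, "size": 3.0},
        {"price": 100.40, "size": 10.0}]
bids = [{"price": 100.60, "size": 1.5}, {"price": 100.55, "size": 4.0},
        {"price": 100.30, "size": 8.0}]

buy_liquidity = available_liquidity(asks, "ask", limit_price=100.20)
sell_liquidity = available_liquidity(bids, "bid", limit_price=100.50)
# Same rule as calculate_optimal_size: the thinner side caps the trade
order_size = min(buy_liquidity, sell_liquidity, 1000.0)

if __name__ == "__main__":
    print(buy_liquidity, sell_liquidity, order_size)
```

Bounding the sum by a limit price keeps the size estimate consistent with the prices the opportunity was computed from: depth beyond those prices would be filled at a worse spread.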
5. Arbitrage Risk Manager
Manages risk for arbitrage operations:
from typing import Dict


class ArbitrageRiskManager:
    def __init__(self, max_position_size=1000, max_exposure=0.1,
                 min_liquidity=100):
        self.max_position_size = max_position_size
        self.max_exposure = max_exposure
        self.min_liquidity = min_liquidity
        self.current_exposures = {}

    def validate_opportunity(self, opportunity: Dict) -> bool:
        """Validate arbitrage opportunity for risk"""
        # Check position size limits (order_size must be set by the caller)
        if opportunity.get('order_size', 0) > self.max_position_size:
            return False
        # Check liquidity requirements
        if not self.check_liquidity(opportunity):
            return False
        # Check exposure limits
        if not self.check_exposure_limits(opportunity):
            return False
        return True

    def check_liquidity(self, opportunity: Dict) -> bool:
        """Check if sufficient liquidity exists"""
        # Placeholder: a real implementation would compare order book depth
        # with min_liquidity
        return True

    def check_exposure_limits(self, opportunity: Dict) -> bool:
        """Check if exposure limits are satisfied"""
        symbol = opportunity['symbol']
        current_exposure = self.current_exposures.get(symbol, 0)
        new_exposure = current_exposure + opportunity.get('order_size', 0)
        # Cap cumulative per-symbol exposure at max_position_size.
        # max_exposure (default 0.1) looks like a fraction rather than a size,
        # so it is not compared with order sizes here.
        if new_exposure > self.max_position_size:
            return False
        return True
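The exposure check reads `current_exposures`, but nothing in this section writes to it. A minimal sketch of the missing bookkeeping is shown below, assuming exposure is measured in asset units per symbol; `ExposureTracker` and its method names are hypothetical and not part of the service.

```python
from typing import Dict


class ExposureTracker:
    """Hypothetical per-symbol exposure ledger with a hard cap."""

    def __init__(self, per_symbol_limit: float):
        self.per_symbol_limit = per_symbol_limit
        self._open: Dict[str, float] = {}

    def exposure(self, symbol: str) -> float:
        return self._open.get(symbol, 0.0)

    def try_reserve(self, symbol: str, size: float) -> bool:
        """Reserve size for a new trade; refuse if the cap would be exceeded."""
        if size <= 0:
            return False
        if self.exposure(symbol) + size > self.per_symbol_limit:
            return False
        self._open[symbol] = self.exposure(symbol) + size
        return True

    def release(self, symbol: str, size: float) -> None:
        """Release exposure once both legs have settled or been cancelled."""
        self._open[symbol] = max(0.0, self.exposure(symbol) - size)


tracker = ExposureTracker(per_symbol_limit=1000.0)
first = tracker.try_reserve("BTC-USD", 600.0)    # accepted
second = tracker.try_reserve("BTC-USD", 500.0)   # refused: 600 + 500 exceeds 1000
tracker.release("BTC-USD", 600.0)
third = tracker.try_reserve("BTC-USD", 500.0)    # accepted after the release

if __name__ == "__main__":
    print(first, second, third, tracker.exposure("BTC-USD"))
```

Reserving before orders are sent and releasing after settlement keeps two concurrent opportunities on the same symbol from each passing the check individually while jointly exceeding the cap.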
API Design
Arbitrage API
from datetime import datetime
from typing import List

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

# arbitrage_detector and arbitrage_executor are assumed to be module-level
# instances of ArbitrageDetector and ArbitrageExecutor created at service startup.

router = APIRouter()


class ArbitrageConfig(BaseModel):
    min_profit_threshold: float = 0.002
    max_position_size: float = 1000.0
    enabled_exchanges: List[str] = []
    enabled_symbols: List[str] = []


class ArbitrageOpportunity(BaseModel):
    symbol: str
    buy_exchange: str
    sell_exchange: str
    spread: float
    net_profit: float
    status: str


@router.get("/arbitrage/opportunities")
async def get_current_opportunities():
    """Get current arbitrage opportunities"""
    try:
        opportunities = arbitrage_detector.opportunities
        return {
            "opportunities": opportunities,
            "count": len(opportunities),
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/arbitrage/execute/{opportunity_id}")
async def execute_arbitrage(opportunity_id: str):
    """Execute a specific arbitrage opportunity"""
    try:
        # Find opportunity
        opportunity = None
        for opp in arbitrage_detector.opportunities:
            if opp['id'] == opportunity_id:
                opportunity = opp
                break
        if not opportunity:
            raise HTTPException(status_code=404, detail="Opportunity not found")
        # Execute arbitrage
        result = await arbitrage_executor.execute_arbitrage(opportunity)
        return result
    except HTTPException:
        # Preserve the 404 instead of converting it into a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/arbitrage/status")
async def get_arbitrage_status():
    """Get overall arbitrage system status"""
    try:
        stats = arbitrage_detector.get_opportunity_stats()
        active_trades = len(arbitrage_executor.active_trades)
        return {
            "status": "active",
            "stats": stats,
            "active_trades": active_trades,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/arbitrage/history")
async def get_arbitrage_history(limit: int = 100):
    """Get arbitrage execution history (most recent `limit` entries)"""
    try:
        history = arbitrage_executor.execution_history[-limit:]
        return {
            "history": history,
            "total_executions": len(arbitrage_executor.execution_history)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/arbitrage/configure")
async def configure_arbitrage(config: ArbitrageConfig):
    """Configure arbitrage parameters"""
    try:
        # Update configuration
        arbitrage_detector.min_profit_threshold = config.min_profit_threshold
        arbitrage_detector.max_position_size = config.max_position_size
        return {
            "status": "configured",
            "config": config.dict()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
Frontend Integration
Arbitrage Monitor Dashboard
// ArbitrageMonitorView.tsx
import React, { useEffect, useState } from 'react';
// Assumed to be imported from elsewhere in the project: `api` (a preconfigured
// HTTP client) and the panel components rendered below.

// Field names mirror the snake_case JSON keys returned by the arbitrage API
interface ArbitrageOpportunity {
  id: string;
  symbol: string;
  buy_exchange: string;
  sell_exchange: string;
  buy_price: number;
  sell_price: number;
  gross_spread: number;
  net_profit: number;
  status: string;
  timestamp: string;
}

interface ArbitrageStatus {
  status: string;
  stats: {
    total_opportunities: number;
    executed_opportunities: number;
    avg_profit: number;
    max_profit: number;
    success_rate: number;
  };
  active_trades: number;
}

const ArbitrageMonitorView: React.FC = () => {
  const [opportunities, setOpportunities] = useState<ArbitrageOpportunity[]>([]);
  const [arbitrageStatus, setArbitrageStatus] = useState<ArbitrageStatus | null>(null);
  const [executionHistory, setExecutionHistory] = useState<any[]>([]);

  const getOpportunities = async () => {
    const response = await api.get('/arbitrage/opportunities');
    setOpportunities(response.data.opportunities);
  };

  const getArbitrageStatus = async () => {
    const response = await api.get('/arbitrage/status');
    setArbitrageStatus(response.data);
  };

  const getExecutionHistory = async () => {
    const response = await api.get('/arbitrage/history');
    setExecutionHistory(response.data.history);
  };

  const executeArbitrage = async (opportunityId: string) => {
    await api.post(`/arbitrage/execute/${opportunityId}`);
    // Refresh everything the execution may have changed
    await Promise.all([getOpportunities(), getArbitrageStatus(), getExecutionHistory()]);
  };

  // Load initial data once when the view mounts
  useEffect(() => {
    getOpportunities();
    getArbitrageStatus();
    getExecutionHistory();
  }, []);

  return (
    <div className="arbitrage-monitor-dashboard">
      {/* Arbitrage Status Overview */}
      <ArbitrageStatusPanel status={arbitrageStatus} />
      {/* Current Opportunities */}
      <OpportunitiesPanel
        opportunities={opportunities}
        onExecute={executeArbitrage}
      />
      {/* Execution History */}
      <ExecutionHistoryPanel history={executionHistory} />
      {/* Market Spreads */}
      <MarketSpreadsPanel opportunities={opportunities} />
      {/* Profit Tracking */}
      <ProfitTrackingPanel status={arbitrageStatus} />
    </div>
  );
};
Implementation Roadmap
Phase 1: Core Infrastructure (Weeks 1-2)
- Set up multi-exchange connectivity
- Implement market data synchronization
- Create basic spread calculation
Phase 2: Arbitrage Detection (Weeks 3-4)
- Develop arbitrage opportunity detection
- Implement transaction cost analysis
- Build risk management framework
Phase 3: Execution & Monitoring (Weeks 5-6)
- Create arbitrage execution engine
- Implement execution monitoring
- Build failure handling mechanisms
Phase 4: Frontend & Integration (Weeks 7-8)
- Develop arbitrage monitoring dashboard
- Integrate with existing trading system
- Performance optimization and testing
Business Value
Strategic Benefits
- Low-Risk Profits: Capture arbitrage opportunities while keeping execution and exposure risk within defined limits
- Market Efficiency: Contribute to price convergence across markets
- Revenue Diversification: Additional revenue stream from arbitrage
- Competitive Advantage: Sophisticated arbitrage capabilities
Operational Benefits
- Automated Execution: Systematic arbitrage without manual intervention
- Real-Time Monitoring: Live arbitrage opportunity tracking
- Risk Management: Comprehensive risk controls for arbitrage operations
- Performance Tracking: Detailed arbitrage performance analytics
Technical Specifications
Performance Requirements
- Data Latency: < 50ms for market data synchronization
- Execution Speed: < 100ms for arbitrage order placement
- Multi-Exchange Support: 10+ simultaneous exchange connections
- Opportunity Detection: Real-time spread monitoring
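Latency targets like these are only useful if they are measured in the running service. The sketch below shows one way to time an awaited operation against a budget, using the 100 ms order placement target as the example. `timed` and `slow_order_placement` are hypothetical helpers, and the 150 ms delay is simulated.

```python
import asyncio
import time
from typing import Any, Awaitable, Tuple


async def timed(awaitable: Awaitable[Any], budget_ms: float) -> Tuple[Any, float, bool]:
    """Await an operation and report (result, elapsed milliseconds, within budget)."""
    start = time.perf_counter()
    result = await awaitable
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms, elapsed_ms <= budget_ms


async def slow_order_placement() -> str:
    """Simulated order placement that takes about 150 ms (made-up figure)."""
    await asyncio.sleep(0.15)
    return "order-accepted"


def run_demo() -> Tuple[Any, float, bool]:
    # Budget taken from the execution speed target above
    return asyncio.run(timed(slow_order_placement(), budget_ms=100.0))


if __name__ == "__main__":
    result, elapsed_ms, within_budget = run_demo()
    print(result, f"{elapsed_ms:.1f} ms", "ok" if within_budget else "over budget")
```

`time.perf_counter` is used rather than wall-clock time because it is monotonic, so the measurement is not distorted by system clock adjustments.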
Security & Compliance
- Exchange Security: Secure API connections and authentication
- Risk Controls: Position limits and exposure management
- Audit Trail: Complete logging of all arbitrage activities
- Regulatory Compliance: Adherence to arbitrage trading regulations
This Cross-Market Arbitrage Engine provides institutional-grade capabilities for automated arbitrage trading, enabling sophisticated profit capture across multiple markets and exchanges with comprehensive risk management and monitoring.