58. Cross-Market Price Co-movement Detection System¶
Overview¶
The Cross-Market Price Co-movement Detection System provides comprehensive real-time monitoring of price correlations and movements across different markets, exchanges, and assets. The system implements sophisticated correlation analysis, divergence detection, and automated alerting to identify arbitrage opportunities, risk signals, and market anomalies across global trading venues.
Core Capabilities¶
- Multi-Market Price Synchronization: Real-time collection of prices from multiple exchanges and markets
- Dynamic Correlation Analysis: Continuous calculation of price correlations between assets
- Divergence Detection: Automated identification of price divergences and correlation breakdowns
- Arbitrage Opportunity Detection: Real-time identification of cross-market arbitrage opportunities
- Risk Signal Generation: Automated alerts for correlation breakdowns and market anomalies
- Historical Analysis: Comprehensive tracking of correlation patterns and market relationships
- Adaptive Clustering: Dynamic asset grouping based on correlation patterns
System Architecture¶
Microservice: cross-market-linkage-center¶
services/cross-market-linkage-center/
├── src/
│ ├── main.py
│ ├── syncer/
│ │ ├── price_syncer.py
│ │ ├── market_data_collector.py
│ │ └── exchange_connector.py
│ ├── calculator/
│ │ ├── correlation_calculator.py
│ │ ├── cointegration_analyzer.py
│ │ └── granger_causality.py
│ ├── detector/
│ │ ├── divergence_detector.py
│ │ ├── spread_analyzer.py
│ │ └── cluster_detector.py
│ ├── alerter/
│ │ ├── linkage_alerter.py
│ │ ├── arbitrage_detector.py
│ │ └── risk_signal_generator.py
│ ├── recorder/
│ │ ├── linkage_recorder.py
│ │ ├── correlation_history.py
│ │ └── event_logger.py
│ ├── api/
│ │ ├── linkage_api.py
│ ├── config.py
│ └── requirements.txt
├── Dockerfile
└── tests/
Core Components¶
1. Price Syncer¶
Real-time synchronization of market prices:
class PriceSyncer:
def __init__(self, market_data_collector, exchange_connector):
self.market_data_collector = market_data_collector
self.exchange_connector = exchange_connector
self.price_cache = {}
self.sync_stats = {
"total_symbols": 0,
"active_exchanges": 0,
"last_sync": None,
"sync_frequency_ms": 100
}
async def start_synchronization(self):
"""Start real-time price synchronization"""
# Initialize exchange connections
await self.initialize_exchanges()
# Start price collection for all configured symbols
await self.start_price_collection()
# Start correlation monitoring
await self.start_correlation_monitoring()
async def initialize_exchanges(self):
"""Initialize connections to all exchanges"""
exchanges = self.get_configured_exchanges()
for exchange in exchanges:
try:
connector = await self.exchange_connector.connect(exchange)
await connector.initialize()
self.sync_stats["active_exchanges"] += 1
except Exception as e:
print(f"Failed to connect to {exchange}: {e}")
async def start_price_collection(self):
"""Start collecting prices from all exchanges"""
symbols = self.get_monitored_symbols()
self.sync_stats["total_symbols"] = len(symbols)
for symbol in symbols:
await self.start_symbol_monitoring(symbol)
async def start_symbol_monitoring(self, symbol: str):
"""Start monitoring a specific symbol across exchanges"""
exchanges = self.get_exchanges_for_symbol(symbol)
for exchange in exchanges:
try:
# Subscribe to price updates
await self.subscribe_to_price_updates(symbol, exchange)
# Start price collection task
asyncio.create_task(self.collect_prices(symbol, exchange))
except Exception as e:
print(f"Failed to monitor {symbol} on {exchange}: {e}")
async def collect_prices(self, symbol: str, exchange: str):
"""Collect prices for a symbol from an exchange"""
while True:
try:
# Get current price
price_data = await self.get_current_price(symbol, exchange)
if price_data:
# Update price cache
await self.update_price_cache(symbol, exchange, price_data)
# Trigger correlation calculation
await self.trigger_correlation_calculation(symbol)
# Wait for next update
await asyncio.sleep(self.sync_stats["sync_frequency_ms"] / 1000)
except Exception as e:
print(f"Error collecting price for {symbol} on {exchange}: {e}")
await asyncio.sleep(1)
async def update_price_cache(self, symbol: str, exchange: str, price_data: Dict):
"""Update price cache with new data"""
cache_key = f"{symbol}_{exchange}"
self.price_cache[cache_key] = {
"symbol": symbol,
"exchange": exchange,
"bid": price_data.get("bid", 0),
"ask": price_data.get("ask", 0),
"last": price_data.get("last", 0),
"volume": price_data.get("volume", 0),
"timestamp": datetime.now().isoformat(),
"spread": price_data.get("ask", 0) - price_data.get("bid", 0)
}
# Update sync statistics
self.sync_stats["last_sync"] = datetime.now().isoformat()
async def get_current_price(self, symbol: str, exchange: str) -> Optional[Dict]:
"""Get current price from exchange"""
try:
connector = self.exchange_connector.get_connector(exchange)
price_data = await connector.get_ticker(symbol)
return price_data
except Exception as e:
print(f"Error getting price for {symbol} on {exchange}: {e}")
return None
async def get_prices_for_symbol(self, symbol: str) -> List[Dict]:
"""Get prices for a symbol across all exchanges"""
prices = []
exchanges = self.get_exchanges_for_symbol(symbol)
for exchange in exchanges:
cache_key = f"{symbol}_{exchange}"
if cache_key in self.price_cache:
prices.append(self.price_cache[cache_key])
return prices
async def get_all_current_prices(self) -> Dict:
"""Get all current prices from cache"""
return {
"prices": self.price_cache,
"statistics": self.sync_stats,
"timestamp": datetime.now().isoformat()
}
def get_configured_exchanges(self) -> List[str]:
"""Get list of configured exchanges"""
return [
"binance", "okx", "bybit", "coinbase", "kraken",
"nyse", "nasdaq", "lse", "tse", "hkex"
]
def get_monitored_symbols(self) -> List[str]:
"""Get list of monitored symbols"""
return [
"BTC/USDT", "ETH/USDT", "AAPL", "MSFT", "NVDA",
"GOOGL", "AMZN", "TSLA", "SPY", "QQQ"
]
def get_exchanges_for_symbol(self, symbol: str) -> List[str]:
"""Get exchanges that support a symbol"""
# Crypto symbols
if "/" in symbol:
return ["binance", "okx", "bybit", "coinbase", "kraken"]
# Stock symbols
else:
return ["nyse", "nasdaq", "lse", "tse", "hkex"]
2. Correlation Calculator¶
Dynamic calculation of price correlations:
class CorrelationCalculator:
def __init__(self, cointegration_analyzer, granger_causality):
self.cointegration_analyzer = cointegration_analyzer
self.granger_causality = granger_causality
self.correlation_cache = {}
self.price_series = {}
self.calculation_window = 100 # Number of price points for correlation
async def calculate_correlations(self, symbols: List[str]) -> Dict:
"""Calculate correlations between all symbol pairs"""
correlations = {}
for i, symbol1 in enumerate(symbols):
for j, symbol2 in enumerate(symbols):
if i < j: # Avoid duplicate calculations
correlation = await self.calculate_pair_correlation(symbol1, symbol2)
correlations[f"{symbol1}_{symbol2}"] = correlation
return correlations
async def calculate_pair_correlation(self, symbol1: str, symbol2: str) -> Dict:
"""Calculate correlation between two symbols"""
# Get price series for both symbols
series1 = await self.get_price_series(symbol1)
series2 = await self.get_price_series(symbol2)
if len(series1) < 10 or len(series2) < 10:
return {
"correlation": 0.0,
"confidence": 0.0,
"sample_size": min(len(series1), len(series2)),
"timestamp": datetime.now().isoformat()
}
# Calculate Pearson correlation
correlation = self.calculate_pearson_correlation(series1, series2)
# Calculate rolling correlation
rolling_correlation = self.calculate_rolling_correlation(series1, series2)
# Calculate cointegration
cointegration = await self.calculate_cointegration(series1, series2)
# Calculate Granger causality
granger = await self.calculate_granger_causality(series1, series2)
result = {
"symbol1": symbol1,
"symbol2": symbol2,
"correlation": correlation,
"rolling_correlation": rolling_correlation,
"cointegration": cointegration,
"granger_causality": granger,
"confidence": self.calculate_confidence(series1, series2),
"sample_size": len(series1),
"timestamp": datetime.now().isoformat()
}
# Cache result
cache_key = f"{symbol1}_{symbol2}"
self.correlation_cache[cache_key] = result
return result
def calculate_pearson_correlation(self, series1: List[float], series2: List[float]) -> float:
"""Calculate Pearson correlation coefficient"""
if len(series1) != len(series2):
# Align series to same length
min_length = min(len(series1), len(series2))
series1 = series1[-min_length:]
series2 = series2[-min_length:]
if len(series1) < 2:
return 0.0
try:
correlation = np.corrcoef(series1, series2)[0, 1]
return correlation if not np.isnan(correlation) else 0.0
except:
return 0.0
def calculate_rolling_correlation(self, series1: List[float], series2: List[float], window: int = 20) -> float:
"""Calculate rolling correlation"""
if len(series1) < window or len(series2) < window:
return 0.0
# Use recent window
recent1 = series1[-window:]
recent2 = series2[-window:]
return self.calculate_pearson_correlation(recent1, recent2)
async def calculate_cointegration(self, series1: List[float], series2: List[float]) -> Dict:
"""Calculate cointegration between series"""
if len(series1) < 30 or len(series2) < 30:
return {"is_cointegrated": False, "p_value": 1.0}
try:
result = await self.cointegration_analyzer.test_cointegration(series1, series2)
return result
except Exception as e:
return {"is_cointegrated": False, "p_value": 1.0, "error": str(e)}
async def calculate_granger_causality(self, series1: List[float], series2: List[float]) -> Dict:
"""Calculate Granger causality"""
if len(series1) < 50 or len(series2) < 50:
return {"causality": "none", "p_value": 1.0}
try:
result = await self.granger_causality.test_causality(series1, series2)
return result
except Exception as e:
return {"causality": "none", "p_value": 1.0, "error": str(e)}
async def get_price_series(self, symbol: str) -> List[float]:
"""Get price series for a symbol"""
if symbol not in self.price_series:
self.price_series[symbol] = []
# Get current price
current_price = await self.get_current_price(symbol)
if current_price:
self.price_series[symbol].append(current_price)
# Keep series manageable
if len(self.price_series[symbol]) > self.calculation_window:
self.price_series[symbol] = self.price_series[symbol][-self.calculation_window:]
return self.price_series[symbol]
async def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for a symbol"""
# This would integrate with price syncer
# For now, return placeholder
return 100.0 + np.random.normal(0, 1)
def calculate_confidence(self, series1: List[float], series2: List[float]) -> float:
"""Calculate confidence in correlation"""
sample_size = min(len(series1), len(series2))
if sample_size < 10:
return 0.0
elif sample_size < 30:
return 0.5
elif sample_size < 50:
return 0.7
else:
return 0.9
async def get_correlation_matrix(self) -> Dict:
"""Get current correlation matrix"""
symbols = list(self.price_series.keys())
correlations = await self.calculate_correlations(symbols)
return {
"symbols": symbols,
"correlations": correlations,
"matrix": self.build_correlation_matrix(symbols, correlations),
"timestamp": datetime.now().isoformat()
}
def build_correlation_matrix(self, symbols: List[str], correlations: Dict) -> List[List[float]]:
"""Build correlation matrix"""
matrix = []
for i, symbol1 in enumerate(symbols):
row = []
for j, symbol2 in enumerate(symbols):
if i == j:
row.append(1.0) # Self-correlation
elif i < j:
key = f"{symbol1}_{symbol2}"
row.append(correlations.get(key, {}).get("correlation", 0.0))
else:
key = f"{symbol2}_{symbol1}"
row.append(correlations.get(key, {}).get("correlation", 0.0))
matrix.append(row)
return matrix
3. Divergence Detector¶
Detection of price divergences and anomalies:
class DivergenceDetector:
def __init__(self, spread_analyzer, cluster_detector):
self.spread_analyzer = spread_analyzer
self.cluster_detector = cluster_detector
self.divergence_threshold = 0.3 # Correlation threshold for divergence
self.spread_threshold = 0.02 # 2% spread threshold
self.detection_history = []
async def detect_divergences(self, correlations: Dict) -> List[Dict]:
"""Detect price divergences"""
divergences = []
for pair_key, correlation_data in correlations.items():
# Check for correlation breakdown
correlation_divergence = await self.detect_correlation_divergence(pair_key, correlation_data)
if correlation_divergence:
divergences.append(correlation_divergence)
# Check for spread divergence
spread_divergence = await self.detect_spread_divergence(pair_key, correlation_data)
if spread_divergence:
divergences.append(spread_divergence)
# Check for cluster breakdowns
cluster_divergences = await self.detect_cluster_divergences(correlations)
divergences.extend(cluster_divergences)
return divergences
async def detect_correlation_divergence(self, pair_key: str, correlation_data: Dict) -> Optional[Dict]:
"""Detect correlation divergence"""
current_correlation = correlation_data.get("correlation", 0.0)
rolling_correlation = correlation_data.get("rolling_correlation", 0.0)
# Check if correlation is below threshold
if abs(current_correlation) < self.divergence_threshold:
return {
"type": "correlation_divergence",
"pair": pair_key,
"current_correlation": current_correlation,
"rolling_correlation": rolling_correlation,
"threshold": self.divergence_threshold,
"severity": self.calculate_divergence_severity(current_correlation),
"timestamp": datetime.now().isoformat(),
"description": f"Correlation {current_correlation:.3f} below threshold {self.divergence_threshold}"
}
# Check for significant correlation change
correlation_change = abs(current_correlation - rolling_correlation)
if correlation_change > 0.2: # 20% change
return {
"type": "correlation_change",
"pair": pair_key,
"current_correlation": current_correlation,
"rolling_correlation": rolling_correlation,
"change": correlation_change,
"severity": "medium",
"timestamp": datetime.now().isoformat(),
"description": f"Correlation changed by {correlation_change:.3f}"
}
return None
async def detect_spread_divergence(self, pair_key: str, correlation_data: Dict) -> Optional[Dict]:
"""Detect spread divergence"""
symbol1, symbol2 = pair_key.split("_")
# Get current spreads
spread1 = await self.get_current_spread(symbol1)
spread2 = await self.get_current_spread(symbol2)
if spread1 and spread2:
spread_ratio = spread1 / spread2 if spread2 > 0 else 0
if spread_ratio > 2.0 or spread_ratio < 0.5:
return {
"type": "spread_divergence",
"pair": pair_key,
"spread1": spread1,
"spread2": spread2,
"spread_ratio": spread_ratio,
"threshold": 2.0,
"severity": "high",
"timestamp": datetime.now().isoformat(),
"description": f"Spread ratio {spread_ratio:.3f} exceeds threshold"
}
return None
async def detect_cluster_divergences(self, correlations: Dict) -> List[Dict]:
"""Detect cluster breakdowns"""
divergences = []
# Get asset clusters
clusters = await self.cluster_detector.get_asset_clusters(correlations)
for cluster in clusters:
# Check cluster stability
cluster_stability = await self.calculate_cluster_stability(cluster, correlations)
if cluster_stability < 0.5: # Low stability threshold
divergences.append({
"type": "cluster_breakdown",
"cluster": cluster["name"],
"assets": cluster["assets"],
"stability": cluster_stability,
"threshold": 0.5,
"severity": "high",
"timestamp": datetime.now().isoformat(),
"description": f"Cluster {cluster['name']} stability {cluster_stability:.3f} below threshold"
})
return divergences
async def get_current_spread(self, symbol: str) -> Optional[float]:
"""Get current spread for a symbol"""
# This would integrate with price syncer
# For now, return placeholder
return 0.001 + np.random.normal(0, 0.0001)
def calculate_divergence_severity(self, correlation: float) -> str:
"""Calculate divergence severity"""
if abs(correlation) < 0.1:
return "high"
elif abs(correlation) < 0.2:
return "medium"
else:
return "low"
async def calculate_cluster_stability(self, cluster: Dict, correlations: Dict) -> float:
"""Calculate cluster stability"""
assets = cluster["assets"]
if len(assets) < 2:
return 1.0
# Calculate average correlation within cluster
total_correlation = 0.0
correlation_count = 0
for i, asset1 in enumerate(assets):
for j, asset2 in enumerate(assets):
if i < j:
pair_key = f"{asset1}_{asset2}"
correlation = correlations.get(pair_key, {}).get("correlation", 0.0)
total_correlation += abs(correlation)
correlation_count += 1
if correlation_count == 0:
return 0.0
return total_correlation / correlation_count
async def get_divergence_history(self, hours: int = 24) -> List[Dict]:
"""Get divergence history"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_divergences = [
divergence for divergence in self.detection_history
if datetime.fromisoformat(divergence["timestamp"]) > cutoff_time
]
return recent_divergences
API Design¶
Cross-Market Linkage API¶
@router.get("/linkage/correlations")
async def get_current_correlations():
"""Get current correlation matrix"""
correlations = await correlation_calculator.get_correlation_matrix()
return {
"correlations": correlations,
"timestamp": datetime.now().isoformat()
}
@router.get("/linkage/divergences")
async def get_current_divergences():
"""Get current divergences"""
correlations = await correlation_calculator.get_correlation_matrix()
divergences = await divergence_detector.detect_divergences(correlations["correlations"])
return {
"divergences": divergences,
"total_count": len(divergences),
"high_severity": len([d for d in divergences if d["severity"] == "high"]),
"timestamp": datetime.now().isoformat()
}
@router.get("/linkage/prices")
async def get_current_prices():
"""Get current prices across markets"""
prices = await price_syncer.get_all_current_prices()
return {
"prices": prices,
"timestamp": datetime.now().isoformat()
}
@router.get("/linkage/spreads")
async def get_current_spreads():
"""Get current spreads across markets"""
spreads = await spread_analyzer.get_current_spreads()
return {
"spreads": spreads,
"timestamp": datetime.now().isoformat()
}
@router.get("/linkage/arbitrage_opportunities")
async def get_arbitrage_opportunities():
"""Get current arbitrage opportunities"""
opportunities = await arbitrage_detector.get_opportunities()
return {
"opportunities": opportunities,
"total_count": len(opportunities),
"timestamp": datetime.now().isoformat()
}
@router.get("/linkage/clusters")
async def get_asset_clusters():
"""Get current asset clusters"""
correlations = await correlation_calculator.get_correlation_matrix()
clusters = await cluster_detector.get_asset_clusters(correlations["correlations"])
return {
"clusters": clusters,
"timestamp": datetime.now().isoformat()
}
Frontend Integration¶
Cross-Market Linkage Dashboard¶
const CrossMarketLinkageView: React.FC = () => {
const [correlations, setCorrelations] = useState<CorrelationMatrix | null>(null);
const [divergences, setDivergences] = useState<Divergence[]>([]);
const [prices, setPrices] = useState<MarketPrices | null>(null);
const [clusters, setClusters] = useState<AssetCluster[]>([]);
return (
<div className="cross-market-linkage-dashboard">
{/* Correlation Matrix */}
<CorrelationMatrixPanel
correlations={correlations}
/>
{/* Divergence Alerts */}
<DivergenceAlertsPanel
divergences={divergences}
onDivergenceSelect={handleDivergenceSelect}
/>
{/* Market Prices */}
<MarketPricesPanel
prices={prices}
/>
{/* Asset Clusters */}
<AssetClustersPanel
clusters={clusters}
/>
{/* Arbitrage Opportunities */}
<ArbitrageOpportunitiesPanel
opportunities={arbitrageOpportunities}
/>
{/* Spread Analysis */}
<SpreadAnalysisPanel
spreads={spreads}
/>
{/* Correlation Trends */}
<CorrelationTrendsPanel
correlations={correlations}
/>
{/* Risk Signals */}
<RiskSignalsPanel
divergences={divergences}
/>
</div>
);
};
Implementation Roadmap¶
Phase 1: Core Synchronization (Weeks 1-2)¶
- Implement price synchronization across exchanges
- Set up correlation calculation framework
- Create basic divergence detection
Phase 2: Advanced Analysis (Weeks 3-4)¶
- Implement cointegration and Granger causality
- Develop cluster detection algorithms
- Build arbitrage opportunity detection
Phase 3: Alerting & Monitoring (Weeks 5-6)¶
- Create automated alerting system
- Build comprehensive monitoring dashboard
- Implement risk signal generation
Phase 4: Integration & Optimization (Weeks 7-8)¶
- Integrate with existing trading systems
- Performance optimization
- Advanced analytics features
Business Value¶
Strategic Benefits¶
- Arbitrage Opportunities: Real-time identification of cross-market arbitrage
- Risk Management: Early detection of correlation breakdowns and market anomalies
- Portfolio Optimization: Data-driven insights for asset allocation
- Market Intelligence: Comprehensive understanding of market relationships
Operational Benefits¶
- Real-Time Monitoring: Live tracking of cross-market relationships
- Automated Detection: Continuous monitoring of divergences and opportunities
- Comprehensive Analysis: Multi-dimensional correlation and causality analysis
- Historical Tracking: Complete record of market relationships and changes
Technical Specifications¶
Performance Requirements¶
- Price Synchronization: < 100ms for price updates
- Correlation Calculation: < 500ms for correlation matrix updates
- Divergence Detection: < 200ms for anomaly identification
- Alert Delivery: < 30s for critical alert delivery
Analysis Capabilities¶
- Correlation Analysis: Pearson, rolling, and dynamic correlations
- Cointegration Testing: Statistical cointegration analysis
- Granger Causality: Causality testing between assets
- Cluster Detection: Dynamic asset clustering based on correlations
Integration Requirements¶
- Multi-Exchange Support: Binance, OKX, Bybit, Coinbase, Kraken, etc.
- Multi-Asset Support: Crypto, stocks, commodities, bonds
- Real-Time Data: High-frequency price and volume data
- Risk Management: Integration with risk monitoring systems
This Cross-Market Price Co-movement Detection System provides institutional-grade cross-market analysis, enabling sophisticated arbitrage detection and risk management, similar to the systems used by top-tier quantitative trading firms like Jump Trading, DRW, and Two Sigma.