58. Cross-Market Price Co-movement Detection System

Overview

The Cross-Market Price Co-movement Detection System monitors price correlations and movements in real time across markets, exchanges, and assets. It combines correlation analysis, divergence detection, and automated alerting to surface arbitrage opportunities, risk signals, and market anomalies across trading venues.

Core Capabilities

  • Multi-Market Price Synchronization: Real-time collection of prices from multiple exchanges and markets
  • Dynamic Correlation Analysis: Continuous calculation of price correlations between assets
  • Divergence Detection: Automated identification of price divergences and correlation breakdowns
  • Arbitrage Opportunity Detection: Real-time identification of cross-market arbitrage opportunities
  • Risk Signal Generation: Automated alerts for correlation breakdowns and market anomalies
  • Historical Analysis: Comprehensive tracking of correlation patterns and market relationships
  • Adaptive Clustering: Dynamic asset grouping based on correlation patterns

System Architecture

Microservice: cross-market-linkage-center

services/cross-market-linkage-center/
├── src/
│   ├── main.py
│   ├── syncer/
│   │   ├── price_syncer.py
│   │   ├── market_data_collector.py
│   │   └── exchange_connector.py
│   ├── calculator/
│   │   ├── correlation_calculator.py
│   │   ├── cointegration_analyzer.py
│   │   └── granger_causality.py
│   ├── detector/
│   │   ├── divergence_detector.py
│   │   ├── spread_analyzer.py
│   │   └── cluster_detector.py
│   ├── alerter/
│   │   ├── linkage_alerter.py
│   │   ├── arbitrage_detector.py
│   │   └── risk_signal_generator.py
│   ├── recorder/
│   │   ├── linkage_recorder.py
│   │   ├── correlation_history.py
│   │   └── event_logger.py
│   ├── api/
│   │   └── linkage_api.py
│   ├── config.py
│   └── requirements.txt
├── Dockerfile
└── tests/

Core Components

1. Price Syncer

Real-time synchronization of market prices:

import asyncio
from datetime import datetime
from typing import Dict, List, Optional

class PriceSyncer:
    def __init__(self, market_data_collector, exchange_connector):
        self.market_data_collector = market_data_collector
        self.exchange_connector = exchange_connector
        self.price_cache = {}
        # Strong references to background tasks; the event loop only keeps weak ones
        self.collection_tasks = set()
        self.sync_stats = {
            "total_symbols": 0,
            "active_exchanges": 0,
            "last_sync": None,
            "sync_frequency_ms": 100
        }

    async def start_synchronization(self):
        """Start real-time price synchronization"""

        # Initialize exchange connections
        await self.initialize_exchanges()

        # Start price collection for all configured symbols
        await self.start_price_collection()

        # Start correlation monitoring (this method, subscribe_to_price_updates,
        # and trigger_correlation_calculation are not shown in this section)
        await self.start_correlation_monitoring()

    async def initialize_exchanges(self):
        """Initialize connections to all exchanges"""

        exchanges = self.get_configured_exchanges()

        for exchange in exchanges:
            try:
                connector = await self.exchange_connector.connect(exchange)
                await connector.initialize()
                self.sync_stats["active_exchanges"] += 1
            except Exception as e:
                print(f"Failed to connect to {exchange}: {e}")

    async def start_price_collection(self):
        """Start collecting prices from all exchanges"""

        symbols = self.get_monitored_symbols()
        self.sync_stats["total_symbols"] = len(symbols)

        for symbol in symbols:
            await self.start_symbol_monitoring(symbol)

    async def start_symbol_monitoring(self, symbol: str):
        """Start monitoring a specific symbol across exchanges"""

        exchanges = self.get_exchanges_for_symbol(symbol)

        for exchange in exchanges:
            try:
                # Subscribe to price updates
                await self.subscribe_to_price_updates(symbol, exchange)

                # Start price collection task and keep a reference to it so it
                # is not garbage-collected while still running
                task = asyncio.create_task(self.collect_prices(symbol, exchange))
                self.collection_tasks.add(task)
                task.add_done_callback(self.collection_tasks.discard)

            except Exception as e:
                print(f"Failed to monitor {symbol} on {exchange}: {e}")

    async def collect_prices(self, symbol: str, exchange: str):
        """Collect prices for a symbol from an exchange"""

        while True:
            try:
                # Get current price
                price_data = await self.get_current_price(symbol, exchange)

                if price_data:
                    # Update price cache
                    await self.update_price_cache(symbol, exchange, price_data)

                    # Trigger correlation calculation
                    await self.trigger_correlation_calculation(symbol)

                # Wait for next update
                await asyncio.sleep(self.sync_stats["sync_frequency_ms"] / 1000)

            except Exception as e:
                print(f"Error collecting price for {symbol} on {exchange}: {e}")
                await asyncio.sleep(1)

    async def update_price_cache(self, symbol: str, exchange: str, price_data: Dict):
        """Update price cache with new data"""

        cache_key = f"{symbol}_{exchange}"

        self.price_cache[cache_key] = {
            "symbol": symbol,
            "exchange": exchange,
            "bid": price_data.get("bid", 0),
            "ask": price_data.get("ask", 0),
            "last": price_data.get("last", 0),
            "volume": price_data.get("volume", 0),
            "timestamp": datetime.now().isoformat(),
            "spread": price_data.get("ask", 0) - price_data.get("bid", 0)
        }

        # Update sync statistics
        self.sync_stats["last_sync"] = datetime.now().isoformat()

    async def get_current_price(self, symbol: str, exchange: str) -> Optional[Dict]:
        """Get current price from exchange"""

        try:
            connector = self.exchange_connector.get_connector(exchange)
            price_data = await connector.get_ticker(symbol)
            return price_data
        except Exception as e:
            print(f"Error getting price for {symbol} on {exchange}: {e}")
            return None

    async def get_prices_for_symbol(self, symbol: str) -> List[Dict]:
        """Get prices for a symbol across all exchanges"""

        prices = []
        exchanges = self.get_exchanges_for_symbol(symbol)

        for exchange in exchanges:
            cache_key = f"{symbol}_{exchange}"
            if cache_key in self.price_cache:
                prices.append(self.price_cache[cache_key])

        return prices

    async def get_all_current_prices(self) -> Dict:
        """Get all current prices from cache"""

        return {
            "prices": self.price_cache,
            "statistics": self.sync_stats,
            "timestamp": datetime.now().isoformat()
        }

    def get_configured_exchanges(self) -> List[str]:
        """Get list of configured exchanges"""

        return [
            "binance", "okx", "bybit", "coinbase", "kraken",
            "nyse", "nasdaq", "lse", "tse", "hkex"
        ]

    def get_monitored_symbols(self) -> List[str]:
        """Get list of monitored symbols"""

        return [
            "BTC/USDT", "ETH/USDT", "AAPL", "MSFT", "NVDA",
            "GOOGL", "AMZN", "TSLA", "SPY", "QQQ"
        ]

    def get_exchanges_for_symbol(self, symbol: str) -> List[str]:
        """Get exchanges that support a symbol"""

        # Crypto symbols
        if "/" in symbol:
            return ["binance", "okx", "bybit", "coinbase", "kraken"]

        # Stock symbols
        else:
            return ["nyse", "nasdaq", "lse", "tse", "hkex"]
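
The cache entries written by update_price_cache carry a bid and an ask per exchange, which is the input a cross-exchange arbitrage check needs. The document lists arbitrage_detector.py but does not show it, so the following is only a minimal sketch of one way such a check could work. The function name find_cross_exchange_arbitrage and the fee_rate parameter are assumptions made for illustration; real fee schedules, transfer costs, and order-book depth are ignored.

```python
from typing import Dict, List, Optional


def find_cross_exchange_arbitrage(
    quotes: List[Dict], fee_rate: float = 0.001
) -> Optional[Dict]:
    """Return the best buy-low/sell-high pair across exchanges, or None.

    `quotes` uses the cache entry shape from update_price_cache
    (keys: exchange, bid, ask). `fee_rate` is a hypothetical per-leg
    taker fee, charged once on the buy and once on the sell.
    """
    # Ignore entries with missing or non-positive quotes (the cache defaults to 0)
    valid = [q for q in quotes if q.get("bid", 0) > 0 and q.get("ask", 0) > 0]
    if len(valid) < 2:
        return None

    best: Optional[Dict] = None
    for buy in valid:        # venue where we would pay the ask
        for sell in valid:   # venue where we would receive the bid
            if buy["exchange"] == sell["exchange"]:
                continue
            cost = buy["ask"] * (1 + fee_rate)
            proceeds = sell["bid"] * (1 - fee_rate)
            net_return = (proceeds - cost) / cost
            if net_return > 0 and (best is None or net_return > best["net_return"]):
                best = {
                    "buy_exchange": buy["exchange"],
                    "sell_exchange": sell["exchange"],
                    "buy_price": buy["ask"],
                    "sell_price": sell["bid"],
                    "net_return": net_return,
                }
    return best


quotes = [
    {"exchange": "binance", "bid": 100.0, "ask": 100.1},
    {"exchange": "kraken", "bid": 101.0, "ask": 101.2},
]
opportunity = find_cross_exchange_arbitrage(quotes)
```

With these example quotes the only profitable direction is buying on binance and selling on kraken. Raising fee_rate to 0.01 removes the opportunity, which is why fees belong inside the comparison rather than being applied afterwards.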

2. Correlation Calculator

Dynamic calculation of price correlations:

from datetime import datetime
from typing import Dict, List, Optional

import numpy as np

class CorrelationCalculator:
    def __init__(self, cointegration_analyzer, granger_causality):
        self.cointegration_analyzer = cointegration_analyzer
        self.granger_causality = granger_causality
        self.correlation_cache = {}
        self.price_series = {}
        self.calculation_window = 100  # Number of price points for correlation

    async def calculate_correlations(self, symbols: List[str]) -> Dict:
        """Calculate correlations between all symbol pairs"""

        correlations = {}

        for i, symbol1 in enumerate(symbols):
            for j, symbol2 in enumerate(symbols):
                if i < j:  # Avoid duplicate calculations
                    correlation = await self.calculate_pair_correlation(symbol1, symbol2)
                    correlations[f"{symbol1}_{symbol2}"] = correlation

        return correlations

    async def calculate_pair_correlation(self, symbol1: str, symbol2: str) -> Dict:
        """Calculate correlation between two symbols"""

        # Get price series for both symbols
        series1 = await self.get_price_series(symbol1)
        series2 = await self.get_price_series(symbol2)

        if len(series1) < 10 or len(series2) < 10:
            return {
                "correlation": 0.0,
                "confidence": 0.0,
                "sample_size": min(len(series1), len(series2)),
                "timestamp": datetime.now().isoformat()
            }

        # Calculate Pearson correlation
        correlation = self.calculate_pearson_correlation(series1, series2)

        # Calculate rolling correlation
        rolling_correlation = self.calculate_rolling_correlation(series1, series2)

        # Calculate cointegration
        cointegration = await self.calculate_cointegration(series1, series2)

        # Calculate Granger causality
        granger = await self.calculate_granger_causality(series1, series2)

        result = {
            "symbol1": symbol1,
            "symbol2": symbol2,
            "correlation": correlation,
            "rolling_correlation": rolling_correlation,
            "cointegration": cointegration,
            "granger_causality": granger,
            "confidence": self.calculate_confidence(series1, series2),
            # The series can differ in length; report the overlap actually used
            "sample_size": min(len(series1), len(series2)),
            "timestamp": datetime.now().isoformat()
        }

        # Cache result
        cache_key = f"{symbol1}_{symbol2}"
        self.correlation_cache[cache_key] = result

        return result

    def calculate_pearson_correlation(self, series1: List[float], series2: List[float]) -> float:
        """Calculate Pearson correlation coefficient"""

        if len(series1) != len(series2):
            # Align series to same length
            min_length = min(len(series1), len(series2))
            series1 = series1[-min_length:]
            series2 = series2[-min_length:]

        if len(series1) < 2:
            return 0.0

        try:
            correlation = float(np.corrcoef(series1, series2)[0, 1])
            # corrcoef yields NaN when either series has zero variance
            return correlation if not np.isnan(correlation) else 0.0
        except Exception:
            return 0.0

    def calculate_rolling_correlation(self, series1: List[float], series2: List[float], window: int = 20) -> float:
        """Calculate rolling correlation"""

        if len(series1) < window or len(series2) < window:
            return 0.0

        # Use recent window
        recent1 = series1[-window:]
        recent2 = series2[-window:]

        return self.calculate_pearson_correlation(recent1, recent2)

    async def calculate_cointegration(self, series1: List[float], series2: List[float]) -> Dict:
        """Calculate cointegration between series"""

        if len(series1) < 30 or len(series2) < 30:
            return {"is_cointegrated": False, "p_value": 1.0}

        try:
            result = await self.cointegration_analyzer.test_cointegration(series1, series2)
            return result
        except Exception as e:
            return {"is_cointegrated": False, "p_value": 1.0, "error": str(e)}

    async def calculate_granger_causality(self, series1: List[float], series2: List[float]) -> Dict:
        """Calculate Granger causality"""

        if len(series1) < 50 or len(series2) < 50:
            return {"causality": "none", "p_value": 1.0}

        try:
            result = await self.granger_causality.test_causality(series1, series2)
            return result
        except Exception as e:
            return {"causality": "none", "p_value": 1.0, "error": str(e)}

    async def get_price_series(self, symbol: str) -> List[float]:
        """Get price series for a symbol"""

        if symbol not in self.price_series:
            self.price_series[symbol] = []

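        # Append the latest price. Every call adds a sample, so calling this
        # more than once per symbol per cycle skews the series lengths
        current_price = await self.get_current_price(symbol)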
        if current_price:
            self.price_series[symbol].append(current_price)

            # Keep series manageable
            if len(self.price_series[symbol]) > self.calculation_window:
                self.price_series[symbol] = self.price_series[symbol][-self.calculation_window:]

        return self.price_series[symbol]

    async def get_current_price(self, symbol: str) -> Optional[float]:
        """Get current price for a symbol"""

        # This would integrate with price syncer
        # For now, return placeholder
        return 100.0 + np.random.normal(0, 1)

    def calculate_confidence(self, series1: List[float], series2: List[float]) -> float:
        """Calculate confidence in correlation"""

        sample_size = min(len(series1), len(series2))

        if sample_size < 10:
            return 0.0
        elif sample_size < 30:
            return 0.5
        elif sample_size < 50:
            return 0.7
        else:
            return 0.9

    async def get_correlation_matrix(self) -> Dict:
        """Get current correlation matrix"""

        symbols = list(self.price_series.keys())
        correlations = await self.calculate_correlations(symbols)

        return {
            "symbols": symbols,
            "correlations": correlations,
            "matrix": self.build_correlation_matrix(symbols, correlations),
            "timestamp": datetime.now().isoformat()
        }

    def build_correlation_matrix(self, symbols: List[str], correlations: Dict) -> List[List[float]]:
        """Build correlation matrix"""

        matrix = []

        for i, symbol1 in enumerate(symbols):
            row = []
            for j, symbol2 in enumerate(symbols):
                if i == j:
                    row.append(1.0)  # Self-correlation
                elif i < j:
                    key = f"{symbol1}_{symbol2}"
                    row.append(correlations.get(key, {}).get("correlation", 0.0))
                else:
                    key = f"{symbol2}_{symbol1}"
                    row.append(correlations.get(key, {}).get("correlation", 0.0))
            matrix.append(row)

        return matrix
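
Two properties of the calculator above are worth illustrating: get_price_series both reads and appends, and the correlation is taken on raw price levels, which can look strongly correlated simply because both series trend. The sketch below separates recording from reading with a bounded buffer and correlates log returns instead. It is a standard-library illustration under stated assumptions, not the calculator's actual implementation: PriceBuffer and pearson are hypothetical names, and prices are assumed to be strictly positive.

```python
import math
from collections import deque
from typing import Deque, Dict, List


class PriceBuffer:
    """Fixed-size per-symbol price history (hypothetical helper)."""

    def __init__(self, window: int = 100):
        self.window = window
        self._prices: Dict[str, Deque[float]] = {}

    def record(self, symbol: str, price: float) -> None:
        # deque(maxlen=...) drops the oldest sample automatically
        self._prices.setdefault(symbol, deque(maxlen=self.window)).append(price)

    def log_returns(self, symbol: str) -> List[float]:
        # Reading never mutates the buffer; assumes all prices are > 0
        prices = list(self._prices.get(symbol, ()))
        return [math.log(b / a) for a, b in zip(prices, prices[1:])]


def pearson(xs: List[float], ys: List[float]) -> float:
    """Pearson correlation of the most recent overlapping samples; 0.0 if undefined."""
    n = min(len(xs), len(ys))
    if n < 2:
        return 0.0
    xs, ys = xs[-n:], ys[-n:]
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # a constant series has no defined correlation
    return cov / math.sqrt(var_x * var_y)


buffer = PriceBuffer(window=5)
samples = [(100, 50), (102, 51), (101, 50.5), (103, 51.5), (104, 52), (103, 51.5)]
for a_price, b_price in samples:
    buffer.record("A", a_price)
    buffer.record("B", b_price)

correlation = pearson(buffer.log_returns("A"), buffer.log_returns("B"))
```

In the example, B always trades at half of A, so their log returns match and the correlation is 1 up to floating-point error. The buffer holds five prices, so four returns feed the calculation.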

3. Divergence Detector

Detection of price divergences and anomalies:

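from datetime import datetime, timedelta
from typing import Dict, List, Optional

import numpy as np

class DivergenceDetector: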
    def __init__(self, spread_analyzer, cluster_detector):
        self.spread_analyzer = spread_analyzer
        self.cluster_detector = cluster_detector
        self.divergence_threshold = 0.3  # Correlation threshold for divergence
        self.spread_threshold = 0.02     # 2% spread threshold
        self.detection_history = []

    async def detect_divergences(self, correlations: Dict) -> List[Dict]:
        """Detect price divergences"""

        divergences = []

        for pair_key, correlation_data in correlations.items():
            # Check for correlation breakdown
            correlation_divergence = await self.detect_correlation_divergence(pair_key, correlation_data)
            if correlation_divergence:
                divergences.append(correlation_divergence)

            # Check for spread divergence
            spread_divergence = await self.detect_spread_divergence(pair_key, correlation_data)
            if spread_divergence:
                divergences.append(spread_divergence)

        # Check for cluster breakdowns
        cluster_divergences = await self.detect_cluster_divergences(correlations)
        divergences.extend(cluster_divergences)

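        # Record detections so get_divergence_history has data to filter
        self.detection_history.extend(divergences)

        return divergences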

    async def detect_correlation_divergence(self, pair_key: str, correlation_data: Dict) -> Optional[Dict]:
        """Detect correlation divergence"""

        current_correlation = correlation_data.get("correlation", 0.0)
        rolling_correlation = correlation_data.get("rolling_correlation", 0.0)

        # Check if correlation is below threshold
        if abs(current_correlation) < self.divergence_threshold:
            return {
                "type": "correlation_divergence",
                "pair": pair_key,
                "current_correlation": current_correlation,
                "rolling_correlation": rolling_correlation,
                "threshold": self.divergence_threshold,
                "severity": self.calculate_divergence_severity(current_correlation),
                "timestamp": datetime.now().isoformat(),
                "description": f"Correlation {current_correlation:.3f} below threshold {self.divergence_threshold}"
            }

        # Check for a significant shift between full-window and recent-window correlation
        correlation_change = abs(current_correlation - rolling_correlation)
        if correlation_change > 0.2:  # absolute change in the coefficient, not a percentage
            return {
                "type": "correlation_change",
                "pair": pair_key,
                "current_correlation": current_correlation,
                "rolling_correlation": rolling_correlation,
                "change": correlation_change,
                "severity": "medium",
                "timestamp": datetime.now().isoformat(),
                "description": f"Correlation changed by {correlation_change:.3f}"
            }

        return None

    async def detect_spread_divergence(self, pair_key: str, correlation_data: Dict) -> Optional[Dict]:
        """Detect spread divergence"""

        symbol1, symbol2 = pair_key.split("_")

        # Get current spreads
        spread1 = await self.get_current_spread(symbol1)
        spread2 = await self.get_current_spread(symbol2)

        # Skip missing or non-positive spreads; no meaningful ratio exists for them
        if spread1 is not None and spread2 is not None and spread1 > 0 and spread2 > 0:
            spread_ratio = spread1 / spread2

            if spread_ratio > 2.0 or spread_ratio < 0.5:
                return {
                    "type": "spread_divergence",
                    "pair": pair_key,
                    "spread1": spread1,
                    "spread2": spread2,
                    "spread_ratio": spread_ratio,
                    "threshold": 2.0,
                    "severity": "high",
                    "timestamp": datetime.now().isoformat(),
                    "description": f"Spread ratio {spread_ratio:.3f} outside the 0.5 to 2.0 band"
                }

        return None

    async def detect_cluster_divergences(self, correlations: Dict) -> List[Dict]:
        """Detect cluster breakdowns"""

        divergences = []

        # Get asset clusters
        clusters = await self.cluster_detector.get_asset_clusters(correlations)

        for cluster in clusters:
            # Check cluster stability
            cluster_stability = await self.calculate_cluster_stability(cluster, correlations)

            if cluster_stability < 0.5:  # Low stability threshold
                divergences.append({
                    "type": "cluster_breakdown",
                    "cluster": cluster["name"],
                    "assets": cluster["assets"],
                    "stability": cluster_stability,
                    "threshold": 0.5,
                    "severity": "high",
                    "timestamp": datetime.now().isoformat(),
                    "description": f"Cluster {cluster['name']} stability {cluster_stability:.3f} below threshold"
                })

        return divergences

    async def get_current_spread(self, symbol: str) -> Optional[float]:
        """Get current spread for a symbol"""

        # This would integrate with price syncer
        # For now, return placeholder
        return 0.001 + np.random.normal(0, 0.0001)

    def calculate_divergence_severity(self, correlation: float) -> str:
        """Calculate divergence severity"""

        if abs(correlation) < 0.1:
            return "high"
        elif abs(correlation) < 0.2:
            return "medium"
        else:
            return "low"

    async def calculate_cluster_stability(self, cluster: Dict, correlations: Dict) -> float:
        """Calculate cluster stability"""

        assets = cluster["assets"]
        if len(assets) < 2:
            return 1.0

        # Calculate average correlation within cluster
        total_correlation = 0.0
        correlation_count = 0

        for i, asset1 in enumerate(assets):
            for j, asset2 in enumerate(assets):
                if i < j:
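                    # Keys follow the symbol order used by calculate_correlations,
                    # which may differ from the cluster's order, so try both
                    pair = correlations.get(f"{asset1}_{asset2}") or correlations.get(f"{asset2}_{asset1}") or {}
                    correlation = pair.get("correlation", 0.0)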
                    total_correlation += abs(correlation)
                    correlation_count += 1

        if correlation_count == 0:
            return 0.0

        return total_correlation / correlation_count

    async def get_divergence_history(self, hours: int = 24) -> List[Dict]:
        """Get divergence history"""

        cutoff_time = datetime.now() - timedelta(hours=hours)

        recent_divergences = [
            divergence for divergence in self.detection_history
            if datetime.fromisoformat(divergence["timestamp"]) > cutoff_time
        ]

        return recent_divergences
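
detect_correlation_divergence flags any pair whose correlation is below the threshold, including pairs that were never related in the first place. One possible refinement, sketched here, is to treat the full-window value ("correlation") as a baseline and the short-window value ("rolling_correlation") as the recent reading, and to flag only pairs that used to be strongly correlated. The function name detect_breakdown and the min_baseline and min_drop defaults are illustrative assumptions, not tuned values from the system.

```python
from typing import Dict, Optional


def detect_breakdown(
    pair: str,
    baseline_correlation: float,
    recent_correlation: float,
    min_baseline: float = 0.6,
    min_drop: float = 0.3,
) -> Optional[Dict]:
    """Flag a pair only if it used to be correlated and no longer is.

    `min_baseline` and `min_drop` are illustrative values, not tuned thresholds.
    """
    if abs(baseline_correlation) < min_baseline:
        return None  # never strongly related, so nothing has broken down

    # Measure the drop along the baseline's sign, so a move from -0.8 to +0.1
    # counts as a drop of 0.9 just as +0.8 to -0.1 would
    sign = 1.0 if baseline_correlation >= 0 else -1.0
    drop = sign * (baseline_correlation - recent_correlation)
    if drop < min_drop:
        return None

    return {
        "type": "correlation_breakdown",
        "pair": pair,
        "baseline_correlation": baseline_correlation,
        "recent_correlation": recent_correlation,
        "drop": drop,
        "severity": "high" if drop >= 2 * min_drop else "medium",
    }


weak_pair = detect_breakdown("AAPL_BTC/USDT", 0.1, 0.0)
stable_pair = detect_breakdown("SPY_QQQ", 0.9, 0.8)
broken_pair = detect_breakdown("BTC/USDT_ETH/USDT", 0.9, 0.5)
```

Here weak_pair and stable_pair produce no alert, while broken_pair is reported because a previously strong relationship weakened by 0.4.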

API Design

Cross-Market Linkage API

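from datetime import datetime

from fastapi import APIRouter

# Assumes FastAPI. The correlation_calculator, divergence_detector, price_syncer,
# spread_analyzer, arbitrage_detector, and cluster_detector objects are taken to be
# module-level instances created at service startup (not shown here)
router = APIRouter()

@router.get("/linkage/correlations")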
async def get_current_correlations():
    """Get current correlation matrix"""

    correlations = await correlation_calculator.get_correlation_matrix()

    return {
        "correlations": correlations,
        "timestamp": datetime.now().isoformat()
    }

@router.get("/linkage/divergences")
async def get_current_divergences():
    """Get current divergences"""

    correlations = await correlation_calculator.get_correlation_matrix()
    divergences = await divergence_detector.detect_divergences(correlations["correlations"])

    return {
        "divergences": divergences,
        "total_count": len(divergences),
        "high_severity": len([d for d in divergences if d["severity"] == "high"]),
        "timestamp": datetime.now().isoformat()
    }

@router.get("/linkage/prices")
async def get_current_prices():
    """Get current prices across markets"""

    prices = await price_syncer.get_all_current_prices()

    return {
        "prices": prices,
        "timestamp": datetime.now().isoformat()
    }

@router.get("/linkage/spreads")
async def get_current_spreads():
    """Get current spreads across markets"""

    spreads = await spread_analyzer.get_current_spreads()

    return {
        "spreads": spreads,
        "timestamp": datetime.now().isoformat()
    }

@router.get("/linkage/arbitrage_opportunities")
async def get_arbitrage_opportunities():
    """Get current arbitrage opportunities"""

    opportunities = await arbitrage_detector.get_opportunities()

    return {
        "opportunities": opportunities,
        "total_count": len(opportunities),
        "timestamp": datetime.now().isoformat()
    }

@router.get("/linkage/clusters")
async def get_asset_clusters():
    """Get current asset clusters"""

    correlations = await correlation_calculator.get_correlation_matrix()
    clusters = await cluster_detector.get_asset_clusters(correlations["correlations"])

    return {
        "clusters": clusters,
        "timestamp": datetime.now().isoformat()
    }
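
The clusters endpoint and detect_cluster_divergences both rely on cluster_detector.get_asset_clusters, which is not shown. As a rough sketch of what it could do, the function below groups assets into connected components, linking two assets whenever the absolute value of their correlation reaches a threshold. Connected components via union-find is one simple option chosen for illustration; the threshold default and the cluster_N naming are assumptions. The output uses the "name" and "assets" keys that calculate_cluster_stability reads.

```python
from typing import Dict, List


def get_asset_clusters(correlations: Dict[str, Dict], threshold: float = 0.7) -> List[Dict]:
    """Group assets whose pairwise |correlation| meets `threshold`.

    Expects the dict produced by calculate_correlations, whose entries carry
    "symbol1", "symbol2", and "correlation".
    """
    parent: Dict[str, str] = {}

    def find(asset: str) -> str:
        parent.setdefault(asset, asset)
        while parent[asset] != asset:
            parent[asset] = parent[parent[asset]]  # path halving
            asset = parent[asset]
        return asset

    for entry in correlations.values():
        if "symbol1" not in entry or "symbol2" not in entry:
            continue  # low-sample results omit the symbol fields
        root_a, root_b = find(entry["symbol1"]), find(entry["symbol2"])
        if abs(entry.get("correlation", 0.0)) >= threshold:
            parent[root_a] = root_b  # merge the two components

    groups: Dict[str, List[str]] = {}
    for asset in parent:
        groups.setdefault(find(asset), []).append(asset)

    # Sort for deterministic output regardless of input order
    ordered = sorted(sorted(assets) for assets in groups.values())
    return [{"name": f"cluster_{i}", "assets": assets} for i, assets in enumerate(ordered)]


example = {
    "BTC/USDT_ETH/USDT": {"symbol1": "BTC/USDT", "symbol2": "ETH/USDT", "correlation": 0.9},
    "AAPL_MSFT": {"symbol1": "AAPL", "symbol2": "MSFT", "correlation": 0.8},
    "BTC/USDT_AAPL": {"symbol1": "BTC/USDT", "symbol2": "AAPL", "correlation": 0.1},
}
clusters = get_asset_clusters(example)
```

In the example the crypto pair and the equity pair end up in separate clusters because the only link between them (0.1) falls below the threshold. An asset with no strong links forms a single-asset cluster, which calculate_cluster_stability treats as fully stable.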

Frontend Integration

Cross-Market Linkage Dashboard

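import React, { useState } from 'react';

const CrossMarketLinkageView: React.FC = () => {
  const [correlations, setCorrelations] = useState<CorrelationMatrix | null>(null);
  const [divergences, setDivergences] = useState<Divergence[]>([]);
  const [prices, setPrices] = useState<MarketPrices | null>(null);
  const [clusters, setClusters] = useState<AssetCluster[]>([]);
  // State and handler referenced by the panels below. The type names
  // ArbitrageOpportunity and SpreadSnapshot are assumed, not defined in this document
  const [arbitrageOpportunities, setArbitrageOpportunities] = useState<ArbitrageOpportunity[]>([]);
  const [spreads, setSpreads] = useState<SpreadSnapshot | null>(null);
  const [selectedDivergence, setSelectedDivergence] = useState<Divergence | null>(null);

  const handleDivergenceSelect = (divergence: Divergence) => setSelectedDivergence(divergence);
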
  return (
    <div className="cross-market-linkage-dashboard">
      {/* Correlation Matrix */}
      <CorrelationMatrixPanel 
        correlations={correlations}
      />

      {/* Divergence Alerts */}
      <DivergenceAlertsPanel 
        divergences={divergences}
        onDivergenceSelect={handleDivergenceSelect}
      />

      {/* Market Prices */}
      <MarketPricesPanel 
        prices={prices}
      />

      {/* Asset Clusters */}
      <AssetClustersPanel 
        clusters={clusters}
      />

      {/* Arbitrage Opportunities */}
      <ArbitrageOpportunitiesPanel 
        opportunities={arbitrageOpportunities}
      />

      {/* Spread Analysis */}
      <SpreadAnalysisPanel 
        spreads={spreads}
      />

      {/* Correlation Trends */}
      <CorrelationTrendsPanel 
        correlations={correlations}
      />

      {/* Risk Signals */}
      <RiskSignalsPanel 
        divergences={divergences}
      />
    </div>
  );
};

Implementation Roadmap

Phase 1: Core Synchronization (Weeks 1-2)

  • Implement price synchronization across exchanges
  • Set up correlation calculation framework
  • Create basic divergence detection

Phase 2: Advanced Analysis (Weeks 3-4)

  • Implement cointegration and Granger causality
  • Develop cluster detection algorithms
  • Build arbitrage opportunity detection

Phase 3: Alerting & Monitoring (Weeks 5-6)

  • Create automated alerting system
  • Build comprehensive monitoring dashboard
  • Implement risk signal generation

Phase 4: Integration & Optimization (Weeks 7-8)

  • Integrate with existing trading systems
  • Performance optimization
  • Advanced analytics features

Business Value

Strategic Benefits

  1. Arbitrage Opportunities: Real-time identification of cross-market arbitrage
  2. Risk Management: Early detection of correlation breakdowns and market anomalies
  3. Portfolio Optimization: Data-driven insights for asset allocation
  4. Market Intelligence: Comprehensive understanding of market relationships

Operational Benefits

  1. Real-Time Monitoring: Live tracking of cross-market relationships
  2. Automated Detection: Continuous monitoring of divergences and opportunities
  3. Comprehensive Analysis: Multi-dimensional correlation and causality analysis
  4. Historical Tracking: Complete record of market relationships and changes

Technical Specifications

Performance Requirements

  • Price Synchronization: < 100ms for price updates
  • Correlation Calculation: < 500ms for correlation matrix updates
  • Divergence Detection: < 200ms for anomaly identification
  • Alert Delivery: < 30s for critical alert delivery
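
To check these targets in practice, each pipeline stage can be timed against its budget. The helper below is a small sketch of that idea: run_with_budget, LATENCY_BUDGET_MS, and the stage names are hypothetical, the budget values are copied from the list above, and fake_correlation_update stands in for real work.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Tuple, TypeVar

T = TypeVar("T")

# Budgets in milliseconds, taken from the performance requirements
LATENCY_BUDGET_MS: Dict[str, float] = {
    "price_sync": 100,
    "correlation_matrix": 500,
    "divergence_detection": 200,
}


async def run_with_budget(
    stage: str, work: Callable[[], Awaitable[T]]
) -> Tuple[T, float, bool]:
    """Await `work()` and report (result, elapsed_ms, within_budget)."""
    started = time.perf_counter()  # monotonic clock, suited to measuring intervals
    result = await work()
    elapsed_ms = (time.perf_counter() - started) * 1000
    return result, elapsed_ms, elapsed_ms <= LATENCY_BUDGET_MS[stage]


async def fake_correlation_update() -> str:
    await asyncio.sleep(0.01)  # stand-in for the real matrix update
    return "matrix"


result, elapsed_ms, within_budget = asyncio.run(
    run_with_budget("correlation_matrix", fake_correlation_update)
)
```

The within_budget flag could feed a metric or log line, so that budget overruns show up in monitoring instead of being discovered during an incident.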

Analysis Capabilities

  • Correlation Analysis: Pearson, rolling, and dynamic correlations
  • Cointegration Testing: Statistical cointegration analysis
  • Granger Causality: Causality testing between assets
  • Cluster Detection: Dynamic asset clustering based on correlations
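
For reference, the statistics named in this list are commonly defined as follows. The document does not say which cointegration test the analyzer uses, so the Engle-Granger two-step procedure is shown as one common choice. The lag order p is a free parameter.

```latex
% Pearson correlation over a window of n observations
\rho_{xy} = \frac{\sum_{t=1}^{n} (x_t - \bar{x})(y_t - \bar{y})}
                 {\sqrt{\sum_{t=1}^{n} (x_t - \bar{x})^2}\,\sqrt{\sum_{t=1}^{n} (y_t - \bar{y})^2}}

% Engle-Granger step 1: fit the long-run relation between the two price series
y_t = \alpha + \beta x_t + \varepsilon_t

% Engle-Granger step 2: unit-root (ADF) regression on the fitted residuals;
% rejecting H_0: \gamma = 0 indicates cointegration
\Delta \hat{\varepsilon}_t = \gamma\, \hat{\varepsilon}_{t-1}
    + \sum_{i=1}^{p} \phi_i\, \Delta \hat{\varepsilon}_{t-i} + u_t

% Granger causality: x Granger-causes y if H_0: b_1 = \dots = b_p = 0 is rejected
y_t = a_0 + \sum_{i=1}^{p} a_i\, y_{t-i} + \sum_{i=1}^{p} b_i\, x_{t-i} + e_t
```

The rolling correlation in the calculator is the first formula restricted to the most recent window (20 samples by default) rather than the full series.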

Integration Requirements

  • Multi-Exchange Support: Binance, OKX, Bybit, Coinbase, Kraken, etc.
  • Multi-Asset Support: Crypto, stocks, commodities, bonds
  • Real-Time Data: High-frequency price and volume data
  • Risk Management: Integration with risk monitoring systems

This Cross-Market Price Co-movement Detection System is designed to support arbitrage detection and risk management through cross-market correlation, cointegration, and causality analysis. The design aims at the kind of cross-market monitoring associated with quantitative trading firms such as Jump Trading, DRW, and Two Sigma.