Skip to content

53. Smart Cancel-and-Replace System

Overview

The Smart Cancel-and-Replace System provides intelligent order management through real-time monitoring of open orders, automatic detection of market changes, and dynamic order replacement to maintain optimal pricing and execution efficiency. The system continuously monitors order status, detects when orders become stale or mispriced due to market movements, and automatically cancels and replaces orders with updated prices while implementing rate limiting to prevent excessive trading activity.

Core Capabilities

  • Real-Time Order Monitoring: Continuous tracking of open order status and execution progress
  • Market Change Detection: Automatic detection of significant market movements and price shifts
  • Intelligent Decision Making: Smart algorithms to determine when orders should be cancelled and replaced
  • Dynamic Price Adjustment: Automatic price updates based on current market conditions
  • Rate Limiting: Protection against excessive cancel-replace operations
  • Multi-Strategy Support: Different cancel-replace logic for various trading strategies
  • Risk Management: Protection against adverse price movements and execution slippage

System Architecture

Microservice: cancel-replace-center

services/cancel-replace-center/
├── src/
│   ├── main.py
│   ├── monitor/
│   │   ├── order_status_monitor.py
│   │   ├── execution_tracker.py
│   │   └── order_health_checker.py
│   ├── detector/
│   │   ├── market_change_detector.py
│   │   ├── price_movement_analyzer.py
│   │   ├── volatility_monitor.py
│   │   └── spread_analyzer.py
│   ├── decider/
│   │   ├── cancel_replace_decider.py
│   │   ├── strategy_router.py
│   │   ├── risk_assessor.py
│   │   └── timing_optimizer.py
│   ├── executor/
│   │   ├── cancel_executor.py
│   │   ├── replace_executor.py
│   │   ├── order_router.py
│   │   └── rate_limiter.py
│   ├── strategies/
│   │   ├── time_based_strategy.py
│   │   ├── market_change_strategy.py
│   │   ├── partial_fill_strategy.py
│   │   └── adaptive_strategy.py
│   ├── analytics/
│   │   ├── performance_tracker.py
│   │   ├── cost_analyzer.py
│   │   └── effectiveness_metrics.py
│   ├── api/
│   │   ├── cancel_replace_api.py
│   ├── config.py
│   └── requirements.txt
├── Dockerfile
└── tests/

Core Components

1. Order Status Monitor

Real-time monitoring of open orders:

class OrderStatusMonitor:
    def __init__(self, order_manager, execution_tracker):
        self.order_manager = order_manager
        self.execution_tracker = execution_tracker
        self.order_cache = {}
        self.monitoring_active = True

    async def start_monitoring(self):
        """Start continuous order monitoring"""

        while self.monitoring_active:
            try:
                # Fetch all open orders
                open_orders = await self.fetch_open_orders()

                # Update order cache
                self.update_order_cache(open_orders)

                # Check order health
                health_issues = self.check_order_health(open_orders)

                # Trigger alerts for critical issues
                if health_issues:
                    await self.trigger_health_alerts(health_issues)

                # Wait before next check
                await asyncio.sleep(1)  # 1-second monitoring interval

            except Exception as e:
                print(f"Error in order monitoring: {e}")
                await asyncio.sleep(5)  # Longer wait on error

    async def fetch_open_orders(self) -> List[Dict]:
        """Fetch all open orders from all venues"""

        all_orders = []

        # Fetch from all connected exchanges
        exchanges = self.order_manager.get_connected_exchanges()

        for exchange in exchanges:
            try:
                exchange_orders = await self.order_manager.get_open_orders(exchange)
                all_orders.extend(exchange_orders)
            except Exception as e:
                print(f"Error fetching orders from {exchange}: {e}")

        return all_orders

    def update_order_cache(self, orders: List[Dict]):
        """Update internal order cache"""

        current_time = datetime.now()

        for order in orders:
            order_id = order["order_id"]

            if order_id in self.order_cache:
                # Update existing order
                cached_order = self.order_cache[order_id]
                cached_order.update(order)
                cached_order["last_updated"] = current_time
            else:
                # Add new order
                order["created_at"] = current_time
                order["last_updated"] = current_time
                self.order_cache[order_id] = order

    def check_order_health(self, orders: List[Dict]) -> List[Dict]:
        """Check health of all orders"""

        health_issues = []
        current_time = datetime.now()

        for order in orders:
            order_id = order["order_id"]
            order_time = order.get("created_at", current_time)

            # Calculate order age
            if isinstance(order_time, str):
                order_time = datetime.fromisoformat(order_time)

            order_age = (current_time - order_time).total_seconds()

            # Check for health issues
            issues = []

            # Check if order is too old
            max_age = order.get("max_age_seconds", 300)  # 5 minutes default
            if order_age > max_age:
                issues.append({
                    "type": "order_too_old",
                    "severity": "medium",
                    "age_seconds": order_age,
                    "max_age_seconds": max_age
                })

            # Check if order is partially filled for too long
            if order.get("filled_quantity", 0) > 0:
                partial_fill_time = order.get("partial_fill_time", order_time)
                if isinstance(partial_fill_time, str):
                    partial_fill_time = datetime.fromisoformat(partial_fill_time)

                partial_fill_age = (current_time - partial_fill_time).total_seconds()
                if partial_fill_age > 60:  # 1 minute for partial fills
                    issues.append({
                        "type": "partial_fill_stale",
                        "severity": "high",
                        "partial_fill_age": partial_fill_age
                    })

            # Check if order size is too large for current market
            if self.is_order_size_excessive(order):
                issues.append({
                    "type": "excessive_size",
                    "severity": "medium",
                    "order_size": order.get("quantity", 0)
                })

            if issues:
                health_issues.append({
                    "order_id": order_id,
                    "symbol": order.get("symbol"),
                    "side": order.get("side"),
                    "issues": issues
                })

        return health_issues

    def is_order_size_excessive(self, order: Dict) -> bool:
        """Check if order size is excessive for current market conditions"""

        # This would integrate with market data to check against average trade size
        # For now, use a simple heuristic
        order_size = order.get("quantity", 0)
        symbol = order.get("symbol", "")

        # Get average trade size for symbol
        avg_trade_size = self.get_average_trade_size(symbol)

        # Order is excessive if > 10x average trade size
        return order_size > avg_trade_size * 10

    def get_average_trade_size(self, symbol: str) -> float:
        """Get average trade size for symbol"""

        # This would integrate with market data provider
        # For now, return default values
        default_sizes = {
            "BTC": 0.1,
            "ETH": 1.0,
            "SPY": 100,
            "QQQ": 50
        }

        return default_sizes.get(symbol, 100.0)

    async def trigger_health_alerts(self, health_issues: List[Dict]):
        """Trigger alerts for order health issues"""

        for issue in health_issues:
            # Log the issue
            print(f"Order health issue: {issue}")

            # Send to alert system
            await self.send_alert(issue)

    async def send_alert(self, issue: Dict):
        """Send alert for order health issue"""

        # This would integrate with alert system
        # For now, just log
        print(f"Alert: {issue}")

    def get_order_status(self, order_id: str) -> Dict:
        """Get detailed status of specific order"""

        if order_id in self.order_cache:
            return self.order_cache[order_id]

        return {}

    def get_all_orders_status(self) -> List[Dict]:
        """Get status of all monitored orders"""

        return list(self.order_cache.values())

2. Market Change Detector

Detects significant market movements:

class MarketChangeDetector:
    def __init__(self, market_data_provider, price_movement_analyzer):
        self.market_data_provider = market_data_provider
        self.price_movement_analyzer = price_movement_analyzer
        self.price_history = {}
        self.change_thresholds = {
            "price_change": 0.002,  # 0.2% price change
            "spread_change": 0.001,  # 0.1% spread change
            "volume_spike": 2.0,     # 2x volume increase
            "volatility_increase": 1.5  # 1.5x volatility increase
        }

    def detect_market_shift(self, symbol: str, order_price: float, 
                          order_side: str) -> Dict:
        """Detect if market has shifted significantly"""

        # Get current market data
        current_market = self.get_current_market_data(symbol)

        if not current_market:
            return {"shift_detected": False, "reason": "no_market_data"}

        # Calculate price deviation
        price_deviation = self.calculate_price_deviation(
            order_price, current_market, order_side
        )

        # Check spread changes
        spread_change = self.calculate_spread_change(symbol)

        # Check volume changes
        volume_change = self.calculate_volume_change(symbol)

        # Check volatility changes
        volatility_change = self.calculate_volatility_change(symbol)

        # Determine if shift is significant
        shift_detected = (
            price_deviation > self.change_thresholds["price_change"] or
            spread_change > self.change_thresholds["spread_change"] or
            volume_change > self.change_thresholds["volume_spike"] or
            volatility_change > self.change_thresholds["volatility_increase"]
        )

        return {
            "shift_detected": shift_detected,
            "price_deviation": price_deviation,
            "spread_change": spread_change,
            "volume_change": volume_change,
            "volatility_change": volatility_change,
            "current_market": current_market,
            "thresholds": self.change_thresholds
        }

    def get_current_market_data(self, symbol: str) -> Dict:
        """Get current market data for symbol"""

        try:
            # Get order book
            order_book = self.market_data_provider.get_order_book(symbol)

            # Get recent trades
            recent_trades = self.market_data_provider.get_recent_trades(symbol, limit=100)

            # Calculate market metrics
            best_bid = order_book["bids"][0]["price"] if order_book["bids"] else 0
            best_ask = order_book["asks"][0]["price"] if order_book["asks"] else 0
            mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0
            spread = (best_ask - best_bid) / best_bid if best_bid else 0

            return {
                "symbol": symbol,
                "best_bid": best_bid,
                "best_ask": best_ask,
                "mid_price": mid_price,
                "spread": spread,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            print(f"Error getting market data for {symbol}: {e}")
            return {}

    def calculate_price_deviation(self, order_price: float, current_market: Dict, 
                                order_side: str) -> float:
        """Calculate price deviation from current market"""

        if not current_market:
            return 0.0

        if order_side == "buy":
            # For buy orders, compare with best ask
            market_price = current_market["best_ask"]
        else:
            # For sell orders, compare with best bid
            market_price = current_market["best_bid"]

        if market_price <= 0:
            return 0.0

        deviation = abs(order_price - market_price) / market_price
        return deviation

    def calculate_spread_change(self, symbol: str) -> float:
        """Calculate spread change over time"""

        # Get historical spread data
        historical_spreads = self.get_historical_spreads(symbol)

        if len(historical_spreads) < 2:
            return 0.0

        current_spread = historical_spreads[-1]
        avg_spread = sum(historical_spreads[:-1]) / len(historical_spreads[:-1])

        if avg_spread <= 0:
            return 0.0

        spread_change = (current_spread - avg_spread) / avg_spread
        return abs(spread_change)

    def calculate_volume_change(self, symbol: str) -> float:
        """Calculate volume change over time"""

        # Get recent volume data
        recent_volume = self.get_recent_volume(symbol)

        if len(recent_volume) < 2:
            return 1.0

        current_volume = recent_volume[-1]
        avg_volume = sum(recent_volume[:-1]) / len(recent_volume[:-1])

        if avg_volume <= 0:
            return 1.0

        volume_change = current_volume / avg_volume
        return volume_change

    def calculate_volatility_change(self, symbol: str) -> float:
        """Calculate volatility change over time"""

        # Get recent price data
        recent_prices = self.get_recent_prices(symbol)

        if len(recent_prices) < 20:
            return 1.0

        # Calculate current volatility
        current_volatility = self.calculate_volatility(recent_prices[-20:])

        # Calculate historical volatility
        historical_volatility = self.calculate_volatility(recent_prices[:-20])

        if historical_volatility <= 0:
            return 1.0

        volatility_change = current_volatility / historical_volatility
        return volatility_change

    def calculate_volatility(self, prices: List[float]) -> float:
        """Calculate price volatility"""

        if len(prices) < 2:
            return 0.0

        returns = []
        for i in range(1, len(prices)):
            if prices[i-1] > 0:
                returns.append((prices[i] - prices[i-1]) / prices[i-1])

        if not returns:
            return 0.0

        return np.std(returns)

    def get_historical_spreads(self, symbol: str) -> List[float]:
        """Get historical spread data"""

        # This would integrate with market data provider
        # For now, return placeholder data
        return [0.001, 0.0012, 0.0008, 0.0011, 0.001]

    def get_recent_volume(self, symbol: str) -> List[float]:
        """Get recent volume data"""

        # This would integrate with market data provider
        # For now, return placeholder data
        return [1000, 1200, 800, 1100, 1500]

    def get_recent_prices(self, symbol: str) -> List[float]:
        """Get recent price data"""

        # This would integrate with market data provider
        # For now, return placeholder data
        base_price = 100.0
        return [base_price + i * 0.1 for i in range(50)]

3. Cancel-Replace Decider

Makes intelligent decisions about order cancellation and replacement:

class CancelReplaceDecider:
    def __init__(self, strategy_router, risk_assessor, timing_optimizer):
        self.strategy_router = strategy_router
        self.risk_assessor = risk_assessor
        self.timing_optimizer = timing_optimizer
        self.decision_history = []

    def should_cancel_and_replace(self, order: Dict, market_state: Dict) -> Dict:
        """Decide whether to cancel and replace an order"""

        # Get strategy-specific decision logic
        strategy = order.get("strategy", "default")
        decision_strategy = self.strategy_router.get_strategy(strategy)

        # Assess risk
        risk_assessment = self.risk_assessor.assess_order_risk(order, market_state)

        # Check timing
        timing_check = self.timing_optimizer.check_timing(order)

        # Make decision
        decision = decision_strategy.make_decision(
            order, market_state, risk_assessment, timing_check
        )

        # Record decision
        self.record_decision(order, market_state, decision)

        return decision

    def record_decision(self, order: Dict, market_state: Dict, decision: Dict):
        """Record decision for analysis"""

        decision_record = {
            "order_id": order.get("order_id"),
            "symbol": order.get("symbol"),
            "timestamp": datetime.now().isoformat(),
            "market_state": market_state,
            "decision": decision
        }

        self.decision_history.append(decision_record)

        # Keep history manageable
        if len(self.decision_history) > 10000:
            self.decision_history = self.decision_history[-10000:]

    def get_decision_history(self, symbol: str = None, hours: int = 24) -> List[Dict]:
        """Get decision history"""

        cutoff_time = datetime.now() - timedelta(hours=hours)

        filtered_history = [
            record for record in self.decision_history
            if datetime.fromisoformat(record["timestamp"]) > cutoff_time
        ]

        if symbol:
            filtered_history = [
                record for record in filtered_history
                if record["symbol"] == symbol
            ]

        return filtered_history

4. Cancel Executor

Executes order cancellations:

class CancelExecutor:
    def __init__(self, order_manager, rate_limiter):
        self.order_manager = order_manager
        self.rate_limiter = rate_limiter
        self.cancel_history = []

    async def cancel_order(self, order_id: str, reason: str = "market_change") -> Dict:
        """Cancel an order"""

        # Check rate limits
        if not self.rate_limiter.can_cancel():
            return {
                "status": "rate_limited",
                "order_id": order_id,
                "reason": "rate_limit_exceeded"
            }

        try:
            # Execute cancellation
            cancel_result = await self.order_manager.cancel_order(order_id)

            # Record cancellation
            self.record_cancellation(order_id, reason, cancel_result)

            # Update rate limiter
            self.rate_limiter.record_cancel()

            return {
                "status": "success",
                "order_id": order_id,
                "cancel_result": cancel_result,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "status": "failed",
                "order_id": order_id,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def record_cancellation(self, order_id: str, reason: str, result: Dict):
        """Record cancellation for analysis"""

        cancel_record = {
            "order_id": order_id,
            "reason": reason,
            "result": result,
            "timestamp": datetime.now().isoformat()
        }

        self.cancel_history.append(cancel_record)

        # Keep history manageable
        if len(self.cancel_history) > 10000:
            self.cancel_history = self.cancel_history[-10000:]

    async def cancel_multiple_orders(self, order_ids: List[str], 
                                   reason: str = "batch_cancel") -> List[Dict]:
        """Cancel multiple orders"""

        results = []

        for order_id in order_ids:
            result = await self.cancel_order(order_id, reason)
            results.append(result)

            # Small delay between cancellations
            await asyncio.sleep(0.1)

        return results

5. Replace Executor

Executes order replacements:

class ReplaceExecutor:
    def __init__(self, order_manager, rate_limiter, price_calculator):
        self.order_manager = order_manager
        self.rate_limiter = rate_limiter
        self.price_calculator = price_calculator
        self.replace_history = []

    async def replace_order(self, original_order: Dict, new_price: float, 
                          reason: str = "price_update") -> Dict:
        """Replace an order with new price"""

        # Check rate limits
        if not self.rate_limiter.can_place():
            return {
                "status": "rate_limited",
                "order_id": original_order["order_id"],
                "reason": "rate_limit_exceeded"
            }

        try:
            # Calculate optimal new order parameters
            new_order_params = self.calculate_new_order_params(
                original_order, new_price
            )

            # Place new order
            new_order_result = await self.order_manager.place_order(new_order_params)

            # Record replacement
            self.record_replacement(original_order, new_order_params, new_order_result, reason)

            # Update rate limiter
            self.rate_limiter.record_place()

            return {
                "status": "success",
                "original_order_id": original_order["order_id"],
                "new_order_id": new_order_result.get("order_id"),
                "new_price": new_price,
                "reason": reason,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "status": "failed",
                "order_id": original_order["order_id"],
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def calculate_new_order_params(self, original_order: Dict, new_price: float) -> Dict:
        """Calculate parameters for new order"""

        # Get remaining quantity
        remaining_quantity = original_order.get("quantity", 0) - original_order.get("filled_quantity", 0)

        # Calculate new order parameters
        new_params = {
            "symbol": original_order["symbol"],
            "side": original_order["side"],
            "quantity": remaining_quantity,
            "price": new_price,
            "order_type": original_order.get("order_type", "limit"),
            "time_in_force": original_order.get("time_in_force", "GTC"),
            "strategy": original_order.get("strategy", "default"),
            "parent_order_id": original_order.get("order_id")
        }

        return new_params

    def record_replacement(self, original_order: Dict, new_params: Dict, 
                         result: Dict, reason: str):
        """Record replacement for analysis"""

        replace_record = {
            "original_order_id": original_order["order_id"],
            "new_order_id": result.get("order_id"),
            "original_price": original_order.get("price"),
            "new_price": new_params["price"],
            "reason": reason,
            "result": result,
            "timestamp": datetime.now().isoformat()
        }

        self.replace_history.append(replace_record)

        # Keep history manageable
        if len(self.replace_history) > 10000:
            self.replace_history = self.replace_history[-10000:]

API Design

Cancel-Replace API

@router.post("/cancel_replace/check")
async def check_orders_for_replacement():
    """Check all orders for potential replacement"""

    # Get all open orders
    open_orders = order_monitor.get_all_orders_status()

    decisions = []
    for order in open_orders:
        # Check market state
        market_state = market_detector.detect_market_shift(
            order["symbol"], order["price"], order["side"]
        )

        # Make decision
        decision = decider.should_cancel_and_replace(order, market_state)
        decisions.append(decision)

    return {"decisions": decisions, "total_orders": len(open_orders)}

@router.post("/cancel_replace/execute/{order_id}")
async def execute_cancel_replace(order_id: str, reason: str = "manual"):
    """Execute cancel and replace for specific order"""

    # Get order details
    order = order_monitor.get_order_status(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")

    # Check market state
    market_state = market_detector.detect_market_shift(
        order["symbol"], order["price"], order["side"]
    )

    # Make decision
    decision = decider.should_cancel_and_replace(order, market_state)

    if decision["should_replace"]:
        # Cancel original order
        cancel_result = await cancel_executor.cancel_order(order_id, reason)

        # Place new order
        new_price = decision["recommended_price"]
        replace_result = await replace_executor.replace_order(order, new_price, reason)

        return {
            "order_id": order_id,
            "decision": decision,
            "cancel_result": cancel_result,
            "replace_result": replace_result
        }
    else:
        return {
            "order_id": order_id,
            "decision": decision,
            "action": "no_action_needed"
        }

@router.get("/cancel_replace/status")
async def get_system_status():
    """Get system status and statistics"""

    return {
        "monitored_orders": len(order_monitor.get_all_orders_status()),
        "recent_decisions": len(decider.get_decision_history(hours=1)),
        "recent_cancellations": len(cancel_executor.cancel_history[-100:]),
        "recent_replacements": len(replace_executor.replace_history[-100:]),
        "rate_limits": {
            "cancel_remaining": rate_limiter.get_cancel_remaining(),
            "place_remaining": rate_limiter.get_place_remaining()
        }
    }

Frontend Integration

Cancel-Replace Dashboard

const CancelReplaceView: React.FC = () => {
  const [openOrders, setOpenOrders] = useState<Order[]>([]);
  const [decisions, setDecisions] = useState<Decision[]>([]);
  const [systemStatus, setSystemStatus] = useState<SystemStatus | null>(null);

  return (
    <div className="cancel-replace-dashboard">
      {/* System Status */}
      <SystemStatusPanel 
        status={systemStatus}
      />

      {/* Open Orders */}
      <OpenOrdersPanel 
        orders={openOrders}
        onCheckReplacement={handleCheckReplacement}
      />

      {/* Replacement Decisions */}
      <ReplacementDecisionsPanel 
        decisions={decisions}
      />

      {/* Order Health Heatmap */}
      <OrderHealthHeatmapPanel 
        orders={openOrders}
      />

      {/* Cancel/Replace History */}
      <CancelReplaceHistoryPanel 
        orders={openOrders}
      />

      {/* Performance Metrics */}
      <PerformanceMetricsPanel 
        orders={openOrders}
      />

      {/* Rate Limiting Status */}
      <RateLimitingPanel 
        status={systemStatus}
      />
    </div>
  );
};

Implementation Roadmap

Phase 1: Core Infrastructure (Weeks 1-2)

  • Set up order monitoring framework
  • Implement market change detection
  • Create basic cancel-replace logic

Phase 2: Advanced Decision Making (Weeks 3-4)

  • Develop strategy-specific decision algorithms
  • Implement risk assessment and timing optimization
  • Build rate limiting and protection mechanisms

Phase 3: Execution & Analytics (Weeks 5-6)

  • Create comprehensive execution engine
  • Implement performance tracking and analytics
  • Build historical analysis capabilities

Phase 4: Integration & Optimization (Weeks 7-8)

  • Integrate with existing order management system
  • Develop comprehensive dashboard
  • Performance optimization and testing

Business Value

Strategic Benefits

  1. Optimal Order Management: Maintain orders at optimal prices
  2. Reduced Slippage: Minimize execution costs through timely updates
  3. Improved Fill Rates: Higher probability of order execution
  4. Risk Management: Proactive management of stale or mispriced orders

Operational Benefits

  1. Automated Order Management: Systematic order monitoring and updates
  2. Real-Time Response: Immediate reaction to market changes
  3. Cost Efficiency: Reduced execution costs through better pricing
  4. Compliance: Rate limiting to prevent exchange violations

Technical Specifications

Performance Requirements

  • Order Monitoring: < 100ms for order status updates
  • Market Detection: < 50ms for market change detection
  • Decision Making: < 20ms for cancel-replace decisions
  • Order Execution: < 200ms for cancel and replace operations

Risk Management Requirements

  • Rate Limiting: Prevent excessive trading activity
  • Price Protection: Avoid adverse price movements
  • Execution Safety: Ensure reliable order management
  • Monitoring: Comprehensive oversight of all operations

This Smart Cancel-and-Replace System provides institutional-grade order management capabilities, enabling sophisticated order optimization and dynamic price adjustment, similar to the systems used by major market makers like Jump, DRW, and IMC.