53. Smart Cancel-and-Replace System
Overview
The Smart Cancel-and-Replace System monitors open orders in real time, detects when market movements have left an order stale or mispriced, and cancels and replaces that order at an updated price. Rate limiting caps the number of cancel and replace operations so that repricing does not generate excessive order traffic.
Core Capabilities
- Real-Time Order Monitoring: Continuous tracking of open order status and execution progress
- Market Change Detection: Automatic detection of significant market movements and price shifts
- Intelligent Decision Making: Smart algorithms to determine when orders should be cancelled and replaced
- Dynamic Price Adjustment: Automatic price updates based on current market conditions
- Rate Limiting: Protection against excessive cancel-replace operations
- Multi-Strategy Support: Different cancel-replace logic for various trading strategies
- Risk Management: Protection against adverse price movements and execution slippage
System Architecture
Microservice: cancel-replace-center
services/cancel-replace-center/
├── src/
│   ├── main.py
│   ├── monitor/
│   │   ├── order_status_monitor.py
│   │   ├── execution_tracker.py
│   │   └── order_health_checker.py
│   ├── detector/
│   │   ├── market_change_detector.py
│   │   ├── price_movement_analyzer.py
│   │   ├── volatility_monitor.py
│   │   └── spread_analyzer.py
│   ├── decider/
│   │   ├── cancel_replace_decider.py
│   │   ├── strategy_router.py
│   │   ├── risk_assessor.py
│   │   └── timing_optimizer.py
│   ├── executor/
│   │   ├── cancel_executor.py
│   │   ├── replace_executor.py
│   │   ├── order_router.py
│   │   └── rate_limiter.py
│   ├── strategies/
│   │   ├── time_based_strategy.py
│   │   ├── market_change_strategy.py
│   │   ├── partial_fill_strategy.py
│   │   └── adaptive_strategy.py
│   ├── analytics/
│   │   ├── performance_tracker.py
│   │   ├── cost_analyzer.py
│   │   └── effectiveness_metrics.py
│   ├── api/
│   │   └── cancel_replace_api.py
│   ├── config.py
│   └── requirements.txt
├── Dockerfile
└── tests/
Core Components
1. Order Status Monitor
Real-time monitoring of open orders:
import asyncio
from datetime import datetime
from typing import Dict, List


class OrderStatusMonitor:
    def __init__(self, order_manager, execution_tracker):
        self.order_manager = order_manager
        self.execution_tracker = execution_tracker
        self.order_cache = {}
        self.monitoring_active = True

    async def start_monitoring(self):
        """Start continuous order monitoring"""
        while self.monitoring_active:
            try:
                # Fetch all open orders
                open_orders = await self.fetch_open_orders()
                # Update order cache
                self.update_order_cache(open_orders)
                # Check order health
                health_issues = self.check_order_health(open_orders)
                # Trigger alerts for critical issues
                if health_issues:
                    await self.trigger_health_alerts(health_issues)
                # Wait before next check
                await asyncio.sleep(1)  # 1-second monitoring interval
            except Exception as e:
                print(f"Error in order monitoring: {e}")
                await asyncio.sleep(5)  # Longer wait on error

    async def fetch_open_orders(self) -> List[Dict]:
        """Fetch all open orders from all venues"""
        all_orders = []
        # Fetch from all connected exchanges
        exchanges = self.order_manager.get_connected_exchanges()
        for exchange in exchanges:
            try:
                exchange_orders = await self.order_manager.get_open_orders(exchange)
                all_orders.extend(exchange_orders)
            except Exception as e:
                print(f"Error fetching orders from {exchange}: {e}")
        return all_orders

    def update_order_cache(self, orders: List[Dict]):
        """Update internal order cache"""
        current_time = datetime.now()
        for order in orders:
            order_id = order["order_id"]
            if order_id in self.order_cache:
                # Update existing order
                cached_order = self.order_cache[order_id]
                cached_order.update(order)
                cached_order["last_updated"] = current_time
            else:
                # Add new order; keep a venue-supplied created_at if present
                order.setdefault("created_at", current_time)
                order["last_updated"] = current_time
                self.order_cache[order_id] = order

    def check_order_health(self, orders: List[Dict]) -> List[Dict]:
        """Check health of all orders"""
        health_issues = []
        current_time = datetime.now()
        for order in orders:
            order_id = order["order_id"]
            # Prefer the cached copy: it carries the created_at timestamp
            # recorded when the order was first seen, so age keeps growing
            # across polls even if the venue does not report a creation time
            cached_order = self.order_cache.get(order_id, order)
            order_time = cached_order.get("created_at", current_time)
            # Calculate order age
            if isinstance(order_time, str):
                order_time = datetime.fromisoformat(order_time)
            order_age = (current_time - order_time).total_seconds()
            # Check for health issues
            issues = []
            # Check if order is too old
            max_age = order.get("max_age_seconds", 300)  # 5 minutes default
            if order_age > max_age:
                issues.append({
                    "type": "order_too_old",
                    "severity": "medium",
                    "age_seconds": order_age,
                    "max_age_seconds": max_age
                })
            # Check if order is partially filled for too long
            if order.get("filled_quantity", 0) > 0:
                partial_fill_time = order.get("partial_fill_time", order_time)
                if isinstance(partial_fill_time, str):
                    partial_fill_time = datetime.fromisoformat(partial_fill_time)
                partial_fill_age = (current_time - partial_fill_time).total_seconds()
                if partial_fill_age > 60:  # 1 minute for partial fills
                    issues.append({
                        "type": "partial_fill_stale",
                        "severity": "high",
                        "partial_fill_age": partial_fill_age
                    })
            # Check if order size is too large for current market
            if self.is_order_size_excessive(order):
                issues.append({
                    "type": "excessive_size",
                    "severity": "medium",
                    "order_size": order.get("quantity", 0)
                })
            if issues:
                health_issues.append({
                    "order_id": order_id,
                    "symbol": order.get("symbol"),
                    "side": order.get("side"),
                    "issues": issues
                })
        return health_issues

    def is_order_size_excessive(self, order: Dict) -> bool:
        """Check if order size is excessive for current market conditions"""
        # This would integrate with market data to check against average trade size
        # For now, use a simple heuristic
        order_size = order.get("quantity", 0)
        symbol = order.get("symbol", "")
        # Get average trade size for symbol
        avg_trade_size = self.get_average_trade_size(symbol)
        # Order is excessive if > 10x average trade size
        return order_size > avg_trade_size * 10

    def get_average_trade_size(self, symbol: str) -> float:
        """Get average trade size for symbol"""
        # This would integrate with market data provider
        # For now, return default values
        default_sizes = {
            "BTC": 0.1,
            "ETH": 1.0,
            "SPY": 100,
            "QQQ": 50
        }
        return default_sizes.get(symbol, 100.0)

    async def trigger_health_alerts(self, health_issues: List[Dict]):
        """Trigger alerts for order health issues"""
        for issue in health_issues:
            # Log the issue
            print(f"Order health issue: {issue}")
            # Send to alert system
            await self.send_alert(issue)

    async def send_alert(self, issue: Dict):
        """Send alert for order health issue"""
        # This would integrate with alert system
        # For now, just log
        print(f"Alert: {issue}")

    def get_order_status(self, order_id: str) -> Dict:
        """Get detailed status of specific order"""
        if order_id in self.order_cache:
            return self.order_cache[order_id]
        return {}

    def get_all_orders_status(self) -> List[Dict]:
        """Get status of all monitored orders"""
        return list(self.order_cache.values())
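The fetch_open_orders method above queries exchanges one at a time, so its latency is the sum of the per-venue latencies. A minimal sketch of a concurrent variant using asyncio.gather is shown below. FakeOrderManager and the venue names are hypothetical stand-ins for illustration; only the two order-manager methods the monitor already calls are assumed.

```python
import asyncio
from typing import Dict, List, Optional


class FakeOrderManager:
    """Hypothetical stand-in exposing the two methods the monitor uses."""

    def __init__(self, books: Dict[str, Optional[List[Dict]]]):
        self._books = books

    def get_connected_exchanges(self) -> List[str]:
        return list(self._books)

    async def get_open_orders(self, exchange: str) -> List[Dict]:
        await asyncio.sleep(0.01)  # simulate network latency
        orders = self._books[exchange]
        if orders is None:
            raise ConnectionError(f"{exchange} unavailable")
        return orders


async def fetch_open_orders_concurrently(order_manager) -> List[Dict]:
    """Query every exchange at once and skip venues that raise."""
    exchanges = order_manager.get_connected_exchanges()
    results = await asyncio.gather(
        *(order_manager.get_open_orders(ex) for ex in exchanges),
        return_exceptions=True,  # one failing venue must not abort the rest
    )
    all_orders: List[Dict] = []
    for exchange, result in zip(exchanges, results):
        if isinstance(result, Exception):
            print(f"Error fetching orders from {exchange}: {result}")
            continue
        all_orders.extend(result)
    return all_orders


manager = FakeOrderManager({
    "venue_a": [{"order_id": "a1"}, {"order_id": "a2"}],
    "venue_b": None,  # simulates a failing venue
    "venue_c": [{"order_id": "c1"}],
})
orders = asyncio.run(fetch_open_orders_concurrently(manager))
```

Because asyncio.gather returns results in the order the awaitables were passed, the combined list keeps the same venue ordering as the sequential loop.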
2. Market Change Detector
Detects significant market movements:
from datetime import datetime
from typing import Dict, List

import numpy as np


class MarketChangeDetector:
    def __init__(self, market_data_provider, price_movement_analyzer):
        self.market_data_provider = market_data_provider
        self.price_movement_analyzer = price_movement_analyzer
        self.price_history = {}
        self.change_thresholds = {
            "price_change": 0.002,       # 0.2% price change
            "spread_change": 0.001,      # 0.1% spread change
            "volume_spike": 2.0,         # 2x volume increase
            "volatility_increase": 1.5   # 1.5x volatility increase
        }

    def detect_market_shift(self, symbol: str, order_price: float,
                            order_side: str) -> Dict:
        """Detect if market has shifted significantly"""
        # Get current market data
        current_market = self.get_current_market_data(symbol)
        if not current_market:
            return {"shift_detected": False, "reason": "no_market_data"}
        # Calculate price deviation
        price_deviation = self.calculate_price_deviation(
            order_price, current_market, order_side
        )
        # Check spread changes
        spread_change = self.calculate_spread_change(symbol)
        # Check volume changes
        volume_change = self.calculate_volume_change(symbol)
        # Check volatility changes
        volatility_change = self.calculate_volatility_change(symbol)
        # Determine if shift is significant
        shift_detected = (
            price_deviation > self.change_thresholds["price_change"] or
            spread_change > self.change_thresholds["spread_change"] or
            volume_change > self.change_thresholds["volume_spike"] or
            volatility_change > self.change_thresholds["volatility_increase"]
        )
        return {
            "shift_detected": shift_detected,
            "price_deviation": price_deviation,
            "spread_change": spread_change,
            "volume_change": volume_change,
            "volatility_change": volatility_change,
            "current_market": current_market,
            "thresholds": self.change_thresholds
        }

    def get_current_market_data(self, symbol: str) -> Dict:
        """Get current market data for symbol"""
        try:
            # Get order book
            order_book = self.market_data_provider.get_order_book(symbol)
            # Get recent trades (fetched but not yet used in the metrics below)
            recent_trades = self.market_data_provider.get_recent_trades(symbol, limit=100)
            # Calculate market metrics
            best_bid = order_book["bids"][0]["price"] if order_book["bids"] else 0
            best_ask = order_book["asks"][0]["price"] if order_book["asks"] else 0
            mid_price = (best_bid + best_ask) / 2 if best_bid and best_ask else 0
            # Relative spread, measured against the best bid
            spread = (best_ask - best_bid) / best_bid if best_bid else 0
            return {
                "symbol": symbol,
                "best_bid": best_bid,
                "best_ask": best_ask,
                "mid_price": mid_price,
                "spread": spread,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            print(f"Error getting market data for {symbol}: {e}")
            return {}

    def calculate_price_deviation(self, order_price: float, current_market: Dict,
                                  order_side: str) -> float:
        """Calculate price deviation from current market"""
        if not current_market:
            return 0.0
        if order_side == "buy":
            # For buy orders, compare with best ask
            market_price = current_market["best_ask"]
        else:
            # For sell orders, compare with best bid
            market_price = current_market["best_bid"]
        if market_price <= 0:
            return 0.0
        deviation = abs(order_price - market_price) / market_price
        return deviation

    def calculate_spread_change(self, symbol: str) -> float:
        """Calculate spread change over time"""
        # Get historical spread data
        historical_spreads = self.get_historical_spreads(symbol)
        if len(historical_spreads) < 2:
            return 0.0
        current_spread = historical_spreads[-1]
        avg_spread = sum(historical_spreads[:-1]) / len(historical_spreads[:-1])
        if avg_spread <= 0:
            return 0.0
        spread_change = (current_spread - avg_spread) / avg_spread
        return abs(spread_change)

    def calculate_volume_change(self, symbol: str) -> float:
        """Calculate volume change over time"""
        # Get recent volume data
        recent_volume = self.get_recent_volume(symbol)
        if len(recent_volume) < 2:
            return 1.0
        current_volume = recent_volume[-1]
        avg_volume = sum(recent_volume[:-1]) / len(recent_volume[:-1])
        if avg_volume <= 0:
            return 1.0
        volume_change = current_volume / avg_volume
        return volume_change

    def calculate_volatility_change(self, symbol: str) -> float:
        """Calculate volatility change over time"""
        # Get recent price data
        recent_prices = self.get_recent_prices(symbol)
        if len(recent_prices) < 20:
            return 1.0
        # Calculate current volatility
        current_volatility = self.calculate_volatility(recent_prices[-20:])
        # Calculate historical volatility
        historical_volatility = self.calculate_volatility(recent_prices[:-20])
        if historical_volatility <= 0:
            return 1.0
        volatility_change = current_volatility / historical_volatility
        return volatility_change

    def calculate_volatility(self, prices: List[float]) -> float:
        """Calculate price volatility as the standard deviation of simple returns"""
        if len(prices) < 2:
            return 0.0
        returns = []
        for i in range(1, len(prices)):
            if prices[i-1] > 0:
                returns.append((prices[i] - prices[i-1]) / prices[i-1])
        if not returns:
            return 0.0
        return float(np.std(returns))

    def get_historical_spreads(self, symbol: str) -> List[float]:
        """Get historical spread data"""
        # This would integrate with market data provider
        # For now, return placeholder data
        return [0.001, 0.0012, 0.0008, 0.0011, 0.001]

    def get_recent_volume(self, symbol: str) -> List[float]:
        """Get recent volume data"""
        # This would integrate with market data provider
        # For now, return placeholder data
        return [1000, 1200, 800, 1100, 1500]

    def get_recent_prices(self, symbol: str) -> List[float]:
        """Get recent price data"""
        # This would integrate with market data provider
        # For now, return placeholder data
        base_price = 100.0
        return [base_price + i * 0.1 for i in range(50)]
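As a worked illustration of the price-deviation check, the sketch below restates calculate_price_deviation as a standalone function and applies the 0.2% threshold from change_thresholds. The quotes and order prices are made-up numbers chosen for the example.

```python
PRICE_CHANGE_THRESHOLD = 0.002  # 0.2%, as in change_thresholds["price_change"]


def price_deviation(order_price: float, best_bid: float,
                    best_ask: float, side: str) -> float:
    """Relative distance between the order price and the opposite touch."""
    market_price = best_ask if side == "buy" else best_bid
    if market_price <= 0:
        return 0.0
    return abs(order_price - market_price) / market_price


# Illustrative quote: bid 100.00 / ask 100.10
near = price_deviation(100.00, 100.00, 100.10, "buy")  # 0.10 / 100.10
far = price_deviation(99.50, 100.00, 100.10, "buy")    # 0.60 / 100.10

near_triggers = near > PRICE_CHANGE_THRESHOLD
far_triggers = far > PRICE_CHANGE_THRESHOLD
```

With these numbers the bid resting at 100.00 deviates by roughly 0.10% and stays below the threshold, while the bid at 99.50 deviates by roughly 0.60% and would count as a price shift.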
3. Cancel-Replace Decider
Makes intelligent decisions about order cancellation and replacement:
from datetime import datetime, timedelta
from typing import Dict, List, Optional


class CancelReplaceDecider:
    def __init__(self, strategy_router, risk_assessor, timing_optimizer):
        self.strategy_router = strategy_router
        self.risk_assessor = risk_assessor
        self.timing_optimizer = timing_optimizer
        self.decision_history = []

    def should_cancel_and_replace(self, order: Dict, market_state: Dict) -> Dict:
        """Decide whether to cancel and replace an order"""
        # Get strategy-specific decision logic
        strategy = order.get("strategy", "default")
        decision_strategy = self.strategy_router.get_strategy(strategy)
        # Assess risk
        risk_assessment = self.risk_assessor.assess_order_risk(order, market_state)
        # Check timing
        timing_check = self.timing_optimizer.check_timing(order)
        # Make decision
        decision = decision_strategy.make_decision(
            order, market_state, risk_assessment, timing_check
        )
        # Record decision
        self.record_decision(order, market_state, decision)
        return decision

    def record_decision(self, order: Dict, market_state: Dict, decision: Dict):
        """Record decision for analysis"""
        decision_record = {
            "order_id": order.get("order_id"),
            "symbol": order.get("symbol"),
            "timestamp": datetime.now().isoformat(),
            "market_state": market_state,
            "decision": decision
        }
        self.decision_history.append(decision_record)
        # Keep history manageable
        if len(self.decision_history) > 10000:
            self.decision_history = self.decision_history[-10000:]

    def get_decision_history(self, symbol: Optional[str] = None,
                             hours: int = 24) -> List[Dict]:
        """Get decision history"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        filtered_history = [
            record for record in self.decision_history
            if datetime.fromisoformat(record["timestamp"]) > cutoff_time
        ]
        if symbol:
            filtered_history = [
                record for record in filtered_history
                if record["symbol"] == symbol
            ]
        return filtered_history
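The decider relies on two collaborators that this section does not define: a router with get_strategy and a strategy with make_decision, whose result the API reads through the should_replace and recommended_price keys. A minimal sketch of that contract follows. The class bodies, the passive re-pegging rule, and the "acceptable" and "allowed" keys on the risk and timing inputs are assumptions made for illustration, not the project's actual logic.

```python
from typing import Dict


class MarketChangeStrategy:
    """Hypothetical strategy: replace when the detector reports a shift."""

    def make_decision(self, order: Dict, market_state: Dict,
                      risk_assessment: Dict, timing_check: Dict) -> Dict:
        market = market_state.get("current_market", {})
        # Assumed repricing rule: bids join the best bid, offers the best ask
        if order["side"] == "buy":
            new_price = market.get("best_bid")
        else:
            new_price = market.get("best_ask")
        should_replace = bool(
            market_state.get("shift_detected")
            and new_price
            and new_price != order["price"]
            and risk_assessment.get("acceptable", True)  # assumed key
            and timing_check.get("allowed", True)        # assumed key
        )
        return {
            "should_replace": should_replace,
            "recommended_price": new_price if should_replace else order["price"],
            "reason": "market_shift" if should_replace else "no_action",
        }


class StrategyRouter:
    """Hypothetical router mapping strategy names to strategy objects."""

    def __init__(self, strategies: Dict[str, object], default: object):
        self._strategies = strategies
        self._default = default

    def get_strategy(self, name: str):
        return self._strategies.get(name, self._default)


strategy_router = StrategyRouter(
    {"market_change": MarketChangeStrategy()}, default=MarketChangeStrategy()
)
order = {"order_id": "o1", "symbol": "SPY", "side": "buy", "price": 99.50}
market_state = {
    "shift_detected": True,
    "current_market": {"best_bid": 100.00, "best_ask": 100.10},
}
decision = strategy_router.get_strategy(order.get("strategy", "default")).make_decision(
    order, market_state,
    risk_assessment={"acceptable": True},
    timing_check={"allowed": True},
)
```

Keeping the return shape identical across strategies lets the decider and the API stay unaware of which strategy produced a decision.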
4. Cancel Executor
Executes order cancellations:
import asyncio
from datetime import datetime
from typing import Dict, List


class CancelExecutor:
    def __init__(self, order_manager, rate_limiter):
        self.order_manager = order_manager
        self.rate_limiter = rate_limiter
        self.cancel_history = []

    async def cancel_order(self, order_id: str, reason: str = "market_change") -> Dict:
        """Cancel an order"""
        # Check rate limits
        if not self.rate_limiter.can_cancel():
            return {
                "status": "rate_limited",
                "order_id": order_id,
                "reason": "rate_limit_exceeded"
            }
        try:
            # Execute cancellation
            cancel_result = await self.order_manager.cancel_order(order_id)
            # Record cancellation
            self.record_cancellation(order_id, reason, cancel_result)
            # Update rate limiter
            self.rate_limiter.record_cancel()
            return {
                "status": "success",
                "order_id": order_id,
                "cancel_result": cancel_result,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "status": "failed",
                "order_id": order_id,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def record_cancellation(self, order_id: str, reason: str, result: Dict):
        """Record cancellation for analysis"""
        cancel_record = {
            "order_id": order_id,
            "reason": reason,
            "result": result,
            "timestamp": datetime.now().isoformat()
        }
        self.cancel_history.append(cancel_record)
        # Keep history manageable
        if len(self.cancel_history) > 10000:
            self.cancel_history = self.cancel_history[-10000:]

    async def cancel_multiple_orders(self, order_ids: List[str],
                                     reason: str = "batch_cancel") -> List[Dict]:
        """Cancel multiple orders"""
        results = []
        for order_id in order_ids:
            result = await self.cancel_order(order_id, reason)
            results.append(result)
            # Small delay between cancellations
            await asyncio.sleep(0.1)
        return results
5. Replace Executor
Executes order replacements:
from datetime import datetime
from typing import Dict


class ReplaceExecutor:
    def __init__(self, order_manager, rate_limiter, price_calculator):
        self.order_manager = order_manager
        self.rate_limiter = rate_limiter
        self.price_calculator = price_calculator
        self.replace_history = []

    async def replace_order(self, original_order: Dict, new_price: float,
                            reason: str = "price_update") -> Dict:
        """Place the replacement for an already-cancelled order at a new price"""
        # Check rate limits
        if not self.rate_limiter.can_place():
            return {
                "status": "rate_limited",
                "order_id": original_order["order_id"],
                "reason": "rate_limit_exceeded"
            }
        try:
            # Calculate optimal new order parameters
            new_order_params = self.calculate_new_order_params(
                original_order, new_price
            )
            # Nothing left to work if the original order was fully filled
            if new_order_params["quantity"] <= 0:
                return {
                    "status": "skipped",
                    "order_id": original_order["order_id"],
                    "reason": "no_remaining_quantity"
                }
            # Place new order
            new_order_result = await self.order_manager.place_order(new_order_params)
            # Record replacement
            self.record_replacement(original_order, new_order_params, new_order_result, reason)
            # Update rate limiter
            self.rate_limiter.record_place()
            return {
                "status": "success",
                "original_order_id": original_order["order_id"],
                "new_order_id": new_order_result.get("order_id"),
                "new_price": new_price,
                "reason": reason,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "status": "failed",
                "order_id": original_order["order_id"],
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def calculate_new_order_params(self, original_order: Dict, new_price: float) -> Dict:
        """Calculate parameters for new order"""
        # Get remaining quantity
        remaining_quantity = original_order.get("quantity", 0) - original_order.get("filled_quantity", 0)
        # Calculate new order parameters
        new_params = {
            "symbol": original_order["symbol"],
            "side": original_order["side"],
            "quantity": remaining_quantity,
            "price": new_price,
            "order_type": original_order.get("order_type", "limit"),
            "time_in_force": original_order.get("time_in_force", "GTC"),
            "strategy": original_order.get("strategy", "default"),
            "parent_order_id": original_order.get("order_id")
        }
        return new_params

    def record_replacement(self, original_order: Dict, new_params: Dict,
                           result: Dict, reason: str):
        """Record replacement for analysis"""
        replace_record = {
            "original_order_id": original_order["order_id"],
            "new_order_id": result.get("order_id"),
            "original_price": original_order.get("price"),
            "new_price": new_params["price"],
            "reason": reason,
            "result": result,
            "timestamp": datetime.now().isoformat()
        }
        self.replace_history.append(replace_record)
        # Keep history manageable
        if len(self.replace_history) > 10000:
            self.replace_history = self.replace_history[-10000:]
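Both executors depend on a rate limiter, and the status endpoint reads its remaining capacity, but the limiter itself is not shown. One possible shape is a sliding-window counter, sketched below. The method names match the calls made elsewhere in this section; the default limits, the one-second window, and the injectable clock are assumptions for illustration.

```python
import time
from collections import deque
from typing import Callable, Deque


class RateLimiter:
    """Sliding-window limiter (sketch); limits and window are assumed values."""

    def __init__(self, max_cancels: int = 50, max_places: int = 50,
                 window_seconds: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_cancels = max_cancels
        self.max_places = max_places
        self.window_seconds = window_seconds
        self._clock = clock
        self._cancels: Deque[float] = deque()
        self._places: Deque[float] = deque()

    def _prune(self, events: Deque[float]) -> None:
        # Drop timestamps that have aged out of the window
        cutoff = self._clock() - self.window_seconds
        while events and events[0] <= cutoff:
            events.popleft()

    def get_cancel_remaining(self) -> int:
        self._prune(self._cancels)
        return self.max_cancels - len(self._cancels)

    def get_place_remaining(self) -> int:
        self._prune(self._places)
        return self.max_places - len(self._places)

    def can_cancel(self) -> bool:
        return self.get_cancel_remaining() > 0

    def can_place(self) -> bool:
        return self.get_place_remaining() > 0

    def record_cancel(self) -> None:
        self._cancels.append(self._clock())

    def record_place(self) -> None:
        self._places.append(self._clock())


# Usage with a fake clock so the behaviour is deterministic
now = [0.0]
limiter = RateLimiter(max_cancels=2, max_places=1, window_seconds=1.0,
                      clock=lambda: now[0])
limiter.record_cancel()
limiter.record_cancel()
blocked = limiter.can_cancel()        # both cancel slots are used
now[0] = 1.5                          # advance past the window
allowed_again = limiter.can_cancel()  # old timestamps have expired
```

A sliding window avoids the burst at the boundary that a fixed per-second counter allows. Real venues publish their own message limits, so the numbers here would need to be set per exchange.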
API Design
Cancel-Replace API
from fastapi import APIRouter, HTTPException

router = APIRouter()

# order_monitor, market_detector, decider, cancel_executor, replace_executor
# and rate_limiter are assumed to be module-level instances of the components
# described above, created at service start-up.


@router.post("/cancel_replace/check")
async def check_orders_for_replacement():
    """Check all orders for potential replacement"""
    # Get all open orders
    open_orders = order_monitor.get_all_orders_status()
    decisions = []
    for order in open_orders:
        # Check market state
        market_state = market_detector.detect_market_shift(
            order["symbol"], order["price"], order["side"]
        )
        # Make decision
        decision = decider.should_cancel_and_replace(order, market_state)
        decisions.append(decision)
    return {"decisions": decisions, "total_orders": len(open_orders)}


@router.post("/cancel_replace/execute/{order_id}")
async def execute_cancel_replace(order_id: str, reason: str = "manual"):
    """Execute cancel and replace for specific order"""
    # Get order details
    order = order_monitor.get_order_status(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    # Check market state
    market_state = market_detector.detect_market_shift(
        order["symbol"], order["price"], order["side"]
    )
    # Make decision
    decision = decider.should_cancel_and_replace(order, market_state)
    if decision.get("should_replace"):
        # Cancel original order
        cancel_result = await cancel_executor.cancel_order(order_id, reason)
        if cancel_result["status"] != "success":
            # Do not place a replacement while the original may still be live,
            # otherwise both orders could fill
            return {
                "order_id": order_id,
                "decision": decision,
                "cancel_result": cancel_result,
                "action": "cancel_not_completed"
            }
        # Place new order
        new_price = decision["recommended_price"]
        replace_result = await replace_executor.replace_order(order, new_price, reason)
        return {
            "order_id": order_id,
            "decision": decision,
            "cancel_result": cancel_result,
            "replace_result": replace_result
        }
    else:
        return {
            "order_id": order_id,
            "decision": decision,
            "action": "no_action_needed"
        }


@router.get("/cancel_replace/status")
async def get_system_status():
    """Get system status and statistics"""
    return {
        "monitored_orders": len(order_monitor.get_all_orders_status()),
        "recent_decisions": len(decider.get_decision_history(hours=1)),
        "recent_cancellations": len(cancel_executor.cancel_history[-100:]),
        "recent_replacements": len(replace_executor.replace_history[-100:]),
        "rate_limits": {
            "cancel_remaining": rate_limiter.get_cancel_remaining(),
            "place_remaining": rate_limiter.get_place_remaining()
        }
    }
Frontend Integration
Cancel-Replace Dashboard
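import React, { useState } from "react";

// Order, Decision, SystemStatus and the panel components are assumed to be
// defined elsewhere in the frontend codebase.
const CancelReplaceView: React.FC = () => {
  const [openOrders, setOpenOrders] = useState<Order[]>([]);
  const [decisions, setDecisions] = useState<Decision[]>([]);
  const [systemStatus, setSystemStatus] = useState<SystemStatus | null>(null);

  // Ask the backend to re-evaluate all open orders; the path matches the
  // check endpoint above and may need a prefix depending on deployment
  const handleCheckReplacement = async () => {
    const response = await fetch("/cancel_replace/check", { method: "POST" });
    const data = await response.json();
    setDecisions(data.decisions);
  };

  return (
    <div className="cancel-replace-dashboard">
      {/* System Status */}
      <SystemStatusPanel
        status={systemStatus}
      />
      {/* Open Orders */}
      <OpenOrdersPanel
        orders={openOrders}
        onCheckReplacement={handleCheckReplacement}
      />
      {/* Replacement Decisions */}
      <ReplacementDecisionsPanel
        decisions={decisions}
      />
      {/* Order Health Heatmap */}
      <OrderHealthHeatmapPanel
        orders={openOrders}
      />
      {/* Cancel/Replace History */}
      <CancelReplaceHistoryPanel
        orders={openOrders}
      />
      {/* Performance Metrics */}
      <PerformanceMetricsPanel
        orders={openOrders}
      />
      {/* Rate Limiting Status */}
      <RateLimitingPanel
        status={systemStatus}
      />
    </div>
  );
};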
Implementation Roadmap
Phase 1: Core Infrastructure (Weeks 1-2)
- Set up order monitoring framework
- Implement market change detection
- Create basic cancel-replace logic
Phase 2: Advanced Decision Making (Weeks 3-4)
- Develop strategy-specific decision algorithms
- Implement risk assessment and timing optimization
- Build rate limiting and protection mechanisms
Phase 3: Execution & Analytics (Weeks 5-6)
- Create comprehensive execution engine
- Implement performance tracking and analytics
- Build historical analysis capabilities
Phase 4: Integration & Optimization (Weeks 7-8)
- Integrate with existing order management system
- Develop comprehensive dashboard
- Performance optimization and testing
Business Value
Strategic Benefits
- Optimal Order Management: Maintain orders at optimal prices
- Reduced Slippage: Minimize execution costs through timely updates
- Improved Fill Rates: Higher probability of order execution
- Risk Management: Proactive management of stale or mispriced orders
Operational Benefits
- Automated Order Management: Systematic order monitoring and updates
- Real-Time Response: Immediate reaction to market changes
- Cost Efficiency: Reduced execution costs through better pricing
- Compliance: Rate limiting to prevent exchange violations
Technical Specifications
Performance Requirements
- Order Monitoring: < 100ms for order status updates
- Market Detection: < 50ms for market change detection
- Decision Making: < 20ms for cancel-replace decisions
- Order Execution: < 200ms for cancel and replace operations
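The latency targets above can be checked in tests or in production with a small timing helper. The sketch below assumes the budgets are per-call wall-clock limits; the LATENCY_BUDGETS_MS keys, the timed helper, and the sleep calls standing in for real work are hypothetical.

```python
import time
from contextlib import contextmanager
from typing import Dict, List

# Budgets in milliseconds, taken from the performance requirements list
LATENCY_BUDGETS_MS: Dict[str, float] = {
    "order_monitoring": 100.0,
    "market_detection": 50.0,
    "decision_making": 20.0,
    "order_execution": 200.0,
}

violations: List[Dict] = []


@contextmanager
def timed(stage: str):
    """Record a violation if the wrapped block exceeds its stage budget."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        if elapsed_ms > LATENCY_BUDGETS_MS[stage]:
            violations.append({"stage": stage, "elapsed_ms": elapsed_ms})


with timed("decision_making"):
    time.sleep(0.03)   # stand-in for slow work: 30 ms against a 20 ms budget

with timed("order_execution"):
    time.sleep(0.001)  # stand-in for fast work: well within 200 ms
```

In a running service the violations list would more likely feed a metrics or alerting system than sit in memory.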
Risk Management Requirements
- Rate Limiting: Prevent excessive trading activity
- Price Protection: Avoid adverse price movements
- Execution Safety: Ensure reliable order management
- Monitoring: Comprehensive oversight of all operations
The Smart Cancel-and-Replace System is intended to provide institutional-grade order management: continuous order monitoring, rate-limited repricing, and strategy-specific replacement logic, in the style of the tooling associated with market makers such as Jump, DRW, and IMC.