44. Smart Market Making Engine
Overview
The Smart Market Making Engine quotes both sides of the book across multiple trading pairs, adjusting bid/ask spreads and order sizes in response to market volatility, order book depth, and its own inventory. Real-time risk checks, inventory control, and pressure-aware pricing let it provide liquidity while bounding exposure and protecting profitability.
Core Capabilities
- Multi-Pair Market Making: Simultaneous market making across multiple trading pairs
- Dynamic Spread Adjustment: Real-time spread adjustment based on market conditions
- Inventory Management: Automated inventory balancing to maintain neutral positions
- Smart Pricing Engine: Intelligent price adjustment based on market pressure
- Risk Management: Comprehensive risk controls for position limits and loss thresholds
- Real-Time Monitoring: Live monitoring of market making performance and status
System Architecture
Microservice: market-making-engine
services/market-making-engine/
├── src/
│ ├── main.py
│ ├── maker/
│ │ ├── quote_generator.py
│ │ └── order_manager.py
│ ├── monitor/
│ │ ├── volatility_monitor.py
│ │ ├── market_depth_monitor.py
│ │ └── trade_flow_monitor.py
│ ├── inventory/
│ │ ├── inventory_manager.py
│ │ └── position_balancer.py
│ ├── adjuster/
│ │ ├── smart_pricer.py
│ │ └── spread_adjuster.py
│ ├── risk/
│ │ ├── market_making_risk_manager.py
│ │ └── exposure_controller.py
│ ├── api/
│ │ └── market_maker_api.py
│ ├── config.py
│ └── requirements.txt
├── Dockerfile
└── tests/
Core Components
1. Quote Generator
Generates bid/ask quotes with dynamic pricing:
from datetime import datetime
from typing import Dict

import numpy as np  # not used by QuoteGenerator itself; VolatilityMonitor relies on it


class QuoteGenerator:
    def __init__(self, base_spread=0.002, volatility_multiplier=2.0,
                 inventory_sensitivity=0.0005):
        self.base_spread = base_spread
        self.volatility_multiplier = volatility_multiplier
        self.inventory_sensitivity = inventory_sensitivity

    def generate_quotes(self, symbol: str, mid_price: float,
                        volatility: float, inventory_bias: float,
                        market_depth: Dict) -> Dict:
        """Generate bid/ask quotes based on market conditions"""
        # Widen the spread as volatility rises
        dynamic_spread = self.base_spread + volatility * self.volatility_multiplier

        # Shift both quotes against the current position: a negative bias
        # (long inventory) lowers bid and ask together. Note that
        # InventoryManager.compute_inventory_bias already applies a 0.0005
        # factor, so the combined shift here is very small.
        inventory_adjustment = inventory_bias * self.inventory_sensitivity

        # Calculate bid and ask prices
        bid_price = mid_price * (1 - dynamic_spread / 2 + inventory_adjustment)
        ask_price = mid_price * (1 + dynamic_spread / 2 + inventory_adjustment)

        # Calculate order sizes based on market depth
        bid_size = self.calculate_order_size(market_depth, 'bid', mid_price)
        ask_size = self.calculate_order_size(market_depth, 'ask', mid_price)

        return {
            "symbol": symbol,
            "bid": {"price": bid_price, "size": bid_size},
            "ask": {"price": ask_price, "size": ask_size},
            "spread": dynamic_spread,
            "timestamp": datetime.now().isoformat()
        }

    def calculate_order_size(self, market_depth: Dict, side: str,
                             mid_price: float) -> float:
        """Calculate order size based on market depth"""
        # Fall back to a default size when there is no depth for this side
        orders = market_depth.get(side)
        if not orders:
            return 1.0

        # Average the resting sizes within 1% of the mid price
        total_size = 0.0
        count = 0
        price_range = 0.01  # 1% price range
        for order in orders:
            if side == 'bid':
                in_range = order['price'] >= mid_price * (1 - price_range)
            else:  # ask
                in_range = order['price'] <= mid_price * (1 + price_range)
            if in_range:
                total_size += order['size']
                count += 1

        if count > 0:
            avg_size = total_size / count
            # Quote half the average size, capped to prevent excessive exposure
            return min(avg_size * 0.5, 10.0)
        return 1.0
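To make the pricing formula concrete, the sketch below reproduces the bid/ask arithmetic from generate_quotes as a standalone function. The name quote_prices and the sample inputs are illustrative assumptions, not part of the service.

```python
def quote_prices(mid_price, volatility, inventory_adjustment=0.0,
                 base_spread=0.002, volatility_multiplier=2.0):
    """Return (bid, ask, spread) using the same formula as generate_quotes.

    Hypothetical helper for illustration only.
    """
    spread = base_spread + volatility * volatility_multiplier
    bid = mid_price * (1 - spread / 2 + inventory_adjustment)
    ask = mid_price * (1 + spread / 2 + inventory_adjustment)
    return bid, ask, spread


# spread = 0.002 + 0.01 * 2.0 = 0.022, so quotes sit 1.1% either side of mid
bid, ask, spread = quote_prices(mid_price=100.0, volatility=0.01)
print(round(bid, 4), round(ask, 4), round(spread, 4))  # 98.9 101.1 0.022

# A negative adjustment (long inventory) moves both quotes down by the same amount
long_bid, long_ask, _ = quote_prices(100.0, 0.01, inventory_adjustment=-0.001)
```

The spread width depends only on volatility; the inventory term shifts the midpoint of the two quotes without changing the distance between them.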
2. Volatility Monitor
Monitors real-time market volatility:
from datetime import datetime

import numpy as np


class VolatilityMonitor:
    def __init__(self, window_size=100, update_frequency=1):
        self.window_size = window_size
        self.update_frequency = update_frequency
        self.price_history = {}
        self.volatility_cache = {}

    def update_price(self, symbol: str, price: float):
        """Update price history for volatility calculation"""
        if symbol not in self.price_history:
            self.price_history[symbol] = []
        self.price_history[symbol].append({
            'price': price,
            'timestamp': datetime.now()
        })
        # Keep only recent prices
        if len(self.price_history[symbol]) > self.window_size:
            self.price_history[symbol] = self.price_history[symbol][-self.window_size:]

    def compute_volatility(self, symbol: str) -> float:
        """Compute current per-observation volatility for a symbol"""
        if symbol not in self.price_history:
            return 0.01  # Default volatility

        prices = [p['price'] for p in self.price_history[symbol]]
        if len(prices) < 2:
            return 0.01

        # Calculate log returns
        returns = np.diff(np.log(prices))

        if len(returns) >= 20:
            # Exponentially weighted volatility: the newest return gets weight 1
            # and each older one is scaled by `decay`. The original weights,
            # np.exp(np.arange(n)), grow by a factor of e per step, so only the
            # last two or three returns mattered. The 0.94 decay is an assumed
            # value, not taken from the service configuration.
            decay = 0.94
            weights = decay ** np.arange(len(returns) - 1, -1, -1)
            volatility = float(np.sqrt(np.average(returns ** 2, weights=weights)))
        else:
            volatility = float(np.std(returns))

        # Cache result
        self.volatility_cache[symbol] = volatility
        return volatility

    def get_volatility_regime(self, symbol: str) -> str:
        """Classify volatility regime"""
        volatility = self.compute_volatility(symbol)
        if volatility < 0.01:
            return "low"
        elif volatility < 0.03:
            return "medium"
        else:
            return "high"
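Exponential weighting is easier to follow without NumPy. The following is a minimal standard-library sketch of a decay-weighted volatility estimate; the function name ewma_volatility and the 0.94 decay factor are assumptions chosen for illustration, not values from the service.

```python
import math


def ewma_volatility(prices, decay=0.94):
    """Exponentially weighted volatility of log returns (per observation).

    Hypothetical helper; `decay` is an assumed smoothing factor in (0, 1].
    """
    if len(prices) < 2:
        raise ValueError("need at least two prices")
    returns = [math.log(b / a) for a, b in zip(prices, prices[1:])]
    n = len(returns)
    # Newest return gets weight 1, the one before it `decay`, then decay**2, ...
    weights = [decay ** (n - 1 - i) for i in range(n)]
    weighted_sq = sum(w * r * r for w, r in zip(weights, returns))
    return math.sqrt(weighted_sq / sum(weights))


# The same 10% jump counts for more when it is the most recent observation
recent_shock = ewma_volatility([100.0] * 10 + [110.0])
old_shock = ewma_volatility([100.0, 110.0] + [110.0] * 9)
```

With decay=1.0 all returns are weighted equally and the result reduces to the root mean square of the returns.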
3. Inventory Manager
Manages inventory positions and provides bias signals:
from datetime import datetime
from typing import Dict


class InventoryManager:
    def __init__(self, target_inventory=0.0, max_inventory=100.0,
                 rebalance_threshold=10.0):
        self.target_inventory = target_inventory
        self.max_inventory = max_inventory
        self.rebalance_threshold = rebalance_threshold
        self.current_inventory = {}
        self.inventory_history = {}

    def update_inventory(self, symbol: str, trade_side: str, size: float):
        """Update inventory after trade execution"""
        if symbol not in self.current_inventory:
            self.current_inventory[symbol] = 0.0

        # Update inventory (positive for long, negative for short)
        if trade_side == 'buy':
            self.current_inventory[symbol] += size
        else:  # sell
            self.current_inventory[symbol] -= size

        # Record inventory change
        self.record_inventory_change(symbol)

    def compute_inventory_bias(self, symbol: str) -> float:
        """Compute inventory bias for quote adjustment"""
        if symbol not in self.current_inventory:
            return 0.0
        current_pos = self.current_inventory[symbol]

        # Calculate bias based on deviation from target
        deviation = current_pos - self.target_inventory

        # Normalize by max inventory
        if self.max_inventory > 0:
            normalized_deviation = deviation / self.max_inventory
        else:
            normalized_deviation = 0.0

        # Lean against the position: long inventory gives a negative bias
        bias = -normalized_deviation * 0.0005  # Small adjustment factor
        return bias

    def should_rebalance(self, symbol: str) -> bool:
        """Check if inventory rebalancing is needed"""
        if symbol not in self.current_inventory:
            return False
        deviation = abs(self.current_inventory[symbol] - self.target_inventory)
        return deviation > self.rebalance_threshold

    def get_rebalance_signal(self, symbol: str) -> Dict:
        """Generate rebalancing signal"""
        if not self.should_rebalance(symbol):
            return {"action": "none", "size": 0.0}

        current_pos = self.current_inventory[symbol]
        target_pos = self.target_inventory
        rebalance_size = target_pos - current_pos
        return {
            "action": "buy" if rebalance_size > 0 else "sell",
            "size": abs(rebalance_size),
            "urgency": "high" if abs(rebalance_size) > self.max_inventory * 0.5 else "medium"
        }

    def record_inventory_change(self, symbol: str):
        """Record inventory change for monitoring"""
        if symbol not in self.inventory_history:
            self.inventory_history[symbol] = []
        self.inventory_history[symbol].append({
            'timestamp': datetime.now(),
            'inventory': self.current_inventory.get(symbol, 0.0)
        })
        # Keep only recent history
        if len(self.inventory_history[symbol]) > 1000:
            self.inventory_history[symbol] = self.inventory_history[symbol][-1000:]
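A short worked example shows how the bias and the rebalance signal respond to a position. The sketch below restates the two calculations as standalone functions; the function names are hypothetical and the defaults mirror the constructor defaults above.

```python
def inventory_bias(position, target=0.0, max_inventory=100.0, factor=0.0005):
    """Signed quote skew that leans against the position (illustrative helper)."""
    if max_inventory <= 0:
        return 0.0
    return -((position - target) / max_inventory) * factor


def rebalance_signal(position, target=0.0, max_inventory=100.0, threshold=10.0):
    """Rebalance action once the deviation exceeds the threshold (illustrative helper)."""
    if abs(position - target) <= threshold:
        return {"action": "none", "size": 0.0}
    size = target - position
    return {
        "action": "buy" if size > 0 else "sell",
        "size": abs(size),
        "urgency": "high" if abs(size) > max_inventory * 0.5 else "medium",
    }


# Long 40 units: quotes skew down slightly and a medium-urgency sell is suggested
bias_long = inventory_bias(40.0)
signal_long = rebalance_signal(40.0)

# Short 60 units: more than half of max_inventory away from target, so urgency is high
signal_short = rebalance_signal(-60.0)
```

The bias is continuous and always active, while the rebalance signal only fires beyond the threshold, so small deviations are corrected through quote skew alone.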
4. Smart Pricer
Intelligent price adjustment based on market pressure:
import copy
from typing import Dict


class SmartPricer:
    def __init__(self, pressure_threshold=0.1, adjustment_speed=0.001,
                 emergency_multiplier=3.0):
        self.pressure_threshold = pressure_threshold
        self.adjustment_speed = adjustment_speed
        # Matches pricing.emergency_multiplier in the configuration file
        self.emergency_multiplier = emergency_multiplier
        self.market_pressure = {}

    def assess_market_pressure(self, symbol: str, order_book: Dict) -> str:
        """Assess current market pressure"""
        if 'bids' not in order_book or 'asks' not in order_book:
            return "normal"

        # Calculate order book imbalance over the top five levels
        bid_volume = sum(order['size'] for order in order_book['bids'][:5])
        ask_volume = sum(order['size'] for order in order_book['asks'][:5])
        if bid_volume == 0 or ask_volume == 0:
            return "normal"

        imbalance = (bid_volume - ask_volume) / (bid_volume + ask_volume)
        if abs(imbalance) > self.pressure_threshold:
            if imbalance > 0:
                return "bid_pressure"  # More bids than asks
            else:
                return "ask_pressure"  # More asks than bids
        return "normal"

    def adjust_quotes(self, quotes: Dict, market_pressure: str) -> Dict:
        """Adjust quotes based on market pressure"""
        # Deep copy: dict.copy() is shallow, so editing the nested bid/ask
        # dicts would also change the caller's quotes
        adjusted_quotes = copy.deepcopy(quotes)

        if market_pressure == "bid_pressure":
            # Increase bid price to capture more flow
            adjustment = self.adjustment_speed
            adjusted_quotes['bid']['price'] *= (1 + adjustment)
            adjusted_quotes['ask']['price'] *= (1 + adjustment * 0.5)
        elif market_pressure == "ask_pressure":
            # Decrease ask price to capture more flow
            adjustment = self.adjustment_speed
            adjusted_quotes['bid']['price'] *= (1 - adjustment * 0.5)
            adjusted_quotes['ask']['price'] *= (1 - adjustment)
        return adjusted_quotes

    def emergency_adjustment(self, quotes: Dict, trade_flow: Dict) -> Dict:
        """Emergency price adjustment based on aggressive trading"""
        adjusted_quotes = copy.deepcopy(quotes)

        # Check for aggressive buying/selling
        recent_buys = trade_flow.get('recent_buys', 0)
        recent_sells = trade_flow.get('recent_sells', 0)
        adjustment = self.adjustment_speed * self.emergency_multiplier

        if recent_buys > recent_sells * 2:  # Heavy buying
            # Move prices up quickly
            adjusted_quotes['bid']['price'] *= (1 + adjustment)
            adjusted_quotes['ask']['price'] *= (1 + adjustment)
        elif recent_sells > recent_buys * 2:  # Heavy selling
            # Move prices down quickly
            adjusted_quotes['bid']['price'] *= (1 - adjustment)
            adjusted_quotes['ask']['price'] *= (1 - adjustment)
        return adjusted_quotes
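The pressure classification rests on a single ratio, the normalized order book imbalance. The sketch below isolates that calculation; book_imbalance and classify_pressure are hypothetical names, and the sample book is made up for illustration.

```python
def book_imbalance(bids, asks, levels=5):
    """(bid volume - ask volume) / total volume over the top `levels` levels.

    Returns 0.0 when either side is empty, mirroring the "normal" fallback.
    """
    bid_volume = sum(order["size"] for order in bids[:levels])
    ask_volume = sum(order["size"] for order in asks[:levels])
    if bid_volume == 0 or ask_volume == 0:
        return 0.0
    return (bid_volume - ask_volume) / (bid_volume + ask_volume)


def classify_pressure(imbalance, threshold=0.1):
    """Map an imbalance in [-1, 1] to a pressure label."""
    if imbalance > threshold:
        return "bid_pressure"
    if imbalance < -threshold:
        return "ask_pressure"
    return "normal"


# Illustrative book: 10 units bid against 5 units offered
bids = [{"price": 99.9, "size": 6.0}, {"price": 99.8, "size": 4.0}]
asks = [{"price": 100.1, "size": 3.0}, {"price": 100.2, "size": 2.0}]
imbalance = book_imbalance(bids, asks)   # (10 - 5) / 15, about 0.33
label = classify_pressure(imbalance)     # "bid_pressure"
```

Because the ratio is bounded between -1 and 1, the 0.1 threshold means one side must hold at least 55% of the top-of-book volume before quotes are skewed.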
5. Market Making Risk Manager
Comprehensive risk management for market making:
from datetime import datetime
from typing import Dict, List


class MarketMakingRiskManager:
    def __init__(self, max_position=100.0, max_loss=-500.0,
                 max_drawdown=-0.1, max_exposure=0.2):
        self.max_position = max_position
        self.max_loss = max_loss
        self.max_drawdown = max_drawdown
        self.max_exposure = max_exposure
        self.current_pnl = {}
        self.position_history = {}
        self.risk_status = {}

    def check_position_limits(self, symbol: str, position: float) -> bool:
        """Check if position is within limits"""
        return abs(position) <= self.max_position

    def check_loss_limits(self, symbol: str, pnl: float) -> bool:
        """Check if PnL is within loss limits (max_loss is a negative floor)"""
        return pnl >= self.max_loss

    def check_drawdown_limits(self, symbol: str, current_pnl: float,
                              peak_pnl: float) -> bool:
        """Check if drawdown is within limits"""
        if peak_pnl <= 0:
            return True  # Relative drawdown is undefined without a positive peak
        drawdown = (current_pnl - peak_pnl) / peak_pnl
        return drawdown >= self.max_drawdown

    def check_exposure_limits(self, symbol: str, position: float,
                              total_capital: float) -> bool:
        """Check if exposure is within limits"""
        if total_capital <= 0:
            return False
        exposure = abs(position) / total_capital
        return exposure <= self.max_exposure

    def comprehensive_risk_check(self, symbol: str, position: float,
                                 pnl: float, total_capital: float) -> Dict:
        """Perform comprehensive risk check"""
        # The drawdown check is not part of this result because no peak PnL
        # is passed in; call check_drawdown_limits separately if needed.
        risk_status = {
            "symbol": symbol,
            "timestamp": datetime.now().isoformat(),
            "position_ok": self.check_position_limits(symbol, position),
            "loss_ok": self.check_loss_limits(symbol, pnl),
            "exposure_ok": self.check_exposure_limits(symbol, position, total_capital),
        }
        # Overall risk status
        risk_status["overall_ok"] = (risk_status["position_ok"] and
                                     risk_status["loss_ok"] and
                                     risk_status["exposure_ok"])
        # Store risk status
        self.risk_status[symbol] = risk_status
        return risk_status

    def get_risk_alerts(self) -> List[Dict]:
        """Get current risk alerts"""
        alerts = []
        for symbol, status in self.risk_status.items():
            if status["overall_ok"]:
                continue
            issues = []
            if not status["position_ok"]:
                issues.append("Position limit exceeded")
            if not status["loss_ok"]:
                issues.append("Loss limit exceeded")
            if not status["exposure_ok"]:
                issues.append("Exposure limit exceeded")
            alerts.append({
                "symbol": symbol,
                "timestamp": status["timestamp"],
                "issues": issues
            })
        return alerts

    def should_stop_market_making(self, symbol: str) -> bool:
        """Determine if market making should be stopped"""
        if symbol not in self.risk_status:
            return False
        return not self.risk_status[symbol]["overall_ok"]
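The drawdown check needs a peak PnL, which none of the classes above track. One possible approach, sketched here under that assumption, is a small per-symbol tracker that updates the peak on every observation and applies the same relative-drawdown formula. PeakTracker is a hypothetical name, not an existing component.

```python
class PeakTracker:
    """Tracks the running PnL peak per symbol and applies a drawdown limit.

    Illustrative sketch; max_drawdown is negative, e.g. -0.1 for a 10% limit.
    """

    def __init__(self, max_drawdown=-0.1):
        self.max_drawdown = max_drawdown
        self.peak_pnl = {}

    def drawdown_ok(self, symbol, pnl):
        # Raise the stored peak whenever PnL makes a new high
        peak = max(self.peak_pnl.get(symbol, pnl), pnl)
        self.peak_pnl[symbol] = peak
        if peak <= 0:
            return True  # relative drawdown is undefined without a positive peak
        return (pnl - peak) / peak >= self.max_drawdown


tracker = PeakTracker()
results = [tracker.drawdown_ok("BTC-USD", pnl) for pnl in (100.0, 95.0, 85.0, 120.0, 110.0)]
# 95 is 5% below the 100 peak (ok); 85 is 15% below (breach); 110 is about 8.3% below 120 (ok)
```

A result like this could be stored alongside position_ok, loss_ok, and exposure_ok if drawdown should also gate quoting.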
6. Order Manager
Manages order lifecycle and execution:
from datetime import datetime
from typing import Dict


class OrderManager:
    def __init__(self, exchange_client):
        # exchange_client must provide async place_order() and cancel_order()
        self.exchange_client = exchange_client
        self.active_orders = {}
        self.order_history = {}

    async def place_quotes(self, quotes: Dict) -> Dict:
        """Place bid/ask quotes"""
        symbol = quotes["symbol"]
        bid_quote = quotes["bid"]
        ask_quote = quotes["ask"]
        bid_order = None
        try:
            # Cancel existing orders for this symbol
            await self.cancel_existing_orders(symbol)

            # Place new orders
            bid_order = await self.exchange_client.place_order(
                symbol=symbol,
                side="buy",
                price=bid_quote["price"],
                size=bid_quote["size"],
                order_type="limit"
            )
            ask_order = await self.exchange_client.place_order(
                symbol=symbol,
                side="sell",
                price=ask_quote["price"],
                size=ask_quote["size"],
                order_type="limit"
            )

            # Store active orders
            self.active_orders[symbol] = {
                "bid_order": bid_order,
                "ask_order": ask_order,
                "timestamp": datetime.now()
            }
            return {
                "status": "success",
                "bid_order_id": bid_order.get("order_id"),
                "ask_order_id": ask_order.get("order_id")
            }
        except Exception as e:
            # If the bid went live but the ask failed, pull the bid so the
            # engine is not left with an untracked one-sided quote
            if bid_order is not None:
                try:
                    await self.exchange_client.cancel_order(
                        symbol=symbol, order_id=bid_order["order_id"])
                except Exception as cancel_error:
                    print(f"Error rolling back bid for {symbol}: {cancel_error}")
            return {
                "status": "error",
                "error": str(e)
            }

    async def cancel_existing_orders(self, symbol: str):
        """Cancel existing orders for a symbol"""
        if symbol not in self.active_orders:
            return
        orders = self.active_orders[symbol]
        # Cancel each side independently so one failure does not skip the other
        for key in ("bid_order", "ask_order"):
            if key not in orders:
                continue
            try:
                await self.exchange_client.cancel_order(
                    symbol=symbol,
                    order_id=orders[key]["order_id"]
                )
            except Exception as e:
                # Log error but continue
                print(f"Error canceling {key} for {symbol}: {e}")
        # Remove from active orders
        del self.active_orders[symbol]

    def handle_trade_execution(self, trade: Dict):
        """Handle trade execution callback"""
        symbol = trade["symbol"]

        # Record trade
        if symbol not in self.order_history:
            self.order_history[symbol] = []
        self.order_history[symbol].append({
            "timestamp": datetime.now(),
            "side": trade["side"],
            "size": trade["size"],
            "price": trade["price"],
            "order_id": trade.get("order_id")
        })
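The order manager depends on an exchange client that is not shown. For local testing, an in-memory stand-in with the same two coroutines is usually enough. The sketch below is one such stub; StubExchangeClient, its order ID format, and the sample symbol are assumptions for illustration and do not correspond to any real exchange API.

```python
import asyncio
import itertools


class StubExchangeClient:
    """In-memory stand-in for an exchange client (hypothetical, for tests only)."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.open_orders = {}

    async def place_order(self, symbol, side, price, size, order_type="limit"):
        if price <= 0 or size <= 0:
            raise ValueError("price and size must be positive")
        order = {
            "order_id": f"stub-{next(self._ids)}",
            "symbol": symbol,
            "side": side,
            "price": price,
            "size": size,
            "order_type": order_type,
        }
        self.open_orders[order["order_id"]] = order
        return order

    async def cancel_order(self, symbol, order_id):
        if order_id not in self.open_orders:
            raise KeyError(f"unknown order {order_id}")
        del self.open_orders[order_id]


async def demo():
    # Place a two-sided quote, then cancel the bid
    client = StubExchangeClient()
    bid = await client.place_order(symbol="BTC-USD", side="buy", price=98.9, size=1.0)
    ask = await client.place_order(symbol="BTC-USD", side="sell", price=101.1, size=1.0)
    await client.cancel_order(symbol="BTC-USD", order_id=bid["order_id"])
    return client, bid, ask


client, bid, ask = asyncio.run(demo())
```

Because the stub accepts the same keyword arguments that place_quotes and cancel_existing_orders pass, it can be handed to OrderManager directly in a unit test.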
API Design
Market Making API
from typing import Dict, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

# `market_maker` and `risk_manager` are assumed to be service instances
# created at application startup; they are not defined in this snippet.
router = APIRouter()


class MarketMakingRequest(BaseModel):
    symbol: str
    max_position: float = 100.0
    max_loss: float = -500.0
    base_spread: float = 0.002


class MarketMakingStatus(BaseModel):
    symbol: str
    status: str
    current_quotes: Optional[Dict] = None
    inventory: float = 0.0
    pnl: float = 0.0
    risk_status: Optional[Dict] = None


@router.post("/maker/start", response_model=Dict)
async def start_market_making(request: MarketMakingRequest):
    """Start market making for a symbol"""
    try:
        # Initialize market making for symbol
        await market_maker.start_making(
            symbol=request.symbol,
            max_position=request.max_position,
            max_loss=request.max_loss,
            base_spread=request.base_spread
        )
        return {
            "status": "started",
            "symbol": request.symbol,
            "message": f"Market making started for {request.symbol}"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/maker/stop")
async def stop_market_making(symbol: str):
    """Stop market making for a symbol (symbol is a query parameter)"""
    try:
        await market_maker.stop_making(symbol)
        return {
            "status": "stopped",
            "symbol": symbol,
            "message": f"Market making stopped for {symbol}"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/maker/status/{symbol}")
async def get_market_making_status(symbol: str):
    """Get market making status for a symbol"""
    status = await market_maker.get_status(symbol)
    if not status:
        raise HTTPException(status_code=404, detail="Symbol not found")
    return status


@router.get("/maker/active")
async def get_active_market_making():
    """Get all active market making symbols"""
    active_symbols = await market_maker.get_active_symbols()
    return {
        "active_symbols": active_symbols,
        "total_count": len(active_symbols)
    }


@router.get("/maker/performance/{symbol}")
async def get_market_making_performance(symbol: str):
    """Get market making performance metrics"""
    performance = await market_maker.get_performance(symbol)
    if not performance:
        raise HTTPException(status_code=404, detail="Performance data not found")
    return performance


@router.get("/maker/risk/alerts")
async def get_risk_alerts():
    """Get current risk alerts"""
    alerts = risk_manager.get_risk_alerts()
    return {
        "alerts": alerts,
        "alert_count": len(alerts)
    }
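The request model accepts any float for its limits, yet the risk manager treats max_loss as a negative PnL floor (a check fails when pnl < max_loss), so a positive value would halt quoting immediately. The sketch below shows one way to reject such inputs before they reach the engine. It uses a standard-library dataclass rather than pydantic so it runs without dependencies; ValidatedMakerParams and its specific bounds are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ValidatedMakerParams:
    """Start-request parameters with basic sanity checks (illustrative sketch)."""
    symbol: str
    max_position: float = 100.0
    max_loss: float = -500.0
    base_spread: float = 0.002

    def __post_init__(self):
        if not self.symbol.strip():
            raise ValueError("symbol must not be empty")
        if self.max_position <= 0:
            raise ValueError("max_position must be positive")
        if self.max_loss >= 0:
            raise ValueError("max_loss must be negative (it is a PnL floor)")
        if not 0 < self.base_spread < 1:
            raise ValueError("base_spread must be a fraction between 0 and 1")


params = ValidatedMakerParams(symbol="BTC-USD", base_spread=0.003)
```

The same checks could be expressed as validators on the pydantic model so that invalid requests are rejected with a 422 response instead of a 500.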
Frontend Integration
Market Making Dashboard
// MarketMakingView.tsx
import React, { useState } from 'react';
// `api` (an HTTP client) and the *Panel components are assumed to be defined
// elsewhere in the frontend. Field names here are camelCase, while the Python
// API returns snake_case keys (e.g. risk_status), so responses need mapping.

interface MarketMakingStatus {
  symbol: string;
  status: 'active' | 'stopped' | 'error';
  currentQuotes?: {
    bid: { price: number; size: number };
    ask: { price: number; size: number };
  };
  inventory: number;
  pnl: number;
  riskStatus?: {
    positionOk: boolean;
    lossOk: boolean;
    exposureOk: boolean;
    overallOk: boolean;
  };
}

interface MarketMakingPerformance {
  symbol: string;
  totalPnL: number;
  totalTrades: number;
  avgSpread: number;
  fillRate: number;
  inventoryTurnover: number;
}

const MarketMakingView: React.FC = () => {
  const [activeSymbols, setActiveSymbols] = useState<MarketMakingStatus[]>([]);
  const [performance, setPerformance] = useState<MarketMakingPerformance[]>([]);
  const [riskAlerts, setRiskAlerts] = useState<any[]>([]);

  const startMarketMaking = async (symbol: string) => {
    await api.post('/maker/start', { symbol });
    // Refresh status
  };

  const stopMarketMaking = async (symbol: string) => {
    // The stop endpoint takes the symbol as a query parameter
    await api.post('/maker/stop', null, { params: { symbol } });
    // Refresh status
  };

  return (
    <div className="market-making-dashboard">
      {/* Market Making Status */}
      <MarketMakingStatusPanel
        symbols={activeSymbols}
        onStart={startMarketMaking}
        onStop={stopMarketMaking}
      />
      {/* Quote Display */}
      <QuoteDisplayPanel symbols={activeSymbols} />
      {/* Inventory Management */}
      <InventoryManagementPanel symbols={activeSymbols} />
      {/* Performance Metrics */}
      <PerformanceMetricsPanel performance={performance} />
      {/* Risk Alerts */}
      <RiskAlertsPanel alerts={riskAlerts} />
    </div>
  );
};
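The dashboard interfaces use camelCase fields (riskStatus, positionOk), while the Python services produce snake_case keys (risk_status, position_ok). One option, sketched below, is to convert keys on the backend before returning a response. The helper names are hypothetical, and the conversion would equally work on the frontend.

```python
def to_camel(name):
    """Convert a snake_case key to camelCase, e.g. risk_status -> riskStatus."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def camelize(value):
    """Recursively convert dict keys to camelCase; other values pass through."""
    if isinstance(value, dict):
        return {to_camel(key): camelize(item) for key, item in value.items()}
    if isinstance(value, list):
        return [camelize(item) for item in value]
    return value


status = {
    "symbol": "BTC-USD",
    "current_quotes": {"bid": {"price": 98.9, "size": 1.0}},
    "risk_status": {"position_ok": True, "overall_ok": True},
}
payload = camelize(status)
```

Acronym-style names are not handled specially: total_pnl becomes totalPnl, not the totalPnL used in MarketMakingPerformance, so that field would need an explicit alias.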
Key Visualizations
- Quote Display: Real-time bid/ask quotes with spread visualization
- Inventory Chart: Position tracking over time with rebalancing signals
- PnL Curve: Cumulative profit/loss with drawdown indicators
- Risk Status: Color-coded risk indicators (green=normal, red=alert)
- Market Depth: Order book visualization with market making orders
Implementation Roadmap
Phase 1: Core Infrastructure (Weeks 1-2)
- Set up quote generation and order management
- Implement basic inventory tracking
- Create risk management framework
Phase 2: Smart Pricing (Weeks 3-4)
- Develop volatility monitoring
- Implement smart pricing algorithms
- Build market pressure detection
Phase 3: Risk Management (Weeks 5-6)
- Create comprehensive risk controls
- Implement emergency procedures
- Build performance monitoring
Phase 4: Frontend & Integration (Weeks 7-8)
- Develop market making dashboard
- Integrate with exchange APIs
- Performance optimization and testing
System Integration
Integration Points
- Exchange APIs: Order placement and market data
- Risk Management: Position and exposure monitoring
- Portfolio Management: Inventory and PnL tracking
- Market Data: Real-time price and order book feeds
Configuration
# market-making-config.yaml
market_making:
  base_spread: 0.002
  volatility_multiplier: 2.0
  inventory_sensitivity: 0.0005
  update_frequency: 1
  risk_limits:
    max_position: 100.0
    max_loss: -500.0
    max_drawdown: -0.1
    max_exposure: 0.2
  inventory:
    target_inventory: 0.0
    max_inventory: 100.0
    rebalance_threshold: 10.0
  pricing:
    pressure_threshold: 0.1
    adjustment_speed: 0.001
    emergency_multiplier: 3.0
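Each configuration section lines up with the constructor arguments of one component. The sketch below shows one way to turn parsed sections into typed objects and catch misspelled keys early. It starts from a plain dict, as a YAML parser would produce, so it has no dependencies; the dataclass names and the build helper are assumptions for illustration.

```python
from dataclasses import dataclass, fields


@dataclass
class RiskLimits:
    max_position: float = 100.0
    max_loss: float = -500.0
    max_drawdown: float = -0.1
    max_exposure: float = 0.2


@dataclass
class PricingConfig:
    pressure_threshold: float = 0.1
    adjustment_speed: float = 0.001
    emergency_multiplier: float = 3.0


def build(cls, section):
    """Instantiate a config dataclass, rejecting keys it does not define."""
    allowed = {field.name for field in fields(cls)}
    unknown = set(section) - allowed
    if unknown:
        raise KeyError(f"unknown keys for {cls.__name__}: {sorted(unknown)}")
    return cls(**section)


# Stand-in for the parsed YAML document
config = {
    "risk_limits": {"max_position": 50.0, "max_loss": -250.0},
    "pricing": {"adjustment_speed": 0.002},
}
risk_limits = build(RiskLimits, config["risk_limits"])
pricing = build(PricingConfig, config["pricing"])
```

Keys omitted from a section fall back to the dataclass defaults, which match the values listed in the configuration file.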
Business Value
Strategic Benefits
- Liquidity Provision: Provide market liquidity and earn spread revenue
- Risk Management: Automated position management and risk controls
- Market Making: Two-sided quoting with dynamic spreads, inventory control, and risk limits in a single service
- Revenue Generation: Revenue from capturing the bid-ask spread on filled quotes
Operational Benefits
- Automated Trading: 24/7 market making without manual intervention
- Risk Control: Built-in risk limits and exposure management
- Performance Monitoring: Real-time performance and risk tracking
- Scalable Architecture: Support for multiple symbols and markets
Technical Specifications
Performance Requirements
- Quote Update Speed: < 100ms for quote updates
- Order Execution: < 50ms for order placement
- Risk Monitoring: Real-time risk assessment
- Multi-Symbol Support: 100+ simultaneous symbols
Security & Compliance
- Order Validation: Comprehensive order validation and risk checks
- Access Control: Role-based permissions for market making operations
- Audit Trail: Complete logging of all market making activities
- Regulatory Compliance: Adherence to market making regulations
In summary, the Smart Market Making Engine combines automated two-sided quoting, inventory-aware and pressure-aware pricing, and configurable risk limits to provide liquidity across multiple symbols.