70. Microservice Communication Patterns
Overview
In a fully microserviced quantitative trading system, services communicate through two primary patterns: synchronous RPC for calls that need an immediate response, and asynchronous event-driven messaging for real-time data streams and scalable workflows. Using each pattern where it fits keeps latency low on the critical path while letting data-heavy services scale independently.
Communication Architecture
1. Synchronous Service-to-Service RPC
Direct request-response communication between microservices, used when the caller needs the result before it can continue.
Common Solutions
| Protocol | Description | Use Cases |
|---|---|---|
| HTTP/REST | Standard web protocols, simple and widely supported | External APIs, dashboard interfaces |
| gRPC | High-performance RPC (HTTP/2 + Protobuf), low latency | Internal service calls, high-frequency operations |
| Thrift | Cross-language RPC framework, mature but less common | Legacy system integration |
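gRPC Implementation Example
# strategy-engine calling market-data-center
# The *_pb2 modules are generated by protoc; the strategy_evolution module names,
# stub name, and server address below are assumed for illustration.
import grpc
from market_data_pb2 import MarketDataRequest
from market_data_pb2_grpc import MarketDataServiceStub
from strategy_evolution_pb2 import StrategyGenealogyRequest
from strategy_evolution_pb2_grpc import StrategyEvolutionServiceStub

class StrategyEngine:
    def __init__(self):
        self.market_data_channel = grpc.insecure_channel('market-data-center:50051')
        self.market_data_stub = MarketDataServiceStub(self.market_data_channel)

    def get_latest_quotes(self, symbol):
        request = MarketDataRequest(symbol=symbol)
        # A deadline keeps a stalled server from blocking the caller (0.5 s is illustrative)
        response = self.market_data_stub.GetLatestQuotes(request, timeout=0.5)
        return response.quotes

# signal-aggregation-center calling strategy-evolution-center
class SignalAggregator:
    def __init__(self):
        self.evolution_channel = grpc.insecure_channel('strategy-evolution-center:50051')
        self.evolution_stub = StrategyEvolutionServiceStub(self.evolution_channel)

    def get_strategy_genealogy(self, strategy_id):
        request = StrategyGenealogyRequest(strategy_id=strategy_id)
        response = self.evolution_stub.GetGenealogy(request, timeout=0.5)
        return response.genealogy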
Suitable Scenarios
- Real-time responses required (strategy pulling latest market data)
- Service quality guarantees (order submission, account queries)
- Critical path operations (risk checks, position validation)
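Synchronous calls on these paths usually need an explicit time budget, since a caller that waits indefinitely turns one slow service into a stalled pipeline. The following is a minimal sketch, not part of the original design: `call_with_deadline` and `DeadlineExceeded` are hypothetical names, and the wrapped function is assumed to accept a `timeout` keyword, as gRPC Python stub methods do.

```python
import time


class DeadlineExceeded(Exception):
    """Raised when no attempt succeeded within the overall time budget."""


def call_with_deadline(fn, deadline_s, max_attempts=3, backoff_s=0.01,
                       retry_on=(ConnectionError, TimeoutError)):
    """Call fn(timeout=remaining) until it succeeds, attempts run out, or the budget is spent."""
    start = time.monotonic()
    last_error = None
    for attempt in range(max_attempts):
        remaining = deadline_s - (time.monotonic() - start)
        if remaining <= 0:
            break
        try:
            # Each attempt only gets what is left of the overall budget.
            return fn(timeout=remaining)
        except retry_on as exc:
            last_error = exc
            remaining = deadline_s - (time.monotonic() - start)
            # Exponential backoff, capped so the sleep never exceeds the remaining budget.
            time.sleep(max(0.0, min(backoff_s * (2 ** attempt), remaining)))
    raise DeadlineExceeded(f"no response within {deadline_s}s") from last_error
```

Retries of this kind suit idempotent reads such as quote or account queries. Order submission should only be retried if the receiving service can deduplicate requests.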
2. Asynchronous Event-Driven Communication
Event-based messaging for real-time data streams, scalable workflows, and loose coupling.
Message Broker Solutions
| Middleware | Description | Trading System Fit |
|---|---|---|
| NATS | Lightweight, low-latency Pub/Sub; core NATS delivers each message at most once | Primary choice for market data and event fan-out |
| Kafka | Large-scale stream processing, high throughput | Alternative for massive data streams |
| RabbitMQ | Traditional message broker, ACK mechanisms | Less suitable for high-frequency trading |
NATS Implementation Example
# market-data-center publishing real-time data
import json
from nats.aio.client import Client as NATS

class MarketDataPublisher:
    def __init__(self):
        self.nc = NATS()

    async def connect(self, url="nats://nats:4222"):
        # Connect before publishing; the default URL is an assumed address
        await self.nc.connect(url)

    async def publish_market_data(self, symbol, data):
        # data is a string (for example JSON); NATS payloads are bytes
        await self.nc.publish(f"market.data.{symbol}", data.encode())

    async def publish_tick_data(self, tick_data):
        await self.nc.publish("market.data.tick", tick_data.encode())

# signal-aggregation-center subscribing to market data
class SignalAggregator:
    def __init__(self, nc):
        self.nc = nc  # an already-connected NATS client

    async def subscribe_to_market_data(self):
        await self.nc.subscribe("market.data.tick", cb=self.process_tick_data)

    async def process_tick_data(self, msg):
        tick_data = json.loads(msg.data.decode())
        # Process tick data and update signals (update_signals is defined elsewhere)
        await self.update_signals(tick_data)

# backtest-engine requesting backtest execution
class BacktestEngine:
    def __init__(self, nc):
        self.nc = nc  # an already-connected NATS client

    async def request_backtest(self, strategy_config):
        request_data = {
            "strategy_id": strategy_config["id"],
            "parameters": strategy_config["params"],
            "timeframe": strategy_config["timeframe"],
        }
        await self.nc.publish("backtest.request", json.dumps(request_data).encode())

# ultra-fast-backtest-center consuming backtest requests
class BacktestExecutor:
    def __init__(self, nc):
        self.nc = nc  # an already-connected NATS client

    async def subscribe_to_backtest_requests(self):
        # The queue group (name assumed) delivers each request to only one executor instance
        await self.nc.subscribe("backtest.request", queue="backtest-executors", cb=self.execute_backtest)

    async def execute_backtest(self, msg):
        request = json.loads(msg.data.decode())
        result = await self.run_backtest(request)  # run_backtest is defined elsewhere
        await self.nc.publish("backtest.result", json.dumps(result).encode())
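Because requests and results travel on separate subjects, the requester needs some way to tell which result belongs to which request. One common approach is a correlation ID carried in both messages. The sketch below assumes a `request_id` field that is not part of the original payload, and `build_backtest_request` and `PendingBacktests` are hypothetical helpers. NATS also offers a built-in request-reply call (`nc.request` in the Python client) that may fit short-running jobs.

```python
import json
import uuid


def build_backtest_request(strategy_config):
    """Return (request_id, payload_bytes) ready to publish on backtest.request."""
    request_id = uuid.uuid4().hex
    payload = {
        "request_id": request_id,  # assumed field, added for correlation
        "strategy_id": strategy_config["id"],
        "parameters": strategy_config["params"],
        "timeframe": strategy_config["timeframe"],
    }
    return request_id, json.dumps(payload).encode()


class PendingBacktests:
    """Tracks in-flight requests so results can be matched when they arrive."""

    def __init__(self):
        self._pending = {}

    def track(self, request_id, strategy_id):
        self._pending[request_id] = strategy_id

    def resolve(self, result_bytes):
        """Return (strategy_id, result) for a known request, or None otherwise."""
        result = json.loads(result_bytes.decode())
        strategy_id = self._pending.pop(result.get("request_id"), None)
        if strategy_id is None:
            return None  # duplicate, late, or unrelated result
        return strategy_id, result
```

The executor would copy `request_id` from the request into the result it publishes. Popping the entry on first match also makes a redelivered result harmless.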
Suitable Scenarios
- Real-time data streams (market data, trade events, signal flows)
- Large-scale task distribution (backtest requests, risk scans)
- Loose coupling (independent service scaling and deployment)
- Elastic architecture (automatic horizontal scaling of consumers)
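Task distribution and elastic scaling rely on competing consumers: each task goes to exactly one worker, and adding workers adds capacity. In NATS this is done with queue groups (the `queue` argument of `subscribe` in the Python client). The sketch below only simulates the delivery semantics in-process with `asyncio.Queue`, so it runs without a broker; the names `worker` and `distribute` are hypothetical.

```python
import asyncio


async def worker(name, queue, results):
    """Pull requests one at a time; each request is seen by exactly one worker."""
    while True:
        request = await queue.get()
        try:
            results.append((name, request["strategy_id"]))
            await asyncio.sleep(0)  # yield so other workers get a turn
        finally:
            queue.task_done()


async def distribute(requests, n_workers=3):
    """Fan a batch of requests out to n_workers competing consumers."""
    queue = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(f"executor-{i}", queue, results))
        for i in range(n_workers)
    ]
    for request in requests:
        queue.put_nowait(request)
    await queue.join()  # wait until every request has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    batch = [{"strategy_id": i} for i in range(6)]
    for name, strategy_id in asyncio.run(distribute(batch)):
        print(name, strategy_id)
```

Without a queue group, every subscriber to a subject receives every message, which is what market data fan-out wants but not what a backtest worker pool wants.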
Communication Pattern Recommendations
Trading System Architecture Guidelines
| Communication Type | Use Case | Recommended Protocol | Example |
|---|---|---|---|
| Synchronous RPC | Immediate response required | gRPC | Account queries, order submission |
| Asynchronous Events | Real-time data streams, scalable workflows | NATS | Market data, trade events, backtest tasks |
| External APIs | Dashboard, third-party integration | HTTP/REST | Web interface, external system integration |
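Implementation Strategy
High-Frequency Internal Calls (Low Latency)
# Use gRPC for critical path operations
import grpc
from risk_pb2 import RiskCheckRequest  # generated module names are assumed
from risk_pb2_grpc import RiskServiceStub
from execution_pb2_grpc import ExecutionServiceStub

class OrderExecutionService:
    def __init__(self):
        # Awaitable stub calls require grpc.aio channels
        self.risk_channel = grpc.aio.insecure_channel('risk-management:50051')
        self.risk_stub = RiskServiceStub(self.risk_channel)
        # The execution-engine address is assumed
        self.execution_channel = grpc.aio.insecure_channel('execution-engine:50051')
        self.execution_stub = ExecutionServiceStub(self.execution_channel)

    async def submit_order(self, order):
        # The risk check completes before the order is forwarded
        risk_request = RiskCheckRequest(order=order)
        risk_response = await self.risk_stub.CheckRisk(risk_request)
        if not risk_response.approved:
            return None  # rejected: the order is never forwarded
        # Submit to execution engine
        return await self.execution_stub.SubmitOrder(order)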
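A risk check on the critical path also has to define what happens when the risk service itself fails. A conservative choice is to fail closed, treating any error as a rejection. The sketch below illustrates that policy with plain callables standing in for the gRPC stubs; `OrderResult` and `submit_with_risk_gate` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderResult:
    accepted: bool
    reason: str


def submit_with_risk_gate(order, check_risk, submit):
    """Forward the order only after an explicit approval; any failure means rejection."""
    try:
        approved = check_risk(order)
    except Exception as exc:  # timeout, connection error, malformed reply, ...
        return OrderResult(False, f"risk check unavailable: {exc}")
    if not approved:
        return OrderResult(False, "rejected by risk check")
    submit(order)
    return OrderResult(True, "submitted")
```

Returning a result object with a reason, rather than `None`, lets the caller distinguish a risk rejection from an outage and log or alert accordingly.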
Real-Time Data Streams
# Use NATS for market data and events
class MarketDataService:
    def __init__(self, nc):
        self.nc = nc  # an already-connected NATS client

    async def start_market_data_stream(self):
        # "*" matches exactly one subject token, e.g. market.data.AAPL
        await self.nc.subscribe("market.data.*", cb=self.process_market_data)
        await self.nc.subscribe("trade.events", cb=self.process_trade_events)
        await self.nc.subscribe("risk.alerts", cb=self.process_risk_alerts)
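Subject design matters for wildcard subscriptions like `market.data.*`. In NATS, subjects are dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens. The function below is a small stand-alone model of those rules for reasoning about subject layouts; `subject_matches` is a hypothetical helper, not part of the NATS client.

```python
def subject_matches(pattern, subject):
    """Return True if a NATS-style subscription pattern would receive the subject."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" must be the last pattern token and needs at least one token to match
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[i]:
            return False
    return len(pattern_tokens) == len(subject_tokens)


if __name__ == "__main__":
    for subject in ("market.data.AAPL", "market.data.tick", "market.data.BRK.B"):
        print(subject, subject_matches("market.data.*", subject))
```

Two consequences for the subjects used here: a `market.data.*` subscriber also receives `market.data.tick`, and a symbol containing a dot (such as `BRK.B`) adds a token and is missed by `*`, so symbols may need escaping or a `market.data.>` subscription.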
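External API Interface
# Use HTTP/REST for external access
from fastapi import FastAPI
from google.protobuf.json_format import MessageToDict

app = FastAPI()

@app.get("/api/v1/portfolio/positions")
async def get_positions():
    # Internal gRPC call to portfolio service.
    # portfolio_stub is a grpc.aio stub created at startup; PortfolioRequest is protoc-generated.
    portfolio_request = PortfolioRequest()
    portfolio_response = await portfolio_stub.GetPositions(portfolio_request)
    # Convert the protobuf message to plain dicts so it can be serialized as JSON
    return MessageToDict(portfolio_response).get("positions", [])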
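When a REST endpoint fronts an internal gRPC call, failures of that call need to be translated into HTTP status codes. The sketch below follows the commonly documented mapping for gRPC status codes and falls back to 500 for anything unrecognized; `GRPC_TO_HTTP` and `http_status_for` are hypothetical names, and the input is assumed to be a status code name such as `grpc.StatusCode.NOT_FOUND.name`.

```python
# Mapping from gRPC status code names to HTTP status codes.
GRPC_TO_HTTP = {
    "OK": 200,
    "INVALID_ARGUMENT": 400,
    "FAILED_PRECONDITION": 400,
    "OUT_OF_RANGE": 400,
    "UNAUTHENTICATED": 401,
    "PERMISSION_DENIED": 403,
    "NOT_FOUND": 404,
    "ALREADY_EXISTS": 409,
    "ABORTED": 409,
    "RESOURCE_EXHAUSTED": 429,
    "CANCELLED": 499,
    "INTERNAL": 500,
    "UNKNOWN": 500,
    "DATA_LOSS": 500,
    "UNIMPLEMENTED": 501,
    "UNAVAILABLE": 503,
    "DEADLINE_EXCEEDED": 504,
}


def http_status_for(grpc_code_name):
    """Translate a gRPC status code name into an HTTP status, defaulting to 500."""
    return GRPC_TO_HTTP.get(grpc_code_name.upper(), 500)
```

In a FastAPI handler, the result could be passed as the `status_code` of an `HTTPException` raised when the stub call fails.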
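System Integration Benefits
Performance Optimization
- gRPC: Low per-call latency for critical operations; whether a sub-millisecond target is met depends on network, payload size, and runtime, and should be verified by measurement
- NATS: High-throughput event streaming with minimal overhead
- HTTP/REST: Standard interface for external systems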
Scalability & Reliability
- Horizontal scaling: Independent service scaling based on load
- Fault isolation: Service failures stay contained rather than cascading, provided callers use timeouts and fallbacks
- Load distribution: Load balancing across service instances
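Fault isolation does not come for free with microservices: a caller that keeps waiting on a failing dependency can spread the failure. A circuit breaker is one common way to contain it, by skipping calls to a dependency that has failed repeatedly and probing again after a cool-down. The sketch below is a simplified illustration with hypothetical names and thresholds, not a production implementation.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_after_s=5.0, clock=time.monotonic):
        self._failure_threshold = failure_threshold
        self._reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self._reset_after_s:
                raise CircuitOpenError("dependency marked unhealthy; call skipped")
            # Cool-down elapsed: let one trial call through (half-open state).
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self._failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

A service would typically keep one breaker per downstream dependency and decide per call site what to do when the circuit is open, for example rejecting new orders while still serving cached market data.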
Development & Maintenance
- Technology diversity: Each service can use the technology stack best suited to its workload
- Independent deployment: Services can be updated without system-wide downtime
- Clear boundaries: Well-defined interfaces between services
Business & Technical Value
- Real-time Performance: Communication paths tuned for high-frequency trading
- System Resilience: Fault-tolerant architecture with service isolation
- Operational Flexibility: Independent scaling and deployment capabilities
- Future-Proof Architecture: Supports growth from startup to institutional scale
- Competitive Advantage: A microservice architecture in line with professional trading platforms
This dual communication approach gives the trading system low-latency synchronous calls where they are required and scalable event streaming everywhere else, while leaving room for the architecture to evolve with business needs.