70. Microservice Communication Patterns

Overview

In a trading system built entirely from microservices, services communicate through two primary patterns: synchronous RPC calls when the caller needs an immediate response, and asynchronous event-driven messaging for real-time data streams and scalable workflows. Using both lets each interaction be matched to the pattern that suits its latency, reliability, and scaling requirements.

Communication Architecture

1. Synchronous Service-to-Service RPC

Direct request-response communication between microservices, used when the caller must have the result (or an explicit error) before it can proceed.

Common Solutions

| Protocol | Description | Use Cases |
|---|---|---|
| HTTP/REST | Standard web protocols, simple and widely supported | External APIs, dashboard interfaces |
| gRPC | High-performance RPC (HTTP/2 + Protobuf), low latency | Internal service calls, high-frequency operations |
| Thrift | Cross-language RPC framework, mature but less common | Legacy system integration |

gRPC Implementation Example

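# strategy-engine calling market-data-center
import grpc
from market_data_pb2 import MarketDataRequest
from market_data_pb2_grpc import MarketDataServiceStub
# Assumed names for the generated strategy-evolution-center modules and stub
from strategy_evolution_pb2 import StrategyGenealogyRequest
from strategy_evolution_pb2_grpc import StrategyEvolutionServiceStub

class StrategyEngine:
    def __init__(self):
        # insecure_channel is unencrypted; prefer grpc.secure_channel outside a trusted network
        self.market_data_channel = grpc.insecure_channel('market-data-center:50051')
        self.market_data_stub = MarketDataServiceStub(self.market_data_channel)

    def get_latest_quotes(self, symbol):
        request = MarketDataRequest(symbol=symbol)
        # A deadline (0.5 s is illustrative) keeps a slow dependency from blocking the caller
        response = self.market_data_stub.GetLatestQuotes(request, timeout=0.5)
        return response.quotes

# signal-aggregation-center calling strategy-evolution-center
class SignalAggregator:
    def __init__(self):
        # Address is an assumption; the stub must exist before get_strategy_genealogy is called
        self.evolution_channel = grpc.insecure_channel('strategy-evolution-center:50051')
        self.evolution_stub = StrategyEvolutionServiceStub(self.evolution_channel)

    def get_strategy_genealogy(self, strategy_id):
        request = StrategyGenealogyRequest(strategy_id=strategy_id)
        response = self.evolution_stub.GetGenealogy(request, timeout=0.5)
        return response.genealogy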

Suitable Scenarios

  • Real-time responses required (strategy pulling latest market data)
  • Service quality guarantees (order submission, account queries)
  • Critical path operations (risk checks, position validation)
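A synchronous call on the critical path usually needs a deadline, so that a slow dependency produces a clear rejection rather than a stalled order. The following is a minimal sketch of that idea using only asyncio; `check_risk`, `submit_with_deadline`, the 1000-unit limit, and the 50 ms deadline are hypothetical stand-ins, not part of any service described here.

```python
import asyncio

async def check_risk(order: dict, delay: float) -> bool:
    """Hypothetical stand-in for a remote risk check; `delay` simulates network and server time."""
    await asyncio.sleep(delay)
    return order["quantity"] <= 1000  # illustrative limit only

async def submit_with_deadline(order: dict, delay: float, deadline: float = 0.05) -> str:
    """Run the risk check under a deadline and fail closed if it does not answer in time."""
    try:
        approved = await asyncio.wait_for(check_risk(order, delay), timeout=deadline)
    except asyncio.TimeoutError:
        # No answer within the deadline: reject rather than trade without a risk check
        return "rejected: risk check timed out"
    return "accepted" if approved else "rejected: risk limit exceeded"

def run_examples() -> list:
    small = {"symbol": "AAPL", "quantity": 100}
    large = {"symbol": "AAPL", "quantity": 5000}
    return [
        asyncio.run(submit_with_deadline(small, delay=0.0)),
        asyncio.run(submit_with_deadline(large, delay=0.0)),
        asyncio.run(submit_with_deadline(small, delay=0.2)),  # slower than the deadline
    ]
```

With gRPC the same effect is normally obtained by passing a `timeout` to the stub call; the fail-closed decision stays with the caller either way.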

2. Asynchronous Event-Driven Communication

Event-based messaging for real-time data streams, scalable workflows, and loose coupling.

Message Broker Solutions

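| Middleware | Description | Trading System Fit |
|---|---|---|
| NATS | Ultra-lightweight, high-speed Pub/Sub, low latency | Primary choice: low-latency fan-out suits market data and signal flows. Core NATS delivers at most once; JetStream adds persistence where messages must not be lost |
| Kafka | Large-scale stream processing, high throughput | Alternative for massive data streams |
| RabbitMQ | Traditional message broker, ACK mechanisms | Less suitable for high-frequency trading |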

NATS Implementation Example

# market-data-center publishing real-time data
import json
from nats.aio.client import Client as NATS

class MarketDataPublisher:
    def __init__(self):
        self.nc = NATS()

    async def connect(self, servers="nats://nats:4222"):
        # The server URL is an assumption; connect once before publishing
        await self.nc.connect(servers)

    async def publish_market_data(self, symbol, data):
        # data is expected to be a str (for example a JSON document); NATS payloads are bytes
        await self.nc.publish(f"market.data.{symbol}", data.encode())

    async def publish_tick_data(self, tick_data):
        await self.nc.publish("market.data.tick", tick_data.encode())

# signal-aggregation-center subscribing to market data
class SignalAggregator:
    def __init__(self, nc):
        self.nc = nc  # an already connected NATS client

    async def subscribe_to_market_data(self):
        await self.nc.subscribe("market.data.tick", cb=self.process_tick_data)

    async def process_tick_data(self, msg):
        tick_data = json.loads(msg.data.decode())
        # Process tick data and update signals
        await self.update_signals(tick_data)

# backtest-engine requesting backtest execution
class BacktestEngine:
    def __init__(self, nc):
        self.nc = nc  # an already connected NATS client

    async def request_backtest(self, strategy_config):
        request_data = {
            "strategy_id": strategy_config["id"],
            "parameters": strategy_config["params"],
            "timeframe": strategy_config["timeframe"]
        }
        await self.nc.publish("backtest.request", json.dumps(request_data).encode())

# ultra-fast-backtest-center consuming backtest requests
class BacktestExecutor:
    def __init__(self, nc):
        self.nc = nc  # an already connected NATS client

    async def subscribe_to_backtest_requests(self):
        # A queue group (name is an assumption) delivers each request to one executor
        # instance only; without it every instance would run every backtest
        await self.nc.subscribe("backtest.request", queue="backtest-executors",
                                cb=self.execute_backtest)

    async def execute_backtest(self, msg):
        request = json.loads(msg.data.decode())
        result = await self.run_backtest(request)
        await self.nc.publish("backtest.result", json.dumps(result).encode())
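When results come back on a shared subject such as backtest.result, the requester needs a way to tell which result belongs to which request. One common approach is a correlation id carried in both messages. The sketch below illustrates it without a broker: `BacktestClient`, the `request_id` field, and `fake_publish` are hypothetical, and the publish function is injected so the matching logic can be run on its own.

```python
import asyncio
import json
import uuid

class BacktestClient:
    """Matches results arriving on a shared subject to the request that caused them."""

    def __init__(self, publish):
        self._publish = publish  # async callable(subject: str, payload: bytes)
        self._pending = {}       # request_id -> Future awaiting the result

    async def request_backtest(self, strategy_id: str, timeout: float = 1.0) -> dict:
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        payload = json.dumps({"request_id": request_id, "strategy_id": strategy_id}).encode()
        try:
            await self._publish("backtest.request", payload)
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always forget the request, whether it completed or timed out
            self._pending.pop(request_id, None)

    def on_result(self, payload: bytes) -> None:
        """Callback for messages on the result subject; unknown ids are ignored."""
        result = json.loads(payload.decode())
        future = self._pending.get(result.get("request_id"))
        if future is not None and not future.done():
            future.set_result(result)

async def demo() -> dict:
    async def fake_publish(subject: str, payload: bytes) -> None:
        # Stand-in for broker plus executor: echo a result carrying the same request_id
        request = json.loads(payload.decode())
        result = {"request_id": request["request_id"],
                  "strategy_id": request["strategy_id"],
                  "status": "done"}
        asyncio.get_running_loop().call_soon(client.on_result, json.dumps(result).encode())

    client = BacktestClient(fake_publish)
    return await client.request_backtest("momentum-v1")
```

In a real deployment `on_result` would be registered as the subscription callback for the result subject. NATS also provides a built-in request-reply call that handles the reply subject itself, which may be simpler when one requester waits for one answer.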

Suitable Scenarios

  • Real-time data streams (market data, trade events, signal flows)
  • Large-scale task distribution (backtest requests, risk scans)
  • Loose coupling (independent service scaling and deployment)
  • Elastic architecture (automatic horizontal scaling of consumers)
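The task-distribution scenario relies on each request being handled by exactly one consumer, however many consumers are running. The sketch below simulates that competing-consumer behaviour in-process with an asyncio.Queue standing in for the broker; the worker names and the 1 ms of simulated work are assumptions for illustration.

```python
import asyncio

async def worker(name: str, queue: asyncio.Queue, handled: dict) -> None:
    """Pull tasks until cancelled; each task is taken by exactly one worker."""
    while True:
        task_id = await queue.get()
        await asyncio.sleep(0.001)  # simulated backtest work
        handled[name].append(task_id)
        queue.task_done()

async def distribute(num_tasks: int, num_workers: int) -> dict:
    """Fan tasks out to competing workers and report which worker handled which task."""
    queue: asyncio.Queue = asyncio.Queue()
    handled = {f"executor-{i}": [] for i in range(num_workers)}
    workers = [asyncio.create_task(worker(name, queue, handled)) for name in handled]
    for task_id in range(num_tasks):
        queue.put_nowait(task_id)
    await queue.join()  # wait until every task has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return handled
```

Calling `asyncio.run(distribute(num_tasks=20, num_workers=4))` returns a mapping from worker name to the task ids it processed. Adding workers changes how the tasks are split but not the fact that each id appears once, which is the property that makes horizontal scaling of consumers safe.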

Communication Pattern Recommendations

Trading System Architecture Guidelines

| Communication Type | Use Case | Recommended Protocol | Example |
|---|---|---|---|
| Synchronous RPC | Immediate response required | gRPC | Account queries, order submission |
| Asynchronous Events | Real-time data streams, scalable workflows | NATS | Market data, trade events, backtest tasks |
| External APIs | Dashboard, third-party integration | HTTP/REST | Web interface, external system integration |

Implementation Strategy

High-Frequency Internal Calls (Low Latency)

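# Use gRPC for critical path operations
import grpc.aio
# RiskServiceStub, RiskCheckRequest and the execution stub come from generated
# protobuf modules; ExecutionServiceStub and its address are assumed names

class OrderExecutionService:
    def __init__(self):
        # grpc.aio channels produce awaitable calls; a plain grpc.insecure_channel does not
        self.risk_channel = grpc.aio.insecure_channel('risk-management:50051')
        self.risk_stub = RiskServiceStub(self.risk_channel)
        self.execution_channel = grpc.aio.insecure_channel('execution-engine:50051')
        self.execution_stub = ExecutionServiceStub(self.execution_channel)

    async def submit_order(self, order):
        # The risk check must complete before the order is sent
        risk_request = RiskCheckRequest(order=order)
        risk_response = await self.risk_stub.CheckRisk(risk_request)

        if not risk_response.approved:
            # Reject explicitly instead of silently returning None
            raise PermissionError("order rejected by risk check")

        # Submit to execution engine
        return await self.execution_stub.SubmitOrder(order)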

Real-Time Data Streams

# Use NATS for market data and events
class MarketDataService:
    async def start_market_data_stream(self):
        await self.nc.subscribe("market.data.*", cb=self.process_market_data)
        await self.nc.subscribe("trade.events", cb=self.process_trade_events)
        await self.nc.subscribe("risk.alerts", cb=self.process_risk_alerts)
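Subject wildcards decide which messages a subscription such as market.data.* receives. In NATS subject syntax, `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens. The helper below is a hypothetical pure-Python sketch of those matching rules, written for illustration; it is not the NATS client's own implementation and ignores edge cases such as empty tokens.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches `pattern` under NATS-style wildcard rules."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one token to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False  # subject is shorter than the pattern
        if p != "*" and p != s_tokens[i]:
            return False  # literal token mismatch
    # Without '>', pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)
```

One consequence for the subscriptions above: market.data.* receives market.data.AAPL and market.data.tick, but not a deeper subject such as market.data.AAPL.trades, which would need market.data.> instead.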

External API Interface

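# Use HTTP/REST for external access
from fastapi import FastAPI
from google.protobuf.json_format import MessageToDict

app = FastAPI()

# portfolio_stub is assumed to be a grpc.aio stub for the portfolio service,
# created at application startup; PortfolioRequest comes from its generated module

@app.get("/api/v1/portfolio/positions")
async def get_positions():
    # Internal gRPC call to portfolio service
    portfolio_request = PortfolioRequest()
    portfolio_response = await portfolio_stub.GetPositions(portfolio_request)
    # Convert the protobuf message to plain dicts so FastAPI can serialize it as JSON
    return MessageToDict(portfolio_response).get("positions", [])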

System Integration Benefits

Performance Optimization

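  • gRPC: Low per-call overhead from HTTP/2 multiplexing and binary Protobuf encoding; sub-millisecond round trips are achievable on a local network, depending on payload size and server-side work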
  • NATS: High-throughput event streaming with minimal overhead
  • HTTP/REST: Standard interface for external systems

Scalability & Reliability

  • Horizontal scaling: Independent service scaling based on load
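  • Fault isolation: A failing service can be contained so that it does not cascade across the system, provided callers use timeouts and handle errors
  • Load distribution: Load is spread across service instances, for example through NATS queue groups or gRPC load balancing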
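Fault isolation does not come from the service split alone; callers also have to stop sending requests to a dependency that keeps failing. A circuit breaker is one common way to do this. The class below is a minimal single-threaded sketch under assumed thresholds (`max_failures`, `reset_after`); it is illustrative and not taken from any particular library.

```python
import time

class CircuitBreaker:
    """Stops calling a failing dependency for a cool-down period after repeated errors."""

    def __init__(self, max_failures: int = 3, reset_after: float = 5.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._clock = clock       # injectable so the behaviour can be tested without sleeping
        self._failures = 0
        self._opened_at = None    # time at which the circuit opened, or None when closed

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after:
                raise RuntimeError("circuit open: call skipped")
            # Cool-down elapsed: allow one trial call; a single failure re-opens the circuit
            self._opened_at = None
            self._failures = self.max_failures - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = self._clock()
            raise
        self._failures = 0  # any success closes the circuit fully
        return result
```

A caller would wrap each outbound request, for example `breaker.call(fetch_positions)` with a hypothetical `fetch_positions` function, and treat the RuntimeError as a signal to use a fallback or reject the operation.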

Development & Maintenance

  • Technology diversity: Each service can use optimal technology stack
  • Independent deployment: Services can be updated without system-wide downtime
  • Clear boundaries: Well-defined interfaces between services

Business & Technical Value

  • Real-time Performance: Low-latency communication paths suited to high-frequency trading
  • System Resilience: Fault-tolerant architecture with service isolation
  • Operational Flexibility: Independent scaling and deployment capabilities
  • Room to Grow: The same patterns support growth from startup to institutional scale
  • Maintainability: Explicit service contracts keep the platform manageable as it expands

Combining synchronous RPC for critical-path calls with asynchronous events for data streams and task distribution lets the trading system balance performance, reliability, and scalability while keeping the flexibility to evolve with business needs.