gRPC Streaming
Real-time data streaming with server-streaming gRPC RPCs (bidirectional streams are reserved for future use)
gRPC Streaming
The LX gRPC API provides server-streaming RPCs for real-time market data. This page covers streaming patterns, connection management, and best practices.
Streaming Types
gRPC supports four communication patterns. LX uses:
| Pattern | LX Usage |
|---|---|
| Unary | Order operations, queries |
| Server streaming | Market data subscriptions |
| Client streaming | Not used |
| Bidirectional | Reserved for future use |
Server Streaming RPCs
StreamOrderBook
Real-time order book updates for a trading pair.
// StreamOrderBook opens a server stream of order book updates for one trading pair.
rpc StreamOrderBook(StreamOrderBookRequest) returns (stream OrderBookUpdate);

// StreamOrderBookRequest selects the order book to subscribe to.
message StreamOrderBookRequest {
  string symbol = 1; // Trading pair
  int32 depth = 2; // Max levels to track (default 20)
}

// OrderBookUpdate is one message delivered on the StreamOrderBook stream.
message OrderBookUpdate {
  string symbol = 1;
  repeated PriceLevel bid_updates = 2; // PriceLevel is not defined in this snippet
  repeated PriceLevel ask_updates = 3;
  int64 timestamp = 4; // NOTE(review): unit and epoch are not stated here — confirm
}
Update Semantics:
- First message is a full snapshot
- Subsequent messages are incremental deltas
- `size = 0` means the price level was removed
- Updates are batched at 10ms intervals
StreamTrades
Real-time trade feed for a trading pair.
// StreamTrades opens a server stream of trades for one trading pair.
rpc StreamTrades(StreamTradesRequest) returns (stream Trade);

// StreamTradesRequest selects the trade feed to subscribe to.
message StreamTradesRequest {
  string symbol = 1;
}

// Trade is one message delivered on the StreamTrades stream.
message Trade {
  uint64 trade_id = 1;
  string symbol = 2;
  double price = 3;
  double size = 4;
  OrderSide side = 5; // OrderSide is not defined in this snippet
  uint64 buy_order_id = 6;
  uint64 sell_order_id = 7;
  string buyer_id = 8;
  string seller_id = 9;
  int64 timestamp = 10; // NOTE(review): unit and epoch are not stated here — confirm
}
Go Streaming Client
Basic Stream Consumer
package main
import (
"context"
"io"
"log"
"time"
pb "github.com/luxfi/dex/pkg/grpc/gen"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := pb.NewLXDEXServiceClient(conn)
// Create context with cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start order book stream
stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: "LUX-USDC",
Depth: 20,
})
if err != nil {
log.Fatal(err)
}
// Consume updates
for {
update, err := stream.Recv()
if err == io.EOF {
log.Println("Stream closed by server")
return
}
if err != nil {
log.Printf("Stream error: %v", err)
return
}
log.Printf("OrderBook update: %d bids, %d asks @ %d",
len(update.BidUpdates),
len(update.AskUpdates),
update.Timestamp,
)
}
}Stream with Reconnection
package main
import (
"context"
"io"
"log"
"time"
pb "github.com/luxfi/dex/pkg/grpc/gen"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// OrderBookStreamer consumes the StreamOrderBook RPC for a single symbol and
// republishes the received updates on a channel, reconnecting after retryable
// errors.
type OrderBookStreamer struct {
	client     pb.LXDEXServiceClient
	symbol     string
	depth      int32
	updates    chan *pb.OrderBookUpdate // closed when the streaming goroutine exits
	maxRetries int                      // reconnect attempts allowed before giving up
	baseDelay  time.Duration            // base of the exponential reconnect backoff
}
// NewOrderBookStreamer builds a streamer for symbol on top of client using the
// defaults of this example: depth 20, a 1000-element update buffer, at most 10
// retries, and a 100ms base backoff delay. Call Start to begin streaming.
func NewOrderBookStreamer(client pb.LXDEXServiceClient, symbol string) *OrderBookStreamer {
	streamer := new(OrderBookStreamer)
	streamer.client = client
	streamer.symbol = symbol
	streamer.depth = 20
	streamer.updates = make(chan *pb.OrderBookUpdate, 1000)
	streamer.maxRetries = 10
	streamer.baseDelay = 100 * time.Millisecond
	return streamer
}
// Start launches the reconnecting stream loop in a new goroutine and returns
// immediately. ctx is handed to that loop, which checks it for cancellation.
func (s *OrderBookStreamer) Start(ctx context.Context) {
	go s.streamWithReconnect(ctx)
}
// Updates returns the receive-only view of the channel on which order book
// updates are delivered.
func (s *OrderBookStreamer) Updates() <-chan *pb.OrderBookUpdate {
	return s.updates
}
// streamWithReconnect runs the subscription until ctx is cancelled, the server
// closes the stream cleanly, a non-retryable error occurs, or more than
// maxRetries consecutive attempts fail. It closes s.updates on exit so that
// consumers ranging over Updates() terminate.
func (s *OrderBookStreamer) streamWithReconnect(ctx context.Context) {
	defer close(s.updates)

	const maxDelay = 30 * time.Second

	retries := 0
	for {
		if ctx.Err() != nil {
			return
		}

		received, err := s.streamOnce(ctx)
		if err == nil {
			return // Clean shutdown
		}
		// Cancelling ctx makes the RPC fail (typically with codes.Canceled).
		// That is a requested shutdown, not an error worth reporting.
		if ctx.Err() != nil {
			return
		}

		// Check if error is retryable
		if st, ok := status.FromError(err); ok && !isRetryable(st.Code()) {
			log.Printf("Non-retryable error: %v", err)
			return
		}

		// A connection that delivered data was healthy, so the retry budget
		// counts consecutive failures rather than failures over the whole
		// lifetime of the streamer.
		if received {
			retries = 0
		}
		retries++
		if retries > s.maxRetries {
			log.Printf("Max retries exceeded")
			return
		}

		// Exponential backoff: baseDelay * 2^retries, capped at maxDelay.
		// Doubling step by step stops at the cap, so a large retry count
		// cannot overflow the way a bit shift would.
		delay := s.baseDelay
		for i := 0; i < retries && delay < maxDelay; i++ {
			delay *= 2
		}
		if delay > maxDelay {
			delay = maxDelay
		}
		log.Printf("Reconnecting in %v (attempt %d/%d)", delay, retries, s.maxRetries)

		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// stream runs a single StreamOrderBook call and forwards every update to
// s.updates. It returns nil when the server closes the stream (io.EOF) and the
// underlying error otherwise.
func (s *OrderBookStreamer) stream(ctx context.Context) error {
	_, err := s.streamOnce(ctx)
	return err
}

// streamOnce does the work of stream and additionally reports whether at least
// one update was received, which the reconnect loop uses to reset its retry
// counter.
func (s *OrderBookStreamer) streamOnce(ctx context.Context) (bool, error) {
	stream, err := s.client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
		Symbol: s.symbol,
		Depth:  s.depth,
	})
	if err != nil {
		return false, err
	}

	received := false
	for {
		update, err := stream.Recv()
		if err == io.EOF {
			return received, nil
		}
		if err != nil {
			return received, err
		}
		received = true

		// Non-blocking send: a slow consumer must not stall the receive loop.
		// NOTE(review): messages after the first are incremental deltas, so a
		// consumer that needs a consistent book has to resynchronise after a
		// drop — confirm this policy is acceptable for your use case.
		select {
		case s.updates <- update:
		default:
			log.Println("Update channel full, dropping update")
		}
	}
}
// isRetryable reports whether a stream that failed with the given gRPC status
// code should be re-established: Unavailable, ResourceExhausted, Aborted and
// Internal are retried, every other code is not.
func isRetryable(code codes.Code) bool {
	return code == codes.Unavailable ||
		code == codes.ResourceExhausted ||
		code == codes.Aborted ||
		code == codes.Internal
}
func main() {
conn, _ := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
defer conn.Close()
client := pb.NewLXDEXServiceClient(conn)
streamer := NewOrderBookStreamer(client, "LUX-USDC")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streamer.Start(ctx)
for update := range streamer.Updates() {
log.Printf("Update: %+v", update)
}
}Python Streaming Client
Basic Consumer
import grpc
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc
def stream_orderbook(symbol: str):
channel = grpc.insecure_channel('localhost:50051')
stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
request = lxdex_pb2.StreamOrderBookRequest(
symbol=symbol,
depth=20,
)
try:
for update in stub.StreamOrderBook(request):
print(f"Bids: {len(update.bid_updates)}, "
f"Asks: {len(update.ask_updates)}, "
f"Time: {update.timestamp}")
except grpc.RpcError as e:
print(f"Stream error: {e.code()}: {e.details()}")
if __name__ == "__main__":
stream_orderbook("LUX-USDC")Async Stream with Reconnection
import asyncio
import grpc
from typing import AsyncIterator
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc
class OrderBookStreamer:
    """Async order book subscription that reconnects after retryable errors.

    ``stream()`` is an async generator of ``OrderBookUpdate`` messages. It ends
    when the server closes the stream cleanly or after ``stop()`` has been
    called; it re-raises the ``grpc.RpcError`` when the error is not retryable
    or when more than ``max_retries`` consecutive attempts have failed.
    """

    def __init__(
        self,
        channel: grpc.aio.Channel,
        symbol: str,
        depth: int = 20,
        max_retries: int = 10,
    ):
        self.stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
        self.symbol = symbol
        self.depth = depth
        self.max_retries = max_retries
        self._running = False

    async def stream(self) -> AsyncIterator[lxdex_pb2.OrderBookUpdate]:
        """Yield order book updates, reconnecting with exponential backoff."""
        self._running = True
        retries = 0
        base_delay = 0.1

        while self._running:
            try:
                request = lxdex_pb2.StreamOrderBookRequest(
                    symbol=self.symbol,
                    depth=self.depth,
                )
                call = self.stub.StreamOrderBook(request)
                async for update in call:
                    retries = 0  # Reset on success
                    yield update
                    # Honour stop() between updates, not only between
                    # reconnect attempts.
                    if not self._running:
                        call.cancel()
                        return
                # The server closed the stream without an error. Treat that as
                # a clean shutdown instead of reconnecting in a tight loop.
                return
            except grpc.RpcError as e:
                if not self._is_retryable(e.code()):
                    raise

                retries += 1
                if retries > self.max_retries:
                    raise

                delay = min(base_delay * (2 ** retries), 30.0)
                print(f"Reconnecting in {delay}s (attempt {retries})")
                await asyncio.sleep(delay)

    def stop(self):
        """Ask ``stream()`` to finish after the update it is currently handling."""
        self._running = False

    @staticmethod
    def _is_retryable(code: grpc.StatusCode) -> bool:
        """Return True for status codes that justify a reconnect attempt."""
        return code in (
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            grpc.StatusCode.ABORTED,
            grpc.StatusCode.INTERNAL,
        )
async def main():
async with grpc.aio.insecure_channel('localhost:50051') as channel:
streamer = OrderBookStreamer(channel, "LUX-USDC")
async for update in streamer.stream():
print(f"Update: {update.timestamp}")
if __name__ == "__main__":
asyncio.run(main())TypeScript Streaming Client
Using @connectrpc/connect
import { createPromiseClient } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LXDEXService } from "./gen/lxdex_connect";
// gRPC transport over HTTP/2 pointing at the local LX endpoint.
const transport = createGrpcTransport({
  baseUrl: "http://localhost:50051",
  httpVersion: "2",
});
// Promise-based LXDEXService client bound to that transport.
const client = createPromiseClient(LXDEXService, transport);
/**
 * Subscribes to the order book of `symbol` at depth 20 and logs the number of
 * bid and ask updates plus the timestamp of every message until the stream
 * ends. Errors from the stream reject the returned promise.
 */
async function streamOrderBook(symbol: string) {
  const request = { symbol, depth: 20 };

  for await (const { bidUpdates, askUpdates, timestamp } of client.streamOrderBook(request)) {
    console.log(`Bids: ${bidUpdates.length}`);
    console.log(`Asks: ${askUpdates.length}`);
    console.log(`Time: ${timestamp}`);
  }
}
streamOrderBook("LUX-USDC").catch(console.error);
Stream with Reconnection
import { createPromiseClient, ConnectError, Code } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LXDEXService } from "./gen/lxdex_connect";
import type { OrderBookUpdate } from "./gen/lxdex_pb";
/**
 * Order book subscription that reconnects after retryable errors.
 *
 * `stream()` is an async generator of `OrderBookUpdate` messages. It ends when
 * the server closes the stream cleanly or after `stop()` has been called, and
 * rethrows when the error is a non-retryable `ConnectError` or when more than
 * `maxRetries` consecutive attempts have failed.
 */
class OrderBookStreamer {
  private client;
  private symbol: string;
  private depth: number;
  private maxRetries: number;
  private running = false;

  constructor(baseUrl: string, symbol: string, depth = 20, maxRetries = 10) {
    const transport = createGrpcTransport({
      baseUrl,
      httpVersion: "2",
    });
    this.client = createPromiseClient(LXDEXService, transport);
    this.symbol = symbol;
    this.depth = depth;
    this.maxRetries = maxRetries;
  }

  /** Yields order book updates, reconnecting with exponential backoff. */
  async *stream(): AsyncGenerator<OrderBookUpdate> {
    this.running = true;
    let retries = 0;
    const baseDelay = 100;

    while (this.running) {
      try {
        const stream = this.client.streamOrderBook({
          symbol: this.symbol,
          depth: this.depth,
        });
        for await (const update of stream) {
          retries = 0;
          yield update;
          // Honour stop() between updates, not only between reconnect
          // attempts. Returning from inside for-await also closes the
          // underlying stream iterator.
          if (!this.running) {
            return;
          }
        }
        // The server closed the stream without an error. Treat that as a
        // clean shutdown instead of reconnecting in a tight loop.
        return;
      } catch (err) {
        if (err instanceof ConnectError && !this.isRetryable(err.code)) {
          throw err;
        }

        retries++;
        if (retries > this.maxRetries) {
          throw err;
        }

        const delay = Math.min(baseDelay * 2 ** retries, 30000);
        console.log(`Reconnecting in ${delay}ms (attempt ${retries})`);
        await new Promise((r) => setTimeout(r, delay));
      }
    }
  }

  /** Asks `stream()` to finish after the update it is currently handling. */
  stop() {
    this.running = false;
  }

  /** Returns true for error codes that justify a reconnect attempt. */
  private isRetryable(code: Code): boolean {
    return [
      Code.Unavailable,
      Code.ResourceExhausted,
      Code.Aborted,
      Code.Internal,
    ].includes(code);
  }
}
/** Logs the timestamp of every LUX-USDC order book update. */
async function main() {
  const updates = new OrderBookStreamer("http://localhost:50051", "LUX-USDC").stream();

  for await (const { timestamp } of updates) {
    console.log(`Update at ${timestamp}`);
  }
}

main().catch(console.error);
Stream Management Best Practices
1. Backpressure Handling
When the consumer is slower than the producer, implement backpressure:
// Buffered channel with a drop-oldest policy. This pattern assumes a single
// producer goroutine.
updates := make(chan *pb.OrderBookUpdate, 1000)

// Producer
for sent := false; !sent; {
	select {
	case updates <- update:
		sent = true // Sent successfully
	default:
		// Buffer full: discard the oldest queued update to make room, then
		// retry the send. Retrying with a non-blocking send (rather than a
		// bare `updates <- update`) means the producer can never block here.
		// To drop the newest update instead, stop here without retrying.
		select {
		case <-updates:
		default:
			// The consumer drained the buffer in the meantime; the retry will
			// succeed, so the current update must not be dropped.
		}
	}
}
2. Heartbeat Detection
Detect dead connections with heartbeat monitoring:
const heartbeatInterval = 30 * time.Second
func streamWithHeartbeat(ctx context.Context, client pb.LXDEXServiceClient) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: "LUX-USDC",
})
if err != nil {
return err
}
lastUpdate := time.Now()
heartbeat := time.NewTicker(heartbeatInterval)
defer heartbeat.Stop()
for {
select {
case <-heartbeat.C:
if time.Since(lastUpdate) > 2*heartbeatInterval {
return fmt.Errorf("no updates received, connection likely dead")
}
default:
}
update, err := stream.Recv()
if err != nil {
return err
}
lastUpdate = time.Now()
processUpdate(update)
}
}3. Graceful Shutdown
Properly close streams on shutdown:
func main() {
ctx, cancel := context.WithCancel(context.Background())
// Handle SIGINT/SIGTERM
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("Shutting down...")
cancel() // This cancels all streams
}()
// Start streaming
runStreams(ctx)
}4. Multiple Streams
Manage multiple symbol subscriptions:
// StreamManager tracks one order book stream per symbol.
type StreamManager struct {
	client  pb.LXDEXServiceClient
	streams map[string]context.CancelFunc // symbol -> cancel func of its stream context
	mu      sync.Mutex                    // held by Subscribe and Unsubscribe while touching streams
}
// Subscribe starts a goroutine that streams order book updates for symbol and
// passes each one to handler. It returns an error if symbol already has a
// registered subscription.
func (m *StreamManager) Subscribe(symbol string, handler func(*pb.OrderBookUpdate)) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.streams[symbol]; exists {
		return fmt.Errorf("already subscribed to %s", symbol)
	}

	// Writing to a nil map panics; allocate lazily so a StreamManager whose
	// streams field was never initialised is still usable.
	if m.streams == nil {
		m.streams = make(map[string]context.CancelFunc)
	}

	// The stored cancel func is what Unsubscribe uses to stop the stream.
	ctx, cancel := context.WithCancel(context.Background())
	m.streams[symbol] = cancel

	go m.runStream(ctx, symbol, handler)
	return nil
}
// Unsubscribe cancels the stream registered for symbol and forgets it. It is a
// no-op when symbol has no registered subscription.
func (m *StreamManager) Unsubscribe(symbol string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	cancel, subscribed := m.streams[symbol]
	if !subscribed {
		return
	}
	cancel()
	delete(m.streams, symbol)
}
func (m *StreamManager) runStream(ctx context.Context, symbol string, handler func(*pb.OrderBookUpdate)) {
stream, err := m.client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: symbol,
})
if err != nil {
log.Printf("Failed to start stream for %s: %v", symbol, err)
return
}
for {
update, err := stream.Recv()
if err != nil {
return
}
handler(update)
}
}Performance Considerations
Message Rate
| Stream | Typical Rate | Max Rate |
|---|---|---|
| OrderBook (per symbol) | 10-100/s | 1000/s |
| Trades (per symbol) | 1-50/s | 500/s |
Bandwidth Estimation
OrderBookUpdate message size: ~200 bytes (20 levels)
At 100 updates/second: ~20 KB/s per symbol
10 symbols: ~200 KB/s total
Connection Limits
| Limit | Value |
|---|---|
| Max concurrent streams per connection | 100 |
| Max streams per user | 50 |
| Max connections per IP | 10 |
| Stream timeout (no activity) | 5 minutes |