gRPC Streaming

Real-time data streaming with server-streaming gRPC RPCs (bidirectional streams are reserved for future use)

gRPC Streaming

The LX gRPC API provides server-streaming RPCs for real-time market data. This page covers streaming patterns, connection management, and best practices.

Streaming Types

gRPC supports four communication patterns. The table below shows how LX uses each of them:

| Pattern | LX Usage |
|---|---|
| Unary | Order operations, queries |
| Server streaming | Market data subscriptions |
| Client streaming | Not used |
| Bidirectional | Reserved for future use |

Server Streaming RPCs

StreamOrderBook

Real-time order book updates for a trading pair.

// StreamOrderBook is a server-streaming RPC: the client sends one request and
// receives a stream of OrderBookUpdate messages for the requested trading pair.
rpc StreamOrderBook(StreamOrderBookRequest) returns (stream OrderBookUpdate);

// StreamOrderBookRequest selects the order book to subscribe to.
message StreamOrderBookRequest {
  string symbol = 1;  // Trading pair
  int32 depth = 2;    // Max levels to track (default 20)
}

// OrderBookUpdate is one message on the StreamOrderBook stream. Per the update
// semantics documented for this RPC, the first message is a full snapshot and
// later messages are incremental deltas batched at 10ms intervals.
message OrderBookUpdate {
  // Symbol the update belongs to (presumably echoes the requested trading pair).
  string symbol = 1;
  // Changed bid levels; a level with size = 0 means the price level was removed.
  repeated PriceLevel bid_updates = 2;
  // Changed ask levels; a level with size = 0 means the price level was removed.
  repeated PriceLevel ask_updates = 3;
  // Time of the update. NOTE(review): unit/epoch is not specified here — confirm.
  int64 timestamp = 4;
}

Update Semantics:

  • First message is a full snapshot
  • Subsequent messages are incremental deltas
  • size = 0 means price level removed
  • Updates are batched at 10ms intervals

StreamTrades

Real-time trade feed for a trading pair.

// StreamTrades is a server-streaming RPC: the client sends one request and
// receives a stream of Trade messages for the requested trading pair.
rpc StreamTrades(StreamTradesRequest) returns (stream Trade);

// StreamTradesRequest selects the trade feed to subscribe to.
message StreamTradesRequest {
  // Trading pair to subscribe to.
  string symbol = 1;
}

// Trade is one message on the StreamTrades stream.
message Trade {
  uint64 trade_id = 1;
  string symbol = 2;
  double price = 3;
  double size = 4;
  // NOTE(review): presumably the aggressor (taker) side — confirm against the
  // OrderSide definition, which is not shown here.
  OrderSide side = 5;
  // Presumably the IDs of the two orders that were matched — TODO confirm.
  uint64 buy_order_id = 6;
  uint64 sell_order_id = 7;
  // Presumably the account identifiers of the two counterparties — TODO confirm.
  string buyer_id = 8;
  string seller_id = 9;
  // Time of the trade. NOTE(review): unit/epoch is not specified here — confirm.
  int64 timestamp = 10;
}

Go Streaming Client

Basic Stream Consumer

package main

import (
    "context"
    "io"
    "log"
    "time"

    pb "github.com/luxfi/dex/pkg/grpc/gen"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// main connects to a local LX gRPC endpoint without TLS, subscribes to the
// LUX-USDC order book and logs a one-line summary of every update until the
// stream ends or fails.
func main() {
    const target = "localhost:50051"

    creds := grpc.WithTransportCredentials(insecure.NewCredentials())
    conn, err := grpc.Dial(target, creds)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Cancelling this context on return tears the stream down.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    req := &pb.StreamOrderBookRequest{Symbol: "LUX-USDC", Depth: 20}
    stream, err := pb.NewLXDEXServiceClient(conn).StreamOrderBook(ctx, req)
    if err != nil {
        log.Fatal(err)
    }

    for {
        update, err := stream.Recv()
        switch {
        case err == io.EOF:
            log.Println("Stream closed by server")
            return
        case err != nil:
            log.Printf("Stream error: %v", err)
            return
        }

        bids, asks := len(update.BidUpdates), len(update.AskUpdates)
        log.Printf("OrderBook update: %d bids, %d asks @ %d", bids, asks, update.Timestamp)
    }
}

Stream with Reconnection

package main

import (
    "context"
    "io"
    "log"
    "time"

    pb "github.com/luxfi/dex/pkg/grpc/gen"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/status"
)

// OrderBookStreamer keeps a StreamOrderBook subscription for a single symbol
// and republishes every received update on a buffered channel.
type OrderBookStreamer struct {
    // Subscription parameters.
    client pb.LXDEXServiceClient
    symbol string
    depth  int32

    // updates carries received messages to the consumer.
    updates chan *pb.OrderBookUpdate

    // Reconnect policy: retry limit and the base of the exponential backoff.
    maxRetries int
    baseDelay  time.Duration
}

// NewOrderBookStreamer returns a streamer for symbol with the default
// settings: depth 20, a 1000-element update buffer, at most 10 reconnect
// attempts and a 100ms backoff base. Call Start to begin streaming.
func NewOrderBookStreamer(client pb.LXDEXServiceClient, symbol string) *OrderBookStreamer {
    const (
        defaultDepth      = 20
        updateBufferSize  = 1000
        defaultMaxRetries = 10
        defaultBaseDelay  = 100 * time.Millisecond
    )

    s := new(OrderBookStreamer)
    s.client = client
    s.symbol = symbol
    s.depth = defaultDepth
    s.updates = make(chan *pb.OrderBookUpdate, updateBufferSize)
    s.maxRetries = defaultMaxRetries
    s.baseDelay = defaultBaseDelay
    return s
}

// Start launches the reconnecting stream loop in a background goroutine and
// returns immediately. The loop stops when ctx is cancelled.
func (s *OrderBookStreamer) Start(ctx context.Context) {
    go func() {
        s.streamWithReconnect(ctx)
    }()
}

// Updates returns the receive-only side of the update channel. The channel is
// closed by the stream loop when it exits, so callers can range over it.
func (s *OrderBookStreamer) Updates() <-chan *pb.OrderBookUpdate {
    return s.updates
}

// streamWithReconnect runs the subscription until ctx is cancelled, the server
// ends the stream cleanly, a non-retryable gRPC error occurs, or maxRetries
// consecutive attempts have failed. It closes s.updates on exit so consumers
// ranging over Updates() terminate.
//
// The retry counter counts consecutive failures: it is reset as soon as a
// connection has delivered at least one update, so a long-lived streamer is
// not shut down by the accumulated total of unrelated disconnects.
func (s *OrderBookStreamer) streamWithReconnect(ctx context.Context) {
    defer close(s.updates)

    retries := 0
    for {
        if ctx.Err() != nil {
            return
        }

        received, err := s.consume(ctx)
        if err == nil {
            return // Clean shutdown
        }
        if ctx.Err() != nil {
            // The caller cancelled; the resulting stream error is expected
            // and must not be reported as a failure.
            return
        }

        // Errors that carry no gRPC status are treated as retryable.
        if st, ok := status.FromError(err); ok && !isRetryable(st.Code()) {
            log.Printf("Non-retryable error: %v", err)
            return
        }

        if received {
            retries = 0 // The last connection was healthy; start backoff over.
        }
        retries++
        if retries > s.maxRetries {
            log.Printf("Max retries exceeded")
            return
        }

        // Exponential backoff, capped at 30 seconds.
        delay := s.baseDelay * time.Duration(1<<retries)
        if delay > 30*time.Second {
            delay = 30 * time.Second
        }

        log.Printf("Reconnecting in %v (attempt %d/%d)", delay, retries, s.maxRetries)

        select {
        case <-ctx.Done():
            return
        case <-time.After(delay):
        }
    }
}

// stream opens one StreamOrderBook call and forwards updates to s.updates
// until the stream ends. It returns nil when the server closes the stream
// cleanly (io.EOF) and the underlying error otherwise.
func (s *OrderBookStreamer) stream(ctx context.Context) error {
    _, err := s.consume(ctx)
    return err
}

// consume is the implementation behind stream. In addition to the error it
// reports whether at least one update was received on this connection, which
// the reconnect loop uses to reset its backoff.
func (s *OrderBookStreamer) consume(ctx context.Context) (received bool, err error) {
    stream, err := s.client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
        Symbol: s.symbol,
        Depth:  s.depth,
    })
    if err != nil {
        return false, err
    }

    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return received, nil
        }
        if err != nil {
            return received, err
        }
        received = true

        // Never block the receive loop on a slow consumer: drop instead.
        select {
        case s.updates <- update:
        default:
            log.Println("Update channel full, dropping update")
        }
    }
}

// isRetryable reports whether a stream that failed with the given gRPC status
// code should be re-established: Unavailable, ResourceExhausted, Aborted and
// Internal are retried, every other code is not.
func isRetryable(code codes.Code) bool {
    return code == codes.Unavailable ||
        code == codes.ResourceExhausted ||
        code == codes.Aborted ||
        code == codes.Internal
}

// main wires an OrderBookStreamer to a local LX gRPC endpoint and logs every
// update until the streamer closes its channel.
func main() {
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        // Without this check a failed Dial leaves conn nil and the deferred
        // Close below would panic.
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewLXDEXServiceClient(conn)
    streamer := NewOrderBookStreamer(client, "LUX-USDC")

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    streamer.Start(ctx)

    // The loop ends when the streamer closes the channel.
    for update := range streamer.Updates() {
        log.Printf("Update: %+v", update)
    }
}

Python Streaming Client

Basic Consumer

import grpc
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc


def stream_orderbook(symbol: str):
    """Print a summary line for every order book update of ``symbol``.

    Opens an insecure channel to ``localhost:50051``, subscribes with a depth
    of 20 and blocks until the stream ends. gRPC errors are reported on stdout
    rather than raised.
    """
    # Using the channel as a context manager closes it when the function
    # returns, instead of leaking the connection.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)

        request = lxdex_pb2.StreamOrderBookRequest(
            symbol=symbol,
            depth=20,
        )

        try:
            for update in stub.StreamOrderBook(request):
                print(f"Bids: {len(update.bid_updates)}, "
                      f"Asks: {len(update.ask_updates)}, "
                      f"Time: {update.timestamp}")
        except grpc.RpcError as e:
            print(f"Stream error: {e.code()}: {e.details()}")


if __name__ == "__main__":
    stream_orderbook("LUX-USDC")

Async Stream with Reconnection

import asyncio
import grpc
from typing import AsyncIterator
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc


class OrderBookStreamer:
    """Async order book subscription with exponential-backoff reconnection.

    ``stream()`` yields updates and re-subscribes whenever the stream ends or
    fails with a retryable status code. ``stop()`` ends the iteration after
    the update currently being handled.
    """

    def __init__(
        self,
        channel: grpc.aio.Channel,
        symbol: str,
        depth: int = 20,
        max_retries: int = 10,
    ):
        self.stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
        self.symbol = symbol
        self.depth = depth
        self.max_retries = max_retries
        self._running = False

    async def stream(self) -> AsyncIterator[lxdex_pb2.OrderBookUpdate]:
        """Yield order book updates, reconnecting on retryable errors.

        Raises:
            grpc.RpcError: on a non-retryable status code, or once more than
                ``max_retries`` consecutive attempts have failed.
        """
        self._running = True
        retries = 0
        base_delay = 0.1

        while self._running:
            try:
                request = lxdex_pb2.StreamOrderBookRequest(
                    symbol=self.symbol,
                    depth=self.depth,
                )

                call = self.stub.StreamOrderBook(request)
                try:
                    async for update in call:
                        retries = 0  # Reset on success
                        yield update
                        # Honour stop() immediately instead of only after the
                        # current stream happens to end.
                        if not self._running:
                            return
                finally:
                    # Release the RPC when we stop early or the consumer
                    # abandons the generator; a no-op if it already finished.
                    call.cancel()

            except grpc.RpcError as e:
                if not self._is_retryable(e.code()):
                    raise

                retries += 1
                if retries > self.max_retries:
                    raise

                # Exponential backoff, capped at 30 seconds.
                delay = min(base_delay * (2 ** retries), 30.0)
                print(f"Reconnecting in {delay}s (attempt {retries})")
                await asyncio.sleep(delay)

    def stop(self):
        """Request that ``stream()`` stop yielding and return."""
        self._running = False

    @staticmethod
    def _is_retryable(code: grpc.StatusCode) -> bool:
        """Return True for status codes that warrant a reconnect attempt."""
        return code in (
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.RESOURCE_EXHAUSTED,
            grpc.StatusCode.ABORTED,
            grpc.StatusCode.INTERNAL,
        )


async def main():
    """Stream LUX-USDC order book updates from localhost and print their timestamps."""
    channel = grpc.aio.insecure_channel('localhost:50051')
    async with channel:
        updates = OrderBookStreamer(channel, "LUX-USDC").stream()
        async for update in updates:
            print(f"Update: {update.timestamp}")


if __name__ == "__main__":
    asyncio.run(main())

TypeScript Streaming Client

Using @connectrpc/connect

import { createPromiseClient } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LXDEXService } from "./gen/lxdex_connect";

// gRPC transport over HTTP/2 to the local LX endpoint.
const transport = createGrpcTransport({ baseUrl: "http://localhost:50051", httpVersion: "2" });

const client = createPromiseClient(LXDEXService, transport);

/**
 * Subscribes to the order book of `symbol` (depth 20) and logs the number of
 * bid/ask updates and the timestamp of every message until the stream ends.
 */
async function streamOrderBook(symbol: string) {
  const request = { symbol, depth: 20 };

  for await (const { bidUpdates, askUpdates, timestamp } of client.streamOrderBook(request)) {
    console.log(`Bids: ${bidUpdates.length}`);
    console.log(`Asks: ${askUpdates.length}`);
    console.log(`Time: ${timestamp}`);
  }
}

streamOrderBook("LUX-USDC").catch(console.error);

Stream with Reconnection

import { createPromiseClient, ConnectError, Code } from "@connectrpc/connect";
import { createGrpcTransport } from "@connectrpc/connect-node";
import { LXDEXService } from "./gen/lxdex_connect";
import type { OrderBookUpdate } from "./gen/lxdex_pb";

/**
 * Order book subscription with exponential-backoff reconnection.
 *
 * `stream()` yields updates and re-subscribes whenever the stream ends or
 * fails with a retryable code. `stop()` ends the iteration after the update
 * currently being handled.
 */
class OrderBookStreamer {
  private client;
  private symbol: string;
  private depth: number;
  private maxRetries: number;
  private running = false;

  constructor(baseUrl: string, symbol: string, depth = 20, maxRetries = 10) {
    const transport = createGrpcTransport({
      baseUrl,
      httpVersion: "2",
    });
    this.client = createPromiseClient(LXDEXService, transport);
    this.symbol = symbol;
    this.depth = depth;
    this.maxRetries = maxRetries;
  }

  /**
   * Yields order book updates, reconnecting on retryable errors.
   *
   * @throws the last error when it is a non-retryable ConnectError or when
   *         more than `maxRetries` consecutive attempts have failed.
   */
  async *stream(): AsyncGenerator<OrderBookUpdate> {
    this.running = true;
    let retries = 0;
    const baseDelay = 100;

    while (this.running) {
      try {
        const stream = this.client.streamOrderBook({
          symbol: this.symbol,
          depth: this.depth,
        });

        for await (const update of stream) {
          retries = 0;
          yield update;
          // Honour stop() immediately instead of only after the current
          // stream happens to end. Leaving the for-await loop invokes the
          // iterator's return(), which is expected to release the RPC.
          if (!this.running) {
            return;
          }
        }
      } catch (err) {
        if (err instanceof ConnectError && !this.isRetryable(err.code)) {
          throw err;
        }

        retries++;
        if (retries > this.maxRetries) {
          throw err;
        }

        // stop() was called while the stream was failing: do not schedule
        // another backoff sleep.
        if (!this.running) {
          return;
        }

        // Exponential backoff, capped at 30 seconds.
        const delay = Math.min(baseDelay * 2 ** retries, 30000);
        console.log(`Reconnecting in ${delay}ms (attempt ${retries})`);
        await new Promise((r) => setTimeout(r, delay));
      }
    }
  }

  /** Requests that `stream()` stop yielding and return. */
  stop() {
    this.running = false;
  }

  /** Returns true for codes that warrant a reconnect attempt. */
  private isRetryable(code: Code): boolean {
    return [
      Code.Unavailable,
      Code.ResourceExhausted,
      Code.Aborted,
      Code.Internal,
    ].includes(code);
  }
}

/** Streams LUX-USDC order book updates from the local endpoint and logs each timestamp. */
async function main() {
  const updates = new OrderBookStreamer("http://localhost:50051", "LUX-USDC").stream();

  for await (const { timestamp } of updates) {
    console.log(`Update at ${timestamp}`);
  }
}

main().catch(console.error);

Stream Management Best Practices

1. Backpressure Handling

When the consumer is slower than the producer, bound the buffer and apply an explicit drop policy so that a slow consumer cannot stall the receive loop:

// Buffered channel with a drop-oldest policy. No step below blocks the
// producer, even if the consumer or another producer races with it.
updates := make(chan *pb.OrderBookUpdate, 1000)

// Producer
select {
case updates <- update:
    // Sent successfully
default:
    // Buffer full: evict the oldest queued update to make room.
    select {
    case <-updates:
    default:
        // The consumer drained the buffer in the meantime; nothing to evict.
    }

    // Retry the send without blocking. It can only fail if another producer
    // refilled the buffer first, in which case the newest update is dropped.
    select {
    case updates <- update:
    default:
        // Drop newest (current update)
    }
}

2. Heartbeat Detection

Detect dead connections with heartbeat monitoring:

// heartbeatInterval is how often the liveness of the stream is checked. The
// stream is considered dead after two intervals without an update.
const heartbeatInterval = 30 * time.Second

// streamWithHeartbeat consumes the LUX-USDC order book stream and returns an
// error if no update arrives for more than 2*heartbeatInterval, if the stream
// fails, or if ctx is cancelled.
//
// stream.Recv blocks until a message arrives, so it runs in its own goroutine;
// otherwise the heartbeat check would never execute on a silent connection,
// which is exactly the case it is meant to detect.
func streamWithHeartbeat(ctx context.Context, client pb.LXDEXServiceClient) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // Also unblocks Recv and stops the receiver goroutine.

    stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
        Symbol: "LUX-USDC",
    })
    if err != nil {
        return err
    }

    type recvResult struct {
        update *pb.OrderBookUpdate
        err    error
    }
    results := make(chan recvResult)

    go func() {
        for {
            update, err := stream.Recv()
            select {
            case results <- recvResult{update, err}:
            case <-ctx.Done():
                return // Nobody is listening any more.
            }
            if err != nil {
                return
            }
        }
    }()

    lastUpdate := time.Now()
    heartbeat := time.NewTicker(heartbeatInterval)
    defer heartbeat.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-heartbeat.C:
            if time.Since(lastUpdate) > 2*heartbeatInterval {
                return fmt.Errorf("no updates received, connection likely dead")
            }
        case res := <-results:
            if res.err != nil {
                return res.err
            }
            lastUpdate = time.Now()

            processUpdate(res.update)
        }
    }
}

3. Graceful Shutdown

Properly close streams on shutdown:

// main runs the streams until they finish on their own or the process
// receives SIGINT/SIGTERM, in which case the shared context is cancelled.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    // Release the context even when runStreams returns without a signal.
    defer cancel()

    // Handle SIGINT/SIGTERM
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sigCh)

    go func() {
        select {
        case <-sigCh:
            log.Println("Shutting down...")
            cancel() // This cancels all streams
        case <-ctx.Done():
            // Streams ended without a signal; let the goroutine exit.
        }
    }()

    // Start streaming
    runStreams(ctx)
}

4. Multiple Streams

Manage multiple symbol subscriptions:

// StreamManager tracks one order book stream per symbol so that individual
// subscriptions can be started and cancelled independently.
type StreamManager struct {
    client pb.LXDEXServiceClient

    // mu guards streams.
    mu sync.Mutex
    // streams maps each subscribed symbol to the function that cancels it.
    streams map[string]context.CancelFunc
}

// Subscribe starts a background stream for symbol and calls handler for every
// update received. It returns an error if symbol is already subscribed.
// handler runs on the stream's goroutine.
func (m *StreamManager) Subscribe(symbol string, handler func(*pb.OrderBookUpdate)) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    // Allocate the map lazily so a StreamManager built as a plain struct
    // literal does not panic on its first subscription.
    if m.streams == nil {
        m.streams = make(map[string]context.CancelFunc)
    }

    if _, exists := m.streams[symbol]; exists {
        return fmt.Errorf("already subscribed to %s", symbol)
    }

    ctx, cancel := context.WithCancel(context.Background())
    m.streams[symbol] = cancel

    go m.runStream(ctx, symbol, handler)
    return nil
}

// Unsubscribe cancels the stream for symbol and forgets it. Unknown symbols
// are ignored.
func (m *StreamManager) Unsubscribe(symbol string) {
    m.mu.Lock()
    defer m.mu.Unlock()

    stop, ok := m.streams[symbol]
    if !ok {
        return
    }
    delete(m.streams, symbol)
    stop()
}

// runStream opens the order book stream for symbol and feeds every update to
// handler until the stream ends or ctx is cancelled.
//
// When the stream ends for any reason other than cancellation, the symbol's
// registration is removed so that it can be subscribed again; otherwise a
// failed stream would leave the symbol permanently "already subscribed".
func (m *StreamManager) runStream(ctx context.Context, symbol string, handler func(*pb.OrderBookUpdate)) {
    defer func() {
        m.mu.Lock()
        defer m.mu.Unlock()

        // ctx is only ever cancelled by Unsubscribe, which holds m.mu and has
        // already removed the entry. If ctx is still live here, the entry for
        // symbol belongs to this stream and must be cleaned up.
        if ctx.Err() != nil {
            return
        }
        if cancel, ok := m.streams[symbol]; ok {
            cancel()
            delete(m.streams, symbol)
        }
    }()

    stream, err := m.client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
        Symbol: symbol,
    })
    if err != nil {
        log.Printf("Failed to start stream for %s: %v", symbol, err)
        return
    }

    for {
        update, err := stream.Recv()
        if err != nil {
            // Cancellation is the expected way to stop; anything else is
            // worth surfacing instead of ending silently.
            if ctx.Err() == nil {
                log.Printf("Stream for %s ended: %v", symbol, err)
            }
            return
        }
        handler(update)
    }
}

Performance Considerations

Message Rate

| Stream | Typical Rate | Max Rate |
|---|---|---|
| OrderBook (per symbol) | 10-100/s | 1000/s |
| Trades (per symbol) | 1-50/s | 500/s |

Bandwidth Estimation

OrderBookUpdate message size: ~200 bytes (20 levels)
At 100 updates/second: ~20 KB/s per symbol
10 symbols: ~200 KB/s total

Connection Limits

| Limit | Value |
|---|---|
| Max concurrent streams per connection | 100 |
| Max streams per user | 50 |
| Max connections per IP | 10 |
| Stream timeout (no activity) | 5 minutes |