Market Data RPCs

gRPC API for order book, trades, and real-time market data streaming

Market Data RPCs

Reference for market data retrieval via gRPC: GetOrderBook, StreamOrderBook, GetTrades, and StreamTrades.

Proto Definitions

Common Messages

// PriceLevel is one aggregated price level; it is the element type of the
// bid and ask lists in OrderBook and OrderBookUpdate.
message PriceLevel {
  double price = 1;   // Price level
  double size = 2;    // Total quantity at this price
  int32 count = 3;    // Number of orders at this price
}

// OrderBook is a snapshot of both sides of the book for one trading pair.
// It is the response type of GetOrderBook.
message OrderBook {
  string symbol = 1;              // Trading pair
  repeated PriceLevel bids = 2;   // Buy orders (descending by price)
  repeated PriceLevel asks = 3;   // Sell orders (ascending by price)
  int64 timestamp = 4;            // Snapshot timestamp (Unix nanoseconds)
}

// OrderBookUpdate lists the bid and ask levels that changed.
// It is the message type streamed by StreamOrderBook.
message OrderBookUpdate {
  string symbol = 1;                    // Trading pair
  repeated PriceLevel bid_updates = 2;  // Changed bid levels
  repeated PriceLevel ask_updates = 3;  // Changed ask levels
  int64 timestamp = 4;                  // NOTE(review): presumably Unix nanoseconds like OrderBook.timestamp - confirm
}

// Trade describes one executed trade. It is the element type of
// GetTradesResponse.trades and the message type streamed by StreamTrades.
message Trade {
  uint64 trade_id = 1;          // Identifier of this trade
  string symbol = 2;            // Trading pair
  double price = 3;             // Trade price
  double size = 4;              // Trade quantity
  OrderSide side = 5;           // Taker side
  uint64 buy_order_id = 6;      // ID of the buy-side order
  uint64 sell_order_id = 7;     // ID of the sell-side order
  string buyer_id = 8;          // NOTE(review): presumably the buyer's account/user ID - confirm
  string seller_id = 9;         // NOTE(review): presumably the seller's account/user ID - confirm
  int64 timestamp = 10;         // Execution time (Unix nanoseconds)
}

GetOrderBook

Retrieve a snapshot of the current order book.

RPC Definition

// Unary RPC: returns a snapshot of the current order book for the requested symbol.
rpc GetOrderBook(GetOrderBookRequest) returns (OrderBook);

Request

// GetOrderBookRequest selects the symbol and the number of levels per side to return.
message GetOrderBookRequest {
  string symbol = 1;  // Required: trading pair
  int32 depth = 2;    // Optional: max levels per side (default 20, max 500)
}

Response

// OrderBook is the snapshot returned by GetOrderBook.
message OrderBook {
  string symbol = 1;             // Trading pair
  repeated PriceLevel bids = 2;  // Sorted descending by price
  repeated PriceLevel asks = 3;  // Sorted ascending by price
  int64 timestamp = 4;           // Snapshot timestamp (Unix nanoseconds)
}

Error Codes

| gRPC Code | Condition |
|-----------|-----------|
| INVALID_ARGUMENT | Invalid symbol or depth |
| NOT_FOUND | Symbol not found |

Examples

Go

book, err := client.GetOrderBook(ctx, &pb.GetOrderBookRequest{
    Symbol: "LUX-USDC",
    Depth:  20,
})
if err != nil {
    log.Fatal(err)
}

log.Printf("Order Book for %s at %d:", book.Symbol, book.Timestamp)
log.Println("BIDS:")
for i, level := range book.Bids {
    log.Printf("  %d. %.2f x %.4f (%d orders)",
        i+1, level.Price, level.Size, level.Count)
}
log.Println("ASKS:")
for i, level := range book.Asks {
    log.Printf("  %d. %.2f x %.4f (%d orders)",
        i+1, level.Price, level.Size, level.Count)
}

// Calculate spread
if len(book.Bids) > 0 && len(book.Asks) > 0 {
    spread := book.Asks[0].Price - book.Bids[0].Price
    midPrice := (book.Asks[0].Price + book.Bids[0].Price) / 2
    spreadBps := spread / midPrice * 10000
    log.Printf("Spread: %.2f (%.1f bps)", spread, spreadBps)
}

Python

# Fetch the top 20 levels per side for LUX-USDC.
book = stub.GetOrderBook(lxdex_pb2.GetOrderBookRequest(
    symbol="LUX-USDC",
    depth=20,
))

print(f"Order Book for {book.symbol}")

# Either side may be empty; indexing [0] on an empty repeated field raises
# IndexError, so check before reading the best level.
if book.bids:
    print(f"Best Bid: {book.bids[0].price} x {book.bids[0].size}")
else:
    print("Best Bid: none")
if book.asks:
    print(f"Best Ask: {book.asks[0].price} x {book.asks[0].size}")
else:
    print("Best Ask: none")

# Calculate total liquidity (sum of sizes over the returned levels)
bid_liquidity = sum(level.size for level in book.bids)
ask_liquidity = sum(level.size for level in book.asks)
print(f"Bid Liquidity: {bid_liquidity:.2f}")
print(f"Ask Liquidity: {ask_liquidity:.2f}")

TypeScript

// Fetch the top 20 levels per side for LUX-USDC.
const book = await client.getOrderBook({
  symbol: "LUX-USDC",
  depth: 20,
});

console.log(`Best Bid: ${book.bids[0]?.price} x ${book.bids[0]?.size}`);
console.log(`Best Ask: ${book.asks[0]?.price} x ${book.asks[0]?.size}`);

// Calculate VWAP for bids over (at most) the first five levels
let totalSize = 0;
let totalValue = 0;
for (const level of book.bids.slice(0, 5)) {
  totalSize += level.size;
  totalValue += level.price * level.size;
}
// With no bid liquidity totalSize is 0 and the division would yield NaN,
// so report "n/a" instead of a meaningless number.
const vwapBid = totalSize > 0 ? totalValue / totalSize : null;
console.log(`5-level VWAP Bid: ${vwapBid === null ? "n/a" : vwapBid.toFixed(2)}`);

StreamOrderBook

Subscribe to real-time order book updates.

RPC Definition

// Server-streaming RPC: delivers real-time order book updates for the requested symbol.
rpc StreamOrderBook(StreamOrderBookRequest) returns (stream OrderBookUpdate);

Request

// StreamOrderBookRequest selects the symbol and how many levels the stream tracks.
message StreamOrderBookRequest {
  string symbol = 1;  // Required: trading pair
  int32 depth = 2;    // Optional: max levels to track (default 20)
}

Response Stream

// OrderBookUpdate is one message of the StreamOrderBook response stream.
// The first message is a full snapshot; later messages are incremental deltas.
message OrderBookUpdate {
  string symbol = 1;                    // Trading pair
  repeated PriceLevel bid_updates = 2;  // Changed bid levels; size 0 means the level was removed
  repeated PriceLevel ask_updates = 3;  // Changed ask levels; size 0 means the level was removed
  int64 timestamp = 4;                  // NOTE(review): presumably Unix nanoseconds like OrderBook.timestamp - confirm
}

Update Semantics

  1. First message: Full order book snapshot
  2. Subsequent messages: Incremental deltas
  3. Size = 0: Price level removed
  4. Batching: Updates batched at 10ms intervals

Examples

Go: Maintain Local Order Book

package main

import (
    "context"
    "io"
    "log"
    "sort"
    "sync"

    pb "github.com/luxfi/dex/pkg/grpc/gen"
)

// LocalOrderBook is a client-side copy of one symbol's order book that is
// kept current by applying OrderBookUpdate messages.
type LocalOrderBook struct {
    mu        sync.RWMutex // guards bids, asks and timestamp
    symbol    string // trading pair this book was created for
    bids      map[float64]*pb.PriceLevel // bid levels keyed by price
    asks      map[float64]*pb.PriceLevel // ask levels keyed by price
    timestamp int64 // timestamp of the most recently applied update
}

// NewLocalOrderBook returns an empty book for symbol with both price maps
// allocated, so ApplyUpdate can write to it immediately.
func NewLocalOrderBook(symbol string) *LocalOrderBook {
    ob := new(LocalOrderBook)
    ob.symbol = symbol
    ob.bids = map[float64]*pb.PriceLevel{}
    ob.asks = map[float64]*pb.PriceLevel{}
    return ob
}

// ApplyUpdate merges the changed bid and ask levels from update into the
// book and records the update's timestamp. A level whose size is zero is
// removed; any other level replaces the entry stored at that price. The
// write lock is held for the whole merge.
func (ob *LocalOrderBook) ApplyUpdate(update *pb.OrderBookUpdate) {
    ob.mu.Lock()
    defer ob.mu.Unlock()

    // merge applies one side's changes to the matching price map.
    merge := func(side map[float64]*pb.PriceLevel, changes []*pb.PriceLevel) {
        for _, lv := range changes {
            if lv.Size != 0 {
                side[lv.Price] = lv
                continue
            }
            delete(side, lv.Price)
        }
    }

    merge(ob.bids, update.BidUpdates)
    merge(ob.asks, update.AskUpdates)
    ob.timestamp = update.Timestamp
}

// BestBid returns the bid level with the highest price, or nil when the
// bid side is empty. It scans every stored bid under the read lock.
func (ob *LocalOrderBook) BestBid() *pb.PriceLevel {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    var top *pb.PriceLevel
    for _, lv := range ob.bids {
        isBetter := top == nil || lv.Price > top.Price
        if isBetter {
            top = lv
        }
    }
    return top
}

// BestAsk returns the ask level with the lowest price, or nil when the
// ask side is empty. It scans every stored ask under the read lock.
func (ob *LocalOrderBook) BestAsk() *pb.PriceLevel {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    var top *pb.PriceLevel
    for _, lv := range ob.asks {
        isBetter := top == nil || lv.Price < top.Price
        if isBetter {
            top = lv
        }
    }
    return top
}

// Spread returns best ask price minus best bid price, or 0 when either
// side of the book is empty.
func (ob *LocalOrderBook) Spread() float64 {
    bid, ask := ob.BestBid(), ob.BestAsk()
    if bid != nil && ask != nil {
        return ask.Price - bid.Price
    }
    return 0
}

// main subscribes to the LUX-USDC order book stream, applies every update
// to a local book and logs the best bid/offer after each one. It returns
// when the server closes the stream and exits via log.Fatal on any other
// error.
func main() {
    // ... setup client ...

    const symbol = "LUX-USDC"
    localBook := NewLocalOrderBook(symbol)

    req := &pb.StreamOrderBookRequest{Symbol: symbol, Depth: 20}
    stream, err := client.StreamOrderBook(ctx, req)
    if err != nil {
        log.Fatal(err)
    }

    for {
        update, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            return
        case recvErr != nil:
            log.Fatal(recvErr)
        }

        localBook.ApplyUpdate(update)

        bid, ask := localBook.BestBid(), localBook.BestAsk()
        if bid == nil || ask == nil {
            // One side is empty, so there is no BBO to report yet.
            continue
        }
        log.Printf("BBO: %.2f x %.4f | %.2f x %.4f (spread: %.2f)",
            bid.Price, bid.Size,
            ask.Price, ask.Size,
            localBook.Spread(),
        )
    }
}

Python: Async Stream Consumer

import asyncio
import grpc
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict

from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc


@dataclass
class PriceLevel:
    """Local copy of one order book price level (price, size, order count)."""
    price: float  # price of the level
    size: float  # total quantity at this price
    count: int  # number of orders at this price


class LocalOrderBook:
    """Client-side mirror of one symbol's order book, kept current from stream updates.

    Bids and asks are dicts keyed by price. ``apply_update`` merges the
    changed levels of an update message; ``best_bid``, ``best_ask`` and
    ``spread`` read the current top of book.
    """

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.bids: Dict[float, PriceLevel] = {}
        self.asks: Dict[float, PriceLevel] = {}
        self.timestamp = 0

    @staticmethod
    def _merge(side: "Dict[float, PriceLevel]", levels) -> None:
        """Apply changed ``levels`` to one ``side``; a size of 0 removes the level."""
        for level in levels:
            if level.size == 0:
                # pop with a default: removing a price we never stored is a no-op.
                side.pop(level.price, None)
            else:
                side[level.price] = PriceLevel(level.price, level.size, level.count)

    def apply_update(self, update):
        """Merge ``update.bid_updates`` / ``update.ask_updates`` and record its timestamp."""
        self._merge(self.bids, update.bid_updates)
        self._merge(self.asks, update.ask_updates)
        self.timestamp = update.timestamp

    # The return annotations below are quoted on purpose: an unquoted
    # ``PriceLevel | None`` is evaluated when the class body runs and raises
    # TypeError on Python versions before 3.10.
    @property
    def best_bid(self) -> "PriceLevel | None":
        """Highest-priced bid level, or None when there are no bids."""
        if not self.bids:
            return None
        return self.bids[max(self.bids)]

    @property
    def best_ask(self) -> "PriceLevel | None":
        """Lowest-priced ask level, or None when there are no asks."""
        if not self.asks:
            return None
        return self.asks[min(self.asks)]

    @property
    def spread(self) -> float:
        """Best ask price minus best bid price, or 0.0 when either side is empty."""
        # Read each property once; every access rescans the price keys.
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return 0.0
        return ask.price - bid.price


async def stream_order_book(symbol: str):
    """Stream order book updates for ``symbol`` and print the best bid/offer.

    Connects to localhost:50051 over an insecure channel, applies every
    update to a LocalOrderBook, and prints one BBO line per update once
    both sides of the book are populated.
    """
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
        book = LocalOrderBook(symbol)

        updates = stub.StreamOrderBook(
            lxdex_pb2.StreamOrderBookRequest(symbol=symbol, depth=20)
        )

        async for update in updates:
            book.apply_update(update)

            # Nothing to report until both sides have at least one level.
            if not (book.best_bid and book.best_ask):
                continue

            print(f"BBO: {book.best_bid.price:.2f} x {book.best_bid.size:.4f} | "
                  f"{book.best_ask.price:.2f} x {book.best_ask.size:.4f} "
                  f"(spread: {book.spread:.2f})")


# Script entry point: run the order book stream consumer for LUX-USDC.
if __name__ == "__main__":
    asyncio.run(stream_order_book("LUX-USDC"))

GetTrades

Query recent trades for a symbol.

RPC Definition

// Unary RPC: returns recent trades for the requested symbol.
rpc GetTrades(GetTradesRequest) returns (GetTradesResponse);

Request

// GetTradesRequest selects the symbol and caps the number of trades returned.
message GetTradesRequest {
  string symbol = 1;  // Required: trading pair
  int32 limit = 2;    // Optional: max results (default 100, max 1000)
}

Response

// GetTradesResponse carries the trades returned by GetTrades, newest first.
message GetTradesResponse {
  repeated Trade trades = 1;  // Ordered by timestamp descending
}

Examples

Go

// Fetch up to 50 recent trades for LUX-USDC.
resp, err := client.GetTrades(ctx, &pb.GetTradesRequest{
    Symbol: "LUX-USDC",
    Limit:  50,
})
if err != nil {
    log.Fatal(err)
}

log.Printf("Recent %d trades for LUX-USDC:", len(resp.Trades))

// Log each trade and accumulate the VWAP inputs in the same pass.
var totalSize, totalValue float64
for _, trade := range resp.Trades {
    // Trade.Side is the taker side; anything other than SELL is shown as BUY.
    side := "BUY"
    if trade.Side == pb.OrderSide_SELL {
        side = "SELL"
    }
    log.Printf("  %d: %s %.4f @ %.2f (taker: %s)",
        trade.TradeId,
        side,
        trade.Size,
        trade.Price,
        side,
    )

    totalSize += trade.Size
    totalValue += trade.Price * trade.Size
}

// Calculate VWAP. With no trades (or zero total size) the division would be
// 0/0 and log NaN, so report that no VWAP is available instead.
if totalSize > 0 {
    vwap := totalValue / totalSize
    log.Printf("VWAP (last %d trades): %.2f", len(resp.Trades), vwap)
} else {
    log.Printf("VWAP (last %d trades): n/a (no traded volume)", len(resp.Trades))
}

Python

# Fetch up to 100 recent trades for LUX-USDC.
response = stub.GetTrades(lxdex_pb2.GetTradesRequest(
    symbol="LUX-USDC",
    limit=100,
))

# Group by taker side
buy_volume = sum(t.size for t in response.trades if t.side == lxdex_pb2.BUY)
sell_volume = sum(t.size for t in response.trades if t.side == lxdex_pb2.SELL)

print(f"Buy Volume: {buy_volume:.2f}")
print(f"Sell Volume: {sell_volume:.2f}")

# Keep the label in both cases; only the value falls back to "N/A" when
# there is no sell volume to divide by.
ratio = f"{buy_volume / sell_volume:.2f}" if sell_volume > 0 else "N/A"
print(f"Buy/Sell Ratio: {ratio}")

StreamTrades

Subscribe to real-time trade feed.

RPC Definition

// Server-streaming RPC: delivers each trade for the requested symbol as a separate message.
rpc StreamTrades(StreamTradesRequest) returns (stream Trade);

Request

// StreamTradesRequest selects the symbol whose trades are streamed.
message StreamTradesRequest {
  string symbol = 1;  // Required: trading pair
}

Response Stream

Each message is a single Trade as executed.

Examples

Go: Real-Time Trade Monitor

stream, err := client.StreamTrades(ctx, &pb.StreamTradesRequest{
    Symbol: "LUX-USDC",
})
if err != nil {
    log.Fatal(err)
}

// Track rolling statistics
var (
    windowSize  = 100
    recentPrices []float64
    totalVolume float64
)

for {
    trade, err := stream.Recv()
    if err == io.EOF {
        return
    }
    if err != nil {
        log.Fatal(err)
    }

    // Update rolling window
    recentPrices = append(recentPrices, trade.Price)
    if len(recentPrices) > windowSize {
        recentPrices = recentPrices[1:]
    }
    totalVolume += trade.Size

    // Calculate stats
    var sum float64
    for _, p := range recentPrices {
        sum += p
    }
    avgPrice := sum / float64(len(recentPrices))

    side := "BUY "
    if trade.Side == pb.OrderSide_SELL {
        side = "SELL"
    }

    log.Printf("[%s] %.4f @ %.2f | Avg: %.2f | Vol: %.2f",
        side, trade.Size, trade.Price, avgPrice, totalVolume)
}

Python: Trade Volume Aggregator

import asyncio
from collections import defaultdict
from datetime import datetime

import grpc
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc


class VolumeAggregator:
    """Accumulates traded size per fixed-width time bucket, split by side.

    Each bucket is a dict with "buy" and "sell" size totals and a trade
    "count". Buckets are keyed by the trade timestamp (nanoseconds) divided
    by the bucket width.
    """

    def __init__(self, bucket_seconds: int = 60):
        self.bucket_seconds = bucket_seconds
        self.buckets = defaultdict(self._empty_bucket)

    @staticmethod
    def _empty_bucket():
        """Return a fresh zeroed bucket for the defaultdict."""
        return {"buy": 0.0, "sell": 0.0, "count": 0}

    def add_trade(self, trade):
        """Add ``trade.size`` to the bucket that contains ``trade.timestamp``."""
        nanos_per_bucket = self.bucket_seconds * 1_000_000_000
        stats = self.buckets[trade.timestamp // nanos_per_bucket]

        # Any side other than BUY is counted as sell volume.
        side_key = "buy" if trade.side == lxdex_pb2.BUY else "sell"
        stats[side_key] += trade.size
        stats["count"] += 1

    def current_bucket_stats(self):
        """Return the bucket for the local wall-clock time, or zeros if none exists.

        The lookup does not create a bucket when none exists.
        """
        now_seconds = int(datetime.now().timestamp())
        key = now_seconds // self.bucket_seconds
        if key in self.buckets:
            return self.buckets[key]
        return {"buy": 0, "sell": 0, "count": 0}


async def stream_and_aggregate(symbol: str):
    """Stream trades for ``symbol`` and print each one with current-bucket volume.

    Connects to localhost:50051 over an insecure channel, feeds every trade
    into a 60-second VolumeAggregator, and prints the trade together with
    the buy/sell volume and trade count of the aggregator's current bucket.
    """
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
        aggregator = VolumeAggregator(bucket_seconds=60)

        trades = stub.StreamTrades(lxdex_pb2.StreamTradesRequest(symbol=symbol))

        async for trade in trades:
            aggregator.add_trade(trade)
            stats = aggregator.current_bucket_stats()

            # Any side other than BUY is labelled SELL.
            if trade.side == lxdex_pb2.BUY:
                side = "BUY"
            else:
                side = "SELL"

            print(f"[{side}] {trade.size:.4f} @ {trade.price:.2f} | "
                  f"1min Vol: B={stats['buy']:.2f} S={stats['sell']:.2f} "
                  f"({stats['count']} trades)")


# Script entry point: run the trade stream aggregator for LUX-USDC.
if __name__ == "__main__":
    asyncio.run(stream_and_aggregate("LUX-USDC"))

Market Data Patterns

1. Order Book Snapshot + Stream

Best practice: get snapshot first, then subscribe to updates.

// 1. Get initial snapshot
book, err := client.GetOrderBook(ctx, &pb.GetOrderBookRequest{
    Symbol: "LUX-USDC",
    Depth:  100,
})
if err != nil {
    log.Fatal(err)
}

// 2. Initialize local book
// NOTE(review): InitFromSnapshot is not among the LocalOrderBook methods
// shown in this reference; it has to be defined (seeding the bid and ask
// maps from the snapshot's levels) before this snippet compiles.
localBook := NewLocalOrderBook("LUX-USDC")
localBook.InitFromSnapshot(book)

// 3. Subscribe to updates
stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
    Symbol: "LUX-USDC",
    Depth:  100,
})
if err != nil {
    log.Fatal(err)
}

// 4. Apply incremental updates
for {
    update, err := stream.Recv()
    if err == io.EOF {
        // Server closed the stream cleanly.
        break
    }
    if err != nil {
        // Report the failure instead of silently stopping: after this point
        // the local book is no longer being kept current.
        log.Printf("order book stream for LUX-USDC ended: %v", err)
        break
    }
    localBook.ApplyUpdate(update)
}

2. Multi-Symbol Aggregation

symbols := []string{"LUX-USDC", "ETH-USDC", "BTC-USDC"}
updates := make(chan *pb.OrderBookUpdate, 1000)

for _, symbol := range symbols {
    go func(sym string) {
        stream, _ := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
            Symbol: sym,
        })
        for {
            update, err := stream.Recv()
            if err != nil {
                return
            }
            updates <- update
        }
    }(symbol)
}

// Aggregate all updates
for update := range updates {
    processUpdate(update)
}

3. Trade-Based Price Feed

// PriceFeed holds one price per symbol, written by UpdateFromTrade and
// read by GetPrice.
type PriceFeed struct {
    mu      sync.RWMutex // guards prices
    prices  map[string]float64 // symbol -> price of the last trade applied
}

// UpdateFromTrade records trade.Price as the current price for
// trade.Symbol, replacing any earlier value. The write lock is held for
// the update.
func (pf *PriceFeed) UpdateFromTrade(trade *pb.Trade) {
    pf.mu.Lock()
    defer pf.mu.Unlock()

    // No constructor is shown for PriceFeed, so a zero-value PriceFeed has a
    // nil map and writing to it would panic. Allocate on first use.
    if pf.prices == nil {
        pf.prices = make(map[string]float64)
    }
    pf.prices[trade.Symbol] = trade.Price
}

// GetPrice returns the stored price for symbol and whether one exists.
// The read lock is held only for the map lookup.
func (pf *PriceFeed) GetPrice(symbol string) (float64, bool) {
    pf.mu.RLock()
    price, found := pf.prices[symbol]
    pf.mu.RUnlock()
    return price, found
}

Performance Considerations

Message Sizes

| Message | Typical Size | Max Size |
|---------|--------------|----------|
| OrderBook (20 levels) | ~500 bytes | ~2KB |
| OrderBook (100 levels) | ~2KB | ~8KB |
| OrderBookUpdate | ~100 bytes | ~1KB |
| Trade | ~150 bytes | ~200 bytes |

Update Frequencies

| Stream | Typical Rate | Peak Rate |
|--------|--------------|-----------|
| OrderBook (active symbol) | 50-100/s | 500/s |
| Trades (active symbol) | 5-20/s | 200/s |

Bandwidth Planning

Per symbol:
  OrderBook stream: ~5-50 KB/s typical, ~500 KB/s peak
  Trades stream: ~1-3 KB/s typical, ~30 KB/s peak

10 symbols:
  Combined: ~60-530 KB/s typical, ~5.3 MB/s peak

Rate Limits

| RPC | Rate Limit | Notes |
|-----|------------|-------|
| GetOrderBook | 1000/s per user | Cache aggressively |
| StreamOrderBook | 10 concurrent per user | Per-symbol limit |
| GetTrades | 1000/s per user | Use streaming for real-time |
| StreamTrades | 10 concurrent per user | Per-symbol limit |