Market Data RPCs
gRPC API for order book, trades, and real-time market data streaming
Market Data RPCs
Reference for market data retrieval via gRPC: GetOrderBook, StreamOrderBook, GetTrades, and StreamTrades.
Proto Definitions
Common Messages
message PriceLevel {
double price = 1; // Price level
double size = 2; // Total quantity at this price
int32 count = 3; // Number of orders at this price
}
message OrderBook {
string symbol = 1; // Trading pair
repeated PriceLevel bids = 2; // Buy orders (descending by price)
repeated PriceLevel asks = 3; // Sell orders (ascending by price)
int64 timestamp = 4; // Snapshot timestamp (Unix nanoseconds)
}
message OrderBookUpdate {
string symbol = 1;
repeated PriceLevel bid_updates = 2; // Changed bid levels
repeated PriceLevel ask_updates = 3; // Changed ask levels
int64 timestamp = 4;
}
message Trade {
uint64 trade_id = 1;
string symbol = 2;
double price = 3;
double size = 4;
OrderSide side = 5; // Taker side
uint64 buy_order_id = 6;
uint64 sell_order_id = 7;
string buyer_id = 8;
string seller_id = 9;
int64 timestamp = 10; // Execution time (Unix nanoseconds)
}GetOrderBook
Retrieve a snapshot of the current order book.
RPC Definition
rpc GetOrderBook(GetOrderBookRequest) returns (OrderBook);Request
message GetOrderBookRequest {
string symbol = 1; // Required: trading pair
int32 depth = 2; // Optional: max levels per side (default 20, max 500)
}Response
message OrderBook {
string symbol = 1;
repeated PriceLevel bids = 2; // Sorted descending by price
repeated PriceLevel asks = 3; // Sorted ascending by price
int64 timestamp = 4;
}Error Codes
| gRPC Code | Condition |
|---|---|
INVALID_ARGUMENT | Invalid symbol or depth |
NOT_FOUND | Symbol not found |
Examples
Go
book, err := client.GetOrderBook(ctx, &pb.GetOrderBookRequest{
Symbol: "LUX-USDC",
Depth: 20,
})
if err != nil {
log.Fatal(err)
}
log.Printf("Order Book for %s at %d:", book.Symbol, book.Timestamp)
log.Println("BIDS:")
for i, level := range book.Bids {
log.Printf(" %d. %.2f x %.4f (%d orders)",
i+1, level.Price, level.Size, level.Count)
}
log.Println("ASKS:")
for i, level := range book.Asks {
log.Printf(" %d. %.2f x %.4f (%d orders)",
i+1, level.Price, level.Size, level.Count)
}
// Calculate spread
if len(book.Bids) > 0 && len(book.Asks) > 0 {
spread := book.Asks[0].Price - book.Bids[0].Price
midPrice := (book.Asks[0].Price + book.Bids[0].Price) / 2
spreadBps := spread / midPrice * 10000
log.Printf("Spread: %.2f (%.1f bps)", spread, spreadBps)
}Python
book = stub.GetOrderBook(lxdex_pb2.GetOrderBookRequest(
symbol="LUX-USDC",
depth=20,
))
print(f"Order Book for {book.symbol}")
print(f"Best Bid: {book.bids[0].price} x {book.bids[0].size}")
print(f"Best Ask: {book.asks[0].price} x {book.asks[0].size}")
# Calculate total liquidity
bid_liquidity = sum(level.size for level in book.bids)
ask_liquidity = sum(level.size for level in book.asks)
print(f"Bid Liquidity: {bid_liquidity:.2f}")
print(f"Ask Liquidity: {ask_liquidity:.2f}")TypeScript
const book = await client.getOrderBook({
symbol: "LUX-USDC",
depth: 20,
});
console.log(`Best Bid: ${book.bids[0]?.price} x ${book.bids[0]?.size}`);
console.log(`Best Ask: ${book.asks[0]?.price} x ${book.asks[0]?.size}`);
// Calculate VWAP for bids
let totalSize = 0;
let totalValue = 0;
for (const level of book.bids.slice(0, 5)) {
totalSize += level.size;
totalValue += level.price * level.size;
}
const vwapBid = totalValue / totalSize;
console.log(`5-level VWAP Bid: ${vwapBid.toFixed(2)}`);StreamOrderBook
Subscribe to real-time order book updates.
RPC Definition
rpc StreamOrderBook(StreamOrderBookRequest) returns (stream OrderBookUpdate);Request
message StreamOrderBookRequest {
string symbol = 1; // Required: trading pair
int32 depth = 2; // Optional: max levels to track (default 20)
}Response Stream
message OrderBookUpdate {
string symbol = 1;
repeated PriceLevel bid_updates = 2;
repeated PriceLevel ask_updates = 3;
int64 timestamp = 4;
}Update Semantics
- First message: Full order book snapshot
- Subsequent messages: Incremental deltas
- Size = 0: Price level removed
- Batching: Updates batched at 10ms intervals
Examples
Go: Maintain Local Order Book
package main
import (
"context"
"io"
"log"
"sort"
"sync"
pb "github.com/luxfi/dex/pkg/grpc/gen"
)
type LocalOrderBook struct {
mu sync.RWMutex
symbol string
bids map[float64]*pb.PriceLevel
asks map[float64]*pb.PriceLevel
timestamp int64
}
func NewLocalOrderBook(symbol string) *LocalOrderBook {
return &LocalOrderBook{
symbol: symbol,
bids: make(map[float64]*pb.PriceLevel),
asks: make(map[float64]*pb.PriceLevel),
}
}
func (ob *LocalOrderBook) ApplyUpdate(update *pb.OrderBookUpdate) {
ob.mu.Lock()
defer ob.mu.Unlock()
for _, level := range update.BidUpdates {
if level.Size == 0 {
delete(ob.bids, level.Price)
} else {
ob.bids[level.Price] = level
}
}
for _, level := range update.AskUpdates {
if level.Size == 0 {
delete(ob.asks, level.Price)
} else {
ob.asks[level.Price] = level
}
}
ob.timestamp = update.Timestamp
}
func (ob *LocalOrderBook) BestBid() *pb.PriceLevel {
ob.mu.RLock()
defer ob.mu.RUnlock()
var best *pb.PriceLevel
for _, level := range ob.bids {
if best == nil || level.Price > best.Price {
best = level
}
}
return best
}
func (ob *LocalOrderBook) BestAsk() *pb.PriceLevel {
ob.mu.RLock()
defer ob.mu.RUnlock()
var best *pb.PriceLevel
for _, level := range ob.asks {
if best == nil || level.Price < best.Price {
best = level
}
}
return best
}
func (ob *LocalOrderBook) Spread() float64 {
bid := ob.BestBid()
ask := ob.BestAsk()
if bid == nil || ask == nil {
return 0
}
return ask.Price - bid.Price
}
func main() {
// ... setup client ...
localBook := NewLocalOrderBook("LUX-USDC")
stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: "LUX-USDC",
Depth: 20,
})
if err != nil {
log.Fatal(err)
}
for {
update, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal(err)
}
localBook.ApplyUpdate(update)
bid := localBook.BestBid()
ask := localBook.BestAsk()
if bid != nil && ask != nil {
log.Printf("BBO: %.2f x %.4f | %.2f x %.4f (spread: %.2f)",
bid.Price, bid.Size,
ask.Price, ask.Size,
localBook.Spread(),
)
}
}
}Python: Async Stream Consumer
import asyncio
import grpc
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc
@dataclass
class PriceLevel:
price: float
size: float
count: int
class LocalOrderBook:
def __init__(self, symbol: str):
self.symbol = symbol
self.bids: Dict[float, PriceLevel] = {}
self.asks: Dict[float, PriceLevel] = {}
self.timestamp = 0
def apply_update(self, update):
for level in update.bid_updates:
if level.size == 0:
self.bids.pop(level.price, None)
else:
self.bids[level.price] = PriceLevel(
level.price, level.size, level.count
)
for level in update.ask_updates:
if level.size == 0:
self.asks.pop(level.price, None)
else:
self.asks[level.price] = PriceLevel(
level.price, level.size, level.count
)
self.timestamp = update.timestamp
@property
def best_bid(self) -> PriceLevel | None:
if not self.bids:
return None
return self.bids[max(self.bids.keys())]
@property
def best_ask(self) -> PriceLevel | None:
if not self.asks:
return None
return self.asks[min(self.asks.keys())]
@property
def spread(self) -> float:
if self.best_bid and self.best_ask:
return self.best_ask.price - self.best_bid.price
return 0.0
async def stream_order_book(symbol: str):
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
book = LocalOrderBook(symbol)
request = lxdex_pb2.StreamOrderBookRequest(symbol=symbol, depth=20)
async for update in stub.StreamOrderBook(request):
book.apply_update(update)
if book.best_bid and book.best_ask:
print(f"BBO: {book.best_bid.price:.2f} x {book.best_bid.size:.4f} | "
f"{book.best_ask.price:.2f} x {book.best_ask.size:.4f} "
f"(spread: {book.spread:.2f})")
if __name__ == "__main__":
asyncio.run(stream_order_book("LUX-USDC"))GetTrades
Query recent trades for a symbol.
RPC Definition
rpc GetTrades(GetTradesRequest) returns (GetTradesResponse);Request
message GetTradesRequest {
string symbol = 1; // Required: trading pair
int32 limit = 2; // Optional: max results (default 100, max 1000)
}Response
message GetTradesResponse {
repeated Trade trades = 1; // Ordered by timestamp descending
}Examples
Go
resp, err := client.GetTrades(ctx, &pb.GetTradesRequest{
Symbol: "LUX-USDC",
Limit: 50,
})
if err != nil {
log.Fatal(err)
}
log.Printf("Recent %d trades for LUX-USDC:", len(resp.Trades))
for _, trade := range resp.Trades {
side := "BUY"
if trade.Side == pb.OrderSide_SELL {
side = "SELL"
}
log.Printf(" %d: %s %.4f @ %.2f (taker: %s)",
trade.TradeId,
side,
trade.Size,
trade.Price,
side,
)
}
// Calculate VWAP
var totalSize, totalValue float64
for _, trade := range resp.Trades {
totalSize += trade.Size
totalValue += trade.Price * trade.Size
}
vwap := totalValue / totalSize
log.Printf("VWAP (last %d trades): %.2f", len(resp.Trades), vwap)Python
response = stub.GetTrades(lxdex_pb2.GetTradesRequest(
symbol="LUX-USDC",
limit=100,
))
# Group by taker side
buy_volume = sum(t.size for t in response.trades if t.side == lxdex_pb2.BUY)
sell_volume = sum(t.size for t in response.trades if t.side == lxdex_pb2.SELL)
print(f"Buy Volume: {buy_volume:.2f}")
print(f"Sell Volume: {sell_volume:.2f}")
print(f"Buy/Sell Ratio: {buy_volume / sell_volume:.2f}" if sell_volume > 0 else "N/A")StreamTrades
Subscribe to real-time trade feed.
RPC Definition
rpc StreamTrades(StreamTradesRequest) returns (stream Trade);Request
message StreamTradesRequest {
string symbol = 1; // Required: trading pair
}Response Stream
Each message is a single Trade as executed.
Examples
Go: Real-Time Trade Monitor
stream, err := client.StreamTrades(ctx, &pb.StreamTradesRequest{
Symbol: "LUX-USDC",
})
if err != nil {
log.Fatal(err)
}
// Track rolling statistics
var (
windowSize = 100
recentPrices []float64
totalVolume float64
)
for {
trade, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal(err)
}
// Update rolling window
recentPrices = append(recentPrices, trade.Price)
if len(recentPrices) > windowSize {
recentPrices = recentPrices[1:]
}
totalVolume += trade.Size
// Calculate stats
var sum float64
for _, p := range recentPrices {
sum += p
}
avgPrice := sum / float64(len(recentPrices))
side := "BUY "
if trade.Side == pb.OrderSide_SELL {
side = "SELL"
}
log.Printf("[%s] %.4f @ %.2f | Avg: %.2f | Vol: %.2f",
side, trade.Size, trade.Price, avgPrice, totalVolume)
}Python: Trade Volume Aggregator
import asyncio
from collections import defaultdict
from datetime import datetime
import grpc
from lxdex_grpc import lxdex_pb2, lxdex_pb2_grpc
class VolumeAggregator:
def __init__(self, bucket_seconds: int = 60):
self.bucket_seconds = bucket_seconds
self.buckets = defaultdict(lambda: {"buy": 0.0, "sell": 0.0, "count": 0})
def add_trade(self, trade):
bucket_time = trade.timestamp // (self.bucket_seconds * 1_000_000_000)
bucket = self.buckets[bucket_time]
if trade.side == lxdex_pb2.BUY:
bucket["buy"] += trade.size
else:
bucket["sell"] += trade.size
bucket["count"] += 1
def current_bucket_stats(self):
current_bucket = int(datetime.now().timestamp()) // self.bucket_seconds
return self.buckets.get(current_bucket, {"buy": 0, "sell": 0, "count": 0})
async def stream_and_aggregate(symbol: str):
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = lxdex_pb2_grpc.LXDEXServiceStub(channel)
aggregator = VolumeAggregator(bucket_seconds=60)
request = lxdex_pb2.StreamTradesRequest(symbol=symbol)
async for trade in stub.StreamTrades(request):
aggregator.add_trade(trade)
stats = aggregator.current_bucket_stats()
side = "BUY" if trade.side == lxdex_pb2.BUY else "SELL"
print(f"[{side}] {trade.size:.4f} @ {trade.price:.2f} | "
f"1min Vol: B={stats['buy']:.2f} S={stats['sell']:.2f} "
f"({stats['count']} trades)")
if __name__ == "__main__":
asyncio.run(stream_and_aggregate("LUX-USDC"))Market Data Patterns
1. Order Book Snapshot + Stream
Best practice: get snapshot first, then subscribe to updates.
// 1. Get initial snapshot
book, err := client.GetOrderBook(ctx, &pb.GetOrderBookRequest{
Symbol: "LUX-USDC",
Depth: 100,
})
if err != nil {
log.Fatal(err)
}
// 2. Initialize local book
localBook := NewLocalOrderBook("LUX-USDC")
localBook.InitFromSnapshot(book)
// 3. Subscribe to updates
stream, err := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: "LUX-USDC",
Depth: 100,
})
if err != nil {
log.Fatal(err)
}
// 4. Apply incremental updates
for {
update, err := stream.Recv()
if err != nil {
break
}
localBook.ApplyUpdate(update)
}2. Multi-Symbol Aggregation
symbols := []string{"LUX-USDC", "ETH-USDC", "BTC-USDC"}
updates := make(chan *pb.OrderBookUpdate, 1000)
for _, symbol := range symbols {
go func(sym string) {
stream, _ := client.StreamOrderBook(ctx, &pb.StreamOrderBookRequest{
Symbol: sym,
})
for {
update, err := stream.Recv()
if err != nil {
return
}
updates <- update
}
}(symbol)
}
// Aggregate all updates
for update := range updates {
processUpdate(update)
}3. Trade-Based Price Feed
type PriceFeed struct {
mu sync.RWMutex
prices map[string]float64
}
func (pf *PriceFeed) UpdateFromTrade(trade *pb.Trade) {
pf.mu.Lock()
defer pf.mu.Unlock()
pf.prices[trade.Symbol] = trade.Price
}
func (pf *PriceFeed) GetPrice(symbol string) (float64, bool) {
pf.mu.RLock()
defer pf.mu.RUnlock()
price, ok := pf.prices[symbol]
return price, ok
}Performance Considerations
Message Sizes
| Message | Typical Size | Max Size |
|---|---|---|
| OrderBook (20 levels) | ~500 bytes | ~2KB |
| OrderBook (100 levels) | ~2KB | ~8KB |
| OrderBookUpdate | ~100 bytes | ~1KB |
| Trade | ~150 bytes | ~200 bytes |
Update Frequencies
| Stream | Typical Rate | Peak Rate |
|---|---|---|
| OrderBook (active symbol) | 50-100/s | 500/s |
| Trades (active symbol) | 5-20/s | 200/s |
Bandwidth Planning
Per symbol:
OrderBook stream: ~5-50 KB/s typical, ~500 KB/s peak
Trades stream: ~1-3 KB/s typical, ~30 KB/s peak
10 symbols:
Combined: ~60-530 KB/s typical, ~5.3 MB/s peakRate Limits
| RPC | Rate Limit | Notes |
|---|---|---|
| GetOrderBook | 1000/s per user | Cache aggressively |
| StreamOrderBook | 10 concurrent per user | Per-symbol limit |
| GetTrades | 1000/s per user | Use streaming for real-time |
| StreamTrades | 10 concurrent per user | Per-symbol limit |