Python SDK
Order Book
Order book snapshots, subscriptions, and pandas DataFrame integration
Order Book
Access real-time order book data with support for snapshots, streaming updates, and pandas DataFrame analysis.
Order Book Model
from dataclasses import dataclass
from typing import Optional
@dataclass
class OrderBookLevel:
    """A single price level in an order book.

    ``str(level)`` renders as ``"<size> @ <price>"`` with the size shown to
    four decimals and the price to two.
    """

    price: float
    size: float
    order_count: int

    def __str__(self) -> str:
        size_text = format(self.size, ".4f")
        price_text = format(self.price, ".2f")
        return " @ ".join((size_text, price_text))
@dataclass
class OrderBook:
    """Order book snapshot.

    ``bids`` and ``asks`` are expected best-first, so index 0 of each list is
    the top of book. The derived metrics fall back to a sentinel instead of
    raising when they cannot be computed: ``spread`` and ``spread_bps`` give
    ``float("inf")`` and ``mid_price`` gives ``0.0``.
    """

    symbol: str
    bids: list["OrderBookLevel"]  # Sorted by price descending
    asks: list["OrderBookLevel"]  # Sorted by price ascending
    timestamp: int  # Unix milliseconds
    sequence: int  # Sequence number for updates

    @property
    def best_bid(self) -> Optional["OrderBookLevel"]:
        """Get best bid (highest buy price), or ``None`` when there are no bids."""
        return self.bids[0] if self.bids else None

    @property
    def best_ask(self) -> Optional["OrderBookLevel"]:
        """Get best ask (lowest sell price), or ``None`` when there are no asks."""
        return self.asks[0] if self.asks else None

    @property
    def spread(self) -> float:
        """Calculate bid-ask spread; ``inf`` when either side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return float("inf")
        return ask.price - bid.price

    @property
    def spread_bps(self) -> float:
        """Calculate spread in basis points of the mid price.

        Returns ``inf`` when either side is empty or when the mid price is
        zero, since the ratio is undefined in both cases.
        """
        if self.best_bid is None or self.best_ask is None:
            return float("inf")
        mid = self.mid_price
        # Guard the division: a zero mid would otherwise raise ZeroDivisionError.
        if mid == 0:
            return float("inf")
        return (self.spread / mid) * 10000

    @property
    def mid_price(self) -> float:
        """Calculate mid price; ``0.0`` when either side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return 0.0
        return (bid.price + ask.price) / 2

# Get Order Book Snapshot
Basic Usage
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
# Get order book with default depth (10 levels)
book = client.get_order_book("BTC-USD")
print(f"Symbol: {book.symbol}")
print(f"Best Bid: {book.best_bid}")
print(f"Best Ask: {book.best_ask}")
print(f"Spread: {book.spread:.2f} ({book.spread_bps:.2f} bps)")
print(f"Mid Price: {book.mid_price:.2f}")
# Print top of book
print("\nBids:")
for level in book.bids[:5]:
print(f" {level.size:.4f} @ {level.price:.2f}")
print("\nAsks:")
for level in book.asks[:5]:
print(f" {level.size:.4f} @ {level.price:.2f}")
Specify Depth
# Get full depth (all levels)
full_book = client.get_order_book("BTC-USD", depth=0)
print(f"Total bid levels: {len(full_book.bids)}")
print(f"Total ask levels: {len(full_book.asks)}")
# Get top 5 levels only
top_book = client.get_order_book("BTC-USD", depth=5)
# Get top 100 levels
deep_book = client.get_order_book("BTC-USD", depth=100)
Multiple Symbols
import asyncio
from lux_dex import AsyncClient
async def get_multiple_books(
    symbols: tuple[str, ...] = ("BTC-USD", "ETH-USD", "SOL-USD", "AVAX-USD"),
    depth: int = 10,
) -> None:
    """Fetch several order book snapshots concurrently and print each spread.

    Args:
        symbols: Symbols to request. Defaults to BTC-USD, ETH-USD, SOL-USD
            and AVAX-USD.
        depth: Number of levels requested for every book.
    """
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # One request per symbol, all awaited together; gather() hands the
        # results back in the order the requests were passed in.
        books = await asyncio.gather(
            *(client.get_order_book(symbol, depth=depth) for symbol in symbols)
        )
        for book in books:
            print(f"{book.symbol}: {book.spread:.2f} spread ({book.spread_bps:.1f} bps)")
asyncio.run(get_multiple_books())
Best Bid/Ask
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
# Get best bid only
best_bid = client.get_best_bid("BTC-USD")
print(f"Best bid: {best_bid.size} @ {best_bid.price}")
# Get best ask only
best_ask = client.get_best_ask("BTC-USD")
print(f"Best ask: {best_ask.size} @ {best_ask.price}")
# Calculate spread
spread = best_ask.price - best_bid.price
mid = (best_bid.price + best_ask.price) / 2
spread_bps = (spread / mid) * 10000
print(f"Spread: {spread:.2f} ({spread_bps:.2f} bps)")
pandas DataFrame Integration
Convert to DataFrame
import pandas as pd
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
book = client.get_order_book("BTC-USD", depth=20)
# Convert bids to DataFrame
bids_df = pd.DataFrame([
{"price": level.price, "size": level.size, "orders": level.order_count}
for level in book.bids
])
bids_df["side"] = "bid"
bids_df["cumulative_size"] = bids_df["size"].cumsum()
# Convert asks to DataFrame
asks_df = pd.DataFrame([
{"price": level.price, "size": level.size, "orders": level.order_count}
for level in book.asks
])
asks_df["side"] = "ask"
asks_df["cumulative_size"] = asks_df["size"].cumsum()
# Combine
df = pd.concat([bids_df, asks_df], ignore_index=True)
print(df)
Built-in to_dataframe Method
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
book = client.get_order_book("BTC-USD", depth=50)
# Direct conversion
df = book.to_dataframe()
# DataFrame columns: price, size, orders, side, cumulative_size, notional
print(df.head(10))
print(df.tail(10))
Order Book Analysis
import pandas as pd
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
book = client.get_order_book("BTC-USD", depth=100)
df = book.to_dataframe()
# Calculate metrics
bids = df[df["side"] == "bid"]
asks = df[df["side"] == "ask"]
print("=== Order Book Analysis ===")
print(f"Symbol: {book.symbol}")
print(f"Timestamp: {pd.Timestamp(book.timestamp, unit='ms')}")
print()
# Liquidity analysis
print("Liquidity:")
print(f" Total bid volume: {bids['size'].sum():.4f}")
print(f" Total ask volume: {asks['size'].sum():.4f}")
print(f" Bid notional: ${bids['notional'].sum():,.2f}")
print(f" Ask notional: ${asks['notional'].sum():,.2f}")
print()
# Spread analysis
print("Spread:")
print(f" Best bid: {book.best_bid.price:.2f}")
print(f" Best ask: {book.best_ask.price:.2f}")
print(f" Spread: {book.spread:.2f} ({book.spread_bps:.2f} bps)")
print(f" Mid price: {book.mid_price:.2f}")
print()
# Depth analysis
print("Depth (1% from mid):")
mid = book.mid_price
bid_depth_1pct = bids[bids["price"] >= mid * 0.99]["size"].sum()
ask_depth_1pct = asks[asks["price"] <= mid * 1.01]["size"].sum()
print(f" Bid depth (1%): {bid_depth_1pct:.4f}")
print(f" Ask depth (1%): {ask_depth_1pct:.4f}")
# Order concentration
print("\nOrder concentration:")
print(f" Bid orders: {bids['orders'].sum()}")
print(f" Ask orders: {asks['orders'].sum()}")
print(f" Avg bid size: {bids['size'].mean():.4f}")
print(f" Avg ask size: {asks['size'].mean():.4f}")
Volume at Price
import pandas as pd
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
book = client.get_order_book("BTC-USD", depth=200)
df = book.to_dataframe()
# Group by price buckets
df["price_bucket"] = (df["price"] // 100) * 100 # $100 buckets
volume_profile = df.groupby(["price_bucket", "side"]).agg({
"size": "sum",
"orders": "sum"
}).reset_index()
print(volume_profile)
# Find high volume prices
bids = df[df["side"] == "bid"]
asks = df[df["side"] == "ask"]
print("\nTop 5 bid levels by volume:")
print(bids.nlargest(5, "size")[["price", "size", "orders"]])
print("\nTop 5 ask levels by volume:")
print(asks.nlargest(5, "size")[["price", "size", "orders"]])
Stream Order Book
Synchronous Subscription
from lux_dex import Client
client = Client(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
)
def on_book_update(book: "OrderBook") -> None:
    """Handle order book update.

    Prints one line with the sequence number, symbol, top-of-book prices and
    the spread in basis points. A missing side is printed as ``n/a`` instead
    of raising ``AttributeError``.

    The ``OrderBook`` annotation is quoted so it is not evaluated when the
    function is defined; this example only imports ``Client``.
    """
    # best_bid / best_ask are None while that side of the book is empty.
    bid = "n/a" if book.best_bid is None else f"{book.best_bid.price:.2f}"
    ask = "n/a" if book.best_ask is None else f"{book.best_ask.price:.2f}"
    print(f"[{book.sequence}] {book.symbol}: "
          f"Bid {bid} / Ask {ask} "
          f"(spread: {book.spread_bps:.1f} bps)")
# Subscribe to order book
client.subscribe_order_book(
symbol="BTC-USD",
depth=10,
callback=on_book_update
)
# Keep connection alive
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.unsubscribe_order_book("BTC-USD")
Async Stream
import asyncio
from lux_dex import AsyncClient
async def stream_order_book():
    """Print the spread of each BTC-USD book update until it widens past 50 bps."""
    endpoints = {
        "json_rpc_url": "http://localhost:8080/rpc",
        "ws_url": "ws://localhost:8081",
    }
    async with AsyncClient(**endpoints) as client:
        updates = client.stream_order_book("BTC-USD", depth=10)
        async for book in updates:
            print(f"[{book.sequence}] Spread: {book.spread:.2f}")
            # Keep consuming while the spread is within the 50 bps limit.
            if not (book.spread_bps > 50):
                continue
            print("Wide spread detected!")
            break
asyncio.run(stream_order_book())
Multiple Symbol Streams
import asyncio
from lux_dex import AsyncClient
async def stream_multiple_books():
    """Stream BTC-USD, ETH-USD and SOL-USD books side by side, printing each spread in bps."""
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081",
    ) as client:

        async def report_spread(market: str):
            # One consumer per market, all sharing the same client.
            feed = client.stream_order_book(market, depth=5)
            async for snapshot in feed:
                print(f"{market}: {snapshot.spread_bps:.1f} bps")

        # Run every consumer concurrently and wait for all of them.
        watchers = [
            report_spread(name) for name in ("BTC-USD", "ETH-USD", "SOL-USD")
        ]
        await asyncio.gather(*watchers)
asyncio.run(stream_multiple_books())
Order Book Delta Updates
For high-frequency applications, use delta updates instead of full snapshots:
from dataclasses import dataclass
from lux_dex import AsyncClient
from lux_dex.types import OrderBookDelta, DeltaType
@dataclass
class OrderBookDelta:
    """Incremental order book update describing one price on one side."""

    symbol: str
    sequence: int
    # One of ADD, UPDATE, DELETE
    delta_type: DeltaType
    # Either "bid" or "ask"
    side: str
    price: float
    # Size after the change (0 for DELETE)
    size: float
async def process_deltas():
    """Seed a local BTC-USD book from a snapshot, then apply streamed deltas to it."""
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081",
    ) as client:
        # Start from a 100-level snapshot. build_local_book is not shown in
        # this example; apply_delta indexes its result with "bids" / "asks".
        snapshot = await client.get_order_book("BTC-USD", depth=100)
        local_book = build_local_book(snapshot)

        # NOTE(review): delta.sequence is never compared with the snapshot's
        # sequence, so stale or missing deltas would go unnoticed - confirm
        # whether the stream guarantees ordering relative to the snapshot.
        delta_feed = client.stream_order_book_deltas("BTC-USD")
        async for delta in delta_feed:
            apply_delta(local_book, delta)
def apply_delta(local_book: dict, delta: OrderBookDelta) -> None:
"""Apply delta to local order book."""
side = local_book["bids" if delta.side == "bid" else "asks"]
match delta.delta_type:
case DeltaType.ADD | DeltaType.UPDATE:
side[delta.price] = delta.size
case DeltaType.DELETE:
side.pop(delta.price, None)Order Book Imbalance
import pandas as pd
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
def calculate_imbalance(book: "OrderBook", levels: int = 5) -> float:
    """Calculate order book imbalance.

    Compares the summed size of the first ``levels`` bid levels with the
    summed size of the first ``levels`` ask levels.

    Args:
        book: Order book whose ``bids`` and ``asks`` are read. The annotation
            is quoted because this example does not import ``OrderBook``.
        levels: How many levels to take from the front of each side.

    Returns:
        Value between -1 (all asks) and 1 (all bids); ``0.0`` when the
        selected levels hold no size at all.

    Raises:
        ValueError: If ``levels`` is negative. A negative slice bound would
            silently drop levels from the end of each side instead.
    """
    if levels < 0:
        raise ValueError(f"levels must be non-negative, got {levels}")
    bid_volume = sum(level.size for level in book.bids[:levels])
    ask_volume = sum(level.size for level in book.asks[:levels])
    total = bid_volume + ask_volume
    if total == 0:
        return 0.0
    return (bid_volume - ask_volume) / total
# Monitor imbalance
book = client.get_order_book("BTC-USD", depth=10)
imbalance = calculate_imbalance(book, levels=5)
print(f"Order book imbalance: {imbalance:.3f}")
if imbalance > 0.3:
print("Strong bid pressure - bullish signal")
elif imbalance < -0.3:
print("Strong ask pressure - bearish signal")
else:
print("Balanced order book")
numpy Integration
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
book = client.get_order_book("BTC-USD", depth=100)
# Convert to numpy arrays
bid_prices = np.array([level.price for level in book.bids])
bid_sizes = np.array([level.size for level in book.bids])
ask_prices = np.array([level.price for level in book.asks])
ask_sizes = np.array([level.size for level in book.asks])
# Weighted average price
bid_vwap = np.average(bid_prices, weights=bid_sizes)
ask_vwap = np.average(ask_prices, weights=ask_sizes)
print(f"Bid VWAP: {bid_vwap:.2f}")
print(f"Ask VWAP: {ask_vwap:.2f}")
# Cumulative depth
bid_cumulative = np.cumsum(bid_sizes)
ask_cumulative = np.cumsum(ask_sizes)
# Find depth to move price 1%
mid = book.mid_price
bid_1pct = mid * 0.99
ask_1pct = mid * 1.01
bid_depth_1pct = bid_sizes[bid_prices >= bid_1pct].sum()
ask_depth_1pct = ask_sizes[ask_prices <= ask_1pct].sum()
print(f"Depth to move bid 1%: {bid_depth_1pct:.4f}")
print(f"Depth to move ask 1%: {ask_depth_1pct:.4f}")
Jupyter Notebook Examples
# Cell 1: Setup and imports
import nest_asyncio
nest_asyncio.apply()
import pandas as pd
import numpy as np
from lux_dex import AsyncClient
# Cell 2: Fetch and analyze order book
async def analyze_book():
    """Fetch a 50-level BTC-USD snapshot and return its ``to_dataframe()`` result."""
    connection = AsyncClient("http://localhost:8080/rpc")
    async with connection as client:
        snapshot = await client.get_order_book("BTC-USD", depth=50)
        frame = snapshot.to_dataframe()
    return frame
df = await analyze_book()
display(df.head(10))
# Cell 3: Visualize (if matplotlib installed)
import matplotlib.pyplot as plt
bids = df[df["side"] == "bid"].sort_values("price", ascending=False)
asks = df[df["side"] == "ask"].sort_values("price")
fig, ax = plt.subplots(figsize=(12, 6))
ax.barh(bids["price"], bids["size"], color="green", alpha=0.6, label="Bids")
ax.barh(asks["price"], -asks["size"], color="red", alpha=0.6, label="Asks")
ax.set_xlabel("Size")
ax.set_ylabel("Price")
ax.set_title("BTC-USD Order Book")
ax.legend()
plt.tight_layout()
plt.show()
# Cell 4: Continuous monitoring
async def monitor_spread(duration_seconds: int = 60):
    """Record spread samples from the BTC-USD stream for about ``duration_seconds``.

    Args:
        duration_seconds: How long to keep sampling. The limit is checked
            only after an update has been received, so the call lasts until
            the first update that arrives after the limit.

    Returns:
        A DataFrame built from one record per update, with the keys
        ``timestamp``, ``spread_bps`` and ``mid_price``.
    """
    import time

    samples = []
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081"
    ) as client:
        # A monotonic clock is used for the elapsed-time check so that system
        # clock adjustments cannot shorten or lengthen the sampling window.
        start = time.monotonic()
        async for book in client.stream_order_book("BTC-USD", depth=5):
            samples.append({
                "timestamp": pd.Timestamp.now(),
                "spread_bps": book.spread_bps,
                "mid_price": book.mid_price
            })
            if time.monotonic() - start > duration_seconds:
                break
    return pd.DataFrame(samples)
spread_df = await monitor_spread(30)
display(spread_df.describe())
Error Handling
from lux_dex import Client
from lux_dex.exceptions import (
SymbolNotFoundError,
ConnectionError,
RateLimitError
)
client = Client("http://localhost:8080/rpc")
try:
book = client.get_order_book("INVALID-SYMBOL")
except SymbolNotFoundError:
print("Symbol not found")
except ConnectionError as e:
print(f"Connection failed: {e}")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after}s")
Next Steps
- Trades - Trade history and streaming
- WebSocket - Real-time data streaming
- Async Patterns - Advanced async usage