Python SDK
Trades
Trade history, streaming trades, and trade analysis with pandas
Trades
Access historical trades, stream real-time trade executions, and analyze trading activity with pandas.
Trade Model
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Trade:
"""Trade execution record."""
trade_id: str
symbol: str
price: float
size: float
side: str # "buy" or "sell" (taker side)
timestamp: int # Unix milliseconds
maker_order_id: str
taker_order_id: str
fee: float
fee_currency: str
@property
def notional(self) -> float:
"""Calculate trade notional value."""
return self.price * self.size
@property
def datetime(self) -> datetime:
"""Convert timestamp to datetime."""
return datetime.fromtimestamp(self.timestamp / 1000)
def __str__(self) -> str:
return f"{self.side.upper()} {self.size:.4f} @ {self.price:.2f}"Get Recent Trades
Basic Usage
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
# Get last 100 trades (default)
trades = client.get_trades("BTC-USD")
print(f"Retrieved {len(trades)} trades")
for trade in trades[:10]:
print(f" {trade.datetime}: {trade.side.upper()} "
f"{trade.size:.4f} @ {trade.price:.2f}")Specify Limit
# Get last 500 trades
trades = client.get_trades("BTC-USD", limit=500)
# Get last 1000 trades
trades = client.get_trades("BTC-USD", limit=1000)
# Get all available trades
trades = client.get_trades("BTC-USD", limit=0)Time Range Query
from datetime import datetime, timedelta
# Get trades from last hour
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)
trades = client.get_trades(
symbol="BTC-USD",
start_time=int(start_time.timestamp() * 1000),
end_time=int(end_time.timestamp() * 1000)
)
print(f"Trades in last hour: {len(trades)}")
# Get trades from specific time range
trades = client.get_trades(
symbol="BTC-USD",
start_time=1700000000000, # Unix ms
end_time=1700100000000
)Pagination
# Paginate through trade history
all_trades: list[Trade] = []
last_trade_id: str | None = None
while True:
trades = client.get_trades(
symbol="BTC-USD",
limit=1000,
before=last_trade_id
)
if not trades:
break
all_trades.extend(trades)
last_trade_id = trades[-1].trade_id
print(f"Fetched {len(all_trades)} trades...")
print(f"Total trades: {len(all_trades)}")pandas DataFrame Integration
Convert to DataFrame
import pandas as pd
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)
# Convert to DataFrame
df = pd.DataFrame([
{
"trade_id": t.trade_id,
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size,
"side": t.side,
"notional": t.notional,
"fee": t.fee
}
for t in trades
])
df.set_index("timestamp", inplace=True)
print(df.head(10))Built-in to_dataframe
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)
# Direct conversion using Trade list helper
from lux_dex.utils import trades_to_dataframe
df = trades_to_dataframe(trades)
print(df.describe())Trade Analysis
import pandas as pd
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)
df = pd.DataFrame([
{
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size,
"side": t.side,
"notional": t.notional
}
for t in trades
])
df.set_index("timestamp", inplace=True)
# Basic statistics
print("=== Trade Statistics ===")
print(f"Total trades: {len(df)}")
print(f"Total volume: {df['size'].sum():.4f}")
print(f"Total notional: ${df['notional'].sum():,.2f}")
print(f"Average price: {df['price'].mean():.2f}")
print(f"Price range: {df['price'].min():.2f} - {df['price'].max():.2f}")
print()
# By side
by_side = df.groupby("side").agg({
"size": ["count", "sum", "mean"],
"notional": "sum"
})
print("By Side:")
print(by_side)
print()
# VWAP
vwap = np.average(df["price"], weights=df["size"])
print(f"VWAP: {vwap:.2f}")
# Buy/sell imbalance
buy_volume = df[df["side"] == "buy"]["size"].sum()
sell_volume = df[df["side"] == "sell"]["size"].sum()
imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume)
print(f"Buy/Sell imbalance: {imbalance:.3f}")OHLCV Aggregation
import pandas as pd
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=10000)
df = pd.DataFrame([
{
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size
}
for t in trades
])
df.set_index("timestamp", inplace=True)
# Resample to 1-minute OHLCV
ohlcv = df.resample("1min").agg({
"price": ["first", "max", "min", "last"],
"size": "sum"
})
ohlcv.columns = ["open", "high", "low", "close", "volume"]
ohlcv = ohlcv.dropna()
print(ohlcv.tail(10))
# Calculate returns
ohlcv["returns"] = ohlcv["close"].pct_change()
print(f"\nVolatility (1min): {ohlcv['returns'].std() * 100:.4f}%")Trade Flow Analysis
import pandas as pd
import numpy as np
from lux_dex import Client
from datetime import datetime, timedelta
client = Client("http://localhost:8080/rpc")
# Get last 24 hours of trades
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)
trades = client.get_trades(
symbol="BTC-USD",
start_time=int(start_time.timestamp() * 1000),
end_time=int(end_time.timestamp() * 1000),
limit=0 # All trades
)
df = pd.DataFrame([
{
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size,
"side": t.side,
"notional": t.notional
}
for t in trades
])
df.set_index("timestamp", inplace=True)
# Add signed volume (positive for buys, negative for sells)
df["signed_volume"] = df.apply(
lambda x: x["size"] if x["side"] == "buy" else -x["size"],
axis=1
)
# Hourly analysis
hourly = df.resample("1h").agg({
"price": "last",
"size": "sum",
"notional": "sum",
"signed_volume": "sum"
})
hourly["net_flow"] = hourly["signed_volume"]
hourly["buy_pressure"] = (hourly["signed_volume"] / hourly["size"]).fillna(0)
print("=== Hourly Trade Flow ===")
print(hourly[["price", "size", "notional", "net_flow", "buy_pressure"]].tail(24))
# Find hours with strong imbalance
strong_buy = hourly[hourly["buy_pressure"] > 0.6]
strong_sell = hourly[hourly["buy_pressure"] < -0.6]
print(f"\nHours with strong buy pressure: {len(strong_buy)}")
print(f"Hours with strong sell pressure: {len(strong_sell)}")Stream Trades
Synchronous Subscription
from lux_dex import Client
from lux_dex.types import Trade
client = Client(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
)
def on_trade(trade: Trade) -> None:
"""Handle new trade."""
emoji = "+" if trade.side == "buy" else "-"
print(f"[{trade.datetime.strftime('%H:%M:%S.%f')[:-3]}] "
f"{emoji} {trade.size:.4f} @ {trade.price:.2f}")
# Subscribe to trades
client.subscribe_trades("BTC-USD", callback=on_trade)
# Keep connection alive
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.unsubscribe_trades("BTC-USD")Async Stream
import asyncio
from lux_dex import AsyncClient
async def stream_trades():
async with AsyncClient(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
) as client:
trade_count = 0
total_volume = 0.0
async for trade in client.stream_trades("BTC-USD"):
trade_count += 1
total_volume += trade.size
print(f"Trade #{trade_count}: {trade.side.upper()} "
f"{trade.size:.4f} @ {trade.price:.2f}")
# Stop after 100 trades
if trade_count >= 100:
print(f"\nTotal volume: {total_volume:.4f}")
break
asyncio.run(stream_trades())Multiple Symbol Streams
import asyncio
from lux_dex import AsyncClient
async def stream_multiple():
async with AsyncClient(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
) as client:
symbols = ["BTC-USD", "ETH-USD", "SOL-USD"]
async def process_trades(symbol: str):
async for trade in client.stream_trades(symbol):
print(f"{symbol}: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")
await asyncio.gather(*[
process_trades(symbol) for symbol in symbols
])
asyncio.run(stream_multiple())Real-time Aggregation
import asyncio
from collections import deque
from dataclasses import dataclass, field
from lux_dex import AsyncClient
from lux_dex.types import Trade
@dataclass
class TradeAggregator:
"""Aggregate trades in real-time."""
window_seconds: int = 60
trades: deque = field(default_factory=deque)
def add_trade(self, trade: Trade) -> None:
"""Add trade and remove old ones."""
self.trades.append(trade)
cutoff = trade.timestamp - (self.window_seconds * 1000)
while self.trades and self.trades[0].timestamp < cutoff:
self.trades.popleft()
@property
def volume(self) -> float:
return sum(t.size for t in self.trades)
@property
def vwap(self) -> float:
if not self.trades:
return 0.0
total_notional = sum(t.price * t.size for t in self.trades)
total_volume = sum(t.size for t in self.trades)
return total_notional / total_volume if total_volume > 0 else 0.0
@property
def buy_volume(self) -> float:
return sum(t.size for t in self.trades if t.side == "buy")
@property
def sell_volume(self) -> float:
return sum(t.size for t in self.trades if t.side == "sell")
async def main():
agg = TradeAggregator(window_seconds=60)
async with AsyncClient(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
) as client:
async for trade in client.stream_trades("BTC-USD"):
agg.add_trade(trade)
print(f"60s window: Volume={agg.volume:.4f}, "
f"VWAP={agg.vwap:.2f}, "
f"Buy%={agg.buy_volume/agg.volume*100:.1f}%")
asyncio.run(main())User Trades
from lux_dex import Client
client = Client(
json_rpc_url="http://localhost:8080/rpc",
api_key="your-api-key",
api_secret="your-api-secret"
)
# Get user's trades
my_trades = client.get_user_trades(
symbol="BTC-USD",
limit=100
)
for trade in my_trades:
print(f"{trade.datetime}: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")
print(f" Fee: {trade.fee} {trade.fee_currency}")
# Get trades for specific order
order_trades = client.get_trades_by_order("ord_123456")
print(f"Order had {len(order_trades)} fills")Stream User Trades
import asyncio
from lux_dex import AsyncClient
async def stream_my_trades():
async with AsyncClient(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081",
api_key="your-api-key",
api_secret="your-api-secret"
) as client:
async for trade in client.stream_user_trades():
print(f"My trade: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")
print(f" Order ID: {trade.taker_order_id}")
print(f" Fee: {trade.fee} {trade.fee_currency}")
asyncio.run(stream_my_trades())Trade Tape Analysis
import pandas as pd
import numpy as np
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=5000)
df = pd.DataFrame([
{
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size,
"side": t.side
}
for t in trades
])
# Classify trade sizes
def classify_size(size: float) -> str:
if size < 0.1:
return "small"
elif size < 1.0:
return "medium"
elif size < 10.0:
return "large"
else:
return "whale"
df["size_class"] = df["size"].apply(classify_size)
# Analyze by size class
by_class = df.groupby(["side", "size_class"]).agg({
"size": ["count", "sum", "mean"],
"price": "mean"
}).reset_index()
print("=== Trade Tape Analysis ===")
print(by_class)
# Large trade detection
large_trades = df[df["size"] >= 1.0]
print(f"\nLarge trades (>=1.0): {len(large_trades)}")
print(f"Large trade volume: {large_trades['size'].sum():.4f}")
# Calculate trade arrival rate
df["inter_arrival"] = df["timestamp"].diff().dt.total_seconds() * 1000
print(f"\nAverage inter-arrival time: {df['inter_arrival'].mean():.2f}ms")
print(f"Median inter-arrival time: {df['inter_arrival'].median():.2f}ms")Jupyter Notebook Examples
# Cell 1: Setup
import nest_asyncio
nest_asyncio.apply()
import pandas as pd
import numpy as np
from lux_dex import AsyncClient
# Cell 2: Fetch trades
async def fetch_trades(symbol: str, limit: int = 1000):
async with AsyncClient("http://localhost:8080/rpc") as client:
trades = await client.get_trades(symbol, limit=limit)
return pd.DataFrame([
{
"timestamp": pd.Timestamp(t.timestamp, unit="ms"),
"price": t.price,
"size": t.size,
"side": t.side
}
for t in trades
])
df = await fetch_trades("BTC-USD", 5000)
df.set_index("timestamp", inplace=True)
display(df.head(10))
# Cell 3: Basic analysis
print(f"Trades: {len(df)}")
print(f"Time span: {df.index[0]} to {df.index[-1]}")
print(f"Volume: {df['size'].sum():.4f}")
print(f"Price range: {df['price'].min():.2f} - {df['price'].max():.2f}")
# Cell 4: Visualization
import matplotlib.pyplot as plt
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
# Price over time
axes[0].plot(df.index, df["price"], linewidth=0.5)
axes[0].set_ylabel("Price")
axes[0].set_title("BTC-USD Trade Tape")
# Volume bars colored by side
buys = df[df["side"] == "buy"]
sells = df[df["side"] == "sell"]
axes[1].bar(buys.index, buys["size"], color="green", alpha=0.7, width=0.0001)
axes[1].bar(sells.index, -sells["size"], color="red", alpha=0.7, width=0.0001)
axes[1].set_ylabel("Volume")
# Cumulative volume
df["cum_volume"] = df["size"].cumsum()
df["signed_cum_volume"] = df.apply(
lambda x: x["size"] if x["side"] == "buy" else -x["size"],
axis=1
).cumsum()
axes[2].plot(df.index, df["signed_cum_volume"], linewidth=1)
axes[2].axhline(y=0, color="black", linestyle="--", linewidth=0.5)
axes[2].set_ylabel("Net Volume")
axes[2].set_xlabel("Time")
plt.tight_layout()
plt.show()
# Cell 5: Stream trades in notebook
async def stream_and_display(duration_seconds: int = 30):
trades = []
async with AsyncClient(
json_rpc_url="http://localhost:8080/rpc",
ws_url="ws://localhost:8081"
) as client:
import time
start = time.time()
async for trade in client.stream_trades("BTC-USD"):
trades.append({
"time": pd.Timestamp.now(),
"price": trade.price,
"size": trade.size,
"side": trade.side
})
if time.time() - start > duration_seconds:
break
return pd.DataFrame(trades)
live_df = await stream_and_display(30)
display(live_df)
print(f"Captured {len(live_df)} trades in 30 seconds")Error Handling
from lux_dex import Client
from lux_dex.exceptions import (
SymbolNotFoundError,
ConnectionError,
RateLimitError
)
client = Client("http://localhost:8080/rpc")
try:
trades = client.get_trades("INVALID-SYMBOL")
except SymbolNotFoundError:
print("Symbol not found")
except ConnectionError as e:
print(f"Connection failed: {e}")
except RateLimitError as e:
import time
print(f"Rate limited. Waiting {e.retry_after}s...")
time.sleep(e.retry_after)
trades = client.get_trades("BTC-USD")Next Steps
- Account - Balance and portfolio management
- WebSocket - Real-time streaming details
- Async Patterns - Advanced async usage