Python SDK

Trades

Trade history, streaming trades, and trade analysis with pandas

Trades

Access historical trades, stream real-time trade executions, and analyze trading activity with pandas.

Trade Model

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Trade:
    """Trade execution record."""
    trade_id: str
    symbol: str
    price: float
    size: float
    side: str                    # "buy" or "sell" (taker side)
    timestamp: int               # Unix milliseconds
    maker_order_id: str
    taker_order_id: str
    fee: float
    fee_currency: str

    @property
    def notional(self) -> float:
        """Calculate trade notional value."""
        return self.price * self.size

    @property
    def datetime(self) -> datetime:
        """Convert timestamp to datetime."""
        return datetime.fromtimestamp(self.timestamp / 1000)

    def __str__(self) -> str:
        return f"{self.side.upper()} {self.size:.4f} @ {self.price:.2f}"

Get Recent Trades

Basic Usage

from lux_dex import Client

client = Client("http://localhost:8080/rpc")

# Get last 100 trades (default)
trades = client.get_trades("BTC-USD")

print(f"Retrieved {len(trades)} trades")
for trade in trades[:10]:
    print(f"  {trade.datetime}: {trade.side.upper()} "
          f"{trade.size:.4f} @ {trade.price:.2f}")

Specify Limit

# Get last 500 trades
trades = client.get_trades("BTC-USD", limit=500)

# Get last 1000 trades
trades = client.get_trades("BTC-USD", limit=1000)

# Get all available trades
trades = client.get_trades("BTC-USD", limit=0)

Time Range Query

from datetime import datetime, timedelta

# Get trades from last hour
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)

trades = client.get_trades(
    symbol="BTC-USD",
    start_time=int(start_time.timestamp() * 1000),
    end_time=int(end_time.timestamp() * 1000)
)

print(f"Trades in last hour: {len(trades)}")

# Get trades from specific time range
trades = client.get_trades(
    symbol="BTC-USD",
    start_time=1700000000000,  # Unix ms
    end_time=1700100000000
)

Pagination

# Paginate through trade history
all_trades: list[Trade] = []
last_trade_id: str | None = None

while True:
    trades = client.get_trades(
        symbol="BTC-USD",
        limit=1000,
        before=last_trade_id
    )

    if not trades:
        break

    all_trades.extend(trades)
    last_trade_id = trades[-1].trade_id

    print(f"Fetched {len(all_trades)} trades...")

print(f"Total trades: {len(all_trades)}")

pandas DataFrame Integration

Convert to DataFrame

import pandas as pd
from lux_dex import Client

client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)

# Convert to DataFrame
df = pd.DataFrame([
    {
        "trade_id": t.trade_id,
        "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
        "price": t.price,
        "size": t.size,
        "side": t.side,
        "notional": t.notional,
        "fee": t.fee
    }
    for t in trades
])

df.set_index("timestamp", inplace=True)
print(df.head(10))

Built-in to_dataframe

from lux_dex import Client

client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)

# Direct conversion using Trade list helper
from lux_dex.utils import trades_to_dataframe

df = trades_to_dataframe(trades)
print(df.describe())

Trade Analysis

import pandas as pd
import numpy as np
from lux_dex import Client

client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=1000)

df = pd.DataFrame([
    {
        "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
        "price": t.price,
        "size": t.size,
        "side": t.side,
        "notional": t.notional
    }
    for t in trades
])

df.set_index("timestamp", inplace=True)

# Basic statistics
print("=== Trade Statistics ===")
print(f"Total trades: {len(df)}")
print(f"Total volume: {df['size'].sum():.4f}")
print(f"Total notional: ${df['notional'].sum():,.2f}")
print(f"Average price: {df['price'].mean():.2f}")
print(f"Price range: {df['price'].min():.2f} - {df['price'].max():.2f}")
print()

# By side
by_side = df.groupby("side").agg({
    "size": ["count", "sum", "mean"],
    "notional": "sum"
})
print("By Side:")
print(by_side)
print()

# VWAP
vwap = np.average(df["price"], weights=df["size"])
print(f"VWAP: {vwap:.2f}")

# Buy/sell imbalance
buy_volume = df[df["side"] == "buy"]["size"].sum()
sell_volume = df[df["side"] == "sell"]["size"].sum()
imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume)
print(f"Buy/Sell imbalance: {imbalance:.3f}")

OHLCV Aggregation

import pandas as pd
import numpy as np
from lux_dex import Client

client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=10000)

df = pd.DataFrame([
    {
        "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
        "price": t.price,
        "size": t.size
    }
    for t in trades
])

df.set_index("timestamp", inplace=True)

# Resample to 1-minute OHLCV
ohlcv = df.resample("1min").agg({
    "price": ["first", "max", "min", "last"],
    "size": "sum"
})

ohlcv.columns = ["open", "high", "low", "close", "volume"]
ohlcv = ohlcv.dropna()

print(ohlcv.tail(10))

# Calculate returns
ohlcv["returns"] = ohlcv["close"].pct_change()
print(f"\nVolatility (1min): {ohlcv['returns'].std() * 100:.4f}%")

Trade Flow Analysis

import pandas as pd
import numpy as np
from lux_dex import Client
from datetime import datetime, timedelta

client = Client("http://localhost:8080/rpc")

# Get last 24 hours of trades
end_time = datetime.now()
start_time = end_time - timedelta(hours=24)

trades = client.get_trades(
    symbol="BTC-USD",
    start_time=int(start_time.timestamp() * 1000),
    end_time=int(end_time.timestamp() * 1000),
    limit=0  # All trades
)

df = pd.DataFrame([
    {
        "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
        "price": t.price,
        "size": t.size,
        "side": t.side,
        "notional": t.notional
    }
    for t in trades
])

df.set_index("timestamp", inplace=True)

# Add signed volume (positive for buys, negative for sells)
df["signed_volume"] = df.apply(
    lambda x: x["size"] if x["side"] == "buy" else -x["size"],
    axis=1
)

# Hourly analysis
hourly = df.resample("1h").agg({
    "price": "last",
    "size": "sum",
    "notional": "sum",
    "signed_volume": "sum"
})

hourly["net_flow"] = hourly["signed_volume"]
hourly["buy_pressure"] = (hourly["signed_volume"] / hourly["size"]).fillna(0)

print("=== Hourly Trade Flow ===")
print(hourly[["price", "size", "notional", "net_flow", "buy_pressure"]].tail(24))

# Find hours with strong imbalance
strong_buy = hourly[hourly["buy_pressure"] > 0.6]
strong_sell = hourly[hourly["buy_pressure"] < -0.6]

print(f"\nHours with strong buy pressure: {len(strong_buy)}")
print(f"Hours with strong sell pressure: {len(strong_sell)}")

Stream Trades

Synchronous Subscription

from lux_dex import Client
from lux_dex.types import Trade

client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    ws_url="ws://localhost:8081"
)

def on_trade(trade: Trade) -> None:
    """Handle new trade."""
    emoji = "+" if trade.side == "buy" else "-"
    print(f"[{trade.datetime.strftime('%H:%M:%S.%f')[:-3]}] "
          f"{emoji} {trade.size:.4f} @ {trade.price:.2f}")

# Subscribe to trades
client.subscribe_trades("BTC-USD", callback=on_trade)

# Keep connection alive
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    client.unsubscribe_trades("BTC-USD")

Async Stream

import asyncio
from lux_dex import AsyncClient

async def stream_trades():
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081"
    ) as client:
        trade_count = 0
        total_volume = 0.0

        async for trade in client.stream_trades("BTC-USD"):
            trade_count += 1
            total_volume += trade.size

            print(f"Trade #{trade_count}: {trade.side.upper()} "
                  f"{trade.size:.4f} @ {trade.price:.2f}")

            # Stop after 100 trades
            if trade_count >= 100:
                print(f"\nTotal volume: {total_volume:.4f}")
                break

asyncio.run(stream_trades())

Multiple Symbol Streams

import asyncio
from lux_dex import AsyncClient

async def stream_multiple():
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081"
    ) as client:
        symbols = ["BTC-USD", "ETH-USD", "SOL-USD"]

        async def process_trades(symbol: str):
            async for trade in client.stream_trades(symbol):
                print(f"{symbol}: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")

        await asyncio.gather(*[
            process_trades(symbol) for symbol in symbols
        ])

asyncio.run(stream_multiple())

Real-time Aggregation

import asyncio
from collections import deque
from dataclasses import dataclass, field
from lux_dex import AsyncClient
from lux_dex.types import Trade

@dataclass
class TradeAggregator:
    """Aggregate trades in real-time."""
    window_seconds: int = 60
    trades: deque = field(default_factory=deque)

    def add_trade(self, trade: Trade) -> None:
        """Add trade and remove old ones."""
        self.trades.append(trade)
        cutoff = trade.timestamp - (self.window_seconds * 1000)
        while self.trades and self.trades[0].timestamp < cutoff:
            self.trades.popleft()

    @property
    def volume(self) -> float:
        return sum(t.size for t in self.trades)

    @property
    def vwap(self) -> float:
        if not self.trades:
            return 0.0
        total_notional = sum(t.price * t.size for t in self.trades)
        total_volume = sum(t.size for t in self.trades)
        return total_notional / total_volume if total_volume > 0 else 0.0

    @property
    def buy_volume(self) -> float:
        return sum(t.size for t in self.trades if t.side == "buy")

    @property
    def sell_volume(self) -> float:
        return sum(t.size for t in self.trades if t.side == "sell")

async def main():
    agg = TradeAggregator(window_seconds=60)

    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081"
    ) as client:
        async for trade in client.stream_trades("BTC-USD"):
            agg.add_trade(trade)

            print(f"60s window: Volume={agg.volume:.4f}, "
                  f"VWAP={agg.vwap:.2f}, "
                  f"Buy%={agg.buy_volume/agg.volume*100:.1f}%")

asyncio.run(main())

User Trades

from lux_dex import Client

client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    api_key="your-api-key",
    api_secret="your-api-secret"
)

# Get user's trades
my_trades = client.get_user_trades(
    symbol="BTC-USD",
    limit=100
)

for trade in my_trades:
    print(f"{trade.datetime}: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")
    print(f"  Fee: {trade.fee} {trade.fee_currency}")

# Get trades for specific order
order_trades = client.get_trades_by_order("ord_123456")
print(f"Order had {len(order_trades)} fills")

Stream User Trades

import asyncio
from lux_dex import AsyncClient

async def stream_my_trades():
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081",
        api_key="your-api-key",
        api_secret="your-api-secret"
    ) as client:
        async for trade in client.stream_user_trades():
            print(f"My trade: {trade.side} {trade.size:.4f} @ {trade.price:.2f}")
            print(f"  Order ID: {trade.taker_order_id}")
            print(f"  Fee: {trade.fee} {trade.fee_currency}")

asyncio.run(stream_my_trades())

Trade Tape Analysis

import pandas as pd
import numpy as np
from lux_dex import Client

client = Client("http://localhost:8080/rpc")
trades = client.get_trades("BTC-USD", limit=5000)

df = pd.DataFrame([
    {
        "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
        "price": t.price,
        "size": t.size,
        "side": t.side
    }
    for t in trades
])

# Classify trade sizes
def classify_size(size: float) -> str:
    if size < 0.1:
        return "small"
    elif size < 1.0:
        return "medium"
    elif size < 10.0:
        return "large"
    else:
        return "whale"

df["size_class"] = df["size"].apply(classify_size)

# Analyze by size class
by_class = df.groupby(["side", "size_class"]).agg({
    "size": ["count", "sum", "mean"],
    "price": "mean"
}).reset_index()

print("=== Trade Tape Analysis ===")
print(by_class)

# Large trade detection
large_trades = df[df["size"] >= 1.0]
print(f"\nLarge trades (>=1.0): {len(large_trades)}")
print(f"Large trade volume: {large_trades['size'].sum():.4f}")

# Calculate trade arrival rate
df["inter_arrival"] = df["timestamp"].diff().dt.total_seconds() * 1000
print(f"\nAverage inter-arrival time: {df['inter_arrival'].mean():.2f}ms")
print(f"Median inter-arrival time: {df['inter_arrival'].median():.2f}ms")

Jupyter Notebook Examples

# Cell 1: Setup
import nest_asyncio
nest_asyncio.apply()

import pandas as pd
import numpy as np
from lux_dex import AsyncClient

# Cell 2: Fetch trades
async def fetch_trades(symbol: str, limit: int = 1000):
    async with AsyncClient("http://localhost:8080/rpc") as client:
        trades = await client.get_trades(symbol, limit=limit)
        return pd.DataFrame([
            {
                "timestamp": pd.Timestamp(t.timestamp, unit="ms"),
                "price": t.price,
                "size": t.size,
                "side": t.side
            }
            for t in trades
        ])

df = await fetch_trades("BTC-USD", 5000)
df.set_index("timestamp", inplace=True)
display(df.head(10))

# Cell 3: Basic analysis
print(f"Trades: {len(df)}")
print(f"Time span: {df.index[0]} to {df.index[-1]}")
print(f"Volume: {df['size'].sum():.4f}")
print(f"Price range: {df['price'].min():.2f} - {df['price'].max():.2f}")

# Cell 4: Visualization
import matplotlib.pyplot as plt

fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# Price over time
axes[0].plot(df.index, df["price"], linewidth=0.5)
axes[0].set_ylabel("Price")
axes[0].set_title("BTC-USD Trade Tape")

# Volume bars colored by side
buys = df[df["side"] == "buy"]
sells = df[df["side"] == "sell"]
axes[1].bar(buys.index, buys["size"], color="green", alpha=0.7, width=0.0001)
axes[1].bar(sells.index, -sells["size"], color="red", alpha=0.7, width=0.0001)
axes[1].set_ylabel("Volume")

# Cumulative volume
df["cum_volume"] = df["size"].cumsum()
df["signed_cum_volume"] = df.apply(
    lambda x: x["size"] if x["side"] == "buy" else -x["size"],
    axis=1
).cumsum()

axes[2].plot(df.index, df["signed_cum_volume"], linewidth=1)
axes[2].axhline(y=0, color="black", linestyle="--", linewidth=0.5)
axes[2].set_ylabel("Net Volume")
axes[2].set_xlabel("Time")

plt.tight_layout()
plt.show()

# Cell 5: Stream trades in notebook
async def stream_and_display(duration_seconds: int = 30):
    trades = []
    async with AsyncClient(
        json_rpc_url="http://localhost:8080/rpc",
        ws_url="ws://localhost:8081"
    ) as client:
        import time
        start = time.time()
        async for trade in client.stream_trades("BTC-USD"):
            trades.append({
                "time": pd.Timestamp.now(),
                "price": trade.price,
                "size": trade.size,
                "side": trade.side
            })
            if time.time() - start > duration_seconds:
                break
    return pd.DataFrame(trades)

live_df = await stream_and_display(30)
display(live_df)
print(f"Captured {len(live_df)} trades in 30 seconds")

Error Handling

from lux_dex import Client
from lux_dex.exceptions import (
    SymbolNotFoundError,
    ConnectionError,
    RateLimitError
)

client = Client("http://localhost:8080/rpc")

try:
    trades = client.get_trades("INVALID-SYMBOL")
except SymbolNotFoundError:
    print("Symbol not found")
except ConnectionError as e:
    print(f"Connection failed: {e}")
except RateLimitError as e:
    import time
    print(f"Rate limited. Waiting {e.retry_after}s...")
    time.sleep(e.retry_after)
    trades = client.get_trades("BTC-USD")

Next Steps