Rust SDK

Trade Streaming

Real-time trade streaming with tokio, efficient filtering, aggregation, and VWAP calculations

Trade Streaming

The Rust SDK provides high-performance trade streaming with tokio async streams, efficient filtering, and real-time analytics.

Get Recent Trades

Basic Query

use lux_dex_sdk::{Client, Trade, Result};

async fn get_trades(client: &Client) -> Result<()> {
    // Get last 100 trades
    let trades = client.get_trades("BTC-USD", 100).await?;

    for trade in &trades {
        println!(
            "{} | {} {} @ {} | {}",
            trade.timestamp.format("%H:%M:%S%.3f"),
            trade.side,
            trade.size,
            trade.price,
            trade.id
        );
    }

    Ok(())
}

With Time Range

use chrono::{Duration, Utc};

let start = Utc::now() - Duration::hours(1);
let end = Utc::now();

let trades = client
    .get_trades_range("BTC-USD", start, end, 1000)
    .await?;

println!("Got {} trades in last hour", trades.len());

Trade Types

use chrono::{DateTime, Utc};

/// Executed trade
#[derive(Debug, Clone)]
pub struct Trade {
    pub id: TradeId,
    pub symbol: String,
    pub price: f64,
    pub size: f64,
    pub side: Side,           // Taker side
    pub timestamp: DateTime<Utc>,
    pub maker_order_id: OrderId,
    pub taker_order_id: OrderId,
}

/// Trade with additional fields
#[derive(Debug, Clone)]
pub struct DetailedTrade {
    pub trade: Trade,
    pub maker_fee: f64,
    pub taker_fee: f64,
    pub liquidity: Liquidity,  // Maker or Taker
    pub sequence: u64,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Liquidity {
    Maker,
    Taker,
}

WebSocket Streaming

Subscribe to Trades

use lux_dex_sdk::{Client, Trade};
use tokio_stream::StreamExt;

async fn stream_trades(client: &Client) -> Result<()> {
    let mut stream = client.subscribe_trades("BTC-USD").await?;

    while let Some(trade) = stream.next().await {
        let trade = trade?;
        println!(
            "{}: {} {} @ {}",
            trade.side,
            trade.size,
            trade.symbol,
            trade.price
        );
    }

    Ok(())
}

Subscribe to Multiple Symbols

use futures::stream::select_all;

async fn stream_multiple(client: &Client) -> Result<()> {
    let symbols = vec!["BTC-USD", "ETH-USD", "SOL-USD"];

    let streams: Vec<_> = futures::future::try_join_all(
        symbols.iter().map(|s| client.subscribe_trades(s))
    ).await?;

    let mut combined = select_all(streams);

    while let Some(trade) = combined.next().await {
        let trade = trade?;
        println!("[{}] {} @ {}", trade.symbol, trade.size, trade.price);
    }

    Ok(())
}

Filtering Trades

By Size

use tokio_stream::StreamExt;

async fn large_trades(client: &Client, min_size: f64) -> Result<()> {
    let stream = client.subscribe_trades("BTC-USD").await?;

    let mut filtered = stream.filter(|result| {
        match result {
            Ok(trade) => trade.size >= min_size,
            Err(_) => true, // Let errors through
        }
    });

    while let Some(trade) = filtered.next().await {
        let trade = trade?;
        println!("Large trade: {} @ {}", trade.size, trade.price);
    }

    Ok(())
}

By Side

async fn buy_trades_only(client: &Client) -> Result<()> {
    let stream = client.subscribe_trades("BTC-USD").await?;

    let mut buys = stream.filter_map(|result| async {
        match result {
            Ok(trade) if trade.side == Side::Buy => Some(Ok(trade)),
            Ok(_) => None,
            Err(e) => Some(Err(e)),
        }
    });

    while let Some(trade) = buys.next().await {
        println!("Buy: {} @ {}", trade?.size, trade?.price);
    }

    Ok(())
}

Trade Aggregation

Time-Based Aggregation

use std::time::Duration;
use tokio::time::{interval, Interval};

/// Aggregate trades into time buckets
pub struct TradeAggregator {
    bucket_duration: Duration,
    current_bucket: TradeBucket,
    last_flush: Instant,
}

#[derive(Default, Clone)]
pub struct TradeBucket {
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub buy_volume: f64,
    pub sell_volume: f64,
    pub trade_count: u32,
    pub vwap: f64,
    total_value: f64,  // For VWAP calculation
}

impl TradeAggregator {
    pub fn new(bucket_duration: Duration) -> Self {
        Self {
            bucket_duration,
            current_bucket: TradeBucket::default(),
            last_flush: Instant::now(),
        }
    }

    pub fn add(&mut self, trade: &Trade) -> Option<TradeBucket> {
        // Update current bucket
        if self.current_bucket.trade_count == 0 {
            self.current_bucket.open = trade.price;
            self.current_bucket.high = trade.price;
            self.current_bucket.low = trade.price;
        } else {
            self.current_bucket.high = self.current_bucket.high.max(trade.price);
            self.current_bucket.low = self.current_bucket.low.min(trade.price);
        }

        self.current_bucket.close = trade.price;
        self.current_bucket.volume += trade.size;
        self.current_bucket.total_value += trade.size * trade.price;
        self.current_bucket.trade_count += 1;

        match trade.side {
            Side::Buy => self.current_bucket.buy_volume += trade.size,
            Side::Sell => self.current_bucket.sell_volume += trade.size,
        }

        // Check if bucket should be flushed
        if self.last_flush.elapsed() >= self.bucket_duration {
            self.flush()
        } else {
            None
        }
    }

    pub fn flush(&mut self) -> Option<TradeBucket> {
        if self.current_bucket.trade_count == 0 {
            return None;
        }

        // Calculate VWAP
        self.current_bucket.vwap =
            self.current_bucket.total_value / self.current_bucket.volume;

        let bucket = std::mem::take(&mut self.current_bucket);
        self.last_flush = Instant::now();
        Some(bucket)
    }
}

Using the Aggregator

async fn aggregate_trades(client: &Client) -> Result<()> {
    let mut stream = client.subscribe_trades("BTC-USD").await?;
    let mut aggregator = TradeAggregator::new(Duration::from_secs(60));

    let mut flush_interval = tokio::time::interval(Duration::from_secs(60));

    loop {
        tokio::select! {
            Some(trade) = stream.next() => {
                let trade = trade?;
                if let Some(bucket) = aggregator.add(&trade) {
                    print_bucket(&bucket);
                }
            }
            _ = flush_interval.tick() => {
                if let Some(bucket) = aggregator.flush() {
                    print_bucket(&bucket);
                }
            }
        }
    }
}

fn print_bucket(bucket: &TradeBucket) {
    println!(
        "OHLC: {:.2}/{:.2}/{:.2}/{:.2} | Vol: {:.4} | VWAP: {:.2} | Trades: {}",
        bucket.open,
        bucket.high,
        bucket.low,
        bucket.close,
        bucket.volume,
        bucket.vwap,
        bucket.trade_count
    );
}

VWAP Calculation

Rolling VWAP

/// Rolling VWAP calculator
pub struct RollingVwap {
    window: VecDeque<(f64, f64)>,  // (price, size)
    window_duration: Duration,
    total_value: f64,
    total_volume: f64,
}

impl RollingVwap {
    pub fn new(window_duration: Duration) -> Self {
        Self {
            window: VecDeque::new(),
            window_duration,
            total_value: 0.0,
            total_volume: 0.0,
        }
    }

    pub fn add(&mut self, trade: &Trade) {
        let value = trade.price * trade.size;
        self.window.push_back((trade.price, trade.size));
        self.total_value += value;
        self.total_volume += trade.size;

        // Remove old trades (simplified - real impl uses timestamps)
        while self.window.len() > 10000 {
            if let Some((price, size)) = self.window.pop_front() {
                self.total_value -= price * size;
                self.total_volume -= size;
            }
        }
    }

    pub fn vwap(&self) -> Option<f64> {
        if self.total_volume > 0.0 {
            Some(self.total_value / self.total_volume)
        } else {
            None
        }
    }
}

Time-Weighted VWAP

use chrono::{DateTime, Duration, Utc};

/// Time-weighted VWAP with exact window duration
pub struct TimeWeightedVwap {
    trades: VecDeque<Trade>,
    window: Duration,
}

impl TimeWeightedVwap {
    pub fn new(window: Duration) -> Self {
        Self {
            trades: VecDeque::new(),
            window,
        }
    }

    pub fn add(&mut self, trade: Trade) {
        self.trades.push_back(trade);
        self.prune();
    }

    fn prune(&mut self) {
        let cutoff = Utc::now() - self.window;
        while let Some(front) = self.trades.front() {
            if front.timestamp < cutoff {
                self.trades.pop_front();
            } else {
                break;
            }
        }
    }

    pub fn vwap(&self) -> Option<f64> {
        if self.trades.is_empty() {
            return None;
        }

        let (total_value, total_volume) = self.trades.iter().fold(
            (0.0, 0.0),
            |(value, vol), t| (value + t.price * t.size, vol + t.size)
        );

        Some(total_value / total_volume)
    }

    pub fn volume(&self) -> f64 {
        self.trades.iter().map(|t| t.size).sum()
    }
}

Trade Analytics

Trade Flow Analysis

/// Analyze trade flow for market sentiment
pub struct TradeFlowAnalyzer {
    window: VecDeque<Trade>,
    window_size: usize,
}

impl TradeFlowAnalyzer {
    pub fn new(window_size: usize) -> Self {
        Self {
            window: VecDeque::with_capacity(window_size),
            window_size,
        }
    }

    pub fn add(&mut self, trade: Trade) {
        if self.window.len() >= self.window_size {
            self.window.pop_front();
        }
        self.window.push_back(trade);
    }

    /// Buy/sell volume ratio
    pub fn volume_ratio(&self) -> f64 {
        let (buy_vol, sell_vol) = self.window.iter().fold(
            (0.0, 0.0),
            |(buy, sell), t| match t.side {
                Side::Buy => (buy + t.size, sell),
                Side::Sell => (buy, sell + t.size),
            }
        );

        if sell_vol > 0.0 {
            buy_vol / sell_vol
        } else {
            f64::INFINITY
        }
    }

    /// Order flow imbalance (-1 to 1)
    pub fn imbalance(&self) -> f64 {
        let (buy_vol, sell_vol) = self.window.iter().fold(
            (0.0, 0.0),
            |(buy, sell), t| match t.side {
                Side::Buy => (buy + t.size, sell),
                Side::Sell => (buy, sell + t.size),
            }
        );

        let total = buy_vol + sell_vol;
        if total > 0.0 {
            (buy_vol - sell_vol) / total
        } else {
            0.0
        }
    }

    /// Average trade size
    pub fn avg_trade_size(&self) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }
        let total: f64 = self.window.iter().map(|t| t.size).sum();
        total / self.window.len() as f64
    }

    /// Trade rate (trades per second)
    pub fn trade_rate(&self) -> f64 {
        if self.window.len() < 2 {
            return 0.0;
        }

        let first = self.window.front().unwrap();
        let last = self.window.back().unwrap();
        let duration = (last.timestamp - first.timestamp).num_milliseconds() as f64 / 1000.0;

        if duration > 0.0 {
            self.window.len() as f64 / duration
        } else {
            0.0
        }
    }
}

Zero-Copy Trade Parsing

use lux_dex_sdk::zerocopy::TradeRef;

/// Zero-copy trade reference
#[repr(C)]
pub struct TradeRef<'a> {
    pub id: u64,
    pub symbol: &'a str,
    pub price: f64,
    pub size: f64,
    pub side: u8,      // 0 = Buy, 1 = Sell
    pub timestamp: i64,
}

impl<'a> TradeRef<'a> {
    /// Parse from binary buffer without allocation
    pub fn from_bytes(data: &'a [u8]) -> Result<Self> {
        if data.len() < 48 {
            return Err(Error::InsufficientData);
        }

        // Parse fields directly from buffer
        let id = u64::from_le_bytes(data[0..8].try_into()?);
        let price = f64::from_le_bytes(data[8..16].try_into()?);
        let size = f64::from_le_bytes(data[16..24].try_into()?);
        let side = data[24];
        let timestamp = i64::from_le_bytes(data[25..33].try_into()?);
        let symbol_len = data[33] as usize;
        let symbol = std::str::from_utf8(&data[34..34 + symbol_len])?;

        Ok(Self {
            id,
            symbol,
            price,
            size,
            side,
            timestamp,
        })
    }

    pub fn side(&self) -> Side {
        if self.side == 0 { Side::Buy } else { Side::Sell }
    }

    pub fn to_owned(&self) -> Trade {
        Trade {
            id: TradeId(self.id),
            symbol: self.symbol.to_string(),
            price: self.price,
            size: self.size,
            side: self.side(),
            timestamp: DateTime::from_timestamp(self.timestamp / 1000, 0).unwrap(),
            maker_order_id: OrderId::default(),
            taker_order_id: OrderId::default(),
        }
    }
}

Performance Benchmarks

use criterion::{criterion_group, criterion_main, Criterion};

fn trade_benchmarks(c: &mut Criterion) {
    let trades: Vec<Trade> = generate_test_trades(10000);

    c.bench_function("vwap_10k_trades", |b| {
        b.iter(|| {
            let mut vwap = RollingVwap::new(Duration::from_secs(60));
            for trade in &trades {
                vwap.add(trade);
            }
            vwap.vwap()
        })
    });

    c.bench_function("aggregator_10k_trades", |b| {
        b.iter(|| {
            let mut agg = TradeAggregator::new(Duration::from_secs(60));
            for trade in &trades {
                agg.add(trade);
            }
        })
    });

    c.bench_function("flow_analyzer_10k_trades", |b| {
        b.iter(|| {
            let mut analyzer = TradeFlowAnalyzer::new(1000);
            for trade in trades.iter().cloned() {
                analyzer.add(trade);
            }
            analyzer.imbalance()
        })
    });

    let data = serialize_trade(&trades[0]);
    c.bench_function("zerocopy_parse", |b| {
        b.iter(|| TradeRef::from_bytes(&data))
    });
}

criterion_group!(benches, trade_benchmarks);
criterion_main!(benches);

Expected results:

vwap_10k_trades         time:   [234 us 241 us 248 us]
aggregator_10k_trades   time:   [89 us 92 us 95 us]
flow_analyzer_10k_trades time:  [156 us 161 us 166 us]
zerocopy_parse          time:   [12 ns 13 ns 14 ns]

Complete Example

use lux_dex_sdk::{Client, Trade, Side, Result};
use tokio_stream::StreamExt;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::builder()
        .websocket_url("ws://localhost:8081")
        .build()
        .await?;

    // Initialize analytics
    let mut vwap = RollingVwap::new(Duration::from_secs(300)); // 5-min VWAP
    let mut flow = TradeFlowAnalyzer::new(100);
    let mut aggregator = TradeAggregator::new(Duration::from_secs(60));

    let mut stream = client.subscribe_trades("BTC-USD").await?;

    while let Some(trade) = stream.next().await {
        let trade = trade?;

        // Update analytics
        vwap.add(&trade);
        flow.add(trade.clone());

        if let Some(bucket) = aggregator.add(&trade) {
            // Minute candle complete
            println!(
                "1m | O:{:.2} H:{:.2} L:{:.2} C:{:.2} V:{:.4}",
                bucket.open, bucket.high, bucket.low, bucket.close, bucket.volume
            );
        }

        // Real-time metrics
        let current_vwap = vwap.vwap().unwrap_or(0.0);
        let imbalance = flow.imbalance();
        let trade_rate = flow.trade_rate();

        // Trading signals
        if trade.price > current_vwap * 1.001 && imbalance > 0.3 {
            println!("SIGNAL: Price above VWAP with strong buy flow");
        }

        if trade_rate > 10.0 && trade.size > flow.avg_trade_size() * 5.0 {
            println!("ALERT: Large trade in high activity period");
        }
    }

    Ok(())
}