Rust SDK
Trade Streaming
Real-time trade streaming with tokio, efficient filtering, aggregation, and VWAP calculations
Trade Streaming
The Rust SDK provides high-performance trade streaming with tokio async streams, efficient filtering, and real-time analytics.
Get Recent Trades
Basic Query
use lux_dex_sdk::{Client, Trade, Result};
async fn get_trades(client: &Client) -> Result<()> {
// Get last 100 trades
let trades = client.get_trades("BTC-USD", 100).await?;
for trade in &trades {
println!(
"{} | {} {} @ {} | {}",
trade.timestamp.format("%H:%M:%S%.3f"),
trade.side,
trade.size,
trade.price,
trade.id
);
}
Ok(())
}With Time Range
use chrono::{Duration, Utc};
let start = Utc::now() - Duration::hours(1);
let end = Utc::now();
let trades = client
.get_trades_range("BTC-USD", start, end, 1000)
.await?;
println!("Got {} trades in last hour", trades.len());Trade Types
use chrono::{DateTime, Utc};
/// Executed trade
#[derive(Debug, Clone)]
pub struct Trade {
pub id: TradeId,
pub symbol: String,
pub price: f64,
pub size: f64,
pub side: Side, // Taker side
pub timestamp: DateTime<Utc>,
pub maker_order_id: OrderId,
pub taker_order_id: OrderId,
}
/// Trade with additional fields
#[derive(Debug, Clone)]
pub struct DetailedTrade {
pub trade: Trade,
pub maker_fee: f64,
pub taker_fee: f64,
pub liquidity: Liquidity, // Maker or Taker
pub sequence: u64,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Liquidity {
Maker,
Taker,
}WebSocket Streaming
Subscribe to Trades
use lux_dex_sdk::{Client, Trade};
use tokio_stream::StreamExt;
async fn stream_trades(client: &Client) -> Result<()> {
let mut stream = client.subscribe_trades("BTC-USD").await?;
while let Some(trade) = stream.next().await {
let trade = trade?;
println!(
"{}: {} {} @ {}",
trade.side,
trade.size,
trade.symbol,
trade.price
);
}
Ok(())
}Subscribe to Multiple Symbols
use futures::stream::select_all;
async fn stream_multiple(client: &Client) -> Result<()> {
let symbols = vec!["BTC-USD", "ETH-USD", "SOL-USD"];
let streams: Vec<_> = futures::future::try_join_all(
symbols.iter().map(|s| client.subscribe_trades(s))
).await?;
let mut combined = select_all(streams);
while let Some(trade) = combined.next().await {
let trade = trade?;
println!("[{}] {} @ {}", trade.symbol, trade.size, trade.price);
}
Ok(())
}Filtering Trades
By Size
use tokio_stream::StreamExt;
async fn large_trades(client: &Client, min_size: f64) -> Result<()> {
let stream = client.subscribe_trades("BTC-USD").await?;
let mut filtered = stream.filter(|result| {
match result {
Ok(trade) => trade.size >= min_size,
Err(_) => true, // Let errors through
}
});
while let Some(trade) = filtered.next().await {
let trade = trade?;
println!("Large trade: {} @ {}", trade.size, trade.price);
}
Ok(())
}By Side
async fn buy_trades_only(client: &Client) -> Result<()> {
let stream = client.subscribe_trades("BTC-USD").await?;
let mut buys = stream.filter_map(|result| async {
match result {
Ok(trade) if trade.side == Side::Buy => Some(Ok(trade)),
Ok(_) => None,
Err(e) => Some(Err(e)),
}
});
while let Some(trade) = buys.next().await {
println!("Buy: {} @ {}", trade?.size, trade?.price);
}
Ok(())
}Trade Aggregation
Time-Based Aggregation
use std::time::Duration;
use tokio::time::{interval, Interval};
/// Aggregate trades into time buckets
pub struct TradeAggregator {
bucket_duration: Duration,
current_bucket: TradeBucket,
last_flush: Instant,
}
#[derive(Default, Clone)]
pub struct TradeBucket {
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub volume: f64,
pub buy_volume: f64,
pub sell_volume: f64,
pub trade_count: u32,
pub vwap: f64,
total_value: f64, // For VWAP calculation
}
impl TradeAggregator {
pub fn new(bucket_duration: Duration) -> Self {
Self {
bucket_duration,
current_bucket: TradeBucket::default(),
last_flush: Instant::now(),
}
}
pub fn add(&mut self, trade: &Trade) -> Option<TradeBucket> {
// Update current bucket
if self.current_bucket.trade_count == 0 {
self.current_bucket.open = trade.price;
self.current_bucket.high = trade.price;
self.current_bucket.low = trade.price;
} else {
self.current_bucket.high = self.current_bucket.high.max(trade.price);
self.current_bucket.low = self.current_bucket.low.min(trade.price);
}
self.current_bucket.close = trade.price;
self.current_bucket.volume += trade.size;
self.current_bucket.total_value += trade.size * trade.price;
self.current_bucket.trade_count += 1;
match trade.side {
Side::Buy => self.current_bucket.buy_volume += trade.size,
Side::Sell => self.current_bucket.sell_volume += trade.size,
}
// Check if bucket should be flushed
if self.last_flush.elapsed() >= self.bucket_duration {
self.flush()
} else {
None
}
}
pub fn flush(&mut self) -> Option<TradeBucket> {
if self.current_bucket.trade_count == 0 {
return None;
}
// Calculate VWAP
self.current_bucket.vwap =
self.current_bucket.total_value / self.current_bucket.volume;
let bucket = std::mem::take(&mut self.current_bucket);
self.last_flush = Instant::now();
Some(bucket)
}
}Using the Aggregator
async fn aggregate_trades(client: &Client) -> Result<()> {
let mut stream = client.subscribe_trades("BTC-USD").await?;
let mut aggregator = TradeAggregator::new(Duration::from_secs(60));
let mut flush_interval = tokio::time::interval(Duration::from_secs(60));
loop {
tokio::select! {
Some(trade) = stream.next() => {
let trade = trade?;
if let Some(bucket) = aggregator.add(&trade) {
print_bucket(&bucket);
}
}
_ = flush_interval.tick() => {
if let Some(bucket) = aggregator.flush() {
print_bucket(&bucket);
}
}
}
}
}
fn print_bucket(bucket: &TradeBucket) {
println!(
"OHLC: {:.2}/{:.2}/{:.2}/{:.2} | Vol: {:.4} | VWAP: {:.2} | Trades: {}",
bucket.open,
bucket.high,
bucket.low,
bucket.close,
bucket.volume,
bucket.vwap,
bucket.trade_count
);
}VWAP Calculation
Rolling VWAP
/// Rolling VWAP calculator
pub struct RollingVwap {
window: VecDeque<(f64, f64)>, // (price, size)
window_duration: Duration,
total_value: f64,
total_volume: f64,
}
impl RollingVwap {
pub fn new(window_duration: Duration) -> Self {
Self {
window: VecDeque::new(),
window_duration,
total_value: 0.0,
total_volume: 0.0,
}
}
pub fn add(&mut self, trade: &Trade) {
let value = trade.price * trade.size;
self.window.push_back((trade.price, trade.size));
self.total_value += value;
self.total_volume += trade.size;
// Remove old trades (simplified - real impl uses timestamps)
while self.window.len() > 10000 {
if let Some((price, size)) = self.window.pop_front() {
self.total_value -= price * size;
self.total_volume -= size;
}
}
}
pub fn vwap(&self) -> Option<f64> {
if self.total_volume > 0.0 {
Some(self.total_value / self.total_volume)
} else {
None
}
}
}Time-Weighted VWAP
use chrono::{DateTime, Duration, Utc};
/// Time-weighted VWAP with exact window duration
pub struct TimeWeightedVwap {
trades: VecDeque<Trade>,
window: Duration,
}
impl TimeWeightedVwap {
pub fn new(window: Duration) -> Self {
Self {
trades: VecDeque::new(),
window,
}
}
pub fn add(&mut self, trade: Trade) {
self.trades.push_back(trade);
self.prune();
}
fn prune(&mut self) {
let cutoff = Utc::now() - self.window;
while let Some(front) = self.trades.front() {
if front.timestamp < cutoff {
self.trades.pop_front();
} else {
break;
}
}
}
pub fn vwap(&self) -> Option<f64> {
if self.trades.is_empty() {
return None;
}
let (total_value, total_volume) = self.trades.iter().fold(
(0.0, 0.0),
|(value, vol), t| (value + t.price * t.size, vol + t.size)
);
Some(total_value / total_volume)
}
pub fn volume(&self) -> f64 {
self.trades.iter().map(|t| t.size).sum()
}
}Trade Analytics
Trade Flow Analysis
/// Analyze trade flow for market sentiment
pub struct TradeFlowAnalyzer {
window: VecDeque<Trade>,
window_size: usize,
}
impl TradeFlowAnalyzer {
pub fn new(window_size: usize) -> Self {
Self {
window: VecDeque::with_capacity(window_size),
window_size,
}
}
pub fn add(&mut self, trade: Trade) {
if self.window.len() >= self.window_size {
self.window.pop_front();
}
self.window.push_back(trade);
}
/// Buy/sell volume ratio
pub fn volume_ratio(&self) -> f64 {
let (buy_vol, sell_vol) = self.window.iter().fold(
(0.0, 0.0),
|(buy, sell), t| match t.side {
Side::Buy => (buy + t.size, sell),
Side::Sell => (buy, sell + t.size),
}
);
if sell_vol > 0.0 {
buy_vol / sell_vol
} else {
f64::INFINITY
}
}
/// Order flow imbalance (-1 to 1)
pub fn imbalance(&self) -> f64 {
let (buy_vol, sell_vol) = self.window.iter().fold(
(0.0, 0.0),
|(buy, sell), t| match t.side {
Side::Buy => (buy + t.size, sell),
Side::Sell => (buy, sell + t.size),
}
);
let total = buy_vol + sell_vol;
if total > 0.0 {
(buy_vol - sell_vol) / total
} else {
0.0
}
}
/// Average trade size
pub fn avg_trade_size(&self) -> f64 {
if self.window.is_empty() {
return 0.0;
}
let total: f64 = self.window.iter().map(|t| t.size).sum();
total / self.window.len() as f64
}
/// Trade rate (trades per second)
pub fn trade_rate(&self) -> f64 {
if self.window.len() < 2 {
return 0.0;
}
let first = self.window.front().unwrap();
let last = self.window.back().unwrap();
let duration = (last.timestamp - first.timestamp).num_milliseconds() as f64 / 1000.0;
if duration > 0.0 {
self.window.len() as f64 / duration
} else {
0.0
}
}
}Zero-Copy Trade Parsing
use lux_dex_sdk::zerocopy::TradeRef;
/// Zero-copy trade reference
#[repr(C)]
pub struct TradeRef<'a> {
pub id: u64,
pub symbol: &'a str,
pub price: f64,
pub size: f64,
pub side: u8, // 0 = Buy, 1 = Sell
pub timestamp: i64,
}
impl<'a> TradeRef<'a> {
/// Parse from binary buffer without allocation
pub fn from_bytes(data: &'a [u8]) -> Result<Self> {
if data.len() < 48 {
return Err(Error::InsufficientData);
}
// Parse fields directly from buffer
let id = u64::from_le_bytes(data[0..8].try_into()?);
let price = f64::from_le_bytes(data[8..16].try_into()?);
let size = f64::from_le_bytes(data[16..24].try_into()?);
let side = data[24];
let timestamp = i64::from_le_bytes(data[25..33].try_into()?);
let symbol_len = data[33] as usize;
let symbol = std::str::from_utf8(&data[34..34 + symbol_len])?;
Ok(Self {
id,
symbol,
price,
size,
side,
timestamp,
})
}
pub fn side(&self) -> Side {
if self.side == 0 { Side::Buy } else { Side::Sell }
}
pub fn to_owned(&self) -> Trade {
Trade {
id: TradeId(self.id),
symbol: self.symbol.to_string(),
price: self.price,
size: self.size,
side: self.side(),
timestamp: DateTime::from_timestamp(self.timestamp / 1000, 0).unwrap(),
maker_order_id: OrderId::default(),
taker_order_id: OrderId::default(),
}
}
}Performance Benchmarks
use criterion::{criterion_group, criterion_main, Criterion};
fn trade_benchmarks(c: &mut Criterion) {
let trades: Vec<Trade> = generate_test_trades(10000);
c.bench_function("vwap_10k_trades", |b| {
b.iter(|| {
let mut vwap = RollingVwap::new(Duration::from_secs(60));
for trade in &trades {
vwap.add(trade);
}
vwap.vwap()
})
});
c.bench_function("aggregator_10k_trades", |b| {
b.iter(|| {
let mut agg = TradeAggregator::new(Duration::from_secs(60));
for trade in &trades {
agg.add(trade);
}
})
});
c.bench_function("flow_analyzer_10k_trades", |b| {
b.iter(|| {
let mut analyzer = TradeFlowAnalyzer::new(1000);
for trade in trades.iter().cloned() {
analyzer.add(trade);
}
analyzer.imbalance()
})
});
let data = serialize_trade(&trades[0]);
c.bench_function("zerocopy_parse", |b| {
b.iter(|| TradeRef::from_bytes(&data))
});
}
criterion_group!(benches, trade_benchmarks);
criterion_main!(benches);Expected results:
vwap_10k_trades time: [234 us 241 us 248 us]
aggregator_10k_trades time: [89 us 92 us 95 us]
flow_analyzer_10k_trades time: [156 us 161 us 166 us]
zerocopy_parse time: [12 ns 13 ns 14 ns]Complete Example
use lux_dex_sdk::{Client, Trade, Side, Result};
use tokio_stream::StreamExt;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::builder()
.websocket_url("ws://localhost:8081")
.build()
.await?;
// Initialize analytics
let mut vwap = RollingVwap::new(Duration::from_secs(300)); // 5-min VWAP
let mut flow = TradeFlowAnalyzer::new(100);
let mut aggregator = TradeAggregator::new(Duration::from_secs(60));
let mut stream = client.subscribe_trades("BTC-USD").await?;
while let Some(trade) = stream.next().await {
let trade = trade?;
// Update analytics
vwap.add(&trade);
flow.add(trade.clone());
if let Some(bucket) = aggregator.add(&trade) {
// Minute candle complete
println!(
"1m | O:{:.2} H:{:.2} L:{:.2} C:{:.2} V:{:.4}",
bucket.open, bucket.high, bucket.low, bucket.close, bucket.volume
);
}
// Real-time metrics
let current_vwap = vwap.vwap().unwrap_or(0.0);
let imbalance = flow.imbalance();
let trade_rate = flow.trade_rate();
// Trading signals
if trade.price > current_vwap * 1.001 && imbalance > 0.3 {
println!("SIGNAL: Price above VWAP with strong buy flow");
}
if trade_rate > 10.0 && trade.size > flow.avg_trade_size() * 5.0 {
println!("ALERT: Large trade in high activity period");
}
}
Ok(())
}