Go SDK

Trades

Trade history, user trades, and WebSocket streaming with the Go SDK

Trades

The Go SDK provides comprehensive access to trade data, including historical trades, user-specific trades, and real-time streaming.

Trade Structure

// Trade describes a single executed trade. Values of this type are returned
// by GetTrades and passed to SubscribeTrades callbacks in the examples below.
type Trade struct {
    TradeID     uint64    `json:"tradeId"`     // Unique trade identifier
    Symbol      string    `json:"symbol"`       // Trading pair
    Price       float64   `json:"price"`        // Execution price
    Size        float64   `json:"size"`         // Trade size
    Side        OrderSide `json:"side"`         // Buy or Sell (taker side)
    BuyOrderID  uint64    `json:"buyOrderId"`   // Buy order ID
    SellOrderID uint64    `json:"sellOrderId"`  // Sell order ID
    BuyerID     string    `json:"buyerId"`      // Buyer account ID
    SellerID    string    `json:"sellerId"`     // Seller account ID
    // Nanoseconds since the Unix epoch; convert with time.Unix(0, Timestamp).
    Timestamp   int64     `json:"timestamp"`    // Unix timestamp (nanoseconds)
}

Fetching Trade History

Recent Trades

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// main connects to the DEX over gRPC, requests up to 50 recent BTC-USD
// trades, and prints one line per trade.
func main() {
    dex, err := client.NewClient(client.WithGRPCURL("localhost:50051"))
    if err != nil {
        log.Fatal(err)
    }
    defer dex.Disconnect()

    // Bound both the connection attempt and the query to ten seconds.
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    err = dex.ConnectGRPC(ctx)
    if err != nil {
        log.Fatal("Failed to connect:", err)
    }

    // Get last 50 trades
    recent, err := dex.GetTrades(ctx, "BTC-USD", 50)
    if err != nil {
        log.Fatal("Failed to get trades:", err)
    }

    fmt.Printf("Recent Trades for BTC-USD\n")
    fmt.Println("=========================")

    for _, t := range recent {
        // "BUY " carries a trailing space so both labels are four characters wide.
        label := "BUY "
        if t.Side == client.OrderSideSell {
            label = "SELL"
        }

        // Timestamp is in nanoseconds, hence time.Unix(0, ...).
        executedAt := time.Unix(0, t.Timestamp).Format("15:04:05.000")
        fmt.Printf("[%s] %s %.8f @ %.2f (Total: $%.2f)\n",
            executedAt, label, t.Size, t.Price, t.TotalValue())
    }
}

Trade Statistics

package main

import (
    "context"
    "fmt"
    "log"
    "math"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// TradeStats holds aggregate figures computed over a slice of trades.
type TradeStats struct {
    // Count is the number of trades that were aggregated.
    Count int
    // TotalVolume is the sum of trade sizes.
    TotalVolume float64
    // TotalValue is the sum of each trade's total value.
    TotalValue float64
    // BuyVolume is the size traded on the buy side.
    BuyVolume float64
    // SellVolume is the size traded on the sell side.
    SellVolume float64
    // HighPrice is the highest trade price seen.
    HighPrice float64
    // LowPrice is the lowest trade price seen.
    LowPrice float64
    // AvgPrice is the average trade price.
    AvgPrice float64
    // VWAP is the volume-weighted average price (TotalValue / TotalVolume).
    VWAP float64
}

// CalculateStats aggregates volume, value, price range, and average prices
// over trades. It returns a zero-valued TradeStats when trades is empty.
//
// AvgPrice is the arithmetic mean of the trade prices (every trade counts
// equally), whereas VWAP weights each price by its size
// (TotalValue / TotalVolume). VWAP stays at zero when the total volume is
// zero so callers never receive NaN.
func CalculateStats(trades []*client.Trade) *TradeStats {
    if len(trades) == 0 {
        return &TradeStats{}
    }

    // Seed the price range with the first trade so the loop only widens it.
    stats := &TradeStats{
        Count:     len(trades),
        HighPrice: trades[0].Price,
        LowPrice:  trades[0].Price,
    }

    var priceSum float64
    for _, trade := range trades {
        stats.TotalVolume += trade.Size
        stats.TotalValue += trade.TotalValue()
        priceSum += trade.Price

        // Anything that is not a buy is counted on the sell side.
        if trade.Side == client.OrderSideBuy {
            stats.BuyVolume += trade.Size
        } else {
            stats.SellVolume += trade.Size
        }

        stats.HighPrice = math.Max(stats.HighPrice, trade.Price)
        stats.LowPrice = math.Min(stats.LowPrice, trade.Price)
    }

    // Count is non-zero here because of the early return above.
    stats.AvgPrice = priceSum / float64(stats.Count)
    if stats.TotalVolume > 0 {
        stats.VWAP = stats.TotalValue / stats.TotalVolume
    }

    return stats
}

// main requests up to 1000 recent BTC-USD trades over gRPC and prints the
// statistics produced by CalculateStats.
func main() {
    c, err := client.NewClient(
        client.WithGRPCURL("localhost:50051"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    if err := c.ConnectGRPC(ctx); err != nil {
        log.Fatal("Failed to connect:", err)
    }

    trades, err := c.GetTrades(ctx, "BTC-USD", 1000)
    if err != nil {
        log.Fatal("Failed to get trades:", err)
    }

    stats := CalculateStats(trades)

    // The percentage lines below divide by TotalVolume; with no traded volume
    // they would print NaN, so report the empty result explicitly instead.
    if stats.TotalVolume == 0 {
        fmt.Println("No traded volume for BTC-USD; nothing to report.")
        return
    }

    fmt.Printf("Trade Statistics for BTC-USD (last %d trades)\n", stats.Count)
    fmt.Println("=============================================")
    fmt.Printf("Total Volume:  %.8f BTC\n", stats.TotalVolume)
    fmt.Printf("Total Value:   $%.2f\n", stats.TotalValue)
    fmt.Printf("Buy Volume:    %.8f (%.1f%%)\n",
        stats.BuyVolume, stats.BuyVolume/stats.TotalVolume*100)
    fmt.Printf("Sell Volume:   %.8f (%.1f%%)\n",
        stats.SellVolume, stats.SellVolume/stats.TotalVolume*100)
    fmt.Printf("High:          $%.2f\n", stats.HighPrice)
    fmt.Printf("Low:           $%.2f\n", stats.LowPrice)
    fmt.Printf("Average:       $%.2f\n", stats.AvgPrice)
    fmt.Printf("VWAP:          $%.2f\n", stats.VWAP)
}

Real-Time Trade Streaming

WebSocket Trade Stream

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// main opens a WebSocket connection, prints every BTC-USD trade it receives,
// and exits on SIGINT or SIGTERM.
func main() {
    ws, err := client.NewClient(client.WithWebSocketURL("ws://localhost:8081"))
    if err != nil {
        log.Fatal(err)
    }
    defer ws.Disconnect()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err = ws.ConnectWebSocket(ctx)
    if err != nil {
        log.Fatal("Failed to connect WebSocket:", err)
    }

    // printTrade is invoked for each streamed trade. The printed time is the
    // local receive time (time.Now), not the trade's own timestamp.
    printTrade := func(t *client.Trade) {
        label := "BUY "
        if t.Side == client.OrderSideSell {
            label = "SELL"
        }

        receivedAt := time.Now().Format("15:04:05.000")
        fmt.Printf("[%s] %s %.8f @ %.2f\n", receivedAt, label, t.Size, t.Price)
    }

    // Subscribe to trade updates
    if err = ws.SubscribeTrades("BTC-USD", printTrade); err != nil {
        log.Fatal("Failed to subscribe:", err)
    }

    fmt.Println("Subscribed to BTC-USD trades. Press Ctrl+C to exit.")

    // Block until the process is asked to stop.
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
    <-stop

    fmt.Println("\nShutting down...")
}

Multi-Symbol Trade Aggregator

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "sync/atomic"
    "syscall"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// TradeAggregator keeps a bounded per-symbol trade history together with a
// running per-symbol trade count. Its methods acquire mu before touching
// trades or counts.
type TradeAggregator struct {
    // mu guards trades and counts.
    mu         sync.RWMutex
    // trades maps a symbol to its recent trades, oldest first.
    trades     map[string][]*client.Trade
    // counts maps a symbol to the total number of trades added for it; the
    // counters are read and written through sync/atomic.
    counts     map[string]*uint64
    // maxHistory is the history length above which AddTrade drops the oldest trade.
    maxHistory int
}

// NewTradeAggregator returns an empty aggregator that retains at most
// maxHistory trades per symbol.
func NewTradeAggregator(maxHistory int) *TradeAggregator {
    agg := new(TradeAggregator)
    agg.maxHistory = maxHistory
    agg.trades = map[string][]*client.Trade{}
    agg.counts = map[string]*uint64{}
    return agg
}

// AddTrade appends trade to the history of its symbol, drops the oldest entry
// once the history exceeds maxHistory, and increments the symbol's counter.
func (ta *TradeAggregator) AddTrade(trade *client.Trade) {
    ta.mu.Lock()
    defer ta.mu.Unlock()

    key := trade.Symbol

    // Append first, then trim: only one trade is added per call, so removing
    // a single element from the front is enough to enforce the bound.
    history := append(ta.trades[key], trade)
    if len(history) > ta.maxHistory {
        history = history[1:]
    }
    ta.trades[key] = history

    // Create the counter lazily the first time a symbol is seen.
    counter := ta.counts[key]
    if counter == nil {
        counter = new(uint64)
        ta.counts[key] = counter
    }
    atomic.AddUint64(counter, 1)
}

// GetRecentTrades returns a copy of the most recent trades stored for symbol,
// oldest first, containing at most limit entries.
//
// A limit larger than the stored history returns the whole history. A
// negative limit is treated as zero; previously it reached make with a
// negative length and panicked. The result is always a fresh, non-nil slice,
// so callers may modify it freely.
func (ta *TradeAggregator) GetRecentTrades(symbol string, limit int) []*client.Trade {
    ta.mu.RLock()
    defer ta.mu.RUnlock()

    trades := ta.trades[symbol]

    // Clamp limit into [0, len(trades)] so the slice expression below is
    // always in range.
    if limit < 0 {
        limit = 0
    }
    if limit > len(trades) {
        limit = len(trades)
    }

    result := make([]*client.Trade, limit)
    copy(result, trades[len(trades)-limit:])
    return result
}

// GetCount reports how many trades have been added for symbol, or 0 when the
// symbol has no counter yet.
func (ta *TradeAggregator) GetCount(symbol string) uint64 {
    ta.mu.RLock()
    defer ta.mu.RUnlock()

    counter, found := ta.counts[symbol]
    if !found || counter == nil {
        return 0
    }
    return atomic.LoadUint64(counter)
}

// GetAllCounts returns a new map holding the current trade count of every
// symbol that has a counter.
func (ta *TradeAggregator) GetAllCounts() map[string]uint64 {
    ta.mu.RLock()
    defer ta.mu.RUnlock()

    snapshot := map[string]uint64{}
    for symbol := range ta.counts {
        snapshot[symbol] = atomic.LoadUint64(ta.counts[symbol])
    }
    return snapshot
}

// main subscribes to trades for several symbols, feeds them into a
// TradeAggregator, prints the per-symbol counts every five seconds, and
// prints them one last time on SIGINT or SIGTERM.
func main() {
    c, err := client.NewClient(client.WithWebSocketURL("ws://localhost:8081"))
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    err = c.ConnectWebSocket(ctx)
    if err != nil {
        log.Fatal("Failed to connect WebSocket:", err)
    }

    aggregator := NewTradeAggregator(1000)

    // Subscribe to multiple symbols; a failed subscription is logged and the
    // remaining symbols are still attempted.
    for _, symbol := range []string{"BTC-USD", "ETH-USD", "LUX-USD"} {
        if subErr := c.SubscribeTrades(symbol, aggregator.AddTrade); subErr != nil {
            log.Printf("Failed to subscribe to %s: %v", symbol, subErr)
            continue
        }
        fmt.Printf("Subscribed to %s trades\n", symbol)
    }

    // printCounts writes one line per symbol. Map iteration order is not
    // defined, so the line order may differ between calls.
    printCounts := func() {
        for symbol, count := range aggregator.GetAllCounts() {
            fmt.Printf("%s: %d trades\n", symbol, count)
        }
    }

    // Print stats periodically
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    for {
        select {
        case <-sigCh:
            fmt.Println("\nFinal counts:")
            printCounts()
            return

        case <-ticker.C:
            fmt.Println("\n--- Trade Counts ---")
            printCounts()
        }
    }
}

Trade Analysis

Volume Profile

package main

import (
    "context"
    "fmt"
    "log"
    "math"
    "sort"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// PriceVolume is one row of a volume profile as built by BuildVolumeProfile.
type PriceVolume struct {
    // Price is the midpoint of the price bucket.
    Price  float64
    // Volume is the sum of the sizes of the trades that fell into the bucket.
    Volume float64
}

// BuildVolumeProfile splits the price range covered by trades into buckets
// equally wide price bands and sums the traded size per band.
//
// The result contains one PriceVolume per band that received at least one
// trade, sorted by ascending price; Price is the midpoint of the band. It
// returns nil when trades is empty or buckets is not positive (a
// non-positive bucket count would otherwise divide by zero and produce
// meaningless bucket indices).
func BuildVolumeProfile(trades []*client.Trade, buckets int) []PriceVolume {
    if len(trades) == 0 || buckets <= 0 {
        return nil
    }

    // Find price range
    minPrice, maxPrice := trades[0].Price, trades[0].Price
    for _, trade := range trades[1:] {
        minPrice = math.Min(minPrice, trade.Price)
        maxPrice = math.Max(maxPrice, trade.Price)
    }

    bucketSize := (maxPrice - minPrice) / float64(buckets)
    if bucketSize == 0 {
        // All trades share one price; any positive width puts them in bucket 0.
        bucketSize = 1
    }

    // Aggregate volume by price bucket
    volumeMap := make(map[int]float64, buckets)
    for _, trade := range trades {
        bucket := int((trade.Price - minPrice) / bucketSize)
        // The maximum price lands exactly on the upper edge; fold it into the
        // last bucket instead of creating an extra one.
        if bucket >= buckets {
            bucket = buckets - 1
        }
        volumeMap[bucket] += trade.Size
    }

    // Convert to sorted slice
    result := make([]PriceVolume, 0, len(volumeMap))
    for bucket, volume := range volumeMap {
        result = append(result, PriceVolume{
            Price:  minPrice + float64(bucket)*bucketSize + bucketSize/2,
            Volume: volume,
        })
    }

    sort.Slice(result, func(i, j int) bool {
        return result[i].Price < result[j].Price
    })

    return result
}

// main requests up to 1000 recent BTC-USD trades over gRPC and renders a
// 20-bucket volume profile as a text bar chart.
func main() {
    c, err := client.NewClient(client.WithGRPCURL("localhost:50051"))
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    err = c.ConnectGRPC(ctx)
    if err != nil {
        log.Fatal("Failed to connect:", err)
    }

    trades, err := c.GetTrades(ctx, "BTC-USD", 1000)
    if err != nil {
        log.Fatal("Failed to get trades:", err)
    }

    profile := BuildVolumeProfile(trades, 20)

    // Find max volume for normalization
    maxVolume := 0.0
    for i := range profile {
        if v := profile[i].Volume; v > maxVolume {
            maxVolume = v
        }
    }

    fmt.Println("Volume Profile (BTC-USD)")
    fmt.Println("========================")

    for _, row := range profile {
        // Scale each bar so the busiest bucket is 40 characters wide.
        bar := ""
        for n := int(row.Volume / maxVolume * 40); n > 0; n-- {
            bar += "#"
        }
        fmt.Printf("$%.0f | %s %.4f\n", row.Price, bar, row.Volume)
    }
}

Trade Flow Analysis

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// TradeFlow summarises buy-side versus sell-side activity as computed by
// AnalyzeTradeFlow.
type TradeFlow struct {
    // BuyVolume and SellVolume are the summed trade sizes per side.
    BuyVolume    float64
    SellVolume   float64
    // BuyValue and SellValue are the summed trade values per side.
    BuyValue     float64
    SellValue    float64
    // BuyCount and SellCount are the number of trades per side.
    BuyCount     int
    SellCount    int
    // NetFlow is BuyValue minus SellValue.
    NetFlow      float64
    LargeTrades  int // Trades whose Size is >= the large-trade threshold
}

// AnalyzeTradeFlow splits trades into buy-side and sell-side totals, counts
// the trades whose Size is at least largeThreshold, and sets NetFlow to the
// buy value minus the sell value.
func AnalyzeTradeFlow(trades []*client.Trade, largeThreshold float64) *TradeFlow {
    var flow TradeFlow

    for _, t := range trades {
        notional := t.TotalValue()

        switch t.Side {
        case client.OrderSideBuy:
            flow.BuyCount++
            flow.BuyVolume += t.Size
            flow.BuyValue += notional
        default:
            // Every side other than buy is accounted as a sell.
            flow.SellCount++
            flow.SellVolume += t.Size
            flow.SellValue += notional
        }

        if t.Size >= largeThreshold {
            flow.LargeTrades++
        }
    }

    flow.NetFlow = flow.BuyValue - flow.SellValue
    return &flow
}

// main requests up to 500 recent BTC-USD trades over gRPC, analyses the
// buy/sell flow, and prints the totals together with a simple signal.
func main() {
    c, err := client.NewClient(
        client.WithGRPCURL("localhost:50051"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    if err := c.ConnectGRPC(ctx); err != nil {
        log.Fatal("Failed to connect:", err)
    }

    trades, err := c.GetTrades(ctx, "BTC-USD", 500)
    if err != nil {
        log.Fatal("Failed to get trades:", err)
    }

    flow := AnalyzeTradeFlow(trades, 0.5) // 0.5 BTC = large trade

    fmt.Println("Trade Flow Analysis (BTC-USD)")
    fmt.Println("=============================")
    fmt.Printf("Buy Volume:    %.4f BTC (%d trades)\n", flow.BuyVolume, flow.BuyCount)
    fmt.Printf("Sell Volume:   %.4f BTC (%d trades)\n", flow.SellVolume, flow.SellCount)
    fmt.Printf("Buy Value:     $%.2f\n", flow.BuyValue)
    fmt.Printf("Sell Value:    $%.2f\n", flow.SellValue)
    fmt.Printf("Net Flow:      $%.2f\n", flow.NetFlow)
    fmt.Printf("Large Trades:  %d\n", flow.LargeTrades)

    // Signal: a net flow of exactly zero is neither buying nor selling
    // pressure, so it gets its own branch.
    switch {
    case flow.NetFlow > 0:
        fmt.Printf("\nSignal: Net buying pressure (+$%.2f)\n", flow.NetFlow)
    case flow.NetFlow < 0:
        fmt.Printf("\nSignal: Net selling pressure ($%.2f)\n", flow.NetFlow)
    default:
        fmt.Println("\nSignal: Balanced flow ($0.00)")
    }

    // Without any trades the ratio would be 0/0 and print NaN.
    totalCount := flow.BuyCount + flow.SellCount
    if totalCount == 0 {
        fmt.Println("Buy/Sell Ratio: n/a (no trades)")
        return
    }

    buyRatio := float64(flow.BuyCount) / float64(totalCount) * 100
    fmt.Printf("Buy/Sell Ratio: %.1f%%/%.1f%%\n", buyRatio, 100-buyRatio)
}

User Trades

Fetch User Trade History

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// main requests up to 100 recent BTC-USD trades over JSON-RPC, keeps the
// ones in which the example user was buyer or seller, and prints them with a
// running total.
func main() {
    c, err := client.NewClient(
        client.WithJSONRPCURL("http://localhost:8080"),
        client.WithAPIKey("your-api-key"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // GetTrades is called here without any user filter; fetching only a
    // user's trades would require an additional API method.
    trades, err := c.GetTrades(ctx, "BTC-USD", 100)
    if err != nil {
        log.Fatal("Failed to get trades:", err)
    }

    // Filter by user ID (example)
    const userID = "trader-001"
    var mine []*client.Trade
    for _, t := range trades {
        if t.BuyerID != userID && t.SellerID != userID {
            continue
        }
        mine = append(mine, t)
    }

    fmt.Printf("Trades for user %s\n", userID)
    fmt.Println("==================")

    // The total adds the value of sells and subtracts the value of buys,
    // i.e. it is the net cash flow of the listed trades.
    total := 0.0
    for _, t := range mine {
        var (
            role   string
            signed float64
        )
        switch {
        case t.BuyerID == userID:
            role, signed = "BOUGHT", -t.TotalValue()
        default:
            role, signed = "SOLD", t.TotalValue()
        }
        total += signed

        when := time.Unix(0, t.Timestamp).Format("2006-01-02 15:04:05")
        fmt.Printf("[%s] %s %.8f @ %.2f = $%.2f\n",
            when, role, t.Size, t.Price, t.TotalValue())
    }

    fmt.Printf("\nTotal P&L: $%.2f\n", total)
}

Subscribe to User Fills

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// main opens an authenticated WebSocket connection, subscribes to the fill
// channel of one user, prints every fill it receives, and exits on SIGINT or
// SIGTERM.
func main() {
    c, err := client.NewClient(
        client.WithWebSocketURL("ws://localhost:8081"),
        client.WithAPIKey("your-api-key"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if err := c.ConnectWebSocket(ctx); err != nil {
        log.Fatal("Failed to connect WebSocket:", err)
    }

    userID := "trader-001"

    // Subscribe to user-specific trades
    err = c.Subscribe(fmt.Sprintf("user:fills:%s", userID), func(data interface{}) {
        fill, ok := data.(map[string]interface{})
        if !ok {
            return
        }

        // Use checked assertions: an unchecked assertion on a missing or
        // mistyped field would panic inside the callback. The numeric fields
        // are read as float64, as in the generic decoding this example expects.
        tradeID, okID := fill["tradeId"].(float64)
        symbol, okSymbol := fill["symbol"].(string)
        price, okPrice := fill["price"].(float64)
        size, okSize := fill["size"].(float64)
        buyerID, okBuyer := fill["buyerId"].(string)
        if !(okID && okSymbol && okPrice && okSize && okBuyer) {
            log.Printf("Skipping malformed fill message: %v", fill)
            return
        }

        role := "SOLD"
        if buyerID == userID {
            role = "BOUGHT"
        }

        fmt.Printf("[%s] %s: %s %.8f @ %.2f\n",
            time.Now().Format("15:04:05.000"),
            symbol, role, size, price)
        fmt.Printf("  Trade ID: %d\n", uint64(tradeID))
    })
    if err != nil {
        log.Fatal("Failed to subscribe:", err)
    }

    fmt.Printf("Listening for fills for user %s...\n", userID)

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    fmt.Println("\nShutting down...")
}

Performance Considerations

Batch Trade Processing

package main

import (
    "context"
    "log"
    "sync"
    "time"

    "github.com/luxfi/dex/sdk/go/client"
)

// TradeBatcher buffers trades and hands them to a handler in batches, either
// when maxSize trades have accumulated or when the flush interval elapses.
// Each batch is processed on its own goroutine, so the handler may run
// concurrently with itself and batches may complete out of order.
type TradeBatcher struct {
    trades  []*client.Trade
    mu      sync.Mutex // guards trades and stopped
    maxSize int
    handler func([]*client.Trade)
    ticker  *time.Ticker
    done    chan struct{} // closed by Stop to end the run loop

    stopOnce sync.Once      // makes Stop safe to call more than once
    loopDone chan struct{}  // closed by run after the final flush was issued
    inFlight sync.WaitGroup // tracks handler goroutines started by flushLocked
    stopped  bool           // set by run during shutdown; later Adds are dropped
}

// NewTradeBatcher creates a batcher that calls handler with up to maxSize
// trades at a time and at least every flushInterval while trades are pending.
// It starts a background goroutine; call Stop to end it.
func NewTradeBatcher(maxSize int, flushInterval time.Duration, handler func([]*client.Trade)) *TradeBatcher {
    tb := &TradeBatcher{
        trades:   make([]*client.Trade, 0, maxSize),
        maxSize:  maxSize,
        handler:  handler,
        ticker:   time.NewTicker(flushInterval),
        done:     make(chan struct{}),
        loopDone: make(chan struct{}),
    }

    go tb.run()
    return tb
}

// run flushes on every tick until done is closed, then issues one last flush
// and marks the batcher as stopped.
func (tb *TradeBatcher) run() {
    defer close(tb.loopDone)

    for {
        select {
        case <-tb.ticker.C:
            tb.Flush()
        case <-tb.done:
            // Flush and set stopped under one lock acquisition so that no Add
            // can slip a trade in between the final flush and shutdown.
            tb.mu.Lock()
            tb.flushLocked()
            tb.stopped = true
            tb.mu.Unlock()
            return
        }
    }
}

// Add buffers trade and triggers a flush once maxSize trades are pending.
// Trades added after the batcher has stopped are discarded.
func (tb *TradeBatcher) Add(trade *client.Trade) {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    if tb.stopped {
        return
    }

    tb.trades = append(tb.trades, trade)
    if len(tb.trades) >= tb.maxSize {
        tb.flushLocked()
    }
}

// Flush hands all pending trades to the handler; it does nothing when no
// trades are pending.
func (tb *TradeBatcher) Flush() {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    tb.flushLocked()
}

// flushLocked passes the pending trades to the handler on a new goroutine and
// starts a fresh buffer. The caller must hold tb.mu.
func (tb *TradeBatcher) flushLocked() {
    if len(tb.trades) == 0 {
        return
    }

    // Hand the old slice over to the handler and allocate a new one, so the
    // handler never observes later appends.
    batch := tb.trades
    tb.trades = make([]*client.Trade, 0, tb.maxSize)

    // Run the handler off the lock so a slow handler does not block Add.
    tb.inFlight.Add(1)
    go func() {
        defer tb.inFlight.Done()
        tb.handler(batch)
    }()
}

// Stop stops the ticker, flushes the pending trades, and waits until every
// handler invocation has returned, so no batch is lost when the caller exits
// right after Stop. It is safe to call Stop more than once. Stop must not be
// called from inside the handler, because it waits for the handler to finish.
func (tb *TradeBatcher) Stop() {
    tb.stopOnce.Do(func() {
        tb.ticker.Stop()
        close(tb.done)
        <-tb.loopDone // the final flush has been issued
    })
    tb.inFlight.Wait()
}

// main streams BTC-USD trades over WebSocket into a TradeBatcher for sixty
// seconds and logs the size of every batch.
func main() {
    c, err := client.NewClient(client.WithWebSocketURL("ws://localhost:8081"))
    if err != nil {
        log.Fatal(err)
    }
    defer c.Disconnect()

    if err := c.ConnectWebSocket(context.Background()); err != nil {
        log.Fatal("Failed to connect:", err)
    }

    // processBatch receives each flushed batch.
    processBatch := func(batch []*client.Trade) {
        log.Printf("Processing batch of %d trades", len(batch))
        // Process batch (e.g., write to database, analytics)
    }

    // Create batcher that flushes every 100 trades or 1 second
    batcher := NewTradeBatcher(100, time.Second, processBatch)
    defer batcher.Stop()

    // Subscribe and batch trades
    if err = c.SubscribeTrades("BTC-USD", func(t *client.Trade) { batcher.Add(t) }); err != nil {
        log.Fatal("Failed to subscribe:", err)
    }

    // Run for 60 seconds
    time.Sleep(60 * time.Second)
}

Next Steps