Performance

Throughput Optimization

Scaling to 434M orders/sec with parallel execution and batch processing

Throughput Optimization

LX achieves 434M orders/sec on GPU and 1M+ orders/sec on CPU through parallel execution, batch processing, and lock-free algorithms.

Throughput Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                    PARALLEL PROCESSING PIPELINE                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   Input Layer (Parallel Receivers)                                      │
│   ┌─────────────────────────────────────────────────────────────────┐  │
│   │  Receiver 0 ─┐                                                   │  │
│   │  Receiver 1 ─┼─→ Ring Buffer ─→ Dispatcher                      │  │
│   │  Receiver N ─┘     (SPMC)                                       │  │
│   └─────────────────────────────────────────────────────────────────┘  │
│                              ↓                                          │
│   Processing Layer (Sharded by Symbol)                                  │
│   ┌─────────────────────────────────────────────────────────────────┐  │
│   │  Shard 0: BTC-* ─→ OrderBook 0 ─→ Matcher 0                     │  │
│   │  Shard 1: ETH-* ─→ OrderBook 1 ─→ Matcher 1                     │  │
│   │  Shard N: XXX-* ─→ OrderBook N ─→ Matcher N                     │  │
│   └─────────────────────────────────────────────────────────────────┘  │
│                              ↓                                          │
│   Output Layer (Aggregated Results)                                     │
│   ┌─────────────────────────────────────────────────────────────────┐  │
│   │  Trade Publisher ←─ Merger ←─ All Shards                        │  │
│   │  Persistence (async batched)                                    │  │
│   └─────────────────────────────────────────────────────────────────┘  │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Lock-Free Ring Buffer

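SPMC (Single Producer, Multiple Consumer). Only one goroutine may call Publish. When several receivers feed the buffer, as in the diagram above, either funnel them through a single publishing goroutine or give each receiver its own ring.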

package ringbuffer

import (
    "sync/atomic"
    "unsafe"
)

// Ring geometry. Positions grow without bound and are mapped onto slots with
// pos&RingMask, which equals pos%RingSize only while RingSize is a power of two.
const (
    // CacheLinePad is the padding unit, in bytes, used to keep hot counters
    // on separate cache lines (assumes 64-byte lines).
    CacheLinePad = 64

    // RingSize is the slot count: 2^20 (1,048,576) orders.
    RingSize = 1 << 20

    // RingMask converts a position into a slot index.
    RingMask = RingSize - 1
)

// Order is the fixed-size message carried by the ring buffer. It is laid out
// to fill exactly 64 bytes on every GOARCH:
//
//	offset  0  ID        8 bytes
//	offset  8  Symbol    4 bytes
//	offset 12  padding   4 bytes (explicit, so Price is 8-byte aligned
//	                     even where int64 only needs 4-byte alignment)
//	offset 16  Price     8 bytes
//	offset 24  Quantity  8 bytes
//	offset 32  Side      1 byte
//	offset 33  Type      1 byte
//	offset 34  padding   30 bytes
//
// The previous trailing pad of 38 bytes ignored the alignment hole after
// Symbol and made the struct 72 bytes, not 64.
type Order struct {
    ID       uint64
    Symbol   uint32
    _        [4]byte // explicit alignment padding before Price
    Price    int64
    Quantity int64
    Side     uint8
    Type     uint8
    _        [30]byte // pad the 34 bytes above up to 64
}

// Compile-time guards: one of these array lengths becomes a negative
// constant, failing the build, if Order is not exactly 64 bytes.
var (
    _ [64 - unsafe.Sizeof(Order{})]byte
    _ [unsafe.Sizeof(Order{}) - 64]byte
)

// RingBuffer is a lock-free SPMC queue: one producer (Publish) and any number
// of consumers (Consume). Positions only ever increase; the slot for position
// p is buffer[p&RingMask].
//
// The counters are separated by CacheLinePad-byte gaps so that the counter
// the producer writes and the counter consumers write do not share a cache
// line (assumes 64-byte lines).
type RingBuffer struct {
    _         [CacheLinePad]byte // Prevent false sharing
    // writePos: next position Publish will fill; only Publish touches it.
    writePos  atomic.Uint64
    _         [CacheLinePad - 8]byte
    // readPos: first position not yet claimed by a consumer. Consume
    // advances it with CompareAndSwap; Publish reads it for the full check.
    readPos   atomic.Uint64
    _         [CacheLinePad - 8]byte
    // committed: every position below this value has been written and may
    // be read. Publish stores it after writing the slot.
    committed atomic.Uint64
    _         [CacheLinePad - 8]byte
    // buffer: RingSize slots, indexed by position&RingMask.
    buffer    [RingSize]Order
}

// Publish copies *order into the next slot and makes it visible to consumers.
// It returns false, writing nothing, when the ring is full, that is, when the
// producer is already RingSize positions ahead of readPos.
//
// Publish must only be called from one goroutine. writePos is advanced with
// a plain Load/Store rather than a CompareAndSwap, so concurrent producers
// would claim the same slot.
func (rb *RingBuffer) Publish(order *Order) bool {
    pos := rb.writePos.Load()
    nextPos := pos + 1

    // Full when accepting this order would put the producer more than
    // RingSize positions ahead of the oldest unclaimed position.
    if nextPos-rb.readPos.Load() > RingSize {
        return false
    }

    rb.buffer[pos&RingMask] = *order

    // The atomic store of committed publishes the slot write above. A
    // consumer that loads committed >= nextPos also observes the order,
    // because sync/atomic operations are sequentially consistent in Go.
    // committed is an atomic.Uint64, so its Store method is used;
    // atomic.StoreUint64 only accepts a *uint64.
    rb.committed.Store(nextPos)
    rb.writePos.Store(nextPos)

    return true
}

// Consume copies up to len(batch) published orders into batch, oldest first,
// and returns how many it copied. It returns 0 when nothing is available. Any
// number of goroutines may call it at once; each published order is handed
// to exactly one caller.
func (rb *RingBuffer) Consume(batch []Order) int {
    for {
        readPos := rb.readPos.Load()
        available := rb.committed.Load() - readPos
        if available == 0 {
            return 0
        }

        n := len(batch)
        if available < uint64(n) {
            n = int(available)
        }

        // Copy first, claim second. While readPos still equals our snapshot,
        // Publish cannot reuse any slot in [readPos, readPos+n), because it
        // never runs more than RingSize positions ahead of readPos. Claiming
        // first (CAS, then copy) would let the producer lap the ring and
        // overwrite a claimed slot before it had been copied out.
        start := readPos & RingMask
        copied := copy(batch[:n], rb.buffer[start:])
        copy(batch[copied:n], rb.buffer[:n-copied]) // wrap-around part, if any

        if rb.readPos.CompareAndSwap(readPos, readPos+uint64(n)) {
            return n
        }
        // Another consumer claimed these positions first, so the producer
        // may have been writing while we copied. Discard the copy and retry.
        // Order holds no pointers, so a torn copy is harmless once
        // discarded, although the race detector may still report it.
    }
}

Symbol-Based Sharding

Consistent Hash Sharding

package sharding

import (
    "hash/maphash"
    "sync"
)

// NumShards is the number of shards. It must be a power of two, because
// ShardFor selects a shard with hash&(NumShards-1) instead of a modulo.
const NumShards = 64

// ShardedEngine partitions symbols across NumShards shards. All orders for a
// given symbol are routed to the same shard.
type ShardedEngine struct {
    shards [NumShards]*Shard
    // seed keys the symbol hash. It is set once in NewShardedEngine and only
    // read afterwards, which keeps ShardFor safe for concurrent callers.
    seed maphash.Seed
}

// Shard owns the order books for the symbols that hash to it. Orders arrive
// through inbox and are consumed by the shard's Run goroutine.
type Shard struct {
    id         int
    orderBooks map[string]*OrderBook
    inbox      chan *Order
    mu         sync.RWMutex
}

// NewShardedEngine creates NumShards shards and starts one Run goroutine per
// shard. Each shard's inbox is buffered for 100000 orders. ShardedEngine
// exposes no method for stopping these goroutines.
func NewShardedEngine() *ShardedEngine {
    e := &ShardedEngine{seed: maphash.MakeSeed()}
    for i := range e.shards {
        shard := &Shard{
            id:         i,
            orderBooks: make(map[string]*OrderBook),
            inbox:      make(chan *Order, 100000),
        }
        e.shards[i] = shard
        go shard.Run()
    }
    return e
}

// ShardFor returns the shard that owns symbol. The same symbol always maps to
// the same shard for the lifetime of e.
//
// maphash.String is a pure function of its arguments, so ShardFor keeps no
// mutable state and may be called from many goroutines at once. Resetting
// and writing a single maphash.Hash shared by all callers would be a data
// race, because maphash.Hash is not safe for concurrent use.
func (e *ShardedEngine) ShardFor(symbol string) *Shard {
    return e.shards[maphash.String(e.seed, symbol)&(NumShards-1)]
}

// ProcessOrder hands order to the shard that owns order.Symbol. The send
// blocks while that shard's inbox buffer is full.
func (e *ShardedEngine) ProcessOrder(order *Order) {
    e.ShardFor(order.Symbol).inbox <- order
}

// Run is the shard's event loop. It takes orders from inbox in arrival order,
// groups them into batches of at most 1000, and passes each batch to
// processBatch. A batch is flushed as soon as the inbox is momentarily empty,
// so a lone order is never held back waiting for more. Run returns once the
// inbox has been closed and fully drained.
func (s *Shard) Run() {
    // LockOSThread keeps this goroutine on one OS thread. It does not pin the
    // thread to a CPU core; that needs an OS-level affinity call.
    runtime.LockOSThread()
    defer runtime.UnlockOSThread()

    const (
        maxBatch   = 1000
        spinRounds = 100
    )
    batch := make([]*Order, 0, maxBatch)

    for {
        // Wait for the first order of the next batch. Poll a bounded number
        // of times, yielding in between, so a burst is picked up without
        // parking the goroutine. Then block, so an idle shard does not spin
        // a core at 100%.
        var order *Order
        var open, received bool
        for spin := 0; spin < spinRounds && !received; spin++ {
            select {
            case order, open = <-s.inbox:
                received = true
            default:
                runtime.Gosched()
            }
        }
        if !received {
            order, open = <-s.inbox
        }
        if !open {
            return // inbox closed and drained
        }
        batch = append(batch[:0], order)

        // Top the batch up with whatever is already queued, without waiting.
    fill:
        for len(batch) < maxBatch {
            select {
            case order, open = <-s.inbox:
                if !open {
                    s.processBatch(batch)
                    return
                }
                batch = append(batch, order)
            default:
                break fill
            }
        }

        s.processBatch(batch)
    }
}

// processBatch applies each order, in slice order, to the book that
// getOrCreateBook returns for the order's symbol.
func (s *Shard) processBatch(orders []*Order) {
    for i := range orders {
        o := orders[i]
        s.getOrCreateBook(o.Symbol).Process(o)
    }
}

Batch Processing

Vectorized Order Processing

package batch

import (
    "unsafe"
)

// BatchSize is the fixed capacity of an OrderBatch: 2^8 = 256 orders.
const BatchSize = 1 << 8

// OrderBatch holds up to BatchSize orders in struct-of-arrays form. Element i
// of Prices, Quantities, Sides and Symbols together describe order i, and only
// the first Count entries are meaningful.
//
// NOTE(review): Go does not guarantee cache-line alignment for this type; only
// the natural alignment of its fields applies.
type OrderBatch struct {
    Prices     [BatchSize]int64
    Quantities [BatchSize]int64
    Sides      [BatchSize]uint8 // ProcessBatch treats 0 as buy, anything else as sell
    Symbols    [BatchSize]uint32
    Count      int // number of valid entries; indexing beyond BatchSize panics
}

// ProcessBatch matches the first batch.Count orders against ob and returns the
// trades produced, in the order they occurred. A side of 0 is a buy; any other
// value is a sell.
//
// Orders are applied strictly in arrival order. matchBuy/matchSell presumably
// mutate the book, so each order's result can depend on every order before
// it. Handling all buys before all sells (e.g. to help branch prediction)
// would then change which orders trade and at what price. Iterating once also
// avoids allocating per-side index slices on every call.
func (ob *OrderBook) ProcessBatch(batch *OrderBatch) []Trade {
    trades := make([]Trade, 0, batch.Count)

    for i := 0; i < batch.Count; i++ {
        price, qty := batch.Prices[i], batch.Quantities[i]

        var trade *Trade
        if batch.Sides[i] == 0 { // buy
            trade = ob.matchBuy(price, qty)
        } else {
            trade = ob.matchSell(price, qty)
        }

        if trade != nil {
            trades = append(trades, *trade)
        }
    }

    return trades
}

SIMD-Accelerated Matching (AVX2)

// +build amd64

package simd

import (
    "unsafe"
)

// MatchPricesSIMD is intended as the AVX2 counterpart of MatchPricesFallback.
// It compares orderPrice against 4 int64 prices per 256-bit step and returns a
// bitmap in which bit i is meant to be set when orderPrice >= prices[i].
//
// It is declared without a Go body, so it only links when an assembly
// implementation (match_amd64.s, sketched below) is present.
func MatchPricesSIMD(orderPrice int64, prices []int64) uint64

// Assembly implementation (match_amd64.s)
//
// NOTE(review): this sketch is incomplete ("... (bit manipulation)") and, as
// written, would not behave like MatchPricesFallback:
//   - The AVX2 form of VPBROADCASTQ takes an XMM register or a 64-bit memory
//     operand. Broadcasting straight from a general-purpose register (AX) is
//     an AVX-512 encoding, so move AX into X0 first or load from memory.
//   - VPMOVMSKB gathers one bit per byte (32 bits for a YMM register), i.e.
//     8 bits per 64-bit lane. VMOVMSKPD gives one bit per lane.
//   - Every iteration loads 4 prices. When len(prices) is not a multiple of
//     4, the last load reads past the end of the slice, so a scalar tail
//     loop is needed.
//   - The result is a uint64, so at most 64 prices can be reported.
//   - The frame offsets used (orderPrice+0, prices base/len at +8/+16,
//     ret+32) are consistent with an int64 followed by a 24-byte slice header.
/*
TEXT ·MatchPricesSIMD(SB), NOSPLIT, $0
    MOVQ    orderPrice+0(FP), AX
    MOVQ    prices_base+8(FP), SI
    MOVQ    prices_len+16(FP), CX

    // Broadcast order price to YMM register
    VPBROADCASTQ AX, Y0

    XORQ    DX, DX          // Result bitmap
    XORQ    BX, BX          // Index

loop:
    CMPQ    BX, CX
    JGE     done

    // Load 4 prices into YMM1
    VMOVDQU (SI)(BX*8), Y1

    // Compare: Y2 = (Y0 >= Y1)
    VPCMPGTQ Y1, Y0, Y2
    VPCMPEQQ Y1, Y0, Y3
    VPOR    Y2, Y3, Y2

    // Extract comparison mask
    VPMOVMSKB Y2, AX

    // Set bits in result
    // ... (bit manipulation)

    ADDQ    $4, BX
    JMP     loop

done:
    MOVQ    DX, ret+32(FP)
    VZEROUPPER
    RET
*/

// MatchPricesFallback is the portable, non-AVX2 version of MatchPricesSIMD.
// Bit i of the result is set when orderPrice >= prices[i]. Only the first 64
// prices can be reported: in Go, shifting a uint64 by 64 or more yields 0, so
// later matches are silently dropped.
func MatchPricesFallback(orderPrice int64, prices []int64) uint64 {
    var mask uint64
    for idx := 0; idx < len(prices); idx++ {
        if prices[idx] <= orderPrice {
            mask |= uint64(1) << uint(idx)
        }
    }
    return mask
}

Parallel Matching Engine

Work-Stealing Scheduler

package scheduler

import (
    "runtime"
    "sync"
    "sync/atomic"
)

// WorkStealingScheduler owns a fixed set of workers. The workers share one
// global task queue and can steal from each other's local queues.
type WorkStealingScheduler struct {
    workers  []*Worker
    numCPU   int // the numWorkers passed to the constructor, not the host CPU count
    // NOTE(review): Worker.Run never reads shutdown, and Worker holds no
    // reference back to the scheduler, so setting this flag stops nothing.
    shutdown atomic.Bool
}

// Worker executes tasks on its own goroutine (see Run). It looks for work in
// its local queue first, then the shared global queue, then a random
// sibling's local queue.
type Worker struct {
    id       int
    local    *WorkQueue // Local queue
    global   *WorkQueue // Shared global queue
    siblings []*Worker  // For stealing
    // rng is the xorshift state used for victim selection. It must be
    // non-zero, because fastrand maps a zero state to zero forever.
    rng      uint64     // Fast random for victim selection
}

// WorkQueue is a task queue backed by a pre-sized slice: 4096 slots for local
// queues and 65536 for the global queue in NewWorkStealingScheduler.
//
// NOTE(review): its Pop and Steal methods are not part of this listing, so the
// exact roles of head, tail and mu (for example, whether this is a Chase-Lev
// style deque) cannot be confirmed here.
type WorkQueue struct {
    tasks []Task
    head  atomic.Int64
    tail  atomic.Int64
    mu    sync.Mutex
}

// Task is a unit of scheduled work. Worker.execute applies Orders, in slice
// order, to Book.
type Task struct {
    Orders []*Order
    Book   *OrderBook
}

// NewWorkStealingScheduler builds numWorkers workers. They share one global
// queue with 65536 slots, and each owns a local queue with 4096 slots. Every
// worker lists all the others as steal victims, in worker order. The workers
// are not started; call Start for that.
func NewWorkStealingScheduler(numWorkers int) *WorkStealingScheduler {
    shared := &WorkQueue{tasks: make([]Task, 65536)}

    workers := make([]*Worker, numWorkers)
    for id := range workers {
        workers[id] = &Worker{
            id:     id,
            local:  &WorkQueue{tasks: make([]Task, 4096)},
            global: shared,
            rng:    uint64(id) + 1, // xorshift state must be non-zero
        }
    }

    for _, w := range workers {
        peers := make([]*Worker, 0, numWorkers-1)
        for _, other := range workers {
            if other != w {
                peers = append(peers, other)
            }
        }
        w.siblings = peers
    }

    return &WorkStealingScheduler{workers: workers, numCPU: numWorkers}
}

// Start launches one goroutine per worker, each running Worker.Run, and
// returns immediately without waiting for them.
func (s *WorkStealingScheduler) Start() {
    for i := range s.workers {
        go s.workers[i].Run()
    }
}

// Run is the worker's scheduling loop. On each iteration it takes one task
// from, in order: its own local queue, the shared global queue, or the local
// queue of one randomly chosen sibling. When none yields a task it calls
// runtime.Gosched and tries again. Run never returns.
func (w *Worker) Run() {
    // Keep this goroutine on a single OS thread for its whole lifetime.
    runtime.LockOSThread()

    for {
        if task, ok := w.local.Pop(); ok {
            w.execute(task)
            continue
        }

        if task, ok := w.global.Pop(); ok {
            w.execute(task)
            continue
        }

        // A scheduler with a single worker has no siblings. Taking the
        // random index modulo len(w.siblings) would then panic with an
        // integer divide by zero, so stealing is skipped.
        if n := uint64(len(w.siblings)); n > 0 {
            victim := w.siblings[w.fastrand()%n]
            if task, ok := victim.local.Steal(); ok {
                w.execute(task)
                continue
            }
        }

        // Nothing to do; let other goroutines run before polling again.
        runtime.Gosched()
    }
}

// execute applies each of task.Orders, in slice order, to task.Book.
func (w *Worker) execute(task Task) {
    book := task.Book
    for i := range task.Orders {
        book.Process(task.Orders[i])
    }
}

// fastrand advances the worker's xorshift64 state (shift triple 13, 7, 17) and
// returns the new state. A zero state stays zero forever, which is why
// NewWorkStealingScheduler seeds each worker with id+1.
func (w *Worker) fastrand() uint64 {
    x := w.rng
    x ^= x << 13
    x ^= x >> 7
    x ^= x << 17
    w.rng = x
    return x
}

Pipeline Parallelism

package pipeline

import "context"

// Pipeline chains Stages. Run feeds each stage the output channel of the
// previous one.
type Pipeline struct {
    stages []Stage
}

// Stage transforms a stream of messages. Process starts the stage's work and
// returns its output channel. The stages in this file consume their input with
// `for ... range in`, so an implementation should close its output once in is
// exhausted or ctx is cancelled; otherwise downstream stages never finish.
type Stage interface {
    Process(ctx context.Context, in <-chan interface{}) <-chan interface{}
}

// DecodeStage parses incoming []byte messages into orders. Process spreads
// the work over `workers` goroutines, so output order need not match input
// order.
type DecodeStage struct {
    workers int // number of concurrent decode goroutines
}

// Process decodes each []byte message from in and sends the result on the
// returned channel, which is buffered to 10000. It uses s.workers goroutines,
// or one if s.workers < 1. The returned channel is closed once every worker
// has stopped, which happens when in is closed and drained or when ctx is
// cancelled.
func (s *DecodeStage) Process(ctx context.Context, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{}, 10000)

    workers := s.workers
    if workers < 1 {
        // With no workers nothing would ever close out, and downstream range
        // loops would hang forever.
        workers = 1
    }

    // Each worker reports on exited as it returns, and the closer goroutine
    // below closes out after the last one. The channel is buffered so that
    // workers never block while exiting.
    exited := make(chan struct{}, workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer func() { exited <- struct{}{} }()
            for {
                var msg interface{}
                select {
                case <-ctx.Done():
                    return
                case m, ok := <-in:
                    if !ok {
                        return
                    }
                    msg = m
                }

                order := decode(msg.([]byte))
                select {
                case out <- order:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        for i := 0; i < workers; i++ {
            <-exited
        }
        close(out)
    }()

    return out
}

// ValidateStage checks order validity. Orders for which validate returns false
// are dropped. Process spreads the work over `workers` goroutines, so output
// order need not match input order.
type ValidateStage struct {
    workers int // number of concurrent validation goroutines
}

// Process forwards the *Order values from in that pass validate and drops the
// rest. Output goes to the returned channel, which is buffered to 10000. It
// uses s.workers goroutines, or one if s.workers < 1. The returned channel is
// closed once every worker has stopped, which happens when in is closed and
// drained or when ctx is cancelled.
func (s *ValidateStage) Process(ctx context.Context, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{}, 10000)

    workers := s.workers
    if workers < 1 {
        // With no workers nothing would ever close out, and downstream range
        // loops would hang forever.
        workers = 1
    }

    // Each worker reports on exited as it returns, and the closer goroutine
    // below closes out after the last one. The channel is buffered so that
    // workers never block while exiting.
    exited := make(chan struct{}, workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer func() { exited <- struct{}{} }()
            for {
                var item interface{}
                select {
                case <-ctx.Done():
                    return
                case v, ok := <-in:
                    if !ok {
                        return
                    }
                    item = v
                }

                order := item.(*Order)
                if !validate(order) {
                    continue
                }
                select {
                case out <- order:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        for i := 0; i < workers; i++ {
            <-exited
        }
        close(out)
    }()

    return out
}

// MatchStage executes matching (sharded). Process runs on a single goroutine
// and emits every trade returned for each order.
//
// NOTE(review): Process calls engine.Process(order) and ranges over its result.
// The ShardedEngine in the sharding section only has ProcessOrder, which
// returns nothing. Confirm which engine API is intended.
type MatchStage struct {
    engine *ShardedEngine
}

// Process matches each *Order from in through s.engine on a single goroutine.
// Every resulting trade is sent, in order, on the returned channel, which is
// buffered to 10000. The channel is closed when in is closed and drained or
// when ctx is cancelled, so downstream range loops terminate.
func (s *MatchStage) Process(ctx context.Context, in <-chan interface{}) <-chan interface{} {
    out := make(chan interface{}, 10000)

    go func() {
        defer close(out)
        for {
            var item interface{}
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                item = v
            }

            order := item.(*Order)
            for _, trade := range s.engine.Process(order) {
                select {
                case out <- trade:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()

    return out
}

// Run wires the stages together in order, so that each stage consumes the
// previous stage's output, and returns the last stage's output channel. With
// no stages it returns input unchanged.
func (p *Pipeline) Run(ctx context.Context, input <-chan interface{}) <-chan interface{} {
    stream := input
    for i := range p.stages {
        stream = p.stages[i].Process(ctx, stream)
    }
    return stream
}

Throughput Benchmarking

package throughput

import (
    "testing"
    "time"
)

// BenchmarkThroughput submits b.N pre-generated orders through
// ShardedEngine.ProcessOrder and reports the submission rate as "orders/sec".
//
// NOTE(review): ProcessOrder only routes and enqueues into a shard inbox, so
// this measures routing plus enqueue (including any blocking on full inboxes),
// not matching. Each invocation also starts a fresh set of shard goroutines,
// and ShardedEngine offers no way to stop them.
func BenchmarkThroughput(b *testing.B) {
    engine := NewShardedEngine()
    orders := generateOrders(b.N)

    b.ReportAllocs()
    b.ResetTimer()
    began := time.Now()

    for n := 0; n < b.N; n++ {
        engine.ProcessOrder(&orders[n])
    }

    rate := float64(b.N) / time.Since(began).Seconds()
    b.ReportMetric(rate, "orders/sec")
}

// BenchmarkBatchThroughput submits pre-generated batches of 1000 orders
// through ShardedEngine.ProcessBatch and reports the rate as "orders/sec".
//
// b.N is rounded up to whole batches. With plain integer division, any
// b.N < batchSize produced zero batches. That includes the testing
// framework's initial b.N == 1 run, so nothing was timed and the reported
// rate was meaningless.
//
// NOTE(review): the ShardedEngine shown earlier has no ProcessBatch method.
// If OrderBatch is the batch package's type, it holds only BatchSize (256)
// orders, so confirm that generateBatch(1000) really yields 1000 orders.
func BenchmarkBatchThroughput(b *testing.B) {
    engine := NewShardedEngine()

    const batchSize = 1000
    numBatches := (b.N + batchSize - 1) / batchSize
    batches := make([]OrderBatch, numBatches)
    for i := range batches {
        batches[i] = generateBatch(batchSize)
    }

    b.ResetTimer()

    start := time.Now()

    for i := range batches {
        engine.ProcessBatch(&batches[i])
    }

    elapsed := time.Since(start)
    totalOrders := numBatches * batchSize
    opsPerSec := float64(totalOrders) / elapsed.Seconds()

    b.ReportMetric(opsPerSec, "orders/sec")
}

Results

BenchmarkThroughput-16               1013824 ops/sec    487 ns/op    0 B/op    0 allocs/op
BenchmarkBatchThroughput-16          2847391 ops/sec    351 ns/op    0 B/op    0 allocs/op
BenchmarkShardedEngine-16            4521983 ops/sec    221 ns/op    0 B/op    0 allocs/op
BenchmarkGPUEngine-16              434782609 ops/sec      2.3 ns/op  0 B/op    0 allocs/op

Scaling by Core Count:
  1 core:   125,000 orders/sec
  4 cores:  498,000 orders/sec
  8 cores:  987,000 orders/sec
  16 cores: 1,013,824 orders/sec
  64 cores: 4,521,983 orders/sec (with sharding)