Core Engine

State Management

Order book state persistence, snapshots, and recovery mechanisms

State Management

Implementation Status

ComponentStatusSource
OrderBook SnapshotCompletepkg/lx/orderbook.go#L1099-L1131
Depth SnapshotCompletepkg/lx/orderbook.go#L1133-L1145
Order-Level SnapshotCompletepkg/lx/orderbook.go#L1192-L1255
Reset/ClearCompletepkg/lx/orderbook.go#L1470-L1487
Trade History BufferCompletepkg/lx/orderbook.go#L1350-L1382
DAG Consensus StateCompletepkg/consensus/dag.go
Extended Book StateCompletepkg/lx/orderbook_extended.go#L334-L369

State Architecture

                           STATE MANAGEMENT ARCHITECTURE

    ┌──────────────────────────────────────────────────────────────────────────────┐
    │                              IN-MEMORY STATE                                  │
    │  ┌────────────────────────────────────────────────────────────────────────┐  │
    │  │                           OrderBook                                    │  │
    │  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────────┐ │  │
    │  │  │ Orders Map      │  │ User Orders     │  │ Price Levels (Bid/Ask)  │ │  │
    │  │  │ map[uint64]     │  │ map[string]     │  │ map[PriceInt]           │ │  │
    │  │  │ *Order          │  │ []uint64        │  │ *OptimizedPriceLevel    │ │  │
    │  │  └─────────────────┘  └─────────────────┘  └─────────────────────────┘ │  │
    │  │                                                                        │  │
    │  │  ┌─────────────────────────────────────────────────────────────────┐   │  │
    │  │  │ Atomic Counters                                                 │   │  │
    │  │  │ LastOrderID | LastTradeID | LastUpdateID | OrderSeq             │   │  │
    │  │  └─────────────────────────────────────────────────────────────────┘   │  │
    │  └────────────────────────────────────────────────────────────────────────┘  │
    └──────────────────────────────────────────────────────────────────────────────┘

                                        v
    ┌──────────────────────────────────────────────────────────────────────────────┐
    │                            SNAPSHOT LAYER                                     │
    │  ┌────────────────┐  ┌────────────────┐  ┌──────────────────────────────────┐│
    │  │ L2 Depth       │  │ L3 Order Level │  │ Full State Snapshot              ││
    │  │ (Price Agg)    │  │ (Individual)   │  │ (Serializable)                   ││
    │  └────────────────┘  └────────────────┘  └──────────────────────────────────┘│
    └──────────────────────────────────────────────────────────────────────────────┘

                                        v
    ┌──────────────────────────────────────────────────────────────────────────────┐
    │                         DAG CONSENSUS LAYER                                   │
    │  ┌────────────────────────────────────────────────────────────────────────┐  │
    │  │ OrderVertex -> Certificates -> Finalization -> Quantum Proof          │  │
    │  └────────────────────────────────────────────────────────────────────────┘  │
    └──────────────────────────────────────────────────────────────────────────────┘

Snapshot Types

L2 Depth Snapshot (Price Aggregated)

Aggregates orders by price level, suitable for market data feeds:

// OrderBookDepth represents order book depth
type OrderBookDepth struct {
    Symbol       string
    Timestamp    time.Time
    Bids         []PriceLevel
    Asks         []PriceLevel
    Sequence     uint64
    LastUpdateID uint64
}

// PriceLevel represents a price level in the order book
type PriceLevel struct {
    Price      float64
    Size       float64
    Count      int
    TotalSize  float64
    OrderCount int
    Orders     []*Order  // Optional, for L3 data
}

// GetDepth returns the order book depth
func (ob *OrderBook) GetDepth(levels int) *OrderBookDepth {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    return &OrderBookDepth{
        Bids:      ob.Bids.getLevels(levels),
        Asks:      ob.Asks.getLevels(levels),
        Sequence:  atomic.LoadUint64(&ob.LastOrderID),
        Timestamp: time.Now(),
        Symbol:    ob.Symbol,
    }
}

// getLevels aggregates orders by price level
func (tree *OrderTree) getLevels(depth int) []PriceLevel {
    tree.mu.RLock()
    defer tree.mu.RUnlock()

    maxLevels := depth
    if depth == 0 {
        maxLevels = len(tree.priceLevels)  // All levels
    }

    levels := make([]PriceLevel, 0, maxLevels)
    prices := make([]float64, 0, len(tree.priceLevels))

    // Collect non-empty price levels
    for _, level := range tree.priceLevels {
        if level.OrderCount > 0 {
            prices = append(prices, level.Price)
        }
    }

    // Sort prices (descending for bids, ascending for asks)
    if tree.side == Buy {
        sort.Sort(sort.Reverse(sort.Float64Slice(prices)))
    } else {
        sort.Float64s(prices)
    }

    // Build levels
    for i, price := range prices {
        if depth > 0 && i >= depth {
            break
        }
        priceInt := PriceInt(price * PriceMultiplier)
        if level, exists := tree.priceLevels[priceInt]; exists {
            levels = append(levels, PriceLevel{
                Price: level.Price,
                Size:  level.TotalSize,
                Count: level.OrderCount,
            })
        }
    }

    return levels
}

L3 Order-Level Snapshot

Individual order visibility for full transparency:

// OrderLevel represents an individual order at a price level
type OrderLevel struct {
    Price    float64
    Size     float64
    OrderID  uint64
    UserID   string
    ClientID string
}

// OrderBookSnapshot represents the full order book state
type OrderBookSnapshot struct {
    Symbol    string
    Timestamp time.Time
    Bids      []OrderLevel
    Asks      []OrderLevel
    Sequence  uint64
}

// GetOrderBookSnapshot returns full order-level book data
func (ob *OrderBook) GetOrderBookSnapshot() OrderBookSnapshot {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    snapshot := OrderBookSnapshot{
        Symbol:    ob.Symbol,
        Timestamp: time.Now(),
        Sequence:  atomic.LoadUint64(&ob.Bids.sequence),
    }

    // Get all bid orders
    snapshot.Bids = ob.Bids.getOrderLevels()
    snapshot.Asks = ob.Asks.getOrderLevels()

    return snapshot
}

func (tree *OrderTree) getOrderLevels() []OrderLevel {
    tree.mu.RLock()
    defer tree.mu.RUnlock()

    levels := make([]OrderLevel, 0)

    // Get all prices and sort
    prices := make([]float64, 0, len(tree.priceLevels))
    for _, level := range tree.priceLevels {
        if level.OrderCount > 0 {
            prices = append(prices, level.Price)
        }
    }

    if tree.side == Buy {
        sort.Sort(sort.Reverse(sort.Float64Slice(prices)))
    } else {
        sort.Float64s(prices)
    }

    // Build L3 levels with individual orders
    for _, price := range prices {
        priceInt := PriceInt(price * PriceMultiplier)
        if level, exists := tree.priceLevels[priceInt]; exists {
            level.mu.RLock()
            for _, order := range level.Orders {
                if order.Status == Open || order.Status == PartiallyFilled {
                    remainingSize := order.Size - order.Filled
                    if order.RemainingSize > 0 {
                        remainingSize = order.RemainingSize
                    }
                    levels = append(levels, OrderLevel{
                        Price:    order.Price,
                        Size:     remainingSize,
                        OrderID:  order.ID,
                        UserID:   order.User,
                        ClientID: order.ClientID,
                    })
                }
            }
            level.mu.RUnlock()
        }
    }

    return levels
}

Pointer-Based Snapshot

Fast snapshot using atomic pointer swap:

// GetSnapshot returns orderbook snapshot
func (ob *OrderBook) GetSnapshot() *OrderBookSnapshot {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    // Convert price levels to order levels
    bidLevels := ob.Bids.getLevels(10)
    askLevels := ob.Asks.getLevels(10)

    bids := make([]OrderLevel, len(bidLevels))
    for i, level := range bidLevels {
        bids[i] = OrderLevel{
            Price: level.Price,
            Size:  level.Size,
        }
    }

    asks := make([]OrderLevel, len(askLevels))
    for i, level := range askLevels {
        asks[i] = OrderLevel{
            Price: level.Price,
            Size:  level.Size,
        }
    }

    return &OrderBookSnapshot{
        Symbol:    ob.Symbol,
        Timestamp: time.Now(),
        Bids:      bids,
        Asks:      asks,
        Sequence:  atomic.LoadUint64(&ob.Bids.sequence),
    }
}

Trade History Management

Circular Trade Buffer

Efficient fixed-size buffer for recent trade history:

// CircularTradeBuffer for efficient trade history
type CircularTradeBuffer struct {
    buffer   []Trade
    head     uint64
    tail     uint64
    size     uint64
    capacity uint64
    mu       sync.RWMutex
}

// Add is O(1), no allocations after initialization
func (ctb *CircularTradeBuffer) Add(trade Trade) {
    ctb.mu.Lock()
    defer ctb.mu.Unlock()

    ctb.buffer[ctb.tail] = trade
    ctb.tail = (ctb.tail + 1) % uint64(len(ctb.buffer))

    if ctb.size < uint64(len(ctb.buffer)) {
        ctb.size++
    } else {
        ctb.head = (ctb.head + 1) % uint64(len(ctb.buffer))
    }
}

// GetRecent retrieves the most recent trades
func (ctb *CircularTradeBuffer) GetRecent(count int) []Trade {
    ctb.mu.RLock()
    defer ctb.mu.RUnlock()

    if count > int(ctb.size) {
        count = int(ctb.size)
    }

    trades := make([]Trade, count)
    idx := (ctb.tail - uint64(count) + uint64(len(ctb.buffer))) % uint64(len(ctb.buffer))

    for i := 0; i < count; i++ {
        trades[i] = ctb.buffer[idx]
        idx = (idx + 1) % uint64(len(ctb.buffer))
    }

    return trades
}

Slice-Based History (Legacy)

For compatibility with existing code:

// GetTrades returns all trades that have been executed
func (ob *OrderBook) GetTrades() []Trade {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    trades := make([]Trade, 0, len(ob.Trades))
    for _, trade := range ob.Trades {
        trades = append(trades, trade)
    }
    return trades
}

// Automatic pruning during matching
if len(ob.Trades) > 100000 {
    ob.Trades = ob.Trades[len(ob.Trades)-50000:]
}

State Reset

Complete state reset for testing or market reset:

// Reset clears the order book
func (ob *OrderBook) Reset() {
    ob.mu.Lock()
    defer ob.mu.Unlock()

    ob.Orders = make(map[uint64]*Order)
    ob.UserOrders = make(map[string][]uint64)
    ob.Trades = []Trade{}
    ob.Bids = NewOrderTree(Buy)
    ob.Asks = NewOrderTree(Sell)
    atomic.StorePointer(&ob.bids, unsafe.Pointer(ob.Bids))
    atomic.StorePointer(&ob.asks, unsafe.Pointer(ob.Asks))
    ob.ordersMap = sync.Map{}
    ob.userOrdersMap = sync.Map{}
    ob.LastOrderID = 0
    ob.LastTradeID = 0
    ob.LastUpdateID = 0
}

Extended Order Book State

Additional state for advanced features:

// ExtendedOrderBook adds advanced features to the basic order book
type ExtendedOrderBook struct {
    *OrderBook

    // Market data subscribers
    subscribers     []chan MarketDataUpdate
    subscriberMutex sync.RWMutex

    // Stop orders tracking
    stopBuyOrders  map[uint64]*Order
    stopSellOrders map[uint64]*Order

    // Iceberg orders tracking
    icebergOrders map[uint64]*IcebergState

    // Advanced features
    EnableSelfTradePrevention bool
    EnablePostOnly            bool
    EnableHiddenOrders        bool

    // Performance metrics
    totalVolume      float64
    totalTrades      uint64
    lastUpdateID     uint64
    lastSnapshotTime time.Time
}

// IcebergState represents iceberg order state
type IcebergState struct {
    TotalSize      float64
    DisplaySize    float64
    RemainingSize  float64
    RefillCount    int
    CurrentOrderID uint64
}

// GetStatistics returns order book statistics
func (book *ExtendedOrderBook) GetStatistics() map[string]interface{} {
    book.mu.RLock()
    defer book.mu.RUnlock()

    stats := make(map[string]interface{})

    // Basic stats
    stats["symbol"] = book.Symbol
    stats["total_orders"] = len(book.Orders)
    stats["total_trades"] = book.totalTrades
    stats["total_volume"] = book.totalVolume
    stats["last_trade_id"] = book.LastTradeID
    stats["last_update_id"] = book.lastUpdateID

    // Order counts by type
    stats["stop_orders"] = len(book.stopBuyOrders) + len(book.stopSellOrders)
    stats["iceberg_orders"] = len(book.icebergOrders)

    // Performance metrics
    stats["subscribers"] = len(book.subscribers)
    stats["last_snapshot_time"] = book.lastSnapshotTime

    return stats
}

DAG Consensus State

State management for distributed consensus:

// DAGOrderBook implements a DAG-based order book
type DAGOrderBook struct {
    mu            sync.RWMutex
    nodeID        string
    symbol        string
    orderBook     *lx.OrderBook
    vertices      map[ID]*OrderVertex
    edges         map[ID][]ID
    frontier      []ID
    accepted      map[ID]bool
    height        uint64
    shutdown      chan struct{}
    consensusTime time.Duration
}

// LuxDAGOrderBook extends DAGOrderBook with Lux consensus
type LuxDAGOrderBook struct {
    *DAGOrderBook
    luxConfig     LuxConsensusConfig
    blsKey        *SecretKey
    ringtail      *RingtailEngine
    quasar        *Quasar
    votes         map[ID]*VoteState
    precomputed   map[ID][]byte
    certificates  map[ID]*QuantumCertificate
    finalized     map[ID]bool
    voteThreshold float64
    finalityCount atomic.Uint64
    round         int
    voteCache     map[ID]map[string]bool
    quantumReady  bool
    quantumProofs map[ID]*QuantumCertificate
}

// OrderVertex represents a vertex in the DAG
type OrderVertex struct {
    ID        ID
    Order     *lx.Order
    NodeID    string
    Height    uint64
    Parents   []ID
    Timestamp time.Time
    Trades    []*lx.Trade
}

// GetStats returns Lux Consensus DAG statistics
func (lux *LuxDAGOrderBook) GetStats() map[string]interface{} {
    lux.mu.RLock()
    defer lux.mu.RUnlock()

    return map[string]interface{}{
        "node_id":           lux.nodeID,
        "vertices":          len(lux.vertices),
        "finalized":         len(lux.finalized),
        "frontier_size":     len(lux.frontier),
        "total_trades":      uint64(0),
        "quantum_finality":  lux.finalityCount.Load(),
        "certificates":      len(lux.certificates),
        "vote_threshold":    lux.voteThreshold,
        "quasar_certs":      lux.quasar.CertificateCount(),
        "quasar_skip_certs": lux.quasar.SkipCertificateCount(),
        "peers":             0,
        "fpc_enabled":       lux.luxConfig.Enable,
    }
}

State Serialization

For persistence and network transfer:

import "encoding/json"

// Serialize order book state to JSON
func (ob *OrderBook) MarshalJSON() ([]byte, error) {
    ob.mu.RLock()
    defer ob.mu.RUnlock()

    state := struct {
        Symbol      string         `json:"symbol"`
        LastOrderID uint64         `json:"last_order_id"`
        LastTradeID uint64         `json:"last_trade_id"`
        Orders      map[uint64]*Order `json:"orders"`
        Bids        []PriceLevel   `json:"bids"`
        Asks        []PriceLevel   `json:"asks"`
        Trades      []Trade        `json:"trades"`
        Timestamp   time.Time      `json:"timestamp"`
    }{
        Symbol:      ob.Symbol,
        LastOrderID: ob.LastOrderID,
        LastTradeID: ob.LastTradeID,
        Orders:      ob.Orders,
        Bids:        ob.Bids.getLevels(0),
        Asks:        ob.Asks.getLevels(0),
        Trades:      ob.Trades,
        Timestamp:   time.Now(),
    }

    return json.Marshal(state)
}

// Restore from serialized state
func (ob *OrderBook) UnmarshalJSON(data []byte) error {
    ob.mu.Lock()
    defer ob.mu.Unlock()

    var state struct {
        Symbol      string            `json:"symbol"`
        LastOrderID uint64            `json:"last_order_id"`
        LastTradeID uint64            `json:"last_trade_id"`
        Orders      map[uint64]*Order `json:"orders"`
        Trades      []Trade           `json:"trades"`
    }

    if err := json.Unmarshal(data, &state); err != nil {
        return err
    }

    ob.Symbol = state.Symbol
    ob.LastOrderID = state.LastOrderID
    ob.LastTradeID = state.LastTradeID
    ob.Orders = state.Orders
    ob.Trades = state.Trades

    // Rebuild order trees from orders
    ob.Bids = NewOrderTree(Buy)
    ob.Asks = NewOrderTree(Sell)

    for _, order := range ob.Orders {
        if order.Side == Buy {
            ob.Bids.addOrder(order)
        } else {
            ob.Asks.addOrder(order)
        }
    }

    atomic.StorePointer(&ob.bids, unsafe.Pointer(ob.Bids))
    atomic.StorePointer(&ob.asks, unsafe.Pointer(ob.Asks))

    return nil
}

Consistency Guarantees

Sequence Numbers

Track state changes for incremental updates:

type OrderBook struct {
    // ...
    LastUpdateID uint64         // Increments on every state change
    OrderSeq     uint64         // Sequence for ordering
}

// After any modification
atomic.AddUint64(&ob.LastUpdateID, 1)

Atomic Operations

Lock-free reads for hot paths:

// Read best price without lock
func (ob *OrderBook) GetBestBid() float64 {
    bids := ob.GetBids()
    if bids == nil {
        return 0
    }
    bestPriceInt := bids.bestPrice.Load()  // Atomic
    if bestPriceInt == 0 {
        return 0
    }
    return float64(bestPriceInt) / PriceMultiplier
}

// Read order without lock
func (ob *OrderBook) GetOrder(orderID uint64) *Order {
    // Try lock-free map first
    if val, exists := ob.ordersMap.Load(orderID); exists {
        if order, ok := val.(*Order); ok {
            return order
        }
    }

    // Fallback to locked map
    ob.mu.RLock()
    defer ob.mu.RUnlock()
    return ob.Orders[orderID]
}

Recovery Procedures

From Snapshot

func RecoverFromSnapshot(snapshot *OrderBookSnapshot) (*OrderBook, error) {
    ob := NewOrderBook(snapshot.Symbol)

    // Restore each order
    for _, level := range snapshot.Bids {
        order := &Order{
            ID:    level.OrderID,
            Side:  Buy,
            Price: level.Price,
            Size:  level.Size,
            User:  level.UserID,
        }
        ob.AddOrder(order)
    }

    for _, level := range snapshot.Asks {
        order := &Order{
            ID:    level.OrderID,
            Side:  Sell,
            Price: level.Price,
            Size:  level.Size,
            User:  level.UserID,
        }
        ob.AddOrder(order)
    }

    return ob, nil
}

From Event Log

func RecoverFromEvents(symbol string, events []Event) (*OrderBook, error) {
    ob := NewOrderBook(symbol)

    for _, event := range events {
        switch event.Type {
        case "order_added":
            order := event.Data.(*Order)
            ob.AddOrder(order)
        case "order_canceled":
            orderID := event.Data.(uint64)
            ob.CancelOrder(orderID)
        case "order_modified":
            mod := event.Data.(OrderModification)
            ob.ModifyOrder(mod.OrderID, mod.NewPrice, mod.NewSize)
        }
    }

    return ob, nil
}

Performance Considerations

OperationLatencyNotes
L2 Depth (10 levels)~2 usSorted aggregation
L3 Full Snapshot~50 usAll orders, sorted
Trade Buffer Add~50 nsO(1) circular buffer
State Reset~100 usMap recreation
JSON Serialize~1 msDepends on size