Core Engine
State Management
Order book state persistence, snapshots, and recovery mechanisms
State Management
LP Specification: LP-9000: High-Performance DEX Core Engine - Section 5: State Management
Implementation Status
| Component | Status | Source |
|---|---|---|
| OrderBook Snapshot | Complete | pkg/lx/orderbook.go#L1099-L1131 |
| Depth Snapshot | Complete | pkg/lx/orderbook.go#L1133-L1145 |
| Order-Level Snapshot | Complete | pkg/lx/orderbook.go#L1192-L1255 |
| Reset/Clear | Complete | pkg/lx/orderbook.go#L1470-L1487 |
| Trade History Buffer | Complete | pkg/lx/orderbook.go#L1350-L1382 |
| DAG Consensus State | Complete | pkg/consensus/dag.go |
| Extended Book State | Complete | pkg/lx/orderbook_extended.go#L334-L369 |
State Architecture
STATE MANAGEMENT ARCHITECTURE
┌──────────────────────────────────────────────────────────────────────────────┐
│ IN-MEMORY STATE │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ OrderBook │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ │ │
│ │ │ Orders Map │ │ User Orders │ │ Price Levels (Bid/Ask) │ │ │
│ │ │ map[uint64] │ │ map[string] │ │ map[PriceInt] │ │ │
│ │ │ *Order │ │ []uint64 │ │ *OptimizedPriceLevel │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Atomic Counters │ │ │
│ │ │ LastOrderID | LastTradeID | LastUpdateID | OrderSeq │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
│
v
┌──────────────────────────────────────────────────────────────────────────────┐
│ SNAPSHOT LAYER │
│ ┌────────────────┐ ┌────────────────┐ ┌──────────────────────────────────┐│
│ │ L2 Depth │ │ L3 Order Level │ │ Full State Snapshot ││
│ │ (Price Agg) │ │ (Individual) │ │ (Serializable) ││
│ └────────────────┘ └────────────────┘ └──────────────────────────────────┘│
└──────────────────────────────────────────────────────────────────────────────┘
│
v
┌──────────────────────────────────────────────────────────────────────────────┐
│ DAG CONSENSUS LAYER │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ OrderVertex -> Certificates -> Finalization -> Quantum Proof │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
Snapshot Types
L2 Depth Snapshot (Price Aggregated)
Aggregates orders by price level, suitable for market data feeds:
// OrderBookDepth is the price-aggregated (L2) view of an order book returned
// by GetDepth.
type OrderBookDepth struct {
	Symbol       string       // copied from the book's Symbol
	Timestamp    time.Time    // time.Now() at the moment GetDepth built the value
	Bids         []PriceLevel // bid levels as returned by getLevels
	Asks         []PriceLevel // ask levels as returned by getLevels
	Sequence     uint64       // GetDepth stores the book's LastOrderID here
	LastUpdateID uint64       // GetDepth leaves this at its zero value
}
// PriceLevel is one aggregated price level of the order book. getLevels fills
// only Price, Size and Count; the remaining fields keep their zero values
// there.
type PriceLevel struct {
	Price      float64  // price of the level
	Size       float64  // getLevels copies the level's TotalSize here
	Count      int      // getLevels copies the level's OrderCount here
	TotalSize  float64  // not set by getLevels
	OrderCount int      // not set by getLevels
	Orders     []*Order // Optional, for L3 data
}
// GetDepth builds an L2 depth view of the book while holding the book's read
// lock. levels is forwarded unchanged to getLevels for each side. The
// returned Sequence is the book's LastOrderID; LastUpdateID is not populated.
func (ob *OrderBook) GetDepth(levels int) *OrderBookDepth {
	ob.mu.RLock()
	defer ob.mu.RUnlock()

	// Gather the inputs in a fixed order: bids, asks, sequence, timestamp.
	bidLevels := ob.Bids.getLevels(levels)
	askLevels := ob.Asks.getLevels(levels)
	seq := atomic.LoadUint64(&ob.LastOrderID)
	now := time.Now()

	depth := new(OrderBookDepth)
	depth.Symbol = ob.Symbol
	depth.Timestamp = now
	depth.Bids = bidLevels
	depth.Asks = askLevels
	depth.Sequence = seq
	return depth
}
// getLevels aggregates orders by price level
func (tree *OrderTree) getLevels(depth int) []PriceLevel {
tree.mu.RLock()
defer tree.mu.RUnlock()
maxLevels := depth
if depth == 0 {
maxLevels = len(tree.priceLevels) // All levels
}
levels := make([]PriceLevel, 0, maxLevels)
prices := make([]float64, 0, len(tree.priceLevels))
// Collect non-empty price levels
for _, level := range tree.priceLevels {
if level.OrderCount > 0 {
prices = append(prices, level.Price)
}
}
// Sort prices (descending for bids, ascending for asks)
if tree.side == Buy {
sort.Sort(sort.Reverse(sort.Float64Slice(prices)))
} else {
sort.Float64s(prices)
}
// Build levels
for i, price := range prices {
if depth > 0 && i >= depth {
break
}
priceInt := PriceInt(price * PriceMultiplier)
if level, exists := tree.priceLevels[priceInt]; exists {
levels = append(levels, PriceLevel{
Price: level.Price,
Size: level.TotalSize,
Count: level.OrderCount,
})
}
}
return levels
}L3 Order-Level Snapshot
Individual order visibility for full transparency:
// OrderLevel represents an individual order at a price level. getOrderLevels
// fills every field from a resting order; GetSnapshot fills only Price and
// Size, using aggregated price-level values.
type OrderLevel struct {
	Price    float64 // order price (or level price in GetSnapshot)
	Size     float64 // remaining order size (or aggregated level size in GetSnapshot)
	OrderID  uint64  // taken from Order.ID
	UserID   string  // taken from Order.User
	ClientID string  // taken from Order.ClientID
}
// OrderBookSnapshot represents the full order book state as a list of
// OrderLevel entries per side.
type OrderBookSnapshot struct {
	Symbol    string       // symbol of the book the snapshot was taken from
	Timestamp time.Time    // time.Now() when the snapshot was assembled
	Bids      []OrderLevel // bid-side entries
	Asks      []OrderLevel // ask-side entries
	Sequence  uint64       // the snapshot functions load this from ob.Bids.sequence
}
// GetOrderBookSnapshot returns an order-level (L3) view of both sides of the
// book, taken under the book's read lock. Sequence is loaded atomically from
// the bid tree's sequence counter.
func (ob *OrderBook) GetOrderBookSnapshot() OrderBookSnapshot {
	ob.mu.RLock()
	defer ob.mu.RUnlock()

	var snap OrderBookSnapshot
	snap.Symbol = ob.Symbol
	snap.Timestamp = time.Now()
	snap.Sequence = atomic.LoadUint64(&ob.Bids.sequence)

	// Individual resting orders for each side.
	snap.Bids = ob.Bids.getOrderLevels()
	snap.Asks = ob.Asks.getOrderLevels()
	return snap
}
func (tree *OrderTree) getOrderLevels() []OrderLevel {
tree.mu.RLock()
defer tree.mu.RUnlock()
levels := make([]OrderLevel, 0)
// Get all prices and sort
prices := make([]float64, 0, len(tree.priceLevels))
for _, level := range tree.priceLevels {
if level.OrderCount > 0 {
prices = append(prices, level.Price)
}
}
if tree.side == Buy {
sort.Sort(sort.Reverse(sort.Float64Slice(prices)))
} else {
sort.Float64s(prices)
}
// Build L3 levels with individual orders
for _, price := range prices {
priceInt := PriceInt(price * PriceMultiplier)
if level, exists := tree.priceLevels[priceInt]; exists {
level.mu.RLock()
for _, order := range level.Orders {
if order.Status == Open || order.Status == PartiallyFilled {
remainingSize := order.Size - order.Filled
if order.RemainingSize > 0 {
remainingSize = order.RemainingSize
}
levels = append(levels, OrderLevel{
Price: order.Price,
Size: remainingSize,
OrderID: order.ID,
UserID: order.User,
ClientID: order.ClientID,
})
}
}
level.mu.RUnlock()
}
}
return levels
}Pointer-Based Snapshot
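Fast top-of-book snapshot. The implementation shown below takes the book's read lock and converts the top 10 aggregated price levels per side into OrderLevel entries, so only Price and Size are populated (OrderID, UserID and ClientID stay empty):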
// GetSnapshot returns orderbook snapshot
func (ob *OrderBook) GetSnapshot() *OrderBookSnapshot {
ob.mu.RLock()
defer ob.mu.RUnlock()
// Convert price levels to order levels
bidLevels := ob.Bids.getLevels(10)
askLevels := ob.Asks.getLevels(10)
bids := make([]OrderLevel, len(bidLevels))
for i, level := range bidLevels {
bids[i] = OrderLevel{
Price: level.Price,
Size: level.Size,
}
}
asks := make([]OrderLevel, len(askLevels))
for i, level := range askLevels {
asks[i] = OrderLevel{
Price: level.Price,
Size: level.Size,
}
}
return &OrderBookSnapshot{
Symbol: ob.Symbol,
Timestamp: time.Now(),
Bids: bids,
Asks: asks,
Sequence: atomic.LoadUint64(&ob.Bids.sequence),
}
}Trade History Management
Circular Trade Buffer
Efficient fixed-size buffer for recent trade history:
// CircularTradeBuffer is a fixed-size ring of recent trades. Once the ring is
// full, Add overwrites the oldest entry.
type CircularTradeBuffer struct {
	buffer   []Trade      // backing storage; Add and GetRecent use len(buffer) as the ring size
	head     uint64       // advanced by Add once the ring is full
	tail     uint64       // index Add writes the next trade to
	size     uint64       // number of trades stored; Add stops incrementing it at len(buffer)
	capacity uint64       // not read by Add or GetRecent, which use len(buffer)
	mu       sync.RWMutex // write-locked by Add, read-locked by GetRecent
}
// Add stores trade in the next ring slot under the write lock. It writes one
// slot and updates the indices without allocating. When the ring is already
// full the oldest entry is overwritten and head moves forward with tail.
func (ctb *CircularTradeBuffer) Add(trade Trade) {
	ctb.mu.Lock()
	defer ctb.mu.Unlock()

	ctb.buffer[ctb.tail] = trade

	slots := uint64(len(ctb.buffer))
	ctb.tail = (ctb.tail + 1) % slots
	if ctb.size >= slots {
		// Full ring: the slot just written replaced the oldest trade.
		ctb.head = (ctb.head + 1) % slots
		return
	}
	ctb.size++
}
// GetRecent retrieves the most recent trades
func (ctb *CircularTradeBuffer) GetRecent(count int) []Trade {
ctb.mu.RLock()
defer ctb.mu.RUnlock()
if count > int(ctb.size) {
count = int(ctb.size)
}
trades := make([]Trade, count)
idx := (ctb.tail - uint64(count) + uint64(len(ctb.buffer))) % uint64(len(ctb.buffer))
for i := 0; i < count; i++ {
trades[i] = ctb.buffer[idx]
idx = (idx + 1) % uint64(len(ctb.buffer))
}
return trades
}Slice-Based History (Legacy)
For compatibility with existing code:
// GetTrades returns a copy of every trade currently held in ob.Trades, taken
// under the book's read lock. The caller may modify the returned slice freely.
func (ob *OrderBook) GetTrades() []Trade {
	ob.mu.RLock()
	defer ob.mu.RUnlock()

	trades := make([]Trade, len(ob.Trades))
	copy(trades, ob.Trades)
	return trades
}
// Automatic pruning during matching
if len(ob.Trades) > 100000 {
ob.Trades = ob.Trades[len(ob.Trades)-50000:]
}
State Reset
Complete state reset for testing or market reset:
// Reset clears the order book
func (ob *OrderBook) Reset() {
ob.mu.Lock()
defer ob.mu.Unlock()
ob.Orders = make(map[uint64]*Order)
ob.UserOrders = make(map[string][]uint64)
ob.Trades = []Trade{}
ob.Bids = NewOrderTree(Buy)
ob.Asks = NewOrderTree(Sell)
atomic.StorePointer(&ob.bids, unsafe.Pointer(ob.Bids))
atomic.StorePointer(&ob.asks, unsafe.Pointer(ob.Asks))
ob.ordersMap = sync.Map{}
ob.userOrdersMap = sync.Map{}
ob.LastOrderID = 0
ob.LastTradeID = 0
ob.LastUpdateID = 0
}Extended Order Book State
Additional state for advanced features:
// ExtendedOrderBook adds advanced features to the basic order book. It embeds
// *OrderBook, so the base book's fields and methods are promoted.
type ExtendedOrderBook struct {
	*OrderBook
	// Market data subscribers
	subscribers     []chan MarketDataUpdate // GetStatistics reports len() as "subscribers"
	subscriberMutex sync.RWMutex            // presumably guards subscribers — confirm; GetStatistics does not take it
	// Stop orders tracking
	stopBuyOrders  map[uint64]*Order // counted together with stopSellOrders as "stop_orders"
	stopSellOrders map[uint64]*Order
	// Iceberg orders tracking
	icebergOrders map[uint64]*IcebergState // GetStatistics reports len() as "iceberg_orders"
	// Advanced features
	EnableSelfTradePrevention bool
	EnablePostOnly            bool
	EnableHiddenOrders        bool
	// Performance metrics
	totalVolume      float64   // reported as "total_volume"
	totalTrades      uint64    // reported as "total_trades"
	lastUpdateID     uint64    // reported as "last_update_id"; distinct from OrderBook.LastUpdateID
	lastSnapshotTime time.Time // reported as "last_snapshot_time"
}
// IcebergState represents iceberg order state. ExtendedOrderBook keeps one
// per entry in its icebergOrders map.
//
// NOTE(review): the code that maintains these fields is not shown here; the
// field meanings should be confirmed against the iceberg handling code.
type IcebergState struct {
	TotalSize      float64
	DisplaySize    float64
	RemainingSize  float64
	RefillCount    int
	CurrentOrderID uint64
}
// GetStatistics returns order book statistics
func (book *ExtendedOrderBook) GetStatistics() map[string]interface{} {
book.mu.RLock()
defer book.mu.RUnlock()
stats := make(map[string]interface{})
// Basic stats
stats["symbol"] = book.Symbol
stats["total_orders"] = len(book.Orders)
stats["total_trades"] = book.totalTrades
stats["total_volume"] = book.totalVolume
stats["last_trade_id"] = book.LastTradeID
stats["last_update_id"] = book.lastUpdateID
// Order counts by type
stats["stop_orders"] = len(book.stopBuyOrders) + len(book.stopSellOrders)
stats["iceberg_orders"] = len(book.icebergOrders)
// Performance metrics
stats["subscribers"] = len(book.subscribers)
stats["last_snapshot_time"] = book.lastSnapshotTime
return stats
}DAG Consensus State
State management for distributed consensus:
// DAGOrderBook implements a DAG-based order book: it pairs an lx.OrderBook
// with the vertices, edges and frontier of a DAG.
//
// NOTE(review): only the fields read by GetStats are described below; the
// remaining field meanings should be confirmed against pkg/consensus/dag.go.
type DAGOrderBook struct {
	mu            sync.RWMutex // read-locked by GetStats
	nodeID        string       // reported by GetStats as "node_id"
	symbol        string
	orderBook     *lx.OrderBook
	vertices      map[ID]*OrderVertex // GetStats reports len() as "vertices"
	edges         map[ID][]ID
	frontier      []ID // GetStats reports len() as "frontier_size"
	accepted      map[ID]bool
	height        uint64
	shutdown      chan struct{}
	consensusTime time.Duration
}
// LuxDAGOrderBook extends DAGOrderBook with Lux consensus state. It embeds
// *DAGOrderBook, so that type's fields (including its mutex) are promoted.
//
// NOTE(review): only the fields read by GetStats are described below; the
// remaining field meanings should be confirmed against pkg/consensus/dag.go.
type LuxDAGOrderBook struct {
	*DAGOrderBook
	luxConfig     LuxConsensusConfig // GetStats reports luxConfig.Enable as "fpc_enabled"
	blsKey        *SecretKey
	ringtail      *RingtailEngine
	quasar        *Quasar // GetStats calls CertificateCount and SkipCertificateCount on it
	votes         map[ID]*VoteState
	precomputed   map[ID][]byte
	certificates  map[ID]*QuantumCertificate // GetStats reports len() as "certificates"
	finalized     map[ID]bool                // GetStats reports len() as "finalized"
	voteThreshold float64                    // reported as "vote_threshold"
	finalityCount atomic.Uint64              // loaded by GetStats as "quantum_finality"
	round         int
	voteCache     map[ID]map[string]bool
	quantumReady  bool
	quantumProofs map[ID]*QuantumCertificate
}
// OrderVertex represents a vertex in the DAG. DAGOrderBook stores vertices in
// a map keyed by ID.
//
// NOTE(review): the code that creates and links vertices is not shown here;
// confirm field semantics against pkg/consensus/dag.go.
type OrderVertex struct {
	ID        ID
	Order     *lx.Order
	NodeID    string
	Height    uint64
	Parents   []ID
	Timestamp time.Time
	Trades    []*lx.Trade
}
// GetStats returns Lux Consensus DAG statistics
func (lux *LuxDAGOrderBook) GetStats() map[string]interface{} {
lux.mu.RLock()
defer lux.mu.RUnlock()
return map[string]interface{}{
"node_id": lux.nodeID,
"vertices": len(lux.vertices),
"finalized": len(lux.finalized),
"frontier_size": len(lux.frontier),
"total_trades": uint64(0),
"quantum_finality": lux.finalityCount.Load(),
"certificates": len(lux.certificates),
"vote_threshold": lux.voteThreshold,
"quasar_certs": lux.quasar.CertificateCount(),
"quasar_skip_certs": lux.quasar.SkipCertificateCount(),
"peers": 0,
"fpc_enabled": lux.luxConfig.Enable,
}
}State Serialization
For persistence and network transfer:
import (
	"encoding/json"
	"fmt"
)
// MarshalJSON serializes the order book state to JSON under the book's read
// lock. The payload contains the symbol, the last order and trade IDs, the
// orders map, all aggregated bid and ask levels, the trade list and the time
// of serialization.
func (ob *OrderBook) MarshalJSON() ([]byte, error) {
	ob.mu.RLock()
	defer ob.mu.RUnlock()

	type bookState struct {
		Symbol      string            `json:"symbol"`
		LastOrderID uint64            `json:"last_order_id"`
		LastTradeID uint64            `json:"last_trade_id"`
		Orders      map[uint64]*Order `json:"orders"`
		Bids        []PriceLevel      `json:"bids"`
		Asks        []PriceLevel      `json:"asks"`
		Trades      []Trade           `json:"trades"`
		Timestamp   time.Time         `json:"timestamp"`
	}

	var state bookState
	state.Symbol = ob.Symbol
	state.LastOrderID = ob.LastOrderID
	state.LastTradeID = ob.LastTradeID
	state.Orders = ob.Orders
	// A depth of 0 asks getLevels for every level.
	state.Bids = ob.Bids.getLevels(0)
	state.Asks = ob.Asks.getLevels(0)
	state.Trades = ob.Trades
	state.Timestamp = time.Now()

	return json.Marshal(state)
}
// Restore from serialized state
func (ob *OrderBook) UnmarshalJSON(data []byte) error {
ob.mu.Lock()
defer ob.mu.Unlock()
var state struct {
Symbol string `json:"symbol"`
LastOrderID uint64 `json:"last_order_id"`
LastTradeID uint64 `json:"last_trade_id"`
Orders map[uint64]*Order `json:"orders"`
Trades []Trade `json:"trades"`
}
if err := json.Unmarshal(data, &state); err != nil {
return err
}
ob.Symbol = state.Symbol
ob.LastOrderID = state.LastOrderID
ob.LastTradeID = state.LastTradeID
ob.Orders = state.Orders
ob.Trades = state.Trades
// Rebuild order trees from orders
ob.Bids = NewOrderTree(Buy)
ob.Asks = NewOrderTree(Sell)
for _, order := range ob.Orders {
if order.Side == Buy {
ob.Bids.addOrder(order)
} else {
ob.Asks.addOrder(order)
}
}
atomic.StorePointer(&ob.bids, unsafe.Pointer(ob.Bids))
atomic.StorePointer(&ob.asks, unsafe.Pointer(ob.Asks))
return nil
}Consistency Guarantees
Sequence Numbers
Track state changes for incremental updates:
// OrderBook (excerpt): only the sequencing counters are shown; the remaining
// fields are elided.
type OrderBook struct {
	// ...
	LastUpdateID uint64 // Increments on every state change
	OrderSeq     uint64 // Sequence for ordering
}
// After any modification
atomic.AddUint64(&ob.LastUpdateID, 1)
Atomic Operations
Lock-free reads for hot paths:
// GetBestBid returns the best bid price without taking the book lock. It
// reads the bid tree's atomically stored best price and converts it back to
// a float by dividing by PriceMultiplier. It returns 0 when there is no bid
// tree or the stored best price is 0.
func (ob *OrderBook) GetBestBid() float64 {
	if bids := ob.GetBids(); bids != nil {
		if raw := bids.bestPrice.Load(); raw != 0 { // atomic read
			return float64(raw) / PriceMultiplier
		}
	}
	return 0
}
// Read order without lock
func (ob *OrderBook) GetOrder(orderID uint64) *Order {
// Try lock-free map first
if val, exists := ob.ordersMap.Load(orderID); exists {
if order, ok := val.(*Order); ok {
return order
}
}
// Fallback to locked map
ob.mu.RLock()
defer ob.mu.RUnlock()
return ob.Orders[orderID]
}Recovery Procedures
From Snapshot
func RecoverFromSnapshot(snapshot *OrderBookSnapshot) (*OrderBook, error) {
ob := NewOrderBook(snapshot.Symbol)
// Restore each order
for _, level := range snapshot.Bids {
order := &Order{
ID: level.OrderID,
Side: Buy,
Price: level.Price,
Size: level.Size,
User: level.UserID,
}
ob.AddOrder(order)
}
for _, level := range snapshot.Asks {
order := &Order{
ID: level.OrderID,
Side: Sell,
Price: level.Price,
Size: level.Size,
User: level.UserID,
}
ob.AddOrder(order)
}
return ob, nil
}From Event Log
func RecoverFromEvents(symbol string, events []Event) (*OrderBook, error) {
ob := NewOrderBook(symbol)
for _, event := range events {
switch event.Type {
case "order_added":
order := event.Data.(*Order)
ob.AddOrder(order)
case "order_canceled":
orderID := event.Data.(uint64)
ob.CancelOrder(orderID)
case "order_modified":
mod := event.Data.(OrderModification)
ob.ModifyOrder(mod.OrderID, mod.NewPrice, mod.NewSize)
}
}
return ob, nil
}Performance Considerations
| Operation | Latency | Notes |
|---|---|---|
| L2 Depth (10 levels) | ~2 us | Sorted aggregation |
| L3 Full Snapshot | ~50 us | All orders, sorted |
| Trade Buffer Add | ~50 ns | O(1) circular buffer |
| State Reset | ~100 us | Map recreation |
| JSON Serialize | ~1 ms | Depends on size |
Related Documentation
- Engine Overview - High-level architecture
- Order Book Internals - Data structures
- Matching Engine - Trade execution
- Event System - Real-time updates