Python SDK

Client

Client and AsyncClient - synchronous and asynchronous clients for LX

Client Classes

The Python SDK provides two client classes for different use cases:

  • Client - Synchronous client for scripts and simple applications
  • AsyncClient - Asynchronous client for high-performance and concurrent applications

Client (Synchronous)

The synchronous client is suitable for scripts, CLI tools, and applications that do not require concurrent operations.

Initialization

from lux_dex import Client

# Basic initialization
client = Client(
    json_rpc_url="http://localhost:8080/rpc"
)

# Full configuration
client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    ws_url="ws://localhost:8081",
    api_key="your-api-key",
    api_secret="your-api-secret",
    timeout=30.0,
    max_retries=3
)

# From environment variables
client = Client.from_env()

Class Definition

from dataclasses import dataclass
from typing import Optional

@dataclass
class Client:
    """Synchronous LX client.

    Only ``json_rpc_url`` is required; every other field has a default
    and may be omitted.
    """

    # --- Endpoints ---
    json_rpc_url: str  # JSON-RPC endpoint URL (required)
    ws_url: Optional[str] = None  # WebSocket endpoint URL (optional)

    # --- Credentials ---
    api_key: Optional[str] = None  # API key for authentication (optional)
    api_secret: Optional[str] = None  # API secret for signing requests (optional)

    # --- Transport behaviour ---
    timeout: float = 30.0  # Request timeout in seconds
    max_retries: int = 3  # Maximum number of retry attempts

Core Methods

from lux_dex import Client
from lux_dex.types import NodeInfo, Order, OrderBook, Trade

client = Client("http://localhost:8080/rpc")

# Node information
def get_info(self) -> NodeInfo:
    """Get node information and status.

    Returns:
        A NodeInfo; the usage examples read its ``node_id``, ``version``
        and ``block_height`` attributes.
    """
    ...

def ping(self) -> bool:
    """Test connectivity to the node.

    Returns:
        A bool; the usage examples treat a truthy result as "connected".
    """
    ...

# Order management
def place_order(
    self,
    symbol: str,
    order_type: OrderType | str,
    side: Side | str,
    price: float,
    size: float,
    user_id: str | None = None,
    client_order_id: str | None = None,
    time_in_force: TimeInForce = TimeInForce.GTC
) -> Order:
    """Place a new order.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        order_type: An OrderType member or a plain string
            (e.g. ``OrderType.LIMIT`` or ``"limit"``).
        side: A Side member or a plain string
            (e.g. ``Side.BUY`` or ``"buy"``).
        price: Order price.
        size: Order size.
        user_id: Optional user identifier (e.g. ``"trader1"``).
        client_order_id: Optional; presumably a caller-chosen order
            identifier -- confirm against the SDK.
        time_in_force: Defaults to ``TimeInForce.GTC``.

    Returns:
        The resulting Order; the usage examples read ``order_id``,
        ``status``, ``filled_size`` and ``size`` from it.
    """
    ...

def cancel_order(self, order_id: str) -> Order:
    """Cancel an existing order.

    Args:
        order_id: ID of the order to cancel (``Order.order_id``).

    Returns:
        An Order; the usage examples print its ``status``.
    """
    ...

def get_order(self, order_id: str) -> Order:
    """Get order details by ID.

    Args:
        order_id: ID of the order to look up (``Order.order_id``).
    """
    ...

def get_orders(
    self,
    symbol: str | None = None,
    status: OrderStatus | None = None,
    limit: int = 100
) -> list[Order]:
    """Get list of orders with optional filters.

    Args:
        symbol: Optional symbol filter; ``None`` by default.
        status: Optional OrderStatus filter; ``None`` by default.
        limit: Defaults to 100; presumably the maximum number of orders
            returned -- confirm against the SDK.
    """
    ...

# Market data
def get_order_book(
    self,
    symbol: str,
    depth: int = 10
) -> OrderBook:
    """Get order book snapshot.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        depth: Defaults to 10; presumably the number of price levels
            to include -- confirm against the SDK.

    Returns:
        An OrderBook; the concurrent example reads its ``spread``.
    """
    ...

def get_best_bid(self, symbol: str) -> OrderBookLevel:
    """Get best bid price and size.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
    """
    ...

def get_best_ask(self, symbol: str) -> OrderBookLevel:
    """Get best ask price and size.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
    """
    ...

def get_trades(
    self,
    symbol: str,
    limit: int = 100
) -> list[Trade]:
    """Get recent trades.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        limit: Defaults to 100; presumably the maximum number of trades
            returned -- confirm against the SDK.
    """
    ...

Usage Examples

from lux_dex import Client, OrderType, Side

client = Client("http://localhost:8080/rpc")

# Test connectivity
if client.ping():
    print("Connected to LX")

# Get node info
info = client.get_info()
print(f"Node ID: {info.node_id}")
print(f"Version: {info.version}")
print(f"Block height: {info.block_height}")

# Place a limit order
order = client.place_order(
    symbol="BTC-USD",
    order_type=OrderType.LIMIT,
    side=Side.BUY,
    price=50000.0,
    size=1.0,
    user_id="trader1"
)
print(f"Order placed: {order.order_id}")

# Check order status
order = client.get_order(order.order_id)
print(f"Status: {order.status}")
print(f"Filled: {order.filled_size}/{order.size}")

# Cancel order
cancelled = client.cancel_order(order.order_id)
print(f"Order cancelled: {cancelled.status}")

AsyncClient (Asynchronous)

The asynchronous client is designed for high-performance applications, web servers, and scenarios requiring concurrent operations.

Initialization

import asyncio
from lux_dex import AsyncClient

# Basic initialization
async def main():
    """Create an AsyncClient, use it, and always close it afterwards."""
    client = AsyncClient(
        json_rpc_url="http://localhost:8080/rpc"
    )
    try:
        # Use client...
        pass
    finally:
        # Run close() even when a request above raises; otherwise the
        # client's connections would never be released.
        await client.close()

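asyncio.run(main())

# Alternatively, use the client as an async context manager so that
# connections are closed automatically on exit
import asyncio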
from lux_dex import AsyncClient

async def main():
    """Open the client as an async context manager and print the node ID."""
    rpc_url = "http://localhost:8080/rpc"
    async with AsyncClient(rpc_url) as client:
        # Connections are released when the `async with` block exits.
        info = await client.get_info()
        print(f"Connected to: {info.node_id}")

asyncio.run(main())

Class Definition

from typing import AsyncIterator

class AsyncClient:
    """Asynchronous LX client.

    Supports context manager protocol for automatic resource cleanup.
    All methods are coroutines that must be awaited.

    Either construct the client directly and ``await client.close()``
    when finished, or use ``async with AsyncClient(...) as client:``.
    """

    def __init__(
        self,
        json_rpc_url: str,
        ws_url: str | None = None,
        api_key: str | None = None,
        api_secret: str | None = None,
        timeout: float = 30.0,
        max_retries: int = 3
    ) -> None:
        """Configure the client.

        Args:
            json_rpc_url: JSON-RPC endpoint URL.
            ws_url: WebSocket endpoint URL (optional).
            api_key: API key for authentication (optional).
            api_secret: API secret for signing requests (optional).
            timeout: Request timeout in seconds (default: 30.0).
            max_retries: Maximum number of retry attempts (default: 3).
        """
        ...

    async def __aenter__(self) -> "AsyncClient":
        """Enter async context manager.

        Returns:
            The AsyncClient bound by the ``as`` clause of ``async with``.
        """
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit async context manager, close connections.

        The three arguments are the standard exception details passed by
        ``async with`` (all ``None`` when the block exits normally).
        """
        ...

    async def close(self) -> None:
        """Close all connections and cleanup resources.

        Call this explicitly when the client is not used through
        ``async with``.
        """
        ...

Core Methods

from lux_dex import AsyncClient
from lux_dex.types import NodeInfo, Order, OrderBook, Trade

# All methods are coroutines
async def get_info(self) -> NodeInfo:
    """Get node information and status.

    Returns:
        A NodeInfo; the usage examples read its ``node_id`` and
        ``block_height`` attributes.
    """
    ...

async def ping(self) -> bool:
    """Test connectivity to the node.

    Returns:
        A bool; the usage examples treat a truthy result as "connected".
    """
    ...

async def place_order(
    self,
    symbol: str,
    order_type: OrderType | str,
    side: Side | str,
    price: float,
    size: float,
    user_id: str | None = None,
    client_order_id: str | None = None,
    time_in_force: TimeInForce = TimeInForce.GTC
) -> Order:
    """Place a new order.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        order_type: An OrderType member or a plain string
            (e.g. ``OrderType.LIMIT`` or ``"limit"``).
        side: A Side member or a plain string
            (e.g. ``Side.BUY`` or ``"buy"``).
        price: Order price.
        size: Order size.
        user_id: Optional user identifier.
        client_order_id: Optional; presumably a caller-chosen order
            identifier -- confirm against the SDK.
        time_in_force: Defaults to ``TimeInForce.GTC``.

    Returns:
        The resulting Order; the usage examples read its ``order_id``.
    """
    ...

async def cancel_order(self, order_id: str) -> Order:
    """Cancel an existing order.

    Args:
        order_id: ID of the order to cancel (``Order.order_id``).

    Returns:
        An Order; the usage examples print its ``status``.
    """
    ...

async def get_order(self, order_id: str) -> Order:
    """Get order details by ID.

    Args:
        order_id: ID of the order to look up (``Order.order_id``).
    """
    ...

async def get_order_book(
    self,
    symbol: str,
    depth: int = 10
) -> OrderBook:
    """Get order book snapshot.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        depth: Defaults to 10; presumably the number of price levels
            to include -- confirm against the SDK.

    Returns:
        An OrderBook; the concurrent example reads its ``spread``.
    """
    ...

async def get_trades(
    self,
    symbol: str,
    limit: int = 100
) -> list[Trade]:
    """Get recent trades.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        limit: Defaults to 100; presumably the maximum number of trades
            returned -- confirm against the SDK.
    """
    ...

# Streaming methods return async iterators
# NOTE(review): these are declared `async def` with an AsyncIterator return
# type. If they are implemented as async generators they are consumed with
# `async for item in client.stream_...(...)`; if they are plain coroutines
# the call must be awaited first. Confirm against the SDK implementation.
async def stream_order_book(
    self,
    symbol: str,
    depth: int = 10
) -> AsyncIterator[OrderBook]:
    """Stream order book updates.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.
        depth: Defaults to 10; presumably the number of price levels
            per update -- confirm against the SDK.

    Returns:
        An async iterator of OrderBook objects.
    """
    ...

async def stream_trades(
    self,
    symbol: str
) -> AsyncIterator[Trade]:
    """Stream trade executions.

    Args:
        symbol: Market symbol, e.g. ``"BTC-USD"``.

    Returns:
        An async iterator of Trade objects.
    """
    ...

Usage Examples

import asyncio
from lux_dex import AsyncClient, OrderType, Side

async def main():
    """Ping the node, print its info, then place and cancel a limit order."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Test connectivity
        reachable = await client.ping()
        if reachable:
            print("Connected to LX")

        # Get node info
        info = await client.get_info()
        print(f"Node: {info.node_id}, Height: {info.block_height}")

        # Place order: a limit buy of 1.0 BTC-USD at 50000.0
        order_request = dict(
            symbol="BTC-USD",
            order_type=OrderType.LIMIT,
            side=Side.BUY,
            price=50000.0,
            size=1.0,
        )
        order = await client.place_order(**order_request)
        print(f"Order placed: {order.order_id}")

        # Cancel order, using the ID returned when it was placed
        cancelled = await client.cancel_order(order.order_id)
        print(f"Order cancelled: {cancelled.status}")

asyncio.run(main())

Concurrent Operations

import asyncio
from lux_dex import AsyncClient

async def main():
    """Fetch three order books and recent BTC trades concurrently, then print a summary."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Execute multiple requests concurrently; gather() returns the
        # results in the same order as the awaitables passed in.
        btc_book, eth_book, sol_book, btc_trades = await asyncio.gather(
            client.get_order_book("BTC-USD"),
            client.get_order_book("ETH-USD"),
            client.get_order_book("SOL-USD"),
            client.get_trades("BTC-USD", limit=50),
        )

        print(f"BTC spread: {btc_book.spread}")
        print(f"ETH spread: {eth_book.spread}")
        print(f"SOL spread: {sol_book.spread}")
        print(f"BTC recent trades: {len(btc_trades)}")

asyncio.run(main())

Configuration Options

ClientConfig

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RetryConfig:
    """Retry configuration for failed requests."""
    # Maximum number of retry attempts.
    max_retries: int = 3
    # Backoff base and cap -- presumably seconds; confirm against the SDK.
    backoff_base: float = 1.0
    backoff_max: float = 30.0
    # Status codes considered retryable: 429 plus the 500/502/503/504
    # server errors. Built per instance so configs never share one set.
    retryable_status_codes: set[int] = field(
        default_factory=lambda: {429, 500, 502, 503, 504}
    )

@dataclass
class ClientConfig:
    """Complete client configuration.

    Only ``json_rpc_url`` is required. The ``pool_*`` fields are the
    connection-pool settings passed in the "Connection Pooling" example.
    """
    json_rpc_url: str  # JSON-RPC endpoint URL
    ws_url: Optional[str] = None  # WebSocket endpoint URL
    api_key: Optional[str] = None  # API key for authentication
    api_secret: Optional[str] = None  # API secret for signing requests
    timeout: float = 30.0  # Request timeout in seconds
    # A fresh RetryConfig per ClientConfig instance.
    retry: RetryConfig = field(default_factory=RetryConfig)
    user_agent: str = "lux-dex-python/1.0.0"
    # Connection pool settings. Declared last so existing positional
    # construction keeps working.
    # NOTE(review): the default values below are assumed (they mirror the
    # defaults of requests' HTTPAdapter) -- confirm against the SDK.
    pool_connections: int = 10
    pool_maxsize: int = 10
    pool_block: bool = False

Using ClientConfig

from lux_dex import Client
from lux_dex.config import ClientConfig, RetryConfig

config = ClientConfig(
    json_rpc_url="https://api.lux.network/rpc",
    ws_url="wss://api.lux.network/ws",
    api_key="your-api-key",
    api_secret="your-api-secret",
    timeout=60.0,
    retry=RetryConfig(
        max_retries=5,
        backoff_base=2.0,
        backoff_max=60.0
    )
)

client = Client(config=config)

Authentication

API Key Authentication

from lux_dex import Client

client = Client(
    json_rpc_url="https://api.lux.network/rpc",
    api_key="your-api-key",
    api_secret="your-api-secret"
)

# All requests are automatically signed
order = client.place_order(
    symbol="BTC-USD",
    order_type="limit",
    side="buy",
    price=50000.0,
    size=1.0
)

JWT Authentication

from lux_dex import Client

client = Client(
    json_rpc_url="https://api.lux.network/rpc"
)

# Set JWT token
client.set_jwt_token("eyJhbGciOiJIUzI1NiIs...")

# Or initialize with token
client = Client(
    json_rpc_url="https://api.lux.network/rpc",
    jwt_token="eyJhbGciOiJIUzI1NiIs..."
)

Request Signing

import hashlib
import hmac
import time

def sign_request(
    method: str,
    params: dict,
    api_secret: str
) -> str:
    """Generate HMAC-SHA256 signature for request."""
    timestamp = int(time.time() * 1000)
    payload = f"{timestamp}{method}{json.dumps(params, sort_keys=True)}"
    signature = hmac.new(
        api_secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()
    return signature

Error Handling

from lux_dex import Client
from lux_dex.exceptions import (
    LXDexError,
    ConnectionError,
    TimeoutError,
    AuthenticationError,
    InsufficientBalanceError,
    OrderNotFoundError,
    RateLimitError,
    ValidationError
)

client = Client("http://localhost:8080/rpc")

try:
    order = client.place_order(
        symbol="BTC-USD",
        order_type="limit",
        side="buy",
        price=50000.0,
        size=100.0
    )
except ConnectionError as e:
    print(f"Failed to connect: {e}")
except TimeoutError as e:
    print(f"Request timed out: {e}")
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except InsufficientBalanceError as e:
    print(f"Insufficient balance: need {e.required}, have {e.available}")
except OrderNotFoundError as e:
    print(f"Order not found: {e.order_id}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Validation error: {e.field} - {e.message}")
except LXDexError as e:
    print(f"DEX error [{e.code}]: {e.message}")

Logging

import logging
from lux_dex import Client

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("lux_dex")
logger.setLevel(logging.DEBUG)

# Client automatically logs requests and responses
client = Client("http://localhost:8080/rpc")

# Or configure custom logger
client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    logger=logger
)

Thread Safety

Synchronous Client

The synchronous client is thread-safe. You can share a single client instance across multiple threads:

import threading
from lux_dex import Client

client = Client("http://localhost:8080/rpc")

def place_order_thread(symbol: str, price: float):
    """Place one limit buy of size 1.0 via the shared client and print its ID.

    Intended as a ``threading.Thread`` target: the output line is
    prefixed with the name of the thread that ran the call.
    """
    order = client.place_order(
        symbol=symbol, order_type="limit", side="buy", price=price, size=1.0
    )
    print(f"Thread {threading.current_thread().name}: {order.order_id}")

threads = [
    threading.Thread(target=place_order_thread, args=("BTC-USD", 50000 + i * 100))
    for i in range(10)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Asynchronous Client

The async client is designed for single-threaded async code. Use asyncio.gather for concurrent operations:

import asyncio
from lux_dex import AsyncClient

async def main():
    """Place ten limit buy orders concurrently and print each order ID.

    Prices start at 50000 and increase by 100 per order.
    """
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Concurrent operations in same event loop
        orders = await asyncio.gather(
            *(
                client.place_order("BTC-USD", "limit", "buy", 50000 + i * 100, 1.0)
                for i in range(10)
            )
        )
        for order in orders:
            print(f"Order: {order.order_id}")

asyncio.run(main())

Connection Pooling

Both clients use connection pooling internally for efficient resource usage:

from lux_dex import Client
from lux_dex.config import ClientConfig

config = ClientConfig(
    json_rpc_url="http://localhost:8080/rpc",
    # Connection pool settings
    pool_connections=10,
    pool_maxsize=100,
    pool_block=True
)

client = Client(config=config)

Next Steps