Python SDK
Client
Client and AsyncClient - synchronous and asynchronous clients for LX
Client Classes
The Python SDK provides two client classes for different use cases:
- Client - Synchronous client for scripts and simple applications
- AsyncClient - Asynchronous client for high-performance and concurrent applications
Client (Synchronous)
The synchronous client is suitable for scripts, CLI tools, and applications that do not require concurrent operations.
Initialization
from lux_dex import Client
# Basic initialization - endpoint URL only
client = Client(json_rpc_url="http://localhost:8080/rpc")

# Full configuration - endpoints, credentials, timeout and retry count
client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    ws_url="ws://localhost:8081",
    api_key="your-api-key",
    api_secret="your-api-secret",
    timeout=30.0,
    max_retries=3,
)
# From environment variables
client = Client.from_env()

Class Definition
from dataclasses import dataclass
from typing import Optional
@dataclass
class Client:
    """Synchronous LX client.

    Attributes:
        json_rpc_url: JSON-RPC endpoint URL
        ws_url: WebSocket endpoint URL (optional)
        api_key: API key for authentication (optional)
        api_secret: API secret for signing requests (optional)
        timeout: Request timeout in seconds (default: 30.0)
        max_retries: Maximum number of retry attempts (default: 3)
    """

    # NOTE(review): other examples in this document call Client.from_env()
    # and pass config=, jwt_token= and logger= keywords, none of which are
    # declared here - confirm against the SDK.
    json_rpc_url: str
    ws_url: Optional[str] = None
    api_key: Optional[str] = None
    api_secret: Optional[str] = None
    timeout: float = 30.0
    max_retries: int = 3


# Core Methods
from lux_dex import Client
from lux_dex.types import NodeInfo, Order, OrderBook, Trade
client = Client("http://localhost:8080/rpc")
# Node information
def get_info(self) -> NodeInfo:
    """Get node information and status.

    Returns:
        A NodeInfo; the usage examples read its ``node_id``, ``version``
        and ``block_height`` attributes.
    """
    ...


def ping(self) -> bool:
    """Test connectivity to the node.

    Returns:
        A bool; the usage examples treat a truthy result as "connected".
    """
    ...
# Order management
# NOTE(review): OrderType, Side, TimeInForce and OrderStatus are not imported
# by this snippet. The usage examples import OrderType and Side from lux_dex;
# confirm where TimeInForce and OrderStatus live.
def place_order(
    self,
    symbol: str,
    order_type: OrderType | str,
    side: Side | str,
    price: float,
    size: float,
    user_id: str | None = None,
    client_order_id: str | None = None,
    time_in_force: TimeInForce = TimeInForce.GTC,
) -> Order:
    """Place a new order.

    Args:
        symbol: Market symbol, e.g. "BTC-USD".
        order_type: An OrderType member or a string such as "limit".
        side: A Side member or a string such as "buy".
        price: Order price.
        size: Order size.
        user_id: Optional user identifier, e.g. "trader1".
        client_order_id: Optional order ID (presumably chosen by the
            caller - confirm).
        time_in_force: Defaults to TimeInForce.GTC.

    Returns:
        The resulting Order.
    """
    ...


def cancel_order(self, order_id: str) -> Order:
    """Cancel an existing order.

    Args:
        order_id: ID of the order to cancel.

    Returns:
        An Order; the usage examples print its ``status``.
    """
    ...


def get_order(self, order_id: str) -> Order:
    """Get order details by ID.

    Args:
        order_id: ID of the order to look up.

    Returns:
        An Order; the usage examples read ``status``, ``filled_size``
        and ``size`` from it.
    """
    ...


def get_orders(
    self,
    symbol: str | None = None,
    status: OrderStatus | None = None,
    limit: int = 100,
) -> list[Order]:
    """Get list of orders with optional filters.

    Args:
        symbol: Optional symbol filter.
        status: Optional status filter.
        limit: Defaults to 100 (presumably the maximum number of orders
            returned - confirm).

    Returns:
        A list of Order objects.
    """
    ...
# Market data
# NOTE(review): OrderBookLevel is not imported by this snippet - confirm the
# module it is exported from.
def get_order_book(
    self,
    symbol: str,
    depth: int = 10,
) -> OrderBook:
    """Get order book snapshot.

    Args:
        symbol: Market symbol, e.g. "BTC-USD".
        depth: Defaults to 10 (presumably the number of price levels -
            confirm).

    Returns:
        An OrderBook; the concurrent example reads its ``spread``.
    """
    ...


def get_best_bid(self, symbol: str) -> OrderBookLevel:
    """Get best bid price and size.

    Args:
        symbol: Market symbol.
    """
    ...


def get_best_ask(self, symbol: str) -> OrderBookLevel:
    """Get best ask price and size.

    Args:
        symbol: Market symbol.
    """
    ...


def get_trades(
    self,
    symbol: str,
    limit: int = 100,
) -> list[Trade]:
    """Get recent trades.

    Args:
        symbol: Market symbol.
        limit: Defaults to 100 (presumably the maximum number of trades
            returned - confirm).

    Returns:
        A list of Trade objects.
    """
    ...


# Usage Examples
from lux_dex import Client, OrderType, Side
client = Client("http://localhost:8080/rpc")

# Test connectivity
if client.ping():
    print("Connected to LX")

# Get node info
info = client.get_info()
print(f"Node ID: {info.node_id}")
print(f"Version: {info.version}")
print(f"Block height: {info.block_height}")

# Place a limit order
order = client.place_order(
    symbol="BTC-USD",
    order_type=OrderType.LIMIT,
    side=Side.BUY,
    price=50000.0,
    size=1.0,
    user_id="trader1",
)
print(f"Order placed: {order.order_id}")

# Check order status (re-fetch the order that was just placed)
order = client.get_order(order.order_id)
print(f"Status: {order.status}")
print(f"Filled: {order.filled_size}/{order.size}")

# Cancel order
cancelled = client.cancel_order(order.order_id)
print(f"Order cancelled: {cancelled.status}")

AsyncClient (Asynchronous)
The asynchronous client is designed for high-performance applications, web servers, and scenarios requiring concurrent operations.
Initialization
import asyncio
from lux_dex import AsyncClient
# Basic initialization
async def main():
    """Create an AsyncClient by hand and close it explicitly when done."""
    client = AsyncClient(
        json_rpc_url="http://localhost:8080/rpc"
    )
    try:
        # Use client...
        pass
    finally:
        # Close even when the body above raises, so the client's
        # connections are always released.
        await client.close()
asyncio.run(main())

Context Manager (Recommended)
import asyncio
from lux_dex import AsyncClient
async def main():
    """Fetch node info inside an ``async with`` block and print the node ID."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        info = await client.get_info()
        print(f"Connected to: {info.node_id}")
asyncio.run(main())

Class Definition
from typing import AsyncIterator
class AsyncClient:
"""Asynchronous LX client.
Supports context manager protocol for automatic resource cleanup.
All methods are coroutines that must be awaited.
"""
def __init__(
self,
json_rpc_url: str,
ws_url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
timeout: float = 30.0,
max_retries: int = 3
) -> None:
...
async def __aenter__(self) -> "AsyncClient":
"""Enter async context manager."""
...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit async context manager, close connections."""
...
async def close(self) -> None:
"""Close all connections and cleanup resources."""
...Core Methods
from lux_dex import AsyncClient
from lux_dex.types import NodeInfo, Order, OrderBook, Trade
# All methods are coroutines
# NOTE(review): OrderType, Side and TimeInForce are not imported by this
# snippet - confirm their module before copying it.
async def get_info(self) -> NodeInfo:
    """Get node information and status.

    Returns:
        A NodeInfo; the usage examples read ``node_id`` and
        ``block_height`` from it.
    """
    ...


async def ping(self) -> bool:
    """Test connectivity to the node."""
    ...


async def place_order(
    self,
    symbol: str,
    order_type: OrderType | str,
    side: Side | str,
    price: float,
    size: float,
    user_id: str | None = None,
    client_order_id: str | None = None,
    time_in_force: TimeInForce = TimeInForce.GTC,
) -> Order:
    """Place a new order.

    Args:
        symbol: Market symbol, e.g. "BTC-USD".
        order_type: An OrderType member or a string such as "limit".
        side: A Side member or a string such as "buy".
        price: Order price.
        size: Order size.
        user_id: Optional user identifier.
        client_order_id: Optional order ID (presumably chosen by the
            caller - confirm).
        time_in_force: Defaults to TimeInForce.GTC.

    Returns:
        The resulting Order.
    """
    ...


async def cancel_order(self, order_id: str) -> Order:
    """Cancel an existing order.

    Args:
        order_id: ID of the order to cancel.
    """
    ...


async def get_order(self, order_id: str) -> Order:
    """Get order details by ID.

    Args:
        order_id: ID of the order to look up.
    """
    ...


async def get_order_book(
    self,
    symbol: str,
    depth: int = 10,
) -> OrderBook:
    """Get order book snapshot.

    Args:
        symbol: Market symbol.
        depth: Defaults to 10 (presumably the number of price levels -
            confirm).
    """
    ...


async def get_trades(
    self,
    symbol: str,
    limit: int = 100,
) -> list[Trade]:
    """Get recent trades.

    Args:
        symbol: Market symbol.
        limit: Defaults to 100 (presumably the maximum number of trades
            returned - confirm).
    """
    ...
# Streaming methods return async iterators
# NOTE(review): written as plain ``async def`` stubs, these are coroutines.
# If the real methods are async generators they are consumed with
# ``async for`` and not awaited first - confirm against the SDK.
async def stream_order_book(
    self,
    symbol: str,
    depth: int = 10,
) -> AsyncIterator[OrderBook]:
    """Stream order book updates.

    Args:
        symbol: Market symbol.
        depth: Defaults to 10 (presumably the number of price levels -
            confirm).
    """
    ...


async def stream_trades(
    self,
    symbol: str,
) -> AsyncIterator[Trade]:
    """Stream trade executions.

    Args:
        symbol: Market symbol.
    """
    ...


# Usage Examples
import asyncio
from lux_dex import AsyncClient, OrderType, Side
async def main():
    """Ping the node, print its info, then place and cancel a limit order."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Test connectivity
        if await client.ping():
            print("Connected to LX")

        # Get node info
        info = await client.get_info()
        print(f"Node: {info.node_id}, Height: {info.block_height}")

        # Place order
        order = await client.place_order(
            symbol="BTC-USD",
            order_type=OrderType.LIMIT,
            side=Side.BUY,
            price=50000.0,
            size=1.0,
        )
        print(f"Order placed: {order.order_id}")

        # Cancel order
        cancelled = await client.cancel_order(order.order_id)
        print(f"Order cancelled: {cancelled.status}")
asyncio.run(main())

Concurrent Operations
import asyncio
from lux_dex import AsyncClient
async def main():
    """Fetch three order books and recent BTC trades with one gather call."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Execute multiple requests concurrently
        results = await asyncio.gather(
            client.get_order_book("BTC-USD"),
            client.get_order_book("ETH-USD"),
            client.get_order_book("SOL-USD"),
            client.get_trades("BTC-USD", limit=50),
        )
        # gather returns results in the same order the awaitables were passed.
        btc_book, eth_book, sol_book, btc_trades = results
        print(f"BTC spread: {btc_book.spread}")
        print(f"ETH spread: {eth_book.spread}")
        print(f"SOL spread: {sol_book.spread}")
        print(f"BTC recent trades: {len(btc_trades)}")
asyncio.run(main())

Configuration Options
ClientConfig
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class RetryConfig:
    """Retry configuration for failed requests.

    Attributes:
        max_retries: Defaults to 3.
        backoff_base: Defaults to 1.0 (presumably seconds - confirm).
        backoff_max: Defaults to 30.0 (presumably seconds - confirm).
        retryable_status_codes: Status codes treated as retryable;
            defaults to {429, 500, 502, 503, 504}.
    """

    max_retries: int = 3
    backoff_base: float = 1.0
    backoff_max: float = 30.0
    # default_factory gives every instance its own set, so mutating one
    # config never changes another.
    retryable_status_codes: set[int] = field(
        default_factory=lambda: {429, 500, 502, 503, 504}
    )
@dataclass
class ClientConfig:
    """Complete client configuration.

    Attributes:
        json_rpc_url: JSON-RPC endpoint URL (required).
        ws_url: Optional WebSocket endpoint URL.
        api_key: Optional API key.
        api_secret: Optional API secret.
        timeout: Defaults to 30.0.
        retry: Retry settings; a fresh RetryConfig per instance.
        user_agent: Defaults to "lux-dex-python/1.0.0".
    """

    # NOTE(review): the Connection Pooling example in this document passes
    # pool_connections=, pool_maxsize= and pool_block= to ClientConfig, but
    # no such fields are declared here - confirm against the SDK.
    json_rpc_url: str
    ws_url: Optional[str] = None
    api_key: Optional[str] = None
    api_secret: Optional[str] = None
    timeout: float = 30.0
    retry: RetryConfig = field(default_factory=RetryConfig)
    user_agent: str = "lux-dex-python/1.0.0"


# Using ClientConfig
from lux_dex import Client
from lux_dex.config import ClientConfig, RetryConfig
# Retry settings are built first, then handed to the client configuration.
config = ClientConfig(
    json_rpc_url="https://api.lux.network/rpc",
    ws_url="wss://api.lux.network/ws",
    api_key="your-api-key",
    api_secret="your-api-secret",
    timeout=60.0,
    retry=RetryConfig(max_retries=5, backoff_base=2.0, backoff_max=60.0),
)
client = Client(config=config)

Authentication
API Key Authentication
from lux_dex import Client
client = Client(
    json_rpc_url="https://api.lux.network/rpc",
    api_key="your-api-key",
    api_secret="your-api-secret",
)

# All requests are automatically signed
order = client.place_order(
    symbol="BTC-USD",
    order_type="limit",
    side="buy",
    price=50000.0,
    size=1.0,
)


# JWT Authentication
from lux_dex import Client
client = Client(
    json_rpc_url="https://api.lux.network/rpc",
)

# Set JWT token
client.set_jwt_token("eyJhbGciOiJIUzI1NiIs...")

# Or initialize with token
client = Client(
    json_rpc_url="https://api.lux.network/rpc",
    jwt_token="eyJhbGciOiJIUzI1NiIs...",
)


# Request Signing
import hashlib
import hmac
import time
def sign_request(
method: str,
params: dict,
api_secret: str
) -> str:
"""Generate HMAC-SHA256 signature for request."""
timestamp = int(time.time() * 1000)
payload = f"{timestamp}{method}{json.dumps(params, sort_keys=True)}"
signature = hmac.new(
api_secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return signatureError Handling
from lux_dex import Client
from lux_dex.exceptions import (
LXDexError,
ConnectionError,
TimeoutError,
AuthenticationError,
InsufficientBalanceError,
OrderNotFoundError,
RateLimitError,
ValidationError
)
client = Client("http://localhost:8080/rpc")

# NOTE(review): ConnectionError and TimeoutError imported from
# lux_dex.exceptions shadow the Python builtins of the same name in this
# module - confirm that is intended.
try:
    order = client.place_order(
        symbol="BTC-USD",
        order_type="limit",
        side="buy",
        price=50000.0,
        size=100.0,
    )
except ConnectionError as e:
    print(f"Failed to connect: {e}")
except TimeoutError as e:
    print(f"Request timed out: {e}")
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except InsufficientBalanceError as e:
    print(f"Insufficient balance: need {e.required}, have {e.available}")
except OrderNotFoundError as e:
    print(f"Order not found: {e.order_id}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Validation error: {e.field} - {e.message}")
except LXDexError as e:
    # Listed last so the more specific handlers above get the first chance
    # (presumably LXDexError is their common base class - confirm).
    print(f"DEX error [{e.code}]: {e.message}")


# Logging
import logging
from lux_dex import Client
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("lux_dex")
logger.setLevel(logging.DEBUG)

# Client automatically logs requests and responses
client = Client("http://localhost:8080/rpc")

# Or configure custom logger
client = Client(
    json_rpc_url="http://localhost:8080/rpc",
    logger=logger,
)


# Thread Safety
Synchronous Client
The synchronous client is thread-safe. You can share a single client instance across multiple threads:
import threading
from lux_dex import Client
client = Client("http://localhost:8080/rpc")
def place_order_thread(symbol: str, price: float):
    """Place one limit buy order of size 1.0 and print the resulting order ID.

    Uses the module-level ``client`` shared by all threads.

    Args:
        symbol: Market symbol for the order.
        price: Limit price for the order.
    """
    order = client.place_order(
        symbol=symbol,
        order_type="limit",
        side="buy",
        price=price,
        size=1.0,
    )
    print(f"Thread {threading.current_thread().name}: {order.order_id}")
# Ten worker threads, each placing an order 100 above the previous price.
threads = [
    threading.Thread(target=place_order_thread, args=("BTC-USD", 50000 + i * 100))
    for i in range(10)
]
for t in threads:
    t.start()
# Wait for every worker to finish before continuing.
for t in threads:
    t.join()


# Asynchronous Client
The async client is designed for single-threaded async code. Use asyncio.gather for concurrent operations:
import asyncio
from lux_dex import AsyncClient
async def main():
    """Place ten limit buy orders concurrently and print each order ID."""
    async with AsyncClient("http://localhost:8080/rpc") as client:
        # Concurrent operations in same event loop
        tasks = [
            client.place_order("BTC-USD", "limit", "buy", 50000 + i * 100, 1.0)
            for i in range(10)
        ]
        orders = await asyncio.gather(*tasks)
        for order in orders:
            print(f"Order: {order.order_id}")
asyncio.run(main())

Connection Pooling
Both clients use connection pooling internally for efficient resource usage:
from lux_dex import Client
from lux_dex.config import ClientConfig
config = ClientConfig(
    json_rpc_url="http://localhost:8080/rpc",
    # Connection pool settings
    pool_connections=10,
    pool_maxsize=100,
    pool_block=True,
)
client = Client(config=config)

Next Steps
- Orders - Detailed order operations
- Order Book - Market data and subscriptions
- WebSocket - Real-time streaming
- Async Patterns - Advanced async usage