Cluster with Client Example
Learn how to manage a dqlite cluster remotely using the client API, enabling external applications to interact with the cluster without embedding nodes.
Overview
This example demonstrates:
- Creating a dqlite cluster
- Connecting with the client API
- Remote database operations
- Client-side load balancing
- Leader discovery
Perfect for: Building applications that need to interact with an external dqlite cluster.
Prerequisites
- Python 3.12 or higher
- uv package manager installed
- dqlitepy installed in your project
- Understanding of clustering (see Multi-Node Cluster)
Quick Start
Run the example with one command:
cd examples/cluster_with_client
./quickstart.sh
The script will:
- Install dqlitepy from the repository
- Create a 3-node cluster
- Connect with the client API
- Perform remote operations
Manual Installation
To run manually:
cd examples/cluster_with_client
uv sync
uv run python -m cluster_with_client_example.main
Code Walkthrough
Setting Up the Cluster
First, create a cluster that accepts client connections:
import tempfile
from dqlitepy import Node
# Create and configure nodes
data_dirs = [tempfile.mkdtemp(prefix=f"dqlite_node{i}_") for i in range(1, 4)]
nodes = []
for i in range(1, 4):
    node = Node(
        node_id=i,
        address=f"127.0.0.1:900{i}",
        data_dir=data_dirs[i - 1]
    )
    node.set_bind_address(f"127.0.0.1:900{i}")
    nodes.append(node)
# Set cluster configuration
cluster_config = [
    {"id": 1, "address": "127.0.0.1:9001"},
    {"id": 2, "address": "127.0.0.1:9002"},
    {"id": 3, "address": "127.0.0.1:9003"}
]
for node in nodes:
    node.set_cluster(cluster_config)
Key Points:
- Nodes must be running before the client connects (see the readiness check below)
- All nodes should be in the cluster configuration
- Client will discover the leader automatically
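Because the client expects the nodes to be up before it connects, it can help to poll their bind addresses first. Below is a minimal readiness check using only the Python standard library; the addresses match the cluster configuration above.
import socket
import time

def wait_for_nodes(addresses, timeout=10.0):
    """Poll each address until it accepts TCP connections or the timeout expires."""
    deadline = time.monotonic() + timeout
    pending = list(addresses)
    while pending and time.monotonic() < deadline:
        for host, port in list(pending):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(0.5)
                if sock.connect_ex((host, port)) == 0:
                    pending.remove((host, port))
        if pending:
            time.sleep(0.2)
    return not pending

assert wait_for_nodes([("127.0.0.1", 9001), ("127.0.0.1", 9002), ("127.0.0.1", 9003)])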
Connecting with the Client
from dqlitepy import Client
# Create client instance
client = Client(
    cluster=[
        "127.0.0.1:9001",
        "127.0.0.1:9002",
        "127.0.0.1:9003"
    ]
)
# Connect to the cluster
conn = client.connect(database="app.db")
cursor = conn.cursor()
Key Points:
- Provide all cluster node addresses
- Client automatically finds the leader
- Connection works like a standard DB-API connection
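Before issuing writes you may want to confirm that leader discovery actually succeeded. A minimal sketch, assuming the get_leader() helper shown under Leader Discovery below returns None while no leader is known:
# Confirm leader discovery before doing real work.
# Assumption: get_leader() returns None when no leader has been found yet.
leader = client.get_leader()
if leader is None:
    raise RuntimeError("No leader elected yet; retry once the election settles")
print(f"Connected; current leader is {leader}")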
Creating and Managing Data
# Create a table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY,
        customer TEXT NOT NULL,
        amount REAL NOT NULL,
        status TEXT DEFAULT 'pending'
    )
""")
# Insert data
orders = [
    (1, "customer@example.com", 199.99, "pending"),
    (2, "user@example.com", 49.99, "shipped"),
    (3, "client@example.com", 299.99, "pending")
]
cursor.executemany(
    "INSERT INTO orders (id, customer, amount, status) VALUES (?, ?, ?, ?)",
    orders
)
conn.commit()
print(f"Inserted {cursor.rowcount} orders")
Key Points:
- Standard SQL operations work transparently
- Writes are automatically routed to the leader
- Client handles retries on leader changes
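For larger batches it is often worth making the write explicitly atomic, so that a failed statement does not leave partial data behind. A minimal sketch, reusing the ClientError exception described under Error Handling below:
from dqlitepy import ClientError

def insert_orders(conn, rows):
    """Insert a batch of orders atomically; roll back if any statement fails."""
    cursor = conn.cursor()
    try:
        cursor.executemany(
            "INSERT INTO orders (id, customer, amount, status) VALUES (?, ?, ?, ?)",
            rows
        )
        conn.commit()
    except ClientError:
        conn.rollback()
        raise
    finally:
        cursor.close()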
Querying Data
# Query orders
cursor.execute("""
    SELECT id, customer, amount, status
    FROM orders
    WHERE status = ?
    ORDER BY amount DESC
""", ("pending",))
results = cursor.fetchall()
print("\nPending Orders:")
for order_id, customer, amount, status in results:
    print(f"  Order {order_id}: {customer} - ${amount}")
Key Points:
- Reads can be served by any node
- Use parameterized queries for safety
- Results are returned immediately
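Because the connection behaves like a standard DB-API connection, cursor.description can be used to turn result rows into dictionaries, which is handy when feeding templates or JSON serializers. A minimal sketch under that assumption:
def rows_as_dicts(cursor):
    """Map each fetched row to a dict keyed by column name (DB-API cursor.description)."""
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

cursor.execute("SELECT id, customer, amount, status FROM orders WHERE status = ?", ("pending",))
for order in rows_as_dicts(cursor):
    print(f"  Order {order['id']}: {order['customer']} - ${order['amount']}")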
Handling Leader Changes
import time
# Simulate leader failure
print("\nSimulating leader failure...")
nodes[0].close() # Shut down node 1 (leader)
time.sleep(2) # Wait for new election
# Client automatically reconnects to new leader
cursor.execute(
    "INSERT INTO orders (id, customer, amount, status) VALUES (?, ?, ?, ?)",
    (4, "new@example.com", 399.99, "pending")
)
conn.commit()
print("Successfully wrote to new leader!")
Key Points:
- Client transparently handles leader changes
- Operations may experience a brief delay during an election
- No application code changes needed
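If you want an extra safety net on top of the client's built-in retries, a write can be wrapped in a small backoff loop. A minimal sketch, again assuming ClientError is raised while the cluster has no reachable leader:
import time
from dqlitepy import ClientError

def execute_with_retry(conn, sql, params, attempts=5, base_delay=0.5):
    """Retry a write with exponential backoff across leader elections."""
    cursor = conn.cursor()
    for attempt in range(attempts):
        try:
            cursor.execute(sql, params)
            conn.commit()
            return
        except ClientError:
            if attempt == attempts - 1:
                raise
            conn.rollback()
            time.sleep(base_delay * (2 ** attempt))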
Client Load Balancing
# The client provides automatic load balancing for reads
cursor.execute("SELECT COUNT(*) FROM orders")
total = cursor.fetchone()[0]
print(f"\nTotal orders: {total}")
# Each query may be served by a different follower
cursor.execute("SELECT AVG(amount) FROM orders")
avg = cursor.fetchone()[0]
print(f"Average order: ${avg:.2f}")
Key Points:
- Client distributes read queries across followers
- Improves read scalability
- Reduces load on leader
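To exercise the read path, you can fan several read-only queries out over separate connections. A minimal sketch; it assumes that opening one connection per worker thread via client.connect() is safe in your deployment, so verify this before relying on it:
from concurrent.futures import ThreadPoolExecutor

def count_by_status(status):
    # One connection per worker; reads may land on different followers.
    conn = client.connect(database="app.db")
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM orders WHERE status = ?", (status,))
        return status, cursor.fetchone()[0]
    finally:
        conn.close()

with ThreadPoolExecutor(max_workers=3) as executor:
    for status, count in executor.map(count_by_status, ["pending", "shipped", "cancelled"]):
        print(f"{status}: {count}")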
Expected Output
Creating 3-node cluster...
Nodes initialized and cluster formed.
Connecting client to cluster...
Client connected to cluster at 127.0.0.1:9001 (leader)
Creating table and inserting data...
Inserted 3 orders
Pending Orders:
Order 3: client@example.com - $299.99
Order 1: customer@example.com - $199.99
Simulating leader failure...
Node 1 shut down.
Waiting for leader election...
Client reconnected to new leader at 127.0.0.1:9002
Successfully wrote to new leader!
Total orders: 4
Average order: $237.49
Example completed successfully!
Client Configuration Options
Connection Timeout
client = Client(
    cluster=["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"],
    timeout=10.0  # seconds
)
Retry Configuration
# Client automatically retries failed operations
# during leader changes with exponential backoff
conn = client.connect(
    database="app.db",
    max_retries=5
)
Leader Discovery
# Get current leader information
leader_address = client.get_leader()
print(f"Current leader: {leader_address}")
Common Issues
Connection Refused
Ensure all nodes are running:
# Check if nodes are listening
import socket
for port in [9001, 9002, 9003]:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = sock.connect_ex(('127.0.0.1', port))
    if result == 0:
        print(f"Port {port}: Open")
    else:
        print(f"Port {port}: Closed")
    sock.close()
No Leader Found
Wait for leader election to complete:
import time
max_wait = 10
waited = 0
while waited < max_wait:
    try:
        conn = client.connect(database="app.db")
        break
    except Exception as e:
        print(f"Waiting for leader... ({e})")
        time.sleep(1)
        waited += 1
Client Not Updating Leader
If the client can't find the new leader after a change:
# Reconnect with fresh client instance
client.close()
client = Client(cluster=["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"])
Best Practices
Connection Pooling
For production applications:
class ClientPool:
    def __init__(self, cluster, pool_size=5):
        self.cluster = cluster
        self.pool = [Client(cluster=cluster) for _ in range(pool_size)]
        self.index = 0

    def get_connection(self, database):
        client = self.pool[self.index]
        self.index = (self.index + 1) % len(self.pool)
        return client.connect(database=database)
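Usage is then a matter of asking the pool for a connection per request, for example:
pool = ClientPool(cluster=["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"])
conn = pool.get_connection("app.db")
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM orders")
print(cursor.fetchone()[0])
conn.close()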
Error Handling
from dqlitepy import ClientError
try:
    cursor.execute("INSERT INTO orders ...")
    conn.commit()
except ClientError as e:
    print(f"Client error: {e}")
    conn.rollback()
Resource Cleanup
try:
    # Your operations
    pass
finally:
    cursor.close()
    conn.close()
    client.close()
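contextlib.closing from the standard library gives the same guarantee with less nesting, assuming each object exposes a close() method as in the examples above:
from contextlib import closing
from dqlitepy import Client

cluster = ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]
with closing(Client(cluster=cluster)) as client, \
        closing(client.connect(database="app.db")) as conn, \
        closing(conn.cursor()) as cursor:
    cursor.execute("SELECT COUNT(*) FROM orders")
    print(cursor.fetchone()[0])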
Source Code
The complete source code is available in the examples/cluster_with_client directory of the repository.
Next Steps
After mastering the client API, try:
- SQLAlchemy ORM - Use an ORM with the client
- FastAPI Integration - Build a REST API
- Clustering Guide - Advanced cluster management