Cluster with Client Example

Learn how to manage a dqlite cluster remotely using the client API, enabling external applications to interact with the cluster without embedding nodes.

Overview

This example demonstrates:

  • Creating a dqlite cluster
  • Connecting with the client API
  • Remote database operations
  • Client-side load balancing
  • Leader discovery

Perfect for: Building applications that need to interact with an external dqlite cluster.

Prerequisites

  • Python 3.12 or higher
  • uv package manager installed
  • dqlitepy installed in your project
  • Understanding of clustering (see Multi-Node Cluster)

Quick Start

Run the example with one command:

cd examples/cluster_with_client
./quickstart.sh

The script will:

  1. Install dqlitepy from the repository
  2. Create a 3-node cluster
  3. Connect with the client API
  4. Perform remote operations

Manual Installation

To run manually:

cd examples/cluster_with_client
uv sync
uv run python -m cluster_with_client_example.main

Architecture

Code Walkthrough

Setting Up the Cluster

First, create a cluster that accepts client connections:

import tempfile
from dqlitepy import Node

# Create and configure nodes
data_dirs = [tempfile.mkdtemp(prefix=f"dqlite_node{i}_") for i in range(1, 4)]
nodes = []

for i in range(1, 4):
    node = Node(
        node_id=i,
        address=f"127.0.0.1:900{i}",
        data_dir=data_dirs[i-1]
    )
    node.set_bind_address(f"127.0.0.1:900{i}")
    nodes.append(node)

# Set cluster configuration
cluster_config = [
    {"id": 1, "address": "127.0.0.1:9001"},
    {"id": 2, "address": "127.0.0.1:9002"},
    {"id": 3, "address": "127.0.0.1:9003"}
]

for node in nodes:
    node.set_cluster(cluster_config)

Key Points:

  • Nodes must be running before client connection
  • All nodes should be in the cluster configuration
  • Client will discover the leader automatically
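
The per-node addresses and the cluster configuration above repeat the same ports by hand. As a minimal sketch, a small helper could derive both from one place. The names `build_cluster_config` and `base_port` are hypothetical and not part of dqlitepy. Adding the node ID to a numeric base port also avoids the `900{i}` string pattern, which would yield port 90010 for a tenth node.

```python
def build_cluster_config(node_count, host="127.0.0.1", base_port=9000):
    """Return [{"id": ..., "address": ...}] for node IDs 1..node_count.

    Hypothetical helper for illustration; not a dqlitepy function.
    """
    return [
        {"id": node_id, "address": f"{host}:{base_port + node_id}"}
        for node_id in range(1, node_count + 1)
    ]


# Same three entries as the hand-written cluster_config above
cluster_config = build_cluster_config(3)

# The plain address list is the shape the client example passes as `cluster`
addresses = [entry["address"] for entry in cluster_config]
```

The resulting `cluster_config` could be passed to `node.set_cluster(...)` exactly as in the walkthrough.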

Connecting with the Client

from dqlitepy import Client

# Create client instance
client = Client(
    cluster=[
        "127.0.0.1:9001",
        "127.0.0.1:9002",
        "127.0.0.1:9003"
    ]
)

# Connect to the cluster
conn = client.connect(database="app.db")
cursor = conn.cursor()

Key Points:

  • Provide all cluster node addresses
  • Client automatically finds the leader
  • Connection works like a standard DB-API connection

Creating and Managing Data

# Create a table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY,
        customer TEXT NOT NULL,
        amount REAL NOT NULL,
        status TEXT DEFAULT 'pending'
    )
""")

# Insert data
orders = [
    (1, "customer@example.com", 199.99, "pending"),
    (2, "user@example.com", 49.99, "shipped"),
    (3, "client@example.com", 299.99, "pending")
]

cursor.executemany(
    "INSERT INTO orders (id, customer, amount, status) VALUES (?, ?, ?, ?)",
    orders
)
conn.commit()

print(f"Inserted {cursor.rowcount} orders")

Key Points:

  • Standard SQL operations work transparently
  • Writes are automatically routed to the leader
  • Client handles retries on leader changes

Querying Data

# Query orders
cursor.execute("""
    SELECT id, customer, amount, status
    FROM orders
    WHERE status = ?
    ORDER BY amount DESC
""", ("pending",))

results = cursor.fetchall()

print("\nPending Orders:")
for order_id, customer, amount, status in results:
    print(f" Order {order_id}: {customer} - ${amount}")

Key Points:

  • Reads can be served by any node
  • Use parameterized queries for safety
  • fetchall() returns all matching rows at once as a list
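
To make the point about parameterized queries concrete, the following sketch compares string formatting with a `?` placeholder when the input is hostile. It runs against the standard library's `sqlite3` module as a stand-in, on the assumption that the dqlitepy cursor treats `?` placeholders the same way, as the examples above suggest.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer TEXT NOT NULL, status TEXT)"
)
cur.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "a@example.com", "pending"), (2, "b@example.com", "shipped")],
)

# Input crafted to change the meaning of the WHERE clause
hostile = "pending' OR '1'='1"

# Unsafe: the input is spliced into the SQL text and rewrites the condition
unsafe_rows = cur.execute(
    f"SELECT id FROM orders WHERE status = '{hostile}' ORDER BY id"
).fetchall()

# Safe: the input is bound as a value and compared literally
safe_rows = cur.execute(
    "SELECT id FROM orders WHERE status = ? ORDER BY id", (hostile,)
).fetchall()

conn.close()
```

The formatted query matches every row because the injected `OR '1'='1'` is always true, while the parameterized query matches none because no order has that literal status.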

Handling Leader Changes

import time

# Simulate leader failure
print("\nSimulating leader failure...")
nodes[0].close()  # Shut down node 1 (leader)

time.sleep(2)  # Wait for new election

# Client automatically reconnects to new leader
cursor.execute(
    "INSERT INTO orders (id, customer, amount, status) VALUES (?, ?, ?, ?)",
    (4, "new@example.com", 399.99, "pending")
)
conn.commit()

print("Successfully wrote to new leader!")

Key Points:

  • Client transparently handles leader changes
  • Operations may experience brief delay during election
  • No application code changes needed
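
A fixed `time.sleep(2)` may wait longer than the election takes, or not long enough. One alternative is to poll until a condition holds, with a deadline. The helper below is a generic sketch. `wait_for` is a hypothetical name, and the fake lookup stands in for whatever check suits your setup, for example a call to `client.get_leader()`. How that call behaves while no leader exists is not documented on this page, so that usage is an assumption.

```python
import time


def wait_for(predicate, timeout=10.0, interval=0.2):
    """Call predicate() until it returns a truthy value or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = predicate()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Fake lookup for demonstration: reports a leader on the third call
attempts = []


def fake_leader_lookup():
    attempts.append(1)
    return "127.0.0.1:9002" if len(attempts) >= 3 else None


leader = wait_for(fake_leader_lookup, timeout=2.0, interval=0.01)
```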

Client Load Balancing

# The client provides automatic load balancing for reads
cursor.execute("SELECT COUNT(*) FROM orders")
total = cursor.fetchone()[0]
print(f"\nTotal orders: {total}")

# Each query may be served by a different follower
cursor.execute("SELECT AVG(amount) FROM orders")
avg = cursor.fetchone()[0]
print(f"Average order: ${avg:.2f}")

Key Points:

  • Client distributes read queries across followers
  • Improves read scalability
  • Reduces load on leader

Expected Output

Creating 3-node cluster...
Nodes initialized and cluster formed.

Connecting client to cluster...
Client connected to cluster at 127.0.0.1:9001 (leader)

Creating table and inserting data...
Inserted 3 orders

Pending Orders:
Order 3: client@example.com - $299.99
Order 1: customer@example.com - $199.99

Simulating leader failure...
Node 1 shut down.
Waiting for leader election...

Client reconnected to new leader at 127.0.0.1:9002
Successfully wrote to new leader!

Total orders: 4
Average order: $237.49

Example completed successfully!

Client Configuration Options

Connection Timeout

client = Client(
    cluster=["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"],
    timeout=10.0  # seconds
)

Retry Configuration

# Client automatically retries failed operations
# during leader changes with exponential backoff
conn = client.connect(
    database="app.db",
    max_retries=5
)
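
This page does not show how the retry delays are computed. As an illustration of exponential backoff in general, and not of dqlitepy's internals, the sketch below doubles the delay after each failed attempt up to a cap. The names `backoff_delays` and `retry`, and the default values, are hypothetical.

```python
import time


def backoff_delays(max_retries, base=0.5, factor=2.0, cap=5.0):
    """Delay before each retry: base, base*factor, ... limited to `cap` seconds."""
    return [min(cap, base * factor ** attempt) for attempt in range(max_retries)]


def retry(operation, max_retries=5, retry_on=(Exception,), sleep=time.sleep):
    """Run operation(); on a listed exception, wait and try again."""
    for delay in backoff_delays(max_retries):
        try:
            return operation()
        except retry_on:
            sleep(delay)
    # Final attempt: any exception now propagates to the caller
    return operation()


# Fake write that fails twice, as it might while an election is in progress
calls = {"count": 0}


def flaky_write():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("no leader yet")
    return "ok"


slept = []  # record the delays instead of actually sleeping
result = retry(flaky_write, retry_on=(ConnectionError,), sleep=slept.append)
```

Passing `sleep` as a parameter keeps the helper testable, since the delays can be recorded rather than waited out.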

Leader Discovery

# Get current leader information
leader_address = client.get_leader()
print(f"Current leader: {leader_address}")

Common Issues

Connection Refused

Ensure all nodes are running:

# Check if nodes are listening
import socket

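for port in [9001, 9002, 9003]:
    # The context manager closes the socket even if connect_ex raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)  # Avoid hanging on an unresponsive host
        result = sock.connect_ex(("127.0.0.1", port))
    if result == 0:
        print(f"Port {port}: Open")
    else:
        print(f"Port {port}: Closed")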

No Leader Found

Wait for leader election to complete:

import time

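max_wait = 10  # seconds
waited = 0
while waited < max_wait:
    try:
        conn = client.connect(database="app.db")
        break
    except Exception as e:
        print(f"Waiting for leader... ({e})")
        time.sleep(1)
        waited += 1
else:
    # The loop ran out of attempts without a successful connection
    raise TimeoutError(f"No leader found after {max_wait} seconds")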

Client Not Updating Leader

If the client can't find the new leader after a change:

# Reconnect with fresh client instance
client.close()
client = Client(cluster=["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"])

Best Practices

Connection Pooling

For production applications:

from dqlitepy import Client

class ClientPool:
    def __init__(self, cluster, pool_size=5):
        self.cluster = cluster
        # One Client per slot, handed out in round-robin order
        self.pool = [Client(cluster=cluster) for _ in range(pool_size)]
        self.index = 0

    def get_connection(self, database):
        client = self.pool[self.index]
        self.index = (self.index + 1) % len(self.pool)
        return client.connect(database=database)
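
The `ClientPool` above advances `self.index` without synchronization, so two threads calling `get_connection` at the same moment could read the same slot. A lock around the index update is one way to address this. The sketch below is a generic round-robin pool with hypothetical names (`RoundRobinPool`, `acquire`). It takes a factory function so it can be shown here with placeholder strings instead of real `Client` objects.

```python
import threading


class RoundRobinPool:
    """Hand out pooled objects in round-robin order, safely across threads."""

    def __init__(self, factory, pool_size=5):
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self._items = [factory() for _ in range(pool_size)]
        self._index = 0
        self._lock = threading.Lock()

    def acquire(self):
        # Read and advance the index as one atomic step
        with self._lock:
            item = self._items[self._index]
            self._index = (self._index + 1) % len(self._items)
        return item


# Placeholder factory; in an application this might construct a Client
created = []


def make_fake_client():
    name = f"client-{len(created)}"
    created.append(name)
    return name


pool = RoundRobinPool(make_fake_client, pool_size=3)
order = [pool.acquire() for _ in range(5)]
```

Whether a single dqlitepy `Client` can itself be shared between threads is not stated on this page, so treat the pooled objects as something to verify before relying on this pattern.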

Error Handling

from dqlitepy import ClientError

try:
    cursor.execute("INSERT INTO orders ...")
    conn.commit()
except ClientError as e:
    print(f"Client error: {e}")
    conn.rollback()
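
The commit-or-rollback pattern above can be wrapped in a context manager so each unit of work does not repeat it. This is a sketch using the standard library's `sqlite3` as a stand-in connection. The `transaction` helper is a hypothetical name, and with dqlitepy you would presumably catch `ClientError` where this example catches `sqlite3.IntegrityError`.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Commit if the block succeeds; roll back and re-raise if it fails."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")

with transaction(conn) as cur:
    cur.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL NOT NULL)")
    cur.execute("INSERT INTO orders VALUES (1, 199.99)")

try:
    with transaction(conn) as cur:
        cur.execute("INSERT INTO orders VALUES (2, 49.99)")
        cur.execute("INSERT INTO orders VALUES (1, 0.0)")  # duplicate primary key
except sqlite3.IntegrityError:
    pass  # the whole second block was rolled back, including order 2

row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
```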

Resource Cleanup

try:
    # Your operations
    pass
finally:
    cursor.close()
    conn.close()
    client.close()
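
In the `finally` block above, an exception from `cursor.close()` would skip the two calls after it. The standard library's `contextlib.ExitStack` with `closing` is one way to guarantee that every resource is closed, in reverse order of acquisition. The sketch below uses `sqlite3` and a `FakeClient` class as stand-ins. It assumes only that the real objects expose a `close()` method, as the example above shows.

```python
import sqlite3
from contextlib import ExitStack, closing

closed_order = []


class FakeClient:
    """Placeholder for a client object; only close() matters here."""

    def close(self):
        closed_order.append("client")


with ExitStack() as stack:
    # Each resource is registered as soon as it is created
    client = stack.enter_context(closing(FakeClient()))
    conn = stack.enter_context(closing(sqlite3.connect(":memory:")))
    cursor = stack.enter_context(closing(conn.cursor()))

    cursor.execute("SELECT 1")
    value = cursor.fetchone()[0]
# On exit: cursor, then conn, then client are closed, even after an error
```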

Source Code

The complete source code is available at:

Next Steps

After mastering the client API, try:

  1. SQLAlchemy ORM - Use an ORM with the client
  2. FastAPI Integration - Build a REST API
  3. Clustering Guide - Advanced cluster management