Skip to main content

SQLAlchemy Integration

dqlitepy provides full SQLAlchemy support through a custom dialect and DB-API 2.0 compliant interface.

Overview

The SQLAlchemy integration allows you to:

  • Use SQLAlchemy ORM with distributed SQLite
  • Define models with declarative syntax
  • Perform CRUD operations with automatic replication
  • Use relationships and foreign keys
  • Leverage SQLAlchemy's query builder

Quick Start

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session
from dqlitepy import Node
from dqlitepy.sqlalchemy import register_dqlite_node

# Define models
Base = declarative_base()

class User(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True)

# Start dqlite node
node = Node("127.0.0.1:9001", "/tmp/dqlite")
node.start()

# Register node with SQLAlchemy
register_dqlite_node(node, node_name='default')

# Create engine and tables
engine = create_engine('dqlite:///myapp.db')
Base.metadata.create_all(engine)

# Use ORM
with Session(engine) as session:
user = User(name='Alice', email='alice@example.com')
session.add(user)
session.commit()

# Query
users = session.query(User).filter(User.name == 'Alice').all()
for user in users:
print(f"{user.name}: {user.email}")

Connection URLs

dqlitepy uses custom connection URLs:

# Basic format
engine = create_engine('dqlite:///database.db')

# With named node (if you have multiple nodes)
engine = create_engine('dqlite:///database.db?node=node1')

# The node must be registered before creating the engine
from dqlitepy.sqlalchemy import register_dqlite_node
register_dqlite_node(node, node_name='node1')

Model Definition

Define models using standard SQLAlchemy syntax:

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship
from datetime import datetime

Base = declarative_base()

class Group(Base):
__tablename__ = 'groups'

id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False, unique=True)
description = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

# Relationship
members = relationship("Member", back_populates="group", cascade="all, delete-orphan")

class Member(Base):
__tablename__ = 'members'

id = Column(Integer, primary_key=True)
group_id = Column(Integer, ForeignKey('groups.id'), nullable=False)
name = Column(String(100), nullable=False)
email = Column(String(100))
role = Column(String(50))
joined_at = Column(DateTime, default=datetime.utcnow)

# Relationship
group = relationship("Group", back_populates="members")

JSON Column Support

dqlitepy provides a custom JSON column type:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from dqlitepy.sqlalchemy import JSON

Base = declarative_base()

class Config(Base):
__tablename__ = 'config'

id = Column(Integer, primary_key=True)
key = Column(String(50), unique=True)
value = Column(JSON) # Automatically serializes/deserializes JSON

# Usage
with Session(engine) as session:
config = Config(
key='app_settings',
value={'theme': 'dark', 'language': 'en', 'notifications': True}
)
session.add(config)
session.commit()

# Query returns Python dict
settings = session.query(Config).filter(Config.key == 'app_settings').first()
print(settings.value) # {'theme': 'dark', 'language': 'en', 'notifications': True}

CRUD Operations

Create (INSERT)

with Session(engine) as session:
# Single insert
user = User(name='Bob', email='bob@example.com')
session.add(user)
session.commit()

# Bulk insert
users = [
User(name='Charlie', email='charlie@example.com'),
User(name='Diana', email='diana@example.com'),
]
session.add_all(users)
session.commit()

Read (SELECT)

with Session(engine) as session:
# Get by primary key
user = session.get(User, 1)

# Filter query
users = session.query(User).filter(User.name.like('A%')).all()

# Order by
users = session.query(User).order_by(User.name.desc()).all()

# Limit and offset
users = session.query(User).limit(10).offset(20).all()

# Count
count = session.query(User).count()

# Exists
exists = session.query(User).filter(User.email == 'alice@example.com').first() is not None

Update

with Session(engine) as session:
# Update single record
user = session.get(User, 1)
user.email = 'newemail@example.com'
session.commit()

# Bulk update
session.query(User).filter(User.name == 'Bob').update({User.email: 'bob@newdomain.com'})
session.commit()

Delete

with Session(engine) as session:
# Delete single record
user = session.get(User, 1)
session.delete(user)
session.commit()

# Bulk delete
session.query(User).filter(User.name == 'Bob').delete()
session.commit()

Relationships

One-to-Many

# Add members to a group
with Session(engine) as session:
group = Group(name='Engineering', description='Software Engineers')

# Add related objects
group.members.append(Member(name='Alice', email='alice@example.com', role='Senior'))
group.members.append(Member(name='Bob', email='bob@example.com', role='Junior'))

session.add(group)
session.commit()

# Query with relationships
with Session(engine) as session:
group = session.query(Group).filter(Group.name == 'Engineering').first()
print(f"Group: {group.name}")
for member in group.members:
print(f" - {member.name} ({member.role})")

Eager Loading

from sqlalchemy.orm import selectinload

with Session(engine) as session:
# Load relationships in single query
groups = session.query(Group).options(selectinload(Group.members)).all()

# Now accessing members doesn't trigger additional queries
for group in groups:
print(f"{group.name}: {len(group.members)} members")

Transactions

SQLAlchemy sessions handle transactions automatically:

with Session(engine) as session:
try:
# Multiple operations in a transaction
user1 = User(name='Test1', email='test1@example.com')
user2 = User(name='Test2', email='test2@example.com')

session.add_all([user1, user2])
session.commit() # Commits both inserts atomically

except Exception as e:
session.rollback() # Rolls back on error
raise

Explicit transaction control:

with Session(engine) as session:
with session.begin():
# Transaction starts automatically
user = User(name='Test', email='test@example.com')
session.add(user)
# Transaction commits when block exits

Raw SQL

You can execute raw SQL when needed:

from sqlalchemy import text

with Session(engine) as session:
# Execute raw SQL
result = session.execute(text("SELECT * FROM users WHERE name = :name"), {"name": "Alice"})

for row in result:
print(dict(row._mapping))

# DML operations
session.execute(text("UPDATE users SET email = :email WHERE id = :id"),
{"email": "new@example.com", "id": 1})
session.commit()

Cluster Considerations

When using SQLAlchemy with a dqlite cluster:

Register Multiple Nodes

# Start multiple nodes
node1 = Node("172.20.0.11:9001", "/data/node1", cluster=[...])
node2 = Node("172.20.0.12:9001", "/data/node2", cluster=[...])
node3 = Node("172.20.0.13:9001", "/data/node3", cluster=[...])

node1.start()
node2.start()
node3.start()

# Register all nodes (engine will use first registered node)
register_dqlite_node(node1, 'node1')
register_dqlite_node(node2, 'node2')
register_dqlite_node(node3, 'node3')

# Create engine (uses node1)
engine = create_engine('dqlite:///myapp.db?node=node1')

Handle Leader Changes

Writes automatically route to the leader:

from dqlitepy import NoLeaderError
import time

def execute_with_retry(session, func, max_retries=5):
for attempt in range(max_retries):
try:
return func(session)
except NoLeaderError:
if attempt < max_retries - 1:
time.sleep(1)
continue
raise

# Usage
with Session(engine) as session:
execute_with_retry(session, lambda s: s.add(User(name='Test')))
session.commit()

FastAPI Example

Complete example using FastAPI with SQLAlchemy and dqlitepy:

from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from dqlitepy import Node
from dqlitepy.sqlalchemy import register_dqlite_node

# Initialize node
node = Node("127.0.0.1:9001", "/data")
node.start()
register_dqlite_node(node, 'default')

# Create engine
engine = create_engine('dqlite:///myapp.db')
SessionLocal = sessionmaker(bind=engine)

app = FastAPI()

def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

@app.post("/users")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
user = User(name=name, email=email)
db.add(user)
db.commit()
db.refresh(user)
return user

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()

Performance Tips

  1. Use bulk operations for multiple inserts:
session.bulk_insert_mappings(User, [
{'name': 'User1', 'email': 'user1@example.com'},
{'name': 'User2', 'email': 'user2@example.com'},
])
  1. Use eager loading to avoid N+1 queries:
groups = session.query(Group).options(selectinload(Group.members)).all()
  1. Pool connections (not needed with dqlitepy, but use session pooling):
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)

Limitations and Known Issues

Current Limitations

  1. Parameter Binding: Not yet implemented in DB-API layer

    • Workaround: Use SQLAlchemy's parameter handling (it works correctly)
    • Direct cursor usage requires careful string interpolation
  2. Explicit Transactions: No BEGIN/COMMIT/ROLLBACK support yet

    • Each statement is automatically committed via Raft consensus
    • SQLAlchemy's session.commit() and session.rollback() work as expected
  3. Batch Operations: executemany() not implemented in DB-API layer

    • Workaround: Use SQLAlchemy's bulk operations or loop with individual operations
  4. Advanced Features: Some edge cases may not work

    • Sequences not supported (use AUTOINCREMENT instead)
    • Some advanced locking/isolation features not available

Planned Improvements

  • Parameter binding with ? placeholders in DB-API layer
  • Explicit transaction support (BEGIN/COMMIT/ROLLBACK)
  • Batch execution with executemany()
  • Advanced type mapping (custom types, arrays)
  • Connection pooling optimizations
  • Query plan inspection
  • Performance profiling integration

Thread Safety

The SQLAlchemy dialect is thread-safe at level 1 (threads may share the module but not connections). Use SQLAlchemy's session management for thread safety:

from sqlalchemy.orm import sessionmaker, scoped_session

# Thread-safe session factory
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

# Use in threads
def worker():
session = Session()
try:
users = session.query(User).all()
# ... do work ...
finally:
Session.remove() # Clean up thread-local session

Next Steps