SQLAlchemy Integration
dqlitepy provides full SQLAlchemy support through a custom dialect and DB-API 2.0 compliant interface.
Overview
The SQLAlchemy integration allows you to:
- Use SQLAlchemy ORM with distributed SQLite
- Define models with declarative syntax
- Perform CRUD operations with automatic replication
- Use relationships and foreign keys
- Leverage SQLAlchemy's query builder
Quick Start
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, Session
from dqlitepy import Node
from dqlitepy.sqlalchemy import register_dqlite_node
# Define models
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True)
# Start dqlite node
node = Node("127.0.0.1:9001", "/tmp/dqlite")
node.start()
# Register node with SQLAlchemy
register_dqlite_node(node, node_name='default')
# Create engine and tables
engine = create_engine('dqlite:///myapp.db')
Base.metadata.create_all(engine)
# Use ORM
with Session(engine) as session:
user = User(name='Alice', email='alice@example.com')
session.add(user)
session.commit()
# Query
users = session.query(User).filter(User.name == 'Alice').all()
for user in users:
print(f"{user.name}: {user.email}")
Connection URLs
dqlitepy uses custom connection URLs:
# Basic format
engine = create_engine('dqlite:///database.db')
# With named node (if you have multiple nodes)
engine = create_engine('dqlite:///database.db?node=node1')
# The node must be registered before creating the engine
from dqlitepy.sqlalchemy import register_dqlite_node
register_dqlite_node(node, node_name='node1')
Model Definition
Define models using standard SQLAlchemy syntax:
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.orm import declarative_base, relationship
from datetime import datetime
Base = declarative_base()
class Group(Base):
__tablename__ = 'groups'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False, unique=True)
description = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationship
members = relationship("Member", back_populates="group", cascade="all, delete-orphan")
class Member(Base):
__tablename__ = 'members'
id = Column(Integer, primary_key=True)
group_id = Column(Integer, ForeignKey('groups.id'), nullable=False)
name = Column(String(100), nullable=False)
email = Column(String(100))
role = Column(String(50))
joined_at = Column(DateTime, default=datetime.utcnow)
# Relationship
group = relationship("Group", back_populates="members")
JSON Column Support
dqlitepy provides a custom JSON column type:
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from dqlitepy.sqlalchemy import JSON
Base = declarative_base()
class Config(Base):
__tablename__ = 'config'
id = Column(Integer, primary_key=True)
key = Column(String(50), unique=True)
value = Column(JSON) # Automatically serializes/deserializes JSON
# Usage
with Session(engine) as session:
config = Config(
key='app_settings',
value={'theme': 'dark', 'language': 'en', 'notifications': True}
)
session.add(config)
session.commit()
# Query returns Python dict
settings = session.query(Config).filter(Config.key == 'app_settings').first()
print(settings.value) # {'theme': 'dark', 'language': 'en', 'notifications': True}
CRUD Operations
Create (INSERT)
with Session(engine) as session:
# Single insert
user = User(name='Bob', email='bob@example.com')
session.add(user)
session.commit()
# Bulk insert
users = [
User(name='Charlie', email='charlie@example.com'),
User(name='Diana', email='diana@example.com'),
]
session.add_all(users)
session.commit()
Read (SELECT)
with Session(engine) as session:
# Get by primary key
user = session.get(User, 1)
# Filter query
users = session.query(User).filter(User.name.like('A%')).all()
# Order by
users = session.query(User).order_by(User.name.desc()).all()
# Limit and offset
users = session.query(User).limit(10).offset(20).all()
# Count
count = session.query(User).count()
# Exists
exists = session.query(User).filter(User.email == 'alice@example.com').first() is not None
Update
with Session(engine) as session:
# Update single record
user = session.get(User, 1)
user.email = 'newemail@example.com'
session.commit()
# Bulk update
session.query(User).filter(User.name == 'Bob').update({User.email: 'bob@newdomain.com'})
session.commit()
Delete
with Session(engine) as session:
# Delete single record
user = session.get(User, 1)
session.delete(user)
session.commit()
# Bulk delete
session.query(User).filter(User.name == 'Bob').delete()
session.commit()
Relationships
One-to-Many
# Add members to a group
with Session(engine) as session:
group = Group(name='Engineering', description='Software Engineers')
# Add related objects
group.members.append(Member(name='Alice', email='alice@example.com', role='Senior'))
group.members.append(Member(name='Bob', email='bob@example.com', role='Junior'))
session.add(group)
session.commit()
# Query with relationships
with Session(engine) as session:
group = session.query(Group).filter(Group.name == 'Engineering').first()
print(f"Group: {group.name}")
for member in group.members:
print(f" - {member.name} ({member.role})")
Eager Loading
from sqlalchemy.orm import selectinload
with Session(engine) as session:
# Load relationships up front with one additional SELECT ... IN query
groups = session.query(Group).options(selectinload(Group.members)).all()
# Now accessing members doesn't trigger additional queries
for group in groups:
print(f"{group.name}: {len(group.members)} members")
Transactions
SQLAlchemy sessions handle transactions automatically:
with Session(engine) as session:
try:
# Multiple operations in a transaction
user1 = User(name='Test1', email='test1@example.com')
user2 = User(name='Test2', email='test2@example.com')
session.add_all([user1, user2])
session.commit() # Commits both inserts atomically
except Exception as e:
session.rollback() # Rolls back on error
raise
Explicit transaction control:
with Session(engine) as session:
with session.begin():
# Transaction starts automatically
user = User(name='Test', email='test@example.com')
session.add(user)
# Transaction commits when block exits
Raw SQL
You can execute raw SQL when needed:
from sqlalchemy import text
with Session(engine) as session:
# Execute raw SQL
result = session.execute(text("SELECT * FROM users WHERE name = :name"), {"name": "Alice"})
for row in result:
print(dict(row._mapping))
# DML operations
session.execute(text("UPDATE users SET email = :email WHERE id = :id"),
{"email": "new@example.com", "id": 1})
session.commit()
Cluster Considerations
When using SQLAlchemy with a dqlite cluster:
Register Multiple Nodes
# Start multiple nodes
node1 = Node("172.20.0.11:9001", "/data/node1", cluster=[...])
node2 = Node("172.20.0.12:9001", "/data/node2", cluster=[...])
node3 = Node("172.20.0.13:9001", "/data/node3", cluster=[...])
node1.start()
node2.start()
node3.start()
# Register all nodes (the engine selects one by name via ?node= in the URL)
register_dqlite_node(node1, 'node1')
register_dqlite_node(node2, 'node2')
register_dqlite_node(node3, 'node3')
# Create engine (uses node1)
engine = create_engine('dqlite:///myapp.db?node=node1')
Handle Leader Changes
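Writes are automatically routed to the leader. While the cluster has no leader (for example, during a leader change), operations can raise NoLeaderError, so retry them: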
from dqlitepy import NoLeaderError
import time
def execute_with_retry(session, func, max_retries=5):
for attempt in range(max_retries):
try:
return func(session)
except NoLeaderError:
if attempt < max_retries - 1:
time.sleep(1)
continue
raise
# Usage
with Session(engine) as session:
execute_with_retry(session, lambda s: s.add(User(name='Test')))
session.commit()
FastAPI Example
Complete example using FastAPI with SQLAlchemy and dqlitepy:
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from dqlitepy import Node
from dqlitepy.sqlalchemy import register_dqlite_node
# Initialize node
node = Node("127.0.0.1:9001", "/data")
node.start()
register_dqlite_node(node, 'default')
# Create engine
engine = create_engine('dqlite:///myapp.db')
SessionLocal = sessionmaker(bind=engine)
app = FastAPI()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users")
def create_user(name: str, email: str, db: Session = Depends(get_db)):
user = User(name=name, email=email)
db.add(user)
db.commit()
db.refresh(user)
return user
@app.get("/users")
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
Performance Tips
- Use bulk operations for multiple inserts:
session.bulk_insert_mappings(User, [
{'name': 'User1', 'email': 'user1@example.com'},
{'name': 'User2', 'email': 'user2@example.com'},
])
- Use eager loading to avoid N+1 queries:
groups = session.query(Group).options(selectinload(Group.members)).all()
- Reuse a single session factory (connection pooling is not needed with dqlitepy):
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False)
Limitations and Known Issues
Current Limitations
- Parameter Binding: Not yet implemented in DB-API layer
  - Workaround: Use SQLAlchemy's parameter handling (it works correctly)
  - Direct cursor usage requires careful string interpolation
- Explicit Transactions: No BEGIN/COMMIT/ROLLBACK support yet
  - Each statement is automatically committed via Raft consensus
  - SQLAlchemy's session.commit() and session.rollback() work as expected
- Batch Operations: `executemany()` not implemented in DB-API layer
  - Workaround: Use SQLAlchemy's bulk operations or loop with individual operations
- Advanced Features: Some edge cases may not work
  - Sequences not supported (use AUTOINCREMENT instead)
  - Some advanced locking/isolation features not available
Planned Improvements
- Parameter binding with `?` placeholders in DB-API layer
- Explicit transaction support (BEGIN/COMMIT/ROLLBACK)
- Batch execution with `executemany()`
- Advanced type mapping (custom types, arrays)
- Connection pooling optimizations
- Query plan inspection
- Performance profiling integration
Thread Safety
The SQLAlchemy dialect is thread-safe at DB-API `threadsafety` level 1 (threads may share the module, but not connections). Give each thread its own session using SQLAlchemy's session management:
from sqlalchemy.orm import sessionmaker, scoped_session
# Thread-safe session factory
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
# Use in threads
def worker():
session = Session()
try:
users = session.query(User).all()
# ... do work ...
finally:
Session.remove() # Clean up thread-local session
Next Steps
- DB-API 2.0 Interface - Lower-level database interface
- FastAPI Example - Complete FastAPI integration
- API Reference - Complete API documentation