FastAPI + SQLAlchemy Integration Architecture
This document describes how to integrate dqlitepy with FastAPI and SQLAlchemy, including initialization patterns, query strategies, and production deployment considerations.
Overview
dqlitepy can be integrated with FastAPI applications in multiple ways, providing distributed database capabilities with the familiarity of SQLAlchemy ORM or direct SQL execution.
Integration Patterns
Pattern 1: SQLAlchemy ORM with Dependency Injection
The recommended pattern for FastAPI applications using SQLAlchemy ORM:
Implementation Example
Application Structure:
fastapi_app/
├── main.py # FastAPI app + lifespan
├── config.py # Configuration
├── database.py # Database setup
├── models.py # SQLAlchemy models
├── schemas.py # Pydantic schemas
├── crud.py # Database operations
└── routers/
├── users.py
└── items.py
database.py - Database Configuration:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from dqlitepy import Node
from pathlib import Path
# Global state
engine = None
SessionLocal = None
node = None
Base = declarative_base()
def init_database(node_address: str, data_dir: Path):
    """Start a dqlite node and build the SQLAlchemy engine/session factory.

    Stores the engine, session maker, and node in module-level globals so
    the FastAPI dependency (get_db) and the shutdown hook can reach them.
    Returns (engine, SessionLocal, node).
    """
    global engine, SessionLocal, node

    # Bring the local dqlite node up first; the engine connects through it.
    node = Node(
        address=node_address,
        data_dir=data_dir,
        auto_recovery=True,
        snapshot_compression=True,
    )
    node.start()

    # SQLAlchemy engine pointed at the node's database file.
    database_url = f"dqlite:///{node_address}/{data_dir}/app.db"
    engine = create_engine(
        database_url,
        connect_args={
            "check_same_thread": False,  # Allow multi-threaded access
            "timeout": 10.0,
        },
        pool_pre_ping=True,  # Verify connections
        pool_size=5,
        max_overflow=10,
    )

    # Session factory bound to the engine; one session per request.
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

    # Ensure all declarative tables exist.
    Base.metadata.create_all(bind=engine)
    return engine, SessionLocal, node
def get_db():
    """FastAPI dependency yielding a session that is always closed."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def close_database():
    """Dispose the engine's connection pool and stop the dqlite node."""
    global engine, node
    if engine:
        engine.dispose()
    if node:
        node.close()
main.py - FastAPI Application:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from pathlib import Path
from .database import init_database, get_db, close_database
from .models import User
from .schemas import UserCreate, UserResponse
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize the database on startup and tear it down on shutdown."""
    # --- startup ---
    node_address = "127.0.0.1:9001"
    data_dir = Path("/var/lib/dqlite")
    engine, session_maker, node = init_database(node_address, data_dir)
    app.state.engine = engine
    app.state.node = node

    yield  # application serves requests here

    # --- shutdown ---
    close_database()
app = FastAPI(lifespan=lifespan)


@app.post("/users/", response_model=UserResponse)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Persist a new user and return it with its DB-generated fields."""
    record = User(**user.dict())
    db.add(record)
    db.commit()
    db.refresh(record)  # pick up the autogenerated id / defaults
    return record


@app.get("/users/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Fetch a single user by primary key (None if absent)."""
    return db.query(User).filter(User.id == user_id).first()
Pattern 2: Direct DB-API with Connection Pooling
For applications that prefer direct SQL execution without ORM overhead:
Connection Pool Implementation
database.py - Connection Pool:
from queue import Queue, Empty
from contextlib import contextmanager
from typing import Generator
import dqlitepy
from pathlib import Path
class ConnectionPool:
    """Fixed-size connection pool for dqlitepy.

    Owns the local dqlite node and a queue of pre-opened connections.
    Call initialize() once at startup and close() at shutdown.
    """

    def __init__(self, address: str, data_dir: Path, pool_size: int = 5):
        self.address = address
        self.data_dir = data_dir
        self.pool_size = pool_size
        self.pool: Queue = Queue(maxsize=pool_size)
        self.node = None  # created in initialize()

    def initialize(self):
        """Start the dqlite node, then fill the pool with connections."""
        # Start dqlite node
        from dqlitepy import Node
        self.node = Node(
            address=self.address,
            data_dir=self.data_dir
        )
        self.node.start()
        # Create initial connections
        for _ in range(self.pool_size):
            conn = dqlitepy.connect(
                address=self.address,
                data_dir=self.data_dir
            )
            self.pool.put(conn)

    @contextmanager
    def get_connection(self) -> Generator:
        """Check a connection out of the pool for the duration of the block.

        Raises queue.Empty if no connection becomes free within 5 seconds.
        The connection is always returned to the pool, even on error.
        """
        conn = None
        try:
            conn = self.pool.get(timeout=5.0)
            yield conn
        finally:
            if conn:
                # Roll back any open transaction so the next borrower starts
                # from a clean state. Best-effort: a broken connection must
                # still go back into the pool.
                try:
                    conn.rollback()
                except Exception:  # fix: was a bare except, which would also
                    pass           # swallow KeyboardInterrupt / SystemExit

    def close(self):
        """Close every pooled connection, then stop the node."""
        while not self.pool.empty():
            try:
                conn = self.pool.get_nowait()
                conn.close()
            except Empty:
                break
        if self.node:
            self.node.close()
# Global pool
pool: ConnectionPool = None
def init_pool(address: str, data_dir: Path, pool_size: int = 5):
    """Create, initialize, and return the process-wide connection pool."""
    global pool
    pool = ConnectionPool(address, data_dir, pool_size)
    pool.initialize()
    return pool
def get_connection():
    """FastAPI dependency: yield one pooled connection per request."""
    with pool.get_connection() as conn:
        yield conn
def close_pool():
    """Shut the pool down if it was ever initialized."""
    if pool:
        pool.close()
main.py - Direct SQL API:
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from pathlib import Path
from .database import init_pool, get_connection, close_pool
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Bring the connection pool up before serving and down afterwards."""
    # --- startup ---
    app.state.pool = init_pool(
        address="127.0.0.1:9001",
        data_dir=Path("/var/lib/dqlite"),
        pool_size=10,
    )

    yield  # application serves requests here

    # --- shutdown ---
    close_pool()
app = FastAPI(lifespan=lifespan)


@app.post("/users/")
def create_user(name: str, email: str, conn = Depends(get_connection)):
    """Insert a user row and echo it back with its new id."""
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO users (name, email) VALUES (?, ?)",
        (name, email),
    )
    conn.commit()
    return {"id": cur.lastrowid, "name": name, "email": email}
@app.get("/users/{user_id}")
def get_user(user_id: int, conn = Depends(get_connection)):
    """Look a user up by id; respond 404 if the row does not exist."""
    cur = conn.cursor()
    cur.execute(
        "SELECT id, name, email FROM users WHERE id = ?",
        (user_id,),
    )
    row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return {"id": row[0], "name": row[1], "email": row[2]}
Query Execution Comparison
SQLAlchemy ORM vs Direct SQL
Performance Characteristics
| Aspect | SQLAlchemy ORM | Direct SQL |
|---|---|---|
| Latency | +0.5-2ms overhead | Baseline |
| Type Safety | Excellent (Python types) | Manual (tuples) |
| Flexibility | High (complex queries) | Very High (raw SQL) |
| Code Maintenance | Easy (models) | Moderate (SQL strings) |
| Memory | Higher (object hydration) | Lower (tuples) |
| Best For | CRUD operations, relationships | Performance-critical, complex queries |
Example Query Execution Flow
SQLAlchemy ORM Query
Example Code:
from sqlalchemy.orm import Session
def get_active_users(db: Session, limit: int = 10):
    """Return up to `limit` active users, newest first (ORM version)."""
    query = (
        db.query(User)
        .filter(User.is_active == True)  # noqa: E712 — SQLAlchemy needs ==
        .order_by(User.created_at.desc())
        .limit(limit)
    )
    return query.all()
Generated SQL:
SELECT users.id, users.name, users.email, users.is_active, users.created_at
FROM users
WHERE users.is_active = 1
ORDER BY users.created_at DESC
LIMIT 10
Direct SQL Query
Example Code:
from typing import List, Dict
import dqlitepy
def get_active_users(conn: dqlitepy.Connection, limit: int = 10) -> List[Dict]:
    """Return up to `limit` active users, newest first, as plain dicts."""
    cursor = conn.cursor()
    cursor.execute("""
        SELECT id, name, email, is_active, created_at
        FROM users
        WHERE is_active = 1
        ORDER BY created_at DESC
        LIMIT ?
    """, (limit,))
    # Map positional columns to keys; SQLite stores booleans as 0/1.
    return [
        {
            "id": r[0],
            "name": r[1],
            "email": r[2],
            "is_active": bool(r[3]),
            "created_at": r[4],
        }
        for r in cursor.fetchall()
    ]
Advanced Integration Patterns
Pattern 3: Multi-Node Cluster Setup
For production deployments with high availability:
Cluster Configuration
config.py - Cluster Configuration:
from pydantic_settings import BaseSettings
from typing import List
class Settings(BaseSettings):
    """Application and cluster configuration, loaded from the env / .env."""

    # Application settings
    app_name: str = "FastAPI dqlite App"
    debug: bool = False

    # dqlite cluster settings (no defaults: must be supplied)
    node_id: int
    node_address: str
    node_data_dir: str
    cluster_nodes: List[str]  # ["127.0.0.1:9001", "127.0.0.1:9002", ...]

    # Database settings
    database_name: str = "app.db"
    connection_pool_size: int = 10
    connection_timeout: float = 10.0

    class Config:
        env_file = ".env"


settings = Settings()
database.py - Cluster-Aware Setup:
from dqlitepy import Node, Client
from sqlalchemy import create_engine
from pathlib import Path
def init_cluster_database(settings):
    """Start this node, join the cluster if needed, and build the engine.

    Node id 1 is treated as the bootstrap node; every other node is
    pointed at the existing cluster and registered via the client API.
    Returns (engine, node).
    """
    data_dir = Path(settings.node_data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)

    # Only non-bootstrap nodes are told about the rest of the cluster.
    is_bootstrap = not settings.node_id > 1
    node = Node(
        node_id=settings.node_id,
        address=settings.node_address,
        data_dir=data_dir,
        cluster=None if is_bootstrap else settings.cluster_nodes,
        auto_recovery=True,
        snapshot_compression=True,
        network_latency_ms=50,
    )
    node.start()

    # Non-bootstrap nodes announce themselves to the existing cluster.
    if not is_bootstrap and settings.cluster_nodes:
        client = Client(settings.cluster_nodes)
        try:
            client.add(settings.node_id, settings.node_address)
        finally:
            client.close()

    # Engine with pre-ping and hourly recycling for long-lived processes.
    database_url = (
        f"dqlite:///{settings.node_address}/{data_dir}/{settings.database_name}"
    )
    engine = create_engine(
        database_url,
        connect_args={
            "timeout": settings.connection_timeout,
            "check_same_thread": False,
        },
        pool_size=settings.connection_pool_size,
        pool_pre_ping=True,
        pool_recycle=3600,  # Recycle connections after 1 hour
    )
    return engine, node
Pattern 4: Read/Write Splitting
Optimize performance by routing read-only queries to follower nodes while directing all writes to the leader. The implementation builds on the cluster-aware setup shown in Pattern 3.
Error Handling and Retry Logic
FastAPI Error Handling
Error Handler Implementation
import logging

from fastapi import HTTPException
from sqlalchemy.exc import DatabaseError, OperationalError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
logger = logging.getLogger(__name__)
class DatabaseErrorHandler:
    """Handle database errors, retrying only transient failures."""

    @staticmethod
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # Only OperationalError is considered transient. Without this
        # predicate, tenacity's default retries on *any* exception — so the
        # HTTPExceptions raised below for permanent errors would themselves
        # be retried three times before reaching the client.
        retry=retry_if_exception_type(OperationalError),
        reraise=True,  # after exhausting attempts, raise the original error
    )
    def execute_with_retry(func, *args, **kwargs):
        """Run `func`, retrying leader changes and timeouts up to 3 times.

        Raises:
            HTTPException 503 for non-transient operational errors.
            HTTPException 500 for other database errors.
        """
        try:
            return func(*args, **kwargs)
        except OperationalError as e:
            logger.warning(f"Database operation failed: {e}")
            # Check if it's a leader change or timeout
            message = str(e).lower()
            if "not leader" in message:
                logger.info("Leader changed, retrying...")
                raise  # transient: retried by tenacity
            elif "timeout" in message:
                logger.info("Operation timeout, retrying...")
                raise  # transient: retried by tenacity
            else:
                # Permanent operational failure: surface immediately.
                raise HTTPException(status_code=503, detail="Database unavailable")
        except DatabaseError as e:
            logger.error(f"Database error: {e}")
            raise HTTPException(status_code=500, detail="Database error")
# Usage in endpoint
@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user, retrying transient database failures."""

    def _create():
        record = User(**user.dict())
        db.add(record)
        db.commit()
        db.refresh(record)
        return record

    return DatabaseErrorHandler.execute_with_retry(_create)
Performance Optimization
Connection Pooling Configuration
Optimal Configuration:
from sqlalchemy import create_engine
# Production-tuned engine: sized pool, hourly recycling, pre-ping checks.
engine = create_engine(
    database_url,
    # Pool configuration
    pool_size=20,          # Base connection pool
    max_overflow=10,       # Additional connections under load
    pool_timeout=30,       # Wait time for connection
    pool_recycle=3600,     # Recycle connections hourly
    pool_pre_ping=True,    # Verify connection before use
    # Connection arguments
    connect_args={
        "timeout": 10.0,            # Query timeout
        "check_same_thread": False,
        "isolation_level": None,    # Autocommit mode
    },
    # Execution options
    echo=False,       # Don't log SQL (production)
    echo_pool=False,  # Don't log pool events
    future=True,      # SQLAlchemy 2.0 style
)
Monitoring and Observability
Health Check Endpoint
from fastapi import FastAPI
from sqlalchemy import text
@app.get("/health")
async def health_check():
    """Report database connectivity and local node status."""
    try:
        # Cheap round-trip query proves the database is reachable.
        with engine.connect() as conn:
            if conn.execute(text("SELECT 1")).scalar() != 1:
                return {"status": "unhealthy", "database": "failed"}

        has_node = hasattr(app.state, 'node')
        return {
            "status": "healthy",
            "database": "connected",
            "node": {
                "running": app.state.node.is_running if has_node else False,
                "address": app.state.node.address if has_node else None,
            },
        }
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {"status": "unhealthy", "error": str(e)}
@app.get("/metrics")
async def metrics():
    """Expose connection-pool gauges and local node identity."""
    p = engine.pool
    has_node = hasattr(app.state, 'node')
    return {
        "pool": {
            "size": p.size(),
            "checked_in": p.checkedin(),
            "checked_out": p.checkedout(),
            "overflow": p.overflow(),
            "total": p.size() + p.overflow(),
        },
        "node": {
            "id": app.state.node.id if has_node else None,
            "address": app.state.node.address if has_node else None,
        },
    }
Best Practices
1. Startup/Shutdown Lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the database on startup; hand leadership over and stop on shutdown."""
    import asyncio  # fix: asyncio was used below but never imported

    # Startup
    logger.info("Starting application...")
    try:
        # Initialize database
        engine, node = init_database()
        app.state.engine = engine
        app.state.node = node
        # Give the node a moment to become ready before serving traffic.
        await asyncio.sleep(1)
        logger.info("Application started successfully")
        yield
    finally:
        # Shutdown
        logger.info("Shutting down application...")
        # Close database connections
        if hasattr(app.state, 'engine'):
            app.state.engine.dispose()
        # Stop node gracefully
        if hasattr(app.state, 'node'):
            try:
                # Transfer leadership away before closing (best-effort).
                app.state.node.handover()
                await asyncio.sleep(0.5)
            except Exception:  # fix: was a bare except, which also swallows
                pass           # KeyboardInterrupt / SystemExit
            finally:
                app.state.node.close()
        logger.info("Application shutdown complete")
2. Transaction Management
from contextlib import contextmanager
@contextmanager
def transactional_session(db: Session):
    """Commit on success, roll back and re-raise on failure; always close."""
    try:
        yield db
        db.commit()  # only reached when the body raised nothing
    except Exception as e:
        db.rollback()
        logger.error(f"Transaction failed: {e}")
        raise
    finally:
        db.close()
# Usage
def create_user_with_profile(user_data: dict, profile_data: dict):
    """Create a user plus profile atomically in a single transaction."""
    with transactional_session(SessionLocal()) as db:
        user = User(**user_data)
        db.add(user)
        db.flush()  # assigns user.id without committing
        profile = Profile(user_id=user.id, **profile_data)
        db.add(profile)
        # transactional_session commits on clean exit
3. Query Optimization
Use pagination for large result sets:
@app.get("/users/")
def list_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """Paginated user listing.

    `limit` is capped at 1000 to keep result sets bounded.
    """
    # fix: the original placed "# Cap at 1000" after a backslash line
    # continuation, which is a SyntaxError; parenthesize the chain instead.
    return (
        db.query(User)
        .offset(skip)
        .limit(min(limit, 1000))  # Cap at 1000
        .all()
    )
Use selectin eager loading (selectinload) for relationships:
from sqlalchemy.orm import selectinload
@app.get("/users/{user_id}/posts")
def get_user_with_posts(user_id: int, db: Session = Depends(get_db)):
    """Load a user and eagerly fetch their posts with a second query."""
    query = db.query(User).options(selectinload(User.posts))
    return query.filter(User.id == user_id).first()
Summary
| Integration Pattern | Use Case | Complexity | Performance |
|---|---|---|---|
| SQLAlchemy ORM | Standard CRUD, relationships | Medium | Good |
| Direct SQL | Performance-critical, complex queries | Low | Excellent |
| Connection Pool | High concurrency | Medium | Excellent |
| Multi-Node Cluster | High availability | High | Excellent |
Recommendations:
- Start with SQLAlchemy ORM for rapid development
- Use direct SQL for performance-critical paths
- Implement connection pooling for production
- Deploy multi-node cluster for high availability
- Add comprehensive error handling and retry logic
- Monitor pool metrics and query performance