FastAPI Integration Example
Learn how to build a production-ready REST API using FastAPI with dqlitepy and SQLAlchemy, including cluster management, Docker support, and comprehensive CLI tools.
Overview
This comprehensive example demonstrates:
- FastAPI REST API with dqlitepy backend
- SQLAlchemy ORM for data modeling
- Multi-node cluster management
- Docker and Docker Compose deployment
- CLI for database operations
- Health checks and monitoring
- Production-ready patterns
Perfect for: Building scalable, distributed web applications with dqlitepy.
Prerequisites
- Python 3.12 or higher
- uv package manager installed
- Docker and Docker Compose (for containerized deployment)
- Understanding of FastAPI basics
- Familiarity with SQLAlchemy (see SQLAlchemy ORM Example)
Quick Start
Local Development
Run the API locally:
cd examples/fast_api_example
./quickstart.sh
The script will:
- Install all dependencies
- Set up a 3-node cluster
- Start the FastAPI server
- Display API endpoints
Docker Deployment
Run the full cluster with Docker:
cd examples/fast_api_example
docker-compose up -d
This starts:
- 3 dqlite nodes (ports 9001-9003)
- FastAPI server (port 8000)
- Automatic cluster formation
Architecture
Project Structure
fast_api_example/
├── fast_api_example/
│ ├── __init__.py
│ ├── app.py # FastAPI application
│ ├── cli.py # CLI commands
│ ├── config.py # Configuration
│ ├── database.py # Database setup
│ ├── db_dqlite.py # dqlite integration
│ ├── models.py # SQLAlchemy models
│ ├── schemas.py # Pydantic schemas
│ └── routes/
│ ├── __init__.py
│ ├── items.py # Item endpoints
│ └── health.py # Health check endpoints
├── docker-compose.yml # Docker orchestration
├── Dockerfile # Container image
├── pyproject.toml # Dependencies
├── quickstart.sh # Quick start script
└── README.md # Documentation
Code Walkthrough
Database Configuration
# fast_api_example/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application configuration for the database, the cluster and the API.

    Every field can be overridden by an environment variable of the same
    name or by an entry in the ``.env`` file.
    """

    # pydantic-settings v2 configuration; replaces the deprecated inner
    # ``class Config``.
    model_config = SettingsConfigDict(env_file=".env")

    # Database settings
    database_name: str = "fastapi_app.db"

    # Cluster settings
    node_id: int = 1
    node_address: str = "127.0.0.1:9001"
    # List-typed settings are read from the environment as JSON, e.g.
    # CLUSTER_NODES='["node1:9001","node2:9002","node3:9003"]'.
    cluster_nodes: list[str] = [
        "127.0.0.1:9001",
        "127.0.0.1:9002",
        "127.0.0.1:9003",
    ]

    # API settings
    api_host: str = "0.0.0.0"
    api_port: int = 8000


settings = Settings()
Key Points:
- Use Pydantic settings for configuration
- Support environment variables
- Separate concerns (DB, cluster, API)
Database Setup
# fast_api_example/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from .config import settings

# One engine for the whole application, pointed at the dqlite cluster
engine = create_engine(
    f"dqlite+pydqlite:///{settings.database_name}",
    connect_args={"cluster": settings.cluster_nodes},
)

# Declarative base that every ORM model inherits from
Base = declarative_base()

# Factory for sessions bound to the engine
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)


def get_db():
    """Yield a session for one request and close it when the request ends."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
Key Points:
- Use dependency injection for sessions
- Automatic session cleanup
- Single engine for the application
Data Models
# fast_api_example/models.py
from datetime import datetime, timezone

from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String

from .database import Base


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12. This produces the
    same naive-UTC value without the deprecation warning.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Item(Base):
    """ORM model for a row in the ``items`` table."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False, index=True)
    description = Column(String(500))
    price = Column(Float, nullable=False)
    in_stock = Column(Boolean, default=True)
    # Timestamps are filled in by SQLAlchemy on insert / update.
    created_at = Column(DateTime, default=_utcnow)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow)
Key Points:
- Standard SQLAlchemy models
- Indexes for common queries
- Timestamp tracking
Pydantic Schemas
# fast_api_example/schemas.py
from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class ItemBase(BaseModel):
    """Fields shared by the create and response schemas."""

    name: str = Field(..., min_length=1, max_length=100)
    description: str | None = Field(None, max_length=500)
    price: float = Field(..., gt=0)
    in_stock: bool = True


class ItemCreate(ItemBase):
    """Payload accepted when creating an item."""


class ItemUpdate(BaseModel):
    """Partial-update payload; every field is optional."""

    name: str | None = Field(None, min_length=1, max_length=100)
    description: str | None = Field(None, max_length=500)
    price: float | None = Field(None, gt=0)
    in_stock: bool | None = None


class ItemResponse(ItemBase):
    """Item as returned by the API, built directly from the ORM object."""

    # Pydantic v2 configuration; replaces the deprecated inner ``class Config``.
    model_config = ConfigDict(from_attributes=True)

    id: int
    created_at: datetime
    updated_at: datetime
Key Points:
- Separate schemas for create/update/response
- Validation with Pydantic
- Type safety
FastAPI Routes
# fast_api_example/routes/items.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ..database import get_db
from ..models import Item
from ..schemas import ItemCreate, ItemUpdate, ItemResponse
router = APIRouter(prefix="/items", tags=["items"])


def _get_item_or_404(db: Session, item_id: int) -> Item:
    """Return the item with ``item_id`` or raise a 404 ``HTTPException``."""
    item = db.query(Item).filter(Item.id == item_id).first()
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Item not found"
        )
    return item


@router.post("/", response_model=ItemResponse, status_code=status.HTTP_201_CREATED)
def create_item(item: ItemCreate, db: Session = Depends(get_db)):
    """Create a new item and return it with its generated fields."""
    db_item = Item(**item.model_dump())
    db.add(db_item)
    db.commit()
    # Reload so the generated id and timestamps are populated.
    db.refresh(db_item)
    return db_item


@router.get("/", response_model=list[ItemResponse])
def list_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List items, paginated with ``skip`` and ``limit``."""
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so order by
    # the primary key to keep pages stable between requests.
    return db.query(Item).order_by(Item.id).offset(skip).limit(limit).all()


@router.get("/{item_id}", response_model=ItemResponse)
def get_item(item_id: int, db: Session = Depends(get_db)):
    """Get a specific item; responds with 404 if it does not exist."""
    return _get_item_or_404(db, item_id)


@router.put("/{item_id}", response_model=ItemResponse)
def update_item(item_id: int, item_update: ItemUpdate, db: Session = Depends(get_db)):
    """Update an item with only the fields the client actually sent."""
    db_item = _get_item_or_404(db, item_id)
    # exclude_unset keeps omitted fields from being overwritten with None.
    for key, value in item_update.model_dump(exclude_unset=True).items():
        setattr(db_item, key, value)
    db.commit()
    db.refresh(db_item)
    return db_item


@router.delete("/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_item(item_id: int, db: Session = Depends(get_db)):
    """Delete an item; responds with 404 if it does not exist."""
    db_item = _get_item_or_404(db, item_id)
    db.delete(db_item)
    db.commit()
    return None
Key Points:
- RESTful endpoint design
- Dependency injection for database sessions
- Proper HTTP status codes
- Error handling
Health Check Endpoint
# fast_api_example/routes/health.py
from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from ..database import get_db

router = APIRouter(prefix="/health", tags=["health"])


@router.get("/")
def health_check(db: Session = Depends(get_db)):
    """Report whether the API can reach the database.

    Returns HTTP 200 when a trivial query succeeds and HTTP 503 otherwise,
    so load balancers and container health probes can act on the status
    code instead of parsing the body.
    """
    try:
        # Check database connectivity
        db.execute(text("SELECT 1"))
    except SQLAlchemyError as exc:
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={
                "status": "unhealthy",
                "database": "disconnected",
                "error": str(exc),
            },
        )
    return {
        "status": "healthy",
        "database": "connected",
    }


@router.get("/cluster")
def cluster_status():
    """Get cluster status.

    Placeholder: returns a fixed three-node topology. Replace it with a
    real query against the cluster before relying on it for monitoring.
    """
    return {
        "nodes": [
            {"id": 1, "address": "127.0.0.1:9001", "role": "leader"},
            {"id": 2, "address": "127.0.0.1:9002", "role": "follower"},
            {"id": 3, "address": "127.0.0.1:9003", "role": "follower"},
        ]
    }
Key Points:
- Separate health check endpoints
- Database connectivity verification
- Cluster status monitoring
FastAPI Application
# fast_api_example/app.py
from fastapi import FastAPI
from .database import engine, Base
from .routes import items, health
from .config import settings
# Make sure every mapped table exists before the app starts serving
Base.metadata.create_all(bind=engine)

# Application instance
app = FastAPI(
    title="dqlitepy FastAPI Example",
    description="REST API with dqlitepy and SQLAlchemy",
    version="1.0.0",
)

# Register the routers in the same order as before: items, then health
for _module in (items, health):
    app.include_router(_module.router)


@app.get("/")
def root():
    """Return a small index pointing at the docs and health endpoints."""
    index = {
        "message": "dqlitepy FastAPI Example",
        "docs": "/docs",
        "health": "/health",
    }
    return index


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "fast_api_example.app:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=True,
    )
Key Points:
- Automatic table creation
- Router composition
- Built-in API documentation at /docs
CLI Tool
# fast_api_example/cli.py
import typer
from .database import SessionLocal, engine, Base
from .models import Item
app = typer.Typer()
@app.command()
def init_db():
    """Create every table declared on ``Base`` that does not exist yet."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
    typer.echo("Database initialized!")
@app.command()
def seed_data():
    """Seed the database with sample data."""
    items = [
        Item(name="Laptop", description="High-performance laptop", price=999.99),
        Item(name="Mouse", description="Wireless mouse", price=29.99),
        Item(name="Keyboard", description="Mechanical keyboard", price=79.99),
    ]
    # The context manager closes the session even if the insert or commit
    # raises, so a failed seed no longer leaks a connection.
    with SessionLocal() as db:
        db.add_all(items)
        db.commit()
        typer.echo(f"Added {len(items)} items to the database!")
@app.command()
def list_items():
    """List all items in the database."""
    # The context manager closes the session even if the query or the
    # output loop raises.
    with SessionLocal() as db:
        for item in db.query(Item).all():
            typer.echo(f"{item.id}: {item.name} - ${item.price}")
if __name__ == "__main__":
app()
Key Points:
- Typer for CLI commands
- Database initialization
- Seed data for testing
- Administrative operations
API Endpoints
Items
- POST /items/ - Create a new item
- GET /items/ - List all items (supports pagination)
- GET /items/{item_id} - Get a specific item
- PUT /items/{item_id} - Update an item
- DELETE /items/{item_id} - Delete an item
Health
- GET /health/ - Application health check
- GET /health/cluster - Cluster status
Documentation
- GET /docs - Interactive API documentation (Swagger UI)
- GET /redoc - Alternative API documentation (ReDoc)
Running the Application
Development Mode
### Running the Application
```bash
# Start the API server
uv run python -m fast_api_example.app
# Or use uvicorn directly
uv run uvicorn fast_api_example.app:app --reload
```
### CLI Commands
```bash
# Initialize database
uv run python -m fast_api_example.cli init-db
# Seed sample data
uv run python -m fast_api_example.cli seed-data
# List items
uv run python -m fast_api_example.cli list-items
```
### Deploying with Docker
```bash
# Build and start all services
docker-compose up -d
# View logs
docker-compose logs -f
# Stop services
docker-compose down
```
## Testing the API
### Using curl
```bash
# Create an item
curl -X POST http://localhost:8000/items/ \
-H "Content-Type: application/json" \
-d '{"name": "Monitor", "description": "4K monitor", "price": 399.99}'
# List items
curl http://localhost:8000/items/
# Get specific item
curl http://localhost:8000/items/1
# Update item
curl -X PUT http://localhost:8000/items/1 \
-H "Content-Type: application/json" \
-d '{"price": 899.99}'
# Delete item
curl -X DELETE http://localhost:8000/items/1
# Health check
curl http://localhost:8000/health/
```
### Using Python requests
```python
import requests

base_url = "http://localhost:8000"
# requests waits indefinitely unless a timeout is given explicitly.
timeout = 5  # seconds

# Create item
response = requests.post(
    f"{base_url}/items/",
    json={"name": "Monitor", "price": 399.99, "in_stock": True},
    timeout=timeout,
)
# Fail loudly on 4xx/5xx instead of printing an error body as if it were data.
response.raise_for_status()
print(response.json())

# List items
response = requests.get(f"{base_url}/items/", timeout=timeout)
response.raise_for_status()
print(response.json())
```
## Docker Configuration
### Dockerfile
```dockerfile
# Base image matches the documented prerequisite (Python 3.12 or higher).
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
RUN pip install uv
COPY pyproject.toml .
RUN uv pip install --system -e .

# Copy application
COPY fast_api_example ./fast_api_example

# Expose API port
EXPOSE 8000

# Run application
CMD ["uvicorn", "fast_api_example.app:app", "--host", "0.0.0.0", "--port", "8000"]
```
### docker-compose.yml
```yaml
version: '3.8'

# CLUSTER_NODES is a list[str] setting. pydantic-settings reads list-typed
# settings from the environment as JSON, so the value must be a JSON array
# rather than a comma-separated string.
services:
  node1:
    build: .
    environment:
      - NODE_ID=1
      - NODE_ADDRESS=node1:9001
      - 'CLUSTER_NODES=["node1:9001","node2:9002","node3:9003"]'
    ports:
      - "9001:9001"
    volumes:
      - node1-data:/data

  node2:
    build: .
    environment:
      - NODE_ID=2
      - NODE_ADDRESS=node2:9002
      - 'CLUSTER_NODES=["node1:9001","node2:9002","node3:9003"]'
    ports:
      - "9002:9002"
    volumes:
      - node2-data:/data

  node3:
    build: .
    environment:
      - NODE_ID=3
      - NODE_ADDRESS=node3:9003
      - 'CLUSTER_NODES=["node1:9001","node2:9002","node3:9003"]'
    ports:
      - "9003:9003"
    volumes:
      - node3-data:/data

  api:
    build: .
    command: uvicorn fast_api_example.app:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - node1
      - node2
      - node3
    environment:
      - 'CLUSTER_NODES=["node1:9001","node2:9002","node3:9003"]'

volumes:
  node1-data:
  node2-data:
  node3-data:
```
## Expected Output
When you access `http://localhost:8000/docs`, you'll see interactive API documentation.
Console output when starting the server:
```text
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
```
## Best Practices
### Error Handling
```python
from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError


# The path is relative to the router prefix. Assuming this is the ``router``
# from routes/items.py (prefix="/items"), "/" maps to POST /items/.
@router.post("/")
def create_item(item: ItemCreate, db: Session = Depends(get_db)):
    """Create an item, turning a constraint violation into HTTP 400."""
    db_item = Item(**item.model_dump())
    db.add(db_item)
    try:
        # The INSERT is flushed here, so this is where IntegrityError surfaces.
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(status_code=400, detail="Item already exists") from exc
    db.refresh(db_item)
    return db_item
```
### Pagination
```python
# The path is relative to the router prefix. Assuming this is the ``router``
# from routes/items.py (prefix="/items"), "/" maps to GET /items/.
@router.get("/")
def list_items(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
):
    """Return one page of items together with the total row count."""
    query = db.query(Item)
    total = query.count()
    # Order by primary key so consecutive pages never overlap or skip rows.
    items = query.order_by(Item.id).offset(skip).limit(limit).all()
    return {
        "items": items,
        "total": total,
        "skip": skip,
        "limit": limit,
    }
```
### Background Tasks
```python
from fastapi import BackgroundTasks


def send_notification(item: Item):
    """Notify interested parties about ``item`` (left as a stub here)."""
    # Send notification logic
    pass


# The path is relative to the router prefix. Assuming this is the ``router``
# from routes/items.py (prefix="/items"), "/" maps to POST /items/.
@router.post("/")
def create_item(
    item: ItemCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Create an item and queue a notification as a background task."""
    db_item = Item(**item.model_dump())
    db.add(db_item)
    db.commit()
    # Load generated columns now, while the request's session is still open.
    db.refresh(db_item)
    background_tasks.add_task(send_notification, db_item)
    return db_item
```
## Common Issues
### Port Already in Use
Change the port in configuration:
```python
# In config.py or .env
api_port = 8001
```
### Database Connection Errors
Check cluster nodes are running:
```bash
# Test cluster connectivity
curl http://localhost:9001/health
curl http://localhost:9002/health
curl http://localhost:9003/health
```
### Docker Network Issues
Ensure services can communicate:
```bash
docker-compose exec api ping node1
```
## Source Code
The complete source code is available at:
- [`examples/fast_api_example/`](https://github.com/vantagecompute/dqlitepy/tree/main/examples/fast_api_example/)
## Next Steps
After mastering this example:
1. Study the [FastAPI Integration Architecture](../architecture/fastapi-integration.md)
## Related Documentation
- [FastAPI Official Documentation](https://fastapi.tiangolo.com/)
- [SQLAlchemy ORM Example](./sqlalchemy-orm.md)
- [Clustering Guide](../clustering.md)
- [Client API Reference](../api/client.md)