Module 03
Database Integration
SQLAlchemy, Async operations, PostgreSQL, and MongoDB integration with best practices.
01
Why Database Integration?
Most real-world APIs need persistence: without a database, data is lost when the process restarts. Database integration lets a FastAPI app store, retrieve, and update data.
02
Types of Databases Used with FastAPI
| Type | Examples |
|---|---|
| SQL (Relational) | PostgreSQL, MySQL, SQLite |
| NoSQL | MongoDB, Redis |
👉 For interviews → PostgreSQL + SQLAlchemy = must know
03
What is SQLAlchemy?
SQLAlchemy is a Python ORM (Object Relational Mapper) that lets you interact with databases using Python objects instead of raw SQL.
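To make the idea concrete, here is a minimal sketch that reads the same data once through the ORM and once through raw SQL. It assumes an in-memory SQLite database so it runs without a server; the `Note` model and its columns are hypothetical and not part of this module's project.

```python
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

# Assumption: in-memory SQLite keeps the sketch self-contained
engine = create_engine("sqlite://")
Base = declarative_base()


class Note(Base):  # hypothetical model, for illustration only
    __tablename__ = "notes"

    id = Column(Integer, primary_key=True)
    title = Column(String)


Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    # ORM style: create and read rows as Python objects
    session.add(Note(title="hello"))
    session.commit()
    orm_titles = [note.title for note in session.query(Note).all()]

    # Raw SQL style: the same read written by hand
    raw_titles = [row[0] for row in session.execute(text("SELECT title FROM notes"))]
```

Both lists hold the same titles; the ORM version simply hides the SQL behind the `Note` class.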
04
Installation
Terminal
bash
pip install sqlalchemy psycopg2-binary
# For async:
pip install asyncpg
05
Project Structure (Important)
project/
│── main.py
│── database.py
│── models.py
│── schemas.py
│── crud.py
👉 This structure is very important for interviews
06
Step-by-Step Database Setup
Step 1: Database Connection
database.py
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Replace user, password, and dbname with your own credentials
DATABASE_URL = "postgresql://user:password@localhost/dbname"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Step 2: Base Model
database.py
python
from sqlalchemy.orm import declarative_base

Base = declarative_base()
Step 3: Define Models
models.py
python
from sqlalchemy import Column, Integer, String
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String)
    email = Column(String, unique=True)
Step 4: Create Tables
main.py
python
from database import Base, engine
Base.metadata.create_all(bind=engine)  # import models first so its tables are registered on Base
Step 5: Pydantic Schemas
schemas.py
python
from pydantic import BaseModel

class UserCreate(BaseModel):
    name: str
    email: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str

    class Config:
        orm_mode = True  # Pydantic v1 name; Pydantic v2 calls this from_attributes
Step 6: Dependency (DB Session)
main.py
python
from database import SessionLocal

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Step 7: CRUD Operations
crud.py
python
from models import User

def create_user(db, user):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)  # reload so the generated id is available
    return db_user
Step 8: API Integration
main.py
python
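from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

from crud import create_user
from schemas import UserCreate, UserResponse

app = FastAPI()

@app.post("/users", response_model=UserResponse)
def create(user: UserCreate, db: Session = Depends(get_db)):
    return create_user(db, user)
07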
Async Database Integration (IMPORTANT)
Why Async?
- 📈 Better performance under load
- 🔄 Non-blocking I/O
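A minimal sketch of what non-blocking I/O buys you, using `asyncio.sleep` as a stand-in for a database round trip. The `fake_query` coroutine and the 0.2 second delay are assumptions for illustration; no real database is involved.

```python
import asyncio
import time


async def fake_query(name: str) -> str:
    # Stand-in for an awaited database call (hypothetical helper)
    await asyncio.sleep(0.2)
    return f"{name} done"


async def main():
    start = time.perf_counter()
    # The three "queries" wait concurrently instead of one after another
    results = await asyncio.gather(
        fake_query("a"), fake_query("b"), fake_query("c")
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
```

Run one after another, the three calls would take about 0.6 seconds; gathered, they finish in roughly the time of a single call because the event loop is free while each one waits.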
Async Setup
database.py
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"

engine = create_async_engine(DATABASE_URL)

# expire_on_commit=False keeps attributes loaded after commit,
# so objects can be returned without another awaited query
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
Async Dependency
main.py
python
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session
Async CRUD
crud.py
python
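from sqlalchemy.ext.asyncio import AsyncSession
from models import User

async def create_user(db: AsyncSession, user):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)  # add() is synchronous; only I/O calls are awaited
    await db.commit()
    await db.refresh(db_user)
    return db_user
08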
Relationships (VERY IMPORTANT)
🔗 One-to-Many Example
models.py
python
from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import relationship
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    # name and email columns omitted for brevity (see Step 3)
    posts = relationship("Post", back_populates="owner")  # one user, many posts

class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="posts")
09
Common ORM Operations
Get All
Query
python
db.query(User).all()
Get by ID
Query
python
db.query(User).filter(User.id == id).first()
Update
Mutation
python
user.name = "New Name"
db.commit()
Delete
Mutation
python
db.delete(user)
db.commit()
10
Transactions (Important)
Rollback Logic
python
try:
    db.add(user)
    db.commit()
except Exception:
    db.rollback()  # undo the failed transaction
    raise  # re-raise so the caller still sees the error
11
Migrations (Alembic)
Install
Terminal
bash
pip install alembic
Commands
Terminal
bash
alembic init alembic
alembic revision --autogenerate -m "init"
alembic upgrade head
👉 VERY IMPORTANT for interviews
12
MongoDB Integration (Step-by-Step)
Step 1: Install Motor
Terminal
bash
pip install motor
Step 2: Connection Lifecycle
lifespan logic
python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: open one client and share it across requests
    app.mongodb_client = AsyncIOMotorClient("mongodb://localhost:27017")
    app.database = app.mongodb_client["my_db"]
    yield
    # Shutdown: release the connection pool
    app.mongodb_client.close()

app = FastAPI(lifespan=lifespan)
Step 3: MongoDB Model & CRUD
MongoDB API
python
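from fastapi import Request

@app.post("/items")
async def create_item(item: ItemModel, request: Request):
    # ItemModel is a Pydantic model defined elsewhere; dict() is named model_dump() in Pydantic v2
    new_item = await request.app.database["items"].insert_one(item.dict())
    return {"id": str(new_item.inserted_id)}
13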
PostgreSQL Tips
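- Tip 1 (Performance): Index the columns you filter or join on for faster queries; each index adds a small cost to writes.
- Tip 2 (IDs): Use UUID instead of integer IDs (advanced/scalable), for example when IDs are generated by several services or should not be guessable.
- Tip 3 (Pooling): Configure connection pooling so concurrent requests reuse a bounded set of connections safely.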
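The three tips can be sketched together as follows. This is only an illustration: the `Account` model is hypothetical, the UUID is stored as text and the engine is in-memory SQLite so the sketch runs anywhere, and the pool numbers in the final comment are example values, not recommendations.

```python
import uuid

from sqlalchemy import Column, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


def new_uuid() -> str:
    # Random UUID as a 36-character string
    return str(uuid.uuid4())


class Account(Base):  # hypothetical model, for illustration only
    __tablename__ = "accounts"

    # Tip 2: UUID primary key, stored as text to stay portable in this sketch
    id = Column(String(36), primary_key=True, default=new_uuid)
    # Tip 1: index the column that queries filter on most often
    email = Column(String, index=True)


# Assumption: in-memory SQLite so the sketch runs without a server
engine = create_engine("sqlite://")
Base.metadata.create_all(bind=engine)

# Confirm the index was created alongside the table
index_names = [ix["name"] for ix in inspect(engine).get_indexes("accounts")]

# Tip 3: with PostgreSQL, pool settings are passed to create_engine, e.g.
# create_engine(DATABASE_URL, pool_size=5, max_overflow=10, pool_pre_ping=True)
```

On PostgreSQL you would more likely use the database's native UUID column type; the text column here is a simplification.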
14
Common Mistakes
❌ Not closing DB session
❌ Using sync DB in async API
❌ Not using orm_mode (from_attributes in Pydantic v2) on response schemas
❌ Mixing schema & model
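The first mistake in the list is the one the `get_db` dependency is designed to prevent. The sketch below shows why the `try`/`finally` matters: the session is closed even when the request handler fails. `FakeSession` is a hypothetical stand-in for a real session, and the manual `next`/`throw` calls only approximate how a framework drives a `yield` dependency.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session (hypothetical, for illustration)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()  # runs even if the code using the session raises


# Simulate a request whose handler fails while the session is open
gen = get_db()
db = next(gen)  # the handler receives the session here
try:
    gen.throw(RuntimeError("handler failed"))  # error surfaces at the yield
except RuntimeError:
    pass  # the error still propagates, but cleanup has already run
```

After the simulated failure, `db.closed` is `True`, so the connection would have been returned to the pool.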
15
Interview Preparation
🧠 Theory Questions
What is ORM?
An ORM (Object Relational Mapper) maps database tables to Python classes and rows to objects.
What is SQLAlchemy?
A Python SQL toolkit and ORM for database interaction.
Difference: Schema vs Model?
Model defines DB structure (SQLAlchemy), Schema handles validation (Pydantic).
Why use Depends(get_db)?
It implements dependency injection for managing the database session lifecycle.
Sync vs Async DB?
Sync is blocking/simple; Async is non-blocking/scalable (use asyncpg).
What is Alembic?
A migration tool used to track and apply database schema changes.
💻 Coding Challenges
Q1: Create User API with DB
API Endpoint
python
@app.post("/users")
def create(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
Q2: Get All Users
API Endpoint
python
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    return db.query(User).all()
Q3: Update User
API Endpoint
python
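from fastapi import HTTPException

@app.put("/users/{id}")
def update(id: int, user: UserCreate, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == id).first()
    if db_user is None:  # first() returns None when no row matches
        raise HTTPException(status_code=404, detail="User not found")
    db_user.name = user.name
    db_user.email = user.email
    db.commit()
    db.refresh(db_user)
    return db_user
Q4: Delete User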
API Endpoint
python
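from fastapi import HTTPException

@app.delete("/users/{id}")
def delete(id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == id).first()
    if user is None:  # avoid passing None to db.delete()
        raise HTTPException(status_code=404, detail="User not found")
    db.delete(user)
    db.commit()
    return {"msg": "Deleted"}
16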
🧩 Bonus: Databases in ML APIs
💡
Why DB Matters in ML APIs?
- Store predictions
- Store user input
- Log model performance
- Track experiments
Example Use Case
ML Logging Schema
python
class Prediction(Base):
    __tablename__ = "predictions"

    id = Column(Integer, primary_key=True)
    input_data = Column(String)  # e.g. JSON-encoded model input
    result = Column(String)
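As a usage sketch, the schema above could be used to log one prediction and read it back. The `log_prediction` helper, the sample features, and the in-memory SQLite engine are assumptions for illustration; a real service would reuse the PostgreSQL engine and session dependency from earlier in the module.

```python
import json

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Prediction(Base):
    __tablename__ = "predictions"

    id = Column(Integer, primary_key=True)
    input_data = Column(String)
    result = Column(String)


# Assumption: in-memory SQLite so the sketch runs without a server
engine = create_engine("sqlite://")
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)


def log_prediction(session, features: dict, result: str) -> int:
    """Hypothetical helper: store one prediction and return its id."""
    row = Prediction(input_data=json.dumps(features), result=result)
    session.add(row)
    session.commit()
    return row.id


with Session() as session:
    pred_id = log_prediction(session, {"sepal_length": 5.1}, "setosa")
    stored = session.get(Prediction, pred_id)
    stored_input = json.loads(stored.input_data)
    stored_result = stored.result
```

Storing the input as a JSON string keeps the table simple; on PostgreSQL a JSON column type would be a natural alternative.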