Build modern Python APIs at lightning speed. Every concept explained with code.
FastAPI is a modern, high-performance web framework for building APIs with Python 3.7+ based on standard type hints. It's built on top of Starlette (for the web parts) and Pydantic (for data validation).
Fastest Python framework with automatic validation, docs, and async support — all from type hints.
Interactive Swagger UI at /docs and ReDoc at /redoc are generated automatically. async/await works out of the box.

| Feature | Flask | FastAPI | Django REST |
|---|---|---|---|
| Speed | Slow (WSGI) | Very fast (ASGI) | Slow (WSGI) |
| Async | Limited | Native | Limited |
| Validation | Manual | Automatic (Pydantic) | Serializers |
| Auto docs | No | Yes (Swagger + ReDoc) | Plugin |
| Type hints | Optional | Core feature | Optional |
| Learning curve | Easy | Easy | Steep |
# Install
pip install fastapi uvicorn[standard]
# uvicorn = ASGI server that runs your FastAPI app
A working API in just a few lines — create an app, add a route, and run it.
# main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def root():
return {"message": "Hello, World!"}
@app.get("/health")
def health():
return {"status": "ok"}
# Run it
uvicorn main:app --reload
# main = file name (main.py)
# app = the FastAPI instance variable
# --reload = auto-restart on code changes (dev only)
# Now visit:
# http://127.0.0.1:8000 → your API
# http://127.0.0.1:8000/docs → Swagger UI (interactive docs)
# http://127.0.0.1:8000/redoc → ReDoc (alternative docs)
from fastapi import FastAPI
app = FastAPI()
# Each decorator = one endpoint
@app.get("/users") # GET — read data
def list_users(): ...
@app.post("/users") # POST — create data
def create_user(): ...
@app.put("/users/{id}") # PUT — full update
def update_user(id: int): ...
@app.patch("/users/{id}") # PATCH — partial update
def patch_user(id: int): ...
@app.delete("/users/{id}") # DELETE — remove data
def delete_user(id: int): ...
# Route order matters! More specific routes FIRST.
@app.get("/users/me") # ← this MUST come before /users/{id}
def get_current_user(): ...
@app.get("/users/{id}") # ← otherwise "me" would match as an {id}
def get_user(id: int): ...
Split your routes into separate files with prefixes and tags — keeps large APIs clean and organized.
# routes/users.py
from fastapi import APIRouter
router = APIRouter(prefix="/users", tags=["Users"])
@router.get("/")
def list_users(): ...
@router.get("/{id}")
def get_user(id: int): ...
# main.py
from fastapi import FastAPI
from routes.users import router as users_router
app = FastAPI()
app.include_router(users_router) # /users/ and /users/{id}
# Type hint = automatic validation + conversion
@app.get("/users/{user_id}")
def get_user(user_id: int): # auto-converted to int
return {"user_id": user_id}
# GET /users/42 → {"user_id": 42}
# GET /users/hello → 422 Validation Error (not an int)
# ── Enum for fixed values ──
from enum import Enum
class Role(str, Enum):
admin = "admin"
user = "user"
guest = "guest"
@app.get("/roles/{role}")
def get_role(role: Role):
return {"role": role.value}
# GET /roles/admin → {"role": "admin"}
# GET /roles/xyz → 422 error (not a valid Role)
# Any parameter NOT in the path = query parameter
@app.get("/users")
def list_users(
skip: int = 0, # optional, default 0
limit: int = 10, # optional, default 10
active: bool = True # optional, default True
):
return {"skip": skip, "limit": limit, "active": active}
# GET /users?skip=20&limit=5&active=false
# ── Required query params (no default) ──
@app.get("/search")
def search(q: str): # required! no default.
return {"query": q}
# GET /search → 422 error (q is required)
# GET /search?q=hello → {"query": "hello"}
# ── Optional with None ──
@app.get("/items")
def list_items(q: str | None = None):
if q:
return {"results": f"searching for {q}"}
return {"results": "all items"}
# ── Validation with Query() ──
from fastapi import Query
@app.get("/search")
def search(
q: str = Query(min_length=3, max_length=50, pattern="^[a-z]+"),
page: int = Query(default=1, ge=1), # >= 1
size: int = Query(default=10, le=100), # <= 100
):
return {"q": q, "page": page, "size": size}
from pydantic import BaseModel
class UserCreate(BaseModel):
name: str
email: str
age: int | None = None
@app.post("/users")
def create_user(user: UserCreate):
# user is already validated and typed!
return {"name": user.name, "email": user.email}
# POST /users
# Body: {"name": "Yatin", "email": "y@dev.com"}
# → {"name": "Yatin", "email": "y@dev.com"}
# POST /users
# Body: {"name": 123} → 422 error (name must be str, email missing)
# ── Body + Path + Query together ──
@app.put("/users/{user_id}")
def update_user(
user_id: int, # path param
user: UserCreate, # request body
notify: bool = False # query param
):
return {"id": user_id, "user": user.model_dump(), "notify": notify}
Pydantic is the backbone of FastAPI. It handles validation, serialization, and documentation all from your type hints.
from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime
class UserCreate(BaseModel):
name: str = Field(min_length=2, max_length=50)
email: str = Field(pattern=r"^[\w.-]+@[\w.-]+\.\w+$")
age: int = Field(ge=0, le=150)
tags: list[str] = []
# Custom validator
@field_validator("name")
@classmethod
def name_must_be_alpha(cls, v):
if not v.replace(" ", "").isalpha():
raise ValueError("Name must contain only letters")
return v.title()
class UserResponse(BaseModel):
id: int
name: str
email: str
created_at: datetime
model_config = {"from_attributes": True} # read from ORM objects
# ── Nested models ──
class Address(BaseModel):
street: str
city: str
country: str = "US"
class UserFull(BaseModel):
name: str
address: Address # nested!
friends: list["UserFull"] = [] # self-referencing
# ── Useful methods ──
user = UserCreate(name="yatin", email="y@dev.com", age=25)
user.model_dump() # → dict
user.model_dump_json() # → JSON string
user.model_dump(exclude={"tags"}) # exclude fields
UserCreate.model_json_schema() # → JSON Schema (for docs)
# response_model controls what the client SEES
# Your function can return more data — FastAPI filters it
class UserDB(BaseModel):
id: int
name: str
email: str
hashed_password: str # sensitive!
class UserOut(BaseModel):
id: int
name: str
email: str
# no password field → it's filtered out
@app.get("/users/{id}", response_model=UserOut)
def get_user(id: int):
# even if this returns a UserDB with password,
# the response only includes id, name, email
return UserDB(id=id, name="Yatin", email="y@dev.com", hashed_password="xxx")
# ── List responses ──
@app.get("/users", response_model=list[UserOut])
def list_users():
return [...]
from fastapi import FastAPI, HTTPException, status
# ── Set status code ──
@app.post("/users", status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
return user
# ── Raise errors ──
@app.get("/users/{id}")
def get_user(id: int):
user = fake_db.get(id)
if not user:
raise HTTPException(
status_code=404,
detail="User not found",
headers={"X-Error": "not-found"}
)
return user
# ── Custom exception handler ──
from fastapi.responses import JSONResponse
class ItemNotFound(Exception):
def __init__(self, item_id: int):
self.item_id = item_id
@app.exception_handler(ItemNotFound)
async def item_not_found_handler(request, exc):
return JSONResponse(
status_code=404,
content={"error": f"Item {exc.item_id} not found"}
)
| Code | Meaning | When to use |
|---|---|---|
| 200 | OK | Successful GET/PUT/PATCH |
| 201 | Created | Successful POST |
| 204 | No Content | Successful DELETE |
| 400 | Bad Request | Invalid input |
| 401 | Unauthorized | Not authenticated |
| 403 | Forbidden | Authenticated but no permission |
| 404 | Not Found | Resource doesn't exist |
| 422 | Unprocessable | Validation error (auto by FastAPI) |
| 500 | Server Error | Unhandled exception |
FastAPI's Depends() system lets you share logic across endpoints — database sessions, auth checks, pagination, etc.
from fastapi import Depends
# ── Simple dependency ──
def get_db():
db = SessionLocal()
try:
yield db # dependency with cleanup!
finally:
db.close()
@app.get("/users")
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
# ── Dependency with parameters ──
def pagination(skip: int = 0, limit: int = 10):
    """Bundle the paging parameters into a single dict.

    Used as a dependency so endpoints receive ``skip`` and ``limit``
    together instead of declaring both parameters themselves.

    Args:
        skip: Number of leading items to skip.
        limit: Maximum number of items to return.

    Returns:
        A dict with the keys ``"skip"`` and ``"limit"``.
    """
    window = dict(skip=skip, limit=limit)
    return window
@app.get("/users")
def list_users(page: dict = Depends(pagination)):
return {"pagination": page}
# ── Auth dependency ──
def get_current_user(token: str = Header()):
user = decode_token(token)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
@app.get("/profile")
def profile(user = Depends(get_current_user)):
return {"user": user}
# ── Class-based dependency ──
class CommonParams:
def __init__(self, q: str | None = None, skip: int = 0, limit: int = 10):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/items")
def list_items(params: CommonParams = Depends()):
return params
from fastapi.middleware.cors import CORSMiddleware
import time
# ── CORS (allow frontend to call your API) ──
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # or ["*"] for all
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ── Custom middleware ──
@app.middleware("http")
async def add_timing(request, call_next):
    """Attach the handler's wall-clock duration to every response.

    Args:
        request: The incoming request, passed through unchanged.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The downstream response with an ``X-Process-Time`` header holding
        the elapsed time as a stringified ``time.perf_counter()`` delta.
    """
    began_at = time.perf_counter()
    outgoing = await call_next(request)
    # Take the second reading only after the downstream call has finished.
    outgoing.headers["X-Process-Time"] = str(time.perf_counter() - began_at)
    return outgoing
# pip install python-jose[cryptography] passlib[bcrypt]
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
# ── Hash & verify passwords ──
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# ── Create JWT token ──
def create_token(data: dict, expires_minutes: int = 30):
    """Sign a copy of ``data`` as a JWT carrying an expiry claim.

    Args:
        data: Claims to embed in the token. The dict is not mutated.
        expires_minutes: Lifetime of the token in minutes.

    Returns:
        The value returned by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import so this snippet does not depend on the file-level
    # ``from datetime import ...`` line also listing ``timezone``.
    from datetime import timezone

    # datetime.utcnow() is deprecated and returns a naive value; use an
    # explicit, timezone-aware UTC timestamp for the "exp" claim instead.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    payload = {**data, "exp": expires_at}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# ── Decode & verify token ──
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user, or reject the request with 401.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.

    Returns:
        Whatever ``get_user_from_db`` returns for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError`` or when the
            ``sub`` claim is missing or empty.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    subject = claims.get("sub")
    if subject:
        return get_user_from_db(subject)
    raise HTTPException(status_code=401, detail="Invalid token")
# ── Login endpoint ──
@app.post("/login")
def login(form: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form.username, form.password)
if not user:
raise HTTPException(status_code=401, detail="Bad credentials")
token = create_token({"sub": str(user.id)})
return {"access_token": token, "token_type": "bearer"}
# ── Protected endpoint ──
@app.get("/me")
def me(user = Depends(get_current_user)):
return user
# pip install sqlalchemy
# ── database.py ──
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "sqlite:///./app.db"
# For PostgreSQL: "postgresql://user:pass@localhost/dbname"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False)
class Base(DeclarativeBase):
pass
# ── models.py ──
from sqlalchemy import Column, Integer, String, Boolean
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, nullable=False)
email = Column(String, unique=True, index=True)
is_active = Column(Boolean, default=True)
# ── Dependency ──
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ── CRUD endpoints ──
@app.post("/users", status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users")
def list_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
return db.query(User).offset(skip).limit(limit).all()
@app.get("/users/{id}")
def get_user(id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == id).first()
if not user:
raise HTTPException(status_code=404)
return user
@app.delete("/users/{id}", status_code=204)
def delete_user(id: int, db: Session = Depends(get_db)):
    """Delete the user with the given id.

    Args:
        id: Primary key of the user to remove (path parameter).
        db: Database session injected by ``get_db``.

    Raises:
        HTTPException: 404 when no user has this id, matching ``get_user``.
    """
    # Query.delete() returns the number of rows matched, so a falsy result
    # means there was nothing to delete.
    removed = db.query(User).filter(User.id == id).delete()
    if not removed:
        raise HTTPException(status_code=404)
    db.commit()
import httpx
from fastapi import BackgroundTasks
# ── async endpoint (use for I/O-bound work) ──
@app.get("/external")
async def fetch_external():
async with httpx.AsyncClient() as client:
resp = await client.get("https://api.example.com/data")
return resp.json()
# Use `async def` when you have `await` calls
# Use regular `def` for CPU-bound or sync DB calls
# FastAPI runs regular `def` in a thread pool automatically
# ── Background tasks ──
def send_email(email: str, message: str):
# slow operation — runs AFTER response is sent
print(f"Sending to {email}: {message}")
@app.post("/signup")
def signup(email: str, bg: BackgroundTasks):
bg.add_task(send_email, email, "Welcome!")
return {"message": "Signed up!"} # returns immediately
from fastapi import File, UploadFile
# ── Single file ──
@app.post("/upload")
async def upload(file: UploadFile):
contents = await file.read()
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents)
}
# ── Multiple files ──
@app.post("/upload-many")
async def upload_many(files: list[UploadFile]):
return [{"name": f.filename} for f in files]
# ── Save to disk ──
@app.post("/upload-save")
async def save_file(file: UploadFile):
    """Store an uploaded file under ``uploads/`` and report the stored name.

    The client-supplied filename is untrusted: a value such as
    ``../../etc/cron.d/x`` would otherwise escape the upload directory.
    Only the final path component is kept.

    Args:
        file: The uploaded file.

    Returns:
        ``{"saved": <name>}`` where ``<name>`` is the sanitised filename
        actually written to disk.

    Raises:
        HTTPException: 400 when no usable filename remains after sanitising.
    """
    from pathlib import Path

    from fastapi import HTTPException

    # Treat backslashes as separators too, then drop every directory part.
    safe_name = Path((file.filename or "").replace("\\", "/")).name
    if safe_name in {"", ".", ".."}:
        raise HTTPException(status_code=400, detail="Invalid filename")

    upload_dir = Path("uploads")
    upload_dir.mkdir(exist_ok=True)  # avoid failing on a fresh checkout
    payload = await file.read()
    with open(upload_dir / safe_name, "wb") as out:
        out.write(payload)
    return {"saved": safe_name}
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("Client disconnected")
# ── Chat room (multiple clients) ──
class ConnectionManager:
    """Track open WebSocket connections and fan messages out to all of them."""

    def __init__(self):
        # Sockets that have been accepted and not yet disconnected.
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        """Accept the handshake on ``ws`` and start tracking it."""
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        """Stop tracking ``ws``; a socket that is not tracked is ignored."""
        # list.remove() raises ValueError for a missing item, which would
        # turn a double disconnect into a crash.
        if ws in self.connections:
            self.connections.remove(ws)

    async def broadcast(self, message: str):
        """Send ``message`` to every socket tracked when the call started."""
        # Iterate over a snapshot: each await can let connect()/disconnect()
        # mutate the live list, and mutating a list during iteration makes
        # the loop skip entries.
        for conn in list(self.connections):
            await conn.send_text(message)
manager = ConnectionManager()
In production, a FastAPI app sits behind a reverse proxy (nginx) that handles SSL, static files, and load balancing. The app runs via gunicorn (process manager) spawning multiple uvicorn workers. The database is a separate service.
# How a request flows in production:
#
# Client → nginx (SSL, rate limit) → gunicorn (process manager)
# │ │
# │ ┌──────┼──────┐
# │ │ │ │
# │ worker worker worker (uvicorn)
# │ │ │ │
# │ └──────┼──────┘
# │ │
# │ PostgreSQL / Redis
#
# Run command:
# gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
# │ │
# 4 workers ASGI worker class
The complete layout for a production FastAPI app — models, schemas, routers, services, and tests all separated.
A deep dive into what each file does, why it exists, and the actual code that goes inside it.
# Reads from .env file or environment variables.
# Single source of truth for ALL configuration.
# Never hardcode secrets — always use this.
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# ── App ──
app_name: str = "My API"
debug: bool = False # True in dev, False in prod
api_version: str = "v1"
# ── Database ──
database_url: str # required! no default = must be set
# ── Auth ──
secret_key: str # for JWT signing
access_token_expire_minutes: int = 30
# ── CORS ──
allowed_origins: list[str] = ["http://localhost:3000"]
# ── External services ──
redis_url: str = "redis://localhost:6379"
smtp_host: str | None = None
# pydantic-settings reads from .env file automatically
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
# Cache it — settings are read once, reused everywhere
@lru_cache
def get_settings() -> Settings:
return Settings()
# Sets up SQLAlchemy engine, session factory, and Base class.
# Every file that talks to the DB imports from here.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import get_settings
settings = get_settings()
# Engine = connection pool to the database
# pool_pre_ping=True: test connections before using (handles DB restarts)
engine = create_engine(
settings.database_url,
pool_pre_ping=True,
# pool_size=20, # max persistent connections
# max_overflow=10, # extra connections under load
)
# SessionLocal = factory that creates DB sessions
# autocommit=False: you control when to commit
# autoflush=False: don't auto-sync to DB (explicit is better)
SessionLocal = sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
)
# Base = all ORM models inherit from this
class Base(DeclarativeBase):
pass
# Functions injected via Depends() into route handlers.
# Keeps routers clean — they don't know how DB or auth works.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.database import SessionLocal
from app.config import get_settings
from app.models.user import User
settings = get_settings()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# ── Database session dependency ──
# yield = the session is given to the route handler,
# then ALWAYS closed in finally (even if route crashes)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ── Current user dependency ──
# Extracts and validates the JWT token from the Authorization header.
# If invalid → 401. If valid → returns the User object.
def get_current_user(
    token: str = Depends(oauth2_scheme),  # extract Bearer token
    db: Session = Depends(get_db),  # get DB session
) -> User:
    """Resolve the bearer token to a ``User`` row.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session injected by ``get_db``.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, the ``sub``
            claim is missing or not an integer, or no such user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        # "sub" is a string claim; convert it explicitly so the lookup
        # compares an integer against the integer primary key.
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError):
        # A missing or malformed claim gets the same 401 as a bad token.
        raise credentials_exception
    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise credentials_exception
    return user
# ── Admin-only dependency (chains on get_current_user) ──
def get_admin_user(user: User = Depends(get_current_user)) -> User:
if user.role != "admin":
raise HTTPException(status_code=403, detail="Admin access required")
return user
# SQLAlchemy ORM model = Python class that maps to a DB table.
# Each attribute = a column. This is your source of truth for the DB schema.
from sqlalchemy import Column, Integer, String, Boolean, DateTime, func
from sqlalchemy.orm import relationship
from app.database import Base
class User(Base):
__tablename__ = "users" # actual table name in the DB
# ── Columns ──
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False)
email = Column(String(255), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
role = Column(String(20), default="user") # "user" or "admin"
is_active = Column(Boolean, default=True)
# ── Timestamps (auto-set by DB) ──
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# ── Relationships ──
posts = relationship("Post", back_populates="author", cascade="all, delete")
def __repr__(self):
return f"User(id={self.id}, email={self.email})"
# Pydantic models define the SHAPE of your API.
# Separate from ORM models! Schema = what the client sees.
# Model = what the database stores.
from pydantic import BaseModel, Field, EmailStr
from datetime import datetime
# ── What the client SENDS to create a user ──
class UserCreate(BaseModel):
name: str = Field(min_length=2, max_length=100)
email: EmailStr # auto-validates email format
password: str = Field(min_length=8) # plain text — we hash it
# ── What the client SENDS to update a user ──
class UserUpdate(BaseModel):
name: str | None = None # all optional for PATCH
email: EmailStr | None = None
# ── What the client RECEIVES back ──
# Notice: no password field! Never expose passwords.
class UserResponse(BaseModel):
id: int
name: str
email: str
role: str
is_active: bool
created_at: datetime
# from_attributes = True lets Pydantic read from ORM objects
# (SQLAlchemy models use .email not ["email"])
model_config = {"from_attributes": True}
# ── Auth schemas ──
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
# All the "brain" of your app lives here.
# Services are pure logic — no HTTP, no request/response objects.
# This makes them reusable (CLI, workers, tests) and easy to test.
from sqlalchemy.orm import Session
from fastapi import HTTPException
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.services.auth_service import hash_password
def get_user_by_id(db: Session, user_id: int) -> User:
"""Fetch user or raise 404."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
def get_user_by_email(db: Session, email: str) -> User | None:
"""Fetch user by email (for login, duplicate checks)."""
return db.query(User).filter(User.email == email).first()
def list_users(db: Session, skip: int = 0, limit: int = 20) -> list[User]:
"""Paginated user list."""
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, data: UserCreate) -> User:
    """Create a new user with a hashed password.

    Args:
        db: Open database session.
        data: Validated registration payload.

    Returns:
        The persisted ``User``, refreshed after the commit.

    Raises:
        HTTPException: 400 when the email is already registered, whether
            caught by the up-front check or by the database at commit time.
    """
    from sqlalchemy.exc import IntegrityError

    # Fast path: reject an obvious duplicate before doing any work.
    if get_user_by_email(db, data.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create the ORM object (hash the password!)
    user = User(
        name=data.name,
        email=data.email,
        hashed_password=hash_password(data.password),
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Another request can insert the same email between the check above
        # and this commit. Presumably the unique email constraint is what
        # fired; roll back so the session stays usable and report the same
        # 400 instead of an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=400, detail="Email already registered"
        ) from exc
    db.refresh(user)  # reload the row after the commit
    return user
def update_user(db: Session, user_id: int, data: UserUpdate) -> User:
"""Partial update — only change fields that were sent."""
user = get_user_by_id(db, user_id)
# model_dump(exclude_unset=True) only includes fields the client sent
# so PATCH /users/1 {"name": "New"} only updates name, not email
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user
def delete_user(db: Session, user_id: int) -> None:
"""Delete user or raise 404."""
user = get_user_by_id(db, user_id)
db.delete(user)
db.commit()
# Password hashing and JWT token creation/verification.
# Completely independent of HTTP — can be used anywhere.
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from app.config import get_settings
settings = get_settings()
# bcrypt is the industry standard for password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""One-way hash. Can't reverse it — only verify."""
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
"""Check if plain text matches the hash."""
return pwd_context.verify(plain, hashed)
def create_access_token(user_id: int) -> str:
    """Create a signed JWT identifying ``user_id``.

    Args:
        user_id: Id of the user the token is issued for.

    Returns:
        The value returned by ``jwt.encode`` (HS256, signed with
        ``settings.secret_key``) for the ``sub``, ``exp`` and ``iat`` claims.
    """
    # Local import so this snippet does not depend on the file-level
    # ``from datetime import ...`` line also listing ``timezone``.
    from datetime import timezone

    # One timezone-aware reading keeps "iat" and "exp" consistent and
    # avoids the deprecated, naive datetime.utcnow().
    issued_at = datetime.now(timezone.utc)
    payload = {
        # "sub" must be a string: python-jose rejects a non-string subject
        # when decoding, so an integer here makes every token fail.
        "sub": str(user_id),
        "exp": issued_at + timedelta(minutes=settings.access_token_expire_minutes),
        "iat": issued_at,
    }
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")
# Routers are THIN. They only:
# 1. Define the HTTP method + path
# 2. Declare dependencies (DB, auth)
# 3. Call the service layer
# 4. Return the response
# NO business logic here!
from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.schemas.user import UserCreate, UserUpdate, UserResponse
from app.services import user_service
from app.models.user import User
# prefix = all routes here start with /users
# tags = group in Swagger UI docs
router = APIRouter(prefix="/users", tags=["Users"])
@router.get("/", response_model=list[UserResponse])
def list_users(
skip: int = 0,
limit: int = 20,
db: Session = Depends(get_db), # injected DB session
):
"""Get paginated list of users."""
return user_service.list_users(db, skip, limit)
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
"""Get a single user by ID."""
return user_service.get_user_by_id(db, user_id)
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(data: UserCreate, db: Session = Depends(get_db)):
"""Register a new user."""
return user_service.create_user(db, data)
def _require_self_or_admin(current_user: User, user_id: int) -> None:
    """Raise 403 unless the caller owns the target account or is an admin."""
    from fastapi import HTTPException

    if current_user.id != user_id and current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not allowed to modify this user",
        )


@router.patch("/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    data: UserUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),  # must be logged in
):
    """Update user (partial). Requires authentication.

    Being logged in is not enough: without an ownership check any
    authenticated user could edit any other account. Only the account
    owner or an admin may proceed.

    Raises:
        HTTPException: 403 when the caller is neither the owner nor an admin.
    """
    _require_self_or_admin(current_user, user_id)
    return user_service.update_user(db, user_id, data)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a user. Requires authentication.

    Only the account owner or an admin may delete an account.

    Raises:
        HTTPException: 403 when the caller is neither the owner nor an admin.
    """
    _require_self_or_admin(current_user, user_id)
    user_service.delete_user(db, user_id)
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.schemas.user import UserCreate, UserResponse, Token
from app.services import user_service, auth_service
router = APIRouter(prefix="/auth", tags=["Auth"])
@router.post("/register", response_model=UserResponse, status_code=201)
def register(data: UserCreate, db: Session = Depends(get_db)):
"""Public registration endpoint."""
return user_service.create_user(db, data)
@router.post("/login", response_model=Token)
def login(
form: OAuth2PasswordRequestForm = Depends(), # username + password form
db: Session = Depends(get_db),
):
"""Login → returns JWT token."""
# 1. Find user by email (OAuth2 form uses "username" field)
user = user_service.get_user_by_email(db, form.username)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# 2. Verify password
if not auth_service.verify_password(form.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid credentials")
# 3. Create and return token
token = auth_service.create_access_token(user.id)
return {"access_token": token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
def me(current_user = Depends(get_current_user)):
"""Get the currently logged-in user."""
return current_user
# This is the root of your application.
# It creates the FastAPI instance, wires up routers,
# middleware, exception handlers, and startup/shutdown events.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import get_settings
from app.database import engine, Base
from app.routers import users, auth
settings = get_settings()
# ── Startup / Shutdown events ──
@asynccontextmanager
async def lifespan(app: FastAPI):
# ── STARTUP: runs once when the app starts ──
print("Creating database tables...")
Base.metadata.create_all(bind=engine) # create tables if not exist
yield
# ── SHUTDOWN: runs when the app stops ──
print("Shutting down...")
# ── Create the FastAPI app ──
app = FastAPI(
title=settings.app_name,
version=settings.api_version,
docs_url="/docs", # Swagger UI
redoc_url="/redoc", # ReDoc
lifespan=lifespan,
)
# ── Middleware ──
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ── Include routers ──
# Each router adds its own set of endpoints
app.include_router(auth.router) # /auth/login, /auth/register, /auth/me
app.include_router(users.router) # /users, /users/{id}
# ── Health check (every prod app needs this) ──
@app.get("/health", tags=["Health"])
def health():
return {"status": "ok"}
# Fixtures shared across ALL tests.
# Creates a separate test database so tests don't touch prod data.
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base
from app.dependencies import get_db
# Use a throwaway file-based SQLite database for tests (fast, disposable)
TEST_DB_URL = "sqlite:///./test.db"
engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
TestSession = sessionmaker(bind=engine)
@pytest.fixture(autouse=True)
def setup_db():
"""Create tables before each test, drop after."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def db():
"""Provide a clean DB session for each test."""
session = TestSession()
try:
yield session
finally:
session.close()
@pytest.fixture
def client(db):
"""Test client that uses the test DB instead of prod DB."""
def override_get_db():
yield db
# Swap the real DB dependency with our test DB
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as c:
yield c
app.dependency_overrides.clear()
# ── .env (NEVER commit this — it has real secrets) ──
DATABASE_URL=postgresql://myuser:mypassword@localhost:5432/mydb
SECRET_KEY=super-secret-jwt-key-change-this-in-production
DEBUG=false
ALLOWED_ORIGINS=["https://myapp.com","https://www.myapp.com"]
REDIS_URL=redis://localhost:6379
# ── .env.example (commit this — it's a template) ──
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
SECRET_KEY=change-me
DEBUG=true
ALLOWED_ORIGINS=["http://localhost:3000"]
# Runs your API + PostgreSQL + Redis in one command:
# docker compose up --build
version: "3.9"
services:
api:
build: .
ports:
- "8000:8000"
env_file: .env
depends_on:
- db
- redis
volumes:
- .:/app # mount code for hot reload in dev
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
db:
image: postgres:16
environment:
POSTGRES_USER: myuser
POSTGRES_PASSWORD: mypassword
POSTGRES_DB: mydb
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data # persist data
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
pgdata: # named volume so data survives restarts
# Single-stage build on a slim base image (keeps the final image small)
FROM python:3.12-slim as base
# Don't write .pyc files, unbuffered output (for logging)
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
# Install deps first (cached if requirements.txt doesn't change)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy app code
COPY . .
# Non-root user (security best practice)
RUN adduser --disabled-password --no-create-home appuser
USER appuser
# Production server: gunicorn + uvicorn workers
# Workers = 2 * CPU cores + 1
CMD ["gunicorn", "app.main:app", \
"-w", "4", \
"-k", "uvicorn.workers.UvicornWorker", \
"--bind", "0.0.0.0:8000"]
Follow a single request from client to database and back — see how each layer handles its part.
# POST /users {"name": "Yatin", "email": "y@dev.com", "password": "secret123"}
#
# 1. FastAPI receives the request
# 2. Pydantic validates the body → UserCreate schema
# (wrong type? missing field? → auto 422 error)
# 3. Router handler calls: user_service.create_user(db, data)
# 4. Service layer:
# - Checks if email exists (business rule)
# - Hashes the password (security)
# - Creates User ORM object
# - db.add() + db.commit() + db.refresh()
# 5. Returns User ORM object back to router
# 6. FastAPI serializes it through response_model=UserResponse
# (filters out hashed_password, formats dates)
# 7. Client receives: {"id": 1, "name": "Yatin", "email": "y@dev.com", ...}
#
# Request: Client → Router → Service → Database
# Response: Database → Service → Router → Pydantic → Client
# pip install pytest httpx
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_root():
resp = client.get("/")
assert resp.status_code == 200
assert resp.json() == {"message": "Hello, World!"}
def test_create_user():
resp = client.post("/users", json={
"name": "Yatin",
"email": "y@dev.com",
"age": 25
})
assert resp.status_code == 201
data = resp.json()
assert data["name"] == "Yatin"
def test_not_found():
resp = client.get("/users/9999")
assert resp.status_code == 404
def test_validation_error():
resp = client.post("/users", json={"name": 123})
assert resp.status_code == 422
# ── Async testing ──
import pytest
from httpx import AsyncClient, ASGITransport
@pytest.mark.anyio
async def test_async():
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
resp = await ac.get("/")
assert resp.status_code == 200
# ── Override dependencies in tests ──
def override_get_db():
db = TestSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
Package your FastAPI app into a Docker container for consistent deployment anywhere.
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Everything you need to do before deploying to production — security, performance, and reliability.
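- Run gunicorn with uvicorn workers: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker
- Set --workers to 2 * CPU cores + 1
- Remove the --reload flag
- Load configuration from environment variables (pydantic-settings)
- Add rate limiting (slowapi)
- Set up logging (loguru or stdlib logging)
- Expose a health check endpoint at /health
Type-safe configuration management — reads from .env files and environment variables with full validation.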
# pip install pydantic-settings
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Application configuration read from environment variables and ``.env``.

    ``database_url`` and ``secret_key`` have no default and must be provided.
    """

    database_url: str
    secret_key: str
    # Read by app/redis.py when it builds the connection pool.
    redis_url: str = "redis://localhost:6379"
    debug: bool = False
    allowed_origins: list[str] = ["http://localhost:3000"]

    model_config = {"env_file": ".env"}
settings = Settings()
# .env file:
# DATABASE_URL=postgresql://user:pass@localhost/mydb
# SECRET_KEY=super-secret-key-here
Redis is an in-memory data store — blazing fast key-value storage used for caching, sessions, rate limiting, and real-time pub/sub. It sits between your API and your database to speed things up.
# Install
pip install redis[hiredis]
# redis = Python Redis client (async support built-in)
# hiredis = C parser for faster responses (optional but recommended)
Create a Redis client as a dependency so every route can use it — connect on startup, disconnect on shutdown.
# app/redis.py
import redis.asyncio as redis
from app.config import settings
# Connection pool — reuses connections instead of opening new ones
pool = redis.ConnectionPool.from_url(
settings.redis_url, # "redis://localhost:6379"
max_connections=10,
decode_responses=True, # return strings, not bytes
)
async def get_redis():
client = redis.Redis(connection_pool=pool)
try:
yield client
finally:
await client.aclose()
# app/main.py
from contextlib import asynccontextmanager
from app.redis import pool
@asynccontextmanager
async def lifespan(app):
    """Keep the shared Redis pool alive for the app's lifetime, then close it."""
    # Startup — pool is already created
    try:
        yield
    finally:
        # Shutdown — close all connections. In `finally` so the pool is also
        # closed when an exception or cancellation is raised at the yield.
        await pool.aclose()
app = FastAPI(lifespan=lifespan)
Store expensive query results in Redis — avoid hitting the database on every request. Set a TTL so stale data expires automatically.
import json
from fastapi import APIRouter, Depends
from app.redis import get_redis
router = APIRouter()
@router.get("/users/{user_id}")
async def get_user(user_id: int, r = Depends(get_redis), db = Depends(get_db)):
    """Return a user, served from Redis when cached and from the DB otherwise.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    # 1. Check cache first
    cache_key = f"user:{user_id}"
    cached = await r.get(cache_key)
    if cached:
        return json.loads(cached)  # cache HIT — no DB query!
    # 2. Cache MISS — query database
    # Session.get() replaces the legacy Query.get() (deprecated in SQLAlchemy 2.0).
    user = db.get(User, user_id)
    if user is None:
        raise HTTPException(status_code=404)
    # 3. Store in cache (expire after 5 minutes)
    user_data = {"id": user.id, "name": user.name, "email": user.email}
    await r.set(cache_key, json.dumps(user_data), ex=300)
    return user_data
# Invalidate cache when data changes
@router.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate, r = Depends(get_redis)):
# ... update in DB ...
await r.delete(f"user:{user_id}") # clear stale cache
return updated_user
Prevent abuse by limiting how many requests a client can make — Redis tracks request counts with auto-expiring keys.
from fastapi import Request, HTTPException
def rate_limiter(limit: int = 100, window: int = 60):
    """Build a fixed-window rate-limit dependency.

    Args:
        limit: Maximum number of requests allowed per window.
        window: Window length in seconds.

    ``limit`` and ``window`` are captured in a closure rather than declared
    as parameters of the dependency itself: FastAPI exposes scalar dependency
    parameters as query parameters, which would let any caller raise their
    own limit with ``?limit=...``.
    """

    async def dependency(request: Request, r = Depends(get_redis)):
        # request.client can be None when the peer address is unavailable.
        client_ip = request.client.host if request.client else "unknown"
        key = f"rate:{client_ip}"

        # Increment counter, set expiry on first request
        current = await r.incr(key)
        if current == 1:
            await r.expire(key, window)

        if current > limit:
            remaining = await r.ttl(key)
            if remaining < 0:
                # The counter has no expiry (the EXPIRE after the first INCR
                # never ran); re-arm it so the client is not locked out forever.
                await r.expire(key, window)
                remaining = window
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {remaining}s",
            )

    return dependency


# Default limiter: 100 requests per 60 seconds.
rate_limit = rate_limiter()


# Apply to routes
@router.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
    ...
Store user sessions in Redis instead of the database — faster reads and automatic expiry for inactive sessions.
import uuid
async def create_session(r, user_id: int, ttl: int = 3600) -> str:
    """Store a new session hash in Redis and return its id.

    Args:
        r: Async Redis client.
        user_id: Id of the user the session belongs to.
        ttl: Session lifetime in seconds (default: one hour).

    Returns:
        The generated session id (a UUID4 string).
    """
    from datetime import datetime, timezone

    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    # HSET and EXPIRE are sent as one transaction so a session can never be
    # stored without its expiry.
    async with r.pipeline(transaction=True) as pipe:
        pipe.hset(key, mapping={
            "user_id": user_id,
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            "created": datetime.now(timezone.utc).isoformat(),
        })
        pipe.expire(key, ttl)
        await pipe.execute()
    return session_id
async def get_session(r, session_id: str) -> dict | None:
data = await r.hgetall(f"session:{session_id}")
return data or None
async def delete_session(r, session_id: str):
await r.delete(f"session:{session_id}")
Publish events from one part of your app and subscribe to them from another — great for notifications and live updates.
# Publisher — send events from any route
@router.post("/orders")
async def create_order(order: OrderCreate, r = Depends(get_redis)):
# ... save to DB ...
# Publish event to "orders" channel
await r.publish("orders", json.dumps({
"event": "order_created",
"order_id": order_id,
"user_id": order.user_id,
}))
return {"id": order_id}
# Subscriber — listen via WebSocket
@router.websocket("/ws/orders")
async def order_feed(ws: WebSocket, r = Depends(get_redis)):
    """Forward every message published on the "orders" channel to the socket."""
    from fastapi import WebSocketDisconnect

    await ws.accept()
    pubsub = r.pubsub()
    await pubsub.subscribe("orders")
    try:
        async for msg in pubsub.listen():
            # Only payload messages are forwarded; other message types
            # yielded by listen() are skipped.
            if msg["type"] == "message":
                await ws.send_text(msg["data"])
    except WebSocketDisconnect:
        # Client went away — fall through to cleanup.
        pass
    finally:
        try:
            await pubsub.unsubscribe("orders")
        finally:
            # Release the connection held by the pub/sub object.
            await pubsub.aclose()
Quick reference for the Redis operations you'll use most often in FastAPI.
| Operation | Command | Use Case |
|---|---|---|
| Set value | await r.set("key", "value", ex=300) | Cache with 5min TTL |
| Get value | await r.get("key") | Read from cache |
| Delete | await r.delete("key") | Invalidate cache |
| Increment | await r.incr("counter") | Rate limiting, counters |
| Hash set | await r.hset("user:1", mapping={...}) | Store objects (sessions) |
| Hash get all | await r.hgetall("user:1") | Read full object |
| Expire | await r.expire("key", 3600) | Auto-delete after time |
| TTL | await r.ttl("key") | Check remaining time |
| Exists | await r.exists("key") | Check before querying DB |
| List push | await r.lpush("queue", "task") | Simple job queue |
| List pop | await r.rpop("queue") | Process next job |
| Publish | await r.publish("channel", "msg") | Real-time events |
Apache Kafka is a distributed event streaming platform — it lets services communicate by producing and consuming messages through topics. Unlike HTTP APIs, Kafka is asynchronous and durable — messages persist even if the consumer is down.
Use REST for synchronous request-response. Use Kafka when services need to communicate without waiting for each other.
| Scenario | Use REST | Use Kafka |
|---|---|---|
| Get user profile | Yes | No |
| Send welcome email after signup | No | Yes |
| Process payment | Yes | No |
| Update analytics after purchase | No | Yes |
| Sync data between services | No | Yes |
| Real-time activity feed | No | Yes |
# Install
pip install aiokafka
# aiokafka = async Kafka client for Python (built on kafka-python)
# You also need a running Kafka broker (use Docker for local dev)
# docker-compose.yml — local Kafka setup
# services:
# kafka:
# image: confluentinc/cp-kafka:7.5.0
# ports:
# - "9092:9092"
# environment:
# KAFKA_NODE_ID: 1
# KAFKA_PROCESS_ROLES: broker,controller
# KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
# KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
# KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
Understand these four things and you understand Kafka — producers send messages, consumers read them, topics organize them, and consumer groups enable scaling.
# Kafka Architecture
#
# Producer ──→ Topic ──→ Consumer
# (log)
#
# Topic: A named category/feed of messages (like "orders", "emails")
# Partition: Topic is split into partitions for parallelism
# Producer: Sends messages TO a topic
# Consumer: Reads messages FROM a topic
# Consumer Group: Multiple consumers sharing the work (each gets different partitions)
# Offset: Position of a consumer in the topic (like a bookmark)
#
# Key difference from a queue:
# - Queue: message is deleted after being consumed
# - Kafka: message STAYS — multiple consumers can read it independently
Create a producer that connects on app startup and sends events whenever something happens in your API.
# app/kafka.py
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaProducerService:
def __init__(self, bootstrap_servers: str = "localhost:9092"):
self.producer = AIOKafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
)
async def start(self):
await self.producer.start()
async def stop(self):
await self.producer.stop()
async def send(self, topic: str, value: dict, key: str = None):
await self.producer.send_and_wait(topic, value=value, key=key)
kafka_producer = KafkaProducerService()
# app/main.py — connect producer on startup
from contextlib import asynccontextmanager
from app.kafka import kafka_producer
@asynccontextmanager
async def lifespan(app):
    """Connect the Kafka producer on startup and disconnect it on shutdown."""
    await kafka_producer.start()  # connect on startup
    try:
        yield
    finally:
        # In `finally` so the producer is also stopped when an exception or
        # cancellation is raised at the yield.
        await kafka_producer.stop()  # disconnect on shutdown
app = FastAPI(lifespan=lifespan)
# Use in routes — publish an event once the order is committed
@router.post("/orders")
async def create_order(order: OrderCreate, db = Depends(get_db)):
    """Persist the order, then publish an ``order_created`` event to Kafka."""
    # 1. Save to database (source of truth)
    db_order = Order(**order.model_dump())  # Pydantic v2 name for .dict()
    db.add(db_order)
    db.commit()
    # 2. Publish event to Kafka (async side effects)
    # kafka_producer.send() uses send_and_wait, so this is not fire-and-forget:
    # a Kafka failure surfaces here even though the order is already committed.
    await kafka_producer.send(
        topic="orders",
        key=str(db_order.id),
        value={
            "event": "order_created",
            "order_id": db_order.id,
            "user_id": order.user_id,
            "total": order.total,
        },
    )
    return {"id": db_order.id, "status": "created"}
Consumers run as background tasks that continuously listen for new messages and process them independently from the API.
# app/consumers/order_consumer.py
import asyncio
import json
from aiokafka import AIOKafkaConsumer
async def consume_orders():
    """Consume the "orders" topic and email a confirmation per created order.

    Runs until cancelled; the consumer is always stopped on the way out.
    """
    import logging

    log = logging.getLogger(__name__)
    consumer = AIOKafkaConsumer(
        "orders",                        # topic to listen to
        bootstrap_servers="localhost:9092",
        group_id="email-service",        # consumer group name
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        auto_offset_reset="earliest",    # start from beginning if no offset saved
    )
    await consumer.start()
    try:
        async for msg in consumer:
            event = msg.value
            log.info("Received: %s", event)
            if event.get("event") != "order_created":
                continue
            try:
                # Send confirmation email, update analytics, etc.
                await send_order_email(event["user_id"], event["order_id"])
            except Exception:
                # One bad message must not kill the whole consumer loop.
                # Cancellation is not an Exception, so shutdown still works.
                log.exception("Failed to handle order event: %s", event)
    finally:
        await consumer.stop()
# app/main.py — run consumer as background task
from app.consumers.order_consumer import consume_orders
@asynccontextmanager
async def lifespan(app):
    """Run the Kafka producer and the background consumer for the app's lifetime."""
    from contextlib import suppress

    await kafka_producer.start()
    # Start consumer in background (runs alongside the API)
    consumer_task = asyncio.create_task(consume_orders())
    try:
        yield
    finally:
        # Shutdown. cancel() only requests cancellation; awaiting the task
        # lets consume_orders() finish its own cleanup before we move on.
        consumer_task.cancel()
        try:
            with suppress(asyncio.CancelledError):
                await consumer_task
        finally:
            await kafka_producer.stop()
app = FastAPI(lifespan=lifespan)
One event, multiple consumers — each service reacts independently. The producer doesn't need to know who's listening.
# When a user signs up, multiple things need to happen:
#
# WITHOUT Kafka (tightly coupled):
# @router.post("/signup")
# async def signup(user):
# save_to_db(user) # 1. save
# await send_welcome_email() # 2. email (what if it fails?)
# await create_stripe_customer() # 3. billing (another failure point)
# await notify_slack() # 4. notify (yet another)
# # If step 3 fails, steps 1-2 already happened. Messy rollback.
#
# WITH Kafka (loosely coupled):
# @router.post("/signup")
# async def signup(user):
# save_to_db(user)
# await kafka.send("user_events", {"event": "user_signed_up", ...})
# # Done! Each service picks it up independently:
#
# ┌── email-service (group: "email") ───── sends welcome email
# │
# ├── billing-service (group: "billing") ── creates Stripe customer
# │
# └── notification-service (group: "notif") ── posts to Slack
#
# Each consumer group gets its OWN copy of every message.
# If email-service is down, messages wait. No data lost.
# Real example — producer publishes ONE event
@router.post("/signup")
async def signup(user: UserCreate, db = Depends(get_db)):
    """Create the user, then publish a single ``user_signed_up`` event."""
    from datetime import datetime, timezone

    # NOTE(review): if UserCreate carries a plaintext password, hash it before
    # building the ORM object — confirm against the User model.
    db_user = User(**user.model_dump())  # Pydantic v2 name for .dict()
    db.add(db_user)
    db.commit()
    # Single event — multiple services react
    await kafka_producer.send(
        topic="user_events",
        key=str(db_user.id),
        value={
            "event": "user_signed_up",
            "user_id": db_user.id,
            "email": db_user.email,
            "name": db_user.name,
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
    return {"id": db_user.id}
Consumer groups let you scale horizontally — add more consumers and Kafka automatically distributes partitions across them.
# How consumer groups work:
#
# Topic "orders" has 4 partitions:
# Partition 0 ──┐
# Partition 1 ──┤
# Partition 2 ──┤
# Partition 3 ──┘
#
# Consumer Group "email-service" with 2 consumers:
# Consumer A → reads Partition 0, 1
# Consumer B → reads Partition 2, 3
#
# Scale up to 4 consumers → each gets exactly 1 partition
# Scale up to 5 consumers → 1 sits idle (more consumers than partitions = waste)
#
# Different groups are INDEPENDENT:
# Group "email-service" → gets ALL messages (for sending emails)
# Group "analytics-service" → gets ALL messages (for tracking)
# Each group maintains its own offset (bookmark)
# Multiple consumers in the same group — they share the load
# Run these as separate processes/containers:
# Instance 1
consumer_1 = AIOKafkaConsumer(
"orders",
group_id="email-service", # same group
bootstrap_servers="localhost:9092",
)
# Instance 2 (separate process)
consumer_2 = AIOKafkaConsumer(
"orders",
group_id="email-service", # same group → Kafka splits partitions
bootstrap_servers="localhost:9092",
)
# Different group → gets its OWN copy of every message
analytics_consumer = AIOKafkaConsumer(
"orders",
group_id="analytics-service", # different group
bootstrap_servers="localhost:9092",
)
Redis and Kafka solve different problems — use them together for a complete architecture.
| Feature | Redis | Kafka |
|---|---|---|
| Purpose | Cache, sessions, rate limiting | Event streaming, service communication |
| Speed | Sub-millisecond reads | High throughput, slight latency |
| Data lifetime | Temporary (TTL-based) | Persistent (days/weeks/forever) |
| Pattern | Key-value lookup | Publish-subscribe / event log |
| Scaling | Vertical (bigger instance) | Horizontal (more partitions/brokers) |
| If it crashes | Cache is gone (rebuild from DB) | Messages are safe (replicated) |
# Common architecture: FastAPI + Redis + Kafka + PostgreSQL
#
# ┌─── Redis (cache) ───┐
# │ │
# Client → FastAPI ──┼─── PostgreSQL (data) │
# │ │
# └─── Kafka (events) ───┘
# │
# ┌─────────┼─────────┐
# │ │ │
# email-svc analytics billing
#
# 1. Request comes in → check Redis cache
# 2. Cache miss → query PostgreSQL
# 3. Store result in Redis for next time
# 4. Publish event to Kafka for side effects
# 5. Other services consume events independently
FastAPI is built on Starlette (ASGI) + Pydantic (validation). Flask/Django are WSGI-based.
| Feature | FastAPI | Flask | Django |
|---|---|---|---|
| Async | Native | Limited (2.0+) | Limited (3.1+) |
| Validation | Auto (Pydantic) | Manual | Forms/DRF serializers |
| API Docs | Auto Swagger + ReDoc | Manual | DRF + drf-spectacular |
| Speed | Node.js/Go level | Moderate | Moderate |
| Type hints | Core feature | Optional | Optional |
FastAPI = API-first. Django = full-stack (admin, ORM, templates). Flask = micro-framework (BYO everything).
WSGI (Web Server Gateway Interface) — synchronous. One request per thread. Flask/Django traditionally use this.
ASGI (Asynchronous Server Gateway Interface) — async-native. Handles thousands of concurrent connections on a single thread via event loop.
# WSGI — blocks during I/O
def app(environ, start_response):
data = fetch_from_db() # thread blocked here
start_response("200 OK", headers)
return [data]
# ASGI — non-blocking
async def app(scope, receive, send):
data = await fetch_from_db() # yields control
await send(response)
FastAPI uses ASGI → can handle WebSockets, long-polling, SSE, and high-concurrency I/O without threads.
Pydantic is a data validation library using Python type hints. FastAPI uses it for request/response validation, serialization, and auto-documentation.
from pydantic import BaseModel, Field, field_validator
class User(BaseModel):
    """Request model: a named user with an email address and an age."""

    name: str = Field(min_length=1, max_length=50)
    email: str
    age: int = Field(ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v):
        """Accept the value only if it contains an ``@``."""
        if "@" in v:
            return v
        raise ValueError("Invalid email")
# FastAPI auto-validates request body:
@app.post("/users")
async def create(user: User): # ← auto-validated
return user
Invalid data → automatic 422 Unprocessable Entity with detailed error messages. Zero manual validation code.
# Path parameter — identifies a SPECIFIC resource
@app.get("/users/{user_id}")
async def get_user(user_id: int): # /users/42
...
# Query parameters — filter/modify the response
@app.get("/users")
async def list_users(
skip: int = 0,
limit: int = 10,
role: str | None = None
): # /users?skip=0&limit=10&role=admin
...
Rule: Path params = required, identifies resource (/users/42). Query params = optional, filters/pagination (?page=2&sort=name).
DI in FastAPI lets you declare shared logic (DB sessions, auth, pagination) that gets automatically resolved and injected into route handlers.
from fastapi import Depends
# Dependency — reusable across routes
async def get_db():
db = SessionLocal()
try:
yield db # injected into route
finally:
db.close() # cleanup runs after response
# Sub-dependency — dependencies can depend on other dependencies
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
user = db.query(User).filter_by(token=token).first()
if not user: raise HTTPException(401)
return user
# Route just declares what it needs
@app.get("/profile")
async def profile(user: User = Depends(get_current_user)):
return user
Why powerful: Testable (override deps in tests), composable (deps chain), auto-cleanup (yield), cached per request (same dep instance reused).
# Return — simple, no cleanup needed
def get_settings():
return Settings() # just returns a value
# Yield — has setup + teardown (like context manager)
async def get_db():
db = SessionLocal() # SETUP: before yield
try:
yield db # INJECT: this value goes to the route
finally:
db.close() # TEARDOWN: runs after response is sent
Execution order:
1. Code before yield runs → setup
2. The yield value is injected into the route handler
3. Code after yield runs → teardown (even if route raised an exception)
Tricky gotcha: Exceptions in the route handler propagate into the yield dependency. That's why you need try/finally — without it, cleanup won't run on error.
| Feature | Middleware | Dependency |
|---|---|---|
| Scope | Every request | Specific routes |
| Access | Raw request/response | Parsed params |
| Use for | CORS, logging, timing | Auth, DB, validation |
| Can modify response? | Yes | Limited (headers/cookies via an injected Response) |
| Testable? | Harder | Easy (override) |
# Middleware — runs on EVERY request
@app.middleware("http")
async def add_timing(request: Request, call_next):
    """Report how long the request took in the ``X-Process-Time`` header."""
    # perf_counter() is monotonic; time.time() can jump when the system clock
    # is adjusted, which would produce wrong or negative durations.
    start = time.perf_counter()
    response = await call_next(request)
    response.headers["X-Process-Time"] = str(time.perf_counter() - start)
    return response
# Dependency — runs only on routes that declare it
@app.get("/admin", dependencies=[Depends(require_admin)])
async def admin_panel(): ...
Rule: Cross-cutting concerns (logging, CORS, headers) → middleware. Business logic (auth, DB, pagination) → dependencies.
from fastapi.testclient import TestClient
# Production dependency
async def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Test override — use test database
async def override_get_db():
db = TestingSessionLocal()
try:
yield db
finally:
db.rollback()
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
# Now all routes using Depends(get_db) will use test DB
response = client.get("/users")
# Clean up after tests
app.dependency_overrides.clear()
Key insight: dependency_overrides is a dict mapping original_dep → replacement_dep. This is why DI is superior to hard-coded function calls — zero code changes for testing.
# async def — runs on the event loop (main thread)
@app.get("/async")
async def async_route():
data = await async_db_query() # non-blocking
return data
# def — FastAPI runs it in a THREAD POOL automatically!
@app.get("/sync")
def sync_route():
data = blocking_db_query() # runs in threadpool
return data
The trap: If you use async def but call blocking code (without await), you block the entire event loop. No other request can be processed.
# ❌ WRONG — blocks event loop!
@app.get("/bad")
async def bad_route():
time.sleep(5) # blocks EVERYTHING for 5 seconds
return {"msg": "done"}
# ✅ Option 1 — use regular def (runs in threadpool)
@app.get("/good1")
def good_route():
time.sleep(5) # runs in thread, event loop free
return {"msg": "done"}
# ✅ Option 2 — use async sleep
@app.get("/good2")
async def good_route():
await asyncio.sleep(5) # non-blocking
return {"msg": "done"}
# ✅ Option 3 — run blocking code in executor
@app.get("/good3")
async def good_route():
data = await asyncio.to_thread(blocking_function)
return data
Rule: async def → only use await-able calls. def → blocking is fine (auto-threadpooled). Mixing them wrong is the #1 FastAPI performance killer.
# ❌ OLD WAY (deprecated in FastAPI 0.93+)
@app.on_event("startup")
async def startup():
app.state.db = create_pool()
@app.on_event("shutdown")
async def shutdown():
await app.state.db.close()
# ✅ NEW WAY — lifespan context manager
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the DB pool and Redis client on startup; close both on shutdown."""
    # The standalone aioredis package was merged into redis-py.
    import redis.asyncio as redis

    # STARTUP — runs before first request
    app.state.db = await create_pool()
    app.state.redis = redis.from_url("redis://localhost")
    try:
        yield  # app runs here
    finally:
        # SHUTDOWN — runs after last response. In `finally` so resources are
        # released even when an exception or cancellation is raised at the yield.
        try:
            await app.state.db.close()
        finally:
            await app.state.redis.aclose()
app = FastAPI(lifespan=lifespan)
Why lifespan? Startup and shutdown are coupled — resources created in startup must be cleaned in shutdown. The context manager pattern guarantees this. on_event doesn't.
from fastapi import BackgroundTasks
@app.post("/send-email")
async def send_email(
email: str,
background: BackgroundTasks
):
background.add_task(send_email_task, email)
return {"msg": "Email queued"} # returns immediately
def send_email_task(email: str):
# runs AFTER response is sent
smtp.send(email) # blocking is OK — runs in threadpool
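Limitations: no retries, no persistence (queued tasks are lost if the process exits), and tasks run inside the API process, so heavy work competes with request handling.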
When to upgrade: Need retries/persistence → Celery or ARQ. Need event streaming → Kafka. BackgroundTasks is only for fire-and-forget lightweight work.
# ❌ Forgot to await — returns coroutine object, not result!
@app.get("/broken")
async def broken():
result = async_db_query() # missing await!
return result
# Returns: <coroutine object async_db_query at 0x...>
# Plus a RuntimeWarning: coroutine was never awaited
# ❌ Awaiting a regular function
async def handler():
result = await regular_function() # TypeError!
# TypeError: object int can't be used in 'await' expression
# ✅ Run sync function in async context
async def handler():
result = await asyncio.to_thread(regular_function)
Debug tip: If your API returns weird objects or you see coroutine never awaited warnings — you forgot an await.
# Base — shared fields
class UserBase(BaseModel):
name: str
email: str
# Create — what client sends (no id)
class UserCreate(UserBase):
password: str
# Read — what API returns (has id, no password)
class UserRead(UserBase):
id: int
model_config = ConfigDict(from_attributes=True)
# Update — all fields optional
class UserUpdate(BaseModel):
name: str | None = None
email: str | None = None
@app.post("/users", response_model=UserRead)
async def create(user: UserCreate): ...
@app.patch("/users/{id}", response_model=UserRead)
async def update(id: int, user: UserUpdate): ...
Why separate models? Security — UserCreate accepts password, UserRead never exposes it. UserUpdate makes everything optional for PATCH.
# SQLAlchemy model — uses attributes (user.name)
class UserDB(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
# Without from_attributes — FAILS
class UserRead(BaseModel):
id: int
name: str
user_db = db.query(UserDB).first()
UserRead.model_validate(user_db) # ❌ Pydantic expects dict
# With from_attributes — WORKS
class UserRead(BaseModel):
id: int
name: str
model_config = ConfigDict(from_attributes=True)
UserRead.model_validate(user_db) # ✅ reads user_db.id, user_db.name
Old name: class Config: orm_mode = True (Pydantic v1). New name: from_attributes=True (Pydantic v2).
Tells Pydantic to read data from object attributes (obj.field), not just dict keys (obj["field"]).
from fastapi import Query, Path, Body, Header
@app.get("/items/{item_id}")
async def read_item(
    # Path — from URL path
    item_id: int = Path(ge=1, description="Item ID"),
    # Query — from ?key=value; optional, so the annotation must allow None
    q: str | None = Query(default=None, max_length=50),
    # Header — from request headers
    x_token: str = Header(),
):
    """Show where each kind of parameter (path, query, header) is read from."""
    ...
@app.post("/items")
async def create(
# Body — from JSON body (can embed multiple)
item: Item = Body(embed=True),
):
...
Field is for inside Pydantic models. Query/Path/Body/Header are for route function parameters. They all support the same validation options (ge, max_length, pattern, etc. — `pattern` replaced `regex` in Pydantic v2).
class ItemUpdate(BaseModel):
name: str | None = None
price: float | None = None
description: str | None = None
@app.patch("/items/{item_id}")
async def update_item(item_id: int, updates: ItemUpdate, db = Depends(get_db)):
    """Apply only the fields the client actually sent to the stored item.

    Raises:
        HTTPException: 404 when the item does not exist.
    """
    # exclude_unset=True is the KEY — only gets fields client sent
    update_data = updates.model_dump(exclude_unset=True)
    # Without exclude_unset:
    #   {"name": None, "price": None, "description": None}
    # With exclude_unset (client sent {"name": "New"}):
    #   {"name": "New"}  ← only what was actually sent
    db_item = get_item(item_id)
    if db_item is None:
        # presumably get_item returns None for an unknown id — verify
        raise HTTPException(status_code=404, detail="Item not found")
    for field, value in update_data.items():
        setattr(db_item, field, value)
    # NOTE(review): assumes db_item is attached to this session — confirm
    # how get_item obtains its session.
    db.commit()
Gotcha: Without exclude_unset=True, you'll overwrite every field with None — even ones the client didn't send. This is the #1 PATCH bug.
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from datetime import datetime, timedelta
SECRET = "your-secret-key"
ALGO = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
# 1. Create token
def create_token(data: dict, expires: timedelta = timedelta(hours=1)):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token (for example ``{"sub": "42"}``).
        expires: Lifetime of the token, counted from now.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value; use an explicit UTC-aware timestamp instead.
    expires_at = datetime.now(timezone.utc) + expires
    payload = {**data, "exp": expires_at}
    return jwt.encode(payload, SECRET, algorithm=ALGO)
# 2. Verify token (dependency)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to a user.

    Raises:
        HTTPException: 401 when the token is invalid, has no ``sub`` claim,
            or no user is found for it.
    """
    unauthorized = HTTPException(
        401, "Invalid token", headers={"WWW-Authenticate": "Bearer"}
    )
    # Keep the try body minimal: only decoding can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.JWTError as exc:
        raise unauthorized from exc
    user_id = payload.get("sub")
    if not user_id:
        raise unauthorized
    user = get_user_from_db(user_id)
    if user is None:
        # presumably get_user_from_db returns None for an unknown id — verify
        raise unauthorized
    return user
# 3. Login route
@app.post("/login")
async def login(form: OAuth2PasswordRequestForm = Depends()):
user = authenticate(form.username, form.password)
if not user: raise HTTPException(401)
token = create_token({"sub": str(user.id)})
return {"access_token": token, "token_type": "bearer"}
# 4. Protected route
@app.get("/me")
async def me(user = Depends(get_current_user)):
return user
from enum import Enum
class Role(str, Enum):
USER = "user"
ADMIN = "admin"
MODERATOR = "moderator"
# Dependency factory — returns a dependency!
def require_role(*roles: Role):
    """Build a dependency that admits only users whose role is in ``roles``."""

    async def checker(user = Depends(get_current_user)):
        # Happy path first: hand the authenticated user to the route.
        if user.role in roles:
            return user
        raise HTTPException(403, "Forbidden")

    return checker
# Usage — clean and declarative
@app.delete("/users/{id}")
async def delete_user(
id: int,
admin = Depends(require_role(Role.ADMIN))
):
...
@app.put("/posts/{id}")
async def edit_post(
id: int,
user = Depends(require_role(Role.ADMIN, Role.MODERATOR))
):
...
Key pattern: require_role is a dependency factory — a function that returns a dependency. This lets you parameterize access control per route.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)
# ✅ Correct — yield dependency with cleanup
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close() # ALWAYS close, even on error
@app.get("/users")
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
# ❌ WRONG — session leaks on exceptions!
@app.get("/bad")
def bad_route():
db = SessionLocal()
users = db.query(User).all() # if this throws...
db.close() # this never runs!
return users
Common gotcha: FastAPI's Depends ensures one session per request. If you create sessions manually inside routes, you risk leaks and inconsistent transactions.
# Sync — simpler, use with `def` routes
from sqlalchemy import create_engine
engine = create_engine("postgresql://...")
@app.get("/users")
def list_users(db: Session = Depends(get_db)):
return db.query(User).all() # FastAPI auto-threadpools this
# Async — more complex, higher throughput
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine("postgresql+asyncpg://...")
# Session factory used by the async get_db() dependency.
AsyncSessionLocal = async_sessionmaker(engine)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
Rule of thumb: Start with sync (simpler). Switch to async when you need high concurrency (1000+ concurrent connections). Sync with def routes is fast enough for most apps since FastAPI auto-threadpools them.
# Setup
# pip install alembic
# alembic init alembic
# alembic/env.py — point to your models
from app.models import Base
target_metadata = Base.metadata
# Generate migration after model changes
# alembic revision --autogenerate -m "add users table"
# Apply migration
# alembic upgrade head
# Rollback
# alembic downgrade -1
Interview trap: "What happens if you change a model but don't create a migration?" → Nothing changes in the database. SQLAlchemy models and database schema are independent. Alembic bridges them.
Production tip: Always review autogenerated migrations. They can miss renames (sees drop + create instead) and data migrations.
# 3-layer architecture:
# Router → Service → Repository
# router — HTTP concerns only
@router.get("/users/{id}", response_model=UserRead)
async def get_user(id: int, service: UserService = Depends()):
return service.get_by_id(id)
# service — business logic
class UserService:
def __init__(self, repo: UserRepo = Depends()):
self.repo = repo
def get_by_id(self, id: int):
user = self.repo.find(id)
if not user:
raise HTTPException(404)
return user
# repository — database access only
class UserRepo:
    """Repository layer: database access for users and nothing else."""

    def __init__(self, db: Session = Depends(get_db)):
        self.db = db

    def find(self, id: int):
        """Return the user with primary key ``id``, or None if absent."""
        # Session.get() replaces the legacy Query.get() (deprecated in SQLAlchemy 2.0).
        return self.db.get(User, id)
Why? Router doesn't know about DB. Service doesn't know about HTTP. Repository doesn't know about business rules. Each layer is independently testable.
# app/routers/users.py
from fastapi import APIRouter, Depends

# get_current_user is assumed to be an auth dependency defined elsewhere in the project.
router = APIRouter(
    prefix="/users",
    tags=["users"],
    dependencies=[Depends(get_current_user)],  # applies to ALL routes
)
# "/" is relative to the router's prefix, so this handles the collection URL.
@router.get("/")
async def list_users(): ... # GET /users/
# {id} is a path parameter; the int annotation makes FastAPI validate and convert it.
@router.get("/{id}")
async def get_user(id: int): ... # GET /users/42
# app/main.py
from fastapi import FastAPI

from app.routers import users, products, orders

app = FastAPI()
app.include_router(users.router)
app.include_router(products.router)
# The prefix given here is prepended to the router's own prefix.
app.include_router(orders.router, prefix="/v2")  # → /v2/orders/
Key: Routers have their own prefix, tags, and dependencies. include_router can add additional prefix (useful for API versioning).
# Custom exception class
class NotFoundError(Exception):
    """Raised when a resource with the given id does not exist.

    Attributes:
        resource: Name of the resource type, e.g. "User".
        id: Identifier that was looked up.
    """

    def __init__(self, resource: str, id: int):
        # Forward the raw arguments so exc.args (and pickling) keep working.
        super().__init__(resource, id)
        self.resource = resource
        self.id = id

    def __str__(self) -> str:
        # Readable message for logs and tracebacks instead of a tuple repr.
        return f"{self.resource} {self.id} not found"
# Register handler — catches this exception ANYWHERE
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError):
    """Turn a NotFoundError into a 404 JSON response describing the miss."""
    message = f"{exc.resource} {exc.id} not found"
    payload = {"error": message, "type": "not_found"}
    return JSONResponse(status_code=404, content=payload)
# Now just raise anywhere — no try/except in routes
class UserService:
    """Service whose lookups signal a miss by raising NotFoundError."""

    def get(self, id: int):
        """Return the user from ``self.repo``; raise NotFoundError when it is falsy."""
        found = self.repo.find(id)
        if found:
            return found
        raise NotFoundError("User", id)  # caught globally
Gotcha: A handler registered for your own exception class does not catch HTTPException — FastAPI has its own built-in handler for that. To override it, register a handler for HTTPException explicitly.
# Option 1 — URL prefix (most common)
# users_v1 / users_v2 are assumed to be separate router modules, one per API version.
app.include_router(users_v1.router, prefix="/v1")
app.include_router(users_v2.router, prefix="/v2")
# GET /v1/users, GET /v2/users
# Option 2 — Header-based
@app.get("/users")
async def get_users(accept_version: str = Header(default="1")):
    """Pick the response version from the request header (defaults to "1")."""
    wants_v2 = accept_version == "2"
    return v2_response() if wants_v2 else v1_response()
# Option 3 — Separate FastAPI apps (for major versions)
app_v1 = FastAPI()
app_v2 = FastAPI()
main_app = FastAPI()
# Each sub-application is served under its own path prefix of main_app.
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)
Best practice: URL prefix for simplicity. mount() for completely different API versions with separate docs pages.
@app.get("/users/{user_id}")
async def get_user(user_id: str):
    """Echo the path parameter back; registered first, so it also matches /users/me."""
    payload = {"user_id": user_id}
    return payload
@app.get("/users/me")
async def get_me():
    """Intended handler for /users/me (shadowed by the parameterized route registered earlier)."""
    current = {"user": "current"}
    return current
# GET /users/me → {"user_id": "me"} — WRONG!
Why? Route order matters! /users/{user_id} matches /users/me first because it was defined first. FastAPI evaluates routes top to bottom.
Fix — put specific routes BEFORE parameterized ones:
# The literal path is registered first, so /users/me is matched before the parameterized route.
@app.get("/users/me") # ← FIRST
async def get_me(): ...
@app.get("/users/{user_id}") # ← SECOND
async def get_user(user_id: str): ...
class UserDB(BaseModel):
    # Full user record, including a field that must never reach API clients.
    id: int
    name: str
    email: str
    hashed_password: str # SECRET!
# ❌ SECURITY BUG — leaks password hash!
@app.get("/users/{id}")
async def get_user(id: int):
    """Insecure example: no response_model, so the whole record is returned."""
    record = db.query(User).get(id)
    return record # returns ALL fields including hashed_password
# ✅ FIX — use response_model to filter output
class UserRead(BaseModel):
    # Public response shape: only the fields that are safe to return.
    id: int
    name: str
    email: str
    # no password field!
@app.get("/users/{id}", response_model=UserRead)
async def get_user(id: int):
    """Return the user filtered through UserRead.

    Raises:
        HTTPException: 404 when no user has this id.
    """
    user = db.query(User).get(id)
    if user is None:
        # Returning None would fail response_model validation and surface as a 500.
        raise HTTPException(status_code=404, detail="User not found")
    return user  # password stripped by response_model
Lesson: response_model isn't just for docs — it's a security filter. Without it, you can accidentally expose internal fields (passwords, tokens, internal IDs).
async def get_db():
    """Yield a SessionLocal session and always close it afterwards."""
    print("Creating DB session") # How many times?
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
async def get_user(db = Depends(get_db)):
    """Sub-dependency: first User row, read through the injected session."""
    first_user = db.query(User).first()
    return first_user
async def get_settings(db = Depends(get_db)):
    """Sub-dependency: first Settings row, read through the injected session."""
    first_settings = db.query(Settings).first()
    return first_settings
@app.get("/")
async def home(
    user = Depends(get_user),
    settings = Depends(get_settings)
):
    """Combine both sub-dependencies, each of which depends on get_db."""
    return dict(user=user, settings=settings)
# "Creating DB session" prints ONCE, not twice!
Why? FastAPI caches dependencies per request by default. Same dependency function → same instance within one request. This is why get_db runs once even though two sub-dependencies use it.
To disable caching (rare — e.g., you want a fresh random value):
async def get_random():
    """Return a random integer between 1 and 100, both ends included."""
    value = random.randint(1, 100)
    return value
# use_cache=False → runs fresh each time
@app.get("/")
async def route(
    a = Depends(get_random, use_cache=False),
    b = Depends(get_random, use_cache=False)
):
    """Return both injected values; use_cache=False makes get_random run once per parameter."""
    result = {"a": a, "b": b} # different values!
    return result
# ⚠️ Looks like Python's mutable-default-argument trap — but it is not one here
class Item(BaseModel):
    tags: list[str] = [] # NOT shared: Pydantic copies the default for each instance
# Pydantic (v1 and v2) copies mutable field defaults per instance,
# so this is safe — unlike a mutable default argument on a plain Python function
# ✅ More explicit alternative (works in both v1 and v2)
class Item(BaseModel):
    tags: list[str] = Field(default_factory=list)
# The REAL Pydantic trap — silent type coercion when a model is constructed
class User(BaseModel):
    name: str
    age: int
# Default (lax) mode: input is converted to the annotated type when it can be
u1 = User(name="Alice", age="25") # Works! Coerces "25" → 25
u2 = User(name="Alice", age="abc") # ValidationError!
# Strict mode — no coercion
class StrictUser(BaseModel):
    # strict=True is set on the model config, so it applies to every field below
    model_config = ConfigDict(strict=True)
    name: str
    age: int
StrictUser(name="Alice", age="25") # ❌ ValidationError! str != int
Key insight: Pydantic coerces types by default — "25" becomes 25. This is usually helpful but can hide bugs. Use strict=True when you need exact types.
@app.get("/items")
async def items_v1():
    """First handler registered for GET /items."""
    versions = ["v1"]
    return versions
@app.get("/items")
async def items_v2():
    """Second handler registered for the same method and path as items_v1."""
    versions = ["v2"]
    return versions
# GET /items → ["v1"] — first one wins, second is SILENTLY ignored!
# No error, no warning. 😱
Why it's dangerous: When using include_router with multiple routers, you can accidentally create duplicate routes — especially with "/" paths. FastAPI won't warn you.
Debug tip: Check /docs (Swagger UI) — duplicate routes will show up there. Or inspect app.routes programmatically.
from collections import defaultdict
from time import time
# In-memory rate limiter (use Redis in production)
requests_log: dict[str, list[float]] = defaultdict(list)
def rate_limit(max_requests: int = 10, window: int = 60):
    """Build a dependency that allows ``max_requests`` calls per ``window`` seconds per client.

    Args:
        max_requests: Number of requests a client may make inside one window.
        window: Length of the sliding window, in seconds.

    Returns:
        An async dependency that raises HTTPException(429) once the limit is hit.

    Note:
        Timestamps are stored in the module-level ``requests_log``, so every
        limiter created by this factory shares the same per-client history.
    """
    from math import ceil

    async def checker(request: Request):
        """Record this request, or reject it with 429 when the client is over the limit."""
        # request.client is None when the server supplies no peer address.
        client_ip = request.client.host if request.client else "unknown"
        now = time()
        # Keep only the timestamps that are still inside the window.
        recent = [t for t in requests_log[client_ip] if now - t < window]
        requests_log[client_ip] = recent
        if len(recent) >= max_requests:
            # A slot frees up when the oldest remaining hit leaves the window;
            # fall back to the full window when there is no hit (max_requests <= 0).
            retry_after = max(1, ceil(min(recent) + window - now)) if recent else window
            raise HTTPException(
                429,
                detail="Too many requests",
                headers={"Retry-After": str(retry_after)},
            )
        recent.append(now)

    return checker
# Usage — 5 requests per 60 seconds
@app.get("/api/data", dependencies=[Depends(rate_limit(5, 60))])
async def get_data():
    """Example endpoint guarded by the limiter configured in the decorator."""
    body = {"data": "here"}
    return body
Why this is tricky: rate_limit(5, 60) is a dependency factory — it returns a function that FastAPI calls. The outer function configures, the inner function executes per request.
Production: Use Redis instead of in-memory dict — this won't work with multiple workers.
# ❌ BUG — doesn't handle disconnection!
@app.websocket("/ws")
async def websocket(ws: WebSocket):
    """Echo loop with no disconnect handling (the bug being demonstrated)."""
    await ws.accept()
    while True:
        message = await ws.receive_text() # crashes on disconnect!
        reply = f"Echo: {message}"
        await ws.send_text(reply)
# ✅ FIX — handle WebSocketDisconnect
from fastapi import WebSocketDisconnect
@app.websocket("/ws")
async def websocket(ws: WebSocket):
    """Echo each text message back until the client disconnects."""
    await ws.accept()
    try:
        while True:
            message = await ws.receive_text()
            reply = f"Echo: {message}"
            await ws.send_text(reply)
    except WebSocketDisconnect:
        print("Client disconnected") # clean up here
Another gotcha: WebSocket routes don't use response_model, don't return HTTP status codes, and middleware runs differently for them. Auth must be handled manually (usually via query params or cookies, since the browser WebSocket API cannot attach custom headers such as Authorization to the handshake request).
# FastAPI reads your type hints, models, and decorators
# to auto-generate the OpenAPI schema at /openapi.json
# Customize per-route:
# Every keyword argument below ends up in the generated OpenAPI entry for this route.
@app.get(
    "/users",
    summary="List all users",
    description="Returns paginated user list",
    response_description="List of users",
    tags=["users"],
    deprecated=False,
    # Extra responses to document in addition to the successful one
    responses={
        404: {"description": "Not found"},
        500: {"description": "Server error"},
    }
)
async def list_users(): ...
# Customize globally:
# title, version and description are API-level metadata; the *_url arguments move the endpoints.
app = FastAPI(
    title="My API",
    version="2.0.0",
    description="Production API",
    docs_url="/swagger", # custom path (default: /docs)
    redoc_url="/documentation", # custom path (default: /redoc)
    openapi_url="/api/schema", # custom schema path (default: /openapi.json)
)
# Disable docs in production:
# openapi_url=None stops serving the schema itself (and the docs UIs that rely on it);
# docs_url / redoc_url alone would leave the schema JSON publicly reachable.
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
Key insight: Everything in Swagger comes from your code — type hints, Pydantic models, decorator args. Better type annotations = better docs. Zero extra effort.