FastAPI Mastery

Build modern Python APIs at lightning speed. Every concept explained with code.

01 What is FastAPI?

FastAPI is a modern, high-performance web framework for building APIs with Python 3.7+ based on standard type hints. It's built on top of Starlette (for the web parts) and Pydantic (for data validation).

Why FastAPI?

Fastest Python framework with automatic validation, docs, and async support — all from type hints.

FeatureFlaskFastAPIDjango REST
SpeedSlow (WSGI)Very fast (ASGI)Slow (WSGI)
AsyncLimitedNativeLimited
ValidationManualAutomatic (Pydantic)Serializers
Auto docsNoYes (Swagger + ReDoc)Plugin
Type hintsOptionalCore featureOptional
Learning curveEasyEasySteep

02 Setup & First App

# Install
pip install fastapi uvicorn[standard]

# uvicorn = ASGI server that runs your FastAPI app

Hello World

A working API in just a few lines — create an app, add a route, and run it.

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def root():
    return {"message": "Hello, World!"}

@app.get("/health")
def health():
    return {"status": "ok"}
# Run it
uvicorn main:app --reload

# main   = file name (main.py)
# app    = the FastAPI instance variable
# --reload = auto-restart on code changes (dev only)

# Now visit:
# http://127.0.0.1:8000        → your API
# http://127.0.0.1:8000/docs   → Swagger UI (interactive docs)
# http://127.0.0.1:8000/redoc  → ReDoc (alternative docs)
That's it. You have a running API with auto-generated interactive documentation. No config, no boilerplate.

03 Routing & HTTP Methods

from fastapi import FastAPI

app = FastAPI()

# Each decorator = one endpoint
@app.get("/users")          # GET    — read data
def list_users(): ...

@app.post("/users")         # POST   — create data
def create_user(): ...

@app.put("/users/{id}")     # PUT    — full update
def update_user(id: int): ...

@app.patch("/users/{id}")   # PATCH  — partial update
def patch_user(id: int): ...

@app.delete("/users/{id}")  # DELETE — remove data
def delete_user(id: int): ...

# Route order matters! More specific routes FIRST.
@app.get("/users/me")       # ← this MUST come before /users/{id}
def get_current_user(): ...

@app.get("/users/{id}")     # ← otherwise "me" would match as an {id}
def get_user(id: int): ...

APIRouter — Organize Routes

Split your routes into separate files with prefixes and tags — keeps large APIs clean and organized.

# routes/users.py
from fastapi import APIRouter

router = APIRouter(prefix="/users", tags=["Users"])

@router.get("/")
def list_users(): ...

@router.get("/{id}")
def get_user(id: int): ...

# main.py
from fastapi import FastAPI
from routes.users import router as users_router

app = FastAPI()
app.include_router(users_router)  # /users/ and /users/{id}

04 Path Parameters

# Type hint = automatic validation + conversion
@app.get("/users/{user_id}")
def get_user(user_id: int):      # auto-converted to int
    return {"user_id": user_id}

# GET /users/42    → {"user_id": 42}
# GET /users/hello → 422 Validation Error (not an int)

# ── Enum for fixed values ──
from enum import Enum

class Role(str, Enum):
    admin = "admin"
    user = "user"
    guest = "guest"

@app.get("/roles/{role}")
def get_role(role: Role):
    return {"role": role.value}

# GET /roles/admin → {"role": "admin"}
# GET /roles/xyz   → 422 error (not a valid Role)

05 Query Parameters

# Any parameter NOT in the path = query parameter
@app.get("/users")
def list_users(
    skip: int = 0,           # optional, default 0
    limit: int = 10,          # optional, default 10
    active: bool = True       # optional, default True
):
    return {"skip": skip, "limit": limit, "active": active}

# GET /users?skip=20&limit=5&active=false

# ── Required query params (no default) ──
@app.get("/search")
def search(q: str):           # required! no default.
    return {"query": q}

# GET /search         → 422 error (q is required)
# GET /search?q=hello → {"query": "hello"}

# ── Optional with None ──
@app.get("/items")
def list_items(q: str | None = None):
    if q:
        return {"results": f"searching for {q}"}
    return {"results": "all items"}

# ── Validation with Query() ──
from fastapi import Query

@app.get("/search")
def search(
    q: str = Query(min_length=3, max_length=50, pattern="^[a-z]+"),
    page: int = Query(default=1, ge=1),           # >= 1
    size: int = Query(default=10, le=100),         # <= 100
):
    return {"q": q, "page": page, "size": size}

06 Request Body

from pydantic import BaseModel

class UserCreate(BaseModel):
    name: str
    email: str
    age: int | None = None

@app.post("/users")
def create_user(user: UserCreate):
    # user is already validated and typed!
    return {"name": user.name, "email": user.email}

# POST /users
# Body: {"name": "Yatin", "email": "y@dev.com"}
# → {"name": "Yatin", "email": "y@dev.com"}

# POST /users
# Body: {"name": 123}  → 422 error (name must be str, email missing)

# ── Body + Path + Query together ──
@app.put("/users/{user_id}")
def update_user(
    user_id: int,              # path param
    user: UserCreate,            # request body
    notify: bool = False       # query param
):
    return {"id": user_id, "user": user.model_dump(), "notify": notify}

07 Pydantic Models Deep Dive

Pydantic is the backbone of FastAPI. It handles validation, serialization, and documentation all from your type hints.

from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime

class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str = Field(pattern=r"^[\w.-]+@[\w.-]+\.\w+$")
    age: int = Field(ge=0, le=150)
    tags: list[str] = []

    # Custom validator
    @field_validator("name")
    @classmethod
    def name_must_be_alpha(cls, v):
        if not v.replace(" ", "").isalpha():
            raise ValueError("Name must contain only letters")
        return v.title()

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    created_at: datetime

    model_config = {"from_attributes": True}  # read from ORM objects

# ── Nested models ──
class Address(BaseModel):
    street: str
    city: str
    country: str = "US"

class UserFull(BaseModel):
    name: str
    address: Address              # nested!
    friends: list["UserFull"] = []  # self-referencing

# ── Useful methods ──
user = UserCreate(name="yatin", email="y@dev.com", age=25)
user.model_dump()            # → dict
user.model_dump_json()       # → JSON string
user.model_dump(exclude={"tags"})  # exclude fields
UserCreate.model_json_schema()     # → JSON Schema (for docs)

08 Response Models

# response_model controls what the client SEES
# Your function can return more data — FastAPI filters it

class UserDB(BaseModel):
    id: int
    name: str
    email: str
    hashed_password: str   # sensitive!

class UserOut(BaseModel):
    id: int
    name: str
    email: str
    # no password field → it's filtered out

@app.get("/users/{id}", response_model=UserOut)
def get_user(id: int):
    # even if this returns a UserDB with password,
    # the response only includes id, name, email
    return UserDB(id=id, name="Yatin", email="y@dev.com", hashed_password="xxx")

# ── List responses ──
@app.get("/users", response_model=list[UserOut])
def list_users():
    return [...]

09 Status Codes & Error Handling

from fastapi import FastAPI, HTTPException, status

# ── Set status code ──
@app.post("/users", status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate):
    return user

# ── Raise errors ──
@app.get("/users/{id}")
def get_user(id: int):
    user = fake_db.get(id)
    if not user:
        raise HTTPException(
            status_code=404,
            detail="User not found",
            headers={"X-Error": "not-found"}
        )
    return user

# ── Custom exception handler ──
from fastapi.responses import JSONResponse

class ItemNotFound(Exception):
    def __init__(self, item_id: int):
        self.item_id = item_id

@app.exception_handler(ItemNotFound)
async def item_not_found_handler(request, exc):
    return JSONResponse(
        status_code=404,
        content={"error": f"Item {exc.item_id} not found"}
    )
CodeMeaningWhen to use
200OKSuccessful GET/PUT/PATCH
201CreatedSuccessful POST
204No ContentSuccessful DELETE
400Bad RequestInvalid input
401UnauthorizedNot authenticated
403ForbiddenAuthenticated but no permission
404Not FoundResource doesn't exist
422UnprocessableValidation error (auto by FastAPI)
500Server ErrorUnhandled exception

10 Headers & Cookies

from fastapi import Header, Cookie, Response

# ── Read headers ──
@app.get("/agent")
def get_agent(user_agent: str = Header()):
    return {"agent": user_agent}
# Header names auto-convert: user_agent → User-Agent

# ── Read cookies ──
@app.get("/me")
def get_me(session_id: str | None = Cookie(default=None)):
    return {"session": session_id}

# ── Set headers & cookies in response ──
@app.post("/login")
def login(response: Response):
    response.set_cookie(key="session_id", value="abc123", httponly=True)
    response.headers["X-Custom"] = "my-value"
    return {"message": "logged in"}

11 Dependency Injection

FastAPI's Depends() system lets you share logic across endpoints — database sessions, auth checks, pagination, etc.

from fastapi import Depends

# ── Simple dependency ──
def get_db():
    db = SessionLocal()
    try:
        yield db           # dependency with cleanup!
    finally:
        db.close()

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()

# ── Dependency with parameters ──
def pagination(skip: int = 0, limit: int = 10):
    return {"skip": skip, "limit": limit}

@app.get("/users")
def list_users(page: dict = Depends(pagination)):
    return {"pagination": page}

# ── Auth dependency ──
def get_current_user(token: str = Header()):
    user = decode_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

@app.get("/profile")
def profile(user = Depends(get_current_user)):
    return {"user": user}

# ── Class-based dependency ──
class CommonParams:
    def __init__(self, q: str | None = None, skip: int = 0, limit: int = 10):
        self.q = q
        self.skip = skip
        self.limit = limit

@app.get("/items")
def list_items(params: CommonParams = Depends()):
    return params

12 Middleware & CORS

from fastapi.middleware.cors import CORSMiddleware
import time

# ── CORS (allow frontend to call your API) ──
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # or ["*"] for all
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Custom middleware ──
@app.middleware("http")
async def add_timing(request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - start
    response.headers["X-Process-Time"] = str(elapsed)
    return response

13 Authentication (JWT)

# pip install python-jose[cryptography] passlib[bcrypt]
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from datetime import datetime, timedelta

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")

# ── Hash & verify passwords ──
def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

# ── Create JWT token ──
def create_token(data: dict, expires_minutes: int = 30):
    payload = data.copy()
    payload["exp"] = datetime.utcnow() + timedelta(minutes=expires_minutes)
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

# ── Decode & verify token ──
def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return get_user_from_db(user_id)

# ── Login endpoint ──
@app.post("/login")
def login(form: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form.username, form.password)
    if not user:
        raise HTTPException(status_code=401, detail="Bad credentials")
    token = create_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}

# ── Protected endpoint ──
@app.get("/me")
def me(user = Depends(get_current_user)):
    return user

14 Database (SQLAlchemy)

# pip install sqlalchemy

# ── database.py ──
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

DATABASE_URL = "sqlite:///./app.db"
# For PostgreSQL: "postgresql://user:pass@localhost/dbname"

engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=engine, autoflush=False)

class Base(DeclarativeBase):
    pass

# ── models.py ──
from sqlalchemy import Column, Integer, String, Boolean
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    email = Column(String, unique=True, index=True)
    is_active = Column(Boolean, default=True)

# ── Dependency ──
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# ── CRUD endpoints ──
@app.post("/users", status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(name=user.name, email=user.email)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users")
def list_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    return db.query(User).offset(skip).limit(limit).all()

@app.get("/users/{id}")
def get_user(id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == id).first()
    if not user:
        raise HTTPException(status_code=404)
    return user

@app.delete("/users/{id}", status_code=204)
def delete_user(id: int, db: Session = Depends(get_db)):
    db.query(User).filter(User.id == id).delete()
    db.commit()

15 Async & Background Tasks

import httpx
from fastapi import BackgroundTasks

# ── async endpoint (use for I/O-bound work) ──
@app.get("/external")
async def fetch_external():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data")
    return resp.json()

# Use `async def` when you have `await` calls
# Use regular `def` for CPU-bound or sync DB calls
# FastAPI runs regular `def` in a thread pool automatically

# ── Background tasks ──
def send_email(email: str, message: str):
    # slow operation — runs AFTER response is sent
    print(f"Sending to {email}: {message}")

@app.post("/signup")
def signup(email: str, bg: BackgroundTasks):
    bg.add_task(send_email, email, "Welcome!")
    return {"message": "Signed up!"}  # returns immediately

16 File Uploads

from fastapi import File, UploadFile

# ── Single file ──
@app.post("/upload")
async def upload(file: UploadFile):
    contents = await file.read()
    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": len(contents)
    }

# ── Multiple files ──
@app.post("/upload-many")
async def upload_many(files: list[UploadFile]):
    return [{"name": f.filename} for f in files]

# ── Save to disk ──
@app.post("/upload-save")
async def save_file(file: UploadFile):
    with open(f"uploads/{file.filename}", "wb") as f:
        f.write(await file.read())
    return {"saved": file.filename}

17 WebSockets

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

# ── Chat room (multiple clients) ──
class ConnectionManager:
    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        self.connections.remove(ws)

    async def broadcast(self, message: str):
        for conn in self.connections:
            await conn.send_text(message)

manager = ConnectionManager()

18 Project Structure

Production Architecture

In production, a FastAPI app sits behind a reverse proxy (nginx) that handles SSL, static files, and load balancing. The app runs via gunicorn (process manager) spawning multiple uvicorn workers. The database is a separate service.

# How a request flows in production:
#
# Client → nginx (SSL, rate limit) → gunicorn (process manager)
#            │                            │
#            │                     ┌──────┼──────┐
#            │                     │      │      │
#            │                  worker  worker  worker  (uvicorn)
#            │                     │      │      │
#            │                     └──────┼──────┘
#            │                            │
#            │                      PostgreSQL / Redis
#
# Run command:
# gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
#                        │         │
#                    4 workers   ASGI worker class

Full Folder Structure

The complete layout for a production FastAPI app — models, schemas, routers, services, and tests all separated.

my-api/ │ ├── app/ # all application code lives here │ ├── __init__.py # makes app/ a Python package │ ├── main.py # FastAPI app instance, startup, routers │ ├── config.py # env vars, settings (pydantic-settings) │ ├── database.py # SQLAlchemy engine, session, Base class │ ├── dependencies.py # shared deps: get_db, get_current_user │ │ │ ├── models/ # SQLAlchemy ORM models (DB tables) │ │ ├── __init__.py # re-exports all models │ │ ├── user.py # User table │ │ └── post.py # Post table │ │ │ ├── schemas/ # Pydantic models (request/response shapes) │ │ ├── __init__.py │ │ ├── user.py # UserCreate, UserResponse, UserUpdate │ │ └── post.py # PostCreate, PostResponse │ │ │ ├── routers/ # API route handlers (thin layer) │ │ ├── __init__.py │ │ ├── users.py # /users endpoints │ │ ├── posts.py # /posts endpoints │ │ └── auth.py # /login, /register, /me │ │ │ ├── services/ # business logic (reusable, testable) │ │ ├── __init__.py │ │ ├── user_service.py # create_user, get_user, etc. │ │ └── auth_service.py # hash_password, create_token, etc. │ │ │ └── utils/ # helpers that don't fit elsewhere │ ├── __init__.py │ └── email.py # send_email, etc. │ ├── alembic/ # DB migration scripts (auto-generated) │ ├── env.py # migration config │ └── versions/ # migration files │ └── 001_create_users.py │ ├── tests/ # all tests mirror app/ structure │ ├── conftest.py # fixtures: test client, test DB │ ├── test_users.py │ └── test_auth.py │ ├── alembic.ini # alembic config (DB URL) ├── pyproject.toml # project metadata, deps, tool config ├── requirements.txt # pip freeze output ├── Dockerfile # container build ├── docker-compose.yml # app + postgres + redis ├── .env # secrets (NEVER commit this) ├── .env.example # template for .env (commit this) └── .gitignore
Key principle: Routers are thin — they only handle HTTP (parse request, call service, return response). Services hold the business logic. Models define the DB. Schemas define the API contract. This separation makes everything testable and swappable.

Every File Explained

A deep dive into what each file does, why it exists, and the actual code that goes inside it.

app/config.py — Environment & Settings

# Reads from .env file or environment variables.
# Single source of truth for ALL configuration.
# Never hardcode secrets — always use this.

from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # ── App ──
    app_name: str = "My API"
    debug: bool = False                 # True in dev, False in prod
    api_version: str = "v1"

    # ── Database ──
    database_url: str                    # required! no default = must be set

    # ── Auth ──
    secret_key: str                      # for JWT signing
    access_token_expire_minutes: int = 30

    # ── CORS ──
    allowed_origins: list[str] = ["http://localhost:3000"]

    # ── External services ──
    redis_url: str = "redis://localhost:6379"
    smtp_host: str | None = None

    # pydantic-settings reads from .env file automatically
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}


# Cache it — settings are read once, reused everywhere
@lru_cache
def get_settings() -> Settings:
    return Settings()

app/database.py — Database Connection

# Sets up SQLAlchemy engine, session factory, and Base class.
# Every file that talks to the DB imports from here.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import get_settings

settings = get_settings()

# Engine = connection pool to the database
# pool_pre_ping=True: test connections before using (handles DB restarts)
engine = create_engine(
    settings.database_url,
    pool_pre_ping=True,
    # pool_size=20,         # max persistent connections
    # max_overflow=10,      # extra connections under load
)

# SessionLocal = factory that creates DB sessions
# autocommit=False: you control when to commit
# autoflush=False: don't auto-sync to DB (explicit is better)
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
)

# Base = all ORM models inherit from this
class Base(DeclarativeBase):
    pass

app/dependencies.py — Shared Dependencies

# Functions injected via Depends() into route handlers.
# Keeps routers clean — they don't know how DB or auth works.

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import jwt, JWTError
from app.database import SessionLocal
from app.config import get_settings
from app.models.user import User

settings = get_settings()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


# ── Database session dependency ──
# yield = the session is given to the route handler,
# then ALWAYS closed in finally (even if route crashes)
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# ── Current user dependency ──
# Extracts and validates the JWT token from the Authorization header.
# If invalid → 401. If valid → returns the User object.
def get_current_user(
    token: str = Depends(oauth2_scheme),   # extract Bearer token
    db: Session = Depends(get_db),          # get DB session
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id: int = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise credentials_exception
    return user


# ── Admin-only dependency (chains on get_current_user) ──
def get_admin_user(user: User = Depends(get_current_user)) -> User:
    if user.role != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return user

app/models/user.py — Database Table

# SQLAlchemy ORM model = Python class that maps to a DB table.
# Each attribute = a column. This is your source of truth for the DB schema.

from sqlalchemy import Column, Integer, String, Boolean, DateTime, func
from sqlalchemy.orm import relationship
from app.database import Base


class User(Base):
    __tablename__ = "users"   # actual table name in the DB

    # ── Columns ──
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    role = Column(String(20), default="user")           # "user" or "admin"
    is_active = Column(Boolean, default=True)

    # ── Timestamps (auto-set by DB) ──
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # ── Relationships ──
    posts = relationship("Post", back_populates="author", cascade="all, delete")

    def __repr__(self):
        return f"User(id={self.id}, email={self.email})"

app/schemas/user.py — API Contracts (Pydantic)

# Pydantic models define the SHAPE of your API.
# Separate from ORM models! Schema = what the client sees.
# Model = what the database stores.

from pydantic import BaseModel, Field, EmailStr
from datetime import datetime


# ── What the client SENDS to create a user ──
class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    email: EmailStr                           # auto-validates email format
    password: str = Field(min_length=8)       # plain text — we hash it


# ── What the client SENDS to update a user ──
class UserUpdate(BaseModel):
    name: str | None = None                  # all optional for PATCH
    email: EmailStr | None = None


# ── What the client RECEIVES back ──
# Notice: no password field! Never expose passwords.
class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    role: str
    is_active: bool
    created_at: datetime

    # from_attributes = True lets Pydantic read from ORM objects
    # (SQLAlchemy models use .email not ["email"])
    model_config = {"from_attributes": True}


# ── Auth schemas ──
class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

app/services/user_service.py — Business Logic

# All the "brain" of your app lives here.
# Services are pure logic — no HTTP, no request/response objects.
# This makes them reusable (CLI, workers, tests) and easy to test.

from sqlalchemy.orm import Session
from fastapi import HTTPException
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.services.auth_service import hash_password


def get_user_by_id(db: Session, user_id: int) -> User:
    """Fetch user or raise 404."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


def get_user_by_email(db: Session, email: str) -> User | None:
    """Fetch user by email (for login, duplicate checks)."""
    return db.query(User).filter(User.email == email).first()


def list_users(db: Session, skip: int = 0, limit: int = 20) -> list[User]:
    """Paginated user list."""
    return db.query(User).offset(skip).limit(limit).all()


def create_user(db: Session, data: UserCreate) -> User:
    """Create a new user. Hashes password, checks for duplicates."""

    # Check if email already exists
    if get_user_by_email(db, data.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create ORM object (hash the password!)
    user = User(
        name=data.name,
        email=data.email,
        hashed_password=hash_password(data.password),
    )

    # Add to DB session, commit, refresh to get auto-generated fields
    db.add(user)
    db.commit()
    db.refresh(user)         # now user.id and user.created_at are populated
    return user


def update_user(db: Session, user_id: int, data: UserUpdate) -> User:
    """Partial update — only change fields that were sent."""
    user = get_user_by_id(db, user_id)

    # model_dump(exclude_unset=True) only includes fields the client sent
    # so PATCH /users/1 {"name": "New"} only updates name, not email
    update_data = data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)
    return user


def delete_user(db: Session, user_id: int) -> None:
    """Delete user or raise 404."""
    user = get_user_by_id(db, user_id)
    db.delete(user)
    db.commit()

app/services/auth_service.py — Auth Logic

# Password hashing and JWT token creation/verification.
# Completely independent of HTTP — can be used anywhere.

from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from app.config import get_settings

settings = get_settings()

# bcrypt is the industry standard for password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """One-way hash. Can't reverse it — only verify."""
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    """Check if plain text matches the hash."""
    return pwd_context.verify(plain, hashed)


def create_access_token(user_id: int) -> str:
    """Create a JWT token with user_id and expiration."""
    payload = {
        "sub": user_id,                          # subject = who this token is for
        "exp": datetime.utcnow() + timedelta(      # when it expires
            minutes=settings.access_token_expire_minutes
        ),
        "iat": datetime.utcnow(),                  # issued at
    }
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")

app/routers/users.py — Route Handlers

# Routers are THIN. They only:
# 1. Define the HTTP method + path
# 2. Declare dependencies (DB, auth)
# 3. Call the service layer
# 4. Return the response
# NO business logic here!

from fastapi import APIRouter, Depends, status
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.schemas.user import UserCreate, UserUpdate, UserResponse
from app.services import user_service
from app.models.user import User

# prefix = all routes here start with /users
# tags = group in Swagger UI docs
router = APIRouter(prefix="/users", tags=["Users"])


@router.get("/", response_model=list[UserResponse])
def list_users(
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db),              # injected DB session
):
    """Get paginated list of users."""
    return user_service.list_users(db, skip, limit)


@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Get a single user by ID."""
    return user_service.get_user_by_id(db, user_id)


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(data: UserCreate, db: Session = Depends(get_db)):
    """Register a new user."""
    return user_service.create_user(db, data)


@router.patch("/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    data: UserUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),  # must be logged in
):
    """Update user (partial). Requires authentication."""
    return user_service.update_user(db, user_id, data)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a user. Requires authentication."""
    user_service.delete_user(db, user_id)

app/routers/auth.py — Auth Routes

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.dependencies import get_db, get_current_user
from app.schemas.user import UserCreate, UserResponse, Token
from app.services import user_service, auth_service

router = APIRouter(prefix="/auth", tags=["Auth"])


@router.post("/register", response_model=UserResponse, status_code=201)
def register(data: UserCreate, db: Session = Depends(get_db)):
    """Public registration endpoint."""
    return user_service.create_user(db, data)


@router.post("/login", response_model=Token)
def login(
    form: OAuth2PasswordRequestForm = Depends(),  # username + password form
    db: Session = Depends(get_db),
):
    """Login → returns JWT token."""

    # 1. Find user by email (OAuth2 form uses "username" field)
    user = user_service.get_user_by_email(db, form.username)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 2. Verify password
    if not auth_service.verify_password(form.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 3. Create and return token
    token = auth_service.create_access_token(user.id)
    return {"access_token": token, "token_type": "bearer"}


@router.get("/me", response_model=UserResponse)
def me(current_user = Depends(get_current_user)):
    """Get the currently logged-in user."""
    return current_user

app/main.py — App Entry Point

# This is the root of your application.
# It creates the FastAPI instance, wires up routers,
# middleware, exception handlers, and startup/shutdown events.

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import get_settings
from app.database import engine, Base
from app.routers import users, auth

settings = get_settings()


# ── Startup / Shutdown events ──
@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── STARTUP: runs once when the app starts ──
    print("Creating database tables...")
    Base.metadata.create_all(bind=engine)   # create tables if not exist
    yield
    # ── SHUTDOWN: runs when the app stops ──
    print("Shutting down...")


# ── Create the FastAPI app ──
app = FastAPI(
    title=settings.app_name,
    version=settings.api_version,
    docs_url="/docs",          # Swagger UI
    redoc_url="/redoc",        # ReDoc
    lifespan=lifespan,
)


# ── Middleware ──
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ── Include routers ──
# Each router adds its own set of endpoints
app.include_router(auth.router)     # /auth/login, /auth/register, /auth/me
app.include_router(users.router)    # /users, /users/{id}


# ── Health check (every prod app needs this) ──
@app.get("/health", tags=["Health"])
def health():
    return {"status": "ok"}

tests/conftest.py — Test Setup

# Fixtures shared across ALL tests.
# Creates a separate test database so tests don't touch prod data.

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base
from app.dependencies import get_db

# Use SQLite in-memory for tests (fast, disposable)
TEST_DB_URL = "sqlite:///./test.db"
engine = create_engine(TEST_DB_URL, connect_args={"check_same_thread": False})
TestSession = sessionmaker(bind=engine)


@pytest.fixture(autouse=True)
def setup_db():
    """Create tables before each test, drop after."""
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)


@pytest.fixture
def db():
    """Provide a clean DB session for each test."""
    session = TestSession()
    try:
        yield session
    finally:
        session.close()


@pytest.fixture
def client(db):
    """Test client that uses the test DB instead of prod DB."""
    def override_get_db():
        yield db

    # Swap the real DB dependency with our test DB
    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

.env / .env.example — Environment Variables

# ── .env (NEVER commit this — it has real secrets) ──
DATABASE_URL=postgresql://myuser:mypassword@localhost:5432/mydb
SECRET_KEY=super-secret-jwt-key-change-this-in-production
DEBUG=false
ALLOWED_ORIGINS=["https://myapp.com","https://www.myapp.com"]
REDIS_URL=redis://localhost:6379

# ── .env.example (commit this — it's a template) ──
DATABASE_URL=postgresql://user:password@localhost:5432/dbname
SECRET_KEY=change-me
DEBUG=true
ALLOWED_ORIGINS=["http://localhost:3000"]

docker-compose.yml — Full Stack

# Runs your API + PostgreSQL + Redis in one command:
# docker compose up --build

version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      - db
      - redis
    volumes:
      - .:/app              # mount code for hot reload in dev
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

  db:
    image: postgres:16
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: mydb
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data    # persist data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  pgdata:                    # named volume so data survives restarts

Dockerfile — Production Build

# Multi-stage build: small final image, no dev dependencies

FROM python:3.12-slim as base

# Don't write .pyc files, unbuffered output (for logging)
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Install deps first (cached if requirements.txt doesn't change)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy app code
COPY . .

# Non-root user (security best practice)
RUN adduser --disabled-password --no-create-home appuser
USER appuser

# Production server: gunicorn + uvicorn workers
# Workers = 2 * CPU cores + 1
CMD ["gunicorn", "app.main:app", \
     "-w", "4", \
     "-k", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000"]

How Data Flows Through the Layers

Follow a single request from client to database and back — see how each layer handles its part.

# POST /users {"name": "Yatin", "email": "y@dev.com", "password": "secret123"}
#
# 1. FastAPI receives the request
# 2. Pydantic validates the body → UserCreate schema
#    (wrong type? missing field? → auto 422 error)
# 3. Router handler calls: user_service.create_user(db, data)
# 4. Service layer:
#    - Checks if email exists (business rule)
#    - Hashes the password (security)
#    - Creates User ORM object
#    - db.add() + db.commit() + db.refresh()
# 5. Returns User ORM object back to router
# 6. FastAPI serializes it through response_model=UserResponse
#    (filters out hashed_password, formats dates)
# 7. Client receives: {"id": 1, "name": "Yatin", "email": "y@dev.com", ...}
#
# Request:  Client → Router → Service → Database
# Response: Database → Service → Router → Pydantic → Client

19 Testing

# pip install pytest httpx
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_root():
    resp = client.get("/")
    assert resp.status_code == 200
    assert resp.json() == {"message": "Hello, World!"}

def test_create_user():
    resp = client.post("/users", json={
        "name": "Yatin",
        "email": "y@dev.com",
        "age": 25
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "Yatin"

def test_not_found():
    resp = client.get("/users/9999")
    assert resp.status_code == 404

def test_validation_error():
    resp = client.post("/users", json={"name": 123})
    assert resp.status_code == 422

# ── Async testing ──
import pytest
from httpx import AsyncClient, ASGITransport

@pytest.mark.anyio
async def test_async():
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as ac:
        resp = await ac.get("/")
        assert resp.status_code == 200

# ── Override dependencies in tests ──
def override_get_db():
    db = TestSessionLocal()
    try:
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db

20 Deployment

Dockerfile

Package your FastAPI app into a Docker container for consistent deployment anywhere.

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Production Checklist

Everything you need to do before deploying to production — security, performance, and reliability.

pydantic-settings for Config

Type-safe configuration management — reads from .env files and environment variables with full validation.

# pip install pydantic-settings
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    database_url: str
    secret_key: str
    debug: bool = False
    allowed_origins: list[str] = ["http://localhost:3000"]

    model_config = {"env_file": ".env"}

settings = Settings()

# .env file:
# DATABASE_URL=postgresql://user:pass@localhost/mydb
# SECRET_KEY=super-secret-key-here

21 Redis

Redis is an in-memory data store — blazing fast key-value storage used for caching, sessions, rate limiting, and real-time pub/sub. It sits between your API and your database to speed things up.

# Install
pip install redis[hiredis]

# redis     = Python Redis client (async support built-in)
# hiredis   = C parser for faster responses (optional but recommended)

Connection Setup

Create a Redis client as a dependency so every route can use it — connect on startup, disconnect on shutdown.

# app/redis.py
import redis.asyncio as redis
from app.config import settings

# Connection pool — reuses connections instead of opening new ones
pool = redis.ConnectionPool.from_url(
    settings.redis_url,      # "redis://localhost:6379"
    max_connections=10,
    decode_responses=True,   # return strings, not bytes
)

async def get_redis():
    client = redis.Redis(connection_pool=pool)
    try:
        yield client
    finally:
        await client.aclose()
# app/main.py
from contextlib import asynccontextmanager
from app.redis import pool

@asynccontextmanager
async def lifespan(app):
    # Startup — pool is already created
    yield
    # Shutdown — close all connections
    await pool.aclose()

app = FastAPI(lifespan=lifespan)

Caching

Store expensive query results in Redis — avoid hitting the database on every request. Set a TTL so stale data expires automatically.

import json
from fastapi import APIRouter, Depends
from app.redis import get_redis

router = APIRouter()

@router.get("/users/{user_id}")
async def get_user(user_id: int, r = Depends(get_redis), db = Depends(get_db)):
    # 1. Check cache first
    cache_key = f"user:{user_id}"
    cached = await r.get(cache_key)

    if cached:
        return json.loads(cached)   # cache HIT — no DB query!

    # 2. Cache MISS — query database
    user = db.query(User).get(user_id)
    if not user:
        raise HTTPException(404)

    # 3. Store in cache (expire after 5 minutes)
    user_data = {"id": user.id, "name": user.name, "email": user.email}
    await r.set(cache_key, json.dumps(user_data), ex=300)

    return user_data


# Invalidate cache when data changes
@router.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate, r = Depends(get_redis)):
    # ... update in DB ...
    await r.delete(f"user:{user_id}")   # clear stale cache
    return updated_user
Cache-aside pattern: Check cache → miss → query DB → store in cache. This is the most common caching strategy. Always invalidate (delete) the cache when the underlying data changes.

Rate Limiting

Prevent abuse by limiting how many requests a client can make — Redis tracks request counts with auto-expiring keys.

from fastapi import Request, HTTPException

async def rate_limit(
    request: Request,
    r = Depends(get_redis),
    limit: int = 100,        # max requests
    window: int = 60,         # per 60 seconds
):
    client_ip = request.client.host
    key = f"rate:{client_ip}"

    # Increment counter, set expiry on first request
    current = await r.incr(key)
    if current == 1:
        await r.expire(key, window)

    if current > limit:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Try again in {await r.ttl(key)}s"
        )

# Apply to routes
@router.get("/search", dependencies=[Depends(rate_limit)])
async def search(q: str):
    ...

Session Storage

Store user sessions in Redis instead of the database — faster reads and automatic expiry for inactive sessions.

import uuid

async def create_session(r, user_id: int, ttl: int = 3600) -> str:
    session_id = str(uuid.uuid4())
    await r.hset(f"session:{session_id}", mapping={
        "user_id": user_id,
        "created": str(datetime.utcnow()),
    })
    await r.expire(f"session:{session_id}", ttl)  # auto-delete after 1 hour
    return session_id

async def get_session(r, session_id: str) -> dict | None:
    data = await r.hgetall(f"session:{session_id}")
    return data or None

async def delete_session(r, session_id: str):
    await r.delete(f"session:{session_id}")

Pub/Sub — Real-Time Events

Publish events from one part of your app and subscribe to them from another — great for notifications and live updates.

# Publisher — send events from any route
@router.post("/orders")
async def create_order(order: OrderCreate, r = Depends(get_redis)):
    # ... save to DB ...
    # Publish event to "orders" channel
    await r.publish("orders", json.dumps({
        "event": "order_created",
        "order_id": order_id,
        "user_id": order.user_id,
    }))
    return {"id": order_id}

# Subscriber — listen via WebSocket
@router.websocket("/ws/orders")
async def order_feed(ws: WebSocket, r = Depends(get_redis)):
    await ws.accept()
    pubsub = r.pubsub()
    await pubsub.subscribe("orders")

    try:
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                await ws.send_text(msg["data"])
    finally:
        await pubsub.unsubscribe("orders")

Common Redis Commands

Quick reference for the Redis operations you'll use most often in FastAPI.

OperationCommandUse Case
Set valueawait r.set("key", "value", ex=300)Cache with 5min TTL
Get valueawait r.get("key")Read from cache
Deleteawait r.delete("key")Invalidate cache
Incrementawait r.incr("counter")Rate limiting, counters
Hash setawait r.hset("user:1", mapping={...})Store objects (sessions)
Hash get allawait r.hgetall("user:1")Read full object
Expireawait r.expire("key", 3600)Auto-delete after time
TTLawait r.ttl("key")Check remaining time
Existsawait r.exists("key")Check before querying DB
List pushawait r.lpush("queue", "task")Simple job queue
List popawait r.rpop("queue")Process next job
Publishawait r.publish("channel", "msg")Real-time events
Redis is not a database replacement. It's volatile memory — if Redis crashes, cached data is gone. Always have the real data in PostgreSQL. Redis just makes reads faster.

22 Kafka

Apache Kafka is a distributed event streaming platform — it lets services communicate by producing and consuming messages through topics. Unlike HTTP APIs, Kafka is asynchronous and durable — messages persist even if the consumer is down.

When to Use Kafka vs REST

Use REST for synchronous request-response. Use Kafka when services need to communicate without waiting for each other.

ScenarioUse RESTUse Kafka
Get user profileYesNo
Send welcome email after signupNoYes
Process paymentYesNo
Update analytics after purchaseNoYes
Sync data between servicesNoYes
Real-time activity feedNoYes
# Install
pip install aiokafka

# aiokafka = async Kafka client for Python (built on kafka-python)
# You also need a running Kafka broker (use Docker for local dev)
# docker-compose.yml — local Kafka setup
# services:
#   kafka:
#     image: confluentinc/cp-kafka:7.5.0
#     ports:
#       - "9092:9092"
#     environment:
#       KAFKA_NODE_ID: 1
#       KAFKA_PROCESS_ROLES: broker,controller
#       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
#       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
#       KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
#       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
#       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
#       CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"

Core Concepts

Understand these four things and you understand Kafka — producers send messages, consumers read them, topics organize them, and consumer groups enable scaling.

# Kafka Architecture
#
# Producer ──→ Topic ──→ Consumer
#              (log)
#
# Topic:          A named category/feed of messages (like "orders", "emails")
# Partition:      Topic is split into partitions for parallelism
# Producer:       Sends messages TO a topic
# Consumer:       Reads messages FROM a topic
# Consumer Group: Multiple consumers sharing the work (each gets different partitions)
# Offset:         Position of a consumer in the topic (like a bookmark)
#
# Key difference from a queue:
# - Queue: message is deleted after being consumed
# - Kafka: message STAYS — multiple consumers can read it independently

Producer — Sending Events

Create a producer that connects on app startup and sends events whenever something happens in your API.

# app/kafka.py
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

class KafkaProducerService:
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            key_serializer=lambda k: k.encode("utf-8") if k else None,
        )

    async def start(self):
        await self.producer.start()

    async def stop(self):
        await self.producer.stop()

    async def send(self, topic: str, value: dict, key: str = None):
        await self.producer.send_and_wait(topic, value=value, key=key)

kafka_producer = KafkaProducerService()
# app/main.py — connect producer on startup
from contextlib import asynccontextmanager
from app.kafka import kafka_producer

@asynccontextmanager
async def lifespan(app):
    await kafka_producer.start()     # connect on startup
    yield
    await kafka_producer.stop()      # disconnect on shutdown

app = FastAPI(lifespan=lifespan)
# Use in routes — fire and forget
@router.post("/orders")
async def create_order(order: OrderCreate, db = Depends(get_db)):
    # 1. Save to database (source of truth)
    db_order = Order(**order.dict())
    db.add(db_order)
    db.commit()

    # 2. Publish event to Kafka (async side effects)
    await kafka_producer.send(
        topic="orders",
        key=str(db_order.id),
        value={
            "event": "order_created",
            "order_id": db_order.id,
            "user_id": order.user_id,
            "total": order.total,
        }
    )

    return {"id": db_order.id, "status": "created"}

Consumer — Processing Events

Consumers run as background tasks that continuously listen for new messages and process them independently from the API.

# app/consumers/order_consumer.py
import asyncio
import json
from aiokafka import AIOKafkaConsumer

async def consume_orders():
    consumer = AIOKafkaConsumer(
        "orders",                       # topic to listen to
        bootstrap_servers="localhost:9092",
        group_id="email-service",       # consumer group name
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        auto_offset_reset="earliest",   # start from beginning if no offset saved
    )
    await consumer.start()

    try:
        async for msg in consumer:
            event = msg.value
            print(f"Received: {event}")

            if event["event"] == "order_created":
                # Send confirmation email, update analytics, etc.
                await send_order_email(event["user_id"], event["order_id"])

    finally:
        await consumer.stop()
# app/main.py — run consumer as background task
from app.consumers.order_consumer import consume_orders

@asynccontextmanager
async def lifespan(app):
    await kafka_producer.start()

    # Start consumer in background (runs alongside the API)
    consumer_task = asyncio.create_task(consume_orders())

    yield

    # Shutdown
    consumer_task.cancel()
    await kafka_producer.stop()

app = FastAPI(lifespan=lifespan)

Event-Driven Pattern

One event, multiple consumers — each service reacts independently. The producer doesn't need to know who's listening.

# When a user signs up, multiple things need to happen:
#
# WITHOUT Kafka (tightly coupled):
# @router.post("/signup")
# async def signup(user):
#     save_to_db(user)             # 1. save
#     await send_welcome_email()   # 2. email (what if it fails?)
#     await create_stripe_customer() # 3. billing (another failure point)
#     await notify_slack()          # 4. notify (yet another)
#     # If step 3 fails, steps 1-2 already happened. Messy rollback.
#
# WITH Kafka (loosely coupled):
# @router.post("/signup")
# async def signup(user):
#     save_to_db(user)
#     await kafka.send("user_events", {"event": "user_signed_up", ...})
#     # Done! Each service picks it up independently:
#
#   ┌── email-service (group: "email") ───── sends welcome email
#   │
#   ├── billing-service (group: "billing") ── creates Stripe customer
#   │
#   └── notification-service (group: "notif") ── posts to Slack
#
# Each consumer group gets its OWN copy of every message.
# If email-service is down, messages wait. No data lost.
# Real example — producer publishes ONE event
@router.post("/signup")
async def signup(user: UserCreate, db = Depends(get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()

    # Single event — multiple services react
    await kafka_producer.send(
        topic="user_events",
        key=str(db_user.id),
        value={
            "event": "user_signed_up",
            "user_id": db_user.id,
            "email": db_user.email,
            "name": db_user.name,
            "timestamp": datetime.utcnow().isoformat(),
        }
    )

    return {"id": db_user.id}

Consumer Groups & Scaling

Consumer groups let you scale horizontally — add more consumers and Kafka automatically distributes partitions across them.

# How consumer groups work:
#
# Topic "orders" has 4 partitions:
#   Partition 0 ──┐
#   Partition 1 ──┤
#   Partition 2 ──┤
#   Partition 3 ──┘
#
# Consumer Group "email-service" with 2 consumers:
#   Consumer A → reads Partition 0, 1
#   Consumer B → reads Partition 2, 3
#
# Scale up to 4 consumers → each gets exactly 1 partition
# Scale up to 5 consumers → 1 sits idle (more consumers than partitions = waste)
#
# Different groups are INDEPENDENT:
#   Group "email-service"   → gets ALL messages (for sending emails)
#   Group "analytics-service" → gets ALL messages (for tracking)
#   Each group maintains its own offset (bookmark)
# Multiple consumers in the same group — they share the load
# Run these as separate processes/containers:

# Instance 1
consumer_1 = AIOKafkaConsumer(
    "orders",
    group_id="email-service",   # same group
    bootstrap_servers="localhost:9092",
)

# Instance 2 (separate process)
consumer_2 = AIOKafkaConsumer(
    "orders",
    group_id="email-service",   # same group → Kafka splits partitions
    bootstrap_servers="localhost:9092",
)

# Different group → gets its OWN copy of every message
analytics_consumer = AIOKafkaConsumer(
    "orders",
    group_id="analytics-service",  # different group
    bootstrap_servers="localhost:9092",
)

Redis + Kafka Together

Redis and Kafka solve different problems — use them together for a complete architecture.

FeatureRedisKafka
PurposeCache, sessions, rate limitingEvent streaming, service communication
SpeedSub-millisecond readsHigh throughput, slight latency
Data lifetimeTemporary (TTL-based)Persistent (days/weeks/forever)
PatternKey-value lookupPublish-subscribe / event log
ScalingVertical (bigger instance)Horizontal (more partitions/brokers)
If it crashesCache is gone (rebuild from DB)Messages are safe (replicated)
# Common architecture: FastAPI + Redis + Kafka + PostgreSQL
#
#                    ┌─── Redis (cache) ───┐
#                    │                      │
# Client → FastAPI ──┼─── PostgreSQL (data) │
#                    │                      │
#                    └─── Kafka (events) ───┘
#                              │
#                    ┌─────────┼─────────┐
#                    │         │         │
#               email-svc  analytics  billing
#
# 1. Request comes in → check Redis cache
# 2. Cache miss → query PostgreSQL
# 3. Store result in Redis for next time
# 4. Publish event to Kafka for side effects
# 5. Other services consume events independently
Rule of thumb: PostgreSQL is your source of truth. Redis makes reads fast. Kafka connects your services. Use all three together for production-grade APIs.

23 Interview Questions

Expand All

Fundamentals

What is FastAPI and how is it different from Flask/Django?

FastAPI is built on Starlette (ASGI) + Pydantic (validation). Flask/Django are WSGI-based.

FeatureFastAPIFlaskDjango
AsyncNativeLimited (2.0+)Limited (3.1+)
ValidationAuto (Pydantic)ManualForms/DRF serializers
API DocsAuto Swagger + ReDocManualDRF + drf-spectacular
SpeedNode.js/Go levelModerateModerate
Type hintsCore featureOptionalOptional

FastAPI = API-first. Django = full-stack (admin, ORM, templates). Flask = micro-framework (BYO everything).

What is ASGI vs WSGI? Why does it matter?

WSGI (Web Server Gateway Interface) — synchronous. One request per thread. Flask/Django traditionally use this.

ASGI (Asynchronous Server Gateway Interface) — async-native. Handles thousands of concurrent connections on a single thread via event loop.

# WSGI — blocks during I/O
def app(environ, start_response):
    data = fetch_from_db()  # thread blocked here
    start_response("200 OK", headers)
    return [data]

# ASGI — non-blocking
async def app(scope, receive, send):
    data = await fetch_from_db()  # yields control
    await send(response)

FastAPI uses ASGI → can handle WebSockets, long-polling, SSE, and high-concurrency I/O without threads.

What is Pydantic and why does FastAPI use it?

Pydantic is a data validation library using Python type hints. FastAPI uses it for request/response validation, serialization, and auto-documentation.

from pydantic import BaseModel, Field, field_validator

class User(BaseModel):
    name: str = Field(min_length=1, max_length=50)
    email: str
    age: int = Field(ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v):
        if "@" not in v:
            raise ValueError("Invalid email")
        return v

# FastAPI auto-validates request body:
@app.post("/users")
async def create(user: User):  # ← auto-validated
    return user

Invalid data → automatic 422 Unprocessable Entity with detailed error messages. Zero manual validation code.

What are path parameters vs query parameters? When to use which?
# Path parameter — identifies a SPECIFIC resource
@app.get("/users/{user_id}")
async def get_user(user_id: int):  # /users/42
    ...

# Query parameters — filter/modify the response
@app.get("/users")
async def list_users(
    skip: int = 0,
    limit: int = 10,
    role: str | None = None
):  # /users?skip=0&limit=10&role=admin
    ...

Rule: Path params = required, identifies resource (/users/42). Query params = optional, filters/pagination (?page=2&sort=name).

Dependency Injection & Middleware

Explain FastAPI's Dependency Injection system. Why is it powerful?

DI in FastAPI lets you declare shared logic (DB sessions, auth, pagination) that gets automatically resolved and injected into route handlers.

from fastapi import Depends

# Dependency — reusable across routes
async def get_db():
    db = SessionLocal()
    try:
        yield db       # injected into route
    finally:
        db.close()   # cleanup runs after response

# Sub-dependency — dependencies can depend on other dependencies
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    user = db.query(User).filter_by(token=token).first()
    if not user: raise HTTPException(401)
    return user

# Route just declares what it needs
@app.get("/profile")
async def profile(user: User = Depends(get_current_user)):
    return user

Why powerful: Testable (override deps in tests), composable (deps chain), auto-cleanup (yield), cached per request (same dep instance reused).

What's the difference between Depends with yield vs return? When does cleanup run?
# Return — simple, no cleanup needed
def get_settings():
    return Settings()  # just returns a value

# Yield — has setup + teardown (like context manager)
async def get_db():
    db = SessionLocal()      # SETUP: before yield
    try:
        yield db             # INJECT: this value goes to the route
    finally:
        db.close()           # TEARDOWN: runs after response is sent

Execution order:

  1. Code before yield runs → setup
  2. yield value injected into route handler
  3. Route handler executes and returns response
  4. Code after yield runs → teardown (even if route raised an exception)

Tricky gotcha: Exceptions in the route handler propagate into the yield dependency. That's why you need try/finally — without it, cleanup won't run on error.

Middleware vs Dependencies — when to use which?
FeatureMiddlewareDependency
ScopeEvery requestSpecific routes
AccessRaw request/responseParsed params
Use forCORS, logging, timingAuth, DB, validation
Can modify response?YesNo (only input)
Testable?HarderEasy (override)
# Middleware — runs on EVERY request
@app.middleware("http")
async def add_timing(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    response.headers["X-Process-Time"] = str(time.time() - start)
    return response

# Dependency — runs only on routes that declare it
@app.get("/admin", dependencies=[Depends(require_admin)])
async def admin_panel(): ...

Rule: Cross-cutting concerns (logging, CORS, headers) → middleware. Business logic (auth, DB, pagination) → dependencies.

How do you override dependencies in tests?
from fastapi.testclient import TestClient

# Production dependency
async def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Test override — use test database
async def override_get_db():
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.rollback()
        db.close()

app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)

# Now all routes using Depends(get_db) will use test DB
response = client.get("/users")

# Clean up after tests
app.dependency_overrides.clear()

Key insight: dependency_overrides is a dict mapping original_dep → replacement_dep. This is why DI is superior to hard-coded function calls — zero code changes for testing.

Async, Performance & Tricky Behavior

async def vs def in FastAPI — what ACTUALLY happens?
# async def — runs on the event loop (main thread)
@app.get("/async")
async def async_route():
    data = await async_db_query()  # non-blocking
    return data

# def — FastAPI runs it in a THREAD POOL automatically!
@app.get("/sync")
def sync_route():
    data = blocking_db_query()  # runs in threadpool
    return data

The trap: If you use async def but call blocking code (without await), you block the entire event loop. No other request can be processed.

# ❌ WRONG — blocks event loop!
@app.get("/bad")
async def bad_route():
    time.sleep(5)  # blocks EVERYTHING for 5 seconds
    return {"msg": "done"}

# ✅ Option 1 — use regular def (runs in threadpool)
@app.get("/good1")
def good_route():
    time.sleep(5)  # runs in thread, event loop free
    return {"msg": "done"}

# ✅ Option 2 — use async sleep
@app.get("/good2")
async def good_route():
    await asyncio.sleep(5)  # non-blocking
    return {"msg": "done"}

# ✅ Option 3 — run blocking code in executor
@app.get("/good3")
async def good_route():
    data = await asyncio.to_thread(blocking_function)
    return data

Rule: async def → only use await-able calls. def → blocking is fine (auto-threadpooled). Mixing them wrong is the #1 FastAPI performance killer.

What is the lifespan event? How is it different from on_event?
# ❌ OLD WAY (deprecated in FastAPI 0.93+)
@app.on_event("startup")
async def startup():
    app.state.db = create_pool()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db.close()

# ✅ NEW WAY — lifespan context manager
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP — runs before first request
    app.state.db = await create_pool()
    app.state.redis = await aioredis.from_url("redis://localhost")

    yield  # app runs here

    # SHUTDOWN — runs after last response
    await app.state.db.close()
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

Why lifespan? Startup and shutdown are coupled — resources created in startup must be cleaned in shutdown. The context manager pattern guarantees this. on_event doesn't.

How does BackgroundTasks work? What are its limitations?
from fastapi import BackgroundTasks

@app.post("/send-email")
async def send_email(
    email: str,
    background: BackgroundTasks
):
    background.add_task(send_email_task, email)
    return {"msg": "Email queued"}  # returns immediately

def send_email_task(email: str):
    # runs AFTER response is sent
    smtp.send(email)  # blocking is OK — runs in threadpool

Limitations:

  • No retry — if it fails, it's gone. No error notification.
  • No persistence — server restart = lost tasks.
  • Same process — heavy tasks block your worker.
  • No tracking — can't check task status.

When to upgrade: Need retries/persistence → Celery or ARQ. Need event streaming → Kafka. BackgroundTasks is only for fire-and-forget lightweight work.

What happens if you await a non-coroutine? Or forget to await?
# ❌ Forgot to await — returns coroutine object, not result!
@app.get("/broken")
async def broken():
    result = async_db_query()  # missing await!
    return result
    # Returns: <coroutine object async_db_query at 0x...>
    # Plus a RuntimeWarning: coroutine was never awaited

# ❌ Awaiting a regular function
async def handler():
    result = await regular_function()  # TypeError!
    # TypeError: object int can't be used in 'await' expression

# ✅ Run sync function in async context
async def handler():
    result = await asyncio.to_thread(regular_function)

Debug tip: If your API returns weird objects or you see coroutine never awaited warnings — you forgot an await.

Pydantic Tricks & Gotchas

How do you create different Pydantic models for Create, Read, Update?
# Base — shared fields
class UserBase(BaseModel):
    name: str
    email: str

# Create — what client sends (no id)
class UserCreate(UserBase):
    password: str

# Read — what API returns (has id, no password)
class UserRead(UserBase):
    id: int
    model_config = ConfigDict(from_attributes=True)

# Update — all fields optional
class UserUpdate(BaseModel):
    name: str | None = None
    email: str | None = None

@app.post("/users", response_model=UserRead)
async def create(user: UserCreate): ...

@app.patch("/users/{id}", response_model=UserRead)
async def update(id: int, user: UserUpdate): ...

Why separate models? Security — UserCreate accepts password, UserRead never exposes it. UserUpdate makes everything optional for PATCH.

What is model_config = ConfigDict(from_attributes=True) and why do you need it?
# SQLAlchemy model — uses attributes (user.name)
class UserDB(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

# Without from_attributes — FAILS
class UserRead(BaseModel):
    id: int
    name: str

user_db = db.query(UserDB).first()
UserRead.model_validate(user_db)  # ❌ Pydantic expects dict

# With from_attributes — WORKS
class UserRead(BaseModel):
    id: int
    name: str
    model_config = ConfigDict(from_attributes=True)

UserRead.model_validate(user_db)  # ✅ reads user_db.id, user_db.name

Old name: class Config: orm_mode = True (Pydantic v1). New name: from_attributes=True (Pydantic v2).

Tells Pydantic to read data from object attributes (obj.field), not just dict keys (obj["field"]).

What's the difference between Field, Query, Path, Body, Header?
from fastapi import Query, Path, Body, Header

@app.get("/items/{item_id}")
async def read_item(
    # Path — from URL path
    item_id: int = Path(ge=1, description="Item ID"),

    # Query — from ?key=value
    q: str = Query(max_length=50, default=None),

    # Header — from request headers
    x_token: str = Header(),
):
    ...

@app.post("/items")
async def create(
    # Body — from JSON body (can embed multiple)
    item: Item = Body(embed=True),
):
    ...

Field is for inside Pydantic models. Query/Path/Body/Header are for route function parameters. They all support the same validation options (ge, max_length, regex, etc.).

How do you handle partial updates (PATCH) properly?
class ItemUpdate(BaseModel):
    name: str | None = None
    price: float | None = None
    description: str | None = None

@app.patch("/items/{item_id}")
async def update_item(item_id: int, updates: ItemUpdate):
    # exclude_unset=True is the KEY — only gets fields client sent
    update_data = updates.model_dump(exclude_unset=True)

    # Without exclude_unset:
    # {"name": None, "price": None, "description": None}
    # With exclude_unset (client sent {"name": "New"}):
    # {"name": "New"}  ← only what was actually sent

    db_item = get_item(item_id)
    for field, value in update_data.items():
        setattr(db_item, field, value)
    db.commit()

Gotcha: Without exclude_unset=True, you'll overwrite every field with None — even ones the client didn't send. This is the #1 PATCH bug.

Authentication & Security

Implement JWT auth flow in FastAPI from scratch
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from datetime import datetime, timedelta

SECRET = "your-secret-key"
ALGO = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

# 1. Create token
def create_token(data: dict, expires: timedelta = timedelta(hours=1)):
    payload = {**data, "exp": datetime.utcnow() + expires}
    return jwt.encode(payload, SECRET, algorithm=ALGO)

# 2. Verify token (dependency)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
        user_id = payload.get("sub")
        if not user_id: raise HTTPException(401)
    except jwt.JWTError:
        raise HTTPException(401, "Invalid token")
    return get_user_from_db(user_id)

# 3. Login route
@app.post("/login")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = authenticate(form.username, form.password)
    if not user: raise HTTPException(401)
    token = create_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}

# 4. Protected route
@app.get("/me")
async def me(user = Depends(get_current_user)):
    return user
How do you implement role-based access control (RBAC)?
from enum import Enum

class Role(str, Enum):
    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"

# Dependency factory — returns a dependency!
def require_role(*roles: Role):
    async def checker(user = Depends(get_current_user)):
        if user.role not in roles:
            raise HTTPException(403, "Forbidden")
        return user
    return checker

# Usage — clean and declarative
@app.delete("/users/{id}")
async def delete_user(
    id: int,
    admin = Depends(require_role(Role.ADMIN))
):
    ...

@app.put("/posts/{id}")
async def edit_post(
    id: int,
    user = Depends(require_role(Role.ADMIN, Role.MODERATOR))
):
    ...

Key pattern: require_role is a dependency factory — a function that returns a dependency. This lets you parameterize access control per route.

Database & ORM

How do you handle database sessions properly in FastAPI?
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

engine = create_engine("postgresql://user:pass@localhost/db")
SessionLocal = sessionmaker(bind=engine)

# ✅ Correct — yield dependency with cleanup
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()  # ALWAYS close, even on error

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()

# ❌ WRONG — session leaks on exceptions!
@app.get("/bad")
def bad_route():
    db = SessionLocal()
    users = db.query(User).all()  # if this throws...
    db.close()                     # this never runs!
    return users

Common gotcha: FastAPI's Depends ensures one session per request. If you create sessions manually inside routes, you risk leaks and inconsistent transactions.

SQLAlchemy sync vs async — which to use with FastAPI?
# Sync — simpler, use with `def` routes
from sqlalchemy import create_engine
engine = create_engine("postgresql://...")

@app.get("/users")
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()  # FastAPI auto-threadpools this

# Async — more complex, higher throughput
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = async_sessionmaker(engine)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

Rule of thumb: Start with sync (simpler). Switch to async when you need high concurrency (1000+ concurrent connections). Sync with def routes is fast enough for most apps since FastAPI auto-threadpools them.

How do you handle database migrations with Alembic?
# Setup
# pip install alembic
# alembic init alembic

# alembic/env.py — point to your models
from app.models import Base
target_metadata = Base.metadata

# Generate migration after model changes
# alembic revision --autogenerate -m "add users table"

# Apply migration
# alembic upgrade head

# Rollback
# alembic downgrade -1

Interview trap: "What happens if you change a model but don't create a migration?" → Nothing changes in the database. SQLAlchemy models and database schema are independent. Alembic bridges them.

Production tip: Always review autogenerated migrations. They can miss renames (sees drop + create instead) and data migrations.

Architecture & Design

How do you structure a large FastAPI project? Repository pattern?
# 3-layer architecture:
# Router → Service → Repository

# router — HTTP concerns only
@router.get("/users/{id}", response_model=UserRead)
async def get_user(id: int, service: UserService = Depends()):
    return service.get_by_id(id)

# service — business logic
class UserService:
    def __init__(self, repo: UserRepo = Depends()):
        self.repo = repo

    def get_by_id(self, id: int):
        user = self.repo.find(id)
        if not user:
            raise HTTPException(404)
        return user

# repository — database access only
class UserRepo:
    def __init__(self, db: Session = Depends(get_db)):
        self.db = db

    def find(self, id: int):
        return self.db.query(User).get(id)

Why? Router doesn't know about DB. Service doesn't know about HTTP. Repository doesn't know about business rules. Each layer is independently testable.

APIRouter vs including routers — how does it work?
# app/routers/users.py
from fastapi import APIRouter

router = APIRouter(
    prefix="/users",
    tags=["users"],
    dependencies=[Depends(get_current_user)]  # applies to ALL routes
)

@router.get("/")
async def list_users(): ...   # GET /users/

@router.get("/{id}")
async def get_user(id: int): ...   # GET /users/42

# app/main.py
from app.routers import users, products, orders

app = FastAPI()
app.include_router(users.router)
app.include_router(products.router)
app.include_router(orders.router, prefix="/v2")  # → /v2/orders/

Key: Routers have their own prefix, tags, and dependencies. include_router can add additional prefix (useful for API versioning).

How do you handle custom exception handlers?
# Custom exception class
class NotFoundError(Exception):
    def __init__(self, resource: str, id: int):
        self.resource = resource
        self.id = id

# Register handler — catches this exception ANYWHERE
@app.exception_handler(NotFoundError)
async def not_found_handler(request: Request, exc: NotFoundError):
    return JSONResponse(
        status_code=404,
        content={
            "error": f"{exc.resource} {exc.id} not found",
            "type": "not_found"
        }
    )

# Now just raise anywhere — no try/except in routes
class UserService:
    def get(self, id: int):
        user = self.repo.find(id)
        if not user:
            raise NotFoundError("User", id)  # caught globally
        return user

Gotcha: Exception handlers don't catch HTTPException by default — FastAPI has its own handler for that. To override it, register a handler for HTTPException explicitly.

How do you version your API?
# Option 1 — URL prefix (most common)
app.include_router(users_v1.router, prefix="/v1")
app.include_router(users_v2.router, prefix="/v2")
# GET /v1/users, GET /v2/users

# Option 2 — Header-based
@app.get("/users")
async def get_users(accept_version: str = Header(default="1")):
    if accept_version == "2":
        return v2_response()
    return v1_response()

# Option 3 — Separate FastAPI apps (for major versions)
app_v1 = FastAPI()
app_v2 = FastAPI()

main_app = FastAPI()
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)

Best practice: URL prefix for simplicity. mount() for completely different API versions with separate docs pages.

Tricky & Brain Teasers

Why does this route return {"item_id": "me"} instead of the current user?
@app.get("/users/{user_id}")
async def get_user(user_id: str):
    return {"user_id": user_id}

@app.get("/users/me")
async def get_me():
    return {"user": "current"}

# GET /users/me → {"user_id": "me"} — WRONG!

Why? Route order matters! /users/{user_id} matches /users/me first because it was defined first. FastAPI evaluates routes top to bottom.

Fix — put specific routes BEFORE parameterized ones:

@app.get("/users/me")       # ← FIRST
async def get_me(): ...

@app.get("/users/{user_id}")  # ← SECOND
async def get_user(user_id: str): ...
What's wrong here? — The response_model leak trap
class UserDB(BaseModel):
    id: int
    name: str
    email: str
    hashed_password: str  # SECRET!

# ❌ SECURITY BUG — leaks password hash!
@app.get("/users/{id}")
async def get_user(id: int):
    user = db.query(User).get(id)
    return user  # returns ALL fields including hashed_password

# ✅ FIX — use response_model to filter output
class UserRead(BaseModel):
    id: int
    name: str
    email: str
    # no password field!

@app.get("/users/{id}", response_model=UserRead)
async def get_user(id: int):
    return db.query(User).get(id)  # password stripped by response_model

Lesson: response_model isn't just for docs — it's a security filter. Without it, you can accidentally expose internal fields (passwords, tokens, internal IDs).

Why does this dependency run twice? — Caching gotcha
async def get_db():
    print("Creating DB session")  # How many times?
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

async def get_user(db = Depends(get_db)):
    return db.query(User).first()

async def get_settings(db = Depends(get_db)):
    return db.query(Settings).first()

@app.get("/")
async def home(
    user = Depends(get_user),
    settings = Depends(get_settings)
):
    return {"user": user, "settings": settings}

# "Creating DB session" prints ONCE, not twice!

Why? FastAPI caches dependencies per request by default. Same dependency function → same instance within one request. This is why get_db runs once even though two sub-dependencies use it.

To disable caching (rare — e.g., you want a fresh random value):

async def get_random():
    return random.randint(1, 100)

# use_cache=False → runs fresh each time
@app.get("/")
async def route(
    a = Depends(get_random, use_cache=False),
    b = Depends(get_random, use_cache=False)
):
    return {"a": a, "b": b}  # different values!
What's the output? — The mutable Pydantic default trap
# ❌ This looks fine but is WRONG in Pydantic v1
class Item(BaseModel):
    tags: list[str] = []  # shared mutable default!

# Pydantic v2 actually handles this correctly — it copies the default
# But in v1, this was a real trap (same as Python's mutable default arg)

# ✅ Safe way (works in both v1 and v2)
class Item(BaseModel):
    tags: list[str] = Field(default_factory=list)

# The REAL Pydantic trap — model_validate vs __init__
class User(BaseModel):
    name: str
    age: int

# These are NOT the same!
u1 = User(name="Alice", age="25")    # Works! Coerces "25" → 25
u2 = User(name="Alice", age="abc")   # ValidationError!

# Strict mode — no coercion
class StrictUser(BaseModel):
    model_config = ConfigDict(strict=True)
    name: str
    age: int

StrictUser(name="Alice", age="25")  # ❌ ValidationError! str != int

Key insight: Pydantic coerces types by default"25" becomes 25. This is usually helpful but can hide bugs. Use strict=True when you need exact types.

What happens when two routes have the same path and method?
@app.get("/items")
async def items_v1():
    return ["v1"]

@app.get("/items")
async def items_v2():
    return ["v2"]

# GET /items → ["v1"] — first one wins, second is SILENTLY ignored!
# No error, no warning. 😱

Why it's dangerous: When using include_router with multiple routers, you can accidentally create duplicate routes — especially with "/" paths. FastAPI won't warn you.

Debug tip: Check /docs (Swagger UI) — duplicate routes will show up there. Or inspect app.routes programmatically.

Implement a rate limiter using dependencies
from collections import defaultdict
from time import time

# In-memory rate limiter (use Redis in production)
requests_log: dict[str, list[float]] = defaultdict(list)

def rate_limit(max_requests: int = 10, window: int = 60):
    async def checker(request: Request):
        client_ip = request.client.host
        now = time()

        # Clean old entries
        requests_log[client_ip] = [
            t for t in requests_log[client_ip]
            if now - t < window
        ]

        if len(requests_log[client_ip]) >= max_requests:
            raise HTTPException(
                429,
                detail="Too many requests",
                headers={"Retry-After": str(window)}
            )
        requests_log[client_ip].append(now)
    return checker

# Usage — 5 requests per 60 seconds
@app.get("/api/data", dependencies=[Depends(rate_limit(5, 60))])
async def get_data():
    return {"data": "here"}

Why this is tricky: rate_limit(5, 60) is a dependency factory — it returns a function that FastAPI calls. The outer function configures, the inner function executes per request.

Production: Use Redis instead of in-memory dict — this won't work with multiple workers.

What's wrong with this WebSocket implementation?
# ❌ BUG — doesn't handle disconnection!
@app.websocket("/ws")
async def websocket(ws: WebSocket):
    await ws.accept()
    while True:
        data = await ws.receive_text()  # crashes on disconnect!
        await ws.send_text(f"Echo: {data}")

# ✅ FIX — handle WebSocketDisconnect
from fastapi import WebSocketDisconnect

@app.websocket("/ws")
async def websocket(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")  # clean up here

Another gotcha: WebSocket routes don't use response_model, don't return HTTP status codes, and middleware runs differently for them. Auth must be handled manually (usually via query params since headers aren't sent in the initial handshake from browsers).

How does FastAPI auto-generate OpenAPI docs? Can you customize them?
# FastAPI reads your type hints, models, and decorators
# to auto-generate the OpenAPI schema at /openapi.json

# Customize per-route:
@app.get(
    "/users",
    summary="List all users",
    description="Returns paginated user list",
    response_description="List of users",
    tags=["users"],
    deprecated=False,
    responses={
        404: {"description": "Not found"},
        500: {"description": "Server error"},
    }
)
async def list_users(): ...

# Customize globally:
app = FastAPI(
    title="My API",
    version="2.0.0",
    description="Production API",
    docs_url="/swagger",        # custom path (default: /docs)
    redoc_url="/documentation",  # custom path (default: /redoc)
    openapi_url="/api/schema",   # custom schema path
)

# Disable docs in production:
app = FastAPI(docs_url=None, redoc_url=None)

Key insight: Everything in Swagger comes from your code — type hints, Pydantic models, decorator args. Better type annotations = better docs. Zero extra effort.


Previous Guide Python — The Complete Guide Variables, OOP, data structures, decorators, NumPy, Pandas — the foundation.