"I want to stand up an API in FastAPI"—as a requirement it's one line. Write @app.get and you have something working in 5 minutes. But the moment you try to put it in production, the things to decide multiply at once. Do you write it with async def or def? Where do you validate external input? How do you inject the DB session? Is it OK to run a heavy AI job inside a request? When it falls over, can you trace the cause from the logs?
This article is an implementation guide to running FastAPI at production quality. The official tutorial carefully takes you as far as "making it work"; here I focus on the decision criteria and code for building an API that doesn't fall over, can be traced, and is easy to change. As source material, I weave in design decisions from the in-house AI platform I built for a major Japanese broadcaster (a caption typo-detection pipeline, with FastAPI as the API layer and long-running AI jobs separated out to Cloud Run Jobs / Cloud Workflows).
The rules of this article: API specs and recommended patterns are based on the FastAPI / Pydantic official documentation (as of June 2026). Specs are revised, so always confirm the latest behavior in the official docs before going to production. The code is arranged in a form usable in real operation, but secrets (DB URLs, API keys, signing keys) are assumed to be in environment variables (never hardcode).
0. Why FastAPI can be called "fast and safe" in one line
Before design decisions, grasp what FastAPI stands on. The official docs cite these three as its foundation.
- Standard Python type hints: just by writing parameter types, validation, conversion, and OpenAPI doc generation come along automatically.
- Pydantic: data validation and parsing. The core validation logic is written in Rust, and the official docs describe it as among the fastest data validation libraries for Python.
- Starlette: the ASGI web layer. The part that supports, in the official phrasing, "performance on par with NodeJS and Go."
In other words, FastAPI's "fast" and "fewer bugs" come from the design of getting validation, conversion, and documentation at once with a single type hint. This article aims to leverage this design philosophy to the fullest in the production-operations context.
1. Most important: the right use of async def and def
Get this wrong and you build either an API that doesn't get throughput or an API that occasionally freezes. It's FastAPI's biggest pitfall and biggest leverage.
1.1 The official "for people in a hurry" rule
The official documentation (/async/) gives these decision criteria.
- If you use an external library that tells you to call it with await → async def

      @app.get("/")
      async def read_results():
          results = await some_library()
          return results

- If the library communicates with something external but does not support await (currently the case for many DB libraries) → a plain def

      @app.get("/")
      def results():
          results = some_library()
          return results

- If your code communicates with nothing and doesn't need to wait → async def (no need to use await inside)
- If you can't decide → a plain def
1.2 Why it doesn't get slow even with def
When you write a path-operation function as a plain def, FastAPI runs it in an external thread pool and awaits it (because calling it directly would block the server). This applies not only to path-operation functions but also to dependencies (Depends).
This is the point that decisively differs from other async frameworks.
- A blocking call in def: FastAPI offloads it to the thread pool, so the event loop doesn't stop.
- A blocking call inside async def: it runs on the event loop as-is, so all requests on that worker freeze.
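The difference can be measured with the standard library alone. The following is a minimal sketch, not FastAPI's actual implementation: `asyncio.to_thread` stands in for the thread-pool offloading that FastAPI applies to plain `def` handlers, and `count_ticks`, `blocking_work`, and `measure` are names invented for this illustration. A background task counts how often the event loop gets to run while a blocking call is in progress.

```python
import asyncio
import time


async def count_ticks(stop: asyncio.Event, interval: float = 0.01) -> int:
    """Count how many times the event loop lets this task run."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


def blocking_work(seconds: float) -> None:
    """Stand-in for a synchronous driver call or requests.get."""
    time.sleep(seconds)


async def measure(offload: bool, seconds: float = 0.3) -> int:
    """Return how many ticks other work got while blocking_work ran."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    if offload:
        # Analogous to a plain `def` handler: the call runs in a worker thread
        await asyncio.to_thread(blocking_work, seconds)
    else:
        # Analogous to blocking code inside `async def`: the loop is stuck
        blocking_work(seconds)
    stop.set()
    return await ticker


if __name__ == "__main__":
    print("ticks while blocking the loop:", asyncio.run(measure(offload=False)))
    print("ticks while offloaded:", asyncio.run(measure(offload=True)))
```

While the loop is blocked the ticker barely advances; with offloading it keeps running throughout, which is the behavior every other in-flight request on the worker depends on.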
1.3 The most common accident: calling synchronous blocking inside async def
This is the most frequent, and hardest to notice, bug on the front line.
import time

import requests  # synchronous library (cannot be awaited)
from fastapi import FastAPI

app = FastAPI()

@app.get("/bad")
async def bad_endpoint():
    # ❌ Synchronous blocking I/O inside async def:
    # until requests.get returns, this worker's event loop handles nothing else
    res = requests.get("https://slow-upstream.example.com/data")
    time.sleep(1)  # ❌ likewise stops the whole event loop
    return res.json()
It's declared async def but the inside is synchronous. During this one request's requests.get / time.sleep, all other requests the same worker should handle are blocked. This is the typical case where "latency collapses the moment you raise concurrent connections" in a load test.
There are two fix policies. (A) Make it def and leave it to the thread pool, or (B) switch to an async-capable library and await.
import anyio
import httpx  # async-capable HTTP client
import requests

# (A) If you keep the synchronous library, use def (FastAPI offloads it to the thread pool)
@app.get("/ok-sync")
def ok_sync_endpoint():
    res = requests.get("https://slow-upstream.example.com/data")
    return res.json()

# (B) Use an async-native library and await it properly
@app.get("/ok-async")
async def ok_async_endpoint():
    async with httpx.AsyncClient() as client:
        res = await client.get("https://slow-upstream.example.com/data")
        return res.json()

# When you have no choice but to call heavy synchronous code inside async def,
# offload it to a thread explicitly
@app.get("/ok-offload")
async def ok_offload_endpoint():
    # anyio.to_thread.run_sync sends the blocking function to the thread pool
    # (blocking_cpu_or_io_work is a placeholder for your own synchronous function)
    data = await anyio.to_thread.run_sync(blocking_cpu_or_io_work)
    return data
1.4 Decision table: async def or def
| Your I/O | Function declaration | Reason |
|---|---|---|
| An async library called with await (httpx, asyncpg, async SDK) | async def | Natively non-blocking. Can await |
| A synchronous DB driver, requests, time.sleep, a synchronous SDK | def | FastAPI offloads to the thread pool and doesn't stop the loop |
| Heavy CPU-bound computation (image processing, crypto, ML inference) | def (or a separate process/queue) | Even threads are affected by the GIL. If heavy, the queue in Chapter 7 |
| Pure logic that doesn't communicate externally | async def | No wait, so anything is fine. The official recommends async |
| Can't decide | def | The official "when in doubt, def." The lowest accident rate |
Design guideline (KISS): "Make everything async def and it's fast" is wrong. Use async def only when you truly await inside. If the inside is synchronous, plainly make it def and leave it to FastAPI. That's the choice with the fewest accidents and the most speed.
2. Pydantic v2: kill external input at the boundary
The essence of an API is "a checkpoint that converts untrusted data coming from the outside world into the inside's trustworthy types." Pydantic v2 lets you build that checkpoint with a single type hint. Trust external input not at all—this is the first principle of security.
2.1 Split request / response models
Make the input and output models different things. Don't let input include id or created_at, and don't leak the password hash in output—this is separation of concerns (SRP) and security too.
from datetime import datetime

from pydantic import BaseModel, EmailStr, Field, field_validator

# --- Input (only what the client is allowed to send) ---
class UserCreate(BaseModel):
    email: EmailStr  # malformed addresses are rejected with 422
    display_name: str = Field(min_length=1, max_length=50)
    age: int = Field(ge=0, le=150)  # range constraint (no negative ages)

    @field_validator("display_name")
    @classmethod
    def no_control_chars(cls, v: str) -> str:
        # Domain-specific rules are validated explicitly in a validator
        if any(ord(c) < 0x20 for c in v):
            raise ValueError("Control characters are not allowed")
        return v.strip()

# --- Output (only what the server may return; no secrets) ---
class UserPublic(BaseModel):
    id: int
    email: EmailStr
    display_name: str
    created_at: datetime
    # password_hash is deliberately left out → the type prevents the leak
Using Field's constraints (min_length / max_length / ge / le / gt, etc.) and field_validator consolidates the business rules in the model definition. The validation logic doesn't scatter into the handler body, so it's readable and easy to change (ETC).
2.2 Use it in the handler: bind the output with a type too via response_model
from fastapi import FastAPI

app = FastAPI()

@app.post("/users", response_model=UserPublic, status_code=201)
async def create_user(payload: UserCreate) -> UserPublic:
    # By the time payload arrives here it is already validated; no re-validation needed (DRY)
    # (repository is a placeholder for your own data-access layer)
    user = await repository.insert_user(payload)
    # response_model=UserPublic strips any extra fields automatically
    return user
Just by writing payload: UserCreate, an invalid request is rejected with 422 before it reaches the handler. You don't write a single line of validation code in the handler—this is the substance of "validation comes with a single type hint." Attach response_model and the output side is also shaped to the contract, structurally preventing the leakage of secret fields.
2.3 Data coming from "non-type places": model_validate / model_validate_json
External API responses, message-queue payloads, config files—data that doesn't pass through a FastAPI handler argument must always be validated before letting it inside.
import httpx

async def fetch_external_user(user_id: str) -> UserPublic:
    async with httpx.AsyncClient() as client:
        res = await client.get(f"https://upstream.example.com/users/{user_id}")
        res.raise_for_status()
        # External-API JSON is untrusted external input too.
        # Don't pass dicts around; turn it into a model immediately
        return UserPublic.model_validate(res.json())  # dict → validated model
        # If you pass the JSON string directly, model_validate_json(res.text) saves a step
A Pydantic v2 note: there are strict mode (no type conversion) and lax mode (coercion such as "123" → 123). Consider strict for untrusted external input. Implicit conversions like "true" turning into True become a breeding ground for unexpected bugs. On validation failure, ValidationError reports which field failed and why in a structured way, so the error log itself becomes root-cause analysis.
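To make the strict/lax distinction concrete, here is a small sketch using Pydantic v2's `ConfigDict(strict=True)`. The model names `LaxEvent` and `StrictEvent` and the payload are made up for this illustration; the same raw dict is accepted (and silently coerced) by the lax model and rejected field by field by the strict one.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class LaxEvent(BaseModel):
    # Default (lax) mode: compatible strings are coerced to the declared type
    count: int
    active: bool


class StrictEvent(BaseModel):
    # Strict mode for the whole model: no implicit type conversion
    model_config = ConfigDict(strict=True)

    count: int
    active: bool


raw = {"count": "123", "active": "true"}  # strings where int/bool are expected

lax = LaxEvent.model_validate(raw)  # coerced without complaint

try:
    StrictEvent.model_validate(raw)
    strict_errors = []
except ValidationError as exc:
    # Each entry says which field failed and why, in structured form
    strict_errors = [(err["loc"], err["type"]) for err in exc.errors()]
```

Strictness can also be requested per call (`model_validate(raw, strict=True)`) or per field, so one option is to keep internal models lax and apply strict validation only at the points where untrusted data enters.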
3. Dependency injection (Depends): inject DB session, auth, config
Depends is the highest-value feature in FastAPI. You declare "what the handler needs," and the framework prepares and injects it. You can separate the DB session, the authenticated user, and config from the handler body (SRP).
3.1 Make a type alias with Annotated (officially recommended)
Since FastAPI 0.95.0, the official docs recommend the way using Annotated. The standard is to fold dependencies into a type alias and reuse them.
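from functools import lru_cache
from typing import Annotated

from fastapi import Depends
from pydantic_settings import BaseSettings, SettingsConfigDict

# Settings (read from environment variables; gather secrets here and never hardcode them)
class Settings(BaseSettings):
    # pydantic-settings v2 style; replaces the Pydantic v1 inner "class Config"
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str
    jwt_secret: str

@lru_cache  # don't re-read settings every time; one instance per process (cost efficiency)
def get_settings() -> Settings:
    return Settings()  # reads the DATABASE_URL / JWT_SECRET environment variables

SettingsDep = Annotated[Settings, Depends(get_settings)]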
3.2 Cleanup with yield dependencies (the standard for DB sessions)
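A dependency that uses yield acquires the resource before the yield and runs its cleanup once the handler has finished with it. Exactly when the cleanup runs relative to sending the response has changed between FastAPI releases, so check the docs for your version. Closing the DB session is the canonical use of this pattern.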
from collections.abc import AsyncGenerator

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# The sessionmaker is built from the engine created in lifespan (see Chapter 4)
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    session_factory: async_sessionmaker = request.app.state.session_factory
    async with session_factory() as session:  # before yield = acquire
        yield session  # injected into the handler
        # after yield = cleanup; async with closes the session reliably (no leaks)

DbDep = Annotated[AsyncSession, Depends(get_db)]
3.3 Make auth a dependency too
Auth is "a cross-cutting concern repeated in every handler." Carve it into a dependency, and each handler just receives current_user (DRY).
from fastapi import HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

bearer = HTTPBearer()

async def get_current_user(
    creds: Annotated[HTTPAuthorizationCredentials, Depends(bearer)],
    settings: SettingsDep,
    db: DbDep,
) -> UserPublic:
    # Token verification is external-input validation. On failure, return 401 immediately
    # (verify_jwt is a placeholder for your own verification function)
    user = await verify_jwt(creds.credentials, settings.jwt_secret, db)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication failed",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

CurrentUser = Annotated[UserPublic, Depends(get_current_user)]

@app.get("/me", response_model=UserPublic)
async def read_me(current_user: CurrentUser) -> UserPublic:
    return current_user  # no auth logic in the handler at all
A dependency can be written async def or def independently of the handler (the official docs state this clearly). At test time it can be swapped wholesale (Chapter 8's dependency_overrides)—this is the biggest return of DI.
4. lifespan: acquire heavy resources only once at startup
Don't create DB pools, ML models, or HTTP clients per request. Establishing connections and loading models cost seconds. With the officially-recommended lifespan, acquire them only once at startup and share them within the worker.
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- Startup (before yield; runs exactly once) ---
    settings = get_settings()
    engine = create_async_engine(settings.database_url, pool_size=10, max_overflow=20)
    app.state.session_factory = async_sessionmaker(engine, expire_on_commit=False)
    app.state.http = httpx.AsyncClient(timeout=10.0)  # shared client, reused across requests
    # Load heavy resources such as ML models here too and put them on app.state
    yield  # ← the app starts here and begins accepting requests
    # --- Shutdown (after yield; runs once during graceful shutdown) ---
    await app.state.http.aclose()
    await engine.dispose()  # return connections cleanly (prevents leaks and exhaustion)

app = FastAPI(lifespan=lifespan)
Important: the old @app.on_event("startup") / @app.on_event("shutdown") are deprecated. With lifespan, the startup and shutdown logic lives in one place, and you can share state across the yield. In new code, always use lifespan.
In the broadcaster platform too, the AI models, GCP clients, and DB pool were all acquired with lifespan and shared by placing them on app.state. Just by stopping per-request acquisition, p99 latency and the cold initial response stabilize dramatically (cost efficiency, performance).
5. Error handling: 4xx immediately, 5xx with structured logs
The principle of error handling is simple. Errors due to external input (things the client can fix) are returned immediately as 4xx. Server-side unexpected things (things the client can't fix) return 5xx, and the details go only into structured logs. Don't return stack traces or internal exception messages to the client (information leakage).
5.1 Business errors are HTTPException
from fastapi import HTTPException, status

@app.get("/users/{user_id}", response_model=UserPublic)
async def get_user(user_id: int, db: DbDep) -> UserPublic:
    user = await repository.find_user(db, user_id)
    if user is None:
        # "Does not exist" is something the client should know → 404 immediately
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return user
5.2 Translate domain exceptions to HTTP in a centralized handler
Rather than throwing HTTPException scattered all over the handler, it's cleaner to have the domain layer throw a bare domain exception and translate it to HTTP at the boundary (separation of concerns).
import logging

from fastapi import Request
from fastapi.responses import JSONResponse

logger = logging.getLogger("app")

class DomainError(Exception):
    """Business-rule violation. Write message assuming it may be shown to users."""
    def __init__(self, message: str, status_code: int = 400):
        super().__init__(message)
        self.message = message
        self.status_code = status_code

@app.exception_handler(DomainError)
async def domain_error_handler(request: Request, exc: DomainError):
    # 4xx: caused by the client. The message can be returned as-is
    return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})

@app.exception_handler(Exception)
async def unhandled_error_handler(request: Request, exc: Exception):
    # 5xx: unexpected. Details go only to the log; the client gets a generic message
    logger.exception("unhandled error", extra={"request_id": getattr(request.state, "request_id", None)})
    return JSONResponse(status_code=500, content={"detail": "An internal error occurred"})
logger.exception leaves the stack trace only in the logs, and fixes the response to "an internal error occurred." The server knows what fell over, and it doesn't leak to the client—this is production error design.
6. Observability: request ID, structured logs, OpenTelemetry
Half of an "API that doesn't fall over" is "an API that can immediately trace the cause when it does fall over." The minimum equipment to make logs useful in production is three—request ID, structured logs, distributed tracing.
6.1 Request-ID middleware
With an ID running through one request, you can reconstruct one piece of processing across multiple log lines. Inherit it if there's an incoming header (continuing the trace from upstream), and number it if not.
import uuid

from starlette.middleware.base import BaseHTTPMiddleware

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Inherit the ID set upstream; generate one if absent (links the distributed trace)
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["x-request-id"] = request_id  # return it to the caller too
        return response

app.add_middleware(RequestIDMiddleware)
6.2 Structured logs (JSON)
Production logs should be machine-readable. Emitting request_id / method / path / status / duration in JSON lets you filter and aggregate in a logging foundation (Cloud Logging, etc.).
import json
import logging
import time

# Metadata keys passed through extra= that should appear in the JSON line
LOG_FIELDS = ("request_id", "method", "path", "status", "duration_ms")

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        payload.update({key: getattr(record, key, None) for key in LOG_FIELDS})
        return json.dumps(payload, ensure_ascii=False)

# Access log in a middleware (with duration)
class AccessLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "request",
            extra={
                "request_id": getattr(request.state, "request_id", None),
                "method": request.method,
                "path": request.url.path,
                "status": response.status_code,
                "duration_ms": round(elapsed_ms, 1),
            },
        )
        # ⚠️ Never log PII here: body, email addresses, tokens, and so on
        response.headers["x-response-time-ms"] = f"{elapsed_ms:.1f}"
        return response
Don't leave PII in logs: flowing the request body, email addresses, auth tokens, or names into structured logs is an accident. Record only metadata (ID, status, duration, error type). It's an absolute condition for internal-control projects, and was enforced in the broadcaster platform too.
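Passing `extra={"request_id": ...}` on every log call is easy to forget in code far from the handler. One possible alternative, sketched here with the standard library only, is to keep the ID in a `contextvars.ContextVar` and let a `logging.Filter` stamp it on every record. The names `request_id_var`, `RequestIdFilter`, and `build_logger` are assumptions of this sketch, not part of the setup above; in a real app the request-ID middleware would call `request_id_var.set(...)` before `call_next`, and propagation through your particular middleware stack should be verified.

```python
import contextvars
import io
import json
import logging

# Holds the current request's ID; None outside of any request
request_id_var = contextvars.ContextVar("request_id", default=None)


class RequestIdFilter(logging.Filter):
    """Attach the current request ID to every record passing through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(payload, ensure_ascii=False)


def build_logger(stream) -> logging.Logger:
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    handler.addFilter(RequestIdFilter())
    log = logging.getLogger("app.example")
    log.handlers = [handler]
    log.setLevel(logging.INFO)
    log.propagate = False
    return log


buffer = io.StringIO()
log = build_logger(buffer)

token = request_id_var.set("req-123")  # a middleware would do this per request
log.info("job accepted")               # no extra= needed at the call site
request_id_var.reset(token)
log.info("outside any request")

lines = [json.loads(line) for line in buffer.getvalue().splitlines()]
```

The first line carries `req-123` and the second carries `null`, so deep call sites stay free of logging plumbing while every line written during a request remains correlatable.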
6.3 The key points of OpenTelemetry integration
Processing spanning multiple services (FastAPI → DB → external API → job-execution foundation) can't be traced by logs alone. With OpenTelemetry auto-instrumentation, take a trace running through the request. The key point is just this.
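from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# Once, at startup. This instruments the FastAPI server side (one span per incoming request);
# outgoing HTTP calls and DB queries need their own instrumentation packages
FastAPIInstrumentor.instrument_app(app)
Install opentelemetry-instrumentation-fastapi and instrument with one line, and incoming requests show up as spans in the trace. Add the matching instrumentation packages for your HTTP client and DB layer (for example opentelemetry-instrumentation-httpx and opentelemetry-instrumentation-sqlalchemy) and their spans join the same trace, so you can see which span ate the time. Tie request_id to the trace ID and you can cross-reference logs and traces.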
7. Heavy processing: the limits of BackgroundTasks and how to offload to a task queue
This is the watershed of production design. "Sending mail," "generating thumbnails," "long-running AI jobs"—where do you process these?
7.1 What BackgroundTasks can and can't do
FastAPI's BackgroundTasks is a mechanism that runs after returning the response, in the same process. It's suited only to "light, short, non-fatal-if-failed" side effects.
from fastapi import BackgroundTasks

@app.post("/users", status_code=201)
async def create_user(payload: UserCreate, bg: BackgroundTasks, db: DbDep):
    user = await repository.insert_user(db, payload)
    # Light post-processing is fine: sending a welcome email, for example
    bg.add_task(send_welcome_email, user.email)
    return {"id": user.id}
The limits of BackgroundTasks are clear.
- It runs in the same process: it eats the worker's CPU and memory. Loading heavy processing worsens the API's latency.
- It's not persisted: if the process dies, the task disappears. No retries, no progress tracking.
- It doesn't scale: the API's scale (the number of replicas) and the processing capacity get coupled.
7.2 Decision table: BackgroundTasks or a real task queue
| Aspect | BackgroundTasks is enough | A real task queue (Celery / Cloud Run Jobs / Workflows) |
|---|---|---|
| Duration | Hundreds of ms to a few seconds | Tens of seconds to tens of minutes, long-running jobs |
| On failure | Acceptable to disappear | Retry, resume, progress tracking are essential |
| Resources | Light I/O (mail, notification) | CPU/GPU-intensive (video, OCR, ASR, LLM) |
| Scale | OK together with the API | Want to scale independently of the API |
| Visibility | Unneeded | Per-job state management needed |
The decision criterion is one line. If any of "I want to retry on failure," "it takes tens of seconds or more," "it eats CPU/GPU" applies, graduate from in-process BackgroundTasks and offload to a persistent queue / job-execution foundation (the flip side of YAGNI: don't bring in a queue until needed, but bring it in without hesitation once needed).
7.3 A real example: separating a long-running AI job from FastAPI
In the broadcaster platform, caption typo detection extracts OCR (on-screen text) and ASR (speech transcription) from video and cross-checks them, a heavy job that takes minutes to tens of minutes. Running this inside a FastAPI request or in BackgroundTasks is out of the question: the API clogs, and the job disappears on every deploy.
So I separated the configuration this way.
- FastAPI (async): concentrates on reception, auth, validation, starting the job, and returning status. Returns the request immediately.
- Cloud Run Jobs: runs the actual heavy processing (OCR / ASR / cross-check) on an execution foundation that scales independently of the API.
- Cloud Workflows: orchestrates the multiple steps. Running independent steps in parallel cut processing that took 18 minutes sequentially to 13 minutes, a reduction of about 30%.
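The gain from running independent steps in parallel can be illustrated in-process with `asyncio.gather`. This is only an analogy for the fan-out: the actual Cloud Workflows definition is not shown in this article, and `run_ocr`, `run_asr`, `cross_check`, their sleep durations, and the toy word lists are all invented here. The point is that total time approaches the slowest branch instead of the sum of the branches.

```python
import asyncio
import time


async def run_ocr(video_id: str) -> list[str]:
    await asyncio.sleep(0.2)  # placeholder for the real OCR job
    return ["Tokyo", "wether"]


async def run_asr(video_id: str) -> list[str]:
    await asyncio.sleep(0.3)  # placeholder for the real ASR job
    return ["Tokyo", "weather"]


def cross_check(ocr_words: list[str], asr_words: list[str]) -> list[str]:
    """Toy rule: on-screen words that were never spoken are typo candidates."""
    spoken = set(asr_words)
    return [word for word in ocr_words if word not in spoken]


async def pipeline(video_id: str) -> tuple[list[str], float]:
    start = time.perf_counter()
    # OCR and ASR do not depend on each other, so they run concurrently;
    # the cross-check needs both results, so it runs after the join
    ocr_words, asr_words = await asyncio.gather(run_ocr(video_id), run_asr(video_id))
    suspects = cross_check(ocr_words, asr_words)
    return suspects, time.perf_counter() - start


if __name__ == "__main__":
    suspects, elapsed = asyncio.run(pipeline("video-1"))
    print(suspects, f"{elapsed:.2f}s")
```

Only steps without data dependencies can be fanned out this way; the cross-check stays sequential, which is why the end-to-end time cannot drop below the slowest branch plus the dependent steps.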
@app.post("/jobs", status_code=202)  # 202 Accepted: received, but not yet completed
async def enqueue_job(req: JobRequest, current_user: CurrentUser, db: DbDep):
    # The API's job ends at "validate, start the job, return a tracking ID"
    job = await repository.create_job(db, owner=current_user.id, spec=req)
    await workflows_client.start_execution(job_id=job.id, spec=req.model_dump())
    # Don't wait for the heavy work here. The client polls status with job.id
    return {"job_id": job.id, "status": "queued"}

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str, current_user: CurrentUser, db: DbDep):
    job = await repository.find_job(db, job_id, owner=current_user.id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"job_id": job.id, "status": job.status, "progress": job.progress}
"The API is a thin layer that returns fast; the heaviness lives on an external execution foundation." Just by drawing this boundary, the API becomes harder to knock over, and jobs can scale and retry independently. 202 Accepted + status polling (or a webhook) is the straightforward pattern for an async job API.
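On the client side, polling is best done with a growing interval and a deadline so that long jobs do not hammer the status endpoint. The sketch below is one possible shape, using only the standard library; `poll_job`, the terminal status names `succeeded` / `failed`, and the default intervals are assumptions of this example and are not defined by the API above. `fetch_status` is any callable that returns the JSON of `GET /jobs/{job_id}`.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"succeeded", "failed"}  # assumed names, adapt to your API


def poll_job(
    fetch_status: Callable[[], dict],
    *,
    interval: float = 1.0,
    max_interval: float = 30.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the job reaches a terminal status, doubling the wait each time."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"job still {job['status']!r} after {timeout}s")
        sleep(interval)
        interval = min(interval * 2, max_interval)  # exponential backoff with a cap


if __name__ == "__main__":
    # Fake status source standing in for an HTTP call
    responses = iter([{"status": "queued"}, {"status": "running"}, {"status": "succeeded"}])
    print(poll_job(lambda: next(responses), interval=0.01))
```

Injecting `sleep` keeps the function testable without real waiting. For jobs that run tens of minutes, a webhook or push notification avoids polling entirely.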
8. Testing: TestClient and httpx AsyncClient, dependency overrides
There's no production launch without a verification path. FastAPI is designed to be easy to test.
8.1 Synchronous testing: TestClient
The official basic form is fastapi.testclient.TestClient. The test function is a plain def, callable without await (it drives ASGI internally).
from fastapi.testclient import TestClient

from app.main import app

client = TestClient(app)

def test_create_user_validates_input():
    # Invalid input (negative age) should be rejected with 422
    res = client.post("/users", json={"email": "x@example.com", "display_name": "Yu", "age": -1})
    assert res.status_code == 422

def test_create_user_ok():
    res = client.post("/users", json={"email": "yu@example.com", "display_name": "Yu", "age": 30})
    assert res.status_code == 201
    assert "id" in res.json()
8.2 Async testing: httpx AsyncClient + ASGITransport
When the test itself needs to await (for example, to call async DB code directly in the same test and not only go through HTTP), connect httpx's AsyncClient to the app with ASGITransport, as the official docs describe.
import pytest
import httpx
from httpx import ASGITransport

from app.main import app

@pytest.mark.anyio
async def test_me_async():
    transport = ASGITransport(app=app)  # drives the app directly (no real network needed)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        res = await ac.get("/me", headers={"Authorization": "Bearer faketoken"})
    assert res.status_code in (200, 401)
8.3 Dependency overrides: dependency_overrides
Here's the biggest return of DI. Swap the real DB or auth wholesale for a test fake. You can separate external dependencies, so tests become fast and deterministic.
from datetime import datetime

import pytest

async def override_get_db():
    # Instead of the real get_db, yield a test session (in-memory or rolled back per test)
    async with test_session_factory() as session:
        yield session

def fake_current_user() -> UserPublic:
    return UserPublic(id=1, email="test@example.com", display_name="Test", created_at=datetime.now())

@pytest.fixture
def overridden_client():
    # Key = the original dependency function, value = the replacement
    app.dependency_overrides[get_db] = override_get_db
    app.dependency_overrides[get_current_user] = fake_current_user
    yield client
    # Always clear after the test so overrides don't contaminate other tests
    app.dependency_overrides.clear()

def test_read_me_with_fake_auth(overridden_client):
    res = overridden_client.get("/me")
    assert res.status_code == 200
    assert res.json()["email"] == "test@example.com"
Carving auth, DB, and external APIs out into dependencies lets you pinpoint-fake them with dependency_overrides. The investment of making auth a Depends in Chapter 3 is recovered here.
9. Deployment: uvicorn / gunicorn and graceful shutdown
9.1 Worker configuration
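In production, run the ASGI server uvicorn with multiple workers under a process manager. The classic choices are gunicorn + uvicorn.workers.UvicornWorker, or uvicorn --workers N. Recent uvicorn releases deprecate the bundled uvicorn.workers module in favor of the separate uvicorn-worker package, so check which worker class path your installed version expects.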
# uvicorn alone with multiple workers (simple setup)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

# gunicorn as the process manager, each worker running uvicorn (the robust classic)
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 30 \
  --graceful-timeout 30
The rule of thumb for worker count starts from the number of CPU cores and is adjusted by workload (I/O-bound or CPU-bound). Increasing workers blindly just eats memory. The DB pool and HTTP client acquired in lifespan are replicated per worker, so design so that worker count × pool size (counting max_overflow, which the pool may also open) doesn't exceed the DB's max connections. Exceeding it is the typical connection-exhaustion accident.
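The connection budget is simple arithmetic, and it is worth writing down because `max_overflow` and the instance count are easy to forget. The helper below is a sketch; the function name and the `instances` parameter are introduced here for illustration. With SQLAlchemy's default queue pool, one engine can open up to `pool_size + max_overflow` connections, and every worker process has its own engine.

```python
def max_db_connections(
    workers: int,
    pool_size: int,
    max_overflow: int = 0,
    instances: int = 1,
) -> int:
    """Worst-case number of DB connections the whole deployment can open."""
    per_worker = pool_size + max_overflow  # each worker owns one engine/pool
    return instances * workers * per_worker


# Values used in this article: 4 workers, pool_size=10, max_overflow=20
per_instance = max_db_connections(workers=4, pool_size=10, max_overflow=20)

# Scaling the API out to 3 container instances multiplies the budget again
three_instances = max_db_connections(workers=4, pool_size=10, max_overflow=20, instances=3)

print(per_instance, three_instances)
```

A single instance with these settings can already open 120 connections in the worst case, which exceeds PostgreSQL's default `max_connections` of 100. Either shrink the pool, reduce the workers, raise the database limit, or put a connection pooler in front of the database.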
9.2 Graceful shutdown
So that you don't drop requests on every deploy, make shutdown gentle. gunicorn's --graceful-timeout is the grace period for finishing in-flight requests: on a container platform (Cloud Run / Kubernetes), the process receives SIGTERM, finishes existing requests within the grace period, and then exits. At that point Chapter 4's lifespan shutdown code (engine.dispose() / http.aclose()) runs and returns the connections cleanly before the process exits. This is the finishing touch for not emitting 5xx during a deploy.
A word on cost efficiency: an I/O-bound API can take high concurrency per worker, so using async correctly gets high throughput with few instances. Conversely, holding CPU-bound processing in the API process gets only as much parallelism as the core count, forcing you to increase instances wastefully. Chapter 7's "offload heavy processing outside" is tied not only to reliability but to the bill.
10. Summary: a production FastAPI cheat sheet
A quick reference for when you're unsure.
- async def or def: an async library you await → async def. A synchronous library, requests, time.sleep → def (FastAPI offloads to the thread pool). When in doubt, def. Don't call synchronous blocking code inside async def.
- Validation: split input/output models, and kill bad input at the boundary with Field constraints and field_validator. Model external-API JSON immediately with model_validate. Consider strict for untrusted input.
- DI: DB session, auth, config with Depends + Annotated. Clean up with yield dependencies. Design so you can use dependency_overrides in tests.
- Startup cost: acquire DB pools, models, and HTTP clients only once with lifespan and share via app.state. Don't use @app.on_event.
- Errors: return 4xx immediately, 5xx with a generic message + structured logs. Don't return stack traces to the client.
- Observability: request-ID attachment → structured (JSON) logs → OpenTelemetry instrumentation. Don't emit PII in logs.
- Heavy processing: if tens of seconds or more / retry-required / CPU/GPU-intensive, graduate from BackgroundTasks to Celery / Cloud Run Jobs / Workflows. The API returns thin with 202 Accepted.
- Testing: sync with TestClient, async with httpx.AsyncClient + ASGITransport. Swap dependencies with dependency_overrides.
- Deployment: multiple workers with gunicorn + UvicornWorker, gracefully with --graceful-timeout and lifespan shutdown processing. Worker count × pool size ≤ DB max connections.
FastAPI is a "working in 5 minutes" framework, but production quality is decided by boundary design. Kill external input with types, offload cross-cutting concerns to dependencies, push heaviness out to an external execution foundation, and make everything traceable by ID—none of it is flashy, but this accumulation creates an "API that doesn't fall over, can be traced, is easy to change."
On the in-house AI platform for a broadcaster, I designed FastAPI as a thin reception layer for long-running AI jobs, separated heavy processing to Cloud Run Jobs / Cloud Workflows, and put it into production with observability and idempotency in place. With generative AI (Claude Code) as my partner, my approach is to build fast and cheaply as a solo developer while guaranteeing quality with verification gates.
"I want to put this API in production with FastAPI, but how should I design the use of async, job separation, and observability?"—I'll accompany you end-to-end, from that decision through implementation and operation. Feel free to reach out, even from the requirements-organizing stage.
References (official documentation)
- FastAPI official top: the design philosophy and feature list built on type hints, Pydantic, and Starlette
- Concurrency and async / await (FastAPI): the rule for choosing async def or def, thread-pool behavior
- Dependencies (FastAPI): dependency injection with Depends / Annotated
- Lifespan Events (FastAPI): startup/shutdown processing with @asynccontextmanager
- Testing (FastAPI): testing with TestClient and httpx AsyncClient
- Pydantic official documentation: Pydantic v2 models, validation, Field, field_validator