友田 陽大

FastAPI Production-Operations Guide: Building APIs That Don't Fall Over with the Right Use of async, Pydantic v2 Boundary Validation, DI, and Observability

An implementation guide to operating FastAPI at production quality. Faithful to the official documentation, it explains the use of async def / def, Pydantic v2 boundary validation, dependency injection with Depends, structured logs and OpenTelemetry observability, the limits of BackgroundTasks and how to offload to a task queue, and testing and deployment—all in real code.

Author
友田 陽大

"I want to stand up an API in FastAPI"—as a requirement it's one line. Write @app.get and you have something working in 5 minutes. But the moment you try to put it in production, the things to decide multiply at once. Do you write it with async def or def? Where do you validate external input? How do you inject the DB session? Is it OK to run a heavy AI job inside a request? When it falls over, can you trace the cause from the logs?

This article is an implementation guide to operating FastAPI at production quality. Whereas the official tutorial carefully teaches you up to "making it work," here I focus on the decision criteria and code for building an API that doesn't fall over, can be traced, and is easy to change. As source material, I'll weave in design decisions from the in-house AI platform I built for a major Japanese broadcaster (a caption typo-detection pipeline that places FastAPI as the API layer and separates long-running AI jobs into Cloud Run Jobs / Cloud Workflows).

The rules of this article: API specs and recommended patterns are based on the FastAPI / Pydantic official documentation (as of June 2026). Specs are revised, so always confirm the latest behavior in the official docs before going to production. The code is arranged in a form usable in real operation, but secrets (DB URLs, API keys, signing keys) are assumed to be in environment variables (never hardcode).


0. Why FastAPI can be called "fast and safe" in one line

Before design decisions, grasp what FastAPI stands on. The official docs cite these three as its foundation.

  • Standard Python type hints: just by writing parameter types, validation, conversion, and OpenAPI doc generation come along automatically.
  • Pydantic: data validation and parsing. The core validation logic is written in Rust, and the official docs say it's "one of the fastest-class data-validation libraries in Python."
  • Starlette: the ASGI web layer. The part that supports, in the official phrasing, "performance on par with NodeJS and Go."

In other words, FastAPI's "fast" and "fewer bugs" come from the design of getting validation, conversion, and documentation at once with a single type hint. This article aims to leverage this design philosophy to the fullest in the production-operations context.


1. Most important: the right use of async def and def

Get this wrong and you build either an API that doesn't get throughput or an API that occasionally freezes. It's FastAPI's biggest pitfall and biggest leverage.

1.1 The official "for people in a hurry" rule

Let me quote the official documentation's (/async/) decision criteria as-is.

  1. If you use an external library you're told to call with await → async def

    @app.get("/")
    async def read_results():
        results = await some_library()
        return results
    
  2. If it communicates with something external but the library doesn't support await (currently the case for most DB libraries) → a plain def

    @app.get("/")
    def results():
        results = some_library()
        return results
    
  3. If it communicates with nothing and doesn't need to wait → async def (no need to use await inside)

  4. If you can't decide → a plain def

1.2 Why it doesn't get slow even with def

When you write a path-operation function as a plain def, FastAPI runs it in an external thread pool and awaits it (because calling it directly would block the server). This applies not only to path-operation functions but also to dependencies (Depends).

This is the point that decisively differs from other async frameworks.

  • A blocking call in def: FastAPI offloads it to the thread pool, so the event loop doesn't stop.
  • A blocking call inside async def: it runs on the event loop as-is, so all requests freeze.
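
The difference between these two bullets can be reproduced without FastAPI. The following is a minimal sketch using only asyncio: heartbeat, measure, and the 0.2-second sleep are made-up names and values for this demo, and asyncio.to_thread stands in for the thread-pool offloading that FastAPI does for plain def functions.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how often the event loop gets a chance to run this task."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure(work: Callable[[], Awaitable[None]]) -> int:
    """Run `work` next to a heartbeat task and return the heartbeat's tick count."""
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # let the heartbeat task start
    await work()
    stop.set()
    return await hb


async def blocking_in_coroutine() -> None:
    time.sleep(0.2)  # synchronous call: the event loop is stuck for 0.2 s


async def offloaded_to_thread() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # same call, run in a worker thread


def run_demo() -> tuple[int, int]:
    blocked = asyncio.run(measure(blocking_in_coroutine))
    offloaded = asyncio.run(measure(offloaded_to_thread))
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = run_demo()
    print(f"heartbeat ticks while blocked: {blocked}, while offloaded: {offloaded}")
```

In the blocked case the heartbeat barely ticks, because nothing else can run until time.sleep returns. In the offloaded case it keeps ticking throughout. Replace "heartbeat" with "every other request on this worker" and you have the production symptom.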

1.3 The most common accident: calling synchronous blocking inside async def

This is the most frequent, and hardest to notice, bug on the front line.

import time
import requests  # synchronous library (cannot be awaited)

@app.get("/bad")
async def bad_endpoint():
    # ❌ Synchronous blocking I/O called inside async def
    # Until this requests.get returns, this worker processes nothing else
    res = requests.get("https://slow-upstream.example.com/data")
    time.sleep(1)  # ❌ Likewise stops the whole event loop
    return res.json()

It's declared async def but the inside is synchronous. During this one request's requests.get / time.sleep, all other requests the same worker should handle are blocked. This is the typical case where "latency collapses the moment you raise concurrent connections" in a load test.

There are two fix policies. (A) Make it def and leave it to the thread pool, or (B) switch to an async-capable library and await.

import anyio.to_thread
import httpx  # async-capable HTTP client
import requests

# (A) If you keep using a synchronous library, make it def (FastAPI offloads it to the thread pool)
@app.get("/ok-sync")
def ok_sync_endpoint():
    res = requests.get("https://slow-upstream.example.com/data")
    return res.json()

# (B) Await properly with an async-native library
@app.get("/ok-async")
async def ok_async_endpoint():
    async with httpx.AsyncClient() as client:
        res = await client.get("https://slow-upstream.example.com/data")
    return res.json()

# When you have no choice but to call heavy synchronous work inside async def, offload it to a thread explicitly
@app.get("/ok-offload")
async def ok_offload_endpoint():
    # anyio.to_thread.run_sync sends the blocking function to the thread pool
    # (blocking_cpu_or_io_work is a placeholder for your own blocking function)
    data = await anyio.to_thread.run_sync(blocking_cpu_or_io_work)
    return data

1.4 Decision table: async def or def

Your I/O | Function declaration | Reason
An async library called with await (httpx, asyncpg, async SDK) | async def | Natively non-blocking. Can await
A synchronous DB driver, requests, time.sleep, a synchronous SDK | def | FastAPI offloads to the thread pool and doesn't stop the loop
Heavy CPU-bound computation (image processing, crypto, ML inference) | def (or a separate process/queue) | Even threads are affected by the GIL. If heavy, the queue in Chapter 7
Pure logic that doesn't communicate externally | async def | No wait, so anything is fine. The official recommends async
Can't decide | def | The official "when in doubt, def." The lowest accident rate

Design guideline (KISS): "Make everything async def and it's fast" is wrong. Use async def only "when you truly await inside." If the inside is synchronous, plainly make it def and leave it to FastAPI—that's the choice with the fewest accidents and the most speed.


2. Pydantic v2: kill external input at the boundary

The essence of an API is "a checkpoint that converts untrusted data coming from the outside world into the inside's trustworthy types." Pydantic v2 lets you build that checkpoint with a single type hint. Never trust external input: this is the first principle of security.

2.1 Split request / response models

Make the input and output models different things. Don't let input include id or created_at, and don't leak the password hash in output—this is separation of concerns (SRP) and security too.

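from datetime import datetime
from pydantic import BaseModel, EmailStr, Field, field_validator

# --- Input (only what the client is allowed to send) ---
class UserCreate(BaseModel):
    email: EmailStr                                  # malformed addresses are rejected with 422
    display_name: str = Field(min_length=1, max_length=50)
    age: int = Field(ge=0, le=150)                   # range constraint (no negative ages)

    @field_validator("display_name")
    @classmethod
    def no_control_chars(cls, v: str) -> str:
        # Domain-specific rules are checked explicitly in a validator
        v = v.strip()  # strip first so a whitespace-only name cannot slip past min_length
        if not v:
            raise ValueError("display_name must not be blank")
        if any(ord(c) < 0x20 for c in v):
            raise ValueError("control characters are not allowed")
        return v

# --- Output (only what the server is allowed to return; no secrets) ---
class UserPublic(BaseModel):
    id: int
    email: EmailStr
    display_name: str
    created_at: datetime
    # password_hash is deliberately omitted → the type itself prevents the leak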

Using Field's constraints (min_length / max_length / ge / le / gt, etc.) and field_validator consolidates the business rules in the model definition. The validation logic doesn't scatter into the handler body, so it's readable and easy to change (ETC).

2.2 Use it in the handler: bind the output with a type too via response_model

from fastapi import FastAPI

app = FastAPI()

@app.post("/users", response_model=UserPublic, status_code=201)
async def create_user(payload: UserCreate) -> UserPublic:
    # By the time payload arrives here it is already validated. No re-validation inside (DRY)
    user = await repository.insert_user(payload)
    # With response_model=UserPublic, extra fields are stripped automatically
    return user

Just by writing payload: UserCreate, an invalid request is rejected with 422 before it reaches the handler. You don't write a single line of validation code in the handler—this is the substance of "validation comes with a single type hint." Attach response_model and the output side is also shaped to the contract, structurally preventing the leakage of secret fields.

2.3 Data coming from "non-type places": model_validate / model_validate_json

External API responses, message-queue payloads, config files—data that doesn't pass through a FastAPI handler argument must always be validated before letting it inside.

import httpx

async def fetch_external_user(user_id: str) -> UserPublic:
    async with httpx.AsyncClient() as client:
        res = await client.get(f"https://upstream.example.com/users/{user_id}")
        res.raise_for_status()
    # JSON from an external API is also untrusted external input. Don't pass a dict around; model it immediately
    return UserPublic.model_validate(res.json())     # dict → validated model
    # If you pass the JSON string directly, model_validate_json(res.text) saves a step

A Pydantic v2 note: there are strict mode (no type conversion) and lax mode (coercion such as "123" → 123). Consider strict for untrusted external input, because implicit conversions like "true" turning into True become a breeding ground for unexpected bugs. On validation failure, ValidationError reports which field failed and why in a structured form, so the error log itself becomes root-cause analysis.
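
To make the lax/strict difference concrete, here is a small sketch. LaxEvent, StrictEvent, and the raw payload are hypothetical names invented for this illustration. Strictness is switched on per model through ConfigDict(strict=True), which is one of several ways Pydantic v2 offers.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class LaxEvent(BaseModel):
    # Default (lax) mode: compatible strings are coerced to the declared type
    count: int
    enabled: bool


class StrictEvent(BaseModel):
    # Strict mode for the whole model: no implicit type conversion
    model_config = ConfigDict(strict=True)

    count: int
    enabled: bool


raw = {"count": "123", "enabled": "true"}  # e.g. a payload from an upstream system

lax = LaxEvent.model_validate(raw)  # accepted: count becomes 123, enabled becomes True

try:
    StrictEvent.model_validate(raw)
    failed_fields: list[tuple] = []
except ValidationError as exc:
    # exc.errors() is a list of dicts; "loc" names the offending field
    failed_fields = [error["loc"] for error in exc.errors()]

if __name__ == "__main__":
    print(lax)
    print(failed_fields)
```

The structured failed_fields list is what makes the validation error usable as a log entry: it names each rejected field without echoing the whole payload.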


3. Dependency injection (Depends): inject DB session, auth, config

Depends is the highest-value feature in FastAPI. You declare "what the handler needs," and the framework prepares and injects it. You can separate the DB session, the authenticated user, and config from the handler body (SRP).

Since FastAPI 0.95.0, the official docs recommend the way using Annotated. The standard is to fold dependencies into a type alias and reuse them.

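from functools import lru_cache
from typing import Annotated

from fastapi import Depends
from pydantic_settings import BaseSettings, SettingsConfigDict

# Settings (read from environment variables. Gather secrets here; never hardcode them)
class Settings(BaseSettings):
    # pydantic-settings v2 style; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    database_url: str
    jwt_secret: str

@lru_cache  # Don't re-read settings each time; one instance per process (cost efficiency)
def get_settings() -> Settings:
    return Settings()  # reads the DATABASE_URL / JWT_SECRET environment variables

SettingsDep = Annotated[Settings, Depends(get_settings)]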

3.2 Cleanup with yield dependencies (the standard for DB sessions)

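A dependency using yield acquires the resource before the yield and runs its cleanup once the handler has finished with it (the exact timing relative to sending the response has changed between FastAPI versions, so check the docs for the version you run). Closing the DB session is the canonical use of this pattern.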

from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# The sessionmaker is prepared from the engine created in lifespan (see Chapter 4)
async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    session_factory: async_sessionmaker = request.app.state.session_factory
    async with session_factory() as session:   # before yield = acquire
        yield session                           # inject into the handler
        # after yield = cleanup. async with closes it reliably (prevents leaks)

DbDep = Annotated[AsyncSession, Depends(get_db)]

3.3 Make auth a dependency too

Auth is "a cross-cutting concern repeated in every handler." Carve it into a dependency, and each handler just receives current_user (DRY).

from fastapi import HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

bearer = HTTPBearer()

async def get_current_user(
    creds: Annotated[HTTPAuthorizationCredentials, Depends(bearer)],
    settings: SettingsDep,
    db: DbDep,
) -> UserPublic:
    # Token verification is itself validation of external input. On failure, return 401 immediately
    # (verify_jwt is your own helper)
    user = await verify_jwt(creds.credentials, settings.jwt_secret, db)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication failed",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

CurrentUser = Annotated[UserPublic, Depends(get_current_user)]


@app.get("/me", response_model=UserPublic)
async def read_me(current_user: CurrentUser) -> UserPublic:
    return current_user  # no auth logic at all in the handler

A dependency can be written async def or def independently of the handler (the official docs state this clearly). At test time it can be swapped wholesale (Chapter 8's dependency_overrides)—this is the biggest return of DI.


4. lifespan: acquire heavy resources only once at startup

Don't create DB pools, ML models, or HTTP clients per request. Establishing connections and loading models cost seconds. With the officially-recommended lifespan, acquire them only once at startup and share them within the worker.

from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- Startup (before yield; runs only once) ---
    settings = get_settings()
    engine = create_async_engine(settings.database_url, pool_size=10, max_overflow=20)
    app.state.session_factory = async_sessionmaker(engine, expire_on_commit=False)
    app.state.http = httpx.AsyncClient(timeout=10.0)  # shared client, reused across requests
    # Load heavy resources such as ML models here too and put them on app.state

    yield  # ← the app starts here and begins accepting requests

    # --- Shutdown (after yield; runs once on graceful shutdown) ---
    await app.state.http.aclose()
    await engine.dispose()  # return connections cleanly (prevents leaks and connection exhaustion)

app = FastAPI(lifespan=lifespan)

Important: the old @app.on_event("startup") / @app.on_event("shutdown") are deprecated. With lifespan, the startup and shutdown logic gathers in one place, and you can share state across the yield. In new code, always use lifespan.

In the broadcaster platform too, the AI models, GCP clients, and DB pool were all acquired with lifespan and shared by placing them on app.state. Just by stopping per-request acquisition, p99 latency and the cold initial response stabilize dramatically (cost efficiency, performance).


5. Error handling: 4xx immediately, 5xx with structured logs

The principle of error handling is simple. Errors due to external input (things the client can fix) are returned immediately as 4xx. Server-side unexpected things (things the client can't fix) return 5xx, and the details go only into structured logs. Don't return stack traces or internal exception messages to the client (information leakage).

5.1 Business errors are HTTPException

from fastapi import HTTPException, status

@app.get("/users/{user_id}", response_model=UserPublic)
async def get_user(user_id: int, db: DbDep) -> UserPublic:
    user = await repository.find_user(db, user_id)
    if user is None:
        # "Does not exist" is something the client should know → 404 immediately
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return user

5.2 Translate domain exceptions to HTTP in a centralized handler

Rather than throwing HTTPException scattered all over the handler, it's cleaner to have the domain layer throw a bare domain exception and translate it to HTTP at the boundary (separation of concerns).

import logging
from fastapi import Request
from fastapi.responses import JSONResponse

logger = logging.getLogger("app")

class DomainError(Exception):
    """Business-rule violation. Write message on the assumption that users may see it."""
    def __init__(self, message: str, status_code: int = 400):
        super().__init__(message)
        self.message = message
        self.status_code = status_code

@app.exception_handler(DomainError)
async def domain_error_handler(request: Request, exc: DomainError):
    # 4xx: caused by the client. The message can be returned as-is
    return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})

@app.exception_handler(Exception)
async def unhandled_error_handler(request: Request, exc: Exception):
    # 5xx: unexpected. Details go only to the log; the client gets a generic message
    logger.exception("unhandled error", extra={"request_id": getattr(request.state, "request_id", None)})
    return JSONResponse(status_code=500, content={"detail": "An internal error occurred"})

logger.exception leaves the stack trace only in the logs, and fixes the response to "an internal error occurred." The server knows what fell over, and it doesn't leak to the client—this is production error design.


6. Observability: request ID, structured logs, OpenTelemetry

Half of an "API that doesn't fall over" is "an API that can immediately trace the cause when it does fall over." The minimum equipment to make logs useful in production is three—request ID, structured logs, distributed tracing.

6.1 Request-ID middleware

With an ID running through one request, you can reconstruct one piece of processing across multiple log lines. Inherit it if there's an incoming header (continuing the trace from upstream), and number it if not.

import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Inherit the ID attached upstream. If absent, generate one (links the distributed trace)
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        request.state.request_id = request_id
        response = await call_next(request)
        response.headers["x-request-id"] = request_id  # return it to the caller too
        return response

app.add_middleware(RequestIDMiddleware)

6.2 Structured logs (JSON)

Production logs should be machine-readable. Emitting request_id / method / path / status / duration in JSON lets you filter and aggregate in a logging foundation (Cloud Logging, etc.).

import json, logging, time

LOG_FIELDS = ("request_id", "method", "path", "status", "duration_ms")

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        # Copy the metadata passed via extra=... (None when a field is absent)
        payload.update({key: getattr(record, key, None) for key in LOG_FIELDS})
        return json.dumps(payload, ensure_ascii=False)

# Access log in a middleware (with duration)
class AccessLogMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "request",
            extra={
                "request_id": getattr(request.state, "request_id", None),
                "method": request.method,
                "path": request.url.path,
                "status": response.status_code,
                "duration_ms": round(elapsed_ms, 1),
            },
        )
        # ⚠️ Never put PII such as the body, email addresses, or tokens into the log here
        response.headers["x-response-time-ms"] = f"{elapsed_ms:.1f}"
        return response

Don't leave PII in logs: flowing the request body, email addresses, auth tokens, or names into structured logs is an accident. Record only metadata (ID, status, duration, error type). It's an absolute condition for internal-control projects, and was enforced in the broadcaster platform too.
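
Passing extra={"request_id": ...} on every log call is easy to forget deep inside the service layer. One alternative, sketched here with only the standard library, is to keep the ID in a contextvars.ContextVar and let a logging.Filter stamp it onto every record. All names below (request_id_var, RequestIdFilter, JsonLineFormatter, handle_request) are hypothetical, and handle_request stands in for what a middleware would do around call_next.

```python
import contextvars
import io
import json
import logging
from typing import Optional

request_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "request_id", default=None
)


class RequestIdFilter(logging.Filter):
    """Attach the current request ID to every record that passes through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


class JsonLineFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(payload, ensure_ascii=False)


stream = io.StringIO()  # stands in for stdout / the log collector
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonLineFormatter())
handler.addFilter(RequestIdFilter())

ctx_logger = logging.getLogger("ctx_demo")
ctx_logger.setLevel(logging.INFO)
ctx_logger.addHandler(handler)
ctx_logger.propagate = False


def handle_request(request_id: str) -> None:
    token = request_id_var.set(request_id)  # what a middleware would do on entry
    try:
        ctx_logger.info("loading user")  # no extra= needed anywhere below this point
    finally:
        request_id_var.reset(token)  # restore the previous value on exit


handle_request("req-123")
ctx_logger.info("outside any request")
lines = [json.loads(line) for line in stream.getvalue().splitlines()]

if __name__ == "__main__":
    print(lines)
```

Because context variables are scoped per asyncio task, concurrent requests do not see each other's IDs. How the variable propagates through a particular middleware stack is something to verify in your own setup.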

6.3 The key points of OpenTelemetry integration

Processing spanning multiple services (FastAPI → DB → external API → job-execution foundation) can't be traced by logs alone. With OpenTelemetry auto-instrumentation, take a trace running through the request. The key point is just this.

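from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# Once at startup. This creates a server span for every incoming request.
# Outgoing HTTP calls and DB queries need their own instrumentation packages to appear as child spans.
FastAPIInstrumentor.instrument_app(app)

Install opentelemetry-instrumentation-fastapi and this one line instruments the server side. Once a tracer provider and exporter are configured and the HTTP-client and DB instrumentations are added, the trace shows which span ate the time. Tie request_id to the trace ID and you can cross-reference logs and traces.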


7. Heavy processing: the limits of BackgroundTasks and how to offload to a task queue

This is the watershed of production design. "Sending mail," "generating thumbnails," "long-running AI jobs"—where do you process these?

7.1 What BackgroundTasks can and can't do

FastAPI's BackgroundTasks is a mechanism that runs after returning the response, in the same process. It's suited only to "light, short, non-fatal-if-failed" side effects.

from fastapi import BackgroundTasks

@app.post("/users", status_code=201)
async def create_user(payload: UserCreate, bg: BackgroundTasks, db: DbDep):
    user = await repository.insert_user(db, payload)
    # Light post-processing is fine: sending a welcome email, for example
    bg.add_task(send_welcome_email, user.email)
    return {"id": user.id}

The limits of BackgroundTasks are clear.

  • It runs in the same process: it eats the worker's CPU and memory. Loading heavy processing worsens the API's latency.
  • It's not persisted: if the process dies, the task disappears. No retries, no progress tracking.
  • It doesn't scale: the API's scale (the number of replicas) and the processing capacity get coupled.

7.2 Decision table: BackgroundTasks or a real task queue

Aspect | BackgroundTasks is enough | A real task queue (Celery / Cloud Run Jobs / Workflows)
Duration | Hundreds of ms to a few seconds | Tens of seconds to tens of minutes, long-running jobs
On failure | Acceptable to disappear | Retry, resume, progress tracking are essential
Resources | Light I/O (mail, notification) | CPU/GPU-intensive (video, OCR, ASR, LLM)
Scale | OK together with the API | Want to scale independently of the API
Visibility | Unneeded | Per-job state management needed

The decision criterion is one line. If any of "I want to retry on failure," "it takes tens of seconds or more," "it eats CPU/GPU" applies, graduate from in-process BackgroundTasks and offload to a persistent queue / job-execution foundation (the flip side of YAGNI: don't bring in a queue until needed, but bring it in without hesitation once needed).
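
Written as code, the criterion is a single boolean expression. This is only a sketch of the rule above: TaskProfile and needs_task_queue are hypothetical names, and the 10-second threshold is an assumed reading of "tens of seconds" that you should tune to your own latency budget.

```python
from dataclasses import dataclass

# Assumption: treat anything expected to run 10 s or longer as "tens of seconds"
LONG_RUNNING_THRESHOLD_SECONDS = 10.0


@dataclass(frozen=True)
class TaskProfile:
    expected_seconds: float
    must_retry_on_failure: bool
    cpu_or_gpu_heavy: bool


def needs_task_queue(task: TaskProfile) -> bool:
    """True if ANY of the three conditions applies; otherwise BackgroundTasks is enough."""
    return (
        task.must_retry_on_failure
        or task.expected_seconds >= LONG_RUNNING_THRESHOLD_SECONDS
        or task.cpu_or_gpu_heavy
    )


welcome_email = TaskProfile(expected_seconds=0.5, must_retry_on_failure=False, cpu_or_gpu_heavy=False)
caption_check = TaskProfile(expected_seconds=13 * 60, must_retry_on_failure=True, cpu_or_gpu_heavy=True)

if __name__ == "__main__":
    print(needs_task_queue(welcome_email), needs_task_queue(caption_check))
```

The point of the "any" is that a short task still belongs on a queue if losing it is unacceptable.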

7.3 A real example: separating a long-running AI job from FastAPI

In the broadcaster platform, caption typo detection extracts OCR (on-screen text) and ASR (speech transcription) from video and cross-checks them, a heavy job that takes minutes to tens of minutes. Running this inside a FastAPI request or in BackgroundTasks is out of the question: the API clogs, and the job disappears on every deploy.

So I separated the configuration this way.

  • FastAPI (async): concentrates on reception, auth, validation, starting the job, and returning status. Returns the request immediately.
  • Cloud Run Jobs: runs the actual heavy processing (OCR / ASR / cross-check) on an execution foundation that scales independently of the API.
  • Cloud Workflows: orchestration of multiple steps. Parallelizing independent steps shortened processing that took 18 minutes sequentially by about 30% (13 minutes in parallel).

@app.post("/jobs", status_code=202)  # 202 Accepted: received but not yet completed
async def enqueue_job(req: JobRequest, current_user: CurrentUser, db: DbDep):
    # The API's job ends at "validate, start the job, return a tracking ID"
    job = await repository.create_job(db, owner=current_user.id, spec=req)
    await workflows_client.start_execution(job_id=job.id, spec=req.model_dump())
    # Don't wait for the heavy processing here. The client polls the status with job.id
    return {"job_id": job.id, "status": "queued"}

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str, current_user: CurrentUser, db: DbDep):
    job = await repository.find_job(db, job_id, owner=current_user.id)
    if job is None:
        raise HTTPException(404, "Job not found")
    return {"job_id": job.id, "status": job.status, "progress": job.progress}

The API is "a thin layer that returns fast," and the heaviness lives on an external execution foundation. Just by drawing this boundary, the API becomes harder to knock over, and jobs can scale and retry independently. 202 Accepted + status polling (or a webhook) is the straightforward pattern for an async job API.
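
On the client side, status polling is usually paired with a growing delay and an overall deadline so that a slow job doesn't turn into a request storm. The sketch below shows one way to do that with plain asyncio. wait_for_job, make_fake_fetcher, and the status names other than "queued" are assumptions for this illustration; a real fetch_status would call GET /jobs/{job_id}.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable

# Assumption: the job eventually reaches one of these terminal statuses
TERMINAL_STATUSES = {"succeeded", "failed"}


async def wait_for_job(
    fetch_status: Callable[[], Awaitable[str]],
    *,
    initial_delay: float = 0.01,
    max_delay: float = 0.1,
    timeout: float = 2.0,
) -> str:
    """Poll until a terminal status, doubling the delay up to max_delay."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    delay = initial_delay
    while True:
        status = await fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if loop.time() + delay > deadline:
            raise TimeoutError(f"job still {status!r} after {timeout} s")
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff with a cap


def make_fake_fetcher(statuses: Iterable[str]) -> Callable[[], Awaitable[str]]:
    """Test double that replays a fixed sequence of statuses."""
    iterator = iter(statuses)

    async def fetch() -> str:
        return next(iterator)

    return fetch


final_status = asyncio.run(
    wait_for_job(make_fake_fetcher(["queued", "running", "running", "succeeded"]))
)

if __name__ == "__main__":
    print(final_status)
```

The small default delays keep the demo fast. For jobs that run for minutes, seconds-scale delays and a much longer timeout are the realistic values.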


8. Testing: TestClient and httpx AsyncClient, dependency overrides

There's no production launch without a verification path. FastAPI is designed to be easy to test.

8.1 Synchronous testing: TestClient

The official basic form is fastapi.testclient.TestClient. The test function is a plain def, callable without await (it drives ASGI internally).

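from fastapi.testclient import TestClient
from app.main import app

# Note: lifespan (Chapter 4) runs only when TestClient is used as a context manager
# (with TestClient(app) as client: ...). Use that form, or the overrides in 8.3,
# when handlers depend on app.state resources such as the session factory.
client = TestClient(app)

def test_create_user_validates_input():
    # Invalid input (negative age) should be rejected with 422
    res = client.post("/users", json={"email": "x@example.com", "display_name": "Yu", "age": -1})
    assert res.status_code == 422

def test_create_user_ok():
    res = client.post("/users", json={"email": "yu@example.com", "display_name": "Yu", "age": 30})
    assert res.status_code == 201
    assert "id" in res.json()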

8.2 Async testing: httpx AsyncClient + ASGITransport

When the test function itself needs to be async, for example to await DB queries directly in the test body alongside the HTTP calls, connect httpx's AsyncClient to the app with ASGITransport, as the official docs guide.

import pytest
import httpx
from httpx import ASGITransport
from app.main import app

@pytest.mark.anyio
async def test_me_async():
    transport = ASGITransport(app=app)  # drives the app directly (no real network needed)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as ac:
        res = await ac.get("/me", headers={"Authorization": "Bearer faketoken"})
    assert res.status_code in (200, 401)

8.3 Dependency overrides: dependency_overrides

Here's the biggest return of DI. Swap the real DB or auth wholesale for a test fake. You can separate external dependencies, so tests become fast and deterministic.

from datetime import datetime

import pytest

async def override_get_db():
    # Instead of the real get_db, yield a test session (in-memory or rolled back per test)
    async with test_session_factory() as session:
        yield session

def fake_current_user() -> UserPublic:
    return UserPublic(id=1, email="test@example.com", display_name="Test", created_at=datetime.now())

@pytest.fixture
def fake_deps():
    # Key: the original dependency function. Value: the replacement function
    app.dependency_overrides[get_db] = override_get_db
    app.dependency_overrides[get_current_user] = fake_current_user
    yield
    # Always clear after the test so overrides don't leak into other tests.
    # (Clearing at module level would run at import time, before any test executes.)
    app.dependency_overrides.clear()

def test_read_me_with_fake_auth(fake_deps):
    res = client.get("/me")
    assert res.status_code == 200
    assert res.json()["email"] == "test@example.com"

Carving auth, DB, and external APIs out into dependencies lets you pinpoint-fake them with dependency_overrides. The investment of making auth a Depends in Chapter 3 is recovered here.


9. Deployment: uvicorn / gunicorn and graceful shutdown

9.1 Worker configuration

Run the ASGI server uvicorn under a process manager with multiple workers in production. Classically, gunicorn + uvicorn.workers.UvicornWorker, or uvicorn --workers N.

# Multiple workers with uvicorn alone (the simple setup)
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

# gunicorn as the process manager, each worker running uvicorn (the robust classic)
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 30 \
  --graceful-timeout 30

The rule of thumb for worker count is based on the number of CPU cores, adjusted by the workload (I/O-bound or CPU-bound). Increasing workers blindly just eats memory. The DB pool and HTTP client acquired in lifespan are replicated per worker, so design so that worker count × (pool_size + max_overflow) doesn't exceed the DB's max connections (the typical connection-exhaustion accident).
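
As a worked example of this budget, take the numbers already used in this article: --workers 4 from the commands above and pool_size=10, max_overflow=20 from Chapter 4. The helper name max_connections_needed and the replicas parameter are made up for this sketch, and the limit of 100 is an assumption (PostgreSQL's default max_connections); substitute your database's actual setting.

```python
def max_connections_needed(
    workers: int, pool_size: int, max_overflow: int, replicas: int = 1
) -> int:
    """Worst-case DB connections: every worker in every replica owns its own pool."""
    return replicas * workers * (pool_size + max_overflow)


# Values from this article: 4 workers, pool_size=10, max_overflow=20
needed = max_connections_needed(workers=4, pool_size=10, max_overflow=20)

# Assumption: PostgreSQL's default max_connections; check your own server
db_max_connections = 100
fits = needed <= db_max_connections

if __name__ == "__main__":
    print(f"worst case {needed} connections, limit {db_max_connections}, fits: {fits}")
    # prints: worst case 120 connections, limit 100, fits: False
```

With these settings a single instance can already exceed the limit under load (4 × 30 = 120), before autoscaling adds replicas. Either shrink the pool, reduce the workers, or put a connection pooler in front of the database.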

9.2 Graceful shutdown

So you don't drop requests on every deploy, make shutdown gentle. gunicorn's --graceful-timeout is "the grace period to wait for in-flight requests": on a container foundation (Cloud Run / Kubernetes) the process receives SIGTERM, finishes existing requests within the grace period, and then exits. At that point Chapter 4's lifespan shutdown processing (engine.dispose() / http.aclose()) runs and returns the connections cleanly before the process exits. This is the finishing touch for "not emitting 5xx even during a deploy."

A word on cost efficiency: an I/O-bound API can take high concurrency per worker, so using async correctly gets high throughput with few instances. Conversely, holding CPU-bound processing in the API process gets only as much parallelism as the core count, forcing you to increase instances wastefully. Chapter 7's "offload heavy processing outside" is tied not only to reliability but to the bill.


10. Summary: a production FastAPI cheat sheet

A quick reference for when you're unsure.

  • async def or def: an async library you await → async def. A synchronous library, requests, time.sleep → def (FastAPI offloads to the thread pool). When in doubt, def. Don't call synchronous blocking inside async def.
  • Validation: split input/output models, and kill it at the boundary with Field constraints and field_validator. Model external-API JSON immediately with model_validate. Consider strict for untrusted input.
  • DI: DB session, auth, config with Depends + Annotated. Clean up with yield dependencies. Design so you can dependency_overrides in tests.
  • Startup cost: acquire DB pools, models, and HTTP clients only once with lifespan and share via app.state. Don't use @app.on_event.
  • Errors: return 4xx immediately, 5xx with a generic message + structured logs. Don't return stack traces to the client.
  • Observability: request-ID attachment → structured (JSON) logs → OpenTelemetry instrumentation. Don't emit PII in logs.
  • Heavy processing: if tens of seconds or more / retry-required / CPU/GPU-intensive, graduate from BackgroundTasks to Celery / Cloud Run Jobs / Workflows. The API returns thin with 202 Accepted.
  • Testing: sync with TestClient, async with httpx.AsyncClient + ASGITransport. Swap dependencies with dependency_overrides.
  • Deployment: multiple workers with gunicorn + UvicornWorker, gracefully with --graceful-timeout and lifespan shutdown processing. Worker count × (pool_size + max_overflow) ≤ DB max connections.

FastAPI is a "working in 5 minutes" framework, but production quality is decided by boundary design. Kill external input with types, offload cross-cutting concerns to dependencies, push heaviness out to an external execution foundation, and make everything traceable by ID—none of it is flashy, but this accumulation creates an "API that doesn't fall over, can be traced, is easy to change."

On the in-house AI platform for a broadcaster, I designed FastAPI as a thin reception layer for long-running AI jobs, separated heavy processing to Cloud Run Jobs / Cloud Workflows, and put it into production operation with observability and idempotency guaranteed. With generative AI (Claude Code) as my partner, my approach is to build fast and cheaply as a solo developer while guaranteeing quality with verification gates.

"I want to put this API in production with FastAPI, but how should I design the use of async, job separation, and observability?"—I'll accompany you end-to-end, from that decision through implementation and operation. Feel free to reach out, even from the requirements-organizing stage.



友田 陽大

Developer of a METI Minister's Award–winning product. With TypeScript + Python + AWS, I deliver SaaS, industry DX, and production-grade generative AI (RAG) end to end — from requirements to infrastructure and operations — single-handedly.
