# FastAPI WebSocket 本番実装ガイド：双方向リアルタイム通信を接続管理・認証・水平スケールまで作り込む

> FastAPIでWebSocketによる双方向リアルタイム通信を本番品質で実装するガイド。公式ドキュメント最新版に忠実な@app.websocket・accept・receive/send・WebSocketDisconnectに加え、ConnectionManagerでの接続管理とブロードキャスト、Pydanticでの境界検証、クエリ/サブプロトコルでのJWT認証、Redis Pub/Subでの水平スケール、ハートビート・再接続・可観測性まで実コードで解説します。

- 公開日: 2026-06-26
- 著者: 友田 陽大
- タグ: Python, FastAPI, WebSocket, リアルタイム, 可観測性
- URL: https://tomodahinata.com/blog/fastapi-websockets-realtime-production-guide

## 要点

- WebSocketは『双方向・低遅延・持続接続』が要るときの選択肢。サーバー→クライアントの一方向通知だけならSSE、たまの更新ならポーリングで十分。決定表で使い分ける
- 最小形は @app.websocket('/ws') → await websocket.accept() → while True で receive/send。切断は WebSocketDisconnect 例外で検知し、finally でクリーンアップする
- 受信メッセージは信用できない外部入力。receive_json + Pydanticで境界検証し、未知のtypeや過大サイズは弾く。複数クライアントは ConnectionManager（active_connections/connect/disconnect/broadcast）で束ねる
- ブラウザのWebSocketは Authorization ヘッダを任意に付けられない。JWTはクエリパラメータかサブプロトコル(Sec-WebSocket-Protocol)で渡し、accept前に検証。失敗時は WebSocketException(WS_1008_POLICY_VIOLATION)。Originも検証する
- in-memoryのConnectionManagerは単一プロセス限定。複数ワーカー/レプリカでは Redis Pub/Sub 等のブローカで全プロセスへ配信が必要。本番はサイズ上限・レート制限・ハートビート・指数バックオフ再接続・接続数メトリクスまで作り込む

---

「処理の進捗を画面にリアルタイムで出したい」「チャットや通知を即時に届けたい」——要件は素朴です。けれど素朴なリクエストの裏で、**接続が切れたまま放置される・全クライアントへの配信が片方のプロセスにしか届かない・誰でも他人のチャネルに繋げてしまう**といった事故が、本番で静かに起きます。WebSocket は「双方向で速い」反面、**HTTP の常識（ステートレス・リクエスト単位）が通用しない**領域だからです。

この記事は、FastAPI で**本番品質の WebSocket 双方向リアルタイム通信**を実装するためのガイドです。FastAPI 公式の WebSockets チュートリアルを**最新の公式仕様に忠実に**追いながら、公式が「最小例」として割愛する部分——**いつ WebSocket を選ぶか、受信メッセージの境界検証、認証の WebSocket 特有の制約、水平スケール、切断クリーンアップ・ハートビート・再接続**——まで踏み込みます。題材として、私が国内大手放送事業者向けに構築した社内AIプラットフォーム（[FastAPI 製の長時間AIジョブと進捗配信を運用](/case-studies/broadcaster-ai-content-platform)。テロップ誤字検出を `async` の長時間ジョブとして回し、その進捗を本番で配信した）での設計判断も交えます。なお同案件で**実際に進捗配信に使ったのは Firestore** です。本記事では WebSocket を「**同一アプリ内でリアルタイム配信する代替手段**」として、その判断軸ごと論じます（後述）。

> **この記事のルール**：API・コードは **FastAPI 公式ドキュメント（2026年6月時点）** に基づきます。FastAPI の WebSocket は内部的に Starlette の `WebSocket` を使い、`websockets` パッケージに依存します（`pip install websockets`）。本記事はこの最新版に準拠します。仕様は改定されるため、本番投入前に必ず公式で最新の挙動を確認してください。**シークレット（SECRET_KEY・Redis URL・JWT 鍵）は環境変数前提**（ハードコード厳禁）。そして **WebSocket 自体は通信を暗号化しません——本番は `wss://`（TLS）が必須**です。

---

## 0. まず判断：いつ WebSocket を選ぶのか（SSE / ポーリングとの違い）

WebSocket は強力ですが、**多くの「リアルタイム要件」は WebSocket を必要としません**。最初に立てるべき問いは1つです——**サーバーからクライアントへ送るだけか、双方向か**。

- **ポーリング（定期 fetch）**：クライアントが一定間隔で HTTP を叩く。実装が最も簡単で、既存の HTTP インフラ（CDN・認証・ロードバランサ）がそのまま効く。更新頻度が低く、数秒の遅延を許せるなら**これで十分**（KISS）。欠点は無駄なリクエストとサーバー負荷。
- **SSE（Server-Sent Events）**：サーバー→クライアントの**一方向**ストリーム。通常の HTTP レスポンスを開いたまま流し続ける方式で、**ブラウザが自動再接続**してくれる。進捗バー・通知・ライブフィードのような「サーバーが押し出すだけ」の用途に最適。HTTP/1.1 では**1ドメインあたりの同時接続数に制限**がある点に注意（HTTP/2 で緩和）。
- **WebSocket**：**双方向**の持続接続。クライアントもサーバーも、任意のタイミングでメッセージを送れる。チャット・共同編集・対戦ゲーム・双方向の制御チャネルに必要。代わりに**接続状態を自分で管理**し、認証・スケール・切断処理を自前で作り込む責任を負う。

| 観点 | ポーリング | SSE | WebSocket |
| --- | --- | --- | --- |
| 通信方向 | クライアント発のみ | サーバー→クライアント（単方向） | **双方向** |
| 遅延 | 間隔ぶんの遅延 | 低い | **最も低い** |
| 接続コスト | 都度接続（重い） | 1本を維持 | 1本を維持 |
| 自動再接続 | 不要（都度） | **ブラウザが標準で行う** | **自前で実装** |
| 認証/LB との相性 | HTTP のまま（容易） | HTTP のまま（容易） | アップグレード必要（要設計） |
| 向く用途 | 低頻度の更新 | 進捗・通知・ライブフィード | チャット・共同編集・ゲーム・制御 |

判断の目安は明快です——**「クライアントもサーバーへ送る」必要があるなら WebSocket。サーバーが押し出すだけなら、まず SSE を検討**してください。放送事業者向けプラットフォームで私が運用したテロップ誤字検出は、**長時間ジョブの進捗を押し出す（実質サーバー→クライアントの単方向）**用途でした。だからこそ WebSocket は必須ではなく、当時は**Firestore のリアルタイムリスナで配信**しています（マネージド・スケール・オフライン耐性が手に入る）。**「双方向か？」「マネージドに寄せられるか？」を問わずに WebSocket を選ぶのは、運用負債の自己発注**です。本記事は「**双方向が要る／自前アプリ内で配信したい**」ケースに対し、WebSocket を**本番品質で**作る道を扱います。

---

## 1. 最小の WebSocket エンドポイント

まず公式の最小形から。HTTP の `@app.get` に対応するのが **`@app.websocket`** です。

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # ハンドシェイクを受理する。これを呼ぶまで通信は始まらない。
    await websocket.accept()
    while True:
        # クライアントからの1メッセージを待つ（届くまでここで非同期に待機）
        data = await websocket.receive_text()
        # そのまま送り返す（エコー）
        await websocket.send_text(f"Message text was: {data}")
```

HTTP との違いが最初の3点です。

- **`async def` が前提**：WebSocket ハンドラは長時間生き続け、`await` で受信を待ちます。`receive_*` は**届くまでブロックせずに待機**する非同期処理です（同期関数では書けません）。
- **`await websocket.accept()` が必須**：クライアントのアップグレード要求を**受理**して初めて接続が確立します。`accept()` の前に**認証チェックを挟む**のが本番の鉄則です（第5章）。
- **`while True:` ループで生き続ける**：HTTP の「1リクエスト1レスポンス」と違い、1つの接続の中で何度もメッセージをやり取りします。ループを抜ける＝接続終了です。

> **要点**：`@app.websocket` のハンドラは「接続の一生」を表します。`accept()` で始まり、ループで会話し、切断（次章）で終わる——この3拍子を押さえると、以降の設計がすっと入ります。なお `send_text`/`receive_text` のほか、バイナリ用の `send_bytes`/`receive_bytes`、JSON 用の `send_json`/`receive_json` があります。

---

## 2. 受信ループと切断検知：`WebSocketDisconnect`

最小例の `while True:` には**致命的な欠落**があります——**クライアントが切断したときの処理**です。ブラウザを閉じる・ネットワークが落ちる・タブをリロードする。このとき `receive_text()` は **`WebSocketDisconnect` 例外**を投げます。これを捕まえないと、ループは例外で死に、**接続の後始末（リスト除去・リソース解放）が走りません**。

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"echo: {data}")
    except WebSocketDisconnect:
        # クライアントが切断した。ここに来たら接続はもう無い。
        # ここでクリーンアップ（後述の ConnectionManager からの除去など）を行う。
        ...
```

これが WebSocket 設計の**第一原則**です——**切断は例外であり、必ず捕捉する**。HTTP なら「レスポンスを返したら終わり」ですが、WebSocket は**いつ切れるか分からない持続接続**なので、後始末を `try/except`（さらに後述では `finally`）に集約します。

> **なぜ `finally` まで使うのか**：切断（`WebSocketDisconnect`）以外にも、ハンドラ内の予期せぬ例外で抜けることがあります。**接続をリストへ登録した以上、どんな経路で抜けても除去されねばなりません**。だから第4章では、登録解除を `finally` に置きます。「登録したら、必ず外す」を構造で保証する——これは接続リークを防ぐ要です。

---

## 3. JSON とスキーマ検証：境界を守る

ここが公式チュートリアルの**その先**であり、本番品質の分かれ目です。**WebSocket で受信するメッセージは、HTTP のボディと同じく『信用できない外部入力』**です。`receive_text()` で受けた文字列を `json.loads` してそのまま使う——これは SQL インジェクションと同じ油断です。**境界で必ず検証**します。

FastAPI の HTTP エンドポイントは Pydantic で自動検証してくれますが、**WebSocket のメッセージ本文は自動検証の対象外**（FastAPI が検証するのは接続時のパス/クエリ/ヘッダ等の依存まで）。だから**メッセージごとの検証は自分で Pydantic に通します**。

```python
from typing import Literal
from pydantic import BaseModel, ValidationError, Field

# 受信メッセージの「許される形」を型で定義する（判別可能なユニオン）
class ChatMessage(BaseModel):
    type: Literal["chat"]
    text: str = Field(min_length=1, max_length=2000)   # 長さ上限＝防御

class PingMessage(BaseModel):
    type: Literal["ping"]

# type で分岐できるよう discriminated union にする
IncomingMessage = ChatMessage | PingMessage

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_json()    # まずは dict として受ける
            try:
                # ここが境界検証。未知の type・型違い・過大長はここで弾かれる。
                message = _parse_incoming(raw)
            except ValidationError:
                # 不正な入力で接続ごと殺さず、エラーを返して接続は維持する判断もある
                await websocket.send_json({"type": "error", "detail": "invalid message"})
                continue
            await _handle(websocket, message)
    except WebSocketDisconnect:
        ...
```

```python
from pydantic import TypeAdapter

# TypeAdapter はユニオン型をそのまま検証できる（モデルでなくてもよい）
_incoming_adapter: TypeAdapter[IncomingMessage] = TypeAdapter(IncomingMessage)

def _parse_incoming(raw: object) -> IncomingMessage:
    # 外部入力 → 型の確定した内部表現へ。以降のコードは型を信頼できる。
    return _incoming_adapter.validate_python(raw)
```

このひと手間が、**「未知の `type` を受けてサーバーが落ちる」「巨大文字列でメモリを食い潰される」「想定外のフィールドで分岐がすり抜ける」**を構造的に防ぎます。境界で型を確定させれば、以降のハンドラは**型を信頼**できます（[入力境界での検証の考え方はリクエストバリデーションのガイド](/blog/fastapi-request-validation-query-path-body-parameters-guide)と同じ思想です）。

> **不正入力で接続を切るか、維持するか**：これは設計判断です。**明らかな攻撃**（巨大ペイロード・連続する不正）なら `await websocket.close(code=1008)` で**切断**が妥当。**一過性のクライアントバグ**なら、上記のように**エラーを返して接続は維持**するほうが UX を損ねません。「どちらを選ぶか」を、メッセージの種類ごとに決めておきます。

---

## 4. 複数クライアントと `ConnectionManager`

1対1のエコーから、**1対多のブロードキャスト**（チャット・通知・進捗の一斉配信）へ進みます。鍵は**「今つながっている全接続をどこかで保持する」**こと。公式が示すのが **`ConnectionManager`** パターンです。

```python
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self) -> None:
        # 現在アクティブな接続の集合。プロセスのメモリ上に持つ（←この前提が第6章の論点）。
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()                 # 受理してから
        self.active_connections.append(websocket)  # 台帳に載せる

    def disconnect(self, websocket: WebSocket) -> None:
        self.active_connections.remove(websocket)  # 台帳から外す

    async def send_personal_message(self, message: str, websocket: WebSocket) -> None:
        await websocket.send_text(message)         # 特定の1接続へ

    async def broadcast(self, message: str) -> None:
        for connection in self.active_connections:
            await connection.send_text(message)    # 全接続へ

manager = ConnectionManager()
```

エンドポイントはこの台帳を使うだけになり、**切断時の除去を確実に**行います。

```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 自分以外も含む全員へ配信（部屋・チャンネル単位にするなら台帳を分ける）
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
```

### 4.1 本番補強：除去を `finally` で保証し、配信エラーを隔離する

公式の最小形には、本番で効く2つの穴があります。**(1) 切断以外の例外でも台帳から外れる保証**と、**(2) 1接続への送信失敗が全体のブロードキャストを巻き込まない**こと。

```python
import asyncio
import logging

logger = logging.getLogger("ws")

class ConnectionManager:
    def __init__(self) -> None:
        self.active_connections: set[WebSocket] = set()   # 重複登録を避けるなら set

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self.active_connections.add(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        # discard は「居なくても例外を投げない」。二重切断でも安全（冪等）。
        self.active_connections.discard(websocket)

    async def broadcast(self, payload: dict) -> None:
        # 送信中に1つでも死んでいる接続があると例外で全体が止まる。
        # gather(return_exceptions=True) で個別の失敗を隔離する。
        dead: list[WebSocket] = []
        results = await asyncio.gather(
            *(conn.send_json(payload) for conn in self.active_connections),
            return_exceptions=True,
        )
        for conn, result in zip(self.active_connections, results):
            if isinstance(result, Exception):
                dead.append(conn)              # 送れなかった接続は死んでいる
        for conn in dead:
            self.disconnect(conn)              # まとめて掃除（クリーンアップ）
```

```python
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            raw = await websocket.receive_json()
            message = _parse_incoming(raw)        # 第3章の境界検証
            await manager.broadcast({"from": client_id, "text": message.text})
    except WebSocketDisconnect:
        pass                                       # 正常な切断
    finally:
        # どんな経路で抜けても必ず台帳から外す（接続リーク防止）
        manager.disconnect(websocket)
```

> **配信は「ベストエフォート」と割り切る**：ブロードキャストの途中で1接続が死んでいるのは**正常な状態**です（クライアントは予告なく消える）。`return_exceptions=True` で**個別の失敗を全体から隔離**し、死んだ接続を掃除する——これで「1人の切断が全員への配信を巻き込む」事故を防ぎます。確実な到達保証が要るなら、それは WebSocket ではなく**永続化＋再送（メッセージキュー）の領域**です（YAGNI を見極める）。

---

## 5. 認証：WebSocket 特有の制約

ここが WebSocket でいちばん事故りやすい領域です。**「誰でも `/ws` に繋げてしまう」**——認証を `accept()` の前に挟まないと、これが起きます。

### 5.1 ブラウザの制約：`Authorization` ヘッダを自由に付けられない

HTTP API なら `Authorization: Bearer <token>` でトークンを渡すのが定石です。ところが**ブラウザの WebSocket API（`new WebSocket(url)`）は、リクエストヘッダを任意に設定できません**。これが WebSocket 認証の根本的な制約です。そこで実務では次のいずれかでトークンを渡します。

- **クエリパラメータ**：`wss://api.example.com/ws?token=<jwt>`。最も簡単。ただし**URL はアクセスログ・プロキシ・ブラウザ履歴に残り得る**ので、**短命トークン**を使い、ログに URL を残さない設定とセットで使う。
- **サブプロトコル（`Sec-WebSocket-Protocol`）**：`new WebSocket(url, ["bearer", token])` のように**サブプロトコル名としてトークンを渡す**。ヘッダには載るがクエリには出ないため、**URL ログ漏れを避けられる**のが利点。サーバーは `accept(subprotocol=...)` で選んだプロトコルを返す。
- **接続後の最初のメッセージで認証**：`accept()` 後の1通目で `{"type": "auth", "token": ...}` を送らせ、検証する。柔軟だが、**未認証で接続が一瞬でも開く**ため、認証完了までは何もブロードキャストしない規律が要る。

### 5.2 依存（`Depends`）で `accept` 前に検証する

FastAPI の WebSocket は **`Depends`・`Query`・`Cookie`・`Header` を HTTP と同じく使えます**。認証失敗時は `HTTPException` ではなく **`WebSocketException`** を投げ、**WebSocket のクローズコード `1008`（Policy Violation）**を返します。

```python
from typing import Annotated
from fastapi import (
    FastAPI, WebSocket, WebSocketException, WebSocketDisconnect,
    Cookie, Query, Depends, status,
)

app = FastAPI()

async def get_token(
    websocket: WebSocket,
    # ブラウザの制約上、ヘッダは使いにくい → Cookie かクエリで受ける
    session: Annotated[str | None, Cookie()] = None,
    token: Annotated[str | None, Query()] = None,
) -> str:
    candidate = session or token
    if candidate is None:
        # 認証情報が無い → ポリシー違反として接続を拒否（accept 自体が行われない）
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return candidate

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    token: Annotated[str, Depends(get_token)],   # ここで認証が走る
):
    user = _verify_jwt(token)                    # JWT 検証（次項）。失敗なら例外で弾く
    await websocket.accept()                     # 認証を通って初めて受理
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"hello {user.username}: {data}")
    except WebSocketDisconnect:
        ...
```

JWT そのものの検証（`jwt.decode` での `algorithms=[...]` 明示・`exp`/`aud`/`iss` 検証）は HTTP API とまったく同じ勘所です。**トークンの検証ロジックを WebSocket 用に書き直す必要はありません**——HTTP の認証で作った検証関数を再利用してください（[JWT 検証・認可スコープの実装はFastAPI 認証・認可ガイド](/blog/fastapi-authentication-oauth2-jwt-security-scopes-production-guide)に集約しています）。WebSocket 側の責務は「**`accept()` の前に検証を終わらせ、失敗なら `1008` で閉じる**」一点に尽きます。

### 5.3 Origin を検証する（CSWSH 対策）

見落とされがちなのが **Origin 検証**です。WebSocket は HTTP の **同一オリジンポリシーや CORS の保護を受けません**。Cookie ベースの認証をしている場合、悪意あるサイトがユーザーのブラウザから**あなたの WebSocket に勝手に接続**できてしまう——**CSWSH（Cross-Site WebSocket Hijacking）**です。`accept` 前に `Origin` ヘッダを許可リストと突き合わせます。

```python
ALLOWED_ORIGINS = {"https://app.example.com"}   # 環境変数から読む

async def check_origin(websocket: WebSocket) -> None:
    origin = websocket.headers.get("origin")
    # 許可されていないオリジンからの接続は拒否
    if origin not in ALLOWED_ORIGINS:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
```

> **クエリトークンとサブプロトコル、どちらを使うか**：**Bearer トークンを使う API なら、サブプロトコル方式が URL ログ漏れを避けられて無難**です。**Cookie セッションを使うなら、Origin 検証が事実上必須**（CSWSH 対策）。いずれの方式でも、**検証は必ず `accept()` の前**に。`accept()` してから「実は権限が無かった」では、未認証の接続が一瞬でも生きてしまいます。

---

## 6. 水平スケール：in-memory の限界と Redis Pub/Sub

ここが**本番でいちばん効く章**です。第4章の `ConnectionManager` には、見落とすと痛い前提があります——**`active_connections` は『そのプロセスのメモリ』にしか存在しない**。

WebSocket 接続は**特定の1プロセス（1ワーカー／1レプリカ）に張り付きます**。本番ではたいてい、複数ワーカー（`fastapi run --workers`）や複数レプリカ（コンテナ）でスケールします。すると——

- ユーザー A はレプリカ #1 に、ユーザー B はレプリカ #2 に接続している。
- レプリカ #1 で `manager.broadcast(...)` を呼んでも、**#1 に繋がっている接続にしか届かない**。
- **B はメッセージを受け取れない**。

つまり、**in-memory の `ConnectionManager` は単一プロセスでしか正しく動きません**。スケールアウトした瞬間に「一部の人にしか届かない」という再現性の低いバグが出ます。解決策は、**プロセス間でメッセージを配るブローカ**を挟むことです。定番が **Redis Pub/Sub**。

```text
[クライアントA] ── レプリカ#1 ┐                      ┌ レプリカ#1 → [クライアントA]
                              ├─→ Redis (publish) ──┤
[クライアントB] ── レプリカ#2 ┘    channel: "room:42" └ レプリカ#2 → [クライアントB]
```

各レプリカは「自分が持つローカル接続」だけを管理し、**配信はいったん Redis に publish**します。全レプリカが同じチャンネルを **subscribe** しているので、メッセージは**全レプリカに届き**、各レプリカが**自分のローカル接続へ送る**。これで「どのレプリカに繋がっていても全員に届く」が成立します。

```python
import asyncio
import json
from redis.asyncio import Redis

class BroadcastHub:
    """ローカル接続の管理 + Redis Pub/Sub での全レプリカ配信。"""

    def __init__(self, redis: Redis, channel: str = "broadcast") -> None:
        self._redis = redis
        self._channel = channel
        self._local: set[WebSocket] = set()     # このレプリカが持つ接続だけ

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self._local.add(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        self._local.discard(websocket)

    async def publish(self, payload: dict) -> None:
        # 直接ローカルへ送らず、まず Redis に publish（全レプリカへ波及させる）
        await self._redis.publish(self._channel, json.dumps(payload))

    async def _fan_out_to_local(self, payload: dict) -> None:
        # Redis から受けたメッセージを、このレプリカのローカル接続へ配る
        dead = []
        for conn in self._local:
            try:
                await conn.send_json(payload)
            except Exception:
                dead.append(conn)
        for conn in dead:
            self.disconnect(conn)

    async def run_subscriber(self) -> None:
        # 起動時に1つだけ走らせる常駐タスク：Redis を購読し続ける
        pubsub = self._redis.pubsub()
        await pubsub.subscribe(self._channel)
        async for message in pubsub.listen():
            if message["type"] == "message":
                await self._fan_out_to_local(json.loads(message["data"]))
```

購読タスクは、アプリの **`lifespan`** で起動・停止を管理します（接続プールと同じ思想）。

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = Redis.from_url(settings.redis_url)   # URL は環境変数
    hub = BroadcastHub(redis)
    task = asyncio.create_task(hub.run_subscriber())   # 購読を常駐させる
    app.state.hub = hub
    try:
        yield
    finally:
        task.cancel()                            # グレースフルに停止
        await redis.aclose()
```

> **「スケールしない設計」を本番で踏まないために**：開発機（単一プロセス）では in-memory の `ConnectionManager` で完璧に動きます。**だからこそ罠**です——スケールアウトして初めて壊れる。**「複数レプリカで配信が要るか？」を最初に決め**、要るなら最初からブローカ前提で組むのが安全です。逆に「管理画面の進捗表示で、接続は常に少数・単一プロセスで足りる」なら、ブローカは過剰（YAGNI）。放送事業者向けの案件で**進捗配信に Firestore を選んだ**のも、この「**配信のスケールと状態管理をマネージドに寄せる**」判断でした。WebSocket を自前で持つなら、このブローカ層こそが運用の核になります。Redis Pub/Sub を薄くラップした `broadcaster` のようなライブラリも選択肢です。

---

## 7. 本番ハードニング：Origin・サイズ上限・ハートビート・再接続

「動く」WebSocket から「**落ちない・漏れない・詰まらない**」WebSocket への差分です。

### 7.1 メッセージサイズ上限とレート制限

受信メッセージは外部入力です。**サイズ上限**（Pydantic の `max_length` に加え、サーバー/プロキシ層でのフレームサイズ制限）と、**レート制限**（単位時間あたりのメッセージ数）を入れます。1接続が大量のメッセージを送りつける**スロー・ロリス型の枯渇攻撃**を防ぎます。

```python
import time
from collections import deque

class RateLimiter:
    """1接続あたり: 直近 window 秒で max_messages を超えたら True（＝超過）。"""
    def __init__(self, max_messages: int = 20, window: float = 1.0) -> None:
        self._max = max_messages
        self._window = window
        self._timestamps: deque[float] = deque()

    def is_exceeded(self) -> bool:
        now = time.monotonic()
        while self._timestamps and now - self._timestamps[0] > self._window:
            self._timestamps.popleft()         # 古い記録を捨てる（スライディングウィンドウ）
        self._timestamps.append(now)
        return len(self._timestamps) > self._max
```

```python
limiter = RateLimiter()
# ループ内で:
if limiter.is_exceeded():
    await websocket.close(code=1008, reason="rate limit exceeded")
    break
```

### 7.2 ハートビート（ping/pong）とアイドルタイムアウト

TCP 接続は、**相手が突然消えても（電源断・回線断）すぐには切断扱いになりません**。`receive_*` は何も来ないまま延々と待ち続け、**死んだ接続が台帳に居座り続ける**（ゴースト接続）。これを検知するのが**ハートビート**です。WebSocket プロトコルには ping/pong フレームがあり、`websockets` パッケージは**自動 ping** を備えますが、アプリ層でも**アイドルタイムアウト**を設けると堅牢です。

```python
import asyncio

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            try:
                # 一定時間メッセージが無ければタイムアウト → 生存確認・切断判断へ
                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
            except asyncio.TimeoutError:
                await websocket.send_json({"type": "ping"})   # 生きてるか問い合わせ
                continue
            await _handle(websocket, data)
    except WebSocketDisconnect:
        ...
    finally:
        manager.disconnect(websocket)
```

### 7.3 バックプレッシャ：遅い受信者に引きずられない

クライアントの受信が遅い（モバイル回線等）と、サーバーの送信バッファに**未送信メッセージが溜まり**続け、メモリを圧迫します。対策は**送信にもタイムアウトを設け、詰まった接続は切る**こと。「全員に届けようとして1人の遅延に全体が引きずられる」のを避けます（第4章の `gather` による隔離と同じ思想）。確実な順序保証・到達保証が要件なら、それは WebSocket 単体ではなく**メッセージブローカの仕事**です。

### 7.4 クライアント側：指数バックオフで再接続する（UX / a11y）

SSE と違い、**WebSocket は自動再接続しません**。ネットワークが瞬断したら、**クライアントが自分で繋ぎ直す**必要があります。素朴に即時リトライすると、**サーバー復旧の瞬間に全クライアントが殺到**（サンダリングハード）してしまうため、**指数バックオフ＋ジッタ**で再接続します。

```python
# 概念コード（クライアント側のロジック例）。実装言語は問わない。
def next_delay(attempt: int) -> float:
    base = min(2 ** attempt, 30)           # 1, 2, 4, ... 最大30秒で頭打ち
    return base * (0.5 + random.random())  # ジッタで殺到を散らす
```

UX としては、**接続状態（接続中／再接続中／オフライン）を画面に明示**し、再接続中も操作をブロックしすぎないこと。支援技術の利用者にも状態が伝わるよう、状態変化は **`aria-live`** 等で通知すると親切です。リアルタイム性は「繋がっているときの速さ」だけでなく、**「切れたときにどう振る舞うか」**まで含めて設計します。

### 7.5 `wss://`（TLS）は必須

WebSocket 自体は暗号化しません。**平文 `ws://` では、クエリで渡したトークンもメッセージ本文も盗聴され得ます**。本番は **`wss://`（TLS）必須**。ロードバランサ／リバースプロキシ（ALB・Nginx 等）が WebSocket のアップグレード（`Upgrade: websocket` ヘッダの通過）と**長時間接続のアイドルタイムアウト**を正しく許容する設定になっているかも、合わせて確認してください。

---

## 8. テスト：`client.websocket_connect`

検証パスのない本番投入はありません。FastAPI の `TestClient` は **`websocket_connect` をコンテキストマネージャ**として提供し、WebSocket を**同期的・決定的にテスト**できます。

```python
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_echo():
    # with でハンドシェイク〜切断まで面倒を見てくれる
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("hello")
        data = websocket.receive_text()
        assert data == "echo: hello"

def test_json_roundtrip():
    with client.websocket_connect("/ws") as websocket:
        websocket.send_json({"type": "chat", "text": "hi"})
        assert websocket.receive_json() == {"from": 1, "text": "hi"}
```

**境界検証（第3章）と認証（第5章）こそ、テストの主戦場**です。「不正な JSON は弾かれるか」「トークン無しは `1008` で閉じられるか」を明示的に検証します。

```python
import pytest
from fastapi import status
from starlette.websockets import WebSocketDisconnect

def test_invalid_message_does_not_crash():
    with client.websocket_connect("/ws") as websocket:
        websocket.send_json({"type": "unknown"})         # 未知の type
        res = websocket.receive_json()
        assert res["type"] == "error"                    # 接続は維持しつつエラーを返す

def test_missing_token_is_rejected():
    # 認証無しでの接続は、ハンドシェイク段階で拒否される
    with pytest.raises(WebSocketDisconnect) as exc:
        with client.websocket_connect("/ws") as websocket:
            websocket.receive_text()
    assert exc.value.code == status.WS_1008_POLICY_VIOLATION
```

> **何をどこでテストするか**：**メッセージの境界検証・認証の合否・切断時のクリーンアップ**は `TestClient` で決定的に検証できます。一方、**水平スケール（Redis Pub/Sub での全レプリカ配信）**は単体テストでは再現しづらく、**複数プロセスを立てた統合テスト**か、Redis をフェイクに差し替えた `BroadcastHub` 単体の検証で担保します。「単一プロセスでは通るが複数プロセスで壊れる」バグは、ここを意図的にテストしないと**本番で初めて顕在化**します。

---

## 9. 可観測性：接続数・切断理由を測る

WebSocket は**持続接続**ゆえに、HTTP のリクエスト数では実態が見えません。最低限、次を計測します。

- **現在の接続数**（ゲージ）：レプリカごと／チャンネルごと。**ゴースト接続のリーク**は、この値が下がらないことで気づけます。
- **切断理由・クローズコード**（カウンタ）：`1000`（正常）・`1008`（ポリシー違反＝認証失敗）・`1011`（サーバーエラー）の内訳。**`1008` の急増は攻撃や設定ミスのサイン**です。
- **メッセージ流量・レート制限ヒット数**：枯渇攻撃や暴走クライアントの早期検知。
- **ブロードキャスト遅延**：publish から各レプリカが配信するまでの時間。

これらを構造化ログ＋メトリクスで出し、ダッシュボードとアラートに載せます（[ロギング・メトリクス・ヘルスチェックの土台はFastAPI 本番運用ガイド](/blog/fastapi-production-async-pydantic-observability-guide)に集約しています）。**「繋がっているはずなのに届かない」を、本番で再現する前にメトリクスで捉える**——これが WebSocket 運用の生命線です。

---

## 10. まとめ：本番 FastAPI WebSocket チートシート

迷ったときの早見表です。

- **選定**：双方向が要るなら WebSocket。サーバー→クライアントの単方向なら **SSE**、低頻度の更新なら**ポーリング**で十分（KISS / YAGNI）。「双方向か？」「マネージドに寄せられるか？」を最初に問う。
- **最小形**：`@app.websocket("/ws")` → `await websocket.accept()` → `while True:` で `receive_*`/`send_*`。`text`/`bytes`/`json` の3系統。
- **切断**：`receive_*` は切断時に **`WebSocketDisconnect`** を投げる。必ず `try/except` で捕捉し、**台帳からの除去は `finally`** で保証（接続リーク防止）。
- **境界検証**：受信は信用できない外部入力。**`receive_json` + Pydantic（`TypeAdapter`・`Literal` 判別ユニオン・`max_length`）**で検証。不正は「切る／エラー返却で維持」を種類ごとに決める。
- **複数クライアント**：`ConnectionManager`（`active_connections`／`connect`／`disconnect`／`broadcast`）。ブロードキャストは `gather(return_exceptions=True)` で**個別失敗を隔離**し死んだ接続を掃除。
- **認証**：ブラウザは `Authorization` ヘッダを付けられない → **クエリ or サブプロトコル(`Sec-WebSocket-Protocol`)で JWT**を渡し、**`accept()` の前に検証**。失敗は **`WebSocketException(WS_1008_POLICY_VIOLATION)`**。**Origin 検証**で CSWSH を防ぐ。JWT 検証ロジックは HTTP と共通。
- **水平スケール**：in-memory の台帳は**単一プロセス限定**。複数ワーカー/レプリカでは **Redis Pub/Sub** 等のブローカで全プロセスへ配信。購読タスクは `lifespan` で管理。
- **ハードニング**：サイズ上限・レート制限・ハートビート(ping/pong)＋アイドルタイムアウト・バックプレッシャ・**クライアントの指数バックオフ再接続**・接続状態の UI 明示。**`wss://`（TLS）必須**、LB の Upgrade 通過設定。
- **テスト**：`client.websocket_connect` で境界検証・認証(`1008`)・切断クリーンアップを決定的に検証。スケールは複数プロセス統合テスト or フェイク Redis で。
- **可観測性**：接続数（ゲージ）・クローズコード内訳・レート制限ヒット・配信遅延を計測。`1008` 急増は攻撃/設定ミスのサイン。

---

FastAPI は「数行でリアルタイム通信を動かせる」フレームワークですが、本番品質は**接続の一生をどう設計するか**で決まります。**`accept()` の前に認証を終わらせ、受信を境界で検証し、切断を `finally` で必ず後始末し、複数レプリカへの配信をブローカに委ね、切れたときの再接続まで含めて設計する**——どれも派手ではありませんが、この積み重ねが「**繋がっていて・届いて・落ちない**」リアルタイム体験を作ります。

私は放送事業者向けの社内AIプラットフォームで、**FastAPI(async) の長時間AIジョブ（テロップ誤字検出）の進捗追跡を本番で運用**しました。そこではリアルタイム配信を **Firestore** に寄せ、スケールと状態管理をマネージドに任せる判断をしています。**「自前 WebSocket で持つべきか、マネージド配信に寄せるべきか」**——この分岐こそが、リアルタイム設計でいちばん効く意思決定です。生成AI（Claude Code）を相棒に、**一人で速く・安く**作りつつ、検証ゲート（境界検証・認証・スケールの統合テスト）で品質を担保するのが私の進め方です。

**「FastAPI でリアルタイム機能を載せたいが、WebSocket か SSE か、認証や水平スケールをどう設計すべきか」——その判断から実装・テスト・運用まで、一気通貫で伴走します。** 要件整理の段階からでも、お気軽にご相談ください。

---

### 参考（公式ドキュメント）

- [WebSockets（FastAPI）](https://fastapi.tiangolo.com/advanced/websockets/) — `@app.websocket`・`accept`・`receive_*`/`send_*`・`WebSocketDisconnect`・依存・`ConnectionManager`・`websocket_connect` でのテスト
- [WebSockets（Starlette）](https://www.starlette.io/websockets/) — `WebSocket` の API（`accept(subprotocol=...)`・`close(code=, reason=)`・`headers`/`query_params`・`iter_json` 等）
- [FastAPI CLI](https://fastapi.tiangolo.com/fastapi-cli/) — `fastapi run --workers` による本番のワーカー多重化
- [Lifespan Events（FastAPI）](https://fastapi.tiangolo.com/advanced/events/) — `lifespan` での購読タスク・接続プールの起動/停止管理
