「処理の進捗を画面にリアルタイムで出したい」「チャットや通知を即時に届けたい」——要件は素朴です。けれど素朴なリクエストの裏で、接続が切れたまま放置される・全クライアントへの配信が片方のプロセスにしか届かない・誰でも他人のチャネルに繋げてしまうといった事故が、本番で静かに起きます。WebSocket は「双方向で速い」反面、HTTP の常識(ステートレス・リクエスト単位)が通用しない領域だからです。
この記事は、FastAPI で本番品質の WebSocket 双方向リアルタイム通信を実装するためのガイドです。FastAPI 公式の WebSockets チュートリアルを最新の公式仕様に忠実に追いながら、公式が「最小例」として割愛する部分——いつ WebSocket を選ぶか、受信メッセージの境界検証、認証の WebSocket 特有の制約、水平スケール、切断クリーンアップ・ハートビート・再接続——まで踏み込みます。題材として、私が国内大手放送事業者向けに構築した社内AIプラットフォーム(FastAPI 製の長時間AIジョブと進捗配信を運用。テロップ誤字検出を async の長時間ジョブとして回し、その進捗を本番で配信した)での設計判断も交えます。なお同案件で実際に進捗配信に使ったのは Firestore です。本記事では WebSocket を「同一アプリ内でリアルタイム配信する代替手段」として、その判断軸ごと論じます(後述)。
この記事のルール:API・コードは FastAPI 公式ドキュメント(2026年6月時点) に基づきます。FastAPI の WebSocket は内部的に Starlette の
WebSocketを使い、websocketsパッケージに依存します(pip install websockets)。本記事はこの最新版に準拠します。仕様は改定されるため、本番投入前に必ず公式で最新の挙動を確認してください。シークレット(SECRET_KEY・Redis URL・JWT 鍵)は環境変数前提(ハードコード厳禁)。そして WebSocket 自体は通信を暗号化しません——本番はwss://(TLS)が必須です。
0. まず判断:いつ WebSocket を選ぶのか(SSE / ポーリングとの違い)
WebSocket は強力ですが、多くの「リアルタイム要件」は WebSocket を必要としません。最初に立てるべき問いは1つです——サーバーからクライアントへ送るだけか、双方向か。
- ポーリング(定期 fetch):クライアントが一定間隔で HTTP を叩く。実装が最も簡単で、既存の HTTP インフラ(CDN・認証・ロードバランサ)がそのまま効く。更新頻度が低く、数秒の遅延を許せるならこれで十分(KISS)。欠点は無駄なリクエストとサーバー負荷。
- SSE(Server-Sent Events):サーバー→クライアントの一方向ストリーム。通常の HTTP レスポンスを開いたまま流し続ける方式で、ブラウザが自動再接続してくれる。進捗バー・通知・ライブフィードのような「サーバーが押し出すだけ」の用途に最適。HTTP/1.1 では1ドメインあたりの同時接続数に制限がある点に注意(HTTP/2 で緩和)。
- WebSocket:双方向の持続接続。クライアントもサーバーも、任意のタイミングでメッセージを送れる。チャット・共同編集・対戦ゲーム・双方向の制御チャネルに必要。代わりに接続状態を自分で管理し、認証・スケール・切断処理を自前で作り込む責任を負う。
| 観点 | ポーリング | SSE | WebSocket |
|---|---|---|---|
| 通信方向 | クライアント発のみ | サーバー→クライアント(単方向) | 双方向 |
| 遅延 | 間隔ぶんの遅延 | 低い | 最も低い |
| 接続コスト | 都度接続(重い) | 1本を維持 | 1本を維持 |
| 自動再接続 | 不要(都度) | ブラウザが標準で行う | 自前で実装 |
| 認証/LB との相性 | HTTP のまま(容易) | HTTP のまま(容易) | アップグレード必要(要設計) |
| 向く用途 | 低頻度の更新 | 進捗・通知・ライブフィード | チャット・共同編集・ゲーム・制御 |
判断の目安は明快です——「クライアントもサーバーへ送る」必要があるなら WebSocket。サーバーが押し出すだけなら、まず SSE を検討してください。放送事業者向けプラットフォームで私が運用したテロップ誤字検出は、長時間ジョブの進捗を押し出す(実質サーバー→クライアントの単方向)用途でした。だからこそ WebSocket は必須ではなく、当時はFirestore のリアルタイムリスナで配信しています(マネージド・スケール・オフライン耐性が手に入る)。「双方向か?」「マネージドに寄せられるか?」を問わずに WebSocket を選ぶのは、運用負債の自己発注です。本記事は「双方向が要る/自前アプリ内で配信したい」ケースに対し、WebSocket を本番品質で作る道を扱います。
1. 最小の WebSocket エンドポイント
まず公式の最小形から。HTTP の @app.get に対応するのが @app.websocket です。
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# ハンドシェイクを受理する。これを呼ぶまで通信は始まらない。
await websocket.accept()
while True:
# クライアントからの1メッセージを待つ(届くまでここで非同期に待機)
data = await websocket.receive_text()
# そのまま送り返す(エコー)
await websocket.send_text(f"Message text was: {data}")
HTTP との違いが最初の3点です。
async defが前提:WebSocket ハンドラは長時間生き続け、awaitで受信を待ちます。receive_*は届くまでブロックせずに待機する非同期処理です(同期関数では書けません)。await websocket.accept()が必須:クライアントのアップグレード要求を受理して初めて接続が確立します。accept()の前に認証チェックを挟むのが本番の鉄則です(第5章)。while True:ループで生き続ける:HTTP の「1リクエスト1レスポンス」と違い、1つの接続の中で何度もメッセージをやり取りします。ループを抜ける=接続終了です。
要点:
@app.websocketのハンドラは「接続の一生」を表します。accept()で始まり、ループで会話し、切断(次章)で終わる——この3拍子を押さえると、以降の設計がすっと入ります。なおsend_text/receive_textのほか、バイナリ用のsend_bytes/receive_bytes、JSON 用のsend_json/receive_jsonがあります。
2. 受信ループと切断検知:WebSocketDisconnect
最小例の while True: には致命的な欠落があります——クライアントが切断したときの処理です。ブラウザを閉じる・ネットワークが落ちる・タブをリロードする。このとき receive_text() は WebSocketDisconnect 例外を投げます。これを捕まえないと、ループは例外で死に、接続の後始末(リスト除去・リソース解放)が走りません。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"echo: {data}")
except WebSocketDisconnect:
# クライアントが切断した。ここに来たら接続はもう無い。
# ここでクリーンアップ(後述の ConnectionManager からの除去など)を行う。
...
これが WebSocket 設計の第一原則です——切断は例外であり、必ず捕捉する。HTTP なら「レスポンスを返したら終わり」ですが、WebSocket はいつ切れるか分からない持続接続なので、後始末を try/except(さらに後述では finally)に集約します。
なぜ
finallyまで使うのか:切断(WebSocketDisconnect)以外にも、ハンドラ内の予期せぬ例外で抜けることがあります。接続をリストへ登録した以上、どんな経路で抜けても除去されねばなりません。だから第4章では、登録解除をfinallyに置きます。「登録したら、必ず外す」を構造で保証する——これは接続リークを防ぐ要です。
3. JSON とスキーマ検証:境界を守る
ここが公式チュートリアルのその先であり、本番品質の分かれ目です。**WebSocket で受信するメッセージは、HTTP のボディと同じく『信用できない外部入力』**です。receive_text() で受けた文字列を json.loads してそのまま使う——これは SQL インジェクションと同じ油断です。境界で必ず検証します。
FastAPI の HTTP エンドポイントは Pydantic で自動検証してくれますが、WebSocket のメッセージ本文は自動検証の対象外(FastAPI が検証するのは接続時のパス/クエリ/ヘッダ等の依存まで)。だからメッセージごとの検証は自分で Pydantic に通します。
from typing import Literal
from pydantic import BaseModel, ValidationError, Field
# 受信メッセージの「許される形」を型で定義する(判別可能なユニオン)
class ChatMessage(BaseModel):
type: Literal["chat"]
text: str = Field(min_length=1, max_length=2000) # 長さ上限=防御
class PingMessage(BaseModel):
type: Literal["ping"]
# type で分岐できるよう discriminated union にする
IncomingMessage = ChatMessage | PingMessage
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
raw = await websocket.receive_json() # まずは dict として受ける
try:
# ここが境界検証。未知の type・型違い・過大長はここで弾かれる。
message = _parse_incoming(raw)
except ValidationError:
# 不正な入力で接続ごと殺さず、エラーを返して接続は維持する判断もある
await websocket.send_json({"type": "error", "detail": "invalid message"})
continue
await _handle(websocket, message)
except WebSocketDisconnect:
...
from pydantic import TypeAdapter
# TypeAdapter はユニオン型をそのまま検証できる(モデルでなくてもよい)
_incoming_adapter: TypeAdapter[IncomingMessage] = TypeAdapter(IncomingMessage)
def _parse_incoming(raw: object) -> IncomingMessage:
# 外部入力 → 型の確定した内部表現へ。以降のコードは型を信頼できる。
return _incoming_adapter.validate_python(raw)
このひと手間が、「未知の type を受けてサーバーが落ちる」「巨大文字列でメモリを食い潰される」「想定外のフィールドで分岐がすり抜ける」を構造的に防ぎます。境界で型を確定させれば、以降のハンドラは型を信頼できます(入力境界での検証の考え方はリクエストバリデーションのガイドと同じ思想です)。
不正入力で接続を切るか、維持するか:これは設計判断です。明らかな攻撃(巨大ペイロード・連続する不正)なら
await websocket.close(code=1008)で切断が妥当。一過性のクライアントバグなら、上記のようにエラーを返して接続は維持するほうが UX を損ねません。「どちらを選ぶか」を、メッセージの種類ごとに決めておきます。
4. 複数クライアントと ConnectionManager
1対1のエコーから、1対多のブロードキャスト(チャット・通知・進捗の一斉配信)へ進みます。鍵は**「今つながっている全接続をどこかで保持する」**こと。公式が示すのが ConnectionManager パターンです。
from fastapi import WebSocket
class ConnectionManager:
def __init__(self) -> None:
# 現在アクティブな接続の集合。プロセスのメモリ上に持つ(←この前提が第6章の論点)。
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept() # 受理してから
self.active_connections.append(websocket) # 台帳に載せる
def disconnect(self, websocket: WebSocket) -> None:
self.active_connections.remove(websocket) # 台帳から外す
async def send_personal_message(self, message: str, websocket: WebSocket) -> None:
await websocket.send_text(message) # 特定の1接続へ
async def broadcast(self, message: str) -> None:
for connection in self.active_connections:
await connection.send_text(message) # 全接続へ
manager = ConnectionManager()
エンドポイントはこの台帳を使うだけになり、切断時の除去を確実に行います。
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# 自分以外も含む全員へ配信(部屋・チャンネル単位にするなら台帳を分ける)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
4.1 本番補強:除去を finally で保証し、配信エラーを隔離する
公式の最小形には、本番で効く2つの穴があります。(1) 切断以外の例外でも台帳から外れる保証と、(2) 1接続への送信失敗が全体のブロードキャストを巻き込まないこと。
import asyncio
import logging
logger = logging.getLogger("ws")
class ConnectionManager:
def __init__(self) -> None:
self.active_connections: set[WebSocket] = set() # 重複登録を避けるなら set
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept()
self.active_connections.add(websocket)
def disconnect(self, websocket: WebSocket) -> None:
# discard は「居なくても例外を投げない」。二重切断でも安全(冪等)。
self.active_connections.discard(websocket)
async def broadcast(self, payload: dict) -> None:
# 送信中に1つでも死んでいる接続があると例外で全体が止まる。
# gather(return_exceptions=True) で個別の失敗を隔離する。
dead: list[WebSocket] = []
results = await asyncio.gather(
*(conn.send_json(payload) for conn in self.active_connections),
return_exceptions=True,
)
for conn, result in zip(self.active_connections, results):
if isinstance(result, Exception):
dead.append(conn) # 送れなかった接続は死んでいる
for conn in dead:
self.disconnect(conn) # まとめて掃除(クリーンアップ)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
raw = await websocket.receive_json()
message = _parse_incoming(raw) # 第3章の境界検証
await manager.broadcast({"from": client_id, "text": message.text})
except WebSocketDisconnect:
pass # 正常な切断
finally:
# どんな経路で抜けても必ず台帳から外す(接続リーク防止)
manager.disconnect(websocket)
配信は「ベストエフォート」と割り切る:ブロードキャストの途中で1接続が死んでいるのは正常な状態です(クライアントは予告なく消える)。
return_exceptions=Trueで個別の失敗を全体から隔離し、死んだ接続を掃除する——これで「1人の切断が全員への配信を巻き込む」事故を防ぎます。確実な到達保証が要るなら、それは WebSocket ではなく永続化+再送(メッセージキュー)の領域です(YAGNI を見極める)。
5. 認証:WebSocket 特有の制約
ここが WebSocket でいちばん事故りやすい領域です。「誰でも /ws に繋げてしまう」——認証を accept() の前に挟まないと、これが起きます。
5.1 ブラウザの制約:Authorization ヘッダを自由に付けられない
HTTP API なら Authorization: Bearer <token> でトークンを渡すのが定石です。ところがブラウザの WebSocket API(new WebSocket(url))は、リクエストヘッダを任意に設定できません。これが WebSocket 認証の根本的な制約です。そこで実務では次のいずれかでトークンを渡します。
- クエリパラメータ:
wss://api.example.com/ws?token=<jwt>。最も簡単。ただしURL はアクセスログ・プロキシ・ブラウザ履歴に残り得るので、短命トークンを使い、ログに URL を残さない設定とセットで使う。 - サブプロトコル(
Sec-WebSocket-Protocol):new WebSocket(url, ["bearer", token])のようにサブプロトコル名としてトークンを渡す。ヘッダには載るがクエリには出ないため、URL ログ漏れを避けられるのが利点。サーバーはaccept(subprotocol=...)で選んだプロトコルを返す。 - 接続後の最初のメッセージで認証:
accept()後の1通目で{"type": "auth", "token": ...}を送らせ、検証する。柔軟だが、未認証で接続が一瞬でも開くため、認証完了までは何もブロードキャストしない規律が要る。
5.2 依存(Depends)で accept 前に検証する
FastAPI の WebSocket は Depends・Query・Cookie・Header を HTTP と同じく使えます。認証失敗時は HTTPException ではなく WebSocketException を投げ、**WebSocket のクローズコード 1008(Policy Violation)**を返します。
from typing import Annotated
from fastapi import (
FastAPI, WebSocket, WebSocketException, WebSocketDisconnect,
Cookie, Query, Depends, status,
)
app = FastAPI()
async def get_token(
websocket: WebSocket,
# ブラウザの制約上、ヘッダは使いにくい → Cookie かクエリで受ける
session: Annotated[str | None, Cookie()] = None,
token: Annotated[str | None, Query()] = None,
) -> str:
candidate = session or token
if candidate is None:
# 認証情報が無い → ポリシー違反として接続を拒否(accept 自体が行われない)
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
return candidate
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: Annotated[str, Depends(get_token)], # ここで認証が走る
):
user = _verify_jwt(token) # JWT 検証(次項)。失敗なら例外で弾く
await websocket.accept() # 認証を通って初めて受理
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"hello {user.username}: {data}")
except WebSocketDisconnect:
...
JWT そのものの検証(jwt.decode での algorithms=[...] 明示・exp/aud/iss 検証)は HTTP API とまったく同じ勘所です。トークンの検証ロジックを WebSocket 用に書き直す必要はありません——HTTP の認証で作った検証関数を再利用してください(JWT 検証・認可スコープの実装はFastAPI 認証・認可ガイドに集約しています)。WebSocket 側の責務は「accept() の前に検証を終わらせ、失敗なら 1008 で閉じる」一点に尽きます。
5.3 Origin を検証する(CSWSH 対策)
見落とされがちなのが Origin 検証です。WebSocket は HTTP の 同一オリジンポリシーや CORS の保護を受けません。Cookie ベースの認証をしている場合、悪意あるサイトがユーザーのブラウザからあなたの WebSocket に勝手に接続できてしまう——**CSWSH(Cross-Site WebSocket Hijacking)**です。accept 前に Origin ヘッダを許可リストと突き合わせます。
ALLOWED_ORIGINS = {"https://app.example.com"} # 環境変数から読む
async def check_origin(websocket: WebSocket) -> None:
origin = websocket.headers.get("origin")
# 許可されていないオリジンからの接続は拒否
if origin not in ALLOWED_ORIGINS:
raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
クエリトークンとサブプロトコル、どちらを使うか:Bearer トークンを使う API なら、サブプロトコル方式が URL ログ漏れを避けられて無難です。Cookie セッションを使うなら、Origin 検証が事実上必須(CSWSH 対策)。いずれの方式でも、検証は必ず
accept()の前に。accept()してから「実は権限が無かった」では、未認証の接続が一瞬でも生きてしまいます。
6. 水平スケール:in-memory の限界と Redis Pub/Sub
ここが本番でいちばん効く章です。第4章の ConnectionManager には、見落とすと痛い前提があります——active_connections は『そのプロセスのメモリ』にしか存在しない。
WebSocket 接続は特定の1プロセス(1ワーカー/1レプリカ)に張り付きます。本番ではたいてい、複数ワーカー(fastapi run --workers)や複数レプリカ(コンテナ)でスケールします。すると——
- ユーザー A はレプリカ #1 に、ユーザー B はレプリカ #2 に接続している。
- レプリカ #1 で
manager.broadcast(...)を呼んでも、#1 に繋がっている接続にしか届かない。 - B はメッセージを受け取れない。
つまり、in-memory の ConnectionManager は単一プロセスでしか正しく動きません。スケールアウトした瞬間に「一部の人にしか届かない」という再現性の低いバグが出ます。解決策は、プロセス間でメッセージを配るブローカを挟むことです。定番が Redis Pub/Sub。
[クライアントA] ── レプリカ#1 ┐ ┌ レプリカ#1 → [クライアントA]
├─→ Redis (publish) ──┤
[クライアントB] ── レプリカ#2 ┘ channel: "room:42" └ レプリカ#2 → [クライアントB]
各レプリカは「自分が持つローカル接続」だけを管理し、配信はいったん Redis に publishします。全レプリカが同じチャンネルを subscribe しているので、メッセージは全レプリカに届き、各レプリカが自分のローカル接続へ送る。これで「どのレプリカに繋がっていても全員に届く」が成立します。
import asyncio
import json
from redis.asyncio import Redis
class BroadcastHub:
"""ローカル接続の管理 + Redis Pub/Sub での全レプリカ配信。"""
def __init__(self, redis: Redis, channel: str = "broadcast") -> None:
self._redis = redis
self._channel = channel
self._local: set[WebSocket] = set() # このレプリカが持つ接続だけ
async def connect(self, websocket: WebSocket) -> None:
await websocket.accept()
self._local.add(websocket)
def disconnect(self, websocket: WebSocket) -> None:
self._local.discard(websocket)
async def publish(self, payload: dict) -> None:
# 直接ローカルへ送らず、まず Redis に publish(全レプリカへ波及させる)
await self._redis.publish(self._channel, json.dumps(payload))
async def _fan_out_to_local(self, payload: dict) -> None:
# Redis から受けたメッセージを、このレプリカのローカル接続へ配る
dead = []
for conn in self._local:
try:
await conn.send_json(payload)
except Exception:
dead.append(conn)
for conn in dead:
self.disconnect(conn)
async def run_subscriber(self) -> None:
# 起動時に1つだけ走らせる常駐タスク:Redis を購読し続ける
pubsub = self._redis.pubsub()
await pubsub.subscribe(self._channel)
async for message in pubsub.listen():
if message["type"] == "message":
await self._fan_out_to_local(json.loads(message["data"]))
購読タスクは、アプリの lifespan で起動・停止を管理します(接続プールと同じ思想)。
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
redis = Redis.from_url(settings.redis_url) # URL は環境変数
hub = BroadcastHub(redis)
task = asyncio.create_task(hub.run_subscriber()) # 購読を常駐させる
app.state.hub = hub
try:
yield
finally:
task.cancel() # グレースフルに停止
await redis.aclose()
「スケールしない設計」を本番で踏まないために:開発機(単一プロセス)では in-memory の
ConnectionManagerで完璧に動きます。だからこそ罠です——スケールアウトして初めて壊れる。「複数レプリカで配信が要るか?」を最初に決め、要るなら最初からブローカ前提で組むのが安全です。逆に「管理画面の進捗表示で、接続は常に少数・単一プロセスで足りる」なら、ブローカは過剰(YAGNI)。放送事業者向けの案件で進捗配信に Firestore を選んだのも、この「配信のスケールと状態管理をマネージドに寄せる」判断でした。WebSocket を自前で持つなら、このブローカ層こそが運用の核になります。Redis Pub/Sub を薄くラップしたbroadcasterのようなライブラリも選択肢です。
7. 本番ハードニング:Origin・サイズ上限・ハートビート・再接続
「動く」WebSocket から「落ちない・漏れない・詰まらない」WebSocket への差分です。
7.1 メッセージサイズ上限とレート制限
受信メッセージは外部入力です。サイズ上限(Pydantic の max_length に加え、サーバー/プロキシ層でのフレームサイズ制限)と、レート制限(単位時間あたりのメッセージ数)を入れます。1接続が大量のメッセージを送りつけるスロー・ロリス型の枯渇攻撃を防ぎます。
import time
from collections import deque
class RateLimiter:
"""1接続あたり: 直近 window 秒で max_messages を超えたら True(=超過)。"""
def __init__(self, max_messages: int = 20, window: float = 1.0) -> None:
self._max = max_messages
self._window = window
self._timestamps: deque[float] = deque()
def is_exceeded(self) -> bool:
now = time.monotonic()
while self._timestamps and now - self._timestamps[0] > self._window:
self._timestamps.popleft() # 古い記録を捨てる(スライディングウィンドウ)
self._timestamps.append(now)
return len(self._timestamps) > self._max
limiter = RateLimiter()
# ループ内で:
if limiter.is_exceeded():
await websocket.close(code=1008, reason="rate limit exceeded")
break
7.2 ハートビート(ping/pong)とアイドルタイムアウト
TCP 接続は、相手が突然消えても(電源断・回線断)すぐには切断扱いになりません。receive_* は何も来ないまま延々と待ち続け、死んだ接続が台帳に居座り続ける(ゴースト接続)。これを検知するのがハートビートです。WebSocket プロトコルには ping/pong フレームがあり、websockets パッケージは自動 ping を備えますが、アプリ層でもアイドルタイムアウトを設けると堅牢です。
import asyncio
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
try:
# 一定時間メッセージが無ければタイムアウト → 生存確認・切断判断へ
data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
except asyncio.TimeoutError:
await websocket.send_json({"type": "ping"}) # 生きてるか問い合わせ
continue
await _handle(websocket, data)
except WebSocketDisconnect:
...
finally:
manager.disconnect(websocket)
7.3 バックプレッシャ:遅い受信者に引きずられない
クライアントの受信が遅い(モバイル回線等)と、サーバーの送信バッファに未送信メッセージが溜まり続け、メモリを圧迫します。対策は送信にもタイムアウトを設け、詰まった接続は切ること。「全員に届けようとして1人の遅延に全体が引きずられる」のを避けます(第4章の gather による隔離と同じ思想)。確実な順序保証・到達保証が要件なら、それは WebSocket 単体ではなくメッセージブローカの仕事です。
7.4 クライアント側:指数バックオフで再接続する(UX / a11y)
SSE と違い、WebSocket は自動再接続しません。ネットワークが瞬断したら、クライアントが自分で繋ぎ直す必要があります。素朴に即時リトライすると、サーバー復旧の瞬間に全クライアントが殺到(サンダリングハード)してしまうため、指数バックオフ+ジッタで再接続します。
# 概念コード(クライアント側のロジック例)。実装言語は問わない。
def next_delay(attempt: int) -> float:
base = min(2 ** attempt, 30) # 1, 2, 4, ... 最大30秒で頭打ち
return base * (0.5 + random.random()) # ジッタで殺到を散らす
UX としては、接続状態(接続中/再接続中/オフライン)を画面に明示し、再接続中も操作をブロックしすぎないこと。支援技術の利用者にも状態が伝わるよう、状態変化は aria-live 等で通知すると親切です。リアルタイム性は「繋がっているときの速さ」だけでなく、**「切れたときにどう振る舞うか」**まで含めて設計します。
7.5 wss://(TLS)は必須
WebSocket 自体は暗号化しません。平文 ws:// では、クエリで渡したトークンもメッセージ本文も盗聴され得ます。本番は wss://(TLS)必須。ロードバランサ/リバースプロキシ(ALB・Nginx 等)が WebSocket のアップグレード(Upgrade: websocket ヘッダの通過)と長時間接続のアイドルタイムアウトを正しく許容する設定になっているかも、合わせて確認してください。
8. テスト:client.websocket_connect
検証パスのない本番投入はありません。FastAPI の TestClient は websocket_connect をコンテキストマネージャとして提供し、WebSocket を同期的・決定的にテストできます。
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_echo():
# with でハンドシェイク〜切断まで面倒を見てくれる
with client.websocket_connect("/ws") as websocket:
websocket.send_text("hello")
data = websocket.receive_text()
assert data == "echo: hello"
def test_json_roundtrip():
with client.websocket_connect("/ws") as websocket:
websocket.send_json({"type": "chat", "text": "hi"})
assert websocket.receive_json() == {"from": 1, "text": "hi"}
境界検証(第3章)と認証(第5章)こそ、テストの主戦場です。「不正な JSON は弾かれるか」「トークン無しは 1008 で閉じられるか」を明示的に検証します。
import pytest
from fastapi import status
from starlette.websockets import WebSocketDisconnect
def test_invalid_message_does_not_crash():
with client.websocket_connect("/ws") as websocket:
websocket.send_json({"type": "unknown"}) # 未知の type
res = websocket.receive_json()
assert res["type"] == "error" # 接続は維持しつつエラーを返す
def test_missing_token_is_rejected():
# 認証無しでの接続は、ハンドシェイク段階で拒否される
with pytest.raises(WebSocketDisconnect) as exc:
with client.websocket_connect("/ws") as websocket:
websocket.receive_text()
assert exc.value.code == status.WS_1008_POLICY_VIOLATION
何をどこでテストするか:メッセージの境界検証・認証の合否・切断時のクリーンアップは
TestClientで決定的に検証できます。一方、水平スケール(Redis Pub/Sub での全レプリカ配信)は単体テストでは再現しづらく、複数プロセスを立てた統合テストか、Redis をフェイクに差し替えたBroadcastHub単体の検証で担保します。「単一プロセスでは通るが複数プロセスで壊れる」バグは、ここを意図的にテストしないと本番で初めて顕在化します。
9. 可観測性:接続数・切断理由を測る
WebSocket は持続接続ゆえに、HTTP のリクエスト数では実態が見えません。最低限、次を計測します。
- 現在の接続数(ゲージ):レプリカごと/チャンネルごと。ゴースト接続のリークは、この値が下がらないことで気づけます。
- 切断理由・クローズコード(カウンタ):
1000(正常)・1008(ポリシー違反=認証失敗)・1011(サーバーエラー)の内訳。1008の急増は攻撃や設定ミスのサインです。 - メッセージ流量・レート制限ヒット数:枯渇攻撃や暴走クライアントの早期検知。
- ブロードキャスト遅延:publish から各レプリカが配信するまでの時間。
これらを構造化ログ+メトリクスで出し、ダッシュボードとアラートに載せます(ロギング・メトリクス・ヘルスチェックの土台はFastAPI 本番運用ガイドに集約しています)。「繋がっているはずなのに届かない」を、本番で再現する前にメトリクスで捉える——これが WebSocket 運用の生命線です。
10. まとめ:本番 FastAPI WebSocket チートシート
迷ったときの早見表です。
- 選定:双方向が要るなら WebSocket。サーバー→クライアントの単方向なら SSE、低頻度の更新ならポーリングで十分(KISS / YAGNI)。「双方向か?」「マネージドに寄せられるか?」を最初に問う。
- 最小形:
@app.websocket("/ws")→await websocket.accept()→while True:でreceive_*/send_*。text/bytes/jsonの3系統。 - 切断:
receive_*は切断時にWebSocketDisconnectを投げる。必ずtry/exceptで捕捉し、台帳からの除去はfinallyで保証(接続リーク防止)。 - 境界検証:受信は信用できない外部入力。**
receive_json+ Pydantic(TypeAdapter・Literal判別ユニオン・max_length)**で検証。不正は「切る/エラー返却で維持」を種類ごとに決める。 - 複数クライアント:
ConnectionManager(active_connections/connect/disconnect/broadcast)。ブロードキャストはgather(return_exceptions=True)で個別失敗を隔離し死んだ接続を掃除。 - 認証:ブラウザは
Authorizationヘッダを付けられない → クエリ or サブプロトコル(Sec-WebSocket-Protocol)で JWTを渡し、accept()の前に検証。失敗はWebSocketException(WS_1008_POLICY_VIOLATION)。Origin 検証で CSWSH を防ぐ。JWT 検証ロジックは HTTP と共通。 - 水平スケール:in-memory の台帳は単一プロセス限定。複数ワーカー/レプリカでは Redis Pub/Sub 等のブローカで全プロセスへ配信。購読タスクは
lifespanで管理。 - ハードニング:サイズ上限・レート制限・ハートビート(ping/pong)+アイドルタイムアウト・バックプレッシャ・クライアントの指数バックオフ再接続・接続状態の UI 明示。
wss://(TLS)必須、LB の Upgrade 通過設定。 - テスト:
client.websocket_connectで境界検証・認証(1008)・切断クリーンアップを決定的に検証。スケールは複数プロセス統合テスト or フェイク Redis で。 - 可観測性:接続数(ゲージ)・クローズコード内訳・レート制限ヒット・配信遅延を計測。
1008急増は攻撃/設定ミスのサイン。
FastAPI は「数行でリアルタイム通信を動かせる」フレームワークですが、本番品質は接続の一生をどう設計するかで決まります。accept() の前に認証を終わらせ、受信を境界で検証し、切断を finally で必ず後始末し、複数レプリカへの配信をブローカに委ね、切れたときの再接続まで含めて設計する——どれも派手ではありませんが、この積み重ねが「繋がっていて・届いて・落ちない」リアルタイム体験を作ります。
私は放送事業者向けの社内AIプラットフォームで、FastAPI(async) の長時間AIジョブ(テロップ誤字検出)の進捗追跡を本番で運用しました。そこではリアルタイム配信を Firestore に寄せ、スケールと状態管理をマネージドに任せる判断をしています。「自前 WebSocket で持つべきか、マネージド配信に寄せるべきか」——この分岐こそが、リアルタイム設計でいちばん効く意思決定です。生成AI(Claude Code)を相棒に、一人で速く・安く作りつつ、検証ゲート(境界検証・認証・スケールの統合テスト)で品質を担保するのが私の進め方です。
「FastAPI でリアルタイム機能を載せたいが、WebSocket か SSE か、認証や水平スケールをどう設計すべきか」——その判断から実装・テスト・運用まで、一気通貫で伴走します。 要件整理の段階からでも、お気軽にご相談ください。
参考(公式ドキュメント)
- WebSockets(FastAPI) —
@app.websocket・accept・receive_*/send_*・WebSocketDisconnect・依存・ConnectionManager・websocket_connectでのテスト - WebSockets(Starlette) —
WebSocketの API(accept(subprotocol=...)・close(code=, reason=)・headers/query_params・iter_json等) - FastAPI CLI —
fastapi run --workersによる本番のワーカー多重化 - Lifespan Events(FastAPI) —
lifespanでの購読タスク・接続プールの起動/停止管理