# FastAPI WebSocket Production Guide: Building Bidirectional Realtime Comms with Connection Management, Auth, and Horizontal Scaling

> A guide to implementing bidirectional realtime communication over WebSockets in FastAPI at production quality. Faithful to the latest official docs—@app.websocket, accept, receive/send, WebSocketDisconnect—plus connection management and broadcast with ConnectionManager, boundary validation with Pydantic, JWT auth via query/subprotocol, horizontal scaling with Redis Pub/Sub, and heartbeats, reconnection, and observability, all in real code.

- Published: 2026-06-26
- Author: 友田 陽大
- Tags: Python, FastAPI, WebSocket, リアルタイム, 可観測性
- URL: https://tomodahinata.com/en/blog/fastapi-websockets-realtime-production-guide
- Category: Python backend
- Pillar guide: https://tomodahinata.com/en/blog/fastapi-production-async-pydantic-observability-guide

## Key points

- WebSocket is the choice when you need 'bidirectional, low-latency, persistent connections.' If you only need one-way server→client notifications, SSE is enough; for occasional updates, polling. Choose by decision table
- The minimal form is @app.websocket('/ws') → await websocket.accept() → receive/send in while True. Detect disconnects with the WebSocketDisconnect exception and clean up in finally
- Incoming messages are untrusted external input. Validate at the boundary with receive_json + Pydantic, and reject unknown types and oversized payloads. Bundle multiple clients with a ConnectionManager (active_connections/connect/disconnect/broadcast)
- Browser WebSockets cannot freely set the Authorization header. Pass the JWT via a query parameter or subprotocol (Sec-WebSocket-Protocol) and validate before accept. On failure, WebSocketException(WS_1008_POLICY_VIOLATION). Validate Origin too
- An in-memory ConnectionManager is single-process only. With multiple workers/replicas you need a broker like Redis Pub/Sub to deliver to every process. Production work means building in size limits, rate limiting, heartbeats, exponential-backoff reconnection, and connection-count metrics

---

"I want to show processing progress on screen in real time." "I want to deliver chat or notifications instantly." The requirements sound simple. But behind those simple requests, accidents quietly happen in production: **connections left dangling after a disconnect; broadcasts to all clients reaching only one process's clients; anyone able to connect to someone else's channel.** WebSocket is "bidirectional and fast," but in exchange it operates in a realm where **the common sense of HTTP (stateless, per-request) no longer applies.**

This article is a guide to implementing **production-quality bidirectional realtime communication over WebSockets** in FastAPI. While following FastAPI's official WebSockets tutorial **faithfully to the latest official spec**, it digs into the parts the official "minimal example" omits—**when to choose WebSocket, boundary validation of incoming messages, the WebSocket-specific constraints of authentication, horizontal scaling, and disconnect cleanup, heartbeats, and reconnection.** As source material, I'll weave in design decisions from the in-house AI platform I built for a major Japanese broadcaster ([operating long-running AI jobs and progress delivery built on FastAPI](/case-studies/broadcaster-ai-content-platform); running caption typo detection as long-running `async` jobs and delivering their progress in production). Note that **what I actually used for progress delivery in that project was Firestore.** Here I discuss WebSocket as "**an alternative way to deliver realtime within the same app**," including the criteria for that decision (covered later).

> **The rules of this article**: APIs and code are based on the **FastAPI official documentation (as of June 2026)**. FastAPI's WebSocket internally uses Starlette's `WebSocket` and depends on the `websockets` package (`pip install websockets`). This article conforms to that latest version. Specs get revised, so always confirm the latest behavior in the official docs before going to production. **Secrets (SECRET_KEY, Redis URL, JWT keys) are assumed to be in environment variables** (never hardcode). And **WebSocket itself does not encrypt the connection—`wss://` (TLS) is mandatory in production.**

---

## 0. First, the decision: when to choose WebSocket (vs. SSE / polling)

WebSocket is powerful, but **many "realtime requirements" don't need WebSocket at all.** The first question to ask is just one: **is it server→client only, or bidirectional?**

- **Polling (periodic fetch)**: the client hits HTTP at a fixed interval. Easiest to implement, and your existing HTTP infrastructure (CDN, auth, load balancer) works as-is. If updates are infrequent and you can tolerate a few seconds of delay, **this is enough** (KISS). The downside is wasted requests and server load.
- **SSE (Server-Sent Events)**: a **one-way** server→client stream. It keeps a normal HTTP response open and streams over it, and **the browser reconnects automatically**. Ideal for "the server just pushes" use cases like progress bars, notifications, and live feeds. Note that under HTTP/1.1 there's a **limit on simultaneous connections per domain** (relaxed in HTTP/2).
- **WebSocket**: a **bidirectional** persistent connection. Both client and server can send messages at any time. Needed for chat, collaborative editing, multiplayer games, and bidirectional control channels. In exchange, you take on the responsibility of **managing the connection state yourself** and building auth, scaling, and disconnect handling on your own.

| Aspect | Polling | SSE | WebSocket |
| --- | --- | --- | --- |
| Direction | Client-initiated only | Server→client (one-way) | **Bidirectional** |
| Latency | Delay of the interval | Low | **Lowest** |
| Connection cost | Connect each time (heavy) | Hold one | Hold one |
| Auto reconnect | Not needed (each time) | **Browser does it by default** | **Implement yourself** |
| Fit with auth/LB | Stays HTTP (easy) | Stays HTTP (easy) | Needs upgrade (design required) |
| Good for | Low-frequency updates | Progress, notifications, live feeds | Chat, collaborative editing, games, control |

The rule of thumb is clear: **if "the client also sends to the server" is required, use WebSocket. If the server just pushes, consider SSE first.** The caption typo detection I operated on the broadcaster platform was a use case of **pushing the progress of long-running jobs (effectively one-way server→client)**. That's exactly why WebSocket wasn't mandatory, and at the time I **delivered via Firestore's realtime listeners** (gaining managed operation, scale, and offline resilience). **Choosing WebSocket without asking "is it bidirectional?" and "can it be offloaded to a managed service?" is self-inflicted operational debt.** This article covers building WebSocket **at production quality** for cases where "**bidirectional is required, or I want to deliver within my own app.**"

---

## 1. The minimal WebSocket endpoint

Start with the official minimal form. The WebSocket counterpart to HTTP's `@app.get` is **`@app.websocket`**.

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # ハンドシェイクを受理する。これを呼ぶまで通信は始まらない。
    await websocket.accept()
    while True:
        # クライアントからの1メッセージを待つ（届くまでここで非同期に待機）
        data = await websocket.receive_text()
        # そのまま送り返す（エコー）
        await websocket.send_text(f"Message text was: {data}")
```

The first three differences from HTTP:

- **`async def` is required**: a WebSocket handler stays alive for a long time and waits for messages with `await`. `receive_*` is an async operation that **waits without blocking until a message arrives** (you can't write it as a sync function).
- **`await websocket.accept()` is mandatory**: the connection is established only after you **accept** the client's upgrade request. The production rule is to **insert an authentication check before `accept()`** (Section 5).
- **It stays alive in the `while True:` loop**: unlike HTTP's "one request, one response," you exchange messages many times within a single connection. Exiting the loop = ending the connection.

> **Key point**: a `@app.websocket` handler represents "the lifetime of a connection." It begins with `accept()`, converses in the loop, and ends with disconnect (next chapter)—once you grasp this three-beat rhythm, everything that follows falls into place. In addition to `send_text`/`receive_text`, there are `send_bytes`/`receive_bytes` for binary and `send_json`/`receive_json` for JSON.

---

## 2. The receive loop and disconnect detection: `WebSocketDisconnect`

The minimal example's `while True:` has a **fatal omission**—**handling when the client disconnects.** Closing the browser, the network dropping, reloading the tab. At that point `receive_text()` raises a **`WebSocketDisconnect` exception**. If you don't catch it, the loop dies on the exception and **the connection cleanup (list removal, resource release) never runs.**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"echo: {data}")
    except WebSocketDisconnect:
        # クライアントが切断した。ここに来たら接続はもう無い。
        # ここでクリーンアップ（後述の ConnectionManager からの除去など）を行う。
        ...
```

This is the **first principle** of WebSocket design: **a disconnect is an exception, and you must always catch it.** With HTTP, "return a response and you're done"; but WebSocket is **a persistent connection that may break at any moment**, so you concentrate the cleanup in `try/except` (and, as shown later, `finally`).

> **Why even use `finally`**: besides a disconnect (`WebSocketDisconnect`), you can also exit via an unexpected exception inside the handler. **Once you've registered a connection in a list, it must be removed no matter which path you exit through.** That's why in Section 4 we place the deregistration in `finally`. Guaranteeing "if you register, you always unregister" structurally is the key to preventing connection leaks.

---

## 3. JSON and schema validation: guard the boundary

This is **beyond** the official tutorial, and the dividing line of production quality. **A message received over WebSocket is, like an HTTP body, "untrusted external input."** `json.loads`-ing the string you got from `receive_text()` and using it as-is is the same carelessness as SQL injection. **Always validate at the boundary.**

FastAPI's HTTP endpoints validate automatically with Pydantic, but **the body of a WebSocket message is outside the scope of automatic validation** (FastAPI validates only the path/query/header dependencies at connection time). So **you run per-message validation through Pydantic yourself.**

```python
from typing import Literal
from pydantic import BaseModel, ValidationError, Field

# 受信メッセージの「許される形」を型で定義する（判別可能なユニオン）
class ChatMessage(BaseModel):
    type: Literal["chat"]
    text: str = Field(min_length=1, max_length=2000)   # 長さ上限＝防御

class PingMessage(BaseModel):
    type: Literal["ping"]

# type で分岐できるよう discriminated union にする
IncomingMessage = ChatMessage | PingMessage

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_json()    # まずは dict として受ける
            try:
                # ここが境界検証。未知の type・型違い・過大長はここで弾かれる。
                message = _parse_incoming(raw)
            except ValidationError:
                # 不正な入力で接続ごと殺さず、エラーを返して接続は維持する判断もある
                await websocket.send_json({"type": "error", "detail": "invalid message"})
                continue
            await _handle(websocket, message)
    except WebSocketDisconnect:
        ...
```

```python
from pydantic import TypeAdapter

# TypeAdapter はユニオン型をそのまま検証できる（モデルでなくてもよい）
_incoming_adapter: TypeAdapter[IncomingMessage] = TypeAdapter(IncomingMessage)

def _parse_incoming(raw: object) -> IncomingMessage:
    # 外部入力 → 型の確定した内部表現へ。以降のコードは型を信頼できる。
    return _incoming_adapter.validate_python(raw)
```

This little bit of effort structurally prevents **"the server crashes on an unknown `type`," "memory is exhausted by a huge string," and "a branch is bypassed by an unexpected field."** Once you fix the type at the boundary, the downstream handlers can **trust the type** ([the thinking behind validation at the input boundary is the same as in the request-validation guide](/blog/fastapi-request-validation-query-path-body-parameters-guide)).

> **Disconnect on invalid input, or keep the connection?**: this is a design decision. For **a clear attack** (a huge payload, a stream of malformed input), `await websocket.close(code=1008)` to **disconnect** is reasonable. For **a transient client bug**, returning an error and **keeping the connection** (as above) is less disruptive to UX. Decide "which to choose" per message type in advance.

---

## 4. Multiple clients and `ConnectionManager`

From a 1-to-1 echo, we advance to a **1-to-many broadcast** (a fan-out of chat, notifications, progress). The key is to **hold all currently connected connections somewhere.** What the official docs show is the **`ConnectionManager`** pattern.

```python
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self) -> None:
        # 現在アクティブな接続の集合。プロセスのメモリ上に持つ（←この前提が第6章の論点）。
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()                 # 受理してから
        self.active_connections.append(websocket)  # 台帳に載せる

    def disconnect(self, websocket: WebSocket) -> None:
        self.active_connections.remove(websocket)  # 台帳から外す

    async def send_personal_message(self, message: str, websocket: WebSocket) -> None:
        await websocket.send_text(message)         # 特定の1接続へ

    async def broadcast(self, message: str) -> None:
        for connection in self.active_connections:
            await connection.send_text(message)    # 全接続へ

manager = ConnectionManager()
```

The endpoint just uses this ledger and **reliably removes** the connection on disconnect.

```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 自分以外も含む全員へ配信（部屋・チャンネル単位にするなら台帳を分ける）
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
```

### 4.1 Production hardening: guarantee removal in `finally` and isolate delivery errors

The official minimal form has two holes that bite in production: **(1) guaranteeing removal from the ledger even on exceptions other than a disconnect**, and **(2) ensuring a send failure to one connection doesn't drag down the whole broadcast.**

```python
import asyncio
import logging

logger = logging.getLogger("ws")

class ConnectionManager:
    def __init__(self) -> None:
        self.active_connections: set[WebSocket] = set()   # 重複登録を避けるなら set

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self.active_connections.add(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        # discard は「居なくても例外を投げない」。二重切断でも安全（冪等）。
        self.active_connections.discard(websocket)

    async def broadcast(self, payload: dict) -> None:
        # 送信中に1つでも死んでいる接続があると例外で全体が止まる。
        # gather(return_exceptions=True) で個別の失敗を隔離する。
        dead: list[WebSocket] = []
        results = await asyncio.gather(
            *(conn.send_json(payload) for conn in self.active_connections),
            return_exceptions=True,
        )
        for conn, result in zip(self.active_connections, results):
            if isinstance(result, Exception):
                dead.append(conn)              # 送れなかった接続は死んでいる
        for conn in dead:
            self.disconnect(conn)              # まとめて掃除（クリーンアップ）
```

```python
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            raw = await websocket.receive_json()
            message = _parse_incoming(raw)        # 第3章の境界検証
            await manager.broadcast({"from": client_id, "text": message.text})
    except WebSocketDisconnect:
        pass                                       # 正常な切断
    finally:
        # どんな経路で抜けても必ず台帳から外す（接続リーク防止）
        manager.disconnect(websocket)
```

> **Treat delivery as "best-effort"**: a dead connection partway through a broadcast is **a normal state** (clients vanish without warning). Use `return_exceptions=True` to **isolate individual failures from the whole** and sweep dead connections—this prevents the accident where "one client's disconnect drags down delivery to everyone." If you need a firm delivery guarantee, that's the realm of **persistence + retransmission (a message queue)**, not WebSocket (judge YAGNI).

---

## 5. Authentication: the WebSocket-specific constraints

This is the area most prone to accidents in WebSocket. **"Anyone can connect to `/ws`"**—this happens if you don't insert authentication before `accept()`.

### 5.1 The browser constraint: you can't freely set the `Authorization` header

With an HTTP API, the standard is to pass the token in `Authorization: Bearer <token>`. But **the browser's WebSocket API (`new WebSocket(url)`) can't set request headers arbitrarily.** This is the fundamental constraint of WebSocket auth. So in practice you pass the token in one of the following ways.

- **Query parameter**: `wss://api.example.com/ws?token=<jwt>`. Easiest. But **URLs can be left in access logs, proxies, and browser history**, so use a **short-lived token** and pair it with a setting that doesn't keep the URL in logs.
- **Subprotocol (`Sec-WebSocket-Protocol`)**: like `new WebSocket(url, ["bearer", token])`, **pass the token as a subprotocol name.** It appears in the header but not the query, so the advantage is **avoiding URL log leaks.** The server returns the chosen protocol with `accept(subprotocol=...)`.
- **Authenticate in the first message after connecting**: have the first message after `accept()` send `{"type": "auth", "token": ...}` and validate it. Flexible, but since **the connection is briefly open while unauthenticated**, you need the discipline of broadcasting nothing until auth completes.

### 5.2 Validate before `accept` with dependencies (`Depends`)

FastAPI's WebSocket can **use `Depends`, `Query`, `Cookie`, and `Header` just like HTTP.** On auth failure, raise a **`WebSocketException`** rather than an `HTTPException`, returning the **WebSocket close code `1008` (Policy Violation).**

```python
from typing import Annotated
from fastapi import (
    FastAPI, WebSocket, WebSocketException, WebSocketDisconnect,
    Cookie, Query, Depends, status,
)

app = FastAPI()

async def get_token(
    websocket: WebSocket,
    # ブラウザの制約上、ヘッダは使いにくい → Cookie かクエリで受ける
    session: Annotated[str | None, Cookie()] = None,
    token: Annotated[str | None, Query()] = None,
) -> str:
    candidate = session or token
    if candidate is None:
        # 認証情報が無い → ポリシー違反として接続を拒否（accept 自体が行われない）
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return candidate

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    token: Annotated[str, Depends(get_token)],   # ここで認証が走る
):
    user = _verify_jwt(token)                    # JWT 検証（次項）。失敗なら例外で弾く
    await websocket.accept()                     # 認証を通って初めて受理
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"hello {user.username}: {data}")
    except WebSocketDisconnect:
        ...
```

Validating the JWT itself (explicit `algorithms=[...]` in `jwt.decode`, validating `exp`/`aud`/`iss`) is exactly the same know-how as an HTTP API. **You don't need to rewrite the token validation logic for WebSocket**—reuse the validation function you built for HTTP auth ([I've consolidated JWT validation and authorization scopes in the FastAPI auth/authorization guide](/blog/fastapi-authentication-oauth2-jwt-security-scopes-production-guide)). The WebSocket side's sole responsibility is one thing: "**finish validation before `accept()`, and on failure close with `1008`.**"

### 5.3 Validate the Origin (CSWSH countermeasure)

Often overlooked is **Origin validation.** WebSocket **does not receive the protection of HTTP's same-origin policy or CORS.** If you do Cookie-based auth, a malicious site can **connect to your WebSocket from the user's browser without permission**—**CSWSH (Cross-Site WebSocket Hijacking).** Match the `Origin` header against an allowlist before `accept`.

```python
ALLOWED_ORIGINS = {"https://app.example.com"}   # 環境変数から読む

async def check_origin(websocket: WebSocket) -> None:
    origin = websocket.headers.get("origin")
    # 許可されていないオリジンからの接続は拒否
    if origin not in ALLOWED_ORIGINS:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
```

> **Query token or subprotocol—which to use?**: for an API that uses **Bearer tokens, the subprotocol method is the safe bet** since it avoids URL log leaks. If you **use Cookie sessions, Origin validation is effectively mandatory** (CSWSH countermeasure). Either way, **always validate before `accept()`.** If you find out "actually they had no permission" after `accept()`, an unauthenticated connection has lived, however briefly.

---

## 6. Horizontal scaling: the limits of in-memory and Redis Pub/Sub

This is **the chapter that bites hardest in production.** The `ConnectionManager` in Section 4 has an assumption that hurts if you overlook it: **`active_connections` exists only in "that process's memory."**

A WebSocket connection **sticks to one specific process (one worker / one replica).** In production you usually scale with multiple workers (`fastapi run --workers`) or multiple replicas (containers). Then—

- User A is connected to replica #1, user B to replica #2.
- Calling `manager.broadcast(...)` on replica #1 **reaches only the connections on #1.**
- **B never receives the message.**

In other words, **an in-memory `ConnectionManager` only works correctly on a single process.** The moment you scale out, you get a low-reproducibility bug where "the message reaches only some people." The solution is to insert **a broker that distributes messages across processes.** The standard is **Redis Pub/Sub.**

```text
[Client A] ── Replica#1 ┐                        ┌ Replica#1 → [Client A]
                        ├─→ Redis (publish) ─────┤
[Client B] ── Replica#2 ┘    channel: "room:42"  └ Replica#2 → [Client B]
```

Each replica manages only "its own local connections," and **delivery first publishes to Redis.** Since all replicas **subscribe** to the same channel, the message **reaches every replica**, and each replica **sends to its own local connections.** This makes "everyone gets it no matter which replica you're connected to" hold.

```python
import asyncio
import json
from redis.asyncio import Redis

class BroadcastHub:
    """ローカル接続の管理 + Redis Pub/Sub での全レプリカ配信。"""

    def __init__(self, redis: Redis, channel: str = "broadcast") -> None:
        self._redis = redis
        self._channel = channel
        self._local: set[WebSocket] = set()     # このレプリカが持つ接続だけ

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        self._local.add(websocket)

    def disconnect(self, websocket: WebSocket) -> None:
        self._local.discard(websocket)

    async def publish(self, payload: dict) -> None:
        # 直接ローカルへ送らず、まず Redis に publish（全レプリカへ波及させる）
        await self._redis.publish(self._channel, json.dumps(payload))

    async def _fan_out_to_local(self, payload: dict) -> None:
        # Redis から受けたメッセージを、このレプリカのローカル接続へ配る
        dead = []
        for conn in self._local:
            try:
                await conn.send_json(payload)
            except Exception:
                dead.append(conn)
        for conn in dead:
            self.disconnect(conn)

    async def run_subscriber(self) -> None:
        # 起動時に1つだけ走らせる常駐タスク：Redis を購読し続ける
        pubsub = self._redis.pubsub()
        await pubsub.subscribe(self._channel)
        async for message in pubsub.listen():
            if message["type"] == "message":
                await self._fan_out_to_local(json.loads(message["data"]))
```

The subscriber task manages its start/stop via the app's **`lifespan`** (the same thinking as a connection pool).

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = Redis.from_url(settings.redis_url)   # URL は環境変数
    hub = BroadcastHub(redis)
    task = asyncio.create_task(hub.run_subscriber())   # 購読を常駐させる
    app.state.hub = hub
    try:
        yield
    finally:
        task.cancel()                            # グレースフルに停止
        await redis.aclose()
```

> **So you don't step on a "doesn't scale" design in production**: on a dev machine (single process), the in-memory `ConnectionManager` works perfectly. **That's exactly the trap**—it breaks only after you scale out. **Decide first "do I need delivery across multiple replicas?"** and if so, build with a broker from the start—that's safe. Conversely, if "the admin screen's progress display, where connections are always few and a single process suffices," then a broker is overkill (YAGNI). My **choosing Firestore for progress delivery** in the broadcaster project was also this decision to "**offload delivery's scaling and state management to a managed service.**" If you hold WebSocket yourself, this broker layer becomes the core of operations. A library like `broadcaster` that thinly wraps Redis Pub/Sub is also an option.

---

## 7. Production hardening: Origin, size limits, heartbeats, reconnection

This is the gap from a WebSocket that "works" to one that **doesn't fall over, doesn't leak, doesn't clog.**

### 7.1 Message size limits and rate limiting

Incoming messages are external input. Add **size limits** (in addition to Pydantic's `max_length`, frame-size limits at the server/proxy layer) and **rate limiting** (number of messages per unit time). This prevents a **slow-loris-style exhaustion attack** where one connection floods you with a huge number of messages.

```python
import time
from collections import deque

class RateLimiter:
    """1接続あたり: 直近 window 秒で max_messages を超えたら True（＝超過）。"""
    def __init__(self, max_messages: int = 20, window: float = 1.0) -> None:
        self._max = max_messages
        self._window = window
        self._timestamps: deque[float] = deque()

    def is_exceeded(self) -> bool:
        now = time.monotonic()
        while self._timestamps and now - self._timestamps[0] > self._window:
            self._timestamps.popleft()         # 古い記録を捨てる（スライディングウィンドウ）
        self._timestamps.append(now)
        return len(self._timestamps) > self._max
```

```python
limiter = RateLimiter()
# ループ内で:
if limiter.is_exceeded():
    await websocket.close(code=1008, reason="rate limit exceeded")
    break
```

### 7.2 Heartbeat (ping/pong) and idle timeout

A TCP connection **isn't treated as disconnected right away even if the peer suddenly vanishes (power loss, line drop).** `receive_*` waits forever with nothing arriving, and **the dead connection lingers in the ledger** (a ghost connection). What detects this is a **heartbeat.** The WebSocket protocol has ping/pong frames, and the `websockets` package provides **automatic ping**, but adding an **idle timeout** at the app layer makes it robust.

```python
import asyncio

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            try:
                # 一定時間メッセージが無ければタイムアウト → 生存確認・切断判断へ
                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
            except asyncio.TimeoutError:
                await websocket.send_json({"type": "ping"})   # 生きてるか問い合わせ
                continue
            await _handle(websocket, data)
    except WebSocketDisconnect:
        ...
    finally:
        manager.disconnect(websocket)
```

### 7.3 Backpressure: don't get dragged down by a slow receiver

If a client's receiving is slow (mobile line, etc.), **unsent messages pile up** in the server's send buffer and pressure memory. The countermeasure is to **set a timeout on sends too and cut off clogged connections.** This avoids "the whole system being dragged down by one client's latency while trying to deliver to everyone" (the same thinking as the `gather` isolation in Section 4). If your requirements need firm ordering and delivery guarantees, that's **the job of a message broker**, not WebSocket alone.

### 7.4 Client side: reconnect with exponential backoff (UX / a11y)

Unlike SSE, **WebSocket does not reconnect automatically.** If the network blips, **the client must reconnect itself.** A naive immediate retry causes **all clients to stampede the moment the server recovers** (thundering herd), so reconnect with **exponential backoff + jitter.**

```python
# 概念コード（クライアント側のロジック例）。実装言語は問わない。
def next_delay(attempt: int) -> float:
    base = min(2 ** attempt, 30)           # 1, 2, 4, ... 最大30秒で頭打ち
    return base * (0.5 + random.random())  # ジッタで殺到を散らす
```

For UX, **clearly show the connection state (connected / reconnecting / offline) on screen** and don't block operations too much during reconnection. So the state reaches assistive-technology users too, it's kind to announce state changes via **`aria-live`** and the like. Realtime quality includes not just "speed while connected" but **"how it behaves when disconnected."**

### 7.5 `wss://` (TLS) is mandatory

WebSocket itself doesn't encrypt. **Over plaintext `ws://`, both the token passed in the query and the message bodies can be eavesdropped.** In production, **`wss://` (TLS) is mandatory.** Also confirm that your load balancer / reverse proxy (ALB, Nginx, etc.) is correctly configured to allow the WebSocket upgrade (passing the `Upgrade: websocket` header) and **the idle timeout for long-lived connections.**

---

## 8. Testing: `client.websocket_connect`

There's no production launch without a verification path. FastAPI's `TestClient` provides **`websocket_connect` as a context manager**, letting you test WebSocket **synchronously and deterministically.**

```python
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_echo():
    # with でハンドシェイク〜切断まで面倒を見てくれる
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("hello")
        data = websocket.receive_text()
        assert data == "echo: hello"

def test_json_roundtrip():
    with client.websocket_connect("/ws") as websocket:
        websocket.send_json({"type": "chat", "text": "hi"})
        assert websocket.receive_json() == {"from": 1, "text": "hi"}
```

**Boundary validation (Section 3) and authentication (Section 5) are the main battlefield of testing.** Explicitly verify "is invalid JSON rejected?" and "is a no-token connection closed with `1008`?"

```python
import pytest
from fastapi import status
from starlette.websockets import WebSocketDisconnect

def test_invalid_message_does_not_crash():
    with client.websocket_connect("/ws") as websocket:
        websocket.send_json({"type": "unknown"})         # 未知の type
        res = websocket.receive_json()
        assert res["type"] == "error"                    # 接続は維持しつつエラーを返す

def test_missing_token_is_rejected():
    # 認証無しでの接続は、ハンドシェイク段階で拒否される
    with pytest.raises(WebSocketDisconnect) as exc:
        with client.websocket_connect("/ws") as websocket:
            websocket.receive_text()
    assert exc.value.code == status.WS_1008_POLICY_VIOLATION
```

> **What to test where**: **message boundary validation, auth pass/fail, and cleanup on disconnect** can be verified deterministically with `TestClient`. On the other hand, **horizontal scaling (all-replica delivery via Redis Pub/Sub)** is hard to reproduce in unit tests; cover it with **an integration test that stands up multiple processes**, or with `BroadcastHub` unit tests that swap Redis for a fake. The bug "passes on a single process but breaks on multiple processes" **first surfaces in production** unless you test it deliberately here.

---

## 9. Observability: measure connection count and disconnect reasons

Because WebSocket is a **persistent connection**, HTTP request counts don't reveal reality. At minimum, measure the following.

- **Current connection count** (gauge): per replica / per channel. You notice **a ghost-connection leak** by this value not coming down.
- **Disconnect reasons / close codes** (counter): the breakdown of `1000` (normal), `1008` (policy violation = auth failure), `1011` (server error). **A spike in `1008` is a sign of an attack or a misconfiguration.**
- **Message throughput / rate-limit hit count**: early detection of exhaustion attacks and runaway clients.
- **Broadcast latency**: the time from publish to each replica delivering.

Emit these as structured logs + metrics, and put them on a dashboard and alerts ([the foundation of logging, metrics, and health checks is consolidated in the FastAPI production-operations guide](/blog/fastapi-production-async-pydantic-observability-guide)). **Catching "supposedly connected but not receiving" with metrics before it reproduces in production**—this is the lifeline of WebSocket operations.

---

## 10. Summary: a production FastAPI WebSocket cheat sheet

A quick reference for when you're unsure.

- **Selection**: if bidirectional is required, WebSocket. If server→client one-way, **SSE**; for low-frequency updates, **polling** is enough (KISS / YAGNI). Ask first: "is it bidirectional?" and "can it be offloaded to a managed service?"
- **Minimal form**: `@app.websocket("/ws")` → `await websocket.accept()` → `receive_*`/`send_*` in `while True:`. Three families: `text`/`bytes`/`json`.
- **Disconnect**: `receive_*` raises **`WebSocketDisconnect`** on disconnect. Always catch it with `try/except`, and **guarantee removal from the ledger in `finally`** (connection-leak prevention).
- **Boundary validation**: incoming is untrusted external input. Validate with **`receive_json` + Pydantic (`TypeAdapter`, `Literal` discriminated union, `max_length`).** For invalid input, decide "cut / keep with an error return" per type.
- **Multiple clients**: `ConnectionManager` (`active_connections` / `connect` / `disconnect` / `broadcast`). Broadcast with `gather(return_exceptions=True)` to **isolate individual failures** and sweep dead connections.
- **Authentication**: the browser can't set the `Authorization` header → pass the **JWT via query or subprotocol (`Sec-WebSocket-Protocol`)** and **validate before `accept()`.** On failure, **`WebSocketException(WS_1008_POLICY_VIOLATION)`.** Prevent CSWSH with **Origin validation.** The JWT validation logic is shared with HTTP.
- **Horizontal scaling**: an in-memory ledger is **single-process only.** With multiple workers/replicas, deliver to all processes with a broker like **Redis Pub/Sub.** Manage the subscriber task in `lifespan`.
- **Hardening**: size limits, rate limiting, heartbeat (ping/pong) + idle timeout, backpressure, **client exponential-backoff reconnection**, and showing connection state in the UI. **`wss://` (TLS) mandatory**, and the LB's Upgrade pass-through setting.
- **Testing**: with `client.websocket_connect`, deterministically verify boundary validation, auth (`1008`), and disconnect cleanup. For scaling, a multi-process integration test or fake Redis.
- **Observability**: measure connection count (gauge), close-code breakdown, rate-limit hits, and delivery latency. A spike in `1008` is a sign of an attack/misconfiguration.

---

FastAPI is a framework where "you can get realtime communication running in a few lines," but production quality is decided by **how you design the lifetime of a connection.** **Finish auth before `accept()`, validate input at the boundary, always clean up disconnects in `finally`, delegate delivery to multiple replicas to a broker, and design through to reconnection when the connection breaks**—none of it is flashy, but this accumulation creates a realtime experience that is "**connected, delivered, and doesn't fall over.**"

On the in-house AI platform for a major broadcaster, I **operated progress tracking for long-running AI jobs (caption typo detection) on FastAPI (async) in production.** There I leaned realtime delivery onto **Firestore**, choosing to entrust scaling and state management to a managed service. **"Should you hold it as your own WebSocket, or lean on managed delivery?"**—this fork is the decision that matters most in realtime design. With generative AI (Claude Code) as my partner, my approach is to **build fast and cheaply, solo**, while guaranteeing quality with verification gates (boundary validation, auth, scaling integration tests).

**"I want to add a realtime feature in FastAPI, but should I choose WebSocket or SSE, and how should I design auth and horizontal scaling?"—I'll accompany you end-to-end, from that decision through implementation, testing, and operations.** Feel free to reach out, even from the requirements-organizing stage.

---

### References (official documentation)

- [WebSockets (FastAPI)](https://fastapi.tiangolo.com/advanced/websockets/) — `@app.websocket`, `accept`, `receive_*`/`send_*`, `WebSocketDisconnect`, dependencies, `ConnectionManager`, testing with `websocket_connect`
- [WebSockets (Starlette)](https://www.starlette.io/websockets/) — the `WebSocket` API (`accept(subprotocol=...)`, `close(code=, reason=)`, `headers`/`query_params`, `iter_json`, etc.)
- [FastAPI CLI](https://fastapi.tiangolo.com/fastapi-cli/) — production worker multiplexing via `fastapi run --workers`
- [Lifespan Events (FastAPI)](https://fastapi.tiangolo.com/advanced/events/) — managing subscriber-task and connection-pool startup/shutdown with `lifespan`
