# SQS + Lambda + EventBridge で冪等な非同期処理を作る：at-least-once 前提の重複・順序・DLQ 設計

> AWSのサーバーレス・イベント駆動非同期処理(SQS+Lambda+EventBridge)を本番品質で設計する実装ガイド。at-least-once配信ゆえの冪等consumer、可視性タイムアウト、DLQと再処理、FIFOの順序/重複排除、部分バッチ失敗(ReportBatchItemFailures)を実コードで解説します。

- 公開日: 2026-06-24
- 著者: 友田 陽大
- タグ: AWS, SQS, サーバーレス, 冪等性, アーキテクチャ設計
- URL: https://tomodahinata.com/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide

## 要点

- SQSはat-least-once配信で重複は仕様、冪等なconsumerを作ることがすべての設計の出発点
- ReportBatchItemFailuresで失敗idだけをbatchItemFailuresに返し、例外を上に投げてバッチ全体失敗にしない
- 冪等化は冪等性キー＋attribute_not_existsの条件付き挿入＋TTLか、Powertoolsの@idempotentで実装する
- 順序か重複が致命的ならFIFOを選ぶが、可視性タイムアウト切れの再配信は起きるためFIFOでも冪等化は必要
- 毒メッセージはmaxReceiveCountでDLQに隔離し、原因を直してからredrive、DLQ件数は1件で即通知する

---

「重い処理をキューに逃がして非同期でやりたい」——要件としては一行です。けれど本番に載せようとした瞬間、判断すべきことが一気に増えます。**同じメッセージが2回来たらどうするのか。処理が長引いて可視性タイムアウトが切れたら？ 何度やっても失敗する「毒メッセージ」はどこへ逃がすのか。順序が崩れると壊れる処理（残高更新・在庫引当）をどう守るのか。**

この記事は、AWS のサーバーレス・イベント駆動な非同期処理を、**SQS + Lambda + EventBridge + DLQ** で**本番品質**に組み上げるための実装ガイドです。題材として、私が中核開発者（主要開発者3名）として構築した、環境・カーボンクレジット／地域通貨のマルチテナント決済プラットフォーム（[サーバーレス決済プラットフォームの信頼性設計](/case-studies/payment-platform-reliability)）での設計判断も交えます。従業員カードの**月次一括課金・CO2集計**を、**SQS FIFO ＋ デッドレターキュー（DLQ）で順序保証・冪等・自動再処理**として組み、**本番稼働中の二重課金・残高不整合 0件**を達成した実例です。

> **この記事のルール**：SQS / Lambda / EventBridge / Powertools の仕様・パラメータ名・既定値は **AWS 公式ドキュメント（2026年6月時点）** に基づきます。クォータや既定値は改定されるため、本番投入前に必ず公式の各ページで最新値を確認してください。そして最重要の前提：**SQS は「少なくとも1回（at-least-once）」配信です。同じメッセージが2回届くのは「異常」ではなく「仕様」**。だから consumer を**冪等**に作るのが、すべての設計の出発点になります。

---

## 0. メンタルモデル：キュー＝「バッファ＋リトライ」、だから冪等が鉄則

最初に、この記事を貫く5つのメンタルモデルを固定します。ここを腹落ちさせれば、あとの実装はすべてこの帰結です。

- **キュー＝バッファ＋リトライ装置**。送信側（producer）と処理側（consumer）を時間的に切り離し、処理側が落ちてもメッセージは消えない。失敗したら再配信される。
- **at-least-once = 重複は前提**。公式は標準キューについて「at-least-once message delivery」、Lambda イベントソースについて「process each event at least once, and duplicate processing of records can occur」と明言しています。だから「同じメッセージが2回来ても結果が同じ」になるように作る。これが**冪等（idempotent）**。
- **可視性タイムアウト＝処理中の二重配信を防ぐ猶予**。メッセージを受信すると、その間は他の consumer から見えなくなる。期限内に削除しなければ、また見えるようになって再処理される。
- **DLQ＝『何度やっても失敗するメッセージ』の隔離先**。`maxReceiveCount` 回失敗したメッセージを本流から外し、調査・再処理できる場所に逃がす。毒メッセージで処理ライン全体を詰まらせないための安全弁。
- **冪等性は「結果が一意であること」、順序保証は「処理の順番」**。この2つは別物。標準キューはベストエフォート順序、FIFO キューが厳密順序。要件に応じて選ぶ（第4章）。

> このサイトには **常駐ワーカープール型（Celery + Redis）** の非同期タスクキュー本番ガイド（[Celery + Redis 非同期タスクキュー本番ガイド](/blog/celery-redis-production-async-task-queue-guide)）が別にあります。**あちらは「ワーカーを常駐させて回す」型、本記事は「サーバーレス・イベント駆動」型**で、補完関係にあります。常駐プロセスを持ちたくない／スパイクに自動追従させたい／運用をマネージドに寄せたいなら本記事の構成、ワーカーの細かい制御やブローカー選択を握りたいならあちらが向きます。

---

## 1. 全体像：4つの部品が何を担うか

サーバーレスのイベント駆動非同期処理は、役割の異なる4つの部品の組み合わせです。SRP（単一責任）で分けて理解すると、どこに何を書くかで迷わなくなります。

| 部品 | 役割（単一責任） | 失敗時の振る舞い |
| --- | --- | --- |
| **SQS（標準 / FIFO）** | メッセージのバッファリングと再配信。at-least-once 配信 | 削除されるまで保持。可視性タイムアウト切れで再配信 |
| **Lambda（イベントソースマッピング）** | キューをポーリングし、バッチで関数を同期呼び出し | 失敗バッチは可視性タイムアウト後に再表示。リトライ＆バックオフ |
| **DLQ（SQS）** | `maxReceiveCount` 超過の毒メッセージを隔離 | 本流から外し、調査・再処理（redrive）を待つ |
| **EventBridge** | イベントルーティング（バス）／定時起動（Scheduler） | スケジュール失敗にもリトライ・最大保持時間を設定可 |

データの流れは2系統あります。

1. **オンデマンド系**：何かのイベント（API 受付、S3 アップロード等）→ SQS にメッセージ投入 → Lambda が処理。
2. **定時バッチ系**：EventBridge Scheduler が cron で起動 → 対象を列挙して SQS にファンアウト → Lambda が並列処理。本記事の決済プラットフォームの「月次一括課金」はこちらです。

EventBridge には公式上、イベントを処理・配信する2つの方法 **event buses（ルーター）と pipes（ポイントツーポイント連携）** があり、さらに **EventBridge Scheduler**（cron / rate 式による定時・ワンタイム起動）が提供されます。本記事の「月次バッチ起動」は Scheduler の cron を使います。

---

## 2. SQS consumer を冪等に作る（この記事の心臓部）

at-least-once の世界では、**冪等な consumer こそが信頼性の土台**です。順番に作っていきます。

### 2.1 まず素朴な形：部分バッチ失敗を返す

Lambda は SQS をポーリングし、**1回の呼び出しに複数メッセージ（バッチ）**を渡します。公式の既定は「up to 10 messages」。ここで初学者が必ず踏む地雷があります。

> **公式の警告**：関数がバッチ処理中にエラーを投げると、**既定では成功済みのメッセージも含めてバッチ全体**が可視性タイムアウト後に再表示される。結果、同じメッセージを何度も処理しうる。

つまり「10件中1件失敗 → 9件は成功していたのに全部やり直し」になります。これを防ぐのが**部分バッチ失敗（partial batch response）**。イベントソースマッピングの `FunctionResponseTypes` に `ReportBatchItemFailures` を指定し、関数は**失敗したメッセージの ID だけ**を返します。

```python
"""SQS consumer: 部分バッチ失敗を返す最小形。
成功したメッセージは削除させ、失敗した id だけを再表示させる。"""

def lambda_handler(event, context):
    batch_item_failures = []

    for record in event["Records"]:
        try:
            process_one(record)  # ここが本体（次節で冪等にする）
        except Exception:
            # 失敗した messageId だけを itemIdentifier として積む
            batch_item_failures.append({"itemIdentifier": record["messageId"]})

    # この形を返すと、ここに載った id だけが再表示される
    return {"batchItemFailures": batch_item_failures}
```

返す JSON の形は公式で厳密に決まっています。`batchItemFailures` の各要素は `itemIdentifier`（= 失敗した `messageId`）を持つこと。例えば `id1`〜`id5` のうち `id2` と `id4` が失敗したなら、こう返します。

```json
{
  "batchItemFailures": [
    { "itemIdentifier": "id2" },
    { "itemIdentifier": "id4" }
  ]
}
```

**成功扱いになる条件と完全失敗になる条件**も公式が明記しています。ここを外すと「全件再処理」や「失敗が握りつぶされる」事故になるので、表で押さえておきます。

| 関数の戻り値 | Lambda の解釈 |
| --- | --- |
| 空の `batchItemFailures` リスト / null | バッチ全体を**成功**（全件削除） |
| `itemIdentifier` に失敗 id を列挙 | その id だけ**再表示**、残りは削除 |
| **例外をそのまま throw** | バッチ**全体を失敗**（全件再表示） |
| 不正な JSON / 空文字や null の `itemIdentifier` / 存在しない id | バッチ**全体を失敗**扱い |

> **設計の急所**：部分バッチ失敗を使うなら、**ハンドラの中で例外を握って `batchItemFailures` に積む**こと。うっかり例外を上まで投げると、せっかくの部分失敗指定が無効化されてバッチ全体失敗になります。

### 2.2 冪等化①：冪等性キー＋条件付き書き込み（自前の王道）

部分バッチ失敗で「成功分の無駄な再処理」は減りますが、**再配信そのものはゼロにできません**。だから処理本体（`process_one`）を冪等にします。決済プラットフォームで実際に採った王道がこれです。

**クライアントが発行した冪等性キー**を一意な識別子として使い、処理の入口で**条件付き挿入（`attribute_not_exists`）**を試みます。すでに同じキーが存在すれば「処理済み」とみなしてスキップ——これだけで二重実行が原理的に止まります。

```python
"""冪等化①: 冪等性キーの条件付き挿入で二重実行を阻止する。
DynamoDB の attribute_not_exists 条件で『初回だけ書ける』を実現する。
キーは『冪等性キーをソートキーに連結』し、TTL で自動失効させる。"""
import time
import boto3
from botocore.exceptions import ClientError

_ddb = boto3.resource("dynamodb")
_table = _ddb.Table("idempotency")
TTL_SECONDS = 90 * 24 * 60 * 60  # 既定90日で自動失効

class AlreadyProcessed(Exception):
    """同じ冪等性キーで処理済み。スキップしてよい（冪等）。"""

def claim_idempotency(tenant_id: str, idempotency_key: str) -> None:
    """初回だけ書き込みに成功する。2回目以降は ConditionalCheckFailed。"""
    # パーティション=テナント、ソートキー=冪等性キーを連結（マルチテナント分離）
    sort_key = f"charge#{idempotency_key}"
    try:
        _table.put_item(
            Item={
                "pk": tenant_id,
                "sk": sort_key,
                "ttl": int(time.time()) + TTL_SECONDS,  # 自動失効
            },
            # 「まだ無いときだけ書く」= 条件付き挿入。これが冪等性の要
            ConditionExpression="attribute_not_exists(pk) AND attribute_not_exists(sk)",
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            raise AlreadyProcessed(sort_key) from e  # 意味論的失敗 → 再試行しない
        raise  # それ以外（スロットリング等）は上位へ
```

設計のポイントは3つ。

1. **冪等性キーはクライアント発行**。サーバー側で生成すると「再送＝別キー」になり重複を弾けません。「この操作はこのキー」と発行元が決め、**再送でも同じキー**を運ぶのが鉄則です。
2. **キーをソートキーに連結**し、パーティションキーをテナントにすることで、マルチテナントのデータ分離と冪等性チェックを1テーブルで両立。
3. **TTL（既定90日）で自動失効**。冪等性レコードを永久に持つ必要はありません。十分長い窓を取りつつ、TTL で自動的に掃除させてコストとテーブルサイズを抑えます（コスト効率）。

### 2.3 冪等化②：AWS Lambda Powertools の `@idempotent`（推奨ショートカット）

自前の条件付き書き込みは「効く」のですが、**毎プロジェクトで再発明するのは DRY 違反**です。AWS が公式に **Lambda Powertools の idempotency ユーティリティ**を提供しており、上記の「INPROGRESS ロック → COMPLETE キャッシュ → TTL 失効」をデコレータ一発で実装できます。新規なら、まずこちらを検討するのが定石です。

```python
"""冪等化②: Powertools の @idempotent_function。
DynamoDB を永続層に、ペイロードのハッシュを冪等キーにして二重実行を防ぐ。
INPROGRESS で同時実行をロックし、完了後は結果をキャッシュして返す。"""
import os
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence = DynamoDBPersistenceLayer(table_name=os.environ["IDEMPOTENCY_TABLE"])

# event_key_jmespath で「冪等キーにする部分」を指定。
# ここでは body 内の冪等性キーだけをキーにする（メタデータの差で誤判定しないため）。
config = IdempotencyConfig(
    event_key_jmespath='powertools_json(body)."idempotency_key"',
    expires_after_seconds=90 * 24 * 60 * 60,  # 既定3600秒 → 90日に延長
)

@idempotent_function(
    data_keyword_argument="record",
    config=config,
    persistence_store=persistence,
)
def process_one(record: dict):
    # 同じ idempotency_key の再配信ではここは実行されず、前回結果が返る
    charge_employee_card(record)

def lambda_handler(event, context: LambdaContext):
    config.register_lambda_context(context)  # 残時間でタイムアウト保護
    batch_item_failures = []
    for r in event["Records"]:
        try:
            process_one(record=r)  # idempotent_function はキーワード引数必須
        except Exception:
            batch_item_failures.append({"itemIdentifier": r["messageId"]})
    return {"batchItemFailures": batch_item_failures}
```

公式の挙動を押さえておきます。

- 既定の `expires_after_seconds` は **3600秒（1時間）**。長めの再処理窓が必要なら明示的に延ばします。
- `event_key_jmespath` で**ペイロードのどの部分を冪等キーにするか**を JMESPath で指定。指定しなければイベント全体をキーにします。**メタデータ（受信回数など）が混ざる部分をキーにすると同一処理を別物と誤認**するので、本質的な識別子だけを指定するのが要点です。
- 同一ペイロードの**同時実行**は、`INPROGRESS` ロックにより2件目が `IdempotencyAlreadyInProgressError` になります（競合の握りつぶし防止）。
- ハッシュは既定 MD5。冪等キーは「関数名 + ペイロードのハッシュ」で組み立てられます。

> **自前 vs Powertools の使い分け**：マルチテナントの分離や「冪等性キー＋ソートキー連結」のような**ドメイン固有の鍵設計**を握りたいなら自前（2.2）。標準的な「同じ入力なら同じ結果を返す」だけで十分なら Powertools（2.3）。決済プラットフォームでは前者を選びましたが、それは**テナント境界とキー設計を自分で保証したかった**から。要件が普通なら、車輪の再発明はしないでください。

### 2.4 リトライしてよい失敗 vs してはいけない失敗

冪等化と同じくらい重要なのが、**「何を再試行し、何を即失敗させるか」の線引き**です。ここを雑にすると、無駄なリトライでコストとレイテンシを溶かすか、リトライ不能な失敗を延々と回し続けます。決済プラットフォームで採った基準を一般化したのが次の表です。

| 失敗の種類 | 例 | リトライ？ | 理由 |
| --- | --- | --- | --- |
| **一時的競合・スロットリング** | `TransactionConflict`、`ProvisionedThroughputExceeded`、5xx、ネットワーク断 | する（指数バックオフ＋ジッター） | 時間を置けば成功しうる。べき等操作なら安全 |
| **意味論的失敗（入力不正）** | `ConditionalCheckFailed`（処理済み）、4xx、バリデーション違反 | **しない（即時伝播）** | 何度やっても結果は同じ。リトライは無駄かつ有害 |
| **毒メッセージ** | 何度処理しても例外になるレコード | しない → **DLQ へ隔離** | 本流を詰まらせない。隔離して原因調査 |

決済プラットフォームでは、**一時的競合（`TransactionConflict`）のときだけ**「基点 50ms × 2^n の指数バックオフ＋ジッター（±50%）で最大3回」再試行し、**意味論的失敗（`ConditionalCheckFailed`）は再試行せず即時伝播**しました。コードで表すとこうです。

```python
"""リトライ対象を限定する指数バックオフ＋ジッター。
一時的競合だけを再試行し、意味論的失敗は即座に上げる（fail fast）。"""
import random
import time
from botocore.exceptions import ClientError

RETRYABLE = {"TransactionConflict", "ThrottlingException",
             "ProvisionedThroughputExceededException"}

def with_backoff(fn, *, max_attempts: int = 3, base_ms: float = 50.0):
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code not in RETRYABLE:
                raise  # 意味論的失敗は再試行しても無駄 → 即伝播
            if attempt == max_attempts:
                raise
            # 基点 50ms × 2^(n-1)、ジッター ±50% で群衆突入（thundering herd）を散らす
            backoff = base_ms * (2 ** (attempt - 1))
            jitter = backoff * random.uniform(-0.5, 0.5)
            time.sleep((backoff + jitter) / 1000.0)
```

ジッター（±50%）は、複数の consumer が**同じタイミングで一斉に再試行して再び衝突する**（thundering herd）のを散らすためです。リトライ間隔を一定にすると、失敗が同期して波を打ちます。**ランダムにずらす**だけで競合の山がならされます。

---

## 3. DLQ：毒メッセージの隔離と再処理

冪等 consumer を組んでも、**何度やっても失敗するメッセージ**は出ます。スキーマ不整合、参照先の消失、想定外の入力——これらを本流で延々リトライさせると、健全なメッセージまで遅延します。これが**毒メッセージ（poison message）**問題で、解は **DLQ** です。

### 3.1 DLQ の仕組み：`maxReceiveCount` と `RedrivePolicy`

公式の定義はシンプルです。ソースキューに **redrive policy** を設定し、`maxReceiveCount`（メッセージが DLQ に移される前に consumer が受信できる回数）を指定します。`maxReceiveCount` 回受信されても削除されなければ、メッセージは DLQ に移ります。

```yaml
# SAM / CloudFormation: 本流キュー + DLQ + redrive policy
Resources:
  ChargeDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: charge-dlq.fifo
      FifoQueue: true
      # 公式の推奨: DLQ の保持期間は本流より長く取る
      MessageRetentionPeriod: 1209600  # 14日（最大）

  ChargeQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: charge.fifo
      FifoQueue: true
      ContentBasedDeduplication: false  # 明示的な dedup id を使う（後述）
      VisibilityTimeout: 180            # consumer の最大処理時間に合わせる（第5章）
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt ChargeDLQ.Arn
        maxReceiveCount: 5              # 5回失敗したら DLQ へ隔離
```

公式が明記する運用上の急所を3つ。

- **DLQ はソースキューと同じ AWS アカウント・リージョン**、かつ**同じ型**（FIFO の DLQ は FIFO、標準の DLQ は標準）でなければなりません。
- **DLQ の保持期間は本流より長く**取るのがベストプラクティス。標準キューでは「元の enqueue タイムスタンプ」基準で失効するため、本流で時間を食ったメッセージは DLQ で短命になり得ます（公式の例：本流で1日 → DLQ 保持4日なら、DLQ では3日で消える）。
- `maxReceiveCount` を**低くしすぎない**。`1` だと「1回の失敗で即 DLQ」。一時障害でも隔離されてしまうので、十分なリトライ余地（公式例では `> 3`）を残します。

### 3.2 FIFO で DLQ を使うときの注意

公式は FIFO で DLQ を使うことに警告を付けています。**順序が文脈を持つ処理（編集指示の EDL など）では、DLQ で順序が崩れると意味が壊れる**から。決済の月次課金では、**メッセージグループ（テナント×月）単位で順序が閉じている**ため、特定メッセージが DLQ に落ちてもグループ内の他処理に致命的影響が出ない設計にしました。FIFO + DLQ を使うなら、「**DLQ 行きで順序が壊れて困るのはどのグループか**」を必ず洗い出してください。

### 3.3 再処理（redrive）：原因を直してから戻す

DLQ に溜まったメッセージは、**dead-letter queue redrive** でソースキュー（または別の宛先）へ戻せます。運用フローはこうです。

1. **隔離**：`maxReceiveCount` 超過で自動的に DLQ へ。
2. **調査**：DLQ のメッセージ本文とログ（例外スタック）を突き合わせ、なぜ失敗したかを特定。「consumer に十分な処理時間を与えていたか」も公式が挙げる確認観点です。
3. **修正**：コード／データ／参照先の不整合を直す。
4. **再処理（redrive）**：原因を直してから DLQ → ソースキューへ redrive。**冪等 consumer なら、すでに部分的に成功していた処理も二重実行されない**ので、安心して戻せます。

> **冪等性と DLQ はセットで効く**。冪等でない consumer に redrive すると、「半分成功していた処理がもう一度走る」事故が起きます。決済プラットフォームで**自動再処理が安全に回った**のは、2章の冪等化が土台にあったからです。

---

## 4. 標準 vs FIFO：順序と重複排除をどう設計するか

SQS には2つの型があり、**順序と重複の保証が根本的に違います**。ここの選択が、後の信頼性設計を決めます。

| 観点 | 標準キュー（Standard） | FIFO キュー（First-In-First-Out） |
| --- | --- | --- |
| 配信保証 | **at-least-once**（重複あり得る） | **exactly-once processing**（重複を入れない） |
| 順序 | ベストエフォート（前後し得る） | **厳密順序**（`MessageGroupId` 単位） |
| 重複排除 | なし（consumer 側で冪等化） | **5分間の重複排除**（`MessageDeduplicationId`） |
| スループット | ほぼ無制限 | 標準より低い（high-throughput モードあり） |
| 主な用途 | 高スループット・順序不問の処理 | 課金・在庫・台帳など順序と重複が致命的な処理 |

### 4.1 FIFO の2つの ID を取り違えない

FIFO を使うなら、性質の異なる2つの ID を**絶対に混同しない**でください。

- **`MessageGroupId`**：**順序の単位**。同じグループ ID のメッセージは厳密に順序処理される。決済では「テナント×対象月」をグループにし、テナントをまたぐ課金は並列化しつつ、同一テナント内は順序を守りました。
- **`MessageDeduplicationId`**：**重複排除の単位**。公式いわく、`SendMessage` を**5分間の重複排除インターバル**内で再試行しても、同じ dedup id なら SQS は重複を入れません。

```python
"""FIFO への送信: グループ ID で順序を、dedup id で重複排除を制御する。"""
import boto3

sqs = boto3.client("sqs")

def enqueue_monthly_charge(tenant_id: str, year_month: str, payload: str,
                           idempotency_key: str) -> None:
    sqs.send_message(
        QueueUrl=CHARGE_FIFO_URL,
        MessageBody=payload,
        # 順序の単位: テナント×月。これが同じものは厳密順序で処理される
        MessageGroupId=f"{tenant_id}#{year_month}",
        # 重複排除の単位: 冪等性キー。5分窓内の再送は SQS が弾く
        MessageDeduplicationId=idempotency_key,
    )
```

公式によると、重複排除の設定方法は2通り。**コンテンツベース重複排除**（メッセージ本文の SHA-256 ハッシュを dedup id にする。本文は見るが属性は見ない）を有効化するか、**明示的に `MessageDeduplicationId` を渡す**か。決済では**冪等性キーを明示的に dedup id として渡す**形にしました。本文が同一でも「別の課金」なら別キーになるため、コンテンツベースより意図が正確に表現できるからです。

> **重要な誤解の訂正**：FIFO の「5分重複排除」は**送信側の重複（同じ `SendMessage` の再試行）**を防ぐもので、**consumer 側の at-least-once 再配信を消すものではありません**。可視性タイムアウト切れによる再処理は FIFO でも起きます。だから**FIFO でも consumer の冪等化は必要**です。「FIFO にしたから冪等はいらない」は誤りです。

### 4.2 標準と FIFO、どちらを選ぶか

判断基準はシンプルです。

- **順序が壊れても結果が壊れないなら標準**。通知送信、サムネ生成、ログ集計など。スループットと安さを取る。
- **順序か重複のどちらかでも壊れると致命的なら FIFO**。課金、残高更新、在庫引当、台帳。決済プラットフォームの月次課金は迷わず FIFO でした。

KISS の観点では「まず標準＋冪等 consumer で足りないか」を問うべきですが、**順序が結果に効く処理に標準を使うのは KISS ではなく単なる手抜き**です。要件が順序を要求するなら FIFO を選び、複雑さを正しく引き受けてください。

---

## 5. 可視性タイムアウト：処理時間との関係で決める

可視性タイムアウトは、**設計者が必ず明示的に決めるべき値**です。既定任せにすると事故ります。

### 5.1 既定と上限（公式の数値）

- **既定は 30 秒**。
- 受信した瞬間にカウント開始。期限内に削除しなければ再表示。
- `ChangeMessageVisibility` で**動的に延長／短縮**できる。`VisibilityTimeout` を `0` にすれば即座に他の consumer へ解放。
- **上限は「最初の受信から 12 時間」**。延長してもこの 12 時間は**リセットされない**。これを超える処理が要るなら、Step Functions を使うかタスクを分割せよ、と公式は述べています。
- 標準キューの**インフライト上限は約 120,000 件**。超えると `OverLimit` エラー（ロングポーリング時はエラーではなく新規メッセージを返さない）。

### 5.2 決め方：処理時間 ＋ バッファ

公式のベストプラクティスは「**メッセージの処理＋削除に通常かかる最大時間に合わせる**」。短すぎると**処理中に再配信されて二重処理**（無駄な重複・コスト）、長すぎると**失敗時の再試行が遅れる**。

実務での決め方はこうです。

1. consumer の **P99 処理時間**を計測する（例：1メッセージ最大 60 秒）。
2. それに**バッファを乗せる**（×3 程度。例：180 秒）。Lambda なら**関数タイムアウト ≦ 可視性タイムアウト**を満たすこと。逆だと、処理中なのに再配信される。
3. 処理時間が読めない／可変なら、**ハートビートで `ChangeMessageVisibility` を定期延長**する。ただし 12 時間上限は意識する。

```yaml
# 可視性タイムアウトは「Lambda 関数タイムアウト × バッファ」で決める
ChargeQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 180   # 関数タイムアウト60秒の3倍。再配信前に余裕を持たせる
```

> **Lambda 固有の注意**：公式によると、**関数コード起因のエラー**では Lambda は処理を止めて並列度を絞り、可視性タイムアウト後にメッセージが再表示されます。**スロットリング起因**では、メッセージのタイムスタンプが可視性タイムアウトを超えるまでリトライし、超えたら**ドロップ**します。つまり可視性タイムアウトは「再試行をどれだけ許すか」のダイヤルでもあります。`ReportBatchItemFailures` を有効にすると、関数失敗時に**ポーリングを絞らない**ので、一部失敗が処理レートに響くのを避けられます。

---

## 6. EventBridge で月次バッチを起動する

決済プラットフォームの「月次一括課金」は、**EventBridge Scheduler の cron** で定時起動します。Scheduler は cron / rate 式の繰り返しパターンに加え、ワンタイム起動、柔軟な時間枠、**リトライ上限・失敗時の最大保持時間**まで設定できる、サーバーレスのスケジューラです。

```yaml
# 毎月1日 03:00(UTC) に「月次課金ディスパッチャ」Lambda を起動する
MonthlyChargeSchedule:
  Type: AWS::Scheduler::Schedule
  Properties:
    Name: monthly-charge-dispatch
    ScheduleExpression: cron(0 3 1 * ? *)   # 毎月1日 03:00 UTC
    ScheduleExpressionTimezone: Asia/Tokyo
    FlexibleTimeWindow:
      Mode: "OFF"
    Target:
      Arn: !GetAtt ChargeDispatcherFunction.Arn
      RoleArn: !GetAtt SchedulerInvokeRole.Arn
      RetryPolicy:
        MaximumRetryAttempts: 3            # 失敗時のリトライ上限
        MaximumEventAgeInSeconds: 3600     # 失敗イベントの最大保持時間
```

ディスパッチャ Lambda 自体は**重い処理をしません**（SRP）。やるのは「対象テナント×従業員を列挙し、SQS FIFO へファンアウトする」だけ。実際の課金は consumer 側で冪等・順序保証つきに処理されます。

```python
"""月次課金ディスパッチャ: 列挙して FIFO に投入するだけ（処理はしない）。
重い課金処理は冪等な consumer に委ね、ここは『起動とファンアウト』に専念する。"""
def lambda_handler(event, context):
    year_month = current_billing_month()  # 例: "2026-06"
    for tenant_id, employee_id, amount in iter_billable(year_month):
        # 冪等性キー = ディスパッチャが決定的に生成（再実行で同じキーになる）
        idempotency_key = f"{tenant_id}:{employee_id}:{year_month}"
        enqueue_monthly_charge(
            tenant_id=tenant_id,
            year_month=year_month,
            payload=build_charge_payload(employee_id, amount),
            idempotency_key=idempotency_key,
        )
```

ここに**もう一段の冪等性**が効いています。冪等性キーを `tenant:employee:year_month` で**決定的に**生成するため、スケジューラが何らかの理由で**二重起動しても、FIFO の dedup と consumer の冪等化の二重で重複課金を弾けます**。「スケジューラは exactly-once ではない」を前提に、下流で吸収する設計です。

> **なぜ SQS を挟むのか**：ディスパッチャから直接課金 API を叩く構成もあり得ます。が、それだと「途中で落ちたら残りが処理されない」「課金 API のレート制限を受け止められない」。SQS を挟むことで、**バッファリング（レート制御）・リトライ・部分失敗の隔離（DLQ）**をマネージドに獲得できます。これが ETC（変更容易性）と信頼性の両取りです。

---

## 7. 本番運用：可観測性とコスト

設計が正しくても、**見えなければ運用できません**。決済プラットフォームで実際に効いた可観測性とコストの勘所をまとめます。

### 7.1 何を監視するか

最低限、次の CloudWatch メトリクスにアラームを張ります。

| メトリクス | 何を意味するか | アラームの意図 |
| --- | --- | --- |
| `ApproximateNumberOfMessagesVisible`（キュー深さ） | 未処理メッセージの滞留 | 処理が追いついていない／consumer 停止の検知 |
| `ApproximateAgeOfOldestMessage` | 最古メッセージの滞留時間 | 詰まり・毒メッセージの兆候 |
| **DLQ の `ApproximateNumberOfMessagesVisible`** | 隔離された毒メッセージ件数 | **1件でも入ったら即通知**（要調査） |
| `NumberOfMessagesDeleted` | 削除（＝成功処理）件数 | **0 に落ちたら**部分バッチ失敗の戻し漏れの疑い |

公式も「`NumberOfMessagesDeleted` が 0 に落ちる／`ApproximateAgeOfOldestMessage` が急増するのは、関数が失敗メッセージを正しく返せていないサイン」と明記しています。決済プラットフォームでは**CloudWatch アラーム 20 超＋複合アラーム**を組み、**構造化ログを重大度に応じて Slack 通知**する運用にしました。とりわけ **DLQ に1件でも入ったら即通知**は、毒メッセージを早期に捕まえる生命線です。

### 7.2 ログに何を残すか

冪等・再処理を追えるよう、**本文（PII）ではなくメタデータ**を構造化ログで残します。

- `messageId` / 冪等性キー / `MessageGroupId` / `ApproximateReceiveCount`（受信回数）
- 失敗種別（一時的競合 / 意味論的失敗 / 毒メッセージ）とリトライ回数
- DLQ 行きになった場合の理由（例外クラス＋要約）

`ApproximateReceiveCount` は SQS イベントの `attributes` に入っており、「このメッセージは何回再配信されたか」が一目で分かります。**3回目以降が増えてきたら、毒メッセージか可視性タイムアウト不足を疑う**シグナルです。

### 7.3 コストの勘所

- **SQS のリクエスト課金**：ロングポーリング（`WaitTimeSeconds` を大きく）で空ポーリングを減らし、リクエスト数＝コストを下げる。
- **バッチで処理する**：1呼び出しで最大10件を処理すれば、Lambda 呼び出し回数と SQS API 回数の両方が減る（コスト効率）。
- **冪等レコードは TTL で掃除**：DynamoDB の TTL（既定90日）で自動失効させ、ストレージと RCU/WCU の無駄を抑える。
- **`maxReceiveCount` を適正に**：低すぎると毒でないメッセージまで DLQ に落ちて再処理の手間（＝人件費）が増え、高すぎると無駄なリトライで課金が増える。

---

## 8. まとめ：冪等な非同期処理チートシート

最後に、迷ったときの早見表です。

- **大前提**：SQS は at-least-once。**重複は仕様**。consumer を冪等に作るのがすべての出発点。
- **部分バッチ失敗**：`FunctionResponseTypes=ReportBatchItemFailures` ＋ 失敗 id を `batchItemFailures` / `itemIdentifier` で返す。例外を上に投げない。
- **冪等化**：自前なら冪等性キー＋`attribute_not_exists` の条件付き挿入＋TTL。新規なら Powertools の `@idempotent` / `@idempotent_function`。
- **リトライの線引き**：一時的競合だけ指数バックオフ＋ジッターで再試行。意味論的失敗は即伝播。
- **DLQ**：`RedrivePolicy` の `maxReceiveCount` で毒メッセージを隔離（低くしすぎない）。保持期間は本流より長く。原因を直してから redrive。
- **標準 vs FIFO**：順序か重複が致命的なら FIFO（`MessageGroupId` で順序、`MessageDeduplicationId` で5分重複排除）。**FIFO でも consumer の冪等化は必要**。
- **可視性タイムアウト**：既定30秒。P99 処理時間×バッファで決める。Lambda 関数タイムアウト ≦ 可視性タイムアウト。上限は最初の受信から12時間。
- **定時起動**：EventBridge Scheduler の cron。ディスパッチャは列挙＆ファンアウトに専念し、重い処理は冪等 consumer に委ねる。
- **可観測性**：キュー深さ・最古メッセージ滞留・**DLQ 件数（1件で即通知）**・削除数にアラーム。

サーバーレスの非同期処理は「キューに逃がすだけ」に見えて、**at-least-once 配信を前提に、冪等性・順序・再試行・隔離を設計する仕事**です。私は環境分野のマルチテナント決済プラットフォームで、従業員カードの月次一括課金・CO2集計を **SQS FIFO ＋ DLQ で順序保証・冪等・自動再処理**として組み、冪等性キーの条件付き挿入・リトライ対象の限定・20超のアラームによる可観測性で固め、**本番稼働中の二重課金・残高不整合 0件**を達成しました。

**「自社のこの非同期処理を、二重実行なく・順序を壊さず・落ちても自動で立ち直る形にしたい」——その設計から実装・運用まで、一人 × 生成AI（Claude Code）の速さで一気通貫に伴走できます。** 要件の整理段階からでも、お気軽にご相談ください。常駐ワーカープール型が向くケースなら[Celery + Redis 非同期タスクキュー本番ガイド](/blog/celery-redis-production-async-task-queue-guide)も併せてどうぞ。

---

### 参考（公式ドキュメント）

- [What is Amazon Simple Queue Service?](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html) — 標準キューの at-least-once / FIFO の exactly-once processing、メッセージライフサイクル
- [Amazon SQS visibility timeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html) — 既定30秒・最大12時間、`ChangeMessageVisibility`、インフライト上限
- [Exactly-once processing in Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-exactly-once-processing.html) — `MessageDeduplicationId`、5分重複排除、コンテンツベース重複排除
- [Using dead-letter queues in Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) — `RedrivePolicy` / `maxReceiveCount` / redrive、FIFO + DLQ の注意
- [Using Lambda with Amazon SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) — イベントソースマッピング、バッチ、FIFO イベント、`MessageGroupId` / `MessageDeduplicationId`
- [Handling errors for an SQS event source in Lambda](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html) — `ReportBatchItemFailures` / `batchItemFailures` / `itemIdentifier`、成功/失敗条件、バックオフ
- [What Is Amazon EventBridge?](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-what-is.html) — event buses / pipes、EventBridge Scheduler（cron / rate 式）
- [Idempotency - Powertools for AWS Lambda (Python)](https://docs.aws.amazon.com/powertools/python/latest/utilities/idempotency/) — `@idempotent` / `@idempotent_function`、`IdempotencyConfig`、`DynamoDBPersistenceLayer`、`event_key_jmespath`、`expires_after_seconds`
