イベント駆動アーキテクチャの信頼性は、楽観的な前提では生まれません。生まれるのは、たった1つの悲観的な前提からです。
「イベントは順不同で届き、少なくとも1回(at-least-once)届き、そして失敗する」——これを所与として、冪等なコンシューマで設計する。これがDynamoDB StreamsによるCDC(Change Data Capture:変更データキャプチャ)を本番で機能させる唯一の正解です。Streamsは「テーブルへの全変更を、漏らさず・時系列で・near-real-timeに」配るための強力なプリミティブですが、その配り方には明確な保証と明確な限界があります。保証を過大評価すると、二重反映・順序崩れ・毒メッセージによるシャード停止で本番が壊れます。
本記事は、私がAWSサーバーレス(Lambda + DynamoDB)のマルチテナント決済プラットフォームの信頼性レイヤーを設計・主導し、本番稼働中の二重課金0件を維持してきた経験をベースに、Streams CDCを下流へ安全に伝播する設計だけを体系化したものです。データモデリングや冪等性・条件付き書き込みの基礎は姉妹記事DynamoDB シングルテーブル設計&本番信頼性パターン完全ガイドに、料金と性能はDynamoDB キャパシティ・コスト・性能設計 完全ガイドに譲ります。本記事はそれと相補的に、**「変更をどう捕まえ、どう伝え、どう壊れないようにするか」**に絞ります。
仕様・上限値はすべて AWS公式ドキュメント(2026年6月時点) に照合しています。
1. DynamoDB Streamsの仕組み:CDCの土台を正確に押さえる
Streamsとは何か
DynamoDB Streamsは、公式の定義そのままに言えば「テーブル内のアイテムレベルの変更を時系列で捕捉し、最大24時間ログに保存する」機能です。アプリケーションはこのログにアクセスし、変更前・変更後のデータをnear-real-timeで読み取れます。これはまさにCDCの教科書的な実装です。
Whenever an application creates, updates, or deletes items in the table, DynamoDB Streams writes a stream record with the primary key attributes of the items that were modified.(アプリケーションがアイテムを作成・更新・削除するたびに、DynamoDB Streamsは変更されたアイテムの主キー属性とともにストリームレコードを書き込む)
重要なのは、Streamsがテーブルへの書き込みパスと非同期で動くことです。公式は「ストリームは非同期に動作するため、有効化してもテーブルの性能に影響しない」と明言しています。CDCの取得そのものが書き込みレイテンシを増やさない——これがStreamsをトランザクションログとして使える理由です。
ビュータイプ(StreamViewType):何を流すか
ストリームに「変更前/変更後のどの像を載せるか」は、テーブル単位の StreamViewType で決めます。公式の4種類:
| StreamViewType | ストリームレコードに載る内容 |
|---|---|
KEYS_ONLY | 変更されたアイテムのキー属性のみ |
NEW_IMAGE | 変更後のアイテム全体 |
OLD_IMAGE | 変更前のアイテム全体 |
NEW_AND_OLD_IMAGES | 変更前と変更後の両方 |
設計上の指針は明確です。マテリアライズドビュー同期・検索インデックス同期には NEW_AND_OLD_IMAGES が要ります。古いインデックスエントリを正確に消す(差分を取る)には「変更前の像」が、新しい値を反映するには「変更後の像」が必要だからです。一方、削除イベントを別システムへ通知するだけなら KEYS_ONLY で十分です。なお公式の注意点として、StreamViewType は一度設定すると変更できません。変えるにはストリームを無効化して作り直す必要があります(=新しいストリームになる)。
24時間保持、シャード、順序保証
CDCの設計判断を左右する3つの公式仕様を、正確に押さえます。
(1) 保持は24時間。 「DynamoDB Streamsの全データは24時間のライフタイムに従う。24時間より古いデータはいつ削除(トリミング)されてもおかしくない」。つまりStreamsは再生(リプレイ)用の永続ログではありません。コンシューマが24時間以上停止すれば、その間の変更は永久に失われます。長期保持・リプレイが要件なら、後述のKinesis Data Streamsを検討します。
(2) シャード。 ストリームレコードはシャードにグルーピングされ、各シャードは複数のレコードのコンテナです。シャードは必要に応じて自動で生成・分割され、親テーブルへの書き込みが増えるとシャードが分裂して並列処理できるようになります。シャードには親子関係(lineage)があり、親シャードを処理してから子シャードを処理することで順序が保たれます(Lambdaを使う場合はこの管理は自動)。
(3) 順序保証は「アイテム単位」だけ。 ここが最重要かつ最も誤解される点です。公式の保証は2つです:
- Each stream record appears exactly once in the stream.(各ストリームレコードはストリームに正確に1回現れる)
- For each item that is modified in a DynamoDB table, the stream records appear in the same sequence as the actual modifications to the item.(テーブル内で変更された各アイテムについて、ストリームレコードは実際の変更と同じ順序で現れる)
噛み砕くと——順序が保証されるのは「同一アイテム(同一の主キー)への変更の並び」だけです。異なるアイテム間、ましてやテーブル全体での全順序は保証されません。だから「注文Aの更新は、別の注文Bの更新より必ず先に届く」と期待してはいけません。同一の OrderId への created → paid の順序は守られますが、それを跨いだ順序に依存する設計は壊れます。
「exactly once」の罠:上の「正確に1回」はストリーム内のレコードの話であって、コンシューマ(Lambda)が正確に1回呼ばれるという意味ではありません。後述のとおりLambda配信は at-least-once です。ここを混同すると冪等化をサボって事故ります。
最後に実務で効く2つの細則:
- 何も変えない書き込みはレコードを生まない。 「
PutItem/UpdateItemがアイテムを実際に変更しなかった場合、Streamsはレコードを書かない」。冪等な再送(同じ値の上書き)がCDCを汚さないのは、この仕様のおかげです。 - TTL削除もStreamに出る。 期限切れによる自動削除も
REMOVEイベントとしてストリームに流れます(詳細はセクション8)。「TTLで静かに消えたはず」がCDCに現れるので、削除を下流に伝える設計では考慮が要ります。
2. DynamoDB Streams vs Kinesis Data Streams:CDCの配送路を選ぶ
DynamoDBの変更を捕まえる経路は2つあります。素のDynamoDB Streamsと、Kinesis Data Streams for DynamoDB(テーブルの変更をKinesisデータストリームへ複製する)です。多くのワークロードはStreamsで足りますが、保持・コンシューマ数・スループットが要件を超えるとKinesisが必要になります。
公式の記述に基づく比較:
| 観点 | DynamoDB Streams | Kinesis Data Streams for DynamoDB |
|---|---|---|
| データ保持 | 24時間固定 | 既定24時間、最大365日まで延長可(より長期のリプレイ) |
| 順序保証 | アイテム単位で保証(同一アイテムの変更は順序どおり) | 順不同になり得る(ApproximateCreationDateTime で順序を判定) |
| 重複 | 「各レコードはストリームに正確に1回」 | 同一アイテムの通知が複数回出得る(重複あり前提) |
| コンシューマ数 | 1シャードあたり最大2プロセス(Lambdaは最大2関数) | 拡張ファンアウトで2つ以上の下流アプリへ同時配信 |
| 主なコンシューマ | Lambdaトリガ、Kinesis Adapter(KCL) | KCL、Lambda、Firehose、Managed Service for Apache Flink |
| 関連付け | テーブルに対し1ストリーム | 1テーブル → 1 Kinesisデータストリーム |
| 課金 | (Lambda経由なら)GetRecords 課金なし | CDCユニット課金+Kinesis側の課金 |
公式はKinesis側のトレードオフを率直に書いています。
The Kinesis data stream records might appear in a different order than when the item changes occurred. The same item notifications might also appear more than once in the stream.(Kinesisデータストリームのレコードは、アイテム変更が起きた順序と異なる順序で現れることがある。同じアイテムの通知が2回以上現れることもある)
つまり長期保持・多数のファンアウト・大規模分析が必要なら Kinesis、ただし順序保証が弱まり重複が増えることを受け入れ、ApproximateCreationDateTime での順序判定と冪等化で吸収する。逆に標準的なイベント駆動・低レイテンシ・少数コンシューマなら DynamoDB Streams + Lambda がシンプルで安く速い、という線引きになります。
私の選定基準:決済基盤では、整合性が要る下流伝播(残高ビュー・台帳・監査)はDynamoDB Streams + Lambdaで組み、24時間以内に確実に消化できる設計(後述の可観測性で
IteratorAgeを監視)にしました。一方、長期リプレイや分析パイプラインが要る領域は Kinesis 側に逃がす、という使い分けです。「迷ったら Streams、保持/ファンアウトで詰まったら Kinesis」が実務の出発点です。
3. Lambdaトリガ設計:イベントソースマッピングを正しく構成する
Streamの最も一般的なコンシューマはLambdaトリガです。Lambdaはイベントソースマッピング(ESM)というリソースを通じてストリームをポーリングします。公式によれば、Lambdaは各シャードを毎秒4回ポーリングし、レコードがあると関数を同期呼び出しして結果を待ちます。
バッチング・並列度・開始位置
| パラメータ | 既定 | 範囲・上限 | 役割 |
|---|---|---|---|
BatchSize | 100 | 最大 10,000 | 1回の呼び出しに渡すレコード数の上限 |
MaximumBatchingWindowInSeconds | 0 | 最大 300(5分) | バッチが満ちるまで待つ時間。低トラフィックで呼び出し回数を抑える |
ParallelizationFactor | 1 | 最大 10 | 1シャードを同時に処理する並列バッチ数 |
StartingPosition | 必須 | TRIM_HORIZON / LATEST | 読み始める位置 |
公式の重要点を3つ:
- バッチのペイロード上限は6 MB。 Lambdaは「フルバッチに達する」「バッチングウィンドウが切れる」「ペイロードが6 MBに達する」のいずれかで呼び出します。
ParallelizationFactorを上げても順序は壊れない。 公式が明記するとおり、並列度を上げても Lambdaはアイテム(パーティション+ソートキー)単位の順序処理を保証します。IteratorAgeが高く処理が追いつかないときに、同一アイテムの順序を守ったままスループットを上げられます。LATESTは取りこぼし得る。 ESMの作成・更新時のポーリング開始は結果整合で数分かかるため、LATESTだとその間のイベントを取りこぼす可能性があります。取りこぼしを許さないならTRIM_HORIZONを指定せよ、と公式は明言しています。
エラー処理:毒メッセージでシャードを止めないために
ここが本番設計の心臓部です。同期呼び出しが失敗すると、Lambdaはそのバッチを成功するかレコードが期限切れになるまでリトライします。素朴に放置すると、1件の「毒メッセージ(poison pill)」がシャード全体を最大1日ブロックします(公式:「デフォルト設定では、不正なレコードが該当シャードの処理を最大1日ブロックし得る」)。これを防ぐ4つのレバー:
| 設定 | 既定 | 役割 |
|---|---|---|
BisectBatchOnFunctionError | false | 失敗時にバッチを2分割して不正レコードを隔離。分割はリトライ枠を消費しない |
MaximumRetryAttempts | -1(無限) | リトライ上限。最小0・最大10,000。-1はレコード期限切れまで再試行 |
MaximumRecordAgeInSeconds | -1(無限) | レコードの最大年齢。最小-1・最大604,800(7日)。古いレコードを破棄 |
DestinationConfig(on-failure) | なし | 破棄レコードのメタデータを送る先(標準SQS / 標準SNS / S3 / Kafka) |
公式が示す失敗時の挙動の整理:
- 呼び出し前の失敗(スロットリング等):レコードが期限切れ、または
MaximumRecordAgeInSecondsを超えるまでリトライ。 - 呼び出し中の失敗(関数がエラーを返す):レコードが期限切れ、
MaximumRecordAgeInSeconds超過、またはMaximumRetryAttempts到達までリトライ。BisectBatchOnFunctionErrorを有効にすると、失敗バッチを2分割して不正レコードを切り出し、タイムアウトを避ける。 - エラー処理をやり切っても失敗した場合、Lambdaはそのレコードを破棄して次のバッチへ進む。だからこそon-failure destination(DLQ)が必要です。
ただし重要な制約があります。on-failureの送信先に入るのは「失敗バッチのメタデータ」だけで、レコード本体は含まれません(S3送信先のみ payload に丸ごと含む)。SQS/SNSに来るのはシャードIDと開始/終了シーケンス番号です。
The actual records aren't included, so you must process this record and retrieve them from the stream before they expire and are lost.(実レコードは含まれないので、このメタデータを処理し、期限切れで失われる前にストリームから対象レコードを取得する必要がある)
つまりDLQに「証拠」は残せても「中身」は24時間以内に自力で拾い直す前提。だから現実的な再処理は ReportBatchItemFailures(セクション5)で1件単位に隔離し、DLQは最後の安全網に留めるのが定石です。
フィルタ条件:要らないイベントで関数を呼ばない
ESMの FilterCriteria で、Lambdaに渡す前にイベントを絞れます。CDCでは「特定のステータス変更だけ」「TTL削除だけ」を処理したいことが多く、フィルタで関数の呼び出し回数そのものを減らせます(公式:最大5パターンまで定義可能)。例えばTTL削除だけを拾うフィルタ:
{
"Filters": [
{
"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
}
]
}
「2百万件/時の変更のうちTTL削除が5%」のようなテーブルで、フィルタは呼び出しを10万件/時に削り、課金と無駄な処理を同時に減らします。フィルタは下流に流す前の最初の防波堤です。
4. 冪等なコンシューマ:at-least-once を前提に設計する
ここが本記事の哲学の中核です。Lambdaのイベントソースマッピングは「少なくとも1回」配信であり、重複は仕様として起こります。公式の警告は明快です。
Lambda event source mappings process each event at least once, and duplicate processing of records can occur. To avoid potential issues related to duplicate events, we strongly recommend that you make your function code idempotent.(Lambdaのイベントソースマッピングは各イベントを少なくとも1回処理し、レコードの重複処理が起こり得る。重複イベントの問題を避けるため、関数コードを冪等にすることを強く推奨する)
重複が起きる経路は構造的です——リトライ、バッチ再処理、ReportBatchItemFailures のチェックポイント以降の再送、ParallelizationFactor 起因の境界。だから「重複は来ない」ではなく「重複は来る。来ても結果が変わらないように作る」が唯一の正解です。
冪等化の定石は**処理済みマーカー(processed-marker)**です。各ストリームレコードは一意な eventID(または SequenceNumber)を持つので、これを冪等キーにして「初めて見たレコードだけ副作用を実行する」ように条件付き書き込みで弾きます。
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
DynamoDBDocumentClient,
PutCommand,
} from "@aws-sdk/lib-dynamodb";
import { ConditionalCheckFailedException } from "@aws-sdk/client-dynamodb";
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const TABLE = process.env.PROCESSED_TABLE!;
/**
* 「このストリームレコードを初めて処理するか」を原子的に判定する。
* eventID は各レコードで一意。attribute_not_exists で二重処理を弾く。
* 2回目以降は ConditionalCheckFailedException となり false を返す。
*/
async function claimOnce(eventId: string): Promise<boolean> {
try {
await ddb.send(
new PutCommand({
TableName: TABLE,
Item: {
PK: `EVT#${eventId}`,
processedAt: new Date().toISOString(),
// 処理済みマーカーを永久に残さない(容量・コスト抑制)。
// Streams 保持(24h)より十分長い TTL にして再送ウィンドウを覆う。
expiresAt: Math.floor(Date.now() / 1000) + 60 * 60 * 24 * 7,
},
// まだ処理していないレコードのときだけ「確保」できる
ConditionExpression: "attribute_not_exists(PK)",
}),
);
return true; // このレコードは初見 → 副作用を実行してよい
} catch (err) {
if (err instanceof ConditionalCheckFailedException) {
return false; // 既に処理済み(重複)→ スキップ
}
throw err;
}
}
ポイントは、冪等性をアプリのロックや「処理済みかを先にSELECT」では守らないことです。SELECTとPUTの間に並行呼び出しが割り込めば破れます。attribute_not_exists は書き込みと同一の原子操作として評価されるので競合下でも破れません。条件付き書き込みの詳細はシングルテーブル設計&冪等性ガイドに、SQS/EventBridgeでの非同期冪等処理はSQS×Lambda×EventBridgeで冪等な非同期処理を設計するにまとめています。
注意:処理済みマーカーの「確保」と「実際の副作用」を別の書き込みに分けると、確保後・副作用前にクラッシュした場合に副作用が一度も走らない窓ができます。理想は副作用と同じトランザクション(
TransactWriteItems)でマーカーも書くことです。マーカーTTLは、Streamsの再送が起き得る24時間を確実に覆う長さ(例:7日)にします。
5. 部分的バッチ失敗(ReportBatchItemFailures):成功を巻き戻さない
冪等化と並ぶ、もう一つの本番必須テクニックがこれです。既定のLambdaは、バッチが完全成功したときだけ最高シーケンス番号までチェックポイントし、それ以外はバッチ全体を失敗として丸ごとリトライします。100件中99件成功・1件失敗でも、99件が再処理されてしまう——冪等でなければ即事故、冪等でも無駄なリトライです。
ReportBatchItemFailures を有効化すると、失敗したレコードのシーケンス番号だけを返して部分成功にできます。公式の必須レスポンス構文:
{
"batchItemFailures": [
{ "itemIdentifier": "<SequenceNumber>" }
]
}
公式の挙動:
batchItemFailuresに複数あるとき、Lambdaは最小のシーケンス番号をチェックポイントにし、そこから再試行する。- 完全成功とみなす返り値:空の
batchItemFailuresリスト、null リスト、空/null のEventResponse。 - 完全失敗とみなす返り値:空文字の
itemIdentifier、null のitemIdentifier、不正なキー名。 - 有効化には ESM の
FunctionResponseTypesにReportBatchItemFailuresを含める。
これを踏まえた、冪等化(セクション4)+部分バッチ失敗+型安全を1つにまとめたTypeScriptハンドラが以下です。aws-lambda の型を使い、unmarshall でDynamoDB JSONを素のオブジェクトに戻して処理します。
import type {
DynamoDBStreamEvent,
DynamoDBStreamHandler,
DynamoDBBatchResponse,
DynamoDBBatchItemFailure,
DynamoDBRecord,
} from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import type { AttributeValue } from "@aws-sdk/client-dynamodb";
/**
* 1レコードを処理する。冪等&副作用はこの中だけに閉じ込める(SRP)。
* 失敗時は throw する。呼び出し側がシーケンス番号を失敗として記録する。
*/
async function handleRecord(record: DynamoDBRecord): Promise<void> {
// 変更前/変更後イメージは StreamViewType に依存。NEW_AND_OLD_IMAGES 前提。
const keys = record.dynamodb?.Keys;
const newImage = record.dynamodb?.NewImage;
const oldImage = record.dynamodb?.OldImage;
// DynamoDB JSON({ "S": "..." })→ 素の JS オブジェクトへ。
const key = keys ? unmarshall(keys as Record<string, AttributeValue>) : undefined;
const next = newImage ? unmarshall(newImage as Record<string, AttributeValue>) : undefined;
const prev = oldImage ? unmarshall(oldImage as Record<string, AttributeValue>) : undefined;
// eventID を冪等キーにして二重処理を弾く(セクション4)。
if (!record.eventID || !(await claimOnce(record.eventID))) return;
switch (record.eventName) {
case "INSERT":
case "MODIFY":
await projectUpsert(key, next, prev); // マテビュー/検索インデックスへ反映
break;
case "REMOVE":
await projectDelete(key, prev); // 下流から削除
break;
}
}
export const handler: DynamoDBStreamHandler = async (
event: DynamoDBStreamEvent,
): Promise<DynamoDBBatchResponse> => {
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
// 同一アイテムの順序を壊さないため、レコードは「直列」で処理する。
// 失敗したレコード以降が再試行されるよう、最小シーケンス番号を返す。
for (const record of event.Records) {
try {
await handleRecord(record);
} catch (err) {
// この1件を失敗として報告。Lambda はここから再チェックポイントする。
batchItemFailures.push({ itemIdentifier: record.dynamodb?.SequenceNumber ?? "" });
// 以降のレコードは次回まとめて再試行されるので、ここで打ち切る。
break;
}
}
return { batchItemFailures };
};
// 下流への反映は別モジュールに分離(マテビュー・検索同期は SRP で独立)
declare function projectUpsert(key: unknown, next: unknown, prev: unknown): Promise<void>;
declare function projectDelete(key: unknown, prev: unknown): Promise<void>;
設計判断を2つ補足します。第一に、ストリーム処理ではレコードを並列に投げず直列に処理し、最初の失敗で打ち切るのが安全です。チェックポイントは最小シーケンス番号なので、失敗以降を一括で再試行させた方が順序とリトライが素直に噛み合います。第二に、BisectBatchOnFunctionError と ReportBatchItemFailures は併用でき、公式によれば両方有効なときは返したシーケンス番号でバッチを二分割して残りだけ再試行します。重複を完全に消すことはできない(公式:「成功レコードでも再試行の可能性は残る」)ので、最後の砦は常に冪等性です。
6. 実用パターン:マテビュー・検索インデックス・fan-out・Outbox
ここまでの土台(順序保証・at-least-once・冪等化・部分失敗)の上に、本番でよく使う4つの伝播パターンを載せます。
(1) マテリアライズドビュー/集計
DynamoDBはJOINやGROUP BYが苦手です。だから「読み取り形に整形したビュー」を書き込み時にStreams経由で先に作っておくのがCDCの王道です。例えば「ユーザーごとの注文合計」「ステータス別件数」を、注文の INSERT/MODIFY/REMOVE を受けて別アイテム(または別テーブル)に集計します。NEW_AND_OLD_IMAGES で差分(旧ステータス→新ステータス)を取れるので、カウンタを正しく増減できます。集計の更新は条件付き原子更新で行い、冪等キーで二重加算を防ぎます(原子カウンタ ADD 単体は冪等でないため残高系には使わない——詳細はシングルテーブル設計ガイド)。
(2) OpenSearch等への検索インデックス同期
全文検索やファセット検索が要るなら、DynamoDBを真実の源(source of truth)に保ったまま、StreamsでOpenSearch等へ非同期同期します。INSERT/MODIFY でドキュメントを upsert、REMOVE(TTL削除含む)でドキュメントを delete。ここでも NEW_AND_OLD_IMAGES が効きます——旧像で「消すべき古いインデックスエントリ」を特定できるからです。同期は結果整合になるため、「書き込み直後に検索ヒットを期待しない」前提でUXを設計します。
(3) EventBridge Pipesでのfan-out
Lambdaを書かずに「Streams → 任意のターゲット」を繋ぎたいなら、Amazon EventBridge Pipesが有力です。公式によればPipesはソースとターゲットを点対点で繋ぎ、フィルタリングと(オプションの)エンリッチメントを挟めるマネージドな統合で、DynamoDB Streamsをソースとしてサポートします。
It reduces the need for specialized knowledge and integration code when developing event-driven architectures.(イベント駆動アーキテクチャ開発で、専門知識と統合コードの必要性を減らす)
Pipesは4ステージ(source → filtering → enrichment → target)で構成され、ターゲットにはEventBridgeイベントバス、Step Functions、SQS、SNS、Lambda、API Destinations等を選べます。**fan-out(1つの変更を多数の下流へ)**は、Streams → Pipes → EventBridgeイベントバスへ流し、バス上で複数のルール/ターゲットに配るのが定石です(イベントバスは公式いわく「多対多ルーティングに適する」)。「グルーコードを書かずにCDCを配る」ならPipes、「複雑な条件分岐や独自の冪等制御が要る」ならLambda、と使い分けます。
(4) Transactional Outbox連携
外部システムへのイベント発行で「DBは更新したがイベント発行に失敗(またはその逆)」を防ぐのがTransactional Outboxパターンです。アイデアは単純で、業務データとイベント(outboxレコード)を同一トランザクションで書き、確定したものだけを後から発行すること。DynamoDBではここが綺麗に噛み合います——TransactWriteItems で業務アイテムとoutboxアイテムを原子的に書けば、outboxに載った=業務変更が確定したが保証されます。そしてStreamsをそのoutboxのリレー(発行器)として使うと、ポーリング不要で確定イベントだけが下流に流れます。Streamsが「テーブルの確定済み変更ログ」である性質が、Outboxの発行保証とそのまま一致するわけです。詳細な実装はTransactional Outboxパターンで「確実なイベント発行」を設計するにまとめています。
7. Terraform:本番のイベントソースマッピング定義
ここまでの設計判断(バッチ・並列度・bisect・DLQ・部分失敗・フィルタ)を、再現可能なIaCに落とします。aws_lambda_event_source_mapping でDynamoDB Streamsを購読する本番構成です。
# 失敗バッチのメタデータを退避する DLQ(標準 SQS)
resource "aws_sqs_queue" "stream_dlq" {
name = "app-stream-dlq"
message_retention_seconds = 1209600 # 14日。Streams の 24h より長く保持して調査時間を確保
}
resource "aws_lambda_event_source_mapping" "ddb_stream" {
event_source_arn = aws_dynamodb_table.app.stream_arn
function_name = aws_lambda_function.projector.arn
starting_position = "TRIM_HORIZON" # LATEST は作成直後の取りこぼしリスクがある
# --- バッチング・並列度 ---
batch_size = 100 # 既定。下流の処理コストに合わせて調整
maximum_batching_window_in_seconds = 5 # 低トラフィック時に呼び出し回数を抑える
parallelization_factor = 10 # 同一アイテム順序は維持したままスループットを上げる
# --- 毒メッセージ対策(1件でシャードを止めない) ---
bisect_batch_on_function_error = true # 失敗バッチを二分割して不正レコードを隔離
maximum_retry_attempts = 5 # 無限リトライ(-1)で滞留させない
maximum_record_age_in_seconds = 3600 # 1時間より古いレコードは破棄して前進
# --- 部分的バッチ失敗:成功レコードを巻き戻さない ---
function_response_types = ["ReportBatchItemFailures"]
# --- 破棄レコードの退避先(最後の安全網。本体は含まれずメタデータのみ) ---
destination_config {
on_failure {
destination_arn = aws_sqs_queue.stream_dlq.arn
}
}
# --- フィルタ:下流に要るイベントだけを関数へ(例:status が paid になった変更のみ) ---
filter_criteria {
filter {
pattern = jsonencode({
dynamodb = {
NewImage = {
status = { S = ["paid"] }
}
}
})
}
}
}
設計上の勘所:
maximum_retry_attemptsとmaximum_record_age_in_secondsを必ず有限値にします。既定の-1(無限)は、毒メッセージがシャードを24時間ブロックする最悪ケースを招きます。- DLQにはメタデータしか入りません。SQSメッセージのシャードID+シーケンス番号から、24時間以内に対象レコードをストリームから拾い直す再処理ジョブを別途用意します。
- フィルタは「下流が要らないイベントで関数を呼ばない」ための最初の防波堤。
status = paidのようにCDCの意味で絞ります。 - Lambda関数のIAMロールには、ストリーム読み取り(
dynamodb:GetRecords/GetShardIterator/DescribeStream/ListStreams)とDLQ送信(sqs:SendMessage)の最小権限だけを与えます(最小権限の原則)。
8. 落とし穴と可観測性:詰まる前に気づく
よくある落とし穴(と対策)
| 落とし穴 | なぜ起きるか | 対策 |
|---|---|---|
| 毒メッセージでシャードが1日止まる | 失敗バッチを期限切れまで無限リトライ | BisectBatchOnFunctionError + 有限 MaximumRetryAttempts/MaximumRecordAgeInSeconds + DLQ |
| 重複処理で二重反映 | 配信は at-least-once | eventID 冪等キー+条件付き書き込み(セクション4) |
| 全体順序を仮定して壊れる | 順序保証はアイテム単位のみ | 異なるアイテム間の順序に依存しない設計。必要なら同一PKに寄せる |
| 成功レコードまで再処理 | 既定はバッチ全失敗でリトライ | ReportBatchItemFailures で部分成功(セクション5) |
| コンシューマ停止で変更喪失 | 保持は24時間のみ | IteratorAge 監視。長期保持が要るなら Kinesis(最大365日) |
| シャードあたり3つ目の読み手で詰まる | 1シャード最大2プロセス | Lambdaは同一ストリームに最大2関数。超えると read throttling |
| TTL削除を「変更」として誤伝播 | TTL削除も REMOVE で流れる | userIdentity でサービス削除を判別(下記) |
最後のTTLの扱いは見落としやすいので明示します。TTL(期限切れ自動削除)はStreamに REMOVE イベントとして流れ、通常の削除と区別するための userIdentity フィールドが付きます。
{
"userIdentity": {
"type": "Service",
"principalId": "dynamodb.amazonaws.com"
}
}
「ユーザー操作による削除」と「TTLによる失効」を下流で別扱いしたい(例:失効はアーカイブ、手動削除は監査ログ)なら、この userIdentity で分岐するか、ESMの FilterCriteria(セクション3)でTTL削除だけ/TTL削除以外だけを関数に渡します。
可観測性:IteratorAge を主役にする
ストリーム処理で最重要のメトリクスは IteratorAge(コンシューマが読んでいるレコードの古さ)です。これが上昇し続ける=消化が生成に追いついていない=放置すれば24時間の保持を超えてデータを失う、という危険信号です。最低限の監視:
| メトリクス | 何を示すか | アクション |
|---|---|---|
IteratorAge(Lambda) | 処理の遅延(ラグ) | 上昇トレンドで即アラート。ParallelizationFactor 増・処理軽量化 |
Errors / Throttles(Lambda) | 関数失敗・スロットリング | リトライ滞留・並行数上限の検知 |
DLQの ApproximateNumberOfMessagesVisible | 破棄された失敗バッチ | 1件でも来たら調査(毒メッセージ) |
ReadThrottleEvents(Streams/テーブル) | 読み手過多・容量不足 | シャードあたり2プロセス超を疑う |
私の決済基盤では、IteratorAge の上昇とDLQへの到達にCloudWatchアラームを張り、Slackへ即時通知する構成にしました。「処理が遅れ始めた瞬間」と「1件でも捨てた瞬間」を逃さないことが、24時間という保持の壁の内側で確実にCDCを消化する鍵です。可観測性の土台はキャパシティ設計と地続きなので、キャパシティ・コスト・性能設計ガイドも併せて参照してください。
FAQ
Q1. DynamoDB Streamsの保持期間は?リプレイできますか? 保持は24時間固定です。24時間より古いレコードはいつトリミングされてもおかしくありません。Streamsは長期のリプレイ用ログではないので、コンシューマが24時間以上止まればその間の変更は失われます。より長期の保持・リプレイが要るならKinesis Data Streams(既定24時間、最大365日)を使ってください。
Q2. ストリームの順序は保証されますか? 保証されるのは同一アイテム(同一主キー)への変更の順序だけです。公式は「各アイテムの変更は実際の順序どおりに現れる」とし、「各レコードはストリームに正確に1回現れる」と述べていますが、異なるアイテム間やテーブル全体の全順序は保証されません。全体順序に依存する設計は避け、必要なら同一PKに寄せます。
Q3. StreamsとKinesis Data Streams、どちらを選ぶべき?
標準的なイベント駆動・低レイテンシ・少数コンシューマ(最大2関数/シャード)ならDynamoDB Streams + Lambda。長期保持・多数のファンアウト・大規模分析が要るならKinesis。ただしKinesisは順不同になり得る・同一通知が複数回出得るため、ApproximateCreationDateTime での順序判定と冪等化が前提です。
Q4. 重複は来ますか?冪等性はどう担保しますか?
来ます。Lambdaのイベントソースマッピングは公式に**「少なくとも1回(at-least-once)」**で、重複処理が起こり得ます。AWSも関数コードを冪等にするよう強く推奨しています。各レコードの一意な eventID(または SequenceNumber)を冪等キーにし、attribute_not_exists の条件付き書き込みで二重処理を弾くのが定石です。
Q5. 1件のエラーでシャード全体が止まるのを防ぐには?
BisectBatchOnFunctionError(失敗バッチを二分割して不正レコードを隔離)、有限の MaximumRetryAttempts・MaximumRecordAgeInSeconds、on-failure destination(DLQ)、そして ReportBatchItemFailures(失敗レコードのシーケンス番号だけ返す部分成功)を組み合わせます。既定の無限リトライのままだと、毒メッセージがシャードを最大1日ブロックします。
Q6. TTLで削除されたアイテムもStreamに流れますか?
流れます。TTL削除は REMOVE イベントとしてストリームに現れ、userIdentity.type = "Service"・userIdentity.principalId = "dynamodb.amazonaws.com" が付きます。これでユーザー操作による削除と区別でき、ESMの FilterCriteria でTTL削除だけ/以外だけを関数に渡すこともできます。
おわりに:悲観的な前提が、楽観的に動くシステムを作る
DynamoDB StreamsによるCDCは、強力ですが無条件に安全ではありません。安全になるのは、「順不同・少なくとも1回・失敗する」を設計の出発点に置き、冪等なコンシューマと部分バッチ失敗と毒メッセージ隔離で固めたときだけです。
- 保証と限界を正確に知る:24時間保持・アイテム単位の順序・ストリーム内exactly-once。だが配信はat-least-once。
- 冪等を最後の砦にする:
eventID+条件付き書き込み。重複は消せない、結果を変えないようにする。 - シャードを止めない:bisect+有限リトライ+有限record age+DLQ+部分バッチ失敗。
- 配送路を要件で選ぶ:迷ったらStreams+Lambda、保持/ファンアウトで詰まったらKinesis、グルー削減ならEventBridge Pipes。
IteratorAgeで先回りする:24時間の壁の内側で確実に消化する。
私は、一人 × 生成AI(Claude Code)という体制で、設計判断は人間の検証ゲートを通す進め方を徹底し、本番二重課金0件のサーバーレス決済基盤でこのCDC伝播設計を実装・運用してきました。DynamoDB Streamsを軸にしたイベント駆動の信頼性設計(冪等なコンシューマ、マテビュー/検索同期、Outbox連携、毒メッセージ隔離、可観測性)について、設計レビューから実装まで伴走できます。
サーバーレス/イベント駆動の信頼性設計でお困りの方は、お問い合わせからご相談ください。 まずは現状の変更伝播パスとリスク箇所(順序依存・重複耐性・毒メッセージ)を洗い出すところからご一緒します。