# GuardDuty の finding を EventBridge で自動インシデント対応（SOAR）にする：本番設計の全体像を Terraform / Step Functions / Python で

> GuardDuty の finding を EventBridge→Step Functions で自動インシデント対応（SOAR）に変える本番設計ガイド。重大度の数値マッチ・通知は広く封じ込めは許可リストで狭く・破壊は人間承認、EC2/IAM/S3/EKS プレイブックと冪等性・DLQ・最小権限を実コードで解説します。

- 公開日: 2026-06-27
- 著者: 友田 陽大
- タグ: セキュリティ, AWS, GuardDuty, EventBridge, インシデント対応
- URL: https://tomodahinata.com/blog/aws-guardduty-eventbridge-automated-remediation-incident-response-guide
- カテゴリ: Amazon GuardDuty 本番運用
- 総合ガイド: https://tomodahinata.com/blog/aws-guardduty-threat-detection-multi-account-terraform-eventbridge-guide

## 要点

- GuardDuty の finding は全件 EventBridge に流れる（source=aws.guardduty / detail-type=GuardDuty Finding）。detail.severity は数値なので {"numeric":[">=",7]} で『High 以上だけ』を機械的に拾える
- 遅延の罠：新規 finding は約5分で届くが、再発（更新）の配信頻度はデフォルト6時間。自動対応を組むなら finding_publishing_frequency を FIFTEEN_MINUTES にする
- 対応設計の鉄則は『通知は広く・封じ込めは許可リストで狭く・破壊的操作は人間承認を挟む』。EventBridge は at-least-once なので、対応は finding id を冪等キーにして取り消し可能な封じ込めから始める
- オーケストレーションは Step Functions：enrich → decide → contain → notify → ticket。破壊的ステップは task token で人間承認を待つ。EC2/IAM/S3/EKS はリソース種別ごとのプレイブックに分ける
- 本番品質は配管で決まる。DLQ＋リトライ・最小権限の責任者ロール・構造化ログ（機密値は出さない）・create-sample-findings での経路検証・Security Hub カスタムアクションで人手起動の窓口も用意する

---

「GuardDuty で検知はできました。で、検知した**あと**は誰が何をするんですか？」——これが、脅威検知の次に必ず来る、そして多くの組織で答えに詰まる問いです。

[GuardDuty で脅威検知を本番設計する](/blog/aws-guardduty-threat-detection-multi-account-terraform-eventbridge-guide)と、ダッシュボードには finding が並びます。けれど **finding は「赤いカード」であって「対応」ではありません**。深夜2時に `UnauthorizedAccess:EC2/MaliciousIPCaller.Custom` が出ても、それを人間が気づき、コンソールを開き、インスタンスを特定し、隔離 SG を作って付け替える——この間に攻撃者はデータを抜き、横展開します。**MTTR（平均対応時間）を縮める唯一の道は、検知と対応の間に「配管」を通すこと**です。

この記事は、その配管——**GuardDuty の finding を Amazon EventBridge で受け、自動インシデント対応（いわゆる SOAR: Security Orchestration, Automation and Response）に変える**ための本番設計ガイドです。ピラー記事が EventBridge の入口（severity マッチ＋単一 Lambda での隔離）までを扱ったのに対し、本記事はその**先**——**Step Functions による多段オーケストレーション、リソース種別ごとのプレイブック、人間承認ゲート、冪等性、DLQ、最小権限の責任者ロール**——を、Terraform / ASL / Python の実コードで通します。題材として、私がマルチアカウント AWS 上の[サーバーレス決済プラットフォーム](/case-studies/payment-platform-reliability)で IAM・可観測性・DR を横断実装し、**実際の金銭を扱う基盤で「二重課金0件」を冪等性で担保した**経験——その「at-least-once な世界で副作用を1回に保つ」発想は、自動対応の設計とまったく同じです——も交えます。

> **この記事のルール**：仕様・イベント構造・重大度の数値帯・配信頻度は **AWS 公式ドキュメント（2026年6月時点）** に基づきます。finding の種類・推奨対応・API は改定されるため、本番投入前に必ず公式の最新情報を確認してください。そして自動対応を組む上での鉄則——**GuardDuty は検知であって防御ではなく、自動対応は多層防御の一部にすぎません**。自動化は **①冪等（at-least-once で2回来ても1回分）・②スコープを絞る（型の許可リスト）・③取り消し可能・④破壊的操作は人間承認を挟む**——この4条件を満たすものから始めてください。誤検知一発で本番を落とすのが、自動対応最大のリスクです。

---

## 0. メンタルモデル：自動対応は「決める配管」であって「殴る自動化」ではない

設計を始める前に、一行で固定します。

> **自動インシデント対応 ＝ finding（構造化された脅威の通知）を入力に、文脈を補強（enrich）し、対応レベルを判断（decide）し、安全な封じ込め（contain）と通知（notify）とチケット化（ticket）を、冪等かつ取り消し可能に実行する「決める配管」。破壊的な手当ては人間の承認を挟む。**

ここから、設計のすべてを貫く3つの帰結が出ます。

1. **「全部に殴る」は事故。** 自動対応で最もやってはいけないのは、すべての finding にインスタンス終了や鍵削除を自動で打つことです。GuardDuty にも誤検知はあります（5章で抑える方法を扱います）。誤検知一発で本番インスタンスを落とせば、攻撃より自分の自動化のほうが高頻度で障害を起こします。だから設計原則は **「通知は広く・封じ込めは狭く・破壊は人間を挟む」**。
2. **EventBridge は at-least-once。だから冪等が前提。** EventBridge は同じ finding で対応をもう一度起動し得ます。決済基盤で二重課金を防ぐのと同じで、**対応も「2回来ても副作用は1回分」**でなければなりません。冪等キーは finding の `id`（＋ `updatedAt`）。これが本記事の背骨です。
3. **検知の速さと対応の速さは別。** GuardDuty は新規 finding を約5分で届けますが、**再発（更新）の配信はデフォルト6時間**遅れます。自動対応の反応速度を上げるには `finding_publishing_frequency = FIFTEEN_MINUTES` を設定する——これを知らないと「自動化したのに対応が遅い」罠にはまります（3章で詳述）。

この3点を押さえると、自動対応の設計は **「①どの finding を・どこに流すか（EventBridge ルーティング）→ ②どう判断し・どう手当てするか（Step Functions ＋ プレイブック）→ ③どう壊れずに回すか（冪等・DLQ・最小権限・可観測性）」** の3つに分解できると分かります。順に作ります。

---

## 1. 全体アーキテクチャ：5つの動詞（enrich → decide → contain → notify → ticket）

先に完成形の地図を出します。GuardDuty から右に向かって、イベントが流れます。

```text
GuardDuty finding
   │  (source=aws.guardduty / detail-type="GuardDuty Finding" / detail.severity=数値)
   ▼
EventBridge ルール（severity の数値マッチ + 型ルーティング + アーカイブ除外）
   ├─▶ [常に] SNS → Slack / メール       … 通知は広く（人間トリアージを速く）
   └─▶ Step Functions ステートマシン       … 対応は厳選（決める配管）
          1. ENRICH   finding を補強（タグ・所有者・関連リソース・既知IoC）
          2. DECIDE   重大度 × 型 × 許可リストで「通知のみ / 封じ込め / 要承認」を決定
          3. CONTAIN  リソース種別ごとのプレイブック Lambda（冪等・取り消し可能）
          4. APPROVE  破壊的操作だけ task token で人間承認を待つ（Slack/メール）
          5. NOTIFY+TICKET  対応結果を通知し、Jira/ServiceNow にチケット起票
          └─ 失敗時 → DLQ（SQS）＋ リトライ。取りこぼさない
```

人手起動の窓口も用意します。**Security Hub のカスタムアクション**を使うと、アナリストがコンソールで finding を選んで「Quarantine」ボタンを押す＝EventBridge イベントが飛び、同じ Step Functions を起動できます（8章）。**「自動で起きる対応」と「人が引き金を引く対応」を同じ配管に集約**するのが、運用をシンプルに保つコツです。

各構成要素と、その選定理由を表にします。

| 役割 | サービス | なぜこれか |
| --- | --- | --- |
| 検知の発生源 | GuardDuty | 全 finding が自動で EventBridge に流れる |
| ルーティング | EventBridge ルール | severity の**数値マッチ**・型ベース分岐・アーカイブ除外をサーバーレスで |
| 広い通知 | SNS（→ Slack/メール） | 配信先を後から足せる。人間トリアージの起点 |
| オーケストレーション | **Step Functions** | enrich→decide→contain の**多段・条件分岐・人間承認待ち**を可視化された状態機械で |
| 個別の手当て | Lambda（種別ごと） | EC2/IAM/S3/EKS の**プレイブックを SRP で分離**。最小権限を種別ごとに絞れる |
| 信頼性 | SQS（DLQ）＋ retry_policy | at-least-once ＋ 一過性障害に対し**取りこぼさない** |
| 人手起動 | Security Hub カスタムアクション | アナリストがコンソールから同じ配管を起動 |

> **なぜ単一の巨大 Lambda にしないか（SRP）**：「1つの Lambda で全部やる」は、IAM ロールが `ec2:*` も `iam:*` も `s3:*` も持つ巨大権限になり、最小権限を破壊します。**判断（decide）と手当て（contain）を分け、手当てをリソース種別ごとに割る**と、各 Lambda の権限を「EC2 だけ」「IAM だけ」に絞れます。万一その Lambda が侵害されても被害を局所化できる——これは最小権限の典型的な配当です。

---

## 2. EventBridge ルーティング設計：severity 数値マッチ・型分岐・アーカイブ除外

すべての GuardDuty finding は、ほぼリアルタイムで EventBridge に届きます。届くイベントの形はこうです（公式の event schema）。

```json
{
  "version": "0",
  "id": "cd2d702e-ab31-411b-9344-793ce56b1bc7",
  "detail-type": "GuardDuty Finding",
  "source": "aws.guardduty",
  "account": "111122223333",
  "region": "ap-northeast-1",
  "time": "2026-06-27T02:00:00Z",
  "detail": {
    "id": "1ab23c...",
    "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
    "severity": 7.5,
    "accountId": "111122223333",
    "region": "ap-northeast-1",
    "updatedAt": "2026-06-27T02:00:00.000Z",
    "service": { "archived": false },
    "resource": { "resourceType": "Instance", "instanceDetails": { "instanceId": "i-0abc..." } }
  }
}
```

設計上の急所は3つです。

### 2.1 severity は「数値」——だから閾値で機械的に切れる

`detail.severity` は文字列ではなく**数値**です。EventBridge の**数値マッチ**で「High 以上（≥ 7）だけ Step Functions へ」を1行で表現できます。重大度の数値帯（公式）はこうです。

| 重大度 | 数値帯 | 自動対応での扱い |
| --- | --- | --- |
| **Critical** | **9.0 – 10.0** | 攻撃シーケンス進行中の可能性。最優先で封じ込め＋即エスカレーション |
| **High** | **7.0 – 8.9** | リソースが侵害され不正利用が進行中。封じ込めを検討（許可リスト次第） |
| **Medium** | **4.0 – 6.9** | 逸脱した不審挙動。**通知して人間が調査**（自動封じ込めはしない） |
| **Low** | **1.0 – 3.9** | 侵害に至らない試行（ポートスキャン等）。記録のみ |

> **`AttackSequence:*` は必ず Critical**：[Extended Threat Detection](/blog/aws-guardduty-extended-threat-detection-attack-sequence-findings-guide) が弱いシグナルを24時間窓で相関して束ねる「攻撃シーケンス」finding は、その性質上すべて **Critical（9.0〜10.0）** に分類されます。`AttackSequence:S3/...`、`AttackSequence:EKS/CompromisedCluster` など。**自動対応では `AttackSequence:*` を最優先のトリガー**に据えるのが定石です（文脈が線で来ているので、最も対応価値が高い）。

### 2.2 アーカイブされた finding は流さない

公式の挙動として、**Suppression Rule で自動アーカイブされた finding は EventBridge に送られません**（手動アーカイブのものは、アーカイブ後の発生分が頻度に従って送られます）。とはいえ、ノイズを二重に弾く意味で、ルール側でも `detail.service.archived` が `false` のものだけ拾うフィルタを足しておくと堅牢です。

### 2.3 ルールを Terraform で：数値マッチ ＋ 型ルーティング ＋ DLQ ＋ リトライ

「High 以上（≥7）かつ未アーカイブ」を Step Functions に流し、同時に SNS で広く通知する配線です。

```hcl
# ── High 以上(severity >= 7) かつ未アーカイブの GuardDuty finding を捕捉 ──
resource "aws_cloudwatch_event_rule" "gd_high" {
  name        = "guardduty-high-to-soar"
  description = "Route GuardDuty findings (severity >= 7, not archived) to the SOAR pipeline"
  event_pattern = jsonencode({
    source        = ["aws.guardduty"]
    "detail-type" = ["GuardDuty Finding"]
    detail = {
      severity = [{ numeric = [">=", 7] }] # 数値マッチ。High(7.0-8.9) と Critical(9.0-10.0)
      service  = { archived = [false] }    # 抑制で自動アーカイブされたものは元々来ないが、二重に弾く
    }
  })
}

# ① 人間向け：必ず SNS（→ Slack/メール）へ。通知は広く。
resource "aws_cloudwatch_event_target" "to_sns" {
  rule      = aws_cloudwatch_event_rule.gd_high.name
  target_id = "notify-sns"
  arn       = aws_sns_topic.security_alerts.arn
}

# ② 機械向け：オーケストレーションする Step Functions ステートマシンへ。
resource "aws_cloudwatch_event_target" "to_sfn" {
  rule      = aws_cloudwatch_event_rule.gd_high.name
  target_id = "soar-state-machine"
  arn       = aws_sfn_state_machine.responder.arn
  role_arn  = aws_iam_role.eventbridge_invoke_sfn.arn # EventBridge が SFN を起動するためのロール

  # 一過性の失敗に備えてリトライ＆DLQ（取りこぼさない）。
  retry_policy {
    maximum_event_age_in_seconds = 3600 # 最大1時間まで再試行（古すぎる対応は意味がない）
    maximum_retry_attempts       = 4
  }
  dead_letter_config { arn = aws_sqs_queue.soar_dlq.arn }
}

# 配送に失敗したイベントを溜める DLQ。ここが空でないことを必ずアラート対象にする。
resource "aws_sqs_queue" "soar_dlq" {
  name                      = "soar-eventbridge-dlq"
  message_retention_seconds = 1209600 # 14日。調査の猶予を最大に取る
}
```

> **DLQ は「作って終わり」ではない**：DLQ にメッセージが溜まる＝**自動対応が起動できなかった finding が存在する**ということ。これは検知の取りこぼしと同義で、最も危険な沈黙です。`ApproximateNumberOfMessagesVisible > 0` を CloudWatch アラームにして、DLQ が空でないこと自体を即通知してください。信頼性の設計では「失敗を握りつぶさない」ことが本質です。

---

## 3. 遅延の罠：新規は約5分・更新はデフォルト6時間

自動対応を組むなら、これは**必読**の落とし穴です。EventBridge への配信頻度は、finding の種類で挙動が違います（公式）。

- **新規 finding（ユニークな finding ID の初回生成）**：**ほぼリアルタイム（約5分）**で配信。この頻度はデフォルトでは変更できません。
- **既存 finding の再発（subsequent occurrences）**：同一タイプの再発を一定間隔で1イベントに集約して配信。この間隔が `finding_publishing_frequency` で、**`FIFTEEN_MINUTES` / `ONE_HOUR` / `SIX_HOURS`（デフォルト）** から選べます。**デフォルトは6時間**です。

つまり——**「最初の検知は速いが、続報は最大6時間遅れる」**。攻撃が継続している finding の続報を6時間待っていては、自動対応の意味が薄れます。だから自動対応を組むなら **`FIFTEEN_MINUTES` に設定**します。

```hcl
# detector に対し、再発 finding の EventBridge 配信頻度を 15 分に。
# 注意: この頻度を変えられるのは（委任）管理者アカウントのみ。
#       メンバーアカウントは自分用に変更できず、管理者の設定が全メンバーに波及する。
resource "aws_guardduty_detector" "this" {
  enable                       = true
  finding_publishing_frequency = "FIFTEEN_MINUTES" # 自動対応の反応速度を上げる
}
```

> **マルチアカウントでの含意**：[委任管理者](/blog/aws-guardduty-multi-account-organizations-delegated-administrator-terraform-guide)アカウントでこの頻度を設定すると、**全メンバーアカウントに同じ頻度が適用**されます。メンバー側で個別に変えることはできません。全社の自動対応速度を、委任管理者の1設定で統制できる——これは利点でもあり、変更時の影響範囲が広いという注意点でもあります。

---

## 4. 対応設計の原則：通知は広く・封じ込めは許可リストで狭く・破壊は人間承認

ここが SOAR 設計の心臓です。3層に分けて、**自動化の攻撃性を「リスクに比例」させます**。

| 層 | 何を | どの finding に | 取り消し可能性 |
| --- | --- | --- | --- |
| **NOTIFY（通知）** | SNS → Slack/メールに enrich して通知 | **High 以上すべて**（広く） | — |
| **CONTAIN（封じ込め）** | 隔離 SG 付け替え・public access ブロック等 | **型の許可リスト ∩ High 以上**（狭く） | **取り消せる操作だけ** |
| **DESTROY（破壊的）** | 鍵の無効化・インスタンス終了 | 同上 ＋ **人間承認後** | 戻しにくい → だから人間を挟む |

設計の言語化：

- **通知（notify）は広く**：トリアージは人間が速くできるよう、High 以上は全部通知に流す。通知はコストが低く、取りこぼしのリスクが高いので「広く」が正解。
- **封じ込め（contain）は狭く・許可リストで**：自動で手を出すのは、**誤検知が少なく・操作が取り消せる**ものに限定します。「型の許可リスト（allowlist）」に載った finding 型だけが自動封じ込めの対象。許可リストにない型は、たとえ High でも通知＋チケット止まり。
- **破壊的操作（destroy）は人間承認を挟む**：鍵の無効化やインスタンス終了は、後で正規利用と判明したときに戻しにくい。だから**自動では「承認待ち」にして、人間がボタンを押すまで実行しない**（Step Functions の task token、6.3節）。

> **冪等性が前提条件**：上のどの操作も、EventBridge の at-least-once 配信で**2回起動され得ます**。封じ込めは「2回隔離されても状態は1回と同じ」でなければなりません。冪等キーは finding の **`id`**。さらに「同じ finding の新しい `updatedAt` か？」を見れば、古い再送を弾けます。これは決済基盤で**同じ決済イベントを2回受けても課金は1回**にする発想——`id` を冪等キーに、処理済みを記録して2回目を no-op にする——とまったく同じです。

---

## 5. リソース種別ごとのプレイブック：EC2 / IAM / S3 / EKS

封じ込めの中身は、**リソース種別で全く違います**。finding の `detail.resource.resourceType`（`Instance` / `AccessKey` / `S3Bucket` / `Kubernetes` ...）で分岐し、種別ごとのプレイブックに渡します。各プレイブックは公式の「Remediating findings」に準拠します。

### 5.1 EC2（`Instance`）→ 隔離 SG ＋ IAM ロールの遮断

公式の EC2 remediation は明快です：**専用の Isolation セキュリティグループを作り**（インバウンド・アウトバウンドとも `0.0.0.0/0 (0-65535)` を許可しない）、それをインスタンスに**関連付け**、**それ以外の SG をすべて外す**。自動対応はこれをコード化します。

- 隔離 SG（egress も絞る）を**事前に作っておき**、検知時は付け替えるだけにする（実行時に SG を作らない＝速い・冪等）。
- **既存の追跡済みコネクションは SG 変更では切れない**（公式の注記）。将来トラフィックだけがブロックされる点に注意。即時遮断が要るなら NACL も併用。
- インスタンスに**強い IAM ロールが付いている**なら、認証情報の持ち出し（`InstanceCredentialExfiltration`）に備え、ロールを「拒否のみ」のポリシーに差し替える／セッションを失効させることも検討（破壊的寄り＝承認ゲート向き）。
- **終了（terminate）は自動でしない**——調査用にスナップショットを取り、終了は人間判断に回す。

### 5.2 IAM（`AccessKey`）→ 無効化・ローテーションは承認後

公式の credentials remediation は「関与した IAM エンティティと API を特定 → 権限を確認 → 正規利用か確認 → 漏洩なら鍵をローテーション」。鍵の無効化は**戻しにくい破壊的操作**なので、自動対応では：

- **`AKIA`（長期キー）と `ASIA`（STS 一時キー）を区別**する。`ASIA` は短命で、コンソールから無効化できない（対処は権限剥奪・セッション失効側）。
- 長期キーの**無効化（`UpdateAccessKey` → `Inactive`）は人間承認後**に実行。削除（`DeleteAccessKey`）はさらに慎重に。
- 並行して、流出疑いのプリンシパルに**広範な明示 Deny を一時アタッチ**して被害を止める手もある（取り消せるので承認のハードルは下げられる）。

### 5.3 S3（`S3Bucket`）→ public access ブロック・ポリシー見直し

公式の S3 remediation は「関与したバケット・呼び出し元・API を特定 → アクセスが正規か判断 → Block Public Access 等で締める」。`ANONYMOUS_PRINCIPAL` が出たら**バケットが公開されている**サイン。

- **S3 Block Public Access を有効化**（アカウント／バケット単位の4設定）。これは比較的安全で取り消し可能なので、許可リストに載せやすい。
- 過度に緩いバケットポリシー・ACL を検出したら**通知＋チケット**（自動でポリシーを書き換えるのは誤対応リスクが高い）。
- 機密データの有無は **Macie** で評価——「何が漏れたか」をチケットに添える。

### 5.4 EKS（`Kubernetes`）→ pod の cordon / 隔離

EKS の侵害（侵害された pod・特権サービスアカウントトークン悪用）に対する封じ込めは Kubernetes 層で行います。

- 対象ノードを **cordon**（新規スケジューリング停止）し、侵害 pod に**ネットワークポリシーで隔離**（egress 遮断）を適用。
- 侵害が疑われるサービスアカウントの**トークンをローテーション**、関連する IRSA ロールを締める。
- これらは `kubectl` 相当の API 操作になるため、責任者 Lambda に**クラスタへの最小権限の RBAC**を与える設計が要る（種別ごとにロールを割る理由がここにも出る）。

> **共通原則**：どのプレイブックも「**まず取り消せる封じ込め（隔離・public access ブロック・cordon）→ 破壊的な手当て（鍵削除・終了）は承認後**」の順。封じ込めで時間を稼ぎ、破壊は人間が決める——これが「速さ」と「安全」を両立させる構造です。

---

## 6. オーケストレーション：Step Functions（ASL）＋ enrich/route Lambda（Python）

判断と手当ての多段フローは、単一 Lambda の `if` 地獄ではなく **Step Functions** で組みます。理由は明快——**状態が可視化され、人間承認の「待ち」を持て、各ステップを個別にリトライ／DLQ でき、失敗箇所が一目で分かる**から。

### 6.1 ステートマシン（ASL スケッチ）

```json
{
  "Comment": "GuardDuty finding -> automated incident response (SOAR)",
  "StartAt": "Enrich",
  "States": {
    "Enrich": {
      "Comment": "finding を補強し、対応レベル(notify/contain/approve)を決定する",
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${EnrichRouterFunctionArn}",
        "Payload.$": "$"
      },
      "ResultSelector": { "decision.$": "$.Payload.decision", "finding.$": "$.Payload.finding" },
      "Retry": [{ "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }],
      "Next": "RouteByDecision"
    },
    "RouteByDecision": {
      "Comment": "decision に応じて分岐。notify-only はそのまま通知へ",
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.decision.mode", "StringEquals": "contain", "Next": "RouteByResourceType" },
        { "Variable": "$.decision.mode", "StringEquals": "approve", "Next": "WaitForHumanApproval" }
      ],
      "Default": "NotifyAndTicket"
    },
    "RouteByResourceType": {
      "Comment": "リソース種別ごとのプレイブックへ。封じ込めは冪等・取り消し可能なものだけ",
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.decision.resourceType", "StringEquals": "Instance",   "Next": "ContainEC2" },
        { "Variable": "$.decision.resourceType", "StringEquals": "S3Bucket",   "Next": "ContainS3" },
        { "Variable": "$.decision.resourceType", "StringEquals": "Kubernetes", "Next": "ContainEKS" }
      ],
      "Default": "NotifyAndTicket"
    },
    "ContainEC2": { "Type": "Task", "Resource": "${ContainEc2FunctionArn}", "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
    "ContainS3":  { "Type": "Task", "Resource": "${ContainS3FunctionArn}",  "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
    "ContainEKS": { "Type": "Task", "Resource": "${ContainEksFunctionArn}", "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },

    "WaitForHumanApproval": {
      "Comment": "破壊的操作(鍵無効化・終了)は task token で人間の承認を待つ。承認まで実行しない",
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${RequestApprovalFunctionArn}",
        "Payload": { "finding.$": "$.finding", "taskToken.$": "$$.Task.Token" }
      },
      "TimeoutSeconds": 3600,
      "Next": "ExecuteDestructive",
      "Catch": [{ "ErrorEquals": ["States.Timeout"], "Next": "NotifyAndTicket" }]
    },
    "ExecuteDestructive": {
      "Comment": "承認済みのときだけ破壊的アクションを実行（鍵無効化等）",
      "Type": "Task",
      "Resource": "${ExecuteDestructiveFunctionArn}",
      "Next": "NotifyAndTicket"
    },

    "NotifyAndTicket": {
      "Comment": "結果を通知し、チケットを起票して終了（常にここを通る）",
      "Type": "Task",
      "Resource": "${NotifyTicketFunctionArn}",
      "End": true
    }
  }
}
```

設計判断のポイント：

- **`Catch` で必ず `NotifyAndTicket` に合流**——封じ込めが失敗しても、人間が気づける通知とチケットは必ず出す（沈黙を作らない）。
- **`waitForTaskToken`** で人間承認を「待つ」——Step Functions は task token を返すまで課金されずに待機できる。承認 Lambda が Slack/メールに承認リンクを出し、人間が押すと `SendTaskSuccess` で再開（破壊的操作だけにこのゲートを使う）。
- **`Retry` は冪等な Enrich にだけ厚く**——封じ込めタスクは Lambda 内部で冪等にする（後述）ので、ステートマシン側の二重起動も安全。

### 6.2 enrich + route ディスパッチャ（Python・冪等）

ピラー記事の「単一 Lambda で隔離」とは別物の、**判断専用**ディスパッチャです。**副作用を持たず**、finding を補強して `decision`（mode と resourceType）を返すだけ。テストしやすく、最小権限（読み取り＋ DynamoDB のみ）で済みます。

```python
"""GuardDuty finding を補強し、対応レベルを決定する enrich/route ディスパッチャ。

設計原則:
  - 副作用なし: 判断だけを返す純粋寄りの Lambda。封じ込めは別 Lambda に委譲。
  - 冪等: finding.id を冪等キーに、updatedAt で『古い再送』を弾く（DynamoDB 条件付き書き込み）。
  - スコープを絞る: 自動封じ込めは CONTAIN_ALLOWLIST に載った型 + High 以上のみ。
  - 破壊的は承認待ちへ: DESTRUCTIVE_ALLOWLIST は 'approve' を返し、自動実行しない。
  - 可観測: 構造化ログ。機密値(認証情報・PII)は出さない。
"""
from __future__ import annotations

import json
import logging
import os
import time
from typing import Any, Final, TypedDict

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ddb = boto3.client("dynamodb")
IDEMPOTENCY_TABLE: Final[str] = os.environ["IDEMPOTENCY_TABLE"]
IDEMPOTENCY_TTL_SECONDS: Final[int] = 7 * 24 * 3600  # 7日で自動失効（TTL 属性で削除）

# 自動封じ込め（冪等・取り消し可能）を許す finding 型。
CONTAIN_ALLOWLIST: Final[frozenset[str]] = frozenset({
    "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
    "CryptoCurrency:EC2/BitcoinTool.B!DNS",
    "Backdoor:EC2/C&CActivity.B!DNS",
    "Policy:S3/BucketPublicAccessGranted",
    "Policy:S3/BucketAnonymousAccessGranted",
})
# 破壊的操作（鍵無効化等）= 人間承認を挟む型。自動実行しない。
DESTRUCTIVE_ALLOWLIST: Final[frozenset[str]] = frozenset({
    "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS",
    "CredentialAccess:IAMUser/AnomalousBehavior",
})


class Decision(TypedDict):
    mode: str          # "notify" | "contain" | "approve"
    resourceType: str  # "Instance" | "S3Bucket" | "Kubernetes" | "AccessKey" | ...
    reason: str


def handler(event: dict[str, Any], _context: object) -> dict[str, Any]:
    detail = event["detail"]
    finding_id: str = detail["id"]
    updated_at: str = detail.get("updatedAt", detail.get("createdAt", ""))
    finding_type: str = detail["type"]
    severity: float = float(detail["severity"])
    resource_type: str = detail.get("resource", {}).get("resourceType", "Unknown")

    log = {"finding_id": finding_id, "type": finding_type,
           "severity": severity, "resourceType": resource_type}

    # ── 冪等ガード ──
    # 同じ finding_id を、同じ(以前の)updatedAt で2回処理しない。
    # at-least-once 配信や Step Functions の再試行で再入しても副作用は1回分。
    if not _claim_finding(finding_id, updated_at):
        logger.info(json.dumps({**log, "decision": "duplicate-skip"}))
        return {"finding": _safe_finding(detail), "decision": Decision(
            mode="notify", resourceType=resource_type, reason="duplicate (already processed)")}

    # ── 判断 ──
    if finding_type in DESTRUCTIVE_ALLOWLIST and severity >= 7.0:
        decision = Decision(mode="approve", resourceType=resource_type,
                            reason="destructive remediation requires human approval")
    elif finding_type in CONTAIN_ALLOWLIST and severity >= 7.0:
        decision = Decision(mode="contain", resourceType=resource_type,
                            reason="allowlisted reversible containment")
    else:
        # 許可リスト外・Medium 以下・型不明 → 通知＋チケットのみ。
        decision = Decision(mode="notify", resourceType=resource_type,
                            reason="notify-only (not in allowlist or below threshold)")

    logger.info(json.dumps({**log, "decision": decision["mode"], "reason": decision["reason"]}))
    return {"finding": _safe_finding(detail), "decision": decision}


def _claim_finding(finding_id: str, updated_at: str) -> bool:
    """冪等トークンを確保。新規 or より新しい updatedAt のときだけ True を返す。

    条件付き書き込みで『未処理 or 受信した updatedAt がより新しい』場合のみ上書き。
    既に同じ/より新しい版を処理済みなら False（＝重複）。
    """
    now = int(time.time())
    try:
        ddb.put_item(
            TableName=IDEMPOTENCY_TABLE,
            Item={
                "finding_id": {"S": finding_id},
                "updated_at": {"S": updated_at},
                "ttl": {"N": str(now + IDEMPOTENCY_TTL_SECONDS)},
            },
            # 未登録、または登録済みでも今回の updated_at の方が新しいときだけ書く。
            ConditionExpression="attribute_not_exists(finding_id) OR updated_at < :u",
            ExpressionAttributeValues={":u": {"S": updated_at}},
        )
        return True
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False  # 同じ/古い再送 → スキップ
        raise


def _safe_finding(detail: dict[str, Any]) -> dict[str, Any]:
    """下流に渡す finding から、機密になり得るフィールドを落とす（ログ・通知に乗せない）。"""
    keep = ("id", "type", "severity", "accountId", "region", "title", "updatedAt")
    slim = {k: detail.get(k) for k in keep}
    res = detail.get("resource", {})
    slim["resourceType"] = res.get("resourceType")
    slim["instanceId"] = res.get("instanceDetails", {}).get("instanceId")
    slim["bucketName"] = (res.get("s3BucketDetails") or [{}])[0].get("name")
    return slim
```

このコードの設計を明示します。

- **冪等は DynamoDB の条件付き書き込みで担保**：`finding_id` をキーに「未処理 or より新しい `updatedAt`」のときだけ書く。2回目以降は `ConditionalCheckFailedException` で `duplicate-skip` になり、封じ込めは走りません。TTL でレコードは自動失効。これが**決済の「同じイベントを2回処理しても1回分」**と同型の防御です。
- **判断と手当ての分離（SRP）**：この Lambda は `ec2:*` も `iam:*` も要りません。権限は **DynamoDB への put と読み取りだけ**。手当ての強い権限は、種別ごとの封じ込め Lambda にだけ与えます。
- **機密を下流に流さない（可観測性 × セキュリティ）**：`_safe_finding` で必要なフィールドだけに絞り、通知・ログに認証情報や PII が混ざらないようにします。

### 6.3 封じ込め Lambda 側も冪等に

種別ごとの封じ込め Lambda（例：EC2）は、ピラー記事の隔離ロジックと同様に**それ自体も冪等**にします（隔離タグでガードし、2回目は `already-quarantined` で no-op）。**「ディスパッチャの冪等」と「封じ込めの冪等」を二重に持つ**ことで、どこで再送・再試行が起きても安全になります。これは多層の冪等防御です。

---

## 7. 最小権限・可観測性・テスト：本番品質は配管で決まる

### 7.1 責任者ロールは「種別ごとに・最小に」

自動対応の IAM は、攻撃者にとって魅力的な標的です（侵害すれば隔離も鍵削除もできてしまう）。だから**種別ごとに分割し、各 Lambda の権限を最小に**します。EC2 封じ込め Lambda の例：

```hcl
# EC2 封じ込め Lambda の実行ロール。EC2 の隔離に必要な action だけ。
data "aws_iam_policy_document" "contain_ec2" {
  statement {
    sid       = "DescribeForQuarantine"
    actions   = ["ec2:DescribeInstances"]
    resources = ["*"] # Describe は arn 単位で絞れないため condition で補う
  }
  statement {
    sid    = "QuarantineActions"
    actions = [
      "ec2:ModifyNetworkInterfaceAttribute", # ENI を隔離SGに付け替え
      "ec2:CreateTags",                       # 冪等ガード用の隔離タグ
    ]
    resources = ["*"]
    condition {
      test     = "StringEquals"
      variable = "aws:RequestedRegion"
      values   = [var.region] # 対象リージョンに限定
    }
  }
  # 重要: terminate / delete 系は与えない。破壊的操作は別ロール＋承認後に限る。
}
```

> **EventBridge → Step Functions のロールも最小に**：EventBridge が起動できるのは**その1本のステートマシンだけ**になるよう `resources` を絞ります。「EventBridge が任意の SFN を起動できる」ロールは過剰権限です。

### 7.2 可観測性：構造化ログ・コンソール深リンク・機密ゼロ

- **構造化ログ（JSON）**で `finding_id` / `type` / `severity` / `decision` / `action` を出し、後から「どの finding にどう対応したか」を機械的に追えるようにする。
- 通知には**コンソールへの深リンク**を入れて、人間が1クリックで finding にたどり着けるようにする（`https://console.aws.amazon.com/guardduty/home?region=<region>#/findings?search=id%3D<id>`）。
- **機密値（認証情報・PII・生のリソース詳細）はログにも通知にも出さない**（6.2 の `_safe_finding`）。可観測性とセキュリティは両立させる。

### 7.3 テスト：実際の攻撃を待たずに経路を検証する

「検証経路を先に作る」原則の出番です。GuardDuty は**サンプル finding を意図的に生成**できます。これで EventBridge → Step Functions → 封じ込めの経路を、本物の攻撃前に通します。

```bash
# 各 finding 型のサンプルを発火させ、SOAR パイプラインが想定通り動くか検証する。
aws guardduty create-sample-findings \
  --detector-id "$DETECTOR_ID" \
  --finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" \
                  "Policy:S3/BucketPublicAccessGranted" \
                  "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS"
```

検証の階層：

1. **ユニットテスト**：`handler` の判断ロジックを、サンプル finding の JSON を入力に網羅。`boto3` クライアントをスタブ化し、`contain` / `approve` / `notify` / `duplicate-skip` の分岐を全部叩く（副作用がないので速い）。
2. **冪等テスト**：同じ finding を2回入力して、2回目が `duplicate-skip` になることを確認。
3. **経路テスト（E2E）**：`create-sample-findings` で実イベントを流し、Step Functions の実行履歴で各ステートが期待どおり遷移したかを確認。**封じ込め Lambda は最初 `DRY_RUN=true`** にして「判断はするが手は出さない」状態で本番トラフィックに当て、意思決定だけを先に検証してから手当てを有効化する。

> **段階導入**：`DRY_RUN` モードは「自動化を本番に入れる」最大の不安——誤対応——を抑える安全弁です。まず数週間 DRY_RUN で「もし自動化していたら、どの finding にどう対応していたか」をログで観察し、誤対応がゼロだと確認してから手当てを ON にします。これは [WAF を Count から Block に上げる](/blog/waf-defense-in-depth-aws-waf-cloud-armor-owasp-guide)のと同じ「観測してから強制」の発想です。

---

## 8. 人手起動の窓口：Security Hub カスタムアクション

完全自動化が怖い対応（鍵削除・本番終了）や、自動許可リストに載せていない型に対しては、**アナリストがコンソールから引き金を引ける窓口**を用意します。**Security Hub のカスタムアクション**がそれです。

仕組み：Security Hub にカスタムアクション（例：`Quarantine`）を作ると、アナリストがコンソールで finding を選んでそのアクションを実行したとき、**EventBridge イベントが飛びます**。形はこうです（公式）。

```json
{
  "version": "0",
  "detail-type": "Security Hub Findings - Custom Action",
  "source": "aws.securityhub",
  "account": "111122223333",
  "region": "ap-northeast-1",
  "resources": [
    "arn:aws:securityhub:ap-northeast-1:111122223333:action/custom/quarantine"
  ],
  "detail": {
    "actionName": "quarantine",
    "actionDescription": "Quarantine the affected resource",
    "findings": [ { "...": "ASFF 形式の finding 1件" } ]
  }
}
```

このイベントを拾う EventBridge ルールは、**カスタムアクションの ARN（`resources`）でマッチ**させ、同じ Step Functions を起動します。

```hcl
# Security Hub の手動カスタムアクション（"Quarantine"）を拾い、同じ SOAR 配管へ。
resource "aws_cloudwatch_event_rule" "sh_custom_action" {
  name = "securityhub-quarantine-custom-action"
  event_pattern = jsonencode({
    source        = ["aws.securityhub"]
    "detail-type" = ["Security Hub Findings - Custom Action"]
    resources     = ["arn:aws:securityhub:${var.region}:${var.account_id}:action/custom/quarantine"]
  })
}

resource "aws_cloudwatch_event_target" "sh_to_sfn" {
  rule     = aws_cloudwatch_event_rule.sh_custom_action.name
  arn      = aws_sfn_state_machine.responder.arn
  role_arn = aws_iam_role.eventbridge_invoke_sfn.arn
}
```

> **設計の含意**：これで **「自動で起きる対応（GuardDuty → EventBridge）」と「人が起こす対応（Security Hub → EventBridge）」が、同じ Step Functions に集約**されます。配管が1本なので、ロジックの重複（DRY 違反）が起きず、テストも一箇所で済みます。GuardDuty の finding を Security Hub に集約しておけば、複数の検知ソース（Macie・Inspector・サードパーティ）に対しても**同じ「Quarantine ボタン」**を提供できます。

---

## 9. まとめ：GuardDuty 自動インシデント対応チートシート

迷ったときの早見表です。

- **何を作るのか**：finding を **enrich → decide → contain → notify → ticket** に変える「決める配管」。GuardDuty は検知、対応の価値は **MTTR** で決まる。
- **EventBridge ルーティング**：`source=aws.guardduty` / `detail-type="GuardDuty Finding"`。**`detail.severity` は数値** → `{"numeric":[">=",7]}` で High 以上を機械的に拾う。`AttackSequence:*` は必ず Critical＝最優先トリガー。抑制で自動アーカイブされた finding は EventBridge に来ない。
- **遅延の罠**：新規 finding は約5分、**再発はデフォルト6時間**。自動対応には `finding_publishing_frequency = FIFTEEN_MINUTES`（委任管理者が設定＝全メンバーに波及）。
- **対応設計の原則**：**通知は広く（High 以上全部）・封じ込めは型の許可リストで狭く（取り消せる操作だけ）・破壊的操作は人間承認を挟む**。
- **冪等性**：EventBridge は **at-least-once**。finding の **`id` を冪等キー**、`updatedAt` で古い再送を弾く（条件付き書き込み）。封じ込め Lambda 側も二重に冪等化。決済の「二重課金0件」と同型。
- **プレイブック（種別ごと）**：EC2 → 隔離 SG 付け替え（既存接続は SG では切れない）。IAM AccessKey → 無効化/失効は**承認後**（`AKIA`/`ASIA` を区別）。S3 → Block Public Access。EKS → cordon＋pod 隔離。
- **オーケストレーション**：**Step Functions**。`waitForTaskToken` で人間承認を待ち、`Catch` で必ず NotifyAndTicket に合流（沈黙を作らない）。
- **本番品質**：**DLQ が空でないことをアラート**、責任者ロールは**種別ごとに最小権限**、構造化ログ＋深リンク・機密ゼロ、**`create-sample-findings` で経路検証**、`DRY_RUN` で意思決定を先に本番検証。
- **人手起動**：**Security Hub カスタムアクション**（`detail-type="Security Hub Findings - Custom Action"`）で同じ配管をアナリストが起動。

GuardDuty を「赤いダッシュボード」で終わらせるか、「検知が即・安全な対応に変わる仕組み」にできるかは、**finding を冪等で・スコープを絞った・取り消せる自動対応に変える配管**を作れるかで決まります。最大のレバレッジは検知そのものより、その先の配管にあります。

私はマルチアカウントの[サーバーレス決済プラットフォーム](/case-studies/payment-platform-reliability)で、**実際の金銭・カーボンクレジット・地域通貨を扱う基盤の IAM・可観測性・DR を横断実装**し、「正しさ」を運用の注意深さではなく**コードの構造と冪等性**で担保してきました——at-least-once な世界で**同じ決済イベントを2回受けても課金は1回**にする、その発想はそのまま自動インシデント対応に転用できます。GuardDuty の自動対応も同じ思想で設計します——**①EventBridge で severity と型を機械的に切り分け、②Step Functions で「決める配管」を可視化し、③finding id を冪等キーに、許可リストで・取り消せる・承認ゲート付きの対応**にする。検知を行動に変える配管を、運用に載る形まで作り切ります。インシデント対応そのものの運用設計（[ランブック・オンコール・ポストモーテム](/blog/incident-response-runbook-postmortem-oncall-sre-guide)）と組み合わせれば、自動対応と人間の判断がかみ合った体制になります。

**「自社の GuardDuty にどこまで自動対応を任せ、どこに人間の承認ゲートを置き、どう冪等・最小権限・取り消し可能に設計するか」——EventBridge ルーティングから Step Functions のオーケストレーション、リソース種別ごとのプレイブック、テストと段階導入まで、一人 ×生成AI（Claude Code）で速く・安全に伴走できます。** 要件の整理段階からでも、お気軽にご相談ください。

---

### 参考（公式ドキュメント）

- [Processing GuardDuty findings with Amazon EventBridge](https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings_cloudwatch.html) — finding イベントの schema（source/detail-type/detail）・配信頻度（新規は near real-time、再発は 15分/1時間/6時間デフォルト）・severity 数値マッチ・抑制アーカイブと EventBridge の関係
- [Remediating detected GuardDuty security findings](https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_remediate.html) — リソース種別ごとの推奨対応（EC2/S3/AccessKey/ECS/EKS/RDS/Lambda）
- [Remediating a potentially compromised EC2 instance](https://docs.aws.amazon.com/guardduty/latest/ug/compromised-ec2.html) — Isolation SG（0.0.0.0/0 を許可しない）・他 SG を外す・既存接続は SG では切れない
- [Remediating potentially compromised AWS credentials](https://docs.aws.amazon.com/guardduty/latest/ug/compromised-creds.html) — `AKIA`/`ASIA` の区別・権限確認・正規利用の確認・ローテーション
- [Remediating a potentially compromised S3 bucket](https://docs.aws.amazon.com/guardduty/latest/ug/compromised-s3.html) — `ANONYMOUS_PRINCIPAL`・S3 Block Public Access・バケットポリシー見直し
- [Severity levels of GuardDuty findings](https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings-severity.html) — Critical 9.0–10.0 / High 7.0–8.9 / Medium 4.0–6.9 / Low 1.0–3.9 の定義
- [Using EventBridge for automated response and remediation (Security Hub)](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-cloudwatch-events.html) — Security Hub → EventBridge・自動/カスタムアクション・Step Functions 起動
- [EventBridge event formats for Security Hub CSPM](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-cwe-event-formats.html) — `Security Hub Findings - Custom Action` の event 形式（actionName / findings / custom action ARN）
