メインコンテンツへスキップ
友田 陽大
Amazon GuardDuty 本番運用
セキュリティ
AWS
GuardDuty
EventBridge
インシデント対応

GuardDuty の finding を EventBridge で自動インシデント対応(SOAR)にする:本番設計の全体像を Terraform / Step Functions / Python で

GuardDuty の finding を EventBridge→Step Functions で自動インシデント対応(SOAR)に変える本番設計ガイド。重大度の数値マッチ・通知は広く封じ込めは許可リストで狭く・破壊は人間承認、EC2/IAM/S3/EKS プレイブックと冪等性・DLQ・最小権限を実コードで解説します。

公開日
読了時間
29分
著者
友田 陽大
シェア
目次

「GuardDuty で検知はできました。で、検知したあとは誰が何をするんですか?」——これが、脅威検知の次に必ず来る、そして多くの組織で答えに詰まる問いです。

GuardDuty で脅威検知を本番設計すると、ダッシュボードには finding が並びます。けれど finding は「赤いカード」であって「対応」ではありません。深夜2時に UnauthorizedAccess:EC2/MaliciousIPCaller.Custom が出ても、それを人間が気づき、コンソールを開き、インスタンスを特定し、隔離 SG を作って付け替える——この間に攻撃者はデータを抜き、横展開します。MTTR(平均対応時間)を縮める唯一の道は、検知と対応の間に「配管」を通すことです。

この記事は、その配管——GuardDuty の finding を Amazon EventBridge で受け、自動インシデント対応(いわゆる SOAR: Security Orchestration, Automation and Response)に変えるための本番設計ガイドです。ピラー記事が EventBridge の入口(severity マッチ+単一 Lambda での隔離)までを扱ったのに対し、本記事はその——Step Functions による多段オーケストレーション、リソース種別ごとのプレイブック、人間承認ゲート、冪等性、DLQ、最小権限の責任者ロール——を、Terraform / ASL / Python の実コードで通します。題材として、私がマルチアカウント AWS 上のサーバーレス決済プラットフォームで IAM・可観測性・DR を横断実装し、実際の金銭を扱う基盤で「二重課金0件」を冪等性で担保した経験——その「at-least-once な世界で副作用を1回に保つ」発想は、自動対応の設計とまったく同じです——も交えます。

この記事のルール:仕様・イベント構造・重大度の数値帯・配信頻度は AWS 公式ドキュメント(2026年6月時点) に基づきます。finding の種類・推奨対応・API は改定されるため、本番投入前に必ず公式の最新情報を確認してください。そして自動対応を組む上での鉄則——GuardDuty は検知であって防御ではなく、自動対応は多層防御の一部にすぎません。自動化は ①冪等(at-least-once で2回来ても1回分)・②スコープを絞る(型の許可リスト)・③取り消し可能・④破壊的操作は人間承認を挟む——この4条件を満たすものから始めてください。誤検知一発で本番を落とすのが、自動対応最大のリスクです。


0. メンタルモデル:自動対応は「決める配管」であって「殴る自動化」ではない

設計を始める前に、一行で固定します。

自動インシデント対応 = finding(構造化された脅威の通知)を入力に、文脈を補強(enrich)し、対応レベルを判断(decide)し、安全な封じ込め(contain)と通知(notify)とチケット化(ticket)を、冪等かつ取り消し可能に実行する「決める配管」。破壊的な手当ては人間の承認を挟む。

ここから、設計のすべてを貫く3つの帰結が出ます。

  1. 「全部に殴る」は事故。 自動対応で最もやってはいけないのは、すべての finding にインスタンス終了や鍵削除を自動で打つことです。GuardDuty にも誤検知はあります(5章で抑える方法を扱います)。誤検知一発で本番インスタンスを落とせば、攻撃より自分の自動化のほうが高頻度で障害を起こします。だから設計原則は 「通知は広く・封じ込めは狭く・破壊は人間を挟む」
  2. EventBridge は at-least-once。だから冪等が前提。 EventBridge は同じ finding で対応をもう一度起動し得ます。決済基盤で二重課金を防ぐのと同じで、**対応も「2回来ても副作用は1回分」**でなければなりません。冪等キーは finding の id(+ updatedAt)。これが本記事の背骨です。
  3. 検知の速さと対応の速さは別。 GuardDuty は新規 finding を約5分で届けますが、再発(更新)の配信はデフォルト6時間遅れます。自動対応の反応速度を上げるには finding_publishing_frequency = FIFTEEN_MINUTES を設定する——これを知らないと「自動化したのに対応が遅い」罠にはまります(3章で詳述)。

この3点を押さえると、自動対応の設計は 「①どの finding を・どこに流すか(EventBridge ルーティング)→ ②どう判断し・どう手当てするか(Step Functions + プレイブック)→ ③どう壊れずに回すか(冪等・DLQ・最小権限・可観測性)」 の3つに分解できると分かります。順に作ります。


1. 全体アーキテクチャ:5つの動詞(enrich → decide → contain → notify → ticket)

先に完成形の地図を出します。GuardDuty から右に向かって、イベントが流れます。

GuardDuty finding
   │  (source=aws.guardduty / detail-type="GuardDuty Finding" / detail.severity=数値)
   ▼
EventBridge ルール(severity の数値マッチ + 型ルーティング + アーカイブ除外)
   ├─▶ [常に] SNS → Slack / メール       … 通知は広く(人間トリアージを速く)
   └─▶ Step Functions ステートマシン       … 対応は厳選(決める配管)
          1. ENRICH   finding を補強(タグ・所有者・関連リソース・既知IoC)
          2. DECIDE   重大度 × 型 × 許可リストで「通知のみ / 封じ込め / 要承認」を決定
          3. CONTAIN  リソース種別ごとのプレイブック Lambda(冪等・取り消し可能)
          4. APPROVE  破壊的操作だけ task token で人間承認を待つ(Slack/メール)
          5. NOTIFY+TICKET  対応結果を通知し、Jira/ServiceNow にチケット起票
          └─ 失敗時 → DLQ(SQS)+ リトライ。取りこぼさない

人手起動の窓口も用意します。Security Hub のカスタムアクションを使うと、アナリストがコンソールで finding を選んで「Quarantine」ボタンを押す=EventBridge イベントが飛び、同じ Step Functions を起動できます(8章)。「自動で起きる対応」と「人が引き金を引く対応」を同じ配管に集約するのが、運用をシンプルに保つコツです。

各構成要素と、その選定理由を表にします。

役割サービスなぜこれか
検知の発生源GuardDuty全 finding が自動で EventBridge に流れる
ルーティングEventBridge ルールseverity の数値マッチ・型ベース分岐・アーカイブ除外をサーバーレスで
広い通知SNS(→ Slack/メール)配信先を後から足せる。人間トリアージの起点
オーケストレーションStep Functionsenrich→decide→contain の多段・条件分岐・人間承認待ちを可視化された状態機械で
個別の手当てLambda(種別ごと)EC2/IAM/S3/EKS のプレイブックを SRP で分離。最小権限を種別ごとに絞れる
信頼性SQS(DLQ)+ retry_policyat-least-once + 一過性障害に対し取りこぼさない
人手起動Security Hub カスタムアクションアナリストがコンソールから同じ配管を起動

なぜ単一の巨大 Lambda にしないか(SRP):「1つの Lambda で全部やる」は、IAM ロールが ec2:*iam:*s3:* も持つ巨大権限になり、最小権限を破壊します。判断(decide)と手当て(contain)を分け、手当てをリソース種別ごとに割ると、各 Lambda の権限を「EC2 だけ」「IAM だけ」に絞れます。万一その Lambda が侵害されても被害を局所化できる——これは最小権限の典型的な配当です。


2. EventBridge ルーティング設計:severity 数値マッチ・型分岐・アーカイブ除外

すべての GuardDuty finding は、ほぼリアルタイムで EventBridge に届きます。届くイベントの形はこうです(公式の event schema)。

{
  "version": "0",
  "id": "cd2d702e-ab31-411b-9344-793ce56b1bc7",
  "detail-type": "GuardDuty Finding",
  "source": "aws.guardduty",
  "account": "111122223333",
  "region": "ap-northeast-1",
  "time": "2026-06-27T02:00:00Z",
  "detail": {
    "id": "1ab23c...",
    "type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
    "severity": 7.5,
    "accountId": "111122223333",
    "region": "ap-northeast-1",
    "updatedAt": "2026-06-27T02:00:00.000Z",
    "service": { "archived": false },
    "resource": { "resourceType": "Instance", "instanceDetails": { "instanceId": "i-0abc..." } }
  }
}

設計上の急所は3つです。

2.1 severity は「数値」——だから閾値で機械的に切れる

detail.severity は文字列ではなく数値です。EventBridge の数値マッチで「High 以上(≥ 7)だけ Step Functions へ」を1行で表現できます。重大度の数値帯(公式)はこうです。

重大度数値帯自動対応での扱い
Critical9.0 – 10.0攻撃シーケンス進行中の可能性。最優先で封じ込め+即エスカレーション
High7.0 – 8.9リソースが侵害され不正利用が進行中。封じ込めを検討(許可リスト次第)
Medium4.0 – 6.9逸脱した不審挙動。通知して人間が調査(自動封じ込めはしない)
Low1.0 – 3.9侵害に至らない試行(ポートスキャン等)。記録のみ

AttackSequence:* は必ず CriticalExtended Threat Detection が弱いシグナルを24時間窓で相関して束ねる「攻撃シーケンス」finding は、その性質上すべて Critical(9.0〜10.0) に分類されます。AttackSequence:S3/...AttackSequence:EKS/CompromisedCluster など。自動対応では AttackSequence:* を最優先のトリガーに据えるのが定石です(文脈が線で来ているので、最も対応価値が高い)。

2.2 アーカイブされた finding は流さない

公式の挙動として、Suppression Rule で自動アーカイブされた finding は EventBridge に送られません(手動アーカイブのものは、アーカイブ後の発生分が頻度に従って送られます)。とはいえ、ノイズを二重に弾く意味で、ルール側でも detail.service.archivedfalse のものだけ拾うフィルタを足しておくと堅牢です。

2.3 ルールを Terraform で:数値マッチ + 型ルーティング + DLQ + リトライ

「High 以上(≥7)かつ未アーカイブ」を Step Functions に流し、同時に SNS で広く通知する配線です。

# ── High 以上(severity >= 7) かつ未アーカイブの GuardDuty finding を捕捉 ──
resource "aws_cloudwatch_event_rule" "gd_high" {
  name        = "guardduty-high-to-soar"
  description = "Route GuardDuty findings (severity >= 7, not archived) to the SOAR pipeline"
  event_pattern = jsonencode({
    source        = ["aws.guardduty"]
    "detail-type" = ["GuardDuty Finding"]
    detail = {
      severity = [{ numeric = [">=", 7] }] # 数値マッチ。High(7.0-8.9) と Critical(9.0-10.0)
      service  = { archived = [false] }    # 抑制で自動アーカイブされたものは元々来ないが、二重に弾く
    }
  })
}

# ① 人間向け:必ず SNS(→ Slack/メール)へ。通知は広く。
resource "aws_cloudwatch_event_target" "to_sns" {
  rule      = aws_cloudwatch_event_rule.gd_high.name
  target_id = "notify-sns"
  arn       = aws_sns_topic.security_alerts.arn
}

# ② 機械向け:オーケストレーションする Step Functions ステートマシンへ。
resource "aws_cloudwatch_event_target" "to_sfn" {
  rule      = aws_cloudwatch_event_rule.gd_high.name
  target_id = "soar-state-machine"
  arn       = aws_sfn_state_machine.responder.arn
  role_arn  = aws_iam_role.eventbridge_invoke_sfn.arn # EventBridge が SFN を起動するためのロール

  # 一過性の失敗に備えてリトライ&DLQ(取りこぼさない)。
  retry_policy {
    maximum_event_age_in_seconds = 3600 # 最大1時間まで再試行(古すぎる対応は意味がない)
    maximum_retry_attempts       = 4
  }
  dead_letter_config { arn = aws_sqs_queue.soar_dlq.arn }
}

# 配送に失敗したイベントを溜める DLQ。ここが空でないことを必ずアラート対象にする。
resource "aws_sqs_queue" "soar_dlq" {
  name                      = "soar-eventbridge-dlq"
  message_retention_seconds = 1209600 # 14日。調査の猶予を最大に取る
}

DLQ は「作って終わり」ではない:DLQ にメッセージが溜まる=自動対応が起動できなかった finding が存在するということ。これは検知の取りこぼしと同義で、最も危険な沈黙です。ApproximateNumberOfMessagesVisible > 0 を CloudWatch アラームにして、DLQ が空でないこと自体を即通知してください。信頼性の設計では「失敗を握りつぶさない」ことが本質です。


3. 遅延の罠:新規は約5分・更新はデフォルト6時間

自動対応を組むなら、これは必読の落とし穴です。EventBridge への配信頻度は、finding の種類で挙動が違います(公式)。

  • 新規 finding(ユニークな finding ID の初回生成):**ほぼリアルタイム(約5分)**で配信。この頻度はデフォルトでは変更できません。
  • 既存 finding の再発(subsequent occurrences):同一タイプの再発を一定間隔で1イベントに集約して配信。この間隔が finding_publishing_frequency で、FIFTEEN_MINUTES / ONE_HOUR / SIX_HOURS(デフォルト) から選べます。デフォルトは6時間です。

つまり——「最初の検知は速いが、続報は最大6時間遅れる」。攻撃が継続している finding の続報を6時間待っていては、自動対応の意味が薄れます。だから自動対応を組むなら FIFTEEN_MINUTES に設定します。

# detector に対し、再発 finding の EventBridge 配信頻度を 15 分に。
# 注意: この頻度を変えられるのは(委任)管理者アカウントのみ。
#       メンバーアカウントは自分用に変更できず、管理者の設定が全メンバーに波及する。
resource "aws_guardduty_detector" "this" {
  enable                       = true
  finding_publishing_frequency = "FIFTEEN_MINUTES" # 自動対応の反応速度を上げる
}

マルチアカウントでの含意委任管理者アカウントでこの頻度を設定すると、全メンバーアカウントに同じ頻度が適用されます。メンバー側で個別に変えることはできません。全社の自動対応速度を、委任管理者の1設定で統制できる——これは利点でもあり、変更時の影響範囲が広いという注意点でもあります。


4. 対応設計の原則:通知は広く・封じ込めは許可リストで狭く・破壊は人間承認

ここが SOAR 設計の心臓です。3層に分けて、自動化の攻撃性を「リスクに比例」させます

何をどの finding に取り消し可能性
NOTIFY(通知)SNS → Slack/メールに enrich して通知High 以上すべて(広く)
CONTAIN(封じ込め)隔離 SG 付け替え・public access ブロック等型の許可リスト ∩ High 以上(狭く)取り消せる操作だけ
DESTROY(破壊的)鍵の無効化・インスタンス終了同上 + 人間承認後戻しにくい → だから人間を挟む

設計の言語化:

  • 通知(notify)は広く:トリアージは人間が速くできるよう、High 以上は全部通知に流す。通知はコストが低く、取りこぼしのリスクが高いので「広く」が正解。
  • 封じ込め(contain)は狭く・許可リストで:自動で手を出すのは、誤検知が少なく・操作が取り消せるものに限定します。「型の許可リスト(allowlist)」に載った finding 型だけが自動封じ込めの対象。許可リストにない型は、たとえ High でも通知+チケット止まり。
  • 破壊的操作(destroy)は人間承認を挟む:鍵の無効化やインスタンス終了は、後で正規利用と判明したときに戻しにくい。だから自動では「承認待ち」にして、人間がボタンを押すまで実行しない(Step Functions の task token、6.3節)。

冪等性が前提条件:上のどの操作も、EventBridge の at-least-once 配信で2回起動され得ます。封じ込めは「2回隔離されても状態は1回と同じ」でなければなりません。冪等キーは finding の id。さらに「同じ finding の新しい updatedAt か?」を見れば、古い再送を弾けます。これは決済基盤で同じ決済イベントを2回受けても課金は1回にする発想——id を冪等キーに、処理済みを記録して2回目を no-op にする——とまったく同じです。


5. リソース種別ごとのプレイブック:EC2 / IAM / S3 / EKS

封じ込めの中身は、リソース種別で全く違います。finding の detail.resource.resourceTypeInstance / AccessKey / S3Bucket / Kubernetes ...)で分岐し、種別ごとのプレイブックに渡します。各プレイブックは公式の「Remediating findings」に準拠します。

5.1 EC2(Instance)→ 隔離 SG + IAM ロールの遮断

公式の EC2 remediation は明快です:専用の Isolation セキュリティグループを作り(インバウンド・アウトバウンドとも 0.0.0.0/0 (0-65535) を許可しない)、それをインスタンスに関連付けそれ以外の SG をすべて外す。自動対応はこれをコード化します。

  • 隔離 SG(egress も絞る)を事前に作っておき、検知時は付け替えるだけにする(実行時に SG を作らない=速い・冪等)。
  • 既存の追跡済みコネクションは SG 変更では切れない(公式の注記)。将来トラフィックだけがブロックされる点に注意。即時遮断が要るなら NACL も併用。
  • インスタンスに強い IAM ロールが付いているなら、認証情報の持ち出し(InstanceCredentialExfiltration)に備え、ロールを「拒否のみ」のポリシーに差し替える/セッションを失効させることも検討(破壊的寄り=承認ゲート向き)。
  • 終了(terminate)は自動でしない——調査用にスナップショットを取り、終了は人間判断に回す。

5.2 IAM(AccessKey)→ 無効化・ローテーションは承認後

公式の credentials remediation は「関与した IAM エンティティと API を特定 → 権限を確認 → 正規利用か確認 → 漏洩なら鍵をローテーション」。鍵の無効化は戻しにくい破壊的操作なので、自動対応では:

  • AKIA(長期キー)と ASIA(STS 一時キー)を区別する。ASIA は短命で、コンソールから無効化できない(対処は権限剥奪・セッション失効側)。
  • 長期キーの無効化(UpdateAccessKeyInactive)は人間承認後に実行。削除(DeleteAccessKey)はさらに慎重に。
  • 並行して、流出疑いのプリンシパルに広範な明示 Deny を一時アタッチして被害を止める手もある(取り消せるので承認のハードルは下げられる)。

5.3 S3(S3Bucket)→ public access ブロック・ポリシー見直し

公式の S3 remediation は「関与したバケット・呼び出し元・API を特定 → アクセスが正規か判断 → Block Public Access 等で締める」。ANONYMOUS_PRINCIPAL が出たらバケットが公開されているサイン。

  • S3 Block Public Access を有効化(アカウント/バケット単位の4設定)。これは比較的安全で取り消し可能なので、許可リストに載せやすい。
  • 過度に緩いバケットポリシー・ACL を検出したら通知+チケット(自動でポリシーを書き換えるのは誤対応リスクが高い)。
  • 機密データの有無は Macie で評価——「何が漏れたか」をチケットに添える。

5.4 EKS(Kubernetes)→ pod の cordon / 隔離

EKS の侵害(侵害された pod・特権サービスアカウントトークン悪用)に対する封じ込めは Kubernetes 層で行います。

  • 対象ノードを cordon(新規スケジューリング停止)し、侵害 pod にネットワークポリシーで隔離(egress 遮断)を適用。
  • 侵害が疑われるサービスアカウントのトークンをローテーション、関連する IRSA ロールを締める。
  • これらは kubectl 相当の API 操作になるため、責任者 Lambda にクラスタへの最小権限の RBACを与える設計が要る(種別ごとにロールを割る理由がここにも出る)。

共通原則:どのプレイブックも「まず取り消せる封じ込め(隔離・public access ブロック・cordon)→ 破壊的な手当て(鍵削除・終了)は承認後」の順。封じ込めで時間を稼ぎ、破壊は人間が決める——これが「速さ」と「安全」を両立させる構造です。


6. オーケストレーション:Step Functions(ASL)+ enrich/route Lambda(Python)

判断と手当ての多段フローは、単一 Lambda の if 地獄ではなく Step Functions で組みます。理由は明快——状態が可視化され、人間承認の「待ち」を持て、各ステップを個別にリトライ/DLQ でき、失敗箇所が一目で分かるから。

6.1 ステートマシン(ASL スケッチ)

{
  "Comment": "GuardDuty finding -> automated incident response (SOAR)",
  "StartAt": "Enrich",
  "States": {
    "Enrich": {
      "Comment": "finding を補強し、対応レベル(notify/contain/approve)を決定する",
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${EnrichRouterFunctionArn}",
        "Payload.$": "$"
      },
      "ResultSelector": { "decision.$": "$.Payload.decision", "finding.$": "$.Payload.finding" },
      "Retry": [{ "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }],
      "Next": "RouteByDecision"
    },
    "RouteByDecision": {
      "Comment": "decision に応じて分岐。notify-only はそのまま通知へ",
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.decision.mode", "StringEquals": "contain", "Next": "RouteByResourceType" },
        { "Variable": "$.decision.mode", "StringEquals": "approve", "Next": "WaitForHumanApproval" }
      ],
      "Default": "NotifyAndTicket"
    },
    "RouteByResourceType": {
      "Comment": "リソース種別ごとのプレイブックへ。封じ込めは冪等・取り消し可能なものだけ",
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.decision.resourceType", "StringEquals": "Instance",   "Next": "ContainEC2" },
        { "Variable": "$.decision.resourceType", "StringEquals": "S3Bucket",   "Next": "ContainS3" },
        { "Variable": "$.decision.resourceType", "StringEquals": "Kubernetes", "Next": "ContainEKS" }
      ],
      "Default": "NotifyAndTicket"
    },
    "ContainEC2": { "Type": "Task", "Resource": "${ContainEc2FunctionArn}", "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
    "ContainS3":  { "Type": "Task", "Resource": "${ContainS3FunctionArn}",  "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
    "ContainEKS": { "Type": "Task", "Resource": "${ContainEksFunctionArn}", "Next": "NotifyAndTicket",
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },

    "WaitForHumanApproval": {
      "Comment": "破壊的操作(鍵無効化・終了)は task token で人間の承認を待つ。承認まで実行しない",
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${RequestApprovalFunctionArn}",
        "Payload": { "finding.$": "$.finding", "taskToken.$": "$$.Task.Token" }
      },
      "TimeoutSeconds": 3600,
      "Next": "ExecuteDestructive",
      "Catch": [{ "ErrorEquals": ["States.Timeout"], "Next": "NotifyAndTicket" }]
    },
    "ExecuteDestructive": {
      "Comment": "承認済みのときだけ破壊的アクションを実行(鍵無効化等)",
      "Type": "Task",
      "Resource": "${ExecuteDestructiveFunctionArn}",
      "Next": "NotifyAndTicket"
    },

    "NotifyAndTicket": {
      "Comment": "結果を通知し、チケットを起票して終了(常にここを通る)",
      "Type": "Task",
      "Resource": "${NotifyTicketFunctionArn}",
      "End": true
    }
  }
}

設計判断のポイント:

  • Catch で必ず NotifyAndTicket に合流——封じ込めが失敗しても、人間が気づける通知とチケットは必ず出す(沈黙を作らない)。
  • waitForTaskToken で人間承認を「待つ」——Step Functions は task token を返すまで課金されずに待機できる。承認 Lambda が Slack/メールに承認リンクを出し、人間が押すと SendTaskSuccess で再開(破壊的操作だけにこのゲートを使う)。
  • Retry は冪等な Enrich にだけ厚く——封じ込めタスクは Lambda 内部で冪等にする(後述)ので、ステートマシン側の二重起動も安全。

6.2 enrich + route ディスパッチャ(Python・冪等)

ピラー記事の「単一 Lambda で隔離」とは別物の、判断専用ディスパッチャです。副作用を持たず、finding を補強して decision(mode と resourceType)を返すだけ。テストしやすく、最小権限(読み取り+ DynamoDB のみ)で済みます。

"""GuardDuty finding を補強し、対応レベルを決定する enrich/route ディスパッチャ。

設計原則:
  - 副作用なし: 判断だけを返す純粋寄りの Lambda。封じ込めは別 Lambda に委譲。
  - 冪等: finding.id を冪等キーに、updatedAt で『古い再送』を弾く(DynamoDB 条件付き書き込み)。
  - スコープを絞る: 自動封じ込めは CONTAIN_ALLOWLIST に載った型 + High 以上のみ。
  - 破壊的は承認待ちへ: DESTRUCTIVE_ALLOWLIST は 'approve' を返し、自動実行しない。
  - 可観測: 構造化ログ。機密値(認証情報・PII)は出さない。
"""
from __future__ import annotations

import json
import logging
import os
import time
from typing import Any, Final, TypedDict

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ddb = boto3.client("dynamodb")
IDEMPOTENCY_TABLE: Final[str] = os.environ["IDEMPOTENCY_TABLE"]
IDEMPOTENCY_TTL_SECONDS: Final[int] = 7 * 24 * 3600  # 7日で自動失効(TTL 属性で削除)

# 自動封じ込め(冪等・取り消し可能)を許す finding 型。
CONTAIN_ALLOWLIST: Final[frozenset[str]] = frozenset({
    "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
    "CryptoCurrency:EC2/BitcoinTool.B!DNS",
    "Backdoor:EC2/C&CActivity.B!DNS",
    "Policy:S3/BucketPublicAccessGranted",
    "Policy:S3/BucketAnonymousAccessGranted",
})
# 破壊的操作(鍵無効化等)= 人間承認を挟む型。自動実行しない。
DESTRUCTIVE_ALLOWLIST: Final[frozenset[str]] = frozenset({
    "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS",
    "CredentialAccess:IAMUser/AnomalousBehavior",
})


class Decision(TypedDict):
    mode: str          # "notify" | "contain" | "approve"
    resourceType: str  # "Instance" | "S3Bucket" | "Kubernetes" | "AccessKey" | ...
    reason: str


def handler(event: dict[str, Any], _context: object) -> dict[str, Any]:
    detail = event["detail"]
    finding_id: str = detail["id"]
    updated_at: str = detail.get("updatedAt", detail.get("createdAt", ""))
    finding_type: str = detail["type"]
    severity: float = float(detail["severity"])
    resource_type: str = detail.get("resource", {}).get("resourceType", "Unknown")

    log = {"finding_id": finding_id, "type": finding_type,
           "severity": severity, "resourceType": resource_type}

    # ── 冪等ガード ──
    # 同じ finding_id を、同じ(以前の)updatedAt で2回処理しない。
    # at-least-once 配信や Step Functions の再試行で再入しても副作用は1回分。
    if not _claim_finding(finding_id, updated_at):
        logger.info(json.dumps({**log, "decision": "duplicate-skip"}))
        return {"finding": _safe_finding(detail), "decision": Decision(
            mode="notify", resourceType=resource_type, reason="duplicate (already processed)")}

    # ── 判断 ──
    if finding_type in DESTRUCTIVE_ALLOWLIST and severity >= 7.0:
        decision = Decision(mode="approve", resourceType=resource_type,
                            reason="destructive remediation requires human approval")
    elif finding_type in CONTAIN_ALLOWLIST and severity >= 7.0:
        decision = Decision(mode="contain", resourceType=resource_type,
                            reason="allowlisted reversible containment")
    else:
        # 許可リスト外・Medium 以下・型不明 → 通知+チケットのみ。
        decision = Decision(mode="notify", resourceType=resource_type,
                            reason="notify-only (not in allowlist or below threshold)")

    logger.info(json.dumps({**log, "decision": decision["mode"], "reason": decision["reason"]}))
    return {"finding": _safe_finding(detail), "decision": decision}


def _claim_finding(finding_id: str, updated_at: str) -> bool:
    """冪等トークンを確保。新規 or より新しい updatedAt のときだけ True を返す。

    条件付き書き込みで『未処理 or 受信した updatedAt がより新しい』場合のみ上書き。
    既に同じ/より新しい版を処理済みなら False(=重複)。
    """
    now = int(time.time())
    try:
        ddb.put_item(
            TableName=IDEMPOTENCY_TABLE,
            Item={
                "finding_id": {"S": finding_id},
                "updated_at": {"S": updated_at},
                "ttl": {"N": str(now + IDEMPOTENCY_TTL_SECONDS)},
            },
            # 未登録、または登録済みでも今回の updated_at の方が新しいときだけ書く。
            ConditionExpression="attribute_not_exists(finding_id) OR updated_at < :u",
            ExpressionAttributeValues={":u": {"S": updated_at}},
        )
        return True
    except ClientError as exc:
        if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False  # 同じ/古い再送 → スキップ
        raise


def _safe_finding(detail: dict[str, Any]) -> dict[str, Any]:
    """下流に渡す finding から、機密になり得るフィールドを落とす(ログ・通知に乗せない)。"""
    keep = ("id", "type", "severity", "accountId", "region", "title", "updatedAt")
    slim = {k: detail.get(k) for k in keep}
    res = detail.get("resource", {})
    slim["resourceType"] = res.get("resourceType")
    slim["instanceId"] = res.get("instanceDetails", {}).get("instanceId")
    slim["bucketName"] = (res.get("s3BucketDetails") or [{}])[0].get("name")
    return slim

このコードの設計を明示します。

  • 冪等は DynamoDB の条件付き書き込みで担保finding_id をキーに「未処理 or より新しい updatedAt」のときだけ書く。2回目以降は ConditionalCheckFailedExceptionduplicate-skip になり、封じ込めは走りません。TTL でレコードは自動失効。これが**決済の「同じイベントを2回処理しても1回分」**と同型の防御です。
  • 判断と手当ての分離(SRP):この Lambda は ec2:*iam:* も要りません。権限は DynamoDB への put と読み取りだけ。手当ての強い権限は、種別ごとの封じ込め Lambda にだけ与えます。
  • 機密を下流に流さない(可観測性 × セキュリティ)_safe_finding で必要なフィールドだけに絞り、通知・ログに認証情報や PII が混ざらないようにします。

6.3 封じ込め Lambda 側も冪等に

種別ごとの封じ込め Lambda(例:EC2)は、ピラー記事の隔離ロジックと同様にそれ自体も冪等にします(隔離タグでガードし、2回目は already-quarantined で no-op)。「ディスパッチャの冪等」と「封じ込めの冪等」を二重に持つことで、どこで再送・再試行が起きても安全になります。これは多層の冪等防御です。


7. 最小権限・可観測性・テスト:本番品質は配管で決まる

7.1 責任者ロールは「種別ごとに・最小に」

自動対応の IAM は、攻撃者にとって魅力的な標的です(侵害すれば隔離も鍵削除もできてしまう)。だから種別ごとに分割し、各 Lambda の権限を最小にします。EC2 封じ込め Lambda の例:

# EC2 封じ込め Lambda の実行ロール。EC2 の隔離に必要な action だけ。
data "aws_iam_policy_document" "contain_ec2" {
  statement {
    sid       = "DescribeForQuarantine"
    actions   = ["ec2:DescribeInstances"]
    resources = ["*"] # Describe は arn 単位で絞れないため condition で補う
  }
  statement {
    sid    = "QuarantineActions"
    actions = [
      "ec2:ModifyNetworkInterfaceAttribute", # ENI を隔離SGに付け替え
      "ec2:CreateTags",                       # 冪等ガード用の隔離タグ
    ]
    resources = ["*"]
    condition {
      test     = "StringEquals"
      variable = "aws:RequestedRegion"
      values   = [var.region] # 対象リージョンに限定
    }
  }
  # 重要: terminate / delete 系は与えない。破壊的操作は別ロール+承認後に限る。
}

EventBridge → Step Functions のロールも最小に:EventBridge が起動できるのはその1本のステートマシンだけになるよう resources を絞ります。「EventBridge が任意の SFN を起動できる」ロールは過剰権限です。

7.2 可観測性:構造化ログ・コンソール深リンク・機密ゼロ

  • **構造化ログ(JSON)**で finding_id / type / severity / decision / action を出し、後から「どの finding にどう対応したか」を機械的に追えるようにする。
  • 通知にはコンソールへの深リンクを入れて、人間が1クリックで finding にたどり着けるようにする(https://console.aws.amazon.com/guardduty/home?region=<region>#/findings?search=id%3D<id>)。
  • 機密値(認証情報・PII・生のリソース詳細)はログにも通知にも出さない(6.2 の _safe_finding)。可観測性とセキュリティは両立させる。

7.3 テスト:実際の攻撃を待たずに経路を検証する

「検証経路を先に作る」原則の出番です。GuardDuty はサンプル finding を意図的に生成できます。これで EventBridge → Step Functions → 封じ込めの経路を、本物の攻撃前に通します。

# 各 finding 型のサンプルを発火させ、SOAR パイプラインが想定通り動くか検証する。
aws guardduty create-sample-findings \
  --detector-id "$DETECTOR_ID" \
  --finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" \
                  "Policy:S3/BucketPublicAccessGranted" \
                  "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS"

検証の階層:

  1. ユニットテストhandler の判断ロジックを、サンプル finding の JSON を入力に網羅。boto3 クライアントをスタブ化し、contain / approve / notify / duplicate-skip の分岐を全部叩く(副作用がないので速い)。
  2. 冪等テスト:同じ finding を2回入力して、2回目が duplicate-skip になることを確認。
  3. 経路テスト(E2E)create-sample-findings で実イベントを流し、Step Functions の実行履歴で各ステートが期待どおり遷移したかを確認。封じ込め Lambda は最初 DRY_RUN=true にして「判断はするが手は出さない」状態で本番トラフィックに当て、意思決定だけを先に検証してから手当てを有効化する。

段階導入DRY_RUN モードは「自動化を本番に入れる」最大の不安——誤対応——を抑える安全弁です。まず数週間 DRY_RUN で「もし自動化していたら、どの finding にどう対応していたか」をログで観察し、誤対応がゼロだと確認してから手当てを ON にします。これは WAF を Count から Block に上げるのと同じ「観測してから強制」の発想です。


8. 人手起動の窓口:Security Hub カスタムアクション

完全自動化が怖い対応(鍵削除・本番終了)や、自動許可リストに載せていない型に対しては、アナリストがコンソールから引き金を引ける窓口を用意します。Security Hub のカスタムアクションがそれです。

仕組み:Security Hub にカスタムアクション(例:Quarantine)を作ると、アナリストがコンソールで finding を選んでそのアクションを実行したとき、EventBridge イベントが飛びます。形はこうです(公式)。

{
  "version": "0",
  "detail-type": "Security Hub Findings - Custom Action",
  "source": "aws.securityhub",
  "account": "111122223333",
  "region": "ap-northeast-1",
  "resources": [
    "arn:aws:securityhub:ap-northeast-1:111122223333:action/custom/quarantine"
  ],
  "detail": {
    "actionName": "quarantine",
    "actionDescription": "Quarantine the affected resource",
    "findings": [ { "...": "ASFF 形式の finding 1件" } ]
  }
}

このイベントを拾う EventBridge ルールは、カスタムアクションの ARN(resources)でマッチさせ、同じ Step Functions を起動します。

# Security Hub の手動カスタムアクション("Quarantine")を拾い、同じ SOAR 配管へ。
resource "aws_cloudwatch_event_rule" "sh_custom_action" {
  name = "securityhub-quarantine-custom-action"
  event_pattern = jsonencode({
    source        = ["aws.securityhub"]
    "detail-type" = ["Security Hub Findings - Custom Action"]
    resources     = ["arn:aws:securityhub:${var.region}:${var.account_id}:action/custom/quarantine"]
  })
}

resource "aws_cloudwatch_event_target" "sh_to_sfn" {
  rule     = aws_cloudwatch_event_rule.sh_custom_action.name
  arn      = aws_sfn_state_machine.responder.arn
  role_arn = aws_iam_role.eventbridge_invoke_sfn.arn
}

設計の含意:これで 「自動で起きる対応(GuardDuty → EventBridge)」と「人が起こす対応(Security Hub → EventBridge)」が、同じ Step Functions に集約されます。配管が1本なので、ロジックの重複(DRY 違反)が起きず、テストも一箇所で済みます。GuardDuty の finding を Security Hub に集約しておけば、複数の検知ソース(Macie・Inspector・サードパーティ)に対しても**同じ「Quarantine ボタン」**を提供できます。


9. まとめ:GuardDuty 自動インシデント対応チートシート

迷ったときの早見表です。

  • 何を作るのか:finding を enrich → decide → contain → notify → ticket に変える「決める配管」。GuardDuty は検知、対応の価値は MTTR で決まる。
  • EventBridge ルーティングsource=aws.guardduty / detail-type="GuardDuty Finding"detail.severity は数値{"numeric":[">=",7]} で High 以上を機械的に拾う。AttackSequence:* は必ず Critical=最優先トリガー。抑制で自動アーカイブされた finding は EventBridge に来ない。
  • 遅延の罠:新規 finding は約5分、再発はデフォルト6時間。自動対応には finding_publishing_frequency = FIFTEEN_MINUTES(委任管理者が設定=全メンバーに波及)。
  • 対応設計の原則通知は広く(High 以上全部)・封じ込めは型の許可リストで狭く(取り消せる操作だけ)・破壊的操作は人間承認を挟む
  • 冪等性:EventBridge は at-least-once。finding の id を冪等キーupdatedAt で古い再送を弾く(条件付き書き込み)。封じ込め Lambda 側も二重に冪等化。決済の「二重課金0件」と同型。
  • プレイブック(種別ごと):EC2 → 隔離 SG 付け替え(既存接続は SG では切れない)。IAM AccessKey → 無効化/失効は承認後AKIA/ASIA を区別)。S3 → Block Public Access。EKS → cordon+pod 隔離。
  • オーケストレーションStep FunctionswaitForTaskToken で人間承認を待ち、Catch で必ず NotifyAndTicket に合流(沈黙を作らない)。
  • 本番品質DLQ が空でないことをアラート、責任者ロールは種別ごとに最小権限、構造化ログ+深リンク・機密ゼロ、create-sample-findings で経路検証DRY_RUN で意思決定を先に本番検証。
  • 人手起動Security Hub カスタムアクションdetail-type="Security Hub Findings - Custom Action")で同じ配管をアナリストが起動。

GuardDuty を「赤いダッシュボード」で終わらせるか、「検知が即・安全な対応に変わる仕組み」にできるかは、finding を冪等で・スコープを絞った・取り消せる自動対応に変える配管を作れるかで決まります。最大のレバレッジは検知そのものより、その先の配管にあります。

私はマルチアカウントのサーバーレス決済プラットフォームで、実際の金銭・カーボンクレジット・地域通貨を扱う基盤の IAM・可観測性・DR を横断実装し、「正しさ」を運用の注意深さではなくコードの構造と冪等性で担保してきました——at-least-once な世界で同じ決済イベントを2回受けても課金は1回にする、その発想はそのまま自動インシデント対応に転用できます。GuardDuty の自動対応も同じ思想で設計します——①EventBridge で severity と型を機械的に切り分け、②Step Functions で「決める配管」を可視化し、③finding id を冪等キーに、許可リストで・取り消せる・承認ゲート付きの対応にする。検知を行動に変える配管を、運用に載る形まで作り切ります。インシデント対応そのものの運用設計(ランブック・オンコール・ポストモーテム)と組み合わせれば、自動対応と人間の判断がかみ合った体制になります。

「自社の GuardDuty にどこまで自動対応を任せ、どこに人間の承認ゲートを置き、どう冪等・最小権限・取り消し可能に設計するか」——EventBridge ルーティングから Step Functions のオーケストレーション、リソース種別ごとのプレイブック、テストと段階導入まで、一人 ×生成AI(Claude Code)で速く・安全に伴走できます。 要件の整理段階からでも、お気軽にご相談ください。


参考(公式ドキュメント)

友田

友田 陽大

経済産業大臣賞 受賞プロダクト開発者。TypeScript + Python + AWS で、SaaS・業界DX・ 実用レベルの生成AI(RAG)を、要件定義からインフラ・運用まで一人で完遂します。

この記事の実装を、案件として承ります

GuardDuty の finding を、自動インシデント対応(SOAR)に変える

finding を EventBridge で受け、『通知は広く・封じ込めは許可リストで狭く・破壊は人間承認』の冪等な自動対応へ。Step Functions / Lambda で MTTR を縮める対応基盤を設計・実装します。

プロジェクト単位(請負)・技術顧問のどちらにも対応可能です。まずは30分の無料技術相談から。