「GuardDuty で検知はできました。で、検知したあとは誰が何をするんですか?」——これが、脅威検知の次に必ず来る、そして多くの組織で答えに詰まる問いです。
GuardDuty で脅威検知を本番設計すると、ダッシュボードには finding が並びます。けれど finding は「赤いカード」であって「対応」ではありません。深夜2時に UnauthorizedAccess:EC2/MaliciousIPCaller.Custom が出ても、それを人間が気づき、コンソールを開き、インスタンスを特定し、隔離 SG を作って付け替える——この間に攻撃者はデータを抜き、横展開します。MTTR(平均対応時間)を縮める唯一の道は、検知と対応の間に「配管」を通すことです。
この記事は、その配管——GuardDuty の finding を Amazon EventBridge で受け、自動インシデント対応(いわゆる SOAR: Security Orchestration, Automation and Response)に変えるための本番設計ガイドです。ピラー記事が EventBridge の入口(severity マッチ+単一 Lambda での隔離)までを扱ったのに対し、本記事はその先——Step Functions による多段オーケストレーション、リソース種別ごとのプレイブック、人間承認ゲート、冪等性、DLQ、最小権限の責任者ロール——を、Terraform / ASL / Python の実コードで通します。題材として、私がマルチアカウント AWS 上のサーバーレス決済プラットフォームで IAM・可観測性・DR を横断実装し、実際の金銭を扱う基盤で「二重課金0件」を冪等性で担保した経験——その「at-least-once な世界で副作用を1回に保つ」発想は、自動対応の設計とまったく同じです——も交えます。
この記事のルール:仕様・イベント構造・重大度の数値帯・配信頻度は AWS 公式ドキュメント(2026年6月時点) に基づきます。finding の種類・推奨対応・API は改定されるため、本番投入前に必ず公式の最新情報を確認してください。そして自動対応を組む上での鉄則——GuardDuty は検知であって防御ではなく、自動対応は多層防御の一部にすぎません。自動化は ①冪等(at-least-once で2回来ても1回分)・②スコープを絞る(型の許可リスト)・③取り消し可能・④破壊的操作は人間承認を挟む——この4条件を満たすものから始めてください。誤検知一発で本番を落とすのが、自動対応最大のリスクです。
0. メンタルモデル:自動対応は「決める配管」であって「殴る自動化」ではない
設計を始める前に、一行で固定します。
自動インシデント対応 = finding(構造化された脅威の通知)を入力に、文脈を補強(enrich)し、対応レベルを判断(decide)し、安全な封じ込め(contain)と通知(notify)とチケット化(ticket)を、冪等かつ取り消し可能に実行する「決める配管」。破壊的な手当ては人間の承認を挟む。
ここから、設計のすべてを貫く3つの帰結が出ます。
- 「全部に殴る」は事故。 自動対応で最もやってはいけないのは、すべての finding にインスタンス終了や鍵削除を自動で打つことです。GuardDuty にも誤検知はあります(5章で抑える方法を扱います)。誤検知一発で本番インスタンスを落とせば、攻撃より自分の自動化のほうが高頻度で障害を起こします。だから設計原則は 「通知は広く・封じ込めは狭く・破壊は人間を挟む」。
- EventBridge は at-least-once。だから冪等が前提。 EventBridge は同じ finding で対応をもう一度起動し得ます。決済基盤で二重課金を防ぐのと同じで、**対応も「2回来ても副作用は1回分」**でなければなりません。冪等キーは finding の
id(+updatedAt)。これが本記事の背骨です。 - 検知の速さと対応の速さは別。 GuardDuty は新規 finding を約5分で届けますが、再発(更新)の配信はデフォルト6時間遅れます。自動対応の反応速度を上げるには
finding_publishing_frequency = FIFTEEN_MINUTESを設定する——これを知らないと「自動化したのに対応が遅い」罠にはまります(3章で詳述)。
この3点を押さえると、自動対応の設計は 「①どの finding を・どこに流すか(EventBridge ルーティング)→ ②どう判断し・どう手当てするか(Step Functions + プレイブック)→ ③どう壊れずに回すか(冪等・DLQ・最小権限・可観測性)」 の3つに分解できると分かります。順に作ります。
1. 全体アーキテクチャ:5つの動詞(enrich → decide → contain → notify → ticket)
先に完成形の地図を出します。GuardDuty から右に向かって、イベントが流れます。
GuardDuty finding
│ (source=aws.guardduty / detail-type="GuardDuty Finding" / detail.severity=数値)
▼
EventBridge ルール(severity の数値マッチ + 型ルーティング + アーカイブ除外)
├─▶ [常に] SNS → Slack / メール … 通知は広く(人間トリアージを速く)
└─▶ Step Functions ステートマシン … 対応は厳選(決める配管)
1. ENRICH finding を補強(タグ・所有者・関連リソース・既知IoC)
2. DECIDE 重大度 × 型 × 許可リストで「通知のみ / 封じ込め / 要承認」を決定
3. CONTAIN リソース種別ごとのプレイブック Lambda(冪等・取り消し可能)
4. APPROVE 破壊的操作だけ task token で人間承認を待つ(Slack/メール)
5. NOTIFY+TICKET 対応結果を通知し、Jira/ServiceNow にチケット起票
└─ 失敗時 → DLQ(SQS)+ リトライ。取りこぼさない
人手起動の窓口も用意します。Security Hub のカスタムアクションを使うと、アナリストがコンソールで finding を選んで「Quarantine」ボタンを押す=EventBridge イベントが飛び、同じ Step Functions を起動できます(8章)。「自動で起きる対応」と「人が引き金を引く対応」を同じ配管に集約するのが、運用をシンプルに保つコツです。
各構成要素と、その選定理由を表にします。
| 役割 | サービス | なぜこれか |
|---|---|---|
| 検知の発生源 | GuardDuty | 全 finding が自動で EventBridge に流れる |
| ルーティング | EventBridge ルール | severity の数値マッチ・型ベース分岐・アーカイブ除外をサーバーレスで |
| 広い通知 | SNS(→ Slack/メール) | 配信先を後から足せる。人間トリアージの起点 |
| オーケストレーション | Step Functions | enrich→decide→contain の多段・条件分岐・人間承認待ちを可視化された状態機械で |
| 個別の手当て | Lambda(種別ごと) | EC2/IAM/S3/EKS のプレイブックを SRP で分離。最小権限を種別ごとに絞れる |
| 信頼性 | SQS(DLQ)+ retry_policy | at-least-once + 一過性障害に対し取りこぼさない |
| 人手起動 | Security Hub カスタムアクション | アナリストがコンソールから同じ配管を起動 |
なぜ単一の巨大 Lambda にしないか(SRP):「1つの Lambda で全部やる」は、IAM ロールが
ec2:*もiam:*もs3:*も持つ巨大権限になり、最小権限を破壊します。判断(decide)と手当て(contain)を分け、手当てをリソース種別ごとに割ると、各 Lambda の権限を「EC2 だけ」「IAM だけ」に絞れます。万一その Lambda が侵害されても被害を局所化できる——これは最小権限の典型的な配当です。
2. EventBridge ルーティング設計:severity 数値マッチ・型分岐・アーカイブ除外
すべての GuardDuty finding は、ほぼリアルタイムで EventBridge に届きます。届くイベントの形はこうです(公式の event schema)。
{
"version": "0",
"id": "cd2d702e-ab31-411b-9344-793ce56b1bc7",
"detail-type": "GuardDuty Finding",
"source": "aws.guardduty",
"account": "111122223333",
"region": "ap-northeast-1",
"time": "2026-06-27T02:00:00Z",
"detail": {
"id": "1ab23c...",
"type": "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
"severity": 7.5,
"accountId": "111122223333",
"region": "ap-northeast-1",
"updatedAt": "2026-06-27T02:00:00.000Z",
"service": { "archived": false },
"resource": { "resourceType": "Instance", "instanceDetails": { "instanceId": "i-0abc..." } }
}
}
設計上の急所は3つです。
2.1 severity は「数値」——だから閾値で機械的に切れる
detail.severity は文字列ではなく数値です。EventBridge の数値マッチで「High 以上(≥ 7)だけ Step Functions へ」を1行で表現できます。重大度の数値帯(公式)はこうです。
| 重大度 | 数値帯 | 自動対応での扱い |
|---|---|---|
| Critical | 9.0 – 10.0 | 攻撃シーケンス進行中の可能性。最優先で封じ込め+即エスカレーション |
| High | 7.0 – 8.9 | リソースが侵害され不正利用が進行中。封じ込めを検討(許可リスト次第) |
| Medium | 4.0 – 6.9 | 逸脱した不審挙動。通知して人間が調査(自動封じ込めはしない) |
| Low | 1.0 – 3.9 | 侵害に至らない試行(ポートスキャン等)。記録のみ |
AttackSequence:*は必ず Critical:Extended Threat Detection が弱いシグナルを24時間窓で相関して束ねる「攻撃シーケンス」finding は、その性質上すべて Critical(9.0〜10.0) に分類されます。AttackSequence:S3/...、AttackSequence:EKS/CompromisedClusterなど。自動対応ではAttackSequence:*を最優先のトリガーに据えるのが定石です(文脈が線で来ているので、最も対応価値が高い)。
2.2 アーカイブされた finding は流さない
公式の挙動として、Suppression Rule で自動アーカイブされた finding は EventBridge に送られません(手動アーカイブのものは、アーカイブ後の発生分が頻度に従って送られます)。とはいえ、ノイズを二重に弾く意味で、ルール側でも detail.service.archived が false のものだけ拾うフィルタを足しておくと堅牢です。
2.3 ルールを Terraform で:数値マッチ + 型ルーティング + DLQ + リトライ
「High 以上(≥7)かつ未アーカイブ」を Step Functions に流し、同時に SNS で広く通知する配線です。
# ── High 以上(severity >= 7) かつ未アーカイブの GuardDuty finding を捕捉 ──
resource "aws_cloudwatch_event_rule" "gd_high" {
name = "guardduty-high-to-soar"
description = "Route GuardDuty findings (severity >= 7, not archived) to the SOAR pipeline"
event_pattern = jsonencode({
source = ["aws.guardduty"]
"detail-type" = ["GuardDuty Finding"]
detail = {
severity = [{ numeric = [">=", 7] }] # 数値マッチ。High(7.0-8.9) と Critical(9.0-10.0)
service = { archived = [false] } # 抑制で自動アーカイブされたものは元々来ないが、二重に弾く
}
})
}
# ① 人間向け:必ず SNS(→ Slack/メール)へ。通知は広く。
resource "aws_cloudwatch_event_target" "to_sns" {
rule = aws_cloudwatch_event_rule.gd_high.name
target_id = "notify-sns"
arn = aws_sns_topic.security_alerts.arn
}
# ② 機械向け:オーケストレーションする Step Functions ステートマシンへ。
resource "aws_cloudwatch_event_target" "to_sfn" {
rule = aws_cloudwatch_event_rule.gd_high.name
target_id = "soar-state-machine"
arn = aws_sfn_state_machine.responder.arn
role_arn = aws_iam_role.eventbridge_invoke_sfn.arn # EventBridge が SFN を起動するためのロール
# 一過性の失敗に備えてリトライ&DLQ(取りこぼさない)。
retry_policy {
maximum_event_age_in_seconds = 3600 # 最大1時間まで再試行(古すぎる対応は意味がない)
maximum_retry_attempts = 4
}
dead_letter_config { arn = aws_sqs_queue.soar_dlq.arn }
}
# 配送に失敗したイベントを溜める DLQ。ここが空でないことを必ずアラート対象にする。
resource "aws_sqs_queue" "soar_dlq" {
name = "soar-eventbridge-dlq"
message_retention_seconds = 1209600 # 14日。調査の猶予を最大に取る
}
DLQ は「作って終わり」ではない:DLQ にメッセージが溜まる=自動対応が起動できなかった finding が存在するということ。これは検知の取りこぼしと同義で、最も危険な沈黙です。
ApproximateNumberOfMessagesVisible > 0を CloudWatch アラームにして、DLQ が空でないこと自体を即通知してください。信頼性の設計では「失敗を握りつぶさない」ことが本質です。
3. 遅延の罠:新規は約5分・更新はデフォルト6時間
自動対応を組むなら、これは必読の落とし穴です。EventBridge への配信頻度は、finding の種類で挙動が違います(公式)。
- 新規 finding(ユニークな finding ID の初回生成):**ほぼリアルタイム(約5分)**で配信。この頻度はデフォルトでは変更できません。
- 既存 finding の再発(subsequent occurrences):同一タイプの再発を一定間隔で1イベントに集約して配信。この間隔が
finding_publishing_frequencyで、FIFTEEN_MINUTES/ONE_HOUR/SIX_HOURS(デフォルト) から選べます。デフォルトは6時間です。
つまり——「最初の検知は速いが、続報は最大6時間遅れる」。攻撃が継続している finding の続報を6時間待っていては、自動対応の意味が薄れます。だから自動対応を組むなら FIFTEEN_MINUTES に設定します。
# detector に対し、再発 finding の EventBridge 配信頻度を 15 分に。
# 注意: この頻度を変えられるのは(委任)管理者アカウントのみ。
# メンバーアカウントは自分用に変更できず、管理者の設定が全メンバーに波及する。
resource "aws_guardduty_detector" "this" {
enable = true
finding_publishing_frequency = "FIFTEEN_MINUTES" # 自動対応の反応速度を上げる
}
マルチアカウントでの含意:委任管理者アカウントでこの頻度を設定すると、全メンバーアカウントに同じ頻度が適用されます。メンバー側で個別に変えることはできません。全社の自動対応速度を、委任管理者の1設定で統制できる——これは利点でもあり、変更時の影響範囲が広いという注意点でもあります。
4. 対応設計の原則:通知は広く・封じ込めは許可リストで狭く・破壊は人間承認
ここが SOAR 設計の心臓です。3層に分けて、自動化の攻撃性を「リスクに比例」させます。
| 層 | 何を | どの finding に | 取り消し可能性 |
|---|---|---|---|
| NOTIFY(通知) | SNS → Slack/メールに enrich して通知 | High 以上すべて(広く) | — |
| CONTAIN(封じ込め) | 隔離 SG 付け替え・public access ブロック等 | 型の許可リスト ∩ High 以上(狭く) | 取り消せる操作だけ |
| DESTROY(破壊的) | 鍵の無効化・インスタンス終了 | 同上 + 人間承認後 | 戻しにくい → だから人間を挟む |
設計の言語化:
- 通知(notify)は広く:トリアージは人間が速くできるよう、High 以上は全部通知に流す。通知はコストが低く、取りこぼしのリスクが高いので「広く」が正解。
- 封じ込め(contain)は狭く・許可リストで:自動で手を出すのは、誤検知が少なく・操作が取り消せるものに限定します。「型の許可リスト(allowlist)」に載った finding 型だけが自動封じ込めの対象。許可リストにない型は、たとえ High でも通知+チケット止まり。
- 破壊的操作(destroy)は人間承認を挟む:鍵の無効化やインスタンス終了は、後で正規利用と判明したときに戻しにくい。だから自動では「承認待ち」にして、人間がボタンを押すまで実行しない(Step Functions の task token、6.3節)。
冪等性が前提条件:上のどの操作も、EventBridge の at-least-once 配信で2回起動され得ます。封じ込めは「2回隔離されても状態は1回と同じ」でなければなりません。冪等キーは finding の
id。さらに「同じ finding の新しいupdatedAtか?」を見れば、古い再送を弾けます。これは決済基盤で同じ決済イベントを2回受けても課金は1回にする発想——idを冪等キーに、処理済みを記録して2回目を no-op にする——とまったく同じです。
5. リソース種別ごとのプレイブック:EC2 / IAM / S3 / EKS
封じ込めの中身は、リソース種別で全く違います。finding の detail.resource.resourceType(Instance / AccessKey / S3Bucket / Kubernetes ...)で分岐し、種別ごとのプレイブックに渡します。各プレイブックは公式の「Remediating findings」に準拠します。
5.1 EC2(Instance)→ 隔離 SG + IAM ロールの遮断
公式の EC2 remediation は明快です:専用の Isolation セキュリティグループを作り(インバウンド・アウトバウンドとも 0.0.0.0/0 (0-65535) を許可しない)、それをインスタンスに関連付け、それ以外の SG をすべて外す。自動対応はこれをコード化します。
- 隔離 SG(egress も絞る)を事前に作っておき、検知時は付け替えるだけにする(実行時に SG を作らない=速い・冪等)。
- 既存の追跡済みコネクションは SG 変更では切れない(公式の注記)。将来トラフィックだけがブロックされる点に注意。即時遮断が要るなら NACL も併用。
- インスタンスに強い IAM ロールが付いているなら、認証情報の持ち出し(
InstanceCredentialExfiltration)に備え、ロールを「拒否のみ」のポリシーに差し替える/セッションを失効させることも検討(破壊的寄り=承認ゲート向き)。 - 終了(terminate)は自動でしない——調査用にスナップショットを取り、終了は人間判断に回す。
5.2 IAM(AccessKey)→ 無効化・ローテーションは承認後
公式の credentials remediation は「関与した IAM エンティティと API を特定 → 権限を確認 → 正規利用か確認 → 漏洩なら鍵をローテーション」。鍵の無効化は戻しにくい破壊的操作なので、自動対応では:
AKIA(長期キー)とASIA(STS 一時キー)を区別する。ASIAは短命で、コンソールから無効化できない(対処は権限剥奪・セッション失効側)。- 長期キーの無効化(
UpdateAccessKey→Inactive)は人間承認後に実行。削除(DeleteAccessKey)はさらに慎重に。 - 並行して、流出疑いのプリンシパルに広範な明示 Deny を一時アタッチして被害を止める手もある(取り消せるので承認のハードルは下げられる)。
5.3 S3(S3Bucket)→ public access ブロック・ポリシー見直し
公式の S3 remediation は「関与したバケット・呼び出し元・API を特定 → アクセスが正規か判断 → Block Public Access 等で締める」。ANONYMOUS_PRINCIPAL が出たらバケットが公開されているサイン。
- S3 Block Public Access を有効化(アカウント/バケット単位の4設定)。これは比較的安全で取り消し可能なので、許可リストに載せやすい。
- 過度に緩いバケットポリシー・ACL を検出したら通知+チケット(自動でポリシーを書き換えるのは誤対応リスクが高い)。
- 機密データの有無は Macie で評価——「何が漏れたか」をチケットに添える。
5.4 EKS(Kubernetes)→ pod の cordon / 隔離
EKS の侵害(侵害された pod・特権サービスアカウントトークン悪用)に対する封じ込めは Kubernetes 層で行います。
- 対象ノードを cordon(新規スケジューリング停止)し、侵害 pod にネットワークポリシーで隔離(egress 遮断)を適用。
- 侵害が疑われるサービスアカウントのトークンをローテーション、関連する IRSA ロールを締める。
- これらは
kubectl相当の API 操作になるため、責任者 Lambda にクラスタへの最小権限の RBACを与える設計が要る(種別ごとにロールを割る理由がここにも出る)。
共通原則:どのプレイブックも「まず取り消せる封じ込め(隔離・public access ブロック・cordon)→ 破壊的な手当て(鍵削除・終了)は承認後」の順。封じ込めで時間を稼ぎ、破壊は人間が決める——これが「速さ」と「安全」を両立させる構造です。
6. オーケストレーション:Step Functions(ASL)+ enrich/route Lambda(Python)
判断と手当ての多段フローは、単一 Lambda の if 地獄ではなく Step Functions で組みます。理由は明快——状態が可視化され、人間承認の「待ち」を持て、各ステップを個別にリトライ/DLQ でき、失敗箇所が一目で分かるから。
6.1 ステートマシン(ASL スケッチ)
{
"Comment": "GuardDuty finding -> automated incident response (SOAR)",
"StartAt": "Enrich",
"States": {
"Enrich": {
"Comment": "finding を補強し、対応レベル(notify/contain/approve)を決定する",
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${EnrichRouterFunctionArn}",
"Payload.$": "$"
},
"ResultSelector": { "decision.$": "$.Payload.decision", "finding.$": "$.Payload.finding" },
"Retry": [{ "ErrorEquals": ["States.TaskFailed"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }],
"Next": "RouteByDecision"
},
"RouteByDecision": {
"Comment": "decision に応じて分岐。notify-only はそのまま通知へ",
"Type": "Choice",
"Choices": [
{ "Variable": "$.decision.mode", "StringEquals": "contain", "Next": "RouteByResourceType" },
{ "Variable": "$.decision.mode", "StringEquals": "approve", "Next": "WaitForHumanApproval" }
],
"Default": "NotifyAndTicket"
},
"RouteByResourceType": {
"Comment": "リソース種別ごとのプレイブックへ。封じ込めは冪等・取り消し可能なものだけ",
"Type": "Choice",
"Choices": [
{ "Variable": "$.decision.resourceType", "StringEquals": "Instance", "Next": "ContainEC2" },
{ "Variable": "$.decision.resourceType", "StringEquals": "S3Bucket", "Next": "ContainS3" },
{ "Variable": "$.decision.resourceType", "StringEquals": "Kubernetes", "Next": "ContainEKS" }
],
"Default": "NotifyAndTicket"
},
"ContainEC2": { "Type": "Task", "Resource": "${ContainEc2FunctionArn}", "Next": "NotifyAndTicket",
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
"ContainS3": { "Type": "Task", "Resource": "${ContainS3FunctionArn}", "Next": "NotifyAndTicket",
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
"ContainEKS": { "Type": "Task", "Resource": "${ContainEksFunctionArn}", "Next": "NotifyAndTicket",
"Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyAndTicket" }] },
"WaitForHumanApproval": {
"Comment": "破壊的操作(鍵無効化・終了)は task token で人間の承認を待つ。承認まで実行しない",
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "${RequestApprovalFunctionArn}",
"Payload": { "finding.$": "$.finding", "taskToken.$": "$$.Task.Token" }
},
"TimeoutSeconds": 3600,
"Next": "ExecuteDestructive",
"Catch": [{ "ErrorEquals": ["States.Timeout"], "Next": "NotifyAndTicket" }]
},
"ExecuteDestructive": {
"Comment": "承認済みのときだけ破壊的アクションを実行(鍵無効化等)",
"Type": "Task",
"Resource": "${ExecuteDestructiveFunctionArn}",
"Next": "NotifyAndTicket"
},
"NotifyAndTicket": {
"Comment": "結果を通知し、チケットを起票して終了(常にここを通る)",
"Type": "Task",
"Resource": "${NotifyTicketFunctionArn}",
"End": true
}
}
}
設計判断のポイント:
Catchで必ずNotifyAndTicketに合流——封じ込めが失敗しても、人間が気づける通知とチケットは必ず出す(沈黙を作らない)。waitForTaskTokenで人間承認を「待つ」——Step Functions は task token を返すまで課金されずに待機できる。承認 Lambda が Slack/メールに承認リンクを出し、人間が押すとSendTaskSuccessで再開(破壊的操作だけにこのゲートを使う)。Retryは冪等な Enrich にだけ厚く——封じ込めタスクは Lambda 内部で冪等にする(後述)ので、ステートマシン側の二重起動も安全。
6.2 enrich + route ディスパッチャ(Python・冪等)
ピラー記事の「単一 Lambda で隔離」とは別物の、判断専用ディスパッチャです。副作用を持たず、finding を補強して decision(mode と resourceType)を返すだけ。テストしやすく、最小権限(読み取り+ DynamoDB のみ)で済みます。
"""GuardDuty finding を補強し、対応レベルを決定する enrich/route ディスパッチャ。
設計原則:
- 副作用なし: 判断だけを返す純粋寄りの Lambda。封じ込めは別 Lambda に委譲。
- 冪等: finding.id を冪等キーに、updatedAt で『古い再送』を弾く(DynamoDB 条件付き書き込み)。
- スコープを絞る: 自動封じ込めは CONTAIN_ALLOWLIST に載った型 + High 以上のみ。
- 破壊的は承認待ちへ: DESTRUCTIVE_ALLOWLIST は 'approve' を返し、自動実行しない。
- 可観測: 構造化ログ。機密値(認証情報・PII)は出さない。
"""
from __future__ import annotations
import json
import logging
import os
import time
from typing import Any, Final, TypedDict
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ddb = boto3.client("dynamodb")
IDEMPOTENCY_TABLE: Final[str] = os.environ["IDEMPOTENCY_TABLE"]
IDEMPOTENCY_TTL_SECONDS: Final[int] = 7 * 24 * 3600 # 7日で自動失効(TTL 属性で削除)
# 自動封じ込め(冪等・取り消し可能)を許す finding 型。
CONTAIN_ALLOWLIST: Final[frozenset[str]] = frozenset({
"UnauthorizedAccess:EC2/MaliciousIPCaller.Custom",
"CryptoCurrency:EC2/BitcoinTool.B!DNS",
"Backdoor:EC2/C&CActivity.B!DNS",
"Policy:S3/BucketPublicAccessGranted",
"Policy:S3/BucketAnonymousAccessGranted",
})
# 破壊的操作(鍵無効化等)= 人間承認を挟む型。自動実行しない。
DESTRUCTIVE_ALLOWLIST: Final[frozenset[str]] = frozenset({
"UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS",
"CredentialAccess:IAMUser/AnomalousBehavior",
})
class Decision(TypedDict):
mode: str # "notify" | "contain" | "approve"
resourceType: str # "Instance" | "S3Bucket" | "Kubernetes" | "AccessKey" | ...
reason: str
def handler(event: dict[str, Any], _context: object) -> dict[str, Any]:
detail = event["detail"]
finding_id: str = detail["id"]
updated_at: str = detail.get("updatedAt", detail.get("createdAt", ""))
finding_type: str = detail["type"]
severity: float = float(detail["severity"])
resource_type: str = detail.get("resource", {}).get("resourceType", "Unknown")
log = {"finding_id": finding_id, "type": finding_type,
"severity": severity, "resourceType": resource_type}
# ── 冪等ガード ──
# 同じ finding_id を、同じ(以前の)updatedAt で2回処理しない。
# at-least-once 配信や Step Functions の再試行で再入しても副作用は1回分。
if not _claim_finding(finding_id, updated_at):
logger.info(json.dumps({**log, "decision": "duplicate-skip"}))
return {"finding": _safe_finding(detail), "decision": Decision(
mode="notify", resourceType=resource_type, reason="duplicate (already processed)")}
# ── 判断 ──
if finding_type in DESTRUCTIVE_ALLOWLIST and severity >= 7.0:
decision = Decision(mode="approve", resourceType=resource_type,
reason="destructive remediation requires human approval")
elif finding_type in CONTAIN_ALLOWLIST and severity >= 7.0:
decision = Decision(mode="contain", resourceType=resource_type,
reason="allowlisted reversible containment")
else:
# 許可リスト外・Medium 以下・型不明 → 通知+チケットのみ。
decision = Decision(mode="notify", resourceType=resource_type,
reason="notify-only (not in allowlist or below threshold)")
logger.info(json.dumps({**log, "decision": decision["mode"], "reason": decision["reason"]}))
return {"finding": _safe_finding(detail), "decision": decision}
def _claim_finding(finding_id: str, updated_at: str) -> bool:
"""冪等トークンを確保。新規 or より新しい updatedAt のときだけ True を返す。
条件付き書き込みで『未処理 or 受信した updatedAt がより新しい』場合のみ上書き。
既に同じ/より新しい版を処理済みなら False(=重複)。
"""
now = int(time.time())
try:
ddb.put_item(
TableName=IDEMPOTENCY_TABLE,
Item={
"finding_id": {"S": finding_id},
"updated_at": {"S": updated_at},
"ttl": {"N": str(now + IDEMPOTENCY_TTL_SECONDS)},
},
# 未登録、または登録済みでも今回の updated_at の方が新しいときだけ書く。
ConditionExpression="attribute_not_exists(finding_id) OR updated_at < :u",
ExpressionAttributeValues={":u": {"S": updated_at}},
)
return True
except ClientError as exc:
if exc.response["Error"]["Code"] == "ConditionalCheckFailedException":
return False # 同じ/古い再送 → スキップ
raise
def _safe_finding(detail: dict[str, Any]) -> dict[str, Any]:
"""下流に渡す finding から、機密になり得るフィールドを落とす(ログ・通知に乗せない)。"""
keep = ("id", "type", "severity", "accountId", "region", "title", "updatedAt")
slim = {k: detail.get(k) for k in keep}
res = detail.get("resource", {})
slim["resourceType"] = res.get("resourceType")
slim["instanceId"] = res.get("instanceDetails", {}).get("instanceId")
slim["bucketName"] = (res.get("s3BucketDetails") or [{}])[0].get("name")
return slim
このコードの設計を明示します。
- 冪等は DynamoDB の条件付き書き込みで担保:
finding_idをキーに「未処理 or より新しいupdatedAt」のときだけ書く。2回目以降はConditionalCheckFailedExceptionでduplicate-skipになり、封じ込めは走りません。TTL でレコードは自動失効。これが**決済の「同じイベントを2回処理しても1回分」**と同型の防御です。 - 判断と手当ての分離(SRP):この Lambda は
ec2:*もiam:*も要りません。権限は DynamoDB への put と読み取りだけ。手当ての強い権限は、種別ごとの封じ込め Lambda にだけ与えます。 - 機密を下流に流さない(可観測性 × セキュリティ):
_safe_findingで必要なフィールドだけに絞り、通知・ログに認証情報や PII が混ざらないようにします。
6.3 封じ込め Lambda 側も冪等に
種別ごとの封じ込め Lambda(例:EC2)は、ピラー記事の隔離ロジックと同様にそれ自体も冪等にします(隔離タグでガードし、2回目は already-quarantined で no-op)。「ディスパッチャの冪等」と「封じ込めの冪等」を二重に持つことで、どこで再送・再試行が起きても安全になります。これは多層の冪等防御です。
7. 最小権限・可観測性・テスト:本番品質は配管で決まる
7.1 責任者ロールは「種別ごとに・最小に」
自動対応の IAM は、攻撃者にとって魅力的な標的です(侵害すれば隔離も鍵削除もできてしまう)。だから種別ごとに分割し、各 Lambda の権限を最小にします。EC2 封じ込め Lambda の例:
# EC2 封じ込め Lambda の実行ロール。EC2 の隔離に必要な action だけ。
data "aws_iam_policy_document" "contain_ec2" {
statement {
sid = "DescribeForQuarantine"
actions = ["ec2:DescribeInstances"]
resources = ["*"] # Describe は arn 単位で絞れないため condition で補う
}
statement {
sid = "QuarantineActions"
actions = [
"ec2:ModifyNetworkInterfaceAttribute", # ENI を隔離SGに付け替え
"ec2:CreateTags", # 冪等ガード用の隔離タグ
]
resources = ["*"]
condition {
test = "StringEquals"
variable = "aws:RequestedRegion"
values = [var.region] # 対象リージョンに限定
}
}
# 重要: terminate / delete 系は与えない。破壊的操作は別ロール+承認後に限る。
}
EventBridge → Step Functions のロールも最小に:EventBridge が起動できるのはその1本のステートマシンだけになるよう
resourcesを絞ります。「EventBridge が任意の SFN を起動できる」ロールは過剰権限です。
7.2 可観測性:構造化ログ・コンソール深リンク・機密ゼロ
- **構造化ログ(JSON)**で
finding_id/type/severity/decision/actionを出し、後から「どの finding にどう対応したか」を機械的に追えるようにする。 - 通知にはコンソールへの深リンクを入れて、人間が1クリックで finding にたどり着けるようにする(
https://console.aws.amazon.com/guardduty/home?region=<region>#/findings?search=id%3D<id>)。 - 機密値(認証情報・PII・生のリソース詳細)はログにも通知にも出さない(6.2 の
_safe_finding)。可観測性とセキュリティは両立させる。
7.3 テスト:実際の攻撃を待たずに経路を検証する
「検証経路を先に作る」原則の出番です。GuardDuty はサンプル finding を意図的に生成できます。これで EventBridge → Step Functions → 封じ込めの経路を、本物の攻撃前に通します。
# 各 finding 型のサンプルを発火させ、SOAR パイプラインが想定通り動くか検証する。
aws guardduty create-sample-findings \
--detector-id "$DETECTOR_ID" \
--finding-types "UnauthorizedAccess:EC2/MaliciousIPCaller.Custom" \
"Policy:S3/BucketPublicAccessGranted" \
"UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.OutsideAWS"
検証の階層:
- ユニットテスト:
handlerの判断ロジックを、サンプル finding の JSON を入力に網羅。boto3クライアントをスタブ化し、contain/approve/notify/duplicate-skipの分岐を全部叩く(副作用がないので速い)。 - 冪等テスト:同じ finding を2回入力して、2回目が
duplicate-skipになることを確認。 - 経路テスト(E2E):
create-sample-findingsで実イベントを流し、Step Functions の実行履歴で各ステートが期待どおり遷移したかを確認。封じ込め Lambda は最初DRY_RUN=trueにして「判断はするが手は出さない」状態で本番トラフィックに当て、意思決定だけを先に検証してから手当てを有効化する。
段階導入:
DRY_RUNモードは「自動化を本番に入れる」最大の不安——誤対応——を抑える安全弁です。まず数週間 DRY_RUN で「もし自動化していたら、どの finding にどう対応していたか」をログで観察し、誤対応がゼロだと確認してから手当てを ON にします。これは WAF を Count から Block に上げるのと同じ「観測してから強制」の発想です。
8. 人手起動の窓口:Security Hub カスタムアクション
完全自動化が怖い対応(鍵削除・本番終了)や、自動許可リストに載せていない型に対しては、アナリストがコンソールから引き金を引ける窓口を用意します。Security Hub のカスタムアクションがそれです。
仕組み:Security Hub にカスタムアクション(例:Quarantine)を作ると、アナリストがコンソールで finding を選んでそのアクションを実行したとき、EventBridge イベントが飛びます。形はこうです(公式)。
{
"version": "0",
"detail-type": "Security Hub Findings - Custom Action",
"source": "aws.securityhub",
"account": "111122223333",
"region": "ap-northeast-1",
"resources": [
"arn:aws:securityhub:ap-northeast-1:111122223333:action/custom/quarantine"
],
"detail": {
"actionName": "quarantine",
"actionDescription": "Quarantine the affected resource",
"findings": [ { "...": "ASFF 形式の finding 1件" } ]
}
}
このイベントを拾う EventBridge ルールは、カスタムアクションの ARN(resources)でマッチさせ、同じ Step Functions を起動します。
# Security Hub の手動カスタムアクション("Quarantine")を拾い、同じ SOAR 配管へ。
resource "aws_cloudwatch_event_rule" "sh_custom_action" {
name = "securityhub-quarantine-custom-action"
event_pattern = jsonencode({
source = ["aws.securityhub"]
"detail-type" = ["Security Hub Findings - Custom Action"]
resources = ["arn:aws:securityhub:${var.region}:${var.account_id}:action/custom/quarantine"]
})
}
resource "aws_cloudwatch_event_target" "sh_to_sfn" {
rule = aws_cloudwatch_event_rule.sh_custom_action.name
arn = aws_sfn_state_machine.responder.arn
role_arn = aws_iam_role.eventbridge_invoke_sfn.arn
}
設計の含意:これで 「自動で起きる対応(GuardDuty → EventBridge)」と「人が起こす対応(Security Hub → EventBridge)」が、同じ Step Functions に集約されます。配管が1本なので、ロジックの重複(DRY 違反)が起きず、テストも一箇所で済みます。GuardDuty の finding を Security Hub に集約しておけば、複数の検知ソース(Macie・Inspector・サードパーティ)に対しても**同じ「Quarantine ボタン」**を提供できます。
9. まとめ:GuardDuty 自動インシデント対応チートシート
迷ったときの早見表です。
- 何を作るのか:finding を enrich → decide → contain → notify → ticket に変える「決める配管」。GuardDuty は検知、対応の価値は MTTR で決まる。
- EventBridge ルーティング:
source=aws.guardduty/detail-type="GuardDuty Finding"。detail.severityは数値 →{"numeric":[">=",7]}で High 以上を機械的に拾う。AttackSequence:*は必ず Critical=最優先トリガー。抑制で自動アーカイブされた finding は EventBridge に来ない。 - 遅延の罠:新規 finding は約5分、再発はデフォルト6時間。自動対応には
finding_publishing_frequency = FIFTEEN_MINUTES(委任管理者が設定=全メンバーに波及)。 - 対応設計の原則:通知は広く(High 以上全部)・封じ込めは型の許可リストで狭く(取り消せる操作だけ)・破壊的操作は人間承認を挟む。
- 冪等性:EventBridge は at-least-once。finding の
idを冪等キー、updatedAtで古い再送を弾く(条件付き書き込み)。封じ込め Lambda 側も二重に冪等化。決済の「二重課金0件」と同型。 - プレイブック(種別ごと):EC2 → 隔離 SG 付け替え(既存接続は SG では切れない)。IAM AccessKey → 無効化/失効は承認後(
AKIA/ASIAを区別)。S3 → Block Public Access。EKS → cordon+pod 隔離。 - オーケストレーション:Step Functions。
waitForTaskTokenで人間承認を待ち、Catchで必ず NotifyAndTicket に合流(沈黙を作らない)。 - 本番品質:DLQ が空でないことをアラート、責任者ロールは種別ごとに最小権限、構造化ログ+深リンク・機密ゼロ、
create-sample-findingsで経路検証、DRY_RUNで意思決定を先に本番検証。 - 人手起動:Security Hub カスタムアクション(
detail-type="Security Hub Findings - Custom Action")で同じ配管をアナリストが起動。
GuardDuty を「赤いダッシュボード」で終わらせるか、「検知が即・安全な対応に変わる仕組み」にできるかは、finding を冪等で・スコープを絞った・取り消せる自動対応に変える配管を作れるかで決まります。最大のレバレッジは検知そのものより、その先の配管にあります。
私はマルチアカウントのサーバーレス決済プラットフォームで、実際の金銭・カーボンクレジット・地域通貨を扱う基盤の IAM・可観測性・DR を横断実装し、「正しさ」を運用の注意深さではなくコードの構造と冪等性で担保してきました——at-least-once な世界で同じ決済イベントを2回受けても課金は1回にする、その発想はそのまま自動インシデント対応に転用できます。GuardDuty の自動対応も同じ思想で設計します——①EventBridge で severity と型を機械的に切り分け、②Step Functions で「決める配管」を可視化し、③finding id を冪等キーに、許可リストで・取り消せる・承認ゲート付きの対応にする。検知を行動に変える配管を、運用に載る形まで作り切ります。インシデント対応そのものの運用設計(ランブック・オンコール・ポストモーテム)と組み合わせれば、自動対応と人間の判断がかみ合った体制になります。
「自社の GuardDuty にどこまで自動対応を任せ、どこに人間の承認ゲートを置き、どう冪等・最小権限・取り消し可能に設計するか」——EventBridge ルーティングから Step Functions のオーケストレーション、リソース種別ごとのプレイブック、テストと段階導入まで、一人 ×生成AI(Claude Code)で速く・安全に伴走できます。 要件の整理段階からでも、お気軽にご相談ください。
参考(公式ドキュメント)
- Processing GuardDuty findings with Amazon EventBridge — finding イベントの schema(source/detail-type/detail)・配信頻度(新規は near real-time、再発は 15分/1時間/6時間デフォルト)・severity 数値マッチ・抑制アーカイブと EventBridge の関係
- Remediating detected GuardDuty security findings — リソース種別ごとの推奨対応(EC2/S3/AccessKey/ECS/EKS/RDS/Lambda)
- Remediating a potentially compromised EC2 instance — Isolation SG(0.0.0.0/0 を許可しない)・他 SG を外す・既存接続は SG では切れない
- Remediating potentially compromised AWS credentials —
AKIA/ASIAの区別・権限確認・正規利用の確認・ローテーション - Remediating a potentially compromised S3 bucket —
ANONYMOUS_PRINCIPAL・S3 Block Public Access・バケットポリシー見直し - Severity levels of GuardDuty findings — Critical 9.0–10.0 / High 7.0–8.9 / Medium 4.0–6.9 / Low 1.0–3.9 の定義
- Using EventBridge for automated response and remediation (Security Hub) — Security Hub → EventBridge・自動/カスタムアクション・Step Functions 起動
- EventBridge event formats for Security Hub CSPM —
Security Hub Findings - Custom Actionの event 形式(actionName / findings / custom action ARN)