メインコンテンツへスキップ
友田 陽大
Google Cloud Run 本番運用
GCP
Cloud Run
Cloud Workflows
バッチ処理
冪等性
サーバーレス
信頼性
回復性

Cloud Run Jobs と Cloud Workflows:長時間バッチ・並列処理を冪等・再開可能に設計する

HTTPに向かない処理(バッチ・長時間ジョブ・並列処理)をCloud Run JobsとCloud Workflowsで本番品質に作る実装ガイド。--tasks/--parallelismによるシャーディング、CLOUD_RUN_TASK_INDEXでの分割、決定的IDによる冪等・再開設計、Cloud Schedulerのcron実行とEventarcのイベント駆動、Workflowsの並列・リトライ・エラー処理までを、gcloud・YAML・Pythonの実コードで解説します。

公開日
読了時間
9分
著者
友田 陽大
シェア

「動画を処理する」「数万件をバッチで変換する」「LLMで一括分類する」——こうした処理を同期HTTPで抱え込むと、必ず破綻します。リクエストタイムアウト(Cloud Runは最大60分)を超え、クライアントが切れても処理は止められず、リトライで多重実行が起きる。正解は**「受付(Service)と実行(Job/Workflow)を分ける」**ことです。

私は放送事業者向けプラットフォームで、テロップ誤字検出パイプライン——動画から字幕を抽出し、OCR(画像)と音声認識(発話)を照合して誤字を検出する重い処理——を、まさにこの形で作りました。HTTPのAPIはジョブを起動して受付IDを返すだけにし、重い処理は Cloud Run Jobs + Cloud Workflows で並列実行。OCRと音声認識を並列化して 逐次18分→並列13分(約30%短縮) を実現し、長時間ジョブはセグメントIDを決定的に採番して冪等・再開可能に設計しました。

本記事は、その設計を公式ドキュメントに忠実に再現します。Servicesそのものの本番運用は Cloud Run 本番運用ガイド を参照してください。


まず分類する:Services / Jobs / Workflows / Worker Pools

処理を正しい箱に入れるだけで、設計の8割が決まります。

性質使い所
Services同期HTTPを捌くAPI・Webアプリ・Webhook
Jobs実行して完了したら止まるバッチ・DBマイグレーション・一括変換・長時間処理
Workflows複数ステップのオーケストレーションジョブ/サービス/APIを順序・並列・条件で束ねる
Worker Pools常駐バックグラウンドPub/Sub pull・Kafkaコンシューマ

「重いからJob」「複数を束ねるからWorkflow」——この一次分類を最初にやります。


Cloud Run Jobs:タスク分割で並列に捌く

ジョブは1つのジョブを最大10,000タスクに分割し、並列実行できます。主要なフラグは——

フラグ意味既定/上限
--tasksタスク数(仕事を何分割するか)既定1・最大10,000
--parallelism同時に走らせるタスク数既定は可能な限り並列
--max-retries失敗タスクの再試行回数既定3・最大10
--task-timeout1タスクの最大実行時間既定10分・最大168時間(7日)

各タスクは、自分が「全体の何番目か」を環境変数で知ります。

  • CLOUD_RUN_TASK_INDEX:このタスクの番号(0タスク数-1
  • CLOUD_RUN_TASK_COUNT:ジョブ全体のタスク数
  • CLOUD_RUN_TASK_ATTEMPT:このタスクの試行回数(リトライで増える)

タスクは終了コード0で成功。失敗すると --max-retries まで自動再試行され、あるタスクがリトライを使い切るとジョブ実行全体が失敗します。

シャーディング:INDEX で担当範囲を決める

「10,000件を100タスクで並列処理する」典型パターン。各タスクが INDEX から自分の担当スライスを決定的に計算します。

import os

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])   # 0..count-1
task_count = int(os.environ["CLOUD_RUN_TASK_COUNT"])    # 例: 100

items = load_all_item_ids()        # 全件(外部ストアから取得)
# 自分の担当だけを処理(重複なく・漏れなく全体をカバー)
my_items = items[task_index::task_count]

for item_id in my_items:
    process_one(item_id)           # ← この1件が冪等であることが命(後述)
# 100タスク・最大20並列・各タスク最大1時間でデプロイして実行
gcloud run jobs deploy batch-convert \
  --image asia-northeast1-docker.pkg.dev/PROJECT_ID/app/batch:${GIT_SHA} \
  --region asia-northeast1 \
  --tasks 100 --parallelism 20 \
  --max-retries 3 --task-timeout 3600s \
  --service-account batch@PROJECT_ID.iam.gserviceaccount.com
gcloud run jobs execute batch-convert --region asia-northeast1 --wait

冪等・再開:本番品質の核心

ジョブは必ずリトライされ得る前提で設計します(タスク失敗・SIGTERM・再実行)。だから**「同じ処理が2回走っても結果が変わらない(冪等)」**でなければ、データが壊れます。これは私が本番二重課金0件の決済基盤で徹底したのと同じ原則です。

冪等にする2つの定石:

  1. 決定的なID採番:タスク内のローカルIDを、INDEX を使ってグローバルに一意化する。並列でも再実行でもIDが安定する。
# セグメント単位に「グローバルID = INDEX × stride + local_id」で決定的に採番。
# 並列処理しても後段マージでIDが衝突せず、再実行でも同じIDに収束する。
STRIDE = 100_000
def global_id(local_id: int) -> int:
    return task_index * STRIDE + local_id
  1. 書き込みを冪等にする:出力は「あれば上書き/無視」できる形に。オブジェクト保存なら決定的なオブジェクト名へ上書き、DBならUPSERT+一意制約
# 出力先を決定的なパスにする → 再実行は同じ場所を上書きするだけ(重複生成しない)
output_path = f"results/{global_id(local_id)}.json"
storage_client.bucket("outputs").blob(output_path).upload_from_string(payload)

私のテロップ誤字検出では、セグメントごとに グローバルテロップID = segment_index × stride + local_id を決定的に採番し、並列処理・部分再実行・再試行のどれでも最終結果が一意に収束するようにしました。「丁寧にリトライ制御する」のではなく、「リトライされても壊れない」よう構造で保証する——これが回復性の本質です。冪等な非同期処理の一般論は SQS/Lambdaの冪等処理 も参考になります。


起動する:スケジュール実行とイベント駆動

Cloud Scheduler(cron)

定期バッチは Cloud Scheduler から起動します。ジョブの実行はCloud Run Admin APIの jobs:run を叩くため、**OAuth認証(--oauth-service-account-email)**を使います(サービス呼び出しのOIDCとは別)。

# 毎日午前2時にジョブを起動。SAには roles/run.invoker が必要。
gcloud scheduler jobs create http nightly-batch \
  --location asia-northeast1 \
  --schedule="0 2 * * *" \
  --uri="https://run.googleapis.com/v2/projects/PROJECT_ID/locations/asia-northeast1/jobs/batch-convert:run" \
  --http-method POST \
  --oauth-service-account-email scheduler@PROJECT_ID.iam.gserviceaccount.com

Eventarc(イベント駆動)

「ファイルがアップロードされたら処理」はEventarcで。GCSの確定イベントなどを受けてサービス/ジョブを起動します。

# GCSへのアップロード(finalized)を受けてスキャナサービスを起動
gcloud eventarc triggers create scan-on-upload \
  --location asia-northeast1 \
  --destination-run-service malware-scanner \
  --event-filters "type=google.cloud.storage.object.v1.finalized" \
  --event-filters "bucket=uploads-raw" \
  --service-account eventarc@PROJECT_ID.iam.gserviceaccount.com

私のマルウェアスキャナは、GCSアップロードをEventarcで受けてClamAV(Cloud Run)に渡し、最大10GiBをバッファせずストリーミング検査してクリーン/隔離バケットへ振り分けていました。File.move の原子性で再試行に冪等にし、ゼロ長・アップロード中・削除済みは安全に無視。イベント駆動 × 冪等の典型です。


Cloud Workflows:複数ステップを束ねる

「ジョブAの後にBを並列、結果を集約してCを呼ぶ」——こうしたオーケストレーションは Cloud Workflows が担います。YAML/JSONでステップを宣言し、ステップ実行ごとの課金(アイドル時は無料)。HTTP呼び出しとGoogle Cloudコネクタ(Cloud Run含む)に対応し、最大1年まで状態を保持・待機できます。

# workflow.yaml — OCRと音声認識を並列実行し、結果を照合する(テロップ誤字検出の骨子)
main:
  params: [input]
  steps:
    - extract_subtitles:
        call: http.post
        args:
          url: ${"https://api-xxxxx.a.run.app/extract"}
          auth: { type: OIDC }   # 認証付きCloud Runサービスを安全に呼ぶ
          body: { video: ${input.video} }
        result: frames

    # ★ OCRと音声認識は相互依存しない → 並列実行で時間短縮
    - analyze_in_parallel:
        parallel:
          shared: [ocr_result, asr_result]
          branches:
            - ocr_branch:
                steps:
                  - run_ocr:
                      call: http.post
                      args:
                        url: ${"https://ocr-xxxxx.a.run.app/run"}
                        auth: { type: OIDC }
                        body: { frames: ${frames.body} }
                      result: ocr_result
            - asr_branch:
                steps:
                  - run_asr:
                      call: http.post
                      args:
                        url: ${"https://asr-xxxxx.a.run.app/run"}
                        auth: { type: OIDC }
                        body: { video: ${input.video} }
                      result: asr_result

    - reconcile:
        call: http.post
        args:
          url: ${"https://api-xxxxx.a.run.app/reconcile"}
          auth: { type: OIDC }
          body: { ocr: ${ocr_result.body}, asr: ${asr_result.body} }
        result: report
    - done:
        return: ${report.body}

リトライとエラー処理

不安定な外部呼び出しは、指数バックオフ付きリトライを宣言で重ねます。

    - call_flaky_service:
        try:
          call: http.post
          args:
            url: ${"https://flaky-xxxxx.a.run.app/run"}
            auth: { type: OIDC }
          result: r
        retry:
          predicate: ${http.default_retry_predicate}   # 5xx等で再試行
          max_retries: 5
          backoff: { initial_delay: 1, max_delay: 60, multiplier: 2 }
        except:
          as: e
          steps:
            - handle:
                call: sys.log
                args: { text: ${"failed: " + e.message}, severity: ERROR }

並列(parallel)で速くし、try/retry/except で壊れにくくする——これがWorkflowsの2大価値です。


いつ Jobs / いつ Workflows / いつ Worker Pools

  • 単一の重い処理を並列に捌くJobs--tasks でシャーディング)。
  • 複数の処理を順序・並列・条件で束ねる/長く待つWorkflows(オーケストレーター)。
  • キューを常時pullし続けるWorker Pools(Pub/Sub pull・Kafka)。
  • 多くの本番系は組み合わせになります(私のパイプラインは「Workflowsが全体を統率し、各重い処理はJob/Service、入口はEventarc」)。

本番投入チェックリスト

  • HTTPに向かない処理を Services から Jobs/Workflows へ切り離した
  • タスクは CLOUD_RUN_TASK_INDEX で担当範囲を決定的に分割
  • 各タスクの単位処理が 冪等(決定的ID+UPSERT/上書き)
  • リトライ前提で設計(--max-retries/Workflowsのretry
  • --task-timeout を処理に合わせて設定(最大7日)
  • 起動は Cloud Scheduler(cron・OAuth)Eventarc(イベント)
  • 複数ステップは Workflowsparallelで並列・try/exceptで回復)
  • 進捗・結果を 可観測に(構造化ログ・メトリクス・必要なら進捗ストア)

まとめ:重い処理は「切り離して・冪等にして・束ねる」

Cloud Runの本番設計は、**「HTTPに向かない処理を正しい箱に入れる」**ことから始まります。重い処理は Jobs に切り離し、タスク分割で並列に捌き、決定的IDで冪等・再開可能にし、Workflows で順序・並列・回復性を束ねる。これで、動画処理も大規模バッチも、止まらず・壊れず・速く回せます。

私が放送局の制作ワークフローで「完走・再開・冪等」を作り込めたのは、この分解があったからです。全体設計は Cloud Run 本番運用ガイド、CI/CDは Cloud Run CI/CDガイド、コストは 並行性・課金ガイド へ。バッチ・非同期基盤の設計でお困りなら、実装まで伴走します。

友田

友田 陽大

経済産業大臣賞 受賞プロダクト開発者。TypeScript + Python + AWS で、SaaS・業界DX・ 実用レベルの生成AI(RAG)を、要件定義からインフラ・運用まで一人で完遂します。

この記事の実装を、案件として承ります

GCP / Cloud Run のコンテナ基盤を、設計から本番運用・コスト最適化まで

Cloud Run(サービス+ジョブ)でのコンテナ基盤構築、AWS/オンプレからの移行、CI/CD(Workload Identityで鍵レス)、Cloud Armor・最小権限の多層防御、並行性と課金モデルのコスト最適化まで。放送事業者向けプラットフォームをGCPにIaCで構築・運用した知見で、速く・安く・安全に伴走します。

プロジェクト単位(請負)・技術顧問のどちらにも対応可能です。まずは30分の無料技術相談から。

あわせて読みたい