「動画を処理する」「数万件をバッチで変換する」「LLMで一括分類する」——こうした処理を同期HTTPで抱え込むと、必ず破綻します。リクエストタイムアウト(Cloud Runは最大60分)を超え、クライアントが切れても処理は止められず、リトライで多重実行が起きる。正解は**「受付(Service)と実行(Job/Workflow)を分ける」**ことです。
私は放送事業者向けプラットフォームで、テロップ誤字検出パイプライン——動画から字幕を抽出し、OCR(画像)と音声認識(発話)を照合して誤字を検出する重い処理——を、まさにこの形で作りました。HTTPのAPIはジョブを起動して受付IDを返すだけにし、重い処理は Cloud Run Jobs + Cloud Workflows で並列実行。OCRと音声認識を並列化して 逐次18分→並列13分(約30%短縮) を実現し、長時間ジョブはセグメントIDを決定的に採番して冪等・再開可能に設計しました。
本記事は、その設計を公式ドキュメントに忠実に再現します。Servicesそのものの本番運用は Cloud Run 本番運用ガイド を参照してください。
まず分類する:Services / Jobs / Workflows / Worker Pools
処理を正しい箱に入れるだけで、設計の8割が決まります。
| 箱 | 性質 | 使い所 |
|---|---|---|
| Services | 同期HTTPを捌く | API・Webアプリ・Webhook |
| Jobs | 実行して完了したら止まる | バッチ・DBマイグレーション・一括変換・長時間処理 |
| Workflows | 複数ステップのオーケストレーション | ジョブ/サービス/APIを順序・並列・条件で束ねる |
| Worker Pools | 常駐バックグラウンド | Pub/Sub pull・Kafkaコンシューマ |
「重いからJob」「複数を束ねるからWorkflow」——この一次分類を最初にやります。
Cloud Run Jobs:タスク分割で並列に捌く
ジョブは1つのジョブを最大10,000タスクに分割し、並列実行できます。主要なフラグは——
| フラグ | 意味 | 既定/上限 |
|---|---|---|
--tasks | タスク数(仕事を何分割するか) | 既定1・最大10,000 |
--parallelism | 同時に走らせるタスク数 | 既定は可能な限り並列 |
--max-retries | 失敗タスクの再試行回数 | 既定3・最大10 |
--task-timeout | 1タスクの最大実行時間 | 既定10分・最大168時間(7日) |
各タスクは、自分が「全体の何番目か」を環境変数で知ります。
CLOUD_RUN_TASK_INDEX:このタスクの番号(0〜タスク数-1)CLOUD_RUN_TASK_COUNT:ジョブ全体のタスク数CLOUD_RUN_TASK_ATTEMPT:このタスクの試行回数(リトライで増える)
タスクは終了コード0で成功。失敗すると --max-retries まで自動再試行され、あるタスクがリトライを使い切るとジョブ実行全体が失敗します。
シャーディング:INDEX で担当範囲を決める
「10,000件を100タスクで並列処理する」典型パターン。各タスクが INDEX から自分の担当スライスを決定的に計算します。
import os
task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"]) # 0..count-1
task_count = int(os.environ["CLOUD_RUN_TASK_COUNT"]) # 例: 100
items = load_all_item_ids() # 全件(外部ストアから取得)
# 自分の担当だけを処理(重複なく・漏れなく全体をカバー)
my_items = items[task_index::task_count]
for item_id in my_items:
process_one(item_id) # ← この1件が冪等であることが命(後述)
# 100タスク・最大20並列・各タスク最大1時間でデプロイして実行
gcloud run jobs deploy batch-convert \
--image asia-northeast1-docker.pkg.dev/PROJECT_ID/app/batch:${GIT_SHA} \
--region asia-northeast1 \
--tasks 100 --parallelism 20 \
--max-retries 3 --task-timeout 3600s \
--service-account batch@PROJECT_ID.iam.gserviceaccount.com
gcloud run jobs execute batch-convert --region asia-northeast1 --wait
冪等・再開:本番品質の核心
ジョブは必ずリトライされ得る前提で設計します(タスク失敗・SIGTERM・再実行)。だから**「同じ処理が2回走っても結果が変わらない(冪等)」**でなければ、データが壊れます。これは私が本番二重課金0件の決済基盤で徹底したのと同じ原則です。
冪等にする2つの定石:
- 決定的なID採番:タスク内のローカルIDを、
INDEXを使ってグローバルに一意化する。並列でも再実行でもIDが安定する。
# セグメント単位に「グローバルID = INDEX × stride + local_id」で決定的に採番。
# 並列処理しても後段マージでIDが衝突せず、再実行でも同じIDに収束する。
STRIDE = 100_000
def global_id(local_id: int) -> int:
return task_index * STRIDE + local_id
- 書き込みを冪等にする:出力は「あれば上書き/無視」できる形に。オブジェクト保存なら決定的なオブジェクト名へ上書き、DBならUPSERT+一意制約。
# 出力先を決定的なパスにする → 再実行は同じ場所を上書きするだけ(重複生成しない)
output_path = f"results/{global_id(local_id)}.json"
storage_client.bucket("outputs").blob(output_path).upload_from_string(payload)
私のテロップ誤字検出では、セグメントごとに
グローバルテロップID = segment_index × stride + local_idを決定的に採番し、並列処理・部分再実行・再試行のどれでも最終結果が一意に収束するようにしました。「丁寧にリトライ制御する」のではなく、「リトライされても壊れない」よう構造で保証する——これが回復性の本質です。冪等な非同期処理の一般論は SQS/Lambdaの冪等処理 も参考になります。
起動する:スケジュール実行とイベント駆動
Cloud Scheduler(cron)
定期バッチは Cloud Scheduler から起動します。ジョブの実行はCloud Run Admin APIの jobs:run を叩くため、**OAuth認証(--oauth-service-account-email)**を使います(サービス呼び出しのOIDCとは別)。
# 毎日午前2時にジョブを起動。SAには roles/run.invoker が必要。
gcloud scheduler jobs create http nightly-batch \
--location asia-northeast1 \
--schedule="0 2 * * *" \
--uri="https://run.googleapis.com/v2/projects/PROJECT_ID/locations/asia-northeast1/jobs/batch-convert:run" \
--http-method POST \
--oauth-service-account-email scheduler@PROJECT_ID.iam.gserviceaccount.com
Eventarc(イベント駆動)
「ファイルがアップロードされたら処理」はEventarcで。GCSの確定イベントなどを受けてサービス/ジョブを起動します。
# GCSへのアップロード(finalized)を受けてスキャナサービスを起動
gcloud eventarc triggers create scan-on-upload \
--location asia-northeast1 \
--destination-run-service malware-scanner \
--event-filters "type=google.cloud.storage.object.v1.finalized" \
--event-filters "bucket=uploads-raw" \
--service-account eventarc@PROJECT_ID.iam.gserviceaccount.com
私のマルウェアスキャナは、GCSアップロードをEventarcで受けてClamAV(Cloud Run)に渡し、最大10GiBをバッファせずストリーミング検査してクリーン/隔離バケットへ振り分けていました。
File.moveの原子性で再試行に冪等にし、ゼロ長・アップロード中・削除済みは安全に無視。イベント駆動 × 冪等の典型です。
Cloud Workflows:複数ステップを束ねる
「ジョブAの後にBを並列、結果を集約してCを呼ぶ」——こうしたオーケストレーションは Cloud Workflows が担います。YAML/JSONでステップを宣言し、ステップ実行ごとの課金(アイドル時は無料)。HTTP呼び出しとGoogle Cloudコネクタ(Cloud Run含む)に対応し、最大1年まで状態を保持・待機できます。
# workflow.yaml — OCRと音声認識を並列実行し、結果を照合する(テロップ誤字検出の骨子)
main:
params: [input]
steps:
- extract_subtitles:
call: http.post
args:
url: ${"https://api-xxxxx.a.run.app/extract"}
auth: { type: OIDC } # 認証付きCloud Runサービスを安全に呼ぶ
body: { video: ${input.video} }
result: frames
# ★ OCRと音声認識は相互依存しない → 並列実行で時間短縮
- analyze_in_parallel:
parallel:
shared: [ocr_result, asr_result]
branches:
- ocr_branch:
steps:
- run_ocr:
call: http.post
args:
url: ${"https://ocr-xxxxx.a.run.app/run"}
auth: { type: OIDC }
body: { frames: ${frames.body} }
result: ocr_result
- asr_branch:
steps:
- run_asr:
call: http.post
args:
url: ${"https://asr-xxxxx.a.run.app/run"}
auth: { type: OIDC }
body: { video: ${input.video} }
result: asr_result
- reconcile:
call: http.post
args:
url: ${"https://api-xxxxx.a.run.app/reconcile"}
auth: { type: OIDC }
body: { ocr: ${ocr_result.body}, asr: ${asr_result.body} }
result: report
- done:
return: ${report.body}
リトライとエラー処理
不安定な外部呼び出しは、指数バックオフ付きリトライを宣言で重ねます。
- call_flaky_service:
try:
call: http.post
args:
url: ${"https://flaky-xxxxx.a.run.app/run"}
auth: { type: OIDC }
result: r
retry:
predicate: ${http.default_retry_predicate} # 5xx等で再試行
max_retries: 5
backoff: { initial_delay: 1, max_delay: 60, multiplier: 2 }
except:
as: e
steps:
- handle:
call: sys.log
args: { text: ${"failed: " + e.message}, severity: ERROR }
並列(parallel)で速くし、try/retry/except で壊れにくくする——これがWorkflowsの2大価値です。
いつ Jobs / いつ Workflows / いつ Worker Pools
- 単一の重い処理を並列に捌く → Jobs(
--tasksでシャーディング)。 - 複数の処理を順序・並列・条件で束ねる/長く待つ → Workflows(オーケストレーター)。
- キューを常時pullし続ける → Worker Pools(Pub/Sub pull・Kafka)。
- 多くの本番系は組み合わせになります(私のパイプラインは「Workflowsが全体を統率し、各重い処理はJob/Service、入口はEventarc」)。
本番投入チェックリスト
- HTTPに向かない処理を Services から Jobs/Workflows へ切り離した
- タスクは
CLOUD_RUN_TASK_INDEXで担当範囲を決定的に分割 - 各タスクの単位処理が 冪等(決定的ID+UPSERT/上書き)
- リトライ前提で設計(
--max-retries/Workflowsのretry) -
--task-timeoutを処理に合わせて設定(最大7日) - 起動は Cloud Scheduler(cron・OAuth) か Eventarc(イベント)
- 複数ステップは Workflows(
parallelで並列・try/exceptで回復) - 進捗・結果を 可観測に(構造化ログ・メトリクス・必要なら進捗ストア)
まとめ:重い処理は「切り離して・冪等にして・束ねる」
Cloud Runの本番設計は、**「HTTPに向かない処理を正しい箱に入れる」**ことから始まります。重い処理は Jobs に切り離し、タスク分割で並列に捌き、決定的IDで冪等・再開可能にし、Workflows で順序・並列・回復性を束ねる。これで、動画処理も大規模バッチも、止まらず・壊れず・速く回せます。
私が放送局の制作ワークフローで「完走・再開・冪等」を作り込めたのは、この分解があったからです。全体設計は Cloud Run 本番運用ガイド、CI/CDは Cloud Run CI/CDガイド、コストは 並行性・課金ガイド へ。バッチ・非同期基盤の設計でお困りなら、実装まで伴走します。