# Cloud Run Jobs と Cloud Workflows：長時間バッチ・並列処理を冪等・再開可能に設計する

> HTTPに向かない処理（バッチ・長時間ジョブ・並列処理）をCloud Run JobsとCloud Workflowsで本番品質に作る実装ガイド。--tasks/--parallelismによるシャーディング、CLOUD_RUN_TASK_INDEXでの分割、決定的IDによる冪等・再開設計、Cloud Schedulerのcron実行とEventarcのイベント駆動、Workflowsの並列・リトライ・エラー処理までを、gcloud・YAML・Pythonの実コードで解説します。

- 公開日: 2026-06-28
- 著者: 友田 陽大
- タグ: GCP, Cloud Run, Cloud Workflows, バッチ処理, 冪等性, サーバーレス, 信頼性, 回復性
- URL: https://tomodahinata.com/blog/google-cloud-run-jobs-workflows-batch-async-idempotent-guide
- カテゴリ: Google Cloud Run 本番運用
- 総合ガイド: https://tomodahinata.com/blog/google-cloud-run-production-guide

## 要点

- Servicesは同期HTTPを捌くもの。実行して完了する処理はCloud Run Jobs、複数サービスのオーケストレーションはCloud Workflows、常駐バックグラウンドはWorker Poolsに分ける
- Jobsは1ジョブを最大10,000タスクに分割し並列実行できる。各タスクはCLOUD_RUN_TASK_INDEX（0〜count-1）/COUNT/ATTEMPTで自分の担当範囲を知る。終了コード0で成功、失敗は--max-retries（既定3・最大10）まで再試行
- 本番品質の鍵は冪等・再開。タスクIDからグローバルIDを決定的に採番すれば、並列実行も部分再実行も再試行も最終結果が一意に収束する。リトライ前提の設計にする
- 起動はCloud Scheduler（cron・OAuth認証でjobs:run）かEventarc（GCS/Pub/Subイベント駆動）。--task-timeoutは既定10分・最大168時間（7日）まで伸ばせる
- Cloud Workflowsは複数ステップのオーケストレーター。parallelで並列化、try/retry/exceptで指数バックオフ付きの回復性を宣言、ステップ実行ごとの課金。OCRと音声認識の並列化で処理時間を約30%短縮した実例がある

---

「動画を処理する」「数万件をバッチで変換する」「LLMで一括分類する」——こうした処理を**同期HTTPで抱え込む**と、必ず破綻します。リクエストタイムアウト（Cloud Runは最大60分）を超え、クライアントが切れても処理は止められず、リトライで多重実行が起きる。正解は**「受付（Service）と実行（Job/Workflow）を分ける」**ことです。

私は[放送事業者向けプラットフォーム](/case-studies/broadcaster-ai-content-platform)で、テロップ誤字検出パイプライン——動画から字幕を抽出し、OCR（画像）と音声認識（発話）を照合して誤字を検出する重い処理——を、まさにこの形で作りました。**HTTPのAPIはジョブを起動して受付IDを返すだけ**にし、重い処理は **Cloud Run Jobs + Cloud Workflows で並列実行**。OCRと音声認識を並列化して **逐次18分→並列13分（約30%短縮）** を実現し、長時間ジョブは**セグメントIDを決定的に採番して冪等・再開可能**に設計しました。

本記事は、その設計を[公式ドキュメント](https://docs.cloud.google.com/run/docs/create-jobs)に忠実に再現します。Servicesそのものの本番運用は [Cloud Run 本番運用ガイド](/blog/google-cloud-run-production-guide) を参照してください。

---

## まず分類する：Services / Jobs / Workflows / Worker Pools

処理を正しい箱に入れるだけで、設計の8割が決まります。

| 箱 | 性質 | 使い所 |
|----|------|-------|
| **Services** | 同期HTTPを捌く | API・Webアプリ・Webhook |
| **Jobs** | 実行して**完了したら止まる** | バッチ・DBマイグレーション・一括変換・長時間処理 |
| **Workflows** | 複数ステップの**オーケストレーション** | ジョブ/サービス/APIを順序・並列・条件で束ねる |
| **Worker Pools** | **常駐**バックグラウンド | Pub/Sub pull・Kafkaコンシューマ |

「重いからJob」「複数を束ねるからWorkflow」——この一次分類を最初にやります。

---

## Cloud Run Jobs：タスク分割で並列に捌く

ジョブは**1つのジョブを最大10,000タスクに分割**し、並列実行できます。主要なフラグは——

| フラグ | 意味 | 既定/上限 |
|--------|------|----------|
| `--tasks` | タスク数（仕事を何分割するか） | 既定1・最大10,000 |
| `--parallelism` | 同時に走らせるタスク数 | 既定は可能な限り並列 |
| `--max-retries` | 失敗タスクの再試行回数 | 既定3・最大10 |
| `--task-timeout` | 1タスクの最大実行時間 | 既定10分・最大168時間（7日） |

各タスクは、自分が「全体の何番目か」を環境変数で知ります。

- **`CLOUD_RUN_TASK_INDEX`**：このタスクの番号（`0` 〜 `タスク数-1`）
- **`CLOUD_RUN_TASK_COUNT`**：ジョブ全体のタスク数
- **`CLOUD_RUN_TASK_ATTEMPT`**：このタスクの試行回数（リトライで増える）

**タスクは終了コード0で成功**。失敗すると `--max-retries` まで自動再試行され、**あるタスクがリトライを使い切るとジョブ実行全体が失敗**します。

### シャーディング：INDEX で担当範囲を決める

「10,000件を100タスクで並列処理する」典型パターン。各タスクが `INDEX` から**自分の担当スライスを決定的に**計算します。

```python
import os

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])   # 0..count-1
task_count = int(os.environ["CLOUD_RUN_TASK_COUNT"])    # 例: 100

items = load_all_item_ids()        # 全件（外部ストアから取得）
# 自分の担当だけを処理（重複なく・漏れなく全体をカバー）
my_items = items[task_index::task_count]

for item_id in my_items:
    process_one(item_id)           # ← この1件が冪等であることが命（後述）
```

```bash
# 100タスク・最大20並列・各タスク最大1時間でデプロイして実行
gcloud run jobs deploy batch-convert \
  --image asia-northeast1-docker.pkg.dev/PROJECT_ID/app/batch:${GIT_SHA} \
  --region asia-northeast1 \
  --tasks 100 --parallelism 20 \
  --max-retries 3 --task-timeout 3600s \
  --service-account batch@PROJECT_ID.iam.gserviceaccount.com
gcloud run jobs execute batch-convert --region asia-northeast1 --wait
```

---

## 冪等・再開：本番品質の核心

ジョブは**必ずリトライされ得る**前提で設計します（タスク失敗・SIGTERM・再実行）。だから**「同じ処理が2回走っても結果が変わらない（冪等）」**でなければ、データが壊れます。これは私が[本番二重課金0件の決済基盤](/case-studies/payment-platform-reliability)で徹底したのと同じ原則です。

**冪等にする2つの定石：**

1. **決定的なID採番**：タスク内のローカルIDを、`INDEX` を使ってグローバルに一意化する。並列でも再実行でもIDが安定する。

```python
# セグメント単位に「グローバルID = INDEX × stride + local_id」で決定的に採番。
# 並列処理しても後段マージでIDが衝突せず、再実行でも同じIDに収束する。
STRIDE = 100_000
def global_id(local_id: int) -> int:
    return task_index * STRIDE + local_id
```

2. **書き込みを冪等にする**：出力は「あれば上書き/無視」できる形に。オブジェクト保存なら**決定的なオブジェクト名へ上書き**、DBなら**UPSERT＋一意制約**。

```python
# 出力先を決定的なパスにする → 再実行は同じ場所を上書きするだけ（重複生成しない）
output_path = f"results/{global_id(local_id)}.json"
storage_client.bucket("outputs").blob(output_path).upload_from_string(payload)
```

> 私のテロップ誤字検出では、**セグメントごとに `グローバルテロップID = segment_index × stride + local_id` を決定的に採番**し、並列処理・部分再実行・再試行のどれでも最終結果が一意に収束するようにしました。「丁寧にリトライ制御する」のではなく、**「リトライされても壊れない」よう構造で保証する**——これが回復性の本質です。冪等な非同期処理の一般論は [SQS/Lambdaの冪等処理](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide) も参考になります。

---

## 起動する：スケジュール実行とイベント駆動

### Cloud Scheduler（cron）

定期バッチは Cloud Scheduler から起動します。ジョブの実行はCloud Run Admin APIの `jobs:run` を叩くため、**OAuth認証（`--oauth-service-account-email`）**を使います（サービス呼び出しのOIDCとは別）。

```bash
# 毎日午前2時にジョブを起動。SAには roles/run.invoker が必要。
gcloud scheduler jobs create http nightly-batch \
  --location asia-northeast1 \
  --schedule="0 2 * * *" \
  --uri="https://run.googleapis.com/v2/projects/PROJECT_ID/locations/asia-northeast1/jobs/batch-convert:run" \
  --http-method POST \
  --oauth-service-account-email scheduler@PROJECT_ID.iam.gserviceaccount.com
```

### Eventarc（イベント駆動）

「ファイルがアップロードされたら処理」はEventarcで。GCSの確定イベントなどを受けてサービス/ジョブを起動します。

```bash
# GCSへのアップロード（finalized）を受けてスキャナサービスを起動
gcloud eventarc triggers create scan-on-upload \
  --location asia-northeast1 \
  --destination-run-service malware-scanner \
  --event-filters "type=google.cloud.storage.object.v1.finalized" \
  --event-filters "bucket=uploads-raw" \
  --service-account eventarc@PROJECT_ID.iam.gserviceaccount.com
```

> 私のマルウェアスキャナは、**GCSアップロードをEventarcで受けてClamAV（Cloud Run）に渡し、最大10GiBをバッファせずストリーミング検査**してクリーン/隔離バケットへ振り分けていました。`File.move` の原子性で再試行に冪等にし、ゼロ長・アップロード中・削除済みは安全に無視。**イベント駆動 × 冪等**の典型です。

---

## Cloud Workflows：複数ステップを束ねる

「ジョブAの後にBを並列、結果を集約してCを呼ぶ」——こうした**オーケストレーション**は Cloud Workflows が担います。YAML/JSONでステップを宣言し、**ステップ実行ごとの課金**（アイドル時は無料）。HTTP呼び出しとGoogle Cloudコネクタ（Cloud Run含む）に対応し、**最大1年まで状態を保持・待機**できます。

```yaml
# workflow.yaml — OCRと音声認識を並列実行し、結果を照合する（テロップ誤字検出の骨子）
main:
  params: [input]
  steps:
    - extract_subtitles:
        call: http.post
        args:
          url: ${"https://api-xxxxx.a.run.app/extract"}
          auth: { type: OIDC }   # 認証付きCloud Runサービスを安全に呼ぶ
          body: { video: ${input.video} }
        result: frames

    # ★ OCRと音声認識は相互依存しない → 並列実行で時間短縮
    - analyze_in_parallel:
        parallel:
          shared: [ocr_result, asr_result]
          branches:
            - ocr_branch:
                steps:
                  - run_ocr:
                      call: http.post
                      args:
                        url: ${"https://ocr-xxxxx.a.run.app/run"}
                        auth: { type: OIDC }
                        body: { frames: ${frames.body} }
                      result: ocr_result
            - asr_branch:
                steps:
                  - run_asr:
                      call: http.post
                      args:
                        url: ${"https://asr-xxxxx.a.run.app/run"}
                        auth: { type: OIDC }
                        body: { video: ${input.video} }
                      result: asr_result

    - reconcile:
        call: http.post
        args:
          url: ${"https://api-xxxxx.a.run.app/reconcile"}
          auth: { type: OIDC }
          body: { ocr: ${ocr_result.body}, asr: ${asr_result.body} }
        result: report
    - done:
        return: ${report.body}
```

### リトライとエラー処理

不安定な外部呼び出しは、**指数バックオフ付きリトライ**を宣言で重ねます。

```yaml
    - call_flaky_service:
        try:
          call: http.post
          args:
            url: ${"https://flaky-xxxxx.a.run.app/run"}
            auth: { type: OIDC }
          result: r
        retry:
          predicate: ${http.default_retry_predicate}   # 5xx等で再試行
          max_retries: 5
          backoff: { initial_delay: 1, max_delay: 60, multiplier: 2 }
        except:
          as: e
          steps:
            - handle:
                call: sys.log
                args: { text: ${"failed: " + e.message}, severity: ERROR }
```

**並列（`parallel`）で速くし、`try`/`retry`/`except` で壊れにくくする**——これがWorkflowsの2大価値です。

---

## いつ Jobs / いつ Workflows / いつ Worker Pools

- **単一の重い処理を並列に捌く** → **Jobs**（`--tasks` でシャーディング）。
- **複数の処理を順序・並列・条件で束ねる／長く待つ** → **Workflows**（オーケストレーター）。
- **キューを常時pullし続ける** → **Worker Pools**（Pub/Sub pull・Kafka）。
- 多くの本番系は**組み合わせ**になります（私のパイプラインは「Workflowsが全体を統率し、各重い処理はJob/Service、入口はEventarc」）。

---

## 本番投入チェックリスト

- [ ] HTTPに向かない処理を **Services から Jobs/Workflows へ切り離した**
- [ ] タスクは **`CLOUD_RUN_TASK_INDEX`** で担当範囲を決定的に分割
- [ ] 各タスクの単位処理が **冪等**（決定的ID＋UPSERT/上書き）
- [ ] **リトライ前提**で設計（`--max-retries`／Workflowsの`retry`）
- [ ] `--task-timeout` を処理に合わせて設定（最大7日）
- [ ] 起動は **Cloud Scheduler（cron・OAuth）** か **Eventarc（イベント）**
- [ ] 複数ステップは **Workflows**（`parallel`で並列・`try/except`で回復）
- [ ] 進捗・結果を **可観測**に（構造化ログ・メトリクス・必要なら進捗ストア）

---

## まとめ：重い処理は「切り離して・冪等にして・束ねる」

Cloud Runの本番設計は、**「HTTPに向かない処理を正しい箱に入れる」**ことから始まります。重い処理は Jobs に切り離し、タスク分割で並列に捌き、決定的IDで冪等・再開可能にし、Workflows で順序・並列・回復性を束ねる。これで、**動画処理も大規模バッチも、止まらず・壊れず・速く**回せます。

私が放送局の制作ワークフローで「完走・再開・冪等」を作り込めたのは、この分解があったからです。全体設計は [Cloud Run 本番運用ガイド](/blog/google-cloud-run-production-guide)、CI/CDは [Cloud Run CI/CDガイド](/blog/google-cloud-run-cicd-cloud-build-github-actions-workload-identity-blue-green-canary-guide)、コストは [並行性・課金ガイド](/blog/google-cloud-run-autoscaling-concurrency-billing-cost-optimization-guide) へ。バッチ・非同期基盤の設計でお困りなら、実装まで伴走します。
