# 音源分離を本番APIにする：GPUワーカー × ジョブキュー × 冪等性の設計

> Demucsなどの音源分離をデモから本番サービスへ。重いGPU処理を非同期ジョブキューに載せ、冪等性・回復性・可観測性・コスト効率を担保するアーキテクチャを、型安全なFastAPI ingress とPythonワーカーの実コードで解説。OOM回復・graceful shutdown・at-least-once配信・GPUオートスケール・テスト容易性まで、本番運用に必要な設計を網羅します。

- 公開日: 2026-06-25
- 著者: 友田 陽大
- タグ: 音源分離, MLOps, アーキテクチャ設計, Python, GPU, FastAPI, 冪等性
- URL: https://tomodahinata.com/blog/music-source-separation-production-api-gpu-worker-queue

## 要点

- 音源分離のような重いGPU処理を同期APIで返すと、長時間リクエスト・タイムアウト・二重処理で必ず破綻する。正解は『API ingress → ジョブキュー → GPUワーカー → オブジェクトストレージ』の非同期分離
- 冪等性は sha256(入力URI + パラメータ) のジョブキーで担保。キュー(SQS/Redis)のat-least-once配信は『成功してからack＋完了済みskip』で二重分離を構造的に防ぐ
- 回復性は多層：OOMはsegment縮小→CPU退避の段階縮退、失敗はnack→指数バックオフ→N回でDLQ、SIGTERMは現ジョブ完了後に終了。冪等性があるので強制killされても再配信で安全
- 可観測性は構造化ログ(相関ID)＋メトリクス(キュー滞留・処理時間・device)。コストはスポットGPU・バッチ・キュー滞留でのオートスケール・モデル常駐ロードで最適化する
- テスト容易性は SeparationEngine を Protocol で抽象化し、GPUなしのFakeEngineで冪等性・回復性をユニットテスト。Demucs↔UVRの差し替えも同じ境界で吸収する(ETC)

---

## この記事のゴール

[Demucs](/blog/demucs-v4-music-source-separation-production-guide) や [UVR5/MDX-Net](/blog/uvr5-mdx-net-vocal-separation-production-guide) で音源分離を「動かす」ところまでは、各専用記事の通り数分でできます。問題はその先——**1日に何千本も来る音声を、落ちずに・二重課金せず・コストを抑えて捌く本番サービス**にする設計です。

本稿は、**特定ツールに依存しない「重いGPU処理を本番APIにする」アーキテクチャ**を、型安全な実コードで示します。題材は音源分離ですが、ここで使う設計（非同期ジョブキュー・冪等性・回復性・可観測性）は、**動画変換・文字起こし・画像生成など、あらゆる重いAI処理に転用できる普遍的な型**です。

読み終えたときに、次ができる状態を目指します。

1. なぜ「APIを直接叩く同期実装」が本番で破綻するのかを説明し、**非同期アーキテクチャを設計**できる。
2. **冪等性・回復性・可観測性・コスト効率**を、コードの構造として実装できる。
3. GPUなしで**ワーカーをユニットテスト**し、Demucs↔UVRを差し替えても壊れない境界を引ける。

> **筆者について（信頼性の開示）**：私は、動画をアップロードするだけで多言語ローカライズまで全自動化する**AI動画ローカライズ基盤を単独で設計・実装し、本番運用**しています。その第1段の音源分離は、まさに本稿の「GPUワーカー × ジョブキュー」で回しています。本稿の設計は、机上論ではなく**実際にGPU代を溶かし、OOMで叩き起こされ、二重処理に課金された経験**から逆算したものです。

---

## 30秒のまとめ

| 観点 | 設計 |
| --- | --- |
| **基本形** | API ingress → **ジョブキュー** → GPUワーカー → オブジェクトストレージ（完全非同期） |
| **なぜ非同期か** | 分離は数十秒〜数分。同期で返すとタイムアウト・二重実行・スケール不能 |
| **冪等性** | `sha256(入力URI + パラメータ)` をジョブキーに。完了済みはskip、再送は既存結果を返す |
| **配信保証** | キューは **at-least-once**。「成功してからack」「完了済みskip」で二重分離を防ぐ |
| **回復性** | OOMは段階縮退、失敗は指数バックオフ→DLQ、SIGTERMは現ジョブ完了後に終了 |
| **可観測性** | 構造化ログ（相関ID）＋メトリクス（滞留・処理時間・device・GPU使用率） |
| **コスト** | スポットGPU・バッチ・**キュー滞留でオートスケール**・モデル常駐ロード |
| **テスト容易性** | `SeparationEngine` を Protocol 化、`FakeEngine` でGPUなしテスト |

設計の全体像は次の[5レイヤ](#全体アーキテクチャ5つのレイヤ)です。

---

## なぜ「APIを直接叩く」と本番で破綻するのか

最初に誰もが書くのは、こういう同期実装です。

```python
# ❌ アンチパターン: リクエスト内で分離を完走させる
@app.post("/separate")
def separate(audio_url: str):
    stems = run_demucs(audio_url)   # 数十秒〜数分ブロックする
    return {"stems": stems}
```

これは**デモでは動き、本番で必ず壊れます**。理由は4つ。

- **タイムアウト**：分離は数十秒〜数分。ロードバランサ／ゲートウェイの上限（多くは30〜60秒）を超えて切られる。
- **二重実行**：ユーザーが送信ボタンを連打／クライアントが自動リトライすると、**同じ音源を何度もGPUで処理**してGPU代を溶かす。
- **スケール不能**：1リクエストが1GPUを長時間占有。同時アクセスが来たら詰まる。バックプレッシャ（過負荷時に受け流す仕組み）が無い。
- **回復不能**：処理中にワーカーが落ちたら、その仕事は**消える**。どこまで進んだか分からない。

解決は1つ——**「受け付ける」と「処理する」を分離**し、間に**キュー**を挟むことです。

---

## 全体アーキテクチャ：5つのレイヤ

```text
┌──────────┐   ①受け付ける(即返す)      ┌──────────┐
│  Client  │ ─────────────────────────▶ │   API    │  ← FastAPI / 型安全な境界
└──────────┘   202 Accepted + job_id   │ ingress  │
      ▲                                  └────┬─────┘
      │ ④結果を取りに来る(poll/webhook)        │ enqueue (at-least-once)
      │                                       ▼
      │                                  ┌──────────┐
      │                                  │  Queue   │  ← SQS / Redis（バックプレッシャ＋再試行）
      │                                  └────┬─────┘
      │                                       │ reserve（visibility timeout）
      │                                       ▼
┌─────┴──────┐   ③結果を保存          ┌──────────┐
│  Object    │ ◀──────────────────── │  GPU     │  ← モデル常駐・冪等処理・OOM回復
│  Storage   │                        │  Worker  │
└────────────┘                        └──────────┘
                                       ②重い処理はここだけ
```

各レイヤの**責務（SRP）**を1行で。

- **API ingress**：入力を検証し、冪等キーを作り、キューに積んで**即`202`を返す**。重い処理は一切しない。
- **Queue**：仕事を貯め、再試行とバックプレッシャを担う。**at-least-once配信**。
- **GPU Worker**：キューから取り、分離し、結果を保存する。**唯一GPUを触る層**。
- **Object Storage**：入力音声と出力ステムを置く（S3など）。
- **Job Store**：ジョブの状態（queued/running/done/failed）と結果URIを持つ（DB/Redis）。冪等性の土台。

以下、レイヤごとに実装します。

---

## レイヤ1：API ingress（型安全な受け口）

境界では**外部入力を必ず検証**します（Pydantic v2）。詳細は[Pydantic v2の記事](/blog/pydantic-v2-production-validation-type-safety)・[FastAPIの記事](/blog/fastapi-production-async-pydantic-observability-guide)に。

```python
import hashlib
from pydantic import BaseModel, Field, field_validator

class SeparationJob(BaseModel):
    """分離ジョブの不変な仕様。これがそのまま冪等キーの素になる。"""
    model_config = {"frozen": True}  # 不変＝キーの安定性を保証

    audio_uri: str = Field(..., description="s3://bucket/key 形式のみ許可")
    engine: str = Field("demucs", pattern="^(demucs|uvr)$")  # 許可リストで制限
    two_stems: bool = False

    @field_validator("audio_uri")
    @classmethod
    def _only_trusted_scheme(cls, v: str) -> str:
        # セキュリティ: 任意URLを許すとSSRFの穴になる。自社バケットのs3://に限定
        if not v.startswith("s3://"):
            raise ValueError("audio_uri must be an s3:// URI")
        return v

def job_key(job: SeparationJob) -> str:
    """同じ入力×同じ設定なら必ず同じキー（冪等性の起点）。"""
    return hashlib.sha256(job.model_dump_json().encode()).hexdigest()
```

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/v1/separations", status_code=202)
async def create(job: SeparationJob) -> dict:
    key = job_key(job)
    # 冪等性: 既存ジョブがあれば作り直さず返す（連打・自動リトライで二重投入しない）
    if (existing := await store.get(key)) is not None:
        return {"job_id": key, "status": existing.status, "cached": True}
    await store.put(key, status="queued")
    await queue.enqueue(key, job)          # キューに積むだけ。GPUは触らない
    return {"job_id": key, "status": "queued", "cached": False}

@app.get("/v1/separations/{job_id}")
async def status(job_id: str) -> dict:
    if (rec := await store.get(job_id)) is None:
        raise HTTPException(status_code=404, detail="job not found")
    return {"job_id": job_id, "status": rec.status, "stems": rec.stems}
```

ポイントは、ingressが**「検証・冪等キー・enqueue・即返す」しかしない**こと。これでタイムアウトも二重実行も構造的に消えます。

---

## レイヤ2：ジョブキュー（再試行とバックプレッシャ）

キューは**実装を Protocol で抽象化**します。SQSでもRedisでも、ワーカーのコードを変えずに差し替えられる（ETC）。

```python
from typing import Protocol
from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class Reserved:
    handle: str          # ack/nackに使う予約ハンドル
    job_id: str
    job: "SeparationJob"
    receive_count: int    # 何回配信されたか（DLQ判定に使う）

class JobQueue(Protocol):
    async def enqueue(self, job_id: str, job: SeparationJob) -> None: ...
    async def reserve(self) -> Reserved | None: ...   # ロングポーリング
    async def ack(self, handle: str) -> None: ...      # 完了。キューから消す
    async def nack(self, handle: str) -> None: ...     # 失敗。可視性タイムアウト後に再配信
```

SQSなら `reserve` は `ReceiveMessage`（visibility timeout付き）、`ack` は `DeleteMessage`、`nack` は可視性タイムアウトを0に戻すだけ。**at-least-once配信**（少なくとも1回。重複あり）が前提なので、二重処理対策は次のワーカー側で行います。SQS/Lambdaでの冪等な非同期処理は[専用記事](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide)で詳述しています。

---

## レイヤ3：GPUワーカー（本体）

ここが心臓部です。要件は **①モデルは常駐ロード（コールドスタート回避）②冪等処理 ③OOM回復 ④graceful shutdown**。

まず、分離エンジンを **Protocol で抽象化**します。これにより Demucs↔UVR の差し替えも、GPUなしテストも可能になる（ETC＋テスト容易性）。

```python
from pathlib import Path
from typing import Protocol

class SeparationEngine(Protocol):
    """音源分離の抽象（port）。具体ツールはこの裏に隠す。"""
    name: str
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        """{ステム名: 出力ファイルパス} を返す。失敗時は例外を送出。"""
        ...
```

```python
import torch
import demucs.api

class DemucsEngine:
    """SeparationEngine の Demucs 実装（adapter）。"""
    def __init__(self, model: str = "htdemucs") -> None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # モデルは起動時に1度だけロードして常駐させる（毎ジョブのロードを避ける）
        self._sep = demucs.api.Separator(model=model, device=device)
        self.name = f"demucs:{model}"

    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        _, stems = self._sep.separate_audio_file(str(audio))
        out_dir.mkdir(parents=True, exist_ok=True)
        if two_stems:  # vocals / no_vocals に畳む
            stems = {"vocals": stems["vocals"],
                     "no_vocals": sum(s for n, s in stems.items() if n != "vocals")}
        outputs: dict[str, Path] = {}
        for name, src in stems.items():
            path = out_dir / f"{name}.wav"
            demucs.api.save_audio(src, str(path), samplerate=self._sep.samplerate)
            outputs[name] = path
        return outputs
```

そしてワーカーループ。**冪等性とack順序**が肝です。

```python
import logging, tempfile, threading
from pathlib import Path

log = logging.getLogger("separation.worker")

def run_worker(engine: SeparationEngine, queue: JobQueue,
               store: "JobStore", blob: "BlobStore", *, stop: threading.Event) -> None:
    while not stop.is_set():
        reserved = queue_reserve_blocking(queue)   # ロングポーリング
        if reserved is None:
            continue
        ctx = {"job_id": reserved.job_id, "engine": engine.name}  # 相関ID

        # 冪等性①: 再配信(at-least-once)で来た「完了済み」は処理せずackだけ
        if store.is_done(reserved.job_id):
            queue.ack(reserved.handle)
            log.info("skip.already_done", extra=ctx)
            continue

        try:
            with tempfile.TemporaryDirectory() as tmp:
                tmpdir = Path(tmp)
                local = blob.download(reserved.job.audio_uri, tmpdir / "in")
                stems = separate_with_recovery(engine, local, tmpdir / "out",
                                               two_stems=reserved.job.two_stems)
                # 結果をオブジェクトストレージへ（job_idで名前空間を切る＝上書き安全）
                uris = {n: blob.upload(p, f"{reserved.job_id}/{n}.wav")
                        for n, p in stems.items()}
            store.mark_done(reserved.job_id, uris)
            queue.ack(reserved.handle)              # 冪等性②: 成功して初めてack
            log.info("separation.done", extra={**ctx, "stems": list(uris)})
        except Exception as e:                      # 回復性: 失敗はnack→再試行/DLQ
            store.mark_failed(reserved.job_id, type(e).__name__)
            queue.nack(reserved.handle)
            log.error("separation.failed", extra={**ctx, "error": type(e).__name__})
```

**なぜこの順序か**：分離が成功し、結果を保存し終えて**初めて`ack`**します。途中で落ちても`ack`していないので、キューは可視性タイムアウト後に**再配信**してくれる（仕事は消えない）。再配信で同じジョブが来ても、`is_done`チェックで**二重分離しない**。この2つで、at-least-onceキューの上に**実質exactly-once**の処理を構築できます。

### OOM回復（段階縮退）

GPUのメモリ不足を**例外ではなく正常系**として扱います（[回復性パターンの記事](/blog/retry-backoff-circuit-breaker-resilience-patterns-guide)も参照）。

```python
def separate_with_recovery(engine: SeparationEngine, audio: Path, out: Path,
                           *, two_stems: bool) -> dict[str, Path]:
    try:
        return engine.separate(audio, out, two_stems=two_stems)
    except torch.cuda.OutOfMemoryError:
        torch.cuda.empty_cache()
        # 最後の砦: CPUエンジンへ退避（遅いが完走する）。冪等なので安全に作り直せる
        log.warning("oom.fallback_to_cpu")
        return DemucsEngine_cpu().separate(audio, out, two_stems=two_stems)
```

（`segment`を段階的に縮める詳細版は[Demucsの記事](/blog/demucs-v4-music-source-separation-production-guide#本番で必ず詰まる5つの落とし穴と回復性設計)に。）

### Graceful shutdown（スポットGPU対策）

スポットインスタンスは予告して停止されます。**SIGTERMで現ジョブを完了させてから終了**すれば、仕事を取りこぼしません。

```python
import signal

stop = threading.Event()
# SIGTERM受信時: ループの先頭で抜ける。実行中のジョブは最後までやり切る
signal.signal(signal.SIGTERM, lambda *_: stop.set())
```

万一**強制kill**されても、`ack`前なのでキューが再配信し、冪等性で安全に再実行されます。**「優雅に止まる」と「乱暴に殺されても平気」の二重の備え**が、スポットGPUを安心して使える条件です。

---

## 可観測性：止まった処理を一目で追う

「なぜこの曲だけ遅い／失敗した」を後から追えなければ、本番は回せません。三点を構造化して出します。

- **構造化ログ＋相関ID**：全ログに `job_id` を通す。`print`ではなくJSON構造化ログで、検索・集計可能に。**音声の中身（PII）は出さない**。
- **メトリクス**：`queue_depth`（滞留）、`separation_duration_seconds`（処理時間・モデル別）、`device`（cuda/cpu）、`gpu_utilization`。滞留が増え続けたら**スケール不足**、CPU率が上がったら**GPU脱落の事故**。
- **トレース**：ingress→queue→worker→storage を1本のトレースで繋ぐ。OpenTelemetryでの相関は[専用記事](/blog/opentelemetry-observability-production-tracing-metrics-logs)に。

```python
import time

def timed_separation(engine, audio, out, *, two_stems, metrics):
    start = time.monotonic()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    try:
        return separate_with_recovery(engine, audio, out, two_stems=two_stems)
    finally:
        # 症状ベースの監視: 処理時間・deviceをラベル付きで記録
        metrics.observe("separation_duration_seconds",
                        time.monotonic() - start,
                        labels={"engine": engine.name, "device": device})
```

---

## コスト効率：GPUを遊ばせない

音源分離は**GPU時間がそのままお金**です。単価を刻む打ち手は4つ。

1. **モデル常駐ロード**：毎ジョブでモデルをロードし直さない（ワーカー起動時に1回）。コールドスタートのGPU時間をゼロに。
2. **スポットGPU**：オンデマンドの数分の1。graceful shutdown＋冪等性があるので**中断されても損しない**。
3. **キュー滞留でオートスケール**：`queue_depth`をターゲットにワーカー数を増減。暇なら**ゼロまでスケールイン**（アイドルGPUを持たない）。
4. **バッチ処理**：複数曲をまとめて1回のGPU呼び出しに（`-j`並列）。スループットを上げ、起動コストを償却。

具体的なGPU基盤の置き場所（ECS/EKS）とコスト最適化は[ECS vs EKSの記事](/blog/aws-ecs-vs-eks-startup-decision-framework)・[FinOpsの記事](/blog/aws-terraform-startup-cost-optimization-finops)に。**冪等キャッシュで「同じ音源の再分離ゼロ」**を足せば、無駄打ちが直接消えます。

---

## テスト容易性：GPUなしでワーカーを検証する

`SeparationEngine` を Protocol にした最大の恩恵が**これ**です。GPUもDemucsも無いCIで、**ワーカーの冪等性・回復性をユニットテスト**できます。

```python
class FakeEngine:
    """テスト用の分離エンジン。GPU不要で決定的な出力を返す。"""
    name = "fake"
    calls = 0
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        type(self).calls += 1
        out_dir.mkdir(parents=True, exist_ok=True)
        names = ["vocals", "no_vocals"] if two_stems else ["vocals", "drums", "bass", "other"]
        return {n: _touch(out_dir / f"{n}.wav") for n in names}

def test_redelivery_does_not_double_process(fake_queue, fake_store, fake_blob):
    """at-least-onceで同じジョブが2回来ても、分離は1回しか走らない。"""
    engine = FakeEngine()
    job_id = enqueue_and_run(engine, fake_queue, fake_store, fake_blob, times=2)
    assert fake_store.is_done(job_id)
    assert FakeEngine.calls == 1   # ← 冪等性の核心。2回流しても1回だけ
```

「GPUがないとテストできない」は設計の敗北です。**重い副作用（GPU・I/O）を境界の向こうに追い出す**だけで、ロジックは高速・決定的にテストできます。これが SRP と依存性逆転の実利です。

---

## よくある質問（FAQ）

**Q. キューはSQSとRedis、どちら？**
A. **AWSに寄せるならSQS**（DLQ・可視性タイムアウトが標準装備）。**軽く始めるならRedis**（RQ/Celery）。Protocolで抽象化してあるので**後から差し替え可能**。Celery/Redisの実装は[専用記事](/blog/celery-redis-production-async-task-queue-guide)に。

**Q. 結果通知はpollとwebhook、どちら？**
A. **両対応が理想**。社内バッチはpoll（`GET /status`）で十分、外部連携はwebhook（署名検証必須）。webhookは[Stripeの記事](/blog/stripe-payments-production-guide-webhooks-idempotency-subscriptions)の署名検証パターンが流用できます。

**Q. 「成功してからack」だと、保存後・ack前に落ちたら？**
A. 再配信されますが、`is_done`チェックでskipされるので**二重分離しません**。これがat-least-onceの上でexactly-onceを作る定石です。

**Q. GPUは常時起動が必要？**
A. 不要。**キュー滞留でオートスケールし、暇ならゼロ**に。コールドスタートはモデルロード分(数十秒)かかるので、SLA次第で最小1台を温存する判断もあり。

**Q. この設計は音源分離以外にも使える？**
A. **そのまま使えます**。文字起こし・動画変換・画像/動画生成など、**重い・冪等にしたい・GPUを使う**処理は全部この型に乗ります。汎用の非同期処理基盤です。

---

## まとめ：「動く」と「本番で稼ぐ」を分けるのは設計

音源分離を本番サービスにする鍵は、モデルの良し悪しではなく**アーキテクチャ**です。

1. **受け付けると処理するを分離**し、間にキューを置く（タイムアウト・二重実行の根絶）。
2. **冪等性**（sha256キー＋完了skip）と**at-least-once＋成功後ack**で、二重分離を構造的に止める。
3. **回復性**（OOM縮退・nack/DLQ・graceful shutdown）で、失敗を正常系にする。
4. **可観測性・コスト・テスト容易性**を、後付けでなく最初から構造に織り込む。

ここまでやって初めて、デモではなく「**何千本来ても落ちない**」サービスになります。そして——**この設計判断こそ、外注で最も価値が出るところ**です。モデルを繋ぐだけのPoCは誰でも作れますが、**GPU代を溶かさず・二重課金せず・スポットでも平気な本番基盤**は、痛い目を見た数がそのまま品質になります。

> 私は、本稿のアーキテクチャを**実際に本番運用しているAI動画ローカライズ基盤**のGPU処理層で実装しました。音源分離に限らず、重いAI処理の本番化・コスト最適化・信頼性設計をお考えなら、[実績](/case-studies/ai-video-localization-lipsync)をご覧のうえご相談ください。**一人 × 生成AI**で、PoCから本番運用まで一気通貫で作ります。

---

## 出典・関連リソース

- 個別ツール：[Demucs 完全ガイド](/blog/demucs-v4-music-source-separation-production-guide) ／ [UVR5・MDX-Net 完全ガイド](/blog/uvr5-mdx-net-vocal-separation-production-guide) ／ [ツール選定](/blog/music-source-separation-tool-selection-demucs-uvr-spleeter)
- 非同期・信頼性：[SQS/Lambda 冪等な非同期処理](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide) ／ [リトライ・バックオフ・サーキットブレーカ](/blog/retry-backoff-circuit-breaker-resilience-patterns-guide) ／ [Transactional Outbox](/blog/transactional-outbox-pattern-reliable-event-publishing-guide)
- 基盤・観測：[FastAPI 本番設計](/blog/fastapi-production-async-pydantic-observability-guide) ／ [OpenTelemetry 可観測性](/blog/opentelemetry-observability-production-tracing-metrics-logs) ／ [ECS vs EKS](/blog/aws-ecs-vs-eks-startup-decision-framework)

※ 本稿のコードは設計の骨子を示すものです。実装時は各ライブラリ・クラウドの最新仕様を一次情報で確認してください。
