メインコンテンツへスキップ
友田 陽大
音源分離・音声前処理
音源分離
MLOps
アーキテクチャ設計
Python
GPU
FastAPI
冪等性

音源分離を本番APIにする:GPUワーカー × ジョブキュー × 冪等性の設計

Demucsなどの音源分離をデモから本番サービスへ。重いGPU処理を非同期ジョブキューに載せ、冪等性・回復性・可観測性・コスト効率を担保するアーキテクチャを、型安全なFastAPI ingress とPythonワーカーの実コードで解説。OOM回復・graceful shutdown・at-least-once配信・GPUオートスケール・テスト容易性まで、本番運用に必要な設計を網羅します。

公開日
読了時間
15分
著者
友田 陽大
シェア

この記事のゴール

DemucsUVR5/MDX-Net で音源分離を「動かす」ところまでは、各専用記事の通り数分でできます。問題はその先——1日に何千本も来る音声を、落ちずに・二重課金せず・コストを抑えて捌く本番サービスにする設計です。

本稿は、特定ツールに依存しない「重いGPU処理を本番APIにする」アーキテクチャを、型安全な実コードで示します。題材は音源分離ですが、ここで使う設計(非同期ジョブキュー・冪等性・回復性・可観測性)は、動画変換・文字起こし・画像生成など、あらゆる重いAI処理に転用できる普遍的な型です。

読み終えたときに、次ができる状態を目指します。

  1. なぜ「APIを直接叩く同期実装」が本番で破綻するのかを説明し、非同期アーキテクチャを設計できる。
  2. 冪等性・回復性・可観測性・コスト効率を、コードの構造として実装できる。
  3. GPUなしでワーカーをユニットテストし、Demucs↔UVRを差し替えても壊れない境界を引ける。

筆者について(信頼性の開示):私は、動画をアップロードするだけで多言語ローカライズまで全自動化するAI動画ローカライズ基盤を単独で設計・実装し、本番運用しています。その第1段の音源分離は、まさに本稿の「GPUワーカー × ジョブキュー」で回しています。本稿の設計は、机上論ではなく実際にGPU代を溶かし、OOMで叩き起こされ、二重処理に課金された経験から逆算したものです。


30秒のまとめ

観点設計
基本形API ingress → ジョブキュー → GPUワーカー → オブジェクトストレージ(完全非同期)
なぜ非同期か分離は数十秒〜数分。同期で返すとタイムアウト・二重実行・スケール不能
冪等性sha256(入力URI + パラメータ) をジョブキーに。完了済みはskip、再送は既存結果を返す
配信保証キューは at-least-once。「成功してからack」「完了済みskip」で二重分離を防ぐ
回復性OOMは段階縮退、失敗は指数バックオフ→DLQ、SIGTERMは現ジョブ完了後に終了
可観測性構造化ログ(相関ID)+メトリクス(滞留・処理時間・device・GPU使用率)
コストスポットGPU・バッチ・キュー滞留でオートスケール・モデル常駐ロード
テスト容易性SeparationEngine を Protocol 化、FakeEngine でGPUなしテスト

設計の全体像は次の5レイヤです。


なぜ「APIを直接叩く」と本番で破綻するのか

最初に誰もが書くのは、こういう同期実装です。

# ❌ アンチパターン: リクエスト内で分離を完走させる
@app.post("/separate")
def separate(audio_url: str):
    stems = run_demucs(audio_url)   # 数十秒〜数分ブロックする
    return {"stems": stems}

これはデモでは動き、本番で必ず壊れます。理由は4つ。

  • タイムアウト:分離は数十秒〜数分。ロードバランサ/ゲートウェイの上限(多くは30〜60秒)を超えて切られる。
  • 二重実行:ユーザーが送信ボタンを連打/クライアントが自動リトライすると、同じ音源を何度もGPUで処理してGPU代を溶かす。
  • スケール不能:1リクエストが1GPUを長時間占有。同時アクセスが来たら詰まる。バックプレッシャ(過負荷時に受け流す仕組み)が無い。
  • 回復不能:処理中にワーカーが落ちたら、その仕事は消える。どこまで進んだか分からない。

解決は1つ——「受け付ける」と「処理する」を分離し、間にキューを挟むことです。


全体アーキテクチャ:5つのレイヤ

┌──────────┐   ①受け付ける(即返す)      ┌──────────┐
│  Client  │ ─────────────────────────▶ │   API    │  ← FastAPI / 型安全な境界
└──────────┘   202 Accepted + job_id   │ ingress  │
      ▲                                  └────┬─────┘
      │ ④結果を取りに来る(poll/webhook)        │ enqueue (at-least-once)
      │                                       ▼
      │                                  ┌──────────┐
      │                                  │  Queue   │  ← SQS / Redis(バックプレッシャ+再試行)
      │                                  └────┬─────┘
      │                                       │ reserve(visibility timeout)
      │                                       ▼
┌─────┴──────┐   ③結果を保存          ┌──────────┐
│  Object    │ ◀──────────────────── │  GPU     │  ← モデル常駐・冪等処理・OOM回復
│  Storage   │                        │  Worker  │
└────────────┘                        └──────────┘
                                       ②重い処理はここだけ

各レイヤの**責務(SRP)**を1行で。

  • API ingress:入力を検証し、冪等キーを作り、キューに積んで202を返す。重い処理は一切しない。
  • Queue:仕事を貯め、再試行とバックプレッシャを担う。at-least-once配信
  • GPU Worker:キューから取り、分離し、結果を保存する。唯一GPUを触る層
  • Object Storage:入力音声と出力ステムを置く(S3など)。
  • Job Store:ジョブの状態(queued/running/done/failed)と結果URIを持つ(DB/Redis)。冪等性の土台。

以下、レイヤごとに実装します。


レイヤ1:API ingress(型安全な受け口)

境界では外部入力を必ず検証します(Pydantic v2)。詳細はPydantic v2の記事FastAPIの記事に。

import hashlib
from pydantic import BaseModel, Field, field_validator

class SeparationJob(BaseModel):
    """分離ジョブの不変な仕様。これがそのまま冪等キーの素になる。"""
    model_config = {"frozen": True}  # 不変=キーの安定性を保証

    audio_uri: str = Field(..., description="s3://bucket/key 形式のみ許可")
    engine: str = Field("demucs", pattern="^(demucs|uvr)$")  # 許可リストで制限
    two_stems: bool = False

    @field_validator("audio_uri")
    @classmethod
    def _only_trusted_scheme(cls, v: str) -> str:
        # セキュリティ: 任意URLを許すとSSRFの穴になる。自社バケットのs3://に限定
        if not v.startswith("s3://"):
            raise ValueError("audio_uri must be an s3:// URI")
        return v

def job_key(job: SeparationJob) -> str:
    """同じ入力×同じ設定なら必ず同じキー(冪等性の起点)。"""
    return hashlib.sha256(job.model_dump_json().encode()).hexdigest()
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/v1/separations", status_code=202)
async def create(job: SeparationJob) -> dict:
    key = job_key(job)
    # 冪等性: 既存ジョブがあれば作り直さず返す(連打・自動リトライで二重投入しない)
    if (existing := await store.get(key)) is not None:
        return {"job_id": key, "status": existing.status, "cached": True}
    await store.put(key, status="queued")
    await queue.enqueue(key, job)          # キューに積むだけ。GPUは触らない
    return {"job_id": key, "status": "queued", "cached": False}

@app.get("/v1/separations/{job_id}")
async def status(job_id: str) -> dict:
    if (rec := await store.get(job_id)) is None:
        raise HTTPException(status_code=404, detail="job not found")
    return {"job_id": job_id, "status": rec.status, "stems": rec.stems}

ポイントは、ingressが**「検証・冪等キー・enqueue・即返す」しかしない**こと。これでタイムアウトも二重実行も構造的に消えます。


レイヤ2:ジョブキュー(再試行とバックプレッシャ)

キューは実装を Protocol で抽象化します。SQSでもRedisでも、ワーカーのコードを変えずに差し替えられる(ETC)。

from typing import Protocol
from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class Reserved:
    handle: str          # ack/nackに使う予約ハンドル
    job_id: str
    job: "SeparationJob"
    receive_count: int    # 何回配信されたか(DLQ判定に使う)

class JobQueue(Protocol):
    async def enqueue(self, job_id: str, job: SeparationJob) -> None: ...
    async def reserve(self) -> Reserved | None: ...   # ロングポーリング
    async def ack(self, handle: str) -> None: ...      # 完了。キューから消す
    async def nack(self, handle: str) -> None: ...     # 失敗。可視性タイムアウト後に再配信

SQSなら reserveReceiveMessage(visibility timeout付き)、ackDeleteMessagenack は可視性タイムアウトを0に戻すだけ。at-least-once配信(少なくとも1回。重複あり)が前提なので、二重処理対策は次のワーカー側で行います。SQS/Lambdaでの冪等な非同期処理は専用記事で詳述しています。


レイヤ3:GPUワーカー(本体)

ここが心臓部です。要件は ①モデルは常駐ロード(コールドスタート回避)②冪等処理 ③OOM回復 ④graceful shutdown

まず、分離エンジンを Protocol で抽象化します。これにより Demucs↔UVR の差し替えも、GPUなしテストも可能になる(ETC+テスト容易性)。

from pathlib import Path
from typing import Protocol

class SeparationEngine(Protocol):
    """音源分離の抽象(port)。具体ツールはこの裏に隠す。"""
    name: str
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        """{ステム名: 出力ファイルパス} を返す。失敗時は例外を送出。"""
        ...
import torch
import demucs.api

class DemucsEngine:
    """SeparationEngine の Demucs 実装(adapter)。"""
    def __init__(self, model: str = "htdemucs") -> None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # モデルは起動時に1度だけロードして常駐させる(毎ジョブのロードを避ける)
        self._sep = demucs.api.Separator(model=model, device=device)
        self.name = f"demucs:{model}"

    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        _, stems = self._sep.separate_audio_file(str(audio))
        out_dir.mkdir(parents=True, exist_ok=True)
        if two_stems:  # vocals / no_vocals に畳む
            stems = {"vocals": stems["vocals"],
                     "no_vocals": sum(s for n, s in stems.items() if n != "vocals")}
        outputs: dict[str, Path] = {}
        for name, src in stems.items():
            path = out_dir / f"{name}.wav"
            demucs.api.save_audio(src, str(path), samplerate=self._sep.samplerate)
            outputs[name] = path
        return outputs

そしてワーカーループ。冪等性とack順序が肝です。

import logging, tempfile, threading
from pathlib import Path

log = logging.getLogger("separation.worker")

def run_worker(engine: SeparationEngine, queue: JobQueue,
               store: "JobStore", blob: "BlobStore", *, stop: threading.Event) -> None:
    while not stop.is_set():
        reserved = queue_reserve_blocking(queue)   # ロングポーリング
        if reserved is None:
            continue
        ctx = {"job_id": reserved.job_id, "engine": engine.name}  # 相関ID

        # 冪等性①: 再配信(at-least-once)で来た「完了済み」は処理せずackだけ
        if store.is_done(reserved.job_id):
            queue.ack(reserved.handle)
            log.info("skip.already_done", extra=ctx)
            continue

        try:
            with tempfile.TemporaryDirectory() as tmp:
                tmpdir = Path(tmp)
                local = blob.download(reserved.job.audio_uri, tmpdir / "in")
                stems = separate_with_recovery(engine, local, tmpdir / "out",
                                               two_stems=reserved.job.two_stems)
                # 結果をオブジェクトストレージへ(job_idで名前空間を切る=上書き安全)
                uris = {n: blob.upload(p, f"{reserved.job_id}/{n}.wav")
                        for n, p in stems.items()}
            store.mark_done(reserved.job_id, uris)
            queue.ack(reserved.handle)              # 冪等性②: 成功して初めてack
            log.info("separation.done", extra={**ctx, "stems": list(uris)})
        except Exception as e:                      # 回復性: 失敗はnack→再試行/DLQ
            store.mark_failed(reserved.job_id, type(e).__name__)
            queue.nack(reserved.handle)
            log.error("separation.failed", extra={**ctx, "error": type(e).__name__})

なぜこの順序か:分離が成功し、結果を保存し終えて初めてackします。途中で落ちてもackしていないので、キューは可視性タイムアウト後に再配信してくれる(仕事は消えない)。再配信で同じジョブが来ても、is_doneチェックで二重分離しない。この2つで、at-least-onceキューの上に実質exactly-onceの処理を構築できます。

OOM回復(段階縮退)

GPUのメモリ不足を例外ではなく正常系として扱います(回復性パターンの記事も参照)。

def separate_with_recovery(engine: SeparationEngine, audio: Path, out: Path,
                           *, two_stems: bool) -> dict[str, Path]:
    try:
        return engine.separate(audio, out, two_stems=two_stems)
    except torch.cuda.OutOfMemoryError:
        torch.cuda.empty_cache()
        # 最後の砦: CPUエンジンへ退避(遅いが完走する)。冪等なので安全に作り直せる
        log.warning("oom.fallback_to_cpu")
        return DemucsEngine_cpu().separate(audio, out, two_stems=two_stems)

segmentを段階的に縮める詳細版はDemucsの記事に。)

Graceful shutdown(スポットGPU対策)

スポットインスタンスは予告して停止されます。SIGTERMで現ジョブを完了させてから終了すれば、仕事を取りこぼしません。

import signal

stop = threading.Event()
# SIGTERM受信時: ループの先頭で抜ける。実行中のジョブは最後までやり切る
signal.signal(signal.SIGTERM, lambda *_: stop.set())

万一強制killされても、ack前なのでキューが再配信し、冪等性で安全に再実行されます。「優雅に止まる」と「乱暴に殺されても平気」の二重の備えが、スポットGPUを安心して使える条件です。


可観測性:止まった処理を一目で追う

「なぜこの曲だけ遅い/失敗した」を後から追えなければ、本番は回せません。三点を構造化して出します。

  • 構造化ログ+相関ID:全ログに job_id を通す。printではなくJSON構造化ログで、検索・集計可能に。音声の中身(PII)は出さない
  • メトリクスqueue_depth(滞留)、separation_duration_seconds(処理時間・モデル別)、device(cuda/cpu)、gpu_utilization。滞留が増え続けたらスケール不足、CPU率が上がったらGPU脱落の事故
  • トレース:ingress→queue→worker→storage を1本のトレースで繋ぐ。OpenTelemetryでの相関は専用記事に。
import time

def timed_separation(engine, audio, out, *, two_stems, metrics):
    start = time.monotonic()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    try:
        return separate_with_recovery(engine, audio, out, two_stems=two_stems)
    finally:
        # 症状ベースの監視: 処理時間・deviceをラベル付きで記録
        metrics.observe("separation_duration_seconds",
                        time.monotonic() - start,
                        labels={"engine": engine.name, "device": device})

コスト効率:GPUを遊ばせない

音源分離はGPU時間がそのままお金です。単価を刻む打ち手は4つ。

  1. モデル常駐ロード:毎ジョブでモデルをロードし直さない(ワーカー起動時に1回)。コールドスタートのGPU時間をゼロに。
  2. スポットGPU:オンデマンドの数分の1。graceful shutdown+冪等性があるので中断されても損しない
  3. キュー滞留でオートスケールqueue_depthをターゲットにワーカー数を増減。暇ならゼロまでスケールイン(アイドルGPUを持たない)。
  4. バッチ処理:複数曲をまとめて1回のGPU呼び出しに(-j並列)。スループットを上げ、起動コストを償却。

具体的なGPU基盤の置き場所(ECS/EKS)とコスト最適化はECS vs EKSの記事FinOpsの記事に。**冪等キャッシュで「同じ音源の再分離ゼロ」**を足せば、無駄打ちが直接消えます。


テスト容易性:GPUなしでワーカーを検証する

SeparationEngine を Protocol にした最大の恩恵がこれです。GPUもDemucsも無いCIで、ワーカーの冪等性・回復性をユニットテストできます。

class FakeEngine:
    """テスト用の分離エンジン。GPU不要で決定的な出力を返す。"""
    name = "fake"
    calls = 0
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        type(self).calls += 1
        out_dir.mkdir(parents=True, exist_ok=True)
        names = ["vocals", "no_vocals"] if two_stems else ["vocals", "drums", "bass", "other"]
        return {n: _touch(out_dir / f"{n}.wav") for n in names}

def test_redelivery_does_not_double_process(fake_queue, fake_store, fake_blob):
    """at-least-onceで同じジョブが2回来ても、分離は1回しか走らない。"""
    engine = FakeEngine()
    job_id = enqueue_and_run(engine, fake_queue, fake_store, fake_blob, times=2)
    assert fake_store.is_done(job_id)
    assert FakeEngine.calls == 1   # ← 冪等性の核心。2回流しても1回だけ

「GPUがないとテストできない」は設計の敗北です。重い副作用(GPU・I/O)を境界の向こうに追い出すだけで、ロジックは高速・決定的にテストできます。これが SRP と依存性逆転の実利です。


よくある質問(FAQ)

Q. キューはSQSとRedis、どちら? A. AWSに寄せるならSQS(DLQ・可視性タイムアウトが標準装備)。軽く始めるならRedis(RQ/Celery)。Protocolで抽象化してあるので後から差し替え可能。Celery/Redisの実装は専用記事に。

Q. 結果通知はpollとwebhook、どちら? A. 両対応が理想。社内バッチはpoll(GET /status)で十分、外部連携はwebhook(署名検証必須)。webhookはStripeの記事の署名検証パターンが流用できます。

Q. 「成功してからack」だと、保存後・ack前に落ちたら? A. 再配信されますが、is_doneチェックでskipされるので二重分離しません。これがat-least-onceの上でexactly-onceを作る定石です。

Q. GPUは常時起動が必要? A. 不要。キュー滞留でオートスケールし、暇ならゼロに。コールドスタートはモデルロード分(数十秒)かかるので、SLA次第で最小1台を温存する判断もあり。

Q. この設計は音源分離以外にも使える? A. そのまま使えます。文字起こし・動画変換・画像/動画生成など、重い・冪等にしたい・GPUを使う処理は全部この型に乗ります。汎用の非同期処理基盤です。


まとめ:「動く」と「本番で稼ぐ」を分けるのは設計

音源分離を本番サービスにする鍵は、モデルの良し悪しではなくアーキテクチャです。

  1. 受け付けると処理するを分離し、間にキューを置く(タイムアウト・二重実行の根絶)。
  2. 冪等性(sha256キー+完了skip)とat-least-once+成功後ackで、二重分離を構造的に止める。
  3. 回復性(OOM縮退・nack/DLQ・graceful shutdown)で、失敗を正常系にする。
  4. 可観測性・コスト・テスト容易性を、後付けでなく最初から構造に織り込む。

ここまでやって初めて、デモではなく「何千本来ても落ちない」サービスになります。そして——この設計判断こそ、外注で最も価値が出るところです。モデルを繋ぐだけのPoCは誰でも作れますが、GPU代を溶かさず・二重課金せず・スポットでも平気な本番基盤は、痛い目を見た数がそのまま品質になります。

私は、本稿のアーキテクチャを実際に本番運用しているAI動画ローカライズ基盤のGPU処理層で実装しました。音源分離に限らず、重いAI処理の本番化・コスト最適化・信頼性設計をお考えなら、実績をご覧のうえご相談ください。一人 × 生成AIで、PoCから本番運用まで一気通貫で作ります。


出典・関連リソース

※ 本稿のコードは設計の骨子を示すものです。実装時は各ライブラリ・クラウドの最新仕様を一次情報で確認してください。

友田

友田 陽大

経済産業大臣賞 受賞プロダクト開発者。TypeScript + Python + AWS で、SaaS・業界DX・ 実用レベルの生成AI(RAG)を、要件定義からインフラ・運用まで一人で完遂します。

この記事で解説した技術の適用事例

AI動画ローカライズ・リップシンク基盤

ケーススタディを見る