この記事のゴール
Demucs や UVR5/MDX-Net で音源分離を「動かす」ところまでは、各専用記事の通り数分でできます。問題はその先——1日に何千本も来る音声を、落ちずに・二重課金せず・コストを抑えて捌く本番サービスにする設計です。
本稿は、特定ツールに依存しない「重いGPU処理を本番APIにする」アーキテクチャを、型安全な実コードで示します。題材は音源分離ですが、ここで使う設計(非同期ジョブキュー・冪等性・回復性・可観測性)は、動画変換・文字起こし・画像生成など、あらゆる重いAI処理に転用できる普遍的な型です。
読み終えたときに、次ができる状態を目指します。
- なぜ「APIを直接叩く同期実装」が本番で破綻するのかを説明し、非同期アーキテクチャを設計できる。
- 冪等性・回復性・可観測性・コスト効率を、コードの構造として実装できる。
- GPUなしでワーカーをユニットテストし、Demucs↔UVRを差し替えても壊れない境界を引ける。
筆者について(信頼性の開示):私は、動画をアップロードするだけで多言語ローカライズまで全自動化するAI動画ローカライズ基盤を単独で設計・実装し、本番運用しています。その第1段の音源分離は、まさに本稿の「GPUワーカー × ジョブキュー」で回しています。本稿の設計は、机上論ではなく実際にGPU代を溶かし、OOMで叩き起こされ、二重処理に課金された経験から逆算したものです。
30秒のまとめ
| 観点 | 設計 |
|---|---|
| 基本形 | API ingress → ジョブキュー → GPUワーカー → オブジェクトストレージ(完全非同期) |
| なぜ非同期か | 分離は数十秒〜数分。同期で返すとタイムアウト・二重実行・スケール不能 |
| 冪等性 | sha256(入力URI + パラメータ) をジョブキーに。完了済みはskip、再送は既存結果を返す |
| 配信保証 | キューは at-least-once。「成功してからack」「完了済みskip」で二重分離を防ぐ |
| 回復性 | OOMは段階縮退、失敗は指数バックオフ→DLQ、SIGTERMは現ジョブ完了後に終了 |
| 可観測性 | 構造化ログ(相関ID)+メトリクス(滞留・処理時間・device・GPU使用率) |
| コスト | スポットGPU・バッチ・キュー滞留でオートスケール・モデル常駐ロード |
| テスト容易性 | SeparationEngine を Protocol 化、FakeEngine でGPUなしテスト |
設計の全体像は次の5レイヤです。
なぜ「APIを直接叩く」と本番で破綻するのか
最初に誰もが書くのは、こういう同期実装です。
# ❌ アンチパターン: リクエスト内で分離を完走させる
@app.post("/separate")
def separate(audio_url: str):
stems = run_demucs(audio_url) # 数十秒〜数分ブロックする
return {"stems": stems}
これはデモでは動き、本番で必ず壊れます。理由は4つ。
- タイムアウト:分離は数十秒〜数分。ロードバランサ/ゲートウェイの上限(多くは30〜60秒)を超えて切られる。
- 二重実行:ユーザーが送信ボタンを連打/クライアントが自動リトライすると、同じ音源を何度もGPUで処理してGPU代を溶かす。
- スケール不能:1リクエストが1GPUを長時間占有。同時アクセスが来たら詰まる。バックプレッシャ(過負荷時に受け流す仕組み)が無い。
- 回復不能:処理中にワーカーが落ちたら、その仕事は消える。どこまで進んだか分からない。
解決は1つ——「受け付ける」と「処理する」を分離し、間にキューを挟むことです。
全体アーキテクチャ:5つのレイヤ
┌──────────┐ ①受け付ける(即返す) ┌──────────┐
│ Client │ ─────────────────────────▶ │ API │ ← FastAPI / 型安全な境界
└──────────┘ 202 Accepted + job_id │ ingress │
▲ └────┬─────┘
│ ④結果を取りに来る(poll/webhook) │ enqueue (at-least-once)
│ ▼
│ ┌──────────┐
│ │ Queue │ ← SQS / Redis(バックプレッシャ+再試行)
│ └────┬─────┘
│ │ reserve(visibility timeout)
│ ▼
┌─────┴──────┐ ③結果を保存 ┌──────────┐
│ Object │ ◀──────────────────── │ GPU │ ← モデル常駐・冪等処理・OOM回復
│ Storage │ │ Worker │
└────────────┘ └──────────┘
②重い処理はここだけ
各レイヤの**責務(SRP)**を1行で。
- API ingress:入力を検証し、冪等キーを作り、キューに積んで即
202を返す。重い処理は一切しない。 - Queue:仕事を貯め、再試行とバックプレッシャを担う。at-least-once配信。
- GPU Worker:キューから取り、分離し、結果を保存する。唯一GPUを触る層。
- Object Storage:入力音声と出力ステムを置く(S3など)。
- Job Store:ジョブの状態(queued/running/done/failed)と結果URIを持つ(DB/Redis)。冪等性の土台。
以下、レイヤごとに実装します。
レイヤ1:API ingress(型安全な受け口)
境界では外部入力を必ず検証します(Pydantic v2)。詳細はPydantic v2の記事・FastAPIの記事に。
import hashlib
from pydantic import BaseModel, Field, field_validator
class SeparationJob(BaseModel):
"""分離ジョブの不変な仕様。これがそのまま冪等キーの素になる。"""
model_config = {"frozen": True} # 不変=キーの安定性を保証
audio_uri: str = Field(..., description="s3://bucket/key 形式のみ許可")
engine: str = Field("demucs", pattern="^(demucs|uvr)$") # 許可リストで制限
two_stems: bool = False
@field_validator("audio_uri")
@classmethod
def _only_trusted_scheme(cls, v: str) -> str:
# セキュリティ: 任意URLを許すとSSRFの穴になる。自社バケットのs3://に限定
if not v.startswith("s3://"):
raise ValueError("audio_uri must be an s3:// URI")
return v
def job_key(job: SeparationJob) -> str:
"""同じ入力×同じ設定なら必ず同じキー(冪等性の起点)。"""
return hashlib.sha256(job.model_dump_json().encode()).hexdigest()
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.post("/v1/separations", status_code=202)
async def create(job: SeparationJob) -> dict:
key = job_key(job)
# 冪等性: 既存ジョブがあれば作り直さず返す(連打・自動リトライで二重投入しない)
if (existing := await store.get(key)) is not None:
return {"job_id": key, "status": existing.status, "cached": True}
await store.put(key, status="queued")
await queue.enqueue(key, job) # キューに積むだけ。GPUは触らない
return {"job_id": key, "status": "queued", "cached": False}
@app.get("/v1/separations/{job_id}")
async def status(job_id: str) -> dict:
if (rec := await store.get(job_id)) is None:
raise HTTPException(status_code=404, detail="job not found")
return {"job_id": job_id, "status": rec.status, "stems": rec.stems}
ポイントは、ingressが**「検証・冪等キー・enqueue・即返す」しかしない**こと。これでタイムアウトも二重実行も構造的に消えます。
レイヤ2:ジョブキュー(再試行とバックプレッシャ)
キューは実装を Protocol で抽象化します。SQSでもRedisでも、ワーカーのコードを変えずに差し替えられる(ETC)。
from typing import Protocol
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class Reserved:
handle: str # ack/nackに使う予約ハンドル
job_id: str
job: "SeparationJob"
receive_count: int # 何回配信されたか(DLQ判定に使う)
class JobQueue(Protocol):
async def enqueue(self, job_id: str, job: SeparationJob) -> None: ...
async def reserve(self) -> Reserved | None: ... # ロングポーリング
async def ack(self, handle: str) -> None: ... # 完了。キューから消す
async def nack(self, handle: str) -> None: ... # 失敗。可視性タイムアウト後に再配信
SQSなら reserve は ReceiveMessage(visibility timeout付き)、ack は DeleteMessage、nack は可視性タイムアウトを0に戻すだけ。at-least-once配信(少なくとも1回。重複あり)が前提なので、二重処理対策は次のワーカー側で行います。SQS/Lambdaでの冪等な非同期処理は専用記事で詳述しています。
レイヤ3:GPUワーカー(本体)
ここが心臓部です。要件は ①モデルは常駐ロード(コールドスタート回避)②冪等処理 ③OOM回復 ④graceful shutdown。
まず、分離エンジンを Protocol で抽象化します。これにより Demucs↔UVR の差し替えも、GPUなしテストも可能になる(ETC+テスト容易性)。
from pathlib import Path
from typing import Protocol
class SeparationEngine(Protocol):
"""音源分離の抽象(port)。具体ツールはこの裏に隠す。"""
name: str
def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
"""{ステム名: 出力ファイルパス} を返す。失敗時は例外を送出。"""
...
import torch
import demucs.api
class DemucsEngine:
"""SeparationEngine の Demucs 実装(adapter)。"""
def __init__(self, model: str = "htdemucs") -> None:
device = "cuda" if torch.cuda.is_available() else "cpu"
# モデルは起動時に1度だけロードして常駐させる(毎ジョブのロードを避ける)
self._sep = demucs.api.Separator(model=model, device=device)
self.name = f"demucs:{model}"
def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
_, stems = self._sep.separate_audio_file(str(audio))
out_dir.mkdir(parents=True, exist_ok=True)
if two_stems: # vocals / no_vocals に畳む
stems = {"vocals": stems["vocals"],
"no_vocals": sum(s for n, s in stems.items() if n != "vocals")}
outputs: dict[str, Path] = {}
for name, src in stems.items():
path = out_dir / f"{name}.wav"
demucs.api.save_audio(src, str(path), samplerate=self._sep.samplerate)
outputs[name] = path
return outputs
そしてワーカーループ。冪等性とack順序が肝です。
import logging, tempfile, threading
from pathlib import Path
log = logging.getLogger("separation.worker")
def run_worker(engine: SeparationEngine, queue: JobQueue,
store: "JobStore", blob: "BlobStore", *, stop: threading.Event) -> None:
while not stop.is_set():
reserved = queue_reserve_blocking(queue) # ロングポーリング
if reserved is None:
continue
ctx = {"job_id": reserved.job_id, "engine": engine.name} # 相関ID
# 冪等性①: 再配信(at-least-once)で来た「完了済み」は処理せずackだけ
if store.is_done(reserved.job_id):
queue.ack(reserved.handle)
log.info("skip.already_done", extra=ctx)
continue
try:
with tempfile.TemporaryDirectory() as tmp:
tmpdir = Path(tmp)
local = blob.download(reserved.job.audio_uri, tmpdir / "in")
stems = separate_with_recovery(engine, local, tmpdir / "out",
two_stems=reserved.job.two_stems)
# 結果をオブジェクトストレージへ(job_idで名前空間を切る=上書き安全)
uris = {n: blob.upload(p, f"{reserved.job_id}/{n}.wav")
for n, p in stems.items()}
store.mark_done(reserved.job_id, uris)
queue.ack(reserved.handle) # 冪等性②: 成功して初めてack
log.info("separation.done", extra={**ctx, "stems": list(uris)})
except Exception as e: # 回復性: 失敗はnack→再試行/DLQ
store.mark_failed(reserved.job_id, type(e).__name__)
queue.nack(reserved.handle)
log.error("separation.failed", extra={**ctx, "error": type(e).__name__})
なぜこの順序か:分離が成功し、結果を保存し終えて初めてackします。途中で落ちてもackしていないので、キューは可視性タイムアウト後に再配信してくれる(仕事は消えない)。再配信で同じジョブが来ても、is_doneチェックで二重分離しない。この2つで、at-least-onceキューの上に実質exactly-onceの処理を構築できます。
OOM回復(段階縮退)
GPUのメモリ不足を例外ではなく正常系として扱います(回復性パターンの記事も参照)。
def separate_with_recovery(engine: SeparationEngine, audio: Path, out: Path,
*, two_stems: bool) -> dict[str, Path]:
try:
return engine.separate(audio, out, two_stems=two_stems)
except torch.cuda.OutOfMemoryError:
torch.cuda.empty_cache()
# 最後の砦: CPUエンジンへ退避(遅いが完走する)。冪等なので安全に作り直せる
log.warning("oom.fallback_to_cpu")
return DemucsEngine_cpu().separate(audio, out, two_stems=two_stems)
(segmentを段階的に縮める詳細版はDemucsの記事に。)
Graceful shutdown(スポットGPU対策)
スポットインスタンスは予告して停止されます。SIGTERMで現ジョブを完了させてから終了すれば、仕事を取りこぼしません。
import signal
stop = threading.Event()
# SIGTERM受信時: ループの先頭で抜ける。実行中のジョブは最後までやり切る
signal.signal(signal.SIGTERM, lambda *_: stop.set())
万一強制killされても、ack前なのでキューが再配信し、冪等性で安全に再実行されます。「優雅に止まる」と「乱暴に殺されても平気」の二重の備えが、スポットGPUを安心して使える条件です。
可観測性:止まった処理を一目で追う
「なぜこの曲だけ遅い/失敗した」を後から追えなければ、本番は回せません。三点を構造化して出します。
- 構造化ログ+相関ID:全ログに
job_idを通す。printではなくJSON構造化ログで、検索・集計可能に。音声の中身(PII)は出さない。 - メトリクス:
queue_depth(滞留)、separation_duration_seconds(処理時間・モデル別)、device(cuda/cpu)、gpu_utilization。滞留が増え続けたらスケール不足、CPU率が上がったらGPU脱落の事故。 - トレース:ingress→queue→worker→storage を1本のトレースで繋ぐ。OpenTelemetryでの相関は専用記事に。
import time
def timed_separation(engine, audio, out, *, two_stems, metrics):
start = time.monotonic()
device = "cuda" if torch.cuda.is_available() else "cpu"
try:
return separate_with_recovery(engine, audio, out, two_stems=two_stems)
finally:
# 症状ベースの監視: 処理時間・deviceをラベル付きで記録
metrics.observe("separation_duration_seconds",
time.monotonic() - start,
labels={"engine": engine.name, "device": device})
コスト効率:GPUを遊ばせない
音源分離はGPU時間がそのままお金です。単価を刻む打ち手は4つ。
- モデル常駐ロード:毎ジョブでモデルをロードし直さない(ワーカー起動時に1回)。コールドスタートのGPU時間をゼロに。
- スポットGPU:オンデマンドの数分の1。graceful shutdown+冪等性があるので中断されても損しない。
- キュー滞留でオートスケール:
queue_depthをターゲットにワーカー数を増減。暇ならゼロまでスケールイン(アイドルGPUを持たない)。 - バッチ処理:複数曲をまとめて1回のGPU呼び出しに(
-j並列)。スループットを上げ、起動コストを償却。
具体的なGPU基盤の置き場所(ECS/EKS)とコスト最適化はECS vs EKSの記事・FinOpsの記事に。**冪等キャッシュで「同じ音源の再分離ゼロ」**を足せば、無駄打ちが直接消えます。
テスト容易性:GPUなしでワーカーを検証する
SeparationEngine を Protocol にした最大の恩恵がこれです。GPUもDemucsも無いCIで、ワーカーの冪等性・回復性をユニットテストできます。
class FakeEngine:
"""テスト用の分離エンジン。GPU不要で決定的な出力を返す。"""
name = "fake"
calls = 0
def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
type(self).calls += 1
out_dir.mkdir(parents=True, exist_ok=True)
names = ["vocals", "no_vocals"] if two_stems else ["vocals", "drums", "bass", "other"]
return {n: _touch(out_dir / f"{n}.wav") for n in names}
def test_redelivery_does_not_double_process(fake_queue, fake_store, fake_blob):
"""at-least-onceで同じジョブが2回来ても、分離は1回しか走らない。"""
engine = FakeEngine()
job_id = enqueue_and_run(engine, fake_queue, fake_store, fake_blob, times=2)
assert fake_store.is_done(job_id)
assert FakeEngine.calls == 1 # ← 冪等性の核心。2回流しても1回だけ
「GPUがないとテストできない」は設計の敗北です。重い副作用(GPU・I/O)を境界の向こうに追い出すだけで、ロジックは高速・決定的にテストできます。これが SRP と依存性逆転の実利です。
よくある質問(FAQ)
Q. キューはSQSとRedis、どちら? A. AWSに寄せるならSQS(DLQ・可視性タイムアウトが標準装備)。軽く始めるならRedis(RQ/Celery)。Protocolで抽象化してあるので後から差し替え可能。Celery/Redisの実装は専用記事に。
Q. 結果通知はpollとwebhook、どちら?
A. 両対応が理想。社内バッチはpoll(GET /status)で十分、外部連携はwebhook(署名検証必須)。webhookはStripeの記事の署名検証パターンが流用できます。
Q. 「成功してからack」だと、保存後・ack前に落ちたら?
A. 再配信されますが、is_doneチェックでskipされるので二重分離しません。これがat-least-onceの上でexactly-onceを作る定石です。
Q. GPUは常時起動が必要? A. 不要。キュー滞留でオートスケールし、暇ならゼロに。コールドスタートはモデルロード分(数十秒)かかるので、SLA次第で最小1台を温存する判断もあり。
Q. この設計は音源分離以外にも使える? A. そのまま使えます。文字起こし・動画変換・画像/動画生成など、重い・冪等にしたい・GPUを使う処理は全部この型に乗ります。汎用の非同期処理基盤です。
まとめ:「動く」と「本番で稼ぐ」を分けるのは設計
音源分離を本番サービスにする鍵は、モデルの良し悪しではなくアーキテクチャです。
- 受け付けると処理するを分離し、間にキューを置く(タイムアウト・二重実行の根絶)。
- 冪等性(sha256キー+完了skip)とat-least-once+成功後ackで、二重分離を構造的に止める。
- 回復性(OOM縮退・nack/DLQ・graceful shutdown)で、失敗を正常系にする。
- 可観測性・コスト・テスト容易性を、後付けでなく最初から構造に織り込む。
ここまでやって初めて、デモではなく「何千本来ても落ちない」サービスになります。そして——この設計判断こそ、外注で最も価値が出るところです。モデルを繋ぐだけのPoCは誰でも作れますが、GPU代を溶かさず・二重課金せず・スポットでも平気な本番基盤は、痛い目を見た数がそのまま品質になります。
私は、本稿のアーキテクチャを実際に本番運用しているAI動画ローカライズ基盤のGPU処理層で実装しました。音源分離に限らず、重いAI処理の本番化・コスト最適化・信頼性設計をお考えなら、実績をご覧のうえご相談ください。一人 × 生成AIで、PoCから本番運用まで一気通貫で作ります。
出典・関連リソース
- 個別ツール:Demucs 完全ガイド / UVR5・MDX-Net 完全ガイド / ツール選定
- 非同期・信頼性:SQS/Lambda 冪等な非同期処理 / リトライ・バックオフ・サーキットブレーカ / Transactional Outbox
- 基盤・観測:FastAPI 本番設計 / OpenTelemetry 可観測性 / ECS vs EKS
※ 本稿のコードは設計の骨子を示すものです。実装時は各ライブラリ・クラウドの最新仕様を一次情報で確認してください。