友田 陽大

Turning source separation into a production API: the design of GPU worker × job queue × idempotency

This article takes source separation (Demucs and similar tools) from a demo to a production service. Using real code for a type-safe FastAPI ingress and a Python worker, it explains an architecture that puts heavy GPU processing on an asynchronous job queue and builds in idempotency, resilience, observability, and cost efficiency. It covers what production operation requires: OOM recovery, graceful shutdown, at-least-once delivery, GPU auto-scaling, and testability.

Reading time
14 min read
Author
友田 陽大

The goal of this article

Getting source separation to "run" with Demucs or UVR5/MDX-Net takes a few minutes, as the dedicated articles on each tool show. The hard part comes after that: designing a production service that handles thousands of audio files a day without crashing, without double-charging, and while keeping costs down.

This article presents a tool-independent architecture for turning heavy GPU processing into a production API, in type-safe real code. The subject is source separation, but the design (asynchronous job queue, idempotency, resilience, observability) transfers to any heavy AI workload, such as video transcoding, transcription, or image generation.

By the end of this article, you should be able to do the following.

  1. Explain why a synchronous implementation that runs separation inside the API request breaks down in production, and design an asynchronous architecture instead.
  2. Implement idempotency, resilience, observability, and cost efficiency as part of the code's structure.
  3. Unit-test the worker without a GPU, and draw a boundary that holds even when swapping Demucs↔UVR.

About the author (for credibility): I single-handedly designed, built, and run in production an AI video localization platform that fully automates multilingual localization from a single video upload. Its first stage, source separation, runs on exactly the "GPU worker × job queue" design described here. This design is not desk theory; it was worked out backward from actually burning through GPU budget, being woken up by OOM errors, and being billed for duplicate processing.


30-second summary

Viewpoint | Design
Basic form | API ingress → job queue → GPU worker → object storage (fully asynchronous)
Why async | Separation takes tens of seconds to a few minutes. A synchronous response means timeouts, double execution, and no way to scale
Idempotency | sha256(input URI + parameters) is the job key. Completed jobs are skipped; a re-sent request gets the existing result
Delivery guarantee | The queue is at-least-once. "Ack after success" and "skip completed jobs" prevent double separation
Resilience | OOM degrades gracefully; failures go through exponential backoff → DLQ; SIGTERM exits after finishing the current job
Observability | Structured logs (correlation ID) + metrics (queue backlog, processing time, device, GPU utilization)
Cost | Spot GPUs, batching, auto-scaling on queue backlog, model-resident loading
Testability | Make SeparationEngine a Protocol; test without a GPU using FakeEngine

The overall design consists of five layers, covered in the sections that follow.


Why "hitting the API directly" breaks down in production

The first thing everyone writes is this kind of synchronous implementation.

# ❌ Anti-pattern: running the whole separation inside the request
@app.post("/separate")
def separate(audio_url: str):
    stems = run_demucs(audio_url)   # blocks for tens of seconds to minutes
    return {"stems": stems}

This works in a demo and reliably breaks in production, for four reasons.

  • Timeout: separation takes tens of seconds to a few minutes, so the request is cut off when it exceeds the load balancer's or gateway's limit (often 30–60 seconds).
  • Double execution: if the user mashes the submit button or the client auto-retries, the GPU processes the same source many times and the GPU bill balloons.
  • Can't scale: one request occupies one GPU for a long time, so concurrent requests pile up. There is no backpressure (a mechanism for pushing back when overloaded).
  • Can't recover: if the worker crashes mid-processing, the work is lost, and there is no way to tell how far it got.

There is one solution: separate "accept" from "process" and put a queue between them.


Overall architecture: the 5 layers

┌──────────┐  ① accept (returns immediately)  ┌──────────┐
│  Client  │ ───────────────────────────────▶ │   API    │  ← FastAPI / type-safe boundary
└──────────┘       202 Accepted + job_id      │ ingress  │
      ▲                                       └────┬─────┘
      │ ④ fetch the result (poll/webhook)          │ enqueue (at-least-once)
      │                                            ▼
      │                                       ┌──────────┐
      │                                       │  Queue   │  ← SQS / Redis (backpressure + retry)
      │                                       └────┬─────┘
      │                                            │ reserve (visibility timeout)
      │                                            ▼
┌─────┴──────┐        ③ save the result       ┌──────────┐
│  Object    │ ◀───────────────────────────── │   GPU    │  ← model resident, idempotent, OOM recovery
│  Storage   │                                │  Worker  │
└────────────┘                                └──────────┘
                                              ② heavy processing only here

Each layer's responsibility, one line each (single responsibility principle, SRP).

  • API ingress: validates the input, derives an idempotency key, enqueues the job, and returns 202 immediately. It does no heavy processing at all.
  • Queue: holds pending work and handles retry and backpressure. Delivery is at-least-once.
  • GPU Worker: takes jobs from the queue, runs separation, and saves the result. The only layer that touches the GPU.
  • Object Storage: stores the input audio and the output stems (S3, etc.).
  • Job Store: holds each job's state (queued/running/done/failed) and result URIs (DB/Redis). The foundation of idempotency.

Below, I implement each layer in turn.


Layer 1: API ingress (a type-safe receiver)

Always validate external input at the boundary (Pydantic v2). For details, see the Pydantic v2 article and the FastAPI article.

import hashlib
from pydantic import BaseModel, Field, field_validator

class SeparationJob(BaseModel):
    """Immutable spec of a separation job. It is also the source of the idempotency key."""
    model_config = {"frozen": True}  # immutable = the key stays stable

    audio_uri: str = Field(..., description="only the s3://bucket/key form is allowed")
    engine: str = Field("demucs", pattern="^(demucs|uvr)$")  # restricted to an allow-list
    two_stems: bool = False

    @field_validator("audio_uri")
    @classmethod
    def _only_trusted_scheme(cls, v: str) -> str:
        # Security: accepting arbitrary URLs opens an SSRF hole. Restrict to s3:// URIs in your own bucket
        if not v.startswith("s3://"):
            raise ValueError("audio_uri must be an s3:// URI")
        return v

def job_key(job: SeparationJob) -> str:
    """Same input × same settings always yields the same key (the starting point of idempotency)."""
    return hashlib.sha256(job.model_dump_json().encode()).hexdigest()

from fastapi import FastAPI, HTTPException

app = FastAPI()
# `store` (Job Store) and `queue` (JobQueue) are assumed to be created at startup; not shown here.

@app.post("/v1/separations", status_code=202)
async def create(job: SeparationJob) -> dict:
    key = job_key(job)
    # Idempotency: if the job already exists, return it instead of re-creating it
    # (button mashing and auto-retries never submit twice)
    if (existing := await store.get(key)) is not None:
        return {"job_id": key, "status": existing.status, "cached": True}
    await store.put(key, status="queued")
    await queue.enqueue(key, job)          # only enqueue; the GPU is never touched here
    return {"job_id": key, "status": "queued", "cached": False}

@app.get("/v1/separations/{job_id}")
async def status(job_id: str) -> dict:
    if (rec := await store.get(job_id)) is None:
        raise HTTPException(status_code=404, detail="job not found")
    return {"job_id": job_id, "status": rec.status, "stems": rec.stems}

The point is that ingress does only four things: validate, derive the idempotency key, enqueue, and return immediately. With this, timeouts and double execution are eliminated structurally.
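One detail the handler leaves open: `store.get` followed by `store.put` is two steps, so two identical requests arriving at the same instant could both enqueue. Here is a minimal sketch of closing that gap with a single atomic create-if-absent. `InMemoryJobStore`, `JobRecord`, `create_if_absent`, and `submit` are hypothetical names for illustration only; a real Job Store would rely on a conditional write in its database (for example Redis `SET` with `NX`) rather than an in-process lock.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class JobRecord:
    status: str = "queued"            # queued / running / done / failed
    stems: dict[str, str] = field(default_factory=dict)


class InMemoryJobStore:
    """Hypothetical stand-in for the article's `store` (not a production store)."""

    def __init__(self) -> None:
        self._records: dict[str, JobRecord] = {}
        self._lock = asyncio.Lock()

    async def create_if_absent(self, key: str) -> tuple[JobRecord, bool]:
        """Return (record, created). Only the first caller sees created=True."""
        async with self._lock:
            if key in self._records:
                return self._records[key], False
            record = JobRecord()
            self._records[key] = record
            return record, True


async def submit(store: InMemoryJobStore, enqueued: list[str], key: str) -> bool:
    """Enqueue only if this call created the record; returns the `cached` flag."""
    _, created = await store.create_if_absent(key)
    if created:
        enqueued.append(key)          # stands in for `await queue.enqueue(key, job)`
    return not created


async def demo() -> tuple[list[bool], list[str]]:
    store, enqueued = InMemoryJobStore(), []
    # Ten concurrent submissions of the same key, as with a mashed submit button.
    cached = await asyncio.gather(*(submit(store, enqueued, "abc") for _ in range(10)))
    return list(cached), enqueued
```

Running `asyncio.run(demo())` enqueues the key once, and the other nine calls come back flagged as cached.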


Layer 2: the job queue (retry and backpressure)

Abstract the queue implementation behind a Protocol. Whether it's SQS or Redis, you can swap it without changing the worker's code (ETC: easier to change).

from typing import Protocol
from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class Reserved:
    handle: str          # reservation handle used for ack/nack
    job_id: str
    job: "SeparationJob"
    receive_count: int    # how many times it has been delivered (used for the DLQ decision)

class JobQueue(Protocol):
    async def enqueue(self, job_id: str, job: SeparationJob) -> None: ...
    async def reserve(self) -> Reserved | None: ...   # long polling
    async def ack(self, handle: str) -> None: ...      # done; remove from the queue
    async def nack(self, handle: str) -> None: ...     # failed; re-delivered once the visibility timeout lapses

For SQS, reserve is ReceiveMessage (with a visibility timeout), ack is DeleteMessage, and nack simply sets the message's visibility timeout to 0 (ChangeMessageVisibility) so it becomes deliverable again. Because at-least-once delivery (delivered at least once, duplicates possible) is the premise, the countermeasure against double processing lives on the worker side, covered next. Idempotent async processing with SQS/Lambda is detailed in the dedicated article.


Layer 3: the GPU worker (the body)

This is the heart of the system. The requirements are: ① the model stays loaded in memory (avoiding cold starts), ② processing is idempotent, ③ OOM recovery, and ④ graceful shutdown.

First, abstract the separation engine behind a Protocol. This makes both the Demucs↔UVR swap and GPU-less testing possible (ETC + testability).

from pathlib import Path
from typing import Protocol

class SeparationEngine(Protocol):
    """Abstraction (port) over source separation. Concrete tools hide behind it."""
    name: str
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        """Returns {stem name: output file path}. Raises an exception on failure."""
        ...

import torch
import demucs.api

class DemucsEngine:
    """Demucs implementation (adapter) of SeparationEngine."""
    def __init__(self, model: str = "htdemucs") -> None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # Load the model once at startup and keep it resident (avoid loading per job)
        self._sep = demucs.api.Separator(model=model, device=device)
        self.name = f"demucs:{model}"

    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        _, stems = self._sep.separate_audio_file(str(audio))
        out_dir.mkdir(parents=True, exist_ok=True)
        if two_stems:  # fold into vocals / no_vocals
            stems = {"vocals": stems["vocals"],
                     "no_vocals": sum(s for n, s in stems.items() if n != "vocals")}
        outputs: dict[str, Path] = {}
        for name, src in stems.items():
            path = out_dir / f"{name}.wav"
            demucs.api.save_audio(src, str(path), samplerate=self._sep.samplerate)
            outputs[name] = path
        return outputs

Next, the worker loop. Idempotency and the order of the ack are the crux.

import logging, tempfile, threading
from pathlib import Path

log = logging.getLogger("separation.worker")

# The worker is synchronous (GPU work blocks anyway). `queue` here is assumed to be a
# blocking adapter over JobQueue, so reserve/ack/nack are plain calls, not coroutines.
def run_worker(engine: SeparationEngine, queue: JobQueue,
               store: "JobStore", blob: "BlobStore", *, stop: threading.Event) -> None:
    while not stop.is_set():
        reserved = queue_reserve_blocking(queue)   # long polling (blocking helper, not shown)
        if reserved is None:
            continue
        ctx = {"job_id": reserved.job_id, "engine": engine.name}  # correlation ID

        # Idempotency ①: a "completed" job that arrives via re-delivery (at-least-once)
        # is only acked, never processed again
        if store.is_done(reserved.job_id):
            queue.ack(reserved.handle)
            log.info("skip.already_done", extra=ctx)
            continue

        try:
            with tempfile.TemporaryDirectory() as tmp:
                tmpdir = Path(tmp)
                local = blob.download(reserved.job.audio_uri, tmpdir / "in")
                stems = separate_with_recovery(engine, local, tmpdir / "out",
                                               two_stems=reserved.job.two_stems)
                # Upload results to object storage (namespaced by job_id = safe to overwrite)
                uris = {n: blob.upload(p, f"{reserved.job_id}/{n}.wav")
                        for n, p in stems.items()}
            store.mark_done(reserved.job_id, uris)
            queue.ack(reserved.handle)              # Idempotency ②: ack only after success
            log.info("separation.done", extra={**ctx, "stems": list(uris)})
        except Exception as e:                      # Resilience: failure → nack → retry / DLQ
            store.mark_failed(reserved.job_id, type(e).__name__)
            queue.nack(reserved.handle)
            log.error("separation.failed", extra={**ctx, "error": type(e).__name__})

Why this order: separation succeeds, the result is saved, and only then is the job acked. If the worker crashes midway, no ack was sent, so the queue re-delivers the job after the visibility timeout (the work isn't lost). And if the same job arrives again via re-delivery, the is_done check prevents a second separation. Together, these two give you effectively-exactly-once processing on top of an at-least-once queue.

OOM recovery (graceful degradation)

Treat running out of GPU memory as a normal path, not an exceptional one (see also the resilience-patterns article).

def separate_with_recovery(engine: SeparationEngine, audio: Path, out: Path,
                           *, two_stems: bool) -> dict[str, Path]:
    try:
        return engine.separate(audio, out, two_stems=two_stems)
    except torch.cuda.OutOfMemoryError:
        torch.cuda.empty_cache()
        # Last resort: fall back to a CPU engine (slow, but it finishes).
        # The job is idempotent, so it is safe to redo from scratch.
        log.warning("oom.fallback_to_cpu")
        # DemucsEngine_cpu: a CPU-pinned engine variant, assumed to be defined elsewhere (not shown)
        return DemucsEngine_cpu().separate(audio, out, two_stems=two_stems)

(The detailed version, which shrinks the segment size step by step, is in the Demucs article.)
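For a sense of the step-down idea without Demucs specifics, here is a generic sketch: try the largest segment first and fall through to smaller ones only on a memory error. `run_with_shrinking_segment` and `fake_separate` are hypothetical; how a segment length is actually passed to the engine depends on the tool and is not shown. With PyTorch you would pass `torch.cuda.OutOfMemoryError` as `oom_error` and clear the CUDA cache between attempts.

```python
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


def run_with_shrinking_segment(run: Callable[[float], T], segments: Sequence[float],
                               oom_error: type[BaseException] = MemoryError) -> tuple[T, float]:
    """Try each segment length in order (largest first) until one fits in memory."""
    last: BaseException | None = None
    for seconds in segments:
        try:
            return run(seconds), seconds
        except oom_error as exc:      # only memory errors trigger degradation
            last = exc
    raise RuntimeError("all segment sizes ran out of memory") from last


def fake_separate(segment_seconds: float) -> str:
    """Stand-in for a GPU call: pretends anything above 4 s exhausts memory."""
    if segment_seconds > 4.0:
        raise MemoryError("simulated OOM")
    return f"ok:{segment_seconds}"
```

Calling `run_with_shrinking_segment(fake_separate, [12.0, 8.0, 4.0])` fails twice and succeeds on the third size. Any other exception type propagates immediately, so genuine bugs are not masked as memory pressure.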

Graceful shutdown (spot GPU countermeasure)

A spot instance is stopped after a short notice. If the worker finishes its current job on SIGTERM before exiting, no work is lost.

import signal
import threading

stop = threading.Event()
# On SIGTERM: exit at the top of the loop. The job in progress runs to completion
signal.signal(signal.SIGTERM, lambda *_: stop.set())

Even if the worker is force-killed, the kill lands before the ack, so the queue re-delivers the job and idempotency makes the re-run safe. This two-layer preparation, "stop gracefully" and "fine even if killed abruptly," is what lets you use spot GPUs with peace of mind.


Observability: trace a stalled process at a glance

If you can't trace after the fact why one particular song was slow or failed, you can't run production. Emit three things in structured form.

  • Structured logs + correlation ID: carry job_id through every log line. Use JSON structured logs rather than print so they can be searched and aggregated. Never log the audio content (PII).
  • Metrics: queue_depth (backlog), separation_duration_seconds (processing time, per model), device (cuda/cpu), gpu_utilization. A steadily growing backlog means too few workers; a rising CPU share means jobs are falling off the GPU.
  • Trace: connect ingress→queue→worker→storage in a single trace. Correlation with OpenTelemetry is covered in the dedicated article.
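
import time
import torch

def timed_separation(engine, audio, out, *, two_stems, metrics):
    start = time.monotonic()
    # Note: this reflects GPU availability at the start of the job. A CPU fallback that
    # happens inside separate_with_recovery would still be labelled "cuda" here.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    try:
        return separate_with_recovery(engine, audio, out, two_stems=two_stems)
    finally:
        # Symptom-based monitoring: record processing time, labelled by engine and device
        metrics.observe("separation_duration_seconds",
                        time.monotonic() - start,
                        labels={"engine": engine.name, "device": device})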
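The worker passes its correlation fields through `extra=`, but Python's default formatter does not print them. A minimal sketch of a JSON formatter that does, using only the standard library; `JsonFormatter` and `format_example` are hypothetical names, and a dedicated structured-logging library would work equally well.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including fields passed via `extra`."""

    # Attributes every LogRecord has; anything else on the record came from `extra`.
    _STANDARD = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "logger": record.name,
                   "event": record.getMessage()}
        payload.update({k: v for k, v in vars(record).items() if k not in self._STANDARD})
        return json.dumps(payload, ensure_ascii=False, default=str)


def format_example() -> dict:
    """Build a record the way `log.info(..., extra=ctx)` would, then format it."""
    logger = logging.getLogger("separation.worker")
    record = logger.makeRecord(logger.name, logging.INFO, "worker.py", 0,
                               "separation.done", (), None,
                               extra={"job_id": "abc123", "engine": "demucs:htdemucs"})
    return json.loads(JsonFormatter().format(record))
```

To use it, attach the formatter to a handler, for example `handler.setFormatter(JsonFormatter())` on a `logging.StreamHandler`. Every line then carries `job_id`, which lets one job be followed across ingress, queue, and worker logs.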

Cost efficiency: don't let the GPU sit idle

In source separation, GPU time translates directly into money. There are four ways to cut the unit cost.

  1. Model-resident loading: don't reload the model for every job (load once at worker startup). This removes the per-job cold-start GPU time.
  2. Spot GPU: a fraction of the on-demand price. With graceful shutdown + idempotency, an interruption loses no work.
  3. Auto-scale on queue backlog: scale the worker count up and down against a queue_depth target. When idle, scale in to zero (don't hold an idle GPU).
  4. Batch processing: bundle multiple songs into one GPU call (-j parallel). This raises throughput and amortizes the startup cost.

For the concrete placement of the GPU platform (ECS/EKS) and cost optimization, see the ECS vs EKS article and the FinOps article. Adding an idempotent cache on top, so the same source is never separated twice, directly eliminates wasted runs.
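The scaling rule in item 3 comes down to a small calculation. A sketch, assuming a backlog-per-worker target; `desired_workers` and its default values are hypothetical and would be tuned to your job duration and SLA. In practice the platform's autoscaler evaluates the equivalent rule from the queue_depth metric.

```python
import math


def desired_workers(queue_depth: int, *, jobs_per_worker: int = 4,
                    min_workers: int = 0, max_workers: int = 8) -> int:
    """Backlog-based target: enough workers that each holds about jobs_per_worker jobs."""
    if jobs_per_worker <= 0:
        raise ValueError("jobs_per_worker must be positive")
    wanted = math.ceil(queue_depth / jobs_per_worker)
    return max(min_workers, min(max_workers, wanted))
```

With the defaults, an empty queue scales to zero, 9 queued jobs ask for 3 workers, and a flood of 1000 jobs is capped at 8. Setting `min_workers=1` keeps one worker warm at all times.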


Testability: verify the worker without a GPU

This is the biggest benefit of making SeparationEngine a Protocol: in a CI environment with neither a GPU nor Demucs, you can unit-test the worker's idempotency and resilience.

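from pathlib import Path

class FakeEngine:
    """Separation engine for tests. Needs no GPU and returns deterministic output."""
    name = "fake"

    def __init__(self) -> None:
        self.calls = 0   # per-instance counter, so tests don't leak state into each other

    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        self.calls += 1
        out_dir.mkdir(parents=True, exist_ok=True)
        names = ["vocals", "no_vocals"] if two_stems else ["vocals", "drums", "bass", "other"]
        return {n: _touch(out_dir / f"{n}.wav") for n in names}

def _touch(path: Path) -> Path:
    """Create an empty placeholder file and return its path."""
    path.touch()
    return path

def test_redelivery_does_not_double_process(fake_queue, fake_store, fake_blob):
    """Even if the same job arrives twice under at-least-once delivery, separation runs only once."""
    engine = FakeEngine()
    # enqueue_and_run: test helper (not shown) that enqueues one job and delivers it `times` times
    job_id = enqueue_and_run(engine, fake_queue, fake_store, fake_blob, times=2)
    assert fake_store.is_done(job_id)
    assert engine.calls == 1   # ← the core of idempotency: delivered twice, processed once

"Can't test without a GPU" is a design failure. Simply by pushing the heavy side effects (GPU, I/O) behind a boundary, the logic becomes testable quickly and deterministically. This is the practical payoff of SRP and dependency inversion.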


Frequently asked questions (FAQ)

Q. SQS or Redis for the queue? A. If you're committed to AWS, SQS (a DLQ and visibility timeout come standard). If you want to start light, Redis (RQ/Celery). Because the queue is abstracted behind a Protocol, you can swap it later. The Celery/Redis implementation is in the dedicated article.

Q. Polling or webhook for result notification? A. Supporting both is ideal. Polling (GET /v1/separations/{job_id}) is fine for internal batch jobs; use webhooks for external integrations (signature verification is mandatory). For webhooks, the signature-verification pattern from the Stripe article can be reused.

Q. With "ack after success," what happens if the worker crashes after saving but before the ack? A. The job is re-delivered, but the is_done check skips it, so there is no double separation. This is the standard way to build effectively-exactly-once processing on top of at-least-once delivery.

Q. Does the GPU need to be always on? A. No. Auto-scale on queue backlog and go to zero when idle. A cold start costs the model-load time (tens of seconds), so depending on your SLA you may decide to keep at least one worker warm.

Q. Can this design be used for things other than source separation? A. Yes, as-is. Transcription, video transcoding, image/video generation: any processing that is heavy, needs idempotency, and uses a GPU fits this shape. It is a general-purpose foundation for asynchronous processing.


Summary: design is what separates "it works" from "it earns in production"

The key to making source separation a production service is not the quality of the model but the architecture.

  1. Separate accept from process and put a queue between them (eliminates timeouts and double execution).
  2. Combine idempotency (sha256 key + skip completed jobs) with at-least-once delivery + ack after success to stop double separation structurally.
  3. Use resilience (OOM degradation, nack/DLQ, graceful shutdown) to make failure a normal path.
  4. Build observability, cost efficiency, and testability into the structure from the start, not as an afterthought.

Only with all of this in place does it stop being a demo and become a service that doesn't crash no matter how many thousands of jobs arrive. And this kind of design judgment is exactly where contract work delivers the most value. Anyone can build a PoC that just wires models together, but the quality of a production platform that doesn't burn through the GPU budget, doesn't double-charge, and runs fine on spot instances reflects how many times its builder has been burned.

I implemented this article's architecture in the GPU-processing layer of an AI video localization platform that I actually run in production. If you're considering productionizing heavy AI processing, optimizing its cost, or designing it for reliability, whether or not it involves source separation, please take a look at my track record and get in touch. Working solo with generative AI, I build end to end, from PoC to production operation.


※ The code in this article shows the skeleton of the design. When implementing it, check the current specifications of each library and cloud service against the primary sources.


友田 陽大

Developer of a METI Minister's Award–winning product. With TypeScript + Python + AWS, I deliver SaaS, industry DX, and production-grade generative AI (RAG) end to end — from requirements to infrastructure and operations — single-handedly.
