# Turning source separation into a production API: the design of GPU worker × job queue × idempotency

> Taking source separation like Demucs from a demo to a production service. It explains, in type-safe FastAPI ingress and Python worker real code, an architecture that puts heavy GPU processing on an asynchronous job queue and guarantees idempotency, resilience, observability, and cost efficiency. It covers the design needed for production operation, from OOM recovery, graceful shutdown, at-least-once delivery, GPU auto-scaling, to testability.

- Published: 2026-06-25
- Author: 友田 陽大
- Tags: 音源分離, MLOps, アーキテクチャ設計, Python, GPU, FastAPI, 冪等性
- URL: https://tomodahinata.com/en/blog/music-source-separation-production-api-gpu-worker-queue
- Category: Audio source separation & preprocessing
- Pillar guide: https://tomodahinata.com/en/blog/music-source-separation-tool-selection-demucs-uvr-spleeter

## Key points

- Returning heavy GPU processing like source separation in a synchronous API definitely breaks down with long requests, timeouts, and double processing. The correct answer is the asynchronous separation of 'API ingress → job queue → GPU worker → object storage.'
- Idempotency is guaranteed with a job key of sha256(input URI + parameters). The at-least-once delivery of the queue (SQS/Redis) structurally prevents double separation with 'ack after success + skip the completed.'
- Resilience is multi-layered: OOM is graceful degradation of segment shrink → CPU fallback, failure is nack → exponential backoff → DLQ after N times, and SIGTERM exits after completing the current job. Since there's idempotency, it's safe even if force-killed, via re-delivery.
- Observability is structured logs (correlation ID) + metrics (queue stagnation, processing time, device). Cost is optimized with spot GPU, batches, auto-scaling on queue stagnation, and model-resident loading.
- Testability abstracts SeparationEngine with Protocol and unit-tests idempotency/resilience with a GPU-less FakeEngine. The Demucs↔UVR swap is also absorbed at the same boundary (ETC).

---

## The goal of this article

Getting source separation to "run" with [Demucs](/blog/demucs-v4-music-source-separation-production-guide) or [UVR5/MDX-Net](/blog/uvr5-mdx-net-vocal-separation-production-guide) takes minutes, as in each dedicated article. The problem is beyond that — the design to make it a **production service that handles thousands of audio files a day, without crashing, without double-charging, while holding down cost.**

This article shows a **tool-independent "turn heavy GPU processing into a production API" architecture** in type-safe real code. The subject is source separation, but the design used here (asynchronous job queue, idempotency, resilience, observability) is a **universal form transferable to any heavy AI processing, like video transcoding, transcription, and image generation.**

When you finish reading, the aim is a state where you can do the following.

1. You can explain why "a synchronous implementation that hits the API directly" breaks down in production and **design an asynchronous architecture.**
2. You can implement **idempotency, resilience, observability, and cost efficiency** as the code's structure.
3. You can **unit-test the worker without a GPU** and draw a boundary that doesn't break even when swapping Demucs↔UVR.

> **About the author (disclosure of credibility)**: I **single-handedly designed, implemented, and run in production an AI-video-localization platform** that fully automates multilingual localization just by uploading a video. I run its first stage, source separation, with exactly this article's "GPU worker × job queue." This article's design isn't desk theory but worked backward from **the experience of actually melting GPU bills, being woken up by OOM, and being charged for double processing.**

---

## 30-second summary

| Viewpoint | Design |
| --- | --- |
| **Basic form** | API ingress → **job queue** → GPU worker → object storage (fully asynchronous) |
| **Why async** | separation takes a few dozen seconds to a few minutes. Returning synchronously means timeout, double execution, can't scale |
| **Idempotency** | `sha256(input URI + parameters)` as the job key. Skip the completed, return the existing result on re-send |
| **Delivery guarantee** | the queue is **at-least-once.** Prevent double separation with "ack after success" and "skip the completed" |
| **Resilience** | OOM is graceful degradation, failure is exponential backoff → DLQ, SIGTERM exits after completing the current job |
| **Observability** | structured logs (correlation ID) + metrics (stagnation, processing time, device, GPU utilization) |
| **Cost** | spot GPU, batches, **auto-scale on queue stagnation,** model-resident loading |
| **Testability** | make `SeparationEngine` a Protocol, test without a GPU with `FakeEngine` |

The overall picture of the design is the next [5 layers](#overall-architecture-the-5-layers).

---

## Why "hitting the API directly" breaks down in production

The first thing everyone writes is this kind of synchronous implementation.

```python
# ❌ アンチパターン: リクエスト内で分離を完走させる
@app.post("/separate")
def separate(audio_url: str):
    stems = run_demucs(audio_url)   # 数十秒〜数分ブロックする
    return {"stems": stems}
```

This **works in a demo and definitely breaks in production.** There are four reasons.

- **Timeout**: separation takes a few dozen seconds to a few minutes. It's cut off exceeding the load balancer's / gateway's limit (often 30–60 seconds).
- **Double execution**: if the user mashes the submit button / the client auto-retries, **the same source is processed by the GPU many times** and melts the GPU bill.
- **Can't scale**: one request occupies one GPU for a long time. It clogs when concurrent access comes. There's no backpressure (a mechanism to deflect when overloaded).
- **Can't recover**: if the worker crashes during processing, that work **disappears.** You can't tell how far it progressed.

There's one solution — **separate "accept" and "process"** and interpose a **queue** between them.

---

## Overall architecture: the 5 layers

```text
┌──────────┐   ① accept (return immediately)  ┌──────────┐
│  Client  │ ─────────────────────────▶ │   API    │  ← FastAPI / type-safe boundary
└──────────┘   202 Accepted + job_id   │ ingress  │
      ▲                                  └────┬─────┘
      │ ④ come fetch the result (poll/webhook) │ enqueue (at-least-once)
      │                                       ▼
      │                                  ┌──────────┐
      │                                  │  Queue   │  ← SQS / Redis (backpressure + retry)
      │                                  └────┬─────┘
      │                                       │ reserve (visibility timeout)
      │                                       ▼
┌─────┴──────┐   ③ save the result        ┌──────────┐
│  Object    │ ◀──────────────────── │  GPU     │  ← model resident, idempotent, OOM recovery
│  Storage   │                        │  Worker  │
└────────────┘                        └──────────┘
                                       ② heavy processing only here
```

Each layer's **responsibility (SRP)** in one line.

- **API ingress**: validate the input, make an idempotency key, enqueue the job, and **return `202` immediately.** Does no heavy processing at all.
- **Queue**: accumulate work, handle retry and backpressure. **At-least-once delivery.**
- **GPU Worker**: take from the queue, separate, and save the result. **The only layer that touches the GPU.**
- **Object Storage**: place the input audio and output stems (S3, etc.).
- **Job Store**: holds the job's state (queued/running/done/failed) and the result URI (DB/Redis). The foundation of idempotency.

Below, I implement it per layer.

---

## Layer 1: API ingress (a type-safe receiver)

At the boundary, **always validate external input** (Pydantic v2). For details, see the [Pydantic v2 article](/blog/pydantic-v2-production-validation-type-safety) and the [FastAPI article](/blog/fastapi-production-async-pydantic-observability-guide).

```python
import hashlib
from pydantic import BaseModel, Field, field_validator

class SeparationJob(BaseModel):
    """分離ジョブの不変な仕様。これがそのまま冪等キーの素になる。"""
    model_config = {"frozen": True}  # 不変＝キーの安定性を保証

    audio_uri: str = Field(..., description="s3://bucket/key 形式のみ許可")
    engine: str = Field("demucs", pattern="^(demucs|uvr)$")  # 許可リストで制限
    two_stems: bool = False

    @field_validator("audio_uri")
    @classmethod
    def _only_trusted_scheme(cls, v: str) -> str:
        # セキュリティ: 任意URLを許すとSSRFの穴になる。自社バケットのs3://に限定
        if not v.startswith("s3://"):
            raise ValueError("audio_uri must be an s3:// URI")
        return v

def job_key(job: SeparationJob) -> str:
    """同じ入力×同じ設定なら必ず同じキー（冪等性の起点）。"""
    return hashlib.sha256(job.model_dump_json().encode()).hexdigest()
```

```python
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/v1/separations", status_code=202)
async def create(job: SeparationJob) -> dict:
    key = job_key(job)
    # 冪等性: 既存ジョブがあれば作り直さず返す（連打・自動リトライで二重投入しない）
    if (existing := await store.get(key)) is not None:
        return {"job_id": key, "status": existing.status, "cached": True}
    await store.put(key, status="queued")
    await queue.enqueue(key, job)          # キューに積むだけ。GPUは触らない
    return {"job_id": key, "status": "queued", "cached": False}

@app.get("/v1/separations/{job_id}")
async def status(job_id: str) -> dict:
    if (rec := await store.get(job_id)) is None:
        raise HTTPException(status_code=404, detail="job not found")
    return {"job_id": job_id, "status": rec.status, "stems": rec.stems}
```

The point is that ingress **only does "validate, idempotency key, enqueue, return immediately."** With this, both timeouts and double execution structurally disappear.

---

## Layer 2: the job queue (retry and backpressure)

**Abstract the queue implementation with a Protocol.** Whether SQS or Redis, you can swap it without changing the worker's code (ETC).

```python
from typing import Protocol
from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class Reserved:
    handle: str          # ack/nackに使う予約ハンドル
    job_id: str
    job: "SeparationJob"
    receive_count: int    # 何回配信されたか（DLQ判定に使う）

class JobQueue(Protocol):
    async def enqueue(self, job_id: str, job: SeparationJob) -> None: ...
    async def reserve(self) -> Reserved | None: ...   # ロングポーリング
    async def ack(self, handle: str) -> None: ...      # 完了。キューから消す
    async def nack(self, handle: str) -> None: ...     # 失敗。可視性タイムアウト後に再配信
```

For SQS, `reserve` is `ReceiveMessage` (with a visibility timeout), `ack` is `DeleteMessage`, and `nack` just resets the visibility timeout to 0. Since **at-least-once delivery** (at least once, with duplicates) is the premise, the double-processing countermeasure is done on the worker side next. Idempotent async processing with SQS/Lambda is detailed in the [dedicated article](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide).

---

## Layer 3: the GPU worker (the body)

This is the heart. The requirements are **① the model is resident-loaded (cold-start avoidance) ② idempotent processing ③ OOM recovery ④ graceful shutdown.**

First, **abstract the separation engine with a Protocol.** With this, both the Demucs↔UVR swap and GPU-less testing become possible (ETC + testability).

```python
from pathlib import Path
from typing import Protocol

class SeparationEngine(Protocol):
    """音源分離の抽象（port）。具体ツールはこの裏に隠す。"""
    name: str
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        """{ステム名: 出力ファイルパス} を返す。失敗時は例外を送出。"""
        ...
```

```python
import torch
import demucs.api

class DemucsEngine:
    """SeparationEngine の Demucs 実装（adapter）。"""
    def __init__(self, model: str = "htdemucs") -> None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # モデルは起動時に1度だけロードして常駐させる（毎ジョブのロードを避ける）
        self._sep = demucs.api.Separator(model=model, device=device)
        self.name = f"demucs:{model}"

    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        _, stems = self._sep.separate_audio_file(str(audio))
        out_dir.mkdir(parents=True, exist_ok=True)
        if two_stems:  # vocals / no_vocals に畳む
            stems = {"vocals": stems["vocals"],
                     "no_vocals": sum(s for n, s in stems.items() if n != "vocals")}
        outputs: dict[str, Path] = {}
        for name, src in stems.items():
            path = out_dir / f"{name}.wav"
            demucs.api.save_audio(src, str(path), samplerate=self._sep.samplerate)
            outputs[name] = path
        return outputs
```

And the worker loop. **Idempotency and ack order** are the crux.

```python
import logging, tempfile, threading
from pathlib import Path

log = logging.getLogger("separation.worker")

def run_worker(engine: SeparationEngine, queue: JobQueue,
               store: "JobStore", blob: "BlobStore", *, stop: threading.Event) -> None:
    while not stop.is_set():
        reserved = queue_reserve_blocking(queue)   # ロングポーリング
        if reserved is None:
            continue
        ctx = {"job_id": reserved.job_id, "engine": engine.name}  # 相関ID

        # 冪等性①: 再配信(at-least-once)で来た「完了済み」は処理せずackだけ
        if store.is_done(reserved.job_id):
            queue.ack(reserved.handle)
            log.info("skip.already_done", extra=ctx)
            continue

        try:
            with tempfile.TemporaryDirectory() as tmp:
                tmpdir = Path(tmp)
                local = blob.download(reserved.job.audio_uri, tmpdir / "in")
                stems = separate_with_recovery(engine, local, tmpdir / "out",
                                               two_stems=reserved.job.two_stems)
                # 結果をオブジェクトストレージへ（job_idで名前空間を切る＝上書き安全）
                uris = {n: blob.upload(p, f"{reserved.job_id}/{n}.wav")
                        for n, p in stems.items()}
            store.mark_done(reserved.job_id, uris)
            queue.ack(reserved.handle)              # 冪等性②: 成功して初めてack
            log.info("separation.done", extra={**ctx, "stems": list(uris)})
        except Exception as e:                      # 回復性: 失敗はnack→再試行/DLQ
            store.mark_failed(reserved.job_id, type(e).__name__)
            queue.nack(reserved.handle)
            log.error("separation.failed", extra={**ctx, "error": type(e).__name__})
```

**Why this order**: separation succeeds, the result is saved, and **only then `ack`.** Even if it crashes midway, since it didn't `ack`, the queue **re-delivers** after the visibility timeout (the work doesn't disappear). Even if the same job comes via re-delivery, the `is_done` check means **no double separation.** With these two, you can build **effectively-exactly-once** processing on top of an at-least-once queue.

### OOM recovery (graceful degradation)

Treat GPU memory shortage as a **normal path, not an exception** (also see the [resilience-patterns article](/blog/retry-backoff-circuit-breaker-resilience-patterns-guide)).

```python
def separate_with_recovery(engine: SeparationEngine, audio: Path, out: Path,
                           *, two_stems: bool) -> dict[str, Path]:
    try:
        return engine.separate(audio, out, two_stems=two_stems)
    except torch.cuda.OutOfMemoryError:
        torch.cuda.empty_cache()
        # 最後の砦: CPUエンジンへ退避（遅いが完走する）。冪等なので安全に作り直せる
        log.warning("oom.fallback_to_cpu")
        return DemucsEngine_cpu().separate(audio, out, two_stems=two_stems)
```

(The detailed version that gradually shrinks `segment` is in the [Demucs article](/blog/demucs-v4-music-source-separation-production-guide#本番で必ず詰まる5つの落とし穴と回復性設計).)

### Graceful shutdown (spot GPU countermeasure)

A spot instance is stopped with notice. If you **complete the current job on SIGTERM before exiting,** you don't miss work.

```python
import signal

stop = threading.Event()
# SIGTERM受信時: ループの先頭で抜ける。実行中のジョブは最後までやり切る
signal.signal(signal.SIGTERM, lambda *_: stop.set())
```

Even if **force-killed,** since it's before `ack`, the queue re-delivers and it's safely re-executed with idempotency. **The double preparation of "stop gracefully" and "fine even if killed roughly"** is the condition for using a spot GPU with peace of mind.

---

## Observability: trace a stalled process at a glance

If you can't later trace "why only this song is slow / failed," you can't run production. Output three things structured.

- **Structured logs + correlation ID**: pass `job_id` through all logs. JSON structured logs, not `print`, to be searchable/aggregatable. **Don't emit the audio content (PII).**
- **Metrics**: `queue_depth` (stagnation), `separation_duration_seconds` (processing time, per model), `device` (cuda/cpu), `gpu_utilization`. If stagnation keeps increasing, **scale shortage**; if the CPU rate rises, **the accident of GPU dropout.**
- **Trace**: connect ingress→queue→worker→storage with one trace. Correlation with OpenTelemetry is in the [dedicated article](/blog/opentelemetry-observability-production-tracing-metrics-logs).

```python
import time

def timed_separation(engine, audio, out, *, two_stems, metrics):
    start = time.monotonic()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    try:
        return separate_with_recovery(engine, audio, out, two_stems=two_stems)
    finally:
        # 症状ベースの監視: 処理時間・deviceをラベル付きで記録
        metrics.observe("separation_duration_seconds",
                        time.monotonic() - start,
                        labels={"engine": engine.name, "device": device})
```

---

## Cost efficiency: don't idle the GPU

In source separation, **GPU time is directly money.** There are four moves to carve the unit price.

1. **Model-resident loading**: don't reload the model per job (once at worker startup). Make the cold-start GPU time zero.
2. **Spot GPU**: a fraction of on-demand. Since there's graceful shutdown + idempotency, **you don't lose even if interrupted.**
3. **Auto-scale on queue stagnation**: increase/decrease the worker count targeting `queue_depth`. When idle, **scale in to zero** (don't hold an idle GPU).
4. **Batch processing**: bundle multiple songs into one GPU call (`-j` parallel). Raise throughput and amortize the startup cost.

For the concrete placement of the GPU foundation (ECS/EKS) and cost optimization, see the [ECS vs EKS article](/blog/aws-ecs-vs-eks-startup-decision-framework) and the [FinOps article](/blog/aws-terraform-startup-cost-optimization-finops). Adding **"zero re-separation of the same source" with an idempotent cache** directly erases wasted shots.

---

## Testability: verify the worker without a GPU

The biggest benefit of making `SeparationEngine` a Protocol is **this.** In CI without a GPU or Demucs, you can **unit-test the worker's idempotency and resilience.**

```python
class FakeEngine:
    """テスト用の分離エンジン。GPU不要で決定的な出力を返す。"""
    name = "fake"
    calls = 0
    def separate(self, audio: Path, out_dir: Path, *, two_stems: bool) -> dict[str, Path]:
        type(self).calls += 1
        out_dir.mkdir(parents=True, exist_ok=True)
        names = ["vocals", "no_vocals"] if two_stems else ["vocals", "drums", "bass", "other"]
        return {n: _touch(out_dir / f"{n}.wav") for n in names}

def test_redelivery_does_not_double_process(fake_queue, fake_store, fake_blob):
    """at-least-onceで同じジョブが2回来ても、分離は1回しか走らない。"""
    engine = FakeEngine()
    job_id = enqueue_and_run(engine, fake_queue, fake_store, fake_blob, times=2)
    assert fake_store.is_done(job_id)
    assert FakeEngine.calls == 1   # ← 冪等性の核心。2回流しても1回だけ
```

"Can't test without a GPU" is a defeat of design. Just by **driving the heavy side effects (GPU, I/O) beyond a boundary,** the logic can be tested fast and deterministically. This is the practical benefit of SRP and dependency inversion.

---

## Frequently asked questions (FAQ)

**Q. SQS or Redis for the queue?**
A. **If leaning on AWS, SQS** (DLQ and visibility timeout are standard). **If starting light, Redis** (RQ/Celery). Since it's abstracted with a Protocol, it's **swappable later.** The Celery/Redis implementation is in the [dedicated article](/blog/celery-redis-production-async-task-queue-guide).

**Q. poll or webhook for result notification?**
A. **Supporting both is ideal.** An internal batch is fine with poll (`GET /status`), and external integration with webhook (signature verification mandatory). For webhooks, the signature-verification pattern of the [Stripe article](/blog/stripe-payments-production-guide-webhooks-idempotency-subscriptions) can be reused.

**Q. With "ack after success," what if it crashes after saving but before ack?**
A. It's re-delivered, but the `is_done` check skips it, so **no double separation.** This is the standard for building exactly-once on top of at-least-once.

**Q. Does the GPU need to be always on?**
A. No. **Auto-scale on queue stagnation, and zero when idle.** Since the cold start takes the model-load time (a few dozen seconds), there's also the judgment to keep a minimum of 1 unit warm depending on the SLA.

**Q. Can this design be used for things other than source separation?**
A. **It can be used as-is.** Transcription, video transcoding, image/video generation — any processing that's **heavy, you want idempotent, and uses a GPU** all rides on this form. It's a general-purpose async-processing foundation.

---

## Summary: what separates "works" from "earns in production" is design

The key to making source separation a production service isn't the model's quality but **the architecture.**

1. **Separate accept and process** and put a queue between them (eradicate timeout, double execution).
2. With **idempotency** (sha256 key + skip the completed) and **at-least-once + ack after success,** structurally stop double separation.
3. With **resilience** (OOM degradation, nack/DLQ, graceful shutdown), make failure the normal path.
4. Weave **observability, cost, and testability** into the structure from the start, not as an afterthought.

Only after doing all this does it become not a demo but a service that "**doesn't crash no matter how many thousands come.**" And — **this design judgment is exactly where the most value comes out in contracting.** Anyone can make a PoC that just connects models, but a **production foundation that doesn't melt the GPU bill, doesn't double-charge, and is fine even on spot** has, as its quality, the number of times you've been burned.

> I implemented this article's architecture in the GPU-processing layer of an **AI-video-localization platform I actually run in production.** Not limited to source separation, if you're considering the productionization, cost optimization, or reliability design of heavy AI processing, please see my [track record](/case-studies/ai-video-localization-lipsync) and consult me. With **one person × generative AI,** I build end-to-end from PoC to production operation.

---

## Sources / related resources

- Individual tools: [Demucs complete guide](/blog/demucs-v4-music-source-separation-production-guide) / [UVR5・MDX-Net complete guide](/blog/uvr5-mdx-net-vocal-separation-production-guide) / [tool selection](/blog/music-source-separation-tool-selection-demucs-uvr-spleeter)
- Async, reliability: [SQS/Lambda idempotent async processing](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide) / [retry, backoff, circuit breaker](/blog/retry-backoff-circuit-breaker-resilience-patterns-guide) / [Transactional Outbox](/blog/transactional-outbox-pattern-reliable-event-publishing-guide)
- Foundation, observation: [FastAPI production design](/blog/fastapi-production-async-pydantic-observability-guide) / [OpenTelemetry observability](/blog/opentelemetry-observability-production-tracing-metrics-logs) / [ECS vs EKS](/blog/aws-ecs-vs-eks-startup-decision-framework)

※ This article's code shows the design's skeleton. When implementing, confirm the latest specs of each library/cloud in the primary sources.
