この記事のゴール
UVR5/MDX-Net や Demucs で「1ファイルを手元で分離する」のは簡単です。問題は——毎日数千ファイルを、落とさず・安く・冪等に処理する段階。ここで多くのPoCが「手動スクリプトの limit」にぶつかります。
本稿は、音源分離を AWS で本番スケールさせるキュー駆動バッチ基盤を、設計思想と動くコードで示します。読み終えたとき、あなたは次を組めます。
- S3イベント → SQS → GPUワーカー → S3 の疎結合パイプラインを設計できる。
- 分単位の長時間ジョブを 可視性タイムアウトのハートビートで安全に処理できる。
- Spot 中断・毒メッセージ・二重配信に耐える、冪等で回復性のある GPU ワーカーを実装できる。
📐 この記事の立ち位置:音源分離を本番サービス化する汎用パターン(型安全なAPI ingress・ジョブキュー・OOM段階縮退・テスト容易性などプラットフォーム非依存の設計)はGPUワーカー基盤の記事で扱っています。本稿はそれを AWS で具体実装する「プラットフォーム実装編」——SQSの可視性タイムアウトとメッセージ上限、AWS Batch/ECS でのGPU要求、S3イベント駆動、Spot、Terraform、g4dnのコストといった AWS固有の勘所に絞ります。汎用設計はあちらを、AWS実装は本稿を参照してください。
筆者について(信頼性の開示):私は「音声分離 → 文字起こし → 翻訳 → 多言語吹き替え → 口元同期」を全自動化する AI動画ローカライズ基盤を単独で設計・実装し、本番運用しています。その第1段の音源分離を、まさに本稿のキュー駆動 GPU バッチで回しています。本稿の「ハートビート」「Spot グレースフル終了」「冪等キャッシュ」は、デモ知識ではなく本番でジョブを落とさないために実装した設計そのものです。案件の概要は実績に、決済基盤で培った冪等性の原則は信頼性クラスタの記事にまとめています。
30秒のまとめ(結論を先に)
| 論点 | 結論 |
|---|---|
| 全体構成 | S3(入力)→ S3イベント → SQS → GPUワーカー → S3(出力)。疎結合・リトライ可能・冪等 |
| なぜキュー駆動 | スパイクを吸収し、ワーカーをキュー深さでオートスケール。失敗はリトライ、毒は DLQ へ |
| 冪等性 | SQS は少なくとも1回配信。ワーカーは S3 出力キーの存在チェックで二重処理を防ぐ |
| 長時間ジョブ | 可視性タイムアウト(既定30秒・最大12時間)を ChangeMessageVisibility で定期延長(ハートビート) |
| GPU基盤 | キュー駆動バッチは AWS Batch(GPU=resourceRequirements type=GPU、Spot・リトライ内蔵)が本命。常駐型は ECS |
| GPU種別 | g4dn(NVIDIA T4 16GB)= ML推論で最安。品質/長尺で VRAM が要れば g5(A10G 24GB)/g6(L4 24GB) |
| コスト | Spot + オートスケール + 冪等キャッシュ。Spot 中断は SIGTERM で受けて安全に手放す |
| 大きい入力 | SQS メッセージ上限は 1 MiB(旧256 KB)。音声本体は必ず S3 に置きポインタを渡す |
| 毒メッセージ | maxReceiveCount 超で DLQ へ隔離し、本流を詰まらせない |
アーキテクチャ:疎結合のキュー駆動パイプライン
┌─────────────┐ ObjectCreated ┌──────────────┐
upload → │ S3 入力bucket │ ───イベント通知──→ │ SQS キュー │ ──┐
└─────────────┘ └──────────────┘ │ long poll
│ 失敗N回 │
▼ ▼
┌──────────┐ ┌──────────────────┐
│ DLQ │ │ GPUワーカー群 │
└──────────┘ │ (AWS Batch / ECS) │
│ 分離 + 冪等チェック │
└──────────────────┘
│ stems
▼
┌────────────────┐
│ S3 出力bucket │
└────────────────┘
設計の核心は 「各コンポーネントが互いを知らない」 こと。アップロード側はワーカーの存在を知らず、ワーカーはスケールしても整合性を壊しません。S3 が真実源、SQS が緩衝材、ワーカーがべき等な変換器——この役割分担(SRP)が、スパイクにも障害にも耐える基盤を作ります。
⚠️ 無限ループに注意:S3 イベントで起動したワーカーが同じバケットに書き戻すと、通知が再帰して暴走します。入力と出力でバケット(または prefix)を必ず分けること。S3 通知は SNS / SQS / Lambda / EventBridge に送れますが、SQS FIFO は直接の通知先にできない点も注意(必要なら EventBridge 経由)。
なぜ「少なくとも1回」を前提に設計するのか
SQS は **at-least-once delivery(少なくとも1回配信)**です。AWS 公式も明記しています。
due to the at-least-once delivery model of Amazon SQS, there's no absolute guarantee that a message won't be delivered more than once during the visibility timeout period.
つまり 同じ音声が2回処理されることが「正常系」として起こり得る。これを握りつぶすのではなく、冪等性で吸収します。最も堅いのは 「出力 S3 キーが既に存在するなら、処理せず成功扱いにする」——分離は重い GPU 処理なので、二重実行の回避はそのままコスト削減になります。冪等性の一般原則は非同期処理の冪等設計で詳説しています。
# 冪等キー:音声の内容ではなく"入力S3キー + モデル"で出力先を決定論的に決める
def output_prefix(input_key: str, model: str) -> str:
"""同じ入力×同じモデルなら必ず同じ出力prefix。存在すれば再処理しない。"""
safe_model = model.replace("/", "_")
return f"stems/{safe_model}/{input_key}"
長時間ジョブの肝:可視性タイムアウトのハートビート
音源分離は 1曲で数十秒〜数分かかります。SQS の可視性タイムアウト(既定30秒)を超えて処理すると、メッセージが他のワーカーに再配信され、二重処理になります。
対策は ハートビート——処理中に ChangeMessageVisibility で可視性を定期的に延長し続けます。AWS 公式の推奨でもあります。
Implement a heartbeat mechanism to periodically extend the visibility timeout, ensuring the message remains invisible until processing is complete.
ただし 可視性タイムアウトには「最初の受信から最大12時間」という上限があります(延長してもこの上限はリセットされない)。これを超える超長尺は、Step Functions で分割するのが公式の指針です。
# heartbeat.py — 処理中だけ可視性を延長し続けるコンテキストマネージャ
import threading
from contextlib import contextmanager
@contextmanager
def visibility_heartbeat(sqs, queue_url: str, receipt_handle: str,
*, extend_to: int = 300, interval: int = 120):
"""interval秒ごとに可視性をextend_to秒へ延長。長時間ジョブの二重配信を防ぐ。"""
stop = threading.Event()
def beat() -> None:
while not stop.wait(interval): # interval待つ。stopが立てば即終了
try:
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=extend_to, # 最初の受信から最大12hの制約に注意
)
except Exception: # 延長失敗は致命的でない。次の周期で再試行
pass
t = threading.Thread(target=beat, daemon=True)
t.start()
try:
yield
finally:
stop.set() # 処理完了で確実に停止(リーク防止)
t.join(timeout=1)
GPU 基盤の選択:AWS Batch か ECS か
GPU ワーカーの置き場は主に2つ。キュー駆動の単発バッチ推論なら AWS Batch が purpose-built です。
| 観点 | AWS Batch | ECS(常駐サービス) |
|---|---|---|
| 位置づけ | バッチ計算に特化(キュー+計算環境を自動スケール) | コンテナの常駐運用 |
| GPU 指定 | resourceRequirements の type=GPU | task definition の resourceRequirements |
| Spot 統合 | 組込み(Spot/On-Demand を混在可) | 自前で容量プロバイダ設定 |
| リトライ | ジョブ定義の attempts(1〜10)内蔵 | アプリ/サービス側で実装 |
| 向く用途 | キュー駆動の単発ジョブ大量処理 | 低レイテンシ常駐・きめ細かい制御 |
AWS Batch の GPU 要求はジョブ定義にこう書きます(公式の形)。
{
"resourceRequirements": [
{ "type": "GPU", "value": "1" },
{ "type": "VCPU", "value": "4" },
{ "type": "MEMORY", "value": "16384" }
]
}
💡 GPU 種別の選び方:AWS 公式いわく g4dn(NVIDIA T4・16GB)は「ML推論で最も低コストな GPU インスタンス」。MDX-Net の大量バッチはこれで十分。RoFormer など VRAM を食うモデルや長尺は g5(A10G・24GB)/ g6(L4・24GB)/ g6e(L40S・48GB) に上げます。ECS で GPU を使う場合は GPU 最適化 AMI と
ECS_ENABLE_GPU_SUPPORT=trueが必要です。
世界最高峰のワーカー実装(boto3)
設計要件を1つのワーカーに集約します——①モデル常駐、②S3キー冪等性、③可視性ハートビート、④Spot中断のグレースフル終了、⑤毒メッセージはDLQ任せ、⑥構造化ログ。
# worker.py — 本番GPU音源分離ワーカー(SQS駆動・冪等・回復性)
from __future__ import annotations
import json
import logging
import os
import signal
import sys
import tempfile
from pathlib import Path
from urllib.parse import unquote_plus
import boto3
from audio_separator.separator import Separator
from heartbeat import visibility_heartbeat # 前掲
# --- 構造化ログ(相関IDを必ず通す。音声内容=PIIは出さない)---
logging.basicConfig(level=logging.INFO, format='%(message)s')
log = logging.getLogger("worker")
def emit(event: str, **fields) -> None:
log.info(json.dumps({"event": event, **fields}, ensure_ascii=False))
QUEUE_URL = os.environ["QUEUE_URL"]
OUT_BUCKET = os.environ["OUTPUT_BUCKET"]
MODEL = os.environ.get("SEPARATION_MODEL", "UVR-MDX-NET-Inst_HQ_3.onnx")
MODEL_CACHE = os.environ.get("MODEL_CACHE_DIR", "/models") # イメージに焼き込み済み推奨
sqs = boto3.client("sqs")
s3 = boto3.client("s3")
_shutdown = False
def _on_sigterm(*_): # Spot中断(2分前通知)/ECS停止で届く
global _shutdown
_shutdown = True
emit("shutdown_requested") # 新規受信を止め、処理中ジョブを安全に終える
signal.signal(signal.SIGTERM, _on_sigterm)
# モデルは一度だけVRAMへ。リクエスト間で再利用(ロードは高コスト)
_separator: Separator | None = None
def get_separator() -> Separator:
global _separator
if _separator is None:
_separator = Separator(output_format="flac", model_file_dir=MODEL_CACHE)
_separator.load_model(model_filename=MODEL)
emit("model_loaded", model=MODEL)
return _separator
def _already_done(prefix: str) -> bool:
"""出力S3キーが存在するなら処理済み=冪等にスキップ(二重GPU実行を防ぐ)。"""
resp = s3.list_objects_v2(Bucket=OUT_BUCKET, Prefix=prefix, MaxKeys=1)
return resp.get("KeyCount", 0) > 0
def process_message(msg: dict) -> None:
body = json.loads(msg["Body"])
rec = body["Records"][0]["s3"] # S3イベント通知の形
in_bucket = rec["bucket"]["name"]
in_key = unquote_plus(rec["object"]["key"]) # S3通知のkeyはURLエンコード済(空白→'+'・特殊文字→%XX)
prefix = f"stems/{MODEL.replace('/', '_')}/{in_key}" # 決定論的な出力先
cid = msg["MessageId"]
if _already_done(prefix):
emit("skip_idempotent", cid=cid, key=in_key)
return # 既に成果物あり=成功扱い
with visibility_heartbeat(sqs, QUEUE_URL, msg["ReceiptHandle"]):
with tempfile.TemporaryDirectory() as tmp:
src = Path(tmp) / Path(in_key).name
s3.download_file(in_bucket, in_key, str(src))
sep = get_separator()
sep.output_dir = tmp
stems = sep.separate(str(src)) # GPU処理(ハートビートが守る)
for stem in stems:
key = f"{prefix}/{Path(stem).name}"
s3.upload_file(stem, OUT_BUCKET, key)
emit("separated", cid=cid, key=in_key, stems=len(stems))
def main() -> None:
get_separator() # 起動時ウォームアップ
while not _shutdown:
resp = sqs.receive_message(
QueueUrl=QUEUE_URL, MaxNumberOfMessages=1,
WaitTimeSeconds=20, # ロングポーリング(空ポーリング課金を削減)
)
for msg in resp.get("Messages", []):
try:
process_message(msg)
sqs.delete_message(QueueUrl=QUEUE_URL,
ReceiptHandle=msg["ReceiptHandle"]) # 成功時のみ削除
except Exception:
# 失敗は削除しない → 可視性切れで再配信 → maxReceiveCount超でDLQへ
emit("process_failed", cid=msg.get("MessageId"))
log.exception("processing error")
emit("drained_and_exit")
sys.exit(0)
if __name__ == "__main__":
main()
このワーカーが本番品質である理由は、失敗の扱いにあります。
- 成功時のみ
delete_message。失敗したメッセージは削除されず、可視性切れで自動再配信される。 - 再配信が
maxReceiveCountを超えると DLQ へ隔離され、本流を詰まらせない(設定は次節)。 - SIGTERM(Spot 中断/ECS 停止)で新規受信を止め、処理中だけ完遂。中断されたジョブも
deleteしていないので別ワーカーが拾い直す——取りこぼしゼロ。
インフラを Terraform で(キュー + DLQ + S3通知)
キューと デッドレターキュー(DLQ)、再ドライブ方針、S3 → SQS 通知を宣言的に定義します。
# main.tf — SQS + DLQ + S3イベント通知
resource "aws_sqs_queue" "dlq" {
name = "separation-dlq"
message_retention_seconds = 1209600 # 14日。原因調査の猶予を確保
}
resource "aws_sqs_queue" "jobs" {
name = "separation-jobs"
visibility_timeout_seconds = 300 # ハートビート前提の初期値
receive_wait_time_seconds = 20 # ロングポーリング既定
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 5 # 5回失敗で毒メッセージをDLQへ
})
}
# S3 → SQS(ObjectCreatedのみ。入力prefixに限定して再帰を防ぐ)
resource "aws_s3_bucket_notification" "ingest" {
bucket = aws_s3_bucket.input.id
queue {
queue_arn = aws_sqs_queue.jobs.arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "uploads/"
}
}
📦 大きい音声の扱い:SQS メッセージ上限は 1 MiB(2025年に 256 KB から拡張)。音声ファイルは小さくないので、本体は必ず S3 に置き、メッセージには S3 のキー(ポインタ)だけを載せます。上の S3 通知方式なら、メッセージはイベントメタデータだけなので自然にこの形になります。
コスト最適化:Spot × オートスケール × 冪等キャッシュ
GPU は高価です。本番の単価は次の三段で刻みます。
- Spot インスタンス:GPU バッチは中断耐性を作り込めば Spot が効く。AWS Batch は Spot/On-Demand を混在でき、中断時は再投入される。前掲ワーカーは SIGTERM(中断2分前通知)で安全に手放すので、Spot で失われたジョブも DLQ ではなく再配信で拾い直す。
- キュー深さオートスケール:
ApproximateNumberOfMessagesVisibleを指標に、キューが溜まったらワーカー増、空なら 0 まで縮小。アイドル GPU を持たない。 - 冪等キャッシュ:S3 キー存在チェックで再処理ゼロ。再アップロード・再送・Spot 再投入で同じ曲を二度 GPU に通さない。
💴 コストモデルの考え方:単価 ≈ (1曲の処理秒数 ÷ 3600)× インスタンス時間単価 ÷ Spot 割引率。g4dn(T4)は AWS 公式が「ML推論で最安の GPU」と言う通り、MDX-Net の大量バッチでは第一候補。最新の正確な料金は料金ページで確認してください(本稿は単価の絶対値を断定しません)。
可観測性:止まった処理を一目で追える状態に
大量バッチで最も怖いのは「どのジョブがどこで詰まったか分からない」こと。前掲ワーカーの構造化ログ(emit)に加え、最低限これを揃えます。
- キュー深さ・DLQ 件数を CloudWatch アラームに(DLQ > 0 は即通知=毒メッセージ検知)。
- 処理時間・成功/失敗・冪等スキップ数をメトリクス化し、スループットとコストを可視化。
- ログには **相関ID(MessageId / S3キー)**を必ず通す。音声内容(PII になり得る)は出さない。
可観測性の体系的な作り込みはOpenTelemetry の記事に、リトライ/バックオフ/サーキットブレーカは回復性パターンの記事にまとめています。
テスト容易性:GPU なしで回せる境界を作る
GPU 依存のコードは CI で回しにくい——だからこそ境界を切るのが世界最高峰の設計です。
output_prefix/_already_doneのような純粋ロジックは I/O なしで単体テストできる。- S3 / SQS は moto でモックし、ワーカーの配送・冪等・DLQ 経路をGPUなしで検証。
- 分離本体(
Separator.separate)はインターフェース越しに差し替え、ダミー出力でパイプラインを通す。
# test_idempotency.py — GPU不要。冪等ロジックだけを検証
from worker import output_prefix # 純関数として切り出しておく
def test_output_prefix_is_deterministic():
a = output_prefix("uploads/song.wav", "UVR-MDX-NET-Inst_HQ_3.onnx")
b = output_prefix("uploads/song.wav", "UVR-MDX-NET-Inst_HQ_3.onnx")
assert a == b # 同じ入力×モデルなら必ず同じ出力先=冪等の土台
def test_model_namespaced():
p1 = output_prefix("uploads/x.wav", "model_a")
p2 = output_prefix("uploads/x.wav", "model_b")
assert p1 != p2 # モデルが違えば出力先も分かれる
よくある質問(FAQ)
Q. なぜ Lambda ではなくコンテナ(Batch/ECS)? A. 音源分離は GPU と分単位の実行時間が要る重い処理です。Lambda は GPU 非対応で実行時間にも制約があるため、**GPU コンテナ(AWS Batch / ECS)**が適します。軽い前処理(正規化・キュー投入)だけ Lambda に切り出すのは有効です。
Q. 可視性タイムアウトは最初から長くすればいい? A. 長すぎると失敗時の再配信が遅れ、短すぎると二重処理。だから「短め+ハートビートで延長」が定石です。なお延長しても最初の受信から12時間が上限なので、超長尺は Step Functions で分割します。
Q. Spot 中断でジョブが消えませんか?
A. 消えません。ワーカーは 成功時しか delete_message しないので、中断されたジョブは可視性切れで再配信され、別ワーカーが拾い直します。SIGTERM で処理中だけ完遂すれば、取りこぼしは出ません。
Q. 同じファイルが二重処理されない保証は? A. SQS は少なくとも1回配信なので「絶対に1回」は保証されません。だから出力 S3 キーの存在チェックで冪等化し、二重実行を成功扱いで吸収します。これが GPU コストの無駄打ちも防ぎます。
Q. モデルの初回ダウンロードでコールドスタートが遅いです。
A. モデルをコンテナイメージに焼き込む(ビルド時に1回 DL)か、model_file_dir を永続ボリュームに向けます。/tmp は揮発するので、コンテナ毎の再 DL を避けてください(詳細はトラブルシューティング記事)。
まとめ:手動スクリプトから「落とさない基盤」へ
音源分離を本番スケールさせる本質は、モデルではなく 「少なくとも1回・失敗する・中断される」を前提にした基盤設計にあります。
- 疎結合:S3 → SQS → GPUワーカー → S3。各々が互いを知らない。
- 冪等:出力 S3 キーの存在チェックで二重処理を吸収(=コスト削減)。
- 回復性:可視性ハートビート・成功時のみ削除・DLQ・Spot のグレースフル終了。
- コスト:Spot × キュー深さオートスケール × 冪等キャッシュ。
- 可観測性・テスト容易性:相関ID・DLQ アラーム・GPUなしで回る境界。
私は、この基盤を音声分離を第1段に持つ AI動画ローカライズ基盤で実際に本番運用しています。GPU を使う音声・動画 AI を「PoC で動く」から「本番でコストを抑えつつ落とさない」へ引き上げる設計を、実績とともにご相談ください。一人 × 生成AIで、設計から本番運用まで一気通貫で支援します。
出典・公式リソース
- SQS 可視性タイムアウト:Amazon SQS visibility timeout(既定30秒・最大12時間・ハートビート・at-least-once)
- SQS メッセージ上限:SQS quotas / 1 MiB へ拡張(2025)
- AWS Batch(GPU):Job definition parameters / What is AWS Batch
- ECS GPU:Working with GPUs on Amazon ECS
- GPUインスタンス:Amazon EC2 G4(T4)
- S3 イベント通知:Amazon S3 Event Notifications(送信先・at-least-once・FIFO非対応)
※ AWS の仕様・上限・料金は更新されます。実装前に必ず一次情報を確認してください。Spot 中断挙動や料金の絶対値は本稿では断定せず、最新の公式ドキュメント/料金ページの確認を前提としています。