# Qwen-TTS リアルタイム音声エージェント実装ガイド：WebSocketストリーミング・ブラウザ再生・バージイン（割り込み）まで

> Qwen3-TTS-Flash-Realtime で「話しながら返す」低遅延の音声エージェントを本番実装するガイド。WebSocketの双方向プロトコル（session.created / response.audio.delta / session.finished）、server_commit と commit の使い分け、LLM出力のストリーミング合成、ブラウザでのPCM 24kHzギャップレス再生、バージイン（割り込み）、接続の回復性・first_audio_delay計測まで実コードで解説します。

- 公開日: 2026-06-25
- 著者: 友田 陽大
- タグ: Python, 音声合成, 生成AI, Qwen, リアルタイム, アーキテクチャ設計
- URL: https://tomodahinata.com/blog/qwen-tts-realtime-voice-agent-websocket-streaming-guide

## 要点

- 音声エージェントの体感は「最初の音が出るまでの時間（first_audio_delay）」で決まり、ここを設計の中心に置く
- Qwen-TTS realtime は WebSocket で append_text→commit/finish、音声は response.audio.delta（base64 PCM）で逐次届く
- LLMのトークンを文境界でcommitするか、server_commit でサーバーに区切りを任せるかを要件で選ぶ
- APIキーは絶対にブラウザへ出さず、Next.js等のサーバーがWebSocketを中継し、ブラウザはPCMをギャップレス再生する
- 本番にはバージイン（割り込みで即停止）・接続の自動再接続・タイムアウト・コスト/遅延の可観測性が必須

---

音声エージェントの品質は、賢さよりも**間（ま）**で決まります。どんなに良い回答でも、ユーザーが問いかけてから音が出るまで2秒黙れば「壊れている」と感じられる。逆に、**最初のひと言が0.5秒で返り、ユーザーが話し始めたら即座に黙る**だけで、体験は一気に「対話」になります。

この記事は、Alibaba Cloud（Qwen）の **Qwen3-TTS-Flash-Realtime** で、**低遅延の音声エージェント**を本番実装するためのガイドです。基礎（モデル・音色・料金）は[Qwen-TTS 本番運用ガイド](/blog/qwen-tts-qwen3-tts-flash-production-guide)に譲り、本記事は**「話しながら返す」ストリーミングの設計**に集中します。RAG接客の文脈は[生成AI音声チャットボット](/case-studies/ai-voice-chatbot)で実際に構築しました。

> **この記事のルール**：API仕様は **Alibaba Cloud Model Studio の Qwen-TTS リアルタイム音声合成ドキュメント（2026年6月時点）** に基づきます。メソッド名・イベント名は公式SDKに準拠して記載しますが、SDKは更新されるため本番前に[公式ドキュメント](https://www.alibabacloud.com/help/en/model-studio/qwen-tts-realtime)で最新の型を確認してください。APIキーは環境変数前提（ハードコード厳禁・ブラウザに出さない）です。

---

## 0. 全体像：音声エージェントのループと遅延バジェット

リアルタイム音声エージェントは、3つの段の連鎖です。

```text
ユーザー発話 → ① STT（文字起こし） → ② LLM（応答生成） → ③ TTS（音声合成） → 再生
       ↑__________________________ バージイン（割り込みで停止）__________________________|
```

- ①STT は[Whisper / gpt-4o-transcribe のストリーミング](/blog/openai-whisper-production-guide-selfhost-vs-api)。
- ②LLM の応答ストリーミングは[Vercel AI SDK 本番ガイド](/blog/vercel-ai-sdk-production-llm-apps-streaming-tools-rag)。
- ③**TTS が本記事の主役**。Qwen-TTS realtime が担います。

体感を決めるのは合計レイテンシではなく、**最初の音が出るまでの時間**です。だから設計の鉄則は2つ。

1. **段をパイプライン化する**：LLMの全文生成を待たず、**最初の文ができた瞬間にTTSへ流す**。
2. **first_audio_delay を計測対象にする**：「最初の音までの時間」を本番のSLI（指標）にし、回帰を監視する（第6章）。

> 「合計が速い」より「最初が速い」。ナレーション一括生成（バッチ）と対話（リアルタイム）は別物です。バッチで足りるなら WebSocket を張らない方が単純で安い（KISS）。本記事は**対話**の話です。

---

## 1. realtime API の基礎：WebSocketとイベント

Qwen-TTS realtime は **全二重の WebSocket** で動きます。

- モデル：`qwen3-tts-flash-realtime` / `qwen3-tts-instruct-flash-realtime`
- エンドポイント（国際）：`wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime`（中国本土は `dashscope.aliyuncs.com`）
- 音声は **`response.audio.delta` イベント**で base64 エンコードされた PCM（24kHz/モノ/16bit）として逐次届く

公式SDKは、サーバーイベントを**コールバック**で受け取る形です。最小の実装はこうなります。

```python
import os
import base64
import threading
import dashscope
from dashscope.audio.qwen_tts_realtime import *

class TtsCallback(QwenTtsRealtimeCallback):
    """サーバーから届くイベントを処理する。音声断片はここに流れてくる。"""
    def __init__(self, sink) -> None:
        self._done = threading.Event()
        self._sink = sink  # 受け取ったPCMの行き先（ファイル / 再生 / WS中継）

    def on_open(self) -> None:
        # 接続確立。プレイヤーの初期化などはここで
        pass

    def on_close(self, code, msg) -> None:
        self._sink.close()

    def on_event(self, response: dict) -> None:
        etype = response["type"]
        if etype == "session.created":
            pass  # response["session"]["id"] でセッションIDが取れる
        elif etype == "response.audio.delta":
            pcm = base64.b64decode(response["delta"])  # 24kHz mono 16bit
            self._sink.write(pcm)                       # 即・下流へ（溜めない）
        elif etype == "response.done":
            pass  # 1応答ぶんの合成が完了
        elif etype == "session.finished":
            self._done.set()                            # セッション終了

    def wait(self) -> None:
        self._done.wait()
```

接続とセッション設定：

```python
dashscope.api_key = os.environ["DASHSCOPE_API_KEY"]
dashscope.base_websocket_api_url = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime"

callback = TtsCallback(sink)   # 自分で参照を保持（後で wait() する）
tts = QwenTtsRealtime(
    model="qwen3-tts-flash-realtime",
    callback=callback,
    url="wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime",
)
tts.connect()
tts.update_session(
    voice="Cherry",
    response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
    mode="server_commit",   # 第2章で commit と比較
)
```

テキストの送り方は**モードで変わります**（次章）。基本は `append_text()` で流し込み、`finish()`（または `commit()`）で確定します。

```python
tts.append_text("本日はお問い合わせありがとうございます。")
tts.finish()       # server_commit: これ以上テキストが無いことを通知
callback.wait()    # session.finished まで待つ（保持した参照を使う）
```

---

## 2. `server_commit` と `commit`：区切りを誰が決めるか

realtime には**テキストの区切り方が違う2モード**があり、用途で選びます。

| | `server_commit`（サーバー主導） | `commit`（クライアント主導） |
| --- | --- | --- |
| 区切り | サーバーがテキストを賢く分割 | クライアントが `commit()` で明示 |
| 送り方 | `append_text()` を繰り返し → `finish()` | `append_text()` → `commit()` を発話単位で |
| 向く用途 | 記事・長文ナレーションの連続合成 | **対話**（一発話ごとに区切りたい） |

音声エージェントでLLM出力を流す場合、選択肢は2つです。

- **A. `commit` で文単位に送る**：LLMのトークンを受けつつ**句点・改行で区切って `commit()`**。最初の文を最速で音にできる（first_audio_delay 最小）。制御が効く分コードは増える。
- **B. `server_commit` に丸投げ**：トークンをそのまま `append_text()` し続け、最後に `finish()`。実装は単純だが、区切りはサーバー任せ。

迷ったら**対話はA（commit）、記事読み上げはB（server_commit）**。「最初の音を最速で」が最優先の対話では、自分で文境界を切る価値があります（ETC：あとから区切りロジックを差し替えやすい）。

---

## 3. LLM → TTS パイプライン：最初の文を最速で音にする

肝は「**LLMの全文を待たない**」こと。トークンが流れてきたら、**文の切れ目で即TTSへ**渡します。文分割は単一責務の純関数に切り出します（SRP・テスト容易性）。

```python
import re

_SENTENCE_END = re.compile(r"(?<=[。．！？!?\n])")

def split_complete_sentences(buffer: str) -> tuple[list[str], str]:
    """確定した文のリストと、未確定の残りを返す純関数。
    句点・改行までを「確定文」とし、途中の断片は次回に持ち越す。"""
    parts = _SENTENCE_END.split(buffer)
    if not buffer.endswith(("。", "．", "！", "？", "!", "?", "\n")):
        remainder = parts.pop() if parts else ""
    else:
        remainder = ""
    return [p for p in parts if p.strip()], remainder

def speak_llm_stream(tts: "QwenTtsRealtime", token_stream) -> None:
    """LLMのトークンストリームを、文ができ次第TTSへ流す（commitモード）。"""
    buf = ""
    for token in token_stream:        # ②LLMのストリーミング出力
        buf += token
        sentences, buf = split_complete_sentences(buf)
        for s in sentences:
            tts.append_text(s)
            tts.commit()              # 文ごとに確定 → 最初の音が早く出る
    if buf.strip():                   # 末尾の残り
        tts.append_text(buf)
    tts.finish()
```

> **バックプレッシャに注意**：LLMの生成より再生は遅い（音声は実時間で進む）ので、`append_text` を無制限に呼ぶと内部キューが膨らみます。長文では「再生済み秒数」を見て送出を絞るか、`server_commit` でサーバーに任せます。**音声は実時間という物理に縛られる**のが、テキストストリームとの決定的な違いです。

---

## 4. ブラウザ再生：鍵を出さずにPCMを鳴らす

**ブラウザから直接 DashScope を叩いてはいけません**（APIキー漏洩）。正しい構成は3層です。

```text
ブラウザ  ⇄  あなたのサーバー（Next.js）  ⇄  DashScope WS
（音声再生）   （鍵を保持・WS中継・認可）      （合成）
```

サーバー（Next.js / Node）がブラウザと WebSocket を張り、内部で DashScope の realtime に接続して**音声断片を中継**します。ブラウザは届いた PCM を **Web Audio API でギャップレス再生**します。24kHz/モノ/16bit の PCM をそのまま鳴らすプレイヤーはこう書けます。

```ts
/** 24kHz mono 16bit PCM のチャンクを、隙間なく順に再生するプレイヤー。
 *  AudioBufferSourceNode を時刻指定でスケジュールし、途切れを防ぐ。 */
export class PcmStreamPlayer {
  private ctx: AudioContext;
  private nextStartTime = 0;
  private readonly sources = new Set<AudioBufferSourceNode>();

  constructor(private readonly sampleRate = 24_000) {
    this.ctx = new AudioContext({ sampleRate });
  }

  /** Int16 PCM（リトルエンディアン）を1チャンク受け取り、末尾に連結再生する。 */
  enqueue(pcm: ArrayBuffer): void {
    const int16 = new Int16Array(pcm);
    const float32 = new Float32Array(int16.length);
    for (let i = 0; i < int16.length; i++) float32[i] = int16[i] / 0x8000; // -1..1へ正規化

    const buffer = this.ctx.createBuffer(1, float32.length, this.sampleRate);
    buffer.getChannelData(0).set(float32);

    const src = this.ctx.createBufferSource();
    src.buffer = buffer;
    src.connect(this.ctx.destination);

    const startAt = Math.max(this.ctx.currentTime, this.nextStartTime);
    src.start(startAt);                       // 前のチャンクの直後にスケジュール
    this.nextStartTime = startAt + buffer.duration;

    this.sources.add(src);
    src.onended = () => this.sources.delete(src);
  }

  /** バージイン（割り込み）：再生中の音を即座に止め、キューを捨てる。 */
  stop(): void {
    for (const s of this.sources) { try { s.stop(); } catch {} }
    this.sources.clear();
    this.nextStartTime = this.ctx.currentTime;
  }
}
```

ブラウザ側の受信ループ（サーバーが中継した PCM を鳴らす）：

```ts
const player = new PcmStreamPlayer();
const ws = new WebSocket(`wss://${location.host}/api/voice`); // 自サーバーのWS
ws.binaryType = "arraybuffer";

ws.onmessage = (e) => {
  if (e.data instanceof ArrayBuffer) player.enqueue(e.data); // 音声PCM
  // 制御メッセージ（テキスト）は別途JSONで受ける設計にする
};
```

> **`AudioContext` はユーザー操作で起こす**：多くのブラウザは自動再生をブロックします。「会話を始める」ボタンの `onClick` で `new AudioContext()` / `ctx.resume()` を呼び、ユーザー起点で音声を有効化してください（これは a11y とブラウザポリシーの両方の要請）。

> **中継WS（`/api/voice`）は必ず認可する**：このエンドポイントは DashScope の API キーを握る**課金経路**です。転送する前にハンドシェイク段階でセッション Cookie か短命トークンを検証してください。未認証だと第三者にあなたの課金で TTS を使われます（鍵をブラウザに出さない代わりに、中継の認可で守る）。

---

## 5. バージイン（割り込み）：話しかけられたら即黙る

対話を「対話」たらしめる最重要機能が**バージイン**です。ユーザーがエージェントの発話中に話し始めたら、**①生成を止め ②再生中の音を捨てる**を即座に行います。

1. **VAD（音声区間検出）でユーザー発話を検知**したら、
2. サーバー側で進行中の TTS セッションを破棄し（以降の `response.audio.delta` を中継しない）、
3. ブラウザの `player.stop()` で再生バッファをフラッシュ。

```ts
// ブラウザ：ユーザーが話し始めた合図を受けたら即停止
function onUserStartedSpeaking() {
  player.stop();                         // 再生中の音を捨てる（残響を残さない）
  ws.send(JSON.stringify({ type: "barge_in" })); // サーバーに生成中断を依頼
}
```

肝は「**停止の責務を再生側に持たせる**」こと。サーバーの中断が一瞬遅れても、ブラウザが先に黙れば体感は途切れません。**遅延の世界では、ユーザーに最も近い層で止めるのが正解**です。

---

## 6. 回復性・冪等・可観測性

WebSocket は切れます。realtime は課金もします。本番には次を入れます。

- **first_audio_delay を計測**：SDK の `get_first_audio_delay()` で「最初の音までの時間」を取得し、構造化ログ／メトリクスに出す。公式は「初回はWS接続確立を含むため初動が遅い」と明記しているので、**接続をプールしてウォームに保つ**のが定石。
- **自動再接続（指数バックオフ）**：`on_close` を検知したら再接続。ただし**同じ発話を二重に再生しない**よう、発話には一意IDを振り、再生済みIDはスキップ（冪等）。
- **タイムアウトとフォールバック**：first_audio_delay が閾値（例：1.5秒）を超えたら、**「少々お待ちください」の定型音声**に切り替える、あるいはバッチTTSへフォールバック（回復性）。
- **コストの可視化**：文字数＝課金量。セッションごとに「合成文字数・推定コスト・first_audio_delay・中断回数」を記録し、[OpenTelemetry](/blog/opentelemetry-observability-production-tracing-metrics-logs)で相関させる。
- **PIIを残さない**：会話本文（個人情報を含み得る）はログに残さず、メタデータのみ。

```python
# セッション終了時に計測値を構造化ログへ（本文は出さない）
log.info("tts_session", extra={
    "session_id": tts.get_session_id(),
    "first_audio_delay_ms": tts.get_first_audio_delay(),
    "chars": total_chars,
    "barge_in_count": barge_ins,
})
```

---

## 7. まとめ：低遅延音声エージェントの設計チェックリスト

- **指標は first_audio_delay**：合計ではなく「最初の音」を最適化し、SLIにする。
- **パイプライン化**：LLMの最初の文ができたら即TTS（`commit` で文単位送出）。
- **3層構成**：ブラウザ ⇄ 自サーバー ⇄ DashScope。鍵はサーバー、再生はブラウザ。
- **ギャップレス再生**：PCM 24kHz を `AudioBufferSourceNode` で時刻スケジュール。
- **バージイン**：ユーザーに最も近い再生層で即停止＋生成中断。
- **回復性**：再接続＋発話ID冪等、タイムアウト時の定型音声フォールバック。
- **可観測性**：first_audio_delay・文字数・コスト・中断回数を相関ログに。本文PIIは出さない。

音声エージェントは「賢さ」より「間」の設計です。私は[RAG搭載の音声接客システム](/case-studies/ai-voice-chatbot)で、応答の速さと割り込みの自然さを作り込みました。**「自社の業務を、待たせない音声対話にする」——その設計から実装・運用まで一気通貫で伴走できます。** 要件整理からお気軽にご相談ください。

---

### 参考（公式ドキュメント）

- [Qwen-TTS リアルタイム音声合成（Alibaba Cloud Model Studio）](https://www.alibabacloud.com/help/en/model-studio/qwen-tts-realtime) — WebSocketプロトコル・SDK・イベント・モード
- [Qwen-TTS 音声合成（Model Studio）](https://www.alibabacloud.com/help/en/model-studio/qwen-tts) — モデル・音色・パラメータの基礎
- [MDN: Web Audio API](https://developer.mozilla.org/en-US/docs/Web/API/Web_Audio_API) — `AudioBufferSourceNode` によるスケジューリング再生
