メインコンテンツへスキップ
友田 陽大
音声・ボイスAI
Python
音声合成
生成AI
Qwen
リアルタイム
アーキテクチャ設計

Qwen-TTS リアルタイム音声エージェント実装ガイド:WebSocketストリーミング・ブラウザ再生・バージイン(割り込み)まで

Qwen3-TTS-Flash-Realtime で「話しながら返す」低遅延の音声エージェントを本番実装するガイド。WebSocketの双方向プロトコル(session.created / response.audio.delta / session.finished)、server_commit と commit の使い分け、LLM出力のストリーミング合成、ブラウザでのPCM 24kHzギャップレス再生、バージイン(割り込み)、接続の回復性・first_audio_delay計測まで実コードで解説します。

公開日
読了時間
11分
著者
友田 陽大
シェア

音声エージェントの品質は、賢さよりも**間(ま)**で決まります。どんなに良い回答でも、ユーザーが問いかけてから音が出るまで2秒黙れば「壊れている」と感じられる。逆に、最初のひと言が0.5秒で返り、ユーザーが話し始めたら即座に黙るだけで、体験は一気に「対話」になります。

この記事は、Alibaba Cloud(Qwen)の Qwen3-TTS-Flash-Realtime で、低遅延の音声エージェントを本番実装するためのガイドです。基礎(モデル・音色・料金)はQwen-TTS 本番運用ガイドに譲り、本記事は**「話しながら返す」ストリーミングの設計**に集中します。RAG接客の文脈は生成AI音声チャットボットで実際に構築しました。

この記事のルール:API仕様は Alibaba Cloud Model Studio の Qwen-TTS リアルタイム音声合成ドキュメント(2026年6月時点) に基づきます。メソッド名・イベント名は公式SDKに準拠して記載しますが、SDKは更新されるため本番前に公式ドキュメントで最新の型を確認してください。APIキーは環境変数前提(ハードコード厳禁・ブラウザに出さない)です。


0. 全体像:音声エージェントのループと遅延バジェット

リアルタイム音声エージェントは、3つの段の連鎖です。

ユーザー発話 → ① STT(文字起こし) → ② LLM(応答生成) → ③ TTS(音声合成) → 再生
       ↑__________________________ バージイン(割り込みで停止)__________________________|

体感を決めるのは合計レイテンシではなく、最初の音が出るまでの時間です。だから設計の鉄則は2つ。

  1. 段をパイプライン化する:LLMの全文生成を待たず、最初の文ができた瞬間にTTSへ流す
  2. first_audio_delay を計測対象にする:「最初の音までの時間」を本番のSLI(指標)にし、回帰を監視する(第6章)。

「合計が速い」より「最初が速い」。ナレーション一括生成(バッチ)と対話(リアルタイム)は別物です。バッチで足りるなら WebSocket を張らない方が単純で安い(KISS)。本記事は対話の話です。


1. realtime API の基礎:WebSocketとイベント

Qwen-TTS realtime は 全二重の WebSocket で動きます。

  • モデル:qwen3-tts-flash-realtime / qwen3-tts-instruct-flash-realtime
  • エンドポイント(国際):wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime(中国本土は dashscope.aliyuncs.com
  • 音声は response.audio.delta イベントで base64 エンコードされた PCM(24kHz/モノ/16bit)として逐次届く

公式SDKは、サーバーイベントをコールバックで受け取る形です。最小の実装はこうなります。

import os
import base64
import threading
import dashscope
from dashscope.audio.qwen_tts_realtime import *

class TtsCallback(QwenTtsRealtimeCallback):
    """サーバーから届くイベントを処理する。音声断片はここに流れてくる。"""
    def __init__(self, sink) -> None:
        self._done = threading.Event()
        self._sink = sink  # 受け取ったPCMの行き先(ファイル / 再生 / WS中継)

    def on_open(self) -> None:
        # 接続確立。プレイヤーの初期化などはここで
        pass

    def on_close(self, code, msg) -> None:
        self._sink.close()

    def on_event(self, response: dict) -> None:
        etype = response["type"]
        if etype == "session.created":
            pass  # response["session"]["id"] でセッションIDが取れる
        elif etype == "response.audio.delta":
            pcm = base64.b64decode(response["delta"])  # 24kHz mono 16bit
            self._sink.write(pcm)                       # 即・下流へ(溜めない)
        elif etype == "response.done":
            pass  # 1応答ぶんの合成が完了
        elif etype == "session.finished":
            self._done.set()                            # セッション終了

    def wait(self) -> None:
        self._done.wait()

接続とセッション設定:

dashscope.api_key = os.environ["DASHSCOPE_API_KEY"]
dashscope.base_websocket_api_url = "wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime"

callback = TtsCallback(sink)   # 自分で参照を保持(後で wait() する)
tts = QwenTtsRealtime(
    model="qwen3-tts-flash-realtime",
    callback=callback,
    url="wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime",
)
tts.connect()
tts.update_session(
    voice="Cherry",
    response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
    mode="server_commit",   # 第2章で commit と比較
)

テキストの送り方はモードで変わります(次章)。基本は append_text() で流し込み、finish()(または commit())で確定します。

tts.append_text("本日はお問い合わせありがとうございます。")
tts.finish()       # server_commit: これ以上テキストが無いことを通知
callback.wait()    # session.finished まで待つ(保持した参照を使う)

2. server_commitcommit:区切りを誰が決めるか

realtime にはテキストの区切り方が違う2モードがあり、用途で選びます。

server_commit(サーバー主導)commit(クライアント主導)
区切りサーバーがテキストを賢く分割クライアントが commit() で明示
送り方append_text() を繰り返し → finish()append_text()commit() を発話単位で
向く用途記事・長文ナレーションの連続合成対話(一発話ごとに区切りたい)

音声エージェントでLLM出力を流す場合、選択肢は2つです。

  • A. commit で文単位に送る:LLMのトークンを受けつつ句点・改行で区切って commit()。最初の文を最速で音にできる(first_audio_delay 最小)。制御が効く分コードは増える。
  • B. server_commit に丸投げ:トークンをそのまま append_text() し続け、最後に finish()。実装は単純だが、区切りはサーバー任せ。

迷ったら対話はA(commit)、記事読み上げはB(server_commit)。「最初の音を最速で」が最優先の対話では、自分で文境界を切る価値があります(ETC:あとから区切りロジックを差し替えやすい)。


3. LLM → TTS パイプライン:最初の文を最速で音にする

肝は「LLMの全文を待たない」こと。トークンが流れてきたら、文の切れ目で即TTSへ渡します。文分割は単一責務の純関数に切り出します(SRP・テスト容易性)。

import re

_SENTENCE_END = re.compile(r"(?<=[。.!?!?\n])")

def split_complete_sentences(buffer: str) -> tuple[list[str], str]:
    """確定した文のリストと、未確定の残りを返す純関数。
    句点・改行までを「確定文」とし、途中の断片は次回に持ち越す。"""
    parts = _SENTENCE_END.split(buffer)
    if not buffer.endswith(("。", ".", "!", "?", "!", "?", "\n")):
        remainder = parts.pop() if parts else ""
    else:
        remainder = ""
    return [p for p in parts if p.strip()], remainder

def speak_llm_stream(tts: "QwenTtsRealtime", token_stream) -> None:
    """LLMのトークンストリームを、文ができ次第TTSへ流す(commitモード)。"""
    buf = ""
    for token in token_stream:        # ②LLMのストリーミング出力
        buf += token
        sentences, buf = split_complete_sentences(buf)
        for s in sentences:
            tts.append_text(s)
            tts.commit()              # 文ごとに確定 → 最初の音が早く出る
    if buf.strip():                   # 末尾の残り
        tts.append_text(buf)
    tts.finish()

バックプレッシャに注意:LLMの生成より再生は遅い(音声は実時間で進む)ので、append_text を無制限に呼ぶと内部キューが膨らみます。長文では「再生済み秒数」を見て送出を絞るか、server_commit でサーバーに任せます。音声は実時間という物理に縛られるのが、テキストストリームとの決定的な違いです。


4. ブラウザ再生:鍵を出さずにPCMを鳴らす

ブラウザから直接 DashScope を叩いてはいけません(APIキー漏洩)。正しい構成は3層です。

ブラウザ  ⇄  あなたのサーバー(Next.js)  ⇄  DashScope WS
(音声再生)   (鍵を保持・WS中継・認可)      (合成)

サーバー(Next.js / Node)がブラウザと WebSocket を張り、内部で DashScope の realtime に接続して音声断片を中継します。ブラウザは届いた PCM を Web Audio API でギャップレス再生します。24kHz/モノ/16bit の PCM をそのまま鳴らすプレイヤーはこう書けます。

/** 24kHz mono 16bit PCM のチャンクを、隙間なく順に再生するプレイヤー。
 *  AudioBufferSourceNode を時刻指定でスケジュールし、途切れを防ぐ。 */
export class PcmStreamPlayer {
  private ctx: AudioContext;
  private nextStartTime = 0;
  private readonly sources = new Set<AudioBufferSourceNode>();

  constructor(private readonly sampleRate = 24_000) {
    this.ctx = new AudioContext({ sampleRate });
  }

  /** Int16 PCM(リトルエンディアン)を1チャンク受け取り、末尾に連結再生する。 */
  enqueue(pcm: ArrayBuffer): void {
    const int16 = new Int16Array(pcm);
    const float32 = new Float32Array(int16.length);
    for (let i = 0; i < int16.length; i++) float32[i] = int16[i] / 0x8000; // -1..1へ正規化

    const buffer = this.ctx.createBuffer(1, float32.length, this.sampleRate);
    buffer.getChannelData(0).set(float32);

    const src = this.ctx.createBufferSource();
    src.buffer = buffer;
    src.connect(this.ctx.destination);

    const startAt = Math.max(this.ctx.currentTime, this.nextStartTime);
    src.start(startAt);                       // 前のチャンクの直後にスケジュール
    this.nextStartTime = startAt + buffer.duration;

    this.sources.add(src);
    src.onended = () => this.sources.delete(src);
  }

  /** バージイン(割り込み):再生中の音を即座に止め、キューを捨てる。 */
  stop(): void {
    for (const s of this.sources) { try { s.stop(); } catch {} }
    this.sources.clear();
    this.nextStartTime = this.ctx.currentTime;
  }
}

ブラウザ側の受信ループ(サーバーが中継した PCM を鳴らす):

const player = new PcmStreamPlayer();
const ws = new WebSocket(`wss://${location.host}/api/voice`); // 自サーバーのWS
ws.binaryType = "arraybuffer";

ws.onmessage = (e) => {
  if (e.data instanceof ArrayBuffer) player.enqueue(e.data); // 音声PCM
  // 制御メッセージ(テキスト)は別途JSONで受ける設計にする
};

AudioContext はユーザー操作で起こす:多くのブラウザは自動再生をブロックします。「会話を始める」ボタンの onClicknew AudioContext() / ctx.resume() を呼び、ユーザー起点で音声を有効化してください(これは a11y とブラウザポリシーの両方の要請)。

中継WS(/api/voice)は必ず認可する:このエンドポイントは DashScope の API キーを握る課金経路です。転送する前にハンドシェイク段階でセッション Cookie か短命トークンを検証してください。未認証だと第三者にあなたの課金で TTS を使われます(鍵をブラウザに出さない代わりに、中継の認可で守る)。


5. バージイン(割り込み):話しかけられたら即黙る

対話を「対話」たらしめる最重要機能がバージインです。ユーザーがエージェントの発話中に話し始めたら、①生成を止め ②再生中の音を捨てるを即座に行います。

  1. VAD(音声区間検出)でユーザー発話を検知したら、
  2. サーバー側で進行中の TTS セッションを破棄し(以降の response.audio.delta を中継しない)、
  3. ブラウザの player.stop() で再生バッファをフラッシュ。
// ブラウザ:ユーザーが話し始めた合図を受けたら即停止
function onUserStartedSpeaking() {
  player.stop();                         // 再生中の音を捨てる(残響を残さない)
  ws.send(JSON.stringify({ type: "barge_in" })); // サーバーに生成中断を依頼
}

肝は「停止の責務を再生側に持たせる」こと。サーバーの中断が一瞬遅れても、ブラウザが先に黙れば体感は途切れません。遅延の世界では、ユーザーに最も近い層で止めるのが正解です。


6. 回復性・冪等・可観測性

WebSocket は切れます。realtime は課金もします。本番には次を入れます。

  • first_audio_delay を計測:SDK の get_first_audio_delay() で「最初の音までの時間」を取得し、構造化ログ/メトリクスに出す。公式は「初回はWS接続確立を含むため初動が遅い」と明記しているので、接続をプールしてウォームに保つのが定石。
  • 自動再接続(指数バックオフ)on_close を検知したら再接続。ただし同じ発話を二重に再生しないよう、発話には一意IDを振り、再生済みIDはスキップ(冪等)。
  • タイムアウトとフォールバック:first_audio_delay が閾値(例:1.5秒)を超えたら、「少々お待ちください」の定型音声に切り替える、あるいはバッチTTSへフォールバック(回復性)。
  • コストの可視化:文字数=課金量。セッションごとに「合成文字数・推定コスト・first_audio_delay・中断回数」を記録し、OpenTelemetryで相関させる。
  • PIIを残さない:会話本文(個人情報を含み得る)はログに残さず、メタデータのみ。
# セッション終了時に計測値を構造化ログへ(本文は出さない)
log.info("tts_session", extra={
    "session_id": tts.get_session_id(),
    "first_audio_delay_ms": tts.get_first_audio_delay(),
    "chars": total_chars,
    "barge_in_count": barge_ins,
})

7. まとめ:低遅延音声エージェントの設計チェックリスト

  • 指標は first_audio_delay:合計ではなく「最初の音」を最適化し、SLIにする。
  • パイプライン化:LLMの最初の文ができたら即TTS(commit で文単位送出)。
  • 3層構成:ブラウザ ⇄ 自サーバー ⇄ DashScope。鍵はサーバー、再生はブラウザ。
  • ギャップレス再生:PCM 24kHz を AudioBufferSourceNode で時刻スケジュール。
  • バージイン:ユーザーに最も近い再生層で即停止+生成中断。
  • 回復性:再接続+発話ID冪等、タイムアウト時の定型音声フォールバック。
  • 可観測性:first_audio_delay・文字数・コスト・中断回数を相関ログに。本文PIIは出さない。

音声エージェントは「賢さ」より「間」の設計です。私はRAG搭載の音声接客システムで、応答の速さと割り込みの自然さを作り込みました。「自社の業務を、待たせない音声対話にする」——その設計から実装・運用まで一気通貫で伴走できます。 要件整理からお気軽にご相談ください。


参考(公式ドキュメント)

友田

友田 陽大

経済産業大臣賞 受賞プロダクト開発者。TypeScript + Python + AWS で、SaaS・業界DX・ 実用レベルの生成AI(RAG)を、要件定義からインフラ・運用まで一人で完遂します。

この記事で解説した技術の適用事例

生成AI音声チャットボット(RAG搭載・来店客向けキオスク+会話分析コンソール)

ケーススタディを見る