メインコンテンツへスキップ
友田 陽大
生成AI・LLM・RAG
RAG
Python
アーキテクチャ設計
GCP
パフォーマンス
可観測性

テレビ番組のテロップ誤字を自動検出する:OCR×音声認識のクロスチェックと Cloud Workflows 並列化・ハイブリッドOCRのコスト最適化

放送番組のテロップ(字幕)の誤字脱字を自動検出するMLパイプラインを、実コードを唯一の真実源として解説します。ローカルOCRで切り替わりを検出してLLM OCRを差分だけに当てるハイブリッドOCR、OCRと音声認識のクロスチェック、Cloud Workflowsによる並列化(約30%短縮)、セグメント単位の冪等・再開可能設計、Firestore×SSEの単調進捗までを実装レベルで掘り下げます。

公開日
読了時間
11分
著者
友田 陽大
シェア

放送現場には「テロップ(字幕)の誤字脱字チェック」という、地味だが神経を使う作業があります。人名や番組名のテロップに一文字でも誤りがあれば、放送事故になりかねない。けれど人間が全カットを目視で照合するのは時間がかかり、見落としも起きます。

これをAIで自動化するのが、本記事で解説するパイプラインです。題材は、国内大手放送事業者向けに構築した社内AIプラットフォームの一機能。守秘義務のため固有名詞は伏せ、コードは匿名化していますが、アーキテクチャと設計判断は実プロダクトのものです。

「動画を入れたらテロップの誤字候補が出てくる」——言葉にすれば一行ですが、本番品質でこれを成立させるには、(1) コストの壁、(2) 速度の壁、(3) 信頼性の壁、という3つの壁を越える必要があります。順に見ていきます。

この記事のルール:唯一の真実源は実コードです。本文中の「約30%短縮(18分→13分)」はパイプラインの実測に基づく値です。一方、精度の絶対値(検出率など)はデータセット依存のため、ここでは仕組みの説明にとどめます。


1. 全体像:5段のパイプラインと2つの目

まず、何をどう処理するか。パイプラインは5段で構成され、その中核に**「2つの目」**があります。

[動画アップロード]
      │
      ▼
① Preprocess(動画分割・音声抽出)
      │
      ├──────────────┬──────────────┐  ← ここが並列
      ▼              ▼
② OCR(画像の目)    ③ Transcriber(耳)
   テロップを読む      発話を文字起こし
      └──────┬───────┘
             ▼
④ TypoChecker(照合)
   OCR結果 と 音声認識結果 を突き合わせ、
   誤字脱字・NGワード・固有名詞の不一致を検出
             ▼
⑤ Notifier(進捗・結果通知)

肝は ②OCR(画面に出ているテロップを読む「目」)③Transcriber(喋っている内容を文字に起こす「耳」) を別々に走らせ、④でクロスチェックすることです。

なぜ2つの経路を持つのか。テロップだけを見ても「これは本当に正しい表記か?」は判断しにくい。でも、発話内容(耳)という独立した根拠と突き合わせれば、「画面には『斎藤』と出ているが、ナレーションでは別の読み——表記ゆれの疑いあり」といった検出ができます。1つの情報源の確信度ではなく、独立した2つの情報源の食い違いを手がかりにする。これが誤検出を減らす鍵です。


2. コストの壁:ハイブリッドOCRという回答

最初の壁はコストです。

OCRには大きく2系統あります。ローカルの軽量OCR(PaddleOCR系)と、LLMによるOCR(マルチモーダルLLMに画像を読ませる)です。**LLM OCRは精度が高いが、高価で遅い。**1フレームごとにLLMを呼んだら、数十分の番組で課金が破綻します。

ここで効くのが、テロップの性質です。テロップは1秒ごとに変わるわけではない。同じテロップが数秒〜十数秒、画面に出続けます。つまり、「テロップが切り替わった瞬間」だけを捉えれば、ユニークなテロップの種類はフレーム数よりずっと少ない。

そこで2段構えにします。

# src_ocr/aggregator/unique_telop_aggregator.py(匿名化・抜粋)
def extract_unique_telops(frames: list[Frame]) -> list[TelopGroup]:
    """ローカルOCRでテロップの『切り替わり』を検出し、
    連続する同一テロップを1グループに畳む。LLM OCRはグループ単位で1回だけ呼ぶ。"""
    groups: list[TelopGroup] = []
    current: TelopGroup | None = None

    for frame in frames:
        # ① 安価なローカルOCRで、まず「いま何が出ているか」を粗く読む
        rough = local_ocr(frame)               # PaddleOCR系:高速・低コスト
        if current and is_same_telop(current.rough, rough):
            current.frames.append(frame)        # 同じテロップ → グループに足すだけ
            continue
        # ② テロップが切り替わった → 新しいグループを開始
        current = TelopGroup(rough=rough, frames=[frame])
        groups.append(current)

    # ③ LLM OCR は「ユニークなテロップ群」にだけ適用する(フレーム全量には当てない)
    for g in groups:
        g.text = llm_ocr(representative_frame(g))  # 高精度・高コスト:回数を絞る
    return groups

この設計の効果は、LLM呼び出し回数を「フレーム数」から「ユニークなテロップ数」へ落とすことです。番組の作りにもよりますが、これだけで桁が変わります。「安価な方法で当たりをつけ、高価な方法は決定的な場面にだけ使う」——コスト最適化の王道を、ドメインの性質(テロップは持続する)に乗せて実装しています。

LLM OCRを呼ぶ際も、画像をチャンクにまとめてバッチ化し、リクエスト数そのものを削ります。**「精度が要るところに金を使い、それ以外は徹底的に節約する」**を、パイプラインの構造として表現しているわけです。


3. 速度の壁:Cloud Workflows で「目」と「耳」を並列に

次は速度です。

②OCR(目)と③Transcriber(耳)は、互いに依存しません。OCRは映像から、Transcriberは音声から、それぞれ独立に処理できます。逐次実行すれば「OCRの10分 + 文字起こしの5分」で15分かかりますが、並列なら遅い方(10分)で済みます。

オーケストレーションには Celery のような分散キューではなく、Google Cloud Workflows(YAMLで宣言するマネージドなワークフロー)+ Cloud Run Jobs を選びました。クラウドネイティブでステートレス、ジョブ単位で自動スケールし、ローカルにワーカー群を常駐させずに済むためです。

# workflow.yaml(匿名化・抜粋)
main:
  steps:
    - preprocess:
        call: run_job
        args: { job: "preprocess", footage_id: ${footage_id} }

    # OCR と Transcriber を parallel ブランチで同時実行
    - analyze_in_parallel:
        parallel:
          shared: [results]
          branches:
            - ocr_branch:
                steps:
                  - run_ocr:
                      call: run_job
                      args: { job: "ocr", footage_id: ${footage_id}, concurrency_limit: 2 }
            - transcribe_branch:
                steps:
                  - run_transcriber:
                      call: run_job
                      args: { job: "transcriber", footage_id: ${footage_id}, concurrency_limit: 2 }

    # 両方そろってから照合(順序依存はここだけ)
    - cross_check:
        call: run_job
        args: { job: "typo_checker", footage_id: ${footage_id} }

    - notify:
        call: run_job
        args: { job: "notifier", footage_id: ${footage_id}, status: "done" }
逐次実行: OCR(10分) → Transcriber(5分) → TypoChecker(3分) = 18分
並列実行: [OCR(10分) ∥ Transcriber(5分)] → TypoChecker(3分) = 13分
                                              → 約30%短縮

並列化で気持ちよく速くなりますが、ここにはもあります。並列に走らせた結果を後段で正しくマージできることが前提です。これが3つ目の壁につながります。


4. 信頼性の壁:並列・再試行に耐える「冪等なID採番」

並列処理と長時間ジョブには、避けられない現実があります。**ジョブは途中で落ちるし、再試行される。**Cloud Run Jobs は失敗時にリトライされ得るし、セグメントを並列に処理すれば「セグメント3だけ失敗して再実行」も起きます。

このとき、各セグメントが勝手にローカルなIDでテロップを採番していると、後段マージでIDが衝突したり、再実行で重複が生まれたりします。そこで、セグメントをまたいで決定的に一意になるグローバルIDを採番します。

# src_common/lib/ocr_segment_merge.py(匿名化・抜粋)
TELOP_ID_SEGMENT_STRIDE = 100_000  # 1セグメントあたりのID空間

def global_telop_id(segment_index: int, local_id: int) -> int:
    """セグメント番号とローカルIDから、全体で一意なIDを決定的に算出する。
    入力が同じなら出力も同じ=再実行しても同じIDに収束(冪等)。"""
    return segment_index * TELOP_ID_SEGMENT_STRIDE + local_id

たったこれだけですが、効果は大きい。global_telop_id入力(セグメント番号・ローカルID)だけから決まる純粋関数なので、

  • 並列に処理しても、セグメントごとにID空間が分かれていて衝突しない
  • セグメント3を再実行しても、同じテロップには同じグローバルIDが付く
  • 後段マージで「ID付け替え(remap)」が一切不要

「再試行に対して結果が一意に収束する(冪等)」を、複雑な調停ロジックではなく採番の設計で実現しています。分散処理の整合性は、ロックや調停で守るより、そもそも衝突しない構造にしてしまう方が堅牢です。

同じ思想は進捗表示にも貫かれています。進捗は Firestore に書き、UIへは SSE(Server-Sent Events)で配信しますが、並列ジョブが順不同で進捗を書くと進捗バーが逆行しかねません。そこで「単調増加だけを許す」関数を噛ませます。

# src_common/lib/firestore.py(匿名化・抜粋)
def resolve_monotonic_progress(prev: float, incoming: float) -> float:
    """進捗は前回値を下回らせない。並列ジョブが古い値を書いても逆行させない。"""
    return max(prev, incoming)

ユーザー体験の「進捗が戻らない」という当たり前を、並列分散の世界で守るための一行です。


5. アップロードとバックエンド:大容量素材を捌く

放送素材は大容量です。フロントからの動画アップロードは、**署名付きURLのチャンク並列アップロード(最大8並列)**で捌きます。ここでも素朴な実装には罠があります。セマフォでスロットを管理すると、エラー時にスロットを返し損ねてデッドロックする事故が起きがちです。

そこで「セマフォでスロットを取り合う」のではなく、チャンクをN個ずつのバッチに区切り、バッチ単位で Promise.all する方式にしました。バッチが終われば次のバッチへ進むだけなので、スロットの解放漏れという失敗モード自体が存在しません。

// front/src/api/upload-service.ts(匿名化・抜粋)
const MAX_CONCURRENCY = 8;
for (let i = 0; i < chunks.length; i += MAX_CONCURRENCY) {
  const batch = chunks.slice(i, i + MAX_CONCURRENCY);
  await Promise.all(batch.map(uploadChunk)); // バッチ単位 → スロット解放漏れが起きない
}

バックエンドは FastAPI(async)。MySQL(SQLAlchemy async)・Redis・GCS への I/O はすべてノンブロッキングで、長時間ジョブのトリガーは Cloud Tasks 経由でHTTPから切り離します。「素材アップロード完了」のWebhook(/internal/footages/{id}/chunk-storage-ready)は、ML起動前に冪等チェックを入れ、同じ完了通知が二度来てもジョブを二重起動しないようにしています。

ローカル開発で地味に効いた設計も一つ。GCSエミュレータを使うとき、コンテナ内部からのアクセス用URLとブラウザに返すURLを環境変数で分離しています(STORAGE_EMULATOR_HOSTSTORAGE_EMULATOR_PUBLIC_URL)。host.docker.internal はブラウザから名前解決できないため、署名付きURLをブラウザに返すときだけ公開アドレスに差し替える。本番には影響しませんが、こうした「開発体験の段差」を消しておくことが、チームの速度を地味に支えます。


6. 品質を支える土台:型・リント・ログ

ML/データ処理のコードは「動けばいい」になりがちですが、本番運用するなら土台が要ります。Python側は uv で依存を固定し、Ruff(リント/フォーマット)・Pyright(型チェック)・pytest(ML/API合わせて多数のテスト)を回します。ログは loguru で構造化し、logger.info / success / error を段階で使い分け、進捗バーは tqdm

可観測性では、各ジョブの結果(クリーン/検出あり/失敗)と所要時間を記録し、ワークフロー単位で追跡できるようにしています。長時間・並列・分散のパイプラインは「どこで何分かかり、どこで落ちたか」が見えないと運用できません。可観測性は機能の後付けではなく、パイプラインの構成要素として組み込んでいます。


7. まとめ:3つの壁と、その越え方

「動画を入れたらテロップの誤字候補が出る」を本番品質にするために越えた壁を、もう一度整理します。

越え方効いた原則
コストハイブリッドOCR(ローカルで切替検出→LLMは差分だけ)安価な方法で当たりをつけ、高価な方法は決定的場面に絞る
速度Cloud Workflows で OCR(目) と Transcriber(耳) を並列化依存のない処理は並列に。約30%短縮(18分→13分)
信頼性セグメント横断の冪等なグローバルID採番+単調進捗整合性は調停で守るより「衝突しない構造」で守る

そして、これら全体を貫くもう一つの判断が**「2つの目(OCRと音声認識)を持つ」**ことでした。1つの情報源の確信度ではなく、独立した情報源の食い違いを手がかりにする。誤検出を構造的に減らすこの考え方は、テロップ校正に限らず、AIで「正しさ」を判定するあらゆる場面に応用できます。

AIの実装で難しいのは、デモを動かすことではなく、コスト・速度・信頼性を同時に満たして本番に置き続けることです。その一つひとつの判断こそが、「このチームに任せて大丈夫だ」とエンタープライズが判断する材料になります。


本記事のコードはすべて匿名化・再構成していますが、設計判断は実プロダクトのものです。「重く高価なAI/ML処理を本番運用品質で」「GCP上の長時間ジョブ・パイプライン」のご相談は、サービスページまたはお問い合わせからどうぞ。

友田

友田 陽大

経済産業大臣賞 受賞プロダクト開発者。TypeScript + Python + AWS で、SaaS・業界DX・ 実用レベルの生成AI(RAG)を、要件定義からインフラ・運用まで一人で完遂します。

この記事で解説した技術の適用事例

国内大手放送事業者の番組制作を支援する社内AIプラットフォーム(マルチサービス基盤・認証ハブを構築)

ケーススタディを見る