Skip to main content
友田 陽大
Voice AI
RAG
Python
アーキテクチャ設計
GCP
パフォーマンス
可観測性

Automatically detecting telop typos in TV programs: OCR × speech recognition cross-check, Cloud Workflows parallelization, and hybrid-OCR cost optimization

An explanation of an ML pipeline that automatically detects typos in broadcast-program telops (subtitles), with real code as the single source of truth. It digs at the implementation level into hybrid OCR (detect switches with local OCR and apply LLM OCR only to the diffs), the OCR-vs-speech-recognition cross-check, parallelization with Cloud Workflows (about 30% shorter), per-segment idempotent and resumable design, and monotonic progress with Firestore × SSE.

Published
Reading time
10 min read
Author
友田 陽大
Share

In the broadcast field there's a task — "checking telop (subtitle) typos" — that's plain but nerve-wracking. If a telop for a person's name or a program name has even a single error, it can become a broadcast accident. But having humans visually verify every cut takes time, and oversights happen.

The pipeline explained in this article is what automates this with AI. The subject is one feature of an internal AI platform built for a major domestic broadcaster. For confidentiality, proper nouns are hidden and the code is anonymized, but the architecture and design decisions are from the real product.

"Put in a video and typo candidates for the telops come out" — it's one line in words, but to make this hold at production quality, you need to cross three walls: (1) the cost wall, (2) the speed wall, and (3) the reliability wall. Let's look at them in order.

Rules for this article: the single source of truth is the real code. The "about 30% shorter (18 min → 13 min)" in the text is a value based on actual pipeline measurement. On the other hand, absolute accuracy values (detection rate, etc.) are dataset-dependent, so here it stays at explaining the mechanism.


1. The big picture: a 5-stage pipeline and two eyes

First, what is processed and how. The pipeline consists of 5 stages, with "two eyes" at its core.

[動画アップロード]
      │
      ▼
① Preprocess(動画分割・音声抽出)
      │
      ├──────────────┬──────────────┐  ← ここが並列
      ▼              ▼
② OCR(画像の目)    ③ Transcriber(耳)
   テロップを読む      発話を文字起こし
      └──────┬───────┘
             ▼
④ TypoChecker(照合)
   OCR結果 と 音声認識結果 を突き合わせ、
   誤字脱字・NGワード・固有名詞の不一致を検出
             ▼
⑤ Notifier(進捗・結果通知)

The crux is running ② OCR (the "eye" that reads the telops on screen) and ③ Transcriber (the "ear" that transcribes what's spoken) separately and cross-checking them in ④.

Why have two paths? Looking at the telop alone, it's hard to judge "is this really the correct spelling?" But cross-checking with an independent basis — the spoken content (the ear) — enables detections like "the screen shows '斎藤,' but the narration has a different reading — suspicion of spelling variation." Using not the confidence of one source but the discrepancy of two independent sources as the clue. This is the key to reducing false detections.


2. The cost wall: hybrid OCR as the answer

The first wall is cost.

OCR comes in roughly two lineages. Local lightweight OCR (PaddleOCR family) and LLM-based OCR (having a multimodal LLM read the image). LLM OCR is high-accuracy but expensive and slow. Calling an LLM per frame breaks the billing for a program tens of minutes long.

What works here is the nature of telops. A telop doesn't change every second. The same telop stays on screen for several to a dozen-plus seconds. In other words, if you capture only "the moment a telop switches," the number of unique telop types is far smaller than the number of frames.

So make it two-tier.

# src_ocr/aggregator/unique_telop_aggregator.py(匿名化・抜粋)
def extract_unique_telops(frames: list[Frame]) -> list[TelopGroup]:
    """ローカルOCRでテロップの『切り替わり』を検出し、
    連続する同一テロップを1グループに畳む。LLM OCRはグループ単位で1回だけ呼ぶ。"""
    groups: list[TelopGroup] = []
    current: TelopGroup | None = None

    for frame in frames:
        # ① 安価なローカルOCRで、まず「いま何が出ているか」を粗く読む
        rough = local_ocr(frame)               # PaddleOCR系:高速・低コスト
        if current and is_same_telop(current.rough, rough):
            current.frames.append(frame)        # 同じテロップ → グループに足すだけ
            continue
        # ② テロップが切り替わった → 新しいグループを開始
        current = TelopGroup(rough=rough, frames=[frame])
        groups.append(current)

    # ③ LLM OCR は「ユニークなテロップ群」にだけ適用する(フレーム全量には当てない)
    for g in groups:
        g.text = llm_ocr(representative_frame(g))  # 高精度・高コスト:回数を絞る
    return groups

The effect of this design is to drop the number of LLM calls from "the number of frames" to "the number of unique telops." Depending on how the program is made, this alone changes the order of magnitude. "Take aim with a cheap method, and use the expensive method only on decisive scenes" — the royal road of cost optimization, implemented on top of the domain's nature (telops persist).

When calling LLM OCR too, batch images into chunks to cut the number of requests themselves. We're expressing "spend money where accuracy is needed, and economize thoroughly elsewhere" as the structure of the pipeline.


3. The speed wall: parallelize the "eye" and the "ear" with Cloud Workflows

Next is speed.

② OCR (the eye) and ③ Transcriber (the ear) don't depend on each other. OCR processes from the video, the Transcriber from the audio, each independently. Run sequentially and it takes "OCR's 10 min + transcription's 5 min" = 15 min, but in parallel it takes only the slower one (10 min).

For orchestration, rather than a distributed queue like Celery, I chose Google Cloud Workflows (a managed workflow declared in YAML) + Cloud Run Jobs. Because it's cloud-native and stateless, auto-scales per job, and avoids keeping a fleet of workers resident locally.

# workflow.yaml(匿名化・抜粋)
main:
  steps:
    - preprocess:
        call: run_job
        args: { job: "preprocess", footage_id: ${footage_id} }

    # OCR と Transcriber を parallel ブランチで同時実行
    - analyze_in_parallel:
        parallel:
          shared: [results]
          branches:
            - ocr_branch:
                steps:
                  - run_ocr:
                      call: run_job
                      args: { job: "ocr", footage_id: ${footage_id}, concurrency_limit: 2 }
            - transcribe_branch:
                steps:
                  - run_transcriber:
                      call: run_job
                      args: { job: "transcriber", footage_id: ${footage_id}, concurrency_limit: 2 }

    # 両方そろってから照合(順序依存はここだけ)
    - cross_check:
        call: run_job
        args: { job: "typo_checker", footage_id: ${footage_id} }

    - notify:
        call: run_job
        args: { job: "notifier", footage_id: ${footage_id}, status: "done" }
逐次実行: OCR(10分) → Transcriber(5分) → TypoChecker(3分) = 18分
並列実行: [OCR(10分) ∥ Transcriber(5分)] → TypoChecker(3分) = 13分
                                              → 約30%短縮

Parallelization speeds things up pleasantly, but there's a trap here too. It's premised on being able to correctly merge the results run in parallel in the later stage. This connects to the third wall.


4. The reliability wall: "idempotent ID numbering" that withstands parallelism and retries

Parallel processing and long-running jobs have an unavoidable reality. Jobs fall over midway and get retried. Cloud Run Jobs can be retried on failure, and processing segments in parallel causes "only segment 3 failed and is re-run."

At this time, if each segment numbers telops with its own local IDs, the later merge has ID collisions or the re-run produces duplicates. So number a global ID that's deterministically unique across segments.

# src_common/lib/ocr_segment_merge.py(匿名化・抜粋)
TELOP_ID_SEGMENT_STRIDE = 100_000  # 1セグメントあたりのID空間

def global_telop_id(segment_index: int, local_id: int) -> int:
    """セグメント番号とローカルIDから、全体で一意なIDを決定的に算出する。
    入力が同じなら出力も同じ=再実行しても同じIDに収束(冪等)。"""
    return segment_index * TELOP_ID_SEGMENT_STRIDE + local_id

It's just this, but the effect is large. global_telop_id is a pure function determined only by inputs (segment number, local ID), so —

  • Even processing in parallel, the ID space is split per segment so there's no collision
  • Even re-running segment 3, the same telop gets the same global ID
  • No "ID remapping" is needed at all in the later merge

We realize "the result converges uniquely against retries (idempotent)" with the design of the numbering, not complex arbitration logic. For consistency in distributed processing, making the structure that doesn't collide in the first place is more robust than guarding with locks or arbitration.

The same philosophy runs through the progress display. Progress is written to Firestore and delivered to the UI via SSE (Server-Sent Events), but if parallel jobs write progress out of order, the progress bar could go backward. So pass it through a function that "allows only monotonic increase."

# src_common/lib/firestore.py(匿名化・抜粋)
def resolve_monotonic_progress(prev: float, incoming: float) -> float:
    """進捗は前回値を下回らせない。並列ジョブが古い値を書いても逆行させない。"""
    return max(prev, incoming)

A single line to protect the obvious user experience of "progress doesn't go back" in the parallel-distributed world.


5. Upload and backend: handling large materials

Broadcast materials are large. Video upload from the frontend is handled with signed-URL chunked parallel upload (up to 8 in parallel). Here too the naïve implementation has a trap. Managing slots with a semaphore tends to cause the accident of failing to return a slot on error and deadlocking.

So rather than "contending for slots with a semaphore," I made it divide chunks into batches of N and Promise.all per batch. Since a finished batch just advances to the next, the failure mode of a slot leak doesn't exist.

// front/src/api/upload-service.ts(匿名化・抜粋)
const MAX_CONCURRENCY = 8;
for (let i = 0; i < chunks.length; i += MAX_CONCURRENCY) {
  const batch = chunks.slice(i, i + MAX_CONCURRENCY);
  await Promise.all(batch.map(uploadChunk)); // バッチ単位 → スロット解放漏れが起きない
}

The backend is FastAPI (async). I/O to MySQL (SQLAlchemy async), Redis, and GCS is all non-blocking, and long-running job triggers are decoupled from HTTP via Cloud Tasks. The "material upload complete" webhook (/internal/footages/{id}/chunk-storage-ready) has an idempotency check before ML startup, so the same completion notification arriving twice doesn't double-start the job.

One design that quietly helped in local development: when using the GCS emulator, I separate the URL for access from inside the container and the URL returned to the browser via environment variables (STORAGE_EMULATOR_HOST and STORAGE_EMULATOR_PUBLIC_URL). Since host.docker.internal can't be name-resolved from the browser, swap in the public address only when returning a signed URL to the browser. It doesn't affect production, but erasing such "bumps in the development experience" quietly supports the team's speed.


6. The foundation that supports quality: types, lint, logs

ML/data-processing code tends to become "as long as it works," but to operate in production you need a foundation. On the Python side, dependencies are pinned with uv, and Ruff (lint/format), Pyright (type check), and pytest (many tests for ML/API together) are run. Logs are structured with loguru, using logger.info / success / error differently by stage, and the progress bar is tqdm.

For observability, I record each job's result (clean / has detections / failed) and elapsed time, making it traceable per workflow. A long-running, parallel, distributed pipeline can't be operated without seeing "where it took how many minutes and where it fell over." Observability is incorporated not as a feature added later but as a component of the pipeline.


7. Conclusion: the three walls, and how to cross them

Let me reorganize, once more, the walls crossed to make "put in a video and typo candidates for the telops come out" production-quality.

WallHow to crossThe principle that worked
CostHybrid OCR (detect switches locally → LLM only on diffs)Take aim with a cheap method, and confine the expensive method to decisive scenes
SpeedParallelize OCR (eye) and Transcriber (ear) with Cloud WorkflowsProcess independent work in parallel. About 30% shorter (18 min → 13 min)
ReliabilityIdempotent global ID numbering across segments + monotonic progressGuard consistency with a "structure that doesn't collide" rather than arbitration

And one more decision running through all of this was "have two eyes (OCR and speech recognition)." Using not the confidence of one source but the discrepancy of independent sources as the clue. This thinking, which structurally reduces false detections, applies not only to telop proofreading but to every scene where AI judges "correctness."

What's hard in AI implementation isn't getting a demo running but satisfying cost, speed, and reliability simultaneously and keeping it in production. Each of those decisions becomes the material by which an enterprise judges "it's safe to entrust this to this team."


All code in this article is anonymized and reconstructed, but the design decisions are from the real product. For consultations on "heavy, expensive AI/ML processing at production-operations quality" or "long-running jobs/pipelines on GCP," please reach out from the services page or contact.

友田

友田 陽大

Developer of a METI Minister's Award–winning product. With TypeScript + Python + AWS, I deliver SaaS, industry DX, and production-grade generative AI (RAG) end to end — from requirements to infrastructure and operations — single-handedly.

Got a challenge?

From design to implementation and operations — solo × generative AI

Implementation like this article's, end to end from requirements to production. Start with a free 30-minute technical consult and tell me about your situation.

Available for both project-based (contract) and advisory engagements. Start with a free 30-minute consult.

Also worth reading