Broadcasting has a task that is unglamorous but nerve-racking: checking telops (on-screen captions) for typos. A single wrong character in a person's name or a program title can turn into an on-air incident. Yet having people visually check every cut takes time, and things still slip through.
This article explains a pipeline that automates this check with AI. It is one feature of an internal AI platform built for a major domestic broadcaster. For confidentiality, proper nouns are hidden and the code is anonymized, but the architecture and design decisions come from the real product.
"Put in a video and get candidate telop typos out" fits in one line, but making it hold up at production quality means getting over three walls: (1) cost, (2) speed, and (3) reliability. Let's take them in order.
Ground rules for this article: the real code is the single source of truth. The figure "about 30% shorter (18 min → 13 min)" is based on measurements of the actual pipeline. Absolute accuracy figures (detection rate and so on), by contrast, depend on the dataset, so this article limits itself to explaining the mechanism.
1. The big picture: a 5-stage pipeline and two eyes
First, what gets processed and how. The pipeline has five stages, with "two eyes" at its core.
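[Video upload]
     │
     ▼
① Preprocess (split video, extract audio)
     │
     ├───────────────────────┐   ← runs in parallel from here
     ▼                       ▼
② OCR (the eye)          ③ Transcriber (the ear)
   reads the telops         transcribes the speech
     └───────────┬───────────┘
                 ▼
④ TypoChecker (cross-check)
   Compares the OCR results with the speech-recognition results and
   detects typos/omissions, NG (banned) words, and proper-noun mismatches
                 ▼
⑤ Notifier (progress and result notifications)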
The crux is running ② OCR (the "eye" that reads the telops on screen) and ③ Transcriber (the "ear" that transcribes what's spoken) separately and cross-checking them in ④.
Why two paths? From the telop alone, it is hard to judge whether a spelling is really correct. Cross-checking against an independent basis, the spoken content (the ear), makes detections like this possible: "the screen shows 斎藤, but the transcript of the narration renders the name differently: possible variant spelling." The clue is not the confidence of a single source but the disagreement between two independent sources. That is the key to reducing false positives.
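The cross-check idea can be illustrated with a small sketch. It is not the actual TypoChecker: it assumes, hypothetically, that telops and utterances both carry start/end times in seconds, gathers the transcript that overlaps a telop's time range, picks the telop-length window of speech most similar to the telop with the standard-library `difflib.SequenceMatcher`, and reports the character spans that differ. The `TimedText` type, the 0.5 threshold, and all function names are assumptions for illustration.

```python
from dataclasses import dataclass
from difflib import SequenceMatcher


@dataclass
class TimedText:
    """Hypothetical record: a telop or an utterance with a time range in seconds."""
    start: float
    end: float
    text: str


def speech_during(telop: TimedText, utterances: list[TimedText]) -> str:
    """Concatenate every utterance whose time range overlaps the telop's."""
    return "".join(u.text for u in utterances if u.start < telop.end and u.end > telop.start)


def find_discrepancies(
    telop: TimedText, utterances: list[TimedText], min_ratio: float = 0.5
) -> list[tuple[str, str]]:
    """Return (on-screen, spoken) fragment pairs where the telop nearly matches the speech."""
    speech = speech_during(telop, utterances)
    n = len(telop.text)
    if n == 0 or not speech:
        return []
    # Slide a telop-length window over the speech and keep the most similar window
    # (max() keeps the first window on ties).
    windows = [speech[i:i + n] for i in range(max(1, len(speech) - n + 1))]
    best = max(windows, key=lambda w: SequenceMatcher(None, telop.text, w, autojunk=False).ratio())
    matcher = SequenceMatcher(None, telop.text, best, autojunk=False)
    ratio = matcher.ratio()
    # Identical: nothing to flag. Too different: the telop probably isn't spoken at all.
    if ratio == 1.0 or ratio < min_ratio:
        return []
    return [(telop.text[i1:i2], best[j1:j2])
            for tag, i1, i2, j1, j2 in matcher.get_opcodes() if tag == "replace"]


telop = TimedText(10.0, 14.0, "斎藤さん")
speech = [TimedText(9.0, 12.0, "今日は斉藤さんに"), TimedText(12.0, 15.0, "お話を伺います")]
print(find_discrepancies(telop, speech))  # [('斎', '斉')]
```

Only a near-match gets flagged. An exact match, and a telop with no spoken counterpart (a program title, say), both produce nothing, which is the "disagreement between two sources" rule in miniature.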
2. The cost wall: hybrid OCR as the answer
The first wall is cost.
OCR options fall roughly into two families: lightweight local OCR (the PaddleOCR family) and LLM-based OCR (having a multimodal LLM read the image). LLM OCR is more accurate but expensive and slow. Calling an LLM for every frame of a program tens of minutes long would blow the budget.
What helps here is how telops behave. A telop does not change every second; the same one stays on screen for anywhere from a few seconds to well over ten. So if you capture only the moment a telop switches, the number of distinct telops is far smaller than the number of frames.
So make it two-tier.
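# src_ocr/aggregator/unique_telop_aggregator.py (anonymized excerpt)
# Frame, TelopGroup, local_ocr, is_same_telop, llm_ocr and representative_frame
# are defined elsewhere in the module and are not shown here.
def extract_unique_telops(frames: list[Frame]) -> list[TelopGroup]:
    """Detect telop *switches* with local OCR and fold consecutive identical
    telops into one group. LLM OCR is called only once per group."""
    groups: list[TelopGroup] = []
    current: TelopGroup | None = None

    for frame in frames:
        # ① First, a rough read of "what's on screen now" with cheap local OCR
        rough = local_ocr(frame)  # PaddleOCR family: fast, low cost
        if current is not None and is_same_telop(current.rough, rough):
            current.frames.append(frame)  # same telop -> just add it to the group
            continue
        # ② The telop switched -> start a new group
        current = TelopGroup(rough=rough, frames=[frame])
        groups.append(current)

    # ③ Apply LLM OCR only to the unique telops (never to every frame)
    for g in groups:
        g.text = llm_ocr(representative_frame(g))  # accurate but expensive: keep calls few
    return groups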
This design cuts the number of LLM calls from the number of frames to the number of unique telops. Depending on how a program is produced, this alone can shrink the count by an order of magnitude. Aim with a cheap method and use the expensive one only on the scenes that matter: the classic cost-optimization pattern, built on a property of the domain (telops persist on screen).
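The excerpt leaves `is_same_telop` undefined. One plausible way to write it, assuming (hypothetically) that the rough local OCR result is a plain string, is a fuzzy string comparison after Unicode normalization, so that small misreads by the cheap OCR do not split one telop into many groups. The 0.8 threshold and the helper names are assumptions, not the product's values.

```python
import unicodedata
from difflib import SequenceMatcher

# Hypothetical threshold; it would need tuning on real footage.
SAME_TELOP_THRESHOLD = 0.8


def normalize(text: str) -> str:
    """Fold full-/half-width variants (NFKC) and drop all whitespace."""
    return "".join(unicodedata.normalize("NFKC", text).split())


def is_same_telop(prev_rough: str, rough: str, threshold: float = SAME_TELOP_THRESHOLD) -> bool:
    """Treat two rough OCR reads as the same telop if they are similar enough."""
    a, b = normalize(prev_rough), normalize(rough)
    if not a and not b:
        return True   # no telop before, none now: nothing switched
    if not a or not b:
        return False  # a telop appeared or disappeared
    return SequenceMatcher(None, a, b, autojunk=False).ratio() >= threshold


print(is_same_telop("本日のゲスト\u3000斎藤一郎", "本日のゲスト 斎藤一郎"))  # True: whitespace only
print(is_same_telop("斎藤一郎さん", "斉藤一郎さん"))  # True: one noisy character
print(is_same_telop("斎藤一郎さん", "明日の天気"))  # False: a different telop
```

The threshold is the main knob: too low and two different telops merge into one group (so one of them never reaches the LLM); too high and OCR noise splits one telop into several groups (extra LLM calls).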
The LLM OCR calls are also batched, with several images per request, to cut the number of requests themselves. The rule "spend money where accuracy is needed, and economize everywhere else" is expressed in the structure of the pipeline itself.
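A minimal sketch of the batching step, assuming the representative frames are already collected in a list. The batch size of 8 and the frame file names are hypothetical; a real value would depend on the model's per-request image limits. On Python 3.12+, `itertools.batched` does the same job (yielding tuples).

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")

# Hypothetical batch size; the real limit depends on the multimodal LLM in use.
IMAGES_PER_REQUEST = 8


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items (the last may be shorter)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


representative_frames = [f"frame_{i:03d}.png" for i in range(25)]  # e.g. 25 unique telops
batches = list(chunked(representative_frames, IMAGES_PER_REQUEST))
print(len(batches), [len(b) for b in batches])  # 4 [8, 8, 8, 1]
```

The trade-off is granularity: a larger batch means fewer requests, but a single failed request then has to resend more images.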
3. The speed wall: parallelize the "eye" and the "ear" with Cloud Workflows
Next is speed.
② OCR (the eye) and ③ Transcriber (the ear) do not depend on each other: OCR works from the video, the Transcriber from the audio. Run sequentially, they take OCR's 10 min + transcription's 5 min = 15 min; run in parallel, they take only as long as the slower one (10 min).
For orchestration, rather than a distributed task queue like Celery, I chose Google Cloud Workflows (managed workflows declared in YAML) plus Cloud Run Jobs. The reasons: it is cloud-native and stateless, it scales per job, and it avoids keeping a fleet of always-on workers to manage.
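# workflow.yaml (anonymized excerpt)
# run_job is a subworkflow (not shown) that starts a Cloud Run Job and waits for it.
main:
  params: [input]
  steps:
    - init:
        assign:
          - footage_id: ${input.footage_id}
          - results: {}  # shared variables must be declared before the parallel step
    - preprocess:
        call: run_job
        # Expressions are quoted: a bare ${...} inside a YAML flow mapping does not parse
        args: { job: "preprocess", footage_id: "${footage_id}" }
    # Run OCR and Transcriber simultaneously as parallel branches
    - analyze_in_parallel:
        parallel:
          shared: [results]
          branches:
            - ocr_branch:
                steps:
                  - run_ocr:
                      call: run_job
                      args: { job: "ocr", footage_id: "${footage_id}", concurrency_limit: 2 }
            - transcribe_branch:
                steps:
                  - run_transcriber:
                      call: run_job
                      args: { job: "transcriber", footage_id: "${footage_id}", concurrency_limit: 2 }
    # Cross-check only after both branches finish (the only ordering dependency)
    - cross_check:
        call: run_job
        args: { job: "typo_checker", footage_id: "${footage_id}" }
    - notify:
        call: run_job
        args: { job: "notifier", footage_id: "${footage_id}", status: "done" }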
Sequential: OCR (10 min) → Transcriber (5 min) → TypoChecker (3 min) = 18 min
Parallel: [OCR (10 min) ∥ Transcriber (5 min)] → TypoChecker (3 min) = 13 min
→ about 30% shorter (5 of 18 min saved, ≈28%)
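The arithmetic behind these figures, as a small sketch using the stage durations quoted above. The function names are illustrative only.

```python
def sequential_minutes(ocr: float, transcriber: float, typo_checker: float) -> float:
    """Every stage waits for the previous one."""
    return ocr + transcriber + typo_checker


def parallel_minutes(ocr: float, transcriber: float, typo_checker: float) -> float:
    """Fork-join: wait for the slower branch, then run the cross-check."""
    return max(ocr, transcriber) + typo_checker


seq = sequential_minutes(10, 5, 3)
par = parallel_minutes(10, 5, 3)
saving = (seq - par) / seq
print(f"{seq} min -> {par} min ({saving:.1%} shorter)")  # 18 min -> 13 min (27.8% shorter)
```

Since a + b - max(a, b) = min(a, b), the time saved always equals the faster branch (here the Transcriber's 5 minutes); the benefit is largest when the two branches take about the same time.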
Parallelization brings a welcome speed-up, but it has a trap too: it assumes the results of the parallel runs can be merged correctly in a later stage. That leads to the third wall.
4. The reliability wall: "idempotent ID assignment" that survives parallelism and retries
Parallel processing and long-running jobs come with an unavoidable reality: jobs fail midway and get retried. Cloud Run Jobs can be retried on failure, and processing segments in parallel leads to cases like "only segment 3 failed and is re-run."
If each segment numbered its telops with its own local IDs, the later merge would hit ID collisions, or a re-run would produce duplicates. So each telop gets a global ID that is deterministically unique across segments.
# src_common/lib/ocr_segment_merge.py (anonymized excerpt)
TELOP_ID_SEGMENT_STRIDE = 100_000  # ID space per segment


def global_telop_id(segment_index: int, local_id: int) -> int:
    """Deterministically compute a globally unique ID from the segment number and local ID.
    Same input, same output: a re-run converges to the same ID (idempotent)."""
    return segment_index * TELOP_ID_SEGMENT_STRIDE + local_id
That is all there is to it, but the effect is large. global_telop_id is a pure function determined only by its inputs (segment number, local ID), so:
- Even in parallel processing, each segment owns its own slice of the ID space, so there are no collisions (as long as a segment holds fewer than 100,000 telops)
- When segment 3 is re-run, the same telop gets the same global ID (given that local IDs are assigned deterministically within the segment)
- The later merge needs no "ID remapping" at all
We make "the result converges to the same state however many times a job is retried (idempotence)" a property of the numbering design rather than of complex arbitration logic. For consistency in distributed processing, a structure in which collisions cannot happen in the first place is more robust than guarding with locks or arbitration.
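A minimal sketch of the merge side, assuming (hypothetically) that each segment's output is a dict from local ID to telop text. It adds two things the excerpt does not show: a range check, because a `local_id` at or above the stride would silently spill into the next segment's ID space, and the inverse mapping via `divmod`. `split_global_telop_id` and `merge_segment` are illustrative names.

```python
TELOP_ID_SEGMENT_STRIDE = 100_000  # same stride as the excerpt


def global_telop_id(segment_index: int, local_id: int) -> int:
    """Same formula as the excerpt, plus a hypothetical range check."""
    if segment_index < 0 or not 0 <= local_id < TELOP_ID_SEGMENT_STRIDE:
        raise ValueError(f"out of range: segment={segment_index}, local_id={local_id}")
    return segment_index * TELOP_ID_SEGMENT_STRIDE + local_id


def split_global_telop_id(global_id: int) -> tuple[int, int]:
    """Inverse mapping: recover (segment_index, local_id) from a global ID."""
    return divmod(global_id, TELOP_ID_SEGMENT_STRIDE)


def merge_segment(merged: dict[int, str], segment_index: int, telops: dict[int, str]) -> None:
    """Upsert one segment's telops (local_id -> text) into the merged map by global ID."""
    for local_id, text in telops.items():
        merged[global_telop_id(segment_index, local_id)] = text


merged: dict[int, str] = {}
merge_segment(merged, 2, {0: "本日のゲスト", 1: "斎藤一郎"})
merge_segment(merged, 3, {0: "明日の天気"})
snapshot = dict(merged)
merge_segment(merged, 3, {0: "明日の天気"})  # segment 3 is retried with the same output
print(merged == snapshot, sorted(merged))  # True [200000, 200001, 300000]
```

Because the merge is an upsert keyed by a deterministic ID, replaying a segment overwrites the same keys instead of appending duplicates. One caveat of this sketch: if a retry could produce *fewer* telops than the failed attempt, that segment's old entries would need clearing first, since an upsert never deletes.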
The same philosophy runs through the progress display. Progress is written to Firestore and pushed to the UI via SSE (Server-Sent Events), but if parallel jobs write progress out of order, the progress bar could move backward. So every update passes through a function that only lets progress increase.
# src_common/lib/firestore.py (anonymized excerpt)
def resolve_monotonic_progress(prev: float, incoming: float) -> float:
    """Never let progress drop below the previous value, even if a parallel job writes a stale one."""
    return max(prev, incoming)
A single line that protects a basic user expectation, "progress never goes backward," in a parallel, distributed system.
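To show the rule end to end, here is a sketch that feeds an out-of-order stream of progress writes through the same `max` rule and formats each result as an SSE frame. The event name `progress` and the JSON payload shape are assumptions; the frame layout ("field: value" lines ended by a blank line) is the standard SSE wire format.

```python
import json
from collections.abc import Iterable, Iterator


def resolve_monotonic_progress(prev: float, incoming: float) -> float:
    """Same rule as the excerpt: never report less than the previous value."""
    return max(prev, incoming)


def sse_progress_events(updates: Iterable[float]) -> Iterator[str]:
    """Turn a possibly out-of-order stream of progress writes into SSE frames."""
    shown = 0.0
    for incoming in updates:
        shown = resolve_monotonic_progress(shown, incoming)
        # One SSE frame: "field: value" lines terminated by an empty line.
        yield f"event: progress\ndata: {json.dumps({'progress': shown})}\n\n"
```

For the writes `[0.2, 0.5, 0.4, 0.9]`, the client sees 0.2, 0.5, 0.5, 0.9: the stale 0.4 is absorbed. Note that when several jobs update the same stored value, the read-compare-write must itself be atomic (for example, inside a Firestore transaction); otherwise two writers could both read 0.5 and the smaller write could land last. The excerpt does not show how the original handles this.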
5. Upload and backend: handling large footage
Broadcast footage is large. Video uploads from the frontend use chunked parallel uploads to signed URLs (up to 8 at a time). Here too the naive implementation has a trap: managing slots with a semaphore makes it easy to forget to release a slot on error, which leads to a deadlock.
So instead of having uploads contend for semaphore slots, I split the chunks into batches of N and run Promise.all on each batch. A finished batch simply moves on to the next, so the slot-leak failure mode cannot occur.
// front/src/api/upload-service.ts (anonymized excerpt)
const MAX_CONCURRENCY = 8;

for (let i = 0; i < chunks.length; i += MAX_CONCURRENCY) {
  const batch = chunks.slice(i, i + MAX_CONCURRENCY);
  await Promise.all(batch.map(uploadChunk)); // per-batch await -> no slot can leak
}
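The same batching pattern, sketched in Python with `asyncio.gather` so the concurrency bound can be checked. `upload_in_batches` and the fake uploader are hypothetical stand-ins for the TypeScript code; the fake only counts how many uploads are in flight at once.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence

MAX_CONCURRENCY = 8


async def upload_in_batches(
    chunks: Sequence[bytes],
    upload_chunk: Callable[[bytes], Awaitable[None]],
    max_concurrency: int = MAX_CONCURRENCY,
) -> None:
    """Upload N chunks at a time; the next batch starts only when the whole batch is done."""
    for i in range(0, len(chunks), max_concurrency):
        batch = chunks[i:i + max_concurrency]
        # gather raises the first exception; there is no slot bookkeeping to leak.
        await asyncio.gather(*(upload_chunk(c) for c in batch))


async def demo() -> int:
    in_flight = peak = 0

    async def fake_upload(chunk: bytes) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for network time
        in_flight -= 1

    await upload_in_batches([b"x"] * 20, fake_upload)
    return peak


print(asyncio.run(demo()))  # 8
```

The price of this simplicity is that each batch waits for its slowest chunk, so fewer than 8 uploads may be in flight near batch boundaries. Also, like `Promise.all`, `gather` does not cancel the rest of the batch when one upload fails. A sliding pool built on `asyncio.Semaphore` with `async with` would keep the pipe fuller and still release the slot when the body raises.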
The backend is FastAPI (async). I/O to MySQL (SQLAlchemy async), Redis, and GCS is all non-blocking, and long-running job triggers are decoupled from HTTP requests via Cloud Tasks. The "footage upload complete" webhook (/internal/footages/{id}/chunk-storage-ready) runs an idempotency check before starting ML processing, so if the same completion notification arrives twice, the job is not started twice.
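The control flow of that idempotency check can be sketched as "claim first, then start." This is a hypothetical, process-local stand-in: in production the claim has to be atomic in shared storage (for example, a conditional `UPDATE ... WHERE status = ...` in MySQL whose affected-row count is checked, or Redis `SET ... NX`), because a webhook may be handled by different instances. The class and method names are illustrative.

```python
import threading
from collections.abc import Callable


class ChunkStorageReadyHandler:
    """Hypothetical in-memory stand-in for the webhook's idempotency check."""

    def __init__(self, start_ml_job: Callable[[str], None]) -> None:
        self._start_ml_job = start_ml_job
        self._started: set[str] = set()
        self._lock = threading.Lock()

    def handle(self, footage_id: str) -> bool:
        """Return True if this call started the job, False for a duplicate notification."""
        with self._lock:
            if footage_id in self._started:
                return False
            self._started.add(footage_id)  # claim atomically before doing any work
        self._start_ml_job(footage_id)     # start outside the lock
        return True


started: list[str] = []
handler = ChunkStorageReadyHandler(started.append)
print(handler.handle("footage-123"), handler.handle("footage-123"), started)
# True False ['footage-123']
```

One design question this sketch leaves open: if starting the job fails after the claim, the footage stays marked as started, so a real implementation would record a status that allows a controlled retry.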
One design choice that quietly helped in local development: with the GCS emulator, the URL used from inside the container and the URL returned to the browser are separated via environment variables (STORAGE_EMULATOR_HOST and STORAGE_EMULATOR_PUBLIC_URL). The browser cannot resolve host.docker.internal, so the public address is swapped in only when a signed URL is returned to the browser. It has no effect in production, but smoothing out such bumps in the development experience quietly keeps the team fast.
6. The foundation that supports quality: types, lint, logs
ML and data-processing code tends to end up in an "as long as it runs" state, but production operation needs a foundation. On the Python side, dependencies are pinned with uv, and Ruff (lint/format), Pyright (type checking), and pytest (a large test suite covering both the ML and API code) run against it. Logs are structured with loguru, using logger.info / logger.success / logger.error according to the stage, and progress bars use tqdm.
For observability, each job's result (clean / has detections / failed) and elapsed time are recorded so that every workflow can be traced. A long-running, parallel, distributed pipeline cannot be operated without seeing where it spent how many minutes and where it failed. Observability is built in as a component of the pipeline, not bolted on later as a feature.
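A minimal sketch of recording those three outcomes plus elapsed time. The convention that a job function returns its detection count, and the `JobRecord` fields, are assumptions for illustration; in a real deployment the records would go to structured logs or a datastore rather than an in-memory list.

```python
import time
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum


class JobResult(str, Enum):
    CLEAN = "clean"
    HAS_DETECTIONS = "has_detections"
    FAILED = "failed"


@dataclass
class JobRecord:
    workflow_id: str
    job: str
    result: JobResult
    elapsed_sec: float


def run_recorded(workflow_id: str, job: str, fn: Callable[[], int], sink: list[JobRecord]) -> int:
    """Run a job that returns its detection count; record outcome and elapsed time."""
    started = time.perf_counter()
    result = JobResult.FAILED  # stays FAILED if fn raises
    try:
        detections = fn()
        result = JobResult.HAS_DETECTIONS if detections > 0 else JobResult.CLEAN
        return detections
    finally:
        # Runs on success and on failure, so every job leaves a record.
        sink.append(JobRecord(workflow_id, job, result, time.perf_counter() - started))


records: list[JobRecord] = []
run_recorded("wf-001", "typo_checker", lambda: 3, records)
print(records[0].result.value)  # has_detections
```

Using `finally` means a crashed job still leaves a FAILED record with its elapsed time before the exception propagates, which is exactly the "where did it fall over" question.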
7. Conclusion: the three walls, and how to cross them
To recap, here are the walls we crossed to make "put in a video and get candidate telop typos out" production-quality.
| Wall | How we crossed it | The principle that worked |
|---|---|---|
| Cost | Hybrid OCR (detect telop switches locally → LLM OCR only on unique telops) | Aim with a cheap method; confine the expensive one to the scenes that matter |
| Speed | Parallelize OCR (eye) and Transcriber (ear) with Cloud Workflows | Run independent work in parallel: about 30% shorter (18 min → 13 min) |
| Reliability | Idempotent global ID numbering across segments + monotonic progress | Guard consistency with a structure in which collisions cannot happen, rather than with arbitration |
One more decision runs through all of this: "have two eyes (OCR and speech recognition)." The clue is not the confidence of a single source but the disagreement between independent sources. This way of reducing false positives structurally applies not only to telop proofreading but to any situation where AI has to judge correctness.
The hard part of putting AI into practice is not getting a demo running; it is satisfying cost, speed, and reliability at the same time and keeping the system in production. Each of those decisions is what lets an enterprise judge that it is safe to entrust the work to this team.
All code in this article is anonymized and reconstructed, but the design decisions come from the real product. For consultations on "heavy, expensive AI/ML processing at production-operations quality" or "long-running jobs and pipelines on GCP," please reach out via the services page or the contact page.