「メール送信に3秒かかってAPIレスポンスが詰まる」「決済後の集計バッチが昼間のトラフィックを巻き込んで落ちる」「画像変換を同期でやっていてリクエストがタイムアウトする」——これらはすべて、重い処理をリクエストの外に追い出すことで解ける問題です。Python でそれを担う事実上の標準が Celery、そしてそのメッセージ基盤として最も手軽に始められるのが Redis です。
ただし、Celery + Redis は「.delay() を呼べば動く」だけのフレームワークではありません。何も考えずに本番投入すると、タスクの二重実行・メッセージ消失・無限再配信・メモリ枯渇が静かに起こります。 これらはどれも、公式ドキュメントが明確に警告している既知の落とし穴です。
本記事は、Celery 5.6 系(2026年3月時点の最新安定版は 5.6.3、Python 3.9+/Python 3.14 対応)の公式ドキュメントに忠実に、本番運用に耐える設計を、実際に書けるコードと「いつ・どう使うか」の判断軸つきで解説します。私は実際の金銭を扱うサーバーレス決済基盤で冪等性・再試行・整合性の信頼性レイヤーを設計・主導してきました(関連事例)。その「分散処理で正しさを構造で守る」という規律を、Celery の文脈に翻訳したのが本記事です。
本文中のコードは Celery 5.6 系の API に準拠しています。設定キーとデフォルト値は公式ドキュメント(docs.celeryq.dev)に基づいて記載していますが、本番投入前にお使いのバージョンの該当ページで必ず確認してください。
この記事で扱うこと
- Celery + Redis を使うべき場面と、使うべきでない場面(最初に判断軸を持つ)
- 最小構成から本番設定までの設定リファレンス(公式デフォルト値つき)
- Redis を broker / backend にするときの3つの罠(
visibility_timeout・キー退避・ETA再配信) - 冪等性——「at-least-once(最低1回)」配信を前提にタスクを壊れないように設計する
- autoretry / backoff / jitter による再試行戦略(公式パラメータの正確な意味)
- Beat による定期実行、Canvas(chain/group/chord)によるワークフロー
- prefetch・並行度・time limit の本番チューニング
- 可観測性(Flower / inspect)とセキュリティ(json限定・Redis認証・秘密情報の扱い)
1. そもそも Celery を使うべきか — 判断を最初に
技術選定で最初にやるべきは「導入する理由」より「導入しない理由」を潰すことです。Celery は強力ですが、運用コスト(worker・broker・監視)を伴います。次の表で適性を判断してください。
| 状況 | 推奨 |
|---|---|
| リクエスト内で完結する軽い処理(数十ms) | Celery不要。 同期で十分。キューは複雑性の追加にすぎない |
| 重い/遅い処理をレスポンスから切り離したい(メール・PDF・画像変換・外部API) | Celery適性◎ |
| 定期バッチ(日次集計・クリーンアップ・リマインド) | Celery Beat 適性◎(または OS の cron / クラウドのスケジューラ) |
| 大量のI/O待ちを捌きたい(並行Web取得など) | Celery でも可。ただし asyncio ネイティブなら arq / Dramatiq も検討 |
| 厳密な順序保証・exactly-once が要件 | Celery + Redis は不向き。 後述の通り「最低1回」配信。Kafka やDBアウトボックスを検討 |
| AWS中心でサーバーレス志向 | SQS + Lambda の方がインフラ運用を抱えずに済む場合がある |
Redis を broker にする前に知るべきトレードオフ
Celery の broker(メッセージの通り道)には Redis のほか RabbitMQ が選べます。Redis は導入が圧倒的に簡単で、result backend も兼ねられる一方、インメモリゆえの限界があります。
| 観点 | Redis broker | RabbitMQ broker |
|---|---|---|
| 導入の手軽さ | ◎ 1コンテナで開始、backend兼用可 | △ AMQPの理解が要る |
| 配信保証 | ○ ただし永続化(AOF/RDB)と設定次第。クラッシュで未処理メッセージ消失リスク | ◎ 確認応答・永続キューが本職 |
| 大量メッセージ/高信頼 | △ メモリ上限・退避ポリシーに注意 | ◎ |
| ETA/長期スケジュール | ✕ visibility_timeout の制約が大きい(後述) | ○ |
結論: 中小規模・スタートアップ初期・「まず動かす」段階では Redis が最良の選択です。本記事も Redis を前提にします。ただし「金銭・在庫など消失が許されないメッセージを、強い配信保証で捌きたい」なら RabbitMQ を、「クラッシュ耐性を Redis で担保する」なら AOF 永続化を有効化したうえで運用してください。何を選んでも、後述の冪等性設計は必須です。Redis は「最低1回」配信であり、二重実行は設計で吸収する前提だからです。
2. 最小構成 — 公式推奨のプロジェクトレイアウト
公式が推奨するレイアウトに沿って、proj パッケージに Celery アプリを定義します。
proj/
├── __init__.py
├── celery.py # Celery アプリ本体と設定
└── tasks.py # タスク定義
# proj/celery.py
from celery import Celery
app = Celery(
"proj",
broker="redis://localhost:6379/0", # DB 0 = メッセージ用
backend="redis://localhost:6379/1", # DB 1 = 結果用(broker と分離)
include=["proj.tasks"],
)
# 設定は一箇所に集約(SSoT)。詳細は §4 の本番設定で深掘り。
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"], # json 以外を受け付けない(セキュリティ §11)
timezone="Asia/Tokyo",
enable_utc=True,
broker_connection_retry_on_startup=True, # 起動時に broker が未起動でも再接続を試みる
result_expires=3600, # 結果は1時間で失効(既定は1日)
)
# proj/tasks.py
from .celery import app
@app.task
def add(x: int, y: int) -> int:
return x + y
worker の起動と呼び出し:
# worker を起動(並行度は既定で CPU コア数)
$ celery -A proj worker --loglevel=INFO
>>> from proj.tasks import add
>>> add.delay(2, 3) # 非同期で投入(fire-and-forget の最短形)
>>> r = add.delay(2, 3)
>>> r.get(timeout=5) # 結果を取得(backend が必要)
5
ここまでは公式の "First steps" の通りです。問題はここから先——本番では .delay() の手軽さの裏に潜む配信セマンティクスと向き合う必要があります。
3. Redis を broker / backend にするときの「3つの罠」
Redis を使うなら、公式ドキュメントが警告する次の3点を設定する前提で押さえてください。知らずに本番投入すると、原因不明の二重実行や無限ループに悩まされます。
罠①:visibility_timeout — 「見えなくなる時間」を理解する
Redis broker は AMQP のような明示的な確認応答キューを持ちません。代わりに、worker がメッセージを取得すると一定時間そのメッセージを「不可視」にし、その時間内に確認応答(ack)が返らなければ、別の worker に再配信します。この猶予が visibility_timeout です。
app.conf.broker_transport_options = {"visibility_timeout": 3600} # 既定 1 時間(秒)
これが意味するのは——1つのタスクの実行(再試行待ちも含む)が visibility_timeout を超えると、Redis は「死んだ」と判断して同じタスクを別 worker に再配信するということ。つまり長時間タスクや長い countdown/eta を使うと、同じタスクが多重に走り続けることがあります。
罠②:ETA / countdown は「遠い未来」に使わない
公式は明言しています——eta と countdown で遠い未来をスケジュールするのは推奨されない。長くても数分にとどめよ。
理由は罠①と直結します。countdown=7200(2時間後)を指定すると、visibility_timeout(既定1時間)を超えた時点で Redis が再配信し、タスクが重複します。さらに、ETA タスクは worker のメモリに溜まる性質があり、大量に積むと OOM の原因になります(5.6 で worker_eta_task_limit という保護設定が追加されました)。
正しい使い分け:
- 「数十秒〜数分後」の遅延 →
countdown/etaで OK - 「数時間後・特定時刻・毎日」 → Beat(§7)か DB ベースのスケジューラを使う
どうしても長い visibility_timeout が必要なら、公式は3箇所すべてを同じ値に設定するよう指示しています(片方だけだと不整合が起きます)。
# 例:12 時間に伸ばすなら 3 つとも揃える
app.conf.broker_transport_options = {"visibility_timeout": 43200}
app.conf.result_backend_transport_options = {"visibility_timeout": 43200}
app.conf.visibility_timeout = 43200
罠③:Redis のメモリ退避ポリシー
Redis はインメモリDBなので、メモリ上限に達するとキーを退避(eviction)します。Celery のメッセージや結果キーが退避されると、InconsistencyError が出てタスクや結果が静かに消えます。
公式の指示は明確です——maxmemory-policy を noeviction か allkeys-lru に設定せよ。
# redis.conf(または CONFIG SET)
maxmemory 2gb
maxmemory-policy noeviction # メモリ満杯時は退避せず書き込みを拒否(消失より明示的エラーを選ぶ)
noeviction は「メモリが満杯なら新規書き込みを拒否してエラーにする」挙動です。一見不便ですが、「黙ってメッセージを失う」より「明示的に失敗する」方が、分散システムでは圧倒的に安全です。失敗は検知できますが、消失は検知できません。あわせて、worker のグレースフルシャットダウン時の再キューを改善する worker_soft_shutdown_timeout(秒)も検討してください。
4. 本番設定リファレンス(公式デフォルト値つき)
設定は専用モジュールに集約して SSoT 化します。各キーの公式デフォルト値を併記しておくので、「何を上書きしているか」が一目で分かります。
# proj/celeryconfig.py — 本番設定の単一の真実源
from celery.schedules import crontab
# --- シリアライズ(セキュリティの土台) ---
task_serializer = "json" # 既定: "json"(4.0 以降)。pickle は使わない
result_serializer = "json" # 既定: "json"
accept_content = ["json"] # 既定: {"json"}。受け入れる形式を json に固定
# --- broker / backend ---
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
broker_connection_retry_on_startup = True # 既定: 有効。起動時の接続リトライ
broker_pool_limit = 10 # 既定: 10。接続プールの上限
broker_transport_options = {"visibility_timeout": 3600}
# --- 結果の寿命(コスト効率) ---
result_expires = 3600 # 既定: 1 日。結果の墓標(tombstone)が消えるまでの秒数
task_ignore_result = False # 既定: 無効。結果が不要なタスクは個別に ignore_result=True
# --- 信頼性(冪等性とセットで効く。§5・§8 参照) ---
task_acks_late = True # 既定: 無効。実行「後」に ack(少なくとも1回保証へ)
task_reject_on_worker_lost = True # 既定: 無効。worker 異常終了時にメッセージを再キュー
# --- タイムアウト(暴走の封じ込め。§8 参照) ---
task_time_limit = 300 # 既定: 無制限。ハード上限(超えると worker ごと kill)
task_soft_time_limit = 270 # 既定: 無制限。ソフト上限(例外で猶予を与える)
# --- worker チューニング(§8 参照) ---
worker_prefetch_multiplier = 4 # 既定: 4。先読みするタスク数 × 並行度
worker_max_tasks_per_child = 1000 # 既定: 無制限。N 件処理ごとに子プロセスを再生成(メモリリーク対策)
worker_send_task_events = True # 既定: 無効。Flower 等での監視を有効化(§10)
task_track_started = True # 既定: 無効。"STARTED" 状態を報告(進捗可視化)
# --- ルーティング(§8 参照) ---
task_default_queue = "celery" # 既定: "celery"
task_routes = {
"proj.tasks.charge_*": {"queue": "payments"},
"proj.tasks.send_*": {"queue": "notifications"},
}
# --- 定期実行(§7 参照) ---
timezone = "Asia/Tokyo"
enable_utc = True
beat_schedule = {
"nightly-cleanup": {
"task": "proj.tasks.cleanup_expired",
"schedule": crontab(hour=3, minute=0), # 毎日 3:00
},
}
# proj/celery.py で読み込む
app = Celery("proj")
app.config_from_object("proj.celeryconfig")
app.autodiscover_tasks()
設計上のポイント: 設定を散らさず1モジュールに集約することで、レビューと差分管理が容易になり、「どこで何を上書きしたか」が追えます。これは DRY であると同時に、設定そのものをコードレビューの対象にするという運用です。
5. 冪等性 — 「最低1回」配信を前提に壊れないタスクを書く
ここが Celery 設計の核心です。公式ドキュメントは繰り返し警告しています:
理想的にはタスク関数は冪等であるべき——同じ引数で複数回呼ばれても、意図しない副作用を起こさないこと。Celery は既定で「at-least-once(最低1回)」配信であり、ack されるまでメッセージはキューに残るため、重複実行が起こりうる。
つまり「タスクは1回だけ実行される」という前提でコードを書いてはいけません。task_acks_late = True にすると配信保証は強くなりますが、その代償として**「worker が実行中にクラッシュすると、同じタスクが再実行される」**ことを公式は明記しています。信頼性(消えない)と冪等性(重複しても壊れない)はセットなのです。
アンチパターン:冪等性のないタスク
@app.task
def charge_customer(order_id: str, amount: int) -> None:
# 再配信されると二重課金になる。決済では致命的。
payment_gateway.charge(order_id, amount)
db.mark_paid(order_id)
このタスクが task_acks_late=True 下で実行中にクラッシュ → 再配信 → 二重課金。これは私が決済基盤で最も厳しく潰してきた事故そのものです。
解法A:冪等性キーで「1回に収束」させる
クライアント(または呼び出し元)が発行する一意キーで、処理済みを記録します。Redis の SET NX(存在しなければセット)を使うと原子的にロックできます。
import redis
r = redis.Redis.from_url("redis://localhost:6379/2")
def _claim_once(key: str, ttl: int = 86400) -> bool:
"""このキーで初めての実行なら True。原子的(SET NX)。"""
return bool(r.set(f"idem:{key}", "1", nx=True, ex=ttl))
@app.task(bind=True, acks_late=True, reject_on_worker_lost=True)
def charge_customer(self, order_id: str, amount: int, idempotency_key: str) -> None:
if not _claim_once(idempotency_key):
# 既に処理済み。再配信なので何もせず正常終了(= ack して再配信を止める)
return
payment_gateway.charge(order_id, amount)
db.mark_paid(order_id)
解法A の落とし穴:「先にマーク」は処理を失う
解法A には順序の罠があります。_claim_once を先に実行してその後にクラッシュすると、キーだけが残り、課金は実行されていないのに「処理済み」と誤判定され、決済が静かに失われます。これは決済 Webhook の実装で私が実際に踏み抜いた論点です。
正しい考え方は2つ:
-
下流の操作自体を冪等にする — 決済ゲートウェイ(Stripe 等)に冪等キーを渡す。ゲートウェイ側が重複を吸収するので、Celery 側が二重に呼んでも課金は1回。これが最も堅牢です。
@app.task(bind=True, acks_late=True, reject_on_worker_lost=True) def charge_customer(self, order_id: str, amount: int, idempotency_key: str) -> None: # ゲートウェイに冪等キーを委譲。再実行されても課金は 1 回に収束する。 payment_gateway.charge(order_id, amount, idempotency_key=idempotency_key) db.mark_paid(order_id) # UPSERT / 条件付き更新で冪等に -
DBの一意制約に守らせる —
idempotency_keyに UNIQUE 制約を張り、INSERT ... ON CONFLICT DO NOTHING(PostgreSQL)で「2回目の挿入は何もしない」をDB側で原子的に保証する。アプリのロックより堅牢で、単一障害点になりません。
原則:正しさを「運用の注意深さ」ではなく「構造(DB制約・ゲートウェイの冪等キー・原子的操作)」で守る。 レビューや手順書で守る正しさはいつか破れますが、一意制約や条件付き書き込みで守る正しさは破れません。
6. リトライ戦略 — autoretry / backoff / jitter
外部API・ネットワーク・一時的なロックは「たまに失敗するが、少し待てば成功する」失敗です。これを吸収するのがリトライ。Celery は宣言的に書けます。公式パラメータの正確な意味とデフォルト値を押さえましょう。
import requests
@app.task(
bind=True,
autoretry_for=(requests.RequestException,), # この例外群なら自動再試行
retry_backoff=True, # 指数バックオフ(True なら 1,2,4,8... 秒)
retry_backoff_max=600, # 既定 600。バックオフの上限(10 分)
retry_jitter=True, # 既定 True。遅延に乱数を混ぜ、同時再試行の雪崩を防ぐ
max_retries=5, # 既定 3。None なら無限再試行
retry_kwargs={"countdown": 5},
)
def fetch_exchange_rate(self, base: str) -> dict:
resp = requests.get(f"https://api.example.com/rate/{base}", timeout=10)
resp.raise_for_status()
return resp.json()
公式デフォルトの正確な値:
| パラメータ | 既定値 | 意味 |
|---|---|---|
max_retries | 3 | 諦めるまでの最大再試行回数。None で無限 |
default_retry_delay | 180(3分) | 再試行までの既定待機秒数 |
retry_backoff | False | True で指数バックオフ |
retry_backoff_max | 600(10分) | バックオフ遅延の上限 |
retry_jitter | True | バックオフに乱数を加える |
jitter が「雪崩」を防ぐ理由
外部APIが一時的に落ちると、同時刻に失敗した大量のタスクが、同じ間隔で一斉に再試行します。これが「サンダリングハード(thundering herd)」——復旧しかけたAPIに再びDDoS級の負荷をかけて再び落とす悪循環です。retry_jitter=True は再試行タイミングに乱数を混ぜて山をならし、これを防ぎます。既定で有効になっているのは、Celery がこの問題を熟知しているからです。
手動リトライ:「再試行すべき失敗」だけを再試行する
すべての例外を再試行するのは誤りです。「残高不足」や「バリデーションエラー」は何度再試行しても結果が変わらない——むしろ再試行はレイテンシとコストの無駄です。bind=True と self.retry() で、再試行すべき失敗だけを選別します。
@app.task(bind=True, max_retries=5)
def sync_inventory(self, sku: str) -> None:
try:
result = external_api.sync(sku)
except TransientError as exc:
# 一時的失敗 → 指数バックオフで再試行
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
except PermanentError:
# 恒久的失敗 → 再試行せず即失敗(ログ・DLQ 行き)
logger.error("permanent failure for sku=%s", sku)
raise
db.save(sku, result)
self.request.retries で現在の再試行回数(0始まり)が取れます。「再試行で直る失敗」と「直らない失敗」を混ぜないことが、レイテンシ・コスト・可観測性のすべてを守ります。
7. 定期実行 — Celery Beat
「毎日3時にクリーンアップ」「5分ごとに集計」のような定期タスクは Celery Beat が担います。Beat はスケジュールに従ってタスクをキューに投入する常駐プロセスで、worker とは別に起動します。
from celery.schedules import crontab
app.conf.beat_schedule = {
# timedelta 形式:30 秒ごと(前回実行から 30 秒後)
"ping-every-30s": {
"task": "proj.tasks.healthcheck",
"schedule": 30.0,
},
# crontab 形式:毎週月曜 7:30
"weekly-report": {
"task": "proj.tasks.send_report",
"schedule": crontab(hour=7, minute=30, day_of_week=1),
"args": (16, 16),
},
# 毎日 3:00 にクリーンアップ
"nightly-cleanup": {
"task": "proj.tasks.cleanup_expired",
"schedule": crontab(hour=3, minute=0),
},
}
app.conf.timezone = "Asia/Tokyo"
$ celery -A proj beat --loglevel=INFO # Beat を起動(worker とは別プロセス)
Beat の最重要警告:「スケジューラは1つだけ」
公式が太字で警告している通り:
同一スケジュールに対して、スケジューラは常に1つだけ稼働させること。さもないとタスクが重複する。
ここを誤ると本番事故になります。Kubernetes で Beat を含む Pod を2レプリカにしたり、worker に -B(embedded beat)を付けたまま複数 worker を立てると、同じ定期タスクが台数分だけ重複投入されます。請求バッチが2回走る、通知が二重に飛ぶ——典型的な障害です。
対策:
- Beat は必ず単一インスタンスで動かす(Deployment は
replicas: 1、または Leader election)。 -B(embedded beat)は開発専用。公式も本番非推奨。worker のスケールと Beat の単一性は両立しないからです。- 動的にスケジュールを足したい・管理画面から編集したいなら、Django 環境では
django-celery-beatのDatabaseSchedulerを使う。スケジュールをDBに置き、複数 worker でも単一の真実源を共有できます。
$ celery -A proj beat -l INFO \
--scheduler django_celery_beat.schedulers:DatabaseScheduler
8. ワークフロー — Canvas(chain / group / chord)
「複数タスクを順番に」「並列に」「並列実行の後にまとめる」といった合成は Canvas が担います。土台はシグネチャ——タスク呼び出しを「データ」として持ち回れるオブジェクトです。
from celery import chain, group, chord
# シグネチャ:引数を部分適用した「呼び出しの設計図」
add.s(2, 2) # add(2, 2) のシグネチャ
add.si(2, 2) # immutable(不変)。前段の結果を受け取らない(コールバック向け)
# chain:直列。前段の結果が次段の第1引数に渡る → (2+2)+4)+8 = 16
chain(add.s(2, 2) | add.s(4) | add.s(8))().get() # 16
# group:並列。全タスクを同時に投入
group(add.s(i, i) for i in range(10))()
# chord:group の全完了後にコールバックを1回実行(並列 → 集約)
chord(add.s(i, i) for i in range(100))(tsum.s()).get()
実用例:並列取得 → 集約
「複数ソースからデータを並列取得し、全部揃ったら集計する」は chord の典型です。
from celery import chord
@app.task
def fetch_source(source_id: str) -> dict:
return external_api.fetch(source_id)
@app.task
def aggregate(results: list[dict]) -> dict:
return {"total": sum(r["value"] for r in results)}
# 5 ソースを並列取得し、全完了後に aggregate へまとめて渡す
workflow = chord(
(fetch_source.s(sid) for sid in ["a", "b", "c", "d", "e"]),
aggregate.s(),
)
result = workflow()
Canvas の落とし穴
-
chord のタスクは結果を捨ててはいけない。 公式の警告通り、chord の構成タスク(ヘッダ/ボディ)のどれかが
ignore_result=Trueだと、コールバックが「全完了」を判定できず壊れます。chord を使うならresult_backendを有効にし、関連タスクは結果を保存すること。 -
エラーハンドリングは
link_error/on_error。 chain の途中で失敗したとき、後続をどうするかを明示します。add.apply_async((2, 2), link=mul.s(16), link_error=log_error.s()) add.s(2, 2).on_error(log_error.s()).delay() -
immutable シグネチャ
.si()を使い分ける。 コールバックに前段の結果を渡したくない(固定引数で呼びたい)ときは.si()。.s()は前段の結果を先頭に prepend するため、非可換なタスクでは意図しない引数順になります。
9. 呼び出しオプション — delay() と apply_async()
delay() は apply_async() の手軽なショートカットです。実行オプションを付けたいときは apply_async() を使います。
# 等価
add.delay(2, 2)
add.apply_async(args=(2, 2))
# 実行オプションは apply_async でのみ指定可能
add.apply_async(
args=(2, 2),
countdown=10, # 10 秒後に実行(§3 の通り「数分」までに)
expires=60, # 60 秒以内に実行されなければ破棄
queue="payments", # ルーティング先キュー
priority=5, # 優先度(broker 依存)
retry=True,
retry_policy={"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.2},
)
送信時の接続エラーを握る
broker がダウンしていると、タスク送信そのものが失敗します。これを無視すると「投入したつもりが消えていた」事故になります。公式は OperationalError の捕捉を示しています。
try:
charge_customer.delay(order_id, amount, idem_key)
except charge_customer.OperationalError as exc:
# broker 接続失敗。アプリ側でフォールバック(DB アウトボックスに退避 等)
logger.exception("failed to enqueue task: %r", exc)
outbox.save_for_later(order_id, amount, idem_key)
expires も重要です。例えば「在庫通知」は5分後には無意味なので expires=300 を付ければ、worker が詰まって遅延しても古い無意味なタスクを自動破棄でき、回復時の無駄な処理とコストを防げます。
10. 本番チューニング — prefetch・並行度・time limit
prefetch(先読み)— ワークロードで値を変える
worker は効率のため、実行スロットの数より多くのタスクを**先読み(reserve)**します。その量が worker_prefetch_multiplier(既定 4)× 並行度です。これはワークロードによって最適値が真逆になります。
| ワークロード | 推奨 | 理由 |
|---|---|---|
| 長時間タスク(分単位) | worker_prefetch_multiplier = 1 | 先読みすると、1つの worker がタスクを抱え込み、他の worker が遊ぶ。公式も「長いタスクは 1」と明言 |
| 短時間・大量タスク(ミリ秒〜秒) | 50〜128 等の大きめの値 | 先読みでスループットを稼ぐ。公式は「64 や 128 が妥当な場面もある」 |
| 長短混在 | worker を分ける | 公式推奨:長いタスク用と短いタスク用の worker ノードを別設定で立て、ルーティングで振り分ける |
# 決済(長め・厳格):先読み 1、遅延 ack、低並行
$ celery -A proj worker -Q payments --prefetch-multiplier=1 --concurrency=4
# 通知(短く大量):先読み多め、高並行
$ celery -A proj worker -Q notifications --prefetch-multiplier=64 --concurrency=8
task_acks_late=True と worker_prefetch_multiplier=1 を組み合わせると、先読みによる抱え込みを減らしつつ「実行後 ack」で信頼性を上げられます(冪等性が前提)。Redis 限定で、スロットが空いたときだけ取得する worker_disable_prefetch も選択肢です。
メモリ対策 — 子プロセスの定期再生成
長時間動かすと、依存ライブラリのメモリリークで worker が肥大化します。対策は子プロセスの定期再生成です。
worker_max_tasks_per_child = 1000 # 1000 タスクごとに子を作り直す
worker_max_memory_per_child = 300000 # 300MB 超で子を作り直す(KB 単位)
ただし公式は過度な設定を戒めています。 例えば起動に1秒かかる子を「1タスクごとに再生成」すると、毎分60タスクしか捌けません。再生成コストとメモリ安定性のトレードオフを測って決めること。
time limit — 暴走を封じ込める
無限ループやハングしたタスクが worker を占有し続けるのを防ぎます。
task_soft_time_limit = 270 # ソフト:270 秒で SoftTimeLimitExceeded を送出(後始末の猶予)
task_time_limit = 300 # ハード:300 秒で worker ごと強制終了
ソフト上限は例外として捕捉でき、トランザクションのロールバックや一時ファイルの削除など後始末ができます。ハード上限はその猶予を与えず即 kill します。両方を設定し、ソフトをハードより小さくするのが定石です。
from celery.exceptions import SoftTimeLimitExceeded
@app.task(soft_time_limit=270, time_limit=300)
def render_pdf(doc_id: str) -> None:
try:
do_render(doc_id)
except SoftTimeLimitExceeded:
cleanup_temp_files(doc_id) # 後始末してから諦める
raise
11. 可観測性 — 見えないものは運用できない
分散タスクは「どこで詰まっているか」が見えにくい。Celery は標準で監視手段を備えています。
Flower — リアルタイムWeb監視
$ pip install flower
$ celery -A proj flower --port=5555 # http://localhost:5555
タスクの進捗・履歴・統計・worker状態をブラウザで確認でき、リモート制御やHTTP APIも提供します。Flower を使うには worker_send_task_events = True を有効にしておきます(§4 で設定済み)。
inspect / status — CLI でクラスタを覗く
$ celery -A proj status # 稼働中ノード一覧
$ celery -A proj inspect active # 実行中タスク
$ celery -A proj inspect scheduled # ETA 待ちタスク
$ celery -A proj inspect reserved # 先読み済み・実行待ちタスク
$ celery -A proj inspect stats # worker 統計
$ celery -A proj inspect ping # 死活確認
運用の勘所: inspect reserved の件数が膨らんでいたら、prefetch が過剰でタスクが worker に抱え込まれているサインです(§8)。scheduled が積み上がっていたら、eta/countdown の使いすぎ(§3)を疑います。メトリクスを勘の代わりにする——これは可観測性の本質です。
本番では Flower 単体に頼らず、task_track_started=True で進捗を可視化し、Prometheus エクスポータや構造化ログと組み合わせて、レイテンシ・失敗率・キュー滞留を継続監視するのが理想です。
12. セキュリティ — タスクキューは攻撃面になる
タスクキューは「信頼境界をまたぐデータの通り道」です。設計を誤ると深刻な脆弱性になります。
① シリアライザは json に固定。pickle は使わない
歴史的に Celery の最大の脆弱性は pickle デシリアライズによる任意コード実行(RCE) でした。pickle は任意の Python オブジェクトを復元するため、broker に侵入できる攻撃者が悪意あるペイロードを送れば、worker 上でコードが実行されます。Celery 4.0 以降の既定は json で、accept_content=["json"] を明示的に固定して pickle を拒否してください(§4 で設定済み)。
② Redis を外部に晒さない/認証とTLSを付ける
Redis は既定で認証なし。インターネットに露出した無認証 Redis は、過去に大規模な侵害の温床になりました。ネットワークで隔離(VPC / セキュリティグループ)したうえで、認証とTLSを付けます。
# パスワード認証 + TLS(rediss://)
broker_url = "rediss://:STRONG_PASSWORD@redis.internal:6380/0"
broker_use_ssl = {"ssl_cert_reqs": "required"}
パスワードや接続文字列はコードに直書きせず、環境変数やシークレットマネージャから読み込むこと(CLAUDE.md の鉄則でもあります)。
③ タスク引数に秘密情報を載せない
これは見落とされがちな最重要点です。タスクの引数は broker(Redis)と result backend にシリアライズされて保存され、Flower の画面にも表示されます。 平文のパスワード・APIキー・カード番号・個人情報を引数に渡すと、それらが Redis に残り、監視画面に映り、結果として意図せず漏洩します。
# アンチパターン:秘密情報が Redis と Flower に残る
send_email.delay(to="user@example.com", api_key="sk_live_xxx", ...)
# 正しい:ID だけを渡し、秘密情報は worker 側で安全に取得する
send_email.delay(notification_id="ntf_123") # worker が ID から DB/Secrets を引く
原則:キューに流すのは「参照(ID)」であって「中身(秘密)」ではない。 あわせて result_expires で結果の寿命を絞り(§4)、PII を含む結果が長く残らないようにします。
13. よくある落とし穴(FAQ)
Q. タスクが二度実行されます。バグですか?
A. 仕様です。Celery は「最低1回」配信であり、再配信・worker クラッシュ・acks_late で重複が起こりえます。タスクを冪等に設計するのが正解です(§5)。
Q. eta/countdown で指定したタスクが何度も走ります。
A. visibility_timeout(既定1時間)を超える遅延が原因です(§3)。数分を超える予約は Beat か DB スケジューラを使ってください。
Q. 定期タスクが2回ずつ実行されます。 A. Beat が複数稼働しています(§7)。Beat は単一インスタンスに限定してください。
Q. メモリ使用量が時間とともに増え続けます。
A. worker_max_tasks_per_child / worker_max_memory_per_child で子プロセスを定期再生成します(§8)。
Q. 長いタスクを投げると他のタスクが遅延します。
A. prefetch による抱え込みです。長時間タスクは worker_prefetch_multiplier=1、可能なら worker を分けてルーティングします(§8)。
Q. 結果が取れない/消えています。
A. result backend 未設定、ignore_result=True、result_expires での失効、または Redis のキー退避(§3)を確認してください。
Q. broker が一時的に落ちると送信が失敗します。
A. OperationalError を捕捉し、DBアウトボックス等にフォールバックします(§9)。送信は「失敗しうる操作」として扱ってください。
まとめ — Celery + Redis を貫く設計原則
Celery + Redis は「.delay() で動く」フレームワークではなく、分散システムの配信セマンティクスと正面から向き合う設計を要求します。本記事を貫いた原則を整理します。
| 原則 | 本ガイドでの現れ方 |
|---|---|
| 正しさは構造で守る | 冪等性は DB の一意制約・ゲートウェイの冪等キー・原子的操作に委譲(§5) |
| 配信は「最低1回」と心得る | 二重実行を異常ではなく正常系として吸収する設計(§5) |
| 失敗を分類する | 再試行すべき失敗(一時的)と、すべきでない失敗(恒久的)を峻別(§6) |
| 雪崩を防ぐ | jitter 付き指数バックオフでサンダリングハードを回避(§6) |
| 単一の真実源(SSoT/DRY) | 設定は1モジュール、定期実行は単一 Beat、スケジュールはDBに集約(§4・§7) |
| 暴走を封じ込める | time limit・prefetch・子プロセス再生成で worker を守る(§8) |
| 可観測性 | Flower・inspect・イベントで「詰まり」を勘ではなく数字で捉える(§11) |
| キューは攻撃面 | json 限定・Redis 認証/TLS・引数に秘密を載せない(§12) |
| 使わない判断も技術選定 | 軽い処理・厳密順序・遠い未来予約には別の道具を選ぶ(§1) |
非同期処理の品質は、派手な機能ではなく**「何も起きなかったこと」——二重課金が起きない、メッセージが消えない、夜間バッチが静かに終わる**——で測られます。それを偶然ではなく設計で保証することが、本番運用に耐えるタスクキューの条件です。私は実際の金銭を扱う決済基盤で、この「分散下で正しさを構造に落とし込む」規律を本番二重課金0件という形で実証してきました(決済信頼性の事例)。
Celery + Redis を使った非同期基盤の設計・既存システムの信頼性改善・二重実行や定期実行まわりの障害解消にお困りであれば、設計段階からお力になれます。「まず動かす」から「本番に耐える」へ——その距離を、最短で正確に詰めるお手伝いをします。