導入:リクエストの中で全部やろうとすると本番は壊れる
Flask のビュー関数は、HTTP リクエストを受けてレスポンスを返すまでの 限られた時間予算の中で動きます。ところが実際のアプリには、その予算に収まらない仕事が山ほどあります。請求書 PDF の生成、確認メールの送信、CSV エクスポート、外部 API への Webhook 送出、画像のサムネイル生成、夜間の集計バッチ——これらをビューの中で同期実行すると、ユーザーは数秒〜数十秒スピナーを見せられ、Gunicorn のワーカーは 1 本占有され、タイムアウトやリトライ嵐で本番が静かに詰まります。
解決策は単純で、しかし設計を間違えると牙を剥きます。「重い仕事をリクエストの外(別プロセス)に逃がす」——これがバックグラウンドタスク(ジョブキュー)です。Python/Flask での事実上の標準が Celery であり、そのメッセージ配送を担うのが Redis(または RabbitMQ)です。
本記事は、Flask 本番運用ガイド(ピラー) のスポークとして、Flask 3.1 系+ Celery 5.x + Redis でバックグラウンドタスクを本番品質に組み上げる設計を、Flask 公式の Celery 統合パターンと Celery 公式ドキュメントに忠実な実コードで通します。中心テーマは 3 つ——(1) Flask コンテキスト統合(タスク内で db や current_app を使える状態を作る)、(2) 冪等性(タスクは二重実行されうるという前提)、(3) 回復性(リトライ・クラッシュ安全・グレースフルシャットダウン)。この 3 つを外すと、ジョブキューは「動いているように見えて、たまにデータを壊すブラックボックス」になります。
筆者は、経済産業大臣賞を受賞した B2B SaaS のバックエンドを Python / Flask / SQLAlchemy / PostgreSQL で設計・実装し、その Celery ワーカーを ECS(Fargate) 上で本番運用してきました。請求・通知・エクスポートといった「失敗してはいけないが、リクエストの中ではやれない」処理を、二重課金や通知の取りこぼしを起こさずに捌くために必要だったのは、まさにこの冪等性と回復性の規律でした。本記事はその実戦の設計判断を、公式仕様に照らして言語化したものです。
💡 この記事で扱うバージョン:Celery 5.x(執筆時点の最新は 5.6.3) と Flask 3.1 系を前提とします。Celery は別途
pip install celeryでインストールします(Flask の依存ではありません)。ブローカーは Redis を主役にしますが、データ損失への耐性が要る箇所では RabbitMQ も検討対象として正直に扱います。
1. なぜバックグラウンドタスクか:構成要素と「別プロセス」という大原則
1.1 リクエストの外に逃がすべき仕事
まず判断基準を明確にします。ビューの中で同期実行してよいのは、ユーザーの待ち時間に収まり、かつ失敗してもその場でユーザーに返せる仕事だけです。次のような処理は、原則としてバックグラウンドに逃がします。
| 種類 | 例 | 逃がす理由 |
|---|---|---|
| 通知 | 確認メール・Slack 通知・プッシュ通知 | 外部 SMTP/API が遅い・落ちる。レスポンスを待たせない |
| 生成 | 請求書 PDF・帳票・サムネイル・動画変換 | CPU/IO が重く、秒〜分かかる |
| エクスポート | 大量データの CSV/Excel 生成・ZIP 化 | 行数に比例して伸び、リクエスト予算を超える |
| 連携 | Webhook 送出・サードパーティ同期 | 相手が遅延・失敗する。リトライが要る |
| 定期実行 | 夜間集計・リマインダー・期限切れ処理 | そもそもリクエストが存在しない(cron 的) |
1.2 4 つの構成要素
Celery によるジョブキューは、次の 4 つの登場人物で成り立ちます。
| 役割 | 実体 | 担当 |
|---|---|---|
| Web プロセス | Gunicorn + Flask app | タスクを「投入」する(.delay())。結果は待たない |
| ブローカー(必須) | Redis または RabbitMQ | タスクメッセージを貯めて配送するキュー |
| ワーカープロセス | celery -A make_celery worker | キューからタスクを取り出して「実行」する |
| 結果バックエンド(任意) | Redis / RPC / DB など | タスクの結果・状態を保存する(要れば) |
Celery 公式は明確に「Celery には メッセージブローカーが必要で、RabbitMQ か Redis を使える。結果バックエンドは任意」と述べています。Redis については「feature-complete だが、RabbitMQ よりデータ損失に弱い」という注意書きがあり、これは後述する回復性設計で必ず効いてきます。
⚠️ 最重要の大原則:ワーカーは Gunicorn とは別のプロセス/別のコンテナ。これは初学者が最もつまずく点です。
celery workerは Web サーバー(Gunicorn)とは独立したプロセスで、別個に起動・スケール・デプロイします。Web をいくらスケールしてもタスクは捌けませんし、逆もまた然り。コンテナ構成では「web コンテナ」「worker コンテナ」「redis コンテナ」を分けます(§7 で Docker Compose を示します)。この分離は 本番デプロイガイド で扱った「WSGI アプリと WSGI サーバーの分離」と同じ思想——役割ごとにプロセスを分ける——の延長線上にあります。
2. Flask 公式の Celery 統合:celery_init_app を深掘りする
ここが本記事の技術的な核です。Flask 公式(flask.palletsprojects.com の patterns/celery/)が示す統合は、「タスクを実行するときに Flask のアプリケーションコンテキストを自動で張る」ことを目的にしています。なぜそれが要るのか——タスクの中でも db.session や current_app.config、g を使いたいからです。Flask のこれらはアプリケーションコンテキストの上でしか動きません(この仕組みの詳細は コンテキスト徹底解説 を参照)。リクエストの外で動くワーカーには、デフォルトではそのコンテキストが存在しません。
2.1 FlaskTask:app_context を張る Task サブクラス
公式の統合関数がこれです。一行も足さず、verbatim の構造で示します。
# myapp/celery_app.py
from celery import Celery, Task
from flask import Flask
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
公式の説明(verbatim 訳)はこうです——「この Task サブクラスは、タスク関数を Flask のアプリケーションコンテキストが有効な状態で自動的に実行する。これにより、データベース接続のようなサービスがタスク内で利用可能になる」。一行ずつの意味を押さえます。
class FlaskTask(Task)… Celery のTaskを継承し、__call__を上書きする。タスクが呼ばれるたびにwith app.app_context():で囲んでから本体self.run(...)を呼ぶ。これでタスク内が「アプリケーションコンテキストあり」になり、db.session/current_app/gが使える。Celery(app.name, task_cls=FlaskTask)… このFlaskTaskを全タスクの既定クラスに据える。config_from_object(app.config["CELERY"])… Celery の設定を Flask config のCELERYキーから読む。設定が Flask 側に一本化され、二重管理が消える。celery_app.set_default()… この Celery アプリを「カレントアプリ」に設定する。これが後述の@shared_taskがリクエスト中にこのインスタンスを見つける鍵になる。app.extensions["celery"] = celery_app… 拡張レジストリに格納する。ワーカーのエントリポイント(§4)がここから Celery アプリを取り出す。
2.2 アプリケーションファクトリへの結線
celery_init_app は、大規模構成ガイド で確立した create_app ファクトリの中で呼びます。公式のファクトリ結線が次です。
# myapp/__init__.py
from flask import Flask
from myapp.celery_app import celery_init_app
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return app
ポイントは 2 つ。(1) Celery 設定は CELERY という 1 つのキーの中に辞書としてまとめる。broker_url がブローカー(Redis)、result_backend が結果保存先、task_ignore_result=True は「結果が要らないタスクは結果を保存しない」既定(§3.4 でトレードオフを論じます)。(2) app.config.from_prefixed_env() で FLASK_CELERY__BROKER_URL のような環境変数からも上書きできる——つまり接続文字列をコードに焼き込まず、本番では環境変数で注入できます。秘密情報の扱いはピラーと 本番デプロイガイド の方針(FLASK_ 前綴り環境変数)に揃います。
💡 なぜ
with app.app_context()が「タスクのたびに」必要か。リクエストハンドラと違い、ワーカーは長時間生きるプロセスで、1 本のワーカーが何千ものタスクを順に処理します。コンテキストはタスク単位で開いて閉じるべきで、FlaskTask.__call__のwithブロックがちょうどそのスコープを作っています。これによりdb.sessionがタスクごとに正しく確立・破棄され、セッションのリークや別タスクへの状態漏れを防げます。
3. タスクを定義し、呼び出し、結果を取る
3.1 @shared_task を使う(@celery_app.task ではない)
ファクトリパターンでは、タスクの定義に @shared_task を使うことが公式に強く推奨されています。理由(verbatim 訳)はこうです——「@celery_app.task は celery_app オブジェクトへのアクセスを必要とするが、ファクトリパターンではそれが利用できない。また、それで装飾したタスクは特定の Flask/Celery インスタンスに縛られてしまう。代わりに Celery の @shared_task を使う。これは『カレントアプリが何であれ、それにアクセスする』タスクオブジェクトを作る」。
# myapp/tasks.py
from celery import shared_task
@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
return a + b
@shared_task は特定の celery_app を import しないため、循環 import を生まず、Blueprint やモジュール分割と綺麗に共存します。celery_init_app が呼んだ set_default() のおかげで、add_together.delay(...) は実行時に「カレントの Celery アプリ」を見つけてそこへタスクを投入します。
3.2 タスクを投入する:.delay()
ビューからの投入は .delay() です。結果は待たず、task_id を即座に返すのが定石です。
# myapp/views.py
from flask import request
from myapp.tasks import add_together
@app.post("/add")
def start_add():
a = request.json["a"]
b = request.json["b"]
result = add_together.delay(a, b)
return {"result_id": result.id}
.delay(a, b) はタスクメッセージをブローカー(Redis)に積み、すぐ返ります。ユーザーには result_id を返し、後で結果を問い合わせてもらう——この「投入と取得の分離」が非同期の本質です。
3.3 結果を取る:AsyncResult
結果や状態は result_id(タスク ID)から AsyncResult で引きます。公式のポーリング用エンドポイントが次です。
# myapp/views.py
from celery.result import AsyncResult
@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
result = AsyncResult(id)
return {
"ready": result.ready(),
"successful": result.successful(),
"value": result.result if result.ready() else None,
}
フロントは result_id を持って /result/<id> をポーリングし、ready が true になったら value を読みます。apply_async で投入時刻や遅延を制御する方法は次節です。
3.4 .apply_async():遅延・予約・キュールーティング
.delay() は .apply_async() の薄いラッパーです。細かな制御が要るときは apply_async を使います。
# 30秒後に実行(リトライ間隔の調整やデバウンスに)
add_together.apply_async(args=[2, 3], countdown=30)
# 特定時刻に実行(予約配信・スケジュール送信)
from datetime import datetime, timedelta, timezone
add_together.apply_async(args=[2, 3], eta=datetime.now(timezone.utc) + timedelta(hours=1))
# キューを分けてルーティング(重いタスクと軽い通知を別ワーカーで捌く)
generate_invoice_pdf.apply_async(args=[invoice_id], queue="pdf")
countdown/eta で遅延・予約、queue でキュールーティングができます。キューを分けると「PDF 生成のような重い処理が、メール送信のような軽い処理を詰まらせる」事故を防げます(ワーカーを -Q pdf で起動して専用に張る)。
3.5 task_ignore_result のトレードオフ
task_ignore_result=True(ファクトリの既定)は、結果バックエンドへの書き込みをスキップします。メール送信のような「投げっぱなしでよい」タスクには最適——Redis に無駄な結果を溜めず、コストも下がります。一方、/result/<id> で結果を返したいタスクは @shared_task(ignore_result=False) で個別に有効化します。
| 設定 | 結果の保存 | 向くタスク |
|---|---|---|
task_ignore_result=True(既定) | しない | 通知・Webhook・fire-and-forget |
@shared_task(ignore_result=False) | する | エクスポート・計算結果を返す・進捗を見せる |
⚠️ 結果を保存する場合、結果バックエンドは無限に膨らみます。Redis をバックエンドにすると古い結果がメモリを食い続けるので、
result_expires(既定 1 日)で TTL を必ず設定してください(§5.4)。
4. ワーカーのエントリポイントと起動
ワーカーは Flask アプリとは別に、make_celery.py という薄いエントリポイントから Celery アプリを取り出して起動します。公式の verbatim 構造です。
# make_celery.py
from myapp import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
create_app() を呼んで Flask アプリを組み立て、app.extensions["celery"] に格納しておいた Celery アプリを celery_app として公開します。celery -A make_celery の -A(app)がこの celery_app を見つけます。
# ワーカーを起動(INFO ログ)
celery -A make_celery worker --loglevel INFO
# 定期実行スケジューラ(Beat)を起動(§6)
celery -A make_celery beat --loglevel INFO
worker がタスクを実行するプロセス、beat が cron 的にタスクを投入するスケジューラです。両者は別プロセスで、本番ではコンテナも分けます(§6・§7)。
5. 本番品質の回復性:ここが負債と資産の分水嶺
ここからが、デモと本番を分ける部分です。ジョブキューは「動かす」だけなら 30 分で書けますが、本番で壊れないように動かすには、冪等性・リトライ・クラッシュ安全・引数設計を正しく組む必要があります。
5.1 冪等性:タスクは二重実行されうるという前提
最重要の前提です。Celery(および一般的なメッセージキュー)の配送保証は at-least-once(最低 1 回) です。つまり、同じタスクが 2 回以上実行されることが正常系として起こりえます。ワーカーがタスク完了直後・ack 送信前にクラッシュすれば、ブローカーは「未完了」とみなして再配送するからです。
したがって、タスクは冪等(idempotent:何回実行しても結果が同じ)に設計しなければなりません。これは SQS/Lambda の冪等処理ガイド で扱った原則とまったく同じで、キューの種類が変わっても本質は不変です。
# myapp/tasks.py
from celery import shared_task
from myapp import db
from myapp.models import EmailLog, User
@shared_task(ignore_result=True)
def send_welcome_email(user_id: int, idempotency_key: str) -> None:
# 冪等キーで「すでに送ったか」を原子的にチェック&記録する
existing = db.session.execute(
db.select(EmailLog).filter_by(idempotency_key=idempotency_key)
).scalar_one_or_none()
if existing is not None:
return # 二重実行:何もしない(冪等)
user = db.session.get(User, user_id)
if user is None:
return # ユーザーが消えている:静かに終える
send_email(to=user.email, template="welcome")
db.session.add(EmailLog(idempotency_key=idempotency_key, user_id=user_id))
db.session.commit() # 送信の事実を記録。次の実行はここで弾かれる
冪等化の定番手は 3 つです。
| 手法 | やり方 | 向く場面 |
|---|---|---|
| 冪等キー+台帳 | 一意キーで「実行済み」を記録し、重複は早期 return | メール・通知・課金など「2 回やるとまずい」副作用 |
| upsert | INSERT ... ON CONFLICT DO UPDATE で挿入も更新も同結果に | レコードの作成・更新 |
| 重複排除(dedup) | 投入時にタスク ID をキーにロック/既存ジョブを抑止 | 同一トリガが短時間に連発するケース |
💡 二重課金 0 件は冪等性で守る。筆者の B2B SaaS では、課金・通知という「2 回やったら事故」の処理を Celery で捌いていました。冪等キーを一意制約付きの台帳テーブルに記録し、
commitが走った時点で再実行を構造的に弾く——この設計があったから、ワーカーが再配送を受けても本番で二重課金は発生しませんでした。「リトライしても安全」は祈りではなく、テーブルの一意制約で保証するものです。
5.2 リトライ:autoretry_for + バックオフ+ジッタ
外部依存(SMTP・サードパーティ API)は一時的に失敗します。失敗したら指数バックオフ+ジッタで自動リトライするのが定石です。これは リトライ・バックオフ・サーキットブレーカ で論じた回復性パターンを、Celery のデコレータで宣言的に書いたものです。
# myapp/tasks.py
from celery import shared_task
from requests.exceptions import RequestException
@shared_task(
autoretry_for=(RequestException,), # この例外なら自動リトライ
retry_backoff=True, # 指数バックオフ(2,4,8,16…秒)
retry_backoff_max=600, # 上限600秒
retry_jitter=True, # ばらつきを足してthundering herdを防ぐ
max_retries=5, # 5回で打ち切り
)
def deliver_webhook(endpoint_id: int, payload_id: int) -> None:
endpoint = db.session.get(WebhookEndpoint, endpoint_id)
payload = db.session.get(WebhookPayload, payload_id)
resp = requests.post(endpoint.url, json=payload.body, timeout=10)
resp.raise_for_status() # 5xx等で RequestException → autoretry
| パラメータ | 役割 | 推奨 |
|---|---|---|
autoretry_for | リトライ対象の例外型 | 一時障害のみ。ValueError 等の恒久エラーは入れない |
retry_backoff | 指数バックオフ | True。相手を連打しない |
retry_jitter | ばらつき注入 | True。同時失敗の同時再送(herd)を散らす |
max_retries | 最大試行回数 | 有限に。無限リトライは死を呼ぶ |
⚠️ 恒久エラーをリトライしてはいけない。バリデーション失敗(不正な入力)や 404 のような「何回やっても失敗する」エラーをリトライ対象に入れると、ワーカーを無駄に焼き続けます。
autoretry_forはネットワーク・タイムアウト・5xx のような一時障害だけに絞り、恒久エラーは即座に失敗(または後述の dead-letter)に倒します。
5.3 クラッシュ安全:acks_late + task_reject_on_worker_lost
デフォルトの Celery は、タスクを受け取った瞬間に ack(確認応答)します。これは「ワーカーが処理中にクラッシュすると、そのタスクは失われる」ことを意味します。失ってはいけないタスクには acks_late を使い、タスクが完了してから ack させます。
# myapp/celeryconfig.py(CELERY 辞書に展開してもよい)
task_acks_late = True # 完了後にackする(クラッシュで失わない)
task_reject_on_worker_lost = True # ワーカー消失時はタスクを再キューする
worker_prefetch_multiplier = 1 # 1本ずつ取り、長時間タスクの抱え込みを防ぐ
acks_late=True と task_reject_on_worker_lost=True の組み合わせで、ワーカーが OOM や SIGKILL で消えてもタスクは再配送されます。ただしこれは**「タスクが二重実行される」可能性を上げる設計**でもある——だからこそ §5.1 の冪等性が前提条件になります。冪等性なしに acks_late を入れてはいけません(二重実行が事故に直結する)。
Redis をブローカーにする場合、再配送のタイミングは visibility timeout で決まります。これは「ワーカーが取り出したタスクを、この秒数 ack しなければ別ワーカーに再配送する」しきい値です。
# Redisブローカー固有:visibility timeout(既定3600秒)
broker_transport_options = {"visibility_timeout": 3600}
⚠️ visibility timeout はタスクの最長実行時間より長くする。たとえば動画変換に 1 時間かかるのに visibility timeout が 1800 秒だと、まだ実行中のタスクが「失われた」と誤判定されて二重に走ります。最長タスクの所要時間+余裕を見て設定してください。これは RabbitMQ には無い Redis 特有の落とし穴です。
5.4 引数は ID を渡す(オブジェクトを渡さない)
公式が太字で警告する設計原則です——「ユーザーオブジェクトではなく、ユーザーの id を渡せ(Pass the user's id rather than the user object)」。タスク引数はシリアライズ可能でなければならず、SQLAlchemy のモデルオブジェクトを .delay(user) のように渡してはいけません。
# ❌ 悪い:モデルオブジェクトを渡す(シリアライズできない/古い状態が固定される)
send_welcome_email.delay(user)
# ✅ 良い:IDを渡し、タスク内のapp_contextで「いま」の状態を再クエリする
send_welcome_email.delay(user.id, idempotency_key=str(uuid4()))
理由は 2 つ。(1) シリアライズできない——タスクは JSON 等でエンコードされてブローカーを通るので、ORM オブジェクトは載らない。(2) 状態が古くなる——投入時点のオブジェクトをそのまま運んでも、ワーカーが実行する頃には DB の値が変わっているかもしれない。だからID だけ渡し、タスク内で db.session.get(User, user_id) して「いま」の状態を取り直す。FlaskTask が app_context を張ってくれているおかげで、この再クエリがタスク内で自然に書けます(§2.1 の効能がここで効きます)。
💡 DB セッションはタスク単位で。
FlaskTask.__call__がタスクごとに app_context を開閉するため、Flask-SQLAlchemy のdb.sessionもタスク単位でスコープされます。タスクの最後にdb.session.commit()(または例外時にrollback())を明示し、セッションを跨いだ状態漏れを防いでください。
5.5 失敗の行き先:dead-letter と Redis のデータ損失
max_retries を使い切ったタスクは「失敗」で終わります。本番では失敗を握り潰さず、行き先を作る——Celery では on_failure フックや、失敗時に別の「dead-letter」テーブル/キューへ退避させる設計が定石です。
@shared_task(bind=True, max_retries=5, autoretry_for=(RequestException,))
def deliver_webhook(self, endpoint_id: int, payload_id: int) -> None:
try:
... # 本体
except RequestException:
if self.request.retries >= self.max_retries:
record_dead_letter(endpoint_id, payload_id) # 退避して可視化
raise # autoretry に委ねる
そして冒頭で触れた正直な注意——Redis はブローカーとして「データ損失に弱い」。Redis が再起動・フェイルオーバーすると、永続化設定によってはキュー上の未処理タスクが消える可能性があります。
⚠️ 「絶対に失ってはいけないタスク」は RabbitMQ を検討する。請求・法的通知のように 1 通の取りこぼしも許されない処理では、配送保証の堅い RabbitMQ をブローカーに選ぶのが誠実です。Redis を使うなら AOF 永続化を有効化し、
acks_late+冪等性で「消えても再投入で回復できる」状態を作る——どちらにせよ、「Redis だから時々消える」を前提に設計してください。これは Celery を美化せず、自分の本番で実際に意識したトレードオフです。
6. 定期実行:Celery Beat
cron 的な定期ジョブ(夜間集計・リマインダー・期限切れ処理)は Celery Beat で投入します。Beat は「スケジューラ」で、決められた時刻にタスクメッセージをキューへ積むだけ——実行するのはワーカーです。
# CELERY 辞書に beat_schedule を足す
from celery.schedules import crontab
CELERY = dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
beat_schedule={
"expire-trials-nightly": {
"task": "myapp.tasks.expire_trials",
"schedule": crontab(hour=3, minute=0), # 毎日03:00
},
"send-reminders-hourly": {
"task": "myapp.tasks.send_due_reminders",
"schedule": crontab(minute=0), # 毎時0分
},
},
)
起動は別プロセスです。
celery -A make_celery beat --loglevel INFO
⚠️ Beat は 1 プロセスだけ動かす。Beat を冗長化のつもりで 2 つ起動すると、同じスケジュールが二重に投入され、ジョブが 2 回走ります。Beat は単一スケジューラで動かし、コンテナ基盤では「beat はレプリカ数 1」に固定してください。そして §5.1 の通り、Beat が投入するタスク自身も冪等に作る——「二重投入されても、二重実行されても安全」を二重の防壁にします。
7. デプロイと可観測性:別コンテナ・監視・相関ログ
7.1 ワーカーは別コンテナ:Docker Compose
§1.2 の大原則を構成図に落とすと、こうなります。web・worker・beat・redis を分け、同じイメージを別コマンドで起動するのが定石です(同一コードベースなのでイメージは共有)。
# compose.yaml
services:
redis:
image: redis:7-alpine
restart: unless-stopped
web:
build: .
command: gunicorn -w 4 -b 0.0.0.0:8000 "make_celery:flask_app"
environment:
FLASK_CELERY__BROKER_URL: redis://redis:6379/0
FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
depends_on: [redis]
ports: ["8000:8000"]
worker:
build: .
command: celery -A make_celery worker --loglevel INFO --concurrency 4 --max-tasks-per-child 100
environment:
FLASK_CELERY__BROKER_URL: redis://redis:6379/0
FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
depends_on: [redis]
beat:
build: .
command: celery -A make_celery beat --loglevel INFO
environment:
FLASK_CELERY__BROKER_URL: redis://redis:6379/0
depends_on: [redis]
web は Gunicorn(本番デプロイガイド の設計をそのまま流用)、worker と beat は Celery、redis はブローカー。ECS(Fargate) ではこれが「web サービス」「worker サービス」「beat サービス(レプリカ 1)」の 3 サービスに対応します。
7.2 ワーカーのチューニング
| フラグ | 役割 | 指針 |
|---|---|---|
--concurrency N | ワーカープロセス内の並列数 | CPU バウンドなら ≒ vCPU 数、IO バウンドなら多め |
worker_prefetch_multiplier | 1 度に取り込むタスク数 | 長時間タスクは 1(抱え込みと不公平を防ぐ) |
--max-tasks-per-child N | この回数で子プロセスを再起動 | メモリリーク対策。PDF/画像処理など重い依存で有効 |
--max-tasks-per-child は、サードパーティライブラリのメモリリークでワーカーが太り続ける問題への現実的な防御です。N タスクごとに子プロセスを作り直し、メモリを定期的に解放します。
7.3 監視:Flower と相関ログ
可観測性は本番運用の生命線です。Celery には Flower という Web ダッシュボードがあり、タスクの成功/失敗、実行時間、キュー長、ワーカーの稼働をリアルタイムに見られます。
pip install flower
celery -A make_celery flower --port=5555
加えて、エラー処理・可観測性ガイド で確立した構造化ログ+リクエスト ID 相関を、タスク側にも延長します。タスクログには task_id を必ず載せる——そうすれば「ユーザーのリクエスト → 投入されたタスク → ワーカーでの実行」を一本の線で追えます。
import logging
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(bind=True)
def generate_invoice_pdf(self, invoice_id: int) -> None:
# self.request.id が task_id。構造化ログに必ず含める
logger.info("invoice pdf start", extra={"task_id": self.request.id, "invoice_id": invoice_id})
...
logger.info("invoice pdf done", extra={"task_id": self.request.id, "invoice_id": invoice_id})
7.4 グレースフルシャットダウン(warm shutdown)
デプロイのたびにワーカーを落とすとき、処理中のタスクを途中で殺してはいけません。Celery のワーカーに SIGTERM を送ると warm shutdown——新規タスクの受け取りを止め、実行中のタスクを最後まで終えてからプロセスを終了します。
ECS のローリングデプロイでは、コンテナ停止時に SIGTERM が飛び、stopTimeout(既定 30 秒、必要なら延長)の猶予内でワーカーが warm shutdown を完了します。これは 本番デプロイガイド で扱った Gunicorn のグレースフルシャットダウンと同じ思想——「処理中のものを捌き切ってから落とす」——をワーカー側に適用したものです。長時間タスクがある場合は stopTimeout をその最長所要時間より長く設定してください。
8. 誠実な代替:Celery が過剰なとき
Celery は強力ですが、すべての非同期処理に Celery が要るわけではありません。技術的負債を避ける最良の判断は、しばしば「使わない」ことです。
| 選択肢 | 向く場面 | トレードオフ |
|---|---|---|
| Celery + Redis/RabbitMQ | リトライ・スケジュール・キュールーティング・大量タスクが要る本格運用 | 構成要素が多い(ブローカー+ワーカー+beat+監視) |
| RQ(Redis Queue) | シンプルなバックグラウンド処理。Redis だけで完結したい | Celery より機能が薄い(高度なルーティング等は弱い) |
| マネージドキュー(SQS + Lambda) | サーバーレスで運用負荷を下げたい。AWS 前提 | ベンダーロックイン。Flask プロセス外で実行 |
- 要件が「メールを 1 通、非同期で送りたい」程度なら、Celery はオーバーキルです。
RQのような軽量ライブラリで十分なことが多く、ブローカー+ワーカー+ beat + Flower という運用一式を背負う必要はありません。 - AWS 前提でサーバーレスに寄せたいなら、SQS + Lambda という選択も誠実です。ワーカーコンテナを自前運用せず、冪等処理の設計は SQS/Lambda/EventBridge の冪等非同期処理ガイド に委ねられます。冪等性という核は Celery でも SQS でも変わりません。
- Celery の設計をもっと深掘りしたい場合は、Flask に閉じない一般的な Celery/Redis 本番パターンを Celery × Redis 本番ジョブキューガイド にまとめています。
判断基準はシンプルです——「リトライ/スケジュール/キュー分離/監視のどれかが本当に要るか」。要るなら Celery、要らないなら RQ かマネージドキュー。要件が育ってから Celery に移行しても遅くありません(YAGNI)。
まとめ:Celery 本番チェックリスト
Flask × Celery × Redis を本番品質で動かす要点は、コンテキスト統合・冪等性・回復性の 3 点に集約されます。Flask 公式の celery_init_app が前者を、冪等キーと acks_late/リトライ設計が後の 2 つを支えます。最後に、本番投入前の確認リストで締めます。
| # | チェック項目 | 根拠(本記事) |
|---|---|---|
| 1 | ワーカーを Gunicorn とは別プロセス/別コンテナで起動しているか | §1.2 / §7.1 |
| 2 | celery_init_app の FlaskTask で app_context を張り、タスク内で db/current_app が使えるか | §2.1 |
| 3 | 設定を Flask config の CELERY キーに集約し、接続文字列を環境変数で注入しているか | §2.2 |
| 4 | タスク定義に @celery_app.task ではなく @shared_task を使っているか | §3.1 |
| 5 | 投入は .delay()/.apply_async()、取得は AsyncResult、結果不要なら task_ignore_result | §3.2–3.5 |
| 6 | at-least-once を前提に、全タスクを冪等に設計したか(冪等キー/upsert/dedup) | §5.1 |
| 7 | 一時障害に autoretry_for+retry_backoff+retry_jitter+max_retries、恒久エラーは即失敗 | §5.2 |
| 8 | 失ってはいけないタスクに acks_late+task_reject_on_worker_lost(冪等性が前提) | §5.3 |
| 9 | Redis ブローカーの visibility timeout を最長タスク時間より長く設定したか | §5.3 |
| 10 | タスク引数は ID(int)を渡し、タスク内で再クエリしているか(モデルオブジェクト禁止) | §5.4 |
| 11 | 結果を保存するなら result_expires で TTL、失敗は dead-letter に退避しているか | §3.5 / §5.5 |
| 12 | Redis のデータ損失を理解し、致命タスクは RabbitMQ/AOF 永続化+冪等で回復可能にしたか | §5.5 |
| 13 | Beat はレプリカ 1、Beat が投入するタスクも冪等か | §6 |
| 14 | --max-tasks-per-child でリーク対策、prefetch=1 で長時間タスクの抱え込み防止 | §7.2 |
| 15 | Flower 監視+構造化ログに task_id 相関を載せているか | §7.3 |
| 16 | デプロイ時に SIGTERM で warm shutdown、stopTimeout が最長タスク時間より長いか | §7.4 |
| 17 | そもそも Celery が過剰でないか(RQ/SQS+Lambda で足りないか)を検討したか | §8 |
ジョブキューの本当の難しさは、Celery の使い方ではなく、「タスクは二重に走り、ワーカーは落ち、Redis は時々データを失う」という現実を前提に、それでもデータが壊れない設計を組むことにあります。筆者の B2B SaaS で二重課金 0 件を守れたのは、Celery が優秀だったからではなく、冪等キーという一意制約に「再実行しても安全」を保証させたからでした。Flask のコンテキスト統合という土台の上に、冪等性と回復性を載せる——この順番で組めば、ジョブキューは本番の資産になります。
Flask 全体の本番設計は Flask 本番運用ガイド を、コンテキストの仕組みは コンテキスト徹底解説 を、デプロイは 本番デプロイガイド を、それぞれ起点にしてください。