Introduction: Trying to Do Everything Inside the Request Breaks Production
A Flask view function runs within a limited time budget: the interval between receiving an HTTP request and returning a response. Yet a real app has plenty of work that won't fit within that budget: generating invoice PDFs, sending confirmation emails, exporting CSVs, dispatching webhooks to external APIs, generating image thumbnails, running nightly aggregation batches. Run these synchronously inside a view and the user stares at a spinner for seconds to tens of seconds, each such request ties up a Gunicorn worker for its whole duration, and production silently clogs with timeouts and retry storms.
The solution is simple, yet it bites if you get the design wrong: offload heavy work out of the request and into a separate process. This is a background task (job queue). The de facto standard in Python/Flask is Celery, with Redis (or RabbitMQ) carrying its messages.
This article is one spoke of the Flask production-operations guide (the pillar). It walks through how to assemble production-quality background tasks with Flask 3.1.x + Celery 5.x + Redis, using real code faithful to Flask's official Celery-integration pattern and to the official Celery documentation. There are 3 central themes: (1) Flask context integration (making db and current_app usable inside a task), (2) idempotency (the premise that a task can run twice), and (3) resilience (retries, crash safety, graceful shutdown). Miss these 3 and the job queue becomes "a black box that looks like it's working but occasionally corrupts data."
The author designed and implemented the backend of a B2B SaaS that won the Minister of Economy, Trade and Industry Award, built with Python / Flask / SQLAlchemy / PostgreSQL, and ran its Celery workers in production on ECS (Fargate). Handling work like billing, notifications, and exports (work that must not fail but can't be done inside the request) without double charges or dropped notifications required precisely this discipline of idempotency and resilience. This article puts the design judgments from that field experience into words and checks them against the official specifications.
💡 Versions covered in this article: Celery 5.x (the latest at the time of writing is 5.6.3) and Flask 3.1.x. Celery is installed separately via pip install celery (it is not a dependency of Flask). Redis is the primary broker here, but where resistance to data loss is required, RabbitMQ is treated as a serious candidate too.
1. Why Background Tasks: The Components and the Great Principle of a "Separate Process"
1.1 Work That Should Be Offloaded Outside the Request
First, let me make the criterion clear. The only work you may run synchronously inside a view is work that fits within the user's wait time and whose failure can be reported to the user on the spot. Work like the following is, as a rule, offloaded to the background.
| Kind | Examples | Why offload |
|---|---|---|
| Notification | Confirmation email, Slack notification, push notification | External SMTP/API is slow / goes down. Don't make the response wait |
| Generation | Invoice PDF, reports, thumbnails, video transcoding | CPU/IO heavy, takes seconds to minutes |
| Export | CSV/Excel generation of large data, ZIP packaging | Grows in proportion to row count, exceeds the request budget |
| Integration | Webhook dispatch, third-party sync | The other side is delayed / fails. Retries are needed |
| Periodic | Nightly aggregation, reminders, expiry processing | No request exists in the first place (cron-like) |
1.2 The 4 Components
A job queue with Celery is made up of the following 4 actors.
| Role | Concrete form | Responsibility |
|---|---|---|
| Web process | Gunicorn + Flask app | "Enqueues" tasks (.delay()). Does not wait for the result |
| Broker (required) | Redis or RabbitMQ | The queue that stores and delivers task messages |
| Worker process | celery -A make_celery worker | Pulls tasks from the queue and "executes" them |
| Result backend (optional) | Redis / RPC / DB, etc. | Stores task results and states (if needed) |
The Celery docs state clearly that "Celery requires a message broker, and you can use RabbitMQ or Redis. The result backend is optional." On Redis, there is a note that it is "feature-complete but more susceptible to data loss than RabbitMQ," and this will definitely matter in the resilience design discussed later.
⚠️ The most important principle: the worker is a process / container separate from Gunicorn. This is where beginners stumble most. celery worker is a process independent of the web server (Gunicorn), and you start, scale, and deploy it separately. Scaling the web tier does nothing to get tasks processed faster, and scaling the workers does nothing for request throughput. In a container setup you separate the "web container," "worker container," and "redis container" (§7 shows a Docker Compose file). This follows the same line of thought as the "separation of the WSGI app and the WSGI server" covered in the production deployment guide: split processes by role.
2. Flask's Official Celery Integration: A Deep Dive into celery_init_app
This is the technical core of the article. The integration shown in the official Flask documentation (patterns/celery/ on flask.palletsprojects.com) aims to automatically push a Flask application context whenever a task runs. Why is that needed? Because inside a task you want to use db.session, current_app.config, and g. These Flask features work only while an application context is active (for details of this mechanism, see the context deep-dive). A worker running outside a request has no such context by default.
2.1 FlaskTask: A Task Subclass That Pushes the app_context
This is the official integration function, reproduced with its structure unchanged and without a single added line.
# myapp/celery_app.py
from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app
The official explanation (translated verbatim) is: "This Task subclass automatically runs the task function with a Flask application context active. This makes services like database connections available inside the task." Let me pin down the meaning line by line.
- class FlaskTask(Task) … inherits Celery's Task and overrides __call__. Every time a task is called, it wraps the call in with app.app_context(): before invoking the body, self.run(...). This gives the task an active application context, so db.session / current_app / g are usable.
- Celery(app.name, task_cls=FlaskTask) … sets this FlaskTask as the default class for all tasks.
- config_from_object(app.config["CELERY"]) … reads Celery's config from the CELERY key of the Flask config. The config is unified on the Flask side, and double management disappears.
- celery_app.set_default() … sets this Celery app as the "current app." This is how the @shared_task described later finds this instance during a request.
- app.extensions["celery"] = celery_app … stores it in the extension registry. The worker's entry point (§4) retrieves the Celery app from here.
2.2 Wiring Into the Application Factory
celery_init_app is called inside the create_app factory established in the large-app structure guide. The official factory wiring follows.
# myapp/__init__.py
from flask import Flask

from myapp.celery_app import celery_init_app


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app
There are 2 points. (1) Celery config is bundled as a dict under the single key CELERY. broker_url is the broker (Redis), result_backend is where results are stored, and task_ignore_result=True makes "don't store results" the default for tasks that don't need them (§3.5 discusses the trade-off). (2) With app.config.from_prefixed_env(), you can also override these values from environment variables such as FLASK_CELERY__BROKER_URL, so in production you inject connection strings via the environment rather than baking them into code. One caution: outside Windows, from_prefixed_env keeps the case of the nested key as written, while Celery's current setting names are lowercase, so confirm that the override actually takes effect in your environment (a lowercase suffix such as FLASK_CELERY__broker_url may be needed). The handling of secrets aligns with the policy of the pillar guide and the production deployment guide (FLASK_-prefixed environment variables).
💡 Why with app.app_context() is needed "for every task." Unlike a request handler, a worker is a long-lived process, and one worker processes thousands of tasks in sequence. The context should be opened and closed per task, and the with block in FlaskTask.__call__ creates exactly that scope. This way db.session is correctly set up and torn down per task, preventing session leaks and state bleeding into other tasks.
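The per-task scoping described above can be illustrated without Flask or Celery at all. The following is a minimal stdlib sketch, not Flask's implementation: fake_app_context, ScopedTask, and the events list are hypothetical names standing in for app.app_context(), FlaskTask, and real resources such as a database session.

```python
from contextlib import contextmanager

events: list[str] = []  # records the order in which things happen


@contextmanager
def fake_app_context():
    """Hypothetical stand-in for app.app_context(): set up, yield, tear down."""
    events.append("context opened")
    try:
        yield
    finally:
        # Runs even if the task raises, so nothing leaks into the next task.
        events.append("context closed")


class ScopedTask:
    """Hypothetical stand-in for FlaskTask: wraps every call in a fresh context."""

    def __init__(self, func):
        self.run = func

    def __call__(self, *args, **kwargs):
        with fake_app_context():
            return self.run(*args, **kwargs)


def add(a: int, b: int) -> int:
    events.append("task body ran")
    return a + b


task = ScopedTask(add)
results = [task(1, 2), task(3, 4)]  # one long-lived "worker" running two tasks
```

Each call opens and closes its own scope, which is the property that keeps one task's session state from reaching the next.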
3. Defining, Invoking, and Retrieving Results of Tasks
3.1 Use @shared_task (Not @celery_app.task)
In the factory pattern, the official docs strongly recommend defining tasks with @shared_task. The reason (translated verbatim) is: "@celery_app.task requires access to the celery_app object, which is not available in the factory pattern. Also, tasks decorated with it become bound to a specific Flask/Celery instance. Instead, use Celery's @shared_task. This creates task objects that 'access whatever the current app is.'"
# myapp/tasks.py
from celery import shared_task


@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b
Because @shared_task doesn't import a specific celery_app, it creates no circular imports and coexists cleanly with Blueprints and module splitting. Thanks to the set_default() that celery_init_app called, add_together.delay(...) finds the "current Celery app" at execution time and enqueues the task there.
3.2 Enqueue a Task: .delay()
Enqueuing from a view is .delay(). The standard is to not wait for the result and return the task_id immediately.
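# myapp/views.py
from flask import request

from myapp.tasks import add_together

# app is the Flask instance; with the factory pattern, register this view on a Blueprint instead


@app.post("/add")
def start_add():
    a = request.json["a"]
    b = request.json["b"]
    result = add_together.delay(a, b)  # enqueue and return immediately
    return {"result_id": result.id}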
.delay(a, b) loads the task message onto the broker (Redis) and returns immediately. You return a result_id to the user and have them query for the result later — this "separation of enqueue and retrieval" is the essence of async.
3.3 Retrieve the Result: AsyncResult
The result or state is fetched from the result_id (task ID) via AsyncResult. The official polling endpoint follows.
# myapp/views.py
from celery.result import AsyncResult


@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }
The front end holds the result_id and polls /result/<id>, reading value once ready becomes true. How to control enqueue time and delay with apply_async is the next section.
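The polling side of this exchange can be sketched as follows. It is shown in Python for consistency with the rest of the article, although a real front end would usually be JavaScript. poll_until_ready and its parameters are hypothetical names; the payload shape (ready / successful / value) is the one returned by the endpoint above, and the HTTP call is replaced by an injected fetch function so the sketch runs on its own.

```python
import time
from typing import Any, Callable


def poll_until_ready(
    fetch: Callable[[], dict[str, Any]],
    interval: float = 1.0,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fetch() until its payload reports ready, then return the value."""
    for _ in range(max_attempts):
        payload = fetch()
        if payload["ready"]:
            if not payload["successful"]:
                raise RuntimeError("task finished but did not succeed")
            return payload["value"]
        sleep(interval)
    # Bounded polling: never wait forever on a task that may have been lost.
    raise TimeoutError(f"result not ready after {max_attempts} attempts")


# Simulated server responses: not ready twice, then done.
_responses = iter([
    {"ready": False, "successful": False, "value": None},
    {"ready": False, "successful": False, "value": None},
    {"ready": True, "successful": True, "value": 5},
])
value = poll_until_ready(lambda: next(_responses), sleep=lambda _s: None)
```

The attempt limit matters as much as the loop: a client that polls without a bound turns one lost task into a permanent spinner.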
3.4 .apply_async(): Delay, Scheduling, Queue Routing
.delay() is a thin wrapper over .apply_async(). When you need finer control, use apply_async.
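from datetime import datetime, timedelta, timezone

# Run 30 seconds from now (useful for spacing out retries or debouncing)
add_together.apply_async(args=[2, 3], countdown=30)

# Run at a specific time (scheduled delivery)
add_together.apply_async(args=[2, 3], eta=datetime.now(timezone.utc) + timedelta(hours=1))

# Route to a dedicated queue (heavy tasks and light notifications go to different workers)
# generate_invoice_pdf and invoice_id are assumed to be defined elsewhere in the app
generate_invoice_pdf.apply_async(args=[invoice_id], queue="pdf")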
With countdown/eta you get delay/scheduling, and with queue you get queue routing. Splitting queues prevents the accident where "heavy processing like PDF generation clogs light processing like email sending" (start a worker with -Q pdf to dedicate it).
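Routing can also be declared once in configuration instead of at every call site. The fragment below is a sketch that uses Celery's task_routes setting inside the CELERY dict; the task path and the queue name pdf are assumptions carried over from the example above.

```python
# Fragment for the CELERY dict in create_app().
# The task path and the "pdf" queue name are assumptions for illustration.
CELERY = dict(
    broker_url="redis://localhost",
    task_routes={
        # Every call to this task goes to the "pdf" queue, even via plain .delay()
        "myapp.tasks.generate_invoice_pdf": {"queue": "pdf"},
    },
)
```

A worker started with -Q pdf consumes only that queue, so at least one other worker still has to consume the default queue (named celery unless configured otherwise).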
3.5 The Trade-Off of task_ignore_result
task_ignore_result=True (the factory default) skips writes to the result backend. It is ideal for "fire-and-forget"–type tasks like email sending — no wasted results piling up in Redis, lower cost. On the other hand, a task that wants to return a result via /result/<id> is individually enabled with @shared_task(ignore_result=False).
| Setting | Stores result | Suited task |
|---|---|---|
| task_ignore_result=True (default) | No | Notification, webhook, fire-and-forget |
| @shared_task(ignore_result=False) | Yes | Export, returning a computed result, showing progress |
⚠️ When you do store results, the result backend grows without bound unless results expire. With Redis as the backend, old results keep eating memory, so always keep a TTL in place with result_expires (default 1 day).
4. The Worker's Entry Point and Startup
The worker, separate from the Flask app, retrieves the Celery app from a thin entry point called make_celery.py and starts. This is the official verbatim structure.
# make_celery.py
from myapp import create_app
flask_app = create_app()
celery_app = flask_app.extensions["celery"]
It calls create_app() to assemble the Flask app and exposes the Celery app — stored in app.extensions["celery"] — as celery_app. The -A (app) of celery -A make_celery finds this celery_app.
# Start a worker (INFO-level logs)
celery -A make_celery worker --loglevel INFO
# Start the periodic scheduler (Beat) (§6)
celery -A make_celery beat --loglevel INFO
worker is the process that executes tasks, and beat is the scheduler that enqueues tasks cron-style. The two are separate processes, and in production you separate their containers too (§6, §7).
5. Production-Quality Resilience: This Is the Watershed Between Debt and Asset
From here is the part that divides a demo from production. A job queue can be written in 30 minutes if "just running" is enough, but to run it without breaking in production you must correctly assemble idempotency, retries, crash safety, and argument design.
5.1 Idempotency: The Premise That a Task Can Run Twice
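This is the most important premise. With Celery (and message queues in general), you should design for at-least-once delivery. That is, the same task running 2 or more times is a normal case, not an anomaly. For example, with late acknowledgement (§5.3), if a worker crashes right after completing a task but before sending the ack, the broker treats the task as incomplete and redelivers it; with Redis, a task that outlives the visibility timeout is redelivered as well.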
Therefore, a task must be designed to be idempotent (the same result no matter how many times it runs). This is exactly the same principle covered in the SQS/Lambda idempotent-processing guide; the essence is invariant even when the queue type changes.
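# myapp/tasks.py
from celery import shared_task

from myapp import db
from myapp.models import EmailLog, User
# send_email is the application's own mail helper (definition not shown)


@shared_task(ignore_result=True)
def send_welcome_email(user_id: int, idempotency_key: str) -> None:
    # Check the ledger first: has this key already been processed?
    # (The check alone is not atomic; the unique constraint on
    # idempotency_key is what rejects a concurrent duplicate at commit.)
    existing = db.session.execute(
        db.select(EmailLog).filter_by(idempotency_key=idempotency_key)
    ).scalar_one_or_none()
    if existing is not None:
        return  # duplicate execution: do nothing (idempotent)

    user = db.session.get(User, user_id)
    if user is None:
        return  # the user has been deleted: finish quietly

    send_email(to=user.email, template="welcome")
    db.session.add(EmailLog(idempotency_key=idempotency_key, user_id=user_id))
    db.session.commit()  # record that it was sent; the next run stops at the check above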
There are 3 standard ways to make a task idempotent.
| Technique | How | Suited situation |
|---|---|---|
| Idempotency key + ledger | Record "executed" with a unique key, return early on duplicates | Side effects that are bad to do twice — email, notifications, billing |
| upsert | INSERT ... ON CONFLICT DO UPDATE so insert and update give the same result | Record creation / update |
| De-duplication (dedup) | Lock on the task ID at enqueue / suppress an existing job | Cases where the same trigger fires repeatedly in a short window |
💡 Zero double charges is guaranteed by idempotency. In my B2B SaaS, Celery handled the processing where doing it twice is an incident: billing and notifications. The idempotency key is recorded in a ledger table with a unique constraint, so a re-execution is structurally rejected the moment commit runs. This design is why no double charge occurred in production, even when a worker received a redelivery. "Safe to retry" is not a prayer; it is guaranteed by the table's unique constraint.
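The "idempotency key + ledger" technique can be demonstrated end to end with nothing but the standard library. This is a minimal sketch using sqlite3 rather than the SQLAlchemy/PostgreSQL setup the article describes; the table name, send_once, and the sent list (standing in for the real side effect) are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE email_log ("
    " idempotency_key TEXT NOT NULL UNIQUE,"
    " user_id INTEGER NOT NULL)"
)

sent: list[int] = []  # stands in for the real side effect (sending an email)


def send_once(user_id: int, idempotency_key: str) -> bool:
    """Claim the key first; perform the side effect only if the claim succeeded."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                "INSERT INTO email_log (idempotency_key, user_id) VALUES (?, ?)",
                (idempotency_key, user_id),
            )
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: the key already exists, do nothing
    sent.append(user_id)
    return True


first = send_once(42, "welcome:42")
second = send_once(42, "welcome:42")  # simulated redelivery of the same task
```

Note the ordering trade-off: this sketch claims the key before the side effect, so a crash between the two drops the email rather than duplicating it. Sending first and recording afterwards makes the opposite trade. Which failure is tolerable depends on the task.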
5.2 Retries: autoretry_for + Backoff + Jitter
External dependencies (SMTP, third-party APIs) fail temporarily. When they do, the standard response is to retry automatically with exponential backoff + jitter. Celery's task decorator lets you declare, rather than hand-code, the resilience patterns discussed in the guide on retry, backoff, and circuit breakers.
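# myapp/tasks.py
import requests
from celery import shared_task
from requests.exceptions import RequestException

from myapp import db
from myapp.models import WebhookEndpoint, WebhookPayload  # assumed model names


@shared_task(
    autoretry_for=(RequestException,),  # retry automatically on this exception
    retry_backoff=True,                 # exponential backoff (1, 2, 4, 8, ... seconds)
    retry_backoff_max=600,              # cap the delay at 600 seconds
    retry_jitter=True,                  # randomize delays to avoid a thundering herd
    max_retries=5,                      # give up after 5 retries
)
def deliver_webhook(endpoint_id: int, payload_id: int) -> None:
    endpoint = db.session.get(WebhookEndpoint, endpoint_id)
    payload = db.session.get(WebhookPayload, payload_id)
    resp = requests.post(endpoint.url, json=payload.body, timeout=10)
    # Raises HTTPError (a RequestException) on 4xx as well as 5xx, so as written
    # this also retries client errors; filter those out if they are permanent.
    resp.raise_for_status()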
| Parameter | Role | Recommendation |
|---|---|---|
| autoretry_for | The exception types to retry on | Transient failures only. Don't include permanent errors like ValueError |
| retry_backoff | Exponential backoff | True. Don't hammer the other side |
| retry_jitter | Inject variance | True. Scatter simultaneous redelivery on simultaneous failures (herd) |
| max_retries | Maximum number of attempts | Keep it finite. Infinite retries invite death |
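To make the interaction of these parameters concrete, the following sketch computes the delay schedule they imply. It approximates the behaviour the Celery documentation describes for retry_backoff=True (a first delay of about 1 second, doubling each retry, capped by retry_backoff_max) and for retry_jitter (the computed delay is treated as a maximum and a random value between zero and it is used). backoff_delay is a hypothetical helper, not a Celery function.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600, jitter: bool = True) -> int:
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick uniformly between 0 and the computed delay.
        delay = random.randint(0, delay)
    return delay


schedule = [backoff_delay(n, jitter=False) for n in range(5)]  # the un-jittered ceiling
capped = backoff_delay(20, jitter=False)                       # far past the cap
jittered = [backoff_delay(3) for _ in range(100)]              # each value lies in 0..8
```

With max_retries=5 and no jitter, the ceilings add up to 1 + 2 + 4 + 8 + 16 = 31 seconds, so the whole retry window is short. If the downstream outage you need to survive is longer than that, raise the backoff factor or the retry count rather than removing the cap.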
⚠️ Don't retry permanent errors. If you include errors that fail no matter how many times you try, such as validation failures (invalid input) or 404, in the retry targets, you keep burning workers for nothing. Limit autoretry_for to transient failures like network errors, timeouts, and 5xx, and let permanent errors fail immediately (or go to the dead-letter destination discussed later).
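One way to enforce this split is to classify the HTTP status yourself and raise different exception types, listing only the transient one in autoretry_for. This matters because requests' raise_for_status() raises HTTPError, a subclass of RequestException, for 4xx and 5xx alike. The sketch below is an assumption-laden illustration: both exception classes, is_transient_status, and check_response are hypothetical, and which 4xx codes count as transient (408 and 429 here) is a policy choice, not a rule.

```python
def is_transient_status(status_code: int) -> bool:
    """Return True when an HTTP status is worth retrying."""
    if status_code in (408, 429):  # request timeout, rate limited
        return True
    return 500 <= status_code <= 599


class PermanentDeliveryError(Exception):
    """Hypothetical exception type that is NOT listed in autoretry_for."""


class TransientDeliveryError(Exception):
    """Hypothetical exception type that IS listed in autoretry_for."""


def check_response(status_code: int) -> None:
    """Raise the exception type that matches how the failure should be handled."""
    if status_code < 400:
        return
    if is_transient_status(status_code):
        raise TransientDeliveryError(status_code)
    raise PermanentDeliveryError(status_code)
```

A task would call check_response(resp.status_code) in place of resp.raise_for_status() and declare autoretry_for=(TransientDeliveryError,), so a 404 fails once and a 503 is retried.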
5.3 Crash Safety: acks_late + task_reject_on_worker_lost
By default, Celery acks (acknowledges) a task message right before the task starts executing. This means that if a worker crashes while processing, that task is lost. For tasks that must not be lost, use acks_late to ack after the task completes.
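# myapp/celeryconfig.py (these can also go in the CELERY dict)
task_acks_late = True                # ack after completion (not lost on a crash)
task_reject_on_worker_lost = True    # requeue the task when the worker process is lost
worker_prefetch_multiplier = 1       # fetch one at a time so long tasks don't hoard messages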
The combination of acks_late=True and task_reject_on_worker_lost=True means that even if a worker vanishes to OOM or SIGKILL, the task is redelivered. But this is also a design that raises the chance of a task running twice — which is exactly why the idempotency of §5.1 is a prerequisite. You must not introduce acks_late without idempotency (a double execution directly causes an incident).
When using Redis as the broker, the timing of redelivery is decided by the visibility timeout. This is the threshold of "redeliver to another worker if the task a worker pulled is not acked within this many seconds."
# Redis broker only: visibility timeout in seconds (default 3600)
broker_transport_options = {"visibility_timeout": 3600}
⚠️ Make the visibility timeout longer than the task's maximum execution time. For example, if video transcoding takes 1 hour but the visibility timeout is 1800 seconds, a still-running task is misjudged as "lost" and runs twice. Set it with the longest task's duration + a margin. This is a Redis-specific pitfall that RabbitMQ doesn't have.
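The sizing rule above can be written down as a small calculation. This is a sketch only: visibility_timeout_for, the 25% margin, and the sample durations are assumptions for illustration, and real worst-case durations should come from your own measurements.

```python
def visibility_timeout_for(max_task_seconds: int, margin: float = 0.25, floor: int = 3600) -> int:
    """Longest task duration plus a safety margin, never below `floor` seconds."""
    return max(floor, int(max_task_seconds * (1 + margin)))


# Hypothetical worst-case durations, in seconds, measured per task.
task_durations = {
    "send_welcome_email": 5,
    "generate_invoice_pdf": 90,
    "transcode_video": 3600,
}

timeout = visibility_timeout_for(max(task_durations.values()))
broker_transport_options = {"visibility_timeout": timeout}
```

Keeping the floor at the 3600-second default means short tasks never lower the timeout below what Celery would have used anyway.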
5.4 Pass an ID for Arguments (Don't Pass an Object)
A design principle the official docs warn about in bold: "Pass the user's id rather than the user object." Task arguments must be serializable, and you must not pass a SQLAlchemy model object as .delay(user).
from uuid import uuid4

# ❌ Bad: passing the model object (not serializable, and it freezes stale state)
send_welcome_email.delay(user)

# ✅ Good: pass the ID and re-query the current state inside the task's app_context
send_welcome_email.delay(user.id, idempotency_key=str(uuid4()))
There are 2 reasons. (1) It can't be serialized: a task message is encoded as JSON (or similar) and passes through the broker, so an ORM object can't ride along. (2) The state goes stale: even if you could carry the object as of enqueue time, the DB value may have changed by the time the worker executes. So pass only the ID and call db.session.get(User, user_id) inside the task to re-fetch the current state. Because FlaskTask pushes an app_context, this re-query reads naturally inside the task (the benefit of §2.1 pays off here).
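The serialization half of this argument is easy to reproduce with the standard json module, which is the format Celery uses for task messages by default. The User class below is a hypothetical stand-in for an ORM model instance, and the message dict is a simplification of a real task message.

```python
import json


class User:
    """Hypothetical stand-in for an ORM model instance."""

    def __init__(self, id: int, email: str) -> None:
        self.id = id
        self.email = email


user = User(id=42, email="a@example.com")

try:
    json.dumps({"args": [user]})  # what passing the object would require
    object_serializable = True
except TypeError:
    object_serializable = False  # arbitrary objects have no JSON representation

message = json.dumps({"args": [user.id]})  # the ID travels as a plain integer
```

The ID survives the round trip through the broker unchanged, and the worker decides what "user 42" means at execution time by querying the database.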
💡 The DB session is per task. Because FlaskTask.__call__ opens and closes the app_context per task, Flask-SQLAlchemy's db.session is also scoped per task. Call db.session.commit() (or rollback() on an exception) explicitly at the end of the task to prevent state from bleeding across sessions.
5.5 Where Failures Go: Dead-Letter and Redis's Data Loss
A task that exhausts max_retries ends in "failure." In production, don't swallow failures; create a destination — in Celery, the standard design is an on_failure hook, or evacuating to a separate "dead-letter" table / queue on failure.
@shared_task(bind=True, max_retries=5, autoretry_for=(RequestException,))
def deliver_webhook(self, endpoint_id: int, payload_id: int) -> None:
    try:
        ...  # task body
    except RequestException:
        if self.request.retries >= self.max_retries:
            # Final attempt failed: park it somewhere visible.
            # record_dead_letter is an application-defined helper (not shown).
            record_dead_letter(endpoint_id, payload_id)
        raise  # hand the exception back to autoretry
And the honest caution touched on at the start — Redis is "susceptible to data loss" as a broker. If Redis restarts or fails over, depending on the persistence settings, unprocessed tasks on the queue may vanish.
⚠️ For "tasks that must never be lost," consider RabbitMQ. For processing like billing or legal notifications where not even one drop is tolerated, choosing RabbitMQ, with its stronger delivery guarantees, as the broker is the sound move. If you use Redis, enable AOF persistence and use acks_late + idempotency so that even if a task vanishes, it can be recovered by re-enqueueing. Either way, design on the premise that "because it's Redis, it sometimes vanishes." This is a trade-off I actually kept in mind in my own production, without idealizing Celery.
6. Periodic Execution: Celery Beat
Cron-like periodic jobs (nightly aggregation, reminders, expiry processing) are enqueued by Celery Beat. Beat is a "scheduler" that merely loads task messages onto the queue at the appointed times — the worker executes them.
# Add beat_schedule to the CELERY dict
from celery.schedules import crontab

CELERY = dict(
    broker_url="redis://localhost",
    result_backend="redis://localhost",
    beat_schedule={
        "expire-trials-nightly": {
            "task": "myapp.tasks.expire_trials",
            "schedule": crontab(hour=3, minute=0),  # every day at 03:00 (Celery's timezone, UTC by default)
        },
        "send-reminders-hourly": {
            "task": "myapp.tasks.send_due_reminders",
            "schedule": crontab(minute=0),  # at minute 0 of every hour
        },
    },
)
Startup is a separate process.
celery -A make_celery beat --loglevel INFO
⚠️ Run only 1 Beat process. If you start 2 Beat processes for redundancy, the same schedule is enqueued twice and the job runs twice. Run Beat as a single scheduler, and on a container platform pin it to "beat = 1 replica." And per §5.1, make the tasks Beat enqueues idempotent too, so that you have two layers of defense: safe even if double-enqueued, safe even if double-executed.
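For periodic tasks, a natural idempotency key is the schedule slot itself: every enqueue that belongs to the same night (or the same hour) produces the same key, so a duplicate is rejected by the same unique-constraint ledger used in §5.1. The helper below is a hypothetical sketch of that idea; slot_key and its granularity names are not part of Celery.

```python
from datetime import datetime, timezone


def slot_key(task_name: str, now: datetime, granularity: str = "day") -> str:
    """Build an idempotency key that is identical for every run in the same slot."""
    now = now.astimezone(timezone.utc)
    fmt = {"day": "%Y-%m-%d", "hour": "%Y-%m-%dT%H"}[granularity]
    return f"{task_name}:{now.strftime(fmt)}"


t1 = datetime(2025, 1, 15, 3, 0, 0, tzinfo=timezone.utc)
t2 = datetime(2025, 1, 15, 3, 0, 2, tzinfo=timezone.utc)  # a duplicate fired 2 s later

key_a = slot_key("expire_trials", t1)
key_b = slot_key("expire_trials", t2)
hourly = slot_key("send_due_reminders", t1, granularity="hour")
```

Because both enqueues map to the same key, the second run finds the ledger row and exits without doing the work again.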
7. Deployment and Observability: Separate Containers, Monitoring, Correlated Logs
7.1 The Worker Is a Separate Container: Docker Compose
Putting the great principle of §1.2 into a topology looks like this. Separate web, worker, beat, and redis, and the standard is to start the same image with different commands (the image is shared since it's the same codebase).
# compose.yaml
services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped

  web:
    build: .
    command: gunicorn -w 4 -b 0.0.0.0:8000 "make_celery:flask_app"
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
      FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
    depends_on: [redis]
    ports: ["8000:8000"]

  worker:
    build: .
    command: celery -A make_celery worker --loglevel INFO --concurrency 4 --max-tasks-per-child 100
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
      FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
    depends_on: [redis]

  beat:
    build: .
    command: celery -A make_celery beat --loglevel INFO
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
    depends_on: [redis]
web is Gunicorn (reusing the design from the production deployment guide as-is), worker and beat are Celery, and redis is the broker. On ECS (Fargate), this corresponds to 3 services: "web service," "worker service," and "beat service (1 replica)."
7.2 Tuning the Worker
| Flag / setting | Role | Guidance |
|---|---|---|
| --concurrency N | Parallelism within the worker process | CPU-bound → ≈ vCPU count; IO-bound → more |
| worker_prefetch_multiplier | Number of tasks pulled at once | Long tasks → 1 (prevent hoarding and unfairness) |
| --max-tasks-per-child N | Restart the child process after this many | Leak countermeasure. Effective for heavy deps like PDF/image processing |
--max-tasks-per-child is a realistic defense against the problem of workers ballooning due to memory leaks in third-party libraries. It recreates the child process every N tasks and periodically frees memory.
7.3 Monitoring: Flower and Correlated Logs
Observability is the lifeline of production operations. Celery has a web dashboard called Flower, where you can see task success/failure, execution time, queue length, and worker activity in real time.
pip install flower
celery -A make_celery flower --port=5555
In addition, extend the structured logging + request-ID correlation established in the error-handling / observability guide to the task side too. Always put the task_id in task logs — that way you can trace "user's request → enqueued task → execution in the worker" as a single line.
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True)
def generate_invoice_pdf(self, invoice_id: int) -> None:
    # self.request.id is the task_id; always include it in structured logs
    logger.info("invoice pdf start", extra={"task_id": self.request.id, "invoice_id": invoice_id})
    ...
    logger.info("invoice pdf done", extra={"task_id": self.request.id, "invoice_id": invoice_id})
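Fields passed via extra only become useful if the log formatter actually emits them. The following stdlib-only sketch shows one way to render them as JSON lines. JsonFormatter, the EXTRA_FIELDS tuple, and the demo logger name are hypothetical; a production setup would more likely use an existing structured-logging library.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including selected `extra` fields."""

    EXTRA_FIELDS = ("task_id", "invoice_id", "request_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        for field in self.EXTRA_FIELDS:
            # Keys passed through `extra=` become attributes on the record.
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


stream = io.StringIO()  # captures output so the example is self-contained
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

demo_logger = logging.getLogger("demo.tasks")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output out of the root logger

demo_logger.info("invoice pdf start", extra={"task_id": "abc-123", "invoice_id": 7})
line = json.loads(stream.getvalue())
```

If the web side logs the same task_id at enqueue time next to its request ID, a single search on that value joins the request and the worker execution.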
7.4 Graceful Shutdown (Warm Shutdown)
When you bring a worker down on each deploy, you must not kill an in-progress task midway. Send SIGTERM to a Celery worker and it does a warm shutdown — it stops accepting new tasks and finishes the running tasks to the end before terminating the process.
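In an ECS rolling deploy, SIGTERM is sent at container stop, and the worker has the grace period set by stopTimeout (default 30 seconds, extend if needed) to complete its warm shutdown before it is killed. This applies to the worker the same idea as the Gunicorn graceful shutdown covered in the production deployment guide: drain what's in progress before going down. If you have long-running tasks, set stopTimeout longer than the longest task where the platform allows it. On Fargate, stopTimeout is capped at 120 seconds, so tasks longer than that cannot count on finishing during shutdown and must rely on acks_late plus idempotency (§5.3) to be redelivered safely.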
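The idea behind a warm shutdown (stop taking new work, finish what is running) can be modelled in a few lines. This is a toy illustration, not Celery's implementation: run_worker, handle_sigterm, and the in-memory deque standing in for the broker queue are all hypothetical, and the signal is simulated by calling the handler directly.

```python
import threading
from collections import deque

stop_requested = threading.Event()


def handle_sigterm(signum=None, frame=None) -> None:
    """Signal handler: only set a flag; never interrupt the running job."""
    stop_requested.set()


def run_worker(jobs: deque, on_job) -> list:
    """Process jobs until the queue is empty or a stop is requested."""
    done = []
    while jobs and not stop_requested.is_set():
        job = jobs.popleft()
        done.append(on_job(job))  # the current job always runs to completion
    return done


def job_body(n: int) -> int:
    if n == 2:
        handle_sigterm()  # simulate SIGTERM arriving while job 2 is running
    return n * 10


pending = deque([1, 2, 3, 4])
finished = run_worker(pending, job_body)

# In a real process the handler would be registered with:
#   signal.signal(signal.SIGTERM, handle_sigterm)
```

Job 2 completes even though the stop request arrived mid-run, and jobs 3 and 4 stay in the queue for another worker to pick up.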
8. Honest Alternatives: When Celery Is Overkill
Celery is powerful, but not every async processing needs Celery. The best judgment for avoiding technical debt is often "don't use it."
| Option | Suited situation | Trade-off |
|---|---|---|
| Celery + Redis/RabbitMQ | Full-scale operation needing retries, scheduling, queue routing, large volumes | Many components (broker + worker + beat + monitoring) |
| RQ (Redis Queue) | Simple background processing. Want to complete with Redis alone | Thinner features than Celery (weak on advanced routing, etc.) |
| Managed queue (SQS + Lambda) | Want to lower operational load with serverless. AWS premise | Vendor lock-in. Runs outside the Flask process |
- If the requirement is roughly "send one email asynchronously," Celery is overkill. A lightweight library like RQ is often enough, and you needn't shoulder the whole operational set of broker + worker + beat + Flower.
- If you want to lean serverless on an AWS premise, SQS + Lambda is an honest choice too. You don't self-operate worker containers, and the design of idempotent processing can be left to the SQS/Lambda/EventBridge idempotent async-processing guide. The core of idempotency is unchanged whether Celery or SQS.
- If you want to dig deeper into Celery's design, I've compiled general Celery/Redis production patterns not confined to Flask in the Celery × Redis production job-queue guide.
The criterion is simple — "is any of retries / scheduling / queue separation / monitoring genuinely needed?" If yes, Celery; if not, RQ or a managed queue. It's not too late to migrate to Celery after the requirements grow (YAGNI).
Summary: A Celery Production Checklist
The essentials of running Flask × Celery × Redis at production quality boil down to 3 points — context integration, idempotency, and resilience. Flask's official celery_init_app supports the first; the idempotency key and acks_late/retry design support the latter 2. Finally, let me close with a pre-production confirmation list.
| # | Check item | Basis (this article) |
|---|---|---|
| 1 | Is the worker started as a process / container separate from Gunicorn? | §1.2 / §7.1 |
| 2 | Does celery_init_app's FlaskTask push an app_context, so that db/current_app are usable inside the task? | §2.1 |
| 3 | Is the config consolidated in Flask config's CELERY key, with connection strings injected via env vars? | §2.2 |
| 4 | Do task definitions use @shared_task, not @celery_app.task? | §3.1 |
| 5 | Enqueue via .delay()/.apply_async(), retrieve via AsyncResult, and task_ignore_result when no result is needed | §3.2–3.5 |
| 6 | On the at-least-once premise, are all tasks designed idempotently (idempotency key / upsert / dedup)? | §5.1 |
| 7 | autoretry_for + retry_backoff + retry_jitter + max_retries for transient failures, permanent errors fail immediately | §5.2 |
| 8 | acks_late + task_reject_on_worker_lost for tasks that must not be lost (idempotency is a prerequisite) | §5.3 |
| 9 | Is the Redis broker's visibility timeout set longer than the longest task time? | §5.3 |
| 10 | Do task arguments pass an ID (int) and re-query inside the task (model objects forbidden)? | §5.4 |
| 11 | If storing results, TTL via result_expires, and are failures evacuated to a dead-letter? | §3.5 / §5.5 |
| 12 | Do you understand Redis's data-loss risk, and are critical tasks recoverable via RabbitMQ, or via AOF persistence + idempotency? | §5.5 |
| 13 | Is Beat 1 replica, and are the tasks Beat enqueues idempotent? | §6 |
| 14 | Leak countermeasure via --max-tasks-per-child, hoarding prevention for long tasks via prefetch=1 | §7.2 |
| 15 | Are you placing task_id correlation in structured logs along with Flower monitoring? | §7.3 |
| 16 | On deploy, a warm shutdown via SIGTERM, with stopTimeout longer than the longest task time? | §7.4 |
| 17 | Have you considered whether Celery is overkill in the first place (would RQ / SQS+Lambda suffice)? | §8 |
The real difficulty of a job queue lies not in how to use Celery, but in assembling a design where data doesn't get corrupted on the premise of the reality that "tasks run twice, workers go down, and Redis sometimes loses data." The reason I could protect 0 double charges in my B2B SaaS was not that Celery was excellent, but that I made a unique constraint — the idempotency key — guarantee "safe to re-execute." On the foundation of Flask context integration, you stack idempotency and resilience — assemble it in this order, and the job queue becomes a production asset.
For the overall Flask production design, start from the Flask production-operations guide; for the context mechanism, the context deep-dive; for deployment, the production deployment guide.