友田 陽大

Flask × Celery × Redis: Running Background Tasks and Job Queues at Production Quality (Flask Context Integration, Idempotency, Resilience)

A practical guide to designing async tasks and job queues at production quality with Flask, Celery, and Redis. It covers the official celery_init_app and FlaskTask app-context integration, shared_task, .delay/AsyncResult, idempotency that withstands at-least-once delivery, resilience through autoretry, acks_late, and the visibility timeout, periodic execution with Celery Beat, a Docker setup that runs workers in a separate container, Flower monitoring, and task_id-correlated logs, all explained with real code faithful to the latest official Flask and Celery documentation.

Author
友田 陽大

Introduction: Trying to Do Everything Inside the Request Breaks Production

A Flask view function runs within the limited time budget between receiving an HTTP request and returning a response. Yet a real app has plenty of work that won't fit within that budget: generating invoice PDFs, sending confirmation emails, exporting CSVs, dispatching webhooks to external APIs, generating image thumbnails, running nightly aggregation batches. Run these synchronously inside a view and the user stares at a spinner for seconds to tens of seconds, each such request monopolizes a Gunicorn worker, and production silently clogs with timeouts and retry storms.

The solution is simple, yet bares its fangs if you get the design wrong. "Offload heavy work outside the request (to a separate process)" — this is a background task (job queue). The de facto standard in Python/Flask is Celery, and what carries its message delivery is Redis (or RabbitMQ).

This article, as a spoke of the Flask production-operations guide (pillar), runs through the design of assembling background tasks at production quality with Flask 3.1.x + Celery 5.x + Redis, using real code faithful to Flask's official Celery-integration pattern and the Celery official documentation. There are 3 central themes — (1) Flask context integration (creating a state where db and current_app are usable inside a task), (2) idempotency (the premise that a task can run twice), and (3) resilience (retries, crash safety, graceful shutdown). Miss these 3 and the job queue becomes "a black box that looks like it's working but occasionally corrupts data."

The author designed and implemented the backend of a B2B SaaS that won the Minister of Economy, Trade and Industry Award in Python / Flask / SQLAlchemy / PostgreSQL, and ran its Celery workers in production on ECS (Fargate). To handle processing like billing, notifications, and exports — "must not fail, but can't be done inside the request" — without double charges or dropped notifications, what was needed was precisely this discipline of idempotency and resilience. This article verbalizes the design judgments from that field experience, against the official specifications.

💡 The versions covered in this article: it assumes Celery 5.x (the latest at the time of writing is 5.6.3) and Flask 3.1.x. Celery is installed separately via pip install celery (it is not a dependency of Flask). Redis takes the lead as the broker, but where resistance to data loss is required, RabbitMQ is treated honestly as a candidate too.


1. Why Background Tasks: The Components and the Great Principle of a "Separate Process"

1.1 Work That Should Be Offloaded Outside the Request

First, let me make the criterion clear. The only work you may run synchronously inside a view is work that fits within the user's wait time and whose failure can be reported to the user on the spot. Processing like the following is offloaded to the background as a rule.

Kind | Examples | Why offload
Notification | Confirmation email, Slack notification, push notification | External SMTP/API is slow or goes down. Don't make the response wait
Generation | Invoice PDF, reports, thumbnails, video transcoding | CPU/IO heavy, takes seconds to minutes
Export | CSV/Excel generation of large data, ZIP packaging | Grows in proportion to row count, exceeds the request budget
Integration | Webhook dispatch, third-party sync | The other side is delayed or fails. Retries are needed
Periodic | Nightly aggregation, reminders, expiry processing | No request exists in the first place (cron-like)

1.2 The 4 Components

A job queue with Celery is made up of the following 4 actors.

Role | Concrete form | Responsibility
Web process | Gunicorn + Flask app | "Enqueues" tasks (.delay()). Does not wait for the result
Broker (required) | Redis or RabbitMQ | The queue that stores and delivers task messages
Worker process | celery -A make_celery worker | Pulls tasks from the queue and "executes" them
Result backend (optional) | Redis / RPC / DB, etc. | Stores task results and states (if needed)

The Celery docs state clearly that "Celery requires a message broker, and you can use RabbitMQ or Redis. The result backend is optional." On Redis, there is a note that it is "feature-complete but more susceptible to data loss than RabbitMQ," and this will definitely matter in the resilience design discussed later.

⚠️ The most important great principle: the worker is a process / container separate from Gunicorn. This is where beginners stumble most. celery worker is a process independent of the web server (Gunicorn), and you start, scale, and deploy it separately. Scaling the web tier does nothing for task throughput, and scaling workers does nothing for request handling. In a container setup you separate the "web container," "worker container," and "redis container" (§7 shows a Docker Compose file). This separation follows the same line of thought as the "separation of the WSGI app and the WSGI server" covered in the production deployment guide: split processes by role.


2. Flask's Official Celery Integration: A Deep Dive into celery_init_app

This is the technical core of the article. The integration shown in the official Flask docs (patterns/celery/ on flask.palletsprojects.com) aims to "automatically push a Flask application context when running a task." Why is that needed? Because inside a task you want to use db.session, current_app.config, and g. These Flask features work only while an application context is active (for details of this mechanism, see the context deep-dive). A worker running outside the request has no such context by default.

2.1 FlaskTask: A Task Subclass That Stretches the app_context

This is the official integration function. I present it verbatim in structure, without adding a single line.

# myapp/celery_app.py
from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

The official explanation (translated verbatim) is: "This Task subclass automatically runs the task function with a Flask application context active. This makes services like database connections available inside the task." Let me pin down the meaning line by line.

  • class FlaskTask(Task) … inherits Celery's Task and overrides __call__. Every time a task is called, it wraps it in with app.app_context(): before calling the body self.run(...). This makes the inside of the task "with an application context," so db.session / current_app / g are usable.
  • Celery(app.name, task_cls=FlaskTask) … sets this FlaskTask as the default class for all tasks.
  • config_from_object(app.config["CELERY"]) … reads Celery's config from the CELERY key of Flask config. The config is unified on the Flask side, and double management disappears.
  • celery_app.set_default() … sets this Celery app as the "current app." This is the key by which the @shared_task described later finds this instance during a request.
  • app.extensions["celery"] = celery_app … stores it in the extension registry. The worker's entry point (§4) retrieves the Celery app from here.

2.2 Wiring Into the Application Factory

celery_init_app is called inside the create_app factory established in the large-app structure guide. The official factory wiring follows.

# myapp/__init__.py
from flask import Flask

from myapp.celery_app import celery_init_app


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

There are 2 points. (1) Celery config is bundled as a dict inside the single key CELERY. broker_url is the broker (Redis), result_backend is where results are stored, and task_ignore_result=True sets the default to "don't store results for tasks that don't need them" (§3.5 discusses the trade-off). (2) With app.config.from_prefixed_env(), you can also override these values from environment variables like FLASK_CELERY__BROKER_URL. In other words, in production you inject connection strings via environment variables rather than baking them into code. Note that the part after the double underscore becomes the nested key exactly as written, so check that its case matches the setting name Celery reads. The handling of secrets aligns with the pillar's and the production deployment guide's policy (FLASK_-prefixed environment variables).
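To make the double-underscore rule concrete, here is a minimal sketch that imitates how a prefixed variable name turns into a nested config key. It is a simplified stand-in written for illustration, not Flask's actual implementation; load_prefixed_env and the sample variables are hypothetical names, and the lowercase suffixes are my assumption so that the resulting keys match the lowercase setting names used in the CELERY dict.

```python
import json


def load_prefixed_env(environ: dict[str, str], prefix: str = "FLASK_") -> dict:
    """Simplified imitation of loading nested config from prefixed env vars."""
    config: dict = {}
    for name in sorted(environ):
        if not name.startswith(prefix):
            continue  # unrelated variables are ignored
        raw = environ[name]
        try:
            value = json.loads(raw)  # "true" -> True, "30" -> 30
        except ValueError:
            value = raw  # not valid JSON: keep the plain string
        # "CELERY__broker_url" -> parents ["CELERY"], leaf "broker_url"
        *parents, leaf = name[len(prefix):].split("__")
        node = config
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value  # the key keeps the case used in the variable name
    return config


# Hypothetical environment, as a plain dict so the sketch has no side effects
env = {
    "FLASK_CELERY__broker_url": "redis://redis:6379/0",
    "FLASK_CELERY__task_ignore_result": "true",
    "PATH": "/usr/bin",
}
config = load_prefixed_env(env)
```

The point to take away is that the suffix is used as the key verbatim, so the spelling of the variable name decides which key inside CELERY gets overridden.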

💡 Why with app.app_context() is needed "for every task." Unlike a request handler, a worker is a long-lived process, and one worker processes thousands of tasks in sequence. The context should be opened and closed per task, and FlaskTask.__call__'s with block creates exactly that scope. This way db.session is correctly established and torn down per task, preventing session leaks and state bleed into other tasks.


3. Defining, Invoking, and Retrieving Results of Tasks

3.1 Use @shared_task (Not @celery_app.task)

In the factory pattern, using @shared_task to define tasks is strongly recommended officially. The reason (translated verbatim) is: "@celery_app.task requires access to the celery_app object, which is not available in the factory pattern. Also, tasks decorated with it become bound to a specific Flask/Celery instance. Instead, use Celery's @shared_task. This creates task objects that 'access whatever the current app is.'"

# myapp/tasks.py
from celery import shared_task


@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b

Because @shared_task doesn't import a specific celery_app, it creates no circular imports and coexists cleanly with Blueprints and module splitting. Thanks to the set_default() that celery_init_app called, add_together.delay(...) finds the "current Celery app" at execution time and enqueues the task there.

3.2 Enqueue a Task: .delay()

Enqueuing from a view is .delay(). The standard is to not wait for the result and return the task_id immediately.

# myapp/views.py
from flask import request

from myapp.tasks import add_together


@app.post("/add")
def start_add():
    a = request.json["a"]
    b = request.json["b"]
    result = add_together.delay(a, b)
    return {"result_id": result.id}

.delay(a, b) loads the task message onto the broker (Redis) and returns immediately. You return a result_id to the user and have them query for the result later — this "separation of enqueue and retrieval" is the essence of async.

3.3 Retrieve the Result: AsyncResult

The result or state is fetched from the result_id (task ID) via AsyncResult. The official polling endpoint follows.

# myapp/views.py
from celery.result import AsyncResult


@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }

The front end holds the result_id and polls /result/<id>, reading value once ready becomes true. How to control enqueue time and delay with apply_async is the next section.
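The enqueue-then-poll flow can be imitated without a broker, which may help when explaining it to a team. This is a minimal sketch under stated assumptions: a ThreadPoolExecutor stands in for the worker, a dict stands in for the result backend, and enqueue / status are hypothetical helpers rather than Celery APIs.

```python
import uuid
from concurrent.futures import Future, ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=2)  # stand-in for the worker
_jobs: dict[str, Future] = {}                  # stand-in for the result backend


def enqueue(func, *args) -> str:
    """Hand the work off and return an ID immediately, without waiting."""
    job_id = str(uuid.uuid4())
    _jobs[job_id] = _executor.submit(func, *args)
    return job_id


def status(job_id: str) -> dict[str, object]:
    """Return the same shape as the polling endpoint above."""
    future = _jobs[job_id]
    ready = future.done()
    successful = ready and future.exception() is None
    return {
        "ready": ready,
        "successful": successful,
        "value": future.result() if successful else None,
    }


def add_together(a: int, b: int) -> int:
    return a + b


job_id = enqueue(add_together, 2, 3)
_executor.shutdown(wait=True)  # only so this demo is deterministic
final = status(job_id)
```

The caller only ever holds job_id; whether the value is available is a separate question answered later by status.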

3.4 .apply_async(): Delay, Scheduling, Queue Routing

.delay() is a thin wrapper over .apply_async(). When you need finer control, use apply_async.

from datetime import datetime, timedelta, timezone

# Run 30 seconds from now (useful for spacing out retries or debouncing)
add_together.apply_async(args=[2, 3], countdown=30)

# Run at a specific time (scheduled delivery, scheduled sending)
add_together.apply_async(args=[2, 3], eta=datetime.now(timezone.utc) + timedelta(hours=1))

# Route to a separate queue (handle heavy tasks and light notifications on different workers)
generate_invoice_pdf.apply_async(args=[invoice_id], queue="pdf")

With countdown/eta you get delay/scheduling, and with queue you get queue routing. Splitting queues prevents the accident where "heavy processing like PDF generation clogs light processing like email sending" (start a worker with -Q pdf to dedicate it).
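Instead of passing queue= at every call site, routing can also be declared once in config through Celery's task_routes setting. A minimal sketch, assuming the task names and the queue names "pdf", "mail", and "default" below, none of which come from the official example:

```python
# Sketch of a CELERY config dict with static routing.
# Task names and queue names are assumptions for illustration.
CELERY = dict(
    broker_url="redis://localhost",
    task_default_queue="default",  # where tasks without a route go
    task_routes={
        # Heavy PDF generation gets its own queue
        "myapp.tasks.generate_invoice_pdf": {"queue": "pdf"},
        # Glob pattern: every task whose name starts with send_
        "myapp.tasks.send_*": {"queue": "mail"},
    },
)
```

With this in place, each worker still has to be told which queues to consume, for example one worker started with -Q pdf and another with -Q default,mail.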

3.5 The Trade-Off of task_ignore_result

task_ignore_result=True (the factory default) skips writes to the result backend. It is ideal for "fire-and-forget"–type tasks like email sending — no wasted results piling up in Redis, lower cost. On the other hand, a task that wants to return a result via /result/<id> is individually enabled with @shared_task(ignore_result=False).

Setting | Stores result | Suited task
task_ignore_result=True (factory default) | No | Notification, webhook, fire-and-forget
@shared_task(ignore_result=False) | Yes | Export, returning a computed result, showing progress

⚠️ When you do store results, they accumulate in the result backend until they expire. With Redis as the backend, old results occupy memory in the meantime, so set a TTL explicitly with result_expires (default 1 day) to match how long clients actually need the result.


4. The Worker's Entry Point and Startup

The worker, separate from the Flask app, retrieves the Celery app from a thin entry point called make_celery.py and starts. This is the official verbatim structure.

# make_celery.py
from myapp import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

It calls create_app() to assemble the Flask app and exposes the Celery app — stored in app.extensions["celery"] — as celery_app. The -A (app) of celery -A make_celery finds this celery_app.

# Start the worker (INFO logs)
celery -A make_celery worker --loglevel INFO

# Start the periodic scheduler (Beat) (§6)
celery -A make_celery beat --loglevel INFO

worker is the process that executes tasks, and beat is the scheduler that enqueues tasks cron-style. The two are separate processes, and in production you separate their containers too (§6, §7).


5. Production-Quality Resilience: This Is the Watershed Between Debt and Asset

From here is the part that divides a demo from production. A job queue can be written in 30 minutes if "just running" is enough, but to run it without breaking in production you must correctly assemble idempotency, retries, crash safety, and argument design.

5.1 Idempotency: The Premise That a Task Can Run Twice

The most important premise. The delivery guarantee of Celery (and message queues in general) is at-least-once. That is, the same task running 2 or more times happens as a normal case. If a worker crashes right after completing a task but before sending the ack, the broker treats it as "incomplete" and redelivers.

Therefore, a task must be designed to be idempotent (the same result no matter how many times it runs). This is exactly the same principle covered in the SQS/Lambda idempotent-processing guide; the essence is invariant even when the queue type changes.

# myapp/tasks.py
from celery import shared_task

from myapp import db
from myapp.models import EmailLog, User
# send_email is the application's own mail helper (its import is not shown here)


@shared_task(ignore_result=True)
def send_welcome_email(user_id: int, idempotency_key: str) -> None:
    # Check the ledger for this idempotency key. The check alone is not atomic;
    # the unique constraint on the ledger table is what rejects a racing duplicate at commit.
    existing = db.session.execute(
        db.select(EmailLog).filter_by(idempotency_key=idempotency_key)
    ).scalar_one_or_none()
    if existing is not None:
        return  # Duplicate execution: do nothing (idempotent)

    user = db.session.get(User, user_id)
    if user is None:
        return  # The user no longer exists: finish quietly

    send_email(to=user.email, template="welcome")

    db.session.add(EmailLog(idempotency_key=idempotency_key, user_id=user_id))
    db.session.commit()  # Record that the email was sent; the next run returns early above

There are 3 standard ways to make a task idempotent.

Technique | How | Suited situation
Idempotency key + ledger | Record "executed" with a unique key, return early on duplicates | Side effects that are bad to do twice: email, notifications, billing
upsert | INSERT ... ON CONFLICT DO UPDATE so insert and update give the same result | Record creation / update
De-duplication (dedup) | Lock on the task ID at enqueue / suppress an existing job | Cases where the same trigger fires repeatedly in a short window
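As an illustration of the upsert row, here is a minimal sketch using the standard library's sqlite3 rather than PostgreSQL and SQLAlchemy. The daily_totals table and save_daily_total function are hypothetical; the ON CONFLICT syntax shown is SQLite's and requires a reasonably recent SQLite build.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_totals (day TEXT PRIMARY KEY, total INTEGER NOT NULL)"
)


def save_daily_total(day: str, total: int) -> None:
    """Insert the row, or overwrite it if the day already exists."""
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO daily_totals (day, total) VALUES (?, ?) "
            "ON CONFLICT(day) DO UPDATE SET total = excluded.total",
            (day, total),
        )


save_daily_total("2024-01-01", 120)
save_daily_total("2024-01-01", 120)  # a redelivered task runs the same write again
rows = conn.execute("SELECT day, total FROM daily_totals").fetchall()
```

Running the write twice leaves exactly one row with the same value, which is what makes the task safe under redelivery.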

💡 0 double charges is protected by idempotency. In my B2B SaaS, "do-it-twice-and-it's-an-incident" processing — billing and notifications — was handled by Celery. Recording the idempotency key in a ledger table with a unique constraint, and structurally rejecting re-execution the moment commit runs — this design is why, even when a worker received a redelivery, no double charge occurred in production. "Safe to retry" is not a prayer; it is guaranteed by the table's unique constraint.
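The role of the unique constraint can be shown in isolation. This is a minimal sketch with sqlite3, not the production code described above: the email_log table, send_once, and the sent list (a stand-in for the real side effect) are all hypothetical. It claims the key first and performs the side effect inside the same transaction, which is one possible ordering among several.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE email_log ("
    "idempotency_key TEXT PRIMARY KEY, user_id INTEGER NOT NULL)"
)
sent: list[int] = []  # stand-in for "an email actually went out"


def send_once(user_id: int, idempotency_key: str) -> bool:
    """Return True if the side effect ran, False if the key was already used."""
    try:
        with conn:  # commits on success, rolls back if anything raises
            conn.execute(
                "INSERT INTO email_log (idempotency_key, user_id) VALUES (?, ?)",
                (idempotency_key, user_id),
            )
            sent.append(user_id)  # the side effect, after the key is claimed
    except sqlite3.IntegrityError:
        return False  # duplicate key: the database rejected the second run
    return True


first = send_once(1, "welcome:1")
second = send_once(1, "welcome:1")  # simulated redelivery
```

Note the limit of any such scheme: a crash after the external side effect but before the commit can still lead to a repeat, so the window should be kept as small as possible.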

5.2 Retries: autoretry_for + Backoff + Jitter

External dependencies (SMTP, third-party APIs) fail temporarily. When they fail, the standard is to automatically retry with exponential backoff + jitter. This expresses the resilience patterns discussed in the retry / backoff / circuit breaker guide declaratively, using Celery's decorator options.

# myapp/tasks.py
import requests
from celery import shared_task
from requests.exceptions import RequestException

from myapp import db
from myapp.models import WebhookEndpoint, WebhookPayload  # assumed to live in myapp.models


@shared_task(
    autoretry_for=(RequestException,),  # Retry automatically on this exception
    retry_backoff=True,                 # Exponential backoff (base delays of 1, 2, 4, 8... seconds)
    retry_backoff_max=600,              # Cap the delay at 600 seconds
    retry_jitter=True,                  # Randomize the delay to avoid a thundering herd
    max_retries=5,                      # Give up after 5 retries
)
def deliver_webhook(endpoint_id: int, payload_id: int) -> None:
    endpoint = db.session.get(WebhookEndpoint, endpoint_id)
    payload = db.session.get(WebhookPayload, payload_id)
    resp = requests.post(endpoint.url, json=payload.body, timeout=10)
    # 4xx and 5xx both raise HTTPError (a RequestException), which triggers autoretry
    resp.raise_for_status()

Parameter | Role | Recommendation
autoretry_for | The exception types to retry on | Transient failures only. Don't include permanent errors like ValueError
retry_backoff | Exponential backoff | True. Don't hammer the other side
retry_jitter | Inject variance | True. Spreads out retries when many tasks fail at the same moment (thundering herd)
max_retries | Maximum number of retries | Keep it finite. Unbounded retries tie up workers indefinitely

⚠️ Don't retry permanent errors. If you include errors that "fail no matter how many times you try," such as validation failures (invalid input) or a 404, in the retry targets, you burn worker capacity for nothing. Limit autoretry_for to transient failures such as network errors, timeouts, and 5xx responses, and let permanent errors fail immediately (or send them to the dead-letter destination discussed later).
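To get a feel for what the backoff and jitter options produce, here is a minimal sketch of an exponential backoff calculation with full jitter. backoff_delay is a hypothetical function written for this article; it follows the commonly described formula (base delay doubling per retry, capped, then randomized between 0 and that value) and is not copied from Celery.

```python
import random


def backoff_delay(
    retries: int, factor: int = 1, maximum: int = 600, jitter: bool = True
) -> int:
    """Delay in seconds before the next retry."""
    delay = min(maximum, factor * (2 ** retries))  # 1, 2, 4, 8, ... capped
    if jitter:
        delay = random.randrange(delay + 1)  # full jitter: anywhere in [0, delay]
    return delay


plain = [backoff_delay(n, jitter=False) for n in range(5)]
capped = backoff_delay(20, jitter=False)
jittered = [backoff_delay(3) for _ in range(100)]
```

Without jitter, every task that failed at the same moment would come back at the same moment; with it, the retries for the fourth attempt are spread across 0 to 8 seconds.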

5.3 Crash Safety: acks_late + task_reject_on_worker_lost

By default, Celery acks (acknowledges) a task message just before the task starts executing. This means that if a worker crashes mid-task, that task is lost. For tasks that must not be lost, use acks_late so the ack is sent after the task completes.

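# myapp/celeryconfig.py (these may also be placed in the CELERY dict)
task_acks_late = True                 # Ack after completion (not lost on a crash)
task_reject_on_worker_lost = True     # Requeue the task when the worker disappears
worker_prefetch_multiplier = 1        # Fetch one at a time so long tasks are not hoarded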

The combination of acks_late=True and task_reject_on_worker_lost=True means that even if a worker vanishes to OOM or SIGKILL, the task is redelivered. But this is also a design that raises the chance of a task running twice — which is exactly why the idempotency of §5.1 is a prerequisite. You must not introduce acks_late without idempotency (a double execution directly causes an incident).
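The difference between early and late acknowledgement is easiest to see in a toy model. The following sketch uses an in-memory Broker class that I made up for illustration; it models only "ready" and "unacked" messages and is not how Redis or Celery store them.

```python
from collections import deque


class Broker:
    """Toy broker: messages are either ready or delivered-but-unacked."""

    def __init__(self, messages: list[str]) -> None:
        self.ready = deque(messages)
        self.unacked: list[str] = []

    def fetch(self) -> str:
        msg = self.ready.popleft()
        self.unacked.append(msg)
        return msg

    def ack(self, msg: str) -> None:
        self.unacked.remove(msg)  # the broker forgets the message

    def requeue_unacked(self) -> None:
        """What happens once the broker decides the consumer is gone."""
        self.ready.extend(self.unacked)
        self.unacked.clear()


def consume_with_crash(broker: Broker, acks_late: bool) -> list[str]:
    msg = broker.fetch()
    if not acks_late:
        broker.ack(msg)  # early ack: acknowledged before the work is done
    # ... the worker crashes here, before finishing the work ...
    broker.requeue_unacked()
    return list(broker.ready)  # what is left for another worker to pick up


early = consume_with_crash(Broker(["charge:42"]), acks_late=False)
late = consume_with_crash(Broker(["charge:42"]), acks_late=True)
```

With the early ack nothing is left to redeliver, so the work is silently lost; with the late ack the message comes back, which is exactly why the task body has to tolerate a second run.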

When using Redis as the broker, the timing of redelivery is decided by the visibility timeout. This is the threshold of "redeliver to another worker if the task a worker pulled is not acked within this many seconds."

# Redis broker only: visibility timeout (default 3600 seconds)
broker_transport_options = {"visibility_timeout": 3600}

⚠️ Make the visibility timeout longer than the task's maximum execution time. For example, if video transcoding takes 1 hour but the visibility timeout is 1800 seconds, a still-running task is misjudged as "lost" and runs twice. Set it with the longest task's duration + a margin. This is a Redis-specific pitfall that RabbitMQ doesn't have.
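The sizing rule can be written down as a small calculation. This is a sketch only: visibility_timeout_for and would_be_redelivered are hypothetical helpers, and the 50% margin is my assumption, not a recommended value from the Celery docs.

```python
def visibility_timeout_for(longest_task_seconds: int, margin: float = 0.5) -> int:
    """Longest task duration plus a safety margin (50% assumed here)."""
    return int(longest_task_seconds * (1 + margin))


def would_be_redelivered(task_seconds: int, visibility_timeout: int) -> bool:
    """A task still running past the timeout is handed to another worker."""
    return task_seconds > visibility_timeout


one_hour = 3600
timeout = visibility_timeout_for(one_hour)
broker_transport_options = {"visibility_timeout": timeout}

too_short = would_be_redelivered(one_hour, 1800)   # the failure case from the text
long_enough = would_be_redelivered(one_hour, timeout)
```

In practice the input should come from measured task durations, for example the slowest run seen in monitoring, rather than an estimate.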

5.4 Pass an ID for Arguments (Don't Pass an Object)

A design principle the official docs warn about in bold: "Pass the user's id rather than the user object." Task arguments must be serializable, and you must not pass a SQLAlchemy model object as .delay(user).

from uuid import uuid4

# ❌ Bad: passing a model object (not serializable, and it freezes stale state)
send_welcome_email.delay(user)

# ✅ Good: pass the ID and re-query the current state inside the task's app_context
send_welcome_email.delay(user.id, idempotency_key=str(uuid4()))

There are 2 reasons. (1) Can't be serialized — a task is encoded as JSON, etc., and passes through the broker, so an ORM object can't ride along. (2) The state goes stale — even if you carry the object as of enqueue time, by the time the worker executes, the DB value may have changed. So pass only the ID and db.session.get(User, user_id) inside the task to re-fetch the "now" state. Thanks to FlaskTask stretching an app_context, this re-query writes naturally inside the task (§2.1's benefit pays off here).
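The serialization point is easy to verify with the standard library alone. In this minimal sketch a plain dataclass named User stands in for a SQLAlchemy model, and the {"args": [...]} shape is a simplification of a task message, not Celery's actual wire format.

```python
import json
from dataclasses import dataclass


@dataclass
class User:
    """Stand-in for an ORM model instance."""

    id: int
    email: str


user = User(id=7, email="a@example.com")

# Passing the object: the JSON encoder does not know how to encode it
try:
    json.dumps({"args": [user]})
    object_serializable = True
except TypeError:
    object_serializable = False

# Passing the ID: a plain integer serializes without any extra work
message = json.dumps({"args": [user.id]})
```

The ID also sidesteps the staleness problem, because the task looks up the row again at execution time.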

💡 The DB session is per task. Because FlaskTask.__call__ opens and closes the app_context per task, Flask-SQLAlchemy's db.session is also scoped per task. Make db.session.commit() (or rollback() on an exception) explicit at the task's end to prevent state bleed across sessions.

5.5 Where Failures Go: Dead-Letter and Redis's Data Loss

A task that exhausts max_retries ends in "failure." In production, don't swallow failures; create a destination — in Celery, the standard design is an on_failure hook, or evacuating to a separate "dead-letter" table / queue on failure.

@shared_task(bind=True, max_retries=5, autoretry_for=(RequestException,))
def deliver_webhook(self, endpoint_id: int, payload_id: int) -> None:
    try:
        ...  # Task body
    except RequestException:
        if self.request.retries >= self.max_retries:
            record_dead_letter(endpoint_id, payload_id)  # Set it aside and make it visible
        raise  # Leave the retry decision to autoretry

And the honest caution touched on at the start — Redis is "susceptible to data loss" as a broker. If Redis restarts or fails over, depending on the persistence settings, unprocessed tasks on the queue may vanish.

⚠️ For "tasks that must never be lost," consider RabbitMQ. For processing like billing or legal notifications where not even one drop is tolerated, choosing RabbitMQ — with its solid delivery guarantee — as the broker is the honest move. If you use Redis, enable AOF persistence and create a state where "even if it vanishes, it can be recovered by re-enqueue" with acks_late + idempotency. Either way, design on the premise that "because it's Redis, it sometimes vanishes." This is a trade-off I actually kept in mind in my own production, without idealizing Celery.


6. Periodic Execution: Celery Beat

Cron-like periodic jobs (nightly aggregation, reminders, expiry processing) are enqueued by Celery Beat. Beat is a "scheduler" that merely loads task messages onto the queue at the appointed times — the worker executes them.

# Add beat_schedule to the CELERY dict
from celery.schedules import crontab

CELERY = dict(
    broker_url="redis://localhost",
    result_backend="redis://localhost",
    beat_schedule={
        "expire-trials-nightly": {
            "task": "myapp.tasks.expire_trials",
            "schedule": crontab(hour=3, minute=0),  # Every day at 03:00
        },
        "send-reminders-hourly": {
            "task": "myapp.tasks.send_due_reminders",
            "schedule": crontab(minute=0),  # Every hour, on the hour
        },
    },
)

Startup is a separate process.

celery -A make_celery beat --loglevel INFO

⚠️ Run only 1 Beat process. If you start 2 Beats meaning to make them redundant, the same schedule is enqueued twice and the job runs twice. Run Beat as a single scheduler, and on a container platform fix it to "beat = 1 replica." And per §5.1, make the tasks Beat enqueues idempotent too — make "safe even if double-enqueued, safe even if double-executed" a double wall.
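One way to make a scheduled task safe against double enqueue is to derive its idempotency key from the schedule slot instead of a random value. A minimal sketch, assuming an hourly job: slot_key and run_once_per_slot are hypothetical, and the in-memory set stands in for what would be a ledger table with a unique constraint in a real system.

```python
from datetime import datetime, timezone

_done: set[str] = set()  # stand-in for a ledger table with a unique key


def slot_key(task_name: str, scheduled_for: datetime) -> str:
    """Same task + same hour (UTC) always yields the same key."""
    slot = scheduled_for.astimezone(timezone.utc).replace(
        minute=0, second=0, microsecond=0
    )
    return f"{task_name}:{slot.isoformat()}"


def run_once_per_slot(task_name: str, scheduled_for: datetime) -> bool:
    """Return True if this slot was processed now, False if already done."""
    key = slot_key(task_name, scheduled_for)
    if key in _done:
        return False
    _done.add(key)
    return True


name = "myapp.tasks.send_due_reminders"
first = run_once_per_slot(name, datetime(2024, 1, 1, 3, 0, 0, tzinfo=timezone.utc))
# A second scheduler enqueues the same slot a few seconds later
second = run_once_per_slot(name, datetime(2024, 1, 1, 3, 0, 5, tzinfo=timezone.utc))
key = slot_key(name, datetime(2024, 1, 1, 3, 0, 5, tzinfo=timezone.utc))
```

Because both enqueues map to the same key, the second one is rejected even though it arrived as a separate message.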


7. Deployment and Observability: Separate Containers, Monitoring, Correlated Logs

7.1 The Worker Is a Separate Container: Docker Compose

Putting the great principle of §1.2 into a topology looks like this. Separate web, worker, beat, and redis, and the standard is to start the same image with different commands (the image is shared since it's the same codebase).

# compose.yaml
services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped

  web:
    build: .
    command: gunicorn -w 4 -b 0.0.0.0:8000 "make_celery:flask_app"
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
      FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
    depends_on: [redis]
    ports: ["8000:8000"]

  worker:
    build: .
    command: celery -A make_celery worker --loglevel INFO --concurrency 4 --max-tasks-per-child 100
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
      FLASK_CELERY__RESULT_BACKEND: redis://redis:6379/1
    depends_on: [redis]

  beat:
    build: .
    command: celery -A make_celery beat --loglevel INFO
    environment:
      FLASK_CELERY__BROKER_URL: redis://redis:6379/0
    depends_on: [redis]

web is Gunicorn (reusing the design from the production deployment guide as-is), worker and beat are Celery, and redis is the broker. On ECS (Fargate), this corresponds to 3 services: "web service," "worker service," and "beat service (1 replica)."

7.2 Tuning the Worker

Flag / setting | Role | Guidance
--concurrency N | Parallelism within the worker process | CPU-bound → ≈ vCPU count; IO-bound → more
worker_prefetch_multiplier | Number of tasks pulled at once | Long tasks → 1 (prevent hoarding and unfairness)
--max-tasks-per-child N | Restart the child process after this many tasks | Leak countermeasure. Effective for heavy deps like PDF/image processing

--max-tasks-per-child is a realistic defense against the problem of workers ballooning due to memory leaks in third-party libraries. It recreates the child process every N tasks and periodically frees memory.

7.3 Monitoring: Flower and Correlated Logs

Observability is the lifeline of production operations. Celery has a web dashboard called Flower, where you can see task success/failure, execution time, queue length, and worker activity in real time.

pip install flower
celery -A make_celery flower --port=5555

In addition, extend the structured logging + request-ID correlation established in the error-handling / observability guide to the task side too. Always put the task_id in task logs — that way you can trace "user's request → enqueued task → execution in the worker" as a single line.

from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True)
def generate_invoice_pdf(self, invoice_id: int) -> None:
    # self.request.id is the task_id. Always include it in structured logs
    logger.info("invoice pdf start", extra={"task_id": self.request.id, "invoice_id": invoice_id})
    ...
    logger.info("invoice pdf done", extra={"task_id": self.request.id, "invoice_id": invoice_id})
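Fields passed through extra only show up in the output if the formatter emits them. Here is a minimal sketch of a JSON formatter using only the standard library; JsonFormatter, the logger name "tasks_demo", and the sample IDs are hypothetical, and a real setup would more likely rely on an existing structured-logging library.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log record, including correlation fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            # Values passed via extra= become attributes on the record
            "task_id": getattr(record, "task_id", None),
            "invoice_id": getattr(record, "invoice_id", None),
        }
        return json.dumps(payload)


stream = io.StringIO()  # captured in memory so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

demo_logger = logging.getLogger("tasks_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output out of the root logger

demo_logger.info("invoice pdf start", extra={"task_id": "abc-123", "invoice_id": 42})
line = json.loads(stream.getvalue())
```

Once task_id is a real field in each log line, a log search on that one value returns every step of a single task's execution.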

7.4 Graceful Shutdown (Warm Shutdown)

When you bring a worker down on each deploy, you must not kill an in-progress task midway. Send SIGTERM to a Celery worker and it does a warm shutdown — it stops accepting new tasks and finishes the running tasks to the end before terminating the process.

In an ECS rolling deploy, SIGTERM is sent at container stop, and within the grace period of stopTimeout (default 30 seconds, extend if needed) the worker completes its warm shutdown. This applies to the worker side the same idea as the Gunicorn graceful shutdown covered in the production deployment guide: "drain what's in progress before going down." If you have long-running tasks, set stopTimeout longer than the longest task duration.
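The "finish what is running, take nothing new" behavior can be sketched in a few lines. WarmShutdownWorker is a hypothetical toy loop for illustration, not Celery's worker; the stop request is triggered from a callback here instead of a real signal so the example stays deterministic.

```python
class WarmShutdownWorker:
    """Toy worker loop that drains the current job before stopping."""

    def __init__(self, jobs: list[str]) -> None:
        self.pending = list(jobs)
        self.finished: list[str] = []
        self.stopping = False

    def request_stop(self, signum=None, frame=None) -> None:
        """Signal-handler-shaped: only sets a flag, never interrupts a job."""
        self.stopping = True

    def run(self, on_job_start=None) -> None:
        while self.pending and not self.stopping:
            job = self.pending.pop(0)
            if on_job_start is not None:
                on_job_start(job)  # hook used below to simulate SIGTERM mid-job
            self.finished.append(job)  # the job completes even if stop arrived


worker = WarmShutdownWorker(["a", "b", "c"])
# In a real process the handler would be registered with:
#   signal.signal(signal.SIGTERM, worker.request_stop)

# Simulate SIGTERM arriving while job "b" is in progress
worker.run(on_job_start=lambda job: worker.request_stop() if job == "b" else None)
```

Job "b" still finishes and job "c" stays in the queue for another worker, which is the outcome a warm shutdown aims for.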


8. Honest Alternatives: When Celery Is Overkill

Celery is powerful, but not all async processing needs Celery. The best judgment for avoiding technical debt is often "don't use it."

Option | Suited situation | Trade-off
Celery + Redis/RabbitMQ | Full-scale operation needing retries, scheduling, queue routing, large volumes | Many components (broker + worker + beat + monitoring)
RQ (Redis Queue) | Simple background processing. Want to complete with Redis alone | Thinner features than Celery (weak on advanced routing, etc.)
Managed queue (SQS + Lambda) | Want to lower operational load with serverless. AWS premise | Vendor lock-in. Runs outside the Flask process

  • If the requirement is roughly "send one email asynchronously," Celery is overkill. A lightweight library like RQ is often enough, and you needn't shoulder the whole operational set of broker + worker + beat + Flower.
  • If you want to lean serverless on an AWS premise, SQS + Lambda is an honest choice too. You don't self-operate worker containers, and the design of idempotent processing can be left to the SQS/Lambda/EventBridge idempotent async-processing guide. The core of idempotency is unchanged whether Celery or SQS.
  • If you want to dig deeper into Celery's design, I've compiled general Celery/Redis production patterns not confined to Flask in the Celery × Redis production job-queue guide.

The criterion is simple — "is any of retries / scheduling / queue separation / monitoring genuinely needed?" If yes, Celery; if not, RQ or a managed queue. It's not too late to migrate to Celery after the requirements grow (YAGNI).


Summary: A Celery Production Checklist

The essentials of running Flask × Celery × Redis at production quality boil down to 3 points — context integration, idempotency, and resilience. Flask's official celery_init_app supports the first; the idempotency key and acks_late/retry design support the latter 2. Finally, let me close with a pre-production confirmation list.

# | Check item | Basis (this article)
1 | Is the worker started as a process / container separate from Gunicorn? | §1.2 / §7.1
2 | Does celery_init_app's FlaskTask push an app_context, with db/current_app usable inside the task? | §2.1
3 | Is the config consolidated in Flask config's CELERY key, with connection strings injected via env vars? | §2.2
4 | Do task definitions use @shared_task, not @celery_app.task? | §3.1
5 | Do you enqueue via .delay()/.apply_async(), retrieve via AsyncResult, and set task_ignore_result when no result is needed? | §3.2–3.5
6 | On the at-least-once premise, are all tasks designed idempotently (idempotency key / upsert / dedup)? | §5.1
7 | Do transient failures use autoretry_for + retry_backoff + retry_jitter + max_retries, and do permanent errors fail immediately? | §5.2
8 | Do tasks that must not be lost use acks_late + task_reject_on_worker_lost (idempotency is a prerequisite)? | §5.3
9 | Is the Redis broker's visibility timeout set longer than the longest task time? | §5.3
10 | Do task arguments pass an ID (int) and re-query inside the task (model objects forbidden)? | §5.4
11 | If storing results, is a TTL set via result_expires, and are failures evacuated to a dead-letter? | §3.5 / §5.5
12 | Do you understand Redis's data loss, and is a critical task recoverable via RabbitMQ / AOF persistence + idempotency? | §5.5
13 | Is Beat 1 replica, and are the tasks Beat enqueues idempotent? | §6
14 | Is there a leak countermeasure via --max-tasks-per-child, and hoarding prevention for long tasks via prefetch=1? | §7.2
15 | Are you placing task_id correlation in structured logs along with Flower monitoring? | §7.3
16 | On deploy, is there a warm shutdown via SIGTERM, with stopTimeout longer than the longest task time? | §7.4
17 | Have you considered whether Celery is overkill in the first place (would RQ / SQS+Lambda suffice)? | §8

The real difficulty of a job queue lies not in how to use Celery, but in assembling a design where data doesn't get corrupted on the premise of the reality that "tasks run twice, workers go down, and Redis sometimes loses data." The reason I could protect 0 double charges in my B2B SaaS was not that Celery was excellent, but that I made a unique constraint — the idempotency key — guarantee "safe to re-execute." On the foundation of Flask context integration, you stack idempotency and resilience — assemble it in this order, and the job queue becomes a production asset.

For the overall Flask production design, start from the Flask production-operations guide; for the context mechanism, the context deep-dive; for deployment, the production deployment guide.


友田 陽大

Developer of a METI Minister's Award–winning product. With TypeScript + Python + AWS, I deliver SaaS, industry DX, and production-grade generative AI (RAG) end to end — from requirements to infrastructure and operations — single-handedly.
