# Flask × Celery × Redis: Running Background Tasks and Job Queues at Production Quality (Flask Context Integration, Idempotency, Resilience)

> A practical guide to designing async tasks / job queues at production quality with Flask Celery Redis. From the official celery_init_app and FlaskTask app-context integration, shared_task, .delay/AsyncResult, idempotency that withstands at-least-once delivery, the resilience of autoretry/acks_late/visibility timeout, Celery Beat's periodic execution, to a Docker setup that runs workers in a separate container plus Flower monitoring and task_id–correlated logs — explained with real code faithful to the latest Flask official docs and Celery documentation.

- Published: 2026-06-26
- Author: 友田 陽大
- Tags: Python, Flask, Celery, Redis, Async processing, Production operations, Backend
- URL: https://tomodahinata.com/en/blog/flask-celery-redis-background-tasks-production-guide
- Category: Flask in production
- Pillar guide: https://tomodahinata.com/en/blog/flask-production-guide

## Key points

- To keep within the request/response time budget, offload email, PDF, export, webhooks, and periodic jobs to a separate process (worker). The architecture is web process + broker (Redis) + worker + result backend, with the worker in a container separate from Gunicorn
- Flask's official celery_init_app pushes an app context via FlaskTask (a Task subclass), which is why current_app, db, and g are usable inside a task. Read config from the CELERY key of the Flask config, and use @shared_task rather than @celery_app.task
- Celery delivers at least once. A task can run twice, so design it to be idempotent with an idempotency key, upsert, and de-duplication. This is what separates production quality from a demo
- Resilience comes from autoretry_for + retry_backoff + retry_jitter + max_retries; crash safety from acks_late + task_reject_on_worker_lost; with Redis it's backed by the visibility timeout. Pass an ID (int) rather than a SQLAlchemy object as the argument, and re-query inside the task
- Production operations: worker in a separate container + concurrency/prefetch tuning + --max-tasks-per-child for leak prevention, observability via Flower and task_id–correlated logs, and a warm shutdown that drains in-flight tasks. If Celery is overkill, honest alternatives like RQ or SQS/Lambda are presented too

---

## **Introduction: Trying to Do Everything Inside the Request Breaks Production**

A Flask view function runs within the **limited time budget** between receiving an HTTP request and returning a response. Yet a real app has plenty of work that won't fit in that budget: generating invoice PDFs, sending confirmation emails, exporting CSVs, dispatching webhooks to external APIs, generating image thumbnails, nightly aggregation batches. Run these synchronously inside a view, and the user stares at a spinner for seconds to tens of seconds, each such request ties up a Gunicorn worker for its full duration, and production quietly clogs up with timeouts and retry storms.

The solution is simple, but it bites back if you get the design wrong: **offload heavy work outside the request, to a separate process.** This is a background task (job queue). The de facto standard in Python/Flask is **Celery**, and the message delivery underneath it is handled by **Redis** (or RabbitMQ).

This article, as a spoke of the [Flask production-operations guide (pillar)](/blog/flask-production-guide), runs through the design of assembling background tasks at production quality with **Flask 3.1.x + Celery 5.x + Redis**, using real code faithful to Flask's official Celery-integration pattern and the Celery official documentation. There are 3 central themes — **(1) Flask context integration** (creating a state where `db` and `current_app` are usable inside a task), **(2) idempotency** (the premise that a task can run twice), and **(3) resilience** (retries, crash safety, graceful shutdown). Miss these 3 and the job queue becomes "a black box that looks like it's working but occasionally corrupts data."

The author **designed and implemented the backend of a B2B SaaS that won the Minister of Economy, Trade and Industry Award in Python / Flask / SQLAlchemy / PostgreSQL, and ran its Celery workers in production on ECS (Fargate)**. Handling billing, notifications, and exports, work that "must not fail, but can't be done inside the request," without double charges or dropped notifications took precisely this discipline of idempotency and resilience. This article puts the design judgments from that field experience into words and checks them against the official specifications.

> 💡 **The versions covered in this article**: it assumes **Celery 5.x (the latest at the time of writing is 5.6.3)** and **Flask 3.1.x**. Celery is installed separately via `pip install celery` (it is not a dependency of Flask). Redis is the primary broker in this article, but where resistance to data loss is required, RabbitMQ is treated honestly as a candidate too.

---

## **1. Why Background Tasks: The Components and the Core Principle of a "Separate Process"**

### 1.1 Work That Should Be Offloaded Outside the Request

First, let me make the criterion clear. The only work you may run synchronously inside a view is work that **fits within the user's wait time and that, even if it fails, can be returned to the user on the spot**. Processing like the following is offloaded to the background as a rule.

| Kind | Examples | Why offload |
|---|---|---|
| Notification | Confirmation email, Slack notification, push notification | External SMTP/API is slow / goes down. Don't make the response wait |
| Generation | Invoice PDF, reports, thumbnails, video transcoding | CPU/IO heavy, takes seconds to minutes |
| Export | CSV/Excel generation of large data, ZIP packaging | Grows in proportion to row count, exceeds the request budget |
| Integration | Webhook dispatch, third-party sync | The other side is delayed / fails. Retries are needed |
| Periodic | Nightly aggregation, reminders, expiry processing | No request exists in the first place (cron-like) |

### 1.2 The 4 Components

A job queue with Celery is made up of the following 4 actors.

| Role | Concrete form | Responsibility |
|---|---|---|
| **Web process** | Gunicorn + Flask `app` | "Enqueues" tasks (`.delay()`). Does not wait for the result |
| **Broker (required)** | **Redis** or RabbitMQ | The queue that stores and delivers task messages |
| **Worker process** | `celery -A make_celery worker` | Pulls tasks from the queue and "executes" them |
| **Result backend (optional)** | Redis / RPC / DB, etc. | Stores task results and states (if needed) |

The Celery docs state clearly that Celery requires a **message broker** (RabbitMQ or Redis), while the **result backend** is optional. For Redis, they note that it is feature-complete but **more susceptible to data loss**, in the event of abrupt termination or power failures, than RabbitMQ. This matters for the resilience design discussed later (§5.5).

> ⚠️ **The most important principle: the worker is a process / container separate from Gunicorn.** This is where beginners stumble most. `celery worker` is a process independent of the web server (Gunicorn), and you **start, scale, and deploy it separately**. No matter how much you scale the web tier, tasks won't get processed any faster, and vice versa. In a container setup you separate the "web container," "worker container," and "redis container" (§7 shows a Docker Compose file). This separation follows the same line of thought as the "separation of the WSGI app and the WSGI server" covered in the [production deployment guide](/blog/flask-deployment-gunicorn-docker-production-wsgi-guide): **split processes by role**.

---

## **2. Flask's Official Celery Integration: A Deep Dive into `celery_init_app`**

This is the technical core of the article. The integration pattern in the official Flask docs (`patterns/celery/` on flask.palletsprojects.com) aims to **"automatically push a Flask application context when running a task."** Why is that needed? Because inside a task you want to use `db.session`, `current_app.config`, and `g`. These Flask features only work while an **application context** is active (for details of this mechanism, see the [context deep-dive](/blog/flask-application-request-context-g-current-app-guide)). A worker running outside any request has no such context by default.

### 2.1 `FlaskTask`: A Task Subclass That Pushes an App Context

This is the official integration function, reproduced with the same structure as the docs and without a single added line.

```python
# myapp/celery_app.py
from celery import Celery, Task
from flask import Flask


def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app
```

The official docs explain it this way: **this Task subclass automatically runs the task function with a Flask application context active, which makes services like database connections available inside the task.** Let me pin down the meaning line by line.

- `class FlaskTask(Task)` … inherits Celery's `Task` and overrides `__call__`. **Every time a task is called, it wraps it in `with app.app_context():` before calling the body `self.run(...)`**. This makes the inside of the task "with an application context," so `db.session` / `current_app` / `g` are usable.
- `Celery(app.name, task_cls=FlaskTask)` … sets this `FlaskTask` as the default class for all tasks.
- `config_from_object(app.config["CELERY"])` … reads Celery's config from the **`CELERY` key of the Flask config**. All config lives on the Flask side, so there is no second source of settings to keep in sync.
- `celery_app.set_default()` … sets this Celery app as the "current app." This is the key by which the `@shared_task` described later finds this instance during a request.
- `app.extensions["celery"] = celery_app` … stores it in the extension registry. The worker's entry point (§4) retrieves the Celery app from here.
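To see why the context lives exactly as long as one task call, here is a Celery-free sketch of the same shape. `BaseTask` stands in for `celery.Task` and `fake_app_context()` stands in for `app.app_context()`; both, along with the `events` list, are hypothetical scaffolding for illustration, not Celery or Flask APIs.

```python
from contextlib import contextmanager
from typing import Iterator

events: list[str] = []  # records what happens, in order


@contextmanager
def fake_app_context() -> Iterator[None]:
    """Stand-in for Flask's app.app_context(): push on enter, pop on exit."""
    events.append("push")
    try:
        yield
    finally:
        events.append("pop")  # runs even if the task body raises


class BaseTask:
    """Stand-in for celery.Task: calling the task object delegates to run()."""

    def run(self, *args: object, **kwargs: object) -> object:
        raise NotImplementedError

    def __call__(self, *args: object, **kwargs: object) -> object:
        return self.run(*args, **kwargs)


class ContextTask(BaseTask):
    """Same shape as FlaskTask: each call gets its own fresh context."""

    def __call__(self, *args: object, **kwargs: object) -> object:
        with fake_app_context():
            return self.run(*args, **kwargs)


class AddTogether(ContextTask):
    def run(self, a: int, b: int) -> int:  # type: ignore[override]
        events.append("run")
        return a + b


task = AddTogether()
results = [task(2, 3), task(4, 5)]
print(results)  # [5, 9]
print(events)   # ['push', 'run', 'pop', 'push', 'run', 'pop']
```

The `finally` mirrors what the `with` block buys you in `FlaskTask`: teardown runs even when a task raises, so one failed task cannot leave its context behind for the next.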

### 2.2 Wiring Into the Application Factory

`celery_init_app` is called inside the `create_app` factory established in the [large-app structure guide](/blog/flask-application-factory-blueprints-large-app-structure-guide). The official factory wiring follows.

```python
# myapp/__init__.py
from flask import Flask

from myapp.celery_app import celery_init_app


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app
```

There are 2 points. **(1)** Celery config is bundled as a dict under the single key `CELERY`. `broker_url` is the broker (Redis), `result_backend` is where results are stored, and `task_ignore_result=True` sets the default to "don't store results for tasks that don't need them" (§3.5 discusses the trade-off). **(2)** With `app.config.from_prefixed_env()`, you can override these values from `FLASK_`-prefixed environment variables, with `__` descending into nested dicts. The part after each `__` is used as the key **as-is, with its case preserved**, so the variable must match Celery's lowercase setting name: `FLASK_CELERY__broker_url`, not `FLASK_CELERY__BROKER_URL` (the uppercase form lands in a separate `BROKER_URL` key instead of overriding `broker_url`). This lets you **inject connection strings via environment variables in production rather than baking them into code**. The handling of secrets follows the same policy as the pillar and the [production deployment guide](/blog/flask-deployment-gunicorn-docker-production-wsgi-guide) (`FLASK_`-prefixed environment variables).
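To make the nested-key rule concrete, here is a simplified, stdlib-only imitation of what `from_prefixed_env()` does: strip the prefix, try to parse the value as JSON, and split the rest on `__`. It is a sketch of Flask's documented behavior, not Flask's implementation, and `load_prefixed_env` is a hypothetical name. Note where the uppercase spelling ends up.

```python
import json


def load_prefixed_env(config: dict, environ: dict[str, str], prefix: str = "FLASK") -> dict:
    """Simplified imitation of Flask's Config.from_prefixed_env (not the real code)."""
    prefix = f"{prefix}_"
    for key in sorted(environ):
        if not key.startswith(prefix):
            continue
        raw = environ[key]
        try:
            value = json.loads(raw)  # "5" -> 5, "true" -> True
        except json.JSONDecodeError:
            value = raw  # plain strings such as URLs stay strings
        path = key[len(prefix):].split("__")
        current = config
        for part in path[:-1]:
            current = current.setdefault(part, {})  # descend into nested dicts
        current[path[-1]] = value  # the key's case is kept exactly as written
    return config


config = {"CELERY": {"broker_url": "redis://localhost"}}
load_prefixed_env(config, {
    "FLASK_CELERY__broker_url": "redis://redis:6379/0",  # overrides Celery's setting
    "FLASK_CELERY__BROKER_URL": "redis://other:6379/0",  # lands in a *different* key
})
print(config["CELERY"])
```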

> 💡 **Why `with app.app_context()` is needed "for every task."** Unlike a request handler, a worker is a long-lived process, and one worker processes thousands of tasks in sequence. The context should be opened and closed per task, and `FlaskTask.__call__`'s `with` block creates exactly that scope. This way `db.session` is correctly established and torn down per task, preventing session leaks and state bleed into other tasks.

---

## **3. Defining, Invoking, and Retrieving Results of Tasks**

### 3.1 Use `@shared_task` (Not `@celery_app.task`)

In the factory pattern, the official docs recommend defining tasks with **`@shared_task`**. Their reasoning: **`@celery_app.task` requires access to the `celery_app` object, which isn't available in the factory pattern, and it ties the decorated tasks to specific Flask and Celery app instances. Celery's `@shared_task` instead creates task objects that access whatever the "current app" is.**

```python
# myapp/tasks.py
from celery import shared_task


@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b
```

Because `@shared_task` doesn't import a specific `celery_app`, it creates no circular imports and coexists cleanly with Blueprints and module splitting. Thanks to the `set_default()` that `celery_init_app` called, `add_together.delay(...)` finds the "current Celery app" at execution time and enqueues the task there.

### 3.2 Enqueue a Task: `.delay()`

Enqueuing from a view is `.delay()`. The standard is to **not wait for the result and return the `task_id` immediately**.

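```python
# myapp/views.py
from flask import Blueprint, request

from myapp.tasks import add_together

# A Blueprint instead of a global `app` (factory pattern); register it in create_app()
bp = Blueprint("tasks", __name__)


@bp.post("/add")
def start_add() -> dict[str, object]:
    data = request.get_json()
    result = add_together.delay(int(data["a"]), int(data["b"]))
    return {"result_id": result.id}  # return at once; don't wait for the result
```

`.delay(a, b)` loads the task message onto the broker (Redis) and returns immediately. You return a `result_id` to the user and have them query for the result later. This "separation of enqueue and retrieval" is the essence of async.

### 3.3 Retrieve the Result: `AsyncResult`

The result or state is fetched from the `result_id` (task ID) via `AsyncResult`. The polling endpoint below follows the official example, adjusted so that a failed task still produces a JSON-serializable response.

```python
# myapp/views.py (same module, same `bp`)
from celery.result import AsyncResult


@bp.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)  # looks up state/result in the result backend
    ready = result.ready()
    failed = ready and not result.successful()
    return {
        "ready": ready,
        "successful": result.successful(),
        # On failure, result.result holds the exception, which isn't JSON-serializable
        "value": result.result if ready and not failed else None,
        "error": str(result.result) if failed else None,
    }
```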

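The front end holds the `result_id` and polls `/result/<id>`, reading `value` once `ready` becomes `true`. Note that Celery reports an unknown (or already expired) task ID as `PENDING` rather than raising an error, so `ready` simply stays `false` for a bad ID. How to control enqueue time and delay with `apply_async` is covered in the next section.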
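On the client side, that polling loop can be sketched as follows. `poll_result` is a hypothetical helper, and `fetch` is assumed to be any callable that performs the GET to `/result/<id>` and returns the decoded JSON; injecting it (and `sleep`) keeps the sketch testable without HTTP.

```python
import time
from typing import Callable


def poll_result(
    fetch: Callable[[], dict],
    timeout: float = 60.0,
    first_delay: float = 0.5,
    max_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the endpoint reports ready, backing off between requests."""
    deadline = time.monotonic() + timeout
    delay = first_delay
    while True:
        body = fetch()
        if body["ready"]:
            return body
        if time.monotonic() + delay > deadline:
            raise TimeoutError("task did not finish in time")
        sleep(delay)
        delay = min(delay * 2, max_delay)  # don't hammer the web process
```

With `requests`, `fetch` could be `lambda: requests.get(f"{base_url}/result/{result_id}", timeout=5).json()`, where `base_url` and `result_id` come from your client code. Doubling the delay up to a cap keeps a slow task from turning into a stream of polling requests against Gunicorn.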

### 3.4 `.apply_async()`: Delay, Scheduling, Queue Routing

`.delay()` is a thin wrapper over `.apply_async()`. When you need finer control, use `apply_async`.

```python
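from datetime import datetime, timedelta, timezone

from myapp.tasks import add_together  # generate_invoice_pdf is the task shown in §7.3

# Run 30 seconds from now (for tuning retry intervals or debouncing)
add_together.apply_async(args=[2, 3], countdown=30)

# Run at a specific time (scheduled delivery / scheduled sends)
add_together.apply_async(args=[2, 3], eta=datetime.now(timezone.utc) + timedelta(hours=1))

# Route to a separate queue (heavy tasks and light notifications on different workers);
# invoice_id comes from the calling code
generate_invoice_pdf.apply_async(args=[invoice_id], queue="pdf")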
```

With `countdown`/`eta` you get delay/scheduling, and with `queue` you get queue routing. Splitting queues prevents the accident where "heavy processing like PDF generation clogs light processing like email sending" (start a worker with `-Q pdf` to dedicate it).

### 3.5 The Trade-Off of `task_ignore_result`

`task_ignore_result=True` (the factory default) **skips writes to the result backend**. It is ideal for "fire-and-forget"–type tasks like email sending — no wasted results piling up in Redis, lower cost. On the other hand, a task that wants to return a result via `/result/<id>` is individually enabled with `@shared_task(ignore_result=False)`.

| Setting | Stores result | Suited task |
|---|---|---|
| `task_ignore_result=True` (default) | No | Notification, webhook, fire-and-forget |
| `@shared_task(ignore_result=False)` | Yes | Export, returning a computed result, showing progress |

> ⚠️ When you do store results, **the result backend keeps every result until it expires**. With Redis as the backend, that data lives in memory, so set `result_expires` (default: 1 day) to the shortest TTL your polling clients actually need.
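A minimal sketch of the factory's `CELERY` dict with an explicit TTL. The one-hour value is an assumption; pick whatever matches how long clients actually poll for results.

```python
from datetime import timedelta

CELERY = dict(
    broker_url="redis://localhost",
    result_backend="redis://localhost",
    task_ignore_result=True,
    # Keep stored results for 1 hour instead of the 1-day default.
    # The setting accepts seconds or a timedelta.
    result_expires=timedelta(hours=1),
)
```

With the Redis backend, results expire natively through key TTLs. With the database or filesystem backends, expiry relies on Celery's built-in `celery.backend_cleanup` periodic task, which only runs while Beat is running.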

---

## **4. The Worker's Entry Point and Startup**

The worker, separate from the Flask app, retrieves the Celery app from a thin entry point called `make_celery.py` and starts. This is the official verbatim structure.

```python
# make_celery.py
from myapp import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]
```

It calls `create_app()` to assemble the Flask app and exposes the Celery app — stored in `app.extensions["celery"]` — as `celery_app`. The `-A` (app) of `celery -A make_celery` finds this `celery_app`.

```bash
# Start the worker (INFO logging)
celery -A make_celery worker --loglevel INFO

# Start the periodic scheduler (Beat) (§6)
celery -A make_celery beat --loglevel INFO
```

`worker` is the process that executes tasks, and `beat` is the scheduler that enqueues tasks cron-style. **The two are separate processes**, and in production you separate their containers too (§6, §7).

---

## **5. Production-Quality Resilience: Where a Job Queue Becomes an Asset or a Liability**

From here is the part that divides a demo from production. A job queue can be written in 30 minutes if "just running" is enough, but to **run it without breaking in production** you must correctly assemble idempotency, retries, crash safety, and argument design.

### 5.1 Idempotency: The Premise That a Task Can Run Twice

The most important premise. The delivery guarantee of Celery (and of message queues in general) is **at-least-once**. In other words, **the same task running 2 or more times is a normal occurrence, not an edge case**. With `acks_late` (§5.3), for example, if a worker crashes right after completing a task but before sending the ack, the broker treats it as incomplete and redelivers it; with Redis, a task left unacknowledged past the visibility timeout is redelivered as well.

Therefore, **a task must be designed to be idempotent (the same result no matter how many times it runs)**. This is exactly the same principle covered in the [SQS/Lambda idempotent-processing guide](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide); the essence is invariant even when the queue type changes.

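```python
# myapp/tasks.py
from celery import shared_task
from sqlalchemy.exc import IntegrityError

from myapp import db
from myapp.mail import send_email  # hypothetical mail helper
from myapp.models import EmailLog, User


@shared_task(ignore_result=True)
def send_welcome_email(user_id: int, idempotency_key: str) -> None:
    # Has this key already been recorded? This read alone is not atomic;
    # the unique constraint on EmailLog.idempotency_key is the real guard.
    existing = db.session.execute(
        db.select(EmailLog).filter_by(idempotency_key=idempotency_key)
    ).scalar_one_or_none()
    if existing is not None:
        return  # duplicate run: do nothing (idempotent)

    user = db.session.get(User, user_id)
    if user is None:
        return  # the user no longer exists: finish quietly

    send_email(to=user.email, template="welcome")

    db.session.add(EmailLog(idempotency_key=idempotency_key, user_id=user_id))
    try:
        db.session.commit()  # record the send; later runs stop at the check above
    except IntegrityError:
        db.session.rollback()  # a concurrent run recorded the same key first
```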

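One window remains and is worth stating plainly: if the worker dies after `send_email` but before `commit`, the redelivered task sends the email again. For side effects where even that is unacceptable, also pass the same key to the external API, if it supports idempotency keys. With that caveat, there are 3 standard ways to make a task idempotent.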

| Technique | How | Suited situation |
|---|---|---|
| **Idempotency key + ledger** | Record "executed" with a unique key, return early on duplicates | Side effects that are bad to do twice — email, notifications, billing |
| **upsert** | `INSERT ... ON CONFLICT DO UPDATE` so insert and update give the same result | Record creation / update |
| **De-duplication (dedup)** | Lock on the task ID at enqueue / suppress an existing job | Cases where the same trigger fires repeatedly in a short window |
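A minimal sketch of the upsert technique, using the stdlib `sqlite3` module so it runs anywhere (SQLite 3.24+ accepts the same `ON CONFLICT ... DO UPDATE` form as PostgreSQL). The `monthly_usage` table and `record_monthly_usage` function are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE monthly_usage ("
    " account_id INTEGER NOT NULL,"
    " month TEXT NOT NULL,"
    " total INTEGER NOT NULL,"
    " PRIMARY KEY (account_id, month))"
)


def record_monthly_usage(account_id: int, month: str, total: int) -> None:
    # Task body: write the *computed* total, not an increment, so reruns converge.
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO monthly_usage (account_id, month, total) VALUES (?, ?, ?) "
            "ON CONFLICT (account_id, month) DO UPDATE SET total = excluded.total",
            (account_id, month, total),
        )


# The same message delivered twice leaves exactly one row with the same value.
record_monthly_usage(42, "2026-06", 1200)
record_monthly_usage(42, "2026-06", 1200)
print(conn.execute("SELECT * FROM monthly_usage").fetchall())  # [(42, '2026-06', 1200)]
```

The design choice that makes it idempotent is `SET total = excluded.total`: an increment such as `total = total + excluded.total` would double-count on redelivery even though it is still an upsert.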

> 💡 **Zero double charges is something idempotency guarantees**. In my B2B SaaS, billing and notifications, the kind of processing where "doing it twice is an incident," were handled by Celery. **The idempotency key was recorded in a ledger table with a unique constraint, so a re-execution was structurally rejected once the first `commit` had landed.** That design is why, even when a worker received a redelivery, no double charge occurred in production. "Safe to retry" is not a prayer; it is guaranteed by the table's unique constraint.
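For the third technique in the table, de-duplication at enqueue time, here is a minimal sketch. `InMemoryLockStore` is a single-process stand-in for Redis `SET key value NX EX ttl`, and `enqueue_once` plus the task named in its comment are hypothetical; the injectable clock exists only to make the TTL testable.

```python
import time
from typing import Callable


class InMemoryLockStore:
    """Stand-in for Redis `SET key value NX EX ttl` (single process only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expires_at: dict[str, float] = {}

    def set_nx_ex(self, key: str, ttl_seconds: float) -> bool:
        now = self._clock()
        expires = self._expires_at.get(key)
        if expires is not None and expires > now:
            return False  # key still held: this trigger is a duplicate
        self._expires_at[key] = now + ttl_seconds
        return True


def enqueue_once(
    store: InMemoryLockStore,
    dedup_key: str,
    ttl_seconds: float,
    enqueue: Callable[[], object],
) -> bool:
    """Enqueue only if the same trigger hasn't fired within the TTL window."""
    if not store.set_nx_ex(f"dedup:{dedup_key}", ttl_seconds):
        return False
    enqueue()  # e.g. lambda: rebuild_search_index.delay(doc_id)  (hypothetical task)
    return True
```

With a shared Redis, `store.set_nx_ex(key, ttl)` becomes `bool(r.set(key, "1", nx=True, ex=ttl))` in redis-py, which returns `True` when it sets the key and `None` when the key already exists, and is atomic across all web processes. This only suppresses bursts of identical triggers; the task itself should still be idempotent, because a message can be redelivered after the lock has expired.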

### 5.2 Retries: `autoretry_for` + Backoff + Jitter

External dependencies (SMTP, third-party APIs) fail temporarily. When they do, the standard is to **automatically retry with exponential backoff + jitter**. Celery lets you declare the resilience patterns discussed in [retry, backoff, circuit breaker](/blog/retry-backoff-circuit-breaker-resilience-patterns-guide) directly as decorator options.

```python
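# myapp/tasks.py
import requests
from celery import shared_task
from requests.exceptions import RequestException

from myapp import db
from myapp.models import WebhookEndpoint, WebhookPayload


class PermanentWebhookError(Exception):
    """4xx from the receiver: retrying won't help, so it's not in autoretry_for."""


@shared_task(
    autoretry_for=(RequestException,),  # auto-retry on these exceptions
    retry_backoff=True,                 # exponential backoff (1, 2, 4, 8, 16... seconds)
    retry_backoff_max=600,              # cap any single delay at 600 seconds
    retry_jitter=True,                  # randomize delays to avoid a thundering herd
    max_retries=5,                      # give up after 5 retries
)
def deliver_webhook(endpoint_id: int, payload_id: int) -> None:
    endpoint = db.session.get(WebhookEndpoint, endpoint_id)
    payload = db.session.get(WebhookPayload, payload_id)
    resp = requests.post(endpoint.url, json=payload.body, timeout=10)
    if 400 <= resp.status_code < 500 and resp.status_code != 429:
        # Permanent client error (e.g. 404): fail now instead of retrying
        raise PermanentWebhookError(f"{resp.status_code} from {endpoint.url}")
    resp.raise_for_status()  # 5xx/429 -> HTTPError (a RequestException) -> autoretry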
```

| Parameter | Role | Recommendation |
|---|---|---|
| `autoretry_for` | Exception types that trigger a retry | **Transient failures only**. Don't include permanent errors like `ValueError` |
| `retry_backoff` | Exponential backoff | `True`. Don't hammer the other side |
| `retry_backoff_max` | Cap on any single backoff delay (seconds) | Default 600 |
| `retry_jitter` | Randomizes each delay | `True` (the default). Spreads out retries when many tasks fail at once (thundering herd) |
| `max_retries` | Maximum number of retries (total attempts = 1 + `max_retries`) | Keep it finite. Unbounded retries never stop consuming workers |
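To see what delays the decorator above actually produces, here is a small sketch of the rule Celery documents for `retry_backoff`/`retry_jitter`: with n retries already made, the next delay is `factor * 2**n` seconds (factor 1 when `retry_backoff=True`), capped at `retry_backoff_max`, and with jitter the real delay is a random value between 0 and that cap. It is a standalone illustration, not Celery's code; `backoff_delay` is a hypothetical name.

```python
import random
from typing import Optional


def backoff_delay(
    retries: int,
    factor: int = 1,
    maximum: int = 600,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> int:
    """Delay in seconds before the next retry; `retries` = retries already made."""
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: the computed value becomes an upper bound.
        countdown = (rng or random).randrange(countdown + 1)
    return max(0, min(maximum, countdown))


# Without jitter, max_retries=5 gives delays of 1, 2, 4, 8, 16 seconds.
print([backoff_delay(n, jitter=False) for n in range(5)])  # [1, 2, 4, 8, 16]
```

The sum of those five delays is only 31 seconds, so for an outage that lasts minutes you would raise `max_retries` (the cap of 600 keeps later delays bounded) rather than rely on the defaults.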

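> ⚠️ **Don't retry permanent errors**. If you include errors that fail no matter how many times you try, like validation failures (invalid input) or 404, in the retry targets, you keep burning workers for nothing. Limit `autoretry_for` to **transient failures only, such as network errors, timeouts, and 5xx**, and let permanent errors fail immediately (or go to the dead-letter destination discussed in §5.5). Keep in mind that the `HTTPError` raised by `requests`' `raise_for_status()` covers 4xx as well as 5xx and is a subclass of `RequestException`, so 4xx responses need explicit handling to stay out of the retries.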

### 5.3 Crash Safety: `acks_late` + `task_reject_on_worker_lost`

By default, Celery acks (acknowledges) a task **just before it starts executing it**. This means that if a worker crashes mid-task, that task is lost. For tasks that must not be lost, use `acks_late` to **ack only after the task completes**.

```python
# myapp/celeryconfig.py (these can also go inside the CELERY dict)
task_acks_late = True                 # ack after completion (not lost on a crash)
task_reject_on_worker_lost = True     # requeue the task if the worker process dies
worker_prefetch_multiplier = 1        # reserve one task at a time; don't hoard long tasks
```

The combination of `acks_late=True` and `task_reject_on_worker_lost=True` means that even if a worker vanishes to OOM or SIGKILL, **the task is redelivered**. But this is also a design that **raises the chance of a task running twice** — which is exactly why the idempotency of §5.1 is a prerequisite. **You must not introduce `acks_late` without idempotency** (a double execution directly causes an incident).

When using Redis as the broker, the timing of redelivery is decided by the **visibility timeout**. This is the threshold of "redeliver to another worker if the task a worker pulled is not acked within this many seconds."

```python
# Redis-broker specific: visibility timeout (default 3600 seconds)
broker_transport_options = {"visibility_timeout": 3600}
```

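> ⚠️ **Make the visibility timeout longer than the task's maximum execution time**. For example, if video transcoding takes 1 hour but the visibility timeout is 1800 seconds, **a still-running task is misjudged as lost and runs twice**. Set it to the longest task's duration plus a margin. The same applies to `eta`/`countdown` (§3.4) and retry delays: a task scheduled further out than the visibility timeout can be redelivered and executed more than once. This pitfall comes from the Redis transport's visibility timeout; RabbitMQ does not use this mechanism.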
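One way to keep that rule from silently drifting is to check it when the config is assembled. A minimal sketch: the settings from this section gathered into the `CELERY` dict, plus a hypothetical `check_visibility_timeout` guard. `LONGEST_TASK_SECONDS` and `SAFETY_MARGIN` are assumed figures that you would replace with measurements of your own slowest task, including any `eta`/`countdown` delays.

```python
# Hypothetical figures: measure your own slowest task (including eta/countdown delays).
LONGEST_TASK_SECONDS = 45 * 60
SAFETY_MARGIN = 1.5

CELERY = dict(
    broker_url="redis://localhost",
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,
    broker_transport_options={"visibility_timeout": 2 * 60 * 60},  # 2 hours
)


def check_visibility_timeout(conf: dict, longest_seconds: float, margin: float) -> None:
    """Fail fast at startup instead of discovering double execution in production."""
    # 3600 is the Redis transport's default when the option isn't set.
    timeout = conf.get("broker_transport_options", {}).get("visibility_timeout", 3600)
    required = longest_seconds * margin
    if timeout < required:
        raise ValueError(
            f"visibility_timeout={timeout}s is shorter than {required:.0f}s "
            f"(longest task {longest_seconds}s x margin {margin})"
        )


check_visibility_timeout(CELERY, LONGEST_TASK_SECONDS, SAFETY_MARGIN)  # passes: 7200 >= 4050
```

Calling the guard from `create_app()` right after the config is built would surface a mismatch on the first deploy; that placement is a suggestion, not part of the official pattern.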

### 5.4 Pass an ID for Arguments (Don't Pass an Object)

A design principle the official docs warn about in bold: **"Pass the user's id rather than the user object."** Task arguments must be **serializable**, and you must not pass a SQLAlchemy model object as `.delay(user)`.

```python
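# ❌ Bad: passing the model object (not serializable / freezes a stale state)
send_welcome_email.delay(user)

# ✅ Good: pass the ID and re-query the current state in the task's app context.
# A deterministic key (instead of a random uuid4) also collapses duplicate enqueues.
send_welcome_email.delay(user.id, idempotency_key=f"welcome-email:{user.id}")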
```

There are 2 reasons. **(1) It can't be serialized**: a task's arguments are encoded (as JSON by default) to pass through the broker, so an ORM object can't ride along. **(2) The state goes stale**: even if you carry the object as of enqueue time, the DB value may have changed by the time the worker executes. So **pass only the ID and call `db.session.get(User, user_id)` inside the task to re-fetch the current state**. Because `FlaskTask` pushes an app context, this re-query is easy to write inside the task (this is where §2.1 pays off).
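Both reasons can be seen in a few lines of stdlib Python. In this sketch `User` is a dataclass standing in for a SQLAlchemy model, `fake_db` stands in for the database, and `encode_message` is a hypothetical stand-in for Celery's serialization step; plain `json.dumps` shows the same limit.

```python
import json
from dataclasses import dataclass


@dataclass
class User:  # stand-in for a SQLAlchemy model instance
    id: int
    email: str


fake_db = {7: User(id=7, email="old@example.com")}  # stand-in for the database


def encode_message(*args: object) -> str:
    # Celery serializes task arguments (JSON by default); json.dumps shows the limit.
    return json.dumps({"args": list(args)})


user = fake_db[7]
try:
    encode_message(user)  # reason (1): an arbitrary object isn't JSON-serializable
except TypeError as exc:
    print("cannot enqueue the object:", exc)

message = encode_message(user.id)  # '{"args": [7]}'
fake_db[7] = User(id=7, email="new@example.com")  # reason (2): changed before the worker runs

# Worker side: decode the ID and re-fetch the current row
(user_id,) = json.loads(message)["args"]
print(fake_db[user_id].email)  # new@example.com
```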

> 💡 **The DB session is per task**. Because `FlaskTask.__call__` opens and closes the app_context per task, Flask-SQLAlchemy's `db.session` is also scoped per task. Make `db.session.commit()` (or `rollback()` on an exception) explicit at the task's end to prevent state bleed across sessions.

### 5.5 Where Failures Go: Dead-Letter and Redis's Data Loss

A task that exhausts `max_retries` ends in failure. In production, **don't swallow failures; give them a destination**. In Celery, the standard design is an `on_failure` hook, or moving the failed work to a separate "dead-letter" table or queue.

```python
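from celery import shared_task
from requests.exceptions import RequestException

from myapp.deadletters import record_dead_letter  # hypothetical helper


@shared_task(bind=True, max_retries=5, autoretry_for=(RequestException,))
def deliver_webhook(self, endpoint_id: int, payload_id: int) -> None:
    try:
        ...  # task body
    except RequestException:
        if self.request.retries >= self.max_retries:
            # Final attempt: park the work somewhere visible before it fails for good
            record_dead_letter(endpoint_id, payload_id)
        raise  # hand off to autoretry (retries, or fails once retries are exhausted)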
```

And the honest caution touched on at the start — **Redis is "susceptible to data loss" as a broker**. If Redis restarts or fails over, depending on the persistence settings, unprocessed tasks on the queue may vanish.

> ⚠️ **For "tasks that must never be lost," consider RabbitMQ**. For processing like billing or legal notifications where not even one drop is tolerated, choosing RabbitMQ — with its solid delivery guarantee — as the broker is the honest move. If you use Redis, enable AOF persistence and create a state where "even if it vanishes, it can be recovered by re-enqueue" with `acks_late` + idempotency. Either way, **design on the premise that "because it's Redis, it sometimes vanishes."** This is a trade-off I actually kept in mind in my own production, without idealizing Celery.

---

## **6. Periodic Execution: Celery Beat**

Cron-like periodic jobs (nightly aggregation, reminders, expiry processing) are enqueued by **Celery Beat**. Beat is a "scheduler" that merely loads task messages onto the queue at the appointed times — **the worker executes them**.

```python
# Add beat_schedule to the CELERY dict
from celery.schedules import crontab

CELERY = dict(
    broker_url="redis://localhost",
    result_backend="redis://localhost",
    beat_schedule={
        "expire-trials-nightly": {
            "task": "myapp.tasks.expire_trials",
            "schedule": crontab(hour=3, minute=0),  # every day at 03:00
        },
        "send-reminders-hourly": {
            "task": "myapp.tasks.send_due_reminders",
            "schedule": crontab(minute=0),  # every hour, on the hour
        },
    },
)
```

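Beat interprets `crontab` times in Celery's `timezone` setting, which defaults to UTC, so `hour=3` means 03:00 UTC unless you set `timezone` explicitly. Startup is a separate process.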

```bash
celery -A make_celery beat --loglevel INFO
```

> ⚠️ **Run only 1 Beat process**. If you start 2 Beats for redundancy, **the same schedule is enqueued twice** and the job runs twice. Run Beat as a single scheduler, and on a container platform pin it to "beat = 1 replica." And per §5.1, make the tasks Beat enqueues **idempotent** too, so that "safe even if double-enqueued" and "safe even if double-executed" form a two-layer defense.
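Beat enqueues the same arguments every time, so a periodic task needs another source for its idempotency key. One option, sketched here, derives the key from the schedule window the run belongs to. The task would compute it from `datetime.now(timezone.utc)` and use it with the ledger from §5.1. `schedule_slot_key` is a hypothetical helper. Because the key is computed at execution time, a run delayed past a window boundary gets a new key, so the window should be generous relative to the schedule (a daily window for a daily job).

```python
from datetime import datetime, timezone


def schedule_slot_key(task_name: str, now: datetime, period_seconds: int) -> str:
    """Map any moment inside a schedule window to the same key (hypothetical helper)."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    epoch = int(now.timestamp())
    slot_start = epoch - (epoch % period_seconds)  # floor to the window boundary (UTC)
    slot = datetime.fromtimestamp(slot_start, tz=timezone.utc)
    return f"{task_name}:{slot:%Y-%m-%dT%H:%M}"


# Two enqueues of the same nightly run collapse onto one key...
print(schedule_slot_key("expire_trials", datetime(2026, 6, 26, 3, 0, 5, tzinfo=timezone.utc), 86400))
print(schedule_slot_key("expire_trials", datetime(2026, 6, 26, 3, 0, 40, tzinfo=timezone.utc), 86400))
# ...both print "expire_trials:2026-06-26T00:00"
```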

---

## **7. Deployment and Observability: Separate Containers, Monitoring, Correlated Logs**

### 7.1 The Worker Is a Separate Container: Docker Compose

This is the core principle of §1.2 expressed as a topology. Separate web, worker, beat, and redis, and the standard is to **start the same image with different commands** (the image is shared because it's the same codebase).

```yaml
# compose.yaml
services:
  redis:
    image: redis:7-alpine
    restart: unless-stopped

  web:
    build: .
    command: gunicorn -w 4 -b 0.0.0.0:8000 "make_celery:flask_app"
    environment:
      # The key after "__" must match Celery's lowercase setting name
      FLASK_CELERY__broker_url: redis://redis:6379/0
      FLASK_CELERY__result_backend: redis://redis:6379/1
    depends_on: [redis]
    ports: ["8000:8000"]

  worker:
    build: .
    command: celery -A make_celery worker --loglevel INFO --concurrency 4 --max-tasks-per-child 100
    environment:
      FLASK_CELERY__broker_url: redis://redis:6379/0
      FLASK_CELERY__result_backend: redis://redis:6379/1
    depends_on: [redis]

  beat:
    build: .
    command: celery -A make_celery beat --loglevel INFO
    environment:
      FLASK_CELERY__broker_url: redis://redis:6379/0
    depends_on: [redis]
```

web is Gunicorn (reusing the design from the [production deployment guide](/blog/flask-deployment-gunicorn-docker-production-wsgi-guide) as-is), worker and beat are Celery, and redis is the broker. On ECS (Fargate), this corresponds to 3 services: "web service," "worker service," and "beat service (1 replica)."

### 7.2 Tuning the Worker

| Flag (setting) | Role | Guidance |
|---|---|---|
| `--concurrency N` (`worker_concurrency`) | Number of child processes in the worker (prefork pool); defaults to the CPU count | CPU-bound → ≈ vCPU count; IO-bound → more |
| `--prefetch-multiplier N` (`worker_prefetch_multiplier`) | How many tasks each child reserves ahead of time | Long tasks → **1** (prevents hoarding and unfairness) |
| `--max-tasks-per-child N` (`worker_max_tasks_per_child`) | Replace a child process after it has run N tasks | **Leak countermeasure**. Effective for heavy deps like PDF/image processing |

`--max-tasks-per-child` is a realistic defense against the problem of workers ballooning due to memory leaks in third-party libraries. It recreates the child process every N tasks and periodically frees memory.
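The same tuning can live in the factory's `CELERY` dict instead of CLI flags. A minimal sketch; every number here, including the memory ceiling, is an illustrative assumption to adapt to your workload.

```python
CELERY = dict(
    broker_url="redis://localhost",
    worker_concurrency=4,                 # same effect as --concurrency 4
    worker_prefetch_multiplier=1,         # reserve one task per child at a time
    worker_max_tasks_per_child=100,       # same effect as --max-tasks-per-child 100
    worker_max_memory_per_child=512_000,  # kilobytes (~500 MB): replace a child past this
)
```

`worker_max_memory_per_child` complements the task-count limit: per the Celery docs, a child that exceeds it finishes its current task and is then replaced, which catches a single leaky task that a count-based limit would let keep growing.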

### 7.3 Monitoring: Flower and Correlated Logs

Observability is the lifeline of production operations. Celery has a web dashboard called **Flower**, where you can see task success/failure, execution time, queue length, and worker activity in real time.

```bash
pip install flower
celery -A make_celery flower --port=5555
```

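In addition, extend the **structured logging + request-ID correlation** established in the [error-handling / observability guide](/blog/flask-error-handling-logging-observability-guide) to the task side. Always include the **`task_id` in task logs** so you can trace "user's request → enqueued task → execution in the worker" as a single thread. Celery's default task log format already prints `task_name[task_id]`; passing the ID explicitly in `extra` also keeps it available as a separate field for a structured (JSON) formatter.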

```python
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(bind=True)
def generate_invoice_pdf(self, invoice_id: int) -> None:
    # self.request.id is the task_id; always include it in structured logs
    logger.info("invoice pdf start", extra={"task_id": self.request.id, "invoice_id": invoice_id})
    ...
    logger.info("invoice pdf done", extra={"task_id": self.request.id, "invoice_id": invoice_id})
```
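To turn those `extra` fields into something a log platform can query, a formatter has to read them off the log record. A minimal stdlib sketch of a JSON formatter: `JsonFormatter` is a hypothetical name, and `request_id` is an assumed field you would pass from the view as a task argument and then into `extra`.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per line, keeping correlation IDs as fields."""

    CORRELATION_FIELDS = ("task_id", "invoice_id", "request_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in self.CORRELATION_FIELDS:
            value = getattr(record, field, None)  # set via logger.info(..., extra={...})
            if value is not None:
                payload[field] = value
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
demo = logging.getLogger("demo.tasks")
demo.addHandler(handler)
demo.setLevel(logging.INFO)
demo.propagate = False
demo.info("invoice pdf start", extra={"task_id": "3f2c-example", "invoice_id": 42})
```

One way to attach such a handler inside the worker is Celery's `after_setup_task_logger` signal, which passes you the task logger after Celery has configured it; check the signal's documented arguments before wiring it in.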

### 7.4 Graceful Shutdown (Warm Shutdown)

When you bring a worker down on each deploy, **you must not kill an in-progress task midway**. Send `SIGTERM` to a Celery worker and it does a **warm shutdown** — it stops accepting new tasks and **finishes the running tasks to the end** before terminating the process.

In an ECS rolling deploy, `SIGTERM` is sent when the container stops, and the worker must finish its warm shutdown within the `stopTimeout` grace period (default 30 seconds) before ECS follows up with `SIGKILL`. This applies to the worker the same thinking as the Gunicorn graceful shutdown covered in the [production deployment guide](/blog/flask-deployment-gunicorn-docker-production-wsgi-guide): **drain what's in progress before going down.** If you have long-running tasks, set `stopTimeout` longer than the longest one. On Fargate the maximum is 120 seconds, so tasks that can run longer need to be split up, or rely on `acks_late` + idempotency (§5.3, §5.1) to be redelivered safely after a forced stop.

---

## **8. Honest Alternatives: When Celery Is Overkill**

Celery is powerful, but **not every piece of async work needs Celery**. The best way to avoid technical debt is often simply not to use it.

| Option | Best suited for | Trade-off |
|---|---|---|
| **Celery + Redis/RabbitMQ** | Full-scale operation that needs retries, scheduling, queue routing, and high volume | Many components (broker + worker + beat + monitoring) |
| **RQ (Redis Queue)** | Simple background processing that you want to run on Redis alone | Fewer features than Celery (e.g., weaker advanced routing) |
| **Managed queue (SQS + Lambda)** | Lowering operational load by going serverless on AWS | Vendor lock-in; runs outside the Flask process |

- **If the requirement is roughly "send one email asynchronously," Celery is overkill.** A lightweight library like `RQ` is often enough, and you don't have to take on the whole operational set of broker + worker + beat + Flower.
- **If you want to go serverless on AWS, SQS + Lambda** is a sound choice too. You don't operate worker containers yourself, and for the design of idempotent processing, see the [SQS/Lambda/EventBridge idempotent async-processing guide](/blog/aws-sqs-lambda-eventbridge-idempotent-async-processing-guide). The core of idempotency is the same whether you use Celery or SQS.
- **If you want to dig deeper into Celery's design**, I've compiled general Celery/Redis production patterns that go beyond Flask in the [Celery × Redis production job-queue guide](/blog/celery-redis-production-async-task-queue-guide).
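To illustrate why idempotency doesn't depend on the transport, here is a minimal sketch using stdlib `sqlite3`: a `PRIMARY KEY` on the idempotency key and the side effect are committed in one transaction, so a redelivered message becomes a no-op. The table names and `charge_once` are hypothetical; in production this would be your real database (e.g., PostgreSQL via SQLAlchemy), and an external side effect such as a payment API call would instead rely on the provider's own idempotency key, if it offers one.

```python
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    # The PRIMARY KEY is what turns "run twice" into "applied once"
    conn.execute("CREATE TABLE processed (idempotency_key TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE charges (invoice_id INTEGER, amount INTEGER)")


def charge_once(conn: sqlite3.Connection, idempotency_key: str, invoice_id: int, amount: int) -> bool:
    """Apply the charge at most once per key. True if it ran, False if it was a duplicate."""
    try:
        with conn:  # one transaction: commits both inserts, or rolls back both
            conn.execute("INSERT INTO processed (idempotency_key) VALUES (?)", (idempotency_key,))
            conn.execute("INSERT INTO charges (invoice_id, amount) VALUES (?, ?)", (invoice_id, amount))
        return True
    except sqlite3.IntegrityError:
        # The key already exists: this is a redelivery, so skip the side effect
        return False
```

The same `charge_once` call can sit inside a Celery task body or an SQS-triggered Lambda handler; only the code that delivers the message changes.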

The criterion is simple: **are any of retries / scheduling / queue separation / monitoring genuinely needed?** If yes, use Celery; if not, RQ or a managed queue. Migrating to Celery once the requirements grow is not too late (YAGNI).
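The rule of thumb can be written down as a tiny decision function, which makes it easy to record why a team picked one option. This is a sketch: `QueueNeeds` and `choose_queue` are hypothetical names, and the AWS tie-breaker simply mirrors the table above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueNeeds:
    retries: bool = False
    scheduling: bool = False
    queue_separation: bool = False
    monitoring: bool = False
    serverless_on_aws: bool = False


def choose_queue(needs: QueueNeeds) -> str:
    """Celery only if at least one of the four capabilities is genuinely needed."""
    if any((needs.retries, needs.scheduling, needs.queue_separation, needs.monitoring)):
        return "celery"
    # Otherwise the lighter option; a managed queue if you're already serverless on AWS
    return "sqs+lambda" if needs.serverless_on_aws else "rq"
```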

---

## **Summary: A Celery Production Checklist**

Running Flask × Celery × Redis at production quality comes down to three things: **context integration, idempotency, and resilience**. Flask's official `celery_init_app` handles the first; the idempotency key and the `acks_late`/retry design handle the other two. To close, here is a pre-production checklist.

| # | Check item | Basis (this article) |
|---|---|---|
| 1 | Is the worker started as a **process / container separate from Gunicorn**? | §1.2 / §7.1 |
| 2 | Does `celery_init_app`'s **`FlaskTask` push an app context**, so `db`/`current_app` are usable inside the task? | §2.1 |
| 3 | Is the config consolidated under **Flask config's `CELERY` key**, with connection strings injected via env vars? | §2.2 |
| 4 | Do task definitions use **`@shared_task`, not `@celery_app.task`**? | §3.1 |
| 5 | Do you enqueue via **`.delay()`/`.apply_async()`**, retrieve via **`AsyncResult`**, and set `task_ignore_result` when no result is needed? | §3.2–3.5 |
| 6 | **Assuming at-least-once delivery, is every task designed to be idempotent** (idempotency key / upsert / dedup)? | §5.1 |
| 7 | Do transient failures use **`autoretry_for` + `retry_backoff` + `retry_jitter` + `max_retries`**, while permanent errors fail immediately? | §5.2 |
| 8 | Do tasks that must not be lost use **`acks_late` + `task_reject_on_worker_lost`** (with idempotency as a prerequisite)? | §5.3 |
| 9 | Is the Redis broker's **visibility timeout longer than the longest task time**? | §5.3 |
| 10 | Do task arguments **pass an ID (int) and re-query inside the task** (no model objects)? | §5.4 |
| 11 | If you store results, do they have a **TTL via `result_expires`**, and are failed tasks **dead-lettered**? | §3.5 / §5.5 |
| 12 | Do you account for Redis data loss, and can **critical tasks be recovered via RabbitMQ / AOF persistence + idempotency**? | §5.5 |
| 13 | Does **Beat run as exactly one replica**, and are the tasks Beat enqueues idempotent? | §6 |
| 14 | Do you use `--max-tasks-per-child` to **contain memory leaks**, and `prefetch=1` to **stop workers hoarding long tasks**? | §7.2 |
| 15 | Do structured logs carry **`task_id` correlation**, alongside **Flower monitoring**? | §7.3 |
| 16 | On deploy, do workers get a **warm shutdown via SIGTERM**, with `stopTimeout` longer than the longest task time? | §7.4 |
| 17 | Have you considered whether **Celery is overkill** in the first place (would RQ / SQS+Lambda suffice)? | §8 |
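Rows 8, 9, and 14 boil down to concrete settings, so they can be checked mechanically, for example in CI. A minimal sketch, assuming your Celery settings are available as a plain dict with Celery's lowercase setting names (such as the `CELERY` block in Flask config). `audit_celery_config` and `longest_task_seconds` are hypothetical names; the fallbacks are Celery's defaults (Redis `visibility_timeout` of 3600 s, `worker_prefetch_multiplier` of 4). It only inspects global settings, so per-task options such as `@shared_task(acks_late=True)` are not covered.

```python
from typing import Any


def audit_celery_config(conf: dict[str, Any], longest_task_seconds: float) -> list[str]:
    """Return human-readable problems found in a Celery settings dict (empty list = OK)."""
    problems: list[str] = []

    # Row 9: Redis redelivers unacked tasks once visibility_timeout elapses
    vt = conf.get("broker_transport_options", {}).get("visibility_timeout", 3600)
    if vt <= longest_task_seconds:
        problems.append(
            f"visibility_timeout={vt}s is not longer than the longest task "
            f"({longest_task_seconds}s); it may be redelivered while still running"
        )

    # Row 8: acks_late alone still acks tasks whose worker process dies abruptly
    if conf.get("task_acks_late") and not conf.get("task_reject_on_worker_lost"):
        problems.append("task_acks_late is on but task_reject_on_worker_lost is off")

    # Row 14: the default multiplier (4) lets a worker reserve more long tasks than it can run
    if conf.get("worker_prefetch_multiplier", 4) != 1:
        problems.append("worker_prefetch_multiplier is not 1; long tasks may be hoarded")

    # Row 14: without a recycling limit, a slow memory leak is never reclaimed
    if not conf.get("worker_max_tasks_per_child"):
        problems.append("worker_max_tasks_per_child is unset; memory leaks are never reclaimed")

    return problems
```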

The real difficulty of a job queue lies not in how to use Celery, but in **assembling a design that keeps data consistent given the reality that "tasks run twice, workers go down, and Redis sometimes loses data."** My B2B SaaS had zero double charges not because Celery was excellent, but because **a unique constraint on the idempotency key guaranteed that re-execution was safe.** Build on Flask context integration first, then stack idempotency and resilience on top; assembled in that order, the job queue becomes a production asset.

For the overall Flask production design, start from the [Flask production-operations guide](/blog/flask-production-guide); for the context mechanism, the [context deep-dive](/blog/flask-application-request-context-g-current-app-guide); for deployment, the [production deployment guide](/blog/flask-deployment-gunicorn-docker-production-wsgi-guide).
