Task Queues in FastAPI: Handling Long-Running Operations Like a Senior Engineer

16 June 2026

I’ve watched junior developers block HTTP requests on database migrations, file uploads, and AI inference calls. Then I’ve watched them ship broken retry logic that duplicates work or loses jobs entirely. This post is what I wish I’d read before burning production on a Sunday morning.

Why Task Queues Matter (And When They Don’t)

A task queue is fundamentally about decoupling work from HTTP requests. When a user submits data to your FastAPI endpoint, you don’t want them waiting 30 seconds for the work to finish. You accept the request, queue the work, and respond immediately.

I prefer Celery + Redis over the alternatives. I’ve seen teams try to use asyncio alone for “background tasks”. That works until your process crashes and loses every job it was holding in memory. I’ve also seen custom queue implementations that leak memory under load. Neither is worth the pain.

The Production Setup

Here’s the minimal, opinionated stack I use in CitizenApp:

# requirements.txt
fastapi==0.104.0
celery==5.3.4
redis==5.0.1
python-dotenv==1.0.0
sqlalchemy==2.0.0

First, configure Celery properly:

# app/celery_app.py
from celery import Celery
import os

celery_app = Celery(
    "app",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,  # Critical: acknowledge after task completes
    worker_prefetch_multiplier=1,  # Prevent task hoarding
    task_soft_time_limit=600,  # 10 minutes soft limit
    task_time_limit=900,  # 15 minutes hard limit
)

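@celery_app.task(bind=True, max_retries=3)
def process_user_export(self, user_id: int):
    try:
        # Your actual work (generate_user_export and save_to_s3 are app-specific helpers)
        export_data = generate_user_export(user_id)
        # Return the download URL so the status endpoint can read it from result.result
        # (assumes save_to_s3 returns that URL)
        return save_to_s3(export_data)
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)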

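Notice task_acks_late=True. I treat this as mandatory in production. By default the worker acknowledges a message as soon as it receives it, before the task runs, so a worker that crashes mid-execution takes the job with it. With late acks the message is acknowledged only after the task finishes, so an unfinished job is redelivered (on Redis, once the broker’s visibility timeout expires). Two caveats: a redelivered task runs again from the start, so your tasks must be idempotent, and if only the child process running the task is killed, Celery still acknowledges the message unless you also set task_reject_on_worker_lost=True.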
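
To make the difference concrete, here’s a toy simulation of early versus late acknowledgement. It is a sketch in plain Python, not Celery code: the deque stands in for the broker, and `run_once`, `WorkerCrash`, and `crashing_handler` are names I made up for illustration.

```python
from collections import deque


class WorkerCrash(Exception):
    """Stands in for a worker dying in the middle of a task."""


def run_once(queue, handler, acks_late):
    """Deliver the head of the queue to handler; return True if it finished."""
    message = queue[0]       # the broker hands the message to the worker
    if not acks_late:
        queue.popleft()      # early ack: the broker forgets the message now
    try:
        handler(message)
    except WorkerCrash:
        return False         # with late acks the message is still queued
    if acks_late:
        queue.popleft()      # late ack: remove only after the task succeeded
    return True


def crashing_handler(message):
    raise WorkerCrash(message)


early = deque(["export-42"])
run_once(early, crashing_handler, acks_late=False)

late = deque(["export-42"])
run_once(late, crashing_handler, acks_late=True)

print(len(early), len(late))  # 0 1
```

The early-ack queue ends up empty even though nothing completed, while the late-ack queue still holds the job for another attempt. That second attempt is why the task body has to tolerate running twice.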

Now integrate it with FastAPI:

# app/api/exports.py
import logging

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.celery_app import celery_app, process_user_export

router = APIRouter()
logger = logging.getLogger(__name__)

class ExportRequest(BaseModel):
    user_id: int

# Plain def: .delay() does blocking I/O to Redis, so FastAPI runs this in its threadpool
@router.post("/export", status_code=202)
def request_export(req: ExportRequest):
    """Queue a user export, return immediately."""
    try:
        task = process_user_export.delay(req.user_id)
    except Exception:
        # logger.exception records the traceback along with the message
        logger.exception("Failed to queue export")
        raise HTTPException(status_code=500, detail="Failed to queue task")
    return {
        "task_id": task.id,
        "status": "queued",
        "message": "Your export is being prepared. Check back shortly."
    }

@router.get("/export/{task_id}")
def get_export_status(task_id: str):
    """Poll for task completion."""
    result = celery_app.AsyncResult(task_id)

    # Celery also reports PENDING for task IDs it has never seen
    if result.state == "PENDING":
        return {"status": "pending", "progress": 0}
    elif result.state == "SUCCESS":
        return {"status": "completed", "download_url": result.result}
    elif result.state == "FAILURE":
        return {
            "status": "failed",
            "error": str(result.info)
        }
    else:
        return {"status": result.state}

This is the pattern: queue immediately in your endpoint, let clients poll a status endpoint.
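
On the client side, polling can be as small as the sketch below. It assumes the response shapes from the status endpoint above ("completed" and "failed" as terminal statuses). `poll_until_done`, the injected `fetch_status` and `sleep` callables, and the example URL are all hypothetical names for illustration, and the HTTP call itself is left out so the snippet runs on its own.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_attempts: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status until it reports a terminal status or attempts run out."""
    for attempt in range(max_attempts):
        body = fetch_status()
        if body.get("status") in TERMINAL:
            return body
        if attempt < max_attempts - 1:
            sleep(interval)  # wait before asking again
    return {"status": "timeout"}


# Fake endpoint that finishes on the third poll (the URL is made up).
responses = iter([
    {"status": "pending", "progress": 0},
    {"status": "STARTED"},
    {"status": "completed", "download_url": "s3://example-bucket/export.csv"},
])
result = poll_until_done(lambda: next(responses), sleep=lambda _: None)
print(result["status"])  # completed
```

In a real client, `fetch_status` would wrap a GET to /export/{task_id}. Capping the number of attempts matters because a task ID the backend has never seen stays "pending" forever.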

Handling Webhooks and External Services

Long-running API calls to external services (AI inference, payment processing) should always be queued:

# app/tasks/ai_tasks.py
from app.celery_app import celery_app
from anthropic import Anthropic
import logging

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, max_retries=5)
def analyze_user_document(self, document_id: int, user_id: int):
    """
    Call Claude API, save results to database.
    Retries on network failures, not on validation errors.
    """
    try:
        doc = get_document(document_id)
        if not doc:
            raise ValueError(f"Document {document_id} not found")
        
        client = Anthropic()
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Analyze this: {doc.text}"
            }]
        )
        
        analysis = response.content[0].text
        save_analysis_to_db(document_id, analysis)
        
        # Send webhook to user's system
        notify_analysis_complete(user_id, document_id)
        
    except ValueError as e:
        # Don't retry validation errors
        logger.error(f"Validation error: {e}")
        save_task_failure(document_id, str(e))
    except Exception as e:
        # Retry network/API errors with exponential backoff
        logger.warning(f"Attempt {self.request.retries + 1} failed: {e}")
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 30)

I prefer this over fire-and-forget because you get automatic retries and error tracking for free.
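
If you want to sanity-check what a countdown expression like the one above actually produces, a few lines of plain Python will do it. This is a sketch: `backoff_delay` and `with_full_jitter` are hypothetical helpers, and the cap and jitter are additions of mine that the task above does not use.

```python
import random
from typing import Optional


def backoff_delay(retries: int, base: float, cap: Optional[float] = None) -> float:
    """Delay before the next attempt: base * 2**retries, optionally capped."""
    delay = base * (2 ** retries)
    return min(delay, cap) if cap is not None else delay


def with_full_jitter(delay: float, rng: random.Random) -> float:
    """Pick a random delay in [0, delay] so many failing tasks don't retry in lockstep."""
    return rng.uniform(0, delay)


# Same expression as the task: 2 ** retries * 30, for max_retries=5
schedule = [backoff_delay(n, base=30) for n in range(5)]
print(schedule)  # [30, 60, 120, 240, 480]

# With an (assumed) five-minute ceiling
capped = [backoff_delay(n, base=30, cap=300) for n in range(5)]
print(capped)  # [30, 60, 120, 240, 300]
```

The uncapped schedule adds up to 930 seconds, so a document that fails every attempt takes over 15 minutes to be declared dead. That is worth knowing before you promise users a turnaround time.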

Gotcha: Dead Letter Queues and Observability

Here’s what burned me in production: tasks would fail silently, and I wouldn’t know until users complained.

Always implement task failure logging:

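# app/celery_app.py (add below the config)
import logging

from celery.signals import task_failure

logger = logging.getLogger(__name__)

# Celery has no task_error_handler setting; the task_failure signal is the hook for this
@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None,
                    args=None, kwargs=None, einfo=None, **signal_kwargs):
    """Called in the worker when any task fails for good (not on each retry)."""
    logger.error(
        "Task %s failed", task_id,
        extra={
            "task_name": sender.name if sender else None,
            "exception": str(exception),
            "traceback": str(einfo),
            # "args" is a reserved LogRecord attribute, so use a different key
            "task_args": args,
        }
    )
    # Send alert to Sentry, DataDog, etc.
    send_alert_to_monitoring(task_id, exception)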

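And set up a dead-letter queue for tasks that exceed max retries. Celery on Redis won’t route exhausted tasks anywhere by itself, so you define the task and call it yourself, for example from your failure handler or from the except block once self.request.retries reaches max_retries:

# app/tasks/dlq.py
import logging

from app.celery_app import celery_app

logger = logging.getLogger(__name__)

@celery_app.task
def handle_dead_letter(task_name, task_id, args, exc):
    """
    Tasks that failed all retries land here.
    Log for manual investigation.
    Pass exc as a string: the JSON serializer can't encode exception objects.
    """
    logger.critical(
        f"DEAD LETTER: {task_name} ({task_id})",
        extra={"original_args": args, "final_exception": str(exc)}
    )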
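
The control flow behind a dead-letter queue is easier to see without Celery in the way. Here’s a minimal sketch in plain Python, where a list stands in for the dead-letter queue. `run_with_dead_letter` and `always_fails` are hypothetical names, and there is no backoff between attempts, to keep it short.

```python
def run_with_dead_letter(task, args, max_retries, dead_letters):
    """Run task up to max_retries + 1 times; record it in dead_letters if all fail."""
    last_exc = None
    for _attempt in range(max_retries + 1):
        try:
            return task(*args)
        except Exception as exc:
            last_exc = exc  # remember the most recent failure
    # Every attempt failed: park the job for manual investigation.
    dead_letters.append({
        "task_name": task.__name__,
        "args": args,
        "final_exception": str(last_exc),
        "attempts": max_retries + 1,
    })
    return None


calls = []


def always_fails(document_id):
    calls.append(document_id)
    raise ConnectionError("upstream unavailable")


dead_letters = []
run_with_dead_letter(always_fails, (7,), max_retries=3, dead_letters=dead_letters)
print(len(calls), dead_letters[0]["final_exception"])  # 4 upstream unavailable
```

Note the count: max_retries=3 means four executions in total, the first attempt plus three retries. Only the final failure should produce a dead letter.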

I’ve also learned that you must monitor queue depth. A growing queue means your workers can’t keep up. Add a simple health check:

# app/api/health.py
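import os

import redis
from fastapi import APIRouter

from app.celery_app import celery_app

router = APIRouter()
broker = redis.Redis.from_url(os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"))

@router.get("/health/queues")
def queue_health():
    """Check worker activity and the backlog waiting in Redis."""
    inspect = celery_app.control.inspect(timeout=1.0)
    # inspect calls return None when no worker replies in time
    active = inspect.active() or {}
    reserved = inspect.reserved() or {}

    total_active = sum(len(tasks) for tasks in active.values())
    total_reserved = sum(len(tasks) for tasks in reserved.values())
    # reserved only counts tasks workers have already prefetched; the real
    # backlog is the Redis list named after the queue ("celery" by default)
    backlog = broker.llen("celery")

    return {
        "workers_online": len(active),
        "active_tasks": total_active,
        "reserved_tasks": total_reserved,
        "queued_tasks": backlog,
        "healthy": bool(active) and backlog < 1000  # Alert if backlog grows
    }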

What I Missed Early On

I didn’t use task_acks_late=True initially. Lost jobs on worker crashes. I also didn’t set worker_prefetch_multiplier=1, so one worker would hog all pending tasks while others sat idle. And I definitely didn’t implement proper error handling—I just assumed tasks would work.

Start with the config I showed you. It’s battle-tested. Then optimize once you see what actually bottlenecks.
