Task Queues in FastAPI: Handling Long-Running Operations Like a Senior Engineer
I’ve watched junior developers block HTTP requests on database migrations, file uploads, and AI inference calls. Then I’ve watched them ship broken retry logic that duplicates work or loses jobs entirely. This post is what I wish I’d read before burning production on a Sunday morning.
Why Task Queues Matter (And When They Don’t)
A task queue is fundamentally about decoupling work from HTTP requests. When a user submits data to your FastAPI endpoint, you don’t want them waiting 30 seconds for the work to finish. You accept the request, queue the work, and respond immediately.
I prefer Celery + Redis over alternatives because:
- Celery is production-tested at scale (Spotify, Instagram). It handles retries, time limits, and result persistence out of the box; dead-letter handling is something you wire up yourself (more on that below).
- Redis is stupid fast for a broker. No database overhead, no complex setup.
- It integrates seamlessly with FastAPI without architectural gymnastics.
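I’ve seen teams try to use asyncio alone for “background tasks”. That works until your process crashes and takes the pending jobs with it. I’ve also seen custom queue implementations that leak memory under load. Neither is worth the pain. The exception is short, best-effort work you can afford to lose: FastAPI’s built-in BackgroundTasks runs that in-process after the response is sent, no broker required.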
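Here’s that failure mode in miniature, using only the standard library. A request handler schedules work with asyncio.create_task and returns, then the event loop shuts down (standing in for a crash or a deploy). asyncio.run cancels any tasks still pending at shutdown, so the job never completes. The function names are hypothetical:

```python
import asyncio

completed: list[str] = []


async def send_welcome_email(user_id: int) -> None:
    # Hypothetical slow job standing in for real work
    await asyncio.sleep(0.5)
    completed.append(f"email sent to {user_id}")


async def handle_request(user_id: int) -> dict:
    # Hypothetical endpoint body: fire-and-forget in-process, respond right away
    asyncio.create_task(send_welcome_email(user_id))
    return {"status": "accepted"}


async def main() -> None:
    print(await handle_request(42))
    # main() returns here; asyncio.run() then cancels every pending task


asyncio.run(main())
print(completed)  # [] because the job was cancelled before it finished
```

A real crash is harsher than a clean shutdown, but the outcome for in-memory jobs is the same: nothing outside the process remembers they existed.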
The Production Setup
Here’s the minimal, opinionated stack I use in CitizenApp:
```text
# requirements.txt
fastapi==0.104.0
celery==5.3.4
redis==5.0.1
python-dotenv==1.0.0
sqlalchemy==2.0.0
```
First, configure Celery properly:
```python
# app/celery_app.py
import os

from celery import Celery

celery_app = Celery(
    "app",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,  # Critical: acknowledge after task completes
    worker_prefetch_multiplier=1,  # Prevent task hoarding
    task_soft_time_limit=600,  # 10 minutes soft limit
    task_time_limit=900,  # 15 minutes hard limit
)


@celery_app.task(bind=True, max_retries=3)
def process_user_export(self, user_id: int):
    try:
        # Your actual work (generate_user_export and save_to_s3 are your own helpers)
        export_data = generate_user_export(user_id)
        # Return the download URL (assumed to be what save_to_s3 returns) so the
        # status endpoint can hand it to the client as result.result
        return save_to_s3(export_data)
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
```
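For reference, the countdown expression in process_user_export works out to 60, 120, and 240 seconds for self.request.retries values 0, 1, and 2. The hypothetical helper below reproduces that arithmetic and adds two things worth considering: a cap, and optional "full jitter" (a random delay between zero and the computed value) so a batch of tasks that failed together doesn’t retry in lockstep. Celery can also do this declaratively for tasks that use autoretry_for, through the retry_backoff, retry_backoff_max, and retry_jitter task options; this sketch only makes the numbers visible.

```python
import random


def backoff_countdown(retries: int, base: int = 60, cap: int = 3600,
                      jitter: bool = False) -> int:
    """Hypothetical helper: seconds to wait before the next retry.

    Same formula as the task above, 2 ** retries * base, capped at `cap`.
    With jitter=True it returns a random delay in [0, delay] instead.
    """
    delay = min(cap, (2 ** retries) * base)
    if jitter:
        return random.randint(0, delay)
    return delay


# max_retries=3 means self.request.retries takes the values 0, 1, 2
print([backoff_countdown(r) for r in range(3)])  # [60, 120, 240]
# Same schedule with jitter; the values differ on every run
print([backoff_countdown(r, jitter=True) for r in range(3)])
```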
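Notice task_acks_late=True. This is mandatory in production. By default, the worker acknowledges a message just before it starts running the task, so if the worker crashes mid-execution, the broker has already forgotten the job. With late acks, the acknowledgement happens only after the task returns, and an unacknowledged message is redelivered if its worker disappears (with Redis as the broker, once the visibility_timeout expires, which is one hour by default). Two caveats: redelivery means a task can run more than once, so make it idempotent; and if only a pool process dies (an OOM kill, say), Celery still acknowledges the task unless you also set task_reject_on_worker_lost=True.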
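To make the difference concrete, here’s a toy, in-memory model of a broker. ToyBroker and run_and_crash are hypothetical and are not how Redis or Celery work internally. A worker takes one message and dies before finishing; the only thing that varies is when it acknowledged.

```python
from collections import deque


class ToyBroker:
    """Toy in-memory broker (hypothetical; not how Redis or Celery work inside)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # waiting to be delivered
        self.unacked = {}             # delivered to a worker, not yet acknowledged
        self._next_tag = 0

    def fetch(self):
        self._next_tag += 1
        message = self.ready.popleft()
        self.unacked[self._next_tag] = message
        return self._next_tag, message

    def ack(self, tag):
        del self.unacked[tag]

    def worker_lost(self):
        # Anything the dead worker never acknowledged goes back in the queue
        self.ready.extend(self.unacked.values())
        self.unacked.clear()


def run_and_crash(acks_late: bool) -> list:
    """Hypothetical: deliver one job, kill the worker mid-task, report what survives."""
    broker = ToyBroker(["export:user-42"])
    tag, message = broker.fetch()
    if not acks_late:
        broker.ack(tag)  # early ack: acknowledged before the work even starts
    # ... worker dies here, before finishing, so a late ack never happens ...
    broker.worker_lost()
    return list(broker.ready)  # jobs another worker can still pick up


print(run_and_crash(acks_late=False))  # []
print(run_and_crash(acks_late=True))   # ['export:user-42']
```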
Now integrate it with FastAPI:
```python
# app/api/exports.py
import logging

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.celery_app import celery_app, process_user_export

router = APIRouter()
logger = logging.getLogger(__name__)


class ExportRequest(BaseModel):
    user_id: int


# Plain `def` on purpose: .delay() and AsyncResult talk to Redis synchronously,
# and FastAPI runs sync endpoints in a threadpool instead of on the event loop.
@router.post("/export")
def request_export(req: ExportRequest):
    """Queue a user export, return immediately."""
    try:
        task = process_user_export.delay(req.user_id)
    except Exception:
        logger.exception("Failed to queue export")
        raise HTTPException(status_code=500, detail="Failed to queue task")
    return {
        "task_id": task.id,
        "status": "queued",
        "message": "Your export is being prepared. Check back shortly.",
    }


@router.get("/export/{task_id}")
def get_export_status(task_id: str):
    """Poll for task completion."""
    result = celery_app.AsyncResult(task_id)
    # Celery also reports PENDING for task IDs it has never seen
    if result.state == "PENDING":
        return {"status": "pending", "progress": 0}
    elif result.state == "SUCCESS":
        return {"status": "completed", "download_url": result.result}
    elif result.state == "FAILURE":
        return {"status": "failed", "error": str(result.info)}
    else:
        return {"status": result.state}  # e.g. RETRY or REVOKED
```
This is the pattern: queue immediately in your endpoint, let clients poll a status endpoint.
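On the client side, polling needs a deadline. Celery returns PENDING both for queued tasks and for task IDs it has never heard of, so a client polling a mistyped ID would otherwise wait forever. A minimal sketch, assuming a hypothetical fetch_status callable that performs GET /export/{task_id} and returns the decoded JSON; the delay between polls doubles up to a ceiling:

```python
import time
from typing import Callable

# States after which polling should stop ("REVOKED" is Celery's cancelled state)
TERMINAL = {"completed", "failed", "REVOKED"}


def wait_for_task(fetch_status: Callable[[], dict], timeout: float = 60.0,
                  first_delay: float = 0.5, max_delay: float = 5.0) -> dict:
    """Hypothetical client helper: poll until a terminal state or the deadline."""
    deadline = time.monotonic() + timeout
    delay = first_delay
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL:
            return status
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"task still {status.get('status')!r} after {timeout}s")
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # 0.5s, 1s, 2s, 4s, 5s, 5s, ...


# Usage with canned responses in place of real HTTP calls
responses = iter([
    {"status": "pending", "progress": 0},
    {"status": "RETRY"},
    {"status": "completed", "download_url": "https://example.com/export.zip"},
])
print(wait_for_task(lambda: next(responses), first_delay=0.01))
```

Injecting fetch_status keeps the loop independent of whichever HTTP client you use, and makes it trivial to test.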
Handling Webhooks and External Services
Long-running API calls to external services (AI inference, payment processing) should always be queued:
```python
# app/tasks/ai_tasks.py
import logging

from anthropic import Anthropic

from app.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task(bind=True, max_retries=5)
def analyze_user_document(self, document_id: int, user_id: int):
    """
    Call Claude API, save results to database.
    Retries on network failures, not on validation errors.
    """
    # get_document, save_analysis_to_db, notify_analysis_complete and
    # save_task_failure are application helpers not shown here.
    try:
        doc = get_document(document_id)
        if not doc:
            raise ValueError(f"Document {document_id} not found")

        # Reads ANTHROPIC_API_KEY from the environment. The SDK also retries
        # some transient errors on its own (twice by default).
        client = Anthropic()
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Analyze this: {doc.text}",
            }],
        )
        analysis = response.content[0].text
        save_analysis_to_db(document_id, analysis)

        # Send webhook to user's system
        notify_analysis_complete(user_id, document_id)
    except ValueError as e:
        # Don't retry validation errors. The task still ends in SUCCESS state,
        # so save_task_failure is what records the failure.
        logger.error(f"Validation error: {e}")
        save_task_failure(document_id, str(e))
    except Exception as e:
        # Retry everything else with exponential backoff: 30s, 60s, 120s, ...
        # This catch-all also retries errors that won't heal on their own
        # (a rejected API key, for example); narrow it if that matters.
        logger.warning(f"Attempt {self.request.retries + 1} failed: {e}")
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 30)
```
I prefer this over fire-and-forget because a failed call gets retried with backoff instead of vanishing, and the task’s final state ends up in the result backend where you can inspect it.
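One catch: a retry re-runs the whole task. If save_analysis_to_db succeeds and notify_analysis_complete then times out, the retry calls the model again and you pay for the analysis twice. One way to avoid that is to have each side effect check whether it already happened. A minimal sketch, with a plain dict and set standing in (hypothetically) for database tables; analyze_once, fake_model, and flaky_webhook are illustrative names:

```python
from typing import Callable

# Hypothetical in-memory stand-ins for the table behind save_analysis_to_db
# and a "webhook sent" flag
analyses: dict[int, str] = {}
notified: set[int] = set()


def analyze_once(document_id: int, call_model: Callable[[int], str],
                 notify: Callable[[int], None]) -> str:
    """Hypothetical task body that skips side effects which already happened."""
    analysis = analyses.get(document_id)
    if analysis is None:
        # Expensive, billable call: skipped on retries once the result is stored
        analysis = call_model(document_id)
        analyses[document_id] = analysis
    if document_id not in notified:
        notify(document_id)  # may raise; a retry resumes from this step
        notified.add(document_id)
    return analysis


# Simulate a first attempt whose webhook times out, followed by one retry
model_calls = 0
webhook_attempts = 0


def fake_model(document_id: int) -> str:
    global model_calls
    model_calls += 1
    return f"analysis of {document_id}"


def flaky_webhook(document_id: int) -> None:
    global webhook_attempts
    webhook_attempts += 1
    if webhook_attempts == 1:
        raise ConnectionError("webhook timed out")


for attempt in range(2):
    try:
        analyze_once(7, fake_model, flaky_webhook)
        break
    except ConnectionError:
        pass  # the real task would call self.retry() here

print(model_calls)  # 1
```

In a real database the "already done?" check and the write need to be atomic (a unique constraint or a row lock), because after a redelivery two workers can run the same task at once. The webhook stays at-least-once: if the worker dies between notify() and recording it, the receiver sees it twice, so send an ID it can deduplicate on.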
Gotcha: Dead Letter Queues and Observability
Here’s what burned me in production: tasks would fail silently, and I wouldn’t know until users complained.
Always hook into task failures. Celery’s task_failure signal fires whenever a task fails for good, including after its last retry:
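```python
# app/celery_app.py (add below the config)
import logging

from celery.signals import task_failure

logger = logging.getLogger(__name__)


@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, args=None,
                    kwargs=None, einfo=None, **other):
    """Called by the worker when any task fails (not on each retry)."""
    logger.error(
        f"Task {task_id} failed",
        extra={
            "task_name": sender.name if sender else None,
            "exception": str(exception),
            "traceback": str(einfo),
            "task_args": args,  # "args" is a reserved LogRecord attribute
        },
    )
    # Send alert to Sentry, DataDog, etc. (send_alert_to_monitoring is your own helper)
    send_alert_to_monitoring(task_id, exception)
```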
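And set up a dead-letter queue for tasks that exceed max retries. Celery won’t route them anywhere on its own: once retries run out, self.retry() re-raises the last exception and the task simply ends in FAILURE. So the handler below only runs if you call it: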
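```python
# app/tasks/dlq.py
import logging

from app.celery_app import celery_app

logger = logging.getLogger(__name__)


@celery_app.task
def handle_dead_letter(task_name, task_id, args, exc):
    """
    Tasks that failed all retries land here.
    Log for manual investigation.
    """
    logger.critical(
        f"DEAD LETTER: {task_name} ({task_id})",
        extra={"original_args": args, "final_exception": str(exc)},
    )

# Call it from a task's retry branch before retrying, e.g.:
#     if self.request.retries >= self.max_retries:
#         handle_dead_letter.delay(self.name, self.request.id, self.request.args, str(exc))
```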
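Stripped of Celery, what a dead-letter queue buys you is this flow: one attempt, up to max_retries retries, then the payload and final error go somewhere a human will look instead of disappearing. A toy, stdlib-only model (MiniRunner and DeadLetter are hypothetical names; backoff delays are omitted to keep it short):

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class DeadLetter:
    task_name: str
    args: tuple
    attempts: int
    final_exception: str


@dataclass
class MiniRunner:
    """Toy runner (hypothetical): retry, then dead-letter instead of dropping."""
    max_retries: int = 3
    dead_letters: list[DeadLetter] = field(default_factory=list)

    def run(self, name: str, fn: Callable[..., Any], *args: Any) -> Any:
        last_exc: Optional[Exception] = None
        for _ in range(self.max_retries + 1):  # first attempt + max_retries retries
            try:
                return fn(*args)
            except Exception as exc:  # a real runner would back off here
                last_exc = exc
        # Retries exhausted: park the payload for manual investigation
        self.dead_letters.append(
            DeadLetter(name, args, self.max_retries + 1, repr(last_exc)))
        return None


def always_fails(user_id: int) -> str:
    raise ConnectionError("S3 unavailable")


runner = MiniRunner(max_retries=3)
runner.run("process_user_export", always_fails, 42)
print(runner.dead_letters)
```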
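I’ve also learned that you must monitor queue depth. A growing backlog means your workers can’t keep up. A simple starting point is a health check that asks the workers what they’re holding (active tasks are executing, reserved ones are prefetched and waiting); note that it can’t see messages still sitting in the broker: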
```python
# app/api/health.py
from fastapi import APIRouter

from app.celery_app import celery_app

router = APIRouter()


@router.get("/health/queues")
def queue_health():
    """Check what the workers are holding (sync: inspect() blocks on a broadcast)."""
    inspect = celery_app.control.inspect()
    # Each call returns {worker_name: [task, ...]}, or None if no worker replied
    active = inspect.active() or {}
    reserved = inspect.reserved() or {}
    total_active = sum(len(tasks) for tasks in active.values())
    total_reserved = sum(len(tasks) for tasks in reserved.values())
    return {
        "workers_online": len(active),
        "active_tasks": total_active,
        "reserved_tasks": total_reserved,
        # No workers online is unhealthy; also alert if the backlog grows
        "healthy": bool(active) and total_reserved < 1000,
    }
```
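The inspect-based check only sees tasks a worker has already taken. With worker_prefetch_multiplier=1 that number stays small however far behind you are, because the real backlog sits in the broker. With the Redis transport, each Celery queue is a Redis list named after the queue ("celery" by default), so LLEN gives the number of messages still waiting. A sketch, assuming a redis-py client (redis.Redis.from_url(...)) or anything else with an llen() method; broker_backlog and FakeRedis are hypothetical names, and if you use task priorities, kombu stores some messages under additional keys that this count would miss:

```python
from typing import Iterable, Protocol


class SupportsLlen(Protocol):
    def llen(self, name: str) -> int: ...


def broker_backlog(client: SupportsLlen, queues: Iterable[str] = ("celery",),
                   threshold: int = 1000) -> dict:
    """Hypothetical helper: messages waiting in the broker, not yet taken by any worker."""
    depths = {queue: client.llen(queue) for queue in queues}
    waiting = sum(depths.values())
    return {"queues": depths, "waiting": waiting, "healthy": waiting < threshold}


# In the app: broker_backlog(redis.Redis.from_url(os.environ["CELERY_BROKER_URL"]))
# Here a stand-in client replaces Redis so the example runs anywhere
class FakeRedis:
    def __init__(self, lengths: dict[str, int]) -> None:
        self.lengths = lengths

    def llen(self, name: str) -> int:
        return self.lengths.get(name, 0)


print(broker_backlog(FakeRedis({"celery": 1500})))
# {'queues': {'celery': 1500}, 'waiting': 1500, 'healthy': False}
```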
What I Missed Early On
I didn’t use task_acks_late=True initially, and I lost jobs whenever a worker crashed. I also didn’t set worker_prefetch_multiplier=1, so a worker stuck on a slow task would sit on a stack of prefetched ones while other workers sat idle. And I definitely didn’t implement proper error handling; I just assumed tasks would work.
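The hoarding problem is easy to reproduce in a simplified model. Celery’s default worker_prefetch_multiplier is 4, and a worker reserves up to multiplier × concurrency messages. The hypothetical makespan function below models two single-process workers that each hold at most `limit` tasks (running plus reserved) and counts the ticks until all work is done. It’s an illustration, not Celery’s actual scheduler:

```python
from collections import deque


def makespan(durations, workers: int = 2, limit: int = 1) -> int:
    """Hypothetical, simplified model: ticks until every task has finished.

    Each worker runs one task at a time and holds at most `limit` tasks
    (the running one plus reserved ones) that other workers cannot take.
    """
    queue = deque(durations)
    buffers = [deque() for _ in range(workers)]
    finish = [None] * workers  # tick when the running task ends; None = idle
    t = 0
    while True:
        for w in range(workers):
            if finish[w] is not None and finish[w] <= t:
                finish[w] = None  # task done and acknowledged
            held = len(buffers[w]) + (finish[w] is not None)
            while held < limit and queue:  # reserve up to the prefetch limit
                buffers[w].append(queue.popleft())
                held += 1
            if finish[w] is None and buffers[w]:
                finish[w] = t + buffers[w].popleft()
        if not queue and all(f is None for f in finish) and not any(buffers):
            return t
        t += 1


tasks = [10] + [1] * 7  # one slow export, seven quick ones
print(makespan(tasks, limit=4))  # 13
print(makespan(tasks, limit=1))  # 10
```

With a limit of 4, the first worker grabs the slow task plus three quick ones and sits on them for 10 ticks, while the second worker goes idle after tick 4. With a limit of 1, the second worker drains all the quick tasks and everything finishes when the slow task does.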
Start with the config I showed you. It’s battle-tested. Then optimize once you see where the real bottlenecks are.