FastAPI Background Tasks vs. Celery for AI Feature Processing: When to Queue and When to Fire-and-Forget
I’ve shipped AI feature processing in CitizenApp three different ways: BackgroundTasks, Celery + Redis, and naked async tasks. Each burned me in production. This post maps the exact failure modes that determine which pattern wins for your workload.
The tldr: BackgroundTasks is perfect for sub-5-second tasks in low-concurrency environments. Celery is overkill until you hit 50+ concurrent AI inferences or need horizontal scaling. Async tasks are the dangerous middle ground—they feel right until your process crashes and loses work.
Why This Matters for Multi-Tenant AI
When you’re processing Claude API calls across 200 tenants, a single failure mode cascades differently depending on your queue strategy:
- BackgroundTasks dies with the process → tenant’s AI feature blocks or silently fails
- Async tasks die if you don’t handle graceful shutdown → you lose inference requests mid-processing
- Celery with Redis persists work → AI features eventually complete, even after deploys
I learned this when pushing CitizenApp to production. We had 12 concurrent AI document analyses running when I deployed a code change. BackgroundTasks lost 3 of them. The tenants got a generic error and no way to retry.
The Patterns
Pattern 1: FastAPI BackgroundTasks (Fire-and-Forget Lite)
This is what I use for synchronous side effects: logging, webhook retries, non-critical notifications.
from fastapi import BackgroundTasks, FastAPI
from sqlalchemy.orm import Session
app = FastAPI()
def log_ai_feature_usage(tenant_id: str, feature_name: str, db: Session):
"""Non-blocking usage logging—fires after response returns."""
try:
usage = FeatureUsage(
tenant_id=tenant_id,
feature_name=feature_name,
timestamp=datetime.utcnow(),
)
db.add(usage)
db.commit()
except Exception as e:
# This fails silently in BackgroundTasks. Genuinely doesn't matter here.
logger.warning(f"Failed to log usage: {e}")
@app.post("/analyze")
async def analyze_document(
tenant_id: str,
document: str,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
):
# Return immediately while logging happens in background
background_tasks.add_task(log_ai_feature_usage, tenant_id, "document_analysis", db)
return {"status": "queued", "document_id": "..."}
Why I use this: Synchronous, built-in, zero dependencies. No Redis. No Celery workers. Perfect for:
- Analytics logging
- Webhook retries (with exponential backoff in the task)
- Cleanup jobs under 5 seconds
- Non-critical email sends
Where it breaks:
- Task dies if the worker process crashes → no retry mechanism
- No visibility into failures (unless you instrument logging)
- Blocks deployment until all tasks complete (FastAPI waits for BackgroundTasks on shutdown)
- No tenant isolation by default—tasks run in shared worker pool
The last one bit me. A tenant’s usage logging task had a database constraint violation, and FastAPI’s default behavior logged it to stdout but didn’t surface it to the tenant. They lost audit data and blamed our system.
Pattern 2: Async Tasks (The Trap)
Here’s the seductive pattern that burned me hardest:
import asyncio
from datetime import datetime
async def process_ai_inference(tenant_id: str, input_text: str, db: Session):
"""Process Claude API call asynchronously."""
try:
# Call Claude
response = await anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": input_text}],
)
# Store result
inference = AIInference(
tenant_id=tenant_id,
input=input_text,
output=response.content[0].text,
status="completed",
completed_at=datetime.utcnow(),
)
db.add(inference)
db.commit()
except Exception as e:
logger.error(f"Inference failed for tenant {tenant_id}: {e}")
# Try to mark as failed, but if DB is down, this silently fails
inference = AIInference(
tenant_id=tenant_id,
status="failed",
error=str(e),
)
db.add(inference)
db.commit()
@app.post("/infer")
async def infer(tenant_id: str, input_text: str, db: Session = Depends(get_db)):
# Fire task without waiting
asyncio.create_task(process_ai_inference(tenant_id, input_text, db))
return {"status": "processing"}
Why this feels right: One-liner to spawn work, no external dependencies, uses Python’s native async/await.
Why it’s a trap:
- Tasks are lost on process crash—no persistence
- Graceful shutdown is tricky (need
asyncio.gather()on app.on_event(“shutdown”)) - Concurrent limit is your server’s event loop, not explicit
- Tenant isolation requires manual task tracking
- Zero visibility into which tenants have stuck tasks
This is what we used in CitizenApp v1. We’d deploy, and during the 30-second shutdown, any in-flight Claude API calls would get killed mid-response. We’d log them as failed, but the tenant’s UI would timeout waiting for a response that was already discarded.
Pattern 3: Celery + Redis (Industrial Strength)
This is what I use now for anything that:
- Takes >5 seconds
- Must survive a deploy
- Needs per-tenant concurrency limits
- Touches expensive APIs (Claude, Stripe)
# tasks.py
from celery import Celery, Task
from kombu import Queue
app = Celery("citiezenapp")
app.conf.update(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
task_serializer="json",
accept_content=["json"],
result_expires=3600,
)
# Per-tenant queue routing
app.conf.task_queues = (
Queue("default", routing_key="default"),
Queue("ai_inference", routing_key="ai.inference"),
)
class CallbackTask(Task):
"""Track task state in DB for multi-tenant visibility."""
def on_failure(self, exc, task_id, args, kwargs, einfo):
tenant_id = kwargs.get("tenant_id")
logger.error(f"Task {task_id} failed for tenant {tenant_id}: {exc}")
# Update tenant's failure count, trigger alert
@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_ai_inference(
self,
tenant_id: str,
inference_id: str,
input_text: str,
):
"""
Process Claude inference with automatic retries.
Lives in Redis until completion.
"""
try:
db = next(get_db())
inference = db.query(AIInference).filter_by(id=inference_id).first()
if not inference:
return {"error": "Inference not found"}
# Check tenant rate limits (per-tenant concurrency)
tenant_config = db.query(TenantAIConfig).filter_by(
tenant_id=tenant_id
).first()
if tenant_config.concurrent_inferences >= tenant_config.max_concurrent:
# Retry after 10 seconds
raise self.retry(countdown=10)
inference.status = "processing"
db.commit()
response = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": input_text}],
)
inference.output = response.content[0].text
inference.status = "completed"
inference.completed_at = datetime.utcnow()
db.commit()
return {"inference_id": inference_id, "status": "completed"}
except Exception as exc:
# Exponential backoff: 60s, 120s, 240s
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
# fastapi_app.py
@app.post("/infer")
async def infer(
tenant_id: str,
input_text: str,
db: Session = Depends(get_db),
):
inference = AIInference(
tenant_id=tenant_id,
input=input_text,
status="queued",
)
db.add(inference)
db.commit()
# Queue task, returns immediately
task = process_ai_inference.apply_async(
kwargs={
"tenant_id": tenant_id,
"inference_id": inference.id,
"input_text": input_text,
},
queue="ai_inference",
priority=5, # Per-tenant prioritization
)
inference.celery_task_id = task.id
db.commit()
return {"inference_id": inference.id, "task_id": task.id}
Why I use Celery:
- Tasks persist in Redis—survive deploys, crashes, power outages
- Built-in retry logic with exponential backoff
- Per-queue concurrency control (we have separate
ai_inferencequeue with 10 workers) - Dead letter queue for failed tasks
- Monitoring integration (Flower, DataDog)
- Per-tenant rate limiting (shown above)
Cost: Running Redis + Celery workers adds ~$20