Skip to main content
All posts

FastAPI Background Tasks vs. Celery for AI Feature Processing: When to Queue and When to Fire-and-Forget

5 June 2026

FastAPI Background Tasks vs. Celery for AI Feature Processing: When to Queue and When to Fire-and-Forget

I’ve shipped AI feature processing in CitizenApp three different ways: BackgroundTasks, Celery + Redis, and naked async tasks. Each burned me in production. This post maps the exact failure modes that determine which pattern wins for your workload.

The tldr: BackgroundTasks is perfect for sub-5-second tasks in low-concurrency environments. Celery is overkill until you hit 50+ concurrent AI inferences or need horizontal scaling. Async tasks are the dangerous middle ground—they feel right until your process crashes and loses work.

Why This Matters for Multi-Tenant AI

When you’re processing Claude API calls across 200 tenants, a single failure mode cascades differently depending on your queue strategy:

I learned this when pushing CitizenApp to production. We had 12 concurrent AI document analyses running when I deployed a code change. BackgroundTasks lost 3 of them. The tenants got a generic error and no way to retry.

The Patterns

Pattern 1: FastAPI BackgroundTasks (Fire-and-Forget Lite)

This is what I use for synchronous side effects: logging, webhook retries, non-critical notifications.

from fastapi import BackgroundTasks, FastAPI
from sqlalchemy.orm import Session

app = FastAPI()

def log_ai_feature_usage(tenant_id: str, feature_name: str, db: Session):
    """Non-blocking usage logging—fires after response returns."""
    try:
        usage = FeatureUsage(
            tenant_id=tenant_id,
            feature_name=feature_name,
            timestamp=datetime.utcnow(),
        )
        db.add(usage)
        db.commit()
    except Exception as e:
        # This fails silently in BackgroundTasks. Genuinely doesn't matter here.
        logger.warning(f"Failed to log usage: {e}")

@app.post("/analyze")
async def analyze_document(
    tenant_id: str,
    document: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    # Return immediately while logging happens in background
    background_tasks.add_task(log_ai_feature_usage, tenant_id, "document_analysis", db)
    
    return {"status": "queued", "document_id": "..."}

Why I use this: Synchronous, built-in, zero dependencies. No Redis. No Celery workers. Perfect for:

Where it breaks:

The last one bit me. A tenant’s usage logging task had a database constraint violation, and FastAPI’s default behavior logged it to stdout but didn’t surface it to the tenant. They lost audit data and blamed our system.

Pattern 2: Async Tasks (The Trap)

Here’s the seductive pattern that burned me hardest:

import asyncio
from datetime import datetime

async def process_ai_inference(tenant_id: str, input_text: str, db: Session):
    """Process Claude API call asynchronously."""
    try:
        # Call Claude
        response = await anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )
        
        # Store result
        inference = AIInference(
            tenant_id=tenant_id,
            input=input_text,
            output=response.content[0].text,
            status="completed",
            completed_at=datetime.utcnow(),
        )
        db.add(inference)
        db.commit()
    except Exception as e:
        logger.error(f"Inference failed for tenant {tenant_id}: {e}")
        # Try to mark as failed, but if DB is down, this silently fails
        inference = AIInference(
            tenant_id=tenant_id,
            status="failed",
            error=str(e),
        )
        db.add(inference)
        db.commit()

@app.post("/infer")
async def infer(tenant_id: str, input_text: str, db: Session = Depends(get_db)):
    # Fire task without waiting
    asyncio.create_task(process_ai_inference(tenant_id, input_text, db))
    return {"status": "processing"}

Why this feels right: One-liner to spawn work, no external dependencies, uses Python’s native async/await.

Why it’s a trap:

This is what we used in CitizenApp v1. We’d deploy, and during the 30-second shutdown, any in-flight Claude API calls would get killed mid-response. We’d log them as failed, but the tenant’s UI would timeout waiting for a response that was already discarded.

Pattern 3: Celery + Redis (Industrial Strength)

This is what I use now for anything that:

# tasks.py
from celery import Celery, Task
from kombu import Queue

app = Celery("citiezenapp")
app.conf.update(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    accept_content=["json"],
    result_expires=3600,
)

# Per-tenant queue routing
app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("ai_inference", routing_key="ai.inference"),
)

class CallbackTask(Task):
    """Track task state in DB for multi-tenant visibility."""
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        tenant_id = kwargs.get("tenant_id")
        logger.error(f"Task {task_id} failed for tenant {tenant_id}: {exc}")
        # Update tenant's failure count, trigger alert

@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_ai_inference(
    self,
    tenant_id: str,
    inference_id: str,
    input_text: str,
):
    """
    Process Claude inference with automatic retries.
    Lives in Redis until completion.
    """
    try:
        db = next(get_db())
        inference = db.query(AIInference).filter_by(id=inference_id).first()
        
        if not inference:
            return {"error": "Inference not found"}
        
        # Check tenant rate limits (per-tenant concurrency)
        tenant_config = db.query(TenantAIConfig).filter_by(
            tenant_id=tenant_id
        ).first()
        
        if tenant_config.concurrent_inferences >= tenant_config.max_concurrent:
            # Retry after 10 seconds
            raise self.retry(countdown=10)
        
        inference.status = "processing"
        db.commit()
        
        response = anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )
        
        inference.output = response.content[0].text
        inference.status = "completed"
        inference.completed_at = datetime.utcnow()
        db.commit()
        
        return {"inference_id": inference_id, "status": "completed"}
        
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)

# fastapi_app.py
@app.post("/infer")
async def infer(
    tenant_id: str,
    input_text: str,
    db: Session = Depends(get_db),
):
    inference = AIInference(
        tenant_id=tenant_id,
        input=input_text,
        status="queued",
    )
    db.add(inference)
    db.commit()
    
    # Queue task, returns immediately
    task = process_ai_inference.apply_async(
        kwargs={
            "tenant_id": tenant_id,
            "inference_id": inference.id,
            "input_text": input_text,
        },
        queue="ai_inference",
        priority=5,  # Per-tenant prioritization
    )
    
    inference.celery_task_id = task.id
    db.commit()
    
    return {"inference_id": inference.id, "task_id": task.id}

Why I use Celery:

Cost: Running Redis + Celery workers adds ~$20

You might also like

Building something like this?

I build production-grade Python + React applications. Let's talk about your project.

Get in touch