FastAPI Performance Tuning: Caching, Async, and Production Bottlenecks
I’ve tuned FastAPI applications from handling 50 req/s to 5,000+ req/s in production. The gap between a “working” FastAPI app and a fast one isn’t magic—it’s understanding where time actually goes and fixing it systematically.
Most performance advice is generic (“use caching”, “go async”). This post is specific: here’s what killed my APIs, how I measured it, and the exact patterns I use now in CitizenApp.
The Three Bottlenecks That Matter
Before optimizing anything, I profile. Starlette, which FastAPI is built on, makes per-request timing cheap to add with a small middleware, but you have to add it and then actually read the numbers.
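One way to get per-request timing is a small ASGI middleware. The sketch below uses only the standard library; the class name `TimingMiddleware`, the `x-process-time-ms` header, and the `slow_app` stand-in endpoint are illustrative assumptions, not code from CitizenApp.

```python
import asyncio
import time


class TimingMiddleware:
    """Pure ASGI middleware: stamps each HTTP response with its handling time."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass websocket and lifespan traffic through untouched.
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()

        async def send_with_timing(message):
            if message["type"] == "http.response.start":
                # Time from request arrival until the response starts, in ms.
                elapsed_ms = (time.perf_counter() - start) * 1000
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time-ms", f"{elapsed_ms:.1f}".encode()))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)


async def slow_app(scope, receive, send):
    """Hypothetical stand-in for a real endpoint that takes about 20 ms."""
    await asyncio.sleep(0.02)
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def call_once(app):
    """Drive one fake HTTP request through an ASGI app and collect its messages."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return sent


messages = asyncio.run(call_once(TimingMiddleware(slow_app)))
print(dict(messages[0]["headers"]))
```

In a FastAPI app, a class like this would be registered with `app.add_middleware(TimingMiddleware)`; logging the value instead of returning it in a header is an equally reasonable choice.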
1. Synchronous database calls block the event loop. This is the killer. One 200ms database query run synchronously inside an async endpoint stalls the event loop, and every other request on that worker waits behind it. I learned this the hard way when our auth endpoint hit production traffic.
2. Missing connection pooling. SQLAlchemy with pooling disabled, or with an engine created per request, opens a new database connection for every request. I’ve watched a 4-core Render instance exhaust Postgres connections (100 default limit) at only 300 concurrent users.
3. Uncompressed responses. This sounds trivial until you’re serving 5MB JSON arrays. Gzip cuts that to 200KB. On a 10Mbps connection, that’s the difference between 4 seconds and 160ms per request.
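The transfer-time figures in the last item are plain arithmetic. A minimal sketch, assuming decimal units (1 MB = 1,000,000 bytes) and ignoring latency and protocol overhead; the helper name `transfer_seconds` is made up for illustration:

```python
def transfer_seconds(size_bytes: int, link_mbps: float) -> float:
    """Seconds to move size_bytes over a link of link_mbps megabits per second."""
    bits = size_bytes * 8
    return bits / (link_mbps * 1_000_000)


raw = transfer_seconds(5_000_000, 10)    # 5 MB uncompressed
gzipped = transfer_seconds(200_000, 10)  # 200 KB after gzip
print(f"raw: {raw:.2f}s, gzipped: {gzipped * 1000:.0f}ms")  # raw: 4.00s, gzipped: 160ms
```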
The Pattern I Use: Async-First with Strategic Sync
Here’s my rule: make database access async first, other blocking I/O second, serialization third.
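from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

# app, User, get_db, and get_async_db are defined elsewhere in the application.

# ❌ Sync session in a plain def: each request holds a thread-pool thread for the whole query. Don't do this.
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    return user

# ✅ Async session: the event loop serves other requests while the query runs. Do this.
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    return user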
Why async matters: FastAPI runs on Uvicorn with an event loop. If your endpoint is a plain def, FastAPI runs it in a bounded thread pool, which adds context-switch overhead and caps concurrency at the pool size. Async endpoints stay on the event loop, which only helps if nothing inside them blocks.
But here’s what burned me: not all libraries are async-friendly. The requests library? Sync only. The Anthropic Python SDK? Its default client is sync (an AsyncAnthropic client also exists). When I need to call sync code from an async endpoint, I use run_in_threadpool:
from fastapi.concurrency import run_in_threadpool
import anthropic

@app.post("/analyze")
async def analyze(text: str):
    client = anthropic.Anthropic()
    # This would block. Don't do it directly:
    # response = client.messages.create(...)
    # Instead:
    response = await run_in_threadpool(
        client.messages.create,
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": text}],
    )
    return {"analysis": response.content[0].text}
This offloads the blocking call to a worker thread, so the event loop stays free to serve other requests.
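The difference is easy to reproduce with the standard library alone. In this sketch, `asyncio.to_thread` stands in for `run_in_threadpool` (same idea, different thread pool), and `blocking_call` is a hypothetical 50 ms sync call; eight concurrent "requests" are timed both ways.

```python
import asyncio
import time


def blocking_call() -> str:
    """Stand-in for a sync SDK or driver call that takes about 50 ms."""
    time.sleep(0.05)
    return "done"


async def handler_blocking() -> str:
    # Runs on the event loop thread: nothing else can progress meanwhile.
    return blocking_call()


async def handler_offloaded() -> str:
    # Runs in a worker thread: the event loop stays free for other requests.
    return await asyncio.to_thread(blocking_call)


async def time_concurrent(handler, n: int = 8) -> float:
    """Run n handlers concurrently and return the wall-clock seconds taken."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


blocked = asyncio.run(time_concurrent(handler_blocking))
offloaded = asyncio.run(time_concurrent(handler_offloaded))
print(f"blocking: {blocked:.2f}s, offloaded: {offloaded:.2f}s")
```

The blocking version serializes the calls, so its total time grows with the number of requests; the offloaded version overlaps them up to the size of the thread pool.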
Connection Pooling: The Silent Killer
SQLAlchemy keeps one connection pool per process. In production with multiple workers, every worker gets its own pool, so total connections scale with the worker count and you can exhaust the database without noticing.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool, NullPool

# Pick ONE of the two engines below; DATABASE_URL comes from your config.

# Development (single worker, temporary connections):
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # Don't pool in dev
)

# Production (multiple workers, connection pooling):
engine = create_async_engine(
    DATABASE_URL,
    # Plain QueuePool is rejected by async engines; this is the async default.
    poolclass=AsyncAdaptedQueuePool,
    pool_size=5,        # persistent connections per worker
    max_overflow=10,    # extra connections allowed when the pool is exhausted
    pool_recycle=3600,  # recycle connections every hour
    echo=False,
)

async def get_async_db():
    async with AsyncSession(engine) as session:
        yield session
The math: (pool_size + max_overflow) * num_workers should stay below your database’s max connections. On Render’s standard plan (2 workers), pool_size=5 plus max_overflow=10 gives 15 connections per worker, so 30 at most. Postgres allows 100 by default, so we’re safe.
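The connection budget can be checked with a few lines of arithmetic before changing worker counts. This sketch assumes each worker may open up to `pool_size + max_overflow` connections at peak; the function name is made up for illustration.

```python
def max_db_connections(pool_size: int, max_overflow: int, workers: int) -> int:
    """Worst-case connections opened across all worker processes."""
    return (pool_size + max_overflow) * workers


POSTGRES_MAX_CONNECTIONS = 100  # Postgres default

for workers in (2, 4, 8):
    total = max_db_connections(pool_size=5, max_overflow=10, workers=workers)
    verdict = "ok" if total < POSTGRES_MAX_CONNECTIONS else "over budget"
    print(f"{workers} workers -> up to {total} connections ({verdict})")
```

With the settings above, two and four workers fit comfortably, while eight workers could ask for 120 connections, which is past the default limit.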
Caching: Where It Actually Helps
I cache three things: rarely-changing data, expensive computations, and database queries.
from functools import lru_cache

import redis.asyncio as redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Option 1: Simple in-memory cache (lives in one process, so single worker only)
@lru_cache(maxsize=128)
def get_org_plan(org_id: int) -> str:
    # This would hit database
    return "pro"  # stubbed

# Option 2: Redis (multi-worker, production)
redis_client = None  # set by init_redis() at startup

async def init_redis():
    global redis_client
    # from_url returns the client directly; no await needed.
    redis_client = redis.from_url("redis://localhost")

async def get_cached_org_plan(org_id: int, db: AsyncSession) -> str:
    key = f"org:{org_id}:plan"
    cached = await redis_client.get(key)
    if cached is not None:
        return cached.decode()
    # Fetch from DB (Org is the application's ORM model)
    result = await db.execute(select(Org).where(Org.id == org_id))
    org = result.scalar_one()
    # Cache for 1 hour
    await redis_client.setex(key, 3600, org.plan)
    return org.plan
Don’t cache everything. Cache invalidation is hard. I cache only:
- Organization settings (invalidate on update)
- AI model pricing (stale data is acceptable)
- Public user profiles (public = cacheable)
What I don’t cache: auth tokens, user permissions, real-time data. The invalidation complexity isn’t worth it.
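"Invalidate on update" is the part that is easiest to get wrong, so here is a minimal sketch of the read and write paths together. It swaps in a tiny in-memory `TTLCache` class for Redis so it runs on its own; `TTLCache`, `db_plans`, and `stats` are hypothetical stand-ins, not part of any library.

```python
import asyncio
import time


class TTLCache:
    """Tiny in-memory cache with per-key expiry (stand-in for Redis)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # expired: behave like a miss
            return None
        return value

    def set(self, key, value, ttl_seconds):
        self._data[key] = (value, time.monotonic() + ttl_seconds)

    def delete(self, key):
        self._data.pop(key, None)


cache = TTLCache()
db_plans = {1: "pro"}     # stand-in for the organizations table
stats = {"db_reads": 0}   # counts how often the "database" is actually hit


async def get_org_plan(org_id: int) -> str:
    key = f"org:{org_id}:plan"
    cached = cache.get(key)
    if cached is not None:
        return cached
    stats["db_reads"] += 1  # cache miss: read through to the database
    plan = db_plans[org_id]
    cache.set(key, plan, ttl_seconds=3600)
    return plan


async def update_org_plan(org_id: int, plan: str) -> None:
    db_plans[org_id] = plan
    cache.delete(f"org:{org_id}:plan")  # invalidate on update


async def demo():
    first = await get_org_plan(1)   # miss, reads the database
    second = await get_org_plan(1)  # hit, served from the cache
    await update_org_plan(1, "enterprise")
    third = await get_org_plan(1)   # miss again after invalidation
    return first, second, third


result = asyncio.run(demo())
print(result, stats)
```

Deleting the key on write, rather than overwriting it, keeps the write path simple: the next read repopulates the cache from the source of truth.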
Compression and Middleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware

# Added first, so it sits innermost (closest to the endpoint)
app.add_middleware(GZipMiddleware, minimum_size=1000)

# Added last, so it is outermost and sees each request first
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.citizenapp.com"],
    allow_credentials=True,
)
Middleware order matters in FastAPI. The last add_middleware call is the first to see a request. I add compression first, so it sits closest to the actual endpoint, and CORS last, so it runs before everything else.
Gzip minimum_size=1000: don’t compress small responses. The overhead isn’t worth it.
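The reason for a size threshold shows up quickly with the standard library's `gzip` module: a tiny payload gets bigger once the gzip header and trailer are added, while a large repetitive JSON array shrinks dramatically. The payloads below are made up for illustration.

```python
import gzip
import json

small = json.dumps({"status": "ok"}).encode()
large = json.dumps(
    [{"id": i, "name": f"user-{i}", "active": True} for i in range(5000)]
).encode()


def compressed_size(payload: bytes) -> int:
    """Size in bytes of the payload after gzip compression."""
    return len(gzip.compress(payload))


for label, payload in (("small", small), ("large", large)):
    print(f"{label}: {len(payload)} -> {compressed_size(payload)} bytes")
```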
Gotcha: Task Queues Hide Performance Issues
I used to ship everything async and assume it was fast. Then I shipped an endpoint that called an external API (3-second timeout). Users felt 3-second latency.
The fix: offload to a task queue.
import requests
from celery import Celery

celery_app = Celery("tasks", broker="redis://localhost")

@celery_app.task
def send_email_task(email: str):
    # This blocks for 2 seconds. Celery workers handle it, not your API.
    requests.post("https://sendgrid.com/...", ...)

@app.post("/send-email")
async def send_email(email: str):
    # Fire and forget: .delay() enqueues the task and returns immediately
    send_email_task.delay(email)
    return {"status": "queued"}
Users get instant response. Background job handles the slow operation.
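The latency effect of the hand-off can be seen without a broker. This sketch uses an in-process `asyncio.Queue` as a stand-in for Celery (a real task queue runs the job in a separate worker process and survives restarts, which this does not); all function names here are hypothetical.

```python
import asyncio
import time


async def send_email_slowly(email: str, sent: list) -> None:
    """Hypothetical slow job: stands in for a call to an email provider."""
    await asyncio.sleep(0.2)
    sent.append(email)


async def worker(queue: asyncio.Queue, sent: list) -> None:
    """Background consumer: pulls jobs off the queue one at a time."""
    while True:
        email = await queue.get()
        try:
            await send_email_slowly(email, sent)
        finally:
            queue.task_done()


async def enqueue_email(queue: asyncio.Queue, email: str) -> dict:
    """What the endpoint does: hand the job off and return immediately."""
    queue.put_nowait(email)
    return {"status": "queued"}


async def demo():
    queue = asyncio.Queue()
    sent = []
    worker_task = asyncio.create_task(worker(queue, sent))

    start = time.perf_counter()
    response = await enqueue_email(queue, "user@example.com")
    handler_seconds = time.perf_counter() - start

    await queue.join()  # wait for the background job (only for this demo)
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return response, handler_seconds, sent


response, handler_seconds, sent = asyncio.run(demo())
print(response, f"handler took {handler_seconds * 1000:.1f}ms", sent)
```

The handler returns long before the 200 ms job finishes, which is the behaviour the user sees with Celery as well.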
What I Missed Early
I profiled endpoints in isolation. Then production traffic patterns exposed cache misses, lock contention, and connection pool exhaustion I never saw in testing.
Load test with realistic traffic patterns, not synthetic uniform requests. Use locust or k6:
# locust test
from locust import HttpUser, task

class APIUser(HttpUser):
    @task(3)
    def list_items(self):
        self.client.get("/items")

    @task(1)
    def create_item(self):
        self.client.post("/items", json={"name": "test"})
This caught that our list endpoint (cached) was fine, but unbatched creates killed the database.
Your Move
Start with async database access. Add caching only when profiling shows repeated queries. Load test before deploying. Measure before and after—don’t guess.
That’s how CitizenApp handles thousands of concurrent users without infrastructure complexity.