FastAPI Lifespan Events for Multi-Tenant Resource Initialization: Setting Up Tenant Caches and AI Model Contexts Without Singleton Hell
I’ve watched multi-tenant SaaS applications crumble under their own weight at 3 AM more times than I care to admit. The pattern is always the same: someone lazy-loads a tenant’s Claude API context manager on the first request, a popular customer gets hammered, and suddenly you’re initializing the same context 50 times in parallel. Game over.
When I built CitizenApp, I made the opposite choice: I moved all expensive initialization into FastAPI’s lifespan context manager. It’s a subtle shift that eliminates thundering-herd effects, prevents race conditions on shared resources, and gives you a clean place to manage the lifecycle of per-tenant caches and AI model contexts. This post covers exactly how.
The Problem with Reactive Initialization
Most developers initialize tenant-specific resources on-demand:
# ❌ DON'T DO THIS
from fastapi import FastAPI, Request

app = FastAPI()
tenant_caches: dict = {}

@app.get("/api/v1/tenant/{tenant_id}/analyze")
async def analyze(tenant_id: str, request: Request):
    if tenant_id not in tenant_caches:
        # BOOM: Multiple concurrent requests trigger this simultaneously
        tenant_caches[tenant_id] = await expensive_initialization(tenant_id)
    cache = tenant_caches[tenant_id]
    # request.json() is a coroutine, so it has to be awaited
    return await process_with_cache(cache, await request.json())
What happens next is predictable:
- Request 1 for tenant “acme-corp” starts initialization
- Request 2 for tenant “acme-corp” arrives before Request 1 finishes
- Request 3 for tenant “acme-corp” arrives
- You’re now initializing the same expensive resource three times in parallel
- Your database connection pool is exhausted
- Claude API calls time out
- Your PagerDuty alarm triggers at 2 AM
This is the thundering herd. It’s a classic distributed systems problem, and the fix is straightforward: initialize everything before accepting traffic.
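The race is easy to reproduce without a web server. The sketch below is a stdlib-only illustration, not code from CitizenApp: `expensive_initialization` is a hypothetical stand-in that sleeps briefly to simulate I/O, and `handle_request` copies the check-then-initialize logic from the endpoint above.

```python
import asyncio

init_calls = 0  # counts how many times the expensive setup actually runs
tenant_caches: dict[str, dict] = {}


async def expensive_initialization(tenant_id: str) -> dict:
    """Hypothetical stand-in for the real setup work (DB queries, cache warm-up)."""
    global init_calls
    init_calls += 1
    await asyncio.sleep(0.05)  # simulated I/O; other requests run during this await
    return {"tenant_id": tenant_id}


async def handle_request(tenant_id: str) -> dict:
    # Same check-then-initialize logic as the endpoint above
    if tenant_id not in tenant_caches:
        tenant_caches[tenant_id] = await expensive_initialization(tenant_id)
    return tenant_caches[tenant_id]


async def main() -> int:
    # Three "requests" for the same tenant arrive at once
    await asyncio.gather(*(handle_request("acme-corp") for _ in range(3)))
    return init_calls


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 3: every request ran the initialization
```

Each coroutine passes the `not in` check before any of them has stored a result, so the setup runs once per concurrent request rather than once per tenant.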
FastAPI Lifespan Events: The Better Way
FastAPI’s lifespan context manager (introduced in 0.93.0) gives you a hook that runs before your app starts serving requests and again when it shuts down. I prefer this over the older @app.on_event("startup") and @app.on_event("shutdown") handlers, which FastAPI now deprecates, because startup and teardown live in one function, and the result is explicit, testable, and handles cancellation gracefully.
Here’s the pattern I use in production:
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator

import anthropic
import asyncpg
from fastapi import FastAPI, HTTPException, Request

# Global storage for tenant contexts (more on this later)
TENANT_CONTEXTS: dict[str, "TenantContext"] = {}

class TenantContext:
    """Encapsulates all resources for a single tenant."""

    def __init__(self, tenant_id: str, db_pool: asyncpg.Pool, claude_client: anthropic.Anthropic):
        self.tenant_id = tenant_id
        self.db_pool = db_pool
        self.claude_client = claude_client
        self.feature_flags: dict[str, bool] = {}
        # typing.Any, not the builtin any() function
        self.model_cache: dict[str, Any] = {}
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup phase
    print("🚀 Initializing tenant contexts...")

    # 1. Create database pools (credentials are inline for illustration only)
    db_pool = await asyncpg.create_pool(
        user="postgres",
        password="secret",
        database="multi_tenant_db",
        min_size=5,
        max_size=20,
    )

    # 2. Initialize Claude client (reuse single instance across tenants).
    # anthropic.Anthropic is the synchronous client; anthropic.AsyncAnthropic
    # is the non-blocking variant if you call the API from async handlers.
    claude_client = anthropic.Anthropic()

    try:
        # 3. Load all active tenants from database
        query = "SELECT id, config FROM tenants WHERE status = 'active'"
        tenant_rows = await db_pool.fetch(query)

        for tenant_id, config in tenant_rows:
            try:
                # Create per-tenant context
                ctx = TenantContext(tenant_id, db_pool, claude_client)

                # Warm up feature flags
                flags_query = "SELECT flag_name, enabled FROM feature_flags WHERE tenant_id = $1"
                flags = await db_pool.fetch(flags_query, tenant_id)
                ctx.feature_flags = {row['flag_name']: row['enabled'] for row in flags}

                # Warm up any model-specific caches (e.g., tokenizer caches)
                if ctx.feature_flags.get('ai_summarization', False):
                    ctx.model_cache['summarizer'] = await load_summarizer_context(tenant_id)

                TENANT_CONTEXTS[tenant_id] = ctx
                print(f"✓ Initialized tenant: {tenant_id}")
            except Exception as e:
                print(f"✗ Failed to initialize tenant {tenant_id}: {e}")
                # Decide: fail hard or skip problematic tenants?
                # I prefer failing hard to catch config errors early.
                raise

        print(f"✓ All {len(TENANT_CONTEXTS)} tenants ready")

        # Yield control to the app
        yield
    finally:
        # Shutdown phase (also runs if startup fails partway, so the pool never leaks)
        print("🛑 Cleaning up tenant contexts...")
        await db_pool.close()
        TENANT_CONTEXTS.clear()
app = FastAPI(lifespan=lifespan)

@app.get("/api/v1/tenant/{tenant_id}/analyze")
async def analyze(tenant_id: str, request: Request):
    # Resources are guaranteed to exist and warm
    ctx = TENANT_CONTEXTS.get(tenant_id)
    if not ctx:
        raise HTTPException(status_code=404, detail="Tenant not found")
    # request.json() is a coroutine, so it has to be awaited
    return await process_with_context(ctx, await request.json())
Why I prefer this approach:
- No race conditions: All initialization happens before the first request lands.
- Thundering herd is impossible: There’s no lazy-load window for concurrent requests to exploit.
- Fails fast: Configuration errors surface at startup, not at 2 AM when a customer tries to use a feature.
- Testable: You can manually call the lifespan context in tests.
- Clean shutdown: Connections are closed gracefully without requests hanging.
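To make the "testable" point concrete: a lifespan function is an ordinary async context manager, so a test can enter and exit it directly. The sketch below is a simplified, stdlib-only stand-in for the lifespan above. `load_active_tenants` and the plain-dict contexts are assumptions made for illustration, and the app object is a throwaway placeholder.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import AsyncIterator

TENANT_CONTEXTS: dict[str, dict] = {}


async def load_active_tenants() -> list[str]:
    """Hypothetical stand-in for the `SELECT ... FROM tenants` query."""
    return ["acme-corp", "globex"]


@asynccontextmanager
async def lifespan(app) -> AsyncIterator[None]:
    # Startup: populate every tenant before yielding control
    for tenant_id in await load_active_tenants():
        TENANT_CONTEXTS[tenant_id] = {"tenant_id": tenant_id, "feature_flags": {}}
    try:
        yield
    finally:
        # Shutdown: release everything, even if the body raised
        TENANT_CONTEXTS.clear()


async def run_lifespan_check() -> tuple[int, int]:
    fake_app = SimpleNamespace()  # this simplified lifespan never touches the app
    async with lifespan(fake_app):
        during = len(TENANT_CONTEXTS)  # tenants visible while the "app" is running
    after = len(TENANT_CONTEXTS)  # registry state after shutdown
    return during, after


if __name__ == "__main__":
    print(asyncio.run(run_lifespan_check()))  # prints (2, 0)
```

With a real FastAPI app, using the test client as a context manager (`with TestClient(app) as client:`) runs the same startup and shutdown phases around the requests made inside the block.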
Handling Dynamic Tenant Registration
What if a new tenant signs up at runtime? You have two options:
Option 1: Initialize on demand with locking (my preference):
import asyncio

_tenant_init_locks: dict[str, asyncio.Lock] = {}

async def get_or_init_tenant(tenant_id: str) -> TenantContext:
    # Fast path: already initialized
    if tenant_id in TENANT_CONTEXTS:
        return TENANT_CONTEXTS[tenant_id]

    # Slow path: first request for this tenant
    # Use a lock to prevent thundering herd
    # (no await between the check and the assignment, so this is safe in asyncio)
    if tenant_id not in _tenant_init_locks:
        _tenant_init_locks[tenant_id] = asyncio.Lock()

    async with _tenant_init_locks[tenant_id]:
        # Double-check: another coroutine might have initialized while we waited
        if tenant_id in TENANT_CONTEXTS:
            return TENANT_CONTEXTS[tenant_id]

        # Now initialize safely
        ctx = await _initialize_tenant_context(tenant_id)
        TENANT_CONTEXTS[tenant_id] = ctx
        return ctx

@app.get("/api/v1/tenant/{tenant_id}/analyze")
async def analyze(tenant_id: str, request: Request):
    ctx = await get_or_init_tenant(tenant_id)
    # request.json() is a coroutine, so it has to be awaited
    return await process_with_context(ctx, await request.json())
This hybrid approach keeps most of the benefits of eager initialization while still supporting tenants that sign up at runtime.
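To check that the per-tenant lock really collapses concurrent first requests into one initialization, here is a stdlib-only sketch of the same double-checked pattern. `_initialize_tenant_context` is a hypothetical stand-in that counts its calls, and the contexts are plain dicts rather than the real `TenantContext`.

```python
import asyncio

init_calls = 0  # counts real initializations
TENANT_CONTEXTS: dict[str, dict] = {}
_tenant_init_locks: dict[str, asyncio.Lock] = {}


async def _initialize_tenant_context(tenant_id: str) -> dict:
    """Hypothetical stand-in for the expensive per-tenant setup."""
    global init_calls
    init_calls += 1
    await asyncio.sleep(0.05)  # simulated I/O
    return {"tenant_id": tenant_id}


async def get_or_init_tenant(tenant_id: str) -> dict:
    # Fast path: already initialized
    if tenant_id in TENANT_CONTEXTS:
        return TENANT_CONTEXTS[tenant_id]

    # One lock per tenant, so unrelated tenants never wait on each other
    lock = _tenant_init_locks.setdefault(tenant_id, asyncio.Lock())
    async with lock:
        # Double-check: a coroutine ahead of us in the queue may have finished the work
        if tenant_id in TENANT_CONTEXTS:
            return TENANT_CONTEXTS[tenant_id]
        ctx = await _initialize_tenant_context(tenant_id)
        TENANT_CONTEXTS[tenant_id] = ctx
        return ctx


async def main() -> int:
    # Fifty concurrent first requests for a tenant that signed up at runtime
    results = await asyncio.gather(*(get_or_init_tenant("new-tenant") for _ in range(50)))
    assert all(r is results[0] for r in results)  # everyone shares one context
    return init_calls


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 1
```

One caveat with this shape: the lock dictionary grows by one entry per tenant ID ever requested, so a production version would probably validate the ID against the tenants table before creating a lock.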
Gotcha: Global State and Testing
The biggest gotcha I hit was testing. When you stash TENANT_CONTEXTS globally, your tests interfere with each other:
# ❌ BAD TEST
@pytest.mark.asyncio
async def test_tenant_isolation():
    # This test runs after another test that populated TENANT_CONTEXTS
    # You get crosstalk, false positives, and debugging nightmares
    ctx = await get_or_init_tenant("test-tenant-1")
    assert ctx.feature_flags['ai_summarization'] == True
Fix: Use dependency injection to pass context:
# ✓ GOOD TEST
import pytest
from fastapi.testclient import TestClient

@pytest.mark.asyncio
async def test_tenant_isolation():
    # Create fresh context for this test only
    test_db_pool = await asyncpg.create_pool(...)
    test_ctx = TenantContext("test-tenant-1", test_db_pool, anthropic.Anthropic())
    # Inject into app via override (the route must depend on get_current_tenant_context)
    app.dependency_overrides[get_current_tenant_context] = lambda: test_ctx
    try:
        client = TestClient(app)
        response = client.get("/api/v1/tenant/test-tenant-1/analyze")
        assert response.status_code == 200
    finally:
        # Undo the override and release the pool so later tests start clean
        app.dependency_overrides.clear()
        await test_db_pool.close()
Avoiding Singleton Hell
I mentioned “singleton hell” in the title. What I mean: don’t do this:
# ❌ SINGLETON PATTERN (AVOID)
class TenantManager:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def initialize(self):
        # Initialization logic here
        pass

manager = TenantManager()  # Globals everywhere, testing is painful
The lifespan context is cleaner: it uses a hook FastAPI already provides, keeps initialization logic colocated with the app definition, and pairs naturally with FastAPI’s dependency system when handlers need a tenant context.
What I Missed
When I first shipped this pattern, I didn’t account for long-running server processes that need to refresh tenant configs without restarting. I added a background task that periodically reloads feature flags:
async def refresh_tenant_flags():
"""Periodically reload feature flags
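A periodic refresh along these lines could be sketched as follows. This is a stdlib-only illustration under stated assumptions, not the CitizenApp implementation: `refresh_flags_forever`, the `load_flags` callable, and the plain-dict flag registry are all hypothetical names standing in for the real query and `TenantContext` objects.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical loader signature: tenant_id -> fresh flag mapping
FlagLoader = Callable[[str], Awaitable[dict[str, bool]]]


async def refresh_flags_forever(
    tenant_flags: dict[str, dict[str, bool]],
    load_flags: FlagLoader,
    interval_seconds: float = 60.0,
) -> None:
    """Reload every tenant's flags each `interval_seconds` until cancelled."""
    while True:
        await asyncio.sleep(interval_seconds)
        for tenant_id in list(tenant_flags):
            try:
                # Replace the whole dict in one assignment so readers
                # never observe a half-updated set of flags
                tenant_flags[tenant_id] = await load_flags(tenant_id)
            except Exception as exc:
                # Keep serving the previous flags if one reload fails
                print(f"flag refresh failed for {tenant_id}: {exc}")


async def demo() -> dict[str, dict[str, bool]]:
    flags = {"acme-corp": {"ai_summarization": False}}

    async def fake_loader(tenant_id: str) -> dict[str, bool]:
        return {"ai_summarization": True}  # pretend the flag was flipped in the DB

    task = asyncio.create_task(
        refresh_flags_forever(flags, fake_loader, interval_seconds=0.01)
    )
    await asyncio.sleep(0.2)  # let a few refresh cycles run
    task.cancel()  # the same thing the shutdown phase would do
    try:
        await task
    except asyncio.CancelledError:
        pass
    return flags


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the lifespan function, a task like this would be created with `asyncio.create_task` just before `yield` and cancelled in the shutdown phase, so the refresh loop never outlives the database pool it reads from.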