SQLAlchemy Bulk Operations for AI Feature Analytics: Writing 10K Events/Second Without Database Meltdown
CitizenApp logs analytics for 9 AI features—document summarization, smart search, recommendation engine, automated workflows, sentiment analysis, entity extraction, content generation, anomaly detection, and predictive forecasting. Each feature fires events on user interaction, API calls, token usage, latency, and errors. At scale, that’s easily 10K events per second during peak hours.
The first version of our analytics pipeline was naive: one session.add() per event, flush every second. We hit database locks within a week. Queries took 45 seconds. Dashboards timed out. The lesson was brutal—individual inserts don’t scale, and neither does pretending they will.
This post covers the exact patterns I use in production: bulk insertion with bulk_insert_mappings(), intelligent session batching, circular buffer partitioning, and query patterns that actually run fast on dense analytics tables.
Why Not Just Use a Time-Series Database?
I considered ClickHouse, TimescaleDB, and Datadog. They’re excellent. But CitizenApp runs on Render and Vercel—we wanted one database, one bill, one monitoring surface. PostgreSQL with intelligent partitioning and bulk writes handles our volume fine. If you’re doing 100K+ events/second across 50 features, break out ClickHouse. But for most SaaS companies with 5–20 AI features, this approach is simpler and cheaper.
The Naive Approach (Don’t Do This)
# ❌ This will destroy your database
from datetime import datetime

from fastapi import Depends, FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = FastAPI()
engine = create_engine("postgresql://...")

# EventSchema, AnalyticsEvent, and get_db are assumed to be defined elsewhere in the app.

@app.post("/api/log-event")
async def log_event(event: EventSchema, db: Session = Depends(get_db)):
    db_event = AnalyticsEvent(
        tenant_id=event.tenant_id,
        feature_name=event.feature_name,
        user_id=event.user_id,
        tokens_used=event.tokens_used,
        latency_ms=event.latency_ms,
        status=event.status,
        timestamp=datetime.utcnow(),
    )
    db.add(db_event)
    db.commit()  # ← Each request commits. Database hatred intensifies.
Why this fails:
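- One commit per event = 10K transactions/second, and by default every commit waits for a WAL flush to disk
- No batching = every row pays for its own round-trip and statement overhead, so SQLAlchemy has nothing to group
- Autoflush on query means any SELECT issued in a session with pending events first pushes those INSERTs to the database, so reads end up waiting on writes
- Index thrashing: every single-row INSERT also updates each index on the table, including the ones on timestamp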
I ran this for two days. At 3 AM, dashboards went black. pg_stat_statements showed 80% of time in INSERT. Never again.
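To make the first two bullets concrete, here is a minimal sketch that counts commits for the same 5,000 rows written both ways. It uses the standard-library sqlite3 module as a stand-in so it runs without a PostgreSQL server; the table layout and the function names are illustrative assumptions, not CitizenApp code.

```python
import sqlite3

INSERT_SQL = "INSERT INTO events (feature_name, latency_ms) VALUES (?, ?)"


def make_db():
    """Create a throwaway in-memory database with a tiny events table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events ("
        "id INTEGER PRIMARY KEY, feature_name TEXT, latency_ms INTEGER)"
    )
    return conn


def insert_one_commit_per_row(conn, rows):
    """Mimic the naive endpoint: one transaction per event."""
    commits = 0
    for row in rows:
        conn.execute(INSERT_SQL, row)
        conn.commit()
        commits += 1
    return commits


def insert_in_batches(conn, rows, batch_size=500):
    """Write the same rows with one executemany() and one commit per batch."""
    commits = 0
    for start in range(0, len(rows), batch_size):
        conn.executemany(INSERT_SQL, rows[start:start + batch_size])
        conn.commit()
        commits += 1
    return commits


rows = [("smart_search", i % 900) for i in range(5000)]
naive_conn, batched_conn = make_db(), make_db()
naive_commits = insert_one_commit_per_row(naive_conn, rows)
batched_commits = insert_in_batches(batched_conn, rows)
print(naive_commits, batched_commits)  # 5000 10
```

Both databases end up with identical contents, but the batched path issues 500 times fewer commits. An in-memory SQLite database never touches disk, so it understates the real gap; on PostgreSQL each of those commits is also a durable write.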
The Right Way: Bulk Insert with Session Batching
The fix is to batch events in memory, flush in chunks, and let SQLAlchemy + PostgreSQL handle the bulk write efficiently:
# ✅ Production-grade event logging
import logging
import time
from datetime import datetime
from typing import List

from sqlalchemy import Column, DateTime, Index, Integer, String
from sqlalchemy.orm import Session, declarative_base

logger = logging.getLogger(__name__)
Base = declarative_base()


class AnalyticsEvent(Base):
    __tablename__ = "analytics_events"

    id = Column(Integer, primary_key=True)
    tenant_id = Column(String(36), nullable=False)
    feature_name = Column(String(50), nullable=False)
    user_id = Column(String(36), nullable=False)
    tokens_used = Column(Integer, default=0)
    latency_ms = Column(Integer, default=0)
    status = Column(String(20), default="success")
    timestamp = Column(DateTime, nullable=False, index=True)

    # Column names are passed as strings so the indexes resolve against the table
    __table_args__ = (
        Index("idx_tenant_feature_time", "tenant_id", "feature_name", "timestamp"),
        Index("idx_timestamp_status", "timestamp", "status"),
    )


class EventBuffer:
    """In-memory buffer for analytics events, flushed by size or by age."""

    def __init__(self, batch_size: int = 500, max_wait_seconds: int = 5):
        self.batch_size = batch_size
        self.max_wait_seconds = max_wait_seconds
        self.buffer: List[dict] = []
        self.last_flush = datetime.utcnow()

    def add(self, event: dict) -> None:
        """Add event to buffer. Flush if the batch is full or too old."""
        self.buffer.append(event)
        age = (datetime.utcnow() - self.last_flush).total_seconds()
        if len(self.buffer) >= self.batch_size or age > self.max_wait_seconds:
            self._flush()

    def _flush(self) -> None:
        """Bulk insert all buffered events. Failed batches are dropped."""
        if not self.buffer:
            return
        # Take ownership of the batch first, so a failed write cannot leave
        # the buffer growing and retrying the same rows on every add().
        batch, self.buffer = self.buffer, []
        self.last_flush = datetime.utcnow()
        try:
            # get_db is a request-scoped FastAPI dependency, so the buffer
            # opens its own short-lived session instead. `engine` is assumed
            # to be the app's SQLAlchemy engine.
            with Session(engine) as db:
                # bulk_insert_mappings is significantly faster than an add() loop
                db.bulk_insert_mappings(AnalyticsEvent, batch)
                db.commit()
            logger.info(f"Flushed {len(batch)} analytics events")
        except Exception as e:
            # Leaving the `with` block already rolled back and closed the session.
            # Don't raise: analytics shouldn't break user features.
            logger.error(f"Analytics flush failed: {e}")


# Global buffer instance
event_buffer = EventBuffer(batch_size=500, max_wait_seconds=3)


@app.post("/api/ai-feature/{feature_name}")
async def call_ai_feature(
    feature_name: str,
    request: FeatureRequest,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Call an AI feature and log usage."""
    start_time = time.perf_counter()
    try:
        # Feature logic here
        result = await call_claude_api(request.prompt)
        latency_ms = int((time.perf_counter() - start_time) * 1000)

        # Log event to buffer (an in-memory append unless this event triggers a flush)
        event_buffer.add({
            "tenant_id": current_user.tenant_id,
            "feature_name": feature_name,
            "user_id": current_user.id,
            "tokens_used": result.usage.input_tokens + result.usage.output_tokens,
            "latency_ms": latency_ms,
            "status": "success",
            "timestamp": datetime.utcnow(),
        })
        return {"result": result.content}
    except Exception:
        # Record the failure, then let the original error propagate
        event_buffer.add({
            "tenant_id": current_user.tenant_id,
            "feature_name": feature_name,
            "user_id": current_user.id,
            "tokens_used": 0,
            "latency_ms": int((time.perf_counter() - start_time) * 1000),
            "status": "error",
            "timestamp": datetime.utcnow(),
        })
        raise
Why this works:
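- bulk_insert_mappings() hands all ~500 rows to the driver in one call and skips per-object ORM bookkeeping. On recent SQLAlchemy versions with psycopg2, that goes out as multi-row INSERT statements rather than 500 separate round-trips.
- Dual flush strategy: flush on batch size OR elapsed time. This keeps events from low-traffic tenants from sitting unflushed for hours, with one caveat: the time check only runs inside add(), so a completely idle buffer waits for the next event.
- Low-overhead logging: appending to the buffer is an in-memory operation, so most requests pay almost nothing. Only the request that triggers a flush runs the insert inline.
- No automatic retry: retrying after a rollback would require a queue (Celery/RabbitMQ). For non-critical analytics, I let failed batches drop.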
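A time check that only runs inside add() cannot fire while no events arrive. One way to close that gap is a periodic tick from a background thread. The sketch below is a standalone illustration under stated assumptions: TimedBuffer, tick, run_ticker, and the sink callable are hypothetical names, and in a real app the sink would be whatever performs the bulk insert.

```python
import logging
import threading
import time
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)


class TimedBuffer:
    """Thread-safe buffer that flushes on size (in add) or on age (in tick)."""

    def __init__(self, sink: Callable[[List[dict]], None],
                 batch_size: int = 500, max_wait_seconds: float = 3.0):
        self._sink = sink  # hypothetical: called with each batch to persist
        self._batch_size = batch_size
        self._max_wait = max_wait_seconds
        self._lock = threading.Lock()
        self._events: List[dict] = []
        self._last_flush = time.monotonic()

    def add(self, event: dict) -> None:
        with self._lock:
            self._events.append(event)
            full = len(self._events) >= self._batch_size
            batch = self._take_locked() if full else None
        if batch:
            self._write(batch)

    def tick(self, now: Optional[float] = None) -> None:
        """Flush pending events if the last flush is older than max_wait."""
        now = time.monotonic() if now is None else now
        with self._lock:
            due = bool(self._events) and (now - self._last_flush) >= self._max_wait
            batch = self._take_locked() if due else None
        if batch:
            self._write(batch)

    def _take_locked(self) -> List[dict]:
        # Caller must hold the lock. Swap the list out so writes happen unlocked.
        batch, self._events = self._events, []
        self._last_flush = time.monotonic()
        return batch

    def _write(self, batch: List[dict]) -> None:
        try:
            self._sink(batch)
        except Exception:
            # Same policy as the post: log and drop, never break the caller.
            logger.exception("Dropped %d analytics events", len(batch))


def run_ticker(buffer: TimedBuffer, interval_seconds: float,
               stop: threading.Event) -> None:
    """Background loop: call tick() every interval until stop is set."""
    while not stop.wait(interval_seconds):
        buffer.tick()


# Demo with a list standing in for the database.
written: List[List[dict]] = []
buf = TimedBuffer(sink=written.append, batch_size=3, max_wait_seconds=3.0)
buf.add({"n": 1})
buf.add({"n": 2})                      # below batch size, nothing written yet
buf.tick(now=time.monotonic() + 10)    # simulate ten quiet seconds
for n in (3, 4, 5):
    buf.add({"n": n})                  # third add fills the batch
```

In a running service, run_ticker would be started once in a daemon thread, for example threading.Thread(target=run_ticker, args=(buf, 1.0, stop), daemon=True).start(). Events still in memory at shutdown are lost unless the app calls a final tick or flush.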
Fast Aggregation with Partitioning
Dense analytics tables become slow even with indexes. I use PostgreSQL’s table partitioning to keep active data hot:
-- ✅ Partition by month
CREATE TABLE analytics_events (
    id BIGSERIAL,
    tenant_id VARCHAR(36) NOT NULL,
    feature_name VARCHAR(50) NOT NULL,
    user_id VARCHAR(36) NOT NULL,
    tokens_used INTEGER DEFAULT 0,
    latency_ms INTEGER DEFAULT 0,
    status VARCHAR(20) DEFAULT 'success',
    timestamp TIMESTAMP NOT NULL,
    -- The partition key must be part of the primary key
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

CREATE TABLE analytics_events_2024_01 PARTITION OF analytics_events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE analytics_events_2024_02 PARTITION OF analytics_events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Indexes on each partition
CREATE INDEX idx_2024_01_tenant_feature
    ON analytics_events_2024_01 (tenant_id, feature_name, timestamp);
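Monthly partitions have to exist before rows for that month arrive, so the DDL above is usually generated rather than typed by hand. A minimal sketch of such a generator follows; partition_ddl and next_month are hypothetical helper names, and the parent table name is assumed to be a trusted identifier because it is interpolated directly into the SQL.

```python
from datetime import date
from typing import List


def next_month(d: date) -> date:
    """Return the first day of the month after d."""
    if d.month == 12:
        return date(d.year + 1, 1, 1)
    return date(d.year, d.month + 1, 1)


def partition_ddl(parent: str, first: date, months: int) -> List[str]:
    """Build CREATE TABLE ... PARTITION OF statements for consecutive months."""
    statements = []
    start = first.replace(day=1)  # snap to the start of the month
    for _ in range(months):
        end = next_month(start)
        name = f"{parent}_{start:%Y_%m}"
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent}\n"
            f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
        start = end
    return statements


ddl = partition_ddl("analytics_events", date(2024, 12, 15), 2)
for statement in ddl:
    print(statement)
```

The upper bound of each range is exclusive, which is why one partition ends on the same date the next one starts. Running a job like this a month or two ahead avoids inserts failing because no partition matches the row's timestamp.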
Then query aggregations run only against relevant partitions:
# ✅ This scans only Jan 2024 partition + Feb 2024 partition
from datetime import datetime

from sqlalchemy import and_, func, select

# tenant_id and db come from the surrounding request handler
stmt = (
    select(
        AnalyticsEvent.feature_name,
        func.count(AnalyticsEvent.id).label("total_calls"),
        func.sum(AnalyticsEvent.tokens_used).label("total_tokens"),
        func.avg(AnalyticsEvent.latency_ms).label("avg_latency_ms"),
    )
    .where(
        and_(
            AnalyticsEvent.tenant_id == tenant_id,
            # Explicit bounds on the partition key let PostgreSQL prune partitions
            AnalyticsEvent.timestamp >= datetime(2024, 1, 1),
            AnalyticsEvent.timestamp < datetime(2024, 3, 1),
            AnalyticsEvent.status == "success",
        )
    )
    .group_by(AnalyticsEvent.feature_name)
)
result = db.execute(stmt).fetchall()
Partitioning makes this query run in <200