Anthropic Batch API for Asynchronous Multi-Tenant AI Processing: Cutting Claude Costs by 50% Without Sacrificing Feature Responsiveness
I’m going to be blunt: if you’re running a SaaS that uses Claude, and you haven’t implemented the Batch API yet, you’re hemorrhaging money. Not metaphorically—literally 50% off the per-token cost for any request that doesn’t need a response in the next 60 seconds.
At CitizenApp, we process thousands of document summaries, compliance classifications, and policy analyses daily. Our first instinct was synchronous: user uploads document → Claude responds in real-time → dashboard updates. It felt responsive. It also felt like burning cash.
Then we realized: 80% of those requests don’t actually need synchronous responses. A user uploads a document for summarization? They’re fine waiting 5–30 minutes. A tenant wants to bulk-classify 500 policies? That’s explicitly an async job. The Batch API is built for this, and it’s absurdly underutilized.
Here’s how I wired it into CitizenApp—and why you should too.
Why Batch API Matters (Beyond the 50% Discount)
The 50% cost reduction is the headline, but the real win is architectural. Batching forces you to separate concerns:
- Synchronous requests (fast, expensive): Real-time chat, document Q&A, immediate feedback loops.
- Asynchronous requests (slow, cheap): Bulk operations, scheduled analyses, background enrichment.
This separation is healthy. You stop cramming everything through Claude’s sync API and actually think about what needs to be urgent.
In CitizenApp’s case, we reduced our monthly Claude bill from ~$8k to ~$4k while actually increasing feature coverage. The key was offloading 65% of our workload to batches.
Architecture: Queue → Batch → LISTEN/NOTIFY → React
Here’s the flow:
User Request
↓
FastAPI Endpoint (Validate, Enqueue)
↓
PostgreSQL Queue Table
↓
Batch Processor (reads queue, submits to Anthropic)
↓
Anthropic Batch Job (runs in background)
↓
Webhook/Polling Handler (gets results)
↓
PostgreSQL LISTEN/NOTIFY
↓
WebSocket → React 19 Dashboard (real-time update)
Let’s build it.
Step 1: Data Model
# models.py
from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum
Base = declarative_base()
class AIJobStatus(str, enum.Enum):
QUEUED = "queued"
SUBMITTED = "submitted"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
class AIJob(Base):
__tablename__ = "ai_jobs"
id = Column(String, primary_key=True)
tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
user_id = Column(String, nullable=False)
job_type = Column(String, nullable=False) # "summarize", "classify", etc.
status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)
input_data = Column(Text, nullable=False) # JSON stringified
result = Column(Text, nullable=True) # Result from Claude
batch_id = Column(String, nullable=True, index=True) # Anthropic batch ID
request_id = Column(String, nullable=True) # Within batch
created_at = Column(DateTime, default=datetime.utcnow, index=True)
completed_at = Column(DateTime, nullable=True)
error_message = Column(Text, nullable=True)
Step 2: Queue Endpoint
# api/ai.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus
router = APIRouter()
class SummarizeRequest(BaseModel):
document_text: str
max_length: int = 500
@router.post("/api/ai/summarize")
async def queue_summarize(
request: SummarizeRequest,
db: Session = Depends(get_db),
tenant_id: str = Depends(get_tenant_id),
user_id: str = Depends(get_user_id),
):
"""
Queue a document for summarization (async).
Returns immediately with job ID.
User receives result via WebSocket when batch completes.
"""
job_id = str(uuid.uuid4())
job = AIJob(
id=job_id,
tenant_id=tenant_id,
user_id=user_id,
job_type="summarize",
input_data=json.dumps({
"document_text": request.document_text,
"max_length": request.max_length,
}),
status=AIJobStatus.QUEUED,
)
db.add(job)
db.commit()
return {
"job_id": job_id,
"status": "queued",
"message": "Your request is queued. You'll receive results in 5-30 minutes.",
}
Step 3: Batch Processor (Background Task)
I prefer a separate worker process (via Celery or APScheduler) that runs periodically. Here’s APScheduler for simplicity:
# workers/batch_processor.py
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime
client = Anthropic()
def process_batch_jobs():
"""
Run every 5 minutes (via APScheduler).
Collects queued jobs and submits to Anthropic Batch API.
"""
db = SessionLocal()
# Get all queued jobs (batch size: 10k requests per batch max)
queued_jobs = db.query(AIJob).filter(
AIJob.status == AIJobStatus.QUEUED
).limit(100).all()
if not queued_jobs:
db.close()
return
# Build batch request
requests = []
job_map = {}
for job in queued_jobs:
input_data = json.loads(job.input_data)
if job.job_type == "summarize":
message = f"Summarize the following document in {input_data['max_length']} words:\n\n{input_data['document_text']}"
elif job.job_type == "classify":
message = f"Classify this text into one of: {input_data['categories']}\n\nText: {input_data['text']}"
else:
continue
request_id = job.id
job_map[request_id] = job.id
requests.append({
"custom_id": request_id,
"params": {
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"messages": [{"role": "user", "content": message}],
},
})
# Submit batch to Anthropic
batch = client.beta.messages.batches.create(
requests=requests,
betas=["batch-2024-09-24"],
)
# Mark all jobs as submitted
for job in queued_jobs:
job.status = AIJobStatus.SUBMITTED
job.batch_id = batch.id
db.commit()
print(f"Submitted batch {batch.id} with {len(requests)} requests")
db.close()
def poll_batch_results():
"""
Run every 30 seconds.
Checks submitted batches for completion, stores results.
"""
db = SessionLocal()
# Get all submitted jobs
submitted_jobs = db.query(AIJob).filter(
AIJob.status == AIJobStatus.SUBMITTED
).all()
batch_ids = set(job.batch_id for job in submitted_jobs)
for batch_id in batch_ids:
batch = client.beta.messages.batches.retrieve(batch_id, betas=["batch-2024-09-24"])
if batch.processing_status == "in_progress":
continue
if batch.processing_status == "expired":
# Mark jobs as failed
for job in submitted_jobs:
if job.batch_id == batch_id:
job.status = AIJobStatus.FAILED
job.error_message = "Batch expired"
db.commit()
continue
# Batch complete — fetch results
results = client.beta.messages.batches.results(batch_id, betas=["batch-2024-09-24"])
for result in results:
request_id = result.custom_id
job = db.query(AIJob).filter(AIJob.id == request_id).first()
if not job:
continue
if result.result.type == "succeeded":
job.status = AIJobStatus.COMPLETED
job.result = result.result.message.content[0].text
elif result.result.type == "errored":
job.status = AIJobStatus.FAILED
job.error_message = result.result.error.message
job.completed_at = datetime.utcnow()
db.commit()
# Notify connected clients (next step)
notify_clients_batch_complete(batch_id)
db.close()