Skip to main content
All posts

API Design for AI: Streaming, Structured Output, and Rate Limits

21 June 2026

API Design for AI: Streaming, Structured Output, and Rate Limits

Most API design guides treat endpoints like they’re serving JSON from a database. But Claude and other LLMs break that assumption: responses take 5–30 seconds, tokens cost money, and users expect real-time feedback. I’ve shipped CitizenApp with nine AI features across three product tiers, and I’ve learned the hard way that standard REST patterns don’t cut it.

This post covers three patterns I now use by default: streaming responses, enforced schema validation, and token-aware rate limiting.

Why Standard API Design Fails for LLMs

Traditional API endpoints are fire-and-forget: you POST data, wait for 200 OK, get your JSON. That works when your database query finishes in 50ms.

Claude? 5–30 seconds is normal. Users see a blank screen and assume your app is broken. Your database connection pools exhaust. Your infra costs skyrocket because every request holds resources for minutes.

Also: tokens cost money. A user asking for a 2,000-token response isn’t equivalent to a user asking for 10 rows from your database. You can’t treat them as the same quota unit.

I learned this the expensive way: CitizenApp’s early version rate-limited by request count. One power user submitted a single prompt that generated 50,000 tokens of output. That destroyed my Anthropic bill for the month.

Pattern 1: Streaming as Default

Streaming isn’t optional for AI APIs—it’s the only way to provide real user experience at scale.

The FastAPI + SSE Approach

I use Server-Sent Events (SSE) for streaming. It’s simpler than WebSockets for one-way, text-based data, and the browser API is excellent.

from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import anthropic
import json
from datetime import datetime

app = FastAPI()
client = anthropic.Anthropic()

@app.post("/api/generate")
async def generate_text(prompt: str, background_tasks: BackgroundTasks):
    """Stream Claude response back to client via SSE."""
    
    async def stream_response():
        try:
            with client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                for text in stream.text_stream:
                    # SSE format: data: <json>\n\n
                    event = {
                        "type": "text_delta",
                        "text": text,
                        "timestamp": datetime.utcnow().isoformat()
                    }
                    yield f"data: {json.dumps(event)}\n\n"
                
                # Final event with usage stats
                final_event = {
                    "type": "done",
                    "usage": {
                        "input_tokens": stream.get_final_message().usage.input_tokens,
                        "output_tokens": stream.get_final_message().usage.output_tokens
                    }
                }
                yield f"data: {json.dumps(final_event)}\n\n"
        except Exception as e:
            error_event = {"type": "error", "message": str(e)}
            yield f"data: {json.dumps(error_event)}\n\n"
    
    return StreamingResponse(stream_response(), media_type="text/event-stream")

On the React/TypeScript side:

async function streamGeneration(prompt: string): Promise<void> {
  const eventSource = new EventSource(
    `/api/generate?prompt=${encodeURIComponent(prompt)}`
  );
  
  let totalTokens = 0;
  
  eventSource.addEventListener("message", (e) => {
    const event = JSON.parse(e.data);
    
    if (event.type === "text_delta") {
      // Append to UI in real-time
      setOutput((prev) => prev + event.text);
    } else if (event.type === "done") {
      totalTokens = event.usage.output_tokens;
      console.log(`Generated ${totalTokens} tokens`);
      eventSource.close();
    } else if (event.type === "error") {
      console.error(event.message);
      eventSource.close();
    }
  });
}

Why I prefer this: Users see text appearing in real-time. The response feels instant, even if Claude takes 15 seconds. No spinners. No anxiety. For CitizenApp’s document summarization feature, streaming reduced perceived latency by 60%.

Pattern 2: Structured Output with Schema Validation

Claude 3.5 supports tool_use for enforcing output schemas. I always use this for anything downstream—it’s cheaper and more reliable than prompt engineering.

from pydantic import BaseModel
from typing import Optional

class AnalysisResult(BaseModel):
    """Enforced schema for content analysis."""
    sentiment: str  # positive, neutral, negative
    confidence: float  # 0.0 to 1.0
    key_topics: list[str]
    summary: str
    requires_escalation: bool

def analyze_with_schema(text: str) -> AnalysisResult:
    """Use Claude with tool_use to enforce structured output."""
    
    tools = [
        {
            "name": "return_analysis",
            "description": "Return the analysis result",
            "input_schema": {
                "type": "object",
                "properties": {
                    "sentiment": {
                        "type": "string",
                        "enum": ["positive", "neutral", "negative"]
                    },
                    "confidence": {
                        "type": "number",
                        "minimum": 0,
                        "maximum": 1
                    },
                    "key_topics": {
                        "type": "array",
                        "items": {"type": "string"},
                        "max_items": 5
                    },
                    "summary": {"type": "string"},
                    "requires_escalation": {"type": "boolean"}
                },
                "required": ["sentiment", "confidence", "key_topics", "summary", "requires_escalation"]
            }
        }
    ]
    
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=512,
        tools=tools,
        messages=[
            {
                "role": "user",
                "content": f"Analyze this text:\n\n{text}"
            }
        ]
    )
    
    # Extract tool use block
    for block in response.content:
        if block.type == "tool_use":
            result_data = block.input
            return AnalysisResult(**result_data)
    
    raise ValueError("Claude didn't use the tool")

Why this matters: Structured output isn’t about prettiness—it’s about predictability. Your downstream code doesn’t need error handling for weird Claude responses. Your database schema is guaranteed to match. Testing is deterministic.

Pattern 3: Token-Aware Rate Limiting

Request-count limits are meaningless for AI APIs. One user might consume 100 tokens; another might consume 50,000 in a single request.

from redis import Redis
from datetime import datetime, timedelta

redis = Redis(host="localhost")

class TokenQuota:
    def __init__(self, user_id: str, tier: str):
        self.user_id = user_id
        self.tier = tier
        # Define quota by tier
        self.daily_limit = {
            "free": 10_000,
            "pro": 500_000,
            "enterprise": None  # unlimited
        }[tier]
    
    def get_remaining(self) -> int:
        """Check remaining tokens for today."""
        key = f"tokens:{self.user_id}:{datetime.utcnow().date()}"
        used = redis.get(key)
        return self.daily_limit - int(used or 0)
    
    def deduct(self, tokens: int) -> bool:
        """Deduct tokens, return True if allowed."""
        if self.tier == "enterprise":
            return True
        
        remaining = self.get_remaining()
        if tokens > remaining:
            return False
        
        key = f"tokens:{self.user_id}:{datetime.utcnow().date()}"
        redis.incrby(key, tokens)
        # Reset tomorrow
        redis.expire(key, 86400)
        return True

Apply this after the Claude call completes:

@app.post("/api/generate")
async def generate_text(user_id: str, prompt: str):
    # ... stream Claude response ...
    
    # After stream ends, check quota
    quota = TokenQuota(user_id, user.tier)
    output_tokens = stream.get_final_message().usage.output_tokens
    
    if not quota.deduct(output_tokens):
        # Log this and notify user
        log_quota_exceeded(user_id, output_tokens)
        return {"error": "Quota exceeded"}
    
    return {"success": True}

I charge by tokens used, not requests made. Users on the free tier get 10K tokens/month. That’s fair, transparent, and matches how Claude actually costs money.

Gotcha: Streaming Timeout & Retries

Here’s what bit me: streaming responses timeout after ~60 seconds on most cloud platforms (Vercel, Render, Cloudflare Pages). Claude sometimes takes longer than that for 2000+ token generations.

I now set max_tokens=1024 by default and let users request longer responses via pagination. Yes, it’s UX friction. But it’s better than timeouts.

Also: never retry a streaming response. The user already saw partial output. You’ll either duplicate tokens or confuse them. Log it and ask them to resubmit if it failed.


API design for AI is about accepting constraints, not fighting them. Streaming feels slower but isn’t. Structured output adds 10 lines of code and eliminates 90% of bugs. Token-based quotas are the only fair way to charge.

Build for

You might also like

Comments

All comments are moderated before appearing.

Loading comments…

Leave a comment

0/2000

Building something like this?

I build production-grade Python + React applications. Let's talk about your project.

Get in touch