PostgreSQL LISTEN/NOTIFY for Real-Time Multi-Tenant Events

1 June 2026

PostgreSQL LISTEN/NOTIFY for Real-Time Multi-Tenant Events: Ditching Polling and WebSocket Complexity

I’ve shipped real-time features in CitizenApp using three different approaches: naive polling (embarrassing), Redis pub/sub (overkill), and now PostgreSQL’s native LISTEN/NOTIFY. The third option is what I should have started with.

Most teams reach for Redis or RabbitMQ the moment they need real-time updates. It’s the conventional wisdom. But here’s the truth: if you’re already running PostgreSQL, you have a battle-tested pub/sub system sitting right there. It handles multi-tenancy correctly, scales to thousands of concurrent connections, and eliminates an entire infrastructure dependency—which matters when you’re deploying to Render or Vercel where every added service is friction.

Why LISTEN/NOTIFY beats the alternatives

Polling is dead. HTTP requests every 2-5 seconds for “new notifications”? That’s technical debt masquerading as simplicity. It wastes bandwidth, kills your database with unnecessary queries, and users see stale data.

Redis is powerful but expensive. Not just in dollars—in operational overhead. You need to manage connection pools, handle failover, monitor memory usage, and keep another service running in production. At CitizenApp’s scale (thousands of concurrent tenants), we were paying $50/month for Redis on top of Render just to broadcast notifications that PostgreSQL could handle natively.

WebSockets without a broker are a nightmare. If you’re running multiple FastAPI workers (and you should be), a WebSocket connection to Worker A doesn’t know about events published by Worker B. You need a message broker to fan-out events across processes. Unless you use PostgreSQL LISTEN/NOTIFY, which handles that automatically.

PostgreSQL’s pub/sub is:

The tradeoff? LISTEN/NOTIFY doesn’t persist messages. If no one’s listening, the event vanishes. But for real-time notifications, analytics updates, and AI job completions (exactly what CitizenApp needs), that’s fine—you want the latest state, not a backlog.

The architecture

Here’s what I’m building:

  1. FastAPI WebSocket server that maintains connections from React clients.
  2. Database layer that listens for PostgreSQL notifications on tenant-specific channels.
  3. React client that subscribes to real-time updates without polling.
  4. Horizontal scaling across multiple workers without needing Redis.
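
Item 2 mentions tenant-specific channels, while the code in this post sends everything through one shared `notifications` channel and routes on the `tenant_id` in the payload. If you did want one channel per tenant, the channel name has to be a valid PostgreSQL identifier (at most 63 bytes), and `LISTEN` does not accept it as a bind parameter. A minimal sketch, assuming a hypothetical `tenant_channel` helper that hashes the tenant ID into a name that is safe to inline:

```python
import hashlib
import re

MAX_IDENTIFIER_BYTES = 63  # PostgreSQL truncates longer identifiers (NAMEDATALEN - 1)


def tenant_channel(tenant_id: str, prefix: str = "tenant_") -> str:
    """Map an arbitrary tenant ID to a channel name that is safe to inline in LISTEN.

    Hypothetical helper, not part of the code in this post. Hashing keeps the
    name short, lowercase, and free of characters that would need quoting.
    """
    digest = hashlib.sha256(tenant_id.encode("utf-8")).hexdigest()[:32]
    channel = f"{prefix}{digest}"
    assert len(channel) <= MAX_IDENTIFIER_BYTES
    assert re.fullmatch(r"[a-z_][a-z0-9_]*", channel)
    return channel


def listen_statement(tenant_id: str) -> str:
    # Safe to format directly: the channel contains only [a-z0-9_]
    return f"LISTEN {tenant_channel(tenant_id)}"
```

On the publishing side, `pg_notify` takes the channel as an ordinary text argument, so the same name can be passed as a bind parameter there. The shared-channel approach used below is simpler and is usually enough until a single worker only cares about a small subset of tenants.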

The FastAPI side

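# backend/main.py
import asyncio
import json
import os
from contextlib import asynccontextmanager

import psycopg
from fastapi import Depends, FastAPI, WebSocket
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

# Plain libpq-style URL (postgresql://user:pass@host/db), read from the environment
DATABASE_URL = os.environ["DATABASE_URL"]

# SQLAlchemy needs an explicit async driver in the URL; here it reuses psycopg 3
engine = create_async_engine(
    DATABASE_URL.replace("postgresql://", "postgresql+psycopg://", 1)
)

# Global store: tenant_id -> set of WebSocket connections
tenant_connections: dict[str, set[WebSocket]] = {}

async def get_db():
    # One session per request, drawn from the engine's connection pool
    async with AsyncSession(engine) as session:
        yield session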

# FastAPI lifespan: start listener in background
@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(listen_for_notifications())
    yield
    task.cancel()

app = FastAPI(lifespan=lifespan)

async def listen_for_notifications():
    """
    Connect to PostgreSQL directly and listen for NOTIFY events.
    This runs in the background for the entire app lifetime.
    """
    # autocommit=True matters: LISTEN only takes effect at commit, and without it
    # psycopg would leave the statement inside a transaction that never commits
    conn = await psycopg.AsyncConnection.connect(DATABASE_URL, autocommit=True)

    # One shared channel for every tenant (LISTEN has no wildcard or pattern form);
    # the tenant_id travels in the payload and is routed below
    await conn.execute("LISTEN notifications")
    
    try:
        async for notify in conn.notifies():
            payload = json.loads(notify.payload)
            tenant_id = payload.get("tenant_id")
            
            if tenant_id in tenant_connections:
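                # Broadcast to all WebSockets for this tenant.
                # Iterate over a snapshot: clients can connect or disconnect
                # while we await each send, which would mutate the live set.
                disconnected = set()
                for websocket in list(tenant_connections[tenant_id]):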
                    try:
                        await websocket.send_json({
                            "type": payload.get("type"),
                            "data": payload.get("data"),
                        })
                    except Exception:
                        disconnected.add(websocket)
                
                # Clean up dead connections
                tenant_connections[tenant_id] -= disconnected
                if not tenant_connections[tenant_id]:
                    del tenant_connections[tenant_id]
    finally:
        await conn.close()

@app.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket):
    await websocket.accept()
    
    # NOTE: a raw query param lets any client subscribe to any tenant's events.
    # In production, derive tenant_id from a verified JWT instead.
    tenant_id = websocket.query_params.get("tenant_id")
    
    if not tenant_id:
        await websocket.close(code=1008, reason="Missing tenant_id")
        return
    
    # Register this connection
    if tenant_id not in tenant_connections:
        tenant_connections[tenant_id] = set()
    tenant_connections[tenant_id].add(websocket)
    
    try:
        # Keep connection open, listen for client disconnects
        while True:
            await websocket.receive_text()
    except Exception:
        pass
    finally:
        # The listener may already have deleted this tenant's entry, so avoid a KeyError
        tenant_connections.get(tenant_id, set()).discard(websocket)

@app.post("/api/notifications")
async def send_notification(
    tenant_id: str,
    message: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Example endpoint that triggers a notification.
    The NOTIFY happens inside the transaction.
    """
    # Create the notification record
    await db.execute(
        text("""
            INSERT INTO notifications (tenant_id, message, created_at)
            VALUES (:tenant_id, :message, NOW())
        """),
        {"tenant_id": tenant_id, "message": message},
    )
    
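    # Queue the PostgreSQL notification. It is delivered only when the transaction
    # commits, so listeners never see events for an insert that was rolled back.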
    payload = json.dumps({
        "tenant_id": tenant_id,
        "type": "notification",
        "data": {"message": message},
    })
    await db.execute(
        text("SELECT pg_notify('notifications', :payload)"),
        {"payload": payload},
    )
    
    await db.commit()
    
    return {"ok": True}
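
The routing inside `listen_for_notifications` (find the tenant's sockets, send to each, drop the dead ones) can be pulled into a small class, which makes it testable without a database or a real WebSocket. This is a sketch rather than the code CitizenApp runs; `TenantRegistry` and `JsonSocket` are names assumed here for illustration, and the only thing required of a socket is an async `send_json` method like the one on FastAPI's `WebSocket`.

```python
from collections import defaultdict
from typing import Any, Protocol


class JsonSocket(Protocol):
    """Anything with an async send_json, e.g. fastapi.WebSocket."""

    async def send_json(self, data: Any) -> None: ...


class TenantRegistry:
    """Tracks open sockets per tenant and fans a message out to them."""

    def __init__(self) -> None:
        self._sockets: dict[str, set[JsonSocket]] = defaultdict(set)

    def add(self, tenant_id: str, socket: JsonSocket) -> None:
        self._sockets[tenant_id].add(socket)

    def remove(self, tenant_id: str, socket: JsonSocket) -> None:
        sockets = self._sockets.get(tenant_id)
        if sockets is None:
            return  # already cleaned up elsewhere
        sockets.discard(socket)
        if not sockets:
            del self._sockets[tenant_id]

    async def broadcast(self, tenant_id: str, message: dict) -> int:
        """Send to every socket of one tenant; returns how many sends succeeded."""
        delivered = 0
        # Snapshot the set: sockets may be added or removed while we await
        for socket in list(self._sockets.get(tenant_id, ())):
            try:
                await socket.send_json(message)
                delivered += 1
            except Exception:
                # Treat any send failure as a dead connection
                self.remove(tenant_id, socket)
        return delivered
```

With this in place, the WebSocket endpoint would call `add` after `accept()` and `remove` in its `finally` block, and the listener loop would reduce to one `await registry.broadcast(tenant_id, message)` per notification.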

The React side

// src/hooks/useNotifications.ts
import { useEffect, useRef } from "react";

export function useNotifications(tenantId: string, onMessage: (data: any) => void) {
  // Keep the latest callback in a ref so an inline handler
  // doesn't tear down and reopen the socket on every render
  const onMessageRef = useRef(onMessage);

  useEffect(() => {
    onMessageRef.current = onMessage;
  }, [onMessage]);

  useEffect(() => {
    const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
    const wsUrl = `${protocol}//${window.location.host}/ws/notifications?tenant_id=${encodeURIComponent(tenantId)}`;

    const ws = new WebSocket(wsUrl);

    ws.onopen = () => {
      console.log("✓ Real-time connection established");
    };

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      onMessageRef.current(message);
    };

    ws.onerror = () => {
      console.error("WebSocket error");
    };

    return () => {
      ws.close();
    };
  }, [tenantId]); // reconnect only when the tenant changes
}

// Usage in a component (also needs: import { useState } from "react")
export function NotificationCenter({ tenantId }: { tenantId: string }) {
  const [notifications, setNotifications] = useState<any[]>([]);

  useNotifications(tenantId, (message) => {
    setNotifications((prev) => [message, ...prev]);
  });

  return (
    <div className="space-y-2">
      {notifications.map((notif, i) => (
        <div key={i} className="p-3 bg-blue-50 rounded border-l-4 border-blue-400">
          {notif.data.message}
        </div>
      ))}
    </div>
  );
}

Broadcasting AI job completions

This is where LISTEN/NOTIFY shines in CitizenApp. When an AI feature finishes processing, we immediately notify the user without polling:

# backend/celery_task.py (or similar background job)
import json

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def process_ai_feature(tenant_id: str, job_id: str, db: AsyncSession):
    # ... do AI work with Claude ...
    result = await call_claude_api(...)
    
    # Save result
    await db.execute(
        text("""
            UPDATE ai_jobs 
            SET result = :result, status = 'completed'
            WHERE id = :job_id AND tenant_id = :tenant_id
        """),
        {"result": json.dumps(result), "job_id": job_id, "tenant_id": tenant_id},
    )
    
    # Notify the tenant in real-time
    payload = json.dumps({
        "tenant_id": tenant_id,
        "type": "ai_job_complete",
        "data": {"job_id": job_id, "result": result},
    })
    await db.execute(
        text("SELECT pg_notify('notifications', :payload)"),
        {"payload": payload},
    )
    
    await db.commit()
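
One thing to watch with payloads like this: in its default configuration PostgreSQL rejects NOTIFY payloads of 8000 bytes or more, and an AI result can easily be larger than that. A minimal sketch of a guard, assuming a hypothetical `build_notify_payload` helper and an assumed `truncated` flag that tells the client to fetch the full record through the regular API:

```python
import json

MAX_NOTIFY_BYTES = 7999  # payloads must be shorter than 8000 bytes by default


def build_notify_payload(tenant_id: str, event_type: str, data: dict) -> str:
    """Serialize an event, falling back to an ID-only payload when it is too large.

    Hypothetical helper; the "truncated" flag is an assumed convention,
    not something PostgreSQL or the code in this post defines.
    """
    payload = json.dumps({"tenant_id": tenant_id, "type": event_type, "data": data})
    if len(payload.encode("utf-8")) <= MAX_NOTIFY_BYTES:
        return payload

    # Keep only small identifier fields so the client knows what to refetch
    slim = {k: v for k, v in data.items() if k == "id" or k.endswith("_id")}
    payload = json.dumps(
        {"tenant_id": tenant_id, "type": event_type, "data": slim, "truncated": True}
    )
    if len(payload.encode("utf-8")) > MAX_NOTIFY_BYTES:
        raise ValueError("notification payload too large even without the body")
    return payload
```

Small results still arrive inline, and large ones degrade to "job `job_id` is done, go fetch it", which matches the latest-state mindset described earlier.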

Gotcha: Connection pooling and LISTEN

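Here’s where I burned myself: a connection that has run LISTEN doesn’t belong in a connection pool. The subscription is tied to that one session, so once the pool hands the connection to other requests, nothing is waiting on it for notifications, and the LISTEN stays registered until someone runs UNLISTEN or the connection closes.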

Solution: Run LISTEN/NOTIFY on a separate database connection, not the one your FastAPI queries use:

# Wrong: connection pool conflict
async with engine.begin() as conn:
    await conn.execute(text("LISTEN notifications"))
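
The working counterpart is a connection opened directly with psycopg, as in `listen_for_notifications` above, that never passes through the SQLAlchemy pool. The sketch below adds one thing the earlier listener lacks: reconnecting after the connection drops. `listen_forever`, `connect_listener`, and `retry_delay` are names assumed for illustration, and the `connect` parameter exists only so the loop can be exercised with a fake connection.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def connect_listener(dsn: str):
    """Open a dedicated autocommit connection, outside any pool."""
    import psycopg  # psycopg 3; imported lazily so the loop is testable without it

    return await psycopg.AsyncConnection.connect(dsn, autocommit=True)


async def listen_forever(
    dsn: str,
    handle: Callable[[dict], Awaitable[None]],
    channel: str = "notifications",
    connect: Callable[[str], Awaitable[Any]] = connect_listener,
    retry_delay: float = 1.0,
) -> None:
    """Hold one dedicated connection for LISTEN and reconnect if it drops."""
    while True:
        try:
            conn = await connect(dsn)
            try:
                # channel is a trusted constant here, never user input
                await conn.execute(f"LISTEN {channel}")
                async for notify in conn.notifies():
                    await handle(json.loads(notify.payload))
            finally:
                await conn.close()
        except asyncio.CancelledError:
            raise  # app shutdown: stop retrying
        except Exception:
            # Connection lost or the handler failed; anything sent while
            # we were disconnected is gone, since NOTIFY is not persisted
            await asyncio.sleep(retry_delay)
```

Because events published during the gap are lost, clients that care about completeness would still need to refetch current state after a reconnect.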