Build an LLM-powered REST API with FastAPI
Table of Contents
Most LLM wrappers people ship are glorified requests.post() calls with no error handling, no rate limiting, and logs full of user prompts. This post walks you through building a proper production-grade FastAPI service that wraps Claude or OpenAI — with Pydantic v2 models, async SSE streaming, per-key rate limiting, exponential backoff retries, safe logging, and OpenTelemetry tracing you can actually use to debug latency.
If you’re new to working with Claude’s API directly, start with Getting Started with Claude API in Python before continuing here.
Project setup and dependencies
Create a virtual environment and install everything you need:
python -m venv .venv && source .venv/bin/activate
pip install fastapi==0.115.12 uvicorn[standard]==0.34.2 gunicorn==23.0.0 \
anthropic==0.52.0 openai==1.82.0 pydantic==2.11.4 \
slowapi==0.1.9 redis==5.2.1 httpx==0.28.1 \
opentelemetry-sdk==1.33.0 opentelemetry-instrumentation-fastapi==0.54b0 \
opentelemetry-exporter-otlp==1.33.0 tenacity==9.1.2
Your project structure:
llm_api/
├── main.py
├── config.py
├── middleware.py
├── routers/
│ └── chat.py
├── services/
│ └── llm.py
├── utils/
│ ├── logging.py
│ └── tracing.py
└── requirements.txt
Pydantic v2 models and async request handlers
Pydantic v2 uses model_config instead of the inner class Config pattern from v1. Define your request and response models cleanly:
# routers/chat.py
from pydantic import BaseModel, Field, model_validator
from typing import Literal, Optional
class Message(BaseModel):
role: Literal["user", "assistant", "system"]
content: str = Field(..., min_length=1, max_length=32_000)
class ChatRequest(BaseModel):
model_config = {"str_strip_whitespace": True}
messages: list[Message] = Field(..., min_length=1)
model: str = Field(default="claude-opus-4-5")
max_tokens: int = Field(default=1024, ge=1, le=8192)
temperature: float = Field(default=0.7, ge=0.0, le=1.0)
stream: bool = False
@model_validator(mode="after")
def check_last_message_is_user(self) -> "ChatRequest":
if self.messages[-1].role != "user":
raise ValueError("Last message must have role='user'")
return self
class ChatResponse(BaseModel):
id: str
content: str
model: str
input_tokens: int
output_tokens: int
Your async route handler:
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from .chat import ChatRequest, ChatResponse
from services.llm import complete, stream_complete
from middleware import verify_api_key, limiter
router = APIRouter(prefix="/v1")
@router.post("/chat", response_model=ChatResponse)
@limiter.limit("20/minute")
async def chat(request: Request, body: ChatRequest, _=Depends(verify_api_key)):
if body.stream:
return StreamingResponse(
stream_complete(body),
media_type="text/event-stream",
headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
)
return await complete(body)
The X-Accel-Buffering: no header tells Nginx not to buffer the response — without it, your SSE stream will appear to hang until the buffer fills.
Token-by-token SSE streaming with StreamingResponse
Here is the LLM service layer with streaming for both Claude and OpenAI, plus retries via tenacity:
# services/llm.py
import asyncio, uuid, json, logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from anthropic import AsyncAnthropic, APIStatusError, APITimeoutError
from openai import AsyncOpenAI
from utils.logging import scrub_prompt
from utils.tracing import tracer
logger = logging.getLogger(__name__)
anthropic_client = AsyncAnthropic(timeout=30.0)
openai_client = AsyncOpenAI(timeout=30.0)
RETRYABLE_STATUS = {429, 529, 503}
def _is_retryable(exc: Exception) -> bool:
if isinstance(exc, APIStatusError):
return exc.status_code in RETRYABLE_STATUS
return isinstance(exc, (APITimeoutError, asyncio.TimeoutError))
@retry(
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1, min=1, max=16),
retry=retry_if_exception_type(Exception),
reraise=True,
)
async def complete(body):
with tracer.start_as_current_span("llm.complete") as span:
span.set_attribute("llm.model", body.model)
span.set_attribute("llm.max_tokens", body.max_tokens)
try:
if body.model.startswith("claude"):
resp = await anthropic_client.messages.create(
model=body.model,
max_tokens=body.max_tokens,
temperature=body.temperature,
messages=[m.model_dump() for m in body.messages],
)
return {
"id": resp.id,
"content": resp.content[0].text,
"model": resp.model,
"input_tokens": resp.usage.input_tokens,
"output_tokens": resp.usage.output_tokens,
}
else:
resp = await openai_client.chat.completions.create(
model=body.model,
max_tokens=body.max_tokens,
temperature=body.temperature,
messages=[m.model_dump() for m in body.messages],
)
return {
"id": resp.id,
"content": resp.choices[0].message.content,
"model": resp.model,
"input_tokens": resp.usage.prompt_tokens,
"output_tokens": resp.usage.completion_tokens,
}
except Exception as exc:
if not _is_retryable(exc):
raise
logger.warning("LLM upstream error, retrying: %s", type(exc).__name__)
raise
async def stream_complete(body):
"""Yields SSE-formatted chunks for token-by-token streaming."""
with tracer.start_as_current_span("llm.stream"):
try:
if body.model.startswith("claude"):
async with anthropic_client.messages.stream(
model=body.model,
max_tokens=body.max_tokens,
temperature=body.temperature,
messages=[m.model_dump() for m in body.messages],
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'delta': text})}\n\n"
else:
stream = await openai_client.chat.completions.create(
model=body.model,
max_tokens=body.max_tokens,
temperature=body.temperature,
messages=[m.model_dump() for m in body.messages],
stream=True,
)
async for chunk in stream:
delta = chunk.choices[0].delta.content or ""
if delta:
yield f"data: {json.dumps({'delta': delta})}\n\n"
yield "data: [DONE]\n\n"
except Exception as exc:
error_payload = json.dumps({"error": type(exc).__name__, "detail": str(exc)})
yield f"data: {error_payload}\n\n"
API key middleware and per-key rate limiting
Use slowapi for rate limiting (it wraps limits and integrates cleanly with FastAPI) and a simple bearer token check:
# middleware.py
import os
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from slowapi import Limiter
from slowapi.util import get_remote_address
VALID_API_KEYS = set(os.getenv("API_KEYS", "").split(","))
bearer = HTTPBearer()
limiter = Limiter(key_func=get_remote_address) # swap for key-based func if needed
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(bearer)):
if credentials.credentials not in VALID_API_KEYS:
raise HTTPException(status_code=401, detail="Invalid API key")
For per-key rate limiting with Redis (better for multi-worker deployments), replace key_func with:
from slowapi import Limiter
def get_api_key(request):
auth = request.headers.get("Authorization", "")
return auth.replace("Bearer ", "").strip() or get_remote_address(request)
limiter = Limiter(
key_func=get_api_key,
storage_uri="redis://localhost:6379",
)
This way each API key gets its own rate limit bucket shared across all Gunicorn workers.
Safe logging, structured error handling, and OpenTelemetry tracing
Never log raw prompt content — scrub it before it hits your log pipeline:
# utils/logging.py
import re
_CONTENT_RE = re.compile(r'"content"\s*:\s*"[^"]{20,}"')
def scrub_prompt(data: dict) -> dict:
"""Returns a shallow copy with message content replaced by length hint."""
messages = data.get("messages", [])
scrubbed = []
for m in messages:
scrubbed.append({**m, "content": f"<scrubbed len={len(m.get('content',''))}>"})
return {**data, "messages": scrubbed}
Wire OpenTelemetry into your app startup:
# utils/tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
def instrument_app(app):
FastAPIInstrumentor.instrument_app(app)
In main.py:
from fastapi import FastAPI
from slowapi.errors import RateLimitExceeded
from slowapi import _rate_limit_exceeded_handler
from routers.chat import router
from middleware import limiter
from utils.tracing import instrument_app
app = FastAPI(title="LLM API", version="1.0.0")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.include_router(router)
instrument_app(app)
Deploy with Uvicorn and Gunicorn
For production, Gunicorn manages worker processes while Uvicorn handles async I/O within each worker. A sensible formula for LLM workloads (which are I/O-bound, not CPU-bound) is (2 × CPU cores) + 1 workers:
gunicorn main:app \
--workers 5 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120 \
--graceful-timeout 30 \
--keep-alive 5 \
--access-logfile - \
--error-logfile -
Set --timeout 120 to handle slow LLM responses without Gunicorn killing the worker mid-stream. For containerized deployments, pin your worker count via an environment variable rather than auto-detecting CPU cores — cloud containers often report the host’s core count, not the container’s allocated vCPUs.
If you’re familiar with building REST APIs in other stacks, the patterns here map closely to what’s covered in Building a RESTful API with PHP and Laravel — the middleware and error handling philosophy is the same, just async.
Key takeaways
- Pydantic v2 + async handlers give you fast validation and non-blocking I/O — always set
timeouton your upstream LLM clients, not just at the Gunicorn level. - Retry with exponential backoff on 429/529/503 from upstream LLMs, but fail fast on 400/401/422 — those are your bugs, not transient errors.
- Scrub prompts before logging, rate-limit per API key using Redis-backed
slowapiso limits survive worker restarts, and add OpenTelemetry spans around every LLM call so you can see tail latency in production rather than guessing.
For the next step — giving your API tool-use capabilities so the LLM can call external services — check out Claude Tool Use: A Practical Guide to Function Calling.