====== How to Deploy an Agent ======
A practical guide to deploying AI agents to production. Covers Docker, serverless, and platform deployments, plus the essential production concerns: health checks, monitoring, error handling, rate limiting, and cost control.
===== Deployment Architecture Overview =====
A typical production topology (Mermaid notation):

<code>
graph TB
    subgraph "Client Layer"
        A[Web App] --> LB[Load Balancer]
        B[Mobile App] --> LB
        C[API Consumer] --> LB
    end
    subgraph "Application Layer"
        LB --> D[FastAPI Agent Service]
        D --> E[Rate Limiter]
        E --> F[Agent Logic]
    end
    subgraph "External Services"
        F --> G[LLM API]
        F --> H[Vector DB]
        F --> I[Tool APIs]
    end
    subgraph "Infrastructure"
        D --> J[Redis Cache]
        D --> K[Prometheus Metrics]
        K --> L[Grafana Dashboard]
    end
</code>
===== The Base: FastAPI Agent Service =====
All deployment methods share a common FastAPI application. Build this first, then deploy anywhere.
==== Project Structure ====
<code>
agent-service/
├── main.py             # FastAPI app
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── fly.toml            # (optional) Fly.io config
</code>
==== main.py ====
<code python>
import os

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

app = FastAPI(title="Agent Service")
client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

# --- Models ---
class AgentRequest(BaseModel):
    prompt: str
    max_tokens: int = 1000
    user_id: str = "anonymous"

class AgentResponse(BaseModel):
    result: str
    tokens_used: int

# --- Retry logic for LLM calls ---
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def call_llm(prompt: str, max_tokens: int) -> dict:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
    )
    return {
        "content": response.choices[0].message.content,
        "tokens": response.usage.total_tokens,
    }

# --- Health checks ---
@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/ready")
async def ready():
    try:
        # Verify the LLM API is reachable
        await client.models.list()
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Not ready: {e}")

# --- Main endpoint ---
@app.post("/invoke", response_model=AgentResponse)
async def invoke_agent(request: AgentRequest):
    try:
        result = await call_llm(request.prompt, request.max_tokens)
        return AgentResponse(
            result=result["content"],
            tokens_used=result["tokens"],
        )
    except Exception:
        # Generic message: never leak internals to the caller
        raise HTTPException(status_code=500, detail="Agent error")
</code>
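The tenacity decorator above retries up to three times with capped exponential backoff between attempts. To make the policy concrete, here is a plain-Python sketch of roughly what it does (the wait formula is an approximation of ''wait_exponential'', not tenacity's exact internals):

```python
import time

def retry_with_backoff(fn, attempts=3, multiplier=1, min_wait=2, max_wait=10,
                       sleep=time.sleep):
    """Call fn(), retrying on exception with capped exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: propagate the last error
            # roughly multiplier * 2**(attempt-1), clamped to [min_wait, max_wait]
            wait = min(max(multiplier * 2 ** (attempt - 1), min_wait), max_wait)
            sleep(wait)
```

The ''sleep'' parameter is injectable so the policy can be unit-tested without real delays.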
==== requirements.txt ====
<code>
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
openai>=1.50.0
pydantic>=2.9.0
tenacity>=8.2.0
slowapi>=0.1.9
prometheus-fastapi-instrumentator>=6.1.0
redis>=5.0.0
</code>
===== Deployment Option 1: Docker + Docker Compose =====
==== Dockerfile ====
<code dockerfile>
FROM python:3.12-slim

WORKDIR /app

# curl is needed by the compose healthcheck (slim images don't include it)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
</code>
==== docker-compose.yml ====
<code yaml>
services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

volumes:
  redis_data:
</code>
<code bash>
# Build and run
docker compose up --build -d

# Test
curl http://localhost:8000/health
curl -X POST http://localhost:8000/invoke \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, agent!", "user_id": "test"}'
</code>
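The same ''/invoke'' call can be made from Python with only the standard library. A small sketch (the URL assumes the local compose stack above; sending is guarded so the request can be built and inspected without a running server):

```python
import json
import urllib.request

def build_invoke_request(prompt: str, user_id: str = "test",
                         url: str = "http://localhost:8000/invoke") -> urllib.request.Request:
    """Build the POST request that mirrors the curl example above."""
    body = json.dumps({"prompt": prompt, "user_id": user_id}).encode()
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

if __name__ == "__main__":
    # Requires the compose stack to be running locally
    with urllib.request.urlopen(build_invoke_request("Hello, agent!")) as resp:
        print(json.loads(resp.read()))
```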
===== Deployment Option 2: Fly.io =====
Fly.io deploys containers globally with edge compute and auto-scaling.
==== fly.toml ====
<code toml>
app = "my-agent-service"
primary_region = "iad"

[build]
  dockerfile = "Dockerfile"

[http_service]
  internal_port = 8000
  force_https = true
  auto_stop_machines = true
  auto_start_machines = true
  min_machines_running = 1

  [[http_service.checks]]
    interval = "30s"
    timeout = "5s"
    method = "GET"
    path = "/health"

[env]
  # Non-secret env vars here

[[vm]]
  size = "shared-cpu-1x"
  memory = "512mb"
</code>
<code bash>
# Install flyctl, login, then:
fly launch --no-deploy
fly secrets set OPENAI_API_KEY="your-key"
fly deploy

# Scale
fly scale count 3
fly scale vm shared-cpu-2x --memory 1024
</code>
===== Deployment Option 3: Modal (Serverless + GPU) =====
Modal is ideal for AI workloads — scales to zero, supports GPUs, Python-native.
<code python>
# modal_app.py
import modal

app = modal.App("agent-service")

# The image needs everything main.py imports, including tenacity
image = modal.Image.debian_slim().pip_install(
    "fastapi", "uvicorn", "openai", "pydantic", "tenacity"
)

@app.function(
    image=image,
    secrets=[modal.Secret.from_name("openai-secret")],
    container_idle_timeout=300,
    allow_concurrent_inputs=10,
)
@modal.asgi_app()
def serve():
    from main import app as fastapi_app
    return fastapi_app
</code>
<code bash>
# Deploy
modal deploy modal_app.py

# Logs
modal app logs agent-service
</code>
===== Deployment Option 4: AWS Lambda =====
Cost-effective for low-traffic agents. Use Mangum to adapt FastAPI to Lambda.
<code python>
# lambda_handler.py
from mangum import Mangum
from main import app

handler = Mangum(app)
</code>

<code bash>
pip install mangum
# Deploy via SAM, CDK, or Serverless Framework
# Set OPENAI_API_KEY in Lambda environment variables
</code>
**Lambda limits to be aware of:** 15 min timeout, 10 GB RAM max, cold starts of 1-3s.
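Cold-start time is mostly module import and client construction. A common mitigation is to construct expensive clients lazily and cache them for the container's lifetime, so only the first invocation pays the cost. A sketch of the pattern (''DummyClient'' is a hypothetical stand-in for ''AsyncOpenAI'' to keep the example self-contained):

```python
import os

class DummyClient:
    """Stand-in for an expensive-to-construct SDK client (e.g. AsyncOpenAI)."""
    def __init__(self, api_key: str):
        self.api_key = api_key

_client = None

def get_client() -> DummyClient:
    """Create the client on first use; reuse it for the container's lifetime."""
    global _client
    if _client is None:
        _client = DummyClient(api_key=os.environ.get("OPENAI_API_KEY", ""))
    return _client
```

Handlers then call ''get_client()'' instead of touching a module-level client built at import time.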
===== Deployment Option 5: Railway =====
Railway auto-deploys from GitHub with built-in databases.
<code>
# Procfile
web: uvicorn main:app --host 0.0.0.0 --port $PORT
</code>
- Connect your GitHub repo to Railway
- Add environment variables (OPENAI_API_KEY)
- Railway auto-detects Dockerfile or Procfile and deploys
- Get a ''*.railway.app'' URL instantly
===== Deployment Comparison =====
^ Platform ^ Scaling ^ Cold Start ^ GPU Support ^ Cost Model ^ Best For ^
| Docker (self-hosted) | Manual | None | Yes (with nvidia-docker) | Fixed server cost | Full control, on-prem |
| Fly.io | Auto | ~1s | No | Pay per VM-second | Low-latency global APIs |
| Modal | Auto (to zero) | ~2s | Yes (A100, H100) | Pay per second | GPU workloads, bursty traffic |
| AWS Lambda | Auto (to zero) | 1-3s | No | Pay per invocation | Low-traffic, event-driven |
| Railway | Auto | ~2s | No | Usage-based | Quick prototypes from GitHub |
===== Production Essentials =====
==== Monitoring with Prometheus ====
<code python>
# Add to main.py
from prometheus_fastapi_instrumentator import Instrumentator

instrumentator = Instrumentator(
    should_group_status_codes=True,
    should_respect_env_var=False,
)
instrumentator.instrument(app).expose(app, endpoint="/metrics")
</code>
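The instrumentator covers request-level metrics (latency, status codes); token usage needs a custom counter. The aggregation itself is just a labeled, monotonically increasing counter. A minimal sketch, using an in-process dict as a stand-in for a Prometheus ''Counter'' with a ''model'' label (the metric name ''agent_tokens_total'' is illustrative):

```python
from collections import defaultdict

_tokens: dict = defaultdict(int)

def record_tokens(model: str, tokens: int) -> None:
    """Accumulate total tokens per model, as a Prometheus Counter would."""
    _tokens[model] += tokens

def render_metrics() -> str:
    """Render in Prometheus text exposition format."""
    lines = ["# TYPE agent_tokens_total counter"]
    for model, total in sorted(_tokens.items()):
        lines.append(f'agent_tokens_total{{model="{model}"}} {total}')
    return "\n".join(lines)
```

In production the same shape would be a real ''prometheus_client'' Counter so it is scraped alongside the instrumentator's metrics.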
==== prometheus.yml ====
<code yaml>
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "agent-service"
    static_configs:
      - targets: ["agent:8000"]
</code>
==== Rate Limiting ====
<code python>
# Add to main.py -- this replaces the earlier /invoke definition
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/invoke", response_model=AgentResponse)
@limiter.limit("10/minute")
async def invoke_agent(request: Request, agent_request: AgentRequest):
    # ... same implementation as before, reading fields from agent_request
    pass
</code>
==== Cost Control with Token Budgets ====
<code python>
from datetime import datetime

import redis.asyncio as redis
from fastapi import HTTPException

redis_client = redis.from_url("redis://redis:6379")

async def check_budget(user_id: str, tokens_requested: int, daily_limit: int = 100_000):
    key = f"tokens:{user_id}:{datetime.now().strftime('%Y-%m-%d')}"
    current = int(await redis_client.get(key) or 0)
    if current + tokens_requested > daily_limit:
        raise HTTPException(
            status_code=429,
            detail=f"Daily token budget exceeded ({current}/{daily_limit})",
        )

async def record_usage(user_id: str, tokens_used: int):
    key = f"tokens:{user_id}:{datetime.now().strftime('%Y-%m-%d')}"
    await redis_client.incrby(key, tokens_used)
    await redis_client.expire(key, 86400)  # Expire after 24h
</code>
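To make the accounting explicit, here is the same budget logic without Redis, against a plain dict (a hypothetical in-memory version for illustration; the Redis variant above is the production path because it survives restarts and is shared across instances):

```python
from datetime import date

class BudgetError(Exception):
    """Raised when a user's daily token budget would be exceeded."""

_usage: dict = {}

def _key(user_id: str) -> str:
    # One counter per user per calendar day
    return f"tokens:{user_id}:{date.today().isoformat()}"

def check_budget(user_id: str, tokens_requested: int, daily_limit: int = 100_000) -> None:
    current = _usage.get(_key(user_id), 0)
    if current + tokens_requested > daily_limit:
        raise BudgetError(f"Daily token budget exceeded ({current}/{daily_limit})")

def record_usage(user_id: str, tokens_used: int) -> None:
    k = _key(user_id)
    _usage[k] = _usage.get(k, 0) + tokens_used
```

The endpoint would call ''check_budget'' before the LLM call and ''record_usage'' after, using the token count from the response.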
==== Response Caching ====
This is exact-match caching keyed on a hash of the prompt. True semantic caching would compare prompt embeddings to reuse answers for similar (not identical) queries, but the hash version is a cheap first win for repeated prompts.

<code python>
import hashlib

async def get_cached_response(prompt: str) -> str | None:
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    cached = await redis_client.get(f"cache:{prompt_hash}")
    return cached.decode() if cached else None

async def cache_response(prompt: str, response: str, ttl: int = 3600):
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    await redis_client.setex(f"cache:{prompt_hash}", ttl, response)
</code>
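The key is deterministic, so identical prompts always map to the same cache entry, and Redis's ''SETEX'' handles expiry. Both behaviors can be sketched in memory to show exactly what the cache is doing (illustrative stand-in, not the production store):

```python
import hashlib
import time

class TTLCache:
    """Minimal stand-in for the Redis SETEX/GET pair used above."""

    def __init__(self):
        self._store: dict = {}

    def setex(self, key: str, ttl: float, value: str, now: float = None) -> None:
        now = time.monotonic() if now is None else now
        self._store[key] = (value, now + ttl)  # value plus expiry timestamp

    def get(self, key: str, now: float = None):
        now = time.monotonic() if now is None else now
        entry = self._store.get(key)
        if entry is None or now >= entry[1]:
            return None  # missing or expired
        return entry[0]

def prompt_key(prompt: str) -> str:
    """Deterministic cache key: same prompt, same entry."""
    return "cache:" + hashlib.sha256(prompt.encode()).hexdigest()
```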
===== Production Checklist =====
* ☐ Health check endpoints (''/health'', ''/ready'')
* ☐ Structured logging (JSON format)
* ☐ Retry logic with exponential backoff
* ☐ Rate limiting per user/IP
* ☐ Token budget enforcement
* ☐ Response caching for repeated queries
* ☐ Prometheus metrics + Grafana dashboards
* ☐ HTTPS enforcement
* ☐ Input validation (Pydantic models)
* ☐ Error handling that never leaks internals
* ☐ Secrets in environment variables, never in code
===== See Also =====
* [[how_to_build_a_rag_pipeline|How to Build a RAG Pipeline]]
* [[how_to_evaluate_an_agent|How to Evaluate an Agent]]
* [[how_to_use_mcp|How to Use MCP]]
{{tag>deployment docker fly-io modal serverless fastapi production how-to}}