====== How to Deploy an Agent ======

A practical guide to deploying AI agents to production. It covers Docker, serverless, and platform deployments, plus essential production concerns: health checks, monitoring, error handling, rate limiting, and cost control.

===== Deployment Architecture Overview =====

<code>
graph TB
    subgraph Client Layer
        A[Web App] --> LB[Load Balancer]
        B[Mobile App] --> LB
        C[API Consumer] --> LB
    end
    subgraph Application Layer
        LB --> D[FastAPI Agent Service]
        D --> E[Rate Limiter]
        E --> F[Agent Logic]
    end
    subgraph External Services
        F --> G[LLM API]
        F --> H[Vector DB]
        F --> I[Tool APIs]
    end
    subgraph Infrastructure
        D --> J[Redis Cache]
        D --> K[Prometheus Metrics]
        K --> L[Grafana Dashboard]
    end
</code>

===== The Base: FastAPI Agent Service =====

All deployment methods share a common FastAPI application. Build this first, then deploy anywhere.

==== Project Structure ====

<code>
agent-service/
├── main.py              # FastAPI app
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── fly.toml             # (optional) Fly.io config
</code>

==== main.py ====

<code python>
import os

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

app = FastAPI(title="Agent Service")
client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

# --- Models ---

class AgentRequest(BaseModel):
    prompt: str
    max_tokens: int = 1000
    user_id: str = "anonymous"

class AgentResponse(BaseModel):
    result: str
    tokens_used: int

# --- Retry logic for LLM calls ---

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def call_llm(prompt: str, max_tokens: int) -> dict:
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens
    )
    return {
        "content": response.choices[0].message.content,
        "tokens": response.usage.total_tokens
    }

# --- Health checks ---

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/ready")
async def ready():
    try:
        # Verify the LLM API is reachable
        await client.models.list()
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Not ready: {e}")

# --- Main endpoint ---

@app.post("/invoke", response_model=AgentResponse)
async def invoke_agent(request: AgentRequest):
    try:
        result = await call_llm(request.prompt, request.max_tokens)
        return AgentResponse(
            result=result["content"],
            tokens_used=result["tokens"]
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Agent error: {str(e)}")
</code>

==== requirements.txt ====

<code>
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
openai>=1.50.0
pydantic>=2.9.0
tenacity>=8.2.0
slowapi>=0.1.9
prometheus-fastapi-instrumentator>=6.1.0
redis>=5.0.0
</code>

===== Deployment Option 1: Docker + Docker Compose =====

==== Dockerfile ====

<code docker>
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
</code>

==== docker-compose.yml ====

<code yaml>
services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - redis
    restart: unless-stopped
    healthcheck:
      # python:3.12-slim ships without curl, so probe with the Python stdlib
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

volumes:
  redis_data:
</code>

<code bash>
# Build and run
docker compose up --build -d

# Test
curl http://localhost:8000/health
curl -X POST http://localhost:8000/invoke \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, agent!", "user_id": "test"}'
</code>

===== Deployment Option 2: Fly.io =====

Fly.io deploys containers globally with edge compute and auto-scaling.
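Fly's HTTP checks poll ''/health'', so it is worth confirming the endpoint responds before you deploy. A minimal local smoke-test sketch, assuming the service from ''main.py'' is running on port 8000 (the ''opener'' parameter is our own addition so the helper can be exercised without a live server):

```python
import json
import urllib.request

def probe_health(base_url: str = "http://localhost:8000",
                 opener=urllib.request.urlopen) -> dict:
    """GET /health and return the parsed JSON body."""
    with opener(f"{base_url}/health", timeout=5) as resp:
        return json.loads(resp.read())

# Usage, with the service running locally:
#   probe_health()  ->  {"status": "healthy"}
```

The same probe works against the deployed URL by passing ''base_url="https://my-agent-service.fly.dev"'' (substitute your own app name).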
==== fly.toml ====

<code toml>
app = "my-agent-service"
primary_region = "iad"

[build]
  dockerfile = "Dockerfile"

[http_service]
  internal_port = 8000
  force_https = true
  auto_stop_machines = true
  auto_start_machines = true
  min_machines_running = 1

  [[http_service.checks]]
    interval = "30s"
    timeout = "5s"
    path = "/health"
    method = "GET"

[env]
  # Non-secret env vars here

[[vm]]
  size = "shared-cpu-1x"
  memory = "512mb"
</code>

<code bash>
# Install flyctl, log in, then:
fly launch --no-deploy
fly secrets set OPENAI_API_KEY="your-key"
fly deploy

# Scale
fly scale count 3
fly scale vm shared-cpu-2x --memory 1024
</code>

===== Deployment Option 3: Modal (Serverless + GPU) =====

Modal is ideal for AI workloads: it scales to zero, supports GPUs, and is Python-native.

<code python>
# modal_app.py
import modal

app = modal.App("agent-service")

image = modal.Image.debian_slim().pip_install(
    "fastapi", "uvicorn", "openai", "pydantic"
)

@app.function(
    image=image,
    secrets=[modal.Secret.from_name("openai-secret")],
    container_idle_timeout=300,
    allow_concurrent_inputs=10
)
@modal.asgi_app()
def serve():
    from main import app as fastapi_app
    return fastapi_app
</code>

<code bash>
# Deploy
modal deploy modal_app.py

# Logs
modal app logs agent-service
</code>

===== Deployment Option 4: AWS Lambda =====

Cost-effective for low-traffic agents. Use Mangum to adapt FastAPI to Lambda.

<code python>
# lambda_handler.py
from mangum import Mangum
from main import app

handler = Mangum(app)
</code>

<code bash>
pip install mangum

# Deploy via SAM, CDK, or Serverless Framework
# Set OPENAI_API_KEY in the Lambda environment variables
</code>

**Lambda limits to be aware of:** 15-minute timeout, 10 GB RAM max, cold starts of 1-3 s.

===== Deployment Option 5: Railway =====

Railway auto-deploys from GitHub with built-in databases.
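Railway, like most platforms of this kind, tells your app which port to bind via the ''PORT'' environment variable, which is why the Procfile below passes ''$PORT'' to uvicorn. If you prefer to start the server programmatically, a minimal sketch (''listen_port'' is our own helper, not part of Railway):

```python
import os

def listen_port(default: int = 8000) -> int:
    """Return the port assigned via $PORT, or a local default."""
    return int(os.environ.get("PORT", default))

# Usage (equivalent to the Procfile line below):
#   import uvicorn
#   uvicorn.run("main:app", host="0.0.0.0", port=listen_port())
```

The ''int()'' cast matters: environment variables are strings, and uvicorn expects an integer port.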
<code>
# Procfile
web: uvicorn main:app --host 0.0.0.0 --port $PORT
</code>

  - Connect your GitHub repo to Railway
  - Add environment variables (OPENAI_API_KEY)
  - Railway auto-detects the Dockerfile or Procfile and deploys
  - Get a ''*.railway.app'' URL instantly

===== Deployment Comparison =====

^ Platform ^ Scaling ^ Cold Start ^ GPU Support ^ Cost Model ^ Best For ^
| Docker (self-hosted) | Manual | None | Yes (with nvidia-docker) | Fixed server cost | Full control, on-prem |
| Fly.io | Auto | ~1s | No | Pay per VM-second | Low-latency global APIs |
| Modal | Auto (to zero) | ~2s | Yes (A100, H100) | Pay per second | GPU workloads, bursty traffic |
| AWS Lambda | Auto (to zero) | 1-3s | No | Pay per invocation | Low-traffic, event-driven |
| Railway | Auto | ~2s | No | Usage-based | Quick prototypes from GitHub |

===== Production Essentials =====

==== Monitoring with Prometheus ====

<code python>
# Add to main.py
from prometheus_fastapi_instrumentator import Instrumentator

instrumentator = Instrumentator(
    should_group_status_codes=True,
    should_respect_env_var=False
)
instrumentator.instrument(app).expose(app, endpoint="/metrics")
</code>

==== prometheus.yml ====

<code yaml>
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "agent-service"
    static_configs:
      - targets: ["agent:8000"]
</code>

==== Rate Limiting ====

<code python>
# Add to main.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# slowapi requires the raw Request as the first parameter, so the
# endpoint signature changes; the body stays the same as before.
@app.post("/invoke", response_model=AgentResponse)
@limiter.limit("10/minute")
async def invoke_agent(request: Request, agent_request: AgentRequest):
    # ... same implementation as before
    pass
</code>

==== Cost Control with Token Budgets ====

<code python>
from datetime import datetime

import redis.asyncio as redis
from fastapi import HTTPException

redis_client = redis.from_url("redis://redis:6379")

async def check_budget(user_id: str, tokens_requested: int, daily_limit: int = 100_000):
    key = f"tokens:{user_id}:{datetime.now().strftime('%Y-%m-%d')}"
    current = int(await redis_client.get(key) or 0)
    if current + tokens_requested > daily_limit:
        raise HTTPException(
            status_code=429,
            detail=f"Daily token budget exceeded ({current}/{daily_limit})"
        )

async def record_usage(user_id: str, tokens_used: int):
    key = f"tokens:{user_id}:{datetime.now().strftime('%Y-%m-%d')}"
    await redis_client.incrby(key, tokens_used)
    await redis_client.expire(key, 86400)  # Expire after 24h
</code>

==== Response Caching ====

This is exact-match caching: identical prompts hit the cache via a SHA-256 key. True semantic caching would embed prompts and match by similarity, but exact-match is cheap and often enough to start.

<code python>
import hashlib

async def get_cached_response(prompt: str) -> str | None:
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    cached = await redis_client.get(f"cache:{prompt_hash}")
    return cached.decode() if cached else None

async def cache_response(prompt: str, response: str, ttl: int = 3600):
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    await redis_client.setex(f"cache:{prompt_hash}", ttl, response)
</code>

===== Production Checklist =====

  * ☐ Health check endpoints (''/health'', ''/ready'')
  * ☐ Structured logging (JSON format)
  * ☐ Retry logic with exponential backoff
  * ☐ Rate limiting per user/IP
  * ☐ Token budget enforcement
  * ☐ Response caching for repeated queries
  * ☐ Prometheus metrics + Grafana dashboards
  * ☐ HTTPS enforcement
  * ☐ Input validation (Pydantic models)
  * ☐ Error handling that never leaks internals
  * ☐ Secrets in environment variables, never in code

===== See Also =====

  * [[how_to_build_a_rag_pipeline|How to Build a RAG Pipeline]]
  * [[how_to_evaluate_an_agent|How to Evaluate an Agent]]
  * [[how_to_use_mcp|How to Use MCP]]

{{tag>deployment docker fly-io modal serverless fastapi production how-to}}