====== How to Handle Rate Limits ======
A practical guide to handling API rate limits across all major LLM providers. Includes real rate limit values, retry strategies, multi-provider fallback chains, and production-ready code((AI Free API, "Claude API Quota Tiers and Limits Explained," 2026 — [[https://www.aifreeapi.com/en/posts/claude-api-quota-tiers-limits]]))((Requesty, "Rate Limits for LLM Providers," 2025 — [[https://www.requesty.ai/blog/rate-limits-for-llm-providers-openai-anthropic-and-deepseek]])).
===== Why Rate Limits Exist =====
Every LLM provider enforces rate limits to prevent abuse, ensure fair access, and manage infrastructure load. Rate limits are measured across multiple dimensions:((Vellum, "How to Manage OpenAI Rate Limits," 2025 — [[https://vellum.ai/blog/how-to-manage-openai-rate-limits-as-you-scale-your-app]]))
* **RPM** — Requests per minute
* **TPM** — Tokens per minute (sometimes split into ITPM/OTPM for input/output)
* **RPD** — Requests per day
* **IPM** — Images per minute (for multimodal models)
Exceeding any single dimension triggers an HTTP 429 (Too Many Requests) error.
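Most providers also report your current limit state in response headers, which lets you slow down before the first 429 arrives. A minimal sketch of reading OpenAI-style ''x-ratelimit-*'' headers — the header names follow OpenAI's documented convention, but other providers use similar, not identical, names, and the fallback defaults here are illustrative assumptions:

```python
def parse_rate_limit_headers(headers: dict) -> dict:
    """Extract remaining quota from OpenAI-style response headers.

    Header names follow OpenAI's documented x-ratelimit-* convention;
    the -1 / 0.0 defaults are illustrative fallbacks, not API behavior.
    """
    return {
        "remaining_requests": int(headers.get("x-ratelimit-remaining-requests", -1)),
        "remaining_tokens": int(headers.get("x-ratelimit-remaining-tokens", -1)),
        "retry_after": float(headers.get("retry-after", 0.0)),
    }


# Example: headers from a hypothetical response, as a plain dict
state = parse_rate_limit_headers({
    "x-ratelimit-remaining-requests": "499",
    "x-ratelimit-remaining-tokens": "29500",
})
```

Checking ''remaining_requests'' proactively and pausing when it approaches zero is cheaper than recovering from a 429 after the fact.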
===== Rate Limits by Provider (2025-2026) =====
==== OpenAI ====
^ Tier ^ Requirement ^ GPT-4o RPM ^ GPT-4o TPM ^ GPT-4o-mini RPM ^ GPT-4o-mini TPM ^
| Free | Email verify | 3 | 40,000 | 3 | 40,000 |
| Tier 1 | $5 paid | 500 | 30,000 | 500 | 200,000 |
| Tier 2 | $50 paid + 7 days | 5,000 | 450,000 | 5,000 | 2,000,000 |
| Tier 3 | $100 paid + 7 days | 5,000 | 800,000 | 5,000 | 4,000,000 |
| Tier 4 | $250 paid + 14 days | 10,000 | 2,000,000 | 10,000 | 10,000,000 |
| Tier 5 | $1,000 paid + 30 days | 10,000 | 12,000,000 | 10,000 | 15,000,000 |
==== Anthropic (Claude) ====
^ Tier ^ Requirement ^ Claude Sonnet RPM ^ Claude Sonnet ITPM ^ Claude Sonnet OTPM ^
| Tier 1 | $5 deposit | 50 | 20,000 | 4,000 |
| Tier 2 | $40 spent | 1,000 | 100,000 | 20,000 |
| Tier 3 | $200 spent | 2,000 | 200,000 | 40,000 |
| Tier 4 | $400 spent | 4,000 | 400,000 | 80,000 |
Anthropic uses a token bucket algorithm: capacity replenishes continuously rather than resetting at fixed intervals. Cached tokens from prompt caching do NOT count toward ITPM limits, which can raise effective throughput 5-10x((Anthropic Rate Limits — [[https://docs.anthropic.com/en/api/rate-limits]])).
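The continuous-replenishment behavior can be sketched as a refill-on-read bucket. This is an illustrative model of the algorithm, not Anthropic's implementation; ''capacity'' and ''refill_rate'' are hypothetical parameters:

```python
import time


class TokenBucket:
    """Continuous-refill token bucket (illustrative model).

    capacity: maximum tokens the bucket holds.
    refill_rate: tokens replenished per second.
    """

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last = time.monotonic()

    def try_consume(self, amount: float) -> bool:
        now = time.monotonic()
        # Replenish continuously based on elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False


# e.g. a 100k ITPM limit modeled as 100k capacity refilling over a minute
bucket = TokenBucket(capacity=100_000, refill_rate=100_000 / 60)
accepted = bucket.try_consume(20_000)
```

Because capacity trickles back continuously, short bursts recover quickly instead of waiting for a fixed-window reset.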
==== Google Gemini ====
^ Tier ^ Requirement ^ Gemini 2.0 Flash RPM ^ Gemini 2.0 Flash TPM ^ Gemini 1.5 Pro RPM ^
| Free | No card needed | 10 | 250,000 | 5 |
| Tier 1 | Enable billing | 200 | 1,000,000 | 150 |
| Tier 2 | $250 spent + 30 days | 1,000 | 2,000,000 | 500 |
| Tier 3 | $1,000 spent | 2,000+ | Custom | 1,000+ |
**Important:** In December 2025, Google cut free-tier quotas by 50-92%; Gemini Flash's free-tier allowance dropped from 250 to 20 RPD((Google Gemini API Rate Limits — [[https://ai.google.dev/pricing]]))((AI Free API, "Gemini API Rate Limits 2026," 2026 — [[https://blog.laozhang.ai/en/posts/gemini-api-rate-limits-guide]])).
==== Other Providers ====
^ Provider ^ Free Tier RPM ^ Paid RPM ^ Notes ^
| Groq | 30 | 300+ | Extremely fast inference, generous for open models |
| Mistral | 5 | 500+ | Tiered by plan (Experiment/Production) |
| Together AI | 60 | 600+ | Focus on open-source models |
| Fireworks AI | 100 | 600+ | Optimized for throughput |
===== Diagnostic Flowchart =====
<code>
graph TD
    A[Getting 429 Error] --> B{Which dimension hit?}
    B -->|RPM| C{Are requests bursty?}
    B -->|TPM| D{Are prompts large?}
    B -->|RPD| E{Daily volume too high?}
    C -->|Yes| F[Add request queuing with spacing]
    C -->|No| G{On correct tier?}
    G -->|No| H[Upgrade provider tier]
    G -->|Yes| I[Add provider fallback]
    D -->|Yes| J[Compress prompts + use caching]
    D -->|No| K[Reduce concurrent requests]
    E -->|Yes| L{Budget allows upgrade?}
    L -->|Yes| H
    L -->|No| M[Implement token budgeting per user]
    F --> N[Implement exponential backoff]
    I --> O[Multi-provider fallback chain]
    J --> P[Use prompt caching to reduce token count]
</code>
===== Strategy 1: Exponential Backoff with Jitter =====
The standard approach: on a 429 response, wait progressively longer between retries, adding random jitter so that many clients retrying at once don't create a thundering herd.
<code python>
import random
import time
from typing import Any


class RetryWithBackoff:
    """Retry API calls with exponential backoff and jitter."""

    def __init__(self, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute(self, func, *args, **kwargs) -> Any:
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                error_code = getattr(e, 'status_code', None)
                # Only retry on rate limit (429) or server errors (5xx)
                if error_code and error_code not in (429, 500, 502, 503, 504):
                    raise  # Don't retry client errors like 400, 401, 403
                if attempt == self.max_retries - 1:
                    break  # No point sleeping after the final attempt
                # Calculate delay: exponential backoff + random jitter
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                jitter = random.uniform(0, delay * 0.5)
                total_delay = delay + jitter
                # Honor the Retry-After header if the provider sent one
                retry_after = (getattr(e, 'headers', None) or {}).get('retry-after')
                if retry_after:
                    total_delay = max(total_delay, float(retry_after))
                print(f"Rate limited (attempt {attempt + 1}/{self.max_retries}). "
                      f"Waiting {total_delay:.1f}s...")
                time.sleep(total_delay)
        raise last_exception


# Usage with OpenAI
from openai import OpenAI

client = OpenAI()
retryer = RetryWithBackoff(max_retries=5, base_delay=1.0)
response = retryer.execute(
    client.chat.completions.create,
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
)
</code>
===== Strategy 2: Multi-Provider Fallback Chain =====
When one provider is rate-limited, automatically fall through to the next. This is the most effective strategy for high-throughput applications.
<code python>
import os
import time
from dataclasses import dataclass
from typing import Any

import anthropic
import openai


@dataclass
class ProviderConfig:
    name: str
    model: str
    client: Any
    priority: int  # Lower = preferred
    cost_per_1k_input: float
    cost_per_1k_output: float
    failures: int = 0
    last_failure: float = 0.0


class MultiProviderFallback:
    """Route requests across multiple LLM providers with automatic fallback."""

    def __init__(self):
        self.providers: list[ProviderConfig] = []

    def add_openai(self, model: str = "gpt-4o", priority: int = 1):
        self.providers.append(ProviderConfig(
            name="openai", model=model,
            client=openai.OpenAI(),
            priority=priority,
            cost_per_1k_input=0.0025, cost_per_1k_output=0.01,
        ))

    def add_anthropic(self, model: str = "claude-sonnet-4-20250514", priority: int = 2):
        self.providers.append(ProviderConfig(
            name="anthropic", model=model,
            client=anthropic.Anthropic(),
            priority=priority,
            cost_per_1k_input=0.003, cost_per_1k_output=0.015,
        ))

    def add_google(self, model: str = "gemini-2.0-flash", priority: int = 3):
        import google.generativeai as genai
        genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
        self.providers.append(ProviderConfig(
            name="google", model=model,
            client=genai,
            priority=priority,
            cost_per_1k_input=0.0001, cost_per_1k_output=0.0004,
        ))

    def chat(self, messages: list[dict], **kwargs) -> dict:
        """Send request with automatic provider fallback."""
        sorted_providers = sorted(self.providers, key=lambda p: (p.failures, p.priority))
        errors = []
        for provider in sorted_providers:
            try:
                result = self._call_provider(provider, messages, **kwargs)
                provider.failures = max(0, provider.failures - 1)  # Recover on success
                return {"provider": provider.name, "model": provider.model, "content": result}
            except Exception as e:
                provider.failures += 1
                provider.last_failure = time.time()
                errors.append(f"{provider.name}: {e}")
                continue
        raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

    def _call_provider(self, provider: ProviderConfig, messages: list, **kwargs) -> str:
        if provider.name == "openai":
            resp = provider.client.chat.completions.create(
                model=provider.model, messages=messages, **kwargs
            )
            return resp.choices[0].message.content
        elif provider.name == "anthropic":
            # Convert OpenAI message format to Anthropic's (system prompt is separate)
            system = next((m["content"] for m in messages if m["role"] == "system"), "")
            user_msgs = [m for m in messages if m["role"] != "system"]
            resp = provider.client.messages.create(
                model=provider.model,
                system=system,
                messages=user_msgs,
                max_tokens=kwargs.get("max_tokens", 4096),
            )
            return resp.content[0].text
        elif provider.name == "google":
            model = provider.client.GenerativeModel(provider.model)
            prompt = "\n".join(m["content"] for m in messages)
            resp = model.generate_content(prompt)
            return resp.text
        raise ValueError(f"Unknown provider: {provider.name}")


# Usage
fallback = MultiProviderFallback()
fallback.add_openai(priority=1)     # Preferred
fallback.add_anthropic(priority=2)  # First fallback
fallback.add_google(priority=3)     # Budget fallback
result = fallback.chat([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain rate limiting."},
])
print(f"Answered by {result['provider']} ({result['model']})")
</code>
===== Strategy 3: Request Queue with Rate Tracking =====
Buffer requests and release them at a controlled rate to stay under limits.
<code python>
import asyncio
import time
from collections import deque


class SlidingWindowRateLimiter:
    """Track requests and tokens over a rolling 60-second window."""

    def __init__(self, rpm: int = 500, tpm: int = 200_000):
        self.rpm = rpm
        self.tpm = tpm
        self.request_timestamps = deque()
        self.token_usage = deque()  # (timestamp, tokens)

    def _cleanup_requests(self, seconds: int = 60):
        cutoff = time.time() - seconds
        while self.request_timestamps and self.request_timestamps[0] < cutoff:
            self.request_timestamps.popleft()

    def _cleanup_tokens(self, seconds: int = 60):
        cutoff = time.time() - seconds
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()

    async def acquire(self, estimated_tokens: int = 1000):
        """Wait until a request fits within both the RPM and TPM limits."""
        while True:
            now = time.time()
            self._cleanup_requests()
            self._cleanup_tokens()
            current_rpm = len(self.request_timestamps)
            current_tpm = sum(t for _, t in self.token_usage)
            if current_rpm < self.rpm and (current_tpm + estimated_tokens) < self.tpm:
                self.request_timestamps.append(now)
                self.token_usage.append((now, estimated_tokens))
                return  # Proceed with request
            # Wait until the oldest entry falls out of the window
            if current_rpm >= self.rpm:
                wait = 60 - (now - self.request_timestamps[0])
            else:
                wait = 60 - (now - self.token_usage[0][0])
            await asyncio.sleep(max(wait, 0.1))

    def update_actual_tokens(self, estimated: int, actual: int):
        """Correct token count after receiving the response."""
        # Find and update the most recent matching estimate
        for i in range(len(self.token_usage) - 1, -1, -1):
            if self.token_usage[i][1] == estimated:
                ts = self.token_usage[i][0]
                self.token_usage[i] = (ts, actual)
                break


# Usage
limiter = SlidingWindowRateLimiter(rpm=500, tpm=200_000)

async def make_request(client, prompt, estimated_tokens=500):
    await limiter.acquire(estimated_tokens)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    actual_tokens = response.usage.total_tokens
    limiter.update_actual_tokens(estimated_tokens, actual_tokens)
    return response
</code>
===== Strategy 4: Token Budgeting =====
Allocate token budgets per user, per task, or per time window to prevent any single consumer from exhausting shared limits.
<code python>
import time
from collections import defaultdict


class TokenBudgetManager:
    """Allocate and track token budgets across users and tasks."""

    def __init__(self, global_daily_budget: int = 10_000_000):
        self.global_daily_budget = global_daily_budget
        self.usage = defaultdict(lambda: {"tokens": 0, "requests": 0, "reset_at": 0})
        self.global_usage = {"tokens": 0, "requests": 0, "reset_at": 0}

    def _reset_if_needed(self, record: dict):
        if time.time() > record["reset_at"]:
            record["tokens"] = 0
            record["requests"] = 0
            record["reset_at"] = time.time() + 86400  # 24h window

    def check_budget(self, user_id: str, estimated_tokens: int,
                     user_daily_limit: int = 500_000) -> dict:
        """Check if request is within budget before making the API call."""
        self._reset_if_needed(self.usage[user_id])
        self._reset_if_needed(self.global_usage)
        user = self.usage[user_id]
        if user["tokens"] + estimated_tokens > user_daily_limit:
            return {
                "allowed": False,
                "reason": f"User daily limit reached: {user['tokens']:,}/{user_daily_limit:,} tokens",
                "resets_in": int(user["reset_at"] - time.time()),
            }
        if self.global_usage["tokens"] + estimated_tokens > self.global_daily_budget:
            return {
                "allowed": False,
                "reason": "Global daily budget exhausted",
                "resets_in": int(self.global_usage["reset_at"] - time.time()),
            }
        return {"allowed": True, "user_remaining": user_daily_limit - user["tokens"]}

    def record_usage(self, user_id: str, tokens: int):
        self.usage[user_id]["tokens"] += tokens
        self.usage[user_id]["requests"] += 1
        self.global_usage["tokens"] += tokens
</code>
===== Strategy 5: Prompt Caching =====
Anthropic's prompt caching can reduce effective token consumption by up to 90% for repeated prefixes (system prompts, tool definitions, few-shot examples). Cached tokens cost 10% of normal input tokens and do NOT count toward ITPM rate limits.
<code python>
import anthropic

client = anthropic.Anthropic()

# The system prompt and tools are cached after the first request;
# subsequent requests with the same prefix read from the cache.
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are an expert assistant...",  # Long system prompt
            "cache_control": {"type": "ephemeral"},  # Cache this block
        }
    ],
    messages=[{"role": "user", "content": "New question here"}],
)

# Check cache performance
print(f"Input tokens: {response.usage.input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
# cache_read tokens cost 0.1x normal price and don't count toward ITPM
</code>
===== Quick Reference: What to Do When Rate Limited =====
^ Situation ^ Immediate Fix ^ Long-term Fix ^
| Hitting RPM limit | Add 100-200ms delay between requests | Upgrade tier or add provider fallback |
| Hitting TPM limit | Shorten prompts, compress context | Use prompt caching, switch to smaller model |
| Hitting RPD limit | Wait for daily reset | Upgrade tier, implement token budgeting |
| Burst traffic spikes | Queue requests with rate limiter | Pre-compute during off-peak, add caching layer |
| Multiple users competing | Per-user rate limiting | Token budget allocation per user/team |
| All providers rate limited | Wait with exponential backoff | Add more providers, pre-purchase reserved capacity |
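For the "multiple users competing" row, per-user limiting keeps one heavy consumer from starving everyone else. A minimal sliding-window sketch; the class name and the 20 RPM default are illustrative assumptions, not a library API:

```python
import time
from collections import defaultdict, deque


class PerUserRateLimiter:
    """Per-user sliding-window request limiter (illustrative sketch)."""

    def __init__(self, rpm_per_user: int = 20):
        self.rpm = rpm_per_user
        self.windows: dict[str, deque] = defaultdict(deque)

    def allow(self, user_id: str) -> bool:
        now = time.time()
        window = self.windows[user_id]
        # Drop timestamps older than the 60-second window
        while window and window[0] < now - 60:
            window.popleft()
        if len(window) < self.rpm:
            window.append(now)
            return True
        return False
```

Requests rejected here never reach the provider, so a single noisy user burns only their own allowance rather than the shared RPM quota.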
===== See Also =====
* [[common_agent_failure_modes|Common Agent Failure Modes]]
* [[why_is_my_agent_hallucinating|Why Is My Agent Hallucinating?]]
* [[why_is_my_rag_returning_bad_results|Why Is My RAG Returning Bad Results?]]
===== References =====