A practical guide to handling API rate limits across all major LLM providers. Includes real rate limit values, retry strategies, multi-provider fallback chains, and production-ready code.
Every LLM provider enforces rate limits to prevent abuse, ensure fair access, and manage infrastructure load. Rate limits are measured across multiple dimensions:

- **RPM** — requests per minute
- **TPM** — tokens per minute (some providers split this into input tokens, ITPM, and output tokens, OTPM)
- **RPD** — requests per day
Exceeding any single dimension triggers an HTTP 429 (Too Many Requests) error.
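Providers report each dimension back in response headers alongside the 429, which lets a client tell which limit it actually hit. A minimal sketch of reading those headers, using OpenAI's header names (Anthropic and Gemini use different ones; `parse_rate_limit_headers` is an illustrative helper, not part of any SDK):

```python
def parse_rate_limit_headers(headers: dict) -> dict:
    """Extract remaining request/token budget from 429 response headers."""
    return {
        "remaining_requests": int(headers.get("x-ratelimit-remaining-requests", 0)),
        "remaining_tokens": int(headers.get("x-ratelimit-remaining-tokens", 0)),
        "retry_after": float(headers.get("retry-after", 0)),
    }

# Example: headers as they might accompany a 429 response
status = parse_rate_limit_headers({
    "x-ratelimit-remaining-requests": "0",
    "x-ratelimit-remaining-tokens": "12000",
    "retry-after": "1.5",
})
print(status)  # the RPM dimension is exhausted even though token budget remains
```

Knowing which dimension is exhausted matters: an RPM limit calls for spacing requests out, while a TPM limit calls for shorter prompts or caching.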
OpenAI usage tiers (limits as published; subject to change):

| Tier | Requirement | GPT-4o RPM | GPT-4o TPM | GPT-4o-mini RPM | GPT-4o-mini TPM |
|---|---|---|---|---|---|
| Free | Email verify | 3 | 40,000 | 3 | 40,000 |
| Tier 1 | $5 paid | 500 | 30,000 | 500 | 200,000 |
| Tier 2 | $50 paid + 7 days | 5,000 | 450,000 | 5,000 | 2,000,000 |
| Tier 3 | $100 paid + 7 days | 5,000 | 800,000 | 5,000 | 4,000,000 |
| Tier 4 | $250 paid + 14 days | 10,000 | 2,000,000 | 10,000 | 10,000,000 |
| Tier 5 | $1,000 paid + 30 days | 10,000 | 12,000,000 | 10,000 | 15,000,000 |
Anthropic usage tiers:

| Tier | Requirement | Claude Sonnet RPM | Claude Sonnet ITPM | Claude Sonnet OTPM |
|---|---|---|---|---|
| Tier 1 | $5 deposit | 50 | 20,000 | 4,000 |
| Tier 2 | $40 spent | 1,000 | 100,000 | 20,000 |
| Tier 3 | $200 spent | 2,000 | 200,000 | 40,000 |
| Tier 4 | $400 spent | 4,000 | 400,000 | 80,000 |
Anthropic uses a token bucket algorithm — capacity replenishes continuously rather than resetting at fixed intervals. Cached tokens from prompt caching do NOT count toward ITPM limits, potentially 5-10x effective throughput4).
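The continuous-replenishment behavior can be sketched as a small token bucket (the capacity and refill numbers below are illustrative, not Anthropic's actual parameters):

```python
import time

class TokenBucket:
    def __init__(self, capacity: float, refill_per_sec: float):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = capacity          # Bucket starts full
        self.last_refill = time.monotonic()

    def try_consume(self, amount: float) -> bool:
        now = time.monotonic()
        # Continuous refill: elapsed seconds * refill rate, capped at capacity
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.last_refill = now
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

# 20,000 input tokens per minute ≈ 333 tokens per second of refill
bucket = TokenBucket(capacity=20_000, refill_per_sec=20_000 / 60)
print(bucket.try_consume(15_000))  # True: bucket starts full
print(bucket.try_consume(15_000))  # False: only a few tokens have accrued back
```

The practical consequence: after a burst you regain capacity within seconds rather than waiting for the top of the next minute.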
Google Gemini usage tiers:

| Tier | Requirement | Gemini 2.0 Flash RPM | Gemini 2.0 Flash TPM | Gemini 1.5 Pro RPM |
|---|---|---|---|---|
| Free | No card needed | 10 | 250,000 | 5 |
| Tier 1 | Enable billing | 200 | 1,000,000 | 150 |
| Tier 2 | $250 spent + 30 days | 1,000 | 2,000,000 | 500 |
| Tier 3 | $1,000 spent | 2,000+ | Custom | 1,000+ |
Important: in December 2025, Google cut free-tier quotas by 50-92%; the Flash model's daily quota dropped from 250 to 20 RPD.
Representative limits for other hosted providers:

| Provider | Free Tier RPM | Paid RPM | Notes |
|---|---|---|---|
| Groq | 30 | 300+ | Extremely fast inference, generous for open models |
| Mistral | 5 | 500+ | Tiered by plan (Experiment/Production) |
| Together AI | 60 | 600+ | Focus on open-source models |
| Fireworks AI | 100 | 600+ | Optimized for throughput |
The standard approach: on a 429, wait progressively longer between retries, adding random jitter to avoid a thundering herd of synchronized retries.
```python
import random
import time
from typing import Any, Callable


class RetryWithBackoff:
    """Retry API calls with exponential backoff and jitter."""

    def __init__(self, max_retries: int = 5, base_delay: float = 1.0,
                 max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        last_exception = None
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                error_code = getattr(e, 'status_code', None)
                # Only retry on rate limit (429) or server errors (5xx)
                if error_code and error_code not in (429, 500, 502, 503, 504):
                    raise  # Don't retry client errors like 400, 401, 403
                # Calculate delay: exponential backoff + random jitter
                delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                jitter = random.uniform(0, delay * 0.5)
                total_delay = delay + jitter
                # Honor the Retry-After header when the provider sends one
                retry_after = (getattr(e, 'headers', None) or {}).get('retry-after')
                if retry_after:
                    total_delay = max(total_delay, float(retry_after))
                print(f"Rate limited (attempt {attempt + 1}/{self.max_retries}). "
                      f"Waiting {total_delay:.1f}s...")
                time.sleep(total_delay)
        raise last_exception


# Usage with OpenAI
from openai import OpenAI

client = OpenAI()
retryer = RetryWithBackoff(max_retries=5, base_delay=1.0)
response = retryer.execute(
    client.chat.completions.create,
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
)
```
When one provider is rate-limited, automatically fall through to the next. This is the most effective strategy for high-throughput applications.
```python
import os
import time
from dataclasses import dataclass
from typing import Any

import anthropic
import openai


@dataclass
class ProviderConfig:
    name: str
    model: str
    client: Any
    priority: int  # Lower = preferred
    cost_per_1k_input: float
    cost_per_1k_output: float
    failures: int = 0
    last_failure: float = 0.0


class MultiProviderFallback:
    """Route requests across multiple LLM providers with automatic fallback."""

    def __init__(self):
        self.providers: list[ProviderConfig] = []

    def add_openai(self, model: str = "gpt-4o", priority: int = 1):
        self.providers.append(ProviderConfig(
            name="openai", model=model, client=openai.OpenAI(),
            priority=priority, cost_per_1k_input=0.0025, cost_per_1k_output=0.01
        ))

    def add_anthropic(self, model: str = "claude-sonnet-4-20250514", priority: int = 2):
        self.providers.append(ProviderConfig(
            name="anthropic", model=model, client=anthropic.Anthropic(),
            priority=priority, cost_per_1k_input=0.003, cost_per_1k_output=0.015
        ))

    def add_google(self, model: str = "gemini-2.0-flash", priority: int = 3):
        import google.generativeai as genai
        genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
        self.providers.append(ProviderConfig(
            name="google", model=model, client=genai,
            priority=priority, cost_per_1k_input=0.0001, cost_per_1k_output=0.0004
        ))

    def chat(self, messages: list[dict], **kwargs) -> dict:
        """Send request with automatic provider fallback."""
        # Prefer providers with fewer recent failures, then by priority
        sorted_providers = sorted(self.providers, key=lambda p: (p.failures, p.priority))
        errors = []
        for provider in sorted_providers:
            try:
                result = self._call_provider(provider, messages, **kwargs)
                provider.failures = max(0, provider.failures - 1)  # Recover on success
                return {"provider": provider.name, "model": provider.model, "content": result}
            except Exception as e:
                provider.failures += 1
                provider.last_failure = time.time()
                errors.append(f"{provider.name}: {e}")
                continue
        raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

    def _call_provider(self, provider: ProviderConfig, messages: list, **kwargs) -> str:
        if provider.name == "openai":
            resp = provider.client.chat.completions.create(
                model=provider.model, messages=messages, **kwargs
            )
            return resp.choices[0].message.content
        elif provider.name == "anthropic":
            # Convert OpenAI format to Anthropic format
            system = next((m["content"] for m in messages if m["role"] == "system"), "")
            user_msgs = [m for m in messages if m["role"] != "system"]
            resp = provider.client.messages.create(
                model=provider.model, system=system, messages=user_msgs,
                max_tokens=kwargs.get("max_tokens", 4096)
            )
            return resp.content[0].text
        elif provider.name == "google":
            model = provider.client.GenerativeModel(provider.model)
            prompt = "\n".join(m["content"] for m in messages)
            resp = model.generate_content(prompt)
            return resp.text
        raise ValueError(f"Unknown provider: {provider.name}")


# Usage
fallback = MultiProviderFallback()
fallback.add_openai(priority=1)     # Preferred
fallback.add_anthropic(priority=2)  # First fallback
fallback.add_google(priority=3)     # Budget fallback

result = fallback.chat([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain rate limiting."}
])
print(f"Answered by {result['provider']} ({result['model']})")
```
Buffer requests and release them at a controlled rate to stay under limits.
```python
import asyncio
import time
from collections import deque


class SlidingWindowRateLimiter:
    """Client-side sliding-window limiter tracking RPM and TPM budgets."""

    def __init__(self, rpm: int = 500, tpm: int = 200_000):
        self.rpm = rpm
        self.tpm = tpm
        self.request_timestamps = deque()
        self.token_usage = deque()  # (timestamp, tokens)

    def _cleanup(self, seconds: int = 60):
        """Drop entries older than the rate-limit window."""
        cutoff = time.time() - seconds
        while self.request_timestamps and self.request_timestamps[0] < cutoff:
            self.request_timestamps.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff:
            self.token_usage.popleft()

    async def acquire(self, estimated_tokens: int = 1000):
        """Wait until a request fits within both the RPM and TPM limits."""
        while True:
            now = time.time()
            self._cleanup()
            current_rpm = len(self.request_timestamps)
            current_tpm = sum(t for _, t in self.token_usage)
            if current_rpm < self.rpm and (current_tpm + estimated_tokens) < self.tpm:
                self.request_timestamps.append(now)
                self.token_usage.append((now, estimated_tokens))
                return  # Proceed with request
            # Wait until the oldest entry ages out of the 60s window
            if current_rpm >= self.rpm:
                wait = 60 - (now - self.request_timestamps[0])
            else:
                wait = 60 - (now - self.token_usage[0][0])
            await asyncio.sleep(max(wait, 0.1))

    def update_actual_tokens(self, estimated: int, actual: int):
        """Correct the token count after the response reports real usage."""
        # Find and update the most recent matching estimate
        for i in range(len(self.token_usage) - 1, -1, -1):
            if self.token_usage[i][1] == estimated:
                ts = self.token_usage[i][0]
                self.token_usage[i] = (ts, actual)
                break


# Usage
limiter = SlidingWindowRateLimiter(rpm=500, tpm=200_000)

async def make_request(client, prompt, estimated_tokens=500):
    await limiter.acquire(estimated_tokens)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    limiter.update_actual_tokens(estimated_tokens, response.usage.total_tokens)
    return response
```
Allocate token budgets per user, per task, or per time window to prevent any single consumer from exhausting shared limits.
```python
import time
from collections import defaultdict


class TokenBudgetManager:
    """Allocate and track token budgets across users and tasks."""

    def __init__(self, global_daily_budget: int = 10_000_000):
        self.global_daily_budget = global_daily_budget
        self.usage = defaultdict(lambda: {"tokens": 0, "requests": 0, "reset_at": 0})
        self.global_usage = {"tokens": 0, "requests": 0, "reset_at": 0}

    def _reset_if_needed(self, record: dict):
        if time.time() > record["reset_at"]:
            record["tokens"] = 0
            record["requests"] = 0
            record["reset_at"] = time.time() + 86400  # 24h window

    def check_budget(self, user_id: str, estimated_tokens: int,
                     user_daily_limit: int = 500_000) -> dict:
        """Check if a request is within budget before making the API call."""
        self._reset_if_needed(self.usage[user_id])
        self._reset_if_needed(self.global_usage)
        user = self.usage[user_id]
        if user["tokens"] + estimated_tokens > user_daily_limit:
            return {
                "allowed": False,
                "reason": f"User daily limit reached: {user['tokens']:,}/{user_daily_limit:,} tokens",
                "resets_in": int(user["reset_at"] - time.time())
            }
        if self.global_usage["tokens"] + estimated_tokens > self.global_daily_budget:
            return {
                "allowed": False,
                "reason": "Global daily budget exhausted",
                "resets_in": int(self.global_usage["reset_at"] - time.time())
            }
        return {"allowed": True, "user_remaining": user_daily_limit - user["tokens"]}

    def record_usage(self, user_id: str, tokens: int):
        self.usage[user_id]["tokens"] += tokens
        self.usage[user_id]["requests"] += 1
        self.global_usage["tokens"] += tokens
```
Anthropic's prompt caching can reduce effective token consumption by up to 90% for repeated prefixes (system prompts, tool definitions, few-shot examples). Cached tokens cost 10% of normal input tokens and do NOT count toward ITPM rate limits.
```python
import anthropic

client = anthropic.Anthropic()

# The system prompt is cached after the first request;
# subsequent requests with the same prefix read from the cache
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are an expert assistant...",  # Long system prompt
            "cache_control": {"type": "ephemeral"}     # Cache this block
        }
    ],
    messages=[{"role": "user", "content": "New question here"}]
)

# Check cache performance
print(f"Input tokens: {response.usage.input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
# cache_read tokens cost 0.1x normal price and don't count toward ITPM
```
When you hit limits in production, triage with this table:

| Situation | Immediate Fix | Long-term Fix |
|---|---|---|
| Hitting RPM limit | Add 100-200ms delay between requests | Upgrade tier or add provider fallback |
| Hitting TPM limit | Shorten prompts, compress context | Use prompt caching, switch to smaller model |
| Hitting RPD limit | Wait for daily reset | Upgrade tier, implement token budgeting |
| Burst traffic spikes | Queue requests with rate limiter | Pre-compute during off-peak, add caching layer |
| Multiple users competing | Per-user rate limiting | Token budget allocation per user/team |
| All providers rate limited | Wait with exponential backoff | Add more providers, pre-purchase reserved capacity |