AI API 速率限制 (Rate Limiting):運作原理與處理方法
每個 AI API 都有速率限制。在開發階段遇到會令人煩惱,但在正式環境中遇到,使用者就會看到錯誤。了解不同供應商的速率限制運作方式並建立正確的重試邏輯 (retry logic),是展示專案與正式產品之間的關鍵差異。
各家供應商如何限制你
OpenAI
OpenAI 根據您的帳戶使用歷史和付費等級採用分層速率限制。
| 層級 (Tier) | RPM (每分鐘請求數) | TPM (每分鐘 Token 數) | 達成條件 |
|---|---|---|---|
| Free | 3 | 40,000 | 新帳戶 |
| Tier 1 | 500 | 200,000 | 已支付 $5 |
| Tier 2 | 5,000 | 2,000,000 | 已支付 $50 |
| Tier 3 | 5,000 | 10,000,000 | 已支付 $100 |
| Tier 4 | 10,000 | 50,000,000 | 已支付 $250 |
| Tier 5 | 10,000 | 300,000,000 | 已支付 $1,000 |
限制是針對每個模型的。使用 GPT-4.1 不會消耗您的 GPT-4.1-mini 配額。
Anthropic
Anthropic 使用類似的分層系統,同時具有 RPM 和 TPM 限制。他們還對較低層級實施每日 Token 限制。
Google (Gemini)
Google AI Studio 具有針對每個模型的限制。免費層級在每日請求量上很慷慨,但在每分鐘速率上較嚴格(Gemini 2.5 Flash 免費層級為 15 RPM)。
聚合平台 (Aggregator Platforms)
像 LemonData 和 OpenRouter 這樣的聚合平台在原廠限制之上增加了自己的速率限制層。LemonData 使用基於角色的限制:
| 角色 (Role) | RPM |
|---|---|
| User | 1,000 |
| Partner | 3,000 |
| VIP | 10,000 |
優勢:聚合平台的限制通常高於單個供應商的免費層級,且多通道路由意味著如果一個上游通道被限流,請求會自動路由到另一個通道。
讀取速率限制標頭 (Headers)
所有主要的供應商都會在回應標頭中返回速率限制資訊:
x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500
主動使用這些標頭。不要等到出現 429 錯誤才減速。
建立重試邏輯 (Retry Logic)
錯誤的做法
# 不要這樣做
import time
def call_api(messages):
while True:
try:
return client.chat.completions.create(
model="gpt-4.1",
messages=messages
)
except Exception:
time.sleep(1) # 固定延遲,無退避,捕獲所有錯誤
問題:沒有指數退避 (exponential backoff)、捕獲了不可重試的錯誤、沒有最大重試限制、沒有抖動 (jitter)。
正確的做法
import time
import random
from openai import RateLimitError, APIError, APIConnectionError
def call_with_retry(messages, model="gpt-4.1", max_retries=3):
"""使用指數退避和抖動進行重試。"""
for attempt in range(max_retries + 1):
try:
return client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError as e:
if attempt == max_retries:
raise
# 如果可用,使用回應中的 retry_after
wait = getattr(e, 'retry_after', None)
if wait is None:
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
time.sleep(wait)
except APIConnectionError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait)
except APIError as e:
# 不要重試客戶端錯誤 (400, 401, 403)
if e.status_code and 400 <= e.status_code < 500:
raise
if attempt == max_retries:
raise
time.sleep((2 ** attempt) + random.uniform(0, 1))
核心原則:
- 指數退避:1秒、2秒、4秒、8秒
- 抖動 (Jitter):增加隨機的 0-1 秒以防止驚群效應 (thundering herd)
- 尊重提供的
retry_after標頭 - 不要重試客戶端錯誤(錯誤請求、認證失敗)
- 設置最大重試次數
非同步版本 (Async Version)
import asyncio
import random
from openai import AsyncOpenAI, RateLimitError
async_client = AsyncOpenAI(
api_key="sk-lemon-xxx",
base_url="https://api.lemondata.cc/v1"
)
async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
for attempt in range(max_retries + 1):
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait)
進階:權杖桶 (Token Bucket) 速率限制器
對於高吞吐量的應用程式,請實作客戶端速率限制以避免觸發伺服器限制:
import time
import asyncio
class TokenBucket:
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒 token 數
self.capacity = capacity # 最大突發容量
self.tokens = capacity
self.last_refill = time.monotonic()
async def acquire(self, tokens: int = 1):
while True:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# 等待足夠的 token
wait = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait)
# 每分鐘 500 次請求 = 每秒約 8.3 次
limiter = TokenBucket(rate=8.0, capacity=20)
async def rate_limited_call(messages, model="gpt-4.1"):
await limiter.acquire()
return await async_client.chat.completions.create(
model=model,
messages=messages
)
速率限制時的模型備援 (Fallback)
當您的主要模型被限流時,回退到備選模型:
FALLBACK_CHAIN = [
"claude-sonnet-4-6",
"gpt-4.1",
"gpt-4.1-mini",
]
async def call_with_fallback(messages):
for model in FALLBACK_CHAIN:
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
continue
raise Exception("所有模型均已達到速率限制")
這正是 API 聚合平台大顯身手的地方。一個端點背後有 300 多個模型,您始終有備援可用。
監控速率限制使用情況
追蹤您的速率限制消耗,以便在影響使用者之前發現問題:
import logging
def log_rate_limits(response):
headers = response.headers
remaining = headers.get("x-ratelimit-remaining-requests")
limit = headers.get("x-ratelimit-limit-requests")
if remaining and int(remaining) < int(limit) * 0.1:
logging.warning(
f"速率限制警告:剩餘 {remaining}/{limit} 次請求"
)
當剩餘容量低於 10% 時設置警報。這讓您有時間在使用者看到 429 錯誤之前實施流量控制 (throttling)。
總結
| 策略 | 何時使用 |
|---|---|
| 指數退避 (Exponential backoff) | 始終使用(基礎配置) |
| 客戶端速率限制器 | 高吞吐量應用 (>100 RPM) |
| 模型備援 (Model fallback) | 有 SLA 要求的正式環境應用 |
| 主動監控 | 任何正式環境部署 |
| Batch API | 非即時性的工作負載 |
目標不是完全避免速率限制,而是優雅地處理它們,讓您的使用者永遠不會察覺。
構建具備韌性的 AI 應用程式:lemondata.cc 提供自動處理上游速率限制的多通道路由。一個 API 金鑰,300+ 個模型。
