每個 AI API 都有速率限制 (rate limits)。在開發階段遇到很煩人,但在正式環境 (production) 遇到,使用者會看到錯誤、不完整的串流 (streams) 和逾時 (timeouts),在檢查模式之前,這些看起來就像隨機發生的問題。
關鍵的錯誤在於將速率限制視為單一問題。它通常是隱藏在同一個 429 錯誤背後的四種不同問題:
- requests per minute (每分鐘請求數)
- tokens per minute (每分鐘 token 數)
- concurrent in-flight requests (並行處理中的請求數)
- account-level or project-level quota exhaustion (帳戶級別或專案級別的配額耗盡)
如果您只針對其中一個問題進行建構,其他的問題仍然會困擾您。
如果您仍處於供應商遷移階段,請先閱讀遷移指南。如果您正在評估 gateway 是否有助於 fallback 和營運開銷,OpenRouter 比較是最佳的參考閱讀。
速率限制的真實含義
Request 限制
這是最顯而易見的一個。您在短時間內發送了過多請求。
Token 限制
這是團隊最常低估的一個。單個長 prompt 可能會消耗掉與許多小請求相同的預算。如果您突然增加了一個 20 KB 的 system prompt,請求計數可能看起來很正常,但 token 預算可能已經用完。
Concurrency 限制
某些供應商和 gateway 對您的每分鐘平均值非常滿意,直到您同時開啟五十個串流。速率方案沒問題,但爆發式的流量形狀 (burst shape) 有問題。
Quota 或餘額耗盡
這在儀表板中通常表現為「速率限制」症狀,因為營運結果是一樣的:呼叫停止成功。但補救措施不同。如果問題是零餘額,退避 (backoff) 是沒用的。
供應商通常如何執行限制
確切的數字會隨時間變化,這就是為什麼將公開價格表風格的圖表硬編碼到您的應用程式文件中會過時得很快。穩定的模式如下:
- OpenAI 風格的供應商通常會暴露請求和 token 的 headers,並根據帳戶歷史記錄或使用層級 (usage tier) 調整您的上限。
- Anthropic 風格的供應商通常會強制執行分鐘級吞吐量和更廣泛的專案限制,特別是在高端模型上。
- Google 風格的供應商通常將免費層級行為與付費層級行為分開,並且限制可能因模型系列而有劇烈差異。
- Aggregators (聚合器) 在上游限制之上增加了一層限制,但作為回報,當一個上游暫時飽和時,它們可以路由到其他管道。
將供應商限制視為動態配置,而非固定常數。
讀取速率限制 Headers
所有主要供應商都會在響應 headers 中返回速率限制資訊:
x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500
主動使用這些 headers。不要等到出現 429 錯誤才減速。
您想要的營運習慣很簡單:
- 在成功時記錄 headers,而不僅僅是在失敗時。
- 當剩餘容量低於閾值時發出告警。
- 在下一個請求越界之前調整流量。
如果您只在失敗後才查看 headers,那麼您已經落後了。
建構重試邏輯 (Retry Logic)
錯誤的做法
# 不要這樣做
import time
def call_api(messages):
while True:
try:
return client.chat.completions.create(
model="gpt-4.1",
messages=messages
)
except Exception:
time.sleep(1) # 固定延遲,沒有 backoff,捕獲所有錯誤
問題:沒有指數退避 (exponential backoff)、捕獲了不可重試的錯誤、沒有最大重試限制、沒有抖動 (jitter)。
正確的做法
import time
import random
from openai import RateLimitError, APIError, APIConnectionError
def call_with_retry(messages, model="gpt-4.1", max_retries=3):
"""使用指數退避和抖動進行重試。"""
for attempt in range(max_retries + 1):
try:
return client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError as e:
if attempt == max_retries:
raise
# 如果可用,使用響應中的 retry_after
wait = getattr(e, 'retry_after', None)
if wait is None:
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
time.sleep(wait)
except APIConnectionError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait)
except APIError as e:
# 不要重試客戶端錯誤 (400, 401, 403)
if e.status_code and 400 <= e.status_code < 500:
raise
if attempt == max_retries:
raise
time.sleep((2 ** attempt) + random.uniform(0, 1))
關鍵原則:
- 指數退避 (Exponential backoff):1s, 2s, 4s, 8s
- 抖動 (Jitter):隨機增加 0-1s 以防止驚群效應 (thundering herd)
- 尊重
retry_afterheader (如果提供) - 不要重試客戶端錯誤 (bad request, auth failure)
- 設置最大重試次數
兩條額外的正式環境規則很重要:
- 永遠不要在 streaming 端點上無限重試
- 除非操作是等冪的 (idempotent),否則永遠不要重試已經與使用者可見的副作用相關聯的請求
Chat completions 通常可以安全重試。由 tool 觸發的副作用通常則不行。
Async 版本
import asyncio
import random
from openai import AsyncOpenAI, RateLimitError
async_client = AsyncOpenAI(
api_key="sk-lemon-xxx",
base_url="https://api.lemondata.cc/v1"
)
async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
for attempt in range(max_retries + 1):
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait)
在演變成重試風暴前調整流量
重試邏輯只是解決方案的一半。如果您的上游已經過載,重試可能會將一次爆發變成一場自我造成的停機。
三個控制措施可以帶來改變:
1. 按租戶 (tenant) 或使用者排隊
如果一個客戶開始大規模的批次作業,您不希望每個其他客戶都受到波及。
2. 限制並行串流 (concurrent streams)
Streaming 端點很容易被低估,因為每個請求在保持開啟的很長時間內看起來都很「便宜」。
3. 在發送前修剪 prompt
Token 限制通常是真正的上限。長度增加一倍的 prompt 大約會使安全吞吐量減半。
客戶端 Token Bucket
對於高吞吐量的應用程式,請實作客戶端速率限制以避免觸及伺服器限制:
import time
import asyncio
class TokenBucket:
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒 token 數
self.capacity = capacity # 最大爆發大小
self.tokens = capacity
self.last_refill = time.monotonic()
async def acquire(self, tokens: int = 1):
while True:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# 等待足夠的 token
wait = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait)
# 每分鐘 500 個請求 = 每秒約 8.3 個
limiter = TokenBucket(rate=8.0, capacity=20)
async def rate_limited_call(messages, model="gpt-4.1"):
await limiter.acquire()
return await async_client.chat.completions.create(
model=model,
messages=messages
)
當您知道上限時,Token buckets 很有用。當您根據觀察到的 header 數據而不是硬編碼的猜測來調整它們時,效果會更好。
速率限制時的模型 Fallback
當您的主要模型受到速率限制時,請切換到備選方案:
FALLBACK_CHAIN = [
"claude-sonnet-4-6",
"gpt-4.1",
"gpt-4.1-mini",
]
async def call_with_fallback(messages):
for model in FALLBACK_CHAIN:
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
continue
raise Exception("All models rate limited")
這就是模型 gateway 提供幫助的地方,但前提是 fallback 是經過深思熟慮的。不要在不考慮使用者影響的情況下,默默地從高級推理模型跳轉到微型預算模型。
一個合理的 fallback 鏈是:
- 同一供應商的較小兄弟模型
- 來自另一供應商的等效模型系列
- 最後才是更便宜或低上下文 (low-context) 的模型
如果您在一個步驟中混合了「可用性 fallback」和「成本 fallback」,除錯會很快變得混亂。
監控速率限制使用情況
追蹤您的速率限制消耗,以便在影響使用者之前發現問題:
import logging
def log_rate_limits(response):
headers = response.headers
remaining = headers.get("x-ratelimit-remaining-requests")
limit = headers.get("x-ratelimit-limit-requests")
if remaining and int(remaining) < int(limit) * 0.1:
logging.warning(
f"Rate limit warning: {remaining}/{limit} requests remaining"
)
當剩餘容量降至 10% 以下時設置告警。這讓您有時間在使用者看到 429 錯誤之前實作流量調節 (throttling)。
您還應該記錄:
- request ID
- model
- 輸入大小估計
- 串流持續時間
- 重試次數
- 最終結果 (
success,rate_limited,network_error,quota_exhausted)
沒有這些欄位,速率限制事件就會變成瞎猜。
簡單的正式環境檢查清單
在稱您的聊天機器人或代理為「速率限制安全」之前,請驗證這五項:
- 針對同步和非同步路徑都存在有界的重試策略。
- 您在成功的響應中記錄了速率限制 headers。
- 在上游呼叫之前存在針對每個使用者或每個租戶的流量調整。
- 至少存在一個經過驗證的 fallback 模型。
- 前端獲得清晰的錯誤狀態,而不是掛起的串流。
如果您正在建構完整的應用程式而不僅僅是重試原語,單金鑰聊天機器人指南展示了這些組件如何整合到真實的 FastAPI 服務中。
總結
速率限制不是極端情況。它是任何具有實際使用量的 AI 產品的正常運行狀況。處理得好的團隊並沒有神奇的更高限制。他們從一開始就將吞吐量、重試和 fallback 視為應用程式設計的一部分。
在 LemonData 建立 API key,在正式流量進入前測試您的重試路徑,並在下一個 429 到來之前做好準備。
| 策略 | 何時使用 |
|---|---|
| 指數退避 (Exponential backoff) | 始終使用 (基準) |
| 客戶端速率限制器 | 高吞吐量應用程式 (>100 RPM) |
| 模型 fallback | 有 SLA 要求的正式環境應用程式 |
| 主動監控 | 任何正式環境部署 |
| Batch API | 非即時工作負載 |
目標不是完全避免速率限制,而是優雅地處理它們,讓您的使用者永遠不會察覺。
建構具備韌性的 AI 應用程式:lemondata.cc 提供多管道路由,自動處理上游速率限制。一個 API key,支援 300 多個模型。
