AI API 速率限制:工作原理及应对方法
每个 AI API 都有速率限制。在开发阶段触及限制会很烦人,在生产环境触及限制则会让用户看到错误。理解各供应商的速率限制机制并构建合适的重试逻辑,是区分演示和生产应用的关键。
各供应商的限制方式
OpenAI
OpenAI 根据账户的使用历史和付费等级,采用分层速率限制。
| 等级 | RPM(请求数/分钟) | TPM(令牌数/分钟) | 获取方式 |
|---|---|---|---|
| 免费 | 3 | 40,000 | 新账户 |
| 一级 | 500 | 200,000 | 支付 $5 |
| 二级 | 5,000 | 2,000,000 | 支付 $50 |
| 三级 | 5,000 | 10,000,000 | 支付 $100 |
| 四级 | 10,000 | 50,000,000 | 支付 $250 |
| 五级 | 10,000 | 300,000,000 | 支付 $1,000 |
限制是按模型计算的。使用 GPT-4.1 不会消耗 GPT-4.1-mini 的配额。
Anthropic
Anthropic 采用类似的分层系统,限制请求数和令牌数。同时,低等级账户还会有每日令牌限制。
Google(Gemini)
Google AI Studio 对每个模型有独立限制。免费层对每日请求数较宽松,但对每分钟请求数限制较紧(Gemini 2.5 Flash 免费层为 15 RPM)。
聚合平台
像 LemonData 和 OpenRouter 这样的聚合平台,在上游限制基础上增加了自己的速率限制层。LemonData 使用基于角色的限制:
| 角色 | RPM |
|---|---|
| 用户 | 1,000 |
| 合作伙伴 | 3,000 |
| VIP | 10,000 |
优势在于:聚合平台的限制通常高于单个供应商的免费层,且多渠道路由意味着如果一个上游通道被限流,请求会自动切换到另一个通道。
读取速率限制响应头
所有主要供应商都会在响应头中返回速率限制信息:
x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500
要主动使用这些响应头,不要等到出现 429 错误才减速。
构建重试逻辑
错误示范
# 不要这样做
import time
def call_api(messages):
while True:
try:
return client.chat.completions.create(
model="gpt-4.1",
messages=messages
)
except Exception:
time.sleep(1) # 固定延迟,无退避,捕获所有异常
问题:没有指数退避,捕获了不可重试的错误,没有最大重试次数限制,无抖动。
正确示范
import time
import random
from openai import RateLimitError, APIError, APIConnectionError
def call_with_retry(messages, model="gpt-4.1", max_retries=3):
"""使用指数退避和抖动进行重试。"""
for attempt in range(max_retries + 1):
try:
return client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError as e:
if attempt == max_retries:
raise
# 如果响应中有 retry_after,则使用它
wait = getattr(e, 'retry_after', None)
if wait is None:
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"速率限制,等待 {wait:.1f} 秒(第 {attempt + 1} 次尝试)")
time.sleep(wait)
except APIConnectionError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait)
except APIError as e:
# 不重试客户端错误(400,401,403)
if e.status_code and 400 <= e.status_code < 500:
raise
if attempt == max_retries:
raise
time.sleep((2 ** attempt) + random.uniform(0, 1))
关键原则:
- 指数退避:1秒,2秒,4秒,8秒
- 抖动:随机增加 0-1 秒,防止“群体效应”
- 优先使用
retry_after头信息 - 不重试客户端错误(请求错误、认证失败)
- 设置最大重试次数
异步版本
import asyncio
import random
from openai import AsyncOpenAI, RateLimitError
async_client = AsyncOpenAI(
api_key="sk-lemon-xxx",
base_url="https://api.lemondata.cc/v1"
)
async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
for attempt in range(max_retries + 1):
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait)
高级:令牌桶速率限制器
对于高吞吐量应用,建议实现客户端速率限制,避免触及服务器限制:
import time
import asyncio
class TokenBucket:
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒令牌数
self.capacity = capacity # 最大突发容量
self.tokens = capacity
self.last_refill = time.monotonic()
async def acquire(self, tokens: int = 1):
while True:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# 等待足够的令牌
wait = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait)
# 500 请求/分钟 ≈ 每秒 8.3 次
limiter = TokenBucket(rate=8.0, capacity=20)
async def rate_limited_call(messages, model="gpt-4.1"):
await limiter.acquire()
return await async_client.chat.completions.create(
model=model,
messages=messages
)
速率限制时模型降级
当主模型被限流时,降级调用备用模型:
FALLBACK_CHAIN = [
"claude-sonnet-4-6",
"gpt-4.1",
"gpt-4.1-mini",
]
async def call_with_fallback(messages):
for model in FALLBACK_CHAIN:
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
continue
raise Exception("所有模型均被限流")
这正是 API 聚合器的优势所在。一个接口背后有 300+ 模型,始终有备用方案可用。
监控速率限制使用情况
跟踪速率限制的使用情况,提前发现问题,避免影响用户:
import logging
def log_rate_limits(response):
headers = response.headers
remaining = headers.get("x-ratelimit-remaining-requests")
limit = headers.get("x-ratelimit-limit-requests")
if remaining and int(remaining) < int(limit) * 0.1:
logging.warning(
f"速率限制警告:剩余请求数 {remaining}/{limit}"
)
当剩余容量低于 10% 时设置告警,这样你有时间实施限流,避免用户看到 429 错误。
总结
| 策略 | 适用场景 |
|---|---|
| 指数退避 | 所有场景(基础策略) |
| 客户端速率限制器 | 高吞吐量应用(>100 RPM) |
| 模型降级 | 有 SLA 要求的生产应用 |
| 主动监控 | 任何生产部署 |
| 批量 API | 非实时工作负载 |
目标不是完全避免速率限制,而是优雅地应对,让用户感受不到影响。
构建高可用 AI 应用:lemondata.cc 提供多渠道路由,自动处理上游速率限制。一个 API 密钥,300+ 模型。
