每个 AI API 都有速率限制 (rate limits)。在开发阶段遇到它们很烦人。在生产环境中遇到它们,用户就会看到错误、不完整的流 (streams) 以及看起来随机的超时,直到你检查出其中的规律。
关键的错误在于将速率限制视为单一问题。它通常是隐藏在同一个 429 错误背后的四个不同问题:
- requests per minute (每分钟请求数)
- tokens per minute (每分钟 token 数)
- concurrent in-flight requests (并发处理中的请求数)
- account-level or project-level quota exhaustion (账户级或项目级配额耗尽)
如果你只针对其中一个问题进行构建,其他问题仍然会困扰你。
如果你仍处于供应商迁移阶段,请先阅读迁移指南。如果你正在评估网关是否有助于处理回退 (fallback) 和运营开销,OpenRouter 对比是最佳的配套阅读材料。
速率限制的真实含义
请求限制 (Request limits)
这是显而易见的一个。你在短时间内发送了太多请求。
Token 限制 (Token limits)
这是团队容易低估的一个。一个长 prompt 消耗的预算可能与许多小请求相当。如果你突然添加了一个 20 KB 的 system prompt,请求计数可能看起来很正常,但 token 预算可能已经耗尽了。
并发限制 (Concurrency limits)
一些供应商和网关对你的每分钟平均值非常满意,直到你同时开启了五十个流。速率计划没问题,但突发流量的形状 (burst shape) 不行。
配额或余额耗尽 (Quota or balance exhaustion)
这在仪表盘中通常表现为“速率限制”症状,因为操作结果是一样的:调用停止成功。但补救措施不同。如果问题是余额为零,退避 (backoff) 是没用的。
供应商通常如何执行限制
具体的数值会随时间变化,这就是为什么在应用程序文档中硬编码公共价格表式的图表会很快过时。稳定的模式如下:
- OpenAI 风格的供应商通常会暴露请求和 token 的 header,并根据账户历史或使用层级调整你的上限。
- Anthropic 风格的供应商通常会强制执行分钟级吞吐量和更广泛的项目限制,特别是在高端模型上。
- Google 风格的供应商通常会将免费层级与付费层级的行为分开,并且限制可能因模型系列而异。
- 聚合器在下游约束之上又增加了一层限制,但作为回报,当一个下游暂时饱和时,它们可以路由到其他通道。
将供应商限制视为动态配置,而非固定常量。
读取速率限制 Header
所有主要供应商都会在响应 header 中返回速率限制信息:
x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500
主动使用这些 header。不要等到出现 429 错误才减速。
你需要的运营习惯很简单:
- 在成功时记录 header,而不只是在失败时。
- 当剩余容量低于阈值时发出告警。
- 在下一个请求越线之前调整流量。
如果你只在失败后才查看 header,那么你已经落后了。
构建重试逻辑
错误的方式
# 不要这样做
import time
def call_api(messages):
while True:
try:
return client.chat.completions.create(
model="gpt-4.1",
messages=messages
)
except Exception:
time.sleep(1) # 固定延迟,没有退避,捕获所有错误
问题:没有指数退避 (exponential backoff),捕获了不可重试的错误,没有最大重试限制,没有抖动 (jitter)。
正确的方式
import time
import random
from openai import RateLimitError, APIError, APIConnectionError
def call_with_retry(messages, model="gpt-4.1", max_retries=3):
"""使用指数退避和抖动进行重试。"""
for attempt in range(max_retries + 1):
try:
return client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError as e:
if attempt == max_retries:
raise
# 如果响应中提供了 retry_after,则使用它
wait = getattr(e, 'retry_after', None)
if wait is None:
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
time.sleep(wait)
except APIConnectionError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait)
except APIError as e:
# 不要重试客户端错误 (400, 401, 403)
if e.status_code and 400 <= e.status_code < 500:
raise
if attempt == max_retries:
raise
time.sleep((2 ** attempt) + random.uniform(0, 1))
关键原则:
- 指数退避 (Exponential backoff):1s, 2s, 4s, 8s
- 抖动 (Jitter):随机增加 0-1s 以防止惊群效应 (thundering herd)
- 如果提供了
retry_afterheader,请遵守它 - 不要重试客户端错误(错误请求、认证失败)
- 设置最大重试次数
另外两条生产环境规则很重要:
- 永远不要在流式 (streaming) 端点上无限重试
- 除非操作是幂等的 (idempotent),否则永远不要重试已经与用户可见的副作用挂钩的请求
Chat completions 通常可以安全重试。工具触发的副作用通常不行。
异步版本
import asyncio
import random
from openai import AsyncOpenAI, RateLimitError
async_client = AsyncOpenAI(
api_key="sk-lemon-xxx",
base_url="https://api.lemondata.cc/v1"
)
async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
for attempt in range(max_retries + 1):
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait)
在流量演变成重试风暴前进行整形
重试逻辑只是解决方案的一半。如果你的上游已经过载,重试可能会将一次突发流量演变成一场自我造成的宕机。
三种控制措施可以起到关键作用:
1. 按租户或用户排队
如果一个客户开始大规模的批处理作业,你不希望每个其他客户都受到波及。
2. 限制并发流
流式端点很容易被低估,因为每个请求在长时间开启时看起来都很“便宜”。
3. 在发送前修剪 prompt
Token 限制通常是真正的天花板。长度增加一倍的 prompt 会使安全吞吐量大约减半。
客户端令牌桶 (Token Bucket)
对于高吞吐量应用,实施客户端速率限制以避免触及服务器限制:
import time
import asyncio
class TokenBucket:
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒 token 数
self.capacity = capacity # 最大突发容量
self.tokens = capacity
self.last_refill = time.monotonic()
async def acquire(self, tokens: int = 1):
while True:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
self.capacity,
self.tokens + elapsed * self.rate
)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return
# 等待足够的 token
wait = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait)
# 每分钟 500 次请求 = 约每秒 8.3 次
limiter = TokenBucket(rate=8.0, capacity=20)
async def rate_limited_call(messages, model="gpt-4.1"):
await limiter.acquire()
return await async_client.chat.completions.create(
model=model,
messages=messages
)
当你知道自己的上限时,令牌桶 (Token buckets) 很有用。当你根据观察到的 header 数据而不是硬编码的猜测来调整它们时,效果会更好。
速率限制时的模型回退 (Fallback)
当你的主模型受到速率限制时,回退到备选方案:
FALLBACK_CHAIN = [
"claude-sonnet-4-6",
"gpt-4.1",
"gpt-4.1-mini",
]
async def call_with_fallback(messages):
for model in FALLBACK_CHAIN:
try:
return await async_client.chat.completions.create(
model=model,
messages=messages
)
except RateLimitError:
continue
raise Exception("All models rate limited")
这就是模型网关发挥作用的地方,但前提是回退必须是经过深思熟虑的。不要在不考虑用户影响的情况下,悄悄地从高级推理模型跳转到微型预算模型。
一个合理的 fallback 链是:
- 同一供应商,较小的同系列模型
- 来自另一供应商的同等模型系列
- 最后才是更便宜或低上下文的模型
如果你将“可用性回退”与“成本回退”混为一谈,调试会很快变得一团糟。
监控速率限制使用情况
跟踪你的速率限制消耗情况,以便在影响用户之前发现问题:
import logging
def log_rate_limits(response):
headers = response.headers
remaining = headers.get("x-ratelimit-remaining-requests")
limit = headers.get("x-ratelimit-limit-requests")
if remaining and int(remaining) < int(limit) * 0.1:
logging.warning(
f"Rate limit warning: {remaining}/{limit} requests remaining"
)
当剩余容量降至 10% 以下时设置告警。这让你有时间在用户看到 429 错误之前实施限流 (throttling)。
你还应该记录:
- 请求 ID (request ID)
- 模型 (model)
- 输入大小估算 (input size estimate)
- 流持续时间 (stream duration)
- 重试次数 (retry count)
- 最终结果 (
success,rate_limited,network_error,quota_exhausted)
如果没有这些字段,速率限制事件就会变成瞎猜。
简单的生产环境检查清单
在称你的聊天机器人或智能体为“速率限制安全”之前,请验证这五项:
- 同步和异步路径都存在有界的重试策略。
- 在成功的响应中记录速率限制 header。
- 在上游调用之前存在按用户或按租户的流量整形。
- 至少存在一个经过验证的 fallback 模型。
- 前端获得清晰的错误状态,而不是挂起的流。
如果你正在构建完整的应用程序而不仅仅是重试原语,单 API Key 聊天机器人指南展示了这些部分如何融入真实的 FastAPI 服务。
总结
速率限制不是边缘情况。它是任何具有真实用途的 AI 产品的正常运行状态。处理得好的团队并没有神奇的更高限制。他们从一开始就将吞吐量、重试和回退视为应用程序设计的一部分。
在 LemonData 创建 API key,在生产流量到来之前测试你的重试路径,并在下一个 429 到来之前做好准备。
| 策略 | 适用场景 |
|---|---|
| 指数退避 (Exponential backoff) | 始终使用(基准) |
| 客户端速率限制器 | 高吞吐量应用 (>100 RPM) |
| 模型回退 (Fallback) | 有 SLA 要求的生产应用 |
| 主动监控 | 任何生产部署 |
| Batch API | 非实时工作负载 |
目标不是完全避免速率限制,而是优雅地处理它们,让你的用户永远察觉不到。
构建韧性 AI 应用:lemondata.cc 提供多通道路由,自动处理上游速率限制。一个 API key,300+ 模型。
