设置

语言

AI API 速率限制:工作原理及应对方法

L
LemonData
·2026年2月26日·21 次浏览
#限流#生产环境#错误处理#教程#最佳实践
AI API 速率限制:工作原理及应对方法

AI API 速率限制:工作原理及应对方法

每个 AI API 都有速率限制。在开发阶段触及限制会很烦人,在生产环境触及限制则会让用户看到错误。理解各供应商的速率限制机制并构建合适的重试逻辑,是区分演示和生产应用的关键。

各供应商的限制方式

OpenAI

OpenAI 根据账户的使用历史和付费等级,采用分层速率限制。

等级 RPM(请求数/分钟) TPM(令牌数/分钟) 获取方式
免费 3 40,000 新账户
一级 500 200,000 支付 $5
二级 5,000 2,000,000 支付 $50
三级 5,000 10,000,000 支付 $100
四级 10,000 50,000,000 支付 $250
五级 10,000 300,000,000 支付 $1,000

限制是按模型计算的。使用 GPT-4.1 不会消耗 GPT-4.1-mini 的配额。

Anthropic

Anthropic 采用类似的分层系统,限制请求数和令牌数。同时,低等级账户还会有每日令牌限制。

Google(Gemini)

Google AI Studio 对每个模型有独立限制。免费层对每日请求数较宽松,但对每分钟请求数限制较紧(Gemini 2.5 Flash 免费层为 15 RPM)。

聚合平台

像 LemonData 和 OpenRouter 这样的聚合平台,在上游限制基础上增加了自己的速率限制层。LemonData 使用基于角色的限制:

角色 RPM
用户 1,000
合作伙伴 3,000
VIP 10,000

优势在于:聚合平台的限制通常高于单个供应商的免费层,且多渠道路由意味着如果一个上游通道被限流,请求会自动切换到另一个通道。

读取速率限制响应头

所有主要供应商都会在响应头中返回速率限制信息:

x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500

要主动使用这些响应头,不要等到出现 429 错误才减速。

构建重试逻辑

错误示范

# 不要这样做
import time

def call_api(messages):
    while True:
        try:
            return client.chat.completions.create(
                model="gpt-4.1",
                messages=messages
            )
        except Exception:
            time.sleep(1)  # 固定延迟,无退避,捕获所有异常

问题:没有指数退避,捕获了不可重试的错误,没有最大重试次数限制,无抖动。

正确示范

import time
import random
from openai import RateLimitError, APIError, APIConnectionError

def call_with_retry(messages, model="gpt-4.1", max_retries=3):
    """使用指数退避和抖动进行重试。"""
    for attempt in range(max_retries + 1):
        try:
            return client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries:
                raise
            # 如果响应中有 retry_after,则使用它
            wait = getattr(e, 'retry_after', None)
            if wait is None:
                wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"速率限制,等待 {wait:.1f} 秒(第 {attempt + 1} 次尝试)")
            time.sleep(wait)
        except APIConnectionError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
        except APIError as e:
            # 不重试客户端错误(400,401,403)
            if e.status_code and 400 <= e.status_code < 500:
                raise
            if attempt == max_retries:
                raise
            time.sleep((2 ** attempt) + random.uniform(0, 1))

关键原则:

  • 指数退避:1秒,2秒,4秒,8秒
  • 抖动:随机增加 0-1 秒,防止“群体效应”
  • 优先使用 retry_after 头信息
  • 不重试客户端错误(请求错误、认证失败)
  • 设置最大重试次数

异步版本

import asyncio
import random
from openai import AsyncOpenAI, RateLimitError

async_client = AsyncOpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)

高级:令牌桶速率限制器

对于高吞吐量应用,建议实现客户端速率限制,避免触及服务器限制:

import time
import asyncio

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # 每秒令牌数
        self.capacity = capacity  # 最大突发容量
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: int = 1):
        while True:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            # 等待足够的令牌
            wait = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait)

# 500 请求/分钟 ≈ 每秒 8.3 次
limiter = TokenBucket(rate=8.0, capacity=20)

async def rate_limited_call(messages, model="gpt-4.1"):
    await limiter.acquire()
    return await async_client.chat.completions.create(
        model=model,
        messages=messages
    )

速率限制时模型降级

当主模型被限流时,降级调用备用模型:

FALLBACK_CHAIN = [
    "claude-sonnet-4-6",
    "gpt-4.1",
    "gpt-4.1-mini",
]

async def call_with_fallback(messages):
    for model in FALLBACK_CHAIN:
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            continue
    raise Exception("所有模型均被限流")

这正是 API 聚合器的优势所在。一个接口背后有 300+ 模型,始终有备用方案可用。

监控速率限制使用情况

跟踪速率限制的使用情况,提前发现问题,避免影响用户:

import logging

def log_rate_limits(response):
    headers = response.headers
    remaining = headers.get("x-ratelimit-remaining-requests")
    limit = headers.get("x-ratelimit-limit-requests")
    if remaining and int(remaining) < int(limit) * 0.1:
        logging.warning(
            f"速率限制警告:剩余请求数 {remaining}/{limit}"
        )

当剩余容量低于 10% 时设置告警,这样你有时间实施限流,避免用户看到 429 错误。

总结

策略 适用场景
指数退避 所有场景(基础策略)
客户端速率限制器 高吞吐量应用(>100 RPM)
模型降级 有 SLA 要求的生产应用
主动监控 任何生产部署
批量 API 非实时工作负载

目标不是完全避免速率限制,而是优雅地应对,让用户感受不到影响。


构建高可用 AI 应用:lemondata.cc 提供多渠道路由,自动处理上游速率限制。一个 API 密钥,300+ 模型。

分享: