設定

語言

AI API 速率限制:運作原理與應對方法

L
LemonData
·2026年2月26日·25 次瀏覽
#速率限制#生產環境#錯誤處理#教學#最佳實踐
AI API 速率限制:運作原理與應對方法

AI API 速率限制 (Rate Limiting):運作原理與處理方法

每個 AI API 都有速率限制。在開發階段遇到會令人煩惱,但在正式環境中遇到,使用者就會看到錯誤。了解不同供應商的速率限制運作方式並建立正確的重試邏輯 (retry logic),是展示專案與正式產品之間的關鍵差異。

各家供應商如何限制你

OpenAI

OpenAI 根據您的帳戶使用歷史和付費等級採用分層速率限制。

層級 (Tier) RPM (每分鐘請求數) TPM (每分鐘 Token 數) 達成條件
Free 3 40,000 新帳戶
Tier 1 500 200,000 已支付 $5
Tier 2 5,000 2,000,000 已支付 $50
Tier 3 5,000 10,000,000 已支付 $100
Tier 4 10,000 50,000,000 已支付 $250
Tier 5 10,000 300,000,000 已支付 $1,000

限制是針對每個模型的。使用 GPT-4.1 不會消耗您的 GPT-4.1-mini 配額。

Anthropic

Anthropic 使用類似的分層系統,同時具有 RPM 和 TPM 限制。他們還對較低層級實施每日 Token 限制。

Google (Gemini)

Google AI Studio 具有針對每個模型的限制。免費層級在每日請求量上很慷慨,但在每分鐘速率上較嚴格(Gemini 2.5 Flash 免費層級為 15 RPM)。

聚合平台 (Aggregator Platforms)

像 LemonData 和 OpenRouter 這樣的聚合平台在原廠限制之上增加了自己的速率限制層。LemonData 使用基於角色的限制:

角色 (Role) RPM
User 1,000
Partner 3,000
VIP 10,000

優勢:聚合平台的限制通常高於單個供應商的免費層級,且多通道路由意味著如果一個上游通道被限流,請求會自動路由到另一個通道。

讀取速率限制標頭 (Headers)

所有主要的供應商都會在回應標頭中返回速率限制資訊:

x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500

主動使用這些標頭。不要等到出現 429 錯誤才減速。

建立重試邏輯 (Retry Logic)

錯誤的做法

# 不要這樣做
import time

def call_api(messages):
    while True:
        try:
            return client.chat.completions.create(
                model="gpt-4.1",
                messages=messages
            )
        except Exception:
            time.sleep(1)  # 固定延遲,無退避,捕獲所有錯誤

問題:沒有指數退避 (exponential backoff)、捕獲了不可重試的錯誤、沒有最大重試限制、沒有抖動 (jitter)。

正確的做法

import time
import random
from openai import RateLimitError, APIError, APIConnectionError

def call_with_retry(messages, model="gpt-4.1", max_retries=3):
    """使用指數退避和抖動進行重試。"""
    for attempt in range(max_retries + 1):
        try:
            return client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries:
                raise
            # 如果可用,使用回應中的 retry_after
            wait = getattr(e, 'retry_after', None)
            if wait is None:
                wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
            time.sleep(wait)
        except APIConnectionError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
        except APIError as e:
            # 不要重試客戶端錯誤 (400, 401, 403)
            if e.status_code and 400 <= e.status_code < 500:
                raise
            if attempt == max_retries:
                raise
            time.sleep((2 ** attempt) + random.uniform(0, 1))

核心原則:

  • 指數退避:1秒、2秒、4秒、8秒
  • 抖動 (Jitter):增加隨機的 0-1 秒以防止驚群效應 (thundering herd)
  • 尊重提供的 retry_after 標頭
  • 不要重試客戶端錯誤(錯誤請求、認證失敗)
  • 設置最大重試次數

非同步版本 (Async Version)

import asyncio
import random
from openai import AsyncOpenAI, RateLimitError

async_client = AsyncOpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)

進階:權杖桶 (Token Bucket) 速率限制器

對於高吞吐量的應用程式,請實作客戶端速率限制以避免觸發伺服器限制:

import time
import asyncio

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # 每秒 token 數
        self.capacity = capacity  # 最大突發容量
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: int = 1):
        while True:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            # 等待足夠的 token
            wait = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait)

# 每分鐘 500 次請求 = 每秒約 8.3 次
limiter = TokenBucket(rate=8.0, capacity=20)

async def rate_limited_call(messages, model="gpt-4.1"):
    await limiter.acquire()
    return await async_client.chat.completions.create(
        model=model,
        messages=messages
    )

速率限制時的模型備援 (Fallback)

當您的主要模型被限流時,回退到備選模型:

FALLBACK_CHAIN = [
    "claude-sonnet-4-6",
    "gpt-4.1",
    "gpt-4.1-mini",
]

async def call_with_fallback(messages):
    for model in FALLBACK_CHAIN:
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            continue
    raise Exception("所有模型均已達到速率限制")

這正是 API 聚合平台大顯身手的地方。一個端點背後有 300 多個模型,您始終有備援可用。

監控速率限制使用情況

追蹤您的速率限制消耗,以便在影響使用者之前發現問題:

import logging

def log_rate_limits(response):
    headers = response.headers
    remaining = headers.get("x-ratelimit-remaining-requests")
    limit = headers.get("x-ratelimit-limit-requests")
    if remaining and int(remaining) < int(limit) * 0.1:
        logging.warning(
            f"速率限制警告:剩餘 {remaining}/{limit} 次請求"
        )

當剩餘容量低於 10% 時設置警報。這讓您有時間在使用者看到 429 錯誤之前實施流量控制 (throttling)。

總結

策略 何時使用
指數退避 (Exponential backoff) 始終使用(基礎配置)
客戶端速率限制器 高吞吐量應用 (>100 RPM)
模型備援 (Model fallback) 有 SLA 要求的正式環境應用
主動監控 任何正式環境部署
Batch API 非即時性的工作負載

目標不是完全避免速率限制,而是優雅地處理它們,讓您的使用者永遠不會察覺。


構建具備韌性的 AI 應用程式:lemondata.cc 提供自動處理上游速率限制的多通道路由。一個 API 金鑰,300+ 個模型。

Share: