設定

言語

AI APIのRate Limiting:その仕組みと対処法

L
LemonData
·2026年2月26日·652 回表示
AI APIのRate Limiting:その仕組みと対処法

すべてのAI APIにはレート制限があります。開発中にこれに遭遇するのは面倒なだけですが、本番環境で発生すると、ユーザーにはエラーや不完全なストリーム、タイムアウトが表示され、パターンを調査するまではランダムな不具合のように見えてしまいます。

よくある間違いは、レート制限を単一の問題として扱うことです。通常、同じ429エラーの背後には、4つの異なる問題が隠れています。

  • 分あたりのリクエスト数 (requests per minute)
  • 分あたりのトークン数 (tokens per minute)
  • 同時実行リクエスト数 (concurrent in-flight requests)
  • アカウントレベルまたはプロジェクトレベルのクォータ(割当)不足

これらの一つだけに備えて構築しても、他の問題によってシステムは停止してしまいます。

プロバイダーの移行段階にある場合は、まず移行ガイドを読んでください。ゲートウェイがフォールバックや運用オーバーヘッドの削減に役立つかどうかを検討している場合は、OpenRouterの比較が最適な参考資料になります。

レート制限が実際に意味するもの

リクエスト制限

これは明白なものです。短期間にあまりにも多くのリクエストを送信した場合に発生します。

トークン制限

これは多くのチームが過小評価しているものです。たった一つの長いプロンプトが、多くの小さなリクエストと同じくらいの予算を消費することがあります。突然20 KBのシステムプロンプトを追加すると、リクエスト数は正常に見えても、トークン予算がすでに使い果たされているという状況が起こり得ます。

同時実行制限

一部のプロバイダーやゲートウェイは、分あたりの平均値には寛容ですが、一度に50個のストリームを開こうとすると制限をかけます。料金プランには問題がなくても、バーストの形状が許容されないのです。

クォータまたは残高不足

これはダッシュボード上で「レート制限」の症状として現れることがよくあります。運用上の結果(呼び出しが成功しなくなる)が同じだからです。しかし、対処法は異なります。問題が残高ゼロである場合、バックオフ(待機)は無意味です。

プロバイダーによる一般的な制限の適用方法

正確な数値は時間の経過とともに変化するため、公開されている料金表のようなチャートをアプリケーションのドキュメントにハードコーディングすると、すぐに内容が古くなってしまいます。安定したパターンは以下の通りです:

  • OpenAIスタイルのプロバイダーは通常、リクエストとトークンのヘッダーを公開し、アカウントの履歴や使用ティアに基づいて上限を調整します。
  • Anthropicスタイルのプロバイダーは通常、分単位のスループットと、より広範なプロジェクト制限の両方を適用します(特にハイエンドモデルにおいて)。
  • Googleスタイルのプロバイダーは、無料枠と有料枠の挙動を分けていることが多く、モデルファミリーによって制限が大きく異なる場合があります。
  • アグリゲーターは、アップストリームの制約の上にさらにもう一つの制限レイヤーを追加しますが、その代わりに、一つのアップストリームが一時的に飽和したときに他のチャネルへルーティングすることができます。

プロバイダーの制限は定数ではなく、動的な設定として扱うべきです。

レート制限ヘッダーの読み取り

主要なプロバイダーはすべて、レスポンスヘッダーでレート制限情報を返します:

x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500

これらのヘッダーをプロアクティブに使用してください。429エラーが発生してから速度を落とすのでは遅すぎます。

推奨される運用習慣はシンプルです:

  1. 失敗時だけでなく、成功時にもヘッダーをログに記録する。
  2. 残りの容量が閾値を下回ったときにアラートを出す。
  3. 次のリクエストが制限を超える前に、トラフィックを調整(シェイピング)する。

失敗した後にしかヘッダーを確認しないのであれば、すでに後手に回っています。

リトライロジックの構築

間違った方法

# これは避けてください
import time

def call_api(messages):
    while True:
        try:
            return client.chat.completions.create(
                model="gpt-4.1",
                messages=messages
            )
        except Exception:
            time.sleep(1)  # 固定遅延、バックオフなし、すべての例外をキャッチ

問題点:指数バックオフがない、リトライ不可能なエラーまでキャッチしてしまう、最大リトライ制限がない、ジッター(ゆらぎ)がない。

正しい方法

import time
import random
from openai import RateLimitError, APIError, APIConnectionError

def call_with_retry(messages, model="gpt-4.1", max_retries=3):
    """指数バックオフとジッターを伴うリトライ。"""
    for attempt in range(max_retries + 1):
        try:
            return client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries:
                raise
            # レスポンスに retry_after があればそれを使用
            wait = getattr(e, 'retry_after', None)
            if wait is None:
                wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"レート制限に達しました。{wait:.1f}秒待機します (試行 {attempt + 1})")
            time.sleep(wait)
        except APIConnectionError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
        except APIError as e:
            # クライアントエラー (400, 401, 403) はリトライしない
            if e.status_code and 400 <= e.status_code < 500:
                raise
            if attempt == max_retries:
                raise
            time.sleep((2 ** attempt) + random.uniform(0, 1))

主要な原則:

  • 指数バックオフ:1秒、2秒、4秒、8秒と増やす
  • ジッター:サンダリングハード現象を防ぐために0〜1秒のランダムな時間を追加する
  • 提供されている場合は retry_after ヘッダーを尊重する
  • クライアントエラー(不正なリクエスト、認証失敗)はリトライしない
  • 最大リトライ回数を設定する

本番環境における2つの追加ルール:

  • ストリーミングエンドポイントで無限にリトライしない
  • 操作が冪等(idempotent)でない限り、すでにユーザーに見える副作用が発生しているリクエストをリトライしない

チャットの補完(Chat completions)は通常リトライしても安全です。しかし、ツールによってトリガーされる副作用はそうでないことが多いです。

Async(非同期)版

import asyncio
import random
from openai import AsyncOpenAI, RateLimitError

async_client = AsyncOpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)

リトライの嵐になる前にトラフィックを調整する

リトライロジックは解決策の半分に過ぎません。アップストリームがすでに過負荷状態にある場合、リトライによって一時的なバーストが自業自得のシステム停止に変わってしまうことがあります。

以下の3つの制御が違いを生みます:

1. テナントまたはユーザーごとのキューイング

一人の顧客が大規模なバッチジョブを開始したとしても、他のすべての顧客がその影響(爆発半径)を受けるべきではありません。

2. 同時実行ストリームの制限

ストリーミングエンドポイントは過小評価されがちです。各リクエストは「安価」に見えますが、長時間開いたままになるからです。

3. 送信前にプロンプトをトリミングする

トークン制限が本当の上限であることがよくあります。プロンプトの長さが2倍になると、安全なスループットはおよそ半分になります。

クライアントサイドのトークンバケット

高スループットのアプリケーションでは、サーバーの制限に達しないようにクライアントサイドでレート制限を実装します:

import time
import asyncio

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # 秒あたりのトークン数
        self.capacity = capacity  # 最大バーストサイズ
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: int = 1):
        while True:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            # トークンが貯まるまで待機
            wait = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait)

# 500リクエスト/分 = 約8.3リクエスト/秒
limiter = TokenBucket(rate=8.0, capacity=20)

async def rate_limited_call(messages, model="gpt-4.1"):
    await limiter.acquire()
    return await async_client.chat.completions.create(
        model=model,
        messages=messages
    )

トークンバケットは、上限がわかっている場合に有効です。さらに、ハードコーディングされた推測値ではなく、観測されたヘッダーデータから調整するとより効果的です。

レート制限時のモデルフォールバック

プライマリモデルがレート制限に達した場合は、代替モデルにフォールバックします:

FALLBACK_CHAIN = [
    "claude-sonnet-4-6",
    "gpt-4.1",
    "gpt-4.1-mini",
]

async def call_with_fallback(messages):
    for model in FALLBACK_CHAIN:
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            continue
    raise Exception("すべてのモデルでレート制限に達しました")

ここでモデルゲートウェイが役立ちますが、フォールバックは意図的である必要があります。ユーザーへの影響を考えずに、プレミアムな推論モデルから安価な小型モデルへ黙って切り替えるようなことはしないでください。

合理的なフォールバックチェーンは以下の通りです:

  • 同じプロバイダーの、より小規模な兄弟モデル
  • 別のプロバイダーの、同等のモデルファミリー
  • その後に初めて、より安価またはコンテキストの短いモデル

「可用性のためのフォールバック」と「コストのためのフォールバック」を一つのステップで混ぜてしまうと、デバッグが非常に困難になります。

レート制限の使用状況の監視

ユーザーに影響が出る前に問題を察知できるよう、レート制限の消費状況を追跡してください:

import logging

def log_rate_limits(response):
    headers = response.headers
    remaining = headers.get("x-ratelimit-remaining-requests")
    limit = headers.get("x-ratelimit-limit-requests")
    if remaining and int(remaining) < int(limit) * 0.1:
        logging.warning(
            f"レート制限警告: 残り {remaining}/{limit} リクエスト"
        )

残り容量が10%を下回ったときにアラートを設定します。これにより、ユーザーが429エラーに遭遇する前にスロットリングを実装する余裕が生まれます。

また、以下の項目もログに記録すべきです:

  • リクエストID
  • モデル
  • 入力サイズの推定値
  • ストリームの持続時間
  • リトライ回数
  • 最終結果 (success, rate_limited, network_error, quota_exhausted)

これらのフィールドがなければ、レート制限のインシデント調査は単なる推測になってしまいます。

シンプルな本番環境チェックリスト

チャットボットやエージェントを「レート制限に対して安全」と呼ぶ前に、以下の5項目を確認してください:

  1. 同期・非同期の両方のパスに、制限付きのリトライポリシーが存在する。
  2. 成功したレスポンスのレート制限ヘッダーをログに記録している。
  3. アップストリーム呼び出しの前に、ユーザーごとまたはテナントごとの調整(シェイピング)が存在する。
  4. 検証済みのフォールバックモデルが少なくとも一つ存在する。
  5. フロントエンドが、ハングしたストリームではなく、クリーンなエラー状態を受け取れるようになっている。

リトライの基本部分だけでなく、アプリケーション全体を構築している場合は、1つのキーで構築するチャットボットガイドで、これらの要素が実際のFastAPIサービスにどのように組み込まれるかを確認できます。

まとめ

レート制限は特殊なケースではありません。実際に利用されているAI製品にとって、それは通常の運用条件です。うまく対処しているチームは、魔法のような高い制限を持っているわけではありません。彼らはスループット、リトライ、フォールバックを最初からアプリケーション設計の一部として扱っているのです。

LemonDataでAPIキーを作成し、本番トラフィックが来る前にリトライパスをテストし、次の429エラーが発生する前に備えを固めましょう。

戦略 使用すべき場面
指数バックオフ 常に(基本)
クライアントサイドのレートリミッター 高スループットアプリ (>100 RPM)
モデルフォールバック SLA要件のある本番アプリ
プロアクティブな監視 あらゆる本番デプロイメント
Batch API リアルタイム性を求めないワークロード

目標はレート制限を完全に避けることではありません。ユーザーが気づかないように、優雅に処理することです。


レジリエントなAIアプリケーションを構築しましょう:lemondata.ccは、アップストリームのレート制限を自動的に処理するマルチチャネルルーティングを提供します。一つのAPIキーで300以上のモデルを利用可能です。

Share: