الإعدادات

اللغة

قيود معدل طلبات API الذكاء الاصطناعي: آلية العمل وكيفية التعامل معها

L
LemonData
·٢٦ فبراير ٢٠٢٦·649 مشاهدة
قيود معدل طلبات API الذكاء الاصطناعي: آلية العمل وكيفية التعامل معها

كل AI API لها حدود للمعدل (rate limits). مواجهتها أثناء التطوير أمر مزعج، أما مواجهتها في بيئة الإنتاج فتعني ظهور أخطاء للمستخدمين، وتدفقات جزئية (partial streams)، وانقطاعات تبدو عشوائية حتى تفحص النمط.

الخطأ الأساسي هو التعامل مع تحديد المعدل (rate limiting) كمشكلة واحدة. في الغالب، هي أربع مشكلات مختلفة تختبئ خلف نفس الخطأ 429:

  • الطلبات في الدقيقة (requests per minute)
  • الـ tokens في الدقيقة (tokens per minute)
  • الطلبات المتزامنة قيد التنفيذ (concurrent in-flight requests)
  • استنفاد الحصة على مستوى الحساب أو المشروع (account-level or project-level quota exhaustion)

إذا قمت بالبناء لمعالجة واحدة فقط من هذه المشكلات، فستظل المشكلات الأخرى تؤثر عليك.

إذا كنت لا تزال في مرحلة الانتقال بين المزودين، فاقرأ دليل الانتقال أولاً. أما إذا كنت تقيم ما إذا كانت البوابة (gateway) تساعد في التبديل التلقائي (fallback) وتقليل الأعباء التشغيلية، فإن مقارنة OpenRouter هي أفضل مرجع مصاحب.

ماذا تعني حدود المعدل (Rate Limits) فعلياً

حدود الطلبات (Request limits)

هذا هو النوع الواضح؛ حيث قمت بإرسال عدد كبير جداً من الطلبات في نافذة زمنية قصيرة.

حدود الـ Token (Token limits)

هذا هو النوع الذي تستهين به الفرق. يمكن لـ prompt طويل واحد أن يستهلك من الميزانية ما يستهلكه العديد من الطلبات الصغيرة. إذا أضفت فجأة system prompt بحجم 20 كيلوبايت، فقد يبدو عدد الطلبات طبيعياً بينما تكون ميزانية الـ tokens قد نفدت بالفعل.

حدود التزامن (Concurrency limits)

بعض المزودين والبوابات يقبلون متوسط طلباتك في الدقيقة تماماً حتى تفتح خمسين تدفقاً (streams) في وقت واحد. خطة المعدل قد تكون جيدة، لكن شكل الاندفاع (burst shape) ليس كذلك.

استنفاد الحصة أو الرصيد (Quota or balance exhaustion)

غالباً ما يظهر هذا كعرض من أعراض "تحديد المعدل" في لوحات التحكم لأن النتيجة التشغيلية واحدة: تتوقف الطلبات عن النجاح. لكن طريقة المعالجة تختلف؛ فإعادة المحاولة (backoff) لا فائدة منها إذا كانت المشكلة هي رصيد صفري.

كيف يفرض المزودون القيود عادةً

تتغير الأرقام الدقيقة بمرور الوقت، ولهذا السبب فإن كتابة جدول أسعار ثابت في توثيق تطبيقك يعد أمراً سيئاً. النمط المستقر هو كالتالي:

  • المزودون بأسلوب OpenAI عادةً ما يعرضون رؤوس (headers) الطلبات والـ tokens، ويقومون بتعديل سقف الاستهلاك بناءً على تاريخ الحساب أو فئة الاستخدام.
  • المزودون بأسلوب Anthropic يفرضون عادةً كلاً من الإنتاجية على مستوى الدقيقة وحدود المشروع الأوسع، خاصة في النماذج المتطورة.
  • المزودون بأسلوب Google غالباً ما يفصلون بين سلوك الفئة المجانية والمدفوعة، وقد تختلف الحدود بشكل حاد حسب عائلة النموذج.
  • المجمعات (Aggregators) تضيف طبقة قيود أخرى فوق قيود المصدر، ولكن في المقابل يمكنها التوجيه إلى قنوات أخرى عندما يتشبع أحد المصادر مؤقتاً.

تعامل مع حدود المزود كإعدادات حية، وليس كقيم ثابتة.

قراءة رؤوس تحديد المعدل (Rate Limit Headers)

يعيد جميع المزودين الرئيسيين معلومات حدود المعدل في رؤوس الاستجابة (response headers):

x-ratelimit-limit-requests: 500
x-ratelimit-remaining-requests: 499
x-ratelimit-reset-requests: 60s
x-ratelimit-limit-tokens: 200000
x-ratelimit-remaining-tokens: 199500

استخدم هذه الـ headers بشكل استباقي. لا تنتظر حدوث خطأ 429 لتبطئ العمل.

العادة التشغيلية التي تريدها بسيطة:

  1. سجل الـ headers عند النجاح، وليس فقط عند الفشل.
  2. أرسل تنبيهاً عندما تنخفض السعة المتبقية عن حد معين.
  3. نظم حركة المرور قبل أن يتجاوز الطلب التالي الحد المسموح.

إذا كنت تنظر إلى الـ headers فقط بعد الفشل، فأنت متأخر بالفعل.

بناء منطق إعادة المحاولة (Retry Logic)

الطريقة الخاطئة

# لا تفعل هذا
import time

def call_api(messages):
    while True:
        try:
            return client.chat.completions.create(
                model="gpt-4.1",
                messages=messages
            )
        except Exception:
            time.sleep(1)  # تأخير ثابت، لا يوجد تراجع أسي، يلتقط كل شيء

المشكلات: لا يوجد تراجع أسي (exponential backoff)، يلتقط أخطاء غير قابلة لإعادة المحاولة، لا يوجد حد أقصى لإعادة المحاولة، ولا يوجد تذبذب عشوائي (jitter).

الطريقة الصحيحة

import time
import random
from openai import RateLimitError, APIError, APIConnectionError

def call_with_retry(messages, model="gpt-4.1", max_retries=3):
    """إعادة المحاولة مع تراجع أسي وتذبذب عشوائي."""
    for attempt in range(max_retries + 1):
        try:
            return client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries:
                raise
            # استخدم retry_after من الاستجابة إذا كان متاحاً
            wait = getattr(e, 'retry_after', None)
            if wait is None:
                wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt + 1})")
            time.sleep(wait)
        except APIConnectionError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
        except APIError as e:
            # لا تعد المحاولة في أخطاء العميل (400, 401, 403)
            if e.status_code and 400 <= e.status_code < 500:
                raise
            if attempt == max_retries:
                raise
            time.sleep((2 ** attempt) + random.uniform(0, 1))

المبادئ الأساسية:

  • التراجع الأسي (Exponential backoff): 1 ثانية، 2 ثانية، 4 ثوانٍ، 8 ثوانٍ.
  • التذبذب العشوائي (Jitter): إضافة 0-1 ثانية عشوائية لمنع "القطيع المتزاحم" (thundering herd).
  • احترام رأس retry_after عندما يتم توفيره.
  • عدم إعادة المحاولة في أخطاء العميل (طلب سيئ، فشل المصادقة).
  • تحديد حد أقصى لعدد محاولات إعادة المحاولة.

هناك قاعدتان إضافيتان مهمتان للإنتاج:

  • لا تقم أبداً بإعادة المحاولة إلى الأبد في نقطة نهاية تدفق (streaming endpoint).
  • لا تقم أبداً بإعادة محاولة طلب مرتبط بالفعل بآثار جانبية مرئية للمستخدم ما لم تكن العملية متكررة (idempotent).

إكمال الدردشة (Chat completions) عادة ما يكون آمناً لإعادة المحاولة، أما الآثار الجانبية التي تطلقها الأدوات (tools) فغالباً لا تكون كذلك.

نسخة Async

import asyncio
import random
from openai import AsyncOpenAI, RateLimitError

async_client = AsyncOpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

async def call_with_retry_async(messages, model="gpt-4.1", max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)

تنظيم حركة المرور قبل أن تتحول إلى عاصفة إعادة محاولة

منطق إعادة المحاولة هو نصف الحل فقط. إذا كان المصدر محملاً بالفعل، يمكن لإعادة المحاولة أن تحول اندفاعاً واحداً إلى انقطاع ذاتي الخدمة.

ثلاثة ضوابط تصنع الفارق:

1. الانتظار حسب العميل أو المستخدم (Queue by tenant or user)

إذا بدأ أحد العملاء مهمة دفعية (batch job) ضخمة، فأنت لا تريد أن يتأثر بقية العملاء بهذا الضرر.

2. تحديد التدفقات المتزامنة (Cap concurrent streams)

من السهل الاستهانة بنقاط نهاية التدفق لأن كل طلب "يبدو" رخيصاً بينما يظل مفتوحاً لفترة طويلة.

3. تقليص الـ prompts قبل إرسالها

حدود الـ tokens غالباً ما تكون السقف الحقيقي. الـ prompt الذي يبلغ طوله ضعف الطول المعتاد يقلل الإنتاجية الآمنة إلى النصف تقريباً.

خوارزمية "سطل الرموز" (Token Bucket) من جانب العميل

للتطبيقات ذات الإنتاجية العالية، قم بتنفيذ تحديد المعدل من جانب العميل لتجنب الوصول إلى حدود الخادم:

import time
import asyncio

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens في الثانية
        self.capacity = capacity  # أقصى حجم للاندفاع
        self.tokens = capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: int = 1):
        while True:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            # الانتظار للحصول على tokens كافية
            wait = (tokens - self.tokens) / self.rate
            await asyncio.sleep(wait)

# 500 طلب في الدقيقة = ~8.3 في الثانية
limiter = TokenBucket(rate=8.0, capacity=20)

async def rate_limited_call(messages, model="gpt-4.1"):
    await limiter.acquire()
    return await async_client.chat.completions.create(
        model=model,
        messages=messages
    )

خوارزميات Token buckets جيدة عندما تعرف سقف استهلاكك. وهي أفضل عندما تقوم بضبطها بناءً على بيانات الـ headers الملاحظة بدلاً من التخمين الثابت.

التبديل إلى نموذج بديل (Model Fallback) عند بلوغ حدود المعدل

عندما يصل نموذجك الأساسي إلى حد المعدل، انتقل إلى بديل:

FALLBACK_CHAIN = [
    "claude-sonnet-4-6",
    "gpt-4.1",
    "gpt-4.1-mini",
]

async def call_with_fallback(messages):
    for model in FALLBACK_CHAIN:
        try:
            return await async_client.chat.completions.create(
                model=model,
                messages=messages
            )
        except RateLimitError:
            continue
    raise Exception("All models rate limited")

هنا تبرز فائدة بوابات النماذج (model gateways)، ولكن فقط إذا كان التبديل مدروساً. لا تقفز بصمت من نموذج تفكير متميز إلى نموذج ميزانية صغير دون التفكير في التأثير على المستخدم.

سلسلة التبديل (fallback chain) المعقولة هي:

  • نفس المزود، نموذج أصغر من نفس العائلة.
  • عائلة نماذج مكافئة من مزود آخر.
  • فقط بعد ذلك، نموذج أرخص أو بسياق (context) أقل.

إذا خلطت بين "التبديل من أجل التوفر" و"التبديل من أجل التكلفة" في خطوة واحدة، فسيصبح تصحيح الأخطاء معقداً بسرعة.

مراقبة استخدام حدود المعدل

تتبع استهلاك حدود المعدل لاكتشاف المشكلات قبل أن تؤثر على المستخدمين:

import logging

def log_rate_limits(response):
    headers = response.headers
    remaining = headers.get("x-ratelimit-remaining-requests")
    limit = headers.get("x-ratelimit-limit-requests")
    if remaining and int(remaining) < int(limit) * 0.1:
        logging.warning(
            f"Rate limit warning: {remaining}/{limit} requests remaining"
        )

قم بإعداد تنبيهات عندما تنخفض السعة المتبقية عن 10%. يمنحك هذا وقتاً لتنفيذ تقليل الاستهلاك (throttling) قبل أن يرى المستخدمون أخطاء 429.

يجب عليك أيضاً تسجيل:

  • معرف الطلب (request ID)
  • النموذج (model)
  • تقدير حجم المدخلات
  • مدة التدفق (stream duration)
  • عدد محاولات إعادة المحاولة
  • النتيجة النهائية (success, rate_limited, network_error, quota_exhausted)

بدون هذه الحقول، تصبح حوادث حدود المعدل مجرد تخمين.

قائمة مراجعة بسيطة لبيئة الإنتاج

قبل أن تصف برنامج الدردشة أو العميل الخاص بك بأنه "آمن من حدود المعدل"، تحقق من هذه العناصر الخمسة:

  1. وجود سياسة إعادة محاولة محددة لكل من المسارات المتزامنة وغير المتزامنة.
  2. تسجيل رؤوس حدود المعدل في الاستجابات الناجحة.
  3. وجود تنظيم لكل مستخدم أو لكل عميل قبل استدعاء المصدر.
  4. وجود نموذج بديل (fallback) واحد تم التحقق منه على الأقل.
  5. تلقي الواجهة الأمامية لحالة خطأ واضحة بدلاً من تدفق معلق.

إذا كنت تبني التطبيق بالكامل وليس فقط منطق إعادة المحاولة، فإن دليل بناء chatbot بمفتاح واحد يوضح كيف تتناسب هذه القطع مع خدمة FastAPI حقيقية.

الخلاصة

تحديد المعدل ليس حالة نادرة؛ بل هو حالة تشغيل طبيعية لأي منتج AI لديه استخدام حقيقي. الفرق التي تتعامل معه بشكل جيد لا تملك حدوداً سحرية أعلى، بل تتعامل مع الإنتاجية، وإعادة المحاولة، والتبديل كجزء من تصميم التطبيق منذ البداية.

أنشئ مفتاح API في LemonData، واختبر مسار إعادة المحاولة قبل حركة مرور الإنتاج، واستعد للخطأ 429 القادم قبل وصوله.

الاستراتيجية متى تستخدمها
التراجع الأسي (Exponential backoff) دائماً (كأساس)
محدد المعدل من جانب العميل التطبيقات ذات الإنتاجية العالية (>100 RPM)
التبديل إلى نموذج بديل (Model fallback) تطبيقات الإنتاج مع متطلبات SLA
المراقبة الاستباقية أي نشر في بيئة الإنتاج
Batch API أعباء العمل التي لا تتطلب وقتاً حقيقياً

الهدف ليس تجنب حدود المعدل تماماً، بل التعامل معها بسلاسة حتى لا يلاحظها المستخدمون أبداً.


ابنِ تطبيقات AI مرنة: يوفر lemondata.cc توجيهاً متعدد القنوات يتعامل تلقائياً مع حدود المعدل للمصادر. مفتاح API واحد، أكثر من 300 نموذج.

Share: