设置

语言

使用一个 API Key 构建 AI 聊天机器人:30 分钟从零到生产环境

L
LemonData
·2026年2月26日·557 次浏览
使用一个 API Key 构建 AI 聊天机器人:30 分钟从零到生产环境

本教程将使用 FastAPI、SSE 流式传输、对话记忆和模型切换功能,构建一个虽小但已具备生产能力的聊天机器人服务。我们的目标不是交付一个玩具演示,而是建立一个你可以真正投入产品前端并安全迭代的后端。

如果你已经将一个兼容 OpenAI 的 SDK 指向了 LemonData,那么本文将从那里开始。如果你还没有进行 Base URL 替换,请先阅读迁移指南。如果你主要关心负载下的请求整形和退避策略,请将本指南与 AI API 速率限制指南结合阅读。

我们要构建的内容

完成后的服务包含六个核心部分:

  1. 一个同步的 /chat 端点,用于冒烟测试。
  2. 一个流式的 /chat/stream 端点,用于真实 UI。
  3. conversation_id 为键的对话状态。
  4. 模型白名单,防止前端请求任意 ID。
  5. 错误处理机制,不会在遇到第一个 429 错误时崩溃。
  6. 从内存原型到 Redis 或 PostgreSQL 的清晰升级路径。

这足以支撑一个支持机器人、内部助手或嵌入式聊天组件的第一个版本。

安装最小化技术栈

pip install fastapi uvicorn openai pydantic redis

第一遍尝试时可以省略 redis,但现在就把 import 语句写好会很有用,这样升级路径就很明显了。

步骤 1:从一个简单乏味的聊天端点开始

开发聊天机器人最容易迷失方向的方式是在基础请求路径稳定之前就开始搞 Websockets、工具调用和智能体编排。先从一个小的端点开始,证明你的 API Key、Base URL 和模型路由是正确的。

from fastapi import FastAPI
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()

client = OpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4.1-mini"
    conversation_id: str | None = None

@app.post("/chat")
async def chat(req: ChatRequest):
    response = client.chat.completions.create(
        model=req.model,
        messages=[{"role": "user", "content": req.message}]
    )
    return {"reply": response.choices[0].message.content}

运行一次冒烟测试。如果这里失败了,就不要继续堆叠功能。

步骤 2:添加流式传输,因为用户在衡量延迟之前就能感受到延迟

大多数聊天机器人产品让人感觉慢,并不是因为模型慢,而是因为在完整响应到达之前 UI 一直是空白的。SSE(Server-Sent Events)对于许多聊天产品来说已经足够,且运维负担比 Websockets 更低。

from fastapi.responses import StreamingResponse

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    def generate():
        stream = client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.message}],
            stream=True
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

在前端,最简单的浏览器端客户端就足够了:

async function sendMessage(payload) {
  const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value, { stream: true });
    console.log(chunk);
  }
}

如果你的产品已经在使用浏览器客户端和标准 HTTP,SSE 可以保持架构的简洁性。

步骤 3:将对话状态移出请求体

第一个聊天机器人演示通常将完整的对话记录保存在浏览器中,并在每次轮转时发送。这在原型阶段可行,但一旦你需要重试、可恢复会话或服务端工具,就会变得一团糟。

开始时使用内存存储即可:

from collections import defaultdict
import uuid

conversations: dict[str, list] = defaultdict(list)
SYSTEM_PROMPT = "You are a helpful assistant. Be concise and direct."

def build_messages(conv_id: str, user_msg: str) -> list:
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    history = conversations[conv_id][-20:]
    messages.extend(history)
    messages.append({"role": "user", "content": user_msg})
    conversations[conv_id].append({"role": "user", "content": user_msg})
    return messages

升级到 Redis 主要是存储层面的对接:

import json
import redis

redis_client = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

def load_history(conv_id: str) -> list:
    raw = redis_client.get(f"chat:{conv_id}")
    return json.loads(raw) if raw else []

def save_history(conv_id: str, history: list) -> None:
    redis_client.setex(f"chat:{conv_id}", 60 * 60 * 24, json.dumps(history))

如果对话需要 TTL(过期时间)、可恢复性或多实例部署,请使用 Redis。如果对话记录本身就是产品数据,请使用 PostgreSQL。

步骤 4:将错误视为产品行为,而不仅仅是异常

如果你的聊天机器人是面向客户的,失败路径与成功路径同样重要。用户不在乎失败是源于速率限制、余额不足还是模型宕机,他们在乎的是 UI 是否卡死。

from openai import APIConnectionError, APIError, RateLimitError

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())
    messages = build_messages(conv_id, req.message)

    def generate():
        full_response = []
        try:
            stream = client.chat.completions.create(
                model=req.model,
                messages=messages,
                stream=True
            )
            for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    full_response.append(delta.content)
                    yield f"data: {delta.content}\n\n"
        except RateLimitError:
            yield "data: [ERROR] The model is busy. Please retry in a few seconds.\n\n"
        except APIConnectionError:
            yield "data: [ERROR] Temporary network issue. Please retry.\n\n"
        except APIError as error:
            yield f"data: [ERROR] {error.message}\n\n"
        else:
            conversations[conv_id].append(
                {"role": "assistant", "content": "".join(full_response)}
            )
        finally:
            yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Conversation-ID": conv_id}
    )

如果你正在承载有意义的流量,你还应该在请求到达上游之前对其进行整形。详细模式请参考 速率限制指南,简而言之:使用有限次数的重试、使用抖动(jitter),并且永远不要使用 except Exception: time.sleep(1)

步骤 5:模型切换需要白名单,而不是自由文本框

一个 API Key 可以访问数百个模型。但这并不意味着你的 UI 应该暴露数百个模型。后端应该发布一个与你的用例匹配的小型白名单。

AVAILABLE_MODELS = {
    "fast": "gpt-4.1-mini",
    "balanced": "claude-sonnet-4-6",
    "reasoning": "o3",
    "budget": "deepseek-chat",
}

@app.get("/models")
async def list_models():
    return {"models": AVAILABLE_MODELS}

这样做有三个好处:

  • 防止前端请求无效或已弃用的模型 ID
  • 让你以后可以重新映射层级,而无需重新部署每个客户端
  • 为你提供了一个实施成本控制的统一入口

如果你的团队仍在决定标准化哪些供应商,价格对比OpenRouter vs LemonData 对比是锁定白名单前值得阅读的两个页面。

步骤 6:在流量到来之前处理生产环境的边缘情况

聊天机器人后端达到生产级别,取决于周围边缘情况的处理,而不是核心聊天调用有多聪明。

清单很短:

  • 添加请求 ID,以便你可以将前端失败与后端日志关联起来
  • 限制单用户并发数和请求大小
  • 在长对话记录撑爆你的 token 预算之前对其进行修剪
  • 记录模型、延迟、输入大小和结束原因
  • 将用户可见的错误消息与内部错误详情分开
  • 测试一个备选模型,以便在第一次宕机发生前知道备选方案是可用的

历史记录修剪可以保持简单:

def trim_history(messages: list, max_tokens: int = 8000) -> list:
    system = messages[0]
    history = messages[1:]
    total_chars = len(system["content"])
    trimmed = []

    for msg in reversed(history):
        msg_chars = len(msg["content"])
        if total_chars + msg_chars > max_tokens * 4:
            break
        trimmed.insert(0, msg)
        total_chars += msg_chars

    return [system] + trimmed

重点不在于 token 级别的精确计算,而在于防止明显的上下文溢出。

从演示到产品

一旦这个后端稳定下来,下一次升级很少是“更多的 AI”,通常是枯燥的基础设施:

  • 身份验证,防止一个用户读取另一个用户的对话
  • 持久化,使会话在部署后依然存在
  • 速率限制,防止一个高频用户耗尽你的配额
  • 如果聊天机器人面向客户,则需要计费或用量归属
  • 如果对话需要长期记忆,则需要后台摘要功能

这就是为什么统一网关会有所帮助。一旦你完成了 Base URL 迁移,模型变更就不再是平台重写,而变成了配置调整。

冒烟测试

uvicorn main:app --reload --port 8000

curl -N -X POST http://localhost:8000/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello!", "model": "gpt-4.1-mini"}'

如果你能流式传输一轮对话、保留一个会话,并在强制失败时返回清晰的错误,你就拥有了正确的根基。

成本估算

LemonData 创建一个 API Key,将你的 OpenAI SDK 指向 https://api.lemondata.cc/v1,你就可以在不管理多个供应商账户的情况下,发布聊天机器人的第一个生产版本。

模型 每日成本 每月成本
GPT-4.1-mini 约 $2.40 约 $72
GPT-4.1 约 $12.00 约 $360
Claude Sonnet 4.6 约 $18.00 约 $540
DeepSeek V3 约 $1.68 约 $50

对于大多数应用,大多数对话使用 GPT-4.1-mini,仅在用户要求时升级到 Claude Sonnet 4.6,可以将每月成本控制在 $100 以内。


获取你的 API Key:lemondata.cc 通过一个端点提供 300 多个模型。注册即送 $1 免费额度。

分享: