设置

语言

使用一个API密钥构建AI聊天机器人:30分钟从零到上线

L
LemonData
·2026年2月26日·15 次浏览
#聊天机器人#教程#Python#FastAPI#流式传输
使用一个API密钥构建AI聊天机器人:30分钟从零到上线

用一个 API 密钥构建 AI 聊天机器人:30 分钟从零到上线

本教程将构建一个生产级的 AI 聊天机器人后端,支持流式响应、对话历史、模型切换和完善的错误处理。我们将使用 Python、FastAPI 以及指向 API 聚合器的 OpenAI SDK,这样你就可以使用任何模型。

前提条件

pip install fastapi uvicorn openai

步骤 1:基础聊天接口

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()

client = OpenAI(
    api_key="sk-lemon-xxx",
    base_url="https://api.lemondata.cc/v1"
)

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4.1-mini"
    conversation_id: str | None = None

@app.post("/chat")
async def chat(req: ChatRequest):
    response = client.chat.completions.create(
        model=req.model,
        messages=[{"role": "user", "content": req.message}]
    )
    return {"reply": response.choices[0].message.content}

这段代码能工作,但没有流式响应、没有历史记录,也没有错误处理。我们来改进它。

步骤 2:添加流式响应

流式响应会在生成令牌时即时发送,而不是等待完整回复。用户可以实时看到回复内容的生成过程。

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    def generate():
        stream = client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.message}],
            stream=True
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

步骤 3:对话历史

将对话历史存储在内存中(生产环境可替换为 Redis 或数据库)。

from collections import defaultdict
import uuid

conversations: dict[str, list] = defaultdict(list)

SYSTEM_PROMPT = "你是一个乐于助人的助手。请简洁明了地回答。"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())

    # 构建消息历史
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(conversations[conv_id])
    messages.append({"role": "user", "content": req.message})

    # 存储用户消息
    conversations[conv_id].append(
        {"role": "user", "content": req.message}
    )

    def generate():
        full_response = []
        stream = client.chat.completions.create(
            model=req.model,
            messages=messages,
            stream=True
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                full_response.append(delta.content)
                yield f"data: {delta.content}\n\n"

        # 存储助手回复
        conversations[conv_id].append(
            {"role": "assistant", "content": "".join(full_response)}
        )
        yield f"data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Conversation-ID": conv_id}
    )

步骤 4:错误处理

AI API 调用可能因多种原因失败:速率限制、余额不足、模型不可用。针对每种情况进行处理:

from openai import (
    APIError,
    RateLimitError,
    APIConnectionError
)

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())
    messages = build_messages(conv_id, req.message)

    def generate():
        try:
            full_response = []
            stream = client.chat.completions.create(
                model=req.model,
                messages=messages,
                stream=True
            )
            for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    full_response.append(delta.content)
                    yield f"data: {delta.content}\n\n"

            conversations[conv_id].append(
                {"role": "assistant", "content": "".join(full_response)}
            )

        except RateLimitError as e:
            yield f"data: [ERROR] 速率限制。请稍后再试。\n\n"
        except APIConnectionError:
            yield f"data: [ERROR] 连接失败,正在重试...\n\n"
        except APIError as e:
            yield f"data: [ERROR] {e.message}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

def build_messages(conv_id: str, user_msg: str) -> list:
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    # 保留最近 10 轮对话以管理上下文长度
    history = conversations[conv_id][-20:]
    messages.extend(history)
    messages.append({"role": "user", "content": user_msg})
    conversations[conv_id].append({"role": "user", "content": user_msg})
    return messages

步骤 5:模型切换

允许用户在对话中切换模型。不同模型满足不同需求:

AVAILABLE_MODELS = {
    "fast": "gpt-4.1-mini",
    "smart": "claude-sonnet-4-6",
    "reasoning": "o3",
    "budget": "deepseek-chat",
    "creative": "claude-sonnet-4-6",
}

@app.get("/models")
async def list_models():
    return {"models": AVAILABLE_MODELS}

前端可以将这些作为选项展示。由于所有模型都通过聚合器使用相同的 OpenAI 兼容格式,切换只需更改 model 参数即可。

步骤 6:上下文窗口管理

长对话会超过模型上下文限制。实现滑动窗口机制:

def trim_history(messages: list, max_tokens: int = 8000) -> list:
    """保留系统提示 + 最近消息,控制在令牌预算内。"""
    # 粗略估计:1 个令牌 ≈ 4 个字符
    system = messages[0]  # 始终保留系统提示
    history = messages[1:]

    total_chars = len(system["content"])
    trimmed = []

    for msg in reversed(history):
        msg_chars = len(msg["content"])
        if total_chars + msg_chars > max_tokens * 4:
            break
        trimmed.insert(0, msg)
        total_chars += msg_chars

    return [system] + trimmed

完整应用

# 运行命令:uvicorn main:app --reload --port 8000
# 测试命令:curl -N -X POST http://localhost:8000/chat/stream \
#   -H "Content-Type: application/json" \
#   -d '{"message": "Hello!", "model": "gpt-4.1-mini"}'

完整代码不到 100 行。从这里你可以添加:

  • 身份认证(API 密钥或 JWT)
  • 持久化存储(PostgreSQL 或 Redis 存储对话)
  • 用户速率限制
  • 使用统计和计费
  • 支持双向流的 WebSocket
  • 前端(React、Vue 或使用 EventSource 的原生 JS)

成本估算

对于每天处理 1000 个对话(平均每个对话 5 轮)的聊天机器人:

模型 每日成本 每月成本
GPT-4.1-mini 约 $2.40 约 $72
GPT-4.1 约 $12.00 约 $360
Claude Sonnet 4.6 约 $18.00 约 $540
DeepSeek V3 约 $1.68 约 $50

大多数对话使用 GPT-4.1-mini,只有用户请求时才升级到 Claude Sonnet 4.6,这样大多数应用的成本可控制在每月 100 美元以内。


获取你的 API 密钥:lemondata.cc 通过一个接口提供 300+ 模型。赠送 $1 免费额度,开始构建吧。

分享: