Build an AI Chatbot with One API Key: From Zero to Production in 30 Minutes

LemonData · February 26, 2026
#chatbot #tutorial #python #fastapi #streaming

This tutorial builds a production-ready AI chatbot backend with streaming responses, conversation history, model switching, and proper error handling. We'll use Python, FastAPI, and the OpenAI SDK pointed at an API aggregator so you can use any model.

Prerequisites

pip install fastapi uvicorn openai

Python 3.10+ is required (the examples use the str | None union syntax).

Step 1: Basic Chat Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel
import os

app = FastAPI()

client = OpenAI(
    api_key=os.environ["LEMON_API_KEY"],  # keep keys out of source code
    base_url="https://api.lemondata.cc/v1"
)

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4.1-mini"
    conversation_id: str | None = None

@app.post("/chat")
async def chat(req: ChatRequest):
    # Note: the sync client blocks the event loop. Fine for a first pass;
    # switch to AsyncOpenAI for production concurrency.
    response = client.chat.completions.create(
        model=req.model,
        messages=[{"role": "user", "content": req.message}]
    )
    return {"reply": response.choices[0].message.content}

This works but has no streaming, no history, and no error handling. Let's fix that.

Step 2: Add Streaming

Streaming sends tokens as they're generated instead of waiting for the full response. Users see the reply forming in real-time.

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    def generate():
        stream = client.chat.completions.create(
            model=req.model,
            messages=[{"role": "user", "content": req.message}],
            stream=True
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                # In production, JSON-encode each chunk: raw newlines in
                # content would break the "data: " SSE framing.
                yield f"data: {delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
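On the client side, the stream arrives as Server-Sent Events lines. A minimal sketch of parsing that framing back into text (standalone, no server needed; parse_sse is an illustrative helper, not part of any SDK):

```python
def parse_sse(lines):
    """Extract content chunks from SSE 'data:' lines, stopping at the [DONE] sentinel."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank separator lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield payload

# Simulated stream body, split on newlines as an SSE client would see it
body = ["data: Hello", "", "data: , world", "", "data: [DONE]"]
print("".join(parse_sse(body)))  # -> "Hello, world"
```

In a real client you would feed this from an iterator over response lines (e.g. httpx streaming in Python, or EventSource in the browser, which does this parsing for you).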

Step 3: Conversation History

Store conversation history in memory (swap for Redis or a database in production).

from collections import defaultdict
import uuid

conversations: dict[str, list] = defaultdict(list)

SYSTEM_PROMPT = "You are a helpful assistant. Be concise and direct."

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())

    # Build message history
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(conversations[conv_id])
    messages.append({"role": "user", "content": req.message})

    # Store user message
    conversations[conv_id].append(
        {"role": "user", "content": req.message}
    )

    def generate():
        full_response = []
        stream = client.chat.completions.create(
            model=req.model,
            messages=messages,
            stream=True
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                full_response.append(delta.content)
                yield f"data: {delta.content}\n\n"

        # Store assistant response
        conversations[conv_id].append(
            {"role": "assistant", "content": "".join(full_response)}
        )
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Conversation-ID": conv_id}
    )
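To see how the in-memory store accumulates context across turns, here is a standalone sketch of the same pattern (record_turn is a hypothetical helper that mirrors what the endpoint does in two steps):

```python
from collections import defaultdict

conversations: dict[str, list] = defaultdict(list)
SYSTEM_PROMPT = "You are a helpful assistant. Be concise and direct."

def record_turn(conv_id: str, user_msg: str, assistant_msg: str):
    """Append one completed user/assistant exchange to the store."""
    conversations[conv_id].append({"role": "user", "content": user_msg})
    conversations[conv_id].append({"role": "assistant", "content": assistant_msg})

record_turn("abc", "Hi", "Hello!")
record_turn("abc", "What is 2 + 2?", "4")

# The next request replays the stored history after the system prompt
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
messages.extend(conversations["abc"])
print(len(messages))  # -> 5 (system prompt + two user/assistant pairs)
```

Because the store is keyed by conversation ID, each client that echoes back its X-Conversation-ID header gets its own isolated history.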

Step 4: Error Handling

AI API calls can fail for several reasons: rate limits, insufficient balance, or an unavailable model. Handle each case:

from openai import (
    APIError,
    RateLimitError,
    APIConnectionError
)

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    conv_id = req.conversation_id or str(uuid.uuid4())
    messages = build_messages(conv_id, req.message)

    def generate():
        try:
            full_response = []
            stream = client.chat.completions.create(
                model=req.model,
                messages=messages,
                stream=True
            )
            for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    full_response.append(delta.content)
                    yield f"data: {delta.content}\n\n"

            conversations[conv_id].append(
                {"role": "assistant", "content": "".join(full_response)}
            )

        except RateLimitError:
            yield "data: [ERROR] Rate limited. Please wait a moment.\n\n"
        except APIConnectionError:
            yield "data: [ERROR] Connection failed. Please retry.\n\n"
        except APIError as e:
            yield f"data: [ERROR] {e.message}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

def build_messages(conv_id: str, user_msg: str) -> list:
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    # Keep last 10 turns to manage context length
    history = conversations[conv_id][-20:]
    messages.extend(history)
    messages.append({"role": "user", "content": user_msg})
    conversations[conv_id].append({"role": "user", "content": user_msg})
    return messages

Step 5: Model Switching

Let users switch models mid-conversation; different models suit different needs:

AVAILABLE_MODELS = {
    "fast": "gpt-4.1-mini",
    "smart": "claude-sonnet-4-6",
    "reasoning": "o3",
    "budget": "deepseek-chat",
    "creative": "claude-sonnet-4-6",
}

@app.get("/models")
async def list_models():
    return {"models": AVAILABLE_MODELS}

The frontend can present these as options. Since all models use the same OpenAI-compatible format through the aggregator, switching is just changing the model parameter.

Step 6: Context Window Management

Long conversations exceed model context limits. Implement a sliding window:

def trim_history(messages: list, max_tokens: int = 8000) -> list:
    """Keep system prompt + recent messages within token budget."""
    # Rough estimate: 1 token ≈ 4 characters
    system = messages[0]  # Always keep system prompt
    history = messages[1:]

    total_chars = len(system["content"])
    trimmed = []

    for msg in reversed(history):
        msg_chars = len(msg["content"])
        if total_chars + msg_chars > max_tokens * 4:
            break
        trimmed.insert(0, msg)
        total_chars += msg_chars

    return [system] + trimmed
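A quick sanity check of the sliding window (trim_history is repeated here so the snippet runs on its own):

```python
def trim_history(messages: list, max_tokens: int = 8000) -> list:
    """Keep system prompt + recent messages within a rough character budget."""
    system = messages[0]
    history = messages[1:]
    total_chars = len(system["content"])
    trimmed = []
    for msg in reversed(history):
        msg_chars = len(msg["content"])
        if total_chars + msg_chars > max_tokens * 4:
            break
        trimmed.insert(0, msg)
        total_chars += msg_chars
    return [system] + trimmed

# 50 messages of 1,000 characters each against a 2,000-token (~8,000-char) budget
msgs = [{"role": "system", "content": "sys"}]
msgs += [{"role": "user", "content": "x" * 1000} for _ in range(50)]
trimmed = trim_history(msgs, max_tokens=2000)
print(len(trimmed))  # -> 8 (system prompt + the 7 most recent messages)
```

Because the loop walks the history in reverse, it is always the oldest messages that fall off the window.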

Complete Application

# Run with: uvicorn main:app --reload --port 8000
# Test: curl -N -X POST http://localhost:8000/chat/stream \
#   -H "Content-Type: application/json" \
#   -d '{"message": "Hello!", "model": "gpt-4.1-mini"}'

The full code is under 100 lines. From here you can add:

  • Authentication (API keys or JWT)
  • Persistent storage (PostgreSQL or Redis for conversations)
  • Rate limiting per user
  • Usage tracking and billing
  • WebSocket support for bidirectional streaming
  • Frontend (React, Vue, or vanilla JS with EventSource)

Cost Estimate

For a chatbot handling 1,000 conversations/day (average 5 turns each):

Model               Daily Cost    Monthly Cost
GPT-4.1-mini        ~$2.40        ~$72
GPT-4.1             ~$12.00       ~$360
Claude Sonnet 4.6   ~$18.00       ~$540
DeepSeek V3         ~$1.68        ~$50

Using GPT-4.1-mini for most conversations and upgrading to Claude Sonnet 4.6 only when users request it keeps costs under $100/month for most applications.
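The arithmetic behind estimates like these is straightforward. A sketch, where the token counts and per-million-token prices are illustrative assumptions rather than quoted rates (always check your provider's pricing page):

```python
def monthly_cost(convs_per_day, turns_per_conv, in_tokens, out_tokens,
                 in_price_per_m, out_price_per_m, days=30):
    """Back-of-envelope monthly cost in USD; prices are per 1M tokens."""
    requests = convs_per_day * turns_per_conv * days
    per_request = (in_tokens * in_price_per_m + out_tokens * out_price_per_m) / 1_000_000
    return requests * per_request

# 1,000 conversations/day x 5 turns, assuming 800 input / 200 output tokens
# per turn at hypothetical $0.40 / $1.60 per 1M tokens
print(round(monthly_cost(1000, 5, 800, 200, 0.40, 1.60), 2))  # -> 96.0
```

Input tokens usually dominate because each turn resends the conversation history, which is another reason the sliding window in Step 6 matters for cost, not just context limits.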


Get your API key: lemondata.cc provides 300+ models through one endpoint. $1 free credit to start building.
