前途科技
  • 科技
  • AI
    • AI 前沿技术
    • Agent生态
    • AI应用场景
    • AI 行业应用
  • 初创
  • 报告
  • 学习中心
    • 编程与工具
    • 数据科学与工程
我的兴趣
前途科技前途科技
Font ResizerAa
站内搜索
Have an existing account? Sign In
Follow US
Copyright © 2024 AccessPath.com, 前途国际科技咨询(北京)有限公司,版权所有。 | 京ICP备17045010号-1 | 京公网安备 11010502033860号
大模型与工程化

告别JS疲劳:用HTMX和FastAPI轻松打造流式智能聊天机器人(第二部分)

NEXTECH
Last updated: 2025年11月18日 上午7:26
By NEXTECH
Share
113 Min Read
SHARE

在第一部分中,探讨了如何利用HTMX为HTML元素增加交互性,实现“无JavaScript的JavaScript”。为了具体演示,当时已着手构建一个能返回模拟大型语言模型(LLM)响应的简单聊天应用。在本文中,将进一步扩展聊天机器人的功能,并增加多项特性,其中流式传输功能将大幅提升用户体验,相较于之前构建的同步聊天应用,这是一项显著的进步。

Contents
从同步通信到异步通信的演进SSE 与 WebSocket:实时通信协议之选实现目标与工作流程后端实现前端设计与实现集成Google Agent Development Kit,打造真正的聊天机器人总结
  • 实时流媒体功能 基于SSE的实时流式传输
  • 多用户会话架构 支持多用户的会话式架构
  • 异步协调 使用asyncio.Queue进行异步协调
  • 清晰的HTMX模式 采用专用SSE处理的清晰HTMX模式
  • Google搜索智能体 集成Google搜索智能体,以获取最新数据并回答查询
  • 几乎无需JavaScript 几乎零JavaScript的代码实现

以下是本文将要构建的成果预览:

流式聊天机器人演示动画

从同步通信到异步通信的演进

此前构建的应用利用了非常基础的Web表单功能,其通信方式是同步的。这意味着在服务器完成全部处理并返回响应之前,客户端无法获取任何信息。当发送一个请求后,只能等待完整的响应,然后才能将其显示出来。在这段等待过程中,用户体验可能会受到影响。

然而,现代聊天机器人采取了不同的工作模式,它们提供了异步通信能力。这通常通过流式传输实现:客户端可以持续接收更新和部分响应,而无需等待完整的最终响应。当响应生成过程耗时较长时,这种方式尤其有用,例如大型语言模型(LLM)生成长答案时,流式传输能显著提升用户体验。

SSE 与 WebSocket:实时通信协议之选

SSE(Server-Sent Events,服务器发送事件)和WebSocket都是客户端与服务器之间实现实时数据交换的协议。

You Might Also Like

安全负责任地部署AI:打破四大迷思,工程化信任之道
AI搜索如何精准筛选文档:RAG管线中的关键检索优化策略
告别设计烦恼:用Google Stitch和AI大模型,数分钟打造生产级UI原型!
金融业的智能体AI:印尼机遇与挑战深度解析

WebSocket支持全双工连接,这意味着浏览器和服务器可以同时发送和接收数据。这通常应用于在线游戏、即时聊天应用以及协作工具(如Google表格)等场景。

相比之下,SSE是单向的,只允许从服务器到客户端的单向通信。这意味着客户端不能通过SSE协议向服务器发送数据。如果说WebSocket像一场人们可以同时说话和倾听的双向电话交谈,那么SSE则更像是收听广播。SSE通常用于发送通知、更新金融应用中的图表或实时新闻推送。

那为什么选择SSE呢?因为在本文所构建的场景中,并不需要全双工通信。简单的HTTP协议(WebSocket的工作方式与HTTP不同)足以满足需求:客户端发送数据,服务器接收并返回数据。SSE的引入仅仅是为了实现数据以流的形式传输,其他更复杂的功能并非必需。

实现目标与工作流程

  1. 用户输入查询。
  2. 服务器接收查询并将其发送至大型语言模型(LLM)。
  3. LLM开始生成内容。
  4. 服务器即时返回LLM生成的每一小部分内容。
  5. 浏览器将这些内容片段逐步添加到DOM中。

接下来的工作将分为后端和前端两大部分进行。

后端实现

后端处理将分为两个主要步骤:

  • 一个POST接口,负责接收消息,但不返回任何内容。
  • 一个GET接口,负责读取消息队列并生成输出流。

在本次演示中,首先会通过重复用户输入来模拟大型语言模型(LLM)的响应,这意味着流式传输中的词语将与用户的输入完全相同。

为了确保通信的清晰与隔离,需要按用户会话分离消息流(即消息队列),否则不同用户之间的对话可能会混淆。因此,将创建一个会话字典来存储这些队列。

接下来,还需要指示后端在消息队列填充完毕后再开始流式传输响应。如果不在队列填充前等待,可能会遇到并发执行或时序问题:例如,客户端的SSE连接启动时队列是空的,SSE随即关闭;用户随后输入消息,但为时已晚,消息无法被处理。

解决方案是使用异步队列!异步队列具有多项优势:

  • 如果队列中有数据,则立即返回。
  • 如果队列为空,则暂停执行,直到调用queue.put()。
  • 支持多个消费者,每个消费者都能获取到自己的数据。
  • 线程安全,避免竞态条件。

下面是具体的代码实现:

from fastapi import FastAPI, Request, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, StreamingResponse
import asyncio
import time
import uuid

app = FastAPI()
templates = Jinja2Templates("templates")

# This object will store session id and their corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})

@app.post("/chat")
async def chat(request: Request, query: str=Form(...), session_id: str=Form(...)):
    """ Send message to session-based queue """

    # Create the session if it does not exist
    if session_id not in sessions:
        sessions[session_id] = asyncio.Queue()

    # Put the message in the queue
    await sessions[session_id].put(query)

    return {"status": "queued", "session_id": session_id}

@app.get("/stream/{session_id}")
async def stream(session_id: str):

    async def response_stream():

        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return

        queue = sessions[session_id]

        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        data = await queue.get()
        print(f"Got message: {data}")

        message = ""
        await asyncio.sleep(1)
        for token in data.replace("
", " ").split(" "):
            message += token + " "
            data = f"""data: <li class='mb-6 ml-[20%]'> <div class='font-bold text-right'>AI</div><div>{message}</div></li>

"""
            yield data
            await asyncio.sleep(0.03)

        queue.task_done()

    return StreamingResponse(response_stream(), media_type="text/event-stream")

下面解释几个关键概念。

会话隔离机制

确保每个用户拥有独立的消息队列至关重要,以避免不同对话之间产生混淆。实现这一目标的方法是使用会话字典。在实际的生产应用中,通常会使用Redis等工具来存储这些会话信息。在以下代码示例中,可以看到页面加载时会创建一个新的会话ID,并将其存储在sessions字典中。刷新页面将启动一个新的会话;虽然当前的消息队列没有持久化,但可以通过数据库等方式实现。该主题将在第三部分详细探讨。

# This object will store session id and their corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})

阻塞式协调机制

在SSE消息发送和用户查询接收之间,需要精确控制它们的顺序。在后端,正确的顺序应为:

  1. 接收用户消息。
  2. 创建消息队列并填充消息。
  3. 通过流式响应发送队列中的消息。

如果未能遵循这一顺序,可能会导致不希望发生的行为,例如,先读取(空的)消息队列,然后才用用户查询填充它。

控制执行顺序的解决方案是使用asyncio.Queue。此对象将用于两个关键场景:

  • 当向队列中插入新消息时。插入消息将“唤醒”SSE接口中的轮询操作。
await sessions[session_id].put(query)
  • 当从队列中拉取消息时。在这行代码中,程序将阻塞,直到队列发出“有新数据可用!”的信号:
data = await queue.get()

这种模式具有多项优势:

  • 每个用户拥有独立的队列。
  • 消除了竞态条件的风险。

流式传输模拟

在本文中,通过将用户查询拆分成单词并逐一返回这些单词,来模拟大型语言模型(LLM)的响应。在第三部分中,将实际接入一个真实的LLM。

流式传输通过FastAPI的StreamingResponse对象处理。此对象期望一个异步生成器,该生成器将持续yield数据直到生成器结束。必须使用yield关键字而非return关键字,否则生成器将在第一次迭代后停止。

接下来,将分解流式传输函数:

首先,需要确保当前会话有一个消息队列,以便从中拉取消息:

if session_id not in sessions:
    print(f"Session {session_id} not found!")
    return

queue = sessions[session_id]

接着,一旦获取了队列,如果队列中包含消息,将从队列中拉取消息;否则,代码将暂停并等待消息的到来。这是函数中最核心的部分:

# This BLOCKS until data arrives
print(f"Waiting for message in session {session_id}")
data = await queue.get()
print(f"Got message: {data}")

为了模拟流式传输,现在将消息分块为单词(此处称为tokens),并添加一些时间延迟(通过asyncio.sleep实现)来模拟大型语言模型(LLM)的文本生成过程。请注意,yield的数据实际上是HTML字符串,并被封装在一个以“data:”开头的字符串中。这是SSE消息的发送方式。也可以选择使用“event:”元数据来标记消息。例如:

event: my_custom_event
data: <div>Content to swap into your HTML page.</div>

下面展示如何在Python中实现它(对于追求完美的开发者,建议使用Jinja模板而非直接构建字符串来渲染HTML):

message = ""

# First pause to let the browser display "Thinking when the message is sent"
await asyncio.sleep(1)

# Simulate streaming by splitting message in words
for token in data.replace("
", " ").split(" "):

    # We append tokens to the message
    message += token + " "

    # We wrap the message in HTML tags with the "data" metadata
    data = f"""data: <li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div>{message}</div></li>

"""
    yield data

    # Pause to simulate the LLM generation process
    await asyncio.sleep(0.03)

queue.task_done()

前端设计与实现

前端主要承担两项任务:向后端发送用户查询,以及在特定通道(即session_id)上监听SSE消息。为此,采用了“职责分离”的设计理念,即每个HTMX元素仅负责一项单一任务:

  • 表单负责发送用户输入。
  • SSE监听器负责处理流式传输。
  • 聊天列表(ul)负责显示消息。

发送消息时,将使用表单中的标准textarea输入框。其HTMX的精妙之处在于:

<form 
    id="userInput" 
    class="flex max-h-16 gap-4"
    hx-post="/chat" 
    hx-swap="none"
    hx-trigger="click from:#submitButton" 
    hx-on::before-request="
        htmx.find('#chat').innerHTML += `<li class='mb-6 justify-start max-w-[80%]'><div class='font-bold'>Me</div><div>${htmx.find('#query').value}</div></li>`;
        htmx.find('#chat').innerHTML += `<li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div class='text-right'>Thinking...</div></li>`;
        htmx.find('#query').value = '';
    "
>
    <textarea 
        id="query" 
        name="query"
        class="flex w-full rounded-md border border-input bg-transparent px-3 py-2 text-sm shadow-sm placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-ring disabled:cursor-not-allowed disabled:opacity-50 min-h-[44px] max-h-[200px]"
        placeholder="Write a message..." 
        rows="4"></textarea>
    <button 
        type="submit" 
        id="submitButton"
        class="inline-flex max-h-16 items-center justify-center rounded-md bg-neutral-950 px-6 font-medium text-neutral-50 transition active:scale-110"
    >Sends</button>
</form>

回顾第一部分的文章,有几个值得解释的HTMX属性:

  • hx-post:表单数据将提交到的后端接口。
  • hx-swap:设置为none,因为在此场景中,后端接口不返回任何数据。
  • hx-trigger:指定哪个事件将触发请求。
  • hx-on::before-request:这部分使用了少量JavaScript,旨在提升应用响应速度。它会将用户的请求添加到聊天列表中,并在等待SSE消息流式传输时向用户显示“正在思考…”的信息。这比让用户盯着空白页面等待要友好得多。

值得注意的是,实际上向后端发送了两个参数:用户的输入和会话ID。这样,消息将被插入到后端正确的队列中。

接着,定义了另一个专门用于监听SSE消息的组件。

<!-- Messages will be added to this list-->
<div class="mb-auto max-h-[80%] overflow-auto">
    <ul id="chat" class="rounded-2xl p-4 mb-16 justify-start">
    </ul>
</div>

<!-- SSE listened (message buffer)-->
<div 
    hx-ext="sse" 
    sse-connect="/stream/{{ session_id }}" 
    sse-swap="message" 
    hx-swap="outerHTML scroll:bottom"
    hx-target="#chat>li:last-child" 
    style="display: none;"
></div>

该组件将监听/stream接口,并传递其会话ID,以仅监听此会话的消息。hx-target属性指示浏览器将数据添加到聊天列表的最后一个li元素中。hx-swap属性则指定数据实际上是为了替换整个当前的li元素。这就是流式传输效果的工作原理:用最新的消息片段替换当前显示的消息。

注意:也可以使用其他方法来替换DOM中的特定元素,例如带外(OOB)交换。它们的工作方式略有不同,因为需要一个特定的ID来在DOM中查找。在此案例中,特意没有为每个写入的列表元素分配ID。

集成Google Agent Development Kit,打造真正的聊天机器人

现在,是时候用一个真正的大型语言模型(LLM)替换之前模拟的流式传输接口了。为此,将利用Google ADK(Agent Development Kit)构建一个智能体,使其具备工具调用能力和记忆功能,从而能够获取实时信息并记住对话细节。

智能体(Agents)简介

读者可能已经了解大型语言模型(LLM)的概念。当前LLM的主要局限在于,它们本身无法获取实时信息:其知识储备仅限于训练时的数据。另一个局限是它们无法访问训练范围之外的信息(例如,公司内部数据)。

智能体(Agents)是一种能够进行推理、行动和观察的人工智能应用。推理部分由LLM(即“大脑”)负责,而智能体的“手”则被称为“工具”,可以有多种形式:

  • Python函数,例如用于调用API。
  • MCP服务器,这是一个允许智能体通过标准化接口连接到API的标准(例如,无需自行编写API连接器即可访问所有Gsuite工具)。
  • 其他智能体(在这种情况下,这种模式被称为智能体委派,由一个路由或主智能体控制不同的子智能体)。

在本次演示中,为了简化实现,将使用一个非常简单的智能体,它只具备一个工具:Google搜索。这将使其能够获取最新信息,并确保信息的可靠性(至少希望Google搜索结果是可靠的)。

在Google ADK的生态中,智能体需要以下基本信息:

  • 名称和描述,主要用于文档说明。
  • 指令:定义智能体行为的提示词(工具使用、输出格式、遵循步骤等)。
  • 工具:智能体为实现其目标可以使用的函数/MCP服务器/其他智能体。

此外,还有围绕记忆和会话管理的其他概念,但它们超出了本文的讨论范围。

接下来,定义智能体!

流式Google搜索智能体

from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.adk.tools import google_search

# Define constants for the agent
APP_NAME = "default"  # Application
USER_ID = "default"  # User
SESSION = "default"  # Session
MODEL_NAME = "gemini-2.5-flash-lite"

# Step 1: Create the LLM Agent
root_agent = Agent(
    model=MODEL_NAME,
    name="text_chat_bot",
    description="A text chatbot",
    instruction="You are a helpful assistant. Your goal is to answer questions based on your knowledge. Use your Google Search tool to provide the latest and most accurate information",
    tools=[google_search]
)

# Step 2: Set up Session Management
# InMemorySessionService stores conversations in RAM (temporary)
session_service = InMemorySessionService()

# Step 3: Create the Runner
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)

Runner对象充当用户与智能体之间的协调器。

接下来,重新定义/stream接口。首先检查智能体的会话是否存在,如果不存在则创建它:

# Attempt to create a new session or retrieve an existing one
        try:
            session = await session_service.create_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        except:
            session = await session_service.get_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )

然后,获取用户查询,并以异步方式将其传递给智能体,以获取流式响应:

# Convert the query string to the ADK Content format
        query = types.Content(role="user", parts=[types.Part(text=query)])

        # Stream the agent's response asynchronously
        async for event in runner.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
        ):

接下来有一个微妙之处。当智能体生成响应时,它可能会输出双换行符“

”。这会带来问题,因为SSE事件以这个符号作为结束标记。因此,字符串中出现双换行符意味着:

  • 当前消息将被截断。
  • 下一条消息格式将不正确,并可能导致SSE流停止。

读者可以自行尝试。为了解决这个问题,将使用一个小技巧,同时结合另一个小技巧来格式化列表元素(因为使用了Tailwind CSS,它会覆盖某些CSS规则)。这个技巧是:

if event.partial:
                message += event.content.parts[0].text

                # Hack here
                html_content = markdown.markdown(message, extensions=['fenced_code']).replace("
", "<br/>").replace("<li>", "<li class='ml-4'>").replace("<ul>", "<ul class='list-disc'>")

                full_html = f"""data: <li class='mb-6 ml-[20%]'> <div class='font-bold text-right'>AI</div><div>{html_content}</div></li>

"""

                yield full_html

通过这种方式,可以确保双换行符不会中断SSE流。

以下是该路由的完整代码:

@app.get("/stream/{session_id}")
async def stream(session_id: str):

    async def response_stream():

        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return

        # Attempt to create a new session or retrieve an existing one
        try:
            session = await session_service.create_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        except:
            session = await session_service.get_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )

        queue = sessions[session_id]

        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        query = await queue.get()
        print(f"Got message: {query}")

        message = ""

        # Convert the query string to the ADK Content format
        query = types.Content(role="user", parts=[types.Part(text=query)])

        # Stream the agent's response asynchronously
        async for event in runner.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
        ):
            if event.partial:
                message += event.content.parts[0].text

                html_content = markdown.markdown(message, extensions=['fenced_code']).replace("
", "<br/>").replace("<li>", "<li class='ml-4'>").replace("<ul>", "<ul class='list-disc'>")

                full_html = f"""data: <li class='mb-6 ml-[20%]'> <div class='font-bold text-right'>AI</div><div>{html_content}</div></li>

"""

                yield full_html

        queue.task_done()

    return StreamingResponse(response_stream(), media_type="text/event-stream")

至此,读者将能够与自己的聊天机器人进行对话!

下面添加了一个小段CSS代码,用于格式化代码块。如果让聊天机器人生成代码片段,会希望它们能正确显示。这是CSS代码:

pre, code {
      background-color: black;
      color: lightgrey;
      padding: 1%;
      border-radius: 10px;
      white-space: pre-wrap;
      font-size: 0.8rem;
      letter-spacing: -1px;
    }

现在也可以生成代码片段了:

代码片段生成演示

效果令人惊叹!

工作流程回顾

通过不到200行代码,就能够构建一个具有以下工作流程的聊天应用,实现服务器流式响应并利用SSE和HTMX进行优雅显示。

User types "Hello World" → Submit
├── 1. Add "Me: Hello World" to chat
├── 2. Add "AI: Thinking..." to chat  
├── 3. POST /chat with message
├── 4. Server queues message
├── 5. SSE stream produces a LLM response based on the query
├── 6. Stream "AI: This" (replaces "Thinking...")
├── 7. Stream "AI: This is the answer ..."
└── 8. Complete

总结

在本系列文章中,展示了如何仅使用Python和HTML,在几乎不依赖JavaScript及重型JS框架的情况下,轻松开发一个聊天机器人应用。在神奇的HTMX库的帮助下,本文涵盖了服务器端渲染、服务器发送事件(SSE)、异步流式传输以及智能体等主题。

这些文章的主要目的是表明,Web应用程序开发对于非JavaScript开发者而言并非遥不可及。实际上,在Web开发中并非总是需要使用JavaScript,并且有充分的理由在某些情况下选择不使用它。尽管JavaScript是一种功能强大的语言,但有时它可能被过度使用,而忽略了更简单却同样健壮的方法。服务器端与客户端应用之间的争论由来已久且尚未平息,但希望这些文章能为一些读者带来启发,并最终有所收获。

TAGGED:FastAPIHTMX大模型工程无JavaScript流式聊天
Share This Article
Email Copy Link Print
Previous Article OpenAI终松口:员工股权捐赠慈善终获准,或达数百万美元
Next Article 20251118081010366.jpg Google DeepMind WeatherNext 2:AI天气预报迈向超精准时代
Leave a Comment

发表回复 取消回复

您的邮箱地址不会被公开。 必填项已用 * 标注

最新内容
星链设备在弗吉尼亚州乡村的户外场景
星链让我在任何地方都能“居家办公”——如今,我渴望改变
科技
谷歌眼镜产品图
谷歌眼镜的功与过:一场超前13年的科技预言与争议
科技
拥抱AI的“人性化”愿景:工具而非替代品
AI
图1:决策树分类器在Excel中的示例
机器学习决策树分类器全解析:从基尼系数到Excel实战
未分类

相关内容

SQL数据库设计图
大模型与工程化

图数据库RAG与SQL数据库RAG:大型语言模型性能深度比较

2025年11月2日
RAG工作原理示意图
大模型与工程化

RAG解决方案评估:从构建到生产就绪的全面指南

2025年9月22日
提示词优化技术概览图
大模型与工程化

大模型提示词优化:降低成本、减少延迟、提升性能的四大核心技巧

2025年10月30日
图片1:GPT-5智能体
大模型与工程化

构建GPT-5智能体:赋能AI应用的未来

2025年11月12日
Show More
前途科技

前途科技是一个致力于提供全球最新科技资讯的专业网站。我们以实时更新的方式,为用户呈现来自世界各地的科技新闻和深度分析,涵盖从技术创新到企业发展等多方面内容。专注于为用户提供高质量的科技创业新闻和行业动态。

分类

  • AI
  • 初创
  • 学习中心

快速链接

  • 阅读历史
  • 我的关注
  • 我的收藏

Copyright © 2025 AccessPath.com, 前途国际科技咨询(北京)有限公司,版权所有。 | 京ICP备17045010号-1 | 京公网安备 11010502033860号

前途科技
Username or Email Address
Password

Lost your password?

Not a member? Sign Up