跳到主要内容

协议面与流式输出

本章讲 NLWeb 对外的「面」:同一个引擎怎么同时是 REST 端点和 MCP server,输出怎么统一成一种消息流。这是「NLWeb 之于 MCP,正如 HTML 之于 HTTP」这句话的落地处。

3.1 一个引擎,两层协议外壳

核心引擎 NLWebHandler 不关心自己被谁调用——它只认 query_params 和一个能 write_streamhttp_handler。不同协议各写一层薄外壳,把外部请求翻译成这两样:

协议外壳入口干什么
REST /askwebserver/routes/api.pyHTTP 查询参数 → query_params
MCP /mcpwebserver/routes/mcp.py + mcp_wrapper.pyJSON-RPC → query_params
A2Awebserver/routes/a2a.py(本次未深入)

都收敛到 NLWebHandler(query_params, http_handler).runQuery()

3.2 当 MCP server:把 ask 暴露成工具

MCP 外壳是标准 JSON-RPC 2.0 server(MCPHandlermcp_wrapper.py:29,协议版本 2024-11-05mcp_wrapper.py:27)。它按 method 分发(mcp_wrapper.py:62):

  • initialize → 返回 server 能力与信息(handle_initializemcp_wrapper.py:133)。
  • tools/list → 列出可用工具(handle_tools_listmcp_wrapper.py:150):ask(主问答)、list_sites、以及配置开启时的 who。每个带标准 inputSchema
  • tools/call → 真正执行(handle_tools_callmcp_wrapper.py:291)。

ask 工具的 tools/callmcp_wrapper.py:301):它把 MCP 的 arguments 翻译成 query_params(注意都包成 list,模仿 URL 参数形态,mcp_wrapper.py:311),用一个 ChunkCapture 收集引擎的流式输出,跑 runQuery(),最后把所有 chunk 拼成 MCP 标准的 {content: [{type:"text", text:...}], isError: false} 返回。

# 示意,非源码:MCP ask 工具的本质
query_params["query"] = [arguments["query"]] # MCP 参数 → 引擎参数
query_params["site"] = arguments.get("site", [])
capture = ChunkCapture() # 收集引擎吐出的消息
handler = NLWebHandler(query_params, capture) # 同一个引擎
result = await asyncio.wait_for(handler.runQuery(), timeout=30.0)
return {"content": [{"type": "text", "text": "".join(capture.chunks)}],
"isError": False}

30 秒超时是 MCP 这侧硬编码的护栏(mcp_wrapper.py:342);超时返回 isError: True 并提示用户换更具体的站/更简单的问题(mcp_wrapper.py:344-354)。

流式变体handle_streaming_tools_callmcp_wrapper.py:213):当 MCP 请求要 streaming 时,改用 SSE,把每个 chunk 包成 function_stream_event 事件推出,最后发 function_stream_end

横向对比:本 ai-protocol-reference 货架还有一个 MCP-spec 子库(其文档如 docs/mcp-spec/03-server-primitives.md 讲 tools 这类 server primitive、docs/mcp-spec/01-jsonrpc-and-messages.md 讲 JSON-RPC 消息;这些路径属于该子库,不是 nlweb 仓库内文件),描述的是「协议长什么样」;NLWeb 的 mcp_wrapper.py 则是「一个真实服务怎么把自己实现成符合该协议的 server」。两边对照读,能把抽象规范和具体落地接起来。

3.3 统一的 Message 协议

引擎内部一切对外输出都是 Messagecore/schemas.py:96)。它的设计原则写在类注释里:把「谁发的」(sender_type) 和「发了什么」(message_type) 分开

  • sender_type:user / assistant / system(schemas.py:15)。
  • message_type:query / result / nlws(生成的答案) / intermediate_message / error / tool_selection… (schemas.py:30)。

一组便捷构造函数生成常见消息:create_assistant_result(搜索结果,schemas.py:286)、create_assistant_answer(生成式答案,schemas.py:316)、create_status_messagecreate_error_message 等。ranking 发结果时就调 create_assistant_resultranking.py:396)。

3.4 流式发送与协议版本

所有消息经 MessageSender.send_messagecore/utils/message_senders.py:297)发出。它做三件事:补元数据 → 存档(无论流不流式都存,供返回完整消息列表)→ 若流式则推给 http_handler

这里有个协议版本分叉message_senders.py:316):

# 示意,非源码:v0.55 与旧版的流式差异
if is_v055() and message.type == "result" and isinstance(content, list):
for item in content: # v0.55: 每条结果一个具名 SSE 事件
await http.write_sse_event("result", {"index": i, "item": item})
else:
await http.write_stream(message) # 旧版: 整条消息一个 data 事件

v0.55 协议把「一批结果」拆成逐条具名 event: result 事件(带递增 index),更利于客户端增量渲染;旧版则整条消息塞进一个无名 data 事件。第一条 result 发出时还会插一个「首结果耗时」头(message_senders.py:308send_time_to_first_result),用于性能观测。

3.5 把主线和协议接起来

回看 01 章的 runQuerysend_begin_response / send_end_responsebaseHandler.py:325baseHandler.py:340)就是 MessageSender 在请求两端各发一个边界消息;中间 ranking 的 sendAnswers、post-ranking 的地图/摘要消息,全都经同一个 send_message 通道流出。协议层不感知业务,业务层不感知协议——中间隔着 Message 这层契约。

巧妙之处

  • 引擎与协议解耦NLWebHandler 只依赖一个有 write_stream 的对象,于是 REST、MCP、SSE、甚至「收集到内存」(ChunkCapture)都能复用同一引擎(mcp_wrapper.py:322)。
  • sender 与 type 正交:避免老式「message_type 既表谁发又表内容」的纠缠(schemas.py:441 create_legacy_message 标注的正是要摆脱的旧形态)。
  • 同一个 ask,人和 agent 共用:这正是 NLWeb 的核心主张——结构化 + 自然语言端点,既给人类聊天 UI,也给 MCP agent,一套实现两类消费者。

边界与局限

  • MCP 那层多处把 initialize 检查注释掉了(mcp_wrapper.py:77-86),为兼容不规范客户端而放宽,严格性打了折。
  • 30 秒超时只在 MCP 外壳,REST 路径靠别处控制;长查询在 MCP 下会被硬切。
  • A2A、OAuth、会话持久化等协议面本次未深入,仅在路由层看到入口(webserver/routes/)。

代码地图

主题文件符号
MCP 路由webserver/routes/mcp.pysetup_mcp_routes / mcp_handler
MCP 协议处理webserver/mcp_wrapper.pyMCPHandler.handle_request
列工具webserver/mcp_wrapper.pyhandle_tools_list
执行 ask 工具webserver/mcp_wrapper.pyhandle_tools_call
MCP 流式webserver/mcp_wrapper.pyhandle_streaming_tools_call
消息结构core/schemas.pyMessage / SenderType / MessageType
结果消息构造core/schemas.pycreate_assistant_result / create_assistant_answer
流式发送 / 版本分叉core/utils/message_senders.pyMessageSender.send_message