协议面与流式输出
本章讲 NLWeb 对外的「面」:同一个引擎怎么同时是 REST 端点和 MCP server,输出怎么统一成一种消息流。这是「NLWeb 之于 MCP,正如 HTML 之于 HTTP」这句话的落地处。
3.1 一个引擎,两层协议外壳
核心引擎 NLWebHandler 不关心自己被谁调用——它只认 query_params 和一个能 write_stream 的 http_handler。不同协议各写一层薄外壳,把外部请求翻译成这两样:
| 协议外壳 | 入口 | 干什么 |
|---|---|---|
REST /ask | webserver/routes/api.py | HTTP 查询参数 → query_params |
MCP /mcp | webserver/routes/mcp.py + mcp_wrapper.py | JSON-RPC → query_params |
| A2A | webserver/routes/a2a.py | (本次未深入) |
都收敛到 NLWebHandler(query_params, http_handler).runQuery()。
3.2 当 MCP server:把 ask 暴露成工具
MCP 外壳是标准 JSON-RPC 2.0 server(MCPHandler,mcp_wrapper.py:29,协议版本 2024-11-05,mcp_wrapper.py:27)。它按 method 分发(mcp_wrapper.py:62):
initialize→ 返回 server 能力与信息(handle_initialize,mcp_wrapper.py:133)。tools/list→ 列出可用工具(handle_tools_list,mcp_wrapper.py:150):ask(主问答)、list_sites、以及配置开启时的who。每个带标准inputSchema。tools/call→ 真正执行(handle_tools_call,mcp_wrapper.py:291)。
看 ask 工具的 tools/call(mcp_wrapper.py:301):它把 MCP 的 arguments 翻译成 query_params(注意都包成 list,模仿 URL 参数形态,mcp_wrapper.py:311),用一个 ChunkCapture 收集引擎的流式输出,跑 runQuery(),最后把所有 chunk 拼成 MCP 标准的 {content: [{type:"text", text:...}], isError: false} 返回。
# 示意,非源码:MCP ask 工具的本质
query_params["query"] = [arguments["query"]] # MCP 参数 → 引擎参数
query_params["site"] = arguments.get("site", [])
capture = ChunkCapture() # 收集引擎吐出的消息
handler = NLWebHandler(query_params, capture) # 同一个引擎
result = await asyncio.wait_for(handler.runQuery(), timeout=30.0)
return {"content": [{"type": "text", "text": "".join(capture.chunks)}],
"isError": False}
30 秒超时是 MCP 这侧硬编码的护栏(mcp_wrapper.py:342);超时返回 isError: True 并提示用户换更具体的站/更简单的问题(mcp_wrapper.py:344-354)。
流式变体handle_streaming_tools_call(mcp_wrapper.py:213):当 MCP 请求要 streaming 时,改用 SSE,把每个 chunk 包成 function_stream_event 事件推出,最后发 function_stream_end。
横向对比:本 ai-protocol-reference 货架还有一个 MCP-spec 子库(其文档如
docs/mcp-spec/03-server-primitives.md讲 tools 这类 server primitive、docs/mcp-spec/01-jsonrpc-and-messages.md讲 JSON-RPC 消息;这些路径属于该子库,不是 nlweb 仓库内文件),描述的是「协议长什么样」;NLWeb 的mcp_wrapper.py则是「一个真实服务怎么把自己实现成符合该协议的 server」。两边对照读,能把抽象规范和具体落地接起来。
3.3 统一的 Message 协议
引擎内部一切对外输出都是 Message(core/schemas.py:96)。它的设计原则写在类注释里:把「谁发的」(sender_type) 和「发了什么」(message_type) 分开。
sender_type:user / assistant / system(schemas.py:15)。message_type:query / result / nlws(生成的答案) / intermediate_message / error / tool_selection… (schemas.py:30)。
一组便捷构造函数生成常见消息:create_assistant_result(搜索结果,schemas.py:286)、create_assistant_answer(生成式答案,schemas.py:316)、create_status_message、create_error_message 等。ranking 发结果时就调 create_assistant_result(ranking.py:396)。
3.4 流式发送与协议版本
所有消息经 MessageSender.send_message(core/utils/message_senders.py:297)发出。它做三件事:补元数据 → 存档(无论流不流式都存,供返回完整消息列表)→ 若流式则推给 http_handler。
这里有个协议版本分叉(message_senders.py:316):
# 示意,非源码:v0.55 与旧版的流式差异
if is_v055() and message.type == "result" and isinstance(content, list):
for item in content: # v0.55: 每条结果一个具名 SSE 事件
await http.write_sse_event("result", {"index": i, "item": item})
else:
await http.write_stream(message) # 旧版: 整条消息一个 data 事件
v0.55 协议把 「一批结果」拆成逐条具名 event: result 事件(带递增 index),更利于客户端增量渲染;旧版则整条消息塞进一个无名 data 事件。第一条 result 发出时还会插一个「首结果耗时」头(message_senders.py:308,send_time_to_first_result),用于性能观测。
3.5 把主线和协议接起来
回看 01 章的 runQuery:send_begin_response / send_end_response(baseHandler.py:325、baseHandler.py:340)就是 MessageSender 在请求两端各发一个边界消息;中间 ranking 的 sendAnswers、post-ranking 的地图/摘要消息,全都经同一个 send_message 通道流出。协议层不感知业务,业务层不感知协议——中间隔着 Message 这层契约。
巧妙之处
- 引擎与协议解耦:
NLWebHandler只依赖一个有write_stream的对象,于是 REST、MCP、SSE、甚至「收集到内存」(ChunkCapture)都能复用同一引擎(mcp_wrapper.py:322)。 - sender 与 type 正交:避免老式「message_type 既表谁发又表内容」的纠缠(
schemas.py:441create_legacy_message标注的正是要摆脱的旧形态)。 - 同一个 ask,人和 agent 共用:这正是 NLWeb 的核心主张——结构化 + 自然语言端点,既给人类聊天 UI,也给 MCP agent,一套实现两类消费者。
边界与局限
- MCP 那层多处把
initialize检查注释掉了(mcp_wrapper.py:77-86),为兼容不规范客户端而放宽,严格性打了折。 - 30 秒超时只在 MCP 外壳,REST 路径靠别处控制;长查询在 MCP 下会被硬切。
- A2A、OAuth、会话持久化等协议面本次未深入,仅在路由层看到入口(
webserver/routes/)。
代码地图
| 主题 | 文件 | 符号 |
|---|---|---|
| MCP 路由 | webserver/routes/mcp.py | setup_mcp_routes / mcp_handler |
| MCP 协议处理 | webserver/mcp_wrapper.py | MCPHandler.handle_request |
| 列工具 | webserver/mcp_wrapper.py | handle_tools_list |
| 执行 ask 工具 | webserver/mcp_wrapper.py | handle_tools_call |
| MCP 流式 | webserver/mcp_wrapper.py | handle_streaming_tools_call |
| 消息结构 | core/schemas.py | Message / SenderType / MessageType |
| 结果消息构造 | core/schemas.py | create_assistant_result / create_assistant_answer |
| 流式发送 / 版本分叉 | core/utils/message_senders.py | MessageSender.send_message |