跳到主要内容

04 · 传输层与跨语言一致性

这一章讲底层:事件怎么在网络上走、为什么有「chunk 简写事件」、以及同一份协议怎么在 TypeScript 和 Python 之间保持一致。

4.1 传输与语义解耦:一条事件流可以走任何管道

AG-UI 刻意把「事件是什么」和「事件怎么传」分开。README 说它「works with any event transport(SSE/WebSockets/webhooks)」。客户端的传输入口是 transformHttpEventStream(packages/client/src/transform/http.ts:13),它先缓冲、读到 HTTP headers 再按 content-type 决定解析器:

// packages/client/src/transform/http.ts:43-51(节选)
if (contentType === proto.AGUI_MEDIA_TYPE) {
parseProtoStream(bufferSubject).subscribe(/* protobuf 二进制 */);
} else {
parseSSEStream(bufferSubject, log).subscribe(/* SSE + JSON */);
}

两种格式:

格式media type解析器何时用
SSE + JSON(默认)非 proto 的任意(通常 text/event-stream)parseSSEStream + EventSchemas.parse绝大多数场景
protobuf(二进制)application/vnd.ag-ui.event+protoparseProtoStream高吞吐/省带宽

AGUI_MEDIA_TYPE 这个常量是二进制协议的「暗号」,TS 与 Python 各定义一份且字面值完全相同(packages/proto/src/index.ts:3sdks/python/ag_ui/encoder/encoder.py:7)。

4.2 SSE 解析:严格遵循标准,处理三件麻烦事

parseSSEStream(packages/client/src/transform/sse.ts)按 SSE 标准把字节流切成 JSON,处理了三个真实麻烦:

  • UTF-8 跨块:用 new TextDecoder("utf-8"){ stream: true } 模式,避免一个多字节字符被切在两个网络块之间时乱码。
  • 事件按 \n\n 分隔、可跨块:buffer 累积,split(/\n\n/) 后把最后一段不完整的留在 buffer 里等下一块。
  • 只取 data: 行、忽略 event:/id:/retry::多行 data 拼接后再 JSON.parse

后端的编码方对称地简单——Python 的 EventEncoder._encode_sse 就一行(sdks/python/ag_ui/encoder/encoder.py:28-33):

# sdks/python/ag_ui/encoder/encoder.py(节选)
def _encode_sse(self, event: BaseEvent) -> str:
return f"data: {event.model_dump_json(by_alias=True, exclude_none=True)}\n\n"

注意 by_alias=True:Python 端字段用 snake/别名,序列化时按协议的 camelCase 别名输出,这样 wire 上和 TS 完全对齐。

4.3 chunk 事件:给后端的「简写」,在客户端就地展开

协议里有三个 *_CHUNK 事件(TEXT_MESSAGE_CHUNK/TOOL_CALL_CHUNK/REASONING_MESSAGE_CHUNK)。它们不是新语义,而是「start+content 合一」的便捷写法——后端不想显式发 START/END 时,可以只发 chunk,客户端的 transformChunks(ch02 工序②)会自动补出标准的 START/CONTENT/END。

它的工作方式是个小状态机,记录当前处于 text/tool/reasoning 哪种「模式」(chunks/transform.ts:42):

  • 来一个 chunk,若当前不在对应模式(或 id 变了),先关掉上一个(补一个 *_END),再开一个新的(补一个 *_START),然后把 delta 转成 *_CONTENT
  • 流结束时 finalize 里补上最后一个 *_END(chunks/transform.ts:314-317),保证不留未闭合的消息。
收到: TOOL_CALL_CHUNK{id=c1,name=f,delta="{"} TOOL_CALL_CHUNK{delta="}"} <流结束>
展开: TOOL_CALL_START{c1,f} → TOOL_CALL_ARGS{c1,"{"} → TOOL_CALL_ARGS{c1,"}"} → TOOL_CALL_END{c1}

这解释了 ch03 为什么 apply 阶段见到 chunk 就 throw:chunk 必须在 verify/apply 之前被展开,否则下游的状态机和 reducer 不认识它。

4.4 内容协商:后端怎么知道该发 SSE 还是 protobuf

编码侧(@ag-ui/encoder)用标准 HTTP 内容协商。EventEncoder 看请求的 Accept 头,用 preferredMediaTypes(从 negotiator 库改写而来,encoder/src/media-type.ts)判断客户端是否接受 proto:

// packages/encoder/src/encoder.ts:64-67(节选)
const preferred = preferredMediaTypes(acceptHeader, [proto.AGUI_MEDIA_TYPE]);
return preferred.includes(proto.AGUI_MEDIA_TYPE); // 接受就发二进制,否则 SSE

这是教科书式的 HTTP 做法:同一个端点,按 Accept 头决定响应格式,客户端和服务端不用预先约定。

4.5 跨语言一致性:同一份协议,各语言一份同形 schema

AG-UI 的协议本体没有用一个跨语言的 IDL 单一来源(proto 只覆盖二进制传输);事件/类型的「真源」是各语言手写但刻意同形的 schema:

关切TypeScript(Zod)Python(Pydantic)
事件类型packages/core/src/events.ts EventTypesdks/python/ag_ui/core/events.py
数据模型packages/core/src/types.tssdks/python/ag_ui/core/types.py
能力声明packages/core/src/capabilities.tssdks/python/ag_ui/core/capabilities.py
SSE 编码@ag-ui/encoderag_ui/encoder/encoder.py

一致性靠几件事撑住:

  • 字段别名对齐:Python 序列化 by_alias=True,wire 上是同一套 camelCase 名。
  • 宽松校验:BaseEventSchema.passthrough()(ch01)让一端新增字段不会让另一端解码失败。
  • 兼容补丁:为对方序列化习惯让路,如 TS 接受 Python 输出的 "outcome": null(ch01 §1.5)。
  • 客户端中间件做版本翻译(ch02 §2.4):老事件形态在客户端就地升级。

这是一种「靠纪律而非工具保证一致」的策略:好处是各语言能用最地道的 schema 库(Zod/Pydantic)、错误信息友好;代价是新增/修改事件得在多处同步,且像 verifyEvents 这种「规则即代码」的逻辑,严格说每个语言都得各实现一遍。

4.6 能力声明:让客户端「先问后用」

AgentCapabilities(packages/core/src/capabilities.ts:220)是个分类齐整的「我支持什么」清单:transport(streaming/websocket/httpBinary/resumable…)、toolsstate(snapshots/deltas)、reasoning(含 encrypted 零数据保留)、humanInTheLoop(含 interrupts/approveWithEdits)、multimodal 等,全部字段可选。AbstractAgent.getCapabilities?()(agent.ts:146)是可选实现——agent 自报能力,客户端据此决定显示哪些 UI(文件上传按钮、批准对话框…)。所有字段「省略 = 未声明(未知)」而非「不支持」(schema 顶部注释明说),这是个谨慎的语义选择。

4.7 边界与局限(诚实)

  • 运行时以 TypeScript 为主:verifyEvents/defaultApplyEvents 这类「消费端引擎」目前最完整的是 TS(packages/client);Python 等 SDK 侧重协议类型与编码,消费端引擎的完整度需按具体 SDK 核实,本文未逐一验证非 TS 的客户端折叠逻辑 (inferred)。
  • 没有单一 IDL 真源:协议契约靠各语言手写同形 schema + 纪律保持一致(§4.5),不是从一份 schema 自动生成全部语言。
  • 顺序规则散落在状态机:合法事件顺序的权威是 verifyEvents 代码而非独立规范,跨语言对齐靠各自重写。
  • 传输适配仍以 HTTP/SSE 为中心:WebSocket/webhook 在能力枚举里有位置,但本文只核实了 HTTP+SSE 与 protobuf 两条解码路径的真实实现。

4.8 横向对比(同 shelf 兄弟协议)

README 自己点明了三件套的分工——这是理解 AG-UI 定位最好的一句话:

协议解决什么一句话
MCP给 agent 工具agent ↔ 工具/数据源
A2Aagent 之间通信agent ↔ agent
AG-UI把 agent 带进用户界面agent ↔ 用户/UI

和 MCP 对比一个具体取舍:MCP 是请求/响应 + 工具发现为主的 JSON-RPC 风格;AG-UI 是单向事件流 + 客户端状态折叠为主——因为 UI 场景的本质是「实时回放一个正在发生的过程」,而不是「调一个函数拿返回值」。

4.9 代码地图

主题文件符号名
按 content-type 选解析器packages/client/src/transform/http.tstransformHttpEventStream
SSE 解析(UTF-8/跨块/多行 data)packages/client/src/transform/sse.tsparseSSEStream
protobuf 解析packages/client/src/transform/proto.tsparseProtoStream
chunk 事件展开packages/client/src/chunks/transform.tstransformChunks
二进制 media type 常量packages/proto/src/index.tsAGUI_MEDIA_TYPE
内容协商编码器packages/encoder/src/encoder.tsEventEncoder
Python SSE 编码sdks/python/ag_ui/encoder/encoder.pyEventEncoder._encode_sse
能力声明packages/core/src/capabilities.tsAgentCapabilitiesSchema