跳到主要内容

04 · 中间件、上下文压缩与深入

这一章讲两件让 AgentScope「能扩展、能省钱」的机制——洋葱中间件和上下文压缩——再收尾巧妙之处、边界、横向对比、代码地图。

4.1 中间件:不改源码就能拦截循环

要解决的小问题

你想给 agent 加日志、加 token 预算上限、在每次推理前注入检索到的资料(RAG)、加分布式 tracing。这些都是横切关注点:它们不属于任何单个步骤,却要插进多个步骤。直接改 Agent 源码会把它撑爆。

思路:洋葱模型 + 钩子点

AgentScope 在循环的 5 个关键节点留了钩子,中间件像洋葱一层层包住真逻辑(middleware/_base.py:12):

钩子包住什么模式
on_reply整个 reply 过程洋葱(前后逻辑)
on_reasoning推理/模型调用阶段洋葱
on_acting单个工具的纯执行(不含权限/写上下文)洋葱
on_model_call最底层的模型 API 调用洋葱
on_compress_context上下文压缩洋葱
on_system_prompt系统提示字符串流水线(顺序变换)

前四个是「洋葱」:能在 next_handler() 前后插逻辑、能改输入、能转换产出。on_system_prompt 是「流水线」:多个中间件顺序应用,每个吃上一个的输出(agent/_agent.py:2030)。

实现:_xxx / _xxx_impl 夹层

index 提过的承重墙在这里展开。以 _reasoning 为例(agent/_agent.py:728):外层方法先看有没有注册 on_reasoning 中间件——没有就直接走 _reasoning_impl(零开销);有就构造一条 execute_chain 把它们串成洋葱:

# 示意,非源码:洋葱链怎么递归织起来
async def execute_chain(index=0, **kw):
if index >= len(middlewares):
async for item in self._reasoning_impl(**kw): # 洋葱最里层 = 真逻辑
yield item
else:
async def next_handler(**kw): # 传给中间件的「下一层」
async for item in execute_chain(index + 1, **kw):
yield item
async for item in middlewares[index].on_reasoning(
agent=self, input_kwargs=kw, next_handler=next_handler):
yield item

Agent.__init__(agent/_agent.py:167)启动时就按实现的钩子把中间件分桶(_reasoning_middlewares_acting_middlewares……),靠 is_implemented(middleware/_base.py:52,比较子类方法是否覆盖了基类)判断。于是运行时不必每次过滤,且没注册的钩子完全不进洋葱逻辑。

仓库自带的中间件(middleware/ 下)就是范例:_budget.py(预算)、_rag.py(检索增强)、_tracing/(链路追踪)、_longterm_memory/(长期记忆)、_tts_middleware.py(语音合成)。

4.2 上下文压缩:对话太长怎么办

要解决的小问题

对话越滚越长,迟早撑爆模型上下文窗口,而且越长越贵。需要在不丢关键信息的前提下「瘦身」。

思路:把旧对话换成结构化摘要

compress_context_compress_context_impl(agent/_agent.py:312)的流程:

1. 数当前 token,< trigger_ratio × context_size? ─是─▶ 啥都不做
2. 把上下文切成「要压缩的旧消息」+「保留的近消息」
切点由 reserve_ratio 决定(_split_context_for_compression)
3. 让模型对「旧消息」做结构化摘要(generate_structured_output)
摘要 schema = SummarySchema:任务概览/当前状态/重要发现/下一步/需保留的上下文
4. 用 summary_template 把摘要格式化,存进 state.summary
5. 若配了 offloader:把被压缩掉的原文 offload 到文件,
在摘要末尾附一句「原文在 '<path>',需要可去查」
6. state.context = 保留的近消息(旧的被摘要取代)

触发阈值与比例在 ContextConfig(agent/_config.py:51):trigger_ratio 默认 0.8(超过上下文 80% 触发),reserve_ratio 默认 0.1(保留约 10% 的近消息)。摘要不是随便截断,而是让模型按 SummarySchema(agent/_config.py:9)填五个结构化字段——为「未来的自己能接着干活」而写,这正是 compression_prompt 里那句「写一份能让你在未来上下文窗口里高效续工的延续摘要」。

巧妙之处:边界消息的「半压缩」

切分上下文时(_split_context_for_compression,agent/_agent.py:1730),从后往前累加 token 找切点,但切点正好落在某条消息中间怎么办?代码进一步在该消息的内容块层面再切一刀(agent/_agent.py:1788),而且避免把一对 tool_call 和它的 tool_result 拆散(agent/_agent.py:1804,扫描保留区里「有结果却没对应调用」的块,调整切点)。这种「连工具调用/结果配对都不破坏」的细致,是裸截断做不到的。

兜底:压缩本身也可能超长

如果连「压缩用的 prompt」都超了上下文,代码标记 context_overflow,压缩失败时逐条丢最老的消息再试(agent/_agent.py:462),直到能塞下。

4.3 工具结果的截断与 offload

超长工具结果(比如 Read 一个巨大文件)会瞬间撑爆上下文。_split_tool_result_for_compression(agent/_agent.py:1859)在结果超过 tool_result_limit(默认 50000 token,agent/_config.py:112)时把它切成「保留部分 + offload 部分」:

  • 找到刚好不超限的边界块;若边界块是文本,按 token 比例截断文本(agent/_agent.py:1936)。
  • 保留部分加一句 <<<TRUNCATED>>> reminder;配了 offloader 的话,溢出部分写到文件,reminder 里附上路径(agent/_agent.py:1503),模型需要时能去读。

Offloader 是个 Protocol(workspace/_offload_protocol.py),只定义了 offload_context / offload_tool_result——具体落盘交给工作区后端(本地 / Docker / E2B,见 workspace/ 下各实现)。

配套:Read 缓存的清理

压缩后还要清理「不再被保留消息引用的 Read 缓存」(_clear_unreserved_read_cache,agent/_agent.py:1833)——它扫描保留消息里的 Read 工具调用收集文件路径,把缓存里不在这批路径中的条目丢掉(state/_state.py:121clean_file_cache)。缓存本身是带 LRU + 字节上限的(ToolContext,state/_state.py:31)。

4.4 顺带一提:Toolkit 的分组与按需激活

工具不是全摊给模型,而是按 ToolGroup 分组(tool/_toolkit.py:66)。basic 组永远可用,其余组默认不激活——模型要用时先调内建的 meta-tool(ResetTools)激活对应组(tool/_toolkit.py:540check_tool_available 会拦未激活组的调用并提示先激活)。这让「工具很多但当前任务只需几个」时不必把全部 schema 塞进每次模型调用,省 token、降干扰。技能(Skill)走类似机制:不是直接调,而是先用 SkillViewer 读说明再按说明操作(tool/_toolkit.py:51 的指令模板)。

4.5 巧妙之处汇总(可借鉴)

  • 状态机驱动循环(_check_next_action,agent/_agent.py:2297):下一步由「上一条消息里工具调用的状态」推导,不写死步骤——这是暂停/续跑能优雅成立的根。
  • 输出协议化为事件流(event/_event.py):换来边生成边渲染、流中暂停、多端订阅。
  • 危险程度编码进决定(PermissionDecision.bypass_immune,permission/_decision.py):工具只标「这是安全 ASK」,引擎按模式决定怎么对待。
  • _acting 只包纯 I/O(agent/_agent.py:1574):权限/写上下文都在它外面,于是把它丢后台任务也安全(middleware/_base.py:114)。
  • 并发工具执行用哨兵保证不丢事件(agent/_agent.py:1281):gather 之后才放哨兵 = 所有事件已入队。
  • 压缩不破坏 tool_call/tool_result 配对(agent/_agent.py:1804)。
  • 音频块不进记忆(_save_to_context,agent/_agent.py:2245):助手说出来的语音只走流式事件,原始字节不污染对话历史。

4.6 边界与局限(诚实)

  • BYPASS 模式真的会执行危险操作。 它故意跳过所有安全 ASK(permission/_engine.py:337),只剩用户的 deny 规则当护栏。文档明说只在沙箱/容器或完全信任时用。
  • is_state_injected 工具 + 后台 offload 有并发隐患。 这类工具拿到的是活的 agent.state,丢后台跑可能并发改状态——源码里挂着 TODO 未解决(agent/_agent.py:1636middleware/_base.py:131)。
  • 压缩依赖模型的结构化输出能力。 摘要靠 generate_structured_output;若模型不擅长结构化输出,摘要质量会打折。
  • token 计数频繁。 压缩切分过程多次调 count_tokens(逐消息、逐块试探),对大上下文有成本。
  • 本文未深入的部分: app/ 的 FastAPI 多租户/多会话服务层、RAG、embedding、各家 formatter/model provider 实现——本文只确认了它们的存在与边界,未逐行精读。

4.7 横向对比(同 shelf 兄弟)

AgentScope 属于 multi-agent area。和兄弟项目的取舍差异:

  • vs agency-swarm:agency-swarm 不重写推理循环(那是 OpenAI Agents SDK 干的),只加「组织编排 + 通信流 + 共享历史」;AgentScope 自己拥有整个 reasoning-acting 循环,并把它做成可流式、可拦截、带权限引擎的核心资产。
  • 就「输出协议」而言:多数框架返回文本或简单 chunk,AgentScope 把输出做成细粒度 start/delta/end 事件协议(见 02 章),更偏向「为生产前端而设计」。
  • 就「安全」而言:它把权限做成独立引擎 + 多模式 + bypass-immune 安全门(03 章),这在 agent 框架里属于较重、较完整的一套。
  • 跨库原理对照见总库 doc 的「agent 循环」「对世界采取动作」分支:docs/index.md

4.8 代码地图(导航索引)

主题文件关键符号
主循环入口agent/_agent.pyAgent.reply_stream / Agent.reply / _reply_impl
下一步推导(状态机)agent/_agent.py_check_next_action / _get_executable_tool_calls
reasoning 实现agent/_agent.py_reasoning_impl / _prepare_model_input / _call_model
工具分批与并发agent/_agent.py_batch_tool_calls / _execute_concurrent_tool_calls / _into_queue
单工具生命周期agent/_agent.py_execute_tool_call / _acting / _acting_impl / _handle_error_tool_call
事件→上下文翻译agent/_agent.py_convert_chat_response_to_event / _convert_tool_chunk_to_event
上下文压缩agent/_agent.py_compress_context_impl / _split_context_for_compression
工具结果截断/offloadagent/_agent.py_split_tool_result_for_compression / _clear_unreserved_read_cache
事件类型event/_event.pyEventType / AgentEvent / ReplyStartEvent / ToolResultEndEvent
中间件协议middleware/_base.pyMiddlewareBase / is_implemented / on_reasoning / on_acting
权限引擎permission/_engine.pyPermissionEngine.check_permission / _check_default / _check_explore / _is_safety_ask
权限模式/决定permission/_types.py / permission/_decision.pyPermissionMode / PermissionBehavior / PermissionDecision.bypass_immune
工具协议tool/_base.pyToolBase.check_permissions / check_read_only / match_rule / generate_suggestions
工具执行/分组tool/_toolkit.pyToolkit.call_tool / check_tool_available / _get_available_tools
工具结果累积tool/_response.pyToolResponse.append_chunk / ToolChunk
内容块/状态机message/_block.pyToolCallBlock / ToolCallState / ToolResultState
Agent 状态state/_state.pyAgentState / ToolContext / ReadCacheEntry
配置agent/_config.pyContextConfig / ReActConfig / ModelConfig / SummarySchema
模型抽象model/_base.pyChatModelBase.__call__ / _call_api / _get_retryable_exceptions
offload 协议workspace/_offload_protocol.pyOffloader.offload_context / offload_tool_result

→ 回到 index