跳到主要内容

消息流水线:洋葱模型

这章讲什么: AstrBot 收到任何一条消息,都让它从同一条「流水线」走一遍。本章讲清楚这条流水线由哪 9 个阶段组成、它们的顺序、以及一个巧妙的实现:用异步生成器递归地实现「洋葱模型」,让阶段既能在消息处理前插手、又能在处理后插手。

1. 它要解决的小问题

收到一条聊天消息后,在真正「回复」之前,有一堆通用判断要做:

  • 这条消息是发给机器人的吗?(群里别人随便聊天,机器人不该乱插嘴)
  • 这个用户/群在白名单里吗?会话开关开着吗?有没有超频?
  • 内容安全吗?要不要预处理?
  • 该交给哪个插件处理,还是直接问大模型?
  • 回复要不要加前缀、转成图片或语音?怎么按平台格式发出去?

如果把这些逻辑全塞进一个大函数,会乱成一团且难扩展。AstrBot 的做法:把它们拆成一串有序的「阶段(Stage)」,像工厂流水线一样依次处理同一个事件对象。

2. 思路/直觉:有序的阶段链

核心抽象只有一个:Stage。每个阶段是一个类,有一个 process(event) 方法,接收同一个 AstrMessageEvent,可以读它、改它、甚至「叫停」它。

阶段的执行顺序是写死的一张清单(stage_order.py):

stage_order.py: STAGES_ORDER
──────────────────────────────────
1. WakingCheckStage 检查是否需要唤醒(该不该理这条消息)
2. WhitelistCheckStage 群聊/私聊白名单
3. SessionStatusCheckStage 会话是否整体启用
4. RateLimitStage 频率限制
5. ContentSafetyCheckStage 内容安全
6. PreProcessStage 预处理
7. ProcessStage ★核心:交给插件,或调 LLM/Agent
8. ResultDecorateStage 装饰结果(加前缀/转图片/转语音)
9. RespondStage 真正把消息发出去

引用:阶段顺序定义在 astrbot/core/pipeline/stage_order.py:3(STAGES_ORDER),每项的中文注释就说明了它干什么。

3. 入口:事件总线把事件喂进流水线

流水线不是凭空运行的,它由事件总线驱动。事件总线维护一个异步队列,无限循环地取事件、找到对应的流水线调度器、启动一次执行。

# 示意,非源码 —— 事件总线的核心循环
async def dispatch():
while True:
event = await event_queue.get() # 取出一个消息事件
conf_id = config_mgr.get_conf_info(
event.unified_msg_origin)["id"] # 该会话用哪套配置
scheduler = scheduler_mapping[conf_id] # 对应的流水线调度器
task = asyncio.create_task(scheduler.execute(event)) # 并发执行

真实实现见 astrbot/core/event_bus.py:39(EventBus.dispatch)。重点:每个事件用 create_task 并发处理,所以多个会话能同时跑;_pending_tasks 持有强引用防止任务被 GC(event_bus.py:52)。每套配置(conf_id)有独立的流水线调度器,这让「不同群用不同配置」成为可能。

4. 核心机制:洋葱模型(为什么不是简单的顺序执行)

4.1 问题:有些阶段需要「前后夹住」后续阶段

想象「发送消息后要记日志」或「整个处理用一个 session 锁包起来」这种需求——它需要在后续所有阶段跑完之后再做点事。简单的 for stage in stages: stage.process() 做不到这点,因为一个阶段 return 后就彻底交棒了。

4.2 思路:阶段可以是「异步生成器」,yield 是分界点

AstrBot 让 process() 可以返回两种东西:

  • 普通协程(不含 yield):跑完就交给下一阶段。这是「基线」情况。
  • 异步生成器(含 yield):yield 之前是前置处理,yield 之后是后置处理。调度器在它 yield 的那一刻,转去跑所有后续阶段,跑完再回到 yield 之后继续。

这样就形成了洋葱:外层阶段的「后置处理」包住了所有内层阶段。

WakingCheck.process()
│ 前置: 判断唤醒
│ yield ───────────┐
│ ▼
│ Whitelist.process()
│ │ 前置
│ │ yield ──────┐
│ │ ▼
│ │ ... ProcessStage(调 Agent) ...
│ │ │
│ │ 后置 ◄──────┘ (内层全跑完才回来)
│ 后置 ◄──────┘

4.3 真实实现

调度器用递归实现这个洋葱。_process_stages 从第 i 个阶段开始,遇到异步生成器就在 yield 点递归调用自己处理 i+1 之后的阶段:

# 示意,非源码 —— 洋葱模型的递归核心
async def _process_stages(event, from_stage=0):
for i in range(from_stage, len(stages)):
coroutine = stages[i].process(event)
if isinstance(coroutine, AsyncGenerator):
async for _ in coroutine: # 跑到 yield(前置已完成)
if event.is_stopped(): break
await _process_stages(event, i + 1) # 递归跑后续阶段
if event.is_stopped(): break # 回来后,继续 yield 之后(后置)
else:
await coroutine # 普通协程:跑完即可
if event.is_stopped(): break

真实代码见 astrbot/core/pipeline/scheduler.py:35(PipelineScheduler._process_stages)。isinstance(coroutine, AsyncGenerator) 那行(scheduler.py:50)是判断洋葱 vs 基线的关键。

关键细节:event.is_stopped() 是熔断开关。 任何阶段调 event.stop_event() 后,后续阶段和外层的后置处理都会 break 跳出——这就是「机器人决定不理这条消息」的实现方式。Stage 抽象基类定义在 astrbot/core/pipeline/stage.py:19,register_stage 装饰器(stage.py:13)把实现类登记进 registered_stages,再由调度器按 STAGES_ORDER 排序(scheduler.py:22)。

5. 第一个阶段细看:唤醒判定(该不该理这条消息)

这是最有「业务味」的阶段,也是新手最常困惑的地方:为什么群里发消息机器人不回? 答案就在这里。

唤醒成立的条件(任一即可)如下表。其中前 4 行直接对应 astrbot/core/pipeline/waking_check/stage.py:36 的类 docstring(docstring 列出 5 条);「At 全体成员」「私聊」两行的判定逻辑则来自代码本身——私聊唤醒见 stage.py:141-:145,At 全体成员见对 AtAll 消息段的处理:

条件说明出处
被 @ 了群聊里 At 了机器人本身docstring 条 1
引用了机器人的消息Reply 的 sender 是机器人docstring 条 2
命中唤醒前缀消息以 wake_prefix(如 /)开头,且开头不是 At 别人docstring 条 3
插件指令命中某个插件 handler 的 filter 通过docstring 条 4
私聊默认私聊直接唤醒(除非配置要求私聊也要前缀)stage.py:141-:145
At 全体成员且未配置忽略 AtAll代码 AtAll 判定(非 docstring)

判定逻辑见 waking_check/stage.py:102(检查 wake_prefix)到 :145(私聊唤醒)。命中前缀时会把前缀从消息里剥掉(stage.py:119),这样下游 Agent 拿到的就是干净的用户输入。

这个阶段还顺手做了插件路由。 它遍历所有注册了「适配器消息事件」的插件 handler,逐个跑它们的 filter(指令匹配、权限、正则等),把命中的塞进 activated_handlers(stage.py:160-:234),供后面的 ProcessStage 用。权限不足时会直接回复并 stop_event()(stage.py:210)。

最后一道闸: 如果走完所有判定 is_wake 仍为 False,就 event.stop_event()(stage.py:237-238)——整条流水线在这里熔断,机器人保持沉默。

6. 核心阶段:ProcessStage 怎么分流

ProcessStage 是第 7 个阶段,决定这条消息交给插件处理还是交给 Agent。逻辑见 astrbot/core/pipeline/process_stage/stage.py:28(ProcessStage.process):

ProcessStage.process

有 activated_handlers?
├── 是 ──► StarRequestSubStage 跑插件 handler
│ └─ handler 若返回 ProviderRequest(想调 LLM)
│ ──► 再走 AgentRequestSubStage

└── 否 ──► 满足「被 @/唤醒 且未禁用 LLM」?
└─ 是 ──► AgentRequestSubStage(默认问大模型)
  • 插件分支:star_request_sub_stage.process(event)(stage.py:38)。如果插件 handler 自己发起了 LLM 请求(返回 ProviderRequest),会把它设进 provider_request 再转 Agent 分支(stage.py:41-48)。
  • Agent 分支:当没有插件命中、但消息确实是冲机器人来的(event.is_at_or_wake_command)且没被禁用 LLM,就走 agent_sub_stage.process(stage.py:65)。这条分支进入下一章的主角——本地 Agent 工具循环。

AgentRequestSubStage 还会根据配置 agent_runner_type 选「本地 Agent」还是「第三方 Agent」(Dify/Coze 等),见 process_stage/method/agent_request.py:27-:31

7. 收尾两阶段:装饰与发送

  • ResultDecorateStage(result_decorate/stage.py:22):触发 OnDecoratingResultEvent 插件钩子(:160),按概率把文本转语音(TTS,:256-:264),或转图片(t2i),加回复前缀等。
  • RespondStage(respond/stage.py:19):真正把 MessageEventResult 交给平台适配器发出。流式结果走 event.send_streaming(:223),普通结果走 event.send,发完触发 OnAfterMessageSentEvent 钩子(:315)。

8. 巧妙之处(可借鉴)

  • 洋葱模型用「异步生成器是否含 yield」自动区分前后置,不需要阶段显式声明自己要不要后置处理——写 yield 就有,不写就没有。代码极简却表达力强(scheduler.py:50)。
  • 每套会话配置独立一条流水线(core_lifecycle.py:425 load_pipeline_scheduler),天然支持「不同群不同行为」,无需在阶段里到处写 if。
  • stop_event() 作为统一熔断,任何阶段都能在任意深度终止整条链,且会正确触发所有外层的提前退出(scheduler.py:54:64:74)。

9. 边界与局限

  • 阶段顺序是硬编码的(STAGES_ORDER),插件无法插入新的「阶段」,只能在既有阶段提供的钩子点(如 OnLLMRequestEvent)挂逻辑。
  • 洋葱递归对每个 yield 都递归一层,阶段数固定(9 个)所以深度可控;但要理解控制流需要读懂这段递归,门槛略高。

10. 代码地图(导航索引)

主题文件路径符号名
阶段执行顺序astrbot/core/pipeline/stage_order.pySTAGES_ORDER
阶段抽象基类 + 注册astrbot/core/pipeline/stage.pyStageregister_stageregistered_stages
流水线调度(洋葱递归)astrbot/core/pipeline/scheduler.pyPipelineScheduler._process_stagesPipelineScheduler.execute
事件总线派发astrbot/core/event_bus.pyEventBus.dispatch
唤醒判定astrbot/core/pipeline/waking_check/stage.pyWakingCheckStage.process
处理阶段分流astrbot/core/pipeline/process_stage/stage.pyProcessStage.process
Agent vs 第三方分流astrbot/core/pipeline/process_stage/method/agent_request.pyAgentRequestSubStage
装饰阶段astrbot/core/pipeline/result_decorate/stage.pyResultDecorateStage
发送阶段astrbot/core/pipeline/respond/stage.pyRespondStage
生命周期装配流水线astrbot/core/core_lifecycle.pyAstrBotCoreLifecycle.load_pipeline_scheduler