管道引擎:按「输入就绪度」调度
本章讲 Haystack 的心脏:一张图怎么被
run()调 度执行。读完你能讲清「为什么 Haystack 能跑循环和分支,而不只是顺序流水线」。
1. 图长什么样
管道内部就是一张 networkx.MultiDiGraph(有向多重图):
- 节点 = 一个组件实例,外加它的输入/输出 socket 元数据;
- 边 = 一次
connect(),记着「从哪个输出 socket 到哪个输入 socket」。
图存在 PipelineBase.__init__ 的 self.graph(haystack/core/pipeline/base.py:112,self.graph = networkx.MultiDiGraph())。注意一个刻意的设计:访问次数(visits)不存进图里——
引用:
_get_component_with_graph_metadata_and_visits(base.py:1207)的注释「We can't store visits in the pipeline graph because this would prevent reentrance / thread-safe execution」。它每轮把visits临时注入组件 dict,而不写回图。这是为了让同一张图能被并发/可重入地跑。
2. 核心机制:优先级调度
2.1 它要解决的小问题
顺序流水线很简单(A→B→C 就按顺序跑)。但 Haystack 想支持:
- 分支:路由器只给「是」分支送数据,「否」分支的组件永远等不到输入;
- 合并:一个组件等多个上游都到齐了再跑;
- 循环:组件被反复执行(Agent 的雏形、重试……)。
纯拓扑排序做不到这些。Haystack 的答案是:不预先排序,而是每轮重新判断「谁现在能跑」。
2.2 思路/直觉
给每个组件算一个优先级枚举(ComponentPriority,base.py:75):
| 优先级 | 含义 | 值 |
|---|---|---|
HIGHEST | 有 greedy 输入到了,且所有输入齐了——立刻跑 | 1 |
READY | 所有上游都执行完了,可以跑 | 2 |
DEFER | 还能跑,但建议再等等(lazy variadic 还可能来更多输入) | 3 |
DEFER_LAST | 最该往后拖 | 4 |
BLOCKED | 必填输入没齐 / 没被触发——根本不能跑 | 5 |
值越小越优先。引擎把组件丢进一个 FIFOPriorityQueue,每轮取最小值那个跑。
2.3 「能不能跑」的两道门
一个组件能跑,要同时过两道门(can_component_run,component_checks.py:12):
能跑 = 收齐了所有必填输入 AND 被触发过
(mandatory inputs) (trigger)
触发有三种来源(has_any_trigger,component_checks.py:28):
1. 某个上游给它送了输入 (predecessor)
2. 用户从管道外直接喂了输入 (user input, 且 visits==0)
3. 它根本没有任何上游连入 (no incoming, 且 visits==0)
—— 这类「源头组件」靠 run() 被调用本身触发
注释里点明了关键不变量(component_checks.py:37-40):一个触发只能让组件跑一次,因为输入在执行前会被「消费」(删除),外部输入和 run() 调用都只在 visits==0 时算触发。这避免了无限循环。
2.4 优先级怎么算
_calculate_priority(base.py:1184)把上面的判断翻译成枚举:
# 真实逻辑的白话版(示意,非源码;真实见 base.py:1194-1205)
if not can_component_run(comp, inputs):
return BLOCKED
if 有 greedy socket 到了 and 所有 socket 都齐了:
return HIGHEST
if 所有上游都执行完了:
return READY
if 所有 lazy variadic socket 都已定型:
return DEFER
return DEFER_LAST
2.5 主循环
Pipeline.run(pipeline.py:114)的 核心是一个 while True:
_get_next_runnable_component(base.py:1225)从优先队列取最高优先的;- 若取到的是
BLOCKED→ 整个管道卡住了,退出(并诊断哪个组件堵了,base.py:1348_find_components_blocking_pipeline); - 若是
DEFER/DEFER_LAST且队列里还有别的 → 用拓扑序做 tiebreak(_tiebreak_waiting_components,base.py:1299); _consume_component_inputs(base.py:1109)把该组件的输入从全局状态里取出并删掉(这就是「消费」);_run_component(pipeline.py:43)调instance.run(**inputs),校验输出是Mapping;_write_component_outputs(base.py:1429)把输出按图里的边写进下游输入槽;- 若队列「过期」(有更高优先的组件出现了)→ 重建队列(
_is_queue_stale,base.py:1514)。
循环退出时,返回 pipeline_outputs——默认只含叶子组件(无出边)的输出,除非你用 include_outputs_from 显式要中间结果(见 run docstring,pipeline.py:198-202)。
关键流程图(怎么读:从上到下是一轮迭代,循环回到顶部直到无可跑组件):
┌─────────────────────────────────────┐
│ 从优先队列取最高优先组件 │◀──────────┐
└─────────────────────────────────────┘ │
│ │
┌────────┴─────────┐ │
BLOCKED? 可跑 │
│ │ │
▼ ▼ │
退出循环 消费它的输入(删掉) │
│ │
instance.run(**inputs) │
│ │
把输出写进下游输入槽 │
│ │
队列过期? ──是──▶ 重建队列 ───────┘
3. variadic socket:合并与贪婪
一个输入口可以接多个上游,这靠两种特殊类型标注(types.py:23,31):
Variadic[T](lazy):等所有连进来的上游都执行完,把它们的输出收成一个 list 再跑。用于「合并多路结果」(如DocumentJoiner)。GreedyVariadic[T]:收到第一个输入就立刻跑,不等其他。
这两个标注只是给类型「打标记」——InputSocket.__post_init__(types.py:78)检查 type.__metadata__ 是否等于约定的常量字符串(HAYSTACK_VARIADIC_ANNOTATION),来设 is_lazy_variadic / is_greedy。
消费输入时(_consume_component_inputs,base.py:1132-1150)三种 socket 处理不同:
| socket 类型 | 怎么取值 |
|---|---|
| 普通 | 只取第一个输入值 |
| greedy variadic | 取第一个值,包成单元素 list,且总是删除(连用户输入也删,否则会无限触发) |
| lazy variadic | 取所有值(一个 list) |
边界细节:多个 sender 连到同一个 list 型 socket 时,
connect()会把它自动升级成 lazy variadic(connectdocstring,base.py:449-453)。同步Pipeline下,合并出的 list 按 sender 名字字母序排列,不是connect()调用顺序;AsyncPipeline下不保证顺序。
4. 循环与分支怎么被撑起来
- 循环:因为引擎每轮重判优先级、组件可被反复变成
READY,一个组件能跑多次(受max_runs_per_component,默认 100 限制,base.py:93,103)。Agent 的早期形态、重试逻辑都靠这个。 - 分支:路由器组件(
haystack/components/routers/)只往一条输出边送值,另一条边的下游因为「必填输入没齐」永远BLOCKED,自然不跑。
5. 同步 vs 异步
AsyncPipeline(async_pipeline.py)用同一套优先级逻辑,但调度器不同:它用 asyncio 把可跑的组件包成 task 并发执行,用一个 asyncio.Semaphore 限并发(async_pipeline.py:208),无依赖的分支可以真正并行。单个组件若没有 run_async,会被丢到线 程池里跑(async_pipeline.py:83-88,loop.run_in_executor)——注释特意提醒 contextvars(如 tracing span)不会自动传进线程池。(inferred: 「并行」的强弱取决于组件是否实现了 run_async,从 _run_component_async 的两条分支可推得)
下一章:组件本身的契约——@component 到底给一个普通类做了什么,connect() 怎么按类型配对。见 02-component-contract.md。