跳到主要内容

管道引擎:按「输入就绪度」调度

本章讲 Haystack 的心脏:一张图怎么被 run() 调度执行。读完你能讲清「为什么 Haystack 能跑循环和分支,而不只是顺序流水线」。

1. 图长什么样

管道内部就是一张 networkx.MultiDiGraph(有向多重图):

  • 节点 = 一个组件实例,外加它的输入/输出 socket 元数据;
  • = 一次 connect(),记着「从哪个输出 socket 到哪个输入 socket」。

图存在 PipelineBase.__init__self.graphhaystack/core/pipeline/base.py:112self.graph = networkx.MultiDiGraph())。注意一个刻意的设计:访问次数(visits)不存进图里——

引用:_get_component_with_graph_metadata_and_visitsbase.py:1207)的注释「We can't store visits in the pipeline graph because this would prevent reentrance / thread-safe execution」。它每轮把 visits 临时注入组件 dict,而不写回图。这是为了让同一张图能被并发/可重入地跑。

2. 核心机制:优先级调度

2.1 它要解决的小问题

顺序流水线很简单(A→B→C 就按顺序跑)。但 Haystack 想支持:

  • 分支:路由器只给「是」分支送数据,「否」分支的组件永远等不到输入;
  • 合并:一个组件等多个上游都到齐了再跑;
  • 循环:组件被反复执行(Agent 的雏形、重试……)。

纯拓扑排序做不到这些。Haystack 的答案是:不预先排序,而是每轮重新判断「谁现在能跑」

2.2 思路/直觉

给每个组件算一个优先级枚举(ComponentPrioritybase.py:75):

优先级含义
HIGHEST有 greedy 输入到了,且所有输入齐了——立刻跑1
READY所有上游都执行完了,可以跑2
DEFER还能跑,但建议再等等(lazy variadic 还可能来更多输入)3
DEFER_LAST最该往后拖4
BLOCKED必填输入没齐 / 没被触发——根本不能跑5

值越小越优先。引擎把组件丢进一个 FIFOPriorityQueue,每轮取最小值那个跑。

2.3 「能不能跑」的两道门

一个组件能跑,要同时过两道门(can_component_runcomponent_checks.py:12):

能跑 = 收齐了所有必填输入 AND 被触发过
(mandatory inputs) (trigger)

触发有三种来源(has_any_triggercomponent_checks.py:28):

1. 某个上游给它送了输入 (predecessor)
2. 用户从管道外直接喂了输入 (user input, 且 visits==0)
3. 它根本没有任何上游连入 (no incoming, 且 visits==0)
—— 这类「源头组件」靠 run() 被调用本身触发

注释里点明了关键不变量(component_checks.py:37-40):一个触发只能让组件跑一次,因为输入在执行前会被「消费」(删除),外部输入和 run() 调用都只在 visits==0 时算触发。这避免了无限循环。

2.4 优先级怎么算

_calculate_prioritybase.py:1184)把上面的判断翻译成枚举:

# 真实逻辑的白话版(示意,非源码;真实见 base.py:1194-1205)
if not can_component_run(comp, inputs):
return BLOCKED
if 有 greedy socket 到了 and 所有 socket 都齐了:
return HIGHEST
if 所有上游都执行完了:
return READY
if 所有 lazy variadic socket 都已定型:
return DEFER
return DEFER_LAST

2.5 主循环

Pipeline.runpipeline.py:114)的核心是一个 while True

  1. _get_next_runnable_componentbase.py:1225)从优先队列取最高优先的;
  2. 若取到的是 BLOCKED → 整个管道卡住了,退出(并诊断哪个组件堵了,base.py:1348 _find_components_blocking_pipeline);
  3. 若是 DEFER/DEFER_LAST 且队列里还有别的 → 用拓扑序做 tiebreak(_tiebreak_waiting_componentsbase.py:1299);
  4. _consume_component_inputsbase.py:1109)把该组件的输入从全局状态里取出并删掉(这就是「消费」);
  5. _run_componentpipeline.py:43)调 instance.run(**inputs),校验输出是 Mapping
  6. _write_component_outputsbase.py:1429)把输出按图里的边写进下游输入槽;
  7. 若队列「过期」(有更高优先的组件出现了)→ 重建队列(_is_queue_stalebase.py:1514)。

循环退出时,返回 pipeline_outputs——默认只含叶子组件(无出边)的输出,除非你用 include_outputs_from 显式要中间结果(见 run docstring,pipeline.py:198-202)。

关键流程图(怎么读:从上到下是一轮迭代,循环回到顶部直到无可跑组件):

┌─────────────────────────────────────┐
│ 从优先队列取最高优先组件 │◀──────────┐
└─────────────────────────────────────┘ │
│ │
┌────────┴─────────┐ │
BLOCKED? 可跑 │
│ │ │
▼ ▼ │
退出循环 消费它的输入(删掉) │
│ │
instance.run(**inputs) │
│ │
把输出写进下游输入槽 │
│ │
队列过期? ──是──▶ 重建队列 ───────┘

3. variadic socket:合并与贪婪

一个输入口可以接多个上游,这靠两种特殊类型标注(types.py:23,31):

  • Variadic[T](lazy):等所有连进来的上游都执行完,把它们的输出收成一个 list 再跑。用于「合并多路结果」(如 DocumentJoiner)。
  • GreedyVariadic[T]收到第一个输入就立刻跑,不等其他。

这两个标注只是给类型「打标记」——InputSocket.__post_init__types.py:78)检查 type.__metadata__ 是否等于约定的常量字符串(HAYSTACK_VARIADIC_ANNOTATION),来设 is_lazy_variadic / is_greedy

消费输入时(_consume_component_inputsbase.py:1132-1150)三种 socket 处理不同:

socket 类型怎么取值
普通只取第一个输入值
greedy variadic取第一个值,包成单元素 list,且总是删除(连用户输入也删,否则会无限触发)
lazy variadic取所有值(一个 list)

边界细节:多个 sender 连到同一个 list 型 socket 时,connect() 会把它自动升级成 lazy variadic(connect docstring,base.py:449-453)。同步 Pipeline 下,合并出的 list 按 sender 名字字母序排列,不是 connect() 调用顺序;AsyncPipeline 下不保证顺序。

4. 循环与分支怎么被撑起来

  • 循环:因为引擎每轮重判优先级、组件可被反复变成 READY,一个组件能跑多次(受 max_runs_per_component,默认 100 限制,base.py:93,103)。Agent 的早期形态、重试逻辑都靠这个。
  • 分支:路由器组件(haystack/components/routers/)只往一条输出边送值,另一条边的下游因为「必填输入没齐」永远 BLOCKED,自然不跑。

5. 同步 vs 异步

AsyncPipelineasync_pipeline.py)用同一套优先级逻辑,但调度器不同:它用 asyncio 把可跑的组件包成 task 并发执行,用一个 asyncio.Semaphore 限并发(async_pipeline.py:208),无依赖的分支可以真正并行。单个组件若没有 run_async,会被丢到线程池里跑(async_pipeline.py:83-88loop.run_in_executor)——注释特意提醒 contextvars(如 tracing span)不会自动传进线程池。(inferred: 「并行」的强弱取决于组件是否实现了 run_async,从 _run_component_async 的两条分支可推得)


下一章:组件本身的契约——@component 到底给一个普通类做了什么,connect() 怎么按类型配对。见 02-component-contract.md