跳到主要内容

Haystack — 架构与原理

30 秒导读: Haystack 是一个 Python 的 LLM 应用编排框架。你把一个个「组件」(检索器、prompt 模板、LLM、工具调用器……)当积木,用 connect() 接成一张图,框架负责按「谁的输入齐了谁先跑」的顺序执行。RAG 管道和工具型 Agent,都是这张图引擎上的两种典型用法。

1. 这是什么(零基础也能懂)

一句话定义: Haystack 是 deepset 出的开源框架,用来把「检索 + 模型生成 + 工具调用」这些步骤,组装成可运行、可保存、可调试的 管道(Pipeline)Agent

解决什么问题 / 给谁用:

假设你要做一个「问答机器人」:用户问一句话,你得先去文档库里捞出相关段落,把段落塞进一个 prompt 模板,再发给 GPT,最后把模型的回答整理成结构化答案。这一串步骤,如果手写就是一堆胶水代码,换个向量库、换个模型就得重写。

Haystack 把每一步都抽象成一个组件,步骤之间的连接是显式声明的。于是:

  • 步骤可替换(换 LLM、换向量库 = 换一个组件实例);
  • 整条管道可序列化成 YAML/JSON,存盘、版本管理、部署;
  • 数据怎么流是透明的,能画图、能下断点调试。

做生产级 RAG / 语义搜索 / 问答 / Agent 的工程师用。

它能做什么(功能):

  • 搭 RAG / 语义搜索 / 抽取式问答管道;
  • 搭能调工具、能多轮循环的 Agent;
  • 模型与厂商无关(OpenAI / Anthropic / HuggingFace / 本地模型……都是组件);
  • 管道可存可读、可循环可分支、可下断点恢复;
  • 自带追踪(tracing)和遥测。

用起来什么样: 一个最小 RAG 管道(取自 Pipeline.run 的官方 docstring)——

from haystack import Pipeline
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever

pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=store))
pipe.add_component("prompt_builder", ChatPromptBuilder(template=template))
pipe.add_component("llm", OpenAIChatGenerator())

pipe.connect("retriever", "prompt_builder.documents") # 检索结果 → 模板的 documents 变量
pipe.connect("prompt_builder", "llm") # 拼好的 prompt → LLM

results = pipe.run({
"retriever": {"query": question},
"prompt_builder": {"question": question},
})
print(results["llm"]["replies"][0].text)

你只声明「谁接谁」,从不写「先跑谁再跑谁」——执行顺序由引擎从图里算出来。

一句话直觉/类比: 把 Haystack 想成 数据流版的 Make/电子表格:每个组件是一个单元格,connect() 是单元格之间的引用,引擎自动算出「依赖谁先算」的拓扑顺序,依赖一齐就触发计算。


2. 顶层全景(它大概怎么转)

Haystack 的代码分两层,记住这个分层,整个项目就立起来了

是什么在哪
引擎层(core)组件契约 + 把组件连成图 + 调度执行haystack/core/
组件层(components)一堆现成积木:检索器、embedder、LLM、prompt 模板、工具调用器、Agent……haystack/components/
数据类(dataclasses)在组件间流动的数据:DocumentChatMessageToolCallhaystack/dataclasses/

顶层结构图(怎么读:左边是你写的图,右边是引擎,箭头是「调用 / 数据流」):

你声明的管道 引擎(core/pipeline)
┌────────────────────┐
│ add_component(name) │──注册节点──▶ networkx 有向图 self.graph
│ connect(a, b) │──加边────▶ (节点=组件, 边=socket 连接)
└────────────────────┘ │

run(data) ─────────────────────────▶ while 循环调度器
每轮: 给每个组件算优先级
(输入齐了=高优先, 没齐=阻塞)
取最高优先的跑 instance.run()
把输出写到下游组件的输入槽


叶子组件的输出 = 管道结果

部件一句话职责:

部件干什么在哪个文件
@component 装饰器把普通类登记成「组件」,解析它的输入/输出类型haystack/core/component/component.py
InputSocket / OutputSocket描述一个组件的某个输入/输出口:名字、类型、是否必填、连了谁haystack/core/component/types.py
PipelineBase持有图、connect()、序列化、调度用的优先级计算haystack/core/pipeline/base.py
Pipeline同步执行引擎:run() 里的 while 调度循环haystack/core/pipeline/pipeline.py
AsyncPipeline异步引擎:同样的图,无依赖的分支可并发跑haystack/core/pipeline/async_pipeline.py
component_checks.py「这个组件现在能不能跑」的全部判断逻辑haystack/core/pipeline/component_checks.py
Agent在引擎之上手写的 LLM↔工具循环haystack/components/agents/agent.py

主线走一遍(高层):

  1. add_component + connect,引擎在 self.graph(一张 networkx.MultiDiGraph)里记下节点和边。
  2. 你调 run(data),引擎把你的输入塞进各组件的「输入槽」。
  3. 引擎进入一个 while 循环:每轮给所有组件算一个优先级(输入齐全且被触发=能跑,否则阻塞),取优先级最高的那个跑它的 run()
  4. 组件的输出按图里的边,写进下游组件的输入槽——这可能让下游从「阻塞」变成「能跑」。
  5. 没有可跑的组件时退出,叶子节点(没有出边)的输出就是管道结果。

这套「按输入就绪度调度」的设计,天然支持循环(组件可被多次访问)和分支(路由器只给一条边送数据)。详见第 1 章。


阅读地图

建议顺序(由浅入深):

  1. 01-pipeline-engine.md — 先懂引擎:图怎么建、run() 的 while 循环怎么按优先级调度、怎么撑起循环和分支。这是 Haystack 的「心脏」。
  2. 02-component-contract.md — 组件契约:@component 装饰器干了什么、输入/输出 socket 怎么来的、connect() 怎么按类型配对、组件怎么序列化成 dict。
  3. 03-rag-path.md — 把第 1 章的引擎和第 2 章的契约串起来:一条真实 RAG 请求端到端走一遍。
  4. 04-agent-loop.md — Agent:它不是用图实现的,而是在引擎之上手写的循环。看 State、退出条件、断点恢复。
  5. 05-clever-and-boundaries.md — 巧妙之处、边界局限、横向对比、完整代码地图。

如果你只想要一个心智模型:读完本页 + 第 1 章即可。