跳到主要内容

第 1 章:任务流水线引擎

本章讲 Cognee 的骨架:为什么 add、cognify 是「同一台引擎跑不同配方」,以及这台引擎(run_tasks_base)怎么用递归把一串任务串起来。读完你能看懂仓库里任何一条流水线。

1.1 它要解决的小问题

「把一段文本变成知识图谱」其实是好几步:分类 → 切块 → 抽图 → 落库。这些步骤:

  • 顺序固定,但每一步的输出是下一步的输入;
  • 有的步骤要分批跑(比如一次喂 100 个 chunk 给 LLM);
  • 希望可组合、可替换(换个切块策略、加一步时序抽取)。

Cognee 的答案:把每一步封装成一个 Task,用一个通用引擎把它们串起来跑。add 和 cognify 的区别仅仅是 Task 列表不同

1.2 思路:Task 是「函数 + 配置」,流水线是「Task 列表」

一个 Task 就是「一个可执行函数」加上「批大小等配置」。看 cognify 的默认任务列表,结构一目了然(cognee/api/v1/cognify/cognify.py:308-339,get_default_tasks):

# 摘要(真实源码在 get_default_tasks),每个 Task(fn, ...args) 就是流水线的一节
default_tasks = [
Task(classify_documents), # ① 原始 Data → 类型化 Document
Task(extract_chunks_from_documents, # ② Document → 语义文本块
max_chunk_size=..., chunker=chunker),
Task(extract_graph_and_summarize, # ③ LLM 抽实体/关系 + 摘要
graph_model=graph_model,
task_config={"batch_size": chunks_per_batch}),
Task(add_data_points, # ④ 节点/边/向量落库
task_config={"batch_size": chunks_per_batch}),
Task(extract_dlt_fk_edges), # ⑤ 结构化数据的外键边
]

对比 add 的任务列表——只有两步(cognee/api/v1/add/add.py,tasks 局部变量):

tasks = [
Task(resolve_data_directories, include_subdirectories=True), # 展开目录/路径
Task(ingest_data, dataset_name, user, node_set, ...), # 抽文本、存 Data 记录
]

同一个 run_pipeline,只是 tasks 不同。 这就是 Cognee 整个架构的复用支点。

1.3 图示:任务怎么串成一条流水线

怎么读:数据从左边进,经过每个 Task,前一个的输出直接成为后一个的输入extract_graph_and_summarize 这种带 batch_size 的任务会先攒够一批再跑。

raw text


[classify_documents] ──Document──▶ [extract_chunks] ──chunks──▶ [extract_graph_and_summarize]
(batch=100) │
summaries / nodes


[add_data_points] ──▶ DB

1.4 原理演示:递归串联的最小模型

引擎的精髓是「处理第一个任务,把它的每个输出递归喂给剩下的任务」。下面这段示意代码抓住了核心思想:

# 示意,非源码 —— 演示「递归串任务」的核心思想
async def run_tasks(tasks, data):
if not tasks: # 没有任务了,data 就是最终结果
yield data
return
head, *rest = tasks # 取第一个任务,剩下的留给递归
async for out in head.execute(data): # 第一个任务可能产出多个结果(流式)
async for result in run_tasks(rest, out): # 每个结果继续走剩下的任务
yield result

重点看: 任务是「异步生成器」,一个任务可以 yield 多次(比如切块产出多个 chunk),每次 yield 都会独立地把后续任务再走一遍。这让流水线天然支持「一进多出 / 流式」。

1.5 真实实现

真实引擎在 cognee/modules/pipelines/operations/run_tasks_base.py:260(run_tasks_base),结构和上面的示意几乎一一对应:

  • run_tasks_basetasks[0]running_task,tasks[1:]leftover_tasks,把下一个任务的 batch_size 算出来传下去。
  • 真正的执行 + 日志 + 遥测 + 错误处理在 handle_task(run_tasks_base.py:147),它 async for result_data in running_task.execute(...),对每个产出再 async for result in run_tasks_base(leftover_tasks, result_data, ...) ——这就是 1.4 里的双层递归(run_tasks_base.py:209-222)。

Task / TaskSpec / BoundTask 的定义在 cognee/modules/pipelines/tasks/task.py:TaskSpec@task 装饰器返回的可调用包装,调用它不执行函数,而是返回一个捕获了 kwargs 的 BoundTask,留给 run_pipeline 在执行时再注入流水线数据(task.pyBoundTask / TaskSpec docstring)。

1.6 关键细节:边跑边盖 provenance 戳

这是个容易忽略但很巧的细节。每当一个任务产出 DataPoint,handle_task递归地给它和它的嵌套字段盖上「血缘戳」:这条数据来自哪个流水线、哪个任务、哪个用户、属于哪个 node_set、内容哈希是多少(_stamp_provenance,run_tasks_base.py:33-110)。

几个讲究:

  • 只在字段为 None 时才写(if data.source_pipeline is None: ...),所以早期任务盖的戳不会被后面覆盖(run_tasks_base.py:64-67)。
  • 跨任务复用一个 visited 集合(ctx._provenance_visited),已盖过的节点在后续阶段直接跳过,避免重复递归(handle_taskprovenance_visited = ctx._provenance_visited,run_tasks_base.py:200)。
  • 还会写一个 topological_rank(任务在流水线里的第几步),给可视化按流水线顺序排版用(run_tasks_base.py:69-76)。

为什么重要: 这让图里每个节点都知道「我是怎么来的」,可视化、按用户/数据集过滤、溯源都靠它。

1.7 边界

  • 批大小由「下一个任务的 batch_size」决定(next_task_batch_size,run_tasks_base.py:278),不是当前任务——容易看错。
  • ctx(PipelineContext)只传给在签名里声明了 ctx 的任务(handle_taskif ctx is not None and running_task.accepts_ctx,run_tasks_base.py:172-174);没声明的任务拿不到 user/dataset 上下文。

1.8 代码地图

主题文件路径符号名
递归串任务的引擎cognee/modules/pipelines/operations/run_tasks_base.pyrun_tasks_base, handle_task
provenance 盖戳cognee/modules/pipelines/operations/run_tasks_base.py_stamp_provenance
Task 包装cognee/modules/pipelines/tasks/task.pyTask, TaskSpec, BoundTask
cognify 默认任务列表cognee/api/v1/cognify/cognify.pyget_default_tasks, get_temporal_tasks
add 任务列表cognee/api/v1/add/add.pyadd