跳到主要内容

流水线与注册表

本章讲 Chonkie 怎么把零散组件编排成一条可复用流水线,以及新组件如何自动挂载。这是 Chonkie 工程上最巧的部分。

1. CHOMP 流水线:声明乱序,执行有序

Pipeline 是一个流式构建器(src/chonkie/pipeline/pipeline.py:15)。每个 .xxx_with(...) 方法只是_steps 列表里追加一条记录,并不立刻执行(pipeline.py:272-292 等)。真正执行在 run()

关键巧妙处:你声明的顺序无所谓run() 第一件事就是 _reorder_steps(pipeline.py:484-517),把步骤按 CHOMP 固定顺序重排:

# 真实源码 pipeline.py:508
for step_type in ["fetch", "vision", "process", "chunk", "refine", "export", "write"]:

所以下面两种写法等价:

# 示意,非源码:两种顺序结果相同
Pipeline().chunk_with("recursive").refine_with("overlap") # 正序
Pipeline().refine_with("overlap").chunk_with("recursive") # 倒序——run() 会自动纠正

重排时还会自动补默认 chef:没声明 process 步骤,就塞一个 TextChef(pipeline.py:501-504),因为下游切块器需要 Document 输入。process/vision 只允许一个(取最后声明的);其余类型可多个(如多个 refinery,保持顺序)。

怎么读执行流

.run(texts=...)

├─ _reorder_steps() ← 按 CHOMP 重排 + 补 TextChef
├─ _validate_pipeline() ← 必须有 chunker;必须有 fetcher 或 texts
└─ for step in ordered:
_execute_step(step, data) ← data 在步骤间逐级传递

校验很硬(_validate_pipeline,pipeline.py:519-552):没有切块器直接报错——切块是 Chonkie 的不可省略的核心;没有 fetcher 又没传 texts 也报错(数据从哪来?)。

2. 注册表:装饰器把组件挂上去

组件怎么从字符串别名 "recursive" 找到 RecursiveChunker 类?靠一个全局注册表 ComponentRegistry(src/chonkie/utils/registry.py:512)。

每个组件类头上有个装饰器,如 @chunker("recursive")(见 recursive.py:25)。这些装饰器(registry.py:385-508)都包向通用的 pipeline_component(registry.py:320-381),在导入时把类注册进表:

# 真实源码 registry.py:366-371(节选)
ComponentRegistry.register(
name=cls.__name__, alias=alias,
component_class=cls, component_type=component_type,
)

巧妙处:注册时就做契约校验

装饰器不只是登记,还强制接口契约——按组件类型检查类是否实现了必需方法(registry.py:344-363):

# 真实源码 registry.py:345-353(节选)
required_methods = {
ComponentType.CHUNKER: ["chunk", "chunk_document"],
ComponentType.CHEF: ["process", "parse"], # 两个都要!
ComponentType.HANDSHAKE: ["write"],
...
}
missing = [m for m in required if not hasattr(cls, m)]
if missing:
raise ValueError(f"{cls.__name__} must implement {missing} ...")

少实现一个方法,导入时就炸,而不是运行到一半才挂——把错误左移到加载期。这也是为什么自定义组件只要继承对应 base 类(已实现 chunk_document 等)再打装饰器即可,见下文。

别名是按类型分作用域的((component_type, alias) 做键,registry.py:51-57),所以 chunker 和 refinery 可以同名;跨类型查找若撞名会报「歧义」让你指定类型(registry.py:108-121)。

3. 参数魔法:一份 kwargs,自动拆两半

这是 Pipeline 最容易让人困惑、也最聪明的一处。看这行:

# 示意:chunk_size 给谁?recipe 又给谁?
pipeline.chunk_with("recursive", chunk_size=512, recipe="markdown")

chunk_sizeRecursiveChunker.__init__ 的参数(构造时用),但有些参数是给 chunk() 方法调用时用的。Pipeline 怎么知道哪个给谁?答案:inspect.signature 读两边签名,自动分配(_split_parameters,pipeline.py:662-710):

# 真实源码 pipeline.py:682-695(节选)
init_param_names = set(inspect.signature(component_class.__init__).parameters) - {"self"}
method_param_names = set(inspect.signature(method).parameters) - exclude
init_kwargs = {k: v for k, v in kwargs.items() if k in init_param_names}
call_kwargs = {k: v for k, v in kwargs.items() if k in method_param_names}
  • 落在 __init__ 签名里的 → 构造组件时用。
  • 落在方法签名里的 → 调用时用。
  • 两边都不认的 → 立刻报错,并友好地列出可用参数(pipeline.py:703-708)。

recipe/lang元参数,在拆分前就被 pop 出来(pipeline.py:569-570):若组件有 from_recipe,就走配方构造(pipeline.py:589-596)。这把第 2 章 RecursiveChunker.from_recipe 接到了 Pipeline 上。

巧妙处:组件实例缓存

构造组件(尤其加载嵌入模型)可能很贵。Pipeline 用 (组件名, init_kwargs 的 JSON) 做键缓存实例(pipeline.py:580-601):同样配置的组件只构造一次。这对「同一 Pipeline 跑多批数据」很关键。

4. 执行分派:按步骤类型调对的方法

_call_component(pipeline.py:712-786)是个按 step_type 分派的大 switch。它知道每类组件该调哪个方法、上游数据该怎么喂:

步骤调用输入处理
chunkcomponent.chunk_document(doc)单 doc 或 list 逐个
refinecomponent.refine_document(doc)同上
exportcomponent.export(chunks)从 doc 抽 chunks,返回 doc 续传
writecomponent.write(chunks)抽 chunks 写库

注意 chunk/refine 走的是 *_document 变体(不是裸 chunk),这样文档元数据能一路带下去(回看第 1 章 chunk_document 的双模式)。异步版 _acall_component(pipeline.py:788-850)结构对称,对 list 输入用 asyncio.gather 并发。

5. 自己加一个组件(扩展点)

# 示意,非源码:三步加一个自定义切块器
from chonkie.chunker.base import BaseChunker
from chonkie.pipeline import chunker
from chonkie.types import Chunk

@chunker("myalias") # 1. 打装饰器注册别名
class MyChunker(BaseChunker): # 2. 继承基类(白送 chunk_document)
def chunk(self, text: str) -> list[Chunk]: # 3. 只实现这一个方法
...
# 之后 Pipeline().chunk_with("myalias") 就能用

继承 BaseChunkerchunk_document 等契约方法自动满足,装饰器校验便能通过。

6. 代码地图

主题文件符号
流水线构建器src/chonkie/pipeline/pipeline.pyPipeline
步骤重排(CHOMP)src/chonkie/pipeline/pipeline.py_reorder_steps
校验src/chonkie/pipeline/pipeline.py_validate_pipeline
参数自动拆分src/chonkie/pipeline/pipeline.py_split_parameters
实例缓存 + 配方src/chonkie/pipeline/pipeline.py_prepare_step_execution
执行分派(同步/异步)src/chonkie/pipeline/pipeline.py_call_component_acall_component
配方加载src/chonkie/pipeline/pipeline.pyPipeline.from_recipefrom_config
全局注册表src/chonkie/utils/registry.pyComponentRegistry_ComponentRegistry
注册装饰器 + 契约校验src/chonkie/utils/registry.pypipeline_componentchunker/refinery/…
组件类型枚举src/chonkie/utils/component.pyComponentType