LlamaIndex 灌库与切块 — 从文档到 Node
这章讲什么: 离线阶段——一份
Document怎么变成一批可检索的Node。先看「切块」(SentenceSplitter怎么在保住句子边界的前提下控制 块大小),再看「转换流水线」(run_transformations/IngestionPipeline)如何把切块、抽元数据、嵌入串起来,以及它的两大省钱机制:transformation 缓存 和 基于哈希的去重/更新。
2.1 一步直觉:灌库 = 跑一串「转换」
VectorStoreIndex.from_documents(docs) 看着是一个方法,内部其实是:
Documents ─▶ run_transformations(默认 [SentenceSplitter(), embed_model]) ─▶ Nodes(带嵌入)
─▶ 存进 vector store / docstore
源码:indices/base.py:111-129——先把每个 doc 的 hash 记进 docstore,再 run_transformations(documents, transformations, ...) 得到 nodes,最后用这批 nodes 构造索引。默认 transformations 来自 Settings.transformations(indices/base.py:109)。
转换的统一接口 TransformComponent(schema.py:191-204)只有一个抽象方法:
# 示意,非源码:任何「一步变换」都是 nodes → nodes
def __call__(self, nodes: Sequence[BaseNode], **kw) -> Sequence[BaseNode]: ...
切块器、元数据抽取器、嵌入模型全都实现这个签名,于是能任意串联。
2.2 SentenceSplitter:控制块大小,但别切碎句子
切块的两难:块太大 → 检索不精准、塞不进上下文;块太小 → 语义破碎。SentenceSplitter 的取法是**「尽量按完整句子凑到目标 token 数,带少量重叠」**。
思路:多级降级切分
当一段文本超过 chunk_size,_split 按这个优先级逐级降级地把它拆小(node_parser/text/sentence.py:198-245,docstring 在 199-207 列出顺序):
超过 chunk_size?
① 按段落分隔符切
② 还太大 → 按句子切(默认 nltk 句子分词器)
③ 还太大 → 按从句正则切 "[^,\.;]+[,\.;]?"
④ 还太大 → 按空格切
命中「能放下」就停,记成一个 _Split(标记 is_sentence)
这是「怎么读这张图」:从上到下是降级顺序,某级切出的片段能放进 chunk_size 就不再往下切。优先在「人类自然的边界」(段落 > 句子 > 从句 > 词)断开。
有个真实边界:如果一个片段切到底仍超 chunk_size(比如某些 CJK / emoji 单字符 token 很大且 chunk_size 很小),递归会死循环。代码特判 len(text_splits_by_fns) == 1 时不再递归、保留这个超大片段,让后续 _merge 抛出可读的 "Single token exceeded chunk size" 而非 RecursionError(node_parser/text/sentence.py:226-239、:279-280)。
合并 + 重叠
切碎之后 _merge 反过来把小片段贪心地塞进一个 chunk 直到接近 chunk_size,然后开新 chunk;开新 chunk 时从上一个 chunk 末尾回填一段作为 overlap(node_parser/text/sentence.py:247-313,close_chunk 里的 overlap 回填在 264-273)。重叠的意义:跨块边界的句子不至于被一刀两断,检索召回率更稳。
元数据感知
split_text_metadata_aware(sentence.py:156-174)会先减去元数据占用的 token,再用「有效 chunk_size」切——因为元数据最终会和正文一起进嵌入/提示,必须预留空间;元数据太大会直接报错或告警(sentence.py:159-172)。
2.3 IngestionPipeline:转换链 + 缓存 + 去重
from_documents 适合一次性建库。要反复增量灌(数据每天更新)时,用 IngestionPipeline(ingestion/pipeline.py:262)。它在 run_transformations 之上加了两层省钱机制。
默认转换链:[SentenceSplitter(), Settings.embed_model](ingestion/pipeline.py:423-427)。
机制一:transformation 级缓存
run_transformations 对每一步算一个哈希 hash(输入nodes, 该transform),命中缓存就跳过这步、直接取结果(ingestion/pipeline.py:100-110):
# 示意,非源码:对应 run_transformations 的核心循环
for transform in transformations:
h = get_transformation_hash(nodes, transform)
cached = cache.get(h)
if cached is not None:
nodes = cached # 命中:跳过这步
else:
nodes = transform(nodes) # 未命中:跑,并写回缓存
cache.put(h, nodes)
意义:同一批没变的输入再跑一次,切块/嵌入都不用重算。
机制二:基于文档哈希的去重 / 更新(DocstoreStrategy)
配了 docstore 后,pipeline 用上一章讲的 Node hash 做文档级去重。三种策略(ingestion/pipeline.py:242-259):
| 策略 | 行为 | 用途 |
|---|---|---|
UPSERTS(默认) | 按 ref_doc_id 比对哈希:没见过→插入;哈希变了→删旧+插新;没变→跳过 | 增量更新,内容变了才重算 |
DUPLICATES_ONLY | 只看哈希在不在,只新增 | 纯追加、不更新 |
UPSERTS_AND_DELETE | UPSERTS + 删掉「这批里没出现」的旧文档 | 全量同步(镜像数据源) |
upsert 的真实逻辑很直白(ingestion/pipeline.py:469-507):
# 示意,非源码:对应 _handle_upserts
existing_hash = docstore.get_document_hash(ref_doc_id)
if not existing_hash: # 新文档
keep[ref_doc_id] = node
elif existing_hash != node.hash: # 改过了
docstore.delete_ref_doc(ref_doc_id) # 删旧
vector_store.delete(ref_doc_id) # 向量库也删
keep[ref_doc_id] = node # 排队重灌
# else: 完全没变 → 跳过
DUPLICATES_ONLY 更简单——按 Node hash 去重(连「同一批内的重复」也用 current_hashes 去掉),ingestion/pipeline.py:451-467。
这就把「数据每天小幅变化」的成本压到「只重算变化的那部分」。
2.4 巧妙之处
- 切块用「降级 + 贪心合并」而非定长硬切:在 token 预算内最大化保住句子完整性,且对「无法再切的超大单元 」有明确的非崩溃出口(
sentence.py:226-239)。 - 两级省钱:transformation 缓存省「重复计算」,docstore 去重省「未变文档的重灌」——二者正交,可叠加。
- 去重锚在内容哈希而非时间戳/版本号,数据源没有可靠版本信息也能用(
schema.py的hash+ingestion/pipeline.py:_handle_upserts)。
2.5 边界
- 去重要求 docstore 跨运行持久化,否则每次都「没见过」(
DocstoreStrategydocstring 在ingestion/pipeline.py:243-245明说)。 UPSERTS_AND_DELETE会删掉本批未出现的文档——只适合「这批就是全量」的镜像场景,增量场景误用会误删。
2.6 代码地图
| 主题 | 文件 | 符号 |
|---|---|---|
| 灌库入口 | indices/base.py | BaseIndex.from_documents |
| 转换接口 | schema.py | TransformComponent |
| 跑转换链 + 缓存 | ingestion/pipeline.py | run_transformations |
| 切块降级 | node_parser/text/sentence.py | SentenceSplitter._split |
| 合并 + 重叠 | node_parser/text/sentence.py | SentenceSplitter._merge |
| 元数据感知切块 | node_parser/text/sentence.py | split_text_metadata_aware |
| 流水线 | ingestion/pipeline.py | IngestionPipeline.run |
| 去重策略 | ingestion/pipeline.py | DocstoreStrategy, _handle_upserts, _handle_duplicates |
| 增量刷新 | indices/base.py | refresh_ref_docs |