跳到主要内容

第 2 章 · chunk 向量检索:并发铺开,再融合收口

这一章讲向量检索这条线:从一个问题,怎么变成一组排好序、去过重的文字片段。核心是「分解 × 多 KB 并发 → 融合去重 → 重排」。

2.1 三层结构

chunk 检索分三层,职责清晰:

ChunkFusionRetriever (融合层:分解问题 + 跨 KB 并发 + 融合)
└── 对每个知识库各持一个 ↓
ChunkSimpleRetriever (执行层:单个 KB 的一次向量检索 + 后处理)
└── TiDBVectorStore.query() (存储层:TiDB 上的余弦近邻搜索)
  • 融合层 ChunkFusionRetriever(retrievers/chunk/fusion_retriever.py):决定「查几个问题、查几个库、结果怎么合」。
  • 执行层 ChunkSimpleRetriever(retrievers/chunk/simple_retriever.py):一个 KB 的一次实际检索,含 reranker / metadata filter 等后处理。
  • 存储层 TiDBVectorStore(indices/vector_search/vector_store/tidb_vector_store.py):真正发 SQL 做向量近邻。

2.2 融合层:分解 × 多 KB 的笛卡尔积并发

融合的通用逻辑在基类 MultiKBFusionRetriever._retrieve(multiple_knowledge_base.py:54)。它做两件事:决定要查哪些 query,然后把 (query × retriever) 全部并发铺开

# 真实源码节选,multiple_knowledge_base.py:54
def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
if self._use_query_decompose:
queries = self._gen_sub_queries(query_bundle) # 拆成多个子问题
else:
queries = [query_bundle] # 就用原问题

tasks, task_queries = [], []
for query in queries:
for i, retriever in enumerate(self._retrievers): # 每个 KB 一个 retriever
tasks.append(retriever.aretrieve(query.query_str))
task_queries.append((query.query_str, i))

task_results = run_async_tasks(tasks) # 全部并发执行
results = {qt: r for qt, r in zip(task_queries, task_results)}
return self._fusion(query_bundle.query_str, results) # 交给子类融合

读法: 若有 3 个子问题 × 2 个知识库,就并发发出 6 次检索,结果以 (子问题, KB序号) 为 key 收进 results,再交给 _fusion 合并。run_async_tasks 让这些检索并行,不必一个个等。

2.3 子问题分解:DSPy 把复杂问题拆成 ≤5 步

_gen_sub_queries(multiple_knowledge_base.py:77)调 QueryDecomposer.decompose。分解本身是一个 DSPy 程序(question_gen/query_decomposer.py):

# 真实源码节选,query_decomposer.py:28 DecomposeQuery 的 docstring(即 prompt)
"""You are an expert in knowledge base graph construction...
Your current task is to deconstruct the user's query into a series of step-by-step questions.
... 4. Constraints: Limit the output to no more than 5 questions ..."""

输出是结构化的 SubQuestions,每个子问题带一段 reasoning(为什么要问这步)。QueryDecomposer 还能从 complied_program_path 加载一个编译好的 DSPy 程序(query_decomposer.py:73)——也就是用 DSPy 优化器离线调好的 few-shot 版本,而不是裸 prompt。

关键: 分解默认不在对话主路径生效(第 1 章提过 search_relevant_chunks 写死了 use_query_decompose=False)。它主要通过 /retrieve/chunks API 的 use_query_decompose 配置打开,或在图谱检索里用。

2.4 融合去重:按 node.hash 取最大分(simple fusion)

几十路检索结果回来后,大量 chunk 会重复(同一段被多个子问题/库都捞到)。ChunkFusionRetriever._simple_fusion(fusion_retriever.py:59)用一招干净利落地去重:

# 真实源码节选,fusion_retriever.py:59
all_nodes: Dict[str, NodeWithScore] = {}
for nodes_with_scores in results.values():
for node_with_score in nodes_with_scores:
hash = node_with_score.node.hash # 用内容 hash 作 key
if hash in all_nodes:
max_score = max(node_with_score.score or 0.0, all_nodes[hash].score or 0.0)
all_nodes[hash].score = max_score # 重复则保留较高分
else:
all_nodes[hash] = node_with_score
return sorted(all_nodes.values(), key=lambda x: x.score or 0.0, reverse=True)

妙在哪:node.hash(chunk 内容哈希)而不是 id 去重——内容相同就算一个。重复出现时取最高分(max),理由是:某个子问题特别命中这段,这段就该排前面,不该被其它子问题的低分稀释。这是 RRF(Reciprocal Rank Fusion)之外的另一种朴素但有效的融合策略:max-pooling by content

2.5 执行层:单 KB 一次检索的全流程

ChunkSimpleRetriever._retrieve(simple_retriever.py:82)是一次真实检索的完整链路:

# 真实源码节选,simple_retriever.py:82
def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
if query_bundle.embedding is None and len(query_bundle.embedding_strs) > 0:
query_bundle.embedding = self._embed_model.get_agg_embedding_from_queries(
query_bundle.embedding_strs) # ① 问题向量化
result = self._vector_store.query(
VectorStoreQuery(
query_str=query_bundle.query_str,
query_embedding=query_bundle.embedding,
similarity_top_k=self._config.similarity_top_k or self._config.top_k,
)) # ② TiDB 向量近邻
nodes = self._build_node_list_from_query_result(result)
for node_postprocessor in self._node_postprocessors:
nodes = node_postprocessor.postprocess_nodes(nodes, query_bundle=query_bundle) # ③ 后处理
return nodes[: self._config.top_k] # ④ 截断到 top_k

后处理器(_node_postprocessors)在构造时按配置组装(simple_retriever.py:64-80):

后处理器作用条件
MetadataPostFilter按元数据过滤(如只要某产品/版本)metadata_filter.enabled
reranker用交叉编码器重排,提精度reranker.enabled

2.6 存储层:TiDB 向量近邻 + oversampling

TiDBVectorStore.query(tidb_vector_store.py:167)发的是一条带两段 limit 的 SQL,这就是 oversampling(过采样) 的实现:

# 真实源码节选,tidb_vector_store.py:201
sub = alias(
subquery.order_by(asc("distance"))
.limit(query.similarity_top_k * self._oversampling_factor) # ① 先多取 N 倍候选
.subquery(), "sub")
stmt = (select(...).order_by(asc("distance"))
.limit(query.similarity_top_k)) # ② 再裁到 top_k

为什么过采样? HNSW 这类近似最近邻(ANN)索引为了快,会牺牲一点召回——可能漏掉真正最近的几个。先取 top_k × oversampling_factor 个候选(默认 5 倍,chunk/schema.py:23),再在这批里排序裁剪,能把召回率拉回来。注释也直说了:「The higher the factor, the higher recall rate」(tidb_vector_store.py:62)。

距离用余弦:self._chunk_db_model.embedding.cosine_distance(query.query_embedding)(tidb_vector_store.py:192),返回时换算成相似度 1 - distance(:254)。

2.7 reranker:HTTP 调一个交叉编码器

以本地 reranker 为例(rerankers/local/local_reranker.py),它就是把候选段落和 query 一起 POST 给一个打分服务,拿回 scores 排序:

# 真实源码节选,local_reranker.py:78
resp = self._session.post(self.api_url, json={
"query": query_bundle.query_str,
"model": self.model, # 默认 BAAI/bge-reranker-v2-m3
"passages": texts,
})
results = zip(range(len(nodes)), resp.json()["scores"])
results = sorted(results, key=lambda x: x[1], reverse=True)[: self.top_n] # 取 top_n

职责分工: 向量检索负责快、广地召回候选(top_k × oversampling),reranker 负责慢、准地在小批候选里精排(top_n)。这是 RAG 的经典两段式:召回 → 精排

2.8 小结

  • 融合层把 (子问题 × 知识库) 并发铺开,执行层各做一次「向量化 → TiDB 近邻 → 后处理 → 截断」。
  • 融合用 node.hash 去重 + max 取分,简单而稳。
  • TiDB 层靠 oversampling 补 ANN 的召回损失;reranker 在小批里精排。

下一章是全书最硬核的:知识图谱那条线的 retrieve_with_weight