跳到主要内容

检索抽象与数据灌入

本章讲数据侧的两件事:查询时 search() 怎么把 N 种向量库统一起来并行检索;以及灌库时数据怎么从 Schema.org JSON 变成向量库里的可检索文档。

3.1 一个 search(),背后多种向量库

NLWeb 支持 Qdrant / Azure AI Search / Milvus / Postgres / Elasticsearch / Snowflake / Cloudflare AutoRAG / Bing / Shopify MCP 等一长串后端。它用经典的「抽象基类 + 具体实现」把它们统一(core/retriever.py):

  • VectorDBClientInterfaceretriever.py:144)是抽象基类,定义 search / upload_documents / search_by_url / search_all_sites 等抽象方法。
  • 每个后端在 retrieval_providers/ 下有一个实现(如 qdrant.pyazure_search_client.py)。
  • VectorDBClientretriever.py:351)是「统一门面」,按配置把操作路由到对应实现。

按需安装依赖。某后端真正被用到时才 pip install 它的包(_ensure_package_installedretriever.py:96,依据 _db_type_packages 映射,retriever.py:80)。这样装 NLWeb 不必拖进所有数据库的 SDK。

客户端缓存get_vector_db_client 用全局缓存避免重复初始化(retriever.py:1055)。

3.2 多端点并行检索 + 合并去重

一个站点的数据可能分散在多个配置的后端。VectorDBClient.search 把所有「能处理该站」的端点并行查,再合并(retriever.py:780):

# 示意,非源码:多端点并行检索
tasks = []
for endpoint in enabled_endpoints:
client = get_client(endpoint)
if len(enabled_endpoints) == 1 or await client.can_handle_query(site):
tasks.append(client.search(query, site, num_results))
results = await asyncio.gather(*tasks, return_exceptions=True) # 并行
final = aggregate_results(results) # 按 URL 合并去重

端点能不能处理某站can_handle_query 判断(retriever.py:256):它问后端「你有哪些站」(get_sites),命中才查。这里有个 stale-while-revalidate 缓存(_get_cached_sitesretriever.py:289)——缓存新鲜直接用;稍旧就先返回旧值、后台异步刷新;太旧才同步拉取。避免每次检索都去问一遍后端有哪些站。

合并去重_aggregate_resultsretriever.py:608):同一个 URL 在多个端点都出现时,把各端点的 JSON 数据合并成一条(merge_json_array),并尽量保留各端点的相关性顺序(交错取值,retriever.py:662)。

结果四元组。检索返回的每条是 [url, json_str, name, site]retriever.py:631),这个四元组贯穿到 ranking(ranking.py:214 rankItem(url, json_str, name, site))。记住这个形状,整条管道的「条目」都是它。

3.3 site="all" 的特殊处理

site == "all":若配置里指定了具体站点列表,就替换成那些站(retriever.py:796);否则调 search_all_sitesretriever.py:842)。而 "all" 在 FastTrack 视角下是否「支持标准检索」还取决于聚合开关(fastTrack.py:29)——聚合关掉时,"all" 被当成普通单站检索。

3.4 数据怎么灌进去

查询能跑的前提是库里有数据。灌库(ingestion)在 data_loading/。核心是把一条带 Schema.org 标注的网页内容,变成向量库里的文档。

prepare_documents_from_jsondb_load_utils.py:157)是这一步的缩影:

# 示意,非源码:一条 schema.org JSON → 向量库文档
json_obj = json.loads(json_data)
trimmed = trim_schema_json(json_obj, site) # 裁掉无用字段,瘦身
for i, item in enumerate(as_list(trimmed)):
item_url = url if i == 0 else f"{url}#{i}" # 一页多条则加锚点
doc = {
"id": str(int64_hash(item_url)), # URL 哈希做稳定 id
"schema_json": json.dumps(item), # 原始结构化数据
"url": item_url,
"name": get_item_name(item),
"site": site,
}

几个要点:

  • 裁剪trim_schema_json 先把 Schema.org JSON 瘦身(去元数据等),减小嵌入与打分时的体积。
  • 稳定 id:用 URL 的哈希当主键(int64_hashdb_load_utils.py:58),同一 URL 重灌覆盖而非重复。
  • 嵌入文本就是 schema JSON 本身texts.append(item_json)db_load_utils.py:202)——直接把结构化 JSON 拿去算 embedding,让向量同时编码字段名和值。
  • 嵌入走统一的 get_embedding / batch_get_embeddingscore/embedding.py:28embedding.py:185),同样是 provider 可插拔(OpenAI / Gemini / Ollama / Azure / Snowflake 等,见 embedding_providers/)。

上传则经 VectorDBClient.upload_documentsretriever.py:744),走配置的 write_endpoint(与读端点分离,retriever.py:438)。

巧妙之处

  • 读写端点分离:检索可以多端点并行,写只走一个 write_endpointretriever.py:719),职责清晰。
  • stale-while-revalidate 站点缓存:用「先给旧的、后台刷新」把「问后端有哪些站」从关键路径上移走(retriever.py:304-309)。
  • 嵌入整段 JSON:不挑字段、直接嵌入结构化 JSON,让语义检索能同时利用字段名和值——简单但有效。
  • 按需装 SDK_db_type_packages + 运行时 pip(retriever.py:96),让核心安装轻量。

边界与局限

  • 运行时 pip installretriever.py:132)在离线/受限环境会失败;适合开发便利,生产宜预装。
  • 去重只按 URL(_deduplicate_by_urlretriever.py:583 取 content 更长者),同一条目若 URL 不同则去不掉。
  • 检索质量完全取决于灌进去的 Schema.org 标注质量——「垃圾进,垃圾出」。

代码地图

主题文件符号
后端抽象基类core/retriever.pyVectorDBClientInterface
统一门面 / 路由core/retriever.pyVectorDBClient / get_vector_db_client
多端点并行检索core/retriever.pyVectorDBClient.search
合并去重core/retriever.py_aggregate_results / _deduplicate_by_url
站点缓存(SWR)core/retriever.pyRetrievalClientBase._get_cached_sites
按需装包core/retriever.py_ensure_package_installed / _db_type_packages
简化检索入口core/retriever.pysearch(模块级函数)
schema → 文档data_loading/db_load_utils.pyprepare_documents_from_json / int64_hash
嵌入生成core/embedding.pyget_embedding / batch_get_embeddings