检索抽象与数据灌入
本章讲数据侧的两件事:查询时
search()怎么把 N 种向量库统一起来并行检索;以及灌库时数据怎么从 Schema.org JSON 变成向量库里的可检索文档。
3.1 一个 search(),背后多种向量库
NLWeb 支持 Qdrant / Azure AI Search / Milvus / Postgres / Elasticsearch / Snowflake / Cloudflare AutoRAG / Bing / Shopify MCP 等一长串后端。它用经典的「抽象基类 + 具体实现」把它们统一(core/retriever.py):
VectorDBClientInterface(retriever.py:144)是抽象基类,定义search/upload_documents/search_by_url/search_all_sites等抽象方法。- 每个后端在
retrieval_providers/下有一个实现(如qdrant.py、azure_search_client.py)。 VectorDBClient(retriever.py:351)是「统一门面」,按配置把操作路由到对应实现。
按需安装依赖。某后端真正被用到时才 pip install 它的包(_ensure_package_installed,retriever.py:96,依据 _db_type_packages 映射,retriever.py:80)。这样装 NLWeb 不必拖进所有数据库的 SDK。
客户端缓存。get_vector_db_client 用全局缓存避免重复初始化(retriever.py:1055)。
3.2 多端点并行检索 + 合并去重
一个站点的数据可能分散在多个配置的后端。VectorDBClient.search 把所有「能处理该站」的端点并行查,再合并(retriever.py:780):
# 示意,非源码:多端点并行检索
tasks = []
for endpoint in enabled_endpoints:
client = get_client(endpoint)
if len(enabled_endpoints) == 1 or await client.can_handle_query(site):
tasks.append(client.search(query, site, num_results))
results = await asyncio.gather(*tasks, return_exceptions=True) # 并行
final = aggregate_results(results) # 按 URL 合并去重
端点能不能处理某站由 can_handle_query 判断(retriever.py:256):它问后端「你有哪些站」(get_sites),命中才查。这里有个 stale-while-revalidate 缓存(_get_cached_sites,retriever.py:289)——缓存新鲜直接用;稍旧就先返回旧值、后台异步刷新;太旧才同步拉取。避免每次检索都去问一遍后端有哪些站。
合并去重(_aggregate_results,retriever.py:608):同一个 URL 在多个端点都出现时,把各端点的 JSON 数据合并成一条(merge_json_array),并尽量保留各端点的相关性顺序(交错取值,retriever.py:662)。
结果四元组。检索返回的每条是 [url, json_str, name, site](retriever.py:631),这个四元组贯穿到 ranking(ranking.py:214 rankItem(url, json_str, name, site))。记住这个形状,整条管道的「条目」都是它。
3.3 site="all" 的特殊处理
当 site == "all":若配置里指定了具体站点列表,就替换成那些 站(retriever.py:796);否则调 search_all_sites(retriever.py:842)。而 "all" 在 FastTrack 视角下是否「支持标准检索」还取决于聚合开关(fastTrack.py:29)——聚合关掉时,"all" 被当成普通单站检索。
3.4 数据怎么灌进去
查询能跑的前提是库里有数据。灌库(ingestion)在 data_loading/。核心是把一条带 Schema.org 标注的网页内容,变成向量库里的文档。
prepare_documents_from_json(db_load_utils.py:157)是这一步的缩影:
# 示意,非源码:一条 schema.org JSON → 向量库文档
json_obj = json.loads(json_data)
trimmed = trim_schema_json(json_obj, site) # 裁掉无用字段,瘦身
for i, item in enumerate(as_list(trimmed)):
item_url = url if i == 0 else f"{url}#{i}" # 一页多条则加锚点
doc = {
"id": str(int64_hash(item_url)), # URL 哈希做稳定 id
"schema_json": json.dumps(item), # 原始结构化数据
"url": item_url,
"name": get_item_name(item),
"site": site,
}
几个要点:
- 裁剪:
trim_schema_json先把 Schema.org JSON 瘦身(去元数据等),减小嵌入与打分时的体积。 - 稳定 id:用 URL 的哈希当主键(
int64_hash,db_load_utils.py:58),同一 URL 重灌覆盖而非重复。 - 嵌入文本就是 schema JSON 本身:
texts.append(item_json)(db_load_utils.py:202)——直接把结构化 JSON 拿去算 embedding,让向量同时编码字段名和值。 - 嵌入走统一的
get_embedding/batch_get_embeddings(core/embedding.py:28、embedding.py:185),同样是 provider 可插拔(OpenAI / Gemini / Ollama / Azure / Snowflake 等,见embedding_providers/)。
上传则经 VectorDBClient.upload_documents(retriever.py:744),走配置的 write_endpoint(与读端点分离,retriever.py:438)。
巧妙之处
- 读写端点分离:检索可以多端点并行,写只走一个
write_endpoint(retriever.py:719),职责清晰。 - stale-while-revalidate 站点缓存:用「先给旧的、后台刷新」把「问后端有哪些站」从关键路径上移走(
retriever.py:304-309)。 - 嵌入整段 JSON:不挑字段、直接嵌入结构化 JSON,让语义检索 能同时利用字段名和值——简单但有效。
- 按需装 SDK:
_db_type_packages+ 运行时 pip(retriever.py:96),让核心安装轻量。
边界与局限
- 运行时
pip install(retriever.py:132)在离线/受限环境会失败;适合开发便利,生产宜预装。 - 去重只按 URL(
_deduplicate_by_url,retriever.py:583取 content 更长者),同一条目若 URL 不同则去不掉。 - 检索质量完全取决于灌进去的 Schema.org 标注质量——「垃圾进,垃圾出」。
代码地图
| 主题 | 文件 | 符号 |
|---|---|---|
| 后端抽象基类 | core/retriever.py | VectorDBClientInterface |
| 统一门面 / 路由 | core/retriever.py | VectorDBClient / get_vector_db_client |
| 多端点并行检索 | core/retriever.py | VectorDBClient.search |
| 合并去重 | core/retriever.py | _aggregate_results / _deduplicate_by_url |
| 站点缓存(SWR) | core/retriever.py | RetrievalClientBase._get_cached_sites |
| 按需装包 | core/retriever.py | _ensure_package_installed / _db_type_packages |
| 简化检索入口 | core/retriever.py | search(模块级函数) |
| schema → 文档 | data_loading/db_load_utils.py | prepare_documents_from_json / int64_hash |
| 嵌入生成 | core/embedding.py | get_embedding / batch_get_embeddings |