跳到主要内容

Agentset 摄取管线 — 文件如何变成向量

本章讲什么: 写入侧的全程。一份文件 / 一个 URL / 一段文本,如何被切成 chunk、算成向量、存进向量库,以及这中间的两级异步任务和状态机。

1. 先建直觉:摄取就是「拆书 + 编码 + 上架」

RAG 要能「按语义检索」,前提是资料已经被预处理好。摄取(ingestion)干三件事:

  1. 拆书(分块 / chunking): 把一份长文档切成几百字一段的小块(chunk),因为检索和喂给 LLM 都是按「段」来的。
  2. 编码(嵌入 / embedding): 把每个 chunk 用嵌入模型变成一串数字(向量),语义相近的 chunk 向量也相近。
  3. 上架(落库 / upsert): 把「向量 + 原文 + 元数据」写进向量库,等着被检索。

Agentset 把这套做成两级后台异步任务,因为摄取可能很慢(几千个网页、几小时)。任务框架用的是 Trigger.dev(schemaTask 即「带 schema 校验的后台任务」)。

2. 顶层:两级任务 + 一个外部服务

ingestJob (一个摄取作业) process-document (逐文档,可并发 90)
┌──────────────────────────────┐ ┌────────────────────────────────────┐
│ ① 读作业配置 │ │ ① 文档置 PROCESSING │
│ ② 按 payload 类型分支: │ 批量 │ ② 拿 namespace 的嵌入模型 + 向量库 │
│ FILE/TEXT/CRAWL/YOUTUBE... │ 触发 │ ③ 若 CRAWL/YOUTUBE:chunk 已在 S3 │
│ ③ 建一行行 Document(QUEUED) │ ───────▶ │ 否则:调外部 Partition 服务现切 │
│ ④ 计费/页数限额检查 │ 子任务 │ ④ 每 30 条 chunk 一批:embedMany │
│ ⑤ batchTriggerAndWait 子任务 │ │ ⑤ vectorStore.upsert(算好向量的批) │
│ ⑥ 汇总成功/失败 → 作业状态 │ ◀─────── │ ⑥ 文档置 COMPLETED + 发 webhook │
└──────────────────────────────┘ 结果 └────────────────────────────────────┘


外部 Partition 服务(不在本仓库)
PARTITION_API_URL/ingest|crawl|youtube
负责真正的 OCR / 解析 / 分块

一个关键边界:真正的「切书」不在本仓库。 分块、OCR、PDF 解析这些重活外包给一个独立的 HTTP 服务(env.PARTITION_API_URL)。本仓库只负责编排、嵌入、落库、状态机。所以本章读不到「怎么切 PDF」的算法——那是闭源服务;能读到的是「怎么调它、怎么收它吐回来的 chunk」。

3. 第一级:ingestJob —— 把一个作业摊成多行文档

它要解决的小问题: 用户提交的「一个摄取作业」可能对应一个文件,也可能对应一次爬取(几千个网页)。要先把它规整成一行行 Document 记录,再逐个处理。

ingestJob(packages/jobs/src/tasks/ingest.ts:33,schemaTask,maxDuration 12 小时)按 payload.type 分支:

payload 类型怎么变成 Document
TEXT / FILE / MANAGED_FILE单个文档,直接 db.document.create(ingest.ts:359 起)
CRAWL先调 PARTITION_API_URL/crawl 爬,拿回一批页面,createManyAndReturn(ingest.ts:136)
YOUTUBE/youtube 拿字幕,批量建文档(ingest.ts:236)
多文件批量分 20 个一批 createManyAndReturn(ingest.ts:417 起)

精华细节:页数限额在建文档时就「卡」住。 免费计划有页数上限。代码在批量建文档时逐个累加预估页数,一旦超限,这一行文档直接建成 FAILED 状态、不进处理队列(ingest.ts:193-221):

// 示意,改写自 ingest.ts:193 起
const docPages = doc.total_characters / 1000; // 1 页 ≈ 1000 字符
const isOverLimit = !allowOverage && usedPages + docPages > availablePages;
if (!isOverLimit) usedPages += docPages;
// 超限的文档建成 FAILED,只有 QUEUED 的才进下一级队列
status: isOverLimit ? DocumentStatus.FAILED : DocumentStatus.QUEUED;

建完文档后,ingestJob 把它们按 30 个一批processDocument.batchTriggerAndWait 触发子任务并等结果(ingest.ts:498-535),最后汇总成功/失败、更新作业与组织/命名空间的页数统计。

4. 第二级:processDocument —— 切块 → 嵌入 → upsert

它要解决的小问题: 拿到一个具体文档,把它变成向量库里的一堆行。

processDocument(packages/jobs/src/tasks/process-document.ts:72,并发上限 90)流程:

4.1 准备:挑这个 namespace 该用的嵌入模型和向量库

// process-document.ts:171
const [embeddingModel, vectorStore] = await Promise.all([
getNamespaceEmbeddingModel(ingestJob.namespace, "document"),
getNamespaceVectorStore(ingestJob.namespace, document.tenantId),
]);

注意第二个参数:嵌入用 "document" 类型(有些厂商如 Voyage 区分「文档向量」和「查询向量」,见第 2 章);向量库带上 tenantId 做租户隔离。

4.2 拿 chunk:两条路

  • 已经切好的(CRAWL / YOUTUBE): chunk 在摄取作业阶段已被外部服务切好、存进 S3,这里直接 getChunksJsonFromS3 取回(process-document.ts:210)。
  • 现切的(FILE / TEXT):PARTITION_API_URL/ingest,但不是同步等结果——用 Trigger.dev 的 wait.createToken + wait.forToken 异步等外部服务回调(process-document.ts:267-296)。外部服务切完后把结果 POST 回这个 token,任务才继续。

这种「发请求 → 挂起等回调 token」的模式,是因为切一份大 PDF 可能要好几分钟,不能让任务一直占着连接干等。

4.3 核心:每 30 条 chunk 一批,嵌入 + upsert

这是写入侧真正的「循环」。processBatch(process-document.ts:34)对一批 chunk:

// 示意,改写自 process-document.ts:34 起。BATCH_SIZE = 30
const results = await embedMany({ // ① 一次批量算 30 个向量
model: embeddingModel,
values: batch.map((chunk) => chunk.text),
maxRetries: 5,
});
const chunks = batch.map((chunk, idx) => ({ // ② 向量 + 文本 + 元数据 拼好
documentId,
chunk: { ...chunk, metadata: { ...extraMetadata, ...chunk.metadata } },
embedding: results.embeddings[idx]!,
}));
await vectorStore.upsert({ chunks }); // ③ 整批写进向量库

为什么分批? 嵌入 API 按批调用更省钱、更快;upsert 整批写也比逐条写高效。30 这个数字是 BATCH_SIZE 常量(process-document.ts:32)。

4.4 chunk 在向量库里长什么样

makeChunk(packages/engine/src/chunk.ts:7)决定每行的 id 和字段:

// chunk.ts:7
id: `${documentId}#${chunk.id}`, // 文档 id + chunk id,天然带前缀、可按文档删
vector: embedding,
text: chunk.text,
metadata: removeTextFromMetadata ? chunk.metadata : { ...chunk.metadata, text: chunk.text },

精华:id 设计 = 文档级删除的钥匙。 id 形如 doc_abc#5,所以删一个文档时可以按 documentId 过滤批量删(见 process-document.ts:196deleteByFilter({ documentId })),不必逐 chunk 找。

5. 状态机与可观测性

文档在处理中会被反复改状态,每个状态都发一个 webhook,方便外部系统订阅:

QUEUED → PROCESSING → PRE_PROCESSING(等外部分块)→ PROCESSING → COMPLETED
↘ (任意步出错) FAILED

状态字段与时间戳见 process-document.ts:134-155(置 PROCESSING)、:358(置 COMPLETED)、onFailure 回调置 FAILED(:82)。每次转移都调 emitDocumentWebhook,事件名如 document.processing / document.ready / document.error

6. 巧妙之处(可借鉴)

  • 两级任务分治: 「编排(建文档 + 限额)」和「干活(嵌入 + 落库)」拆成两个任务,后者可独立高并发(90),前者只管批量触发和汇总(ingest.ts:502)。
  • 异步等外部服务用 token 而非轮询: wait.createToken / wait.forToken(process-document.ts:267),避免长时间占用执行槽。
  • 限额前置到建文档时: 超限文档直接建成 FAILED 不进队列(ingest.ts:198),省掉无谓的嵌入开销。
  • 重处理时先按 filter 清旧 chunk: cleanup 模式下先 deleteByFilter({ documentId }) 再重写,且 Pinecone 还要过 5 req/s 限速(process-document.ts:177-198)。

7. 边界与局限

  • 不在本仓库: 真正的解析 / OCR / 分块算法在闭源 Partition 服务里,本章只能讲调用契约(请求体 PartitionBody、返回 PartitionResult,见 packages/engine/src/partition/types.ts:95)。
  • chunk 大小默认 2048:partition/types.ts:36chunk_size 注释;可在作业/文档配置里覆盖(partition/index.ts:93)。
  • 页数按字符估: 「页」是 total_characters / 1000 的估算(ingest.ts:194),不是真实物理页。

8. 代码地图

主题文件路径符号名
摄取作业(第一级)packages/jobs/src/tasks/ingest.tsingestJob
逐文档处理(第二级)packages/jobs/src/tasks/process-document.tsprocessDocument / processBatch
组装外部分块请求体packages/engine/src/partition/index.tsgetPartitionDocumentBody
分块请求/响应类型packages/engine/src/partition/types.tsPartitionBody / PartitionResult / PartitionBatch
chunk → 向量库行packages/engine/src/chunk.tsmakeChunk
嵌入模型工厂packages/engine/src/embedding/index.tsgetNamespaceEmbeddingModel