跳到主要内容

CORE 摄取流水线 — 一段话怎么变成记忆

本章讲什么: 从「一段原始对话/文档进来」到「事实落进存储」,中间发生的每一步。这是写入侧的主线,读懂它你就懂了 CORE 一半。

1. 三阶段全景

CORE 把写入拆成三个队列阶段,前两个由 processEpisodePreprocessingprocessEpisodeIngestion 串起,第三个异步:

阶段一:预处理 阶段二:摄取(同步主线) 阶段三:图解析(异步)
(preprocess) (addEpisode) (graph-resolution)
│ │ │
切块?版本?diff? 清洗→抽取→反思→分类→落库 去重/合并/矛盾失效
│ │ │
存 episode 到图谱 世界事实→Neo4j / 心声→Aspects 见 03-graph-resolution

本章只讲阶段一、二;阶段三单独成章(03)。

阶段一、二为何分开?——为了省 token + 容错。预处理负责「切块、对文档算 diff」这类不需要每个 chunk 都重做的工作,而且把 episode 先存进图谱(避免下游竞态)。见 preprocess-episode.logic.ts:399-447(注释明确写了「Save episodes to Neo4j BEFORE enqueueing ingestion … race condition fix」)。

2. 阶段一:预处理(切块 / 版本 / diff)

2.1 切块:超过阈值才切

EpisodeChunker.needsChunking 用 token 数判断:

// 示意,非源码:重点看「按 token 阈值决定切不切」
needsChunking(content) {
const tokens = encode(content).length; // tiktoken 数 token
return tokens >= 1800; // ≥ maxChunkSize 才切
}

真实实现 episodeChunker.server.ts:65-68,阈值来自 DEFAULT_CONFIG(episodeChunker.server.ts:40-45):目标块 1250、最小 750、最大 1800 token。切块按段落边界、尽量贴近目标大小(episodeChunker.server.ts:141-169)。

2.2 文档版本与 diff:只摄取「改动」

这是预处理最妙的一手。对 DOCUMENT 类型,EpisodeVersioningService.analyzeVersionChanges 先算版本号,若已存在且内容变了,就跟上一版做 git 风格 diff,把只有改动的那部分当作要摄取的内容:

第 1 版文档全文 ──┐
├─► getGitStyleDiff(旧, 新) ──► 只有「+增/-删」的行
第 2 版文档全文 ──┘ └─► 这部分才喂给 LLM 抽取

真实实现 preprocess-episode.logic.ts:196-295。日志里直接打印 tokenSavings(:280-285)——(1 - diffLength/fullLength) * 100%。originalEpisodeBody 仍保留全文供下次比对(:358)。

坑/细节: 「压缩后的会话」(compact conversation)也走类似路径,但旧内容从 Postgres 的 Document 表取,而非从图谱(:207-242)。

3. 阶段二:摄取主线(addEpisode)

KnowledgeGraphService.addEpisode(knowledgeGraph.server.ts:78-393)是同步主线,六步走:

Step 1 取/建 episode ──► Step 2 拉上下文(同会话近 5 条 + 语义相似)
│ │
▼ ▼
Step 3 normalize(LLM 清洗)──► 若返回 NOTHING_TO_REMEMBER 就提前退出


Step 3.5 comprehendAndClassify(本章核心,见 §4)
│ ├─ 抽取世界事实 + 用户心声(并行)
│ ├─ 反思去噪(过滤会话噪声)
│ └─ 分类成 aspect

Step 4 落库:世界事实→saveTriple(Neo4j);心声→saveVoiceAspects(Aspects Store)
Step 5 生成并存嵌入(fact 嵌入 + entity 嵌入)→ 向量库

3.1 上下文检索(Step 2)

抽取前先给 LLM 喂「相关记忆」,提升一致性。两路:

  • 同会话最近 N 条(默认 DEFAULT_EPISODE_WINDOW = 5,knowledgeGraph.server.ts:66),按 sessionId 取(:204-218)。
  • 语义相似的旧 episode + 旧 fact(getRelatedMemories,:837-894),用 episode 内容的嵌入做向量搜索,minSimilarity 默认 0.75。

3.2 normalize:清洗成干净叙述(Step 3)

normalizeEpisodeBody(:725-826)用 LLM 把原始片段(可能是聊天记录、邮件、diff)清洗成统一的第三人称叙述,顺便注入用户名(做 user-centric 抽取)、该来源的摄取规则(getIngestionRulesForSource,:899-958,从 Postgres 拿)。

输出要求包在 <output>…</output> 里(:796)。若 LLM 判定这段没有可记的内容,返回字符串 NOTHING_TO_REMEMBER,主线直接退出并返回 0 语句(:241-250)——这也是退款逻辑的触发点(见 ingest-episode.logic.ts)。

4. 核心:comprehend + classify(世界 vs 心声)

这是 CORE 记忆「分而治之」的灵魂。comprehendAndClassify(knowledgeGraph.server.ts:404-720)把一段内容拆成两种本质不同的知识,各走各的管线。

为什么要分两种? 因为它们的本性不同:

类别是什么例子怎么存最好
世界事实 graph_facts客观的「谁-怎样-谁」「John 在 Google 工作」拆成原子 SPO 三元组进图谱(可连接、可推理)
用户心声 voice_facts主观的整句指令/偏好「永远在发 PR 前跑测试」整句进 Aspects Store(拆开就没意义)

类型定义见 packages/types/src/graph/graph.entity.ts:207-232:VOICE_ASPECTS(Directive/Preference/Habit/Belief/Goal/Task)与 GRAPH_ASPECTS(Identity/Event/Relationship/Decision/Knowledge/Problem/Task)。

4.1 三步:抽取 → 反思 → 分类

┌─ extract-world ─► graph_facts + entities ─┐
内容 ──┤ (并行,medium 模型) ├─► reflect 去噪 ─► classify ─► 落库
└─ extract-voice ─► voice_facts ────────────┘ (low,过会话噪声) (分 aspect)

Step 1 抽取(并行) —— extractWorldPrompt 抽世界事实 + 实体,extractVoicePrompt 抽心声,两个 LLM 调用并行(:422-468)。

Step 1.5 反思去噪 —— 关键的一道防线。reflectWorldPrompt / reflectVoicePrompt 用低成本模型过滤掉会话噪声(比如「让我想想」「好的谢谢」这类抽出来但不值得记的)。失败时优雅降级——回退到未过滤的原始抽取结果(:503-509)。

Step 2 分类 —— classifyVoicePrompt 把心声分成 Directive/Preference/… ;classifyWorldPrompt 把世界事实分成 Identity/Event/… 并附 event_date(:554-614)。

4.2 把分类结果织成三元组

分类完的世界事实要变成 Triple(subject-predicate-object + statement)。代码维护两张 map 来去重实体和谓词(同名只建一个节点),再组装:

// 示意,非源码:把「分类后的世界事实」织成图谱三元组
for (const stmt of classifiedWorld) {
const subject = entityMap.get(stmt.source) ?? newEntity(stmt.source); // 主语实体
const object = entityMap.get(stmt.target) ?? newEntity(stmt.target); // 宾语实体
const predicate = predicateMap.get(stmt.predicate); // 谓词也是实体
const statement = {
fact: stmt.fact, aspect: stmt.aspect,
validAt: episode.validAt, // 事实「何时成立」
invalidAt: null, // 出生即有效,等待未来被矛盾失效
};
triples.push({ statement, subject, predicate, object, provenance: episode });
}

真实实现 knowledgeGraph.server.ts:616-709。注意 predicate 本身也是一个 Entity(type="Predicate")——这是「具体化」的体现,下一章详述。validAt 取 episode 的 validAt(即 referenceTime),invalidAt 初始为 null

4.3 落库与嵌入(Step 4–5)

  • 世界三元组逐个 saveTriple(:292-294)→ Neo4j。
  • 心声 saveVoiceAspects(:298-311)→ Aspects Store(Postgres + 向量),分类为 null 的(被分类器否决)会被过滤掉(:711-717)。
  • 给每条 fact 和每个 entity 生成嵌入并批量入库(:313-366),供后续相似度去重 / 检索用。

5. 摄取之后:触发下游

主线返回后,processEpisodeIngestion(ingest-episode.logic.ts:55-340)负责编排下游异步任务:图解析(enqueueGraphResolution,:129-154)、标签分配、标题生成、persona 生成。还处理信用额度:NOTHING_TO_REMEMBER 时退还预留 credits(:170-192)。

6. 代码地图

主题文件符号
摄取主线(6 步)apps/webapp/app/services/knowledgeGraph.server.tsKnowledgeGraphService.addEpisode
世界 vs 心声抽取+分类apps/webapp/app/services/knowledgeGraph.server.tscomprehendAndClassify
清洗 / NOTHING_TO_REMEMBERapps/webapp/app/services/knowledgeGraph.server.tsnormalizeEpisodeBody
预处理(切块/版本/diff)apps/webapp/app/jobs/ingest/preprocess-episode.logic.tsprocessEpisodePreprocessing
切块阈值apps/webapp/app/services/episodeChunker.server.tsEpisodeChunker.needsChunking / DEFAULT_CONFIG
下游编排 + 退款apps/webapp/app/jobs/ingest/ingest-episode.logic.tsprocessEpisodeIngestion
aspect 类型定义packages/types/src/graph/graph.entity.tsVOICE_ASPECTS / GRAPH_ASPECTS / StatementAspects