跳到主要内容

编排引擎:10 个 Phase 的主线

这章讲 DeepCode 的「调度中枢」:execute_multi_agent_research_pipeline 怎么把整件事拆成 10 个工位、各工位产出落到哪、enable_indexing 开关砍掉哪几步、最后怎么如实汇报状态(而不是一律说成功)。

1. 它要解决的小问题

「把论文变成代码」是个长流程:解析、规划、搜参考、写代码……每一步都可能慢、可能失败。如果把它写成一坨,任何一步崩了都难定位,也难做「快速模式 vs 完整模式」的切换。

DeepCode 的答案:把流程切成编号清晰的 Phase,每个 Phase 一个 async 函数,串在一个主函数里顺序 await

2. 主线:10 个 Phase

入口是 execute_multi_agent_research_pipeline(workflows/agent_orchestration_engine.py:1739)。它的躯干就是一串带进度回调的 await:

# 示意,非源码:主流水线的骨架(对应 :1782-:1974)
ctx = await prepare_workflow_environment(...) # Phase 0+1 准备工作区
await acquire_input_artifact(ctx, logger) # Phase 2 下载/转 markdown
dir_info = await synthesize_workspace_infrastructure_agent(ctx, logger) # Phase 3
seg = await orchestrate_document_preprocessing_agent(dir_info, logger) # Phase 4 分段
await orchestrate_code_planning_agent(dir_info, logger, cb) # Phase 5 ★规划
if plan_review_callback: await run_plan_review_gate(...) # 人工/程序审计闸门
if enable_indexing: # Phase 6/7/8 只在完整模式跑
ref = await orchestrate_reference_intelligence_agent(...)
await automate_repository_acquisition_agent(...)
idx = await orchestrate_codebase_intelligence_agent(...)
impl = await synthesize_code_implementation_agent(...) # Phase 9 ★实现

关键观察:阶段间靠文件传话。 每个 Phase 把产物写到任务目录里的固定文件,下一个 Phase 去读。WorkflowContext(workflows/workflow_context.py:63)用 @property 把这些路径定死:

产物文件谁写谁读属性
*.md(论文转写)Phase 2Phase 5 规划
initial_plan.txtPhase 5Phase 9 实现initial_plan_path
reference.txtPhase 6Phase 7reference_path
code_base/Phase 7Phase 8
indexes/*.jsonPhase 8Phase 9(CodeRAG)
implement_code_summary.mdPhase 9 内部Phase 9 自己(记忆)

这就是 §index 里反复强调的「不靠对话历史、靠落盘文件传递状态」:任何 Phase 都能从盘上的中间产物断点续跑(ctx.skip_research_analysis 走 resume 路径,见 :1810)。

3. enable_indexing:一个开关砍掉三步

DeepCode 有「完整模式」和「快速模式」。区别就在 enable_indexing:

  • 完整模式(默认 True):跑 Phase 6/7/8——上网搜相关论文引用 → 下载 GitHub 仓库 → 给这些仓库建 CodeRAG 索引,实现时当参考。
  • 快速模式(False):这三步全跳过,只往对应文件写一句「skipped」占位,保持文件结构一致(:1912-:1963)。
# 示意,非源码:快速模式下仍写占位文件,保证下游不缺文件(对应 :1914-:1916)
if enable_indexing:
reference_result = await orchestrate_reference_intelligence_agent(...)
else:
reference_result = "Reference ... skipped - fast mode enabled"
with open(dir_info["reference_path"], "w") as f:
f.write(reference_result) # 占位,让 Phase 7/9 读得到

为什么这么设计: 搜+下+索引是最慢、最依赖网络的部分。快速模式让你在「不要参考、只凭论文硬写」时省掉它们,但下游代码不必加一堆 if 文件存在 的判断——文件总是存在,只是内容是占位符。这是「用占位保持契约不变」的小技巧。

4. 深入:几个值得注意的工程细节

① 规划产物缺失就硬中止。 Phase 5 跑完立刻检查 initial_plan.txt 是否存在,不存在直接 raise,绝不让后续阶段在没有计划的情况下空转(:1885-:1888)。这是「前置校验,早失败」。

② 诊断断点。 Phase 8 入口埋了个环境变量开关 DEEPCODE_STOP_AT_PHASE=8,设了就在这里抛异常停下,方便开发者检查中间状态(:1943-:1948)。平时不激活。

③ 如实汇报状态,而不是一律「成功」。 收尾时它把实现结果细分成 completed / incomplete / warning / error,incomplete 会明确写「写了 N 个文件,M 个没实现」(:2017-:2040)。这背后是实现循环返回的真实 run_state(见第 03 章)。

④ 状态一定回写 session store。 finally 块里无论成功、异常、还是被取消,都把最终状态写回 tasks.jsonl(:2068-:2087),CLI 和后端共用这条路径。用 _final_status 变量在各分支间传递,保证 finally 拿得到。

5. 巧妙之处

  • 编号 Phase + 落盘契约 = 可断点续跑、可单步调试。 整条流水线本质是「函数 + 文件」的状态机,没有隐藏的内存状态在阶段间漂。
  • 占位文件保持下游契约。 快速模式不改下游代码,只改文件内容。
  • task contextvar 绑定日志。 _bind_task(ctx.task_id) 让这条流水线里所有 loguru 调用自动路由到 <task_dir>/logs/system.jsonl(:1795-:1799),且用 token 栈式恢复,支持同进程跑多个任务。

6. 边界与局限

  • 严格顺序,无并行 Phase。 10 个 Phase 串行 await,慢的阶段(规划、实现)会拖住整条线。
  • 强依赖中间文件格式。 如果 initial_plan.txt 的结构被改坏,下游解析(第 02、03 章)会连锁出错。
  • chat 模式是另一条线。 自然语言/聊天式输入走的是 execute_chat_based_planning_pipeline(:2121)和 run_chat_planning_agent(:1630),不是本章这条论文复现主线。

7. 横向对比

和「单循环 ReAct agent」(典型编码助手:一个 while,模型自己决定下一步)不同,DeepCode 是固定工序的流水线 + 内嵌一个实现子循环。它把「规划」从「实现」里彻底剥离成独立 Phase 并冻结产物——这点更像传统编译器的「前端/后端分离」,而非对话式 agent。代价是灵活性,换来的是可控性和可续跑性。

8. 代码地图

主题文件符号
主流水线入口workflows/agent_orchestration_engine.pyexecute_multi_agent_research_pipeline
工作区准备workflows/environment.pyprepare_workflow_environment
输入获取workflows/agent_orchestration_engine.pyacquire_input_artifact
各 Phase 编排函数同上orchestrate_document_preprocessing_agent / orchestrate_code_planning_agent / orchestrate_reference_intelligence_agent / automate_repository_acquisition_agent / orchestrate_codebase_intelligence_agent / synthesize_code_implementation_agent
路径契约workflows/workflow_context.pyWorkflowContext(initial_plan_pathreference_path 等 property)
聊天式入口workflows/agent_orchestration_engine.pyexecute_chat_based_planning_pipeline / run_chat_planning_agent