跳到主要内容

Workforce — 协调者-工人流水线

这章讲什么: Workforce 是 CAMEL 现代的多智能体编排:一个项目经理(协调者)+ 一个规划师(拆任务)+ 若干工人。它用一个 asyncio 任务通道把活异步派出去、收回来,任务失败时还会让 LLM 决定怎么补救。这是搭真实多 agent 系统的主力。

1. 它要解决的小问题

复杂任务("做柏林市场分析")需要多种专长 + 并行 + 容错:查资料的、分析的、写报告的各干各的,有人搞砸了不能让整个流程崩。Workforce 把这套「拆活—派活—干活—容错—汇总」自动化。

2. 顶层全景:三种内部 agent + 一个通道

Workforce 内部有三类 ChatAgent(类 docstring,workforce.py:182-186),外加一个 TaskChannel 做协调:

部件角色干什么
协调者 agent(coordinator)项目经理看每个工人的能力描述,把子任务派给最合适的
任务规划 agent(task planner)规划师把大任务拆成子任务、最后把结果组合
工人(workers)执行者SingleAgentWorker / RolePlayingWorker / 嵌套 Workforce
TaskChannel传送带异步存放任务,工人从这领活、干完放回
动态工人临时工现有工人搞不定时,运行时新造一个(恢复策略之一)
Task("做柏林市场分析")


┌──────────────┐ ① 拆成子任务
│ 任务规划 agent │ ───────────────┐
└──────────────┘ │ 查资料 / 分析 / 写报告
┌───────────────────────────┘

┌──────────────┐ ② 按能力派活
│ 协调者 agent │
└──────┬───────┘
│ post_task

┌────────────────────── TaskChannel(asyncio 传送带)──────────────────────┐
│ SENT → PROCESSING → RETURNED (任务在通道里的状态流转) │
└──────┬──────────────┬──────────────┬──────────────────────────────────┘
│ 领活 │ 领活 │ 领活
▼ ▼ ▼
[工人A 查资料] [工人B 分析] [工人C 写报告] ← 每个工人就是一个 ChatAgent
│ 干完 return_task 放回通道

③ 协调者收回:成功→处理依赖;失败→选恢复策略

怎么读:中间那条 TaskChannel 是传送带,所有派活/领活/交活都是往它里面改任务状态。工人是消费者,协调者是生产者。

3. 核心机制一:任务通道(生产者-消费者)

小问题: 协调者和多个工人在不同 asyncio 协程里跑,怎么安全地传任务、还支持并行?

思路: 用一个共享的 TaskChannel,内部是一个 asyncio.Condition + 几个按状态/按工人分组的索引。每个任务被包成 Packet,带状态 SENT → PROCESSING → RETURNED → ARCHIVED(task_channel.py:25-42)。

怎么转:

  • 协调者派活:post_task 把 packet 设为 SENT、丢进该工人的队列、notify_all(task_channel.py:203-213)。
  • 工人领活:get_assigned_task_by_assigneeConditionawait,被唤醒后原子地把任务从 SENT 改成 PROCESSING 并取走(task_channel.py:174-201)——这把「领同一个任务的竞态」挡住了。
  • 工人交活:return_task 把状态改 RETURNED、放进发布者队列(task_channel.py:228-240)。
  • 协调者收活:get_returned_task_by_publisher(task_channel.py:148-172)。
# 示意,非源码:领活的原子性
async def get_assigned_task(self, worker_id):
async with self._condition: # 拿锁
while True:
for task_id in self.queue[worker_id]:
if task.status == SENT: # 还没被别人领走
task.status = PROCESSING # 原子地占住
return task
await self._condition.wait() # 没活就睡,等 notify

重点看 status == SENT → PROCESSING 必须在同一个锁内完成,否则两个工人会领到同一个任务。

工人侧的领活循环在 worker.py:152 _listen_to_channel:asyncio.wait_for(self._get_assigned_task(), timeout=1.0) 拿活 → create_task(self._process_single_task(task)) 起一个协程去干 → 不阻塞,继续领下一个。所以一个工人能并行处理多个任务

4. 核心机制二:任务拆分与派发

拆分: Task.decompose()(tasks/task.py:408)用 TASK_DECOMPOSE_PROMPT 让规划 agent 把任务切成子任务列表;Task.compose()(tasks/task.py:579)反过来把子结果合并。任务是:subtasks + dependencies(tasks/task.py:264:272)。

派发: _call_coordinator_for_assignment(workforce.py:3768)把所有工人的能力描述(_get_child_nodes_info,含每个工人有哪些工具,workforce.py:3678)和待派任务塞进 ASSIGN_TASK_PROMPT,让协调者返回一个 TaskAssignResult——即「哪个子任务给哪个工人、依赖谁」。

校验回环: 协调者可能给出不存在的工人 ID。_call_coordinator_for_assignment 支持把 invalid_ids 连同合法 ID 列表反馈给协调者重试(workforce.py:3799-3807)——又一次「让 LLM 自我纠错」。

5. 核心机制三:失败恢复(精华)

小问题: 工人把任务搞砸了怎么办?直接报错太脆。

思路: 把「失败」变成一次协调者的推理。失败时调 _analyze_task(for_failure=True)(workforce.py:1645),把错误信息喂给协调者,让它在五种策略里选一个:

策略(RecoveryStrategy)含义
RETRY原样再试一次(临时错误,如网络超时)
REPLAN改写任务内容后重试(任务表述有问题)
DECOMPOSE把任务再拆细
CREATE_WORKER现有工人都不行,造一个新工人来干
REASSIGN把任务重新指派给另一个工人

枚举定义在 utils.py:211-218。选择逻辑由 LLM 产出 TaskAnalysisResult,再由 _apply_recovery_strategy(workforce.py:1828)执行。CREATE_WORKER 会真的在运行时 new 一个 ChatAgent 工人挂进通道(_create_worker_node_for_task,workforce.py:4186)。

妙在哪:容错不是写死的 try/except retry,而是一次 LLM 决策——同一个错误,模型可能这次选重试、下次选换人,取决于它对错误的理解。

6. 主循环:把上面串起来

Workforce.process_task(workforce.py:2822)是同步入口,内部跑 _listen_to_channel(workforce.py:5282)这个事件循环:

process_task(task)

▼ 启动所有工人的监听协程 + 自己的监听循环
while 还有 pending 或 in-flight 任务:
├─ 需要拆分?→ handle_decompose_append_task
├─ _post_ready_tasks() # 依赖满足的任务派出去
├─ returned = _get_returned_task() # 收一个干完的
├─ 成功 → _handle_completed_task() # 解锁依赖它的任务
└─ 失败 → _handle_failed_task() # 走恢复策略
→ 全部完成 → compose 出最终结果

关键行:workforce.py:5292 await self._post_ready_tasks()(先派一批);workforce.py:5393 returned_task = await self._get_returned_task()(收回);workforce.py:4941 _handle_completed_task;workforce.py:4642 _handle_failed_task

7. 关键细节 / 坑

① 工人是「能力描述 + ChatAgent」。 add_single_agent_worker(description, worker=agent)(workforce.py:3031)——那段 description 是协调者派活时唯一能看到的依据,写不好会派错活。

② 依赖驱动并行。 只有依赖都 DONE 的任务才会被 _post_ready_tasks 派出去(workforce.py:4397),所以无依赖的子任务天然并行,有依赖的串行。

③ 两种模式。 AUTO_DECOMPOSE(默认)用上面的智能恢复;PIPELINE 用简单重试、且允许失败任务把错误信息往下游传(workforce.py:257-262)。

share_memory 仅限 SingleAgentWorker 开了之后这些工人 + 协调者 + 规划者共享完整对话历史,利于任务交接;但 RolePlayingWorker 和嵌套 Workforce 不参与(workforce.py:225-233)。

8. 横向对比

02-role-playing.md §7 的对比表:RolePlaying 是对称双人、无拆分无恢复;Workforce 是非对称多工人、显式拆分 + LLM 恢复。两者都建立在 ChatAgent(见 01-chat-agent.md)之上。

9. 代码地图

主题文件符号
同步入口camel/societies/workforce/workforce.pyWorkforce.process_task
主事件循环camel/societies/workforce/workforce.pyWorkforce._listen_to_channel
派发就绪任务camel/societies/workforce/workforce.pyWorkforce._post_ready_tasks
协调者派活camel/societies/workforce/workforce.pyWorkforce._call_coordinator_for_assignment
失败分析camel/societies/workforce/workforce.pyWorkforce._analyze_task
执行恢复策略camel/societies/workforce/workforce.pyWorkforce._apply_recovery_strategy
恢复策略枚举camel/societies/workforce/utils.pyRecoveryStrategy
任务通道camel/societies/workforce/task_channel.pyTaskChannel / Packet
工人领活循环camel/societies/workforce/worker.pyWorker._listen_to_channel
任务树/拆分camel/tasks/task.pyTask.decompose / Task.compose
加工人camel/societies/workforce/workforce.pyWorkforce.add_single_agent_worker