跳到主要内容

03 · Flow(事件驱动的精确编排)

本章讲什么: Flow 是 Crew 之外的第二套范式。你用装饰器声明「方法之间谁触发谁」,Flow 引擎据此调度。它适合需要确定性走向、分支路由、并行、持久化的工作流——比 Crew 的「自治队伍」更可控。

3.1 它要解决的小问题

Crew 把过程交给 Agent 自治,但很多生产场景你想要精确控制:第一步算个值,据此分支走 A 或 B,B 完成后并行触发 C 和 D,全程可中断恢复。把这些写成一堆 if/await 会很乱。Flow 让你用声明式装饰器表达这张「事件图」,引擎负责调度。

3.2 用起来什么样

# 示意,非源码:一个最小 Flow
from crewai.flow.flow import Flow, start, listen, router

class Guide(Flow):
@start() # 入口方法
def fetch(self):
return "raw data"

@router(fetch) # fetch 完成后,路由器决定走向
def decide(self, data):
return "big" if len(data) > 100 else "small"

@listen("small") # 监听路由器吐出的标签 "small"
def handle_small(self):
return "handled small"

Guide().kickoff()

关键直觉:方法的「完成」就是一个事件,@listen(X) 订阅这个事件@router 特殊在它的返回值变成新的触发标签,从而改变后续走向。

3.3 装饰器 DSL:四件套

装饰器含义实现
@start()标记 Flow 的入口方法(可多个,并行启动)flow/dsl/_start.py
@listen(cond)当条件满足时执行;cond 可是方法引用、路由标签、或 or_/and_flow/dsl/_listen.py (listen)
@router(cond)像 listen,但返回值成为新触发事件,用于分支flow/dsl/_router.py
or_(...) / and_(...)多触发条件:任一满足 / 全部满足flow/dsl/_conditions.py

这些装饰器不直接执行逻辑,而是把方法包成一个带 FlowMethodDefinition 的 wrapper。看 listen 的实现(flow/dsl/_listen.py:45-55):

# _listen.py:45-55(摘)
def decorator(func):
wrapper = ListenMethod(func)
_merge_flow_method_definition(
wrapper,
FlowMethodDefinition(
do=_method_action(func),
listen=_to_definition_condition(condition), # 把触发条件标准化
),
)
return wrapper

Flow 的元类 FlowMeta(flow/runtime/__init__.py:380)在类定义时扫描这些 wrapper,建立「触发条件 → 方法」的注册表。

3.4 执行模型:谁触发谁

怎么读: kickoff 先跑所有 @start 方法;每个方法完成后,引擎查「谁在听这个方法」,先跑路由器(改走向),再并行跑普通监听器。

kickoff
│ 跑所有 @start 方法

方法 M 完成 ──→ _execute_listeners(M, result)

① 先反复跑被 M 触发的 @router(顺序、可链式)
│ router 的返回值 → 新触发标签,可再触发别的 router

② 收集全部触发标签 [M, router结果...]

③ 找出被触发的普通 @listen 方法
→ asyncio.gather 并行执行
→ 每个完成后又递归 _execute_listeners

入口 Flow.kickoff(flow/runtime/__init__.py:1835)其实是 kickoff_async(:1934)的同步包装——「所有状态初始化和事件发射都在 async 方法里处理」。

调度核心是 _execute_listeners(flow/runtime/__init__.py:2704)。它的两段式很关键(见其 docstring :2722-2726):

Routers are executed sequentially to maintain flow control. Normal listeners are executed in parallel for efficiency.

先跑路由器(顺序、可链式)。 while True 循环不断找被当前触发标签触发的 router(router_only=True),执行后把 router 的返回值转成新的触发标签,可能再触发下一个 router(:2738-2778)。这让路由可以链式收窄走向。

再并行跑监听器。 对每个触发标签,_find_triggered_methods(..., router_only=False) 找出监听者,然后 asyncio.gather(*tasks) 并行执行(:2812-2821)。

3.5 or_ / and_ 条件

@listen(or_(a, b)) 表示 a 或 b 任一完成就触发;and_(a, b) 表示两者都完成才触发。and_ 的实现靠累积已见事件:_condition_met 把每个到达的触发标签记进 _pending_events,直到集合满足条件才放行(flow/runtime/__init__.py:2837-2848)。

or_ 有个细节叫**「已触发」去重**:多事件 or_ 监听器触发一次后会被记进 _fired_or_listeners,避免同一轮被重复触发(:2860-2869);循环结构里又有 _rearm_or_listeners(重新武装)机制,让循环流程中 or_ 监听器能被再次触发(:1001-1028:2787-2788)。这是支持有环 Flow(循环工作流)的基础。

3.6 并行与竞速

除了普通的 asyncio.gather 并行,Flow 还支持竞速组(racing group):当多个监听器为同一个 or_ 条件竞争「谁先到」时,_execute_racing_listeners 处理这种「先到先得」语义,同时让非竞速的监听器照常并行(flow/runtime/__init__.py:2796-2811_execute_racing_listeners :1105)。

3.7 状态、持久化与重放

Flow 有一个结构化状态对象 FlowState(flow/runtime/__init__.py:365),带一个 id。状态可以是字典或 Pydantic 模型,由 _initialize_state(:1688)从 inputs 初始化。

两套独立的「记住进度」机制(kickoff docstring 明确区分,:1864-1869):

机制触发用途
@persist + restore_from_state_id给方法/Flow 标 @persist,kickoff 传 restore_from_state_id从某次历史快照恢复状态(新 run 用新 state.id,不污染原历史)
Checkpoint(from_checkpoint)kickoff 传 from_checkpoint 配置检查点恢复

两者不能同时用,同传会 raise ValueError(:1864-1869)。

3.8 巧妙之处:Crew 与 Flow 互相嵌套

Flow 的方法里可以直接 SomeCrew().kickoff(),把「自治队伍」当成流程图里的一个节点;反过来,第 1 章已说过,新版 Agent 执行器 AgentExecutor 本身就是 Flow 的子类。所以这两套范式不是对立的,而是可组合的——你用 Flow 画确定性骨架,在需要开放式推理的节点塞一个 Crew。这正是 README「combine precise workflow control ... and native support for Crews」的工程兑现。

→ 下一章:支撑这一切的横切设施(LLM 层、记忆、事件总线)与边界。