跳到主要内容

第 5 章 · AWEL 数据流引擎

AWEL(Agentic Workflow Expression Language)是 DB-GPT 的执行底座——一个用 Python 运算符拼出来的异步数据流引擎。它独立于 Agent 框架,但 Agent 之上的工作流编排靠它。本章讲它怎么用 >> 把一堆算子串成 DAG 并执行。

5.1 它要解决的小问题

一次 RAG 或数据分析往往是固定流水线:"取输入 → 检索 → 拼 prompt → 调 LLM → 解析输出"。你希望像搭积木一样声明这条流水线,而不是手写一堆 await 串行调用;还希望它天生异步、能分支、能流式。AWEL 就是这个声明 + 执行的引擎。

5.2 思路/直觉:用 >> 画 DAG

AWEL 最讨喜的设计:重载 >> 运算符表示数据流向a >> b 就是"a 的输出流给 b"。

input_op >> retrieve_op >> prompt_op >> llm_op >> parse_op
│ │ │ │ │
取输入 检索知识 拼 prompt 调 LLM 解析输出
(上游) ──────────────────数据沿箭头流────────────────► (下游)

怎么读: 从左到右就是数据流向,每个框是一个"算子"(Operator)。这一行代码就定义了一张 DAG,运行时数据从左流到右。

5.3 真实实现:>> 是怎么变成依赖的

秘密在 DependencyMixin 重载了 Python 的 __rshift__(dbgpt/core/awel/dag/base.py:101-117):

# dag/base.py:101-117 真实片段
def __rshift__(self, nodes: DependencyType) -> DependencyType:
"""Set downstream nodes for current node. Implements: self >> nodes."""
self.set_downstream(nodes)
return nodes

a >> b 实际执行 a.set_downstream(b) 并返回 b——返回 b 是关键,这样 a >> b >> c 能链式继续(a >> b 返回 b,再 >> c)。对应地 <<__lshift__ 设上游(dag/base.py:84-99),还有 __rrshift__ 支持 [a, b] >> c 这种列表在左的写法(dag/base.py:119-125)。

每个算子继承 BaseOperator(dbgpt/core/awel/operators/base.py:161-),它同时是 DAGNode(所以有上下游)。算子类型有现成的:

算子干什么符号
Map一进一出的转换MapOperator
Join多个上游汇成一个JoinOperator
Branch按条件走不同下游BranchOperator
InputDAG 的入口,喂初始数据InputOperator
Trigger由 HTTP/迭代器等触发整张 DAGTriggerOperator, HttpTrigger
Stream流式(streamify/unstreamify)StreamifyAbsOperator

(导出清单见 awel/__init__.py:73-110。)

5.4 DAG 上下文管理:with DAG(...) 自动收集节点

DAG 用上下文管理器构建(dbgpt/core/awel/dag/base.py:788-)。with DAG("my_dag") as dag: 块里创建的算子会自动登记到这个 DAG,靠的是 DAGVar 维护的"当前 DAG 栈"(dag/base.py:136-183,enter_dag/exit_dag),还区分了同步线程栈和异步 contextvar 栈——这样并发构建多张 DAG 不会串味。

# 示意,基于真实 AWEL API:声明一张最小 DAG
with DAG("my_flow") as dag:
a = InputOperator(...) # 入口
b = MapOperator(...) # 转换
a >> b # 声明:a 的输出流给 b
result = await b.call(call_data=...) # 从某个算子触发执行

5.5 怎么执行:call() → Runner 按依赖跑

你不直接"跑 DAG",而是对某个算子调 .call()(operators/base.py:262-286):

# operators/base.py:282-286 真实片段
with root_tracer.start_span("dbgpt.awel.operator.call"):
out_ctx = await self._runner.execute_workflow(
self, call_data, exist_dag_ctx=dag_ctx, dag_variables=dag_variables)
return out_ctx.current_task_context.task_output.output

执行交给 DefaultWorkflowRunner(dbgpt/core/awel/runner/local_runner.py:24-)。execute_workflow(local_runner.py:43-)从你调用的那个算子出发,沿上游依赖递归地先把上游跑完(_execute_node,local_runner.py:125-),数据和每个节点的输出都存在 DAGContext 里流转(dag/base.py:625-)。分支算子会用 _skip_current_downstream_by_node_name / _skip_downstream_by_id 把没选中的分支整条剪掉(local_runner.py:215-241)。

所以执行模型是:拉模式 + 拓扑序——从终点拉,先算依赖,数据在 DAGContext 里逐节点传递。

5.6 关键细节 / 坑

  • >> 返回下游不是上游:a >> b 返回 b,链式拼接很顺,但如果你想拿到 a,别接着用返回值。
  • 元类做了装配:BaseOperatorMeta(operators/base.py:91-)在算子实例化时自动绑当前 DAG、注入 runner 等——所以你 with DAG 里 new 一个算子,它"自动知道"自己属于哪张 DAG。
  • call 是异步的:callasync;调试时可以用 _blocking_call(operators/base.py:288-314)同步跑,但注释明确说"仅供调试,请用 call"。
  • 触发器才是真入口:生产里 DAG 多由 HttpTrigger/IteratorTrigger 触发(awel/trigger/),.call() 更多用于程序内调用和测试。
  • 可远程执行:DAGVar._check_serializable(dag/base.py:148-150)在远程跑算子时会校验可序列化——AWEL 设计上支持把算子分布到远端执行。

5.7 它和 Agent 的关系

AWEL 在 DB-GPT 里是通用底座,不专属 Agent。但很多东西架在它上面:RAG 的检索-生成流水线、模型调用算子(dbgpt/model/operators/)、数据源算子(dbgpt/datasource/operators/),乃至多 Agent 的 layout 编排(见 examples/agents/awel_layout_agents_chat_examples.py)都用 AWEL 表达。把它理解成"Agent 之下、可声明、可异步、可分布的执行引擎"即可。

5.8 代码地图

主题文件路径(相对 packages/dbgpt-core/src/)关键符号
>> 运算符与依赖dbgpt/core/awel/dag/base.pyDependencyMixin.__rshift__, __lshift__
DAG / 节点 / 上下文栈dbgpt/core/awel/dag/base.pyDAG, DAGNode, DAGVar, DAGContext
算子基类与 calldbgpt/core/awel/operators/base.pyBaseOperator.call, BaseOperatorMeta
常用算子dbgpt/core/awel/operators/common_operator.pyMapOperator, JoinOperator, BranchOperator
执行器dbgpt/core/awel/runner/local_runner.pyDefaultWorkflowRunner.execute_workflow, _execute_node
触发器dbgpt/core/awel/trigger/HttpTrigger, IteratorTrigger
导出清单dbgpt/core/awel/__init__.py(AWEL 公共 API)