Canvas 类详细解析
概述
canvas.py 是 RAGFlow Agent 系统的核心执行引擎,定义了 Graph(基础图执行器)和 Canvas(完整的 Agent 画布)两个类。整个文件共 848 行,实现了:
- DSL 驱动的组件图的加载、执行和序列化
- 变量引用系统:
{component_id@variable}语法,支持跨组件取值和赋值 - 异步执行引擎:基于
asyncio+ThreadPoolExecutor的并发调度 - 流式消息输出:支持 TTS(文字转语音)的实时流式响应
- 任务取消机制:基于 Redis 的取消信号传递
- 异常处理与路径跳转:运行时错误可触发 goto 跳转或默认值回退
文件路径:agent/canvas.py
1. 依赖关系
1 | import asyncio, base64, datetime, inspect, json, logging, re, time |
关键依赖说明:
component_class():动态组件工厂函数,从agent.component、agent.tools、rag.flow三个模块中按类名查找并返回组件类normalize_chunker_dsl():DSL 版本兼容层,将旧版Splitter/HierarchicalMerger等命名自动迁移到TokenChunker/TitleChunkerLLMBundle:LLM 调用的统一封装,绑定 tenant_id 和模型配置partial:用于标记”尚未完成的流式输出”,是一个核心的惰性求值模式
2. 类层次结构
1 | Graph ← 基础图:DSL 加载、变量系统、取消机制 |
Graph 是抽象基类,定义了图的基本操作,其 run() 方法只是 raise NotImplementedError()。所有真正的执行逻辑都在 Canvas 中。
3. Graph 类(第 43-282 行)
3.1 DSL 数据结构
Graph/Canvas 的核心数据模型是一个称为 DSL 的字典结构:
1 | dsl = { |
3.2 __init__ 方法
1 | def __init__(self, dsl: str, tenant_id=None, task_id=None, custom_header=None): |
执行流程:
- DSL 迁移:
normalize_chunker_dsl()自动将旧版命名转换(如Splitter→TokenChunker) - 线程池:创建
max_workers=5的线程池,用于执行同步阻塞的组件_invoke() - 加载:调用
load()实例化所有组件
3.3 load() 方法(第 96-111 行)
1 | def load(self): |
这是 DSL 编译的核心,分三步:
| 步骤 | 操作 | 说明 |
|---|---|---|
| 1 | component_class(name + "Param")() |
动态加载参数类(如 RetrievalParam),实例化 |
| 2 | param.update(params) → param.check() |
将 JSON 参数写入对象并进行校验 |
| 3 | component_class(name)(self, k, param) |
动态加载组件类(如 Retrieval),传入 canvas 引用、ID 和参数 |
设计要点:组件实例化时接收 self(即 Graph/Canvas 本身),因此每个组件都持有对图的反向引用(self._canvas),可以访问全局变量、其他组件输出等。
3.4 变量引用系统
RAGFlow 的 DSL 使用一种模板语法引用其他组件的输出:
1 | {component_id@variable_name.sub_field} |
支持的变量前缀:
sys.*— 系统全局变量(如sys.query)env.*— 环境变量/用户定义变量(如env.temperature)cid@var— 另一个组件的输出(如retrieval_0@content)
get_value_with_variable()(第 168-193 行)
替换字符串中的变量引用,返回最终值:
1 | pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*") |
正则匹配 {xxx} 或 {{xxx}} 两种格式的变量引用,然后逐段替换。
特殊处理:
- 如果值是
partial(惰性生成器),则消费所有 chunk 拼接为字符串 - 如果值是字符串则直接替换
- 其他类型转为 JSON 字符串
get_variable_value()(第 195-210 行)
1 | def get_variable_value(self, exp: str) -> Any: |
取值路径:从组件对象的 _param.outputs 字典中按 key 取顶层值,再按 . 分隔的路径做深度访问。
get_variable_param_value()(第 212-239 行)
支持字典、列表(索引)、对象属性三种深度访问方式:
1 | for key in path.split('.'): |
set_variable_value() / set_variable_param_value()(第 241-271 行)
写回变量值,同样支持深度路径写入。
3.5 序列化:__str__()(第 113-132 行)
1 | def __str__(self): |
将运行时状态序列化回 JSON 字符串。关键点:
- 组件对象通过
str(cpn["obj"])序列化,即调用ComponentBase.__str__(),输出{"component_name": "...", "params": {...}} - 其他元数据(upstream、downstream、parent_id 等)使用
deepcopy防止引用污染
3.6 取消机制(第 273-282 行)
基于 Redis 实现,不使用数据库轮询:
1 | def is_canceled(self) -> bool: |
这是一个协作式取消模式:被取消的任务不会立即被杀死,而是在每次 is_canceled() 检查点主动抛出 TaskCanceledException。
4. Canvas 类(第 285-848 行)
Canvas 继承 Graph,是完整的 Agent 执行引擎。
4.1 __init__ 方法(第 287-298 行)
1 | def __init__(self, dsl: str, tenant_id=None, task_id=None, canvas_id=None, custom_header=None): |
新增属性:
| 属性 | 类型 | 说明 |
|---|---|---|
globals |
dict |
全局变量,包含 sys.* 和 env.* 两种命名空间 |
variables |
dict |
用户自定义的环境变量定义(类型、默认值等) |
_id |
str |
Canvas 持久化 ID |
history |
list[tuple] |
对话历史,格式为 [(role, content), ...] |
retrieval |
list[dict] |
检索结果栈,每个元素为 {"chunks": {}, "doc_aggs": {}} |
memory |
list[tuple] |
长期记忆,格式为 [(user_msg, assist_msg, summary), ...] |
4.2 load() 方法(第 300-324 行)
1 | def load(self): |
从 DSL 恢复持久化的全局变量和历史记录,同时确保必要字段存在(向后兼容)。
4.3 reset() 方法(第 332-373 行)
1 | def reset(self, mem=False): |
重置整个画布状态。mem=True 时保留对话历史和检索结果(用于同一对话的多轮交互)。
4.4 核心执行引擎:run() 方法(第 375-669 行)
这是整个文件最核心的方法,约 300 行。它是一个 async generator,通过 yield 输出实时事件流。
整体执行流程图
1 | 用户输入 (query, files, inputs) |
事件装饰器(第 414-423 行)
1 | def decorate(event, dt): |
所有 yield 出去的事件都经过此装饰器,统一添加 message_id、task_id、created_at 等元数据。
事件类型:
| 事件 | 说明 |
|---|---|
workflow_started |
工作流开始执行 |
node_started |
节点开始执行 |
node_finished |
节点执行完成(含输入、输出、耗时) |
message |
流式消息块(支持 start_to_think/end_to_think 标记) |
message_end |
消息结束(含参考文档、状态) |
user_inputs |
需要用户补充输入(UserFillUp 交互) |
workflow_finished |
工作流执行完成 |
输入初始化(第 376-412 行)
1 | self.add_user_input(kwargs.get("query")) |
- 文件上传通过
FileService异步解析(支持图片→base64、文档→文本提取) - 支持 Webhook 模式的 Begin 组件,直接注入 payload
核心执行循环(第 502-651 行)
1 | idx = len(self.path) - 1 # 当前批次的起始索引 |
_run_batch 并发执行器(第 437-484 行)
1 | async def _run_batch(f, t): |
并发策略:
- 使用
asyncio.Semaphore(5)限制最大并发数 - 异步组件(有
_invoke_async协程)直接在事件循环中执行 - 同步组件通过
run_in_executor投递到线程池执行 - 每个 batch 包含从
idx到to的所有节点,并发执行
依赖检查(第 466-470 行)
1 | for _, ele in cpn.get_input_elements().items(): |
在执行前检查每个节点的输入依赖:如果某个输入引用的组件尚未执行完,则将该节点从当前批次中移除,等待下一轮。
Message 流式输出(第 518-577 行)
1 | if cpn_obj.component_name.lower() == "message": |
Message 组件是唯一产生用户可见输出的组件。核心设计:
partial模式:partial是 Python 的functools.partial,这里用作”惰性生成器”标记——如果 output 值是partial类型,表示这是一个需要被消费的生成器- 流式消费:逐 chunk 消费生成器,通过
yield decorate("message", ...)实时推送给前端 - TTS 集成:Message 配置
auto_play=True时,自动调用 TTS 模型将文本转为语音
路径推进逻辑(第 601-629 行)
不同类型的组件有不同的下游推进逻辑:
1 | # 迭代/循环内部项结束 → 回到父迭代器 |
异常处理(第 579-589 行)
1 | if cpn_obj.error(): |
4.5 TTS 文字转语音(第 683-717 行)
1 | def tts(self, tts_mdl, text): |
TTS 合成逻辑:
- 文本清洗:移除控制字符、emoji,标准化空格
- 长度限制:最多 500 字符
- 缓存:通过
synthesize_with_cache避免重复合成相同文本 - 流式触发:在 Message 的流式输出中,每累积 16 个字符触发一次 TTS
4.6 文件处理(第 752-778 行)
1 | async def get_files_async(self, files, layout_recognize=None) -> list[str]: |
提供异步和同步两个版本的接口:
get_files_async():在事件循环中异步执行get_files():同步包装器,从同步组件 invoke 路径调用时使用
4.7 Tool Use 回调(第 780-802 行)
1 | def tool_use_callback(self, agent_id, func_name, params, result, elapsed_time=None): |
用于 AgentWithTools 组件的工具调用追踪,结果存储在 Redis 中供前端展示。
4.8 参考文档管理(第 804-838 行)
1 | def add_reference(self, chunks, doc_infos): |
检索到的文档块和聚合信息存储在 retrieval 栈中,用于构建最终的引用列表。
4.9 对话历史管理(第 719-732 行)
1 | def get_history(self, window_size): |
历史记录是双写的:同时保存在 self.history 和 self.globals["sys.history"] 中。
5. 组件执行流程(component/base.py 补充)
Canvas 不直接调用组件的业务逻辑,而是通过 ComponentBase 定义的标准接口:
invoke 调用链
1 | Canvas._run_batch() |
输入解析(get_input())
1 | def get_input(self, key=None): |
输入解析的三种路径:
- 纯变量引用(如
"retrieval_0@content")→ 直接取值 - 模板字符串(如
"查询结果:{retrieval_0@content}")→ 解析变量后格式化 - 常量值→ 直接使用
6. DSL 迁移机制(dsl_migration.py)
normalize_chunker_dsl() 实现了 DSL 的向后兼容:
1 | COMPONENT_RENAMES = { |
迁移范围:
| 迁移目标 | 说明 |
|---|---|
components 的 key |
Splitter:abc → TokenChunker:abc |
obj.component_name |
Splitter → TokenChunker |
downstream/upstream ID |
所有旧 ID 替换为新 ID |
parent_id |
指向旧 ID 的替换为新 ID |
path |
路径中的旧 ID 替换 |
graph.nodes |
node ID、parentId、type、label、name 全面更新 |
graph.edges |
source、target、id 更新 |
| 变量引用中的 ID | 模板 {Splitter:abc@var} → {TokenChunker:abc@var} |
7. 并发模型总结
1 | asyncio Event Loop (主线程) |
关键设计决策:
- 异步优先:主执行循环是 async generator,支持流式输出和并发执行
- 线程池隔离:同步阻塞操作(LLM HTTP 调用、文件 I/O)在线程池中执行,不阻塞事件循环
- 信号量限流:
Semaphore(5)限制并发组件数,防止资源耗尽 - 协作式取消:关键检查点处调用
is_canceled()抛出异常终止
8. 关键设计模式
8.1 Partial 惰性求值
functools.partial 在 RAGFlow 中被用作流式输出句柄:
1 | # 组件设置输出为一个 partial(包装了生成器/迭代器) |
这允许 Message 组件先产出完整的 node_finished 事件,而内容通过后续的 message 事件逐步推送。
8.2 路径驱动的执行模型
Canvas 不使用显式的拓扑排序(如 BFS/DFS),而是采用增量路径追加模型:
self.path是一个有序列表,记录将要执行的组件 ID- 每个组件执行完成后,其
downstream被追加到 path 末尾 - 特殊组件(Switch、Categorize、Iteration、Loop)有自定义的路径推进逻辑
- 依赖检查确保引用的上游组件已执行
8.3 DSL 序列化与组件实例的双向同步
1 | JSON DSL (字符串) 运行时对象 |
load() 时从 JSON 创建组件实例,__str__() 时将组件实例序列化回 JSON。
9. 交互流程示例
以最简单的 RAG 流程为例:Begin → Retrieval → Generate → Message
1 | 1. Client 发送: {query: "什么是RAG?", files: [...]} |
10. 主要组件类型一览
| 组件名 | 文件 | 功能 |
|---|---|---|
Begin |
begin.py |
工作流入口,定义模式(Chat/Agent/Webhook)和输入参数 |
Message |
message.py |
输出消息给用户,支持流式 + TTS |
LLM |
llm.py |
通用 LLM 调用组件 |
AgentWithTools |
agent_with_tools.py |
Agent + 工具调用(Function Calling) |
Retrieval |
(在 rag/flow 中) | 知识库检索 |
Generate |
(在 rag/flow 中) | RAG 生成回答 |
Categorize |
categorize.py |
多路分类路由 |
Switch |
switch.py |
条件分支 |
Iteration/IterationItem |
iteration.py/iterationitem.py |
迭代执行 |
Loop/LoopItem/ExitLoop |
loop.py/loopitem.py/exit_loop.py |
循环执行 |
UserFillUp |
fillup.py |
暂停等待用户补充输入 |
VariableAssigner |
variable_assigner.py |
变量赋值 |
VariableAggregator |
variable_aggregator.py |
变量聚合 |
DocGenerator |
docs_generator.py |
文档生成 |
ExcelProcessor |
excel_processor.py |
Excel 处理 |
Browser |
browser.py |
网页浏览/爬取 |
Invoke |
invoke.py |
调用其他画布/工作流 |
DataOperations |
data_operations.py |
数据转换操作 |
StringTransform |
string_transform.py |
字符串转换 |
ListOperations |
list_operations.py |
列表操作 |
11. 关键配置与限制
| 配置项 | 默认值 | 说明 |
|---|---|---|
_thread_pool.max_workers |
5 | 线程池大小,控制同步组件并发数 |
Semaphore(5) |
5 | 异步信号量,控制组件并发执行数 |
MAX_CONCURRENT_CHATS |
10 | 环境变量,全局并发对话限制 |
COMPONENT_EXEC_TIMEOUT |
600s | 环境变量,单组件执行超时 |
| TTS 缓冲区 | 16 字符 | 流式 TTS 触发的字符累积阈值 |
| TTS 最大文本长度 | 500 字符 | 单次 TTS 合成的最大文本 |
| Tool use logs TTL | 10 分钟 | Redis 中工具调用日志过期时间 |
PARAM_MAXDEPTH |
(settings) | 参数嵌套最大深度 |
12. 线程安全与注意事项
- 组件状态隔离:每个组件的
_param.inputs和_param.outputs是独立的,但通过 Canvas 的变量系统可以读写其他组件的输出 - 并发写入冲突:多个组件并发执行时,如果同时
set_variable_value写入同一组件的同一 output key,可能存在竞态条件(实际使用中很少出现,因为组件依赖关系天然避免了大部分冲突) - 事件循环绑定:
get_files_async必须在运行中的事件循环内调用,get_files()同步包装器通过asyncio.run_coroutine_threadsafe处理跨线程场景 - Redis 依赖:取消机制、工具调用日志都依赖 Redis,Redis 不可用时这些功能会静默失败(通过 try/except 处理)

