Canvas类详细解析

Canvas 类详细解析

概述

canvas.py 是 RAGFlow Agent 系统的核心执行引擎,定义了 Graph(基础图执行器)和 Canvas(完整的 Agent 画布)两个类。整个文件共 848 行,实现了:

  • DSL 驱动的组件图的加载、执行和序列化
  • 变量引用系统{component_id@variable} 语法,支持跨组件取值和赋值
  • 异步执行引擎:基于 asyncio + ThreadPoolExecutor 的并发调度
  • 流式消息输出:支持 TTS(文字转语音)的实时流式响应
  • 任务取消机制:基于 Redis 的取消信号传递
  • 异常处理与路径跳转:运行时错误可触发 goto 跳转或默认值回退

文件路径:agent/canvas.py


1. 依赖关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio, base64, datetime, inspect, json, logging, re, time
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from functools import partial
from typing import Any, Union, Tuple

from agent.component import component_class # 动态组件工厂
from agent.component.base import ComponentBase # 组件基类
from agent.dsl_migration import normalize_chunker_dsl # DSL 版本迁移
from api.db.services.file_service import FileService # 文件解析服务
from api.db.services.llm_service import LLMBundle # LLM 调用封装
from api.db.services.task_service import has_canceled # 取消状态查询
from api.db.joint_services.tenant_model_service import get_tenant_default_model_by_type
from common.constants import LLMType
from common.misc_utils import get_uuid, hash_str2int
from common.exceptions import TaskCanceledException
from rag.prompts.generator import chunks_format
from rag.utils.redis_conn import REDIS_CONN
from rag.utils.tts_cache import synthesize_with_cache

关键依赖说明:

  • component_class():动态组件工厂函数,从 agent.componentagent.toolsrag.flow 三个模块中按类名查找并返回组件类
  • normalize_chunker_dsl():DSL 版本兼容层,将旧版 Splitter/HierarchicalMerger 等命名自动迁移到 TokenChunker/TitleChunker
  • LLMBundle:LLM 调用的统一封装,绑定 tenant_id 和模型配置
  • partial:用于标记”尚未完成的流式输出”,是一个核心的惰性求值模式

2. 类层次结构

1
2
3
4
5
Graph                    ← 基础图:DSL 加载、变量系统、取消机制
└── Canvas ← Agent 画布:完整执行引擎、历史、检索、TTS、文件处理

ComponentBase ← 组件基类(在 component/base.py 中)
└── Begin, Retrieval, Generate, Message, Switch, Loop, Iteration, ... ← 具体组件

Graph 是抽象基类,定义了图的基本操作,其 run() 方法只是 raise NotImplementedError()所有真正的执行逻辑都在 Canvas


3. Graph 类(第 43-282 行)

3.1 DSL 数据结构

Graph/Canvas 的核心数据模型是一个称为 DSL 的字典结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
dsl = {
"components": { # 组件映射表,key 为组件实例ID
"begin": {
"obj": { # 组件对象及其参数
"component_name": "Begin",
"params": {},
},
"downstream": ["answer_0"], # 下游组件 ID 列表
"upstream": [], # 上游组件 ID 列表
},
"retrieval_0": {
"obj": { "component_name": "Retrieval", "params": {} },
"downstream": ["generate_0"],
"upstream": ["answer_0"],
},
# ... 更多组件
},
"history": [], # 对话历史
"path": ["begin"], # 当前执行路径(组件 ID 的有序列表)
"retrieval": [ # 检索结果栈
{"chunks": [], "doc_aggs": []}
],
"globals": { # 全局变量
"sys.query": "",
"sys.user_id": tenant_id,
"sys.conversation_turns": 0,
"sys.files": []
}
}

3.2 __init__ 方法

1
2
3
4
5
6
7
8
9
10
def __init__(self, dsl: str, tenant_id=None, task_id=None, custom_header=None):
self.path = []
self.components = {}
self.error = ""
self.dsl = normalize_chunker_dsl(json.loads(dsl)) # ① DSL迁移
self._tenant_id = tenant_id
self.task_id = task_id if task_id else get_uuid()
self.custom_header = custom_header
self._thread_pool = ThreadPoolExecutor(max_workers=5) # ② 线程池
self.load()

执行流程:

  1. DSL 迁移normalize_chunker_dsl() 自动将旧版命名转换(如 SplitterTokenChunker
  2. 线程池:创建 max_workers=5 的线程池,用于执行同步阻塞的组件 _invoke()
  3. 加载:调用 load() 实例化所有组件

3.3 load() 方法(第 96-111 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
def load(self):
self.components = self.dsl["components"]
cpn_nms = set([])
for k, cpn in self.components.items():
cpn_nms.add(cpn["obj"]["component_name"])
param = component_class(cpn["obj"]["component_name"] + "Param")()
param.update(cpn["obj"]["params"])
try:
param.check()
except Exception as e:
raise ValueError(self.get_component_name(k) + f": {e}")
cpn["obj"] = component_class(cpn["obj"]["component_name"])(self, k, param)
self.path = self.dsl["path"]

这是 DSL 编译的核心,分三步:

步骤 操作 说明
1 component_class(name + "Param")() 动态加载参数类(如 RetrievalParam),实例化
2 param.update(params)param.check() 将 JSON 参数写入对象并进行校验
3 component_class(name)(self, k, param) 动态加载组件类(如 Retrieval),传入 canvas 引用、ID 和参数

设计要点:组件实例化时接收 self(即 Graph/Canvas 本身),因此每个组件都持有对图的反向引用(self._canvas),可以访问全局变量、其他组件输出等。

3.4 变量引用系统

RAGFlow 的 DSL 使用一种模板语法引用其他组件的输出:

1
{component_id@variable_name.sub_field}

支持的变量前缀:

  • sys.* — 系统全局变量(如 sys.query
  • env.* — 环境变量/用户定义变量(如 env.temperature
  • cid@var — 另一个组件的输出(如 retrieval_0@content

get_value_with_variable()(第 168-193 行)

替换字符串中的变量引用,返回最终值:

1
pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*")

正则匹配 {xxx}{{xxx}} 两种格式的变量引用,然后逐段替换。

特殊处理:

  • 如果值是 partial(惰性生成器),则消费所有 chunk 拼接为字符串
  • 如果值是字符串则直接替换
  • 其他类型转为 JSON 字符串

get_variable_value()(第 195-210 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
def get_variable_value(self, exp: str) -> Any:
exp = exp.strip("{").strip("}").strip(" ").strip("{").strip("}")
if exp.find("@") < 0:
return self.globals[exp] # sys.* 或 env.* 全局变量
cpn_id, var_nm = exp.split("@")
cpn = self.get_component(cpn_id)
parts = var_nm.split(".", 1)
root_key = parts[0]
rest = parts[1] if len(parts) > 1 else ""
root_val = cpn["obj"].output(root_key) # 调用组件的 output() 方法
if not rest:
return root_val
return self.get_variable_param_value(root_val, rest) # 深度取值

取值路径:从组件对象的 _param.outputs 字典中按 key 取顶层值,再按 . 分隔的路径做深度访问。

get_variable_param_value()(第 212-239 行)

支持字典、列表(索引)、对象属性三种深度访问方式:

1
2
3
4
5
6
7
for key in path.split('.'):
if isinstance(cur, dict):
cur = cur.get(key) # 字典按键访问
elif isinstance(cur, (list, tuple)):
cur = cur[int(key)] # 列表按索引访问
else:
cur = getattr(cur, key, None) # 对象按属性访问

set_variable_value() / set_variable_param_value()(第 241-271 行)

写回变量值,同样支持深度路径写入。

3.5 序列化:__str__()(第 113-132 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def __str__(self):
self.dsl["path"] = self.path
self.dsl["task_id"] = self.task_id
dsl = {"components": {}}
for k in self.dsl.keys():
if k in ["components"]:
continue
dsl[k] = deepcopy(self.dsl[k])
for k, cpn in self.components.items():
dsl["components"][k] = {}
for c in cpn.keys():
if c == "obj":
dsl["components"][k][c] = json.loads(str(cpn["obj"]))
else:
dsl["components"][k][c] = deepcopy(cpn[c])
return json.dumps(dsl, ensure_ascii=False)

将运行时状态序列化回 JSON 字符串。关键点:

  • 组件对象通过 str(cpn["obj"]) 序列化,即调用 ComponentBase.__str__(),输出 {"component_name": "...", "params": {...}}
  • 其他元数据(upstream、downstream、parent_id 等)使用 deepcopy 防止引用污染

3.6 取消机制(第 273-282 行)

基于 Redis 实现,不使用数据库轮询:

1
2
3
4
5
def is_canceled(self) -> bool:
return has_canceled(self.task_id) # 检查 Redis key: {task_id}-cancel

def cancel_task(self) -> bool:
REDIS_CONN.set(f"{self.task_id}-cancel", "x") # 设置取消信号

这是一个协作式取消模式:被取消的任务不会立即被杀死,而是在每次 is_canceled() 检查点主动抛出 TaskCanceledException


4. Canvas 类(第 285-848 行)

Canvas 继承 Graph,是完整的 Agent 执行引擎。

4.1 __init__ 方法(第 287-298 行)

1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, dsl: str, tenant_id=None, task_id=None, canvas_id=None, custom_header=None):
self.globals = {
"sys.query": "",
"sys.user_id": tenant_id,
"sys.conversation_turns": 0,
"sys.files": [],
"sys.history": [],
"sys.date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
}
self.variables = {}
super().__init__(dsl, tenant_id, task_id, custom_header=custom_header)
self._id = canvas_id

新增属性:

属性 类型 说明
globals dict 全局变量,包含 sys.*env.* 两种命名空间
variables dict 用户自定义的环境变量定义(类型、默认值等)
_id str Canvas 持久化 ID
history list[tuple] 对话历史,格式为 [(role, content), ...]
retrieval list[dict] 检索结果栈,每个元素为 {"chunks": {}, "doc_aggs": {}}
memory list[tuple] 长期记忆,格式为 [(user_msg, assist_msg, summary), ...]

4.2 load() 方法(第 300-324 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
def load(self):
super().load()
self.history = self.dsl["history"]
if "globals" in self.dsl:
self.globals = self.dsl["globals"]
# 确保必要的 key 存在
if "sys.history" not in self.globals:
self.globals["sys.history"] = []
if "sys.date" not in self.globals:
self.globals["sys.date"] = datetime.datetime.now(...)
# ...
self.retrieval = self.dsl["retrieval"]
self.memory = self.dsl.get("memory", [])

从 DSL 恢复持久化的全局变量和历史记录,同时确保必要字段存在(向后兼容)。

4.3 reset() 方法(第 332-373 行)

1
2
3
4
5
6
7
8
9
10
11
def reset(self, mem=False):
super().reset()
if not mem:
self.history = []
self.retrieval = []
self.memory = []
for k in self.globals.keys():
if k.startswith("sys."):
# 按类型重置为默认值("" / 0 / 0.0 / [] / {})
if k.startswith("env."):
# 按用户定义的变量类型重置

重置整个画布状态。mem=True 时保留对话历史和检索结果(用于同一对话的多轮交互)。

4.4 核心执行引擎:run() 方法(第 375-669 行)

这是整个文件最核心的方法,约 300 行。它是一个 async generator,通过 yield 输出实时事件流。

整体执行流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
用户输入 (query, files, inputs)


┌──────────────────────┐
│ 1. 初始化阶段 │
│ - 设置 sys.date │
│ - 处理 Webhook 输入 │
│ - 处理文件上传 │
│ - conversation_turns++│
│ - 取消检查 │
│ - yield: workflow_started│
└──────────┬───────────┘


┌──────────────────────┐
│ 2. 执行循环 (while) │
│ ┌────────────────┐ │
│ │ yield: node_started│
│ │ (每个节点) │
│ │ _run_batch() │ │ ← 并发执行一组节点
│ │ ├─ begin 节点 │ │
│ │ ├─ 中间节点 │ │
│ │ └─ message 节点 │ │
│ │ post-processing │ │
│ │ ├─ message → 流式│
│ │ ├─ categorize → │ │
│ │ ├─ switch → 分支 │
│ │ ├─ iteration → │
│ │ └─ loop → 循环 │
│ │ 错误处理 │ │
│ │ 路径推进 │ │
│ └────────────────┘ │
│ 检查 UserFillUp │
└──────────┬───────────┘


┌──────────────────────┐
│ 3. 结束阶段 │
│ yield: workflow_finished│
└──────────────────────┘

事件装饰器(第 414-423 行)

1
2
3
4
5
6
7
8
9
def decorate(event, dt):
nonlocal created_at
return {
"event": event,
"message_id": self.message_id,
"created_at": created_at,
"task_id": self.task_id,
"data": dt
}

所有 yield 出去的事件都经过此装饰器,统一添加 message_idtask_idcreated_at 等元数据。

事件类型:

事件 说明
workflow_started 工作流开始执行
node_started 节点开始执行
node_finished 节点执行完成(含输入、输出、耗时)
message 流式消息块(支持 start_to_think/end_to_think 标记)
message_end 消息结束(含参考文档、状态)
user_inputs 需要用户补充输入(UserFillUp 交互)
workflow_finished 工作流执行完成

输入初始化(第 376-412 行)

1
2
3
4
5
6
7
8
9
self.add_user_input(kwargs.get("query"))
# ...
for k in kwargs.keys():
if k in ["query", "user_id", "files"] and kwargs[k]:
if k == "files":
self.globals[f"sys.{k}"] = await self.get_files_async(kwargs[k], ...)
else:
self.globals[f"sys.{k}"] = kwargs[k]
self.globals["sys.conversation_turns"] += 1
  • 文件上传通过 FileService 异步解析(支持图片→base64、文档→文本提取)
  • 支持 Webhook 模式的 Begin 组件,直接注入 payload

核心执行循环(第 502-651 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
idx = len(self.path) - 1     # 当前批次的起始索引
while idx < len(self.path): # 直到没有新节点加入
to = len(self.path) # 当前批次的结束索引
# ① yield node_started 事件
for i in range(idx, to):
yield decorate("node_started", {...})

# ② 批量并发执行
await _run_batch(idx, to)

# ③ post-processing:逐节点处理输出、错误、路径推进
for i in range(idx, to):
# ... 处理 Message 流式输出
# ... 处理错误和异常跳转
# ... 推进路径(下游节点加入 self.path)

# ④ UserFillUp 检查
if any(component_name == "userfillup" for ...):
yield decorate("user_inputs", {...})
return # 等待用户输入
idx = to
_run_batch 并发执行器(第 437-484 行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def _run_batch(f, t):
loop = asyncio.get_running_loop()
tasks = []
max_concurrency = 5
sem = asyncio.Semaphore(max_concurrency) # 限制最大并发数为 5

async def _invoke_one(cpn_obj, sync_fn, call_kwargs, use_async):
async with sem:
if use_async:
await cpn_obj.invoke_async(...) # 异步调用
else:
await loop.run_in_executor( # 线程池调用
self._thread_pool,
partial(sync_fn, ...)
)
# ...
await asyncio.gather(*tasks)

并发策略

  • 使用 asyncio.Semaphore(5) 限制最大并发数
  • 异步组件(有 _invoke_async 协程)直接在事件循环中执行
  • 同步组件通过 run_in_executor 投递到线程池执行
  • 每个 batch 包含从 idxto 的所有节点,并发执行
依赖检查(第 466-470 行)
1
2
3
4
5
6
for _, ele in cpn.get_input_elements().items():
if isinstance(ele, dict) and ele.get("_cpn_id") and \
ele.get("_cpn_id") not in self.path[:i]:
self.path.pop(i) # 依赖未就绪,从路径中移除
t -= 1
break

在执行前检查每个节点的输入依赖:如果某个输入引用的组件尚未执行完,则将该节点从当前批次中移除,等待下一轮。

Message 流式输出(第 518-577 行)

1
2
3
4
5
6
7
if cpn_obj.component_name.lower() == "message":
if isinstance(cpn_obj.output("content"), partial):
stream = cpn_obj.output("content")()
# 处理流式输出,支持:
# - <think></think> 标签 → start_to_think / end_to_think
# - TTS 音频合成(每 16 个字符触发一次)
# - audio_binary 字段传递 base64 编码的音频

Message 组件是唯一产生用户可见输出的组件。核心设计:

  • partial 模式partial 是 Python 的 functools.partial,这里用作”惰性生成器”标记——如果 output 值是 partial 类型,表示这是一个需要被消费的生成器
  • 流式消费:逐 chunk 消费生成器,通过 yield decorate("message", ...) 实时推送给前端
  • TTS 集成:Message 配置 auto_play=True 时,自动调用 TTS 模型将文本转为语音

路径推进逻辑(第 601-629 行)

不同类型的组件有不同的下游推进逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 迭代/循环内部项结束 → 回到父迭代器
if cpn_obj.component_name.lower() in ("iterationitem","loopitem") and cpn_obj.end():
iter = cpn_obj.get_parent()
_extend_path(self.get_component(cpn["parent_id"])["downstream"])

# 分类/分支组件 → 使用 _next 输出决定路径
elif cpn_obj.component_name.lower() in ["categorize", "switch"]:
_extend_path(cpn_obj.output("_next"))

# 迭代/循环开始 → 进入内部第一个节点
elif cpn_obj.component_name.lower() in ("iteration", "loop"):
_append_path(cpn_obj.get_start())

# 退出循环 → 到循环体的下游
elif cpn_obj.component_name.lower() == "exitloop":
_extend_path(self.get_component(cpn["parent_id"])["downstream"])

# 有父组件的叶子节点 → 回到父组件的 start
elif not cpn["downstream"] and cpn_obj.get_parent():
_append_path(cpn_obj.get_parent().get_start())

# 默认 → 推进到 downstream 列表
else:
_extend_path(cpn["downstream"])

异常处理(第 579-589 行)

1
2
3
4
5
6
7
8
9
10
if cpn_obj.error():
ex = cpn_obj.exception_handler()
if ex and ex["goto"]:
self.path.extend(ex["goto"]) # 跳转到指定的异常处理路径
other_branch = True
elif ex and ex["default_value"]:
yield decorate("message", {"content": ex["default_value"]})
# 输出默认值并继续
else:
self.error = cpn_obj.error() # 标记全局错误,终止执行

4.5 TTS 文字转语音(第 683-717 行)

1
2
3
4
5
6
7
8
def tts(self, tts_mdl, text):
def clean_tts_text(text: str) -> str:
# 清理控制字符、emoji、多余空格
# 截断到 500 字符
if not tts_mdl or not text:
return None
text = clean_tts_text(text)
return synthesize_with_cache(tts_mdl, text)

TTS 合成逻辑:

  1. 文本清洗:移除控制字符、emoji,标准化空格
  2. 长度限制:最多 500 字符
  3. 缓存:通过 synthesize_with_cache 避免重复合成相同文本
  4. 流式触发:在 Message 的流式输出中,每累积 16 个字符触发一次 TTS

4.6 文件处理(第 752-778 行)

1
2
3
4
5
6
7
8
9
10
async def get_files_async(self, files, layout_recognize=None) -> list[str]:
# 图片文件 → image_to_base64 (data:image/...;base64,...)
# 文档文件 → FileService.parse (文本提取)
tasks = []
for file in files:
if file["mime_type"].find("image") >= 0:
tasks.append(loop.run_in_executor(..., image_to_base64, file))
else:
tasks.append(loop.run_in_executor(..., parse_file, file))
return await asyncio.gather(*tasks)

提供异步和同步两个版本的接口:

  • get_files_async():在事件循环中异步执行
  • get_files():同步包装器,从同步组件 invoke 路径调用时使用

4.7 Tool Use 回调(第 780-802 行)

1
2
3
4
def tool_use_callback(self, agent_id, func_name, params, result, elapsed_time=None):
# 记录每个组件调用工具的情况到 Redis
# key: {task_id}-{message_id}-logs, TTL: 10分钟
# 结构: [{"component_id": ..., "trace": [{path, tool_name, arguments, result, elapsed_time}]}]

用于 AgentWithTools 组件的工具调用追踪,结果存储在 Redis 中供前端展示。

4.8 参考文档管理(第 804-838 行)

1
2
3
4
5
6
7
8
9
10
11
12
13
def add_reference(self, chunks, doc_infos):
# 去重后添加到 retrieval 栈的最顶层
r = self.retrieval[-1]
for ck in chunks_format({"chunks": chunks}):
cid = hash_str2int(ck["id"], 500)
if cid not in r:
r["chunks"][cid] = ck
for doc in doc_infos:
if doc["doc_name"] not in r:
r["doc_aggs"][doc["doc_name"]] = doc

def get_reference(self):
return self.retrieval[-1] if self.retrieval else {"chunks": {}, "doc_aggs": {}}

检索到的文档块和聚合信息存储在 retrieval 栈中,用于构建最终的引用列表。

4.9 对话历史管理(第 719-732 行)

1
2
3
4
5
6
7
def get_history(self, window_size):
# 返回最近 window_size 轮的对话
# window_size <= 0 返回空列表

def add_user_input(self, question):
self.history.append(("user", question))
self.globals["sys.history"].append(f"{self.history[-1][0]}: {self.history[-1][1]}")

历史记录是双写的:同时保存在 self.historyself.globals["sys.history"] 中。


5. 组件执行流程(component/base.py 补充)

Canvas 不直接调用组件的业务逻辑,而是通过 ComponentBase 定义的标准接口:

invoke 调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
Canvas._run_batch()
→ ComponentBase.invoke(**kwargs) # 同步入口
→ set_output("_created_time", ...)
→ _invoke(**kwargs) # 业务逻辑
→ set_output("_elapsed_time", ...)
→ return output()

Canvas._run_batch()
→ ComponentBase.invoke_async(**kwargs) # 异步入口
→ check_if_canceled()
→ _invoke_async(**kwargs) 或 _invoke()
→ set_output("_elapsed_time", ...)
→ return output()

输入解析(get_input()

1
2
3
4
5
6
7
8
9
10
11
12
def get_input(self, key=None):
for var, o in self.get_input_elements().items():
v = self.get_param(var)
if isinstance(v, str) and self._canvas.is_reff(v):
self.set_input_value(var, self._canvas.get_variable_value(v))
elif isinstance(v, str) and re.search(self.variable_ref_patt, v):
elements = self.get_input_elements_from_text(v)
kv = {k: e.get('value', '') for k, e in elements.items()}
self.set_input_value(var, self.string_format(v, kv))
else:
self.set_input_value(var, v)
return res

输入解析的三种路径:

  1. 纯变量引用(如 "retrieval_0@content")→ 直接取值
  2. 模板字符串(如 "查询结果:{retrieval_0@content}")→ 解析变量后格式化
  3. 常量值→ 直接使用

6. DSL 迁移机制(dsl_migration.py)

normalize_chunker_dsl() 实现了 DSL 的向后兼容:

1
2
3
4
5
6
7
8
COMPONENT_RENAMES = {
"Splitter": "TokenChunker",
"HierarchicalMerger": "TitleChunker",
"PDFGenerator": "DocGenerator",
}
NODE_TYPE_RENAMES = {
"splitterNode": "chunkerNode",
}

迁移范围:

迁移目标 说明
components 的 key Splitter:abcTokenChunker:abc
obj.component_name SplitterTokenChunker
downstream/upstream ID 所有旧 ID 替换为新 ID
parent_id 指向旧 ID 的替换为新 ID
path 路径中的旧 ID 替换
graph.nodes node ID、parentId、type、label、name 全面更新
graph.edges source、target、id 更新
变量引用中的 ID 模板 {Splitter:abc@var}{TokenChunker:abc@var}

7. 并发模型总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  asyncio Event Loop (主线程)
┌─────────────────────────────┐
│ │
│ Canvas.run() [async gen] │
│ │ │
│ _run_batch() [coroutine] │
│ │ │
│ asyncio.gather( │
│ Semaphore(5) │
│ ├─ invoke_async() │ ← 异步组件直接 await
│ ├─ run_in_executor(...) │ ← 同步组件→线程池
│ ├─ run_in_executor(...) │
│ └─ ... │
│ │ │
│ yield decorate(event) │ ← 事件流输出
│ │
└──────────┬──────────────────┘

ThreadPoolExecutor(max_workers=5)
┌─────────────────────────────┐
│ Worker-1: LLM API call │
│ Worker-2: Retrieval query │
│ Worker-3: File parse │
│ Worker-4: TTS synthesis │
│ Worker-5: (idle) │
└─────────────────────────────┘

关键设计决策:

  1. 异步优先:主执行循环是 async generator,支持流式输出和并发执行
  2. 线程池隔离:同步阻塞操作(LLM HTTP 调用、文件 I/O)在线程池中执行,不阻塞事件循环
  3. 信号量限流Semaphore(5) 限制并发组件数,防止资源耗尽
  4. 协作式取消:关键检查点处调用 is_canceled() 抛出异常终止

8. 关键设计模式

8.1 Partial 惰性求值

functools.partial 在 RAGFlow 中被用作流式输出句柄

1
2
3
4
5
6
7
8
9
# 组件设置输出为一个 partial(包装了生成器/迭代器)
cpn_obj.set_output("content", partial(iter_chunks, generator))

# Canvas 检测到 partial,逐 chunk 消费
if isinstance(cpn_obj.output("content"), partial):
stream = cpn_obj.output("content")()
for m in stream:
yield decorate("message", {"content": m})
cpn_obj.set_output("content", _m) # 最终替换为完整字符串

这允许 Message 组件先产出完整的 node_finished 事件,而内容通过后续的 message 事件逐步推送。

8.2 路径驱动的执行模型

Canvas 不使用显式的拓扑排序(如 BFS/DFS),而是采用增量路径追加模型:

  • self.path 是一个有序列表,记录将要执行的组件 ID
  • 每个组件执行完成后,其 downstream 被追加到 path 末尾
  • 特殊组件(Switch、Categorize、Iteration、Loop)有自定义的路径推进逻辑
  • 依赖检查确保引用的上游组件已执行

8.3 DSL 序列化与组件实例的双向同步

1
2
3
4
5
6
7
JSON DSL (字符串)                   运行时对象
┌──────────┐ ┌──────────┐
│ dsl dict │ ─── load() ───→ │ components│
│ (JSON) │ │ (dict) │
│ │ ←── __str__() ── │ .obj │
└──────────┘ │ (实例) │
└──────────┘

load() 时从 JSON 创建组件实例,__str__() 时将组件实例序列化回 JSON。


9. 交互流程示例

以最简单的 RAG 流程为例:Begin → Retrieval → Generate → Message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1. Client 发送: {query: "什么是RAG?", files: [...]}
2. Canvas.run() 启动
3. self.path = ["begin"]
4. Batch 0: 执行 Begin 组件
- 将 query 写入 sys.query
- 将 files 解析后写入 sys.files
- downstream = ["retrieval_0"]
5. self.path = ["begin", "retrieval_0"]
6. Batch 1: 执行 Retrieval 组件
- 读取 sys.query
- 查询向量数据库,返回 chunks
- downstream = ["generate_0"]
7. self.path = ["begin", "retrieval_0", "generate_0"]
8. Batch 2: 执行 Generate 组件
- 读取 retrieval_0@chunks 和 sys.query
- 构建 Prompt,调用 LLM
- downstream = ["message_0"]
9. self.path = [..., "message_0"]
10. Batch 3: 执行 Message 组件(流式)
- yield message 事件(逐 token)
- yield message_end 事件
11. yield workflow_finished

10. 主要组件类型一览

组件名 文件 功能
Begin begin.py 工作流入口,定义模式(Chat/Agent/Webhook)和输入参数
Message message.py 输出消息给用户,支持流式 + TTS
LLM llm.py 通用 LLM 调用组件
AgentWithTools agent_with_tools.py Agent + 工具调用(Function Calling)
Retrieval (在 rag/flow 中) 知识库检索
Generate (在 rag/flow 中) RAG 生成回答
Categorize categorize.py 多路分类路由
Switch switch.py 条件分支
Iteration/IterationItem iteration.py/iterationitem.py 迭代执行
Loop/LoopItem/ExitLoop loop.py/loopitem.py/exit_loop.py 循环执行
UserFillUp fillup.py 暂停等待用户补充输入
VariableAssigner variable_assigner.py 变量赋值
VariableAggregator variable_aggregator.py 变量聚合
DocGenerator docs_generator.py 文档生成
ExcelProcessor excel_processor.py Excel 处理
Browser browser.py 网页浏览/爬取
Invoke invoke.py 调用其他画布/工作流
DataOperations data_operations.py 数据转换操作
StringTransform string_transform.py 字符串转换
ListOperations list_operations.py 列表操作

11. 关键配置与限制

配置项 默认值 说明
_thread_pool.max_workers 5 线程池大小,控制同步组件并发数
Semaphore(5) 5 异步信号量,控制组件并发执行数
MAX_CONCURRENT_CHATS 10 环境变量,全局并发对话限制
COMPONENT_EXEC_TIMEOUT 600s 环境变量,单组件执行超时
TTS 缓冲区 16 字符 流式 TTS 触发的字符累积阈值
TTS 最大文本长度 500 字符 单次 TTS 合成的最大文本
Tool use logs TTL 10 分钟 Redis 中工具调用日志过期时间
PARAM_MAXDEPTH (settings) 参数嵌套最大深度

12. 线程安全与注意事项

  1. 组件状态隔离:每个组件的 _param.inputs_param.outputs 是独立的,但通过 Canvas 的变量系统可以读写其他组件的输出
  2. 并发写入冲突:多个组件并发执行时,如果同时 set_variable_value 写入同一组件的同一 output key,可能存在竞态条件(实际使用中很少出现,因为组件依赖关系天然避免了大部分冲突)
  3. 事件循环绑定get_files_async 必须在运行中的事件循环内调用,get_files() 同步包装器通过 asyncio.run_coroutine_threadsafe 处理跨线程场景
  4. Redis 依赖:取消机制、工具调用日志都依赖 Redis,Redis 不可用时这些功能会静默失败(通过 try/except 处理)

RAG FLOW介绍

RAG 管理平台 — 变更汇总

概述

在 RAGFlow 基础上新增一个 RAG 前端管理平台,包含 5 个功能模块:概览、语义搜索、文档管理、配置管理、构建管理。

共涉及 20 个文件(14 个新增,6 个修改)


一、新增后端文件

1. api/apps/restful_apis/management_api.py

管理平台 API 端点,共 10 个接口,自动注册为 Flask Blueprint(前缀 /api/v1)。

方法 路由 用途
GET /management/overview 聚合所有数据集的概览统计
POST /management/search 跨数据集的语义搜索 + 质量评分
GET /management/documents 跨数据集的文档列表
GET /management/configs 所有数据集的配置列表
POST /management/configs/validate 验证配置参数
POST /management/builds 触发全量/增量重建
GET /management/builds 构建历史列表
GET /management/builds/<id> 构建详情
GET /management/builds/<id>/progress SSE 实时进度流
POST /management/builds/<id>/cancel 取消构建

2. api/apps/services/management_api_service.py

业务逻辑层,包含:

  • BuildRecordService — 构建记录的 CRUD
  • get_overview() — 跨数据集聚合:文档数、段落数、Token 数、类型分布、解析状态、组件健康
  • search_management() — 调用现有检索逻辑,附加质量评分计算(置信度/准确度/一致性/覆盖率)
  • list_management_documents() — 跨数据集文档查询,支持多条件筛选
  • list_management_configs() — 所有数据集配置导出
  • validate_config_management() — 配置参数校验
  • create_build() / list_builds() / get_build_detail() / cancel_build() — 构建全生命周期

3. api/db/db_models.py(修改)

新增 BuildRecord 模型(继承 DataBaseModel),字段如下:

字段 类型 说明
id CharField(32) PK 构建记录 ID
tenant_id CharField(32) 租户 ID
type CharField(16) full / incremental
status CharField(16) pending / running / completed / failed / cancelled
dataset_ids JSONField 数据集 ID 列表
progress IntegerField 进度 0-100
progress_msg CharField(255) 当前步骤描述
current_document CharField(255) 正在处理的文件
processed_documents IntegerField 已处理文档数
total_documents IntegerField 总文档数
chunk_count_before IntegerField 构建前段落数
chunk_count_after IntegerField 构建后段落数
started_at DateTimeField 开始时间
completed_at DateTimeField 完成时间
triggered_by CharField(32) 触发用户
error_message TextField 错误信息

表名:build_record,启动时自动创建。


二、新增前端文件(14 个)

页面组件(6 个)

1
2
3
4
5
6
7
web/src/pages/management/
├── layout.tsx # 侧边栏布局(5 个导航项)
├── overview/index.tsx # 模块1:概览仪表盘
├── semantic-search/index.tsx # 模块2:语义搜索
├── document-management/index.tsx # 模块3:文档管理
├── configuration-management/index.tsx # 模块4:配置管理
└── build-management/index.tsx # 模块5:构建管理

模块 1:概览 (overview/index.tsx)

  • 4 张统计卡片(总文档数、总段落数、总 Token 数、数据集数)
  • 文档类型分布饼图(Recharts PieChart)
  • 解析状态柱状图(Recharts BarChart)
  • 组件健康状态列表(MySQL / ES / Redis / MinIO)
  • 30 秒自动刷新数据

模块 2:语义搜索 (semantic-search/index.tsx)

  • 搜索输入框(debounce 500ms + Enter 触发)
  • 搜索结果列表(高亮匹配文本)
  • 质量评分雷达图(置信度/准确率/一致性/覆盖率)
  • 文档聚合标签侧边栏

模块 3:文档管理 (document-management/index.tsx)

  • 文档列表表格(TanStack Table)支持关键词搜索
  • 按文档类型/状态筛选
  • 点击行 → 右侧 Sheet 抽屉展示段落列表
  • 段落编辑 Dialog(react-hook-form)
  • 段落删除(二次确认)

模块 4:配置管理 (configuration-management/index.tsx)

  • 每个数据集一张配置卡片
  • chunk_size Slider(128-2048)
  • 嵌入模型展示(只读)
  • 相似度阈值配置
  • 验证 + 保存按钮
  • 内联验证结果展示(绿勾/红叉)

模块 5:构建管理 (build-management/index.tsx)

  • 构建类型选择(全量/增量)
  • 一键触发构建按钮
  • 活动构建进度面板(ProgressBar + SSE 实时更新)
  • 取消构建按钮
  • 构建历史表格(构建ID / 类型 / 状态 / 进度 / 时间)
  • 5 秒轮询刷新(有运行中构建时)

服务与状态管理(4 个)

1
2
3
4
5
6
7
8
9
10
web/src/
├── services/management-service.ts # Axios 服务层 + SSE 流式连接
├── store/management-store.ts # Zustand:侧边栏状态、构建运行标志
├── constants/management.ts # 状态颜色映射、常量配置
└── hooks/
├── use-management-overview.ts # 概览数据查询(30s 轮询)
├── use-management-search.ts # 搜索 mutation(debounce)
├── use-management-docs.ts # 文档/段落查询与变更
├── use-management-config.ts # 配置查询与变更
└── use-management-build.ts # 构建 CRUD + SSE 进度流

三、修改的前端文件(5 个)

1. web/src/routes.tsx

  • 新增 Management 相关 6 个路由枚举值
  • 新增 Management 路由组(懒加载),包含 5 个子路由
  • 默认重定向到 /management/overview

2. web/src/locales/en.ts

  • 新增 management 命名空间,约 60 个英文翻译键

3. web/src/locales/zh.ts

  • 新增 management 命名空间,约 60 个中文翻译键

4. web/src/layouts/components/global-navbar.tsx

  • PathMap 新增 Management 路径映射
  • menuItems 新增 Management 导航项

四、数据流设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[Page Component]
│ 调用

[Custom Hook (TanStack Query)]
│ 调用

[Service (Axios)]
│ HTTP

[Backend API (/api/v1/management/*)]
│ 调用

[Service Layer (management_api_service.py)]
│ 调用

[DB Services (DocumentService / KnowledgebaseService / BuildRecordService)]

状态管理决策

状态类型 工具 适用场景
服务端数据 TanStack Query 列表、详情、搜索结果
全局 UI 状态 Zustand 侧边栏折叠、构建运行标志
表单状态 react-hook-form + zod 配置编辑、段落编辑
瞬时 UI 状态 useState 搜索输入值、模态框开关

五、技术栈

与 RAGFlow 现有前端完全一致:

  • 框架:React 18 + TypeScript
  • 构建:Vite 7
  • 路由:React Router v7(createBrowserRouter + lazy loading)
  • UI 组件:shadcn/ui(Radix primitives + Tailwind CSS)
  • 数据获取:TanStack Query(React Query v5)
  • 全局状态:Zustand
  • 表单:react-hook-form + zod
  • 表格:@tanstack/react-table
  • 图表:Recharts(PieChart / BarChart / RadarChart)
  • HTTP:Axios(新服务)+ 原生 fetch(SSE 流)
  • 国际化:react-i18next
  • 实时通信:Server-Sent Events

六、启动说明

1
2
3
4
5
6
7
8
# 后端(management_api.py 会被自动发现注册)
docker compose -f docker/docker-compose.yml up -d

# 前端
cd web
npm install
npm run dev
# 访问 http://localhost:5173/management

导航栏会自动显示 Management 入口,点击进入管理平台。

RAG FLOW文档切分

RAGFlow 文档切分逻辑详解

整体架构

文档切分是 RAGFlow 检索管线中最核心的环节。RAGFlow 中有两套切分实现,共享底层算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
用户上传文档


┌─ rag/app/naive.py::chunk() ◄── 经典路径,文档首次上传
│ │
│ ▼
│ deepdoc/parser/* 解析文档 → sections 列表
│ │
│ ▼
│ rag/nlp/__init__.py::naive_merge() ← 核心合并算法
│ │
│ ▼
│ rag/nlp/__init__.py::tokenize_chunks() ← 最终 token 化

└─ rag/flow/chunker/token_chunker.py::TokenChunker ◄── Pipeline 组件路径


_build_json_chunks()
→ _merge_text_chunks_by_token_size()
→ _finalize_json_chunks()

两条路径的底层合并逻辑完全相同,区别在于:

维度 naive.py::chunk() TokenChunker
定位 一体式函数 Pipeline 组件
流程 解析 + 合并 + token 化一把梭 拆成多个阶段,可被 Agent 画布编排
适用 文档首次上传 Agent 工作流中的切分节点

完整数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
原始文档 (PDF/Word/Markdown/...)


deepdoc 解析器
│ 输出: sections = [(text, position_tag), ...]


┌── naive_merge() ──────────────────────────────────┐
│ │
│ 遍历 sections,逐段累计 token 数 │
│ 到达 chunk_token_num 阈值时切出新 chunk │
│ 如有 overlap,新 chunk 头部带上旧 chunk 的尾部 │
│ 如有自定义分隔符,直接按分隔符切分(跳过合并) │
│ │
│ 输出: chunks = ["文本块1", "文本块2", ...] │
└────────────────────────────────────────────────────┘


tokenize_chunks()
│ - 去掉 PDF 位置标签 @@...##
│ - 调用 rag_tokenizer.tokenize() 做分词
│ - 生成 coarse tokens (content_ltks) 和 fine-grained tokens (content_sm_ltks)
│ - 如有 child_delimiters,对每个 chunk 做二次切分


最终输出 (存入 ES/Infinity):
[
{
"content_with_weight": "文本块1",
"content_ltks": ["token1", "token2", ...], ← 粗粒度 token,用于检索
"content_sm_ltks": ["t", "to", "tok", ...], ← 细粒度 token,用于模糊匹配
"doc_type_kwd": "text", ← 类型标记
"position_int": [(pn, left, right, top, bottom)], ← PDF 坐标
"image": PIL.Image | None, ← 关联图片
},
...
]

第 1 层:解析 → sections

不同文件类型产生不同格式的 sections,最终统一成 [(text, position_info), ...]

PDF

1
2
# 来自 deepdoc/parser/pdf_parser.py
sections = [(b["text"], f"@@{page}\t{left}\t{right}\t{top}\t{bottom}##") for b in boxes]

sections 是 PDF 中每个文本框的内容,附带页码和坐标信息。

Word (DOCX)

1
2
# 来自 rag/app/naive.py::Docx.__call__
sections = [(line.get("text"), line.get("image"), line.get("table")) for line in lines]

Word 的 sections 是文档中的段落、图片和表格的混合序列。

其他格式

格式 解析器 输出
Excel ExcelParser [(cell_text, ""), ...]
Markdown MarkdownParser 按标题/分隔符切分的 blocks
HTML HtmlParser 文本段落列表
TXT TxtParser 按分隔符切分的文本段
JSON JsonParser 结构化 key-value 文本段

第 2 层:naive_merge — 核心合并算法

文件位置: rag/nlp/__init__.py:1070

1
def naive_merge(sections, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):

核心思路

逐段叠加,超限则切:

1
2
3
4
5
6
7
8
sections: [sec1, sec2, sec3, sec4, ...]


遍历每个 section:
1. 计算这个 section 的 token 数
2. 当前 chunk 是否已超过阈值?
→ Yes: 起新 chunk(带上 overlap 尾巴)
→ No: 追加到当前 chunk

切分条件判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def add_chunk(t, pos):
tnum = num_tokens_from_string(t)
# 如果文本太短(< 8 tokens),丢弃位置信息,避免碎片化
if tnum < 8:
pos = ""

# 关键判断:当前 chunk 的 token 数是否超过阈值
# 阈值 = chunk_token_num * (100 - overlapped_percent) / 100
if cks[-1] == "" or tk_nums[-1] > chunk_token_num * (100 - overlapped_percent) / 100.:
# 起新 chunk,先带上旧 chunk 的 overlap 尾巴
if cks:
overlapped = remove_tag(cks[-1])
t = overlapped[int(len(overlapped) * (100 - overlapped_percent) / 100.):] + t
if t.find(pos) < 0:
t += pos
cks.append(t) # 创建新 chunk
tk_nums.append(tnum)
else:
# 追加到当前 chunk(继续累积)
if cks[-1].find(pos) < 0:
t += pos
cks[-1] += t # 累积文本
tk_nums[-1] += tnum

关键设计:

  • 切分阈值 = chunk_token_num * (100 - overlapped_percent) / 100
    • chunk_token_num=512, overlapped_percent=0 → 阈值 = 512
    • chunk_token_num=512, overlapped_percent=10 → 阈值 = 460(提前触发,为 overlap 留空间)

Overlap(重叠)机制

相邻 chunk 之间有文本重叠,避免关键信息在 chunk 边界被截断:

1
2
3
chunk1: "ABCDEFGHIJKLMNOPQRST"    (20 chars, overlapped_percent=20%)
↓ 到达阈值,起新 chunk
chunk2: "QRST" + new_text ← 把 chunk1 后 20% 的内容带过来

实现:

1
2
3
overlapped = remove_tag(cks[-1])         # 去除 PDF 位置标签,得到纯文本
# 取后半部分作为 overlap
t = overlapped[int(len(overlapped) * 0.8):] + t

自定义分隔符模式

1
2
delimiter = "\n。;!?"           ← 默认:换行 + 中文断句符号
delimiter = "\n`##``---`" ← 支持 backtick 包裹的自定义分隔符

当存在 `分隔符` 时,走完全不同的切分逻辑——不再逐段累积合并:

1
2
3
4
5
6
custom_pattern = "##|---"    # 从 `##` 和 `---` 中提取

for sec, pos in sections:
split_sec = re.split(r"(%s)" % custom_pattern, sec)
for sub_sec in split_sec:
cks.append("\n" + sub_sec) # 每个分隔段直接成为独立 chunk

即:自定义分隔符模式下,按分隔符精确切分,每个段都是独立 chunk,不合并。


第 3 层:DOCX 特殊处理 — naive_merge_docx

文件位置: rag/nlp/__init__.py:1463

Word 文档走这条路径,因为其解析结果是图片、表格和文本的混合序列,需要区分 chunk 类型。

3.1 构建类型化 chunks — _build_cks()

1
2
3
4
5
6
7
8
9
10
sections = [(text, image, table), ...]


cks = [
{"text": "...", "ck_type": "text", "tk_nums": 50},
{"text": "...", "ck_type": "image", "tk_nums": 0}, ← 图片不参与 token 合并
{"text": "...", "ck_type": "table", "tk_nums": 120},
{"text": "...", "ck_type": "text", "tk_nums": 80},
...
]

三种 chunk 类型:text(可合并)、image(独立保留)、table(独立保留)。

3.2 附加上下文 — _add_context()

为表格/图片 chunks 附加周围的文本上下文,提升检索体验:

1
2
...text chunk...  |  [IMAGE chunk]  |  ...text chunk...
上下文上限 ↑ 图片本身 上下文下限 ↓
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _add_context(cks, idx, context_size):
# 向前收集文本
while prev >= 0 and remain_above > 0:
if cks[prev]["ck_type"] == "text":
piece = take_sentences_from_end(cks[prev]["text"], remain_above)
parts_above.insert(0, piece)

# 向后收集文本
while after < len(cks) and remain_below > 0:
if cks[after]["ck_type"] == "text":
piece = take_sentences_from_start(cks[after]["text"], remain_below)
parts_below.append(piece)

cks[idx]["context_above"] = "".join(parts_above)
cks[idx]["context_below"] = "".join(parts_below)

上下文收集是按 token 预算精确控制的,从最近的文本 chunk 开始逐块收集,直到满足 context_size 指定的 token 数。

3.3 合并文本 chunks — _merge_cks()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _merge_cks(cks, chunk_token_num, has_custom):
for i in range(len(cks)):
ck_type = cks[i]["ck_type"]

if ck_type == "text":
# 上一个文本 chunk 还没满 → 合并
if prev_text_ck >= 0 \
and merged[prev_text_ck]["tk_nums"] < chunk_token_num \
and not has_custom: # 自定义分隔符模式下不合并
merged[prev_text_ck]["text"] += cks[i]["text"]
merged[prev_text_ck]["tk_nums"] += cks[i]["tk_nums"]
else:
merged.append(cks[i]) # 起新 chunk
else:
merged.append(cks[i]) # 图片/表格独立保留,不合并

图片和表格永远不参与文本合并,它们作为独立 chunk 保留。


第 4 层:TokenChunker — Pipeline 组件版本

文件位置: rag/flow/chunker/token_chunker.py

这是 Agent 画布中的切分组件,参数更丰富,流程更结构化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
_upstream JSON


_build_json_chunks() ← 解析上游 JSON,按类型构建内部 chunk


_attach_context_to_media_chunks() ← 给 image/table 附加上下文文本


_merge_text_chunks_by_token_size() ← token_size 模式下合并小文本 chunks


_split_chunk_docs_by_children() ← 用 children_delimiters 二次切分


_finalize_json_chunks() ← 转成最终输出,写入 PDF 坐标等元数据

三种切分模式

1
2
3
self.delimiter_mode = "token_size"  # 默认:按 token 数累积合并
= "delimiter" # 按分隔符切分
= "one" # 一整块,不切
模式 行为
token_size 逐段累计 token,超限起新块(与 naive_merge 一致)
delimiter 用正则按分隔符切分,不合并
one 整个文档一个 chunk

children_delimiters 二次切分

一次切分后,可以用子分隔符对每个 chunk 做更细粒度的切分:

1
2
3
4
5
6
7
8
9
# 第一次切分:按主要分隔符(如 `\n`)
chunks = _split_text_by_pattern(payload, delimiter_pattern)

# 第二次切分:用 children_delimiters 对每个 chunk 再做切分
if custom_pattern:
for c in cks:
for text in _split_text_by_pattern(c, custom_pattern):
docs.append({"text": text, "mom": c})
# mom 字段保留父 chunk 原文,提供更大范围的上下文

mom 字段的作用:存切分前的父块原文,检索时可以提供原始段落的完整上下文。

媒体上下文附加(新版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _attach_context_to_media_chunks(chunks, table_context_size, image_context_size):
for i, chunk in enumerate(chunks):
if chunk["ck_type"] not in {"table", "image"}:
continue

context_size = image_context_size if chunk["ck_type"] == "image" else table_context_size

# 向前收集文本 chunks,直到满足 token 预算
prev = i - 1
while prev >= 0 and remain_above > 0:
if prev_chunk["ck_type"] == "text":
if prev_chunk["tk_nums"] >= remain_above:
# 只取末尾部分句子
parts_above.insert(0, _take_sentences(prev_chunk["text"], remain_above, from_end=True))
else:
parts_above.insert(0, prev_chunk["text"])

# 向后收集,对称逻辑
after = i + 1
while after < len(chunks) and remain_below > 0:
# ... 对称逻辑

chunk["context_above"] = "".join(parts_above)
chunk["context_below"] = "".join(parts_below)

最终输出:tokenize_chunks / tokenize

所有切分路径最终都通过 tokenize() 函数生成可检索的数据结构:

1
2
3
4
5
6
7
def tokenize(d, txt, eng):
d["content_with_weight"] = txt
# 去掉 HTML 表格标签后做粗粒度分词
t = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", txt)
d["content_ltks"] = rag_tokenizer.tokenize(t)
# 细粒度分词(用于模糊匹配)
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])

两种 token:

  • content_ltks:粗粒度分词,用于精确检索
  • content_sm_ltks:细粒度分词(字/子词级别),用于模糊匹配和拼写容错

参数总览

参数 默认值 作用 使用位置
chunk_token_num 128 (naive) / 512 (pipeline) 每个 chunk 的最大 token 数 naive_merge, TokenChunker
delimiter \n。;!? 断句分隔符。用 ` 包裹自定义分隔符 naive_merge, _build_cks
overlapped_percent 0 chunk 之间的重叠比例 (0–100) naive_merge, _merge_text_chunks_by_token_size
children_delimiters [] 二次切分分隔符列表 tokenize_chunks, _split_chunk_docs_by_children
table_context_size 0 表格附近附带的文本 token 数 naive_merge_docx, _attach_context_to_media_chunks
image_context_size 0 图片附近附带的文本 token 数 naive_merge_docx, _attach_context_to_media_chunks
delimiter_mode token_size 切分模式:token_size / delimiter / one TokenChunker

核心文件索引

文件 核心内容 行数
rag/nlp/__init__.py naive_merge(), naive_merge_docx(), tokenize_chunks(), tokenize(), split_with_pattern() ~1600
rag/app/naive.py chunk() — 文档上传入口,路由解析器 + 调用 merge ~1150
rag/flow/chunker/token_chunker.py TokenChunker — Pipeline 组件版切分器 ~370
rag/flow/base.py ProcessBase — 流水线组件基类 ~60
deepdoc/parser/pdf_parser.py PDF 解析器,产出 sections ~2500+
common/token_utils.py num_tokens_from_string() — token 计数

一句话总结

文档先被 deepdoc 解析成最小 text sections,然后逐段累加 token 数,超过 chunk_token_num 阈值就切一个新 chunk。同时支持 overlap 避免信息被截断、自定义分隔符精确控制切分边界、以及图片/表格附加上下文增强检索质量。

Hello World

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

RAG FLOW学习

lazyvim for java

lazyvim的java开发环境配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
-- ~/.config/nvim/lua/plugins/java.lua
return {
"mfussenegger/nvim-jdtls",
ft = "java", -- 仅在打开 .java 文件时启动
config = function()
-- 1. 定位 mason 安装的工具路径(自动适配版本/路径)
local mason_path = vim.fn.glob(vim.fn.stdpath("data") .. "/mason/packages/")
local jdtls_path = mason_path .. "jdtls/"
local java_debug_path = vim.fn.glob(mason_path .. "java-debug-adapter/extension/server/com.microsoft.java.debug.plugin-0.53.2.jar")

-- 2. 确保 java-debug bundle 存在
if java_debug_path == "" then
vim.notify("java-debug-adapter 未安装,请执行 :MasonInstall java-debug-adapter", vim.log.levels.ERROR)
return
end

-- 3. jdtls 启动命令(适配 macOS + mason 路径)
local cmd = {
jdtls_path .. "bin/jdtls", -- jdtls 可执行文件路径
"--jvm-arg=-javaagent:" .. jdtls_path .. "lombok.jar", -- 可选:支持 Lombok
}

-- 4. 找到 Java 项目根目录(必须有 mvnw/.gradlew/.git 之一)
local root_dir = vim.fs.dirname(vim.fs.find({ ".gradlew", ".git", "mvnw" }, { upward = true })[1] or vim.fn.getcwd())

-- 5. jdtls 核心配置(加载调试 bundle)
local config = {
cmd = cmd,
root_dir = root_dir,
init_options = {
bundles = { java_debug_path }, -- 加载 java-debug 扩展(关键)
},
settings = {
java = {
configuration = {
runtimes = { -- 可选:指定 Java 运行时(若系统有多个版本)
{
name = "JavaSE-17",
path = "/Library/Java/JavaVirtualMachines/temurin-17.jdk/Contents/Home",
},
},
},
},
},
}

-- 6. 启动 jdtls + 关联 nvim-dap 调试
require("jdtls").start_or_attach(config)
require("jdtls").setup_dap({ hotcodereplace = "auto" }) -- 让 jdtls 给 nvim-dap 提供适配器

-- ========== 新增:快捷键触发生成 Java 调试配置 ==========
-- 定义生成调试配置的核心函数
local generate_java_dap_config = function()
-- 检查 jdtls 是否已正常启动(避免无意义执行)
local jdtls_clients = vim.lsp.get_active_clients({ name = "jdtls" })
if #jdtls_clients == 0 then
vim.notify("JDTLS 未启动,请确保打开的是 Java 项目内的 .java 文件!", vim.log.levels.WARN)
return
end
-- 生成调试配置
require("jdtls.dap").setup_dap_main_class_configs()
vim.notify("Java 调试配置已生成 ✔️", vim.log.levels.INFO)
end

-- 绑定快捷键(<leader>dg 触发,仅在 Java 文件中生效)
-- leader 键默认是空格,即 空格 + d + g 执行生成操作
vim.keymap.set(
"n",
"<leader>dg",
generate_java_dap_config,
{
noremap = true,
silent = true,
buffer = 0, -- 仅在当前 Java 文件缓冲区生效(避免全局冲突)
desc = "生成 Java 调试配置" -- 快捷键描述(兼容 which-key 菜单)
}
)
end,
dependencies = {
"mfussenegger/nvim-dap",
"rcarriga/nvim-dap-ui",
},
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
-- ~/.config/nvim/lua/plugins/dap.lua
return {
"mfussenegger/nvim-dap",
config = function()
local dap = require('dap')
local dapui = require('dapui') -- 导入 dap-ui 模块(依赖已声明,可直接用)

-- 仅保留调试配置项(适配器由 nvim-jdtls 自动提供)
dap.configurations.java = {
{
type = "java",
request = "launch",
name = "Launch Main Class",
mainClass = "${file}", -- 自动识别当前文件主类
projectName = "${workspaceFolderBasename}",
},
{
type = "java",
request = "attach",
name = "Debug (Attach) - Remote",
hostName = "127.0.0.1",
port = 5005,
},
}

-- ========== 新增:调试核心快捷键(全局生效,Java 调试通用) ==========
local map_opts = { noremap = true, silent = true } -- 快捷键基础配置
local keymap = vim.keymap.set

-- 1. 断点相关
keymap("n", "<leader>db", dap.toggle_breakpoint,
vim.tbl_extend("force", map_opts, { desc = "DAP: 切换断点" }))
keymap("n", "<leader>dB", function()
dap.set_breakpoint(vim.fn.input("断点条件: ")) -- 条件断点
end, vim.tbl_extend("force", map_opts, { desc = "DAP: 设置条件断点" }))
keymap("n", "<leader>dr", dap.clear_breakpoints,
vim.tbl_extend("force", map_opts, { desc = "DAP: 清空所有断点" }))

-- 2. 调试流程控制
keymap("n", "<leader>dc", dap.continue,
vim.tbl_extend("force", map_opts, { desc = "DAP: 启动/继续调试" }))
keymap("n", "<leader>ds", dap.step_over,
vim.tbl_extend("force", map_opts, { desc = "DAP: 单步跳过(逐行执行)" }))
keymap("n", "<leader>di", dap.step_into,
vim.tbl_extend("force", map_opts, { desc = "DAP: 单步进入(进入函数)" }))
keymap("n", "<leader>do", dap.step_out,
vim.tbl_extend("force", map_opts, { desc = "DAP: 单步退出(退出函数)" }))
keymap("n", "<leader>dq", dap.terminate,
vim.tbl_extend("force", map_opts, { desc = "DAP: 终止调试会话" }))

-- 3. DAP UI 控制(配合 rcarriga/nvim-dap-ui)
keymap("n", "<leader>du", dapui.toggle,
vim.tbl_extend("force", map_opts, { desc = "DAP: 切换调试UI" }))

-- ========== 可选:自动联动 DAP UI(启动调试时打开,终止时关闭) ==========
dapui.setup() -- 初始化 dap-ui(默认布局,无需额外配置)
dap.listeners.after.event_initialized["dapui_config"] = function()
dapui.open() -- 调试启动 → 自动打开 UI
end
dap.listeners.before.event_terminated["dapui_config"] = function()
dapui.close() -- 调试终止 → 自动关闭 UI
end
dap.listeners.before.event_exited["dapui_config"] = function()
dapui.close() -- 调试退出 → 自动关闭 UI
end
end,
dependencies = { "rcarriga/nvim-dap-ui" },
}

lazyExtras:

mason:

2.png

nvim目录树

3.png

线段树

线段树

leetcode727

最小窗口子序列

题目描述

给定字符串 s1s2,找出 s1 中最短的连续 子串,使得 s2 是该子串的 子序列

如果 s1 中没有窗口可以包含 s2 中的所有字符,返回空字符串 ""。如果有不止一个最短长度的窗口,返回 开始位置最靠左 的那个。

示例 1

1
2
3
4
5
6
输入:
s1 = "abcdebdde", s2 = "bde"
输出:"bcde"
解释:
"bcde" 是答案,因为它在相同长度的字符串 "bdde" 出现之前。
"deb" 不是一个更短的答案,因为在窗口中必须按顺序出现 T 中的元素。

示例 2

1
2
输入:s1 = "jmeqksfrsdcmsiwvaovztaqenprpvnbstl", s2 = "u"
输出:""

解题思路

用滑动窗口去做,遍历 s1 串,如果 s2 到了末尾(p2 == l2),进行回溯寻找起点。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Solution {
public String minWindow(String s1, String s2) {
int l1 = s1.length(), l2 = s2.length();
int p1 = 0, p2 = 0;
int min = l1 + 1;
String res = "";
while (p1 < l1) {
if (s1.charAt(p1) == s2.charAt(p2)) {
p2++;
}
if (p2 == l2) {
int r = p1;
while (p2 > 0) {
if (s1.charAt(p1) == s2.charAt(p2 - 1)) {
p2--;
}
p1--;
}
p1++;
if (r - p1 + 1 < min) {
min = r - p1 + 1;
res = s1.substring(p1, r + 1);
}
}
p1++;
}
return res;
}
}

注意点

  1. 因为先做 p2++,所以末尾的判断条件是 p2 == l2,而不是 p2 == l2 - 1,此时 p1 还没加 1,还在最后一个字符位置。
  2. 进行回溯后,p1 指向的位置是第一个字符的前一个位置,所以要加 1。
  3. 要维护一个 min 变量,判断这个长度是不是最小的,如果是,就动态更新 res 的值。
  4. 因为 p1 的坐标进行了回溯,最后又加 1 了,所以下一次遍历是从 s1 的下一个字符开始的。s1 确实需要进行遍历,因为要找到最小的子串。

leetcode134

加油站

在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。

你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i+1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。

给定两个整数数组 gascost ,如果你可以按顺序绕环路行驶一周,则返回出发时加油站的编号,否则返回 -1 。如果存在解,则 保证 它是 唯一 的。

示例 1

1
2
3
4
5
6
7
8
9
10
输入: gas = [1,2,3,4,5], cost = [3,4,5,1,2]
输出: 3
解释:
从 3 号加油站(索引为 3 处)出发,可获得 4 升汽油。此时油箱有 = 0 + 4 = 4 升汽油
开往 4 号加油站,此时油箱有 4 - 1 + 5 = 8 升汽油
开往 0 号加油站,此时油箱有 8 - 2 + 1 = 7 升汽油
开往 1 号加油站,此时油箱有 7 - 3 + 2 = 6 升汽油
开往 2 号加油站,此时油箱有 6 - 4 + 3 = 5 升汽油
开往 3 号加油站,你需要消耗 5 升汽油,正好足够你返回到 3 号加油站。
因此,3 可为起始索引。

示例 2

1
2
3
4
5
6
7
8
9
输入: gas = [2,3,4], cost = [3,4,3]
输出: -1
解释:
你不能从 0 号或 1 号加油站出发,因为没有足够的汽油可以让你行驶到下一个加油站。
我们从 2 号加油站出发,可以获得 4 升汽油。 此时油箱有 = 0 + 4 = 4 升汽油
开往 0 号加油站,此时油箱有 4 - 3 + 2 = 3 升汽油
开往 1 号加油站,此时油箱有 3 - 3 + 3 = 3 升汽油
你无法返回 2 号加油站,因为返程需要消耗 4 升汽油,但是你的油箱只有 3 升汽油。
因此,无论怎样,你都不可能绕环路行驶一周。

解题思路

这是一道贪心问题,用图的思路去解决,重点是利用数组前缀和,让亏损最严重的点最后走。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Solution {
public int canCompleteCircuit(int[] gas, int[] cost) {
int len = gas.length;
int minIndex = 0;
int minValue = Integer.MAX_VALUE;
int spare = 0;
for (int i = 0; i < len; i++) {
spare += gas[i] - cost[i];
if (spare < minValue) {
minValue = spare;
minIndex = i;
}
}
return spare < 0 ? -1 : (minIndex + 1) % len;
}
}

leetcode673

最长递增子序列问题

题目描述

给定一个未排序的整数数组 nums返回最长递增子序列的个数

注意 这个数列必须是 严格 递增的。

示例 1

1
2
3
输入: [1,3,5,4,7]
输出: 2
解释: 有两个最长递增子序列,分别是 [1, 3, 4, 7] 和[1, 3, 5, 7]。

示例 2

1
2
3
输入: [2,2,2,2,2]
输出: 5
解释: 最长递增子序列的长度是1,并且存在5个子序列的长度为1,因此输出5。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Solution {
public int findNumberOfLIS(int[] nums) {
int len = nums.length;
int[] dp = new int[len];
int[] gp = new int[len];
Arrays.fill(dp, 1);
Arrays.fill(gp, 1);
int max = 1;
for (int i = 1; i < len; i++) {
for (int j = 0; j < i; j++) {
if (nums[j] < nums[i]) {
if (dp[j] + 1 > dp[i]) {
dp[i] = dp[j] + 1;
gp[i] = gp[j];
} else if (dp[j] + 1 == dp[i]) {
gp[i] += gp[j];
}
}
}
max = Math.max(max, dp[i]);
}
int ans = 0;
for (int i = 0; i < len; i++) {
if (dp[i] == max) {
ans += gp[i];
}
}
return ans;
}
}

总结

正常的求最长递增子序列是两层 for 循环、一个 dp 数组,求个数需要一个额外的 gp 数组,记录当下以 i 为结尾的最长子序列的个数,同时还要维护递增子序列最大值,最后遍历 gp 数组求和。