news 2026/6/15 16:54:51

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

一、为什么简单的链式调用不够用

在真实业务里,单靠一个 Prompt 很难处理复杂的流程。开发者通常会把多个 LLM 调用、API 请求和数据清洗步骤串在一起。但一旦逻辑变复杂,这种线性调用就会出问题。

代码里开始出现大量的if-else嵌套,处理异步等待和节点依赖变得非常麻烦。这时候,有向无环图(DAG)是一个更稳妥的选择。它不仅能理清任务顺序,还能让没有依赖关系的节点并行执行,减少等待时间。


二、DAG 调度逻辑与拓扑排序

工作流里的每个步骤就是一个“节点”,节点间的依赖关系就是“边”。要执行这个图,核心是先做环路检测,再通过拓扑排序确定执行顺序。

下面的图展示了一个简单的流程:先清洗输入,然后并行做情感分类和关键词提取,最后汇总生成报告。

graph LR Start([启动]) --> NodeA[清洗输入] NodeA --> NodeB[情感分类] NodeA --> NodeC[关键词提取] NodeB --> NodeD[生成报告] NodeC --> NodeD NodeD --> End([结束]) style NodeB fill:#bbf,stroke:#333,stroke-width:2px style NodeD fill:#bfb,stroke:#333,stroke-width:2px

节点 B 和 C 都依赖 A,但它们之间没关系,所以引擎会让它们同时跑。


三、Node.js 轻量级实现

这是一个基于 JavaScript 的简单 DAG 引擎。它实现了拓扑排序来检查依赖,并支持异步并发执行。

class WorkflowNode { constructor(id, taskFunction) { this.id = id; this.taskFunction = taskFunction; this.dependencies = []; this.status = 'PENDING'; this.result = null; } addDependency(nodeId) { this.dependencies.push(nodeId); } } class DagEngine { constructor() { this.nodes = new Map(); } registerNode(node) { this.nodes.set(node.id, node); } // 拓扑排序:检查环路并决定顺序 resolveExecutionOrder() { const inDegree = new Map(); const adjList = new Map(); const order = []; for (const [id, node] of this.nodes) { inDegree.set(id, 0); adjList.set(id, []); } for (const [id, node] of this.nodes) { for (const depId of node.dependencies) { if (!this.nodes.has(depId)) { throw new Error(`节点 ${id} 依赖的 ${depId} 未注册`); } adjList.get(depId).push(id); inDegree.set(id, inDegree.get(id) + 1); } } const queue = []; for (const [id, degree] of inDegree) { if (degree === 0) queue.push(id); } while (queue.length > 0) { const currId = queue.shift(); order.push(currId); for (const nextId of adjList.get(currId)) { inDegree.set(nextId, inDegree.get(nextId) - 1); if (inDegree.get(nextId) === 0) { queue.push(nextId); } } } if (order.length !== this.nodes.size) { throw new Error("检测到循环依赖,无法执行"); } return order; } // 并发执行 async executeWorkflow(inputContext) { const completedResults = { ...inputContext }; const runningPromises = new Map(); while (true) { let hasPending = false; let progressed = false; for (const [id, node] of this.nodes) { if (node.status === 'COMPLETED' || node.status === 'FAILED') continue; hasPending = true; if (node.status === 'RUNNING') continue; // 检查依赖是否都完成了 const allDepsMet = node.dependencies.every(depId => { const depNode = this.nodes.get(depId); return depNode && depNode.status === 'COMPLETED'; }); if (allDepsMet) { node.status = 'RUNNING'; progressed = true; const promise = (async () => { try { const depData = {}; node.dependencies.forEach(depId => { depData[depId] = this.nodes.get(depId).result; }); node.result = await node.taskFunction(completedResults, depData); node.status = 'COMPLETED'; } catch (error) { node.status = 'FAILED'; throw error; } })(); runningPromises.set(id, promise); } } if (!hasPending) break; if (!progressed && runningPromises.size === 0) { throw new Error("死锁:没有节点能继续执行"); } await Promise.race(runningPromises.values()); for (const [id, promise] of runningPromises) { const node = this.nodes.get(id); if (node.status === 'COMPLETED' || node.status === 'FAILED') { runningPromises.delete(id); } } } const finalOutput = {}; for (const [id, node] of this.nodes) { finalOutput[id] = node.result; } return finalOutput; } } // 测试运行 (async () => { const engine = new DagEngine(); const nodeA = new WorkflowNode('CleanInput', async (context) => { return context.rawText.trim().replace(/[<>]/g, ''); }); const nodeB = new WorkflowNode('LlmClassify', async (context, depData) => { const text = depData.CleanInput; await new Promise(resolve => setTimeout(resolve, 500)); // 模拟 API 延迟 return text.includes("好") ? "POSITIVE" : "NEGATIVE"; }); nodeB.addDependency('CleanInput'); const nodeC = new WorkflowNode('ExtractKeywords', async (context, depData) => { const text = depData.CleanInput; return text.split(' ').filter(word => word.length > 1); }); nodeC.addDependency('CleanInput'); const nodeD = new WorkflowNode('GenerateReport', async (context, depData) => { const sentiment = depData.LlmClassify; const keywords = depData.ExtractKeywords; return `情感: ${sentiment}, 关键词: [${keywords.join(', ')}]`; }); nodeD.addDependency('LlmClassify'); nodeD.addDependency('ExtractKeywords'); engine.registerNode(nodeA); engine.registerNode(nodeB); engine.registerNode(nodeC); engine.registerNode(nodeD); const order = engine.resolveExecutionOrder(); console.log("执行顺序:", order.join(' -> ')); const result = await engine.executeWorkflow({ rawText: " 这个产品设计得非常 好,解决了我的痛点。 " }); console.log("结果:", result); })();

四、生产环境需要考虑的几个问题

上面的代码适合本地或简单场景,如果要上生产环境,还得考虑下面几点:

1. 内存 vs 持久化
内存里的调度很快,但服务器一挂,中间结果就没了。如果工作流跑了几分钟才失败,重头再来很浪费。生产环境通常要用 Redis 或像 Temporal 这样的状态机来存状态,但这会增加网络延迟。

2. 重试策略与成本
大模型 API 经常超时或限流,加重试机制是必须的。但要注意,如果上游节点因为超时一直重试,可能会在短时间内消耗大量 Token。给每个节点设置重试上限和超时时间是必要的。

3. 静态图 vs 动态分支
DAG 在运行前就定好了结构,容易校验。但 LLM 的输出是动态的,有时候需要根据结果决定下一步走哪条路。如果要支持这种动态分支,图的拓扑结构得在运行时变,这会大大增加调试难度。


五、小结

做智能工作流,核心是把杂乱的调用拆成清晰的节点和依赖。用拓扑排序处理并发,不需要复杂的框架,也能让多个模型任务协同工作。对于小团队来说,这种轻量级的方案既能控制成本,也能保证流程跑得通。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/16 16:53:55

D2R Pixel Bot:暗黑破坏神2重制版终极自动化指南

D2R Pixel Bot&#xff1a;暗黑破坏神2重制版终极自动化指南 【免费下载链接】botty D2R Pixel Bot 项目地址: https://gitcode.com/gh_mirrors/bo/botty D2R Pixel Bot是一款专为《暗黑破坏神2重制版》设计的开源自动化工具&#xff0c;通过先进的图像识别和路径规划技…

作者头像 李华
网站建设 2026/6/15 16:53:50

华硕笔记本性能优化新选择:G-Helper轻量级控制工具深度解析

华硕笔记本性能优化新选择&#xff1a;G-Helper轻量级控制工具深度解析 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops with nearly the same functionality. Works with ROG Zephyrus, Flow, TUF, Strix, Scar, ProArt, Vivobook, Zenboo…

作者头像 李华
网站建设 2026/6/15 16:52:54

MPC8544E eTSEC控制器配置指南:从信号解析到寄存器实战

1. 项目概述与eTSEC核心价值 在嵌入式网络开发领域&#xff0c;尤其是基于PowerPC架构的高性能通信处理器平台&#xff0c;网络接口的底层配置与调试往往是项目成败的关键一环。今天&#xff0c;我们就来深入剖析飞思卡尔&#xff08;现恩智浦&#xff09;MPC8544E PowerQUICC …

作者头像 李华
网站建设 2026/6/15 16:52:01

5分钟快速掌握Unity游戏去马赛克:六大智能插件完整指南

5分钟快速掌握Unity游戏去马赛克&#xff1a;六大智能插件完整指南 【免费下载链接】UniversalUnityDemosaics A collection of universal demosaic BepInEx plugins for games made in Unity3D engine 项目地址: https://gitcode.com/gh_mirrors/un/UniversalUnityDemosaics…

作者头像 李华