news 2026/5/9 19:28:48

Agent Framework 定义流程节点以及节点的流式输出

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Agent Framework 定义流程节点以及节点的流式输出

今天我们开启Agent Framework Workflow系列,当我们在构建 AI Agent 或多步骤自动化系统时,工作通常不是一步完成的。而是一个任务可能会被拆分成多个独立步骤,每个步骤单独处理的过程。

在 Agent Framework 的 workflow 中,执行器(Executor)是工作流里的基本处理单元。它接收输入,执行一段具体逻辑,然后输出结果给下一个执行器。

比如,一个任务可以通过工作流编排进行串联,在执行过程中依次完成输入预处理、模型推理以及结果后处理等阶段。

如何创建执行器

我们定义两个 executor。

第一个是UppercaseExecutor,负责把输入字符串转换成大写:

internal sealed class UppercaseExecutor() : Executor<string, string>("UppercaseExecutor") { public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) => ValueTask.FromResult(message.ToUpperInvariant()); }

它继承自:

Executor

这表示该执行器接收一个string类型输入,并返回一个string类型结果。

构造函数中的"UppercaseExecutor"是执行器 ID,在后续流式事件输出中可以用来识别是哪一个执行器完成了处理。

第二个执行器是ReverseTextExecutor,负责反转字符串:

internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor") { public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) { return ValueTask.FromResult(string.Concat(message.Reverse())); } }

如果输入是:

HELLO, WORLD!

那么它会返回:

!DLROW ,OLLEH

如何构建工作流

当我们有了执行器之后,就可以使用WorkflowBuilder将它们连接起来。

示例中的核心代码如下:

UppercaseExecutor uppercase = new(); ReverseTextExecutor reverse = new(); WorkflowBuilder builder = new(uppercase); builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); var workflow = builder.Build();

这里主要做了几件事:

完成了工作流的构建:首先创建 UppercaseExecutor 和 ReverseTextExecutor 两个执行器实例,分别保存为 uppercase 和 reverse;然后以 uppercase 作为工作流起点,在 uppercase 和 reverse 之间建立执行顺序,使 UppercaseExecutor 的输出作为 ReverseTextExecutor 的输入;最后指定 reverse 的输出作为整个 workflow 的最终结果,并构建出 workflow 实例。

其中:

builder.AddEdge(uppercase, reverse)

表示上游执行器uppercase的输出会传递给下游执行器reverse

而:

.WithOutputFrom(reverse)

表示reverse执行器的结果会作为工作流的输出。

工作流的流式输出

在Agent Framework Workflow中,工作流不是等待全部执行完之后才返回,而是使用的是流式输出:

我们接下来看一下流式工作流的原理:

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: "Hello, World!"); awaitforeach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"{executorCompleted.ExecutorId}: {executorCompleted.Data}"); } elseif (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred."); Console.ResetColor(); } elseif (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}."); Console.ResetColor(); } }

这里的使用了RunStreamingAsync。它不会只返回最终结果,而是返回一个StreamingRun对象。通过这个对象,可以监听工作流执行过程中的事件流。

接下来使用:

await foreach (WorkflowEvent evt in run.WatchStreamAsync())

逐个读取事件。这里是最最核心的地方。

监听执行器完成事件

当某个执行器完成处理后,会产生ExecutorCompletedEvent

示例代码中这样处理:

if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"{executorCompleted.ExecutorId}: {executorCompleted.Data}"); }

这意味着每当一个 executor 完成时,程序都会打印:

  • 执行器 ID

  • 当前执行器输出的数据

运行后,可以看到如下输出:

我们不需要等到整个工作流全部结束,才能知道中间发生了什么。每一步完成后,都可以立即拿到对应事件。

处理工作流错误

除了正常完成事件,示例中还处理了两类错误事件。

第一类是WorkflowErrorEvent

else if (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine( workflowError.Exception?.ToString() ?? "Unknown workflow error occurred."); Console.ResetColor(); }

这类事件表示整个工作流层面发生了错误。

第二类是ExecutorFailedEvent

else if (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine( $"Executor '{executorFailed.ExecutorId}' failed with " + $"{(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}."); Console.ResetColor(); }

这类事件表示某个具体 executor 执行失败。

相比WorkflowErrorEventExecutorFailedEvent更具体,因为它包含了失败的 executor ID。对于复杂工作流来说,这对于排查问题非常有帮助。

小结

这一节我们介绍了Agent Framework Workflow的一些基础感念:

  • 如何定义执行器(Executor)

  • 如何使用 WorkflowBuilder 构建工作流

  • 如何使用 RunStreamingAsync 启动流式工作流

  • 如何使用 WatchStreamAsync 监听工作流事件

  • 如何区分处理执行器完成事件、执行器失败事件和工作流错误事件

源代码地址

https://github.com/bingbing-gui/dotnet-agent-playbook/tree/master/src/ai-agent/Agent-Framework/31-WorkFlow-Executor

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/9 19:25:59

构建AI绘画聊天机器人:本地部署ComfyUI与聊天平台的无缝集成方案

1. 项目概述&#xff1a;在聊天中调用你的私人AI画室如果你和我一样&#xff0c;既享受在Discord、Telegram或者iMessage里和朋友闲聊的轻松&#xff0c;又痴迷于在本地部署的ComfyUI里折腾各种AI生图、换脸、风格迁移&#xff0c;那你肯定也烦透了在两个世界间反复横跳的割裂感…

作者头像 李华
网站建设 2026/5/9 19:23:50

CANN稀疏张量合并算子

aclnnCoalesceSparse 【免费下载链接】ops-math 本项目是CANN提供的数学类基础计算算子库&#xff0c;实现网络在NPU上加速计算。 项目地址: https://gitcode.com/cann/ops-math &#x1f4c4; 查看源码 产品支持情况 产品是否支持Ascend 950PR/Ascend 950DTAtlas A3 …

作者头像 李华
网站建设 2026/5/9 19:21:33

告别懵圈!一文搞懂TJA1043/TJA1145的休眠唤醒实战(附流程图与代码思路)

嵌入式CAN收发器休眠唤醒机制深度解析与实战指南 在汽车电子和工业控制领域&#xff0c;CAN总线作为可靠的通信标准&#xff0c;其低功耗设计一直是工程师关注的焦点。TJA1043和TJA1145作为NXP旗下的主流CAN收发器&#xff0c;凭借出色的休眠唤醒功能被广泛应用于各类嵌入式系统…

作者头像 李华
网站建设 2026/5/9 19:16:47

大模型参数规模与性能的非线性关系:从Scaling Law到效率优化

1. 项目概述&#xff1a;从“大力出奇迹”到“精打细算”的范式转变几年前&#xff0c;当GPT-3以1750亿参数的庞大体量横空出世&#xff0c;展现出前所未有的通用对话能力时&#xff0c;整个行业仿佛找到了一个“万能公式”&#xff1a;堆参数&#xff0c;就能解锁智能。一时间…

作者头像 李华
网站建设 2026/5/9 19:16:41

科学AI安全实践:从风险识别到SciGuard框架构建

1. 科学AI的双刃剑&#xff1a;机遇背后的伦理与安全隐忧在化学实验室里&#xff0c;研究员小王正为一个新药分子的合成路线发愁。传统方法需要查阅大量文献、反复试错&#xff0c;耗时数月。他尝试使用一个基于AI的逆合成分析工具&#xff0c;输入目标分子的SMILES字符串&…

作者头像 李华