news 2026/5/3 8:12:33

Microsoft Agent Framework - Workflow 示例 — Checkpoint 与状态恢复

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Microsoft Agent Framework - Workflow 示例 — Checkpoint 与状态恢复

本篇包含三个渐进式示例,围绕同一个猜数字游戏展示 Workflow 的检查点(Checkpoint)机制:

示例

定位

核心点

CheckpointAndResume

同一 Run 实例上恢复

RestoreCheckpointAsync
CheckpointAndRehydrate

新实例"注水"恢复

ResumeStreamingAsync
CheckpointWithHumanInTheLoop

Checkpoint + RequestPort

交互节点上的检查点


之前写了一篇文章,分享了我基于 Microsoft Agent Framework 打造的开源项目 Inkwell 的实战经验:

用 Microsoft Agent Framework 造一个 AI 内容工厂:一次 Harness Engineering 实战

开源项目:https://github.com/shuaihuadu/inkwell

Microsoft Agent Framework 官方项目地址:https://github.com/microsoft/agent-framework

Microsoft Agent Framework 正在定义下一代 AI 应用的标准——你的技术栈该刷新了!我建了个交流群,欢迎加入,一起学习,不掉队。


猜数字游戏逻辑

三个示例共享同一套猜数字游戏逻辑,理解它是后续分析检查点机制的前提。

游戏规则

Workflow 由两个 Executor 组成一个反馈循环:

  • GuessNumberExecutor(猜测者):维护一个搜索范围[LowerBound, UpperBound],初始为[1, 100],每次取中间值(L+U)/2作为猜测

  • JudgeExecutor(裁判):持有目标数字42,判断猜测值并回复Above(猜大了)、Below(猜小了)或直接输出结果(猜对了)

两者通过NumberSignal枚举通信:Init(开始猜)、AboveBelow

二分查找执行过程

GuessNumberExecutor的核心逻辑:收到反馈后先调整边界,再用新边界计算下一次猜测

case NumberSignal.Above: this.UpperBound = this.NextGuess - 1; // 用旧边界的 NextGuess 调整上界 await context.SendMessageAsync(this.NextGuess, ...); // 用新边界算猜测

完整的 7 轮执行追踪:

轮次

信号

执行前 L/U

边界调整

调整后 L/U

猜测

1

Init

1/100

1/100

50

2

Above

1/100

U=50-1=49

1/49

25

3

Below

1/49

L=25+1=26

26/49

37

4

Below

26/49

L=37+1=38

38/49

43

5

Above

38/49

U=43-1=42

38/42

40

6

Below

38/42

L=40+1=41

41/42

41

7

Below

41/42

L=41+1=42

42/42

42 ✅

范围[1, 100]二分查找最多 ⌈log₂100⌉ = 7 次,目标值 42 恰好走满 7 步。两个 Executor 交替执行,每次执行各占一个 Super Step,因此整个游戏会产生14 个 Super Step(GuessNumberExecutor 执行 7 次 + JudgeExecutor 执行 7 次,其中 6 次反馈 + 1 次输出结果)。

示例 3 的变体

第三个示例(CheckpointWithHumanInTheLoop)中,GuessNumberExecutor被替换为RequestPort,由用户手动输入猜测值而非自动二分。信号类型也从NumberSignal枚举升级为SignalWithNumber类,携带上次猜测值以提示用户。


核心概念

在分析具体示例之前,需要理解三个关键概念:

  • Super Step:Workflow 的执行划分为多个阶段,每个阶段运行一批 Executor 并等待它们完成。一个 Super Step 结束时就是一个天然的检查点时机

  • Checkpoint:在 Super Step 结束时自动保存 Workflow 的完整状态快照,包括所有 Executor 的内部状态和共享状态

  • CheckpointManager:管理检查点的序列化/反序列化。CheckpointManager.Default提供内存中的默认实现


示例 1:CheckpointAndResume——同实例恢复

示例实现的功能

在猜数字 Workflow 运行过程中自动创建检查点,运行完成后从之前的某个检查点在同一个 Run 实例上恢复状态并重新执行。

代码完整解析

启用检查点
var checkpointManager = CheckpointManager.Default; await using StreamingRun checkpointedRun = await InProcessExecution .RunStreamingAsync(workflow, NumberSignal.Init, checkpointManager);

第三个参数checkpointManager让框架在每个 Super Step 结束时自动生成CheckpointInfo

收集检查点
case SuperStepCompletedEvent superStepCompletedEvt: CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint; if (checkpoint is not null) { checkpoints.Add(checkpoint); }

通过SuperStepCompletedEvent事件获取每个 Super Step 的检查点信息。

恢复执行
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { ... }

RestoreCheckpointAsync同一个 Run 实例上恢复状态。恢复后再次调用WatchStreamAsync从恢复点继续执行。

Executor 的状态保存/恢复
// 保存状态 protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, ...) => context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), ...); // 恢复状态 protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, ...) => (LowerBound, UpperBound) = await context.ReadStateAsync<(int, int)>(StateKey, ...);

每个有状态的 Executor 需要覆写两个方法:

  • OnCheckpointingAsync:在 Super Step 结束时被框架调用,通过QueueStateUpdateAsync将状态排队写入检查点

  • OnCheckpointRestoredAsync:恢复检查点时被框架调用,通过ReadStateAsync从检查点中读回状态


示例 2:CheckpointAndRehydrate——新实例重建

示例实现的功能

与示例 1 的区别在于,不在同一个 Run 上恢复,而是创建一个全新的 Workflow 实例并从检查点"注水"(Rehydrate),模拟进程重启后的恢复场景。

代码完整解析

恢复到新实例
var newWorkflow = WorkflowFactory.BuildWorkflow(); // 全新 Workflow 实例 await using StreamingRun newCheckpointedRun = await InProcessExecution.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager);

关键 API 是InProcessExecution.ResumeStreamingAsync(而非RestoreCheckpointAsync):

  • 接受一个全新的Workflow实例

  • savedCheckpoint恢复状态

  • 返回新的StreamingRun实例

这模拟了真实的持久化恢复场景——应用崩溃后重建 Workflow 并从保存的检查点继续。

与示例 1 的核心区别

特性

CheckpointAndResume

CheckpointAndRehydrate

恢复目标

同一个 Run 实例

全新 Workflow + 新 Run

API

run.RestoreCheckpointAsyncInProcessExecution.ResumeStreamingAsync

应用场景

运行中的回退/重试

进程重启后的恢复

Executor 实例

复用原有实例

全新实例


示例 3:CheckpointWithHumanInTheLoop——检查点 + 人机交互

示例实现的功能

将 Checkpoint 与 RequestPort(HumanInTheLoop)结合,展示更复杂的场景:每次等待用户输入时都会产生检查点,恢复后可以从任意交互点重新开始。

代码完整解析

信号类型升级
internal sealed class SignalWithNumber { public NumberSignal Signal { get; } public int? Number { get; } }

从简单的NumberSignal枚举升级为包含当前猜测值的复合类型,让用户知道上次猜了什么数字。

每次交互产生两个检查点

每个 RequestPort 交互循环经历两个 Super Step:

  1. RequestPort 发出RequestInfoEvent,Super Step 结束 →检查点 1(等待输入的状态)

  2. 收到响应后 JudgeExecutor 处理完成,Super Step 结束 →检查点 2(处理完成的状态)

因此如果猜了 3 次,会产生约 6 个检查点。

恢复交互
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { switch (evt) { case RequestInfoEvent requestInputEvt: ExternalResponse response = HandleExternalRequest(requestInputEvt.Request); await checkpointedRun.SendResponseAsync(response); break; // 其他事件处理同首次运行... } }

恢复后的处理逻辑与首次运行完全一致,体现了 Checkpoint + HumanInTheLoop 的无缝结合。


深入剖析

检查点的存储粒度

检查点不是保存整个 Workflow 对象的序列化,而是保存:

  1. 每个 Executor 通过OnCheckpointingAsync主动写入的状态

  2. 边的状态数据(EdgeStateData),记录各条边上的消息传递状态

  3. 共享状态(Shared State)中的全部数据

  4. Workflow 引擎的执行位置信息(当前 Super Step 编号、Runner 状态等)

这个设计要求开发者显式声明需要持久化的状态。没有覆写OnCheckpointingAsync的 Executor,恢复后将以初始状态开始。

OnCheckpointingAsync 的设计哲学

框架不自动序列化 Executor 的全部字段,而是让开发者手动选择要持久化的状态。这看似麻烦,但有明确的好处:

  1. 避免序列化问题:Executor 可能持有不可序列化的资源(如 IChatClient、文件句柄)

  2. 状态精简:只持久化恢复所必需的最小状态,减少检查点大小

  3. 版本兼容:Executor 代码升级后只要 state key 和数据格式一致,旧检查点仍可恢复

Super Step 与检查点的关系

Input → [Super Step 1: Executor A 执行] → Checkpoint 1 → [Super Step 2: Executor B 执行] → Checkpoint 2 → [Super Step 3: Executor A 再次执行] → Checkpoint 3 → ...

每个 Super Step 的边界就是一个一致性快照点。在循环 Workflow 中,每次循环迭代(A → B → A)可能跨越多个 Super Step,每个 Step 结束都生成检查点。

CheckpointManager 的扩展性

CheckpointManager.Default是内存实现,等价于CheckpointManager.CreateInMemory()。要接入自定义存储后端,使用:

CheckpointManager.CreateJson(ICheckpointStore<JsonElement> store)

开发者只需实现ICheckpointStore<JsonElement>接口,即可将检查点持久化到任意后端(Redis、Azure Blob Storage、数据库等)。MAF 内置了FileSystemJsonCheckpointStoreJsonCheckpointStore作为参考实现。

这与 MAF 的 Durable Agent 架构互补——Durable Agent 通过 DurableTask 框架实现更完整的持久化,Checkpoint 则提供了轻量级的本地快照能力。

应用场景

  • 长流程恢复:运行时长的 Workflow 崩溃后从最后检查点恢复,避免从头运行

  • 状态机分支探查:一个检查点多次恢复,比较不同后续分支的表现

  • HITL 中断保抜:用户长时间未响应时保存现场,下次恢复就像从未中断

  • 发布协调:代码升级后用旬heckpoint测试兼容性

  • 调试与排错:定位问题 Executor 后从前一个检查点重跑,缩小复现范围

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/3 8:04:28

深度学习KV缓存优化:OxyGen架构设计与性能提升

1. 项目背景与核心价值在深度学习推理场景中&#xff0c;KV缓存&#xff08;Key-Value Cache&#xff09;管理已成为影响系统性能的关键瓶颈。当模型需要处理多任务并行请求时&#xff0c;传统的静态内存分配方式会导致两大典型问题&#xff1a;一方面&#xff0c;预分配固定大…

作者头像 李华
网站建设 2026/5/3 7:57:43

【仅限内部技术团队流通】C语言Modbus扩展安全加固手册:防重放攻击、非法寄存器写入、缓冲区溢出三重防御(含静态分析报告)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Modbus协议安全威胁全景图与加固必要性分析 Modbus 作为工业控制领域最广泛部署的通信协议之一&#xff0c;其设计初衷聚焦于简单性与实时性&#xff0c;而非安全性。原始 Modbus/TCP 协议缺乏身份认证…

作者头像 李华
网站建设 2026/5/3 7:53:33

AI智能体如何赋能星际探索:从RAG到工具调用的技术架构解析

1. 项目概述&#xff1a;当星际探索遇上AI代理最近在GitHub上看到一个挺有意思的项目&#xff0c;叫“GPTARS_Interstellar”。光看名字&#xff0c;就透着一股科幻和硬核技术混合的味道。GPTARS&#xff0c;这名字拆开看&#xff0c;GPT大家都很熟了&#xff0c;是那个强大的语…

作者头像 李华
网站建设 2026/5/3 7:52:00

Hyprland窗口摇晃截图插件:手势交互提升Linux桌面效率

1. 项目概述与核心价值最近在折腾 Hyprland 窗口管理器&#xff0c;发现一个痛点&#xff1a;当我想快速截取某个窗口或者某个区域的屏幕内容时&#xff0c;总是需要先呼出截图工具&#xff0c;再手动选择窗口或区域&#xff0c;步骤略显繁琐。直到我发现了ddVital/hyprshake这…

作者头像 李华