用户点下停止按钮后,如何让 AI 闭嘴?不增加模型费用,不破坏响应式编程体验。
在 AI 应用开发中,流式(Streaming)输出几乎是标配——它能显著降低首字延迟,让用户像看真人打字一样获得反馈。但随之而来的问题是:如果用户觉得 AI 跑题了、答案太长了,或者纯粹就是不想等了,该如何优雅地中断一个正在进行的流式响应?
本文将结合Spring AI和Project Reactor,给出一个轻量级、纯后端可控的“停止生成”实现方案,并解释其背后的原理与局限。
一、功能与原理
1.1 功能表现
在 AI 生成内容的过程中,用户可以主动打断输出,界面不再显示后续内容,就像这样:
(前端 UI 上出现一个“停止”按钮,点击后流式输出立刻中止)
1.2 核心原理
重要前提:AI 大模型一旦开始生成,服务端无法真正中断模型的计算,调用仍然会产生费用。我们能做的,是在 Flux 数据流层面中断内容的输出——即后端虽然还会收到模型返回的完整内容,但我们可以选择不再将后续数据推送给客户端。
实现思路:维护每个会话(sessionId)的生成状态标志,在 Flux 流处理中通过takeWhile动态判断是否继续往下发送数据。当用户调用停止接口时,我们清除该标志,Flux 流便会自然终止。
基于这个前提,实现思路如下:
为每个会话(
sessionId)维护一个“是否允许继续输出”的标志(存在即允许,删除即禁止)。在 Flux 响应式流中,使用
takeWhile操作符:每次要发送数据前都会检查该标志,一旦标志消失,takeWhile返回false,流立即终止。当用户点击“停止”时,后端调用
/stop接口,删除对应会话的标志。下一次流内检查时便会自动结束,且前端会收到一个自定义的STOP事件。
这样既实现了即时停止的交互,又避免了对原有流式逻辑的大规模侵入。
二、接口设计
需要一个参数sessionId,用于标识要停止的对话会话。
POST /stop?sessionId=xxx三、代码实现
3.1 Controller 层
@PostMapping("/stop") public void stop(@RequestParam("sessionId") String sessionId) { this.chatService.stop(sessionId); }3.2 Service 接口
/** * 停止生成 * @param sessionId 会话id */ void stop(String sessionId);3.3 Service 实现(核心)
使用ConcurrentHashMap暂存每个会话的生成状态(生产环境建议改用 Redis 以支持分布式)
@Slf4j @Service @RequiredArgsConstructor public class ChatServiceImpl implements ChatService { private final ChatClient chatClient; private final SystemPromptConfig systemPromptConfig; // 存储每个会话的生成状态,true 表示允许继续输出 private static final Map<String, Boolean> GENERATE_STATUS = new ConcurrentHashMap<>(); @Override public Flux<ChatEventVO> chat(String question, String sessionId) { return this.chatClient.prompt() .system(promptSystem -> promptSystem .text(this.systemPromptConfig.getChatSystemMessage().get()) .param("now", DateUtil.now()) ) .user(question) .stream() .chatResponse() .doFirst(() -> GENERATE_STATUS.put(sessionId, true)) // 开始生成时置为 true .doOnError(throwable -> GENERATE_STATUS.remove(sessionId)) .doOnComplete(() -> GENERATE_STATUS.remove(sessionId)) .takeWhile(response -> GENERATE_STATUS.getOrDefault(sessionId, false)) // 关键:控制流是否继续 .map(chatResponse -> { String text = chatResponse.getResult().getOutput().getText(); return ChatEventVO.builder() .eventData(text) .eventType(ChatEventTypeEnum.DATA.getValue()) .build(); }) .concatWith(Flux.just(ChatEventVO.builder() .eventType(ChatEventTypeEnum.STOP.getValue()) .build())); } @Override public void stop(String sessionId) { GENERATE_STATUS.remove(sessionId); // 移除标志,takeWhile 将返回 false } }3.4 关键点说明
doFirst:在 Flux 开始订阅时设置状态为true。takeWhile:每次发送数据前检查状态,若为false则立即终止流。stop方法:只需从 Map 中删除对应的 key,下一次takeWhile判断时便会中断。异常与完成回调:无论正常结束或异常,都要清理状态,避免内存泄漏。
| 关键点 | 作用与注意事项 |
|---|---|
ConcurrentHashMap | 线程安全,支持多会话并发。但不适用于分布式多实例部署,此时应换用 Redis。 |
doFirst | 在订阅时(Flux 实际执行前)设置标志,确保takeWhile首次检查时值为true。 |
takeWhile | 重中之重。每次下游请求一个数据时,都会执行其谓词函数。当谓词返回false时,流立即完成(发送onComplete信号),不再拉取上游数据。 |
| 异常/完成清理 | 无论正常结束还是异常,都必须移除标志,避免内存泄漏。 |
concatWith追加 STOP 事件 | 让前端能明确知道“流是因为停止而结束的”,而不是网络中断。前端可以据此隐藏停止按钮、提示“已停止”。 |
四、测试验证
4.1 测试准备
启动 Spring AI 应用,确保已接入一个真实的大模型(如 OpenAI、Ollama 等)。
前端使用 EventSource 或 fetch + ReadableStream 消费
/chat/stream接口。
4.2 测试步骤
正常对话:发送一个需要较长时间生成的问题,例如“请详细介绍 Reactor 编程模型”。观察前端持续打印 Token。
中途停止:在生成还未结束时,调用
POST /stop?sessionId=xxx(sessionId与对话请求相同)。观察现象:
前端:立即停止收到新的
DATA事件,并收到一个STOP事件。界面上的“停止”按钮消失或变为可用状态。后端日志:
用户主动停止生成,sessionId: xxx被打印,同时该会话的生成标志被删除。模型侧:大模型依然会继续生成剩余内容(可在模型提供商的控制台看到调用时长和 Token 消耗),但后端不会再推送给客户端。
再次对话:使用同一个
sessionId发起新对话,应该能正常流式输出不受影响。
4.3 预期结果截图(可自行补充)
(建议贴一张前端停止前后的对比图,或后端日志截图)
✅ 验证通过:用户点击停止后,前端立即中止接收,后端不再发送数据,且不影响其他会话。
五、总结与扩展
| 优点 | 注意事项 |
|---|---|
| 实现简单,基于 Reactor 原生操作符 | 不能节省 AI 调用费用,模型仍会生成完 |
| 对原有流式逻辑侵入小 | 分布式环境需替换 Map 为 Redis |
| 支持多会话并发 | 前端需配合处理STOP事件类型 |
扩展建议:
支持“停止后续重放”
用户停止后,如果想从断点继续看?可以额外保存已生成内容的片段 ID,提供“继续生成”接口,但实现会复杂很多(需模型支持分段续写)。结合前端心跳
如果用户的网络断开,后端的GENERATE_STATUS会一直残留。可以结合 WebSocket 的心跳或前端 unload 事件,在会话结束时调用/stop或设置过期时间。使用 Redis 实现分布式控制
将GENERATE_STATUS改为RedisTemplate<String, Boolean>,在stop()中执行redisTemplate.delete(sessionId),在takeWhile中通过redisTemplate.hasKey(sessionId)判断。注意每次判断都会产生网络开销,可考虑本地缓存 + 发布订阅优化。增加超时自动停止
某些模型生成时间过长(如超过 60 秒),可以在doFirst时注册一个延迟任务,超时后自动执行stop(sessionId),避免无限等待。日志与监控
建议将stop日志单独统计,监控“用户主动停止率”。如果某个问题经常被停止,说明模型回答质量或长度可能有问题,需要优化 Prompt 或调整maxTokens参数。
通过这种“流级中断”的方式,我们以极低的代价提升了 AI 应用的交互体验,用户不再被冗长的生成过程困扰。