news 2026/6/9 20:15:54

RabbitMQ投递回调机制以及策略业务补偿

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RabbitMQ投递回调机制以及策略业务补偿

在 RabbitMQ 中,生产者发送消息后,有可能遇到以下几种情况:

消息成功投递到交换机(Exchange)

消息未能成功投递到交换机(Exchange)

消息成功进入交换机但无法路由到队列(Queue)

如果生产者端没有回调确认机制,就可能出现严重的数据不一致:

举例: Redis 已经增加点赞数,但消息并未真正进入 MQ,数据库后续也无法更新,就出现了 “缓存超前、数据库缺失” 的问题。

为了解决这种问题,Spring AMQP 提供了:

RabbitTemplate.setConfirmCallback()

RabbitTemplate.setReturnsCallback()

来捕获和处理消息投递的成功与失败。

但是在复杂系统中,不同的业务消息(例如“下单”、“扣库存”、“发积分”)在投递失败时,需要采取不同的补偿逻辑。

弊端:如果你只写一份大而全的回调逻辑,代码就会充满大量的 if else 判断,非常难维护。

二、策略模式思想引入

策略模式的核心思想是:定义一系列算法(或行为),让它们可以相互替换,且算法的变化不会影响使用算法的客户。

“算法” ≈ “不同的消息回调处理逻辑”

“客户” ≈ “RabbitTemplate 的 ConfirmCallback 回调”

操作:通过(根据业务抽象)接口 + Map 注入,在运行时动态选择。

代码实现

1、定义统一的回调处理接口

public interface ConfirmCallbackService {

/**

* 投递失败后的回调处理

* @param message 投递的消息对象

*/

void confirmCallback(Message message);

}

例:定义点赞案例的实现类(可选):

public class LikeConfirmCallback implements ConfirmCallbackService{

/**

* 注入RedisTemplate

*/

private final RedisTemplate<String,Integer> redisTemplate;

/**

* 执行失败后的反向操作

* @param message 投递的消息对象

*/

@Override

public void confirmCallback(Message message) {

byte[] bytes = message.getBody();

//反向序列化为LikeDTO对象

try {

LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);

if(dto.getLikeStatus()){

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(), dto.getUid());

}else{

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(),dto.getUid());

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

}

小技巧:

可选不单独定义类,而是让业务层本身实现ConfirmCallbackService接口,简化书写操作

分离成策略类则更利于模块化、解耦和扩展。

2、回调上下文: 策略分发器

@Component

@RequiredArgsConstructor

@Slf4j

public class ConfirmCallbackContext {

/**

* 注入RabbitTemplate

*/

private final RabbitTemplate rabbitTemplate;

/**

* 注入所有ConfirmCallbackService的实现类

* 在不同的业务场景调用不同的实现来处理投递失败的业务逻辑

*/

private final Map<String,ConfirmCallbackService> confirmCallbackServiceMap;

/**

* 统一调用回调处理

* 在容器初始化就执行这个方法

*/

@PostConstruct

public void confirmCallback(){

rabbitTemplate.setConfirmCallback((cdata,ack,cause)->{

ReturnedMessage returnedMessage = cdata.getReturned();

if(ack){

log.info("The message was delivered to the{}",returnedMessage);

}else{

//获取业务实现的bean的id

String beanName = returnedMessage.getReplyText();

//根据bean的名称从map中获取相应的实现类

ConfirmCallbackService callbackService = confirmCallbackServiceMap.get(beanName);

callbackService.confirmCallback(returnedMessage.getMessage());

}

});

}

}

核心原理:

Spring Boot 会自动扫描所有实现 ConfirmCallbackService 的 Bean

Bean 名称作为 key,Bean 实例作为 value 注入到 Map<String, ConfirmCallbackService>

ConfirmCallbackContext 根据 replyText 动态找到对应的策略实现类

3.消息发送端封装

@Component

@RequiredArgsConstructor

public class RabbitManager<T> {

private final RabbitTemplate rabbitTemplate;

public void send(String exchange,String routingKey,

String callbackBeanName,T data){

try {

//创建cdata对象并设置一个id

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//将投递的数据转换为byte[]

byte[] bytes = new ObjectMapper().writeValueAsBytes(data);

//将bytes封装为Message对象

Message message = new Message(bytes);

//创建一个投递失败时返回的消息对象

ReturnedMessage returnedMessage = new ReturnedMessage(message, 0,

callbackBeanName, exchange,routingKey);

//将ReturnedMesssage保存到cdata中

correlationData.setReturned(returnedMessage);

//发送

rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);

} catch (Exception e) {

throw new RuntimeException(e);

}

}

}

** 关键点:**

callbackBeanName 会被放进 replyText 中,作为“回调策略的指针”。

4.点赞业务逻辑方法

4.1简化写法

@Override

public LikeDTO likeEssay(Integer uid, Integer eid) {

boolean likeStatus = false;

//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞

if(isLike(eid, uid)) {

//将用户ID从set集合中移除

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);

} else {

likeStatus = true;

//将用户ID添加到set集合中

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);

}

//获取当前帖子在redis中的点赞总数

Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);

//创建LikeDTO封装修改的数据并发布到消息队列

LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);

//发送到mq异步更新到数据库

rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY,

"likeServiceImpl", likeDTO);

return likeDTO;

}

/**

* 消息投递失败后的处理

* @param message 失败后返回的消息

*/

@Override

public void confirmCallback(Message message) {

byte[] bytes = message.getBody();

try {

//反序列化为LikeDTO对象

LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);

//执行反向操作

if(dto.getLikeStatus()) {

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());

} else {

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());

}

} catch (IOException e) {

throw new RuntimeException(e);

}

}

4.2 有业务实现类时

````

public LikeDTO likeEssay(Integer uid, Integer eid) {

boolean likeStatus = false;

//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞

if(isLike(uid,eid)){

//取消点赞

redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());

likeMapper.deleteLike(eid,uid);

}else{

likeStatus = true;

//将用户ID添加到set集合中

redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());

}

//获取当前帖子在redis中的点赞总数

Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);

//创建LikeDTO封装修改的数据并发布到消息队列

LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);

//发送到mq异步更新到数据库

rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY,

"likeConfirmCallbackService",likeDTO);

return likeDTO;

}

最终目标:当点赞消息从生产者发送到 RabbitMQ 时,一旦投递失败,系统能自动执行反向补偿逻辑,确保 Redis 与数据库的一致性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 12:18:14

阐述cnn模型识别图像中的对象的流程

卷积神经网络&#xff08;CNN&#xff09;识别图像中对象的核心逻辑是 **“从原始像素逐层抽象特征&#xff0c;再通过特征映射与概率输出完成识别”**&#xff0c;整个流程遵循“数据预处理→特征提取→特征压缩→分类/定位输出”的递进逻辑&#xff0c;每个环节环环相扣&…

作者头像 李华
网站建设 2026/6/9 21:28:39

TTPLA数据集:让AI成为电力巡检的智能守护者

TTPLA数据集&#xff1a;让AI成为电力巡检的智能守护者 【免费下载链接】ttpla_dataset aerial images dataset on transmission towers and power lines 项目地址: https://gitcode.com/gh_mirrors/tt/ttpla_dataset 还在为电力巡检的效率和安全性发愁吗&#xff1f;&…

作者头像 李华
网站建设 2026/6/10 17:20:38

AI时代年轻人的第二职业路径:从个人辅助到业务级落地

一、为什么说 AI 正在创造新的职业窗口&#xff1f;随着 AI 能力从工具化走向业务深度整合&#xff0c;年轻人获得了一种全新的身份&#xff1a;能把 AI 变成生产力的人&#xff0c;就是新的稀缺资源。在技术社区看来&#xff0c;这不只是风口&#xff0c;而是开发者与非开发者…

作者头像 李华
网站建设 2026/6/10 13:29:07

GPT-5.2 API 太慢?Python 实现异步视频预处理加速实战

昨天凌晨 OpenAI 发布 GPT-5.2 后&#xff0c;我也第一时间申请了 API 权限进行测试。新模型的推理能力确实惊人&#xff0c;但在处理视频流时&#xff0c;我遇到了一个严重的工程瓶颈&#xff1a;直接调用 Vision API 上传 4K 视频&#xff0c;首字生成时间 (TTFT) 经常超过 4…

作者头像 李华
网站建设 2026/6/10 0:56:57

智能医疗 | BUFNet:让脑肿瘤 MRI 分割更可靠的一次重要突破,一文看懂“边界感知 + 不确定性驱动”的多模态融合网络

BUFNet: Boundary-aware and uncertainty-driven multi-modal fusionnetwork for MR brain tumor segmentation 脑肿瘤自动分割一直是医学影像领域公认的“硬骨头”。尽管深度学习已经在 MRI 分割任务中取得了长足进展,但在真实临床场景中,模型依然面临两个致命挑战: 肿瘤边…

作者头像 李华
网站建设 2026/6/10 15:35:13

Zotero-reference插件:让学术写作中的文献管理效率翻倍

Zotero-reference插件&#xff1a;让学术写作中的文献管理效率翻倍 【免费下载链接】zotero-reference PDF references add-on for Zotero. 项目地址: https://gitcode.com/gh_mirrors/zo/zotero-reference 还在为学术论文写作中繁琐的参考文献格式而头疼吗&#xff1f;…

作者头像 李华