news 2026/5/8 15:53:50

RocketMQ封装通用消息队列生产者

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RocketMQ封装通用消息队列生产者

RocketMQ封装通用消息队列生产者

在分布式系统开发中,RocketMQ作为高性能的消息中间件被广泛使用。为了简化业务层使用、统一消息发送规范、屏蔽底层RocketMQ细节,本文将手把手带你封装一个通用的消息队列生产者接口,实现普通同步消息事务消息的统一管理,支持业务key、日志标识、事务回查等核心能力,直接可落地到生产环境!

一、封装背景与设计思路

1. 为什么要封装?

  1. 解耦底层依赖:业务代码不直接依赖RocketMQTemplate,后续切换消息中间件只需修改适配器,无需改动业务逻辑;
  2. 统一规范:强制要求业务携带topickeys(幂等)、bizDesc(日志标识),避免日志混乱、消息幂等性无法保证;
  3. 简化使用:封装事务消息的复杂流程,业务层只需关注本地事务逻辑,无需关心Half消息、事务提交/回滚细节;
  4. 统一日志:标准化生产者日志,方便问题排查;
  5. 事务安全:完整实现事务回查机制,保证分布式事务最终一致性。

2. 核心设计

  • 接口层:定义MessageQueueProducer通用接口,抽象普通消息、事务消息发送方法;
  • 适配层RocketMQProducerAdapter实现接口,对接RocketMQTemplate,处理消息包装、异常、日志;
  • 事务增强DelegatingTransactionListener统一托管本地事务+事务回查,保证分布式事务可靠性。

二、核心代码实现

1. 消息队列生产者接口(MessageQueueProducer)

首先定义通用接口,规范所有消息生产者的行为,不绑定任何中间件,具备极高扩展性。

/** * 消息队列生产者接口 * 通用抽象,适配各类MQ中间件 * @author 你的ID */publicinterfaceMessageQueueProducer{/** * 发送普通同步消息 * * @param topic 目标topic * @param keys 业务key,可用于幂等判断 * @param bizDesc 业务描述,用于日志标识 * @param body 业务载荷 * @return RocketMQ 发送结果,包含 msgId、sendStatus 等信息 */SendResultsend(Stringtopic,Stringkeys,StringbizDesc,Objectbody);/** * 发送事务消息 * <p> * 流程:发送 half 消息 → 执行本地事务 → 根据结果 commit/rollback * <p> * 事务回查由按 topic 注册的 {@link TransactionChecker} 处理,需提前通过 * {@link DelegatingTransactionListener#registerChecker(String, TransactionChecker)} 注册 * * @param topic 目标 topic * @param keys 业务 key * @param bizDesc 业务描述 * @param body 业务载荷 * @param localTransaction 本地事务逻辑,在 half 消息发送成功后执行;抛异常则回滚消息 */voidsendInTransaction(Stringtopic,Stringkeys,StringbizDesc,Objectbody,Consumer<Object>localTransaction);}

2. 通用RocketMQ事务消息监听器(DelegatingTransactionListener)

这是事务消息的核心灵魂!统一处理本地事务执行+事务回查,支持按Topic注册回查逻辑,保证分布式事务安全。

importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.spring.annotation.RocketMQTransactionListener;importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionState;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.transaction.PlatformTransactionManager;importorg.springframework.transaction.support.TransactionTemplate;importjava.util.concurrent.ConcurrentMap;importjava.util.concurrent.ConcurrentHashMap;importjava.util.function.Consumer;/** * 通用的 RocketMQ 事务消息监听器 * 统一处理:本地事务执行 + 事务回查 * @author 你的ID */@Slf4j@RocketMQTransactionListenerpublicclassDelegatingTransactionListenerimplementsRocketMQLocalTransactionListener{// 事务ID消息头(生产者传递过来)staticfinalStringHEADER_TX_ID="TRANSACTION_CONTEXT_ID";// 主题消息头(用于事务回查匹配)staticfinalStringHEADER_TOPIC="TRANSACTION_TOPIC";/** * 本地事务执行逻辑,per-message,仅当前实例有效 * key: txId 事务唯一ID * value: 业务本地事务逻辑 */privatefinalConcurrentMap<String,Consumer<Object>>localTransactionMap=newConcurrentHashMap<>();/** * 事务回查逻辑,per-topic,所有实例共享(Spring Bean 注册) * key: topic 消息主题 * value: 事务回查器 */privatefinalConcurrentMap<String,TransactionChecker>checkerMap=newConcurrentHashMap<>();// 事务管理器(保证本地事务原子性)@AutowiredprivatePlatformTransactionManagertransactionManager;/** * 注册本地事务(生产者调用) */publicvoidregisterLocalTransaction(StringtxId,Consumer<Object>localTransaction){localTransactionMap.put(txId,localTransaction);}/** * 注册事务回查器(业务初始化时调用) */publicvoidregisterChecker(Stringtopic,TransactionCheckerchecker){checkerMap.put(topic,checker);}/** * 执行本地事务(Half消息发送成功后回调) */@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemessage,Objectarg){StringtxId=(String)message.getHeaders().get(HEADER_TX_ID);Consumer<Object>localTransaction=txId!=null?localTransactionMap.remove(txId):null;// 无事务逻辑 → 回滚if(localTransaction==null){log.error("[事务消息] 未找到本地事务逻辑, txId={}",txId);returnRocketMQLocalTransactionState.ROLLBACK;}try{// 手动开启事务,保证本地事务原子性newTransactionTemplate(transactionManager).executeWithoutResult(status->localTransaction.accept(arg));// 事务执行成功 → 提交消息returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){log.error("[事务消息] 本地事务执行失败, txId={}",txId,e);// 事务执行失败 → 回滚消息returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 事务回查(异常场景:服务宕机、事务超时,RocketMQ主动回查) */@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemessage){Stringtopic=(String)message.getHeaders().get(HEADER_TOPIC);TransactionCheckerchecker=topic!=null?checkerMap.get(topic):null;// 无回查器 → 默认回滚if(checker==null){log.warn("[事务消息] 回查时未找到 topic={} 对应的 checker, 默认 ROLLBACK",topic);returnRocketMQLocalTransactionState.ROLLBACK;}try{// 解析消息体,执行回查逻辑MessageWrapper<?>wrapper=(MessageWrapper<?>)message.getPayload();booleancommitted=checker.check(wrapper);// 根据回查结果决定提交/回滚RocketMQLocalTransactionStatestate=committed?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;log.info("[事务消息] 回查结果: topic={}, state={}",topic,state);returnstate;}catch(Exceptione){log.error("[事务消息] 回查异常, topic={}",topic,e);// 异常 → 未知状态,等待下次回查returnRocketMQLocalTransactionState.UNKNOWN;}}}

3. RocketMQ生产者适配器(RocketMQProducerAdapter)

实现通用接口,对接Spring Cloud Alibaba RocketMQ,封装消息发送、事务管理、日志打印、异常捕获等逻辑。

importcn.hutool.core.util.StrUtil;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.common.message.MessageConst;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importjava.util.UUID;importjava.util.function.Consumer;/** * 基于 RocketMQ 的消息生产者适配器 * 实现通用消息队列接口,屏蔽底层RocketMQ细节 * @author 你的ID */@Slf4j@RequiredArgsConstructorpublicclassRocketMQProducerAdapterimplementsMessageQueueProducer{// RocketMQ核心模板privatefinalRocketMQTemplaterocketMQTemplate;// 事务监听器(托管本地事务+事务回查)privatefinalDelegatingTransactionListenertransactionListener;/** * 发送普通同步消息 */@OverridepublicSendResultsend(Stringtopic,Stringkeys,StringbizDesc,Objectbody){// 无业务key时,自动生成UUID(保证消息唯一)keys=StrUtil.isEmpty(keys)?UUID.randomUUID().toString():keys;// 构建消息体(包装业务数据+keys)Message<MessageWrapper<Object>>message=MessageBuilder.withPayload(MessageWrapper.builder().keys(keys).body(body).build()).setHeader(MessageConst.PROPERTY_KEYS,keys).build();SendResultsendResult;try{// 同步发送消息sendResult=rocketMQTemplate.syncSend(topic,message);}catch(Throwableex){log.error("[生产者] {} - 消息发送失败,topic: {}, keys: {}",bizDesc,topic,keys,ex);throwex;}// 标准化日志输出log.info("[生产者] {} - 发送结果: {}, 消息ID: {}, Keys: {}",bizDesc,sendResult.getSendStatus(),sendResult.getMsgId(),keys);returnsendResult;}/** * 发送事务消息(封装Half消息+本地事务逻辑) */@OverridepublicvoidsendInTransaction(Stringtopic,Stringkeys,StringbizDesc,Objectbody,Consumer<Object>localTransaction){// 生成唯一标识keys=StrUtil.isEmpty(keys)?UUID.randomUUID().toString():keys;StringtxId=UUID.randomUUID().toString();// 注册本地事务到监听器(事务执行时回调)transactionListener.registerLocalTransaction(txId,localTransaction);// 构建事务消息(携带事务ID、topic,用于回查)Message<MessageWrapper<Object>>message=MessageBuilder.withPayload(MessageWrapper.builder().keys(keys).body(body).build()).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(DelegatingTransactionListener.HEADER_TX_ID,txId).setHeader(DelegatingTransactionListener.HEADER_TOPIC,topic).build();TransactionSendResultsendResult;try{// 发送事务消息sendResult=rocketMQTemplate.sendMessageInTransaction(topic,message,null);}catch(Throwableex){log.error("[生产者] {} - 事务消息发送失败,topic: {}, keys: {}",bizDesc,topic,keys,ex);throwex;}log.info("[生产者] {} - 事务消息发送结果: {}, 本地事务状态: {}, 消息ID: {}, Keys: {}",bizDesc,sendResult.getSendStatus(),sendResult.getLocalTransactionState(),sendResult.getMsgId(),keys);}}

4. 配套基础类(保证代码可直接运行)

(1)消息包装类(MessageWrapper)

统一消息体格式,存储业务key和业务数据:

importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;/** * 消息包装类 * 统一封装业务key和业务载荷 */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassMessageWrapper<T>{/** * 业务key(幂等用) */privateStringkeys;/** * 业务消息体 */privateTbody;}
(2)事务回查接口(TransactionChecker)

用于自定义事务回查逻辑:

/** * 事务回查接口 * 业务实现此接口,自定义本地事务状态校验逻辑 */publicinterfaceTransactionChecker{/** * 检查本地事务是否已提交 * @param wrapper 消息包装体 * @return true=已提交,false=未提交/已回滚 */booleancheck(MessageWrapper<?>wrapper);}

三、代码核心特性讲解

1. 普通消息核心能力

  1. 自动生成业务key:未传入keys时自动生成UUID,保证消息唯一性,支持幂等消费;
  2. 消息统一包装:通过MessageWrapper标准化消息体,方便消费者解析;
  3. 异常捕获+日志:发送失败自动打印错误日志,抛出异常方便业务层处理;
  4. 同步发送:使用syncSend保证消息发送可靠性。

2. 事务消息核心能力

  1. 屏蔽复杂流程:业务层无需关心Half消息、事务提交/回滚,只需传入本地事务逻辑;
  2. 事务托管:通过DelegatingTransactionListener托管本地事务,自动执行+事务控制;
  3. 完整事务回查:异常宕机/超时场景,RocketMQ主动回查,保证事务最终一致性;
  4. 按Topic隔离:不同Topic支持独立的回查逻辑,扩展性极强;
  5. 手动事务控制:使用TransactionTemplate保证本地事务原子性。

四、业务层使用示例

1. 发送普通消息

@Service@RequiredArgsConstructorpublicclassBizService{privatefinalMessageQueueProducermessageQueueProducer;publicvoidsendNormalMsg(){// 业务数据UserDTOuser=newUserDTO("1","测试用户");// 发送消息messageQueueProducer.send("user-topic","user-1","用户创建消息",user);}}

2. 发送事务消息

@Service@RequiredArgsConstructorpublicclassBizService{privatefinalMessageQueueProducermessageQueueProducer;privatefinalUserMapperuserMapper;privatefinalDelegatingTransactionListenertransactionListener;// 项目启动时注册事务回查器@PostConstructpublicvoidinitChecker(){transactionListener.registerChecker("user-trans-topic",wrapper->{// 事务回查逻辑:根据消息key查询数据库,判断事务是否提交StringuserId=((UserDTO)wrapper.getBody()).getId();returnuserMapper.selectById(userId)!=null;});}publicvoidsendTransactionMsg(){UserDTOuser=newUserDTO("1","测试用户");// 发送事务消息,传入本地事务逻辑messageQueueProducer.sendInTransaction("user-trans-topic","trans-user-1","用户创建事务消息",user,(arg)->{// 本地事务逻辑:执行数据库操作userMapper.insert(user);// 抛异常则自动回滚消息});}}

五、总结

封装的RocketMQ通用生产者具备以下优势:

  1. 高扩展性:面向接口编程,切换MQ中间件只需新增适配器;
  2. 生产可用:包含幂等、日志、异常处理、事务消息、事务回查等企业级能力;
  3. 使用简单:业务层零感知底层细节,一行代码发送消息;
  4. 事务安全:完整实现RocketMQ事务消息机制,保证分布式事务最终一致性;
  5. 规范统一:强制业务规范,避免团队开发混乱。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/8 15:53:46

阿里云2026年喂饭级教程集成Hermes Agent/OpenClaw及Token Plan

阿里云2026年喂饭级教程集成Hermes Agent/OpenClaw及Token Plan。OpenClaw作为阿里云生态下新一代的开源AI自动化代理平台&#xff0c;曾用名Moltbot/Clawdbot&#xff0c;凭借“自然语言交互自动化任务执行大模型智能决策”的核心能力&#xff0c;正在重构个人与企业的工作效率…

作者头像 李华
网站建设 2026/5/8 15:53:30

随机像素艺术:SDL2的应用实例

在现代编程中,图形和图像处理是一个非常热门的话题。今天我们来探讨如何使用SDL2(Simple DirectMedia Layer 2)库来创建一个简单却有趣的随机像素生成器项目。 项目背景 想象一下,你想要在一个2000x2000像素的窗口中生成随机移动的像素点,每个点根据移动方向显示不同的颜…

作者头像 李华
网站建设 2026/5/8 15:53:21

Arm ETE与TRBE技术架构解析及调试实践

1. Arm ETE与TRBE技术架构解析在处理器调试与性能分析领域&#xff0c;嵌入式追踪技术如同手术中的内窥镜&#xff0c;能够实时观察指令执行的微观行为。Armv9架构中的嵌入式追踪扩展&#xff08;Embedded Trace Extension, ETE&#xff09;与追踪缓冲区扩展&#xff08;Trace …

作者头像 李华
网站建设 2026/5/8 15:52:57

驾驶安全新视角:人机协同与主动安全技术应用

1. 从“人机之争”到“人机协同”&#xff1a;重新审视驾驶安全的核心矛盾最近几年&#xff0c;每次有涉及高级辅助驾驶或自动驾驶测试车辆的事故发生&#xff0c;舆论场总会迅速分裂成两个阵营&#xff1a;一方急切地宣告“AI还不成熟&#xff0c;人类驾驶更可靠”&#xff0c…

作者头像 李华
网站建设 2026/5/8 15:52:52

FPGA加速开发实战:从HLS到RAS平台化转型与性能优化

1. 从HDL到云端&#xff1a;FPGA开发范式的演进与挑战如果你和我一样&#xff0c;在FPGA这个行当里摸爬滚打了十几年&#xff0c;从VHDL/Verilog一行行代码“焊”逻辑&#xff0c;到看着芯片规模从几万门膨胀到几千万门&#xff0c;你一定能感受到那股暗流涌动的变革力量。过去…

作者头像 李华
网站建设 2026/5/8 15:52:39

洛阳汽车贴膜门店怎么选?2026年靠谱门店榜单,这几家门店实测。

前言洛阳本地汽车后市场协会 2025 年发布的《洛阳汽车后市场消费白皮书》显示&#xff0c;截至 2026 年洛阳汽车保有量已突破 280 万辆&#xff0c;汽车贴膜作为新车落地、老车养护的刚需项目&#xff0c;本地市场规模已超 2 亿元。但从消费投诉数据来看&#xff0c;汽车贴膜品…

作者头像 李华