RocketMQ封装通用消息队列生产者
在分布式系统开发中,RocketMQ作为高性能的消息中间件被广泛使用。为了简化业务层使用、统一消息发送规范、屏蔽底层RocketMQ细节,本文将手把手带你封装一个通用的消息队列生产者接口,实现普通同步消息和事务消息的统一管理,支持业务key、日志标识、事务回查等核心能力,直接可落地到生产环境!
一、封装背景与设计思路
1. 为什么要封装?
- 解耦底层依赖:业务代码不直接依赖RocketMQTemplate,后续切换消息中间件只需修改适配器,无需改动业务逻辑;
- 统一规范:强制要求业务携带
topic、keys(幂等)、bizDesc(日志标识),避免日志混乱、消息幂等性无法保证; - 简化使用:封装事务消息的复杂流程,业务层只需关注本地事务逻辑,无需关心Half消息、事务提交/回滚细节;
- 统一日志:标准化生产者日志,方便问题排查;
- 事务安全:完整实现事务回查机制,保证分布式事务最终一致性。
2. 核心设计
- 接口层:定义
MessageQueueProducer通用接口,抽象普通消息、事务消息发送方法; - 适配层:
RocketMQProducerAdapter实现接口,对接RocketMQTemplate,处理消息包装、异常、日志; - 事务增强:
DelegatingTransactionListener统一托管本地事务+事务回查,保证分布式事务可靠性。
二、核心代码实现
1. 消息队列生产者接口(MessageQueueProducer)
首先定义通用接口,规范所有消息生产者的行为,不绑定任何中间件,具备极高扩展性。
/** * 消息队列生产者接口 * 通用抽象,适配各类MQ中间件 * @author 你的ID */publicinterfaceMessageQueueProducer{/** * 发送普通同步消息 * * @param topic 目标topic * @param keys 业务key,可用于幂等判断 * @param bizDesc 业务描述,用于日志标识 * @param body 业务载荷 * @return RocketMQ 发送结果,包含 msgId、sendStatus 等信息 */SendResultsend(Stringtopic,Stringkeys,StringbizDesc,Objectbody);/** * 发送事务消息 * <p> * 流程:发送 half 消息 → 执行本地事务 → 根据结果 commit/rollback * <p> * 事务回查由按 topic 注册的 {@link TransactionChecker} 处理,需提前通过 * {@link DelegatingTransactionListener#registerChecker(String, TransactionChecker)} 注册 * * @param topic 目标 topic * @param keys 业务 key * @param bizDesc 业务描述 * @param body 业务载荷 * @param localTransaction 本地事务逻辑,在 half 消息发送成功后执行;抛异常则回滚消息 */voidsendInTransaction(Stringtopic,Stringkeys,StringbizDesc,Objectbody,Consumer<Object>localTransaction);}2. 通用RocketMQ事务消息监听器(DelegatingTransactionListener)
这是事务消息的核心灵魂!统一处理本地事务执行+事务回查,支持按Topic注册回查逻辑,保证分布式事务安全。
importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.spring.annotation.RocketMQTransactionListener;importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;importorg.apache.rocketmq.spring.core.RocketMQLocalTransactionState;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.transaction.PlatformTransactionManager;importorg.springframework.transaction.support.TransactionTemplate;importjava.util.concurrent.ConcurrentMap;importjava.util.concurrent.ConcurrentHashMap;importjava.util.function.Consumer;/** * 通用的 RocketMQ 事务消息监听器 * 统一处理:本地事务执行 + 事务回查 * @author 你的ID */@Slf4j@RocketMQTransactionListenerpublicclassDelegatingTransactionListenerimplementsRocketMQLocalTransactionListener{// 事务ID消息头(生产者传递过来)staticfinalStringHEADER_TX_ID="TRANSACTION_CONTEXT_ID";// 主题消息头(用于事务回查匹配)staticfinalStringHEADER_TOPIC="TRANSACTION_TOPIC";/** * 本地事务执行逻辑,per-message,仅当前实例有效 * key: txId 事务唯一ID * value: 业务本地事务逻辑 */privatefinalConcurrentMap<String,Consumer<Object>>localTransactionMap=newConcurrentHashMap<>();/** * 事务回查逻辑,per-topic,所有实例共享(Spring Bean 注册) * key: topic 消息主题 * value: 事务回查器 */privatefinalConcurrentMap<String,TransactionChecker>checkerMap=newConcurrentHashMap<>();// 事务管理器(保证本地事务原子性)@AutowiredprivatePlatformTransactionManagertransactionManager;/** * 注册本地事务(生产者调用) */publicvoidregisterLocalTransaction(StringtxId,Consumer<Object>localTransaction){localTransactionMap.put(txId,localTransaction);}/** * 注册事务回查器(业务初始化时调用) */publicvoidregisterChecker(Stringtopic,TransactionCheckerchecker){checkerMap.put(topic,checker);}/** * 执行本地事务(Half消息发送成功后回调) */@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemessage,Objectarg){StringtxId=(String)message.getHeaders().get(HEADER_TX_ID);Consumer<Object>localTransaction=txId!=null?localTransactionMap.remove(txId):null;// 无事务逻辑 → 回滚if(localTransaction==null){log.error("[事务消息] 未找到本地事务逻辑, txId={}",txId);returnRocketMQLocalTransactionState.ROLLBACK;}try{// 手动开启事务,保证本地事务原子性newTransactionTemplate(transactionManager).executeWithoutResult(status->localTransaction.accept(arg));// 事务执行成功 → 提交消息returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){log.error("[事务消息] 本地事务执行失败, txId={}",txId,e);// 事务执行失败 → 回滚消息returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 事务回查(异常场景:服务宕机、事务超时,RocketMQ主动回查) */@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemessage){Stringtopic=(String)message.getHeaders().get(HEADER_TOPIC);TransactionCheckerchecker=topic!=null?checkerMap.get(topic):null;// 无回查器 → 默认回滚if(checker==null){log.warn("[事务消息] 回查时未找到 topic={} 对应的 checker, 默认 ROLLBACK",topic);returnRocketMQLocalTransactionState.ROLLBACK;}try{// 解析消息体,执行回查逻辑MessageWrapper<?>wrapper=(MessageWrapper<?>)message.getPayload();booleancommitted=checker.check(wrapper);// 根据回查结果决定提交/回滚RocketMQLocalTransactionStatestate=committed?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;log.info("[事务消息] 回查结果: topic={}, state={}",topic,state);returnstate;}catch(Exceptione){log.error("[事务消息] 回查异常, topic={}",topic,e);// 异常 → 未知状态,等待下次回查returnRocketMQLocalTransactionState.UNKNOWN;}}}3. RocketMQ生产者适配器(RocketMQProducerAdapter)
实现通用接口,对接Spring Cloud Alibaba RocketMQ,封装消息发送、事务管理、日志打印、异常捕获等逻辑。
importcn.hutool.core.util.StrUtil;importlombok.RequiredArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.common.message.MessageConst;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importjava.util.UUID;importjava.util.function.Consumer;/** * 基于 RocketMQ 的消息生产者适配器 * 实现通用消息队列接口,屏蔽底层RocketMQ细节 * @author 你的ID */@Slf4j@RequiredArgsConstructorpublicclassRocketMQProducerAdapterimplementsMessageQueueProducer{// RocketMQ核心模板privatefinalRocketMQTemplaterocketMQTemplate;// 事务监听器(托管本地事务+事务回查)privatefinalDelegatingTransactionListenertransactionListener;/** * 发送普通同步消息 */@OverridepublicSendResultsend(Stringtopic,Stringkeys,StringbizDesc,Objectbody){// 无业务key时,自动生成UUID(保证消息唯一)keys=StrUtil.isEmpty(keys)?UUID.randomUUID().toString():keys;// 构建消息体(包装业务数据+keys)Message<MessageWrapper<Object>>message=MessageBuilder.withPayload(MessageWrapper.builder().keys(keys).body(body).build()).setHeader(MessageConst.PROPERTY_KEYS,keys).build();SendResultsendResult;try{// 同步发送消息sendResult=rocketMQTemplate.syncSend(topic,message);}catch(Throwableex){log.error("[生产者] {} - 消息发送失败,topic: {}, keys: {}",bizDesc,topic,keys,ex);throwex;}// 标准化日志输出log.info("[生产者] {} - 发送结果: {}, 消息ID: {}, Keys: {}",bizDesc,sendResult.getSendStatus(),sendResult.getMsgId(),keys);returnsendResult;}/** * 发送事务消息(封装Half消息+本地事务逻辑) */@OverridepublicvoidsendInTransaction(Stringtopic,Stringkeys,StringbizDesc,Objectbody,Consumer<Object>localTransaction){// 生成唯一标识keys=StrUtil.isEmpty(keys)?UUID.randomUUID().toString():keys;StringtxId=UUID.randomUUID().toString();// 注册本地事务到监听器(事务执行时回调)transactionListener.registerLocalTransaction(txId,localTransaction);// 构建事务消息(携带事务ID、topic,用于回查)Message<MessageWrapper<Object>>message=MessageBuilder.withPayload(MessageWrapper.builder().keys(keys).body(body).build()).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(DelegatingTransactionListener.HEADER_TX_ID,txId).setHeader(DelegatingTransactionListener.HEADER_TOPIC,topic).build();TransactionSendResultsendResult;try{// 发送事务消息sendResult=rocketMQTemplate.sendMessageInTransaction(topic,message,null);}catch(Throwableex){log.error("[生产者] {} - 事务消息发送失败,topic: {}, keys: {}",bizDesc,topic,keys,ex);throwex;}log.info("[生产者] {} - 事务消息发送结果: {}, 本地事务状态: {}, 消息ID: {}, Keys: {}",bizDesc,sendResult.getSendStatus(),sendResult.getLocalTransactionState(),sendResult.getMsgId(),keys);}}4. 配套基础类(保证代码可直接运行)
(1)消息包装类(MessageWrapper)
统一消息体格式,存储业务key和业务数据:
importlombok.AllArgsConstructor;importlombok.Builder;importlombok.Data;importlombok.NoArgsConstructor;/** * 消息包装类 * 统一封装业务key和业务载荷 */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassMessageWrapper<T>{/** * 业务key(幂等用) */privateStringkeys;/** * 业务消息体 */privateTbody;}(2)事务回查接口(TransactionChecker)
用于自定义事务回查逻辑:
/** * 事务回查接口 * 业务实现此接口,自定义本地事务状态校验逻辑 */publicinterfaceTransactionChecker{/** * 检查本地事务是否已提交 * @param wrapper 消息包装体 * @return true=已提交,false=未提交/已回滚 */booleancheck(MessageWrapper<?>wrapper);}三、代码核心特性讲解
1. 普通消息核心能力
- 自动生成业务key:未传入
keys时自动生成UUID,保证消息唯一性,支持幂等消费; - 消息统一包装:通过
MessageWrapper标准化消息体,方便消费者解析; - 异常捕获+日志:发送失败自动打印错误日志,抛出异常方便业务层处理;
- 同步发送:使用
syncSend保证消息发送可靠性。
2. 事务消息核心能力
- 屏蔽复杂流程:业务层无需关心Half消息、事务提交/回滚,只需传入本地事务逻辑;
- 事务托管:通过
DelegatingTransactionListener托管本地事务,自动执行+事务控制; - 完整事务回查:异常宕机/超时场景,RocketMQ主动回查,保证事务最终一致性;
- 按Topic隔离:不同Topic支持独立的回查逻辑,扩展性极强;
- 手动事务控制:使用
TransactionTemplate保证本地事务原子性。
四、业务层使用示例
1. 发送普通消息
@Service@RequiredArgsConstructorpublicclassBizService{privatefinalMessageQueueProducermessageQueueProducer;publicvoidsendNormalMsg(){// 业务数据UserDTOuser=newUserDTO("1","测试用户");// 发送消息messageQueueProducer.send("user-topic","user-1","用户创建消息",user);}}2. 发送事务消息
@Service@RequiredArgsConstructorpublicclassBizService{privatefinalMessageQueueProducermessageQueueProducer;privatefinalUserMapperuserMapper;privatefinalDelegatingTransactionListenertransactionListener;// 项目启动时注册事务回查器@PostConstructpublicvoidinitChecker(){transactionListener.registerChecker("user-trans-topic",wrapper->{// 事务回查逻辑:根据消息key查询数据库,判断事务是否提交StringuserId=((UserDTO)wrapper.getBody()).getId();returnuserMapper.selectById(userId)!=null;});}publicvoidsendTransactionMsg(){UserDTOuser=newUserDTO("1","测试用户");// 发送事务消息,传入本地事务逻辑messageQueueProducer.sendInTransaction("user-trans-topic","trans-user-1","用户创建事务消息",user,(arg)->{// 本地事务逻辑:执行数据库操作userMapper.insert(user);// 抛异常则自动回滚消息});}}五、总结
封装的RocketMQ通用生产者具备以下优势:
- 高扩展性:面向接口编程,切换MQ中间件只需新增适配器;
- 生产可用:包含幂等、日志、异常处理、事务消息、事务回查等企业级能力;
- 使用简单:业务层零感知底层细节,一行代码发送消息;
- 事务安全:完整实现RocketMQ事务消息机制,保证分布式事务最终一致性;
- 规范统一:强制业务规范,避免团队开发混乱。