news 2026/4/23 9:58:20

宇树Java面试被问:RocketMQ事务消息的二阶段提交实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
宇树Java面试被问:RocketMQ事务消息的二阶段提交实现

一、核心概念理解

事务消息解决什么问题?

java

复制

下载

// 分布式事务典型问题:本地事务与消息发送的一致性 // 传统方式存在的问题: 1. 先发消息,后执行本地事务 → 消息发送成功但本地事务失败 2. 先执行本地事务,后发消息 → 本地事务成功但消息发送失败

RocketMQ事务消息的核心机制

text

复制

下载

Producer发送Half消息 → Broker存储Half消息 → 执行本地事务 ↓ Broker等待事务状态回查 ← Producer返回本地事务结果 ↓ 根据结果提交或回滚消息

二、两阶段提交详细流程

第一阶段:发送Half消息

java

复制

下载

public class TransactionProducer { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); // 2. 设置事务监听器(核心) producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg Half消息 * @param arg 业务参数 * @return 本地事务状态 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地数据库事务 boolean success = doLocalBusinessTransaction(msg, arg); if (success) { System.out.println("本地事务执行成功,提交消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("本地事务执行失败,回滚消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println("本地事务执行异常,回查"); return LocalTransactionState.UNKNOW; } } /** * 事务回查(Broker主动查询事务状态) * @param msg Half消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据业务ID查询本地事务状态 String transactionId = msg.getTransactionId(); boolean status = queryLocalTransactionStatus(transactionId); if (status) { System.out.println("事务回查:本地事务已提交"); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println("事务回查:本地事务已回滚"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 Message msg = new Message("TransactionTopic", "TagA", "Order-001".getBytes(StandardCharsets.UTF_8)); // 设置事务ID(关键) msg.setKeys("TXN-" + System.currentTimeMillis()); // 发送Half消息(第一阶段) SendResult sendResult = producer.sendMessageInTransaction(msg, // 业务参数,会在executeLocalTransaction中传递 new BusinessParam("orderId", "123456", 100.00) ); System.out.println("Half消息发送结果: " + sendResult.getSendStatus()); } }

篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】

三、完整执行时序图

text

复制

下载

┌─────────┐ ┌────────┐ ┌────────┐ │ Producer│ │ Broker │ │ 本地DB │ └────┬────┘ └───┬────┘ └────┬───┘ │ 1.发送Half消息 │ │ │───────────────>│ │ │ │ │ │ │ 2.存储Half消息 │ │ │───────────────>│ │ │ │ │ 3.返回Half成功 │ │ │<───────────────│ │ │ │ │ │ 4.执行本地事务 │ │ │────────────────────────────────>│ │ │ │ │ 5.返回事务状态 │ │ │───────────────>│ │ │ │ │ │ │6.提交/回滚消息 │ │ │───────────────>│ │ │ │ │(可能)7.事务回查 │ │ │<───────────────│ │ │ │ │ │ 8.返回回查结果 │ │ │───────────────>│ │ │ │ │ │ │9.最终提交/回滚 │ │ │───────────────>│

四、关键配置参数

yaml

复制

下载

# Broker端配置 broker.conf: transactionCheckMax: 15 # 最大回查次数 transactionCheckInterval: 60000 # 回查间隔(ms) transactionTimeOut: 6000 # 超时时间(ms) # Producer端配置 producer: checkThreadPoolMinSize: 1 # 回查线程池最小 checkThreadPoolMaxSize: 1 # 回查线程池最大 checkRequestHoldMax: 2000 # 回查请求队列大小

五、代码实现最佳实践

1. 完整的订单事务示例

java

复制

下载

@Service public class OrderTransactionService { @Resource private OrderMapper orderMapper; @Resource private TransactionMQProducer transactionMQProducer; /** * 创建订单事务消息 */ public void createOrderWithTransaction(OrderDTO orderDTO) { // 构建消息 Message msg = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(orderDTO)); // 设置业务标识 msg.setKeys("ORDER_" + orderDTO.getOrderNo()); msg.putUserProperty("businessType", "ORDER_CREATE"); // 发送事务消息 SendResult sendResult = transactionMQProducer.sendMessageInTransaction( msg, new OrderTransactionArg(orderDTO) ); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("Half消息发送失败"); } } /** * 事务监听器实现 */ @Component public class OrderTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { OrderTransactionArg transactionArg = (OrderTransactionArg) arg; OrderDTO orderDTO = transactionArg.getOrderDTO(); try { // 1. 保存订单到数据库 Order order = convertToOrder(orderDTO); orderMapper.insert(order); // 2. 扣减库存(调用库存服务) boolean deductResult = inventoryService.deductStock( orderDTO.getProductId(), orderDTO.getQuantity() ); if (!deductResult) { // 库存不足,回滚本地事务 orderMapper.deleteById(order.getId()); return LocalTransactionState.ROLLBACK_MESSAGE; } // 3. 记录事务日志(用于回查) transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", order.getId(), LocalTransactionState.COMMIT_MESSAGE.name() ); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("订单本地事务执行异常", e); // 记录异常状态 transactionLogService.saveTransactionLog( msg.getTransactionId(), "ORDER_CREATE", null, LocalTransactionState.UNKNOW.name() ); return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据事务ID查询事务日志 String transactionId = msg.getTransactionId(); TransactionLog log = transactionLogService.getByTransactionId(transactionId); if (log == null) { // 没有事务记录,需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } if ("COMMIT_MESSAGE".equals(log.getStatus())) { // 事务已提交 return LocalTransactionState.COMMIT_MESSAGE; } else { // 事务需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } } } /** * 事务参数封装 */ @Data @AllArgsConstructor public static class OrderTransactionArg { private OrderDTO orderDTO; } }

2. 消费端幂等处理

java

复制

下载

@Component @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "ORDER_CONSUMER_GROUP" ) public class OrderConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { // 1. 检查消息幂等性 String messageId = message.getMsgId(); if (redisTemplate.hasKey("MSG_" + messageId)) { log.info("消息已处理,跳过: {}", messageId); return; } // 2. 解析消息 OrderDTO orderDTO = JSON.parseObject(message.getBody(), OrderDTO.class); // 3. 业务处理 try { // 更新订单状态为已确认 orderService.confirmOrder(orderDTO.getOrderNo()); // 4. 记录已处理消息 redisTemplate.opsForValue().set( "MSG_" + messageId, "PROCESSED", 1, TimeUnit.HOURS ); } catch (Exception e) { log.error("订单处理失败,将重试", e); throw new RuntimeException(e); } } }

六、面试问题回答要点

问题:RocketMQ事务消息如何实现二阶段提交?

回答结构:

  1. 概念解释

    • "RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性"

    • "核心思想:将本地事务和消息发送绑定,通过Half消息和状态回查机制实现"

  2. 第一阶段(Half消息阶段)

    • "Producer发送Half消息到Broker,Broker存储但不对Consumer可见"

    • "Half消息发送成功后,执行本地事务"

    • "本地事务执行结果返回给Broker:COMMIT、ROLLBACK或UNKNOWN"

篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc

需要全套面试笔记及答案
【点击此处即可/免费获取】​​​

  1. 第二阶段(状态确认阶段)

    • "如果本地事务返回COMMIT/ROLLBACK,Broker立即提交/回滚消息"

    • "如果返回UNKNOWN,Broker会发起事务状态回查"

    • "Producer实现TransactionListener.checkLocalTransaction()进行状态查询"

  2. 关键机制

    • "事务状态回查:解决网络超时或生产者宕机问题"

    • "消息幂等性:消费端需要处理重复消息"

    • "超时机制:超过配置时间未确认的消息会自动回滚"

  3. 代码示例

    java

    复制 下载
    // 简要展示核心代码结构 producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状态回查 } });
  4. 适用场景

    • "订单创建+通知库存"

    • "支付成功+发送通知"

    • "任何需要保证本地事务和消息发送一致性的场景"

  5. 注意事项

    • "事务消息不支持定时和批量消息"

    • "确保checkLocalTransaction方法的幂等性"

    • "合理配置回查次数和间隔"

面试加分项

  • 提到"最大努力通知型事务"

  • 对比TCC、Saga等分布式事务方案

  • 强调消息幂等处理的重要性

  • 提及RocketMQ 4.3+的事务消息优化

这样的回答既展示了理论知识,又体现了实际编码能力,适合中高级Java岗位面试。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 8:36:16

计算机毕业设计springboot高校线上选课管理系统 高校在线选课系统的设计与实现 基于B/S架构的教务选课服务平台开发

计算机毕业设计springboot高校线上选课管理系统 &#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。高等教育规模持续扩大与教学模式数字化转型交织推动&#xff0c;传统线下选课方式…

作者头像 李华
网站建设 2026/4/22 22:41:21

客服接待功能

功能提示&#xff1a; 客服接待可通过PC端和手机移动端PC端登录PC端浏览器输入 &#xff1a; 您的域名/kefu 即可进入客服登录页面。&#xff08;1&#xff09;账号密码登录&#xff08;2&#xff09;扫码登录也可通过系统后台&#xff0c;客服管理 > 客服列表 > 进入工作…

作者头像 李华
网站建设 2026/4/23 8:36:16

西门子200smart模拟量滤波防抖程序:让信号采集更稳更准

西门子200smart模拟量滤波防抖程序&#xff0c;能实现电流电压和热电阻模拟量信号的采集&#xff0c;有滤波&#xff0c;有高位和低位报警&#xff0c;采用for循环指令和间接寻址&#xff0c;让程序简单好用&#xff0c;并且针对程序&#xff0c;录制了视频讲解&#xff0c;详细…

作者头像 李华
网站建设 2026/4/23 8:36:48

GIF裁剪后画面变形?教你3种精准裁剪动图尺寸的方法

制作表情包、编辑聊天动图、发布社交媒体内容时&#xff0c;裁剪GIF动画画面尺寸是高频需求。但很多人遇到这些问题&#xff1a;裁剪后动图播放卡顿、帧序错乱、关键元素被裁掉&#xff0c;或上传后被强制拉伸变形&#xff0c;尤其是制作微信表情时&#xff0c;没按1:1方形比例…

作者头像 李华
网站建设 2026/4/23 8:33:52

国标GB28181算法算力平台EasyGBS助力工业生产全流程可视化与智能化监管

一、引言 在制造业数字化转型进入深水区的当下&#xff0c;生产全流程的透明化管控与精细化运营已成为企业降本增效、保障安全的核心诉求。企业想要在行业中占据优势&#xff0c;实现生产流程的高效管理与精准把控成为关键。国标GB28181算法算力平台EasyGBS凭借其全协议兼容、…

作者头像 李华
网站建设 2026/4/23 8:35:43

网络安全到底是什么?一篇概念详解(附学习资料)

一、什么是网络安全&#xff1f; “网络安全是指网络系统的硬件、软件及其系统中的数据受到保护&#xff0c;不因偶然的或者恶意的原因而遭受到破坏、更改、泄露、系统连续可靠正常地运行&#xff0c;网络服务不中断。” 说白了网络安全就是维护网络系统上的信息安全。 信息…

作者头像 李华