news 2026/6/10 19:47:31

从原理到落地:本地消息表 + RocketMQ 分布式事务方案

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从原理到落地:本地消息表 + RocketMQ 分布式事务方案

如今的微服务架构中,分布式事务是保证跨服务数据一致性的核心难题。

本文采用本地消息表与RocketMQ实现最终一致性分布式事务,解决电商下单场景中,订单创建库存扣减两个不同服务分布式调用时,保证要么全部成功,要么全部失败的问题,全程不依赖复杂中间件,架构简单、生产可用,实现业务落地,其他的业务可以参考。


一、什么是本地消息表模式?

本地消息表(Local Message Table)是实现分布式事务的经典模式之一,也被称为事务消息模式,其核心思想可以概括为一句话:

用本地事务保证“业务操作 + 消息记录”的原子性,再通过消息重试机制实现跨服务的最终一致性。

1. 核心流程拆解

我们以“订单创建 + 库存扣减”场景为例,完整流程分为以下几步:

  1. 订单服务开启本地事务:同时执行两个操作
    • 业务操作:创建订单,订单状态标记为“待确认”;
    • 消息操作:向本地消息表插入一条待发送的消息,记录库存扣减的关键信息,消息状态标记为“待发送”。
  2. 本地事务提交:订单创建和消息插入同时成功或同时失败,确保订单存在时,对应的消息一定存在。
  3. 事务提交后发送消息:订单服务向消息队列(如Kafka、RocketMQ)发送库存扣减消息,发送成功后更新本地消息表状态为“已发送”。
  4. 库存服务消费消息:库存服务监听消息队列,收到消息后执行库存扣减操作,并向订单服务发送处理结果(成功/失败)。
  5. 结果反馈与状态更新:订单服务收到库存扣减结果,更新订单状态和本地消息状态;若库存扣减失败,可触发订单回滚或人工补偿流程。
  6. 定时任务兜底重试:启动定时任务,定期扫描本地消息表中“待发送”或“发送失败”的消息,重新投递,避免因网络波动导致的消息丢失。

2. 模式核心优势

  • 无强依赖:不依赖复杂的分布式事务中间件(如Seata、XA),基于关系型数据库和消息队列即可实现,架构简单易落地;
  • 最终一致性保障:通过本地事务 + 消息重试 + 结果反馈的闭环,确保跨服务数据最终一致;
  • 高可用:即使消息队列短暂宕机、网络分区,定时任务兜底机制也能保证消息最终被处理;
  • 解耦性强:订单服务与库存服务通过消息异步通信,无需直接耦合调用,提升系统扩展性。

二、相关表

1. 订单表

CREATETABLE`t_order`(`id`bigintNOTNULLAUTO_INCREMENT,`order_no`varchar(64)NOTNULLCOMMENT'订单编号',`user_id`bigintNOTNULL,`product_id`bigintNOTNULL,`quantity`intNOTNULL,`amount`decimal(10,2)NOTNULL,`status`tinyintNOTNULLDEFAULT0COMMENT'0待确认 1已完成 2已取消',`create_time`datetimeDEFAULTCURRENT_TIMESTAMP,`update_time`datetimeDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,PRIMARYKEY(`id`),UNIQUEKEY`uk_order_no`(`order_no`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='订单表';

2. 本地消息表

CREATETABLE`local_message`(`id`bigintNOTNULLAUTO_INCREMENT,`message_id`varchar(64)NOTNULLCOMMENT'唯一消息ID',`content`textNOTNULLCOMMENT'消息内容JSON',`topic`varchar(64)NOTNULLCOMMENT'RocketMQ主题',`status`tinyintNOTNULLDEFAULT0COMMENT'0待发送 1已发送 2发送失败 3处理成功 4处理失败',`retry_count`intDEFAULT0,`next_retry_time`datetimeNOTNULL,`create_time`datetimeDEFAULTCURRENT_TIMESTAMP,`update_time`datetimeDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,PRIMARYKEY(`id`),UNIQUEKEY`uk_message_id`(`message_id`),KEY`idx_status_retry`(`status`,`next_retry_time`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='本地消息表';

三、代码实现

1. 订单服务:创建订单 + 保存消息(事务保证原子性)

@Service@Transactional(rollbackFor=Exception.class)publicclassOrderServiceImplimplementsOrderService{@AutowiredprivateOrderMapperorderMapper;@AutowiredprivateLocalMessageMapperlocalMessageMapper;@AutowiredprivateRocketMQTemplaterocketMQTemplate;privatestaticfinalStringTOPIC="inventory-deduct-topic";@OverridepublicvoidcreateOrder(OrderCreateDTOdto){// 1. 创建订单Orderorder=buildOrder(dto);orderMapper.insert(order);// 2. 构造库存扣减消息InventoryMessagemessage=newInventoryMessage();message.setOrderId(order.getId());message.setOrderNo(order.getOrderNo());message.setProductId(dto.getProductId());message.setQuantity(dto.getQuantity());// 3. 插入本地消息(事务内)LocalMessagelocalMessage=buildLocalMessage(message);localMessageMapper.insert(localMessage);// 4. 事务提交后异步发送消息StringmessageId=localMessage.getMessageId();TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCommit(){sendMessageAfterCommit(messageId,message);}});}}

2. 发送消息到 RocketMQ

@AsyncpublicvoidsendMessageAfterCommit(StringmessageId,InventoryMessagemessage){try{rocketMQTemplate.syncSend(TOPIC,MessageBuilder.withPayload(message).setHeader("messageId",messageId).build());// 更新消息状态为已发送localMessageMapper.updateStatus(messageId,1);}catch(Exceptione){// 发送失败,等待定时任务重试localMessageMapper.updateStatus(messageId,2);}}

3. 库存服务:消费 RocketMQ 消息 + 扣库存(保证幂等)

@Service@RocketMQMessageListener(topic="inventory-deduct-topic",consumerGroup="inventory-group")publicclassInventoryConsumerimplementsRocketMQListener<InventoryMessage>{@AutowiredprivateInventoryServiceinventoryService;@OverridepublicvoidonMessage(InventoryMessagemessage){// 幂等判断:根据 messageId / orderId 避免重复扣减if(inventoryService.isProcessed(message.getMessageId())){return;}// 扣减库存inventoryService.deductStock(message.getProductId(),message.getQuantity());// 标记已处理inventoryService.markProcessed(message.getMessageId());}}

4. 定时任务:兜底重试失败消息

@Component@Scheduled(cron="0 */2 * * * ?")publicclassMessageRetryTask{@AutowiredprivateLocalMessageMapperlocalMessageMapper;@AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidretrySend(){// 查询待重试消息List<LocalMessage>list=localMessageMapper.selectRetryMessages();for(LocalMessagemsg:list){try{rocketMQTemplate.syncSend(msg.getTopic(),msg.getContent());localMessageMapper.updateStatus(msg.getMessageId(),1);}catch(Exceptione){localMessageMapper.increaseRetry(msg.getMessageId());}}}}

四、生产避坑

1. 幂等性问题:如何避免消息重复处理?

本地消息表模式中,消息重试、网络波动都可能导致库存服务重复收到同一条消息,必须实现幂等性,保证一条消息无论被消费多少次,结果都只生效一次:

  • 方案1:基于 messageId 做幂等:库存服务处理消息前,先根据messageId查询处理状态,已处理过的消息直接返回成功;
  • 方案2:基于业务主键做幂等:库存扣减时,根据orderId判断是否已处理过该订单的扣减请求;
  • 方案3:使用分布式锁:处理消息时加分布式锁,保证同一消息同一时间只有一个线程在处理。

示例代码(库存服务幂等处理):

publicbooleandeductStock(LongproductId,intquantity,LongorderId){// 1. 幂等校验:查询该订单是否已扣减过库存if(inventoryDeductRecordMapper.existsByOrderId(orderId)){log.info("订单{}已扣减过库存,无需重复处理",orderId);returntrue;}// 2. 扣减库存(乐观锁实现)intupdateCount=inventoryMapper.deductStockWithLock(productId,quantity);if(updateCount==0){log.warn("商品{}库存不足,扣减失败",productId);returnfalse;}// 3. 记录扣减记录,作为幂等标识InventoryDeductRecordrecord=newInventoryDeductRecord();record.setOrderId(orderId);record.setProductId(productId);record.setQuantity(quantity);record.setCreateTime(newDate());inventoryDeductRecordMapper.insert(record);returntrue;}

2. 消息丢失问题:如何保证消息不丢?

基于RocketMQ的分布式事务消息架构,想要做到零消息丢失,需要从三端同时保证:

  • 订单服务端:业务数据 + 本地消息在同一个本地事务中,确保消息一定落地;即使发送失败,定时任务也会持续重试,保证消息最终发出。
  • RocketMQ 服务端:使用同步刷盘 + 主从同步,消息必须落盘并同步到从节点才算发送成功,避免 Broker 宕机丢消息。
  • 消费端:使用RocketMQ 集群消费模式 + 手动 ACK,只有业务真正执行成功后才返回 CONSUME_SUCCESS,消费异常时返回 RECONSUME_LATER,让消息重新进入队列重试,绝对不会丢消息。

3. 消息堆积问题:如何处理大量未处理消息?

在高并发场景下,若库存服务处理速度跟不上发送速度,会出现RocketMQ 消息堆积,可通过以下方案解决:

  • 提高消费并行度:调整消费者线程数consumeThreadMin/consumeThreadMax,提升消费速度;
  • 重试策略优化:使用指数退避重试,避免消息频繁重试压垮服务;
  • 死信队列处理:消息重试超过 16 次(RocketMQ 默认最大重试次数)后自动进入死信队列,不再影响正常消息,由人工/补偿系统处理;
  • 优化消费逻辑:将耗时长的逻辑异步化,避免阻塞消费线程。

4. 性能瓶颈优化

基于本地消息表 + RocketMQ的架构,可从以下方向做生产级性能优化:

  • 本地消息表索引优化:为statusnext_retry_time建立联合索引,大幅提升定时任务扫描效率;
  • 消息批量消费:开启 RocketMQ 批量消费,减少IO与数据库交互次数;
  • 异步发送消息:订单创建后使用异步线程发送,不阻塞主线程,提升接口吞吐量;
  • 消息发送超时控制:设置合理的sendMsgTimeout,避免发送阻塞导致业务线程等待。

五、模式对比与适用场景

1. 本地消息表 vs 其他分布式事务方案

方案优点缺点适用场景
本地消息表实现简单、无强依赖、高可用依赖定时任务兜底、存在最终一致性延迟对实时性要求不高、对一致性要求较高的业务
Seata AT模式无侵入、性能好依赖Seata Server、存在全局锁性能瓶颈中低并发、对一致性要求高的业务
TCC模式性能高、一致性强侵入性强、开发成本高高并发、核心交易业务
XA模式强一致性性能差、阻塞时间长低并发、强一致性要求的场景

2. 适用场景

本地消息表模式特别适合以下场景:

  • 电商下单、订单支付、库存扣减等异步化业务场景;
  • 对实时性要求不高(秒级延迟可接受),但对数据一致性要求较高的业务;
  • 不想引入复杂分布式事务中间件,希望基于现有技术栈实现一致性的场景。

六、总结与扩展

本文基于本地消息表模式,完整实现了分布式事务解决方案,核心思路可以总结为:

  1. 本地事务原子性:订单创建和消息记录在同一事务中,确保“订单存在则消息必存在”;
  2. 消息异步通信:通过消息队列实现跨服务解耦,库存服务异步处理扣减操作;
  3. 重试与结果反馈:定时任务兜底重试 + 库存处理结果反馈,确保消息最终被处理;
  4. 幂等性保障:通过消息ID或业务主键实现幂等,避免重复处理导致的数据问题。



← 上一篇 别再愁Java项目没亮点!普通 CRUD 项目其实也能征服面试官!!!
记得点赞、关注、收藏哦!
下一篇 JUC小册——公平锁和非公平锁 →
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 19:43:34

# 高并发核心系统中分布式事务一致性架构演进实践

# 高并发核心系统中分布式事务一致性架构演进实践企业核心系统一旦进入多应用协同阶段&#xff0c;分布式事务就不再只是“数据库提交失败”的问题&#xff0c;而是订单、库存、质检、财务、审计等跨域状态如何在高并发下保持可追溯、可恢复、可观测。以[北京米德兰科技有限公司…

作者头像 李华
网站建设 2026/6/10 19:32:30

MySQL 8.0实战:一条SQL搞定用户签到统计(INSERT ... ON DUPLICATE KEY UPDATE详解)

MySQL 8.0实战&#xff1a;高效处理用户签到系统的原子化更新策略在用户行为跟踪系统中&#xff0c;签到功能看似简单却暗藏玄机。想象一个电商平台需要同时记录用户的首次签到日期、最近签到时间和连续签到天数——传统方案需要先查询后判断再操作&#xff0c;不仅代码臃肿&am…

作者头像 李华
网站建设 2026/6/10 19:22:05

微信小程序活动座位可视化选座源码,带用户管理与实时状态更新

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;直接可用的微信小程序选座系统源码&#xff0c;支持活动座位图动态展示、用户点击选座、已选/可选/不可用状态实时刷新、选座后自动生成订单&#xff0c;并完成用户信息登记与管理。项目包含完整页面结构&#…

作者头像 李华
网站建设 2026/6/10 19:17:01

话题锚定:信息时代最被忽视的阅读元能力

1. 这不是语法题&#xff0c;而是一场阅读理解的底层能力重建“What is the article’s topic means?”——看到这个句子&#xff0c;很多人的第一反应是&#xff1a;这明显有语病&#xff0c;应该是 “What does the article’s topic mean?” 才对。但我要说&#xff0c;停…

作者头像 李华