如今的微服务架构中,分布式事务是保证跨服务数据一致性的核心难题。
本文采用本地消息表与RocketMQ实现最终一致性分布式事务,解决电商下单场景中,订单创建和库存扣减两个不同服务分布式调用时,保证要么全部成功,要么全部失败的问题,全程不依赖复杂中间件,架构简单、生产可用,实现业务落地,其他的业务可以参考。
一、什么是本地消息表模式?
本地消息表(Local Message Table)是实现分布式事务的经典模式之一,也被称为事务消息模式,其核心思想可以概括为一句话:
用本地事务保证“业务操作 + 消息记录”的原子性,再通过消息重试机制实现跨服务的最终一致性。
1. 核心流程拆解
我们以“订单创建 + 库存扣减”场景为例,完整流程分为以下几步:
- 订单服务开启本地事务:同时执行两个操作
- 业务操作:创建订单,订单状态标记为“待确认”;
- 消息操作:向本地消息表插入一条待发送的消息,记录库存扣减的关键信息,消息状态标记为“待发送”。
- 本地事务提交:订单创建和消息插入同时成功或同时失败,确保订单存在时,对应的消息一定存在。
- 事务提交后发送消息:订单服务向消息队列(如Kafka、RocketMQ)发送库存扣减消息,发送成功后更新本地消息表状态为“已发送”。
- 库存服务消费消息:库存服务监听消息队列,收到消息后执行库存扣减操作,并向订单服务发送处理结果(成功/失败)。
- 结果反馈与状态更新:订单服务收到库存扣减结果,更新订单状态和本地消息状态;若库存扣减失败,可触发订单回滚或人工补偿流程。
- 定时任务兜底重试:启动定时任务,定期扫描本地消息表中“待发送”或“发送失败”的消息,重新投递,避免因网络波动导致的消息丢失。
2. 模式核心优势
- 无强依赖:不依赖复杂的分布式事务中间件(如Seata、XA),基于关系型数据库和消息队列即可实现,架构简单易落地;
- 最终一致性保障:通过本地事务 + 消息重试 + 结果反馈的闭环,确保跨服务数据最终一致;
- 高可用:即使消息队列短暂宕机、网络分区,定时任务兜底机制也能保证消息最终被处理;
- 解耦性强:订单服务与库存服务通过消息异步通信,无需直接耦合调用,提升系统扩展性。
二、相关表
1. 订单表
CREATETABLE`t_order`(`id`bigintNOTNULLAUTO_INCREMENT,`order_no`varchar(64)NOTNULLCOMMENT'订单编号',`user_id`bigintNOTNULL,`product_id`bigintNOTNULL,`quantity`intNOTNULL,`amount`decimal(10,2)NOTNULL,`status`tinyintNOTNULLDEFAULT0COMMENT'0待确认 1已完成 2已取消',`create_time`datetimeDEFAULTCURRENT_TIMESTAMP,`update_time`datetimeDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,PRIMARYKEY(`id`),UNIQUEKEY`uk_order_no`(`order_no`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='订单表';2. 本地消息表
CREATETABLE`local_message`(`id`bigintNOTNULLAUTO_INCREMENT,`message_id`varchar(64)NOTNULLCOMMENT'唯一消息ID',`content`textNOTNULLCOMMENT'消息内容JSON',`topic`varchar(64)NOTNULLCOMMENT'RocketMQ主题',`status`tinyintNOTNULLDEFAULT0COMMENT'0待发送 1已发送 2发送失败 3处理成功 4处理失败',`retry_count`intDEFAULT0,`next_retry_time`datetimeNOTNULL,`create_time`datetimeDEFAULTCURRENT_TIMESTAMP,`update_time`datetimeDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,PRIMARYKEY(`id`),UNIQUEKEY`uk_message_id`(`message_id`),KEY`idx_status_retry`(`status`,`next_retry_time`))ENGINE=InnoDBDEFAULTCHARSET=utf8mb4COMMENT='本地消息表';三、代码实现
1. 订单服务:创建订单 + 保存消息(事务保证原子性)
@Service@Transactional(rollbackFor=Exception.class)publicclassOrderServiceImplimplementsOrderService{@AutowiredprivateOrderMapperorderMapper;@AutowiredprivateLocalMessageMapperlocalMessageMapper;@AutowiredprivateRocketMQTemplaterocketMQTemplate;privatestaticfinalStringTOPIC="inventory-deduct-topic";@OverridepublicvoidcreateOrder(OrderCreateDTOdto){// 1. 创建订单Orderorder=buildOrder(dto);orderMapper.insert(order);// 2. 构造库存扣减消息InventoryMessagemessage=newInventoryMessage();message.setOrderId(order.getId());message.setOrderNo(order.getOrderNo());message.setProductId(dto.getProductId());message.setQuantity(dto.getQuantity());// 3. 插入本地消息(事务内)LocalMessagelocalMessage=buildLocalMessage(message);localMessageMapper.insert(localMessage);// 4. 事务提交后异步发送消息StringmessageId=localMessage.getMessageId();TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronization(){@OverridepublicvoidafterCommit(){sendMessageAfterCommit(messageId,message);}});}}2. 发送消息到 RocketMQ
@AsyncpublicvoidsendMessageAfterCommit(StringmessageId,InventoryMessagemessage){try{rocketMQTemplate.syncSend(TOPIC,MessageBuilder.withPayload(message).setHeader("messageId",messageId).build());// 更新消息状态为已发送localMessageMapper.updateStatus(messageId,1);}catch(Exceptione){// 发送失败,等待定时任务重试localMessageMapper.updateStatus(messageId,2);}}3. 库存服务:消费 RocketMQ 消息 + 扣库存(保证幂等)
@Service@RocketMQMessageListener(topic="inventory-deduct-topic",consumerGroup="inventory-group")publicclassInventoryConsumerimplementsRocketMQListener<InventoryMessage>{@AutowiredprivateInventoryServiceinventoryService;@OverridepublicvoidonMessage(InventoryMessagemessage){// 幂等判断:根据 messageId / orderId 避免重复扣减if(inventoryService.isProcessed(message.getMessageId())){return;}// 扣减库存inventoryService.deductStock(message.getProductId(),message.getQuantity());// 标记已处理inventoryService.markProcessed(message.getMessageId());}}4. 定时任务:兜底重试失败消息
@Component@Scheduled(cron="0 */2 * * * ?")publicclassMessageRetryTask{@AutowiredprivateLocalMessageMapperlocalMessageMapper;@AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidretrySend(){// 查询待重试消息List<LocalMessage>list=localMessageMapper.selectRetryMessages();for(LocalMessagemsg:list){try{rocketMQTemplate.syncSend(msg.getTopic(),msg.getContent());localMessageMapper.updateStatus(msg.getMessageId(),1);}catch(Exceptione){localMessageMapper.increaseRetry(msg.getMessageId());}}}}四、生产避坑
1. 幂等性问题:如何避免消息重复处理?
本地消息表模式中,消息重试、网络波动都可能导致库存服务重复收到同一条消息,必须实现幂等性,保证一条消息无论被消费多少次,结果都只生效一次:
- 方案1:基于 messageId 做幂等:库存服务处理消息前,先根据
messageId查询处理状态,已处理过的消息直接返回成功; - 方案2:基于业务主键做幂等:库存扣减时,根据
orderId判断是否已处理过该订单的扣减请求; - 方案3:使用分布式锁:处理消息时加分布式锁,保证同一消息同一时间只有一个线程在处理。
示例代码(库存服务幂等处理):
publicbooleandeductStock(LongproductId,intquantity,LongorderId){// 1. 幂等校验:查询该订单是否已扣减过库存if(inventoryDeductRecordMapper.existsByOrderId(orderId)){log.info("订单{}已扣减过库存,无需重复处理",orderId);returntrue;}// 2. 扣减库存(乐观锁实现)intupdateCount=inventoryMapper.deductStockWithLock(productId,quantity);if(updateCount==0){log.warn("商品{}库存不足,扣减失败",productId);returnfalse;}// 3. 记录扣减记录,作为幂等标识InventoryDeductRecordrecord=newInventoryDeductRecord();record.setOrderId(orderId);record.setProductId(productId);record.setQuantity(quantity);record.setCreateTime(newDate());inventoryDeductRecordMapper.insert(record);returntrue;}2. 消息丢失问题:如何保证消息不丢?
基于RocketMQ的分布式事务消息架构,想要做到零消息丢失,需要从三端同时保证:
- 订单服务端:业务数据 + 本地消息在同一个本地事务中,确保消息一定落地;即使发送失败,定时任务也会持续重试,保证消息最终发出。
- RocketMQ 服务端:使用同步刷盘 + 主从同步,消息必须落盘并同步到从节点才算发送成功,避免 Broker 宕机丢消息。
- 消费端:使用RocketMQ 集群消费模式 + 手动 ACK,只有业务真正执行成功后才返回 CONSUME_SUCCESS,消费异常时返回 RECONSUME_LATER,让消息重新进入队列重试,绝对不会丢消息。
3. 消息堆积问题:如何处理大量未处理消息?
在高并发场景下,若库存服务处理速度跟不上发送速度,会出现RocketMQ 消息堆积,可通过以下方案解决:
- 提高消费并行度:调整消费者线程数
consumeThreadMin/consumeThreadMax,提升消费速度; - 重试策略优化:使用指数退避重试,避免消息频繁重试压垮服务;
- 死信队列处理:消息重试超过 16 次(RocketMQ 默认最大重试次数)后自动进入死信队列,不再影响正常消息,由人工/补偿系统处理;
- 优化消费逻辑:将耗时长的逻辑异步化,避免阻塞消费线程。
4. 性能瓶颈优化
基于本地消息表 + RocketMQ的架构,可从以下方向做生产级性能优化:
- 本地消息表索引优化:为
status和next_retry_time建立联合索引,大幅提升定时任务扫描效率; - 消息批量消费:开启 RocketMQ 批量消费,减少IO与数据库交互次数;
- 异步发送消息:订单创建后使用异步线程发送,不阻塞主线程,提升接口吞吐量;
- 消息发送超时控制:设置合理的
sendMsgTimeout,避免发送阻塞导致业务线程等待。
五、模式对比与适用场景
1. 本地消息表 vs 其他分布式事务方案
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 本地消息表 | 实现简单、无强依赖、高可用 | 依赖定时任务兜底、存在最终一致性延迟 | 对实时性要求不高、对一致性要求较高的业务 |
| Seata AT模式 | 无侵入、性能好 | 依赖Seata Server、存在全局锁性能瓶颈 | 中低并发、对一致性要求高的业务 |
| TCC模式 | 性能高、一致性强 | 侵入性强、开发成本高 | 高并发、核心交易业务 |
| XA模式 | 强一致性 | 性能差、阻塞时间长 | 低并发、强一致性要求的场景 |
2. 适用场景
本地消息表模式特别适合以下场景:
- 电商下单、订单支付、库存扣减等异步化业务场景;
- 对实时性要求不高(秒级延迟可接受),但对数据一致性要求较高的业务;
- 不想引入复杂分布式事务中间件,希望基于现有技术栈实现一致性的场景。
六、总结与扩展
本文基于本地消息表模式,完整实现了分布式事务解决方案,核心思路可以总结为:
- 本地事务原子性:订单创建和消息记录在同一事务中,确保“订单存在则消息必存在”;
- 消息异步通信:通过消息队列实现跨服务解耦,库存服务异步处理扣减操作;
- 重试与结果反馈:定时任务兜底重试 + 库存处理结果反馈,确保消息最终被处理;
- 幂等性保障:通过消息ID或业务主键实现幂等,避免重复处理导致的数据问题。
| ← 上一篇 别再愁Java项目没亮点!普通 CRUD 项目其实也能征服面试官!!! | 记得点赞、关注、收藏哦! | 下一篇 JUC小册——公平锁和非公平锁 → |