news 2026/5/9 10:32:06

Go语言消息队列事务:Exactly-Once与At-Least-Once语义

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Go语言消息队列事务:Exactly-Once与At-Least-Once语义

Go语言消息队列事务:Exactly-Once与At-Least-Once语义

1. 消息语义

消息队列有三种传递语义:At-Most-Once(最多一次)、At-Least-Once(至少一次)和Exactly-Once(恰好一次)。

type DeliverySemantics int const ( AtMostOnce DeliverySemantics = 1 AtLeastOnce DeliverySemantics = 2 ExactlyOnce DeliverySemantics = 3 )

2. At-Least-Once实现

At-Least-Once需要消费者进行消息确认。

type AtLeastOnceConsumer struct { consumer *RabbitMQConsumer processed map[string]bool mu sync.RWMutex } func NewAtLeastOnceConsumer(consumer *RabbitMQConsumer) *AtLeastOnceConsumer { return &AtLeastOnceConsumer{ consumer: consumer, processed: make(map[string]bool), } } func (c *AtLeastOnceConsumer) Process(ctx context.Context, handler func([]byte) error) error { return c.consumer.ConsumeWithContext(ctx, func(msg []byte) error { msgID := generateMsgID(msg) c.mu.RLock() if c.processed[msgID] { c.mu.RUnlock() return nil } c.mu.RUnlock() if err := handler(msg); err != nil { return err } c.mu.Lock() c.processed[msgID] = true c.mu.Unlock() return nil }) } func generateMsgID(msg []byte) string { h := fnv.New32a() h.Write(msg) return fmt.Sprintf("%d-%d", time.Now().UnixNano(), h.Sum32()) }

3. Exactly-Once实现

Exactly-Once需要生产者和消费者配合,通过事务和幂等性保证。

type ExactlyOnceProducer struct { producer *RabbitMQProducer txn *amqp.Tx } func (p *ExactlyOnceProducer) PublishWithTransaction(ctx context.Context, routingKey string, body []byte) error { if p.txn == nil { tx, err := p.producer.conn.Channel().Tx() if err != nil { return err } p.txn = tx } err := p.producer.conn.Channel().PublishWithContext( ctx, p.exchange, routingKey, false, false, amqp.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp.Persistent, }, ) if err != nil { p.txn.Rollback() return err } return p.txn.Commit() } type IdempotentHandler struct { storage map[string]bool mu sync.RWMutex } func NewIdempotentHandler() *IdempotentHandler { return &IdempotentHandler{ storage: make(map[string]bool), } } func (h *IdempotentHandler) Handle(msgID string, handler func() error) error { h.mu.RLock() if h.storage[msgID] { h.mu.RUnlock() return nil } h.mu.RUnlock() if err := handler(); err != nil { return err } h.mu.Lock() h.storage[msgID] = true h.mu.Unlock() return nil }

4. 总结

本文介绍了At-Least-Once和Exactly-Once消息语义的实现方法,开发者应根据业务需求选择合适的传递语义。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/9 10:32:05

DownKyi:3步掌握B站视频批量下载与专业处理的完整方案

DownKyi:3步掌握B站视频批量下载与专业处理的完整方案 【免费下载链接】downkyi 哔哩下载姬downkyi,哔哩哔哩网站视频下载工具,支持批量下载,支持8K、HDR、杜比视界,提供工具箱(音视频提取、去水印等&#…

作者头像 李华
网站建设 2026/5/9 10:31:05

云原生应用开发最佳实践:构建现代化云原生应用

云原生应用开发最佳实践:构建现代化云原生应用 一、引言 在云原生时代,传统的单体应用架构已经无法满足快速迭代、弹性伸缩和高可用的需求。云原生应用开发强调容器化、微服务、持续交付和自动化运维,是构建现代化应用的关键方法论。 二、云原…

作者头像 李华
网站建设 2026/5/9 10:30:33

无需复杂配置 OpenClaw 2.7.1 版本 Windows 一键部署方法

前言 本地 AI 智能体技术不断成熟,私有化部署、数据安全可控、低门槛快速落地,已经成为用户选择方案的重要标准。开源轻量化 AI 智能体OpenClaw 2.7.1 版本完成全面升级,在环境适配、服务稳定性与模型集成度上均有明显提升,支持 …

作者头像 李华
网站建设 2026/5/9 10:28:41

golang如何实现消息防重复发送_golang消息防重复发送实现教程

Kafka幂等生产者是最省心的防重方案,需开启idempotence并满足单生产者单分区条件;语义去重则需应用层指纹Redis原子校验。用 Kafka 幂等生产者是最省心的防重发送方案Kafka 0.11.0 原生支持幂等性,只要开启配置,就能从协议层杜绝「…

作者头像 李华
网站建设 2026/5/9 10:26:10

LLM4RS开源项目:用ChatGPT做推荐系统排序任务的评估框架与实践指南

1. 项目概述:当大语言模型遇上推荐系统最近几年,大语言模型(LLM)的能力边界一直是业界探索的热点。从写诗作画到代码生成,大家似乎都在好奇:它还能做什么?作为一个长期混迹在推荐系统领域的老兵…

作者头像 李华
网站建设 2026/5/9 10:26:09

Godot 4 Importality插件:实现Blender文件直接导入,革新3D资产工作流

1. 项目概述与核心价值最近在Godot社区里,一个名为nklbdev/godot-4-importality的项目引起了我的注意。乍一看这个标题,你可能和我最初一样有点摸不着头脑——“Importality”是什么?但当你点开仓库,看到它的描述“A Godot 4 plug…

作者头像 李华