简简单单 Online zuozuo :本心、输入输出、结果
文章目录
- TBMQ 如何用 Redis 实现持久化消息存储
- 前言
- 1、为什么选择 Redis?
- 2、迁移到 Redis
- 3、Redis 集群约束与 Lua 脚本原子操作
- 4、选择合适的 Redis 数据结构
- 5、动态管理有序集合大小
- 6、消息检索与清理
- 7、从 Jedis 迁移到 Lettuce
- 8、总结
TBMQ 如何用 Redis 实现持久化消息存储
编辑 | 简简单单 Online zuozuo
地址 | https://blog.csdn.net/qq_15071263
如果觉得本文对你有帮助,欢迎关注、点赞、收藏、评论,谢谢
前言
TBMQ(ThingsBoard 开源 MQTT broker)主要设计用于从物联网设备聚合数据并可靠地投递到后端应用。应用会订阅来自数万甚至数十万设备的数据并消费。为应对不同场景,TBMQ 将 MQTT 客户端分为应用客户端与标准物联网设备:应用客户端始终持久化,依赖 Kafka 做会话持久化与消息投递;设备端(DEVICE)则使用另一套持久化方案。本文从技术角度介绍 TBMQ 如何用 Redis 管理 DEVICE 客户端的持久化 MQTT 会话,旨在为正在设计可扩展、会话感知系统的软件工程师提供可落地的参考。
#TBMQ #Redis #MQTT #持久化会话 #物联网 #消息存储 #Lua脚本 #ThingsBoard
1、为什么选择 Redis?
在 TBMQ 1.x 中,DEVICE 客户端的消息持久化与拉取依赖 PostgreSQL,以保证客户端重连时仍能收到未投递的消息。PostgreSQL 在初期表现尚可,但随着持久化 MQTT 会话数量增长,我们预判其架构会成为瓶颈;若想深入了解当时基于 PostgreSQL 的用法与架构取舍,可参考 TBMQ 官方博客中的相关文章。为应对这一问题,我们评估了能随负载更好扩展的替代方案,并很快选定 Redis:其水平扩展能力、原生集群支持与高性能特性更适合高吞吐、会话型工作负载。
2、迁移到 Redis
确定方向后,我们启动了迁移:先评估能在保持 PostgreSQL 方案功能的前提下、又符合 Redis Cluster 约束的数据结构。迁移过程既要保留原有语义(消息顺序、重连后可检索未投递消息等),又要利用 Redis 集群的分片与高并发能力,因此数据模型与键的设计需要重新梳理。
3、Redis 集群约束与 Lua 脚本原子操作
迁移时我们意识到:要高效实现消息持久化与顺序,需要多种 Redis 数据结构配合,而这会带来多键操作与 Redis Cluster 的槽位约束问题。Redis Cluster 要求相关键落在同一槽位,通常通过 hash tag(将键中{xxx}部分用于计算槽位)保证同一客户端的数据在同一节点,从而避免跨槽错误。但在高吞吐场景下,同一 MQTT 客户端可能并发收到大量消息,仅靠 hash tag 不够,还必须保证多步操作的原子性。Redis 单条命令是原子的,而我们则需要对每个 MQTT 客户端的多种结构在一次逻辑操作中一起更新,若顺序执行多条命令,会出现竞态与不一致。因此我们决定:凡是涉及保存消息或重连时拉取未投递消息等操作,都通过独立的 Lua 脚本执行,从而在一次脚本执行中完成所有相关读写,保证一致性。
4、选择合适的 Redis 数据结构
持久化会话的一个核心需求是:在客户端多次断线重连后,仍能按顺序投递消息。在对比多种 Redis 结构后,我们选用有序集合(sorted set)来维护消息引用顺序:有序集合按 score 天然有序,便于按升序或降序快速取出一批消息。消息体本身则用字符串存储,通过 SET 命令并设置过期时间(EX),实现 O(1) 写入与 TTL 管理;取回或删除消息体也是 O(1),且不影响有序集合结构。键的命名采用 hash tag 形式,例如{client_id}_messages,其中{client_id}为持久化 MQTT 客户端的唯一 ID 作为 hash tag,_messages为固定后缀。MQTT 报文 ID 范围为 0–65535,会回绕;我们使用单调递增的 score 表示“逻辑顺序”,这样即使 packet ID 从 65535 回到 1,score 仍继续递增(例如下一跳为 65536),从而在有序集合中始终保持正确顺序。此外,我们用一条字符串键保存“已处理的最大 MQTT packet ID”,与 PostgreSQL 方案中的用途一致:客户端重连时,服务端据此确定下一个要保存到 Redis 的消息应使用的 packet ID。
消息载荷的写入与 TTL 示例(概念上)如下:
SET key EX ttl_seconds payload按需获取或删除载荷:
GET key DEL key存储“最后处理的 packet ID”的字符串键可类比为:
SET {client_id}_last_packet_id value5、动态管理有序集合大小
在采用有序集合 + 字符串、并为每条消息设置 TTL 后,我们不再依赖按时间周期做批量清理;同时参考 PostgreSQL 时代的策略,对每个持久化客户端限制最大保留消息数,以控制和预测单客户端内存占用。为贴合 MQTT 协议本身的限制,单客户端持久化消息数上限设为 65535。在 Redis 侧我们实现了有序集合大小的动态管理:每次新消息写入后,对有序集合做裁剪,使消息总数不超过该上限,从而在保证顺序与 TTL 的前提下控制内存与行为。
6、消息检索与清理
设计不仅在新消息持久化时做动态大小管理,还在“消息检索”时做清理——即设备重连并拉取未投递消息时,在 Lua 脚本中一并完成已投递或过期消息的删除与有序集合的维护。这样,通过 Redis 有序集合与字符串配合,再加上 Lua 脚本的原子性,我们实现了高效的消息持久化、按序检索以及写入与检索时的动态清理,避免无序膨胀。
7、从 Jedis 迁移到 Lettuce
为验证基于 Redis 的持久化消息架构的可扩展性,我们选择了点对点(P2P)MQTT 通信模式做性能测试。在大规模测试前,先用 PostgreSQL 做持久化的原型测试,其吞吐约在 30k msg/s 达到上限;迁到 Redis 后,我们发现 Jedis 成为新的瓶颈。Jedis 稳定但为同步客户端,每条 Redis 命令顺序执行,在高并发下会限制 Netty 等异步 I/O 的利用率。RedisInsight 显示单节点约 66k 命令/秒,与 TBMQ 约 40k msg/s 对应(因每条消息会触发多次 Redis 操作)。为解决这一问题,我们改用基于 Netty 的异步 Redis 客户端 Lettuce。迁移后吞吐提升到约 60k msg/s,单节点命令数约 100k/s,与预期一致;Lettuce 支持多命令并行发送与处理,能更好发挥 Redis 的并发能力,最终达到了我们期望的性能提升。更详细的性能测试可参考 TBMQ 官方发布的性能测试专题文章。
8、总结
在分布式系统中,当使用传统数据库等垂直扩展组件来承载高并发、会话型工作负载时,往往会出现可扩展性瓶颈。TBMQ 从 PostgreSQL 迁到 Redis 的实践表明:将会话存储卸载到 Redis,并在迁移过程中做好数据结构选择、集群约束、原子操作与客户端选型,可以在 TBMQ 2.x 中构建出支持大量并发持久化会话、且无单点故障的持久化层。希望本文能为在设计可扩展、会话感知分布式系统的工程师提供一些可借鉴的思路。
生如逆旅,一苇以航
欢迎关注、欢迎联系交流、欢迎沟通想法、欢迎交换意见、欢迎合作咨询
感谢亲的关注、点赞、收藏、评论,一键三连支持,谢谢