news 2026/4/23 13:53:24

基于Netty的设备接入网关系统实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于Netty的设备接入网关系统实战

基于Netty的设备接入网关系统实战

作者:系统管理员

摘要

基于Netty的设备接入网关系统实战


基于Netty的设备接入网关系统实战

本文将完整实现一套企业级设备接入网关,涵盖Netty长连接核心特性(主从Reactor、粘包解包、心跳保活、设备认证)、网关集群、Redis/MySQL集群适配,提供可直接落地的Java代码及全流程说明。

一、整体架构设计

核心组件

  1. Netty网关层

    :主从Reactor模型,处理设备TCP长连接、粘包解包、心跳、认证

  2. 集群层

    :Netty网关集群+Redis集群(主从+哨兵)+MySQL主从

  3. 转发层

    :基于设备ID的请求路由转发

  4. 存储层

    :Redis缓存设备状态,MySQL持久化设备信息

技术栈

  • 核心:Netty 4.1.x

  • 存储:Redis 6.x(主从+哨兵)、MySQL 8.x(主从)

  • 序列化:Protobuf(高效二进制,适合物联网场景)

  • 工具:Lombok、HikariCP、Redisson

二、核心代码实现

1. 基础依赖(Maven)

<dependencies> <!-- Netty核心 --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.94.Final</version> </dependency> <!-- Protobuf --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.24.4</version> </dependency> <!-- Redis --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.3</version> </dependency> <!-- MySQL --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.33</version> </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <version>5.0.1</version> </dependency> <!-- 工具 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>

2. 协议定义(Protobuf)

定义设备通信协议(解决粘包/解包、数据结构化问题)

syntax = "proto3"; package com.device.gateway.protocol; option java_outer_classname = "DeviceProtocol"; // 设备消息体 message DeviceMessage { string deviceId = 1; // 设备ID int32 messageType = 2; // 消息类型:1-认证 2-心跳 3-业务数据 4-响应 bytes data = 3; // 业务数据 int64 timestamp = 4; // 时间戳 string authCode = 5; // 认证码(仅认证消息使用) }

编译后生成DeviceProtocol类,用于Netty编解码。

3. Netty粘包/解包处理器

基于长度字段的拆包器(解决TCP粘包/半包问题)

package com.device.gateway.netty.codec; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** * 粘包解包器:基于长度字段 * 协议格式:长度字段(4字节) + protobuf数据 */ public class DeviceFrameDecoder extends LengthFieldBasedFrameDecoder { // 最大帧长度:1024*1024=1MB private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 长度字段偏移量:0 private static final int LENGTH_FIELD_OFFSET = 0; // 长度字段长度:4字节(int) private static final int LENGTH_FIELD_LENGTH = 4; // 长度调整值:0 private static final int LENGTH_ADJUSTMENT = 0; // 跳过初始字节数:4(跳过长度字段) private static final int INITIAL_BYTES_TO_STRIP = 4; public DeviceFrameDecoder() { super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 校验最小长度(长度字段+至少1字节数据) if (in.readableBytes() < LENGTH_FIELD_LENGTH + 1) { return null; } return super.decode(ctx, in); } } /** * Protobuf编码器:添加长度字段 */ public class DeviceFrameEncoder extends io.netty.handler.codec.MessageToByteEncoder<DeviceProtocol.DeviceMessage> { @Override protected void encode(ChannelHandlerContext ctx, DeviceProtocol.DeviceMessage msg, ByteBuf out) throws Exception { // 序列化Protobuf byte[] data = msg.toByteArray(); // 写入长度字段(4字节) out.writeInt(data.length); // 写入protobuf数据 out.writeBytes(data); } } /** * Protobuf编解码器 */ public class DeviceProtobufCodec { public static DeviceFrameDecoder decoder() { return new DeviceFrameDecoder(); } public static DeviceFrameEncoder encoder() { return new DeviceFrameEncoder(); } }

4. 心跳检测处理器

基于Netty的IdleStateHandler实现心跳保活

package com.device.gateway.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * 心跳处理器: * - 读空闲(设备30秒未发数据):发送心跳请求 * - 读空闲60秒:断开连接 */ @Slf4j public class HeartbeatHandler extends ChannelInboundHandlerAdapter { // 读空闲超时时间(秒) public static final int READER_IDLE_TIME = 30; // 写空闲超时时间(秒) public static final int WRITER_IDLE_TIME = 0; // 全空闲超时时间(秒) public static final int ALL_IDLE_TIME = 0; // 心跳失败次数 private int heartbeatFailCount = 0; // 最大心跳失败次数 private static final int MAX_HEARTBEAT_FAIL_COUNT = 2; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { heartbeatFailCount++; log.warn("设备[{}]读空闲,心跳失败次数:{}", ctx.channel().id(), heartbeatFailCount); // 超过最大失败次数,断开连接 if (heartbeatFailCount >= MAX_HEARTBEAT_FAIL_COUNT) { log.error("设备[{}]心跳超时,断开连接", ctx.channel().id()); ctx.close(); return; } // 发送心跳请求 DeviceProtocol.DeviceMessage heartbeatMsg = DeviceProtocol.DeviceMessage.newBuilder() .setMessageType(2) // 2-心跳 .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(heartbeatMsg); } } else { super.userEventTriggered(ctx, evt); } } // 收到心跳响应,重置失败次数 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; if (message.getMessageType() == 2) { heartbeatFailCount = 0; log.debug("收到设备[{}]心跳响应", message.getDeviceId()); } } super.channelRead(ctx, msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("心跳处理器异常", cause); ctx.close(); } }

5. 设备认证处理器

package com.device.gateway.netty.auth; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备认证处理器:首次连接必须认证,否则断开 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceAuthHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; // 检查是否已认证 Boolean isAuth = deviceRedisService.getDeviceAuthState(message.getDeviceId()); if (isAuth != null && isAuth) { // 已认证,透传消息 ctx.fireChannelRead(msg); return; } // 未认证,检查是否是认证消息 if (message.getMessageType() != 1) { log.warn("设备[{}]未认证,拒绝非认证消息", message.getDeviceId()); ctx.close(); return; } // 执行认证 boolean authSuccess = authenticate(message); if (authSuccess) { // 认证成功,缓存认证状态 deviceRedisService.setDeviceAuthState(message.getDeviceId(), true, 3600 * 24); // 绑定设备ID到Channel ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).set(message.getDeviceId()); // 记录连接信息 deviceRedisService.setDeviceChannel(message.getDeviceId(), ctx.channel().id().asLongText()); log.info("设备[{}]认证成功", message.getDeviceId()); // 发送认证成功响应 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(message.getDeviceId()) .setMessageType(4) // 4-响应 .setData("auth_success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); // 透传消息 ctx.fireChannelRead(msg); } else { log.warn("设备[{}]认证失败,断开连接", message.getDeviceId()); ctx.close(); } } else { ctx.fireChannelRead(msg); } } /** * 设备认证逻辑:校验设备ID和认证码 */ private boolean authenticate(DeviceProtocol.DeviceMessage message) { // 从MySQL/Redis获取设备认证信息 String storedAuthCode = deviceRedisService.getDeviceAuthCode(message.getDeviceId()); if (storedAuthCode == null) { // 从MySQL主库查询 storedAuthCode = deviceRedisService.loadDeviceAuthCodeFromDb(message.getDeviceId()); if (storedAuthCode == null) { return false; } // 缓存到Redis deviceRedisService.setDeviceAuthCode(message.getDeviceId(), storedAuthCode, 3600); } // 校验认证码 return storedAuthCode.equals(message.getAuthCode()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("设备认证异常", cause); ctx.close(); } }

6. 主从Reactor模型的Netty服务端

package com.device.gateway.netty.server; import com.device.gateway.netty.auth.DeviceAuthHandler; import com.device.gateway.netty.codec.DeviceProtobufCodec; import com.device.gateway.netty.heartbeat.HeartbeatHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.TimeUnit; /** * Netty服务端:主从Reactor模型 * - BossGroup:处理连接请求(主Reactor) * - WorkerGroup:处理IO读写(从Reactor) */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyDeviceServer { // 监听端口 @Value("${netty.server.port:8888}") private int port; // Boss线程数:CPU核心数 private static final int BOSS_THREADS = Runtime.getRuntime().availableProcessors(); // Worker线程数:CPU核心数*2 private static final int WORKER_THREADS = Runtime.getRuntime().availableProcessors() * 2; // BossGroup:主Reactor private EventLoopGroup bossGroup; // WorkerGroup:从Reactor private EventLoopGroup workerGroup; private final DeviceAuthHandler deviceAuthHandler; private final DeviceMessageHandler deviceMessageHandler; /** * 启动Netty服务 */ @PostConstruct public void start() { bossGroup = new NioEventLoopGroup(BOSS_THREADS); workerGroup = new NioEventLoopGroup(WORKER_THREADS); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) // NIO服务端通道 .channel(NioServerSocketChannel.class) // 连接队列大小 .option(ChannelOption.SO_BACKLOG, 1024) // 开启TCP保活 .childOption(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法(低延迟) .childOption(ChannelOption.TCP_NODELAY, true) // 通道初始化 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 1. 心跳检测:读空闲30秒 pipeline.addLast(new IdleStateHandler( HeartbeatHandler.READER_IDLE_TIME, HeartbeatHandler.WRITER_IDLE_TIME, HeartbeatHandler.ALL_IDLE_TIME, TimeUnit.SECONDS )); // 2. 心跳处理器 pipeline.addLast(new HeartbeatHandler()); // 3. 粘包解包器 pipeline.addLast(DeviceProtobufCodec.decoder()); pipeline.addLast(DeviceProtobufCodec.encoder()); // 4. 设备认证处理器 pipeline.addLast(deviceAuthHandler); // 5. 业务消息处理器 pipeline.addLast(deviceMessageHandler); } }); // 绑定端口,同步等待成功 ChannelFuture future = bootstrap.bind(port).sync(); log.info("Netty设备网关启动成功,监听端口:{}", port); // 等待服务端关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("Netty服务启动异常", e); Thread.currentThread().interrupt(); } } /** * 停止Netty服务 */ @PreDestroy public void stop() { log.info("开始停止Netty设备网关"); if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("Netty设备网关停止完成"); } }

7. 设备消息转发处理器

package com.device.gateway.netty.handler; import com.device.gateway.redis.DeviceRedisService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 设备消息转发处理器:根据设备ID路由消息 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceRedisService deviceRedisService; private final DeviceMessageRouter deviceMessageRouter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof DeviceProtocol.DeviceMessage) { DeviceProtocol.DeviceMessage message = (DeviceProtocol.DeviceMessage) msg; String deviceId = message.getDeviceId(); log.debug("收到设备[{}]消息,类型:{}", deviceId, message.getMessageType()); // 1. 更新设备最后活跃时间 deviceRedisService.updateDeviceLastActiveTime(deviceId); // 2. 消息转发(根据业务类型路由) deviceMessageRouter.route(message); // 3. 响应处理(如果需要) if (message.getMessageType() == 3) { // 业务数据 DeviceProtocol.DeviceMessage response = DeviceProtocol.DeviceMessage.newBuilder() .setDeviceId(deviceId) .setMessageType(4) .setData("success".getBytes()) .setTimestamp(System.currentTimeMillis()) .build(); ctx.writeAndFlush(response); } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 设备断开连接,清理缓存 String deviceId = ctx.channel().attr(AttributeKey.valueOf("DEVICE_ID")).get(); if (deviceId != null) { log.info("设备[{}]断开连接", deviceId); deviceRedisService.clearDeviceState(deviceId); } super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("消息处理器异常", cause); ctx.close(); } } /** * 消息路由器:集群内转发消息 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceMessageRouter { private final DeviceRedisService deviceRedisService; private final NettyClusterService nettyClusterService; /** * 路由消息到目标设备 */ public void route(DeviceProtocol.DeviceMessage message) { String deviceId = message.getDeviceId(); // 1. 获取设备所在的网关节点 String gatewayNode = deviceRedisService.getDeviceGatewayNode(deviceId); if (gatewayNode == null) { log.warn("设备[{}]未在线", deviceId); return; } // 2. 判断是否是当前节点 String currentNode = nettyClusterService.getCurrentNodeId(); if (currentNode.equals(gatewayNode)) { // 当前节点,直接处理 processLocalMessage(message); } else { // 其他节点,集群转发 nettyClusterService.forwardMessage(gatewayNode, message); } } /** * 处理本地消息 */ private void processLocalMessage(DeviceProtocol.DeviceMessage message) { // 业务处理逻辑:如保存数据、调用业务接口等 log.debug("处理本地设备[{}]消息", message.getDeviceId()); } }

8. Redis集群配置(主从+哨兵)

package com.device.gateway.redis; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Redis集群配置:主从+哨兵 */ @Configuration public class RedisConfig { @Value("${redis.sentinel.master-name:mymaster}") private String masterName; @Value("${redis.sentinel.nodes:127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381}") private String sentinelNodes; @Value("${redis.password:}") private String password; @Bean public RedissonClient redissonClient() { Config config = new Config(); // 哨兵模式配置 config.useSentinelServers() .setMasterName(masterName) .addSentinelAddress(parseSentinelNodes()) .setPassword(password.isEmpty() ? null : password) .setDatabase(0) .setConnectTimeout(3000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1000); return Redisson.create(config); } /** * 解析哨兵节点为Redis地址格式 */ private String[] parseSentinelNodes() { String[] nodes = sentinelNodes.split(","); for (int i = 0; i < nodes.length; i++) { nodes[i] = "redis://" + nodes[i].trim(); } return nodes; } } /** * 设备Redis服务 */ @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class DeviceRedisService { private final RedissonClient redissonClient; private final DeviceMapper deviceMapper; // 设备认证状态Key:DEVICE_AUTH:{deviceId} private static final String KEY_DEVICE_AUTH = "DEVICE_AUTH:%s"; // 设备认证码Key:DEVICE_AUTH_CODE:{deviceId} private static final String KEY_DEVICE_AUTH_CODE = "DEVICE_AUTH_CODE:%s"; // 设备通道Key:DEVICE_CHANNEL:{deviceId} private static final String KEY_DEVICE_CHANNEL = "DEVICE_CHANNEL:%s"; // 设备网关节点Key:DEVICE_GATEWAY:{deviceId} private static final String KEY_DEVICE_GATEWAY = "DEVICE_GATEWAY:%s"; // 设备最后活跃时间Key:DEVICE_LAST_ACTIVE:{deviceId} private static final String KEY_DEVICE_LAST_ACTIVE = "DEVICE_LAST_ACTIVE:%s"; /** * 设置设备认证状态 */ public void setDeviceAuthState(String deviceId, boolean state, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH, deviceId); redissonClient.getBucket(key).set(state, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证状态 */ public Boolean getDeviceAuthState(String deviceId) { String key = String.format(KEY_DEVICE_AUTH, deviceId); return redissonClient.getBucket(key).get(); } /** * 设置设备认证码缓存 */ public void setDeviceAuthCode(String deviceId, String authCode, long expireSeconds) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); redissonClient.getBucket(key).set(authCode, expireSeconds, TimeUnit.SECONDS); } /** * 获取设备认证码缓存 */ public String getDeviceAuthCode(String deviceId) { String key = String.format(KEY_DEVICE_AUTH_CODE, deviceId); return redissonClient.getBucket(key).get(); } /** * 从MySQL加载设备认证码 */ public String loadDeviceAuthCodeFromDb(String deviceId) { // 从MySQL主库查询 return deviceMapper.getDeviceAuthCode(deviceId); } /** * 设置设备通道信息 */ public void setDeviceChannel(String deviceId, String channelId) { String key = String.format(KEY_DEVICE_CHANNEL, deviceId); redissonClient.getBucket(key).set(channelId); } /** * 设置设备网关节点 */ public void setDeviceGatewayNode(String deviceId, String nodeId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); redissonClient.getBucket(key).set(nodeId); } /** * 获取设备网关节点 */ public String getDeviceGatewayNode(String deviceId) { String key = String.format(KEY_DEVICE_GATEWAY, deviceId); return redissonClient.getBucket(key).get(); } /** * 更新设备最后活跃时间 */ public void updateDeviceLastActiveTime(String deviceId) { String key = String.format(KEY_DEVICE_LAST_ACTIVE, deviceId); redissonClient.getBucket(key).set(System.currentTimeMillis()); } /** * 清理设备状态 */ public void clearDeviceState(String deviceId) { redissonClient.getBucket(String.format(KEY_DEVICE_AUTH, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_CHANNEL, deviceId)).delete(); redissonClient.getBucket(String.format(KEY_DEVICE_GATEWAY, deviceId)).delete(); } }

9. MySQL主从配置

# application.yml spring: datasource: # 主库配置 master: jdbc-url: jdbc:mysql://master-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver # 从库配置 slave: jdbc-url: jdbc:mysql://slave-mysql:3306/device_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver type: com.zaxxer.hikari.HikariDataSource hikari: maximum-pool-size: 10 minimum-idle: 5 idle-timeout: 300000 connection-timeout: 20000
package com.device.gateway.db; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; /** * MySQL主从数据源配置 */ @Configuration public class DataSourceConfig { /** * 主库数据源 */ @Bean(name = "masterDataSource") @ConfigurationProperties("spring.datasource.master") public DataSource masterDataSource() { return DataSourceBuilder.create().build(); } /** * 从库数据源 */ @Bean(name = "slaveDataSource") @ConfigurationProperties("spring.datasource.slave") public DataSource slaveDataSource() { return DataSourceBuilder.create().build(); } /** * 动态数据源路由 */ @Bean(name = "dynamicDataSource") public AbstractRoutingDataSource dynamicDataSource( @Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("slaveDataSource") DataSource slaveDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put("master", masterDataSource); targetDataSources.put("slave", slaveDataSource); DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource(); routingDataSource.setTargetDataSources(targetDataSources); routingDataSource.setDefaultTargetDataSource(masterDataSource); return routingDataSource; } /** * 懒加载数据源(优化性能) */ @Bean @Primary public DataSource dataSource(@Qualifier("dynamicDataSource") AbstractRoutingDataSource dynamicDataSource) { return new LazyConnectionDataSourceProxy(dynamicDataSource); } } /** * 动态数据源路由 */ public class DynamicRoutingDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } } /** * 数据源上下文 Holder */ public class DataSourceContextHolder { private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>(); public static void setDataSourceType(String type) { CONTEXT_HOLDER.set(type); } public static String getDataSourceType() { return CONTEXT_HOLDER.get() == null ? "master" : CONTEXT_HOLDER.get(); } public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } } /** * 数据源注解:标记读从库 */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ReadOnly { } /** * 数据源切面:自动切换主从库 */ @Aspect @Component @Order(1) public class DataSourceAspect { @Around("@annotation(readOnly)") public Object around(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable { try { DataSourceContextHolder.setDataSourceType("slave"); return joinPoint.proceed(); } finally { DataSourceContextHolder.clearDataSourceType(); } } }

10. Netty网关集群服务

package com.device.gateway.cluster; import com.device.gateway.redis.DeviceRedisService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.UUID; /** * Netty网关集群服务 */ @Slf4j @Component @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class NettyClusterService { @Value("${netty.cluster.node-id:}") private String nodeId; @Value("${netty.server.port:8888}") private int port; private final DeviceRedisService deviceRedisService; @PostConstruct public void init() { // 生成节点ID:IP+端口 或 配置的节点ID if (nodeId == null || nodeId.isEmpty()) { try { String ip = InetAddress.getLocalHost().getHostAddress(); nodeId = ip + ":" + port; } catch (UnknownHostException e) { nodeId = UUID.randomUUID().toString().substring(0, 8); } } log.info("Netty网关集群节点ID:{}", nodeId); } /** * 获取当前节点ID */ public String getCurrentNodeId() { return nodeId; } /** * 转发消息到指定节点 * 实现方式: * 1. Redis Pub/Sub * 2. HTTP接口 * 3. Netty集群通信 */ public void forwardMessage(String targetNode, DeviceProtocol.DeviceMessage message) { // 示例:使用Redis Pub/Sub转发 String channel = "DEVICE_MESSAGE:" + targetNode; deviceRedisService.getRedissonClient().getTopic(channel).publish(message.toByteArray()); log.debug("转发消息到节点[{}],设备[{}]", targetNode, message.getDeviceId()); } }

三、核心功能说明

1. 主从Reactor模型

  • BossGroup(主Reactor)

    :处理TCP连接请求,默认CPU核心数线程

  • WorkerGroup(从Reactor)

    :处理IO读写,默认CPU核心数*2线程

  • 优势:高并发、高吞吐量,适合大量设备长连接场景

2. 粘包/解包处理

  • 采用LengthFieldBasedFrameDecoder,基于长度字段的拆包方式

  • 协议格式:4字节长度 + Protobuf数据

  • 解决TCP粘包/半包问题,保证数据完整性

3. 心跳保活

  • 基于IdleStateHandler检测读空闲(设备30秒未发数据)

  • 心跳失败2次(总计60秒)断开连接

  • 收到心跳响应重置失败计数,保证长连接稳定性

4. 设备认证

  • 首次连接必须发送认证消息(设备ID+认证码)

  • 认证信息缓存到Redis,避免频繁查询DB

  • 未认证设备拒绝所有非认证消息,保障安全性

5. 设备转发

  • 基于设备ID路由消息,通过Redis记录设备所在网关节点

  • 同节点直接处理,跨节点通过Redis Pub/Sub转发

  • 支持网关集群部署,负载均衡

6. Redis集群(主从+哨兵)

  • 哨兵模式实现Redis高可用,自动故障转移

  • 缓存设备状态、认证信息、连接信息,提升性能

  • 过期策略保证缓存有效性

7. MySQL集群(主从)

  • 主库写入,从库读取,读写分离提升性能

  • 注解+切面自动切换数据源,对业务透明

  • 设备基础信息持久化存储

四、部署与扩展

1. 单机部署

  • 启动Redis哨兵集群(1主2从3哨兵)

  • 启动MySQL主从复制

  • 启动Netty网关服务

2. 集群部署

  • 多台服务器部署Netty网关,使用相同Redis集群

  • 设备连接任意网关节点,通过Redis路由消息

  • 网关节点无状态,可水平扩展

3. 监控与运维

  • 监控Redis集群状态(主从切换、哨兵状态)

  • 监控MySQL主从同步延迟

  • 监控Netty网关连接数、消息吞吐量、心跳成功率

  • 设备离线告警、认证失败告警

五、总结

本方案完整实现了基于Netty的设备接入网关核心功能,解决了长连接管理、粘包处理、心跳保活、设备认证、集群转发等关键问题,并适配了Redis/MySQL集群保证高可用。代码结构清晰,可根据实际业务需求扩展:

  • 协议扩展:支持JSON、自定义二进制协议

  • 加密扩展:添加SSL/TLS加密通信

  • 负载均衡:接入Nginx/TCP负载均衡器

  • 监控扩展:集成Prometheus/Grafana监控指标


原文链接: https://1024bat.cn/article/33

来源: 淘书1024bat

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 8:56:30

新手教程:掌握工业控制相关模拟电路基础知识总结要点

从零开始搞懂工业控制中的模拟电路&#xff1a;一位工程师的实战笔记你有没有遇到过这样的场景&#xff1f;现场的温度传感器读数莫名其妙跳动&#xff0c;PLC输入点频繁误触发&#xff0c;或者刚调好的4-20mA信号一上电就漂移得离谱……很多新手第一反应是“软件问题”、“通信…

作者头像 李华
网站建设 2026/4/23 8:58:57

Qwen3-VL-WEBUI地标检测实战:地理图像理解部署案例

Qwen3-VL-WEBUI地标检测实战&#xff1a;地理图像理解部署案例 1. 引言&#xff1a;为何选择Qwen3-VL-WEBUI进行地标检测&#xff1f; 随着多模态大模型的快速发展&#xff0c;视觉-语言理解能力已成为AI应用落地的关键环节。在旅游、导航、城市规划和文化遗产保护等场景中&a…

作者头像 李华
网站建设 2026/4/23 8:55:12

Axure RP中文界面终极配置指南:快速实现完全汉化

Axure RP中文界面终极配置指南&#xff1a;快速实现完全汉化 【免费下载链接】axure-cn Chinese language file for Axure RP. Axure RP 简体中文语言包&#xff0c;不定期更新。支持 Axure 9、Axure 10。 项目地址: https://gitcode.com/gh_mirrors/ax/axure-cn 还在为…

作者头像 李华
网站建设 2026/4/23 8:57:55

批量图像处理新革命:BIMP插件的终极效率指南

批量图像处理新革命&#xff1a;BIMP插件的终极效率指南 【免费下载链接】gimp-plugin-bimp 项目地址: https://gitcode.com/gh_mirrors/gi/gimp-plugin-bimp 还在为海量图片处理而头痛吗&#xff1f;无论是摄影师需要批量优化RAW文件&#xff0c;设计师需要统一素材规…

作者头像 李华
网站建设 2026/4/23 9:48:03

胡桃工具箱使用指南:让原神游戏体验更智能高效

胡桃工具箱使用指南&#xff1a;让原神游戏体验更智能高效 【免费下载链接】Snap.Hutao 实用的开源多功能原神工具箱 &#x1f9f0; / Multifunctional Open-Source Genshin Impact Toolkit &#x1f9f0; 项目地址: https://gitcode.com/GitHub_Trending/sn/Snap.Hutao …

作者头像 李华
网站建设 2026/4/8 3:13:38

Qwen3-VL-WEBUI部署指南:Linux与Windows兼容性说明

Qwen3-VL-WEBUI部署指南&#xff1a;Linux与Windows兼容性说明 1. 简介与背景 随着多模态大模型的快速发展&#xff0c;阿里云推出的 Qwen3-VL 系列成为当前视觉-语言任务中的领先方案之一。作为 Qwen 系列中功能最强大的视觉语言模型&#xff0c;Qwen3-VL 在文本生成、图像理…

作者头像 李华