news 2026/6/21 7:32:56

Spring Boot 3 + Netty 构建高并发即时通讯服务

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot 3 + Netty 构建高并发即时通讯服务

ChannelNetty中代表一个网络连接,它的生命周期包括以下几个主要状态:

  • 注册(Registered)

    :Channel 被注册到 EventLoop 上

  • 激活(Active)

    :Channel 连接建立并就绪

  • 非激活(Inactive)

    :Channel 连接断开

  • 注销(Unregistered)

    :Channel 从 EventLoop 中注销

这些状态变化会触发 ChannelHandler 中的相应生命周期方法,如 channelRegistered()、channelActive() 等。

Handler的生命周期

Handler是数据处理的核心组件,它们也有清晰的生命周期:

  • 添加

    :handlerAdded() 在 Handler 被添加到 ChannelPipeline 时调用

  • 移除

    :handlerRemoved() 在Handler从ChannelPipeline 移除时调用

  • 异常

    :exceptionCaught() 在处理过程中发生异常时调用

服务器启动流程
  1. 创建服务器引导类

    :使用 ServerBootstrap 配置服务器参数

  2. 设置线程模型

    :指定 Boss 线程组和 Worker 线程组

  3. 配置Channel

    :选择 NioServerSocketChannel 等实现

  4. 添加处理器

    :配置 ChannelInitializer 来设置每个新连接的处理链

  5. 绑定端口

    :调用 bind() 方法启动服务器并监听端口

我们可以把这个过程想象成组装和使用一台机器:首先准备好零件(创建组件),然后按照说明书组装(配置连接),接通电源(启动服务),机器开始工作(处理数据),最后关闭电源拆解维护(关闭资源)。整个过程有条不紊,每个组件都知道自己什么时候该做什么事情。

实时通讯技术方案选型

在构建需要实时数据交互的应用时,有三种主流技术方案:

Ajax轮训

原理:客户端定时向服务器发送请求,检查是否有新数据。

优势:实现简单,兼容性极佳,几乎所有浏览器都支持,服务器逻辑直观。

劣势:产生大量无效请求浪费资源,实时性受轮询间隔限制,延迟明显,高并发时可能造成服务器压力。

Long pull(长轮询)

原理:客户端发送请求后,服务器保持连接不立即响应,直到有新数据或超时才返回,客户端收到后立即发起新请求。

优势:减少无效请求,实时性较轮询有所提升,兼容性良好。

劣势:服务器需维持大量连接,高并发场景资源消耗大,仍有一定延迟。

WebSocket

原理:建立单一TCP连接后提供持久双向通信通道,双方可随时发送数据。

优势:真正的实时双向通信,延迟低,协议开销小,适合频繁数据交换,资源消耗相对较低。

劣势:实现复杂度较高,部分老旧浏览器不支持,某些网络环境可能受限。

在我们要构建的即时通讯服务中,WebSocket无疑是最佳选择,它能最好地满足我们对实时性的要求。值得一提的是,Netty提供了对WebSocket的原生支持和优化实现,这让我们能够轻松构建可扩展且高效的实时通讯系统,省去了处理底层通信细节的繁琐工作,更专注于业务逻辑的实现。

代码实现

本节将围绕前后端关键实现展开,给大家展示如何基于Netty开发即时通讯服务。

前端

本文侧重于后端服务的构建,因此前端只展示核心通信代码。以下代码实现了与Netty服务器建立WebSocket连接、消息收发及状态管理的关键功能,为后续后端实现提供了交互基础。

// 1. WebSocket连接全局配置 globalData: { // WebSocket服务器连接地址 chatServerUrl: "ws://127.0.0.1:875/ws", // 全局WebSocket连接对象 CHAT: null, // 标记WebSocket连接状态 chatSocketOpen: false, }, // 2. 应用启动时初始化WebSocket连接 onLaunch: function() { // 程序启动时连接聊天服务器 this.doConnect(false); }, // 3. 核心方法:建立WebSocket连接 doConnect(isFirst) { // 重连时显示提示 if (isFirst) { uni.showToast({ icon: "loading", title: "断线重连中...", duration: 2000 }); } var me = this; // 仅当用户已登录时才连接WebSocket if (me.getUserInfoSession() != null && me.getUserInfoSession() != "" && me.getUserInfoSession() != undefined) { // 创建WebSocket连接 me.globalData.CHAT = uni.connectSocket({ url: me.globalData.chatServerUrl, complete: ()=> {} }); // 4. 连接成功事件处理 me.globalData.CHAT.onOpen(function(){ // 更新连接状态标记 me.globalData.chatSocketOpen = true; console.log("ws连接已打开,socketOpen = " + me.globalData.chatSocketOpen); // 构建初始化消息(消息类型0表示连接初始化) var chatMsg = { senderId: me.getUserInfoSession().id, msgType: 0 } var dataContent = { chatMsg: chatMsg } var msgPending = JSON.stringify(dataContent); // 发送初始化消息,通知服务器用户身份 me.globalData.CHAT.send({ data: msgPending }); }); // 5. 连接关闭事件处理 me.globalData.CHAT.onClose(function(){ me.globalData.chatSocketOpen = false; console.log("ws连接已关闭,socketOpen = " + me.globalData.chatSocketOpen); }); // 6. 接收消息事件处理 me.globalData.CHAT.onMessage(function(res){ console.log('App.vue 收到服务器内容:' + res.data); // 处理接收到的消息 me.dealReceiveLastestMsg(JSON.parse(res.data)); }); // 7. 连接错误事件处理 me.globalData.CHAT.onError(function(){ me.globalData.chatSocketOpen = false; console.log('WebSocket连接打开失败,请检查!'); }); } }, // 8. 发送WebSocket消息的通用方法 sendSocketMessage(msg) { // 检查连接状态,只有在连接开启时才发送 if (this.globalData.chatSocketOpen) { uni.sendSocketMessage({ data: msg }); } else { uni.showToast({ icon: "none", title: "您已断开聊天服务器的连接" }) } }, // 9. 处理接收到的消息 dealReceiveLastestMsg(msgJSON) { console.log(msgJSON); var chatMsg = msgJSON.chatMsg; var chatTime = msgJSON.chatTime; var senderId = chatMsg.senderId; var receiverType = chatMsg.receiverType; console.log('chatMsg.receiverType = ' + receiverType); var me = this; // 获取发送者的用户信息 var userId = me.getUserInfoSession().id; var userToken = me.getUserSessionToken(); var serverUrl = me.globalData.serverUrl; // 请求发送者详细信息 uni.request({ method: "POST", header: { headerUserId: userId, headerUserToken: userToken }, url: serverUrl + "/userinfo/get?userId=" + senderId, success(result) { if (result.data.status == 200) { var currentSourceUserInfo = result.data.data; me.currentSourceUserInfo = currentSourceUserInfo; // 根据消息类型设置显示内容 var msgShow = chatMsg.msg; if (chatMsg.msgType == 2) { msgShow = "[图片]" } elseif (chatMsg.msgType == 4) { msgShow = "[视频]" } elseif (chatMsg.msgType == 3) { msgShow = "[语音]" } // 保存最新消息到本地存储 me.saveLastestMsgToLocal(senderId, currentSourceUserInfo, msgShow, chatTime, msgJSON); } } }) }, // 10. 将最新消息保存到本地存储 saveLastestMsgToLocal(sourceUserId, sourceUser, msgContent, chatTime, msgJSON) { // 构造最新消息对象 var lastMsg = { sourceUserId: sourceUserId, // 源头用户,聊天对象 name: sourceUser.nickname, face: sourceUser.face, msgContent: msgContent, chatTime: chatTime, unReadCounts: 0, communicationType: 1, // 1:单聊,2:群聊 } // 获取本地存储的聊天列表 var lastestUserChatList = uni.getStorageSync("lastestUserChatList"); if (lastestUserChatList == null || lastestUserChatList == undefined || lastestUserChatList == "") { lastestUserChatList = []; } // 更新或新增消息记录 var dealMsg = false; for (var i = 0; i < lastestUserChatList.length; i++) { var tmp = lastestUserChatList[i]; if (tmp.sourceUserId == lastMsg.sourceUserId) { // 已存在聊天记录,更新最新消息 lastestUserChatList.splice(i, 1, lastMsg); dealMsg = true; break; } } if (!dealMsg) { // 新的聊天对象,添加到列表开头 lastestUserChatList.unshift(lastMsg); } // 保存更新后的聊天列表 uni.setStorageSync("lastestUserChatList", lastestUserChatList); // 通知UI更新 uni.$emit('reRenderReceiveMsgInMsgVue', "domeafavor"); uni.$emit('receiveMsgInMsgListVue', msgJSON); }, // 11. 关闭WebSocket连接 closeWSConnect() { this.globalData.CHAT.close(); }
后端

万事要开头,始于导入依赖。(Ps:这里大家在实操的时候去Maven仓库找最新版本,不一定非要和我的版本一样)

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.2.0.Final</version> </dependency>

1.首先创建服务器启动类,这是整个Netty服务器的入口点,负责配置和启动WebSocket服务器。

import com.pitayafruits.netty.websocket.WSServerInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; * netty 服务启动类 */ publicclassChatServer { publicstaticvoidmain(String[] args)throws InterruptedException { EventLoopGroupbossGroup=newNioEventLoopGroup(); EventLoopGroupworkerGroup=newNioEventLoopGroup(); try { ServerBootstrapserver=newServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(newWSServerInitializer()); ChannelFuturechannelFuture= server.bind(875).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

关键点

  • 采用Reactor模式,使用两个线程池:bossGroup和workerGroup

  • bossGroup负责接受客户端连接

  • workerGroup负责处理IO操作

  • 服务器绑定在875端口

2.接下来创建通道初始化器,负责配置每个新建立的连接的通道,设置处理器链(Pipeline)。

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; * 初始化器,channel注册后,会执行里面相应的初始化方法 */ publicclassWSServerInitializerextendsChannelInitializer<SocketChannel> { @Override protectedvoidinitChannel(SocketChannel socketChannel)throws Exception { ChannelPipelinepipeline= socketChannel.pipeline(); pipeline.addLast(newHttpServerCodec()); pipeline.addLast(newChunkedWriteHandler()); pipeline.addLast(newHttpObjectAggregator(1024 * 64)); pipeline.addLast(newWebSocketServerProtocolHandler("/ws")); pipeline.addLast(newChatHandler()); } }

关键点

  • 处理HTTP协议:HttpServerCodecChunkedWriteHandlerHttpObjectAggregator

  • 处理WebSocket协议:WebSocketServerProtocolHandler("/ws"),指定WebSocket的路由为"/ws"

  • 添加自定义业务处理器:ChatHandler,处理具体的消息交互逻辑

3.接着创建会话管理器,管理用户ID与通道(Channel)之间的映射关系,支持同一用户多端登录。(Ps:这个根据实际业务情况来,如果不需要支持多端登录,则不需要创建。)

import io.netty.channel.Channel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; * 会话管理 */ publicclassUserChannelSession { privatestatic Map<String, List<Channel>> multiSession = newHashMap<>(); privatestatic Map<String, String> userChannelIdRelation = newHashMap<>(); publicstaticvoidputUserChannelIdRelation(String userId, String channelId) { userChannelIdRelation.put(channelId, userId); } publicstatic String getUserIdByChannelId(String channelId) { return userChannelIdRelation.get(channelId); } publicstaticvoidputMultiChannels(String userId, Channel channel) { List<Channel> channels = getMultiChannels(userId); if (channels == null || channels.size() == 0) { channels = newArrayList<>(); } channels.add(channel); multiSession.put(userId, channels); } publicstaticvoidremoveUserChannels(String userId, String channelId) { List<Channel> channels = getMultiChannels(userId); if (channels == null || channels.size() == 0) { return; } for (Channel channel : channels) { if (channel.id().asLongText().equals(channelId)) { channels.remove(channel); } } multiSession.put(userId, channels); } publicstatic List<Channel> getMultiChannels(String userId) { return multiSession.get(userId); } }

4.最后是创建消息处理器,它是核心业务逻辑处理器,负责处理客户端发送的WebSocket消息。大家可以注意到这里对于消息类型留了扩展的口子,本次我们实现先只实现文字消息。

import com.pitayafruits.enums.MsgTypeEnum; import com.pitayafruits.pojo.netty.ChatMsg; import com.pitayafruits.utils.JsonUtils; import com.pitayafruits.pojo.netty.DataContent; import com.pitayafruits.utils.LocalDateUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import java.time.LocalDateTime; import java.util.List; * 自定义助手类 */ publicclassChatHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> { publicstaticChannelGroupclients=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protectedvoidchannelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { Stringcontent= textWebSocketFrame.text(); DataContentdataContent= JsonUtils.jsonToPojo(content, DataContent.class); ChatMsgchatMsg= dataContent.getChatMsg(); StringmsgText= chatMsg.getMsg(); StringreceiverId= chatMsg.getReceiverId(); StringsenderId= chatMsg.getSenderId(); chatMsg.setChatTime(LocalDateTime.now()); IntegermsgType= chatMsg.getMsgType(); ChannelcurrentChannel= channelHandlerContext.channel(); StringcurrentChannelId= currentChannel.id().asLongText(); if (msgType == MsgTypeEnum.CONNECT_INIT.type) { UserChannelSession.putMultiChannels(senderId, currentChannel); UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId); } elseif (msgType == MsgTypeEnum.WORDS.type) { List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId); if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) { chatMsg.setIsReceiverOnLine(false); } else { chatMsg.setIsReceiverOnLine(true); for (Channel receiverChannel : receiverChannels) { ChannelfindChannel= clients.find(receiverChannel.id()); if (findChannel != null) { dataContent.setChatMsg(chatMsg); StringchatTimeFormat= LocalDateUtils.format(chatMsg.getChatTime(), LocalDateUtils.DATETIME_PATTERN_2); dataContent.setChatTime(chatTimeFormat); findChannel.writeAndFlush( newTextWebSocketFrame( JsonUtils.objectToJson(dataContent))); } } } } currentChannel.writeAndFlush(newTextWebSocketFrame(currentChannelId)); } @Override publicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception { ChannelcurrentChannel= ctx.channel(); clients.add(currentChannel); } @Override publicvoidhandlerRemoved(ChannelHandlerContext ctx)throws Exception { ChannelcurrentChannel= ctx.channel(); StringuserId= UserChannelSession.getUserIdByChannelId(currentChannel.id().asLongText()); UserChannelSession.removeUserChannels(userId, currentChannel.id().asLongText()); clients.remove(currentChannel); } @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { Channelchannel= ctx.channel(); channel.close(); clients.remove(channel); StringuserId= UserChannelSession.getUserIdByChannelId(channel.id().asLongText()); UserChannelSession.removeUserChannels(userId, channel.id().asLongText()); } }

再梳理一下完整流程:

1.服务器启动ChatServer创建并配置Netty服务器,设置线程模型和端口;

2.通道初始化:当有新连接时,WSServerInitializer设置处理器链Pipeline;

3.连接建立ChatHandler.handlerAdded()将连接添加到ChannelGroup;

4.消息处理

  • 客户端先发送初始化消息,建立用户ID与Channel的映射关系;

  • 客户端后续发送聊天消息,服务器查找接收者的Channel并转发消息;

5.连接断开ChatHandler.handlerRemoved()清理资源,移除映射关系。

效果演示

小结

至此,我们已成功构建了一个基于Netty的即时通讯服务。虽然当前实现仍有一些局限,如缺少离线消息存储机制、消息类型较为单一、未实现消息持久化等,但本文 + 代码示例给大家展示了基于Netty构建聊天服务的核心架构与完整流程。

基于现有示例,可以轻松地扩展更多功能,如添加消息队列实现离线消息推送、集成数据库实现消息持久化、增加群聊和多媒体消息支持等。

希望本文能为各位读者提供实现思路,也鼓励大家在这个基础上进行实践操作,打造更加完善的即时通讯服务。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/20 11:42:51

探索ABB机器人视觉引导抓取:C#、Halcon与RobotStudio的梦幻联动

abb机器人视觉引导抓取C#联合halcon联合RobotStudio实现虚拟仿真九点标定海康工业相机C#上位机视觉抓取 -本链接只出源码工作站&#xff0c;不出任何硬件&#xff0c;工业相机请自备 -提供2个版本一个是有海康工业相机 和 无工业相机 1.有海康工业相机提供标定教程和咨询 2.没有…

作者头像 李华
网站建设 2026/6/19 10:53:37

基于SpringBoot + QLExpress打造动态规则引擎

一、为什么需要动态规则引擎? 在开始技术实现之前,我们先来理解为什么动态规则引擎如此重要。 1.1 传统业务规则的痛点 // 传统业务规则的痛点示例 public class TraditionalBusinessRules {public void痛点() {System.out.println("=== 传统业务规则的痛点 ==="…

作者头像 李华
网站建设 2026/6/18 20:35:09

【数据结构】栈——超详解!!!(包含栈的实现)

【数据结构】栈——超详解&#xff01;&#xff01;&#xff01;&#xff08;包含栈的实现&#xff09;前言一、栈是什么&#xff1f;1. 后进先出&#xff08;LIFO&#xff09;2. 压栈&&出栈二、栈的实现1. 用什么来实现&#xff1f;2. 实现思路3.注意4. 代码实现&…

作者头像 李华
网站建设 2026/6/19 19:59:07

“渝”见硬核实力!凯云汽车测试解决方案亮相重庆行业盛会

11月13日&#xff0c;以“惟测励新&#xff0c;笃质致远”为主题的中国汽车检测测试与质量大会在重庆喜来登酒店隆重举行。来自国内各大汽车主机厂、检测认证机构、设备仪器企业及系统集成商的行业精英齐聚一堂&#xff0c;共同探讨汽车检测测试领域的新思维、新技术与新产品&a…

作者头像 李华
网站建设 2026/6/14 10:08:09

精准测试,决胜未来:控制系统测试验证解决方案

在信息化战争的战场上&#xff0c;“指哪打哪” 的精确打击能力&#xff0c;离不开控制系统的稳定运行。而一套控制系统从设计图纸落地为实战装备&#xff0c;中间藏着一个关键环节 —— 测试验证。凯云推出了一套完整的控制系统测试验证环境解决方案&#xff0c;致力于为各类装…

作者头像 李华
网站建设 2026/6/20 22:32:36

实力加冕!凯云入选国家第七批专精特新 “小巨人” 企业名单

近日&#xff0c;北京市经济和信息化局正式发布《关于北京市第七批专精特新 “小巨人” 企业和 2025 年专精特新 “小巨人” 复核通过企业名单进行公示的通知》。凭借国内先进、自主可控的平台产品&#xff0c;以及行业领先的数字化服务能力&#xff0c;凯云联创&#xff08;北…

作者头像 李华