news 2026/4/23 15:35:05

websockets 全面教程:常用 API 串联与实战指南

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
websockets 全面教程:常用 API 串联与实战指南

大家好,我是jobleap.cn的小九。
Python 的websockets库是基于asyncio实现的异步 WebSocket 客户端/服务端框架,遵循 RFC 6455 标准,支持全双工通信、心跳检测、认证等核心特性,是构建实时通信场景(如聊天室、实时监控、推送服务)的首选工具。本文将系统讲解websockets的所有常用 API,并通过实战案例串联知识点,帮助你掌握异步 WebSocket 开发的全流程。

一、环境准备:安装依赖

websockets仅支持 Python 3.7+,且依赖asyncio(Python 3.7+ 内置),安装命令如下:

# 安装最新稳定版pipinstallwebsockets# 验证安装python -c"import websockets; print(websockets.__version__)"

二、核心概念与基础架构

WebSocket 是基于 TCP 的全双工通信协议,核心特点:

  • 一次握手,持久连接(无需反复建立 HTTP 连接);
  • 服务端/客户端可主动推送消息;
  • 支持文本、二进制消息传输。

websockets核心 API 分为两大模块:

  • 服务端websockets.serve()(创建服务端)、websockets.WebSocketServerProtocol(连接对象);
  • 客户端websockets.connect()(创建客户端)、websockets.WebSocketClientProtocol(连接对象);
  • 通用:消息收发(send()/recv())、连接关闭(close())、状态检测等。

三、基础常用 API 详解

3.1 最简示例:Echo 服务(服务端+客户端)

实现“客户端发什么,服务端就返回什么”的基础功能,覆盖核心 API:serve()connect()recv()send()

3.1.1 服务端代码
importasyncioimportwebsockets# 核心处理函数:每个客户端连接会触发该函数asyncdefecho_handler(websocket,path):""" websocket: 客户端连接对象(WebSocketServerProtocol 实例) path: 客户端请求的路径(如 ws://localhost:8765/chat) """try:# 循环接收客户端消息(全双工通信,持续监听)asyncformessageinwebsocket:print(f"服务端收到:{message}")# 发送消息给客户端(echo 逻辑)awaitwebsocket.send(f"Echo:{message}")finally:print(f"客户端{websocket.remote_address}断开连接")# 启动服务端asyncdefstart_echo_server():# 核心 API:websockets.serve() 创建服务端# 参数:处理函数、绑定地址、端口asyncwithwebsockets.serve(echo_handler,"localhost",8765):print("Echo 服务端已启动:ws://localhost:8765")# 挂起事件循环,保持服务运行awaitasyncio.Future()# 无限运行# 运行服务端if__name__=="__main__":asyncio.run(start_echo_server())
3.1.2 客户端代码
importasyncioimportwebsocketsasyncdefecho_client():# 核心 API:websockets.connect() 连接服务端asyncwithwebsockets.connect("ws://localhost:8765")aswebsocket:# 发送文本消息(核心 API:send())awaitwebsocket.send("Hello WebSocket!")# 接收消息(核心 API:recv())response=awaitwebsocket.recv()print(f"客户端收到:{response}")# 主动发送多条消息foriinrange(2):msg=f"测试消息{i+1}"awaitwebsocket.send(msg)res=awaitwebsocket.recv()print(f"客户端收到:{res}")# 运行客户端(需先启动服务端)if__name__=="__main__":asyncio.run(echo_client())

3.2 消息类型:文本与二进制消息

WebSocket 支持文本(str)和二进制(bytes)两种消息类型,websockets原生支持这两种格式的收发:

3.2.1 服务端(支持二进制消息)
importasyncioimportwebsocketsasyncdefbinary_handler(websocket,path):asyncformessageinwebsocket:# 判断消息类型ifisinstance(message,str):print(f"文本消息:{message}")awaitwebsocket.send(f"文本响应:{message}")elifisinstance(message,bytes):print(f"二进制消息(长度:{len(message)}):{message}")# 二进制消息响应(比如返回原字节+后缀)awaitwebsocket.send(message+b"_response")asyncdefstart_binary_server():asyncwithwebsockets.serve(binary_handler,"localhost",8766):print("二进制消息服务端已启动:ws://localhost:8766")awaitasyncio.Future()# asyncio.run(start_binary_server())
3.2.2 客户端(发送二进制消息)
importasyncioimportwebsocketsasyncdefbinary_client():asyncwithwebsockets.connect("ws://localhost:8766")aswebsocket:# 发送文本消息awaitwebsocket.send("这是文本消息")text_resp=awaitwebsocket.recv()print(f"文本响应:{text_resp}")# 发送二进制消息(比如图片字节、自定义二进制数据)binary_data=b"hello_binary"awaitwebsocket.send(binary_data)binary_resp=awaitwebsocket.recv()print(f"二进制响应:{binary_resp}")# asyncio.run(binary_client())

3.3 连接管理:关闭连接与状态检测

websockets提供连接状态检测、主动关闭连接的 API,核心属性/方法:

  • websocket.closed:判断连接是否关闭(布尔值);
  • websocket.close(code=1000, reason=""):主动关闭连接(code 遵循 WebSocket 标准码);
  • websocket.close_code/websocket.close_reason:获取关闭码/原因。
示例:主动关闭连接与状态检测
importasyncioimportwebsocketsasyncdefclose_handler(websocket,path):# 接收客户端消息,触发关闭逻辑message=awaitwebsocket.recv()ifmessage=="close":# 主动关闭连接(标准关闭码 1000:正常关闭)awaitwebsocket.close(code=1000,reason="客户端请求关闭")print(f"连接已关闭(码:{websocket.close_code},原因:{websocket.close_reason})")else:awaitwebsocket.send(f"收到:{message}")asyncdefstart_close_server():asyncwithwebsockets.serve(close_handler,"localhost",8767):print("关闭连接测试服务端已启动:ws://localhost:8767")awaitasyncio.Future()asyncdefclose_client():asyncwithwebsockets.connect("ws://localhost:8767")aswebsocket:# 检测初始状态print(f"初始连接状态:{'关闭'ifwebsocket.closedelse'开启'}")# 发送关闭指令awaitwebsocket.send("close")# 等待连接关闭(可选)awaitasyncio.sleep(0.1)print(f"发送关闭指令后状态:{'关闭'ifwebsocket.closedelse'开启'}")# 先启动服务端,再运行客户端# asyncio.run(start_close_server())# asyncio.run(close_client())

3.4 异常处理:捕获 WebSocket 相关异常

websockets定义了多种异常类型,核心异常包括:

  • websockets.exceptions.ConnectionClosed:连接已关闭;
  • websockets.exceptions.ConnectionClosedError:连接异常关闭;
  • websockets.exceptions.InvalidURI:无效的 WebSocket 地址;
  • websockets.exceptions.TimeoutError:连接/收发超时。
示例:完整异常处理
importasyncioimportwebsocketsfromwebsockets.exceptionsimport(ConnectionClosed,InvalidURI,TimeoutError)asyncdefexception_handler(websocket,path):try:asyncformessageinwebsocket:print(f"收到消息:{message}")awaitwebsocket.send(f"响应:{message}")exceptConnectionClosedase:print(f"连接关闭异常(码:{e.code},原因:{e.reason})")exceptExceptionase:print(f"未知异常:{e}")asyncdefstart_exception_server():asyncwithwebsockets.serve(exception_handler,"localhost",8768):print("异常处理服务端已启动:ws://localhost:8768")awaitasyncio.Future()asyncdefexception_client():try:# 测试1:无效 URI(故意写错)# async with websockets.connect("ws://localhost:8768/invalid") as websocket:# 测试2:正常连接后强制断开asyncwithwebsockets.connect("ws://localhost:8768",timeout=5)aswebsocket:awaitwebsocket.send("测试消息")# 模拟服务端断开后继续发送awaitasyncio.sleep(1)awaitwebsocket.send("断开后发送的消息")# 触发 ConnectionClosedexceptInvalidURIase:print(f"错误:无效的 URI -{e}")exceptTimeoutErrorase:print(f"错误:连接超时 -{e}")exceptConnectionClosedase:print(f"错误:连接已关闭 -{e.code}|{e.reason}")exceptExceptionase:print(f"通用错误:{e}")# asyncio.run(start_exception_server())# asyncio.run(exception_client())

3.5 进阶 API:心跳检测(防止连接断开)

WebSocket 长连接可能因网络闲置被网关/服务器断开,心跳检测是核心解决方案:客户端/服务端定期发送心跳包(如ping),对方回复pong确认连接存活。

websockets内置ping()/pong()方法,也可自定义心跳逻辑:

示例:带心跳检测的客户端+服务端
importasyncioimportwebsocketsfromdatetimeimportdatetime# ---------------------- 服务端 ----------------------asyncdefheartbeat_server_handler(websocket,path):# 启动心跳检测任务(后台运行)heartbeat_task=asyncio.create_task(server_heartbeat(websocket))try:asyncformessageinwebsocket:ifmessage=="ping":# 响应心跳包awaitwebsocket.send("pong")print(f"[{datetime.now()}] 服务端收到心跳,回复 pong")else:print(f"[{datetime.now()}] 服务端收到消息:{message}")awaitwebsocket.send(f"响应:{message}")finally:# 取消心跳任务heartbeat_task.cancel()print(f"[{datetime.now()}] 客户端断开,心跳任务已取消")asyncdefserver_heartbeat(websocket):"""服务端心跳检测:定期检查连接状态"""whileTrue:awaitasyncio.sleep(10)# 每10秒检测一次ifwebsocket.closed:break# 主动发送 ping(可选,也可等客户端发)awaitwebsocket.ping()print(f"[{datetime.now()}] 服务端发送 ping 检测")asyncdefstart_heartbeat_server():asyncwithwebsockets.serve(heartbeat_server_handler,"localhost",8769):print("心跳检测服务端已启动:ws://localhost:8769")awaitasyncio.Future()# ---------------------- 客户端 ----------------------asyncdefclient_heartbeat(websocket):"""客户端心跳任务:每5秒发送一次 ping"""whileTrue:awaitasyncio.sleep(5)ifwebsocket.closed:breakawaitwebsocket.send("ping")# 等待 pong 响应(超时则断开)try:resp=awaitasyncio.wait_for(websocket.recv(),timeout=3)ifresp=="pong":print(f"[{datetime.now()}] 客户端收到 pong,连接正常")else:raiseException("非心跳响应")exceptasyncio.TimeoutError:print(f"[{datetime.now()}] 心跳超时,关闭连接")awaitwebsocket.close()breakasyncdefheartbeat_client():try:asyncwithwebsockets.connect("ws://localhost:8769")aswebsocket:# 启动客户端心跳任务heartbeat_task=asyncio.create_task(client_heartbeat(websocket))# 发送业务消息awaitwebsocket.send("业务消息1")resp1=awaitwebsocket.recv()print(f"[{datetime.now()}] 客户端收到:{resp1}")# 等待心跳运行(模拟长连接)awaitasyncio.sleep(20)# 取消心跳任务heartbeat_task.cancel()exceptExceptionase:print(f"客户端异常:{e}")# 运行:先启动服务端,再启动客户端# asyncio.run(start_heartbeat_server())# asyncio.run(heartbeat_client())

3.6 进阶 API:连接认证(Token/用户名密码)

WebSocket 连接建立时可通过请求头路径参数做身份认证,websockets支持在连接握手阶段验证:

示例:基于 Token 的认证
importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection# 模拟合法 Token 列表VALID_TOKENS={"token_123","token_456"}# ---------------------- 认证中间件 ----------------------asyncdefauth_middleware(websocket,path):""" 连接握手阶段认证: - 从请求头获取 Token - 验证失败则拒绝连接(RejectConnection) """# 获取请求头中的 Token(websocket.request_headers 是类字典对象)token=websocket.request_headers.get("X-Auth-Token")ifnottokenortokennotinVALID_TOKENS:# 拒绝连接(标准 401 状态码)raiseRejectConnection(401,"Invalid or missing token")# 认证通过,进入业务处理awaitauth_handler(websocket,path)# ---------------------- 业务处理 ----------------------asyncdefauth_handler(websocket,path):print(f"客户端{websocket.remote_address}认证通过,开始通信")asyncformessageinwebsocket:awaitwebsocket.send(f"认证通过的响应:{message}")# ---------------------- 启动认证服务端 ----------------------asyncdefstart_auth_server():asyncwithwebsockets.serve(auth_middleware,"localhost",8770):print("认证服务端已启动:ws://localhost:8770")awaitasyncio.Future()# ---------------------- 认证客户端 ----------------------asyncdefauth_client(valid_token:bool=True):# 构造请求头(携带 Token)headers={}ifvalid_token:headers["X-Auth-Token"]="token_123"# 合法 Tokenelse:headers["X-Auth-Token"]="invalid_token"# 非法 Tokentry:asyncwithwebsockets.connect("ws://localhost:8770",extra_headers=headers# 自定义请求头)aswebsocket:awaitwebsocket.send("认证后的业务消息")resp=awaitwebsocket.recv()print(f"客户端收到:{resp}")exceptwebsockets.exceptions.ConnectionRefusedErrorase:print(f"连接被拒绝:{e}")# 运行:# 1. 启动服务端:asyncio.run(start_auth_server())# 2. 合法 Token 客户端:asyncio.run(auth_client(valid_token=True))# 3. 非法 Token 客户端:asyncio.run(auth_client(valid_token=False))

3.7 进阶 API:服务端广播(多客户端通信)

WebSocket 最典型的场景是多客户端实时通信(如聊天室),核心是维护客户端连接池,实现消息广播:

示例:简易聊天室(广播+多客户端)
importasyncioimportwebsocketsfromtypingimportSet# 维护所有活跃的客户端连接active_connections:Set[websockets.WebSocketServerProtocol]=set()asyncdefchat_handler(websocket,path):# 1. 新客户端连接:加入连接池active_connections.add(websocket)client_addr=websocket.remote_addressprint(f"[{client_addr}] 加入聊天室,当前在线:{len(active_connections)}")try:# 2. 接收客户端消息并广播asyncformessageinwebsocket:broadcast_msg=f"[{client_addr}]{message}"print(f"广播消息:{broadcast_msg}")# 遍历所有客户端,发送广播消息forconninactive_connections:ifconn!=websocket:# 不发给自己awaitconn.send(broadcast_msg)exceptwebsockets.exceptions.ConnectionClosed:print(f"[{client_addr}] 异常断开")finally:# 3. 客户端断开:从连接池移除active_connections.remove(websocket)print(f"[{client_addr}] 离开聊天室,当前在线:{len(active_connections)}")asyncdefstart_chat_server():asyncwithwebsockets.serve(chat_handler,"localhost",8771):print("聊天室服务端已启动:ws://localhost:8771")awaitasyncio.Future()# ---------------------- 聊天室客户端 ----------------------asyncdefchat_client(username:str):asyncwithwebsockets.connect("ws://localhost:8771")aswebsocket:# 启动接收消息的任务(后台运行,实时接收广播)recv_task=asyncio.create_task(recv_chat_msg(websocket))# 控制台输入消息并发送whileTrue:msg=input(f"{username}> ")ifmsg.lower()=="exit":breakawaitwebsocket.send(f"{username}{msg}")# 取消接收任务recv_task.cancel()awaitwebsocket.close()asyncdefrecv_chat_msg(websocket):"""后台接收广播消息"""whileTrue:try:msg=awaitwebsocket.recv()print(f"\n收到广播:{msg}")print("> ",end="",flush=True)# 恢复输入提示符exceptwebsockets.exceptions.ConnectionClosed:break# 运行:# 1. 启动聊天室服务端:asyncio.run(start_chat_server())# 2. 启动多个客户端(不同终端运行):# asyncio.run(chat_client("用户1"))# asyncio.run(chat_client("用户2"))

四、实战案例:串联所有常用 API 的实时监控系统

场景需求

实现一个简易的实时监控系统:

  1. 服务端:
    • 认证客户端(Token 验证);
    • 接收客户端上报的监控数据(二进制/文本);
    • 广播监控数据给所有在线客户端;
    • 心跳检测,清理离线客户端;
    • 完整异常处理。
  2. 客户端:
    • 携带 Token 连接服务端;
    • 定期上报监控数据(CPU/内存使用率,模拟);
    • 实时接收服务端广播的所有客户端监控数据;
    • 心跳检测,断连自动重连。

完整代码实现

4.1 服务端代码(monitor_server.py)
importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection,ConnectionClosedfromtypingimportSet,Dictfromdatetimeimportdatetime# 配置VALID_TOKENS={"monitor_token_2025"}# 合法认证 TokenSERVER_HOST="localhost"SERVER_PORT=8772HEARTBEAT_INTERVAL=10# 服务端心跳检测间隔(秒)CLIENT_TIMEOUT=30# 客户端超时时间(秒)# 客户端连接池:{连接对象: {"addr": 地址, "last_heartbeat": 最后心跳时间}}client_pool:Dict[websockets.WebSocketServerProtocol,dict]={}# ---------------------- 认证逻辑 ----------------------asyncdefauth_middleware(websocket,path):token=websocket.request_headers.get("X-Monitor-Token")ifnottokenortokennotinVALID_TOKENS:raiseRejectConnection(401,"Invalid Monitor Token")# 认证通过,记录客户端信息client_pool[websocket]={"addr":websocket.remote_address,"last_heartbeat":datetime.now()}print(f"[{datetime.now()}] 客户端{websocket.remote_address}认证通过,加入监控")# 进入业务处理awaitmonitor_handler(websocket,path)# ---------------------- 心跳检测任务 ----------------------asyncdefserver_heartbeat_check():"""后台定时清理离线客户端"""whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)now=datetime.now()# 遍历连接池,检测超时客户端forconn,infoinlist(client_pool.items()):if(now-info["last_heartbeat"]).total_seconds()>CLIENT_TIMEOUT:print(f"[{datetime.now()}] 客户端{info['addr']}心跳超时,强制断开")awaitconn.close(code=1001,reason="Heartbeat timeout")delclient_pool[conn]# ---------------------- 业务处理 ----------------------asyncdefmonitor_handler(websocket,path):try:asyncformessageinwebsocket:# 更新最后心跳时间(无论消息类型,视为存活)client_pool[websocket]["last_heartbeat"]=datetime.now()# 处理监控数据ifisinstance(message,str):# 文本消息:心跳/控制指令ifmessage=="ping":awaitwebsocket.send("pong")print(f"[{datetime.now()}] 客户端{websocket.remote_address}心跳响应")else:print(f"[{datetime.now()}] 客户端{websocket.remote_address}文本监控数据:{message}")elifisinstance(message,bytes):# 二进制消息:模拟监控数据(如序列化的指标)print(f"[{datetime.now()}] 客户端{websocket.remote_address}二进制监控数据(长度:{len(message)})")# 广播监控数据给所有客户端broadcast_msg=f"[{websocket.remote_address}]{message[:50]}..."# 截断长消息forconninclient_pool:ifconn!=websocket:awaitconn.send(broadcast_msg)exceptConnectionClosedase:print(f"[{datetime.now()}] 客户端{websocket.remote_address}断开(码:{e.code},原因:{e.reason})")finally:# 清理连接池ifwebsocketinclient_pool:delclient_pool[websocket]print(f"[{datetime.now()}] 客户端{websocket.remote_address}已移除,当前在线:{len(client_pool)}")# ---------------------- 启动服务端 ----------------------asyncdefstart_monitor_server():# 启动心跳检测后台任务asyncio.create_task(server_heartbeat_check())# 启动 WebSocket 服务asyncwithwebsockets.serve(auth_middleware,SERVER_HOST,SERVER_PORT):print(f"[{datetime.now()}] 监控服务端已启动:ws://{SERVER_HOST}:{SERVER_PORT}")awaitasyncio.Future()if__name__=="__main__":asyncio.run(start_monitor_server())
4.2 客户端代码(monitor_client.py)
importasyncioimportwebsocketsfromwebsockets.exceptionsimportConnectionRefusedError,ConnectionClosedimportrandomfromdatetimeimportdatetimeimporttime# 配置SERVER_URL="ws://localhost:8772"AUTH_TOKEN="monitor_token_2025"# 合法 TokenREPORT_INTERVAL=5# 监控数据上报间隔(秒)HEARTBEAT_INTERVAL=8# 客户端心跳间隔(秒)RECONNECT_INTERVAL=3# 断连重连间隔(秒)# ---------------------- 心跳任务 ----------------------asyncdefclient_heartbeat(websocket):"""客户端心跳:定期发送 ping"""whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)ifwebsocket.closed:breaktry:awaitwebsocket.send("ping")resp=awaitasyncio.wait_for(websocket.recv(),timeout=3)ifresp=="pong":print(f"[{datetime.now()}] 心跳正常")else:raiseException("非 pong 响应")exceptasyncio.TimeoutError:print(f"[{datetime.now()}] 心跳超时,准备重连")awaitwebsocket.close()breakexceptExceptionase:print(f"[{datetime.now()}] 心跳异常:{e}")break# ---------------------- 接收广播任务 ----------------------asyncdefrecv_broadcast(websocket):"""接收服务端广播的监控数据"""whileTrue:try:msg=awaitwebsocket.recv()print(f"\n[{datetime.now()}] 收到监控广播:{msg}")exceptConnectionClosed:break# ---------------------- 上报监控数据 ----------------------asyncdefreport_monitor_data(websocket):"""模拟上报 CPU/内存使用率(文本+二进制)"""whileTrue:awaitasyncio.sleep(REPORT_INTERVAL)ifwebsocket.closed:break# 模拟监控数据cpu_usage=random.uniform(10.0,80.0)mem_usage=random.uniform(20.0,90.0)# 1. 发送文本监控数据text_data=f"CPU:{cpu_usage:.2f}%, MEM:{mem_usage:.2f}%"awaitwebsocket.send(text_data)print(f"[{datetime.now()}] 上报文本监控数据:{text_data}")# 2. 发送二进制监控数据(模拟序列化的指标)binary_data=f"{cpu_usage:.2f}|{mem_usage:.2f}".encode("utf-8")awaitwebsocket.send(binary_data)print(f"[{datetime.now()}] 上报二进制监控数据:{binary_data}")# ---------------------- 客户端主逻辑(支持重连) ----------------------asyncdefmonitor_client():whileTrue:try:# 建立连接(携带认证 Token)asyncwithwebsockets.connect(SERVER_URL,extra_headers={"X-Monitor-Token":AUTH_TOKEN})aswebsocket:print(f"[{datetime.now()}] 成功连接监控服务端")# 启动后台任务heartbeat_task=asyncio.create_task(client_heartbeat(websocket))broadcast_task=asyncio.create_task(recv_broadcast(websocket))report_task=asyncio.create_task(report_monitor_data(websocket))# 等待任务完成(或连接断开)awaitasyncio.gather(heartbeat_task,broadcast_task,report_task)exceptConnectionRefusedError:print(f"[{datetime.now()}] 连接被拒绝(服务端未启动/Token 错误)")exceptConnectionClosed:print(f"[{datetime.now()}] 连接断开")exceptExceptionase:print(f"[{datetime.now()}] 客户端异常:{e}")# 断连后重连print(f"[{datetime.now()}]{RECONNECT_INTERVAL}秒后尝试重连...")awaitasyncio.sleep(RECONNECT_INTERVAL)if__name__=="__main__":asyncio.run(monitor_client())

运行说明

  1. 启动服务端:python monitor_server.py
  2. 启动多个客户端(不同终端):python monitor_client.py
  3. 观察效果:
    • 客户端自动认证连接;
    • 定期上报文本/二进制监控数据;
    • 服务端广播数据给所有客户端;
    • 心跳检测,超时自动清理客户端;
    • 客户端断连后自动重连。

五、核心 API 总结与最佳实践

5.1 核心 API 速查表

类别API 名称/方法作用
服务端websockets.serve(handler, host, port)创建 WebSocket 服务端
客户端websockets.connect(uri, **kwargs)连接 WebSocket 服务端
消息收发await websocket.send(msg)发送文本/二进制消息
await websocket.recv()接收消息(单次)
async for msg in websocket持续监听消息(推荐)
连接管理websocket.closed判断连接是否关闭
await websocket.close(code, reason)主动关闭连接(code 遵循 WebSocket 标准码)
websocket.request_headers获取客户端请求头(服务端)
认证RejectConnection(status_code, reason)服务端拒绝连接(认证失败)
心跳await websocket.ping()/pong()内置 ping/pong 心跳(底层)
异常ConnectionClosed连接关闭异常
InvalidURI无效 URI 异常
TimeoutError连接/收发超时异常

5.2 最佳实践

  1. 连接池管理:服务端维护客户端连接池时,使用set/dict存储连接,并在finally块中清理,避免内存泄漏;
  2. 心跳必加:长连接场景必须实现心跳检测,防止闲置连接被网关/服务器断开;
  3. 异常全覆盖:至少捕获ConnectionClosedConnectionRefusedErrorTimeoutError等核心异常;
  4. 重连机制:客户端实现断连自动重连,提升鲁棒性;
  5. 并发控制:服务端广播时,若客户端数量多,使用asyncio.Semaphore限制并发发送,避免事件循环阻塞;
  6. 认证安全:认证 Token 避免明文传输(生产环境建议使用 WSS(WebSocket Secure),即wss://);
  7. 消息大小限制:生产环境可通过max_size参数限制消息大小,防止超大消息攻击:
    # 服务端限制单条消息最大 1MBwebsockets.serve(handler,host,port,max_size=1024*1024)# 客户端限制单条消息最大 1MBwebsockets.connect(uri,max_size=1024*1024)

六、总结

本文覆盖了websockets库的所有核心常用 API:

  • 基础:服务端创建、客户端连接、文本/二进制消息收发;
  • 进阶:连接管理、异常处理、心跳检测、身份认证、服务端广播;
  • 实战:通过实时监控系统串联所有 API,实现认证、心跳、数据上报、广播、重连等完整功能。

掌握这些知识点后,你可以轻松构建各类实时通信场景(聊天室、实时监控、推送服务、在线游戏等),同时遵循最佳实践确保系统的稳定性和安全性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/19 12:46:05

云原生安全:Falco 容器运行时监控

随着云原生技术的飞速发展,容器化部署已成为企业应用交付的主流方式。但容器的轻量级、动态化特性也带来了全新的安全挑战——传统的主机级安全工具难以适配容器的隔离环境,而容器镜像漏洞、运行时权限滥用、逃逸攻击等风险时刻威胁着业务安全。在众多云…

作者头像 李华
网站建设 2026/4/23 11:28:11

LobeChat网络安全等级保护方案

LobeChat网络安全等级保护方案 在企业加速推进数字化转型的今天,AI聊天系统正逐步从“锦上添花”的辅助工具演变为业务流程中的关键交互节点。尤其是在金融、政务、医疗等高敏感领域,一个看似简单的对话界面背后,可能涉及用户身份信息、内部…

作者头像 李华
网站建设 2026/4/23 14:12:50

EmotiVoice资源占用优化:在普通GPU上流畅运行

EmotiVoice资源占用优化:在普通GPU上流畅运行 在一台搭载RTX 3060、显存仅12GB的笔记本电脑上,能否实时生成带有情感色彩的定制化语音?对于许多开发者而言,这曾是一个奢望。高端语音合成模型动辄需要A100级别的算力支持&#xff0…

作者头像 李华
网站建设 2026/4/23 11:30:48

语音合成+大模型?EmotiVoice与LLM融合应用设想

语音合成与大模型的融合:让AI“有情有感”地说话 在智能助手越来越常见的今天,我们早已习惯了用手机发问:“明天会下雨吗?”“帮我设个闹钟”。但有没有觉得,这些回答虽然准确,却总少了点温度?就…

作者头像 李华
网站建设 2026/4/23 11:34:39

EmotiVoice语音合成在远程办公会议中的辅助作用

EmotiVoice语音合成在远程办公会议中的辅助作用 在一场跨时区的线上会议中,三位团队成员分别身处北京、柏林和旧金山。会议结束后,一位因时差问题未能参会的同事收到了一封邮件:“您有一条新的语音纪要,请点击播放。”按下按钮后&…

作者头像 李华