大家好,我是jobleap.cn的小九。
Python 的websockets库是基于asyncio实现的异步 WebSocket 客户端/服务端框架,遵循 RFC 6455 标准,支持全双工通信、心跳检测、认证等核心特性,是构建实时通信场景(如聊天室、实时监控、推送服务)的首选工具。本文将系统讲解websockets的所有常用 API,并通过实战案例串联知识点,帮助你掌握异步 WebSocket 开发的全流程。
一、环境准备:安装依赖
websockets仅支持 Python 3.7+,且依赖asyncio(Python 3.7+ 内置),安装命令如下:
# 安装最新稳定版pipinstallwebsockets# 验证安装python -c"import websockets; print(websockets.__version__)"二、核心概念与基础架构
WebSocket 是基于 TCP 的全双工通信协议,核心特点:
- 一次握手,持久连接(无需反复建立 HTTP 连接);
- 服务端/客户端可主动推送消息;
- 支持文本、二进制消息传输。
websockets核心 API 分为两大模块:
- 服务端:
websockets.serve()(创建服务端)、websockets.WebSocketServerProtocol(连接对象); - 客户端:
websockets.connect()(创建客户端)、websockets.WebSocketClientProtocol(连接对象); - 通用:消息收发(
send()/recv())、连接关闭(close())、状态检测等。
三、基础常用 API 详解
3.1 最简示例:Echo 服务(服务端+客户端)
实现“客户端发什么,服务端就返回什么”的基础功能,覆盖核心 API:serve()、connect()、recv()、send()。
3.1.1 服务端代码
importasyncioimportwebsockets# 核心处理函数:每个客户端连接会触发该函数asyncdefecho_handler(websocket,path):""" websocket: 客户端连接对象(WebSocketServerProtocol 实例) path: 客户端请求的路径(如 ws://localhost:8765/chat) """try:# 循环接收客户端消息(全双工通信,持续监听)asyncformessageinwebsocket:print(f"服务端收到:{message}")# 发送消息给客户端(echo 逻辑)awaitwebsocket.send(f"Echo:{message}")finally:print(f"客户端{websocket.remote_address}断开连接")# 启动服务端asyncdefstart_echo_server():# 核心 API:websockets.serve() 创建服务端# 参数:处理函数、绑定地址、端口asyncwithwebsockets.serve(echo_handler,"localhost",8765):print("Echo 服务端已启动:ws://localhost:8765")# 挂起事件循环,保持服务运行awaitasyncio.Future()# 无限运行# 运行服务端if__name__=="__main__":asyncio.run(start_echo_server())3.1.2 客户端代码
importasyncioimportwebsocketsasyncdefecho_client():# 核心 API:websockets.connect() 连接服务端asyncwithwebsockets.connect("ws://localhost:8765")aswebsocket:# 发送文本消息(核心 API:send())awaitwebsocket.send("Hello WebSocket!")# 接收消息(核心 API:recv())response=awaitwebsocket.recv()print(f"客户端收到:{response}")# 主动发送多条消息foriinrange(2):msg=f"测试消息{i+1}"awaitwebsocket.send(msg)res=awaitwebsocket.recv()print(f"客户端收到:{res}")# 运行客户端(需先启动服务端)if__name__=="__main__":asyncio.run(echo_client())3.2 消息类型:文本与二进制消息
WebSocket 支持文本(str)和二进制(bytes)两种消息类型,websockets原生支持这两种格式的收发:
3.2.1 服务端(支持二进制消息)
importasyncioimportwebsocketsasyncdefbinary_handler(websocket,path):asyncformessageinwebsocket:# 判断消息类型ifisinstance(message,str):print(f"文本消息:{message}")awaitwebsocket.send(f"文本响应:{message}")elifisinstance(message,bytes):print(f"二进制消息(长度:{len(message)}):{message}")# 二进制消息响应(比如返回原字节+后缀)awaitwebsocket.send(message+b"_response")asyncdefstart_binary_server():asyncwithwebsockets.serve(binary_handler,"localhost",8766):print("二进制消息服务端已启动:ws://localhost:8766")awaitasyncio.Future()# asyncio.run(start_binary_server())3.2.2 客户端(发送二进制消息)
importasyncioimportwebsocketsasyncdefbinary_client():asyncwithwebsockets.connect("ws://localhost:8766")aswebsocket:# 发送文本消息awaitwebsocket.send("这是文本消息")text_resp=awaitwebsocket.recv()print(f"文本响应:{text_resp}")# 发送二进制消息(比如图片字节、自定义二进制数据)binary_data=b"hello_binary"awaitwebsocket.send(binary_data)binary_resp=awaitwebsocket.recv()print(f"二进制响应:{binary_resp}")# asyncio.run(binary_client())3.3 连接管理:关闭连接与状态检测
websockets提供连接状态检测、主动关闭连接的 API,核心属性/方法:
websocket.closed:判断连接是否关闭(布尔值);websocket.close(code=1000, reason=""):主动关闭连接(code 遵循 WebSocket 标准码);websocket.close_code/websocket.close_reason:获取关闭码/原因。
示例:主动关闭连接与状态检测
importasyncioimportwebsocketsasyncdefclose_handler(websocket,path):# 接收客户端消息,触发关闭逻辑message=awaitwebsocket.recv()ifmessage=="close":# 主动关闭连接(标准关闭码 1000:正常关闭)awaitwebsocket.close(code=1000,reason="客户端请求关闭")print(f"连接已关闭(码:{websocket.close_code},原因:{websocket.close_reason})")else:awaitwebsocket.send(f"收到:{message}")asyncdefstart_close_server():asyncwithwebsockets.serve(close_handler,"localhost",8767):print("关闭连接测试服务端已启动:ws://localhost:8767")awaitasyncio.Future()asyncdefclose_client():asyncwithwebsockets.connect("ws://localhost:8767")aswebsocket:# 检测初始状态print(f"初始连接状态:{'关闭'ifwebsocket.closedelse'开启'}")# 发送关闭指令awaitwebsocket.send("close")# 等待连接关闭(可选)awaitasyncio.sleep(0.1)print(f"发送关闭指令后状态:{'关闭'ifwebsocket.closedelse'开启'}")# 先启动服务端,再运行客户端# asyncio.run(start_close_server())# asyncio.run(close_client())3.4 异常处理:捕获 WebSocket 相关异常
websockets定义了多种异常类型,核心异常包括:
websockets.exceptions.ConnectionClosed:连接已关闭;websockets.exceptions.ConnectionClosedError:连接异常关闭;websockets.exceptions.InvalidURI:无效的 WebSocket 地址;websockets.exceptions.TimeoutError:连接/收发超时。
示例:完整异常处理
importasyncioimportwebsocketsfromwebsockets.exceptionsimport(ConnectionClosed,InvalidURI,TimeoutError)asyncdefexception_handler(websocket,path):try:asyncformessageinwebsocket:print(f"收到消息:{message}")awaitwebsocket.send(f"响应:{message}")exceptConnectionClosedase:print(f"连接关闭异常(码:{e.code},原因:{e.reason})")exceptExceptionase:print(f"未知异常:{e}")asyncdefstart_exception_server():asyncwithwebsockets.serve(exception_handler,"localhost",8768):print("异常处理服务端已启动:ws://localhost:8768")awaitasyncio.Future()asyncdefexception_client():try:# 测试1:无效 URI(故意写错)# async with websockets.connect("ws://localhost:8768/invalid") as websocket:# 测试2:正常连接后强制断开asyncwithwebsockets.connect("ws://localhost:8768",timeout=5)aswebsocket:awaitwebsocket.send("测试消息")# 模拟服务端断开后继续发送awaitasyncio.sleep(1)awaitwebsocket.send("断开后发送的消息")# 触发 ConnectionClosedexceptInvalidURIase:print(f"错误:无效的 URI -{e}")exceptTimeoutErrorase:print(f"错误:连接超时 -{e}")exceptConnectionClosedase:print(f"错误:连接已关闭 -{e.code}|{e.reason}")exceptExceptionase:print(f"通用错误:{e}")# asyncio.run(start_exception_server())# asyncio.run(exception_client())3.5 进阶 API:心跳检测(防止连接断开)
WebSocket 长连接可能因网络闲置被网关/服务器断开,心跳检测是核心解决方案:客户端/服务端定期发送心跳包(如ping),对方回复pong确认连接存活。
websockets内置ping()/pong()方法,也可自定义心跳逻辑:
示例:带心跳检测的客户端+服务端
importasyncioimportwebsocketsfromdatetimeimportdatetime# ---------------------- 服务端 ----------------------asyncdefheartbeat_server_handler(websocket,path):# 启动心跳检测任务(后台运行)heartbeat_task=asyncio.create_task(server_heartbeat(websocket))try:asyncformessageinwebsocket:ifmessage=="ping":# 响应心跳包awaitwebsocket.send("pong")print(f"[{datetime.now()}] 服务端收到心跳,回复 pong")else:print(f"[{datetime.now()}] 服务端收到消息:{message}")awaitwebsocket.send(f"响应:{message}")finally:# 取消心跳任务heartbeat_task.cancel()print(f"[{datetime.now()}] 客户端断开,心跳任务已取消")asyncdefserver_heartbeat(websocket):"""服务端心跳检测:定期检查连接状态"""whileTrue:awaitasyncio.sleep(10)# 每10秒检测一次ifwebsocket.closed:break# 主动发送 ping(可选,也可等客户端发)awaitwebsocket.ping()print(f"[{datetime.now()}] 服务端发送 ping 检测")asyncdefstart_heartbeat_server():asyncwithwebsockets.serve(heartbeat_server_handler,"localhost",8769):print("心跳检测服务端已启动:ws://localhost:8769")awaitasyncio.Future()# ---------------------- 客户端 ----------------------asyncdefclient_heartbeat(websocket):"""客户端心跳任务:每5秒发送一次 ping"""whileTrue:awaitasyncio.sleep(5)ifwebsocket.closed:breakawaitwebsocket.send("ping")# 等待 pong 响应(超时则断开)try:resp=awaitasyncio.wait_for(websocket.recv(),timeout=3)ifresp=="pong":print(f"[{datetime.now()}] 客户端收到 pong,连接正常")else:raiseException("非心跳响应")exceptasyncio.TimeoutError:print(f"[{datetime.now()}] 心跳超时,关闭连接")awaitwebsocket.close()breakasyncdefheartbeat_client():try:asyncwithwebsockets.connect("ws://localhost:8769")aswebsocket:# 启动客户端心跳任务heartbeat_task=asyncio.create_task(client_heartbeat(websocket))# 发送业务消息awaitwebsocket.send("业务消息1")resp1=awaitwebsocket.recv()print(f"[{datetime.now()}] 客户端收到:{resp1}")# 等待心跳运行(模拟长连接)awaitasyncio.sleep(20)# 取消心跳任务heartbeat_task.cancel()exceptExceptionase:print(f"客户端异常:{e}")# 运行:先启动服务端,再启动客户端# asyncio.run(start_heartbeat_server())# asyncio.run(heartbeat_client())3.6 进阶 API:连接认证(Token/用户名密码)
WebSocket 连接建立时可通过请求头或路径参数做身份认证,websockets支持在连接握手阶段验证:
示例:基于 Token 的认证
importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection# 模拟合法 Token 列表VALID_TOKENS={"token_123","token_456"}# ---------------------- 认证中间件 ----------------------asyncdefauth_middleware(websocket,path):""" 连接握手阶段认证: - 从请求头获取 Token - 验证失败则拒绝连接(RejectConnection) """# 获取请求头中的 Token(websocket.request_headers 是类字典对象)token=websocket.request_headers.get("X-Auth-Token")ifnottokenortokennotinVALID_TOKENS:# 拒绝连接(标准 401 状态码)raiseRejectConnection(401,"Invalid or missing token")# 认证通过,进入业务处理awaitauth_handler(websocket,path)# ---------------------- 业务处理 ----------------------asyncdefauth_handler(websocket,path):print(f"客户端{websocket.remote_address}认证通过,开始通信")asyncformessageinwebsocket:awaitwebsocket.send(f"认证通过的响应:{message}")# ---------------------- 启动认证服务端 ----------------------asyncdefstart_auth_server():asyncwithwebsockets.serve(auth_middleware,"localhost",8770):print("认证服务端已启动:ws://localhost:8770")awaitasyncio.Future()# ---------------------- 认证客户端 ----------------------asyncdefauth_client(valid_token:bool=True):# 构造请求头(携带 Token)headers={}ifvalid_token:headers["X-Auth-Token"]="token_123"# 合法 Tokenelse:headers["X-Auth-Token"]="invalid_token"# 非法 Tokentry:asyncwithwebsockets.connect("ws://localhost:8770",extra_headers=headers# 自定义请求头)aswebsocket:awaitwebsocket.send("认证后的业务消息")resp=awaitwebsocket.recv()print(f"客户端收到:{resp}")exceptwebsockets.exceptions.ConnectionRefusedErrorase:print(f"连接被拒绝:{e}")# 运行:# 1. 启动服务端:asyncio.run(start_auth_server())# 2. 合法 Token 客户端:asyncio.run(auth_client(valid_token=True))# 3. 非法 Token 客户端:asyncio.run(auth_client(valid_token=False))3.7 进阶 API:服务端广播(多客户端通信)
WebSocket 最典型的场景是多客户端实时通信(如聊天室),核心是维护客户端连接池,实现消息广播:
示例:简易聊天室(广播+多客户端)
importasyncioimportwebsocketsfromtypingimportSet# 维护所有活跃的客户端连接active_connections:Set[websockets.WebSocketServerProtocol]=set()asyncdefchat_handler(websocket,path):# 1. 新客户端连接:加入连接池active_connections.add(websocket)client_addr=websocket.remote_addressprint(f"[{client_addr}] 加入聊天室,当前在线:{len(active_connections)}")try:# 2. 接收客户端消息并广播asyncformessageinwebsocket:broadcast_msg=f"[{client_addr}]{message}"print(f"广播消息:{broadcast_msg}")# 遍历所有客户端,发送广播消息forconninactive_connections:ifconn!=websocket:# 不发给自己awaitconn.send(broadcast_msg)exceptwebsockets.exceptions.ConnectionClosed:print(f"[{client_addr}] 异常断开")finally:# 3. 客户端断开:从连接池移除active_connections.remove(websocket)print(f"[{client_addr}] 离开聊天室,当前在线:{len(active_connections)}")asyncdefstart_chat_server():asyncwithwebsockets.serve(chat_handler,"localhost",8771):print("聊天室服务端已启动:ws://localhost:8771")awaitasyncio.Future()# ---------------------- 聊天室客户端 ----------------------asyncdefchat_client(username:str):asyncwithwebsockets.connect("ws://localhost:8771")aswebsocket:# 启动接收消息的任务(后台运行,实时接收广播)recv_task=asyncio.create_task(recv_chat_msg(websocket))# 控制台输入消息并发送whileTrue:msg=input(f"{username}> ")ifmsg.lower()=="exit":breakawaitwebsocket.send(f"{username}:{msg}")# 取消接收任务recv_task.cancel()awaitwebsocket.close()asyncdefrecv_chat_msg(websocket):"""后台接收广播消息"""whileTrue:try:msg=awaitwebsocket.recv()print(f"\n收到广播:{msg}")print("> ",end="",flush=True)# 恢复输入提示符exceptwebsockets.exceptions.ConnectionClosed:break# 运行:# 1. 启动聊天室服务端:asyncio.run(start_chat_server())# 2. 启动多个客户端(不同终端运行):# asyncio.run(chat_client("用户1"))# asyncio.run(chat_client("用户2"))四、实战案例:串联所有常用 API 的实时监控系统
场景需求
实现一个简易的实时监控系统:
- 服务端:
- 认证客户端(Token 验证);
- 接收客户端上报的监控数据(二进制/文本);
- 广播监控数据给所有在线客户端;
- 心跳检测,清理离线客户端;
- 完整异常处理。
- 客户端:
- 携带 Token 连接服务端;
- 定期上报监控数据(CPU/内存使用率,模拟);
- 实时接收服务端广播的所有客户端监控数据;
- 心跳检测,断连自动重连。
完整代码实现
4.1 服务端代码(monitor_server.py)
importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection,ConnectionClosedfromtypingimportSet,Dictfromdatetimeimportdatetime# 配置VALID_TOKENS={"monitor_token_2025"}# 合法认证 TokenSERVER_HOST="localhost"SERVER_PORT=8772HEARTBEAT_INTERVAL=10# 服务端心跳检测间隔(秒)CLIENT_TIMEOUT=30# 客户端超时时间(秒)# 客户端连接池:{连接对象: {"addr": 地址, "last_heartbeat": 最后心跳时间}}client_pool:Dict[websockets.WebSocketServerProtocol,dict]={}# ---------------------- 认证逻辑 ----------------------asyncdefauth_middleware(websocket,path):token=websocket.request_headers.get("X-Monitor-Token")ifnottokenortokennotinVALID_TOKENS:raiseRejectConnection(401,"Invalid Monitor Token")# 认证通过,记录客户端信息client_pool[websocket]={"addr":websocket.remote_address,"last_heartbeat":datetime.now()}print(f"[{datetime.now()}] 客户端{websocket.remote_address}认证通过,加入监控")# 进入业务处理awaitmonitor_handler(websocket,path)# ---------------------- 心跳检测任务 ----------------------asyncdefserver_heartbeat_check():"""后台定时清理离线客户端"""whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)now=datetime.now()# 遍历连接池,检测超时客户端forconn,infoinlist(client_pool.items()):if(now-info["last_heartbeat"]).total_seconds()>CLIENT_TIMEOUT:print(f"[{datetime.now()}] 客户端{info['addr']}心跳超时,强制断开")awaitconn.close(code=1001,reason="Heartbeat timeout")delclient_pool[conn]# ---------------------- 业务处理 ----------------------asyncdefmonitor_handler(websocket,path):try:asyncformessageinwebsocket:# 更新最后心跳时间(无论消息类型,视为存活)client_pool[websocket]["last_heartbeat"]=datetime.now()# 处理监控数据ifisinstance(message,str):# 文本消息:心跳/控制指令ifmessage=="ping":awaitwebsocket.send("pong")print(f"[{datetime.now()}] 客户端{websocket.remote_address}心跳响应")else:print(f"[{datetime.now()}] 客户端{websocket.remote_address}文本监控数据:{message}")elifisinstance(message,bytes):# 二进制消息:模拟监控数据(如序列化的指标)print(f"[{datetime.now()}] 客户端{websocket.remote_address}二进制监控数据(长度:{len(message)})")# 广播监控数据给所有客户端broadcast_msg=f"[{websocket.remote_address}]{message[:50]}..."# 截断长消息forconninclient_pool:ifconn!=websocket:awaitconn.send(broadcast_msg)exceptConnectionClosedase:print(f"[{datetime.now()}] 客户端{websocket.remote_address}断开(码:{e.code},原因:{e.reason})")finally:# 清理连接池ifwebsocketinclient_pool:delclient_pool[websocket]print(f"[{datetime.now()}] 客户端{websocket.remote_address}已移除,当前在线:{len(client_pool)}")# ---------------------- 启动服务端 ----------------------asyncdefstart_monitor_server():# 启动心跳检测后台任务asyncio.create_task(server_heartbeat_check())# 启动 WebSocket 服务asyncwithwebsockets.serve(auth_middleware,SERVER_HOST,SERVER_PORT):print(f"[{datetime.now()}] 监控服务端已启动:ws://{SERVER_HOST}:{SERVER_PORT}")awaitasyncio.Future()if__name__=="__main__":asyncio.run(start_monitor_server())4.2 客户端代码(monitor_client.py)
importasyncioimportwebsocketsfromwebsockets.exceptionsimportConnectionRefusedError,ConnectionClosedimportrandomfromdatetimeimportdatetimeimporttime# 配置SERVER_URL="ws://localhost:8772"AUTH_TOKEN="monitor_token_2025"# 合法 TokenREPORT_INTERVAL=5# 监控数据上报间隔(秒)HEARTBEAT_INTERVAL=8# 客户端心跳间隔(秒)RECONNECT_INTERVAL=3# 断连重连间隔(秒)# ---------------------- 心跳任务 ----------------------asyncdefclient_heartbeat(websocket):"""客户端心跳:定期发送 ping"""whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)ifwebsocket.closed:breaktry:awaitwebsocket.send("ping")resp=awaitasyncio.wait_for(websocket.recv(),timeout=3)ifresp=="pong":print(f"[{datetime.now()}] 心跳正常")else:raiseException("非 pong 响应")exceptasyncio.TimeoutError:print(f"[{datetime.now()}] 心跳超时,准备重连")awaitwebsocket.close()breakexceptExceptionase:print(f"[{datetime.now()}] 心跳异常:{e}")break# ---------------------- 接收广播任务 ----------------------asyncdefrecv_broadcast(websocket):"""接收服务端广播的监控数据"""whileTrue:try:msg=awaitwebsocket.recv()print(f"\n[{datetime.now()}] 收到监控广播:{msg}")exceptConnectionClosed:break# ---------------------- 上报监控数据 ----------------------asyncdefreport_monitor_data(websocket):"""模拟上报 CPU/内存使用率(文本+二进制)"""whileTrue:awaitasyncio.sleep(REPORT_INTERVAL)ifwebsocket.closed:break# 模拟监控数据cpu_usage=random.uniform(10.0,80.0)mem_usage=random.uniform(20.0,90.0)# 1. 发送文本监控数据text_data=f"CPU:{cpu_usage:.2f}%, MEM:{mem_usage:.2f}%"awaitwebsocket.send(text_data)print(f"[{datetime.now()}] 上报文本监控数据:{text_data}")# 2. 发送二进制监控数据(模拟序列化的指标)binary_data=f"{cpu_usage:.2f}|{mem_usage:.2f}".encode("utf-8")awaitwebsocket.send(binary_data)print(f"[{datetime.now()}] 上报二进制监控数据:{binary_data}")# ---------------------- 客户端主逻辑(支持重连) ----------------------asyncdefmonitor_client():whileTrue:try:# 建立连接(携带认证 Token)asyncwithwebsockets.connect(SERVER_URL,extra_headers={"X-Monitor-Token":AUTH_TOKEN})aswebsocket:print(f"[{datetime.now()}] 成功连接监控服务端")# 启动后台任务heartbeat_task=asyncio.create_task(client_heartbeat(websocket))broadcast_task=asyncio.create_task(recv_broadcast(websocket))report_task=asyncio.create_task(report_monitor_data(websocket))# 等待任务完成(或连接断开)awaitasyncio.gather(heartbeat_task,broadcast_task,report_task)exceptConnectionRefusedError:print(f"[{datetime.now()}] 连接被拒绝(服务端未启动/Token 错误)")exceptConnectionClosed:print(f"[{datetime.now()}] 连接断开")exceptExceptionase:print(f"[{datetime.now()}] 客户端异常:{e}")# 断连后重连print(f"[{datetime.now()}]{RECONNECT_INTERVAL}秒后尝试重连...")awaitasyncio.sleep(RECONNECT_INTERVAL)if__name__=="__main__":asyncio.run(monitor_client())运行说明
- 启动服务端:
python monitor_server.py; - 启动多个客户端(不同终端):
python monitor_client.py; - 观察效果:
- 客户端自动认证连接;
- 定期上报文本/二进制监控数据;
- 服务端广播数据给所有客户端;
- 心跳检测,超时自动清理客户端;
- 客户端断连后自动重连。
五、核心 API 总结与最佳实践
5.1 核心 API 速查表
| 类别 | API 名称/方法 | 作用 |
|---|---|---|
| 服务端 | websockets.serve(handler, host, port) | 创建 WebSocket 服务端 |
| 客户端 | websockets.connect(uri, **kwargs) | 连接 WebSocket 服务端 |
| 消息收发 | await websocket.send(msg) | 发送文本/二进制消息 |
await websocket.recv() | 接收消息(单次) | |
async for msg in websocket | 持续监听消息(推荐) | |
| 连接管理 | websocket.closed | 判断连接是否关闭 |
await websocket.close(code, reason) | 主动关闭连接(code 遵循 WebSocket 标准码) | |
websocket.request_headers | 获取客户端请求头(服务端) | |
| 认证 | RejectConnection(status_code, reason) | 服务端拒绝连接(认证失败) |
| 心跳 | await websocket.ping()/pong() | 内置 ping/pong 心跳(底层) |
| 异常 | ConnectionClosed | 连接关闭异常 |
InvalidURI | 无效 URI 异常 | |
TimeoutError | 连接/收发超时异常 |
5.2 最佳实践
- 连接池管理:服务端维护客户端连接池时,使用
set/dict存储连接,并在finally块中清理,避免内存泄漏; - 心跳必加:长连接场景必须实现心跳检测,防止闲置连接被网关/服务器断开;
- 异常全覆盖:至少捕获
ConnectionClosed、ConnectionRefusedError、TimeoutError等核心异常; - 重连机制:客户端实现断连自动重连,提升鲁棒性;
- 并发控制:服务端广播时,若客户端数量多,使用
asyncio.Semaphore限制并发发送,避免事件循环阻塞; - 认证安全:认证 Token 避免明文传输(生产环境建议使用 WSS(WebSocket Secure),即
wss://); - 消息大小限制:生产环境可通过
max_size参数限制消息大小,防止超大消息攻击:# 服务端限制单条消息最大 1MBwebsockets.serve(handler,host,port,max_size=1024*1024)# 客户端限制单条消息最大 1MBwebsockets.connect(uri,max_size=1024*1024)
六、总结
本文覆盖了websockets库的所有核心常用 API:
- 基础:服务端创建、客户端连接、文本/二进制消息收发;
- 进阶:连接管理、异常处理、心跳检测、身份认证、服务端广播;
- 实战:通过实时监控系统串联所有 API,实现认证、心跳、数据上报、广播、重连等完整功能。
掌握这些知识点后,你可以轻松构建各类实时通信场景(聊天室、实时监控、推送服务、在线游戏等),同时遵循最佳实践确保系统的稳定性和安全性。