news 2026/5/15 17:59:21

Flask + WebSocket 实战:从零构建实时聊天应用(附完整代码)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flask + WebSocket 实战:从零构建实时聊天应用(附完整代码)

1. 为什么选择Flask+WebSocket做实时聊天应用

实时聊天应用的核心需求是双向即时通信,传统HTTP协议每次请求都需要重新建立连接,就像每次打电话都要重新拨号一样低效。而WebSocket就像保持通话状态的对讲机,建立连接后双方可以随时发送数据。实测下来,用Python的Flask框架配合WebSocket技术栈,30分钟就能搭建出可用的聊天服务。

我最初尝试用轮询(Polling)实现消息推送,发现服务器压力大且延迟高。后来切换到WebSocket方案,资源消耗直接降低80%。Flask作为轻量级框架,配合Flask-SocketIO这个神器,既保留了Python的简洁语法,又能处理高并发连接。去年做智能家居项目时,就用这套方案实现了设备状态实时同步。

2. 环境搭建与基础配置

2.1 准备Python环境

推荐使用Python 3.8+版本,太新的版本可能会遇到依赖冲突。先用虚拟环境隔离项目:

python -m venv chat_env source chat_env/bin/activate # Linux/Mac chat_env\Scripts\activate # Windows

安装核心依赖时要注意版本兼容性,这是我验证过的稳定组合:

pip install flask==2.0.3 flask-socketio==5.3.4 python-engineio==4.3.4 python-socketio==5.7.2

2.2 最小化Flask应用结构

创建基础项目目录:

/chat_app ├── app.py # 主程序 ├── static/ # 静态资源 │ └── js/app.js # 前端逻辑 └── templates/ # HTML模板 └── index.html

在app.py中初始化应用:

from flask import Flask, render_template from flask_socketio import SocketIO app = Flask(__name__) app.config['SECRET_KEY'] = 'your_secret_key_here' # 生产环境要用更安全的密钥 socketio = SocketIO(app, cors_allowed_origins="*") # 允许跨域 @app.route('/') def index(): return render_template('index.html') if __name__ == '__main__': socketio.run(app, debug=True)

3. 实现双向通信核心逻辑

3.1 服务端事件处理

Flask-SocketIO通过事件驱动模型工作,主要处理三类事件:

  1. 连接事件:客户端建立连接时触发
  2. 自定义事件:前端主动发起的交互
  3. 断开事件:连接关闭时清理资源
@socketio.on('connect') def handle_connect(): print(f'客户端 {request.sid} 已连接') emit('server_response', {'data': '连接成功'}) @socketio.on('disconnect') def handle_disconnect(): print(f'客户端 {request.sid} 已断开') @socketio.on('client_message') def handle_message(data): print(f"收到消息: {data}") emit('server_broadcast', {'user': data['user'], 'msg': data['msg']}, broadcast=True)

3.2 前端交互实现

在static/js/app.js中建立WebSocket连接:

const socket = io(); // 连接成功回调 socket.on('connect', () => { console.log('已连接到服务器'); }); // 接收广播消息 socket.on('server_broadcast', (data) => { const chatBox = document.getElementById('chat-box'); chatBox.innerHTML += `<p><b>${data.user}:</b> ${data.msg}</p>`; }); // 发送消息函数 function sendMessage() { const user = document.getElementById('user').value; const msg = document.getElementById('message').value; socket.emit('client_message', {user, msg}); }

对应的HTML模板示例:

<div id="chat-box" style="height:300px;overflow-y:scroll"></div> <input type="text" id="user" placeholder="你的名字"> <input type="text" id="message" placeholder="输入消息"> <button onclick="sendMessage()">发送</button> <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.4.1/socket.io.min.js"></script> <script src="{{ url_for('static', filename='js/app.js') }}"></script>

4. 生产环境部署要点

4.1 性能优化配置

开发时用内置服务器没问题,但生产环境需要调整:

socketio.run(app, host='0.0.0.0', port=5000, debug=False, allow_unsafe_werkzeug=True, keyfile='key.pem', # HTTPS证书 certfile='cert.pem')

建议配合Gunicorn+Eventlet提升并发能力:

pip install gunicorn eventlet gunicorn -k eventlet -w 4 app:app

4.2 常见问题解决方案

消息延迟问题
开启WebSocket压缩减少传输量:

socketio = SocketIO(app, engineio_logger=True, compression_threshold=1024)

跨域问题
除了设置cors_allowed_origins,还需要处理预检请求:

@app.after_request def add_cors_headers(response): response.headers.add('Access-Control-Allow-Headers', 'Content-Type') return response

Nginx配置要点
在/etc/nginx/sites-available/your_site中添加:

location /socket.io { proxy_pass http://127.0.0.1:5000; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; }

5. 功能扩展实战

5.1 实现用户在线列表

服务端维护连接字典:

active_users = {} @socketio.on('register') def handle_register(username): active_users[request.sid] = username emit('user_update', list(active_users.values()), broadcast=True)

前端实时显示:

socket.on('user_update', (users) => { const userList = document.getElementById('user-list'); userList.innerHTML = users.map(u => `<li>${u}</li>`).join(''); });

5.2 消息持久化存储

集成SQLAlchemy保存聊天记录:

from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy(app) class Message(db.Model): id = db.Column(db.Integer, primary_key=True) user = db.Column(db.String(80)) content = db.Column(db.Text) timestamp = db.Column(db.DateTime, default=datetime.utcnow) @socketio.on('client_message') def handle_message(data): msg = Message(user=data['user'], content=data['msg']) db.session.add(msg) db.session.commit()

6. 完整代码示例

服务端最终版本:

# app.py from datetime import datetime from flask import Flask, render_template, request from flask_socketio import SocketIO, emit from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///chat.db' db = SQLAlchemy(app) socketio = SocketIO(app, cors_allowed_origins="*") class Message(db.Model): id = db.Column(db.Integer, primary_key=True) user = db.Column(db.String(80)) content = db.Column(db.Text) timestamp = db.Column(db.DateTime, default=datetime.utcnow) active_users = {} @app.route('/') def index(): return render_template('index.html') @socketio.on('connect') def handle_connect(): emit('server_response', {'data': 'Connected'}) @socketio.on('register') def handle_register(username): active_users[request.sid] = username emit('user_update', list(active_users.values()), broadcast=True) @socketio.on('client_message') def handle_message(data): msg = Message(user=data['user'], content=data['msg']) db.session.add(msg) db.session.commit() emit('server_broadcast', data, broadcast=True) @socketio.on('disconnect') def handle_disconnect(): if request.sid in active_users: del active_users[request.sid] emit('user_update', list(active_users.values()), broadcast=True) if __name__ == '__main__': with app.app_context(): db.create_all() socketio.run(app, debug=True)

前端完整实现:

<!-- templates/index.html --> <!DOCTYPE html> <html> <head> <title>实时聊天室</title> <style> #chat-box { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; } #user-list { float: right; width: 150px; border: 1px solid #eee; padding: 10px; } </style> </head> <body> <div id="user-list"> <h3>在线用户</h3> <ul id="users"></ul> </div> <div id="chat-box"></div> <input type="text" id="username" placeholder="设置昵称" required> <input type="text" id="message" placeholder="输入消息"> <button onclick="sendMessage()">发送</button> <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.4.1/socket.io.min.js"></script> <script> const socket = io(); let currentUser = ''; socket.on('connect', () => { const username = prompt('请输入昵称') || '匿名用户'; document.getElementById('username').value = username; currentUser = username; socket.emit('register', username); }); socket.on('server_broadcast', (data) => { const chatBox = document.getElementById('chat-box'); const time = new Date().toLocaleTimeString(); chatBox.innerHTML += `<p>[${time}] <b>${data.user}:</b> ${data.msg}</p>`; chatBox.scrollTop = chatBox.scrollHeight; }); socket.on('user_update', (users) => { document.getElementById('users').innerHTML = users.map(u => `<li>${u}</li>`).join(''); }); function sendMessage() { const msg = document.getElementById('message').value; if (msg.trim()) { socket.emit('client_message', { user: currentUser, msg: msg }); document.getElementById('message').value = ''; } } document.getElementById('message').addEventListener('keypress', (e) => { if (e.key === 'Enter') sendMessage(); }); </script> </body> </html>

7. 进阶优化方向

消息已读回执
在消息对象中添加read_by字段,当客户端收到消息后发送确认事件:

@socketio.on('message_read') def handle_read(msg_id): msg = Message.query.get(msg_id) if msg: msg.read_by = current_user db.session.commit()

离线消息处理
用户重连时检查未读消息:

@socketio.on('reconnect') def handle_reconnect(): unread = Message.query.filter( Message.timestamp > last_online_time, Message.user != current_user ).all() for msg in unread: emit('new_message', {'id': msg.id, 'content': msg.content})

消息加密传输
使用PyNaCl进行端到端加密:

from nacl.secret import SecretBox key = b'32-byte-key-for-AES256-encryption' box = SecretBox(key) encrypted = box.encrypt(b"secret message") plaintext = box.decrypt(encrypted)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/15 17:57:03

基于MediaPipe与Two.js的手势交互项目Clawspace开发实践

1. 项目概述与核心价值最近在GitHub上看到一个挺有意思的项目&#xff0c;叫nickytonline/clawspace。乍一看这个名字&#xff0c;可能会有点摸不着头脑&#xff0c;“Claw Space”直译是“爪子空间”&#xff0c;听起来像是个游戏或者某种创意工具。点进去深入研究后&#xff…

作者头像 李华
网站建设 2026/5/15 17:55:47

UBX-M8030-KA-DR,支持3D传感器直接连接的M8 ADR芯片

简介今天我要向大家介绍的是 u-blox 的高性能GNSS芯片——UBX-M8030-KA-DR。这是一款专为满足最新交互式导航系统和显示需求而设计的第四代汽车航位推算&#xff08;ADR&#xff09;芯片。该芯片基于u-blox M8 72通道GNSS引擎&#xff0c;支持并发接收GPS/QZSS、GLONASS、BeiDo…

作者头像 李华
网站建设 2026/5/15 17:55:17

告别代码调试:用Roboflow一站式搞定数据集图像增强与格式转换

1. 为什么你需要Roboflow处理数据集&#xff1f; 做计算机视觉项目时&#xff0c;最头疼的往往不是模型调参&#xff0c;而是数据准备阶段。我去年做工业质检项目时就深有体会——好不容易从产线收集了200张缺陷图片&#xff0c;训练出来的模型却总是过拟合。导师一句话点醒我&…

作者头像 李华
网站建设 2026/5/15 17:54:05

从数字到实体:5步掌握Cura 3D打印切片软件,让创意触手可及

从数字到实体&#xff1a;5步掌握Cura 3D打印切片软件&#xff0c;让创意触手可及 【免费下载链接】Cura 3D printer / slicing GUI built on top of the Uranium framework 项目地址: https://gitcode.com/gh_mirrors/cu/Cura 想要将你的3D设计变成真实的物理对象吗&am…

作者头像 李华
网站建设 2026/5/15 17:53:09

高速网络调试:应对Gb/s级数据洪流的观测与诊断方法论

1. 项目概述&#xff1a;当连接速度不再是瓶颈“调试速度高达几个Gb每秒的连接”&#xff0c;这个标题听起来像是一个纯粹的性能炫耀&#xff0c;或者是一个遥不可及的实验室场景。但如果你在云计算、金融高频交易、大规模数据中心运维、视频流媒体平台或者大型在线游戏的后台工…

作者头像 李华