← 返回Web开发目录
← 返回学习笔记首页
WebSocket实时通信
Web开发专题 · 掌握Web实时通信技术
专题: Python Web开发系统学习
关键词: Python, Web开发, WebSocket, 实时通信, SocketIO, Django Channels, 聊天室, 服务器推送
一、WebSocket概述
1.1 什么是WebSocket
WebSocket是HTML5引入的一种全双工通信协议,它通过在客户端和服务器之间建立持久连接,实现了双向实时数据传输。与传统的HTTP请求-响应模式不同,WebSocket连接建立后,服务器可以主动向客户端推送消息,客户端也可以随时向服务器发送消息,整个过程只需要一次握手。
WebSocket协议在2011年被IETF标准化为RFC 6455,随后被所有主流浏览器支持。它使用ws://(非加密)和wss://(加密)作为URI方案,底层依赖于TCP协议。WebSocket的设计目标是为Web应用提供一种高效、低开销的实时通信方式,弥补HTTP协议在实时交互场景下的不足。
1.2 WebSocket与HTTP的核心区别
对比维度
HTTP协议
WebSocket协议
通信模式
请求-响应(半双工)
全双工
连接方式
短连接,每次请求建立新连接
长连接,握手后保持连接
服务器推送
不支持(需轮询或SSE)
原生支持
协议开销
请求头较大(cookies等)
控制帧开销仅2字节
实时性
取决于轮询间隔
毫秒级延迟
跨域支持
CORS机制
同源策略宽松
1.3 适用场景
WebSocket广泛应用于以下实时性要求高的场景:
在线聊天 :即时消息应用,如微信网页版、Slack、Discord等
实时通知 :邮件通知、系统告警、订单提醒等推送服务
协作文档 :Google Docs、腾讯文档等多人在线编辑
股票行情 :实时报价、K线图更新、交易状态推送
在线游戏 :实时对战、状态同步、玩家位置更新
IoT数据 :传感器实时数据采集和监控
视频直播 :弹幕系统、礼物特效、实时互动
WebSocket的诞生解决了Web实时通信的痛点——在它之前,开发者只能通过轮询(Polling)、长轮询(Long Polling)或服务器发送事件(SSE)来模拟实时通信,但这些方案都有各自的局限性和资源浪费。
二、WebSocket协议
2.1 握手过程
WebSocket连接的建立始于一次特殊的HTTP升级请求。客户端发送带有Upgrade头的HTTP请求,服务端验证后返回101状态码,完成协议切换。握手成功后,通信从HTTP协议无缝切换到WebSocket协议。
客户端请求示例:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com
服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
握手过程中,Sec-WebSocket-Key是一个随机Base64编码的16字节值,服务器将其与固定GUID拼接后计算SHA-1哈希,再Base64编码得到Sec-WebSocket-Accept。这个机制确保只有理解WebSocket协议的服务器才能成功建立连接,防止非WebSocket服务器误处理。
2.2 数据帧格式
WebSocket数据传输以帧(Frame)为单位,帧格式设计简洁高效。一个数据帧包含以下几个关键字段:
FIN (1位):标记是否为消息的最后一帧,用于消息分片
Opcode (4位):帧类型,如文本帧(0x1)、二进制帧(0x2)、关闭帧(0x8)、Ping(0x9)、Pong(0xA)
Mask (1位):客户端发送的数据必须掩码,服务端发送的数据不掩码
Payload Length (7/7+16/7+64位):数据长度编码,采用可变长度方案
Masking Key (32位,仅当Mask=1时存在):用于数据掩码的密钥
Payload Data :实际传输的数据内容
帧类型的Opcode值决定了帧的用途:0x1表示UTF-8文本数据,0x2表示二进制数据,0x8表示关闭连接,0x9表示Ping心跳检测,0xA表示Pong响应。
2.3 关闭连接
WebSocket连接的关闭由关闭帧 (Close Frame,Opcode 0x8)控制。任意一方可以主动发起关闭,发送关闭帧后可附带一个2字节的状态码(如1000表示正常关闭)和可选的解释信息。对方收到关闭帧后应回复一个关闭帧,随后TCP连接关闭。关闭状态码帮助诊断连接中断原因:
1000 :正常关闭
1001 :端点离开(如服务器关闭、页面跳转)
1002 :协议错误
1003 :不支持的数据类型
1009 :消息过大
1011 :服务端异常
2.4 心跳检测(Ping/Pong)
WebSocket协议内置了心跳检测机制。任一端可以发送Ping帧(Opcode 0x9),接收方必须回复Pong帧(Opcode 0xA)。Ping/Pong帧常用于:
检测连接是否存活,及早发现断开的连接
防止中间网络设备(如路由器、防火墙)因空闲超时而断开连接
测量网络延迟(RTT)
在实际应用中,服务器通常每隔一定时间(如30秒)发送Ping帧,如果客户端在超时时间内未回复Pong,则认为连接已断开并执行清理。大多数WebSocket框架(如Flask-SocketIO、Django Channels)都内置了心跳机制,开发者只需配置间隔时间即可。
三、Flask-SocketIO
3.1 安装与配置
Flask-SocketIO是基于Flask框架的WebSocket扩展,底层使用Socket.IO协议(支持WebSocket、轮询等多种传输方式)。安装简单,一行命令即可完成:
pip install flask-socketio
# 可选:使用Redis作为消息队列
pip install redis
3.2 服务端事件处理
Flask-SocketIO使用事件驱动模型,服务器通过装饰器注册事件处理函数:
from flask import Flask
from flask_socketio import SocketIO, send, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('message') # 处理客户端发来的消息
def handle_message(msg):
print(f'收到消息: {msg}')
send(msg, broadcast=True) # 广播给所有客户端
@socketio.on('custom_event') # 自定义事件
def handle_custom(data):
print(f'收到自定义数据: {data}')
emit('response', {'status': 'ok'}, room=data.get('room'))
if __name__ == '__main__':
socketio.run(app, debug=True, port=5000)
关键API说明:
socketio.on('event_name'):注册事件处理器
send(msg):发送消息(默认事件为message)
emit('event', data):发送自定义事件
broadcast=True:广播给所有客户端
room='room_name':发送到指定房间
3.3 客户端连接
Socket.IO的客户端使用JavaScript库,连接方式简洁:
<!-- 引入Socket.IO客户端库 -->
<script src="https://cdn.socket.io/4.5.0/socket.io.min.js"></script>
<script>
const socket = io('http://localhost:5000');
socket.on('connect', () => {
console.log('已连接到服务器,ID:', socket.id);
});
socket.on('message', (data) => {
console.log('收到消息:', data);
});
socket.on('response', (data) => {
console.log('自定义事件响应:', data);
});
// 发送消息
socket.send('Hello WebSocket!');
// 发送自定义事件
socket.emit('custom_event', {room: 'general', text: 'Hello!'});
socket.on('disconnect', () => {
console.log('连接已断开');
});
</script>
Socket.IO客户端会自动尝试多种传输方式(WebSocket优先),并在连接中断时自动重连,大大简化了开发者的工作量。
3.4 房间和命名空间
房间(Room)和命名空间(Namespace)是Socket.IO的核心概念,用于消息路由和隔离。房间是逻辑分组,客户端可以加入或离开房间,消息可以发送到特定房间内的所有客户端。命名空间提供更高层次的隔离,不同命名空间下的房间互不干扰。
@socketio.on('join')
def on_join(data):
username = data['username']
room = data['room']
join_room(room) # 加入房间
emit('status', f'{username} 已加入房间 {room}', room=room)
@socketio.on('leave')
def on_leave(data):
username = data['username']
room = data['room']
leave_room(room) # 离开房间
emit('status', f'{username} 已离开房间 {room}', room=room)
@socketio.on('room_message')
def room_message(data):
# 发送消息到指定房间
emit('message', data['msg'], room=data['room'])
3.5 广播消息
广播是实时应用中最常见的需求之一。Flask-SocketIO提供了多种广播方式:
全局广播 :emit('event', data, broadcast=True) 发送给所有连接的客户端
房间广播 :emit('event', data, room='room_name') 发送给房间内所有客户端
排除发送者 :emit('event', data, broadcast=True, include_self=False) 不包含发送者自身
命名空间广播 :emit('event', data, namespace='/custom') 在指定命名空间广播
四、Django Channels
4.1 Channels概念
Django Channels是Django官方推出的WebSocket扩展,它将Django从传统的同步HTTP框架扩展为支持WebSocket、HTTP2、MQTT等多种协议的异步框架。Channels的核心思想是引入ASGI (Asynchronous Server Gateway Interface),作为WSGI的异步替代方案。ASGI支持异步处理,使得Django能够同时处理HTTP请求和WebSocket连接。
安装Django Channels需要同时安装ASGI服务器:
pip install channels
pip install channels_redis # Redis作为频道层后端
pip install uvicorn # ASGI服务器
4.2 消费者(Consumer)
Consumer是Channels中最核心的概念,相当于WebSocket视图函数。它定义了连接建立、断开和消息处理的行为。Consumer支持同步和异步两种模式:
# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
# 从URL路由获取房间名
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
# 加入房间组
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# 离开房间组
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
# 接收客户端消息并广播
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)
async def chat_message(self, event):
# 发送消息给WebSocket
message = event['message']
await self.send(text_data=json.dumps({
'message': message
}))
4.3 路由配置
Channels使用ASGI路由 将WebSocket连接分发到对应的Consumer。需要创建路由文件和配置ASGI应用入口:
# routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
# asgi.py (项目主目录)
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(
# 导入WebSocket路由
from chat import routing
routing.websocket_urlpatterns
)
),
})
在settings.py中还需要添加channels应用并配置频道层:
INSTALLED_APPS = [
...
'channels',
]
ASGI_APPLICATION = 'myproject.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [('127.0.0.1', 6379)],
},
},
}
4.4 WebSocket认证
Django Channels提供了多种认证方式。最常用的是AuthMiddlewareStack ,它会自动将Django用户认证信息注入到scope中,使Consumer内能通过self.scope['user']访问当前用户。对于Token认证或自定义认证,可以编写自定义中间件:
# 自定义Token认证中间件
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from rest_framework.authtoken.models import Token
class TokenAuthMiddleware(BaseMiddleware):
async def __call__(self, scope, receive, send):
query_string = scope.get('query_string', b'').decode()
token_key = None
for param in query_string.split('&'):
if param.startswith('token='):
token_key = param.split('=')[1]
break
if token_key:
try:
token = await database_sync_to_async(
Token.objects.get
)(key=token_key)
scope['user'] = token.user
except Token.DoesNotExist:
pass
return await super().__call__(scope, receive, send)
4.5 频道层(Redis)
频道层(Channel Layer)是Channels分布式通信的核心。它允许多个Django进程(甚至跨服务器)之间共享消息。Redis是最常用的频道层后端,支持生产者-消费者模式的消息传递。当某个Consumer发送群组消息时,消息先发送到Redis,然后由Redis分发给群组内所有Consumer实例,无论它们运行在哪个进程或服务器上。
频道层的核心API:
group_add(group, channel):将频道加入群组
group_discard(group, channel):将频道从群组移除
group_send(group, message):向群组内所有频道发送消息
send(channel, message):向单个频道发送消息
五、FastAPI WebSocket
5.1 @app.websocket()装饰器
FastAPI原生支持WebSocket,不需要额外安装扩展。使用@app.websocket()装饰器即可定义WebSocket端点。FastAPI的WebSocket支持异步操作,完美融合了Starlette的底层能力。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f'服务器回显: {data}')
except WebSocketDisconnect:
print('客户端已断开连接')
FastAPI支持三种消息接收方式:
receive_text():接收文本消息
receive_bytes():接收二进制消息
receive_json():接收并自动解析JSON消息
对应的发送方法:send_text()、send_bytes()、send_json()。
5.2 依赖注入支持
FastAPI的依赖注入系统同样适用于WebSocket,可以用来共享数据库连接、用户认证、配置管理等:
from fastapi import Depends, WebSocket, Cookie, Query
async def get_token(
websocket: WebSocket,
token: str = Query(...)
):
if token != 'secret-token':
await websocket.close(code=4001)
return None
return token
@app.websocket('/ws/auth')
async def authenticated_websocket(
websocket: WebSocket,
token: str = Depends(get_token)
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f'认证用户消息: {data}')
依赖注入使得认证、日志、数据库会话等横切关注点与业务逻辑解耦,代码更清晰、更可测试。
5.3 连接管理
在生产环境中,管理大量WebSocket连接是常见需求。FastAPI中通常使用一个全局的连接管理器来跟踪所有活跃连接:
from typing import List
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
manager = ConnectionManager()
@app.websocket('/ws/chat')
async def chat(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f'用户: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast('用户已离开')
5.4 WebSocket测试
FastAPI提供了内置的TestClient用于测试WebSocket端点。测试WebSocket时需要使用with语句管理连接生命周期:
from fastapi.testclient import TestClient
def test_websocket():
client = TestClient(app)
with client.websocket_connect('/ws') as websocket:
websocket.send_text('Hello FastAPI!')
data = websocket.receive_text()
assert data == '服务器回显: Hello FastAPI!'
def test_chat_room():
client = TestClient(app)
with client.websocket_connect('/ws/chat') as ws1:
with client.websocket_connect('/ws/chat') as ws2:
ws1.send_text('大家好')
# ws2应该收到广播消息
msg_to_ws2 = ws2.receive_text()
assert '大家好' in msg_to_ws2
六、前端WebSocket
6.1 WebSocket对象创建
浏览器提供了原生WebSocket API,无需额外库即可使用。创建WebSocket连接只需实例化WebSocket对象并传入URL:
// 创建WebSocket连接
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/chat/`;
const socket = new WebSocket(wsUrl);
URL格式使用ws://或wss://前缀,后者对应加密的WebSocket连接(类似HTTPS)。为了安全,生产环境应始终使用wss://。
6.2 事件处理
WebSocket对象提供了四个核心事件,覆盖连接生命周期:
// onopen:连接建立时触发
socket.onopen = function(event) {
console.log('WebSocket连接已建立');
// 可以在这里发送初始消息
socket.send(JSON.stringify({
type: 'join',
username: '用户A'
}));
};
// onmessage:收到消息时触发
socket.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('收到消息:', data);
// 更新UI
displayMessage(data);
};
// onclose:连接关闭时触发
socket.onclose = function(event) {
console.log('WebSocket连接已关闭 代码:', event.code);
// 可以在这里触发重连
if (event.code !== 1000) {
reconnect();
}
};
// onerror:发生错误时触发
socket.onerror = function(error) {
console.error('WebSocket错误:', error);
};
6.3 send()发送消息
send()方法用于向服务器发送数据,支持文本、二进制等多种格式:
// 发送文本消息
socket.send('Hello Server!');
// 发送JSON对象
socket.send(JSON.stringify({
type: 'chat_message',
content: '你好!',
timestamp: Date.now()
}));
// 发送二进制数据(Blob)
const blob = new Blob(['二进制数据'], {type: 'text/plain'});
socket.send(blob);
// 发送二进制数据(ArrayBuffer)
const buffer = new ArrayBuffer(4);
const view = new Uint32Array(buffer);
view[0] = 12345;
socket.send(buffer);
注意:send()方法必须在连接建立后(即onopen触发后)调用,否则会抛出错误。建议在onopen回调中确认连接状态后再发送消息。
6.4 重连机制
WebSocket连接可能因网络波动、服务器重启等原因断开,实现自动重连机制对用户体验至关重要。以下是封装了重连逻辑的WebSocket客户端示例:
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.maxRetries = options.maxRetries || 10;
this.reconnectInterval = options.reconnectInterval || 3000;
this.retryCount = 0;
this.listeners = {};
this.connect();
}
connect() {
this.socket = new WebSocket(this.url);
this.socket.onopen = (event) => {
console.log('连接成功');
this.retryCount = 0;
this.emit('open', event);
};
this.socket.onmessage = (event) => {
this.emit('message', event);
};
this.socket.onclose = (event) => {
console.log('连接断开,代码:', event.code);
if (this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = Math.min(
this.reconnectInterval * Math.pow(1.5, this.retryCount - 1),
30000
);
console.log(`将在${delay}ms后重连 (第${this.retryCount}次)`);
setTimeout(() => this.connect(), delay);
} else {
this.emit('failed', event);
}
};
this.socket.onerror = (error) => {
console.error('连接错误:', error);
};
}
send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(data);
}
}
on(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = [];
}
this.listeners[event].push(callback);
}
emit(event, data) {
if (this.listeners[event]) {
this.listeners[event].forEach(cb => cb(data));
}
}
close() {
this.maxRetries = 0;
if (this.socket) {
this.socket.close();
}
}
}
// 使用示例
const ws = new ReconnectingWebSocket('ws://localhost:8000/ws/chat/');
ws.on('open', () => console.log('已连接'));
ws.on('message', (event) => {
const data = JSON.parse(event.data);
console.log('聊天消息:', data);
});
ws.send(JSON.stringify({type: 'chat', content: '大家好!'}));
上述重连机制使用了指数退避 策略,重连间隔从3秒开始,每次翻倍,最大30秒,避免在网络不稳定时对服务器造成过大压力。
6.5 与后端配合
前端WebSocket与后端配合时,需要注意协议一致性、数据格式约定和错误处理:
协议一致性 :前后端必须使用相同的消息格式(如JSON结构),通常包含type和payload字段
数据序列化 :前端使用JSON.stringify()序列化,后端使用json.loads()反序列化
心跳同步 :前后端的心跳间隔需要一致,通常由服务器主导,客户端响应
异常处理 :前端捕获所有WebSocket错误,显示友好的错误提示,同时触发重连
连接状态管理 :在UI上显示连接状态(已连接/已断开/重连中),提升用户体验
七、WebSocket应用实践
7.1 聊天室实现
聊天室是WebSocket最经典的应用。一个完整的聊天室系统包括以下功能模块:
用户管理 :登录认证、在线状态、头像显示
房间管理 :创建房间、加入/离开房间、房间列表
消息处理 :文本消息、表情、图片、文件传输
消息持久化 :保存聊天记录到数据库,支持历史消息查询
消息送达确认 :已读/未读状态、消息回执
使用FastAPI实现聊天室核心逻辑:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import Dict, Set
import json
app = FastAPI()
class ChatRoom:
def __init__(self):
self.rooms: Dict[str, Set[WebSocket]] = {}
async def join(self, room_name: str, websocket: WebSocket):
await websocket.accept()
if room_name not in self.rooms:
self.rooms[room_name] = set()
self.rooms[room_name].add(websocket)
await self.broadcast(room_name, {
'type': 'system',
'message': '新用户加入了聊天室'
})
async def leave(self, room_name: str, websocket: WebSocket):
self.rooms[room_name].discard(websocket)
if not self.rooms[room_name]:
del self.rooms[room_name]
await self.broadcast(room_name, {
'type': 'system',
'message': '用户离开了聊天室'
})
async def broadcast(self, room_name: str, message: dict):
if room_name in self.rooms:
for ws in self.rooms[room_name]:
try:
await ws.send_json(message)
except Exception:
pass
chat_room = ChatRoom()
@app.websocket('/ws/chat/{room_name}')
async def chat_endpoint(websocket: WebSocket, room_name: str):
await chat_room.join(room_name, websocket)
try:
while True:
data = await websocket.receive_json()
await chat_room.broadcast(room_name, {
'type': 'message',
'content': data['content'],
'sender': data.get('sender', '匿名'),
'timestamp': data.get('timestamp')
})
except WebSocketDisconnect:
await chat_room.leave(room_name, websocket)
7.2 实时通知系统
实时通知系统在企业应用中非常普遍,适用于订单提醒、审批流通知、系统告警等场景。实现实时通知系统的关键要素:
用户WebSocket映射 :维护用户ID到WebSocket连接的映射关系,支持一人多端
通知分级 :按重要性区分通知级别(普通/重要/紧急),不同级别展示方式不同
离线通知 :用户离线时通知存入数据库,上线后拉取未读通知
通知去重 :避免重复发送相同通知,使用通知ID进行去重
from typing import Dict, List
from fastapi import WebSocket
class NotificationSystem:
def __init__(self):
# 用户WebSocket连接映射
self.user_connections: Dict[int, List[WebSocket]] = {}
async def connect_user(self, user_id: int, websocket: WebSocket):
await websocket.accept()
if user_id not in self.user_connections:
self.user_connections[user_id] = []
self.user_connections[user_id].append(websocket)
# 发送未读通知
unread = await self.get_unread_notifications(user_id)
if unread:
await websocket.send_json({
'type': 'unread_notifications',
'notifications': unread
})
async def send_notification(self, user_id: int, notification: dict):
if user_id in self.user_connections:
for ws in self.user_connections[user_id]:
try:
await ws.send_json({
'type': 'notification',
'notification': notification
})
except Exception:
self.user_connections[user_id].remove(ws)
async def broadcast_to_all(self, notification: dict):
for user_id, connections in self.user_connections.items():
for ws in connections:
try:
await ws.send_json(notification)
except Exception:
pass
async def get_unread_notifications(self, user_id: int):
# 从数据库查询未读通知
return []
7.3 在线人数统计
显示当前在线用户数是实时应用的基础功能。实现方案通常基于连接计数或心跳检测:
import time
from collections import defaultdict
class OnlineCounter:
def __init__(self, timeout=60):
self.connections = {} # websocket_id -> last_heartbeat
self.timeout = timeout
def on_connect(self, ws_id: str):
self.connections[ws_id] = time.time()
def on_heartbeat(self, ws_id: str):
self.connections[ws_id] = time.time()
def on_disconnect(self, ws_id: str):
self.connections.pop(ws_id, None)
def get_online_count(self) -> int:
now = time.time()
# 清理超时连接
expired = [ws_id for ws_id, last in self.connections.items()
if now - last > self.timeout]
for ws_id in expired:
self.connections.pop(ws_id, None)
return len(self.connections)
在线人数统计的优化策略:
心跳去重 :同一用户多个设备只计一次在线
定期清理 :定时清理超时未心跳的连接
广播频率控制 :人数变化后延迟几秒再广播,避免频繁推送
使用Redis :在分布式部署中使用Redis的SET或HyperLogLog存储在线用户
7.4 消息队列集成
在高并发场景下,直接通过WebSocket处理所有消息可能导致服务器压力过大。引入消息队列(如RabbitMQ、Redis Pub/Sub、Kafka)可以解耦消息生产和消费,提升系统弹性和可扩展性:
import asyncio
import json
import aioredis
from fastapi import FastAPI, WebSocket
app = FastAPI()
class RedisPubSubManager:
def __init__(self, redis_url='redis://localhost:6379'):
self.redis = None
self.pubsub = None
self.redis_url = redis_url
async def connect(self):
self.redis = await aioredis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()
async def publish(self, channel: str, message: dict):
await self.redis.publish(channel, json.dumps(message))
async def subscribe(self, channel: str):
await self.pubsub.subscribe(channel)
async def listen(self):
async for message in self.pubsub.listen():
if message['type'] == 'message':
yield json.loads(message['data'])
async def close(self):
await self.pubsub.unsubscribe()
await self.redis.close()
# 全局消息管理器
pubsub = RedisPubSubManager()
@app.on_event('startup')
async def startup():
await pubsub.connect()
@app.websocket('/ws/channel/{channel_name}')
async def channel_websocket(websocket: WebSocket, channel_name: str):
await websocket.accept()
await pubsub.subscribe(channel_name)
async def redis_listener():
async for message in pubsub.listen():
try:
await websocket.send_json(message)
except Exception:
break
# 启动后台任务监听Redis消息
listener_task = asyncio.create_task(redis_listener())
try:
while True:
data = await websocket.receive_json()
# 将前端消息发布到Redis频道
await pubsub.publish(channel_name, data)
except Exception:
listener_task.cancel()
finally:
await pubsub.close()
集成消息队列的好处:
解耦 :WebSocket服务与业务逻辑分离,各自独立部署和扩展
削峰填谷 :消息队列缓存突发流量,防止WebSocket服务过载
可靠投递 :消息队列确保消息不丢失,支持重试机制
横向扩展 :多个WebSocket服务器实例通过消息队列共享消息,实现跨服务器广播
异步处理 :处理耗时任务(如消息审核、数据分析)时,先返回响应再异步处理
核心要点总结: WebSocket是Web实时通信的基石协议,Flask-SocketIO、Django Channels和FastAPI各有优势。Flask-SocketIO上手简单、功能全面;Django Channels适合大型项目,与Django ORM无缝集成;FastAPI原生WebSocket性能优异、依赖注入优雅。实际项目中应根据技术栈和规模选择合适方案。前端需重点关注重连机制和状态管理,后端需考虑连接管理、心跳检测、消息队列集成。聊天室、实时通知、在线人数统计是WebSocket的三大经典应用场景。