一、WebSocket概述
1.1 什么是WebSocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议,由RFC 6455定义。它使得客户端和服务器之间可以进行实时、持久的双向数据交换,而不需要像HTTP那样由客户端反复发起请求。WebSocket协议在2011年被IETF标准化,目前已被所有主流浏览器支持。
与传统的HTTP请求-响应模式不同,WebSocket建立连接后,服务器可以主动向客户端推送数据,客户端也可以随时向服务器发送消息。这种通信方式极大地降低了网络延迟和服务器的资源消耗,特别适合需要实时交互的应用场景。
1.2 WebSocket与HTTP的对比
HTTP协议是单向的请求-响应模式。客户端发送请求,服务器返回响应,连接随即关闭(HTTP/1.1的keep-alive虽然可以复用连接,但仍然是请求-响应的模式)。WebSocket则完全不同:一旦握手成功,连接保持打开状态,双方可以随时发送数据,无需重复建立连接。
具体来说,WebSocket相比HTTP有以下优势:持久连接——连接建立后一直保持,无需重复握手;双向推送——服务器可以主动发送数据到客户端;低延迟——省去了HTTP头部和握手开销,数据传输更高效;协议开销小——WebSocket帧头部仅2-14字节,而HTTP请求头部通常有几百到几千字节。
下表总结了HTTP与WebSocket的核心差异:
| 对比维度 | HTTP协议 | WebSocket协议 |
| 通信模式 | 请求-响应(单向) | 全双工(双向) |
| 连接特性 | 无状态,短连接 | 有状态,持久连接 |
| 数据推送 | 不支持服务器推送(需轮询) | 支持服务器主动推送 |
| 协议开销 | 头部开销大 | 帧头部开销极小 |
| 实时性 | 依赖轮询间隔,延迟高 | 真正的实时通信 |
| 适用场景 | REST API、网页资源加载 | 聊天、游戏、实时数据 |
1.3 适用场景
WebSocket的实时双向通信特性使其在以下场景中得到广泛应用:
- 即时聊天/消息推送:用户之间或系统向用户推送实时消息,如微信网页版、Slack等。
- 实时通知:系统事件发生时(如订单状态变更、告警触发),服务器主动推送通知到客户端。
- 协同编辑:多人同时编辑同一份文档,通过WebSocket同步操作和状态,如Google Docs。
- 在线游戏:多人在线游戏需要低延迟的实时数据交换,如棋牌游戏、竞技游戏。
- 实时数据可视化:股票行情、物联网传感器数据、服务器监控指标的实时更新展示。
- 直播互动:弹幕、点赞、打赏等实时互动功能。
二、FastAPI WebSocket基础
2.1 @app.websocket()装饰器
FastAPI提供了对WebSocket的原生支持,通过@app.websocket()装饰器可以轻松定义WebSocket端点。这个装饰器用法与@app.get()、@app.post()类似,但专门用于处理WebSocket连接。FastAPI内部依赖Starlette的WebSocket实现,提供了完善的类型注解和自动验证功能。
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"收到: {data}")
2.2 WebSocket端点定义
定义一个WebSocket端点需要传入WebSocket类型的参数。FastAPI会自动注入WebSocket实例,开发者无需手动创建。端点函数通常是异步的(async def),因为WebSocket操作本质上是I/O密集型的,异步处理可以最大化并发性能。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
@app.websocket("/chat/{room_id}")
async def chat_endpoint(websocket: WebSocket, room_id: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
# 处理业务逻辑…
except WebSocketDisconnect:
# 处理客户端断开连接的情况
pass
2.3 核心API方法
FastAPI的WebSocket对象提供了一组简洁而强大的API方法,涵盖了WebSocket连接的完整生命周期管理。
websocket.accept():接受WebSocket连接请求。必须在接收或发送任何数据之前调用此方法,否则连接将处于挂起状态。accept()方法还可以接受subprotocol参数,用于选择子协议。
websocket.receive_text():接收客户端发送的文本消息。这是一个协程方法,当没有消息时会阻塞等待。返回一个字符串。如果要接收JSON数据,可以使用receive_json(),它会自动解析JSON字符串为Python字典。
websocket.send_text():向客户端发送文本消息。参数为字符串。对应地,send_json()可以发送JSON数据,它会自动将Python对象序列化为JSON字符串。send_bytes()则用于发送二进制数据。
websocket.close():主动关闭WebSocket连接。可以传入code和reason参数,指定关闭状态码和原因。
from fastapi import FastAPI, WebSocket
import json
app = FastAPI()
@app.websocket("/data")
async def data_endpoint(websocket: WebSocket):
await websocket.accept()
# 接收JSON数据
data = await websocket.receive_json()
print(f"收到数据: {data}")
# 发送JSON响应
await websocket.send_json({
"status": "success",
"received": data
})
# 关闭连接
await websocket.close(code=1000, reason="正常关闭")
2.4 客户端连接示例
在浏览器端,可以使用JavaScript原生的WebSocket API来连接FastAPI的WebSocket端点。
// 创建WebSocket连接
const ws = new WebSocket("ws://localhost:8000/ws");
// 连接打开时触发
ws.onopen = function(event) {
console.log("连接已建立");
ws.send("Hello Server!");
};
// 收到消息时触发
ws.onmessage = function(event) {
console.log("收到服务器消息:", event.data);
};
// 连接关闭时触发
ws.onclose = function(event) {
console.log("连接已关闭,code:", event.code);
};
// 发生错误时触发
ws.onerror = function(event) {
console.error("WebSocket错误:", event);
};
三、WebSocket高级应用
3.1 连接管理
在实际应用中,通常需要管理多个WebSocket连接,以便实现广播、房间、群组等功能。常见的做法是使用一个全局集合来存储活跃的连接对象。由于WebSocket连接涉及并发访问,推荐使用线程安全的容器(如Python的set)或使用锁来保护共享数据。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Set
app = FastAPI()
# 存储所有活跃连接
active_connections: Set[WebSocket] = set()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.add(websocket)
try:
while True:
data = await websocket.receive_text()
# 处理消息…
except WebSocketDisconnect:
active_connections.discard(websocket)
3.2 广播消息
广播是将一条消息发送给所有已连接的客户端。这在聊天室、公告推送等场景中非常常见。实现广播时需要注意异常处理:某些连接可能已经断开但还没有从连接集合中移除,发送消息时可能会抛出异常。
async def broadcast(message: str):
"""向所有活跃连接广播消息"""
disconnected = set()
for connection in active_connections:
try:
await connection.send_text(message)
except Exception:
disconnected.add(connection)
# 清理已断开的连接
active_connections -= disconnected
@app.websocket("/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
active_connections.add(websocket)
try:
while True:
data = await websocket.receive_text()
await broadcast(f"用户: {data}")
except WebSocketDisconnect:
active_connections.discard(websocket)
await broadcast("用户离开了聊天室")
3.3 依赖注入在WebSocket中
FastAPI的依赖注入系统同样适用于WebSocket端点。通过Depends(),我们可以将数据库连接、用户认证、配置对象等注入到WebSocket处理函数中。这在需要验证用户身份或访问共享资源时非常有用。
from fastapi import FastAPI, WebSocket, Depends, WebSocketDisconnect
from fastapi.security import APIKeyHeader
async def verify_token(websocket: WebSocket, token: str = None):
"""验证WebSocket连接token的依赖"""
if token != "my-secret-token":
await websocket.close(code=1008)
raise WebSocketDisconnect()
return {"user": "authenticated-user"}
@app.websocket("/secure/ws")
async def secure_endpoint(
websocket: WebSocket,
user: dict = Depends(verify_token)
):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"已验证用户: {data}")
3.4 断开重连机制
在实际生产环境中,网络波动可能导致WebSocket连接断开。客户端需要实现断开重连逻辑,以确保连接的可靠性。重连策略通常包括:固定间隔重连、指数退避重连(每次重连间隔递增,避免雪崩效应)、随机抖动(在间隔中增加随机值,防止大量客户端同时重连)。
// 带有指数退避的断线重连实现
function connectWebSocket(url, maxRetries = 10) {
let retryCount = 0;
let ws = null;
function connect() {
ws = new WebSocket(url);
ws.onopen = function() {
console.log("连接成功");
retryCount = 0; // 重置重试计数
};
ws.onclose = function(event) {
if (event.code !== 1000) { // 非正常关闭
retryCount++;
if (retryCount <= maxRetries) {
// 指数退避: 1s, 2s, 4s, 8s, ...
const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
// 添加随机抖动,避免同时重连
const jitter = Math.random() * 1000;
setTimeout(connect, delay + jitter);
}
}
};
}
connect();
return ws;
}
3.5 WebSocket握手参数
WebSocket握手阶段可以传递查询参数、Cookie和自定义头部,用于身份验证和初始化配置。在FastAPI中,可以通过WebSocket对象的属性和方法来获取这些信息。
from fastapi import FastAPI, WebSocket, Cookie, Query
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Query(...), # 从查询参数获取
username: str = "anonymous", # 可选的查询参数
):
# 验证握手参数
if not token:
await websocket.close(code=1008)
return
await websocket.accept()
await websocket.send_text(f"欢迎, {username}!")
# ... 后续处理
四、FastAPI中间件
4.1 HTTP中间件
中间件是处理每个请求的钩子函数,可以在请求到达路由处理函数之前或响应返回客户端之前执行自定义逻辑。FastAPI中使用@app.middleware("http")装饰器注册HTTP中间件。中间件函数接收Request和call_next参数,call_next是一个可调用对象,用于将请求传递给下一个中间件或路由处理函数。
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
4.2 WebSocket中间件
FastAPI也支持为WebSocket端点注册中间件。WebSocket中间件与HTTP中间件类似,但参数是WebSocket对象而非Request对象。WebSocket中间件可以用于日志记录、连接计数、IP黑名单检查等场景。
from fastapi import FastAPI, WebSocket
import logging
logger = logging.getLogger("websocket")
@app.middleware("websocket")
async def ws_logging_middleware(websocket: WebSocket, call_next):
logger.info(f"WebSocket连接: {websocket.client}")
try:
await call_next(websocket)
except Exception as e:
logger.error(f"WebSocket错误: {e}")
raise
finally:
logger.info(f"WebSocket断开: {websocket.client}")
4.3 请求处理时间统计
统计每个请求的处理时间是中间件的典型应用场景。通过记录请求进入和响应离开的时间戳,可以计算出请求的总耗时。这些数据可以用于性能监控、SLA保障和瓶颈分析。
from fastapi import FastAPI, Request
import time
import logging
logger = logging.getLogger("performance")
@app.middleware("http")
async def performance_monitor(request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
logger.info(
f"{request.method} {request.url.path} - "
f"{response.status_code} - {duration:.4f}s"
)
# 慢查询告警(超过1秒)
if duration > 1.0:
logger.warning(f"慢请求检测: {request.url.path} ({duration:.2f}s)")
return response
4.4 CORS中间件配置
跨域资源共享(CORS)是Web开发中必须处理的问题。FastAPI提供了CORSMiddleware中间件,可以方便地配置允许的来源、方法、头部等。在生产环境中,应该将allow_origins设置为具体的域名列表,而不是使用通配符"*"。
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://example.com",
"https://www.example.com",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
4.5 自定义中间件
除了使用内置中间件,我们还可以创建自定义中间件实现特定的业务逻辑。常见的自定义中间件包括:IP白名单/黑名单过滤、请求速率限制、请求/响应日志记录、自定义安全头部注入、请求体大小限制等。
from fastapi import FastAPI, Request, HTTPException
import re
app = FastAPI()
# 自定义IP白名单中间件
ALLOWED_IPS = ["127.0.0.1", "192.168.1."]
@app.middleware("http")
async def ip_whitelist(request: Request, call_next):
client_ip = request.client.host
# 检查IP是否在白名单中
allowed = any(
client_ip.startswith(allowed)
for allowed in ALLOWED_IPS
)
if not allowed:
raise HTTPException(status_code=403, detail="IP未被授权")
return await call_next(request)
五、后台任务
5.1 BackgroundTasks简介
FastAPI的BackgroundTasks允许在处理请求后执行一些后台操作,而无需等待这些操作完成再返回响应。这对于发送邮件、生成报告、处理上传文件等耗时操作非常有用。BackgroundTasks是FastAPI内置的功能,不需要额外安装依赖。
5.2 add_task()添加后台任务
使用BackgroundTasks非常简单:在路由函数的参数中声明BackgroundTasks类型的参数,FastAPI会自动注入实例。然后调用tasks.add_task()将任务函数添加到队列中。任务函数可以是同步或异步的,但必须是可调用对象。
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def send_email(email: str, message: str):
"""模拟发送邮件(后台执行)"""
import time
time.sleep(3) # 模拟耗时操作
print(f"发送邮件到 {email}: {message}")
@app.post("/register")
async def register(username: str, tasks: BackgroundTasks):
# 立即返回响应
tasks.add_task(send_email, "user@example.com", f"欢迎{username}!")
return {"message": "注册成功,邮件将在后台发送"}
5.3 异步后台任务
BackgroundTasks也支持异步函数。使用异步后台任务时需要注意:执行顺序将在事件循环中进行调度,不会阻塞其他请求的处理。异步任务适合数据库写入、API调用等I/O密集型操作。
from fastapi import FastAPI, BackgroundTasks
import aiohttp
app = FastAPI()
async def notify_external_service(user_id: int):
"""异步通知外部服务"""
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.example.com/notify",
json={"user_id": user_id}
) as resp:
return await resp.json()
@app.post("/users/{user_id}/activate")
async def activate_user(user_id: int, tasks: BackgroundTasks):
tasks.add_task(notify_external_service, user_id)
return {"message": "用户激活中,通知将在后台发送"}
5.4 任务执行时机
BackgroundTasks的执行时机是在响应发送之后。这意味着即使后台任务抛出异常,也不会影响已经发送给客户端的响应。但这也意味着后台任务的错误处理需要由开发者自行负责。建议在后台任务函数内部使用try/except捕获所有异常,并记录日志。
需要注意的是:BackgroundTasks适用于轻量级的后台处理,不适合CPU密集型任务。对于CPU密集型或长时间运行的任务,应该使用Celery、RQ等专门的任务队列系统。
5.5 依赖注入中的后台任务
后台任务也可以与FastAPI的依赖注入系统结合使用。在依赖函数中声明BackgroundTasks参数,可以注册需要在请求处理完成后执行的任务。这使得后台任务可以在多个路由之间共享和复用。
from fastapi import FastAPI, Depends, BackgroundTasks
app = FastAPI()
def log_request(background_tasks: BackgroundTasks, request_path: str):
def write_log(path: str):
with open("access.log", "a") as f:
f.write(f"{path}\n")
background_tasks.add_task(write_log, request_path)
@app.get("/items/")
async def read_items(background_tasks: BackgroundTasks = Depends(log_request)):
return {"items": ["item1", "item2"]}
六、FastAPI部署
6.1 Uvicorn部署
Uvicorn是一个基于ASGI(Asynchronous Server Gateway Interface)协议的服务器,是FastAPI官方推荐的运行方式。它以高性能著称,底层使用uvloop和httptools实现异步I/O。最基本的启动命令如下:
# 基本启动命令
uvicorn main:app --host 0.0.0.0 --port 8000
# 开启热重载(开发环境)
uvicorn main:app --reload
# 设置工作进程数
uvicorn main:app --workers 4
# 启用日志级别
uvicorn main:app --log-level info
6.2 Gunicorn + Uvicorn Worker
在生产环境中,通常使用Gunicorn作为进程管理器,结合Uvicorn的Worker类来运行FastAPI应用。Gunicorn负责管理进程的生命周期、处理信号、重启崩溃的Worker等;Uvicorn Worker则负责处理ASGI请求。这种组合既发挥了Gunicorn的进程管理优势,又利用了Uvicorn的异步性能。
# 安装依赖
pip install gunicorn uvicorn
# 使用Gunicorn启动(4个工作进程)
gunicorn main:app \
--worker-class uvicorn.workers.UvicornWorker \
--workers 4 \
--bind 0.0.0.0:8000 \
--timeout 120 \
--access-logfile access.log \
--error-logfile error.log
6.3 Docker部署
使用Docker容器化部署FastAPI应用,可以确保开发环境和生产环境的一致性和可移植性。多阶段构建(multi-stage build)是一种最佳实践:第一阶段安装所有依赖和编译工具,第二阶段仅复制必要的文件和依赖,显著减小最终镜像的大小。
# Dockerfile - 多阶段构建
# 第一阶段:依赖安装
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 第二阶段:运行环境
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages \
/usr/local/lib/python3.11/site-packages
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
对应的docker-compose.yml文件配置:
# docker-compose.yml
version: '3.8'
services:
web:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/app
- REDIS_URL=redis://redis:6379/0
depends_on:
- db
- redis
restart: unless-stopped
db:
image: postgres:15
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=app
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
postgres_data:
6.4 Nginx反向代理
在生产部署中,Nginx通常作为反向代理部署在FastAPI应用前端,负责处理SSL/TLS终止、静态文件服务、负载均衡、请求限流和安全防护。Nginx将客户端的HTTP/HTTPS请求转发到后端的Uvicorn服务器。
# nginx.conf
upstream fastapi_backend {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
keepalive 32;
}
server {
listen 80;
server_name api.example.com;
# 重定向到HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /etc/ssl/certs/example.crt;
ssl_certificate_key /etc/ssl/private/example.key;
# 安全头部
add_header X-Frame-Options "SAMEORIGIN";
add_header X-Content-Type-Options "nosniff";
location / {
proxy_pass http://fastapi_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 60s;
}
}
6.5 环境变量管理
环境变量是管理不同环境配置(开发、测试、生产)的标准方式。FastAPI支持使用pydantic-settings库来加载和管理环境变量,实现配置的集中管理和类型验证。
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
app_name: str = "FastAPI应用"
debug: bool = False
database_url: str = "sqlite:///dev.db"
redis_url: str = "redis://localhost:6379/0"
secret_key: str = "change-this-in-production"
allowed_hosts: list[str] = ["*"]
class Config:
env_file = ".env"
@lru_cache()
def get_settings():
return Settings()
# .env 文件内容:
# DATABASE_URL=postgresql://user:pass@prod-db:5432/app
# DEBUG=false
# SECRET_KEY=your-production-secret-key
七、生产环境最佳实践
7.1 CORS配置
生产环境的CORS配置需要格外谨慎。使用通配符"*"虽然方便,但在生产环境中存在安全风险。应该明确列出允许的域名,并根据需要精确配置允许的方法和头部。如果应用需要支持Cookie或认证信息,allow_credentials必须设置为true,此时allow_origins不能使用"*"。
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://your-frontend.com",
"https://admin.your-frontend.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=[
"Authorization",
"Content-Type",
"X-Requested-With",
],
)
7.2 日志配置
完善的日志系统是排查问题的关键。生产环境建议使用结构化日志(JSON格式),便于日志聚合和分析。Python标准库的logging模块搭配Logstash或fluentd等工具,可以构建完整的日志收集和分析管道。
import logging
import sys
from datetime import datetime
# 创建日志记录器
logger = logging.getLogger("fastapi_app")
logger.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler("app.log")
file_handler.setLevel(logging.WARNING)
# 日志格式
formatter = logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 在应用中使用
logger.info("应用启动完成")
logger.error("数据库连接失败", exc_info=True)
7.3 全局异常处理
统一的异常处理机制可以确保所有错误都以一致的格式返回给客户端,同时避免敏感信息泄露。FastAPI提供了异常处理器(exception handler)机制,可以针对不同类型的异常自定义响应。
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
app = FastAPI()
class AppException(Exception):
"""自定义应用异常基类"""
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"code": exc.status_code,
"timestamp": datetime.now().isoformat(),
},
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
return JSONResponse(
status_code=422,
content={
"error": "请求参数验证失败",
"details": exc.errors(),
},
)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
# 记录详细错误日志
logger.error(f"未处理的异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "服务器内部错误",
"code": 500,
},
)
7.4 应用版本管理
良好的版本管理有助于API的维护和演进。FastAPI支持在应用级别和路由级别配置版本信息,方便客户端了解当前API的版本。建议使用语义化版本号(SemVer)规范。
from fastapi import FastAPI, APIRouter
app = FastAPI(
title="FastAPI Demo API",
version="2.1.0", # 语义化版本号
description="支持WebSocket的实时API服务",
contact={
"name": "开发者团队",
"email": "dev@example.com",
},
license_info={
"name": "MIT",
},
)
# 或使用路由前缀实现多版本API
v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")
app.include_router(v1_router)
app.include_router(v2_router)
7.5 启动事件(startup/shutdown)
FastAPI支持在应用启动和关闭时执行特定的代码,这通过@app.on_event("startup")和@app.on_event("shutdown")装饰器实现。启动事件适合初始化数据库连接池、加载配置、创建线程池等;关闭事件适合释放资源、关闭连接、保存状态等。
from fastapi import FastAPI
import asyncpg
app = FastAPI()
db_pool = None
@app.on_event("startup")
async def startup():
"""应用启动时的初始化操作"""
global db_pool
# 创建数据库连接池
db_pool = await asyncpg.create_pool(
user="user",
password="password",
database="app",
host="localhost",
min_size=5,
max_size=20,
)
# 加载配置
logger.info("应用启动完成,数据库连接池已初始化")
@app.on_event("shutdown")
async def shutdown():
"""应用关闭时的清理操作"""
global db_pool
if db_pool:
await db_pool.close()
logger.info("数据库连接池已关闭")
# 关闭其他资源
logger.info("应用已安全关闭")
# 同时在启动时检查依赖
@app.on_event("startup")
async def health_check():
"""启动时健康检查"""
try:
# 检查数据库连接
async with db_pool.acquire() as conn:
await conn.fetchval("SELECT 1")
logger.info("健康检查通过")
except Exception as e:
logger.warning(f"健康检查失败: {e}")
7.6 性能优化建议
在生产环境中,性能优化是不可忽视的环节。以下是一些针对FastAPI应用的优化建议:
- 使用异步数据库驱动:如asyncpg(PostgreSQL)、databases、SQLAlchemy 2.0异步模式,避免阻塞事件循环。
- 启用Gzip压缩:通过添加GzipMiddleware减少响应体大小,尤其对JSON响应效果显著。
- 缓存策略:合理使用Redis缓存热点数据,减少数据库查询。对频繁访问的API端点设置合适的Cache-Control头部。
- 连接池优化:根据并发量调整数据库连接池大小,避免连接过多导致资源耗尽或过少导致请求排队。
- 静态文件分离:使用Nginx直接提供静态文件服务,不经过Python应用,减轻应用服务器负载。
通过以上七大章节的系统学习,我们从WebSocket基础原理出发,深入掌握了FastAPI WebSocket的完整实现、中间件机制、后台任务处理,再到生产级别的部署方案和最佳实践。FastAPI作为现代Python Web框架的佼佼者,其简洁的API设计、完善的类型注解支持和卓越的性能表现,使其成为构建实时、高并发Web应用的理想选择。