专题:Python并发编程系统学习
关键词:Python, 并发编程, asyncio, TCP, UDP, 网络编程, start_server, Protocol协议
一、asyncio网络编程概览
asyncio作为Python标准库中异步I/O框架的核心,提供了从底层传输层协议到高层流式API的完整网络编程工具链。理解asyncio网络编程的关键在于把握事件循环(event loop)驱动的I/O模型——所有网络操作都是非阻塞的,通过协程(coroutine)的await表达式将控制权交还给事件循环,在数据到达时通过回调或恢复协程的方式继续执行。
asyncio网络编程提供两套API层次:
- 高层API(Streams):基于流的流式接口,使用
asyncio.start_server() 和 asyncio.open_connection(),通过 reader/writer 对象读写数据。API简洁直观,适合大多数TCP应用场景。
- 低层API(Protocol/Transport):基于协议类的回调式编程模型,实现 Protocol 接口(如
asyncio.Protocol 和 asyncio.DatagramProtocol),通过 connection_made()、data_received()、connection_lost() 等回调方法处理网络事件。更适合UDP和需要精细控制的TCP应用。
选择哪套API取决于具体需求:Streams API上手快、代码可读性强;Protocol API更灵活,支持UDP和更底层的传输控制。在实际项目中,两者可以混合使用。
二、TCP服务器:start_server
asyncio.start_server() 是asyncio创建TCP服务器的主要入口。它接受一个回调函数(通常为协程函数)作为 client_connected_cb 参数,每当有新客户端连接时自动调用该回调。回调函数接收 asyncio.StreamReader 和 asyncio.StreamWriter 两个参数,分别用于读取客户端数据和向客户端发送数据。
下面是一个完整的echo服务器实现,将客户端发送的数据原样返回:
import asyncio
async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(data) # echo server
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_client, 'localhost', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
关键方法与属性说明:
- reader.read(n):读取最多 n 字节数据。当流结束时返回空字节串 b''。
- writer.write(data):将数据写入传输缓冲区,不保证立即发送。需要配合 drain() 使用。
- writer.drain():等待写入缓冲区中的数据被底层传输层实际发送。在写入大量数据时必须 await。
- writer.close():关闭写入流。建议后续调用
wait_closed() 确保连接完全关闭。
- writer.wait_closed():等待连接完全关闭。避免资源泄漏。
- server.serve_forever():启动服务器并持续运行直到被取消。在 async with 块内使用可确保退出时资源被正确清理。
start_server 还支持更多参数配置:
- family=socket.AF_INET:指定地址族(IPv4 或 IPv6)。
- flags=socket.AI_PASSIVE:getaddrinfo 标志。
- backlog:监听队列大小,默认为 100。
- ssl:传入 SSLContext 对象以启用 TLS/SSL 加密。
- reuse_address:是否重用地址,默认为 True。
- reuse_port:是否重用端口(仅限 Linux)。
三、Protocol类编程模型
Protocol 类是asyncio低层网络API的核心。与Streams API的回调函数模式不同,Protocol 使用面向对象的方式组织网络事件处理代码。开发者通过继承 asyncio.Protocol 并重写其回调方法来处理连接生命周期中的各类事件。
Protocol 核心回调方法:
- connection_made(transport):当连接建立成功时调用。transport 参数代表底层传输层对象,提供发送数据和关闭连接的方法。
- data_received(data):当接收到数据时调用。data 为 bytes 类型。注意该方法可能在一条消息未接收完时被多次调用,因此需要自行处理消息边界和粘包问题。
- eof_received():当收到 EOF 标志时调用。返回 False 表示关闭传输方向(半关闭)。
- connection_lost(exc):当连接关闭或发生错误时调用。exc 为 None 表示正常关闭,否则为异常对象。
下面是一个使用 Protocol 类实现的 echo 服务器:
import asyncio
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
peername = transport.get_extra_info('peername')
print(f'New connection from {peername}')
def data_received(self, data):
print(f'Received: {data.decode()}')
self.transport.write(data) # echo back
def eof_received(self):
print('EOF received')
return False # close transport
def connection_lost(self, exc):
if exc:
print(f'Connection lost with error: {exc}')
else:
print('Connection closed normally')
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(
EchoProtocol, 'localhost', 8889)
async with server:
await server.serve_forever()
asyncio.run(main())
Protocol 模型与 Streams 模型的对比:
| 对比维度 | Streams API | Protocol API |
| 编程范式 | 协程式(async/await) | 回调式(继承 + 重写) |
| 可读性 | 线性代码,易于理解 | 回调分散,逻辑可能分散 |
| 灵活性 | 较高层,适合常见TCP场景 | 底层,可精细控制传输行为 |
| UDP支持 | 不支持 | 支持(DatagramProtocol) |
| 粘包处理 | 框架有内部缓冲 | 需自行处理帧边界 |
| 适用场景 | HTTP、WebSocket等标准协议 | 自定义二进制协议、UDP通信 |
四、UDP服务器
UDP(用户数据报协议)是无连接的传输层协议,不保证数据可靠性和顺序,但具有低延迟、低开销的优势。asyncio 通过 DatagramProtocol 类支持 UDP 通信。与 TCP 的 Protocol 类不同,DatagramProtocol 使用 datagram_received() 回调而非 data_received()。
DatagramProtocol 核心回调:
- connection_made(transport):传输层就绪时调用。
- datagram_received(data, addr):收到 UDP 数据报时调用。data 为数据内容(bytes),addr 为发送方地址元组 (host, port)。
- error_received(exc):当底层传输发生错误时调用(如 ICMP 不可达)。
- connection_lost(exc):传输层关闭时调用。
下面是一个使用 DatagramProtocol 实现的 UDP echo 服务器:
import asyncio
class UDPEchoProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
print('UDP server started')
def datagram_received(self, data, addr):
message = data.decode()
print(f'Received {message!r} from {addr}')
self.transport.sendto(data, addr) # echo back
def error_received(self, exc):
print(f'Error received: {exc}')
def connection_lost(self, exc):
if exc:
print(f'Connection lost: {exc}')
else:
print('Connection closed')
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
UDPEchoProtocol,
local_addr=('localhost', 9999))
try:
await asyncio.sleep(3600) # run for 1 hour
finally:
transport.close()
asyncio.run(main())
UDP 客户端的实现方式类似,只是不需要 local_addr 参数(系统自动分配临时端口),但需要指定 remote_addr 来建立连接式 UDP 通信,或者使用 transport.sendto() 以无连接方式发送数据。
class UDPClientProtocol(asyncio.DatagramProtocol):
def __init__(self, message):
self.message = message
self.received = None
def connection_made(self, transport):
transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
self.received = data.decode()
print(f'Server echo: {self.received}')
transport.close()
# 客户端创建方式
# transport, protocol = await loop.create_datagram_endpoint(
# lambda: UDPClientProtocol('Hello UDP'),
# remote_addr=('localhost', 9999))
五、TCP客户端
asyncio.open_connection() 是创建 TCP 客户端连接的便捷函数。它返回 (reader, writer) 元组,与 start_server 中的处理回调函数签名完全一致,使得客户端和服务器的代码风格统一。
以下是一个同时支持交互式输入和一次性请求的 TCP 客户端示例:
import asyncio
async def tcp_client(message):
reader, writer = await asyncio.open_connection(
'localhost', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(1024)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_client('Hello World!'))
open_connection 的关键参数:
- host/port:服务器地址和端口。host 为 None 时使用 localhost。
- ssl:SSLContext 对象或 True(使用默认上下文)。
- family/proto/flags:socket 地址参数,与 socket.getaddrinfo() 一致。
- local_addr:绑定到本地特定地址的元组 (host, port)。
- limit:reader 内部缓冲区的最大大小(字节),默认 2^16 = 65536。
在编写客户端时,需要特别注意超时处理,避免网络异常导致协程永久挂起:
async def tcp_client_with_timeout(message):
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection('localhost', 8888),
timeout=5.0)
except asyncio.TimeoutError:
print('Connection timed out')
return
writer.write(message.encode())
try:
data = await asyncio.wait_for(
reader.read(1024), timeout=10.0)
print(f'Received: {data.decode()!r}')
except asyncio.TimeoutError:
print('Response timed out')
finally:
writer.close()
await writer.wait_closed()
六、实际案例:简单聊天服务器
综合运用本节所学知识,实现一个支持多客户端同时在线、消息广播的简单聊天服务器。该案例展示了如何管理多个客户端连接、处理消息分发以及优雅地处理客户端断开。
import asyncio
class ChatServer:
def __init__(self):
self.clients = {} # writer -> nickname mapping
async def broadcast(self, message, sender_writer=None):
for writer in list(self.clients):
if writer != sender_writer: # don't echo back to sender
try:
writer.write(message.encode())
await writer.drain()
except ConnectionError:
pass
async def handle_client(self, reader, writer):
nickname = f'User{len(self.clients) + 1}'
self.clients[writer] = nickname
addr = writer.get_extra_info('peername')
print(f'{nickname} connected from {addr}')
await self.broadcast(f'{nickname} joined the chat\n', writer)
try:
while True:
data = await reader.readline()
if not data:
break # client disconnected
message = data.decode().strip()
if not message:
continue
if message == '/quit':
await self.broadcast(
f'{nickname} left the chat\n', writer)
break
formatted = f'[{nickname}] {message}\n'
await self.broadcast(formatted, writer)
except asyncio.CancelledError:
pass
finally:
del self.clients[writer]
writer.close()
await writer.wait_closed()
print(f'{nickname} disconnected')
async def start(self, host='localhost', port=8888):
server = await asyncio.start_server(
self.handle_client, host, port)
addr = server.sockets[0].getsockname()
print(f'Chat server listening on {addr[0]}:{addr[1]}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
chat = ChatServer()
asyncio.run(chat.start())
聊天服务器的要点总结:
- 多客户端管理:使用字典(writer -> nickname)追踪所有在线客户端。
- 消息广播:遍历所有客户端连接,向除发送者之外的每个人转发消息。
- 消息格式:使用 readline() 按行读取,配合 '\n' 作为消息分隔符。
- 优雅断开:在 finally 块中清理客户端记录并关闭连接。
- 错误处理:在广播时捕获 ConnectionError,避免单个客户端异常影响其他人。
- 退出命令:提供 /quit 命令让客户端主动离开聊天室。
七、性能优化与注意事项
在生产环境中部署 asyncio 网络服务时,以下几点性能和稳定性优化至关重要:
TCP_NODELAY 设置
默认情况下,TCP 协议使用 Nagle 算法将多个小数据包合并为一个大包发送,以降低网络开销。但对于实时性要求高的应用(如聊天服务器、游戏服务器),这会引入不必要的延迟。可以通过设置 TCP_NODELAY 选项禁用 Nagle 算法:
async def handle_client(reader, writer):
sock = writer.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# ... rest of handler
缓冲区大小调优
reader 对象内部的缓冲区大小通过 limit 参数控制(默认 64KB)。对于大流量服务,适当增大缓冲区可以减少系统调用次数;对于内存敏感的环境,减小缓冲区可以降低内存占用。同时在操作系统层面调整 socket 缓冲区:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
半关闭处理
TCP 允许半关闭(half-close)——即一方关闭写通道后仍可读取数据。在 Streams API 中,可以通过 writer.write_eof() 发送 EOF 信号,而 reader 仍可继续读取。Protocol 模型下通过 eof_received() 回调处理。在设计协议时,合理利用半关闭可以提高协议的效率和灵活性。
连接超时
使用 asyncio.wait_for() 为关键操作设置超时,防止协程永久挂起:
try:
data = await asyncio.wait_for(reader.read(1024), timeout=30.0)
except asyncio.TimeoutError:
writer.close()
SSL/TLS 加密
为 TCP 服务启用加密传输需要创建 SSLContext 对象并传递给服务器或客户端:
import ssl
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain('cert.pem', 'key.pem')
server = await asyncio.start_server(
handle_client, 'localhost', 8443,
ssl=ssl_context)
其他最佳实践
- 使用 asyncio.run() 启动:Python 3.7+ 推荐的入口方式,自动创建和清理事件循环。
- 异常处理:在 Protocol 的 connection_lost 和协程处理函数的 try/except 中妥善处理所有异常。
- 资源清理:始终确保 writer.close() 和 wait_closed() 被调用,避免文件描述符泄漏。
- 监控与日志:记录关键事件(连接建立、断开、错误)到日志系统,便于运维排查。
- 限流与背压:当客户端消费速度跟不上生产速度时,通过 drain() 的阻塞特性实现自然的背压控制。
- 优雅关闭:通过 signal 处理器捕获 SIGINT/SIGTERM,在关闭服务器前完成正在处理的任务。