asyncio网络编程:TCP/UDP服务器实现

Python并发编程专题 · 基于事件循环的底层网络通信

专题:Python并发编程系统学习

关键词:Python, 并发编程, asyncio, TCP, UDP, 网络编程, start_server, Protocol协议

一、asyncio网络编程概览

asyncio作为Python标准库中异步I/O框架的核心,提供了从底层传输层协议到高层流式API的完整网络编程工具链。理解asyncio网络编程的关键在于把握事件循环(event loop)驱动的I/O模型——所有网络操作都是非阻塞的,通过协程(coroutine)的await表达式将控制权交还给事件循环,在数据到达时通过回调或恢复协程的方式继续执行。

asyncio网络编程提供两套API层次:

选择哪套API取决于具体需求:Streams API上手快、代码可读性强;Protocol API更灵活,支持UDP和更底层的传输控制。在实际项目中,两者可以混合使用。

二、TCP服务器:start_server

asyncio.start_server() 是asyncio创建TCP服务器的主要入口。它接受一个回调函数(通常为协程函数)作为 client_connected_cb 参数,每当有新客户端连接时自动调用该回调。回调函数接收 asyncio.StreamReaderasyncio.StreamWriter 两个参数,分别用于读取客户端数据和向客户端发送数据。

下面是一个完整的echo服务器实现,将客户端发送的数据原样返回:

import asyncio async def handle_client(reader, writer): data = await reader.read(1024) writer.write(data) # echo server await writer.drain() writer.close() await writer.wait_closed() async def main(): server = await asyncio.start_server( handle_client, 'localhost', 8888) async with server: await server.serve_forever() asyncio.run(main())

关键方法与属性说明:

start_server 还支持更多参数配置:

三、Protocol类编程模型

Protocol 类是asyncio低层网络API的核心。与Streams API的回调函数模式不同,Protocol 使用面向对象的方式组织网络事件处理代码。开发者通过继承 asyncio.Protocol 并重写其回调方法来处理连接生命周期中的各类事件。

Protocol 核心回调方法:

下面是一个使用 Protocol 类实现的 echo 服务器:

import asyncio class EchoProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport peername = transport.get_extra_info('peername') print(f'New connection from {peername}') def data_received(self, data): print(f'Received: {data.decode()}') self.transport.write(data) # echo back def eof_received(self): print('EOF received') return False # close transport def connection_lost(self, exc): if exc: print(f'Connection lost with error: {exc}') else: print('Connection closed normally') async def main(): loop = asyncio.get_running_loop() server = await loop.create_server( EchoProtocol, 'localhost', 8889) async with server: await server.serve_forever() asyncio.run(main())

Protocol 模型与 Streams 模型的对比:

对比维度Streams APIProtocol API
编程范式协程式(async/await)回调式(继承 + 重写)
可读性线性代码,易于理解回调分散,逻辑可能分散
灵活性较高层,适合常见TCP场景底层,可精细控制传输行为
UDP支持不支持支持(DatagramProtocol)
粘包处理框架有内部缓冲需自行处理帧边界
适用场景HTTP、WebSocket等标准协议自定义二进制协议、UDP通信

四、UDP服务器

UDP(用户数据报协议)是无连接的传输层协议,不保证数据可靠性和顺序,但具有低延迟、低开销的优势。asyncio 通过 DatagramProtocol 类支持 UDP 通信。与 TCP 的 Protocol 类不同,DatagramProtocol 使用 datagram_received() 回调而非 data_received()

DatagramProtocol 核心回调:

下面是一个使用 DatagramProtocol 实现的 UDP echo 服务器:

import asyncio class UDPEchoProtocol(asyncio.DatagramProtocol): def connection_made(self, transport): self.transport = transport print('UDP server started') def datagram_received(self, data, addr): message = data.decode() print(f'Received {message!r} from {addr}') self.transport.sendto(data, addr) # echo back def error_received(self, exc): print(f'Error received: {exc}') def connection_lost(self, exc): if exc: print(f'Connection lost: {exc}') else: print('Connection closed') async def main(): loop = asyncio.get_running_loop() transport, protocol = await loop.create_datagram_endpoint( UDPEchoProtocol, local_addr=('localhost', 9999)) try: await asyncio.sleep(3600) # run for 1 hour finally: transport.close() asyncio.run(main())

UDP 客户端的实现方式类似,只是不需要 local_addr 参数(系统自动分配临时端口),但需要指定 remote_addr 来建立连接式 UDP 通信,或者使用 transport.sendto() 以无连接方式发送数据。

class UDPClientProtocol(asyncio.DatagramProtocol): def __init__(self, message): self.message = message self.received = None def connection_made(self, transport): transport.sendto(self.message.encode()) def datagram_received(self, data, addr): self.received = data.decode() print(f'Server echo: {self.received}') transport.close() # 客户端创建方式 # transport, protocol = await loop.create_datagram_endpoint( # lambda: UDPClientProtocol('Hello UDP'), # remote_addr=('localhost', 9999))

五、TCP客户端

asyncio.open_connection() 是创建 TCP 客户端连接的便捷函数。它返回 (reader, writer) 元组,与 start_server 中的处理回调函数签名完全一致,使得客户端和服务器的代码风格统一。

以下是一个同时支持交互式输入和一次性请求的 TCP 客户端示例:

import asyncio async def tcp_client(message): reader, writer = await asyncio.open_connection( 'localhost', 8888) print(f'Send: {message!r}') writer.write(message.encode()) await writer.drain() data = await reader.read(1024) print(f'Received: {data.decode()!r}') print('Close the connection') writer.close() await writer.wait_closed() asyncio.run(tcp_client('Hello World!'))

open_connection 的关键参数:

在编写客户端时,需要特别注意超时处理,避免网络异常导致协程永久挂起:

async def tcp_client_with_timeout(message): try: reader, writer = await asyncio.wait_for( asyncio.open_connection('localhost', 8888), timeout=5.0) except asyncio.TimeoutError: print('Connection timed out') return writer.write(message.encode()) try: data = await asyncio.wait_for( reader.read(1024), timeout=10.0) print(f'Received: {data.decode()!r}') except asyncio.TimeoutError: print('Response timed out') finally: writer.close() await writer.wait_closed()

六、实际案例:简单聊天服务器

综合运用本节所学知识,实现一个支持多客户端同时在线、消息广播的简单聊天服务器。该案例展示了如何管理多个客户端连接、处理消息分发以及优雅地处理客户端断开。

import asyncio class ChatServer: def __init__(self): self.clients = {} # writer -> nickname mapping async def broadcast(self, message, sender_writer=None): for writer in list(self.clients): if writer != sender_writer: # don't echo back to sender try: writer.write(message.encode()) await writer.drain() except ConnectionError: pass async def handle_client(self, reader, writer): nickname = f'User{len(self.clients) + 1}' self.clients[writer] = nickname addr = writer.get_extra_info('peername') print(f'{nickname} connected from {addr}') await self.broadcast(f'{nickname} joined the chat\n', writer) try: while True: data = await reader.readline() if not data: break # client disconnected message = data.decode().strip() if not message: continue if message == '/quit': await self.broadcast( f'{nickname} left the chat\n', writer) break formatted = f'[{nickname}] {message}\n' await self.broadcast(formatted, writer) except asyncio.CancelledError: pass finally: del self.clients[writer] writer.close() await writer.wait_closed() print(f'{nickname} disconnected') async def start(self, host='localhost', port=8888): server = await asyncio.start_server( self.handle_client, host, port) addr = server.sockets[0].getsockname() print(f'Chat server listening on {addr[0]}:{addr[1]}') async with server: await server.serve_forever() if __name__ == '__main__': chat = ChatServer() asyncio.run(chat.start())

聊天服务器的要点总结:

七、性能优化与注意事项

在生产环境中部署 asyncio 网络服务时,以下几点性能和稳定性优化至关重要:

TCP_NODELAY 设置

默认情况下,TCP 协议使用 Nagle 算法将多个小数据包合并为一个大包发送,以降低网络开销。但对于实时性要求高的应用(如聊天服务器、游戏服务器),这会引入不必要的延迟。可以通过设置 TCP_NODELAY 选项禁用 Nagle 算法:

async def handle_client(reader, writer): sock = writer.get_extra_info('socket') if sock is not None: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # ... rest of handler

缓冲区大小调优

reader 对象内部的缓冲区大小通过 limit 参数控制(默认 64KB)。对于大流量服务,适当增大缓冲区可以减少系统调用次数;对于内存敏感的环境,减小缓冲区可以降低内存占用。同时在操作系统层面调整 socket 缓冲区:

sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)

半关闭处理

TCP 允许半关闭(half-close)——即一方关闭写通道后仍可读取数据。在 Streams API 中,可以通过 writer.write_eof() 发送 EOF 信号,而 reader 仍可继续读取。Protocol 模型下通过 eof_received() 回调处理。在设计协议时,合理利用半关闭可以提高协议的效率和灵活性。

连接超时

使用 asyncio.wait_for() 为关键操作设置超时,防止协程永久挂起:

try: data = await asyncio.wait_for(reader.read(1024), timeout=30.0) except asyncio.TimeoutError: writer.close()

SSL/TLS 加密

为 TCP 服务启用加密传输需要创建 SSLContext 对象并传递给服务器或客户端:

import ssl ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain('cert.pem', 'key.pem') server = await asyncio.start_server( handle_client, 'localhost', 8443, ssl=ssl_context)

其他最佳实践