专题:Python并发编程系统学习
关键词:Python, 并发编程, aiohttp, 异步HTTP, ClientSession, 异步客户端, 异步服务器
一、aiohttp概述
aiohttp 是 Python 生态中最成熟、最流行的异步 HTTP 库,基于 asyncio 实现,同时支持客户端和服务器端开发。它利用了 Python 的 async/await 语法,能够在单线程内高效处理大量并发网络请求,是构建高性能网络应用的利器。
与传统的 requests 库不同,aiohttp 在整个请求生命周期中都不会阻塞事件循环。这意味着在等待网络响应的同时,事件循环可以切换到其他协程继续执行,从而实现真正的并发 I/O。这一特性使 aiohttp 在需要同时发起大量 HTTP 请求的场景下,性能远超 requests 配合多线程的方案。
aiohttp 的核心组件包括面向客户端的 ClientSession 和面向服务器的 web.Application。前者封装了连接池、Cookie 存储和请求配置,是发起 HTTP 请求的入口;后者提供了路由、中间件和信号处理机制,用于构建 Web 应用。两者既可以独立使用,也可以在同一项目中同时使用。
核心优势:纯异步非阻塞设计、内置连接池复用、同时支持 HTTP/1.1 和 HTTP/2(需要额外安装)、完善的异常处理体系、活跃的社区维护。
二、ClientSession与连接管理
ClientSession 是 aiohttp 客户端的核心类,它管理着连接池和 Cookie 存储。合理使用 Session 可以复用 TCP 连接,避免每次请求都重新握手,大幅提升性能。
以下是最基本的 GET 请求示例,通过 async with 语句确保 Session 在使用完毕后正确关闭:
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
asyncio.run(fetch('https://api.example.com/data'))
在实际项目中,通常会在应用启动时创建全局 Session 实例,并在整个应用生命周期中复用。这样可以最大化连接池的效益。连接池的大小默认限制为 100 个连接,可通过 connector 参数调整:
import aiohttp
connector = aiohttp.TCPConnector(
limit=50, # 同时最多 50 个连接
limit_per_host=10, # 同一主机最多 10 个连接
ttl_dns_cache=300 # DNS 缓存 300 秒
)
async def run():
async with aiohttp.ClientSession(connector=connector) as session:
# 复用 session 发起多个请求
resp1 = await session.get('https://api.example.com/a')
resp2 = await session.get('https://api.example.com/b')
POST 请求同样简单,支持 data(表单编码)、json(JSON 编码)、files(文件上传)等多种数据格式:
session.post('https://api.example.com/submit',
json={'name': 'Alice', 'age': 30})
session.post('https://api.example.com/upload',
data={'field': 'value'},
files={'file': open('photo.jpg', 'rb')})
三、并发请求控制
aiohttp 的真正威力体现在并发场景。配合 asyncio 提供的并发原语,可以轻松实现高效的批量请求。但需要注意的是,不加限制地同时发起大量请求可能会耗尽系统资源或被目标服务器限流。
使用 asyncio.Semaphore 可以精确控制并发数。下面的示例展示了信号量配合 asyncio.gather 的典型用法:
import aiohttp
import asyncio
semaphore = asyncio.Semaphore(10) # 最多 10 个并发
async def fetch_one(session, url):
async with semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(10)) as resp:
return await resp.json()
except Exception as e:
return {'url': url, 'error': str(e)}
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
urls = ['https://api.example.com/item/' + str(i) for i in range(100)]
results = asyncio.run(fetch_all(urls))
并发原则:并非并发数越大越好。建议根据目标服务器的承载能力和网络环境合理设置并发上限(通常 10-50 之间比较稳妥),并搭配超时和重试机制来提高健壮性。
四、超时与重试机制
网络请求不可靠,超时和重试是必不可少的防御性编程手段。aiohttp 通过 ClientTimeout 提供精细的超时控制:
timeout = aiohttp.ClientTimeout(
total=30, # 整个请求最多 30 秒
connect=10, # 连接超时 10 秒
sock_read=20, # 读取超时 20 秒
)
session.get(url, timeout=timeout)
如果默认的超时行为无法满足需求,可以自己实现重试逻辑。以下是一个带指数退避的重试装饰器示例:
import asyncio
from aiohttp import ClientError
async def fetch_with_retry(session, url, retries=3):
last_error = None
for attempt in range(retries):
try:
async with session.get(url) as resp:
if resp.status == 200:
return await resp.text()
elif resp.status >= 500:
raise ClientError(f"Server error: {resp.status}")
except ClientError as e:
last_error = e
wait = 2 ** attempt # 指数退避: 1s, 2s, 4s
print(f"Retry {attempt + 1}/{retries} after {wait}s")
await asyncio.sleep(wait)
raise last_error
常见的网络异常包括 ClientConnectorError(连接失败)、ClientTimeout(超时)、ClientResponseError(HTTP 错误状态码)和 ServerDisconnectedError(连接意外断开)。建议根据具体场景选择合适的异常捕获粒度。
五、异步Web服务器
aiohttp 不仅是一个 HTTP 客户端库,还提供了完整的 Web 服务器框架。与 Flask/FastAPI 等框架相比,aiohttp 的 Web 服务器是纯异步实现的,从请求解析到响应生成全程不阻塞事件循环。
创建一个最简的 aiohttp Web 服务器只需几行代码:
from aiohttp import web
async def handle(request):
return web.Response(text="Hello, aiohttp!")
app = web.Application()
app.router.add_get('/', handle)
web.run_app(app, host='0.0.0.0', port=8080)
处理函数是一个协程,接收 web.Request 对象,返回 web.StreamResponse、web.Response 或 web.json_response 等响应对象。请求对象提供了便捷的属性来获取查询参数、请求头、JSON 体和表单数据:
async def post_handler(request):
data = await request.json()
name = data.get('name')
return web.json_response(
{'message': f'Hello, {name}!'},
status=200
)
app.router.add_post('/greet', post_handler)
六、路由与中间件
aiohttp 的路由系统支持静态路由、动态路由和正则路由。动态路由使用大括号标记路径参数,框架会自动提取并注入请求对象的 match_info 属性:
# 动态路由示例
async def user_detail(request):
user_id = request.match_info['user_id']
return web.json_response({'user_id': user_id})
app.router.add_get('/users/{user_id}', user_detail)
# 支持路由分组和命名路由
resource = app.router.add_resource('/items/{item_id:\\d+}')
resource.add_route('GET', get_item)
resource.add_route('DELETE', delete_item)
中间件是 aiohttp Web 框架最强大的特性之一。中间件在请求被处理前和处理后执行,适合实现日志记录、身份验证、错误处理、请求时间统计等横切关注点。中间件是一个嵌套的协程链:
from aiohttp import web
async def error_middleware(request, handler):
try:
resp = await handler(request)
resp.headers['X-Process-Time'] = str(0.1)
return resp
except web.HTTPException as ex:
return web.json_response(
{'error': ex.reason},
status=ex.status
)
app = web.Application(middlewares=[error_middleware])
中间件执行顺序:中间件列表中的第一个中间件是最外层包装,最后一个中间件最靠近处理函数。请求进入时按列表顺序穿过中间件,响应返回时按逆序穿过。
七、实际应用:并发爬虫示例
综合以上知识,下面实现一个健壮的并发爬虫。该爬虫从给定 URL 列表中抓取内容,使用信号量控制并发数,配备超时和重试,并通过 asyncio.Queue 实现生产者-消费者模式:
import aiohttp
import asyncio
from bs4 import BeautifulSoup
async def worker(worker_id, queue, session, sem):
while not queue.empty():
url = await queue.get()
async with sem:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(10)) as resp:
html = await resp.text()
soup = BeautifulSoup(html, 'html.parser')
title = soup.title.string if soup.title else 'No Title'
print(f"[Worker {worker_id}] {url} -> {title}")
except Exception as e:
print(f"[Worker {worker_id}] Error: {url} -> {e}")
finally:
queue.task_done()
async def crawl(urls, concurrency=10):
queue = asyncio.Queue()
for url in urls:
queue.put_nowait(url)
sem = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession() as session:
workers = [worker(i, queue, session, sem) for i in range(concurrency)]
await asyncio.gather(*workers)
urls = [f'https://example.com/page/{i}' for i in range(50)]
asyncio.run(crawl(urls, concurrency=5))
这个爬虫的设计要点是:工作者协程从共享队列中取出 URL,通过信号量限制并发数,使用全局 Session 复用连接。当所有 URL 处理完毕后,程序优雅退出。这种生产者-消费者模式可扩展性极强,可以轻松添加 URL 去重、增量抓取、结果持久化等功能。
实际部署时还需要考虑:遵守 robots.txt、设置合理的 User-Agent、实现速率限制(Rate Limiting)、处理反爬机制、以及将抓取结果持久化到数据库或文件系统。
总结:aiohttp 是 Python 异步网络编程的基石库。掌握它之后,无论编写高性能爬虫、构建微服务还是实现 API 网关,都能游刃有余。建议在实际项目中逐步深入,从简单的客户端请求开始,逐渐过渡到完整的异步服务端开发。