异步爬虫(asyncio/aiohttp)

网络爬虫专题 · 掌握异步高性能爬虫开发

专题:Python网络爬虫系统学习

关键词:Python, 网络爬虫, 异步爬虫, aiohttp, asyncio, 协程, 并发请求, 事件循环, 高性能

一、异步编程基础

1.1 同步 vs 异步 vs 多线程

在理解异步爬虫之前,首先需要区分三种编程模型:

1.2 爬虫是I/O密集型任务

网络爬虫的本质是大量HTTP请求的发送与响应接收。一次HTTP请求中,90%以上的时间都花费在网络传输上——等待DNS解析、建立TCP连接、发送请求、等待服务器处理、接收响应数据。在这段时间内,CPU几乎处于空闲状态。因此爬虫是典型的I/O密集型任务,非常适合使用异步编程模型来优化性能。

1.3 协程(Coroutine)的概念

协程是一种用户态的轻量级线程,也称为微线程。与操作系统线程不同,协程的调度完全由程序自身控制,而不是由操作系统内核管理。协程可以在执行到某个耗时操作时主动挂起(yield),让其他协程继续执行,从而实现协作式多任务处理。Python中通过 async/await 语法来实现协程。

1.4 async/await 语法

async def 用于定义一个协程函数,调用该函数会返回一个协程对象。协程对象需要通过 await 来执行。await 关键字用于挂起当前协程,等待另一个协程完成,同时将控制权交还给事件循环,让事件循环调度其他就绪的协程。

import asyncio async def fetch_data(url): print(f"开始请求: {url}") # await 会挂起当前协程,模拟网络等待 await asyncio.sleep(1) print(f"完成请求: {url}") return f"数据来自 {url}" # 调用协程函数返回协程对象,不会立即执行 coro = fetch_data("https://example.com") # 需要通过 await 或 asyncio.run() 来执行

1.5 事件循环(Event Loop)

事件循环是异步编程的核心调度器。它维护着一个任务队列,不断循环检查每个任务的状态:如果任务正在等待I/O操作,则将其挂起并切换到下一个就绪的任务;如果任务已经准备就绪,则继续执行。Python的 asyncio 模块提供了高性能的事件循环实现,底层基于 selectors 模块或 IOCP(Windows)等系统级I/O多路复用机制。

核心理解:异步编程不是让程序运行得更快,而是让程序在等待I/O时不闲着,从而在单位时间内完成更多任务。对于爬虫而言,就是在一个请求等待响应的过程中,去发送其他请求。

二、asyncio 模块详解

2.1 定义和运行协程

async def 定义一个协程函数,内部的 await 表达式会挂起当前协程直到 awaitable 对象完成。使用 asyncio.run() 是启动事件循环的标准方式,它创建一个新的事件循环,运行传入的协程,并在完成后关闭事件循环。

import asyncio async def hello(): print("Hello") await asyncio.sleep(1) # 模拟I/O等待 print("World") asyncio.run(hello()) # 启动事件循环

2.2 并发执行多个协程

asyncio.gather() 是并发执行多个协程的核心函数。它接收多个 awaitable 对象,并发地调度它们,并等待所有任务完成。返回值是一个列表,顺序与传入的协程顺序一致。如果任何一个协程抛出异常,gather 默认会传播异常,但可以通过 return_exceptions=True 参数将异常作为返回值。

async def main(): tasks = [ fetch_data("https://api1.example.com"), fetch_data("https://api2.example.com"), fetch_data("https://api3.example.com"), ] results = await asyncio.gather(*tasks) print(results) asyncio.run(main())

2.3 创建与管理任务

asyncio.create_task() 用于将协程包装成一个 Task 对象,并立即安排到事件循环中执行。与直接 await 不同,create_task 不会阻塞当前协程——它会立即返回 Task 对象,协程在后台开始执行。这对于在等待其他操作时执行后台任务非常有用。

async def main(): # 创建多个任务,它们会立即开始执行 task1 = asyncio.create_task(fetch_data("https://api1.com")) task2 = asyncio.create_task(fetch_data("https://api2.com")) # 在此可以执行其他操作... print("任务已在后台启动...") # 等待所有任务完成 results = await asyncio.gather(task1, task2) return results

2.4 协程间的通信

asyncio 提供了多种协程间通信机制:asyncio.Queue 是生产消费者模式的首选,支持多个生产者协程向队列中放入 URL,多个消费者协程从队列中取出 URL 并爬取。asyncio.Event 用于协程间的同步通知,asyncio.Lock 用于保护共享资源的互斥访问,asyncio.Semaphore 用于控制并发数量。

要点总结:asyncio.Queue 是实现爬虫任务调度最常用的工具,它天然支持协程环境下的线程安全操作。结合 Semaphore 可以实现精准的并发数控制。

三、aiohttp 客户端基础

3.1 安装与导入

aiohttp 是 Python 中最流行的异步 HTTP 客户端/服务端框架。使用 pip 即可安装:

pip install aiohttp

安装完成后,在代码中导入:

import aiohttp import asyncio

3.2 创建会话与发送请求

aiohttp 的核心是 ClientSession 对象。它管理连接池和 Cookie,建议作为上下文管理器使用(async with)以确保资源正确释放。每个会话可以发送任意数量的请求,连接会自动复用。

async def fetch_url(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() # POST 请求 async def post_data(url, data): async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: return await response.json()

3.3 响应读取方式

aiohttp 提供了多种读取响应内容的方法:

这些方法都是协程,需要使用 await 调用。

3.4 超时设置

aiohttp.ClientTimeout 用于控制请求的超时行为。可以设置总超时、连接超时和读取超时:

from aiohttp import ClientTimeout # 总超时30秒 timeout = ClientTimeout(total=30) # 更精细的超时控制 timeout = ClientTimeout( total=60, # 整个请求的总超时 connect=10, # 连接建立超时 sock_read=30 # 读取数据超时 ) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as resp: return await resp.text()

3.5 连接限制

TCPConnector 用于控制连接池行为,包括最大并发连接数、每个主机的最大连接数、SSL 验证等:

from aiohttp import TCPConnector # 限制总连接数为50,每个主机最多10个连接 connector = TCPConnector( limit=50, # 总并发连接数 limit_per_host=10, # 每个主机的最大连接数 ssl=False, # 是否验证SSL证书(开发环境可关闭) ttl_dns_cache=300 # DNS缓存时间(秒) ) async with aiohttp.ClientSession(connector=connector) as session: # ... 发送请求

四、aiohttp 高级用法

4.1 Session 复用与连接池

在一个爬虫程序中,不应该为每个请求都创建一个新的 ClientSession。正确的做法是创建一个全局 Session,在所有协程中复用。Session 内部维护了连接池(默认最多100个连接),复用 Session 可以显著减少 TCP 握手的开销:

async def crawl(): connector = TCPConnector(limit=100) async with aiohttp.ClientSession(connector=connector) as session: urls = ["https://example.com/page/{}".format(i) for i in range(100)] tasks = [fetch(session, url) for url in urls] results = await asyncio.gather(*tasks) return results async def fetch(session, url): async with session.get(url) as resp: return await resp.text()

4.2 自定义请求头

模拟浏览器请求头可以减少被反爬虫策略拦截的概率。可以通过 Session 级别的默认头或请求级别的自定义头来实现:

headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } # 全局请求头 session = aiohttp.ClientSession(headers=headers) # 或按请求设置 async with session.get(url, headers={"Referer": "https://www.google.com"}) as resp: ...

4.3 代理支持

aiohttp 原生支持 HTTP/HTTPS/SOCKS 代理,通过 proxy 参数指定:

# HTTP 代理 async with session.get(url, proxy="http://127.0.0.1:7890") as resp: ... # 需要认证的代理 async with session.get( url, proxy="http://user:password@127.0.0.1:7890" ) as resp: ...

4.4 Cookie 管理

ClientSession 自动管理 Cookie,默认使用 CookieJar 存储。可以创建自定义的 CookieJar 来控制 Cookie 策略:

from aiohttp import CookieJar # 默认 CookieJar(遵守 HTTP 标准) jar = CookieJar() # 自定义 CookieJar(不验证域,适合测试) jar_unsafe = CookieJar(unsafe=True) session = aiohttp.ClientSession(cookie_jar=jar) # 手动设置 Cookie session.cookie_jar.update_cookies( {"session_id": "abc123"}, response_url )

4.5 异常处理

aiohttp 定义了丰富的异常体系,爬虫中应分类处理:

from aiohttp import ( ClientError, ClientConnectionError, ClientTimeout, ClientHttpProxyError, InvalidURL, ServerTimeoutError ) async def safe_fetch(session, url): try: async with session.get(url, timeout=ClientTimeout(total=10)) as resp: if resp.status == 200: return await resp.text() else: print(f"HTTP {resp.status}: {url}") return None except ClientTimeout: print(f"超时: {url}") except ClientConnectionError: print(f"连接失败: {url}") except ClientHttpProxyError: print(f"代理错误: {url}") except InvalidURL: print(f"无效URL: {url}") except ClientError as e: print(f"其他客户端错误: {url}, {e}") return None

4.6 重试机制

网络请求不可靠,合理的重试机制是健壮爬虫的必备功能:

async def fetch_with_retry(session, url, max_retries=3): for attempt in range(1, max_retries + 1): try: async with session.get(url, timeout=ClientTimeout(total=10)) as resp: if resp.status == 200: return await resp.text() elif resp.status in (429, 503): # 被限流或服务暂时不可用,等待后重试 wait = 2 ** attempt print(f"被限流,等待{wait}秒后重试...") await asyncio.sleep(wait) else: return None except (ClientTimeout, ClientConnectionError) as e: if attempt == max_retries: print(f"重试{max_retries}次后仍然失败: {url}") return None wait = 2 ** attempt print(f"第{attempt}次失败,{wait}秒后重试: {url}") await asyncio.sleep(wait) return None

五、并发爬虫实现

5.1 信号量限制并发数

asyncio.Semaphore 是控制并发请求数量的核心工具。它维护一个计数器,每次 acquire 减少1,每次 release 增加1。当计数器为0时,acquire 会阻塞直到有其他协程 release。合理设置并发数可以避免对目标服务器造成过大压力,同时防止被反爬虫机制封禁:

semaphore = asyncio.Semaphore(10) # 最多10个并发 async def fetch_with_limit(session, url): async with semaphore: # 超过限制时会自动等待 async with session.get(url) as resp: return await resp.text() async def crawl(urls): connector = TCPConnector(limit=20) async with aiohttp.ClientSession(connector=connector) as session: tasks = [fetch_with_limit(session, url) for url in urls] results = await asyncio.gather(*tasks) return results

5.2 请求队列

使用 asyncio.Queue 实现生产者-消费者模式,适合大规模爬虫场景。生产者负责发现并添加 URL,消费者从队列中取出 URL 并爬取:

async def worker(name, session, queue, semaphore): while True: url = await queue.get() async with semaphore: try: async with session.get(url) as resp: html = await resp.text() print(f"[{name}] 爬取完成: {url}") except Exception as e: print(f"[{name}] 失败: {url}, {e}") queue.task_done() async def main(): queue = asyncio.Queue() semaphore = asyncio.Semaphore(10) # 向队列中添加初始URL for url in ["https://example.com/page/{}".format(i) for i in range(200)]: await queue.put(url) connector = TCPConnector(limit=20) async with aiohttp.ClientSession(connector=connector) as session: workers = [worker(f"Worker-{i}", session, queue, semaphore) for i in range(5)] await asyncio.gather(*workers)

5.3 解析与存储

在实际爬虫中,爬取到的数据需要解析和存储。为了避免阻塞事件循环,CPU密集型的解析操作(如 XML/HTML 解析)应该放到线程池中执行:

import asyncio from concurrent.futures import ThreadPoolExecutor from bs4 import BeautifulSoup executor = ThreadPoolExecutor(max_workers=4) async def parse_html(html): # 使用 run_in_executor 将同步解析放入线程池 loop = asyncio.get_event_loop() def parse(): soup = BeautifulSoup(html, "html.parser") title = soup.title.string if soup.title else "" return {"title": title, "length": len(html)} return await loop.run_in_executor(executor, parse) async def fetch_and_parse(session, url): async with session.get(url) as resp: html = await resp.text() data = await parse_html(html) return data

5.4 完整异步爬虫示例

以下是一个完整的异步爬虫示例,整合了以上所有知识点:

import asyncio import aiohttp from aiohttp import ClientTimeout, TCPConnector, ClientError async def safe_fetch(session, url, semaphore, timeout=10): async with semaphore: try: async with session.get( url, timeout=ClientTimeout(total=timeout) ) as resp: if resp.status == 200: return url, await resp.text() return url, None except ClientError as e: print(f"请求失败: {url}, {e}") return url, None async def async_crawl(urls, max_concurrent=10): semaphore = asyncio.Semaphore(max_concurrent) connector = TCPConnector(limit=max_concurrent * 2, ssl=False) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36" } async with aiohttp.ClientSession( connector=connector, headers=headers ) as session: tasks = [ safe_fetch(session, url, semaphore) for url in urls ] results = await asyncio.gather(*tasks) return {url: html for url, html in results if html is not None} # 使用示例 urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 10)] results = asyncio.run(async_crawl(urls, max_concurrent=5)) print(f"成功爬取 {len(results)} 个页面")

六、aiohttp + BeautifulSoup

6.1 异步请求 + 同步解析

在异步爬虫中,网络请求部分使用 aiohttp 实现异步,而 HTML 解析使用 BeautifulSoup(同步库)。这是一种非常高效且常用的组合。关键在于将 BeautifulSoup 的同步解析操作放到线程池中执行,避免阻塞事件循环。

6.2 run_in_executor 处理阻塞操作

BeautifulSoup 的解析操作是 CPU 密集型的,如果直接在协程中执行会阻塞事件循环,导致其他协程无法运行。通过 loop.run_in_executor() 可以将同步函数提交到线程池执行,返回一个 awaitable 对象:

import asyncio import aiohttp from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor(max_workers=4) async def parse_with_bs(html): loop = asyncio.get_running_loop() return await loop.run_in_executor( pool, lambda: BeautifulSoup(html, "html.parser") ) async def fetch_and_parse(session, url, semaphore): async with semaphore: async with session.get(url) as resp: html = await resp.text() soup = await parse_with_bs(html) title = soup.title.string if soup.title else "" links = [a.get("href") for a in soup.find_all("a", href=True)] return {"url": url, "title": title, "links": links[:10]}

6.3 异步爬虫的性能提升

异步爬虫的核心优势在于:在等待 HTTP 响应的过程中,事件循环可以切换到其他协程去发送更多的请求。以一个爬取100个URL的场景为例:

关键优势:异步爬虫可以在单线程内轻松支持数百甚至数千个并发连接,而多线程在数百个线程时就已经出现严重的性能下降。异步模型在连接数超过100时优势更加明显。

七、性能对比

7.1 同步 vs 多线程 vs 异步爬虫

以下是一个简单的性能对比测试方案:

import time import requests from concurrent.futures import ThreadPoolExecutor import aiohttp import asyncio URLS = [f"https://httpbin.org/delay/1" for _ in range(20)] # 1. 同步方式 def sync_crawl(): start = time.time() for url in URLS: requests.get(url) return time.time() - start # 2. 多线程方式 def thread_crawl(): start = time.time() with ThreadPoolExecutor(max_workers=10) as pool: list(pool.map(requests.get, URLS)) return time.time() - start # 3. 异步方式 async def async_crawl(): async with aiohttp.ClientSession() as session: tasks = [session.get(url) for url in URLS] await asyncio.gather(*tasks) def async_measure(): start = time.time() asyncio.run(async_crawl()) return time.time() - start print(f"同步: {sync_crawl():.2f}s") print(f"多线程(10): {thread_crawl():.2f}s") print(f"异步: {async_measure():.2f}s")

7.2 速度与资源占用对比

对比维度 同步爬虫 多线程爬虫 异步爬虫
执行模型 顺序阻塞 并发(OS线程) 并发(协程)
内存占用 低(单线程) 高(每线程约8MB栈) 极低(协程约几KB)
最大并发数 1 几十(受限于线程开销) 数千(受限于连接数)
代码复杂度 最低 中等(需锁同步) 较高(async/await)
读写文件/数据库 直接操作 需加锁 建议用连接池
适用场景 少量URL、快速原型 中等规模、需简单并发 大规模、高性能爬虫

实测经验:在爬取100个延迟1秒的URL时,同步爬虫约102秒,多线程(10线程)约12秒,异步(50并发)仅需约4秒。异步模型的并发优势在大规模爬取时呈现指数级差距。

八、注意事项与最佳实践

8.1 不要混用同步阻塞库

在异步协程中绝对不能直接调用同步阻塞操作(如 requests.get()time.sleep()、文件读写等),这些操作会阻塞整个事件循环,导致所有协程都无法运行。如果必须使用同步库,务必通过 loop.run_in_executor() 在线程池中执行。

8.2 连接数控制

虽然异步爬虫可以轻松发起数千个并发请求,但需要注意:

8.3 异常处理完整性

网络请求可能面临各种异常情况:超时、连接重置、DNS解析失败、SSL证书错误、HTTP状态码错误、服务器主动断开连接等。健壮的爬虫必须对每种异常进行合理处理,避免一个请求的失败导致整个程序崩溃。建议为所有网络请求添加 try/except 块,并实现重试机制。

8.4 代理在异步中的使用

使用代理时需要注意:

核心总结:异步爬虫是 Python 爬虫技术集中体现的难点——它综合运用了事件循环、协程调度、HTTP协议、并发控制、异常处理等多项知识。掌握 asyncio + aiohttp 组合,可以在单机即可达到每秒数千页面的爬取能力,是构建大规模爬虫系统的基石。