一、异步编程基础
1.1 同步 vs 异步 vs 多线程
在理解异步爬虫之前,首先需要区分三种编程模型:
- 同步(Synchronous):代码按顺序逐行执行,每个操作必须等待上一个操作完成后才能进行。在爬虫场景中,发送一个HTTP请求后必须等待服务器返回响应,才能发送下一个请求。期间CPU大量时间处于空闲等待状态,造成资源浪费。
- 多线程(Multithreading):通过创建多个线程,每个线程独立执行任务。虽然可以实现并发,但线程的创建和切换开销较大,且受全局解释器锁(GIL)限制,Python多线程在CPU密集型任务中表现不佳。不过对于I/O密集型任务,多线程仍然可以提升效率。
- 异步(Asynchronous):在单线程内通过事件循环调度任务,遇到I/O等待时主动让出CPU,切换到其他任务执行。避免了线程创建和上下文切换的开销,是Python中最高效的I/O密集型任务解决方案。
1.2 爬虫是I/O密集型任务
网络爬虫的本质是大量HTTP请求的发送与响应接收。一次HTTP请求中,90%以上的时间都花费在网络传输上——等待DNS解析、建立TCP连接、发送请求、等待服务器处理、接收响应数据。在这段时间内,CPU几乎处于空闲状态。因此爬虫是典型的I/O密集型任务,非常适合使用异步编程模型来优化性能。
1.3 协程(Coroutine)的概念
协程是一种用户态的轻量级线程,也称为微线程。与操作系统线程不同,协程的调度完全由程序自身控制,而不是由操作系统内核管理。协程可以在执行到某个耗时操作时主动挂起(yield),让其他协程继续执行,从而实现协作式多任务处理。Python中通过 async/await 语法来实现协程。
1.4 async/await 语法
async def 用于定义一个协程函数,调用该函数会返回一个协程对象。协程对象需要通过 await 来执行。await 关键字用于挂起当前协程,等待另一个协程完成,同时将控制权交还给事件循环,让事件循环调度其他就绪的协程。
import asyncio
async def fetch_data(url):
print(f"开始请求: {url}")
# await 会挂起当前协程,模拟网络等待
await asyncio.sleep(1)
print(f"完成请求: {url}")
return f"数据来自 {url}"
# 调用协程函数返回协程对象,不会立即执行
coro = fetch_data("https://example.com")
# 需要通过 await 或 asyncio.run() 来执行
1.5 事件循环(Event Loop)
事件循环是异步编程的核心调度器。它维护着一个任务队列,不断循环检查每个任务的状态:如果任务正在等待I/O操作,则将其挂起并切换到下一个就绪的任务;如果任务已经准备就绪,则继续执行。Python的 asyncio 模块提供了高性能的事件循环实现,底层基于 selectors 模块或 IOCP(Windows)等系统级I/O多路复用机制。
核心理解:异步编程不是让程序运行得更快,而是让程序在等待I/O时不闲着,从而在单位时间内完成更多任务。对于爬虫而言,就是在一个请求等待响应的过程中,去发送其他请求。
二、asyncio 模块详解
2.1 定义和运行协程
async def 定义一个协程函数,内部的 await 表达式会挂起当前协程直到 awaitable 对象完成。使用 asyncio.run() 是启动事件循环的标准方式,它创建一个新的事件循环,运行传入的协程,并在完成后关闭事件循环。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # 模拟I/O等待
print("World")
asyncio.run(hello()) # 启动事件循环
2.2 并发执行多个协程
asyncio.gather() 是并发执行多个协程的核心函数。它接收多个 awaitable 对象,并发地调度它们,并等待所有任务完成。返回值是一个列表,顺序与传入的协程顺序一致。如果任何一个协程抛出异常,gather 默认会传播异常,但可以通过 return_exceptions=True 参数将异常作为返回值。
async def main():
tasks = [
fetch_data("https://api1.example.com"),
fetch_data("https://api2.example.com"),
fetch_data("https://api3.example.com"),
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
2.3 创建与管理任务
asyncio.create_task() 用于将协程包装成一个 Task 对象,并立即安排到事件循环中执行。与直接 await 不同,create_task 不会阻塞当前协程——它会立即返回 Task 对象,协程在后台开始执行。这对于在等待其他操作时执行后台任务非常有用。
async def main():
# 创建多个任务,它们会立即开始执行
task1 = asyncio.create_task(fetch_data("https://api1.com"))
task2 = asyncio.create_task(fetch_data("https://api2.com"))
# 在此可以执行其他操作...
print("任务已在后台启动...")
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
return results
2.4 协程间的通信
asyncio 提供了多种协程间通信机制:asyncio.Queue 是生产消费者模式的首选,支持多个生产者协程向队列中放入 URL,多个消费者协程从队列中取出 URL 并爬取。asyncio.Event 用于协程间的同步通知,asyncio.Lock 用于保护共享资源的互斥访问,asyncio.Semaphore 用于控制并发数量。
要点总结:asyncio.Queue 是实现爬虫任务调度最常用的工具,它天然支持协程环境下的线程安全操作。结合 Semaphore 可以实现精准的并发数控制。
三、aiohttp 客户端基础
3.1 安装与导入
aiohttp 是 Python 中最流行的异步 HTTP 客户端/服务端框架。使用 pip 即可安装:
pip install aiohttp
安装完成后,在代码中导入:
import aiohttp
import asyncio
3.2 创建会话与发送请求
aiohttp 的核心是 ClientSession 对象。它管理连接池和 Cookie,建议作为上下文管理器使用(async with)以确保资源正确释放。每个会话可以发送任意数量的请求,连接会自动复用。
async def fetch_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# POST 请求
async def post_data(url, data):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
return await response.json()
3.3 响应读取方式
aiohttp 提供了多种读取响应内容的方法:
response.text() —— 以字符串形式读取响应体,可指定编码(encoding参数)
response.json() —— 将响应体解析为 JSON 对象
response.read() —— 以字节形式读取原始响应体,适合图片、文件等二进制数据
这些方法都是协程,需要使用 await 调用。
3.4 超时设置
aiohttp.ClientTimeout 用于控制请求的超时行为。可以设置总超时、连接超时和读取超时:
from aiohttp import ClientTimeout
# 总超时30秒
timeout = ClientTimeout(total=30)
# 更精细的超时控制
timeout = ClientTimeout(
total=60, # 整个请求的总超时
connect=10, # 连接建立超时
sock_read=30 # 读取数据超时
)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
return await resp.text()
3.5 连接限制
TCPConnector 用于控制连接池行为,包括最大并发连接数、每个主机的最大连接数、SSL 验证等:
from aiohttp import TCPConnector
# 限制总连接数为50,每个主机最多10个连接
connector = TCPConnector(
limit=50, # 总并发连接数
limit_per_host=10, # 每个主机的最大连接数
ssl=False, # 是否验证SSL证书(开发环境可关闭)
ttl_dns_cache=300 # DNS缓存时间(秒)
)
async with aiohttp.ClientSession(connector=connector) as session:
# ... 发送请求
四、aiohttp 高级用法
4.1 Session 复用与连接池
在一个爬虫程序中,不应该为每个请求都创建一个新的 ClientSession。正确的做法是创建一个全局 Session,在所有协程中复用。Session 内部维护了连接池(默认最多100个连接),复用 Session 可以显著减少 TCP 握手的开销:
async def crawl():
connector = TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector) as session:
urls = ["https://example.com/page/{}".format(i) for i in range(100)]
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
4.2 自定义请求头
模拟浏览器请求头可以减少被反爬虫策略拦截的概率。可以通过 Session 级别的默认头或请求级别的自定义头来实现:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# 全局请求头
session = aiohttp.ClientSession(headers=headers)
# 或按请求设置
async with session.get(url, headers={"Referer": "https://www.google.com"}) as resp:
...
4.3 代理支持
aiohttp 原生支持 HTTP/HTTPS/SOCKS 代理,通过 proxy 参数指定:
# HTTP 代理
async with session.get(url, proxy="http://127.0.0.1:7890") as resp:
...
# 需要认证的代理
async with session.get(
url,
proxy="http://user:password@127.0.0.1:7890"
) as resp:
...
4.4 Cookie 管理
ClientSession 自动管理 Cookie,默认使用 CookieJar 存储。可以创建自定义的 CookieJar 来控制 Cookie 策略:
from aiohttp import CookieJar
# 默认 CookieJar(遵守 HTTP 标准)
jar = CookieJar()
# 自定义 CookieJar(不验证域,适合测试)
jar_unsafe = CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)
# 手动设置 Cookie
session.cookie_jar.update_cookies(
{"session_id": "abc123"},
response_url
)
4.5 异常处理
aiohttp 定义了丰富的异常体系,爬虫中应分类处理:
from aiohttp import (
ClientError, ClientConnectionError,
ClientTimeout, ClientHttpProxyError,
InvalidURL, ServerTimeoutError
)
async def safe_fetch(session, url):
try:
async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
if resp.status == 200:
return await resp.text()
else:
print(f"HTTP {resp.status}: {url}")
return None
except ClientTimeout:
print(f"超时: {url}")
except ClientConnectionError:
print(f"连接失败: {url}")
except ClientHttpProxyError:
print(f"代理错误: {url}")
except InvalidURL:
print(f"无效URL: {url}")
except ClientError as e:
print(f"其他客户端错误: {url}, {e}")
return None
4.6 重试机制
网络请求不可靠,合理的重试机制是健壮爬虫的必备功能:
async def fetch_with_retry(session, url, max_retries=3):
for attempt in range(1, max_retries + 1):
try:
async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
if resp.status == 200:
return await resp.text()
elif resp.status in (429, 503):
# 被限流或服务暂时不可用,等待后重试
wait = 2 ** attempt
print(f"被限流,等待{wait}秒后重试...")
await asyncio.sleep(wait)
else:
return None
except (ClientTimeout, ClientConnectionError) as e:
if attempt == max_retries:
print(f"重试{max_retries}次后仍然失败: {url}")
return None
wait = 2 ** attempt
print(f"第{attempt}次失败,{wait}秒后重试: {url}")
await asyncio.sleep(wait)
return None
五、并发爬虫实现
5.1 信号量限制并发数
asyncio.Semaphore 是控制并发请求数量的核心工具。它维护一个计数器,每次 acquire 减少1,每次 release 增加1。当计数器为0时,acquire 会阻塞直到有其他协程 release。合理设置并发数可以避免对目标服务器造成过大压力,同时防止被反爬虫机制封禁:
semaphore = asyncio.Semaphore(10) # 最多10个并发
async def fetch_with_limit(session, url):
async with semaphore: # 超过限制时会自动等待
async with session.get(url) as resp:
return await resp.text()
async def crawl(urls):
connector = TCPConnector(limit=20)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_with_limit(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
5.2 请求队列
使用 asyncio.Queue 实现生产者-消费者模式,适合大规模爬虫场景。生产者负责发现并添加 URL,消费者从队列中取出 URL 并爬取:
async def worker(name, session, queue, semaphore):
while True:
url = await queue.get()
async with semaphore:
try:
async with session.get(url) as resp:
html = await resp.text()
print(f"[{name}] 爬取完成: {url}")
except Exception as e:
print(f"[{name}] 失败: {url}, {e}")
queue.task_done()
async def main():
queue = asyncio.Queue()
semaphore = asyncio.Semaphore(10)
# 向队列中添加初始URL
for url in ["https://example.com/page/{}".format(i) for i in range(200)]:
await queue.put(url)
connector = TCPConnector(limit=20)
async with aiohttp.ClientSession(connector=connector) as session:
workers = [worker(f"Worker-{i}", session, queue, semaphore)
for i in range(5)]
await asyncio.gather(*workers)
5.3 解析与存储
在实际爬虫中,爬取到的数据需要解析和存储。为了避免阻塞事件循环,CPU密集型的解析操作(如 XML/HTML 解析)应该放到线程池中执行:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
executor = ThreadPoolExecutor(max_workers=4)
async def parse_html(html):
# 使用 run_in_executor 将同步解析放入线程池
loop = asyncio.get_event_loop()
def parse():
soup = BeautifulSoup(html, "html.parser")
title = soup.title.string if soup.title else ""
return {"title": title, "length": len(html)}
return await loop.run_in_executor(executor, parse)
async def fetch_and_parse(session, url):
async with session.get(url) as resp:
html = await resp.text()
data = await parse_html(html)
return data
5.4 完整异步爬虫示例
以下是一个完整的异步爬虫示例,整合了以上所有知识点:
import asyncio
import aiohttp
from aiohttp import ClientTimeout, TCPConnector, ClientError
async def safe_fetch(session, url, semaphore, timeout=10):
async with semaphore:
try:
async with session.get(
url, timeout=ClientTimeout(total=timeout)
) as resp:
if resp.status == 200:
return url, await resp.text()
return url, None
except ClientError as e:
print(f"请求失败: {url}, {e}")
return url, None
async def async_crawl(urls, max_concurrent=10):
semaphore = asyncio.Semaphore(max_concurrent)
connector = TCPConnector(limit=max_concurrent * 2, ssl=False)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36"
}
async with aiohttp.ClientSession(
connector=connector, headers=headers
) as session:
tasks = [
safe_fetch(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
return {url: html for url, html in results if html is not None}
# 使用示例
urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 10)]
results = asyncio.run(async_crawl(urls, max_concurrent=5))
print(f"成功爬取 {len(results)} 个页面")
六、aiohttp + BeautifulSoup
6.1 异步请求 + 同步解析
在异步爬虫中,网络请求部分使用 aiohttp 实现异步,而 HTML 解析使用 BeautifulSoup(同步库)。这是一种非常高效且常用的组合。关键在于将 BeautifulSoup 的同步解析操作放到线程池中执行,避免阻塞事件循环。
6.2 run_in_executor 处理阻塞操作
BeautifulSoup 的解析操作是 CPU 密集型的,如果直接在协程中执行会阻塞事件循环,导致其他协程无法运行。通过 loop.run_in_executor() 可以将同步函数提交到线程池执行,返回一个 awaitable 对象:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4)
async def parse_with_bs(html):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
pool,
lambda: BeautifulSoup(html, "html.parser")
)
async def fetch_and_parse(session, url, semaphore):
async with semaphore:
async with session.get(url) as resp:
html = await resp.text()
soup = await parse_with_bs(html)
title = soup.title.string if soup.title else ""
links = [a.get("href") for a in soup.find_all("a", href=True)]
return {"url": url, "title": title, "links": links[:10]}
6.3 异步爬虫的性能提升
异步爬虫的核心优势在于:在等待 HTTP 响应的过程中,事件循环可以切换到其他协程去发送更多的请求。以一个爬取100个URL的场景为例:
- 同步爬虫:每个请求耗时约1秒(网络延迟),总耗时约100秒
- 多线程爬虫(10线程):约10秒完成,但线程切换有一定开销
- 异步爬虫(10并发):约10秒完成,零线程切换开销,支持更高的并发数
关键优势:异步爬虫可以在单线程内轻松支持数百甚至数千个并发连接,而多线程在数百个线程时就已经出现严重的性能下降。异步模型在连接数超过100时优势更加明显。
七、性能对比
7.1 同步 vs 多线程 vs 异步爬虫
以下是一个简单的性能对比测试方案:
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import aiohttp
import asyncio
URLS = [f"https://httpbin.org/delay/1" for _ in range(20)]
# 1. 同步方式
def sync_crawl():
start = time.time()
for url in URLS:
requests.get(url)
return time.time() - start
# 2. 多线程方式
def thread_crawl():
start = time.time()
with ThreadPoolExecutor(max_workers=10) as pool:
list(pool.map(requests.get, URLS))
return time.time() - start
# 3. 异步方式
async def async_crawl():
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in URLS]
await asyncio.gather(*tasks)
def async_measure():
start = time.time()
asyncio.run(async_crawl())
return time.time() - start
print(f"同步: {sync_crawl():.2f}s")
print(f"多线程(10): {thread_crawl():.2f}s")
print(f"异步: {async_measure():.2f}s")
7.2 速度与资源占用对比
| 对比维度 |
同步爬虫 |
多线程爬虫 |
异步爬虫 |
| 执行模型 |
顺序阻塞 |
并发(OS线程) |
并发(协程) |
| 内存占用 |
低(单线程) |
高(每线程约8MB栈) |
极低(协程约几KB) |
| 最大并发数 |
1 |
几十(受限于线程开销) |
数千(受限于连接数) |
| 代码复杂度 |
最低 |
中等(需锁同步) |
较高(async/await) |
| 读写文件/数据库 |
直接操作 |
需加锁 |
建议用连接池 |
| 适用场景 |
少量URL、快速原型 |
中等规模、需简单并发 |
大规模、高性能爬虫 |
实测经验:在爬取100个延迟1秒的URL时,同步爬虫约102秒,多线程(10线程)约12秒,异步(50并发)仅需约4秒。异步模型的并发优势在大规模爬取时呈现指数级差距。
八、注意事项与最佳实践
8.1 不要混用同步阻塞库
在异步协程中绝对不能直接调用同步阻塞操作(如 requests.get()、time.sleep()、文件读写等),这些操作会阻塞整个事件循环,导致所有协程都无法运行。如果必须使用同步库,务必通过 loop.run_in_executor() 在线程池中执行。
8.2 连接数控制
虽然异步爬虫可以轻松发起数千个并发请求,但需要注意:
- 目标服务器通常有反爬虫策略,过高的并发可能导致IP被封禁
- 本地网络环境和系统文件描述符限制(Linux中可通过 ulimit 调整)
- 建议从较小的并发数(如10-20)开始,逐步调优
- 使用 asyncio.Semaphore 和 TCPConnector.limit 双重控制
8.3 异常处理完整性
网络请求可能面临各种异常情况:超时、连接重置、DNS解析失败、SSL证书错误、HTTP状态码错误、服务器主动断开连接等。健壮的爬虫必须对每种异常进行合理处理,避免一个请求的失败导致整个程序崩溃。建议为所有网络请求添加 try/except 块,并实现重试机制。
8.4 代理在异步中的使用
使用代理时需要注意:
- aiohttp 对 HTTP 代理支持良好,但 SOCKS5 代理需要安装
aiohttp-socks 库
- 代理连接的建立也需要时间,大量并发使用同一代理可能造成代理延迟增加
- 建议为不同的 URL 或任务轮换使用不同的代理 IP
- 对于高匿代理,可以全局设置一个代理:
session.get(url, proxy=proxy_url)
核心总结:异步爬虫是 Python 爬虫技术集中体现的难点——它综合运用了事件循环、协程调度、HTTP协议、并发控制、异常处理等多项知识。掌握 asyncio + aiohttp 组合,可以在单机即可达到每秒数千页面的爬取能力,是构建大规模爬虫系统的基石。