← 返回网络爬虫目录
← 返回学习笔记首页
Scrapy中间件与管道进阶
网络爬虫专题 · 掌握Scrapy高级组件
专题: Python网络爬虫系统学习
关键词: Python, 网络爬虫, Scrapy中间件, Pipeline, 代理中间件, UA伪装, CrawlSpider, 请求重试
一、Downloader中间件
1.1 作用与原理
Downloader Middleware(下载中间件)是Scrapy中位于Engine和Downloader之间的钩子框架,用于处理Request和Response的全局机制。当Engine向Downloader发送请求时,请求会依次经过所有下载中间件的 process_request 方法;当Downloader返回响应时,响应会依次经过 process_response 方法。如果请求或响应处理过程中发生异常,则会触发 process_exception 方法。
1.2 内置中间件列表
Scrapy内置了多个实用的下载中间件,默认按以下顺序启用:
顺序 中间件 功能
1 RobotstxtMiddleware 遵守robots.txt协议
2 HttpAuthMiddleware HTTP基本认证
3 DownloadTimeoutMiddleware 设置下载超时
4 UserAgentMiddleware 设置User-Agent
5 RetryMiddleware 请求重试机制
6 DefaultHeadersMiddleware 设置默认请求头
7 HttpProxyMiddleware HTTP代理支持
8 RedirectMiddleware 重定向处理
9 CookiesMiddleware Cookie管理
10 HttpCompressionMiddleware HTTP压缩支持
1.3 自定义下载中间件
自定义下载中间件需要实现以下一个或多个方法:
process_request(self, request, spider)
在请求被发送到Downloader之前被调用。可以返回 None(继续处理)、Response对象(短路后续中间件)、Request对象(重新调度)或抛出 IgnoreRequest 异常。典型用法包括修改请求头、更换代理等。
process_response(self, request, response, spider)
在Downloader返回响应后被调用。可以返回 Response 对象(继续传递或替换)、Request 对象(重新请求)。常用于检查响应状态码、处理反爬等。
process_exception(self, request, exception, spider)
当 process_request 或 process_response 抛出异常时被调用。可以返回 None(继续向上传递异常)、Response 对象或 Request 对象。
class CustomDownloaderMiddleware:
def process_request(self, request, spider):
request.headers['Custom-Header'] = 'value'
return None # 继续传递
def process_response(self, request, response, spider):
if response.status != 200:
return request.replace(dont_filter=True)
return response
def process_exception(self, request, exception, spider):
spider.logger.error(f'请求异常: {exception}')
return request.replace(dont_filter=True)
1.4 中间件优先级配置
在 settings.py 中通过 DOWNLOADER_MIDDLEWARES 字典配置中间件及其优先级,数值越小越靠近Engine(越先处理请求)。优先级范围为 0-1000,内置中间件通常在 500-900 之间。
DOWNLOADER_MIDDLEWARES = {
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
'myproject.middlewares.ProxyMiddleware': 350,
'myproject.middlewares.UAMiddleware': 400,
'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500,
}
1.5 常用场景
更换User-Agent :随机切换UA伪装不同浏览器
代理IP :集成代理池实现IP轮换
请求重试 :根据状态码和异常类型自动重试
自定义Cookie :动态管理登录状态和会话
请求签名 :添加自定义签名参数绕过反爬
响应预处理 :统一解码、解压缩、清洗响应内容
二、Spider中间件
2.1 作用与原理
Spider Middleware(爬虫中间件)位于Engine和Spider之间,负责处理Spider的输入(Response)和输出(Item/Request)。它在引擎将响应传递给爬虫处理之前,以及爬虫返回Item或Request之后被调用。与下载中间件不同,Spider中间件更关注数据的后处理层。
2.2 核心方法
Spider中间件主要涉及四个方法:
process_spider_input(response, spider) :当Response被传递给Spider的parse方法之前调用。返回None继续处理,抛出异常则触发 process_spider_exception。
process_spider_output(response, result, spider) :当Spider返回结果(Item或Request)时调用。必须返回可迭代对象(包含Item或Request)。
process_spider_exception(response, exception, spider) :处理Spider中抛出的异常,可以返回None(传递异常)或可迭代对象(恢复处理)。
process_start_requests(start_requests, spider) :在Spider的起始请求被发送之前调用,可用来过滤或修改初始请求。
class CustomSpiderMiddleware:
def process_spider_input(self, response, spider):
if response.status == 503:
raise IgnoreRequest('服务不可用,跳过')
return None
def process_spider_output(self, response, result, spider):
for item in result:
if isinstance(item, dict):
item['processed_by'] = spider.name
yield item
else:
yield item
def process_spider_exception(self, response, exception, spider):
spider.logger.warning(f'Spider异常: {exception}')
return []
2.3 应用场景
统一的数据清洗和字段补充
Response的预处理和后处理
异常捕获与恢复机制
请求去重和过滤
爬虫粒度的统计分析
提示: Spider中间件与Downloader中间件的主要区别在于处理阶段不同。Downloader中间件处理的是网络层的请求和响应,而Spider中间件处理的是数据层的响应和输出结果。一般情况下,自定义中间件以Downloader中间件为主,Spider中间件多用于数据加工。
三、Item Pipeline深入
3.1 爬虫生命周期方法
Pipeline提供了两个与爬虫生命周期相关的方法:
open_spider(self, spider) :Spider打开时调用,用于初始化资源,如建立数据库连接、打开文件句柄等。
close_spider(self, spider) :Spider关闭时调用,用于清理资源,如关闭数据库连接、关闭文件等。
3.2 从配置中获取参数
使用 from_crawler 类方法可以从Scrapy全局配置中获取参数,这是一种依赖注入的经典实现:
class MySQLPipeline:
@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
return cls(
host=settings.get('MYSQL_HOST', 'localhost'),
port=settings.getint('MYSQL_PORT', 3306),
db=settings.get('MYSQL_DB', 'scrapy'),
user=settings.get('MYSQL_USER', 'root'),
password=settings.get('MYSQL_PASSWORD', '')
)
def __init__(self, host, port, db, user, password):
self.host = host
self.port = port
self.db = db
self.user = user
self.password = password
def open_spider(self, spider):
self.conn = pymysql.connect(
host=self.host, port=self.port,
user=self.user, password=self.password,
database=self.db, charset='utf8mb4'
)
self.cursor = self.conn.cursor()
def close_spider(self, spider):
self.cursor.close()
self.conn.close()
3.3 多Pipeline数据分流
当项目中定义了多个Pipeline时,可以通过 ITEM_PIPELINES 配置控制优先级。利用 Item 的类型或特定字段实现数据分流:
class ImagePipeline:
def process_item(self, item, spider):
if 'image_url' not in item:
return item
# 处理图片下载
return item
class TextPipeline:
def process_item(self, item, spider):
if 'content' not in item:
return item
# 保存文本内容
return item
class StatsPipeline:
def process_item(self, item, spider):
spider.crawler.stats.inc_value('items_processed')
return item
3.4 Pipeline异常处理
在Pipeline中抛出 DropItem 异常可以将当前Item丢弃,不再传递给后续Pipeline。配合 Spider 的日志系统记录被丢弃的Item:
class ValidationPipeline:
def process_item(self, item, spider):
if not item.get('title'):
raise DropItem(f'缺少title字段,丢弃Item')
if not item.get('url'):
raise DropItem(f'缺少url字段,丢弃Item')
return item
3.5 数据存储Pipeline实现
MySQL Pipeline
class MySQLPipeline:
def process_item(self, item, spider):
sql = 'INSERT INTO articles(title, content, url, create_time) VALUES (%s, %s, %s, %s)'
params = (item['title'], item['content'], item['url'], datetime.now())
try:
self.cursor.execute(sql, params)
self.conn.commit()
except Exception as e:
self.conn.rollback()
spider.logger.error(f'数据库写入失败: {e}')
return item
MongoDB Pipeline
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider):
self.db.articles.update_one(
{'url': item['url']},
{'$set': dict(item)},
upsert=True
)
return item
def close_spider(self, spider):
self.client.close()
Redis Pipeline(去重+队列)
class RedisPipeline:
def open_spider(self, spider):
self.redis_client = redis.Redis(
host=spider.settings.get('REDIS_HOST', 'localhost'),
port=spider.settings.getint('REDIS_PORT', 6379)
)
def process_item(self, item, spider):
key = f'article:{item["url"]}'
if self.redis_client.exists(key):
raise DropItem(f'重复文章: {item["url"]}')
self.redis_client.setex(key, 86400, '1')
self.redis_client.lpush('articles:queue', json.dumps(dict(item)))
return item
四、代理中间件实现
4.1 随机代理中间件
代理中间件是反爬对抗中最重要的组件之一。通过维护一个代理池,每次请求随机选择一个代理进行访问,可以有效降低被封IP的风险。
class RandomProxyMiddleware:
def __init__(self, proxies):
self.proxies = proxies
@classmethod
def from_crawler(cls, crawler):
proxies = crawler.settings.getlist('PROXY_LIST')
if not proxies:
proxies = [
'http://proxy1.example.com:8080',
'http://proxy2.example.com:8080',
]
return cls(proxies)
def process_request(self, request, spider):
proxy = random.choice(self.proxies)
request.meta['proxy'] = proxy
spider.logger.debug(f'使用代理: {proxy}')
def process_response(self, request, response, spider):
if response.status in [403, 429, 503]:
spider.logger.warning(f'代理被限制: {request.meta.get("proxy")}')
new_request = request.copy()
new_request.dont_filter = True
return new_request
return response
4.2 代理池集成
在实际生产环境中,代理池通常是一个独立的服务,提供 API 接口动态获取可用代理。可以在中间件中请求代理池 API,并按需获取新的代理:
class ProxyPoolMiddleware:
def __init__(self):
self.proxy_api_url = 'http://proxy-pool:5010/get/'
self.valid_proxies = []
def fetch_proxy(self):
try:
resp = requests.get(self.proxy_api_url, timeout=3)
if resp.status_code == 200:
proxy = resp.json().get('proxy')
return f'http://{proxy}'
except Exception as e:
print(f'获取代理失败: {e}')
return None
def process_request(self, request, spider):
if 'proxy' not in request.meta or not self.test_proxy(request.meta['proxy']):
proxy = self.fetch_proxy()
if proxy:
request.meta['proxy'] = proxy
4.3 代理验证与自动切换
代理的稳定性直接影响爬虫成功率。建议实现代理验证机制:
可用性检查 :定期使用测试URL验证代理是否可用
响应时间统计 :记录每个代理的响应时间,优先使用速度快的代理
失败计数器 :连续失败N次的代理从池中移除
自动切换 :当某个代理触发了反爬机制时(403、429状态码),立即切换到下一个代理并重试请求
注意: 免费代理通常不稳定,建议在关键爬虫中使用付费代理服务或自建代理池。同时,HTTPS代理比HTTP代理更安全,但成本也更高。
五、User-Agent中间件
5.1 随机UA切换
User-Agent是服务器识别客户端的重要标识。为了避免被单一UA特征识别,需要在每次请求时随机切换UA。Scrapy默认的 UserAgentMiddleware 只会使用配置中的单一UA,我们需要自定义中间件来替换它。
class RandomUserAgentMiddleware:
def __init__(self):
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) '
'Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
]
def process_request(self, request, spider):
ua = random.choice(self.user_agents)
request.headers['User-Agent'] = ua
5.2 fake_useragent库集成
手动维护UA列表不够灵活,推荐使用 fake_useragent 库自动生成真实的UA字符串:
from fake_useragent import UserAgent
class FakeUAMiddleware:
def __init__(self):
self.ua = UserAgent()
self.ua_cache = {}
self.cache_size = 50
def process_request(self, request, spider):
# 按浏览器类型随机,确保多样性
browser = random.choice(['chrome', 'firefox', 'safari', 'edge'])
ua = getattr(self.ua, browser)
request.headers['User-Agent'] = ua
5.3 UA池管理的最佳实践
组合策略 :UA与平台、操作系统、浏览器版本交叉组合,生成更真实的身份标识
频率控制 :避免短时间内对同一域名使用大量不同UA,模拟正常访问行为
会话保持 :同一会话尽量使用同一UA,避免被检测出异常切换
移动端UA :针对移动端站点,准备移动端UA池(Android、iOS)
六、请求重试与错误处理
6.1 RetryMiddleware配置
Scrapy内置了 RetryMiddleware,通过以下配置控制重试行为:
# settings.py 配置
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429]
RETRY_PRIORITY_ADJUST = -1
6.2 自定义重试条件
当内置重试策略不满足需求时,可以通过继承 RetryMiddleware 并重写方法来实现自定义逻辑:
from scrapy.downloadermiddlewares.retry import RetryMiddleware
class CustomRetryMiddleware(RetryMiddleware):
def __init__(self, settings):
super().__init__(settings)
self.max_retry_times = settings.getint('CUSTOM_RETRY_TIMES', 5)
def process_response(self, request, response, spider):
if response.status in self.retry_http_codes:
reason = f'状态码 {response.status}'
return self._retry(request, reason, spider)
# 检查响应内容是否为空
if len(response.body) < 100:
reason = '响应内容过短,可能被反爬'
return self._retry(request, reason, spider)
return response
def process_exception(self, request, exception, spider):
# 记录异常
spider.logger.error(f'请求失败: {request.url}, 异常: {exception}')
if isinstance(exception, (TimeoutError, ConnectionError)):
return self._retry(request, str(exception), spider)
return None
6.3 错误日志记录
系统化的错误日志对排查问题至关重要。建议在中间件和Pipeline中统一记录:
请求级别 :记录每次请求的URL、状态码、耗时、代理信息
异常级别 :记录异常类型、堆栈信息、请求参数
重试日志 :记录重试次数、重试原因、重试前后代理变化
丢弃日志 :记录被Pipeline丢弃的Item及其原因
6.4 请求失败回调(errback)
Scrapy的每个 Request 对象支持 errback 参数,当请求发生异常或重试耗尽后自动调用该回调函数。errback 接收一个 Failure 对象作为参数:
def parse(self, response):
yield scrapy.Request(
url='https://example.com/api/data',
callback=self.parse_api,
errback=self.handle_error,
meta={'retry_times': 0}
)
def handle_error(self, failure):
request = failure.request
self.logger.error(f'请求最终失败: {request.url}')
self.logger.error(f'失败原因: {failure.value}')
# 可以记录到数据库或发送告警
item = FailedItem()
item['url'] = request.url
item['error'] = str(failure.value)
yield item
七、CrawlSpider与Rule
7.1 CrawlSpider概述
CrawlSpider是Scrapy中用于全站爬取的Spider子类。它通过定义一组Rule规则来自动发现和跟进链接,非常适合做整站抓取或大规模内容采集。与普通Spider需要手动编写所有 parse 逻辑不同,CrawlSpider通过Rules规则引擎将链接提取和回调处理解耦。
7.2 Rule规则定义
每个Rule包含三个核心参数:
LinkExtractor :定义如何从页面中提取链接,支持allow/regex、deny、allow_domains等过滤条件
callback :指定提取到的链接对应的响应由哪个回调函数处理
follow :布尔值,表示是否在目标页面中继续应用该Rule提取链接(默认True)
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class NewsSpider(CrawlSpider):
name = 'news_spider'
allowed_domains = ['example-news.com']
start_urls = ['https://example-news.com/']
rules = (
# 提取分类页链接,跟进但不解析
Rule(LinkExtractor(allow=r'/category/\w+/'), follow=True),
# 提取文章详情页链接,由 parse_article 处理
Rule(LinkExtractor(
allow=r'/article/\d+\.html',
restrict_css='.article-list a'
), callback='parse_article'),
# 提取分页链接,继续跟进
Rule(LinkExtractor(
allow=r'/page/\d+/',
restrict_xpaths='//div[@class="pagination"]/a'
), follow=True),
)
def parse_article(self, response):
yield {
'title': response.css('h1::text').get(),
'content': response.css('.article-body::text').getall(),
'url': response.url,
'publish_time': response.css('.time::text').get(),
}
7.3 LinkExtractor高级用法
LinkExtractor提供了丰富的参数来精确控制链接提取:
allow/deny :正则表达式过滤URL
allow_domains/deny_domains :域名黑白名单
restrict_xpaths/restrict_css :限定提取区域,提高效率和准确性
tags/attrs :指定从哪些HTML标签和属性中提取链接
unique :是否去重(默认True)
process_value :对提取的URL进行预处理
7.4 适用场景
新闻网站全站文章抓取
电商网站商品信息采集
博客/论坛内容迁移
SEO整站分析
站点地图自动发现
最佳实践: 使用CrawlSpider时务必设置 allowed_domains 避免爬虫爬出站外。对于大型站点,配合 DEPTH_LIMIT 控制爬取深度,使用 CLOSESPIDER_PAGECOUNT 限制爬取页面总数,防止失控。
八、Scrapy扩展(Extensions)
8.1 扩展的生命周期
Scrapy扩展是一种在爬虫运行期间执行自定义功能的机制。扩展与中间件的关键区别在于:扩展不直接参与请求/响应/数据的处理流程,而是作为独立模块在后台运行,用于监控、统计和功能增强。扩展通过连接Scrapy的信号系统来实现其功能,生命周期包括初始化、启动、运行和关闭四个阶段。
8.2 自定义扩展实现
以下是一个统计爬虫运行状态的扩展示例,记录了请求数、成功数、失败数等关键指标:
from scrapy import signals
class StatsMonitorExtension:
def __init__(self, crawler):
self.crawler = crawler
self.start_time = None
self.item_count = 0
@classmethod
def from_crawler(cls, crawler):
ext = cls(crawler)
# 连接各个信号
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)
crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
crawler.signals.connect(ext.request_dropped, signal=signals.request_dropped)
return ext
def spider_opened(self, spider):
self.start_time = datetime.now()
spider.logger.info(f'爬虫 {spider.name} 启动')
def spider_closed(self, spider, reason):
elapsed = datetime.now() - self.start_time
spider.logger.info(
f'爬虫 {spider.name} 结束,原因: {reason},'
f'耗时: {elapsed},采集Item: {self.item_count}'
)
def item_scraped(self, item, spider):
self.item_count += 1
def item_dropped(self, item, spider, exception):
spider.logger.warning(f'Item被丢弃: {item}, 原因: {exception}')
def request_scheduled(self, request, spider):
pass
def request_dropped(self, request, spider):
spider.logger.warning(f'请求被丢弃: {request.url}')
8.3 信号系统详解
Scrapy的信号系统是扩展的核心通信机制。以下是常用信号列表:
信号名称 触发时机 参数
engine_started 引擎启动时 无
engine_stopped 引擎停止时 无
spider_opened Spider打开时 spider
spider_closed Spider关闭时 spider, reason
request_scheduled 请求被调度时 request, spider
request_dropped 请求被丢弃时 request, spider
response_received 响应被接收时 response, request, spider
item_scraped Item被爬取时 item, response, spider
item_dropped Item被丢弃时 item, spider, exception
bytes_received 数据块被接收时 data, request, spider
8.4 扩展的实用场景
性能监控 :统计爬取速度、内存占用、请求延迟
定时告警 :当爬虫异常停止或采集量低于阈值时发送邮件/短信通知
爬虫协调 :通过Redis等中间件协调多个爬虫实例的协同工作
数据统计 :实时统计爬取总量、成功率、去重率等运营指标
优雅关闭 :在爬虫关闭前完成数据持久化和资源清理
# settings.py 中启用扩展
EXTENSIONS = {
'myproject.extensions.StatsMonitorExtension': 500,
'myproject.extensions.AlertExtension': 600,
}
总结与最佳实践
核心要点:
1. Downloader中间件是反爬对抗的前沿阵地,负责请求修饰和响应预处理;Spider中间件是数据加工的后处理层,负责数据清洗和异常处理。
2. Item Pipeline的 from_crawler 类方法是获取配置的标准模式,open_spider/close_spider 是管理资源生命周期的关键钩子。
3. 代理中间件和UA中间件是Scrapy中最常用的自定义中间件,两者配合使用可大幅降低被封概率。
4. RetryMiddleware配合errback回调可以实现全面的请求失败处理和告警机制。
5. CrawlSpider的Rule规则引擎适用于全站抓取场景,合理使用restrict_xpaths/restrict_css可以精确控制抓取范围。
6. 扩展(Extensions)通过信号系统与爬虫核心交互,适合实现监控、统计、告警等横切关注点。
7. 生产环境中建议将中间件、Pipeline和扩展的优先级参数化配置在 settings.py 中,便于维护和调试。