Scrapy中间件与管道进阶

网络爬虫专题 · 掌握Scrapy高级组件

专题:Python网络爬虫系统学习

关键词:Python, 网络爬虫, Scrapy中间件, Pipeline, 代理中间件, UA伪装, CrawlSpider, 请求重试

一、Downloader中间件

1.1 作用与原理

Downloader Middleware(下载中间件)是Scrapy中位于Engine和Downloader之间的钩子框架,用于处理Request和Response的全局机制。当Engine向Downloader发送请求时,请求会依次经过所有下载中间件的 process_request 方法;当Downloader返回响应时,响应会依次经过 process_response 方法。如果请求或响应处理过程中发生异常,则会触发 process_exception 方法。

1.2 内置中间件列表

Scrapy内置了多个实用的下载中间件,默认按以下顺序启用:

顺序中间件功能
1RobotstxtMiddleware遵守robots.txt协议
2HttpAuthMiddlewareHTTP基本认证
3DownloadTimeoutMiddleware设置下载超时
4UserAgentMiddleware设置User-Agent
5RetryMiddleware请求重试机制
6DefaultHeadersMiddleware设置默认请求头
7HttpProxyMiddlewareHTTP代理支持
8RedirectMiddleware重定向处理
9CookiesMiddlewareCookie管理
10HttpCompressionMiddlewareHTTP压缩支持

1.3 自定义下载中间件

自定义下载中间件需要实现以下一个或多个方法:

process_request(self, request, spider)

在请求被发送到Downloader之前被调用。可以返回 None(继续处理)、Response对象(短路后续中间件)、Request对象(重新调度)或抛出 IgnoreRequest 异常。典型用法包括修改请求头、更换代理等。

process_response(self, request, response, spider)

在Downloader返回响应后被调用。可以返回 Response 对象(继续传递或替换)、Request 对象(重新请求)。常用于检查响应状态码、处理反爬等。

process_exception(self, request, exception, spider)

当 process_request 或 process_response 抛出异常时被调用。可以返回 None(继续向上传递异常)、Response 对象或 Request 对象。

class CustomDownloaderMiddleware: def process_request(self, request, spider): request.headers['Custom-Header'] = 'value' return None # 继续传递 def process_response(self, request, response, spider): if response.status != 200: return request.replace(dont_filter=True) return response def process_exception(self, request, exception, spider): spider.logger.error(f'请求异常: {exception}') return request.replace(dont_filter=True)

1.4 中间件优先级配置

在 settings.py 中通过 DOWNLOADER_MIDDLEWARES 字典配置中间件及其优先级,数值越小越靠近Engine(越先处理请求)。优先级范围为 0-1000,内置中间件通常在 500-900 之间。

DOWNLOADER_MIDDLEWARES = { 'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None, 'myproject.middlewares.ProxyMiddleware': 350, 'myproject.middlewares.UAMiddleware': 400, 'scrapy.downloadermiddlewares.retry.RetryMiddleware': 500, }

1.5 常用场景

二、Spider中间件

2.1 作用与原理

Spider Middleware(爬虫中间件)位于Engine和Spider之间,负责处理Spider的输入(Response)和输出(Item/Request)。它在引擎将响应传递给爬虫处理之前,以及爬虫返回Item或Request之后被调用。与下载中间件不同,Spider中间件更关注数据的后处理层。

2.2 核心方法

Spider中间件主要涉及四个方法:

class CustomSpiderMiddleware: def process_spider_input(self, response, spider): if response.status == 503: raise IgnoreRequest('服务不可用,跳过') return None def process_spider_output(self, response, result, spider): for item in result: if isinstance(item, dict): item['processed_by'] = spider.name yield item else: yield item def process_spider_exception(self, response, exception, spider): spider.logger.warning(f'Spider异常: {exception}') return []

2.3 应用场景

提示:Spider中间件与Downloader中间件的主要区别在于处理阶段不同。Downloader中间件处理的是网络层的请求和响应,而Spider中间件处理的是数据层的响应和输出结果。一般情况下,自定义中间件以Downloader中间件为主,Spider中间件多用于数据加工。

三、Item Pipeline深入

3.1 爬虫生命周期方法

Pipeline提供了两个与爬虫生命周期相关的方法:

3.2 从配置中获取参数

使用 from_crawler 类方法可以从Scrapy全局配置中获取参数,这是一种依赖注入的经典实现:

class MySQLPipeline: @classmethod def from_crawler(cls, crawler): settings = crawler.settings return cls( host=settings.get('MYSQL_HOST', 'localhost'), port=settings.getint('MYSQL_PORT', 3306), db=settings.get('MYSQL_DB', 'scrapy'), user=settings.get('MYSQL_USER', 'root'), password=settings.get('MYSQL_PASSWORD', '') ) def __init__(self, host, port, db, user, password): self.host = host self.port = port self.db = db self.user = user self.password = password def open_spider(self, spider): self.conn = pymysql.connect( host=self.host, port=self.port, user=self.user, password=self.password, database=self.db, charset='utf8mb4' ) self.cursor = self.conn.cursor() def close_spider(self, spider): self.cursor.close() self.conn.close()

3.3 多Pipeline数据分流

当项目中定义了多个Pipeline时,可以通过 ITEM_PIPELINES 配置控制优先级。利用 Item 的类型或特定字段实现数据分流:

class ImagePipeline: def process_item(self, item, spider): if 'image_url' not in item: return item # 处理图片下载 return item class TextPipeline: def process_item(self, item, spider): if 'content' not in item: return item # 保存文本内容 return item class StatsPipeline: def process_item(self, item, spider): spider.crawler.stats.inc_value('items_processed') return item

3.4 Pipeline异常处理

在Pipeline中抛出 DropItem 异常可以将当前Item丢弃,不再传递给后续Pipeline。配合 Spider 的日志系统记录被丢弃的Item:

class ValidationPipeline: def process_item(self, item, spider): if not item.get('title'): raise DropItem(f'缺少title字段,丢弃Item') if not item.get('url'): raise DropItem(f'缺少url字段,丢弃Item') return item

3.5 数据存储Pipeline实现

MySQL Pipeline

class MySQLPipeline: def process_item(self, item, spider): sql = 'INSERT INTO articles(title, content, url, create_time) VALUES (%s, %s, %s, %s)' params = (item['title'], item['content'], item['url'], datetime.now()) try: self.cursor.execute(sql, params) self.conn.commit() except Exception as e: self.conn.rollback() spider.logger.error(f'数据库写入失败: {e}') return item

MongoDB Pipeline

class MongoDBPipeline: def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def process_item(self, item, spider): self.db.articles.update_one( {'url': item['url']}, {'$set': dict(item)}, upsert=True ) return item def close_spider(self, spider): self.client.close()

Redis Pipeline(去重+队列)

class RedisPipeline: def open_spider(self, spider): self.redis_client = redis.Redis( host=spider.settings.get('REDIS_HOST', 'localhost'), port=spider.settings.getint('REDIS_PORT', 6379) ) def process_item(self, item, spider): key = f'article:{item["url"]}' if self.redis_client.exists(key): raise DropItem(f'重复文章: {item["url"]}') self.redis_client.setex(key, 86400, '1') self.redis_client.lpush('articles:queue', json.dumps(dict(item))) return item

四、代理中间件实现

4.1 随机代理中间件

代理中间件是反爬对抗中最重要的组件之一。通过维护一个代理池,每次请求随机选择一个代理进行访问,可以有效降低被封IP的风险。

class RandomProxyMiddleware: def __init__(self, proxies): self.proxies = proxies @classmethod def from_crawler(cls, crawler): proxies = crawler.settings.getlist('PROXY_LIST') if not proxies: proxies = [ 'http://proxy1.example.com:8080', 'http://proxy2.example.com:8080', ] return cls(proxies) def process_request(self, request, spider): proxy = random.choice(self.proxies) request.meta['proxy'] = proxy spider.logger.debug(f'使用代理: {proxy}') def process_response(self, request, response, spider): if response.status in [403, 429, 503]: spider.logger.warning(f'代理被限制: {request.meta.get("proxy")}') new_request = request.copy() new_request.dont_filter = True return new_request return response

4.2 代理池集成

在实际生产环境中,代理池通常是一个独立的服务,提供 API 接口动态获取可用代理。可以在中间件中请求代理池 API,并按需获取新的代理:

class ProxyPoolMiddleware: def __init__(self): self.proxy_api_url = 'http://proxy-pool:5010/get/' self.valid_proxies = [] def fetch_proxy(self): try: resp = requests.get(self.proxy_api_url, timeout=3) if resp.status_code == 200: proxy = resp.json().get('proxy') return f'http://{proxy}' except Exception as e: print(f'获取代理失败: {e}') return None def process_request(self, request, spider): if 'proxy' not in request.meta or not self.test_proxy(request.meta['proxy']): proxy = self.fetch_proxy() if proxy: request.meta['proxy'] = proxy

4.3 代理验证与自动切换

代理的稳定性直接影响爬虫成功率。建议实现代理验证机制:

注意:免费代理通常不稳定,建议在关键爬虫中使用付费代理服务或自建代理池。同时,HTTPS代理比HTTP代理更安全,但成本也更高。

五、User-Agent中间件

5.1 随机UA切换

User-Agent是服务器识别客户端的重要标识。为了避免被单一UA特征识别,需要在每次请求时随机切换UA。Scrapy默认的 UserAgentMiddleware 只会使用配置中的单一UA,我们需要自定义中间件来替换它。

class RandomUserAgentMiddleware: def __init__(self): self.user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) ' 'Gecko/20100101 Firefox/121.0', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ' 'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', ] def process_request(self, request, spider): ua = random.choice(self.user_agents) request.headers['User-Agent'] = ua

5.2 fake_useragent库集成

手动维护UA列表不够灵活,推荐使用 fake_useragent 库自动生成真实的UA字符串:

from fake_useragent import UserAgent class FakeUAMiddleware: def __init__(self): self.ua = UserAgent() self.ua_cache = {} self.cache_size = 50 def process_request(self, request, spider): # 按浏览器类型随机,确保多样性 browser = random.choice(['chrome', 'firefox', 'safari', 'edge']) ua = getattr(self.ua, browser) request.headers['User-Agent'] = ua

5.3 UA池管理的最佳实践

六、请求重试与错误处理

6.1 RetryMiddleware配置

Scrapy内置了 RetryMiddleware,通过以下配置控制重试行为:

# settings.py 配置 RETRY_ENABLED = True RETRY_TIMES = 3 RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429] RETRY_PRIORITY_ADJUST = -1

6.2 自定义重试条件

当内置重试策略不满足需求时,可以通过继承 RetryMiddleware 并重写方法来实现自定义逻辑:

from scrapy.downloadermiddlewares.retry import RetryMiddleware class CustomRetryMiddleware(RetryMiddleware): def __init__(self, settings): super().__init__(settings) self.max_retry_times = settings.getint('CUSTOM_RETRY_TIMES', 5) def process_response(self, request, response, spider): if response.status in self.retry_http_codes: reason = f'状态码 {response.status}' return self._retry(request, reason, spider) # 检查响应内容是否为空 if len(response.body) < 100: reason = '响应内容过短,可能被反爬' return self._retry(request, reason, spider) return response def process_exception(self, request, exception, spider): # 记录异常 spider.logger.error(f'请求失败: {request.url}, 异常: {exception}') if isinstance(exception, (TimeoutError, ConnectionError)): return self._retry(request, str(exception), spider) return None

6.3 错误日志记录

系统化的错误日志对排查问题至关重要。建议在中间件和Pipeline中统一记录:

6.4 请求失败回调(errback)

Scrapy的每个 Request 对象支持 errback 参数,当请求发生异常或重试耗尽后自动调用该回调函数。errback 接收一个 Failure 对象作为参数:

def parse(self, response): yield scrapy.Request( url='https://example.com/api/data', callback=self.parse_api, errback=self.handle_error, meta={'retry_times': 0} ) def handle_error(self, failure): request = failure.request self.logger.error(f'请求最终失败: {request.url}') self.logger.error(f'失败原因: {failure.value}') # 可以记录到数据库或发送告警 item = FailedItem() item['url'] = request.url item['error'] = str(failure.value) yield item

七、CrawlSpider与Rule

7.1 CrawlSpider概述

CrawlSpider是Scrapy中用于全站爬取的Spider子类。它通过定义一组Rule规则来自动发现和跟进链接,非常适合做整站抓取或大规模内容采集。与普通Spider需要手动编写所有 parse 逻辑不同,CrawlSpider通过Rules规则引擎将链接提取和回调处理解耦。

7.2 Rule规则定义

每个Rule包含三个核心参数:

from scrapy.spiders import CrawlSpider, Rule from scrapy.linkextractors import LinkExtractor class NewsSpider(CrawlSpider): name = 'news_spider' allowed_domains = ['example-news.com'] start_urls = ['https://example-news.com/'] rules = ( # 提取分类页链接,跟进但不解析 Rule(LinkExtractor(allow=r'/category/\w+/'), follow=True), # 提取文章详情页链接,由 parse_article 处理 Rule(LinkExtractor( allow=r'/article/\d+\.html', restrict_css='.article-list a' ), callback='parse_article'), # 提取分页链接,继续跟进 Rule(LinkExtractor( allow=r'/page/\d+/', restrict_xpaths='//div[@class="pagination"]/a' ), follow=True), ) def parse_article(self, response): yield { 'title': response.css('h1::text').get(), 'content': response.css('.article-body::text').getall(), 'url': response.url, 'publish_time': response.css('.time::text').get(), }

7.3 LinkExtractor高级用法

LinkExtractor提供了丰富的参数来精确控制链接提取:

7.4 适用场景

最佳实践:使用CrawlSpider时务必设置 allowed_domains 避免爬虫爬出站外。对于大型站点,配合 DEPTH_LIMIT 控制爬取深度,使用 CLOSESPIDER_PAGECOUNT 限制爬取页面总数,防止失控。

八、Scrapy扩展(Extensions)

8.1 扩展的生命周期

Scrapy扩展是一种在爬虫运行期间执行自定义功能的机制。扩展与中间件的关键区别在于:扩展不直接参与请求/响应/数据的处理流程,而是作为独立模块在后台运行,用于监控、统计和功能增强。扩展通过连接Scrapy的信号系统来实现其功能,生命周期包括初始化、启动、运行和关闭四个阶段。

8.2 自定义扩展实现

以下是一个统计爬虫运行状态的扩展示例,记录了请求数、成功数、失败数等关键指标:

from scrapy import signals class StatsMonitorExtension: def __init__(self, crawler): self.crawler = crawler self.start_time = None self.item_count = 0 @classmethod def from_crawler(cls, crawler): ext = cls(crawler) # 连接各个信号 crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened) crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed) crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped) crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped) crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled) crawler.signals.connect(ext.request_dropped, signal=signals.request_dropped) return ext def spider_opened(self, spider): self.start_time = datetime.now() spider.logger.info(f'爬虫 {spider.name} 启动') def spider_closed(self, spider, reason): elapsed = datetime.now() - self.start_time spider.logger.info( f'爬虫 {spider.name} 结束,原因: {reason},' f'耗时: {elapsed},采集Item: {self.item_count}' ) def item_scraped(self, item, spider): self.item_count += 1 def item_dropped(self, item, spider, exception): spider.logger.warning(f'Item被丢弃: {item}, 原因: {exception}') def request_scheduled(self, request, spider): pass def request_dropped(self, request, spider): spider.logger.warning(f'请求被丢弃: {request.url}')

8.3 信号系统详解

Scrapy的信号系统是扩展的核心通信机制。以下是常用信号列表:

信号名称触发时机参数
engine_started引擎启动时
engine_stopped引擎停止时
spider_openedSpider打开时spider
spider_closedSpider关闭时spider, reason
request_scheduled请求被调度时request, spider
request_dropped请求被丢弃时request, spider
response_received响应被接收时response, request, spider
item_scrapedItem被爬取时item, response, spider
item_droppedItem被丢弃时item, spider, exception
bytes_received数据块被接收时data, request, spider

8.4 扩展的实用场景

# settings.py 中启用扩展 EXTENSIONS = { 'myproject.extensions.StatsMonitorExtension': 500, 'myproject.extensions.AlertExtension': 600, }

总结与最佳实践

核心要点:

1. Downloader中间件是反爬对抗的前沿阵地,负责请求修饰和响应预处理;Spider中间件是数据加工的后处理层,负责数据清洗和异常处理。

2. Item Pipeline的 from_crawler 类方法是获取配置的标准模式,open_spider/close_spider 是管理资源生命周期的关键钩子。

3. 代理中间件和UA中间件是Scrapy中最常用的自定义中间件,两者配合使用可大幅降低被封概率。

4. RetryMiddleware配合errback回调可以实现全面的请求失败处理和告警机制。

5. CrawlSpider的Rule规则引擎适用于全站抓取场景,合理使用restrict_xpaths/restrict_css可以精确控制抓取范围。

6. 扩展(Extensions)通过信号系统与爬虫核心交互,适合实现监控、统计、告警等横切关注点。

7. 生产环境中建议将中间件、Pipeline和扩展的优先级参数化配置在 settings.py 中,便于维护和调试。