表单自动填写与网页数据采集

Python 办公自动化专题 · 结合Selenium/Playwright实现表单自动化

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, 表单填写, 数据采集, Web自动化, 爬虫, Selenium, 数据导出, Python

一、表单自动化概述

表单自动填写与网页数据采集是Web自动化办公中最常见也最具实用价值的场景之一。随着企业数字化转型的深入,大量业务操作仍然依赖Web表单完成,从数据录入、批量注册到在线申报,手工重复填写已成为效率瓶颈。Web表单自动化的核心价值在于:将人工重复操作转化为脚本自动执行,大幅提升工作效率并降低人为出错率。

从技术框架选型来看,主流的浏览器自动化工具包括Selenium、Playwright和Puppeteer三大阵营。Selenium作为老牌框架,生态成熟、社区庞大,支持所有主流浏览器,适合传统企业级项目。Playwright是微软推出的后起之秀,API设计更现代,支持多浏览器、多语言,内置自动等待和网络拦截功能,在速度和稳定性上优于Selenium。Puppeteer仅支持Chromium内核,但在Node.js生态中表现优异。对于Python技术栈的办公自动化场景,Selenium和Playwright是最推荐的两个选择。

在实际应用中,必须充分重视安全与法律合规问题。表单自动化可能涉及用户隐私数据(如身份证号、手机号、银行账号),处理这些数据时需遵守《个人信息保护法》相关规定。自动化操作不应绕过网站的认证机制、验证码保护或访问控制,也不得用于恶意注册、刷票、刷单等违反平台规则的行为。合法的应用场景包括:企业内部系统的数据录入自动化、经授权的数据采集与分析、软件测试中的表单验证、个人重复性工作的脚本化等。建议在实施自动化前,详细阅读目标网站的robots.txt和服务条款,确保操作合规。

表单自动化的整体工作流通常包含五个阶段:目标表单分析(定位页面元素、理解交互逻辑)→ 数据准备(从Excel/CSV/数据库读取数据源)→ 自动化执行(逐字段填充、提交、处理异常)→ 结果验证(检查提交是否成功、提取返回信息)→ 后续处理(数据导出、日志记录、失败重试)。下面先从一个最简单的Selenium表单填写入门示例开始。

from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # 初始化浏览器驱动 driver = webdriver.Chrome() driver.get("https://example.com/form") try: # 等待表单加载完成 form = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.ID, "myForm")) ) # 填充文本输入框 name_input = driver.find_element(By.NAME, "username") name_input.send_keys("张三") # 提交表单 submit_btn = driver.find_element(By.CSS_SELECTOR, "button[type='submit']") submit_btn.click() # 验证提交结果 success_msg = WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.CLASS_NAME, "success")) ) print(f"表单提交成功: {success_msg.text}") finally: driver.quit()
from playwright.sync_api import sync_playwright # 使用 Playwright 填写表单(API 更简洁) with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto("https://example.com/form") # 填充表单字段 page.fill("#name", "张三") page.select_option("#city", "北京") page.check("#agree") page.fill("#remark", "这是一条自动提交的测试数据") # 截取填写后的表单截图 page.screenshot(path="form_filled.png") # 点击提交并等待导航完成 with page.expect_navigation(): page.click("button[type='submit']") print("表单提交完成") browser.close()

二、表单字段填充

Web表单的字段类型多种多样,每种类型的填充方式各不相同。掌握各类字段的定位与操作技巧,是表单自动化的基本功。最常见的字段类型包括:文本输入框、下拉选择框、单选框/复选框、日期选择器、文件上传控件以及隐藏字段。下面逐一讲解每种类型的自动化操作方法。

文本输入框

文本输入框是最基本的表单元素,Selenium使用send_keys()方法输入内容,Playwright使用fill()方法。需要注意的是,输入前应先使用clear()清空已有内容,避免追加写入。对于富文本编辑器(如CKEditor、TinyMCE),通常需要先切换到iframe,然后操作编辑器的内容区域。对于密码输入框,处理方法与普通文本框相同,但要注意不要在日志中明文记录密码。

下拉选择框

下拉选择框分为原生Select元素和自定义下拉(通常由ul/li模拟)。原生Select可以使用Selenium的Select类进行操作,支持按可见文本、值或索引选择。自定义下拉则需要先点击展开下拉菜单,再点击目标选项,这需要更精细的等待和定位策略。联动下拉(如选择省份后动态加载城市列表)需要额外处理AJAX加载等待。

单选框与复选框

单选框(radio)和复选框(checkbox)是最常见的选项控件。操作方式为先定位到目标元素,然后调用click()方法。对于复选框,通常需要先检查其当前状态(is_selected()),只有在未选中时才执行点击,避免重复操作导致状态反转。单选框在同一组中只能选择一个,因此直接点击目标选项即可。

日期选择器与文件上传

日期选择器有两种常见实现:一种是原生input[type=date],可以直接用send_keys()输入日期字符串(格式为yyyy-MM-dd);另一种是JavaScript自定义的日期控件,需要逐一点击年份/月份/日期,操作较为复杂。文件上传控件通常通过input[type=file]实现,使用send_keys()传入本地文件路径即可。注意路径中的反斜杠需要转义,或使用正斜杠。对于隐藏的文件上传控件(opacity:0或display:none),需要先通过JavaScript使其可见再操作。

from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import Select from selenium.webdriver.common.keys import Keys import time driver = webdriver.Chrome() driver.get("https://example.com/complex-form") # 1. 文本输入框:先清空再填入 name = driver.find_element(By.ID, "fullname") name.clear() name.send_keys("李四") # 2. 原生下拉选择框:使用 Select 类 city_select = Select(driver.find_element(By.ID, "city")) city_select.select_by_visible_text("上海市") # 按可见文本选择 # city_select.select_by_value("shanghai") # 按值选择 # city_select.select_by_index(2) # 按索引选择 # 3. 复选框:判断状态后再操作 agree_checkbox = driver.find_element(By.NAME, "agreement") if not agree_checkbox.is_selected(): agree_checkbox.click() # 4. 单选框:直接点击目标选项 gender_radio = driver.find_element(By.CSS_SELECTOR, "input[name='gender'][value='male']") gender_radio.click() # 5. 日期输入 date_input = driver.find_element(By.NAME, "birthday") date_input.clear() date_input.send_keys("1990-01-15") # 6. 文件上传 file_input = driver.find_element(By.NAME, "attachment") file_input.send_keys(r"C:\Users\Documents\resume.pdf") time.sleep(2) driver.quit()
from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto("https://example.com/complex-form") # 1. 文本框填充 page.fill("#fullname", "李四") # 2. 原生下拉选择 page.select_option("#city", label="上海市") # 3. 复选框操作 page.check("input[name='agreement']") # 4. 单选框操作 page.check("input[name='gender'][value='male']") # 5. 日期输入 page.fill("input[name='birthday']", "1990-01-15") # 6. 文件上传 page.set_input_files("input[name='attachment']", "C:/Users/Documents/resume.pdf") # 7. 自定义下拉框处理(非Select元素) page.click("#custom-dropdown") page.click("#custom-dropdown .option-item:has-text('选项三')") print("所有字段填充完成") browser.close()

三、数据源驱动

表单自动化的核心价值在于批量处理,而批量处理离不开数据源驱动。实际业务场景中,需要填写的表单数据通常存储在Excel表格、CSV文件或JSON文档中。数据源驱动模式的核心思想是:将数据与逻辑分离,通过读取外部数据源逐行驱动表单填写流程,从而实现大批量数据的自动化录入。这不仅是效率的提升,更是数据准确性的保障。

Excel是最常见的数据源格式,Python的openpyxl库可以方便地读取.xlsx文件,pandas库则提供了更强大的数据处理能力。CSV文件格式简单、兼容性好,使用Python内置的csv模块即可轻松读写。JSON格式适合结构化复杂数据(如嵌套字段、多层级关系),常用于API接口的数据传递。选择哪种数据源取决于具体业务场景:Excel最适合非技术人员维护的数据,CSV适合数据量较大且格式简单的场景,JSON适合具有层级结构的数据。

在实现数据源驱动时,需要建立数据字段与表单字段之间的映射关系。通常的做法是:Excel的列标题对应表单字段的name或id属性,程序读取每一行数据后,根据映射关系将数据填入对应的表单字段。数据验证是批量处理中的关键环节,需要在写入前检查数据的合法性(如必填项不为空、邮箱格式正确、手机号位数正确等)。对于不合法的数据,应跳过当前记录并记录错误日志,而不是终止整个流程。下面给出一个完整的实战示例。

import csv import time from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import Select def read_csv_data(filepath): """读取 CSV 数据源,返回字典列表""" records = [] with open(filepath, "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) for row in reader: records.append(row) return records def fill_single_record(driver, record): """将单条记录填入表单""" try: driver.find_element(By.NAME, "name").clear() driver.find_element(By.NAME, "name").send_keys(record["姓名"]) driver.find_element(By.NAME, "phone").clear() driver.find_element(By.NAME, "phone").send_keys(record["手机号"]) Select(driver.find_element(By.NAME, "city")).select_by_visible_text(record["城市"]) email_input = driver.find_element(By.NAME, "email") email_input.clear() email_input.send_keys(record["邮箱"]) # 提交表单 driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() time.sleep(1) # 等待提交完成 return True, "" except Exception as e: return False, str(e) # 主流程 driver = webdriver.Chrome() records = read_csv_data("registrations.csv") success_count = 0 fail_count = 0 for idx, record in enumerate(records, 1): print(f"正在处理第 {idx} 条记录: {record['姓名']}") driver.get("https://example.com/register") # 数据验证 if not record["手机号"] or len(record["手机号"]) != 11: print(f" 跳过: 手机号格式不正确") fail_count += 1 continue success, error = fill_single_record(driver, record) if success: success_count += 1 print(f" 成功") else: fail_count += 1 print(f" 失败: {error}") print(f"\n批量处理完成: 成功 {success_count} 条, 失败 {fail_count} 条") driver.quit()
import pandas as pd from playwright.sync_api import sync_playwright # 使用 pandas 读取 Excel 数据源 df = pd.read_excel("products.xlsx", sheet_name="上架商品") success_ids = [] with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() for idx, row in df.iterrows(): page.goto("https://example.com/product/add") page.wait_for_load_state("networkidle") # 字段映射:Excel 列名 -> 表单字段 name page.fill("input[name='product_name']", str(row["商品名称"])) page.fill("input[name='price']", str(row["价格"])) page.fill("input[name='stock']", str(row["库存"])) category = str(row["分类"]) page.select_option("select[name='category']", label=category) description = str(row["描述"]) if pd.notna(row["描述"]) else "" page.fill("textarea[name='description']", description) # 提交并验证 with page.expect_navigation(timeout=10000) as nav: page.click("button[type='submit']") if page.url.__contains__("success"): success_ids.append(row.get("商品ID", idx)) print(f"[成功] 商品 {row['商品名称']} 已上架") else: print(f"[失败] 商品 {row['商品名称']} 上架失败") browser.close() print(f"批量上架完成,成功 {len(success_ids)} 个商品")

四、动态/复杂表单

现代Web应用广泛使用AJAX、异步加载和动态交互技术,使得表单自动化处理的复杂度大幅提升。动态表单的核心挑战在于:页面元素不是一次性加载完成的,而是根据用户操作逐步生成。这就需要自动化脚本具备智能等待能力,既要等待元素出现,也要等待数据加载完成,还要处理各种异步交互场景。

AJAX异步加载是最常见的动态表单场景。例如,选择省份后,城市列表通过AJAX请求动态获取,选择城市后再加载区县列表,形成三级联动。处理这种场景的关键在于:每完成一步操作后,必须显式等待下一步的元素出现,而不是使用固定的time.sleep()。WebDriverWait结合expected_conditions是处理异步加载的最佳实践,Playwright内置的auto-waiting机制则更加智能,会在执行操作前自动等待元素稳定。

分步表单(Step Form / Wizard)将长表单拆分为多个步骤,每步完成后点击"下一步"按钮切换到下一步骤。自动化处理时需要跟踪当前步骤,确保在正确的步骤填写对应的字段。弹窗表单(Modal/Dialog)在页面加载完成后通过点击按钮弹出,此时需要显式等待弹窗可见后再操作表单元素。拖拽排序(Drag and Drop)涉及鼠标操作,Selenium的ActionChains和Playwright的drag_to方法都能实现。富文本编辑器通常嵌入在iframe中,需要先切换到iframe上下文,再操作编辑器的主体内容。下面给出处理这些复杂场景的代码示例。

from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.action_chains import ActionChains driver = webdriver.Chrome() driver.get("https://example.com/wizard-form") wait = WebDriverWait(driver, 10) # === 第一步:基本信息 === wait.until(EC.presence_of_element_located((By.ID, "step1"))).is_displayed() driver.find_element(By.NAME, "name").send_keys("王五") driver.find_element(By.NAME, "email").send_keys("wangwu@example.com") driver.find_element(By.CSS_SELECTOR, "#step1 .next-btn").click() # === 第二步:地址选择(联动下拉) === wait.until(EC.visibility_of_element_located((By.ID, "step2"))) # 选择省份(触发AJAX加载城市) Select(driver.find_element(By.ID, "province")).select_by_visible_text("广东省") # 等待城市下拉框的选项加载完成 wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#city option:not([value=''])")), "城市选项加载超时" ) Select(driver.find_element(By.ID, "city")).select_by_visible_text("深圳市") driver.find_element(By.CSS_SELECTOR, "#step2 .next-btn").click() # === 第三步:弹窗表单(富文本编辑器) === # 等待弹窗出现 modal = wait.until(EC.visibility_of_element_located((By.ID, "editorModal"))) # 切换到富文本编辑器 iframe iframe = driver.find_element(By.CSS_SELECTOR, "iframe.cke_wysiwyg_frame") driver.switch_to.frame(iframe) editor_body = driver.find_element(By.TAG_NAME, "body") editor_body.send_keys("这是通过自动化输入的富文本内容") driver.switch_to.default_content() # 切回主文档 # 点击弹窗中的确认按钮 driver.find_element(By.CSS_SELECTOR, "#editorModal .confirm-btn").click() # === 第四步:拖拽排序 === wait.until(EC.visibility_of_element_located((By.ID, "step4"))) source = driver.find_element(By.CSS_SELECTOR, ".sort-item[data-id='3']") target = driver.find_element(By.CSS_SELECTOR, ".sort-item[data-id='1']") ActionChains(driver).drag_and_drop(source, target).perform() # 最终提交 driver.find_element(By.ID, "finalSubmit").click() print("复杂表单提交完成") driver.quit()
from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto("https://example.com/dynamic-form") # 处理 AJAX 联动下拉 page.select_option("#province", label="广东省") # Playwright 自动等待城市选项加载完成 page.wait_for_selector("#city option:not([value=''])") page.select_option("#city", label="广州市") # 处理弹窗表单 page.click("#openModalBtn") modal = page.wait_for_selector(".modal:visible") # 处理 iframe 中的富文本编辑器 frame = page.frame_locator("iframe.cke_wysiwyg_frame") frame.locator("body").fill("自动化输入的富文本内容") # 关闭弹窗 page.click(".modal .confirm-btn") # 处理 AJAX 异步提交的表单 # 使用 expect_response 等待 AJAX 请求完成 with page.expect_response(lambda resp: "/api/submit" in resp.url) as resp_info: page.click("#ajaxSubmit") response = resp_info.value result = response.json() if result.get("success"): print(f"AJAX 表单提交成功, ID: {result.get('id')}") else: print(f"提交失败: {result.get('message')}") browser.close()

五、验证码处理

验证码(CAPTCHA)是网站防止自动化操作的第一道防线,也是表单自动化中最棘手的障碍之一。验证码的类型多种多样,从简单的数字字母图片验证码,到行为验证码(滑块拼图、点选文字)、无感验证码(reCAPTCHA v3),自动化处理策略也各不相同。需要明确的是,任何绕过验证码的行为都应当遵守法律法规和平台规则,仅限用于合法的测试和授权场景。

对于传统的图片验证码,可以使用OCR(光学字符识别)技术进行识别。Python的pytesseract库(基于Tesseract引擎)是最常用的解决方案,但对于干扰严重的验证码识别率较低。更专业的方案是使用深度学习模型(如CNN)进行定制化训练,适用于验证码风格固定的内部系统。对于滑块验证码,核心策略是模拟人类拖拽行为:计算目标位置偏移量,生成符合贝塞尔曲线或加速度规律的拖拽轨迹,控制拖拽速度(先快后慢)。第三方打码服务(如打码平台)提供了付费的验证码识别API,识别率高且支持多种验证码类型,适合企业级应用。

在实际项目中,更务实的策略是多层次组合:第一层尝试本地OCR识别,失败后切换第三方服务,再失败则人工介入。人工介入可以通过消息通知(钉钉/微信)将验证码图片推送给运维人员,由人工识别后输入结果,脚本继续执行。这种方式虽然降低了完全自动化的程度,但保证了流程的可靠性。此外,一些合规的绕过策略包括:使用已登录的Cookie/Session绕过验证页面、控制请求频率避免触发验证码机制、使用IP白名单访问内部系统等。

import pytesseract from PIL import Image import requests from io import BytesIO from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import time # 配置 Tesseract 路径 pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe" driver = webdriver.Chrome() driver.get("https://example.com/login") # 1. 获取验证码图片并识别 captcha_img = driver.find_element(By.ID, "captcha-img") img_src = captcha_img.get_attribute("src") # 下载图片到内存 response = requests.get(img_src) img = Image.open(BytesIO(response.content)) # 图片预处理:灰度化 + 二值化 + 去噪点 img = img.convert("L") # 灰度 threshold = 140 img = img.point(lambda x: 255 if x > threshold else 0) # 二值化 img.save("captcha_processed.png") # 保存以便调试 # OCR 识别 captcha_text = pytesseract.image_to_string(img, config="--psm 7").strip() print(f"验证码识别结果: {captcha_text}") # 2. 输入验证码 driver.find_element(By.NAME, "captcha").send_keys(captcha_text) driver.find_element(By.NAME, "username").send_keys("admin") driver.find_element(By.NAME, "password").send_keys("password123") driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() # 3. 检查是否识别失败(页面包含验证码错误提示) time.sleep(2) if "验证码错误" in driver.page_source: # 刷新验证码并重试 driver.find_element(By.ID, "refresh-captcha").click() time.sleep(1) # 切换到第三方打码服务... print("本地OCR识别失败,需要切换为第三方服务或者人工介入") driver.quit()
# 滑块验证码处理示例 from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver import ActionChains import random import time def generate_track(distance): """ 生成模拟人类拖拽的轨迹(先快后慢) distance: 需要拖拽的总距离(像素) 返回: 轨迹点列表 [(x_offset, y_offset, sleep_time), ...] """ track = [] current = 0 mid = distance * 0.7 # 加速阶段占70% while current < distance: if current < mid: # 加速阶段 move = random.randint(3, 8) else: # 减速阶段 + 小幅回拉修正 move = random.randint(1, 3) current += move if current > distance: move -= (current - distance) current = distance # 模拟人手抖动的Y轴偏移 y_offset = random.randint(-2, 2) track.append((move, y_offset, random.uniform(0.01, 0.03))) # 最后添加一个微小回拉(模拟人手过冲后的修正) track.append((-random.randint(1, 3), 0, 0.05)) return track driver = webdriver.Chrome() driver.get("https://example.com/slider-captcha") # 定位滑块元素 slider = driver.find_element(By.CSS_SELECTOR, ".slider-btn") # 获取滑块需要滑动的距离(需根据实际页面计算) # 通常滑动距离 = 缺口位置 - 滑块初始位置 slide_distance = 258 # 示例距离,实际需动态计算 # 执行拖拽 action = ActionChains(driver) action.click_and_hold(slider).perform() track = generate_track(slide_distance) for x_offset, y_offset, sleep_time in track: action.move_by_offset(xoffset=x_offset, yoffset=y_offset).perform() time.sleep(sleep_time) # 释放鼠标 action.release().perform() time.sleep(2) # 验证是否通过 if "通过" in driver.page_source: print("滑块验证通过") else: print("滑块验证失败,可能需要重试")

六、分页数据采集

表单提交后的数据采集通常涉及分页处理。无论是从表格列表中提取数据,还是在搜索结果中获取信息,分页都是必不可少的环节。不同的Web应用采用不同的分页机制,自动化脚本需要针对不同的分页类型采用对应的处理策略。常见分页类型包括:URL参数分页、滚动加载(无限滚动)、点击"加载更多"按钮、AJAX异步分页等。

URL参数分页是最传统的分页方式,URL中通常包含page=1、offset=20、pageIndex=0等参数,通过修改URL参数即可切换页面。这种分页方式最容易处理,直接循环修改URL参数并发起请求即可。滚动加载(无限滚动)常见于社交媒体和资讯类网站,页面会随着滚动条向下滚动而自动加载新内容。处理无限滚动需要模拟页面滚动操作,并持续检测页面高度的变化,直到满足终止条件(如达到最大页数或页面高度不再变化)。

AJAX分页是现代Web应用最常用的分页方式,点击页码按钮后通过AJAX请求获取数据并更新页面内容。这种方式的处理要点是:点击页码按钮后,等待AJAX请求完成(监听network idle状态或等待数据容器元素更新),然后再提取数据。使用Playwright的expect_response或Selenium的WebDriverWait都可以有效处理这种场景。对于点击"加载更多"按钮类型的分页,只需循环检测按钮是否存在,存在则点击并等待新内容加载,不存在则说明已经加载完全部数据。下面给出各种分页类型的代码示例。

from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import time import csv driver = webdriver.Chrome() base_url = "https://example.com/products?page={}" all_data = [] for page_num in range(1, 11): # 采集前10页 url = base_url.format(page_num) driver.get(url) print(f"正在采集第 {page_num} 页...") # 等待数据行加载 rows = WebDriverWait(driver, 10).until( EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".product-row")) ) for row in rows: data = { "name": row.find_element(By.CSS_SELECTOR, ".product-name").text, "price": row.find_element(By.CSS_SELECTOR, ".product-price").text, "stock": row.find_element(By.CSS_SELECTOR, ".product-stock").text, } all_data.append(data) # 控制采集频率,避免对服务器造成压力 time.sleep(1.5) # 保存为 CSV 文件 with open("products_data.csv", "w", newline="", encoding="utf-8-sig") as f: writer = csv.DictWriter(f, fieldnames=["name", "price", "stock"]) writer.writeheader() writer.writerows(all_data) print(f"数据采集完成,共采集 {len(all_data)} 条记录") driver.quit()
# 处理无限滚动加载的示例 from selenium import webdriver from selenium.webdriver.common.by import By import time driver = webdriver.Chrome() driver.get("https://example.com/infinite-scroll") page_num = 0 max_pages = 20 last_height = 0 data = [] while page_num < max_pages: # 获取当前可见的所有数据项 items = driver.find_elements(By.CSS_SELECTOR, ".data-item") print(f"第 {page_num + 1} 轮: 已加载 {len(items)} 条数据") # 提取当前页数据 for item in items: data.append({ "title": item.find_element(By.CSS_SELECTOR, ".item-title").text, "content": item.find_element(By.CSS_SELECTOR, ".item-content").text, }) # 模拟向下滚动到页面底部,触发加载更多 driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") time.sleep(2) # 等待 AJAX 加载 # 检查页面高度是否变化(判断是否还有更多内容) new_height = driver.execute_script("return document.body.scrollHeight") if new_height == last_height: print("页面高度不再变化,数据已全部加载完毕") break last_height = new_height page_num += 1 print(f"最终采集到 {len(data)} 条数据") # 去重(滚动加载可能导致重复) unique_data = [] seen_titles = set() for item in data: if item["title"] not in seen_titles: seen_titles.add(item["title"]) unique_data.append(item) print(f"去重后:{len(unique_data)} 条") driver.quit()
# 使用 Playwright 处理 "点击加载更多" 分页 from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch(headless=False) page = browser.new_page() page.goto("https://example.com/load-more") all_items = [] click_count = 0 max_clicks = 30 while click_count < max_clicks: # 提取当前所有可见项目 items = page.locator(".item-card").all() for item in items: all_items.append({ "title": item.locator(".item-title").inner_text(), "link": item.locator("a").get_attribute("href"), }) # 检查"加载更多"按钮是否还存在 load_more = page.locator("#loadMoreBtn") if not load_more.is_visible(): print("加载更多按钮已隐藏或消失,数据加载完毕") break load_more.click() # 等待新内容加载完成(等待新元素出现) page.wait_for_load_state("networkidle") click_count += 1 print(f"已点击 {click_count} 次,累计 {len(all_items)} 条项目") print(f"最终采集到 {len(all_items)} 条数据") browser.close()

七、数据清洗与导出

从Web页面采集的原始数据通常存在各种质量问题:重复记录、格式不统一、缺失字段、编码错误、混杂HTML标签等。数据清洗是将原始数据转化为结构化、标准化信息的关键步骤,其重要性不亚于数据采集本身。高质量的数据清洗能够为后续的数据分析和业务决策提供可靠基础。

数据去重是清洗的第一步。同一数据可能在多次采集中重复出现,需要通过唯一标识字段(如商品ID、文章URL)或字段组合(如标题+发布时间)进行去重。Python的pandas库提供了drop_duplicates()方法,可以方便地基于指定列进行去重。格式标准化包括:日期格式统一为yyyy-MM-dd、货币金额统一为数字类型、电话号码统一为连续数字、文本去除多余空格和换行等。字段映射转换用于将不同来源的数据映射到统一的字段结构,例如将"联系电话"和"手机号码"统一映射为"phone"字段。

数据导出是将清洗后的数据持久化存储的过程。导出格式的选择取决于数据的使用方:Excel(.xlsx)适合业务人员直接查看和编辑,CSV适合数据量较大且需要跨系统交换的场景,数据库(SQLite/MySQL/MongoDB)适合需要频繁查询和关联分析的大数据集,JSON适合作为API接口的数据输出格式。对于大数据量(超过10万条),建议使用数据库存储并按批次导入,避免内存溢出。同时建议为每次采集任务添加时间戳和来源标记,便于数据追踪和增量更新。

import pandas as pd import re # 读取原始采集数据 raw_data = pd.read_csv("raw_products.csv") print(f"原始数据量: {len(raw_data)} 条") # === 1. 数据去重 === # 基于商品名称和价格组合去重,保留第一次出现的记录 deduped = raw_data.drop_duplicates(subset=["name", "price"], keep="first") print(f"去重后: {len(deduped)} 条(去除 {len(raw_data) - len(deduped)} 条重复)") # === 2. 格式标准化 === def clean_price(price_str): """清洗价格:去掉货币符号和逗号,转为浮点数""" if pd.isna(price_str): return 0.0 cleaned = re.sub(r"[¥¥$,元\s]", "", str(price_str)) try: return float(cleaned) except ValueError: return 0.0 def clean_phone(phone): """清洗手机号:去除非数字字符""" if pd.isna(phone): return "" digits = re.sub(r"\D", "", str(phone)) return digits if len(digits) == 11 else "" # 应用清洗函数 deduped["price_clean"] = deduped["price"].apply(clean_price) deduped["phone_clean"] = deduped["phone"].apply(clean_phone) # 删除价格异常的数据(价格小于0.01或大于1000000) deduped = deduped[(deduped["price_clean"] >= 0.01) & (deduped["price_clean"] <= 1000000)] # === 3. 字段映射 === # 将采集数据的列名映射为统一标准 column_mapping = { "product_name": "name", "goods_name": "name", "selling_price": "price_clean", "contact_phone": "phone_clean", } deduped.rename(columns=column_mapping, inplace=True) # === 4. 缺失值处理 === deduped.fillna({"description": "", "category": "未分类"}, inplace=True) # === 5. 导出为 Excel === with pd.ExcelWriter("cleaned_products.xlsx", engine="openpyxl") as writer: deduped.to_excel(writer, sheet_name="商品数据", index=False) # 添加数据摘要工作表 summary = pd.DataFrame({ "指标": ["原始数据量", "去重后", "清洗后", "字段数"], "值": [len(raw_data), len(deduped), len(deduped), len(deduped.columns)] }) summary.to_excel(writer, sheet_name="数据摘要", index=False) print(f"清洗完成,导出 {len(deduped)} 条记录到 cleaned_products.xlsx")
import sqlite3 import json # 将清洗后的数据存入 SQLite 数据库 def export_to_database(data_list, db_path="collected_data.db"): """将数据列表导出到 SQLite 数据库""" conn = sqlite3.connect(db_path) cursor = conn.cursor() # 创建表(如果不存在) cursor.execute(""" CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, price REAL, category TEXT, description TEXT, source_url TEXT, collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 批量插入数据 inserted = 0 for item in data_list: try: cursor.execute(""" INSERT INTO products (name, price, category, description, source_url) VALUES (?, ?, ?, ?, ?) """, ( item.get("name"), item.get("price", 0.0), item.get("category", "未分类"), item.get("description", ""), item.get("source_url", "") )) inserted += 1 except Exception as e: print(f"插入失败: {item.get('name')} - {e}") conn.commit() conn.close() print(f"成功导入 {inserted}/{len(data_list)} 条数据到 SQLite") # 同时导出 JSON 格式(供 API 使用) def export_to_json(data_list, filepath="products_export.json"): """导出为 JSON 格式""" with open(filepath, "w", encoding="utf-8") as f: json.dump(data_list, f, ensure_ascii=False, indent=2) print(f"已导出 {len(data_list)} 条数据到 {filepath}") # 使用示例 products = [ {"name": "商品A", "price": 99.9, "category": "电子产品", "source_url": "https://example.com/a"}, {"name": "商品B", "price": 199.0, "category": "家居用品", "source_url": "https://example.com/b"}, ] export_to_database(products) export_to_json(products)

八、反爬策略应对

在Web数据采集中,目标网站通常会采取各种反爬措施来保护自身数据和服务资源。理解并合理应对这些反爬策略,是保障自动化流程稳定运行的必要能力。但需要强调的是,应对反爬策略必须在合法合规的前提下进行,尊重目标网站的服务条款,不得用于恶意爬取或破坏网站正常运营。

最常见的反爬机制包括:IP频率限制(同一IP单位时间内访问次数过多被封禁)、User-Agent检测(非浏览器UA被拒绝)、Cookie/Session验证(无有效会话被拦截)、浏览器指纹检测(识别自动化工具特征)、JavaScript渲染验证(需要执行JS才能获取真实内容)、验证码拦截(异常行为触发验证码)等。有效的应对策略是多层次的:首先模拟真实用户的浏览行为,其次通过代理池分散请求来源,最后合理控制请求频率和节奏。

IP代理池是应对频率限制最有效的手段。可以从付费代理服务商购买高质量代理IP,或自建代理池。使用时需要注意代理的可用性验证、自动剔除失效代理、按代理质量分级等。User-Agent轮换可以伪装成不同浏览器和操作系统,避免因UA特征过于单一而被识别。请求间隔控制(添加随机延迟)是模拟人类行为的基础手段,延迟时间应该在一个范围内随机分布,而不是固定值。在更高级的场景中,可以使用指纹浏览器(如Undetected Chromedriver、Playwright Stealth)来规避基于浏览器特征的检测。下面给出综合使用多种反爬策略的示例。

import random import time import requests from fake_useragent import UserAgent from itertools import cycle class SmartCrawler: """ 智能爬虫:集成代理池、UA轮换、请求间隔控制 """ def __init__(self, proxy_list=None): self.ua = UserAgent() self.proxies = cycle(proxy_list) if proxy_list else None self.session = requests.Session() self.last_request_time = 0 def get_headers(self): """生成随机请求头,模拟不同浏览器""" headers = { "User-Agent": self.ua.random, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Referer": random.choice([ "https://www.google.com/", "https://www.baidu.com/", "https://www.bing.com/", ]), } return headers def get_proxy(self): """从代理池中获取一个代理""" if self.proxies: return {"http": next(self.proxies), "https": next(self.proxies)} return None def smart_sleep(self): """智能休眠:模拟人类浏览行为的时间间隔""" base_sleep = random.uniform(1.5, 3.5) # 偶尔添加长间隔,模拟阅读行为 if random.random() < 0.1: # 10%的概率 base_sleep += random.uniform(5, 10) time.sleep(base_sleep) def fetch(self, url, max_retries=3): """带重试机制的请求方法""" for attempt in range(max_retries): self.smart_sleep() try: response = self.session.get( url, headers=self.get_headers(), proxies=self.get_proxy(), timeout=15, ) if response.status_code == 200: return response.text elif response.status_code == 429: # 请求过于频繁,加倍等待 wait_time = 30 * (attempt + 1) print(f"触发频率限制,等待 {wait_time} 秒后重试...") time.sleep(wait_time) elif response.status_code == 403: print(f"被禁止访问 {url},更换代理...") if self.proxies: self.smart_sleep() else: print(f"HTTP {response.status_code}: {url}") except requests.RequestException as e: print(f"请求异常: {e}, 重试 {attempt + 1}/{max_retries}") time.sleep(5) return None def close(self): self.session.close() # 使用示例 crawler = SmartCrawler(proxy_list=[ "http://proxy1.example.com:8080", "http://proxy2.example.com:8080", ]) content = crawler.fetch("https://example.com/data") if content: print(f"成功获取页面,大小: {len(content)} 字符") crawler.close()
# 使用 undetected_chromedriver 规避浏览器指纹检测 import undetected_chromedriver as uc from selenium.webdriver.common.by import By import random import time def create_stealth_driver(): """创建规避检测的 Chrome 浏览器实例""" options = uc.ChromeOptions() # 设置窗口大小(模拟真实显示器分辨率) width = random.choice([1366, 1440, 1536, 1920]) height = random.choice([768, 900, 864, 1080]) options.add_argument(f"--window-size={width},{height}") # 禁用自动化特征标志 options.add_argument("--disable-blink-features=AutomationControlled") options.add_argument("--disable-automation") # 设置语言和时区 options.add_argument("--lang=zh-CN") # 初始化驱动(自动处理 chromedriver 版本) driver = uc.Chrome(options=options) # 通过 JS 进一步修改浏览器指纹 driver.execute_script(""" // 覆盖 navigator.webdriver 属性 Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); // 修改 chrome 对象 window.chrome = { runtime: {} }; // 覆盖 permissions API const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); """) return driver # 使用隐身浏览器采集数据 driver = create_stealth_driver() driver.get("https://example.com/protected-data") # 模拟真实用户行为:先缓慢滚动 for i in range(5): driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i / 5});") time.sleep(random.uniform(0.5, 1.5)) # 模拟鼠标移动(通过执行JS) driver.execute_script(""" var event = new MouseEvent('mousemove', { view: window, bubbles: true, cancelable: true, clientX: Math.random() * window.innerWidth, clientY: Math.random() * window.innerHeight }); document.dispatchEvent(event); """) # 提取数据 data = driver.find_element(By.ID, "protected-content").text print(f"成功获取受保护内容,长度: {len(data)} 字符") driver.quit()

九、实战案例

理论和方法最终要落实到实际业务场景中。下面通过三个完整的实战案例,展示表单自动填写与网页数据采集在不同业务领域的应用。每个案例都包含了需求分析、技术选型、实现要点和完整代码,读者可以根据自身需求进行适配和扩展。

案例一:电商商品自动上架

某电商运营团队每天需要将数百个商品信息录入到商城后台。传统手工操作效率低下且容易出错。通过Selenium实现自动化上架:从Excel读取商品信息(名称、价格、库存、分类、描述、图片路径等),自动登录后台管理系统,逐条创建商品并上传图片。在实现中需要注意:图片上传需要较长时间等待,每上架一个商品后要验证是否成功(检查页面是否出现成功提示或跳转到商品详情页),失败的商品要记录原因以便人工复核。

""" 实战案例一:电商商品自动上架 数据来源: Excel 文件 products.xlsx 目标系统: 电商后台管理系统 """ import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC import time import os def auto_list_product(driver, product): """自动上架单个商品""" driver.get("https://shop.example.com/admin/product/add") wait = WebDriverWait(driver, 10) # 填写基本信息 wait.until(EC.presence_of_element_located((By.NAME, "name"))) driver.find_element(By.NAME, "name").send_keys(product["商品名称"]) driver.find_element(By.NAME, "price").send_keys(str(product["价格"])) driver.find_element(By.NAME, "stock").send_keys(str(product["库存"])) Select(driver.find_element(By.NAME, "category_id")).select_by_visible_text(product["分类"]) # 上传商品图片 image_path = product.get("图片路径", "") if image_path and os.path.exists(image_path): driver.find_element(By.NAME, "image").send_keys(os.path.abspath(image_path)) # 等待图片上传完成(根据上传进度条判断) time.sleep(3) # 填写详细描述(富文本编辑器) iframe = driver.find_element(By.CSS_SELECTOR, "iframe.rich-editor") driver.switch_to.frame(iframe) driver.find_element(By.TAG_NAME, "body").send_keys(product.get("描述", "")) driver.switch_to.default_content() # 提交表单 driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() # 验证上架结果 time.sleep(2) if "添加成功" in driver.page_source or "success" in driver.current_url: return True, "上架成功" else: return False, "提交后未检测到成功提示" # 主程序 driver = webdriver.Chrome() # 先登录 driver.get("https://shop.example.com/admin/login") driver.find_element(By.NAME, "username").send_keys("admin") driver.find_element(By.NAME, "password").send_keys("password123") driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click() time.sleep(2) # 读取商品数据 products = pd.read_excel("products.xlsx") results = [] for idx, product in products.iterrows(): print(f"正在上架第 {idx + 1}/{len(products)} 个商品: {product['商品名称']}") success, msg = auto_list_product(driver, product) results.append({"商品": product["商品名称"], "结果": "成功" if success else "失败", "备注": msg}) print(f" {'✓' if success else '✗'} {msg}") time.sleep(1) # 输出结果汇总 result_df = pd.DataFrame(results) result_df.to_excel("上架结果.xlsx", index=False) print(f"\n上架完成: 成功 {result_df[result_df['结果']=='成功'].shape[0]} 个, 失败 {result_df[result_df['结果']=='失败'].shape[0]} 个") driver.quit()

案例二:问卷调查自动填写

某市场调研公司需要定期进行问卷测试和数据收集,验证问卷系统的逻辑正确性。通过自动化脚本模拟不同用户填写问卷,可以快速发现问卷流程中的逻辑缺陷和兼容性问题。该方案使用Playwright实现,支持单选、多选、量表题、矩阵题、排序题等多种题型。核心机制是:根据题型类型采用不同的填充策略(单选题随机选择一个选项、多选题随机选择2-3个选项、量表题按指定规则打分),每份问卷填写完成后提交并截图保存。

案例三:招标信息自动采集

某企业需要实时监控招标网站上与其业务相关的招标公告。每天定时采集指定关键词的招标信息,提取标题、发布时间、采购人、预算金额、截止日期等关键字段,并通过邮件或企业微信推送给业务团队。该案例综合运用了本章所学的多项技术:登录认证处理、分页数据采集、HTML解析与信息提取、数据清洗与结构化、多通道消息推送。为了确保信息的及时性,使用定时任务(Windows Task Scheduler或Linux Crontab)每日定时执行采集任务。

""" 实战案例三:招标信息自动采集与推送 综合应用:登录 + 分页采集 + 数据清洗 + 消息推送 """ import requests from bs4 import BeautifulSoup import json import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime import time import random class BidCollector: """招标信息采集器""" def __init__(self): self.session = requests.Session() self.keywords = ["信息化", "软件开发", "系统集成"] self.collected = [] def login(self, username, password): """登录招标网站""" login_url = "https://bid.example.com/login" data = {"username": username, "password": password, "remember": "1"} resp = self.session.post(login_url, data=data) return "登录成功" in resp.text def search_bids(self, keyword, pages=3): """搜索指定关键词的招标信息""" for page in range(1, pages + 1): url = f"https://bid.example.com/search?keyword={keyword}&page={page}" resp = self.session.get(url) soup = BeautifulSoup(resp.text, "html.parser") items = soup.select(".bid-item") for item in items: bid = { "title": item.select_one(".bid-title a").text.strip(), "url": item.select_one(".bid-title a")["href"], "publisher": item.select_one(".publisher").text.strip() if item.select_one(".publisher") else "", "budget": self._extract_budget(item), "deadline": item.select_one(".deadline").text.strip() if item.select_one(".deadline") else "", "collected_at": datetime.now().strftime("%Y-%m-%d %H:%M"), } self.collected.append(bid) time.sleep(random.uniform(1, 3)) def _extract_budget(self, item): """提取预算金额""" budget_elem = item.select_one(".budget") if budget_elem: text = budget_elem.text.strip() import re nums = re.findall(r"[\d,.]+", text) return nums[0] if nums else "" return "" def push_to_wecom(self, webhook_url): """通过企业微信机器人推送""" msg = "【招标信息日报】\n\n" for bid in self.collected[:10]: # 推送前10条 msg += f"● {bid['title']}\n预算: {bid.get('budget', '未知')} | 截止: {bid.get('deadline', '未知')}\n{bid['url']}\n\n" payload = {"msgtype": "text", "text": {"content": msg}} self.session.post(webhook_url, json=payload) def export(self, filepath="bids.json"): """导出采集结果""" with open(filepath, "w", encoding="utf-8") as f: json.dump(self.collected, f, ensure_ascii=False, indent=2) print(f"已导出 {len(self.collected)} 条招标信息到 {filepath}") # 执行采集 collector = BidCollector() if collector.login("user", "pass"): for kw in collector.keywords: print(f"正在采集关键词: {kw}") collector.search_bids(kw, pages=2) collector.export() collector.push_to_wecom("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx") print(f"采集完成,共 {len(collector.collected)} 条记录") else: print("登录失败,请检查账号密码")