一、表单自动化概述
表单自动填写与网页数据采集是Web自动化办公中最常见也最具实用价值的场景之一。随着企业数字化转型的深入,大量业务操作仍然依赖Web表单完成,从数据录入、批量注册到在线申报,手工重复填写已成为效率瓶颈。Web表单自动化的核心价值在于:将人工重复操作转化为脚本自动执行,大幅提升工作效率并降低人为出错率。
从技术框架选型来看,主流的浏览器自动化工具包括Selenium、Playwright和Puppeteer三大阵营。Selenium作为老牌框架,生态成熟、社区庞大,支持所有主流浏览器,适合传统企业级项目。Playwright是微软推出的后起之秀,API设计更现代,支持多浏览器、多语言,内置自动等待和网络拦截功能,在速度和稳定性上优于Selenium。Puppeteer仅支持Chromium内核,但在Node.js生态中表现优异。对于Python技术栈的办公自动化场景,Selenium和Playwright是最推荐的两个选择。
在实际应用中,必须充分重视安全与法律合规问题。表单自动化可能涉及用户隐私数据(如身份证号、手机号、银行账号),处理这些数据时需遵守《个人信息保护法》相关规定。自动化操作不应绕过网站的认证机制、验证码保护或访问控制,也不得用于恶意注册、刷票、刷单等违反平台规则的行为。合法的应用场景包括:企业内部系统的数据录入自动化、经授权的数据采集与分析、软件测试中的表单验证、个人重复性工作的脚本化等。建议在实施自动化前,详细阅读目标网站的robots.txt和服务条款,确保操作合规。
表单自动化的整体工作流通常包含五个阶段:目标表单分析(定位页面元素、理解交互逻辑)→ 数据准备(从Excel/CSV/数据库读取数据源)→ 自动化执行(逐字段填充、提交、处理异常)→ 结果验证(检查提交是否成功、提取返回信息)→ 后续处理(数据导出、日志记录、失败重试)。下面先从一个最简单的Selenium表单填写入门示例开始。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# 初始化浏览器驱动
driver = webdriver.Chrome()
driver.get("https://example.com/form")
try:
# 等待表单加载完成
form = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "myForm"))
)
# 填充文本输入框
name_input = driver.find_element(By.NAME, "username")
name_input.send_keys("张三")
# 提交表单
submit_btn = driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
submit_btn.click()
# 验证提交结果
success_msg = WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CLASS_NAME, "success"))
)
print(f"表单提交成功: {success_msg.text}")
finally:
driver.quit()
from playwright.sync_api import sync_playwright
# 使用 Playwright 填写表单(API 更简洁)
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
page.goto("https://example.com/form")
# 填充表单字段
page.fill("#name", "张三")
page.select_option("#city", "北京")
page.check("#agree")
page.fill("#remark", "这是一条自动提交的测试数据")
# 截取填写后的表单截图
page.screenshot(path="form_filled.png")
# 点击提交并等待导航完成
with page.expect_navigation():
page.click("button[type='submit']")
print("表单提交完成")
browser.close()
二、表单字段填充
Web表单的字段类型多种多样,每种类型的填充方式各不相同。掌握各类字段的定位与操作技巧,是表单自动化的基本功。最常见的字段类型包括:文本输入框、下拉选择框、单选框/复选框、日期选择器、文件上传控件以及隐藏字段。下面逐一讲解每种类型的自动化操作方法。
文本输入框
文本输入框是最基本的表单元素,Selenium使用send_keys()方法输入内容,Playwright使用fill()方法。需要注意的是,输入前应先使用clear()清空已有内容,避免追加写入。对于富文本编辑器(如CKEditor、TinyMCE),通常需要先切换到iframe,然后操作编辑器的内容区域。对于密码输入框,处理方法与普通文本框相同,但要注意不要在日志中明文记录密码。
下拉选择框
下拉选择框分为原生Select元素和自定义下拉(通常由ul/li模拟)。原生Select可以使用Selenium的Select类进行操作,支持按可见文本、值或索引选择。自定义下拉则需要先点击展开下拉菜单,再点击目标选项,这需要更精细的等待和定位策略。联动下拉(如选择省份后动态加载城市列表)需要额外处理AJAX加载等待。
单选框与复选框
单选框(radio)和复选框(checkbox)是最常见的选项控件。操作方式为先定位到目标元素,然后调用click()方法。对于复选框,通常需要先检查其当前状态(is_selected()),只有在未选中时才执行点击,避免重复操作导致状态反转。单选框在同一组中只能选择一个,因此直接点击目标选项即可。
日期选择器与文件上传
日期选择器有两种常见实现:一种是原生input[type=date],可以直接用send_keys()输入日期字符串(格式为yyyy-MM-dd);另一种是JavaScript自定义的日期控件,需要逐一点击年份/月份/日期,操作较为复杂。文件上传控件通常通过input[type=file]实现,使用send_keys()传入本地文件路径即可。注意路径中的反斜杠需要转义,或使用正斜杠。对于隐藏的文件上传控件(opacity:0或display:none),需要先通过JavaScript使其可见再操作。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.keys import Keys
import time
driver = webdriver.Chrome()
driver.get("https://example.com/complex-form")
# 1. 文本输入框:先清空再填入
name = driver.find_element(By.ID, "fullname")
name.clear()
name.send_keys("李四")
# 2. 原生下拉选择框:使用 Select 类
city_select = Select(driver.find_element(By.ID, "city"))
city_select.select_by_visible_text("上海市") # 按可见文本选择
# city_select.select_by_value("shanghai") # 按值选择
# city_select.select_by_index(2) # 按索引选择
# 3. 复选框:判断状态后再操作
agree_checkbox = driver.find_element(By.NAME, "agreement")
if not agree_checkbox.is_selected():
agree_checkbox.click()
# 4. 单选框:直接点击目标选项
gender_radio = driver.find_element(By.CSS_SELECTOR, "input[name='gender'][value='male']")
gender_radio.click()
# 5. 日期输入
date_input = driver.find_element(By.NAME, "birthday")
date_input.clear()
date_input.send_keys("1990-01-15")
# 6. 文件上传
file_input = driver.find_element(By.NAME, "attachment")
file_input.send_keys(r"C:\Users\Documents\resume.pdf")
time.sleep(2)
driver.quit()
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
page.goto("https://example.com/complex-form")
# 1. 文本框填充
page.fill("#fullname", "李四")
# 2. 原生下拉选择
page.select_option("#city", label="上海市")
# 3. 复选框操作
page.check("input[name='agreement']")
# 4. 单选框操作
page.check("input[name='gender'][value='male']")
# 5. 日期输入
page.fill("input[name='birthday']", "1990-01-15")
# 6. 文件上传
page.set_input_files("input[name='attachment']", "C:/Users/Documents/resume.pdf")
# 7. 自定义下拉框处理(非Select元素)
page.click("#custom-dropdown")
page.click("#custom-dropdown .option-item:has-text('选项三')")
print("所有字段填充完成")
browser.close()
三、数据源驱动
表单自动化的核心价值在于批量处理,而批量处理离不开数据源驱动。实际业务场景中,需要填写的表单数据通常存储在Excel表格、CSV文件或JSON文档中。数据源驱动模式的核心思想是:将数据与逻辑分离,通过读取外部数据源逐行驱动表单填写流程,从而实现大批量数据的自动化录入。这不仅是效率的提升,更是数据准确性的保障。
Excel是最常见的数据源格式,Python的openpyxl库可以方便地读取.xlsx文件,pandas库则提供了更强大的数据处理能力。CSV文件格式简单、兼容性好,使用Python内置的csv模块即可轻松读写。JSON格式适合结构化复杂数据(如嵌套字段、多层级关系),常用于API接口的数据传递。选择哪种数据源取决于具体业务场景:Excel最适合非技术人员维护的数据,CSV适合数据量较大且格式简单的场景,JSON适合具有层级结构的数据。
在实现数据源驱动时,需要建立数据字段与表单字段之间的映射关系。通常的做法是:Excel的列标题对应表单字段的name或id属性,程序读取每一行数据后,根据映射关系将数据填入对应的表单字段。数据验证是批量处理中的关键环节,需要在写入前检查数据的合法性(如必填项不为空、邮箱格式正确、手机号位数正确等)。对于不合法的数据,应跳过当前记录并记录错误日志,而不是终止整个流程。下面给出一个完整的实战示例。
import csv
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
def read_csv_data(filepath):
"""读取 CSV 数据源,返回字典列表"""
records = []
with open(filepath, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(row)
return records
def fill_single_record(driver, record):
"""将单条记录填入表单"""
try:
driver.find_element(By.NAME, "name").clear()
driver.find_element(By.NAME, "name").send_keys(record["姓名"])
driver.find_element(By.NAME, "phone").clear()
driver.find_element(By.NAME, "phone").send_keys(record["手机号"])
Select(driver.find_element(By.NAME, "city")).select_by_visible_text(record["城市"])
email_input = driver.find_element(By.NAME, "email")
email_input.clear()
email_input.send_keys(record["邮箱"])
# 提交表单
driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
time.sleep(1) # 等待提交完成
return True, ""
except Exception as e:
return False, str(e)
# 主流程
driver = webdriver.Chrome()
records = read_csv_data("registrations.csv")
success_count = 0
fail_count = 0
for idx, record in enumerate(records, 1):
print(f"正在处理第 {idx} 条记录: {record['姓名']}")
driver.get("https://example.com/register")
# 数据验证
if not record["手机号"] or len(record["手机号"]) != 11:
print(f" 跳过: 手机号格式不正确")
fail_count += 1
continue
success, error = fill_single_record(driver, record)
if success:
success_count += 1
print(f" 成功")
else:
fail_count += 1
print(f" 失败: {error}")
print(f"\n批量处理完成: 成功 {success_count} 条, 失败 {fail_count} 条")
driver.quit()
import pandas as pd
from playwright.sync_api import sync_playwright
# 使用 pandas 读取 Excel 数据源
df = pd.read_excel("products.xlsx", sheet_name="上架商品")
success_ids = []
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
for idx, row in df.iterrows():
page.goto("https://example.com/product/add")
page.wait_for_load_state("networkidle")
# 字段映射:Excel 列名 -> 表单字段 name
page.fill("input[name='product_name']", str(row["商品名称"]))
page.fill("input[name='price']", str(row["价格"]))
page.fill("input[name='stock']", str(row["库存"]))
category = str(row["分类"])
page.select_option("select[name='category']", label=category)
description = str(row["描述"]) if pd.notna(row["描述"]) else ""
page.fill("textarea[name='description']", description)
# 提交并验证
with page.expect_navigation(timeout=10000) as nav:
page.click("button[type='submit']")
if page.url.__contains__("success"):
success_ids.append(row.get("商品ID", idx))
print(f"[成功] 商品 {row['商品名称']} 已上架")
else:
print(f"[失败] 商品 {row['商品名称']} 上架失败")
browser.close()
print(f"批量上架完成,成功 {len(success_ids)} 个商品")
四、动态/复杂表单
现代Web应用广泛使用AJAX、异步加载和动态交互技术,使得表单自动化处理的复杂度大幅提升。动态表单的核心挑战在于:页面元素不是一次性加载完成的,而是根据用户操作逐步生成。这就需要自动化脚本具备智能等待能力,既要等待元素出现,也要等待数据加载完成,还要处理各种异步交互场景。
AJAX异步加载是最常见的动态表单场景。例如,选择省份后,城市列表通过AJAX请求动态获取,选择城市后再加载区县列表,形成三级联动。处理这种场景的关键在于:每完成一步操作后,必须显式等待下一步的元素出现,而不是使用固定的time.sleep()。WebDriverWait结合expected_conditions是处理异步加载的最佳实践,Playwright内置的auto-waiting机制则更加智能,会在执行操作前自动等待元素稳定。
分步表单(Step Form / Wizard)将长表单拆分为多个步骤,每步完成后点击"下一步"按钮切换到下一步骤。自动化处理时需要跟踪当前步骤,确保在正确的步骤填写对应的字段。弹窗表单(Modal/Dialog)在页面加载完成后通过点击按钮弹出,此时需要显式等待弹窗可见后再操作表单元素。拖拽排序(Drag and Drop)涉及鼠标操作,Selenium的ActionChains和Playwright的drag_to方法都能实现。富文本编辑器通常嵌入在iframe中,需要先切换到iframe上下文,再操作编辑器的主体内容。下面给出处理这些复杂场景的代码示例。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
driver = webdriver.Chrome()
driver.get("https://example.com/wizard-form")
wait = WebDriverWait(driver, 10)
# === 第一步:基本信息 ===
wait.until(EC.presence_of_element_located((By.ID, "step1"))).is_displayed()
driver.find_element(By.NAME, "name").send_keys("王五")
driver.find_element(By.NAME, "email").send_keys("wangwu@example.com")
driver.find_element(By.CSS_SELECTOR, "#step1 .next-btn").click()
# === 第二步:地址选择(联动下拉) ===
wait.until(EC.visibility_of_element_located((By.ID, "step2")))
# 选择省份(触发AJAX加载城市)
Select(driver.find_element(By.ID, "province")).select_by_visible_text("广东省")
# 等待城市下拉框的选项加载完成
wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#city option:not([value=''])")),
"城市选项加载超时"
)
Select(driver.find_element(By.ID, "city")).select_by_visible_text("深圳市")
driver.find_element(By.CSS_SELECTOR, "#step2 .next-btn").click()
# === 第三步:弹窗表单(富文本编辑器) ===
# 等待弹窗出现
modal = wait.until(EC.visibility_of_element_located((By.ID, "editorModal")))
# 切换到富文本编辑器 iframe
iframe = driver.find_element(By.CSS_SELECTOR, "iframe.cke_wysiwyg_frame")
driver.switch_to.frame(iframe)
editor_body = driver.find_element(By.TAG_NAME, "body")
editor_body.send_keys("这是通过自动化输入的富文本内容")
driver.switch_to.default_content() # 切回主文档
# 点击弹窗中的确认按钮
driver.find_element(By.CSS_SELECTOR, "#editorModal .confirm-btn").click()
# === 第四步:拖拽排序 ===
wait.until(EC.visibility_of_element_located((By.ID, "step4")))
source = driver.find_element(By.CSS_SELECTOR, ".sort-item[data-id='3']")
target = driver.find_element(By.CSS_SELECTOR, ".sort-item[data-id='1']")
ActionChains(driver).drag_and_drop(source, target).perform()
# 最终提交
driver.find_element(By.ID, "finalSubmit").click()
print("复杂表单提交完成")
driver.quit()
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
page.goto("https://example.com/dynamic-form")
# 处理 AJAX 联动下拉
page.select_option("#province", label="广东省")
# Playwright 自动等待城市选项加载完成
page.wait_for_selector("#city option:not([value=''])")
page.select_option("#city", label="广州市")
# 处理弹窗表单
page.click("#openModalBtn")
modal = page.wait_for_selector(".modal:visible")
# 处理 iframe 中的富文本编辑器
frame = page.frame_locator("iframe.cke_wysiwyg_frame")
frame.locator("body").fill("自动化输入的富文本内容")
# 关闭弹窗
page.click(".modal .confirm-btn")
# 处理 AJAX 异步提交的表单
# 使用 expect_response 等待 AJAX 请求完成
with page.expect_response(lambda resp: "/api/submit" in resp.url) as resp_info:
page.click("#ajaxSubmit")
response = resp_info.value
result = response.json()
if result.get("success"):
print(f"AJAX 表单提交成功, ID: {result.get('id')}")
else:
print(f"提交失败: {result.get('message')}")
browser.close()
五、验证码处理
验证码(CAPTCHA)是网站防止自动化操作的第一道防线,也是表单自动化中最棘手的障碍之一。验证码的类型多种多样,从简单的数字字母图片验证码,到行为验证码(滑块拼图、点选文字)、无感验证码(reCAPTCHA v3),自动化处理策略也各不相同。需要明确的是,任何绕过验证码的行为都应当遵守法律法规和平台规则,仅限用于合法的测试和授权场景。
对于传统的图片验证码,可以使用OCR(光学字符识别)技术进行识别。Python的pytesseract库(基于Tesseract引擎)是最常用的解决方案,但对于干扰严重的验证码识别率较低。更专业的方案是使用深度学习模型(如CNN)进行定制化训练,适用于验证码风格固定的内部系统。对于滑块验证码,核心策略是模拟人类拖拽行为:计算目标位置偏移量,生成符合贝塞尔曲线或加速度规律的拖拽轨迹,控制拖拽速度(先快后慢)。第三方打码服务(如打码平台)提供了付费的验证码识别API,识别率高且支持多种验证码类型,适合企业级应用。
在实际项目中,更务实的策略是多层次组合:第一层尝试本地OCR识别,失败后切换第三方服务,再失败则人工介入。人工介入可以通过消息通知(钉钉/微信)将验证码图片推送给运维人员,由人工识别后输入结果,脚本继续执行。这种方式虽然降低了完全自动化的程度,但保证了流程的可靠性。此外,一些合规的绕过策略包括:使用已登录的Cookie/Session绕过验证页面、控制请求频率避免触发验证码机制、使用IP白名单访问内部系统等。
import pytesseract
from PIL import Image
import requests
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
# 配置 Tesseract 路径
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
driver = webdriver.Chrome()
driver.get("https://example.com/login")
# 1. 获取验证码图片并识别
captcha_img = driver.find_element(By.ID, "captcha-img")
img_src = captcha_img.get_attribute("src")
# 下载图片到内存
response = requests.get(img_src)
img = Image.open(BytesIO(response.content))
# 图片预处理:灰度化 + 二值化 + 去噪点
img = img.convert("L") # 灰度
threshold = 140
img = img.point(lambda x: 255 if x > threshold else 0) # 二值化
img.save("captcha_processed.png") # 保存以便调试
# OCR 识别
captcha_text = pytesseract.image_to_string(img, config="--psm 7").strip()
print(f"验证码识别结果: {captcha_text}")
# 2. 输入验证码
driver.find_element(By.NAME, "captcha").send_keys(captcha_text)
driver.find_element(By.NAME, "username").send_keys("admin")
driver.find_element(By.NAME, "password").send_keys("password123")
driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
# 3. 检查是否识别失败(页面包含验证码错误提示)
time.sleep(2)
if "验证码错误" in driver.page_source:
# 刷新验证码并重试
driver.find_element(By.ID, "refresh-captcha").click()
time.sleep(1)
# 切换到第三方打码服务...
print("本地OCR识别失败,需要切换为第三方服务或者人工介入")
driver.quit()
# 滑块验证码处理示例
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
import random
import time
def generate_track(distance):
"""
生成模拟人类拖拽的轨迹(先快后慢)
distance: 需要拖拽的总距离(像素)
返回: 轨迹点列表 [(x_offset, y_offset, sleep_time), ...]
"""
track = []
current = 0
mid = distance * 0.7 # 加速阶段占70%
while current < distance:
if current < mid:
# 加速阶段
move = random.randint(3, 8)
else:
# 减速阶段 + 小幅回拉修正
move = random.randint(1, 3)
current += move
if current > distance:
move -= (current - distance)
current = distance
# 模拟人手抖动的Y轴偏移
y_offset = random.randint(-2, 2)
track.append((move, y_offset, random.uniform(0.01, 0.03)))
# 最后添加一个微小回拉(模拟人手过冲后的修正)
track.append((-random.randint(1, 3), 0, 0.05))
return track
driver = webdriver.Chrome()
driver.get("https://example.com/slider-captcha")
# 定位滑块元素
slider = driver.find_element(By.CSS_SELECTOR, ".slider-btn")
# 获取滑块需要滑动的距离(需根据实际页面计算)
# 通常滑动距离 = 缺口位置 - 滑块初始位置
slide_distance = 258 # 示例距离,实际需动态计算
# 执行拖拽
action = ActionChains(driver)
action.click_and_hold(slider).perform()
track = generate_track(slide_distance)
for x_offset, y_offset, sleep_time in track:
action.move_by_offset(xoffset=x_offset, yoffset=y_offset).perform()
time.sleep(sleep_time)
# 释放鼠标
action.release().perform()
time.sleep(2)
# 验证是否通过
if "通过" in driver.page_source:
print("滑块验证通过")
else:
print("滑块验证失败,可能需要重试")
六、分页数据采集
表单提交后的数据采集通常涉及分页处理。无论是从表格列表中提取数据,还是在搜索结果中获取信息,分页都是必不可少的环节。不同的Web应用采用不同的分页机制,自动化脚本需要针对不同的分页类型采用对应的处理策略。常见分页类型包括:URL参数分页、滚动加载(无限滚动)、点击"加载更多"按钮、AJAX异步分页等。
URL参数分页是最传统的分页方式,URL中通常包含page=1、offset=20、pageIndex=0等参数,通过修改URL参数即可切换页面。这种分页方式最容易处理,直接循环修改URL参数并发起请求即可。滚动加载(无限滚动)常见于社交媒体和资讯类网站,页面会随着滚动条向下滚动而自动加载新内容。处理无限滚动需要模拟页面滚动操作,并持续检测页面高度的变化,直到满足终止条件(如达到最大页数或页面高度不再变化)。
AJAX分页是现代Web应用最常用的分页方式,点击页码按钮后通过AJAX请求获取数据并更新页面内容。这种方式的处理要点是:点击页码按钮后,等待AJAX请求完成(监听network idle状态或等待数据容器元素更新),然后再提取数据。使用Playwright的expect_response或Selenium的WebDriverWait都可以有效处理这种场景。对于点击"加载更多"按钮类型的分页,只需循环检测按钮是否存在,存在则点击并等待新内容加载,不存在则说明已经加载完全部数据。下面给出各种分页类型的代码示例。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import csv
driver = webdriver.Chrome()
base_url = "https://example.com/products?page={}"
all_data = []
for page_num in range(1, 11): # 采集前10页
url = base_url.format(page_num)
driver.get(url)
print(f"正在采集第 {page_num} 页...")
# 等待数据行加载
rows = WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".product-row"))
)
for row in rows:
data = {
"name": row.find_element(By.CSS_SELECTOR, ".product-name").text,
"price": row.find_element(By.CSS_SELECTOR, ".product-price").text,
"stock": row.find_element(By.CSS_SELECTOR, ".product-stock").text,
}
all_data.append(data)
# 控制采集频率,避免对服务器造成压力
time.sleep(1.5)
# 保存为 CSV 文件
with open("products_data.csv", "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["name", "price", "stock"])
writer.writeheader()
writer.writerows(all_data)
print(f"数据采集完成,共采集 {len(all_data)} 条记录")
driver.quit()
# 处理无限滚动加载的示例
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
driver = webdriver.Chrome()
driver.get("https://example.com/infinite-scroll")
page_num = 0
max_pages = 20
last_height = 0
data = []
while page_num < max_pages:
# 获取当前可见的所有数据项
items = driver.find_elements(By.CSS_SELECTOR, ".data-item")
print(f"第 {page_num + 1} 轮: 已加载 {len(items)} 条数据")
# 提取当前页数据
for item in items:
data.append({
"title": item.find_element(By.CSS_SELECTOR, ".item-title").text,
"content": item.find_element(By.CSS_SELECTOR, ".item-content").text,
})
# 模拟向下滚动到页面底部,触发加载更多
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2) # 等待 AJAX 加载
# 检查页面高度是否变化(判断是否还有更多内容)
new_height = driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
print("页面高度不再变化,数据已全部加载完毕")
break
last_height = new_height
page_num += 1
print(f"最终采集到 {len(data)} 条数据")
# 去重(滚动加载可能导致重复)
unique_data = []
seen_titles = set()
for item in data:
if item["title"] not in seen_titles:
seen_titles.add(item["title"])
unique_data.append(item)
print(f"去重后:{len(unique_data)} 条")
driver.quit()
# 使用 Playwright 处理 "点击加载更多" 分页
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
page.goto("https://example.com/load-more")
all_items = []
click_count = 0
max_clicks = 30
while click_count < max_clicks:
# 提取当前所有可见项目
items = page.locator(".item-card").all()
for item in items:
all_items.append({
"title": item.locator(".item-title").inner_text(),
"link": item.locator("a").get_attribute("href"),
})
# 检查"加载更多"按钮是否还存在
load_more = page.locator("#loadMoreBtn")
if not load_more.is_visible():
print("加载更多按钮已隐藏或消失,数据加载完毕")
break
load_more.click()
# 等待新内容加载完成(等待新元素出现)
page.wait_for_load_state("networkidle")
click_count += 1
print(f"已点击 {click_count} 次,累计 {len(all_items)} 条项目")
print(f"最终采集到 {len(all_items)} 条数据")
browser.close()
七、数据清洗与导出
从Web页面采集的原始数据通常存在各种质量问题:重复记录、格式不统一、缺失字段、编码错误、混杂HTML标签等。数据清洗是将原始数据转化为结构化、标准化信息的关键步骤,其重要性不亚于数据采集本身。高质量的数据清洗能够为后续的数据分析和业务决策提供可靠基础。
数据去重是清洗的第一步。同一数据可能在多次采集中重复出现,需要通过唯一标识字段(如商品ID、文章URL)或字段组合(如标题+发布时间)进行去重。Python的pandas库提供了drop_duplicates()方法,可以方便地基于指定列进行去重。格式标准化包括:日期格式统一为yyyy-MM-dd、货币金额统一为数字类型、电话号码统一为连续数字、文本去除多余空格和换行等。字段映射转换用于将不同来源的数据映射到统一的字段结构,例如将"联系电话"和"手机号码"统一映射为"phone"字段。
数据导出是将清洗后的数据持久化存储的过程。导出格式的选择取决于数据的使用方:Excel(.xlsx)适合业务人员直接查看和编辑,CSV适合数据量较大且需要跨系统交换的场景,数据库(SQLite/MySQL/MongoDB)适合需要频繁查询和关联分析的大数据集,JSON适合作为API接口的数据输出格式。对于大数据量(超过10万条),建议使用数据库存储并按批次导入,避免内存溢出。同时建议为每次采集任务添加时间戳和来源标记,便于数据追踪和增量更新。
import pandas as pd
import re
# 读取原始采集数据
raw_data = pd.read_csv("raw_products.csv")
print(f"原始数据量: {len(raw_data)} 条")
# === 1. 数据去重 ===
# 基于商品名称和价格组合去重,保留第一次出现的记录
deduped = raw_data.drop_duplicates(subset=["name", "price"], keep="first")
print(f"去重后: {len(deduped)} 条(去除 {len(raw_data) - len(deduped)} 条重复)")
# === 2. 格式标准化 ===
def clean_price(price_str):
"""清洗价格:去掉货币符号和逗号,转为浮点数"""
if pd.isna(price_str):
return 0.0
cleaned = re.sub(r"[¥¥$,元\s]", "", str(price_str))
try:
return float(cleaned)
except ValueError:
return 0.0
def clean_phone(phone):
"""清洗手机号:去除非数字字符"""
if pd.isna(phone):
return ""
digits = re.sub(r"\D", "", str(phone))
return digits if len(digits) == 11 else ""
# 应用清洗函数
deduped["price_clean"] = deduped["price"].apply(clean_price)
deduped["phone_clean"] = deduped["phone"].apply(clean_phone)
# 删除价格异常的数据(价格小于0.01或大于1000000)
deduped = deduped[(deduped["price_clean"] >= 0.01) & (deduped["price_clean"] <= 1000000)]
# === 3. 字段映射 ===
# 将采集数据的列名映射为统一标准
column_mapping = {
"product_name": "name",
"goods_name": "name",
"selling_price": "price_clean",
"contact_phone": "phone_clean",
}
deduped.rename(columns=column_mapping, inplace=True)
# === 4. 缺失值处理 ===
deduped.fillna({"description": "", "category": "未分类"}, inplace=True)
# === 5. 导出为 Excel ===
with pd.ExcelWriter("cleaned_products.xlsx", engine="openpyxl") as writer:
deduped.to_excel(writer, sheet_name="商品数据", index=False)
# 添加数据摘要工作表
summary = pd.DataFrame({
"指标": ["原始数据量", "去重后", "清洗后", "字段数"],
"值": [len(raw_data), len(deduped), len(deduped), len(deduped.columns)]
})
summary.to_excel(writer, sheet_name="数据摘要", index=False)
print(f"清洗完成,导出 {len(deduped)} 条记录到 cleaned_products.xlsx")
import sqlite3
import json
# 将清洗后的数据存入 SQLite 数据库
def export_to_database(data_list, db_path="collected_data.db"):
"""将数据列表导出到 SQLite 数据库"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建表(如果不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL,
category TEXT,
description TEXT,
source_url TEXT,
collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 批量插入数据
inserted = 0
for item in data_list:
try:
cursor.execute("""
INSERT INTO products (name, price, category, description, source_url)
VALUES (?, ?, ?, ?, ?)
""", (
item.get("name"),
item.get("price", 0.0),
item.get("category", "未分类"),
item.get("description", ""),
item.get("source_url", "")
))
inserted += 1
except Exception as e:
print(f"插入失败: {item.get('name')} - {e}")
conn.commit()
conn.close()
print(f"成功导入 {inserted}/{len(data_list)} 条数据到 SQLite")
# 同时导出 JSON 格式(供 API 使用)
def export_to_json(data_list, filepath="products_export.json"):
"""导出为 JSON 格式"""
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data_list, f, ensure_ascii=False, indent=2)
print(f"已导出 {len(data_list)} 条数据到 {filepath}")
# 使用示例
products = [
{"name": "商品A", "price": 99.9, "category": "电子产品", "source_url": "https://example.com/a"},
{"name": "商品B", "price": 199.0, "category": "家居用品", "source_url": "https://example.com/b"},
]
export_to_database(products)
export_to_json(products)
八、反爬策略应对
在Web数据采集中,目标网站通常会采取各种反爬措施来保护自身数据和服务资源。理解并合理应对这些反爬策略,是保障自动化流程稳定运行的必要能力。但需要强调的是,应对反爬策略必须在合法合规的前提下进行,尊重目标网站的服务条款,不得用于恶意爬取或破坏网站正常运营。
最常见的反爬机制包括:IP频率限制(同一IP单位时间内访问次数过多被封禁)、User-Agent检测(非浏览器UA被拒绝)、Cookie/Session验证(无有效会话被拦截)、浏览器指纹检测(识别自动化工具特征)、JavaScript渲染验证(需要执行JS才能获取真实内容)、验证码拦截(异常行为触发验证码)等。有效的应对策略是多层次的:首先模拟真实用户的浏览行为,其次通过代理池分散请求来源,最后合理控制请求频率和节奏。
IP代理池是应对频率限制最有效的手段。可以从付费代理服务商购买高质量代理IP,或自建代理池。使用时需要注意代理的可用性验证、自动剔除失效代理、按代理质量分级等。User-Agent轮换可以伪装成不同浏览器和操作系统,避免因UA特征过于单一而被识别。请求间隔控制(添加随机延迟)是模拟人类行为的基础手段,延迟时间应该在一个范围内随机分布,而不是固定值。在更高级的场景中,可以使用指纹浏览器(如Undetected Chromedriver、Playwright Stealth)来规避基于浏览器特征的检测。下面给出综合使用多种反爬策略的示例。
import random
import time
import requests
from fake_useragent import UserAgent
from itertools import cycle
class SmartCrawler:
"""
智能爬虫:集成代理池、UA轮换、请求间隔控制
"""
def __init__(self, proxy_list=None):
self.ua = UserAgent()
self.proxies = cycle(proxy_list) if proxy_list else None
self.session = requests.Session()
self.last_request_time = 0
def get_headers(self):
"""生成随机请求头,模拟不同浏览器"""
headers = {
"User-Agent": self.ua.random,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Referer": random.choice([
"https://www.google.com/",
"https://www.baidu.com/",
"https://www.bing.com/",
]),
}
return headers
def get_proxy(self):
"""从代理池中获取一个代理"""
if self.proxies:
return {"http": next(self.proxies), "https": next(self.proxies)}
return None
def smart_sleep(self):
"""智能休眠:模拟人类浏览行为的时间间隔"""
base_sleep = random.uniform(1.5, 3.5)
# 偶尔添加长间隔,模拟阅读行为
if random.random() < 0.1: # 10%的概率
base_sleep += random.uniform(5, 10)
time.sleep(base_sleep)
def fetch(self, url, max_retries=3):
"""带重试机制的请求方法"""
for attempt in range(max_retries):
self.smart_sleep()
try:
response = self.session.get(
url,
headers=self.get_headers(),
proxies=self.get_proxy(),
timeout=15,
)
if response.status_code == 200:
return response.text
elif response.status_code == 429:
# 请求过于频繁,加倍等待
wait_time = 30 * (attempt + 1)
print(f"触发频率限制,等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
elif response.status_code == 403:
print(f"被禁止访问 {url},更换代理...")
if self.proxies:
self.smart_sleep()
else:
print(f"HTTP {response.status_code}: {url}")
except requests.RequestException as e:
print(f"请求异常: {e}, 重试 {attempt + 1}/{max_retries}")
time.sleep(5)
return None
def close(self):
self.session.close()
# 使用示例
crawler = SmartCrawler(proxy_list=[
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
])
content = crawler.fetch("https://example.com/data")
if content:
print(f"成功获取页面,大小: {len(content)} 字符")
crawler.close()
# 使用 undetected_chromedriver 规避浏览器指纹检测
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
import random
import time
def create_stealth_driver():
"""创建规避检测的 Chrome 浏览器实例"""
options = uc.ChromeOptions()
# 设置窗口大小(模拟真实显示器分辨率)
width = random.choice([1366, 1440, 1536, 1920])
height = random.choice([768, 900, 864, 1080])
options.add_argument(f"--window-size={width},{height}")
# 禁用自动化特征标志
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument("--disable-automation")
# 设置语言和时区
options.add_argument("--lang=zh-CN")
# 初始化驱动(自动处理 chromedriver 版本)
driver = uc.Chrome(options=options)
# 通过 JS 进一步修改浏览器指纹
driver.execute_script("""
// 覆盖 navigator.webdriver 属性
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
// 修改 chrome 对象
window.chrome = { runtime: {} };
// 覆盖 permissions API
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
""")
return driver
# 使用隐身浏览器采集数据
driver = create_stealth_driver()
driver.get("https://example.com/protected-data")
# 模拟真实用户行为:先缓慢滚动
for i in range(5):
driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i / 5});")
time.sleep(random.uniform(0.5, 1.5))
# 模拟鼠标移动(通过执行JS)
driver.execute_script("""
var event = new MouseEvent('mousemove', {
view: window,
bubbles: true,
cancelable: true,
clientX: Math.random() * window.innerWidth,
clientY: Math.random() * window.innerHeight
});
document.dispatchEvent(event);
""")
# 提取数据
data = driver.find_element(By.ID, "protected-content").text
print(f"成功获取受保护内容,长度: {len(data)} 字符")
driver.quit()
九、实战案例
理论和方法最终要落实到实际业务场景中。下面通过三个完整的实战案例,展示表单自动填写与网页数据采集在不同业务领域的应用。每个案例都包含了需求分析、技术选型、实现要点和完整代码,读者可以根据自身需求进行适配和扩展。
案例一:电商商品自动上架
某电商运营团队每天需要将数百个商品信息录入到商城后台。传统手工操作效率低下且容易出错。通过Selenium实现自动化上架:从Excel读取商品信息(名称、价格、库存、分类、描述、图片路径等),自动登录后台管理系统,逐条创建商品并上传图片。在实现中需要注意:图片上传需要较长时间等待,每上架一个商品后要验证是否成功(检查页面是否出现成功提示或跳转到商品详情页),失败的商品要记录原因以便人工复核。
"""
实战案例一:电商商品自动上架
数据来源: Excel 文件 products.xlsx
目标系统: 电商后台管理系统
"""
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
import time
import os
def auto_list_product(driver, product):
"""自动上架单个商品"""
driver.get("https://shop.example.com/admin/product/add")
wait = WebDriverWait(driver, 10)
# 填写基本信息
wait.until(EC.presence_of_element_located((By.NAME, "name")))
driver.find_element(By.NAME, "name").send_keys(product["商品名称"])
driver.find_element(By.NAME, "price").send_keys(str(product["价格"]))
driver.find_element(By.NAME, "stock").send_keys(str(product["库存"]))
Select(driver.find_element(By.NAME, "category_id")).select_by_visible_text(product["分类"])
# 上传商品图片
image_path = product.get("图片路径", "")
if image_path and os.path.exists(image_path):
driver.find_element(By.NAME, "image").send_keys(os.path.abspath(image_path))
# 等待图片上传完成(根据上传进度条判断)
time.sleep(3)
# 填写详细描述(富文本编辑器)
iframe = driver.find_element(By.CSS_SELECTOR, "iframe.rich-editor")
driver.switch_to.frame(iframe)
driver.find_element(By.TAG_NAME, "body").send_keys(product.get("描述", ""))
driver.switch_to.default_content()
# 提交表单
driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
# 验证上架结果
time.sleep(2)
if "添加成功" in driver.page_source or "success" in driver.current_url:
return True, "上架成功"
else:
return False, "提交后未检测到成功提示"
# 主程序
driver = webdriver.Chrome()
# 先登录
driver.get("https://shop.example.com/admin/login")
driver.find_element(By.NAME, "username").send_keys("admin")
driver.find_element(By.NAME, "password").send_keys("password123")
driver.find_element(By.CSS_SELECTOR, "button[type='submit']").click()
time.sleep(2)
# 读取商品数据
products = pd.read_excel("products.xlsx")
results = []
for idx, product in products.iterrows():
print(f"正在上架第 {idx + 1}/{len(products)} 个商品: {product['商品名称']}")
success, msg = auto_list_product(driver, product)
results.append({"商品": product["商品名称"], "结果": "成功" if success else "失败", "备注": msg})
print(f" {'✓' if success else '✗'} {msg}")
time.sleep(1)
# 输出结果汇总
result_df = pd.DataFrame(results)
result_df.to_excel("上架结果.xlsx", index=False)
print(f"\n上架完成: 成功 {result_df[result_df['结果']=='成功'].shape[0]} 个, 失败 {result_df[result_df['结果']=='失败'].shape[0]} 个")
driver.quit()
案例二:问卷调查自动填写
某市场调研公司需要定期进行问卷测试和数据收集,验证问卷系统的逻辑正确性。通过自动化脚本模拟不同用户填写问卷,可以快速发现问卷流程中的逻辑缺陷和兼容性问题。该方案使用Playwright实现,支持单选、多选、量表题、矩阵题、排序题等多种题型。核心机制是:根据题型类型采用不同的填充策略(单选题随机选择一个选项、多选题随机选择2-3个选项、量表题按指定规则打分),每份问卷填写完成后提交并截图保存。
案例三:招标信息自动采集
某企业需要实时监控招标网站上与其业务相关的招标公告。每天定时采集指定关键词的招标信息,提取标题、发布时间、采购人、预算金额、截止日期等关键字段,并通过邮件或企业微信推送给业务团队。该案例综合运用了本章所学的多项技术:登录认证处理、分页数据采集、HTML解析与信息提取、数据清洗与结构化、多通道消息推送。为了确保信息的及时性,使用定时任务(Windows Task Scheduler或Linux Crontab)每日定时执行采集任务。
"""
实战案例三:招标信息自动采集与推送
综合应用:登录 + 分页采集 + 数据清洗 + 消息推送
"""
import requests
from bs4 import BeautifulSoup
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import time
import random
class BidCollector:
"""招标信息采集器"""
def __init__(self):
self.session = requests.Session()
self.keywords = ["信息化", "软件开发", "系统集成"]
self.collected = []
def login(self, username, password):
"""登录招标网站"""
login_url = "https://bid.example.com/login"
data = {"username": username, "password": password, "remember": "1"}
resp = self.session.post(login_url, data=data)
return "登录成功" in resp.text
def search_bids(self, keyword, pages=3):
"""搜索指定关键词的招标信息"""
for page in range(1, pages + 1):
url = f"https://bid.example.com/search?keyword={keyword}&page={page}"
resp = self.session.get(url)
soup = BeautifulSoup(resp.text, "html.parser")
items = soup.select(".bid-item")
for item in items:
bid = {
"title": item.select_one(".bid-title a").text.strip(),
"url": item.select_one(".bid-title a")["href"],
"publisher": item.select_one(".publisher").text.strip() if item.select_one(".publisher") else "",
"budget": self._extract_budget(item),
"deadline": item.select_one(".deadline").text.strip() if item.select_one(".deadline") else "",
"collected_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
}
self.collected.append(bid)
time.sleep(random.uniform(1, 3))
def _extract_budget(self, item):
"""提取预算金额"""
budget_elem = item.select_one(".budget")
if budget_elem:
text = budget_elem.text.strip()
import re
nums = re.findall(r"[\d,.]+", text)
return nums[0] if nums else ""
return ""
def push_to_wecom(self, webhook_url):
"""通过企业微信机器人推送"""
msg = "【招标信息日报】\n\n"
for bid in self.collected[:10]: # 推送前10条
msg += f"● {bid['title']}\n预算: {bid.get('budget', '未知')} | 截止: {bid.get('deadline', '未知')}\n{bid['url']}\n\n"
payload = {"msgtype": "text", "text": {"content": msg}}
self.session.post(webhook_url, json=payload)
def export(self, filepath="bids.json"):
"""导出采集结果"""
with open(filepath, "w", encoding="utf-8") as f:
json.dump(self.collected, f, ensure_ascii=False, indent=2)
print(f"已导出 {len(self.collected)} 条招标信息到 {filepath}")
# 执行采集
collector = BidCollector()
if collector.login("user", "pass"):
for kw in collector.keywords:
print(f"正在采集关键词: {kw}")
collector.search_bids(kw, pages=2)
collector.export()
collector.push_to_wecom("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")
print(f"采集完成,共 {len(collector.collected)} 条记录")
else:
print("登录失败,请检查账号密码")