pdfplumber 是一个基于 pdfminer.six 构建的 Python 库,专注于从 PDF 文件中精确提取文本和表格数据。与传统的 PDF 解析库相比,pdfplumber 提供了更细粒度的访问控制,允许开发者获取每个字符的精确位置坐标(x0, y0, x1, y1)、字体名称、字号大小等信息,这使得它特别适合需要保留文档结构布局的提取场景。
pdfplumber 的核心优势在于其对 PDF 内部对象的完整暴露。它不像其他库那样只提供"黑盒"式的文本提取,而是将 PDF 的页面分解为字符(Char)、矩形(Rect)、线(Line)、图像(Image)等基础对象,让开发者可以根据实际需求灵活组合使用。这种设计思路使得 pdfplumber 在处理复杂排版、多栏布局、带表格的文档时表现出色。
pdfplumber 将 PDF 文档按照层次结构组织。最顶层是 PDF 文档对象,包含若干页面(Page)。每个页面由一系列基础对象组成,主要包括:Char(字符,最小的文本单元,包含字体、大小、坐标等属性)、Rect(矩形,通常用于表示表格边框或背景色块)、Line(线段,用于绘制表格线或分隔线)。理解这一对象模型是高效使用 pdfplumber 的关键,因为几乎所有的高级功能(如表格提取、区域文本提取)都是基于对这些基础对象的筛选和组合实现的。
import pdfplumber
# 打开PDF文件with pdfplumber.open("sample.pdf") as pdf:
# 获取页面总数print(f"总页数: {len(pdf.pages)}")
# 获取第一页
first_page = pdf.pages[0]
# 查看页面尺寸print(f"页面尺寸: {first_page.width} x {first_page.height}")
# 提取页面纯文本
text = first_page.extract_text()
print(text[:500])
# 深入探索页面对象with pdfplumber.open("sample.pdf") as pdf:
page = pdf.pages[0]
# 查看页面上所有字符对象
chars = page.chars
print(f"字符数量: {len(chars)}")
print("第一个字符属性:")
for key, value in chars[0].items():
print(f" {key}: {value}")
# 查看矩形对象(如表格边框)
rects = page.rects
print(f"矩形数量: {len(rects)}")
# 查看线段对象
lines = page.lines
print(f"线段数量: {len(lines)}")
defclean_table_data(raw_data):
"""清洗表格数据:去除空白、修复格式"""
cleaned = []
for row in raw_data:
# 去除每个单元格的前后空白
clean_row = []
for cell in row:
if cell isNone:
clean_row.append("")
else:
# 去除多余换行和空白
cell = cell.replace("\n", " ").strip()
clean_row.append(cell)
# 跳过完全空的行if any(cell for cell in clean_row):
cleaned.append(clean_row)
return cleaned
with pdfplumber.open("report.pdf") as pdf:
page = pdf.pages[0]
tables = page.find_tables()
if tables:
raw = tables[0].extract()
print("原始数据:")
for row in raw:
print(row)
cleaned = clean_table_data(raw)
print("\n清洗后数据:")
for row in cleaned:
print(row)
通过分析字符属性的分布模式,我们可以自动识别文档的结构元素:大号字体的文本通常为标题或表头,等宽字体的文本可能是代码或数据,特定的颜色可能表示批注或修改标记。这种基于字符属性的文档结构分析,是实现智能化 PDF 提取的基础。
defanalyze_chars_detail(chars):
"""详细分析字符属性"""print(f"共 {len(chars)} 个字符")
print("\n=== 字体统计 ===")
font_stats = {}
for c in chars:
fn = c["fontname"]
if fn notin font_stats:
font_stats[fn] = {"count": 0, "sizes": set(), "text": []}
font_stats[fn]["count"] += 1
font_stats[fn]["sizes"].add(round(c["size"], 1))
if len(font_stats[fn]["text"]) < 10:
font_stats[fn]["text"].append(c["text"])
for fn, stats in font_stats.items():
print(f" 字体 '{fn}': {stats['count']}个字符, 字号: {sorted(stats['sizes'])}")
print(f" 示例文本: {''.join(stats['text'][:5])}")
print("\n=== 字号分布 ===")
size_dist = {}
for c in chars:
sz = round(c["size"], 1)
size_dist[sz] = size_dist.get(sz, 0) + 1for sz in sorted(size_dist):
bar = "█" * min(size_dist[sz] // 10, 50)
print(f" {sz:4.1f}pt: {size_dist[sz]:5d}个 {bar}")
# 使用示例with pdfplumber.open("document.pdf") as pdf:
chars = pdf.pages[0].chars
analyze_chars_detail(chars)
矩形分析
PDF 中的矩形对象(Rect)通常表示表格的单元格背景、边框色块、按钮或图标区域。通过分析页面上的矩形对象,我们可以了解表格的结构布局、识别高亮区域、检测表单字段位置。矩形的属性包括四条边的坐标、填充颜色(stroking_color, non_stroking_color)、线宽(linewidth)等。在表格提取的上下文中,矩形分析可以帮助我们判断表格边框的完整性和连续性。
defanalyze_rects(page):
"""分析页面上的矩形对象"""
rects = page.rects
ifnot rects:
print("页面上没有矩形对象")
returnprint(f"矩形数量: {len(rects)}")
# 矩形尺寸分析
widths = [r["x1"] - r["x0"] for r in rects]
heights = [r["y1"] - r["y0"] for r in rects]
print(f"矩形宽度范围: {min(widths):.1f} - {max(widths):.1f}")
print(f"矩形高度范围: {min(heights):.1f} - {max(heights):.1f}")
# 查找表格区域(大尺寸矩形)
page_area = page.width * page.height
large_rects = [r for r in rects
if (r["x1"] - r["x0"]) * (r["y1"] - r["y0"]) > page_area * 0.01]
print(f"大尺寸矩形(>1%页面): {len(large_rects)}个")
# 检查矩形填充色
filled = [r for r in rects if r.get("non_stroking_color")]
if filled:
print(f"有填充色的矩形: {len(filled)}个(可能是表格表头背景)")
# 检测可能的表格行间隔
y_positions = sorted(set(round(r["y0"]) for r in large_rects))
if len(y_positions) > 2:
gaps = [y_positions[i+1] - y_positions[i] for i in range(len(y_positions)-1)]
print(f"行间隔: 最小值={min(gaps)}, 最大值={max(gaps)}, 平均={sum(gaps)/len(gaps):.1f}")
return large_rects
with pdfplumber.open("invoice.pdf") as pdf:
analyze_rects(pdf.pages[0])
线分析
线段(Line)是 PDF 表格结构的另一关键组成元素。表格的水平线和垂直线分别由水平线段和垂直线段表示。通过分析线段的起始和结束坐标,可以重构出表格的网格结构。pdfplumber 在 find_tables() 内部就使用了类似的线段分析算法。当默认的表格检测效果不佳时,手动分析线段分布可以帮助我们理解问题所在,从而有针对性地调整参数。
defanalyze_lines(page):
"""分析页面上的线段对象"""
lines = page.lines
ifnot lines:
print("页面上没有线段对象")
return# 区分水平线和垂直线
h_lines = [l for l in lines if abs(l["y0"] - l["y1"]) < 0.5]
v_lines = [l for l in lines if abs(l["x0"] - l["x1"]) < 0.5]
print(f"水平线段: {len(h_lines)}条")
print(f"垂直线段: {len(v_lines)}条")
# 水平线Y坐标聚类(识别表格行)
h_y_positions = sorted(set(round(l["y0"], 1) for l in h_lines))
if len(h_y_positions) > 2:
print(f"水平线Y坐标层次: {len(h_y_positions)}个不同位置")
print(f" Y范围: {min(h_y_positions):.1f} - {max(h_y_positions):.1f}")
# 检查是否形成均匀网格(表格特征)
gaps = [h_y_positions[i+1] - h_y_positions[i] for i in range(len(h_y_positions)-1)]
avg_gap = sum(gaps) / len(gaps)
uniformity = max(gaps) - min(gaps)
if uniformity < avg_gap * 0.3:
print(f" → 水平线分布均匀,疑似表格行(平均行距: {avg_gap:.1f})")
# 垂直线X坐标聚类(识别表格列)
v_x_positions = sorted(set(round(l["x0"], 1) for l in v_lines))
if len(v_x_positions) > 1:
print(f"垂直线X坐标层次: {len(v_x_positions)}个不同位置")
print(f" X范围: {min(v_x_positions):.1f} - {max(v_x_positions):.1f}")
return h_lines, v_lines
with pdfplumber.open("table_report.pdf") as pdf:
analyze_lines(pdf.pages[0])
defgroup_chars_into_words(chars, x_tolerance=2):
"""将字符按水平位置组合成单词"""ifnot chars:
return []
# 按行(Y坐标相近)分组
sorted_chars = sorted(chars, key=lambda c: (-c["top"], c["x0"]))
lines = []
current_line = [sorted_chars[0]]
for c in sorted_chars[1:]:
# 如果Y坐标差小于容差,视为同一行if abs(c["top"] - current_line[-1]["top"]) < 5:
current_line.append(c)
else:
lines.append(current_line)
current_line = [c]
if current_line:
lines.append(current_line)
# 将每行的字符组合成文本
result = []
for line in lines:
text = ""
prev_x = line[0]["x0"]
for c in line:
# 如果字符间距超过容差,插入空格if c["x0"] - prev_x > x_tolerance:
text += " "
text += c["text"]
prev_x = c["x1"]
result.append({
"text": text,
"y": line[0]["top"],
"x0": line[0]["x0"],
"x1": line[-1]["x1"],
"size": max(c["size"] for c in line),
"font": line[0]["fontname"],
})
return result
with pdfplumber.open("document.pdf") as pdf:
chars = pdf.pages[0].chars
words = group_chars_into_words(chars)
for w in words[:20]:
print(f"[{w['y']:6.1f}] {w['text']}")
import pandas as pd
import re
classPDFDataCleaner:
"""PDF提取数据清洗管道"""@staticmethoddefremove_whitespace(df):
"""去除所有字符串列的前后空白"""for col in df.select_dtypes(include=["object"]).columns:
df[col] = df[col].str.strip().str.replace(r"\s+", " ", regex=True)
return df
@staticmethoddefclean_numeric(df, columns):
"""将带有货币符号和逗号的字符串转换为数值"""for col in columns:
if col in df.columns:
df[col] = df[col].astype("str")
df[col] = df[col].str.replace(r"[¥$€,,]", "", regex=True)
df[col] = df[col].str.replace(r"[^\d.]", "", regex=True)
df[col] = pd.to_numeric(df[col], errors="coerce")
return df
@staticmethoddefclean_date(df, columns, fmt="%Y-%m-%d"):
"""统一日期格式"""for col in columns:
if col in df.columns:
# 常见日期格式转换
df[col] = df[col].astype("str")
df[col] = df[col].str.replace(r"(\d{4})[年](\d{1,2})[月](\d{1,2})[日]",
r"\1-\2-\3", regex=True)
df[col] = pd.to_datetime(df[col], errors="coerce")
return df
@staticmethoddefremove_empty_rows(df, thresh=1):
"""删除空值过多的行"""return df.dropna(thresh=thresh)
@classmethoddefclean(cls, df, numeric_cols=None, date_cols=None):
"""运行完整清洗管道"""
df = cls.remove_whitespace(df)
if numeric_cols:
df = cls.clean_numeric(df, numeric_cols)
if date_cols:
df = cls.clean_date(df, date_cols)
df = cls.remove_empty_rows(df)
return df
# 使用清洗管道
df = pd.read_csv("raw_data.csv")
cleaner = PDFDataCleaner()
df_cleaned = cleaner.clean(
df,
numeric_cols=["金额", "单价", "数量"],
date_cols=["日期", "到期日"]
)
print(df_cleaned.dtypes)
print(df_cleaned.head())
字段映射与导出
在将 PDF 数据导入到目标系统(如数据库、ERP系统、数据分析平台)之前,通常需要进行字段映射。源数据中的列名可能与目标系统的字段名不一致,需要建立映射关系。此外,还需要进行数据验证(检查必填字段、数据范围、格式正确性)和异常处理。完成清洗和映射后,可以将数据导出为多种格式:Excel(支持多工作表)、CSV(通用交换格式)、直接写入数据库等。
import pandas as pd
from pathlib import Path
defexport_pdf_data(pdf_path, output_dir="."):
"""完整的PDF提取、清洗和导出流程"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 字段映射
field_mapping = {
"商品名称": "product_name",
"规格型号": "specification",
"单位": "unit",
"数量": "quantity",
"含税单价": "unit_price",
"含税金额": "total_amount",
"税率": "tax_rate",
"税额": "tax_amount",
}
with pdfplumber.open(pdf_path) as pdf:
all_data = []
for page in pdf.pages:
tables = page.find_tables()
for table in tables:
raw = table.extract()
if len(raw) > 1:
# 使用第一行作为列名
headers = raw[0]
# 映射列名
mapped_headers = [field_mapping.get(h, h) for h in headers]
df_page = pd.DataFrame(raw[1:], columns=mapped_headers)
all_data.append(df_page)
ifnot all_data:
print("未提取到数据")
return# 合并所有页面的数据
df = pd.concat(all_data, ignore_index=True)
# 清洗数据
df = PDFDataCleaner.clean(
df,
numeric_cols=["quantity", "unit_price", "total_amount",
"tax_rate", "tax_amount"]
)
# 导出为Excel(带格式)
excel_path = output_dir / f"{Path(pdf_path).stem}_output.xlsx"with pd.ExcelWriter(excel_path, engine="openpyxl") as writer:
df.to_excel(writer, sheet_name="PDF提取数据", index=False)
# 自动调整列宽
worksheet = writer.sheets["PDF提取数据"]
for col in worksheet.columns:
max_len = max(len(str(cell.value or"")) for cell in col)
worksheet.column_dimensions[col[0].column_letter].width = min(max_len + 2, 40)
# 同时导出CSV
csv_path = output_dir / f"{Path(pdf_path).stem}_output.csv"
df.to_csv(csv_path, index=False, encoding="utf-8-sig")
print(f"成功导出 {len(df)} 行数据")
print(f"Excel: {excel_path}")
print(f"CSV: {csv_path}")
return df
# 使用示例
df = export_pdf_data("invoice_data.pdf", "output")
七、批量PDF处理
批量提取与文件夹遍历
在真实业务场景中,我们往往需要一次性处理大量 PDF 文件(如批量处理发票、月度报表、合同文档等)。Python 的 pathlib 库提供了便捷的文件系统操作接口,可以遍历指定目录及其子目录下的所有 PDF 文件。结合 pdfplumber 的页面迭代能力,可以实现全自动的批量处理管道。关键在于设计一个鲁棒的文件处理循环,能够优雅地处理单个文件失败的情况(如密码保护的文件、损坏的PDF),而不中断整个批处理流程。
from pathlib import Path
import pdfplumber
import pandas as pd
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("pdf_extract.log", encoding="utf-8"),
logging.StreamHandler()
]
)
deffind_all_pdfs(root_dir):
"""递归查找目录下所有PDF文件"""return sorted(Path(root_dir).rglob("*.pdf"))
defbatch_extract_text(pdf_files, output_dir="extracted_text"):
"""批量提取PDF文本到单独的txt文件"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
results = {"success": 0, "failed": 0, "errors": []}
for pdf_file in pdf_files:
try:
logging.info(f"正在处理: {pdf_file.name}")
with pdfplumber.open(str(pdf_file)) as pdf:
text_content = []
for i, page in enumerate(pdf.pages):
text = page.extract_text()
if text:
text_content.append(f"--- 第{i+1}页 ---\n{text}")
# 保存提取的文本
output_file = output_path / f"{pdf_file.stem}.txt"with open(output_file, "w", encoding="utf-8") as f:
f.write("\n\n".join(text_content))
results["success"] += 1
logging.info(f" → 已保存: {output_file.name}")
except Exception as e:
results["failed"] += 1
results["errors"].append({"file": str(pdf_file.name), "error": str(e)})
logging.error(f" → 失败: {e}")
# 输出汇总报告
logging.info("=" * 40)
logging.info(f"批量处理完成: 成功{results['success']}个, 失败{results['failed']}个")
if results["errors"]:
logging.info("失败文件列表:")
for err in results["errors"]:
logging.info(f" - {err['file']}: {err['error']}")
return results
# 使用示例# pdf_files = find_all_pdfs("./invoices/")# results = batch_extract_text(pdf_files, "./extracted_text/")
并发加速处理
当需要处理大量 PDF 文件时(数百甚至数千个),单线程串行处理可能非常耗时。Python 的 concurrent.futures 模块可以方便地实现多线程或多进程并发处理。对于 IO 密集型任务(如读取 PDF 文件),多线程效果显著;对于 CPU 密集型任务(如 OCR 识别),多进程更为适合。需要注意的是,并发处理时需要确保输出文件的命名不会冲突,并且要合理控制并发数量以避免耗尽系统资源。
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import pdfplumber
import pandas as pd
import time
defextract_table_from_pdf(pdf_path):
"""从单个PDF中提取表格数据"""try:
with pdfplumber.open(str(pdf_path)) as pdf:
all_rows = []
for page in pdf.pages:
tables = page.find_tables()
for table in tables:
for row in table.extract():
all_rows.append(row)
return {"file": pdf_path.name, "rows": all_rows, "success": True}
except Exception as e:
return {"file": pdf_path.name, "error": str(e), "success": False}
defparallel_extract(pdf_dir, max_workers=4):
"""多线程并发提取PDF表格"""
pdf_files = list(Path(pdf_dir).glob("*.pdf"))
print(f"找到 {len(pdf_files)} 个PDF文件,使用 {max_workers} 个线程并发处理...")
start_time = time.time()
all_data = []
success_count = 0with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_file = {executor.submit(extract_table_from_pdf, f): f for f in pdf_files}
# 处理完成的Futurefor i, future in enumerate(as_completed(future_to_file), 1):
result = future.result()
if result["success"]:
all_data.extend(result["rows"])
success_count += 1else:
print(f" [{i}/{len(pdf_files)}] 失败: {result['file']} - {result['error']}")
# 进度提示if i % 10 == 0or i == len(pdf_files):
elapsed = time.time() - start_time
print(f" 进度: {i}/{len(pdf_files)} ({i*100//len(pdf_files)}%), 用时{elapsed:.1f}秒")
elapsed = time.time() - start_time
print(f"\n并发提取完成!")
print(f"成功: {success_count}/{len(pdf_files)}, 总行数: {len(all_data)}, 用时: {elapsed:.1f}秒")
return all_data
# 使用示例# data = parallel_extract("./invoices/", max_workers=8)
import pdfplumber
import re
import json
defextract_invoice_info(pdf_path):
"""从PDF发票中提取关键信息"""
info = {}
with pdfplumber.open(pdf_path) as pdf:
page = pdf.pages[0]
text = page.extract_text()
# 提取发票号码(通常格式如:12345678)
invoice_no_match = re.search(r"发票号码[::]\s*(\d+)", text)
if invoice_no_match:
info["发票号码"] = invoice_no_match.group(1)
# 提取开票日期
date_match = re.search(r"开票日期[::]\s*(\d{4}年\d{1,2}月\d{1,2}日)", text)
if date_match:
info["开票日期"] = date_match.group(1)
# 提取购买方信息
buyer_name_match = re.search(r"购买方[名称]*[::]\s*([^\n]+)", text)
if buyer_name_match:
info["购买方名称"] = buyer_name_match.group(1).strip()
# 提取金额
amount_match = re.search(r"价税合计[((]大写[))]\s*[^0-9]*?([\d,]+\.\d{2})", text)
ifnot amount_match:
amount_match = re.search(r"价税合计[::]\s*([\d,]+\.\d{2})", text)
if amount_match:
info["价税合计"] = amount_match.group(1)
# 提取商品明细(表格区域)
tables = page.find_tables()
if tables:
# 通常第一个表格包含商品明细
table_data = tables[0].extract()
if len(table_data) > 1:
items = []
for row in table_data[1:]:
if row[0] and row[0].strip():
items.append({
"名称": row[0].strip(),
"数量": row[2].strip() if len(row) > 2else"",
"金额": row[4].strip() if len(row) > 4else"",
})
info["商品明细"] = items
return info
# 使用示例# invoice = extract_invoice_info("invoice_2025.pdf")# print(json.dumps(invoice, ensure_ascii=False, indent=2))
案例二:银行对账单解析
银行对账单是另一种常见的结构化PDF文档,通常包含多页的表格数据(交易日期、摘要、交易金额、余额等)。对账单的难点在于:跨页表格的处理(表头在每页重复出现),金额的数字格式(负数和正数的表示方式,千分位分隔符),日期的多种格式。本案例展示了如何构建一个健壮的对账单解析器,将 PDF 格式的银行流水转换为 Excel 格式的结构化数据。
import pdfplumber
import pandas as pd
import re
defparse_bank_statement(pdf_path):
"""解析银行对账单PDF"""
all_transactions = []
header = Nonewith pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
tables = page.find_tables({
"vertical_strategy": "lines",
"horizontal_strategy": "lines"
})
for table in tables:
data = table.extract()
ifnot data or len(data) < 2:
continueif header isNone:
header = data[0]
transactions = data[1:]
else:
# 检查并跳过重复表头if data[0] == header:
transactions = data[1:]
else:
transactions = data
for row in transactions:
# 清理每行数据
cleaned = [cell.strip() if cell else""for cell in row]
# 跳过空行和汇总行if cleaned[0] andnot any(kw in cleaned[0]
for kw in ["合计", "上期", "本期", "余额"]):
all_transactions.append(cleaned)
# 转换为 DataFrameif header and all_transactions:
df = pd.DataFrame(all_transactions, columns=header)
# 清理金额列:去除货币符号和逗号
amount_cols = [col for col in df.columns if"金额"in col or"余额"in col]
for col in amount_cols:
df[col] = df[col].str.replace(r"[¥$,\s]", "", regex=True)
df[col] = pd.to_numeric(df[col], errors="coerce")
# 日期列转换
date_cols = [col for col in df.columns if"日期"in col]
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors="coerce")
return df
return pd.DataFrame()
# 使用示例# df = parse_bank_statement("bank_statement_2025Q1.pdf")# df.to_excel("bank_statement_cleaned.xlsx", index=False)