一、批量处理概述
在日常办公中,Excel文件的批量处理是最常见也是最耗时的任务之一。无论是财务部门的月度报表合并、销售团队的客户数据拆分,还是质量部门的检测数据比对,Excel批量处理都能大幅提升工作效率。本专题将系统性地介绍如何使用Python实现Excel文件的批量合并、拆分与数据比对,帮助读者掌握自动化处理的完整技能体系。
批量处理Excel文件的核心应用场景非常广泛。在财务报表领域,每月需要汇总各分公司提交的利润表、资产负债表;在销售管理中,需要按区域、按产品线拆分总部的销售数据;在库存管理环节,需要比对月初和月末的库存盘点数据,找出差异商品。这些场景如果完全依赖手动操作,不仅耗时而且极易出错。
技术选型对比
在Python生态中,处理Excel文件主要有三个主流库:pandas、openpyxl和xlwings。pandas是数据分析的核心库,擅长处理结构化数据,支持批量读写Excel文件,内置强大的数据聚合和分组功能,适合大规模数据处理,但不擅长样式控制和单元格级别的精细操作。openpyxl专注于Excel文件的读写,支持xlsx格式,能够精细控制单元格样式、颜色、合并单元格等,适合需要对Excel格式进行精确控制的场景,但处理速度在大文件时偏慢。xlwings则提供了与Excel应用程序的接口,可以调用Excel自身的功能,适合需要Excel引擎计算能力的场景,但依赖Excel安装环境。
选型建议:纯数据处理首选pandas(简洁高效);需要保留或设置格式选openpyxl(精细控制);需要Excel公式计算或VBA交互选xlwings(功能完整)。实际项目中常将pandas和openpyxl结合使用。
文件遍历模式
批量处理的第一步是遍历目标目录下的所有Excel文件。Python的glob模块和os模块提供了便捷的文件遍历能力。glob支持通配符模式匹配,可以快速筛选出指定扩展名的文件列表。os模块则提供了更底层的目录遍历和路径操作功能。此外,pathlib作为面向对象的路径处理库,在代码可读性方面更具优势。
import glob
import os
import pandas as pd
from pathlib import Path
# 方式一:glob通配符匹配
files = glob.glob("data/*.xlsx")
print(f"找到 {len(files)} 个Excel文件")
# 方式二:pathlib面向对象方式
data_dir = Path("data")
files = list(data_dir.glob("*.xlsx"))
# 方式三:递归搜索子目录(包含.xls和.xlsx)
all_files = []
for root, dirs, fnames in os.walk("data"):
for f in fnames:
if f.endswith((".xlsx", ".xls")):
all_files.append(os.path.join(root, f))
在获取到文件列表后,建议先对文件进行校验和过滤。例如检查文件是否为有效的Excel文件、是否被加密保护、工作表名称是否一致等。这一步骤虽然看似简单,但在实际批量处理中常常是各种异常的来源。
import openpyxl
def validate_excel(filepath):
"""验证Excel文件是否可读取"""
try:
wb = openpyxl.load_workbook(filepath, read_only=True)
sheets = wb.sheetnames
wb.close()
return {"valid": True, "sheets": sheets}
except Exception as e:
return {"valid": False, "error": str(e)}
# 批量验证文件
valid_files = []
for f in files:
result = validate_excel(f)
if result["valid"]:
valid_files.append(f)
print(f"[OK] {f}")
else:
print(f"[FAIL] {f}: {result['error']}")
经验之谈:在实际项目中,建议在批量处理前先进行"干运行"——只遍历和验证文件,不执行实际的数据读写操作。这可以帮助提前发现文件路径错误、权限问题、格式不兼容等异常,避免处理到一半时程序崩溃。
二、同结构Excel合并
同结构合并是最常见的Excel合并场景。所谓同结构,是指多个Excel文件的工作表名称相同、列结构(列名和列顺序)一致。例如,各分公司每月提交的销售报表,表头结构完全相同,只是数据内容不同。这种情况下,合并操作的核心逻辑就是"纵向堆叠"——将所有文件的数据逐行追加到一个统一的数据框中。
pandas库的concat函数是实现同结构合并的最佳工具。它能够将多个DataFrame纵向拼接,并自动处理索引。在实际操作中,需要注意几个关键问题:列名中的前后空格可能导致匹配失败,不同文件中列的先后顺序可能意外不一致,数据类型的隐式转换可能导致精度丢失等。因此在合并前需要对列名进行标准化处理。
import pandas as pd
import glob
def merge_same_structure(file_list, sheet_name="Sheet1"):
"""
合并同结构Excel文件
参数:
file_list: 文件路径列表
sheet_name: 要合并的工作表名称
返回:合并后的DataFrame
"""
df_list = []
for filepath in file_list:
# 读取单个文件
df = pd.read_excel(filepath, sheet_name=sheet_name)
# 来源标注
df["来源文件"] = filepath.split("\\")[-1]
df_list.append(df)
# 纵向合并所有DataFrame
result = pd.concat(df_list, ignore_index=True)
return result
# 使用示例
files = glob.glob("销售报表/*.xlsx")
merged_df = merge_same_structure(files, sheet_name="月度数据")
print(merged_df.head())
print(f"合并后总行数: {len(merged_df)}")
合并后的数据验证同样重要。验证内容通常包括:总行数是否等于各文件行数之和、关键列是否存在空值、数值列的汇总值是否与各文件加总一致、是否有重复的主键等。通过系统化的验证流程可以确保合并结果的准确性。
def validate_merged_data(merged_df, file_list):
"""验证合并后的数据完整性"""
print("="*50)
print("数据验证报告")
print("="*50)
# 1. 基本统计
total_rows = len(merged_df)
print(f"合并后总行数: {total_rows}")
# 2. 各来源文件行数分布
if "来源文件" in merged_df.columns:
counts = merged_df["来源文件"].value_counts()
print("\n各文件数据分布:")
for file_name, count in counts.items():
print(f" {file_name}: {count} 行")
# 3. 检查关键列空值
null_counts = merged_df.isnull().sum()
null_cols = null_counts[null_counts > 0]
if len(null_cols) > 0:
print("\n警告:以下列存在空值:")
for col, cnt in null_cols.items():
print(f" {col}: {cnt} 个空值")
# 4. 重复行检查
dup_count = merged_df.duplicated().sum()
if dup_count > 0:
print(f"\n警告:发现 {dup_count} 行完全重复数据")
print("\n验证完成!")
关键提示:同结构合并虽然逻辑简单,但实际工作中经常遇到"伪同结构"文件——表头看起来相同但实际存在细微差异(如全角/半角符号、空格、大小写等)。建议在合并前先对列名进行统一处理(strip、replace、lower等操作),或者建立列名映射表。
三、异结构Excel合并
异结构合并是比同结构合并更复杂的场景。所谓异结构,是指不同Excel文件中的工作表列名不完全一致,可能列数不同、列名不同、数据类型不同。例如,合并不同年份的销售报表时,2023年的报表可能有"销售区域"列而2024年改名为"地区",或者新增了"渠道来源"列。处理这类问题的核心思路是"列名映射与统一"。
列名映射是异结构合并的第一步。我们需要建立一个映射字典,将不同文件中的列名统一映射到标准列名。例如,{"销售区域": "地区", "区域": "地区", "Region": "地区"}表示这三个不同的列名都映射到标准列名"地区"。这种方式能够灵活处理各种命名差异。在处理完列名映射后,不同文件会拥有统一的标准列名体系,此时就可以使用concat进行合并了。
def merge_diff_structure(file_list, column_mapping, sheet_name="Sheet1"):
"""
合并异结构Excel文件(通过列名映射)
参数:
file_list: 文件路径列表
column_mapping: 列名映射字典,格式:{标准列名: [该标准列对应的所有可能原始列名]}
sheet_name: 工作表名称
"""
all_data = []
for filepath in file_list:
df = pd.read_excel(filepath, sheet_name=sheet_name)
original_cols = df.columns.tolist()
# 反向映射:从原始列名 -> 标准列名
reverse_map = {}
for std_col, alias_list in column_mapping.items():
for alias in alias_list:
if alias in original_cols:
reverse_map[alias] = std_col
# 重命名列
df = df.rename(columns=reverse_map)
# 只保留标准列名集合中的列
std_columns = list(column_mapping.keys())
for col in std_columns:
if col not in df.columns:
df[col] = None # 缺失列填充空值
df["来源文件"] = filepath.split("\\")[-1]
all_data.append(df[std_columns + ["来源文件"]])
result = pd.concat(all_data, ignore_index=True)
return result
# 使用示例:列名映射配置
mapping = {
"日期": ["日期", "交易日期", "Date"],
"产品名称": ["产品", "商品名称", "Product"],
"销售额": ["金额", "销售金额", "成交额", "Amount"],
"数量": ["数量", "销量", "Qty", "Quantity"],
"地区": ["地区", "区域", "销售区域", "Region"],
"销售员": ["销售员", "业务员", "Sales"]
}
files = glob.glob("销售数据/*.xlsx")
merged = merge_diff_structure(files, mapping)
print(merged.head())
数据类型兼容也是异结构合并中需要特别关注的方面。不同文件中同一列的数据类型可能不同:一个文件中"销售额"是浮点数,另一个文件中可能是字符串(含逗号或货币符号);一个文件中"日期"是datetime类型,另一个可能是字符串类型。在合并前统一数据类型可以避免许多隐蔽的错误。
def unify_dtypes(df, type_config):
"""统一DataFrame中各列的数据类型
type_config示例:
{
"销售额": "float",
"日期": "datetime",
"数量": "int"
}
"""
for col, dtype in type_config.items():
if col not in df.columns:
continue
if dtype == "float":
# 去除货币符号和逗号,再转为浮点数
df[col] = df[col].astype(str).str.replace(r"[¥$,]", "", regex=True)
df[col] = pd.to_numeric(df[col], errors="coerce")
elif dtype == "datetime":
df[col] = pd.to_datetime(df[col], errors="coerce")
elif dtype == "int":
# 先转浮点再转整型,避免空值转换错误
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int)
return df
最佳实践:建议在异结构合并项目中维护一个"数据字典"文件(JSON或YAML格式),其中记录所有标准列名、可能出现的别名、期望的数据类型和默认值。这样当新增数据源时,只需更新配置文件,不需要修改核心代码。
四、Excel数据拆分
数据拆分是合并的逆操作,也是日常办公中的高频需求。常见的拆分场景包括:按列值拆分(如将全国销售数据按省份拆分为多个文件)、按行数均分(如将10万行数据拆分为每1万行一个文件)、按条件分组拆分(如将订单数据分为已付款/未付款两组)。Python的pandas库和openpyxl库可以灵活应对这些场景。
按列值拆分是最常用的拆分方式。例如,有一份全国客户数据表,需要按"省份"列将数据拆分为各省的独立文件。使用pandas的groupby函数可以方便地实现按列分组,然后遍历每个分组写入独立的Excel文件。在拆分时还可以控制每个文件的列范围、排序规则、是否包含表头等细节参数。
import pandas as pd
from pathlib import Path
def split_by_column_value(filepath, split_col, output_dir="output"):
"""
按指定列的值拆分Excel文件为多个文件
参数:
filepath: 源文件路径
split_col: 用于拆分的列名
output_dir: 输出目录
"""
df = pd.read_excel(filepath)
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 遍历每个分组
for value, group in df.groupby(split_col):
# 生成文件名:用拆分列的值命名
filename = f"{value}.xlsx"
filepath_out = Path(output_dir) / filename
# 去除拆分列中可能导致文件名非法字符
safe_name = "".join(c for c in str(value) if c.isalnum() or c in ("_", "-"))
filepath_out = Path(output_dir) / f"{safe_name}.xlsx"
group.to_excel(filepath_out, index=False)
print(f"已创建: {filepath_out} ({len(group)} 行)")
print(f"\n拆分完成!共 {df[split_col].nunique()} 个文件")
# 使用示例:按省份拆分
split_by_column_value("全国客户数据.xlsx", split_col="省份")
按行数均分常用于处理超大Excel文件的场景。当单个Excel文件的行数超过Excel的行数上限(约104万行)或者超出打开软件的处理能力时,需要将文件拆分为多个较小的文件。按行数均分不依赖数据的语义,只是机械地将数据切分成等长的片段,逻辑简单但非常实用。
def split_by_row_count(filepath, rows_per_file=50000):
"""将Excel文件按指定行数拆分为多个小文件"""
df = pd.read_excel(filepath)
total_rows = len(df)
file_count = (total_rows + rows_per_file - 1) // rows_per_file
print(f"总行数: {total_rows},每个文件 {rows_per_file} 行,共 {file_count} 个文件")
for i in range(file_count):
start = i * rows_per_file
end = min((i + 1) * rows_per_file, total_rows)
chunk = df.iloc[start:end]
filename = f"split_part_{i+1:03d}.xlsx"
chunk.to_excel(filename, index=False)
print(f"已创建: {filename} ({len(chunk)} 行)")
# 按条件分组拆分(例如按数量区间)
def split_by_condition(filepath, conditions):
"""
按自定义条件拆分
conditions示例:
{
"大客户": df[df["金额"] > 100000],
"中客户": df[(df["金额"] >= 10000) & (df["金额"] <= 100000)],
"小客户": df[df["金额"] < 10000]
}
"""
df = pd.read_excel(filepath)
for name, mask in conditions.items():
subset = df[mask].copy()
if len(subset) > 0:
subset.to_excel(f"{name}.xlsx", index=False)
print(f"已创建: {name}.xlsx ({len(subset)} 行)")
除了拆分为多个文件,有时还需要将数据拆分为同一个Excel文件中的多个工作表(Sheet)。这种方式的优点是所有数据集中在同一个文件中,便于分发和管理。使用pandas的ExcelWriter可以方便地实现多Sheet写入。
def split_to_multi_sheets(filepath, split_col, output_file="split_result.xlsx"):
"""按列值拆分到同一个文件的不同Sheet"""
df = pd.read_excel(filepath)
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
for value, group in df.groupby(split_col):
# Sheet名最多31个字符,超出会报错
sheet_name = str(value)[:31]
group.to_excel(writer, sheet_name=sheet_name, index=False)
print(f"已写入Sheet: {sheet_name} ({len(group)} 行)")
print(f"\n已保存到文件: {output_file}")
注意:Excel的Sheet名称最长31个字符,且不能包含\/?*[]等特殊字符。当按列值拆分为Sheet时,务必对Sheet名称做截断和消毒处理,否则openpyxl会抛出异常。同时,如果分组数量过多(超过Excel允许的Sheet上限),建议改用多文件方式拆分。
五、数据比对基础
数据比对是Excel处理的另一大核心需求。无论是两版数据之间的差异检查、库存盘点前后的数据核对,还是系统导出数据与手工台账的比对验证,都需要高效、可靠的数据比对方案。手工比对不仅效率低下,而且容易遗漏差异,特别是在数据量较大的情况下。
行级逐字段比对是最基本也是最精确的比对方式。它的核心思路是:将两个DataFrame按照主键对齐,然后逐行、逐列比较对应单元格的值是否一致。pandas的compare函数提供了内置的比对能力,可以快速找出两个DataFrame之间的差异,并以结构化的方式呈现。对于比对结果,可以进一步生成差异报告,包括差异总数、差异分布、差异明细等统计信息。
import pandas as pd
import numpy as np
def basic_row_compare(df_old, df_new, key_cols):
"""
行级逐字段比对两个DataFrame
参数:
df_old: 旧数据
df_new: 新数据
key_cols: 主键列名列表(用于对齐数据行)
返回:差异摘要和差异明细
"""
# 按主键对齐
merged = df_old.merge(
df_new,
on=key_cols,
how="outer",
suffixes=("_旧", "_新"),
indicator=True
)
# 差异分类统计
stats = merged["_merge"].value_counts()
print("=== 差异统计 ===")
print(f"两表一致: {stats.get('both', 0)} 行")
print(f"仅在旧表: {stats.get('left_only', 0)} 行")
print("(可能为已删除数据)")
print(f"仅在新表: {stats.get('right_only', 0)} 行")
print("(可能为新增数据)")
# 找出两表都有但数值不同的行
both = merged[merged["_merge"] == "both"]
value_cols_old = [c for c in both.columns if c.endswith("_旧")]
value_cols_new = [c for c in both.columns if c.endswith("_新")]
diff_count = 0
for old_col, new_col in zip(value_cols_old, value_cols_new):
# 处理NaN的比对
mismatches = ~(both[old_col].fillna("__NULL__") == both[new_col].fillna("__NULL__"))
col_diff = mismatches.sum()
if col_diff > 0:
print(f"列 '{old_col.replace('_旧', '')}' 差异: {col_diff} 处")
diff_count += col_diff
print(f"\n总计差异: {diff_count} 处")
return merged
Key值关联比对是行级比对的进阶版本,不仅比较两表共有的行,还处理"仅在旧表"和"仅在新表"的情况。这种比对方式更全面地反映了数据的变化全貌,适合数据同步校验和版本变更审计等场景。
def compare_with_report(df_old, df_new, key_cols, output_file="差异报告.xlsx"):
"""带主键关联的完整比对,输出差异报告到Excel"""
# 确保Key列存在
for col in key_cols:
assert col in df_old.columns and col in df_new.columns
# 设置主键索引
old_idx = df_old.set_index(key_cols)
new_idx = df_new.set_index(key_cols)
# 找出变化行
diff_mask = old_idx.fillna("__NULL__") != new_idx.fillna("__NULL__")
# 只有新旧都有的行才比较值
common_keys = old_idx.index.intersection(new_idx.index)
diffs = []
for key in common_keys:
old_row = old_idx.loc[key]
new_row = new_idx.loc[key]
for col in old_row.index:
val_old = old_row[col]
val_new = new_row[col]
if str(val_old) != str(val_new):
diffs.append({
"主键": key,
"字段": col,
"旧值": val_old,
"新值": val_new
})
# 输出差异报告
diff_df = pd.DataFrame(diffs)
with pd.ExcelWriter(output_file) as writer:
diff_df.to_excel(writer, sheet_name="差异明细", index=False)
print(f"差异报告已保存: {output_file}")
return diff_df
比对原则:数据比对应遵循"从粗到精"的策略——先比对行数级(总量是否一致),再比对汇总级(求和、均值是否一致),最后比对行级(具体哪些行、哪些列有差异)。这种分层比对可以快速定位问题范围,避免一开始就陷入海量的细节比对中。
六、高级比对功能
在实际业务中,数据比对的需求往往比简单的"相等/不相等"二元判断复杂得多。例如,两份数据中同一款产品的名称可能略有差异("iPhone 15" vs "iPhone15");月度数据的变化追踪需要比对连续12个月的版本;或者需要在比对完成后自动生成格式化的差异报告。这些高级比对场景需要更灵活的算法和更复杂的流程设计。
模糊匹配比对是处理"近似相同但不等"场景的核心技术。其原理是使用字符串相似度算法(如Levenshtein距离、余弦相似度、Jaccard系数等)来判断两个字符串的匹配程度。在Python中,fuzzywuzzy库(基于difflib)和rapidfuzz库都提供了高效的模糊匹配功能。当匹配度超过设定的阈值(如85%)时,视为匹配成功。
from rapidfuzz import fuzz
def fuzzy_compare(df_old, df_new, key_col, threshold=85):
"""
基于模糊匹配的数据比对
参数:
threshold: 匹配阈值(0-100),越高要求越严格
"""
old_keys = df_old[key_col].dropna().unique()
new_keys = df_new[key_col].dropna().unique()
matches = []
unmatched_old = []
unmatched_new = list(new_keys)
for old_key in old_keys:
best_score = 0
best_match = None
for new_key in unmatched_new:
score = fuzz.token_sort_ratio(str(old_key), str(new_key))
if score > best_score:
best_score = score
best_match = new_key
if best_score >= threshold:
matches.append({
"旧值": old_key,
"新值": best_match,
"匹配度": best_score
})
unmatched_new.remove(best_match)
else:
unmatched_old.append(old_key)
print(f"完美匹配: {len(matches)}")
print(f"旧表未匹配: {len(unmatched_old)}")
print(f"新表未匹配: {len(unmatched_new)}")
return {"matches": matches, "unmatched_old": unmatched_old,
"unmatched_new": unmatched_new}
多版本变化追踪适用于需要持续监控数据变化的场景。例如,每月的库存数据变化追踪、财务数据的审计轨迹等。实现方式是将多个时间版本的数据进行两两比对,记录每一次的变化状态(新增、删除、修改、无变化),最终生成一个完整的变化历史。通过pivot_table可以将变化历史汇总为直观的"快照"视图。
def track_multi_version_changes(version_files, key_cols, value_cols):
"""
多版本数据变化追踪
参数:
version_files: 按时间顺序的文件列表,如["v1.xlsx", "v2.xlsx", "v3.xlsx"]
key_cols: 主键列
value_cols: 需要追踪的数值列
返回:所有版本的变化历史
"""
versions = {}
for i, f in enumerate(version_files):
versions[i] = pd.read_excel(f).set_index(key_cols)
records = []
for col in value_cols:
all_keys = set()
for df in versions.values():
all_keys.update(df.index)
for key in all_keys:
values = []
for i in range(len(versions)):
val = versions[i].loc[key, col] if key in versions[i].index else None
values.append(val)
# 判断变化类型
non_null = [v for v in values if v is not None]
if len(non_null) == 1 and non_null[0] is not None:
change_type = "新增"
elif len(set(non_null)) == 1:
change_type = "无变化"
else:
change_type = "修改"
records.append({
"主键": key,
"字段": col,
"变化类型": change_type,
"版本价值": values
})
return pd.DataFrame(records)
增量更新和比对报告生成是高级比对功能的最后环节。增量更新是指在比对完成后,只将有差异的数据输出到结果文件中,而不是输出全部数据。比对报告通常以HTML或Excel格式输出,包含差异统计摘要、差异明细表、差异分布图表等。使用pandas的Styler功能可以在Excel中实现差异高亮(绿色标注新增行、红色标注删除行、黄色标注修改行)。
def generate_diff_report(df_old, df_new, key_cols, output_html=
"差异报告.html"):
"""生成HTML格式的差异比对报告,包含高亮显示"""
# 比对并生成差异标记
old_set = df_old.set_index(key_cols)
new_set = df_new.set_index(key_cols)
html_parts = [
"""
"""]
# 统计摘要
all_keys = old_set.index.union(new_set.index)
added = new_set.index.difference(old_set.index)
deleted = old_set.index.difference(new_set.index)
common = old_set.index.intersection(new_set.index)
html_parts.append(
f"""
差异报告摘要
总记录数(旧): {len(old_set)}
总记录数(新): {len(new_set)}
新增: {len(added)}
删除: {len(deleted)}
修改: 待逐行比对
""")
# ... 逐行比对并生成HTML表格 ...
html_parts.append(
"")
with open(output_html,
"w", encoding=
"utf-8")
as f:
f.write(
"\n".join(html_parts))
print(
f"差异报告已生成: {output_html}")
进阶技巧:在大规模数据比对中,建议采用"分桶"策略——先将数据按某个维度(如日期、区域)分成若干桶,再对每个桶单独进行比对。这种策略一方面可以减少单次比对的内存消耗,另一方面便于并行处理和逐步排查差异。
七、大数据量处理
当Excel文件的规模达到百万行级别时,直接使用pandas的read_excel一次性读入内存可能会导致内存不足(MemoryError)。大数据量处理需要采用特殊的策略和技巧。核心思路包括:分块读取(Chunking)、数据类型优化、内存监控、以及断点续传机制。
分块读取是处理大文件的首要策略。虽然pandas的read_excel不直接支持chunksize参数(read_csv支持),但可以通过openpyxl的只读模式(read_only=True)逐行读取数据,再分批转换为DataFrame。另外,也可以先使用Python原生的csv模块或迭代器模式逐行处理数据,避免一次性加载整个文件到内存中。
import openpyxl
import pandas as pd
from itertools import islice
def read_excel_in_chunks(filepath, sheet_name=None, chunk_size=10000):
"""
分块读取Excel大文件(生成器模式)
每次返回一个chunk_size行的DataFrame
"""
wb = openpyxl.load_workbook(filepath, read_only=True)
if sheet_name is None:
sheet = wb.active
else:
sheet = wb[sheet_name]
# 读取表头
rows_iter = sheet.iter_rows(values_only=True)
headers = next(rows_iter)
while True:
chunk = list(islice(rows_iter, chunk_size))
if not chunk:
break
df = pd.DataFrame(chunk, columns=headers)
yield df
wb.close()
# 使用示例:逐块处理大文件
total_rows = 0
for chunk_df in read_excel_in_chunks("超大文件.xlsx", chunk_size=50000):
total_rows += len(chunk_df)
# 对chunk进行处理...
print(f"已处理 {total_rows} 行...")
内存优化是处理大数据量的另一个关键方面。pandas默认的数据类型(如int64、float64)占用较多内存。通过将列的数据类型降级为更经济的类型(如int32、float32,甚至categorical类型),可以显著降低内存占用。此外,及时删除不再使用的中间变量、使用del语句释放内存、以及调用gc.collect()强制垃圾回收,都是有效的内存管理手段。
import pandas as pd
import numpy as np
import gc
def optimize_dtypes(df):
"""智能优化DataFrame的数据类型以节省内存"""
start_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"优化前内存: {start_mem:.2f} MB")
for col in df.columns:
col_type = df[col].dtype
if col_type != "object":
c_min, c_max = df[col].min(), df[col].max()
if "int" in str(col_type):
if c_min >= 0:
if c_max < 2**8:
df[col] = df[col].astype(np.uint8)
elif c_max < 2**16:
df[col] = df[col].astype(np.uint16)
elif c_max < 2**32:
df[col] = df[col].astype(np.uint32)
else:
if c_min > -2**7 and c_max < 2**7:
df[col] = df[col].astype(np.int8)
elif c_min > -2**15 and c_max < 2**15:
df[col] = df[col].astype(np.int16)
elif c_min > -2**31 and c_max < 2**31:
df[col] = df[col].astype(np.int32)
elif "float" in str(col_type):
df[col] = df[col].astype(np.float32)
else:
# 对象类型:如果唯一值少,转为category
num_unique = df[col].nunique()
if num_unique < len(df) * 0.5:
df[col] = df[col].astype("category")
end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"优化后内存: {end_mem:.2f} MB")
print(f"内存节省: {(start_mem - end_mem) / start_mem * 100:.1f}%")
return df
# 及时清理内存
del df
gc.collect()
进度显示和断点续传是提升大数据量处理体验的重要功能。使用tqdm库可以显示处理进度条;通过定期记录已处理的行数或文件索引,可以在程序中断后从断点处继续处理,避免从头开始。
from tqdm import tqdm
import json
def process_with_resume(file_list, checkpoint_file="checkpoint.json"):
"""带断点续传功能的批量处理"""
# 读取断点
start_index = 0
try:
with open(checkpoint_file, "r") as f:
checkpoint = json.load(f)
start_index = checkpoint.get("last_index", 0)
print(f"从断点恢复: 已处理到第 {start_index} 个文件")
except FileNotFoundError:
print("未发现断点,从头开始处理")
# 带进度条的处理循环
for i in tqdm(range(start_index, len(file_list)), desc="处理进度"):
try:
# 模拟文件处理
df = pd.read_excel(file_list[i])
# ... 处理逻辑 ...
# 保存断点
with open(checkpoint_file, "w") as f:
json.dump({"last_index": i + 1}, f)
except Exception as e:
print(f"处理文件 {file_list[i]} 时出错: {e}")
continue
print("全部处理完成!")
性能经验:在实践中有三条黄金法则——(1)能用CSV就不要用Excel:相同数据量下CSV读写速度是Excel的5-10倍;(2)能用筛选就不要全表读:通过openpyxl的iter_rows只读需要的行列,避免加载全部数据;(3)能用SQL就不要用Excel:数据量超过100万行时,建议先导入SQLite或数据库再处理。
八、实战案例
理论方法需要通过实战案例来巩固。本节提供三个典型的企业级应用案例,涵盖合并、拆分、比对三大核心功能,这些案例均来自真实业务场景,可直接复用或根据实际需求调整。
案例一:月度销售报表合并
某连锁企业每月从全国30个门店收集销售报表,每个门店提交的Excel文件包含"销售明细"工作表,列结构一致(交易日期、门店编号、商品编码、商品名称、数量、单价、金额、收银员)。月末需要将所有门店的报表合并为一个总表,用于管理层数据分析。
from pathlib import Path
import pandas as pd
import glob
from datetime import datetime
def monthly_sales_merge(report_dir, month_str):
"""
合并指定月份的销售报表
参数:
report_dir: 报表目录(包含各门店子目录)
month_str: 月份字符串,如 "202603"
"""
pattern = f"{report_dir}/{month_str}/*/*.xlsx"
files = glob.glob(pattern)
print(f"找到 {len(files)} 个报表文件")
all_data = []
for f in files:
df = pd.read_excel(f, sheet_name="销售明细")
# 数据清洗
df = df.dropna(subset=["交易日期", "商品编码"])
df["金额"] = pd.to_numeric(df["金额"], errors="coerce").fillna(0)
# 标注来源
store_name = Path(f).parent.name
df["门店"] = store_name
all_data.append(df)
# 合并
result = pd.concat(all_data, ignore_index=True)
# 输出汇总统计
summary = result.groupby("门店").agg(
总销售额=("金额", "sum"),
总笔数=("交易日期", "count")
).reset_index()
# 保存
output_file = f"销售报表汇总_{month_str}.xlsx"
with pd.ExcelWriter(output_file) as writer:
result.to_excel(writer, sheet_name="合并明细", index=False)
summary.to_excel(writer, sheet_name="门店汇总", index=False)
print(f"合并完成!总销售额: {summary['总销售额'].sum():.2f} 元")
print(f"总计交易笔数: {summary['总笔数'].sum()}")
return result
# 运行
merged = monthly_sales_merge("sales_reports", "202603")
案例二:按地区拆分客户数据
某公司总部的CRM系统导出了一份全国客户数据表(10万行),需要按"省份"列拆分为各个地区的客户名单,分发给各区域销售团队。每个区域的文件需要按客户名称排序,并包含一个统计Sheet说明该区域的客户分布情况。
def split_customers_by_region(source_file, region_col="省份"):
"""将客户数据按省份拆分,每个文件含统计信息"""
df = pd.read_excel(source_file)
# 按省份分组
for region, group in df.groupby(region_col):
group = group.sort_values("客户名称")
# 城市级别统计
city_stats = group["城市"].value_counts().reset_index()
city_stats.columns = ["城市", "客户数"]
filename = f"客户数据_{region}.xlsx"
with pd.ExcelWriter(filename) as writer:
group.to_excel(writer, sheet_name="客户名单", index=False)
city_stats.to_excel(writer, sheet_name="城市分布", index=False)
# 写入统计摘要
summary = pd.DataFrame([
["省份", region],
["客户总数", len(group)],
["覆盖城市", group["城市"].nunique()],
], columns=["项目", "值"])
summary.to_excel(writer, sheet_name="摘要", index=False)
print(f"已生成: {filename} ({len(group)} 条)")
print(f"拆分完成!共 {df[region_col].nunique()} 个区域文件")
split_customers_by_region("全国客户数据.xlsx")
案例三:库存盘点前后数据比对
某仓库每月进行库存盘点。有两个Excel文件:"系统库存.xlsx"(ERP系统导出)和"盘点库存.xlsx"(仓库人员实地盘点结果)。需要通过比对找出差异商品,并生成差异报告,包含差异金额汇总和重点差异商品标记。
def inventory_compare(system_file, physical_file, output_file="库存差异报告.xlsx"):
"""库存盘点数据比对,生成差异报告"""
sys_df = pd.read_excel(system_file)
phy_df = pd.read_excel(physical_file)
# 确保关键列存在
required_cols = ["商品编码", "商品名称", "库存数量", "单价"]
for col in required_cols:
assert col in sys_df.columns and col in phy_df.columns
# 按主键对齐
merged = sys_df.merge(
phy_df,
on=["商品编码", "商品名称"],
how="outer",
suffixes=("_系统", "_盘点"),
indicator=True
)
# 计算差异
merged["数量差异"] = merged["库存数量_盘点"].fillna(0) - merged["库存数量_系统"].fillna(0)
merged["金额差异"] = merged["数量差异"] * merged["单价_系统"].fillna(merged["单价_盘点"])
# 标记差异级别
def mark_level(qty_diff, amt_diff):
if abs(amt_diff) >= 10000:
return "重大"
elif abs(qty_diff) >= 10:
return "中等"
elif qty_diff != 0:
return "一般"
else:
return "一致"
merged["差异级别"] = merged.apply(
lambda row: mark_level(row["数量差异"], row["金额差异"]), axis=1
)
# 筛选出有差异的
diff_only = merged[merged["差异级别"] != "一致"].copy()
# 汇总统计
summary = pd.DataFrame([
["系统记录数", len(sys_df)],
["盘点记录数", len(phy_df)],
["差异商品数", len(diff_only)],
["差异总金额", diff_only["金额差异"].sum()],
], columns=["统计项", "值"])
# 输出报告
with pd.ExcelWriter(output_file) as writer:
summary.to_excel(writer, sheet_name="汇总", index=False)
diff_only.to_excel(writer, sheet_name="差异明细", index=False)
print(f"库存差异报告已生成: {output_file}")
print(f"差异商品数: {len(diff_only)},差异总金额: {diff_only['金额差异'].sum():.2f} 元")
return diff_only
inventory_compare("系统库存.xlsx", "盘点库存.xlsx")
学以致用:以上三个案例代表了Excel批量处理的三大基本场景——合并(聚)、拆分(散)、比对(校)。在实际工作中,这三个功能常常组合使用。例如,先合并各门店报表,再按产品线拆分,最后比对两个月的差异。掌握这三项核心技能,可以覆盖日常办公中90%以上的Excel自动化需求。