Excel批量合并、拆分与数据比对

Python 办公自动化专题 · 高效处理大量Excel文件的自动化方案

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, Excel批量合并, Excel拆分, 数据比对, pandas, openpyxl, 文件自动化, Python

一、批量处理概述

在日常办公中,Excel文件的批量处理是最常见也是最耗时的任务之一。无论是财务部门的月度报表合并、销售团队的客户数据拆分,还是质量部门的检测数据比对,Excel批量处理都能大幅提升工作效率。本专题将系统性地介绍如何使用Python实现Excel文件的批量合并、拆分与数据比对,帮助读者掌握自动化处理的完整技能体系。

批量处理Excel文件的核心应用场景非常广泛。在财务报表领域,每月需要汇总各分公司提交的利润表、资产负债表;在销售管理中,需要按区域、按产品线拆分总部的销售数据;在库存管理环节,需要比对月初和月末的库存盘点数据,找出差异商品。这些场景如果完全依赖手动操作,不仅耗时而且极易出错。

技术选型对比

在Python生态中,处理Excel文件主要有三个主流库:pandas、openpyxl和xlwings。pandas是数据分析的核心库,擅长处理结构化数据,支持批量读写Excel文件,内置强大的数据聚合和分组功能,适合大规模数据处理,但不擅长样式控制和单元格级别的精细操作。openpyxl专注于Excel文件的读写,支持xlsx格式,能够精细控制单元格样式、颜色、合并单元格等,适合需要对Excel格式进行精确控制的场景,但处理速度在大文件时偏慢。xlwings则提供了与Excel应用程序的接口,可以调用Excel自身的功能,适合需要Excel引擎计算能力的场景,但依赖Excel安装环境。

选型建议:纯数据处理首选pandas(简洁高效);需要保留或设置格式选openpyxl(精细控制);需要Excel公式计算或VBA交互选xlwings(功能完整)。实际项目中常将pandas和openpyxl结合使用。

文件遍历模式

批量处理的第一步是遍历目标目录下的所有Excel文件。Python的glob模块和os模块提供了便捷的文件遍历能力。glob支持通配符模式匹配,可以快速筛选出指定扩展名的文件列表。os模块则提供了更底层的目录遍历和路径操作功能。此外,pathlib作为面向对象的路径处理库,在代码可读性方面更具优势。

import glob import os import pandas as pd from pathlib import Path # 方式一:glob通配符匹配 files = glob.glob("data/*.xlsx") print(f"找到 {len(files)} 个Excel文件") # 方式二:pathlib面向对象方式 data_dir = Path("data") files = list(data_dir.glob("*.xlsx")) # 方式三:递归搜索子目录(包含.xls和.xlsx) all_files = [] for root, dirs, fnames in os.walk("data"): for f in fnames: if f.endswith((".xlsx", ".xls")): all_files.append(os.path.join(root, f))

在获取到文件列表后,建议先对文件进行校验和过滤。例如检查文件是否为有效的Excel文件、是否被加密保护、工作表名称是否一致等。这一步骤虽然看似简单,但在实际批量处理中常常是各种异常的来源。

import openpyxl def validate_excel(filepath): """验证Excel文件是否可读取""" try: wb = openpyxl.load_workbook(filepath, read_only=True) sheets = wb.sheetnames wb.close() return {"valid": True, "sheets": sheets} except Exception as e: return {"valid": False, "error": str(e)} # 批量验证文件 valid_files = [] for f in files: result = validate_excel(f) if result["valid"]: valid_files.append(f) print(f"[OK] {f}") else: print(f"[FAIL] {f}: {result['error']}")

经验之谈:在实际项目中,建议在批量处理前先进行"干运行"——只遍历和验证文件,不执行实际的数据读写操作。这可以帮助提前发现文件路径错误、权限问题、格式不兼容等异常,避免处理到一半时程序崩溃。

二、同结构Excel合并

同结构合并是最常见的Excel合并场景。所谓同结构,是指多个Excel文件的工作表名称相同、列结构(列名和列顺序)一致。例如,各分公司每月提交的销售报表,表头结构完全相同,只是数据内容不同。这种情况下,合并操作的核心逻辑就是"纵向堆叠"——将所有文件的数据逐行追加到一个统一的数据框中。

pandas库的concat函数是实现同结构合并的最佳工具。它能够将多个DataFrame纵向拼接,并自动处理索引。在实际操作中,需要注意几个关键问题:列名中的前后空格可能导致匹配失败,不同文件中列的先后顺序可能意外不一致,数据类型的隐式转换可能导致精度丢失等。因此在合并前需要对列名进行标准化处理。

import pandas as pd import glob def merge_same_structure(file_list, sheet_name="Sheet1"): """ 合并同结构Excel文件 参数: file_list: 文件路径列表 sheet_name: 要合并的工作表名称 返回:合并后的DataFrame """ df_list = [] for filepath in file_list: # 读取单个文件 df = pd.read_excel(filepath, sheet_name=sheet_name) # 来源标注 df["来源文件"] = filepath.split("\\")[-1] df_list.append(df) # 纵向合并所有DataFrame result = pd.concat(df_list, ignore_index=True) return result # 使用示例 files = glob.glob("销售报表/*.xlsx") merged_df = merge_same_structure(files, sheet_name="月度数据") print(merged_df.head()) print(f"合并后总行数: {len(merged_df)}")

合并后的数据验证同样重要。验证内容通常包括:总行数是否等于各文件行数之和、关键列是否存在空值、数值列的汇总值是否与各文件加总一致、是否有重复的主键等。通过系统化的验证流程可以确保合并结果的准确性。

def validate_merged_data(merged_df, file_list): """验证合并后的数据完整性""" print("="*50) print("数据验证报告") print("="*50) # 1. 基本统计 total_rows = len(merged_df) print(f"合并后总行数: {total_rows}") # 2. 各来源文件行数分布 if "来源文件" in merged_df.columns: counts = merged_df["来源文件"].value_counts() print("\n各文件数据分布:") for file_name, count in counts.items(): print(f" {file_name}: {count} 行") # 3. 检查关键列空值 null_counts = merged_df.isnull().sum() null_cols = null_counts[null_counts > 0] if len(null_cols) > 0: print("\n警告:以下列存在空值:") for col, cnt in null_cols.items(): print(f" {col}: {cnt} 个空值") # 4. 重复行检查 dup_count = merged_df.duplicated().sum() if dup_count > 0: print(f"\n警告:发现 {dup_count} 行完全重复数据") print("\n验证完成!")

关键提示:同结构合并虽然逻辑简单,但实际工作中经常遇到"伪同结构"文件——表头看起来相同但实际存在细微差异(如全角/半角符号、空格、大小写等)。建议在合并前先对列名进行统一处理(strip、replace、lower等操作),或者建立列名映射表。

三、异结构Excel合并

异结构合并是比同结构合并更复杂的场景。所谓异结构,是指不同Excel文件中的工作表列名不完全一致,可能列数不同、列名不同、数据类型不同。例如,合并不同年份的销售报表时,2023年的报表可能有"销售区域"列而2024年改名为"地区",或者新增了"渠道来源"列。处理这类问题的核心思路是"列名映射与统一"。

列名映射是异结构合并的第一步。我们需要建立一个映射字典,将不同文件中的列名统一映射到标准列名。例如,{"销售区域": "地区", "区域": "地区", "Region": "地区"}表示这三个不同的列名都映射到标准列名"地区"。这种方式能够灵活处理各种命名差异。在处理完列名映射后,不同文件会拥有统一的标准列名体系,此时就可以使用concat进行合并了。

def merge_diff_structure(file_list, column_mapping, sheet_name="Sheet1"): """ 合并异结构Excel文件(通过列名映射) 参数: file_list: 文件路径列表 column_mapping: 列名映射字典,格式:{标准列名: [该标准列对应的所有可能原始列名]} sheet_name: 工作表名称 """ all_data = [] for filepath in file_list: df = pd.read_excel(filepath, sheet_name=sheet_name) original_cols = df.columns.tolist() # 反向映射:从原始列名 -> 标准列名 reverse_map = {} for std_col, alias_list in column_mapping.items(): for alias in alias_list: if alias in original_cols: reverse_map[alias] = std_col # 重命名列 df = df.rename(columns=reverse_map) # 只保留标准列名集合中的列 std_columns = list(column_mapping.keys()) for col in std_columns: if col not in df.columns: df[col] = None # 缺失列填充空值 df["来源文件"] = filepath.split("\\")[-1] all_data.append(df[std_columns + ["来源文件"]]) result = pd.concat(all_data, ignore_index=True) return result # 使用示例:列名映射配置 mapping = { "日期": ["日期", "交易日期", "Date"], "产品名称": ["产品", "商品名称", "Product"], "销售额": ["金额", "销售金额", "成交额", "Amount"], "数量": ["数量", "销量", "Qty", "Quantity"], "地区": ["地区", "区域", "销售区域", "Region"], "销售员": ["销售员", "业务员", "Sales"] } files = glob.glob("销售数据/*.xlsx") merged = merge_diff_structure(files, mapping) print(merged.head())

数据类型兼容也是异结构合并中需要特别关注的方面。不同文件中同一列的数据类型可能不同:一个文件中"销售额"是浮点数,另一个文件中可能是字符串(含逗号或货币符号);一个文件中"日期"是datetime类型,另一个可能是字符串类型。在合并前统一数据类型可以避免许多隐蔽的错误。

def unify_dtypes(df, type_config): """统一DataFrame中各列的数据类型 type_config示例: { "销售额": "float", "日期": "datetime", "数量": "int" } """ for col, dtype in type_config.items(): if col not in df.columns: continue if dtype == "float": # 去除货币符号和逗号,再转为浮点数 df[col] = df[col].astype(str).str.replace(r"[¥$,]", "", regex=True) df[col] = pd.to_numeric(df[col], errors="coerce") elif dtype == "datetime": df[col] = pd.to_datetime(df[col], errors="coerce") elif dtype == "int": # 先转浮点再转整型,避免空值转换错误 df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0).astype(int) return df

最佳实践:建议在异结构合并项目中维护一个"数据字典"文件(JSON或YAML格式),其中记录所有标准列名、可能出现的别名、期望的数据类型和默认值。这样当新增数据源时,只需更新配置文件,不需要修改核心代码。

四、Excel数据拆分

数据拆分是合并的逆操作,也是日常办公中的高频需求。常见的拆分场景包括:按列值拆分(如将全国销售数据按省份拆分为多个文件)、按行数均分(如将10万行数据拆分为每1万行一个文件)、按条件分组拆分(如将订单数据分为已付款/未付款两组)。Python的pandas库和openpyxl库可以灵活应对这些场景。

按列值拆分是最常用的拆分方式。例如,有一份全国客户数据表,需要按"省份"列将数据拆分为各省的独立文件。使用pandas的groupby函数可以方便地实现按列分组,然后遍历每个分组写入独立的Excel文件。在拆分时还可以控制每个文件的列范围、排序规则、是否包含表头等细节参数。

import pandas as pd from pathlib import Path def split_by_column_value(filepath, split_col, output_dir="output"): """ 按指定列的值拆分Excel文件为多个文件 参数: filepath: 源文件路径 split_col: 用于拆分的列名 output_dir: 输出目录 """ df = pd.read_excel(filepath) Path(output_dir).mkdir(parents=True, exist_ok=True) # 遍历每个分组 for value, group in df.groupby(split_col): # 生成文件名:用拆分列的值命名 filename = f"{value}.xlsx" filepath_out = Path(output_dir) / filename # 去除拆分列中可能导致文件名非法字符 safe_name = "".join(c for c in str(value) if c.isalnum() or c in ("_", "-")) filepath_out = Path(output_dir) / f"{safe_name}.xlsx" group.to_excel(filepath_out, index=False) print(f"已创建: {filepath_out} ({len(group)} 行)") print(f"\n拆分完成!共 {df[split_col].nunique()} 个文件") # 使用示例:按省份拆分 split_by_column_value("全国客户数据.xlsx", split_col="省份")

按行数均分常用于处理超大Excel文件的场景。当单个Excel文件的行数超过Excel的行数上限(约104万行)或者超出打开软件的处理能力时,需要将文件拆分为多个较小的文件。按行数均分不依赖数据的语义,只是机械地将数据切分成等长的片段,逻辑简单但非常实用。

def split_by_row_count(filepath, rows_per_file=50000): """将Excel文件按指定行数拆分为多个小文件""" df = pd.read_excel(filepath) total_rows = len(df) file_count = (total_rows + rows_per_file - 1) // rows_per_file print(f"总行数: {total_rows},每个文件 {rows_per_file} 行,共 {file_count} 个文件") for i in range(file_count): start = i * rows_per_file end = min((i + 1) * rows_per_file, total_rows) chunk = df.iloc[start:end] filename = f"split_part_{i+1:03d}.xlsx" chunk.to_excel(filename, index=False) print(f"已创建: {filename} ({len(chunk)} 行)") # 按条件分组拆分(例如按数量区间) def split_by_condition(filepath, conditions): """ 按自定义条件拆分 conditions示例: { "大客户": df[df["金额"] > 100000], "中客户": df[(df["金额"] >= 10000) & (df["金额"] <= 100000)], "小客户": df[df["金额"] < 10000] } """ df = pd.read_excel(filepath) for name, mask in conditions.items(): subset = df[mask].copy() if len(subset) > 0: subset.to_excel(f"{name}.xlsx", index=False) print(f"已创建: {name}.xlsx ({len(subset)} 行)")

除了拆分为多个文件,有时还需要将数据拆分为同一个Excel文件中的多个工作表(Sheet)。这种方式的优点是所有数据集中在同一个文件中,便于分发和管理。使用pandas的ExcelWriter可以方便地实现多Sheet写入。

def split_to_multi_sheets(filepath, split_col, output_file="split_result.xlsx"): """按列值拆分到同一个文件的不同Sheet""" df = pd.read_excel(filepath) with pd.ExcelWriter(output_file, engine="openpyxl") as writer: for value, group in df.groupby(split_col): # Sheet名最多31个字符,超出会报错 sheet_name = str(value)[:31] group.to_excel(writer, sheet_name=sheet_name, index=False) print(f"已写入Sheet: {sheet_name} ({len(group)} 行)") print(f"\n已保存到文件: {output_file}")

注意:Excel的Sheet名称最长31个字符,且不能包含\/?*[]等特殊字符。当按列值拆分为Sheet时,务必对Sheet名称做截断和消毒处理,否则openpyxl会抛出异常。同时,如果分组数量过多(超过Excel允许的Sheet上限),建议改用多文件方式拆分。

五、数据比对基础

数据比对是Excel处理的另一大核心需求。无论是两版数据之间的差异检查、库存盘点前后的数据核对,还是系统导出数据与手工台账的比对验证,都需要高效、可靠的数据比对方案。手工比对不仅效率低下,而且容易遗漏差异,特别是在数据量较大的情况下。

行级逐字段比对是最基本也是最精确的比对方式。它的核心思路是:将两个DataFrame按照主键对齐,然后逐行、逐列比较对应单元格的值是否一致。pandas的compare函数提供了内置的比对能力,可以快速找出两个DataFrame之间的差异,并以结构化的方式呈现。对于比对结果,可以进一步生成差异报告,包括差异总数、差异分布、差异明细等统计信息。

import pandas as pd import numpy as np def basic_row_compare(df_old, df_new, key_cols): """ 行级逐字段比对两个DataFrame 参数: df_old: 旧数据 df_new: 新数据 key_cols: 主键列名列表(用于对齐数据行) 返回:差异摘要和差异明细 """ # 按主键对齐 merged = df_old.merge( df_new, on=key_cols, how="outer", suffixes=("_旧", "_新"), indicator=True ) # 差异分类统计 stats = merged["_merge"].value_counts() print("=== 差异统计 ===") print(f"两表一致: {stats.get('both', 0)} 行") print(f"仅在旧表: {stats.get('left_only', 0)} 行") print("(可能为已删除数据)") print(f"仅在新表: {stats.get('right_only', 0)} 行") print("(可能为新增数据)") # 找出两表都有但数值不同的行 both = merged[merged["_merge"] == "both"] value_cols_old = [c for c in both.columns if c.endswith("_旧")] value_cols_new = [c for c in both.columns if c.endswith("_新")] diff_count = 0 for old_col, new_col in zip(value_cols_old, value_cols_new): # 处理NaN的比对 mismatches = ~(both[old_col].fillna("__NULL__") == both[new_col].fillna("__NULL__")) col_diff = mismatches.sum() if col_diff > 0: print(f"列 '{old_col.replace('_旧', '')}' 差异: {col_diff} 处") diff_count += col_diff print(f"\n总计差异: {diff_count} 处") return merged

Key值关联比对是行级比对的进阶版本,不仅比较两表共有的行,还处理"仅在旧表"和"仅在新表"的情况。这种比对方式更全面地反映了数据的变化全貌,适合数据同步校验和版本变更审计等场景。

def compare_with_report(df_old, df_new, key_cols, output_file="差异报告.xlsx"): """带主键关联的完整比对,输出差异报告到Excel""" # 确保Key列存在 for col in key_cols: assert col in df_old.columns and col in df_new.columns # 设置主键索引 old_idx = df_old.set_index(key_cols) new_idx = df_new.set_index(key_cols) # 找出变化行 diff_mask = old_idx.fillna("__NULL__") != new_idx.fillna("__NULL__") # 只有新旧都有的行才比较值 common_keys = old_idx.index.intersection(new_idx.index) diffs = [] for key in common_keys: old_row = old_idx.loc[key] new_row = new_idx.loc[key] for col in old_row.index: val_old = old_row[col] val_new = new_row[col] if str(val_old) != str(val_new): diffs.append({ "主键": key, "字段": col, "旧值": val_old, "新值": val_new }) # 输出差异报告 diff_df = pd.DataFrame(diffs) with pd.ExcelWriter(output_file) as writer: diff_df.to_excel(writer, sheet_name="差异明细", index=False) print(f"差异报告已保存: {output_file}") return diff_df

比对原则:数据比对应遵循"从粗到精"的策略——先比对行数级(总量是否一致),再比对汇总级(求和、均值是否一致),最后比对行级(具体哪些行、哪些列有差异)。这种分层比对可以快速定位问题范围,避免一开始就陷入海量的细节比对中。

六、高级比对功能

在实际业务中,数据比对的需求往往比简单的"相等/不相等"二元判断复杂得多。例如,两份数据中同一款产品的名称可能略有差异("iPhone 15" vs "iPhone15");月度数据的变化追踪需要比对连续12个月的版本;或者需要在比对完成后自动生成格式化的差异报告。这些高级比对场景需要更灵活的算法和更复杂的流程设计。

模糊匹配比对是处理"近似相同但不等"场景的核心技术。其原理是使用字符串相似度算法(如Levenshtein距离、余弦相似度、Jaccard系数等)来判断两个字符串的匹配程度。在Python中,fuzzywuzzy库(基于difflib)和rapidfuzz库都提供了高效的模糊匹配功能。当匹配度超过设定的阈值(如85%)时,视为匹配成功。

from rapidfuzz import fuzz def fuzzy_compare(df_old, df_new, key_col, threshold=85): """ 基于模糊匹配的数据比对 参数: threshold: 匹配阈值(0-100),越高要求越严格 """ old_keys = df_old[key_col].dropna().unique() new_keys = df_new[key_col].dropna().unique() matches = [] unmatched_old = [] unmatched_new = list(new_keys) for old_key in old_keys: best_score = 0 best_match = None for new_key in unmatched_new: score = fuzz.token_sort_ratio(str(old_key), str(new_key)) if score > best_score: best_score = score best_match = new_key if best_score >= threshold: matches.append({ "旧值": old_key, "新值": best_match, "匹配度": best_score }) unmatched_new.remove(best_match) else: unmatched_old.append(old_key) print(f"完美匹配: {len(matches)}") print(f"旧表未匹配: {len(unmatched_old)}") print(f"新表未匹配: {len(unmatched_new)}") return {"matches": matches, "unmatched_old": unmatched_old, "unmatched_new": unmatched_new}

多版本变化追踪适用于需要持续监控数据变化的场景。例如,每月的库存数据变化追踪、财务数据的审计轨迹等。实现方式是将多个时间版本的数据进行两两比对,记录每一次的变化状态(新增、删除、修改、无变化),最终生成一个完整的变化历史。通过pivot_table可以将变化历史汇总为直观的"快照"视图。

def track_multi_version_changes(version_files, key_cols, value_cols): """ 多版本数据变化追踪 参数: version_files: 按时间顺序的文件列表,如["v1.xlsx", "v2.xlsx", "v3.xlsx"] key_cols: 主键列 value_cols: 需要追踪的数值列 返回:所有版本的变化历史 """ versions = {} for i, f in enumerate(version_files): versions[i] = pd.read_excel(f).set_index(key_cols) records = [] for col in value_cols: all_keys = set() for df in versions.values(): all_keys.update(df.index) for key in all_keys: values = [] for i in range(len(versions)): val = versions[i].loc[key, col] if key in versions[i].index else None values.append(val) # 判断变化类型 non_null = [v for v in values if v is not None] if len(non_null) == 1 and non_null[0] is not None: change_type = "新增" elif len(set(non_null)) == 1: change_type = "无变化" else: change_type = "修改" records.append({ "主键": key, "字段": col, "变化类型": change_type, "版本价值": values }) return pd.DataFrame(records)

增量更新和比对报告生成是高级比对功能的最后环节。增量更新是指在比对完成后,只将有差异的数据输出到结果文件中,而不是输出全部数据。比对报告通常以HTML或Excel格式输出,包含差异统计摘要、差异明细表、差异分布图表等。使用pandas的Styler功能可以在Excel中实现差异高亮(绿色标注新增行、红色标注删除行、黄色标注修改行)。

def generate_diff_report(df_old, df_new, key_cols, output_html="差异报告.html"): """生成HTML格式的差异比对报告,包含高亮显示""" # 比对并生成差异标记 old_set = df_old.set_index(key_cols) new_set = df_new.set_index(key_cols) html_parts = [""" """] # 统计摘要 all_keys = old_set.index.union(new_set.index) added = new_set.index.difference(old_set.index) deleted = old_set.index.difference(new_set.index) common = old_set.index.intersection(new_set.index) html_parts.append(f"""

差异报告摘要

总记录数(旧): {len(old_set)}

总记录数(新): {len(new_set)}

新增: {len(added)}

删除: {len(deleted)}

修改: 待逐行比对

"""
) # ... 逐行比对并生成HTML表格 ... html_parts.append("") with open(output_html, "w", encoding="utf-8") as f: f.write("\n".join(html_parts)) print(f"差异报告已生成: {output_html}")

进阶技巧:在大规模数据比对中,建议采用"分桶"策略——先将数据按某个维度(如日期、区域)分成若干桶,再对每个桶单独进行比对。这种策略一方面可以减少单次比对的内存消耗,另一方面便于并行处理和逐步排查差异。

七、大数据量处理

当Excel文件的规模达到百万行级别时,直接使用pandas的read_excel一次性读入内存可能会导致内存不足(MemoryError)。大数据量处理需要采用特殊的策略和技巧。核心思路包括:分块读取(Chunking)、数据类型优化、内存监控、以及断点续传机制。

分块读取是处理大文件的首要策略。虽然pandas的read_excel不直接支持chunksize参数(read_csv支持),但可以通过openpyxl的只读模式(read_only=True)逐行读取数据,再分批转换为DataFrame。另外,也可以先使用Python原生的csv模块或迭代器模式逐行处理数据,避免一次性加载整个文件到内存中。

import openpyxl import pandas as pd from itertools import islice def read_excel_in_chunks(filepath, sheet_name=None, chunk_size=10000): """ 分块读取Excel大文件(生成器模式) 每次返回一个chunk_size行的DataFrame """ wb = openpyxl.load_workbook(filepath, read_only=True) if sheet_name is None: sheet = wb.active else: sheet = wb[sheet_name] # 读取表头 rows_iter = sheet.iter_rows(values_only=True) headers = next(rows_iter) while True: chunk = list(islice(rows_iter, chunk_size)) if not chunk: break df = pd.DataFrame(chunk, columns=headers) yield df wb.close() # 使用示例:逐块处理大文件 total_rows = 0 for chunk_df in read_excel_in_chunks("超大文件.xlsx", chunk_size=50000): total_rows += len(chunk_df) # 对chunk进行处理... print(f"已处理 {total_rows} 行...")

内存优化是处理大数据量的另一个关键方面。pandas默认的数据类型(如int64、float64)占用较多内存。通过将列的数据类型降级为更经济的类型(如int32、float32,甚至categorical类型),可以显著降低内存占用。此外,及时删除不再使用的中间变量、使用del语句释放内存、以及调用gc.collect()强制垃圾回收,都是有效的内存管理手段。

import pandas as pd import numpy as np import gc def optimize_dtypes(df): """智能优化DataFrame的数据类型以节省内存""" start_mem = df.memory_usage(deep=True).sum() / 1024**2 print(f"优化前内存: {start_mem:.2f} MB") for col in df.columns: col_type = df[col].dtype if col_type != "object": c_min, c_max = df[col].min(), df[col].max() if "int" in str(col_type): if c_min >= 0: if c_max < 2**8: df[col] = df[col].astype(np.uint8) elif c_max < 2**16: df[col] = df[col].astype(np.uint16) elif c_max < 2**32: df[col] = df[col].astype(np.uint32) else: if c_min > -2**7 and c_max < 2**7: df[col] = df[col].astype(np.int8) elif c_min > -2**15 and c_max < 2**15: df[col] = df[col].astype(np.int16) elif c_min > -2**31 and c_max < 2**31: df[col] = df[col].astype(np.int32) elif "float" in str(col_type): df[col] = df[col].astype(np.float32) else: # 对象类型:如果唯一值少,转为category num_unique = df[col].nunique() if num_unique < len(df) * 0.5: df[col] = df[col].astype("category") end_mem = df.memory_usage(deep=True).sum() / 1024**2 print(f"优化后内存: {end_mem:.2f} MB") print(f"内存节省: {(start_mem - end_mem) / start_mem * 100:.1f}%") return df # 及时清理内存 del df gc.collect()

进度显示和断点续传是提升大数据量处理体验的重要功能。使用tqdm库可以显示处理进度条;通过定期记录已处理的行数或文件索引,可以在程序中断后从断点处继续处理,避免从头开始。

from tqdm import tqdm import json def process_with_resume(file_list, checkpoint_file="checkpoint.json"): """带断点续传功能的批量处理""" # 读取断点 start_index = 0 try: with open(checkpoint_file, "r") as f: checkpoint = json.load(f) start_index = checkpoint.get("last_index", 0) print(f"从断点恢复: 已处理到第 {start_index} 个文件") except FileNotFoundError: print("未发现断点,从头开始处理") # 带进度条的处理循环 for i in tqdm(range(start_index, len(file_list)), desc="处理进度"): try: # 模拟文件处理 df = pd.read_excel(file_list[i]) # ... 处理逻辑 ... # 保存断点 with open(checkpoint_file, "w") as f: json.dump({"last_index": i + 1}, f) except Exception as e: print(f"处理文件 {file_list[i]} 时出错: {e}") continue print("全部处理完成!")

性能经验:在实践中有三条黄金法则——(1)能用CSV就不要用Excel:相同数据量下CSV读写速度是Excel的5-10倍;(2)能用筛选就不要全表读:通过openpyxl的iter_rows只读需要的行列,避免加载全部数据;(3)能用SQL就不要用Excel:数据量超过100万行时,建议先导入SQLite或数据库再处理。

八、实战案例

理论方法需要通过实战案例来巩固。本节提供三个典型的企业级应用案例,涵盖合并、拆分、比对三大核心功能,这些案例均来自真实业务场景,可直接复用或根据实际需求调整。

案例一:月度销售报表合并

某连锁企业每月从全国30个门店收集销售报表,每个门店提交的Excel文件包含"销售明细"工作表,列结构一致(交易日期、门店编号、商品编码、商品名称、数量、单价、金额、收银员)。月末需要将所有门店的报表合并为一个总表,用于管理层数据分析。

from pathlib import Path import pandas as pd import glob from datetime import datetime def monthly_sales_merge(report_dir, month_str): """ 合并指定月份的销售报表 参数: report_dir: 报表目录(包含各门店子目录) month_str: 月份字符串,如 "202603" """ pattern = f"{report_dir}/{month_str}/*/*.xlsx" files = glob.glob(pattern) print(f"找到 {len(files)} 个报表文件") all_data = [] for f in files: df = pd.read_excel(f, sheet_name="销售明细") # 数据清洗 df = df.dropna(subset=["交易日期", "商品编码"]) df["金额"] = pd.to_numeric(df["金额"], errors="coerce").fillna(0) # 标注来源 store_name = Path(f).parent.name df["门店"] = store_name all_data.append(df) # 合并 result = pd.concat(all_data, ignore_index=True) # 输出汇总统计 summary = result.groupby("门店").agg( 总销售额=("金额", "sum"), 总笔数=("交易日期", "count") ).reset_index() # 保存 output_file = f"销售报表汇总_{month_str}.xlsx" with pd.ExcelWriter(output_file) as writer: result.to_excel(writer, sheet_name="合并明细", index=False) summary.to_excel(writer, sheet_name="门店汇总", index=False) print(f"合并完成!总销售额: {summary['总销售额'].sum():.2f} 元") print(f"总计交易笔数: {summary['总笔数'].sum()}") return result # 运行 merged = monthly_sales_merge("sales_reports", "202603")

案例二:按地区拆分客户数据

某公司总部的CRM系统导出了一份全国客户数据表(10万行),需要按"省份"列拆分为各个地区的客户名单,分发给各区域销售团队。每个区域的文件需要按客户名称排序,并包含一个统计Sheet说明该区域的客户分布情况。

def split_customers_by_region(source_file, region_col="省份"): """将客户数据按省份拆分,每个文件含统计信息""" df = pd.read_excel(source_file) # 按省份分组 for region, group in df.groupby(region_col): group = group.sort_values("客户名称") # 城市级别统计 city_stats = group["城市"].value_counts().reset_index() city_stats.columns = ["城市", "客户数"] filename = f"客户数据_{region}.xlsx" with pd.ExcelWriter(filename) as writer: group.to_excel(writer, sheet_name="客户名单", index=False) city_stats.to_excel(writer, sheet_name="城市分布", index=False) # 写入统计摘要 summary = pd.DataFrame([ ["省份", region], ["客户总数", len(group)], ["覆盖城市", group["城市"].nunique()], ], columns=["项目", "值"]) summary.to_excel(writer, sheet_name="摘要", index=False) print(f"已生成: {filename} ({len(group)} 条)") print(f"拆分完成!共 {df[region_col].nunique()} 个区域文件") split_customers_by_region("全国客户数据.xlsx")

案例三:库存盘点前后数据比对

某仓库每月进行库存盘点。有两个Excel文件:"系统库存.xlsx"(ERP系统导出)和"盘点库存.xlsx"(仓库人员实地盘点结果)。需要通过比对找出差异商品,并生成差异报告,包含差异金额汇总和重点差异商品标记。

def inventory_compare(system_file, physical_file, output_file="库存差异报告.xlsx"): """库存盘点数据比对,生成差异报告""" sys_df = pd.read_excel(system_file) phy_df = pd.read_excel(physical_file) # 确保关键列存在 required_cols = ["商品编码", "商品名称", "库存数量", "单价"] for col in required_cols: assert col in sys_df.columns and col in phy_df.columns # 按主键对齐 merged = sys_df.merge( phy_df, on=["商品编码", "商品名称"], how="outer", suffixes=("_系统", "_盘点"), indicator=True ) # 计算差异 merged["数量差异"] = merged["库存数量_盘点"].fillna(0) - merged["库存数量_系统"].fillna(0) merged["金额差异"] = merged["数量差异"] * merged["单价_系统"].fillna(merged["单价_盘点"]) # 标记差异级别 def mark_level(qty_diff, amt_diff): if abs(amt_diff) >= 10000: return "重大" elif abs(qty_diff) >= 10: return "中等" elif qty_diff != 0: return "一般" else: return "一致" merged["差异级别"] = merged.apply( lambda row: mark_level(row["数量差异"], row["金额差异"]), axis=1 ) # 筛选出有差异的 diff_only = merged[merged["差异级别"] != "一致"].copy() # 汇总统计 summary = pd.DataFrame([ ["系统记录数", len(sys_df)], ["盘点记录数", len(phy_df)], ["差异商品数", len(diff_only)], ["差异总金额", diff_only["金额差异"].sum()], ], columns=["统计项", "值"]) # 输出报告 with pd.ExcelWriter(output_file) as writer: summary.to_excel(writer, sheet_name="汇总", index=False) diff_only.to_excel(writer, sheet_name="差异明细", index=False) print(f"库存差异报告已生成: {output_file}") print(f"差异商品数: {len(diff_only)},差异总金额: {diff_only['金额差异'].sum():.2f} 元") return diff_only inventory_compare("系统库存.xlsx", "盘点库存.xlsx")

学以致用:以上三个案例代表了Excel批量处理的三大基本场景——合并(聚)、拆分(散)、比对(校)。在实际工作中,这三个功能常常组合使用。例如,先合并各门店报表,再按产品线拆分,最后比对两个月的差异。掌握这三项核心技能,可以覆盖日常办公中90%以上的Excel自动化需求。