← 返回自动化办公目录
← 返回学习笔记首页
专题: Python 自动化办公系统学习
关键词: Python, 自动化办公, Excel, 数据库, 数据导入, 数据导出, SQLite, MySQL, PostgreSQL, Python
一、数据互导概述
应用场景
在现代企业信息化建设中,Excel与数据库之间的数据互导是最常见也最核心的需求之一。Excel作为业务人员最熟悉的数据工具,拥有直观的表格界面和灵活的编辑能力,而数据库系统则提供了结构化存储、高效查询、事务安全和企业级管理能力。打通二者之间的数据通道,可以实现从数据采集、清洗、存储到分析的全流程自动化。
典型的应用场景包括:数据迁移 ——将历史遗留的Excel数据一次性导入数据库系统,完成数据平台的升级换代;报表生成 ——从数据库中提取业务数据,导出为Excel格式供管理层审阅或向客户交付;ETL流程 ——构建从业务系统数据库到数据仓库的抽取-转换-加载管道,其中Excel常作为中间数据交换格式或数据源存在;批量导入 ——通过Excel模板批量录入客户信息、商品目录、库存数据等,由自动化脚本校验后写入数据库。
技术方案对比
Python生态系统为Excel与数据库互导提供了多层次的解决方案。最基础的方案是使用openpyxl或xlrd/xlwt直接操作Excel文件,配合sqlite3或pymysql等数据库驱动手动读写数据。这种方式控制力最强,但代码量较大,需要自行处理数据类型映射和异常处理。推荐的生产级方案是使用pandas库配合SQLAlchemy,pandas的read_excel/to_excel和read_sql/to_sql
import pandas as pd
from sqlalchemy import create_engine
# 从Excel读取数据
df = pd.read_excel("sales_data.xlsx" , sheet_name="Sheet1" )
# 写入SQLite数据库
engine = create_engine("sqlite:///sales.db" )
df.to_sql("sales_records" , engine, index=False , if_exists="replace" )
# 验证写入结果
result = pd.read_sql("SELECT COUNT(*) FROM sales_records" , engine)
print (f"共导入 {result.iloc[0,0]} 条记录" )
架构设计原则
在设计数据互导系统时,应遵循以下原则:解耦 ——数据读取、数据转换、数据写入三个环节应分离为独立模块,便于单独测试和复用;可配置 ——数据库连接信息、字段映射关系、数据清洗规则等应外置到配置文件中,避免硬编码;容错 ——针对脏数据、网络超时、文件损坏等异常情况设计重试和回滚机制;可追溯 ——记录每次同步操作的日志,包括执行时间、处理记录数、异常详情等,便于问题排查和审计。
# 配置驱动的数据互导架构示例
import json
import pandas as pd
from sqlalchemy import create_engine
def load_config (config_path: str ) -> dict :
with open (config_path, 'r' , encoding='utf-8' ) as f:
return json.load(f)
def excel_to_db (config: dict ):
df = pd.read_excel(config["source_file" ], sheet_name=config["sheet" ])
# 字段映射与清洗
df.rename(columns=config["column_mapping" ], inplace=True )
df = df[config["target_columns" ]]
# 写入数据库
engine = create_engine(config["db_url" ])
df.to_sql(config["table_name" ], engine, index=False ,
if_exists=config.get("if_exists" , "append" ),
chunksize=config.get("chunksize" , 1000))
print (f"成功导入 {len(df)} 条记录到 {config['table_name']}" )
# 调用示例
config = load_config("sync_config.json" )
excel_to_db(config)
在实际工程中,架构设计还需要考虑数据量级。对于数千行级别的小批量数据,使用pandas直接读写即可满足需求。当数据量达到数十万行甚至百万行级别时,需要引入分块处理、批量提交和流式读写等优化策略。对于跨数据库类型的企业级应用,SQLAlchemy的方言(Dialect)机制可以自动适配不同数据库的语法差异,是构建可移植数据互导系统的基础组件。
二、Excel→SQLite
pandas + sqlite3 快速导入
SQLite是嵌入式关系数据库引擎,无需独立服务器进程,数据库文件即为单一文件,非常适合桌面应用、原型开发和中小规模数据管理。使用pandas配合sqlite3模块是将Excel数据导入SQLite最高效的方式。pandas的read_excel()方法可以直接读取Excel文件为DataFrame,然后通过to_sql()方法将DataFrame写入SQLite表。整个过程只需寥寥数行代码,且自动处理了数据类型推断和表结构创建。
import pandas as pd
import sqlite3
# 步骤1:从Excel读取数据
df = pd.read_excel("products.xlsx" )
print (f"读取到 {len(df)} 行数据" )
print ("列名:" , list(df.columns))
# 步骤2:连接到SQLite数据库(不存在则自动创建)
conn = sqlite3.connect("inventory.db" )
# 步骤3:将DataFrame写入SQLite表
df.to_sql("products" , conn, index=False , if_exists="replace" )
# 步骤4:验证数据
result = pd.read_sql("SELECT * FROM products LIMIT 5" , conn)
print (result)
conn.close()
openpyxl逐行插入
当需要对Excel读取过程进行精细控制时,可以使用openpyxl逐行读取并配合SQL参数化插入。这种方式虽然代码量稍大,但可以在数据处理过程中加入自定义校验逻辑,并且通过批量提交可以显著提升写入性能。逐行插入的另一大优势是内存控制——对于超大Excel文件,pandas的read_excel会将整个文件加载到内存,而openpyxl的read_only=True模式可以按行流式读取,内存占用仅为单个数据行的量级。
import sqlite3
from openpyxl import load_workbook
def excel_to_sqlite_batch (excel_path, db_path, table_name, batch_size=500):
"""逐行读取Excel并批量写入SQLite"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
wb = load_workbook(excel_path, read_only=True )
ws = wb.active
# 读取表头
headers = [cell.value for cell in next(ws.iter_rows())]
placeholders = ", " .join(["?" ] * len(headers))
columns = ", " .join(headers)
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 批量插入
batch = []
total = 0
for row in ws.iter_rows(values_only=True ):
# 跳过空行
if all (cell is None for cell in row):
continue
batch.append(row)
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
batch = []
# 插入剩余数据
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total += len(batch)
wb.close()
conn.close()
print (f"共导入 {total} 条记录到 {table_name}" )
数据类型映射与批量写入优化
Excel与SQLite之间的数据类型映射需要特别关注。Excel中常见的类型(文本、数字、日期、布尔值)与SQLite的存储类(TEXT、INTEGER、REAL、BLOB)存在对应关系,但实际转换中可能出现精度丢失、日期格式不兼容等问题。建议在导入前对DataFrame的dtypes进行检查和转换,特别是将object类型中的日期字符串统一转为datetime类型,将浮点数字段中的NaN值处理为None或0。批量写入时,chunksize参数的设置对性能影响显著——过小会导致频繁的数据库事务提交,过大则可能占用过多内存。根据经验,chunksize设置在500-2000之间可以获得较好的平衡。
import pandas as pd
import sqlite3
from datetime import datetime
# 数据清洗与类型映射
df = pd.read_excel("orders.xlsx" )
# 将日期字符串转为datetime
df["order_date" ] = pd.to_datetime(df["order_date" ], errors="coerce" )
# 将金额字段转为浮点数,非数字设为NaN
df["amount" ] = pd.to_numeric(df["amount" ], errors="coerce" ).fillna(0.0 )
# 填充空值
df["customer_name" ] = df["customer_name" ].fillna("未知客户" )
# 分块写入SQLite
conn = sqlite3.connect("orders.db" )
df.to_sql("orders" , conn, index=False ,
if_exists="append" , chunksize=1000 )
conn.close()
print (f"导入完成,共处理 {len(df)} 条订单记录" )
三、Excel→MySQL / PostgreSQL
SQLAlchemy统一连接
对于MySQL和PostgreSQL这样的企业级数据库,推荐使用SQLAlchemy作为数据库连接层。SQLAlchemy提供了统一的ORM和Core API,上层pandas的to_sql和read_sql方法天然支持SQLAlchemy Engine对象。这意味着切换数据库后端时,只需要修改连接字符串即可,业务逻辑代码无需改动。MySQL的连接字符串格式为mysql+pymysql://user:pass@host:port/dbname,PostgreSQL的格式为postgresql+psycopg2://user:pass@host:port/dbname。建议将连接字符串存储在环境变量或配置文件中,避免敏感信息硬编码。
import pandas as pd
from sqlalchemy import create_engine
# MySQL连接
mysql_engine = create_engine(
"mysql+pymysql://root:password@localhost:3306/business_db" ,
pool_size=5 , max_overflow=10 ,
echo=False
)
# PostgreSQL连接
pg_engine = create_engine(
"postgresql+psycopg2://postgres:password@localhost:5432/business_db" ,
pool_size=5 , max_overflow=10
)
# 从Excel读取并写入MySQL
df = pd.read_excel("sales_2025.xlsx" )
df.to_sql("sales" , mysql_engine, index=False ,
if_exists="append" , chunksize=2000 )
print ("数据已写入MySQL" )
# 同样的代码,只需换engine即可写入PostgreSQL
# df.to_sql("sales", pg_engine, index=False, if_exists="append", chunksize=2000)
主键与索引处理
pandas的to_sql方法在默认情况下不会自动创建主键和索引,导入的数据表需要后续手动添加约束。建议的实践是先通过DDL语句创建目标表结构(包括主键、外键、唯一约束和索引),然后使用to_sql(if_exists="append")以追加模式写入数据。这样可以确保数据完整性约束在导入过程中生效,同时获得更好的查询性能。对于大数据量的场景,可以考虑先禁用索引、批量导入数据、再重建索引的策略,这可以显著提升写入速度。
from sqlalchemy import create_engine, text
engine = create_engine("mysql+pymysql://root:password@localhost:3306/business_db" )
# 步骤1:先创建表结构(含主键和索引)
with engine.connect() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS customers (
id INT AUTO_INCREMENT PRIMARY KEY,
customer_code VARCHAR(50) NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
phone VARCHAR(20),
email VARCHAR(200),
province VARCHAR(50),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_province (province),
INDEX idx_created (created_at)
)
""" ))
conn.commit()
# 步骤2:从Excel读取并追加写入
df = pd.read_excel("customers.xlsx" )
df.to_sql("customers" , engine, index=False ,
if_exists="append" , chunksize=500 ,
method="multi" ) -- 使用多行插入语法提升性能
print (f"客户数据导入完成,共 {len(df)} 条" )
事务安全与批量写入
数据导入的原子性保障至关重要。特别是在生产环境中,导入过程中可能发生网络中断、唯一键冲突、数据类型错误等异常。使用数据库事务可以确保导入操作要么全部成功,要么全部回滚,避免出现部分写入导致的数据不一致。SQLAlchemy的Engine默认启用了事务,当调用conn.commit()或上下文管理器退出时才会真正提交。在pandas的to_sql中,可以通过method参数自定义写入方法,例如使用PostgreSQL的COPY命令实现极速批量加载。
from sqlalchemy import create_engine, text
import pandas as pd
engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/business_db" )
# 事务安全的导入函数
def safe_import_from_excel (excel_path, table_name, engine):
"""在事务中安全导入Excel数据到数据库"""
try :
df = pd.read_excel(excel_path)
# 使用事务上下文
with engine.begin() as conn:
# 先清理目标表中的旧数据(可选)
# conn.execute(text(f"TRUNCATE TABLE {table_name}"))
df.to_sql(table_name, conn, index=False ,
if_exists="append" , chunksize=1000 )
print (f"导入成功:{len(df)} 条记录" )
return True
except Exception as e:
print (f"导入失败,已回滚:{e}" )
return False
四、数据库→Excel
pandas读取SQL直接导出为Excel
从数据库导出数据到Excel是最常见的报表生成需求。pandas的read_sql()方法可以直接执行SQL查询并将结果集转换为DataFrame,再通过to_excel()方法写入Excel文件。这种方式简洁高效,支持复杂的SQL查询(包括多表JOIN、聚合计算、子查询等),并且可以配合ExcelWriter实现多Sheet输出。生成的Excel文件可以设置列宽、冻结窗格、添加筛选器等格式化特性,使报表更加直观易用。
import pandas as pd
from sqlalchemy import create_engine
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill
from openpyxl.utils import get_column_letter
engine = create_engine("mysql+pymysql://root:password@localhost:3306/sales_db" )
# 执行SQL查询并导出到Excel
query = """
SELECT
DATE_FORMAT(order_date, '%%Y-%%m') AS 月份,
region AS 区域,
COUNT(*) AS 订单数,
SUM(amount) AS 销售总额,
AVG(amount) AS 平均订单金额
FROM orders
WHERE order_date >= '2025-01-01'
GROUP BY 月份, 区域
ORDER BY 月份, 区域
"""
df = pd.read_sql(query, engine)
# 导出到Excel并格式化
output_path = "月度销售报表.xlsx"
with pd.ExcelWriter(output_path, engine="openpyxl" ) as writer:
df.to_excel(writer, sheet_name="销售汇总" , index=False )
# 自动调整列宽
worksheet = writer.sheets["销售汇总" ]
for i, col in enumerate (df.columns, 1 ):
max_len = max(df[col].astype(str ).str.len().max(), len(col)) + 2
worksheet.column_dimensions[get_column_letter(i)].width = min(max_len, 30 )
print (f"报表已生成:{output_path}" )
游标逐行读取写入
当数据库查询结果集非常庞大(数百万行)时,使用pandas一次性加载全部数据可能导致内存溢出。此时应该使用数据库游标(Cursor)逐行或分批读取数据,配合openpyxl的write_only=True模式逐行写入Excel。这种方式的内存占用是恒定的,与数据总量无关,可以处理任意大小的数据集。游标方式还允许在读取过程中进行实时数据处理,如数据脱敏、格式转换、异常值过滤等。需要特别注意的是,逐行写入Excel的性能瓶颈通常在IO而非CPU,适当增大批处理大小可以平衡内存和性能。
import mysql.connector
from openpyxl import Workbook
def export_large_query_to_excel (db_config, query, output_path, batch_size=10000 ):
"""使用游标逐批读取大数据量查询结果并写入Excel"""
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(buffered=True )
cursor.execute(query)
# 获取列名
columns = [desc[0 ] for desc in cursor.description]
# 创建Excel工作簿(write_only模式节省内存)
wb = Workbook(write_only=True )
ws = wb.create_sheet()
ws.append(columns)
# 分批读取并写入
total = 0
while True :
rows = cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
ws.append(row)
total += len(rows)
print (f"已处理 {total} 行..." )
wb.save(output_path)
cursor.close()
conn.close()
print (f"导出完成,共 {total} 条记录 -> {output_path}" )
大数据量分Sheet导出
Excel单个Sheet的最大行数为1048576行,但实际使用中,当数据量超过数万行时,打开和操作Excel文件会变得缓慢。更好的做法是将数据按业务维度分拆到多个Sheet中,例如按月份、按区域或按产品类别分Sheet。pandas的ExcelWriter对象支持向同一个Excel文件的多个Sheet写入数据,结合groupby操作可以轻松实现分Sheet导出。此外,对于超大规模数据集,还可以将数据拆分为多个Excel文件,每个文件包含一个独立的数据子集,文件按序号或业务关键词命名。
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/sales_db" )
# 按月份分Sheet导出销售数据
df = pd.read_sql("SELECT * FROM orders WHERE year = 2025" , engine)
df["month" ] = pd.to_datetime(df["order_date" ]).dt.month
output_path = "2025年销售数据_按月分Sheet.xlsx"
with pd.ExcelWriter(output_path, engine="openpyxl" ) as writer:
for month, group in df.groupby("month" ):
sheet_name = f"{int(month)}月"
group.to_excel(writer, sheet_name=sheet_name, index=False )
print (f"已写入 {sheet_name}: {len(group)} 条记录" )
print (f"多Sheet报表已生成:{output_path}" )
五、增量同步
时间戳增量同步
在实际业务中,全量数据同步往往只在首次初始化时执行一次,后续的日常同步应使用增量方式,仅同步发生变化的数据。最常用的是基于时间戳的增量同步策略:在源数据表和目标表中都维护一个"最后修改时间"字段,每次同步时只拉取该时间戳之后新增或修改的数据。这种策略实现简单、性能优秀,但要求源系统必须可靠地维护时间戳字段,并且在跨时区场景下需要统一转换为UTC时间。时间戳增量同步适合数据只增不改或很少修改的场景,如日志数据、订单记录等。
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime
def incremental_sync_by_timestamp (excel_path, db_url, table_name, timestamp_col):
"""基于时间戳的增量同步"""
engine = create_engine(db_url)
# 获取目标表中最大的时间戳
query = f"SELECT MAX({timestamp_col}) FROM {table_name}"
last_sync = pd.read_sql(query, engine).iloc[0 , 0 ]
if last_sync is None :
# 首次同步:全量导入
df = pd.read_excel(excel_path)
print ("首次同步,全量导入" )
else :
# 仅读取时间戳之后的数据
# 假设Excel数据也可以按时间过滤
df = pd.read_excel(excel_path)
df = df[pd.to_datetime(df[timestamp_col]) > last_sync]
print (f"增量同步,上次同步时间:{last_sync},本次新增:{len(df)} 条" )
if not df.empty:
df.to_sql(table_name, engine, index=False ,
if_exists="append" , chunksize=500 )
print (f"同步完成,新增 {len(df)} 条记录" )
else :
print ("没有新增数据,跳过同步" )
return len(df) if not df.empty else 0
自增ID增量与变更数据捕获
自增ID增量同步是另一种常见策略,适用于有序追加的数据(如流水号、日志ID等)。同步时记录上次同步的最大ID值,下次只拉取ID更大的数据。自增ID策略的优点是不依赖时间戳字段,性能极佳,但缺点是无法捕获对已有数据的修改操作。对于需要完整捕获数据变更的场景,可以考虑变更数据捕获(CDC)方案。CDC通过订阅数据库的变更日志(如MySQL的binlog、PostgreSQL的WAL),实时捕获所有INSERT、UPDATE、DELETE操作。虽然在Python中实现完整的CDC较为复杂,但可以借助第三方工具如Debezium、Maxwell或Canal来捕获变更事件,再通过消息队列传递给Python处理程序。
# 基于自增ID的增量同步
import pandas as pd
from sqlalchemy import create_engine
def sync_by_increment_id (db_url, source_table, target_table, id_col, excel_output):
engine = create_engine(db_url)
# 获取目标表已同步的最大ID
max_id = pd.read_sql(f"SELECT COALESCE(MAX({id_col}), 0) FROM {target_table}" , engine).iloc[0 , 0 ]
# 拉取增量数据
new_data = pd.read_sql(
f"SELECT * FROM {source_table} WHERE {id_col} > {max_id}" , engine
)
if not new_data.empty:
new_data.to_excel(excel_output, index=False )
print (f"增量导出 {len(new_data)} 条到 {excel_output}" )
else :
print ("无增量数据" )
return new_data
数据比对与同步日志
数据比对是增量同步的正确性保障。在同步完成后,应对比源端和目标端的记录数、关键指标的汇总值(如总金额、总条数),确保数据一致性。对于数据量较大的场景,可以使用checksum(哈希校验)方式进行快速比对。同步日志记录了每次同步的详细信息,包括开始时间、结束时间、同步类型(全量/增量)、处理记录数、异常记录数和错误详情。完善的同步日志不仅便于问题排查,也是数据审计的重要依据。建议将同步日志写入单独的日志表或日志文件中,并提供查询接口供运维人员查看同步状态。
import logging
from datetime import datetime
# 配置同步日志
logging.basicConfig(
filename="sync_log.txt" ,
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s" ,
encoding="utf-8"
)
def validate_sync (engine, table_name, expected_count):
"""同步后的数据校验"""
actual_count = pd.read_sql(f"SELECT COUNT(*) FROM {table_name}" , engine).iloc[0 , 0 ]
if actual_count == expected_count:
logging.info(f"校验通过:{table_name} 共 {actual_count} 条" )
return True
else :
logging.warning(f"校验失败:期望 {expected_count} 条,实际 {actual_count} 条" )
return False
六、数据清洗与映射
列名映射
Excel文件中的列名通常采用中文或非标准命名,而数据库中的字段名往往是英文或遵循特定命名规范。列名映射是数据导入的第一步,需要建立Excel列名到数据库字段名的对应关系。推荐使用字典来定义映射关系,将映射配置外置到JSON或YAML文件中,使得非开发人员也能维护映射规则。在pandas中,可以使用df.rename(columns=mapping)方法快速完成列名替换,然后通过列选择子集确保只导入需要的字段。对于缺失字段,应该设置默认值或触发告警。
import pandas as pd
import json
# 列名映射配置
column_mapping = {
"客户编号" : "customer_code" ,
"客户名称" : "customer_name" ,
"联系电话" : "phone" ,
"电子邮箱" : "email" ,
"所在省份" : "province" ,
"注册日期" : "reg_date" ,
"客户等级" : "tier"
}
# 应用映射
df = pd.read_excel("客户数据.xlsx" )
df.rename(columns=column_mapping, inplace=True )
# 只保留目标字段,缺失字段填充默认值
required_cols = list(column_mapping.values())
for col in required_cols:
if col not in df.columns:
df[col] = None
df = df[required_cols]
print ("映射后列名:" , list(df.columns))
数据类型转换与空值处理
Excel中的数据类型与数据库类型并非一一对应,数据导入时的类型转换是数据质量的关键环节。pandas的astype()方法可以转换数据类型,pd.to_datetime()专门处理日期转换,pd.to_numeric()处理数字转换。空值处理方面,需要针对不同字段制定不同的策略:数值字段可以填充0或平均值,文本字段可以填充""或"未知",关键业务字段(如订单号)的空值应触发告警而非自动填充。pandas的fillna()方法提供了灵活的填充能力,dropna()可以删除空值过多的行。建议为每个字段定义清洗规则,形成完整的数据质量规则库。
# 全面的数据类型转换与空值处理
df = pd.read_excel("orders_import.xlsx" )
# 日期字段统一转换
date_cols = ["order_date" , "delivery_date" ]
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors="coerce" )
# 数值字段强制转换,无效值设为0
numeric_cols = ["quantity" , "unit_price" , "total_amount" ]
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors="coerce" ).fillna(0 )
# 文本字段去除首尾空格,空值填充默认值
text_cols = ["customer_name" , "product_name" ]
for col in text_cols:
df[col] = df[col].astype(str ).str.strip().replace("nan" , "未知" )
# 删除关键字段为空的行
df.dropna(subset=["order_id" ], inplace=True )
print (f"清洗完成,有效数据 {len(df)} 条" )
去重策略与编码统一
数据去重是数据清洗中的常见需求,需要根据业务规则确定"重复"的定义。全字段重复是最严格的去重,可以直接使用df.drop_duplicates()。更常见的场景是基于关键字段去重(如客户编号、订单号唯一),可以使用df.drop_duplicates(subset=["customer_code"]),并可以指定保留第一条还是最后一条记录。编码统一是跨系统数据交换的另一个常见问题。Excel文件可能使用GBK、GB2312、UTF-8等不同编码,Python的openpyxl库可以自动处理.xlsx文件的编码,但对于.csv文件则需要显式指定编码格式。建议在数据导入初期就将所有文本统一转换为UTF-8编码,避免后续处理中的编码混乱。
# 综合去重与编码处理示例
import pandas as pd
import chardet
# 自动检测CSV文件编码
with open ("data_export.csv" , "rb" ) as f:
encoding = chardet.detect(f.read(10000 ))["encoding" ]
print (f"检测到编码:{encoding}" )
# 读取CSV(指定编码)
df = pd.read_csv("data_export.csv" , encoding=encoding)
# 去重策略
# 1. 完全去重
df_clean = df.drop_duplicates()
# 2. 基于关键字段去重(保留最新记录)
df_clean = df.sort_values("update_time" ).drop_duplicates(
subset=["customer_code" ], keep="last"
)
print (f"去重前:{len(df)} 条,去重后:{len(df_clean)} 条" )
七、定时同步
schedule库实现轻量定时
对于不需要持久化调度、随应用进程启动和停止的轻量级定时任务,Python的schedule库是理想选择。它提供了简洁的API,使用类似自然语言的方式定义定时规则,如schedule.every().day.at("08:00").do(job)表示每天早上8点执行任务。schedule库适合单进程、无需任务持久化、不需要分布式调度的场景。它的设计哲学是简单易用,但功能相对有限:不支持集群部署、任务失败不会自动重试、调度器本身不具备高可用性。
import schedule
import time
from datetime import datetime
def sync_sales_data ():
"""同步销售数据的定时任务"""
print (f"[{datetime.now()}] 开始同步销售数据..." )
# 实际同步逻辑
# df = pd.read_excel("sales_today.xlsx")
# df.to_sql("sales", engine, if_exists="append")
print (f"[{datetime.now()}] 同步完成" )
# 定义定时任务
schedule.every().day.at("08:00" ).do(sync_sales_data)
schedule.every().day.at("20:00" ).do(sync_sales_data)
schedule.every(30 ).minutes.do(sync_sales_data) # 每30分钟执行一次
# 循环运行调度器
while True :
schedule.run_pending()
time.sleep(1 )
APScheduler企业级调度
对于需要持久化任务、支持集群部署、提供丰富调度策略的企业级场景,APScheduler(Advanced Python Scheduler)是更强大的选择。APScheduler提供了四种组件:触发器(决定任务何时执行)、任务存储器(保存任务状态)、执行器(运行任务)和调度器(协调前三者)。APScheduler支持cron表达式、间隔触发、日期触发三种触发器类型;支持SQLite、Redis、MongoDB等多种任务存储器,实现任务的持久化和恢复;支持线程池和进程池执行器,可以并发运行多个任务。特别地,当使用SQLite作为任务存储器时,任务定义和执行记录会被持久化到数据库中,即使应用程序重启,未完成的任务也不会丢失。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
# 创建调度器,使用SQLite持久化任务
jobstores = {
"default" : SQLAlchemyJobStore(url="sqlite:///jobs.db" )
}
scheduler = BlockingScheduler(jobstores=jobstores)
# 定义同步任务
def sync_excel_to_mysql (excel_path, table_name):
print (f"执行同步:{excel_path} -> {table_name}" )
# 实际同步逻辑
# df = pd.read_excel(excel_path)
# df.to_sql(table_name, engine, if_exists="replace")
# 添加定时任务:每天凌晨1点执行
scheduler.add_job(
sync_excel_to_mysql,
CronTrigger(hour=1 , minute=0 ),
args=["daily_sales.xlsx" , "daily_sales" ],
id="sync_sales_job" ,
replace_existing=True
)
# 启动调度器
scheduler.start()
配置驱动同步与监控
当管理的数据同步任务增多时,将同步任务的配置(数据源、目标表、同步策略、调度规则等)外置到配置文件中,可以极大提升系统的可维护性。可以设计一个同步任务注册表(可以是数据库表或JSON配置文件),每个任务包含源文件路径、目标数据库连接、表名、同步方式(全量/增量)、调度cron表达式等字段。系统启动时读取配置,动态注册所有同步任务。同步监控方面,可以集成Prometheus指标暴露或简单的健康检查API,实时查看每个同步任务的执行状态、延迟情况和错误率。当同步失败时,通过邮件、钉钉或企业微信发送告警通知。
import json
import pandas as pd
from sqlalchemy import create_engine
# 配置驱动的同步引擎
class SyncEngine :
def __init__ (self , config_path: str ):
with open (config_path, "r" , encoding="utf-8" ) as f:
self .tasks = json.load(f)
def run_task (self , task_name: str ):
task = self .tasks[task_name]
engine = create_engine(task["target_db" ])
df = pd.read_excel(task["source_file" ], sheet_name=task["sheet" ])
if task["sync_mode" ] == "incremental" :
# 增量模式:按时间戳过滤
last = pd.read_sql(f"SELECT MAX({task['timestamp_col']}) FROM {task['table']}" , engine)
if not last.iloc[0 , 0 ]:
last_value = "1970-01-01"
df = df[df[task["timestamp_col" ]] > last_value]
df.to_sql(task["table" ], engine, index=False ,
if_exists="append" , chunksize=1000 )
return {"task" : task_name, "rows" : len(df), "status" : "success" }
def run_all (self ):
results = []
for task_name in self .tasks:
results.append(self .run_task(task_name))
return results
# 使用示例
engine = SyncEngine("sync_tasks.json" )
results = engine.run_all()
print (json.dumps(results, ensure_ascii=False , indent=2 ))
八、实战案例
案例一:每日销售数据入库
某电商公司每天需要将各门店的销售Excel报表汇总导入到MySQL数据库中,用于后续的数据分析和BI展示。数据文件以日期命名(如"2025-05-05_门店销售.xlsx"),包含订单号、门店名称、产品名称、销售数量、单价、总金额和销售日期等字段。要求每天晚上11点自动导入当天数据,导入前需校验数据的完整性和一致性,导入后发送导入结果通知。我们使用APScheduler实现定时调度,pandas实现数据读取和入库,并加入了数据校验和日志记录功能。系统运行半年来,日均处理约5万条销售记录,导入成功率保持在99.8%以上。
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(message)s" )
def import_daily_sales ():
"""每日销售数据导入任务"""
today = datetime.now().strftime("%Y-%m-%d" )
excel_path = f"/data/sales_reports/{today}_门店销售.xlsx"
engine = create_engine("mysql+pymysql://user:pass@localhost:3306/sales_db" )
try :
# 1. 读取Excel
df = pd.read_excel(excel_path)
logging.info(f"读取 {excel_path},共 {len(df)} 条记录" )
# 2. 数据清洗
df["sales_date" ] = pd.to_datetime(df["sales_date" ])
df["total_amount" ] = pd.to_numeric(df["total_amount" ], errors="coerce" )
df.dropna(subset=["order_no" ], inplace=True )
# 3. 写入数据库
with engine.begin() as conn:
df.to_sql("daily_sales" , conn, index=False ,
if_exists="append" , chunksize=500 )
# 4. 数据校验
count = pd.read_sql("SELECT COUNT(*) FROM daily_sales WHERE sales_date = :d" ,
engine, params={"d" : today}).iloc[0 , 0 ]
logging.info(f"导入成功!数据库中共 {count} 条今日销售记录" )
return True
except FileNotFoundError:
logging.warning(f"今日销售报表未找到:{excel_path}" )
return False
except Exception as e:
logging.error(f"导入失败:{e}" )
return False
案例二:报表数据导出Excel
某集团公司财务部每月需要从PostgreSQL数据库中导出各类财务报表,包括利润表、资产负债表、现金流量表等。要求导出的Excel文件格式美观,表头加粗加背景色,数字格式保留两位小数,日期字段格式化为"YYYY年MM月DD日",并且冻结首行方便查看。我们使用pandas读取SQL查询结果,通过openpyxl对生成的Excel进行格式化处理,实现了多Sheet一键生成完整财务报告包。该方案上线后,财务人员每月手动处理报表的时间从2天缩短到10分钟,大幅提升了工作效率。
import pandas as pd
from sqlalchemy import create_engine
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side, numbers
from openpyxl.utils import get_column_letter
engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/finance_db" )
def export_monthly_report (year, month):
output_path = f"财务报表_{year}年{month:02d}月.xlsx"
# 定义各个报表的SQL查询
reports = {
"利润表" : "SELECT * FROM v_profit_statement WHERE year=:y AND month=:m" ,
"资产负债表" : "SELECT * FROM v_balance_sheet WHERE report_date=:d" ,
"现金流量表" : "SELECT * FROM v_cash_flow WHERE year=:y AND month=:m"
}
header_font = Font(bold=True , size=11 , color="FFFFFF" )
header_fill = PatternFill(start_color="2E7D32" , end_color="2E7D32" , fill_type="solid" )
thin_border = Border(
left=Side(style="thin" ), right=Side(style="thin" ),
top=Side(style="thin" ), bottom=Side(style="thin" )
)
with pd.ExcelWriter(output_path, engine="openpyxl" ) as writer:
for sheet_name, query in reports.items():
params = {"y" : year, "m" : month, "d" : f"{year}-{month:02d}-01" }
df = pd.read_sql(query, engine, params=params)
df.to_excel(writer, sheet_name=sheet_name, index=False , startrow=0 )
# 格式化样式
ws = writer.sheets[sheet_name]
for cell in ws[1 ]:
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center" )
cell.border = thin_border
print (f"财务报告已生成:{output_path}" )
案例三:客户数据批量导入系统
某CRM系统需要支持从Excel批量导入客户数据,设计了一套完整的导入流程:用户上传Excel文件 → 前端预览数据 → 后端校验清洗 → 写入数据库 → 返回导入结果。系统需要处理大量边缘情况:重复客户检测(按手机号或邮箱去重)、地址标准化、省份城市自动补全、非法字符过滤等。对于导入失败的数据,需要生成详细的错误报告Excel,标注每条记录的具体错误原因,方便用户修正后重新导入。该系统的核心是数据校验模块,它定义了20余条校验规则,覆盖格式校验、业务逻辑校验和关联数据校验三个层级,确保导入数据的质量。
import pandas as pd
from sqlalchemy import create_engine, text
import re
class CustomerImporter :
def __init__ (self , db_url):
self .engine = create_engine(db_url)
def validate_row (self , row):
"""单行数据校验,返回错误信息列表"""
errors = []
if pd.isna(row.get("phone" )):
errors.append("手机号不能为空" )
elif not re.match(r"^1\d{10}$" , str (row["phone" ])):
errors.append("手机号格式不正确" )
if pd.isna(row.get("name" )):
errors.append("客户姓名不能为空" )
return errors
def batch_import (self , excel_path):
df = pd.read_excel(excel_path)
success_rows, fail_rows = [], []
for idx, row in df.iterrows():
errors = self .validate_row(row)
if errors:
row["错误原因" ] = "; " .join(errors)
fail_rows.append(row)
else :
success_rows.append(row)
# 写入成功数据
if success_rows:
df_success = pd.DataFrame(success_rows)
df_success.to_sql("customers" , self .engine,
index=False , if_exists="append" )
# 导出失败记录
if fail_rows:
df_fail = pd.DataFrame(fail_rows)
df_fail.to_excel("导入失败记录.xlsx" , index=False )
return {
"total" : len(df),
"success" : len(success_rows),
"fail" : len(fail_rows),
"fail_file" : "导入失败记录.xlsx" if fail_rows else None
}
# 使用示例
importer = CustomerImporter("mysql+pymysql://user:pass@localhost:3306/crm" )
result = importer.batch_import("客户导入模板.xlsx" )
print (f"导入结果:总计 {result['total']} 条,成功 {result['success']} 条,失败 {result['fail']} 条" )