Excel与数据库数据互导

Python 办公自动化专题 · 打通Excel与数据库之间的数据通道

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, Excel, 数据库, 数据导入, 数据导出, SQLite, MySQL, PostgreSQL, Python

一、数据互导概述

应用场景

在现代企业信息化建设中,Excel与数据库之间的数据互导是最常见也最核心的需求之一。Excel作为业务人员最熟悉的数据工具,拥有直观的表格界面和灵活的编辑能力,而数据库系统则提供了结构化存储、高效查询、事务安全和企业级管理能力。打通二者之间的数据通道,可以实现从数据采集、清洗、存储到分析的全流程自动化。

典型的应用场景包括:数据迁移——将历史遗留的Excel数据一次性导入数据库系统,完成数据平台的升级换代;报表生成——从数据库中提取业务数据,导出为Excel格式供管理层审阅或向客户交付;ETL流程——构建从业务系统数据库到数据仓库的抽取-转换-加载管道,其中Excel常作为中间数据交换格式或数据源存在;批量导入——通过Excel模板批量录入客户信息、商品目录、库存数据等,由自动化脚本校验后写入数据库。

技术方案对比

Python生态系统为Excel与数据库互导提供了多层次的解决方案。最基础的方案是使用openpyxlxlrd/xlwt直接操作Excel文件,配合sqlite3pymysql等数据库驱动手动读写数据。这种方式控制力最强,但代码量较大,需要自行处理数据类型映射和异常处理。推荐的生产级方案是使用pandas库配合SQLAlchemy,pandas的read_excel/to_excelread_sql/to_sql

import pandas as pd from sqlalchemy import create_engine # 从Excel读取数据 df = pd.read_excel("sales_data.xlsx", sheet_name="Sheet1") # 写入SQLite数据库 engine = create_engine("sqlite:///sales.db") df.to_sql("sales_records", engine, index=False, if_exists="replace") # 验证写入结果 result = pd.read_sql("SELECT COUNT(*) FROM sales_records", engine) print(f"共导入 {result.iloc[0,0]} 条记录")

架构设计原则

在设计数据互导系统时,应遵循以下原则:解耦——数据读取、数据转换、数据写入三个环节应分离为独立模块,便于单独测试和复用;可配置——数据库连接信息、字段映射关系、数据清洗规则等应外置到配置文件中,避免硬编码;容错——针对脏数据、网络超时、文件损坏等异常情况设计重试和回滚机制;可追溯——记录每次同步操作的日志,包括执行时间、处理记录数、异常详情等,便于问题排查和审计。

# 配置驱动的数据互导架构示例 import json import pandas as pd from sqlalchemy import create_engine def load_config(config_path: str) -> dict: with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) def excel_to_db(config: dict): df = pd.read_excel(config["source_file"], sheet_name=config["sheet"]) # 字段映射与清洗 df.rename(columns=config["column_mapping"], inplace=True) df = df[config["target_columns"]] # 写入数据库 engine = create_engine(config["db_url"]) df.to_sql(config["table_name"], engine, index=False, if_exists=config.get("if_exists", "append"), chunksize=config.get("chunksize", 1000)) print(f"成功导入 {len(df)} 条记录到 {config['table_name']}") # 调用示例 config = load_config("sync_config.json") excel_to_db(config)

在实际工程中,架构设计还需要考虑数据量级。对于数千行级别的小批量数据,使用pandas直接读写即可满足需求。当数据量达到数十万行甚至百万行级别时,需要引入分块处理、批量提交和流式读写等优化策略。对于跨数据库类型的企业级应用,SQLAlchemy的方言(Dialect)机制可以自动适配不同数据库的语法差异,是构建可移植数据互导系统的基础组件。

二、Excel→SQLite

pandas + sqlite3 快速导入

SQLite是嵌入式关系数据库引擎,无需独立服务器进程,数据库文件即为单一文件,非常适合桌面应用、原型开发和中小规模数据管理。使用pandas配合sqlite3模块是将Excel数据导入SQLite最高效的方式。pandas的read_excel()方法可以直接读取Excel文件为DataFrame,然后通过to_sql()方法将DataFrame写入SQLite表。整个过程只需寥寥数行代码,且自动处理了数据类型推断和表结构创建。

import pandas as pd import sqlite3 # 步骤1:从Excel读取数据 df = pd.read_excel("products.xlsx") print(f"读取到 {len(df)} 行数据") print("列名:", list(df.columns)) # 步骤2:连接到SQLite数据库(不存在则自动创建) conn = sqlite3.connect("inventory.db") # 步骤3:将DataFrame写入SQLite表 df.to_sql("products", conn, index=False, if_exists="replace") # 步骤4:验证数据 result = pd.read_sql("SELECT * FROM products LIMIT 5", conn) print(result) conn.close()

openpyxl逐行插入

当需要对Excel读取过程进行精细控制时,可以使用openpyxl逐行读取并配合SQL参数化插入。这种方式虽然代码量稍大,但可以在数据处理过程中加入自定义校验逻辑,并且通过批量提交可以显著提升写入性能。逐行插入的另一大优势是内存控制——对于超大Excel文件,pandas的read_excel会将整个文件加载到内存,而openpyxl的read_only=True模式可以按行流式读取,内存占用仅为单个数据行的量级。

import sqlite3 from openpyxl import load_workbook def excel_to_sqlite_batch(excel_path, db_path, table_name, batch_size=500): """逐行读取Excel并批量写入SQLite""" conn = sqlite3.connect(db_path) cursor = conn.cursor() wb = load_workbook(excel_path, read_only=True) ws = wb.active # 读取表头 headers = [cell.value for cell in next(ws.iter_rows())] placeholders = ", ".join(["?"] * len(headers)) columns = ", ".join(headers) insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" # 批量插入 batch = [] total = 0 for row in ws.iter_rows(values_only=True): # 跳过空行 if all(cell is None for cell in row): continue batch.append(row) if len(batch) >= batch_size: cursor.executemany(insert_sql, batch) conn.commit() total += len(batch) batch = [] # 插入剩余数据 if batch: cursor.executemany(insert_sql, batch) conn.commit() total += len(batch) wb.close() conn.close() print(f"共导入 {total} 条记录到 {table_name}")

数据类型映射与批量写入优化

Excel与SQLite之间的数据类型映射需要特别关注。Excel中常见的类型(文本、数字、日期、布尔值)与SQLite的存储类(TEXT、INTEGER、REAL、BLOB)存在对应关系,但实际转换中可能出现精度丢失、日期格式不兼容等问题。建议在导入前对DataFrame的dtypes进行检查和转换,特别是将object类型中的日期字符串统一转为datetime类型,将浮点数字段中的NaN值处理为None或0。批量写入时,chunksize参数的设置对性能影响显著——过小会导致频繁的数据库事务提交,过大则可能占用过多内存。根据经验,chunksize设置在500-2000之间可以获得较好的平衡。

import pandas as pd import sqlite3 from datetime import datetime # 数据清洗与类型映射 df = pd.read_excel("orders.xlsx") # 将日期字符串转为datetime df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce") # 将金额字段转为浮点数,非数字设为NaN df["amount"] = pd.to_numeric(df["amount"], errors="coerce").fillna(0.0) # 填充空值 df["customer_name"] = df["customer_name"].fillna("未知客户") # 分块写入SQLite conn = sqlite3.connect("orders.db") df.to_sql("orders", conn, index=False, if_exists="append", chunksize=1000) conn.close() print(f"导入完成,共处理 {len(df)} 条订单记录")

三、Excel→MySQL / PostgreSQL

SQLAlchemy统一连接

对于MySQL和PostgreSQL这样的企业级数据库,推荐使用SQLAlchemy作为数据库连接层。SQLAlchemy提供了统一的ORM和Core API,上层pandas的to_sqlread_sql方法天然支持SQLAlchemy Engine对象。这意味着切换数据库后端时,只需要修改连接字符串即可,业务逻辑代码无需改动。MySQL的连接字符串格式为mysql+pymysql://user:pass@host:port/dbname,PostgreSQL的格式为postgresql+psycopg2://user:pass@host:port/dbname。建议将连接字符串存储在环境变量或配置文件中,避免敏感信息硬编码。

import pandas as pd from sqlalchemy import create_engine # MySQL连接 mysql_engine = create_engine( "mysql+pymysql://root:password@localhost:3306/business_db", pool_size=5, max_overflow=10, echo=False ) # PostgreSQL连接 pg_engine = create_engine( "postgresql+psycopg2://postgres:password@localhost:5432/business_db", pool_size=5, max_overflow=10 ) # 从Excel读取并写入MySQL df = pd.read_excel("sales_2025.xlsx") df.to_sql("sales", mysql_engine, index=False, if_exists="append", chunksize=2000) print("数据已写入MySQL") # 同样的代码,只需换engine即可写入PostgreSQL # df.to_sql("sales", pg_engine, index=False, if_exists="append", chunksize=2000)

主键与索引处理

pandas的to_sql方法在默认情况下不会自动创建主键和索引,导入的数据表需要后续手动添加约束。建议的实践是先通过DDL语句创建目标表结构(包括主键、外键、唯一约束和索引),然后使用to_sql(if_exists="append")以追加模式写入数据。这样可以确保数据完整性约束在导入过程中生效,同时获得更好的查询性能。对于大数据量的场景,可以考虑先禁用索引、批量导入数据、再重建索引的策略,这可以显著提升写入速度。

from sqlalchemy import create_engine, text engine = create_engine("mysql+pymysql://root:password@localhost:3306/business_db") # 步骤1:先创建表结构(含主键和索引) with engine.connect() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS customers ( id INT AUTO_INCREMENT PRIMARY KEY, customer_code VARCHAR(50) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, phone VARCHAR(20), email VARCHAR(200), province VARCHAR(50), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_province (province), INDEX idx_created (created_at) ) """)) conn.commit() # 步骤2:从Excel读取并追加写入 df = pd.read_excel("customers.xlsx") df.to_sql("customers", engine, index=False, if_exists="append", chunksize=500, method="multi") -- 使用多行插入语法提升性能 print(f"客户数据导入完成,共 {len(df)} 条")

事务安全与批量写入

数据导入的原子性保障至关重要。特别是在生产环境中,导入过程中可能发生网络中断、唯一键冲突、数据类型错误等异常。使用数据库事务可以确保导入操作要么全部成功,要么全部回滚,避免出现部分写入导致的数据不一致。SQLAlchemy的Engine默认启用了事务,当调用conn.commit()或上下文管理器退出时才会真正提交。在pandas的to_sql中,可以通过method参数自定义写入方法,例如使用PostgreSQL的COPY命令实现极速批量加载。

from sqlalchemy import create_engine, text import pandas as pd engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/business_db") # 事务安全的导入函数 def safe_import_from_excel(excel_path, table_name, engine): """在事务中安全导入Excel数据到数据库""" try: df = pd.read_excel(excel_path) # 使用事务上下文 with engine.begin() as conn: # 先清理目标表中的旧数据(可选) # conn.execute(text(f"TRUNCATE TABLE {table_name}")) df.to_sql(table_name, conn, index=False, if_exists="append", chunksize=1000) print(f"导入成功:{len(df)} 条记录") return True except Exception as e: print(f"导入失败,已回滚:{e}") return False

四、数据库→Excel

pandas读取SQL直接导出为Excel

从数据库导出数据到Excel是最常见的报表生成需求。pandas的read_sql()方法可以直接执行SQL查询并将结果集转换为DataFrame,再通过to_excel()方法写入Excel文件。这种方式简洁高效,支持复杂的SQL查询(包括多表JOIN、聚合计算、子查询等),并且可以配合ExcelWriter实现多Sheet输出。生成的Excel文件可以设置列宽、冻结窗格、添加筛选器等格式化特性,使报表更加直观易用。

import pandas as pd from sqlalchemy import create_engine from openpyxl import load_workbook from openpyxl.styles import Font, PatternFill from openpyxl.utils import get_column_letter engine = create_engine("mysql+pymysql://root:password@localhost:3306/sales_db") # 执行SQL查询并导出到Excel query = """ SELECT DATE_FORMAT(order_date, '%%Y-%%m') AS 月份, region AS 区域, COUNT(*) AS 订单数, SUM(amount) AS 销售总额, AVG(amount) AS 平均订单金额 FROM orders WHERE order_date >= '2025-01-01' GROUP BY 月份, 区域 ORDER BY 月份, 区域 """ df = pd.read_sql(query, engine) # 导出到Excel并格式化 output_path = "月度销售报表.xlsx" with pd.ExcelWriter(output_path, engine="openpyxl") as writer: df.to_excel(writer, sheet_name="销售汇总", index=False) # 自动调整列宽 worksheet = writer.sheets["销售汇总"] for i, col in enumerate(df.columns, 1): max_len = max(df[col].astype(str).str.len().max(), len(col)) + 2 worksheet.column_dimensions[get_column_letter(i)].width = min(max_len, 30) print(f"报表已生成:{output_path}")

游标逐行读取写入

当数据库查询结果集非常庞大(数百万行)时,使用pandas一次性加载全部数据可能导致内存溢出。此时应该使用数据库游标(Cursor)逐行或分批读取数据,配合openpyxl的write_only=True模式逐行写入Excel。这种方式的内存占用是恒定的,与数据总量无关,可以处理任意大小的数据集。游标方式还允许在读取过程中进行实时数据处理,如数据脱敏、格式转换、异常值过滤等。需要特别注意的是,逐行写入Excel的性能瓶颈通常在IO而非CPU,适当增大批处理大小可以平衡内存和性能。

import mysql.connector from openpyxl import Workbook def export_large_query_to_excel(db_config, query, output_path, batch_size=10000): """使用游标逐批读取大数据量查询结果并写入Excel""" conn = mysql.connector.connect(**db_config) cursor = conn.cursor(buffered=True) cursor.execute(query) # 获取列名 columns = [desc[0] for desc in cursor.description] # 创建Excel工作簿(write_only模式节省内存) wb = Workbook(write_only=True) ws = wb.create_sheet() ws.append(columns) # 分批读取并写入 total = 0 while True: rows = cursor.fetchmany(batch_size) if not rows: break for row in rows: ws.append(row) total += len(rows) print(f"已处理 {total} 行...") wb.save(output_path) cursor.close() conn.close() print(f"导出完成,共 {total} 条记录 -> {output_path}")

大数据量分Sheet导出

Excel单个Sheet的最大行数为1048576行,但实际使用中,当数据量超过数万行时,打开和操作Excel文件会变得缓慢。更好的做法是将数据按业务维度分拆到多个Sheet中,例如按月份、按区域或按产品类别分Sheet。pandas的ExcelWriter对象支持向同一个Excel文件的多个Sheet写入数据,结合groupby操作可以轻松实现分Sheet导出。此外,对于超大规模数据集,还可以将数据拆分为多个Excel文件,每个文件包含一个独立的数据子集,文件按序号或业务关键词命名。

import pandas as pd from sqlalchemy import create_engine engine = create_engine("postgresql+psycopg2://postgres:password@localhost:5432/sales_db") # 按月份分Sheet导出销售数据 df = pd.read_sql("SELECT * FROM orders WHERE year = 2025", engine) df["month"] = pd.to_datetime(df["order_date"]).dt.month output_path = "2025年销售数据_按月分Sheet.xlsx" with pd.ExcelWriter(output_path, engine="openpyxl") as writer: for month, group in df.groupby("month"): sheet_name = f"{int(month)}月" group.to_excel(writer, sheet_name=sheet_name, index=False) print(f"已写入 {sheet_name}: {len(group)} 条记录") print(f"多Sheet报表已生成:{output_path}")

五、增量同步

时间戳增量同步

在实际业务中,全量数据同步往往只在首次初始化时执行一次,后续的日常同步应使用增量方式,仅同步发生变化的数据。最常用的是基于时间戳的增量同步策略:在源数据表和目标表中都维护一个"最后修改时间"字段,每次同步时只拉取该时间戳之后新增或修改的数据。这种策略实现简单、性能优秀,但要求源系统必须可靠地维护时间戳字段,并且在跨时区场景下需要统一转换为UTC时间。时间戳增量同步适合数据只增不改或很少修改的场景,如日志数据、订单记录等。

import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime def incremental_sync_by_timestamp(excel_path, db_url, table_name, timestamp_col): """基于时间戳的增量同步""" engine = create_engine(db_url) # 获取目标表中最大的时间戳 query = f"SELECT MAX({timestamp_col}) FROM {table_name}" last_sync = pd.read_sql(query, engine).iloc[0, 0] if last_sync is None: # 首次同步:全量导入 df = pd.read_excel(excel_path) print("首次同步,全量导入") else: # 仅读取时间戳之后的数据 # 假设Excel数据也可以按时间过滤 df = pd.read_excel(excel_path) df = df[pd.to_datetime(df[timestamp_col]) > last_sync] print(f"增量同步,上次同步时间:{last_sync},本次新增:{len(df)} 条") if not df.empty: df.to_sql(table_name, engine, index=False, if_exists="append", chunksize=500) print(f"同步完成,新增 {len(df)} 条记录") else: print("没有新增数据,跳过同步") return len(df) if not df.empty else 0

自增ID增量与变更数据捕获

自增ID增量同步是另一种常见策略,适用于有序追加的数据(如流水号、日志ID等)。同步时记录上次同步的最大ID值,下次只拉取ID更大的数据。自增ID策略的优点是不依赖时间戳字段,性能极佳,但缺点是无法捕获对已有数据的修改操作。对于需要完整捕获数据变更的场景,可以考虑变更数据捕获(CDC)方案。CDC通过订阅数据库的变更日志(如MySQL的binlog、PostgreSQL的WAL),实时捕获所有INSERT、UPDATE、DELETE操作。虽然在Python中实现完整的CDC较为复杂,但可以借助第三方工具如Debezium、Maxwell或Canal来捕获变更事件,再通过消息队列传递给Python处理程序。

# 基于自增ID的增量同步 import pandas as pd from sqlalchemy import create_engine def sync_by_increment_id(db_url, source_table, target_table, id_col, excel_output): engine = create_engine(db_url) # 获取目标表已同步的最大ID max_id = pd.read_sql(f"SELECT COALESCE(MAX({id_col}), 0) FROM {target_table}", engine).iloc[0, 0] # 拉取增量数据 new_data = pd.read_sql( f"SELECT * FROM {source_table} WHERE {id_col} > {max_id}", engine ) if not new_data.empty: new_data.to_excel(excel_output, index=False) print(f"增量导出 {len(new_data)} 条到 {excel_output}") else: print("无增量数据") return new_data

数据比对与同步日志

数据比对是增量同步的正确性保障。在同步完成后,应对比源端和目标端的记录数、关键指标的汇总值(如总金额、总条数),确保数据一致性。对于数据量较大的场景,可以使用checksum(哈希校验)方式进行快速比对。同步日志记录了每次同步的详细信息,包括开始时间、结束时间、同步类型(全量/增量)、处理记录数、异常记录数和错误详情。完善的同步日志不仅便于问题排查,也是数据审计的重要依据。建议将同步日志写入单独的日志表或日志文件中,并提供查询接口供运维人员查看同步状态。

import logging from datetime import datetime # 配置同步日志 logging.basicConfig( filename="sync_log.txt", level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", encoding="utf-8" ) def validate_sync(engine, table_name, expected_count): """同步后的数据校验""" actual_count = pd.read_sql(f"SELECT COUNT(*) FROM {table_name}", engine).iloc[0, 0] if actual_count == expected_count: logging.info(f"校验通过:{table_name} 共 {actual_count} 条") return True else: logging.warning(f"校验失败:期望 {expected_count} 条,实际 {actual_count} 条") return False

六、数据清洗与映射

列名映射

Excel文件中的列名通常采用中文或非标准命名,而数据库中的字段名往往是英文或遵循特定命名规范。列名映射是数据导入的第一步,需要建立Excel列名到数据库字段名的对应关系。推荐使用字典来定义映射关系,将映射配置外置到JSON或YAML文件中,使得非开发人员也能维护映射规则。在pandas中,可以使用df.rename(columns=mapping)方法快速完成列名替换,然后通过列选择子集确保只导入需要的字段。对于缺失字段,应该设置默认值或触发告警。

import pandas as pd import json # 列名映射配置 column_mapping = { "客户编号": "customer_code", "客户名称": "customer_name", "联系电话": "phone", "电子邮箱": "email", "所在省份": "province", "注册日期": "reg_date", "客户等级": "tier" } # 应用映射 df = pd.read_excel("客户数据.xlsx") df.rename(columns=column_mapping, inplace=True) # 只保留目标字段,缺失字段填充默认值 required_cols = list(column_mapping.values()) for col in required_cols: if col not in df.columns: df[col] = None df = df[required_cols] print("映射后列名:", list(df.columns))

数据类型转换与空值处理

Excel中的数据类型与数据库类型并非一一对应,数据导入时的类型转换是数据质量的关键环节。pandas的astype()方法可以转换数据类型,pd.to_datetime()专门处理日期转换,pd.to_numeric()处理数字转换。空值处理方面,需要针对不同字段制定不同的策略:数值字段可以填充0或平均值,文本字段可以填充""或"未知",关键业务字段(如订单号)的空值应触发告警而非自动填充。pandas的fillna()方法提供了灵活的填充能力,dropna()可以删除空值过多的行。建议为每个字段定义清洗规则,形成完整的数据质量规则库。

# 全面的数据类型转换与空值处理 df = pd.read_excel("orders_import.xlsx") # 日期字段统一转换 date_cols = ["order_date", "delivery_date"] for col in date_cols: df[col] = pd.to_datetime(df[col], errors="coerce") # 数值字段强制转换,无效值设为0 numeric_cols = ["quantity", "unit_price", "total_amount"] for col in numeric_cols: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) # 文本字段去除首尾空格,空值填充默认值 text_cols = ["customer_name", "product_name"] for col in text_cols: df[col] = df[col].astype(str).str.strip().replace("nan", "未知") # 删除关键字段为空的行 df.dropna(subset=["order_id"], inplace=True) print(f"清洗完成,有效数据 {len(df)} 条")

去重策略与编码统一

数据去重是数据清洗中的常见需求,需要根据业务规则确定"重复"的定义。全字段重复是最严格的去重,可以直接使用df.drop_duplicates()。更常见的场景是基于关键字段去重(如客户编号、订单号唯一),可以使用df.drop_duplicates(subset=["customer_code"]),并可以指定保留第一条还是最后一条记录。编码统一是跨系统数据交换的另一个常见问题。Excel文件可能使用GBK、GB2312、UTF-8等不同编码,Python的openpyxl库可以自动处理.xlsx文件的编码,但对于.csv文件则需要显式指定编码格式。建议在数据导入初期就将所有文本统一转换为UTF-8编码,避免后续处理中的编码混乱。

# 综合去重与编码处理示例 import pandas as pd import chardet # 自动检测CSV文件编码 with open("data_export.csv", "rb") as f: encoding = chardet.detect(f.read(10000))["encoding"] print(f"检测到编码:{encoding}") # 读取CSV(指定编码) df = pd.read_csv("data_export.csv", encoding=encoding) # 去重策略 # 1. 完全去重 df_clean = df.drop_duplicates() # 2. 基于关键字段去重(保留最新记录) df_clean = df.sort_values("update_time").drop_duplicates( subset=["customer_code"], keep="last" ) print(f"去重前:{len(df)} 条,去重后:{len(df_clean)} 条")

七、定时同步

schedule库实现轻量定时

对于不需要持久化调度、随应用进程启动和停止的轻量级定时任务,Python的schedule库是理想选择。它提供了简洁的API,使用类似自然语言的方式定义定时规则,如schedule.every().day.at("08:00").do(job)表示每天早上8点执行任务。schedule库适合单进程、无需任务持久化、不需要分布式调度的场景。它的设计哲学是简单易用,但功能相对有限:不支持集群部署、任务失败不会自动重试、调度器本身不具备高可用性。

import schedule import time from datetime import datetime def sync_sales_data(): """同步销售数据的定时任务""" print(f"[{datetime.now()}] 开始同步销售数据...") # 实际同步逻辑 # df = pd.read_excel("sales_today.xlsx") # df.to_sql("sales", engine, if_exists="append") print(f"[{datetime.now()}] 同步完成") # 定义定时任务 schedule.every().day.at("08:00").do(sync_sales_data) schedule.every().day.at("20:00").do(sync_sales_data) schedule.every(30).minutes.do(sync_sales_data) # 每30分钟执行一次 # 循环运行调度器 while True: schedule.run_pending() time.sleep(1)

APScheduler企业级调度

对于需要持久化任务、支持集群部署、提供丰富调度策略的企业级场景,APScheduler(Advanced Python Scheduler)是更强大的选择。APScheduler提供了四种组件:触发器(决定任务何时执行)、任务存储器(保存任务状态)、执行器(运行任务)和调度器(协调前三者)。APScheduler支持cron表达式、间隔触发、日期触发三种触发器类型;支持SQLite、Redis、MongoDB等多种任务存储器,实现任务的持久化和恢复;支持线程池和进程池执行器,可以并发运行多个任务。特别地,当使用SQLite作为任务存储器时,任务定义和执行记录会被持久化到数据库中,即使应用程序重启,未完成的任务也不会丢失。

from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore import logging # 配置日志 logging.basicConfig(level=logging.INFO) # 创建调度器,使用SQLite持久化任务 jobstores = { "default": SQLAlchemyJobStore(url="sqlite:///jobs.db") } scheduler = BlockingScheduler(jobstores=jobstores) # 定义同步任务 def sync_excel_to_mysql(excel_path, table_name): print(f"执行同步:{excel_path} -> {table_name}") # 实际同步逻辑 # df = pd.read_excel(excel_path) # df.to_sql(table_name, engine, if_exists="replace") # 添加定时任务:每天凌晨1点执行 scheduler.add_job( sync_excel_to_mysql, CronTrigger(hour=1, minute=0), args=["daily_sales.xlsx", "daily_sales"], id="sync_sales_job", replace_existing=True ) # 启动调度器 scheduler.start()

配置驱动同步与监控

当管理的数据同步任务增多时,将同步任务的配置(数据源、目标表、同步策略、调度规则等)外置到配置文件中,可以极大提升系统的可维护性。可以设计一个同步任务注册表(可以是数据库表或JSON配置文件),每个任务包含源文件路径、目标数据库连接、表名、同步方式(全量/增量)、调度cron表达式等字段。系统启动时读取配置,动态注册所有同步任务。同步监控方面,可以集成Prometheus指标暴露或简单的健康检查API,实时查看每个同步任务的执行状态、延迟情况和错误率。当同步失败时,通过邮件、钉钉或企业微信发送告警通知。

import json import pandas as pd from sqlalchemy import create_engine # 配置驱动的同步引擎 class SyncEngine: def __init__(self, config_path: str): with open(config_path, "r", encoding="utf-8") as f: self.tasks = json.load(f) def run_task(self, task_name: str): task = self.tasks[task_name] engine = create_engine(task["target_db"]) df = pd.read_excel(task["source_file"], sheet_name=task["sheet"]) if task["sync_mode"] == "incremental": # 增量模式:按时间戳过滤 last = pd.read_sql(f"SELECT MAX({task['timestamp_col']}) FROM {task['table']}", engine) if not last.iloc[0, 0]: last_value = "1970-01-01" df = df[df[task["timestamp_col"]] > last_value] df.to_sql(task["table"], engine, index=False, if_exists="append", chunksize=1000) return {"task": task_name, "rows": len(df), "status": "success"} def run_all(self): results = [] for task_name in self.tasks: results.append(self.run_task(task_name)) return results # 使用示例 engine = SyncEngine("sync_tasks.json") results = engine.run_all() print(json.dumps(results, ensure_ascii=False, indent=2))

八、实战案例

案例一:每日销售数据入库

某电商公司每天需要将各门店的销售Excel报表汇总导入到MySQL数据库中,用于后续的数据分析和BI展示。数据文件以日期命名(如"2025-05-05_门店销售.xlsx"),包含订单号、门店名称、产品名称、销售数量、单价、总金额和销售日期等字段。要求每天晚上11点自动导入当天数据,导入前需校验数据的完整性和一致性,导入后发送导入结果通知。我们使用APScheduler实现定时调度,pandas实现数据读取和入库,并加入了数据校验和日志记录功能。系统运行半年来,日均处理约5万条销售记录,导入成功率保持在99.8%以上。

import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime, timedelta import logging logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(message)s") def import_daily_sales(): """每日销售数据导入任务""" today = datetime.now().strftime("%Y-%m-%d") excel_path = f"/data/sales_reports/{today}_门店销售.xlsx" engine = create_engine("mysql+pymysql://user:pass@localhost:3306/sales_db") try: # 1. 读取Excel df = pd.read_excel(excel_path) logging.info(f"读取 {excel_path},共 {len(df)} 条记录") # 2. 数据清洗 df["sales_date"] = pd.to_datetime(df["sales_date"]) df["total_amount"] = pd.to_numeric(df["total_amount"], errors="coerce") df.dropna(subset=["order_no"], inplace=True) # 3. 写入数据库 with engine.begin() as conn: df.to_sql("daily_sales", conn, index=False, if_exists="append", chunksize=500) # 4. 数据校验 count = pd.read_sql("SELECT COUNT(*) FROM daily_sales WHERE sales_date = :d", engine, params={"d": today}).iloc[0, 0] logging.info(f"导入成功!数据库中共 {count} 条今日销售记录") return True except FileNotFoundError: logging.warning(f"今日销售报表未找到:{excel_path}") return False except Exception as e: logging.error(f"导入失败:{e}") return False

案例二:报表数据导出Excel

某集团公司财务部每月需要从PostgreSQL数据库中导出各类财务报表,包括利润表、资产负债表、现金流量表等。要求导出的Excel文件格式美观,表头加粗加背景色,数字格式保留两位小数,日期字段格式化为"YYYY年MM月DD日",并且冻结首行方便查看。我们使用pandas读取SQL查询结果,通过openpyxl对生成的Excel进行格式化处理,实现了多Sheet一键生成完整财务报告包。该方案上线后,财务人员每月手动处理报表的时间从2天缩短到10分钟,大幅提升了工作效率。

import pandas as pd from sqlalchemy import create_engine from openpyxl.styles import Font, PatternFill, Alignment, Border, Side, numbers from openpyxl.utils import get_column_letter engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/finance_db") def export_monthly_report(year, month): output_path = f"财务报表_{year}年{month:02d}月.xlsx" # 定义各个报表的SQL查询 reports = { "利润表": "SELECT * FROM v_profit_statement WHERE year=:y AND month=:m", "资产负债表": "SELECT * FROM v_balance_sheet WHERE report_date=:d", "现金流量表": "SELECT * FROM v_cash_flow WHERE year=:y AND month=:m" } header_font = Font(bold=True, size=11, color="FFFFFF") header_fill = PatternFill(start_color="2E7D32", end_color="2E7D32", fill_type="solid") thin_border = Border( left=Side(style="thin"), right=Side(style="thin"), top=Side(style="thin"), bottom=Side(style="thin") ) with pd.ExcelWriter(output_path, engine="openpyxl") as writer: for sheet_name, query in reports.items(): params = {"y": year, "m": month, "d": f"{year}-{month:02d}-01"} df = pd.read_sql(query, engine, params=params) df.to_excel(writer, sheet_name=sheet_name, index=False, startrow=0) # 格式化样式 ws = writer.sheets[sheet_name] for cell in ws[1]: cell.font = header_font cell.fill = header_fill cell.alignment = Alignment(horizontal="center") cell.border = thin_border print(f"财务报告已生成:{output_path}")

案例三:客户数据批量导入系统

某CRM系统需要支持从Excel批量导入客户数据,设计了一套完整的导入流程:用户上传Excel文件 → 前端预览数据 → 后端校验清洗 → 写入数据库 → 返回导入结果。系统需要处理大量边缘情况:重复客户检测(按手机号或邮箱去重)、地址标准化、省份城市自动补全、非法字符过滤等。对于导入失败的数据,需要生成详细的错误报告Excel,标注每条记录的具体错误原因,方便用户修正后重新导入。该系统的核心是数据校验模块,它定义了20余条校验规则,覆盖格式校验、业务逻辑校验和关联数据校验三个层级,确保导入数据的质量。

import pandas as pd from sqlalchemy import create_engine, text import re class CustomerImporter: def __init__(self, db_url): self.engine = create_engine(db_url) def validate_row(self, row): """单行数据校验,返回错误信息列表""" errors = [] if pd.isna(row.get("phone")): errors.append("手机号不能为空") elif not re.match(r"^1\d{10}$", str(row["phone"])): errors.append("手机号格式不正确") if pd.isna(row.get("name")): errors.append("客户姓名不能为空") return errors def batch_import(self, excel_path): df = pd.read_excel(excel_path) success_rows, fail_rows = [], [] for idx, row in df.iterrows(): errors = self.validate_row(row) if errors: row["错误原因"] = "; ".join(errors) fail_rows.append(row) else: success_rows.append(row) # 写入成功数据 if success_rows: df_success = pd.DataFrame(success_rows) df_success.to_sql("customers", self.engine, index=False, if_exists="append") # 导出失败记录 if fail_rows: df_fail = pd.DataFrame(fail_rows) df_fail.to_excel("导入失败记录.xlsx", index=False) return { "total": len(df), "success": len(success_rows), "fail": len(fail_rows), "fail_file": "导入失败记录.xlsx" if fail_rows else None } # 使用示例 importer = CustomerImporter("mysql+pymysql://user:pass@localhost:3306/crm") result = importer.batch_import("客户导入模板.xlsx") print(f"导入结果:总计 {result['total']} 条,成功 {result['success']} 条,失败 {result['fail']} 条")