← 返回自动化办公目录
← 返回学习笔记首页
专题: Python 自动化办公系统学习
关键词: Python, 自动化办公, 项目实战, 办公自动化, 综合项目, 系统设计, 自动化流程, Python, Jenkins
一、项目实战概述
综合办公自动化项目实战是将前序各章所学技术(Excel自动化、Word文档处理、PDF操作、邮件收发、定时任务、GUI桌面应用等)融合于一体的关键环节。独立的技术点只有在真实项目中串联起来,才能发挥出真正的生产力价值。本章旨在帮助学习者建立全局视野,掌握从需求分析到系统交付的完整项目流程。
1.1 技术栈整合策略
在一个完整的办公自动化项目中,通常需要同时调用多个技术栈协同工作。例如:使用 openpyxl 读取各部门上报的Excel数据,通过 python-docx 生成结构化周报Word文档,利用 pdfkit / reportlab 将成果转为PDF归档,再借助 smtplib / yagmail 通过邮件自动分发报告。定时触发层面可以使用 schedule 库或操作系统原生任务计划程序。如果需要图形界面,还可以嵌入 PyQt5 / tkinter 提供操作面板。
# 技术栈整合配置示例
PROJECT_STACK = {
"data_layer": {
"excel": "openpyxl >= 3.0",
"csv": "csv (built-in)",
"database": "sqlite3 / pymysql"
},
"document_layer": {
"word": "python-docx >= 0.8",
"pdf": "reportlab / pdfkit",
"template": "Jinja2 >= 3.0"
},
"communication_layer": {
"email": "yagmail >= 0.15",
"wechat_robot": "requests (webhook)",
"dingtalk": "requests (webhook)"
},
"schedule_layer": {
"python_scheduler": "schedule >= 1.0",
"os_scheduler": "windows_task / cron"
},
"monitor_layer": {
"logging": "loguru >= 0.7",
"alert": "smtplib + twilio (optional)"
}
}
def check_environment():
"""检查项目运行环境是否满足依赖"""
import importlib
missing = []
for layer, tools in PROJECT_STACK.items():
for tool, spec in tools.items():
try:
importlib.import_module(tool.split("[")[0])
print(f"[OK] {tool} - {spec}")
except ImportError:
missing.append((tool, spec))
print(f"[MISS] {tool} - {spec}")
if missing:
print(f"\n缺少 {len(missing)} 个依赖,请执行: pip install -r requirements.txt")
return len(missing) == 0
1.2 项目开发流程
办公自动化项目建议采用敏捷迭代方式推进:第一阶段完成核心数据读取与基本报告生成(MVP);第二阶段加入邮件分发和定时调度;第三阶段完善异常处理、日志监控和配置管理;第四阶段根据用户反馈持续优化。每个阶段都应有明确的验收标准和测试用例。
# 项目开发流程管理
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
class Phase(Enum):
MVP = "最小可行产品"
ENHANCE = "功能增强"
POLISH = "完善优化"
ITERATE = "持续迭代"
@dataclass
class ProjectMilestone:
phase: Phase
tasks: list
deadline: datetime
acceptance_criteria: list
def is_completed(self, completed_tasks: set) -> bool:
return all(t in completed_tasks for t in self.tasks)
# 定义四个阶段的任务
project_plan = [
ProjectMilestone(
phase=Phase.MVP,
tasks=["数据读取模块", "基本报告生成", "本地保存"],
deadline=datetime(2026, 6, 1),
acceptance_criteria=["能正确读取3种格式的数据源", "生成Word文档", "文档格式正确"]
),
ProjectMilestone(
phase=Phase.ENHANCE,
tasks=["邮件集成", "定时任务", "多模板支持"],
deadline=datetime(2026, 6, 15),
acceptance_criteria=["邮件发送成功", "定时触发准确", "模板切换正常"]
)
]
def generate_gantt_chart(plan):
"""生成项目甘特图文本表示"""
print("=" * 60)
print(f"{'阶段':<20} {'任务':<30} {'截止日期':<15}")
print("=" * 60)
for milestone in plan:
tasks_str = ", ".join(milestone.tasks[:2])
if len(milestone.tasks) > 2:
tasks_str += f"...(+{len(milestone.tasks)-2})"
print(f"{milestone.phase.value:<20} {tasks_str:<30} "
f"{milestone.deadline.strftime('%m/%d'):<15}")
print("=" * 60)
1.3 需求分析方法
在启动任何办公自动化项目之前,必须进行充分的需求调研。重点关注以下维度:现有的手工流程是什么样的?哪些环节最耗时?数据源来自哪些系统/部门?期望的输出格式和分发渠道是什么?异常情况如何处理?是否有合规性要求(如数据脱敏、审计留痕)?通过5W1H分析法(What/Why/Who/When/Where/How)梳理需求,形成需求文档并获得相关方确认,是项目成功的基础。
二、自动日报/周报系统
日报/周报自动生成系统是企业办公自动化中最常见也最具价值的场景之一。传统的手工填报方式存在数据分散、格式不统一、提交不及时、主管汇总费时等痛点。通过自动化手段,可以从多个业务系统中抽取数据,按统一模板生成报告,并通过邮件或IM工具定时推送给相关人员。
2.1 数据源整合
日报系统的数据通常来自多个部门的多张Excel表格。例如:销售部的"每日业绩表"、运营部的"渠道流量表"、客服部的"工单处理统计表"。这些表格的格式可能各不相同,需要先进行统一的解析和规范化处理。建议为每个部门定义一个数据提取器(Data Extractor),将异构数据转换为统一的内部数据结构。
import openpyxl
from pathlib import Path
from datetime import date
from dataclasses import dataclass, asdict
from typing import List, Optional
import csv
import json
@dataclass
class DailyReportRow:
"""统一的日报数据行模型"""
department: str
reporter: str
report_date: date
kpi_name: str
kpi_value: float
unit: str
remark: Optional[str] = None
class ExcelDataExtractor:
"""从Excel文件中提取日报数据"""
def __init__(self, file_path: str, department: str):
self.file_path = Path(file_path)
self.department = department
self.wb = openpyxl.load_workbook(file_path, data_only=True)
def extract_sales_data(self) -> List[DailyReportRow]:
"""提取销售部数据"""
ws = self.wb.active
records = []
for row in ws.iter_rows(min_row=3, values_only=True):
if row[0] is None:
continue
records.append(DailyReportRow(
department=self.department,
reporter=row[1],
report_date=row[0],
kpi_name="销售额",
kpi_value=float(row[2] or 0),
unit="元",
remark=row[3]
))
return records
def extract_operation_data(self) -> List[DailyReportRow]:
"""提取运营部数据"""
ws = self.wb["流量数据"]
records = []
for row in ws.iter_rows(min_row=2, max_col=5, values_only=True):
if not row[0]:
continue
records.append(DailyReportRow(
department=self.department,
reporter=row[1],
report_date=row[0],
kpi_name=row[2],
kpi_value=float(row[3] or 0),
unit=row[4]
))
return records
def get_all_records(self) -> List[DailyReportRow]:
"""根据部门自动选择提取器"""
extractors = {
"销售部": self.extract_sales_data,
"运营部": self.extract_operation_data,
}
extractor = extractors.get(self.department)
if not extractor:
raise ValueError(f"不支持的部门: {self.department}")
return extractor()
2.2 报告自动生成与分发
数据提取完毕后,接下来需要将数据按照模板生成日报文档。这里可以采用python-docx操作Word模板,将数据填充到预定义的表格和文本占位符中。生成后通过yagmail库自动发送邮件,支持HTML正文和附件。对于异常数据(如销售额突降超过20%),系统自动在报告中标注预警,并额外发送告警通知。
import yagmail
from docx import Document
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from typing import List
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
class ReportGenerator:
"""日报/周报自动生成器"""
def __init__(self, template_path: str):
self.template_path = template_path
self.doc = Document(template_path)
def fill_summary(self, total_revenue: float, total_orders: int, abnormal_count: int):
"""填充摘要数据"""
summary_text = (
f"本日总销售额: {total_revenue:,.2f} 元 | "
f"总订单数: {total_orders} | "
f"异常数据项: {abnormal_count} 项"
)
for paragraph in self.doc.paragraphs:
if "{{summary}}" in paragraph.text:
paragraph.text = paragraph.text.replace("{{summary}}", summary_text)
if abnormal_count > 0:
run = paragraph.runs[0]
run.font.color.rgb = RGBColor(255, 0, 0)
def add_data_table(self, records: List[DailyReportRow]):
"""在文档中添加数据表格"""
table = self.doc.add_table(rows=1, cols=6, style="Light Grid Accent 1")
header = table.rows[0].cells
headers = ["部门", "填报人", "日期", "指标", "数值", "备注"]
for i, h in enumerate(headers):
header[i].text = h
for record in records:
row = table.add_row().cells
row[0].text = record.department
row[1].text = record.reporter
row[2].text = record.report_date.strftime("%Y-%m-%d")
row[3].text = record.kpi_name
row[4].text = f"{record.kpi_value:,.2f} {record.unit}"
row[5].text = record.remark or ""
def save(self, output_path: str):
self.doc.save(output_path)
class EmailDispatcher:
"""邮件自动分发器"""
def __init__(self, config: dict):
self.sender = config["sender"]
self.password = config["password"]
self.smtp_server = config.get("smtp_server", "smtp.qq.com")
self.smtp_port = config.get("smtp_port", 465)
def send_report(self, recipients: List[str], subject: str,
body: str, attachment_path: str):
"""发送带附件的报告邮件"""
msg = MIMEMultipart()
msg["From"] = self.sender
msg["To"] = ", ".join(recipients)
msg["Subject"] = subject
msg.attach(MIMEText(body, "html", "utf-8"))
with open(attachment_path, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition",
f"attachment; filename={attachment_path}")
msg.attach(part)
with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server:
server.login(self.sender, self.password)
server.send_message(msg)
def send_alert(self, recipient: str, alert_message: str):
"""发送异常告警"""
subject = "【系统预警】日报数据异常通知"
html = f"""
数据异常预警
{alert_message}
请相关同事及时核查处理。
系统自动发送,请勿回复
"""
self.send_report([recipient], subject, html, attachment_path="")
2.3 定时调度整合
将以上所有步骤串联起来,通过 schedule 库设置每日定时执行。建议在每天固定时间(如上午9:30)执行数据提取-报告生成-邮件分发全流程。同时记录每次执行的日志,便于排查问题。对于节假日或周末,可以配置自动跳过逻辑。
import schedule
import time
from datetime import datetime, date
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("daily_report.log", encoding="utf-8"),
logging.StreamHandler()
]
)
class DailyReportPipeline:
"""日报全流程调度管道"""
def __init__(self, config: dict):
self.config = config
self.generator = ReportGenerator(config["template_path"])
self.dispatcher = EmailDispatcher(config["email"])
self.data_dir = config["data_dir"]
def run(self):
"""执行日报生成全流程"""
try:
logging.info("开始执行日报生成流程...")
today = date.today()
# 跳过周末
if today.weekday() >= 5:
logging.info(f"今天是周末({today}),跳过日报生成")
return
# 1. 提取数据
all_records = []
data_sources = {
"销售部": f"{self.data_dir}/sales_{today}.xlsx",
"运营部": f"{self.data_dir}/ops_{today}.xlsx"
}
for dept, path in data_sources.items():
extractor = ExcelDataExtractor(path, dept)
records = extractor.get_all_records()
all_records.extend(records)
logging.info(f"从 {dept} 提取了 {len(records)} 条记录")
# 2. 异常检测
abnormal = [r for r in all_records if r.kpi_value < 0]
if abnormal:
logging.warning(f"发现 {len(abnormal)} 条异常数据")
for rec in abnormal[:3]:
self.dispatcher.send_alert(
self.config["manager_email"],
f"{rec.department}-{rec.reporter}: {rec.kpi_name} 异常 ({rec.kpi_value})"
)
# 3. 生成报告
total_revenue = sum(r.kpi_value for r in all_records
if r.kpi_name == "销售额")
report_path = f"日报_{today.strftime('%Y%m%d')}.docx"
self.generator.fill_summary(total_revenue, len(all_records), len(abnormal))
self.generator.add_data_table(all_records)
self.generator.save(report_path)
logging.info(f"报告已生成: {report_path}")
# 4. 邮件分发
self.dispatcher.send_report(
recipients=self.config["report_recipients"],
subject=f"【日报】{today.strftime('%Y-%m-%d')} 业务数据报告",
body=self._build_html_body(all_records),
attachment_path=report_path
)
logging.info("日报邮件已发送")
except Exception as e:
logging.error(f"日报生成失败: {e}", exc_info=True)
self.dispatcher.send_alert(
self.config["admin_email"],
f"日报自动生成系统异常: {str(e)}"
)
def _build_html_body(self, records):
"""构建HTML邮件正文"""
rows = "".join(
f"
{r.department} {r.kpi_name} "
f"{r.kpi_value:,.2f} {r.unit} "
for r in records[:10]
)
return f"""
今日业务摘要
共收集 {len(records)} 条数据记录
详细报告请见附件。
"""
# 调度配置
pipeline = DailyReportPipeline({
"template_path": "templates/daily_report_template.docx",
"data_dir": "data/daily/",
"email": {"sender": "reporter@company.com", "password": "***"},
"report_recipients": ["manager@company.com", "boss@company.com"],
"manager_email": "manager@company.com",
"admin_email": "admin@company.com"
})
# 定时任务:每个工作日9:30执行
schedule.every().monday.at("09:30").do(pipeline.run)
schedule.every().tuesday.at("09:30").do(pipeline.run)
schedule.every().wednesday.at("09:30").do(pipeline.run)
schedule.every().thursday.at("09:30").do(pipeline.run)
schedule.every().friday.at("09:30").do(pipeline.run)
if __name__ == "__main__":
logging.info("日报自动系统已启动,等待定时触发...")
while True:
schedule.run_pending()
time.sleep(30)
三、审批流程自动化
企业日常运转中充斥着大量审批流程:请假审批、报销审批、采购审批、用章审批等。传统纸质审批流转慢、易丢失、难追溯。审批流程自动化系统可以将这些流程数字化,通过预设的审批链自动流转,结合企业微信、钉钉或邮件实现即时通知,大幅提升审批效率。
3.1 审批流程引擎设计
核心设计思路是将审批流程抽象为"节点链":每个审批单经过一系列审批节点,每个节点有对应的审批人和处理规则。支持会签(多人同时审批都需通过)、或签(一人通过即可)、条件分支(金额大于某值需上级审批)等常见模式。
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from datetime import datetime
import uuid
class ApprovalNodeType(Enum):
"""审批节点类型"""
SINGLE = "单人审批" # 单人审批即可
AND_SIGN = "会签" # 所有人审批通过
OR_SIGN = "或签" # 一人通过即可
CC = "抄送" # 仅通知
class ApprovalStatus(Enum):
PENDING = "待审批"
APPROVED = "已通过"
REJECTED = "已驳回"
WITHDRAWN = "已撤回"
@dataclass
class ApprovalNode:
"""审批节点"""
node_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
node_name: str = "审批节点"
node_type: ApprovalNodeType = ApprovalNodeType.SINGLE
approvers: List[str] = field(default_factory=list)
condition: Optional[Callable] = None # 条件函数,返回True才执行
order: int = 0 # 节点顺序
@dataclass
class ApprovalRequest:
"""审批请求"""
request_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
title: str = ""
content: dict = field(default_factory=dict)
requester: str = ""
create_time: datetime = field(default_factory=datetime.now)
status: ApprovalStatus = ApprovalStatus.PENDING
current_node_index: int = 0
history: List[dict] = field(default_factory=list)
class ApprovalEngine:
"""审批流程引擎"""
def __init__(self, name: str):
self.name = name
self.nodes: List[ApprovalNode] = []
self.requests: dict = {}
def add_node(self, node: ApprovalNode):
self.nodes.append(node)
# 按order排序
self.nodes.sort(key=lambda n: n.order)
def submit(self, request: ApprovalRequest) -> str:
"""提交审批请求"""
request.status = ApprovalStatus.PENDING
request.current_node_index = 0
self.requests[request.request_id] = request
self._notify_approvers(request)
return request.request_id
def approve(self, request_id: str, approver: str, comment: str = ""):
"""审批通过"""
request = self.requests[request_id]
current_node = self.nodes[request.current_node_index]
request.history.append({
"action": "approve",
"node": current_node.node_name,
"approver": approver,
"comment": comment,
"time": datetime.now()
})
# 判断是否还有下一节点
if request.current_node_index + 1 >= len(self.nodes):
request.status = ApprovalStatus.APPROVED
self._on_completed(request)
else:
request.current_node_index += 1
next_node = self.nodes[request.current_node_index]
if next_node.node_type == ApprovalNodeType.CC:
self._cc_notify(next_node, request)
request.current_node_index += 1
self._notify_approvers(request)
def reject(self, request_id: str, approver: str, reason: str):
"""驳回审批"""
request = self.requests[request_id]
request.status = ApprovalStatus.REJECTED
request.history.append({
"action": "reject",
"approver": approver,
"reason": reason,
"time": datetime.now()
})
self._notify_requester(request, f"您的审批已被驳回: {reason}")
def _notify_approvers(self, request: ApprovalRequest):
"""通知当前节点的审批人"""
node = self.nodes[request.current_node_index]
msg = f"您有新的审批任务: {request.title}"
for approver in node.approvers:
print(f"[通知] {approver}: {msg}")
# 实际项目中调用企业微信/邮件接口
def _notify_requester(self, request: ApprovalRequest, message: str):
"""通知申请人"""
print(f"[通知] {request.requester}: {message}")
def _on_completed(self, request: ApprovalRequest):
"""审批完成回调"""
print(f"[完成] 审批单 {request.request_id} 已全部通过")
# 可在此触发后续流程(如生成报表、更新状态)
def _cc_notify(self, node: ApprovalNode, request: ApprovalRequest):
"""抄送通知"""
msg = f"审批单 {request.title} 已通过,请知悉"
for cc_user in node.approvers:
print(f"[抄送] {cc_user}: {msg}")
3.2 表单自动生成与通知
审批表单可以根据预设模板自动生成。例如,请假审批单自动从系统中获取申请人的剩余假期、部门主管信息,填充到表单中。表单生成后,通过企业微信机器人webhook或邮件将审批链接推送给审批人。审批人在链接中点击"通过"或"驳回"后,系统自动更新状态并将结果通知申请人。
import requests
import json
from docx import Document
class ApprovalFormGenerator:
"""审批表单自动生成器"""
def __init__(self, template_path: str):
self.template_path = template_path
def generate_leave_form(self, applicant: str, department: str,
leave_type: str, start_date: str,
end_date: str, reason: str) -> str:
"""生成请假审批单"""
doc = Document(self.template_path)
placeholders = {
"{{applicant}}": applicant,
"{{department}}": department,
"{{leave_type}}": leave_type,
"{{start_date}}": start_date,
"{{end_date}}": end_date,
"{{reason}}": reason,
"{{apply_date}}": datetime.now().strftime("%Y-%m-%d")
}
for para in doc.paragraphs:
for key, value in placeholders.items():
if key in para.text:
para.text = para.text.replace(key, value)
output_path = f"审批单_{applicant}_{datetime.now().strftime('%Y%m%d%H%M%S')}.docx"
doc.save(output_path)
return output_path
class WeChatRobotNotifier:
"""企业微信机器人通知"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_approval_notification(self, approver: str, title: str,
form_url: str, urgent: bool = False):
"""发送审批通知到企业微信"""
level = "【紧急】" if urgent else ""
message = {
"msgtype": "markdown",
"markdown": {
"content": (
f"### {level}审批通知\n"
f"> **主题**: {title}\n"
f"> **审批人**: {approver}\n"
f"> **请及时处理**: [点击审批]({form_url})\n"
)
}
}
resp = requests.post(
self.webhook_url,
data=json.dumps(message),
headers={"Content-Type": "application/json"}
)
return resp.status_code == 200
def send_result_notification(self, recipient: str, title: str,
result: str, comment: str = ""):
"""发送审批结果通知"""
message = {
"msgtype": "markdown",
"markdown": {
"content": (
f"### 审批结果通知\n"
f"> **主题**: {title}\n"
f"> **结果**: {'✅ 已通过' if result == 'approved' else '❌ 已驳回'}\n"
f"> **说明**: {comment if comment else '无'}\n"
f"> **申请人**: {recipient}\n"
)
}
}
requests.post(
self.webhook_url,
data=json.dumps(message),
headers={"Content-Type": "application/json"}
)
3.3 审批数据统计看板
所有审批数据汇总到数据库中,可以生成多维度统计看板:各部门审批量趋势、平均审批耗时、各审批人处理效率、驳回率分析等。这些数据对管理层优化审批流程、提高组织效率具有重要参考价值。结合openpyxl可以定期导出审批统计报表,或通过HTML模板生成可视化的审批数据看板。
import sqlite3
from datetime import datetime, timedelta
class ApprovalStats:
"""审批数据统计"""
def __init__(self, db_path: str = "approval.db"):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS approval_records (
id TEXT PRIMARY KEY,
title TEXT,
requester TEXT,
department TEXT,
amount REAL,
status TEXT,
create_time TIMESTAMP,
complete_time TIMESTAMP,
total_hours REAL
)
""")
self.conn.commit()
def department_stats(self, days: int = 30):
"""按部门统计审批量"""
since = datetime.now() - timedelta(days=days)
cursor = self.conn.execute("""
SELECT department,
COUNT(*) as total,
SUM(CASE WHEN status='APPROVED' THEN 1 ELSE 0 END) as approved,
ROUND(AVG(total_hours), 1) as avg_hours
FROM approval_records
WHERE create_time >= ?
GROUP BY department
ORDER BY total DESC
""", (since,))
return cursor.fetchall()
def approver_efficiency(self):
"""审批人效率统计"""
cursor = self.conn.execute("""
SELECT approver,
COUNT(*) as processed,
ROUND(AVG(processing_hours), 1) as avg_hours
FROM approval_actions
GROUP BY approver
ORDER BY avg_hours ASC
""")
return cursor.fetchall()
def export_to_excel(self, output_path: str):
"""导出审批统计到Excel"""
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
ws.title = "审批统计"
# 标题行
ws.append(["部门", "审批总数", "通过数", "驳回数", "平均耗时(小时)"])
for row in self.department_stats(90):
ws.append(list(row))
# 自动调整列宽
for col in ws.columns:
max_length = max(len(str(cell.value or "")) for cell in col)
ws.column_dimensions[col[0].column_letter].width = max_length + 4
wb.save(output_path)
return output_path
四、数据报表与看板
数据报表与可视化看板是企业决策层最常用的管理工具。自动化报表系统能够从多个数据源(ERP、CRM、财务系统等)抽取数据,经过清洗和归并后,按预设模板生成统计报表,并通过图表直观展示关键指标。结合Web技术,还可以构建实时刷新的数据看板,让管理层随时掌握业务动态。
4.1 多数据源归并引擎
企业数据往往分散在不同系统中。ERP系统输出的是CSV格式的销售明细,CRM系统提供API接口获取客户数据,财务系统使用Excel报表。多数据源归并引擎的核心能力是将异构数据统一成标准格式,并按时间、部门、产品等维度进行聚合计算。
import pandas as pd
from typing import List, Dict, Any
from abc import ABC, abstractmethod
import csv
import json
class DataSource(ABC):
"""数据源抽象基类"""
@abstractmethod
def fetch(self, date_range: tuple) -> pd.DataFrame:
pass
class CSVDataSource(DataSource):
"""CSV文件数据源"""
def __init__(self, file_pattern: str, name: str):
self.file_pattern = file_pattern
self.name = name
def fetch(self, date_range: tuple) -> pd.DataFrame:
import glob
files = glob.glob(self.file_pattern)
dfs = []
for f in files:
df = pd.read_csv(f, encoding="utf-8")
dfs.append(df)
if not dfs:
return pd.DataFrame()
result = pd.concat(dfs, ignore_index=True)
# 按日期范围过滤
start, end = date_range
if "日期" in result.columns:
result["日期"] = pd.to_datetime(result["日期"])
result = result[(result["日期"] >= start) & (result["日期"] <= end)]
return result
class APIDataSource(DataSource):
"""API接口数据源"""
def __init__(self, api_url: str, api_key: str, name: str):
self.api_url = api_url
self.api_key = api_key
self.name = name
def fetch(self, date_range: tuple) -> pd.DataFrame:
params = {
"start_date": date_range[0].strftime("%Y-%m-%d"),
"end_date": date_range[1].strftime("%Y-%m-%d")
}
headers = {"Authorization": f"Bearer {self.api_key}"}
resp = requests.get(self.api_url, params=params, headers=headers)
data = resp.json()
return pd.DataFrame(data.get("records", []))
class DataMergeEngine:
"""多数据源归并引擎"""
def __init__(self):
self.sources: Dict[str, DataSource] = {}
def register_source(self, name: str, source: DataSource):
self.sources[name] = source
def merge(self, date_range: tuple,
join_key: str = "日期") -> pd.DataFrame:
"""归并所有数据源"""
all_data = []
for name, source in self.sources.items():
df = source.fetch(date_range)
df["数据来源"] = name
all_data.append(df)
print(f"[{name}] 获取 {len(df)} 条记录")
if not all_data:
return pd.DataFrame()
merged = pd.concat(all_data, ignore_index=True, sort=False)
# 去重(同一条数据可能出现在多个源中)
if join_key in merged.columns:
merged = merged.drop_duplicates(subset=[join_key, "数据来源"])
return merged
4.2 图表自动化生成
数据可视化是报表系统的关键组成部分。Python生态中有多个优秀的图表库:matplotlib适合静态出版物级图表;plotly适合交互式Web图表;pyecharts则提供了丰富的中国式报表图表风格。在自动化报表系统中,建议将图表生成封装为独立模块,支持一键输出PNG图片或嵌入HTML。
import matplotlib
matplotlib.use("Agg") # 无头模式,服务器环境必需
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
from io import BytesIO
import base64
class ChartGenerator:
"""图表自动生成器"""
def __init__(self, style: str = "seaborn-v0_8-whitegrid"):
plt.style.use(style)
plt.rcParams["font.sans-serif"] = ["SimHei", "Microsoft YaHei"]
plt.rcParams["axes.unicode_minus"] = False
def revenue_trend(self, dates: list, amounts: list,
title: str = "收入趋势图") -> str:
"""生成收入趋势折线图,返回base64图片"""
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(dates, amounts, marker="o", linewidth=2, color="#2e7d32")
ax.fill_between(dates, amounts, alpha=0.15, color="#2e7d32")
ax.set_title(title, fontsize=14, pad=15)
ax.set_xlabel("日期")
ax.set_ylabel("金额(元)")
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))
fig.autofmt_xdate()
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=150)
plt.close()
buf.seek(0)
return base64.b64encode(buf.read()).decode()
def department_pie(self, labels: list, values: list,
title: str = "部门业绩占比") -> str:
"""生成部门业绩饼图"""
fig, ax = plt.subplots(figsize=(8, 8))
colors = ["#2e7d32", "#388e3c", "#66bb6a", "#a5d6a7", "#c8e6c9"]
wedges, texts, autotexts = ax.pie(
values, labels=labels, autopct="%1.1f%%",
colors=colors[:len(labels)], startangle=90,
textprops={"fontsize": 11}
)
ax.set_title(title, fontsize=14, pad=15)
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=150)
plt.close()
buf.seek(0)
return base64.b64encode(buf.read()).decode()
def comparison_bar(self, categories: list, series1: list,
series2: list, labels: tuple = ("本期", "上期"),
title: str = "同比对比图") -> str:
"""生成同比对比柱状图"""
fig, ax = plt.subplots(figsize=(10, 5))
x = np.arange(len(categories))
width = 0.35
bars1 = ax.bar(x - width/2, series1, width, label=labels[0], color="#2e7d32")
bars2 = ax.bar(x + width/2, series2, width, label=labels[1], color="#66bb6a")
ax.set_title(title, fontsize=14, pad=15)
ax.set_xticks(x)
ax.set_xticklabels(categories)
ax.legend()
plt.tight_layout()
buf = BytesIO()
plt.savefig(buf, format="png", dpi=150)
plt.close()
buf.seek(0)
return base64.b64encode(buf.read()).decode()
4.3 Web可视化看板
除了静态报表外,可以使用Flask或FastAPI构建轻量级Web看板。看板页面通过定时AJAX请求刷新数据,使用ECharts或Plotly.js在前端渲染交互式图表。看板可以按角色展示不同内容:管理层看全局KPI、部门经理看部门数据、一线员工看个人绩效。权限控制通过简单的Token或LDAP集成实现。
# Web看板核心路由示例 (Flask)
"""
from flask import Flask, jsonify, render_template
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
# 全局数据引擎实例
merge_engine = DataMergeEngine()
chart_gen = ChartGenerator()
@app.route("/api/dashboard/summary")
def dashboard_summary():
\"\"\"看板摘要数据API\"\"\"
from datetime import datetime, timedelta
end = datetime.now()
start = end - timedelta(days=30)
df = merge_engine.merge((start, end))
summary = {
"total_revenue": float(df["金额"].sum()) if "金额" in df else 0,
"total_orders": int(df["订单号"].nunique()) if "订单号" in df else 0,
"avg_order_value": float(df["金额"].mean()) if "金额" in df else 0,
"active_departments": int(df["部门"].nunique()) if "部门" in df else 0,
"data_date": end.strftime("%Y-%m-%d %H:%M")
}
return jsonify(summary)
@app.route("/api/dashboard/trend")
def dashboard_trend():
\"\"\"趋势图数据API\"\"\"
end = datetime.now()
start = end - timedelta(days=30)
df = merge_engine.merge((start, end))
if "日期" in df and "金额" in df:
daily = df.groupby(df["日期"].dt.date)["金额"].sum()
chart_data = {
"dates": [str(d) for d in daily.index],
"values": [float(v) for v in daily.values]
}
else:
chart_data = {"dates": [], "values": []}
return jsonify(chart_data)
@app.route("/dashboard")
def dashboard_page():
\"\"\"看板页面\"\"\"
return render_template("dashboard.html")
"""
五、文档批量处理工厂
在企业运营中,大量文档处理工作具有高度重复性:合同批量生成、通知书批量打印、证书批量制作、报价单批量输出等。文档批量处理工厂的核心思想是将文档模板与数据分离,通过占位符替换引擎实现"一份模板 + 多组数据 = 批量文档"的高效产出模式。
5.1 文档模板管理系统
一套完善的模板管理系统需要支持模板的上传、分类、版本管理、占位符定义和预览功能。模板中通过 {{placeholder}} 格式定义变量占位符,系统自动扫描模板中的所有占位符并生成数据录入表单。模板可以采用Word格式(便于非技术人员编辑)或HTML格式(便于Web渲染)。
import re
import json
from docx import Document
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
class TemplateManager:
"""文档模板管理器"""
def __init__(self, template_dir: str = "templates"):
self.template_dir = Path(template_dir)
self.template_dir.mkdir(parents=True, exist_ok=True)
def scan_placeholders(self, template_path: str) -> List[str]:
"""扫描模板中的所有占位符"""
doc = Document(template_path)
placeholders = set()
pattern = re.compile(r"\{\{(\w+)\}\}")
for para in doc.paragraphs:
found = pattern.findall(para.text)
placeholders.update(found)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
found = pattern.findall(cell.text)
placeholders.update(found)
return sorted(placeholders)
def list_templates(self) -> List[dict]:
"""列出所有可用模板"""
templates = []
for f in self.template_dir.glob("*.docx"):
placeholders = self.scan_placeholders(str(f))
templates.append({
"name": f.stem,
"path": str(f),
"size": f.stat().st_size,
"modified": datetime.fromtimestamp(f.stat().st_mtime),
"placeholders": placeholders
})
return templates
def validate_data(self, template_path: str,
data: Dict[str, str]) -> tuple:
"""验证数据是否满足模板需求,返回(是否有效, 缺失字段列表)"""
required = self.scan_placeholders(template_path)
missing = [p for p in required if p not in data]
return len(missing) == 0, missing
def preview(self, template_path: str, data: Dict[str, str]) -> str:
"""预览单个文档(返回文本预览)"""
doc = Document(template_path)
preview_lines = []
for para in doc.paragraphs:
text = para.text
for key, value in data.items():
text = text.replace("{{" + key + "}}", value)
preview_lines.append(text)
return "\n".join(preview_lines[:20])
5.2 批量生成引擎
批量生成引擎的核心是一个高性能的文档渲染器。对于大量数据的批量生成(如数千份合同),需要关注性能优化:使用缓存减少重复读取、多线程并行生成、流式写入避免内存溢出。生成的文档可以自动转换为PDF并添加防伪水印,确保文档的正式性和安全性。
from concurrent.futures import ThreadPoolExecutor, as_completed
from docx import Document
from docx.oxml.ns import qn
import pdfkit
import hashlib
from datetime import datetime
class BatchDocumentFactory:
"""文档批量生成工厂"""
def __init__(self, template_path: str, output_dir: str = "output"):
self.template_path = template_path
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def _render_single(self, data: dict) -> str:
"""渲染单个文档"""
doc = Document(self.template_path)
# 替换段落中的占位符
for para in doc.paragraphs:
for key, value in data.items():
placeholder = "{{" + key + "}}"
if placeholder in para.text:
para.text = para.text.replace(placeholder, str(value))
# 替换表格中的占位符
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for key, value in data.items():
placeholder = "{{" + key + "}}"
if placeholder in cell.text:
cell.text = cell.text.replace(placeholder, str(value))
# 生成唯一边际输出文件名
serial = data.get("编号", data.get("serial", datetime.now().strftime("%Y%m%d%H%M%S")))
output_path = str(self.output_dir / f"{serial}.docx")
doc.save(output_path)
return output_path
def add_watermark(self, docx_path: str, watermark_text: str = "内部资料 严禁外传"):
"""添加文字水印"""
doc = Document(docx_path)
for section in doc.sections:
header = section.header
para = header.paragraphs[0] if header.paragraphs else header.add_paragraph()
para.text = watermark_text
from docx.shared import Pt, RGBColor
run = para.runs[0] if para.runs else para.add_run(watermark_text)
run.font.size = Pt(48)
run.font.color.rgb = RGBColor(200, 200, 200)
run.font.name = "Arial"
doc.save(docx_path)
def convert_to_pdf(self, docx_path: str) -> str:
"""将Word文档转换为PDF"""
pdf_path = docx_path.replace(".docx", ".pdf")
try:
pdfkit.from_file(docx_path, pdf_path)
except Exception as e:
print(f"PDF转换失败: {e}")
# 降级:直接复制DOCX
pdf_path = docx_path
return pdf_path
def batch_generate(self, all_data: List[Dict],
parallel: bool = True,
max_workers: int = 4,
need_pdf: bool = False,
need_watermark: bool = False) -> List[str]:
"""批量生成文档
Args:
all_data: 数据列表,每条数据生成一份文档
parallel: 是否并行生成
max_workers: 并行数
need_pdf: 是否转换为PDF
need_watermark: 是否添加水印
Returns:
生成的文件路径列表
"""
results = []
if parallel:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(self._render_single, data): data
for data in all_data}
for future in as_completed(futures):
data = futures[future]
docx_path = future.result()
if need_watermark:
self.add_watermark(docx_path)
if need_pdf:
pdf_path = self.convert_to_pdf(docx_path)
results.append(pdf_path)
else:
results.append(docx_path)
print(f"[完成] {data.get('姓名', data.get('name', '未知'))}")
else:
for data in all_data:
docx_path = self._render_single(data)
if need_watermark:
self.add_watermark(docx_path)
if need_pdf:
pdf_path = self.convert_to_pdf(docx_path)
results.append(pdf_path)
else:
results.append(docx_path)
return results
def verify_document(self, docx_path: str) -> dict:
"""验证文档完整性"""
file_path = Path(docx_path)
if not file_path.exists():
return {"valid": False, "error": "文件不存在"}
sha256 = hashlib.sha256()
with open(docx_path, "rb") as f:
sha256.update(f.read())
return {
"valid": True,
"filename": file_path.name,
"size_bytes": file_path.stat().st_size,
"sha256": sha256.hexdigest(),
"generated_at": datetime.fromtimestamp(file_path.stat().st_ctime)
}
# ===== 使用示例 =====
if __name__ == "__main__":
# 准备合同数据(模拟从数据库读取)
contracts_data = [
{"编号": "HT2026001", "姓名": "张三", "公司": "科技有限公司",
"金额": "500000", "日期": "2026-05-01", "产品": "企业版软件许可"},
{"编号": "HT2026002", "姓名": "李四", "公司": "信息技术公司",
"金额": "320000", "日期": "2026-05-03", "产品": "标准版软件许可"},
{"编号": "HT2026003", "姓名": "王五", "公司": "数据服务公司",
"金额": "880000", "日期": "2026-05-05", "产品": "旗舰版全套方案"},
]
factory = BatchDocumentFactory(
template_path="templates/contract_template.docx",
output_dir="output/contracts/"
)
# 批量生成(并行, 加PDF加水印)
generated = factory.batch_generate(
all_data=contracts_data,
parallel=True,
max_workers=4,
need_pdf=True,
need_watermark=True
)
print(f"\n成功生成 {len(generated)} 份文档:")
for path in generated:
info = factory.verify_document(path)
print(f" - {info['filename']} ({info['size_bytes']} bytes)")
六、ETL数据处理管道
ETL(Extract-Transform-Load)是从多个异构数据源抽取数据、经过清洗转换后加载到目标系统的完整过程。在办公自动化场景中,ETL管道常用于数据仓库建设、报表系统的数据预处理、系统间数据同步等。一个健壮的ETL管道需要具备可配置、可监控、可重跑、断点续传等工程特性。
6.1 数据抽取模块
数据抽取需要支持多种格式和来源:Excel/CSV文件、各类数据库(MySQL/PostgreSQL/SQLite)、HTTP API接口、邮件附件、FTP服务器等。每个数据源定义一个抽取器(Extractor),实现统一的抽取接口。抽取器负责处理连接管理、分页拉取、增量更新逻辑。
from abc import ABC, abstractmethod
from typing import Iterator, Dict, Any, Optional
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text
class Extractor(ABC):
"""数据抽取器基类"""
@abstractmethod
def extract(self, **kwargs) -> pd.DataFrame:
pass
@abstractmethod
def validate_source(self) -> bool:
"""验证数据源是否可访问"""
pass
class MySQLExtractor(Extractor):
"""MySQL数据库抽取器"""
def __init__(self, connection_string: str, query: str,
chunk_size: int = 10000):
self.connection_string = connection_string
self.query = query
self.chunk_size = chunk_size
self.engine = sa.create_engine(connection_string)
def extract(self, **kwargs) -> pd.DataFrame:
chunks = []
for chunk in pd.read_sql_query(
text(self.query), self.engine,
chunksize=self.chunk_size
):
chunks.append(chunk)
print(f" 已抽取 {len(chunk)} 条...")
return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
def validate_source(self) -> bool:
try:
with self.engine.connect() as conn:
conn.execute(text("SELECT 1"))
return True
except Exception as e:
print(f"数据库连接失败: {e}")
return False
class ExcelExtractor(Extractor):
"""多Sheet Excel抽取器"""
def __init__(self, file_path: str, sheet_name: Optional[str] = None):
self.file_path = file_path
self.sheet_name = sheet_name
def extract(self, **kwargs) -> pd.DataFrame:
if self.sheet_name:
return pd.read_excel(self.file_path, sheet_name=self.sheet_name)
# 读取所有sheet并合并
excel_file = pd.ExcelFile(self.file_path)
all_sheets = []
for sheet in excel_file.sheet_names:
df = excel_file.parse(sheet)
df["来源Sheet"] = sheet
all_sheets.append(df)
return pd.concat(all_sheets, ignore_index=True)
def validate_source(self) -> bool:
return Path(self.file_path).exists()
6.2 数据转换与清洗引擎
数据转换是ETL管道的核心环节。常见转换操作包括:字段映射(源字段名到目标字段名)、类型转换(字符串到日期、文本到数字)、数据清洗(去重、空值处理、异常值过滤)、业务规则计算(汇总、分摊、汇率转换)、数据脱敏(手机号/身份证打码)。转换规则通过配置文件定义,支持热加载。
import re
import yaml
from typing import Callable, Dict, Any
class TransformRule:
"""数据转换规则"""
def __init__(self, name: str, func: Callable, params: dict = None):
self.name = name
self.func = func
self.params = params or {}
class DataTransformer:
"""数据转换引擎"""
def __init__(self, rules_config: str = None):
self.rules: Dict[str, list] = {}
if rules_config:
self.load_config(rules_config)
def load_config(self, config_path: str):
"""从YAML配置文件加载转换规则"""
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
for table, rules in config.get("transforms", {}).items():
self.rules[table] = []
for rule_def in rules:
rule = TransformRule(
name=rule_def["name"],
func=self._resolve_function(rule_def["type"]),
params=rule_def.get("params", {})
)
self.rules[table].append(rule)
def _resolve_function(self, rule_type: str) -> Callable:
"""根据规则类型返回对应的处理函数"""
registry = {
"strip_whitespace": lambda v, **_: v.strip() if isinstance(v, str) else v,
"to_datetime": lambda v, **_: pd.to_datetime(v, errors="coerce"),
"to_numeric": lambda v, **_: pd.to_numeric(v, errors="coerce"),
"fillna": lambda v, **kw: v if pd.notna(v) else kw.get("default", 0),
"mask_phone": self._mask_phone,
"upper_case": lambda v, **_: v.upper() if isinstance(v, str) else v,
"remove_duplicates": lambda df, **_: df.drop_duplicates(),
}
return registry.get(rule_type, lambda v, **_: v)
def _mask_phone(self, value: Any, **kwargs) -> str:
"""手机号脱敏:138****1234"""
phone = str(value)
if len(phone) == 11:
return phone[:3] + "****" + phone[-4:]
return phone
def transform(self, table_name: str, df: pd.DataFrame) -> pd.DataFrame:
"""对指定表的数据执行转换"""
if table_name not in self.rules:
print(f"[警告] 表 {table_name} 无转换规则,跳过")
return df
for rule in self.rules[table_name]:
try:
if rule.name == "remove_duplicates":
df = rule.func(df, **rule.params)
else:
col = rule.params.get("column")
if col and col in df.columns:
df[col] = df[col].apply(
lambda v: rule.func(v, **rule.params)
)
print(f" [规则] {rule.name} 执行成功")
except Exception as e:
print(f" [错误] 规则 {rule.name} 执行失败: {e}")
return df
def run_pipeline(self, table_name: str, df: pd.DataFrame) -> pd.DataFrame:
"""执行完整转换管道"""
print(f"开始转换表: {table_name} (共 {len(df)} 行)")
# 1. 数据清洗
df = self.transform(table_name, df)
# 2. 字段选择
columns = [r.params.get("column")
for r in self.rules.get(table_name, [])
if r.params.get("column")]
valid_cols = [c for c in columns if c in df.columns]
if valid_cols:
df = df[list(dict.fromkeys(valid_cols))] # 去重保持顺序
print(f"转换完成: {len(df)} 行, {len(df.columns)} 列")
return df
6.3 数据加载与调度监控
数据经过抽取和转换后,需要加载到目标系统中。加载策略分为全量加载(首次或重建时)和增量加载(日常更新)。增量加载通常使用时间戳或自增ID作为增量标记。调度监控模块记录每次ETL任务的执行情况,包括数据量、耗时、错误信息等,并通过看板或告警通知维护人员。
from datetime import datetime
import sqlite3
import json
class DataLoader:
"""数据加载器"""
def __init__(self, target_conn_str: str, load_strategy: str = "append"):
self.engine = sa.create_engine(target_conn_str)
self.load_strategy = load_strategy # append, replace, upsert
def load(self, table_name: str, df: pd.DataFrame,
primary_key: str = None) -> dict:
"""加载数据到目标表"""
result = {
"table": table_name,
"rows_before": 0,
"rows_loaded": len(df),
"rows_after": 0,
"status": "pending"
}
try:
if self.load_strategy == "replace":
df.to_sql(table_name, self.engine,
if_exists="replace", index=False)
elif self.load_strategy == "append":
df.to_sql(table_name, self.engine,
if_exists="append", index=False)
elif self.load_strategy == "upsert" and primary_key:
# 先删除已存在的记录
existing_ids = tuple(df[primary_key].unique())
if len(existing_ids) == 1:
existing_ids = f"('{existing_ids[0]}')"
with self.engine.connect() as conn:
conn.execute(text(
f"DELETE FROM {table_name} "
f"WHERE {primary_key} IN {existing_ids}"
))
conn.commit()
df.to_sql(table_name, self.engine,
if_exists="append", index=False)
result["status"] = "success"
# 统计加载后的行数
with self.engine.connect() as conn:
count = conn.execute(
text(f"SELECT COUNT(*) FROM {table_name}")
).scalar()
result["rows_after"] = count
except Exception as e:
result["status"] = "failed"
result["error"] = str(e)
return result
class ETLPipeline:
"""完整ETL管道"""
def __init__(self, name: str):
self.name = name
self.extractor: Optional[Extractor] = None
self.transformer = DataTransformer()
self.loader: Optional[DataLoader] = None
self.job_history = []
def run(self, table_name: str, **kwargs) -> dict:
"""执行一次完整的ETL任务"""
job_id = datetime.now().strftime("%Y%m%d%H%M%S")
start_time = datetime.now()
print(f"\n{'='*50}")
print(f"ETL任务 [{job_id}] 开始: {self.name}/{table_name}")
print(f"{'='*50}")
job_result = {
"job_id": job_id,
"pipeline": self.name,
"table": table_name,
"start_time": start_time,
"extract": {},
"transform": {},
"load": {},
"status": "running"
}
try:
# 1. 抽取
print("\n[步骤1/3] 数据抽取...")
df = self.extractor.extract(**kwargs)
job_result["extract"] = {"rows": len(df), "columns": list(df.columns)}
print(f" 抽取完成: {len(df)} 行, {len(df.columns)} 列")
# 2. 转换
print("\n[步骤2/3] 数据转换...")
df = self.transformer.run_pipeline(table_name, df)
job_result["transform"] = {"rows_after": len(df)}
# 3. 加载
print("\n[步骤3/3] 数据加载...")
load_result = self.loader.load(table_name, df)
job_result["load"] = load_result
# 完成
elapsed = (datetime.now() - start_time).total_seconds()
job_result["status"] = "success" if load_result["status"] == "success" else "failed"
job_result["elapsed_seconds"] = round(elapsed, 2)
print(f"\nETL任务完成! 耗时: {elapsed:.2f}秒")
except Exception as e:
job_result["status"] = "failed"
job_result["error"] = str(e)
print(f"\nETL任务失败: {e}")
job_result["end_time"] = datetime.now()
self.job_history.append(job_result)
# 记录到本地历史文件
self._persist_job(job_result)
return job_result
def _persist_job(self, result: dict):
"""持久化任务记录"""
history_file = f"etl_history_{self.name}.jsonl"
with open(history_file, "a", encoding="utf-8") as f:
f.write(json.dumps(result, default=str) + "\n")
def get_recent_jobs(self, n: int = 10) -> list:
"""获取最近的ETL任务记录"""
return self.job_history[-n:]
七、系统集成部署
一个完整的办公自动化系统不仅包含核心业务逻辑,还需要考虑配置管理、日志监控、异常告警、部署运维等工程化问题。良好的系统集成部署设计能够确保系统稳定运行、快速定位问题和便捷扩展功能。本节从工程实践角度介绍系统集成部署的最佳实践。
7.1 配置管理体系
所有与环境相关的参数(数据库连接串、邮件账户、API密钥、文件路径等)都应该从代码中剥离,通过配置文件管理。推荐使用层次化配置方案:默认配置(config/default.yaml)提供合理的默认值,环境配置(config/production.yaml)覆盖特定环境的参数,环境变量提供敏感信息的注入途径,命令行参数提供运行时覆盖能力。
import os
import yaml
from pathlib import Path
from typing import Any, Dict
class ConfigManager:
"""层次化配置管理器"""
def __init__(self, config_dir: str = "config"):
self.config_dir = Path(config_dir)
self._config: Dict[str, Any] = {}
def load(self, env: str = "development"):
"""加载配置(按优先级从低到高)"""
# 1. 默认配置
default_path = self.config_dir / "default.yaml"
if default_path.exists():
with open(default_path, "r", encoding="utf-8") as f:
self._config.update(yaml.safe_load(f) or {})
# 2. 环境配置
env_path = self.config_dir / f"{env}.yaml"
if env_path.exists():
with open(env_path, "r", encoding="utf-8") as f:
self._config.update(yaml.safe_load(f) or {})
# 3. 环境变量覆盖(前缀 APP_)
for key, value in os.environ.items():
if key.startswith("APP_"):
config_key = key[4:].lower().replace("__", ".")
self._set_nested(config_key, value)
return self
def _set_nested(self, key: str, value: Any):
"""设置嵌套配置项"""
keys = key.split(".")
target = self._config
for k in keys[:-1]:
if k not in target:
target[k] = {}
target = target[k]
target[keys[-1]] = value
def get(self, key: str, default: Any = None) -> Any:
"""获取配置项(支持点号分隔的嵌套访问)"""
keys = key.split(".")
target = self._config
for k in keys:
if isinstance(target, dict):
target = target.get(k)
else:
return default
return target if target is not None else default
@property
def all(self) -> dict:
return self._config
# === config/default.yaml 示例 ===
"""
app:
name: "OA Auto System"
version: "1.0.0"
debug: false
database:
host: "localhost"
port: 3306
name: "oa_auto"
user: "root"
password: "" # 由环境变量 APP_database__password 提供
email:
sender: "noreply@company.com"
smtp_server: "smtp.company.com"
smtp_port: 465
use_ssl: true
schedule:
daily_report: "09:30"
weekly_report: "09:00"
etl_pipeline: "02:00"
logging:
level: "INFO"
file: "logs/app.log"
max_bytes: 10485760 # 10MB
backup_count: 5
paths:
data_dir: "./data"
output_dir: "./output"
template_dir: "./templates"
"""
# 使用方式
config = ConfigManager().load(env=os.getenv("APP_ENV", "development"))
db_host = config.get("database.host", "localhost")
email_config = {
"sender": config.get("email.sender"),
"password": config.get("email.password")
}
7.2 日志与监控告警
日志系统是定位问题的重要工具。推荐使用loguru替代标准logging库,它提供了更简洁的API、自动旋转日志、结构化日志输出等特性。监控告警方面,当系统出现异常(如数据源连接失败、生成报告为空、邮件发送失败)时,应能自动通过邮件或IM工具通知运维人员。告警策略应避免重复告警风暴,可以采用分级告警和告警聚合机制。
from loguru import logger
import sys
import json
from datetime import datetime
from typing import Optional
import smtplib
from email.mime.text import MIMEText
class LoggingSetup:
"""日志系统初始化"""
@staticmethod
def setup(config: dict):
# 移除默认的handler
logger.remove()
# 控制台输出(带颜色)
logger.add(
sys.stderr,
format="{time:YYYY-MM-DD HH:mm:ss} | "
"{level: <8} | "
"{name} :{function} :{line} | "
"{message} ",
level=config.get("level", "INFO"),
colorize=True
)
# 文件输出(自动旋转)
logger.add(
config.get("file", "logs/app.log"),
rotation=config.get("max_bytes", 10485760),
retention=config.get("backup_count", 5),
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
level="DEBUG",
encoding="utf-8"
)
# 错误单独记录到一个文件
logger.add(
"logs/error.log",
rotation="10 MB",
level="ERROR",
encoding="utf-8",
backtrace=True,
diagnose=True
)
logger.info("日志系统初始化完成")
class AlertManager:
"""告警管理器(分级告警 + 告警聚合)"""
def __init__(self, email_config: dict = None,
webhook_url: str = None):
self.email_config = email_config
self.webhook_url = webhook_url
self.alert_history = []
self.silence_period = 300 # 相同告警5分钟内不重复发送
def send_alert(self, level: str, title: str,
message: str, source: str = "system"):
"""发送告警
Args:
level: CRITICAL / ERROR / WARNING / INFO
title: 告警标题
message: 告警内容
source: 告警来源模块
"""
alert = {
"level": level,
"title": title,
"message": message,
"source": source,
"time": datetime.now()
}
# 告警聚合:检查是否在静默期内
for prev in self.alert_history[-10:]:
if (prev["title"] == title
and prev["source"] == source
and (datetime.now() - prev["time"]).total_seconds()
< self.silence_period):
logger.debug(f"[告警聚合] 跳过重复告警: {title}")
return
self.alert_history.append(alert)
logger.warning(f"[{level}] {title}: {message}")
# 分级处理
if level in ("CRITICAL", "ERROR"):
self._send_email_alert(level, title, message)
self._send_webhook_alert(level, title, message)
elif level == "WARNING":
self._send_webhook_alert(level, title, message)
# INFO级别仅记录日志
def _send_email_alert(self, level: str, title: str, message: str):
"""发送邮件告警"""
if not self.email_config:
return
try:
msg = MIMEText(
f"告警级别: {level}\n"
f"告警来源: {title}\n"
f"详细信息: {message}\n"
f"发生时间: {datetime.now()}\n",
"plain", "utf-8"
)
msg["Subject"] = f"[{level}] 办公自动化系统告警 - {title}"
msg["From"] = self.email_config["sender"]
msg["To"] = self.email_config.get("admin_email", "admin@company.com")
with smtplib.SMTP_SSL(
self.email_config["smtp_server"],
self.email_config["smtp_port"]
) as server:
server.login(
self.email_config["sender"],
self.email_config["password"]
)
server.send_message(msg)
logger.info(f"告警邮件已发送: {title}")
except Exception as e:
logger.error(f"告警邮件发送失败: {e}")
def _send_webhook_alert(self, level: str, title: str, message: str):
"""发送Webhook告警(企业微信/钉钉)"""
if not self.webhook_url:
return
try:
import requests
payload = {
"msgtype": "markdown",
"markdown": {
"content": (
f"### 系统告警通知\n"
f"> **级别**: {level}\n"
f"> **标题**: {title}\n"
f"> **详情**: {message[:200]}\n"
f"> **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
)
}
}
resp = requests.post(self.webhook_url, json=payload, timeout=5)
if resp.status_code != 200:
logger.error(f"Webhook发送失败: {resp.text}")
except Exception as e:
logger.error(f"Webhook告警失败: {e}")
# 使用示例
LoggingSetup.setup({
"level": "INFO",
"file": "logs/app.log",
"max_bytes": 10485760,
"backup_count": 5
})
alert_mgr = AlertManager(
email_config={"sender": "alert@company.com", "smtp_server": "smtp.company.com",
"smtp_port": 465, "admin_email": "admin@company.com"},
webhook_url="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"
)
# 业务代码中调用
try:
# ... 业务操作 ...
pass
except Exception as e:
alert_mgr.send_alert("ERROR", "日报生成失败", str(e), "daily_report")
logger.exception("日报生成过程发生异常")
7.3 部署与定时触发
办公自动化系统通常不需要7x24小时的Web服务,更适合以定时任务或Windows服务的形式部署。Windows环境可以使用任务计划程序(Task Scheduler),Linux环境使用cron。也可以将Python脚本打包为exe可执行文件,方便在没有Python环境的机器上运行。对于需要Web界面的系统,可以使用Flask/FastAPI内建服务器配合nssm注册为Windows服务。
"""
=== Windows任务计划程序配置示例 ===
创建定时任务启动日报系统:
schtasks /create /tn "OA_DailyReport" /tr "python D:\\oa_auto\\main.py" /sc daily /st 09:30 /ru SYSTEM
检查任务状态:
schtasks /query /tn "OA_DailyReport"
=== Linux crontab配置示例 ===
# 每天9:30执行日报生成
30 9 * * 1-5 cd /opt/oa_auto && python main.py daily_report >> logs/cron.log 2>&1
# 每天凌晨2点执行ETL管道
0 2 * * * cd /opt/oa_auto && python main.py etl >> logs/cron.log 2>&1
# 每周一9点执行周报
0 9 * * 1 cd /opt/oa_auto && python main.py weekly_report >> logs/cron.log 2>&1
"""
# === 主入口 main.py ===
import sys
import argparse
from pathlib import Path
def setup_environment():
"""初始化运行环境"""
# 确保目录存在
for dir_name in ["logs", "output", "data", "config"]:
Path(dir_name).mkdir(exist_ok=True)
# 加载配置
config = ConfigManager().load(
env=os.getenv("APP_ENV", "production")
)
# 初始化日志
LoggingSetup.setup(config.get("logging", {}))
return config
def cmd_daily_report(config):
"""执行日报生成命令"""
logger.info("执行日报生成任务...")
pipeline = DailyReportPipeline({
"template_path": config.get("paths.template_dir") + "/daily_report_template.docx",
"data_dir": config.get("paths.data_dir") + "/daily",
"email": config.get("email"),
"report_recipients": config.get("report.recipients", []),
"manager_email": config.get("report.manager_email"),
"admin_email": config.get("report.admin_email"),
})
pipeline.run()
def cmd_etl(config):
"""执行ETL任务"""
logger.info("执行ETL管道任务...")
# ETL执行逻辑
pass
def cmd_weekly_report(config):
"""执行周报生成"""
logger.info("执行周报生成任务...")
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="办公自动化系统")
parser.add_argument("command", choices=[
"daily_report", "weekly_report", "etl", "dashboard"
], help="要执行的命令")
args = parser.parse_args()
config = setup_environment()
logger.info(f"系统启动,执行命令: {args.command}")
commands = {
"daily_report": cmd_daily_report,
"weekly_report": cmd_weekly_report,
"etl": cmd_etl,
}
cmd_func = commands.get(args.command)
if cmd_func:
cmd_func(config)
else:
logger.error(f"未知命令: {args.command}")
sys.exit(1)
八、代码质量与测试
办公自动化系统虽然不像互联网高并发系统那样对性能有极端要求,但由于其涉及公司核心业务数据(财务数据、客户信息、内部文档),对正确性和稳定性的要求极高。一个错误的报表生成可能导致错误的管理决策,一封错误发送的邮件可能造成数据泄露。因此,系统化的测试和质量保障体系不可或缺。
8.1 项目结构组织
良好的项目结构是代码质量的基础。推荐采用模块化分层设计:数据访问层(data/)负责所有数据读写操作;业务逻辑层(services/)实现核心业务流程;展示层(ui/)处理输出格式和用户交互;工具层(utils/)提供通用工具函数。各层之间通过清晰的接口通信,降低耦合度。
"""
=== 推荐的项目目录结构 ===
oa_auto/
├── README.md
├── requirements.txt # 项目依赖
├── setup.py # 安装脚本
├── main.py # 主入口
├── config/
│ ├── default.yaml # 默认配置
│ ├── development.yaml # 开发环境配置
│ └── production.yaml # 生产环境配置
├── data/ # 数据文件目录
│ ├── daily/ # 日报数据
│ └── reference/ # 参考数据
├── logs/ # 日志目录
├── output/ # 输出文件目录
├── templates/ # 文档模板目录
├── src/ # 源代码
│ ├── __init__.py
│ ├── data/ # 数据访问层
│ │ ├── __init__.py
│ │ ├── extractors.py # 数据抽取器
│ │ ├── database.py # 数据库操作
│ │ └── cache.py # 缓存管理
│ ├── services/ # 业务逻辑层
│ │ ├── __init__.py
│ │ ├── report_service.py # 报告生成服务
│ │ ├── approval_service.py # 审批服务
│ │ ├── document_service.py # 文档处理服务
│ │ └── etl_service.py # ETL服务
│ ├── ui/ # 展示层
│ │ ├── __init__.py
│ │ ├── excel_exporter.py # Excel导出
│ │ ├── chart_generator.py # 图表生成
│ │ └── email_dispatcher.py # 邮件分发
│ └── utils/ # 工具层
│ ├── __init__.py
│ ├── config.py # 配置管理
│ ├── logger.py # 日志工具
│ ├── alert.py # 告警工具
│ └── decorators.py # 通用装饰器
├── tests/ # 测试目录
│ ├── __init__.py
│ ├── conftest.py # pytest fixtures
│ ├── test_extractors.py # 测试数据抽取
│ ├── test_report_service.py # 测试报告服务
│ ├── test_document_factory.py # 测试文档工厂
│ └── test_data/
│ ├── sample_sales.xlsx # 测试数据文件
│ └── sample_template.docx
└── scripts/ # 运维脚本
├── setup_env.py # 环境初始化
├── run_tests.py # 运行测试
└── deploy.py # 部署脚本
"""
# src/utils/decorators.py - 通用装饰器
import functools
import time
from loguru import logger
def log_execution(func):
"""记录函数执行时间和结果"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
start = time.time()
logger.debug(f"[{func_name}] 开始执行")
try:
result = func(*args, **kwargs)
elapsed = time.time() - start
logger.debug(f"[{func_name}] 执行完成, 耗时: {elapsed:.3f}s")
return result
except Exception as e:
elapsed = time.time() - start
logger.error(f"[{func_name}] 执行失败 (耗时{elapsed:.3f}s): {e}")
raise
return wrapper
def retry(max_attempts: int = 3, delay: float = 1.0,
exceptions: tuple = (Exception,)):
"""失败重试装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_attempts:
logger.warning(
f"[{func.__name__}] 第{attempt}次尝试失败, "
f"{delay}秒后重试: {e}"
)
time.sleep(delay)
else:
logger.error(
f"[{func.__name__}] 重试{max_attempts}次均失败"
)
raise last_exception
return wrapper
return decorator
8.2 单元测试与集成测试
单元测试使用pytest框架编写,每个模块对应一个测试文件。使用pytest fixtures管理测试数据和mock对象。对于文件IO密集型操作(如Excel读取、Word生成),使用临时目录(tmp_path fixture)创建测试文件,测试完成后自动清理。集成测试则验证多个模块协同工作的正确性,可以使用真实的小样本数据进行端到端验证。
# tests/test_report_service.py
import pytest
from datetime import date, datetime
from pathlib import Path
from src.services.report_service import ReportGenerator
from src.data.extractors import ExcelDataExtractor
class TestReportGenerator:
"""报告生成器单元测试"""
@pytest.fixture
def sample_data(self):
"""准备测试数据"""
from src.data.extractors import DailyReportRow
return [
DailyReportRow(
department="销售部",
reporter="张三",
report_date=date(2026, 5, 1),
kpi_name="销售额",
kpi_value=50000.0,
unit="元",
remark="正常"
),
DailyReportRow(
department="运营部",
reporter="李四",
report_date=date(2026, 5, 1),
kpi_name="新增用户",
kpi_value=1200.0,
unit="人",
remark=""
),
]
@pytest.fixture
def mock_template(self, tmp_path):
"""创建临时测试模板"""
from docx import Document
doc = Document()
doc.add_paragraph("{{summary}}")
doc.add_paragraph("日报详情")
template_path = tmp_path / "test_template.docx"
doc.save(str(template_path))
return str(template_path)
def test_fill_summary_normal(self, mock_template, sample_data):
"""测试正常摘要填充"""
generator = ReportGenerator(mock_template)
generator.fill_summary(
total_revenue=50000.0,
total_orders=2,
abnormal_count=0
)
# 验证摘要是否正确填充
text_content = "\n".join(p.text for p in generator.doc.paragraphs)
assert "50000" in text_content
assert "2" in text_content
assert "{{summary}}" not in text_content
def test_fill_summary_abnormal(self, mock_template, sample_data):
"""测试异常数据时摘要标记"""
generator = ReportGenerator(mock_template)
generator.fill_summary(
total_revenue=50000.0,
total_orders=3,
abnormal_count=2
)
text_content = "\n".join(p.text for p in generator.doc.paragraphs)
assert "异常" in text_content
def test_add_data_table_empty(self, mock_template):
"""测试空数据表格生成"""
generator = ReportGenerator(mock_template)
generator.add_data_table([])
# 验证至少有一个表格
assert len(generator.doc.tables) >= 1
def test_add_data_table_with_data(self, mock_template, sample_data):
"""测试有数据时表格生成"""
generator = ReportGenerator(mock_template)
generator.add_data_table(sample_data)
table = generator.doc.tables[0]
# 表头 + 2行数据 = 3行
assert len(table.rows) == 3
# 验证表头
assert table.rows[0].cells[0].text == "部门"
def test_save_report(self, mock_template, sample_data, tmp_path):
"""测试报告保存"""
generator = ReportGenerator(mock_template)
generator.add_data_table(sample_data)
output_path = tmp_path / "test_output.docx"
generator.save(str(output_path))
assert Path(output_path).exists()
assert Path(output_path).stat().st_size > 0
class TestExcelDataExtractor:
"""数据抽取器单元测试"""
@pytest.fixture
def sample_excel(self, tmp_path):
"""创建测试Excel文件"""
import openpyxl
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Sheet1"
ws.append(["日期", "姓名", "销售额", "备注"])
ws.append([date(2026, 5, 1), "张三", 50000, "正常"])
ws.append([date(2026, 5, 2), "李四", 62000, "大客户"])
ws.append([date(2026, 5, 3), "王五", 38000, None])
file_path = tmp_path / "test_sales.xlsx"
wb.save(str(file_path))
return str(file_path)
def test_extract_valid_data(self, sample_excel):
"""测试从有效Excel中提取数据"""
extractor = ExcelDataExtractor(sample_excel, "销售部")
records = extractor.extract_sales_data()
assert len(records) == 3
assert records[0].reporter == "张三"
assert records[0].kpi_value == 50000.0
assert records[2].remark is None
def test_extract_invalid_file(self):
"""测试文件不存在时的异常处理"""
with pytest.raises(FileNotFoundError):
ExcelDataExtractor("nonexistent.xlsx", "销售部")
def test_unknown_department(self, sample_excel):
"""测试未知部门的异常处理"""
extractor = ExcelDataExtractor(sample_excel, "未知部")
with pytest.raises(ValueError, match="不支持的部门"):
extractor.get_all_records()
8.3 CI/CD集成
建议将办公自动化系统纳入持续集成流程。每次代码提交后自动运行单元测试和集成测试,确保新改动不破坏已有功能。使用flake8或black保证代码风格统一。可以配置GitHub Actions或Jenkins实现自动化流水线。对于迭代频繁的项目,还可以加入自动化发布流程,自动打包exe并上传到内部软件仓库。
# === .github/workflows/test.yml ===
"""
name: OA Auto System CI
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: 设置Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: 安装依赖
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov flake8
- name: 代码风格检查
run: |
flake8 src/ --max-line-length=100 --exclude __pycache__
- name: 运行单元测试
run: |
pytest tests/ -v --cov=src --cov-report=term-missing
- name: 测试覆盖率报告
run: |
pytest tests/ --cov=src --cov-report=xml
- name: 集成测试
run: |
python -m pytest tests/integration/ -v
"""
# === 本地运行测试 ===
"""
# 安装测试依赖
pip install pytest pytest-cov pytest-mock flake8
# 运行所有测试
pytest tests/ -v
# 运行特定测试文件
pytest tests/test_report_service.py -v
# 运行特定测试函数
pytest tests/test_report_service.py::TestReportGenerator::test_fill_summary_normal -v
# 生成覆盖率报告
pytest tests/ --cov=src --cov-report=html
# 代码风格检查
flake8 src/ --max-line-length=100 --statistics
# === 运行测试脚本 scripts/run_tests.py ===
import subprocess
import sys
def run_all_tests():
print("=" * 50)
print("办公自动化系统 - 测试套件")
print("=" * 50)
# 1. 代码风格检查
print("\n[1/3] 代码风格检查...")
result = subprocess.run(
["flake8", "src/", "--max-line-length=100", "--statistics"],
capture_output=True, text=True
)
if result.returncode == 0:
print(" ✓ 代码风格检查通过")
else:
print(f" ✗ 代码风格问题:\n{result.stdout}")
# 2. 单元测试
print("\n[2/3] 运行单元测试...")
result = subprocess.run(
["pytest", "tests/", "-v", "--tb=short"],
capture_output=True, text=True
)
print(result.stdout)
if result.returncode == 0:
print(" ✓ 所有单元测试通过")
else:
print(" ✗ 部分测试失败")
print(result.stderr[:500] if result.stderr else "")
# 3. 覆盖率报告
print("\n[3/3] 生成覆盖率报告...")
result = subprocess.run(
["pytest", "tests/", "--cov=src", "--cov-report=term-missing",
"--cov-fail-under=80"],
capture_output=True, text=True
)
print(result.stdout)
if result.returncode == 0:
print("\n✓ 所有检查通过!")
else:
print("\n✗ 检查未通过,请修复问题后重试")
sys.exit(1)
if __name__ == "__main__":
run_all_tests()
"""
九、全面案例
理论需要实践来检验。本节通过两个完整的综合案例,展示如何将前面各章节的知识点串联起来,构建真实可用的办公自动化系统。每个案例都包含需求分析、架构设计、核心代码实现和运行效果展示。建议读者跟随案例动手实践,在理解的基础上进行定制和扩展。
9.1 案例一:从零构建企业周报自动化系统
9.1.1 需求背景
某互联网公司有5个业务部门(销售、运营、产品、技术、客服),每个部门每周五下班前需提交周报。管理层希望在每周一上午9点收到整合后的全公司周报汇总。痛点在于:各部门周报格式五花八门、汇总工作耗时2-3小时、历史数据难以检索、缺少同比环比分析。
9.1.2 系统设计
采用"统一模板下发 -> 分布式填报 -> 自动汇总 -> 智能分析 -> 定时分发"的五阶段架构。核心模块包括:模板管理模块(定义Excel周报模板标准格式);数据采集模块(定时扫描各部门提交的Excel文件并解析);数据汇总引擎(将所有部门数据合并并生成汇总报告);智能分析模块(自动计算环比增长率、异常检测、趋势预测);分发模块(生成PDF和Word双格式报告并通过邮件发送)。
"""
案例一:企业周报自动化系统 - 核心实现
"""
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from datetime import date, timedelta, datetime
from pathlib import Path
from typing import List, Dict, Optional
import pandas as pd
# ---------- 模块1:周报模板管理 ----------
class WeeklyTemplateManager:
"""周报模板管理"""
TEMPLATE_HEADERS = [
"部门", "本周重点工作", "完成情况(%)", "关键指标",
"本周数据", "上周数据", "环比增长率(%)", "问题与风险", "下周计划"
]
def create_template(self, output_path: str, department: str):
"""创建标准化周报模板"""
wb = openpyxl.Workbook()
ws = wb.active
ws.title = f"{department}周报"
# 标题样式
title_font = Font(name="微软雅黑", size=14, bold=True, color="FFFFFF")
title_fill = PatternFill(start_color="2E7D32", end_color="2E7D32",
fill_type="solid")
header_font = Font(name="微软雅黑", size=11, bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="388E3C", end_color="388E3C",
fill_type="solid")
# 标题行
ws.merge_cells("A1:I1")
title_cell = ws["A1"]
title_cell.value = f"{department} - 周报 ({self._week_range_str()})"
title_cell.font = title_font
title_cell.fill = title_fill
title_cell.alignment = Alignment(horizontal="center", vertical="center")
ws.row_dimensions[1].height = 40
# 填报说明
ws.merge_cells("A2:I2")
ws["A2"].value = f"填报人: ________ 填报日期: {date.today()} 提交截止: 每周五18:00"
ws["A2"].font = Font(name="微软雅黑", size=9, color="666666")
# 表头
for col_idx, header in enumerate(self.TEMPLATE_HEADERS, 1):
cell = ws.cell(row=3, column=col_idx)
cell.value = header
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center", vertical="center",
wrap_text=True)
# 数据行(预留5行)
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin")
)
for row in range(4, 9):
for col in range(1, 10):
cell = ws.cell(row=row, column=col)
cell.border = thin_border
# 列宽设置
col_widths = [12, 25, 12, 12, 12, 12, 14, 20, 25]
for i, width in enumerate(col_widths, 1):
ws.column_dimensions[openpyxl.utils.get_column_letter(i)].width = width
# 数据验证:完成情况(0-100的整数)
from openpyxl.worksheet.datavalidation import DataValidation
dv = DataValidation(type="whole", operator="between",
formula1="0", formula2="100")
dv.error = "请输入0-100之间的整数"
dv.errorTitle = "输入错误"
dv.add(f"D4:D8")
ws.add_data_validation(dv)
wb.save(output_path)
print(f"周报模板已创建: {output_path}")
def _week_range_str(self) -> str:
"""获取当前周的日期范围"""
today = date.today()
monday = today - timedelta(days=today.weekday())
friday = monday + timedelta(days=4)
return f"{monday.strftime('%m/%d')}-{friday.strftime('%m/%d')}"
# ---------- 模块2:周报自动汇总引擎 ----------
class WeeklyReportCollector:
"""周报数据自动汇总"""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir)
def collect_all(self) -> pd.DataFrame:
"""收集所有部门的周报数据"""
all_data = []
for file_path in self.data_dir.glob("*周报*.xlsx"):
try:
df = pd.read_excel(file_path, sheet_name=0, header=2)
df = df.dropna(how="all") # 删除空行
df = df[df["部门"].notna()] # 删除无效行
all_data.append(df)
print(f"收集: {file_path.name} ({len(df)} 条)")
except Exception as e:
print(f"解析失败 {file_path.name}: {e}")
if not all_data:
return pd.DataFrame()
return pd.concat(all_data, ignore_index=True)
def check_submission(self) -> Dict[str, bool]:
"""检查各部门是否已提交"""
expected_depts = ["销售部", "运营部", "产品部", "技术部", "客服部"]
submitted = {}
for dept in expected_depts:
files = list(self.data_dir.glob(f"*{dept}*.xlsx"))
submitted[dept] = len(files) > 0
return submitted
# ---------- 模块3:周报分析器 ----------
class WeeklyReportAnalyzer:
"""周报数据分析"""
def __init__(self, data: pd.DataFrame):
self.data = data
def calc_growth_rate(self) -> pd.DataFrame:
"""计算各部门环比增长率"""
if "本周数据" not in self.data or "上周数据" not in self.data:
return self.data
self.data["环比增长率(%)"] = (
(self.data["本周数据"] - self.data["上周数据"])
/ self.data["上周数据"].replace(0, 1) * 100
).round(1)
return self.data
def detect_anomalies(self, threshold: float = 30.0) -> List[dict]:
"""检测异常数据(环比波动超过阈值)"""
anomalies = []
if "环比增长率(%)" not in self.data.columns:
return anomalies
for _, row in self.data.iterrows():
if abs(row["环比增长率(%)"]) > threshold:
anomalies.append({
"部门": row.get("部门", "未知"),
"指标": row.get("关键指标", ""),
"波动": f"{row['环比增长率(%)']:+.1f}%",
"本周": row.get("本周数据", 0),
"上周": row.get("上周数据", 0),
})
return anomalies
def generate_summary(self) -> str:
"""生成汇总摘要文本"""
total_items = len(self.data)
avg_completion = self.data["完成情况(%)"].mean() if "完成情况(%)" in self.data else 0
anomalies = self.detect_anomalies()
summary = (
f"本周共收集 {total_items} 项重点工作,平均完成进度 {avg_completion:.1f}%。\n"
f"异常波动项: {len(anomalies)} 项。\n"
)
if anomalies:
summary += "需关注:\n"
for a in anomalies[:5]:
summary += f" - {a['部门']} {a['指标']} 环比{a['波动']}\n"
return summary
# ---------- 模块4:全流程编排 ----------
class WeeklyReportSystem:
"""周报自动化全系统"""
def __init__(self, config: dict):
self.config = config
self.template_mgr = WeeklyTemplateManager()
self.collector = WeeklyReportCollector(config["data_dir"])
self.report_gen = ReportGenerator(config.get("template_path", "templates/weekly_report_template.docx"))
self.email_disp = EmailDispatcher(config["email"])
def run_full_cycle(self):
"""执行完整的周报周期"""
print("\n" + "=" * 60)
print("企业周报自动化系统 - 全周期执行")
print("=" * 60)
today = date.today()
# 阶段1:周一 - 下发模板
if today.weekday() == 0: # Monday
print("\n[阶段1] 下发周报模板...")
for dept in ["销售部", "运营部", "产品部", "技术部", "客服部"]:
path = f"{self.config['data_dir']}/{dept}_周报模板.xlsx"
self.template_mgr.create_template(path, dept)
print(f" 模板已下发: {dept}")
print(" 模板下发完成,请各部门周五前提交")
# 阶段2:周一上午 - 检查提交状态
print("\n[阶段2] 检查提交状态...")
submission_status = self.collector.check_submission()
missing = [dept for dept, submitted in submission_status.items() if not submitted]
if missing:
print(f" 以下部门尚未提交: {', '.join(missing)}")
# 发送催交通知
for dept in missing:
self.email_disp.send_report(
recipients=[f"{dept}@company.com"],
subject="【提醒】请提交周报",
body=f"
{dept}的同事您好,请于今日18:00前提交本周周报。
",
attachment_path=""
)
else:
print(" 所有部门均已提交 ✓")
# 阶段3:汇总与分析
print("\n[阶段3] 汇总与分析...")
all_data = self.collector.collect_all()
if not all_data.empty:
analyzer = WeeklyReportAnalyzer(all_data)
analyzer.calc_growth_rate()
summary = analyzer.generate_summary()
print(f" 汇总摘要:\n{summary}")
anomalies = analyzer.detect_anomalies()
if anomalies:
print(f" 发现 {len(anomalies)} 项异常数据,已发送预警")
# 阶段4:生成汇总报告
print("\n[阶段4] 生成汇总报告...")
report_path = f"周报汇总_{today.strftime('%Y%m%d')}.docx"
if not all_data.empty:
self.report_gen.fill_summary(
total_revenue=float(all_data["本周数据"].sum()) if "本周数据" in all_data else 0,
total_orders=len(all_data),
abnormal_count=len(anomalies) if 'anomalies' in dir() else 0
)
self.report_gen.save(report_path)
print(f" 汇总报告已生成: {report_path}")
# 阶段5:邮件分发
print("\n[阶段5] 邮件分发...")
self.email_disp.send_report(
recipients=self.config.get("report_recipients", []),
subject=f"【周报汇总】{today.strftime('%Y-%m-%d')} 全公司周报",
body=self._build_html(all_data, summary if 'summary' in dir() else ""),
attachment_path=report_path
)
print("周报已发送至管理层 ✓")
print("\n" + "=" * 60)
print("全周期执行完成!")
print("=" * 60)
def _build_html(self, data, summary):
"""构建HTML邮件正文"""
table_rows = ""
if not data.empty:
for _, row in data.head(15).iterrows():
table_rows += (
f"
{row.get('部门', '')} "
f"{row.get('本周重点工作', '')[:20]} "
f"{row.get('完成情况(%)', '')}% "
f"{row.get('环比增长率(%)', '')}% "
)
return f"""
📊 全公司周报汇总
周期: {self._week_range_str()}
摘要: {summary}
详细报告请见附件。
系统自动生成,请勿回复
"""
@staticmethod
def _week_range_str() -> str:
today = date.today()
monday = today - timedelta(days=today.weekday())
friday = monday + timedelta(days=4)
return f"{monday.strftime('%m/%d')}-{friday.strftime('%m/%d')}"
"""
案例一运行流程:
python main.py weekly_report
"""
9.2 案例二:多渠道数据汇总分析平台
某电商运营公司每天需要从淘宝、京东、拼多多、抖音四个电商平台拉取店铺经营数据,汇总后生成统一的运营日报。每个平台的API接口、数据格式、指标定义各不相同。运营团队每天需要花费大量时间手动登录各平台后台,复制粘贴数据到Excel中。
9.2.1 系统架构
本案例采用"适配器模式"统一不同平台的数据接口。每个平台编写一个适配器(Adapter),将平台特有数据格式转换为统一的内部数据模型。ETL管道负责数据的清洗和转换。汇总后的数据通过Web看板实时展示,同时每日定时生成PDF报告存档。核心架构分为四层:数据采集层(平台适配器)、数据处理层(ETL管道)、数据存储层(SQLite数据库)、数据展示层(Flask Web看板 + 报告生成器)。
"""
案例二:多渠道数据汇总分析平台 - 核心实现
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, date
import json
import hmac
import hashlib
import base64
import time
import requests
# ---------- 统一数据模型 ----------
@dataclass
class PlatformOrder:
"""统一订单数据模型"""
platform: str # 平台名称
order_id: str # 订单号
product_name: str # 商品名称
category: str # 商品类目
price: float # 成交金额
quantity: int # 购买数量
order_time: datetime # 下单时间
customer_city: str # 客户城市
payment_method: str # 支付方式
status: str # 订单状态
@dataclass
class PlatformStats:
"""统一平台统计模型"""
platform: str
date: date
total_revenue: float
total_orders: int
total_visitors: int
conversion_rate: float
avg_order_value: float
refund_amount: float
# ---------- 平台适配器基类 ----------
class PlatformAdapter(ABC):
"""平台适配器抽象基类"""
def __init__(self, app_key: str, app_secret: str):
self.app_key = app_key
self.app_secret = app_secret
@abstractmethod
def fetch_orders(self, start_date: date, end_date: date) -> List[PlatformOrder]:
pass
@abstractmethod
def fetch_stats(self, target_date: date) -> PlatformStats:
pass
@abstractmethod
def get_platform_name(self) -> str:
pass
# ---------- 淘宝适配器实现 ----------
class TaobaoAdapter(PlatformAdapter):
"""淘宝平台数据适配器"""
def get_platform_name(self) -> str:
return "淘宝"
def _sign_request(self, params: dict) -> str:
"""淘宝API签名"""
sorted_params = sorted(params.items())
sign_str = self.app_secret + "".join(
f"{k}{v}" for k, v in sorted_params
) + self.app_secret
return hashlib.md5(sign_str.encode()).hexdigest().upper()
def fetch_orders(self, start_date: date, end_date: date) -> List[PlatformOrder]:
"""调用淘宝订单API"""
params = {
"method": "taobao.trades.sold.get",
"app_key": self.app_key,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"start_created": start_date.strftime("%Y-%m-%d 00:00:00"),
"end_created": end_date.strftime("%Y-%m-%d 23:59:59"),
"fields": "tid,title,num,price,pay_time,receiver_city,payment,status",
"page_size": "100"
}
params["sign"] = self._sign_request(params)
try:
resp = requests.get(
"https://eco.taobao.com/router/rest",
params=params, timeout=30
)
data = resp.json()
trades = data.get("trades_sold_get_response", {}).get("trades", {}).get("trade", [])
orders = []
for t in trades:
orders.append(PlatformOrder(
platform="淘宝",
order_id=str(t.get("tid", "")),
product_name=t.get("title", ""),
category="", # 需额外API获取
price=float(t.get("payment", 0)),
quantity=int(t.get("num", 1)),
order_time=datetime.strptime(t["pay_time"], "%Y-%m-%d %H:%M:%S"),
customer_city=t.get("receiver_city", ""),
payment_method="",
status=t.get("status", "")
))
return orders
except Exception as e:
print(f"[淘宝] 订单获取失败: {e}")
return []
def fetch_stats(self, target_date: date) -> PlatformStats:
"""获取淘宝店铺统计"""
orders = self.fetch_orders(target_date, target_date)
if not orders:
return PlatformStats(
platform="淘宝", date=target_date,
total_revenue=0, total_orders=0, total_visitors=0,
conversion_rate=0.0, avg_order_value=0.0, refund_amount=0.0
)
total_revenue = sum(o.price for o in orders if o.status != "REFUND")
total_orders = len([o for o in orders if o.status != "REFUND"])
refunds = sum(o.price for o in orders if o.status == "REFUND")
avg_value = total_revenue / total_orders if total_orders > 0 else 0
return PlatformStats(
platform="淘宝", date=target_date,
total_revenue=round(total_revenue, 2),
total_orders=total_orders,
total_visitors=int(total_revenue * 3.5), # 估算
conversion_rate=round(total_orders / (total_revenue * 3.5) * 100, 2) if total_revenue > 0 else 0,
avg_order_value=round(avg_value, 2),
refund_amount=round(refunds, 2)
)
# ---------- 抖音适配器(示例) ----------
class DouyinAdapter(PlatformAdapter):
"""抖音电商平台数据适配器"""
def get_platform_name(self) -> str:
return "抖音"
def fetch_orders(self, start_date: date, end_date: date) -> List[PlatformOrder]:
"""调用抖音电商API"""
# 抖音API签名逻辑
timestamp = str(int(time.time()))
params = {
"app_id": self.app_key,
"timestamp": timestamp,
"start_time": start_date.strftime("%Y%m%d"),
"end_time": end_date.strftime("%Y%m%d"),
"page": "1",
"page_size": "50"
}
sign_str = self.app_secret + "".join(
f"{k}{v}" for k, v in sorted(params.items())
)
params["sign"] = hashlib.sha256(sign_str.encode()).hexdigest()
try:
resp = requests.get(
"https://open-api.douyin.com/order/list",
params=params, timeout=30
)
data = resp.json()
orders_data = data.get("data", {}).get("orders", [])
orders = []
for o in orders_data:
orders.append(PlatformOrder(
platform="抖音",
order_id=o.get("order_id", ""),
product_name=o.get("product_name", ""),
category=o.get("category_name", ""),
price=float(o.get("pay_amount", 0)) / 100,
quantity=int(o.get("product_num", 1)),
order_time=datetime.fromtimestamp(o.get("create_time", 0)),
customer_city=o.get("city", ""),
payment_method="",
status=o.get("order_status", "")
))
return orders
except Exception as e:
print(f"[抖音] 订单获取失败: {e}")
return []
def fetch_stats(self, target_date: date) -> PlatformStats:
orders = self.fetch_orders(target_date, target_date)
return self._calc_stats(orders, target_date)
def _calc_stats(self, orders: List[PlatformOrder], target_date: date) -> PlatformStats:
"""从订单列表计算统计指标"""
total_revenue = sum(o.price for o in orders)
total_orders = len(orders)
avg_value = total_revenue / total_orders if total_orders > 0 else 0
return PlatformStats(
platform="抖音", date=target_date,
total_revenue=round(total_revenue, 2),
total_orders=total_orders,
total_visitors=0,
conversion_rate=0.0,
avg_order_value=round(avg_value, 2),
refund_amount=0.0
)
# ---------- 汇总平台 ----------
class MultiPlatformAggregator:
"""多平台数据汇总器"""
def __init__(self):
self.adapters: Dict[str, PlatformAdapter] = {}
def register_adapter(self, adapter: PlatformAdapter):
self.adapters[adapter.get_platform_name()] = adapter
def aggregate_daily(self, target_date: date) -> Dict[str, PlatformStats]:
"""汇总所有平台的每日数据"""
results = {}
for name, adapter in self.adapters.items():
print(f" 正在拉取 {name} 数据...")
try:
stats = adapter.fetch_stats(target_date)
results[name] = stats
print(f" 完成: 销售额 {stats.total_revenue:,.2f}, "
f"订单 {stats.total_orders}")
except Exception as e:
print(f" [错误] {e}")
results[name] = PlatformStats(
platform=name, date=target_date,
total_revenue=0, total_orders=0, total_visitors=0,
conversion_rate=0, avg_order_value=0, refund_amount=0
)
return results
def generate_summary_report(self, daily_data: Dict[str, PlatformStats]) -> str:
"""生成汇总报告文本"""
total_revenue = sum(s.total_revenue for s in daily_data.values())
total_orders = sum(s.total_orders for s in daily_data.values())
lines = [
f"{'='*50}",
f"多渠道运营日报 - {date.today().strftime('%Y-%m-%d')}",
f"{'='*50}",
f"全平台汇总: 销售额 ¥{total_revenue:,.2f} | 订单 {total_orders}",
f"{'-'*50}",
f"{'平台':<10} {'销售额':<15} {'订单':<10} {'均价':<10}",
f"{'-'*50}"
]
for name, stats in daily_data.items():
lines.append(
f"{name:<10} ¥{stats.total_revenue:<10,.2f} "
f"{stats.total_orders:<10} ¥{stats.avg_order_value:<8,.2f}"
)
lines.extend([
f"{'-'*50}",
f"总计: 销售额 ¥{total_revenue:,.2f} | 订单 {total_orders} 笔",
f"平均客单价: ¥{total_revenue/total_orders:,.2f}" if total_orders > 0 else "",
f"{'='*50}"
])
return "\n".join(lines)
# ===== 使用示例 =====
if __name__ == "__main__":
# 注册各平台适配器
aggregator = MultiPlatformAggregator()
# 实际使用时替换为真实API密钥
aggregator.register_adapter(TaobaoAdapter("app_key_xxx", "secret_xxx"))
aggregator.register_adapter(DouyinAdapter("app_key_yyy", "secret_yyy"))
# aggregator.register_adapter(JDAdapter(...))
# aggregator.register_adapter(PinduoduoAdapter(...))
# 汇总昨日数据
yesterday = date.today() - timedelta(days=1)
print(f"\n汇总 {yesterday} 全平台数据...")
report = aggregator.aggregate_daily(yesterday)
# 生成汇总报告
summary = aggregator.generate_summary_report(report)
print(summary)
# 保存到文件
with open(f"运营日报_{yesterday.strftime('%Y%m%d')}.txt", "w", encoding="utf-8") as f:
f.write(summary)
# 输出到Excel
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
ws.title = f"日报_{yesterday.strftime('%Y%m%d')}"
ws.append(["平台", "销售额", "订单数", "访客数", "转化率", "客单价", "退款金额"])
for stats in report.values():
ws.append([stats.platform, stats.total_revenue, stats.total_orders,
stats.total_visitors, stats.conversion_rate,
stats.avg_order_value, stats.refund_amount])
# 汇总行
ws.append(["合计",
sum(s.total_revenue for s in report.values()),
sum(s.total_orders for s in report.values()),
sum(s.total_visitors for s in report.values()),
None, None,
sum(s.refund_amount for s in report.values())])
wb.save(f"运营日报_{yesterday.strftime('%Y%m%d')}.xlsx")
print(f"Excel报表已生成")
9.3 案例总结与最佳实践
以上两个案例展示了综合办公自动化项目的完整构建过程,涵盖了从需求分析、架构设计、模块开发到部署运维的各个环节。从这些案例中可以总结出几条关键最佳实践:第一,统一数据模型是系统可扩展性的基础,无论对接多少数据源,内部都使用统一的数据结构;第二,适配器模式是处理多数据源/多平台集成的有效手段,新增数据源只需编写新的适配器,无需修改现有代码;第三,完善的日志和告警机制是系统可靠运行的保障,办公自动化系统通常无人值守运行,异常必须能自动发现并通知相关人员。
综合实战核心要点:
1. 项目启动前务必进行充分的需求分析,明确流程边界和异常处理策略
2. 采用模块化分层架构,数据层、业务层、展示层解耦
3. 统一数据模型 + 适配器模式 = 无限扩展的数据源接入能力
4. 模板与数据分离,一份模板可批量生成大量文档
5. 完善的配置管理、日志监控、异常告警是生产级系统的标配
6. 单元测试覆盖核心逻辑,集成测试验证端到端流程
7. 定时任务+命令行入口,适合Windows/Linux跨平台部署
8. 善用CI/CD自动化测试和部署,提升团队协作效率
9. 从MVP开始迭代,快速交付价值,持续收集反馈优化
10. 安全第一:涉及敏感数据的脱敏处理、权限控制和审计留痕