一、schedule概述
schedule是Python生态中最受欢迎的轻量级定时任务调度库之一,以其简洁的API设计和零外部依赖而著称。与Celery、APScheduler等重量级调度框架不同,schedule遵循"简单够用"的设计哲学,只需几行代码即可完成周期性任务的配置和运行。schedule内部基于轮询机制实现,在主线程中维护一个任务队列,每隔指定时间检查是否有任务到期执行,开发者无需关心底层的线程管理或时间计算逻辑。
schedule库的适用场景非常广泛,尤其适合中小规模的定时任务需求。在办公自动化领域,schedule常用于定时发送邮件、定期备份文件、周期性数据采集、定时生成报表等场景。它特别适合那些不需要分布式调度、任务数量有限(通常几十个以内)、对实时性要求不高的场景。相比之下,如果你的项目需要分布式任务调度、任务持久化、复杂cron表达式、毫秒级精度等高级特性,则需要考虑APScheduler或Celery等更强大的方案。
核心特点:
1. API极简:schedule.every(10).minutes.do(job) 即可定义一个任务
2. 零外部依赖:纯Python实现,安装即用
3. 友好的链式调用:支持自然语言风格的链式语法
4. 轻量级:整个库只有几百KB,内存占用极低
5. 跨平台:支持Windows、Linux、macOS全平台运行
schedule与其他常见调度方案的对比:APScheduler功能最全面,支持cron表达式和持久化存储,但配置复杂;Celery适合大规模分布式场景,但需要消息队列中间件(Redis/RabbitMQ)作为支撑;操作系统的定时任务(cron、Task Scheduler)适合系统级别的任务,但部署和管理不便;而schedule在易用性和功能丰富度之间取得了最佳平衡,是个人开发者和中小型项目的首选方案。
# 安装schedule库
pip install schedule
# 验证安装
python -c "import schedule; print(schedule.__version__)"
# schedule与APScheduler、Celery的对比示例
# schedule版本 - 3行搞定
import schedule
schedule.every(10).minutes.do(lambda: print("任务执行"))
# APScheduler版本 - 需要配置trigger和scheduler
# from apscheduler.schedulers.blocking import BlockingScheduler
# scheduler = BlockingScheduler()
# scheduler.add_job(my_job, 'interval', minutes=10)
# 完整的最小示例
import schedule
import time
def job():
print("定时任务执行中...")
schedule.every(3).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
二、基础调度
schedule库的核心调度能力围绕every()方法构建,通过链式调用组合时间单位和数量,再以do()方法绑定目标任务,形成一条完整的调度声明。这种设计模式被称为"流畅接口"(Fluent Interface),代码读起来就像自然语言:every(10).minutes.do(job) 的字面意思就是"每10分钟执行job"。除了基本的间隔调度,schedule还支持在特定时间点执行(at方法)、按星期几执行、以及通过标签对任务进行分组管理。
间隔调度是最常用的模式,支持的时间单位包括seconds(秒)、minutes(分钟)、hours(小时)、days(天)、weeks(周)。通过数字参数控制间隔长度,例如every(5).seconds表示每5秒执行一次。at()方法用于指定当天的具体执行时间点,格式为"HH:MM"或"HH:MM:SS",常用于固定时间的任务(如每天上午9点发送日报)。周几调度可以结合days参数实现:schedule.every().monday.do(job)表示每周一执行,也可以使用day_of_week参数配合at指定具体时间。链式调用让代码表达力极强,同时保持逻辑清晰。
# 间隔调度 - 多种时间单位
import schedule
import time
def job():
print("任务执行")
schedule.every(5).seconds.do(job)
schedule.every(2).minutes.do(job)
schedule.every(1).hour.do(job)
schedule.every(3).days.do(job)
schedule.every().monday.do(job)
while True:
schedule.run_pending()
time.sleep(1)
# 时间点调度 - 在特定时刻执行
import schedule
import time
def morning_job():
print("早上9点 - 发送日报邮件")
def noon_job():
print("中午12点 - 检查系统状态")
def evening_job():
print("晚上6点半 - 生成日终报表")
# 精确到分钟
schedule.every().day.at("09:00").do(morning_job)
# 精确到秒
schedule.every().day.at("12:00:30").do(noon_job)
# 也可以指定星期几的特定时间
schedule.every().monday.at("18:30").do(evening_job)
while True:
schedule.run_pending()
time.sleep(1)
# 链式调用与任务标签
import schedule
import time
def send_email():
print("发送邮件")
def backup_db():
print("备份数据库")
def cleanup_logs():
print("清理日志")
# 通过标签对任务分组管理
schedule.every(10).minutes.do(send_email).tag("email", "daily")
schedule.every().day.at("03:00").do(backup_db).tag("backup")
schedule.every().sunday.do(cleanup_logs).tag("maintenance")
# 获取所有标签为"daily"的任务
daily_tasks = schedule.get_jobs("daily")
print(f"每日任务数量: {len(daily_tasks)}")
while True:
schedule.run_pending()
time.sleep(1)
三、任务管理
在实际项目中,定时任务往往不是固定不变的,而是需要根据运行状态动态管理。schedule库提供了丰富的任务管理API,允许开发者在程序运行过程中灵活地添加、取消、暂停和恢复任务。任务管理的核心是引用任务对象——do()方法返回一个Job对象,我们可以将其保存到变量中,后续通过该变量对任务进行操作。如果不方便保存引用,也可以利用tag标签机制进行分组管理,通过标签批量操作一组相关联的任务。
schedule的任务管理功能覆盖了完整的任务生命周期。cancel_job()用于取消单个任务,清空任务列表可以用clear()方法(可选择按标签清空)。get_jobs()方法返回当前所有任务的列表,结合tag参数可以按标签过滤。对于需要暂停一段时间后再恢复的任务场景,可以通过将任务的next_run属性设置为一个较远的未来时间来实现软暂停,或者从任务列表中移除并在需要时重新添加。标签管理是schedule非常实用的特性——你可以为任务分配多个标签,然后通过标签名执行批量操作。
# 任务取消与清空
import schedule
import time
def my_job():
print("正在执行任务...")
def cancel_job_example():
global job1
schedule.cancel_job(job1)
print("任务已取消")
# 保存任务引用
job1 = schedule.every(3).seconds.do(my_job)
# 5秒后取消这个任务
schedule.every(5).seconds.do(cancel_job_example)
# 10秒后清空所有任务
schedule.every(10).seconds.do(lambda: schedule.clear())
start_time = time.time()
while time.time() - start_time < 15:
schedule.run_pending()
time.sleep(1)
# 通过标签批量管理任务
import schedule
import time
def job_a():
print("任务A执行")
def job_b():
print("任务B执行")
def job_c():
print("任务C执行")
schedule.every(2).seconds.do(job_a).tag("group1")
schedule.every(3).seconds.do(job_b).tag("group1", "important")
schedule.every(4).seconds.do(job_c).tag("group2")
def show_jobs():
print("\n=== 当前所有任务 ===")
for j in schedule.get_jobs():
print(f" 任务: {j.job_func}, 下次执行: {j.next_run}")
print(f"标签 'group1' 下的任务数: {len(schedule.get_jobs('group1'))}")
schedule.every(5).seconds.do(show_jobs)
# 10秒后清除group1的所有任务
schedule.every(10).seconds.do(lambda: schedule.clear("group1"))
start_time = time.time()
while time.time() - start_time < 15:
schedule.run_pending()
time.sleep(1)
# 任务暂停与恢复(通过设置next_run实现)
import schedule
import time
import datetime
def my_job():
print("任务执行中...")
job = schedule.every(2).seconds.do(my_job)
def pause_job():
# 把下次执行时间设到很远的未来
job.next_run = datetime.datetime.now() + datetime.timedelta(days=365)
print("任务已暂停(推迟1年)")
def resume_job():
# 立即执行一次并恢复周期
job.next_run = datetime.datetime.now()
print("任务已恢复")
# 5秒后暂停
schedule.every(5).seconds.do(pause_job)
# 10秒后恢复
schedule.every(10).seconds.do(resume_job)
start_time = time.time()
while time.time() - start_time < 15:
schedule.run_pending()
time.sleep(1)
四、任务传参与回调
在实际开发中,定时任务通常需要接受外部参数来驱动不同的行为逻辑。schedule库的do()方法支持向任务函数传递任意数量的位置参数和关键字参数,这在实现灵活的数据处理流程时非常有用。无论是向邮件发送函数传递收件人列表,还是向数据备份函数传递源路径和目标路径,参数传递机制都让同一个任务函数能够服务于多个不同的调度场景。此外,schedule支持多种形式的任务回调,包括普通函数、类方法、lambda表达式等,开发者可以根据实际需求选择最合适的编码风格。
任务传参的几种典型模式:传位置参数时只需在do()中依次传入即可;传关键字参数时使用key=value的语法;对于类方法调度,可以先实例化类,再将绑定方法传入do();lambda表达式适用于简单的匿名任务,但需注意lambda中变量捕获的闭包陷阱。在办公自动化实践中,传参机制尤其重要——例如同一个"发送报表"函数,可以通过参数区分发送给不同部门、使用不同模板,无需为每种场景单独定义函数。
# 带参数的任务
import schedule
import time
def send_notification(recipient, message, level="info"):
print(f"[{level}] 发送给 {recipient}: {message}")
# 向do()传递位置参数和关键字参数
schedule.every(3).seconds.do(send_notification, "admin@example.com", "系统运行正常")
schedule.every(5).seconds.do(send_notification, "ops@example.com", "需要关注", level="warning")
schedule.every(7).seconds.do(send_notification, "boss@example.com", "绩效报表已生成", level="important")
start_time = time.time()
while time.time() - start_time < 10:
schedule.run_pending()
time.sleep(1)
# 类方法调度与回调
import schedule
import time
class DataProcessor:
def __init__(self, source, target):
self.source = source
self.target = target
self.count = 0
def process(self):
self.count += 1
print(f"第{self.count}次处理: {self.source} → {self.target}")
def reset(self):
self.count = 0
print("计数器已重置")
# 实例化后调度类方法
processor = DataProcessor("input.csv", "output.db")
schedule.every(2).seconds.do(processor.process)
schedule.every(6).seconds.do(processor.reset)
start_time = time.time()
while time.time() - start_time < 10:
schedule.run_pending()
time.sleep(1)
# lambda表达式与闭包陷阱
import schedule
import time
# 正确方式:使用默认参数捕获当前值
for i in range(1, 4):
schedule.every(i * 2).seconds.do(
lambda x=i: print(f"任务{x}: 每{x*2}秒执行一次")
)
# 错误方式(闭包陷阱):所有任务都打印最后一个i的值
# for i in range(1, 4):
# schedule.every(i * 2).seconds.do(
# lambda: print(f"任务{i}: 每{i*2}秒执行一次")
# )
start_time = time.time()
while time.time() - start_time < 10:
schedule.run_pending()
time.sleep(1)
五、并行执行
schedule库的默认调度模式是单线程串行执行——所有任务在同一个主线程中排队运行。这意味着如果一个任务的执行时间超过了调度间隔,后续任务将会被阻塞,造成调度延迟和任务堆积。在实际的生产环境中,这种阻塞可能导致严重问题:一个耗时较长的数据备份任务,可能阻塞后续的邮件发送任务,导致邮件延迟数小时。为解决这个问题,schedule需要配合多线程技术实现真正的并行执行,让每个被调度任务的执行互不干扰。
实现并行执行最直接的方式是为每个任务开启独立线程。当任务被触发时,不是直接执行函数,而是将函数提交到一个线程池中异步执行。这种设计不仅解决了任务阻塞问题,还能有效控制并发数量,防止过多的线程消耗系统资源。Python标准库中的concurrent.futures.ThreadPoolExecutor是最理想的线程池实现——它提供了线程复用、任务队列、超时控制等高级功能。在实践中,通常会在while调度循环之外创建一个全局的线程池,所有任务都通过executor.submit()提交执行,主循环只负责检查任务是否到期。
# 多线程调度 - 基础方案
import schedule
import time
import threading
def long_running_job(name):
print(f"[{name}] 开始执行(将耗时5秒)")
time.sleep(5)
print(f"[{name}] 执行完毕")
def run_in_thread(job_func, *args, **kwargs):
thread = threading.Thread(target=job_func, args=args, kwargs=kwargs, daemon=True)
thread.start()
schedule.every(2).seconds.do(run_in_thread, long_running_job, "备份任务")
schedule.every(3).seconds.do(run_in_thread, long_running_job, "清理任务")
start_time = time.time()
while time.time() - start_time < 12:
schedule.run_pending()
time.sleep(1)
# 使用ThreadPoolExecutor整合
import schedule
import time
from concurrent.futures import ThreadPoolExecutor
def data_backup(source):
print(f"备份开始: {source}")
time.sleep(3)
print(f"备份完成: {source}")
return f"{source} 已备份"
def send_report(email):
print(f"发送报告至: {email}")
time.sleep(2)
print(f"报告已发送至: {email}")
# 创建全局线程池(最大4个并发线程)
executor = ThreadPoolExecutor(max_workers=4)
# 包装函数:将任务提交到线程池
def async_run(func, *args, **kwargs):
future = executor.submit(func, *args, **kwargs)
future.add_done_callback(
lambda f: print(f"异步任务完成: {f.result()}")
)
schedule.every(3).seconds.do(async_run, data_backup, "/data/mysql")
schedule.every(5).seconds.do(async_run, send_report, "admin@example.com")
start_time = time.time()
while time.time() - start_time < 15:
schedule.run_pending()
time.sleep(1)
executor.shutdown(wait=True)
# 自定义调度装饰器 - 并行执行优雅封装
import schedule
import time
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
def parallel(job_func):
def wrapper(*args, **kwargs):
executor.submit(job_func, *args, **kwargs)
return wrapper
@parallel
def heavy_task_a():
time.sleep(4)
print("重型任务A完成")
@parallel
def heavy_task_b():
time.sleep(3)
print("重型任务B完成")
schedule.every(2).seconds.do(heavy_task_a)
schedule.every(3).seconds.do(heavy_task_b)
start_time = time.time()
while time.time() - start_time < 10:
schedule.run_pending()
time.sleep(1)
executor.shutdown(wait=True)
六、异常处理
在定时任务的长期运行过程中,异常几乎是不可避免的——网络波动导致API调用失败、磁盘空间不足造成文件写入失败、数据格式变更引起解析异常等等。如果不对这些异常进行妥善处理,一个任务的崩溃可能导致整个调度主循环退出,所有定时任务都随之停止。因此,为schedule任务构建健壮的异常处理体系,是从"能用"到"可靠"的关键一步。异常处理的核心思路是"隔离与恢复":确保任何单个任务的异常不会扩散到其他任务或主循环,同时具备自动重试和失败通知的能力。
推荐的异常处理策略分为三个层次:第一层是任务级别的try/except捕获,每个任务函数内部自行处理预期内的异常;第二层是调度层级的装饰器封装,对所有任务统一包裹异常处理逻辑;第三层是重试机制,对临时性故障(如网络超时)自动重试指定次数。对于无法恢复的严重错误,应记录详细日志并通过邮件或即时消息通知运维人员。特别的,schedule主循环本身不提供内置的异常处理——必须由开发者自行实现这些保护措施。
# 任务级异常捕获与重试
import schedule
import time
import random
def retry(max_attempts=3, delay=1):
def decorator(func):
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
print(f"第{attempt}次尝试失败: {e}")
if attempt < max_attempts:
time.sleep(delay)
print(f"任务最终失败,已重试{max_attempts}次: {last_exception}")
raise last_exception
return wrapper
return decorator
@retry(max_attempts=3, delay=2)
def unstable_api_call():
if random.random() < 0.7:
raise ConnectionError("网络连接失败")
print("API调用成功!")
schedule.every(3).seconds.do(unstable_api_call)
start_time = time.time()
while time.time() - start_time < 12:
schedule.run_pending()
time.sleep(1)
# 隔离执行 - 防止一个任务拖垮整个调度
import schedule
import time
import traceback
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("schedule_app")
def safe_run(func, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
error_msg = traceback.format_exc()
logger.error(f"任务 {func.__name__} 执行异常:\n{error_msg}")
# 通知管理员
print(f"⚠ 告警: 任务 {func.__name__} 失败, 请检查日志")
def faulty_job():
raise ValueError("数据处理异常")
def normal_job():
logger.info("正常任务运行中")
# 使用隔离执行
schedule.every(2).seconds.do(safe_run, faulty_job)
schedule.every(3).seconds.do(safe_run, normal_job)
start_time = time.time()
while time.time() - start_time < 10:
schedule.run_pending()
time.sleep(1)
# 失败通知邮件(整合异常处理)
import schedule
import time
import smtplib
from email.message import EmailMessage
import traceback
from datetime import datetime
def notify_failure(task_name, error_info):
msg = EmailMessage()
msg.set_content(f"任务: {task_name}\n时间: {datetime.now()}\n错误: {error_info}")
msg['Subject'] = f"[告警] 定时任务失败: {task_name}"
msg['From'] = "monitor@example.com"
msg['To'] = "admin@example.com"
try:
with smtplib.SMTP("smtp.example.com") as server:
server.send_message(msg)
print("告警邮件已发送")
except Exception as e:
print(f"发送告警邮件失败: {e}")
def robust_task_wrapper(func, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
error_info = traceback.format_exc()
print(f"任务 {func.__name__} 失败: {e}")
notify_failure(func.__name__, error_info)
schedule.every(5).seconds.do(robust_task_wrapper, faulty_job)
start_time = time.time()
while time.time() - start_time < 12:
schedule.run_pending()
time.sleep(1)
七、日志与监控
定时任务系统一旦部署到生产环境,日志和监控就是运维人员了解系统运行状况的"眼睛"。没有完善的日志记录,当任务执行异常或未按预期运行时,排查问题将变得极为困难。schedule库本身不提供内置的日志功能,但我们可以通过Python标准库logging模块为其增加完善的日志记录能力。一个好的日志系统应该记录:任务何时开始执行、执行耗时多少、执行结果是否成功、如果失败则记录失败原因。这些信息对于后期的问题排查和性能优化至关重要。
监控方面,不仅要记录每次任务执行的日志,还需要构建宏观的运行统计。常见的监控指标包括:任务执行次数统计、平均执行时长、最长执行时长、失败率、调度延迟等。通过这些指标,可以及时发现系统性能下降的趋势或异常模式。可选的监控手段还包括:将关键指标暴露为Prometheus指标、通过Grafana构建可视化仪表盘、设置基于阈值的告警规则等。对于中小型项目,至少应该做到执行时间监控和任务健康检查这两个基本点。
# 任务执行日志系统
import schedule
import time
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler("schedule_tasks.log", encoding="utf-8"),
logging.StreamHandler()
]
)
logger = logging.getLogger("TaskScheduler")
def logged_task(func):
def wrapper(*args, **kwargs):
task_name = func.__name__
start_time = time.time()
logger.info(f"▶ 任务 [{task_name}] 开始执行")
try:
result = func(*args, **kwargs)
elapsed = time.time() - start_time
logger.info(f"✓ 任务 [{task_name}] 执行成功, 耗时: {elapsed:.2f}s")
return result
except Exception as e:
elapsed = time.time() - start_time
logger.error(f"✗ 任务 [{task_name}] 执行失败, 耗时: {elapsed:.2f}s, 错误: {e}")
raise
return wrapper
@logged_task
def backup_task():
time.sleep(1)
print("备份完成")
@logged_task
def report_task():
time.sleep(2)
print("报表生成完成")
schedule.every(3).seconds.do(backup_task)
schedule.every(5).seconds.do(report_task)
start_time = time.time()
while time.time() - start_time < 12:
schedule.run_pending()
time.sleep(1)
# 执行时间监控与健康检查
import schedule
import time
from collections import defaultdict
from datetime import datetime, timedelta
class TaskMonitor:
def __init__(self):
self.stats = defaultdict(lambda: {
"count": 0, "errors": 0,
"total_time": 0, "max_time": 0
})
def monitor(self, func):
def wrapper(*args, **kwargs):
name = func.__name__
start = time.time()
try:
result = func(*args, **kwargs)
self.stats[name]["count"] += 1
except Exception as e:
self.stats[name]["errors"] += 1
raise
finally:
elapsed = time.time() - start
self.stats[name]["total_time"] += elapsed
self.stats[name]["max_time"] = max(
self.stats[name]["max_time"], elapsed
)
return result
return wrapper
def report(self):
print("\n=== 任务运行统计 ===")
for name, s in self.stats.items():
avg_time = s["total_time"] / s["count"] if s["count"] else 0
print(f" {name}: 执行{s['count']}次, "
f"失败{s['errors']}次, "
f"平均{avg_time:.2f}s, "
f"最慢{s['max_time']:.2f}s")
monitor = TaskMonitor()
@monitor.monitor
def healthy_task():
time.sleep(1)
print("健康任务执行")
@monitor.monitor
def unhealthy_task():
time.sleep(2)
if time.time() % 2 > 1:
raise RuntimeError("随机故障")
schedule.every(2).seconds.do(healthy_task)
schedule.every(4).seconds.do(unhealthy_task)
# 每10秒输出监控报告
schedule.every(10).seconds.do(monitor.report)
start_time = time.time()
while time.time() - start_time < 15:
schedule.run_pending()
time.sleep(1)
# 调度延迟监控
import schedule
import time
from datetime import datetime, timedelta
class SchedulerHealth:
def __init__(self):
self.last_run = datetime.now()
self.missed_deadlines = 0
def check(self):
now = datetime.now()
gap = (now - self.last_run).total_seconds()
self.last_run = now
if gap > 2.0: # 超过2秒说明调度存在延迟
self.missed_deadlines += 1
print(f"⚠ 调度延迟: 间隔{gap:.1f}s")
print(f"调度健康: 良好 (已累积{self.missed_deadlines}次延迟)")
scheduler_health = SchedulerHealth()
# 每个周期的常规检查和健康报告
schedule.every(1).seconds.do(scheduler_health.check)
start_time = time.time()
while time.time() - start_time < 6:
schedule.run_pending()
time.sleep(1)
八、schedule整合办公
schedule库在办公自动化领域的真正价值,体现在它能够与各种Python办公库无缝整合,构建全自动的日常工作流。将schedule的定时调度能力与smtplib(邮件)、shutil(文件操作)、openpyxl(Excel)、pandas(数据分析)、requests(网络请求)等库相结合,可以实现办公场景中绝大部分的重复性任务自动化。一个典型的自动化办公系统可以这样构建:schedule作为"心脏"负责定时触发,各种功能库作为"手脚"负责具体执行,logging模块作为"记忆"负责记录运行情况。
在实际部署中,需要注意几个关键点:办公脚本通常需要长时间连续运行,因此建议部署在服务器或专用的办公电脑上;任务执行结果的反馈机制很重要,失败时应能通过邮件或即时通讯工具通知相关人员;对于涉及文件读写的任务(如备份、报表生成),要确保目标目录存在且有写入权限,建议在任务开始前进行前置检查。以下展示了几个典型的办公自动化整合场景。
# 定时发送邮件
import schedule
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
SMTP_CONFIG = {
"host": "smtp.qq.com",
"port": 465,
"user": "your_email@qq.com",
"password": "your_auth_code"
}
def send_daily_report(recipients):
msg = MIMEMultipart()
msg["Subject"] = f"日报 - {datetime.now().strftime('%Y-%m-%d')}"
msg["From"] = SMTP_CONFIG["user"]
msg["To"] = ", ".join(recipients)
html_content = """
<h2>每日工作日报</h2>
<p>日期: {}</p>
<table border="1">
<tr><th>项目</th><th>进度</th></tr>
<tr><td>数据采集</td><td>100%</td></tr>
<tr><td>报表生成</td><td>完成</td></tr>
</table>
""".format(datetime.now().strftime('%Y-%m-%d %H:%M'))
msg.attach(MIMEText(html_content, "html", "utf-8"))
with smtplib.SMTP_SSL(SMTP_CONFIG["host"], SMTP_CONFIG["port"]) as server:
server.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"])
server.send_message(msg)
print(f"日报已发送至 {recipients}")
# 每个工作日早上9点发送日报
schedule.every().monday.at("09:00").do(
send_daily_report, ["manager@example.com", "team@example.com"]
)
schedule.every().tuesday.at("09:00").do(
send_daily_report, ["manager@example.com"]
)
# 定时备份文件
import schedule
import time
import shutil
import os
from datetime import datetime
def backup_folder(source_dir, backup_base_dir):
if not os.path.exists(source_dir):
print(f"源目录不存在: {source_dir}")
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dirname = os.path.basename(source_dir)
backup_path = os.path.join(backup_base_dir, f"{dirname}_{timestamp}")
try:
shutil.copytree(source_dir, backup_path)
print(f"备份成功: {source_dir} → {backup_path}")
except Exception as e:
print(f"备份失败: {e}")
def cleanup_old_backups(backup_base_dir, max_backups=30):
backups = sorted([
os.path.join(backup_base_dir, d) for d in os.listdir(backup_base_dir)
if os.path.isdir(os.path.join(backup_base_dir, d))
], key=os.path.getmtime)
while len(backups) > max_backups:
old = backups.pop(0)
shutil.rmtree(old)
print(f"删除旧备份: {old}")
# 每小时备份一次(办公时间)
schedule.every().hour.do(backup_folder, "D:/重要文档", "D:/备份")
# 每天清理超过30个的旧备份
schedule.every().day.at("23:00").do(cleanup_old_backups, "D:/备份", 30)
# 定时生成Excel报表
import schedule
import time
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from datetime import datetime
def generate_sales_report(output_dir):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "销售报表"
# 表头样式
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="2E7D32", end_color="2E7D32", fill_type="solid")
# 写入表头
headers = ["日期", "产品", "销量", "金额", "备注"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center")
# 写入示例数据
sample_data = [
[datetime.now().strftime("%Y-%m-%d"), "产品A", 120, 36000, "-"],
[datetime.now().strftime("%Y-%m-%d"), "产品B", 85, 42500, "-"],
]
for row_idx, row_data in enumerate(sample_data, 2):
for col_idx, value in enumerate(row_data, 1):
ws.cell(row=row_idx, column=col_idx, value=value)
# 自动调整列宽
for col in ws.columns:
max_length = max(len(str(cell.value or "")) for cell in col)
ws.column_dimensions[col[0].column_letter].width = max_length + 4
filename = f"销售报表_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
filepath = os.path.join(output_dir, filename)
wb.save(filepath)
print(f"报表已生成: {filepath}")
# 每天下午5点生成当天的销售报表
schedule.every().day.at("17:00").do(generate_sales_report, "D:/报表输出")
九、实战案例
前面八个章节分别介绍了schedule库的各个功能模块,本章节将这些知识综合起来,构建三个完整的办公自动化实战案例。每个案例都是一个可独立运行的Python脚本,涵盖从环境检查到任务执行再到结果通知的完整流程。这些案例可以直接部署使用,也可以作为模板进行修改扩展。案例设计遵循了几个原则:充分的异常处理、完善的日志记录、可配置的参数管理、以及友好的运行反馈。
第一个案例是每日自动发送日报邮件系统,整合了数据准备、模板渲染、SMTP发送和失败重试机制;第二个案例是每小时数据备份和每周报表生成系统,展示了多个schedule任务如何协同工作;第三个案例是定时文件夹清理工具,这是一个常被忽视但非常实用的自动化场景——日志文件和临时文件的堆积会逐渐消耗磁盘空间,自动清理可以防患于未然。建议读者在实际部署时,先在一台开发机上测试运行,确认所有功能正常后再迁移到正式环境。
# 案例1:每日自动发送日报邮件(完整版)
import schedule
import time
import smtplib
import logging
import json
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from pathlib import Path
# 配置
CONFIG = {
"smtp": {"host": "smtp.qq.com", "port": 465, "user": "noreply@example.com", "pass": "auth_code"},
"recipients": ["manager@example.com", "team@example.com"],
"log_dir": Path("./logs"),
"max_retries": 3
}
# 日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger("DailyReport")
def prepare_daily_data():
logger.info("准备日报数据...")
return {
"date": datetime.now().strftime("%Y-%m-%d"),
"metrics": {"orders": 156, "revenue": 46800, "users": 89}
}
def render_template(data):
return f"""
<h2>{data['date']} 工作日报</h2>
<p>订单数: {data['metrics']['orders']}</p>
<p>营收额: ¥{data['metrics']['revenue']:,}</p>
<p>新增用户: {data['metrics']['users']}</p>
"""
def send_email(subject, html_body):
msg = MIMEMultipart()
msg["Subject"] = subject
msg["From"] = CONFIG["smtp"]["user"]
msg["To"] = ", ".join(CONFIG["recipients"])
msg.attach(MIMEText(html_body, "html", "utf-8"))
with smtplib.SMTP_SSL(CONFIG["smtp"]["host"], CONFIG["smtp"]["port"]) as server:
server.login(CONFIG["smtp"]["user"], CONFIG["smtp"]["pass"])
server.send_message(msg)
def daily_report_job():
for attempt in range(CONFIG["max_retries"]):
try:
data = prepare_daily_data()
html = render_template(data)
send_email(f"日报 {data['date']}", html)
logger.info("日报发送成功")
return
except Exception as e:
logger.warning(f"第{attempt+1}次发送失败: {e}")
time.sleep(5)
logger.error("日报发送最终失败")
schedule.every().day.at("09:00").do(daily_report_job)
while True:
schedule.run_pending()
time.sleep(30)
# 案例2:每小时数据备份 + 每周报表生成(多任务协同)
import schedule
import time
import shutil
import os
import logging
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger("BackupSystem")
executor = ThreadPoolExecutor(max_workers=3)
BACKUP_CONFIG = {
"sources": ["D:/data/db", "D:/data/uploads"],
"dest": "D:/backups",
"retention_days": 30
}
def backup_single_source(source):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
basename = os.path.basename(source)
dest_path = os.path.join(BACKUP_CONFIG["dest"], f"{basename}_{timestamp}")
shutil.copytree(source, dest_path)
logger.info(f"已备份: {source} → {dest_path}")
def hourly_backup():
logger.info("开始执行每小时备份...")
for source in BACKUP_CONFIG["sources"]:
executor.submit(backup_single_source, source)
def weekly_report():
logger.info("生成每周备份报告...")
report_lines = [f"周报 - {datetime.now().strftime('%Y-W%W')}"]
for item in os.listdir(BACKUP_CONFIG["dest"]):
item_path = os.path.join(BACKUP_CONFIG["dest"], item)
mtime = datetime.fromtimestamp(os.path.getmtime(item_path))
report_lines.append(f" {item}: 修改于 {mtime}")
report_path = os.path.join(BACKUP_CONFIG["dest"], "backup_report.txt")
with open(report_path, "w", encoding="utf-8") as f:
f.write("\n".join(report_lines))
logger.info(f"周报已生成: {report_path}")
def cleanup_old_backups():
now = time.time()
cutoff = now - BACKUP_CONFIG["retention_days"] * 86400
for item in os.listdir(BACKUP_CONFIG["dest"]):
item_path = os.path.join(BACKUP_CONFIG["dest"], item)
if os.path.isdir(item_path) and os.path.getmtime(item_path) < cutoff:
shutil.rmtree(item_path)
logger.info(f"已清理过期备份: {item}")
# 调度配置
schedule.every(1).hour.do(hourly_backup)
schedule.every().sunday.at("10:00").do(weekly_report)
schedule.every().sunday.at("10:30").do(cleanup_old_backups)
logger.info("备份系统启动完成")
while True:
schedule.run_pending()
time.sleep(30)
# 案例3:定时文件夹清理工具
import schedule
import time
import os
import shutil
import logging
from pathlib import Path
from datetime import datetime, timedelta
logging.basicConfig(
level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.FileHandler("cleanup.log", encoding="utf-8"), logging.StreamHandler()]
)
logger = logging.getLogger("CleanupTool")
CLEANUP_RULES = [
{"dir": Path("D:/logs"), "pattern": "*.log", "max_age_days": 7},
{"dir": Path("D:/temp"), "pattern": "*", "max_age_days": 1},
{"dir": Path("D:/backups"), "pattern": "*", "max_age_days": 90},
]
def cleanup_directory(rule):
target_dir = rule["dir"]
if not target_dir.exists():
logger.warning(f"目录不存在, 跳过: {target_dir}")
return
cutoff = datetime.now() - timedelta(days=rule["max_age_days"])
deleted_count = 0
freed_bytes = 0
for item in target_dir.rglob(rule["pattern"]):
try:
mtime = datetime.fromtimestamp(item.stat().st_mtime)
if mtime < cutoff:
if item.is_file():
freed_bytes += item.stat().st_size
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
deleted_count += 1
except Exception as e:
logger.error(f"删除失败 {item}: {e}")
freed_mb = freed_bytes / 1024 / 1024
logger.info(f"清理完成 [{target_dir}]: 删除{deleted_count}项, 释放{freed_mb:.2f}MB")
def scheduled_cleanup():
logger.info("开始执行定时清理...")
for rule in CLEANUP_RULES:
try:
cleanup_directory(rule)
except Exception as e:
logger.error(f"清理规则执行失败 {rule['dir']}: {e}")
# 每天凌晨2点执行清理
schedule.every().day.at("02:00").do(scheduled_cleanup)
if __name__ == "__main__":
logger.info("文件夹清理已启动")
while True:
schedule.run_pending()
time.sleep(60)