schedule库:轻量级定时任务

Python 办公自动化专题 · 简单优雅的Python定时任务方案

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, schedule库, 定时任务, 任务调度, 定时器, 周期性任务, Python自动化

一、schedule概述

schedule是Python生态中最受欢迎的轻量级定时任务调度库之一,以其简洁的API设计和零外部依赖而著称。与Celery、APScheduler等重量级调度框架不同,schedule遵循"简单够用"的设计哲学,只需几行代码即可完成周期性任务的配置和运行。schedule内部基于轮询机制实现,在主线程中维护一个任务队列,每隔指定时间检查是否有任务到期执行,开发者无需关心底层的线程管理或时间计算逻辑。

schedule库的适用场景非常广泛,尤其适合中小规模的定时任务需求。在办公自动化领域,schedule常用于定时发送邮件、定期备份文件、周期性数据采集、定时生成报表等场景。它特别适合那些不需要分布式调度、任务数量有限(通常几十个以内)、对实时性要求不高的场景。相比之下,如果你的项目需要分布式任务调度、任务持久化、复杂cron表达式、毫秒级精度等高级特性,则需要考虑APScheduler或Celery等更强大的方案。

核心特点:

1. API极简:schedule.every(10).minutes.do(job) 即可定义一个任务

2. 零外部依赖:纯Python实现,安装即用

3. 友好的链式调用:支持自然语言风格的链式语法

4. 轻量级:整个库只有几百KB,内存占用极低

5. 跨平台:支持Windows、Linux、macOS全平台运行

schedule与其他常见调度方案的对比:APScheduler功能最全面,支持cron表达式和持久化存储,但配置复杂;Celery适合大规模分布式场景,但需要消息队列中间件(Redis/RabbitMQ)作为支撑;操作系统的定时任务(cron、Task Scheduler)适合系统级别的任务,但部署和管理不便;而schedule在易用性和功能丰富度之间取得了最佳平衡,是个人开发者和中小型项目的首选方案。

# 安装schedule库 pip install schedule # 验证安装 python -c "import schedule; print(schedule.__version__)"
# schedule与APScheduler、Celery的对比示例 # schedule版本 - 3行搞定 import schedule schedule.every(10).minutes.do(lambda: print("任务执行")) # APScheduler版本 - 需要配置trigger和scheduler # from apscheduler.schedulers.blocking import BlockingScheduler # scheduler = BlockingScheduler() # scheduler.add_job(my_job, 'interval', minutes=10)
# 完整的最小示例 import schedule import time def job(): print("定时任务执行中...") schedule.every(3).seconds.do(job) while True: schedule.run_pending() time.sleep(1)

二、基础调度

schedule库的核心调度能力围绕every()方法构建,通过链式调用组合时间单位和数量,再以do()方法绑定目标任务,形成一条完整的调度声明。这种设计模式被称为"流畅接口"(Fluent Interface),代码读起来就像自然语言:every(10).minutes.do(job) 的字面意思就是"每10分钟执行job"。除了基本的间隔调度,schedule还支持在特定时间点执行(at方法)、按星期几执行、以及通过标签对任务进行分组管理。

间隔调度是最常用的模式,支持的时间单位包括seconds(秒)、minutes(分钟)、hours(小时)、days(天)、weeks(周)。通过数字参数控制间隔长度,例如every(5).seconds表示每5秒执行一次。at()方法用于指定当天的具体执行时间点,格式为"HH:MM"或"HH:MM:SS",常用于固定时间的任务(如每天上午9点发送日报)。周几调度可以结合days参数实现:schedule.every().monday.do(job)表示每周一执行,也可以使用day_of_week参数配合at指定具体时间。链式调用让代码表达力极强,同时保持逻辑清晰。

# 间隔调度 - 多种时间单位 import schedule import time def job(): print("任务执行") schedule.every(5).seconds.do(job) schedule.every(2).minutes.do(job) schedule.every(1).hour.do(job) schedule.every(3).days.do(job) schedule.every().monday.do(job) while True: schedule.run_pending() time.sleep(1)
# 时间点调度 - 在特定时刻执行 import schedule import time def morning_job(): print("早上9点 - 发送日报邮件") def noon_job(): print("中午12点 - 检查系统状态") def evening_job(): print("晚上6点半 - 生成日终报表") # 精确到分钟 schedule.every().day.at("09:00").do(morning_job) # 精确到秒 schedule.every().day.at("12:00:30").do(noon_job) # 也可以指定星期几的特定时间 schedule.every().monday.at("18:30").do(evening_job) while True: schedule.run_pending() time.sleep(1)
# 链式调用与任务标签 import schedule import time def send_email(): print("发送邮件") def backup_db(): print("备份数据库") def cleanup_logs(): print("清理日志") # 通过标签对任务分组管理 schedule.every(10).minutes.do(send_email).tag("email", "daily") schedule.every().day.at("03:00").do(backup_db).tag("backup") schedule.every().sunday.do(cleanup_logs).tag("maintenance") # 获取所有标签为"daily"的任务 daily_tasks = schedule.get_jobs("daily") print(f"每日任务数量: {len(daily_tasks)}") while True: schedule.run_pending() time.sleep(1)

三、任务管理

在实际项目中,定时任务往往不是固定不变的,而是需要根据运行状态动态管理。schedule库提供了丰富的任务管理API,允许开发者在程序运行过程中灵活地添加、取消、暂停和恢复任务。任务管理的核心是引用任务对象——do()方法返回一个Job对象,我们可以将其保存到变量中,后续通过该变量对任务进行操作。如果不方便保存引用,也可以利用tag标签机制进行分组管理,通过标签批量操作一组相关联的任务。

schedule的任务管理功能覆盖了完整的任务生命周期。cancel_job()用于取消单个任务,清空任务列表可以用clear()方法(可选择按标签清空)。get_jobs()方法返回当前所有任务的列表,结合tag参数可以按标签过滤。对于需要暂停一段时间后再恢复的任务场景,可以通过将任务的next_run属性设置为一个较远的未来时间来实现软暂停,或者从任务列表中移除并在需要时重新添加。标签管理是schedule非常实用的特性——你可以为任务分配多个标签,然后通过标签名执行批量操作。

# 任务取消与清空 import schedule import time def my_job(): print("正在执行任务...") def cancel_job_example(): global job1 schedule.cancel_job(job1) print("任务已取消") # 保存任务引用 job1 = schedule.every(3).seconds.do(my_job) # 5秒后取消这个任务 schedule.every(5).seconds.do(cancel_job_example) # 10秒后清空所有任务 schedule.every(10).seconds.do(lambda: schedule.clear()) start_time = time.time() while time.time() - start_time < 15: schedule.run_pending() time.sleep(1)
# 通过标签批量管理任务 import schedule import time def job_a(): print("任务A执行") def job_b(): print("任务B执行") def job_c(): print("任务C执行") schedule.every(2).seconds.do(job_a).tag("group1") schedule.every(3).seconds.do(job_b).tag("group1", "important") schedule.every(4).seconds.do(job_c).tag("group2") def show_jobs(): print("\n=== 当前所有任务 ===") for j in schedule.get_jobs(): print(f" 任务: {j.job_func}, 下次执行: {j.next_run}") print(f"标签 'group1' 下的任务数: {len(schedule.get_jobs('group1'))}") schedule.every(5).seconds.do(show_jobs) # 10秒后清除group1的所有任务 schedule.every(10).seconds.do(lambda: schedule.clear("group1")) start_time = time.time() while time.time() - start_time < 15: schedule.run_pending() time.sleep(1)
# 任务暂停与恢复(通过设置next_run实现) import schedule import time import datetime def my_job(): print("任务执行中...") job = schedule.every(2).seconds.do(my_job) def pause_job(): # 把下次执行时间设到很远的未来 job.next_run = datetime.datetime.now() + datetime.timedelta(days=365) print("任务已暂停(推迟1年)") def resume_job(): # 立即执行一次并恢复周期 job.next_run = datetime.datetime.now() print("任务已恢复") # 5秒后暂停 schedule.every(5).seconds.do(pause_job) # 10秒后恢复 schedule.every(10).seconds.do(resume_job) start_time = time.time() while time.time() - start_time < 15: schedule.run_pending() time.sleep(1)

四、任务传参与回调

在实际开发中,定时任务通常需要接受外部参数来驱动不同的行为逻辑。schedule库的do()方法支持向任务函数传递任意数量的位置参数和关键字参数,这在实现灵活的数据处理流程时非常有用。无论是向邮件发送函数传递收件人列表,还是向数据备份函数传递源路径和目标路径,参数传递机制都让同一个任务函数能够服务于多个不同的调度场景。此外,schedule支持多种形式的任务回调,包括普通函数、类方法、lambda表达式等,开发者可以根据实际需求选择最合适的编码风格。

任务传参的几种典型模式:传位置参数时只需在do()中依次传入即可;传关键字参数时使用key=value的语法;对于类方法调度,可以先实例化类,再将绑定方法传入do();lambda表达式适用于简单的匿名任务,但需注意lambda中变量捕获的闭包陷阱。在办公自动化实践中,传参机制尤其重要——例如同一个"发送报表"函数,可以通过参数区分发送给不同部门、使用不同模板,无需为每种场景单独定义函数。

# 带参数的任务 import schedule import time def send_notification(recipient, message, level="info"): print(f"[{level}] 发送给 {recipient}: {message}") # 向do()传递位置参数和关键字参数 schedule.every(3).seconds.do(send_notification, "admin@example.com", "系统运行正常") schedule.every(5).seconds.do(send_notification, "ops@example.com", "需要关注", level="warning") schedule.every(7).seconds.do(send_notification, "boss@example.com", "绩效报表已生成", level="important") start_time = time.time() while time.time() - start_time < 10: schedule.run_pending() time.sleep(1)
# 类方法调度与回调 import schedule import time class DataProcessor: def __init__(self, source, target): self.source = source self.target = target self.count = 0 def process(self): self.count += 1 print(f"第{self.count}次处理: {self.source} → {self.target}") def reset(self): self.count = 0 print("计数器已重置") # 实例化后调度类方法 processor = DataProcessor("input.csv", "output.db") schedule.every(2).seconds.do(processor.process) schedule.every(6).seconds.do(processor.reset) start_time = time.time() while time.time() - start_time < 10: schedule.run_pending() time.sleep(1)
# lambda表达式与闭包陷阱 import schedule import time # 正确方式:使用默认参数捕获当前值 for i in range(1, 4): schedule.every(i * 2).seconds.do( lambda x=i: print(f"任务{x}: 每{x*2}秒执行一次") ) # 错误方式(闭包陷阱):所有任务都打印最后一个i的值 # for i in range(1, 4): # schedule.every(i * 2).seconds.do( # lambda: print(f"任务{i}: 每{i*2}秒执行一次") # ) start_time = time.time() while time.time() - start_time < 10: schedule.run_pending() time.sleep(1)

五、并行执行

schedule库的默认调度模式是单线程串行执行——所有任务在同一个主线程中排队运行。这意味着如果一个任务的执行时间超过了调度间隔,后续任务将会被阻塞,造成调度延迟和任务堆积。在实际的生产环境中,这种阻塞可能导致严重问题:一个耗时较长的数据备份任务,可能阻塞后续的邮件发送任务,导致邮件延迟数小时。为解决这个问题,schedule需要配合多线程技术实现真正的并行执行,让每个被调度任务的执行互不干扰。

实现并行执行最直接的方式是为每个任务开启独立线程。当任务被触发时,不是直接执行函数,而是将函数提交到一个线程池中异步执行。这种设计不仅解决了任务阻塞问题,还能有效控制并发数量,防止过多的线程消耗系统资源。Python标准库中的concurrent.futures.ThreadPoolExecutor是最理想的线程池实现——它提供了线程复用、任务队列、超时控制等高级功能。在实践中,通常会在while调度循环之外创建一个全局的线程池,所有任务都通过executor.submit()提交执行,主循环只负责检查任务是否到期。

# 多线程调度 - 基础方案 import schedule import time import threading def long_running_job(name): print(f"[{name}] 开始执行(将耗时5秒)") time.sleep(5) print(f"[{name}] 执行完毕") def run_in_thread(job_func, *args, **kwargs): thread = threading.Thread(target=job_func, args=args, kwargs=kwargs, daemon=True) thread.start() schedule.every(2).seconds.do(run_in_thread, long_running_job, "备份任务") schedule.every(3).seconds.do(run_in_thread, long_running_job, "清理任务") start_time = time.time() while time.time() - start_time < 12: schedule.run_pending() time.sleep(1)
# 使用ThreadPoolExecutor整合 import schedule import time from concurrent.futures import ThreadPoolExecutor def data_backup(source): print(f"备份开始: {source}") time.sleep(3) print(f"备份完成: {source}") return f"{source} 已备份" def send_report(email): print(f"发送报告至: {email}") time.sleep(2) print(f"报告已发送至: {email}") # 创建全局线程池(最大4个并发线程) executor = ThreadPoolExecutor(max_workers=4) # 包装函数:将任务提交到线程池 def async_run(func, *args, **kwargs): future = executor.submit(func, *args, **kwargs) future.add_done_callback( lambda f: print(f"异步任务完成: {f.result()}") ) schedule.every(3).seconds.do(async_run, data_backup, "/data/mysql") schedule.every(5).seconds.do(async_run, send_report, "admin@example.com") start_time = time.time() while time.time() - start_time < 15: schedule.run_pending() time.sleep(1) executor.shutdown(wait=True)
# 自定义调度装饰器 - 并行执行优雅封装 import schedule import time from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor(max_workers=4) def parallel(job_func): def wrapper(*args, **kwargs): executor.submit(job_func, *args, **kwargs) return wrapper @parallel def heavy_task_a(): time.sleep(4) print("重型任务A完成") @parallel def heavy_task_b(): time.sleep(3) print("重型任务B完成") schedule.every(2).seconds.do(heavy_task_a) schedule.every(3).seconds.do(heavy_task_b) start_time = time.time() while time.time() - start_time < 10: schedule.run_pending() time.sleep(1) executor.shutdown(wait=True)

六、异常处理

在定时任务的长期运行过程中,异常几乎是不可避免的——网络波动导致API调用失败、磁盘空间不足造成文件写入失败、数据格式变更引起解析异常等等。如果不对这些异常进行妥善处理,一个任务的崩溃可能导致整个调度主循环退出,所有定时任务都随之停止。因此,为schedule任务构建健壮的异常处理体系,是从"能用"到"可靠"的关键一步。异常处理的核心思路是"隔离与恢复":确保任何单个任务的异常不会扩散到其他任务或主循环,同时具备自动重试和失败通知的能力。

推荐的异常处理策略分为三个层次:第一层是任务级别的try/except捕获,每个任务函数内部自行处理预期内的异常;第二层是调度层级的装饰器封装,对所有任务统一包裹异常处理逻辑;第三层是重试机制,对临时性故障(如网络超时)自动重试指定次数。对于无法恢复的严重错误,应记录详细日志并通过邮件或即时消息通知运维人员。特别的,schedule主循环本身不提供内置的异常处理——必须由开发者自行实现这些保护措施。

# 任务级异常捕获与重试 import schedule import time import random def retry(max_attempts=3, delay=1): def decorator(func): def wrapper(*args, **kwargs): last_exception = None for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except Exception as e: last_exception = e print(f"第{attempt}次尝试失败: {e}") if attempt < max_attempts: time.sleep(delay) print(f"任务最终失败,已重试{max_attempts}次: {last_exception}") raise last_exception return wrapper return decorator @retry(max_attempts=3, delay=2) def unstable_api_call(): if random.random() < 0.7: raise ConnectionError("网络连接失败") print("API调用成功!") schedule.every(3).seconds.do(unstable_api_call) start_time = time.time() while time.time() - start_time < 12: schedule.run_pending() time.sleep(1)
# 隔离执行 - 防止一个任务拖垮整个调度 import schedule import time import traceback import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s' ) logger = logging.getLogger("schedule_app") def safe_run(func, *args, **kwargs): try: func(*args, **kwargs) except Exception as e: error_msg = traceback.format_exc() logger.error(f"任务 {func.__name__} 执行异常:\n{error_msg}") # 通知管理员 print(f"⚠ 告警: 任务 {func.__name__} 失败, 请检查日志") def faulty_job(): raise ValueError("数据处理异常") def normal_job(): logger.info("正常任务运行中") # 使用隔离执行 schedule.every(2).seconds.do(safe_run, faulty_job) schedule.every(3).seconds.do(safe_run, normal_job) start_time = time.time() while time.time() - start_time < 10: schedule.run_pending() time.sleep(1)
# 失败通知邮件(整合异常处理) import schedule import time import smtplib from email.message import EmailMessage import traceback from datetime import datetime def notify_failure(task_name, error_info): msg = EmailMessage() msg.set_content(f"任务: {task_name}\n时间: {datetime.now()}\n错误: {error_info}") msg['Subject'] = f"[告警] 定时任务失败: {task_name}" msg['From'] = "monitor@example.com" msg['To'] = "admin@example.com" try: with smtplib.SMTP("smtp.example.com") as server: server.send_message(msg) print("告警邮件已发送") except Exception as e: print(f"发送告警邮件失败: {e}") def robust_task_wrapper(func, *args, **kwargs): try: func(*args, **kwargs) except Exception as e: error_info = traceback.format_exc() print(f"任务 {func.__name__} 失败: {e}") notify_failure(func.__name__, error_info) schedule.every(5).seconds.do(robust_task_wrapper, faulty_job) start_time = time.time() while time.time() - start_time < 12: schedule.run_pending() time.sleep(1)

七、日志与监控

定时任务系统一旦部署到生产环境,日志和监控就是运维人员了解系统运行状况的"眼睛"。没有完善的日志记录,当任务执行异常或未按预期运行时,排查问题将变得极为困难。schedule库本身不提供内置的日志功能,但我们可以通过Python标准库logging模块为其增加完善的日志记录能力。一个好的日志系统应该记录:任务何时开始执行、执行耗时多少、执行结果是否成功、如果失败则记录失败原因。这些信息对于后期的问题排查和性能优化至关重要。

监控方面,不仅要记录每次任务执行的日志,还需要构建宏观的运行统计。常见的监控指标包括:任务执行次数统计、平均执行时长、最长执行时长、失败率、调度延迟等。通过这些指标,可以及时发现系统性能下降的趋势或异常模式。可选的监控手段还包括:将关键指标暴露为Prometheus指标、通过Grafana构建可视化仪表盘、设置基于阈值的告警规则等。对于中小型项目,至少应该做到执行时间监控和任务健康检查这两个基本点。

# 任务执行日志系统 import schedule import time import logging from datetime import datetime # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler("schedule_tasks.log", encoding="utf-8"), logging.StreamHandler() ] ) logger = logging.getLogger("TaskScheduler") def logged_task(func): def wrapper(*args, **kwargs): task_name = func.__name__ start_time = time.time() logger.info(f"▶ 任务 [{task_name}] 开始执行") try: result = func(*args, **kwargs) elapsed = time.time() - start_time logger.info(f"✓ 任务 [{task_name}] 执行成功, 耗时: {elapsed:.2f}s") return result except Exception as e: elapsed = time.time() - start_time logger.error(f"✗ 任务 [{task_name}] 执行失败, 耗时: {elapsed:.2f}s, 错误: {e}") raise return wrapper @logged_task def backup_task(): time.sleep(1) print("备份完成") @logged_task def report_task(): time.sleep(2) print("报表生成完成") schedule.every(3).seconds.do(backup_task) schedule.every(5).seconds.do(report_task) start_time = time.time() while time.time() - start_time < 12: schedule.run_pending() time.sleep(1)
# 执行时间监控与健康检查 import schedule import time from collections import defaultdict from datetime import datetime, timedelta class TaskMonitor: def __init__(self): self.stats = defaultdict(lambda: { "count": 0, "errors": 0, "total_time": 0, "max_time": 0 }) def monitor(self, func): def wrapper(*args, **kwargs): name = func.__name__ start = time.time() try: result = func(*args, **kwargs) self.stats[name]["count"] += 1 except Exception as e: self.stats[name]["errors"] += 1 raise finally: elapsed = time.time() - start self.stats[name]["total_time"] += elapsed self.stats[name]["max_time"] = max( self.stats[name]["max_time"], elapsed ) return result return wrapper def report(self): print("\n=== 任务运行统计 ===") for name, s in self.stats.items(): avg_time = s["total_time"] / s["count"] if s["count"] else 0 print(f" {name}: 执行{s['count']}次, " f"失败{s['errors']}次, " f"平均{avg_time:.2f}s, " f"最慢{s['max_time']:.2f}s") monitor = TaskMonitor() @monitor.monitor def healthy_task(): time.sleep(1) print("健康任务执行") @monitor.monitor def unhealthy_task(): time.sleep(2) if time.time() % 2 > 1: raise RuntimeError("随机故障") schedule.every(2).seconds.do(healthy_task) schedule.every(4).seconds.do(unhealthy_task) # 每10秒输出监控报告 schedule.every(10).seconds.do(monitor.report) start_time = time.time() while time.time() - start_time < 15: schedule.run_pending() time.sleep(1)
# 调度延迟监控 import schedule import time from datetime import datetime, timedelta class SchedulerHealth: def __init__(self): self.last_run = datetime.now() self.missed_deadlines = 0 def check(self): now = datetime.now() gap = (now - self.last_run).total_seconds() self.last_run = now if gap > 2.0: # 超过2秒说明调度存在延迟 self.missed_deadlines += 1 print(f"⚠ 调度延迟: 间隔{gap:.1f}s") print(f"调度健康: 良好 (已累积{self.missed_deadlines}次延迟)") scheduler_health = SchedulerHealth() # 每个周期的常规检查和健康报告 schedule.every(1).seconds.do(scheduler_health.check) start_time = time.time() while time.time() - start_time < 6: schedule.run_pending() time.sleep(1)

八、schedule整合办公

schedule库在办公自动化领域的真正价值,体现在它能够与各种Python办公库无缝整合,构建全自动的日常工作流。将schedule的定时调度能力与smtplib(邮件)、shutil(文件操作)、openpyxl(Excel)、pandas(数据分析)、requests(网络请求)等库相结合,可以实现办公场景中绝大部分的重复性任务自动化。一个典型的自动化办公系统可以这样构建:schedule作为"心脏"负责定时触发,各种功能库作为"手脚"负责具体执行,logging模块作为"记忆"负责记录运行情况。

在实际部署中,需要注意几个关键点:办公脚本通常需要长时间连续运行,因此建议部署在服务器或专用的办公电脑上;任务执行结果的反馈机制很重要,失败时应能通过邮件或即时通讯工具通知相关人员;对于涉及文件读写的任务(如备份、报表生成),要确保目标目录存在且有写入权限,建议在任务开始前进行前置检查。以下展示了几个典型的办公自动化整合场景。

# 定时发送邮件 import schedule import time import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime SMTP_CONFIG = { "host": "smtp.qq.com", "port": 465, "user": "your_email@qq.com", "password": "your_auth_code" } def send_daily_report(recipients): msg = MIMEMultipart() msg["Subject"] = f"日报 - {datetime.now().strftime('%Y-%m-%d')}" msg["From"] = SMTP_CONFIG["user"] msg["To"] = ", ".join(recipients) html_content = """ <h2>每日工作日报</h2> <p>日期: {}</p> <table border="1"> <tr><th>项目</th><th>进度</th></tr> <tr><td>数据采集</td><td>100%</td></tr> <tr><td>报表生成</td><td>完成</td></tr> </table> """.format(datetime.now().strftime('%Y-%m-%d %H:%M')) msg.attach(MIMEText(html_content, "html", "utf-8")) with smtplib.SMTP_SSL(SMTP_CONFIG["host"], SMTP_CONFIG["port"]) as server: server.login(SMTP_CONFIG["user"], SMTP_CONFIG["password"]) server.send_message(msg) print(f"日报已发送至 {recipients}") # 每个工作日早上9点发送日报 schedule.every().monday.at("09:00").do( send_daily_report, ["manager@example.com", "team@example.com"] ) schedule.every().tuesday.at("09:00").do( send_daily_report, ["manager@example.com"] )
# 定时备份文件 import schedule import time import shutil import os from datetime import datetime def backup_folder(source_dir, backup_base_dir): if not os.path.exists(source_dir): print(f"源目录不存在: {source_dir}") return timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") dirname = os.path.basename(source_dir) backup_path = os.path.join(backup_base_dir, f"{dirname}_{timestamp}") try: shutil.copytree(source_dir, backup_path) print(f"备份成功: {source_dir} → {backup_path}") except Exception as e: print(f"备份失败: {e}") def cleanup_old_backups(backup_base_dir, max_backups=30): backups = sorted([ os.path.join(backup_base_dir, d) for d in os.listdir(backup_base_dir) if os.path.isdir(os.path.join(backup_base_dir, d)) ], key=os.path.getmtime) while len(backups) > max_backups: old = backups.pop(0) shutil.rmtree(old) print(f"删除旧备份: {old}") # 每小时备份一次(办公时间) schedule.every().hour.do(backup_folder, "D:/重要文档", "D:/备份") # 每天清理超过30个的旧备份 schedule.every().day.at("23:00").do(cleanup_old_backups, "D:/备份", 30)
# 定时生成Excel报表 import schedule import time import openpyxl from openpyxl.styles import Font, PatternFill, Alignment from datetime import datetime def generate_sales_report(output_dir): wb = openpyxl.Workbook() ws = wb.active ws.title = "销售报表" # 表头样式 header_font = Font(bold=True, color="FFFFFF") header_fill = PatternFill(start_color="2E7D32", end_color="2E7D32", fill_type="solid") # 写入表头 headers = ["日期", "产品", "销量", "金额", "备注"] for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.font = header_font cell.fill = header_fill cell.alignment = Alignment(horizontal="center") # 写入示例数据 sample_data = [ [datetime.now().strftime("%Y-%m-%d"), "产品A", 120, 36000, "-"], [datetime.now().strftime("%Y-%m-%d"), "产品B", 85, 42500, "-"], ] for row_idx, row_data in enumerate(sample_data, 2): for col_idx, value in enumerate(row_data, 1): ws.cell(row=row_idx, column=col_idx, value=value) # 自动调整列宽 for col in ws.columns: max_length = max(len(str(cell.value or "")) for cell in col) ws.column_dimensions[col[0].column_letter].width = max_length + 4 filename = f"销售报表_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" filepath = os.path.join(output_dir, filename) wb.save(filepath) print(f"报表已生成: {filepath}") # 每天下午5点生成当天的销售报表 schedule.every().day.at("17:00").do(generate_sales_report, "D:/报表输出")

九、实战案例

前面八个章节分别介绍了schedule库的各个功能模块,本章节将这些知识综合起来,构建三个完整的办公自动化实战案例。每个案例都是一个可独立运行的Python脚本,涵盖从环境检查到任务执行再到结果通知的完整流程。这些案例可以直接部署使用,也可以作为模板进行修改扩展。案例设计遵循了几个原则:充分的异常处理、完善的日志记录、可配置的参数管理、以及友好的运行反馈。

第一个案例是每日自动发送日报邮件系统,整合了数据准备、模板渲染、SMTP发送和失败重试机制;第二个案例是每小时数据备份和每周报表生成系统,展示了多个schedule任务如何协同工作;第三个案例是定时文件夹清理工具,这是一个常被忽视但非常实用的自动化场景——日志文件和临时文件的堆积会逐渐消耗磁盘空间,自动清理可以防患于未然。建议读者在实际部署时,先在一台开发机上测试运行,确认所有功能正常后再迁移到正式环境。

# 案例1:每日自动发送日报邮件(完整版) import schedule import time import smtplib import logging import json import os from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime from pathlib import Path # 配置 CONFIG = { "smtp": {"host": "smtp.qq.com", "port": 465, "user": "noreply@example.com", "pass": "auth_code"}, "recipients": ["manager@example.com", "team@example.com"], "log_dir": Path("./logs"), "max_retries": 3 } # 日志配置 logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger("DailyReport") def prepare_daily_data(): logger.info("准备日报数据...") return { "date": datetime.now().strftime("%Y-%m-%d"), "metrics": {"orders": 156, "revenue": 46800, "users": 89} } def render_template(data): return f""" <h2>{data['date']} 工作日报</h2> <p>订单数: {data['metrics']['orders']}</p> <p>营收额: ¥{data['metrics']['revenue']:,}</p> <p>新增用户: {data['metrics']['users']}</p> """ def send_email(subject, html_body): msg = MIMEMultipart() msg["Subject"] = subject msg["From"] = CONFIG["smtp"]["user"] msg["To"] = ", ".join(CONFIG["recipients"]) msg.attach(MIMEText(html_body, "html", "utf-8")) with smtplib.SMTP_SSL(CONFIG["smtp"]["host"], CONFIG["smtp"]["port"]) as server: server.login(CONFIG["smtp"]["user"], CONFIG["smtp"]["pass"]) server.send_message(msg) def daily_report_job(): for attempt in range(CONFIG["max_retries"]): try: data = prepare_daily_data() html = render_template(data) send_email(f"日报 {data['date']}", html) logger.info("日报发送成功") return except Exception as e: logger.warning(f"第{attempt+1}次发送失败: {e}") time.sleep(5) logger.error("日报发送最终失败") schedule.every().day.at("09:00").do(daily_report_job) while True: schedule.run_pending() time.sleep(30)
# 案例2:每小时数据备份 + 每周报表生成(多任务协同) import schedule import time import shutil import os import logging from datetime import datetime from concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger("BackupSystem") executor = ThreadPoolExecutor(max_workers=3) BACKUP_CONFIG = { "sources": ["D:/data/db", "D:/data/uploads"], "dest": "D:/backups", "retention_days": 30 } def backup_single_source(source): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") basename = os.path.basename(source) dest_path = os.path.join(BACKUP_CONFIG["dest"], f"{basename}_{timestamp}") shutil.copytree(source, dest_path) logger.info(f"已备份: {source} → {dest_path}") def hourly_backup(): logger.info("开始执行每小时备份...") for source in BACKUP_CONFIG["sources"]: executor.submit(backup_single_source, source) def weekly_report(): logger.info("生成每周备份报告...") report_lines = [f"周报 - {datetime.now().strftime('%Y-W%W')}"] for item in os.listdir(BACKUP_CONFIG["dest"]): item_path = os.path.join(BACKUP_CONFIG["dest"], item) mtime = datetime.fromtimestamp(os.path.getmtime(item_path)) report_lines.append(f" {item}: 修改于 {mtime}") report_path = os.path.join(BACKUP_CONFIG["dest"], "backup_report.txt") with open(report_path, "w", encoding="utf-8") as f: f.write("\n".join(report_lines)) logger.info(f"周报已生成: {report_path}") def cleanup_old_backups(): now = time.time() cutoff = now - BACKUP_CONFIG["retention_days"] * 86400 for item in os.listdir(BACKUP_CONFIG["dest"]): item_path = os.path.join(BACKUP_CONFIG["dest"], item) if os.path.isdir(item_path) and os.path.getmtime(item_path) < cutoff: shutil.rmtree(item_path) logger.info(f"已清理过期备份: {item}") # 调度配置 schedule.every(1).hour.do(hourly_backup) schedule.every().sunday.at("10:00").do(weekly_report) schedule.every().sunday.at("10:30").do(cleanup_old_backups) logger.info("备份系统启动完成") while True: schedule.run_pending() time.sleep(30)
# 案例3:定时文件夹清理工具 import schedule import time import os import shutil import logging from pathlib import Path from datetime import datetime, timedelta logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[logging.FileHandler("cleanup.log", encoding="utf-8"), logging.StreamHandler()] ) logger = logging.getLogger("CleanupTool") CLEANUP_RULES = [ {"dir": Path("D:/logs"), "pattern": "*.log", "max_age_days": 7}, {"dir": Path("D:/temp"), "pattern": "*", "max_age_days": 1}, {"dir": Path("D:/backups"), "pattern": "*", "max_age_days": 90}, ] def cleanup_directory(rule): target_dir = rule["dir"] if not target_dir.exists(): logger.warning(f"目录不存在, 跳过: {target_dir}") return cutoff = datetime.now() - timedelta(days=rule["max_age_days"]) deleted_count = 0 freed_bytes = 0 for item in target_dir.rglob(rule["pattern"]): try: mtime = datetime.fromtimestamp(item.stat().st_mtime) if mtime < cutoff: if item.is_file(): freed_bytes += item.stat().st_size item.unlink() elif item.is_dir(): shutil.rmtree(item) deleted_count += 1 except Exception as e: logger.error(f"删除失败 {item}: {e}") freed_mb = freed_bytes / 1024 / 1024 logger.info(f"清理完成 [{target_dir}]: 删除{deleted_count}项, 释放{freed_mb:.2f}MB") def scheduled_cleanup(): logger.info("开始执行定时清理...") for rule in CLEANUP_RULES: try: cleanup_directory(rule) except Exception as e: logger.error(f"清理规则执行失败 {rule['dir']}: {e}") # 每天凌晨2点执行清理 schedule.every().day.at("02:00").do(scheduled_cleanup) if __name__ == "__main__": logger.info("文件夹清理已启动") while True: schedule.run_pending() time.sleep(60)

核心要点总结:

1. schedule库的核心API是 every().do() 链式调用,支持seconds/minutes/hours/days/weeks五种时间单位

2. 时间点调度使用 at("HH:MM") 支持精确到秒的定时执行

3. 任务标签 (tag) 是分组管理和批量操作任务的有效手段

4. 默认串行执行,耗时任务需配合 ThreadPoolExecutor 实现并行

5. 异常处理必须由开发者自行实现,建议使用装饰器模式统一处理

6. 日志和监控是生产环境必不可少的组成部分

7. schedule与 smtplib/shutil/openpyxl 等库整合可实现完整的办公自动化流程

8. 主循环中的 time.sleep() 决定了调度检查精度,一般1-30秒即可