专题:Python 自动化办公系统学习
关键词:Python, 自动化办公, 企业微信, 钉钉, Slack, Webhook, 消息通知, 机器人, Python自动化
一、消息平台概述
在现代企业办公协作中,消息通知的自动化工具有着至关重要的地位。企业微信、钉钉和Slack是当前最主流的三款办公协作平台,它们都提供了Webhook机器人接口,允许开发者通过HTTP请求向群聊或频道推送消息。这种机制使得程序异常告警、定时报表分发、业务监控通知等场景能够完全自动化,不再依赖人工操作。
三大平台对比
| 特性 | 企业微信机器人 | 钉钉机器人 | Slack Webhook |
| 消息类型 | 文本、Markdown、图文、文件 | text、markdown、actionCard、feedCard | JSON自定义、Block Kit富文本 |
| 安全机制 | Webhook URL密钥 | 加签 + 密钥 + IP白名单 | Webhook URL密钥 |
| @提醒 | 支持@某人/全体 | 支持@某人/全体 | 通过Block Kit实现 |
| 部署难度 | 低 | 中 | 低 |
| 国内生态 | 微信生态原生集成 | 钉钉生态丰富 | 国际团队常用 |
Webhook机器人原理
Webhook(网络钩子)是一种"反向API"机制:平台提供一个固定的URL端点,开发者通过HTTP POST请求向该URL发送JSON格式的消息负载,平台接收到请求后解析内容并推送到对应的群聊或频道。整个过程使用标准的HTTP协议,任何编程语言都可以轻松调用,无需依赖SDK。
应用场景
消息通知自动化的应用场景非常广泛:服务器CPU/内存/磁盘超阈值时自动发出告警通知;每日销售数据、运营报表定时推送到管理群;CI/CD流水线构建成功或失败时通知开发团队;业务系统的异常日志自动聚合发送给运维人员;爬虫任务完成或出错时的状态通知。这些场景的共同特点是:需要将系统产生的事件实时或定时地传递给相关人员,而人工监控成本太高。
核心理念:消息通知自动化是运维监控体系和数据报表系统的"最后一公里"——将系统产生的结果以最快速、最直观的方式传递给需要的人。
二、企业微信机器人
企业微信群机器人是最易上手、国内普及度最高的消息通知方案。在群聊中添加机器人后即可获得一个Webhook URL,通过向该URL发送POST请求即可向群内推送消息。企业微信机器人支持文本、Markdown、图文和文件四种消息类型,本文逐一给出Python实现代码。
创建Webhook机器人
在企业微信客户端中进入目标群聊,点击右上角菜单 -> 群机器人 -> 添加机器人 -> 创建一个新机器人。创建完成后会得到一个形如 https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxx-xxxx-xxxx 的Webhook URL。请妥善保管该URL,任何知道该URL的人都可以向群内发送消息。
发送文本消息
文本消息是最基础的消息类型。以下Python函数封装了企业微信机器人的文本消息发送,支持@指定成员或所有人。
import requests
import json
def send_wx_text(webhook_url, content, mentioned_list=None):
"""发送企业微信文本消息"""
data = {
"msgtype": "text",
"text": {
"content": content,
"mentioned_list": mentioned_list or []
}
}
resp = requests.post(webhook_url, json=data)
return resp.json()
# 使用示例:发送普通文本
send_wx_text(
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx",
"服务器CPU使用率超过90%,请及时处理!"
)
# @所有人
send_wx_text(
webhook_url,
"线上数据库备份完成,@所有人 请确认。",
["@all"]
)
发送Markdown消息
Markdown消息可以呈现更丰富的排版效果,适合发送格式化的报告和通知。企业微信支持标题、加粗、引用、列表、链接等Markdown语法。
def send_wx_markdown(webhook_url, markdown_content):
"""发送企业微信Markdown消息"""
data = {
"msgtype": "markdown",
"markdown": {
"content": markdown_content
}
}
resp = requests.post(webhook_url, json=data)
return resp.json()
# 构建Markdown内容
md = """# 服务器日报
**时间:** 2026-05-05 23:00
**状态:** 正常运行
## 资源使用情况
- CPU平均使用率:32% ✅
- 内存使用率:67% ✅
- 磁盘使用率:85% ⚠️ 需关注
## 今日告警
> 无重大告警,系统运行稳定。
[查看详细监控面板](https://monitor.example.com)"""
send_wx_markdown(webhook_url, md)
发送图文消息
图文消息(news类型)支持包含标题、描述和图片链接的卡片样式,适合推送带有视觉效果的公告或文章。
def send_wx_news(webhook_url, articles):
"""发送企业微信图文消息
articles: [{"title":"标题","description":"描述",
"url":"链接","picurl":"图片地址"}, ...]
"""
data = {
"msgtype": "news",
"news": {
"articles": articles
}
}
resp = requests.post(webhook_url, json=data)
return resp.json()
send_wx_news(webhook_url, [
{
"title": "五月份销售数据简报",
"description": "本月销售额同比增长15%,环比增长8%",
"url": "https://bi.example.com/report/may",
"picurl": "https://example.com/charts/sales_may.png"
}
])
三、钉钉机器人
钉钉自定义机器人功能强大且安全机制完善。在钉钉群中添加自定义机器人时,可以选择加签或IP白名单等安全方式。钉钉机器人支持text、markdown、actionCard(行动卡片)和feedCard(图文链接)四种消息类型,其中actionCard支持交互式按钮跳转。
创建自定义机器人
在钉钉群聊中点击群设置 -> 智能群助手 -> 添加机器人 -> 自定义。创建时可以选择安全方式:加签方式会在Webhook URL后追加一个签名参数,每次请求需要携带使用密钥计算的时间戳签名;IP白名单方式则只允许指定IP地址段的请求通过。推荐使用加签方式,安全性更高。
加签方式的签名计算需要拼接时间戳和密钥后使用HMAC-SHA256算法加密,钉钉要求每次请求都附带时间戳和签名。
发送text消息(带加签)
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
def sign_dingtalk(secret):
"""钉钉加签计算"""
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc,
digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote(base64.b64encode(hmac_code))
return timestamp, sign
def send_ding_text(webhook_url, secret, content, at_mobiles=None, at_all=False):
"""发送钉钉文本消息(带加签)"""
timestamp, sign = sign_dingtalk(secret)
url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
data = {
"msgtype": "text",
"text": {"content": content},
"at": {
"atMobiles": at_mobiles or [],
"isAtAll": at_all
}
}
resp = requests.post(url, json=data)
return resp.json()
# 使用示例
send_ding_text(
webhook_url="https://oapi.dingtalk.com/robot/send?access_token=xxx",
secret="your_secret_key_here",
content="数据库连接池耗尽,请紧急排查!",
at_all=True
)
发送markdown消息
钉钉markdown消息支持完整的Markdown语法,包括标题、加粗、斜体、引用、有序/无序列表、链接、图片等。
def send_ding_markdown(webhook_url, secret, title, md_text, at_all=False):
"""发送钉钉Markdown消息"""
timestamp, sign = sign_dingtalk(secret)
url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": md_text
},
"at": {"isAtAll": at_all}
}
resp = requests.post(url, json=data)
return resp.json()
send_ding_markdown(
webhook_url, secret,
"系统监控日报",
"## 系统监控日报 \n \n"
"**日期:** 2026-05-05 \n"
"**整体状态:** 正常 \n \n"
"### 核心指标 \n"
"- QPS:2340 \n"
"- 平均响应时间:45ms \n"
"- 错误率:0.02% \n \n"
"### 需关注项 \n"
"> 磁盘使用率已达82%,建议本周扩容 \n"
" \n"
"[查看完整监控](https://monitor.example.com)"
)
发送actionCard消息
actionCard是钉钉独有的交互式卡片消息,可以包含按钮,点击按钮跳转到指定链接。非常适合发送需要人工处理的告警通知。
def send_ding_action_card(webhook_url, secret, title, md_text,
btn_orientation="1", btns=None):
"""发送钉钉actionCard消息
btn_orientation: 0-按钮竖向排列, 1-横向排列
btns: [{"title":"按钮名","actionURL":"https://..."}]
"""
timestamp, sign = sign_dingtalk(secret)
url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
data = {
"msgtype": "actionCard",
"actionCard": {
"title": title,
"text": md_text,
"btnOrientation": btn_orientation,
"btns": btns or []
}
}
resp = requests.post(url, json=data)
return resp.json()
send_ding_action_card(
webhook_url, secret,
"线上事故:支付接口超时",
"### 支付接口超时 \n \n"
"**影响范围:** 线上支付 \n"
"**时间:** 2026-05-05 14:23 \n"
"**持续时间:** 5分钟 \n \n"
"请立即查看并处理!",
btns=[
{"title": "查看日志", "actionURL": "https://log.example.com"},
{"title": "确认恢复", "actionURL": "https://ops.example.com/confirm"}
]
)
四、Slack Webhook
Slack是国际团队最常用的协作平台,其Webhook机制基于Incoming Webhooks和Block Kit构建。Incoming Webhooks提供了一个接收JSON负载的URL端点,而Block Kit则是Slack强大的消息布局框架,支持从简单文本到复杂的交互式消息块。
Incoming Webhook配置
在Slack API页面(api.slack.com)创建App,启用Incoming Webhooks功能,选择消息发布的频道,生成Webhook URL。URL格式为 https://hooks.slack.com/services/T00/B00/xxxxxx。配置完成后,向该URL发送JSON即可向指定频道发消息。
发送简单文本消息
import requests
import json
def send_slack_text(webhook_url, text):
"""发送Slack简单文本消息"""
data = {"text": text}
resp = requests.post(webhook_url, json=data)
return resp.text
send_slack_text(
"https://hooks.slack.com/services/T00/B00/xxxxxx",
":warning: 服务器CPU使用率已达95%,触发自动扩容流程。"
)
使用Block Kit布局
Block Kit是Slack最强大的消息特性,它通过"块"(Block)的嵌套组合实现丰富的消息布局。常用块类型包括:Section(文本段)、Header(标题)、Divider(分隔线)、Image(图片)、Actions(按钮)、Context(上下文信息)等。块以JSON数组的形式组织,灵活度极高。
def send_slack_block(webhook_url, blocks):
"""发送Slack Block Kit消息"""
data = {
"text": "新消息通知",
"blocks": blocks
}
resp = requests.post(webhook_url, json=data)
return resp.text
# 构建一个服务器监控告警的Block消息
blocks = [
{
"type": "header",
"text": {"type": "plain_text", "text": ":rotating_light: 服务器告警通知"}
},
{
"type": "divider"
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*服务器:*\nweb-prod-01"},
{"type": "mrkdwn", "text": "*IP地址:*\n10.0.1.23"},
{"type": "mrkdwn", "text": "*指标:*\nCPU 使用率"},
{"type": "mrkdwn", "text": "*当前值:*\n95% (阈值: 80%)"},
{"type": "mrkdwn", "text": "*时间:*\n2026-05-05 15:30:00"},
{"type": "mrkdwn", "text": "*级别:*\n:red_circle: 严重"}
]
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": "已触发自动扩容流程,预计3分钟内完成。"}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "查看监控面板"},
"url": "https://monitor.example.com",
"style": "primary"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "确认告警"},
"style": "danger"
}
]
}
]
send_slack_block(webhook_url, blocks)
消息更新与富文本
Slack支持通过response_url更新已经发送的消息,这对长时间运行的任务状态更新非常有用。例如构建任务开始时发送一条"构建中"消息,构建完成后通过response_url将其内容更新为"构建成功"或"构建失败"。
import requests
# 发送初始消息并获取response_url
initial_data = {
"text": "CI/CD 构建进行中... :hourglass_flowing_sand:",
"response_type": "in_channel"
}
resp = requests.post(webhook_url, json=initial_data)
response_url = resp.json().get("response_url")
# ... 构建任务执行中 ...
# 更新消息为构建结果
update_data = {
"text": "",
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": ":white_check_mark: 构建成功"}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*分支:*\nmain"},
{"type": "mrkdwn", "text": "*提交:*\nfeat: add payment module"},
{"type": "mrkdwn", "text": "*耗时:*\n3分25秒"},
{"type": "mrkdwn", "text": "*制品:*\napp-v2.1.0.tar.gz"}
]
}
]
}
requests.patch(response_url, json=update_data)
# 或者使用chat.update API
# client.chat_update(channel="C00", ts="1234567890.123", blocks=blocks)
经验提示:Slack Block Kit的消息编辑体验是最好的。通过 Block Kit Builder 在线工具可以实时拖拽构建消息布局并导出JSON,大幅降低开发成本。
五、消息模板封装
在实际项目中,我们通常需要向多个平台同时发送通知(例如同时通知企业微信和钉钉团队)。如果每个平台单独写一套发送逻辑,代码将变得混乱难以维护。因此需要设计统一的消息接口和模板系统,对不同平台的差异进行抽象和适配。
统一消息接口
通过定义统一的消息数据类,将不同平台的差异封装在适配器层,上层业务代码只需调用统一接口即可完成多平台广播。这种设计遵循了适配器模式和策略模式的思想。
from dataclasses import dataclass, field
from typing import Optional, List
from enum import Enum
class MessageLevel(Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class AlertMessage:
"""统一告警消息模型"""
title: str
content: str
level: MessageLevel = MessageLevel.INFO
server_name: Optional[str] = None
metric_name: Optional[str] = None
metric_value: Optional[str] = None
threshold: Optional[str] = None
timestamp: Optional[str] = None
detail_url: Optional[str] = None
tags: List[str] = field(default_factory=list)
def to_markdown(self) -> str:
"""转换为通用Markdown格式"""
lines = [f"# {self.title}"]
if self.server_name:
lines.append(f"**服务器:** {self.server_name}")
if self.metric_name and self.metric_value:
lines.append(f"**指标:** {self.metric_name} = {self.metric_value}")
if self.threshold:
lines.append(f"**阈值:** {self.threshold}")
if self.timestamp:
lines.append(f"**时间:** {self.timestamp}")
if self.detail_url:
lines.append(f"[查看详情]({self.detail_url})")
lines.append("")
lines.append(self.content)
return "\n".join(lines)
多平台适配器
为每个平台编写适配器类,将统一的AlertMessage对象转换为各平台特有的消息格式。主调度器根据配置决定将消息发送到哪些平台。
class WeWorkAdapter:
"""企业微信消息适配器"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send(self, msg: AlertMessage) -> dict:
"""将统一消息转换为企业微信格式发送"""
if msg.level == MessageLevel.INFO and msg.metric_value:
return self._send_news(msg)
return self._send_markdown(msg)
def _send_markdown(self, msg: AlertMessage) -> dict:
content = msg.to_markdown()
data = {
"msgtype": "markdown",
"markdown": {"content": content}
}
resp = requests.post(self.webhook_url, json=data)
return resp.json()
def _send_news(self, msg: AlertMessage) -> dict:
article = {
"title": f"[{msg.level.value.upper()}] {msg.title}",
"description": f"{msg.server_name} | {msg.metric_name}: {msg.metric_value}",
"url": msg.detail_url or "https://monitor.example.com"
}
data = {"msgtype": "news", "news": {"articles": [article]}}
resp = requests.post(self.webhook_url, json=data)
return resp.json()
class DingTalkAdapter:
"""钉钉消息适配器"""
def __init__(self, webhook_url: str, secret: str):
self.webhook_url = webhook_url
self.secret = secret
def send(self, msg: AlertMessage) -> dict:
timestamp, sign = sign_dingtalk(self.secret)
url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
data = {
"msgtype": "markdown",
"markdown": {
"title": msg.title,
"text": msg.to_markdown().replace("**", "**")
},
"at": {"isAtAll": msg.level in (MessageLevel.ERROR, MessageLevel.CRITICAL)}
}
resp = requests.post(url, json=data)
return resp.json()
class NotifierDispatcher:
"""消息通知调度器"""
def __init__(self):
self._adapters: list = []
def register(self, adapter) -> None:
"""注册适配器"""
self._adapters.append(adapter)
def notify_all(self, msg: AlertMessage) -> list:
"""向所有注册平台发送消息"""
results = []
for adapter in self._adapters:
try:
result = adapter.send(msg)
results.append(result)
except Exception as e:
results.append({"error": str(e)})
return results
使用统一接口
# 初始化调度器
dispatcher = NotifierDispatcher()
dispatcher.register(WeWorkAdapter("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx"))
dispatcher.register(DingTalkAdapter("https://oapi.dingtalk.com/robot/send?access_token=xxx", "secret"))
dispatcher.register(SlackTextAdapter("https://hooks.slack.com/services/T00/B00/xxxx"))
# 一键发送告警
alert = AlertMessage(
title="生产环境数据库连接异常",
content="数据库连接池已满,从连接池获取连接超时。建议立即检查数据库连接数和慢查询日志。",
level=MessageLevel.CRITICAL,
server_name="db-prod-master",
metric_name="active_connections",
metric_value="198/200",
threshold="200",
timestamp="2026-05-05 16:00:00",
detail_url="https://monitor.example.com/database"
)
results = dispatcher.notify_all(alert)
print("广播结果:", results)
六、异常与监控告警
异常自动告警是消息通知系统最核心的应用场景之一。当程序出现未捕获的异常、系统资源达到阈值或业务指标异常时,自动发送通知能够让相关人员第一时间响应。一个好的告警系统需要涵盖异常捕获、指标监控、告警分级和告警聚合等能力。
程序异常自动通知
通过装饰器捕获函数执行过程中的异常并自动发送通知,是一种低侵入性的告警方案。将告警逻辑与业务逻辑解耦,业务代码无需关心通知细节。
import functools
import traceback
def alert_on_exception(notifier, alert_msg_template=None):
"""装饰器:函数异常时自动发送告警通知"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
tb = traceback.format_exc()
msg = AlertMessage(
title=f"函数异常: {func.__name__}",
content=f"**异常类型:** {type(e).__name__}\n"
f"**异常信息:** {str(e)}\n"
f"**调用参数:** args={args}, kwargs={kwargs}\n"
f"**堆栈跟踪:**\n```\n{tb[:2000]}\n```",
level=MessageLevel.ERROR,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
try:
notifier.notify_all(msg)
except Exception as notify_err:
print(f"发送告警通知失败: {notify_err}")
raise
return wrapper
return decorator
# 使用示例
@alert_on_exception(dispatcher)
def sync_order_data(order_id: str):
"""同步订单数据(可能失败)"""
response = requests.get(f"https://api.example.com/orders/{order_id}")
response.raise_for_status()
return response.json()
# 如果sync_order_data抛出异常,自动向所有配置的平台发送告警
result = sync_order_data("ORD-20260505-0001")
系统资源监控告警
使用psutil库定期采集系统资源指标,与阈值比较后决定是否触发告警。支持CPU、内存、磁盘、网络等多个维度的监控。
import psutil
import time
from datetime import datetime
class SystemMonitor:
"""系统资源监控器"""
def __init__(self, notifier: NotifierDispatcher, check_interval=60):
self.notifier = notifier
self.check_interval = check_interval
self.thresholds = {
"cpu_percent": 80,
"memory_percent": 85,
"disk_percent": 90,
"load_average": 10.0
}
def check_once(self):
"""执行一次检查"""
alerts = []
# CPU检查
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > self.thresholds["cpu_percent"]:
alerts.append(AlertMessage(
title="CPU使用率过高",
level=MessageLevel.WARNING,
server_name=socket.gethostname(),
metric_name="cpu_percent",
metric_value=f"{cpu_percent}%",
threshold=f"{self.thresholds['cpu_percent']}%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"CPU使用率为 {cpu_percent}%,超过阈值 {self.thresholds['cpu_percent']}%。"
))
# 内存检查
memory = psutil.virtual_memory()
if memory.percent > self.thresholds["memory_percent"]:
alerts.append(AlertMessage(
title="内存使用率过高",
level=MessageLevel.WARNING,
metric_name="memory_percent",
metric_value=f"{memory.percent}%",
threshold=f"{self.thresholds['memory_percent']}%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"内存使用率为 {memory.percent}%,已用 {memory.used//1024//1024}MB"
f" / 总计 {memory.total//1024//1024}MB"
))
# 发送告警
for alert in alerts:
self.notifier.notify_all(alert)
return len(alerts)
def start(self):
"""启动持续监控循环"""
print(f"系统监控已启动,检查间隔 {self.check_interval} 秒")
while True:
try:
alert_count = self.check_once()
if alert_count > 0:
print(f"[{datetime.now()}] 触发了 {alert_count} 条告警")
time.sleep(self.check_interval)
except KeyboardInterrupt:
print("监控已停止")
break
except Exception as e:
print(f"监控异常: {e}")
time.sleep(self.check_interval)
# 启动监控
# monitor = SystemMonitor(dispatcher)
# monitor.start() # 持续运行
告警级别与聚合
不同的告警级别应该有不同的通知策略:INFO级别记录即可,WARNING发送到普通通知群,ERROR和CRITICAL则需要@所有人并发送到紧急响应群。为避免告警风暴(短时间内大量重复告警),需要实现告警聚合:相同内容的告警在时间窗口内只发送一次,或合并为一条聚合消息。
import time
from collections import defaultdict
class AlertAggregator:
"""告警聚合器:避免告警风暴"""
def __init__(self, window_seconds=300):
self.window_seconds = window_seconds
self._recent_alerts: dict = {} # key -> timestamp
def should_send(self, alert_key: str) -> bool:
"""判断是否应该发送告警(相同key在时间窗口内不重复发送)"""
now = time.time()
last_sent = self._recent_alerts.get(alert_key, 0)
if now - last_sent > self.window_seconds:
self._recent_alerts[alert_key] = now
return True
return False
def aggregate_key(self, msg: AlertMessage) -> str:
"""生成告警去重key"""
return f"{msg.server_name}:{msg.metric_name}"
# 使用告警聚合
aggregator = AlertAggregator(window_seconds=300)
def smart_notify(notifier, msg: AlertMessage):
"""智能通知:带聚合功能的通知发送"""
key = aggregator.aggregate_key(msg)
if aggregator.should_send(key):
notifier.notify_all(msg)
print(f"[已发送] {msg.title}")
else:
print(f"[已聚合] {msg.title} (5分钟内已发送过相同告警)")
# 持续检查中调用smart_notify代替直接notifier.notify_all
七、定时报告推送
定时报告推送是消息通知自动化的另一个重要场景。系统按照预设的日程(每日、每周、每月)自动生成数据报告,通过消息机器人推送到管理群,让决策者无需登录系统即可掌握关键业务指标。结合定时任务调度器(如APScheduler、Crontab、Celery Beat),可以构建完整的自动化报告体系。
使用APScheduler定时推送
APScheduler是Python中最强大的定时任务库,支持cron表达式、固定间隔、一次性任务等多种调度方式。以下示例展示了如何每天上午9点自动推送销售日报。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta
def generate_sales_report():
"""生成销售日报数据(模拟)"""
today = datetime.now().strftime("%Y-%m-%d")
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# 模拟从数据库或API获取数据
report_data = {
"date": yesterday,
"total_orders": 1523,
"total_revenue": 458900.00,
"avg_order_value": 301.38,
"new_users": 287,
"top_product": "Python自动化课程",
"top_category": "编程教育",
"completion_rate": "94.5%"
}
return report_data
def push_daily_sales_report(notifier: NotifierDispatcher):
"""生成并推送销售日报"""
data = generate_sales_report()
# 构建Markdown报告
md_report = f"""# 销售日报
**日期:** {data['date']}
## 核心数据
| 指标 | 数值 |
|------|------|
| 订单总数 | {data['total_orders']} |
| 总销售额 | ¥{data['total_revenue']:,.2f} |
| 平均客单价 | ¥{data['avg_order_value']:.2f} |
| 新增用户 | {data['new_users']} |
## 热销商品
- 商品:{data['top_product']}
- 类目:{data['top_category']}
- 完成率:{data['completion_rate']}
> 数据更新时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
msg = AlertMessage(
title=f"销售日报 - {data['date']}",
content=md_report,
level=MessageLevel.INFO,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
return notifier.notify_all(msg)
# 配置定时任务
scheduler = BlockingScheduler()
# 每天9:00推送销售日报
scheduler.add_job(
push_daily_sales_report,
trigger=CronTrigger(hour=9, minute=0),
args=[dispatcher],
id="daily_sales_report",
name="每日销售日报推送"
)
# 每周一10:00推送周报
scheduler.add_job(
push_weekly_report,
trigger=CronTrigger(day_of_week="mon", hour=10, minute=0),
args=[dispatcher],
id="weekly_report"
)
# 每月1日10:00推送月报
scheduler.add_job(
push_monthly_report,
trigger=CronTrigger(day=1, hour=10, minute=0),
args=[dispatcher],
id="monthly_report"
)
# scheduler.start() # 启动定时调度
报告模板定制
使用Jinja2模板引擎可以更好地分离报告逻辑和展示格式。将报告模板定义为独立的模板文件,报告生成函数只负责提供数据,模板负责渲染最终的Markdown或HTML内容,便于维护和定制。
from jinja2 import Template
# 定义报告模板
REPORT_TEMPLATE = """
# {{ title }}
**日期:** {{ report_date }}
**生成时间:** {{ generate_time }}
## 核心指标概览
{% for metric in metrics %}
- **{{ metric.name }}:** {{ metric.value }} {{ metric.unit }}
{% if metric.change %}({{ metric.change }}){% endif %}
{% endfor %}
## 详细数据
| 指标 | 今日 | 昨日 | 环比 |
|------|------|------|------|
{% for row in table_rows %}
| {{ row.name }} | {{ row.today }} | {{ row.yesterday }} | {{ row.change }} |
{% endfor %}
{% if alerts %}
## 需关注项
{% for alert in alerts %}
> {{ alert }}
{% endfor %}
{% endif %}
---
*报告由自动化系统生成*
"""
def render_report(template_str: str, **kwargs) -> str:
"""使用Jinja2渲染报告Markdown"""
template = Template(template_str)
return template.render(**kwargs)
# 使用模板生成报告
report_md = render_report(
REPORT_TEMPLATE,
title="运营日报",
report_date="2026-05-05",
generate_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
metrics=[
{"name": "日活用户(DAU)", "value": "12,845", "unit": "人", "change": "+5.2%"},
{"name": "总收入", "value": "458,900", "unit": "元", "change": "+8.1%"},
{"name": "转化率", "value": "3.42", "unit": "%", "change": "+0.3%"}
],
table_rows=[
{"name": "订单数", "today": "1,523", "yesterday": "1,412", "change": "+7.9%"},
{"name": "新增用户", "today": "287", "yesterday": "265", "change": "+8.3%"},
{"name": "退款率", "today": "1.2%", "yesterday": "1.5%", "change": "-0.3%"}
],
alerts=["退款处理队列积压50单,建议增加处理人手"]
)
# 推送渲染后的报告
msg = AlertMessage(
title="运营日报 - 2026-05-05",
content=report_md,
level=MessageLevel.INFO
)
dispatcher.notify_all(msg)
自动化报告最佳实践:① 报告模板使用Jinja2管理,与业务逻辑分离;② Markdown格式兼顾各平台兼容性;③ 定时任务使用APScheduler管理,支持持久化和故障恢复;④ 报告摘要信息放在消息顶部,详细数据可提供链接跳转到BI系统查看。
八、实战案例
以下三个实战案例涵盖了消息通知自动化的典型场景:服务器资源监控告警、每日销售数据推送和CI/CD构建通知。这些案例可以直接在实际项目中复用和扩展。
案例一:服务器CPU/内存监控告警
本案例综合运用了psutil系统监控、多平台消息通知和告警聚合功能。通过SystemMonitor类采集服务器指标,将告警信息通过企业微信和钉钉同时推送给运维团队。
import socket
import psutil
import time
from datetime import datetime
class ServerAlertSystem:
"""服务器告警系统"""
def __init__(self, notifier: NotifierDispatcher, hostname=None):
self.notifier = notifier
self.hostname = hostname or socket.gethostname()
self._last_alert_time = {}
def check_and_alert(self):
"""检查服务器状态并推送告警"""
alerts = []
# CPU监控
cpu = psutil.cpu_percent(interval=2)
if cpu > 90:
alerts.append(AlertMessage(
title=f"[严重] {self.hostname} CPU过载",
level=MessageLevel.CRITICAL,
server_name=self.hostname,
metric_name="CPU使用率",
metric_value=f"{cpu}%",
threshold="90%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"CPU使用率已达 {cpu}%,超过严重阈值90%。\n"
f"建议立即登录服务器检查异常进程。"
))
elif cpu > 80:
alerts.append(AlertMessage(
title=f"[警告] {self.hostname} CPU偏高",
level=MessageLevel.WARNING,
server_name=self.hostname,
metric_name="CPU使用率",
metric_value=f"{cpu}%",
threshold="80%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"CPU使用率为 {cpu}%,超过警告阈值80%。"
))
# 内存监控
mem = psutil.virtual_memory()
if mem.percent > 90:
alerts.append(AlertMessage(
title=f"[严重] {self.hostname} 内存不足",
level=MessageLevel.CRITICAL,
server_name=self.hostname,
metric_name="内存使用率",
metric_value=f"{mem.percent}%",
threshold="90%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"内存使用率 {mem.percent}%,已用 {mem.used//1024**3}GB"
f" / 总计 {mem.total//1024**3}GB"
))
# 磁盘监控
for partition in psutil.disk_partitions():
usage = psutil.disk_usage(partition.mountpoint)
if usage.percent > 90:
alerts.append(AlertMessage(
title=f"[严重] {self.hostname} 磁盘空间不足",
level=MessageLevel.CRITICAL,
server_name=self.hostname,
metric_name=f"磁盘({partition.mountpoint})",
metric_value=f"{usage.percent}%",
threshold="90%",
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
content=f"分区 {partition.mountpoint} 使用率 {usage.percent}%"
))
# 发送告警
for alert in alerts:
self.notifier.notify_all(alert)
return alerts
# 部署运行
if __name__ == "__main__":
dispatcher = NotifierDispatcher()
dispatcher.register(WeWorkAdapter("企业微信Webhook URL"))
dispatcher.register(DingTalkAdapter("钉钉Webhook URL", "加签密钥"))
monitor = ServerAlertSystem(dispatcher)
while True:
alerts = monitor.check_and_alert()
if alerts:
print(f"[{datetime.now()}] 触发了 {len(alerts)} 条告警")
time.sleep(60) # 每分钟检查一次
案例二:每日销售数据推送
每天定时从数据库获取销售数据,生成格式化的报表后通过企业微信Markdown和钉钉同时推送到管理层群。下面示例使用SQLite作为数据源,实际项目中可以替换为MySQL或PostgreSQL。
import sqlite3
from datetime import datetime, timedelta
def fetch_daily_sales(db_path: str) -> dict:
"""从数据库查询昨日销售数据"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# 模拟查询(实际项目中替换为真实的SQL查询)
cursor.execute("""
SELECT COUNT(*), SUM(amount), AVG(amount)
FROM orders
WHERE DATE(created_at) = ?
""", (yesterday,))
row = cursor.fetchone()
cursor.execute("""
SELECT COUNT(DISTINCT user_id)
FROM orders
WHERE DATE(created_at) = ?
""", (yesterday,))
active_users = cursor.fetchone()[0]
conn.close()
return {
"date": yesterday,
"total_orders": row[0] or 0,
"total_revenue": float(row[1] or 0),
"avg_order_value": round(float(row[2] or 0), 2),
"active_users": active_users or 0
}
def push_sales_report_to_all(notifier: NotifierDispatcher, db_path: str):
"""从数据库获取数据并推送到所有平台"""
data = fetch_daily_sales(db_path)
md = f"""# 📊 销售日报 - {data['date']}
## 关键指标
- **订单总数:** {data['total_orders']} 单
- **总销售额:** ¥{data['total_revenue']:,.2f}
- **平均客单价:** ¥{data['avg_order_value']:.2f}
- **下单用户数:** {data['active_users']} 人
> 数据来源:销售系统自动统计
> 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
msg = AlertMessage(
title=f"销售日报 - {data['date']}",
content=md,
level=MessageLevel.INFO,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
results = notifier.notify_all(msg)
return results
# 配合APScheduler定时执行
# scheduler.add_job(
# push_sales_report_to_all,
# CronTrigger(hour=9, minute=30),
# args=[dispatcher, "/data/sales.db"]
# )
案例三:CI/CD构建通知
在CI/CD流水线(如GitHub Actions、GitLab CI、Jenkins)中集成消息通知,当构建开始、成功或失败时向团队频道发送实时通知。以GitHub Actions为例,在workflow文件中增加一个步骤调用Python通知脚本。
# .github/workflows/deploy.yml 中的通知步骤
# ...
# - name: Send deployment notification
# run: python scripts/notify_deploy.py
# env:
# WEWORK_WEBHOOK: ${{ secrets.WEWORK_WEBHOOK }}
# DINGTALK_WEBHOOK: ${{ secrets.DINGTALK_WEBHOOK }}
# DINGTALK_SECRET: ${{ secrets.DINGTALK_SECRET }}
# DEPLOY_STATUS: ${{ job.status }}
# COMMIT_SHA: ${{ github.sha }}
# BRANCH: ${{ github.ref_name }}
# scripts/notify_deploy.py
import os
import sys
from datetime import datetime
def send_ci_notification():
"""发送CI/CD构建通知"""
status = os.environ.get("DEPLOY_STATUS", "unknown")
branch = os.environ.get("BRANCH", "unknown")
commit_sha = os.environ.get("COMMIT_SHA", "unknown")[:8]
# 定时器图标
icon_map = {
"success": "✅",
"failure": "❌",
"cancelled": "⏹️"
}
md = f"""# {icon_map.get(status, '❓')} CI/CD 部署通知
**项目:** web-app-backend
**分支:** {branch}
**提交:** `{commit_sha}`
**状态:** {'成功' if status == 'success' else '失败' if status == 'failure' else '已取消'}
**时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
> 查看详情:https://github.com/org/repo/actions
"""
dispatcher = NotifierDispatcher()
wx_webhook = os.environ.get("WEWORK_WEBHOOK")
ding_webhook = os.environ.get("DINGTALK_WEBHOOK")
ding_secret = os.environ.get("DINGTALK_SECRET")
if wx_webhook:
dispatcher.register(WeWorkAdapter(wx_webhook))
if ding_webhook and ding_secret:
dispatcher.register(DingTalkAdapter(ding_webhook, ding_secret))
msg = AlertMessage(
title=f"CI/CD {status} - {branch}",
content=md,
level=MessageLevel.INFO if status == "success" else MessageLevel.ERROR,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
dispatcher.notify_all(msg)
if __name__ == "__main__":
send_ci_notification()
实战总结:消息通知自动化的核心价值在于"让系统自动找人"而非"人找系统"。通过统一的消息封装和适配器模式,一套代码即可覆盖所有主流协作平台。建议从简单的单平台文本通知开始,逐步扩展到多平台广播、模板化报告和智能告警聚合,构建完整的自动化通知体系。