企业微信/钉钉/Slack消息通知自动化

Python 办公自动化专题 · 打通办公协作平台的自动化消息通道

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, 企业微信, 钉钉, Slack, Webhook, 消息通知, 机器人, Python自动化

一、消息平台概述

在现代企业办公协作中,消息通知的自动化工具有着至关重要的地位。企业微信、钉钉和Slack是当前最主流的三款办公协作平台,它们都提供了Webhook机器人接口,允许开发者通过HTTP请求向群聊或频道推送消息。这种机制使得程序异常告警、定时报表分发、业务监控通知等场景能够完全自动化,不再依赖人工操作。

三大平台对比

特性企业微信机器人钉钉机器人Slack Webhook
消息类型文本、Markdown、图文、文件text、markdown、actionCard、feedCardJSON自定义、Block Kit富文本
安全机制Webhook URL密钥加签 + 密钥 + IP白名单Webhook URL密钥
@提醒支持@某人/全体支持@某人/全体通过Block Kit实现
部署难度
国内生态微信生态原生集成钉钉生态丰富国际团队常用

Webhook机器人原理

Webhook(网络钩子)是一种"反向API"机制:平台提供一个固定的URL端点,开发者通过HTTP POST请求向该URL发送JSON格式的消息负载,平台接收到请求后解析内容并推送到对应的群聊或频道。整个过程使用标准的HTTP协议,任何编程语言都可以轻松调用,无需依赖SDK。

应用场景

消息通知自动化的应用场景非常广泛:服务器CPU/内存/磁盘超阈值时自动发出告警通知;每日销售数据、运营报表定时推送到管理群;CI/CD流水线构建成功或失败时通知开发团队;业务系统的异常日志自动聚合发送给运维人员;爬虫任务完成或出错时的状态通知。这些场景的共同特点是:需要将系统产生的事件实时或定时地传递给相关人员,而人工监控成本太高。

核心理念:消息通知自动化是运维监控体系和数据报表系统的"最后一公里"——将系统产生的结果以最快速、最直观的方式传递给需要的人。

二、企业微信机器人

企业微信群机器人是最易上手、国内普及度最高的消息通知方案。在群聊中添加机器人后即可获得一个Webhook URL,通过向该URL发送POST请求即可向群内推送消息。企业微信机器人支持文本、Markdown、图文和文件四种消息类型,本文逐一给出Python实现代码。

创建Webhook机器人

在企业微信客户端中进入目标群聊,点击右上角菜单 -> 群机器人 -> 添加机器人 -> 创建一个新机器人。创建完成后会得到一个形如 https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxxx-xxxx-xxxx 的Webhook URL。请妥善保管该URL,任何知道该URL的人都可以向群内发送消息。

发送文本消息

文本消息是最基础的消息类型。以下Python函数封装了企业微信机器人的文本消息发送,支持@指定成员或所有人。

import requests import json def send_wx_text(webhook_url, content, mentioned_list=None): """发送企业微信文本消息""" data = { "msgtype": "text", "text": { "content": content, "mentioned_list": mentioned_list or [] } } resp = requests.post(webhook_url, json=data) return resp.json() # 使用示例:发送普通文本 send_wx_text( "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx", "服务器CPU使用率超过90%,请及时处理!" ) # @所有人 send_wx_text( webhook_url, "线上数据库备份完成,@所有人 请确认。", ["@all"] )

发送Markdown消息

Markdown消息可以呈现更丰富的排版效果,适合发送格式化的报告和通知。企业微信支持标题、加粗、引用、列表、链接等Markdown语法。

def send_wx_markdown(webhook_url, markdown_content): """发送企业微信Markdown消息""" data = { "msgtype": "markdown", "markdown": { "content": markdown_content } } resp = requests.post(webhook_url, json=data) return resp.json() # 构建Markdown内容 md = """# 服务器日报 **时间:** 2026-05-05 23:00 **状态:** 正常运行 ## 资源使用情况 - CPU平均使用率:32% ✅ - 内存使用率:67% ✅ - 磁盘使用率:85% ⚠️ 需关注 ## 今日告警 > 无重大告警,系统运行稳定。 [查看详细监控面板](https://monitor.example.com)""" send_wx_markdown(webhook_url, md)

发送图文消息

图文消息(news类型)支持包含标题、描述和图片链接的卡片样式,适合推送带有视觉效果的公告或文章。

def send_wx_news(webhook_url, articles): """发送企业微信图文消息 articles: [{"title":"标题","description":"描述", "url":"链接","picurl":"图片地址"}, ...] """ data = { "msgtype": "news", "news": { "articles": articles } } resp = requests.post(webhook_url, json=data) return resp.json() send_wx_news(webhook_url, [ { "title": "五月份销售数据简报", "description": "本月销售额同比增长15%,环比增长8%", "url": "https://bi.example.com/report/may", "picurl": "https://example.com/charts/sales_may.png" } ])

三、钉钉机器人

钉钉自定义机器人功能强大且安全机制完善。在钉钉群中添加自定义机器人时,可以选择加签或IP白名单等安全方式。钉钉机器人支持text、markdown、actionCard(行动卡片)和feedCard(图文链接)四种消息类型,其中actionCard支持交互式按钮跳转。

创建自定义机器人

在钉钉群聊中点击群设置 -> 智能群助手 -> 添加机器人 -> 自定义。创建时可以选择安全方式:加签方式会在Webhook URL后追加一个签名参数,每次请求需要携带使用密钥计算的时间戳签名;IP白名单方式则只允许指定IP地址段的请求通过。推荐使用加签方式,安全性更高。

加签方式的签名计算需要拼接时间戳和密钥后使用HMAC-SHA256算法加密,钉钉要求每次请求都附带时间戳和签名。

发送text消息(带加签)

import requests import json import time import hmac import hashlib import base64 import urllib.parse def sign_dingtalk(secret): """钉钉加签计算""" timestamp = str(round(time.time() * 1000)) secret_enc = secret.encode('utf-8') string_to_sign = f'{timestamp}\n{secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote(base64.b64encode(hmac_code)) return timestamp, sign def send_ding_text(webhook_url, secret, content, at_mobiles=None, at_all=False): """发送钉钉文本消息(带加签)""" timestamp, sign = sign_dingtalk(secret) url = f"{webhook_url}×tamp={timestamp}&sign={sign}" data = { "msgtype": "text", "text": {"content": content}, "at": { "atMobiles": at_mobiles or [], "isAtAll": at_all } } resp = requests.post(url, json=data) return resp.json() # 使用示例 send_ding_text( webhook_url="https://oapi.dingtalk.com/robot/send?access_token=xxx", secret="your_secret_key_here", content="数据库连接池耗尽,请紧急排查!", at_all=True )

发送markdown消息

钉钉markdown消息支持完整的Markdown语法,包括标题、加粗、斜体、引用、有序/无序列表、链接、图片等。

def send_ding_markdown(webhook_url, secret, title, md_text, at_all=False): """发送钉钉Markdown消息""" timestamp, sign = sign_dingtalk(secret) url = f"{webhook_url}×tamp={timestamp}&sign={sign}" data = { "msgtype": "markdown", "markdown": { "title": title, "text": md_text }, "at": {"isAtAll": at_all} } resp = requests.post(url, json=data) return resp.json() send_ding_markdown( webhook_url, secret, "系统监控日报", "## 系统监控日报 \n \n" "**日期:** 2026-05-05 \n" "**整体状态:** 正常 \n \n" "### 核心指标 \n" "- QPS:2340 \n" "- 平均响应时间:45ms \n" "- 错误率:0.02% \n \n" "### 需关注项 \n" "> 磁盘使用率已达82%,建议本周扩容 \n" " \n" "[查看完整监控](https://monitor.example.com)" )

发送actionCard消息

actionCard是钉钉独有的交互式卡片消息,可以包含按钮,点击按钮跳转到指定链接。非常适合发送需要人工处理的告警通知。

def send_ding_action_card(webhook_url, secret, title, md_text, btn_orientation="1", btns=None): """发送钉钉actionCard消息 btn_orientation: 0-按钮竖向排列, 1-横向排列 btns: [{"title":"按钮名","actionURL":"https://..."}] """ timestamp, sign = sign_dingtalk(secret) url = f"{webhook_url}×tamp={timestamp}&sign={sign}" data = { "msgtype": "actionCard", "actionCard": { "title": title, "text": md_text, "btnOrientation": btn_orientation, "btns": btns or [] } } resp = requests.post(url, json=data) return resp.json() send_ding_action_card( webhook_url, secret, "线上事故:支付接口超时", "### 支付接口超时 \n \n" "**影响范围:** 线上支付 \n" "**时间:** 2026-05-05 14:23 \n" "**持续时间:** 5分钟 \n \n" "请立即查看并处理!", btns=[ {"title": "查看日志", "actionURL": "https://log.example.com"}, {"title": "确认恢复", "actionURL": "https://ops.example.com/confirm"} ] )

四、Slack Webhook

Slack是国际团队最常用的协作平台,其Webhook机制基于Incoming Webhooks和Block Kit构建。Incoming Webhooks提供了一个接收JSON负载的URL端点,而Block Kit则是Slack强大的消息布局框架,支持从简单文本到复杂的交互式消息块。

Incoming Webhook配置

在Slack API页面(api.slack.com)创建App,启用Incoming Webhooks功能,选择消息发布的频道,生成Webhook URL。URL格式为 https://hooks.slack.com/services/T00/B00/xxxxxx。配置完成后,向该URL发送JSON即可向指定频道发消息。

发送简单文本消息

import requests import json def send_slack_text(webhook_url, text): """发送Slack简单文本消息""" data = {"text": text} resp = requests.post(webhook_url, json=data) return resp.text send_slack_text( "https://hooks.slack.com/services/T00/B00/xxxxxx", ":warning: 服务器CPU使用率已达95%,触发自动扩容流程。" )

使用Block Kit布局

Block Kit是Slack最强大的消息特性,它通过"块"(Block)的嵌套组合实现丰富的消息布局。常用块类型包括:Section(文本段)、Header(标题)、Divider(分隔线)、Image(图片)、Actions(按钮)、Context(上下文信息)等。块以JSON数组的形式组织,灵活度极高。

def send_slack_block(webhook_url, blocks): """发送Slack Block Kit消息""" data = { "text": "新消息通知", "blocks": blocks } resp = requests.post(webhook_url, json=data) return resp.text # 构建一个服务器监控告警的Block消息 blocks = [ { "type": "header", "text": {"type": "plain_text", "text": ":rotating_light: 服务器告警通知"} }, { "type": "divider" }, { "type": "section", "fields": [ {"type": "mrkdwn", "text": "*服务器:*\nweb-prod-01"}, {"type": "mrkdwn", "text": "*IP地址:*\n10.0.1.23"}, {"type": "mrkdwn", "text": "*指标:*\nCPU 使用率"}, {"type": "mrkdwn", "text": "*当前值:*\n95% (阈值: 80%)"}, {"type": "mrkdwn", "text": "*时间:*\n2026-05-05 15:30:00"}, {"type": "mrkdwn", "text": "*级别:*\n:red_circle: 严重"} ] }, { "type": "section", "text": {"type": "mrkdwn", "text": "已触发自动扩容流程,预计3分钟内完成。"} }, { "type": "actions", "elements": [ { "type": "button", "text": {"type": "plain_text", "text": "查看监控面板"}, "url": "https://monitor.example.com", "style": "primary" }, { "type": "button", "text": {"type": "plain_text", "text": "确认告警"}, "style": "danger" } ] } ] send_slack_block(webhook_url, blocks)

消息更新与富文本

Slack支持通过response_url更新已经发送的消息,这对长时间运行的任务状态更新非常有用。例如构建任务开始时发送一条"构建中"消息,构建完成后通过response_url将其内容更新为"构建成功"或"构建失败"。

import requests # 发送初始消息并获取response_url initial_data = { "text": "CI/CD 构建进行中... :hourglass_flowing_sand:", "response_type": "in_channel" } resp = requests.post(webhook_url, json=initial_data) response_url = resp.json().get("response_url") # ... 构建任务执行中 ... # 更新消息为构建结果 update_data = { "text": "", "blocks": [ { "type": "header", "text": {"type": "plain_text", "text": ":white_check_mark: 构建成功"} }, { "type": "section", "fields": [ {"type": "mrkdwn", "text": "*分支:*\nmain"}, {"type": "mrkdwn", "text": "*提交:*\nfeat: add payment module"}, {"type": "mrkdwn", "text": "*耗时:*\n3分25秒"}, {"type": "mrkdwn", "text": "*制品:*\napp-v2.1.0.tar.gz"} ] } ] } requests.patch(response_url, json=update_data) # 或者使用chat.update API # client.chat_update(channel="C00", ts="1234567890.123", blocks=blocks)

经验提示:Slack Block Kit的消息编辑体验是最好的。通过 Block Kit Builder 在线工具可以实时拖拽构建消息布局并导出JSON,大幅降低开发成本。

五、消息模板封装

在实际项目中,我们通常需要向多个平台同时发送通知(例如同时通知企业微信和钉钉团队)。如果每个平台单独写一套发送逻辑,代码将变得混乱难以维护。因此需要设计统一的消息接口和模板系统,对不同平台的差异进行抽象和适配。

统一消息接口

通过定义统一的消息数据类,将不同平台的差异封装在适配器层,上层业务代码只需调用统一接口即可完成多平台广播。这种设计遵循了适配器模式和策略模式的思想。

from dataclasses import dataclass, field from typing import Optional, List from enum import Enum class MessageLevel(Enum): INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical" @dataclass class AlertMessage: """统一告警消息模型""" title: str content: str level: MessageLevel = MessageLevel.INFO server_name: Optional[str] = None metric_name: Optional[str] = None metric_value: Optional[str] = None threshold: Optional[str] = None timestamp: Optional[str] = None detail_url: Optional[str] = None tags: List[str] = field(default_factory=list) def to_markdown(self) -> str: """转换为通用Markdown格式""" lines = [f"# {self.title}"] if self.server_name: lines.append(f"**服务器:** {self.server_name}") if self.metric_name and self.metric_value: lines.append(f"**指标:** {self.metric_name} = {self.metric_value}") if self.threshold: lines.append(f"**阈值:** {self.threshold}") if self.timestamp: lines.append(f"**时间:** {self.timestamp}") if self.detail_url: lines.append(f"[查看详情]({self.detail_url})") lines.append("") lines.append(self.content) return "\n".join(lines)

多平台适配器

为每个平台编写适配器类,将统一的AlertMessage对象转换为各平台特有的消息格式。主调度器根据配置决定将消息发送到哪些平台。

class WeWorkAdapter: """企业微信消息适配器""" def __init__(self, webhook_url: str): self.webhook_url = webhook_url def send(self, msg: AlertMessage) -> dict: """将统一消息转换为企业微信格式发送""" if msg.level == MessageLevel.INFO and msg.metric_value: return self._send_news(msg) return self._send_markdown(msg) def _send_markdown(self, msg: AlertMessage) -> dict: content = msg.to_markdown() data = { "msgtype": "markdown", "markdown": {"content": content} } resp = requests.post(self.webhook_url, json=data) return resp.json() def _send_news(self, msg: AlertMessage) -> dict: article = { "title": f"[{msg.level.value.upper()}] {msg.title}", "description": f"{msg.server_name} | {msg.metric_name}: {msg.metric_value}", "url": msg.detail_url or "https://monitor.example.com" } data = {"msgtype": "news", "news": {"articles": [article]}} resp = requests.post(self.webhook_url, json=data) return resp.json() class DingTalkAdapter: """钉钉消息适配器""" def __init__(self, webhook_url: str, secret: str): self.webhook_url = webhook_url self.secret = secret def send(self, msg: AlertMessage) -> dict: timestamp, sign = sign_dingtalk(self.secret) url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}" data = { "msgtype": "markdown", "markdown": { "title": msg.title, "text": msg.to_markdown().replace("**", "**") }, "at": {"isAtAll": msg.level in (MessageLevel.ERROR, MessageLevel.CRITICAL)} } resp = requests.post(url, json=data) return resp.json() class NotifierDispatcher: """消息通知调度器""" def __init__(self): self._adapters: list = [] def register(self, adapter) -> None: """注册适配器""" self._adapters.append(adapter) def notify_all(self, msg: AlertMessage) -> list: """向所有注册平台发送消息""" results = [] for adapter in self._adapters: try: result = adapter.send(msg) results.append(result) except Exception as e: results.append({"error": str(e)}) return results

使用统一接口

# 初始化调度器 dispatcher = NotifierDispatcher() dispatcher.register(WeWorkAdapter("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx")) dispatcher.register(DingTalkAdapter("https://oapi.dingtalk.com/robot/send?access_token=xxx", "secret")) dispatcher.register(SlackTextAdapter("https://hooks.slack.com/services/T00/B00/xxxx")) # 一键发送告警 alert = AlertMessage( title="生产环境数据库连接异常", content="数据库连接池已满,从连接池获取连接超时。建议立即检查数据库连接数和慢查询日志。", level=MessageLevel.CRITICAL, server_name="db-prod-master", metric_name="active_connections", metric_value="198/200", threshold="200", timestamp="2026-05-05 16:00:00", detail_url="https://monitor.example.com/database" ) results = dispatcher.notify_all(alert) print("广播结果:", results)

六、异常与监控告警

异常自动告警是消息通知系统最核心的应用场景之一。当程序出现未捕获的异常、系统资源达到阈值或业务指标异常时,自动发送通知能够让相关人员第一时间响应。一个好的告警系统需要涵盖异常捕获、指标监控、告警分级和告警聚合等能力。

程序异常自动通知

通过装饰器捕获函数执行过程中的异常并自动发送通知,是一种低侵入性的告警方案。将告警逻辑与业务逻辑解耦,业务代码无需关心通知细节。

import functools import traceback def alert_on_exception(notifier, alert_msg_template=None): """装饰器:函数异常时自动发送告警通知""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: tb = traceback.format_exc() msg = AlertMessage( title=f"函数异常: {func.__name__}", content=f"**异常类型:** {type(e).__name__}\n" f"**异常信息:** {str(e)}\n" f"**调用参数:** args={args}, kwargs={kwargs}\n" f"**堆栈跟踪:**\n```\n{tb[:2000]}\n```", level=MessageLevel.ERROR, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) try: notifier.notify_all(msg) except Exception as notify_err: print(f"发送告警通知失败: {notify_err}") raise return wrapper return decorator # 使用示例 @alert_on_exception(dispatcher) def sync_order_data(order_id: str): """同步订单数据(可能失败)""" response = requests.get(f"https://api.example.com/orders/{order_id}") response.raise_for_status() return response.json() # 如果sync_order_data抛出异常,自动向所有配置的平台发送告警 result = sync_order_data("ORD-20260505-0001")

系统资源监控告警

使用psutil库定期采集系统资源指标,与阈值比较后决定是否触发告警。支持CPU、内存、磁盘、网络等多个维度的监控。

import psutil import time from datetime import datetime class SystemMonitor: """系统资源监控器""" def __init__(self, notifier: NotifierDispatcher, check_interval=60): self.notifier = notifier self.check_interval = check_interval self.thresholds = { "cpu_percent": 80, "memory_percent": 85, "disk_percent": 90, "load_average": 10.0 } def check_once(self): """执行一次检查""" alerts = [] # CPU检查 cpu_percent = psutil.cpu_percent(interval=1) if cpu_percent > self.thresholds["cpu_percent"]: alerts.append(AlertMessage( title="CPU使用率过高", level=MessageLevel.WARNING, server_name=socket.gethostname(), metric_name="cpu_percent", metric_value=f"{cpu_percent}%", threshold=f"{self.thresholds['cpu_percent']}%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"CPU使用率为 {cpu_percent}%,超过阈值 {self.thresholds['cpu_percent']}%。" )) # 内存检查 memory = psutil.virtual_memory() if memory.percent > self.thresholds["memory_percent"]: alerts.append(AlertMessage( title="内存使用率过高", level=MessageLevel.WARNING, metric_name="memory_percent", metric_value=f"{memory.percent}%", threshold=f"{self.thresholds['memory_percent']}%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"内存使用率为 {memory.percent}%,已用 {memory.used//1024//1024}MB" f" / 总计 {memory.total//1024//1024}MB" )) # 发送告警 for alert in alerts: self.notifier.notify_all(alert) return len(alerts) def start(self): """启动持续监控循环""" print(f"系统监控已启动,检查间隔 {self.check_interval} 秒") while True: try: alert_count = self.check_once() if alert_count > 0: print(f"[{datetime.now()}] 触发了 {alert_count} 条告警") time.sleep(self.check_interval) except KeyboardInterrupt: print("监控已停止") break except Exception as e: print(f"监控异常: {e}") time.sleep(self.check_interval) # 启动监控 # monitor = SystemMonitor(dispatcher) # monitor.start() # 持续运行

告警级别与聚合

不同的告警级别应该有不同的通知策略:INFO级别记录即可,WARNING发送到普通通知群,ERROR和CRITICAL则需要@所有人并发送到紧急响应群。为避免告警风暴(短时间内大量重复告警),需要实现告警聚合:相同内容的告警在时间窗口内只发送一次,或合并为一条聚合消息。

import time from collections import defaultdict class AlertAggregator: """告警聚合器:避免告警风暴""" def __init__(self, window_seconds=300): self.window_seconds = window_seconds self._recent_alerts: dict = {} # key -> timestamp def should_send(self, alert_key: str) -> bool: """判断是否应该发送告警(相同key在时间窗口内不重复发送)""" now = time.time() last_sent = self._recent_alerts.get(alert_key, 0) if now - last_sent > self.window_seconds: self._recent_alerts[alert_key] = now return True return False def aggregate_key(self, msg: AlertMessage) -> str: """生成告警去重key""" return f"{msg.server_name}:{msg.metric_name}" # 使用告警聚合 aggregator = AlertAggregator(window_seconds=300) def smart_notify(notifier, msg: AlertMessage): """智能通知:带聚合功能的通知发送""" key = aggregator.aggregate_key(msg) if aggregator.should_send(key): notifier.notify_all(msg) print(f"[已发送] {msg.title}") else: print(f"[已聚合] {msg.title} (5分钟内已发送过相同告警)") # 持续检查中调用smart_notify代替直接notifier.notify_all

七、定时报告推送

定时报告推送是消息通知自动化的另一个重要场景。系统按照预设的日程(每日、每周、每月)自动生成数据报告,通过消息机器人推送到管理群,让决策者无需登录系统即可掌握关键业务指标。结合定时任务调度器(如APScheduler、Crontab、Celery Beat),可以构建完整的自动化报告体系。

使用APScheduler定时推送

APScheduler是Python中最强大的定时任务库,支持cron表达式、固定间隔、一次性任务等多种调度方式。以下示例展示了如何每天上午9点自动推送销售日报。

from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from datetime import datetime, timedelta def generate_sales_report(): """生成销售日报数据(模拟)""" today = datetime.now().strftime("%Y-%m-%d") yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") # 模拟从数据库或API获取数据 report_data = { "date": yesterday, "total_orders": 1523, "total_revenue": 458900.00, "avg_order_value": 301.38, "new_users": 287, "top_product": "Python自动化课程", "top_category": "编程教育", "completion_rate": "94.5%" } return report_data def push_daily_sales_report(notifier: NotifierDispatcher): """生成并推送销售日报""" data = generate_sales_report() # 构建Markdown报告 md_report = f"""# 销售日报 **日期:** {data['date']} ## 核心数据 | 指标 | 数值 | |------|------| | 订单总数 | {data['total_orders']} | | 总销售额 | ¥{data['total_revenue']:,.2f} | | 平均客单价 | ¥{data['avg_order_value']:.2f} | | 新增用户 | {data['new_users']} | ## 热销商品 - 商品:{data['top_product']} - 类目:{data['top_category']} - 完成率:{data['completion_rate']} > 数据更新时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ msg = AlertMessage( title=f"销售日报 - {data['date']}", content=md_report, level=MessageLevel.INFO, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) return notifier.notify_all(msg) # 配置定时任务 scheduler = BlockingScheduler() # 每天9:00推送销售日报 scheduler.add_job( push_daily_sales_report, trigger=CronTrigger(hour=9, minute=0), args=[dispatcher], id="daily_sales_report", name="每日销售日报推送" ) # 每周一10:00推送周报 scheduler.add_job( push_weekly_report, trigger=CronTrigger(day_of_week="mon", hour=10, minute=0), args=[dispatcher], id="weekly_report" ) # 每月1日10:00推送月报 scheduler.add_job( push_monthly_report, trigger=CronTrigger(day=1, hour=10, minute=0), args=[dispatcher], id="monthly_report" ) # scheduler.start() # 启动定时调度

报告模板定制

使用Jinja2模板引擎可以更好地分离报告逻辑和展示格式。将报告模板定义为独立的模板文件,报告生成函数只负责提供数据,模板负责渲染最终的Markdown或HTML内容,便于维护和定制。

from jinja2 import Template # 定义报告模板 REPORT_TEMPLATE = """ # {{ title }} **日期:** {{ report_date }} **生成时间:** {{ generate_time }} ## 核心指标概览 {% for metric in metrics %} - **{{ metric.name }}:** {{ metric.value }} {{ metric.unit }} {% if metric.change %}({{ metric.change }}){% endif %} {% endfor %} ## 详细数据 | 指标 | 今日 | 昨日 | 环比 | |------|------|------|------| {% for row in table_rows %} | {{ row.name }} | {{ row.today }} | {{ row.yesterday }} | {{ row.change }} | {% endfor %} {% if alerts %} ## 需关注项 {% for alert in alerts %} > {{ alert }} {% endfor %} {% endif %} --- *报告由自动化系统生成* """ def render_report(template_str: str, **kwargs) -> str: """使用Jinja2渲染报告Markdown""" template = Template(template_str) return template.render(**kwargs) # 使用模板生成报告 report_md = render_report( REPORT_TEMPLATE, title="运营日报", report_date="2026-05-05", generate_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), metrics=[ {"name": "日活用户(DAU)", "value": "12,845", "unit": "人", "change": "+5.2%"}, {"name": "总收入", "value": "458,900", "unit": "元", "change": "+8.1%"}, {"name": "转化率", "value": "3.42", "unit": "%", "change": "+0.3%"} ], table_rows=[ {"name": "订单数", "today": "1,523", "yesterday": "1,412", "change": "+7.9%"}, {"name": "新增用户", "today": "287", "yesterday": "265", "change": "+8.3%"}, {"name": "退款率", "today": "1.2%", "yesterday": "1.5%", "change": "-0.3%"} ], alerts=["退款处理队列积压50单,建议增加处理人手"] ) # 推送渲染后的报告 msg = AlertMessage( title="运营日报 - 2026-05-05", content=report_md, level=MessageLevel.INFO ) dispatcher.notify_all(msg)

自动化报告最佳实践:① 报告模板使用Jinja2管理,与业务逻辑分离;② Markdown格式兼顾各平台兼容性;③ 定时任务使用APScheduler管理,支持持久化和故障恢复;④ 报告摘要信息放在消息顶部,详细数据可提供链接跳转到BI系统查看。

八、实战案例

以下三个实战案例涵盖了消息通知自动化的典型场景:服务器资源监控告警、每日销售数据推送和CI/CD构建通知。这些案例可以直接在实际项目中复用和扩展。

案例一:服务器CPU/内存监控告警

本案例综合运用了psutil系统监控、多平台消息通知和告警聚合功能。通过SystemMonitor类采集服务器指标,将告警信息通过企业微信和钉钉同时推送给运维团队。

import socket import psutil import time from datetime import datetime class ServerAlertSystem: """服务器告警系统""" def __init__(self, notifier: NotifierDispatcher, hostname=None): self.notifier = notifier self.hostname = hostname or socket.gethostname() self._last_alert_time = {} def check_and_alert(self): """检查服务器状态并推送告警""" alerts = [] # CPU监控 cpu = psutil.cpu_percent(interval=2) if cpu > 90: alerts.append(AlertMessage( title=f"[严重] {self.hostname} CPU过载", level=MessageLevel.CRITICAL, server_name=self.hostname, metric_name="CPU使用率", metric_value=f"{cpu}%", threshold="90%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"CPU使用率已达 {cpu}%,超过严重阈值90%。\n" f"建议立即登录服务器检查异常进程。" )) elif cpu > 80: alerts.append(AlertMessage( title=f"[警告] {self.hostname} CPU偏高", level=MessageLevel.WARNING, server_name=self.hostname, metric_name="CPU使用率", metric_value=f"{cpu}%", threshold="80%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"CPU使用率为 {cpu}%,超过警告阈值80%。" )) # 内存监控 mem = psutil.virtual_memory() if mem.percent > 90: alerts.append(AlertMessage( title=f"[严重] {self.hostname} 内存不足", level=MessageLevel.CRITICAL, server_name=self.hostname, metric_name="内存使用率", metric_value=f"{mem.percent}%", threshold="90%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"内存使用率 {mem.percent}%,已用 {mem.used//1024**3}GB" f" / 总计 {mem.total//1024**3}GB" )) # 磁盘监控 for partition in psutil.disk_partitions(): usage = psutil.disk_usage(partition.mountpoint) if usage.percent > 90: alerts.append(AlertMessage( title=f"[严重] {self.hostname} 磁盘空间不足", level=MessageLevel.CRITICAL, server_name=self.hostname, metric_name=f"磁盘({partition.mountpoint})", metric_value=f"{usage.percent}%", threshold="90%", timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), content=f"分区 {partition.mountpoint} 使用率 {usage.percent}%" )) # 发送告警 for alert in alerts: self.notifier.notify_all(alert) return alerts # 部署运行 if __name__ == "__main__": dispatcher = NotifierDispatcher() dispatcher.register(WeWorkAdapter("企业微信Webhook URL")) dispatcher.register(DingTalkAdapter("钉钉Webhook URL", "加签密钥")) monitor = ServerAlertSystem(dispatcher) while True: alerts = monitor.check_and_alert() if alerts: print(f"[{datetime.now()}] 触发了 {len(alerts)} 条告警") time.sleep(60) # 每分钟检查一次

案例二:每日销售数据推送

每天定时从数据库获取销售数据,生成格式化的报表后通过企业微信Markdown和钉钉同时推送到管理层群。下面示例使用SQLite作为数据源,实际项目中可以替换为MySQL或PostgreSQL。

import sqlite3 from datetime import datetime, timedelta def fetch_daily_sales(db_path: str) -> dict: """从数据库查询昨日销售数据""" conn = sqlite3.connect(db_path) cursor = conn.cursor() yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") # 模拟查询(实际项目中替换为真实的SQL查询) cursor.execute(""" SELECT COUNT(*), SUM(amount), AVG(amount) FROM orders WHERE DATE(created_at) = ? """, (yesterday,)) row = cursor.fetchone() cursor.execute(""" SELECT COUNT(DISTINCT user_id) FROM orders WHERE DATE(created_at) = ? """, (yesterday,)) active_users = cursor.fetchone()[0] conn.close() return { "date": yesterday, "total_orders": row[0] or 0, "total_revenue": float(row[1] or 0), "avg_order_value": round(float(row[2] or 0), 2), "active_users": active_users or 0 } def push_sales_report_to_all(notifier: NotifierDispatcher, db_path: str): """从数据库获取数据并推送到所有平台""" data = fetch_daily_sales(db_path) md = f"""# 📊 销售日报 - {data['date']} ## 关键指标 - **订单总数:** {data['total_orders']} 单 - **总销售额:** ¥{data['total_revenue']:,.2f} - **平均客单价:** ¥{data['avg_order_value']:.2f} - **下单用户数:** {data['active_users']} 人 > 数据来源:销售系统自动统计 > 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} """ msg = AlertMessage( title=f"销售日报 - {data['date']}", content=md, level=MessageLevel.INFO, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) results = notifier.notify_all(msg) return results # 配合APScheduler定时执行 # scheduler.add_job( # push_sales_report_to_all, # CronTrigger(hour=9, minute=30), # args=[dispatcher, "/data/sales.db"] # )

案例三:CI/CD构建通知

在CI/CD流水线(如GitHub Actions、GitLab CI、Jenkins)中集成消息通知,当构建开始、成功或失败时向团队频道发送实时通知。以GitHub Actions为例,在workflow文件中增加一个步骤调用Python通知脚本。

# .github/workflows/deploy.yml 中的通知步骤 # ... # - name: Send deployment notification # run: python scripts/notify_deploy.py # env: # WEWORK_WEBHOOK: ${{ secrets.WEWORK_WEBHOOK }} # DINGTALK_WEBHOOK: ${{ secrets.DINGTALK_WEBHOOK }} # DINGTALK_SECRET: ${{ secrets.DINGTALK_SECRET }} # DEPLOY_STATUS: ${{ job.status }} # COMMIT_SHA: ${{ github.sha }} # BRANCH: ${{ github.ref_name }} # scripts/notify_deploy.py import os import sys from datetime import datetime def send_ci_notification(): """发送CI/CD构建通知""" status = os.environ.get("DEPLOY_STATUS", "unknown") branch = os.environ.get("BRANCH", "unknown") commit_sha = os.environ.get("COMMIT_SHA", "unknown")[:8] # 定时器图标 icon_map = { "success": "✅", "failure": "❌", "cancelled": "⏹️" } md = f"""# {icon_map.get(status, '❓')} CI/CD 部署通知 **项目:** web-app-backend **分支:** {branch} **提交:** `{commit_sha}` **状态:** {'成功' if status == 'success' else '失败' if status == 'failure' else '已取消'} **时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} > 查看详情:https://github.com/org/repo/actions """ dispatcher = NotifierDispatcher() wx_webhook = os.environ.get("WEWORK_WEBHOOK") ding_webhook = os.environ.get("DINGTALK_WEBHOOK") ding_secret = os.environ.get("DINGTALK_SECRET") if wx_webhook: dispatcher.register(WeWorkAdapter(wx_webhook)) if ding_webhook and ding_secret: dispatcher.register(DingTalkAdapter(ding_webhook, ding_secret)) msg = AlertMessage( title=f"CI/CD {status} - {branch}", content=md, level=MessageLevel.INFO if status == "success" else MessageLevel.ERROR, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) dispatcher.notify_all(msg) if __name__ == "__main__": send_ci_notification()

实战总结:消息通知自动化的核心价值在于"让系统自动找人"而非"人找系统"。通过统一的消息封装和适配器模式,一套代码即可覆盖所有主流协作平台。建议从简单的单平台文本通知开始,逐步扩展到多平台广播、模板化报告和智能告警聚合,构建完整的自动化通知体系。