批量邮件发送系统与模板引擎

Python 办公自动化专题 · 构建企业级批量邮件发送平台

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, 批量邮件, 邮件模板, Jinja2, 邮件队列, 个性化发送, Python自动化

一、批量邮件系统架构

批量邮件发送系统是企业级自动化办公的重要组成部分,设计一个健壮的邮件发送平台需要从系统整体架构出发,合理划分模块边界。一个典型的批量邮件系统通常包含六大核心模块:模板引擎模块负责邮件内容的动态渲染;收件人管理模块负责数据的导入、清洗和分组;发送队列模块负责任务的调度、优先级管理和重试;并发控制模块负责多线程发送和频率限制;反馈处理模块负责退信检测、阅读追踪和统计;日志监控模块负责全流程的记录和可视化。

数据流设计方面,典型的流程是:业务系统触发发送请求,将任务提交到消息队列,工作节点从队列拉取任务,经过模板渲染生成个性化内容,通过SMTP连接池发送邮件,发送结果异步写回数据库,反馈回路持续更新送达状态。技术选型上,Python生态有丰富的邮件处理库,smtplib提供底层SMTP支持,celery作为分布式任务队列,Jinja2用于模板渲染,Redis用于缓存和队列,SQLite/PostgreSQL用于持久化存储。

系统设计中还有一个容易被忽视的要点是模块间的解耦。模板渲染、发送执行、结果处理这三个阶段应该完全独立,通过消息队列串联。这样即使发送模块暂时不可用,已经渲染好的邮件也不会丢失。此外,数据库设计需要支持收件人表、模板表、发送任务表、发送日志表、退信记录表等多个核心表结构,它们之间通过任务ID进行关联,形成一个完整的追踪链路。

基础发送模块

import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.header import Header from email import encoders from typing import List, Optional import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class SmtpConfig: """SMTP服务器配置""" def __init__(self, host: str, port: int, user: str, password: str, use_tls: bool = True): self.host = host self.port = port self.user = user self.password = password self.use_tls = use_tls class EmailAttachment: """邮件附件""" def __init__(self, filename: str, data: bytes, mime_type: str = 'application/octet-stream'): self.filename = filename self.data = data self.mime_type = mime_type

连接池管理

from queue import Queue, Empty from threading import Lock import smtplib class SmtpConnectionPool: """SMTP连接池,复用TCP连接减少握手开销""" def __init__(self, config: SmtpConfig, pool_size: int = 5): self.config = config self.pool_size = pool_size self._pool = Queue(maxsize=pool_size) self._lock = Lock() self._created = 0 self._init_pool() def _create_connection(self) -> smtplib.SMTP: conn = smtplib.SMTP(self.config.host, self.config.port, timeout=30) if self.config.use_tls: conn.starttls() conn.login(self.config.user, self.config.password) return conn def _init_pool(self): for _ in range(self.pool_size): conn = self._create_connection() self._pool.put(conn) self._created += 1 def acquire(self) -> smtplib.SMTP: try: conn = self._pool.get_nowait() except Empty: conn = self._create_connection() self._created += 1 return conn def release(self, conn: smtplib.SMTP): if self._pool.qsize() < self.pool_size: self._pool.put(conn) else: conn.quit() def close_all(self): while not self._pool.empty(): conn = self._pool.get() try: conn.quit() except Exception: pass

架构分层示意图

# 系统架构的模块划分 SYSTEM_ARCHITECTURE = { "接入层": [ "HTTP API接口 (FastAPI)", "CLI命令行工具", "定时任务触发器", "Webhook回调" ], "调度层": [ "任务队列 (Redis/Celery)", "优先级调度", "重试队列", "死信队列" ], "处理层": [ "模板引擎 (Jinja2/Mako)", "个性化渲染器", "附件生成器", "SMTP发送器" ], "存储层": [ "收件人数据库", "模板存储", "发送日志", "统计看板" ], "监控层": [ "退信处理", "打开追踪", "点击统计", "异常报警" ] } # 每个层通过消息队列进行异步通信 # 接入层 -> 调度层 -> 处理层 -> 存储层 -> 监控层

二、HTML邮件模板

HTML邮件模板是批量邮件系统的核心组成部分。与传统网页不同,邮件HTML受到各大邮件客户端(Outlook、Gmail、网易等)的严格限制——不支持JavaScript、CSS兼容性极差、表格布局仍是主流。因此邮件模板的编写是一门独特的技术。使用Jinja2模板引擎可以很好地解决邮件内容的动态生成问题,它提供了变量替换、条件判断、循环遍历、过滤器等功能,让我们能够在纯文本模板中嵌入动态逻辑。

Jinja2的模板继承机制对邮件模板特别有用。我们可以定义一个基础邮件模板base_email.html,包含统一的页头Logo、页脚版权声明和退订链接,然后让具体业务模板继承这个基础模板,只覆写中间的内容区域。这样既保证了品牌一致性,又避免了重复劳动。模板中常用的过滤器包括date格式化日期、default设置默认值、escape转义HTML特殊字符等。

邮件模板设计还需要考虑客户端兼容性:使用表格布局而非div布局,CSS使用内联样式而非class,图片使用绝对路径而非相对路径,宽高使用像素而非百分比,字体使用通用字体栈(如Arial, Helvetica, sans-serif)。此外,模板中应该预留退订链接和公司信息的占位变量,这些会在发送时由系统自动填充。

Jinja2邮件模板定义

""" templates/monthly_newsletter.html Jinja2邮件模板,支持变量替换和条件渲染 """ from jinja2 import Environment, FileSystemLoader EMAIL_TEMPLATE_STR = """\
{% block content %}{% endblock %}

{{ company_name }}

{{ recipient_name }} 您好,

{% if is_birthday_month %}

🎂 祝您生日快乐!本月为您准备了专属优惠。

{% endif %}

您收到此邮件是因为您注册了{{ company_name }}服务

立即退订  |  隐私政策

"""

Jinja2环境配置与渲染

from jinja2 import Environment, BaseLoader, select_autoescape import os class EmailTemplateEngine: """邮件模板引擎,封装Jinja2渲染逻辑""" def __init__(self, template_dir: str = "templates"): self.env = Environment( loader=FileSystemLoader(template_dir), autoescape=select_autoescape(['html', 'xml']), trim_blocks=True, lstrip_blocks=True ) # 注册自定义过滤器 self.env.filters['currency'] = self._format_currency self.env.filters['truncate'] = self._truncate_text @staticmethod def _format_currency(value: float) -> str: """金额格式化过滤器""" return f"¥{value:,.2f}" @staticmethod def _truncate_text(text: str, length: int = 50) -> str: """截断文本并追加省略号""" return text[:length] + "..." if len(text) > length else text def render(self, template_name: str, context: dict) -> str: """渲染模板并返回HTML字符串""" template = self.env.get_template(template_name) return template.render(**context) def render_from_string(self, template_str: str, context: dict) -> str: """直接从字符串渲染模板""" template = self.env.from_string(template_str) return template.render(**context) # 使用示例 engine = EmailTemplateEngine() context = { "company_name": "佼艾科技", "recipient_name": "张三", "is_birthday_month": True, "unsubscribe_url": "https://example.com/unsubscribe?uid=123", "privacy_url": "https://example.com/privacy", } html_body = engine.render("monthly_newsletter.html", context) print(f"渲染完成,HTML长度: {len(html_body)} 字符")

条件渲染与循环遍历

# Jinja2模板中的高级语法示例 ADVANCED_TEMPLATE = """\ {% if user.tier == 'vip' %}

尊敬的VIP用户 {{ user.name }}

您享受专属{{ discounts.vip_discount }}%%优惠

{% elif user.tier == 'normal' %}

{{ user.name }},欢迎查看本月推荐

{% else %}

您好,{{ user.name }},感谢您的关注

{% endif %}

{{ category_name }} - 本月推荐

{% for product in products %} {% if loop.index is divisibleby 3 and not loop.last %} {% endif %} {% endfor %}
{{ product.name }}

{{ product.name }}

{{ product.price | currency }}

{% if product.original_price %}

原价: {{ product.original_price | currency }}

{% endif %}
{{ newsletter_body | safe }}
""" # 渲染示例数据 sample_products = [ {"name": "智能手表Pro", "price": 1299.00, "original_price": 1599.00, "image_url": "https://example.com/img/watch.jpg"}, {"name": "蓝牙耳机Air", "price": 399.00, "original_price": None, "image_url": "https://example.com/img/earphone.jpg"}, {"name": "无线充电板", "price": 199.00, "original_price": 249.00, "image_url": "https://example.com/img/charger.jpg"}, ] context = { "user": {"name": "李四", "tier": "vip"}, "discounts": {"vip_discount": 20}, "category_name": "数码配件", "products": sample_products, "newsletter_body": "

本月新品尝鲜,全场包邮

", } engine = EmailTemplateEngine() rendered = engine.render_from_string(ADVANCED_TEMPLATE, context) print(rendered[:200] + "...") # 打印前200字符预览

三、个性化发送

个性化发送是批量邮件系统的核心价值所在。相比群发同一封邮件,个性化邮件能显著提升打开率和转化率。实现机制是在发送前为每个收件人渲染一份独一无二的邮件内容,变量替换涉及收件人姓名、公司名称、专属优惠码、定制推荐内容等。数据源通常来自Excel表格或CRM系统的导出数据,每一行对应一个收件人及其属性字段。

实现个性化发送的关键在于批量渲染和批量发送的流水线处理。首先从Excel文件中读取所有收件人数据,然后对每一条数据进行模板渲染,生成HTML内容,最后通过SMTP服务器逐个发送。当收件人数量巨大(数万至数百万)时,需要引入分页处理和流式渲染技术,避免内存溢出。此外,动态附件也是个性化发送的重要场景,例如为每个客户生成专属的PDF报价单或数据分析报告,一并附加在邮件中发送。

个性化内容的设计需要遵循"收件人视角"原则:邮件读起来应该像是一对一的沟通,而非群发。这要求模板设计者在落笔时就考虑变量的插入点,让整封邮件的语气自然流畅。例如使用"{{ recipient_name }},您最近关注的{{ product_name }}有优惠活动"替代生硬的"您好,本店产品有优惠"。同时还需要考虑变量缺失时的默认值处理,以及中英文混合场景下的模板适配。

Excel收件人数据读取

import pandas as pd from dataclasses import dataclass from typing import List, Optional, Generator @dataclass class Recipient: """收件人数据结构""" email: str name: str company: Optional[str] = None department: Optional[str] = None tier: str = "normal" variables: dict = None # 自定义变量 def __post_init__(self): if self.variables is None: self.variables = {} class RecipientLoader: """从多种数据源加载收件人""" @staticmethod def from_excel(filepath: str, sheet_name: str = 0, chunk_size: int = 1000) -> Generator[List[Recipient], None, None]: """ 从Excel文件分块读取收件人 使用chunk_size控制每次加载的数量,避免内存溢出 """ reader = pd.read_excel(filepath, sheet_name=sheet_name, chunksize=chunk_size) for chunk in reader: recipients = [] for _, row in chunk.iterrows(): row_dict = row.to_dict() recipient = Recipient( email=row_dict.pop('email'), name=row_dict.pop('name', ''), company=row_dict.pop('company', None), department=row_dict.pop('department', None), tier=row_dict.pop('tier', 'normal'), variables=row_dict # 剩余列作为自定义变量 ) recipients.append(recipient) yield recipients @staticmethod def from_csv(filepath: str, encoding: str = 'utf-8', chunk_size: int = 1000) -> Generator[List[Recipient], None, None]: """从CSV文件分块读取收件人""" reader = pd.read_csv(filepath, encoding=encoding, chunksize=chunk_size) for chunk in reader: recipients = [] for _, row in chunk.iterrows(): row_dict = row.to_dict() recipient = Recipient( email=row_dict.pop('email'), name=row_dict.pop('name', ''), company=row_dict.pop('company', None), variables=row_dict ) recipients.append(recipient) yield recipients # 使用示例 loader = RecipientLoader() for batch in loader.from_excel("recipients.xlsx", chunk_size=500): print(f"加载了 {len(batch)} 个收件人") for r in batch[:3]: # 只打印前3个预览 print(f" -> {r.email} | {r.name} | {r.company}")

批量个性化渲染引擎

from concurrent.futures import ThreadPoolExecutor, as_completed import time class PersonalizationEngine: """个性化渲染引擎:对每个收件人执行模板渲染""" def __init__(self, template_engine: EmailTemplateEngine): self.template_engine = template_engine def render_single(self, recipient: Recipient, template_name: str) -> tuple: """为单个收件人渲染邮件,返回 (Recipient, html_content)""" # 构建渲染上下文 context = { "recipient_name": recipient.name, "recipient_email": recipient.email, "company_name": recipient.company or "我们的公司", "department": recipient.department or "客户服务部", "tier": recipient.tier, "unsubscribe_url": f"https://example.com/unsub?e={recipient.email}", # 合并自定义变量 **recipient.variables, # 以下是系统变量 "current_year": time.strftime("%Y"), "send_date": time.strftime("%Y-%m-%d"), } html = self.template_engine.render(template_name, context) return recipient, html def render_batch(self, recipients: List[Recipient], template_name: str, max_workers: int = 4) -> Generator[tuple, None, None]: """批量渲染,使用线程池提高效率""" with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(self.render_single, r, template_name): r for r in recipients } for future in as_completed(futures): yield future.result() # 使用示例 engine = EmailTemplateEngine("templates") personalizer = PersonalizationEngine(engine) recipients = [ Recipient(email="alice@example.com", name="Alice", company="ABC Corp", variables={"coupon_code": "ALICE20", "product_name": "智能手表"}), Recipient(email="bob@example.com", name="Bob", company="XYZ Inc", variables={"coupon_code": "BOB15", "product_name": "蓝牙耳机"}), ] for recipient, html in personalizer.render_batch(recipients, "promotion.html"): print(f"[{recipient.email}] 渲染完成 -> {len(html)} 字符") # 在此处将html传递给发送模块

动态附件生成

from io import BytesIO import pdfkit # HTML转PDF class DynamicAttachmentGenerator: """动态附件生成器:为每个收件人生成个性化附件""" @staticmethod def generate_pdf_report(recipient: Recipient, report_data: dict) -> EmailAttachment: """生成个性化PDF报告附件""" html_content = f"""\

月度分析报告

客户:{recipient.name}

公司:{recipient.company or 'N/A'}

报告周期:{report_data['period']}

关键指标

指标本月上月环比
访问量{report_data['visits']}{report_data['prev_visits']} {report_data['visits_growth']:.1f}%%
转化率{report_data['conversion']:.1f}%%{report_data['prev_conversion']:.1f}%% {report_data['conversion_growth']:.1f}%%
成交额¥{report_data['revenue']:,.2f}¥{report_data['prev_revenue']:,.2f} {report_data['revenue_growth']:.1f}%%

总成交额: ¥{report_data['revenue']:,.2f}

报告生成时间: {report_data['generated_at']}

""" pdf_bytes = pdfkit.from_string(html_content, False) filename = f"月度报告_{recipient.name}_{report_data['period']}.pdf" return EmailAttachment(filename, pdf_bytes, 'application/pdf') @staticmethod def generate_invoice(recipient: Recipient, invoice_data: dict) -> EmailAttachment: """生成个性化发票/账单附件""" lines = [] for item in invoice_data['items']: lines.append(f"{item['name']},{item['quantity']},{item['unit_price']},{item['total']}") csv_content = f"商品名称,数量,单价,小计\n" + "\n".join(lines) + f"\n\n总计,{invoice_data['total']}" filename = f"账单_{recipient.name}_{invoice_data['invoice_no']}.csv" return EmailAttachment(filename, csv_content.encode('utf-8'), 'text/csv; charset=utf-8')

四、发送队列

发送队列是批量邮件系统的中枢神经,它负责协调模板渲染、邮件发送和结果回写之间的异步协作。设计一个健壮的发送队列需要考虑几个关键维度:任务持久化确保服务器重启后任务不丢失;优先级机制让重要邮件(如密码重置、交易通知)插队先行;重试机制临时失败的邮件自动重新排队;死信处理将反复失败的邮件转移到隔离区由人工介入;进度跟踪让运营人员实时了解发送进展。

Redis是实现轻量级任务队列的理想选择,它利用BRPOP/LPUSH命令实现可靠队列,利用Sorted Set实现延迟队列(用于重试),利用List实现优先级分段队列。对于更大规模的企业场景,Celery结合RabbitMQ/Redis提供了更完善的任务管理、worker扩缩容和任务编排能力。Celery的task路由功能可以将不同优先级的任务发送到不同的队列,高优先级任务使用独立的worker池处理。

队列的可靠性设计至关重要。每个任务在队列中应该包含完整的元信息(任务ID、收件人、模板名称、渲染上下文、创建时间、重试次数),这样即使发送过程中断,重启后也能从持久化存储中恢复所有待处理任务。实践中推荐采用"两阶段提交"的思路:先写数据库(状态为pending),再入队列;worker消费成功后更新数据库状态为sent;这样数据库始终是可信的真相来源。

Redis任务队列实现

import json import redis import uuid from datetime import datetime from typing import Optional, Dict, Any class EmailTask: """邮件任务数据结构""" def __init__(self, task_id: str, recipient_email: str, template_name: str, context: dict, priority: int = 0, attachments: list = None, max_retries: int = 3): self.task_id = task_id self.recipient_email = recipient_email self.template_name = template_name self.context = context self.priority = priority # 0=普通, 1=高, 2=紧急 self.attachments = attachments or [] self.max_retries = max_retries self.retry_count = 0 self.created_at = datetime.now().isoformat() def to_dict(self) -> dict: return { "task_id": self.task_id, "recipient_email": self.recipient_email, "template_name": self.template_name, "context": self.context, "priority": self.priority, "attachments": self.attachments, "max_retries": self.max_retries, "retry_count": self.retry_count, "created_at": self.created_at, } class RedisTaskQueue: """基于Redis的邮件任务队列""" QUEUE_PREFIX = "email:queue:" RETRY_PREFIX = "email:retry:" DEAD_LETTER = "email:dead_letter" def __init__(self, redis_url: str = "redis://localhost:6379/0"): self.client = redis.from_url(redis_url, decode_responses=True) def _queue_key(self, priority: int) -> str: """不同优先级的队列使用不同的key""" return f"{self.QUEUE_PREFIX}p{priority}" def enqueue(self, task: EmailTask): """将任务加入对应优先级的队列""" key = self._queue_key(task.priority) self.client.lpush(key, json.dumps(task.to_dict())) # 同时持久化到数据库,保证不丢失 self.client.hset("email:tasks:meta", task.task_id, json.dumps({ "status": "queued", "email": task.recipient_email, "priority": task.priority, })) def dequeue(self, timeout: int = 5) -> Optional[EmailTask]: """从最高优先级队列中取出任务""" keys = [self._queue_key(i) for i in [2, 1, 0]] # 优先级顺序 result = self.client.brpop(keys, timeout=timeout) if result is None: return None _, task_json = result task_dict = json.loads(task_json) task = EmailTask(**task_dict) return task def requeue_for_retry(self, task: EmailTask, delay_seconds: int = 60): """将失败任务放入延迟重试队列""" task.retry_count += 1 if task.retry_count >= task.max_retries: self._send_to_dead_letter(task, "超过最大重试次数") return False # 使用Sorted Set实现延迟队列 retry_at = datetime.now().timestamp() + delay_seconds * (2 ** task.retry_count) self.client.zadd(f"{self.RETRY_PREFIX}{task.priority}", {json.dumps(task.to_dict()): retry_at}) return True def _send_to_dead_letter(self, task: EmailTask, reason: str): """将无法处理的死信任务归档""" dead_info = task.to_dict() dead_info["dead_reason"] = reason dead_info["dead_at"] = datetime.now().isoformat() self.client.lpush(self.DEAD_LETTER, json.dumps(dead_info)) def get_queue_sizes(self) -> dict: """获取各队列长度统计""" sizes = {} for p in [0, 1, 2]: sizes[f"priority_{p}"] = self.client.llen(self._queue_key(p)) sizes["dead_letter"] = self.client.llen(self.DEAD_LETTER) return sizes # 使用示例 queue = RedisTaskQueue() task = EmailTask( task_id=str(uuid.uuid4()), recipient_email="user@example.com", template_name="welcome.html", context={"name": "新用户", "company": "佼艾科技"}, priority=1, # 高优先级 ) queue.enqueue(task) print(f"任务 {task.task_id} 已入队列(优先级: {task.priority})")

Celery分布式任务配置

""" celery_app.py Celery分布式任务队列配置 """ from celery import Celery from celery.signals import task_failure, task_success import logging logger = logging.getLogger(__name__) # 创建Celery应用 app = Celery( 'email_tasks', broker='redis://localhost:6379/1', # 消息代理 backend='redis://localhost:6379/2', # 结果后端 include=['email_worker'] # 注册任务模块 ) # Celery配置 app.conf.update( # 任务路由:按优先级分配到不同队列 task_routes={ 'email_worker.send_urgent_email': {'queue': 'urgent'}, 'email_worker.send_normal_email': {'queue': 'normal'}, 'email_worker.send_batch_email': {'queue': 'batch'}, }, # 任务序列化 task_serializer='json', result_serializer='json', accept_content=['json'], # 时区 timezone='Asia/Shanghai', enable_utc=True, # 任务结果过期时间(秒) result_expires=3600, # Worker配置 worker_concurrency=4, # 并发Worker数 worker_prefetch_multiplier=1, # 每次只预取一个任务 # 任务重试 task_soft_time_limit=120, # 软超时(秒) task_time_limit=180, # 硬超时(秒) task_acks_late=True, # 任务完成后才确认 task_reject_on_worker_lost=True, # Worker丢失时重新入队 ) @task_success.connect def handle_task_success(sender=None, **kwargs): """任务成功回调""" logger.info(f"任务执行成功: {sender.request.id}") @task_failure.connect def handle_task_failure(sender=None, **kwargs): """任务失败回调""" logger.error(f"任务执行失败: {sender.request.id}") """ # 启动Worker命令: # celery -A celery_app worker -l info -Q urgent,normal,batch # 启动定时任务: # celery -A celery_app beat -l info """

任务编排与进度跟踪

""" email_worker.py 邮件发送Worker —— 处理各种队列中的邮件发送任务 """ from celery_app import app from smtp_client import SmtpConnectionPool, SmtpConfig from template_engine import EmailTemplateEngine import logging logger = logging.getLogger(__name__) @app.task(bind=True, max_retries=3, default_retry_delay=60) def send_urgent_email(self, to_email: str, subject: str, template_name: str, context: dict): """紧急邮件任务:验证码、密码重置等""" try: engine = EmailTemplateEngine("templates") html_body = engine.render(template_name, context) config = SmtpConfig("smtp.example.com", 587, "user@example.com", "password") pool = SmtpConnectionPool(config) conn = pool.acquire() msg = MIMEText(html_body, 'html', 'utf-8') msg['Subject'] = subject msg['From'] = "noreply@example.com" msg['To'] = to_email conn.send_message(msg) pool.release(conn) logger.info(f"紧急邮件发送成功: {to_email}") return {"status": "success", "email": to_email} except Exception as exc: logger.warning(f"紧急邮件发送失败: {to_email}, 重试中...") raise self.retry(exc=exc) @app.task(bind=True, max_retries=5, default_retry_delay=120, rate_limit='30/m') # 每分钟最多30封 def send_batch_email(self, email_batch: list, template_name: str, base_context: dict): """批量邮件任务:营销邮件、通知等""" successes = [] failures = [] engine = EmailTemplateEngine("templates") config = SmtpConfig("smtp.example.com", 587, "user@example.com", "password") pool = SmtpConnectionPool(config, pool_size=3) for item in email_batch: try: context = {**base_context, **item.get('variables', {})} html_body = engine.render(template_name, context) conn = pool.acquire() msg = MIMEText(html_body, 'html', 'utf-8') msg['Subject'] = base_context.get('subject', '批量邮件') msg['From'] = "marketing@example.com" msg['To'] = item['email'] conn.send_message(msg) pool.release(conn) successes.append(item['email']) self.update_state( state='PROGRESS', meta={'current': len(successes), 'total': len(email_batch), 'status': f'已发送 {len(successes)}/{len(email_batch)}'} ) except Exception as e: failures.append({"email": item['email'], "error": str(e)}) pool.close_all() return { "total": len(email_batch), "success": len(successes), "failure": len(failures), "failed_list": failures, } @app.task def track_campaign_progress(campaign_id: int, total_tasks: int): """追踪整个营销活动的发送进度""" logger.info(f"活动 #{campaign_id} 启动,共 {total_tasks} 个任务") # 定期检查已完成的任务数量 # 所有任务完成后发送通知

五、并发与限流

大规模邮件发送面临的的技术挑战主要来自三个方面:SMTP服务器的连接限制、邮件服务商的发送频率限制、以及自身服务器的资源瓶颈。并发控制就是在这些限制条件内最大化发送吞吐量。Python中实现并发发送的主要手段是多线程(I/O密集型任务适合)、多进程(CPU密集型渲染任务)和异步IO(aiohttp + asyncio)。设计合理的连接池可以在多个线程之间复用TCP连接,显著减少握手开销。

限流策略是避免被邮件服务商封禁的关键。主要邮件服务商(SendGrid、AWS SES、阿里云邮件推送)都有明确的每秒发送量(TPS)和每日发送总量限制。超出限制会导致临时封禁甚至永久封号。常见的限流算法包括令牌桶(允许突发)、漏桶(平滑速率)和滑动窗口(精确计数)。实践中最好组合使用:秒级用令牌桶允许小幅度突发,分钟级用滑动窗口确保平均值不超限,小时级用计数器做硬限制。

发送间隔自适应是一个高级话题。系统应该根据退信率和SMTP响应码动态调整发送速度。当检测到大量550(用户不存在)或452(系统资源不足)响应时,自动降低发送速率;当一切正常时逐步提速。这种自适应控制类似于TCP的拥塞控制算法,能够在保障送达率的前提下最大化吞吐量。此外,邮件发送还应该避开高峰期,例如工作日上午10-11点是最佳发送窗口,深夜发送容易被判定为垃圾邮件。

多线程并发发送

from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Event, Semaphore import time import random from typing import List, Callable class RateLimiter: """基于令牌桶的速率限制器""" def __init__(self, max_rate: float, time_unit: float = 1.0): self.max_rate = max_rate # 每秒最大请求数 self.time_unit = time_unit # 时间单位(秒) self.tokens = max_rate # 当前令牌数 self.last_refill = time.time() self._lock = Semaphore(1) def acquire(self) -> bool: """获取一个令牌,阻塞直到获取成功""" while True: with self._lock: self._refill() if self.tokens >= 1: self.tokens -= 1 return True time.sleep(0.05) def _refill(self): now = time.time() elapsed = now - self.last_refill self.tokens = min( self.max_rate, self.tokens + elapsed * (self.max_rate / self.time_unit) ) self.last_refill = now class ConcurrentEmailSender: """并发邮件发送器""" def __init__(self, smtp_config: SmtpConfig, max_workers: int = 5, rate_per_second: float = 10.0): self.smtp_config = smtp_config self.max_workers = max_workers self.rate_limiter = RateLimiter(rate_per_second) self.pool = SmtpConnectionPool(smtp_config, pool_size=max_workers) self._stop_event = Event() self.stats = {"sent": 0, "failed": 0, "total": 0} def send_one(self, recipient: Recipient, html_body: str, subject: str, sender: str) -> bool: """发送单封邮件,受速率限制""" self.rate_limiter.acquire() # 获取发送许可 conn = None try: conn = self.pool.acquire() msg = MIMEText(html_body, 'html', 'utf-8') msg['Subject'] = subject msg['From'] = sender msg['To'] = recipient.email conn.send_message(msg) return True except smtplib.SMTPServerDisconnected: # 连接断开,重新创建 time.sleep(1) return False except smtplib.SMTPResponseException as e: if e.smtp_code == 452: # 服务器忙,需要减速 time.sleep(5) return False elif e.smtp_code == 550: # 用户不存在 logging.warning(f"无效地址: {recipient.email}") return False # 不重试 return False finally: if conn: try: self.pool.release(conn) except Exception: pass def send_batch(self, deliveries: List[tuple]) -> dict: """ 批量并发发送 deliveries: [(Recipient, html_body, subject, sender), ...] """ self.stats["total"] = len(deliveries) with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = { executor.submit(self.send_one, *d): d[0].email for d in deliveries } for future in as_completed(futures): email = futures[future] try: success = future.result() if success: self.stats["sent"] += 1 else: self.stats["failed"] += 1 except Exception as e: self.stats["failed"] += 1 logging.error(f"发送异常 {email}: {e}") return self.stats # 使用示例 config = SmtpConfig("smtp.example.com", 587, "user@pass") sender = ConcurrentEmailSender(config, max_workers=10, rate_per_second=20) stats = sender.send_batch(deliveries) print(f"发送完成: {stats['sent']} 成功, {stats['failed']} 失败")

自适应限流器

import statistics from collections import deque from enum import Enum class BounceLevel(Enum): """退信严重等级""" NORMAL = 0 # 正常 WARNING = 1 # 轻度异常 CRITICAL = 2 # 严重异常 class AdaptiveRateController: """ 自适应速率控制器 根据发送成功率动态调整发送速率 类似于TCP拥塞控制算法 """ def __init__(self, initial_rate: float = 10, min_rate: float = 1, max_rate: float = 100): self.current_rate = initial_rate self.min_rate = min_rate self.max_rate = max_rate self.window_size = 100 # 滑动窗口大小 self.results = deque(maxlen=self.window_size) # 发送结果滑动窗口 self.success_times = deque(maxlen=50) # 最近成功耗时 def record_result(self, success: bool, elapsed: float): """记录一次发送结果""" self.results.append(1 if success else 0) if success: self.success_times.append(elapsed) self._adjust_rate() def _adjust_rate(self): """根据最近结果调整发送速率""" if len(self.results) < 20: return # 样本不足,不调整 success_rate = sum(self.results) / len(self.results) if success_rate < 0.90: # 成功率低于90%,快速降速(乘性减) self.current_rate = max( self.min_rate, self.current_rate * 0.7 ) self._log_adjustment(f"降速至 {self.current_rate:.1f} 封/秒 " f"(成功率 {success_rate*100:.0f}%%)") elif success_rate > 0.98: # 成功率高于98%,缓慢提速(加性增) self.current_rate = min( self.max_rate, self.current_rate * 1.05 ) self._log_adjustment(f"提速至 {self.current_rate:.1f} 封/秒 " f"(成功率 {success_rate*100:.0f}%%)") def get_bounce_level(self) -> BounceLevel: """判断当前退信等级""" if len(self.results) < 10: return BounceLevel.NORMAL success_rate = sum(self.results) / len(self.results) if success_rate < 0.80: return BounceLevel.CRITICAL elif success_rate < 0.93: return BounceLevel.WARNING return BounceLevel.NORMAL def _log_adjustment(self, msg: str): avg_latency = statistics.mean(self.success_times) if self.success_times else 0 logging.info(f"[自适应限流] {msg} | 平均延迟: {avg_latency*1000:.0f}ms") # 模拟自适应控制 controller = AdaptiveRateController(initial_rate=20) for i in range(200): # 模拟发送结果:前50个全成功,随后逐渐出现失败 success = True if i > 80 and i < 120: success = random.random() > 0.15 # 85%成功率 elif i > 150: success = random.random() > 0.98 # 恢复正常 elapsed = random.uniform(0.1, 0.8) controller.record_result(success, elapsed) time.sleep(0.01) print(f"\n最终发送速率: {controller.current_rate:.1f} 封/秒") print(f"退信等级: {controller.get_bounce_level()}")

时间窗口计数器

from collections import defaultdict from datetime import datetime, timedelta class SlidingWindowCounter: """ 滑动窗口计数器,用于精确控制发送总量 支持每秒、每分钟、每小时、每天的多级限制 """ def __init__(self): # window_sizes: {"name": (duration_seconds, limit)} self.limits = { "per_second": (1, 30), # 每秒最多30封 "per_minute": (60, 500), # 每分钟最多500封 "per_hour": (3600, 15000), # 每小时最多15000封 "per_day": (86400, 200000), # 每天最多200000封 } self.counts = defaultdict(list) # name -> [(timestamp, count)] def check_and_count(self, count: int = 1) -> tuple: """ 检查是否允许发送count封邮件 返回: (allowed: bool, limits: dict) """ now = datetime.now() all_allowed = True current_counts = {} for name, (window_sec, limit) in self.limits.items(): cutoff = now - timedelta(seconds=window_sec) # 清理过期记录 self.counts[name] = [ (ts, c) for ts, c in self.counts[name] if ts > cutoff ] # 计算当前窗口内总量 current_total = sum(c for _, c in self.counts[name]) current_counts[name] = current_total if current_total + count > limit: all_allowed = False if all_allowed: # 记录本次发送 for name, _ in self.limits.items(): self.counts[name].append((now, count)) return all_allowed, current_counts # 使用示例 counter = SlidingWindowCounter() for i in range(600): allowed, limits = counter.check_and_count(1) if not allowed: print(f"第{i+1}封: 限流了! 当前计数: {limits}") break if i < 5 or i % 100 == 0: print(f"第{i+1}封: 允许发送 | {limits}") print(f"\n最终状态: 已发送 {sum(c for _, c in counter.counts['per_second'])} 封")

六、退信与反馈

退信处理是邮件发送系统中容易被低估但至关重要的环节。邮件发送不等于邮件送达,大量邮件会因为收件人地址不存在、邮箱已满、服务器拒收等原因被退回。根据行业统计,正常的退信率在2%-5%之间,如果超过5%就需要检查收件人列表的质量。退信分为硬退信(hard bounce,永久性失败,如地址不存在)和软退信(soft bounce,临时性失败,如邮箱满)。硬退信地址应该立即从列表中移除,软退信则可以延迟重试几次后再做处理。

退信检测主要有两种方式:SMTP响应码检测和退回邮件解析。SMTP响应码是最直接的检测方式,发送时服务器会返回5xx(永久失败)或4xx(临时失败)状态码。退回邮件(bounce email)则需要配置一个退信接收邮箱,邮件服务商会将无法送达的邮件以NDR(Non-Delivery Report)报告的形式退回,系统需要定期解析这个邮箱,提取退信信息并更新数据库中的收件人状态。

打开和点击追踪是衡量邮件营销效果的核心指标。实现机制是在邮件中嵌入一个1x1像素的追踪图片(open tracking),以及将链接替换为带有重定向参数的追踪链接(click tracking)。每个追踪URL包含收件人的唯一标识符和活动ID,当收件人打开邮件或点击链接时,服务器记录相应的事件。这些数据汇聚到统计看板,以邮件客户端、设备类型、地理位置、打开时间分布等多个维度展示,帮助运营人员优化发送策略和内容设计。

退信分类与处理

from enum import Enum from datetime import datetime import re class BounceType(Enum): """退信类型""" HARD_BOUNCE = "hard" # 硬退信:永久失败 SOFT_BOUNCE = "soft" # 软退信:临时失败 COMPLAINT = "complaint" # 投诉(用户标记为垃圾邮件) AUTO_REPLY = "auto_reply" # 自动回复 class BounceClassifier: """退信分类器:根据SMTP响应码和退回内容判断类型""" # 常见的硬退信SMTP响应码 HARD_BOUNCE_CODES = {550, 551, 552, 553, 554, 521, 522, 523} # 常见的软退信SMTP响应码 SOFT_BOUNCE_CODES = {450, 451, 452, 455, 471, 551} # 硬退信关键词 HARD_BOUNCE_PATTERNS = [ r"user unknown", r"no such.*(?:user|mailbox|address)", r"mailbox (?:not found|unavailable)", r"invalid.*(?:address|recipient)", r"account.*(?:disabled|discontinued|suspended)", r"address.*reject", r"does not (?:exist|live here)", r"undeliverable", ] @classmethod def classify(cls, smtp_code: int = None, bounce_message: str = "") -> BounceType: """根据SMTP响应码和退信内容分类""" if smtp_code: if smtp_code in cls.HARD_BOUNCE_CODES: return BounceType.HARD_BOUNCE if smtp_code in cls.SOFT_BOUNCE_CODES: return BounceType.SOFT_BOUNCE # 解析退信内容 msg_lower = bounce_message.lower() for pattern in cls.HARD_BOUNCE_PATTERNS: if re.search(pattern, msg_lower): return BounceType.HARD_BOUNCE if any(word in msg_lower for word in ["temporarily", "try again", "over quota", "try later"]): return BounceType.SOFT_BOUNCE return BounceType.SOFT_BOUNCE # 默认按软退信处理 class BounceHandler: """退信处理器""" def __init__(self, db_connection): self.db = db_connection self.classifier = BounceClassifier() def process_bounce(self, email: str, smtp_code: int, bounce_message: str, campaign_id: int = None): """处理一条退信记录""" bounce_type = self.classifier.classify(smtp_code, bounce_message) now = datetime.now() # 记录退信 record = { "email": email, "campaign_id": campaign_id, "bounce_type": bounce_type.value, "smtp_code": smtp_code, "message_snippet": bounce_message[:500], "created_at": now.isoformat(), } self.db.insert("bounce_log", record) if bounce_type == BounceType.HARD_BOUNCE: # 硬退信:立即标记为无效 self.db.update("recipients", {"status": "invalid", "invalid_reason": "hard_bounce"}, {"email": email}) self.db.insert("suppression_list", { "email": email, "reason": "hard_bounce", "created_at": now.isoformat(), }) logging.warning(f"[硬退信] {email} -> 已加入禁止列表") elif bounce_type == BounceType.SOFT_BOUNCE: # 软退信:增加计数器 current = self.db.get("recipient_bounce_count", {"email": email}) count = (current or {}).get("count", 0) + 1 if count >= 3: # 连续3次软退信转为硬退信 self.db.update("recipients", {"status": "invalid", "invalid_reason": "soft_bounce_exceeded"}, {"email": email}) logging.warning(f"[软退信超限] {email} -> 超过3次,已标记为无效") else: self.db.upsert("recipient_bounce_count", {"email": email, "count": count}, {"email": email}) logging.info(f"[软退信] {email} -> 第{count}次,将延迟重试")

打开与点击追踪

import hashlib import hmac from urllib.parse import urlencode, quote from base64 import urlsafe_b64encode class TrackingLinkGenerator: """追踪链接生成器:生成打开追踪和点击追踪链接""" def __init__(self, tracking_domain: str, secret_key: str): self.tracking_domain = tracking_domain.rstrip('/') self.secret_key = secret_key.encode('utf-8') def generate_open_tracking_pixel(self, campaign_id: str, recipient_id: str) -> str: """ 生成1x1透明像素追踪图片URL 收件人打开邮件时加载此图片,服务器记录打开事件 """ payload = f"open|{campaign_id}|{recipient_id}" signature = self._sign(payload) return (f"{self.tracking_domain}/track/open?" f"c={campaign_id}&r={recipient_id}&s={signature}") def generate_click_tracking_link(self, campaign_id: str, recipient_id: str, target_url: str, link_id: str = "") -> str: """ 生成点击追踪重定向链接 收件人点击后先经过追踪服务器,记录事件后302重定向到目标URL """ params = urlencode({ "c": campaign_id, "r": recipient_id, "t": target_url, "l": link_id, "s": self._sign(f"click|{campaign_id}|{recipient_id}|{target_url}"), }) return f"{self.tracking_domain}/track/click?{params}" def _sign(self, payload: str) -> str: """HMAC签名,防止篡改""" sig = hmac.new(self.secret_key, payload.encode(), hashlib.sha256).digest() return urlsafe_b64encode(sig).decode()[:16] class TrackingServer: """追踪服务器(模拟)""" def __init__(self): self.events = [] def record_open(self, campaign_id: str, recipient_id: str, user_agent: str, ip: str, timestamp: str): """记录打开事件""" self.events.append({ "type": "open", "campaign_id": campaign_id, "recipient_id": recipient_id, "user_agent": user_agent, "ip": ip, "timestamp": timestamp, }) def record_click(self, campaign_id: str, recipient_id: str, target_url: str, link_id: str, timestamp: str): """记录点击事件""" self.events.append({ "type": "click", "campaign_id": campaign_id, "recipient_id": recipient_id, "target_url": target_url, "link_id": link_id, "timestamp": timestamp, }) def get_campaign_stats(self, campaign_id: str) -> dict: """获取活动统计""" campaign_events = [ e for e in self.events if e["campaign_id"] == campaign_id ] unique_opens = len(set( e["recipient_id"] for e in campaign_events if e["type"] == "open" )) unique_clicks = len(set( e["recipient_id"] for e in campaign_events if e["type"] == "click" )) total_sent = self._get_campaign_total(campaign_id) return { "campaign_id": campaign_id, "total_sent": total_sent, "unique_opens": unique_opens, "unique_clicks": unique_clicks, "open_rate": round(unique_opens / total_sent * 100, 2) if total_sent else 0, "click_rate": round(unique_clicks / total_sent * 100, 2) if total_sent else 0, "click_to_open_rate": round(unique_clicks / unique_opens * 100, 2) if unique_opens else 0, } # 使用示例 tracker = TrackingLinkGenerator("https://track.example.com", "my-secret-key-2024") pixel_url = tracker.generate_open_tracking_pixel("campaign_001", "user_12345") click_url = tracker.generate_click_tracking_link( "campaign_001", "user_12345", "https://example.com/promotion?product=watch" ) print(f"追踪像素: {pixel_url}") print(f"追踪链接: {click_url}")

七、发送统计与日志

完善的统计与日志系统是批量邮件平台的重要支撑,它既是运维监控的"眼睛",也是运营决策的"数据基础"。统计模块需要记录每个发送任务的完整生命周期:从入队、渲染、发送到送达确认,每个阶段的时间戳和状态都应该有据可查。核心统计指标包括发送总量、成功量、失败量、退信量、投诉量、打开率、点击率、转化率等,按照时间、模板、收件人分组、邮件客户端等维度进行多维度下钻分析。

日志系统采用结构化日志(JSON格式)记录每个发送事件,便于后续的日志分析和链路追踪。日志级别设计上,INFO记录正常的发送流程,WARNING记录软退信和重试,ERROR记录硬退信和系统异常,CRITICAL记录连续失败超过阈值的严重事件。每条日志应该包含请求ID(correlation_id),方便跨模块追踪一个发送请求的完整链路。日志收集后可以使用ELK(Elasticsearch + Logstash + Kibana)或Loki + Grafana进行可视化展示。

异常报警机制是保障系统稳定性的最后一道防线。当发送成功率低于预设阈值、退信率异常升高、队列积压超过警戒线时,系统应该通过邮件、短信或Webhook自动通知运维人员。报警应该区分警告级别:黄色预警(通知值班人员关注)、橙色报警(需要人工介入)、红色报警(立即响应,可能影响业务)。报警消息应该包含关键上下文信息,如异常时间段、影响范围、可能的根因、建议的排查步骤。

发送统计模块

from dataclasses import dataclass, field from datetime import datetime, date from collections import defaultdict from typing import Optional @dataclass class SendRecord: """单次发送记录""" task_id: str campaign_id: str recipient_email: str template_name: str status: str # queued | rendering | sending | sent | failed | bounced created_at: datetime sent_at: Optional[datetime] = None error_code: Optional[str] = None error_message: Optional[str] = None smtp_response: Optional[str] = None duration_ms: Optional[float] = None class SendStatsCollector: """发送统计收集器""" def __init__(self): self.records: list = [] def add_record(self, record: SendRecord): self.records.append(record) def get_daily_stats(self, target_date: date = None) -> dict: """获取指定日期的发送统计""" if target_date is None: target_date = date.today() day_records = [ r for r in self.records if r.created_at.date() == target_date ] total = len(day_records) if total == 0: return {"date": str(target_date), "total": 0} status_counts = defaultdict(int) for r in day_records: status_counts[r.status] += 1 return { "date": str(target_date), "total": total, "sent": status_counts.get("sent", 0), "failed": status_counts.get("failed", 0), "bounced": status_counts.get("bounced", 0), "success_rate": round( status_counts.get("sent", 0) / total * 100, 2 ), "bounce_rate": round( status_counts.get("bounced", 0) / total * 100, 2 ), } def get_campaign_report(self, campaign_id: str) -> dict: """获取指定活动的完整报告""" cmp_records = [ r for r in self.records if r.campaign_id == campaign_id ] if not cmp_records: return {"campaign_id": campaign_id, "status": "no_data"} total = len(cmp_records) status_counts = defaultdict(int) errors = defaultdict(int) durations = [] hourly_distribution = defaultdict(int) for r in cmp_records: status_counts[r.status] += 1 if r.error_code: errors[r.error_code] += 1 if r.duration_ms: durations.append(r.duration_ms) hour = r.created_at.hour hourly_distribution[f"{hour:02d}:00"] += 1 avg_duration = sum(durations) / len(durations) if durations else 0 return { "campaign_id": campaign_id, "total": total, "status_summary": dict(status_counts), "success_rate": round(status_counts.get("sent", 0) / total * 100, 2), "error_distribution": dict(errors), "avg_duration_ms": round(avg_duration, 2), "hourly_distribution": dict(sorted(hourly_distribution.items())), "generated_at": datetime.now().isoformat(), } # 使用示例 collector = SendStatsCollector() # 模拟添加记录 for i in range(1000): record = SendRecord( task_id=f"task_{i:05d}", campaign_id="campaign_newsletter_may", recipient_email=f"user{i:05d}@example.com", template_name="monthly_newsletter.html", status="sent" if i < 950 else "bounced", created_at=datetime.now(), sent_at=datetime.now(), duration_ms=round(250 + i * 0.5, 2), error_code=None if i < 950 else "550", ) collector.add_record(record) report = collector.get_campaign_report("campaign_newsletter_may") print(f"成功率: {report['success_rate']}%") print(f"平均发送耗时: {report['avg_duration_ms']}ms")

结构化日志系统

import json import logging import sys from datetime import datetime from pythonjsonlogger import jsonlogger # pip install python-json-logger class StructuredEmailLogger: """ 结构化邮件日志系统 将日志以JSON格式输出,便于ELK/Loki采集 """ def __init__(self, app_name: str = "email_system", log_file: str = "email_send.log"): self.app_name = app_name self.logger = logging.getLogger("email") self.logger.setLevel(logging.DEBUG) # 清除默认处理器 self.logger.handlers.clear() # 文件日志(JSON格式) file_handler = logging.FileHandler(log_file, encoding='utf-8') file_formatter = jsonlogger.JsonFormatter( fmt="%(timestamp)s %(level)s %(name)s %(message)s", timestamp=True, ) file_handler.setFormatter(file_formatter) self.logger.addHandler(file_handler) # 控制台日志(可读格式) console_handler = logging.StreamHandler(sys.stdout) console_formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s" ) console_handler.setFormatter(console_formatter) self.logger.addHandler(console_handler) def log_send(self, level: str, event: str, **kwargs): """记录发送事件,自动添加基础字段""" extra = { "app": self.app_name, "event": event, "timestamp": datetime.now().isoformat(), **kwargs, } log_method = getattr(self.logger, level.lower(), self.logger.info) log_method(json.dumps(extra, ensure_ascii=False)) def log_send_start(self, task_id: str, email: str, campaign_id: str, template: str): self.log_send("info", "send_start", task_id=task_id, email=email, campaign_id=campaign_id, template=template) def log_send_success(self, task_id: str, email: str, duration_ms: float): self.log_send("info", "send_success", task_id=task_id, email=email, duration_ms=round(duration_ms, 2)) def log_send_failure(self, task_id: str, email: str, error_code: str, error_msg: str, retry_count: int): self.log_send("warning", "send_failure", task_id=task_id, email=email, error_code=error_code, error_msg=error_msg, retry_count=retry_count) def log_bounce(self, task_id: str, email: str, bounce_type: str, smtp_code: int): self.log_send("warning", "bounce", task_id=task_id, email=email, bounce_type=bounce_type, smtp_code=smtp_code) def log_queue_full(self, queue_name: str, size: int, limit: int): self.log_send("critical", "queue_full", queue_name=queue_name, size=size, limit=limit) # 使用示例 logger = StructuredEmailLogger() logger.log_send_start("task_00001", "alice@example.com", "campaign_may", "newsletter.html") logger.log_send_success("task_00001", "alice@example.com", 312.5) logger.log_send_failure("task_00002", "bad@invalid.com", "550", "User unknown", 0) # 模拟生成日志文件内容供查看 print("\n--- 日志文件内容示例 ---") with open("email_send.log", "r", encoding="utf-8") as f: for line in f.readlines()[:3]: parsed = json.loads(line) print(f" [{parsed.get('level','')}] {parsed.get('event','')} -> {parsed.get('email','')}")

异常报警与看板

from enum import Enum import smtplib import requests class AlertLevel(Enum): WARNING = "yellow" # 需要关注 ALERT = "orange" # 需要介入 CRITICAL = "red" # 立即响应 class AlertRule: """报警规则定义""" def __init__(self, name: str, metric: str, condition: str, threshold: float, level: AlertLevel, window_minutes: int = 5): self.name = name self.metric = metric self.condition = condition # gt, lt, eq self.threshold = threshold self.level = level self.window_minutes = window_minutes class AlertManager: """报警管理器""" def __init__(self): self.rules = [ AlertRule( "成功率过低", "success_rate", "lt", 0.90, AlertLevel.ALERT, 5 ), AlertRule( "退信率过高", "bounce_rate", "gt", 0.08, AlertLevel.CRITICAL, 5 ), AlertRule( "队列积压", "queue_depth", "gt", 10000, AlertLevel.WARNING, 10 ), AlertRule( "发送延迟过高", "avg_duration_ms", "gt", 5000, AlertLevel.ALERT, 5 ), ] self.notification_channels = [] def add_channel(self, channel: Callable): self.notification_channels.append(channel) def evaluate(self, metrics: dict) -> list: """评估所有规则,返回触发的报警列表""" triggered = [] for rule in self.rules: value = metrics.get(rule.metric, 0) triggered_flag = False if rule.condition == "gt" and value > rule.threshold: triggered_flag = True elif rule.condition == "lt" and value < rule.threshold: triggered_flag = True if triggered_flag: alert = { "rule": rule.name, "level": rule.level.value, "metric": rule.metric, "current_value": value, "threshold": rule.threshold, "timestamp": datetime.now().isoformat(), } triggered.append(alert) self._notify(alert) return triggered def _notify(self, alert: dict): """通过所有通知渠道发送报警""" for channel in self.notification_channels: try: channel(alert) except Exception as e: logging.error(f"报警通知失败: {e}") def init_default_channels(self): """初始化默认通知渠道""" def email_channel(alert): msg = (f"[{alert['level'].upper()}] {alert['rule']}: " f"当前值 {alert['current_value']}, " f"阈值 {alert['threshold']}") print(f" [邮件通知] {msg}") def webhook_channel(alert): webhook_url = "https://hooks.example.com/alert" try: resp = requests.post(webhook_url, json=alert, timeout=5) print(f" [Webhook] 状态码: {resp.status_code}") except Exception as e: print(f" [Webhook] 失败: {e}") def sms_channel(alert): if alert['level'] == 'red': print(f" [短信通知] 紧急: {alert['rule']}") self.add_channel(email_channel) self.add_channel(webhook_channel) self.add_channel(sms_channel) # 使用示例 manager = AlertManager() manager.init_default_channels() # 模拟指标评估 metrics = { "success_rate": 0.85, # 低于90% -> 触发ALERT "bounce_rate": 0.12, # 高于8% -> 触发CRITICAL "queue_depth": 15000, # 高于1万 -> 触发WARNING "avg_duration_ms": 1200, # 正常 } triggered = manager.evaluate(metrics) print(f"\n触发 {len(triggered)} 条报警:") for a in triggered: print(f" 等级:{a['level']} | 规则:{a['rule']} | " f"当前:{a['current_value']} | 阈值:{a['threshold']}")

八、邮件营销合规

邮件营销合规是批量邮件系统不可忽视的法律红线。全球主要国家和地区都有严格的反垃圾邮件法律,最著名的是美国的CAN-SPAM Act、欧盟的GDPR和中国的《通信短信息服务管理规定》。这些法规的核心要求包括:未经收件人明确同意不得发送商业邮件、邮件中必须包含真实的发件人信息、邮件标题不得误导、必须提供便捷的退订机制、退订请求必须在规定时间内(通常10个工作日内)处理完毕。违反这些法规的后果可能是巨额罚款(GDPR最高可达全球营收的4%),因此合规设计必须作为系统的核心功能而非可选项。

退订链接是合规设计中最关键的功能。每一封营销邮件必须在显著位置包含退订链接,点击后收件人可以选择完全退订或仅退订特定类别的邮件。退订流程必须简单快捷——理想情况下点击即退订,无需二次登录或填写表单。退订请求需要实时同步到所有发送队列,防止已经退订的用户在数据同步延迟期间继续收到邮件。系统还应维护一个全局的禁止发送列表(suppression list),在所有营销活动中共享,确保任何时候都不会向已退订用户发送邮件。

发送时间策略既是合规要求也是效率优化手段。不同国家和地区对发送时间有不同规定,例如德国禁止在周日和公共假日发送营销邮件。从用户体验角度,工作日上午9-11点和下午2-4点是最佳发送窗口,打开率最高;周末和深夜发送不仅打开率低,还容易被投诉。系统应该支持基于收件人时区的智能发送(例如记录每个收件人的时区,在当地时间上午10点发送),同时支持全局发送日历,标注节假日和静默期。此外,还需要建立投诉处理流程,当收件人投诉率超过阈值(通常0.1%)时,自动暂停发送并启动自查。

退订管理模块

from datetime import datetime, timedelta from enum import Enum import hashlib class UnsubscribeType(Enum): ALL = "all" # 完全退订 CATEGORY = "category" # 退订特定类别 FREQUENCY = "frequency" # 降低频率 class UnsubscribeManager: """退订管理器:处理退订请求和全局禁止发送列表""" def __init__(self, redis_client, db_connection): self.redis = redis_client self.db = db_connection self.suppression_key = "global:suppression_list" def generate_unsubscribe_token(self, email: str, campaign_id: str = None) -> str: """生成退订令牌(带签名防篡改)""" payload = f"{email}|{campaign_id or 'all'}|{datetime.now().timestamp()}" token = hashlib.sha256(payload.encode()).hexdigest()[:24] # 存储令牌与邮箱的映射(有效期7天) self.redis.setex(f"unsub:token:{token}", 604800, email) return token def generate_unsubscribe_url(self, email: str, campaign_id: str = None, unsub_type: UnsubscribeType = UnsubscribeType.ALL ) -> str: """生成退订链接""" token = self.generate_unsubscribe_token(email, campaign_id) return (f"https://mail.example.com/unsubscribe?" f"token={token}&type={unsub_type.value}") def process_unsubscribe(self, token: str, unsub_type: str = "all") -> dict: """处理退订请求""" email = self.redis.get(f"unsub:token:{token}") if not email: return {"success": False, "message": "无效或过期的退订令牌"} now = datetime.now() # 加入全局禁止发送列表 self.redis.sadd(self.suppression_key, email) # 持久化到数据库 self.db.upsert("suppression_list", { "email": email, "unsubscribe_type": unsub_type, "unsubscribed_at": now.isoformat(), "source": "user_initiated", }, {"email": email}) # 同时更新收件人状态 self.db.update("recipients", {"status": "unsubscribed"}, {"email": email}) # 清除缓存 self.redis.delete(f"unsub:token:{token}") logging.info(f"退订成功: {email} (类型: {unsub_type})") return {"success": True, "message": "您已成功退订,将不再收到相关邮件"} def is_suppressed(self, email: str) -> bool: """检查邮箱是否在禁止发送列表中""" # 先查Redis缓存 if self.redis.sismember(self.suppression_key, email): return True # 再查数据库 record = self.db.get("suppression_list", {"email": email}) return record is not None def bulk_check_suppressed(self, emails: list) -> dict: """批量检查邮箱状态,返回 {email: is_suppressed}""" results = {} pipeline = self.redis.pipeline() for email in emails: pipeline.sismember(self.suppression_key, email) redis_results = pipeline.execute() for email, in_redis in zip(emails, redis_results): results[email] = bool(in_redis) return results # 使用示例 # manager = UnsubscribeManager(redis_client, db) # url = manager.generate_unsubscribe_url("user@example.com") # print(f"退订链接: {url}") # 在邮件页脚中嵌入退订链接

合规检查与审批流程

from dataclasses import dataclass from typing import List import re @dataclass class ComplianceViolation: """合规违规记录""" field: str severity: str # error, warning message: str class ComplianceChecker: """邮件合规检查器:在发送前检查邮件内容是否符合法规""" REQUIRED_FIELDS = [ "unsubscribe_url", # 必须包含退订链接 "sender_name", # 必须包含真实发件人名称 "sender_address", # 必须包含真实物理地址 "privacy_policy_url", # 隐私政策链接 ] def __init__(self): self.violations: List[ComplianceViolation] = [] def check_email_content(self, html_body: str) -> List[ComplianceViolation]: """检查邮件HTML内容""" self.violations = [] # 检查退订链接 if "unsubscribe" not in html_body.lower(): self.violations.append(ComplianceViolation( "unsubscribe", "error", "邮件必须包含退订链接(unsubscribe)" )) # 检查发件人信息 if "mailto:" not in html_body.lower(): self.violations.append(ComplianceViolation( "sender", "warning", "建议在邮件中包含发件人邮箱联系信息" )) # 检查隐私政策 if "privacy" not in html_body.lower(): self.violations.append(ComplianceViolation( "privacy", "warning", "建议在邮件中包含隐私政策链接" )) # 检查误导性标题 misleading_patterns = [ r"(?i)RE:\s*", r"(?i)FWD:\s*", r"(?i)重要通知.*无需操作", r"(?i)账单.*立即支付", ] title_match = re.search(r"(.*?)", html_body, re.DOTALL) if title_match: title = title_match.group(1) for pattern in misleading_patterns: if re.search(pattern, title): self.violations.append(ComplianceViolation( "subject", "error", f"邮件标题可能具有误导性: '{title}'" )) # 检查垃圾邮件敏感词 spam_words = ["免费", "100%", "不劳而获", "点击这里", "立即购买", "限时特惠", "错过今天", "现金奖励", "中奖"] for word in spam_words: if word in html_body: count = html_body.count(word) if count > 2: self.violations.append(ComplianceViolation( "content", "warning", f"垃圾邮件敏感词 '{word}' 出现了 {count} 次" )) return self.violations def check_campaign(self, campaign: dict) -> tuple: """ 检查一次邮件活动是否符合规范 返回 (is_pass: bool, violations: list) """ violations = [] # 检查必要字段 for field in self.REQUIRED_FIELDS: if field not in campaign or not campaign.get(field): violations.append(ComplianceViolation( field, "error", f"缺少必要字段: {field}" )) # 检查HTML内容 if "html_body" in campaign: content_violations = self.check_email_content(campaign["html_body"]) violations.extend(content_violations) # 检查收件人来源 if campaign.get("recipient_source") == "purchased": violations.append(ComplianceViolation( "recipient_source", "error", "严禁使用购买或第三方收集的邮箱列表" )) # 检查是否有明确的许可证明 if not campaign.get("consent_obtained"): violations.append(ComplianceViolation( "consent", "error", "没有收件人明确同意的记录" )) has_error = any(v.severity == "error" for v in violations) return (not has_error, violations) class CampaignApprovalWorkflow: """活动审批流程""" def __init__(self): self.checker = ComplianceChecker() self.pending_approvals = [] def submit_for_approval(self, campaign: dict) -> str: """提交活动审批""" is_pass, violations = self.checker.check_campaign(campaign) approval_id = hashlib.md5( f"{campaign.get('name')}{datetime.now().isoformat()}".encode() ).hexdigest()[:12] record = { "approval_id": approval_id, "campaign_name": campaign.get("name"), "submitted_at": datetime.now().isoformat(), "is_pass": is_pass, "violations": [ {"field": v.field, "severity": v.severity, "message": v.message} for v in violations ], "status": "approved" if is_pass else "rejected", } self.pending_approvals.append(record) if is_pass: logging.info(f"[审批通过] {campaign.get('name')} - {approval_id}") else: logging.warning(f"[审批拒绝] {campaign.get('name')} - {approval_id}") return record # 使用示例 checker = ComplianceChecker() campaign = { "name": "五月促销活动", "unsubscribe_url": "https://example.com/unsub", "sender_name": "佼艾科技", "sender_address": "上海市浦东新区", "privacy_policy_url": "https://example.com/privacy", "recipient_source": "opt_in", "consent_obtained": True, "html_body": """\

五月特惠

亲爱的用户,本月推出限时优惠活动...

退订 """, } is_pass, violations = checker.check_campaign(campaign) print(f"合规检查: {'通过' if is_pass else '拒绝'}") for v in violations: print(f" [{v.severity.upper()}] {v.field}: {v.message}")

九、实战案例

将前面八个章节的知识整合为一个完整的实战项目,才能真正掌握批量邮件系统的构建能力。本章通过三个有代表性的案例,展示从需求分析、系统设计到代码实现的全过程。第一个案例是月度Newsletter系统,这是最常见的邮件营销场景;第二个案例是客户生日祝福自动化,涉及定时任务和个性化渲染;第三个案例是活动邀请批量发送,涵盖复杂模板和动态附件。每个案例都从业务需求出发,逐步推导出技术方案。

实战案例的核心价值在于展示如何将之前学习的各个技术点串联起来形成一个完整的业务流程。以月度Newsletter为例:定时任务每月1日触发→从CRM系统获取活跃用户列表→对每个用户进行个性化渲染(姓名、公司、推荐内容)→通过限流器控制发送速率→发送完成更新数据库→统计当月的打开率和点击率→更新用户画像用于下月推荐。这个流程涉及模板引擎、收件人管理、队列调度、并发限流、追踪反馈、统计分析等所有模块的协同工作。

在项目实施过程中,还需要考虑非功能性需求:系统需要支持水平扩展以应对业务增长;数据库需要定期清理和归档历史数据;敏感信息(如SMTP密码、API密钥)需要加密存储;发送失败率需要持续监控并设置自动化告警;不同业务的邮件需要隔离(如交易类邮件和营销类邮件使用不同的发送通道)。良好的架构设计应该在功能交付的同时,为未来的运维和扩展奠定基础。

案例一:月度Newsletter系统

""" 案例一:月度Newsletter系统 功能:每月定时向所有活跃用户发送个性化新闻通讯 """ from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime, timedelta import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("newsletter") class MonthlyNewsletterSystem: """月度Newsletter系统""" def __init__(self, template_engine, email_sender, recipient_loader, stats_collector): self.engine = template_engine self.sender = email_sender self.loader = recipient_loader self.stats = stats_collector def prepare_newsletter_data(self, recipient: Recipient) -> dict: """为每位收件人生成个性化内容""" # 根据用户的历史行为生成推荐内容 recommended_articles = self._get_recommendations(recipient) return { "recipient_name": recipient.name, "company_name": recipient.company or "佼艾科技", "month": datetime.now().strftime("%Y年%m月"), "featured_articles": recommended_articles, "personalized_tips": self._generate_tips(recipient), "unsubscribe_url": f"https://mail.example.com/unsub?" f"uid={hash(recipient.email) % 1000000}", "read_online_url": f"https://mail.example.com/view/" f"{hash(recipient.email) % 1000000}", } def _get_recommendations(self, recipient: Recipient) -> list: """模拟获取个性化推荐内容""" return [ {"title": "Python 3.12 新特性详解", "summary": "模式匹配、类型参数语法...", "url": "https://blog.example.com/python-312"}, {"title": "SQLite vs DuckDB 分析引擎对比", "summary": "列式存储与行式存储的选择...", "url": "https://blog.example.com/sqlite-duckdb"}, {"title": "FastAPI 异步编程最佳实践", "summary": "依赖注入、中间件、WebSocket...", "url": "https://blog.example.com/fastapi-async"}, ] def _generate_tips(self, recipient: Recipient) -> str: """根据用户等级生成个性化提示""" tips_map = { "vip": "感谢您长期的支持!本月VIP专享线上交流会即将举办。", "normal": "完成个人资料可提升为VIP会员,享受更多专属权益。", "new": "欢迎加入!建议您先完善个人资料以获得个性化推荐。", } return tips_map.get(recipient.tier, tips_map["normal"]) def send_newsletter(self, recipient: Recipient): """为单个收件人发送Newsletter""" context = self.prepare_newsletter_data(recipient) html_body = self.engine.render("monthly_newsletter.html", context) success = self.sender.send_one( recipient, html_body, subject=f"【佼艾科技】{context['month']}技术月刊", sender="newsletter@example.com" ) if success: self.stats.add_record(SendRecord( task_id=f"nl_{hash(recipient.email)}", campaign_id="monthly_newsletter", recipient_email=recipient.email, template_name="monthly_newsletter.html", status="sent", created_at=datetime.now(), sent_at=datetime.now(), duration_ms=0, )) def run_campaign(self, recipients: List[Recipient]): """执行完整的Newsletter发送活动""" logger.info(f"开始发送月度Newsletter,共 {len(recipients)} 人") for recipient in recipients: try: self.send_newsletter(recipient) except Exception as e: logger.error(f"发送失败 {recipient.email}: {e}") report = self.stats.get_campaign_report("monthly_newsletter") logger.info(f"发送完成: {report['success_rate']}% 成功率") return report # 调度配置:每月1号上午10点执行 scheduler = BackgroundScheduler() scheduler.add_job( lambda: newsletter_system.run_campaign(active_recipients), trigger='cron', day=1, hour=10, minute=0, id='monthly_newsletter', replace_existing=True, ) # scheduler.start() print("月度Newsletter调度器已配置:每月1日 10:00 自动发送")

案例二:客户生日祝福自动化

""" 案例二:客户生日祝福自动化 功能:每天检查是否有客户过生日,自动发送生日祝福邮件 """ from datetime import date import random from typing import List @dataclass class BirthdayRecipient(Recipient): """增加生日字段的收件人""" birthday: date = None class BirthdayGreetingSystem: """生日祝福自动化系统""" def __init__(self, template_engine, email_sender, db_connection): self.engine = template_engine self.sender = email_sender self.db = db_connection def get_today_birthday_people(self) -> List[BirthdayRecipient]: """查询今天过生日的客户""" today = date.today() # 从数据库查询生日为今天(忽略年份)的活跃客户 query = """ SELECT email, name, company, birthday, tier FROM recipients WHERE strftime('%%m-%%d', birthday) = ? AND status = 'active' AND is_suppressed = 0 """ rows = self.db.query(query, (today.strftime("%m-%d"),)) return [ BirthdayRecipient( email=row['email'], name=row['name'], company=row['company'], birthday=datetime.strptime(row['birthday'], "%Y-%m-%d").date(), tier=row.get('tier', 'normal'), ) for row in rows ] def generate_coupon(self, recipient: BirthdayRecipient) -> dict: """生成生日专属优惠券""" discount_map = { "vip": 0.30, # VIP 7折 "normal": 0.15, # 普通 85折 "new": 0.20, # 新用户 8折 } discount = discount_map.get(recipient.tier, 0.15) coupon_code = f"BDAY{recipient.name[:2].upper()}{random.randint(1000,9999)}" return { "code": coupon_code, "discount": f"{int(discount * 100)}%", "expire_days": 14, "description": f"生日专享{discount*100:.0f}折优惠,有效期14天", } def send_birthday_greeting(self, recipient: BirthdayRecipient): """发送生日祝福邮件""" today = date.today() age = today.year - recipient.birthday.year coupon = self.generate_coupon(recipient) context = { "recipient_name": recipient.name, "company_name": recipient.company or "佼艾科技", "birthday_date": recipient.birthday.strftime("%Y-%m-%d"), "age": age, "coupon_code": coupon["code"], "coupon_discount": coupon["discount"], "coupon_expire_days": coupon["expire_days"], "coupon_description": coupon["description"], "special_blessing": self._get_blessing(recipient.tier), "unsubscribe_url": f"https://mail.example.com/unsub?" f"uid={hash(recipient.email) % 1000000}", } html_body = self.engine.render("birthday_greeting.html", context) success = self.sender.send_one( recipient, html_body, subject=f"🎂 {recipient.name},佼艾科技祝您生日快乐!", sender="birthday@example.com" ) if success: # 记录发送日志 self.db.insert("birthday_log", { "email": recipient.email, "name": recipient.name, "sent_at": datetime.now().isoformat(), "coupon_code": coupon["code"], "age": age, }) logger.info(f"生日祝福已发送: {recipient.name} <{recipient.email}>") def _get_blessing(self, tier: str) -> str: blessings = { "vip": "作为我们尊贵的VIP会员,您一直以来的支持是我们最大的动力。" "愿您在新的一岁里,事业蒸蒸日上,生活幸福美满!", "normal": "感谢您选择佼艾科技,愿您生日快乐,万事如意!", "new": "欢迎加入佼艾科技大家庭!祝您生日快乐,希望在未来的日子里," "我们能为您带来更多价值!", } return blessings.get(tier, "祝您生日快乐,健康幸福!") def daily_task(self): """每日定时任务:查询生日客户并发送祝福""" birthday_people = self.get_today_birthday_people() if not birthday_people: logger.info("今日无客户过生日") return {"sent": 0} for person in birthday_people: self.send_birthday_greeting(person) logger.info(f"生日祝福发送完成: {len(birthday_people)} 人") return {"sent": len(birthday_people)} # 调度配置:每天早上8点执行 scheduler.add_job( lambda: birthday_system.daily_task(), trigger='cron', hour=8, minute=0, id='birthday_greeting', replace_existing=True, ) print("生日祝福调度器已配置:每天 08:00 自动检测并发送")

案例三:活动邀请批量发送

""" 案例三:活动邀请批量发送 功能:向目标客户发送活动邀请,支持多种邀请模板和动态附件 """ from datetime import datetime, timedelta class EventInvitationSystem: """活动邀请系统""" INVITATION_TEMPLATES = { "conference": "invitation_conference.html", "webinar": "invitation_webinar.html", "workshop": "invitation_workshop.html", } def __init__(self, template_engine, email_sender, attachment_generator, task_queue): self.engine = template_engine self.sender = email_sender self.attachment_gen = attachment_generator self.queue = task_queue def prepare_invitation(self, recipient: Recipient, event: dict) -> dict: """准备活动邀请内容""" template_name = self.INVITATION_TEMPLATES.get( event['type'], "invitation_default.html" ) qr_data = (f"https://events.example.com/checkin?" f"e={event['id']}&u={hash(recipient.email) % 1000000}") attachment = self.attachment_gen.generate_pdf_report( recipient, { "period": event['date'], "visits": 0, "prev_visits": 0, "conversion": 0, "prev_conversion": 0, "revenue": 0, "prev_revenue": 0, "visits_growth": 0, "conversion_growth": 0, "revenue_growth": 0, "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), } ) return { "template_name": template_name, "context": { "recipient_name": recipient.name, "recipient_email": recipient.email, "company_name": recipient.company or "特邀嘉宾", "event_name": event['name'], "event_date": event['date'], "event_time": event['time'], "event_location": event['location'], "event_description": event.get('description', ''), "agenda": event.get('agenda', []), "speakers": event.get('speakers', []), "dress_code": event.get('dress_code', '商务休闲'), "rsvp_deadline": event.get('rsvp_deadline', ''), "qr_checkin_url": qr_data, "map_url": f"https://maps.example.com/?q={event['location']}", "unsubscribe_url": f"https://mail.example.com/unsub?" f"uid={hash(recipient.email) % 1000000}", }, "attachments": [attachment], "subject": f"【邀请函】{event['name']} - {recipient.name} 先生/女士 收", } def send_invitation(self, recipient: Recipient, event: dict) -> bool: """发送活动邀请""" prepared = self.prepare_invitation(recipient, event) # 构建带附件的邮件 msg = MIMEMultipart('mixed') msg['Subject'] = prepared['subject'] msg['From'] = "events@example.com" msg['To'] = recipient.email msg['Reply-To'] = "rsvp@example.com" # HTML正文 html_body = self.engine.render( prepared['template_name'], prepared['context'] ) msg.attach(MIMEText(html_body, 'html', 'utf-8')) # 附件 for att in prepared['attachments']: part = MIMEBase('application', 'octet-stream') part.set_payload(att.data) encoders.encode_base64(part) part.add_header( 'Content-Disposition', f'attachment; filename="{att.filename}"' ) msg.attach(part) # 发送 success = self.sender.send_one(recipient, msg.as_string(), prepared['subject'], "events@example.com") return success def batch_invite(self, recipients: List[Recipient], event: dict) -> dict: """批量发送活动邀请""" results = {"success": 0, "failed": 0, "total": len(recipients)} for i, recipient in enumerate(recipients): try: ok = self.send_invitation(recipient, event) if ok: results["success"] += 1 else: results["failed"] += 1 except Exception as e: results["failed"] += 1 logger.error(f"邀请发送失败 {recipient.email}: {e}") # 每50封输出一次进度 if (i + 1) % 50 == 0: logger.info(f"邀请发送进度: {i+1}/{len(recipients)}") return results # 使用示例 event_info = { "id": "tech_conf_2026", "type": "conference", "name": "2026 Python中国技术大会", "date": "2026-06-15", "time": "09:00 - 17:30", "location": "上海国际会议中心 3楼华夏厅", "description": "一年一度的Python技术盛会,汇聚国内外顶尖开发者...", "agenda": [ {"time": "09:00-09:30", "title": "签到入场"}, {"time": "09:30-11:30", "title": "主题演讲:Python 3.13 新特性与未来展望"}, {"time": "13:00-15:00", "title": "分论坛:Web开发 / 数据分析 / AI应用"}, {"time": "15:15-16:45", "title": "圆桌讨论:Python在企业的落地实践"}, {"time": "16:45-17:30", "title": "自由交流 & 闭幕"}, ], "speakers": [ {"name": "Guido van Rossum", "title": "Python之父(视频连线)"}, {"name": "张伟", "title": "阿里巴巴资深技术专家"}, {"name": "李娜", "title": "微软Python团队首席工程师"}, ], "dress_code": "商务休闲", "rsvp_deadline": "2026-06-01", } # 假设从Excel加载了1000位受邀嘉宾 # recipients = list(loader.from_excel("invitees.xlsx")) # result = inviter.batch_invite(recipients, event_info) # print(f"邀请发送完成: {result['success']} 成功, {result['failed']} 失败")