← 返回自动化办公目录
← 返回学习笔记首页
专题: Python 自动化办公系统学习
关键词: Python, 自动化办公, IMAP, POP3, 邮件收取, imaplib, 邮件解析, 邮件自动化, Python
一、IMAP vs POP3 — 协议原理对比
邮件自动收取的第一步是选对协议。IMAP(Internet Message Access Protocol)和 POP3(Post Office Protocol 3)是两种最主流的邮件接收协议,它们在设计理念、功能特性和适用场景上存在本质差异。理解二者的区别,是构建邮件自动化系统的基础。
协议原理对比 :POP3 是一种"离线"协议,其设计初衷是在客户端和服务器之间建立临时连接,将邮件从服务器下载到本地后,服务器端的邮件通常会被删除。这种方式在早期拨号上网时代非常流行——用户拨号连接、收取邮件、断开连接、在本地阅读。IMAP 则是一种"在线"协议,邮件始终保留在服务器上,客户端对邮件的操作(阅读、移动、删除、标记)会同步回服务器。IMAP 更像是一种远程文件系统协议,客户端可以只下载邮件的部分内容(如先下载邮件头,需要时再下载全文)。
核心差异对照表 :
特性 POP3 IMAP
工作模式 离线模式 在线/离线均可
邮件存储 下载到本地,服务器可选删除 始终保留在服务器
多设备同步 不支持 天然支持
文件夹管理 仅收件箱 支持自定义文件夹
部分下载 不支持 支持(先下载头信息)
服务器搜索 不支持 支持服务端搜索
端口号(SSL) 995 993
适用场景 单设备、离线阅读 多设备、在线办公
适用场景分析 :IMAP 更适用于自动化办公场景,原因有三:第一,多个自动化任务可以在不同设备上协同操作同一个邮箱;第二,可以通过服务器端搜索快速定位邮件,避免大量数据下载;第三,可以精细控制要同步的文件夹和邮件状态。POP3 更适合个人单设备使用的简单场景或作为备份方案(在 IMAP 之外再通过 POP3 做一次本地备份)。
Python 中的选择策略 :Python 标准库同时提供了 imaplib 和 poplib 两个模块。在开发邮件自动化系统时,建议优先选择 IMAP 协议。如果需要支持遗留系统提供 POP3 接口,也需要了解 poplib 的基本用法。
# 检测邮箱支持的协议能力
import imaplib
def check_mailbox_capabilities(host, port=993):
"""检测IMAP服务器支持的功能"""
mail = imaplib.IMAP4_SSL(host, port)
mail.login('user@example.com', 'password')
status, capabilities = mail.capability()
if status == 'OK':
print('服务器支持的功能:', capabilities)
if b'IDLE' in capabilities[0]:
print('支持IDLE推送通知')
if b'QUOTA' in capabilities[0]:
print('支持配额查询')
if b'CHILDREN' in capabilities[0]:
print('支持子文件夹')
mail.logout()
# check_mailbox_capabilities('imap.example.com')
# IMAP 与 POP3 连接对比演示
import imaplib
import poplib
# IMAP连接 - 在线模式
imap = imaplib.IMAP4_SSL('imap.gmail.com')
imap.login('user@gmail.com', 'password')
# 列出所有文件夹
status, folders = imap.list()
for folder in folders:
print('IMAP文件夹:', folder.decode())
imap.logout()
print('---')
# POP3连接 - 离线模式
pop = poplib.POP3_SSL('pop.gmail.com')
pop.user('user@gmail.com')
pop.pass_('password')
# POP3只有收件箱
num_messages = len(pop.list()[1])
print(f'POP3收件箱邮件数: {num_messages}')
pop.quit()
# 协议选择器工厂函数
from enum import Enum
import imaplib
import poplib
class MailProtocol(Enum):
IMAP = 'imap'
POP3 = 'pop3'
PROTOCOL_CONFIG = {
MailProtocol.IMAP: {
'host': 'imap.example.com',
'port': 993,
'lib': imaplib.IMAP4_SSL,
'supports_folders': True,
'supports_search': True,
},
MailProtocol.POP3: {
'host': 'pop.example.com',
'port': 995,
'lib': poplib.POP3_SSL,
'supports_folders': False,
'supports_search': False,
},
}
def get_mail_connection(protocol: MailProtocol, user: str, password: str):
"""根据协议类型获取连接"""
config = PROTOCOL_CONFIG[protocol]
if protocol == MailProtocol.IMAP:
conn = config['lib'](config['host'], config['port'])
conn.login(user, password)
return conn
else:
conn = config['lib'](config['host'], config['port'])
conn.user(user)
conn.pass_(password)
return conn
重点总结 :IMAP 是邮件自动化的首选协议,它支持多设备同步、服务器端搜索、文件夹管理,非常适合需要程序化操作邮件的场景。POP3 适用于简单备份或单向下载场景。在实际项目中选择时,优先考虑 IMAP,只有在目标邮箱仅支持 POP3 时才退而求其次。
二、imaplib 核心用法
Python 标准库中的 imaplib 模块提供了 IMAP4 协议的完整实现,是构建邮件自动化系统的核心工具。它支持 IMAP4rev1 标准中的所有关键操作,包括认证、邮箱选择、邮件搜索、邮件获取、状态管理等。本节详细讲解 imaplib 的核心用法。
连接与认证 :连接 IMAP 服务器时,首推使用 SSL/TLS 加密连接。绝大多数现代邮件服务商(如 Gmail、Outlook、QQ邮箱、163邮箱)都要求使用 SSL 连接。连接成功后,需要调用 login() 方法进行认证。对于开启了两步验证的账号,通常需要使用"应用专用密码"代替登录密码。
邮箱选择与操作 :认证通过后,使用 select() 方法选择一个邮箱(默认是 INBOX)。select() 方法返回邮箱的状态信息,包括邮件总数、最近邮件数等。IMAP 支持多个邮箱文件夹,可以通过 list() 方法列出所有邮箱,使用 create() 方法创建新邮箱。
搜索与获取 :search() 方法用于在服务器端搜索邮件,返回满足条件的邮件序号列表。fetch() 方法根据序号获取邮件的指定部分,可以只获取邮件头、指定 MIME 部分或完整邮件。通过设置不同的 fetch 参数,可以精细控制要下载的数据量。
常用操作演示 :
# imaplib 基础连接与邮箱操作
import imaplib
def connect_imap(host, port, user, password):
"""连接IMAP服务器并选择收件箱"""
try:
# 建立SSL连接
mail = imaplib.IMAP4_SSL(host, port)
mail.login(user, password)
print(f'成功连接到 {host}')
# 选择收件箱
status, messages = mail.select('INBOX')
print(f'收件箱状态: {status}')
print(f'邮件总数: {messages[0].decode()}')
# 列出所有邮箱文件夹
status, folders = mail.list()
print('\n所有邮箱文件夹:')
for folder in folders:
print(f' - {folder.decode()}')
return mail
except imaplib.IMAP4.error as e:
print(f'IMAP连接失败: {e}')
return None
# 使用示例(请替换为实际凭据)
# mail = connect_imap('imap.example.com', 993, 'user@example.com', 'password')
# 搜索并获取邮件 - 详细演示
def search_and_fetch(mail, search_criteria='ALL'):
"""按条件搜索并获取邮件头信息"""
# 搜索邮件
status, message_ids = mail.search(None, search_criteria)
if status != 'OK':
print('搜索失败')
return
ids = message_ids[0].split()
print(f'找到 {len(ids)} 封符合条件邮件')
# 只处理前5封
for msg_id in ids[:5]:
# 获取邮件头(RFC822格式的头信息)
status, msg_data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])')
if status != 'OK':
continue
# 解析邮件头
raw_headers = msg_data[0][1]
print(f'\n--- 邮件 ID: {msg_id.decode()} ---')
print(raw_headers.decode('utf-8', errors='replace')[:300])
return ids
# 搜索今日邮件示例
# search_and_fetch(mail, '(SINCE "01-May-2026")')
# 获取邮件的不同部分
def fetch_email_parts(mail, msg_id):
"""获取邮件的不同组成部分"""
# 1. 只获取邮件头
status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])')
headers = data[0][1]
print('邮件头已获取')
# 2. 获取邮件正文(仅文本部分)
status, data = mail.fetch(msg_id, '(BODY.PEEK[TEXT])')
body = data[0][1]
print(f'正文大小: {len(body)} 字节')
# 3. 获取邮件完整结构(不下载内容)
status, data = mail.fetch(msg_id, '(BODYSTRUCTURE)')
structure = data[0][1]
print(f'邮件结构: {structure.decode()[:200]}...')
# 4. 获取完整邮件
status, data = mail.fetch(msg_id, '(RFC822)')
full_email = data[0][1]
print(f'完整邮件大小: {len(full_email)} 字节')
# 5. 获取邮件标志(已读/未读等)
status, data = mail.fetch(msg_id, '(FLAGS)')
flags = data[0]
print(f'邮件标志: {flags}')
return headers
# IMAP连接管理最佳实践
class IMAPClient:
"""IMAP客户端封装类"""
def __init__(self, host, port=993, user=None, password=None):
self.host = host
self.port = port
self.user = user
self.password = password
self.connection = None
def __enter__(self):
self.connection = imaplib.IMAP4_SSL(self.host, self.port)
self.connection.login(self.user, self.password)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.connection:
try:
self.connection.close()
self.connection.logout()
except:
pass
def select_mailbox(self, mailbox='INBOX'):
return self.connection.select(mailbox)
def search(self, criteria='ALL'):
return self.connection.search(None, criteria)
def fetch(self, msg_id, parts='(RFC822)'):
return self.connection.fetch(msg_id, parts)
# 使用上下文管理器自动管理连接
# with IMAPClient('imap.example.com', 993, 'user', 'pass') as client:
# client.select_mailbox()
# status, ids = client.search('UNSEEN')
# print(f'未读邮件: {len(ids[0].split())}')
重点总结 :imaplib 的核心操作流程为:连接→登录→选择邮箱→搜索→获取→操作→关闭。掌握 search() 和 fetch() 的灵活组合是构建邮件自动化系统的关键。使用上下文管理器(with 语句)可以确保连接被正确关闭。
三、邮件搜索筛选
IMAP 的一个重要优势是支持服务器端搜索。这意味着可以在不下载邮件内容的情况下,通过搜索条件精准定位目标邮件,大大减少了网络传输量。imaplib 的 search() 方法支持丰富的搜索条件,让邮件筛选变得高效灵活。
按日期搜索 :日期搜索是邮件自动化中最常用的功能之一。IMAP 支持 SINCE、BEFORE、ON 等日期条件。日期格式必须为 DD-Mon-YYYY(如 01-May-2026)。需要注意的是,SINCE 条件是包含指定日期在内的,即搜索指定日期及之后收到的邮件。
按发件人/主题搜索 :使用 FROM 条件按发件人搜索,使用 SUBJECT 条件按主题搜索。这两个条件都支持子字符串匹配,可以结合通配符使用。对于中文内容,需要注意编码问题,部分服务器可能不支持直接搜索中文。解决方法是先通过其他条件缩小范围,再在本地进行二次过滤。
按邮件状态搜索 :IMAP 定义了多种邮件状态标志,包括 SEEN(已读)、UNSEEN(未读)、FLAGGED(已标记)、ANSWERED(已回复)、DELETED(已删除)等。搜索时使用 NOT 关键字可以取反,例如 NOT SEEN 等价于 UNSEEN。
# 日期范围搜索 - 查找特定时间段内的邮件
import imaplib
from datetime import datetime, timedelta
def search_by_date_range(mail, days_back=7):
"""搜索最近N天内的邮件"""
today = datetime.now()
since_date = today - timedelta(days=days_back)
# 格式化为 IMAP 日期格式: DD-Mon-YYYY
since_str = since_date.strftime('%d-%b-%Y')
before_str = (today + timedelta(days=1)).strftime('%d-%b-%Y')
# 搜索日期范围内的邮件
criteria = f'(SINCE {since_str} BEFORE {before_str})'
status, msg_ids = mail.search(None, criteria)
if status == 'OK':
ids = msg_ids[0].split()
print(f'最近 {days_back} 天内有 {len(ids)} 封邮件')
# 统计每天的邮件数
for msg_id in ids[-10:]: # 最近10封
status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER.FIELDS (DATE SUBJECT)])')
if status == 'OK':
print(data[0][1].decode('utf-8', errors='replace'))
return msg_ids
# search_by_date_range(mail, 7)
# 复合搜索条件组合
def advanced_search(mail):
"""高级复合搜索演示"""
# 1. 来自特定发件人的未读邮件
criteria = '(UNSEEN FROM "notifications@example.com")'
status, ids = mail.search(None, criteria)
print(f'来自通知服务的未读邮件: {len(ids[0].split())}封')
# 2. 特定主题且已标记的邮件
criteria = '(FLAGGED SUBJECT "紧急")'
status, ids = mail.search(None, criteria)
print(f'标记为紧急的邮件: {len(ids[0].split())}封')
# 3. 7天内包含附件的邮件
from datetime import datetime, timedelta
date_str = (datetime.now() - timedelta(days=7)).strftime('%d-%b-%Y')
criteria = f'(SINCE {date_str} LARGER 50000)' # 大于50KB通常有附件
status, ids = mail.search(None, criteria)
print(f'7天内超过50KB的邮件: {len(ids[0].split())}封')
# 4. 未读且主题包含关键词但不包含垃圾词
criteria = '(UNSEEN SUBJECT "订单" NOT FROM "spam@example.com")'
status, ids = mail.search(None, criteria)
print(f'订单相关非垃圾未读: {len(ids[0].split())}封')
# 5. 多条件或关系(IMAP不直接支持OR)
# 使用OR关键字:OR条件1条件2
criteria = '(OR SUBJECT "发票" SUBJECT "账单")'
status, ids = mail.search(None, criteria)
print(f'发票或账单相关: {len(ids[0].split())}封')
return ids
# UID 模式搜索 - 更稳定的引用方式
def uid_search_example(mail):
"""使用UID模式搜索和操作邮件"""
# 先进入收件箱
mail.select('INBOX')
# UID搜索(比序号更稳定,邮件在服务器上的唯一标识)
status, uid_data = mail.uid('search', None, 'ALL')
uids = uid_data[0].split()
print(f'总邮件数: {len(uids)}')
print(f'最新5封UID: {uids[-5:]}')
if uids:
# 使用UID获取邮件(不会改变邮件的\Seen状态)
latest_uid = uids[-1]
status, data = mail.uid('fetch', latest_uid, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])')
if status == 'OK':
raw = data[0][1]
print(f'\n最新邮件头:\n{raw.decode("utf-8", errors="replace")}')
# 将搜索条件重新应用于搜索
from datetime import datetime, timedelta
date_str = (datetime.now() - timedelta(days=30)).strftime('%d-%b-%Y')
status, uid_data = mail.uid('search', None, f'(SINCE {date_str})')
recent_uids = uid_data[0].split()
print(f'最近30天内邮件数(UID): {len(recent_uids)}')
return uids
# 本地二次过滤 - 弥补服务端搜索的不足
def search_with_local_filter(mail, sender_pattern=None, subject_keywords=None):
"""服务端搜索 + 本地二次过滤"""
mail.select('INBOX')
# 第一步:服务端宽泛搜索(最近30天)
from datetime import datetime, timedelta
date_str = (datetime.now() - timedelta(days=30)).strftime('%d-%b-%Y')
status, msg_ids = mail.search(None, f'(SINCE {date_str})')
all_ids = msg_ids[0].split()
print(f'宽泛搜索得到: {len(all_ids)} 封')
# 第二步:本地二次过滤
matched = []
for msg_id in all_ids:
status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])')
if status != 'OK':
continue
raw = data[0][1].decode('utf-8', errors='replace')
# 本地发件人匹配(支持中文和部分匹配)
if sender_pattern and sender_pattern.lower() not in raw.lower():
continue
# 本地主题关键词匹配
if subject_keywords:
subject = ''
for line in raw.split('\n'):
if line.startswith('Subject:'):
subject = line
break
if not any(kw in subject for kw in subject_keywords):
continue
matched.append(msg_id)
print(f'本地过滤后: {len(matched)} 封')
return matched
重点总结 :IMAP的搜索功能非常强大,支持日期、发件人、主题、状态等多种条件组合。使用UID而不是序号来引用邮件更稳定。对于中文搜索等服务端支持不完善的情况,应先用宽条件搜索,再在本地进行二次过滤。
四、邮件解析
从IMAP服务器获取到原始邮件数据后,下一步是解析邮件内容。Python 的 email 标准库提供了强大的邮件解析能力,可以处理多部分MIME邮件、各种编码格式和附件。邮件解析是邮件自动化中最具挑战性的环节,因为邮件格式的多样性和编码的复杂性。
邮件结构 :一封邮件通常由邮件头(Header)和邮件体(Body)两部分组成。邮件头包含 From、To、Subject、Date、Message-ID 等字段。邮件体可以是单一文本,也可以是多部分MIME(Multipurpose Internet Mail Extensions)结构。多部分邮件中每个部分都有自己的Content-Type和编码。
解析邮件头 :使用 email.parser 模块将原始字节数据解析为 email.message.Message 对象。邮件头中的编码字段(如 Subject、From)可能使用 =?charset?encoding?text?= 格式编码(即 RFC 2047 编码),需要使用 email.header.decode_header() 进行解码。
解析正文 :对于多部分邮件,需要遍历邮件体的各个部分,根据 Content-Type 判断是纯文本、HTML还是附件。常见的邮件正文有两种格式:text/plain 纯文本和 text/html HTML格式。自动化处理时,优先提取纯文本部分;如果没有纯文本,再从HTML中提取文本。
处理中文乱码 :中文邮件最常见的编码是 gbk、gb2312、utf-8。解析时需要根据邮件头中的 Content-Type 中的 charset 参数进行解码。对于编码名称不规范的情况,可以使用 chardet 库自动检测编码。
# 邮件头解析 - 处理编码字段
import email
from email.header import decode_header
from email.utils import parsedate_to_datetime
def decode_email_header(header_value):
"""解码邮件头中的编码字段"""
if header_value is None:
return ''
decoded_parts = decode_header(header_value)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
if charset:
try:
result.append(part.decode(charset))
except:
result.append(part.decode('utf-8', errors='replace'))
else:
result.append(part.decode('utf-8', errors='replace'))
else:
result.append(part)
return ' '.join(result)
def parse_email_headers(raw_email):
"""解析邮件头信息"""
msg = email.message_from_bytes(raw_email)
headers = {
'from': decode_email_header(msg.get('From')),
'to': decode_email_header(msg.get('To')),
'subject': decode_email_header(msg.get('Subject')),
'date': msg.get('Date'),
'message_id': msg.get('Message-ID'),
'reply_to': decode_email_header(msg.get('Reply-To')),
'cc': decode_email_header(msg.get('Cc')),
}
# 解析日期为datetime对象
try:
headers['datetime'] = parsedate_to_datetime(headers['date'])
except:
headers['datetime'] = None
return headers
# 使用示例
# raw_email = mail.fetch(msg_id, '(RFC822)')[1][0][1]
# parsed = parse_email_headers(raw_email)
# print(f"发件人: {parsed['from']}")
# print(f"主题: {parsed['subject']}")
# 邮件正文解析 - 处理多部分MIME
def get_email_body(msg):
"""提取邮件正文(优先纯文本,其次HTML)"""
body_text = ''
body_html = ''
if msg.is_multipart():
# 多部分邮件,遍历各个部分
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition', ''))
# 跳过附件
if 'attachment' in content_disposition:
continue
# 获取正文内容
if content_type == 'text/plain':
charset = part.get_content_charset() or 'utf-8'
try:
body_text += part.get_payload(decode=True).decode(charset, errors='replace')
except:
body_text += part.get_payload(decode=True).decode('utf-8', errors='replace')
elif content_type == 'text/html':
charset = part.get_content_charset() or 'utf-8'
try:
body_html += part.get_payload(decode=True).decode(charset, errors='replace')
except:
body_html += part.get_payload(decode=True).decode('utf-8', errors='replace')
else:
# 非多部分邮件
content_type = msg.get_content_type()
if content_type == 'text/plain':
charset = msg.get_content_charset() or 'utf-8'
body_text = msg.get_payload(decode=True).decode(charset, errors='replace')
elif content_type == 'text/html':
body_html = msg.get_payload(decode=True).decode(charset, errors='replace')
return body_text or body_html, body_html
# 使用示例
# raw_email = mail.fetch(msg_id, '(RFC822)')[1][0][1]
# msg = email.message_from_bytes(raw_email)
# text, html = get_email_body(msg)
# print(f"纯文本: {text[:200]}...")
# 附件提取 - 完整实现
import os
def extract_attachments(msg, save_dir='attachments'):
"""提取邮件中的所有附件并保存"""
os.makedirs(save_dir, exist_ok=True)
saved_files = []
for part in msg.walk():
content_disposition = str(part.get('Content-Disposition', ''))
# 检查是否是附件
if 'attachment' not in content_disposition:
continue
filename = part.get_filename()
if not filename:
continue
# 解码文件名(可能是编码过的)
filename = decode_email_header(filename)
# 清理文件名,避免路径穿越问题
filename = os.path.basename(filename)
# 获取附件内容
payload = part.get_payload(decode=True)
if payload is None:
continue
# 检查是否已存在同名文件
base, ext = os.path.splitext(filename)
counter = 1
filepath = os.path.join(save_dir, filename)
while os.path.exists(filepath):
filepath = os.path.join(save_dir, f'{base}_{counter}{ext}')
counter += 1
# 保存附件
with open(filepath, 'wb') as f:
f.write(payload)
saved_files.append({
'filename': filename,
'saved_path': filepath,
'size': len(payload),
'content_type': part.get_content_type(),
})
print(f'已保存附件: {filename} ({len(payload)} bytes)')
return saved_files
重点总结 :邮件解析的三步法:先用 email.message_from_bytes 解析原始邮件,再用 decode_header 解码头部字段,然后遍历邮件部分提取正文和附件。处理中文乱码时,注意指定正确的字符编码,多尝试几种编码方案。
五、POP3 收信
尽管 IMAP 是更现代的协议,但 POP3 仍然广泛使用,特别是在一些企业邮箱系统和老旧系统中。Python 的 poplib 模块是对 POP3 协议的标准实现。与 IMAP 相比,POP3 的功能较为简单:连接服务器、认证、列出邮件、下载邮件、删除邮件。本节将系统讲解如何使用 poplib 进行邮件收取。
POP3 的核心流程 :POP3 的工作流程非常直接。首先建立 SSL/TLS 连接到服务器(默认端口 995),然后依次调用 user() 和 pass_() 方法进行认证。认证成功后,使用 stat() 获取邮箱状态(邮件总数和总大小),使用 list() 获取每封邮件的编号和大小,使用 retr() 下载指定编号的邮件,使用 dele() 标记删除邮件。操作完成后调用 quit() 断开连接。
与 IMAP 的主要差异 :POP3 不支持服务器端搜索,所有搜索过滤必须在下载后本地进行。POP3 没有文件夹的概念,所有邮件都在同一个收件箱中。POP3 不支持邮件状态标记(如已读/未读),不支持部分下载。POP3 下载邮件后,服务器可以选择保留或删除邮件,这取决于服务器配置和客户端设置。
POP3 的应用场景 :在自动化办公中,POP3 主要作为辅助方案使用。例如:作为 IMAP 的备份通道,定时通过 POP3 下载所有邮件到本地归档;处理仅支持 POP3 的企业邮箱;一次性批量导出旧邮件。
# POP3 基础操作 - 连接、认证、统计
import poplib
from email import message_from_bytes
def connect_pop3(host='pop.example.com', port=995, user='', password=''):
"""连接POP3服务器并获取邮箱状态"""
try:
# 建立SSL连接
pop = poplib.POP3_SSL(host, port)
pop.set_debuglevel(0) # 设为1可查看详细通信过程
# 认证
pop.user(user)
pop.pass_(password)
# 获取邮箱状态
num_messages, mailbox_size = pop.stat()
print(f'邮箱状态: {num_messages} 封邮件,总大小 {mailbox_size} 字节')
# 获取邮件列表(编号和大小)
response, msg_list, octets = pop.list()
print(f'\n邮件列表 (前10封):')
for i, msg_info in enumerate(msg_list[:10]):
print(f' {msg_info.decode()}')
return pop, num_messages
except poplib.error_proto as e:
print(f'POP3错误: {e}')
return None, 0
# connect_pop3('pop.example.com', 995, 'user@example.com', 'password')
# POP3 下载邮件并解析
def download_all_emails(pop, max_emails=50):
"""下载POP3服务器上的邮件"""
downloaded = []
num_messages = len(pop.list()[1])
# 只下载最近的N封(从编号最大的开始,通常最新)
start = max(1, num_messages - max_emails + 1)
for i in range(start, num_messages + 1):
try:
# 下载邮件
response, lines, octets = pop.retr(i)
# 将行列表合并为字节数据
raw_email = b'\n'.join(lines)
# 解析邮件
msg = message_from_bytes(raw_email)
# 提取关键信息
subject = msg.get('Subject', '(无主题)')
from_addr = msg.get('From', '(未知发件人)')
# 解码主题
from email.header import decode_header
try:
decoded_subject = ''
for part, charset in decode_header(subject):
if isinstance(part, bytes):
decoded_subject += part.decode(charset or 'utf-8', errors='replace')
else:
decoded_subject += part
subject = decoded_subject
except:
pass
downloaded.append({
'id': i,
'subject': subject,
'from': from_addr,
'size': octets,
'raw': raw_email,
})
print(f'[{i}/{num_messages}] {subject[:50]}...')
except poplib.error_proto as e:
print(f'下载邮件 {i} 失败: {e}')
continue
return downloaded
# pop, count = connect_pop3()
# if pop:
# emails = download_all_emails(pop, 10)
# pop.quit()
# POP3 邮件删除与服务器管理
def pop3_cleanup(pop, delete_before_days=30):
"""删除POP3服务器上指定天数前的邮件"""
from datetime import datetime, timedelta, timezone
import email
from email.utils import parsedate_to_datetime
cutoff = datetime.now(timezone.utc) - timedelta(days=delete_before_days)
num_messages = len(pop.list()[1])
deleted_count = 0
for i in range(1, num_messages + 1):
try:
# 只获取邮件头(用TOP命令)
response, lines, octets = pop.top(i, 0)
raw_header = b'\n'.join(lines)
msg = email.message_from_bytes(raw_header)
# 解析日期
date_str = msg.get('Date')
if date_str:
try:
email_date = parsedate_to_datetime(date_str)
if email_date < cutoff:
pop.dele(i)
deleted_count += 1
print(f'已标记删除: [{i}] {msg.get("Subject", "")[:40]}')
except:
pass
except:
continue
print(f'\n共标记删除 {deleted_count} 封邮件')
return deleted_count
# pop, _ = connect_pop3()
# if pop:
# deleted = pop3_cleanup(pop, 30)
# pop.quit()
# print(f'删除操作完成,实际删除将在quit()时生效')
重点总结 :POP3 的使用比 IMAP 简单直接,但功能也相对有限。核心操作为:连接→认证→list→retr→quit。POP3 适合做邮件备份和一次性批量下载,不适合需要精细管理邮件的场景。在自动化系统中,应优先使用 IMAP,将 POP3 作为补充方案。
六、自动分类与规则引擎
当邮箱中的邮件数量越来越多时,自动分类和规则处理就成了必不可少的工具。通过编写邮件分类规则引擎,可以自动将邮件归类、打标签、转发或执行特定操作,大幅提升邮件处理效率。本节将介绍如何构建一个灵活的邮件分类与规则系统。
规则引擎架构 :一个完整的邮件规则引擎通常包含三个核心组件:规则定义 (匹配条件和执行动作)、规则匹配器 (逐一检查规则并执行优先级判断)、动作执行器 (执行分类、标记、转发等具体操作)。规则可以采用配置式(如 JSON/YAML 定义)或编程式(Python 代码定义),配置式更灵活,修改规则无需改代码。
关键词分类 :最常见的分类方式是关键词匹配。通过定义不同类别对应的关键词列表,在邮件主题和正文中搜索匹配。例如,包含"发票"、"账单"、"付款"的邮件归入"财务"类别;包含"简历"、"面试"、"招聘"的归入"人事"类别。关键词匹配支持精确匹配、模糊匹配和正则表达式匹配。
发件人策略 :基于发件人的规则非常实用。白名单(重要联系人直接标记为高优先级)、黑名单(垃圾邮件发送者自动归档或删除)、域名单(同一公司的邮件自动归类)是三种最常见的策略。
# 邮件规则引擎 - 规则定义与匹配
import re
from dataclasses import dataclass, field
from typing import List, Callable, Optional
class MailAction:
"""邮件处理动作"""
MARK_READ = 'mark_read'
MARK_FLAGGED = 'mark_flagged'
MOVE_TO = 'move_to'
COPY_TO = 'copy_to'
DELETE = 'delete'
FORWARD = 'forward'
REPLY = 'auto_reply'
NOTIFY = 'notify'
@dataclass
class MailRule:
"""邮件规则定义"""
name: str
priority: int = 0 # 数字越小优先级越高
conditions: dict = field(default_factory=dict)
actions: List[dict] = field(default_factory=list)
enabled: bool = True
class MailRuleEngine:
"""邮件分类规则引擎"""
def __init__(self):
self.rules: List[MailRule] = []
def add_rule(self, rule: MailRule):
self.rules.append(rule)
# 按优先级排序
self.rules.sort(key=lambda r: r.priority)
def match(self, msg, headers) -> List[dict]:
"""对邮件应用所有规则,返回匹配的动作列表"""
matched_actions = []
for rule in self.rules:
if not rule.enabled:
continue
if self._check_conditions(rule.conditions, msg, headers):
print(f'规则匹配: {rule.name}')
matched_actions.extend(rule.actions)
break # 高优先级规则匹配后停止(可根据需要修改)
return matched_actions
def _check_conditions(self, conditions, msg, headers) -> bool:
"""检查规则条件是否满足"""
if not conditions:
return True
# 关键词匹配(主题)
if 'subject_keywords' in conditions:
subject = headers.get('subject', '')
keywords = conditions['subject_keywords']
if not any(kw.lower() in subject.lower() for kw in keywords):
return False
# 关键词匹配(正文)
if 'body_keywords' in conditions:
body_text, _ = get_email_body(msg)
keywords = conditions['body_keywords']
if not any(kw.lower() in body_text.lower() for kw in keywords):
return False
# 发件人匹配
if 'from_pattern' in conditions:
from_addr = headers.get('from', '')
pattern = conditions['from_pattern']
if not re.search(pattern, from_addr, re.IGNORECASE):
return False
# 附件类型匹配
if 'attachment_types' in conditions:
allowed_types = conditions['attachment_types']
has_matching = False
for part in msg.walk():
if part.get_filename():
if part.get_content_type() in allowed_types:
has_matching = True
break
if not has_matching:
return False
return True
# 规则引擎使用示例
def setup_rules():
"""配置邮件分类规则"""
engine = MailRuleEngine()
# 规则1:财务发票 - 移动到财务文件夹
engine.add_rule(MailRule(
name='财务发票',
priority=1,
conditions={
'subject_keywords': ['发票', '账单', '付款', '财务', '报销'],
},
actions=[
{'type': MailAction.MOVE_TO, 'target': 'Finance'},
{'type': MailAction.MARK_FLAGGED},
]
))
# 规则2:人事招聘
engine.add_rule(MailRule(
name='招聘邮件',
priority=2,
conditions={
'subject_keywords': ['简历', '面试', '招聘', 'offer', '录用'],
'from_pattern': '@company\.com$',
},
actions=[
{'type': MailAction.MOVE_TO, 'target': 'HR/Recruitment'},
{'type': MailAction.NOTIFY, 'target': 'hr-team'},
]
))
# 规则3:项目通知
engine.add_rule(MailRule(
name='项目通知',
priority=3,
conditions={
'from_pattern': 'notify@(github|gitlab|jira|confluence)',
},
actions=[
{'type': MailAction.MOVE_TO, 'target': 'Projects'},
{'type': MailAction.MARK_READ},
]
))
# 规则4:垃圾邮件
engine.add_rule(MailRule(
name='垃圾邮件',
priority=10,
conditions={
'subject_keywords': ['广告', '推广', '优惠', '中奖', '免费领取'],
},
actions=[
{'type': MailAction.MOVE_TO, 'target': 'Spam'},
{'type': MailAction.DELETE},
]
))
return engine
# engine = setup_rules()
# 对每封邮件:engine.match(msg, headers)
# YAML配置规则 - 无需改代码即可调整规则
"""
# rules.yaml 规则配置文件示例
rules:
- name: "财务发票"
priority: 1
conditions:
subject_keywords: ["发票", "账单", "付款"]
body_keywords: ["金额", "账户", "转账"]
actions:
- type: "move_to"
target: "INBOX.Finance"
- type: "mark_flagged"
- name: "客户咨询"
priority: 2
conditions:
from_pattern: "@(client|customer)\.com$"
actions:
- type: "move_to"
target: "INBOX.Clients"
- type: "notify"
target: "sales-team"
"""
import yaml
def load_rules_from_yaml(yaml_path='rules.yaml'):
"""从YAML文件加载规则配置"""
with open(yaml_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
engine = MailRuleEngine()
for rule_config in config['rules']:
rule = MailRule(
name=rule_config['name'],
priority=rule_config.get('priority', 99),
conditions=rule_config['conditions'],
actions=rule_config['actions'],
)
engine.add_rule(rule)
print(f'已加载 {len(config["rules"])} 条规则')
return engine
# engine = load_rules_from_yaml('rules.yaml')
重点总结 :邮件规则引擎的核心是"条件-动作"模式。通过合理设计规则优先级、匹配条件和执行动作,可以实现复杂的邮件自动分类和处理。将规则配置外置于 YAML/JSON 文件中,可以使系统更灵活,修改规则无需重新部署代码。
七、附件自动处理
邮件附件是办公自动化中的重要处理对象。日常工作中常见的附件包括 PDF 文档、Excel 表格、Word 文件、图片、压缩包等。自动保存和管理附件可以节省大量手动操作时间。本节将介绍附件自动处理的完整实现方案。
附件识别与提取 :遍历邮件的 MIME 部分,通过 Content-Disposition 头判断是否为附件。需要特别注意的是,部分邮件将附件以 inline 方式嵌入,这种情况下的文件名获取方式略有不同。附件提取的关键步骤包括:获取附件名(可能经过编码)、解码附件名、获取附件内容、根据附件类型选择保存策略。
附件类型判断 :根据 MIME 类型(Content-Type)和文件扩展名判断附件类别。常见类别包括:文档类(PDF、DOCX、XLSX、PPTX)、图片类(JPG、PNG、GIF)、压缩包(ZIP、RAR、7z)、代码文件(PY、JS、HTML)等。不同类型可以采用不同的处理路径。例如,压缩包需要解压后处理,图片可能需要缩略图生成。
安全注意事项 :处理附件时必须注意安全风险。不要直接执行附件中的可执行文件(.exe、.bat、.vbs 等),不要直接打开附件中的文档(可能包含宏病毒),对压缩包附件先扫描再解压。文件名需要清理,避免路径穿越攻击(如 ../../etc/passwd)。附件大小需要限制,避免磁盘空间耗尽。
# 附件自动保存系统
import os
import mimetypes
from datetime import datetime
class AttachmentSaver:
"""附件自动保存器"""
def __init__(self, base_dir='attachments'):
self.base_dir = base_dir
self.supported_types = {
'document': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'],
'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'],
'archive': ['.zip', '.rar', '.7z', '.tar', '.gz'],
'code': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.sql'],
'data': ['.csv', '.json', '.xml', '.yaml', '.yml'],
}
def classify_attachment(self, filename):
"""根据文件扩展名分类"""
_, ext = os.path.splitext(filename.lower())
for category, extensions in self.supported_types.items():
if ext in extensions:
return category
return 'other'
def get_safe_filename(self, filename):
"""生成安全的文件名"""
# 移除路径分隔符,避免路径穿越
filename = os.path.basename(filename)
# 替换非法字符
illegal_chars = '<>:"/\\|?*'
for char in illegal_chars:
filename = filename.replace(char, '_')
return filename
def save_attachment(self, msg, email_subject=''):
"""保存邮件中的所有附件"""
saved_files = []
email_dir_name = datetime.now().strftime('%Y%m%d_%H%M%S')
for part in msg.walk():
# 检查是否有附件
content_disposition = str(part.get('Content-Disposition', ''))
if 'attachment' not in content_disposition and 'inline' not in content_disposition:
continue
filename = part.get_filename()
if not filename:
continue
# 解码和解安全化文件名
filename = decode_email_header(filename)
filename = self.get_safe_filename(filename)
if not filename:
continue
# 分类附件
category = self.classify_attachment(filename)
# 创建目录: base_dir/分类/邮件日期/
save_dir = os.path.join(self.base_dir, category, email_dir_name)
os.makedirs(save_dir, exist_ok=True)
# 获取附件内容
payload = part.get_payload(decode=True)
if payload is None:
continue
# 处理同名文件
filepath = os.path.join(save_dir, filename)
counter = 1
while os.path.exists(filepath):
name, ext = os.path.splitext(filename)
filepath = os.path.join(save_dir, f'{name}_{counter}{ext}')
counter += 1
# 保存
with open(filepath, 'wb') as f:
f.write(payload)
file_info = {
'original_name': part.get_filename(),
'saved_name': os.path.basename(filepath),
'path': filepath,
'size': len(payload),
'type': part.get_content_type(),
'category': category,
}
saved_files.append(file_info)
print(f'[{category}] 已保存: {os.path.basename(filepath)} ({len(payload)} bytes)')
return saved_files
# 压缩包附件自动处理
import zipfile
import tarfile
import io
def process_archive_attachment(payload, filename, extract_dir='extracted'):
"""处理压缩包附件(解压并提取内容)"""
_, ext = os.path.splitext(filename.lower())
os.makedirs(extract_dir, exist_ok=True)
extracted_files = []
try:
if ext == '.zip':
with zipfile.ZipFile(io.BytesIO(payload)) as zf:
# 安全解压:防止路径穿越
for member in zf.namelist():
# 跳过目录和路径穿越
if member.endswith('/') or '..' in member:
continue
safe_name = os.path.basename(member)
if not safe_name:
continue
target_path = os.path.join(extract_dir, safe_name)
with open(target_path, 'wb') as f:
f.write(zf.read(member))
extracted_files.append({
'source': member,
'saved': target_path,
'size': os.path.getsize(target_path),
})
print(f' 解压: {member} -> {safe_name}')
elif ext in ('.tar', '.gz', '.bz2'):
with tarfile.open(fileobj=io.BytesIO(payload), mode='r:*') as tf:
for member in tf.getmembers():
if member.isfile() and '..' not in member.name:
safe_name = os.path.basename(member.name)
if not safe_name:
continue
target_path = os.path.join(extract_dir, safe_name)
with open(target_path, 'wb') as f:
f.write(tf.extractfile(member).read())
extracted_files.append({
'source': member.name,
'saved': target_path,
'size': os.path.getsize(target_path),
})
print(f' 解压: {member.name} -> {safe_name}')
except Exception as e:
print(f'解压失败 {filename}: {e}')
return extracted_files
# 附件内容提取(将附件文本内容提取到数据库或索引)
def extract_attachment_text(payload, content_type):
"""提取附件中的文本内容"""
import PyPDF2 # 假设已安装
if content_type == 'application/pdf':
try:
pdf_file = io.BytesIO(payload)
reader = PyPDF2.PdfReader(pdf_file)
text = ''
for page in reader.pages:
text += page.extract_text()
return text
except Exception as e:
return f'[PDF提取失败: {e}]'
elif content_type == 'text/plain':
return payload.decode('utf-8', errors='replace')
elif content_type == 'text/csv':
return payload.decode('utf-8', errors='replace')
else:
# 尝试作为纯文本解码
try:
return payload.decode('utf-8', errors='replace')
except:
return '[不支持直接提取文本的附件类型]'
# 附件处理综合流程
def process_all_attachments(msg, save_base_dir):
"""附件处理综合流程:保存 + 解压 + 索引"""
# 1. 保存附件
saver = AttachmentSaver(save_base_dir)
saved = saver.save_attachment(msg)
for file_info in saved:
# 2. 如果是压缩包,解压
if file_info['category'] == 'archive':
with open(file_info['path'], 'rb') as f:
payload = f.read()
extracted = process_archive_attachment(
payload,
os.path.basename(file_info['path']),
file_info['path'] + '_extracted'
)
# 3. 如果是文档,提取文本
if file_info['type'] in ('application/pdf', 'text/plain', 'text/csv'):
with open(file_info['path'], 'rb') as f:
payload = f.read()
text_content = extract_attachment_text(payload, file_info['type'])
# 可存入数据库或全文搜索引擎
return saved
重点总结 :附件自动化处理的核心包括识别提取、分类存储、压缩解压、内容提取四个环节。安全处理是重中之重——必须防范路径穿越攻击、恶意文件类型和超大附件。根据附件类型采用不同的处理策略,可以提高自动化处理的效率和准确性。
八、未读邮件监控
在许多自动化场景中,需要实时监控邮箱中的新邮件,并对新邮件做出及时响应。例如,自动处理客户咨询邮件、监控系统告警邮件、自动提取验证码等。一个健壮的邮件监控系统需要具备定时检查、高效通知、异常恢复和长期稳定运行的能力。
监控循环架构 :邮件监控的核心是一个无限循环,每次循环执行以下操作:连接邮箱、搜索未读邮件、处理每封新邮件、标记邮件为已读、等待一段时间后进入下一轮循环。循环间隔需要合理设置——太频繁会增加服务器负担(甚至被限流),太稀疏会导致响应不及时。一般建议设置为30-60秒。
IDLE 命令优化 :部分 IMAP 服务器支持 IDLE 命令,这是一种推送机制。客户端发送 IDLE 命令后,服务器在有新邮件到达时主动通知客户端,而不是客户端反复轮询。IDLE 模式可以大大降低网络流量和服务端负载,实现接近实时的通知。IMAPlib 中通过 idle() 和 idle_done() 方法实现。
异常处理与重连 :网络连接在长时间运行中不可避免地会遇到中断。监控系统必须具备自动重连机制,包括:捕获网络异常(超时、连接重置等)、指数退避重连策略、记录错误日志、自动恢复后继续正常监控。
# 未读邮件监控器 - 基础循环版
import time
import logging
from datetime import datetime
class UnreadMailMonitor:
"""未读邮件监控器"""
def __init__(self, imap_client, poll_interval=30):
self.client = imap_client
self.poll_interval = poll_interval
self.running = False
self.processed_ids = set()
logging.basicConfig(level=logging.INFO)
def start(self):
"""启动监控循环"""
self.running = True
logging.info('邮件监控器已启动')
while self.running:
try:
self._check_once()
except Exception as e:
logging.error(f'监控异常: {e}')
self._reconnect()
time.sleep(self.poll_interval)
def stop(self):
"""停止监控"""
self.running = False
logging.info('邮件监控器已停止')
def _check_once(self):
"""单次检查"""
with self.client as mail:
mail.select_mailbox('INBOX')
status, ids = mail.search('UNSEEN')
if status != 'OK' or not ids[0]:
return
unseen_ids = ids[0].split()
new_ids = [mid for mid in unseen_ids if mid not in self.processed_ids]
for msg_id in new_ids:
self._process_new_message(mail, msg_id)
self.processed_ids.add(msg_id)
logging.info(f'检查完成: {len(new_ids)} 封新邮件')
def _process_new_message(self, mail, msg_id):
"""处理单封新邮件(子类可重写)"""
status, data = mail.fetch(msg_id, '(RFC822)')
if status != 'OK':
return
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
subject = decode_email_header(msg.get('Subject', ''))
logging.info(f'新邮件: {subject}')
print(f'\n[新邮件] {datetime.now().strftime("%H:%M:%S")}')
print(f' 主题: {subject}')
print(f' 发件人: {decode_email_header(msg.get("From", ""))}')
# 标记为已读
mail.connection.store(msg_id, '+FLAGS', '\\Seen')
def _reconnect(self):
"""重连机制"""
for attempt in range(1, 6):
wait = min(2 ** attempt, 60)
logging.info(f'尝试重连 (第{attempt}次, {wait}秒后)...')
time.sleep(wait)
try:
self.client = IMAPClient(self.client.host, self.client.port,
self.client.user, self.client.password)
logging.info('重连成功')
return
except Exception as e:
logging.error(f'重连失败: {e}')
logging.critical('多次重连失败,停止监控')
self.running = False
# monitor = UnreadMailMonitor(IMAPClient('imap.example.com', 993, 'user', 'pass'))
# monitor.start()
# IDLE模式监控 - 实时推送
def monitor_with_idle(host, port, user, password):
"""使用IDLE模式实现实时邮件监控"""
mail = imaplib.IMAP4_SSL(host, port)
mail.login(user, password)
mail.select('INBOX')
print('IDLE监控已启动,等待新邮件...')
while True:
try:
# 发送IDLE命令
mail.send(b'IDLE\r\n')
# 持续读取服务器响应(超时29分钟,避免自动断开)
while True:
response = mail.readline()
if not response:
break
# 服务器通知有新邮件
if b'EXISTS' in response:
print(f'\n[{datetime.now().strftime("%H:%M:%S")}] 检测到新邮件!')
# 退出IDLE模式处理邮件
mail.send(b'DONE\r\n')
mail.readline() # 读取IDLE终止响应
# 搜索未读邮件
status, ids = mail.search(None, 'UNSEEN')
if status == 'OK':
for msg_id in ids[0].split()[-3:]: # 最多处理3封
status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])')
raw = data[0][1].decode('utf-8', errors='replace')
print(f'新邮件头:\n{raw[:300]}')
# 重新进入IDLE模式
mail.send(b'IDLE\r\n')
except (ConnectionError, TimeoutError) as e:
print(f'连接断开: {e}')
time.sleep(5)
# 重连... (省略实际重连代码)
break
except imaplib.IMAP4.error as e:
print(f'IMAP错误: {e}')
time.sleep(5)
continue
mail.logout()
# 邮件监控回调系统
from typing import Callable, List
from dataclasses import dataclass
@dataclass
class MailEvent:
"""邮件事件"""
msg_id: str
subject: str
sender: str
received_at: datetime
msg_object: object
class EventDrivenMonitor:
"""事件驱动的邮件监控器"""
def __init__(self, host, port, user, password):
self.config = {'host': host, 'port': port, 'user': user, 'password': password}
self.handlers: List[Callable[[MailEvent], None]] = []
def on_new_email(self, handler: Callable):
"""注册新邮件处理器"""
self.handlers.append(handler)
return handler # 支持装饰器
def _dispatch(self, event: MailEvent):
"""将事件分发给所有处理器"""
for handler in self.handlers:
try:
handler(event)
except Exception as e:
logging.error(f'处理器 {handler.__name__} 执行失败: {e}')
def run(self):
"""运行监控器"""
mail = imaplib.IMAP4_SSL(self.config['host'], self.config['port'])
mail.login(self.config['user'], self.config['password'])
mail.select('INBOX')
last_check = time.time()
while True:
try:
status, ids = mail.search(None, f'UNSEEN SINCE {datetime.fromtimestamp(last_check).strftime("%d-%b-%Y")}')
if status == 'OK' and ids[0]:
for msg_id in ids[0].split():
status, data = mail.fetch(msg_id, '(RFC822)')
if status != 'OK':
continue
msg = email.message_from_bytes(data[0][1])
event = MailEvent(
msg_id=msg_id.decode(),
subject=decode_email_header(msg.get('Subject', '')),
sender=decode_email_header(msg.get('From', '')),
received_at=datetime.now(),
msg_object=msg,
)
self._dispatch(event)
mail.store(msg_id, '+FLAGS', '\\Seen')
last_check = time.time()
time.sleep(30)
except Exception as e:
logging.error(f'监控异常: {e}')
time.sleep(5)
# 使用示例
# monitor = EventDrivenMonitor('imap.example.com', 993, 'user', 'pass')
#
# @monitor.on_new_email
# def handle_alert(event):
# if '告警' in event.subject:
# print(f'收到告警: {event.subject}')
# # 发送企业微信通知
# # send_wechat_notification(event.subject)
#
# @monitor.on_new_email
# def handle_order(event):
# if '订单' in event.subject:
# print(f'收到订单: {event.subject}')
# # 自动处理订单
# # process_order(event.msg_object)
#
# monitor.run()
重点总结 :邮件监控系统的三个关键设计:合理的轮询间隔(30-60秒)、健壮的重连机制(指数退避)、灵活的事件处理架构(回调函数)。IDLE 模式适合对实时性要求高的场景。生产环境中还应考虑连接池管理、限流策略和监控告警本身的可观测性。
九、实战案例
掌握了邮件自动收取的核心技术后,让我们一起看三个典型的实战案例。这些案例融合了前面的知识点,展示了邮件自动化在实际工作中的典型应用场景。
案例一:自动提取验证码
许多网站和服务的验证码通过邮件发送。自动提取验证码可以用于自动化测试、账号注册等场景。核心思路是:搜索最近来自特定发件人的邮件,从邮件正文或主题中提取验证码,支持多种验证码格式。需要注意验证码邮件的时效性,通常5分钟内有效。
# 自动提取验证码
import re
def extract_verification_code(mail, sender='noreply@example.com', max_age_minutes=5):
"""从邮件中提取验证码"""
from datetime import datetime, timedelta
mail.select('INBOX')
since_date = (datetime.now() - timedelta(minutes=max_age_minutes)).strftime('%d-%b-%Y')
# 搜索近期来自指定发件人的未读邮件
criteria = f'(UNSEEN FROM "{sender}" SINCE {since_date})'
status, ids = mail.search(None, criteria)
if status != 'OK' or not ids[0]:
print('未找到验证码邮件')
return None
codes = []
code_patterns = [
(r'验证码[::]\s*(\d{4,8})', '4-8位数字验证码'),
(r'code[::]\s*([A-Z0-9]{4,8})', '字母数字验证码'),
(r'(\d{6})', '6位数字(宽松)'),
]
for msg_id in ids[0].split():
status, data = mail.fetch(msg_id, '(RFC822)')
if status != 'OK':
continue
msg = email.message_from_bytes(data[0][1])
body_text, _ = get_email_body(msg)
# 尝试所有验证码模式
for pattern, desc in code_patterns:
match = re.search(pattern, body_text)
if match:
code = match.group(1)
codes.append({'code': code, 'source': desc, 'email_id': msg_id})
print(f'提取到验证码: {code} ({desc})')
break # 一封邮件只取一个验证码
# 标记为已读
mail.store(msg_id, '+FLAGS', '\\Seen')
return codes[0] if codes else None
# code = extract_verification_code(mail, 'verify@example.com')
# if code:
# print(f'验证码: {code["code"]}')
案例二:邮件订单自动处理
电商平台每天会收到大量订单邮件,手动处理效率极低。通过邮件自动化系统,可以自动解析订单信息、提取关键字段、写入数据库、发送确认回复,实现订单处理的半自动化或全自动化。
# 订单邮件自动处理
import json
import sqlite3
def process_order_email(msg):
"""解析订单邮件并写入数据库"""
subject = decode_email_header(msg.get('Subject', ''))
body_text, body_html = get_email_body(msg)
order_data = {}
# 1. 提取订单号
order_match = re.search(r'订单号[::]\s*(\w+)', body_text)
if order_match:
order_data['order_id'] = order_match.group(1)
# 2. 提取金额
amount_match = re.search(r'金额[::]\s*[¥¥]?(\d+\.?\d*)', body_text)
if amount_match:
order_data['amount'] = float(amount_match.group(1))
# 3. 提取商品信息
product_match = re.search(r'商品[::]\s*(.+)', body_text)
if product_match:
order_data['product'] = product_match.group(1).strip()
# 4. 提取客户信息
customer_match = re.search(r'客户[::]\s*(.+)', body_text)
if customer_match:
order_data['customer'] = customer_match.group(1).strip()
# 5. 提取发件人
order_data['email'] = msg.get('From', '')
if order_data.get('order_id'):
# 写入数据库
conn = sqlite3.connect('orders.db')
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS orders (
order_id TEXT PRIMARY KEY,
amount REAL,
product TEXT,
customer TEXT,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
c.execute('''
INSERT OR REPLACE INTO orders (order_id, amount, product, customer, email)
VALUES (?, ?, ?, ?, ?)
''', (
order_data['order_id'],
order_data.get('amount', 0),
order_data.get('product', ''),
order_data.get('customer', ''),
order_data.get('email', ''),
))
conn.commit()
conn.close()
print(f'订单 {order_data["order_id"]} 已写入数据库')
return order_data
print('未能提取到有效订单信息')
return None
# 批量处理订单邮件
def batch_process_orders(mail, sender_pattern='order@'):
"""批量处理来自特定发件人的未处理订单"""
mail.select('INBOX')
status, ids = mail.search(None, f'UNSEEN FROM "{sender_pattern}"')
if status != 'OK' or not ids[0]:
print('没有未处理的订单邮件')
return []
processed = []
for msg_id in ids[0].split():
status, data = mail.fetch(msg_id, '(RFC822)')
if status != 'OK':
continue
msg = email.message_from_bytes(data[0][1])
result = process_order_email(msg)
if result:
# 标记已处理并移动到归档文件夹
mail.copy(msg_id, 'Processed')
mail.store(msg_id, '+FLAGS', '\\Deleted')
processed.append(result)
# 永久删除原位置的已处理邮件
mail.expunge()
return processed
# orders = batch_process_orders(mail, 'orders@shop.com')
# print(f'成功处理 {len(orders)} 个订单')
案例三:邮件附件备份系统
企业的重要文件经常通过邮件附件传递。构建一个自动备份系统,可以确保所有附件被及时保存、分类归档,并建立索引便于检索。这个系统可以作为一个定期运行的定时任务(如每天一次)。
# 邮件附件备份系统
import hashlib
import os
from datetime import datetime, timedelta
class EmailBackupSystem:
"""邮件附件备份系统"""
def __init__(self, backup_root='email_backup', retention_days=365):
self.backup_root = backup_root
self.retention_days = retention_days
self.db_path = os.path.join(backup_root, 'backup_index.db')
def run_backup(self, mail, search_days=1):
"""执行一次备份"""
mail.select('INBOX')
# 搜索指定时间范围内的所有邮件
since_date = (datetime.now() - timedelta(days=search_days)).strftime('%d-%b-%Y')
status, ids = mail.search(None, f'(SINCE {since_date})')
if status != 'OK' or not ids[0]:
print('没有需要备份的邮件')
return []
backup_records = []
count = 0
for msg_id in ids[0].split():
status, data = mail.fetch(msg_id, '(RFC822)')
if status != 'OK':
continue
msg = email.message_from_bytes(data[0][1])
email_date = msg.get('Date', datetime.now().isoformat())
email_subject = decode_email_header(msg.get('Subject', '(无主题)'))
# 保存附件
saver = AttachmentSaver(os.path.join(self.backup_root, 'attachments'))
saved_files = saver.save_attachment(msg, email_subject)
for file_info in saved_files:
# 计算文件哈希(去重)
with open(file_info['path'], 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()
record = {
'msg_id': msg_id.decode(),
'subject': email_subject,
'date': email_date,
'filename': file_info['saved_name'],
'path': file_info['path'],
'size': file_info['size'],
'category': file_info['category'],
'hash': file_hash,
}
backup_records.append(record)
count += 1
print(f'[{count}] 已备份: {file_info["saved_name"]} '
f'({file_info["category"]}, {file_info["size"]} bytes)')
self._save_index(backup_records)
return backup_records
def _save_index(self, records):
"""保存备份索引"""
import json
index_path = os.path.join(self.backup_root, 'backup_index.json')
existing = []
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
existing = json.load(f)
existing.extend(records)
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(existing, f, ensure_ascii=False, indent=2)
print(f'\n索引已更新: {len(existing)} 条记录')
def find_by_filename(self, keyword):
"""按文件名搜索备份"""
import json
index_path = os.path.join(self.backup_root, 'backup_index.json')
if not os.path.exists(index_path):
return []
with open(index_path, 'r', encoding='utf-8') as f:
records = json.load(f)
results = [r for r in records if keyword.lower() in r['filename'].lower()]
return results
def cleanup_old_backups(self):
"""清理过期备份"""
import shutil
cutoff = datetime.now() - timedelta(days=self.retention_days)
# 实现按日期清理...
pass
# 使用示例(设置每天运行一次)
# backup = EmailBackupSystem('\\\\nas\\backups\\email', retention_days=365)
# with IMAPClient('imap.company.com', 993, 'user', 'pass') as mail:
# records = backup.run_backup(mail, search_days=1)
# print(f'本次备份了 {len(records)} 个附件')
重点总结 :三个实战案例覆盖了邮件自动化的典型场景:验证码提取(实时响应)、订单处理(数据解析与存储)、附件备份(文件管理与索引)。每个案例都融合了IMAP操作、邮件解析、规则处理和本地持久化等技术要点。实际应用中,可以结合定时任务框架(如APScheduler、Celery)和消息队列(如RabbitMQ)构建更健壮的邮件自动化系统。