IMAP/POP3:邮件自动收取与处理

Python 办公自动化专题 · 用Python自动管理收件箱

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, IMAP, POP3, 邮件收取, imaplib, 邮件解析, 邮件自动化, Python

一、IMAP vs POP3 — 协议原理对比

邮件自动收取的第一步是选对协议。IMAP(Internet Message Access Protocol)和 POP3(Post Office Protocol 3)是两种最主流的邮件接收协议,它们在设计理念、功能特性和适用场景上存在本质差异。理解二者的区别,是构建邮件自动化系统的基础。

协议原理对比:POP3 是一种"离线"协议,其设计初衷是在客户端和服务器之间建立临时连接,将邮件从服务器下载到本地后,服务器端的邮件通常会被删除。这种方式在早期拨号上网时代非常流行——用户拨号连接、收取邮件、断开连接、在本地阅读。IMAP 则是一种"在线"协议,邮件始终保留在服务器上,客户端对邮件的操作(阅读、移动、删除、标记)会同步回服务器。IMAP 更像是一种远程文件系统协议,客户端可以只下载邮件的部分内容(如先下载邮件头,需要时再下载全文)。

核心差异对照表

特性POP3IMAP
工作模式离线模式在线/离线均可
邮件存储下载到本地,服务器可选删除始终保留在服务器
多设备同步不支持天然支持
文件夹管理仅收件箱支持自定义文件夹
部分下载不支持支持(先下载头信息)
服务器搜索不支持支持服务端搜索
端口号(SSL)995993
适用场景单设备、离线阅读多设备、在线办公

适用场景分析:IMAP 更适用于自动化办公场景,原因有三:第一,多个自动化任务可以在不同设备上协同操作同一个邮箱;第二,可以通过服务器端搜索快速定位邮件,避免大量数据下载;第三,可以精细控制要同步的文件夹和邮件状态。POP3 更适合个人单设备使用的简单场景或作为备份方案(在 IMAP 之外再通过 POP3 做一次本地备份)。

Python 中的选择策略:Python 标准库同时提供了 imaplibpoplib 两个模块。在开发邮件自动化系统时,建议优先选择 IMAP 协议。如果需要支持遗留系统提供 POP3 接口,也需要了解 poplib 的基本用法。

# 检测邮箱支持的协议能力 import imaplib def check_mailbox_capabilities(host, port=993): """检测IMAP服务器支持的功能""" mail = imaplib.IMAP4_SSL(host, port) mail.login('user@example.com', 'password') status, capabilities = mail.capability() if status == 'OK': print('服务器支持的功能:', capabilities) if b'IDLE' in capabilities[0]: print('支持IDLE推送通知') if b'QUOTA' in capabilities[0]: print('支持配额查询') if b'CHILDREN' in capabilities[0]: print('支持子文件夹') mail.logout() # check_mailbox_capabilities('imap.example.com')
# IMAP 与 POP3 连接对比演示 import imaplib import poplib # IMAP连接 - 在线模式 imap = imaplib.IMAP4_SSL('imap.gmail.com') imap.login('user@gmail.com', 'password') # 列出所有文件夹 status, folders = imap.list() for folder in folders: print('IMAP文件夹:', folder.decode()) imap.logout() print('---') # POP3连接 - 离线模式 pop = poplib.POP3_SSL('pop.gmail.com') pop.user('user@gmail.com') pop.pass_('password') # POP3只有收件箱 num_messages = len(pop.list()[1]) print(f'POP3收件箱邮件数: {num_messages}') pop.quit()
# 协议选择器工厂函数 from enum import Enum import imaplib import poplib class MailProtocol(Enum): IMAP = 'imap' POP3 = 'pop3' PROTOCOL_CONFIG = { MailProtocol.IMAP: { 'host': 'imap.example.com', 'port': 993, 'lib': imaplib.IMAP4_SSL, 'supports_folders': True, 'supports_search': True, }, MailProtocol.POP3: { 'host': 'pop.example.com', 'port': 995, 'lib': poplib.POP3_SSL, 'supports_folders': False, 'supports_search': False, }, } def get_mail_connection(protocol: MailProtocol, user: str, password: str): """根据协议类型获取连接""" config = PROTOCOL_CONFIG[protocol] if protocol == MailProtocol.IMAP: conn = config['lib'](config['host'], config['port']) conn.login(user, password) return conn else: conn = config['lib'](config['host'], config['port']) conn.user(user) conn.pass_(password) return conn

重点总结:IMAP 是邮件自动化的首选协议,它支持多设备同步、服务器端搜索、文件夹管理,非常适合需要程序化操作邮件的场景。POP3 适用于简单备份或单向下载场景。在实际项目中选择时,优先考虑 IMAP,只有在目标邮箱仅支持 POP3 时才退而求其次。

二、imaplib 核心用法

Python 标准库中的 imaplib 模块提供了 IMAP4 协议的完整实现,是构建邮件自动化系统的核心工具。它支持 IMAP4rev1 标准中的所有关键操作,包括认证、邮箱选择、邮件搜索、邮件获取、状态管理等。本节详细讲解 imaplib 的核心用法。

连接与认证:连接 IMAP 服务器时,首推使用 SSL/TLS 加密连接。绝大多数现代邮件服务商(如 Gmail、Outlook、QQ邮箱、163邮箱)都要求使用 SSL 连接。连接成功后,需要调用 login() 方法进行认证。对于开启了两步验证的账号,通常需要使用"应用专用密码"代替登录密码。

邮箱选择与操作:认证通过后,使用 select() 方法选择一个邮箱(默认是 INBOX)。select() 方法返回邮箱的状态信息,包括邮件总数、最近邮件数等。IMAP 支持多个邮箱文件夹,可以通过 list() 方法列出所有邮箱,使用 create() 方法创建新邮箱。

搜索与获取search() 方法用于在服务器端搜索邮件,返回满足条件的邮件序号列表。fetch() 方法根据序号获取邮件的指定部分,可以只获取邮件头、指定 MIME 部分或完整邮件。通过设置不同的 fetch 参数,可以精细控制要下载的数据量。

常用操作演示

# imaplib 基础连接与邮箱操作 import imaplib def connect_imap(host, port, user, password): """连接IMAP服务器并选择收件箱""" try: # 建立SSL连接 mail = imaplib.IMAP4_SSL(host, port) mail.login(user, password) print(f'成功连接到 {host}') # 选择收件箱 status, messages = mail.select('INBOX') print(f'收件箱状态: {status}') print(f'邮件总数: {messages[0].decode()}') # 列出所有邮箱文件夹 status, folders = mail.list() print('\n所有邮箱文件夹:') for folder in folders: print(f' - {folder.decode()}') return mail except imaplib.IMAP4.error as e: print(f'IMAP连接失败: {e}') return None # 使用示例(请替换为实际凭据) # mail = connect_imap('imap.example.com', 993, 'user@example.com', 'password')
# 搜索并获取邮件 - 详细演示 def search_and_fetch(mail, search_criteria='ALL'): """按条件搜索并获取邮件头信息""" # 搜索邮件 status, message_ids = mail.search(None, search_criteria) if status != 'OK': print('搜索失败') return ids = message_ids[0].split() print(f'找到 {len(ids)} 封符合条件邮件') # 只处理前5封 for msg_id in ids[:5]: # 获取邮件头(RFC822格式的头信息) status, msg_data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])') if status != 'OK': continue # 解析邮件头 raw_headers = msg_data[0][1] print(f'\n--- 邮件 ID: {msg_id.decode()} ---') print(raw_headers.decode('utf-8', errors='replace')[:300]) return ids # 搜索今日邮件示例 # search_and_fetch(mail, '(SINCE "01-May-2026")')
# 获取邮件的不同部分 def fetch_email_parts(mail, msg_id): """获取邮件的不同组成部分""" # 1. 只获取邮件头 status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])') headers = data[0][1] print('邮件头已获取') # 2. 获取邮件正文(仅文本部分) status, data = mail.fetch(msg_id, '(BODY.PEEK[TEXT])') body = data[0][1] print(f'正文大小: {len(body)} 字节') # 3. 获取邮件完整结构(不下载内容) status, data = mail.fetch(msg_id, '(BODYSTRUCTURE)') structure = data[0][1] print(f'邮件结构: {structure.decode()[:200]}...') # 4. 获取完整邮件 status, data = mail.fetch(msg_id, '(RFC822)') full_email = data[0][1] print(f'完整邮件大小: {len(full_email)} 字节') # 5. 获取邮件标志(已读/未读等) status, data = mail.fetch(msg_id, '(FLAGS)') flags = data[0] print(f'邮件标志: {flags}') return headers
# IMAP连接管理最佳实践 class IMAPClient: """IMAP客户端封装类""" def __init__(self, host, port=993, user=None, password=None): self.host = host self.port = port self.user = user self.password = password self.connection = None def __enter__(self): self.connection = imaplib.IMAP4_SSL(self.host, self.port) self.connection.login(self.user, self.password) return self def __exit__(self, exc_type, exc_val, exc_tb): if self.connection: try: self.connection.close() self.connection.logout() except: pass def select_mailbox(self, mailbox='INBOX'): return self.connection.select(mailbox) def search(self, criteria='ALL'): return self.connection.search(None, criteria) def fetch(self, msg_id, parts='(RFC822)'): return self.connection.fetch(msg_id, parts) # 使用上下文管理器自动管理连接 # with IMAPClient('imap.example.com', 993, 'user', 'pass') as client: # client.select_mailbox() # status, ids = client.search('UNSEEN') # print(f'未读邮件: {len(ids[0].split())}')

重点总结:imaplib 的核心操作流程为:连接→登录→选择邮箱→搜索→获取→操作→关闭。掌握 search()fetch() 的灵活组合是构建邮件自动化系统的关键。使用上下文管理器(with 语句)可以确保连接被正确关闭。

三、邮件搜索筛选

IMAP 的一个重要优势是支持服务器端搜索。这意味着可以在不下载邮件内容的情况下,通过搜索条件精准定位目标邮件,大大减少了网络传输量。imaplib 的 search() 方法支持丰富的搜索条件,让邮件筛选变得高效灵活。

按日期搜索:日期搜索是邮件自动化中最常用的功能之一。IMAP 支持 SINCEBEFOREON 等日期条件。日期格式必须为 DD-Mon-YYYY(如 01-May-2026)。需要注意的是,SINCE 条件是包含指定日期在内的,即搜索指定日期及之后收到的邮件。

按发件人/主题搜索:使用 FROM 条件按发件人搜索,使用 SUBJECT 条件按主题搜索。这两个条件都支持子字符串匹配,可以结合通配符使用。对于中文内容,需要注意编码问题,部分服务器可能不支持直接搜索中文。解决方法是先通过其他条件缩小范围,再在本地进行二次过滤。

按邮件状态搜索:IMAP 定义了多种邮件状态标志,包括 SEEN(已读)、UNSEEN(未读)、FLAGGED(已标记)、ANSWERED(已回复)、DELETED(已删除)等。搜索时使用 NOT 关键字可以取反,例如 NOT SEEN 等价于 UNSEEN

# 日期范围搜索 - 查找特定时间段内的邮件 import imaplib from datetime import datetime, timedelta def search_by_date_range(mail, days_back=7): """搜索最近N天内的邮件""" today = datetime.now() since_date = today - timedelta(days=days_back) # 格式化为 IMAP 日期格式: DD-Mon-YYYY since_str = since_date.strftime('%d-%b-%Y') before_str = (today + timedelta(days=1)).strftime('%d-%b-%Y') # 搜索日期范围内的邮件 criteria = f'(SINCE {since_str} BEFORE {before_str})' status, msg_ids = mail.search(None, criteria) if status == 'OK': ids = msg_ids[0].split() print(f'最近 {days_back} 天内有 {len(ids)} 封邮件') # 统计每天的邮件数 for msg_id in ids[-10:]: # 最近10封 status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER.FIELDS (DATE SUBJECT)])') if status == 'OK': print(data[0][1].decode('utf-8', errors='replace')) return msg_ids # search_by_date_range(mail, 7)
# 复合搜索条件组合 def advanced_search(mail): """高级复合搜索演示""" # 1. 来自特定发件人的未读邮件 criteria = '(UNSEEN FROM "notifications@example.com")' status, ids = mail.search(None, criteria) print(f'来自通知服务的未读邮件: {len(ids[0].split())}封') # 2. 特定主题且已标记的邮件 criteria = '(FLAGGED SUBJECT "紧急")' status, ids = mail.search(None, criteria) print(f'标记为紧急的邮件: {len(ids[0].split())}封') # 3. 7天内包含附件的邮件 from datetime import datetime, timedelta date_str = (datetime.now() - timedelta(days=7)).strftime('%d-%b-%Y') criteria = f'(SINCE {date_str} LARGER 50000)' # 大于50KB通常有附件 status, ids = mail.search(None, criteria) print(f'7天内超过50KB的邮件: {len(ids[0].split())}封') # 4. 未读且主题包含关键词但不包含垃圾词 criteria = '(UNSEEN SUBJECT "订单" NOT FROM "spam@example.com")' status, ids = mail.search(None, criteria) print(f'订单相关非垃圾未读: {len(ids[0].split())}封') # 5. 多条件或关系(IMAP不直接支持OR) # 使用OR关键字:OR条件1条件2 criteria = '(OR SUBJECT "发票" SUBJECT "账单")' status, ids = mail.search(None, criteria) print(f'发票或账单相关: {len(ids[0].split())}封') return ids
# UID 模式搜索 - 更稳定的引用方式 def uid_search_example(mail): """使用UID模式搜索和操作邮件""" # 先进入收件箱 mail.select('INBOX') # UID搜索(比序号更稳定,邮件在服务器上的唯一标识) status, uid_data = mail.uid('search', None, 'ALL') uids = uid_data[0].split() print(f'总邮件数: {len(uids)}') print(f'最新5封UID: {uids[-5:]}') if uids: # 使用UID获取邮件(不会改变邮件的\Seen状态) latest_uid = uids[-1] status, data = mail.uid('fetch', latest_uid, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])') if status == 'OK': raw = data[0][1] print(f'\n最新邮件头:\n{raw.decode("utf-8", errors="replace")}') # 将搜索条件重新应用于搜索 from datetime import datetime, timedelta date_str = (datetime.now() - timedelta(days=30)).strftime('%d-%b-%Y') status, uid_data = mail.uid('search', None, f'(SINCE {date_str})') recent_uids = uid_data[0].split() print(f'最近30天内邮件数(UID): {len(recent_uids)}') return uids
# 本地二次过滤 - 弥补服务端搜索的不足 def search_with_local_filter(mail, sender_pattern=None, subject_keywords=None): """服务端搜索 + 本地二次过滤""" mail.select('INBOX') # 第一步:服务端宽泛搜索(最近30天) from datetime import datetime, timedelta date_str = (datetime.now() - timedelta(days=30)).strftime('%d-%b-%Y') status, msg_ids = mail.search(None, f'(SINCE {date_str})') all_ids = msg_ids[0].split() print(f'宽泛搜索得到: {len(all_ids)} 封') # 第二步:本地二次过滤 matched = [] for msg_id in all_ids: status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])') if status != 'OK': continue raw = data[0][1].decode('utf-8', errors='replace') # 本地发件人匹配(支持中文和部分匹配) if sender_pattern and sender_pattern.lower() not in raw.lower(): continue # 本地主题关键词匹配 if subject_keywords: subject = '' for line in raw.split('\n'): if line.startswith('Subject:'): subject = line break if not any(kw in subject for kw in subject_keywords): continue matched.append(msg_id) print(f'本地过滤后: {len(matched)} 封') return matched

重点总结:IMAP的搜索功能非常强大,支持日期、发件人、主题、状态等多种条件组合。使用UID而不是序号来引用邮件更稳定。对于中文搜索等服务端支持不完善的情况,应先用宽条件搜索,再在本地进行二次过滤。

四、邮件解析

从IMAP服务器获取到原始邮件数据后,下一步是解析邮件内容。Python 的 email 标准库提供了强大的邮件解析能力,可以处理多部分MIME邮件、各种编码格式和附件。邮件解析是邮件自动化中最具挑战性的环节,因为邮件格式的多样性和编码的复杂性。

邮件结构:一封邮件通常由邮件头(Header)和邮件体(Body)两部分组成。邮件头包含 FromToSubjectDateMessage-ID 等字段。邮件体可以是单一文本,也可以是多部分MIME(Multipurpose Internet Mail Extensions)结构。多部分邮件中每个部分都有自己的Content-Type和编码。

解析邮件头:使用 email.parser 模块将原始字节数据解析为 email.message.Message 对象。邮件头中的编码字段(如 Subject、From)可能使用 =?charset?encoding?text?= 格式编码(即 RFC 2047 编码),需要使用 email.header.decode_header() 进行解码。

解析正文:对于多部分邮件,需要遍历邮件体的各个部分,根据 Content-Type 判断是纯文本、HTML还是附件。常见的邮件正文有两种格式:text/plain 纯文本和 text/html HTML格式。自动化处理时,优先提取纯文本部分;如果没有纯文本,再从HTML中提取文本。

处理中文乱码:中文邮件最常见的编码是 gbkgb2312utf-8。解析时需要根据邮件头中的 Content-Type 中的 charset 参数进行解码。对于编码名称不规范的情况,可以使用 chardet 库自动检测编码。

# 邮件头解析 - 处理编码字段 import email from email.header import decode_header from email.utils import parsedate_to_datetime def decode_email_header(header_value): """解码邮件头中的编码字段""" if header_value is None: return '' decoded_parts = decode_header(header_value) result = [] for part, charset in decoded_parts: if isinstance(part, bytes): if charset: try: result.append(part.decode(charset)) except: result.append(part.decode('utf-8', errors='replace')) else: result.append(part.decode('utf-8', errors='replace')) else: result.append(part) return ' '.join(result) def parse_email_headers(raw_email): """解析邮件头信息""" msg = email.message_from_bytes(raw_email) headers = { 'from': decode_email_header(msg.get('From')), 'to': decode_email_header(msg.get('To')), 'subject': decode_email_header(msg.get('Subject')), 'date': msg.get('Date'), 'message_id': msg.get('Message-ID'), 'reply_to': decode_email_header(msg.get('Reply-To')), 'cc': decode_email_header(msg.get('Cc')), } # 解析日期为datetime对象 try: headers['datetime'] = parsedate_to_datetime(headers['date']) except: headers['datetime'] = None return headers # 使用示例 # raw_email = mail.fetch(msg_id, '(RFC822)')[1][0][1] # parsed = parse_email_headers(raw_email) # print(f"发件人: {parsed['from']}") # print(f"主题: {parsed['subject']}")
# 邮件正文解析 - 处理多部分MIME def get_email_body(msg): """提取邮件正文(优先纯文本,其次HTML)""" body_text = '' body_html = '' if msg.is_multipart(): # 多部分邮件,遍历各个部分 for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get('Content-Disposition', '')) # 跳过附件 if 'attachment' in content_disposition: continue # 获取正文内容 if content_type == 'text/plain': charset = part.get_content_charset() or 'utf-8' try: body_text += part.get_payload(decode=True).decode(charset, errors='replace') except: body_text += part.get_payload(decode=True).decode('utf-8', errors='replace') elif content_type == 'text/html': charset = part.get_content_charset() or 'utf-8' try: body_html += part.get_payload(decode=True).decode(charset, errors='replace') except: body_html += part.get_payload(decode=True).decode('utf-8', errors='replace') else: # 非多部分邮件 content_type = msg.get_content_type() if content_type == 'text/plain': charset = msg.get_content_charset() or 'utf-8' body_text = msg.get_payload(decode=True).decode(charset, errors='replace') elif content_type == 'text/html': body_html = msg.get_payload(decode=True).decode(charset, errors='replace') return body_text or body_html, body_html # 使用示例 # raw_email = mail.fetch(msg_id, '(RFC822)')[1][0][1] # msg = email.message_from_bytes(raw_email) # text, html = get_email_body(msg) # print(f"纯文本: {text[:200]}...")
# 附件提取 - 完整实现 import os def extract_attachments(msg, save_dir='attachments'): """提取邮件中的所有附件并保存""" os.makedirs(save_dir, exist_ok=True) saved_files = [] for part in msg.walk(): content_disposition = str(part.get('Content-Disposition', '')) # 检查是否是附件 if 'attachment' not in content_disposition: continue filename = part.get_filename() if not filename: continue # 解码文件名(可能是编码过的) filename = decode_email_header(filename) # 清理文件名,避免路径穿越问题 filename = os.path.basename(filename) # 获取附件内容 payload = part.get_payload(decode=True) if payload is None: continue # 检查是否已存在同名文件 base, ext = os.path.splitext(filename) counter = 1 filepath = os.path.join(save_dir, filename) while os.path.exists(filepath): filepath = os.path.join(save_dir, f'{base}_{counter}{ext}') counter += 1 # 保存附件 with open(filepath, 'wb') as f: f.write(payload) saved_files.append({ 'filename': filename, 'saved_path': filepath, 'size': len(payload), 'content_type': part.get_content_type(), }) print(f'已保存附件: {filename} ({len(payload)} bytes)') return saved_files

重点总结:邮件解析的三步法:先用 email.message_from_bytes 解析原始邮件,再用 decode_header 解码头部字段,然后遍历邮件部分提取正文和附件。处理中文乱码时,注意指定正确的字符编码,多尝试几种编码方案。

五、POP3 收信

尽管 IMAP 是更现代的协议,但 POP3 仍然广泛使用,特别是在一些企业邮箱系统和老旧系统中。Python 的 poplib 模块是对 POP3 协议的标准实现。与 IMAP 相比,POP3 的功能较为简单:连接服务器、认证、列出邮件、下载邮件、删除邮件。本节将系统讲解如何使用 poplib 进行邮件收取。

POP3 的核心流程:POP3 的工作流程非常直接。首先建立 SSL/TLS 连接到服务器(默认端口 995),然后依次调用 user()pass_() 方法进行认证。认证成功后,使用 stat() 获取邮箱状态(邮件总数和总大小),使用 list() 获取每封邮件的编号和大小,使用 retr() 下载指定编号的邮件,使用 dele() 标记删除邮件。操作完成后调用 quit() 断开连接。

与 IMAP 的主要差异:POP3 不支持服务器端搜索,所有搜索过滤必须在下载后本地进行。POP3 没有文件夹的概念,所有邮件都在同一个收件箱中。POP3 不支持邮件状态标记(如已读/未读),不支持部分下载。POP3 下载邮件后,服务器可以选择保留或删除邮件,这取决于服务器配置和客户端设置。

POP3 的应用场景:在自动化办公中,POP3 主要作为辅助方案使用。例如:作为 IMAP 的备份通道,定时通过 POP3 下载所有邮件到本地归档;处理仅支持 POP3 的企业邮箱;一次性批量导出旧邮件。

# POP3 基础操作 - 连接、认证、统计 import poplib from email import message_from_bytes def connect_pop3(host='pop.example.com', port=995, user='', password=''): """连接POP3服务器并获取邮箱状态""" try: # 建立SSL连接 pop = poplib.POP3_SSL(host, port) pop.set_debuglevel(0) # 设为1可查看详细通信过程 # 认证 pop.user(user) pop.pass_(password) # 获取邮箱状态 num_messages, mailbox_size = pop.stat() print(f'邮箱状态: {num_messages} 封邮件,总大小 {mailbox_size} 字节') # 获取邮件列表(编号和大小) response, msg_list, octets = pop.list() print(f'\n邮件列表 (前10封):') for i, msg_info in enumerate(msg_list[:10]): print(f' {msg_info.decode()}') return pop, num_messages except poplib.error_proto as e: print(f'POP3错误: {e}') return None, 0 # connect_pop3('pop.example.com', 995, 'user@example.com', 'password')
# POP3 下载邮件并解析 def download_all_emails(pop, max_emails=50): """下载POP3服务器上的邮件""" downloaded = [] num_messages = len(pop.list()[1]) # 只下载最近的N封(从编号最大的开始,通常最新) start = max(1, num_messages - max_emails + 1) for i in range(start, num_messages + 1): try: # 下载邮件 response, lines, octets = pop.retr(i) # 将行列表合并为字节数据 raw_email = b'\n'.join(lines) # 解析邮件 msg = message_from_bytes(raw_email) # 提取关键信息 subject = msg.get('Subject', '(无主题)') from_addr = msg.get('From', '(未知发件人)') # 解码主题 from email.header import decode_header try: decoded_subject = '' for part, charset in decode_header(subject): if isinstance(part, bytes): decoded_subject += part.decode(charset or 'utf-8', errors='replace') else: decoded_subject += part subject = decoded_subject except: pass downloaded.append({ 'id': i, 'subject': subject, 'from': from_addr, 'size': octets, 'raw': raw_email, }) print(f'[{i}/{num_messages}] {subject[:50]}...') except poplib.error_proto as e: print(f'下载邮件 {i} 失败: {e}') continue return downloaded # pop, count = connect_pop3() # if pop: # emails = download_all_emails(pop, 10) # pop.quit()
# POP3 邮件删除与服务器管理 def pop3_cleanup(pop, delete_before_days=30): """删除POP3服务器上指定天数前的邮件""" from datetime import datetime, timedelta, timezone import email from email.utils import parsedate_to_datetime cutoff = datetime.now(timezone.utc) - timedelta(days=delete_before_days) num_messages = len(pop.list()[1]) deleted_count = 0 for i in range(1, num_messages + 1): try: # 只获取邮件头(用TOP命令) response, lines, octets = pop.top(i, 0) raw_header = b'\n'.join(lines) msg = email.message_from_bytes(raw_header) # 解析日期 date_str = msg.get('Date') if date_str: try: email_date = parsedate_to_datetime(date_str) if email_date < cutoff: pop.dele(i) deleted_count += 1 print(f'已标记删除: [{i}] {msg.get("Subject", "")[:40]}') except: pass except: continue print(f'\n共标记删除 {deleted_count} 封邮件') return deleted_count # pop, _ = connect_pop3() # if pop: # deleted = pop3_cleanup(pop, 30) # pop.quit() # print(f'删除操作完成,实际删除将在quit()时生效')

重点总结:POP3 的使用比 IMAP 简单直接,但功能也相对有限。核心操作为:连接→认证→list→retr→quit。POP3 适合做邮件备份和一次性批量下载,不适合需要精细管理邮件的场景。在自动化系统中,应优先使用 IMAP,将 POP3 作为补充方案。

六、自动分类与规则引擎

当邮箱中的邮件数量越来越多时,自动分类和规则处理就成了必不可少的工具。通过编写邮件分类规则引擎,可以自动将邮件归类、打标签、转发或执行特定操作,大幅提升邮件处理效率。本节将介绍如何构建一个灵活的邮件分类与规则系统。

规则引擎架构:一个完整的邮件规则引擎通常包含三个核心组件:规则定义(匹配条件和执行动作)、规则匹配器(逐一检查规则并执行优先级判断)、动作执行器(执行分类、标记、转发等具体操作)。规则可以采用配置式(如 JSON/YAML 定义)或编程式(Python 代码定义),配置式更灵活,修改规则无需改代码。

关键词分类:最常见的分类方式是关键词匹配。通过定义不同类别对应的关键词列表,在邮件主题和正文中搜索匹配。例如,包含"发票"、"账单"、"付款"的邮件归入"财务"类别;包含"简历"、"面试"、"招聘"的归入"人事"类别。关键词匹配支持精确匹配、模糊匹配和正则表达式匹配。

发件人策略:基于发件人的规则非常实用。白名单(重要联系人直接标记为高优先级)、黑名单(垃圾邮件发送者自动归档或删除)、域名单(同一公司的邮件自动归类)是三种最常见的策略。

# 邮件规则引擎 - 规则定义与匹配 import re from dataclasses import dataclass, field from typing import List, Callable, Optional class MailAction: """邮件处理动作""" MARK_READ = 'mark_read' MARK_FLAGGED = 'mark_flagged' MOVE_TO = 'move_to' COPY_TO = 'copy_to' DELETE = 'delete' FORWARD = 'forward' REPLY = 'auto_reply' NOTIFY = 'notify' @dataclass class MailRule: """邮件规则定义""" name: str priority: int = 0 # 数字越小优先级越高 conditions: dict = field(default_factory=dict) actions: List[dict] = field(default_factory=list) enabled: bool = True class MailRuleEngine: """邮件分类规则引擎""" def __init__(self): self.rules: List[MailRule] = [] def add_rule(self, rule: MailRule): self.rules.append(rule) # 按优先级排序 self.rules.sort(key=lambda r: r.priority) def match(self, msg, headers) -> List[dict]: """对邮件应用所有规则,返回匹配的动作列表""" matched_actions = [] for rule in self.rules: if not rule.enabled: continue if self._check_conditions(rule.conditions, msg, headers): print(f'规则匹配: {rule.name}') matched_actions.extend(rule.actions) break # 高优先级规则匹配后停止(可根据需要修改) return matched_actions def _check_conditions(self, conditions, msg, headers) -> bool: """检查规则条件是否满足""" if not conditions: return True # 关键词匹配(主题) if 'subject_keywords' in conditions: subject = headers.get('subject', '') keywords = conditions['subject_keywords'] if not any(kw.lower() in subject.lower() for kw in keywords): return False # 关键词匹配(正文) if 'body_keywords' in conditions: body_text, _ = get_email_body(msg) keywords = conditions['body_keywords'] if not any(kw.lower() in body_text.lower() for kw in keywords): return False # 发件人匹配 if 'from_pattern' in conditions: from_addr = headers.get('from', '') pattern = conditions['from_pattern'] if not re.search(pattern, from_addr, re.IGNORECASE): return False # 附件类型匹配 if 'attachment_types' in conditions: allowed_types = conditions['attachment_types'] has_matching = False for part in msg.walk(): if part.get_filename(): if part.get_content_type() in allowed_types: has_matching = True break if not has_matching: return False return True
# 规则引擎使用示例 def setup_rules(): """配置邮件分类规则""" engine = MailRuleEngine() # 规则1:财务发票 - 移动到财务文件夹 engine.add_rule(MailRule( name='财务发票', priority=1, conditions={ 'subject_keywords': ['发票', '账单', '付款', '财务', '报销'], }, actions=[ {'type': MailAction.MOVE_TO, 'target': 'Finance'}, {'type': MailAction.MARK_FLAGGED}, ] )) # 规则2:人事招聘 engine.add_rule(MailRule( name='招聘邮件', priority=2, conditions={ 'subject_keywords': ['简历', '面试', '招聘', 'offer', '录用'], 'from_pattern': '@company\.com$', }, actions=[ {'type': MailAction.MOVE_TO, 'target': 'HR/Recruitment'}, {'type': MailAction.NOTIFY, 'target': 'hr-team'}, ] )) # 规则3:项目通知 engine.add_rule(MailRule( name='项目通知', priority=3, conditions={ 'from_pattern': 'notify@(github|gitlab|jira|confluence)', }, actions=[ {'type': MailAction.MOVE_TO, 'target': 'Projects'}, {'type': MailAction.MARK_READ}, ] )) # 规则4:垃圾邮件 engine.add_rule(MailRule( name='垃圾邮件', priority=10, conditions={ 'subject_keywords': ['广告', '推广', '优惠', '中奖', '免费领取'], }, actions=[ {'type': MailAction.MOVE_TO, 'target': 'Spam'}, {'type': MailAction.DELETE}, ] )) return engine # engine = setup_rules() # 对每封邮件:engine.match(msg, headers)
# YAML配置规则 - 无需改代码即可调整规则 """ # rules.yaml 规则配置文件示例 rules: - name: "财务发票" priority: 1 conditions: subject_keywords: ["发票", "账单", "付款"] body_keywords: ["金额", "账户", "转账"] actions: - type: "move_to" target: "INBOX.Finance" - type: "mark_flagged" - name: "客户咨询" priority: 2 conditions: from_pattern: "@(client|customer)\.com$" actions: - type: "move_to" target: "INBOX.Clients" - type: "notify" target: "sales-team" """ import yaml def load_rules_from_yaml(yaml_path='rules.yaml'): """从YAML文件加载规则配置""" with open(yaml_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) engine = MailRuleEngine() for rule_config in config['rules']: rule = MailRule( name=rule_config['name'], priority=rule_config.get('priority', 99), conditions=rule_config['conditions'], actions=rule_config['actions'], ) engine.add_rule(rule) print(f'已加载 {len(config["rules"])} 条规则') return engine # engine = load_rules_from_yaml('rules.yaml')

重点总结:邮件规则引擎的核心是"条件-动作"模式。通过合理设计规则优先级、匹配条件和执行动作,可以实现复杂的邮件自动分类和处理。将规则配置外置于 YAML/JSON 文件中,可以使系统更灵活,修改规则无需重新部署代码。

七、附件自动处理

邮件附件是办公自动化中的重要处理对象。日常工作中常见的附件包括 PDF 文档、Excel 表格、Word 文件、图片、压缩包等。自动保存和管理附件可以节省大量手动操作时间。本节将介绍附件自动处理的完整实现方案。

附件识别与提取:遍历邮件的 MIME 部分,通过 Content-Disposition 头判断是否为附件。需要特别注意的是,部分邮件将附件以 inline 方式嵌入,这种情况下的文件名获取方式略有不同。附件提取的关键步骤包括:获取附件名(可能经过编码)、解码附件名、获取附件内容、根据附件类型选择保存策略。

附件类型判断:根据 MIME 类型(Content-Type)和文件扩展名判断附件类别。常见类别包括:文档类(PDF、DOCX、XLSX、PPTX)、图片类(JPG、PNG、GIF)、压缩包(ZIP、RAR、7z)、代码文件(PY、JS、HTML)等。不同类型可以采用不同的处理路径。例如,压缩包需要解压后处理,图片可能需要缩略图生成。

安全注意事项:处理附件时必须注意安全风险。不要直接执行附件中的可执行文件(.exe、.bat、.vbs 等),不要直接打开附件中的文档(可能包含宏病毒),对压缩包附件先扫描再解压。文件名需要清理,避免路径穿越攻击(如 ../../etc/passwd)。附件大小需要限制,避免磁盘空间耗尽。

# 附件自动保存系统 import os import mimetypes from datetime import datetime class AttachmentSaver: """附件自动保存器""" def __init__(self, base_dir='attachments'): self.base_dir = base_dir self.supported_types = { 'document': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx'], 'image': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'], 'archive': ['.zip', '.rar', '.7z', '.tar', '.gz'], 'code': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.sql'], 'data': ['.csv', '.json', '.xml', '.yaml', '.yml'], } def classify_attachment(self, filename): """根据文件扩展名分类""" _, ext = os.path.splitext(filename.lower()) for category, extensions in self.supported_types.items(): if ext in extensions: return category return 'other' def get_safe_filename(self, filename): """生成安全的文件名""" # 移除路径分隔符,避免路径穿越 filename = os.path.basename(filename) # 替换非法字符 illegal_chars = '<>:"/\\|?*' for char in illegal_chars: filename = filename.replace(char, '_') return filename def save_attachment(self, msg, email_subject=''): """保存邮件中的所有附件""" saved_files = [] email_dir_name = datetime.now().strftime('%Y%m%d_%H%M%S') for part in msg.walk(): # 检查是否有附件 content_disposition = str(part.get('Content-Disposition', '')) if 'attachment' not in content_disposition and 'inline' not in content_disposition: continue filename = part.get_filename() if not filename: continue # 解码和解安全化文件名 filename = decode_email_header(filename) filename = self.get_safe_filename(filename) if not filename: continue # 分类附件 category = self.classify_attachment(filename) # 创建目录: base_dir/分类/邮件日期/ save_dir = os.path.join(self.base_dir, category, email_dir_name) os.makedirs(save_dir, exist_ok=True) # 获取附件内容 payload = part.get_payload(decode=True) if payload is None: continue # 处理同名文件 filepath = os.path.join(save_dir, filename) counter = 1 while os.path.exists(filepath): name, ext = os.path.splitext(filename) filepath = os.path.join(save_dir, f'{name}_{counter}{ext}') counter += 1 # 保存 with open(filepath, 'wb') as f: f.write(payload) file_info = { 'original_name': part.get_filename(), 'saved_name': os.path.basename(filepath), 'path': filepath, 'size': len(payload), 'type': part.get_content_type(), 'category': category, } saved_files.append(file_info) print(f'[{category}] 已保存: {os.path.basename(filepath)} ({len(payload)} bytes)') return saved_files
# 压缩包附件自动处理 import zipfile import tarfile import io def process_archive_attachment(payload, filename, extract_dir='extracted'): """处理压缩包附件(解压并提取内容)""" _, ext = os.path.splitext(filename.lower()) os.makedirs(extract_dir, exist_ok=True) extracted_files = [] try: if ext == '.zip': with zipfile.ZipFile(io.BytesIO(payload)) as zf: # 安全解压:防止路径穿越 for member in zf.namelist(): # 跳过目录和路径穿越 if member.endswith('/') or '..' in member: continue safe_name = os.path.basename(member) if not safe_name: continue target_path = os.path.join(extract_dir, safe_name) with open(target_path, 'wb') as f: f.write(zf.read(member)) extracted_files.append({ 'source': member, 'saved': target_path, 'size': os.path.getsize(target_path), }) print(f' 解压: {member} -> {safe_name}') elif ext in ('.tar', '.gz', '.bz2'): with tarfile.open(fileobj=io.BytesIO(payload), mode='r:*') as tf: for member in tf.getmembers(): if member.isfile() and '..' not in member.name: safe_name = os.path.basename(member.name) if not safe_name: continue target_path = os.path.join(extract_dir, safe_name) with open(target_path, 'wb') as f: f.write(tf.extractfile(member).read()) extracted_files.append({ 'source': member.name, 'saved': target_path, 'size': os.path.getsize(target_path), }) print(f' 解压: {member.name} -> {safe_name}') except Exception as e: print(f'解压失败 {filename}: {e}') return extracted_files
# 附件内容提取(将附件文本内容提取到数据库或索引) def extract_attachment_text(payload, content_type): """提取附件中的文本内容""" import PyPDF2 # 假设已安装 if content_type == 'application/pdf': try: pdf_file = io.BytesIO(payload) reader = PyPDF2.PdfReader(pdf_file) text = '' for page in reader.pages: text += page.extract_text() return text except Exception as e: return f'[PDF提取失败: {e}]' elif content_type == 'text/plain': return payload.decode('utf-8', errors='replace') elif content_type == 'text/csv': return payload.decode('utf-8', errors='replace') else: # 尝试作为纯文本解码 try: return payload.decode('utf-8', errors='replace') except: return '[不支持直接提取文本的附件类型]' # 附件处理综合流程 def process_all_attachments(msg, save_base_dir): """附件处理综合流程:保存 + 解压 + 索引""" # 1. 保存附件 saver = AttachmentSaver(save_base_dir) saved = saver.save_attachment(msg) for file_info in saved: # 2. 如果是压缩包,解压 if file_info['category'] == 'archive': with open(file_info['path'], 'rb') as f: payload = f.read() extracted = process_archive_attachment( payload, os.path.basename(file_info['path']), file_info['path'] + '_extracted' ) # 3. 如果是文档,提取文本 if file_info['type'] in ('application/pdf', 'text/plain', 'text/csv'): with open(file_info['path'], 'rb') as f: payload = f.read() text_content = extract_attachment_text(payload, file_info['type']) # 可存入数据库或全文搜索引擎 return saved

重点总结:附件自动化处理的核心包括识别提取、分类存储、压缩解压、内容提取四个环节。安全处理是重中之重——必须防范路径穿越攻击、恶意文件类型和超大附件。根据附件类型采用不同的处理策略,可以提高自动化处理的效率和准确性。

八、未读邮件监控

在许多自动化场景中,需要实时监控邮箱中的新邮件,并对新邮件做出及时响应。例如,自动处理客户咨询邮件、监控系统告警邮件、自动提取验证码等。一个健壮的邮件监控系统需要具备定时检查、高效通知、异常恢复和长期稳定运行的能力。

监控循环架构:邮件监控的核心是一个无限循环,每次循环执行以下操作:连接邮箱、搜索未读邮件、处理每封新邮件、标记邮件为已读、等待一段时间后进入下一轮循环。循环间隔需要合理设置——太频繁会增加服务器负担(甚至被限流),太稀疏会导致响应不及时。一般建议设置为30-60秒。

IDLE 命令优化:部分 IMAP 服务器支持 IDLE 命令,这是一种推送机制。客户端发送 IDLE 命令后,服务器在有新邮件到达时主动通知客户端,而不是客户端反复轮询。IDLE 模式可以大大降低网络流量和服务端负载,实现接近实时的通知。IMAPlib 中通过 idle()idle_done() 方法实现。

异常处理与重连:网络连接在长时间运行中不可避免地会遇到中断。监控系统必须具备自动重连机制,包括:捕获网络异常(超时、连接重置等)、指数退避重连策略、记录错误日志、自动恢复后继续正常监控。

# 未读邮件监控器 - 基础循环版 import time import logging from datetime import datetime class UnreadMailMonitor: """未读邮件监控器""" def __init__(self, imap_client, poll_interval=30): self.client = imap_client self.poll_interval = poll_interval self.running = False self.processed_ids = set() logging.basicConfig(level=logging.INFO) def start(self): """启动监控循环""" self.running = True logging.info('邮件监控器已启动') while self.running: try: self._check_once() except Exception as e: logging.error(f'监控异常: {e}') self._reconnect() time.sleep(self.poll_interval) def stop(self): """停止监控""" self.running = False logging.info('邮件监控器已停止') def _check_once(self): """单次检查""" with self.client as mail: mail.select_mailbox('INBOX') status, ids = mail.search('UNSEEN') if status != 'OK' or not ids[0]: return unseen_ids = ids[0].split() new_ids = [mid for mid in unseen_ids if mid not in self.processed_ids] for msg_id in new_ids: self._process_new_message(mail, msg_id) self.processed_ids.add(msg_id) logging.info(f'检查完成: {len(new_ids)} 封新邮件') def _process_new_message(self, mail, msg_id): """处理单封新邮件(子类可重写)""" status, data = mail.fetch(msg_id, '(RFC822)') if status != 'OK': return raw_email = data[0][1] msg = email.message_from_bytes(raw_email) subject = decode_email_header(msg.get('Subject', '')) logging.info(f'新邮件: {subject}') print(f'\n[新邮件] {datetime.now().strftime("%H:%M:%S")}') print(f' 主题: {subject}') print(f' 发件人: {decode_email_header(msg.get("From", ""))}') # 标记为已读 mail.connection.store(msg_id, '+FLAGS', '\\Seen') def _reconnect(self): """重连机制""" for attempt in range(1, 6): wait = min(2 ** attempt, 60) logging.info(f'尝试重连 (第{attempt}次, {wait}秒后)...') time.sleep(wait) try: self.client = IMAPClient(self.client.host, self.client.port, self.client.user, self.client.password) logging.info('重连成功') return except Exception as e: logging.error(f'重连失败: {e}') logging.critical('多次重连失败,停止监控') self.running = False # monitor = UnreadMailMonitor(IMAPClient('imap.example.com', 993, 'user', 'pass')) # monitor.start()
# IDLE模式监控 - 实时推送 def monitor_with_idle(host, port, user, password): """使用IDLE模式实现实时邮件监控""" mail = imaplib.IMAP4_SSL(host, port) mail.login(user, password) mail.select('INBOX') print('IDLE监控已启动,等待新邮件...') while True: try: # 发送IDLE命令 mail.send(b'IDLE\r\n') # 持续读取服务器响应(超时29分钟,避免自动断开) while True: response = mail.readline() if not response: break # 服务器通知有新邮件 if b'EXISTS' in response: print(f'\n[{datetime.now().strftime("%H:%M:%S")}] 检测到新邮件!') # 退出IDLE模式处理邮件 mail.send(b'DONE\r\n') mail.readline() # 读取IDLE终止响应 # 搜索未读邮件 status, ids = mail.search(None, 'UNSEEN') if status == 'OK': for msg_id in ids[0].split()[-3:]: # 最多处理3封 status, data = mail.fetch(msg_id, '(BODY.PEEK[HEADER])') raw = data[0][1].decode('utf-8', errors='replace') print(f'新邮件头:\n{raw[:300]}') # 重新进入IDLE模式 mail.send(b'IDLE\r\n') except (ConnectionError, TimeoutError) as e: print(f'连接断开: {e}') time.sleep(5) # 重连... (省略实际重连代码) break except imaplib.IMAP4.error as e: print(f'IMAP错误: {e}') time.sleep(5) continue mail.logout()
# 邮件监控回调系统 from typing import Callable, List from dataclasses import dataclass @dataclass class MailEvent: """邮件事件""" msg_id: str subject: str sender: str received_at: datetime msg_object: object class EventDrivenMonitor: """事件驱动的邮件监控器""" def __init__(self, host, port, user, password): self.config = {'host': host, 'port': port, 'user': user, 'password': password} self.handlers: List[Callable[[MailEvent], None]] = [] def on_new_email(self, handler: Callable): """注册新邮件处理器""" self.handlers.append(handler) return handler # 支持装饰器 def _dispatch(self, event: MailEvent): """将事件分发给所有处理器""" for handler in self.handlers: try: handler(event) except Exception as e: logging.error(f'处理器 {handler.__name__} 执行失败: {e}') def run(self): """运行监控器""" mail = imaplib.IMAP4_SSL(self.config['host'], self.config['port']) mail.login(self.config['user'], self.config['password']) mail.select('INBOX') last_check = time.time() while True: try: status, ids = mail.search(None, f'UNSEEN SINCE {datetime.fromtimestamp(last_check).strftime("%d-%b-%Y")}') if status == 'OK' and ids[0]: for msg_id in ids[0].split(): status, data = mail.fetch(msg_id, '(RFC822)') if status != 'OK': continue msg = email.message_from_bytes(data[0][1]) event = MailEvent( msg_id=msg_id.decode(), subject=decode_email_header(msg.get('Subject', '')), sender=decode_email_header(msg.get('From', '')), received_at=datetime.now(), msg_object=msg, ) self._dispatch(event) mail.store(msg_id, '+FLAGS', '\\Seen') last_check = time.time() time.sleep(30) except Exception as e: logging.error(f'监控异常: {e}') time.sleep(5) # 使用示例 # monitor = EventDrivenMonitor('imap.example.com', 993, 'user', 'pass') # # @monitor.on_new_email # def handle_alert(event): # if '告警' in event.subject: # print(f'收到告警: {event.subject}') # # 发送企业微信通知 # # send_wechat_notification(event.subject) # # @monitor.on_new_email # def handle_order(event): # if '订单' in event.subject: # print(f'收到订单: {event.subject}') # # 自动处理订单 # # process_order(event.msg_object) # # monitor.run()

重点总结:邮件监控系统的三个关键设计:合理的轮询间隔(30-60秒)、健壮的重连机制(指数退避)、灵活的事件处理架构(回调函数)。IDLE 模式适合对实时性要求高的场景。生产环境中还应考虑连接池管理、限流策略和监控告警本身的可观测性。

九、实战案例

掌握了邮件自动收取的核心技术后,让我们一起看三个典型的实战案例。这些案例融合了前面的知识点,展示了邮件自动化在实际工作中的典型应用场景。

案例一:自动提取验证码

许多网站和服务的验证码通过邮件发送。自动提取验证码可以用于自动化测试、账号注册等场景。核心思路是:搜索最近来自特定发件人的邮件,从邮件正文或主题中提取验证码,支持多种验证码格式。需要注意验证码邮件的时效性,通常5分钟内有效。

# 自动提取验证码 import re def extract_verification_code(mail, sender='noreply@example.com', max_age_minutes=5): """从邮件中提取验证码""" from datetime import datetime, timedelta mail.select('INBOX') since_date = (datetime.now() - timedelta(minutes=max_age_minutes)).strftime('%d-%b-%Y') # 搜索近期来自指定发件人的未读邮件 criteria = f'(UNSEEN FROM "{sender}" SINCE {since_date})' status, ids = mail.search(None, criteria) if status != 'OK' or not ids[0]: print('未找到验证码邮件') return None codes = [] code_patterns = [ (r'验证码[::]\s*(\d{4,8})', '4-8位数字验证码'), (r'code[::]\s*([A-Z0-9]{4,8})', '字母数字验证码'), (r'(\d{6})', '6位数字(宽松)'), ] for msg_id in ids[0].split(): status, data = mail.fetch(msg_id, '(RFC822)') if status != 'OK': continue msg = email.message_from_bytes(data[0][1]) body_text, _ = get_email_body(msg) # 尝试所有验证码模式 for pattern, desc in code_patterns: match = re.search(pattern, body_text) if match: code = match.group(1) codes.append({'code': code, 'source': desc, 'email_id': msg_id}) print(f'提取到验证码: {code} ({desc})') break # 一封邮件只取一个验证码 # 标记为已读 mail.store(msg_id, '+FLAGS', '\\Seen') return codes[0] if codes else None # code = extract_verification_code(mail, 'verify@example.com') # if code: # print(f'验证码: {code["code"]}')

案例二:邮件订单自动处理

电商平台每天会收到大量订单邮件,手动处理效率极低。通过邮件自动化系统,可以自动解析订单信息、提取关键字段、写入数据库、发送确认回复,实现订单处理的半自动化或全自动化。

# 订单邮件自动处理 import json import sqlite3 def process_order_email(msg): """解析订单邮件并写入数据库""" subject = decode_email_header(msg.get('Subject', '')) body_text, body_html = get_email_body(msg) order_data = {} # 1. 提取订单号 order_match = re.search(r'订单号[::]\s*(\w+)', body_text) if order_match: order_data['order_id'] = order_match.group(1) # 2. 提取金额 amount_match = re.search(r'金额[::]\s*[¥¥]?(\d+\.?\d*)', body_text) if amount_match: order_data['amount'] = float(amount_match.group(1)) # 3. 提取商品信息 product_match = re.search(r'商品[::]\s*(.+)', body_text) if product_match: order_data['product'] = product_match.group(1).strip() # 4. 提取客户信息 customer_match = re.search(r'客户[::]\s*(.+)', body_text) if customer_match: order_data['customer'] = customer_match.group(1).strip() # 5. 提取发件人 order_data['email'] = msg.get('From', '') if order_data.get('order_id'): # 写入数据库 conn = sqlite3.connect('orders.db') c = conn.cursor() c.execute(''' CREATE TABLE IF NOT EXISTS orders ( order_id TEXT PRIMARY KEY, amount REAL, product TEXT, customer TEXT, email TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') c.execute(''' INSERT OR REPLACE INTO orders (order_id, amount, product, customer, email) VALUES (?, ?, ?, ?, ?) ''', ( order_data['order_id'], order_data.get('amount', 0), order_data.get('product', ''), order_data.get('customer', ''), order_data.get('email', ''), )) conn.commit() conn.close() print(f'订单 {order_data["order_id"]} 已写入数据库') return order_data print('未能提取到有效订单信息') return None # 批量处理订单邮件 def batch_process_orders(mail, sender_pattern='order@'): """批量处理来自特定发件人的未处理订单""" mail.select('INBOX') status, ids = mail.search(None, f'UNSEEN FROM "{sender_pattern}"') if status != 'OK' or not ids[0]: print('没有未处理的订单邮件') return [] processed = [] for msg_id in ids[0].split(): status, data = mail.fetch(msg_id, '(RFC822)') if status != 'OK': continue msg = email.message_from_bytes(data[0][1]) result = process_order_email(msg) if result: # 标记已处理并移动到归档文件夹 mail.copy(msg_id, 'Processed') mail.store(msg_id, '+FLAGS', '\\Deleted') processed.append(result) # 永久删除原位置的已处理邮件 mail.expunge() return processed # orders = batch_process_orders(mail, 'orders@shop.com') # print(f'成功处理 {len(orders)} 个订单')

案例三:邮件附件备份系统

企业的重要文件经常通过邮件附件传递。构建一个自动备份系统,可以确保所有附件被及时保存、分类归档,并建立索引便于检索。这个系统可以作为一个定期运行的定时任务(如每天一次)。

# 邮件附件备份系统 import hashlib import os from datetime import datetime, timedelta class EmailBackupSystem: """邮件附件备份系统""" def __init__(self, backup_root='email_backup', retention_days=365): self.backup_root = backup_root self.retention_days = retention_days self.db_path = os.path.join(backup_root, 'backup_index.db') def run_backup(self, mail, search_days=1): """执行一次备份""" mail.select('INBOX') # 搜索指定时间范围内的所有邮件 since_date = (datetime.now() - timedelta(days=search_days)).strftime('%d-%b-%Y') status, ids = mail.search(None, f'(SINCE {since_date})') if status != 'OK' or not ids[0]: print('没有需要备份的邮件') return [] backup_records = [] count = 0 for msg_id in ids[0].split(): status, data = mail.fetch(msg_id, '(RFC822)') if status != 'OK': continue msg = email.message_from_bytes(data[0][1]) email_date = msg.get('Date', datetime.now().isoformat()) email_subject = decode_email_header(msg.get('Subject', '(无主题)')) # 保存附件 saver = AttachmentSaver(os.path.join(self.backup_root, 'attachments')) saved_files = saver.save_attachment(msg, email_subject) for file_info in saved_files: # 计算文件哈希(去重) with open(file_info['path'], 'rb') as f: file_hash = hashlib.md5(f.read()).hexdigest() record = { 'msg_id': msg_id.decode(), 'subject': email_subject, 'date': email_date, 'filename': file_info['saved_name'], 'path': file_info['path'], 'size': file_info['size'], 'category': file_info['category'], 'hash': file_hash, } backup_records.append(record) count += 1 print(f'[{count}] 已备份: {file_info["saved_name"]} ' f'({file_info["category"]}, {file_info["size"]} bytes)') self._save_index(backup_records) return backup_records def _save_index(self, records): """保存备份索引""" import json index_path = os.path.join(self.backup_root, 'backup_index.json') existing = [] if os.path.exists(index_path): with open(index_path, 'r', encoding='utf-8') as f: existing = json.load(f) existing.extend(records) with open(index_path, 'w', encoding='utf-8') as f: json.dump(existing, f, ensure_ascii=False, indent=2) print(f'\n索引已更新: {len(existing)} 条记录') def find_by_filename(self, keyword): """按文件名搜索备份""" import json index_path = os.path.join(self.backup_root, 'backup_index.json') if not os.path.exists(index_path): return [] with open(index_path, 'r', encoding='utf-8') as f: records = json.load(f) results = [r for r in records if keyword.lower() in r['filename'].lower()] return results def cleanup_old_backups(self): """清理过期备份""" import shutil cutoff = datetime.now() - timedelta(days=self.retention_days) # 实现按日期清理... pass # 使用示例(设置每天运行一次) # backup = EmailBackupSystem('\\\\nas\\backups\\email', retention_days=365) # with IMAPClient('imap.company.com', 993, 'user', 'pass') as mail: # records = backup.run_backup(mail, search_days=1) # print(f'本次备份了 {len(records)} 个附件')

重点总结:三个实战案例覆盖了邮件自动化的典型场景:验证码提取(实时响应)、订单处理(数据解析与存储)、附件备份(文件管理与索引)。每个案例都融合了IMAP操作、邮件解析、规则处理和本地持久化等技术要点。实际应用中,可以结合定时任务框架(如APScheduler、Celery)和消息队列(如RabbitMQ)构建更健壮的邮件自动化系统。