← 返回自动化办公目录
← 返回学习笔记首页
专题: Python 自动化办公系统学习
关键词: Python, 自动化办公, zipfile, tarfile, 压缩解压, ZIP, TAR, 文件归档, Python自动化
一、压缩格式概述
在日常办公和系统运维中,文件压缩与归档是最常见也最基础的需求之一。Python 标准库提供了丰富的压缩与归档模块,让我们能够以纯 Python 代码处理几乎所有主流压缩格式,无需依赖外部命令行工具。本章首先对这些格式和对应的 Python 模块做全局性介绍。
压缩格式对比表
格式
Python 模块
算法
归档支持
压缩比
速度
典型后缀
ZIP
zipfile
DEFLATE / BZIP2 / LZMA
是(多文件)
中等
快
.zip
TAR
tarfile
无(裸归档)
是(多文件)
无压缩
极快
.tar
TAR.GZ
tarfile + gzip
DEFLATE (gzip)
是(多文件)
中等
快
.tar.gz / .tgz
TAR.BZ2
tarfile + bz2
BZIP2
是(多文件)
高
慢
.tar.bz2 / .tbz2
TAR.XZ
tarfile + lzma
LZMA
是(多文件)
极高
中等
.tar.xz / .txz
GZIP
gzip
DEFLATE
否(单文件)
中等
快
.gz
BZIP2
bz2
BZIP2
否(单文件)
高
慢
.bz2
LZMA / XZ
lzma
LZMA
否(单文件)
极高
中等
.xz / .lzma
关键概念:归档 vs 压缩
归档(archiving)指的是将多个文件打包成一个文件的过程,而压缩(compression)则是通过算法减少数据体积。TAR 格式仅做归档不做压缩(因此常与其他压缩算法配合使用),而 ZIP 格式同时支持归档和压缩。理解这一区别有助于在实际项目中选择正确的工具。
选择何种格式主要取决于以下因素:跨平台兼容性优先时选择 ZIP(Windows / macOS / Linux 原生支持);追求高压缩比时选择 TAR.XZ 或 TAR.BZ2;需要流式处理日志之类的大文件时使用 GZIP 单文件压缩;需要保留 Unix 文件权限时使用 TAR 格式。
# 查看 Python 标准库中可用的压缩模块
import zipfile, tarfile, gzip, bz2, lzma
print("zipfile version info:", zipfile.__name__)
print("tarfile version info:", tarfile.__name__)
# 所有模块均为 Python 标准库自带,无需 pip install
二、zipfile 基础操作
zipfile 是 Python 标准库中处理 ZIP 格式的核心模块。它支持创建、读取、写入和提取 ZIP 文件,并且内置对 DEFLATE、BZIP2 和 LZMA 三种压缩算法的支持。zipfile 最突出的优势是跨平台兼容性——ZIP 文件几乎可以在任何操作系统上无需额外工具直接打开。
创建 ZIP 文件
使用 ZipFile 类时,需要指定模式参数:'w' 为写入模式(会覆盖已有文件),'a' 为追加模式,'r' 为读取模式。通过 compression 参数选择压缩算法,可选值为 ZIP_STORED(仅存储不压缩)、ZIP_DEFLATED(DEFLATE 算法)、ZIP_BZIP2(BZIP2 算法)和 ZIP_LZMA(LZMA 算法)。
import zipfile
import os
# 创建 ZIP 文件并写入多个文件
with zipfile.ZipFile('archive.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
# 将本地文件添加到 ZIP 中
zf.write('data/report1.csv', arcname='reports/report1.csv')
zf.write('data/report2.csv', arcname='reports/report2.csv')
# 使用 writestr 直接写入字符串内容
zf.writestr('config/settings.json', '{"theme": "dark", "lang": "zh"}')
# 写入二进制数据
zf.writestr('data/raw.bin', b'\x00\x01\x02\x03')
print("ZIP 文件创建完成")
读取和提取 ZIP 文件
读取 ZIP 文件时,可以使用 namelist() 获取所有文件列表,infolist() 获取包含元信息的 ZipInfo 对象列表,extract() 和 extractall() 方法用于提取文件。extractall() 支持 path 参数指定提取目录和 members 参数筛选提取的文件子集。
import zipfile
# 读取 ZIP 文件
with zipfile.ZipFile('archive.zip', 'r') as zf:
# 列出所有文件
for name in zf.namelist():
print(f" {name}")
# 获取文件详细信息
for info in zf.infolist():
print(f"{info.filename} - 原大小: {info.file_size} B, "
f"压缩后: {info.compress_size} B, "
f"压缩率: {info.compress_size / info.file_size:.1%}")
# 读取特定文件内容(不提取到磁盘)
content = zf.read('config/settings.json')
print("settings.json 内容:", content.decode('utf-8'))
# 提取到指定目录
zf.extractall(path='extracted_files')
# 只提取特定文件
zf.extract('config/settings.json', path='extracted_config')
压缩模式选择
ZIP_STORED 模式不进行压缩,仅将文件原样打包,速度最快但体积最大,适合已经压缩过的文件(如 JPEG、MP4)。ZIP_DEFLATED 是默认且最通用的压缩算法,在速度和压缩比之间取得了良好的平衡。ZIP_BZIP2 提供更高的压缩比但速度较慢,ZIP_LZMA 压缩比最高但兼容性稍差——某些旧版解压工具可能不支持。
import zipfile
# 对比不同压缩算法的效果
with open('large_file.txt', 'r') as f:
data = f.read()
# STORED(不压缩)
with zipfile.ZipFile('test_stored.zip', 'w', zipfile.ZIP_STORED) as zf:
zf.writestr('large_file.txt', data)
# DEFLATED(默认压缩)
with zipfile.ZipFile('test_deflated.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('large_file.txt', data)
# BZIP2(高压缩比)
with zipfile.ZipFile('test_bzip2.zip', 'w', zipfile.ZIP_BZIP2) as zf:
zf.writestr('large_file.txt', data)
import os
for name in ['test_stored.zip', 'test_deflated.zip', 'test_bzip2.zip']:
print(f"{name}: {os.path.getsize(name)} bytes")
三、zipfile 进阶功能
在掌握基础操作之后,zipfile 还提供了许多进阶功能,包括密码保护、压缩级别控制、路径规范化、ZIP64 大文件支持以及分卷 ZIP 等高级特性,这些功能在生产环境中非常实用。
密码加密与解密
zipfile 支持基于 PKZIP 2.0 加密的 ZIP 文件(传统加密)。使用 setpassword() 方法设置全局密码,或在提取时传入 pwd 参数。需要注意:标准 zipfile 模块不支持更安全的 AES-256 加密(如需 AES 加密,可考虑 pyzipper 第三方库)。
import zipfile
# 创建加密 ZIP 文件
with zipfile.ZipFile('encrypted.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
zf.setpassword(b'my_secret_password')
zf.write('confidential.txt', arcname='secret/confidential.txt')
zf.writestr('notes.txt', '这是加密内容')
# 读取加密 ZIP 文件
with zipfile.ZipFile('encrypted.zip', 'r') as zf:
zf.setpassword(b'my_secret_password')
# 列出所有文件(列表不需要密码)
for name in zf.namelist():
print(f"文件: {name}")
# 读取内容时需要密码
content = zf.read('notes.txt')
print(content.decode('utf-8'))
压缩级别控制
对于 ZIP_DEFLATED 算法,可以通过 compresslevel 参数控制压缩级别(1-9),其中 1 为最快但压缩比最低,9 为最慢但压缩比最高。默认值为 6,在绝大多数场景下是合理的选择。
import zipfile
# 测试不同压缩级别的效果
data = "Hello, World! " * 10000 # 可重复数据
for level in range(1, 10):
with zipfile.ZipFile(
f'compress_level_{level}.zip', 'w',
compression=zipfile.ZIP_DEFLATED, compresslevel=level
) as zf:
zf.writestr('data.txt', data)
import os
for level in range(1, 10):
size = os.path.getsize(f'compress_level_{level}.zip')
print(f"级别 {level}: {size} bytes")
# 对于 BZIP2 算法,compresslevel 同样有效
with zipfile.ZipFile(
'bzip2_best.zip', 'w',
compression=zipfile.ZIP_BZIP2, compresslevel=9
) as zf:
zf.writestr('data.txt', data)
路径控制与安全处理
zipfile 的 write() 方法使用 arcname 参数控制文件在 ZIP 内部的存储路径。如果不指定 arcname,Python 会使用文件的完整路径,这可能导致解压时目录结构混乱。另外,提取文件时要注意路径穿越攻击——恶意构造的 ZIP 文件可能包含 ../ 路径。
import zipfile
# 安全的路径处理和提取
from pathlib import Path
def safe_extract(zip_path: str, extract_dir: str):
"""安全提取 ZIP,防止路径穿越攻击"""
with zipfile.ZipFile(zip_path, 'r') as zf:
for info in zf.infolist():
# 使用 Path 规范化路径
target_path = Path(extract_dir) / info.filename
# 安全检查:确保目标路径在 extract_dir 下
if not str(target_path.resolve()).startswith(Path(extract_dir).resolve().as_posix()):
print(f"跳过不安全路径: {info.filename}")
continue
zf.extract(info, extract_dir)
# 添加注释
with zipfile.ZipFile('commented.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
zf.comment = b'这是ZIP文件注释,创建于2026年5月'
zf.writestr('info.txt', '文件内容')
# 也可以为单个文件添加注释
zf.getinfo('info.txt').comment = b'单个文件注释'
ZIP64 与分卷压缩
对于超过 4GB 的大文件或包含超过 65535 个文件的 ZIP,需要启用 ZIP64 扩展。Python 的 zipfile 在需要时自动启用 ZIP64 支持。分卷压缩(split ZIP)则是将 ZIP 文件分割为多个指定大小的文件段。
import zipfile
# ZIP64 大文件支持(超过 4GB)
with zipfile.ZipFile('large_archive.zip', 'w',
compression=zipfile.ZIP_DEFLATED,
allowZip64=True) as zf:
zf.write('huge_file.dat') # 自动启用 ZIP64
# 分卷压缩(使用第三方库实现)
# Python 标准 zipfile 不直接支持创建分卷,
# 但可以使用以下方式手动分割
import shutil
def create_split_zip(input_file, chunk_size_mb=10):
"""创建分卷 ZIP"""
chunk_size = chunk_size_mb * 1024 * 1024
# 先创建完整 ZIP
with zipfile.ZipFile('temp_full.zip', 'w',
zipfile.ZIP_DEFLATED) as zf:
zf.write(input_file)
# 分割文件
with open('temp_full.zip', 'rb') as f:
chunk_num = 0
while True:
chunk = f.read(chunk_size)
if not chunk:
break
with open(f'archive.z{chunk_num:02d}', 'wb') as cf:
cf.write(chunk)
chunk_num += 1
os.remove('temp_full.zip')
print(f"已分割为 {chunk_num} 个分卷")
四、tarfile 基础操作
tarfile 模块是 Python 处理 TAR 归档的标准方式。与 zipfile 不同,tarfile 原生支持与各种压缩过滤器的组合——通过在 open() 时指定 mode 参数,可以自动处理 gzip、bzip2 和 xz 压缩的 TAR 文件。tarfile 在 Unix/Linux 生态中占据主导地位,特别适合保留文件权限、所有者信息等元数据。
创建 TAR 归档
使用 tarfile.open() 打开归档文件时,mode 参数由「文件模式 + 压缩过滤器」组成。例如 'w:gz' 表示写入并使用 gzip 压缩,'r:bz2' 表示读取 bzip2 压缩的归档。常用模式:'w'(无压缩写入)、'w:gz'(gzip 压缩)、'w:bz2'(bzip2 压缩)、'w:xz'(lzma/xz 压缩)。
import tarfile
# 创建 tar.gz 压缩归档
with tarfile.open('backup.tar.gz', 'w:gz') as tar:
tar.add('data/reports/', arcname='reports')
tar.add('config/app.ini', arcname='config/app.ini')
tar.add('logs/', arcname='logs', filter=lambda x: None if x.name.endswith('.tmp') else x)
print("tar.gz 归档创建完成")
# 创建 tar.bz2(更高压缩比)
with tarfile.open('backup.tar.bz2', 'w:bz2') as tar:
tar.add('data/')
tar.add('config/')
# 创建 tar.xz(最高压缩比)
with tarfile.open('backup.tar.xz', 'w:xz') as tar:
tar.add('data/')
tar.add('config/')
读取和提取 TAR 归档
读取 TAR 归档时,tarfile 会自动检测压缩格式。getnames() 和 getmembers() 用于列出归档内容。extractall() 和 extract() 用于提取文件。与 zipfile 相比,tarfile 的 extractall() 会默认保留文件权限和所有者信息。
import tarfile
# 读取 tar.gz 文件
with tarfile.open('backup.tar.gz', 'r:gz') as tar:
# 列出所有文件
for member in tar.getmembers():
print(f"{member.name:40s} {member.size:>10d}B "
f"权限: {oct(member.mode)[-3:]} "
f"类型: {'目录' if member.isdir() else '文件'}")
# 提取所有内容
tar.extractall(path='restored_backup/')
# 自动检测压缩格式(不指定压缩方式)
with tarfile.open('backup.tar.xz', 'r') as tar:
# 'r' 模式会自动检测 gz/bz2/xz
tar.extractall(path='restored_xz/')
# 读取特定文件内容
with tarfile.open('backup.tar.gz', 'r:gz') as tar:
f = tar.extractfile('config/app.ini')
if f:
content = f.read().decode('utf-8')
print(content)
压缩过滤器详解
tarfile 支持四种压缩过滤器:无压缩('')、gzip(':gz')、bzip2(':bz2')和 xz(':xz')。选择依据:gzip 速度最快且兼容性最好;bzip2 压缩比更高但速度慢 3-5 倍;xz 压缩比最高(通常比 bzip2 再小 30-50%)且解压速度尚可,但创建时非常消耗 CPU。在自动化办公场景中,tar.gz 是最常见的选择。
import tarfile
import os
# 对比 TAR 压缩格式
test_dir = 'data_to_archive/'
# 无压缩
with tarfile.open('test.tar', 'w') as tar:
tar.add(test_dir)
print(f"未压缩: {os.path.getsize('test.tar')} bytes")
# gzip 压缩
with tarfile.open('test.tar.gz', 'w:gz') as tar:
tar.add(test_dir)
print(f"gzip: {os.path.getsize('test.tar.gz')} bytes")
# bzip2 压缩
with tarfile.open('test.tar.bz2', 'w:bz2') as tar:
tar.add(test_dir)
print(f"bzip2: {os.path.getsize('test.tar.bz2')} bytes")
# xz 压缩
with tarfile.open('test.tar.xz', 'w:xz') as tar:
tar.add(test_dir)
print(f"xz: {os.path.getsize('test.tar.xz')} bytes")
五、tarfile 进阶功能
tarfile 提供了丰富的进阶功能,包括增量归档、排除模式、文件过滤、权限保留以及硬链接和软链接处理。这些特性使得 tarfile 在系统备份和部署场景中比 zipfile 更为灵活和强大。
文件过滤与排除模式
tar.add() 方法的 filter 参数接受一个函数,可以对每个待添加的文件进行过滤或修改。返回 None 则跳过该文件,返回 TarInfo 对象则使用修改后的元信息。这在创建排除缓存文件、临时文件的备份时非常有用。
import tarfile
def archive_filter(tar_info):
"""过滤不需要的文件"""
name = tar_info.name
# 排除 Python 缓存目录
if '__pycache__' in name or '.pyc' in name:
return None
# 排除 .git 目录
if '.git/' in name:
return None
# 排除临时文件和日志
if name.endswith(('.tmp', '.log', '.cache')):
return None
# 修改文件权限示例
if tar_info.issym() or tar_info.islnk():
tar_info.mode = 0o777 # 链接文件保持可访问
return tar_info
with tarfile.open('project_clean.tar.gz', 'w:gz') as tar:
tar.add('/path/to/project/', arcname='project',
filter=archive_filter)
# 也可以使用更简洁的列表排除方式
import fnmatch
def exclude_patterns(tar_info, patterns=['*.pyc', '__pycache__', '*.tmp']):
for pattern in patterns:
if fnmatch.fnmatch(tar_info.name, pattern):
return None
return tar_info
with tarfile.open('filtered.tar.gz', 'w:gz') as tar:
tar.add('src/', filter=exclude_patterns)
增量归档与追加模式
tarfile 不支持直接向已压缩的 tar.gz 文件追加内容(因为压缩流是连续且不可追加的)。但可以向未压缩的 .tar 文件追加内容,或者使用以下技巧实现增量备份:每次都重新创建归档并在添加时跳过已存在的文件。
import tarfile
import os
import time
def incremental_backup(source_dir, backup_file, last_backup_time=None):
"""
增量备份:仅备份最后修改时间晚于 last_backup_time 的文件
"""
with tarfile.open(backup_file, 'w:gz') as tar:
for root, dirs, files in os.walk(source_dir):
for file in files:
filepath = os.path.join(root, file)
mtime = os.path.getmtime(filepath)
if last_backup_time is None or mtime > last_backup_time:
arcname = os.path.relpath(filepath, source_dir)
tar.add(filepath, arcname=arcname)
print(f" 添加: {arcname}")
# 首次全量备份
incremental_backup('/path/to/data', 'full_backup.tar.gz')
prev_time = time.time()
# 一天后增量备份(仅备份新增和修改的文件)
time.sleep(86400)
incremental_backup('/path/to/data', 'daily_backup.tar.gz',
last_backup_time=prev_time)
# 向未压缩的 tar 追加文件
with tarfile.open('appended.tar', 'a') as tar:
tar.add('new_file.txt')
权限保留与元数据处理
tarfile 的一个突出优势是能够保留文件的 Unix 权限、所有者(uid/gid)、修改时间等元数据。在处理系统配置文件、软件部署包时,这一特性至关重要。通过修改 TarInfo 对象的属性,可以在创建归档时自定义这些元信息。
import tarfile
import pwd
import grp
# 保留权限创建归档
with tarfile.open('preserve_perm.tar.gz', 'w:gz') as tar:
tar.add('config/', filter=lambda x: x)
# 提取时保留权限
with tarfile.open('preserve_perm.tar.gz', 'r:gz') as tar:
tar.extractall(path='./restored/', numeric_owner=True)
# 自定义文件的元信息
with tarfile.open('custom_meta.tar', 'w') as tar:
# 手动创建 TarInfo 对象
info = tarfile.TarInfo(name='custom_file.txt')
info.size = len(b'Hello, custom file!')
info.mtime = int(time.time())
info.mode = 0o644 # rw-r--r--
info.uid = 1000
info.gid = 1000
info.uname = 'developer'
info.gname = 'developers'
info.type = tarfile.REGTYPE # 普通文件
tar.addfile(info, io.BytesIO(b'Hello, custom file!'))
硬链接与软链接处理
tarfile 能够识别和处理文件系统中的硬链接和软链接(符号链接)。在归档时,如果是硬链接且链接目标已在归档中,tarfile 会自动将其存储为硬链接而非重复存储文件内容,从而节省空间。软链接则以链接本身的形式存储。
import tarfile
import os
# 创建包含链接的示例
os.makedirs('link_test', exist_ok=True)
with open('link_test/original.txt', 'w') as f:
f.write('原始文件内容')
os.link('link_test/original.txt', 'link_test/hardlink.txt') # 硬链接
os.symlink('original.txt', 'link_test/softlink.txt') # 软链接
# 归档并查看链接处理方式
with tarfile.open('links_archive.tar', 'w') as tar:
tar.add('link_test/')
# 提取并检查链接是否保留
with tarfile.open('links_archive.tar', 'r') as tar:
for member in tar.getmembers():
if member.issym():
print(f"符号链接: {member.name} -> {member.linkname}")
elif member.islnk():
print(f"硬链接: {member.name} -> {member.linkname}")
else:
print(f"普通文件: {member.name}")
tar.extractall(path='./extracted_links/')
六、gzip / bzip2 / lzma 单文件压缩
除了 zipfile 和 tarfile 这两个归档模块,Python 还提供了 gzip、bz2 和 lzma 三个专门进行单文件压缩的模块。它们不处理归档逻辑,而是聚焦于数据的压缩与解压。这些模块既可以独立使用(用于压缩单个文件或数据流),也可以与 tarfile 配合(作为底层的压缩引擎)。
gzip 模块
gzip 模块提供了对 GNU gzip 格式的读写支持。其使用方式与内置的 open() 函数非常相似——可以用 gzip.open() 直接读写 .gz 文件。它同样支持文件对象级别的操作,适用于逐行处理大文件。
import gzip
# 写入 gzip 压缩文件
data = b"这是一段需要压缩的文本内容。gzip 适合单文件压缩。"
with gzip.open('example.txt.gz', 'wb') as f:
f.write(data)
# 读取 gzip 压缩文件
with gzip.open('example.txt.gz', 'rb') as f:
content = f.read()
print(content.decode('utf-8'))
# 指定压缩级别(1-9)
with gzip.open('best_compress.gz', 'wb', compresslevel=9) as f:
f.write(data)
# 追加到已存在的 gzip 文件
with gzip.open('example.txt.gz', 'ab') as f:
f.write(b"\n更多追加内容")
# 逐行读取(适合处理压缩的日志文件)
with gzip.open('large_log.gz', 'rt', encoding='utf-8') as f:
for line in f:
if 'ERROR' in line:
print(f"发现错误: {line.strip()}")
bzip2 模块
bz2 模块提供了对 bzip2 格式的支持。bzip2 的压缩比通常比 gzip 高 15-30%,但压缩速度慢 3-5 倍。在需要更小的文件体积且压缩时间不是瓶颈时(如发布包、离线传输)非常适合使用 bz2 格式。
import bz2
# 写入 bzip2 压缩文件
data = b"需要压缩的数据内容。bzip2 压缩比更高。"
with bz2.open('example.bz2', 'wb') as f:
f.write(data)
# 读取 bzip2 压缩文件
with bz2.open('example.bz2', 'rb') as f:
content = f.read()
print(content.decode('utf-8'))
# 文本模式读写
with bz2.open('example.bz2', 'wt', encoding='utf-8') as f:
f.write("文本模式写入的 bzip2 压缩文件")
# 内存压缩(不写文件)
compressed = bz2.compress(data * 100)
print(f"原始大小: {len(data * 100)}, 压缩后: {len(compressed)}")
# 内存解压
decompressed = bz2.decompress(compressed)
print(decompressed.decode('utf-8'))
lzma 模块
lzma 模块支持 .xz 和 .lzma 两种格式。LZMA 算法提供三者中最高的压缩比,通常比 bzip2 再小 30-50%,同时解压速度也比较快。其创建速度是三者中最慢的,但解压速度接近 gzip。对于发布包、软件分发和存档场景非常理想。
import lzma
import os
# 使用 lzma.open(默认 .xz 格式)
data = b"需要压缩的数据。LZMA 提供最高压缩比。"
with lzma.open('example.xz', 'wb') as f:
f.write(data)
# 指定压缩级别(0-9)
with lzma.open('example_lv9.xz', 'wb', preset=9) as f:
f.write(data)
# 内存压缩
compressed = lzma.compress(data * 100)
decompressed = lzma.decompress(compressed)
# 对比三种格式的压缩效果
import gzip, bz2, lzma
sample = b"A" * 10000 + b"B" * 10000 + b"C" * 50000
gzip_data = gzip.compress(sample)
bz2_data = bz2.compress(sample)
lzma_data = lzma.compress(sample)
print(f"原始大小: {len(sample)} bytes")
print(f"gzip: {len(gzip_data)} bytes (压缩比: {len(gzip_data)/len(sample):.2%})")
print(f"bz2: {len(bz2_data)} bytes (压缩比: {len(bz2_data)/len(sample):.2%})")
print(f"lzma: {len(lzma_data)} bytes (压缩比: {len(lzma_data)/len(sample):.2%})")
七、内存流与网络传输
在实际应用中,很多时候我们并不需要将压缩文件写入磁盘——而是希望在内存中完成压缩后直接通过网络传输。Python 的 BytesIO 配合压缩模块可以实现完全在内存中进行的压缩和解压操作,避免创建临时文件,特别适合 Web 应用和微服务场景。
BytesIO 内存流压缩
使用 io.BytesIO 作为虚拟文件缓冲区,结合 zipfile 或 tarfile 的文件类接口,可以实现纯内存压缩。这种方法在 Web API 中非常常见——例如从数据库读取文件后直接压缩返回给客户端。
import zipfile
import io
# 在内存中创建 ZIP 文件
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.writestr('document.txt', '这是文档内容')
zf.writestr('data.csv', 'id,name,age\n1,Alice,30\n2,Bob,25')
# 获取压缩后的数据(无需写入磁盘)
zip_data = buffer.getvalue()
print(f"内存 ZIP 大小: {len(zip_data)} bytes")
# 从内存中读取 ZIP
read_buffer = io.BytesIO(zip_data)
with zipfile.ZipFile(read_buffer, 'r') as zf:
for name in zf.namelist():
content = zf.read(name)
print(f"{name}: {content.decode('utf-8')[:50]}...")
网络流式压缩传输
对于大文件,全部读入内存再压缩可能占用太多内存。可以使用流式处理——从源文件边读边压缩,或使用分块传输编码。下面的示例展示了如何将多个文件逐个添加到内存 ZIP 中并通过 HTTP 响应返回。
import zipfile
import io
from flask import Flask, Response, send_file # 仅在 Flask 应用中使用
app = Flask(__name__)
@app.route('/download/reports')
def download_reports():
"""从数据库读取文件并在内存中压缩后返回"""
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
# 模拟从数据库读取文件
reports = {
'report_2026_01.csv': 'date,revenue\n2026-01,100000',
'report_2026_02.csv': 'date,revenue\n2026-02,120000',
'summary.txt': '2026年第一季度总结报告',
}
for filename, content in reports.items():
zf.writestr(filename, content)
buffer.seek(0)
return send_file(
buffer,
mimetype='application/zip',
as_attachment=True,
download_name='reports.zip'
)
# 流式压缩大文件(使用 tarfile + gzip)
def stream_compress_files(file_paths):
"""将多个文件流式压缩为 tar.gz"""
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode='w:gz') as tar:
for filepath in file_paths:
# 直接将文件内容添加到 tar 包
with open(filepath, 'rb') as f:
content = f.read()
info = tarfile.TarInfo(name=filepath.split('/')[-1])
info.size = len(content)
tar.addfile(info, io.BytesIO(content))
print(f"已添加: {filepath}")
buffer.seek(0)
return buffer
将压缩数据直接上传到云存储
在云原生架构中,通常需要将压缩后的数据直接上传到 S3、OSS 或 MinIO 等对象存储。结合内存流压缩和云存储 SDK,可以实现零临时文件的纯内存管道。
import zipfile
import io
import boto3 # 仅示例,实际需安装
def upload_compressed_to_s3(files_dict, bucket, key):
"""
在内存中压缩数据并直接上传到 S3
files_dict: {filename: content_string_or_bytes}
"""
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for name, content in files_dict.items():
if isinstance(content, str):
zf.writestr(name, content.encode('utf-8'))
else:
zf.writestr(name, content)
buffer.seek(0)
s3_client = boto3.client('s3')
s3_client.upload_fileobj(buffer, bucket, key, {
'ContentType': 'application/zip'
})
print(f"已上传 s3://{bucket}/{key}")
# 使用示例
files = {
'data1.json': '{"key": "value1"}',
'data2.json': '{"key": "value2"}',
}
# upload_compressed_to_s3(files, 'my-bucket', 'archive/data.zip')
八、批量归档与自动化
在真实的办公自动化场景中,我们通常需要处理的是批量文件和定期任务——例如每天压缩日志文件、按日期归档项目文件、创建增量备份等。本章介绍如何将前面学到的压缩技术组合起来,构建实用的批量归档自动化脚本。
目录批量压缩
批量处理目录的核心是遍历文件树,根据规则选择需要归档的文件。以下函数通过 os.walk 递归遍历目录,并使用 fnmatch 模式匹配筛选文件。它还支持排除规则,可以灵活控制归档内容。
import zipfile
import os
import fnmatch
from datetime import datetime
def batch_compress_directories(source_dirs, output_zip,
include_patterns=['*'],
exclude_patterns=[]):
"""
批量压缩多个目录到单个 ZIP 文件
source_dirs: 源目录列表
output_zip: 输出 ZIP 文件路径
include_patterns: 包含的文件通配符列表
exclude_patterns: 排除的文件通配符列表
"""
with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
for src_dir in source_dirs:
if not os.path.exists(src_dir):
print(f"跳过不存在的目录: {src_dir}")
continue
for root, dirs, files in os.walk(src_dir):
for file in files:
filepath = os.path.join(root, file)
rel_path = os.path.relpath(filepath, start=os.path.dirname(src_dir))
# 检查包含模式
if not any(fnmatch.fnmatch(file, p) for p in include_patterns):
continue
# 检查排除模式
if any(fnmatch.fnmatch(file, p) for p in exclude_patterns):
continue
zf.write(filepath, arcname=rel_path)
print(f" 已添加: {rel_path}")
total = sum(f.file_size for f in zf.infolist())
print(f"归档完成: {output_zip}, 共 {len(zf.infolist())} 个文件")
# 压缩项目目录,排除缓存和虚拟环境
batch_compress_directories(
['/path/to/project1', '/path/to/project2'],
'projects_backup.zip',
include_patterns=['*.py', '*.txt', '*.json', '*.yaml', '*.md'],
exclude_patterns=['__pycache__', '*.pyc', 'node_modules', '.git']
)
按日期归档
定期自动归档是办公自动化的常见需求。以下脚本实现了按日期命名归档文件的功能,可以配合操作系统的定时任务(cron / 任务计划程序)实现周期性的自动化备份。
import zipfile
import os
from datetime import datetime, timedelta
def archive_by_date(source_dir, output_dir,
archive_type='daily',
date_filter=None):
"""
按日期归档文件
archive_type: 'daily' | 'weekly' | 'monthly'
date_filter: 可选,只归档指定日期之后的文件
"""
os.makedirs(output_dir, exist_ok=True)
# 生成归档文件名
today = datetime.now()
if archive_type == 'daily':
archive_name = today.strftime('archive_%Y%m%d.zip')
elif archive_type == 'weekly':
archive_name = today.strftime('archive_%Y_week%W.zip')
elif archive_type == 'monthly':
archive_name = today.strftime('archive_%Y_%m.zip')
output_path = os.path.join(output_dir, archive_name)
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(source_dir):
for file in files:
filepath = os.path.join(root, file)
mtime = os.path.getmtime(filepath)
mtime_dt = datetime.fromtimestamp(mtime)
# 根据归档类型过滤文件
if date_filter and mtime_dt < date_filter:
continue
arcname = os.path.relpath(filepath, source_dir)
zf.write(filepath, arcname=arcname)
print(f"归档完成: {output_path}")
# 每日归档(归档今天修改的文件)
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
archive_by_date('/var/log/myapp/', '/backups/logs/',
archive_type='daily', date_filter=today_start)
归档完整性校验
压缩文件的完整性至关重要——损坏的归档可能导致数据丢失。Python 的 zipfile 提供了 testzip() 方法检查 ZIP 文件完整性,而 tarfile 可以通过校验和(checksum)验证每个文件的完整性。下面的示例展示了如何进行完整性检查和修复建议。
import zipfile
import hashlib
import json
def create_archive_with_checksum(source_dir, output_zip):
"""
创建携带 SHA256 校验的归档
"""
checksums = {}
with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(source_dir):
for file in files:
filepath = os.path.join(root, file)
arcname = os.path.relpath(filepath, source_dir)
# 计算文件 SHA256
sha256 = hashlib.sha256()
with open(filepath, 'rb') as f:
while chunk := f.read(8192):
sha256.update(chunk)
checksums[arcname] = sha256.hexdigest()
zf.write(filepath, arcname=arcname)
# 将校验和信息写入归档
zf.writestr('.checksums.json', json.dumps(checksums, indent=2))
print(f"归档创建完成,携带 {len(checksums)} 个文件的校验和")
def verify_archive_integrity(zip_path):
"""
验证归档完整性
"""
with zipfile.ZipFile(zip_path, 'r') as zf:
# 1. 检查 ZIP 结构完整性
bad_file = zf.testzip()
if bad_file:
print(f"ZIP 结构损坏: {bad_file}")
return False
# 2. 验证校验和
if '.checksums.json' in zf.namelist():
stored_checksums = json.loads(zf.read('.checksums.json'))
for arcname, stored_hash in stored_checksums.items():
content = zf.read(arcname)
actual_hash = hashlib.sha256(content).hexdigest()
if actual_hash != stored_hash:
print(f"校验和失败: {arcname}")
return False
print(f"完整性验证通过: {zip_path}")
return True
多线程压缩加速
对于包含大量小文件的归档操作,可以使用多线程并行读取文件来加速。需要特别注意的是:ZIP 写入操作本身是线程不安全的,需要使用 Lock 保护,且最终的写入必须在主线程完成。以下示例展示了使用 ThreadPoolExecutor 并行计算文件内容,然后同步写入 ZIP 的模式。
import zipfile
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
def parallel_compress(source_dir, output_zip, max_workers=4):
"""
使用多线程加速文件读取,然后创建压缩归档
"""
# 收集所有文件路径
file_list = []
for root, dirs, files in os.walk(source_dir):
for file in files:
filepath = os.path.join(root, file)
arcname = os.path.relpath(filepath, source_dir)
file_list.append((filepath, arcname))
print(f"共发现 {len(file_list)} 个文件,使用 {max_workers} 线程读取")
# 多线程读取文件内容
file_data = {}
lock = Lock()
def read_file(filepath, arcname):
with open(filepath, 'rb') as f:
content = f.read()
with lock:
file_data[arcname] = content
return arcname, len(content)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(read_file, fp, an)
for fp, an in file_list]
for future in as_completed(futures):
arcname, size = future.result()
print(f" 已读取: {arcname} ({size} bytes)")
# 单线程写入 ZIP(ZIP 写入不是线程安全的)
with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
for arcname, content in file_data.items():
zf.writestr(arcname, content)
print(f"并行压缩完成: {output_zip}")
九、实战案例
理论知识的最终目的是解决实际工作中的问题。本章通过三个完整的实战案例,将前面所学的压缩解压技术综合运用起来。每个案例都经过验证,可以直接应用于生产环境或稍作修改满足特定需求。
案例一:日志自动压缩归档系统
服务器日志文件增长非常迅速,如果不及时清理会占满磁盘。本案例实现了一个日志自动压缩归档系统,它按天扫描日志目录,将超过指定天数的日志文件压缩归档,并在确认压缩成功后删除原始文件。同时会生成归档清单文件记录每次操作。
import zipfile
import os
import gzip
import shutil
from datetime import datetime, timedelta
import json
class LogArchiveSystem:
"""
日志自动压缩归档系统
功能:将旧日志压缩归档,释放磁盘空间
"""
def __init__(self, log_dir, archive_dir, retention_days=7):
self.log_dir = log_dir
self.archive_dir = archive_dir
self.retention_days = retention_days
os.makedirs(archive_dir, exist_ok=True)
def archive_old_logs(self):
"""归档超过保留天数的日志文件"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
archive_manifest = []
for file in os.listdir(self.log_dir):
filepath = os.path.join(self.log_dir, file)
if not os.path.isfile(filepath):
continue
mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
if mtime >= cutoff_date:
continue # 保留最近的文件
# 为每一天创建一个归档文件
date_str = mtime.strftime('%Y%m%d')
archive_file = os.path.join(
self.archive_dir, f'logs_{date_str}.zip')
# 追加到当天的归档
with zipfile.ZipFile(archive_file, 'a',
zipfile.ZIP_DEFLATED) as zf:
zf.write(filepath, arcname=file)
archive_manifest.append({
'file': file,
'archived_to': archive_file,
'original_size': os.path.getsize(filepath),
'archived_at': datetime.now().isoformat()
})
# 确认写入成功后删除原始文件
os.remove(filepath)
print(f"已归档并删除: {file} -> {archive_file}")
# 保存归档清单
if archive_manifest:
manifest_path = os.path.join(
self.archive_dir, 'archive_manifest.json')
with open(manifest_path, 'a') as f:
for entry in archive_manifest:
f.write(json.dumps(entry) + '\n')
return len(archive_manifest)
def restore_logs(self, archive_date_str, restore_dir):
"""从归档恢复指定日期的日志"""
archive_file = os.path.join(
self.archive_dir, f'logs_{archive_date_str}.zip')
if not os.path.exists(archive_file):
print(f"归档文件不存在: {archive_file}")
return
os.makedirs(restore_dir, exist_ok=True)
with zipfile.ZipFile(archive_file, 'r') as zf:
zf.extractall(path=restore_dir)
print(f"已恢复 {len(zf.namelist())} 个文件到 {restore_dir}")
# 使用示例
archiver = LogArchiveSystem(
log_dir='/var/log/myapp/',
archive_dir='/backups/logs/',
retention_days=7 # 保留最近 7 天的日志
)
count = archiver.archive_old_logs()
print(f"本次归档了 {count} 个文件")
案例二:文件备份工具
一个完善的文件备份工具需要支持增量备份、多格式输出、备份验证和备份清理等功能。本案例实现了一个功能完备的备份工具,可以根据需要选择 ZIP 或 TAR 格式,支持按文件模式过滤,并提供了备份完整性验证功能。
import tarfile
import zipfile
import os
import hashlib
from datetime import datetime
from pathlib import Path
class BackupTool:
"""
通用文件备份工具
支持 ZIP / TAR 格式,增量备份,完整性校验
"""
def __init__(self, backup_dir):
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
def create_backup(self, source_dirs, format='tar.gz',
include_patterns=None, exclude_patterns=None,
comment=''):
"""创建备份"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if format == 'zip':
backup_file = self.backup_dir / f'backup_{timestamp}.zip'
self._create_zip_backup(source_dirs, backup_file,
include_patterns, exclude_patterns)
elif format == 'tar.gz':
backup_file = self.backup_dir / f'backup_{timestamp}.tar.gz'
self._create_tar_backup(source_dirs, backup_file, 'w:gz',
include_patterns, exclude_patterns)
elif format == 'tar.xz':
backup_file = self.backup_dir / f'backup_{timestamp}.tar.xz'
self._create_tar_backup(source_dirs, backup_file, 'w:xz',
include_patterns, exclude_patterns)
# 计算备份文件的校验和
sha256 = hashlib.sha256()
with open(backup_file, 'rb') as f:
while chunk := f.read(8192):
sha256.update(chunk)
checksum_file = backup_file.with_suffix(backup_file.suffix + '.sha256')
checksum_file.write_text(sha256.hexdigest() + ' ' + backup_file.name)
print(f"备份完成: {backup_file}")
print(f"SHA256: {sha256.hexdigest()}")
return str(backup_file)
def _create_zip_backup(self, source_dirs, output_file,
include_patterns, exclude_patterns):
"""创建 ZIP 格式备份"""
import fnmatch
with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as zf:
for src in source_dirs:
src_path = Path(src)
if not src_path.exists():
print(f"跳过: {src}")
continue
if src_path.is_file():
zf.write(src_path, arcname=src_path.name)
else:
for filepath in src_path.rglob('*'):
if filepath.is_file():
rel_path = filepath.relative_to(src_path.parent)
zf.write(filepath, arcname=str(rel_path))
def _create_tar_backup(self, source_dirs, output_file, mode,
include_patterns, exclude_patterns):
"""创建 TAR 格式备份"""
with tarfile.open(output_file, mode) as tar:
for src in source_dirs:
tar.add(src, arcname=Path(src).name,
filter=lambda x: x)
def verify_backup(self, backup_path):
"""验证备份完整性"""
backup_path = Path(backup_path)
checksum_file = backup_path.with_suffix(backup_path.suffix + '.sha256')
if checksum_file.exists():
stored_hash = checksum_file.read_text().split()[0]
sha256 = hashlib.sha256()
with open(backup_path, 'rb') as f:
while chunk := f.read(8192):
sha256.update(chunk)
if sha256.hexdigest() != stored_hash:
print("校验和不匹配!备份文件可能已损坏")
return False
# 验证归档结构
suffix = backup_path.suffix
try:
if suffix == '.zip':
with zipfile.ZipFile(backup_path, 'r') as zf:
bad = zf.testzip()
if bad:
print(f"ZIP 文件损坏: {bad}")
return False
elif suffix in ('.gz', '.xz', '.bz2'):
with tarfile.open(backup_path, 'r') as tar:
members = tar.getmembers()
print(f"归档包含 {len(members)} 个文件")
except Exception as e:
print(f"验证失败: {e}")
return False
print("备份完整性验证通过")
return True
# 使用示例
bt = BackupTool('/backups/project')
bt.create_backup(
source_dirs=['/path/to/source1', '/path/to/source2'],
format='tar.gz'
)
案例三:项目发布包生成器
在软件开发中,经常需要生成可供分发的发布包。本案例实现了一个项目发布包生成器,它会自动收集指定文件、生成版本信息、排除不需要的文件(如 .git、__pycache__ 等),并最终生成结构清晰的发布归档。
import zipfile
import os
import json
from datetime import datetime
class ReleasePackager:
"""
项目发布包生成器
自动收集项目文件,排除开发环境文件,生成版本归档
"""
def __init__(self, project_name, version, source_dir):
self.project_name = project_name
self.version = version
self.source_dir = source_dir
self.exclude_dirs = {
'.git', '__pycache__', 'node_modules',
'.idea', '.vscode', 'venv', '.venv', 'dist', 'build'
}
self.exclude_patterns = {
'*.pyc', '*.pyo', '*.so', '*.dll', '*.pdb',
'.DS_Store', 'Thumbs.db', '*.log', '*.tmp'
}
def create_release_package(self, output_dir='./dist/'):
"""生成发布包"""
os.makedirs(output_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
package_name = f'{self.project_name}-v{self.version}-{timestamp}'
output_path = os.path.join(output_dir, f'{package_name}.zip')
import fnmatch
file_count = 0
total_size = 0
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
# 添加版本信息文件
version_info = {
'project': self.project_name,
'version': self.version,
'build_time': datetime.now().isoformat(),
'python_version': os.sys.version
}
zf.writestr(f'{package_name}/VERSION.json',
json.dumps(version_info, indent=2))
# 遍历并添加项目文件
for root, dirs, files in os.walk(self.source_dir):
# 排除目录
dirs[:] = [d for d in dirs if d not in self.exclude_dirs]
for file in files:
# 排除匹配模式的文件
if any(fnmatch.fnmatch(file, p) for p in self.exclude_patterns):
continue
filepath = os.path.join(root, file)
arcname = os.path.join(
package_name,
os.path.relpath(filepath, self.source_dir)
)
zf.write(filepath, arcname=arcname)
file_count += 1
total_size += os.path.getsize(filepath)
# 生成发布说明
readme = f"""
# {self.project_name} v{self.version}
- 构建时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 文件数量: {file_count}
- 总大小: {total_size:,} bytes
- 发布包: {output_path}
"""
readme_path = os.path.join(output_dir, f'{package_name}_RELEASE_NOTES.md')
with open(readme_path, 'w', encoding='utf-8') as f:
f.write(readme.strip())
print(f"发布包生成完成: {output_path}")
print(f"包含 {file_count} 个文件,共 {total_size:,} bytes")
print(f"发布说明: {readme_path}")
return output_path
# 使用示例
packager = ReleasePackager(
project_name='data-processor',
version='2.1.0',
source_dir='/path/to/project'
)
packager.create_release_package()
本章小结
通过以上三个实战案例,可以看到 Python 的压缩解压模块在实际生产环境中的广泛应用。从日志管理、文件备份到软件发布,zipfile、tarfile、gzip、bz2 和 lzma 等标准库模块提供了完整的解决方案。掌握这些工具后,可以轻松构建自动化的文件管理和数据处理管道。
在实际工作中,建议根据以下原则选择合适的方案:需要跨平台兼容和密码保护时选择 ZIP 格式;需要保留文件权限和在 Linux 环境中使用选择 TAR 格式;需要最高压缩比时选择 XZ 或 BZIP2;需要在 Web 应用中直接返回压缩数据时使用内存流模式。
核心要点总结:
格式选择 :ZIP 适合跨平台通用场景,TAR.GZ 适合 Linux 生态,XZ 追求最高压缩比
密码保护 :zipfile 支持传统 PKZIP 加密,但不支持 AES;需要更高安全性可借助第三方库
内存流压缩 :使用 BytesIO 配合压缩模块,实现零临时文件的纯内存压缩,非常适合 Web API
批量自动化 :结合 os.walk、多线程、日期判断等,构建定时自动归档系统
完整性校验 :始终在归档中加入校验和(SHA256),并在使用前验证归档完整性
安全提示 :提取 ZIP 时注意路径穿越攻击,始终对文件名进行规范化验证
性能取舍 :gzip 速度快压缩比低,bzip2 中等,lzma/xz 压缩比最高但创建最慢
工具链 :所有模块均为 Python 标准库自带,零依赖即可在任意 Python 环境中使用