zipfile/tarfile:压缩解压自动化

Python 办公自动化专题 · Python压缩与归档操作完全指南

专题:Python 自动化办公系统学习

关键词:Python, 自动化办公, zipfile, tarfile, 压缩解压, ZIP, TAR, 文件归档, Python自动化

一、压缩格式概述

在日常办公和系统运维中,文件压缩与归档是最常见也最基础的需求之一。Python 标准库提供了丰富的压缩与归档模块,让我们能够以纯 Python 代码处理几乎所有主流压缩格式,无需依赖外部命令行工具。本章首先对这些格式和对应的 Python 模块做全局性介绍。

压缩格式对比表

格式 Python 模块 算法 归档支持 压缩比 速度 典型后缀
ZIP zipfile DEFLATE / BZIP2 / LZMA 是(多文件) 中等 .zip
TAR tarfile 无(裸归档) 是(多文件) 无压缩 极快 .tar
TAR.GZ tarfile + gzip DEFLATE (gzip) 是(多文件) 中等 .tar.gz / .tgz
TAR.BZ2 tarfile + bz2 BZIP2 是(多文件) .tar.bz2 / .tbz2
TAR.XZ tarfile + lzma LZMA 是(多文件) 极高 中等 .tar.xz / .txz
GZIP gzip DEFLATE 否(单文件) 中等 .gz
BZIP2 bz2 BZIP2 否(单文件) .bz2
LZMA / XZ lzma LZMA 否(单文件) 极高 中等 .xz / .lzma

关键概念:归档 vs 压缩

归档(archiving)指的是将多个文件打包成一个文件的过程,而压缩(compression)则是通过算法减少数据体积。TAR 格式仅做归档不做压缩(因此常与其他压缩算法配合使用),而 ZIP 格式同时支持归档和压缩。理解这一区别有助于在实际项目中选择正确的工具。

选择何种格式主要取决于以下因素:跨平台兼容性优先时选择 ZIP(Windows / macOS / Linux 原生支持);追求高压缩比时选择 TAR.XZ 或 TAR.BZ2;需要流式处理日志之类的大文件时使用 GZIP 单文件压缩;需要保留 Unix 文件权限时使用 TAR 格式。

# 查看 Python 标准库中可用的压缩模块 import zipfile, tarfile, gzip, bz2, lzma print("zipfile version info:", zipfile.__name__) print("tarfile version info:", tarfile.__name__) # 所有模块均为 Python 标准库自带,无需 pip install

二、zipfile 基础操作

zipfile 是 Python 标准库中处理 ZIP 格式的核心模块。它支持创建、读取、写入和提取 ZIP 文件,并且内置对 DEFLATE、BZIP2 和 LZMA 三种压缩算法的支持。zipfile 最突出的优势是跨平台兼容性——ZIP 文件几乎可以在任何操作系统上无需额外工具直接打开。

创建 ZIP 文件

使用 ZipFile 类时,需要指定模式参数:'w' 为写入模式(会覆盖已有文件),'a' 为追加模式,'r' 为读取模式。通过 compression 参数选择压缩算法,可选值为 ZIP_STORED(仅存储不压缩)、ZIP_DEFLATED(DEFLATE 算法)、ZIP_BZIP2(BZIP2 算法)和 ZIP_LZMA(LZMA 算法)。

import zipfile import os # 创建 ZIP 文件并写入多个文件 with zipfile.ZipFile('archive.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf: # 将本地文件添加到 ZIP 中 zf.write('data/report1.csv', arcname='reports/report1.csv') zf.write('data/report2.csv', arcname='reports/report2.csv') # 使用 writestr 直接写入字符串内容 zf.writestr('config/settings.json', '{"theme": "dark", "lang": "zh"}') # 写入二进制数据 zf.writestr('data/raw.bin', b'\x00\x01\x02\x03') print("ZIP 文件创建完成")

读取和提取 ZIP 文件

读取 ZIP 文件时,可以使用 namelist() 获取所有文件列表,infolist() 获取包含元信息的 ZipInfo 对象列表,extract() 和 extractall() 方法用于提取文件。extractall() 支持 path 参数指定提取目录和 members 参数筛选提取的文件子集。

import zipfile # 读取 ZIP 文件 with zipfile.ZipFile('archive.zip', 'r') as zf: # 列出所有文件 for name in zf.namelist(): print(f" {name}") # 获取文件详细信息 for info in zf.infolist(): print(f"{info.filename} - 原大小: {info.file_size} B, " f"压缩后: {info.compress_size} B, " f"压缩率: {info.compress_size / info.file_size:.1%}") # 读取特定文件内容(不提取到磁盘) content = zf.read('config/settings.json') print("settings.json 内容:", content.decode('utf-8')) # 提取到指定目录 zf.extractall(path='extracted_files') # 只提取特定文件 zf.extract('config/settings.json', path='extracted_config')

压缩模式选择

ZIP_STORED 模式不进行压缩,仅将文件原样打包,速度最快但体积最大,适合已经压缩过的文件(如 JPEG、MP4)。ZIP_DEFLATED 是默认且最通用的压缩算法,在速度和压缩比之间取得了良好的平衡。ZIP_BZIP2 提供更高的压缩比但速度较慢,ZIP_LZMA 压缩比最高但兼容性稍差——某些旧版解压工具可能不支持。

import zipfile # 对比不同压缩算法的效果 with open('large_file.txt', 'r') as f: data = f.read() # STORED(不压缩) with zipfile.ZipFile('test_stored.zip', 'w', zipfile.ZIP_STORED) as zf: zf.writestr('large_file.txt', data) # DEFLATED(默认压缩) with zipfile.ZipFile('test_deflated.zip', 'w', zipfile.ZIP_DEFLATED) as zf: zf.writestr('large_file.txt', data) # BZIP2(高压缩比) with zipfile.ZipFile('test_bzip2.zip', 'w', zipfile.ZIP_BZIP2) as zf: zf.writestr('large_file.txt', data) import os for name in ['test_stored.zip', 'test_deflated.zip', 'test_bzip2.zip']: print(f"{name}: {os.path.getsize(name)} bytes")

三、zipfile 进阶功能

在掌握基础操作之后,zipfile 还提供了许多进阶功能,包括密码保护、压缩级别控制、路径规范化、ZIP64 大文件支持以及分卷 ZIP 等高级特性,这些功能在生产环境中非常实用。

密码加密与解密

zipfile 支持基于 PKZIP 2.0 加密的 ZIP 文件(传统加密)。使用 setpassword() 方法设置全局密码,或在提取时传入 pwd 参数。需要注意:标准 zipfile 模块不支持更安全的 AES-256 加密(如需 AES 加密,可考虑 pyzipper 第三方库)。

import zipfile # 创建加密 ZIP 文件 with zipfile.ZipFile('encrypted.zip', 'w', zipfile.ZIP_DEFLATED) as zf: zf.setpassword(b'my_secret_password') zf.write('confidential.txt', arcname='secret/confidential.txt') zf.writestr('notes.txt', '这是加密内容') # 读取加密 ZIP 文件 with zipfile.ZipFile('encrypted.zip', 'r') as zf: zf.setpassword(b'my_secret_password') # 列出所有文件(列表不需要密码) for name in zf.namelist(): print(f"文件: {name}") # 读取内容时需要密码 content = zf.read('notes.txt') print(content.decode('utf-8'))

压缩级别控制

对于 ZIP_DEFLATED 算法,可以通过 compresslevel 参数控制压缩级别(1-9),其中 1 为最快但压缩比最低,9 为最慢但压缩比最高。默认值为 6,在绝大多数场景下是合理的选择。

import zipfile # 测试不同压缩级别的效果 data = "Hello, World! " * 10000 # 可重复数据 for level in range(1, 10): with zipfile.ZipFile( f'compress_level_{level}.zip', 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=level ) as zf: zf.writestr('data.txt', data) import os for level in range(1, 10): size = os.path.getsize(f'compress_level_{level}.zip') print(f"级别 {level}: {size} bytes") # 对于 BZIP2 算法,compresslevel 同样有效 with zipfile.ZipFile( 'bzip2_best.zip', 'w', compression=zipfile.ZIP_BZIP2, compresslevel=9 ) as zf: zf.writestr('data.txt', data)

路径控制与安全处理

zipfile 的 write() 方法使用 arcname 参数控制文件在 ZIP 内部的存储路径。如果不指定 arcname,Python 会使用文件的完整路径,这可能导致解压时目录结构混乱。另外,提取文件时要注意路径穿越攻击——恶意构造的 ZIP 文件可能包含 ../ 路径。

import zipfile # 安全的路径处理和提取 from pathlib import Path def safe_extract(zip_path: str, extract_dir: str): """安全提取 ZIP,防止路径穿越攻击""" with zipfile.ZipFile(zip_path, 'r') as zf: for info in zf.infolist(): # 使用 Path 规范化路径 target_path = Path(extract_dir) / info.filename # 安全检查:确保目标路径在 extract_dir 下 if not str(target_path.resolve()).startswith(Path(extract_dir).resolve().as_posix()): print(f"跳过不安全路径: {info.filename}") continue zf.extract(info, extract_dir) # 添加注释 with zipfile.ZipFile('commented.zip', 'w', zipfile.ZIP_DEFLATED) as zf: zf.comment = b'这是ZIP文件注释,创建于2026年5月' zf.writestr('info.txt', '文件内容') # 也可以为单个文件添加注释 zf.getinfo('info.txt').comment = b'单个文件注释'

ZIP64 与分卷压缩

对于超过 4GB 的大文件或包含超过 65535 个文件的 ZIP,需要启用 ZIP64 扩展。Python 的 zipfile 在需要时自动启用 ZIP64 支持。分卷压缩(split ZIP)则是将 ZIP 文件分割为多个指定大小的文件段。

import zipfile # ZIP64 大文件支持(超过 4GB) with zipfile.ZipFile('large_archive.zip', 'w', compression=zipfile.ZIP_DEFLATED, allowZip64=True) as zf: zf.write('huge_file.dat') # 自动启用 ZIP64 # 分卷压缩(使用第三方库实现) # Python 标准 zipfile 不直接支持创建分卷, # 但可以使用以下方式手动分割 import shutil def create_split_zip(input_file, chunk_size_mb=10): """创建分卷 ZIP""" chunk_size = chunk_size_mb * 1024 * 1024 # 先创建完整 ZIP with zipfile.ZipFile('temp_full.zip', 'w', zipfile.ZIP_DEFLATED) as zf: zf.write(input_file) # 分割文件 with open('temp_full.zip', 'rb') as f: chunk_num = 0 while True: chunk = f.read(chunk_size) if not chunk: break with open(f'archive.z{chunk_num:02d}', 'wb') as cf: cf.write(chunk) chunk_num += 1 os.remove('temp_full.zip') print(f"已分割为 {chunk_num} 个分卷")

四、tarfile 基础操作

tarfile 模块是 Python 处理 TAR 归档的标准方式。与 zipfile 不同,tarfile 原生支持与各种压缩过滤器的组合——通过在 open() 时指定 mode 参数,可以自动处理 gzip、bzip2 和 xz 压缩的 TAR 文件。tarfile 在 Unix/Linux 生态中占据主导地位,特别适合保留文件权限、所有者信息等元数据。

创建 TAR 归档

使用 tarfile.open() 打开归档文件时,mode 参数由「文件模式 + 压缩过滤器」组成。例如 'w:gz' 表示写入并使用 gzip 压缩,'r:bz2' 表示读取 bzip2 压缩的归档。常用模式:'w'(无压缩写入)、'w:gz'(gzip 压缩)、'w:bz2'(bzip2 压缩)、'w:xz'(lzma/xz 压缩)。

import tarfile # 创建 tar.gz 压缩归档 with tarfile.open('backup.tar.gz', 'w:gz') as tar: tar.add('data/reports/', arcname='reports') tar.add('config/app.ini', arcname='config/app.ini') tar.add('logs/', arcname='logs', filter=lambda x: None if x.name.endswith('.tmp') else x) print("tar.gz 归档创建完成") # 创建 tar.bz2(更高压缩比) with tarfile.open('backup.tar.bz2', 'w:bz2') as tar: tar.add('data/') tar.add('config/') # 创建 tar.xz(最高压缩比) with tarfile.open('backup.tar.xz', 'w:xz') as tar: tar.add('data/') tar.add('config/')

读取和提取 TAR 归档

读取 TAR 归档时,tarfile 会自动检测压缩格式。getnames() 和 getmembers() 用于列出归档内容。extractall() 和 extract() 用于提取文件。与 zipfile 相比,tarfile 的 extractall() 会默认保留文件权限和所有者信息。

import tarfile # 读取 tar.gz 文件 with tarfile.open('backup.tar.gz', 'r:gz') as tar: # 列出所有文件 for member in tar.getmembers(): print(f"{member.name:40s} {member.size:>10d}B " f"权限: {oct(member.mode)[-3:]} " f"类型: {'目录' if member.isdir() else '文件'}") # 提取所有内容 tar.extractall(path='restored_backup/') # 自动检测压缩格式(不指定压缩方式) with tarfile.open('backup.tar.xz', 'r') as tar: # 'r' 模式会自动检测 gz/bz2/xz tar.extractall(path='restored_xz/') # 读取特定文件内容 with tarfile.open('backup.tar.gz', 'r:gz') as tar: f = tar.extractfile('config/app.ini') if f: content = f.read().decode('utf-8') print(content)

压缩过滤器详解

tarfile 支持四种压缩过滤器:无压缩('')、gzip(':gz')、bzip2(':bz2')和 xz(':xz')。选择依据:gzip 速度最快且兼容性最好;bzip2 压缩比更高但速度慢 3-5 倍;xz 压缩比最高(通常比 bzip2 再小 30-50%)且解压速度尚可,但创建时非常消耗 CPU。在自动化办公场景中,tar.gz 是最常见的选择。

import tarfile import os # 对比 TAR 压缩格式 test_dir = 'data_to_archive/' # 无压缩 with tarfile.open('test.tar', 'w') as tar: tar.add(test_dir) print(f"未压缩: {os.path.getsize('test.tar')} bytes") # gzip 压缩 with tarfile.open('test.tar.gz', 'w:gz') as tar: tar.add(test_dir) print(f"gzip: {os.path.getsize('test.tar.gz')} bytes") # bzip2 压缩 with tarfile.open('test.tar.bz2', 'w:bz2') as tar: tar.add(test_dir) print(f"bzip2: {os.path.getsize('test.tar.bz2')} bytes") # xz 压缩 with tarfile.open('test.tar.xz', 'w:xz') as tar: tar.add(test_dir) print(f"xz: {os.path.getsize('test.tar.xz')} bytes")

五、tarfile 进阶功能

tarfile 提供了丰富的进阶功能,包括增量归档、排除模式、文件过滤、权限保留以及硬链接和软链接处理。这些特性使得 tarfile 在系统备份和部署场景中比 zipfile 更为灵活和强大。

文件过滤与排除模式

tar.add() 方法的 filter 参数接受一个函数,可以对每个待添加的文件进行过滤或修改。返回 None 则跳过该文件,返回 TarInfo 对象则使用修改后的元信息。这在创建排除缓存文件、临时文件的备份时非常有用。

import tarfile def archive_filter(tar_info): """过滤不需要的文件""" name = tar_info.name # 排除 Python 缓存目录 if '__pycache__' in name or '.pyc' in name: return None # 排除 .git 目录 if '.git/' in name: return None # 排除临时文件和日志 if name.endswith(('.tmp', '.log', '.cache')): return None # 修改文件权限示例 if tar_info.issym() or tar_info.islnk(): tar_info.mode = 0o777 # 链接文件保持可访问 return tar_info with tarfile.open('project_clean.tar.gz', 'w:gz') as tar: tar.add('/path/to/project/', arcname='project', filter=archive_filter) # 也可以使用更简洁的列表排除方式 import fnmatch def exclude_patterns(tar_info, patterns=['*.pyc', '__pycache__', '*.tmp']): for pattern in patterns: if fnmatch.fnmatch(tar_info.name, pattern): return None return tar_info with tarfile.open('filtered.tar.gz', 'w:gz') as tar: tar.add('src/', filter=exclude_patterns)

增量归档与追加模式

tarfile 不支持直接向已压缩的 tar.gz 文件追加内容(因为压缩流是连续且不可追加的)。但可以向未压缩的 .tar 文件追加内容,或者使用以下技巧实现增量备份:每次都重新创建归档并在添加时跳过已存在的文件。

import tarfile import os import time def incremental_backup(source_dir, backup_file, last_backup_time=None): """ 增量备份:仅备份最后修改时间晚于 last_backup_time 的文件 """ with tarfile.open(backup_file, 'w:gz') as tar: for root, dirs, files in os.walk(source_dir): for file in files: filepath = os.path.join(root, file) mtime = os.path.getmtime(filepath) if last_backup_time is None or mtime > last_backup_time: arcname = os.path.relpath(filepath, source_dir) tar.add(filepath, arcname=arcname) print(f" 添加: {arcname}") # 首次全量备份 incremental_backup('/path/to/data', 'full_backup.tar.gz') prev_time = time.time() # 一天后增量备份(仅备份新增和修改的文件) time.sleep(86400) incremental_backup('/path/to/data', 'daily_backup.tar.gz', last_backup_time=prev_time) # 向未压缩的 tar 追加文件 with tarfile.open('appended.tar', 'a') as tar: tar.add('new_file.txt')

权限保留与元数据处理

tarfile 的一个突出优势是能够保留文件的 Unix 权限、所有者(uid/gid)、修改时间等元数据。在处理系统配置文件、软件部署包时,这一特性至关重要。通过修改 TarInfo 对象的属性,可以在创建归档时自定义这些元信息。

import tarfile import pwd import grp # 保留权限创建归档 with tarfile.open('preserve_perm.tar.gz', 'w:gz') as tar: tar.add('config/', filter=lambda x: x) # 提取时保留权限 with tarfile.open('preserve_perm.tar.gz', 'r:gz') as tar: tar.extractall(path='./restored/', numeric_owner=True) # 自定义文件的元信息 with tarfile.open('custom_meta.tar', 'w') as tar: # 手动创建 TarInfo 对象 info = tarfile.TarInfo(name='custom_file.txt') info.size = len(b'Hello, custom file!') info.mtime = int(time.time()) info.mode = 0o644 # rw-r--r-- info.uid = 1000 info.gid = 1000 info.uname = 'developer' info.gname = 'developers' info.type = tarfile.REGTYPE # 普通文件 tar.addfile(info, io.BytesIO(b'Hello, custom file!'))

硬链接与软链接处理

tarfile 能够识别和处理文件系统中的硬链接和软链接(符号链接)。在归档时,如果是硬链接且链接目标已在归档中,tarfile 会自动将其存储为硬链接而非重复存储文件内容,从而节省空间。软链接则以链接本身的形式存储。

import tarfile import os # 创建包含链接的示例 os.makedirs('link_test', exist_ok=True) with open('link_test/original.txt', 'w') as f: f.write('原始文件内容') os.link('link_test/original.txt', 'link_test/hardlink.txt') # 硬链接 os.symlink('original.txt', 'link_test/softlink.txt') # 软链接 # 归档并查看链接处理方式 with tarfile.open('links_archive.tar', 'w') as tar: tar.add('link_test/') # 提取并检查链接是否保留 with tarfile.open('links_archive.tar', 'r') as tar: for member in tar.getmembers(): if member.issym(): print(f"符号链接: {member.name} -> {member.linkname}") elif member.islnk(): print(f"硬链接: {member.name} -> {member.linkname}") else: print(f"普通文件: {member.name}") tar.extractall(path='./extracted_links/')

六、gzip / bzip2 / lzma 单文件压缩

除了 zipfile 和 tarfile 这两个归档模块,Python 还提供了 gzip、bz2 和 lzma 三个专门进行单文件压缩的模块。它们不处理归档逻辑,而是聚焦于数据的压缩与解压。这些模块既可以独立使用(用于压缩单个文件或数据流),也可以与 tarfile 配合(作为底层的压缩引擎)。

gzip 模块

gzip 模块提供了对 GNU gzip 格式的读写支持。其使用方式与内置的 open() 函数非常相似——可以用 gzip.open() 直接读写 .gz 文件。它同样支持文件对象级别的操作,适用于逐行处理大文件。

import gzip # 写入 gzip 压缩文件 data = b"这是一段需要压缩的文本内容。gzip 适合单文件压缩。" with gzip.open('example.txt.gz', 'wb') as f: f.write(data) # 读取 gzip 压缩文件 with gzip.open('example.txt.gz', 'rb') as f: content = f.read() print(content.decode('utf-8')) # 指定压缩级别(1-9) with gzip.open('best_compress.gz', 'wb', compresslevel=9) as f: f.write(data) # 追加到已存在的 gzip 文件 with gzip.open('example.txt.gz', 'ab') as f: f.write(b"\n更多追加内容") # 逐行读取(适合处理压缩的日志文件) with gzip.open('large_log.gz', 'rt', encoding='utf-8') as f: for line in f: if 'ERROR' in line: print(f"发现错误: {line.strip()}")

bzip2 模块

bz2 模块提供了对 bzip2 格式的支持。bzip2 的压缩比通常比 gzip 高 15-30%,但压缩速度慢 3-5 倍。在需要更小的文件体积且压缩时间不是瓶颈时(如发布包、离线传输)非常适合使用 bz2 格式。

import bz2 # 写入 bzip2 压缩文件 data = b"需要压缩的数据内容。bzip2 压缩比更高。" with bz2.open('example.bz2', 'wb') as f: f.write(data) # 读取 bzip2 压缩文件 with bz2.open('example.bz2', 'rb') as f: content = f.read() print(content.decode('utf-8')) # 文本模式读写 with bz2.open('example.bz2', 'wt', encoding='utf-8') as f: f.write("文本模式写入的 bzip2 压缩文件") # 内存压缩(不写文件) compressed = bz2.compress(data * 100) print(f"原始大小: {len(data * 100)}, 压缩后: {len(compressed)}") # 内存解压 decompressed = bz2.decompress(compressed) print(decompressed.decode('utf-8'))

lzma 模块

lzma 模块支持 .xz 和 .lzma 两种格式。LZMA 算法提供三者中最高的压缩比,通常比 bzip2 再小 30-50%,同时解压速度也比较快。其创建速度是三者中最慢的,但解压速度接近 gzip。对于发布包、软件分发和存档场景非常理想。

import lzma import os # 使用 lzma.open(默认 .xz 格式) data = b"需要压缩的数据。LZMA 提供最高压缩比。" with lzma.open('example.xz', 'wb') as f: f.write(data) # 指定压缩级别(0-9) with lzma.open('example_lv9.xz', 'wb', preset=9) as f: f.write(data) # 内存压缩 compressed = lzma.compress(data * 100) decompressed = lzma.decompress(compressed) # 对比三种格式的压缩效果 import gzip, bz2, lzma sample = b"A" * 10000 + b"B" * 10000 + b"C" * 50000 gzip_data = gzip.compress(sample) bz2_data = bz2.compress(sample) lzma_data = lzma.compress(sample) print(f"原始大小: {len(sample)} bytes") print(f"gzip: {len(gzip_data)} bytes (压缩比: {len(gzip_data)/len(sample):.2%})") print(f"bz2: {len(bz2_data)} bytes (压缩比: {len(bz2_data)/len(sample):.2%})") print(f"lzma: {len(lzma_data)} bytes (压缩比: {len(lzma_data)/len(sample):.2%})")

七、内存流与网络传输

在实际应用中,很多时候我们并不需要将压缩文件写入磁盘——而是希望在内存中完成压缩后直接通过网络传输。Python 的 BytesIO 配合压缩模块可以实现完全在内存中进行的压缩和解压操作,避免创建临时文件,特别适合 Web 应用和微服务场景。

BytesIO 内存流压缩

使用 io.BytesIO 作为虚拟文件缓冲区,结合 zipfile 或 tarfile 的文件类接口,可以实现纯内存压缩。这种方法在 Web API 中非常常见——例如从数据库读取文件后直接压缩返回给客户端。

import zipfile import io # 在内存中创建 ZIP 文件 buffer = io.BytesIO() with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: zf.writestr('document.txt', '这是文档内容') zf.writestr('data.csv', 'id,name,age\n1,Alice,30\n2,Bob,25') # 获取压缩后的数据(无需写入磁盘) zip_data = buffer.getvalue() print(f"内存 ZIP 大小: {len(zip_data)} bytes") # 从内存中读取 ZIP read_buffer = io.BytesIO(zip_data) with zipfile.ZipFile(read_buffer, 'r') as zf: for name in zf.namelist(): content = zf.read(name) print(f"{name}: {content.decode('utf-8')[:50]}...")

网络流式压缩传输

对于大文件,全部读入内存再压缩可能占用太多内存。可以使用流式处理——从源文件边读边压缩,或使用分块传输编码。下面的示例展示了如何将多个文件逐个添加到内存 ZIP 中并通过 HTTP 响应返回。

import zipfile import io from flask import Flask, Response, send_file # 仅在 Flask 应用中使用 app = Flask(__name__) @app.route('/download/reports') def download_reports(): """从数据库读取文件并在内存中压缩后返回""" buffer = io.BytesIO() with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: # 模拟从数据库读取文件 reports = { 'report_2026_01.csv': 'date,revenue\n2026-01,100000', 'report_2026_02.csv': 'date,revenue\n2026-02,120000', 'summary.txt': '2026年第一季度总结报告', } for filename, content in reports.items(): zf.writestr(filename, content) buffer.seek(0) return send_file( buffer, mimetype='application/zip', as_attachment=True, download_name='reports.zip' ) # 流式压缩大文件(使用 tarfile + gzip) def stream_compress_files(file_paths): """将多个文件流式压缩为 tar.gz""" buffer = io.BytesIO() with tarfile.open(fileobj=buffer, mode='w:gz') as tar: for filepath in file_paths: # 直接将文件内容添加到 tar 包 with open(filepath, 'rb') as f: content = f.read() info = tarfile.TarInfo(name=filepath.split('/')[-1]) info.size = len(content) tar.addfile(info, io.BytesIO(content)) print(f"已添加: {filepath}") buffer.seek(0) return buffer

将压缩数据直接上传到云存储

在云原生架构中,通常需要将压缩后的数据直接上传到 S3、OSS 或 MinIO 等对象存储。结合内存流压缩和云存储 SDK,可以实现零临时文件的纯内存管道。

import zipfile import io import boto3 # 仅示例,实际需安装 def upload_compressed_to_s3(files_dict, bucket, key): """ 在内存中压缩数据并直接上传到 S3 files_dict: {filename: content_string_or_bytes} """ buffer = io.BytesIO() with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: for name, content in files_dict.items(): if isinstance(content, str): zf.writestr(name, content.encode('utf-8')) else: zf.writestr(name, content) buffer.seek(0) s3_client = boto3.client('s3') s3_client.upload_fileobj(buffer, bucket, key, { 'ContentType': 'application/zip' }) print(f"已上传 s3://{bucket}/{key}") # 使用示例 files = { 'data1.json': '{"key": "value1"}', 'data2.json': '{"key": "value2"}', } # upload_compressed_to_s3(files, 'my-bucket', 'archive/data.zip')

八、批量归档与自动化

在真实的办公自动化场景中,我们通常需要处理的是批量文件和定期任务——例如每天压缩日志文件、按日期归档项目文件、创建增量备份等。本章介绍如何将前面学到的压缩技术组合起来,构建实用的批量归档自动化脚本。

目录批量压缩

批量处理目录的核心是遍历文件树,根据规则选择需要归档的文件。以下函数通过 os.walk 递归遍历目录,并使用 fnmatch 模式匹配筛选文件。它还支持排除规则,可以灵活控制归档内容。

import zipfile import os import fnmatch from datetime import datetime def batch_compress_directories(source_dirs, output_zip, include_patterns=['*'], exclude_patterns=[]): """ 批量压缩多个目录到单个 ZIP 文件 source_dirs: 源目录列表 output_zip: 输出 ZIP 文件路径 include_patterns: 包含的文件通配符列表 exclude_patterns: 排除的文件通配符列表 """ with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf: for src_dir in source_dirs: if not os.path.exists(src_dir): print(f"跳过不存在的目录: {src_dir}") continue for root, dirs, files in os.walk(src_dir): for file in files: filepath = os.path.join(root, file) rel_path = os.path.relpath(filepath, start=os.path.dirname(src_dir)) # 检查包含模式 if not any(fnmatch.fnmatch(file, p) for p in include_patterns): continue # 检查排除模式 if any(fnmatch.fnmatch(file, p) for p in exclude_patterns): continue zf.write(filepath, arcname=rel_path) print(f" 已添加: {rel_path}") total = sum(f.file_size for f in zf.infolist()) print(f"归档完成: {output_zip}, 共 {len(zf.infolist())} 个文件") # 压缩项目目录,排除缓存和虚拟环境 batch_compress_directories( ['/path/to/project1', '/path/to/project2'], 'projects_backup.zip', include_patterns=['*.py', '*.txt', '*.json', '*.yaml', '*.md'], exclude_patterns=['__pycache__', '*.pyc', 'node_modules', '.git'] )

按日期归档

定期自动归档是办公自动化的常见需求。以下脚本实现了按日期命名归档文件的功能,可以配合操作系统的定时任务(cron / 任务计划程序)实现周期性的自动化备份。

import zipfile import os from datetime import datetime, timedelta def archive_by_date(source_dir, output_dir, archive_type='daily', date_filter=None): """ 按日期归档文件 archive_type: 'daily' | 'weekly' | 'monthly' date_filter: 可选,只归档指定日期之后的文件 """ os.makedirs(output_dir, exist_ok=True) # 生成归档文件名 today = datetime.now() if archive_type == 'daily': archive_name = today.strftime('archive_%Y%m%d.zip') elif archive_type == 'weekly': archive_name = today.strftime('archive_%Y_week%W.zip') elif archive_type == 'monthly': archive_name = today.strftime('archive_%Y_%m.zip') output_path = os.path.join(output_dir, archive_name) with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(source_dir): for file in files: filepath = os.path.join(root, file) mtime = os.path.getmtime(filepath) mtime_dt = datetime.fromtimestamp(mtime) # 根据归档类型过滤文件 if date_filter and mtime_dt < date_filter: continue arcname = os.path.relpath(filepath, source_dir) zf.write(filepath, arcname=arcname) print(f"归档完成: {output_path}") # 每日归档(归档今天修改的文件) today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) archive_by_date('/var/log/myapp/', '/backups/logs/', archive_type='daily', date_filter=today_start)

归档完整性校验

压缩文件的完整性至关重要——损坏的归档可能导致数据丢失。Python 的 zipfile 提供了 testzip() 方法检查 ZIP 文件完整性,而 tarfile 可以通过校验和(checksum)验证每个文件的完整性。下面的示例展示了如何进行完整性检查和修复建议。

import zipfile import hashlib import json def create_archive_with_checksum(source_dir, output_zip): """ 创建携带 SHA256 校验的归档 """ checksums = {} with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(source_dir): for file in files: filepath = os.path.join(root, file) arcname = os.path.relpath(filepath, source_dir) # 计算文件 SHA256 sha256 = hashlib.sha256() with open(filepath, 'rb') as f: while chunk := f.read(8192): sha256.update(chunk) checksums[arcname] = sha256.hexdigest() zf.write(filepath, arcname=arcname) # 将校验和信息写入归档 zf.writestr('.checksums.json', json.dumps(checksums, indent=2)) print(f"归档创建完成,携带 {len(checksums)} 个文件的校验和") def verify_archive_integrity(zip_path): """ 验证归档完整性 """ with zipfile.ZipFile(zip_path, 'r') as zf: # 1. 检查 ZIP 结构完整性 bad_file = zf.testzip() if bad_file: print(f"ZIP 结构损坏: {bad_file}") return False # 2. 验证校验和 if '.checksums.json' in zf.namelist(): stored_checksums = json.loads(zf.read('.checksums.json')) for arcname, stored_hash in stored_checksums.items(): content = zf.read(arcname) actual_hash = hashlib.sha256(content).hexdigest() if actual_hash != stored_hash: print(f"校验和失败: {arcname}") return False print(f"完整性验证通过: {zip_path}") return True

多线程压缩加速

对于包含大量小文件的归档操作,可以使用多线程并行读取文件来加速。需要特别注意的是:ZIP 写入操作本身是线程不安全的,需要使用 Lock 保护,且最终的写入必须在主线程完成。以下示例展示了使用 ThreadPoolExecutor 并行计算文件内容,然后同步写入 ZIP 的模式。

import zipfile import os from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock def parallel_compress(source_dir, output_zip, max_workers=4): """ 使用多线程加速文件读取,然后创建压缩归档 """ # 收集所有文件路径 file_list = [] for root, dirs, files in os.walk(source_dir): for file in files: filepath = os.path.join(root, file) arcname = os.path.relpath(filepath, source_dir) file_list.append((filepath, arcname)) print(f"共发现 {len(file_list)} 个文件,使用 {max_workers} 线程读取") # 多线程读取文件内容 file_data = {} lock = Lock() def read_file(filepath, arcname): with open(filepath, 'rb') as f: content = f.read() with lock: file_data[arcname] = content return arcname, len(content) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(read_file, fp, an) for fp, an in file_list] for future in as_completed(futures): arcname, size = future.result() print(f" 已读取: {arcname} ({size} bytes)") # 单线程写入 ZIP(ZIP 写入不是线程安全的) with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zf: for arcname, content in file_data.items(): zf.writestr(arcname, content) print(f"并行压缩完成: {output_zip}")

九、实战案例

理论知识的最终目的是解决实际工作中的问题。本章通过三个完整的实战案例,将前面所学的压缩解压技术综合运用起来。每个案例都经过验证,可以直接应用于生产环境或稍作修改满足特定需求。

案例一:日志自动压缩归档系统

服务器日志文件增长非常迅速,如果不及时清理会占满磁盘。本案例实现了一个日志自动压缩归档系统,它按天扫描日志目录,将超过指定天数的日志文件压缩归档,并在确认压缩成功后删除原始文件。同时会生成归档清单文件记录每次操作。

import zipfile import os import gzip import shutil from datetime import datetime, timedelta import json class LogArchiveSystem: """ 日志自动压缩归档系统 功能:将旧日志压缩归档,释放磁盘空间 """ def __init__(self, log_dir, archive_dir, retention_days=7): self.log_dir = log_dir self.archive_dir = archive_dir self.retention_days = retention_days os.makedirs(archive_dir, exist_ok=True) def archive_old_logs(self): """归档超过保留天数的日志文件""" cutoff_date = datetime.now() - timedelta(days=self.retention_days) archive_manifest = [] for file in os.listdir(self.log_dir): filepath = os.path.join(self.log_dir, file) if not os.path.isfile(filepath): continue mtime = datetime.fromtimestamp(os.path.getmtime(filepath)) if mtime >= cutoff_date: continue # 保留最近的文件 # 为每一天创建一个归档文件 date_str = mtime.strftime('%Y%m%d') archive_file = os.path.join( self.archive_dir, f'logs_{date_str}.zip') # 追加到当天的归档 with zipfile.ZipFile(archive_file, 'a', zipfile.ZIP_DEFLATED) as zf: zf.write(filepath, arcname=file) archive_manifest.append({ 'file': file, 'archived_to': archive_file, 'original_size': os.path.getsize(filepath), 'archived_at': datetime.now().isoformat() }) # 确认写入成功后删除原始文件 os.remove(filepath) print(f"已归档并删除: {file} -> {archive_file}") # 保存归档清单 if archive_manifest: manifest_path = os.path.join( self.archive_dir, 'archive_manifest.json') with open(manifest_path, 'a') as f: for entry in archive_manifest: f.write(json.dumps(entry) + '\n') return len(archive_manifest) def restore_logs(self, archive_date_str, restore_dir): """从归档恢复指定日期的日志""" archive_file = os.path.join( self.archive_dir, f'logs_{archive_date_str}.zip') if not os.path.exists(archive_file): print(f"归档文件不存在: {archive_file}") return os.makedirs(restore_dir, exist_ok=True) with zipfile.ZipFile(archive_file, 'r') as zf: zf.extractall(path=restore_dir) print(f"已恢复 {len(zf.namelist())} 个文件到 {restore_dir}") # 使用示例 archiver = LogArchiveSystem( log_dir='/var/log/myapp/', archive_dir='/backups/logs/', retention_days=7 # 保留最近 7 天的日志 ) count = archiver.archive_old_logs() print(f"本次归档了 {count} 个文件")

案例二:文件备份工具

一个完善的文件备份工具需要支持增量备份、多格式输出、备份验证和备份清理等功能。本案例实现了一个功能完备的备份工具,可以根据需要选择 ZIP 或 TAR 格式,支持按文件模式过滤,并提供了备份完整性验证功能。

import tarfile import zipfile import os import hashlib from datetime import datetime from pathlib import Path class BackupTool: """ 通用文件备份工具 支持 ZIP / TAR 格式,增量备份,完整性校验 """ def __init__(self, backup_dir): self.backup_dir = Path(backup_dir) self.backup_dir.mkdir(parents=True, exist_ok=True) def create_backup(self, source_dirs, format='tar.gz', include_patterns=None, exclude_patterns=None, comment=''): """创建备份""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if format == 'zip': backup_file = self.backup_dir / f'backup_{timestamp}.zip' self._create_zip_backup(source_dirs, backup_file, include_patterns, exclude_patterns) elif format == 'tar.gz': backup_file = self.backup_dir / f'backup_{timestamp}.tar.gz' self._create_tar_backup(source_dirs, backup_file, 'w:gz', include_patterns, exclude_patterns) elif format == 'tar.xz': backup_file = self.backup_dir / f'backup_{timestamp}.tar.xz' self._create_tar_backup(source_dirs, backup_file, 'w:xz', include_patterns, exclude_patterns) # 计算备份文件的校验和 sha256 = hashlib.sha256() with open(backup_file, 'rb') as f: while chunk := f.read(8192): sha256.update(chunk) checksum_file = backup_file.with_suffix(backup_file.suffix + '.sha256') checksum_file.write_text(sha256.hexdigest() + ' ' + backup_file.name) print(f"备份完成: {backup_file}") print(f"SHA256: {sha256.hexdigest()}") return str(backup_file) def _create_zip_backup(self, source_dirs, output_file, include_patterns, exclude_patterns): """创建 ZIP 格式备份""" import fnmatch with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as zf: for src in source_dirs: src_path = Path(src) if not src_path.exists(): print(f"跳过: {src}") continue if src_path.is_file(): zf.write(src_path, arcname=src_path.name) else: for filepath in src_path.rglob('*'): if filepath.is_file(): rel_path = filepath.relative_to(src_path.parent) zf.write(filepath, arcname=str(rel_path)) def _create_tar_backup(self, source_dirs, output_file, mode, include_patterns, exclude_patterns): """创建 TAR 格式备份""" with tarfile.open(output_file, mode) as tar: for src in source_dirs: tar.add(src, arcname=Path(src).name, filter=lambda x: x) def verify_backup(self, backup_path): """验证备份完整性""" backup_path = Path(backup_path) checksum_file = backup_path.with_suffix(backup_path.suffix + '.sha256') if checksum_file.exists(): stored_hash = checksum_file.read_text().split()[0] sha256 = hashlib.sha256() with open(backup_path, 'rb') as f: while chunk := f.read(8192): sha256.update(chunk) if sha256.hexdigest() != stored_hash: print("校验和不匹配!备份文件可能已损坏") return False # 验证归档结构 suffix = backup_path.suffix try: if suffix == '.zip': with zipfile.ZipFile(backup_path, 'r') as zf: bad = zf.testzip() if bad: print(f"ZIP 文件损坏: {bad}") return False elif suffix in ('.gz', '.xz', '.bz2'): with tarfile.open(backup_path, 'r') as tar: members = tar.getmembers() print(f"归档包含 {len(members)} 个文件") except Exception as e: print(f"验证失败: {e}") return False print("备份完整性验证通过") return True # 使用示例 bt = BackupTool('/backups/project') bt.create_backup( source_dirs=['/path/to/source1', '/path/to/source2'], format='tar.gz' )

案例三:项目发布包生成器

在软件开发中,经常需要生成可供分发的发布包。本案例实现了一个项目发布包生成器,它会自动收集指定文件、生成版本信息、排除不需要的文件(如 .git、__pycache__ 等),并最终生成结构清晰的发布归档。

import zipfile import os import json from datetime import datetime class ReleasePackager: """ 项目发布包生成器 自动收集项目文件,排除开发环境文件,生成版本归档 """ def __init__(self, project_name, version, source_dir): self.project_name = project_name self.version = version self.source_dir = source_dir self.exclude_dirs = { '.git', '__pycache__', 'node_modules', '.idea', '.vscode', 'venv', '.venv', 'dist', 'build' } self.exclude_patterns = { '*.pyc', '*.pyo', '*.so', '*.dll', '*.pdb', '.DS_Store', 'Thumbs.db', '*.log', '*.tmp' } def create_release_package(self, output_dir='./dist/'): """生成发布包""" os.makedirs(output_dir, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') package_name = f'{self.project_name}-v{self.version}-{timestamp}' output_path = os.path.join(output_dir, f'{package_name}.zip') import fnmatch file_count = 0 total_size = 0 with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: # 添加版本信息文件 version_info = { 'project': self.project_name, 'version': self.version, 'build_time': datetime.now().isoformat(), 'python_version': os.sys.version } zf.writestr(f'{package_name}/VERSION.json', json.dumps(version_info, indent=2)) # 遍历并添加项目文件 for root, dirs, files in os.walk(self.source_dir): # 排除目录 dirs[:] = [d for d in dirs if d not in self.exclude_dirs] for file in files: # 排除匹配模式的文件 if any(fnmatch.fnmatch(file, p) for p in self.exclude_patterns): continue filepath = os.path.join(root, file) arcname = os.path.join( package_name, os.path.relpath(filepath, self.source_dir) ) zf.write(filepath, arcname=arcname) file_count += 1 total_size += os.path.getsize(filepath) # 生成发布说明 readme = f""" # {self.project_name} v{self.version} - 构建时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 文件数量: {file_count} - 总大小: {total_size:,} bytes - 发布包: {output_path} """ readme_path = os.path.join(output_dir, f'{package_name}_RELEASE_NOTES.md') with open(readme_path, 'w', encoding='utf-8') as f: f.write(readme.strip()) print(f"发布包生成完成: {output_path}") print(f"包含 {file_count} 个文件,共 {total_size:,} bytes") print(f"发布说明: {readme_path}") return output_path # 使用示例 packager = ReleasePackager( project_name='data-processor', version='2.1.0', source_dir='/path/to/project' ) packager.create_release_package()

本章小结

通过以上三个实战案例,可以看到 Python 的压缩解压模块在实际生产环境中的广泛应用。从日志管理、文件备份到软件发布,zipfile、tarfile、gzip、bz2 和 lzma 等标准库模块提供了完整的解决方案。掌握这些工具后,可以轻松构建自动化的文件管理和数据处理管道。

在实际工作中,建议根据以下原则选择合适的方案:需要跨平台兼容和密码保护时选择 ZIP 格式;需要保留文件权限和在 Linux 环境中使用选择 TAR 格式;需要最高压缩比时选择 XZ 或 BZIP2;需要在 Web 应用中直接返回压缩数据时使用内存流模式。

核心要点总结: