序列化与反序列化

Python进阶编程专题 · Python数据持久化的多种方案

专题:Python进阶编程系统学习

关键词:Python, 序列化, pickle, json, msgpack, __getstate__, __reduce__, JSONLines

一、概述

序列化(Serialization)是指将内存中的数据结构或对象状态转换为可存储或可传输的格式(如字节流、JSON字符串)的过程。反序列化(Deserialization)则是其逆过程,将存储格式还原为内存中的数据结构。这两个过程构成了数据持久化和网络通信的基础设施。

Python作为一门广泛应用于数据处理、后端开发和机器学习领域的语言,提供了多种序列化方案,每种方案都有其独特的适用场景和性能特征。理解并正确选择序列化方案,对构建高效、安全的Python应用至关重要。

本文将从实践角度出发,系统地介绍Python中主流的序列化技术——picklejsonmsgpack,并深入探讨它们的进阶用法、性能对比和安全注意事项。我们还将讨论在大数据场景下的序列化优化策略,帮助你在实际项目中做出最优选择。

序列化的核心价值

二、pickle模块详解

pickle是Python内置的序列化模块,它实现了Python对象与字节流之间的相互转换。与其他序列化方案不同,pickle是Python专用的协议,能够序列化几乎任意Python对象,包括自定义类实例、嵌套数据结构、函数引用等。这是其最大的优势,也是最需要谨慎使用的特性。

2.1 基本使用

pickle提供了两个核心函数——dump()/dumps()用于序列化,load()/loads()用于反序列化。带s的版本操作内存中的字节对象,不带s的版本直接读写文件。

import pickle # 准备一个复杂的Python对象 data = { "name": "Python进阶笔记", "topics": ["序列化", "并发", "元编程"], "version": 3.12, "is_active": True, "metadata": {"author": "admin", "pages": 256} } # 序列化为字节流 bytes_data = pickle.dumps(data) print(f"序列化后大小: {len(bytes_data)} bytes") # 输出: 序列化后大小: 96 bytes # 从字节流恢复对象 restored = pickle.loads(bytes_data) print(restored == data) # 输出: True # 直接写入文件 with open("data.pkl", "wb") as f: pickle.dump(data, f) # 从文件读取 with open("data.pkl", "rb") as f: restored2 = pickle.load(f)

2.2 pickle协议版本

pickle协议经历了多次演进,从v0(文本格式)到v5(Python 3.8+)。理解协议版本对于跨版本兼容性和性能优化非常重要。

协议版本引入版本特点
v0Python 1.x文本格式,可读性好但效率低
v1Python 2.x二进制格式,效率提升
v2Python 2.3支持__getstate__/__setstate__协议
v3Python 3.0默认协议,对bytes类型优化
v4Python 3.4支持numpy等大型对象,提升效率
v5Python 3.8支持out-of-band数据,大数组零拷贝
import pickle # 查看当前Python版本最高的协议版本 print(pickle.HIGHEST_PROTOCOL) # 输出: 5 (Python 3.8+ 环境) # 查看默认协议版本 print(pickle.DEFAULT_PROTOCOL) # 输出: 5 或 4 (取决于Python版本) # 显式指定协议版本以获得最佳兼容性或性能 data = {"key": "value"} bytes_v2 = pickle.dumps(data, protocol=2) bytes_v5 = pickle.dumps(data, protocol=5) print(f"v2 大小: {len(bytes_v2)} bytes") print(f"v5 大小: {len(bytes_v5)} bytes")

兼容性建议:如果你需要将pickle数据在Python 2和3之间共享,建议使用protocol=2。如果仅在Python 3.8+环境中使用,protocol=5能提供最佳性能和最小的序列化体积。默认协议在Python 3.8+中就是v5,无需额外指定。

2.3 __getstate__ / __setstate__ 自定义序列化

当pickle序列化一个对象时,默认行为是保存该对象的__dict__属性字典。但很多场景下,我们需要更细粒度的控制——比如排除某些不可序列化的资源(数据库连接、文件句柄、线程锁等),或者对敏感字段进行加密。

通过在类中定义__getstate____setstate__方法,可以完全控制序列化和反序列化的行为。

import pickle import threading class DatabaseConnection: def __init__(self, host, port, db_name, password): self.host = host self.port = port self.db_name = db_name self.password = password # 敏感信息,序列化时需要处理 self.connection = self._connect() self._lock = threading.Lock() # 不可序列化的锁对象 def _connect(self): # 实际项目中这里会建立真实的数据库连接 return f"<connection to {self.host}:{self.port}/{self.db_name}>" def __getstate__(self): # 返回一个字典,pickle将使用此字典而非 __dict__ 进行序列化 state = self.__dict__.copy() # 移除不可序列化的对象 del state["connection"] del state["_lock"] # 对密码进行简单的脱敏处理 state["password"] = "***" return state def __setstate__(self, state): # 反序列化时重建对象状态 # state 就是 __getstate__ 返回的字典 self.__dict__.update(state) # 重新创建不可序列化的资源 self.connection = self._connect() self._lock = threading.Lock() # 如果密码被脱敏,提示需要重新设置 if self.password == "***": print("警告:反序列化后密码被脱敏,请调用 set_password() 重新设置") # 测试自定义序列化 conn = DatabaseConnection("localhost", 5432, "mydb", "secret123") bytes_data = pickle.dumps(conn) restored = pickle.loads(bytes_data) print(restored.password) # 输出: *** print(restored.connection) # 输出: <connection to localhost:5432/mydb>

2.4 __reduce__ 高级自定义

__reduce__是比__getstate__更底层的协议接口。它返回一个元组,定义如何重新创建对象。这在处理无法通过__dict__恢复的对象时非常有用,比如C扩展类型、单例模式对象等。

import pickle class Singleton: """单例模式 —— 演示 __reduce__ 的典型应用""" _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self, value=None): if not hasattr(self, "_initialized"): self.value = value self._initialized = True def __reduce__(self): # 返回 (可调用对象, 参数元组, 状态字典) # pickle 将调用 cls.__new__(cls, *args) 创建对象,然后用 __setstate__ 恢复状态 return ( __new__, # 用于创建对象的函数 (self.__class__,), # 传给 __new__ 的参数 self.__dict__ # 状态(会传给 __setstate__) ) # 更复杂的示例:处理需要清理资源的外部对象 class ModelCheckpoint: """深度学习模型检查点 —— 需要保存模型参数但排除优化器状态""" def __init__(self, model, optimizer, epoch): self.model = model self.optimizer = optimizer self.epoch = epoch def __reduce__(self): # 自定义序列化:只保存模型的state_dict,不保存整个对象 model_state = self.model.state_dict() if hasattr(self.model, "state_dict") else self.model return ( self._rebuild, (model_state, self.epoch) ) @staticmethod def _rebuild(model_state, epoch): # 重建时用更轻量的方式恢复 return {"model_state": model_state, "epoch": epoch}

理解__reduce__的返回值格式:(callable, args)形式最常用——pickle会调用callable(*args)来重建对象。如果返回(callable, args, state),则pickle在创建对象后会调用obj.__setstate__(state)恢复状态。callable通常可以是类本身、类的__new__方法,或者任何工厂函数。

2.5 安全限制与风险

pickle的安全性是其最大的短板。恶意的pickle数据可以在反序列化时执行任意代码,这是因为pickle本质上是一个简单的栈式虚拟机,可以构造字节码来执行任意Python函数。因此,永远不要对来自不可信来源的数据执行pickle.loads()

安全警告:以下代码展示pickle的反序列化漏洞原理,仅供学习参考,请勿用于攻击。

import pickle import os # ⚠️ 恶意构造的pickle数据示例(教育目的) class EvilPickle: """演示pickle反序列化漏洞:这个类可以在反序列化时执行系统命令""" def __reduce__(self): # 返回 (os.system, ("calc",)) # Windows 下会打开计算器 return (os.system, ("echo '漏洞演示:pickle执行了系统命令!'",)) # 构造恶意pickle字节流 malicious_data = pickle.dumps(EvilPickle()) # ❌ 危险!以下代码会执行恶意操作 # pickle.loads(malicious_data) # ✅ 安全实践:使用限制性反序列化 class SafeUnpickler(pickle.Unpickler): """白名单安全反序列化器""" ALLOWED_TYPES = {dict, list, tuple, str, int, float, bool, bytes, set, frozenset, type(None)} def find_class(self, module, name): # 仅允许从内置模块和白名单类中反序列化 if module == "builtins" and name in { "dict", "list", "tuple", "str", "int", "float", "bool", "bytes", "set", "frozenset", "NoneType"}: return getattr(__builtins__, name) raise pickle.UnpicklingError(f"禁止反序列化类型: {module}.{name}") # 安全地反序列化 with open("data.pkl", "rb") as f: safe_restored = SafeUnpickler(f).load() # 替代方案:使用 json 或 msgpack 传递跨进程数据 print("推荐:对于跨进程/跨服务通信,优先使用 JSON 或 msgpack 替代 pickle")

2.6 跨版本兼容性

pickle的跨版本兼容性是一个实际工程中经常遇到的挑战。不同Python版本间的pickle数据可能不兼容,尤其是在Python 2和Python 3之间。

import pickle import sys # 场景:在Python 3.12中保存的数据,需要在Python 3.6中恢复 data = {"name": "test", "value": 42} # 方案1:使用低版本协议保证兼容性 with open("compat_v2.pkl", "wb") as f: pickle.dump(data, f, protocol=2) # v2 协议在 Python 2.3+ 中均可读取 # 方案2:检查Python版本,选择最优兼容协议 def pickle_with_compat(obj, target_min_version="3.6"): version_info = tuple(map(int, target_min_version.split("."))) if version_info < (3, 8): protocol = 4 # Python 3.4+ 支持 v4 elif version_info < (3, 4): protocol = 3 # Python 3.0+ 支持 v3 else: protocol = 2 # 最广泛的兼容性 return pickle.dumps(obj, protocol=protocol) # 方案3:检测并转换协议版本(如已存在高版本数据) def ensure_compatible(pickle_bytes, target_protocol=2): """尝试用高版本加载,然后以低版本重新序列化""" try: obj = pickle.loads(pickle_bytes) return pickle.dumps(obj, protocol=target_protocol) except Exception as e: raise ValueError(f"无法转换pickle数据: {e}")

三、JSON进阶

JSON(JavaScript Object Notation)是最广泛使用的跨语言数据交换格式。Python的json模块提供了简洁的API,但默认只支持基本数据类型(dict、list、str、int、float、bool、None)。在实际开发中,我们几乎总是需要处理自定义对象、datetime、Decimal等类型,这就需要我们对JSON编码器/解码器进行自定义扩展。

3.1 default参数与自定义编码器

json.dumps()遇到无法序列化的类型时,会抛出TypeError。这时可以通过default参数提供一个转换函数,将这些类型转换为可序列化的基本类型。更优雅的方式是继承json.JSONEncoder,实现default方法。

import json from datetime import datetime, date from decimal import Decimal from enum import Enum import uuid # ---------- 方法1:使用 default 参数 ---------- def custom_serializer(obj): """自定义序列化函数,处理json模块默认不支持的类型""" if isinstance(obj, (datetime, date)): return obj.isoformat() if isinstance(obj, Decimal): return float(obj) if isinstance(obj, uuid.UUID): return str(obj) if isinstance(obj, set): return list(obj) if isinstance(obj, Enum): return obj.value raise TypeError(f"Type {type(obj)} not serializable") data = { "created_at": datetime.now(), "price": Decimal("19.99"), "user_id": uuid.uuid4(), "tags": {"python", "json", "serialization"} } json_str = json.dumps(data, default=custom_serializer, ensure_ascii=False, indent=2) print(json_str) # ---------- 方法2:继承 JSONEncoder(推荐) ---------- class AdvancedJSONEncoder(json.JSONEncoder): """更完备的自定义JSON编码器""" def default(self, obj): if isinstance(obj, (datetime, date)): return {"__type__": "datetime", "value": obj.isoformat()} if isinstance(obj, Decimal): return {"__type__": "decimal", "value": str(obj)} if isinstance(obj, bytes): return {"__type__": "bytes", "value": obj.hex()} if hasattr(obj, "to_dict"): # 支持自定义 to_dict 协议 return obj.to_dict() return super().default(obj) # 使用自定义编码器 json_str2 = json.dumps(data, cls=AdvancedJSONEncoder, ensure_ascii=False, indent=2) print(json_str2)

3.2 object_hook / object_pairs_hook 自定义解码器

与编码相反,解码时可以通过object_hookobject_pairs_hook自定义JSON对象到Python对象的映射逻辑。object_hook在解析每个JSON对象时被调用,object_pairs_hook则接收一个有序的键值对列表,适合需要保留字段顺序的场景。

import json from datetime import datetime from decimal import Decimal # 使用 object_hook 恢复编码器中标记的特殊类型 def custom_decoder(dct): """根据编码器中的 __type__ 标记恢复特殊类型""" if "__type__" in dct: type_name = dct["__type__"] value = dct["value"] if type_name == "datetime": return datetime.fromisoformat(value) if type_name == "decimal": return Decimal(value) if type_name == "bytes": return bytes.fromhex(value) return dct # 模拟前面编码器生成的JSON字符串 encoded = '{"created_at": {"__type__": "datetime", "value": "2026-05-05T12:00:00"}, "price": {"__type__": "decimal", "value": "19.99"}}' decoded = json.loads(encoded, object_hook=custom_decoder) print(decoded["created_at"]) # datetime 对象 print(type(decoded["created_at"])) # <class 'datetime.datetime'> print(decoded["price"]) # Decimal 对象 print(type(decoded["price"])) # <class 'decimal.Decimal'> # ---------- 使用 object_pairs_hook 保留字段顺序 ---------- from collections import OrderedDict def ordered_decoder(pairs): """保留JSON对象中字段的原始顺序""" result = OrderedDict() for key, value in pairs: result[key] = value return result json_str = '{"z": 1, "a": 2, "nested": {"b": 3, "a": 4}}' data = json.loads(json_str, object_pairs_hook=ordered_decoder) print(list(data.keys())) # ['z', 'a', 'nested'] — 保留原始顺序 # 对比:默认行为会丢失顺序 data_default = json.loads(json_str) print(list(data_default.keys())) # 顺序不保证(Python 3.7+ 实际保留了,但依赖此行为不安全)

3.3 cls_hook 参数

除了object_hookobject_pairs_hook之外,还可以通过继承json.JSONDecoder并覆写相关方法来实现更精细的解码控制。

import json from datetime import datetime class CustomJSONDecoder(json.JSONDecoder): """自定义JSON解码器,整合多种自定义解码逻辑""" def __init__(self, *args, **kwargs): super().__init__( *args, object_hook=self._object_hook, **kwargs ) @staticmethod def _object_hook(dct): # 自动识别ISO格式的日期时间字符串 for key, value in dct.items(): if isinstance(value, str) and len(value) > 18: try: # 尝试解析ISO格式的datetime字符串 dct[key] = datetime.fromisoformat(value) except (ValueError, TypeError): pass return dct # 使用自定义解码器 decoder = CustomJSONDecoder() data = decoder.decode('{"name": "test", "created_at": "2026-05-05T14:30:00"}') print(type(data["created_at"])) # <class 'datetime.datetime'>

3.4 JSONLines 格式

JSONLines(又称NDJSON)是一种以换行符分隔的JSON格式,每行都是一个独立的JSON对象。它在日志处理、流式数据、大数据ETL等场景中广泛使用。与JSON数组相比,JSONLines支持增量写入和流式读取,非常适合逐行处理海量数据。

import json # ---------- 写入 JSONLines 文件 ---------- records = [ {"id": 1, "name": "Alice", "score": 95.5}, {"id": 2, "name": "Bob", "score": 87.0}, {"id": 3, "name": "Charlie", "score": 92.3}, {"id": 4, "name": "Diana", "score": 78.8}, ] with open("data.jsonl", "w", encoding="utf-8") as f: for record in records: f.write(json.dumps(record, ensure_ascii=False) + "\n") # ---------- 流式读取 JSONLines ---------- # 方式1:逐行读取(适合大文件,内存友好) def read_jsonl(filepath): """生成器方式逐行读取JSONLines文件""" with open(filepath, "r", encoding="utf-8") as f: for line_no, line in enumerate(f, 1): line = line.strip() if not line: # 跳过空行 continue try: yield json.loads(line) except json.JSONDecodeError as e: print(f"解析错误 第{line_no}行: {e}") # 使用生成器流式读取 for item in read_jsonl("data.jsonl"): print(f"处理: {item['name']} - {item['score']}") # ---------- JSONLines 的优势场景 ---------- # 场景1:日志收集 import time import random def write_log_entry(log_file, level, message, **extra): """模拟写日志到JSONLines文件""" entry = { "timestamp": time.time(), "level": level, "message": message, **extra } with open(log_file, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") # 场景2:ETL 中的增量处理 # 传统的JSON数组需要先读入全部数据才能操作, # JSONLines 可以在文件末尾追加新数据,无需重写整个文件 # 场景3:结合 MapReduce / Hadoop 的分布式处理 # Hadoop 的默认输入格式按行分割,JSONLines 天然适合

JSONLines vs JSON数组:JSONLines格式的典型优势在于:1)支持增量追加(只需在文件末尾加一行,而JSON数组需重写整个[...]结构);2)流式处理(逐行读取,内存占用与文件大小无关);3)天然适配Hadoop、Spark等分布式计算框架的按行分割逻辑。其主要缺点是没有统一的元数据头,无法表达嵌套的多层级结构。

四、msgpack 二进制序列化

MessagePack(msgpack)是一种高效的二进制序列化格式,类似于JSON但更小更快。它用单字节标记替代了JSON中的花括号、方括号、冒号和逗号,使得序列化后的体积更小,解析速度更快。msgpack是一种跨语言格式,有超过50种语言的实现。

4.1 基本使用

Python中使用msgpack需要先安装第三方库:pip install msgpack。其核心API是pack()/packb()unpack()/unpackb()

import msgpack import json # 准备数据 data = { "name": "Python进阶学习", "version": 3.12, "topics": ["序列化", "并发", "异步"], "count": 1000000, "is_active": True, "metadata": {"author": "admin", "rating": 4.8} } # msgpack 序列化 packed = msgpack.packb(data) unpacked = msgpack.unpackb(packed) # JSON 序列化(用于对比) json_bytes = json.dumps(data, ensure_ascii=False).encode("utf-8") print(f"msgpack 大小: {len(packed)} bytes") print(f"JSON 大小: {len(json_bytes)} bytes") print(f"压缩比: {len(json_bytes) / len(packed):.2f}x") print(f"数据一致: {data == unpacked}") # 写入文件 with open("data.msgpack", "wb") as f: msgpack.pack(data, f) # 从文件读取 with open("data.msgpack", "rb") as f: restored = msgpack.unpack(f)

4.2 msgpack vs pickle vs json 对比

特性picklejsonmsgpack
格式二进制文本二进制
跨语言仅Python几乎全语言50+语言
序列化速度中等
反序列化速度中等很快
体积很小
安全性极不安全安全安全
可读性不可读可读不可读
支持的类型几乎所有Python类型基本类型+自定义扩展基本类型+扩展类型
适用场景本地缓存、模型保存Web API、跨语言通信高性能RPC、缓存、IoT
import msgpack import pickle import json import timeit # 性能对比测试 test_data = {f"key_{i}": "value_" * 10 for i in range(1000)} # 序列化性能测试 pickle_time = timeit.timeit("pickle.dumps(test_data)", globals=locals(), number=1000) json_time = timeit.timeit( "json.dumps(test_data, ensure_ascii=False).encode('utf-8')", globals=locals(), number=1000 ) msgpack_time = timeit.timeit("msgpack.packb(test_data)", globals=locals(), number=1000) print("===== 序列化性能 (1000次, 秒) =====") print(f"pickle: {pickle_time:.4f}s") print(f"json: {json_time:.4f}s") print(f"msgpack: {msgpack_time:.4f}s") # 体积对比 pickle_bytes = pickle.dumps(test_data) json_bytes = json.dumps(test_data, ensure_ascii=False).encode("utf-8") msgpack_bytes = msgpack.packb(test_data) print("\n===== 序列化体积 =====") print(f"pickle: {len(pickle_bytes)} bytes") print(f"json: {len(json_bytes)} bytes") print(f"msgpack: {len(msgpack_bytes)} bytes")

选型建议:如果你的服务是纯Python内部使用,pickle的方便性无与伦比(但要注意安全);如果涉及跨语言通信或对外API,JSON是最稳妥的选择;如果对性能和带宽有较高要求(如微服务间RPC、移动端数据传输、实时游戏通信),msgpack通常是比JSON更好的选择。

4.3 msgpack 扩展类型

msgpack支持通过注册扩展类型(Extension Types)来实现自定义类型的序列化,类似于JSON的default参数但更高效。

import msgpack from datetime import datetime, timezone import numpy as np # 注册自定义扩展类型 # 方式1:使用 default 和 ext_hook 参数 def encode_datetime(obj): if isinstance(obj, datetime): timestamp = obj.timestamp() return msgpack.ExtType(42, int(timestamp).to_bytes(8, "big")) raise TypeError(f"Unknown type: {type(obj)}") def decode_datetime(code, data): if code == 42: timestamp = int.from_bytes(data, "big") return datetime.fromtimestamp(timestamp, tz=timezone.utc) return msgpack.ExtType(code, data) data = { "event": "系统启动", "time": datetime.now(timezone.utc) } packed = msgpack.packb(data, default=encode_datetime) unpacked = msgpack.unpackb(packed, ext_hook=decode_datetime) print(unpacked["time"]) # 输出: 2026-05-05 12:00:00+00:00 (datetime 对象) # 方式2:使用 msgpack 的 Timestamp 类型(v1.0+) packed_ts = msgpack.packb(data, datetime=True) unpacked_ts = msgpack.unpackb(packed_ts, datetime=True) print(unpacked_ts["time"]) # 自动识别为 datetime

五、序列化方案对比与选型

在实际项目中选择合适的序列化方案需要综合考量多个因素。下面从更广泛的视角对比主流序列化格式,包括Protobuf、Thrift和Avro等重量级方案。

5.1 主流方案全景对比

方案IDL定义Schema演进序列化速度反序列化速度体积学习成本
pickle不支持中等
json手动兼容中等
msgpack手动兼容很小
Protobuf.proto文件原生支持很快很快极小
Thrift.thrift文件原生支持很快很快极小
Avro.avsc文件原生支持

5.2 Protobuf 初步体验

Protocol Buffers(Protobuf)是Google开发的序列化框架,需要先定义.proto文件来描述数据结构,然后通过编译器生成对应语言的代码。它与msgpack相比虽然笨重一些,但在Schema演进和跨语言兼容性方面更加成熟。

# 首先安装: pip install protobuf # 定义一个简单的 proto 消息格式 (user.proto) """ syntax = "proto3"; message User { string name = 1; int32 age = 2; repeated string tags = 3; } """ from google.protobuf import json_format # 假设我们已经编译了 user_pb2 # import user_pb2 # 使用 protobuf # user = user_pb2.User() # user.name = "Alice" # user.age = 30 # user.tags.extend(["python", "protobuf"]) # serialized = user.SerializeToString() # print(f"Protobuf 大小: {len(serialized)} bytes") # 对比 msgpack (无 IDL 的轻量方案) import msgpack user_data = {"name": "Alice", "age": 30, "tags": ["python", "protobuf"]} msgpack_bytes = msgpack.packb(user_data) print(f"msgpack 大小: {len(msgpack_bytes)} bytes")

当选用Protobuf:1)团队规模大,需要严格的Schema管理和版本演进;2)需要为多种语言(Java/Go/C++/Python)生成代码;3)性能敏感且需要极小的序列化体积。如果是小型项目或快速原型开发,msgpack或JSON的灵活性更高。

5.3 选型决策树

以下决策指南可以帮助你在实际项目中快速选择合适的序列化方案:

六、大数据序列化性能优化

当处理大规模数据(GB级别以上)时,序列化性能成为系统瓶颈。以下策略可以帮助你在大数据场景中优化序列化性能。

6.1 批量序列化与流式处理

避免一次性将全部数据加载到内存中,采用分块序列化或流式写入。

import json import time # ❌ 低效方式:一次性加载全部数据 def serialize_all_at_once(records, filepath): # 全部数据在内存中构建成一个大列表,可能导致OOM with open(filepath, "w") as f: json.dump(records, f) # ✅ 高效方式:流式JSONLines写入 def serialize_streaming(records, filepath): """流式写入JSONLines,内存占用恒定""" with open(filepath, "w", encoding="utf-8", buffering=65536) as f: for record in records: f.write(json.dumps(record, ensure_ascii=False) + "\n") # ✅ msgpack 流式写入(适合高性能场景) import msgpack def serialize_msgpack_stream(records, filepath): with open(filepath, "wb", buffering=65536) as f: for record in records: f.write(msgpack.packb(record))

6.2 压缩策略

序列化后的数据可以进一步压缩,在CPU和IO之间做权衡。

import pickle import gzip import lzma import json import time import msgpack # 测试数据:10万条记录 test_records = [{"id": i, "name": f"user_{i}", "scores": [float(j) for j in range(10)]} for i in range(100000)] def serialize_with_compression(data, serializer, compressor): """序列化+压缩测试""" start = time.time() if serializer == "pickle": serialized = pickle.dumps(data, protocol=5) elif serializer == "json": serialized = json.dumps(data, ensure_ascii=False).encode("utf-8") elif serializer == "msgpack": serialized = msgpack.packb(data) else: raise ValueError("Unknown serializer") if compressor == "gzip": compressed = gzip.compress(serialized) elif compressor == "lzma": compressed = lzma.compress(serialized) else: compressed = serialized elapsed = time.time() - start return compressed, elapsed, len(compressed) # 对比不同组合 for ser in ["pickle", "json", "msgpack"]: for comp in [None, "gzip", "lzma"]: if comp is None: comp_name = "无压缩" else: comp_name = comp try: result, elapsed, size = serialize_with_compression(test_records, ser, comp) print(f"{ser:8s} + {comp_name:6s}: {size:10,d} bytes, {elapsed:.3f}s") except Exception as e: print(f"{ser:8s} + {comp_name:6s}: 失败 - {e}")

6.3 零拷贝与内存视图优化

对于大数据量的场景,使用pickle v5的out-of-band(OOB)数据和memoryview可以实现零拷贝序列化,避免不必要的数据复制。

import pickle import numpy as np # 使用 pickle v5 的 out-of-band 数据(零拷贝) # 适用于需要独立处理大数组的场景(如NumPy数组、CUDA张量) large_array = np.random.rand(1000, 1000) # 方式1:常规方式(会将数组拷贝到pickle流中) normal_bytes = pickle.dumps(large_array, protocol=5) print(f"常规方式: {len(normal_bytes)} bytes") # 方式2:使用 pickle.Buffer 实现零拷贝(需要Python 3.8+) buffers = [] class NumpySerializer: """为NumPy数组提供零拷贝序列化的包装器""" def __init__(self, array): self.array = array def __reduce_ex__(self, protocol): # 利用 pickle v5 的 buffer_callback 机制 if protocol >= 5: return ( _reconstruct_numpy, (self.array.dtype, self.array.shape), self.array.tobytes() ) else: return (_reconstruct_numpy, (self.array.dtype, self.array.shape, self.array.tobytes())) def _reconstruct_numpy(dtype, shape, data=None): if data is None: # 返回需要填充buffer的占位对象 return _Placeholder(dtype, shape) return np.frombuffer(data, dtype=dtype).reshape(shape) class _Placeholder: def __init__(self, dtype, shape): self.dtype = dtype self.shape = shape self.buffer = None def __setstate__(self, state): self.buffer = state def finalize(self): return np.frombuffer(self.buffer, dtype=self.dtype).reshape(self.shape)

6.4 大数据序列化最佳实践

七、核心要点总结

1. pickle:Python内置、支持任意对象、v5协议性能最优。核心方法:__getstate__排除不可序列化资源,__setstate__重建资源,__reduce__底层控制序列化流程。安全警告:绝不反序列化不可信数据,建议继承pickle.Unpickler实现白名单机制。

2. JSON:跨语言标准、人类可读。进阶技巧:default参数或继承JSONEncoder处理自定义类型;object_hook/object_pairs_hook恢复特殊类型;JSONLines格式适用于流式日志和大数据ETL。

3. msgpack:二进制紧凑格式,性能介于pickle和JSON之间。适合高性能RPC、缓存、IoT等场景。支持扩展类型注册,比JSON更小的体积和更快的解析速度。

4. 选型原则:内部用pickle,跨语言用JSON,高性能用msgpack,企业级系统用Protobuf。

5. 性能优化:流式处理避免OOM,压缩在CPU/IO间权衡,pickle v5零拷贝优化大数组序列化。

序列化的本质是一种「表示转换」——将内存中的对象图转换为一维的字节序列。优秀的序列化方案应当在编码效率、解析速度、空间占用、类型保真度和安全性之间取得平衡。没有银弹,只有最适合当前场景的选择。