← 返回Python进阶编程目录
← 返回学习笔记首页
专题: Python进阶编程系统学习
关键词: Python, 序列化, pickle, json, msgpack, __getstate__, __reduce__, JSONLines
一、概述
序列化(Serialization) 是指将内存中的数据结构或对象状态转换为可存储或可传输的格式(如字节流、JSON字符串)的过程。反序列化(Deserialization) 则是其逆过程,将存储格式还原为内存中的数据结构。这两个过程构成了数据持久化和网络通信的基础设施。
Python作为一门广泛应用于数据处理、后端开发和机器学习领域的语言,提供了多种序列化方案,每种方案都有其独特的适用场景和性能特征。理解并正确选择序列化方案,对构建高效、安全的Python应用至关重要。
本文将从实践角度出发,系统地介绍Python中主流的序列化技术——pickle、json、msgpack,并深入探讨它们的进阶用法、性能对比和安全注意事项。我们还将讨论在大数据场景下的序列化优化策略,帮助你在实际项目中做出最优选择。
序列化的核心价值
数据持久化 :将程序状态保存到磁盘,支持断点续传和缓存
进程间通信 :在多进程或多机间传递结构化数据
远程调用(RPC) :将方法调用的参数和返回值在网络中传输
缓存系统 :将计算结果序列化后存入Redis/Memcached等缓存系统
深度学习模型保存 :保存训练好的模型权重和结构(PyTorch的.pt文件本质就是pickle格式)
二、pickle模块详解
pickle是Python内置的序列化模块,它实现了Python对象与字节流之间的相互转换。与其他序列化方案不同,pickle是Python专用的协议,能够序列化几乎任意Python对象,包括自定义类实例、嵌套数据结构、函数引用等。这是其最大的优势,也是最需要谨慎使用的特性。
2.1 基本使用
pickle提供了两个核心函数——dump()/dumps()用于序列化,load()/loads()用于反序列化。带s的版本操作内存中的字节对象,不带s的版本直接读写文件。
import pickle
# 准备一个复杂的Python对象
data = {
"name" : "Python进阶笔记" ,
"topics" : ["序列化" , "并发" , "元编程" ],
"version" : 3.12 ,
"is_active" : True ,
"metadata" : {"author" : "admin" , "pages" : 256 }
}
# 序列化为字节流
bytes_data = pickle.dumps(data)
print (f"序列化后大小: {len(bytes_data)} bytes" )
# 输出: 序列化后大小: 96 bytes
# 从字节流恢复对象
restored = pickle.loads(bytes_data)
print (restored == data)
# 输出: True
# 直接写入文件
with open ("data.pkl" , "wb" ) as f:
pickle.dump(data, f)
# 从文件读取
with open ("data.pkl" , "rb" ) as f:
restored2 = pickle.load(f)
2.2 pickle协议版本
pickle协议经历了多次演进,从v0(文本格式)到v5(Python 3.8+)。理解协议版本对于跨版本兼容性和性能优化非常重要。
协议版本 引入版本 特点
v0 Python 1.x 文本格式,可读性好但效率低
v1 Python 2.x 二进制格式,效率提升
v2 Python 2.3 支持__getstate__/__setstate__协议
v3 Python 3.0 默认协议,对bytes类型优化
v4 Python 3.4 支持numpy等大型对象,提升效率
v5 Python 3.8 支持out-of-band数据,大数组零拷贝
import pickle
# 查看当前Python版本最高的协议版本
print (pickle.HIGHEST_PROTOCOL)
# 输出: 5 (Python 3.8+ 环境)
# 查看默认协议版本
print (pickle.DEFAULT_PROTOCOL)
# 输出: 5 或 4 (取决于Python版本)
# 显式指定协议版本以获得最佳兼容性或性能
data = {"key" : "value" }
bytes_v2 = pickle.dumps(data, protocol=2 )
bytes_v5 = pickle.dumps(data, protocol=5 )
print (f"v2 大小: {len(bytes_v2)} bytes" )
print (f"v5 大小: {len(bytes_v5)} bytes" )
兼容性建议: 如果你需要将pickle数据在Python 2和3之间共享,建议使用protocol=2。如果仅在Python 3.8+环境中使用,protocol=5能提供最佳性能和最小的序列化体积。默认协议在Python 3.8+中就是v5,无需额外指定。
2.3 __getstate__ / __setstate__ 自定义序列化
当pickle序列化一个对象时,默认行为是保存该对象的__dict__属性字典。但很多场景下,我们需要更细粒度的控制——比如排除某些不可序列化的资源(数据库连接、文件句柄、线程锁等),或者对敏感字段进行加密。
通过在类中定义__getstate__和__setstate__方法,可以完全控制序列化和反序列化的行为。
import pickle
import threading
class DatabaseConnection :
def __init__ (self , host, port, db_name, password):
self .host = host
self .port = port
self .db_name = db_name
self .password = password # 敏感信息,序列化时需要处理
self .connection = self ._connect()
self ._lock = threading.Lock() # 不可序列化的锁对象
def _connect (self ):
# 实际项目中这里会建立真实的数据库连接
return f"<connection to {self.host}:{self.port}/{self.db_name}>"
def __getstate__ (self ):
# 返回一个字典,pickle将使用此字典而非 __dict__ 进行序列化
state = self .__dict__.copy()
# 移除不可序列化的对象
del state["connection" ]
del state["_lock" ]
# 对密码进行简单的脱敏处理
state["password" ] = "***"
return state
def __setstate__ (self , state):
# 反序列化时重建对象状态
# state 就是 __getstate__ 返回的字典
self .__dict__.update(state)
# 重新创建不可序列化的资源
self .connection = self ._connect()
self ._lock = threading.Lock()
# 如果密码被脱敏,提示需要重新设置
if self .password == "***" :
print ("警告:反序列化后密码被脱敏,请调用 set_password() 重新设置" )
# 测试自定义序列化
conn = DatabaseConnection("localhost" , 5432 , "mydb" , "secret123" )
bytes_data = pickle.dumps(conn)
restored = pickle.loads(bytes_data)
print (restored.password) # 输出: ***
print (restored.connection)
# 输出: <connection to localhost:5432/mydb>
2.4 __reduce__ 高级自定义
__reduce__是比__getstate__更底层的协议接口。它返回一个元组,定义如何重新创建对象。这在处理无法通过__dict__恢复的对象时非常有用,比如C扩展类型、单例模式对象等。
import pickle
class Singleton :
"""单例模式 —— 演示 __reduce__ 的典型应用"""
_instance = None
def __new__ (cls, *args, **kwargs):
if cls._instance is None :
cls._instance = super ().__new__(cls)
return cls._instance
def __init__ (self , value=None ):
if not hasattr (self , "_initialized" ):
self .value = value
self ._initialized = True
def __reduce__ (self ):
# 返回 (可调用对象, 参数元组, 状态字典)
# pickle 将调用 cls.__new__(cls, *args) 创建对象,然后用 __setstate__ 恢复状态
return (
__new__ , # 用于创建对象的函数
(self .__class__,), # 传给 __new__ 的参数
self .__dict__ # 状态(会传给 __setstate__)
)
# 更复杂的示例:处理需要清理资源的外部对象
class ModelCheckpoint :
"""深度学习模型检查点 —— 需要保存模型参数但排除优化器状态"""
def __init__ (self , model, optimizer, epoch):
self .model = model
self .optimizer = optimizer
self .epoch = epoch
def __reduce__ (self ):
# 自定义序列化:只保存模型的state_dict,不保存整个对象
model_state = self .model.state_dict() if hasattr (self .model, "state_dict" ) else self .model
return (
self ._rebuild,
(model_state, self .epoch)
)
@staticmethod
def _rebuild (model_state, epoch):
# 重建时用更轻量的方式恢复
return {"model_state" : model_state, "epoch" : epoch}
理解__reduce__的返回值格式: (callable, args)形式最常用——pickle会调用callable(*args)来重建对象。如果返回(callable, args, state),则pickle在创建对象后会调用obj.__setstate__(state)恢复状态。callable通常可以是类本身、类的__new__方法,或者任何工厂函数。
2.5 安全限制与风险
pickle的安全性是其最大的短板。恶意的pickle数据可以在反序列化时执行任意代码,这是因为pickle本质上是一个简单的栈式虚拟机,可以构造字节码来执行任意Python函数。因此,永远不要对来自不可信来源的数据执行pickle.loads() 。
安全警告: 以下代码展示pickle的反序列化漏洞原理,仅供学习参考,请勿用于攻击。
import pickle
import os
# ⚠️ 恶意构造的pickle数据示例(教育目的)
class EvilPickle :
"""演示pickle反序列化漏洞:这个类可以在反序列化时执行系统命令"""
def __reduce__ (self ):
# 返回 (os.system, ("calc",)) # Windows 下会打开计算器
return (os.system, ("echo '漏洞演示:pickle执行了系统命令!'" ,))
# 构造恶意pickle字节流
malicious_data = pickle.dumps(EvilPickle())
# ❌ 危险!以下代码会执行恶意操作
# pickle.loads(malicious_data)
# ✅ 安全实践:使用限制性反序列化
class SafeUnpickler (pickle.Unpickler):
"""白名单安全反序列化器"""
ALLOWED_TYPES = {dict, list, tuple, str, int, float, bool, bytes, set, frozenset, type (None )}
def find_class (self , module, name):
# 仅允许从内置模块和白名单类中反序列化
if module == "builtins" and name in {
"dict" , "list" , "tuple" , "str" ,
"int" , "float" , "bool" , "bytes" ,
"set" , "frozenset" , "NoneType" }:
return getattr (__builtins__, name)
raise pickle.UnpicklingError(f"禁止反序列化类型: {module}.{name}" )
# 安全地反序列化
with open ("data.pkl" , "rb" ) as f:
safe_restored = SafeUnpickler(f).load()
# 替代方案:使用 json 或 msgpack 传递跨进程数据
print ("推荐:对于跨进程/跨服务通信,优先使用 JSON 或 msgpack 替代 pickle" )
2.6 跨版本兼容性
pickle的跨版本兼容性是一个实际工程中经常遇到的挑战。不同Python版本间的pickle数据可能不兼容,尤其是在Python 2和Python 3之间。
import pickle
import sys
# 场景:在Python 3.12中保存的数据,需要在Python 3.6中恢复
data = {"name" : "test" , "value" : 42 }
# 方案1:使用低版本协议保证兼容性
with open ("compat_v2.pkl" , "wb" ) as f:
pickle.dump(data, f, protocol=2 ) # v2 协议在 Python 2.3+ 中均可读取
# 方案2:检查Python版本,选择最优兼容协议
def pickle_with_compat (obj, target_min_version="3.6" ):
version_info = tuple(map (int , target_min_version.split("." )))
if version_info < (3 , 8 ):
protocol = 4 # Python 3.4+ 支持 v4
elif version_info < (3 , 4 ):
protocol = 3 # Python 3.0+ 支持 v3
else :
protocol = 2 # 最广泛的兼容性
return pickle.dumps(obj, protocol=protocol)
# 方案3:检测并转换协议版本(如已存在高版本数据)
def ensure_compatible (pickle_bytes, target_protocol=2 ):
"""尝试用高版本加载,然后以低版本重新序列化"""
try :
obj = pickle.loads(pickle_bytes)
return pickle.dumps(obj, protocol=target_protocol)
except Exception as e:
raise ValueError (f"无法转换pickle数据: {e}" )
三、JSON进阶
JSON(JavaScript Object Notation)是最广泛使用的跨语言数据交换格式。Python的json模块提供了简洁的API,但默认只支持基本数据类型(dict、list、str、int、float、bool、None)。在实际开发中,我们几乎总是需要处理自定义对象、datetime、Decimal等类型,这就需要我们对JSON编码器/解码器进行自定义扩展。
3.1 default参数与自定义编码器
当json.dumps()遇到无法序列化的类型时,会抛出TypeError。这时可以通过default参数提供一个转换函数,将这些类型转换为可序列化的基本类型。更优雅的方式是继承json.JSONEncoder,实现default方法。
import json
from datetime import datetime, date
from decimal import Decimal
from enum import Enum
import uuid
# ---------- 方法1:使用 default 参数 ----------
def custom_serializer (obj):
"""自定义序列化函数,处理json模块默认不支持的类型"""
if isinstance (obj, (datetime, date)):
return obj.isoformat()
if isinstance (obj, Decimal):
return float (obj)
if isinstance (obj, uuid.UUID):
return str (obj)
if isinstance (obj, set ):
return list (obj)
if isinstance (obj, Enum):
return obj.value
raise TypeError (f"Type {type(obj)} not serializable" )
data = {
"created_at" : datetime.now(),
"price" : Decimal("19.99" ),
"user_id" : uuid.uuid4(),
"tags" : {"python" , "json" , "serialization" }
}
json_str = json.dumps(data, default=custom_serializer, ensure_ascii=False , indent=2 )
print (json_str)
# ---------- 方法2:继承 JSONEncoder(推荐) ----------
class AdvancedJSONEncoder (json.JSONEncoder):
"""更完备的自定义JSON编码器"""
def default (self , obj):
if isinstance (obj, (datetime, date)):
return {"__type__" : "datetime" , "value" : obj.isoformat()}
if isinstance (obj, Decimal):
return {"__type__" : "decimal" , "value" : str (obj)}
if isinstance (obj, bytes ):
return {"__type__" : "bytes" , "value" : obj.hex()}
if hasattr (obj, "to_dict" ): # 支持自定义 to_dict 协议
return obj.to_dict()
return super ().default(obj)
# 使用自定义编码器
json_str2 = json.dumps(data, cls=AdvancedJSONEncoder, ensure_ascii=False , indent=2 )
print (json_str2)
3.2 object_hook / object_pairs_hook 自定义解码器
与编码相反,解码时可以通过object_hook或object_pairs_hook自定义JSON对象到Python对象的映射逻辑。object_hook在解析每个JSON对象时被调用,object_pairs_hook则接收一个有序的键值对列表,适合需要保留字段顺序的场景。
import json
from datetime import datetime
from decimal import Decimal
# 使用 object_hook 恢复编码器中标记的特殊类型
def custom_decoder (dct):
"""根据编码器中的 __type__ 标记恢复特殊类型"""
if "__type__" in dct:
type_name = dct["__type__" ]
value = dct["value" ]
if type_name == "datetime" :
return datetime.fromisoformat(value)
if type_name == "decimal" :
return Decimal(value)
if type_name == "bytes" :
return bytes .fromhex(value)
return dct
# 模拟前面编码器生成的JSON字符串
encoded = '{"created_at": {"__type__": "datetime", "value": "2026-05-05T12:00:00"}, "price": {"__type__": "decimal", "value": "19.99"}}'
decoded = json.loads(encoded, object_hook=custom_decoder)
print (decoded["created_at" ]) # datetime 对象
print (type (decoded["created_at" ])) # <class 'datetime.datetime'>
print (decoded["price" ]) # Decimal 对象
print (type (decoded["price" ])) # <class 'decimal.Decimal'>
# ---------- 使用 object_pairs_hook 保留字段顺序 ----------
from collections import OrderedDict
def ordered_decoder (pairs):
"""保留JSON对象中字段的原始顺序"""
result = OrderedDict()
for key, value in pairs:
result[key] = value
return result
json_str = '{"z": 1, "a": 2, "nested": {"b": 3, "a": 4}}'
data = json.loads(json_str, object_pairs_hook=ordered_decoder)
print (list (data.keys())) # ['z', 'a', 'nested'] — 保留原始顺序
# 对比:默认行为会丢失顺序
data_default = json.loads(json_str)
print (list (data_default.keys())) # 顺序不保证(Python 3.7+ 实际保留了,但依赖此行为不安全)
3.3 cls_hook 参数
除了object_hook和object_pairs_hook之外,还可以通过继承json.JSONDecoder并覆写相关方法来实现更精细的解码控制。
import json
from datetime import datetime
class CustomJSONDecoder (json.JSONDecoder):
"""自定义JSON解码器,整合多种自定义解码逻辑"""
def __init__ (self , *args, **kwargs):
super ().__init__(
*args,
object_hook=self ._object_hook,
**kwargs
)
@staticmethod
def _object_hook (dct):
# 自动识别ISO格式的日期时间字符串
for key, value in dct.items():
if isinstance (value, str ) and len (value) > 18 :
try :
# 尝试解析ISO格式的datetime字符串
dct[key] = datetime.fromisoformat(value)
except (ValueError , TypeError ):
pass
return dct
# 使用自定义解码器
decoder = CustomJSONDecoder()
data = decoder.decode('{"name": "test", "created_at": "2026-05-05T14:30:00"}' )
print (type (data["created_at" ])) # <class 'datetime.datetime'>
3.4 JSONLines 格式
JSONLines(又称NDJSON)是一种以换行符分隔的JSON格式,每行都是一个独立的JSON对象。它在日志处理、流式数据、大数据ETL等场景中广泛使用。与JSON数组相比,JSONLines支持增量写入和流式读取,非常适合逐行处理海量数据。
import json
# ---------- 写入 JSONLines 文件 ----------
records = [
{"id" : 1 , "name" : "Alice" , "score" : 95.5 },
{"id" : 2 , "name" : "Bob" , "score" : 87.0 },
{"id" : 3 , "name" : "Charlie" , "score" : 92.3 },
{"id" : 4 , "name" : "Diana" , "score" : 78.8 },
]
with open ("data.jsonl" , "w" , encoding="utf-8" ) as f:
for record in records:
f.write(json.dumps(record, ensure_ascii=False ) + "\n" )
# ---------- 流式读取 JSONLines ----------
# 方式1:逐行读取(适合大文件,内存友好)
def read_jsonl (filepath):
"""生成器方式逐行读取JSONLines文件"""
with open (filepath, "r" , encoding="utf-8" ) as f:
for line_no, line in enumerate (f, 1 ):
line = line.strip()
if not line: # 跳过空行
continue
try :
yield json.loads(line)
except json.JSONDecodeError as e:
print (f"解析错误 第{line_no}行: {e}" )
# 使用生成器流式读取
for item in read_jsonl ("data.jsonl" ):
print (f"处理: {item['name']} - {item['score']}" )
# ---------- JSONLines 的优势场景 ----------
# 场景1:日志收集
import time
import random
def write_log_entry (log_file, level, message, **extra):
"""模拟写日志到JSONLines文件"""
entry = {
"timestamp" : time.time(),
"level" : level,
"message" : message,
**extra
}
with open (log_file, "a" , encoding="utf-8" ) as f:
f.write(json.dumps(entry, ensure_ascii=False ) + "\n" )
# 场景2:ETL 中的增量处理
# 传统的JSON数组需要先读入全部数据才能操作,
# JSONLines 可以在文件末尾追加新数据,无需重写整个文件
# 场景3:结合 MapReduce / Hadoop 的分布式处理
# Hadoop 的默认输入格式按行分割,JSONLines 天然适合
JSONLines vs JSON数组: JSONLines格式的典型优势在于:1)支持增量追加(只需在文件末尾加一行,而JSON数组需重写整个[...]结构);2)流式处理(逐行读取,内存占用与文件大小无关);3)天然适配Hadoop、Spark等分布式计算框架的按行分割逻辑。其主要缺点是没有统一的元数据头,无法表达嵌套的多层级结构。
四、msgpack 二进制序列化
MessagePack(msgpack)是一种高效的二进制序列化格式,类似于JSON但更小更快。它用单字节标记替代了JSON中的花括号、方括号、冒号和逗号,使得序列化后的体积更小,解析速度更快。msgpack是一种跨语言格式,有超过50种语言的实现。
4.1 基本使用
Python中使用msgpack需要先安装第三方库:pip install msgpack。其核心API是pack()/packb()和unpack()/unpackb()。
import msgpack
import json
# 准备数据
data = {
"name" : "Python进阶学习" ,
"version" : 3.12 ,
"topics" : ["序列化" , "并发" , "异步" ],
"count" : 1000000 ,
"is_active" : True ,
"metadata" : {"author" : "admin" , "rating" : 4.8 }
}
# msgpack 序列化
packed = msgpack.packb(data)
unpacked = msgpack.unpackb(packed)
# JSON 序列化(用于对比)
json_bytes = json.dumps(data, ensure_ascii=False ).encode("utf-8" )
print (f"msgpack 大小: {len(packed)} bytes" )
print (f"JSON 大小: {len(json_bytes)} bytes" )
print (f"压缩比: {len(json_bytes) / len(packed):.2f}x" )
print (f"数据一致: {data == unpacked}" )
# 写入文件
with open ("data.msgpack" , "wb" ) as f:
msgpack.pack(data, f)
# 从文件读取
with open ("data.msgpack" , "rb" ) as f:
restored = msgpack.unpack(f)
4.2 msgpack vs pickle vs json 对比
特性 pickle json msgpack
格式 二进制 文本 二进制
跨语言 仅Python 几乎全语言 50+语言
序列化速度 中等 慢 快
反序列化速度 快 中等 很快
体积 小 大 很小
安全性 极不安全 安全 安全
可读性 不可读 可读 不可读
支持的类型 几乎所有Python类型 基本类型+自定义扩展 基本类型+扩展类型
适用场景 本地缓存、模型保存 Web API、跨语言通信 高性能RPC、缓存、IoT
import msgpack
import pickle
import json
import timeit
# 性能对比测试
test_data = {f"key_{i}" : "value_" * 10 for i in range (1000 )}
# 序列化性能测试
pickle_time = timeit.timeit("pickle.dumps(test_data)" , globals=locals (), number=1000 )
json_time = timeit.timeit(
"json.dumps(test_data, ensure_ascii=False).encode('utf-8')" ,
globals=locals (), number=1000
)
msgpack_time = timeit.timeit("msgpack.packb(test_data)" , globals=locals (), number=1000 )
print ("===== 序列化性能 (1000次, 秒) =====" )
print (f"pickle: {pickle_time:.4f}s" )
print (f"json: {json_time:.4f}s" )
print (f"msgpack: {msgpack_time:.4f}s" )
# 体积对比
pickle_bytes = pickle.dumps(test_data)
json_bytes = json.dumps(test_data, ensure_ascii=False ).encode("utf-8" )
msgpack_bytes = msgpack.packb(test_data)
print ("\n===== 序列化体积 =====" )
print (f"pickle: {len(pickle_bytes)} bytes" )
print (f"json: {len(json_bytes)} bytes" )
print (f"msgpack: {len(msgpack_bytes)} bytes" )
选型建议: 如果你的服务是纯Python内部使用,pickle的方便性无与伦比(但要注意安全);如果涉及跨语言通信或对外API,JSON是最稳妥的选择;如果对性能和带宽有较高要求(如微服务间RPC、移动端数据传输、实时游戏通信),msgpack通常是比JSON更好的选择。
4.3 msgpack 扩展类型
msgpack支持通过注册扩展类型(Extension Types)来实现自定义类型的序列化,类似于JSON的default参数但更高效。
import msgpack
from datetime import datetime, timezone
import numpy as np
# 注册自定义扩展类型
# 方式1:使用 default 和 ext_hook 参数
def encode_datetime (obj):
if isinstance (obj, datetime):
timestamp = obj.timestamp()
return msgpack.ExtType(42 , int (timestamp).to_bytes(8 , "big" ))
raise TypeError (f"Unknown type: {type(obj)}" )
def decode_datetime (code, data):
if code == 42 :
timestamp = int .from_bytes(data, "big" )
return datetime.fromtimestamp(timestamp, tz=timezone.utc)
return msgpack.ExtType(code, data)
data = {
"event" : "系统启动" ,
"time" : datetime.now(timezone.utc)
}
packed = msgpack.packb(data, default=encode_datetime)
unpacked = msgpack.unpackb(packed, ext_hook=decode_datetime)
print (unpacked["time" ])
# 输出: 2026-05-05 12:00:00+00:00 (datetime 对象)
# 方式2:使用 msgpack 的 Timestamp 类型(v1.0+)
packed_ts = msgpack.packb(data, datetime=True )
unpacked_ts = msgpack.unpackb(packed_ts, datetime=True )
print (unpacked_ts["time" ]) # 自动识别为 datetime
五、序列化方案对比与选型
在实际项目中选择合适的序列化方案需要综合考量多个因素。下面从更广泛的视角对比主流序列化格式,包括Protobuf、Thrift和Avro等重量级方案。
5.1 主流方案全景对比
方案 IDL定义 Schema演进 序列化速度 反序列化速度 体积 学习成本
pickle 无 不支持 中等 快 小 低
json 无 手动兼容 慢 中等 大 低
msgpack 无 手动兼容 快 快 很小 低
Protobuf .proto文件 原生支持 很快 很快 极小 高
Thrift .thrift文件 原生支持 很快 很快 极小 高
Avro .avsc文件 原生支持 快 快 小 中
5.2 Protobuf 初步体验
Protocol Buffers(Protobuf)是Google开发的序列化框架,需要先定义.proto文件来描述数据结构,然后通过编译器生成对应语言的代码。它与msgpack相比虽然笨重一些,但在Schema演进和跨语言兼容性方面更加成熟。
# 首先安装: pip install protobuf
# 定义一个简单的 proto 消息格式 (user.proto)
"""
syntax = "proto3";
message User {
string name = 1;
int32 age = 2;
repeated string tags = 3;
}
"""
from google.protobuf import json_format
# 假设我们已经编译了 user_pb2
# import user_pb2
# 使用 protobuf
# user = user_pb2.User()
# user.name = "Alice"
# user.age = 30
# user.tags.extend(["python", "protobuf"])
# serialized = user.SerializeToString()
# print(f"Protobuf 大小: {len(serialized)} bytes")
# 对比 msgpack (无 IDL 的轻量方案)
import msgpack
user_data = {"name" : "Alice" , "age" : 30 , "tags" : ["python" , "protobuf" ]}
msgpack_bytes = msgpack.packb(user_data)
print (f"msgpack 大小: {len(msgpack_bytes)} bytes" )
当选用Protobuf: 1)团队规模大,需要严格的Schema管理和版本演进;2)需要为多种语言(Java/Go/C++/Python)生成代码;3)性能敏感且需要极小的序列化体积。如果是小型项目或快速原型开发,msgpack或JSON的灵活性更高。
5.3 选型决策树
以下决策指南可以帮助你在实际项目中快速选择合适的序列化方案:
纯Python内部使用 :选择pickle(便捷)或msgpack(安全和跨进程)
跨语言通信 :选择JSON(通用)或Protobuf(性能)
需要人类可读 :选择JSON
高性能缓存(Redis) :选择msgpack(体积小、序列化快)
大数据/流处理 :选择Avro(Hadoop生态友好)或JSONLines
微服务RPC通信 :选择Protobuf(gRPC必备)或msgpack(轻量)
对象深度拷贝 :使用pickle的copy.deepcopy(基于pickle协议实现)
六、大数据序列化性能优化
当处理大规模数据(GB级别以上)时,序列化性能成为系统瓶颈。以下策略可以帮助你在大数据场景中优化序列化性能。
6.1 批量序列化与流式处理
避免一次性将全部数据加载到内存中,采用分块序列化或流式写入。
import json
import time
# ❌ 低效方式:一次性加载全部数据
def serialize_all_at_once (records, filepath):
# 全部数据在内存中构建成一个大列表,可能导致OOM
with open (filepath, "w" ) as f:
json.dump(records, f)
# ✅ 高效方式:流式JSONLines写入
def serialize_streaming (records, filepath):
"""流式写入JSONLines,内存占用恒定"""
with open (filepath, "w" , encoding="utf-8" , buffering=65536 ) as f:
for record in records:
f.write(json.dumps(record, ensure_ascii=False ) + "\n" )
# ✅ msgpack 流式写入(适合高性能场景)
import msgpack
def serialize_msgpack_stream (records, filepath):
with open (filepath, "wb" , buffering=65536 ) as f:
for record in records:
f.write(msgpack.packb(record))
6.2 压缩策略
序列化后的数据可以进一步压缩,在CPU和IO之间做权衡。
import pickle
import gzip
import lzma
import json
import time
import msgpack
# 测试数据:10万条记录
test_records = [{"id" : i, "name" : f"user_{i}" , "scores" : [float (j) for j in range (10 )]} for i in range (100000 )]
def serialize_with_compression (data, serializer, compressor):
"""序列化+压缩测试"""
start = time.time()
if serializer == "pickle" :
serialized = pickle.dumps(data, protocol=5 )
elif serializer == "json" :
serialized = json.dumps(data, ensure_ascii=False ).encode("utf-8" )
elif serializer == "msgpack" :
serialized = msgpack.packb(data)
else :
raise ValueError ("Unknown serializer" )
if compressor == "gzip" :
compressed = gzip.compress(serialized)
elif compressor == "lzma" :
compressed = lzma.compress(serialized)
else :
compressed = serialized
elapsed = time.time() - start
return compressed, elapsed, len (compressed)
# 对比不同组合
for ser in ["pickle" , "json" , "msgpack" ]:
for comp in [None , "gzip" , "lzma" ]:
if comp is None :
comp_name = "无压缩"
else :
comp_name = comp
try :
result, elapsed, size = serialize_with_compression (test_records, ser, comp)
print (f"{ser:8s} + {comp_name:6s}: {size:10,d} bytes, {elapsed:.3f}s" )
except Exception as e:
print (f"{ser:8s} + {comp_name:6s}: 失败 - {e}" )
6.3 零拷贝与内存视图优化
对于大数据量的场景,使用pickle v5的out-of-band(OOB)数据和memoryview可以实现零拷贝序列化,避免不必要的数据复制。
import pickle
import numpy as np
# 使用 pickle v5 的 out-of-band 数据(零拷贝)
# 适用于需要独立处理大数组的场景(如NumPy数组、CUDA张量)
large_array = np.random.rand(1000 , 1000 )
# 方式1:常规方式(会将数组拷贝到pickle流中)
normal_bytes = pickle.dumps(large_array, protocol=5 )
print (f"常规方式: {len(normal_bytes)} bytes" )
# 方式2:使用 pickle.Buffer 实现零拷贝(需要Python 3.8+)
buffers = []
class NumpySerializer :
"""为NumPy数组提供零拷贝序列化的包装器"""
def __init__ (self , array):
self .array = array
def __reduce_ex__ (self , protocol):
# 利用 pickle v5 的 buffer_callback 机制
if protocol >= 5 :
return (
_reconstruct_numpy ,
(self .array.dtype, self .array.shape),
self .array.tobytes()
)
else :
return (_reconstruct_numpy , (self .array.dtype, self .array.shape, self .array.tobytes()))
def _reconstruct_numpy (dtype, shape, data=None ):
if data is None :
# 返回需要填充buffer的占位对象
return _Placeholder (dtype, shape)
return np.frombuffer(data, dtype=dtype).reshape(shape)
class _Placeholder :
def __init__ (self , dtype, shape):
self .dtype = dtype
self .shape = shape
self .buffer = None
def __setstate__ (self , state):
self .buffer = state
def finalize (self ):
return np.frombuffer(self .buffer, dtype=self .dtype).reshape(self .shape)
6.4 大数据序列化最佳实践
使用缓冲区 :用buffering=65536(64KB)等较大缓冲区减少磁盘IO次数
分块处理 :将大数据集切分为固定大小的块(如每块10000条),逐块序列化
选择合适协议 :pickle v5在序列化大对象(如NumPy数组)时效率显著优于v4
按需序列化 :只序列化需要的字段,避免传输完整对象
使用多进程 :将序列化任务分发到多个进程并行执行
数据压缩 :在IO成为瓶颈时使用gzip或lzma压缩,在CPU成为瓶颈时放弃压缩
避免重复序列化 :缓存已序列化的结果,使用functools.lru_cache或类似机制
七、核心要点总结
1. pickle :Python内置、支持任意对象、v5协议性能最优。核心方法:__getstate__排除不可序列化资源,__setstate__重建资源,__reduce__底层控制序列化流程。安全警告 :绝不反序列化不可信数据,建议继承pickle.Unpickler实现白名单机制。
2. JSON :跨语言标准、人类可读。进阶技巧:default参数或继承JSONEncoder处理自定义类型;object_hook/object_pairs_hook恢复特殊类型;JSONLines格式适用于流式日志和大数据ETL。
3. msgpack :二进制紧凑格式,性能介于pickle和JSON之间。适合高性能RPC、缓存、IoT等场景。支持扩展类型注册,比JSON更小的体积和更快的解析速度。
4. 选型原则 :内部用pickle,跨语言用JSON,高性能用msgpack,企业级系统用Protobuf。
5. 性能优化 :流式处理避免OOM,压缩在CPU/IO间权衡,pickle v5零拷贝优化大数组序列化。
序列化的本质是一种「表示转换」 ——将内存中的对象图转换为一维的字节序列。优秀的序列化方案应当在编码效率、解析速度、空间占用、类型保真度和安全性之间取得平衡。没有银弹,只有最适合当前场景的选择。