一、多进程概述
多进程(multiprocessing)是Python中实现并行计算的核心手段之一。与多线程不同,每个进程拥有独立的Python解释器和内存空间,因此可以绕过全局解释器锁(GIL)的限制,在多核CPU上实现真正的并行执行。
1.1 进程 vs 线程
| 对比维度 | 多进程 (multiprocessing) | 多线程 (threading) |
| 内存空间 | 独立内存,不共享 | 共享同一进程内存 |
| GIL影响 | 不受GIL限制(每个进程独立GIL) | 受GIL限制,同一时刻只有一个线程执行字节码 |
| 适用场景 | CPU密集型任务 | I/O密集型任务 |
| 数据安全 | 天然隔离,无需锁保护全局变量 | 需要锁机制保护共享数据 |
| 创建开销 | 较大(独立进程创建) | 较小(线程轻量) |
| 通信方式 | Queue、Pipe、共享内存等 | 直接读写共享变量(需加锁) |
1.2 CPU密集型优势
当任务主要消耗CPU资源(如数值计算、图像处理、加密解密、大数据排序)时,多进程可以将任务分发到多个CPU核心上并行执行。以一个简单的计算密集型任务为例:计算1到N的平方和,四核机器上多进程的耗时可能只有单线程的1/3到1/4。
相比之下,多线程在CPU密集型任务上由于GIL的制约,不仅无法加速,反而因线程切换的开销可能比单线程更慢。
1.3 GIL绕过原理
Python的全局解释器锁(GIL)确保同一时刻只有一个线程在执行Python字节码。multiprocessing模块通过创建独立的子进程来绕过这一限制。每个子进程都拥有自己独立的Python解释器和GIL,因此多个进程可以同时执行Python代码,充分利用多核CPU的并行能力。
需要注意的是,进程间通信(IPC)的开销不容忽视。如果任务本身计算量很小而通信频繁,多进程的优势可能被通信开销抵消。此时应评估是否值得使用多进程,或者采用多线程+异步I/O的方案。
关键理解:多进程不是"银弹"——它适合CPU密集、数据并行的大计算任务;对于I/O密集任务(网络请求、文件读写),多线程或异步编程(asyncio)通常是更高效的选择。
二、Process类
Process类是multiprocessing模块的核心,用于创建和管理操作系统级进程。它与threading.Thread的API高度相似,降低了学习成本。
2.1 创建进程:target和args
创建进程的最简单方式是将目标函数和参数传递给Process构造函数。target指定进程要执行的函数,args以元组形式传递位置参数,kwargs以字典形式传递关键字参数。
基本用法:
from multiprocessing import Process
import os
def worker(name, count):
print(f"子进程 {name} (PID: {os.getpid()}) 开始工作")
for i in range(count):
print(f" {name}: 第{i+1}项任务完成")
if __name__ == "__main__":
p1 = Process(target=worker, args=("进程A", 3))
p2 = Process(target=worker, args=("进程B", 3))
p1.start()
p2.start()
p1.join()
p2.join()
print("所有子进程执行完毕")
2.2 启动和等待:start与join
start() 方法启动进程,将目标函数放入子进程中运行。start()调用后会立即返回,父进程和子进程并发执行。
join([timeout]) 方法等待子进程终止。调用join()的父进程会阻塞,直到对应的子进程结束。timeout参数可选,指定最长等待秒数,超时后父进程继续执行后续代码(子进程可能仍在运行)。
典型模式:先start所有进程,再逐一join,这样可以最大化并行度。
processes = []
for i in range(4):
p = Process(target=worker, args=(f"进程{i}", 5))
p.start()
processes.append(p)
for p in processes:
p.join() # 等待所有进程结束
2.3 终止进程:terminate
terminate() 方法强制终止子进程,通过发送SIGTERM信号(POSIX)或TerminateProcess(Windows)。注意:terminate()不会执行清理操作(如finally块、上下文管理器的__exit__),使用需谨慎。
可以通过is_alive()检查进程是否仍在运行:
p = Process(target=long_running_task)
p.start()
time.sleep(2)
if p.is_alive():
print("进程仍在运行,强制终止...")
p.terminate()
p.join() # terminate后建议调用join,确保进程已清理
print(f"进程是否存活: {p.is_alive()}")
2.4 daemon属性
daemon属性控制进程是否为守护进程。当设置为True时,该进程会在主进程退出时自动终止,且不能创建子进程。守护进程常用于后台监控、定时任务等场景。
关键规则:
- daemon属性必须在start()调用之前设置
- 守护进程会在主进程退出时被强制终止,不会执行清理代码
- 子进程默认继承父进程的daemon状态
- 守护进程无法创建子进程(会引发AssertionError)
p = Process(target=background_monitor, daemon=True)
p.start()
# 主进程结束,守护进程自动终止
三、进程间通信 — Queue
由于进程拥有独立的内存空间,不能像线程那样通过共享变量通信。Queue为多进程提供了线程安全、进程安全的FIFO队列,基于管道和锁实现。
3.1 基本操作
multiprocessing.Queue实现了标准的队列协议,主要方法包括:
- put(obj[, block[, timeout]]):将对象放入队列。block=True时,队列满则阻塞;block=False时,队列满则抛出queue.Full异常。timeout指定阻塞超时。
- get([block[, timeout]]):从队列取出对象。队列空时行为与put类似(阻塞或抛出queue.Empty异常)。
- empty():判断队列是否为空(注意:在多进程环境中,返回值可能不准确)。
- qsize():返回队列的近似大小(同样不保证精确)。
- close():关闭队列,释放资源。关闭后不能put新数据。
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
q.put(f"消息 {i}")
print(f"生产者发送: 消息 {i}")
q.put("STOP") # 发送结束信号
def consumer(q):
while True:
msg = q.get()
if msg == "STOP":
break
print(f"消费者收到: {msg}")
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
3.2 JoinableQueue
Queue的子类JoinableQueue增加了task_done()和join()方法,用于跟踪队列中所有任务是否已被处理。其工作原理与queue模块的JoinableQueue类似:每次get()处理完任务后调用task_done(),队列内部维护未完成任务计数;join()会阻塞直到计数归零。
from multiprocessing import JoinableQueue
jq = JoinableQueue()
# 生产者放入N个任务
# 消费者每次处理完调用 jq.task_done()
# 主进程调用 jq.join() 等待所有任务完成
四、进程间通信 — Pipe
Pipe(管道)提供两个进程之间的双向或单向通信通道,比Queue更轻量。Pipe返回一对连接对象(conn1, conn2),分别代表管道的两端。
4.1 双向管道(默认)
默认情况下,Pipe创建的是双向管道(duplex=True),两端都可以发送和接收数据:
from multiprocessing import Process, Pipe
def child(conn):
msg = conn.recv() # 从父进程接收
print(f"子进程收到: {msg}")
conn.send("你好,父进程!") # 向父进程发送
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=child, args=(child_conn,))
p.start()
parent_conn.send("你好,子进程!")
reply = parent_conn.recv()
print(f"父进程收到回复: {reply}")
p.join()
4.2 单向管道
设置duplex=False创建单向管道:conn1只能send,conn2只能recv。适合明确的生产者-消费者模式:
from multiprocessing import Pipe
sender, receiver = Pipe(duplex=False)
# sender.send(data) — 只发送
# receiver.recv() — 只接收
4.3 send和recv注意事项
- 序列化:通过Pipe发送的数据必须可pickle序列化。无法序列化的对象(如lambda、某些动态对象)不能通过Pipe传输。
- 同步风险:recv()默认阻塞直到收到数据。如果两端都在等待recv,会导致死锁。
- 连接关闭:当连接关闭时,recv()会抛出EOFError。可以用try/except捕获并做出相应处理。
- 避免混用:管道的两端不应被多个进程/线程同时使用,除非使用锁保护。
- 超过64KB:发送大于64KB的数据时,内部可能触发分段传输机制,但这对用户透明。
五、同步原语
multiprocessing提供了与threading模块几乎一致的同步原语,用于协调多个进程对共享资源的访问。这些原语底层基于操作系统信号量(semaphore)实现,跨进程工作。
5.1 Lock(互斥锁)
Lock确保同一时间只有一个进程可以访问受保护的资源。常用于保护共享文件、打印机等独占资源。
from multiprocessing import Process, Lock
def critical_section(lock, pid):
with lock: # 获取锁,自动释放
print(f"进程 {pid} 进入临界区")
# 执行需要保护的操作
print(f"进程 {pid} 离开临界区")
if __name__ == "__main__":
lock = Lock()
processes = [Process(target=critical_section, args=(lock, i))
for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
5.2 Event(事件)
Event用于进程间的事件通知。一个进程可以等待另一个进程设置事件标志,实现简单的同步。
- set():将事件标志设为True,唤醒所有等待的进程
- clear():将事件标志重置为False
- wait([timeout]):阻塞直到事件被set,或超时返回
- is_set():检查事件是否已被设置
from multiprocessing import Process, Event
def waiter(ready_event):
print("等待信号...")
ready_event.wait()
print("收到信号,开始工作")
def signaler(ready_event):
import time; time.sleep(2)
print("发送信号")
ready_event.set()
if __name__ == "__main__":
event = Event()
p1 = Process(target=waiter, args=(event,))
p2 = Process(target=signaler, args=(event,))
p1.start(); p2.start()
p1.join(); p2.join()
5.3 Semaphore(信号量)
Semaphore维护一个计数器,允许多个进程同时访问有限数量的资源。acquire()减少计数(计数为0时阻塞),release()增加计数。常用于限制数据库连接池、并发线程数等场景。
from multiprocessing import Semaphore
# 最多允许3个进程同时访问
sem = Semaphore(3)
sem.acquire() # 计数减1(计数为0时阻塞)
sem.release() # 计数加1
5.4 Condition(条件变量)
Condition结合了Lock和事件通知机制,适用于更复杂的同步场景——当某个条件成立时唤醒其他进程。典型用法是生产者-消费者模式:
from multiprocessing import Process, Condition
def consumer(cv, data_list):
with cv:
while not data_list:
cv.wait() # 等待数据
item = data_list.pop(0)
print(f"消费: {item}")
def producer(cv, data_list):
with cv:
data_list.append("新数据")
cv.notify() # 唤醒等待的消费者
六、共享内存
共享内存允许多个进程直接访问同一块内存区域,避免了数据拷贝的开销,比Queue和Pipe更加高效。multiprocessing模块通过Value和Array提供对共享内存的简便封装。
6.1 Value — 共享单个值
Value(typecode_or_type, value)创建包含单个值的共享内存对象。typecode是ctypes类型代码('i'表示int,'d'表示double,'c'表示char等),也可以是ctypes类型本身。
from multiprocessing import Process, Value
def increment(counter):
for _ in range(1000):
counter.value += 1
if __name__ == "__main__":
counter = Value('i', 0) # 共享整数,初始值0
processes = [Process(target=increment, args=(counter,))
for _ in range(10)]
for p in processes:
p.start()
for p in processes:
p.join()
print(f"最终计数: {counter.value}")
注意:Value对象默认带有内部锁防止竞态条件,但在复合操作(读-改-写)中仍可能产生不一致。需要原子操作时使用Value的get_lock()方法显式加锁。
6.2 Array — 共享数组
Array(typecode_or_type, size_or_sequence)创建包含固定长度数组的共享内存。可以传入长度创建空数组,或传入序列初始化。
from multiprocessing import Array
arr = Array('i', [0] * 10) # 10个int的数组,初始值为0
arr[0] = 42 # 直接索引访问
arr[1:5] = [1, 2, 3, 4] # 支持切片
print(list(arr)) # 转换为Python列表
# 双精度浮点数组
double_arr = Array('d', 5) # 5个double,初始为0.0
6.3 共享字符串与自定义类型
对于字符串,可以使用ctypes.c_char_p或Array('c', string.encode()),但需要注意字符串长度固定:
from multiprocessing import Array, Value
import ctypes
# 共享字符串(固定长度)
shared_str = Array('c', b'hello') # 5字节共享缓冲区
# 共享自定义类型
class Point(ctypes.Structure):
_fields_ = [("x", ctypes.c_double), ("y", ctypes.c_double)]
shared_point = Value(Point, Point(3.0, 4.0))
print(f"坐标: ({shared_point.value.x}, {shared_point.value.y})")
七、Manager
Manager提供了一种更高级、更灵活的进程间数据共享方式。它通过创建一个独立的服务器进程来管理共享对象,其他进程通过代理(proxy)访问这些对象。Manager支持几乎任何Python对象,且自动处理同步。
7.1 基本用法
Manager()创建管理器对象,其属性方法可以创建各种共享数据结构:
from multiprocessing import Process, Manager
def worker(d, lst, pid):
d[pid] = f"进程{pid}的数据"
lst.append(pid)
if __name__ == "__main__":
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
processes = []
for i in range(5):
p = Process(target=worker, args=(shared_dict, shared_list, i))
p.start()
processes.append(p)
for p in processes:
p.join()
print(f"共享字典: {dict(shared_dict)}")
print(f"共享列表: {list(shared_list)}")
7.2 支持的数据类型
Manager提供了以下常用共享数据类型:
- dict() — 共享字典,支持所有字典操作
- list() — 共享列表,支持所有列表操作
- Namespace() — 命名空间对象,通过属性访问
- Queue() — 进程安全队列,类似multiprocessing.Queue
- Lock() / RLock() / Semaphore() / Event() / Condition() / Barrier() — 同步原语
- Value() / Array() — 共享内存封装
7.3 Namespace
Namespace是一种极其灵活的共享对象,允许动态添加和访问任何可pickle的属性:
ns = manager.Namespace()
ns.x = 10 # 动态属性
ns.config = {"host": "localhost", "port": 8080}
print(ns.x, ns.config)
注意:Namespace的属性和普通属性赋值不是原子操作,修改可变对象(如列表、字典)时需要显式加锁。
7.4 远程管理
Manager支持通过网络让不同机器上的进程共享数据。通过设置authkey和指定地址,可以创建远程Manager:
from multiprocessing.managers import BaseManager
# 服务端
manager = BaseManager(address=('', 50000), authkey=b'secret')
manager.register('get_dict', dict)
manager.start()
# 客户端
m = BaseManager(address=('server_ip', 50000), authkey=b'secret')
m.connect()
shared = m.get_dict()
远程Manager在实际应用中需注意网络安全和数据序列化开销,通常建议使用Redis或RabbitMQ等专业中间件替代。
八、Pool进程池
当需要管理大量进程时,逐个创建和销毁进程的开销很大。Pool(进程池)维护一个进程工作池,将任务分配给空闲的工作进程,复用进程资源,显著降低开销。
8.1 apply — 同步调用
apply(func, args)阻塞当前进程直到func执行完毕并返回结果。类似于内置函数apply,但在工作进程中执行。大多数情况下不推荐使用,因为它是阻塞的,无法利用并行的优势。
from multiprocessing import Pool
def square(x):
return x * x
with Pool(4) as pool:
result = pool.apply(square, (10,))
print(result) # 100(但这是串行的)
8.2 apply_async — 异步调用(推荐)
apply_async(func, args[, callback])立即返回一个AsyncResult对象,不阻塞父进程。通过get()方法获取结果(阻塞直到结果准备好)。callback参数指定结果就绪后调用的回调函数(在主进程中执行)。
from multiprocessing import Pool
def square(x):
return x * x
with Pool(4) as pool:
results = [pool.apply_async(square, (i,)) for i in range(10)]
for r in results:
print(r.get()) # 按提交顺序打印0, 1, 4, 9, ...
8.3 map — 并行map
pool.map(func, iterable)类似于内置map,但将可迭代对象分块后分配到工作进程并行执行。它阻塞主进程,直到所有结果准备就绪,返回有序的结果列表。
with Pool(4) as pool:
result = pool.map(square, range(10))
print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
map_async 是map的非阻塞版本:
result = pool.map_async(square, range(10))
print(result.get()) # 结果同上,但不阻塞主进程
8.4 imap — 惰性map
imap(func, iterable[, chunksize])返回一个迭代器,每准备好一个结果就立即产生,不必等待所有任务完成。这对于需要逐步处理大量结果(如逐行写入文件)的场景非常有用。
with Pool(4) as pool:
for result in pool.imap(square, range(100)):
if result > 100:
break # 可以提前终止,节省计算资源
imap_unordered 不保证结果顺序,只要一个任务完成就立即返回,通常比imap更快。
8.5 starmap — 多参数map
starmap(func, iterable)适用于目标函数接受多个参数的情况。iterable的每个元素是一个元组,会被解包传递给func。
def add(x, y):
return x + y
with Pool(4) as pool:
result = pool.starmap(add, [(1,2), (3,4), (5,6)])
print(result) # [3, 7, 11]
starmap_async 是非阻塞版本。
8.6 close与join
在使用Pool时,正确的资源管理至关重要:
- close():关闭进程池,阻止新任务提交。正在运行的任务继续执行。
- join():等待所有工作进程退出。必须在close()或terminate()之后调用。
- terminate():立即停止所有工作进程,不等待未完成任务。
最佳实践是使用上下文管理器(with语句),它会自动调用close和join:
# 推荐:上下文管理器自动管理资源
with Pool(4) as pool:
results = pool.map(square, range(100))
# 等价于:
pool = Pool(4)
try:
results = pool.map(square, range(100))
finally:
pool.close()
pool.join()
8.7 进程数与CPU核数
合理设置进程池大小是性能优化的关键:
- cpu_count():返回系统的CPU逻辑核心数量(例如四核八线程返回8)。
- 一般规则:CPU密集型任务,进程数通常设为 cpu_count() 或 cpu_count() - 1(留一个给系统)。
- 混合任务:同时存在I/O等待时,可以适当增加进程数(如 cpu_count() * 2)。
- 过犹不及:进程数超过CPU核心数会导致上下文切换开销增加,反而降低性能。
from multiprocessing import cpu_count
processes = cpu_count()
print(f"CPU逻辑核心数: {processes}")
# 创建与CPU核数相同的进程池
with Pool(processes) as pool:
results = pool.map(heavy_compute, big_data)
核心要点总结
1. multiprocessing模块通过创建独立进程绕过GIL,实现真正的多核并行计算,适用于CPU密集型任务。
2. Process类提供了类似threading.Thread的API:target/args指定任务,start()启动,join()等待,terminate()强制终止,daemon属性控制守护进程。
3. 进程间通信方式包括:Queue(安全队列)、Pipe(轻量管道)、Shared Memory(高效共享)、Manager(灵活代理),根据场景选择合适的通信方式。
4. 同步原语(Lock、Event、Semaphore、Condition)与threading模块一致,用于协调进程对共享资源的访问。
5. Pool进程池复用进程资源,提供apply_async、map、imap、starmap等多种并行映射方式,是处理大量任务的首选方案。
6. 进程数建议设置为cpu_count()或cpu_count()-1,避免过多进程导致上下文切换开销。
7. 进程间传输的数据必须可pickle序列化,这是multiprocessing模块的基本约束。
8. 务必在主模块中使用 if __name__ == "__main__": 保护进程创建代码,这是跨平台兼容的关键。