multiprocessing模块 — 多进程编程

Python标准库精讲专题 · 并发编程篇 · 掌握多进程编程

专题:Python标准库精讲系统学习

关键词:Python, 标准库, multiprocessing, 多进程, Process, Queue, Pipe, Pool, 共享内存, Manager

一、多进程概述

多进程(multiprocessing)是Python中实现并行计算的核心手段之一。与多线程不同,每个进程拥有独立的Python解释器和内存空间,因此可以绕过全局解释器锁(GIL)的限制,在多核CPU上实现真正的并行执行。

1.1 进程 vs 线程

对比维度多进程 (multiprocessing)多线程 (threading)
内存空间独立内存,不共享共享同一进程内存
GIL影响不受GIL限制(每个进程独立GIL)受GIL限制,同一时刻只有一个线程执行字节码
适用场景CPU密集型任务I/O密集型任务
数据安全天然隔离,无需锁保护全局变量需要锁机制保护共享数据
创建开销较大(独立进程创建)较小(线程轻量)
通信方式Queue、Pipe、共享内存等直接读写共享变量(需加锁)

1.2 CPU密集型优势

当任务主要消耗CPU资源(如数值计算、图像处理、加密解密、大数据排序)时,多进程可以将任务分发到多个CPU核心上并行执行。以一个简单的计算密集型任务为例:计算1到N的平方和,四核机器上多进程的耗时可能只有单线程的1/3到1/4。

相比之下,多线程在CPU密集型任务上由于GIL的制约,不仅无法加速,反而因线程切换的开销可能比单线程更慢。

1.3 GIL绕过原理

Python的全局解释器锁(GIL)确保同一时刻只有一个线程在执行Python字节码。multiprocessing模块通过创建独立的子进程来绕过这一限制。每个子进程都拥有自己独立的Python解释器和GIL,因此多个进程可以同时执行Python代码,充分利用多核CPU的并行能力。

需要注意的是,进程间通信(IPC)的开销不容忽视。如果任务本身计算量很小而通信频繁,多进程的优势可能被通信开销抵消。此时应评估是否值得使用多进程,或者采用多线程+异步I/O的方案。

关键理解:多进程不是"银弹"——它适合CPU密集、数据并行的大计算任务;对于I/O密集任务(网络请求、文件读写),多线程或异步编程(asyncio)通常是更高效的选择。

二、Process类

Process类是multiprocessing模块的核心,用于创建和管理操作系统级进程。它与threading.Thread的API高度相似,降低了学习成本。

2.1 创建进程:target和args

创建进程的最简单方式是将目标函数和参数传递给Process构造函数。target指定进程要执行的函数,args以元组形式传递位置参数,kwargs以字典形式传递关键字参数。

基本用法:

from multiprocessing import Process import os def worker(name, count): print(f"子进程 {name} (PID: {os.getpid()}) 开始工作") for i in range(count): print(f" {name}: 第{i+1}项任务完成") if __name__ == "__main__": p1 = Process(target=worker, args=("进程A", 3)) p2 = Process(target=worker, args=("进程B", 3)) p1.start() p2.start() p1.join() p2.join() print("所有子进程执行完毕")

2.2 启动和等待:start与join

start() 方法启动进程,将目标函数放入子进程中运行。start()调用后会立即返回,父进程和子进程并发执行。

join([timeout]) 方法等待子进程终止。调用join()的父进程会阻塞,直到对应的子进程结束。timeout参数可选,指定最长等待秒数,超时后父进程继续执行后续代码(子进程可能仍在运行)。

典型模式:先start所有进程,再逐一join,这样可以最大化并行度。

processes = [] for i in range(4): p = Process(target=worker, args=(f"进程{i}", 5)) p.start() processes.append(p) for p in processes: p.join() # 等待所有进程结束

2.3 终止进程:terminate

terminate() 方法强制终止子进程,通过发送SIGTERM信号(POSIX)或TerminateProcess(Windows)。注意:terminate()不会执行清理操作(如finally块、上下文管理器的__exit__),使用需谨慎。

可以通过is_alive()检查进程是否仍在运行:

p = Process(target=long_running_task) p.start() time.sleep(2) if p.is_alive(): print("进程仍在运行,强制终止...") p.terminate() p.join() # terminate后建议调用join,确保进程已清理 print(f"进程是否存活: {p.is_alive()}")

2.4 daemon属性

daemon属性控制进程是否为守护进程。当设置为True时,该进程会在主进程退出时自动终止,且不能创建子进程。守护进程常用于后台监控、定时任务等场景。

关键规则:

p = Process(target=background_monitor, daemon=True) p.start() # 主进程结束,守护进程自动终止

三、进程间通信 — Queue

由于进程拥有独立的内存空间,不能像线程那样通过共享变量通信。Queue为多进程提供了线程安全、进程安全的FIFO队列,基于管道和锁实现。

3.1 基本操作

multiprocessing.Queue实现了标准的队列协议,主要方法包括:

from multiprocessing import Process, Queue def producer(q): for i in range(5): q.put(f"消息 {i}") print(f"生产者发送: 消息 {i}") q.put("STOP") # 发送结束信号 def consumer(q): while True: msg = q.get() if msg == "STOP": break print(f"消费者收到: {msg}") if __name__ == "__main__": q = Queue() p1 = Process(target=producer, args=(q,)) p2 = Process(target=consumer, args=(q,)) p1.start() p2.start() p1.join() p2.join()

3.2 JoinableQueue

Queue的子类JoinableQueue增加了task_done()和join()方法,用于跟踪队列中所有任务是否已被处理。其工作原理与queue模块的JoinableQueue类似:每次get()处理完任务后调用task_done(),队列内部维护未完成任务计数;join()会阻塞直到计数归零。

from multiprocessing import JoinableQueue jq = JoinableQueue() # 生产者放入N个任务 # 消费者每次处理完调用 jq.task_done() # 主进程调用 jq.join() 等待所有任务完成

四、进程间通信 — Pipe

Pipe(管道)提供两个进程之间的双向或单向通信通道,比Queue更轻量。Pipe返回一对连接对象(conn1, conn2),分别代表管道的两端。

4.1 双向管道(默认)

默认情况下,Pipe创建的是双向管道(duplex=True),两端都可以发送和接收数据:

from multiprocessing import Process, Pipe def child(conn): msg = conn.recv() # 从父进程接收 print(f"子进程收到: {msg}") conn.send("你好,父进程!") # 向父进程发送 if __name__ == "__main__": parent_conn, child_conn = Pipe() p = Process(target=child, args=(child_conn,)) p.start() parent_conn.send("你好,子进程!") reply = parent_conn.recv() print(f"父进程收到回复: {reply}") p.join()

4.2 单向管道

设置duplex=False创建单向管道:conn1只能send,conn2只能recv。适合明确的生产者-消费者模式:

from multiprocessing import Pipe sender, receiver = Pipe(duplex=False) # sender.send(data) — 只发送 # receiver.recv() — 只接收

4.3 send和recv注意事项

五、同步原语

multiprocessing提供了与threading模块几乎一致的同步原语,用于协调多个进程对共享资源的访问。这些原语底层基于操作系统信号量(semaphore)实现,跨进程工作。

5.1 Lock(互斥锁)

Lock确保同一时间只有一个进程可以访问受保护的资源。常用于保护共享文件、打印机等独占资源。

from multiprocessing import Process, Lock def critical_section(lock, pid): with lock: # 获取锁,自动释放 print(f"进程 {pid} 进入临界区") # 执行需要保护的操作 print(f"进程 {pid} 离开临界区") if __name__ == "__main__": lock = Lock() processes = [Process(target=critical_section, args=(lock, i)) for i in range(3)] for p in processes: p.start() for p in processes: p.join()

5.2 Event(事件)

Event用于进程间的事件通知。一个进程可以等待另一个进程设置事件标志,实现简单的同步。

from multiprocessing import Process, Event def waiter(ready_event): print("等待信号...") ready_event.wait() print("收到信号,开始工作") def signaler(ready_event): import time; time.sleep(2) print("发送信号") ready_event.set() if __name__ == "__main__": event = Event() p1 = Process(target=waiter, args=(event,)) p2 = Process(target=signaler, args=(event,)) p1.start(); p2.start() p1.join(); p2.join()

5.3 Semaphore(信号量)

Semaphore维护一个计数器,允许多个进程同时访问有限数量的资源。acquire()减少计数(计数为0时阻塞),release()增加计数。常用于限制数据库连接池、并发线程数等场景。

from multiprocessing import Semaphore # 最多允许3个进程同时访问 sem = Semaphore(3) sem.acquire() # 计数减1(计数为0时阻塞) sem.release() # 计数加1

5.4 Condition(条件变量)

Condition结合了Lock和事件通知机制,适用于更复杂的同步场景——当某个条件成立时唤醒其他进程。典型用法是生产者-消费者模式:

from multiprocessing import Process, Condition def consumer(cv, data_list): with cv: while not data_list: cv.wait() # 等待数据 item = data_list.pop(0) print(f"消费: {item}") def producer(cv, data_list): with cv: data_list.append("新数据") cv.notify() # 唤醒等待的消费者

六、共享内存

共享内存允许多个进程直接访问同一块内存区域,避免了数据拷贝的开销,比Queue和Pipe更加高效。multiprocessing模块通过Value和Array提供对共享内存的简便封装。

6.1 Value — 共享单个值

Value(typecode_or_type, value)创建包含单个值的共享内存对象。typecode是ctypes类型代码('i'表示int,'d'表示double,'c'表示char等),也可以是ctypes类型本身。

from multiprocessing import Process, Value def increment(counter): for _ in range(1000): counter.value += 1 if __name__ == "__main__": counter = Value('i', 0) # 共享整数,初始值0 processes = [Process(target=increment, args=(counter,)) for _ in range(10)] for p in processes: p.start() for p in processes: p.join() print(f"最终计数: {counter.value}")

注意:Value对象默认带有内部锁防止竞态条件,但在复合操作(读-改-写)中仍可能产生不一致。需要原子操作时使用Value的get_lock()方法显式加锁。

6.2 Array — 共享数组

Array(typecode_or_type, size_or_sequence)创建包含固定长度数组的共享内存。可以传入长度创建空数组,或传入序列初始化。

from multiprocessing import Array arr = Array('i', [0] * 10) # 10个int的数组,初始值为0 arr[0] = 42 # 直接索引访问 arr[1:5] = [1, 2, 3, 4] # 支持切片 print(list(arr)) # 转换为Python列表 # 双精度浮点数组 double_arr = Array('d', 5) # 5个double,初始为0.0

6.3 共享字符串与自定义类型

对于字符串,可以使用ctypes.c_char_p或Array('c', string.encode()),但需要注意字符串长度固定:

from multiprocessing import Array, Value import ctypes # 共享字符串(固定长度) shared_str = Array('c', b'hello') # 5字节共享缓冲区 # 共享自定义类型 class Point(ctypes.Structure): _fields_ = [("x", ctypes.c_double), ("y", ctypes.c_double)] shared_point = Value(Point, Point(3.0, 4.0)) print(f"坐标: ({shared_point.value.x}, {shared_point.value.y})")

七、Manager

Manager提供了一种更高级、更灵活的进程间数据共享方式。它通过创建一个独立的服务器进程来管理共享对象,其他进程通过代理(proxy)访问这些对象。Manager支持几乎任何Python对象,且自动处理同步。

7.1 基本用法

Manager()创建管理器对象,其属性方法可以创建各种共享数据结构:

from multiprocessing import Process, Manager def worker(d, lst, pid): d[pid] = f"进程{pid}的数据" lst.append(pid) if __name__ == "__main__": with Manager() as manager: shared_dict = manager.dict() shared_list = manager.list() processes = [] for i in range(5): p = Process(target=worker, args=(shared_dict, shared_list, i)) p.start() processes.append(p) for p in processes: p.join() print(f"共享字典: {dict(shared_dict)}") print(f"共享列表: {list(shared_list)}")

7.2 支持的数据类型

Manager提供了以下常用共享数据类型:

7.3 Namespace

Namespace是一种极其灵活的共享对象,允许动态添加和访问任何可pickle的属性:

ns = manager.Namespace() ns.x = 10 # 动态属性 ns.config = {"host": "localhost", "port": 8080} print(ns.x, ns.config)

注意:Namespace的属性和普通属性赋值不是原子操作,修改可变对象(如列表、字典)时需要显式加锁。

7.4 远程管理

Manager支持通过网络让不同机器上的进程共享数据。通过设置authkey和指定地址,可以创建远程Manager:

from multiprocessing.managers import BaseManager # 服务端 manager = BaseManager(address=('', 50000), authkey=b'secret') manager.register('get_dict', dict) manager.start() # 客户端 m = BaseManager(address=('server_ip', 50000), authkey=b'secret') m.connect() shared = m.get_dict()

远程Manager在实际应用中需注意网络安全和数据序列化开销,通常建议使用Redis或RabbitMQ等专业中间件替代。

八、Pool进程池

当需要管理大量进程时,逐个创建和销毁进程的开销很大。Pool(进程池)维护一个进程工作池,将任务分配给空闲的工作进程,复用进程资源,显著降低开销。

8.1 apply — 同步调用

apply(func, args)阻塞当前进程直到func执行完毕并返回结果。类似于内置函数apply,但在工作进程中执行。大多数情况下不推荐使用,因为它是阻塞的,无法利用并行的优势。

from multiprocessing import Pool def square(x): return x * x with Pool(4) as pool: result = pool.apply(square, (10,)) print(result) # 100(但这是串行的)

8.2 apply_async — 异步调用(推荐)

apply_async(func, args[, callback])立即返回一个AsyncResult对象,不阻塞父进程。通过get()方法获取结果(阻塞直到结果准备好)。callback参数指定结果就绪后调用的回调函数(在主进程中执行)。

from multiprocessing import Pool def square(x): return x * x with Pool(4) as pool: results = [pool.apply_async(square, (i,)) for i in range(10)] for r in results: print(r.get()) # 按提交顺序打印0, 1, 4, 9, ...

8.3 map — 并行map

pool.map(func, iterable)类似于内置map,但将可迭代对象分块后分配到工作进程并行执行。它阻塞主进程,直到所有结果准备就绪,返回有序的结果列表。

with Pool(4) as pool: result = pool.map(square, range(10)) print(result) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

map_async 是map的非阻塞版本:

result = pool.map_async(square, range(10)) print(result.get()) # 结果同上,但不阻塞主进程

8.4 imap — 惰性map

imap(func, iterable[, chunksize])返回一个迭代器,每准备好一个结果就立即产生,不必等待所有任务完成。这对于需要逐步处理大量结果(如逐行写入文件)的场景非常有用。

with Pool(4) as pool: for result in pool.imap(square, range(100)): if result > 100: break # 可以提前终止,节省计算资源

imap_unordered 不保证结果顺序,只要一个任务完成就立即返回,通常比imap更快。

8.5 starmap — 多参数map

starmap(func, iterable)适用于目标函数接受多个参数的情况。iterable的每个元素是一个元组,会被解包传递给func。

def add(x, y): return x + y with Pool(4) as pool: result = pool.starmap(add, [(1,2), (3,4), (5,6)]) print(result) # [3, 7, 11]

starmap_async 是非阻塞版本。

8.6 close与join

在使用Pool时,正确的资源管理至关重要:

最佳实践是使用上下文管理器(with语句),它会自动调用close和join:

# 推荐:上下文管理器自动管理资源 with Pool(4) as pool: results = pool.map(square, range(100)) # 等价于: pool = Pool(4) try: results = pool.map(square, range(100)) finally: pool.close() pool.join()

8.7 进程数与CPU核数

合理设置进程池大小是性能优化的关键:

from multiprocessing import cpu_count processes = cpu_count() print(f"CPU逻辑核心数: {processes}") # 创建与CPU核数相同的进程池 with Pool(processes) as pool: results = pool.map(heavy_compute, big_data)

核心要点总结

1. multiprocessing模块通过创建独立进程绕过GIL,实现真正的多核并行计算,适用于CPU密集型任务。

2. Process类提供了类似threading.Thread的API:target/args指定任务,start()启动,join()等待,terminate()强制终止,daemon属性控制守护进程。

3. 进程间通信方式包括:Queue(安全队列)、Pipe(轻量管道)、Shared Memory(高效共享)、Manager(灵活代理),根据场景选择合适的通信方式。

4. 同步原语(Lock、Event、Semaphore、Condition)与threading模块一致,用于协调进程对共享资源的访问。

5. Pool进程池复用进程资源,提供apply_async、map、imap、starmap等多种并行映射方式,是处理大量任务的首选方案。

6. 进程数建议设置为cpu_count()或cpu_count()-1,避免过多进程导致上下文切换开销。

7. 进程间传输的数据必须可pickle序列化,这是multiprocessing模块的基本约束。

8. 务必在主模块中使用 if __name__ == "__main__": 保护进程创建代码,这是跨平台兼容的关键。