multiprocessing多进程

Python进阶编程专题 · Python多进程编程与并行计算

专题:Python进阶编程系统学习

关键词:Python, multiprocessing, 多进程, Process, Pool, Queue, Pipe, 共享内存

一、概述

在Python中,由于全局解释器锁(Global Interpreter Lock, GIL)的存在,多线程程序无法利用多核CPU并行执行CPU密集型任务。为了突破这一限制,Python提供了multiprocessing模块——它通过创建独立的进程(而非线程)来绕过GIL,使得程序能够真正利用多核CPU的并行计算能力。

每个multiprocessing进程都有自己独立的Python解释器和内存空间,这意味着子进程中变量的修改不会影响父进程。这一特性既是优势(真正的并行性、规避GIL)也是代价(进程间通信成本高、内存开销大)。

multiprocessing模块的设计参考了threading模块的API,因此熟悉多线程编程的开发者可以快速上手。它提供了Process(进程类)、Pool(进程池)、Queue/Pipe(IPC通信)、Value/Array/Manager(共享数据)以及Lock/Semaphore/Event/Condition(同步原语)等完备的工具集。

核心要点:multiprocessing通过创建独立进程实现真正并行,适合CPU密集型任务;进程间内存隔离,通信需要通过IPC机制(Queue、Pipe、共享内存)。

二、Process:进程创建与基础操作

Process是multiprocessing模块中最核心的类,用于创建和管理子进程。它的用法与threading.Thread高度相似,支持target指定可调用对象、args/kwargs传递参数、name命名进程、daemon设置守护模式。

2.1 基础进程创建

最简单的使用方式是创建Process实例并调用start()方法启动进程,之后可以调用join()等待进程结束。

import multiprocessing as mp import os def worker(name: str): """子进程中执行的函数""" pid = os.getpid() ppid = os.getppid() print(f"[子进程 {name}] PID={pid}, PPID={ppid}") return if __name__ == "__main__": # 创建并启动进程 p1 = mp.Process(target=worker, args=("A",)) p2 = mp.Process(target=worker, args=("B",)) p1.start() p2.start() print(f"[主进程] PID={os.getpid()}, 等待子进程结束...") p1.join() p2.join() print("[主进程] 所有子进程已结束")

2.2 继承Process类

除了使用target参数,还可以通过继承Process类并重写run()方法来定义进程行为。这种方式在封装复杂逻辑时更加清晰。

import multiprocessing as mp import time class MyProcess(mp.Process): def __init__(self, name: str, count: int): super().__init__() self.name = name self.count = count def run(self): """进程启动时自动执行的方法""" for i in range(self.count): print(f"[{self.name}] step {i + 1}") time.sleep(0.5) if __name__ == "__main__": procs = [MyProcess(f"Worker-{i}", 3) for i in range(3)] for p in procs: p.start() for p in procs: p.join() print("所有进程完成")

2.3 进程属性与方法

Process实例提供了丰富的属性和方法来管理进程的生命周期。以下是最常用的几个:

重要注意事项:在Windows上,必须将进程创建代码放在if __name__ == "__main__":保护块中,否则会导致递归创建进程的Bug。这是因为Windows使用spawn方式启动进程,会重新导入模块。

三、进程间通信:Queue与Pipe

由于进程之间内存隔离,不能像线程那样直接共享变量。multiprocessing提供了多种进程间通信(IPC)机制,最基础的是Queue(队列)和Pipe(管道)。

3.1 Queue:生产者-消费者模式

Queue是一个线程和进程安全的先进先出(FIFO)队列,基于底层管道和锁实现,可以安全地在多个进程间传递Python对象(需要可pickle序列化)。

import multiprocessing as mp import time import random def producer(queue: mp.Queue, items: list): """生产者:向队列中放入数据""" for item in items: queue.put(item) print(f" 生产者放入: {item}") time.sleep(random.uniform(0.2, 0.5)) queue.put(None) # 发送结束信号 def consumer(queue: mp.Queue, name: str): """消费者:从队列中取出数据""" while True: item = queue.get() if item is None: # 收到结束信号 queue.put(None) # 传递给下一个消费者 print(f" 消费者[{name}] 结束") break print(f" 消费者[{name}] 取出: {item}") time.sleep(random.uniform(0.3, 0.6)) if __name__ == "__main__": q = mp.Queue(maxsize=10) data = [f"Task-{i}" for i in range(5)] p1 = mp.Process(target=producer, args=(q, data)) p2 = mp.Process(target=consumer, args=(q, "A")) p3 = mp.Process(target=consumer, args=(q, "B")) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() print("所有进程结束")

3.2 Queue的常用方法

3.3 Pipe:双向通信管道

Pipe返回一对连接对象(conn1, conn2),分别代表管道的两端。数据从一端发送,在另一端接收。Pipe的速度比Queue快,但只适用于两个进程之间的通信。

import multiprocessing as mp def child_process(conn: mp.Connection): """子进程:先接收再发送""" msg = conn.recv() # 阻塞等待接收 print(f" 子进程收到: {msg}") reply = f"Hello from child! Received: '{msg}'" conn.send(reply) conn.close() if __name__ == "__main__": parent_conn, child_conn = mp.Pipe() p = mp.Process(target=child_process, args=(child_conn,)) p.start() # 主进程发送消息 parent_conn.send("Hello from parent!") response = parent_conn.recv() print(f"主进程收到: {response}") p.join() parent_conn.close()

性能提示:Pipe比Queue更快,因为Queue内部使用了Pipe外加锁和缓冲区。如果只需要两个进程间通信,优先选择Pipe。但注意Pipe不是线程安全的(同时读写两端需要加锁),而Queue是线程和进程双重安全的。

四、共享内存:Value与Array

为了在进程间共享基本类型的数据,multiprocessing提供了基于共享内存的ValueArray。它们使用ctypes类型来分配内存,多个进程可以直接访问同一块内存区域,避免了数据的序列化和反序列化开销,速度极快。

4.1 Value:共享单个值

import multiprocessing as mp import time def increment(counter: mp.Value, n: int): """多进程并发递增计数器""" for _ in range(n): with counter.get_lock(): # 使用锁保证原子性 counter.value += 1 def decrement(counter: mp.Value, n: int): """多进程并发递减计数器""" for _ in range(n): with counter.get_lock(): counter.value -= 1 if __name__ == "__main__": # typecode='i' 表示ctypes.c_int shared_counter = mp.Value('i', 0) N = 100000 p1 = mp.Process(target=increment, args=(shared_counter, N)) p2 = mp.Process(target=decrement, args=(shared_counter, N)) p1.start() p2.start() p1.join() p2.join() print(f"最终值: {shared_counter.value}") # 理论为0 print(f"类型: {shared_counter.value.__class__}")

4.2 Array:共享数组

Array创建一块共享内存数组,可以指定ctypes类型和大小。多个进程可以读写该数组的不同位置或相同位置(需加锁)。

import multiprocessing as mp def write_data(arr: mp.Array, index: int, value: int): """向共享数组写入数据""" with arr.get_lock(): arr[index] = value print(f" 写入: arr[{index}] = {value}") def read_data(arr: mp.Array, index: int): """读取共享数组数据""" # 不加锁也可以读,但可能读到不一致的数据 val = arr[index] print(f" 读取: arr[{index}] = {val}") return val if __name__ == "__main__": # 'd' = ctypes.c_double, 长度为10 shared_arr = mp.Array('d', [0.0] * 10) processes = [] for i in range(10): p = mp.Process(target=write_data, args=(shared_arr, i, i * 3.14)) processes.append(p) for p in processes: p.start() for p in processes: p.join() print(f"最终数组: {list(shared_arr)}")

4.3 ctypes类型对照表

typecodectypes类型C类型说明
'c'c_charchar单字节字符
'b'c_bytesigned char有符号字节
'B'c_ubyteunsigned char无符号字节
'h'c_shortshort短整数
'H'c_ushortunsigned short无符号短整数
'i'c_intint整数
'I'c_uintunsigned int无符号整数
'l'c_longlong长整数
'L'c_ulongunsigned long无符号长整数
'f'c_floatfloat单精度浮点数
'd'c_doubledouble双精度浮点数

注意:Value和Array创建时可以通过lock=False关闭自动锁机制以提高性能,此时需要自行确保并发安全。在没有锁保护的情况下同时写操作可能导致数据损坏。

五、Manager:共享复杂对象

Value和Array只能共享基本类型的值或数组,当需要在进程间共享更复杂的Python对象(如dict、list、自定义类实例等)时,需要使用Manager

Manager创建一个独立的服务器进程来管理共享对象,其他进程通过代理(Proxy)访问这些对象。Manager支持的数据类型包括:list、dict、Namespace、Queue、Lock、Semaphore、Condition、Event、Barrier、Value、Array等。

import multiprocessing as mp def worker(shared_dict: mp.managers.DictProxy, shared_list: mp.managers.ListProxy, pid: int): """操作共享字典和列表""" shared_dict[pid] = f"进程{pid}的数据" shared_list.append(pid * 10) print(f" 子进程 {pid}: dict keys={list(shared_dict.keys())}") if __name__ == "__main__": with mp.Manager() as manager: # 创建共享字典和列表 shared_data = manager.dict() shared_list = manager.list() procs = [] for i in range(5): p = mp.Process(target=worker, args=(shared_data, shared_list, i)) procs.append(p) for p in procs: p.start() for p in procs: p.join() print(f"共享字典: {dict(shared_data)}") print(f"共享列表: {list(shared_list)}")

5.1 Namespace:灵活的命名空间

Manager还提供了Namespace对象,允许动态添加属性来共享数据,语法更加自然。

import multiprocessing as mp def worker(ns: mp.managers.Namespace, pid: int): """使用Namespace共享数据""" if not hasattr(ns, 'count'): ns.count = 0 ns.count += 1 ns.last_pid = pid print(f" 子进程 {pid}: count={ns.count}") if __name__ == "__main__": with mp.Manager() as manager: ns = manager.Namespace() ns.count = 0 ns.items = [] procs = [mp.Process(target=worker, args=(ns, i)) for i in range(4)] for p in procs: p.start() for p in procs: p.join() print(f"最终 count: {ns.count}") print(f"last_pid: {ns.last_pid}")

Manager vs Value/Array:Manager使用更灵活,支持任意复杂对象,但性能开销大(数据需要序列化通过网络传输到Manager服务器进程)。Value/Array基于共享内存,速度快,但只支持基本类型。选择原则:用Manager的灵活性换取开发效率,用Value/Array的性能换取运行效率。

六、Pool:进程池

对于需要大量创建和销毁子进程的场景(如批量数据处理),每次都手动创建Process效率低下。Pool(进程池)维护一组工作进程,自动管理进程的创建、分配和回收,是实际项目中最常用的multiprocessing组件。

6.1 Pool的创建与基本使用方法

import multiprocessing as mp import time def square(x: int) -> int: """CPU密集型计算任务""" time.sleep(0.5) # 模拟耗时 return x * x if __name__ == "__main__": data = list(range(1, 11)) # 创建进程池,默认使用os.cpu_count()个进程 with mp.Pool(processes=4) as pool: # map:阻塞等待所有结果 results = pool.map(square, data) print(f"map结果: {results}") # map_async:非阻塞版本 async_result = pool.map_async(square, data) print("map_async 提交成功,继续执行其他工作...") results2 = async_result.get(timeout=10) # 等待结果 print(f"map_async结果: {results2}") # apply:单个任务 single = pool.apply(square, args=(5,)) print(f"apply结果: {single}") # starmap:支持多个参数 pool2 = mp.Pool(2) params = [(1,), (2,), (3,)] results3 = pool2.starmap(square, params) # 注意square只接受一个参数,这里演示用法 pool2.close() pool2.join()

6.2 imap与imap_unordered:惰性迭代

当数据量非常大时,map会将所有结果一次性加载到内存中。imap提供了惰性求值版本,可以边处理边获取结果,节省内存。imap_unordered则不考虑顺序,哪个任务先完成就先返回哪个结果。

import multiprocessing as mp import time import random def slow_square(x: int) -> tuple: """每个任务执行时间不同""" t = random.uniform(0.2, 1.0) time.sleep(t) return (x, x * x, t) if __name__ == "__main__": data = range(1, 11) with mp.Pool(4) as pool: # imap:按输入顺序产生结果(但内部并行执行) print("=== imap(保持输入顺序)===") for x, sq, t in pool.imap(slow_square, data): print(f" {x}^2 = {sq} (耗时{t:.2f}s)") print() # imap_unordered:按完成顺序产生结果 print("=== imap_unordered(按完成顺序)===") for x, sq, t in pool.imap_unordered(slow_square, data): print(f" {x}^2 = {sq} (耗时{t:.2f}s)")

6.3 回调函数

apply_async和map_async支持callback参数,在任务完成时自动调用回调函数,无需手动get结果。

import multiprocessing as mp import time def heavy_task(n: int) -> tuple: """模拟耗时计算任务""" time.sleep(n) return (n, n * n) def on_success(result: tuple): """成功回调:在主进程中执行""" n, sq = result print(f" 回调: {n}^2 = {sq}") def on_error(error: BaseException): """错误回调""" print(f" 出错: {error}") if __name__ == "__main__": with mp.Pool(3) as pool: tasks = [1, 2, 3, 0.5, 1.5] results = [] for n in tasks: r = pool.apply_async( heavy_task, args=(n,), callback=on_success, error_callback=on_error, ) results.append(r) # 等待所有任务完成 for r in results: r.wait() print("所有任务完成")

关键区别
map:同步阻塞,结果按输入顺序返回
map_async:异步非阻塞,通过get()获取结果
imap:惰性迭代,按输入顺序但边计算边返回
imap_unordered:惰性迭代,按完成顺序返回
apply:同步执行单个任务
apply_async:异步执行单个任务,支持回调
starmap:与map类似,但支持多参数解包

七、同步原语

当多个进程同时访问共享资源时,需要使用同步原语来避免竞态条件(Race Condition)。multiprocessing提供了与threading类似的同步工具。

7.1 Lock:互斥锁

Lock是最基本的同步原语,确保同一时间只有一个进程能够访问被保护的资源。

import multiprocessing as mp import time def deposit(balance: mp.Value, lock: mp.Lock, n: int): for _ in range(n): with lock: balance.value += 1 def withdraw(balance: mp.Value, lock: mp.Lock, n: int): for _ in range(n): with lock: balance.value -= 1 if __name__ == "__main__": balance = mp.Value('i', 0) lock = mp.Lock() N = 50000 p1 = mp.Process(target=deposit, args=(balance, lock, N)) p2 = mp.Process(target=withdraw, args=(balance, lock, N)) p1.start() p2.start() p1.join() p2.join() print(f"最终余额: {balance.value}") # 应为0 # 如果不加锁,结果通常不为0

7.2 Semaphore:信号量

Semaphore维护一个计数器,acquire()减少计数(为0时阻塞),release()增加计数。常用于限制同时访问某资源的进程数。

import multiprocessing as mp import time def access_resource(pid: int, sem: mp.Semaphore): """使用信号量限制并发访问数""" with sem: print(f" 进程{pid} 获得资源") time.sleep(1) print(f" 进程{pid} 释放资源") if __name__ == "__main__": sem = mp.Semaphore(2) # 最多允许2个进程同时访问 procs = [mp.Process(target=access_resource, args=(i, sem)) for i in range(6)] for p in procs: p.start() for p in procs: p.join()

7.3 Event:事件通知

Event用于进程间的事件通知。一个进程可以set()设置事件,其他进程通过wait()等待事件发生。

import multiprocessing as mp import time def waiter(event: mp.Event, pid: int): """等待事件触发""" print(f" 进程{pid} 等待事件...") event.wait() # 阻塞直到事件被设置 print(f" 进程{pid} 收到事件,继续执行") def setter(event: mp.Event): """延迟后触发事件""" print(" setter: 3秒后触发事件") time.sleep(3) event.set() print(" setter: 事件已触发") if __name__ == "__main__": event = mp.Event() waiters = [mp.Process(target=waiter, args=(event, i)) for i in range(3)] s = mp.Process(target=setter, args=(event,)) for w in waiters: w.start() s.start() for w in waiters: w.join() s.join()

7.4 Condition:条件变量

Condition比Event更灵活,允许在特定条件下通知其他进程。需要与Lock配合使用。

import multiprocessing as mp import time def consumer(cond: mp.Condition, shared_list: list, pid: int): """消费者:等待条件满足""" with cond: while len(shared_list) == 0: print(f" 消费者{pid} 等待数据...") cond.wait() # 释放锁并等待通知 item = shared_list.pop(0) print(f" 消费者{pid} 消费: {item}") def producer(cond: mp.Condition, shared_list: list): """生产者:添加数据并通知""" with cond: for i in range(3): shared_list.append(i) print(f" 生产者添加: {i}") cond.notify_all() # 通知所有等待的消费者 time.sleep(1) if __name__ == "__main__": # 注意:Condition的共享列表需要用Manager with mp.Manager() as m: shared_list = m.list() cond = mp.Condition() cons = [mp.Process(target=consumer, args=(cond, shared_list, i)) for i in range(2)] prod = mp.Process(target=producer, args=(cond, shared_list)) for c in cons: c.start() prod.start() for c in cons: c.join() prod.join()

7.5 同步原语对比

原语用途适用场景
Lock互斥访问,一次一个进程保护共享资源(计数器、变量等)
RLock可重入互斥锁同一进程内递归获取锁
Semaphore允许多个进程同时访问连接池、资源池限流
Event一对多事件通知启动信号、关闭信号
Condition复杂条件等待/通知生产者-消费者高级模式
Barrier多进程同步到达某个点分阶段并行计算

八、进程启动方式

multiprocessing支持三种进程启动方式,通过mp.set_start_method()设置。不同平台默认方式不同:Windows默认为spawn,Linux默认为fork,macOS默认为spawn(Python 3.8+)。

8.1 spawn(生成)

启动一个新Python解释器进程,只继承必要的资源。父进程中的文件描述符、线程等不会被继承。这种方式最安全,但启动速度慢。

import multiprocessing as mp def worker(): print(f"spawn方式: PID={mp.current_process().pid}") if __name__ == "__main__": mp.set_start_method('spawn', force=True) p = mp.Process(target=worker) p.start() p.join()

8.2 fork(分叉)

使用os.fork()创建子进程,子进程复制父进程的所有内存。启动速度快,但不安全(可能死锁),尤其在多线程环境中。Linux平台默认方式。

import multiprocessing as mp shared = {"key": "original"} def worker(): print(f"fork方式: shared = {shared}") if __name__ == "__main__": # Windows不支持fork mp.set_start_method('fork', force=True) p = mp.Process(target=worker) p.start() p.join()

8.3 forkserver(分叉服务器)

启动一个单线程的fork服务器进程。当需要创建新进程时,forkserver进程fork自身来创建子进程。这种方式既安全又相对高效。

import multiprocessing as mp def worker(): print(f"forkserver方式: PID={mp.current_process().pid}") if __name__ == "__main__": mp.set_start_method('forkserver', force=True) p = mp.Process(target=worker) p.start() p.join()

8.4 三种方式对比

方式启动速度安全性跨平台默认平台
spawnWindows, macOS 3.8+
fork低(易死锁)否(仅Unix)Linux
forkserver较快较高否(仅Unix)(可选)

推荐实践:在跨平台代码中,不主动指定启动方式,让系统使用默认方式。如果必须在Unix上使用fork,确保在创建新线程之前调用set_start_method。Python 3.14计划在Unix上将默认启动方式改为spawn(PEP 719提案)。

九、shared_memory:新一代共享内存

Python 3.8引入了multiprocessing.shared_memory模块,提供了更底层、更灵活的共享内存机制。与Value/Array不同,SharedMemory直接操作原始字节,可以在此基础上构建任意数据结构。

from multiprocessing import shared_memory import multiprocessing as mp import numpy as np # 假设已安装 def consumer(shm_name: str, shape: tuple, dtype): """从共享内存读取numpy数组""" shm = shared_memory.SharedMemory(name=shm_name) arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf) print(f" 消费者读取: {arr}") shm.close() if __name__ == "__main__": # 创建numpy数组并写入共享内存 data = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64) shm = shared_memory.SharedMemory( create=True, size=data.nbytes, ) arr = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf) arr[:] = data[:] p = mp.Process(target=consumer, args=(shm.name, data.shape, data.dtype)) p.start() p.join() shm.close() shm.unlink() # 释放共享内存 print("共享内存已释放")

9.1 SharedMemory基础API

shared_memory vs Value/Array:shared_memory更底层、更灵活,适合与numpy等库配合处理大型数组。Value/Array提供了更高级的API(自动加锁、类型转换),用起来更简单但不够灵活。对于大数据集(如图像、矩阵运算),shared_memory是更好的选择。

十、多进程 vs 多线程性能对比

选择多进程还是多线程,取决于任务的类型。以下通过实际性能对比来展示两者的适用场景。

10.1 CPU密集型任务

CPU密集型任务(如计算、加密、图像处理)受计算能力限制,多进程可以充分利用多核CPU。

import multiprocessing as mp import threading import time def cpu_bound(n: int) -> int: """CPU密集型:计算斐波那契数列""" if n <= 1: return n a, b = 0, 1 for _ in range(n): a, b = b, a + b return a def run_multiprocessing(n: int, workers: int): """多进程版本""" with mp.Pool(workers) as pool: results = pool.map(cpu_bound, [n] * workers) return results def run_multithreading(n: int, workers: int): """多线程版本(受GIL限制)""" results = [None] * workers def worker_fn(idx): results[idx] = cpu_bound(n) threads = [threading.Thread(target=worker_fn, args=(i,)) for i in range(workers)] for t in threads: t.start() for t in threads: t.join() return results if __name__ == "__main__": N = 500000 # 斐波那契数索引 WORKERS = 8 # 多进程 t0 = time.time() run_multiprocessing(N, WORKERS) t1 = time.time() print(f"多进程: {t1 - t0:.3f}s") # 多线程 t0 = time.time() run_multithreading(N, WORKERS) t1 = time.time() print(f"多线程: {t1 - t0:.3f}s")

10.2 IO密集型任务

IO密集型任务(如网络请求、文件读写)受IO等待时间限制,多线程由于没有进程切换开销反而更具优势。

import multiprocessing as mp import threading import time def io_bound(url: str) -> str: """模拟IO密集型:网络请求""" # 模拟网络延迟 time.sleep(1) return f"Fetched {url}" if __name__ == "__main__": urls = [f"http://example.com/page/{i}" for i in range(20)] # 多线程 t0 = time.time() threads = [threading.Thread(target=io_bound, args=(url,)) for url in urls] for t in threads: t.start() for t in threads: t.join() print(f"多线程模拟: {time.time() - t0:.3f}s") # 多进程 t0 = time.time() with mp.Pool(8) as pool: pool.map(io_bound, urls) print(f"多进程模拟: {time.time() - t0:.3f}s")

10.3 选择指南

任务类型推荐方案原因
CPU密集型(计算、加密、压缩)多进程利用多核CPU,绕过GIL
IO密集型(网络、磁盘、数据库)多线程/asyncio进程/线程切换开销小,内存共享方便
混合型多进程+多线程组合计算用进程、IO用线程
需要大量共享数据多线程进程间共享数据成本高
需要稳定性(隔离性)多进程进程间相互隔离,一个崩溃不影响其他

"选择并发模型的核心原则:让计算密集的部分并行化(多进程),让IO等待的部分异步化(asyncio或多线程)。"

十一、高级主题与实践技巧

11.1 进程池的上下文管理

使用with语句管理Pool的生命周期可以确保资源正确释放:

import multiprocessing as mp with mp.Pool(4) as pool: results = pool.map(abs, [-1, -2, -3, -4]) print(results) # 离开with块时,pool会自动调用close()和join()

11.2 避免常见的陷阱

11.3 在Queue和JoinableQueue之间选择

JoinableQueue是Queue的子类,增加了task_done()join()方法,允许等待队列中的所有项目被处理完毕。

import multiprocessing as mp def worker(q: mp.JoinableQueue, name: str): while True: item = q.get() if item is None: q.task_done() break print(f" [{name}] 处理: {item}") q.task_done() if __name__ == "__main__": q = mp.JoinableQueue() for i in range(10): q.put(f"Task-{i}") procs = [mp.Process(target=worker, args=(q, f"W{i}")) for i in range(3)] for p in procs: p.start() # 等待所有任务处理完成 q.join() # 发送停止信号 for _ in procs: q.put(None) for p in procs: p.join() print("所有任务完成")

11.4 并发控制与限流

使用Semaphore控制并发数量,或使用Pool的maxtasksperchild参数定期重启工作进程避免内存泄漏。

import multiprocessing as mp # maxtasksperchild:每个进程最多处理多少个任务后自动重启 with mp.Pool(4, maxtasksperchild=10) as pool: results = pool.map(str.upper, ["a"] * 100) print(f"完成 {len(results)} 个任务")

十二、核心要点总结

1. 突破GIL限制:多进程是Python实现真正并行的唯一方式(CPU密集型场景),通过创建独立进程绕过GIL。

2. IPC通信机制:进程间通过Queue、Pipe、Value/Array、Manager、SharedMemory等方式通信,选择依据是数据复杂度与性能要求的权衡。

3. 进程池核心用法:Pool.map/map_async用于批量任务,imap用于大数据的惰性处理,apply_async支持回调函数。

4. 同步至关重要:共享内存访问必须加锁(Lock/Semaphore),否则数据竞争会导致不可预期的结果。

5. 启动方式选择:spawn最安全但最慢,fork最快但有风险,forkserver是折中方案。跨平台代码使用默认方式。

6. shared_memory高级用法:Python 3.8+的shared_memory配合numpy等库可高效处理大型数组数据。

7. 场景匹配:CPU密集用多进程,IO密集用多线程/asyncio,混合场景组合使用。

十三、进一步思考

multiprocessing模块是Python并行计算的基石,但在实际项目开发中,选择正确的并发模型需要考虑更广泛的维度:

分布式扩展:当单机多核无法满足性能需求时,可考虑使用分布式框架(如Celery、Dask、Ray)将任务扩展到多台机器执行。

异步编程融合:在Python 3.9+中,可以结合asyncio和multiprocessing,使用asyncio.to_threadconcurrent.futures.ProcessPoolExecutor在异步事件循环中执行CPU密集型任务。

性能监控:使用multiprocessing.get_logger()开启日志调试,或使用profile工具(如cProfile、py-spy)分析多进程应用的性能瓶颈。

实践建议:从简单的process + queue模式开始,逐步过渡到进程池。避免过早优化,先用profiling确认瓶颈在CPU还是IO,再选择合适的并发方案。在微服务架构中,通常推荐用独立的服务进程代替multiprocessing,以获得更好的隔离性和可伸缩性。