import multiprocessing as mp
import time
class MyProcess(mp.Process):
    """Process subclass that prints a fixed number of numbered steps.

    Args:
        name: Label stored as the process name and used as the prefix of
            every printed line.
        count: Number of steps printed by ``run``.
    """

    def __init__(self, name: str, count: int):
        super().__init__()
        # Assigned after the base initializer so it replaces the name the
        # base class picked.
        self.name = name
        self.count = count

    def run(self):
        """Print one line per step, sleeping 0.5 s after each one.

        This is the method the child process executes after ``start()``.
        """
        for step in range(1, self.count + 1):
            print(f"[{self.name}] step {step}")
            time.sleep(0.5)
if __name__ == "__main__":
    # Start three workers of three steps each, then wait for all of them.
    procs = [MyProcess(f"Worker-{i}", 3) for i in range(3)]
    for proc in procs:
        proc.start()
    # Join only after every worker has been started so they overlap.
    for proc in procs:
        proc.join()
    print("所有进程完成")
import multiprocessing as mp
def write_data(arr: mp.Array, index: int, value: float):
    """Store ``value`` at ``arr[index]`` while holding the array's lock.

    Args:
        arr: Shared array exposing ``get_lock()``.
        index: Position to overwrite.
        value: Number to store (the demo driver passes floats).
    """
    # Holding the lock across the write and the report keeps the two together
    # with respect to other writers using the same lock.
    with arr.get_lock():
        arr[index] = value
        print(f" 写入: arr[{index}] = {value}")
def read_data(arr: mp.Array, index: int):
    """Return ``arr[index]`` after printing it.

    Args:
        arr: Shared array (any indexable object works).
        index: Position to read.

    Returns:
        The value found at ``index``.
    """
    # No explicit lock is taken here. For an array created with the default
    # ``lock=True`` the wrapper is expected to lock each item access on its
    # own; a multi-element read that must be consistent still needs
    # ``arr.get_lock()`` around it.
    val = arr[index]
    print(f" 读取: arr[{index}] = {val}")
    return val
if __name__ == "__main__":
    # 'd' selects C doubles; the array starts as ten zeros.
    shared_arr = mp.Array('d', [0.0] * 10)
    # One writer process per slot; slot i receives i * 3.14.
    processes = [
        mp.Process(target=write_data, args=(shared_arr, i, i * 3.14))
        for i in range(10)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(f"最终数组: {list(shared_arr)}")
import multiprocessing as mp
def worker(shared_dict: "mp.managers.DictProxy",
           shared_list: "mp.managers.ListProxy",
           pid: int):
    """Record this worker's id in the shared dict and the shared list.

    The annotations are strings on purpose: ``import multiprocessing`` does
    not load the ``managers`` submodule, so evaluating
    ``mp.managers.DictProxy`` when the function is defined would raise
    ``AttributeError``.

    Args:
        shared_dict: Mapping that receives ``pid -> description``.
        shared_list: List that receives ``pid * 10``.
        pid: Identifier of the calling worker.
    """
    shared_dict[pid] = f"进程{pid}的数据"
    shared_list.append(pid * 10)
    # The keys shown are whatever is present at this moment; other workers
    # may or may not have written yet.
    print(f" 子进程 {pid}: dict keys={list(shared_dict.keys())}")
if __name__ == "__main__":
    with mp.Manager() as manager:
        # Proxies to a dict and a list owned by the manager.
        shared_data = manager.dict()
        shared_list = manager.list()
        procs = [
            mp.Process(target=worker, args=(shared_data, shared_list, i))
            for i in range(5)
        ]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        # Copy into plain containers while the manager is still open.
        print(f"共享字典: {dict(shared_data)}")
        print(f"共享列表: {list(shared_list)}")
5.1 Namespace:灵活的命名空间
Manager还提供了Namespace对象,允许动态添加属性来共享数据,语法更加自然。
import multiprocessing as mp
def worker(ns: "mp.managers.Namespace", pid: int, lock=None):
    """Increment ``ns.count`` and record ``pid`` as ``ns.last_pid``.

    The annotation is a string because ``import multiprocessing`` does not
    load the ``managers`` submodule; evaluating ``mp.managers.Namespace``
    when the function is defined would raise ``AttributeError``.

    Args:
        ns: Object with settable attributes shared between workers.
        pid: Identifier of the calling worker.
        lock: Optional context-manager lock shared by all workers. When it
            is given, the whole update runs while holding it; when omitted,
            the update runs unguarded.
    """
    # Local import so this block does not depend on the file's import header.
    from contextlib import nullcontext

    # ``ns.count += 1`` is a separate read followed by a write, so two
    # workers can read the same value and lose an increment unless the
    # update is serialized.
    guard = nullcontext() if lock is None else lock
    with guard:
        if not hasattr(ns, 'count'):
            ns.count = 0
        ns.count += 1
        ns.last_pid = pid
        print(f" 子进程 {pid}: count={ns.count}")


if __name__ == "__main__":
    with mp.Manager() as manager:
        ns = manager.Namespace()
        ns.count = 0
        # Not used by the workers. NOTE(review): in-place changes to a plain
        # list stored on a Namespace are presumably not propagated; reassign
        # the attribute instead - confirm against the multiprocessing docs.
        ns.items = []
        # A manager lock can be passed to child processes like the namespace.
        lock = manager.Lock()
        procs = [mp.Process(target=worker, args=(ns, i, lock))
                 for i in range(4)]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
        print(f"最终 count: {ns.count}")
        print(f"last_pid: {ns.last_pid}")
Manager vs Value/Array:Manager使用更灵活,支持任意可序列化(pickle)的复杂对象,但性能开销大(每次访问都要把数据序列化后,通过进程间连接发送到Manager服务器进程)。Value/Array基于共享内存,速度快,但只支持基本类型。选择原则:用Manager的灵活性换取开发效率,用Value/Array的性能换取运行效率。
import multiprocessing as mp
import time
import random
def slow_square(x: int, min_delay: float = 0.2, max_delay: float = 1.0) -> tuple:
    """Square ``x`` after sleeping for a random amount of time.

    Args:
        x: Number to square.
        min_delay: Lower bound, in seconds, of the random sleep.
        max_delay: Upper bound, in seconds, of the random sleep.

    Returns:
        ``(x, x * x, delay)`` where ``delay`` is the time actually slept.
    """
    # A different delay per call makes tasks finish out of submission order.
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)
    return (x, x * x, delay)
if __name__ == "__main__":
    data = range(1, 11)
    with mp.Pool(4) as pool:
        # imap: results are yielded in input order while tasks run in
        # parallel.
        print("=== imap(保持输入顺序)===")
        for x, sq, t in pool.imap(slow_square, data):
            print(f" {x}^2 = {sq} (耗时{t:.2f}s)")
        print()
        # imap_unordered: results are yielded as soon as each task finishes.
        print("=== imap_unordered(按完成顺序)===")
        for x, sq, t in pool.imap_unordered(slow_square, data):
            print(f" {x}^2 = {sq} (耗时{t:.2f}s)")
import multiprocessing as mp
import time
def access_resource(pid: int, sem: mp.Semaphore, hold_seconds: float = 1.0):
    """Hold one slot of ``sem`` for ``hold_seconds`` and report entry and exit.

    Args:
        pid: Identifier printed in the messages.
        sem: Semaphore used as a context manager to limit concurrent holders.
        hold_seconds: How long the slot is held before it is released.
    """
    with sem:
        print(f" 进程{pid} 获得资源")
        time.sleep(hold_seconds)
        # Printed while the slot is still held, just before leaving the
        # ``with`` block releases it.
        print(f" 进程{pid} 释放资源")
if __name__ == "__main__":
    # At most two processes may hold the semaphore at the same time.
    sem = mp.Semaphore(2)
    procs = [mp.Process(target=access_resource, args=(i, sem))
             for i in range(6)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
import multiprocessing as mp
import time
def waiter(event: mp.Event, pid: int):
    """Block until ``event`` is set, printing a line before and after.

    Args:
        event: Event object to wait on.
        pid: Identifier printed in the messages.
    """
    print(f" 进程{pid} 等待事件...")
    # No timeout is passed, so this returns only once the event is set.
    event.wait()
    print(f" 进程{pid} 收到事件,继续执行")
def setter(event: mp.Event, delay: float = 3):
    """Set ``event`` after sleeping for ``delay`` seconds.

    Args:
        event: Event object to set.
        delay: Seconds to wait before setting the event.
    """
    # The announced number follows ``delay`` so the message stays accurate
    # when a caller overrides the default.
    print(f" setter: {delay}秒后触发事件")
    time.sleep(delay)
    event.set()
    print(" setter: 事件已触发")
if __name__ == "__main__":
    event = mp.Event()
    waiters = [mp.Process(target=waiter, args=(event, i)) for i in range(3)]
    trigger = mp.Process(target=setter, args=(event,))
    # Start the waiters first so they are already blocked when the setter
    # process begins its delay.
    for proc in waiters:
        proc.start()
    trigger.start()
    for proc in waiters:
        proc.join()
    trigger.join()
7.4 Condition:条件变量
Condition比Event更灵活,允许在特定条件下通知其他进程。Condition内部带有一把锁(默认自动创建,也可以传入自己的Lock),调用wait()和notify()之前必须先持有这把锁。
import multiprocessing as mp
import time
def consumer(cond: mp.Condition, shared_list: list, pid: int):
    """Wait until ``shared_list`` is non-empty, then remove its first item.

    Args:
        cond: Condition guarding ``shared_list``.
        shared_list: List shared with the producer.
        pid: Identifier printed in the messages.
    """
    with cond:
        # Re-check in a loop: after a wake-up another consumer may already
        # have taken the item.
        while len(shared_list) == 0:
            print(f" 消费者{pid} 等待数据...")
            # Releases the lock while waiting and re-acquires it on return.
            cond.wait()
        item = shared_list.pop(0)
        print(f" 消费者{pid} 消费: {item}")
def producer(cond: mp.Condition, shared_list: list, interval: float = 1.0):
    """Append 0, 1 and 2 to ``shared_list``, notifying waiters after each.

    Args:
        cond: Condition guarding ``shared_list``.
        shared_list: List shared with the consumers.
        interval: Seconds to pause after each item.
    """
    for i in range(3):
        # Take the lock per item and release it before sleeping, so woken
        # consumers can proceed during the pause.
        with cond:
            shared_list.append(i)
            print(f" 生产者添加: {i}")
            cond.notify_all()
        time.sleep(interval)
if __name__ == "__main__":
    # The list is created through a Manager so that every process operates
    # on the same data.
    with mp.Manager() as m:
        shared_list = m.list()
        cond = mp.Condition()
        cons = [mp.Process(target=consumer, args=(cond, shared_list, i))
                for i in range(2)]
        prod = mp.Process(target=producer, args=(cond, shared_list))
        for proc in cons:
            proc.start()
        prod.start()
        for proc in cons:
            proc.join()
        prod.join()
shared_memory vs Value/Array:shared_memory更底层、更灵活,适合与numpy等库配合处理大型数组。Value/Array提供了更高级的API(自动加锁、类型转换),用起来更简单但不够灵活。对于大数据集(如图像、矩阵运算),shared_memory是更好的选择。
十、多进程 vs 多线程性能对比
选择多进程还是多线程,取决于任务的类型。以下通过实际性能对比来展示两者的适用场景。
10.1 CPU密集型任务
CPU密集型任务(如计算、加密、图像处理)受计算能力限制,多进程可以充分利用多核CPU。
import multiprocessing as mp
import threading
import time
def cpu_bound(n: int) -> int:
    """Return the n-th Fibonacci number, computed iteratively.

    Used as a CPU-heavy workload for the benchmark.

    Args:
        n: Index into the sequence (F(0) = 0, F(1) = 1).

    Returns:
        F(n) for ``n >= 2``; ``n`` itself for ``n <= 1``, which includes
        negative values.
    """
    if n <= 1:
        return n
    prev, curr = 0, 1
    # After k iterations ``prev`` holds F(k).
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev
def run_multiprocessing(n: int, workers: int):
    """Run ``cpu_bound(n)`` once per worker in a process pool.

    Args:
        n: Argument passed to every ``cpu_bound`` call.
        workers: Pool size and also the number of tasks submitted.

    Returns:
        List with one ``cpu_bound(n)`` result per task.
    """
    with mp.Pool(workers) as pool:
        # ``map`` blocks until every task has finished, so the results are
        # complete before the pool is torn down.
        return pool.map(cpu_bound, [n] * workers)
def run_multithreading(n: int, workers: int):
    """Run ``cpu_bound(n)`` once per thread.

    Args:
        n: Argument passed to every ``cpu_bound`` call.
        workers: Number of threads to start.

    Returns:
        List with one ``cpu_bound(n)`` result per thread, in thread-index
        order.
    """
    results = [None] * workers

    def worker_fn(idx):
        # Each thread writes only its own slot.
        results[idx] = cpu_bound(n)

    threads = [threading.Thread(target=worker_fn, args=(i,))
               for i in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
if __name__ == "__main__":
    N = 500000  # Fibonacci index computed by every task
    WORKERS = 8

    # perf_counter is monotonic, so the measured interval is not affected by
    # system clock adjustments.
    started = time.perf_counter()
    run_multiprocessing(N, WORKERS)
    print(f"多进程: {time.perf_counter() - started:.3f}s")

    started = time.perf_counter()
    run_multithreading(N, WORKERS)
    print(f"多线程: {time.perf_counter() - started:.3f}s")
10.2 IO密集型任务
IO密集型任务(如网络请求、文件读写)受IO等待时间限制。线程在等待IO时会释放GIL,而且多线程没有进程创建和进程间通信(序列化)的开销,因此反而更具优势。
import multiprocessing as mp
import threading
import time
def io_bound(url: str, delay: float = 1.0) -> str:
    """Simulate a network request by sleeping, then return a result string.

    Args:
        url: Address reported in the result; no request is actually made.
        delay: Seconds to sleep in place of network latency.

    Returns:
        ``"Fetched <url>"``.
    """
    time.sleep(delay)
    return f"Fetched {url}"
if __name__ == "__main__":
    urls = [f"http://example.com/page/{i}" for i in range(20)]

    # Threads: one thread per URL. perf_counter is monotonic, so the
    # interval is not affected by system clock adjustments.
    started = time.perf_counter()
    threads = [threading.Thread(target=io_bound, args=(url,))
               for url in urls]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"多线程模拟: {time.perf_counter() - started:.3f}s")

    # Processes: a pool of 8 workers shares the 20 URLs.
    started = time.perf_counter()
    with mp.Pool(8) as pool:
        pool.map(io_bound, urls)
    print(f"多进程模拟: {time.perf_counter() - started:.3f}s")
import multiprocessing as mp
def worker(q: mp.JoinableQueue, name: str):
    """Process items from ``q`` until a ``None`` sentinel is received.

    Args:
        q: Queue providing ``get()`` and ``task_done()``.
        name: Label printed with every processed item.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                # Sentinel: stop after acknowledging it in ``finally``.
                break
            print(f" [{name}] 处理: {item}")
        finally:
            # Acknowledge every retrieved item, even if handling it raised;
            # otherwise ``q.join()`` in the parent would never return.
            q.task_done()
if __name__ == "__main__":
    q = mp.JoinableQueue()
    for i in range(10):
        q.put(f"Task-{i}")
    procs = [mp.Process(target=worker, args=(q, f"W{i}")) for i in range(3)]
    for proc in procs:
        proc.start()
    # Block until every queued task has been acknowledged with task_done().
    q.join()
    # One sentinel per worker so that each of them leaves its loop.
    for _ in procs:
        q.put(None)
    for proc in procs:
        proc.join()
    print("所有任务完成")