← 返回Python进阶编程目录
← 返回学习笔记首页
专题: Python进阶编程系统学习
关键词: Python, Lock, RLock, Semaphore, Event, Condition, Barrier, 同步原语, 线程同步, 并发编程
一、概述
在多线程和多进程编程中,多个执行单元并发访问共享资源时会产生竞态条件(Race Condition) ,导致数据不一致、程序崩溃等严重问题。同步原语(Synchronization Primitives) 是操作系统和编程语言提供的底层工具,用于协调线程/进程之间的执行顺序,保证共享资源的互斥访问和线程间的正确通信。
Python的threading模块提供了丰富且易用的同步原语,包括Lock、RLock、Semaphore、Event、Condition、Barrier和Timer。这些同步原语既可用于threading模块的线程同步,也可通过multiprocessing模块的对应版本用于进程同步。理解每种原语的语义、适用场景和潜在陷阱,是编写正确、高效并发程序的基础。
核心要点: 同步原语的核心目标有三个——互斥 (同一时间只有一个线程访问共享资源)、协调 (线程之间按照特定顺序执行)、通信 (线程之间传递状态信息)。Python threading模块为这三种目标分别提供了对应的原语实现。
二、Lock 互斥锁——最基础的同步工具
threading.Lock是Python中最基础的同步原语,只有锁定 和未锁定 两种状态。它的设计极其简单:一个线程调用acquire()获得锁,其他线程必须等待直到持有锁的线程调用release()释放锁。Lock保证同一时刻最多只有一个线程 能进入临界区(Critical Section),从根本上消除竞态条件。
基本用法
import threading
# 创建一个锁对象
lock = threading.Lock()
# 方式一:手动 acquire/release
lock.acquire()
try:
# 临界区代码——每次只有一个线程能执行到这里
shared_counter += 1
finally:
lock.release()
# 方式二:使用 with 语句(推荐)
with lock:
# 上下文管理器自动 acquire 和 release
shared_counter += 1
完整示例:计数器安全累加
import threading
counter = 0
lock = threading.Lock()
results = []
def safe_increment(n):
global counter
for _ in range(n):
with lock:
counter += 1
threads = []
for i in range(5):
t = threading.Thread(target=safe_increment, args=(100000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终结果: {counter}") # 正确输出 500000
注意事项: Lock是不可重入的。如果一个线程已经持有了锁,再次调用acquire()会导致死锁(Deadlock) ——线程被自己阻塞住。解决此问题请使用下面介绍的RLock。
深入理解 Lock 的内部机制
Lock底层通过操作系统的互斥量(Mutex) 实现。当一个线程调用acquire(blocking=True)时,如果锁已被其他线程持有,该线程会进入阻塞等待 状态,操作系统会将其挂起,不会继续消耗CPU时间片。被阻塞的线程会加入锁的等待队列 ,当持有锁的线程释放锁时,等待队列中的第一个线程会被唤醒并成功获取锁。这种机制保证了CPU资源的合理利用 ——等待中的线程不会进行忙等待(busy waiting),避免了无意义的CPU轮询。
acquire(blocking=False)是非阻塞模式。如果锁已被持有,立即返回False而不会阻塞线程。这在需要尝试获取锁而不想阻塞的场景下非常有用,比如实现"尝试执行"的逻辑。
lock = threading.Lock()
# 非阻塞尝试获取锁
if lock.acquire(blocking=False):
try:
print("成功获取锁,执行临界区代码")
finally:
lock.release()
else:
print("锁已被其他线程持有,跳过本次操作")
# 可以执行其他逻辑或稍后重试
三、RLock 可重入锁——同一线程可多次获取
threading.RLock(Reentrant Lock,可重入锁)是Lock的增强版本,允许同一个线程多次获取锁而不会死锁 。RLock内部维护了两个关键信息:持有线程标识(Owner) 和重入计数(Count) 。当线程第一次获取锁时,计数为1;如果同一线程再次获取,计数递增;每次release()递减计数,直到计数归零时锁才真正被释放。
import threading
rlock = threading.RLock()
def outer_function():
with rlock:
print("外层函数获取锁")
inner_function() # 同一线程再次获取同一锁,不会死锁
def inner_function():
with rlock:
print("内层函数也获取锁——RLock允许这样做")
# 执行:RLock允许同一线程重复进入
t = threading.Thread(target=outer_function)
t.start()
t.join()
RLock 在递归调用中的典型应用
import threading
class SafeCounter:
def __init__(self):
self.count = 0
self.lock = threading.RLock() # 必须用 RLock
def increment(self):
with self.lock:
self.count += 1
def increment_multiple(self, times):
with self.lock:
# 已经持有锁,但仍然可以递归调用
for _ in range(times):
self.increment() # 再次获取同一个锁
def get_count(self):
with self.lock:
return self.count
counter = SafeCounter()
counter.increment_multiple(10)
print(f"安全计数结果: {counter.get_count()}") # 10
Lock vs RLock 选择指南: 如果你不确定使用哪种锁,优先使用Lock——它更轻量且在非递归场景下更安全。只有当你明确知道需要在同一个线程内多次获取同一个锁 (如递归调用、辅助方法调用)时,才使用RLock。滥用RLock会掩盖潜在的设计问题。
特性 Lock RLock
同一线程重复 acquire 死锁 允许(计数递增)
性能 更轻量 略重(需维护计数)
适用场景 简单互斥 递归调用、内嵌同步
release 次数 一次即可 必须与 acquire 次数匹配
四、Semaphore 信号量——资源池限流的利器
threading.Semaphore内部维护一个计数器 ,用于控制同时访问特定资源的线程数量。它可以看作是"允许多个线程同时访问的锁"。每次调用acquire()时计数器减一(计数为0时阻塞),每次release()时计数器加一。信号量非常适合连接池限制 、并发数量控制 等场景。
基本使用:限制并发数据库连接数
import threading
import time
# 最多允许3个线程同时访问数据库连接池
connection_pool = threading.Semaphore(3)
def query_database(query_id):
with connection_pool:
print(f"[{time.strftime('%H:%M:%S')}] 查询 {query_id} 开始——占用连接")
time.sleep(2) # 模拟数据库查询耗时
print(f"[{time.strftime('%H:%M:%S')}] 查询 {query_id} 结束——释放连接")
threads = []
for i in range(8):
t = threading.Thread(target=query_database, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 任何时候都最多有3个查询同时进行
BoundedSemaphore——更安全的信号量
threading.BoundedSemaphore是Semaphore的子类,增加了上限保护 :如果release()的次数超过了初始计数(即试图将计数恢复到初始值以上),会抛出ValueError。这能有效防止程序错误导致信号量计数失控 ,是一种防御性编程实践。
import threading
# BoundedSemaphore:计数不会超过初始值
s = threading.BoundedSemaphore(2)
s.acquire() # 计数: 2 -> 1
s.acquire() # 计数: 1 -> 0
s.release() # 计数: 0 -> 1
s.release() # 计数: 1 -> 2
# s.release() # ValueError! 计数不能超过初始值2
# 普通 Semaphore 没有此限制
us = threading.Semaphore(2)
us.release() # 没问题,计数变成3——这可能隐藏 bug
最佳实践: 除非有特殊需求,否则始终使用BoundedSemaphore而非Semaphore,它能帮你捕获因release()调用次数不匹配导致的bug。
五、Event 事件——线程间状态通知
threading.Event是最简单的线程间通信机制,管理一个内部布尔标志位(Flag) 。Event类似于一个"信号旗"——一个线程设置标志位,其他线程等待该标志位被设置。它不涉及数据传递,仅用于线程间的状态通知 。
核心方法
set():将内部标志设为True,唤醒所有等待的线程
clear():将内部标志设为False,后续线程调用wait()将阻塞
wait(timeout=None):阻塞直到标志为True或超时返回
is_set():返回当前内部标志的值
典型场景:等待服务启动
import threading
import time
import random
def worker(worker_id, ready_event):
"""工作线程等待服务就绪信号"""
print(f" 工人 {worker_id} 等待服务就绪...")
ready_event.wait() # 阻塞等待 event 被 set
print(f" 工人 {worker_id} 开始工作")
def service_starter(ready_event):
"""服务启动线程,完成后发送就绪信号"""
print("服务启动中...")
time.sleep(random.uniform(1, 3)) # 模拟启动耗时
print("服务已就绪!通知所有工人开始工作")
ready_event.set() # 一次性通知所有等待线程
ready_event = threading.Event()
# 启动多个工作线程
workers = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, ready_event))
workers.append(t)
t.start()
# 启动服务
starter = threading.Thread(target=service_starter, args=(ready_event,))
starter.start()
for t in workers:
t.join()
starter.join()
# 所有工人同时得到通知,几乎同时开始工作
可复用的就绪—开始模式
Event可以重复使用 :调用clear()将标志重置为False,然后再次set()触发下一轮通知。这种模式适合多轮次的任务协调。
event = threading.Event()
# 第一轮:等待就绪
event.wait()
# ... 执行第一轮任务 ...
# 重置并等待第二轮
event.clear()
# 等待第二轮就绪信号
event.wait()
Event vs Condition: Event适用于"一次性通知"或"广播通知"场景,语义简单清晰。如果要实现更复杂的等待条件(比如等待队列非空),Condition是更合适的选择。
六、Condition 条件变量——最灵活的同步原语
threading.Condition是Python中最强大的同步原语,它结合了锁(Lock/RLock) 和等待/通知机制 。Condition的核心思想是:线程在某个条件不满足时释放锁并等待 ,其他线程在条件变化后通知 等待线程重新获取锁并检查条件。Condition是实现生产者-消费者模式 的标准工具。
核心方法
wait(timeout=None):释放内部锁,阻塞当前线程,直到被notify()唤醒或超时
notify(n=1):唤醒一个等待该条件的线程(不释放锁)
notify_all():唤醒所有等待该条件的线程
必须在持有内部锁的情况下调用wait()、notify()和notify_all()
生产者-消费者经典实现
import threading
import time
import random
class BoundedBuffer:
"""有界缓冲区——生产者消费者模型"""
def __init__(self, capacity=5):
self.buffer = []
self.capacity = capacity
self.cond = threading.Condition()
def produce(self, item):
with self.cond:
# 当缓冲区已满时等待
while len(self.buffer) >= self.capacity:
print(f" 缓冲区满,生产者等待...")
self.cond.wait()
self.buffer.append(item)
print(f"生产 [{item}] 缓冲区大小: {len(self.buffer)}")
# 通知可能正在等待的消费者
self.cond.notify()
def consume(self):
with self.cond:
# 当缓冲区为空时等待
while len(self.buffer) == 0:
print(f" 缓冲区空,消费者等待...")
self.cond.wait()
item = self.buffer.pop(0)
print(f"消费 [{item}] 缓冲区大小: {len(self.buffer)}")
# 通知可能正在等待的生产者
self.cond.notify()
return item
def producer(buffer, items):
for item in items:
buffer.produce(item)
time.sleep(random.uniform(0.3, 0.8))
def consumer(buffer, count):
for _ in range(count):
buffer.consume()
time.sleep(random.uniform(0.5, 1.0))
buffer = BoundedBuffer(capacity=3)
items = [f"商品-{i}" for i in range(6)]
prod = threading.Thread(target=producer, args=(buffer, items))
cons = threading.Thread(target=consumer, args=(buffer, 6))
prod.start()
cons.start()
prod.join()
cons.join()
print("所有生产消费任务完成")
重要注意事项: 使用wait()时一定要使用while循环来检查条件,而不是if。这是因为notify()可能唤醒多个线程(虚假唤醒),或者在被唤醒到重新获取锁之间条件再次发生变化。while循环确保了条件成立时才能继续执行,这是防御性编程的核心实践 。
notify 与 notify_all 的选择
# notify:只唤醒一个等待线程(效率高,适用于同类等待)
with cond:
cond.notify() # 唤醒一个线程
# notify_all:唤醒所有等待线程(适用于不同类别的等待)
with cond:
cond.notify_all() # 唤醒所有线程
经验法则: 如果所有等待线程都在等待同一个条件 ,使用notify()即可。如果等待线程在等待不同条件 (例如有些等待"缓冲区非空",有些等待"缓冲区未满"),使用notify_all()避免产生"信号丢失"问题。
七、Barrier 屏障——等待所有线程就绪
threading.Barrier(屏障/栅栏)用于多线程同步 :设定一个参与线程数量n,当少于n个线程调用wait()时,所有调用线程都会阻塞;当第n个线程也调用wait()后,所有线程同时被释放,继续执行后续代码。Barrier天然适用于分阶段并行计算 (如并行排序中的归并阶段)、多线程初始化同步 等场景。
import threading
import time
import random
def worker(worker_id, barrier):
print(f"工作线程 {worker_id} 执行第一阶段...")
time.sleep(random.uniform(0.5, 1.5))
print(f"工作线程 {worker_id} 第一阶段完成,等待其他线程...")
# 等待所有线程完成第一阶段
barrier.wait()
print(f"工作线程 {worker_id} 所有线程就绪,开始第二阶段!")
n_workers = 4
barrier = threading.Barrier(n_workers)
threads = []
for i in range(n_workers):
t = threading.Thread(target=worker, args=(i, barrier))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有阶段已完成!")
Barrier 的高级特性:回调函数与abort
def on_barrier_released():
"""所有线程到达屏障时执行的回调"""
print(f"=== 屏障释放!所有 {threading.Barrier.parties} 个线程已就绪 ===")
def worker_with_callback(worker_id, barrier):
time.sleep(random.uniform(0.5, 2.0))
print(f" 线程 {worker_id} 到达屏障")
try:
barrier.wait()
print(f" 线程 {worker_id} 通过屏障")
except threading.BrokenBarrierError:
print(f" 线程 {worker_id}:屏障已损坏(超时或重置)")
# parties:需要等待的线程数
# action:屏障释放时执行的回调函数
# timeout:单个 wait() 的最大等待时间
barrier = threading.Barrier(3, action=on_barrier_released)
for i in range(3):
t = threading.Thread(target=worker_with_callback, args=(i, barrier))
t.start()
time.sleep(0.3)
# Barrier 其他重要方法:
# barrier.reset() # 重置屏障到初始状态
# barrier.abort() # 将屏障置为损坏状态,所有等待线程收到 BrokenBarrierError
# barrier.parties # 屏障需要的线程数量
# barrier.n_waiting # 当前正在等待的线程数
# barrier.broken # 屏障是否处于损坏状态
适用场景
并行计算的多阶段同步
服务启动时等待所有模块初始化完成
游戏引擎中等待所有渲染线程就绪
测试框架中多个线程同时开始执行
注意事项
线程数必须精确匹配 parties
超时配置可防止无限等待
单个线程异常会破坏屏障
重置后原等待的线程会收到异常
八、Timer 定时器——延迟执行任务
threading.Timer是Thread的子类,用于在指定时间延迟后执行一个函数 。Timer在执行一次后自动结束,但可以通过cancel()方法在触发前取消。Timer相当于一个异步定时器 ,不会阻塞主线程的执行。
import threading
import time
def timeout_handler():
print(f"[{time.strftime('%H:%M:%S')}] 定时器触发!执行超时处理")
# 3秒后执行 timeout_handler
timer = threading.Timer(3.0, timeout_handler)
timer.start()
print(f"[{time.strftime('%H:%M:%S')}] 定时器已启动,主线程继续执行...")
time.sleep(2)
print(f"[{time.strftime('%H:%M:%S')}] 主线程仍在工作...")
取消定时器
def delayed_task(task_id):
print(f"任务 {task_id} 执行")
# 创建多个定时器
timers = []
for i in range(5):
t = threading.Timer(i * 2.0, delayed_task, args=(i,))
timers.append(t)
t.start()
# 取消第3个定时器(i=2,将在4秒后执行)
timers[2].cancel()
print("定时器 2 已被取消")
# 输出:任务 0、1、3、4 会执行,任务 2 不会执行
注意: Timer的延迟精度受操作系统调度影响,不适用于高精度定时场景。如果需要精确定时,考虑使用time.perf_counter()配合循环,或使用sched模块。
九、同步原语综合对比
原语 核心机制 线程数 主要用途 注意事项
Lock 互斥锁(两种状态) 限制为1 临界区保护 不可重入,同一线程第二次acquire死锁
RLock 可重入锁(计数) 限制为1 递归同步场景 比Lock略重,需配平acquire/release
Semaphore 计数器(允许多个) 限制为n 资源池限流 推荐使用BoundedSemaphore
Event 布尔标志(set/clear) 不限 状态通知/广播 clear后需重新wait
Condition 等待/通知(+锁) 不限 生产者-消费者 wait必须在while循环中使用
Barrier 计数屏障(等待n个) 固定为n 多线程分阶段同步 超时和abort会破坏屏障
Timer 延迟执行(一次性) — 定时/超时任务 精度有限,可取消
十、生产者-消费者模式完整实战
生产者-消费者模式是并发编程中最经典的协作模式,涉及多类同步原语的协同使用 。下面实现一个带超时控制的、支持多生产者和多消费者的完整示例。
import threading
import time
import random
from queue import Queue
# 使用 Condition 实现的生产者-消费者
class ThreadSafeQueue:
def __init__(self, maxsize=10):
self.queue = []
self.maxsize = maxsize
self.cond = threading.Condition()
self.running = True
def put(self, item):
with self.cond:
while len(self.queue) >= self.maxsize and self.running:
self.cond.wait(timeout=1.0)
if not self.running:
return False
self.queue.append(item)
self.cond.notify()
return True
def get(self):
with self.cond:
while len(self.queue) == 0 and self.running:
self.cond.wait(timeout=1.0)
if not self.running and len(self.queue) == 0:
return None
item = self.queue.pop(0)
self.cond.notify()
return item
def stop(self):
with self.cond:
self.running = False
self.cond.notify_all()
def producer_task(q, producer_id, items):
for item in items:
success = q.put(f"P{producer_id}-{item}")
if not success:
print(f"生产者 {producer_id}:队列已关闭,停止生产")
break
print(f"生产: P{producer_id}-{item}")
time.sleep(random.uniform(0.1, 0.3))
def consumer_task(q, consumer_id, count):
consumed = 0
while consumed < count:
item = q.get()
if item is None:
print(f"消费者 {consumer_id}:队列已关闭,停止消费")
break
print(f" 消费: {item} (来自消费者 {consumer_id})")
consumed += 1
time.sleep(random.uniform(0.2, 0.5))
q = ThreadSafeQueue(maxsize=5)
# 多生产者
producers = []
for i in range(2):
items = [f"商品-{j}" for j in range(5)]
t = threading.Thread(target=producer_task, args=(q, i, items))
producers.append(t)
t.start()
# 多消费者
consumers = []
for i in range(3):
t = threading.Thread(target=consumer_task, args=(q, i, 4))
consumers.append(t)
t.start()
for t in producers:
t.join()
# 等待队列消费完
time.sleep(1)
q.stop()
for t in consumers:
t.join()
print("所有生产消费任务完成!")
设计要点总结: (1)使用Condition实现高效的等待/通知机制,避免忙等待;(2)始终在while循环中检查条件,防止虚假唤醒;(3)通过running标志实现优雅关闭,避免死锁;(4)添加超时机制防止线程永久阻塞;(5)生产者和消费者的速度差异通过缓冲区容量来平衡。
十一、死锁的预防与检测
死锁(Deadlock) 是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的僵局。死锁发生的四个必要条件(Coffman条件)必须同时满足:互斥 、持有并等待 、不可剥夺 、循环等待 。
经典死锁示例
import threading
import time
# 经典死锁:两个锁、两个线程、循环等待
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a:
print("线程1: 持有锁A, 等待锁B...")
time.sleep(0.1)
with lock_b:
print("线程1: 获取锁B") # 永远不会执行到这里
def thread_2():
with lock_b:
print("线程2: 持有锁B, 等待锁A...")
time.sleep(0.1)
with lock_a:
print("线程2: 获取锁A") # 永远不会执行到这里
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start()
t2.start()
t1.join()
t2.join() # 程序卡死在这里!
预防策略一:固定锁顺序
# 解决方案1:所有线程按相同顺序获取锁
def thread_1_fixed():
with lock_a:
print("线程1: 持有锁A")
time.sleep(0.1)
with lock_b:
print("线程1: 获取锁B")
def thread_2_fixed():
with lock_a: # 同样先获取锁A,再获取锁B
print("线程2: 持有锁A")
time.sleep(0.1)
with lock_b:
print("线程2: 获取锁B")
# 这样就不会产生死锁——线程2不会持有锁B去等锁A
预防策略二:超时回退
# 解决方案2:使用 acquire(timeout) + 超时后释放已有锁
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
def try_acquire_with_timeout():
acquired_a = False
acquired_b = False
try:
# 尝试获取锁A,超时1秒
acquired_a = lock_a.acquire(timeout=1)
if not acquired_a:
return False
# 尝试获取锁B,超时1秒
acquired_b = lock_b.acquire(timeout=1)
if not acquired_b:
return False
# 成功获取两个锁,执行临界区
print("成功获取所有锁")
return True
finally:
if acquired_b:
lock_b.release()
if acquired_a:
lock_a.release()
def worker_with_timeout():
retries = 3
for attempt in range(retries):
if try_acquire_with_timeout():
break
print(f"获取锁失败,重试 {attempt + 1}/{retries}")
time.sleep(0.5)
t1 = threading.Thread(target=worker_with_timeout)
t2 = threading.Thread(target=worker_with_timeout)
t1.start()
t2.start()
t1.join()
t2.join()
预防策略三:使用 with 语句避免遗漏 release
# 始终使用 with 语句来管理锁的获取和释放
# 好的做法:
lock = threading.Lock()
with lock:
# 即使这里抛出异常,锁也会被正确释放
shared_resource.modify()
# 不好的做法(容易遗漏 release):
lock.acquire()
shared_resource.modify()
# 如果中间有异常,lock.release() 永远不会执行
lock.release()
死锁检测工具: Python标准库没有内置的死锁检测器,但可以使用第三方库如deadlock-detector,或在开发环境中启用PYTHONDEADLOCK=1环境变量(在某些Python发行版中支持)。更推荐的是通过代码审查 和锁顺序规范化 来预防死锁。
十二、核心要点总结
Lock vs RLock: Lock不可重入,同一线程重复acquire会导致死锁;RLock允许同一线程重入,适合递归调用场景。不确定时优先使用Lock。
Semaphore/BoundedSemaphore: 管理有限资源的并发访问,如数据库连接池、API限流。BoundedSemaphore比Semaphore更安全,能捕获release过多的bug。
Event: 简单的布尔标志通知机制,适合一次性广播通知场景。set唤醒所有等待线程,clear重置标志,wait阻塞等待。
Condition: 最灵活的同步原语,结合锁与等待/通知机制,是实现生产者-消费者模式的首选。wait务必在while循环中使用,防止虚假唤醒。
Barrier: 等待指定数量的线程全部到达后同时释放,适合分阶段并行计算。注意超时和异常处理会损坏屏障。
Timer: 一次性延迟执行任务的简便工具,支持cancel取消。精度受操作系统调度限制。
死锁预防: 固定锁获取顺序、使用超时回退、始终使用with语句管理锁。死锁的四个必要条件必须同时满足才能发生,破坏任意一个即可预防。
最佳实践: 尽量使用with语句管理锁的获取和释放;使用while循环检查Condition的等待条件;优先使用BoundedSemaphore而非Semaphore;合理设计锁的粒度,避免在持有锁时执行耗时操作。
十三、进一步思考
掌握同步原语只是并发编程的第一步。在实际工程中,还需要考虑以下进阶话题:
进阶方向:
无锁编程(Lock-Free): 使用原子操作(如threading.Atomic、Python 3.11+的free_threaded模式和原子操作)避免锁的开销和死锁风险。Python 3.12及之后版本对free-threaded模式的支持正在逐步增强。
协程同步: asyncio库提供了对应的异步同步原语(asyncio.Lock、asyncio.Semaphore等),在异步编程中不能混用threading和asyncio的同步原语。
进程间同步: multiprocessing模块提供了与threading对应的同步原语,但进程间同步需要使用共享内存或Manager对象,开销比线程同步大得多。
性能权衡: 锁的粒度越细,并发度越高,但锁管理开销也越大。过度细粒度的锁可能导致性能反而下降(锁争用开销超过了并发收益)。合理的做法是优先使用粗粒度锁,性能分析确认瓶颈后再细粒度化。
GIL的影响: CPython的全局解释器锁(GIL)使得纯CPU密集型的多线程程序无法利用多核优势。但I/O密集型任务(网络请求、文件读写)使用多线程仍然有效,因为I/O操作会释放GIL。
选择正确的同步原语不仅关乎程序的正确性,更关乎代码的可读性和可维护性。一个好的同步设计应该是:读者能直观理解同步意图,编写者能确信没有隐藏的竞态风险。