同步原语(锁/信号量/条件变量/屏障)

Python进阶编程专题 · Python并发编程中的线程协调机制

专题:Python进阶编程系统学习

关键词:Python, Lock, RLock, Semaphore, Event, Condition, Barrier, 同步原语, 线程同步, 并发编程

一、概述

在多线程和多进程编程中,多个执行单元并发访问共享资源时会产生竞态条件(Race Condition),导致数据不一致、程序崩溃等严重问题。同步原语(Synchronization Primitives)是操作系统和编程语言提供的底层工具,用于协调线程/进程之间的执行顺序,保证共享资源的互斥访问和线程间的正确通信。

Python的threading模块提供了丰富且易用的同步原语,包括LockRLockSemaphoreEventConditionBarrierTimer。这些同步原语既可用于threading模块的线程同步,也可通过multiprocessing模块的对应版本用于进程同步。理解每种原语的语义、适用场景和潜在陷阱,是编写正确、高效并发程序的基础。

核心要点:同步原语的核心目标有三个——互斥(同一时间只有一个线程访问共享资源)、协调(线程之间按照特定顺序执行)、通信(线程之间传递状态信息)。Python threading模块为这三种目标分别提供了对应的原语实现。

二、Lock 互斥锁——最基础的同步工具

threading.Lock是Python中最基础的同步原语,只有锁定未锁定两种状态。它的设计极其简单:一个线程调用acquire()获得锁,其他线程必须等待直到持有锁的线程调用release()释放锁。Lock保证同一时刻最多只有一个线程能进入临界区(Critical Section),从根本上消除竞态条件。

基本用法

import threading # 创建一个锁对象 lock = threading.Lock() # 方式一:手动 acquire/release lock.acquire() try: # 临界区代码——每次只有一个线程能执行到这里 shared_counter += 1 finally: lock.release() # 方式二:使用 with 语句(推荐) with lock: # 上下文管理器自动 acquire 和 release shared_counter += 1

完整示例:计数器安全累加

import threading counter = 0 lock = threading.Lock() results = [] def safe_increment(n): global counter for _ in range(n): with lock: counter += 1 threads = [] for i in range(5): t = threading.Thread(target=safe_increment, args=(100000,)) threads.append(t) t.start() for t in threads: t.join() print(f"最终结果: {counter}") # 正确输出 500000

注意事项:Lock是不可重入的。如果一个线程已经持有了锁,再次调用acquire()会导致死锁(Deadlock)——线程被自己阻塞住。解决此问题请使用下面介绍的RLock

深入理解 Lock 的内部机制

Lock底层通过操作系统的互斥量(Mutex)实现。当一个线程调用acquire(blocking=True)时,如果锁已被其他线程持有,该线程会进入阻塞等待状态,操作系统会将其挂起,不会继续消耗CPU时间片。被阻塞的线程会加入锁的等待队列,当持有锁的线程释放锁时,等待队列中的第一个线程会被唤醒并成功获取锁。这种机制保证了CPU资源的合理利用——等待中的线程不会进行忙等待(busy waiting),避免了无意义的CPU轮询。

acquire(blocking=False)是非阻塞模式。如果锁已被持有,立即返回False而不会阻塞线程。这在需要尝试获取锁而不想阻塞的场景下非常有用,比如实现"尝试执行"的逻辑。

lock = threading.Lock() # 非阻塞尝试获取锁 if lock.acquire(blocking=False): try: print("成功获取锁,执行临界区代码") finally: lock.release() else: print("锁已被其他线程持有,跳过本次操作") # 可以执行其他逻辑或稍后重试

三、RLock 可重入锁——同一线程可多次获取

threading.RLock(Reentrant Lock,可重入锁)是Lock的增强版本,允许同一个线程多次获取锁而不会死锁。RLock内部维护了两个关键信息:持有线程标识(Owner)重入计数(Count)。当线程第一次获取锁时,计数为1;如果同一线程再次获取,计数递增;每次release()递减计数,直到计数归零时锁才真正被释放。

import threading rlock = threading.RLock() def outer_function(): with rlock: print("外层函数获取锁") inner_function() # 同一线程再次获取同一锁,不会死锁 def inner_function(): with rlock: print("内层函数也获取锁——RLock允许这样做") # 执行:RLock允许同一线程重复进入 t = threading.Thread(target=outer_function) t.start() t.join()

RLock 在递归调用中的典型应用

import threading class SafeCounter: def __init__(self): self.count = 0 self.lock = threading.RLock() # 必须用 RLock def increment(self): with self.lock: self.count += 1 def increment_multiple(self, times): with self.lock: # 已经持有锁,但仍然可以递归调用 for _ in range(times): self.increment() # 再次获取同一个锁 def get_count(self): with self.lock: return self.count counter = SafeCounter() counter.increment_multiple(10) print(f"安全计数结果: {counter.get_count()}") # 10

Lock vs RLock 选择指南:如果你不确定使用哪种锁,优先使用Lock——它更轻量且在非递归场景下更安全。只有当你明确知道需要在同一个线程内多次获取同一个锁(如递归调用、辅助方法调用)时,才使用RLock。滥用RLock会掩盖潜在的设计问题。

特性LockRLock
同一线程重复 acquire死锁允许(计数递增)
性能更轻量略重(需维护计数)
适用场景简单互斥递归调用、内嵌同步
release 次数一次即可必须与 acquire 次数匹配

四、Semaphore 信号量——资源池限流的利器

threading.Semaphore内部维护一个计数器,用于控制同时访问特定资源的线程数量。它可以看作是"允许多个线程同时访问的锁"。每次调用acquire()时计数器减一(计数为0时阻塞),每次release()时计数器加一。信号量非常适合连接池限制并发数量控制等场景。

基本使用:限制并发数据库连接数

import threading import time # 最多允许3个线程同时访问数据库连接池 connection_pool = threading.Semaphore(3) def query_database(query_id): with connection_pool: print(f"[{time.strftime('%H:%M:%S')}] 查询 {query_id} 开始——占用连接") time.sleep(2) # 模拟数据库查询耗时 print(f"[{time.strftime('%H:%M:%S')}] 查询 {query_id} 结束——释放连接") threads = [] for i in range(8): t = threading.Thread(target=query_database, args=(i,)) threads.append(t) t.start() for t in threads: t.join() # 任何时候都最多有3个查询同时进行

BoundedSemaphore——更安全的信号量

threading.BoundedSemaphore是Semaphore的子类,增加了上限保护:如果release()的次数超过了初始计数(即试图将计数恢复到初始值以上),会抛出ValueError。这能有效防止程序错误导致信号量计数失控,是一种防御性编程实践。

import threading # BoundedSemaphore:计数不会超过初始值 s = threading.BoundedSemaphore(2) s.acquire() # 计数: 2 -> 1 s.acquire() # 计数: 1 -> 0 s.release() # 计数: 0 -> 1 s.release() # 计数: 1 -> 2 # s.release() # ValueError! 计数不能超过初始值2 # 普通 Semaphore 没有此限制 us = threading.Semaphore(2) us.release() # 没问题,计数变成3——这可能隐藏 bug

最佳实践:除非有特殊需求,否则始终使用BoundedSemaphore而非Semaphore,它能帮你捕获因release()调用次数不匹配导致的bug。

五、Event 事件——线程间状态通知

threading.Event是最简单的线程间通信机制,管理一个内部布尔标志位(Flag)。Event类似于一个"信号旗"——一个线程设置标志位,其他线程等待该标志位被设置。它不涉及数据传递,仅用于线程间的状态通知

核心方法

典型场景:等待服务启动

import threading import time import random def worker(worker_id, ready_event): """工作线程等待服务就绪信号""" print(f" 工人 {worker_id} 等待服务就绪...") ready_event.wait() # 阻塞等待 event 被 set print(f" 工人 {worker_id} 开始工作") def service_starter(ready_event): """服务启动线程,完成后发送就绪信号""" print("服务启动中...") time.sleep(random.uniform(1, 3)) # 模拟启动耗时 print("服务已就绪!通知所有工人开始工作") ready_event.set() # 一次性通知所有等待线程 ready_event = threading.Event() # 启动多个工作线程 workers = [] for i in range(5): t = threading.Thread(target=worker, args=(i, ready_event)) workers.append(t) t.start() # 启动服务 starter = threading.Thread(target=service_starter, args=(ready_event,)) starter.start() for t in workers: t.join() starter.join() # 所有工人同时得到通知,几乎同时开始工作

可复用的就绪—开始模式

Event可以重复使用:调用clear()将标志重置为False,然后再次set()触发下一轮通知。这种模式适合多轮次的任务协调。

event = threading.Event() # 第一轮:等待就绪 event.wait() # ... 执行第一轮任务 ... # 重置并等待第二轮 event.clear() # 等待第二轮就绪信号 event.wait()

Event vs Condition:Event适用于"一次性通知"或"广播通知"场景,语义简单清晰。如果要实现更复杂的等待条件(比如等待队列非空),Condition是更合适的选择。

六、Condition 条件变量——最灵活的同步原语

threading.Condition是Python中最强大的同步原语,它结合了锁(Lock/RLock)等待/通知机制。Condition的核心思想是:线程在某个条件不满足时释放锁并等待,其他线程在条件变化后通知等待线程重新获取锁并检查条件。Condition是实现生产者-消费者模式的标准工具。

核心方法

生产者-消费者经典实现

import threading import time import random class BoundedBuffer: """有界缓冲区——生产者消费者模型""" def __init__(self, capacity=5): self.buffer = [] self.capacity = capacity self.cond = threading.Condition() def produce(self, item): with self.cond: # 当缓冲区已满时等待 while len(self.buffer) >= self.capacity: print(f" 缓冲区满,生产者等待...") self.cond.wait() self.buffer.append(item) print(f"生产 [{item}] 缓冲区大小: {len(self.buffer)}") # 通知可能正在等待的消费者 self.cond.notify() def consume(self): with self.cond: # 当缓冲区为空时等待 while len(self.buffer) == 0: print(f" 缓冲区空,消费者等待...") self.cond.wait() item = self.buffer.pop(0) print(f"消费 [{item}] 缓冲区大小: {len(self.buffer)}") # 通知可能正在等待的生产者 self.cond.notify() return item def producer(buffer, items): for item in items: buffer.produce(item) time.sleep(random.uniform(0.3, 0.8)) def consumer(buffer, count): for _ in range(count): buffer.consume() time.sleep(random.uniform(0.5, 1.0)) buffer = BoundedBuffer(capacity=3) items = [f"商品-{i}" for i in range(6)] prod = threading.Thread(target=producer, args=(buffer, items)) cons = threading.Thread(target=consumer, args=(buffer, 6)) prod.start() cons.start() prod.join() cons.join() print("所有生产消费任务完成")

重要注意事项:使用wait()时一定要使用while循环来检查条件,而不是if。这是因为notify()可能唤醒多个线程(虚假唤醒),或者在被唤醒到重新获取锁之间条件再次发生变化。while循环确保了条件成立时才能继续执行,这是防御性编程的核心实践

notify 与 notify_all 的选择

# notify:只唤醒一个等待线程(效率高,适用于同类等待) with cond: cond.notify() # 唤醒一个线程 # notify_all:唤醒所有等待线程(适用于不同类别的等待) with cond: cond.notify_all() # 唤醒所有线程

经验法则:如果所有等待线程都在等待同一个条件,使用notify()即可。如果等待线程在等待不同条件(例如有些等待"缓冲区非空",有些等待"缓冲区未满"),使用notify_all()避免产生"信号丢失"问题。

七、Barrier 屏障——等待所有线程就绪

threading.Barrier(屏障/栅栏)用于多线程同步:设定一个参与线程数量n,当少于n个线程调用wait()时,所有调用线程都会阻塞;当第n个线程也调用wait()后,所有线程同时被释放,继续执行后续代码。Barrier天然适用于分阶段并行计算(如并行排序中的归并阶段)、多线程初始化同步等场景。

import threading import time import random def worker(worker_id, barrier): print(f"工作线程 {worker_id} 执行第一阶段...") time.sleep(random.uniform(0.5, 1.5)) print(f"工作线程 {worker_id} 第一阶段完成,等待其他线程...") # 等待所有线程完成第一阶段 barrier.wait() print(f"工作线程 {worker_id} 所有线程就绪,开始第二阶段!") n_workers = 4 barrier = threading.Barrier(n_workers) threads = [] for i in range(n_workers): t = threading.Thread(target=worker, args=(i, barrier)) threads.append(t) t.start() for t in threads: t.join() print("所有阶段已完成!")

Barrier 的高级特性:回调函数与abort

def on_barrier_released(): """所有线程到达屏障时执行的回调""" print(f"=== 屏障释放!所有 {threading.Barrier.parties} 个线程已就绪 ===") def worker_with_callback(worker_id, barrier): time.sleep(random.uniform(0.5, 2.0)) print(f" 线程 {worker_id} 到达屏障") try: barrier.wait() print(f" 线程 {worker_id} 通过屏障") except threading.BrokenBarrierError: print(f" 线程 {worker_id}:屏障已损坏(超时或重置)") # parties:需要等待的线程数 # action:屏障释放时执行的回调函数 # timeout:单个 wait() 的最大等待时间 barrier = threading.Barrier(3, action=on_barrier_released) for i in range(3): t = threading.Thread(target=worker_with_callback, args=(i, barrier)) t.start() time.sleep(0.3) # Barrier 其他重要方法: # barrier.reset() # 重置屏障到初始状态 # barrier.abort() # 将屏障置为损坏状态,所有等待线程收到 BrokenBarrierError # barrier.parties # 屏障需要的线程数量 # barrier.n_waiting # 当前正在等待的线程数 # barrier.broken # 屏障是否处于损坏状态

适用场景

  • 并行计算的多阶段同步
  • 服务启动时等待所有模块初始化完成
  • 游戏引擎中等待所有渲染线程就绪
  • 测试框架中多个线程同时开始执行

注意事项

  • 线程数必须精确匹配 parties
  • 超时配置可防止无限等待
  • 单个线程异常会破坏屏障
  • 重置后原等待的线程会收到异常

八、Timer 定时器——延迟执行任务

threading.Timer是Thread的子类,用于在指定时间延迟后执行一个函数。Timer在执行一次后自动结束,但可以通过cancel()方法在触发前取消。Timer相当于一个异步定时器,不会阻塞主线程的执行。

import threading import time def timeout_handler(): print(f"[{time.strftime('%H:%M:%S')}] 定时器触发!执行超时处理") # 3秒后执行 timeout_handler timer = threading.Timer(3.0, timeout_handler) timer.start() print(f"[{time.strftime('%H:%M:%S')}] 定时器已启动,主线程继续执行...") time.sleep(2) print(f"[{time.strftime('%H:%M:%S')}] 主线程仍在工作...")

取消定时器

def delayed_task(task_id): print(f"任务 {task_id} 执行") # 创建多个定时器 timers = [] for i in range(5): t = threading.Timer(i * 2.0, delayed_task, args=(i,)) timers.append(t) t.start() # 取消第3个定时器(i=2,将在4秒后执行) timers[2].cancel() print("定时器 2 已被取消") # 输出:任务 0、1、3、4 会执行,任务 2 不会执行

注意:Timer的延迟精度受操作系统调度影响,不适用于高精度定时场景。如果需要精确定时,考虑使用time.perf_counter()配合循环,或使用sched模块。

九、同步原语综合对比

原语核心机制线程数主要用途注意事项
Lock互斥锁(两种状态)限制为1临界区保护不可重入,同一线程第二次acquire死锁
RLock可重入锁(计数)限制为1递归同步场景比Lock略重,需配平acquire/release
Semaphore计数器(允许多个)限制为n资源池限流推荐使用BoundedSemaphore
Event布尔标志(set/clear)不限状态通知/广播clear后需重新wait
Condition等待/通知(+锁)不限生产者-消费者wait必须在while循环中使用
Barrier计数屏障(等待n个)固定为n多线程分阶段同步超时和abort会破坏屏障
Timer延迟执行(一次性)定时/超时任务精度有限,可取消

十、生产者-消费者模式完整实战

生产者-消费者模式是并发编程中最经典的协作模式,涉及多类同步原语的协同使用。下面实现一个带超时控制的、支持多生产者和多消费者的完整示例。

import threading import time import random from queue import Queue # 使用 Condition 实现的生产者-消费者 class ThreadSafeQueue: def __init__(self, maxsize=10): self.queue = [] self.maxsize = maxsize self.cond = threading.Condition() self.running = True def put(self, item): with self.cond: while len(self.queue) >= self.maxsize and self.running: self.cond.wait(timeout=1.0) if not self.running: return False self.queue.append(item) self.cond.notify() return True def get(self): with self.cond: while len(self.queue) == 0 and self.running: self.cond.wait(timeout=1.0) if not self.running and len(self.queue) == 0: return None item = self.queue.pop(0) self.cond.notify() return item def stop(self): with self.cond: self.running = False self.cond.notify_all() def producer_task(q, producer_id, items): for item in items: success = q.put(f"P{producer_id}-{item}") if not success: print(f"生产者 {producer_id}:队列已关闭,停止生产") break print(f"生产: P{producer_id}-{item}") time.sleep(random.uniform(0.1, 0.3)) def consumer_task(q, consumer_id, count): consumed = 0 while consumed < count: item = q.get() if item is None: print(f"消费者 {consumer_id}:队列已关闭,停止消费") break print(f" 消费: {item} (来自消费者 {consumer_id})") consumed += 1 time.sleep(random.uniform(0.2, 0.5)) q = ThreadSafeQueue(maxsize=5) # 多生产者 producers = [] for i in range(2): items = [f"商品-{j}" for j in range(5)] t = threading.Thread(target=producer_task, args=(q, i, items)) producers.append(t) t.start() # 多消费者 consumers = [] for i in range(3): t = threading.Thread(target=consumer_task, args=(q, i, 4)) consumers.append(t) t.start() for t in producers: t.join() # 等待队列消费完 time.sleep(1) q.stop() for t in consumers: t.join() print("所有生产消费任务完成!")

设计要点总结:(1)使用Condition实现高效的等待/通知机制,避免忙等待;(2)始终在while循环中检查条件,防止虚假唤醒;(3)通过running标志实现优雅关闭,避免死锁;(4)添加超时机制防止线程永久阻塞;(5)生产者和消费者的速度差异通过缓冲区容量来平衡。

十一、死锁的预防与检测

死锁(Deadlock)是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的僵局。死锁发生的四个必要条件(Coffman条件)必须同时满足:互斥持有并等待不可剥夺循环等待

经典死锁示例

import threading import time # 经典死锁:两个锁、两个线程、循环等待 lock_a = threading.Lock() lock_b = threading.Lock() def thread_1(): with lock_a: print("线程1: 持有锁A, 等待锁B...") time.sleep(0.1) with lock_b: print("线程1: 获取锁B") # 永远不会执行到这里 def thread_2(): with lock_b: print("线程2: 持有锁B, 等待锁A...") time.sleep(0.1) with lock_a: print("线程2: 获取锁A") # 永远不会执行到这里 t1 = threading.Thread(target=thread_1) t2 = threading.Thread(target=thread_2) t1.start() t2.start() t1.join() t2.join() # 程序卡死在这里!

预防策略一:固定锁顺序

# 解决方案1:所有线程按相同顺序获取锁 def thread_1_fixed(): with lock_a: print("线程1: 持有锁A") time.sleep(0.1) with lock_b: print("线程1: 获取锁B") def thread_2_fixed(): with lock_a: # 同样先获取锁A,再获取锁B print("线程2: 持有锁A") time.sleep(0.1) with lock_b: print("线程2: 获取锁B") # 这样就不会产生死锁——线程2不会持有锁B去等锁A

预防策略二:超时回退

# 解决方案2:使用 acquire(timeout) + 超时后释放已有锁 import threading lock_a = threading.Lock() lock_b = threading.Lock() def try_acquire_with_timeout(): acquired_a = False acquired_b = False try: # 尝试获取锁A,超时1秒 acquired_a = lock_a.acquire(timeout=1) if not acquired_a: return False # 尝试获取锁B,超时1秒 acquired_b = lock_b.acquire(timeout=1) if not acquired_b: return False # 成功获取两个锁,执行临界区 print("成功获取所有锁") return True finally: if acquired_b: lock_b.release() if acquired_a: lock_a.release() def worker_with_timeout(): retries = 3 for attempt in range(retries): if try_acquire_with_timeout(): break print(f"获取锁失败,重试 {attempt + 1}/{retries}") time.sleep(0.5) t1 = threading.Thread(target=worker_with_timeout) t2 = threading.Thread(target=worker_with_timeout) t1.start() t2.start() t1.join() t2.join()

预防策略三:使用 with 语句避免遗漏 release

# 始终使用 with 语句来管理锁的获取和释放 # 好的做法: lock = threading.Lock() with lock: # 即使这里抛出异常,锁也会被正确释放 shared_resource.modify() # 不好的做法(容易遗漏 release): lock.acquire() shared_resource.modify() # 如果中间有异常,lock.release() 永远不会执行 lock.release()

死锁检测工具:Python标准库没有内置的死锁检测器,但可以使用第三方库如deadlock-detector,或在开发环境中启用PYTHONDEADLOCK=1环境变量(在某些Python发行版中支持)。更推荐的是通过代码审查锁顺序规范化来预防死锁。

十二、核心要点总结

  • Lock vs RLock:Lock不可重入,同一线程重复acquire会导致死锁;RLock允许同一线程重入,适合递归调用场景。不确定时优先使用Lock。
  • Semaphore/BoundedSemaphore:管理有限资源的并发访问,如数据库连接池、API限流。BoundedSemaphore比Semaphore更安全,能捕获release过多的bug。
  • Event:简单的布尔标志通知机制,适合一次性广播通知场景。set唤醒所有等待线程,clear重置标志,wait阻塞等待。
  • Condition:最灵活的同步原语,结合锁与等待/通知机制,是实现生产者-消费者模式的首选。wait务必在while循环中使用,防止虚假唤醒。
  • Barrier:等待指定数量的线程全部到达后同时释放,适合分阶段并行计算。注意超时和异常处理会损坏屏障。
  • Timer:一次性延迟执行任务的简便工具,支持cancel取消。精度受操作系统调度限制。
  • 死锁预防:固定锁获取顺序、使用超时回退、始终使用with语句管理锁。死锁的四个必要条件必须同时满足才能发生,破坏任意一个即可预防。
  • 最佳实践:尽量使用with语句管理锁的获取和释放;使用while循环检查Condition的等待条件;优先使用BoundedSemaphore而非Semaphore;合理设计锁的粒度,避免在持有锁时执行耗时操作。

十三、进一步思考

掌握同步原语只是并发编程的第一步。在实际工程中,还需要考虑以下进阶话题:

进阶方向:

  • 无锁编程(Lock-Free):使用原子操作(如threading.Atomic、Python 3.11+的free_threaded模式和原子操作)避免锁的开销和死锁风险。Python 3.12及之后版本对free-threaded模式的支持正在逐步增强。
  • 协程同步:asyncio库提供了对应的异步同步原语(asyncio.Lockasyncio.Semaphore等),在异步编程中不能混用threading和asyncio的同步原语。
  • 进程间同步:multiprocessing模块提供了与threading对应的同步原语,但进程间同步需要使用共享内存或Manager对象,开销比线程同步大得多。
  • 性能权衡:锁的粒度越细,并发度越高,但锁管理开销也越大。过度细粒度的锁可能导致性能反而下降(锁争用开销超过了并发收益)。合理的做法是优先使用粗粒度锁,性能分析确认瓶颈后再细粒度化。
  • GIL的影响:CPython的全局解释器锁(GIL)使得纯CPU密集型的多线程程序无法利用多核优势。但I/O密集型任务(网络请求、文件读写)使用多线程仍然有效,因为I/O操作会释放GIL。

选择正确的同步原语不仅关乎程序的正确性,更关乎代码的可读性和可维护性。一个好的同步设计应该是:读者能直观理解同步意图,编写者能确信没有隐藏的竞态风险。