Condition条件变量:经典等待/通知模式

Python并发编程专题 · 线程间协作式同步机制

专题:Python并发编程系统学习

关键词:Python, 并发编程, Condition, 条件变量, wait, notify, 虚假唤醒, 生产者消费者

一、条件变量的核心概念

1.1 什么是条件变量

条件变量(Condition Variable)是并发编程中一种重要的线程同步原语,它允许一个或多个线程等待某个特定条件变为真,同时允许其他线程在条件满足时通知等待线程。Condition对象本质上是对一个底层Lock(或RLock)的封装,提供了一种比单纯使用锁更高级的协作机制。

在Python的threading模块中,Condition类实现了这一模式。它最核心的设计思想是 "等待/通知"(Wait/Notify)范式,这种范式在多线程编程中广泛应用,尤其是在生产者-消费者问题、有界缓冲区、线程池任务调度等场景中。

1.2 条件变量与互斥锁的关系

条件变量必须与一个互斥锁(Lock或RLock)配合使用。互斥锁确保对共享数据的访问是互斥的,而条件变量则提供了一种让线程在某个条件不满足时挂起等待、在条件满足时被唤醒的机制。二者的关系可以概括为:互斥锁解决的是 "资源冲突" 问题,条件变量解决的是 "资源依赖" 问题。

具体来说,互斥锁保证任何时候只有一个线程可以访问共享资源,防止数据竞争。而条件变量则允许线程在共享资源不满足某条件时主动让出CPU(通过wait),避免忙等待浪费CPU周期;当条件发生变化时,其他线程可以通过notify或notify_all唤醒等待线程。这种组合使得多线程协作既安全又高效。

1.3 为什么需要条件变量

如果没有条件变量,开发者只能通过轮询(polling)的方式检测条件是否满足:要么使用忙等待(busy waiting)持续检查,浪费CPU资源;要么使用定时休眠+检查的方式,带来不必要的延迟和开销。条件变量从根本上解决了这两个问题——它让线程在条件不满足时进入休眠状态,不消耗CPU,并且在条件满足时被立即唤醒,实现零延迟响应。

核心要点:条件变量 = 互斥锁 + 等待队列。互斥锁保护共享数据,等待队列管理需要等待特定条件的线程。条件变量使得线程可以在"等待条件"和"条件满足被唤醒"两种状态之间高效切换,避免了轮询带来的CPU浪费。

二、Condition的API详解

2.1 构造函数

threading.Condition(lock=None)——创建一个条件变量对象。lock参数是可选的,如果为None,内部会自动创建一个RLock(可重入锁);如果需要使用普通的Lock,可以显式传入threading.Lock()。选择Lock还是RLock取决于是否需要可重入性:如果同一个线程中可能需要多次获取同一把锁(例如递归调用),则应使用RLock。

2.2 acquire() 和 release()

Condition对象继承自底层锁的acquire和release方法。它们用于获取和释放与Condition关联的锁。在调用wait和notify方法之前,线程必须已经持有该锁。通常更推荐使用上下文管理器(with语句)来自动管理锁的获取和释放,这样代码更加简洁且不易出错。

2.3 wait(timeout=None)

wait方法执行三个关键步骤:

timeout参数指定最大等待秒数。如果超时,wait返回False;如果被正常唤醒,返回True。需要注意的是,wait返回时,等待的条件不一定成立(详见虚假唤醒章节),因此必须在循环中调用wait。

2.4 notify(n=1)

notify方法唤醒当前等待队列中的n个线程(默认为1个)。被唤醒的线程会尝试重新获取锁,但不会立即执行——它们必须等待当前线程释放锁后才能继续。notify通常只用于只需要唤醒一个等待线程的场景,例如单一生产者-消费者模式中生产一个商品后只需通知一个消费者。

2.5 notify_all()

notify_all方法唤醒当前等待队列中的所有线程。所有被唤醒的线程都会竞争锁,竞争成功的线程继续执行,其余的会再次进入等待状态。当条件变化可能影响多个等待线程时(例如共享状态发生了根本性变化),应使用notify_all。

使用规则:wait、notify和notify_all都必须在持有锁的情况下调用。在with cond: 或 cond.acquire()/cond.release() 保护范围内使用。违反这一规则将导致RuntimeError。

三、经典的生产者-消费者模式

生产者-消费者模式是条件变量的最典型应用。生产者线程生产数据并放入共享缓冲区,消费者线程从缓冲区取出数据进行处理。当缓冲区为空时,消费者需要等待生产者放入新数据;当缓冲区满时,生产者需要等待消费者取出数据。下面是一个完整的实现示例:

import threading import time import random cond = threading.Condition() items = [] class Producer(threading.Thread): def run(self): for i in range(10): with cond: items.append(i) print(f"生产: {i}") cond.notify() # 通知消费者 time.sleep(random.random()) class Consumer(threading.Thread): def run(self): while True: with cond: while not items: # 务必使用while而非if防止虚假唤醒 cond.wait() item = items.pop(0) print(f"消费: {item}") time.sleep(random.random())

在这个例子中,消费者线程在items为空时调用cond.wait()进入等待状态,释放锁供生产者使用。生产者生产一个商品后调用cond.notify()唤醒一个消费者线程。消费者被唤醒后会重新获取锁,然后再次检查items是否非空(这是防止虚假唤醒的关键),确认有数据后才pop消费。需要注意的是,这个实现假设消费者线程会一直运行——实际应用中通常会加入终止标志(如sentinel)来优雅退出。

下面是带终止机制和缓冲区容量限制的更完整版本:

import threading import time import random cond = threading.Condition() items = [] MAX_SIZE = 5 DONE = False class Producer(threading.Thread): def run(self): global DONE for i in range(20): with cond: while len(items) >= MAX_SIZE: cond.wait() # 缓冲区满,等待消费者消费 items.append(i) print(f"生产: {i} (缓冲区大小: {len(items)})") cond.notify() time.sleep(random.uniform(0.1, 0.5)) with cond: DONE = True cond.notify_all() # 通知所有消费者生产结束 class Consumer(threading.Thread): def run(self): while True: with cond: while not items and not DONE: cond.wait() if not items and DONE: break # 无数据且生产已结束 item = items.pop(0) print(f"消费: {item} (缓冲区大小: {len(items)})") cond.notify() # 通知生产者缓冲区有空位 time.sleep(random.uniform(0.2, 0.8)) print("消费者退出") producers = [Producer()] consumers = [Consumer() for _ in range(2)] for t in producers + consumers: t.start() for t in producers + consumers: t.join()

这个增强版本展示了几个重要实践:生产者等待缓冲区不满、通过DONE标志优雅终止多个消费者、生产结束时使用notify_all避免消费者永久阻塞。其中,notify()和notify_all()的选择至关重要——通知一个消费者处理新商品使用notify(),而通知所有消费者生产结束则使用notify_all()。

四、wait的详细工作机制

4.1 wait的原子性操作

wait方法的设计巧妙之处在于它将 "释放锁" 和 "线程挂起" 合并为一个原子操作。这意味着不会出现这样的竞态条件:线程释放了锁但还没来得及挂起就被其他线程通知了。如果这两个步骤不是原子的,那么当线程A释放锁后、挂起前,线程B获取了锁、发送了notify、又释放了锁,然后线程A才挂起,线程A就会永远错过这次通知,导致永久阻塞。

Python的Condition实现内部使用底层操作系统的条件变量原语(在POSIX系统上使用pthread_cond_wait),这些原语天然保证了释放锁和挂起操作的原子性。

4.2 wait的完整生命周期

当一个线程调用condition.wait()时,以下步骤依次发生:

下图可视化了wait的完整流程:

wait的"三步曲":

① 释放锁并阻塞等待 → ② 被唤醒并重新获取锁 → ③ 返回并检查条件

关键:wait返回时仅仅是 "被唤醒了",并不代表 "条件成立了"——检查条件的职责在调用者身上。

4.3 wait的超时机制

wait(timeout=N)允许设置最大等待时间。如果超过N秒仍未收到通知,wait会自动返回False。超时机制在某些场景下非常有用:例如需要定期检查某些状态,或希望在等待过程中有机会处理其他事务。使用超时时,代码模式通常是:

while not condition_met(): result = cond.wait(timeout=5.0) if not result: handle_timeout() # 超时处理逻辑

五、虚假唤醒(Spurious Wakeup)

5.1 什么是虚假唤醒

虚假唤醒是指线程在没有收到任何notify/notify_all调用的情况下,从wait方法中返回的现象。这听起来像是bug,但实际上在大多数操作系统中,这是条件变量实现的固有特性。POSIX标准明确允许pthread_cond_wait函数返回虚假唤醒。Python的Condition基于底层OS原语实现,因此也继承了这一特性。

虚假唤醒的根本原因与操作系统的线程调度机制有关。在某些实现中,条件变量的等待队列实现可能为了性能和正确性,偶尔会触发所有或部分等待线程的唤醒。此外,信号处理的干扰、进程间的信号传递等也可能导致虚假唤醒。

5.2 为什么必须用while而非if

这是条件变量使用中最重要的一条规则:永远在循环中检查等待条件。如果使用if而不是while,虚假唤醒会导致线程在条件实际不满足的情况下继续执行,从而引发数据竞争、索引越界、逻辑错误等严重问题。

以生产者-消费者为例,使用if的错误写法:

# 错误写法——请不要这样做 with cond: if not items: # 使用if! cond.wait() item = items.pop(0) # 可能触发IndexError!

假设两个消费者线程同时被唤醒(无论是notify_all还是虚假唤醒)。它们依次获取锁:线程A获取锁后pop了唯一的商品,然后释放锁;线程B获取锁后继续执行——但由于使用了if,线程B没有重新检查items是否为空的代码路径,直接调用items.pop(0)就会抛出IndexError。而使用while的正确版本中,线程B会在循环中重新检查条件,发现items为空后再次进入wait,从而避免了错误。

5.3 标准保护范式

Python官方文档和所有权威并发编程教材都推荐以下范式:

# 等待端 —— 总是用while循环保护 with cond: while not predicate(): # 条件不满足就继续等 cond.wait() # 条件满足,继续执行 # 通知端 —— 条件变化时通知等待线程 with cond: update_shared_data() # 修改共享状态 cond.notify() # 或 cond.notify_all()

这种范式保证:无论是否发生虚假唤醒,线程都不会在条件不满足的情况下继续执行。这是线程安全的黄金法则之一。

六、notify vs notify_all

6.1 notify的行为特征

cond.notify()默认唤醒等待队列中的一个线程。等待队列通常采用FIFO顺序,但具体行为依赖于底层实现,开发者不应依赖特定顺序。选择notify的场景包括:

6.2 notify_all的行为特征

cond.notify_all()唤醒等待队列中的所有线程。所有被唤醒的线程都会尝试获取锁,但只有一个能成功获取,其余线程会进入锁的等待队列。选择notify_all的场景包括:

6.3 惊群效应(Thundering Herd Problem)

使用notify_all时需要注意惊群效应:当大量线程被同时唤醒但实际只能有一个线程继续执行时,其他线程会立即重新进入等待状态,导致大量的上下文切换和锁竞争,增加了系统开销。在等待线程数量很大(数百或数千)的场景下,惊群效应可能造成显著的性能下降。

缓解惊群效应的策略包括:

6.4 选择策略总结

场景推荐方法原因
单一生产/单一消费notify()每次只需唤醒一个线程,效率最高
多生产/多消费notify()默认唤醒一个,避免无效竞争
状态类型发生变化notify_all()所有等待线程都应检查各自条件
程序关闭/终止信号notify_all()确保所有线程收到退出信号
等待线程很少(2-3个)notify_all()也可惊群效应开销可忽略
大量等待线程(100+)优先notify()避免惊群效应导致性能雪崩

七、Condition的高级应用

7.1 有界缓冲区(Bounded Buffer)

有界缓冲区是生产者-消费者模式的重要变体。它不仅要求缓冲区非空时消费者可以消费,还要求缓冲区非满时生产者可以生产。这需要两个等待条件,但Python的Condition只能关联一个等待队列。常见的解决方案是在同一把锁的保护下,使用while循环同时检查两个条件:

class BoundedBuffer: def __init__(self, capacity): self.capacity = capacity self.buffer = [] self.cond = threading.Condition() def put(self, item): with self.cond: while len(self.buffer) >= self.capacity: self.cond.wait() self.buffer.append(item) self.cond.notify() # 通知消费者 def get(self): with self.cond: while not self.buffer: self.cond.wait() item = self.buffer.pop(0) self.cond.notify() # 通知生产者 return item

需要注意的是,上面的实现使用同一个Condition对象来管理两类等待线程(生产者等待和消费者等待)。当调用notify()时,无法区分唤醒的是生产者还是消费者。在大多数情况下这没问题,因为被唤醒的线程会在while循环中检查自己的条件——如果被唤醒的线程发现自己的条件不满足(例如消费者被唤醒但缓冲区仍然为空),它会再次进入等待。但在高并发场景下,这可能导致 "活锁" 现象:频繁的无效唤醒增加了CPU开销。

更精细的控制需要两个条件变量(分别用于 "非满" 和 "非空" 条件),但Python标准库的Condition不支持创建多个等待队列。在需要极致性能的场景下,可以使用threading.Lock配合threading.Event或引入第三方库如queue.Queue——后者内部已经实现了完善的有界缓冲区管理。

7.2 用Condition实现读写锁

读写锁(Read-Write Lock)允许多个线程同时读共享数据,但写操作必须独占访问。使用Condition可以实现一个简单的读写锁:

class ReadWriteLock: def __init__(self): self.lock = threading.Lock() self.cond = threading.Condition(self.lock) self.readers = 0 self.writers = 0 self.write_waiters = 0 def acquire_read(self): with self.cond: while self.writers > 0 or self.write_waiters > 0: self.cond.wait() self.readers += 1 def release_read(self): with self.cond: self.readers -= 1 if self.readers == 0: self.cond.notify_all() def acquire_write(self): with self.cond: self.write_waiters += 1 while self.readers > 0 or self.writers > 0: self.cond.wait() self.write_waiters -= 1 self.writers += 1 def release_write(self): with self.cond: self.writers -= 1 self.cond.notify_all()

这个读写锁实现中,acquire_read在有写者在执行或有写者在等待时会阻塞,优先保证写操作不饿死;release_read在最后一个读线程释放锁时唤醒所有等待线程(通常主要是写线程);acquire_write等待所有读线程和写线程完成;release_write在写线程释放锁时唤醒所有等待线程。写优先策略避免了写线程被连续不断的读线程饿死。

7.3 多阶段流水线同步

在某些并发场景中,数据需要经过多个处理阶段。每个阶段的线程处理完数据后需要通知下一阶段的线程。使用多个Condition可以精细控制各阶段的同步:

class Pipeline: def __init__(self, stages): self.stages = stages self.data = [None] * stages self.ready = [False] * stages self.lock = threading.Lock() self.conds = [threading.Condition(self.lock) for _ in range(stages)] def set_stage(self, stage, value): with self.lock: self.data[stage] = value self.ready[stage] = True self.conds[stage].notify_all() def wait_stage(self, stage): with self.lock: while not self.ready[stage]: self.conds[stage].wait() return self.data[stage]

每个阶段有自己的Condition对象,使得流水线中的线程可以精确等待自己关注的那个阶段完成。这种模式在数据处理管道(如ETL流程)、图像处理流水线、期货或期权定价的分步计算等场景中非常有用。

八、核心要点总结

Condition条件变量的核心要点:

九、进一步思考

1. Condition与Event的区别:Event是一次性的广播通知,所有调用wait()的线程都会被唤醒且无需再次检查条件;而Condition用于需要反复检查复杂条件的场景,wait返回后必须重新检查条件。Event适合 "初始化完成" 这类一次性信号,Condition适合 "队列中有数据" 这类持续变化的条件。

2. Condition与Semaphore的区别:Semaphore(信号量)维护一个计数器,允许多个线程同时访问有限数量的资源,不需要显式的条件检查;Condition则提供了更灵活的条件判断,可以在任意复杂的谓词条件下等待和通知。

3. 多Condition vs 单Condition:在同一个锁下使用多个Condition对象可以细分等待条件,避免惊群效应。Python的threading.Condition虽然只能关联一个等待队列,但可以通过创建多个Condition对象并共享同一个锁来实现多等待队列的效果(如上文的Pipeline实现)。

4. queue.Queue的封装:Python标准库中的queue.Queue内部已经基于Condition实现了完善的有界缓冲区,包括put/get的超时处理和线程安全。在大多数实际场景中,直接使用queue.Queue比手写Condition更推荐。但是理解Condition的底层原理对于排查复杂的并发问题、以及实现自定义同步原语是必不可少的。