← 返回并发编程目录
← 返回学习笔记首页
专题: Python并发编程系统学习
关键词: Python, 并发编程, Event, Barrier, 事件标志, 屏障, 线程协调
一、Event事件对象
threading.Event 是Python中最简单的线程间通信机制之一。它内部维护一个布尔标志(flag),线程可以通过检查该标志来决定是否继续执行。Event对象非常适合实现"事件驱动"的线程同步模式,即一个线程发送信号,一个或多个线程等待该信号。
核心方法
set() :将内部标志设为 True,所有等待的线程被唤醒
clear() :将内部标志设为 False,后续 wait() 调用将阻塞
wait(timeout=None) :阻塞直到标志为 True,或超时返回
is_set() :返回当前内部标志的状态(True/False)
基础用法示例
import threading
import time
ev = threading.Event()
def waiter ():
print("等待事件..." )
ev.wait() # 阻塞直到set
print("事件触发,继续执行" )
def setter ():
time.sleep(2 )
print("2秒后设置事件" )
ev.set()
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
wait() 超时用法
wait() 方法支持 timeout 参数,指定最大等待时间(秒)。超时后即使事件未被设置,wait() 也会返回 False,使得线程可以继续执行其他任务而不会永久阻塞。
import threading
ev = threading.Event()
result = ev.wait(timeout=3.0 ) # 最多等待3秒
if result:
print("事件已触发" )
else :
print("等待超时,事件未触发" )
多个线程等待同一个Event
Event 对象支持一对多的通知模式。当多个线程同时 wait() 在同一个 Event 上时,一旦某个线程调用 set(),所有等待线程将被同时唤醒。
import threading
import time
ev = threading.Event()
def worker (name ):
print(f" {name } 已就绪,等待开始信号" )
ev.wait()
print(f" {name} 收到信号,开始工作" )
threads = [threading.Thread(target=worker, args=(f"Worker- {i}" ,))
for i in range (5 )]
for t in threads:
t.start()
time.sleep(1 )
print("裁判发出开始信号!" )
ev.set() # 所有worker同时被唤醒
for t in threads:
t.join()
二、Event的典型应用场景
1. 线程启动同步
主线程创建多个工作线程后,通过 Event 控制所有线程同时开始执行,避免线程启动时间差异导致的不确定性。这在性能测试和基准测试中尤为常见。
import threading
import time
start_event = threading.Event()
def runner (name ):
start_event.wait() # 等待发令枪
for i in range (3 ):
print(f" {name} 第 {i+1 }圈" )
time.sleep(0.5 )
threads = [threading.Thread(target=runner, args=(f"选手 {i}" ,))
for i in range (4 )]
for t in threads:
t.start()
print("预备——跑!" )
start_event.set()
2. 优雅关闭信号
在长期运行的服务程序中,使用 Event 作为关闭信号,当主线程需要停止服务时设置 Event,工作线程检测到信号后自行清理并退出,避免强制终止带来的资源泄漏问题。
import threading
import time
stop_event = threading.Event()
def worker ():
while not stop_event.is_set():
print("工作中..." )
stop_event.wait(timeout=1.0 )
print("工作线程收到停止信号,清理退出" )
t = threading.Thread(target=worker)
t.start()
time.sleep(3 )
print("主线程发出停止信号" )
stop_event.set()
t.join()
print("程序已安全退出" )
3. 定时任务协调
Event 结合 time 模块可以实现轻量级的定时任务调度。一个线程负责计时,到点时 set() 触发 Event,工作线程开始执行周期性任务。
设计建议: Event 适用于"一次性触发"或"开关型"同步场景。如果需要多次触发,可以在循环中反复 clear() 再 wait()。但若同步逻辑复杂(如需要计数或阶段同步),应考虑使用 Barrier 或 Condition 对象。
三、Barrier屏障/栅栏
threading.Barrier 是一种多线程同步原语,它创建一个"屏障点",所有线程必须在屏障点集合完毕后才能继续执行。Barrier 的核心概念是 parties (参与方数量)——即需要等待的线程总数。当所有 parties 都调用了 wait() 后,屏障打开,所有线程同时通过。
核心方法
wait(timeout=None) :阻塞直到所有 parties 都到达屏障点,返回一个整数表示当前线程是第几个到达的(0 ~ parties-1)
abort() :将屏障置为中断状态,所有正在等待和即将 wait() 的线程都会收到 BrokenBarrierError
reset() :将屏障重置为空状态,但正在等待的线程会收到 BrokenBarrierError
parties :属性,返回所需的线程数
n_waiting :属性,当前正在等待的线程数
broken :属性,屏障是否处于中断状态
基础用法示例
import threading
import time
barrier = threading.Barrier(3 ) # 3个线程到达后继续
def worker (name ):
print(f" {name} 准备阶段" )
time.sleep(1 )
pos = barrier.wait() # 等待所有worker
print(f" {name} 同步后继续执行,我是第 {pos} 个到达的" )
threads = [threading.Thread(target=worker, args=(f"Worker- {i}" ,))
for i in range (3 )]
for t in threads:
t.start()
for t in threads:
t.join()
wait() 返回值详解
Barrier.wait() 返回一个 0 到 parties-1 之间的整数,表示当前线程在所有参与线程中的到达顺序。第一个到达的线程返回 0,最后一个返回 parties-1。可以利用这个返回值指派"领导者"线程——例如让最后一个到达的线程执行汇总操作。
import threading
barrier = threading.Barrier(4 )
def phase_worker ():
# 第一阶段
pos = barrier.wait()
if pos == barrier.parties - 1 :
print("所有线程完成第一阶段,开始汇总" )
# 第二阶段
pos = barrier.wait()
if pos == barrier.parties - 1 :
print("所有线程完成第二阶段" )
四、Barrier的abort与reset
异常处理机制
当 Barrier 出现异常时(如某个线程超时、被意外终止),应该调用 abort() 将屏障置为中断状态。此时所有正在 wait() 的线程会立即抛出 threading.BrokenBarrierError 异常,避免线程永久阻塞。
import threading
import time
barrier = threading.Barrier(3 )
def worker (name ):
print(f" {name} 到达屏障" )
try :
barrier.wait()
print(f" {name} 通过屏障" )
except threading.BrokenBarrierError:
print(f" {name} 检测到屏障已损坏" )
def aborter ():
time.sleep(0.5 )
print("中断屏障!" )
barrier.abort()
t1 = threading.Thread(target=worker, args=("W1" ,))
t2 = threading.Thread(target=worker, args=("W2" ,))
t3 = threading.Thread(target=aborter)
t1.start()
t2.start()
t3.start()
for t in (t1, t2, t3):
t.join()
reset() 与 BrokenBarrierError
reset() 方法将屏障恢复到初始状态(n_waiting = 0, broken = False),但任何正在等待的线程会收到 BrokenBarrierError。需要注意的是,reset() 调用后,之前到达的线程会被"遗忘"——它们需要重新调用 wait()。
import threading
import time
barrier = threading.Barrier(2 )
def slow_worker ():
time.sleep(2 )
print("慢线程到达屏障" )
try :
barrier.wait()
print("慢线程通过屏障" )
except threading.BrokenBarrierError:
print("慢线程:屏障损坏" )
t = threading.Thread(target=slow_worker)
t.start()
time.sleep(0.5 )
barrier.wait() # 主线程先到达
print("主线程到达,但慢线程还要1.5秒" )
# 重置屏障,慢线程的 wait() 将收到 BrokenBarrierError
barrier.reset()
print("屏障已重置" )
t.join()
wait() 超时处理
wait() 方法也支持 timeout 参数。如果当前线程等待超时,屏障会进入 broken 状态,所有其他正在等待的线程都会收到 BrokenBarrierError。因此设置 timeout 时需要确保所有线程的一致性或处理 BrokenBarrierError。
最佳实践: 使用 Barrier 时始终在 wait() 周围包裹 try/except BrokenBarrierError,确保其中一个线程异常时所有线程都能安全退出,避免死锁。
五、Event vs Barrier对比
Event 和 Barrier 虽然都是线程协调工具,但它们的同步模型有本质区别。Event 采用"一对多" 的信号广播模式,而 Barrier 采用"多对多" 的屏障等待模式。下面通过表格对比两者的核心差异:
对比维度
Event(事件)
Barrier(屏障)
同步模型
一对多:一个线程发信号,多个线程接收
多对多:所有线程互相等待,全部到达后一起继续
是否可重用
是,通过 clear() 重置标志后可再次使用
是,通过 reset() 重置,但会引发异常
是否需要知道线程数
不需要,Event 本身不跟踪等待者数量
需要,必须指定 parties 数量
适用场景
启动信号、停止信号、超时等待
分阶段并行计算、赛跑同时出发、多阶段流水线
异常处理
无特殊异常,超时返回 False
BrokenBarrierError,需显式捕获
典型类比
发令枪(一个鸣枪,所有选手起跑)
接力赛交接点(所有队员到齐后才能交接)
何时选择 Event
需要一个线程通知其他线程某个事件已经发生
通知方不需要确认接收方已收到消息
需要超时等待机制
用于实现优雅关闭或暂停/恢复信号
何时选择 Barrier
所有线程必须到达某个同步点后才能继续下一阶段
分治算法中的合并阶段(如并行排序后的结果合并)
多阶段流水线作业,每个阶段完成后需要同步
需要精确控制线程到达顺序和阶段切换
一句话总结: Event 适合"信号通知"场景——一个线程告诉其他线程"可以开始了";Barrier 适合"阶段同步"场景——所有线程互相等待,确认彼此都已准备好再一起前进。