Event事件与Barrier屏障

Python并发编程专题 · 事件驱动与同步点的线程协调

专题:Python并发编程系统学习

关键词:Python, 并发编程, Event, Barrier, 事件标志, 屏障, 线程协调

一、Event事件对象

threading.Event 是Python中最简单的线程间通信机制之一。它内部维护一个布尔标志(flag),线程可以通过检查该标志来决定是否继续执行。Event对象非常适合实现"事件驱动"的线程同步模式,即一个线程发送信号,一个或多个线程等待该信号。

核心方法

基础用法示例

import threading import time ev = threading.Event() def waiter(): print("等待事件...") ev.wait() # 阻塞直到set print("事件触发,继续执行") def setter(): time.sleep(2) print("2秒后设置事件") ev.set() t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=setter) t1.start() t2.start() t1.join() t2.join()

wait() 超时用法

wait() 方法支持 timeout 参数,指定最大等待时间(秒)。超时后即使事件未被设置,wait() 也会返回 False,使得线程可以继续执行其他任务而不会永久阻塞。

import threading ev = threading.Event() result = ev.wait(timeout=3.0) # 最多等待3秒 if result: print("事件已触发") else: print("等待超时,事件未触发")

多个线程等待同一个Event

Event 对象支持一对多的通知模式。当多个线程同时 wait() 在同一个 Event 上时,一旦某个线程调用 set(),所有等待线程将被同时唤醒。

import threading import time ev = threading.Event() def worker(name): print(f"{name} 已就绪,等待开始信号") ev.wait() print(f"{name} 收到信号,开始工作") threads = [threading.Thread(target=worker, args=(f"Worker-{i}",)) for i in range(5)] for t in threads: t.start() time.sleep(1) print("裁判发出开始信号!") ev.set() # 所有worker同时被唤醒 for t in threads: t.join()

二、Event的典型应用场景

1. 线程启动同步

主线程创建多个工作线程后,通过 Event 控制所有线程同时开始执行,避免线程启动时间差异导致的不确定性。这在性能测试和基准测试中尤为常见。

import threading import time start_event = threading.Event() def runner(name): start_event.wait() # 等待发令枪 for i in range(3): print(f"{name} {i+1}圈") time.sleep(0.5) threads = [threading.Thread(target=runner, args=(f"选手{i}",)) for i in range(4)] for t in threads: t.start() print("预备——跑!") start_event.set()

2. 优雅关闭信号

在长期运行的服务程序中,使用 Event 作为关闭信号,当主线程需要停止服务时设置 Event,工作线程检测到信号后自行清理并退出,避免强制终止带来的资源泄漏问题。

import threading import time stop_event = threading.Event() def worker(): while not stop_event.is_set(): print("工作中...") stop_event.wait(timeout=1.0) print("工作线程收到停止信号,清理退出") t = threading.Thread(target=worker) t.start() time.sleep(3) print("主线程发出停止信号") stop_event.set() t.join() print("程序已安全退出")

3. 定时任务协调

Event 结合 time 模块可以实现轻量级的定时任务调度。一个线程负责计时,到点时 set() 触发 Event,工作线程开始执行周期性任务。

设计建议:Event 适用于"一次性触发"或"开关型"同步场景。如果需要多次触发,可以在循环中反复 clear() 再 wait()。但若同步逻辑复杂(如需要计数或阶段同步),应考虑使用 Barrier 或 Condition 对象。

三、Barrier屏障/栅栏

threading.Barrier 是一种多线程同步原语,它创建一个"屏障点",所有线程必须在屏障点集合完毕后才能继续执行。Barrier 的核心概念是 parties(参与方数量)——即需要等待的线程总数。当所有 parties 都调用了 wait() 后,屏障打开,所有线程同时通过。

核心方法

基础用法示例

import threading import time barrier = threading.Barrier(3) # 3个线程到达后继续 def worker(name): print(f"{name} 准备阶段") time.sleep(1) pos = barrier.wait() # 等待所有worker print(f"{name} 同步后继续执行,我是第{pos} 个到达的") threads = [threading.Thread(target=worker, args=(f"Worker-{i}",)) for i in range(3)] for t in threads: t.start() for t in threads: t.join()

wait() 返回值详解

Barrier.wait() 返回一个 0 到 parties-1 之间的整数,表示当前线程在所有参与线程中的到达顺序。第一个到达的线程返回 0,最后一个返回 parties-1。可以利用这个返回值指派"领导者"线程——例如让最后一个到达的线程执行汇总操作。

import threading barrier = threading.Barrier(4) def phase_worker(): # 第一阶段 pos = barrier.wait() if pos == barrier.parties - 1: print("所有线程完成第一阶段,开始汇总") # 第二阶段 pos = barrier.wait() if pos == barrier.parties - 1: print("所有线程完成第二阶段")

四、Barrier的abort与reset

异常处理机制

当 Barrier 出现异常时(如某个线程超时、被意外终止),应该调用 abort() 将屏障置为中断状态。此时所有正在 wait() 的线程会立即抛出 threading.BrokenBarrierError 异常,避免线程永久阻塞。

import threading import time barrier = threading.Barrier(3) def worker(name): print(f"{name} 到达屏障") try: barrier.wait() print(f"{name} 通过屏障") except threading.BrokenBarrierError: print(f"{name} 检测到屏障已损坏") def aborter(): time.sleep(0.5) print("中断屏障!") barrier.abort() t1 = threading.Thread(target=worker, args=("W1",)) t2 = threading.Thread(target=worker, args=("W2",)) t3 = threading.Thread(target=aborter) t1.start() t2.start() t3.start() for t in (t1, t2, t3): t.join()

reset() 与 BrokenBarrierError

reset() 方法将屏障恢复到初始状态(n_waiting = 0, broken = False),但任何正在等待的线程会收到 BrokenBarrierError。需要注意的是,reset() 调用后,之前到达的线程会被"遗忘"——它们需要重新调用 wait()。

import threading import time barrier = threading.Barrier(2) def slow_worker(): time.sleep(2) print("慢线程到达屏障") try: barrier.wait() print("慢线程通过屏障") except threading.BrokenBarrierError: print("慢线程:屏障损坏") t = threading.Thread(target=slow_worker) t.start() time.sleep(0.5) barrier.wait() # 主线程先到达 print("主线程到达,但慢线程还要1.5秒") # 重置屏障,慢线程的 wait() 将收到 BrokenBarrierError barrier.reset() print("屏障已重置") t.join()

wait() 超时处理

wait() 方法也支持 timeout 参数。如果当前线程等待超时,屏障会进入 broken 状态,所有其他正在等待的线程都会收到 BrokenBarrierError。因此设置 timeout 时需要确保所有线程的一致性或处理 BrokenBarrierError。

最佳实践:使用 Barrier 时始终在 wait() 周围包裹 try/except BrokenBarrierError,确保其中一个线程异常时所有线程都能安全退出,避免死锁。

五、Event vs Barrier对比

Event 和 Barrier 虽然都是线程协调工具,但它们的同步模型有本质区别。Event 采用"一对多"的信号广播模式,而 Barrier 采用"多对多"的屏障等待模式。下面通过表格对比两者的核心差异:

对比维度 Event(事件) Barrier(屏障)
同步模型 一对多:一个线程发信号,多个线程接收 多对多:所有线程互相等待,全部到达后一起继续
是否可重用 是,通过 clear() 重置标志后可再次使用 是,通过 reset() 重置,但会引发异常
是否需要知道线程数 不需要,Event 本身不跟踪等待者数量 需要,必须指定 parties 数量
适用场景 启动信号、停止信号、超时等待 分阶段并行计算、赛跑同时出发、多阶段流水线
异常处理 无特殊异常,超时返回 False BrokenBarrierError,需显式捕获
典型类比 发令枪(一个鸣枪,所有选手起跑) 接力赛交接点(所有队员到齐后才能交接)

何时选择 Event

何时选择 Barrier

一句话总结:Event 适合"信号通知"场景——一个线程告诉其他线程"可以开始了";Barrier 适合"阶段同步"场景——所有线程互相等待,确认彼此都已准备好再一起前进。