threading模块 — 多线程编程

Python标准库精讲专题 · 并发编程篇 · 掌握多线程编程

专题:Python标准库精讲系统学习

关键词:Python, 标准库, threading, 线程, 多线程, Thread, Lock, RLock, Semaphore, Event, Condition, GIL

一、线程概述

1.1 进程与线程的区别

进程(Process)是操作系统分配资源的最小单位,拥有独立的地址空间。线程(Thread)是CPU调度的最小单位,同一进程下的线程共享进程的内存空间(代码段、数据段、堆),但各自拥有独立的栈和寄存器上下文。

多进程适合CPU密集型任务,多线程适合I/O密集型任务。进程间切换开销大,线程间切换开销小。进程间通信复杂(需管道、队列、共享内存等),线程间通信简单(直接共享内存)。

1.2 GIL全局解释器锁

CPython解释器中的GIL(Global Interpreter Lock)是一个互斥锁,它确保任意时刻只有一个线程在执行Python字节码。这意味着在多核CPU上,Python多线程无法真正并行执行计算密集型任务。

GIL存在的根本原因是CPython的内存管理不是线程安全的。为避免多个线程同时操作对象引用计数导致内存错误,设计了GIL来序列化对Python对象的访问。虽然GIL简化了CPython的实现,但也成为多线程性能的瓶颈。

在Python 3.x中,GIL的实现经过优化(如采用"检查间隔"+"信号量切换"机制),但核心设计未变。对于CPU密集型任务,推荐使用 multiprocessing 模块或 concurrent.futures.ProcessPoolExecutor 来绕过GIL限制。

1.3 threading模块适用场景

threading模块特别适合以下场景:I/O密集型任务(网络请求、文件读写、数据库操作)、GUI程序保持界面响应、定时任务调度、生产者-消费者模式、并发Web爬虫等。对于需要并发执行但以等待I/O为主的场景,多线程能显著提升吞吐量。

核心要点:GIL使得Python多线程不能充分利用多核CPU,但通过线程调度和I/O等待释放GIL的机制,多线程在I/O密集型场景下依然能带来显著的性能提升。理解GIL是掌握Python多线程编程的关键前提。

二、Thread类

2.1 创建线程的方式

创建线程有两种主要方式:一是通过 target 参数传入可调用对象,二是继承 Thread 类并重写 run 方法。前者简洁灵活,适合简单任务;后者适合封装复杂线程逻辑。

import threading import time # 方式一:传入 target 函数 def worker(name, delay): print(f"线程 {name} 开始工作") time.sleep(delay) print(f"线程 {name} 工作完成") t1 = threading.Thread(target=worker, args=("A", 2)) t2 = threading.Thread(target=worker, args=("B", 1)) # 方式二:继承 Thread 类 class MyThread(threading.Thread): def __init__(self, name, delay): super().__init__(name=name) self.delay = delay def run(self): print(f"自定义线程 {self.name} 开始") time.sleep(self.delay) print(f"自定义线程 {self.name} 结束") t3 = MyThread("C", 1.5)

2.2 start / join / daemon

start() 方法启动线程,使其进入就绪状态等待调度。join(timeout) 方法阻塞调用线程,直到被调用线程终止或超时。daemon 属性设置守护线程——当主线程结束时守护线程自动终止,不等待其完成。非守护线程(前台线程)则会在主线程退出时继续执行,直到完成。

import threading import time def daemon_worker(): while True: print("守护线程运行中...") time.sleep(1) def normal_worker(): time.sleep(3) print("普通线程完成") d = threading.Thread(target=daemon_worker, daemon=True) n = threading.Thread(target=normal_worker) d.start() n.start() # 主线程等待 n 完成后退出,不会等待守护线程 d n.join() print("主线程结束") # 守护线程 d 会随主线程退出而强制终止

2.3 线程属性:name / ident / native_id

name 是线程的可读名称,可在创建时指定或运行时修改。ident 是线程的唯一标识符(非零整数),在线程启动后才有效,线程终止后可被重用。native_id(Python 3.8+)是操作系统分配的线程ID,由底层内核分配。

import threading t = threading.Thread(name="Worker-1", target=lambda: None) print(f"启动前: name={t.name}, ident={t.ident}, native_id={t.native_id}") t.start() print(f"启动后: name={t.name}, ident={t.ident}, native_id={t.native_id}") t.join() print(f"结束后: ident={t.ident}") # ident 仍保留结束前的值

三、Lock互斥锁

3.1 acquire / release 与上下文管理器

Lock是最基本的同步原语,只有"锁定"和"未锁定"两种状态。acquire(blocking=True, timeout=-1) 获取锁,release() 释放锁。推荐使用 with 语句作为上下文管理器,能自动管理锁的获取与释放,避免因异常导致锁未释放的死锁问题。

import threading counter = 0 lock = threading.Lock() def increment(): global counter for _ in range(100000): # 方式一:显示 acquire / release lock.acquire() try: counter += 1 finally: lock.release() # 方式二:上下文管理器(推荐) # with lock: # counter += 1 threads = [threading.Thread(target=increment) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() print(f"最终值: {counter}") # 无锁竞争时会出现竞态条件导致结果错误

acquire 的 blocking 参数可控制是否阻塞:blocking=False 时非阻塞获取锁,获取失败立即返回 False;timeout 参数指定阻塞等待的最长时间(秒),超时未获取到返回 False。这两个参数在需要避免死锁的复杂场景中非常有用。

3.2 可重入锁 RLock

RLock(可重入锁)允许同一个线程多次 acquire 而不会死锁。它维护一个锁的持有者计数,每次 acquire 计数加1,release 减1,计数归零时才真正释放。RLock适用于递归调用或一个函数中需要多次进入临界区的场景。

import threading rlock = threading.RLock() def recursive_func(n): with rlock: print(f"递归深度: {n}") if n > 0: recursive_func(n - 1) # 同一个线程可以多次进入 # 如果使用普通 Lock,上述递归调用会在第二次 acquire 时阻塞,导致死锁 recursive_func(3)

核心要点:Lock是"谁拿到谁释放"的互斥机制,同一线程不可重复获取已持有的普通Lock;RLock允许同一线程重复获取,计数器管理嵌套层级。在简单的临界区保护中用Lock,在需要同一线程重入的场景中用RLock。

四、Semaphore信号量

4.1 信号量原理

Semaphore(信号量)维护一个内部计数器,acquire() 使计数器减1(为0时阻塞),release() 使计数器加1。信号量用于控制对有限资源的并发访问数量。BoundedSemaphore是有界信号量,确保 release() 次数不超过初始值,防止计数溢出。

import threading import time import random # 限制同时最多3个线程访问资源 semaphore = threading.Semaphore(3) def access_database(thread_id): with semaphore: print(f"线程 {thread_id} 接入数据库...") time.sleep(random.uniform(0.5, 2)) print(f"线程 {thread_id} 释放数据库连接") threads = [threading.Thread(target=access_database, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join()

4.2 BoundedSemaphore 有界信号量

BoundedSemaphore 是 Semaphore 的子类,区别在于 release() 时如果当前值超过初始值会抛出 ValueError。这能有效防止编程错误导致的信号量计数异常,是资源池管理中的首选。

import threading # 普通信号量 s = threading.Semaphore(2) s.acquire() s.acquire() s.release() s.release() s.release() # 没问题,计数变为3,超出初始值 # 有界信号量 bs = threading.BoundedSemaphore(2) bs.acquire() bs.acquire() bs.release() bs.release() # bs.release() # 抛出 ValueError: Semaphore released too many times

适用场景:数据库连接池限制、API并发请求限制、文件句柄池、线程池大小控制。信号量优雅地解决了"资源有限、请求无限"的并发控制问题。

五、Event事件

5.1 线程间通知机制

Event(事件)是线程间通信的简单机制,内部维护一个bool标志位。set() 将标志设为True并唤醒所有等待线程;clear() 将标志设为False;wait(timeout) 阻塞直到标志为True或超时;is_set() 检查当前标志状态。

Event常用于一个线程需要等待另一个线程完成特定操作后再继续执行的场景,如服务启动完成后通知等待的客户端线程。

import threading import time event = threading.Event() def waiter(): print("等待服务启动...") event.wait() # 阻塞直到 set() print("服务已启动,开始工作") def starter(): print("正在启动服务...") time.sleep(3) print("服务启动完成") event.set() # 通知所有等待线程 t1 = threading.Thread(target=waiter) t2 = threading.Thread(target=waiter) # 多个线程等待同一个事件 t3 = threading.Thread(target=starter) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()

5.2 事件的高级用法

Event可以实现一次性同步:set() 后即使 clear(),之前已通过的线程不会回退。配合超时参数 wait(timeout) 可实现带超时的等待,避免永久阻塞。Event也支持重复使用模式——先 wait 再 set,处理完再 clear 后重复等待。

import threading import random event = threading.Event() def worker(worker_id): # 等待主线程发令 print(f"运动员 {worker_id} 准备就绪") event.wait() print(f"运动员 {worker_id} 起跑!") def referee(): print("裁判:各就各位...") time.sleep(random.uniform(1, 3)) print("裁判:跑!") event.set() threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)] for t in threads: t.start() referee() for t in threads: t.join()

六、Condition条件变量

6.1 wait / notify / notify_all

Condition(条件变量)结合了 Lock 和事件通知机制。它总是与一个 Lock(或 RLock)关联使用。wait(timeout) 释放锁并阻塞,直到被 notify 唤醒后重新获取锁;notify(n=1) 唤醒至多 n 个等待线程;notify_all() 唤醒所有等待线程。

使用 Condition 必须遵循固定的模式:在 with 语句下先检查条件是否满足,不满足则 wait 释放锁等待通知,被唤醒后再重新检查条件。

6.2 生产者-消费者模式

import threading import time import random class Queue: def __init__(self, maxsize=5): self.items = [] self.maxsize = maxsize self.cond = threading.Condition() def put(self, item): with self.cond: while len(self.items) >= self.maxsize: self.cond.wait() # 队列满,等待消费者消费 self.items.append(item) print(f"生产 {item}, 队列: {len(self.items)}") self.cond.notify() # 通知消费者 def get(self): with self.cond: while not self.items: self.cond.wait() # 队列空,等待生产者生产 item = self.items.pop(0) print(f"消费 {item}, 队列: {len(self.items)}") self.cond.notify() # 通知生产者 return item def producer(q, pid): for i in range(5): time.sleep(random.uniform(0.3, 0.8)) q.put(f"产品-{pid}-{i}") def consumer(q): for _ in range(10): time.sleep(random.uniform(0.5, 1.0)) q.get() q = Queue(maxsize=3) tp = threading.Thread(target=producer, args=(q, 1)) tc = threading.Thread(target=consumer, args=(q,)) tp.start() tc.start() tp.join() tc.join()

核心要点:Condition 与 Lock 的区别在于,Lock 只提供互斥访问,而 Condition 还提供了线程间等待/通知的语义。生产者-消费者模式是 Condition 最经典的用法——生产者等待"不满"条件,消费者等待"不空"条件。

七、Barrier屏障

7.1 多线程同步到达

Barrier(屏障)用于让指定数量(parties)的线程在某个点上同步等待,所有线程都到达屏障点后才一起继续执行。Barrier.wait(timeout) 使线程在屏障处等待,返回一个指示当前线程在屏障中位置的值(0到parties-1)。

Barrier 特别适用于分阶段计算:每个线程完成阶段一的工作后,必须在屏障处等待所有线程都完成,才能进入阶段二。这种模式在并行计算和测试中非常常见。

import threading import time def worker(phase, barrier, worker_id): print(f"阶段{phase}: 工人{worker_id} 开始工作") time.sleep(worker_id * 0.5) # 不同工人工时不同 print(f"阶段{phase}: 工人{worker_id} 到达屏障") barrier.wait() # 所有工人都到达屏障后才会继续 print(f"阶段{phase}: 工人{worker_id} 通过屏障") barrier = threading.Barrier(3) for phase in range(1, 4): print(f"\n=== 阶段 {phase} 开始 ===") threads = [threading.Thread(target=worker, args=(phase, barrier, i)) for i in range(3)] for t in threads: t.start() for t in threads: t.join()

7.2 Barrier 属性与重置

Barrier 对象提供多个有用的属性和方法:parties 属性返回线程总数;n_waiting 属性返回当前正在等待的线程数;broken 属性指示屏障是否被破坏。abort() 方法将屏障置于 broken 状态,所有等待线程会收到 BrokenBarrierError 异常。reset() 方法将屏障重置为初始状态,可用于多次同步。

import threading def fragile_worker(barrier, wid): print(f"线程 {wid} 到达屏障") try: result = barrier.wait(timeout=1) print(f"线程 {wid} 通过,位置: {result}") except threading.BrokenBarrierError: print(f"线程 {wid} 检测到屏障破裂") barrier = threading.Barrier(3) # 只启动2个线程,第三个线程永远不会到达 threads = [threading.Thread(target=fragile_worker, args=(barrier, i)) for i in range(2)] for t in threads: t.start() for t in threads: t.join() print(f"屏障状态 - broken: {barrier.broken}") # 超时或部分线程未到达都会导致屏障 broken

八、Timer定时器与总结

8.1 Timer 延迟执行

Timer 是 Thread 的子类,用于在指定延迟后执行某个函数。它的构造函数 Timer(interval, function, args=None, kwargs=None) 表示在 interval 秒后调用 function。cancel() 方法可在定时器触发前取消执行。Timer 适合一次性延迟任务,而非周期性任务。

import threading import time def delayed_print(msg): print(f"[{time.strftime('%H:%M:%S')}] {msg}") print(f"[{time.strftime('%H:%M:%S')}] 启动定时器") t = threading.Timer(3.0, delayed_print, args=("3秒后执行",)) t.start() # 可取消定时器 cancel_t = threading.Timer(5.0, delayed_print, args=("这条不会被执行",)) cancel_t.start() cancel_t.cancel() # 取消执行 t.join()

如需周期性定时执行任务,可以在回调函数中重新创建 Timer,或使用 sched 模块 / threading 的 Event 循环等方案。Timer 本身不提供周期执行能力。

8.2 线程安全问题与最佳实践

多线程编程的核心挑战是共享数据的并发访问一致性。以下是一些重要的线程安全建议:

import threading import queue import time # 推荐使用 queue.Queue 进行线程间安全通信 def producer(q, stop_event): for i in range(5): q.put(i) time.sleep(0.2) stop_event.set() def consumer(q, stop_event): while not stop_event.is_set() or not q.empty(): try: item = q.get(timeout=0.5) print(f"处理: {item}") q.task_done() except queue.Empty: continue q = queue.Queue() stop = threading.Event() tp = threading.Thread(target=producer, args=(q, stop)) tc = threading.Thread(target=consumer, args=(q, stop)) tp.start() tc.start() tp.join() q.join() # 等待所有 item 被处理 tc.join()

8.3 总结

Python threading 模块为多线程编程提供了完整而强大的工具集。从基础的 Thread 类、Lock 互斥锁,到高级的 Condition 条件变量、Barrier 屏障,以及 Timer 延迟执行,每种工具都有其特定的适用场景。

理解 GIL 的局限性和线程切换的代价,有助于做出正确的技术选型。对于 I/O 密集型任务,threading 配合适当的同步机制能显著提升程序吞吐量;对于 CPU 密集型任务,则应考虑 multiprocessing 或异步编程方案。

多线程编程的核心原则始终不变:最小化共享状态、优先使用消息传递、保证临界区的互斥访问、避免死锁和活锁。掌握这些原则和 threading 模块提供的工具,就能编写出正确、高效、可维护的并发 Python 程序。

核心要点总结:

- GIL 使 Python 多线程无法并行执行 CPU 密集型任务,但 I/O 密集型场景仍能大幅提升性能

- Thread 类提供两种创建方式(target 函数 / 子类继承),start/join/daemon 控制线程生命周期

- Lock 用于互斥访问临界区,RLock 支持同一线程重入

- Semaphore 控制并发访问数量,BoundedSemaphore 防止释放次数超限

- Event 用于一次性/多次线程间通知

- Condition 实现生产者-消费者模式的条件等待与唤醒

- Barrier 让 N 个线程在同步点同时等待后再继续

- Timer 提供延迟执行能力,cancel() 可在触发前取消