Lock互斥锁与RLock可重入锁

Python并发编程专题 · 线程同步的核心原语

专题:Python并发编程系统学习

关键词:Python, 并发编程, Lock, RLock, 互斥锁, 可重入锁, 线程同步, 竞态条件

一、竞争条件(Race Condition)问题

在多线程编程中,当多个线程同时访问和修改同一份共享数据时,就会产生竞争条件(Race Condition)。竞争条件是指程序的行为依赖于多个线程的执行顺序,而线程的调度顺序是不可预测的,这导致程序的最终结果变得不确定。

竞争条件之所以发生,是因为看似简单的操作(如 counter += 1)在底层并不是原子操作。Python 解释器在执行这一行代码时,实际上会分解为三个步骤:读取变量的当前值、将值加1、将新值写回变量。在这三步之间,操作系统随时可能切换线程,导致数据不一致。

以下是一个经典的竞争条件示例:

import threading counter = 0 def increment(): global counter for _ in range(100_000): counter += 1 # 非原子操作:读-改-写三步 # 创建两个线程同时执行 increment t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment) t1.start() t2.start() t1.join() t2.join() print(f"最终结果: {counter}") # 期望 200000,但实际远小于此值

运行上述代码,每次执行得到的结果都可能不同,且几乎永远达不到预期的200000。这是因为两个线程交替执行"读-改-写"操作,导致增量被彼此覆盖。这种现象就是典型的竞态条件,而解决它的核心手段就是引入互斥锁

注意:即使存在 GIL(全局解释器锁),Python 线程仍然会遇到竞争条件问题。GIL 保证的是每个字节码指令的原子性,但 counter += 1 对应多条字节码指令,线程可能在中间被切换。GIL 并不能代替程序员进行显式的同步控制。

二、Lock互斥锁详解

Lock(互斥锁)是 Python 中最基本的线程同步原语。它的核心思想是:同一时刻只允许一个线程访问共享资源。当一个线程获取了锁,其他试图获取同一把锁的线程必须等待,直到持有锁的线程将其释放。

2.1 Lock的基本用法

Lock 对象提供两个核心方法:acquire() 用于获取锁,release() 用于释放锁。Lock 只有两种状态:锁定未锁定。初始状态为未锁定。

import threading lock = threading.Lock() # 方式一:手动 acquire/release lock.acquire() try: # 临界区代码 pass finally: lock.release() # 方式二(推荐):使用 with 语句自动管理 with lock: # 临界区代码 pass

2.2 用Lock解决竞争条件

回到之前的累加器示例,使用 Lock 后可以确保每次加法操作的原子性:

import threading counter = 0 lock = threading.Lock() def safe_increment(): global counter for _ in range(100_000): with lock: counter += 1 # 加锁保护,确保原子操作 t1 = threading.Thread(target=safe_increment) t2 = threading.Thread(target=safe_increment) t1.start() t2.start() t1.join() t2.join() print(f"最终结果: {counter}") # 稳定输出 200000

加上锁之后,每次执行都能稳定输出 200000。这是因为 with lock 确保了同一时刻只有一个线程在执行 counter += 1,另一个线程必须等待当前线程完成整个"读-改-写"操作后才能进行。

2.3 acquire() 的阻塞特性

当锁已被占用时,调用 acquire() 的线程会进入阻塞等待状态,直到锁被释放。这是 Lock 的默认行为,称为阻塞式获取

import threading import time lock = threading.Lock() def worker(name): print(f"{name} 尝试获取锁...") lock.acquire() print(f"{name} 已获取锁,开始工作") time.sleep(2) lock.release() print(f"{name} 释放了锁") t1 = threading.Thread(target=worker, args=("线程A",)) t2 = threading.Thread(target=worker, args=("线程B",)) t1.start() time.sleep(0.1) t2.start() # 线程B会阻塞在 acquire() 处,等待线程A释放锁

在这个示例中,虽然线程A和线程B几乎同时启动,但线程B会在 acquire() 处阻塞,直到线程A调用 release() 后才获取到锁继续执行。这种阻塞机制是互斥锁实现线程同步的核心。

三、Lock的典型陷阱

Lock 虽然简单直观,但如果使用不当,很容易落入一些典型陷阱。最常见的两个问题是死锁锁未释放

3.1 死锁(Deadlock)

死锁是指两个或多个线程相互等待对方释放资源,导致所有线程都无法继续执行的状态。以下是一个经典的死锁示例:

import threading import time lock_a = threading.Lock() lock_b = threading.Lock() def thread_1(): with lock_a: time.sleep(0.1) # 给线程2时间获取lock_b with lock_b: # 等待线程2释放lock_b print("线程1完成") def thread_2(): with lock_b: time.sleep(0.1) # 给线程1时间获取lock_a with lock_a: # 等待线程1释放lock_a print("线程2完成") t1 = threading.Thread(target=thread_1) t2 = threading.Thread(target=thread_2) t1.start() t2.start() t1.join() t2.join() # 程序卡在这里,永远不会完成

在这个例子中:线程1持有lock_a等待lock_b,线程2持有lock_b等待lock_a,双方都在等待对方释放资源,形成了死锁。解决死锁的常见策略包括:固定加锁顺序(总是先获取lock_a再获取lock_b)、使用 trylock 机制acquire(blocking=False))、或使用 RLock(适用于同一线程内多次获取的场景)。

最佳实践:避免嵌套锁的最简单方法是尽量减少锁的粒度,使得临界区尽可能小,从而降低嵌套锁的需求。如果确实需要多个锁,务必确保所有线程以相同的顺序获取锁。

3.2 忘记释放锁

如果在使用 acquire()/release() 模式时,临界区代码抛出异常,release() 可能永远不会被执行,导致其他线程永远阻塞:

# 危险的写法:异常会导致锁永远不被释放 lock.acquire() risky_operation() # 如果这里抛出异常... lock.release() # ...这行不会被执行! # 安全的写法:使用 try/finally lock.acquire() try: risky_operation() finally: lock.release() # 最安全的写法(推荐):使用 with 语句 with lock: risky_operation() # 即使异常,锁也会自动释放

使用 with 语句是 Python 中最推荐的锁管理方式,它自动在进入时调用 acquire(),在退出时(无论是否发生异常)调用 release(),从根本上杜绝了忘记释放锁的问题。

四、RLock可重入锁

RLock(可重入锁)是 Lock 的一个变体,它允许同一个线程多次获取锁,而不会造成死锁。这一特性使得 RLock 在递归调用或同一线程内存在多层锁获取的场景中非常有用。

4.1 为什么需要RLock

考虑以下场景:一个函数内部调用了另一个需要锁的函数,如果使用普通的 Lock,就会发生死锁:

import threading lock = threading.Lock() def inner(): with lock: print("进入 inner") def outer(): with lock: # 第一次获取锁,成功 print("进入 outer") inner() # 再次获取同一把锁,Lock 会导致死锁! outer() # 程序卡住

outer() 获取 lock 后调用 inner()inner() 再次尝试获取同一把 lock。对于普通的 Lock,由于锁已被当前线程持有且未释放,再次 acquire() 会进入阻塞等待——但能释放锁的正是当前线程本身,导致线程永远等待自己,形成死锁。

4.2 RLock的解决方案

RLock 通过引入拥有者(Owner)和递归计数器(Recursion Count)的概念来解决这个问题:

import threading rlock = threading.RLock() def inner(): with rlock: print("进入 inner") def outer(): with rlock: # 第一次获取,计数器 = 1 print("进入 outer") inner() # 再次获取同一RLock,计数器 = 2,不会阻塞! # inner 返回后,计数器 = 1 # 退出 outer 的 with 块,计数器 = 0 outer() # 正常运行

RLock 的工作原理可以概括为:

4.3 RLock实际应用场景

RLock 在以下场景中特别有用:

import threading class ThreadSafeCounter: def __init__(self): self.count = 0 self.lock = threading.RLock() def increment(self): with self.lock: self.count += 1 def add_and_report(self): with self.lock: # 外部方法获取锁 self.increment() # 内部方法再次获取同一锁,RLock 允许 return self.count def batch_add(self, n): with self.lock: for _ in range(n): self.increment() # 循环中安全地获取同一锁 counter = ThreadSafeCounter() counter.add_and_report() # 正常运行,不会死锁

这种公共方法调用私有方法外层方法调用内层方法的嵌套锁获取模式,在实际面向对象编程中非常常见。如果使用普通的 Lock,就需要将每个方法设计成"调用者已持有锁"的形式,增加了编码负担和出错风险。

五、Lock vs RLock对比

理解 Lock 和 RLock 的区别对于在正确的场景选择合适的工具至关重要。下面通过表格对比两者的核心差异:

对比维度 Lock(互斥锁) RLock(可重入锁)
可重入性 不支持。同一线程不可再次 acquire 支持。同一线程可多次 acquire
拥有者追踪 无。不记录持有者线程 有。记录持有者线程和递归计数
release 行为 任何线程都可 release(危险!) 只有拥有者线程可 release
性能 较快。内部实现更简单 较慢。需要额外维护拥有者和计数器
适用场景 简单的临界区保护,无嵌套锁需求 递归调用、内部方法调用、嵌套锁获取
死锁风险 同一线程重复acquire会导致死锁 同一线程重复acquire不会死锁
with语句支持 完全支持 完全支持

核心原则:优先使用 Lock,它更轻量、性能更好。只有当确实需要同一线程多次获取同一把锁时才使用 RLock。不要因为"RLock 更安全"就一律使用 RLock——它掩盖了潜在的嵌套锁问题,而嵌套锁往往可以通过更好的架构设计来避免。

六、acquire(timeout)超时机制

无论是 Lock 还是 RLock,acquire() 方法都支持超时参数,允许线程在无法获取锁时等待一段有限时间,而不是无限期阻塞。这一机制对于构建响应灵敏的应用至关重要。

6.1 timeout 参数详解

acquire(timeout=N) 方法的行为如下:

import threading import time lock = threading.Lock() def worker(name): acquired = lock.acquire(timeout=3) # 最多等待3秒 if acquired: try: print(f"{name} 获取到锁") time.sleep(2) finally: lock.release() else: print(f"{name} 超时未能获取锁,执行备选逻辑") t1 = threading.Thread(target=worker, args=("线程A",)) t2 = threading.Thread(target=worker, args=("线程B",)) t1.start() time.sleep(0.5) t2.start() # 如果线程A持有锁超过3秒,线程B会超时

6.2 非阻塞获取锁

blocking=False(等价于 timeout=0)时,acquire() 会立即返回,不会阻塞:

import threading lock = threading.Lock() lock.acquire() # 获取锁 # 尝试非阻塞获取 result = lock.acquire(blocking=False) print(result) # 输出 False,因为锁已被当前线程持有 # 注意:当前线程已持有锁,却再次尝试获取普通Lock # 阻塞模式会死锁,非阻塞模式直接返回False lock.release() # 释放锁

使用场景:超时机制在以下场景中特别有用:需要优雅关闭的服务器线程(定期检查关闭标志)、实时系统中的看门狗机制、用户交互不响应时提供反馈(而不是卡死)。合理使用 timeout 可以让程序在意外情况下保持响应能力。

七、最佳实践

经过对 Lock 和 RLock 的深入学习,以下是 Python 线程同步中需要遵循的核心最佳实践:

7.1 最小化临界区

临界区就是被锁保护的代码区域。临界区越大,线程之间的并发度就越低。应该只将真正需要保护的操作放在临界区内,不要将无关操作(如 I/O、日志记录)放在锁的保护之下:

# 不推荐:临界区过大,不必要的操作占了锁 with lock: data = fetch_from_db() result = expensive_computation(data) log_to_file(result) shared_cache["key"] = result # 推荐:最小化临界区,只保护共享资源访问 data = fetch_from_db() result = expensive_computation(data) log_to_file(result) with lock: shared_cache["key"] = result # 只保护这一行

7.2 优先使用with语句

始终优先使用 with 语句而非手动 acquire()/release()。with 语句能自动处理异常情况下的锁释放,避免因忘记 release 导致死锁。这是 Python 社区的共识,也是标准库文档推荐的做法。

7.3 避免锁的嵌套

嵌套锁(一个锁的临界区内获取另一个锁)是死锁的主要来源。如果确实需要保护多个资源,考虑以下替代方案:

7.4 使用Queue代替共享内存

许多并发问题可以通过将共享内存模式改为消息传递模式来简化。Python 的 queue.Queue 是线程安全的,内部已经处理了所有同步细节,开发者只需要关注生产和消费数据:

import threading import queue import time # 使用 Queue 替代共享变量+Lock q = queue.Queue(maxsize=10) def producer(): for i in range(5): q.put(i) print(f"生产: {i}") time.sleep(0.5) def consumer(): while True: item = q.get() if item is None: # 哨兵值,表示结束 break print(f"消费: {item}") q.task_done() threading.Thread(target=producer).start() threading.Thread(target=consumer).start()

7.5 选择合适的锁类型

场景 推荐工具
保护一个简单计数器或标志位 Lock
保护一个复杂的数据结构 Lock + 封装类
递归函数或嵌套调用需加锁 RLock
生产者-消费者模式 queue.Queue
多个线程等待同一事件 Event 或 Condition
读写分离(读多写少) threading 不支持,需用第三方库

7.6 思考:线程安全与锁的关系

最后需要强调的是:锁只是实现线程安全的一种手段,不是全部。更高级的并发模型(如不可变数据结构、Actor模型、消息传递、无锁编程)同样可以实现线程安全,有时甚至更优。在 Python 中:

一句话总结:Lock 是最基本的同步原语,适用于简单的互斥场景;RLock 在需要同一线程重入时避免不必要的死锁;但最好的"锁"是不需要锁的设计——通过消息传递、不可变数据和高层抽象来消除共享可变状态。