← 返回并发编程目录
← 返回学习笔记首页
Lock互斥锁与RLock可重入锁
Python并发编程专题 · 线程同步的核心原语
专题: Python并发编程系统学习
关键词: Python, 并发编程, Lock, RLock, 互斥锁, 可重入锁, 线程同步, 竞态条件
一、竞争条件(Race Condition)问题
在多线程编程中,当多个线程同时访问和修改同一份共享数据时,就会产生竞争条件 (Race Condition)。竞争条件是指程序的行为依赖于多个线程的执行顺序,而线程的调度顺序是不可预测的,这导致程序的最终结果变得不确定。
竞争条件之所以发生,是因为看似简单的操作(如 counter += 1)在底层并不是原子操作。Python 解释器在执行这一行代码时,实际上会分解为三个步骤:读取变量的当前值、将值加1、将新值写回变量。在这三步之间,操作系统随时可能切换线程,导致数据不一致。
以下是一个经典的竞争条件示例:
import threading
counter = 0
def increment ():
global counter
for _ in range(100_000 ):
counter += 1 # 非原子操作:读-改-写三步
# 创建两个线程同时执行 increment
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print (f"最终结果: {counter}" ) # 期望 200000,但实际远小于此值
运行上述代码,每次执行得到的结果都可能不同,且几乎永远达不到预期的200000。这是因为两个线程交替执行"读-改-写"操作,导致增量被彼此覆盖。这种现象就是典型的竞态条件 ,而解决它的核心手段就是引入互斥锁 。
注意: 即使存在 GIL(全局解释器锁),Python 线程仍然会遇到竞争条件问题。GIL 保证的是每个字节码指令的原子性,但 counter += 1 对应多条字节码指令,线程可能在中间被切换。GIL 并不能代替程序员进行显式的同步控制。
二、Lock互斥锁详解
Lock(互斥锁)是 Python 中最基本的线程同步原语。它的核心思想是:同一时刻只允许一个线程访问共享资源 。当一个线程获取了锁,其他试图获取同一把锁的线程必须等待,直到持有锁的线程将其释放。
2.1 Lock的基本用法
Lock 对象提供两个核心方法:acquire() 用于获取锁,release() 用于释放锁。Lock 只有两种状态:锁定 和未锁定 。初始状态为未锁定。
import threading
lock = threading.Lock()
# 方式一:手动 acquire/release
lock.acquire()
try :
# 临界区代码
pass
finally :
lock.release()
# 方式二(推荐):使用 with 语句自动管理
with lock:
# 临界区代码
pass
2.2 用Lock解决竞争条件
回到之前的累加器示例,使用 Lock 后可以确保每次加法操作的原子性:
import threading
counter = 0
lock = threading.Lock()
def safe_increment ():
global counter
for _ in range(100_000 ):
with lock:
counter += 1 # 加锁保护,确保原子操作
t1 = threading.Thread(target=safe_increment)
t2 = threading.Thread(target=safe_increment)
t1.start()
t2.start()
t1.join()
t2.join()
print (f"最终结果: {counter}" ) # 稳定输出 200000
加上锁之后,每次执行都能稳定输出 200000。这是因为 with lock 确保了同一时刻只有一个线程在执行 counter += 1,另一个线程必须等待当前线程完成整个"读-改-写"操作后才能进行。
2.3 acquire() 的阻塞特性
当锁已被占用时,调用 acquire() 的线程会进入阻塞等待 状态,直到锁被释放。这是 Lock 的默认行为,称为阻塞式获取 。
import threading
import time
lock = threading.Lock()
def worker (name ):
print (f"{name} 尝试获取锁..." )
lock.acquire()
print (f"{name} 已获取锁,开始工作" )
time.sleep(2 )
lock.release()
print (f"{name} 释放了锁" )
t1 = threading.Thread(target=worker, args=("线程A" ,))
t2 = threading.Thread(target=worker, args=("线程B" ,))
t1.start()
time.sleep(0.1 )
t2.start() # 线程B会阻塞在 acquire() 处,等待线程A释放锁
在这个示例中,虽然线程A和线程B几乎同时启动,但线程B会在 acquire() 处阻塞,直到线程A调用 release() 后才获取到锁继续执行。这种阻塞机制是互斥锁实现线程同步的核心。
三、Lock的典型陷阱
Lock 虽然简单直观,但如果使用不当,很容易落入一些典型陷阱。最常见的两个问题是死锁 和锁未释放 。
3.1 死锁(Deadlock)
死锁是指两个或多个线程相互等待对方释放资源,导致所有线程都无法继续执行的状态。以下是一个经典的死锁示例:
import threading
import time
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1 ():
with lock_a:
time.sleep(0.1 ) # 给线程2时间获取lock_b
with lock_b: # 等待线程2释放lock_b
print ("线程1完成" )
def thread_2 ():
with lock_b:
time.sleep(0.1 ) # 给线程1时间获取lock_a
with lock_a: # 等待线程1释放lock_a
print ("线程2完成" )
t1 = threading.Thread(target=thread_1)
t2 = threading.Thread(target=thread_2)
t1.start()
t2.start()
t1.join()
t2.join() # 程序卡在这里,永远不会完成
在这个例子中:线程1持有lock_a等待lock_b,线程2持有lock_b等待lock_a,双方都在等待对方释放资源,形成了死锁。解决死锁的常见策略包括:固定加锁顺序 (总是先获取lock_a再获取lock_b)、使用 trylock 机制 (acquire(blocking=False))、或使用 RLock (适用于同一线程内多次获取的场景)。
最佳实践: 避免嵌套锁的最简单方法是尽量减少锁的粒度,使得临界区尽可能小,从而降低嵌套锁的需求。如果确实需要多个锁,务必确保所有线程以相同的顺序 获取锁。
3.2 忘记释放锁
如果在使用 acquire()/release() 模式时,临界区代码抛出异常,release() 可能永远不会被执行,导致其他线程永远阻塞:
# 危险的写法:异常会导致锁永远不被释放
lock.acquire()
risky_operation() # 如果这里抛出异常...
lock.release() # ...这行不会被执行!
# 安全的写法:使用 try/finally
lock.acquire()
try :
risky_operation()
finally :
lock.release()
# 最安全的写法(推荐):使用 with 语句
with lock:
risky_operation() # 即使异常,锁也会自动释放
使用 with 语句是 Python 中最推荐的锁管理方式,它自动在进入时调用 acquire(),在退出时(无论是否发生异常)调用 release(),从根本上杜绝了忘记释放锁的问题。
四、RLock可重入锁
RLock(可重入锁)是 Lock 的一个变体,它允许同一个线程多次获取锁 ,而不会造成死锁。这一特性使得 RLock 在递归调用或同一线程内存在多层锁获取的场景中非常有用。
4.1 为什么需要RLock
考虑以下场景:一个函数内部调用了另一个需要锁的函数,如果使用普通的 Lock,就会发生死锁:
import threading
lock = threading.Lock()
def inner ():
with lock:
print ("进入 inner" )
def outer ():
with lock: # 第一次获取锁,成功
print ("进入 outer" )
inner() # 再次获取同一把锁,Lock 会导致死锁!
outer() # 程序卡住
当 outer() 获取 lock 后调用 inner(),inner() 再次尝试获取同一把 lock。对于普通的 Lock,由于锁已被当前线程持有且未释放,再次 acquire() 会进入阻塞等待——但能释放锁的正是当前线程本身,导致线程永远等待自己,形成死锁。
4.2 RLock的解决方案
RLock 通过引入拥有者 (Owner)和递归计数器 (Recursion Count)的概念来解决这个问题:
import threading
rlock = threading.RLock()
def inner ():
with rlock:
print ("进入 inner" )
def outer ():
with rlock: # 第一次获取,计数器 = 1
print ("进入 outer" )
inner() # 再次获取同一RLock,计数器 = 2,不会阻塞!
# inner 返回后,计数器 = 1
# 退出 outer 的 with 块,计数器 = 0
outer() # 正常运行
RLock 的工作原理可以概括为:
拥有者 :RLock 记录当前持有锁的线程ID。只有拥有者线程才能再次 acquire 这个锁。
递归计数器 :每次同一个线程 acquire() 时计数器+1,每次 release() 时计数器-1。当计数器归零时,锁才真正被释放。
其他线程仍然需要等待计数器归零(即拥有者完全释放锁)后才能获取该 RLock。
4.3 RLock实际应用场景
RLock 在以下场景中特别有用:
import threading
class ThreadSafeCounter :
def __init__ (self ):
self .count = 0
self .lock = threading.RLock()
def increment (self ):
with self .lock:
self .count += 1
def add_and_report (self ):
with self .lock: # 外部方法获取锁
self .increment() # 内部方法再次获取同一锁,RLock 允许
return self .count
def batch_add (self , n):
with self .lock:
for _ in range(n):
self .increment() # 循环中安全地获取同一锁
counter = ThreadSafeCounter()
counter.add_and_report() # 正常运行,不会死锁
这种公共方法调用私有方法 、外层方法调用内层方法 的嵌套锁获取模式,在实际面向对象编程中非常常见。如果使用普通的 Lock,就需要将每个方法设计成"调用者已持有锁"的形式,增加了编码负担和出错风险。
五、Lock vs RLock对比
理解 Lock 和 RLock 的区别对于在正确的场景选择合适的工具至关重要。下面通过表格对比两者的核心差异:
对比维度
Lock(互斥锁)
RLock(可重入锁)
可重入性
不支持。同一线程不可再次 acquire
支持。同一线程可多次 acquire
拥有者追踪
无。不记录持有者线程
有。记录持有者线程和递归计数
release 行为
任何线程都可 release(危险!)
只有拥有者线程可 release
性能
较快。内部实现更简单
较慢。需要额外维护拥有者和计数器
适用场景
简单的临界区保护,无嵌套锁需求
递归调用、内部方法调用、嵌套锁获取
死锁风险
同一线程重复acquire会导致死锁
同一线程重复acquire不会死锁
with语句支持
完全支持
完全支持
核心原则: 优先使用 Lock,它更轻量、性能更好。只有当确实需要同一线程多次获取同一把锁 时才使用 RLock。不要因为"RLock 更安全"就一律使用 RLock——它掩盖了潜在的嵌套锁问题,而嵌套锁往往可以通过更好的架构设计来避免。
六、acquire(timeout)超时机制
无论是 Lock 还是 RLock,acquire() 方法都支持超时参数 ,允许线程在无法获取锁时等待一段有限时间,而不是无限期阻塞。这一机制对于构建响应灵敏的应用至关重要。
6.1 timeout 参数详解
acquire(timeout=N) 方法的行为如下:
timeout=None (默认):无限阻塞,直到获取到锁。
timeout=正数 :最多等待 N 秒。如果在 N 秒内获取到锁则返回 True,超时则返回 False。
timeout=0 :非阻塞模式。如果能立即获取锁则返回 True,否则立即返回 False(相当于 blocking=False)。
import threading
import time
lock = threading.Lock()
def worker (name ):
acquired = lock.acquire(timeout=3 ) # 最多等待3秒
if acquired:
try :
print (f"{name} 获取到锁" )
time.sleep(2 )
finally :
lock.release()
else :
print (f"{name} 超时未能获取锁,执行备选逻辑" )
t1 = threading.Thread(target=worker, args=("线程A" ,))
t2 = threading.Thread(target=worker, args=("线程B" ,))
t1.start()
time.sleep(0.5 )
t2.start() # 如果线程A持有锁超过3秒,线程B会超时
6.2 非阻塞获取锁
当 blocking=False(等价于 timeout=0)时,acquire() 会立即返回,不会阻塞:
import threading
lock = threading.Lock()
lock.acquire() # 获取锁
# 尝试非阻塞获取
result = lock.acquire(blocking=False )
print (result) # 输出 False,因为锁已被当前线程持有
# 注意:当前线程已持有锁,却再次尝试获取普通Lock
# 阻塞模式会死锁,非阻塞模式直接返回False
lock.release() # 释放锁
使用场景: 超时机制在以下场景中特别有用:需要优雅关闭的服务器线程(定期检查关闭标志)、实时系统中的看门狗机制、用户交互不响应时提供反馈(而不是卡死)。合理使用 timeout 可以让程序在意外情况下保持响应能力。
七、最佳实践
经过对 Lock 和 RLock 的深入学习,以下是 Python 线程同步中需要遵循的核心最佳实践:
7.1 最小化临界区
临界区就是被锁保护的代码区域。临界区越大,线程之间的并发度就越低。应该只将真正需要保护的操作放在临界区内,不要将无关操作(如 I/O、日志记录)放在锁的保护之下:
# 不推荐:临界区过大,不必要的操作占了锁
with lock:
data = fetch_from_db()
result = expensive_computation(data)
log_to_file(result)
shared_cache["key" ] = result
# 推荐:最小化临界区,只保护共享资源访问
data = fetch_from_db()
result = expensive_computation(data)
log_to_file(result)
with lock:
shared_cache["key" ] = result # 只保护这一行
7.2 优先使用with语句
始终优先使用 with 语句而非手动 acquire()/release()。with 语句能自动处理异常情况下的锁释放,避免因忘记 release 导致死锁。这是 Python 社区的共识,也是标准库文档推荐的做法。
7.3 避免锁的嵌套
嵌套锁(一个锁的临界区内获取另一个锁)是死锁的主要来源。如果确实需要保护多个资源,考虑以下替代方案:
合并数据结构 :将多个共享数据合并为一个对象,用一把锁保护整个对象。
固定顺序 :如果无法合并,确保所有线程以完全相同的顺序获取锁。
使用更高级的抽象 :如 Queue 或 concurrent.futures。
7.4 使用Queue代替共享内存
许多并发问题可以通过将共享内存模式改为消息传递 模式来简化。Python 的 queue.Queue 是线程安全的,内部已经处理了所有同步细节,开发者只需要关注生产和消费数据:
import threading
import queue
import time
# 使用 Queue 替代共享变量+Lock
q = queue.Queue(maxsize=10 )
def producer ():
for i in range(5 ):
q.put(i)
print (f"生产: {i}" )
time.sleep(0.5 )
def consumer ():
while True :
item = q.get()
if item is None : # 哨兵值,表示结束
break
print (f"消费: {item}" )
q.task_done()
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
7.5 选择合适的锁类型
场景
推荐工具
保护一个简单计数器或标志位
Lock
保护一个复杂的数据结构
Lock + 封装类
递归函数或嵌套调用需加锁
RLock
生产者-消费者模式
queue.Queue
多个线程等待同一事件
Event 或 Condition
读写分离(读多写少)
threading 不支持,需用第三方库
7.6 思考:线程安全与锁的关系
最后需要强调的是:锁只是实现线程安全的一种手段,不是全部 。更高级的并发模型(如不可变数据结构、Actor模型、消息传递、无锁编程)同样可以实现线程安全,有时甚至更优。在 Python 中:
对于I/O密集型 应用,多线程 + Lock 是不错的选择。
对于CPU密集型 计算,多进程(multiprocessing)或 asyncio 协程通常比多线程更合适。
设计层面尽量使用 Queue、concurrent.futures 等高层次抽象,减少直接操作 Lock。
一句话总结: Lock 是最基本的同步原语,适用于简单的互斥场景;RLock 在需要同一线程重入时避免不必要的死锁;但最好的"锁"是不需要锁的设计 ——通过消息传递、不可变数据和高层抽象来消除共享可变状态。