import pytest
from concurrent.futures import ThreadPoolExecutor
deftest_counter_thread_safety():
counter = Counter()
n = 1000withThreadPoolExecutor(max_workers=10) as exe:
for _ inrange(n):
exe.submit(counter.increment)
assert counter._value == n
deftest_concurrent_file_writes():
written = []
lock = threading.Lock()
defsafe_write(msg: str):
with lock:
written.append(msg)
withThreadPoolExecutor(max_workers=4) as pool:
for i inrange(100):
pool.submit(safe_write, f"msg-{i}")
assertlen(written) == 100
import multiprocessing
deftest_shared_counter_via_manager():
manager = multiprocessing.Manager()
counter = manager.Value('i', 0)
lock = manager.Lock()
defworker(c, lk):
for _ inrange(500):
with lk:
c.value += 1
procs = [multiprocessing.Process(target=worker, args=(counter, lock))
for _ inrange(4)]
for p in procs: p.start()
for p in procs: p.join()
assert counter.value == 2000
3.2 进程池测试
from multiprocessing.pool import Pool
deftest_process_pool_map():
defdouble(x):
return x * 2withPool(processes=4) as pool:
result = pool.map(double, [1, 2, 3, 4, 5])
assert result == [2, 4, 6, 8, 10]
3.3 超时测试(防止死进程)
多进程测试中,如果子进程死锁,测试会无限挂起。必须设置超时保护:
import pytest
@pytest.mark.timeout(5)# 5秒超时deftest_process_with_timeout():
defbad_worker():
whileTrue: pass# 死循环模拟死锁
p = multiprocessing.Process(target=bad_worker)
p.start()
p.join(timeout=3)
if p.is_alive():
p.terminate()
pytest.fail("进程超时未返回")
deftest_race_condition_stress():
for _ inrange(100): # 重复100次提高竞争条件暴露概率
counter = Counter()
withThreadPoolExecutor(max_workers=8) as exe:
futures = [exe.submit(counter.increment) for _ inrange(500)]
for f in futures:
f.result()
assert counter._value == 500