专题:Python 测试与调试系统学习
关键词:Python, 测试, 调试, pytest-asyncio, 异步测试, 协程测试, asyncio, AsyncMock, 异步API测试
一、异步测试概述
随着Python异步编程(asyncio)的广泛普及,越来越多的项目采用async/await语法构建高性能IO密集型应用。然而,异步代码的测试与同步代码有着本质区别,面临独特的挑战:事件循环管理、协程生命周期控制、异步上下文隔离以及并发任务的竞态条件等问题,使得传统的pytest测试框架无法直接处理异步测试用例。
pytest-asyncio正是解决这一痛点的核心工具。它作为pytest的插件,为异步测试提供了完整的支持体系。当pytest运行一个标记为@pytest.mark.asyncio的测试函数时,pytest-asyncio会自动创建并管理一个事件循环,在该循环中执行协程,并收集结果或异常。这极大地简化了异步测试的编写,开发者无需手动调用asyncio.run()或管理事件循环的创建与销毁。
pytest-asyncio提供了三种asyncio模式(通过asyncio_mode配置项设置):Auto模式自动将所有async def测试函数识别为异步测试,无需显式添加装饰器,最适用于以异步为主的项目;Strict模式要求每个异步测试都必须显式标注@pytest.mark.asyncio,是最明确和安全的模式;Legacy模式向后兼容旧版本行为。从项目可维护性角度推荐使用Strict模式,因为它让测试意图一目了然。
事件循环作用域(event_loop_scope)是另一个关键概念。默认情况下每个测试函数拥有独立的事件循环,确保了测试之间的完全隔离。但在某些场景下(如共享数据库连接池),可以将作用域设置为"module"或"session"以复用事件循环,提升测试执行效率。理解不同作用域的trade-off对编写高效且可靠的异步测试至关重要。
# pytest.ini 配置示例
[pytest]
asyncio_mode = strict
testpaths = tests
python_files = test_*.py
asyncio_default_fixture_loop_scope = function
# 三种asyncio模式对比
# Auto模式:自动识别async def测试函数
# pytest.ini
# asyncio_mode = auto
async def test_fetch_data_auto(): # 无需装饰器,自动识别
result = await fetch_data()
assert result["status"] == "ok"
# Strict模式:必须显式标记
# pytest.ini
# asyncio_mode = strict
@pytest.mark.asyncio
async def test_fetch_data_strict():
result = await fetch_data()
assert result["status"] == "ok"
# 事件循环作用域配置
import pytest
# 函数级作用域(默认):每个测试独立事件循环
@pytest.mark.asyncio(loop_scope="function")
async def test_independent():
loop = asyncio.get_running_loop()
# 每个测试运行在自己的事件循环中
# 模块级作用域:同一模块共享事件循环
@pytest.mark.asyncio(loop_scope="module")
async def test_shared_1():
pass
@pytest.mark.asyncio(loop_scope="module")
async def test_shared_2():
pass # 与test_shared_1共享同一事件循环
二、pytest-asyncio基础
pytest-asyncio最基础的使用方式是通过@pytest.mark.asyncio装饰器将普通的测试函数标记为异步测试。当一个测试函数被该装饰器修饰后,pytest-asyncio会在测试执行前创建事件循环,运行协程,最后清理事件循环。这整个过程对开发者透明,开发者只需要专注于编写async def test_xxx形式的测试函数即可。
asyncio_mode配置项决定了pytest-asyncio如何识别异步测试。在Auto模式下,任何async def的测试函数都会被自动视为异步测试,无需手动添加装饰器。这对于大型项目的迁移非常友好,但也可能无意中将某些本不应异步执行的测试误识别。Strict模式则要求每个异步测试都显式添加@pytest.mark.asyncio装饰器,虽然多了一行代码,但极大地提升了代码的可读性和可维护性。Legacy模式是pytest-asyncio 0.21版本之前的行为,为了向后兼容而保留。
异步测试的参数化与同步测试完全一致,使用@pytest.mark.parametrize即可。此外,pytest-asyncio支持pytest.mark.timeout等第三方标记,可以组合使用以实现超时控制等高级功能。在测试夹具(fixture)方面,pytest-asyncio支持异步fixture的定义和注入,使得测试准备和清理工作更加灵活。
# 基础异步测试
import pytest
@pytest.mark.asyncio
async def test_async_simple():
"""最简单的异步测试"""
result = await some_async_function()
assert result == expected_value
@pytest.mark.asyncio
async def test_async_failure():
"""测试异步异常"""
with pytest.raises(ValueError, match="invalid input"):
await async_raiser()
# 参数化异步测试
import pytest
@pytest.mark.asyncio
@pytest.mark.parametrize("input_val,expected", [
(1, 2),
(2, 4),
(3, 6),
(10, 20),
])
async def test_async_double(input_val, expected):
result = await async_double(input_val)
assert result == expected
# conftest.py 全局asyncio配置
import pytest
# 在conftest.py中设置全局事件循环作用域
@pytest.fixture(scope="session")
def event_loop():
"""为整个测试会话创建一个事件循环"""
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()
# 使用pytestmark为模块中所有测试添加标记
pytestmark = pytest.mark.asyncio
三、异步fixture
异步fixture是pytest-asyncio中最强大的特性之一。与同步fixture类似,异步fixture使用async def定义,通过@pytest.fixture装饰器注册。当测试函数需要异步资源(如数据库连接、HTTP客户端、缓存客户端等)时,异步fixture可以await资源的创建和销毁过程,使测试准备和清理工作变得简洁自然。
异步fixture也支持yield语法实现清理逻辑(teardown)。在yield之前的代码作为设置(setup),yield之后的代码作为清理(teardown)。清理代码同样可以是异步的,确保资源被正确释放。这与同步fixture的yield用法完全一致,只是函数本身是async def。需要注意的是,异步fixture的作用域(scope)与事件循环作用域(loop_scope)是两个不同的概念,前者控制fixture的缓存周期,后者控制事件循环的共享范围。
异步fixture之间可以相互依赖,一个fixture可以依赖另一个异步fixture,pytest-asyncio会自动解析依赖关系并按正确的顺序执行。此外,异步fixture可以和同步fixture混合使用,pytest会自动处理同步和异步上下文之间的切换。这种灵活性使得在大型测试套件中可以逐步迁移异步测试,而不需要一次性重写所有fixture。
# 基本的异步fixture
import pytest
@pytest.fixture
async def db_session():
"""创建异步数据库会话"""
session = await create_async_session()
yield session
await session.close()
@pytest.mark.asyncio
async def test_db_query(db_session):
"""使用异步fixture的测试"""
result = await db_session.execute("SELECT 1")
assert result == 1
# 带作用域和清理的异步fixture
import pytest
@pytest.fixture(scope="module")
async def redis_client():
"""模块级异步Redis客户端"""
# setup:创建连接池
pool = await redis.create_pool(
"redis://localhost:6379",
minsize=5, maxsize=10
)
client = redis.Redis(connection_pool=pool)
yield client
# teardown:关闭连接池
await pool.clear()
await client.close()
@pytest.mark.asyncio
async def test_redis_set_get(redis_client):
await redis_client.set("key", "value")
val = await redis_client.get("key")
assert val == "value"
# 异步fixture依赖与组合
import pytest
@pytest.fixture
async def http_client():
"""HTTP客户端fixture"""
async with httpx.AsyncClient() as client:
yield client
@pytest.fixture
async def auth_client(http_client):
"""基于http_client的认证客户端"""
token = await get_auth_token()
http_client.headers = {"Authorization": f"Bearer {token}"}
yield http_client
@pytest.mark.asyncio
async def test_authenticated_request(auth_client):
"""使用组合fixture"""
response = await auth_client.get("/api/protected")
assert response.status_code == 200
四、AsyncMock在异步测试
Python 3.8+ 的unittest.mock模块引入了AsyncMock类,专门用于模拟异步函数和异步上下文管理器。AsyncMock继承自MagicMock,但特殊之处在于它的返回值是一个awaitable对象。当AsyncMock被调用时,它返回一个Awaitable对象,该对象在被await时返回指定的返回值或抛出指定的异常。这使得模拟异步依赖变得极其简单。
AsyncMock支持side_effect参数,可以接受一个可迭代对象或异常类。当side_effect是一个可迭代对象时,每次await调用会返回下一个元素。这使得模拟多次异步调用变得非常方便,比如模拟HTTP请求的多次重试场景。此外,AsyncMock还支持assert_awaited、assert_awaited_once、assert_awaited_with等一系列断言方法,专门验证异步调用是否按预期执行。
对于异步上下文管理器(即实现了__aenter__和__aexit__的类),AsyncMock可以自动处理async with语句。只需设置async_mock.__aenter__.return_value和async_mock.__aexit__.return_value即可。同样,对于异步迭代器(实现__aiter__和__anext__的类),可以使用async_mock.__aiter__.return_value = [...]来模拟。这些特性使得AsyncMock成为异步测试中不可或缺的工具。
# AsyncMock基础用法
from unittest.mock import AsyncMock, patch
import pytest
@pytest.mark.asyncio
async def test_async_mock_basic():
mock = AsyncMock()
mock.return_value = 42
result = await mock()
assert result == 42
mock.assert_awaited_once()
@pytest.mark.asyncio
async def test_async_mock_with_side_effect():
mock = AsyncMock()
mock.side_effect = [10, 20, ValueError("no more")]
assert await mock() == 10
assert await mock() == 20
with pytest.raises(ValueError, match="no more"):
await mock()
# 异步上下文管理器Mock
from unittest.mock import AsyncMock
import pytest
@pytest.mark.asyncio
async def test_async_context_manager():
mock = AsyncMock()
# 模拟异步上下文管理器
mock.__aenter__.return_value = mock
mock.__aexit__.return_value = None
async with mock as cm:
assert cm is mock
mock.__aenter__.assert_awaited_once()
mock.__aexit__.assert_awaited_once()
@pytest.mark.asyncio
async def test_async_iterator():
mock = AsyncMock()
# 模拟异步迭代器
items = [1, 2, 3]
mock.__aiter__.return_value = mock
mock.__anext__.side_effect = items + [StopAsyncIteration()]
collected = [item async for item in mock]
assert collected == [1, 2, 3]
# 实战:使用patch模拟异步HTTP请求
from unittest.mock import AsyncMock, patch
import pytest
class AsyncDataFetcher:
async def fetch_data(self, url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
@pytest.mark.asyncio
async def test_fetch_data_with_mock():
fetcher = AsyncDataFetcher()
with patch.object(fetcher, 'fetch_data', new=AsyncMock()) as mock_fetch:
mock_fetch.return_value = {"id": 1, "name": "test"}
result = await fetcher.fetch_data("http://api.example.com/data")
assert result["name"] == "test"
mock_fetch.assert_awaited_once_with(
"http://api.example.com/data"
)
五、异步HTTP测试
在现代Web应用中,异步HTTP客户端已经成为主流。aiohttp和httpx是Python社区最流行的两个异步HTTP库,它们各自拥有配套的测试工具生态。测试异步HTTP请求的核心挑战在于:如何在测试中模拟外部服务,而不真正发起网络请求。这不仅能提升测试速度,还能确保测试的确定性和可重复性。
对于aiohttp,官方提供了pytest-aiohttp插件,它提供了aiohttp_clientfixture,可以将aiohttp应用包装为测试客户端,直接发送请求而不经过网络。此外,aresponses库提供了更加灵活的请求拦截方式,可以按URL模式匹配并返回预定义的响应。对于httpx,pytest-httpx和respx是两种流行的选择。pytest-httpx提供了高度可配置的请求拦截机制,支持按URL、方法、头部等条件匹配;respx则设计为与httpx的API更为贴近。
在选择测试工具时,需要考虑以下因素:是否需要模拟外部服务(aresponses/respx/pytest-httpx适合),还是需要测试应用自身的路由和中间件(pytest-aiohttp/httpx的ASGI传输适合)。对于微服务架构下的集成测试,推荐使用respx或pytest-httpx来模拟下游依赖;对于单体应用的单元测试和集成测试,pytest-aiohttp配合aiohttp应用实例是更好的选择。
# aiohttp + aresponses 测试
import aresponses
import pytest
@pytest.mark.asyncio
async def test_aiohttp_with_aresponses():
async with aresponses.ResponsesMockServer() as server:
# 注册模拟响应
server.add(
"api.example.com",
"/users/1",
"GET",
aresponses.Response(
status=200,
json={"id": 1, "name": "Alice"}
)
)
async with aiohttp.ClientSession() as session:
async with session.get(
"http://api.example.com/users/1"
) as response:
data = await response.json()
assert data["name"] == "Alice"
# httpx + respx 测试
import respx
from httpx import Response
import pytest
@pytest.mark.asyncio
async def test_httpx_with_respx():
# 使用respx路由mock
respx.get("https://api.example.com/items") \
.mock(return_value=Response(200, json=[
{"id": 1, "name": "item1"},
{"id": 2, "name": "item2"},
]))
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.example.com/items"
)
assert response.status_code == 200
assert len(response.json()) == 2
@pytest.mark.asyncio
async def test_httpx_with_pytest_httpx(pytest_httpx):
# 使用pytest-httpx
pytest_httpx.add_response(
url="https://api.example.com/data",
json={"key": "value"},
status_code=200
)
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.example.com/data"
)
assert response.json()["key"] == "value"
# FastAPI + TestClient + AsyncHTTP
from httpx import AsyncClient, ASGITransport
import pytest
@pytest.mark.asyncio
async def test_fastapi_async():
# 使用ASGITransport直接发送请求到FastAPI应用
transport = ASGITransport(app=app)
async with AsyncClient(
transport=transport,
base_url="http://test"
) as client:
response = await client.post(
"/api/users",
json={"name": "Bob", "email": "bob@test.com"}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Bob"
六、异步数据库测试
数据库测试是异步测试中最复杂的领域之一。常见的异步数据库库包括databases(轻量级异步数据库访问库)、asyncpg(PostgreSQL专用异步驱动)、以及SQLAlchemy 1.4+的原生异步支持。在测试异步数据库操作时,核心挑战是:如何在测试之间隔离数据、如何管理数据库连接池、以及如何处理事务回滚。
最常用的策略是"事务回滚隔离":在每个测试开始时开启一个事务,测试结束后回滚该事务,从而保证每个测试运行在干净的数据库状态中。对于databases库,可以通过Database.transaction()上下文管理器实现。对于asyncpg,可以直接使用connection.transaction()。对于SQLAlchemy async,则通过AsyncSession配合事务管理来实现。
另一个重要的实践是使用"测试数据库"而非生产数据库。在CI/CD环境中,通常使用Docker容器启动一个临时的数据库实例,运行测试后再销毁。GitHub Actions等CI平台原生支持service containers,可以非常方便地配置PostgreSQL、MySQL等服务。在本地开发时,可以使用pytest-datadir或pytest-postgresql等插件来管理测试数据库的生命周期。
# databases库事务回滚测试
import pytest
from databases import Database
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/testdb"
@pytest.fixture
async def test_db():
database = Database(DATABASE_URL)
await database.connect()
# 开启事务
async with database.transaction(force_rollback=True):
yield database
# 事务自动回滚,数据被清理
await database.disconnect()
@pytest.mark.asyncio
async def test_insert_user(test_db):
query = "INSERT INTO users(name, email) VALUES(:name, :email)"
await test_db.execute(query, {
"name": "Test User",
"email": "test@example.com"
})
result = await test_db.fetch_one(
"SELECT COUNT(*) as cnt FROM users"
)
assert result["cnt"] == 1
# 测试结束后事务回滚,数据不持久化
# SQLAlchemy AsyncSession测试
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker
)
import pytest
@pytest.fixture
async def async_session():
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/testdb"
)
session_factory = async_sessionmaker(
engine, expire_on_commit=False
)
async with session_factory() as session:
async with session.begin():
yield session
# 事务回滚
await session.rollback()
await engine.dispose()
@pytest.mark.asyncio
async def test_create_user(async_session):
user = User(name="Alice", email="alice@test.com")
async_session.add(user)
await async_session.flush()
result = await async_session.get(User, user.id)
assert result.name == "Alice"
# asyncpg直接测试
import asyncpg
import pytest
@pytest.fixture
async def pg_pool():
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/testdb",
min_size=2, max_size=5
)
yield pool
await pool.close()
@pytest.mark.asyncio
async def test_asyncpg_transaction(pg_pool):
async with pg_pool.acquire() as conn:
async with conn.transaction():
await conn.execute(
"INSERT INTO items(name) VALUES($1)",
"test_item"
)
result = await conn.fetchval(
"SELECT COUNT(*) FROM items"
)
assert result >= 1
# 事务自动回滚
七、超时与取消测试
异步编程中,超时处理是保证系统健壮性的关键环节。一个永不返回的协程可能会导致整个事件循环被阻塞,进而影响系统的可用性。因此,在测试中验证超时和取消行为与验证正常功能同样重要。pytest-asyncio生态提供了多种超时测试方案,包括pytest-timeout插件和asyncio内置的asyncio.wait_for()函数。
pytest-timeout插件可以在测试函数级别设置超时,无论测试是同步还是异步。当测试执行时间超过设定的超时阈值时,pytest会将测试标记为失败。这对于防止CI流水线中的测试挂起非常有用。然而,pytest-timeout是在进程级别实现的,可能在捕获超时时不够精细。更细粒度的超时控制应使用asyncio.wait_for,它可以在协程级别设置超时,并抛出asyncio.TimeoutError异常。
Task取消测试是另一个重要话题。asyncio的Task对象支持cancel()方法,被取消的Task在await时会抛出asyncio.CancelledError。良好的异步代码应该正确处理这个异常,执行必要的清理操作(如释放资源、回滚事务等)。测试中需要验证:取消信号能被正确传播、被取消的Task确实不再执行、以及在取消后资源被正确清理。此外,超时后的清理操作(如关闭网络连接、释放文件句柄)也需要通过测试来保障。
# asyncio.wait_for 超时测试
import asyncio
import pytest
@pytest.mark.asyncio
async def test_timeout_raises_exception():
"""验证超时确实抛出异常"""
async def slow_operation():
await asyncio.sleep(10)
return "done"
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(
slow_operation(), timeout=0.1
)
@pytest.mark.asyncio
async def test_operation_completes_before_timeout():
"""验证操作在超时前完成"""
async def fast_operation():
await asyncio.sleep(0.01)
return "done"
result = await asyncio.wait_for(
fast_operation(), timeout=5.0
)
assert result == "done"
# Task取消测试
import asyncio
import pytest
@pytest.mark.asyncio
async def test_task_cancellation():
"""验证Task取消行为"""
async def cancellable_worker():
try:
while True:
await asyncio.sleep(0.01)
except asyncio.CancelledError:
# 执行清理操作
cleanup_done = True
raise # 重新抛出是好的实践
finally:
resources_released = True
task = asyncio.create_task(cancellable_worker())
await asyncio.sleep(0.05) # 让任务跑一会儿
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert task.cancelled()
@pytest.mark.asyncio
async def test_task_group_cancellation():
"""验证TaskGroup整体取消"""
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(some_work())
t2 = tg.create_task(some_work())
# TaskGroup中的任一Task失败会取消其他Task
# 超时清理测试
import asyncio
import pytest
class ResourceHolder:
"""需要清理的资源类"""
def __init__(self):
self.cleanup_called = False
async def cleanup(self):
self.cleanup_called = True
@pytest.mark.asyncio
async def test_timeout_cleanup():
"""验证超时后资源被正确清理"""
resource = ResourceHolder()
async def operation_with_cleanup():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
await resource.cleanup()
raise
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(
operation_with_cleanup(), timeout=0.1
)
assert resource.cleanup_called, \
"超时后资源清理应被调用"
八、事件循环管理
事件循环是asyncio的核心组件,理解和管理事件循环对于编写可靠的异步测试至关重要。默认情况下,pytest-asyncio会为每个测试函数创建和销毁事件循环,但高级场景下需要自定义事件循环的创建策略。Python的asyncio.AbstractEventLoopPolicy提供了事件循环的工厂接口,允许开发者自定义事件循环的类型和行为。
uvloop是一个高性能的事件循环实现,基于libuv(Node.js底层的事件循环库)。它可以将asyncio的事件循环替换为uvloop,在某些场景下性能提升可达2-4倍。在测试中使用uvloop可以验证应用在uvloop环境下的行为是否正确。通过设置asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()),可以在测试会话级别全局替换事件循环。需要注意的是,uvloop目前仅支持Unix平台,Windows用户需要使用其他方案。
事件循环隔离是大型测试套件中经常遇到的问题。当多个测试共享同一个事件循环时,一个测试中的未完成协程可能会干扰其他测试的执行。pytest-asyncio通过loop_scope参数控制事件循环的隔离级别。function级别(默认)为每个测试创建独立的事件循环,隔离性最强但开销最大;module级别在同一模块内共享事件循环;session级别在整个测试会话中共享单一事件循环,性能最好但隔离性最差。选择合适的隔离级别需要在测试速度和可靠性之间做出权衡。
# 自定义事件循环策略
import asyncio
import pytest
class CustomEventLoopPolicy(
asyncio.DefaultEventLoopPolicy
):
"""自定义事件循环策略:添加日志和监控"""
def new_event_loop(self):
loop = super().new_event_loop()
print(f"[CustomPolicy] 创建新事件循环: {id(loop)}")
return loop
def get_event_loop(self):
loop = super().get_event_loop()
print(f"[CustomPolicy] 获取事件循环: {id(loop)}")
return loop
# 在conftest.py中应用自定义策略
@pytest.fixture(scope="session")
def event_loop():
policy = CustomEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
loop = policy.new_event_loop()
yield loop
loop.close()
# uvloop测试配置
import asyncio
import pytest
try:
import uvloop
HAS_UVLOOP = True
except ImportError:
HAS_UVLOOP = False
@pytest.fixture(scope="session")
def event_loop():
"""使用uvloop替代默认事件循环"""
if HAS_UVLOOP:
asyncio.set_event_loop_policy(
uvloop.EventLoopPolicy()
)
print("使用uvloop作为事件循环")
else:
print("使用默认事件循环")
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_with_uvloop():
"""在uvloop上运行的测试"""
loop = asyncio.get_running_loop()
assert isinstance(loop, uvloop.Loop) if HAS_UVLOOP else True
# 事件循环隔离模式
import pytest
# 函数级隔离(默认):最安全,但开销最大
@pytest.mark.asyncio(loop_scope="function")
async def test_function_scope():
"""每个测试函数拥有独立事件循环"""
loop = asyncio.get_running_loop()
# 该事件循环仅在此测试中可用
# 类级隔离:同一个类的测试共享事件循环
@pytest.mark.asyncio(loop_scope="class")
class TestClassScope:
async def test_a(self):
pass
async def test_b(self):
pass # 与test_a共享事件循环
# 模块级隔离:同一模块共享事件循环
@pytest.mark.asyncio(loop_scope="module")
async def test_module_scope_a():
pass
@pytest.mark.asyncio(loop_scope="module")
async def test_module_scope_b():
pass # 与test_module_scope_a共享事件循环
九、实战案例
综合运用以上知识,我们可以构建完整的异步测试解决方案。以下从FastAPI异步接口测试、异步爬虫测试和WebSocket测试三个典型场景出发,展示实际项目中的异步测试实践。每个案例都综合使用了异步fixture、AsyncMock、事件循环管理和超时处理等技术。
FastAPI异步接口测试是当前最热门的应用之一。通过httpx的ASGITransport,我们可以直接向FastAPI应用发送请求而无需启动HTTP服务器,测试速度极快。结合respx模拟外部依赖(如第三方支付接口、短信服务等),可以实现完整的端到端测试。对于需要数据库的测试,使用事务回滚隔离策略,确保每个测试用例运行在干净的数据库状态下。
异步爬虫测试则更具挑战性,因为爬虫通常涉及多个阶段的异步操作:URL调度、页面下载、解析、数据处理和存储。使用AsyncMock模拟HTTP响应、使用异步fixture管理数据库连接、使用超时控制防止爬虫挂起,都是测试异步爬虫的关键技术。WebSocket测试则需要关注连接建立、消息收发、心跳维持和断线重连等异步事件的处理正确性。
# 实战:FastAPI异步接口完整测试
import pytest
from httpx import AsyncClient, ASGITransport
from app import app # FastAPI应用
from unittest.mock import AsyncMock, patch
@pytest.fixture
async def async_client():
"""FastAPI测试客户端"""
transport = ASGITransport(app=app)
async with AsyncClient(
transport=transport,
base_url="http://test"
) as client:
yield client
@pytest.fixture
def mock_email_service():
"""模拟外部邮件服务"""
with patch(
"app.services.email.send_email",
new_callable=AsyncMock
) as mock:
mock.return_value = {"status": "sent"}
yield mock
@pytest.mark.asyncio
async def test_register_user(
async_client, mock_email_service
):
"""测试用户注册完整流程"""
response = await async_client.post("/api/register", json={
"username": "newuser",
"email": "newuser@example.com",
"password": "SecurePass123!"
})
assert response.status_code == 201
assert response.json()["username"] == "newuser"
# 验证邮件服务被调用
mock_email_service.assert_awaited_once()
args, _ = mock_email_service.await_args
assert args[0] == "newuser@example.com"
@pytest.mark.asyncio
async def test_register_duplicate(async_client):
"""测试重复注册处理"""
response = await async_client.post("/api/register", json={
"username": "existing",
"email": "existing@test.com",
"password": "SecurePass123!"
})
assert response.status_code == 409
assert "already exists" in response.text
# 实战:异步爬虫测试
from unittest.mock import AsyncMock, patch
import pytest
class AsyncCrawler:
"""简单的异步爬虫"""
def __init__(self, session):
self.session = session
async def fetch_page(self, url: str) -> str:
async with self.session.get(url) as resp:
return await resp.text()
async def crawl(self, urls: list) -> dict:
results = {}
for url in urls:
try:
html = await asyncio.wait_for(
self.fetch_page(url),
timeout=5.0
)
results[url] = {"status": "success", "size": len(html)}
except asyncio.TimeoutError:
results[url] = {"status": "timeout"}
return results
@pytest.mark.asyncio
async def test_crawler_success():
"""测试爬虫成功抓取"""
mock_session = AsyncMock()
mock_response = AsyncMock()
mock_response.text = AsyncMock(
return_value="content"
)
mock_session.get.return_value.__aenter__.return_value = (
mock_response
)
crawler = AsyncCrawler(mock_session)
result = await crawler.crawl(["https://example.com"])
assert result["https://example.com"]["status"] == "success"
mock_session.get.assert_awaited_once_with(
"https://example.com"
)
@pytest.mark.asyncio
async def test_crawler_timeout():
"""测试爬虫超时处理"""
mock_session = AsyncMock()
mock_response = AsyncMock()
mock_response.text = AsyncMock(
side_effect=asyncio.TimeoutError
)
mock_session.get.return_value.__aenter__.return_value = (
mock_response
)
crawler = AsyncCrawler(mock_session)
result = await crawler.crawl(["https://slow-site.com"])
assert result["https://slow-site.com"]["status"] == "timeout"
# 实战:WebSocket测试
from unittest.mock import AsyncMock, patch
import pytest
class AsyncWebSocketClient:
"""异步WebSocket客户端"""
def __init__(self):
self.connected = False
self.messages = []
async def connect(self, url: str):
self.connected = True
self.url = url
async def send(self, message: str):
if not self.connected:
raise ConnectionError("未连接")
self.messages.append(message)
async def receive(self) -> str:
if not self.connected:
raise ConnectionError("未连接")
return await self._fetch_message()
async def disconnect(self):
self.connected = False
@pytest.mark.asyncio
async def test_websocket_lifecycle():
"""测试WebSocket完整生命周期"""
client = AsyncWebSocketClient()
# 连接
await client.connect("ws://server/chat")
assert client.connected
# 发送和接收
await client.send("Hello")
assert len(client.messages) == 1
# 断开
await client.disconnect()
assert not client.connected
# 断线后发送应报错
with pytest.raises(ConnectionError):
await client.send("test")
核心要点总结:
1. pytest-asyncio是Python异步测试的标准方案,提供@pytest.mark.asyncio装饰器和asyncio_mode配置(Auto/Strict/Legacy)。
2. 异步fixture使用async def定义,支持yield清理和依赖注入,是组织异步测试资源的最佳方式。
3. AsyncMock(Python 3.8+)专门用于模拟异步函数、异步上下文管理器和异步迭代器,提供assert_awaited等断言方法。
4. 异步HTTP测试推荐aresponses(aiohttp)和respx/pytest-httpx(httpx),无需真实网络请求即可验证HTTP行为。
5. 异步数据库测试的核心策略是"事务回滚隔离",确保每个测试运行在干净的数据库状态下。
6. 超时和取消测试使用asyncio.wait_for结合pytest.raises,验证异常处理和资源清理的正确性。
7. 事件循环管理通过loop_scope控制隔离级别,高级场景可使用自定义EventLoopPolicy或uvloop提升性能。
8. FastAPI、异步爬虫和WebSocket等实战场景综合运用上述技术,构建完整的异步测试体系。