异步代码测试:pytest-asyncio协程测试

Python 测试与调试专题 · 全面保障异步代码的正确性

专题:Python 测试与调试系统学习

关键词:Python, 测试, 调试, pytest-asyncio, 异步测试, 协程测试, asyncio, AsyncMock, 异步API测试

一、异步测试概述

随着Python异步编程(asyncio)的广泛普及,越来越多的项目采用async/await语法构建高性能IO密集型应用。然而,异步代码的测试与同步代码有着本质区别,面临独特的挑战:事件循环管理、协程生命周期控制、异步上下文隔离以及并发任务的竞态条件等问题,使得传统的pytest测试框架无法直接处理异步测试用例。

pytest-asyncio正是解决这一痛点的核心工具。它作为pytest的插件,为异步测试提供了完整的支持体系。当pytest运行一个标记为@pytest.mark.asyncio的测试函数时,pytest-asyncio会自动创建并管理一个事件循环,在该循环中执行协程,并收集结果或异常。这极大地简化了异步测试的编写,开发者无需手动调用asyncio.run()或管理事件循环的创建与销毁。

pytest-asyncio提供了三种asyncio模式(通过asyncio_mode配置项设置):Auto模式自动将所有async def测试函数识别为异步测试,无需显式添加装饰器,最适用于以异步为主的项目;Strict模式要求每个异步测试都必须显式标注@pytest.mark.asyncio,是最明确和安全的模式;Legacy模式向后兼容旧版本行为。从项目可维护性角度推荐使用Strict模式,因为它让测试意图一目了然。

事件循环作用域(event_loop_scope)是另一个关键概念。默认情况下每个测试函数拥有独立的事件循环,确保了测试之间的完全隔离。但在某些场景下(如共享数据库连接池),可以将作用域设置为"module"或"session"以复用事件循环,提升测试执行效率。理解不同作用域的trade-off对编写高效且可靠的异步测试至关重要。

# pytest.ini 配置示例 [pytest] asyncio_mode = strict testpaths = tests python_files = test_*.py asyncio_default_fixture_loop_scope = function
# 三种asyncio模式对比 # Auto模式:自动识别async def测试函数 # pytest.ini # asyncio_mode = auto async def test_fetch_data_auto(): # 无需装饰器,自动识别 result = await fetch_data() assert result["status"] == "ok" # Strict模式:必须显式标记 # pytest.ini # asyncio_mode = strict @pytest.mark.asyncio async def test_fetch_data_strict(): result = await fetch_data() assert result["status"] == "ok"
# 事件循环作用域配置 import pytest # 函数级作用域(默认):每个测试独立事件循环 @pytest.mark.asyncio(loop_scope="function") async def test_independent(): loop = asyncio.get_running_loop() # 每个测试运行在自己的事件循环中 # 模块级作用域:同一模块共享事件循环 @pytest.mark.asyncio(loop_scope="module") async def test_shared_1(): pass @pytest.mark.asyncio(loop_scope="module") async def test_shared_2(): pass # 与test_shared_1共享同一事件循环

二、pytest-asyncio基础

pytest-asyncio最基础的使用方式是通过@pytest.mark.asyncio装饰器将普通的测试函数标记为异步测试。当一个测试函数被该装饰器修饰后,pytest-asyncio会在测试执行前创建事件循环,运行协程,最后清理事件循环。这整个过程对开发者透明,开发者只需要专注于编写async def test_xxx形式的测试函数即可。

asyncio_mode配置项决定了pytest-asyncio如何识别异步测试。在Auto模式下,任何async def的测试函数都会被自动视为异步测试,无需手动添加装饰器。这对于大型项目的迁移非常友好,但也可能无意中将某些本不应异步执行的测试误识别。Strict模式则要求每个异步测试都显式添加@pytest.mark.asyncio装饰器,虽然多了一行代码,但极大地提升了代码的可读性和可维护性。Legacy模式是pytest-asyncio 0.21版本之前的行为,为了向后兼容而保留。

异步测试的参数化与同步测试完全一致,使用@pytest.mark.parametrize即可。此外,pytest-asyncio支持pytest.mark.timeout等第三方标记,可以组合使用以实现超时控制等高级功能。在测试夹具(fixture)方面,pytest-asyncio支持异步fixture的定义和注入,使得测试准备和清理工作更加灵活。

# 基础异步测试 import pytest @pytest.mark.asyncio async def test_async_simple(): """最简单的异步测试""" result = await some_async_function() assert result == expected_value @pytest.mark.asyncio async def test_async_failure(): """测试异步异常""" with pytest.raises(ValueError, match="invalid input"): await async_raiser()
# 参数化异步测试 import pytest @pytest.mark.asyncio @pytest.mark.parametrize("input_val,expected", [ (1, 2), (2, 4), (3, 6), (10, 20), ]) async def test_async_double(input_val, expected): result = await async_double(input_val) assert result == expected
# conftest.py 全局asyncio配置 import pytest # 在conftest.py中设置全局事件循环作用域 @pytest.fixture(scope="session") def event_loop(): """为整个测试会话创建一个事件循环""" policy = asyncio.get_event_loop_policy() loop = policy.new_event_loop() yield loop loop.close() # 使用pytestmark为模块中所有测试添加标记 pytestmark = pytest.mark.asyncio

三、异步fixture

异步fixture是pytest-asyncio中最强大的特性之一。与同步fixture类似,异步fixture使用async def定义,通过@pytest.fixture装饰器注册。当测试函数需要异步资源(如数据库连接、HTTP客户端、缓存客户端等)时,异步fixture可以await资源的创建和销毁过程,使测试准备和清理工作变得简洁自然。

异步fixture也支持yield语法实现清理逻辑(teardown)。在yield之前的代码作为设置(setup),yield之后的代码作为清理(teardown)。清理代码同样可以是异步的,确保资源被正确释放。这与同步fixture的yield用法完全一致,只是函数本身是async def。需要注意的是,异步fixture的作用域(scope)与事件循环作用域(loop_scope)是两个不同的概念,前者控制fixture的缓存周期,后者控制事件循环的共享范围。

异步fixture之间可以相互依赖,一个fixture可以依赖另一个异步fixture,pytest-asyncio会自动解析依赖关系并按正确的顺序执行。此外,异步fixture可以和同步fixture混合使用,pytest会自动处理同步和异步上下文之间的切换。这种灵活性使得在大型测试套件中可以逐步迁移异步测试,而不需要一次性重写所有fixture。

# 基本的异步fixture import pytest @pytest.fixture async def db_session(): """创建异步数据库会话""" session = await create_async_session() yield session await session.close() @pytest.mark.asyncio async def test_db_query(db_session): """使用异步fixture的测试""" result = await db_session.execute("SELECT 1") assert result == 1
# 带作用域和清理的异步fixture import pytest @pytest.fixture(scope="module") async def redis_client(): """模块级异步Redis客户端""" # setup:创建连接池 pool = await redis.create_pool( "redis://localhost:6379", minsize=5, maxsize=10 ) client = redis.Redis(connection_pool=pool) yield client # teardown:关闭连接池 await pool.clear() await client.close() @pytest.mark.asyncio async def test_redis_set_get(redis_client): await redis_client.set("key", "value") val = await redis_client.get("key") assert val == "value"
# 异步fixture依赖与组合 import pytest @pytest.fixture async def http_client(): """HTTP客户端fixture""" async with httpx.AsyncClient() as client: yield client @pytest.fixture async def auth_client(http_client): """基于http_client的认证客户端""" token = await get_auth_token() http_client.headers = {"Authorization": f"Bearer {token}"} yield http_client @pytest.mark.asyncio async def test_authenticated_request(auth_client): """使用组合fixture""" response = await auth_client.get("/api/protected") assert response.status_code == 200

四、AsyncMock在异步测试

Python 3.8+ 的unittest.mock模块引入了AsyncMock类,专门用于模拟异步函数和异步上下文管理器。AsyncMock继承自MagicMock,但特殊之处在于它的返回值是一个awaitable对象。当AsyncMock被调用时,它返回一个Awaitable对象,该对象在被await时返回指定的返回值或抛出指定的异常。这使得模拟异步依赖变得极其简单。

AsyncMock支持side_effect参数,可以接受一个可迭代对象或异常类。当side_effect是一个可迭代对象时,每次await调用会返回下一个元素。这使得模拟多次异步调用变得非常方便,比如模拟HTTP请求的多次重试场景。此外,AsyncMock还支持assert_awaitedassert_awaited_onceassert_awaited_with等一系列断言方法,专门验证异步调用是否按预期执行。

对于异步上下文管理器(即实现了__aenter____aexit__的类),AsyncMock可以自动处理async with语句。只需设置async_mock.__aenter__.return_valueasync_mock.__aexit__.return_value即可。同样,对于异步迭代器(实现__aiter____anext__的类),可以使用async_mock.__aiter__.return_value = [...]来模拟。这些特性使得AsyncMock成为异步测试中不可或缺的工具。

# AsyncMock基础用法 from unittest.mock import AsyncMock, patch import pytest @pytest.mark.asyncio async def test_async_mock_basic(): mock = AsyncMock() mock.return_value = 42 result = await mock() assert result == 42 mock.assert_awaited_once() @pytest.mark.asyncio async def test_async_mock_with_side_effect(): mock = AsyncMock() mock.side_effect = [10, 20, ValueError("no more")] assert await mock() == 10 assert await mock() == 20 with pytest.raises(ValueError, match="no more"): await mock()
# 异步上下文管理器Mock from unittest.mock import AsyncMock import pytest @pytest.mark.asyncio async def test_async_context_manager(): mock = AsyncMock() # 模拟异步上下文管理器 mock.__aenter__.return_value = mock mock.__aexit__.return_value = None async with mock as cm: assert cm is mock mock.__aenter__.assert_awaited_once() mock.__aexit__.assert_awaited_once() @pytest.mark.asyncio async def test_async_iterator(): mock = AsyncMock() # 模拟异步迭代器 items = [1, 2, 3] mock.__aiter__.return_value = mock mock.__anext__.side_effect = items + [StopAsyncIteration()] collected = [item async for item in mock] assert collected == [1, 2, 3]
# 实战:使用patch模拟异步HTTP请求 from unittest.mock import AsyncMock, patch import pytest class AsyncDataFetcher: async def fetch_data(self, url: str) -> dict: async with httpx.AsyncClient() as client: response = await client.get(url) return response.json() @pytest.mark.asyncio async def test_fetch_data_with_mock(): fetcher = AsyncDataFetcher() with patch.object(fetcher, 'fetch_data', new=AsyncMock()) as mock_fetch: mock_fetch.return_value = {"id": 1, "name": "test"} result = await fetcher.fetch_data("http://api.example.com/data") assert result["name"] == "test" mock_fetch.assert_awaited_once_with( "http://api.example.com/data" )

五、异步HTTP测试

在现代Web应用中,异步HTTP客户端已经成为主流。aiohttp和httpx是Python社区最流行的两个异步HTTP库,它们各自拥有配套的测试工具生态。测试异步HTTP请求的核心挑战在于:如何在测试中模拟外部服务,而不真正发起网络请求。这不仅能提升测试速度,还能确保测试的确定性和可重复性。

对于aiohttp,官方提供了pytest-aiohttp插件,它提供了aiohttp_clientfixture,可以将aiohttp应用包装为测试客户端,直接发送请求而不经过网络。此外,aresponses库提供了更加灵活的请求拦截方式,可以按URL模式匹配并返回预定义的响应。对于httpx,pytest-httpxrespx是两种流行的选择。pytest-httpx提供了高度可配置的请求拦截机制,支持按URL、方法、头部等条件匹配;respx则设计为与httpx的API更为贴近。

在选择测试工具时,需要考虑以下因素:是否需要模拟外部服务(aresponses/respx/pytest-httpx适合),还是需要测试应用自身的路由和中间件(pytest-aiohttp/httpx的ASGI传输适合)。对于微服务架构下的集成测试,推荐使用respx或pytest-httpx来模拟下游依赖;对于单体应用的单元测试和集成测试,pytest-aiohttp配合aiohttp应用实例是更好的选择。

# aiohttp + aresponses 测试 import aresponses import pytest @pytest.mark.asyncio async def test_aiohttp_with_aresponses(): async with aresponses.ResponsesMockServer() as server: # 注册模拟响应 server.add( "api.example.com", "/users/1", "GET", aresponses.Response( status=200, json={"id": 1, "name": "Alice"} ) ) async with aiohttp.ClientSession() as session: async with session.get( "http://api.example.com/users/1" ) as response: data = await response.json() assert data["name"] == "Alice"
# httpx + respx 测试 import respx from httpx import Response import pytest @pytest.mark.asyncio async def test_httpx_with_respx(): # 使用respx路由mock respx.get("https://api.example.com/items") \ .mock(return_value=Response(200, json=[ {"id": 1, "name": "item1"}, {"id": 2, "name": "item2"}, ])) async with httpx.AsyncClient() as client: response = await client.get( "https://api.example.com/items" ) assert response.status_code == 200 assert len(response.json()) == 2 @pytest.mark.asyncio async def test_httpx_with_pytest_httpx(pytest_httpx): # 使用pytest-httpx pytest_httpx.add_response( url="https://api.example.com/data", json={"key": "value"}, status_code=200 ) async with httpx.AsyncClient() as client: response = await client.get( "https://api.example.com/data" ) assert response.json()["key"] == "value"
# FastAPI + TestClient + AsyncHTTP from httpx import AsyncClient, ASGITransport import pytest @pytest.mark.asyncio async def test_fastapi_async(): # 使用ASGITransport直接发送请求到FastAPI应用 transport = ASGITransport(app=app) async with AsyncClient( transport=transport, base_url="http://test" ) as client: response = await client.post( "/api/users", json={"name": "Bob", "email": "bob@test.com"} ) assert response.status_code == 201 data = response.json() assert data["name"] == "Bob"

六、异步数据库测试

数据库测试是异步测试中最复杂的领域之一。常见的异步数据库库包括databases(轻量级异步数据库访问库)、asyncpg(PostgreSQL专用异步驱动)、以及SQLAlchemy 1.4+的原生异步支持。在测试异步数据库操作时,核心挑战是:如何在测试之间隔离数据、如何管理数据库连接池、以及如何处理事务回滚。

最常用的策略是"事务回滚隔离":在每个测试开始时开启一个事务,测试结束后回滚该事务,从而保证每个测试运行在干净的数据库状态中。对于databases库,可以通过Database.transaction()上下文管理器实现。对于asyncpg,可以直接使用connection.transaction()。对于SQLAlchemy async,则通过AsyncSession配合事务管理来实现。

另一个重要的实践是使用"测试数据库"而非生产数据库。在CI/CD环境中,通常使用Docker容器启动一个临时的数据库实例,运行测试后再销毁。GitHub Actions等CI平台原生支持service containers,可以非常方便地配置PostgreSQL、MySQL等服务。在本地开发时,可以使用pytest-datadirpytest-postgresql等插件来管理测试数据库的生命周期。

# databases库事务回滚测试 import pytest from databases import Database DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/testdb" @pytest.fixture async def test_db(): database = Database(DATABASE_URL) await database.connect() # 开启事务 async with database.transaction(force_rollback=True): yield database # 事务自动回滚,数据被清理 await database.disconnect() @pytest.mark.asyncio async def test_insert_user(test_db): query = "INSERT INTO users(name, email) VALUES(:name, :email)" await test_db.execute(query, { "name": "Test User", "email": "test@example.com" }) result = await test_db.fetch_one( "SELECT COUNT(*) as cnt FROM users" ) assert result["cnt"] == 1 # 测试结束后事务回滚,数据不持久化
# SQLAlchemy AsyncSession测试 from sqlalchemy.ext.asyncio import ( create_async_engine, AsyncSession, async_sessionmaker ) import pytest @pytest.fixture async def async_session(): engine = create_async_engine( "postgresql+asyncpg://user:pass@localhost/testdb" ) session_factory = async_sessionmaker( engine, expire_on_commit=False ) async with session_factory() as session: async with session.begin(): yield session # 事务回滚 await session.rollback() await engine.dispose() @pytest.mark.asyncio async def test_create_user(async_session): user = User(name="Alice", email="alice@test.com") async_session.add(user) await async_session.flush() result = await async_session.get(User, user.id) assert result.name == "Alice"
# asyncpg直接测试 import asyncpg import pytest @pytest.fixture async def pg_pool(): pool = await asyncpg.create_pool( "postgresql://user:pass@localhost/testdb", min_size=2, max_size=5 ) yield pool await pool.close() @pytest.mark.asyncio async def test_asyncpg_transaction(pg_pool): async with pg_pool.acquire() as conn: async with conn.transaction(): await conn.execute( "INSERT INTO items(name) VALUES($1)", "test_item" ) result = await conn.fetchval( "SELECT COUNT(*) FROM items" ) assert result >= 1 # 事务自动回滚

七、超时与取消测试

异步编程中,超时处理是保证系统健壮性的关键环节。一个永不返回的协程可能会导致整个事件循环被阻塞,进而影响系统的可用性。因此,在测试中验证超时和取消行为与验证正常功能同样重要。pytest-asyncio生态提供了多种超时测试方案,包括pytest-timeout插件和asyncio内置的asyncio.wait_for()函数。

pytest-timeout插件可以在测试函数级别设置超时,无论测试是同步还是异步。当测试执行时间超过设定的超时阈值时,pytest会将测试标记为失败。这对于防止CI流水线中的测试挂起非常有用。然而,pytest-timeout是在进程级别实现的,可能在捕获超时时不够精细。更细粒度的超时控制应使用asyncio.wait_for,它可以在协程级别设置超时,并抛出asyncio.TimeoutError异常。

Task取消测试是另一个重要话题。asyncio的Task对象支持cancel()方法,被取消的Task在await时会抛出asyncio.CancelledError。良好的异步代码应该正确处理这个异常,执行必要的清理操作(如释放资源、回滚事务等)。测试中需要验证:取消信号能被正确传播、被取消的Task确实不再执行、以及在取消后资源被正确清理。此外,超时后的清理操作(如关闭网络连接、释放文件句柄)也需要通过测试来保障。

# asyncio.wait_for 超时测试 import asyncio import pytest @pytest.mark.asyncio async def test_timeout_raises_exception(): """验证超时确实抛出异常""" async def slow_operation(): await asyncio.sleep(10) return "done" with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for( slow_operation(), timeout=0.1 ) @pytest.mark.asyncio async def test_operation_completes_before_timeout(): """验证操作在超时前完成""" async def fast_operation(): await asyncio.sleep(0.01) return "done" result = await asyncio.wait_for( fast_operation(), timeout=5.0 ) assert result == "done"
# Task取消测试 import asyncio import pytest @pytest.mark.asyncio async def test_task_cancellation(): """验证Task取消行为""" async def cancellable_worker(): try: while True: await asyncio.sleep(0.01) except asyncio.CancelledError: # 执行清理操作 cleanup_done = True raise # 重新抛出是好的实践 finally: resources_released = True task = asyncio.create_task(cancellable_worker()) await asyncio.sleep(0.05) # 让任务跑一会儿 task.cancel() with pytest.raises(asyncio.CancelledError): await task assert task.cancelled() @pytest.mark.asyncio async def test_task_group_cancellation(): """验证TaskGroup整体取消""" async with asyncio.TaskGroup() as tg: t1 = tg.create_task(some_work()) t2 = tg.create_task(some_work()) # TaskGroup中的任一Task失败会取消其他Task
# 超时清理测试 import asyncio import pytest class ResourceHolder: """需要清理的资源类""" def __init__(self): self.cleanup_called = False async def cleanup(self): self.cleanup_called = True @pytest.mark.asyncio async def test_timeout_cleanup(): """验证超时后资源被正确清理""" resource = ResourceHolder() async def operation_with_cleanup(): try: await asyncio.sleep(10) except asyncio.CancelledError: await resource.cleanup() raise with pytest.raises(asyncio.CancelledError): await asyncio.wait_for( operation_with_cleanup(), timeout=0.1 ) assert resource.cleanup_called, \ "超时后资源清理应被调用"

八、事件循环管理

事件循环是asyncio的核心组件,理解和管理事件循环对于编写可靠的异步测试至关重要。默认情况下,pytest-asyncio会为每个测试函数创建和销毁事件循环,但高级场景下需要自定义事件循环的创建策略。Python的asyncio.AbstractEventLoopPolicy提供了事件循环的工厂接口,允许开发者自定义事件循环的类型和行为。

uvloop是一个高性能的事件循环实现,基于libuv(Node.js底层的事件循环库)。它可以将asyncio的事件循环替换为uvloop,在某些场景下性能提升可达2-4倍。在测试中使用uvloop可以验证应用在uvloop环境下的行为是否正确。通过设置asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()),可以在测试会话级别全局替换事件循环。需要注意的是,uvloop目前仅支持Unix平台,Windows用户需要使用其他方案。

事件循环隔离是大型测试套件中经常遇到的问题。当多个测试共享同一个事件循环时,一个测试中的未完成协程可能会干扰其他测试的执行。pytest-asyncio通过loop_scope参数控制事件循环的隔离级别。function级别(默认)为每个测试创建独立的事件循环,隔离性最强但开销最大;module级别在同一模块内共享事件循环;session级别在整个测试会话中共享单一事件循环,性能最好但隔离性最差。选择合适的隔离级别需要在测试速度和可靠性之间做出权衡。

# 自定义事件循环策略 import asyncio import pytest class CustomEventLoopPolicy( asyncio.DefaultEventLoopPolicy ): """自定义事件循环策略:添加日志和监控""" def new_event_loop(self): loop = super().new_event_loop() print(f"[CustomPolicy] 创建新事件循环: {id(loop)}") return loop def get_event_loop(self): loop = super().get_event_loop() print(f"[CustomPolicy] 获取事件循环: {id(loop)}") return loop # 在conftest.py中应用自定义策略 @pytest.fixture(scope="session") def event_loop(): policy = CustomEventLoopPolicy() asyncio.set_event_loop_policy(policy) loop = policy.new_event_loop() yield loop loop.close()
# uvloop测试配置 import asyncio import pytest try: import uvloop HAS_UVLOOP = True except ImportError: HAS_UVLOOP = False @pytest.fixture(scope="session") def event_loop(): """使用uvloop替代默认事件循环""" if HAS_UVLOOP: asyncio.set_event_loop_policy( uvloop.EventLoopPolicy() ) print("使用uvloop作为事件循环") else: print("使用默认事件循环") loop = asyncio.new_event_loop() yield loop loop.close() @pytest.mark.asyncio async def test_with_uvloop(): """在uvloop上运行的测试""" loop = asyncio.get_running_loop() assert isinstance(loop, uvloop.Loop) if HAS_UVLOOP else True
# 事件循环隔离模式 import pytest # 函数级隔离(默认):最安全,但开销最大 @pytest.mark.asyncio(loop_scope="function") async def test_function_scope(): """每个测试函数拥有独立事件循环""" loop = asyncio.get_running_loop() # 该事件循环仅在此测试中可用 # 类级隔离:同一个类的测试共享事件循环 @pytest.mark.asyncio(loop_scope="class") class TestClassScope: async def test_a(self): pass async def test_b(self): pass # 与test_a共享事件循环 # 模块级隔离:同一模块共享事件循环 @pytest.mark.asyncio(loop_scope="module") async def test_module_scope_a(): pass @pytest.mark.asyncio(loop_scope="module") async def test_module_scope_b(): pass # 与test_module_scope_a共享事件循环

九、实战案例

综合运用以上知识,我们可以构建完整的异步测试解决方案。以下从FastAPI异步接口测试、异步爬虫测试和WebSocket测试三个典型场景出发,展示实际项目中的异步测试实践。每个案例都综合使用了异步fixture、AsyncMock、事件循环管理和超时处理等技术。

FastAPI异步接口测试是当前最热门的应用之一。通过httpx的ASGITransport,我们可以直接向FastAPI应用发送请求而无需启动HTTP服务器,测试速度极快。结合respx模拟外部依赖(如第三方支付接口、短信服务等),可以实现完整的端到端测试。对于需要数据库的测试,使用事务回滚隔离策略,确保每个测试用例运行在干净的数据库状态下。

异步爬虫测试则更具挑战性,因为爬虫通常涉及多个阶段的异步操作:URL调度、页面下载、解析、数据处理和存储。使用AsyncMock模拟HTTP响应、使用异步fixture管理数据库连接、使用超时控制防止爬虫挂起,都是测试异步爬虫的关键技术。WebSocket测试则需要关注连接建立、消息收发、心跳维持和断线重连等异步事件的处理正确性。

# 实战:FastAPI异步接口完整测试 import pytest from httpx import AsyncClient, ASGITransport from app import app # FastAPI应用 from unittest.mock import AsyncMock, patch @pytest.fixture async def async_client(): """FastAPI测试客户端""" transport = ASGITransport(app=app) async with AsyncClient( transport=transport, base_url="http://test" ) as client: yield client @pytest.fixture def mock_email_service(): """模拟外部邮件服务""" with patch( "app.services.email.send_email", new_callable=AsyncMock ) as mock: mock.return_value = {"status": "sent"} yield mock @pytest.mark.asyncio async def test_register_user( async_client, mock_email_service ): """测试用户注册完整流程""" response = await async_client.post("/api/register", json={ "username": "newuser", "email": "newuser@example.com", "password": "SecurePass123!" }) assert response.status_code == 201 assert response.json()["username"] == "newuser" # 验证邮件服务被调用 mock_email_service.assert_awaited_once() args, _ = mock_email_service.await_args assert args[0] == "newuser@example.com" @pytest.mark.asyncio async def test_register_duplicate(async_client): """测试重复注册处理""" response = await async_client.post("/api/register", json={ "username": "existing", "email": "existing@test.com", "password": "SecurePass123!" }) assert response.status_code == 409 assert "already exists" in response.text
# 实战:异步爬虫测试 from unittest.mock import AsyncMock, patch import pytest class AsyncCrawler: """简单的异步爬虫""" def __init__(self, session): self.session = session async def fetch_page(self, url: str) -> str: async with self.session.get(url) as resp: return await resp.text() async def crawl(self, urls: list) -> dict: results = {} for url in urls: try: html = await asyncio.wait_for( self.fetch_page(url), timeout=5.0 ) results[url] = {"status": "success", "size": len(html)} except asyncio.TimeoutError: results[url] = {"status": "timeout"} return results @pytest.mark.asyncio async def test_crawler_success(): """测试爬虫成功抓取""" mock_session = AsyncMock() mock_response = AsyncMock() mock_response.text = AsyncMock( return_value="content" ) mock_session.get.return_value.__aenter__.return_value = ( mock_response ) crawler = AsyncCrawler(mock_session) result = await crawler.crawl(["https://example.com"]) assert result["https://example.com"]["status"] == "success" mock_session.get.assert_awaited_once_with( "https://example.com" ) @pytest.mark.asyncio async def test_crawler_timeout(): """测试爬虫超时处理""" mock_session = AsyncMock() mock_response = AsyncMock() mock_response.text = AsyncMock( side_effect=asyncio.TimeoutError ) mock_session.get.return_value.__aenter__.return_value = ( mock_response ) crawler = AsyncCrawler(mock_session) result = await crawler.crawl(["https://slow-site.com"]) assert result["https://slow-site.com"]["status"] == "timeout"
# 实战:WebSocket测试 from unittest.mock import AsyncMock, patch import pytest class AsyncWebSocketClient: """异步WebSocket客户端""" def __init__(self): self.connected = False self.messages = [] async def connect(self, url: str): self.connected = True self.url = url async def send(self, message: str): if not self.connected: raise ConnectionError("未连接") self.messages.append(message) async def receive(self) -> str: if not self.connected: raise ConnectionError("未连接") return await self._fetch_message() async def disconnect(self): self.connected = False @pytest.mark.asyncio async def test_websocket_lifecycle(): """测试WebSocket完整生命周期""" client = AsyncWebSocketClient() # 连接 await client.connect("ws://server/chat") assert client.connected # 发送和接收 await client.send("Hello") assert len(client.messages) == 1 # 断开 await client.disconnect() assert not client.connected # 断线后发送应报错 with pytest.raises(ConnectionError): await client.send("test")

核心要点总结:

1. pytest-asyncio是Python异步测试的标准方案,提供@pytest.mark.asyncio装饰器和asyncio_mode配置(Auto/Strict/Legacy)。

2. 异步fixture使用async def定义,支持yield清理和依赖注入,是组织异步测试资源的最佳方式。

3. AsyncMock(Python 3.8+)专门用于模拟异步函数、异步上下文管理器和异步迭代器,提供assert_awaited等断言方法。

4. 异步HTTP测试推荐aresponses(aiohttp)和respx/pytest-httpx(httpx),无需真实网络请求即可验证HTTP行为。

5. 异步数据库测试的核心策略是"事务回滚隔离",确保每个测试运行在干净的数据库状态下。

6. 超时和取消测试使用asyncio.wait_for结合pytest.raises,验证异常处理和资源清理的正确性。

7. 事件循环管理通过loop_scope控制隔离级别,高级场景可使用自定义EventLoopPolicy或uvloop提升性能。

8. FastAPI、异步爬虫和WebSocket等实战场景综合运用上述技术,构建完整的异步测试体系。