unittest进阶:TestSuite/TestRunner/Fixture

Python 测试与调试专题 · 深入掌握unittest框架的高级特性

专题:Python 测试与调试系统学习

关键词:Python, 测试, 调试, unittest, TestSuite, TestRunner, Fixture, 测试框架, Python测试

一、TestSuite高级用法

TestSuite是unittest框架中用于组织和聚合测试用例的核心容器。基础的TestSuite用法是简单地将TestCase子类组合在一起执行,但在实际项目中我们需要更精细的控制能力。高级TestSuite用法包括按标签组合测试、按模块分层组合、创建嵌套的TestSuite结构、根据运行条件动态加载测试、以及精确控制测试的执行顺序等。掌握这些高级技巧可以让你在组织大型测试项目时事半功倍。

1. 按标签组合测试

在大型项目中,测试可能被标记为"smoke"、"regression"、"slow"、"fast"等不同类别。通过为测试方法添加标签属性,可以有选择性地组合执行特定类别的测试,这在CI/CD流水线中尤为重要。

import unittest class UserTests(unittest.TestCase): def test_login_success(self): self.tag = "smoke" # ... 测试逻辑 def test_login_failure(self): self.tag = "regression" # ... 测试逻辑 def test_password_reset(self): self.tag = "slow" # ... 测试逻辑 def suite_by_tag(tag_name): """根据标签动态构建TestSuite""" suite = unittest.TestSuite() loader = unittest.TestLoader() # 加载模块中所有TestCase all_tests = loader.loadTestsFromTestCase(UserTests) for test in all_tests: # 获取测试方法并检查标签 test_method = test._testMethodName method = getattr(UserTests, test_method) if hasattr(method, 'tag') and method.tag == tag_name: suite.addTest(test) return suite

2. 嵌套TestSuite与模块组合

实际项目中的测试结构往往是多层次的。unittest的TestSuite支持嵌套组合,允许你按模块、按功能域或按子系统层级组织测试。这种方式使得你可以灵活地运行不同粒度的测试集合。

import unittest from tests import test_user, test_order, test_payment def build_master_suite(): """构建分层嵌套的TestSuite""" loader = unittest.TestLoader() # 用户模块测试套件 user_suite = unittest.TestSuite() user_suite.addTests(loader.loadTestsFromModule(test_user)) # 订单模块测试套件 order_suite = unittest.TestSuite() order_suite.addTests(loader.loadTestsFromModule(test_order)) # 支付模块测试套件 payment_suite = unittest.TestSuite() payment_suite.addTests(loader.loadTestsFromModule(test_payment)) # 业务模块主套件 biz_suite = unittest.TestSuite([user_suite, order_suite]) # 顶层主套件(合并所有) master_suite = unittest.TestSuite([biz_suite, payment_suite]) return master_suite if __name__ == '__main__': runner = unittest.TextTestRunner(verbosity=2) runner.run(build_master_suite())

3. 执行顺序控制

unittest默认按方法名的字典序执行测试。当测试之间存在隐式依赖时(如需要先注册再登录),可以通过TestSuite的addTest顺序精确控制执行流。

import unittest class TestUserFlow(unittest.TestCase): def test_01_register(self): print("1. 用户注册") self.assertTrue(True) def test_02_login(self): print("2. 用户登录") self.assertTrue(True) def test_03_update_profile(self): print("3. 更新资料") self.assertTrue(True) def test_04_delete_account(self): print("4. 删除账号") self.assertTrue(True) def ordered_suite(): """按自定义顺序执行测试""" suite = unittest.TestSuite() # 按业务流程顺序添加,不受方法名限制 suite.addTest(TestUserFlow('test_01_register')) suite.addTest(TestUserFlow('test_02_login')) suite.addTest(TestUserFlow('test_04_delete_account')) # 跳过中间步骤 suite.addTest(TestUserFlow('test_03_update_profile')) return suite

二、自定义TestRunner

unittest内置的TextTestRunner会在控制台输出测试结果,但在自动化测试平台、持续集成系统或需要定制化报告的场景中,默认的Runner无法满足需求。通过继承TextTestRunner或实现TestRunner接口,我们可以完全控制测试的执行过程、输出格式、进度展示和结果收集方式。自定义TestRunner是现代测试基础设施建设的核心环节。

1. 自定义输出格式与进度显示

以下示例展示如何创建一个带彩色输出和实时进度条的自定义Runner,它比默认的TextTestRunner提供更直观的反馈。

import unittest import time import sys class ProgressTestRunner(unittest.TextTestRunner): """带进度显示的自定义TestRunner""" def run(self, test): # 预先统计测试总数 result = self._makeResult() total = test.countTestCases() executed = [0] # 包装result以拦截每个测试执行 original_startTest = result.startTest original_addSuccess = result.addSuccess original_addFailure = result.addFailure def startTest_wrapper(test): executed[0] += 1 percent = executed[0] / total * 100 bar = '#' * int(percent / 5) + '-' * (20 - int(percent / 5)) sys.stdout.write(f'\r进度: [{bar}] {percent:.1f}% ({executed[0]}/{total})') sys.stdout.flush() original_startTest(test) result.startTest = startTest_wrapper sys.stdout.write('\n开始执行测试...\n') start = time.time() test(result) elapsed = time.time() - start sys.stdout.write(f'\n\n总耗时: {elapsed:.2f}s\n') return result

2. HTML报告生成器

在企业级测试框架中,生成HTML格式的测试报告是常见需求。下面展示如何通过自定义TestRunner将测试结果输出为结构化的HTML报告。

import unittest from datetime import datetime class HtmlReportRunner(unittest.TextTestRunner): """生成HTML测试报告的自定义Runner""" def __init__(self, report_path='test_report.html', *args, **kwargs): super().__init__(*args, **kwargs) self.report_path = report_path def run(self, test): result = super().run(test) # 收集统计数据 total = result.testsRun successes = total - len(result.failures) - len(result.errors) failures = len(result.failures) errors = len(result.errors) # 生成HTML报告 html = f'''<!DOCTYPE html> <html><head><title>测试报告</title></head><body> <h1>测试执行报告</h1> <p>执行时间: {datetime.now()}</p> <table border="1"> <tr><th>总数</th><th>通过</th><th>失败</th><th>错误</th></tr> <tr> <td>{total}</td><td>{successes}</td> <td>{failures}</td><td>{errors}</td> </tr></table></body></html>''' with open(self.report_path, 'w', encoding='utf-8') as f: f.write(html) print(f'\n报告已生成: {self.report_path}') return result

3. 结果收集与汇总

在分布式测试或批量测试场景中,需要将多个Runner的测试结果汇总分析。以下展示如何创建一个结果收集器Runner。

import unittest class CollectingTestRunner: """收集测试结果的Runner(不输出到控制台)""" def __init__(self): self.results = [] def run(self, test_or_suite): suite = unittest.TestSuite() suite.addTest(test_or_suite) # 使用StringIO捕获输出 import io stream = io.StringIO() runner = unittest.TextTestRunner(stream=stream, verbosity=0) result = runner.run(suite) self.results.append({ 'tests_run': result.testsRun, 'failures': len(result.failures), 'errors': len(result.errors), 'was_successful': result.wasSuccessful(), 'output': stream.getvalue(), }) return result def summary(self): total = sum(r['tests_run'] for r in self.results) failed = sum(r['failures'] for r in self.results) errors = sum(r['errors'] for r in self.results) return { 'total': total, 'failed': failed, 'errors': errors, 'success_rate': (total - failed - errors) / total * 100 if total else 0 } # 使用示例 collector = CollectingTestRunner() collector.run(unittest.TestLoader().loadTestsFromTestCase(UserTests)) print(collector.summary())

三、Fixture深度管理

Fixture是unittest框架中用于准备测试环境和清理测试资源的机制。理解Fixture的不同作用域以及如何正确处理Fixture中的异常和依赖关系,是编写健壮测试的关键。unittest提供了三个级别的Fixture:模块级(setUpModule/tearDownModule)、类级(setUpClass/tearDownClass)和方法级(setUp/tearDown)。在实际项目中还需要管理数据库连接、文件句柄、网络服务等外部资源的生命周期。

1. Fixture作用域详解

不同作用域的Fixture决定了资源何时初始化、何时清理。正确选择Fixture作用域可以大幅提升测试效率。模块级Fixture在整个模块执行前后运行一次,适合初始化数据库连接池、加载配置文件等重量级操作。类级Fixture在类的所有测试前后运行,适合创建共享对象。方法级Fixture在每条测试前后运行,保证测试隔离性。

import unittest import os def setUpModule(): """模块级Fixture:整个模块执行前运行一次""" print('\n[setUpModule] 初始化测试数据库连接') os.environ['TEST_MODE'] = 'true' def tearDownModule(): """模块级Fixture:整个模块执行后运行一次""" print('\n[tearDownModule] 关闭测试数据库连接') os.environ.pop('TEST_MODE', None) class TestDatabaseFixture(unittest.TestCase): db_connection = None @classmethod def setUpClass(cls): """类级Fixture:类所有测试前运行一次""" print('\n[setUpClass] 创建数据库表结构') cls.db_connection = {'host': 'localhost', 'db': 'test', 'connected': True} @classmethod def tearDownClass(cls): """类级Fixture:类所有测试后运行一次""" print('\n[tearDownClass] 删除数据库表结构') cls.db_connection = None def setUp(self): """方法级Fixture:每条测试前运行""" print(f'\n[setUp] 准备测试数据 - {self._testMethodName}') self.test_data = {'id': 1, 'name': 'test_user'} def tearDown(self): """方法级Fixture:每条测试后运行""" print(f'\n[tearDown] 清理测试数据 - {self._testMethodName}') self.test_data = None def test_insert(self): self.assertIsNotNone(self.db_connection) self.assertEqual(self.test_data['name'], 'test_user') def test_query(self): self.assertTrue(self.db_connection['connected'])

2. Fixture异常处理

Fixture中的异常处理容易被忽视。如果setUp方法抛出异常,对应的tearDown方法将不会被执行,这可能导致资源泄露。因此,在Fixture中必须妥善处理可能出现的异常,确保资源在任何情况下都能被正确释放。

import unittest class SafeFixtureTest(unittest.TestCase): """安全的Fixture异常处理示例""" def setUp(self): self.resources = [] try: # 可能失败的外部资源 self.db = self._connect_db() self.resources.append('db') self.file = open('/tmp/test_data.txt', 'w') self.resources.append('file') except Exception as e: # 清理已成功分配的资源 self._cleanup_resources() raise RuntimeError(f'Fixture初始化失败: {e}') def tearDown(self): """保证无论setUp是否成功,资源都被清理""" self._cleanup_resources() def _cleanup_resources(self): for resource in self.resources[:]: try: if resource == 'db' and hasattr(self, 'db'): self.db.close() elif resource == 'file' and hasattr(self, 'file'): self.file.close() self.resources.remove(resource) except Exception as e: print(f'清理资源 {resource} 时出错: {e}') def _connect_db(self): # 模拟数据库连接 import socket s = socket.socket() s.connect(('localhost', 3306)) return s def test_example(self): self.assertEqual(len(self.resources), 2)

3. 外部资源生命周期管理

管理数据库、文件、网络连接等外部资源时,Fixtures需要处理创建、验证、清理全生命周期。推荐使用上下文管理器风格的资源管理方式,确保资源在测试结束后立即释放。

import unittest import tempfile import os import json class ResourceManagerTest(unittest.TestCase): """外部资源管理最佳实践""" temp_dir = None @classmethod def setUpClass(cls): """创建临时目录""" cls.temp_dir = tempfile.mkdtemp() @classmethod def tearDownClass(cls): """清理临时目录及其所有文件""" if cls.temp_dir and os.path.exists(cls.temp_dir): import shutil shutil.rmtree(cls.temp_dir) def setUp(self): """创建测试配置文件""" self.config = { 'debug': True, 'max_retries': 3, 'timeout': 30 } self.config_file = os.path.join(self.temp_dir, 'config.json') with open(self.config_file, 'w') as f: json.dump(self.config, f) def tearDown(self): """删除测试配置文件""" if os.path.exists(self.config_file): os.remove(self.config_file) def test_config_loading(self): """测试配置文件加载""" with open(self.config_file, 'r') as f: loaded = json.load(f) self.assertEqual(loaded['debug'], True) self.assertEqual(loaded['timeout'], 30) def test_config_update(self): """测试配置更新""" self.config['debug'] = False with open(self.config_file, 'w') as f: json.dump(self.config, f) with open(self.config_file, 'r') as f: loaded = json.load(f) self.assertFalse(loaded['debug'])

四、测试监听器与自定义TestResult

unittest框架提供了TestResult类来收集和记录测试执行过程中的各种事件。通过继承TestResult并重写其回调方法(addSuccess、addFailure、addError、addSkip等),我们可以创建自定义的测试监听器,实现对测试执行全过程的监控。这在测试报告定制、失败截图、日志采集和实时通知等场景中非常有用。

1. 自定义TestResult监听器

以下示例展示如何创建一个自定义的TestResult,它会在每次测试事件发生时记录详细信息,包括执行时间、失败截图和日志内容。

import unittest import time import sys class DetailedTestResult(unittest.TestResult): """详细的测试结果收集器""" def __init__(self, stream=None, descriptions=None, verbosity=None): super().__init__(stream, descriptions, verbosity) self.test_details = [] self.start_times = {} def startTest(self, test): """测试开始时记录时间""" self.start_times[test.id()] = time.time() super().startTest(test) def addSuccess(self, test): """测试通过时记录""" elapsed = time.time() - self.start_times.pop(test.id(), time.time()) self.test_details.append({ 'test': test.id(), 'status': 'PASS', 'elapsed': f'{elapsed:.3f}s', 'timestamp': time.strftime('%H:%M:%S') }) super().addSuccess(test) def addFailure(self, test, err): """测试失败时记录详细信息""" elapsed = time.time() - self.start_times.pop(test.id(), time.time()) self.test_details.append({ 'test': test.id(), 'status': 'FAIL', 'elapsed': f'{elapsed:.3f}s', 'error_info': self._exc_info_to_string(err, test), 'timestamp': time.strftime('%H:%M:%S') }) # 可在此处触发截图 self._capture_screenshot(test) super().addFailure(test, err) def addError(self, test, err): """测试错误时记录""" elapsed = time.time() - self.start_times.pop(test.id(), time.time()) self.test_details.append({ 'test': test.id(), 'status': 'ERROR', 'elapsed': f'{elapsed:.3f}s', 'error_info': self._exc_info_to_string(err, test), 'timestamp': time.strftime('%H:%M:%S') }) super().addError(test, err) def _capture_screenshot(self, test): """模拟截图保存""" print(f'\n[截图保存] {test.id()}_failure.png') # 实际项目中可调用 Selenium 等工具截图 def print_summary(self): """打印汇总报告""" print('\n===== 测试执行报告 =====') for detail in self.test_details: status_color = '✓' if detail['status'] == 'PASS' else '✗' print(f'{status_color} {detail["test"]} - {detail["status"]} ({detail["elapsed"]})') print(f'\n总计: {self.testsRun} | ' f'通过: {self.testsRun - len(self.failures) - len(self.errors)} | ' f'失败: {len(self.failures)} | 错误: {len(self.errors)}')

2. 基于TestResult的计时器Runner

将自定义TestResult与TestRunner集成,可以创建功能完整的计时测试运行器,清晰展示每条测试的耗时,帮助快速定位性能瓶颈。

import unittest class TimedTestRunner(unittest.TextTestRunner): """带执行时间统计的TestRunner""" def _makeResult(self): return DetailedTestResult( self.stream, self.descriptions, self.verbosity ) def run(self, test): result = super().run(test) result.print_summary() return result # 使用示例 if __name__ == '__main__': suite = unittest.TestLoader().loadTestsFromTestCase(MyTestCase) runner = TimedTestRunner(verbosity=0) runner.run(suite)

3. 测试事件钩子系统

在企业级测试框架中,事件钩子机制允许在不侵入测试代码的前提下插入横切关注点,比如测试执行前后的日志记录、性能监控和外部系统通知。

import unittest from datetime import datetime class TestHookMixin: """测试钩子混入类""" @classmethod def setUpClass(cls): """钩子:测试类执行前""" print(f'\n[{datetime.now()}] 开始测试类: {cls.__name__}') cls._test_count = 0 cls._failure_count = 0 super().setUpClass() def setUp(self): """钩子:每条测试前""" self._test_count += 1 self._test_start = datetime.now() print(f'\n[{self._test_start}] 执行: {self._testMethodName}') super().setUp() def tearDown(self): """钩子:每条测试后""" elapsed = (datetime.now() - self._test_start).total_seconds() has_failed = any( test == self for test, _ in self._outcome.errors if hasattr(self, '_outcome') ) status = 'FAIL' if has_failed else 'PASS' print(f'[{datetime.now()}] 完成: {self._testMethodName} [{status}] ({elapsed:.2f}s)') super().tearDown() class MyHookedTest(TestHookMixin, unittest.TestCase): """使用钩子的测试类""" def test_a(self): self.assertEqual(1, 1) def test_b(self): self.assertEqual(2, 2)

五、测试超时控制

在测试中,某些操作可能因为网络延迟、资源争用或死循环而无限期挂起。为测试添加超时控制可以防止测试进程被卡死,确保整个测试套件能在合理时间内完成。Python中实现测试超时主要有信号机制和线程机制两种方式。信号方式效率高但只能在主线程中使用,线程方式通用性强但有一些额外开销。理解两种方式的优劣,能够根据场景选择合适的超时策略。

1. 基于信号的超时控制

signal模块提供了一种高效的超时实现方式,它利用操作系统信号在指定时间后中断当前执行。这种方法不会创建额外线程,性能开销极小。但需要注意的是,signal模块在Windows平台支持有限,且不能在非主线程中使用。

import unittest import signal class TimeoutError(AssertionError): """自定义超时异常""" pass def timeout(seconds): """基于信号量的超时装饰器""" def decorator(func): def handler(signum, frame): raise TimeoutError(f'测试执行超时 ({seconds}s)') def wrapper(*args, **kwargs): old_handler = signal.signal(signal.SIGALRM, handler) signal.alarm(seconds) try: result = func(*args, **kwargs) return result finally: signal.alarm(0) # 取消定时器 signal.signal(signal.SIGALRM, old_handler) return wrapper return decorator class TestWithSignalTimeout(unittest.TestCase): @timeout(2) def test_fast_operation(self): """2秒内应完成的快速操作""" import time time.sleep(1) self.assertTrue(True) @timeout(1) def test_slow_operation(self): """超过1秒会触发的超时测试""" import time time.sleep(3) # 将会超时 self.assertTrue(True)

2. 基于线程的超时控制

线程方式的超时控制通过在新线程中执行测试函数,主线程等待指定时间后强制终止。这种方式不依赖操作系统信号,在所有平台上都能工作,非常适合测试网络请求等阻塞操作。

import unittest import threading import functools class ThreadTimeoutError(Exception): pass def thread_timeout(seconds): """基于线程的超时装饰器(跨平台)""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): result = [None] exception = [None] finished = threading.Event() def target(): try: result[0] = func(*args, **kwargs) except Exception as e: exception[0] = e finally: finished.set() thread = threading.Thread(target=target) thread.daemon = True thread.start() if not finished.wait(timeout=seconds): raise ThreadTimeoutError( f'测试在 {seconds}s 内未完成' ) if exception[0]: raise exception[0] return result[0] return wrapper return decorator class TestWithThreadTimeout(unittest.TestCase): @thread_timeout(3) def test_api_call(self): """模拟带有超时的API调用""" import time time.sleep(2) self.assertEqual(200, 200) @thread_timeout(2) def test_hanging_call(self): """模拟挂起的调用(将会超时)""" import time while True: time.sleep(1)

3. 超时后清理机制

超时发生时,测试资源可能处于不一致状态。必须建立超时后的清理机制,确保已创建的资源被正确释放,避免影响后续测试的执行。

import unittest import threading import time class SafeTimeoutRunner: """带超时清理的测试执行器""" def run_with_cleanup(self, test_case, timeout_seconds=5): test_thread = threading.Thread(target=test_case, daemon=True) test_thread.start() test_thread.join(timeout_seconds) if test_thread.is_alive(): print(f'[清理] 测试 {test_case._testMethodName} 超时,执行清理') # 执行紧急清理操作 self._emergency_cleanup(test_case) raise TimeoutError(f'测试超时 ({timeout_seconds}s)') def _emergency_cleanup(self, test_case): """紧急清理:关闭可能打开的资源""" if hasattr(test_case, 'db') and test_case.db: try: test_case.db.close() print('[清理] 数据库连接已关闭') except Exception: pass if hasattr(test_case, 'file_obj') and test_case.file_obj: try: test_case.file_obj.close() print('[清理] 文件句柄已关闭') except Exception: pass

六、测试依赖与执行顺序

在理想情况下,每条测试应该是独立的,可以以任意顺序执行。但在实际项目中,某些测试天然存在依赖关系,例如需要先创建用户才能测试登录,需要先有订单才能测试退款。unittest框架本身不提供原生的测试依赖支持,但通过TestSuite的精确控制以及第三方库的帮助,我们可以实现有序测试、依赖测试以及失败后自动跳过后续测试的能力。

1. 有序测试实现

通过自定义TestSuite并按特定顺序添加测试方法,可以精确控制测试的执行流程。配合类属性记录状态,可以在不同测试方法间传递数据。

import unittest class TestWorkflow(unittest.TestCase): """业务流程测试:步骤间传递状态""" shared_state = {} def test_01_create_user(self): """步骤1:创建用户""" user_id = 1001 self.shared_state['user_id'] = user_id self.shared_state['user_created'] = True print(f'用户创建成功, ID: {user_id}') self.assertTrue(self.shared_state['user_created']) def test_02_create_order(self): """步骤2:创建订单(依赖用户)""" self.assertTrue( self.shared_state.get('user_created', False), '必须先创建用户' ) order_id = 50001 self.shared_state['order_id'] = order_id self.shared_state['order_created'] = True print(f'订单创建成功, ID: {order_id}') self.assertTrue(self.shared_state['order_created']) def test_03_process_payment(self): """步骤3:处理支付(依赖用户和订单)""" self.assertTrue(self.shared_state.get('user_created')) self.assertTrue(self.shared_state.get('order_created')) payment_id = 'PAY_20260301_001' self.shared_state['payment_id'] = payment_id print(f'支付成功, 支付ID: {payment_id}') self.assertEqual(len(payment_id), 19) def test_04_refund(self): """步骤4:退款(依赖支付)""" self.assertIn('payment_id', self.shared_state) refund_amount = 99.99 print(f'退款成功, 金额: {refund_amount}') self.assertGreater(refund_amount, 0) # 按流程顺序执行 def workflow_suite(): suite = unittest.TestSuite() suite.addTest(TestWorkflow('test_01_create_user')) suite.addTest(TestWorkflow('test_02_create_order')) suite.addTest(TestWorkflow('test_03_process_payment')) suite.addTest(TestWorkflow('test_04_refund')) return suite

2. 依赖装饰器实现

使用装饰器可以更优雅地表达测试之间的依赖关系。当被依赖的测试失败时,依赖它的测试自动跳过并标注原因,而不是以错误或失败的状态呈现。

import unittest def depends_on(dep_test_name): """依赖装饰器:指定依赖的测试方法名""" def decorator(test_method): def wrapper(self, *args, **kwargs): outcome = self._outcome if hasattr(outcome, 'errors'): # 检查依赖测试是否失败 for test, error in outcome.errors + outcome.failures: if test._testMethodName == dep_test_name: self.skipTest( f'依赖测试 {dep_test_name} 未通过' ) return test_method(self, *args, **kwargs) return wrapper return decorator class TestWithDependency(unittest.TestCase): def test_login(self): """登录测试""" self.assertEqual(200, 200) # 模拟登录成功 @depends_on('test_login') def test_get_profile(self): """获取用户资料(依赖登录)""" # 只有在登录成功后才会执行 profile = {'name': 'Alice', 'role': 'admin'} self.assertEqual(profile['role'], 'admin') @depends_on('test_get_profile') def test_update_profile(self): """更新资料(依赖获取资料)""" # 如果前序失败会自动跳过 self.assertTrue(True)

3. 测试排序策略

在使用测试发现(test discovery)时,可以自定义加载器的排序策略来控制测试执行顺序。以下展示如何根据方法名中的数字前缀或者自定义属性来排序。

import unittest class OrderedTestLoader(unittest.TestLoader): """按方法名前缀数字排序的加载器""" def getTestCaseNames(self, testCaseClass): names = super().getTestCaseNames(testCaseClass) # 提取数字前缀并排序 def sort_key(name): import re match = re.match(r'test_(\d+)', name) return int(match.group(1)) if match else 999 return sorted(names, key=sort_key) class TestPriorityOrder(unittest.TestCase): """优先级顺序测试""" def test_03_checkout(self): self.assertTrue(True) def test_01_add_to_cart(self): self.assertTrue(True) def test_02_apply_coupon(self): self.assertTrue(True) # 使用自定义加载器 loader = OrderedTestLoader() suite = loader.loadTestsFromTestCase(TestPriorityOrder) if __name__ == '__main__': runner = unittest.TextTestRunner(verbosity=2) runner.run(suite)

七、失败与错误处理

在unittest框架中,测试未通过有两种表现形式:断言失败(Failure)和异常错误(Error)。断言失败是指测试中的assert语句条件不满足,表示预期结果与实际结果不一致;异常错误是指测试执行过程中抛出了未捕获的异常,可能是代码bug或环境问题导致的崩溃。正确区分和处理这两种情况,对于测试结果分析和问题定位至关重要。

1. 断言失败与异常错误的区别

断言失败使用AssertionError,通常表示业务逻辑不符合预期;异常错误使用各种其他异常类型(TypeError、ValueError、ConnectionError等),通常表示代码本身存在缺陷或测试环境有问题。下面展示如何分别收集和展示两类问题。

import unittest class TestFailureVsError(unittest.TestCase): """展示断言失败与异常错误的区别""" def test_assertion_failure(self): """断言失败:预期值不匹配""" actual = add(2, 2) # 假设add函数返回了错误结果 self.assertEqual(actual, 5) # 触发断言失败 (Failure) def test_exception_error(self): """异常错误:代码执行异常""" data = None data['key'] = 'value' # 触发TypeError (Error) def test_mixed_scenario(self): """混合场景:同时检查多个条件""" result = process_data(15) # 第一个断言失败时,后续代码不执行 self.assertIsNotNone(result) self.assertEqual(result['status'], 'ok') # 若前一行失败,本行不执行 # 自定义结果处理 def analyze_test_result(result): print(f'\n测试总数: {result.testsRun}') print(f'断言失败: {len(result.failures)}') for test, traceback in result.failures: print(f' [FAIL] {test.id()}') print(f' 原因: 断言条件不满足') print(f'异常错误: {len(result.errors)}') for test, traceback in result.errors: print(f' [ERROR] {test.id()}') print(f' 原因: 代码执行异常') if result.wasSuccessful(): print('结果: 全部通过!') elif result.failures and not result.errors: print('结果: 存在断言失败,请检查预期值') elif result.errors and not result.failures: print('结果: 存在异常错误,请检查代码健壮性') else: print('结果: 同时存在断言失败和异常错误')

2. 错误详情收集与保存

在自动化测试执行中,自动收集和保存详细的错误信息对于问题复现和分析至关重要。以下示例展示如何将失败的测试详细信息(包括堆栈跟踪、测试数据和运行环境)保存到日志文件中。

import unittest import datetime import traceback import json class ErrorDetailCollector(unittest.TestResult): """错误详情收集器""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.failure_details = [] self.error_details = [] def addFailure(self, test, err): super().addFailure(test, err) self.failure_details.append({ 'test_id': test.id(), 'method': test._testMethodName, 'timestamp': datetime.datetime.now().isoformat(), 'traceback': ''.join(traceback.format_exception(*err)), 'type': 'AssertionError', }) def addError(self, test, err): super().addError(test, err) self.error_details.append({ 'test_id': test.id(), 'method': test._testMethodName, 'timestamp': datetime.datetime.now().isoformat(), 'traceback': ''.join(traceback.format_exception(*err)), 'type': type(err[1]).__name__ if err[1] else 'Unknown', }) def save_to_file(self, filepath='test_errors.json'): """保存错误详情到JSON文件""" all_issues = { 'failures': self.failure_details, 'errors': self.error_details, 'summary': { 'total': self.testsRun, 'failures': len(self.failure_details), 'errors': len(self.error_details), } } with open(filepath, 'w', encoding='utf-8') as f: json.dump(all_issues, f, ensure_ascii=False, indent=2) print(f'错误详情已保存至: {filepath}')

3. 测试失败自动重试

对于某些不稳定的测试(如网络相关的测试),首次失败可能是临时性问题。实现自动重试机制可以提高测试的可靠性,同时记录重试历史用于后续分析。

import unittest import functools def retry_on_failure(max_retries=3): """测试失败自动重试装饰器""" def decorator(test_method): @functools.wraps(test_method) def wrapper(self, *args, **kwargs): last_exception = None for attempt in range(1, max_retries + 1): try: return test_method(self, *args, **kwargs) except AssertionError as e: last_exception = e print(f'[重试] 第 {attempt} 次失败: {test_method.__name__}') if attempt < max_retries: self.setUp() # 重新初始化fixture raise last_exception return wrapper return decorator class TestWithRetry(unittest.TestCase): """带自动重试的测试类""" @retry_on_failure(max_retries=3) def test_flaky_network_call(self): """模拟不稳定的网络调用""" import random result = random.choice([True, False, False]) self.assertTrue(result, '网络请求临时失败')

八、TestLoader定制

TestLoader是unittest框架中负责发现和加载测试用例的核心组件。它提供了从模块、类或目录自动发现测试的能力。通过继承TestLoader并重写其方法,我们可以实现自定义的测试发现策略,如按文件名模式过滤、按测试类属性筛选、从特定目录结构加载等。定制TestLoader是构建大型测试框架的基础工作之一。

1. 自定义测试发现过滤

默认的test discovery会加载所有匹配`test*.py`的文件中的TestCase子类。通过自定义Loader,可以增加更精细的过滤条件,比如跳过某些标记为慢速的测试、只加载特定命名空间的测试等。

import unittest class FilteredTestLoader(unittest.TestLoader): """支持过滤的自定义TestLoader""" def __init__(self, include_tags=None, exclude_tags=None): super().__init__() self.include_tags = include_tags or set() self.exclude_tags = exclude_tags or set() def loadTestsFromTestCase(self, testCaseClass): if hasattr(testCaseClass, 'tags'): class_tags = set(testCaseClass.tags) # 排除包含排除标签的测试类 if self.exclude_tags and class_tags & self.exclude_tags: return unittest.TestSuite() # 包含指定标签的才加载 if self.include_tags and not (class_tags & self.include_tags): return unittest.TestSuite() return super().loadTestsFromTestCase(testCaseClass) def getTestCaseNames(self, testCaseClass): names = super().getTestCaseNames(testCaseClass) filtered = [] for name in names: method = getattr(testCaseClass, name) method_tags = getattr(method, 'tags', set()) if self.exclude_tags and method_tags & self.exclude_tags: continue if self.include_tags and not (method_tags & self.include_tags): continue filtered.append(name) return filtered # 用法示例:只加载冒烟测试 loader = FilteredTestLoader(include_tags={'smoke'}) smoke_suite = loader.discover('tests')

2. 模块级测试加载控制

当项目结构比较复杂时,需要精确控制从哪些模块加载测试。自定义Loader可以按模块的路径模式、模块内定义的配置变量等条件来决定是否加载该模块中的测试。

import unittest import importlib import pkgutil def load_tests_from_package(package_name, version_filter=None): """从包中动态加载测试,支持版本过滤""" loader = unittest.TestLoader() master_suite = unittest.TestSuite() package = importlib.import_module(package_name) path = getattr(package, '__path__', []) for importer, modname, ispkg in pkgutil.iter_modules(path): if not modname.startswith('test_'): continue module = importlib.import_module(f'{package_name}.{modname}') # 检查模块是否定义了版本要求 module_version = getattr(module, 'TEST_VERSION', None) if version_filter and module_version: if module_version != version_filter: print(f'跳过模块 {modname} (版本: {module_version})') continue suite = loader.loadTestsFromModule(module) master_suite.addTests(suite) print(f'已加载模块: {modname}, 测试数: {suite.countTestCases()}') return master_suite class SelectiveTestLoader(unittest.TestLoader): """选择性加载器:只加载指定命名空间的测试""" def loadTestsFromModule(self, module, *args, **kwargs): suite = super().loadTestsFromModule(module, *args, **kwargs) # 排除集成测试(除非显式要求) if getattr(module, 'TEST_TYPE', None) == 'integration': return unittest.TestSuite() return suite # 使用示例 if __name__ == '__main__': suite = SelectiveTestLoader().discover( 'tests', pattern='test_*.py', top_level_dir='.' ) runner = unittest.TextTestRunner(verbosity=2) runner.run(suite)

3. 自定义类加载器

除了过滤方法,还可以控制如何从TestCase类中提取测试方法。以下示例展示如何使用加载器只加载标记了特定装饰器的方法。

import unittest def smoke_test(func): """标记为冒烟测试的装饰器""" func._smoke = True return func class SmokeTestLoader(unittest.TestLoader): """只加载冒烟测试的Loader""" def getTestCaseNames(self, testCaseClass): all_names = super().getTestCaseNames(testCaseClass) smoke_names = [] for name in all_names: method = getattr(testCaseClass, name) if getattr(method, '_smoke', False): smoke_names.append(name) return smoke_names class MyAppTest(unittest.TestCase): @smoke_test def test_critical_login(self): self.assertTrue(True) def test_detail_page(self): self.assertTrue(True) @smoke_test def test_db_connection(self): self.assertTrue(True) # 使用SmokeTestLoader loader = SmokeTestLoader() suite = loader.loadTestsFromTestCase(MyAppTest) print(f'冒烟测试数量: {suite.countTestCases()}') # 输出: 2

九、实战案例

理论知识需要通过实践来巩固。本实战章节将综合运用前述的TestSuite、TestRunner和Fixture等高级特性,构建两个典型的真实测试场景:数据库CRUD操作测试和Web API接口测试。通过这些案例,你将学会如何在实际项目中组织和编写高质量的测试代码。

1. 数据库CRUD测试Fixtures

数据库测试是后端开发中最常见的测试场景之一。一个好的数据库测试方案需要管理数据库连接的生命周期、事务的隔离性以及测试数据的自动清理。以下示例展示如何使用模块级和类级Fixture搭建完整的数据库测试框架。

import unittest import sqlite3 import os # ---- 数据库CRUD测试 ---- TEST_DB_PATH = '/tmp/test_app.db' def setUpModule(): """模块级:创建测试数据库""" global _connection if os.path.exists(TEST_DB_PATH): os.remove(TEST_DB_PATH) _connection = sqlite3.connect(TEST_DB_PATH) _connection.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE, age INTEGER ) ''') _connection.commit() def tearDownModule(): """模块级:清理测试数据库""" global _connection if _connection: _connection.close() if os.path.exists(TEST_DB_PATH): os.remove(TEST_DB_PATH) class TestUserCRUD(unittest.TestCase): """用户CRUD操作测试""" @classmethod def setUpClass(cls): cls.conn = _connection # 每条测试前开启新事务 cls.conn.execute('BEGIN') @classmethod def tearDownClass(cls): # 每条测试后回滚事务,保证隔离性 cls.conn.execute('ROLLBACK') def setUp(self): self.cursor = self.conn.cursor() def test_create_user(self): """测试创建用户""" self.cursor.execute( 'INSERT INTO users (name, email, age) VALUES (?, ?, ?)', ('Alice', 'alice@example.com', 28) ) self.assertEqual(self.cursor.lastrowid, 1) def test_query_user(self): """测试查询用户""" # 插入测试数据 self.cursor.execute( 'INSERT INTO users (name, email, age) VALUES (?, ?, ?)', ('Bob', 'bob@example.com', 35) ) # 查询 self.cursor.execute( 'SELECT * FROM users WHERE email = ?', ('bob@example.com',) ) user = self.cursor.fetchone() self.assertIsNotNone(user) self.assertEqual(user[1], 'Bob') self.assertEqual(user[3], 35) def test_update_user(self): """测试更新用户""" self.cursor.execute( 'INSERT INTO users (name, email, age) VALUES (?, ?, ?)', ('Charlie', 'charlie@example.com', 25) ) self.cursor.execute( 'UPDATE users SET age = ? WHERE email = ?', (26, 'charlie@example.com') ) self.cursor.execute( 'SELECT age FROM users WHERE email = ?', ('charlie@example.com',) ) age = self.cursor.fetchone()[0] self.assertEqual(age, 26) def test_delete_user(self): """测试删除用户""" self.cursor.execute( 'INSERT INTO users (name, email, age) VALUES (?, ?, ?)', ('Diana', 'diana@example.com', 30) ) self.cursor.execute( 'DELETE FROM users WHERE email = ?', ('diana@example.com',) ) self.cursor.execute( 'SELECT COUNT(*) FROM users WHERE email = ?', ('diana@example.com',) ) count = self.cursor.fetchone()[0] self.assertEqual(count, 0) # 创建数据库CRUD测试套件 def db_test_suite(): loader = unittest.TestLoader() return loader.loadTestsFromTestCase(TestUserCRUD)

2. Web API测试套件组织

Web API测试涉及请求发送、响应验证、认证鉴权和异常处理等多个方面。通过精心组织的测试套件和自定义Runner,可以构建清晰且可维护的API测试体系。以下示例演示如何使用测试套件分层组织不同粒度的API测试。

import unittest import json from unittest.mock import patch, Mock class TestUserAPI(unittest.TestCase): """用户API测试""" def setUp(self): self.base_url = 'https://api.example.com/v1' self.headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer test_token' } @patch('requests.post') def test_user_registration(self, mock_post): """测试用户注册API""" mock_response = Mock() mock_response.status_code = 201 mock_response.json.return_value = { 'id': 1001, 'name': 'new_user', 'token': 'jwt_token_xxx' } mock_post.return_value = mock_response # 模拟调用 payload = {'username': 'new_user', 'password': 'SecurePass123'} response = mock_post( f'{self.base_url}/register', headers=self.headers, json=payload ) self.assertEqual(response.status_code, 201) data = response.json() self.assertIn('token', data) self.assertEqual(data['name'], 'new_user') @patch('requests.get') def test_get_user_profile(self, mock_get): """测试获取用户资料""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'id': 1001, 'name': 'Alice', 'email': 'alice@example.com', 'role': 'admin' } mock_get.return_value = mock_response response = mock_get( f'{self.base_url}/users/1001', headers=self.headers ) self.assertEqual(response.status_code, 200) data = response.json() self.assertEqual(data['role'], 'admin') @patch('requests.get') def test_user_not_found(self, mock_get): """测试用户不存在场景""" mock_response = Mock() mock_response.status_code = 404 mock_response.json.return_value = { 'error': 'user_not_found', 'message': '用户不存在' } mock_get.return_value = mock_response response = mock_get( f'{self.base_url}/users/99999', headers=self.headers ) self.assertEqual(response.status_code, 404) data = response.json() self.assertEqual(data['error'], 'user_not_found') class TestOrderAPI(unittest.TestCase): """订单API测试""" @patch('requests.post') def test_create_order(self, mock_post): """测试创建订单""" mock_response = Mock() mock_response.status_code = 201 mock_response.json.return_value = { 'order_id': 'ORD_20260301_001', 'status': 'pending', 'total_amount': 299.99 } mock_post.return_value = mock_response response = mock_post( 'https://api.example.com/v1/orders', json={'user_id': 1001, 'items': [{'product_id': 'P001', 'qty': 2}]} ) self.assertEqual(response.status_code, 201) data = response.json() self.assertIn('order_id', data) @patch('requests.post') def test_create_order_invalid_payload(self, mock_post): """测试无效订单""" mock_response = Mock() mock_response.status_code = 400 mock_response.json.return_value = { 'error': 'validation_error', 'message': '缺少必填字段: items' } mock_post.return_value = mock_response response = mock_post( 'https://api.example.com/v1/orders', json={'user_id': 1001} ) self.assertEqual(response.status_code, 400) # 构建分层API测试套件 def api_test_suite(): """构建完整的API测试套件""" loader = unittest.TestLoader() # 用户API套件 user_suite = loader.loadTestsFromTestCase(TestUserAPI) # 订单API套件 order_suite = loader.loadTestsFromTestCase(TestOrderAPI) # 完整的API测试套件 api_suite = unittest.TestSuite([user_suite, order_suite]) return api_suite if __name__ == '__main__': # 使用自定义Runner执行 suite = api_test_suite() runner = TimedTestRunner(verbosity=2) result = runner.run(suite) print(f'API测试完成: 通过={result.testsRun - len(result.failures) - len(result.errors)}, ' f'失败={len(result.failures)}, 错误={len(result.errors)}')

3. 综合测试调度器

在实际项目中,通常需要同时执行多种类型的测试并按需生成报告。以下示例展示如何创建一个综合调度器,将不同测试套件组装在一起统一执行。

import unittest import sys class TestScheduler: """综合测试调度器""" def __init__(self): self.suites = {} def register_suite(self, name, suite_fn, tags=None): """注册测试套件""" self.suites[name] = { 'builder': suite_fn, 'tags': tags or [], 'suite': None } def build_all(self): """构建所有注册的套件""" for name, info in self.suites.items(): info['suite'] = info['builder']() print(f'已构建套件: {name} ({info["suite"].countTestCases()} 条测试)') def run_by_tags(self, tags): """按标签运行测试""" master = unittest.TestSuite() for name, info in self.suites.items(): if any(tag in info['tags'] for tag in tags): if info['suite']: master.addTests(info['suite']) runner = unittest.TextTestRunner(verbosity=2) return runner.run(master) def run_all(self, runner_cls=None): """运行所有测试""" master = unittest.TestSuite() for info in self.suites.values(): if info['suite']: master.addTests(info['suite']) runner = (runner_cls or unittest.TextTestRunner)(verbosity=2) return runner.run(master) # 使用示例 if __name__ == '__main__': scheduler = TestScheduler() scheduler.register_suite('db_crud', db_test_suite, tags=['database', 'core']) scheduler.register_suite('api_tests', api_test_suite, tags=['api', 'integration']) scheduler.build_all() # 运行所有测试 result = scheduler.run_all() sys.exit(0 if result.wasSuccessful() else 1)