专题:Python 测试与调试系统学习
关键词:Python, 测试, 调试, unittest, TestSuite, TestRunner, Fixture, 测试框架, Python测试
一、TestSuite高级用法
TestSuite是unittest框架中用于组织和聚合测试用例的核心容器。基础的TestSuite用法是简单地将TestCase子类组合在一起执行,但在实际项目中我们需要更精细的控制能力。高级TestSuite用法包括按标签组合测试、按模块分层组合、创建嵌套的TestSuite结构、根据运行条件动态加载测试、以及精确控制测试的执行顺序等。掌握这些高级技巧可以让你在组织大型测试项目时事半功倍。
1. 按标签组合测试
在大型项目中,测试可能被标记为"smoke"、"regression"、"slow"、"fast"等不同类别。通过为测试方法添加标签属性,可以有选择性地组合执行特定类别的测试,这在CI/CD流水线中尤为重要。
import unittest
class UserTests(unittest.TestCase):
def test_login_success(self):
self.tag = "smoke"
# ... 测试逻辑
def test_login_failure(self):
self.tag = "regression"
# ... 测试逻辑
def test_password_reset(self):
self.tag = "slow"
# ... 测试逻辑
def suite_by_tag(tag_name):
"""根据标签动态构建TestSuite"""
suite = unittest.TestSuite()
loader = unittest.TestLoader()
# 加载模块中所有TestCase
all_tests = loader.loadTestsFromTestCase(UserTests)
for test in all_tests:
# 获取测试方法并检查标签
test_method = test._testMethodName
method = getattr(UserTests, test_method)
if hasattr(method, 'tag') and method.tag == tag_name:
suite.addTest(test)
return suite
2. 嵌套TestSuite与模块组合
实际项目中的测试结构往往是多层次的。unittest的TestSuite支持嵌套组合,允许你按模块、按功能域或按子系统层级组织测试。这种方式使得你可以灵活地运行不同粒度的测试集合。
import unittest
from tests import test_user, test_order, test_payment
def build_master_suite():
"""构建分层嵌套的TestSuite"""
loader = unittest.TestLoader()
# 用户模块测试套件
user_suite = unittest.TestSuite()
user_suite.addTests(loader.loadTestsFromModule(test_user))
# 订单模块测试套件
order_suite = unittest.TestSuite()
order_suite.addTests(loader.loadTestsFromModule(test_order))
# 支付模块测试套件
payment_suite = unittest.TestSuite()
payment_suite.addTests(loader.loadTestsFromModule(test_payment))
# 业务模块主套件
biz_suite = unittest.TestSuite([user_suite, order_suite])
# 顶层主套件(合并所有)
master_suite = unittest.TestSuite([biz_suite, payment_suite])
return master_suite
if __name__ == '__main__':
runner = unittest.TextTestRunner(verbosity=2)
runner.run(build_master_suite())
3. 执行顺序控制
unittest默认按方法名的字典序执行测试。当测试之间存在隐式依赖时(如需要先注册再登录),可以通过TestSuite的addTest顺序精确控制执行流。
import unittest
class TestUserFlow(unittest.TestCase):
def test_01_register(self):
print("1. 用户注册")
self.assertTrue(True)
def test_02_login(self):
print("2. 用户登录")
self.assertTrue(True)
def test_03_update_profile(self):
print("3. 更新资料")
self.assertTrue(True)
def test_04_delete_account(self):
print("4. 删除账号")
self.assertTrue(True)
def ordered_suite():
"""按自定义顺序执行测试"""
suite = unittest.TestSuite()
# 按业务流程顺序添加,不受方法名限制
suite.addTest(TestUserFlow('test_01_register'))
suite.addTest(TestUserFlow('test_02_login'))
suite.addTest(TestUserFlow('test_04_delete_account')) # 跳过中间步骤
suite.addTest(TestUserFlow('test_03_update_profile'))
return suite
二、自定义TestRunner
unittest内置的TextTestRunner会在控制台输出测试结果,但在自动化测试平台、持续集成系统或需要定制化报告的场景中,默认的Runner无法满足需求。通过继承TextTestRunner或实现TestRunner接口,我们可以完全控制测试的执行过程、输出格式、进度展示和结果收集方式。自定义TestRunner是现代测试基础设施建设的核心环节。
1. 自定义输出格式与进度显示
以下示例展示如何创建一个带彩色输出和实时进度条的自定义Runner,它比默认的TextTestRunner提供更直观的反馈。
import unittest
import time
import sys
class ProgressTestRunner(unittest.TextTestRunner):
"""带进度显示的自定义TestRunner"""
def run(self, test):
# 预先统计测试总数
result = self._makeResult()
total = test.countTestCases()
executed = [0]
# 包装result以拦截每个测试执行
original_startTest = result.startTest
original_addSuccess = result.addSuccess
original_addFailure = result.addFailure
def startTest_wrapper(test):
executed[0] += 1
percent = executed[0] / total * 100
bar = '#' * int(percent / 5) + '-' * (20 - int(percent / 5))
sys.stdout.write(f'\r进度: [{bar}] {percent:.1f}% ({executed[0]}/{total})')
sys.stdout.flush()
original_startTest(test)
result.startTest = startTest_wrapper
sys.stdout.write('\n开始执行测试...\n')
start = time.time()
test(result)
elapsed = time.time() - start
sys.stdout.write(f'\n\n总耗时: {elapsed:.2f}s\n')
return result
2. HTML报告生成器
在企业级测试框架中,生成HTML格式的测试报告是常见需求。下面展示如何通过自定义TestRunner将测试结果输出为结构化的HTML报告。
import unittest
from datetime import datetime
class HtmlReportRunner(unittest.TextTestRunner):
"""生成HTML测试报告的自定义Runner"""
def __init__(self, report_path='test_report.html', *args, **kwargs):
super().__init__(*args, **kwargs)
self.report_path = report_path
def run(self, test):
result = super().run(test)
# 收集统计数据
total = result.testsRun
successes = total - len(result.failures) - len(result.errors)
failures = len(result.failures)
errors = len(result.errors)
# 生成HTML报告
html = f'''<!DOCTYPE html>
<html><head><title>测试报告</title></head><body>
<h1>测试执行报告</h1>
<p>执行时间: {datetime.now()}</p>
<table border="1">
<tr><th>总数</th><th>通过</th><th>失败</th><th>错误</th></tr>
<tr>
<td>{total}</td><td>{successes}</td>
<td>{failures}</td><td>{errors}</td>
</tr></table></body></html>'''
with open(self.report_path, 'w', encoding='utf-8') as f:
f.write(html)
print(f'\n报告已生成: {self.report_path}')
return result
3. 结果收集与汇总
在分布式测试或批量测试场景中,需要将多个Runner的测试结果汇总分析。以下展示如何创建一个结果收集器Runner。
import unittest
class CollectingTestRunner:
"""收集测试结果的Runner(不输出到控制台)"""
def __init__(self):
self.results = []
def run(self, test_or_suite):
suite = unittest.TestSuite()
suite.addTest(test_or_suite)
# 使用StringIO捕获输出
import io
stream = io.StringIO()
runner = unittest.TextTestRunner(stream=stream, verbosity=0)
result = runner.run(suite)
self.results.append({
'tests_run': result.testsRun,
'failures': len(result.failures),
'errors': len(result.errors),
'was_successful': result.wasSuccessful(),
'output': stream.getvalue(),
})
return result
def summary(self):
total = sum(r['tests_run'] for r in self.results)
failed = sum(r['failures'] for r in self.results)
errors = sum(r['errors'] for r in self.results)
return {
'total': total,
'failed': failed,
'errors': errors,
'success_rate': (total - failed - errors) / total * 100 if total else 0
}
# 使用示例
collector = CollectingTestRunner()
collector.run(unittest.TestLoader().loadTestsFromTestCase(UserTests))
print(collector.summary())
三、Fixture深度管理
Fixture是unittest框架中用于准备测试环境和清理测试资源的机制。理解Fixture的不同作用域以及如何正确处理Fixture中的异常和依赖关系,是编写健壮测试的关键。unittest提供了三个级别的Fixture:模块级(setUpModule/tearDownModule)、类级(setUpClass/tearDownClass)和方法级(setUp/tearDown)。在实际项目中还需要管理数据库连接、文件句柄、网络服务等外部资源的生命周期。
1. Fixture作用域详解
不同作用域的Fixture决定了资源何时初始化、何时清理。正确选择Fixture作用域可以大幅提升测试效率。模块级Fixture在整个模块执行前后运行一次,适合初始化数据库连接池、加载配置文件等重量级操作。类级Fixture在类的所有测试前后运行,适合创建共享对象。方法级Fixture在每条测试前后运行,保证测试隔离性。
import unittest
import os
def setUpModule():
"""模块级Fixture:整个模块执行前运行一次"""
print('\n[setUpModule] 初始化测试数据库连接')
os.environ['TEST_MODE'] = 'true'
def tearDownModule():
"""模块级Fixture:整个模块执行后运行一次"""
print('\n[tearDownModule] 关闭测试数据库连接')
os.environ.pop('TEST_MODE', None)
class TestDatabaseFixture(unittest.TestCase):
db_connection = None
@classmethod
def setUpClass(cls):
"""类级Fixture:类所有测试前运行一次"""
print('\n[setUpClass] 创建数据库表结构')
cls.db_connection = {'host': 'localhost', 'db': 'test', 'connected': True}
@classmethod
def tearDownClass(cls):
"""类级Fixture:类所有测试后运行一次"""
print('\n[tearDownClass] 删除数据库表结构')
cls.db_connection = None
def setUp(self):
"""方法级Fixture:每条测试前运行"""
print(f'\n[setUp] 准备测试数据 - {self._testMethodName}')
self.test_data = {'id': 1, 'name': 'test_user'}
def tearDown(self):
"""方法级Fixture:每条测试后运行"""
print(f'\n[tearDown] 清理测试数据 - {self._testMethodName}')
self.test_data = None
def test_insert(self):
self.assertIsNotNone(self.db_connection)
self.assertEqual(self.test_data['name'], 'test_user')
def test_query(self):
self.assertTrue(self.db_connection['connected'])
2. Fixture异常处理
Fixture中的异常处理容易被忽视。如果setUp方法抛出异常,对应的tearDown方法将不会被执行,这可能导致资源泄露。因此,在Fixture中必须妥善处理可能出现的异常,确保资源在任何情况下都能被正确释放。
import unittest
class SafeFixtureTest(unittest.TestCase):
"""安全的Fixture异常处理示例"""
def setUp(self):
self.resources = []
try:
# 可能失败的外部资源
self.db = self._connect_db()
self.resources.append('db')
self.file = open('/tmp/test_data.txt', 'w')
self.resources.append('file')
except Exception as e:
# 清理已成功分配的资源
self._cleanup_resources()
raise RuntimeError(f'Fixture初始化失败: {e}')
def tearDown(self):
"""保证无论setUp是否成功,资源都被清理"""
self._cleanup_resources()
def _cleanup_resources(self):
for resource in self.resources[:]:
try:
if resource == 'db' and hasattr(self, 'db'):
self.db.close()
elif resource == 'file' and hasattr(self, 'file'):
self.file.close()
self.resources.remove(resource)
except Exception as e:
print(f'清理资源 {resource} 时出错: {e}')
def _connect_db(self):
# 模拟数据库连接
import socket
s = socket.socket()
s.connect(('localhost', 3306))
return s
def test_example(self):
self.assertEqual(len(self.resources), 2)
3. 外部资源生命周期管理
管理数据库、文件、网络连接等外部资源时,Fixtures需要处理创建、验证、清理全生命周期。推荐使用上下文管理器风格的资源管理方式,确保资源在测试结束后立即释放。
import unittest
import tempfile
import os
import json
class ResourceManagerTest(unittest.TestCase):
"""外部资源管理最佳实践"""
temp_dir = None
@classmethod
def setUpClass(cls):
"""创建临时目录"""
cls.temp_dir = tempfile.mkdtemp()
@classmethod
def tearDownClass(cls):
"""清理临时目录及其所有文件"""
if cls.temp_dir and os.path.exists(cls.temp_dir):
import shutil
shutil.rmtree(cls.temp_dir)
def setUp(self):
"""创建测试配置文件"""
self.config = {
'debug': True,
'max_retries': 3,
'timeout': 30
}
self.config_file = os.path.join(self.temp_dir, 'config.json')
with open(self.config_file, 'w') as f:
json.dump(self.config, f)
def tearDown(self):
"""删除测试配置文件"""
if os.path.exists(self.config_file):
os.remove(self.config_file)
def test_config_loading(self):
"""测试配置文件加载"""
with open(self.config_file, 'r') as f:
loaded = json.load(f)
self.assertEqual(loaded['debug'], True)
self.assertEqual(loaded['timeout'], 30)
def test_config_update(self):
"""测试配置更新"""
self.config['debug'] = False
with open(self.config_file, 'w') as f:
json.dump(self.config, f)
with open(self.config_file, 'r') as f:
loaded = json.load(f)
self.assertFalse(loaded['debug'])
四、测试监听器与自定义TestResult
unittest框架提供了TestResult类来收集和记录测试执行过程中的各种事件。通过继承TestResult并重写其回调方法(addSuccess、addFailure、addError、addSkip等),我们可以创建自定义的测试监听器,实现对测试执行全过程的监控。这在测试报告定制、失败截图、日志采集和实时通知等场景中非常有用。
1. 自定义TestResult监听器
以下示例展示如何创建一个自定义的TestResult,它会在每次测试事件发生时记录详细信息,包括执行时间、失败截图和日志内容。
import unittest
import time
import sys
class DetailedTestResult(unittest.TestResult):
"""详细的测试结果收集器"""
def __init__(self, stream=None, descriptions=None, verbosity=None):
super().__init__(stream, descriptions, verbosity)
self.test_details = []
self.start_times = {}
def startTest(self, test):
"""测试开始时记录时间"""
self.start_times[test.id()] = time.time()
super().startTest(test)
def addSuccess(self, test):
"""测试通过时记录"""
elapsed = time.time() - self.start_times.pop(test.id(), time.time())
self.test_details.append({
'test': test.id(),
'status': 'PASS',
'elapsed': f'{elapsed:.3f}s',
'timestamp': time.strftime('%H:%M:%S')
})
super().addSuccess(test)
def addFailure(self, test, err):
"""测试失败时记录详细信息"""
elapsed = time.time() - self.start_times.pop(test.id(), time.time())
self.test_details.append({
'test': test.id(),
'status': 'FAIL',
'elapsed': f'{elapsed:.3f}s',
'error_info': self._exc_info_to_string(err, test),
'timestamp': time.strftime('%H:%M:%S')
})
# 可在此处触发截图
self._capture_screenshot(test)
super().addFailure(test, err)
def addError(self, test, err):
"""测试错误时记录"""
elapsed = time.time() - self.start_times.pop(test.id(), time.time())
self.test_details.append({
'test': test.id(),
'status': 'ERROR',
'elapsed': f'{elapsed:.3f}s',
'error_info': self._exc_info_to_string(err, test),
'timestamp': time.strftime('%H:%M:%S')
})
super().addError(test, err)
def _capture_screenshot(self, test):
"""模拟截图保存"""
print(f'\n[截图保存] {test.id()}_failure.png')
# 实际项目中可调用 Selenium 等工具截图
def print_summary(self):
"""打印汇总报告"""
print('\n===== 测试执行报告 =====')
for detail in self.test_details:
status_color = '✓' if detail['status'] == 'PASS' else '✗'
print(f'{status_color} {detail["test"]} - {detail["status"]} ({detail["elapsed"]})')
print(f'\n总计: {self.testsRun} | '
f'通过: {self.testsRun - len(self.failures) - len(self.errors)} | '
f'失败: {len(self.failures)} | 错误: {len(self.errors)}')
2. 基于TestResult的计时器Runner
将自定义TestResult与TestRunner集成,可以创建功能完整的计时测试运行器,清晰展示每条测试的耗时,帮助快速定位性能瓶颈。
import unittest
class TimedTestRunner(unittest.TextTestRunner):
"""带执行时间统计的TestRunner"""
def _makeResult(self):
return DetailedTestResult(
self.stream, self.descriptions, self.verbosity
)
def run(self, test):
result = super().run(test)
result.print_summary()
return result
# 使用示例
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(MyTestCase)
runner = TimedTestRunner(verbosity=0)
runner.run(suite)
3. 测试事件钩子系统
在企业级测试框架中,事件钩子机制允许在不侵入测试代码的前提下插入横切关注点,比如测试执行前后的日志记录、性能监控和外部系统通知。
import unittest
from datetime import datetime
class TestHookMixin:
"""测试钩子混入类"""
@classmethod
def setUpClass(cls):
"""钩子:测试类执行前"""
print(f'\n[{datetime.now()}] 开始测试类: {cls.__name__}')
cls._test_count = 0
cls._failure_count = 0
super().setUpClass()
def setUp(self):
"""钩子:每条测试前"""
self._test_count += 1
self._test_start = datetime.now()
print(f'\n[{self._test_start}] 执行: {self._testMethodName}')
super().setUp()
def tearDown(self):
"""钩子:每条测试后"""
elapsed = (datetime.now() - self._test_start).total_seconds()
has_failed = any(
test == self for test, _ in self._outcome.errors
if hasattr(self, '_outcome')
)
status = 'FAIL' if has_failed else 'PASS'
print(f'[{datetime.now()}] 完成: {self._testMethodName} [{status}] ({elapsed:.2f}s)')
super().tearDown()
class MyHookedTest(TestHookMixin, unittest.TestCase):
"""使用钩子的测试类"""
def test_a(self):
self.assertEqual(1, 1)
def test_b(self):
self.assertEqual(2, 2)
五、测试超时控制
在测试中,某些操作可能因为网络延迟、资源争用或死循环而无限期挂起。为测试添加超时控制可以防止测试进程被卡死,确保整个测试套件能在合理时间内完成。Python中实现测试超时主要有信号机制和线程机制两种方式。信号方式效率高但只能在主线程中使用,线程方式通用性强但有一些额外开销。理解两种方式的优劣,能够根据场景选择合适的超时策略。
1. 基于信号的超时控制
signal模块提供了一种高效的超时实现方式,它利用操作系统信号在指定时间后中断当前执行。这种方法不会创建额外线程,性能开销极小。但需要注意的是,signal模块在Windows平台支持有限,且不能在非主线程中使用。
import unittest
import signal
class TimeoutError(AssertionError):
"""自定义超时异常"""
pass
def timeout(seconds):
"""基于信号量的超时装饰器"""
def decorator(func):
def handler(signum, frame):
raise TimeoutError(f'测试执行超时 ({seconds}s)')
def wrapper(*args, **kwargs):
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
return result
finally:
signal.alarm(0) # 取消定时器
signal.signal(signal.SIGALRM, old_handler)
return wrapper
return decorator
class TestWithSignalTimeout(unittest.TestCase):
@timeout(2)
def test_fast_operation(self):
"""2秒内应完成的快速操作"""
import time
time.sleep(1)
self.assertTrue(True)
@timeout(1)
def test_slow_operation(self):
"""超过1秒会触发的超时测试"""
import time
time.sleep(3) # 将会超时
self.assertTrue(True)
2. 基于线程的超时控制
线程方式的超时控制通过在新线程中执行测试函数,主线程等待指定时间后强制终止。这种方式不依赖操作系统信号,在所有平台上都能工作,非常适合测试网络请求等阻塞操作。
import unittest
import threading
import functools
class ThreadTimeoutError(Exception):
pass
def thread_timeout(seconds):
"""基于线程的超时装饰器(跨平台)"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = [None]
exception = [None]
finished = threading.Event()
def target():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
exception[0] = e
finally:
finished.set()
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
if not finished.wait(timeout=seconds):
raise ThreadTimeoutError(
f'测试在 {seconds}s 内未完成'
)
if exception[0]:
raise exception[0]
return result[0]
return wrapper
return decorator
class TestWithThreadTimeout(unittest.TestCase):
@thread_timeout(3)
def test_api_call(self):
"""模拟带有超时的API调用"""
import time
time.sleep(2)
self.assertEqual(200, 200)
@thread_timeout(2)
def test_hanging_call(self):
"""模拟挂起的调用(将会超时)"""
import time
while True:
time.sleep(1)
3. 超时后清理机制
超时发生时,测试资源可能处于不一致状态。必须建立超时后的清理机制,确保已创建的资源被正确释放,避免影响后续测试的执行。
import unittest
import threading
import time
class SafeTimeoutRunner:
"""带超时清理的测试执行器"""
def run_with_cleanup(self, test_case, timeout_seconds=5):
test_thread = threading.Thread(target=test_case, daemon=True)
test_thread.start()
test_thread.join(timeout_seconds)
if test_thread.is_alive():
print(f'[清理] 测试 {test_case._testMethodName} 超时,执行清理')
# 执行紧急清理操作
self._emergency_cleanup(test_case)
raise TimeoutError(f'测试超时 ({timeout_seconds}s)')
def _emergency_cleanup(self, test_case):
"""紧急清理:关闭可能打开的资源"""
if hasattr(test_case, 'db') and test_case.db:
try:
test_case.db.close()
print('[清理] 数据库连接已关闭')
except Exception:
pass
if hasattr(test_case, 'file_obj') and test_case.file_obj:
try:
test_case.file_obj.close()
print('[清理] 文件句柄已关闭')
except Exception:
pass
六、测试依赖与执行顺序
在理想情况下,每条测试应该是独立的,可以以任意顺序执行。但在实际项目中,某些测试天然存在依赖关系,例如需要先创建用户才能测试登录,需要先有订单才能测试退款。unittest框架本身不提供原生的测试依赖支持,但通过TestSuite的精确控制以及第三方库的帮助,我们可以实现有序测试、依赖测试以及失败后自动跳过后续测试的能力。
1. 有序测试实现
通过自定义TestSuite并按特定顺序添加测试方法,可以精确控制测试的执行流程。配合类属性记录状态,可以在不同测试方法间传递数据。
import unittest
class TestWorkflow(unittest.TestCase):
"""业务流程测试:步骤间传递状态"""
shared_state = {}
def test_01_create_user(self):
"""步骤1:创建用户"""
user_id = 1001
self.shared_state['user_id'] = user_id
self.shared_state['user_created'] = True
print(f'用户创建成功, ID: {user_id}')
self.assertTrue(self.shared_state['user_created'])
def test_02_create_order(self):
"""步骤2:创建订单(依赖用户)"""
self.assertTrue(
self.shared_state.get('user_created', False),
'必须先创建用户'
)
order_id = 50001
self.shared_state['order_id'] = order_id
self.shared_state['order_created'] = True
print(f'订单创建成功, ID: {order_id}')
self.assertTrue(self.shared_state['order_created'])
def test_03_process_payment(self):
"""步骤3:处理支付(依赖用户和订单)"""
self.assertTrue(self.shared_state.get('user_created'))
self.assertTrue(self.shared_state.get('order_created'))
payment_id = 'PAY_20260301_001'
self.shared_state['payment_id'] = payment_id
print(f'支付成功, 支付ID: {payment_id}')
self.assertEqual(len(payment_id), 19)
def test_04_refund(self):
"""步骤4:退款(依赖支付)"""
self.assertIn('payment_id', self.shared_state)
refund_amount = 99.99
print(f'退款成功, 金额: {refund_amount}')
self.assertGreater(refund_amount, 0)
# 按流程顺序执行
def workflow_suite():
suite = unittest.TestSuite()
suite.addTest(TestWorkflow('test_01_create_user'))
suite.addTest(TestWorkflow('test_02_create_order'))
suite.addTest(TestWorkflow('test_03_process_payment'))
suite.addTest(TestWorkflow('test_04_refund'))
return suite
2. 依赖装饰器实现
使用装饰器可以更优雅地表达测试之间的依赖关系。当被依赖的测试失败时,依赖它的测试自动跳过并标注原因,而不是以错误或失败的状态呈现。
import unittest
def depends_on(dep_test_name):
"""依赖装饰器:指定依赖的测试方法名"""
def decorator(test_method):
def wrapper(self, *args, **kwargs):
outcome = self._outcome
if hasattr(outcome, 'errors'):
# 检查依赖测试是否失败
for test, error in outcome.errors + outcome.failures:
if test._testMethodName == dep_test_name:
self.skipTest(
f'依赖测试 {dep_test_name} 未通过'
)
return test_method(self, *args, **kwargs)
return wrapper
return decorator
class TestWithDependency(unittest.TestCase):
def test_login(self):
"""登录测试"""
self.assertEqual(200, 200) # 模拟登录成功
@depends_on('test_login')
def test_get_profile(self):
"""获取用户资料(依赖登录)"""
# 只有在登录成功后才会执行
profile = {'name': 'Alice', 'role': 'admin'}
self.assertEqual(profile['role'], 'admin')
@depends_on('test_get_profile')
def test_update_profile(self):
"""更新资料(依赖获取资料)"""
# 如果前序失败会自动跳过
self.assertTrue(True)
3. 测试排序策略
在使用测试发现(test discovery)时,可以自定义加载器的排序策略来控制测试执行顺序。以下展示如何根据方法名中的数字前缀或者自定义属性来排序。
import unittest
class OrderedTestLoader(unittest.TestLoader):
"""按方法名前缀数字排序的加载器"""
def getTestCaseNames(self, testCaseClass):
names = super().getTestCaseNames(testCaseClass)
# 提取数字前缀并排序
def sort_key(name):
import re
match = re.match(r'test_(\d+)', name)
return int(match.group(1)) if match else 999
return sorted(names, key=sort_key)
class TestPriorityOrder(unittest.TestCase):
"""优先级顺序测试"""
def test_03_checkout(self):
self.assertTrue(True)
def test_01_add_to_cart(self):
self.assertTrue(True)
def test_02_apply_coupon(self):
self.assertTrue(True)
# 使用自定义加载器
loader = OrderedTestLoader()
suite = loader.loadTestsFromTestCase(TestPriorityOrder)
if __name__ == '__main__':
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
七、失败与错误处理
在unittest框架中,测试未通过有两种表现形式:断言失败(Failure)和异常错误(Error)。断言失败是指测试中的assert语句条件不满足,表示预期结果与实际结果不一致;异常错误是指测试执行过程中抛出了未捕获的异常,可能是代码bug或环境问题导致的崩溃。正确区分和处理这两种情况,对于测试结果分析和问题定位至关重要。
1. 断言失败与异常错误的区别
断言失败使用AssertionError,通常表示业务逻辑不符合预期;异常错误使用各种其他异常类型(TypeError、ValueError、ConnectionError等),通常表示代码本身存在缺陷或测试环境有问题。下面展示如何分别收集和展示两类问题。
import unittest
class TestFailureVsError(unittest.TestCase):
"""展示断言失败与异常错误的区别"""
def test_assertion_failure(self):
"""断言失败:预期值不匹配"""
actual = add(2, 2) # 假设add函数返回了错误结果
self.assertEqual(actual, 5) # 触发断言失败 (Failure)
def test_exception_error(self):
"""异常错误:代码执行异常"""
data = None
data['key'] = 'value' # 触发TypeError (Error)
def test_mixed_scenario(self):
"""混合场景:同时检查多个条件"""
result = process_data(15)
# 第一个断言失败时,后续代码不执行
self.assertIsNotNone(result)
self.assertEqual(result['status'], 'ok') # 若前一行失败,本行不执行
# 自定义结果处理
def analyze_test_result(result):
print(f'\n测试总数: {result.testsRun}')
print(f'断言失败: {len(result.failures)}')
for test, traceback in result.failures:
print(f' [FAIL] {test.id()}')
print(f' 原因: 断言条件不满足')
print(f'异常错误: {len(result.errors)}')
for test, traceback in result.errors:
print(f' [ERROR] {test.id()}')
print(f' 原因: 代码执行异常')
if result.wasSuccessful():
print('结果: 全部通过!')
elif result.failures and not result.errors:
print('结果: 存在断言失败,请检查预期值')
elif result.errors and not result.failures:
print('结果: 存在异常错误,请检查代码健壮性')
else:
print('结果: 同时存在断言失败和异常错误')
2. 错误详情收集与保存
在自动化测试执行中,自动收集和保存详细的错误信息对于问题复现和分析至关重要。以下示例展示如何将失败的测试详细信息(包括堆栈跟踪、测试数据和运行环境)保存到日志文件中。
import unittest
import datetime
import traceback
import json
class ErrorDetailCollector(unittest.TestResult):
"""错误详情收集器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.failure_details = []
self.error_details = []
def addFailure(self, test, err):
super().addFailure(test, err)
self.failure_details.append({
'test_id': test.id(),
'method': test._testMethodName,
'timestamp': datetime.datetime.now().isoformat(),
'traceback': ''.join(traceback.format_exception(*err)),
'type': 'AssertionError',
})
def addError(self, test, err):
super().addError(test, err)
self.error_details.append({
'test_id': test.id(),
'method': test._testMethodName,
'timestamp': datetime.datetime.now().isoformat(),
'traceback': ''.join(traceback.format_exception(*err)),
'type': type(err[1]).__name__ if err[1] else 'Unknown',
})
def save_to_file(self, filepath='test_errors.json'):
"""保存错误详情到JSON文件"""
all_issues = {
'failures': self.failure_details,
'errors': self.error_details,
'summary': {
'total': self.testsRun,
'failures': len(self.failure_details),
'errors': len(self.error_details),
}
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(all_issues, f, ensure_ascii=False, indent=2)
print(f'错误详情已保存至: {filepath}')
3. 测试失败自动重试
对于某些不稳定的测试(如网络相关的测试),首次失败可能是临时性问题。实现自动重试机制可以提高测试的可靠性,同时记录重试历史用于后续分析。
import unittest
import functools
def retry_on_failure(max_retries=3):
"""测试失败自动重试装饰器"""
def decorator(test_method):
@functools.wraps(test_method)
def wrapper(self, *args, **kwargs):
last_exception = None
for attempt in range(1, max_retries + 1):
try:
return test_method(self, *args, **kwargs)
except AssertionError as e:
last_exception = e
print(f'[重试] 第 {attempt} 次失败: {test_method.__name__}')
if attempt < max_retries:
self.setUp() # 重新初始化fixture
raise last_exception
return wrapper
return decorator
class TestWithRetry(unittest.TestCase):
"""带自动重试的测试类"""
@retry_on_failure(max_retries=3)
def test_flaky_network_call(self):
"""模拟不稳定的网络调用"""
import random
result = random.choice([True, False, False])
self.assertTrue(result, '网络请求临时失败')
八、TestLoader定制
TestLoader是unittest框架中负责发现和加载测试用例的核心组件。它提供了从模块、类或目录自动发现测试的能力。通过继承TestLoader并重写其方法,我们可以实现自定义的测试发现策略,如按文件名模式过滤、按测试类属性筛选、从特定目录结构加载等。定制TestLoader是构建大型测试框架的基础工作之一。
1. 自定义测试发现过滤
默认的test discovery会加载所有匹配`test*.py`的文件中的TestCase子类。通过自定义Loader,可以增加更精细的过滤条件,比如跳过某些标记为慢速的测试、只加载特定命名空间的测试等。
import unittest
class FilteredTestLoader(unittest.TestLoader):
"""支持过滤的自定义TestLoader"""
def __init__(self, include_tags=None, exclude_tags=None):
super().__init__()
self.include_tags = include_tags or set()
self.exclude_tags = exclude_tags or set()
def loadTestsFromTestCase(self, testCaseClass):
if hasattr(testCaseClass, 'tags'):
class_tags = set(testCaseClass.tags)
# 排除包含排除标签的测试类
if self.exclude_tags and class_tags & self.exclude_tags:
return unittest.TestSuite()
# 包含指定标签的才加载
if self.include_tags and not (class_tags & self.include_tags):
return unittest.TestSuite()
return super().loadTestsFromTestCase(testCaseClass)
def getTestCaseNames(self, testCaseClass):
names = super().getTestCaseNames(testCaseClass)
filtered = []
for name in names:
method = getattr(testCaseClass, name)
method_tags = getattr(method, 'tags', set())
if self.exclude_tags and method_tags & self.exclude_tags:
continue
if self.include_tags and not (method_tags & self.include_tags):
continue
filtered.append(name)
return filtered
# 用法示例:只加载冒烟测试
loader = FilteredTestLoader(include_tags={'smoke'})
smoke_suite = loader.discover('tests')
2. 模块级测试加载控制
当项目结构比较复杂时,需要精确控制从哪些模块加载测试。自定义Loader可以按模块的路径模式、模块内定义的配置变量等条件来决定是否加载该模块中的测试。
import unittest
import importlib
import pkgutil
def load_tests_from_package(package_name, version_filter=None):
"""从包中动态加载测试,支持版本过滤"""
loader = unittest.TestLoader()
master_suite = unittest.TestSuite()
package = importlib.import_module(package_name)
path = getattr(package, '__path__', [])
for importer, modname, ispkg in pkgutil.iter_modules(path):
if not modname.startswith('test_'):
continue
module = importlib.import_module(f'{package_name}.{modname}')
# 检查模块是否定义了版本要求
module_version = getattr(module, 'TEST_VERSION', None)
if version_filter and module_version:
if module_version != version_filter:
print(f'跳过模块 {modname} (版本: {module_version})')
continue
suite = loader.loadTestsFromModule(module)
master_suite.addTests(suite)
print(f'已加载模块: {modname}, 测试数: {suite.countTestCases()}')
return master_suite
class SelectiveTestLoader(unittest.TestLoader):
"""选择性加载器:只加载指定命名空间的测试"""
def loadTestsFromModule(self, module, *args, **kwargs):
suite = super().loadTestsFromModule(module, *args, **kwargs)
# 排除集成测试(除非显式要求)
if getattr(module, 'TEST_TYPE', None) == 'integration':
return unittest.TestSuite()
return suite
# 使用示例
if __name__ == '__main__':
suite = SelectiveTestLoader().discover(
'tests', pattern='test_*.py', top_level_dir='.'
)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
3. 自定义类加载器
除了过滤方法,还可以控制如何从TestCase类中提取测试方法。以下示例展示如何使用加载器只加载标记了特定装饰器的方法。
import unittest
def smoke_test(func):
"""标记为冒烟测试的装饰器"""
func._smoke = True
return func
class SmokeTestLoader(unittest.TestLoader):
"""只加载冒烟测试的Loader"""
def getTestCaseNames(self, testCaseClass):
all_names = super().getTestCaseNames(testCaseClass)
smoke_names = []
for name in all_names:
method = getattr(testCaseClass, name)
if getattr(method, '_smoke', False):
smoke_names.append(name)
return smoke_names
class MyAppTest(unittest.TestCase):
@smoke_test
def test_critical_login(self):
self.assertTrue(True)
def test_detail_page(self):
self.assertTrue(True)
@smoke_test
def test_db_connection(self):
self.assertTrue(True)
# 使用SmokeTestLoader
loader = SmokeTestLoader()
suite = loader.loadTestsFromTestCase(MyAppTest)
print(f'冒烟测试数量: {suite.countTestCases()}') # 输出: 2
九、实战案例
理论知识需要通过实践来巩固。本实战章节将综合运用前述的TestSuite、TestRunner和Fixture等高级特性,构建两个典型的真实测试场景:数据库CRUD操作测试和Web API接口测试。通过这些案例,你将学会如何在实际项目中组织和编写高质量的测试代码。
1. 数据库CRUD测试Fixtures
数据库测试是后端开发中最常见的测试场景之一。一个好的数据库测试方案需要管理数据库连接的生命周期、事务的隔离性以及测试数据的自动清理。以下示例展示如何使用模块级和类级Fixture搭建完整的数据库测试框架。
import unittest
import sqlite3
import os
# ---- 数据库CRUD测试 ----
TEST_DB_PATH = '/tmp/test_app.db'
def setUpModule():
"""模块级:创建测试数据库"""
global _connection
if os.path.exists(TEST_DB_PATH):
os.remove(TEST_DB_PATH)
_connection = sqlite3.connect(TEST_DB_PATH)
_connection.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
age INTEGER
)
''')
_connection.commit()
def tearDownModule():
"""模块级:清理测试数据库"""
global _connection
if _connection:
_connection.close()
if os.path.exists(TEST_DB_PATH):
os.remove(TEST_DB_PATH)
class TestUserCRUD(unittest.TestCase):
"""用户CRUD操作测试"""
@classmethod
def setUpClass(cls):
cls.conn = _connection
# 每条测试前开启新事务
cls.conn.execute('BEGIN')
@classmethod
def tearDownClass(cls):
# 每条测试后回滚事务,保证隔离性
cls.conn.execute('ROLLBACK')
def setUp(self):
self.cursor = self.conn.cursor()
def test_create_user(self):
"""测试创建用户"""
self.cursor.execute(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
('Alice', 'alice@example.com', 28)
)
self.assertEqual(self.cursor.lastrowid, 1)
def test_query_user(self):
"""测试查询用户"""
# 插入测试数据
self.cursor.execute(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
('Bob', 'bob@example.com', 35)
)
# 查询
self.cursor.execute(
'SELECT * FROM users WHERE email = ?',
('bob@example.com',)
)
user = self.cursor.fetchone()
self.assertIsNotNone(user)
self.assertEqual(user[1], 'Bob')
self.assertEqual(user[3], 35)
def test_update_user(self):
"""测试更新用户"""
self.cursor.execute(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
('Charlie', 'charlie@example.com', 25)
)
self.cursor.execute(
'UPDATE users SET age = ? WHERE email = ?',
(26, 'charlie@example.com')
)
self.cursor.execute(
'SELECT age FROM users WHERE email = ?',
('charlie@example.com',)
)
age = self.cursor.fetchone()[0]
self.assertEqual(age, 26)
def test_delete_user(self):
"""测试删除用户"""
self.cursor.execute(
'INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
('Diana', 'diana@example.com', 30)
)
self.cursor.execute(
'DELETE FROM users WHERE email = ?',
('diana@example.com',)
)
self.cursor.execute(
'SELECT COUNT(*) FROM users WHERE email = ?',
('diana@example.com',)
)
count = self.cursor.fetchone()[0]
self.assertEqual(count, 0)
# 创建数据库CRUD测试套件
def db_test_suite():
loader = unittest.TestLoader()
return loader.loadTestsFromTestCase(TestUserCRUD)
2. Web API测试套件组织
Web API测试涉及请求发送、响应验证、认证鉴权和异常处理等多个方面。通过精心组织的测试套件和自定义Runner,可以构建清晰且可维护的API测试体系。以下示例演示如何使用测试套件分层组织不同粒度的API测试。
import unittest
import json
from unittest.mock import patch, Mock
class TestUserAPI(unittest.TestCase):
"""用户API测试"""
def setUp(self):
self.base_url = 'https://api.example.com/v1'
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer test_token'
}
@patch('requests.post')
def test_user_registration(self, mock_post):
"""测试用户注册API"""
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {
'id': 1001,
'name': 'new_user',
'token': 'jwt_token_xxx'
}
mock_post.return_value = mock_response
# 模拟调用
payload = {'username': 'new_user', 'password': 'SecurePass123'}
response = mock_post(
f'{self.base_url}/register',
headers=self.headers,
json=payload
)
self.assertEqual(response.status_code, 201)
data = response.json()
self.assertIn('token', data)
self.assertEqual(data['name'], 'new_user')
@patch('requests.get')
def test_get_user_profile(self, mock_get):
"""测试获取用户资料"""
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'id': 1001,
'name': 'Alice',
'email': 'alice@example.com',
'role': 'admin'
}
mock_get.return_value = mock_response
response = mock_get(
f'{self.base_url}/users/1001',
headers=self.headers
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data['role'], 'admin')
@patch('requests.get')
def test_user_not_found(self, mock_get):
"""测试用户不存在场景"""
mock_response = Mock()
mock_response.status_code = 404
mock_response.json.return_value = {
'error': 'user_not_found',
'message': '用户不存在'
}
mock_get.return_value = mock_response
response = mock_get(
f'{self.base_url}/users/99999',
headers=self.headers
)
self.assertEqual(response.status_code, 404)
data = response.json()
self.assertEqual(data['error'], 'user_not_found')
class TestOrderAPI(unittest.TestCase):
"""订单API测试"""
@patch('requests.post')
def test_create_order(self, mock_post):
"""测试创建订单"""
mock_response = Mock()
mock_response.status_code = 201
mock_response.json.return_value = {
'order_id': 'ORD_20260301_001',
'status': 'pending',
'total_amount': 299.99
}
mock_post.return_value = mock_response
response = mock_post(
'https://api.example.com/v1/orders',
json={'user_id': 1001, 'items': [{'product_id': 'P001', 'qty': 2}]}
)
self.assertEqual(response.status_code, 201)
data = response.json()
self.assertIn('order_id', data)
@patch('requests.post')
def test_create_order_invalid_payload(self, mock_post):
"""测试无效订单"""
mock_response = Mock()
mock_response.status_code = 400
mock_response.json.return_value = {
'error': 'validation_error',
'message': '缺少必填字段: items'
}
mock_post.return_value = mock_response
response = mock_post(
'https://api.example.com/v1/orders',
json={'user_id': 1001}
)
self.assertEqual(response.status_code, 400)
# 构建分层API测试套件
def api_test_suite():
"""构建完整的API测试套件"""
loader = unittest.TestLoader()
# 用户API套件
user_suite = loader.loadTestsFromTestCase(TestUserAPI)
# 订单API套件
order_suite = loader.loadTestsFromTestCase(TestOrderAPI)
# 完整的API测试套件
api_suite = unittest.TestSuite([user_suite, order_suite])
return api_suite
if __name__ == '__main__':
# 使用自定义Runner执行
suite = api_test_suite()
runner = TimedTestRunner(verbosity=2)
result = runner.run(suite)
print(f'API测试完成: 通过={result.testsRun - len(result.failures) - len(result.errors)}, '
f'失败={len(result.failures)}, 错误={len(result.errors)}')
3. 综合测试调度器
在实际项目中,通常需要同时执行多种类型的测试并按需生成报告。以下示例展示如何创建一个综合调度器,将不同测试套件组装在一起统一执行。
import unittest
import sys
class TestScheduler:
"""综合测试调度器"""
def __init__(self):
self.suites = {}
def register_suite(self, name, suite_fn, tags=None):
"""注册测试套件"""
self.suites[name] = {
'builder': suite_fn,
'tags': tags or [],
'suite': None
}
def build_all(self):
"""构建所有注册的套件"""
for name, info in self.suites.items():
info['suite'] = info['builder']()
print(f'已构建套件: {name} ({info["suite"].countTestCases()} 条测试)')
def run_by_tags(self, tags):
"""按标签运行测试"""
master = unittest.TestSuite()
for name, info in self.suites.items():
if any(tag in info['tags'] for tag in tags):
if info['suite']:
master.addTests(info['suite'])
runner = unittest.TextTestRunner(verbosity=2)
return runner.run(master)
def run_all(self, runner_cls=None):
"""运行所有测试"""
master = unittest.TestSuite()
for info in self.suites.values():
if info['suite']:
master.addTests(info['suite'])
runner = (runner_cls or unittest.TextTestRunner)(verbosity=2)
return runner.run(master)
# 使用示例
if __name__ == '__main__':
scheduler = TestScheduler()
scheduler.register_suite('db_crud', db_test_suite, tags=['database', 'core'])
scheduler.register_suite('api_tests', api_test_suite, tags=['api', 'integration'])
scheduler.build_all()
# 运行所有测试
result = scheduler.run_all()
sys.exit(0 if result.wasSuccessful() else 1)