数据库测试:SQLite/PostgreSQL数据操作测试

Python 测试与调试专题 · 确保数据库操作的正确性

专题:Python 测试与调试系统学习

关键词:Python, 测试, 调试, 数据库测试, SQLite, PostgreSQL, SQLAlchemy, ORM测试, 测试容器

一、数据库测试概述

数据库测试是软件质量保障体系中至关重要的环节。在现代应用开发中,几乎所有的业务逻辑都依赖于数据库的正确交互,因此确保数据操作的正确性、可靠性和性能表现,直接关系到整个系统的稳定性。数据库测试涵盖的范围非常广泛,从简单的CRUD(增删改查)操作验证,到复杂的事务一致性测试、并发访问控制测试和数据库迁移测试,都需要系统化的策略和工具支持。

测试策略:内存 / 容器 / Mock

针对数据库测试,业界形成了三种主流策略,各有其适用场景和权衡。第一种是内存数据库测试策略,典型代表是使用SQLite的:memory:模式。这种策略的最大优势在于速度快、无需额外安装数据库服务、测试用例之间天然隔离。它特别适合单元测试和快速反馈场景,可以轻松集成到CI/CD流水线中。但缺点是SQLite与PostgreSQL/MySQL等生产数据库在功能和行为上存在差异,可能导致"在测试中通过,在生产中失败"的假阳性问题。

第二种是测试容器(Testcontainers)策略,它利用Docker容器在测试时启动真实的数据库实例。这种策略提供了最接近生产环境的测试条件,确保数据库行为完全一致。Testcontainers支持PostgreSQL、MySQL、MongoDB等多种数据库,适合集成测试和端到端测试场景。其代价是测试速度相对较慢,且需要Docker运行环境。第三种是Mock策略,通过mock库模拟数据库交互层,完全隔离了数据库依赖。这种策略最轻量,测试速度最快,但覆盖率最低,适合边界条件测试和异常场景模拟。

数据库测试的核心挑战

数据库测试面临多个独特的挑战。首先是测试隔离问题,多个测试用例可能操作相同的数据表,如果不妥善管理,测试之间的数据污染会导致偶发性的失败。其次是测试数据的准备和清理,每次测试都需要建立已知的数据状态,测试结束后又需要恢复到干净状态。第三是环境一致性,开发环境、CI环境和生产环境的数据库版本和配置差异可能引入隐秘的Bug。第四是性能测试的复杂性,数据库性能受索引、查询计划、连接池、硬件资源等多因素影响,在测试环境中难以精确模拟。

核心原则:数据库测试应遵循"测试金字塔"原则——底层单元测试多而快,使用内存数据库;中间层集成测试使用测试容器验证真实行为;顶层端到端测试少而精,覆盖关键业务路径。每层测试各有侧重,相互补充。

测试隔离策略详解

测试隔离是数据库测试中最关键的考量之一。常用的隔离策略包括以下几种。事务回滚策略:在每个测试开始时启动事务,测试结束时回滚,这种方式的优点是速度极快,无需清理数据,但缺点是事务嵌套和提交操作无法测试。数据库重建策略:每个测试类或测试模块运行前重建所有表,这种方式隔离最彻底,但重建表操作耗时较长。Schema隔离策略:为每个测试用例创建独立的Schema或数据库,并在测试完成后删除,这种方式隔离性好但资源消耗大。数据清理策略:测试完成后使用DELETE或TRUNCATE清理数据,适合大多数集成测试场景。

在实际项目中,我们通常组合使用多种策略。例如,对于纯数据操作层(Repository/DAO层)的单元测试,采用事务回滚策略配合内存数据库;对于业务服务层的集成测试,采用测试容器加数据清理策略。针对性能测试,则需要专门的测试环境,使用与生产环境相同规格的数据库实例和数据量级。

二、SQLite内存测试

SQLite内存数据库是Python数据库测试中最常用的工具之一。SQLite是一个轻量级的嵌入式关系数据库引擎,其:memory:模式可以在内存中创建完全在RAM中运行的数据库,无需任何文件系统操作。这意味着每次测试都可以创建一个全新的数据库实例,测试速度极快,非常适合作为单元测试的数据库后端。Python标准库中的sqlite3模块直接支持:memory:数据库,而SQLAlchemy等ORM框架也天然支持SQLite后端。

使用:memory:数据库进行测试

创建SQLite内存数据库非常简单,只需在连接时指定":memory:"作为数据库路径即可。在pytest中,我们通常通过fixture来管理内存数据库的生命周期。每个测试函数可以获取一个独立的数据库连接,保证测试之间的完全隔离。下面是一个基本的SQLite内存数据库测试示例,展示了如何创建表、插入数据和验证结果。

import sqlite3 import pytest @pytest.fixture def memory_db(): """创建内存数据库,测试结束后自动关闭""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row # 创建表 conn.execute(""" CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, email TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, title TEXT NOT NULL, content TEXT, FOREIGN KEY (user_id) REFERENCES users(id) ) """) yield conn conn.close() def test_insert_user(memory_db): """测试插入用户记录""" memory_db.execute( "INSERT INTO users (username, email) VALUES (?, ?)", ("testuser", "test@example.com") ) memory_db.commit() cursor = memory_db.execute("SELECT * FROM users WHERE username = ?", ("testuser",)) user = cursor.fetchone() assert user is not None assert user["username"] == "testuser" assert user["email"] == "test@example.com" def test_unique_constraint(memory_db): """测试唯一约束""" memory_db.execute( "INSERT INTO users (username, email) VALUES (?, ?)", ("uniqueuser", "unique@example.com") ) memory_db.commit() with pytest.raises(sqlite3.IntegrityError): memory_db.execute( "INSERT INTO users (username, email) VALUES (?, ?)", ("uniqueuser", "another@example.com") )

事务回滚隔离模式

事务回滚是一种高效的测试隔离模式。我们可以在fixture中开启一个事务,测试执行完毕后回滚该事务,这样所有的数据修改都会被自动撤销,无需手动清理。这种模式特别适合测试套件中有大量测试用例的场景,因为回滚操作比DELETE或DROP TABLE要快得多。需要注意的是,这种模式下测试用例中不能执行COMMIT操作(或者需要特别处理),否则会破坏隔离机制。

@pytest.fixture def db_with_rollback(): """使用事务回滚实现测试隔离""" conn = sqlite3.connect(":memory:") conn.execute(""" CREATE TABLE products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, price REAL NOT NULL, stock INTEGER DEFAULT 0 ) """) conn.execute(""" INSERT INTO products (name, price, stock) VALUES ('Laptop', 999.99, 10), ('Mouse', 29.99, 100), ('Keyboard', 89.99, 50) """) conn.commit() # 开启事务用于回滚 conn.execute("BEGIN") yield conn conn.rollback() # 回滚所有更改 def test_update_product_price(db_with_rollback): """测试更新产品价格""" db_with_rollback.execute( "UPDATE products SET price = 899.99 WHERE name = ?", ("Laptop",) ) cursor = db_with_rollback.execute( "SELECT price FROM products WHERE name = ?", ("Laptop",) ) assert cursor.fetchone()[0] == 899.99 def test_product_stock_query(db_with_rollback): """测试查询产品库存""" cursor = db_with_rollback.execute( "SELECT name, stock FROM products WHERE stock > ?", (50,) ) products = cursor.fetchall() assert len(products) == 2 # Mouse(100) 和 Keyboard(50) # 验证Laptop的价格仍然是原始值,因为上个测试的回滚生效了 cursor = db_with_rollback.execute( "SELECT price FROM products WHERE name = ?", ("Laptop",) ) assert cursor.fetchone()[0] == 999.99

基于pytest的数据库Fixtures

在实际项目中,我们通常需要管理多个fixture来处理数据库的初始化、种子数据加载和清理。通过pytest的fixture机制,我们可以构建一个层次化的fixture体系。例如,可以定义一个conftest.py文件,其中包含全局的数据库连接fixture、表创建fixture和种子数据fixture,测试函数根据自身需要选择依赖哪些fixture。这种架构使得测试代码更加模块化和可维护。

# conftest.py import pytest import sqlite3 @pytest.fixture(scope="session") def db_schema(): """定义数据库表结构(session级别,只执行一次)""" schema = { "users": """ CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, email TEXT NOT NULL, role TEXT DEFAULT 'user' ) """, "orders": """ CREATE TABLE orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, total REAL NOT NULL, status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ) """, "order_items": """ CREATE TABLE order_items ( id INTEGER PRIMARY KEY AUTOINCREMENT, order_id INTEGER NOT NULL, product_name TEXT NOT NULL, quantity INTEGER NOT NULL, price REAL NOT NULL, FOREIGN KEY (order_id) REFERENCES orders(id) ) """ } return schema @pytest.fixture def fresh_db(db_schema): """为每个测试创建全新的数据库""" conn = sqlite3.connect(":memory:") conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") for table_name, create_sql in db_schema.items(): conn.execute(create_sql) yield conn conn.close() @pytest.fixture def db_with_seed_data(fresh_db): """在干净数据库上插入种子数据""" users_data = [ ("alice", "alice@example.com", "admin"), ("bob", "bob@example.com", "user"), ("charlie", "charlie@example.com", "user"), ] for username, email, role in users_data: fresh_db.execute( "INSERT INTO users (username, email, role) VALUES (?, ?, ?)", (username, email, role) ) fresh_db.commit() return fresh_db

三、PostgreSQL测试容器

当我们需要在测试中使用真实的PostgreSQL数据库时,testcontainers-python库是一个非常强大的工具。它可以在测试过程中自动启动Docker容器运行PostgreSQL实例,测试完成后自动清理容器。这种方式既保证了测试环境与生产环境的一致性,又避免了在开发机上长期运行数据库服务的开销。Testcontainers支持多种配置选项,包括数据库版本、初始化脚本、持久化挂载等。

testcontainers-python基础用法

使用testcontainers-python启动PostgreSQL测试容器的基本模式是:在fixture中创建PostgreSQL容器实例,等待容器就绪后获取连接参数,然后通过SQLAlchemy或psycopg2连接到数据库。pytest的fixture机制可以完美地与容器生命周期管理结合。需要注意的是,测试容器需要Docker运行环境,在CI/CD场景下需要确保Docker可用。

import pytest from testcontainers.postgres import PostgresContainer import sqlalchemy as sa from sqlalchemy import text @pytest.fixture(scope="module") def postgres_container(): """启动PostgreSQL容器(module级别,减少启动次数)""" with PostgresContainer("postgres:15-alpine") as postgres: # 获取连接URL db_url = postgres.get_connection_url() yield db_url # 退出with块时自动停止并删除容器 @pytest.fixture def pg_engine(postgres_container): """创建SQLAlchemy引擎""" engine = sa.create_engine(postgres_container) # 创建测试表 with engine.connect() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS employees ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, department VARCHAR(50), salary NUMERIC(10, 2), hire_date DATE DEFAULT CURRENT_DATE ) """)) conn.commit() yield engine engine.dispose() def test_insert_employee(pg_engine): """测试插入员工记录""" with pg_engine.connect() as conn: conn.execute(text( "INSERT INTO employees (name, department, salary) " "VALUES (:name, :dept, :salary)" ), {"name": "张三", "dept": "Engineering", "salary": 15000.00}) conn.commit() result = conn.execute( text("SELECT * FROM employees WHERE name = :name"), {"name": "张三"} ) emp = result.fetchone() assert emp is not None assert emp.department == "Engineering" assert float(emp.salary) == 15000.00

连接管理与容器生命周期

测试容器的生命周期管理是关键设计考量。每个Testcontainers容器实例在启动时都会分配随机的端口,避免了端口冲突问题。容器启动可能需要几秒到十几秒的时间(取决于镜像下载和数据库初始化速度),因此建议使用scope="module"或scope="session"的fixture来复用容器,而不是为每个测试函数都重启容器。对于需要在多个测试模块中共享同一个容器的情况,可以使用session级别的fixture。

在容器内部,可以通过环境变量配置PostgreSQL的用户名、密码、数据库名等参数。Testcontainers也支持自定义初始化脚本,只需将SQL文件挂载到容器的/docker-entrypoint-initdb.d/目录下,容器启动时会自动执行这些脚本。这对于需要在测试前创建复杂的Schema结构或导入测试数据的场景非常有用。

import pytest from testcontainers.postgres import PostgresContainer import sqlalchemy as sa from sqlalchemy import text import time @pytest.fixture(scope="session") def shared_postgres(): """session级别的共享PostgreSQL容器""" container = PostgresContainer( "postgres:15-alpine", user="test_user", password="test_pass", dbname="testdb" ) container.start() time.sleep(2) # 等待容器完全就绪 db_url = container.get_connection_url() yield container, db_url container.stop(timeout=10) @pytest.fixture def pg_conn(shared_postgres): """为每个测试创建数据库连接""" container, db_url = shared_postgres engine = sa.create_engine(db_url) # 创建测试表结构 with engine.connect() as conn: conn.execute(text(""" CREATE TABLE IF NOT EXISTS departments ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL UNIQUE, budget NUMERIC(12, 2) DEFAULT 0 ) """)) conn.execute(text(""" CREATE TABLE IF NOT EXISTS employees ( id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, department_id INTEGER REFERENCES departments(id), salary NUMERIC(10, 2), email VARCHAR(200) UNIQUE ) """)) conn.commit() # 清理已有数据(确保每个测试从干净状态开始) with engine.connect() as conn: conn.execute(text("DELETE FROM employees")) conn.execute(text("DELETE FROM departments")) conn.commit() yield engine engine.dispose() def test_department_employee_relation(pg_conn): """测试部门与员工的关联关系""" with pg_conn.connect() as conn: # 插入部门 conn.execute( text("INSERT INTO departments (name, budget) VALUES (:name, :budget)"), {"name": "技术部", "budget": 500000.00} ) # 插入员工,关联部门 conn.execute( text("INSERT INTO employees (name, department_id, salary, email) " "VALUES (:name, (SELECT id FROM departments WHERE name = :dept), " ":salary, :email)"), {"name": "李四", "dept": "技术部", "salary": 20000.00, "email": "lisi@company.com"} ) conn.commit() result = conn.execute( text("SELECT e.name, d.name as dept_name, e.salary " "FROM employees e JOIN departments d " "ON e.department_id = d.id") ) rows = result.fetchall() assert len(rows) == 1 assert rows[0].dept_name == "技术部"

多数据库版本的兼容性测试

测试容器的一个强大应用场景是跨版本兼容性测试。我们可以在同一个测试套件中对不同版本的PostgreSQL运行相同的测试用例,验证应用是否兼容目标版本范围内的所有数据库版本。通过在fixture中参数化容器配置,可以轻松实现版本矩阵测试。这对于维护支持多数据库版本的项目或进行数据库升级迁移前的验证尤为重要。

import pytest from testcontainers.postgres import PostgresContainer import sqlalchemy as sa from sqlalchemy import text # 测试多个PostgreSQL版本 POSTGRES_VERSIONS = ["postgres:13-alpine", "postgres:14-alpine", "postgres:15-alpine"] @pytest.mark.parametrize("pg_version", POSTGRES_VERSIONS) def test_compatibility_across_versions(pg_version): """验证SQL在不同PG版本上的兼容性""" with PostgresContainer(pg_version) as postgres: engine = sa.create_engine(postgres.get_connection_url()) with engine.connect() as conn: # 测试通用SQL功能 conn.execute(text(""" CREATE TABLE version_test ( id SERIAL PRIMARY KEY, data JSONB, tags TEXT[] ) """)) conn.execute( text("INSERT INTO version_test (data, tags) " "VALUES (:data, :tags)"), {"data": {"key": "value", "count": 42}, "tags": ["test", "compatibility"]} ) conn.commit() result = conn.execute( text("SELECT data->>'key' as k, tags[1] as t " "FROM version_test") ) row = result.fetchone() assert row.k == "value" assert row.t == "test" engine.dispose()

四、SQLAlchemy ORM测试

SQLAlchemy是Python生态中最流行的ORM框架,广泛应用于各类Web应用和数据处理系统中。对基于SQLAlchemy的应用进行测试时,核心关注点包括:引擎和会话的正确管理、模型映射的准确性、CRUD操作的完整性、关系模型的正确加载以及复杂查询的结果准确性。通过合理的fixture设计,我们可以为每个测试提供隔离的数据库会话,确保测试的可重复性和可靠性。

引擎与会话管理fixture

SQLAlchemy的引擎(Engine)和会话(Session)是数据库交互的核心对象。在测试中,我们通常使用SQLite内存数据库作为引擎后端(快速且隔离),但关键路径的集成测试仍应使用PostgreSQL测试容器。引擎通常是整个测试会话共享的(scope="session"),而会话则应该针对每个测试函数创建新的实例,以确保测试之间的隔离。在使用事务回滚策略时,可以将会话绑定到一个savepoint,测试完成后回滚。

import pytest from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, DateTime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship, Session from datetime import datetime Base = declarative_base() # 定义模型 class Product(Base): __tablename__ = "products" id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False) price = Column(Float, nullable=False) category = Column(String(50)) stock = Column(Integer, default=0) class Order(Base): __tablename__ = "orders" id = Column(Integer, primary_key=True) customer_name = Column(String(100), nullable=False) total = Column(Float, default=0.0) status = Column(String(20), default="pending") created_at = Column(DateTime, default=datetime.utcnow) items = relationship("OrderItem", back_populates="order") class OrderItem(Base): __tablename__ = "order_items" id = Column(Integer, primary_key=True) order_id = Column(Integer, ForeignKey("orders.id")) product_id = Column(Integer, ForeignKey("products.id")) quantity = Column(Integer, nullable=False) price = Column(Float, nullable=False) order = relationship("Order", back_populates="items") product = relationship("Product") @pytest.fixture(scope="session") def engine(): """创建共享的SQLite内存引擎""" _engine = create_engine("sqlite:///:memory:", echo=False) Base.metadata.create_all(_engine) yield _engine _engine.dispose() @pytest.fixture def session(engine): """为每个测试创建独立会话,使用事务回滚隔离""" connection = engine.connect() transaction = connection.begin() Session = sessionmaker(bind=connection) _session = Session() yield _session _session.close() transaction.rollback() connection.close()

CRUD操作测试

CRUD测试是ORM测试的基础,需要覆盖创建(Create)、读取(Read)、更新(Update)和删除(Delete)四种核心操作。每个测试应该专注于验证单一操作的正确性,包括正常路径和异常路径。例如,创建操作不仅要验证数据正确写入,还要验证唯一约束、非空约束等数据库层约束的正确执行。对于更新操作,需要验证部分字段更新和全部字段更新两种场景。

def test_create_product(session): """测试创建产品""" product = Product( name="机械键盘", price=299.99, category="外设", stock=50 ) session.add(product) session.flush() assert product.id is not None # 从数据库重新查询验证 saved = session.get(Product, product.id) assert saved.name == "机械键盘" assert saved.price == 299.99 def test_read_products_with_filter(session): """测试条件查询""" # 准备测试数据 products = [ Product(name="游戏鼠标", price=199.00, category="外设", stock=30), Product(name="显示器", price=1999.00, category="显示设备", stock=10), Product(name="USB集线器", price=49.00, category="外设", stock=100), ] session.add_all(products) session.flush() # 查询外设类且库存大于50的产品 result = session.query(Product).filter( Product.category == "外设", Product.stock > 50 ).all() assert len(result) == 1 assert result[0].name == "USB集线器" def test_update_product_stock(session): """测试更新产品库存""" product = Product(name="测试商品", price=99.00, stock=10) session.add(product) session.flush() # 更新库存 product.stock = 5 session.flush() # 验证更新 session.expire(product) assert product.stock == 5 def test_delete_product(session): """测试删除产品""" product = Product(name="待删除商品", price=1.00) session.add(product) session.flush() product_id = product.id session.delete(product) session.flush() deleted = session.get(Product, product_id) assert deleted is None

关系测试与级联操作

SQLAlchemy的关系映射(relationship)和级联操作(cascade)是ORM测试中的重要关注点。需要验证父子关系的正确创建和加载,以及删除父记录时子记录的级联行为。懒加载(lazy loading)和预加载(eager loading)的行为差异也需要测试覆盖,避免出现N+1查询问题。

def test_create_order_with_items(session): """测试创建含订单项的订单""" # 准备产品 mouse = Product(name="无线鼠标", price=129.00, stock=50) pad = Product(name="鼠标垫", price=29.00, stock=200) session.add_all([mouse, pad]) session.flush() # 创建订单 order = Order(customer_name="张三", total=158.00) order.items = [ OrderItem(product_id=mouse.id, quantity=1, price=129.00), OrderItem(product_id=pad.id, quantity=1, price=29.00), ] session.add(order) session.flush() # 验证关系 saved_order = session.get(Order, order.id) assert len(saved_order.items) == 2 assert saved_order.items[0].product.name == "无线鼠标" def test_cascade_delete(session): """测试级联删除""" product = Product(name="测试商品", price=99.00) order = Order(customer_name="测试用户", total=99.00) order.items = [OrderItem(product_id=1, quantity=1, price=99.00)] # 注意:这里product_id=1是硬编码的,实际测试中需要关联真实的产品 session.add(product) session.add(order) session.flush() # 如果设置了级联删除,删除订单时订单项应该自动删除 session.delete(order) session.flush() items = session.query(OrderItem).filter( OrderItem.order_id == order.id ).all() assert len(items) == 0

五、事务测试

事务是数据库保证数据一致性的核心机制。在数据库测试中,事务测试是验证数据完整性和并发控制正确性的关键环节。一个完备的事务测试套件应该覆盖事务的正常提交、异常回滚、保存点(Savepoint)管理、并发事务的隔离行为以及不同隔离级别下的数据可见性等方面。通过系统化的事务测试,可以及时发现和修复数据一致性问题,避免在生产环境中出现数据损坏或业务逻辑错误。

事务提交与回滚测试

事务提交与回滚是最基础的事务操作测试。在测试中,我们需要验证在成功路径下数据是否正确持久化到数据库,以及在失败路径下数据是否被完全撤销。特别需要注意的是,回滚操作应该恢复到事务开始前的状态,包括所有已执行的INSERT、UPDATE和DELETE操作。使用SAVEPOINT可以实现事务内的部分回滚,这在复杂业务逻辑中非常有用。

import pytest from sqlalchemy import create_engine, text from sqlalchemy.orm import Session @pytest.fixture def transaction_engine(): engine = create_engine("sqlite:///:memory:", echo=False) with engine.connect() as conn: conn.execute(text(""" CREATE TABLE accounts ( id INTEGER PRIMARY KEY AUTOINCREMENT, owner TEXT NOT NULL, balance REAL NOT NULL DEFAULT 0 ) """)) conn.execute(text( "INSERT INTO accounts (owner, balance) VALUES (:owner, :balance)" ), [{"owner": "Alice", "balance": 1000}, {"owner": "Bob", "balance": 500}]) conn.commit() return engine def test_transfer_success(transaction_engine): """测试转账事务成功提交""" engine = transaction_engine with engine.connect() as conn: transaction = conn.begin() try: # Alice转300给Bob conn.execute( text("UPDATE accounts SET balance = balance - 300 WHERE owner = 'Alice'") ) conn.execute( text("UPDATE accounts SET balance = balance + 300 WHERE owner = 'Bob'") ) transaction.commit() except: transaction.rollback() raise # 验证数据正确更新 with engine.connect() as conn: alice = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Alice'") ).fetchone() bob = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Bob'") ).fetchone() assert alice[0] == 700 # 1000 - 300 assert bob[0] == 800 # 500 + 300 def test_transfer_rollback_on_failure(transaction_engine): """测试余额不足时事务回滚""" engine = transaction_engine with engine.connect() as conn: try: conn.execute(text("BEGIN")) # 尝试转出超过余额的金额 conn.execute( text("UPDATE accounts SET balance = balance - 2000 WHERE owner = 'Alice'") ) # 检查余额是否足够(模拟业务逻辑检查) new_balance = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Alice'") ).fetchone()[0] if new_balance < 0: raise ValueError("余额不足") conn.execute( text("UPDATE accounts SET balance = balance + 2000 WHERE owner = 'Bob'") ) conn.execute(text("COMMIT")) except ValueError: conn.execute(text("ROLLBACK")) # 验证余额未被修改 with engine.connect() as conn: alice = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Alice'") ).fetchone() assert alice[0] == 1000 # 保持不变

保存点(SAVEPOINT)测试

保存点是事务内部的一个逻辑标记点,允许将事务回滚到该标记点而不影响之前的操作。这在处理批量操作中某些记录失败但希望保留其他成功记录的场景尤为重要。SQLAlchemy通过begin_nested()方法支持嵌套事务,底层就是利用数据库的SAVEPOINT机制。测试保存点行为时,需要验证部分回滚后的数据状态和最终事务提交后的数据状态。

def test_savepoint_partial_rollback(transaction_engine): """测试保存点部分回滚""" engine = transaction_engine with engine.connect() as conn: transaction = conn.begin() # 创建保存点 conn.execute(text("SAVEPOINT before_insert")) conn.execute( text("INSERT INTO accounts (owner, balance) VALUES ('Charlie', 300)") ) # 回滚到保存点(撤销Charlie的插入) conn.execute(text("ROLLBACK TO SAVEPOINT before_insert")) # 提交事务 transaction.commit() # 验证Charlie没有被插入 with engine.connect() as conn: result = conn.execute( text("SELECT COUNT(*) FROM accounts WHERE owner = 'Charlie'") ).fetchone() assert result[0] == 0 def test_nested_transaction_with_savepoint(transaction_engine): """测试嵌套事务中的保存点行为""" engine = transaction_engine with engine.connect() as conn: transaction = conn.begin() # 更新Alice的余额 conn.execute( text("UPDATE accounts SET balance = 500 WHERE owner = 'Alice'") ) # 创建保存点 savepoint = conn.begin_nested() conn.execute( text("UPDATE accounts SET balance = 100 WHERE owner = 'Bob'") ) savepoint.rollback() # 回滚Bob的更新 transaction.commit() # 提交整个事务 with engine.connect() as conn: alice = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Alice'") ).fetchone() bob = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Bob'") ).fetchone() assert alice[0] == 500 # Alice被更新 assert bob[0] == 500 # Bob未被修改(被回滚)

隔离级别测试

数据库事务隔离级别决定了并发事务之间数据可见性的规则。SQL标准定义了四种隔离级别:READ UNCOMMITTED、READ COMMITTED、REPEATABLE READ和SERIALIZABLE。不同隔离级别下,脏读(Dirty Read)、不可重复读(Non-repeatable Read)和幻读(Phantom Read)现象的表现各不相同。PostgreSQL默认使用READ COMMITTED隔离级别,同时也支持REPEATABLE READ和SERIALIZABLE。在测试中,我们需要验证应用在目标隔离级别下的行为是否符合预期。

def test_read_committed_isolation(transaction_engine): """测试READ COMMITTED隔离级别行为""" engine = transaction_engine import threading results = [] def transaction_a(): with engine.connect() as conn: conn.execute(text("BEGIN ISOLATION LEVEL READ COMMITTED")) # 读取Bob的当前余额 balance = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Bob'") ).fetchone()[0] results.append(("read_before", balance)) import time time.sleep(1) # 等待事务B更新 # 再次读取Bob的余额 balance = conn.execute( text("SELECT balance FROM accounts WHERE owner = 'Bob'") ).fetchone()[0] results.append(("read_after", balance)) conn.execute(text("COMMIT")) def transaction_b(): import time time.sleep(0.3) # 等待事务A第一次读取 with engine.connect() as conn: conn.execute(text("BEGIN")) conn.execute( text("UPDATE accounts SET balance = 1000 WHERE owner = 'Bob'") ) conn.execute(text("COMMIT")) t1 = threading.Thread(target=transaction_a) t2 = threading.Thread(target=transaction_b) t1.start() t2.start() t1.join() t2.join() # READ COMMITTED下,事务A第二次读取能看到事务B的提交 read_before = [r for r in results if r[0] == "read_before"][0][1] read_after = [r for r in results if r[0] == "read_after"][0][1] assert read_before == 500 # 事务B提交前 assert read_after == 1000 # 事务B提交后(不可重复读)

六、数据库迁移测试

数据库迁移(Migration)是现代应用开发中管理数据库Schema变更的标准实践。Alembic是SQLAlchemy生态中最流行的迁移管理工具,它允许开发者以版本控制的方式管理数据库结构的渐进式变更。数据库迁移测试的目的是确保每次Schema变更都能正确执行,且不会破坏现有数据和业务逻辑。一个完善的迁移测试套件应该覆盖迁移的升级(upgrade)和降级(downgrade)操作,并验证迁移前后数据的一致性和完整性。

Alembic迁移测试基础

Alembic迁移测试的核心思路是:初始化数据库到迁移前的状态,执行迁移升级操作,验证迁移后的Schema和数据是否正确,然后执行迁移降级操作,验证是否恢复到迁移前的状态。在测试中,我们通常使用SQLite内存数据库作为测试目标,但关键迁移应该也在PostgreSQL上进行验证,因为不同数据库对DDL操作的支持存在差异。

import pytest from alembic.config import Config from alembic import command from sqlalchemy import create_engine, inspect, text import os @pytest.fixture def alembic_config(tmp_path): """创建Alembic配置对象""" # 创建alembic.ini的内容 ini_content = """ [alembic] script_location = migrations sqlalchemy.url = sqlite:///:memory: """ ini_path = tmp_path / "alembic.ini" ini_path.write_text(ini_content) cfg = Config(str(ini_path)) return cfg @pytest.fixture def migration_engine(): """为迁移测试创建数据库引擎""" engine = create_engine("sqlite:///:memory:", echo=False) return engine def test_initial_migration_upgrade(alembic_config, migration_engine): """测试初始迁移的升级操作""" from alembic import command import alembic.config # 执行迁移升级到最新版本 alembic_cfg = alembic.config.Config(str(alembic_config)) alembic_cfg.attributes["connection"] = migration_engine.connect() command.upgrade(alembic_cfg, "head") # 验证表结构 inspector = inspect(migration_engine) tables = inspector.get_table_names() expected_tables = ["users", "posts", "comments"] for table in expected_tables: assert table in tables, f"表 {table} 应该在迁移后存在" # 验证列结构 users_columns = [col["name"] for col in inspector.get_columns("users")] assert "id" in users_columns assert "username" in users_columns assert "email" in users_columns

降级测试与数据完整性验证

降级测试验证迁移的可逆性,确保在需要时可以安全地回滚到之前的版本。降级测试需要特别关注数据完整性问题:如果迁移涉及列的删除或表的重构,降级时如何恢复数据是一个需要仔细设计的问题。例如,如果一个迁移删除了某列,降级时需要重新创建该列并考虑如何填充已有数据。对于包含数据迁移的Schema变更(如拆分字段、合并表),降级测试更加复杂,需要验证降级后数据的完整性不受破坏。

def test_migration_downgrade(alembic_config, migration_engine): """测试迁移降级操作""" import alembic.config from alembic import command import sqlalchemy as sa alembic_cfg = alembic.config.Config(str(alembic_config)) conn = migration_engine.connect() alembic_cfg.attributes["connection"] = conn # 先迁移到最新版本 command.upgrade(alembic_cfg, "head") # 插入一些测试数据 conn.execute(text(""" INSERT INTO users (username, email) VALUES ('test_user', 'test@example.com') """)) conn.commit() # 获取当前版本 current_revision = command.current(alembic_cfg) # 降级一个版本 command.downgrade(alembic_cfg, "-1") # 验证降级后的表结构 inspector = inspect(migration_engine) tables = inspector.get_table_names() # 验证降级后表结构符合预期 def test_data_integrity_after_migration(alembic_config, migration_engine): """测试迁移后的数据完整性""" import alembic.config from alembic import command from sqlalchemy import text alembic_cfg = alembic.config.Config(str(alembic_config)) conn = migration_engine.connect() alembic_cfg.attributes["connection"] = conn # 初始Schema conn.execute(text(""" CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username VARCHAR(100) NOT NULL, email VARCHAR(200), full_name VARCHAR(200) ) """)) # 插入测试数据 conn.execute(text(""" INSERT INTO users (username, email, full_name) VALUES ('alice', 'alice@example.com', 'Alice Wang'), ('bob', 'bob@example.com', 'Bob Li') """)) conn.commit() # 执行迁移:将full_name拆分为first_name和last_name # 这里模拟迁移脚本的动作 conn.execute(text(""" ALTER TABLE users ADD COLUMN first_name VARCHAR(100) """)) conn.execute(text(""" ALTER TABLE users ADD COLUMN last_name VARCHAR(100) """)) # 拆分full_name conn.execute(text(""" UPDATE users SET first_name = SUBSTR(full_name, 1, INSTR(full_name, ' ') - 1), last_name = SUBSTR(full_name, INSTR(full_name, ' ') + 1) """)) conn.execute(text(""" ALTER TABLE users DROP COLUMN full_name """)) conn.commit() # 数据完整性验证 result = conn.execute( text("SELECT username, first_name, last_name FROM users WHERE username = 'alice'") ).fetchone() assert result.first_name == "Alice" assert result.last_name == "Wang"

迁移版本状态验证

除了验证迁移的执行结果,还需要验证迁移版本管理本身的正确性。这包括:迁移历史链的完整性(没有断裂的版本)、每个迁移的可达性(可以从任意版本升级到目标版本)、以及多个开发者的迁移变更之间的冲突检测。在团队协作中,迁移冲突是常见问题,通过自动化的迁移测试可以提前发现并解决这些问题。

def test_migration_history_integrity(alembic_config): """验证迁移历史链的完整性""" from alembic.script import ScriptDirectory from alembic.config import Config config = Config(str(alembic_config)) script = ScriptDirectory.from_config(config) heads = script.get_heads() assert len(heads) == 1, "应该只有一个head迁移" # 验证所有迁移可达head for revision in script.walk_revisions(): # 每个迁移应该有down_revision指向正确的父版本 if revision.down_revision is not None: parent = script.get_revision(revision.down_revision) assert parent is not None, f"迁移 {revision.revision} 的父版本 {revision.down_revision} 不存在" def test_migration_rollback_and_reapply(alembic_config, migration_engine): """测试迁移的回滚和重新应用""" import alembic.config from alembic import command alembic_cfg = alembic.config.Config(str(alembic_config)) conn = migration_engine.connect() alembic_cfg.attributes["connection"] = conn # 完整升级 command.upgrade(alembic_cfg, "head") # 完全降级到base command.downgrade(alembic_cfg, "base") # 重新升级到head(验证幂等性) command.upgrade(alembic_cfg, "head") inspector = inspect(migration_engine) tables = inspector.get_table_names() assert len(tables) > 0, "重新升级后表结构应正确创建"

七、测试数据管理

测试数据管理是数据库测试中最耗时但也最关键的部分之一。高质量的测试数据管理能够保证测试的可重复性、可读性和维护性。Python生态中,factory_boy是最流行的测试数据工厂库,它提供了声明式的方式来定义模型工厂,可以快速生成测试对象。配合Faker库,factory_boy可以生成逼真的测试数据,同时保持对关键字段的精确控制。此外,数据清理策略的选择直接影响测试的速度和可靠性。

使用factory_boy创建测试数据

factory_boy允许开发者定义工厂类(Factory类),每个工厂类对应一个ORM模型,描述了如何创建该模型的实例。工厂类中可以定义字段的生成规则,例如使用Faker生成随机名称、使用Sequence生成递增序列、使用SubFactory创建关联对象等。通过Factory类,测试代码可以用极少的代码创建复杂的测试数据对象,大大提高了测试的可读性和可维护性。

import factory from factory import fuzzy from datetime import datetime, timedelta import random # 假设有以下SQLAlchemy模型 class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) username = Column(String(100), unique=True) email = Column(String(200), unique=True) full_name = Column(String(200)) is_active = Column(Integer, default=1) created_at = Column(DateTime, default=datetime.utcnow) class Category(Base): __tablename__ = "categories" id = Column(Integer, primary_key=True) name = Column(String(100), unique=True) description = Column(String(500)) class Article(Base): __tablename__ = "articles" id = Column(Integer, primary_key=True) title = Column(String(200)) content = Column(String) author_id = Column(Integer, ForeignKey("users.id")) category_id = Column(Integer, ForeignKey("categories.id")) is_published = Column(Integer, default=0) published_at = Column(DateTime, nullable=True) author = relationship("User") category = relationship("Category") # 定义工厂类 class UserFactory(factory.Factory): class Meta: model = User username = factory.Sequence(lambda n: f"user_{n:04d}") email = factory.LazyAttribute(lambda o: f"{o.username}@example.com") full_name = factory.Faker("name") is_active = fuzzy.FuzzyInteger(0, 1) created_at = factory.LazyFunction(datetime.utcnow) class CategoryFactory(factory.Factory): class Meta: model = Category name = factory.Sequence(lambda n: f"category_{n:04d}") description = factory.Faker("sentence") class ArticleFactory(factory.Factory): class Meta: model = Article title = factory.Faker("sentence", nb_words=6) content = factory.Faker("paragraph", nb_sentences=10) author = factory.SubFactory(UserFactory) category = factory.SubFactory(CategoryFactory) is_published = fuzzy.FuzzyInteger(0, 1) published_at = factory.Maybe( factory.SelfAttribute("is_published"), factory.LazyFunction(lambda: datetime.utcnow() - timedelta(days=random.randint(1, 30))), None )

在测试中使用工厂

在测试中使用工厂类可以显著简化测试数据的准备过程。通过工厂类的灵活组合,我们可以快速创建各种测试场景所需的数据状态。例如,创建已发布的文章、创建特定用户的文章集合、创建属于特定分类的文章等。工厂类还支持属性覆盖,允许在创建实例时临时修改某些字段的值,以适应特定测试用例的需求。

def test_create_published_article(session): """测试创建已发布的文章""" # 使用工厂创建已发布的文章 article = ArticleFactory( is_published=1, title="Python数据库测试最佳实践" ) session.add(article) session.flush() # 验证文章已正确创建 saved = session.get(Article, article.id) assert saved is not None assert saved.title == "Python数据库测试最佳实践" assert saved.is_published == 1 assert saved.author is not None assert saved.category is not None assert saved.published_at is not None def test_query_articles_by_author(session): """测试按作者查询文章""" # 创建特定作者的多篇文章 author = UserFactory(username="tech_writer", full_name="技术作家") articles = ArticleFactory.create_batch( 5, author=author, is_published=1 ) for a in articles: session.add(a) session.flush() # 查询该作者的文章 result = session.query(Article).filter( Article.author_id == author.id, Article.is_published == 1 ).all() assert len(result) == 5 for article in result: assert article.author.full_name == "技术作家" def test_article_with_specific_category(session): """测试文章和特定分类的关联""" tech_category = CategoryFactory(name="技术", description="技术相关文章") articles = ArticleFactory.create_batch(3, category=tech_category, is_published=1) for a in articles: session.add(a) session.flush() # 验证所有文章都属于正确的分类 saved = session.query(Article).filter( Article.category_id == tech_category.id ).all() assert len(saved) == 3

数据清理策略

数据清理是测试数据管理的重要环节,合理的清理策略能够保证测试的独立性和可重复性。常见的清理策略包括:事务回滚(最快,但无法测试提交行为)、表截断(TRUNCATE,速度快但无法处理外键约束)、逐表删除(DELETE FROM按依赖顺序删除,最安全但较慢)、数据库重建(DROP ALL后再CREATE,隔离最彻底但最慢)。在实际项目中,建议根据测试类型选择合适的策略。单元测试层使用事务回滚,集成测试层使用表截断或逐表删除。

import pytest from sqlalchemy import text class DataCleaner: """数据清理工具类""" def __init__(self, engine): self.engine = engine def truncate_all(self, tables): """截断所有表(按外键依赖顺序反向截断)""" with self.engine.connect() as conn: conn.execute(text("PRAGMA foreign_keys = OFF")) for table in reversed(tables): conn.execute(text(f"DELETE FROM {table}")) conn.execute(text("PRAGMA foreign_keys = ON")) conn.commit() def delete_in_order(self, metadata): """按照依赖顺序删除数据""" with self.engine.connect() as conn: conn.execute(text("PRAGMA foreign_keys = OFF")) # 获取所有表并按依赖关系排序 tables = metadata.sorted_tables for table in reversed(tables): conn.execute(text(f"DELETE FROM {table.name}")) conn.execute(text("PRAGMA foreign_keys = ON")) conn.commit() @pytest.fixture def cleaner(engine): return DataCleaner(engine) def test_with_cleanup(engine, cleaner): """测试数据清理效果""" from sqlalchemy import text # 插入测试数据 with engine.connect() as conn: conn.execute(text(""" INSERT INTO users (username, email) VALUES ('temp_user', 'temp@example.com') """)) conn.commit() # 清理数据 cleaner.truncate_all(["users"]) # 验证数据已被清理 with engine.connect() as conn: count = conn.execute( text("SELECT COUNT(*) FROM users") ).fetchone()[0] assert count == 0

八、并发数据库测试

并发数据库测试是验证应用在多线程或多进程同时访问数据库时行为正确性的关键环节。并发环境下可能出现的问题包括:竞态条件(Race Condition)、死锁(Deadlock)、脏读(Dirty Read)、丢失更新(Lost Update)和不可重复读等。通过精心设计的并发测试,可以暴露这些在单线程测试中无法发现的问题,确保应用在高并发场景下的数据一致性和系统稳定性。

多线程写入测试

多线程写入测试模拟多个线程同时向数据库写入数据的场景。测试的核心目标是验证在并发写入下数据的一致性和完整性不受破坏。例如,多个线程同时尝试创建订单、同时扣减库存或同时更新同一条记录时,数据库的锁机制和事务隔离应该确保最终结果符合预期。在Python中,可以使用threading模块创建并发工作线程,并使用barrier或event来同步线程的启动时机,确保真正的并发执行。

import pytest import threading from sqlalchemy import create_engine, text from concurrent.futures import ThreadPoolExecutor, as_completed @pytest.fixture def concurrency_engine(): """为并发测试创建数据库引擎""" engine = create_engine("sqlite:///:memory:", echo=False) with engine.connect() as conn: conn.execute(text(""" CREATE TABLE inventory ( product_id INTEGER PRIMARY KEY, product_name TEXT NOT NULL, quantity INTEGER NOT NULL, version INTEGER DEFAULT 1 ) """)) conn.execute(text(""" INSERT INTO inventory (product_id, product_name, quantity) VALUES (1, '热门商品', 100) """)) conn.commit() return engine def test_concurrent_inventory_deduction(concurrency_engine): """测试并发库存扣减""" engine = concurrency_engine num_threads = 10 deduction_per_thread = 5 errors = [] lock = threading.Lock() def deduct_stock(thread_id): try: with engine.connect() as conn: conn.execute(text("BEGIN")) result = conn.execute( text("SELECT quantity FROM inventory WHERE product_id = 1") ).fetchone() current_qty = result[0] if current_qty >= deduction_per_thread: conn.execute( text("UPDATE inventory SET quantity = quantity - :qty WHERE product_id = 1"), {"qty": deduction_per_thread} ) conn.execute(text("COMMIT")) return f"Thread {thread_id}: 扣减成功,剩余{current_qty - deduction_per_thread}" else: conn.execute(text("ROLLBACK")) return f"Thread {thread_id}: 库存不足,当前{current_qty}" except Exception as e: with lock: errors.append(e) return f"Thread {thread_id}: 错误 - {e}" with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [ executor.submit(deduct_stock, i) for i in range(num_threads) ] results = [f.result() for f in as_completed(futures)] # 验证最终库存 with engine.connect() as conn: final_qty = conn.execute( text("SELECT quantity FROM inventory WHERE product_id = 1") ).fetchone()[0] # 如果没有死锁,最终库存应该是: 100 - (成功扣减的线程数 * 5) assert final_qty >= 0, "库存不应为负数" expected_min = 100 - num_threads * deduction_per_thread assert final_qty >= expected_min print(f"最终库存: {final_qty}") def test_concurrent_insert_unique_constraint(concurrency_engine): """测试并发插入的唯一约束""" engine = concurrency_engine num_threads = 5 successful_inserts = 0 def insert_user(thread_id): try: with engine.connect() as conn: conn.execute( text("INSERT INTO users (username, email) VALUES (:name, :email)"), {"name": f"concurrent_user", "email": f"concurrent@example.com"} ) conn.commit() return True except Exception: conn.execute(text("ROLLBACK")) return False with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [ executor.submit(insert_user, i) for i in range(num_threads) ] results = [f.result() for f in as_completed(futures)] successful = sum(1 for r in results if r) assert successful <= 1, "唯一约束应该只允许一个插入成功"

锁与死锁检测

数据库锁机制是保证并发数据一致性的基础,但不当的锁使用会导致死锁。死锁是指两个或多个事务相互等待对方释放锁,导致所有涉及的事务都无法继续执行。数据库系统通常有死锁检测机制,会选择一个事务作为牺牲品回滚,以解除死锁。在测试中,我们需要验证应用在死锁发生时的行为是否符合预期,包括重试机制的正确性和回滚操作的完整性。

import pytest import threading import time from sqlalchemy import text from concurrent.futures import ThreadPoolExecutor def test_deadlock_detection(concurrency_engine): """测试死锁检测和回滚""" engine = concurrency_engine barrier = threading.Barrier(2, timeout=10) def transaction_1(): try: with engine.connect() as conn: conn.execute(text("BEGIN")) conn.execute( text("UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 1") ) barrier.wait(timeout=5) # 尝试获取另一个资源(模拟交叉锁) conn.execute( text("UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2") ) conn.execute(text("COMMIT")) return "T1 成功" except Exception as e: return f"T1 失败: {e}" def transaction_2(): try: with engine.connect() as conn: conn.execute(text("BEGIN")) conn.execute( text("UPDATE inventory SET quantity = quantity - 20 WHERE product_id = 2") ) barrier.wait(timeout=5) # 尝试获取事务1已锁定的资源 conn.execute( text("UPDATE inventory SET quantity = quantity + 20 WHERE product_id = 1") ) conn.execute(text("COMMIT")) return "T2 成功" except Exception as e: return f"T2 失败: {e}" with ThreadPoolExecutor(max_workers=2) as executor: f1 = executor.submit(transaction_1) f2 = executor.submit(transaction_2) r1 = f1.result() r2 = f2.result() # SQLite死锁时,一个事务会成功,另一个会失败 print(f"事务1结果: {r1}") print(f"事务2结果: {r2}") def test_pessimistic_lock_with_select_for_update(concurrency_engine): """测试悲观锁(SELECT FOR UPDATE)""" engine = concurrency_engine results = [] barrier = threading.Barrier(2, timeout=10) def locked_transaction(thread_id): try: with engine.connect() as conn: conn.execute(text("BEGIN")) # 使用悲观锁锁定行 result = conn.execute( text("SELECT quantity FROM inventory WHERE product_id = 1 FOR UPDATE") ).fetchone() current = result[0] time.sleep(0.5) # 模拟业务处理时间 # 基于锁定值更新 conn.execute( text("UPDATE inventory SET quantity = :qty WHERE product_id = 1"), {"qty": current - 10} ) conn.execute(text("COMMIT")) results.append(f"Thread {thread_id}: 成功更新") except Exception as e: results.append(f"Thread {thread_id}: 错误 - {e}") threads = [ threading.Thread(target=locked_transaction, args=(1,)), threading.Thread(target=locked_transaction, args=(2,)), ] for t in threads: t.start() for t in threads: t.join() # 验证最终库存 with engine.connect() as conn: final = conn.execute( text("SELECT quantity FROM inventory WHERE product_id = 1") ).fetchone()[0] # 两个线程各扣减10,总共扣减20 assert final == 80 # 100 - 20

连接池测试

连接池是数据库性能优化的重要组成部分,它重用数据库连接以减少连接创建和销毁的开销。在测试中,我们需要验证连接池的配置是否正确,包括最大连接数、超时设置、连接回收策略等。特别是当并发请求超过连接池大小时,应用的行为应该符合预期——可能会等待空闲连接或抛出连接超时异常。此外,连接泄露(即连接使用后未正确归还到连接池)也是需要测试的关键场景。

import pytest from sqlalchemy import create_engine, text from sqlalchemy.exc import TimeoutError import threading import time def test_connection_pool_exhaustion(): """测试连接池耗尽时的行为""" # 创建连接池大小为3的引擎 engine = create_engine( "sqlite:///:memory:", pool_size=3, max_overflow=0, pool_timeout=2 ) # 创建表 with engine.connect() as conn: conn.execute(text("CREATE TABLE test_pool (id INTEGER)")) conn.commit() connections = [] errors = [] def acquire_connection(thread_id): try: conn = engine.connect() with conn: connections.append(conn) time.sleep(3) # 长时间持有连接 except TimeoutError as e: errors.append(f"Thread {thread_id}: 连接超时") except Exception as e: errors.append(f"Thread {thread_id}: {e}") # 启动4个线程(超过连接池大小) threads = [ threading.Thread(target=acquire_connection, args=(i,)) for i in range(4) ] for t in threads: t.start() for t in threads: t.join() # 应该至少有一个线程无法获取连接 assert len(errors) > 0, "连接池应该被耗尽" # 清理 for conn in connections: conn.close() engine.dispose() def test_connection_pool_reuse(): """测试连接池重用""" import sqlalchemy from sqlalchemy import event engine = create_engine( "sqlite:///:memory:", pool_size=2, max_overflow=0, pool_class=sqlalchemy.pool.QueuePool ) connect_count = 0 @event.listens_for(engine, "connect") def count_connections(dbapi_con, con_record): nonlocal connect_count connect_count += 1 with engine.connect() as conn1: with engine.connect() as conn2: pass # 连接被归还到连接池后,新获取的连接应该重用 with engine.connect() as conn3: with engine.connect() as conn4: pass # 理论上只创建了2个连接(连接池大小为2) assert connect_count == 2 engine.dispose()

九、实战案例

理论知识需要通过实际项目案例来巩固。本章将通过两个完整的实战案例——用户系统数据层测试和订单数据库完整测试套件——展示如何将前面所学的内容综合应用到真实的测试场景中。每个案例都包含了从测试架构设计到具体测试用例实现的完整过程,涵盖了fixture组织、数据工厂定义、核心业务操作测试和边界条件覆盖等各个方面。

案例一:用户系统数据层测试

用户系统是几乎所有应用的基础模块,其数据层测试具有典型性和代表性。用户系统的核心操作包括用户注册、用户信息查询、用户状态管理和用户删除。在测试中,我们需要覆盖用户名的唯一性约束、邮箱格式验证、密码加密存储、用户状态的正确转换(如激活、禁用、删除)以及用户关联数据(如用户角色、权限)的完整生命周期。

import pytest from sqlalchemy import ( create_engine, Column, Integer, String, DateTime, Boolean, Enum as SAEnum, ForeignKey, Text ) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from datetime import datetime from werkzeug.security import generate_password_hash, check_password_hash Base = declarative_base() class Role(Base): __tablename__ = "roles" id = Column(Integer, primary_key=True) name = Column(String(50), unique=True, nullable=False) description = Column(String(200)) class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) username = Column(String(100), unique=True, nullable=False) email = Column(String(200), unique=True, nullable=False) password_hash = Column(String(256), nullable=False) role_id = Column(Integer, ForeignKey("roles.id")) is_active = Column(Boolean, default=True) is_deleted = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) last_login = Column(DateTime, nullable=True) role = relationship("Role") def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) class UserService: """用户服务层""" def __init__(self, session): self.session = session def register_user(self, username, email, password, role_name="user"): """用户注册""" existing = self.session.query(User).filter( (User.username == username) | (User.email == email) ).first() if existing: raise ValueError("用户名或邮箱已存在") role = self.session.query(Role).filter(Role.name == role_name).first() if not role: raise ValueError(f"角色 {role_name} 不存在") user = User(username=username, email=email, role=role) user.set_password(password) self.session.add(user) self.session.flush() return user def get_active_user(self, username): """获取活跃用户""" return self.session.query(User).filter( User.username == username, User.is_active == True, User.is_deleted == False ).first() def deactivate_user(self, user_id): """禁用用户""" user = self.session.get(User, user_id) if not user: raise ValueError("用户不存在") user.is_active = False self.session.flush() return user def delete_user_soft(self, user_id): """软删除用户""" user = self.session.get(User, user_id) if not user: raise ValueError("用户不存在") user.is_deleted = True user.is_active = False self.session.flush() return user @pytest.fixture def user_db_session(): """用户测试数据库会话""" engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) connection = engine.connect() transaction = connection.begin() session = sessionmaker(bind=connection)() # 创建基础角色 admin_role = Role(name="admin", description="管理员") user_role = Role(name="user", description="普通用户") session.add_all([admin_role, user_role]) session.flush() yield session session.close() transaction.rollback() connection.close() engine.dispose() def test_user_registration_success(user_db_session): """测试用户注册成功""" service = UserService(user_db_session) user = service.register_user("newuser", "new@example.com", "SecurePass123") assert user.id is not None assert user.username == "newuser" assert user.is_active == True assert user.check_password("SecurePass123") == True def test_user_registration_duplicate_username(user_db_session): """测试重复用户名注册被拒绝""" service = UserService(user_db_session) service.register_user("existing", "existing@example.com", "Pass123") with pytest.raises(ValueError, match="用户名或邮箱已存在"): service.register_user("existing", "other@example.com", "Pass456") def test_user_deactivation(user_db_session): """测试用户禁用""" service = UserService(user_db_session) user = service.register_user("todeactivate", "deact@example.com", "Pass123") service.deactivate_user(user.id) # 验证禁用后无法通过活跃用户查询找到 found = service.get_active_user("todeactivate") assert found is None def test_soft_delete_user(user_db_session): """测试用户软删除""" service = UserService(user_db_session) user = service.register_user("todelete", "delete@example.com", "Pass123") service.delete_user_soft(user.id) assert user.is_deleted == True assert user.is_active == False

案例二:订单数据库完整测试套件

订单系统是电商应用的核心模块,涉及复杂的数据库操作和业务规则。一个完整的订单测试套件需要覆盖:订单创建(包括商品库存扣减、订单项创建、价格计算)、订单状态流转(待支付、已支付、已发货、已完成、已取消)、订单查询(按用户、按状态、按时间范围、分页)、以及异常场景(库存不足、重复支付、订单超时取消)。通过系统化的测试,确保订单系统的数据一致性和业务逻辑正确性。

import pytest from dataclasses import dataclass from decimal import Decimal from datetime import datetime, timedelta from sqlalchemy import ( create_engine, Column, Integer, String, Float, DateTime, ForeignKey, Enum as SAEnum, Text ) from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship import enum OrderStatus = enum.Enum("OrderStatus", "PENDING PAID SHIPPED COMPLETED CANCELLED") Base = declarative_base() class Product(Base): __tablename__ = "products" id = Column(Integer, primary_key=True) name = Column(String(200), nullable=False) price = Column(Float, nullable=False) stock = Column(Integer, default=0) class OrderORM(Base): __tablename__ = "orders" id = Column(Integer, primary_key=True) user_id = Column(Integer, nullable=False) status = Column(String(20), default="PENDING") total_amount = Column(Float, default=0.0) shipping_address = Column(Text) created_at = Column(DateTime, default=datetime.utcnow) paid_at = Column(DateTime, nullable=True) items = relationship("OrderItem", back_populates="order") class OrderItem(Base): __tablename__ = "order_items" id = Column(Integer, primary_key=True) order_id = Column(Integer, ForeignKey("orders.id")) product_id = Column(Integer, ForeignKey("products.id")) product_name = Column(String(200)) quantity = Column(Integer, nullable=False) unit_price = Column(Float, nullable=False) order = relationship("OrderORM", back_populates="items") class OrderService: """订单服务""" def __init__(self, session): self.session = session def create_order(self, user_id, items_data, shipping_address=""): """创建订单""" total = 0.0 order_items = [] for item in items_data: product = self.session.get(Product, item["product_id"]) if not product: raise ValueError(f"商品 {item['product_id']} 不存在") if product.stock < item["quantity"]: raise ValueError(f"商品 {product.name} 库存不足") # 扣减库存 product.stock -= item["quantity"] item_total = product.price * item["quantity"] total += item_total order_items.append(OrderItem( product_id=product.id, product_name=product.name, quantity=item["quantity"], unit_price=product.price )) order = OrderORM( user_id=user_id, status="PENDING", total_amount=total, shipping_address=shipping_address, items=order_items ) self.session.add(order) self.session.flush() return order def pay_order(self, order_id): """支付订单""" order = self.session.get(OrderORM, order_id) if not order: raise ValueError("订单不存在") if order.status != "PENDING": raise ValueError("订单状态不允许支付") order.status = "PAID" order.paid_at = datetime.utcnow() self.session.flush() return order def cancel_order(self, order_id): """取消订单(恢复库存)""" order = self.session.get(OrderORM, order_id) if not order: raise ValueError("订单不存在") if order.status not in ("PENDING", "PAID"): raise ValueError("当前状态不允许取消") # 恢复库存 for item in order.items: product = self.session.get(Product, item.product_id) if product: product.stock += item.quantity order.status = "CANCELLED" self.session.flush() return order def get_user_orders(self, user_id, status=None, page=1, page_size=10): """分页查询用户订单""" query = self.session.query(OrderORM).filter( OrderORM.user_id == user_id ) if status: query = query.filter(OrderORM.status == status) query = query.order_by(OrderORM.created_at.desc()) offset = (page - 1) * page_size return query.offset(offset).limit(page_size).all() @pytest.fixture def order_db_session(): """订单测试数据库会话""" engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) connection = engine.connect() transaction = connection.begin() session = sessionmaker(bind=connection)() # 创建测试商品 products = [ Product(name="Python编程从入门到实践", price=89.00, stock=100), Product(name="机械键盘", price=299.00, stock=50), Product(name="无线鼠标", price=129.00, stock=30), Product(name="显示器27寸", price=1999.00, stock=10), ] session.add_all(products) session.flush() yield session session.close() transaction.rollback() connection.close() def test_create_order_success(order_db_session): """测试订单创建成功""" service = OrderService(order_db_session) product = order_db_session.query(Product).first() order = service.create_order( user_id=1, items_data=[{"product_id": product.id, "quantity": 2}], shipping_address="上海市浦东新区" ) assert order.id is not None assert order.status == "PENDING" assert len(order.items) == 1 assert order.items[0].quantity == 2 # 验证库存已扣减 updated_product = order_db_session.get(Product, product.id) assert updated_product.stock == 98 # 100 - 2 def test_create_order_insufficient_stock(order_db_session): """测试库存不足时订单创建失败""" service = OrderService(order_db_session) product = order_db_session.query(Product).filter( Product.stock == 10 # 显示器,只有10台 ).first() with pytest.raises(ValueError, match="库存不足"): service.create_order( user_id=1, items_data=[{"product_id": product.id, "quantity": 20}] ) def test_order_payment_and_cancellation(order_db_session): """测试订单支付和取消的完整流程""" service = OrderService(order_db_session) product = order_db_session.query(Product).first() initial_stock = product.stock # 创建订单 order = service.create_order( user_id=1, items_data=[{"product_id": product.id, "quantity": 3}] ) assert order_db_session.get(Product, product.id).stock == initial_stock - 3 # 支付订单 paid = service.pay_order(order.id) assert paid.status == "PAID" assert paid.paid_at is not None # 取消订单(恢复库存) cancelled = service.cancel_order(order.id) assert cancelled.status == "CANCELLED" assert order_db_session.get(Product, product.id).stock == initial_stock def test_query_orders_with_pagination(order_db_session): """测试订单分页查询""" service = OrderService(order_db_session) product = order_db_session.query(Product).first() # 创建多个订单 for i in range(5): service.create_order( user_id=1, items_data=[{"product_id": product.id, "quantity": 1}] ) # 分页查询 page1 = service.get_user_orders(user_id=1, page=1, page_size=2) assert len(page1) == 2 page2 = service.get_user_orders(user_id=1, page=2, page_size=2) assert len(page2) == 2 page3 = service.get_user_orders(user_id=1, page=3, page_size=2) assert len(page3) == 1 # 所有订单ID不能重复 all_ids = [o.id for o in page1 + page2 + page3] assert len(all_ids) == len(set(all_ids))

测试套件集成运行

将以上所有测试组织成一个完整的测试套件,可以使用pytest的mark机制进行分类和筛选。例如,使用@pytest.mark.unit标记单元测试、@pytest.mark.integration标记集成测试、@pytest.mark.concurrent标记并发测试。通过pytest的命令行选项可以灵活选择要运行的测试类别,实现分层测试策略。在CI/CD流水线中,建议先运行单元测试层(快速反馈),再运行集成测试层(验证真实行为),最后运行并发和压力测试(性能验证)。

# pytest.ini 配置示例 """ [pytest] markers = unit: 单元测试(使用SQLite内存数据库) integration: 集成测试(使用PostgreSQL测试容器) concurrent: 并发测试 migration: 数据库迁移测试 slow: 慢速测试(标记后默认跳过) testpaths = tests/database """ # 运行命令示例 """ # 只运行单元测试 pytest -m unit -v # 运行所有数据库测试(跳过慢速测试) pytest tests/database/ -v # 运行集成测试和并发测试 pytest -m "integration or concurrent" -v # 生成测试覆盖率报告 pytest --cov=app.models --cov=app.services --cov-report=html tests/database/ # 并行运行测试 pytest -n auto tests/database/ -v """ # 测试组织示例 """ tests/database/ ├── conftest.py # 共享fixture定义 ├── test_sqlite_basics.py # SQLite基础测试 ├── test_sqlalchemy_crud.py # SQLAlchemy CRUD测试 ├── test_transactions.py # 事务测试 ├── test_migrations.py # 迁移测试 ├── test_concurrent.py # 并发测试 ├── test_factories.py # 工厂测试 ├── factories.py # 工厂类定义 ├── test_user_service.py # 用户服务测试 └── test_order_service.py # 订单服务测试 """

最佳实践总结:数据库测试的核心要诀是"快、准、稳"。快——使用内存数据库和事务回滚确保测试速度;准——使用测试容器确保真实环境一致性;稳——通过完善的fixture管理和数据清理策略确保测试稳定可靠。分层测试、关注隔离、覆盖边界是写好数据库测试的三个关键原则。