Python与数据库交互

Python进阶编程专题 · Python数据库编程全面指南

专题:Python进阶编程系统学习

关键词:Python, 数据库, SQLite, SQLAlchemy, ORM, 事务, 连接池, Alembic

一、概述

在现代应用开发中,数据库交互是不可或缺的核心能力。Python 凭借其简洁的语法和丰富的生态,提供了从底层 DB-API 到高层 ORM 框架的完整数据库编程解决方案。无论您是在构建 Web 应用、数据分析管道、还是自动化脚本,掌握 Python 数据库编程都将大幅提升您的开发效率。

Python 的数据库交互生态主要分为三个层次:底层数据库驱动(如 sqlite3、psycopg2、pymysql)、中层 DB-API 2.0 兼容层、以及高层 ORM 框架(如 SQLAlchemy、Django ORM)。理解每一层的工作原理和适用场景,能够帮助开发者在不同项目中做出最优的技术选型。

本文将系统性地讲解 Python 数据库交互的各个方面:从 Python 标准库中的 sqlite3 模块开始,逐步深入 SQLAlchemy 核心与 ORM、数据库连接池管理、事务控制、异步数据库访问、SQL 注入防范,以及使用 Alembic 进行数据库迁移管理。每个部分均配有可独立运行的代码示例,帮助读者在实践中掌握这些技术。

二、DB-API 2.0 标准

要点:PEP 249 定义的 Python 数据库 API 规范。所有主流数据库驱动(sqlite3、psycopg2、MySQL Connector、cx_Oracle)均遵循该标准,提供统一的接口。

2.1 核心接口

DB-API 2.0 定义了数据库交互的核心对象和协议。Connection 对象代表与数据库的一个连接会话,通过连接字符串或连接参数创建。Cursor 对象是执行 SQL 语句和获取结果的核心接口。使用 with 语句管理 Connection 和 Cursor 的上下文是一种推荐的最佳实践,能够确保资源被正确释放。

2.2 连接与游标生命周期

以下代码展示了使用 DB-API 2.0 标准的典型模式——建立连接、获取游标、执行 SQL、处理结果、清理资源。这种模式在所有遵循该标准的数据库驱动中基本一致。

import sqlite3 # 建立连接 - 所有遵循 DB-API 2.0 的驱动都支持类似模式 conn = sqlite3.connect('example.db') # 创建游标 cursor = conn.cursor() # 执行 SQL cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") # 获取单行 row = cursor.fetchone() # 获取全部结果 all_rows = cursor.fetchall() # 获取指定数量的行 some_rows = cursor.fetchmany(size=5) # 关闭游标和连接 cursor.close() conn.close()

2.3 常用方法速查表

方法描述返回值
cursor.execute(sql, params)执行单条 SQL 语句Cursor 自身
cursor.executemany(sql, seq_of_params)批量执行 SQL(参数为序列)Cursor 自身
cursor.fetchone()获取结果集的下一行单行 tuple / None
cursor.fetchmany(size)获取最多 size 行tuple 列表
cursor.fetchall()获取所有剩余行tuple 列表
conn.commit()提交当前事务None
conn.rollback()回滚当前事务None
cursor.description列名和元信息tuple 序列
cursor.rowcount最近操作影响的行数int

最佳实践:始终使用 with conn: 上下文管理器——当块内代码正常执行完毕时会自动 commit,抛出异常时会自动 rollback。同时建议使用 with conn.cursor() as cur 确保游标被正确关闭。

三、SQLite3 内建数据库

SQLite3 是 Python 标准库中内置的轻量级数据库引擎,无需任何外部依赖即可使用。它以其零配置、服务端无进程、数据库文件单一的特点,成为嵌入式系统和单机应用的理想选择。Python 的 sqlite3 模块自 2.5 版本起被纳入标准库,完美实现了 DB-API 2.0 规范。

3.1 连接与建表

使用 sqlite3.connect() 可以连接到一个 SQLite 数据库文件。如果文件不存在,SQLite 会自动创建。使用内存数据库(":memory:")可以提升测试场景下的性能。

import sqlite3 # 连接数据库(自动创建文件) conn = sqlite3.connect('shop.db') # 使用内存数据库(测试用) # conn = sqlite3.connect(':memory:') cursor = conn.cursor() # 创建表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, email TEXT NOT NULL, age INTEGER DEFAULT 18, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit()

3.2 完整的 CRUD 操作

以下示例展示了使用 sqlite3 模块完成标准的增删改查(CRUD)操作。请注意,所有用户输入的操作都应使用参数化查询(问号占位符),这是防范 SQL 注入的第一道防线。

# ---------- CREATE ---------- cursor.execute( "INSERT INTO users (username, email, age) VALUES (?, ?, ?)", ('alice', 'alice@example.com', 28) ) conn.commit() print(f"插入用户 ID: {cursor.lastrowid}") # 批量插入 users = [ ('bob', 'bob@example.com', 32), ('carol', 'carol@example.com', 25), ] cursor.executemany( "INSERT INTO users (username, email, age) VALUES (?, ?, ?)", users ) conn.commit() # ---------- READ ---------- cursor.execute("SELECT * FROM users WHERE age > ?", (25,)) rows = cursor.fetchall() for row in rows: print(f"ID: {row[0]}, 用户名: {row[1]}, 邮箱: {row[2]}") # ---------- UPDATE ---------- cursor.execute( "UPDATE users SET age = ? WHERE username = ?", (29, 'alice') ) conn.commit() print(f"更新了 {cursor.rowcount} 行") # ---------- DELETE ---------- cursor.execute( "DELETE FROM users WHERE username = ?", ('carol',) ) conn.commit() print(f"删除了 {cursor.rowcount} 行")

3.3 行工厂(Row Factory)

默认情况下,fetch 返回的是元组(tuple),通过列索引访问数据可读性较差。sqlite3 提供了行工厂机制,可以将返回的行转换为字典风格的对象,支持通过列名访问数据。

# 启用行工厂 conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", ('alice',)) row = cursor.fetchone() # 通过列名访问(推荐) print(row['username']) print(row['email']) # 转换为字典 user_dict = dict(row) print(user_dict) # 获取列名元信息 print([desc[0] for desc in cursor.description])

场景提示:SQLite3 适用于单机应用、原型开发、测试环境和小型 Web 应用(并发请求 < 100)。对于高并发场景,建议迁移至 PostgreSQL 或 MySQL。

四、SQLAlchemy 核心与 ORM

SQLAlchemy 是 Python 社区中最成熟、最强大的数据库工具包。它提供了两套独立但可以协同工作的 API:SQLAlchemy Core(核心)提供 SQL 表达式语言,以 Python 代码的方式构建 SQL 语句;SQLAlchemy ORM 则在 Core 之上构建了完整的对象关系映射层,允许开发者通过 Python 对象直接操作数据库。

4.1 Engine 与连接管理

Engine 是 SQLAlchemy 的核心入口点,它封装了数据库连接池和方言(Dialect)信息。Engine 负责管理到数据库的实际连接,并将 SQL 语句转换为目标数据库的方言。

from sqlalchemy import create_engine, text # SQLite 内存数据库 engine = create_engine("sqlite+pysqlite:///:memory:", echo=True) # PostgreSQL(需安装 psycopg2) # engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname") # MySQL(需安装 pymysql) # engine = create_engine("mysql+pymysql://user:pass@localhost/dbname") # 使用 Engine 执行原生 SQL with engine.connect() as conn: result = conn.execute(text("SELECT 1")) print(result.scalar()) # 输出: 1

4.2 声明式 ORM 映射(Declarative Mapping)

SQLAlchemy 的声明式映射系统允许开发者使用 Python 类来定义数据库表结构。从 2.0 版本开始,声明式映射得到了全面改进,推荐使用 mapped_column() 替代旧式的 Column()

from typing import Optional, List from datetime import datetime from sqlalchemy import ForeignKey, String, Float, Text from sqlalchemy.orm import ( DeclarativeBase, Mapped, mapped_column, relationship, Session ) # 声明基类 class Base(DeclarativeBase): pass # 定义模型 class Category(Base): __tablename__ = "categories" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False) description: Mapped[Optional[str]] = mapped_column(Text) # 关系:一个分类有多个产品 products: Mapped[List["Product"]] = relationship(back_populates="category") def __repr__(self) -> str: return f"<Category(id={self.id}, name={self.name})>" class Product(Base): __tablename__ = "products" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(200), nullable=False) price: Mapped[float] = mapped_column(Float, nullable=False) stock: Mapped[int] = mapped_column(default=0) category_id: Mapped[int] = mapped_column(ForeignKey("categories.id")) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) category: Mapped["Category"] = relationship(back_populates="products") def __repr__(self) -> str: return f"<Product(id={self.id}, name={self.name}, price={self.price})>"

4.3 创建表与 Session 管理

Session 是 SQLAlchemy ORM 操作的核心管理器。它跟踪所有被加载或创建的对象,并在合适的时机将变更同步到数据库。Repository 模式可以进一步封装 Session 操作,使业务逻辑与数据库访问解耦。

# 创建所有表 Base.metadata.create_all(engine) # 创建 Session session = Session(engine) # 插入数据(ORM 风格) category = Category(name="电子设备", description="消费电子产品") session.add(category) session.flush() # 立即发送 SQL 但不提交(获取 auto-generated ID) product = Product( name="无线蓝牙耳机", price=299.00, stock=500, category_id=category.id ) session.add(product) session.commit() # 提交事务 # 查询数据(Query API,SQLAlchemy 1.x 风格) products = session.query(Product).filter(Product.price > 100).all() # 查询数据(2.0 风格——推荐) from sqlalchemy import select stmt = select(Product).where(Product.price > 100).order_by(Product.price.desc()) for product in session.scalars(stmt): print(product.name, product.price) # 更新数据 product.price = 249.00 session.commit() # 删除数据 session.delete(product) session.commit() session.close()

4.4 关系查询与预加载

在 ORM 中处理关联关系时,N+1 查询问题是常见的性能陷阱。SQLAlchemy 提供了多种预加载(Eager Loading)策略,可以在一次查询中获取关联数据,显著提升查询性能。

from sqlalchemy.orm import joinedload, selectinload # 懒加载(Lazy Loading)—— 默认行为,可能导致 N+1 查询 # 访问 product.category 时会额外发送查询 # 立即加载(Joined Load)—— 使用 LEFT JOIN stmt = select(Product).options(joinedload(Product.category)) products = session.scalars(stmt).unique().all() # 此时 product.category 已经在同一查询中加载,不会触发额外查询 # 子查询加载(Selectin Load)—— 使用 IN 子句,适合一对多关系 stmt = select(Category).options(selectinload(Category.products)) categories = session.scalars(stmt).unique().all() for cat in categories: print(f"{cat.name}: {[p.name for p in cat.products]}")

推荐方案(ORM 2.0 风格):
使用 select() 配合 session.scalars()
类型安全,与异步兼容,未来发展方向。

旧式方案(查询 1.x 风格):
使用 session.query(Model).filter()
仍然可用,但不推荐在新项目中使用。

五、数据库连接池

在 Web 应用和高频数据库交互场景中,频繁创建和销毁数据库连接的代价非常高。连接池机制通过维护一组可复用的连接,显著降低了连接创建的开销,同时限制了并发连接数,防止数据库过载。

5.1 SQLAlchemy 内置连接池

SQLAlchemy 的 Engine 默认集成了连接池。最常用的实现是 QueuePool,它维护一个固定大小的连接队列,工作线程从队列中获取连接,使用完后归还。

from sqlalchemy import create_engine, event from sqlalchemy.pool import QueuePool, NullPool, assertion # QueuePool —— 默认连接池,适用于大多数场景 engine = create_engine( "sqlite+pysqlite:///shop.db", poolclass=QueuePool, pool_size=5, # 池中保持的连接数 max_overflow=10, # 最大溢出连接数(超过 pool_size 后可额外创建的连接) pool_timeout=30, # 获取连接的超时时间(秒) pool_recycle=3600, # 连接回收时间(秒),防止连接被数据库关闭 pool_pre_ping=True, # 每次获取连接前发送 SELECT 1 检查连接有效性 ) # NullPool —— 无连接池,每次获取新连接,适合 SQLite :memory: engine = create_engine( "sqlite+pysqlite:///:memory:", poolclass=NullPool ) # 监听连接检入/检出事件 @event.listens_for(engine, "checkout") def receive_checkout(dbapi_conn, conn_record, conn_proxy): print(f"连接被检出: {dbapi_conn}")

连接池调优建议:pool_size 应设置为应用并发峰值的 70%-80%;max_overflow 应对突发流量;pool_pre_ping=True 可以有效避免使用已断开的陈旧连接;pool_recycle 建议小于数据库侧的连接超时时间(通常 MySQL 为 8 小时,建议设为 3600)。

5.2 自定义连接池

对于特定的应用场景,可以通过继承 Pool 基类来创建自定义的连接池。例如限制单例模式的连接池、记录连接使用统计的连接池等。

from sqlalchemy.pool import Pool import time class TimingPool(Pool): """记录连接获取时间的连接池""" def __init__(self, creator, **kwargs): super().__init__(creator, **kwargs) self.acquisition_times = [] def _create_connection(self): start = time.perf_counter() conn = super()._create_connection() elapsed = time.perf_counter() - start self.acquisition_times.append(elapsed) return conn

六、事务管理

事务是数据库操作的基本原子单位。Python 数据库编程中,事务管理主要分为自动提交模式和手动提交模式两种。SQLAlchemy 2.0 引入了更清晰的事务控制语义,特别是在嵌套事务和保存点(Savepoint)方面。

6.1 手动事务控制

在 SQLite3 中,当执行数据修改语句(INSERT、UPDATE、DELETE)时,事务会自动开始。用户需要显式调用 commit() 或 rollback() 来完成事务。在 SQLAlchemy 2.0 中,推荐使用连接对象的上下文管理器来处理事务。

# SQLite3 手动事务 conn = sqlite3.connect('shop.db') try: cursor = conn.cursor() cursor.execute("UPDATE users SET age = age + 1 WHERE username = ?", ('alice',)) cursor.execute("UPDATE users SET age = age - 1 WHERE username = ?", ('bob',)) conn.commit() # 两笔更新作为一个原子事务 except Exception as e: conn.rollback() # 出错时回滚全部变更 print(f"事务回滚: {e}") finally: conn.close()

6.2 SQLAlchemy 事务与 Savepoint

SQLAlchemy 2.0 使用 connection.begin()connection.begin_nested() 管理事务。嵌套事务通过数据库的保存点(SAVEPOINT)机制实现,允许在事务内部回滚到特定点,而不影响整个事务。

from sqlalchemy import create_engine, text engine = create_engine("sqlite+pysqlite:///shop.db") # 简洁的自动事务管理(推荐) with engine.connect() as conn: with conn.begin(): conn.execute(text("UPDATE users SET age = age + 1 WHERE id = 1")) # 嵌套事务——使用 Savepoint with engine.connect() as conn: with conn.begin() as transaction: conn.execute(text("INSERT INTO users (username) VALUES ('temp1')")) # 创建保存点 with conn.begin_nested(): conn.execute(text("INSERT INTO users (username) VALUES ('temp2')")) conn.execute(text("INSERT INTO users (username) VALUES ('temp3')")) # 如果这里出错,内部 savepoint 回滚,但外部事务不受影响 conn.execute(text("INSERT INTO users (username) VALUES ('temp4')")) # 事务提交后:temp1, temp2, temp3, temp4 全部存在

注意:SQLite3 默认不支持嵌套事务。SQLAlchemy 通过 SAVEPOINT 模拟了嵌套事务的行为,但底层 SQLite 的保存点具有一定的限制(例如不支持 DDL 语句的回滚)。建议在 PostgreSQL 中使用真正的嵌套事务。

6.3 ORM Session 事务边界

在 SQLAlchemy ORM 中,Session 本身就是一个事务边界管理器。理解 Session 的刷新(flush)和提交(commit)之间的区别至关重要。flush 将对象状态同步到数据库(发送 SQL),而 commit 则持久化变更并结束事务。

# Session 事务管理 session = Session(engine) try: user = User(name="新用户", email="new@example.com") session.add(user) session.flush() # 发送 INSERT,获取 user.id,但未提交 # 可以利用 user.id 做后续操作 profile = Profile(user_id=user.id, bio="简介") session.add(profile) session.commit() # 提交整个事务 except Exception: session.rollback() # 回滚全部 raise finally: session.close()

七、异步数据库访问

随着异步编程在 Python 中的普及(FastAPI、Tornado、aiohttp 等框架),异步数据库访问成为高性能应用的关键需求。Python 生态提供了多种异步数据库驱动,其中最流行的是 aiosqlite、asyncpg(PostgreSQL 专用)和 SQLAlchemy 的异步扩展 sqlalchemy.asyncio。

7.1 aiosqlite

aiosqlite 是对 sqlite3 的异步封装,提供与标准 sqlite3 模块类似的接口,但所有 I/O 操作都是非阻塞的。非常适合在异步 Web 应用中使用 SQLite 的场景。

import asyncio import aiosqlite async def main(): async with aiosqlite.connect("shop.db") as db: async with db.execute( "SELECT username, email FROM users WHERE age > ?", (25,) ) as cursor: async for row in cursor: print(f"用户名: {row[0]}, 邮箱: {row[1]}") asyncio.run(main())

7.2 asyncpg(PostgreSQL)

asyncpg 是目前最快的 Python PostgreSQL 驱动,专为异步编程设计。它直接构建在 PostgreSQL 的二进制协议之上,跳过了传统驱动中的文本协议解析层,在性能上具有显著优势。

import asyncio import asyncpg async def main(): conn = await asyncpg.connect( user='user', password='password', database='dbname', host='localhost' ) # 准备语句(Prepared Statement) stmt = await conn.prepare("SELECT username, email FROM users WHERE age >= $1") rows = await stmt.fetch(18) for row in rows: print(f"用户名: {row['username']}") # 批量插入 await conn.executemany( "INSERT INTO users (username, email, age) VALUES ($1, $2, $3)", [('user1', 'u1@test.com', 20), ('user2', 'u2@test.com', 30)] ) await conn.close() asyncio.run(main())

7.3 SQLAlchemy 异步引擎

SQLAlchemy 1.4 起引入了对 asyncio 的完整支持。通过 create_async_engine 创建异步引擎,配合 AsyncSession 进行 ORM 操作。这使得在 FastAPI 等异步框架中可以使用完整的 SQLAlchemy ORM 功能。

from sqlalchemy.ext.asyncio import ( create_async_engine, AsyncSession, async_sessionmaker ) from sqlalchemy.orm import selectinload # 创建异步引擎(注意 URL 前缀:sqlite+aiosqlite://) async_engine = create_async_engine( "sqlite+aiosqlite:///shop.db", echo=True ) # 异步 Session 工厂 AsyncSessionLocal = async_sessionmaker( async_engine, class_=AsyncSession, expire_on_commit=False ) async def get_product_count(): async with AsyncSessionLocal() as session: from sqlalchemy import select, func result = await session.execute(select(func.count()).select_from(Product)) count = result.scalar() return count # 在 FastAPI 中使用 # @app.get("/products/count") # async def product_count(): # count = await get_product_count() # return {"count": count}

八、SQL 注入防范

安全警示:SQL 注入是最常见也最危险的 Web 安全漏洞之一。永远不要相信用户输入,永远不要通过字符串拼接构建 SQL 语句。

8.1 错误的做法

以下代码展示了绝对不要使用的做法——通过 f-string 或 format 将用户输入直接插入 SQL 字符串。攻击者可以通过输入恶意字符串操纵查询逻辑,进而窃取数据、破坏数据库甚至获取系统权限。

# 危险的写法!永远不要这样做! username = "alice' OR '1'='1" # 恶意输入 cursor.execute(f"SELECT * FROM users WHERE username = '{username}'") # 实际执行的 SQL: # SELECT * FROM users WHERE username = 'alice' OR '1'='1' # -> 返回所有用户! # 更恶意的输入 username = "'; DROP TABLE users; --" cursor.execute(f"SELECT * FROM users WHERE username = '{username}'") # -> users 表被删除!

8.2 正确的做法:参数化查询

参数化查询将 SQL 语句与参数数据分离,数据库驱动会自动对参数进行转义和引用,从根本上杜绝了 SQL 注入的可能性。所有遵循 DB-API 2.0 的驱动都支持参数化查询。

# 方式一:DB-API 2.0 问号占位符(推荐,适用于 sqlite3, psycopg2 等) cursor.execute( "SELECT * FROM users WHERE username = ? AND age > ?", (username, age) ) # 方式二:命名参数(适用于 sqlite3, 更清晰) cursor.execute( "SELECT * FROM users WHERE username = :name AND age > :min_age", {"name": username, "min_age": age} ) # 方式三:%s 占位符(MySQL Connector/Python 风格) cursor.execute( "SELECT * FROM users WHERE username = %s AND age > %s", (username, age) ) # 方式四:$1 占位符(asyncpg 风格) await conn.fetchrow( "SELECT * FROM users WHERE username = $1 AND age > $2", username, age )

8.3 ORM 的天然防护

使用 SQLAlchemy ORM 时,ORM 框架在底层使用参数化查询,天然提供了 SQL 注入防护。但需要注意的是,在 ORM 中使用原生 SQL 或 text() 时仍需谨慎。

# ORM 查询 —— 自动参数化,安全 stmt = select(User).where(User.username == user_input) result = session.execute(stmt) # text() —— 使用绑定参数,安全 stmt = text("SELECT * FROM users WHERE username = :name") result = session.execute(stmt, {"name": user_input}) # text() 中慎用 format() —— 危险! stmt = text(f"SELECT * FROM users WHERE username = '{user_input}'") # 仍然存在注入风险!应使用绑定参数替代

危险做法:
字符串拼接 / f-string / format
f"SELECT * FROM t WHERE x = '{input}'"

安全做法:
参数化查询 / 绑定变量
"SELECT * FROM t WHERE x = ?"

九、数据库迁移(Alembic)

随着应用的发展,数据库 Schema 的变更不可避免。Alembic 是 SQLAlchemy 团队开发的数据库迁移工具,能够以版本化、可追溯的方式管理数据库 Schema 变更。每次 Schema 变更对应一个迁移脚本,可以向上(upgrade)应用变更,也可以向下(downgrade)回滚变更。

9.1 初始化与配置

Alembic 使用 alembic init 命令初始化迁移环境,生成配置文件和迁移脚本目录。核心配置文件 alembic.ini 包含数据库连接地址等配置。env.py 文件定义了 Alembic 如何与您的 SQLAlchemy 模型关联。

# 终端命令 # pip install alembic # alembic init alembic # alembic.ini 中设置数据库 URL # sqlalchemy.url = sqlite:///shop.db # env.py 中设置 Target Metadata # from your_models import Base # target_metadata = Base.metadata

9.2 生成迁移脚本

使用 alembic revision --autogenerate 可以自动检测模型定义与当前数据库 Schema 之间的差异,并生成对应的迁移脚本。在自动生成后,始终需要审查生成的脚本,确认变更的准确性。

# 创建迁移脚本 # alembic revision --autogenerate -m "add_product_table" # 生成的迁移脚本示例 """add product table Revision ID: a1b2c3d4e5f6 Revises: Create Date: 2026-05-05 22:50:00.000000 """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa # revision identifiers revision: str = 'a1b2c3d4e5f6' down_revision: Union[str, None] = None branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: op.create_table( 'products', sa.Column('id', sa.Integer(), primary_key=True), sa.Column('name', sa.String(200), nullable=False), sa.Column('price', sa.Float(), nullable=False), sa.Column('stock', sa.Integer(), server_default='0'), ) def downgrade() -> None: op.drop_table('products')

9.3 应用与回滚

通过简单的命令即可完成迁移的应用和回滚。始终在应用迁移前备份数据库(尤其是在生产环境中)。

# 应用所有待定迁移(最新版本) # alembic upgrade head # 回滚到指定版本 # alembic downgrade a1b2c3d4e5f6 # 查看迁移历史 # alembic history # 查看当前版本 # alembic current # 升级指定版本数 # alembic upgrade +2 # 回滚指定版本数 # alembic downgrade -1

迁移最佳实践:每个迁移脚本只做一件事(如:添加一个表、修改一个列),便于回滚;生产环境的迁移必须先经过测试环境验证;迁移脚本一旦提交到版本控制后不应修改(使用新的迁移修正);使用 op.get_bind() 可以在迁移中执行数据迁移脚本。

9.4 复杂迁移:数据迁移

有时 Schema 变更需要伴随数据迁移,例如拆分列、合并列、或转换数据格式。Alembic 的 upgrade() 函数可以在 Schema 变更之后执行数据操作。

def upgrade() -> None: # Schema 变更:新增 full_name 列 op.add_column('users', sa.Column('full_name', sa.String(200))) # 数据迁移:将 first_name + ' ' + last_name 合并为 full_name op.execute(""" UPDATE users SET full_name = first_name || ' ' || last_name WHERE full_name IS NULL """) # Schema 变更:设置 full_name 为 NOT NULL op.alter_column('users', 'full_name', existing_type=sa.String(200), nullable=False)

十、综合示例:构建简单的产品 API

以下综合示例整合了 SQLAlchemy ORM、Session 管理、关系查询和异步数据库访问等知识点,展示如何构建一个完整的产品数据访问层(DAL)。该模式可以轻松集成到 FastAPI 或 Flask 应用中。

from __future__ import annotations from typing import Optional, List from datetime import datetime from dataclasses import dataclass from sqlalchemy import ( create_engine, String, Float, Integer, Text, ForeignKey, select, func ) from sqlalchemy.orm import ( DeclarativeBase, Mapped, mapped_column, relationship, Session, joinedload ) # ---- 模型定义 ---- class Base(DeclarativeBase): pass class Category(Base): __tablename__ = "categories" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(100), unique=True) products: Mapped[List["Product"]] = relationship(back_populates="category") class Product(Base): __tablename__ = "products" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(200)) price: Mapped[float] = mapped_column(Float) stock: Mapped[int] = mapped_column(Integer, default=0) category_id: Mapped[int] = mapped_column(ForeignKey("categories.id")) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) category: Mapped["Category"] = relationship(back_populates="products") # ---- 数据访问层 ---- engine = create_engine("sqlite:///shop.db", echo=True) Base.metadata.create_all(engine) class ProductRepository: """产品数据访问仓库""" def __init__(self, session: Session): self.session = session def get_by_id(self, product_id: int) -> Optional[Product]: return self.session.get(Product, product_id) def search( self, min_price: float = 0, max_price: float = float('inf'), category_id: Optional[int] = None, offset: int = 0, limit: int = 20 ) -> List[Product]: stmt = select(Product).where( Product.price.between(min_price, max_price) ) if category_id is not None: stmt = stmt.where(Product.category_id == category_id) stmt = stmt.options(joinedload(Product.category)) stmt = stmt.offset(offset).limit(limit) return list(self.session.scalars(stmt).unique()) def create(self, name: str, price: float, stock: int, category_id: int) -> Product: product = Product( name=name, price=price, stock=stock, category_id=category_id ) self.session.add(product) self.session.flush() return product def update_stock(self, product_id: int, quantity: int) -> bool: product = self.get_by_id(product_id) if not product: return False product.stock = quantity return True def delete(self, product_id: int) -> bool: product = self.get_by_id(product_id) if not product: return False self.session.delete(product) return True # ---- 使用示例 ---- with Session(engine) as session: repo = ProductRepository(session) # 创建分类和产品 cat = Category(name="书籍") session.add(cat) session.flush() product = repo.create( name="Python进阶编程", price=89.00, stock=200, category_id=cat.id ) print(f"创建产品 ID: {product.id}") # 搜索产品 results = repo.search(min_price=50, max_price=100) print(f"搜索到 {len(results)} 个产品") session.commit()

十一、核心要点总结

Python 数据库交互的核心要点:

  • DB-API 2.0 标准是所有 Python 数据库驱动的基石,理解 Connection/Cursor/execute/fetch* 协议至关重要。
  • SQLite3 作为 Python 标准库的一部分,是快速原型开发和单机应用的首选,应熟练掌握其行工厂和事务控制等高级特性。
  • SQLAlchemy 提供了从底层 Core 到上层 ORM 的完整数据库编程方案,推荐在新项目中使用 2.0 风格的声明式映射和 select() 查询。
  • 连接池配置直接影响应用性能和数据库稳定性,pool_size、max_overflow、pool_pre_ping 是需要重点关注的参数。
  • 事务管理使用上下文管理器(with 语句)是最安全、最简洁的方式,嵌套事务通过 savepoint 实现。
  • 异步数据库访问是高性能 Web 应用的标配,aiosqlite(SQLite)和 asyncpg(PostgreSQL)是各自领域的首选驱动。
  • SQL 注入防范永远使用参数化查询,永远不要用字符串拼接构建 SQL。ORM 在底层自动使用参数化查询。
  • Alembic 是数据库 Schema 版本管理的标准工具,每个迁移脚本应当原子化并在应用前审查。

学习路径建议:先掌握 sqlite3 模块 + DB-API 2.0,理解数据库交互的基本模式;然后深入 SQLAlchemy ORM,理解 Session、声明式映射、关系查询等核心概念;最后学习 Alembic 迁移和异步数据库访问,形成完整的数据库编程知识体系。