← 返回Python进阶编程目录
← 返回学习笔记首页
专题: Python进阶编程系统学习
关键词: Python, 数据库, SQLite, SQLAlchemy, ORM, 事务, 连接池, Alembic
一、概述
在现代应用开发中,数据库交互是不可或缺的核心能力。Python 凭借其简洁的语法和丰富的生态,提供了从底层 DB-API 到高层 ORM 框架的完整数据库编程解决方案。无论您是在构建 Web 应用、数据分析管道、还是自动化脚本,掌握 Python 数据库编程都将大幅提升您的开发效率。
Python 的数据库交互生态主要分为三个层次:底层数据库驱动(如 sqlite3、psycopg2、pymysql)、中层 DB-API 2.0 兼容层、以及高层 ORM 框架(如 SQLAlchemy、Django ORM)。理解每一层的工作原理和适用场景,能够帮助开发者在不同项目中做出最优的技术选型。
本文将系统性地讲解 Python 数据库交互的各个方面:从 Python 标准库中的 sqlite3 模块开始,逐步深入 SQLAlchemy 核心与 ORM、数据库连接池管理、事务控制、异步数据库访问、SQL 注入防范,以及使用 Alembic 进行数据库迁移管理。每个部分均配有可独立运行的代码示例,帮助读者在实践中掌握这些技术。
二、DB-API 2.0 标准
要点: PEP 249 定义的 Python 数据库 API 规范。所有主流数据库驱动(sqlite3、psycopg2、MySQL Connector、cx_Oracle)均遵循该标准,提供统一的接口。
2.1 核心接口
DB-API 2.0 定义了数据库交互的核心对象和协议。Connection 对象代表与数据库的一个连接会话,通过连接字符串或连接参数创建。Cursor 对象是执行 SQL 语句和获取结果的核心接口。使用 with 语句管理 Connection 和 Cursor 的上下文是一种推荐的最佳实践,能够确保资源被正确释放。
2.2 连接与游标生命周期
以下代码展示了使用 DB-API 2.0 标准的典型模式——建立连接、获取游标、执行 SQL、处理结果、清理资源。这种模式在所有遵循该标准的数据库驱动中基本一致。
import sqlite3
# 建立连接 - 所有遵循 DB-API 2.0 的驱动都支持类似模式
conn = sqlite3.connect('example.db' )
# 创建游标
cursor = conn.cursor()
# 执行 SQL
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'" )
# 获取单行
row = cursor.fetchone()
# 获取全部结果
all_rows = cursor.fetchall()
# 获取指定数量的行
some_rows = cursor.fetchmany(size=5 )
# 关闭游标和连接
cursor.close()
conn.close()
2.3 常用方法速查表
方法 描述 返回值
cursor.execute(sql, params) 执行单条 SQL 语句 Cursor 自身
cursor.executemany(sql, seq_of_params) 批量执行 SQL(参数为序列) Cursor 自身
cursor.fetchone() 获取结果集的下一行 单行 tuple / None
cursor.fetchmany(size) 获取最多 size 行 tuple 列表
cursor.fetchall() 获取所有剩余行 tuple 列表
conn.commit() 提交当前事务 None
conn.rollback() 回滚当前事务 None
cursor.description 列名和元信息 tuple 序列
cursor.rowcount 最近操作影响的行数 int
最佳实践: 始终使用 with conn: 上下文管理器——当块内代码正常执行完毕时会自动 commit,抛出异常时会自动 rollback。同时建议使用 with conn.cursor() as cur 确保游标被正确关闭。
三、SQLite3 内建数据库
SQLite3 是 Python 标准库中内置的轻量级数据库引擎,无需任何外部依赖即可使用。它以其零配置、服务端无进程、数据库文件单一的特点,成为嵌入式系统和单机应用的理想选择。Python 的 sqlite3 模块自 2.5 版本起被纳入标准库,完美实现了 DB-API 2.0 规范。
3.1 连接与建表
使用 sqlite3.connect() 可以连接到一个 SQLite 数据库文件。如果文件不存在,SQLite 会自动创建。使用内存数据库(":memory:")可以提升测试场景下的性能。
import sqlite3
# 连接数据库(自动创建文件)
conn = sqlite3.connect('shop.db' )
# 使用内存数据库(测试用)
# conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
age INTEGER DEFAULT 18,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''' )
conn.commit()
3.2 完整的 CRUD 操作
以下示例展示了使用 sqlite3 模块完成标准的增删改查(CRUD)操作。请注意,所有用户输入的操作都应使用参数化查询(问号占位符),这是防范 SQL 注入的第一道防线。
# ---------- CREATE ----------
cursor.execute(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)" ,
('alice' , 'alice@example.com' , 28 )
)
conn.commit()
print (f"插入用户 ID: {cursor.lastrowid}" )
# 批量插入
users = [
('bob' , 'bob@example.com' , 32 ),
('carol' , 'carol@example.com' , 25 ),
]
cursor.executemany(
"INSERT INTO users (username, email, age) VALUES (?, ?, ?)" ,
users
)
conn.commit()
# ---------- READ ----------
cursor.execute("SELECT * FROM users WHERE age > ?" , (25 ,))
rows = cursor.fetchall()
for row in rows:
print (f"ID: {row[0]}, 用户名: {row[1]}, 邮箱: {row[2]}" )
# ---------- UPDATE ----------
cursor.execute(
"UPDATE users SET age = ? WHERE username = ?" ,
(29 , 'alice' )
)
conn.commit()
print (f"更新了 {cursor.rowcount} 行" )
# ---------- DELETE ----------
cursor.execute(
"DELETE FROM users WHERE username = ?" ,
('carol' ,)
)
conn.commit()
print (f"删除了 {cursor.rowcount} 行" )
3.3 行工厂(Row Factory)
默认情况下,fetch 返回的是元组(tuple),通过列索引访问数据可读性较差。sqlite3 提供了行工厂机制,可以将返回的行转换为字典风格的对象,支持通过列名访问数据。
# 启用行工厂
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?" , ('alice' ,))
row = cursor.fetchone()
# 通过列名访问(推荐)
print (row['username' ])
print (row['email' ])
# 转换为字典
user_dict = dict(row)
print (user_dict)
# 获取列名元信息
print ([desc[0 ] for desc in cursor.description])
场景提示: SQLite3 适用于单机应用、原型开发、测试环境和小型 Web 应用(并发请求 < 100)。对于高并发场景,建议迁移至 PostgreSQL 或 MySQL。
四、SQLAlchemy 核心与 ORM
SQLAlchemy 是 Python 社区中最成熟、最强大的数据库工具包。它提供了两套独立但可以协同工作的 API:SQLAlchemy Core(核心)提供 SQL 表达式语言,以 Python 代码的方式构建 SQL 语句;SQLAlchemy ORM 则在 Core 之上构建了完整的对象关系映射层,允许开发者通过 Python 对象直接操作数据库。
4.1 Engine 与连接管理
Engine 是 SQLAlchemy 的核心入口点,它封装了数据库连接池和方言(Dialect)信息。Engine 负责管理到数据库的实际连接,并将 SQL 语句转换为目标数据库的方言。
from sqlalchemy import create_engine, text
# SQLite 内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:" , echo=True )
# PostgreSQL(需安装 psycopg2)
# engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")
# MySQL(需安装 pymysql)
# engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")
# 使用 Engine 执行原生 SQL
with engine.connect() as conn:
result = conn.execute(text("SELECT 1" ))
print (result.scalar()) # 输出: 1
4.2 声明式 ORM 映射(Declarative Mapping)
SQLAlchemy 的声明式映射系统允许开发者使用 Python 类来定义数据库表结构。从 2.0 版本开始,声明式映射得到了全面改进,推荐使用 mapped_column() 替代旧式的 Column()。
from typing import Optional, List
from datetime import datetime
from sqlalchemy import ForeignKey, String, Float, Text
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column,
relationship, Session
)
# 声明基类
class Base (DeclarativeBase):
pass
# 定义模型
class Category (Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True )
name: Mapped[str] = mapped_column(String(100 ), unique=True , nullable=False )
description: Mapped[Optional[str]] = mapped_column(Text)
# 关系:一个分类有多个产品
products: Mapped[List["Product" ]] = relationship(back_populates="category" )
def __repr__ (self ) -> str:
return f"<Category(id={self.id}, name={self.name})>"
class Product (Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True )
name: Mapped[str] = mapped_column(String(200 ), nullable=False )
price: Mapped[float] = mapped_column(Float, nullable=False )
stock: Mapped[int] = mapped_column(default=0 )
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id" ))
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
category: Mapped["Category" ] = relationship(back_populates="products" )
def __repr__ (self ) -> str:
return f"<Product(id={self.id}, name={self.name}, price={self.price})>"
4.3 创建表与 Session 管理
Session 是 SQLAlchemy ORM 操作的核心管理器。它跟踪所有被加载或创建的对象,并在合适的时机将变更同步到数据库。Repository 模式可以进一步封装 Session 操作,使业务逻辑与数据库访问解耦。
# 创建所有表
Base.metadata.create_all(engine)
# 创建 Session
session = Session(engine)
# 插入数据(ORM 风格)
category = Category(name="电子设备" , description="消费电子产品" )
session.add(category)
session.flush() # 立即发送 SQL 但不提交(获取 auto-generated ID)
product = Product(
name="无线蓝牙耳机" ,
price=299.00 ,
stock=500 ,
category_id=category.id
)
session.add(product)
session.commit() # 提交事务
# 查询数据(Query API,SQLAlchemy 1.x 风格)
products = session.query(Product).filter(Product.price > 100 ).all()
# 查询数据(2.0 风格——推荐)
from sqlalchemy import select
stmt = select(Product).where(Product.price > 100 ).order_by(Product.price.desc())
for product in session.scalars(stmt):
print (product.name, product.price)
# 更新数据
product.price = 249.00
session.commit()
# 删除数据
session.delete(product)
session.commit()
session.close()
4.4 关系查询与预加载
在 ORM 中处理关联关系时,N+1 查询问题是常见的性能陷阱。SQLAlchemy 提供了多种预加载(Eager Loading)策略,可以在一次查询中获取关联数据,显著提升查询性能。
from sqlalchemy.orm import joinedload, selectinload
# 懒加载(Lazy Loading)—— 默认行为,可能导致 N+1 查询
# 访问 product.category 时会额外发送查询
# 立即加载(Joined Load)—— 使用 LEFT JOIN
stmt = select(Product).options(joinedload(Product.category))
products = session.scalars(stmt).unique().all()
# 此时 product.category 已经在同一查询中加载,不会触发额外查询
# 子查询加载(Selectin Load)—— 使用 IN 子句,适合一对多关系
stmt = select(Category).options(selectinload(Category.products))
categories = session.scalars(stmt).unique().all()
for cat in categories:
print (f"{cat.name}: {[p.name for p in cat.products]}" )
推荐方案(ORM 2.0 风格):
使用 select() 配合 session.scalars()
类型安全,与异步兼容,未来发展方向。
旧式方案(查询 1.x 风格):
使用 session.query(Model).filter()
仍然可用,但不推荐在新项目中使用。
五、数据库连接池
在 Web 应用和高频数据库交互场景中,频繁创建和销毁数据库连接的代价非常高。连接池机制通过维护一组可复用的连接,显著降低了连接创建的开销,同时限制了并发连接数,防止数据库过载。
5.1 SQLAlchemy 内置连接池
SQLAlchemy 的 Engine 默认集成了连接池。最常用的实现是 QueuePool,它维护一个固定大小的连接队列,工作线程从队列中获取连接,使用完后归还。
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool, NullPool, assertion
# QueuePool —— 默认连接池,适用于大多数场景
engine = create_engine(
"sqlite+pysqlite:///shop.db" ,
poolclass=QueuePool,
pool_size=5 , # 池中保持的连接数
max_overflow=10 , # 最大溢出连接数(超过 pool_size 后可额外创建的连接)
pool_timeout=30 , # 获取连接的超时时间(秒)
pool_recycle=3600 , # 连接回收时间(秒),防止连接被数据库关闭
pool_pre_ping=True , # 每次获取连接前发送 SELECT 1 检查连接有效性
)
# NullPool —— 无连接池,每次获取新连接,适合 SQLite :memory:
engine = create_engine(
"sqlite+pysqlite:///:memory:" ,
poolclass=NullPool
)
# 监听连接检入/检出事件
@event.listens_for (engine, "checkout" )
def receive_checkout (dbapi_conn, conn_record, conn_proxy):
print (f"连接被检出: {dbapi_conn}" )
连接池调优建议: pool_size 应设置为应用并发峰值的 70%-80%;max_overflow 应对突发流量;pool_pre_ping=True 可以有效避免使用已断开的陈旧连接;pool_recycle 建议小于数据库侧的连接超时时间(通常 MySQL 为 8 小时,建议设为 3600)。
5.2 自定义连接池
对于特定的应用场景,可以通过继承 Pool 基类来创建自定义的连接池。例如限制单例模式的连接池、记录连接使用统计的连接池等。
from sqlalchemy.pool import Pool
import time
class TimingPool (Pool):
"""记录连接获取时间的连接池"""
def __init__ (self , creator, **kwargs):
super ().__init__(creator, **kwargs)
self .acquisition_times = []
def _create_connection (self ):
start = time.perf_counter()
conn = super ()._create_connection()
elapsed = time.perf_counter() - start
self .acquisition_times.append(elapsed)
return conn
六、事务管理
事务是数据库操作的基本原子单位。Python 数据库编程中,事务管理主要分为自动提交模式和手动提交模式两种。SQLAlchemy 2.0 引入了更清晰的事务控制语义,特别是在嵌套事务和保存点(Savepoint)方面。
6.1 手动事务控制
在 SQLite3 中,当执行数据修改语句(INSERT、UPDATE、DELETE)时,事务会自动开始。用户需要显式调用 commit() 或 rollback() 来完成事务。在 SQLAlchemy 2.0 中,推荐使用连接对象的上下文管理器来处理事务。
# SQLite3 手动事务
conn = sqlite3.connect('shop.db' )
try :
cursor = conn.cursor()
cursor.execute("UPDATE users SET age = age + 1 WHERE username = ?" , ('alice' ,))
cursor.execute("UPDATE users SET age = age - 1 WHERE username = ?" , ('bob' ,))
conn.commit() # 两笔更新作为一个原子事务
except Exception as e:
conn.rollback() # 出错时回滚全部变更
print (f"事务回滚: {e}" )
finally :
conn.close()
6.2 SQLAlchemy 事务与 Savepoint
SQLAlchemy 2.0 使用 connection.begin() 或 connection.begin_nested() 管理事务。嵌套事务通过数据库的保存点(SAVEPOINT)机制实现,允许在事务内部回滚到特定点,而不影响整个事务。
from sqlalchemy import create_engine, text
engine = create_engine("sqlite+pysqlite:///shop.db" )
# 简洁的自动事务管理(推荐)
with engine.connect() as conn:
with conn.begin():
conn.execute(text("UPDATE users SET age = age + 1 WHERE id = 1" ))
# 嵌套事务——使用 Savepoint
with engine.connect() as conn:
with conn.begin() as transaction:
conn.execute(text("INSERT INTO users (username) VALUES ('temp1')" ))
# 创建保存点
with conn.begin_nested():
conn.execute(text("INSERT INTO users (username) VALUES ('temp2')" ))
conn.execute(text("INSERT INTO users (username) VALUES ('temp3')" ))
# 如果这里出错,内部 savepoint 回滚,但外部事务不受影响
conn.execute(text("INSERT INTO users (username) VALUES ('temp4')" ))
# 事务提交后:temp1, temp2, temp3, temp4 全部存在
注意: SQLite3 默认不支持嵌套事务。SQLAlchemy 通过 SAVEPOINT 模拟了嵌套事务的行为,但底层 SQLite 的保存点具有一定的限制(例如不支持 DDL 语句的回滚)。建议在 PostgreSQL 中使用真正的嵌套事务。
6.3 ORM Session 事务边界
在 SQLAlchemy ORM 中,Session 本身就是一个事务边界管理器。理解 Session 的刷新(flush)和提交(commit)之间的区别至关重要。flush 将对象状态同步到数据库(发送 SQL),而 commit 则持久化变更并结束事务。
# Session 事务管理
session = Session(engine)
try :
user = User(name="新用户" , email="new@example.com" )
session.add(user)
session.flush() # 发送 INSERT,获取 user.id,但未提交
# 可以利用 user.id 做后续操作
profile = Profile(user_id=user.id, bio="简介" )
session.add(profile)
session.commit() # 提交整个事务
except Exception:
session.rollback() # 回滚全部
raise
finally :
session.close()
七、异步数据库访问
随着异步编程在 Python 中的普及(FastAPI、Tornado、aiohttp 等框架),异步数据库访问成为高性能应用的关键需求。Python 生态提供了多种异步数据库驱动,其中最流行的是 aiosqlite、asyncpg(PostgreSQL 专用)和 SQLAlchemy 的异步扩展 sqlalchemy.asyncio。
7.1 aiosqlite
aiosqlite 是对 sqlite3 的异步封装,提供与标准 sqlite3 模块类似的接口,但所有 I/O 操作都是非阻塞的。非常适合在异步 Web 应用中使用 SQLite 的场景。
import asyncio
import aiosqlite
async def main ():
async with aiosqlite.connect("shop.db" ) as db:
async with db.execute(
"SELECT username, email FROM users WHERE age > ?" ,
(25 ,)
) as cursor:
async for row in cursor:
print (f"用户名: {row[0]}, 邮箱: {row[1]}" )
asyncio.run(main())
7.2 asyncpg(PostgreSQL)
asyncpg 是目前最快的 Python PostgreSQL 驱动,专为异步编程设计。它直接构建在 PostgreSQL 的二进制协议之上,跳过了传统驱动中的文本协议解析层,在性能上具有显著优势。
import asyncio
import asyncpg
async def main ():
conn = await asyncpg.connect(
user='user' ,
password='password' ,
database='dbname' ,
host='localhost'
)
# 准备语句(Prepared Statement)
stmt = await conn.prepare("SELECT username, email FROM users WHERE age >= $1" )
rows = await stmt.fetch(18 )
for row in rows:
print (f"用户名: {row['username']}" )
# 批量插入
await conn.executemany(
"INSERT INTO users (username, email, age) VALUES ($1, $2, $3)" ,
[('user1' , 'u1@test.com' , 20 ),
('user2' , 'u2@test.com' , 30 )]
)
await conn.close()
asyncio.run(main())
7.3 SQLAlchemy 异步引擎
SQLAlchemy 1.4 起引入了对 asyncio 的完整支持。通过 create_async_engine 创建异步引擎,配合 AsyncSession 进行 ORM 操作。这使得在 FastAPI 等异步框架中可以使用完整的 SQLAlchemy ORM 功能。
from sqlalchemy.ext.asyncio import (
create_async_engine, AsyncSession, async_sessionmaker
)
from sqlalchemy.orm import selectinload
# 创建异步引擎(注意 URL 前缀:sqlite+aiosqlite://)
async_engine = create_async_engine(
"sqlite+aiosqlite:///shop.db" ,
echo=True
)
# 异步 Session 工厂
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
async def get_product_count ():
async with AsyncSessionLocal() as session:
from sqlalchemy import select, func
result = await session.execute(select(func.count()).select_from(Product))
count = result.scalar()
return count
# 在 FastAPI 中使用
# @app.get("/products/count")
# async def product_count():
# count = await get_product_count()
# return {"count": count}
八、SQL 注入防范
安全警示: SQL 注入是最常见也最危险的 Web 安全漏洞之一。永远不要相信用户输入,永远不要通过字符串拼接构建 SQL 语句。
8.1 错误的做法
以下代码展示了绝对不要使用的做法——通过 f-string 或 format 将用户输入直接插入 SQL 字符串。攻击者可以通过输入恶意字符串操纵查询逻辑,进而窃取数据、破坏数据库甚至获取系统权限。
# 危险的写法!永远不要这样做!
username = "alice' OR '1'='1" # 恶意输入
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'" )
# 实际执行的 SQL:
# SELECT * FROM users WHERE username = 'alice' OR '1'='1'
# -> 返回所有用户!
# 更恶意的输入
username = "'; DROP TABLE users; --"
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'" )
# -> users 表被删除!
8.2 正确的做法:参数化查询
参数化查询将 SQL 语句与参数数据分离,数据库驱动会自动对参数进行转义和引用,从根本上杜绝了 SQL 注入的可能性。所有遵循 DB-API 2.0 的驱动都支持参数化查询。
# 方式一:DB-API 2.0 问号占位符(推荐,适用于 sqlite3, psycopg2 等)
cursor.execute(
"SELECT * FROM users WHERE username = ? AND age > ?" ,
(username, age)
)
# 方式二:命名参数(适用于 sqlite3, 更清晰)
cursor.execute(
"SELECT * FROM users WHERE username = :name AND age > :min_age" ,
{"name" : username, "min_age" : age}
)
# 方式三:%s 占位符(MySQL Connector/Python 风格)
cursor.execute(
"SELECT * FROM users WHERE username = %s AND age > %s" ,
(username, age)
)
# 方式四:$1 占位符(asyncpg 风格)
await conn.fetchrow(
"SELECT * FROM users WHERE username = $1 AND age > $2" ,
username, age
)
8.3 ORM 的天然防护
使用 SQLAlchemy ORM 时,ORM 框架在底层使用参数化查询,天然提供了 SQL 注入防护。但需要注意的是,在 ORM 中使用原生 SQL 或 text() 时仍需谨慎。
# ORM 查询 —— 自动参数化,安全
stmt = select(User).where(User.username == user_input)
result = session.execute(stmt)
# text() —— 使用绑定参数,安全
stmt = text("SELECT * FROM users WHERE username = :name" )
result = session.execute(stmt, {"name" : user_input})
# text() 中慎用 format() —— 危险!
stmt = text(f"SELECT * FROM users WHERE username = '{user_input}'" )
# 仍然存在注入风险!应使用绑定参数替代
危险做法:
字符串拼接 / f-string / format
f"SELECT * FROM t WHERE x = '{input}'"
安全做法:
参数化查询 / 绑定变量
"SELECT * FROM t WHERE x = ?"
九、数据库迁移(Alembic)
随着应用的发展,数据库 Schema 的变更不可避免。Alembic 是 SQLAlchemy 团队开发的数据库迁移工具,能够以版本化、可追溯的方式管理数据库 Schema 变更。每次 Schema 变更对应一个迁移脚本,可以向上(upgrade)应用变更,也可以向下(downgrade)回滚变更。
9.1 初始化与配置
Alembic 使用 alembic init 命令初始化迁移环境,生成配置文件和迁移脚本目录。核心配置文件 alembic.ini 包含数据库连接地址等配置。env.py 文件定义了 Alembic 如何与您的 SQLAlchemy 模型关联。
# 终端命令
# pip install alembic
# alembic init alembic
# alembic.ini 中设置数据库 URL
# sqlalchemy.url = sqlite:///shop.db
# env.py 中设置 Target Metadata
# from your_models import Base
# target_metadata = Base.metadata
9.2 生成迁移脚本
使用 alembic revision --autogenerate 可以自动检测模型定义与当前数据库 Schema 之间的差异,并生成对应的迁移脚本。在自动生成后,始终需要审查生成的脚本,确认变更的准确性。
# 创建迁移脚本
# alembic revision --autogenerate -m "add_product_table"
# 生成的迁移脚本示例
"""add product table
Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-05-05 22:50:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision: str = 'a1b2c3d4e5f6'
down_revision: Union[str, None ] = None
branch_labels: Union[str, Sequence[str], None ] = None
depends_on: Union[str, Sequence[str], None ] = None
def upgrade () -> None :
op.create_table(
'products' ,
sa.Column('id' , sa.Integer(), primary_key=True ),
sa.Column('name' , sa.String(200 ), nullable=False ),
sa.Column('price' , sa.Float(), nullable=False ),
sa.Column('stock' , sa.Integer(), server_default='0' ),
)
def downgrade () -> None :
op.drop_table('products' )
9.3 应用与回滚
通过简单的命令即可完成迁移的应用和回滚。始终在应用迁移前备份数据库(尤其是在生产环境中)。
# 应用所有待定迁移(最新版本)
# alembic upgrade head
# 回滚到指定版本
# alembic downgrade a1b2c3d4e5f6
# 查看迁移历史
# alembic history
# 查看当前版本
# alembic current
# 升级指定版本数
# alembic upgrade +2
# 回滚指定版本数
# alembic downgrade -1
迁移最佳实践: 每个迁移脚本只做一件事(如:添加一个表、修改一个列),便于回滚;生产环境的迁移必须先经过测试环境验证;迁移脚本一旦提交到版本控制后不应修改(使用新的迁移修正);使用 op.get_bind() 可以在迁移中执行数据迁移脚本。
9.4 复杂迁移:数据迁移
有时 Schema 变更需要伴随数据迁移,例如拆分列、合并列、或转换数据格式。Alembic 的 upgrade() 函数可以在 Schema 变更之后执行数据操作。
def upgrade () -> None :
# Schema 变更:新增 full_name 列
op.add_column('users' , sa.Column('full_name' , sa.String(200 )))
# 数据迁移:将 first_name + ' ' + last_name 合并为 full_name
op.execute("""
UPDATE users
SET full_name = first_name || ' ' || last_name
WHERE full_name IS NULL
""" )
# Schema 变更:设置 full_name 为 NOT NULL
op.alter_column('users' , 'full_name' ,
existing_type=sa.String(200 ),
nullable=False )
十、综合示例:构建简单的产品 API
以下综合示例整合了 SQLAlchemy ORM、Session 管理、关系查询和异步数据库访问等知识点,展示如何构建一个完整的产品数据访问层(DAL)。该模式可以轻松集成到 FastAPI 或 Flask 应用中。
from __future__ import annotations
from typing import Optional, List
from datetime import datetime
from dataclasses import dataclass
from sqlalchemy import (
create_engine, String, Float, Integer, Text,
ForeignKey, select, func
)
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column,
relationship, Session, joinedload
)
# ---- 模型定义 ----
class Base (DeclarativeBase):
pass
class Category (Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True )
name: Mapped[str] = mapped_column(String(100 ), unique=True )
products: Mapped[List["Product" ]] = relationship(back_populates="category" )
class Product (Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True )
name: Mapped[str] = mapped_column(String(200 ))
price: Mapped[float] = mapped_column(Float)
stock: Mapped[int] = mapped_column(Integer, default=0 )
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id" ))
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
category: Mapped["Category" ] = relationship(back_populates="products" )
# ---- 数据访问层 ----
engine = create_engine("sqlite:///shop.db" , echo=True )
Base.metadata.create_all(engine)
class ProductRepository :
"""产品数据访问仓库"""
def __init__ (self , session: Session):
self .session = session
def get_by_id (self , product_id: int) -> Optional[Product]:
return self .session.get(Product, product_id)
def search (
self , min_price: float = 0 ,
max_price: float = float('inf' ),
category_id: Optional[int] = None ,
offset: int = 0 ,
limit: int = 20
) -> List[Product]:
stmt = select(Product).where(
Product.price.between(min_price, max_price)
)
if category_id is not None :
stmt = stmt.where(Product.category_id == category_id)
stmt = stmt.options(joinedload(Product.category))
stmt = stmt.offset(offset).limit(limit)
return list(self .session.scalars(stmt).unique())
def create (self , name: str, price: float,
stock: int, category_id: int) -> Product:
product = Product(
name=name, price=price,
stock=stock, category_id=category_id
)
self .session.add(product)
self .session.flush()
return product
def update_stock (self , product_id: int, quantity: int) -> bool:
product = self .get_by_id(product_id)
if not product:
return False
product.stock = quantity
return True
def delete (self , product_id: int) -> bool:
product = self .get_by_id(product_id)
if not product:
return False
self .session.delete(product)
return True
# ---- 使用示例 ----
with Session(engine) as session:
repo = ProductRepository(session)
# 创建分类和产品
cat = Category(name="书籍" )
session.add(cat)
session.flush()
product = repo.create(
name="Python进阶编程" ,
price=89.00 ,
stock=200 ,
category_id=cat.id
)
print (f"创建产品 ID: {product.id}" )
# 搜索产品
results = repo.search(min_price=50 , max_price=100 )
print (f"搜索到 {len(results)} 个产品" )
session.commit()
十一、核心要点总结
Python 数据库交互的核心要点:
DB-API 2.0 标准 是所有 Python 数据库驱动的基石,理解 Connection/Cursor/execute/fetch* 协议至关重要。
SQLite3 作为 Python 标准库的一部分,是快速原型开发和单机应用的首选,应熟练掌握其行工厂和事务控制等高级特性。
SQLAlchemy 提供了从底层 Core 到上层 ORM 的完整数据库编程方案,推荐在新项目中使用 2.0 风格的声明式映射和 select() 查询。
连接池 配置直接影响应用性能和数据库稳定性,pool_size、max_overflow、pool_pre_ping 是需要重点关注的参数。
事务管理 使用上下文管理器(with 语句)是最安全、最简洁的方式,嵌套事务通过 savepoint 实现。
异步数据库访问 是高性能 Web 应用的标配,aiosqlite(SQLite)和 asyncpg(PostgreSQL)是各自领域的首选驱动。
SQL 注入防范 永远使用参数化查询,永远不要用字符串拼接构建 SQL。ORM 在底层自动使用参数化查询。
Alembic 是数据库 Schema 版本管理的标准工具,每个迁移脚本应当原子化并在应用前审查。
学习路径建议: 先掌握 sqlite3 模块 + DB-API 2.0,理解数据库交互的基本模式;然后深入 SQLAlchemy ORM,理解 Session、声明式映射、关系查询等核心概念;最后学习 Alembic 迁移和异步数据库访问,形成完整的数据库编程知识体系。