← 返回自动化办公目录
← 返回学习笔记首页
专题: Python 自动化办公系统学习
关键词: Python, 自动化办公, sqlite3, SQLite, 数据库, CRUD, 事务管理, 数据导入导出, Python
一、SQLite与sqlite3概述
SQLite是一个轻量级的嵌入式关系型数据库引擎,它不需要独立的数据库服务器进程,而是直接读写普通磁盘文件。整个数据库就是一个文件,这使得它在部署、分发和迁移方面具有天然优势。SQLite遵守ACID原则,实现了大多数SQL标准,支持视图、触发器、事务等高级功能,同时体积仅几百KB。它被广泛应用于嵌入式设备、移动应用(Android/iOS)、桌面软件、浏览器(Chrome/Firefox都用它存储书签和历史记录)以及各类本地数据存储场景。对于办公自动化而言,SQLite是替代Excel文件和CSV文件的理想方案,它支持更复杂的数据查询和管理操作,性能也远高于纯文本文件。
Python内置了sqlite3模块,无需额外安装即可使用。这个模块提供了符合DB-API 2.0规范的数据库操作接口,包括连接管理、游标操作、事务控制等功能。sqlite3模块不仅支持基础的SQL语句执行,还提供了Row对象、Connection上下文管理器、自定义聚合函数等高级特性。在数据库管理方面,推荐使用DB Browser for SQLite或SQLiteStudio等图形化工具,方便查看和管理数据库文件。命令行工具sqlite3.exe也提供了完整的数据库管理功能,适合脚本自动化场景。
SQLite在办公自动化中的典型应用场景包括:本地数据存储和管理(替代Excel)、程序配置信息持久化、数据分析和报表系统的数据源、日志记录和查询系统、ETL流程中的中间数据存储。相比于使用Excel文件,SQLite支持多用户并发读、细粒度的数据查询和更新、支持SQL复杂查询和聚合分析,数据完整性和一致性更好。相比于使用大型数据库(MySQL/PostgreSQL),SQLite零配置、零维护、部署简单的特点使其更适合桌面工具和单机应用。
import sqlite3
import sys
# 检查Python版本和sqlite3版本
print(f"Python版本: {sys.version}")
print(f"SQLite版本: {sqlite3.sqlite_version}")
print(f"sqlite3模块版本: {sqlite3.version}")
# 创建内存数据库(不产生文件,适合测试)
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("SELECT sqlite_version(), date('now')")
version, today = cursor.fetchone()
print(f"SQLite版本: {version}, 当前日期: {today}")
conn.close()
import sqlite3
import os
# 创建文件数据库
db_path = "office_automation.db"
conn = sqlite3.connect(db_path)
print(f"数据库文件创建成功: {os.path.abspath(db_path)}")
print(f"数据库文件大小: {os.path.getsize(db_path)} 字节")
conn.close()
# 验证文件存在并连接已有数据库
conn2 = sqlite3.connect(db_path)
cursor = conn2.cursor()
# 查询数据库中所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
print(f"当前数据库中的表: {tables}")
conn2.close()
核心要点: SQLite是零配置、无服务器的嵌入式数据库引擎,Python的sqlite3是内置模块无需pip安装。创建数据库只需指定文件路径,文件不存在即自动创建。":memory:"可用于创建纯内存数据库,适合测试和临时数据场景。
二、连接与表操作
数据库连接是操作SQLite的入口。sqlite3模块通过connect()函数建立与数据库文件的连接,如果指定的文件不存在则会自动创建。连接对象(Connection)提供了cursor()方法创建游标、execute()直接执行SQL、commit()和rollback()管理事务、close()关闭连接等核心方法。在实际办公自动化开发中,推荐使用with语句(上下文管理器)来管理连接生命周期,确保资源正确释放。
表操作是数据库应用的基石。CREATE TABLE语句用于创建新表,需要指定表名、列名和数据类型。SQLite支持的数据类型包括:NULL(空值)、INTEGER(整型)、REAL(浮点型)、TEXT(文本)、BLOB(二进制数据)。此外SQLite还支持列约束如NOT NULL(非空)、UNIQUE(唯一)、PRIMARY KEY(主键)、FOREIGN KEY(外键)、DEFAULT(默认值)、CHECK(检查约束)。ALTER TABLE语句支持修改表名(RENAME)和添加列(ADD COLUMN);DROP TABLE用于删除表。需要注意的是SQLite对ALTER TABLE的支持有限,不支持删除列或修改列类型,如需进行复杂结构变更通常需要重建表。
在实际开发中,良好的表设计是数据库应用的基础。建议在设计表时:为每张表添加一个自增的INTEGER PRIMARY KEY作为主键;为常用查询字段创建索引;合理使用DEFAULT约束提供默认值;使用NOT NULL约束确保关键字段不为空;通过UNIQUE约束防止重复数据。使用IF NOT EXISTS子句可以防止重复建表错误。查看表结构可以使用PRAGMA table_info()命令。
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 创建员工表
cursor.execute("""
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
department TEXT NOT NULL DEFAULT '未分配',
position TEXT NOT NULL,
salary REAL NOT NULL DEFAULT 0.0,
hire_date TEXT NOT NULL,
email TEXT UNIQUE,
is_active INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now', 'localtime'))
)
""")
print("员工表创建成功")
# 创建部门表
cursor.execute("""
CREATE TABLE IF NOT EXISTS departments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
dept_name TEXT NOT NULL UNIQUE,
manager TEXT,
budget REAL DEFAULT 0.0
)
""")
print("部门表创建成功")
# 查看表结构
cursor.execute("PRAGMA table_info(employees)")
columns = cursor.fetchall()
print("\nemployees表结构:")
for col in columns:
print(f" {col[1]:20s} {col[2]:10s} 非空={col[3]} 默认={col[4]}")
conn.commit()
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# ALTER TABLE 操作
cursor.execute("ALTER TABLE employees ADD COLUMN phone TEXT")
print("已添加phone列")
cursor.execute("ALTER TABLE employees RENAME TO staff")
print("已将employees表重命名为staff")
# 创建新表并复制数据(模拟删除列)
cursor.execute("""
CREATE TABLE employees_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
department TEXT NOT NULL,
salary REAL DEFAULT 0.0,
email TEXT UNIQUE,
hire_date TEXT
)
""")
cursor.execute("""
INSERT INTO employees_new (id, name, department, salary, email, hire_date)
SELECT id, name, department, salary, email, hire_date FROM staff
""")
cursor.execute("DROP TABLE staff")
cursor.execute("ALTER TABLE employees_new RENAME TO employees")
print("已通过重建表删除position和phone列")
# 查看最终表结构
cursor.execute("PRAGMA table_info(employees)")
for col in cursor.fetchall():
print(f" {col[1]:20s} {col[2]:10s}")
conn.commit()
conn.close()
import sqlite3
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
# 演示各种数据类型和约束
cursor.execute("""
CREATE TABLE data_types_demo (
id INTEGER PRIMARY KEY,
int_col INTEGER, -- 整型
real_col REAL, -- 浮点型
text_col TEXT, -- 文本
blob_col BLOB, -- 二进制
bool_col INTEGER, -- SQLite无布尔型,用0/1代替
date_col TEXT, -- 日期存为TEXT
ts_col TEXT DEFAULT (datetime('now'))
)
""")
# 插入各种类型数据
cursor.execute("""
INSERT INTO data_types_demo VALUES (
1, 42, 3.14159, 'Hello SQLite',
x'68656C6C6F', 1, '2026-05-05', '2026-05-05 23:58:31'
)
""")
# 查看存储类型
cursor.execute("SELECT typeof(int_col), typeof(real_col), typeof(text_col), typeof(blob_col), typeof(bool_col) FROM data_types_demo")
types = cursor.fetchone()
print(f"存储类型: INTEGER={types[0]}, REAL={types[1]}, TEXT={types[2]}, BLOB={types[3]}, BOOL={types[4]}")
conn.close()
核心要点: SQLite是动态类型系统,但建议使用静态类型以保证数据一致性。INTEGER PRIMARY KEY自增列是常用的行标识方案。ALTER TABLE功能有限,复杂结构变更需要通过"新建表-复制数据-删除旧表-重命名"四步完成。使用PRAGMA table_info()可查询表结构。
三、CRUD操作
CRUD是数据库最基本的四个操作:Create(创建/插入)、Read(读取/查询)、Update(更新)、Delete(删除)。INSERT语句用于向表中添加新记录,支持插入单条数据和多条批量插入。使用INSERT OR REPLACE或INSERT OR IGNORE可以处理主键或唯一约束冲突。SELECT语句用于查询数据,支持条件过滤(WHERE)、排序(ORDER BY)、分页(LIMIT/OFFSET)、字段计算和别名等。WHERE子句支持比较运算符(=, >, <, >=, <=, !=)、逻辑运算符(AND, OR, NOT)、范围检查(IN, BETWEEN)和模式匹配(LIKE, GLOB)。ORDER BY可以指定多个排序字段和排序方向(ASC/DESC)。LIMIT和OFFSET组合实现分页查询。
UPDATE语句用于修改已有记录,通常配合WHERE子句限定更新范围,如果不加WHERE则会更新表中所有行。DELETE语句用于删除记录,同样需要配合WHERE使用以防误删。在办公自动化场景中,CRUD操作是最核心的数据管理手段:通过程序批量导入Excel数据到数据库、按条件筛选查询生成报表、批量更新状态字段、定时清理过期数据等。sqlite3模块默认开启事务自动提交,这意味着每次execute()后都会立刻写入磁盘。如果需要批量操作,建议使用显式事务来提升性能(详见第五节事务管理)。
使用fetchall()、fetchone()和fetchmany(size)三种方法获取查询结果。fetchall()返回所有剩余结果行的列表;fetchone()返回下一行(无数据返回None);fetchmany(size)返回最多size行数据。游标对象还提供了rowcount属性表示最近一次操作影响的行数,lastrowid属性表示最后插入行的ROWID,这些属性在自动化处理中非常实用。
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 先插入部门数据
cursor.execute("""
INSERT INTO departments (dept_name, manager, budget)
VALUES ('技术部', '张三', 500000)
""")
cursor.execute("""
INSERT INTO departments (dept_name, manager, budget)
VALUES ('市场部', '李四', 300000)
""")
cursor.execute("""
INSERT INTO departments (dept_name, manager, budget)
VALUES ('财务部', '王五', 200000)
""")
# 批量插入员工数据
employees_data = [
('张三', '技术部', '技术总监', 35000.0, 'zhangsan@company.com', '2020-01-15'),
('李四', '市场部', '市场经理', 25000.0, 'lisi@company.com', '2020-03-20'),
('王五', '财务部', '财务主管', 28000.0, 'wangwu@company.com', '2021-06-01'),
('赵六', '技术部', '高级工程师', 22000.0, 'zhaoliu@company.com', '2021-09-10'),
('孙七', '技术部', '工程师', 15000.0, 'sunqi@company.com', '2022-02-28'),
('周八', '市场部', '市场专员', 12000.0, 'zhouba@company.com', '2022-07-15'),
('吴九', '财务部', '会计', 13000.0, 'wujiu@company.com', '2023-01-05'),
('郑十', '市场部', '品牌经理', 20000.0, 'zhengshi@company.com', '2023-04-20'),
]
cursor.executemany("""
INSERT INTO employees (name, department, position, salary, email, hire_date)
VALUES (?, ?, ?, ?, ?, ?)
""", employees_data)
print(f"已插入 {len(employees_data)} 条员工记录")
conn.commit()
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
# 设置Row工厂,使查询结果支持列名访问
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# SELECT基本查询 - 所有员工
cursor.execute("SELECT id, name, department, position, salary FROM employees")
print("=== 所有员工 ===")
for row in cursor.fetchall():
print(f" #{row['id']} {row['name']:4s} | {row['department']:6s} | {row['position']:8s} | {row['salary']:>6.0f}")
# WHERE条件筛选
cursor.execute("""
SELECT name, department, salary
FROM employees
WHERE salary >= 20000 AND department = '技术部'
ORDER BY salary DESC
""")
print("\n=== 技术部高薪员工(>=20000) ===")
for row in cursor.fetchall():
print(f" {row['name']}: {row['salary']:.0f}")
# LIKE模糊查询 + ORDER BY + LIMIT分页
cursor.execute("""
SELECT name, position, salary
FROM employees
WHERE position LIKE '%工程师%'
ORDER BY salary DESC
LIMIT 3
""")
print("\n=== 前3名工程师 ===")
for row in cursor.fetchall():
print(f" {row['name']}: {row['position']} {row['salary']:.0f}")
# 聚合查询
cursor.execute("""
SELECT department,
COUNT(*) as 人数,
ROUND(AVG(salary), 0) as 平均薪资,
MAX(salary) as 最高薪资,
MIN(salary) as 最低薪资,
SUM(salary) as 总薪资
FROM employees
GROUP BY department
ORDER BY 平均薪资 DESC
""")
print("\n=== 部门薪资统计 ===")
for row in cursor.fetchall():
print(f" {row['department']:6s} | 人数:{row['人数']} | 平均:{row['平均薪资']:>6.0f} | 最高:{row['最高薪资']:>6.0f}")
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# UPDATE - 员工调薪
cursor.execute("""
UPDATE employees
SET salary = salary * 1.10
WHERE department = '技术部' AND salary < 20000
""")
print(f"技术部低薪员工加薪: 影响行数 {cursor.rowcount}")
# UPDATE - 部门经理变更
cursor.execute("""
UPDATE departments
SET manager = '赵六'
WHERE dept_name = '技术部'
""")
print(f"技术部经理已变更: 影响行数 {cursor.rowcount}")
# DELETE - 删除指定记录
cursor.execute("DELETE FROM employees WHERE name = '郑十'")
print(f"已删除记录: 影响行数 {cursor.rowcount}")
# 验证结果
cursor.execute("SELECT name, department, salary FROM employees ORDER BY salary DESC")
for row in cursor.fetchall():
print(f" {row[0]:4s} | {row[1]:6s} | {row[2]:>8.0f}")
conn.commit()
conn.close()
核心要点: CRUD是数据库操作的四项基本功。WHERE条件过滤必须谨慎,UPDATE和DELETE不加WHERE会操作全表。查询时使用LIKE进行模糊匹配,ORDER BY排序,LIMIT/OFFSET分页。设置row_factory为sqlite3.Row可通过列名访问字段,提升代码可读性。
四、参数化查询
参数化查询是数据库编程中最重要的安全实践之一。所谓的参数化查询,是指在SQL语句中使用占位符代替实际的值,然后通过执行参数将具体的值传递进去。这样做有两个核心好处:第一,从根本上防止SQL注入攻击,因为参数值永远不会被解释为SQL代码;第二,提高查询性能,因为相同的SQL语句可以预编译并缓存执行计划,重复执行时无需重新解析。在sqlite3模块中,支持两种占位符风格:?风格的问号占位符和:name风格的命名占位符。executemany()方法配合参数化查询可以实现高效的批量数据操作,将大量数据插入操作包含在单次数据库事务中执行,性能提升数十倍。
在构建动态查询条件时,参数化查询需要注意一些特殊场景的处理。对于IN子句中的列表参数,不能直接传递列表对象,需要动态生成对应数量的占位符。对于LIKE查询中的通配符,应该将通配符放在参数值中而非SQL语句中。对于INSERT OR REPLACE、INSERT INTO ... ON CONFLICT等进阶语法,同样可以使用参数化查询。在办公自动化脚本中,处理用户输入、读取配置文件、从外部文件导入数据时,必须使用参数化查询来确保安全性。即使是对可信的内部数据,参数化查询也比字符串拼接更安全和规范。
除了基础的安全防护,参数化查询还支持Python原生类型的自动转换。sqlite3模块会自动将Python的None转换为SQL的NULL,int转为INTEGER,float转为REAL,str转为TEXT,bytes转为BLOB。这种类型映射机制大大简化了数据读写操作。但在处理日期时间对象时需要注意,sqlite3默认不支持datetime对象的直接存储,需要通过detect_types参数启用类型检测或者手动进行字符串序列化。
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 方式1:?占位符(推荐用法)
name = "张三"
department = "技术部"
cursor.execute("""
SELECT * FROM employees WHERE name = ? AND department = ?
""", (name, department))
print("问号占位符结果:")
print(f" {cursor.fetchone()}")
# 方式2:命名占位符
cursor.execute("""
SELECT * FROM employees WHERE salary >= :min_sal AND department = :dept
""", {"min_sal": 20000, "dept": "技术部"})
print("\n命名占位符结果:")
for row in cursor.fetchall():
print(f" {row}")
# 方式3:executemany批量参数化
new_employees = [
('刘一', '技术部', '实习生', 5000, 'liuyi@company.com', '2026-05-01'),
('陈二', '市场部', '实习生', 5000, 'chener@company.com', '2026-05-01'),
]
cursor.executemany("""
INSERT INTO employees (name, department, position, salary, email, hire_date)
VALUES (?, ?, ?, ?, ?, ?)
""", new_employees)
print(f"\n批量插入 {len(new_employees)} 条新员工记录")
conn.commit()
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 防止SQL注入演示:恶意输入
# 用户输入(模拟恶意输入)
user_input_name = "'; DROP TABLE employees; --"
# 危险做法:字符串拼接(绝对不要这样做!!!)
# cursor.execute(f"SELECT * FROM employees WHERE name = '{user_input_name}'")
# 上面这行执行后变成了:
# SELECT * FROM employees WHERE name = ''; DROP TABLE employees; --'
# 安全做法:参数化查询
cursor.execute("SELECT * FROM employees WHERE name = ?", (user_input_name,))
result = cursor.fetchall()
print(f"参数化查询阻止了SQL注入,返回空结果: {result}")
print("参数值被安全地作为数据处理,不会被视为SQL代码执行")
# 实用示例:动态IN子句
department_list = ['技术部', '市场部']
placeholders = ','.join(['?'] * len(department_list))
cursor.execute(f"""
SELECT name, department, salary
FROM employees
WHERE department IN ({placeholders})
ORDER BY salary DESC
""", department_list)
print(f"\n动态IN查询 (部门: {department_list}):")
for row in cursor.fetchall():
print(f" {row[0]:4s} | {row[1]:6s} | {row[2]:>8.0f}")
# LIKE查询参数化
search_term = "%工程师%"
cursor.execute("""
SELECT name, position FROM employees WHERE position LIKE ?
""", (search_term,))
print(f"\n职位包含'工程师'的员工:")
for row in cursor.fetchall():
print(f" {row[0]:4s} - {row[1]}")
conn.close()
import sqlite3
from datetime import datetime, date
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# Python类型与SQLite类型自动转换演示
data_to_insert = (
None, # -> NULL
42, # -> INTEGER
3.14159, # -> REAL
"文本数据", # -> TEXT
b"\x00\x01\x02", # -> BLOB
True, # -> INTEGER (1)
False, # -> INTEGER (0)
)
cursor.execute("""
CREATE TABLE IF NOT EXISTS type_demo (
id INTEGER PRIMARY KEY,
val_null INTEGER,
val_int INTEGER,
val_real REAL,
val_text TEXT,
val_blob BLOB,
val_bool INTEGER,
val_bool2 INTEGER
)
""")
cursor.execute("""
INSERT INTO type_demo VALUES (1, ?, ?, ?, ?, ?, ?, ?)
""", data_to_insert)
# 读取并检查类型转换
cursor.execute("SELECT * FROM type_demo")
row = cursor.fetchone()
print(f"Python类型 {type(data_to_insert[4]).__name__} -> SQLite类型 -> Python类型 {type(row[5]).__name__}: {row[5]}")
print(f"Python类型 {type(data_to_insert[6]).__name__} -> SQLite类型 -> Python类型 {type(row[6]).__name__}: {row[6]}")
conn.commit()
conn.close()
核心要点: 始终使用参数化查询(?占位符或:name占位符)而非字符串拼接SQL,这是防止SQL注入的唯一可靠方法。executemany()配合参数化查询可实现高效的批量数据处理。动态IN子句需根据参数数量动态生成对应数量的占位符。
五、事务管理
事务是数据库管理系统中保证数据一致性和完整性的核心机制。SQLite严格遵循ACID特性:原子性(Atomicity)确保事务中的所有操作要么全部成功要么全部失败;一致性(Consistency)确保数据在事务前后都处于合法状态;隔离性(Isolation)确保并发事务之间互不干扰;持久性(Durability)确保已提交的事务结果永久保存。在sqlite3模块中,事务管理默认行为是在数据修改语句(INSERT、UPDATE、DELETE、CREATE等)执行前自动开启事务。Connection对象提供了三个核心事务控制方法:commit()提交事务,将修改写入磁盘;rollback()回滚事务,撤销所有未提交的修改;execute("SAVEPOINT ...")创建保存点,实现事务内的部分回滚。
sqlite3模块的默认事务行为是:当第一次执行数据修改语句时自动开启事务,后续的所有修改操作都在同一事务中,直到调用commit()或rollback()。这种设计为开发者提供了显式控制事务边界的能力。如果不主动调用commit(),连接关闭时未提交的修改会自动回滚。对于只需要单条SQL语句自动提交的场景,可以通过设置Connection的isolation_level属性为None来关闭自动事务管理,进入自动提交模式(autocommit)。在自动提交模式下,每条SQL语句都在独立的事务中执行,修改立即生效。
在处理大量数据操作时,合理使用事务可以大幅提升性能。每执行一条INSERT语句都自动提交的话,会产生大量的磁盘写入操作。将大量INSERT语句包裹在单个事务中,可以显著减少磁盘I/O操作,性能提升可达数十倍甚至数百倍。SAVEPOINT机制允许在事务内部设置保存点,当后续操作出错时可以选择回滚到保存点而非整个事务,这在复杂的数据处理流程中非常有用。事务隔离级别方面,SQLite支持四种隔离级别:DEFERRED、IMMEDIATE、EXCLUSIVE,通过BEGIN语句指定。在办公自动化场景中,推荐使用DEFERRED(默认)级别,在需要读写分离的场景可使用IMMEDIATE级别防止写饥饿。
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
try:
# 显式开启事务(可选,默认会在第一次修改时自动开启)
cursor.execute("BEGIN TRANSACTION")
# 执行一系列数据库操作
cursor.execute("""
UPDATE employees SET salary = salary * 1.05
WHERE department = '市场部'
""")
market_count = cursor.rowcount
print(f"市场部 {market_count} 名员工加薪5%")
cursor.execute("""
UPDATE employees SET salary = salary * 1.08
WHERE department = '技术部'
""")
tech_count = cursor.rowcount
print(f"技术部 {tech_count} 名员工加薪8%")
# 验证数据一致性
cursor.execute("SELECT SUM(salary) FROM employees")
total_before = cursor.fetchone()[0]
print(f"调薪后薪资总额: {total_before:.0f}")
# 提交事务
conn.commit()
print("事务已提交,所有修改已保存")
except Exception as e:
# 发生错误时回滚事务
conn.rollback()
print(f"操作失败,事务已回滚: {e}")
finally:
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 演示回滚效果
print("=== 事务回滚演示 ===")
cursor.execute("SELECT COUNT(*) FROM employees")
count_before = cursor.fetchone()[0]
print(f"回滚前员工总数: {count_before}")
try:
cursor.execute("BEGIN")
# 删除所有员工(危险操作!)
cursor.execute("DELETE FROM employees")
print(f"临时删除 {cursor.rowcount} 名员工")
# 模拟出错了,需要撤销
raise ValueError("模拟操作错误")
except ValueError as e:
print(f"捕获异常: {e}")
cursor.execute("ROLLBACK")
print("事务已回滚")
# 验证数据未被删除
cursor.execute("SELECT COUNT(*) FROM employees")
count_after = cursor.fetchone()[0]
print(f"回滚后员工总数: {count_after}")
print(f"数据安全: {'是' if count_before == count_after else '否'}")
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# SAVEPOINT保存点演示
print("=== 保存点回滚演示 ===")
try:
cursor.execute("BEGIN")
# 第一步:员工加薪
cursor.execute("UPDATE employees SET salary = salary * 2 WHERE name = '张三'")
print("步骤1: 张三四薪 -> 保存点sp1")
# 设置保存点1
cursor.execute("SAVEPOINT sp1")
# 第二步:再操作
cursor.execute("UPDATE employees SET salary = salary * 2 WHERE name = '李四'")
print("步骤2: 李四加薪")
# 设置保存点2
cursor.execute("SAVEPOINT sp2")
# 第三步(有问题的操作)
cursor.execute("DELETE FROM employees WHERE name = '孙七'")
print("步骤3: 误删孙七,准备回滚到此步骤之前")
# 回滚到保存点2(撤销步骤3)
cursor.execute("ROLLBACK TO sp2")
print("已回滚到sp2,孙七数据已恢复")
# 继续提交
conn.commit()
print("事务已提交")
# 验证
cursor.execute("SELECT name, salary FROM employees WHERE name IN ('张三','李四','孙七')")
for row in cursor.fetchall():
print(f" {row[0]:4s} 薪资: {row[1]:.0f}")
except Exception as e:
conn.rollback()
print(f"事务整体回滚: {e}")
conn.close()
核心要点: 事务是保证数据一致性的关键机制。批量修改操作务必包裹在事务中,出错时用rollback()回滚。SAVEPOINT实现了事务内部分回滚,在复杂数据流程中非常实用。SQLite默认自动管理事务边界,也可以通过设置isolation_level来控制。
六、高级查询
SQLite支持丰富的高级查询功能,使数据库不仅仅是一个数据存储容器,更是一个强大的数据分析引擎。JOIN连接是关系型数据库的核心能力,通过将多张表按照一定条件组合起来,实现跨表查询。INNER JOIN(内连接)返回两表中匹配的记录;LEFT JOIN(左连接)返回左表所有记录和右表匹配的记录(不匹配时右表字段为NULL);CROSS JOIN(交叉连接)返回两表的笛卡尔积。多表连接时,建议使用表别名简化SQL语句,并注意连接条件的使用以避免产生不必要的巨大结果集。
GROUP BY分组和聚合函数是实现数据汇总统计的利器。常用聚合函数包括:COUNT(计数)、SUM(求和)、AVG(平均值)、MAX(最大值)、MIN(最小值)、GROUP_CONCAT(字符串拼接)。HAVING子句用于对分组后的结果进行过滤(与WHERE过滤行的区别在于HAVING过滤分组)。子查询(Subquery)是将一个查询的结果作为另一个查询的输入,可以出现在SELECT、FROM、WHERE子句中。在FROM子句中使用子查询时需要使用别名。子查询分为相关子查询(依赖外部查询)和非相关子查询(独立执行)。
UNION操作用于合并多个SELECT语句的结果集,UNION自动去重而UNION ALL保留所有记录。窗口函数(Window Function)是SQLite 3.25.0版本引入的高级分析功能,支持在结果集的行窗口上执行计算,如ROW_NUMBER()、RANK()、LEAD()、LAG()等。窗口函数通过OVER子句定义分区(PARTITION BY)和排序(ORDER BY)规则,在不改变行数的情况下实现复杂的分析计算。这些高级查询功能在办公自动化的报表生成、数据分析、数据清洗等场景中发挥着重要作用。
import sqlite3
conn = sqlite3.connect("office.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# INNER JOIN - 查看员工及其部门信息
cursor.execute("""
SELECT e.name, e.position, e.salary,
d.dept_name, d.manager, d.budget
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name
WHERE e.salary > 15000
ORDER BY e.salary DESC
""")
print("=== 员工与部门信息 (INNER JOIN) ===")
for row in cursor.fetchall():
print(f" {row['name']:4s} | {row['position']:8s} | 薪资:{row['salary']:>6.0f} | 部门:{row['dept_name']:6s} | 经理:{row['manager']}")
# LEFT JOIN - 显示所有部门的预算使用情况
cursor.execute("""
SELECT d.dept_name, d.budget,
COUNT(e.id) as emp_count,
COALESCE(SUM(e.salary), 0) as total_salary,
ROUND(d.budget - COALESCE(SUM(e.salary), 0), 0) as remaining
FROM departments d
LEFT JOIN employees e ON d.dept_name = e.department
GROUP BY d.dept_name
ORDER BY remaining DESC
""")
print("\n=== 部门预算分析 (LEFT JOIN + GROUP BY) ===")
for row in cursor.fetchall():
print(f" {row['dept_name']:6s} | 预算:{row['budget']:>8.0f} | 人数:{row['emp_count']:2d} | 薪资总额:{row['total_salary']:>8.0f} | 剩余:{row['remaining']:>8.0f}")
# 多表子查询
cursor.execute("""
SELECT e.name, e.salary,
(SELECT AVG(salary) FROM employees WHERE department = e.department) as dept_avg,
ROUND(e.salary - (SELECT AVG(salary) FROM employees WHERE department = e.department), 0) as diff
FROM employees e
WHERE e.salary > (SELECT AVG(salary) FROM employees WHERE department = e.department)
ORDER BY diff DESC
""")
print("\n=== 高于部门平均薪资的员工 (子查询) ===")
for row in cursor.fetchall():
print(f" {row['name']:4s} | 薪资:{row['salary']:>6.0f} | 部门平均:{row['dept_avg']:>6.0f} | 超出:{row['diff']:>6.0f}")
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 聚合函数 + GROUP BY + HAVING
cursor.execute("""
SELECT department,
COUNT(*) as emp_count,
ROUND(AVG(salary), 0) as avg_salary,
SUM(salary) as total_salary,
MAX(salary) as max_salary,
MIN(salary) as min_salary,
GROUP_CONCAT(name, ', ') as employee_names
FROM employees
GROUP BY department
HAVING emp_count >= 2
ORDER BY avg_salary DESC
""")
print("=== 各部门统计 (GROUP BY + HAVING) ===")
for row in cursor.fetchall():
print(f" {row[0]:6s} | 人数:{row[1]:2d} | 平均:{row[2]:>6.0f} | 总额:{row[3]:>8.0f}")
print(f" 员工: {row[6]}")
# UNION ALL 合并查询
cursor.execute("""
SELECT name, salary, '高薪' as category FROM employees WHERE salary >= 30000
UNION ALL
SELECT name, salary, '中等' as category FROM employees WHERE salary BETWEEN 15000 AND 29999
UNION ALL
SELECT name, salary, '初级' as category FROM employees WHERE salary < 15000
ORDER BY salary DESC
""")
print("\n=== 薪资分级 (UNION ALL) ===")
for row in cursor.fetchall():
print(f" {row[0]:4s} | {row[1]:>6.0f} | {row[2]}")
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 窗口函数演示(需要SQLite 3.25.0+)
# 每个部门内按薪资排名
cursor.execute("""
SELECT name, department, salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank_dense,
AVG(salary) OVER (PARTITION BY department) as dept_avg_salary,
salary - AVG(salary) OVER (PARTITION BY department) as diff_from_avg,
LEAD(salary, 1) OVER (PARTITION BY department ORDER BY salary DESC) as next_lower_salary,
LAG(salary, 1) OVER (PARTITION BY department ORDER BY salary DESC) as next_higher_salary
FROM employees
ORDER BY department, dept_rank
""")
print("=== 窗口函数分析 ===")
print(f" {'姓名':4s} {'部门':6s} {'薪资':>6s} {'排名':>4s} {'部门平均':>8s} {'与平均差':>8s}")
print(" " + "-"*44)
for row in cursor.fetchall():
if row[5] is not None:
diff = f"{row[5]:>+7.0f}"
else:
diff = " N/A"
print(f" {row[0]:4s} {row[1]:6s} {row[2]:>6.0f} {row[3]:>4d} {row[4]:>8.0f} {diff}")
conn.close()
核心要点: JOIN连接实现多表关联查询,是关系型数据库的核心能力。GROUP BY配合聚合函数实现数据汇总统计,HAVING过滤分组结果。窗口函数(ROW_NUMBER、LEAD/LAG等)实现排名、同比环比分析等高级数据分析功能,在报表自动化中非常实用。
七、数据导入导出
在办公自动化场景中,SQLite数据库经常需要与外部数据格式(CSV、Excel、JSON)进行数据交换。数据导入导出是连接数据库和办公软件的桥梁。CSV(逗号分隔值)是最通用的数据交换格式,Python内置的csv模块配合sqlite3可以实现高效的CSV导入导出。对于Excel文件,需要借助openpyxl或xlrd库读取.xlsx文件,然后将数据写入数据库。JSON格式在Web API和配置文件中广泛使用,Python的json模块可以方便地实现JSON与数据库的互转。在大数据量场景下,SQLite提供了.import命令(通过sqlite3命令行工具)和ATTACH DATABASE语法实现高效的数据库间数据迁移。
数据导出的一般流程是:从SQLite数据库中查询数据,然后在Python中处理数据格式(日期序列化、空值处理、类型转换等),最后写入目标文件。数据导入的流程相反:读取外部文件,验证和清洗数据(处理缺失值、去重、格式规范化等),然后通过参数化查询批量写入数据库。在导入过程中,使用事务包裹写入操作可以大幅提升性能,并确保数据完整性。对于重复导入的场景,使用INSERT OR IGNORE或INSERT OR REPLACE来处理主键冲突,避免数据重复。
数据库备份是数据管理的最后一道防线。SQLite提供了多种备份方式:使用VACUUM命令压缩和重建数据库文件;备份API(sqlite3_backup_init)实现在线热备份;直接复制数据库文件(需确保没有未提交的事务);通过.dump命令导出SQL文本备份。对于自动化办公系统,建议定期执行VACUUM回收磁盘空间并重建索引,同时建立自动备份机制防止数据丢失。
import sqlite3
import csv
import os
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# --- CSV导出 ---
csv_file = "employees_export.csv"
cursor.execute("SELECT * FROM employees")
rows = cursor.fetchall()
# 获取列名
col_names = [desc[0] for desc in cursor.description]
with open(csv_file, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
writer.writerow(col_names)
writer.writerows(rows)
print(f"已导出 {len(rows)} 条记录到 {csv_file} ({os.path.getsize(csv_file)} 字节)")
# --- CSV导入 ---
csv_file2 = "new_employees.csv"
# 先创建示例CSV文件
with open(csv_file2, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
writer.writerow(['name', 'department', 'position', 'salary', 'email', 'hire_date'])
writer.writerow(['钱十一', '技术部', '前端工程师', 16000, 'qian11@company.com', '2026-05-01'])
writer.writerow(['何十二', '市场部', '市场分析师', 14000, 'he12@company.com', '2026-05-01'])
# 读取CSV并导入数据库
with open(csv_file2, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
imported_count = 0
for row in reader:
cursor.execute("""
INSERT OR IGNORE INTO employees (name, department, position, salary, email, hire_date)
VALUES (?, ?, ?, ?, ?, ?)
""", (row['name'], row['department'], row['position'],
float(row['salary']), row['email'], row['hire_date']))
if cursor.rowcount > 0:
imported_count += 1
conn.commit()
print(f"从CSV导入 {imported_count} 条新员工记录")
# 清理临时文件
os.remove(csv_file)
os.remove(csv_file2)
conn.close()
import sqlite3
import json
conn = sqlite3.connect("office.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# --- JSON导出 ---
cursor.execute("""
SELECT d.dept_name as department,
COUNT(e.id) as employee_count,
ROUND(AVG(e.salary), 2) as avg_salary,
SUM(e.salary) as total_salary
FROM departments d
LEFT JOIN employees e ON d.dept_name = e.department
GROUP BY d.dept_name
""")
rows = cursor.fetchall()
# 将Row对象转为字典列表
data = [dict(row) for row in rows]
json_str = json.dumps(data, ensure_ascii=False, indent=2)
with open("department_stats.json", 'w', encoding='utf-8') as f:
f.write(json_str)
print(f"JSON导出完成: {json_str[:200]}...")
# --- JSON导入 ---
json_data = '''
[
{"name": "吴十三", "department": "技术部", "position": "DevOps工程师", "salary": 21000, "email": "wu13@company.com", "hire_date": "2026-05-05"},
{"name": "郑十四", "department": "财务部", "position": "财务分析师", "salary": 18000, "email": "zheng14@company.com", "hire_date": "2026-05-05"}
]
'''
employees_to_import = json.loads(json_data)
with conn:
for emp in employees_to_import:
cursor.execute("""
INSERT OR IGNORE INTO employees (name, department, position, salary, email, hire_date)
VALUES (:name, :department, :position, :salary, :email, :hire_date)
""", emp)
print(f"从JSON导入 {len(employees_to_import)} 条记录")
conn.close()
import sqlite3
import shutil
import os
conn = sqlite3.connect("office.db")
# --- 数据库备份 ---
# 方法1: 使用backup API创建热备份
backup_conn = sqlite3.connect("office_backup.db")
conn.backup(backup_conn, pages=1000) # 每次复制1000页
backup_conn.close()
print(f"热备份完成: {os.path.getsize('office_backup.db')} 字节")
# 方法2: 导出SQL文本备份
with open("office_dump.sql", 'w', encoding='utf-8') as f:
for line in conn.iterdump():
f.write(line + '\n')
print(f"SQL转储完成,已生成 office_dump.sql")
# 方法3: VACUUM重建优化
conn.execute("VACUUM")
print("VACUUM完成,数据库已优化")
# 验证备份完整性
restore_conn = sqlite3.connect("office_backup.db")
restore_cursor = restore_conn.cursor()
restore_cursor.execute("SELECT COUNT(*) FROM employees")
count = restore_cursor.fetchone()[0]
restore_cursor.execute("SELECT COUNT(*) FROM departments")
dept_count = restore_cursor.fetchone()[0]
print(f"备份验证: employees表{count}条记录, departments表{dept_count}条记录, 数据完好")
restore_conn.close()
conn.close()
# 清理备份文件
os.remove("office_backup.db")
os.remove("office_dump.sql")
核心要点: CSV适用通用数据交换,Excel适用办公文档对接,JSON适用Web API接口。数据导入前务必进行清洗和验证(处理空值、去重、格式校验)。使用事务包裹批量导入可提升性能和确保原子性。定期使用VACUUM和备份保护数据安全。
八、性能与安全
随着数据量的增长,数据库查询性能成为必须关注的问题。索引(Index)是提升查询性能最有效的手段,它类似于书籍的目录,能够将全表扫描(逐行检查)转变为索引查找(直接定位)。创建索引使用CREATE INDEX语句,通常为WHERE子句中的过滤字段、JOIN连接字段和ORDER BY排序字段创建索引。复合索引(多列索引)需要注意列的顺序——最左前缀原则。但索引并非越多越好:每个索引都会增加写入操作(INSERT/UPDATE/DELETE)的开销,占用额外的磁盘空间。EXPLAIN QUERY PLAN命令可以分析SQL查询的执行计划,帮助判断查询是否有效利用了索引。
SQLite的WAL(Write-Ahead Logging)模式通过将写入操作先追加到WAL文件而非直接修改数据库文件,显著提升了并发读写性能。在WAL模式下,读操作和写操作可以并发执行(读不阻塞写,写不阻塞读),非常适合多线程应用场景。启用WAL模式只需执行PRAGMA journal_mode=WAL。其他重要的PRAGMA优化选项包括:synchronous(同步模式,平衡安全性与性能)、cache_size(缓存大小)、page_size(页面大小)、temp_store(临时存储位置)等。对于批量数据操作,可以在事务中暂时关闭同步模式和增加缓存来加速。
安全性方面,SQLite本身不提供用户名/密码认证机制,但可以通过加密扩展(如SEE、SQLCipher、SQLiteCrypt)实现数据库文件加密。SQLCipher是最流行的开源SQLite加密扩展,它基于256位AES加密算法对整个数据库文件进行透明加密。对于不需要加密但需要防篡改的场景,可以使用PRAGMA integrity_check进行数据完整性验证。多线程安全方面,sqlite3.Connection对象默认是线程安全的,但同一连接的不同游标不能跨线程使用。推荐的多线程模式是每个线程使用独立的数据库连接,或者使用连接池管理连接。
import sqlite3
import time
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
# 创建测试表并插入大量数据
cursor.execute("""
CREATE TABLE perf_test (
id INTEGER PRIMARY KEY,
category TEXT,
value INTEGER,
name TEXT
)
""")
# 插入10万条测试数据
import random
names = ['张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
categories = ['A', 'B', 'C', 'D']
print("正在插入10万条测试数据...")
data = [(i, random.choice(categories), random.randint(1, 10000),
random.choice(names)) for i in range(100000)]
with conn:
cursor.executemany("""
INSERT INTO perf_test (id, category, value, name)
VALUES (?, ?, ?, ?)
""", data)
print("数据插入完成,共100000条")
# 无索引查询
start = time.time()
cursor.execute("SELECT * FROM perf_test WHERE category = 'A'")
len(cursor.fetchall())
no_index_time = time.time() - start
print(f"\n无索引查询时间: {no_index_time:.4f} 秒")
# 创建索引
cursor.execute("CREATE INDEX idx_category ON perf_test(category)")
print("已创建索引 idx_category")
# 有索引查询
start = time.time()
cursor.execute("SELECT * FROM perf_test WHERE category = 'A'")
len(cursor.fetchall())
index_time = time.time() - start
print(f"有索引查询时间: {index_time:.4f} 秒")
print(f"性能提升: {no_index_time / index_time:.1f}x")
conn.close()
import sqlite3
conn = sqlite3.connect("office.db")
cursor = conn.cursor()
# 查看查询执行计划
print("=== EXPLAIN QUERY PLAN ===")
# 索引前的查询计划
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM employees WHERE department = '技术部'")
print("无索引查询计划:")
for row in cursor.fetchall():
print(f" {row}")
# 创建索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_emp_dept ON employees(department)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_emp_salary ON employees(salary)")
print("\n已创建索引: idx_emp_dept, idx_emp_salary")
# 索引后的查询计划
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM employees WHERE department = '技术部'")
print("\n有索引查询计划:")
for row in cursor.fetchall():
print(f" {row}")
# 复合查询索引
cursor.execute("""
EXPLAIN QUERY PLAN
SELECT department, AVG(salary)
FROM employees
WHERE salary > 15000
GROUP BY department
""")
print("\n复合查询计划:")
for row in cursor.fetchall():
print(f" {row}")
# 启用WAL模式
cursor.execute("PRAGMA journal_mode=WAL")
journal_mode = cursor.fetchone()[0]
print(f"\n日志模式: {journal_mode}")
# 查看其他优化PRAGMA
cursor.execute("PRAGMA page_size")
print(f"页面大小: {cursor.fetchone()[0]} 字节")
cursor.execute("PRAGMA cache_size")
cache_size = cursor.fetchone()[0]
print(f"缓存大小: {cache_size} 页 ({cache_size * 1024 / 1024:.0f} MB)")
cursor.execute("PRAGMA synchronous")
sync_modes = {0: 'OFF', 1: 'NORMAL', 2: 'FULL'}
print(f"同步模式: {cursor.fetchone()[0]} ({sync_modes.get(cursor.fetchone()[0], 'UNKNOWN')})")
conn.close()
import sqlite3
import threading
import time
# 多线程安全访问演示
results = []
def worker(thread_id, db_path):
"""工作线程:独立连接,读写数据库"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# WAL模式支持并发读
cursor.execute("PRAGMA journal_mode=WAL")
for i in range(5):
# 读取
cursor.execute("SELECT COUNT(*) FROM employees")
count = cursor.fetchone()[0]
# 写入
cursor.execute("""
INSERT INTO employees (name, department, position, salary, email, hire_date)
VALUES (?, ?, ?, ?, ?, ?)
""", (f"线程{thread_id}_第{i+1}次", '技术部', '临时工', 10000,
f"thread{thread_id}_{i}@temp.com", '2026-05-05'))
conn.commit()
results.append(f"线程{thread_id}: 第{i+1}次写入成功, 当前总记录数={count+1}")
time.sleep(0.01)
except Exception as e:
results.append(f"线程{thread_id}出错: {e}")
finally:
conn.close()
# 启动多个线程并发访问
print("=== 多线程并发访问演示 ===")
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, "office.db"))
threads.append(t)
t.start()
for t in threads:
t.join()
for r in results:
print(f" {r}")
print(f"\n所有线程执行完毕,共写入 {len(results)} 条记录")
# 连接池模式概念演示(实际生产中使用queue.Queue)
import queue
class SQLiteConnectionPool:
"""简单的SQLite连接池实现"""
def __init__(self, db_path, max_connections=5):
self.db_path = db_path
self._pool = queue.Queue(maxsize=max_connections)
for _ in range(max_connections):
conn = sqlite3.connect(db_path, check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
self._pool.put(conn)
def get_connection(self):
return self._pool.get()
def return_connection(self, conn):
self._pool.put(conn)
pool = SQLiteConnectionPool("office.db", max_connections=3)
print(f"\n连接池已创建,最大连接数: 3")
conn_from_pool = pool.get_connection()
print("从连接池获取连接成功")
pool.return_connection(conn_from_pool)
print("归还连接成功")
核心要点: 索引是提升查询性能最有效的手段,但要权衡读写性能。EXPLAIN QUERY PLAN帮助分析查询优化效果。WAL模式显著提升并发读写性能,推荐在办公自动化场景中启用。多线程环境下每个线程使用独立连接,使用连接池管理资源。
九、实战案例
理论学习最终要落实到实际应用中。下面通过三个完整的实战案例,展示sqlite3在办公自动化中的典型应用。第一个案例是员工信息管理系统,涵盖部门管理、员工信息维护、薪资统计和报表生成;第二个案例是图书库存管理系统,涵盖图书入库、借出、归还和库存预警功能;第三个案例是日志存储与查询系统,实现应用日志的结构化存储、分级检索和统计分析。通过这些案例,可以更深入地理解sqlite3在真实场景中的应用方式和最佳实践。
在设计实战系统时,建议遵循以下原则:表结构设计要充分考虑业务需求的可扩展性;关键字段建立索引保证查询性能;数据写入使用事务保证完整性;用户输入处理使用参数化查询防止注入;异常处理要完善并提供有意义的错误提示;输出格式要清晰易读,便于在终端或日志中查看。以下几个案例综合运用了前面八个章节的知识点,是完整的可运行示例代码。
import sqlite3
from datetime import datetime, timedelta
import random
class EmployeeManagementSystem:
"""员工信息管理系统"""
def __init__(self, db_path):
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self._init_db()
def _init_db(self):
"""初始化数据库表结构"""
self.cursor.executescript("""
CREATE TABLE IF NOT EXISTS departments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
dept_name TEXT NOT NULL UNIQUE,
manager TEXT,
budget REAL DEFAULT 0.0,
created_at TEXT DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY AUTOINCREMENT,
emp_no TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
gender TEXT CHECK(gender IN ('男','女')),
department_id INTEGER,
position TEXT,
salary REAL DEFAULT 0.0,
phone TEXT,
email TEXT UNIQUE,
status TEXT DEFAULT '在职' CHECK(status IN ('在职','离职','休假')),
hire_date TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
FOREIGN KEY (department_id) REFERENCES departments(id)
);
CREATE INDEX IF NOT EXISTS idx_emp_dept_id ON employees(department_id);
CREATE INDEX IF NOT EXISTS idx_emp_status ON employees(status);
""")
self.conn.commit()
print("员工管理系统初始化完成")
def add_department(self, dept_name, manager, budget):
"""添加部门"""
try:
self.cursor.execute("""
INSERT INTO departments (dept_name, manager, budget)
VALUES (?, ?, ?)
""", (dept_name, manager, budget))
self.conn.commit()
return f"部门 '{dept_name}' 创建成功"
except sqlite3.IntegrityError:
return f"部门 '{dept_name}' 已存在"
def add_employee(self, emp_no, name, gender, dept_name, position, salary, phone, email):
"""添加员工"""
# 先获取部门ID
self.cursor.execute("SELECT id FROM departments WHERE dept_name = ?", (dept_name,))
dept = self.cursor.fetchone()
if not dept:
return f"错误: 部门 '{dept_name}' 不存在"
try:
self.cursor.execute("""
INSERT INTO employees (emp_no, name, gender, department_id, position, salary, phone, email, hire_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, date('now'))
""", (emp_no, name, gender, dept['id'], position, salary, phone, email))
self.conn.commit()
return f"员工 '{name}' 入职成功 (工号: {emp_no})"
except sqlite3.IntegrityError as e:
return f"添加失败: 工号或邮箱已存在 ({str(e)})"
def list_employees(self, department=None, status=None):
"""查询员工列表"""
sql = """
SELECT e.emp_no, e.name, e.gender, d.dept_name, e.position,
e.salary, e.status, e.hire_date
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE 1=1
"""
params = []
if department:
sql += " AND d.dept_name = ?"
params.append(department)
if status:
sql += " AND e.status = ?"
params.append(status)
sql += " ORDER BY d.dept_name, e.salary DESC"
self.cursor.execute(sql, params)
rows = self.cursor.fetchall()
if not rows:
return "未找到匹配的员工记录"
result = [f"\n{'工号':10s} {'姓名':6s} {'性别':4s} {'部门':8s} {'职位':10s} {'薪资':>8s} {'状态':6s} {'入职日期':12s}"]
result.append("-" * 70)
for r in rows:
result.append(f"{r['emp_no']:10s} {r['name']:6s} {r['gender']:4s} {r['dept_name']:8s} {r['position']:10s} {r['salary']:>8.0f} {r['status']:6s} {r['hire_date']:12s}")
return '\n'.join(result)
def salary_report(self):
"""薪资统计报表"""
self.cursor.execute("""
SELECT d.dept_name,
COUNT(e.id) as emp_count,
ROUND(AVG(e.salary), 0) as avg_salary,
MAX(e.salary) as max_salary,
MIN(e.salary) as min_salary,
SUM(e.salary) as total_salary
FROM departments d
LEFT JOIN employees e ON d.id = e.department_id AND e.status = '在职'
GROUP BY d.dept_name
ORDER BY avg_salary DESC
""")
rows = self.cursor.fetchall()
result = ["\n=== 部门薪资统计报表 ==="]
result.append(f"{'部门':8s} {'人数':4s} {'平均薪资':>8s} {'最高薪资':>8s} {'最低薪资':>8s} {'总薪资':>10s}")
result.append("-" * 50)
for r in rows:
result.append(f"{r['dept_name']:8s} {r['emp_count']:4d} {r['avg_salary']:>8.0f} {r['max_salary']:>8.0f} {r['min_salary']:>8.0f} {r['total_salary']:>10.0f}")
result.append(f"\n{'总计':8s} {sum(r['emp_count'] for r in rows):4d} {sum(r['total_salary'] for r in rows):>10.0f}")
return '\n'.join(result)
def close(self):
self.conn.close()
# 运行示例
print("=" * 50)
print("员工信息管理系统 - 实战案例")
print("=" * 50)
ems = EmployeeManagementSystem("employee_system.db")
# 添加部门
for dept in [('技术部', '张三', 500000), ('市场部', '李四', 300000), ('财务部', '王五', 200000)]:
print(ems.add_department(*dept))
# 添加员工
employees = [
('EMP001', '张三', '男', '技术部', '技术总监', 35000, '13800138001', 'zhangsan@corp.com'),
('EMP002', '李四', '女', '市场部', '市场经理', 25000, '13800138002', 'lisi@corp.com'),
('EMP003', '王五', '男', '财务部', '财务主管', 28000, '13800138003', 'wangwu@corp.com'),
('EMP004', '赵六', '男', '技术部', '高级工程师', 22000, '13800138004', 'zhaoliu@corp.com'),
('EMP005', '孙七', '女', '技术部', '工程师', 15000, '13800138005', 'sunqi@corp.com'),
]
for emp in employees:
print(ems.add_employee(*emp))
# 查询和报表
print(ems.list_employees(department='技术部'))
print(ems.list_employees(status='在职'))
print(ems.salary_report())
ems.close()
# 清理
import os
os.remove("employee_system.db")
import sqlite3
from datetime import datetime, timedelta
import random
class BookInventorySystem:
"""图书库存管理系统"""
def __init__(self, db_path):
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self._init_db()
def _init_db(self):
self.cursor.executescript("""
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
isbn TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
author TEXT NOT NULL,
publisher TEXT,
category TEXT,
price REAL,
stock INTEGER DEFAULT 0,
min_stock INTEGER DEFAULT 5,
location TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS borrow_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
book_id INTEGER NOT NULL,
borrower TEXT NOT NULL,
borrow_date TEXT NOT NULL,
due_date TEXT NOT NULL,
return_date TEXT,
status TEXT DEFAULT '借出' CHECK(status IN ('借出','已还','逾期')),
FOREIGN KEY (book_id) REFERENCES books(id)
);
CREATE INDEX IF NOT EXISTS idx_borrow_status ON borrow_records(status);
CREATE INDEX IF NOT EXISTS idx_book_category ON books(category);
""")
self.conn.commit()
def add_book(self, isbn, title, author, publisher, category, price, stock, location):
"""添加图书"""
try:
self.cursor.execute("""
INSERT INTO books (isbn, title, author, publisher, category, price, stock, min_stock, location)
VALUES (?, ?, ?, ?, ?, ?, ?, 5, ?)
""", (isbn, title, author, publisher, category, price, stock, location))
self.conn.commit()
return f"图书 '{title}' 入库成功 (ISBN: {isbn}, 数量: {stock})"
except sqlite3.IntegrityError:
return f"ISBN '{isbn}' 已存在"
def borrow_book(self, isbn, borrower):
"""借出图书"""
self.cursor.execute("SELECT id, title, stock FROM books WHERE isbn = ?", (isbn,))
book = self.cursor.fetchone()
if not book:
return f"错误: ISBN '{isbn}' 不存在"
if book['stock'] <= 0:
return f"图书 '{book['title']}' 库存不足"
borrow_date = datetime.now()
due_date = borrow_date + timedelta(days=30)
with self.conn:
self.cursor.execute("UPDATE books SET stock = stock - 1 WHERE id = ?", (book['id'],))
self.cursor.execute("""
INSERT INTO borrow_records (book_id, borrower, borrow_date, due_date)
VALUES (?, ?, ?, ?)
""", (book['id'], borrower, borrow_date.strftime('%Y-%m-%d'), due_date.strftime('%Y-%m-%d')))
return f"图书 '{book['title']}' 已借出给 '{borrower}', 应还日期: {due_date.strftime('%Y-%m-%d')}"
def return_book(self, isbn, borrower):
"""归还图书"""
self.cursor.execute("""
SELECT br.id, b.title, br.borrow_date, br.due_date
FROM borrow_records br
JOIN books b ON br.book_id = b.id
WHERE b.isbn = ? AND br.borrower = ? AND br.status = '借出'
""", (isbn, borrower))
record = self.cursor.fetchone()
if not record:
return f"未找到 '{borrower}' 借出的ISBN '{isbn}' 的借阅记录"
return_date = datetime.now()
due = datetime.strptime(record['due_date'], '%Y-%m-%d')
overdue_days = (return_date - due).days if return_date > due else 0
with self.conn:
self.cursor.execute("""
UPDATE borrow_records SET return_date = ?, status = ?
WHERE id = ?
""", (return_date.strftime('%Y-%m-%d'), '逾期' if overdue_days > 0 else '已还', record['id']))
self.cursor.execute("UPDATE books SET stock = stock + 1 WHERE isbn = ?", (isbn,))
msg = f"图书 '{record['title']}' 归还成功"
if overdue_days > 0:
msg += f", 逾期 {overdue_days} 天"
return msg
def check_stock_alert(self):
"""库存预警检查"""
self.cursor.execute("""
SELECT title, isbn, stock, min_stock, location
FROM books WHERE stock <= min_stock
ORDER BY stock ASC
""")
low_stock = self.cursor.fetchall()
if not low_stock:
return "所有图书库存充足"
result = ["\n=== 库存预警 ==="]
for b in low_stock:
result.append(f" [{b['title']}] ISBN:{b['isbn']} 库存:{b['stock']}/{b['min_stock']} 位置:{b['location']}")
return '\n'.join(result)
def close(self):
self.conn.close()
# 运行示例
print("\n" + "=" * 50)
print("图书库存管理系统 - 实战案例")
print("=" * 50)
bis = BookInventorySystem("book_inventory.db")
# 添加图书
books_data = [
('978-7-111-12345-6', 'Python编程从入门到实践', 'Eric Matthes', '人民邮电出版社', '编程', 89.0, 10, 'A-01'),
('978-7-111-23456-7', '流畅的Python', 'Luciano Ramalho', '人民邮电出版社', '编程', 129.0, 8, 'A-02'),
('978-7-111-34567-8', '数据结构与算法分析', 'Mark Allen Weiss', '机械工业出版社', '计算机', 79.0, 3, 'B-01'),
('978-7-111-45678-9', 'SQLite权威指南', 'Grant Allen', '电子工业出版社', '数据库', 69.0, 2, 'B-02'),
('978-7-111-56789-0', '自动化办公实战手册', '王小明', '清华大学出版社', '办公', 59.0, 1, 'C-01'),
]
for book in books_data:
print(bis.add_book(*book))
# 借出操作
print(bis.borrow_book('978-7-111-56789-0', '张三'))
print(bis.borrow_book('978-7-111-34567-8', '李四'))
# 查看库存预警
print(bis.check_stock_alert())
# 归还操作
print(bis.return_book('978-7-111-56789-0', '张三'))
# 最终库存
bis.cursor.execute("SELECT title, stock FROM books WHERE stock <= min_stock")
for row in bis.cursor.fetchall():
print(f" 预警: {row['title']} 库存={row['stock']}")
bis.close()
# 清理
import os
os.remove("book_inventory.db")
import sqlite3
from datetime import datetime
import random
import time
class LogStorageSystem:
"""日志存储与查询系统"""
LEVELS = {'DEBUG': 0, 'INFO': 1, 'WARNING': 2, 'ERROR': 3, 'CRITICAL': 4}
def __init__(self, db_path):
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
self._init_db()
def _init_db(self):
self.cursor.executescript("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
level TEXT NOT NULL CHECK(level IN ('DEBUG','INFO','WARNING','ERROR','CRITICAL')),
logger TEXT NOT NULL,
message TEXT NOT NULL,
module TEXT,
line INTEGER,
duration REAL,
extra TEXT
);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level);
CREATE INDEX IF NOT EXISTS idx_logs_logger ON logs(logger);
CREATE INDEX IF NOT EXISTS idx_logs_module ON logs(module);
""")
self.conn.commit()
def write_log(self, level, logger, message, module=None, line=None, duration=None, extra=None):
"""写入日志"""
self.cursor.execute("""
INSERT INTO logs (timestamp, level, logger, message, module, line, duration, extra)
VALUES (datetime('now', 'localtime'), ?, ?, ?, ?, ?, ?, ?)
""", (level, logger, message, module, line, duration, extra))
self.conn.commit()
def simulate_logs(self, count=100):
"""生成模拟日志数据"""
loggers = ['app.server', 'app.database', 'app.auth', 'app.api', 'app.cache']
modules = ['server.py', 'db.py', 'auth.py', 'api.py', 'cache.py']
messages = [
('INFO', '服务器启动成功'),
('INFO', '数据库连接建立'),
('DEBUG', '用户请求处理完成: GET /api/users'),
('INFO', f'用户登录成功: user_{random.randint(1,100)}'),
('WARNING', f'数据库查询耗时过长: {random.randint(500, 2000)}ms'),
('ERROR', f'数据库连接失败: timeout after 30s'),
('CRITICAL', '磁盘空间不足, 仅剩 200MB'),
('INFO', '缓存刷新完成'),
('DEBUG', 'API响应时间: 45ms'),
('ERROR', '无效的用户认证令牌'),
('INFO', '定时任务执行完成: 数据备份'),
('WARNING', 'API调用频率过高, 触发限流'),
]
for _ in range(count):
level, msg_template = random.choice(messages)
logger = random.choice(loggers)
module = random.choice(modules)
line = random.randint(10, 500)
duration = round(random.uniform(0.01, 2.5), 3) if level in ('WARNING', 'ERROR') else None
extra = f'{{"request_id": "req_{random.randint(1000,9999)}"}}' if random.random() > 0.5 else None
self.write_log(level, logger, msg_template, module, line, duration, extra)
def query_logs(self, level=None, logger=None, module=None, start_date=None, end_date=None, limit=20):
"""查询日志"""
sql = "SELECT * FROM logs WHERE 1=1"
params = []
if level:
sql += " AND level = ?"
params.append(level)
if logger:
sql += " AND logger = ?"
params.append(logger)
if module:
sql += " AND module = ?"
params.append(module)
if start_date:
sql += " AND timestamp >= ?"
params.append(start_date)
if end_date:
sql += " AND timestamp <= ?"
params.append(end_date)
sql += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
self.cursor.execute(sql, params)
return self.cursor.fetchall()
def log_statistics(self):
"""日志统计"""
self.cursor.execute("""
SELECT level, COUNT(*) as count,
MIN(timestamp) as first_log,
MAX(timestamp) as last_log
FROM logs
GROUP BY level
ORDER BY count DESC
""")
level_stats = self.cursor.fetchall()
self.cursor.execute("""
SELECT logger, COUNT(*) as count,
ROUND(AVG(duration), 3) as avg_duration
FROM logs
WHERE duration IS NOT NULL
GROUP BY logger
ORDER BY count DESC
""")
logger_stats = self.cursor.fetchall()
self.cursor.execute("""
SELECT strftime('%H', timestamp) as hour,
COUNT(*) as count
FROM logs
GROUP BY hour
ORDER BY hour
""")
hourly_stats = self.cursor.fetchall()
return level_stats, logger_stats, hourly_stats
def clean_old_logs(self, days=30):
"""清理旧日志"""
self.cursor.execute("""
DELETE FROM logs WHERE timestamp < date('now', ? || ' days')
""", (f'-{days}',))
deleted = self.cursor.rowcount
self.conn.commit()
return deleted
def close(self):
self.conn.close()
# 运行示例
print("\n" + "=" * 50)
print("日志存储与查询系统 - 实战案例")
print("=" * 50)
lss = LogStorageSystem("log_system.db")
# 生成100条模拟日志
print("正在生成100条模拟日志...")
lss.simulate_logs(100)
print("日志生成完成")
# 按级别查询
print("\n=== ERROR级别日志 ===")
error_logs = lss.query_logs(level='ERROR', limit=5)
for log in error_logs:
print(f" [{log['timestamp']}] {log['level']:8s} {log['logger']:15s} {log['message']}")
# 日志统计
level_stats, logger_stats, hourly_stats = lss.log_statistics()
print("\n=== 日志级别分布 ===")
for s in level_stats:
print(f" {s['level']:10s}: {s['count']:3d} 条 (最早: {s['first_log']}, 最晚: {s['last_log']})")
print("\n=== 每小时日志量 ===")
for s in hourly_stats:
bar = '#' * min(s['count'], 10)
print(f" {s['hour']:2s}时: {bar} ({s['count']}条)")
# 清理旧日志
deleted = lss.clean_old_logs(days=7)
print(f"\n清理7天前日志: 删除 {deleted} 条")
# 总日志数
lss.cursor.execute("SELECT COUNT(*) FROM logs")
total = lss.cursor.fetchone()[0]
print(f"当前日志总量: {total} 条")
lss.close()
# 清理
import os
os.remove("log_system.db")
核心要点: 三个实战案例完整展示了sqlite3在办公自动化中的应用模式。员工管理系统体现了CRUD和联表查询;图书库存系统展示了事务一致性和库存预警;日志系统演示了结构化存储、多条件查询和统计分析。这些模式可直接复用到实际的办公自动化项目中。