← 返回Claude Code工作流目录
← 返回学习笔记首页
专题: Claude Code 工作流系统学习
关键词: Claude Code, 数据库迁移, Alembic, Flyway, 模式变更, 回滚, 数据迁移, 版本控制
一、工作流概述
数据库迁移是软件开发中最具风险的环节之一。一个错误的ALTER TABLE可能导致数据丢失、服务中断甚至回滚失败。Claude Code数据库迁移工作流旨在通过标准化流程、自动化工具集成和严格的验证机制,将数据库模式变更的风险降至最低。本工作流覆盖从迁移文件创建、代码编写、执行策略到版本管理的完整生命周期,支持Alembic(Python)、Flyway(Java)、Liquibase(Java)、Knex(Node.js)等主流迁移框架。
核心原则: 每次迁移必须可回滚、可重复、可验证。向下兼容性是生产数据库变更的底线,任何时候都不能被突破。
工作流总览
阶段 输入 输出 Claude Code角色
迁移规划 需求描述 迁移方案文档 需求分析、方案建议、风险评估
迁移编写 迁移方案 迁移脚本(up/down) 代码生成、SQL审核、最佳实践检查
迁移执行 迁移脚本 已迁移数据库 执行监控、错误处理、自动回滚
迁移验证 迁移后数据库 验证报告 差异分析、数据校验、性能评估
版本管理 迁移历史 更新基线 历史梳理、冲突解决、基线重建
二、迁移文件管理
迁移文件是数据库变更的基本单元。每个迁移文件包含一个"向上迁移"(应用变更)和一个"向下迁移"(撤销变更)。Claude Code通过对话式交互协助开发者生成、组织和管理这些文件,确保命名规范、版本有序、依赖清晰。
Alembic迁移生成
Alembic是SQLAlchemy的数据库迁移工具,广泛用于Python项目。Claude Code可以基于模型定义自动生成迁移脚本,以下是通过对话生成迁移的典型流程。
# Claude Code根据需求自动生成Alembic迁移脚本
#
# 用户: 在users表上添加last_login_at和login_count两个字段
# Claude Code生成如下迁移文件:
"""add last_login_at and login_count to users table
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2026-05-08 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'users',
sa.Column('last_login_at', sa.DateTime(timezone=True),
nullable=True,
comment='最后登录时间')
)
op.add_column(
'users',
sa.Column('login_count', sa.Integer(),
server_default='0',
nullable=False,
comment='登录次数')
)
# 为login_count创建索引以支持统计分析查询
op.create_index('idx_users_login_count', 'users', ['login_count'])
def downgrade():
op.drop_index('idx_users_login_count', table_name='users')
op.drop_column('users', 'login_count')
op.drop_column('users', 'last_login_at')
命名规范
良好的命名规范是迁移文件管理的基础。不同的迁移框架有不同的命名惯例,但核心原则一致:可排序、可识别、可关联。
# 推荐的迁移文件命名模式
# 1. 时间戳前缀模式(Alembic默认)
# 格式: YYYYMMDD_HHMMSS_<描述>.py
# 示例:
20260508_103000_add_login_fields_to_users.py
20260509_090000_create_orders_table.py
20260510_143000_add_foreign_key_orders_users.py
# 2. 序号前缀模式(Flyway推荐)
# 格式: V<序号>__<描述>.sql
# 示例:
V1__create_users_table.sql
V2__create_orders_table.sql
V3__add_product_catalog.sql
V4__seed_initial_categories.sql
# 3. 语义版本模式
# 格式: v<主版本>.<次版本>.<修订号>__<描述>.sql
# 示例:
v1.0.0__initial_schema.sql
v1.1.0__add_shipping_address.sql
v1.2.0__add_order_status_index.sql
v2.0.0__redesign_user_profile.sql
Claude Code实践建议: 在项目初始化时通过对话约定命名规范,Claude Code会在每次生成迁移文件时自动遵循该规范,并在代码审查中检查命名一致性。
版本控制与依赖排序
当多个开发者并行工作时,迁移文件的依赖关系和执行顺序至关重要。Alembic通过链式引用(down_revision指向前一个迁移)维护线性历史,但分支和合并场景需要特殊处理。
# Alembic依赖排序示例
# 基础迁移:v1.0 创建核心表
# 文件: 20260501_000001_create_users.py
revision = '001'
down_revision = None # 首个迁移没有上游
# 依赖迁移:v1.1 扩展用户表
# 文件: 20260502_000001_add_user_profile.py
revision = '002'
down_revision = '001' # 指向001,在创建用户表之后执行
# 并行分支迁移(两个开发者同时工作)
# 开发者A: 20260503_000001_add_orders.py
revision = '003a'
down_revision = '002'
# 开发者B: 20260503_000002_add_notifications.py
revision = '003b'
down_revision = '002'
# 合并迁移(用于合并两个分支)
# 文件: 20260504_000001_merge_003ab.py
revision = '004'
down_revision = '003a' # 第一个父节点
# Alembic支持多个down_revision进行合并
# 实际通过 merge_branch 方式处理:
# alembic merge -m "merge 003a and 003b" 003a 003b
风险提示: 并行分支迁移是冲突的高发区。如果两个分支修改同一个表结构,合并迁移将无法自动解决冲突,必须人工介入。Claude Code可以通过分析两个分支的迁移内容,识别冲突点并辅助生成合并脚本。
三、迁移编写
迁移编写的质量直接决定了数据安全和服务稳定性。Claude Code在迁移编写环节提供从DDL生成到业务逻辑转换的全方位辅助,同时严格执行向下兼容性检查。
建表迁移
创建新表是最基础的迁移类型。Claude Code能够根据实体关系描述或现有的ORM模型自动生成建表语句,并自动添加必要的约束、索引和注释。
-- Flyway SQL格式:创建产品目录表
-- V3__create_product_catalog.sql
CREATE TABLE product_categories (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
slug VARCHAR(120) NOT NULL UNIQUE,
description TEXT,
parent_id BIGINT REFERENCES product_categories(id),
sort_order INTEGER NOT NULL DEFAULT 0,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_categories_parent ON product_categories(parent_id);
CREATE INDEX idx_categories_active ON product_categories(is_active)
WHERE is_active = TRUE;
COMMENT ON TABLE product_categories IS '产品分类表,支持无限层级树形结构';
COMMENT ON COLUMN product_categories.slug IS 'URL友好的唯一标识符';
-- 回滚脚本 V3__rollback.sql
-- DROP TABLE IF EXISTS product_categories CASCADE;
改表迁移
修改已有表结构是最危险的迁移类型。Claude Code会在生成改表迁移时自动评估向下兼容性,包括:新增字段是否有默认值、字段类型变更是否导致数据截断、删除字段是否会破坏上游应用。
# Liquibase XML格式:重构用户表
# 将单字段name拆分为first_name和last_name,保持兼容性
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.0.xsd">
<changeSet id="20260508-01" author="claude-code">
<comment>将users.name拆分为first_name和last_name,保持name字段作为视图兼容</comment>
<addColumn tableName="users">
<column name="first_name" type="VARCHAR(100)">
<constraints nullable="true"/>
</column>
<column name="last_name" type="VARCHAR(100)">
<constraints nullable="true"/>
</column>
</addColumn>
<sql>
UPDATE users SET
first_name = SPLIT_PART(name, ' ', 1),
last_name = SUBSTRING(name FROM POSITION(' ' IN name) + 1)
WHERE first_name IS NULL;
</sql>
<addNotNullConstraint tableName="users"
columnName="first_name" columnDataType="VARCHAR(100)"/>
<addNotNullConstraint tableName="users"
columnName="last_name" columnDataType="VARCHAR(100)"/>
<rollback>
ALTER TABLE users DROP COLUMN first_name;
ALTER TABLE users DROP COLUMN last_name;
</rollback>
</changeSet>
</databaseChangeLog>
索引和约束管理
索引和约束的创建需要权衡查询性能与写入开销。Claude Code会根据查询模式分析建议最优索引策略,并注意并发创建索引以避免锁表。
-- 索引管理最佳实践(PostgreSQL)
-- 1. 并发创建索引(不阻塞DML操作)
CREATE INDEX CONCURRENTLY idx_orders_user_id
ON orders(user_id);
-- 2. 部分索引(仅索引活跃数据,节省空间)
CREATE INDEX CONCURRENTLY idx_active_subscriptions
ON subscriptions(user_id, end_date)
WHERE status = 'active';
-- 3. 覆盖索引(避免回表查询)
CREATE INDEX CONCURRENTLY idx_orders_lookup
ON orders(user_id, status)
INCLUDE (total_amount, created_at);
-- 4. 表达式索引(支持函数查询)
CREATE INDEX CONCURRENTLY idx_users_email_domain
ON users(LOWER(SPLIT_PART(email, '@', 2)));
-- 5. 删除未使用的索引
-- 先通过pg_stat_user_indexes确认索引使用情况
SELECT
schemaname, tablename, indexname,
idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0
ORDER BY tablename;
数据迁移
数据迁移是迁移脚本中最复杂的部分,涉及数据格式转换、清洗、去重等操作。Claude Code在生成数据迁移脚本时自动添加分批处理、进度监控和错误处理逻辑。
# 批量数据迁移脚本(带进度报告和断点续传能力)
# 场景:将旧版JSON配置字段迁移到规范化关联表
import json
import logging
from datetime import datetime
from sqlalchemy import text
from app import db_session
logger = logging.getLogger(__name__)
BATCH_SIZE = 500 # 每批处理500条
def upgrade():
"""将user_preferences中的JSON配置迁移到preference_settings表"""
total_processed = 0
total_errors = 0
last_id = 0
start_time = datetime.now()
logger.info(f"开始数据迁移: {start_time.isoformat()}")
while True:
# 分批获取数据,使用last_id游标实现分批
rows = db_session.execute(
text("""
SELECT id, user_id, config_json, updated_at
FROM user_preferences
WHERE id > :last_id
ORDER BY id
LIMIT :limit
"""),
{"last_id": last_id, "limit": BATCH_SIZE}
).fetchall()
if not rows:
break
for row in rows:
try:
config = json.loads(row.config_json)
# 将JSON配置展开为规范化行
for key, value in config.items():
db_session.execute(
text("""
INSERT INTO preference_settings
(user_id, pref_key, pref_value, created_at, updated_at)
VALUES (:uid, :key, :val, :now, :now)
ON CONFLICT (user_id, pref_key)
DO UPDATE SET pref_value = :val, updated_at = :now
"""),
{
"uid": row.user_id,
"key": key,
"val": str(value),
"now": datetime.utcnow()
}
)
last_id = row.id
total_processed += 1
except (json.JSONDecodeError, Exception) as e:
total_errors += 1
logger.error(f"迁移失败 user_id={row.user_id}: {str(e)}")
# 记录失败但继续,确保不阻塞整个迁移
db_session.commit()
logger.info(
f"已迁移 {total_processed} 条记录, "
f"错误 {total_errors} 条, "
f"当前游标 id={last_id}"
)
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(
f"数据迁移完成: 共{total_processed}条, "
f"错误{total_errors}条, "
f"耗时{elapsed:.2f}秒"
)
def downgrade():
"""回滚:清空迁移后的数据"""
db_session.execute(text("TRUNCATE TABLE preference_settings"))
logger.info("数据迁移已回滚: preference_settings 已清空")
向下兼容性检查清单
生产数据库迁移的黄金法则是向下兼容。Claude Code在每次生成改表迁移时自动执行以下兼容性检查。
检查项 合规做法 违规后果
新增字段 必须允许NULL或有DEFAULT值 锁表、写入失败
删除字段 先标记废弃,等待下游消费方更新后再删除 下游查询崩溃
字段类型变更 仅允许向兼容方向扩展(INT→BIGINT) 数据截断、隐式转换失败
重命名表/字段 创建新对象+数据同步+旧对象做视图 全链路断裂
添加NOT NULL 先填充数据,再分批添加约束 CHECK约束失败导致迁移中断
四、迁移执行策略
迁移执行策略的选型取决于项目阶段、数据库规模和风险承受能力。Claude Code支持三种执行模式,并在执行过程中提供实时监控和自动故障处理。
自动迁移
适用于开发和CI/CD环境。Claude Code自动检测未应用的迁移并按顺序执行,失败时自动触发回滚并提供错误分析。
# Claude Code 自动迁移执行命令
# 1. Alembic 自动迁移
alembic upgrade head
# 2. Flyway 自动迁移
flyway migrate
# 3. Liquibase 自动迁移
liquibase update
# 4. Knex.js 自动迁移
npx knex migrate:latest
# Claude Code 自动迁移监控脚本
#!/bin/bash
# auto-migrate.sh - 带错误处理的自动迁移
echo "[INFO] 开始数据库迁移: $(date '+%Y-%m-%d %H:%M:%S')"
# 前置检查:数据库连接
if ! pg_isready -h $DB_HOST -p $DB_PORT -U $DB_USER; then
echo "[ERROR] 数据库不可达,终止迁移"
exit 1
fi
# 执行迁移
if alembic upgrade head 2>&1; then
echo "[INFO] 迁移成功完成"
# 记录迁移完成状态
echo "MIGRATION_STATUS=SUCCESS" > migration-status.env
else
echo "[ERROR] 迁移失败,执行回滚"
# 回滚到上一个版本
alembic downgrade -1
echo "MIGRATION_STATUS=ROLLED_BACK" > migration-status.env
exit 2
fi
# 后置验证:检查关键表是否存在
for table in users orders products; do
if ! psql -h $DB_HOST -c "SELECT 1 FROM $table LIMIT 1" &>/dev/null; then
echo "[WARN] 表 $table 验证异常"
fi
done
echo "[INFO] 迁移流程结束: $(date '+%Y-%m-%d %H:%M:%S')"
手动审核模式
生产环境推荐的模式。Claude Code生成迁移脚本后,由DBA或高级开发者审核SQL语句,确认无性能风险和兼容性问题后再手动执行。
# Claude Code 生产迁移审核清单
# 审核项1: SQL语法正确性
# Claude Code: 已通过SQL语法验证 (使用 EXPLAIN 预执行)
# 审核项2: 锁影响分析
迁移涉及操作: 新增字段 + 创建索引
锁分析:
- ALTER TABLE ADD COLUMN: AccessExclusiveLock
影响: 阻塞所有DML操作
建议: 在维护窗口执行,预计执行时间 < 50ms
- CREATE INDEX CONCURRENTLY: 仅ShareLock
影响: 不阻塞DML
建议: 始终使用CONCURRENTLY模式
# 审核项3: 大表影响评估
表 users 数据量: 约 500万 行
新增字段: login_count INTEGER DEFAULT 0
评估: 即时DDL操作,不涉及数据重写
# 审核项4: 回滚方案
回滚脚本: V2.1__rollback.sql
回滚类型: 自动回滚
数据丢失风险: 无(新增字段,drop即可)
# 审核结论: 审核通过,可在维护窗口执行
生产环境铁律: 永远不要在业务高峰期执行数据库迁移。大型表结构的变更(如添加NOT NULL约束、修改列类型)必须在维护窗口内执行,并预留至少一倍的缓冲时间用于回滚。
多环境同步
多环境(开发、测试、预发布、生产)的数据库结构同步是迁移管理的核心挑战。Claude Code通过迁移文件版本控制和环境配置管理,确保各环境的结构一致性。
# 多环境同步策略
# 1. 环境配置管理
# alembic.ini 或 .env 文件按环境加载
# 开发环境 (.env.development)
DB_URL=postgresql://dev_user:dev_pass@localhost:5432/dev_db
# 测试环境 (.env.testing)
DB_URL=postgresql://test_user:test_pass@test-host:5432/test_db
# 生产环境 (.env.production)
DB_URL=postgresql://prod_user:prod_pass@prod-host:5432/prod_db
# 2. 环境一致性校验脚本
#!/bin/bash
# check-schema-sync.sh
echo "=== 数据库结构一致性检查 ==="
# 获取各环境的迁移版本
DEV_VERSION=$(DATABASE_URL=$DEV_DB_URL alembic current 2>/dev/null)
STAGING_VERSION=$(DATABASE_URL=$STAGING_DB_URL alembic current 2>/dev/null)
PROD_VERSION=$(DATABASE_URL=$PROD_DB_URL alembic current 2>/dev/null)
echo "开发环境: $DEV_VERSION"
echo "预发布环境: $STAGING_VERSION"
echo "生产环境: $PROD_VERSION"
if [ "$DEV_VERSION" != "$STAGING_VERSION" ]; then
echo "[WARN] 开发环境和预发布环境版本不一致"
fi
if [ "$STAGING_VERSION" != "$PROD_VERSION" ]; then
echo "[WARN] 预发布环境和生产环境版本不一致"
fi
五、迁移验证
迁移后的验证是保障数据安全的重要防线。Claude Code从三个维度进行迁移验证:结构一致性、数据完整性和性能影响。每次迁移执行后自动生成验证报告。
迁移前后结构对比
Claude Code自动对比迁移前后的数据库Schema,生成结构差异报告,确保实际变更与预期一致,不会有意外的副作用。
-- Claude Code 结构差异对比报告
-- 迁移前结构摘要:
-- public.users
-- id BIGSERIAL PK
-- email VARCHAR(255) NOT NULL UNIQUE
-- name VARCHAR(100) NOT NULL
-- created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
-- 索引: idx_users_email
-- 迁移后结构摘要:
-- public.users
-- id BIGSERIAL PK
-- email VARCHAR(255) NOT NULL UNIQUE
-- name VARCHAR(100) NOT NULL
-- first_name VARCHAR(100) -- 新增字段
-- last_name VARCHAR(100) -- 新增字段
-- login_count INTEGER NOT NULL DEFAULT 0 -- 新增字段
-- last_login TIMESTAMPTZ -- 新增字段
-- created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
-- 索引: idx_users_email, idx_users_login_count
-- 差异对比:
-- [+] 新增字段: first_name, last_name, login_count, last_login
-- [+] 新增索引: idx_users_login_count
-- [=] 未变更: id, email, name, created_at
-- [!] 风险项: 无(所有变更均向下兼容)
-- 自动生成的SQL差异确认
SELECT
column_name, data_type, is_nullable,
column_default, character_maximum_length
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
ORDER BY ordinal_position;
数据完整性检查
数据迁移后,Claude Code执行完整性校验脚本,验证行数一致性、外键完整性、业务约束未被破坏。
# 数据完整性校验脚本
def verify_migration_integrity():
"""迁移后数据完整性自动校验"""
checks = []
# 检查1: 记录数一致性
old_count = db_session.execute(
text("SELECT COUNT(*) FROM user_preferences")
).scalar()
new_count = db_session.execute(
text("SELECT COUNT(DISTINCT user_id) FROM preference_settings")
).scalar()
checks.append({
"check": "用户偏好数据迁移",
"source_count": old_count,
"target_count": new_count,
"status": "PASS" if old_count == new_count else "FAIL"
})
# 检查2: 外键完整性
orphaned = db_session.execute(
text("""
SELECT COUNT(*) FROM preference_settings ps
LEFT JOIN users u ON ps.user_id = u.id
WHERE u.id IS NULL
""")
).scalar()
checks.append({
"check": "外键完整性",
"orphaned_count": orphaned,
"status": "PASS" if orphaned == 0 else "FAIL"
})
# 检查3: 非空约束
null_vals = db_session.execute(
text("""
SELECT COUNT(*) FROM preference_settings
WHERE pref_key IS NULL OR pref_value IS NULL
""")
).scalar()
checks.append({
"check": "非空约束",
"null_count": null_vals,
"status": "PASS" if null_vals == 0 else "FAIL"
})
# 输出报告
print("=== 数据完整性校验报告 ===")
all_pass = True
for c in checks:
status_icon = "[PASS]" if c["status"] == "PASS" else "[FAIL]"
print(f"{status_icon} {c['check']}")
if c["status"] == "FAIL":
all_pass = False
print(f"\n结论: {'所有检查通过' if all_pass else '存在失败的检查项'}")
return all_pass
性能影响评估
结构变更可能影响查询性能。Claude Code在迁移后自动执行关键查询的性能基准测试,对比迁移前后的执行计划变化。
-- 迁移前后执行计划对比
-- 查询: 最近30天的活跃用户订单
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT
u.id, u.email,
COUNT(o.id) as order_count,
SUM(o.total_amount) as total_spent
FROM users u
JOIN orders o ON o.user_id = u.id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
AND o.status IN ('paid', 'shipped', 'completed')
GROUP BY u.id, u.email
ORDER BY total_spent DESC
LIMIT 100;
-- 迁移前执行计划摘要:
-- Seq Scan on orders (cost=0.00..45231.20 rows=152340 width=32)
-- Filter: (status = ANY ('{paid,shipped,completed}'::text[]))
-- 总耗时: 1850 ms
-- 问题: 缺少(status, created_at)复合索引,导致全表扫描
-- 迁移后执行计划摘要:
-- Index Scan using idx_orders_status_date on orders (cost=0.56..1234.50 rows=15234 width=32)
-- Index Cond: ((status = ANY ('{paid,shipped,completed}'::text[]))
-- AND (created_at >= NOW() - INTERVAL '30 days'))
-- 总耗时: 45 ms
-- 优化效果: 性能提升约 40 倍
回滚测试
回滚脚本必须经过实际测试才能用于生产。Claude Code在非生产环境中执行完整的"升级-验证-回滚-验证"循环,确保回滚路径畅通。
# Claude Code 回滚测试流程
#!/bin/bash
# rollback-test.sh - 完整的回滚测试流程
set -e
echo "=== 步骤1: 记录当前迁移版本 ==="
BEFORE_VERSION=$(alembic current)
echo "当前版本: $BEFORE_VERSION"
echo "=== 步骤2: 执行升级 ==="
alembic upgrade head
echo "=== 步骤3: 升级后验证 ==="
# 运行数据完整性校验
python verify_migration.py
echo "=== 步骤4: 回滚一个版本 ==="
alembic downgrade -1
echo "=== 步骤5: 验证回滚结果 ==="
AFTER_DOWNGRADE=$(alembic current)
echo "回滚后版本: $AFTER_DOWNGRADE"
if [ "$BEFORE_VERSION" = "$AFTER_DOWNGRADE" ]; then
echo "[PASS] 回滚版本匹配"
else
echo "[FAIL] 回滚版本不匹配"
echo "期望: $BEFORE_VERSION"
echo "实际: $AFTER_DOWNGRADE"
exit 1
fi
echo "=== 步骤6: 重新升级(验证downgrade未破坏upgrade路径) ==="
alembic upgrade head
echo "=== 步骤7: 最终验证 ==="
FINAL_VERSION=$(alembic current)
python verify_migration.py
echo "=== 回滚测试完成 ==="
echo "升级-回滚-再升级 循环验证通过"
六、数据迁移辅助
数据迁移是数据库迁移工作中最复杂的部分。与单纯的DDL变更不同,数据迁移涉及存量数据的读取、转换、清洗和写入,每一个环节都可能引入错误。Claude Code在数据迁移辅助方面提供数据清洗、格式转换、编码转换、大表分批处理和复杂外键关系处理等能力。
数据清洗与格式转换
旧系统中的数据往往存在空值、格式不一致、编码混乱等问题。Claude Code自动识别数据质量问题并生成对应的清洗脚本。
-- 数据清洗:电话号码格式化
-- 迁移前数据质量问题:
-- 1. 手机号格式不一致: 13812345678 / 138-1234-5678 / +8613812345678
-- 2. 固话缺少区号: 12345678
-- 3. 包含非法字符: 138 1234 5678
-- 清洗脚本
UPDATE users SET
phone = REGEXP_REPLACE(
REGEXP_REPLACE(phone, '[^0-9+]', '', 'g'), -- 移除非数字字符
'^(\+?86)?(\d{3})(\d{4})(\d{4})$',
'\1-\2-\3-\4'
),
phone_clean_status = 'cleaned'
WHERE phone IS NOT NULL
AND phone_clean_status IS DISTINCT FROM 'cleaned';
-- 数据清洗:邮箱地址规范化
UPDATE users SET
email = LOWER(TRIM(email)),
email_clean_status = 'cleaned'
WHERE email IS NOT NULL
AND email_clean_status IS DISTINCT FROM 'cleaned';
-- 数据清洗:日期格式统一
UPDATE orders SET
order_date = CASE
WHEN order_date::text ~ '^\d{4}/\d{2}/\d{2}$'
THEN TO_DATE(order_date::text, 'YYYY/MM/DD')
WHEN order_date::text ~ '^\d{2}-\d{2}-\d{4}$'
THEN TO_DATE(order_date::text, 'MM-DD-YYYY')
ELSE order_date
END,
date_clean_status = 'cleaned'
WHERE date_clean_status IS DISTINCT FROM 'cleaned';
大表分批迁移
对于包含数百万甚至上亿行记录的大表,一次性迁移会导致长时间锁表和事务日志暴涨。Claude Code自动生成分批迁移策略,支持游标分页和时间窗口限流。
# 大表分批迁移:使用游标分页 + 时间窗口限流
# 场景:将 legacy_orders (约 2 亿行) 迁移到 orders 新表
import time
from datetime import datetime, timedelta
BATCH_SIZE = 10000
MAX_RUNTIME = timedelta(hours=4) # 维护窗口4小时
RATE_LIMIT = 5000 # 每秒最多处理5000行
def batch_migrate_legacy_orders():
"""大表分批迁移,支持断点续传和速率控制"""
start_time = datetime.now()
last_id = 0
total_migrated = 0
batch_start = time.time()
while datetime.now() - start_time < MAX_RUNTIME:
batch_start_time = time.time()
# 1. 读取一批旧数据
legacy_rows = db_session.execute(
text("""
SELECT *
FROM legacy_orders
WHERE id > :last_id
ORDER BY id
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"last_id": last_id, "limit": BATCH_SIZE}
).fetchall()
if not legacy_rows:
logger.info("所有数据迁移完成")
break
# 2. 数据转换和写入
for row in legacy_rows:
try:
# 数据格式转换
new_order = {
"id": row.id,
"user_id": row.customer_id, # 字段重命名
"total": row.amount * 1.0, # 类型转换
"status": map_legacy_status(row.status), # 枚举映射
"created_at": row.order_date,
"updated_at": datetime.utcnow()
}
db_session.execute(
text("""
INSERT INTO orders (id, user_id, total, status,
created_at, updated_at)
VALUES (:id, :user_id, :total, :status,
:created_at, :updated_at)
ON CONFLICT (id) DO NOTHING
"""),
new_order
)
last_id = row.id
total_migrated += 1
except Exception as e:
logger.error(f"迁移失败 id={row.id}: {str(e)}")
# 记录失败行到错误表以便后续处理
db_session.execute(
text("""
INSERT INTO migration_errors
(table_name, row_id, error_msg, created_at)
VALUES ('legacy_orders', :rid, :msg, :now)
"""),
{"rid": row.id, "msg": str(e), "now": datetime.utcnow()}
)
db_session.commit()
# 3. 速率控制
elapsed = time.time() - batch_start_time
batch_rate = BATCH_SIZE / elapsed if elapsed > 0 else BATCH_SIZE
logger.info(
f"批次完成: last_id={last_id}, "
f"累计={total_migrated}, "
f"速率={batch_rate:.0f}行/秒"
)
if batch_rate > RATE_LIMIT:
sleep_time = (BATCH_SIZE / RATE_LIMIT) - elapsed
if sleep_time > 0:
logger.info(f"速率控制: 暂停 {sleep_time:.1f} 秒")
time.sleep(sleep_time)
# 迁移总结
total_time = (datetime.now() - start_time).total_seconds()
logger.info(
f"大表迁移完成: 共{total_migrated}行, "
f"耗时{total_time:.0f}秒, "
f"平均速率{total_migrated/total_time:.0f}行/秒"
)
return total_migrated
外键处理
外键约束是数据迁移中的主要障碍。Claude Code在处理外键时遵循"先迁主表、后迁子表"的原则,并在必要时临时禁用外键约束以加快迁移速度。
# 带外键约束的分层迁移策略
def migrate_with_foreign_keys():
"""按外键依赖顺序分层迁移"""
# 层级1: 基础表(无外键依赖)
tables_level_1 = ["categories", "tags", "users"]
for table in tables_level_1:
batch_migrate_table(f"legacy_{table}", table)
# 层级2: 依赖基础表的表
tables_level_2 = ["products", "posts"]
for table in tables_level_2:
batch_migrate_table(f"legacy_{table}", table)
# 层级3: 依赖层级2的表
tables_level_3 = ["order_items", "comments"]
for table in tables_level_3:
batch_migrate_table(f"legacy_{table}", table)
# 验证外键完整性
for fk_name, fk_info in FOREIGN_KEYS.items():
orphan_count = db_session.execute(
text(f"""
SELECT COUNT(*) FROM {fk_info['child_table']} c
LEFT JOIN {fk_info['parent_table']} p
ON c.{fk_info['child_column']} = p.{fk_info['parent_column']}
WHERE p.{fk_info['parent_column']} IS NULL
""")
).scalar()
if orphan_count > 0:
logger.warning(
f"外键 {fk_name}: 发现 {orphan_count} 条孤立记录"
)
else:
logger.info(f"外键 {fk_name}: 完整性验证通过")
def disable_fks_for_bulk_load():
"""批量加载时临时禁用外键"""
# 禁用所有外键约束
db_session.execute(text("SET session_replication_role = 'replica';"))
try:
# 执行批量数据加载
bulk_load_data()
db_session.commit()
except Exception as e:
db_session.rollback()
raise e
finally:
# 重新启用外键约束
db_session.execute(text("SET session_replication_role = 'origin';"))
# 重新验证所有外键
db_session.execute(text("ALTER TABLE orders VALIDATE CONSTRAINT fk_orders_users;"))
db_session.execute(text("ALTER TABLE order_items VALIDATE CONSTRAINT fk_order_items_orders;"))
七、迁移版本管理
迁移版本管理是数据库长期维护的基础设施。随着项目迭代,迁移文件数量可能增加到数百个,版本管理变得至关重要。Claude Code在迁移版本管理方面提供基线建立、历史梳理、降级路径分析和分支冲突解决等高级功能。
基准线建立
新项目或者接手遗留系统时,需要建立一个清晰的迁移基线。Claude Code通过与当前数据库结构对比,自动生成基准线迁移脚本。
# 基准线建立:Alembic stub模式
# 场景:团队决定从某个版本开始正式使用Alembic管理迁移
# 之前的所有变更通过基准线脚本汇总为一个初始状态
# 步骤1: 创建基准线迁移
alembic revision --autogenerate -m "baseline_v1"
# 步骤2: Claude Code生成的自动检测脚本
# 检测当前数据库结构并生成基准线迁移
import sqlalchemy as sa
from alembic import op
def upgrade():
# 以下SQL由Claude Code根据当前数据库结构自动生成
# 仅当目标数据库不存在该表时才创建
op.execute("""
CREATE TABLE IF NOT EXISTS migration_baseline (
id SERIAL PRIMARY KEY,
baseline_version VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
table_count INTEGER,
object_hash VARCHAR(64)
);
""")
# 记录基准线快照
table_count_query = """
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
"""
result = op.get_bind().execute(table_count_query)
table_count = result.scalar()
op.execute(f"""
INSERT INTO migration_baseline
(baseline_version, table_count, object_hash)
VALUES ('v1.0.0', {table_count},
md5(current_database() || NOW()::text))
""")
# 步骤3: 标记现有迁移为已应用(不实际执行)
alembic stamp head
降级路径规划
每个迁移都必须有对应的降级脚本。Claude Code在生成升级脚本时同步生成降级脚本,并维护完整的降级路径拓扑图,确保从任何版本都能安全回退。
# 降级路径可视化与验证
# Alembic 降级路径拓扑
#
# 001 (初始架构)
# |
# 002 (添加用户表)
# |
# 003 (添加订单表)
# | \
# | 004a (开发者A: 订单添加折扣字段)
# | |
# | 005a (开发者A: 订单添加物流信息)
# | /
# 004b (开发者B: 用户添加手机号)
# |
# 005 (合并迁移)
# |
# 006 (当前HEAD: 添加产品目录)
# 降级路径示例:
# 从 HEAD (006) 降级到 001:
# 006 -> 005 -> 004b -> 003 -> 002 -> 001
# 验证所有降级路径
def verify_downgrade_paths():
"""验证每个迁移的降级脚本是否可用"""
migration_graph = {
"001": {"down": None, "up": "002"},
"002": {"down": "001", "up": "003"},
"003": {"down": "002", "up": ["004a", "004b"]},
"004a": {"down": "003", "up": "005a"},
"004b": {"down": "003", "up": "005"},
"005a": {"down": "004a", "up": "005"},
"005": {"down": ["004b", "005a"], "up": "006"},
"006": {"down": "005", "up": None},
}
# 从HEAD开始,验证到每个节点的降级路径
for target in migration_graph:
if target == "006":
continue
path = find_downgrade_path("006", target, migration_graph)
if path:
print(f"[PASS] 006 -> {target}: 降级路径存在")
print(f" 路径: {' -> '.join(path)}")
else:
print(f"[FAIL] 006 -> {target}: 无法找到降级路径")
# 执行降级脚本验证(非生产环境)
for revision in reversed(sorted(migration_graph.keys())):
if revision == "001":
continue
print(f"验证降级: {revision} -> {migration_graph[revision]['down']}")
# 在实际环境中执行:
# alembic downgrade {migration_graph[revision]['down']}
def find_downgrade_path(current, target, graph, visited=None):
"""BFS查找降级路径"""
if visited is None:
visited = set()
if current == target:
return [current]
if current in visited:
return None
visited.add(current)
node = graph.get(current)
if not node:
return None
down = node["down"]
if down is None:
return None
# 支持多个父节点(分支合并)
if isinstance(down, list):
for parent in down:
path = find_downgrade_path(parent, target, graph, visited)
if path:
return [current] + path
else:
path = find_downgrade_path(down, target, graph, visited)
if path:
return [current] + path
return None
分支合并冲突
当多个开发者或团队同时进行数据库变更时,分支合并是必然场景。Claude Code通过分析两个分支的迁移文件内容,检测潜在冲突并辅助生成合并迁移。
# Claude Code 分支合并冲突分析与解决
# 检测到的冲突场景:
# 分支A (feature/order-discount):
# 迁移 004a: 在orders表添加 discount_amount 字段
# 迁移 005a: 在orders表添加 coupon_id 字段
#
# 分支B (feature/user-phone):
# 迁移 004b: 在users表添加 phone 字段
# 迁移 005b: 在orders表添加 shipping_address 字段
#
# 冲突分析结果:
# 1. 004a 和 004b 修改不同的表 -> 无冲突
# 2. 005a 和 005b 修改同一张表但不相同字段 -> 无冲突
# 3. 003 -> 004a 和 003 -> 004b 形成分叉 -> 需要合并迁移
# 自动生成的合并迁移:
"""merge feature/order-discount and feature/user-phone
Revision ID: merge_004ab
Revises: 004a, 004b
Create Date: 2026-05-08 12:00:00.000000
"""
revision = 'merge_004ab'
down_revision = ('004a', '004b') # 两个父节点
branch_labels = None
depends_on = None
def upgrade():
# 两个分支的变更是独立的,合并迁移本身不需要额外操作
pass
def downgrade():
pass
迁移重置与压缩
当迁移文件积累过多时,会影响迁移执行速度。Claude Code支持迁移压缩,将当前数据库结构导出为单一基准线迁移,并清理历史迁移文件。
# 迁移重置和压缩流程
# 场景: 项目有127个迁移文件,需要压缩为1个基准线
# 流程:
# 1. 确保所有迁移已应用到所有环境
# 2. 使用Alembic的 squash 功能压缩迁移
# 步骤1: 生成当前数据库结构的完整转储
pg_dump --schema-only --no-owner --no-acl \
-h $DB_HOST -U $DB_USER -d $DB_NAME > current_schema.sql
# 步骤2: 创建新的基准线迁移
alembic revision --autogenerate -m "squash_v2_baseline"
# 步骤3: Claude Code验证新旧结构一致
# 比较新旧迁移创建的Schema是否一致
def verify_squash():
old_hash = compute_schema_hash("current_schema.sql")
new_hash = compute_schema_hash("squash_schema.sql")
assert old_hash == new_hash, "压缩后的Schema与当前不一致"
# 步骤4: 标记基线
alembic stamp head
# 步骤5: 清理旧的迁移文件(谨慎操作!)
# 保留备份,仅在确认所有环境都stamp到新基线后才清理
# 注意: 迁移压缩是一次性的操作,通常在重大版本发布时执行
# 压缩后,旧的迁移文件应该被归档而不是删除
Claude Code提示: 迁移压缩是高风险操作,必须确保所有开发环境、CI/CD流水线和生产环境都已经stamp到新的基准线。建议在迁移压缩后运行完整的端到端测试套件,验证所有环境的一致性。
八、核心要点总结
1. 每次迁移必须可回滚: 没有回滚脚本的迁移不允许上生产环境。Claude Code在生成任何迁移脚本时都同步生成对应的降级脚本,并通过自动测试验证降级路径的可用性。
2. 向下兼容是底线: 新增字段必须允许NULL或提供DEFAULT值,删除字段必须先标记废弃,字段类型变更只能向兼容方向扩展。任何违反向下兼容原则的变更都需要架构评审和多阶段部署计划。
3. 大表操作必须分批: 对于超过100万行的大表,任何数据迁移操作都必须使用分批处理策略,配合游标分页、时间窗口限流和断点续传机制。永远不要在生产环境对大表执行一次性UPDATE或DELETE。
4. 迁移验证不可或缺: 结构对比、数据完整性检查、性能基准测试和回滚测试是迁移验证的四个必要环节。Claude Code自动生成验证脚本并在非生产环境中执行完整的"升级-验证-回滚-验证"循环。
5. 版本管理需要规划: 从项目初期就建立清晰的迁移命名规范、依赖关系和分支策略。定期审查迁移历史,必要时进行迁移压缩。合并迁移要谨慎处理,确保降级路径不中断。
6. 外键处理有顺序: 带外键约束的表迁移必须按依赖顺序分层执行(主表先于子表)。批量加载时可临时禁用外键约束,但加载后必须重新验证所有外键完整性。
7. 多环境一致性: 开发、测试、预发布、生产环境的数据库结构必须保持一致。使用自动化脚本定期检查各环境的迁移版本,发现偏差及时修复。
8. 生产变更需审核: 生产环境的数据库变更必须经过SQL审核、锁影响分析和性能评估。Claude Code生成审核清单,包含变更内容、风险评估、回滚方案和维护窗口建议。