数据库迁移工作流

Claude Code 工作流专题 · 安全管理数据库模式变更

专题:Claude Code 工作流系统学习

关键词:Claude Code, 数据库迁移, Alembic, Flyway, 模式变更, 回滚, 数据迁移, 版本控制

一、工作流概述

数据库迁移是软件开发中最具风险的环节之一。一个错误的ALTER TABLE可能导致数据丢失、服务中断甚至回滚失败。Claude Code数据库迁移工作流旨在通过标准化流程、自动化工具集成和严格的验证机制,将数据库模式变更的风险降至最低。本工作流覆盖从迁移文件创建、代码编写、执行策略到版本管理的完整生命周期,支持Alembic(Python)、Flyway(Java)、Liquibase(Java)、Knex(Node.js)等主流迁移框架。

核心原则:每次迁移必须可回滚、可重复、可验证。向下兼容性是生产数据库变更的底线,任何时候都不能被突破。

工作流总览

阶段输入输出Claude Code角色
迁移规划需求描述迁移方案文档需求分析、方案建议、风险评估
迁移编写迁移方案迁移脚本(up/down)代码生成、SQL审核、最佳实践检查
迁移执行迁移脚本已迁移数据库执行监控、错误处理、自动回滚
迁移验证迁移后数据库验证报告差异分析、数据校验、性能评估
版本管理迁移历史更新基线历史梳理、冲突解决、基线重建

二、迁移文件管理

迁移文件是数据库变更的基本单元。每个迁移文件包含一个"向上迁移"(应用变更)和一个"向下迁移"(撤销变更)。Claude Code通过对话式交互协助开发者生成、组织和管理这些文件,确保命名规范、版本有序、依赖清晰。

Alembic迁移生成

Alembic是SQLAlchemy的数据库迁移工具,广泛用于Python项目。Claude Code可以基于模型定义自动生成迁移脚本,以下是通过对话生成迁移的典型流程。

# Claude Code根据需求自动生成Alembic迁移脚本 # # 用户: 在users表上添加last_login_at和login_count两个字段 # Claude Code生成如下迁移文件: """add last_login_at and login_count to users table Revision ID: a1b2c3d4e5f6 Revises: 9z8y7x6w5v4u Create Date: 2026-05-08 10:30:00.000000 """ from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql revision = 'a1b2c3d4e5f6' down_revision = '9z8y7x6w5v4u' branch_labels = None depends_on = None def upgrade(): op.add_column( 'users', sa.Column('last_login_at', sa.DateTime(timezone=True), nullable=True, comment='最后登录时间') ) op.add_column( 'users', sa.Column('login_count', sa.Integer(), server_default='0', nullable=False, comment='登录次数') ) # 为login_count创建索引以支持统计分析查询 op.create_index('idx_users_login_count', 'users', ['login_count']) def downgrade(): op.drop_index('idx_users_login_count', table_name='users') op.drop_column('users', 'login_count') op.drop_column('users', 'last_login_at')

命名规范

良好的命名规范是迁移文件管理的基础。不同的迁移框架有不同的命名惯例,但核心原则一致:可排序、可识别、可关联。

# 推荐的迁移文件命名模式 # 1. 时间戳前缀模式(Alembic默认) # 格式: YYYYMMDD_HHMMSS_<描述>.py # 示例: 20260508_103000_add_login_fields_to_users.py 20260509_090000_create_orders_table.py 20260510_143000_add_foreign_key_orders_users.py # 2. 序号前缀模式(Flyway推荐) # 格式: V<序号>__<描述>.sql # 示例: V1__create_users_table.sql V2__create_orders_table.sql V3__add_product_catalog.sql V4__seed_initial_categories.sql # 3. 语义版本模式 # 格式: v<主版本>.<次版本>.<修订号>__<描述>.sql # 示例: v1.0.0__initial_schema.sql v1.1.0__add_shipping_address.sql v1.2.0__add_order_status_index.sql v2.0.0__redesign_user_profile.sql

Claude Code实践建议:在项目初始化时通过对话约定命名规范,Claude Code会在每次生成迁移文件时自动遵循该规范,并在代码审查中检查命名一致性。

版本控制与依赖排序

当多个开发者并行工作时,迁移文件的依赖关系和执行顺序至关重要。Alembic通过链式引用(down_revision指向前一个迁移)维护线性历史,但分支和合并场景需要特殊处理。

# Alembic依赖排序示例 # 基础迁移:v1.0 创建核心表 # 文件: 20260501_000001_create_users.py revision = '001' down_revision = None # 首个迁移没有上游 # 依赖迁移:v1.1 扩展用户表 # 文件: 20260502_000001_add_user_profile.py revision = '002' down_revision = '001' # 指向001,在创建用户表之后执行 # 并行分支迁移(两个开发者同时工作) # 开发者A: 20260503_000001_add_orders.py revision = '003a' down_revision = '002' # 开发者B: 20260503_000002_add_notifications.py revision = '003b' down_revision = '002' # 合并迁移(用于合并两个分支) # 文件: 20260504_000001_merge_003ab.py revision = '004' down_revision = '003a' # 第一个父节点 # Alembic支持多个down_revision进行合并 # 实际通过 merge_branch 方式处理: # alembic merge -m "merge 003a and 003b" 003a 003b

风险提示:并行分支迁移是冲突的高发区。如果两个分支修改同一个表结构,合并迁移将无法自动解决冲突,必须人工介入。Claude Code可以通过分析两个分支的迁移内容,识别冲突点并辅助生成合并脚本。

三、迁移编写

迁移编写的质量直接决定了数据安全和服务稳定性。Claude Code在迁移编写环节提供从DDL生成到业务逻辑转换的全方位辅助,同时严格执行向下兼容性检查。

建表迁移

创建新表是最基础的迁移类型。Claude Code能够根据实体关系描述或现有的ORM模型自动生成建表语句,并自动添加必要的约束、索引和注释。

-- Flyway SQL格式:创建产品目录表 -- V3__create_product_catalog.sql CREATE TABLE product_categories ( id BIGSERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, slug VARCHAR(120) NOT NULL UNIQUE, description TEXT, parent_id BIGINT REFERENCES product_categories(id), sort_order INTEGER NOT NULL DEFAULT 0, is_active BOOLEAN NOT NULL DEFAULT TRUE, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_categories_parent ON product_categories(parent_id); CREATE INDEX idx_categories_active ON product_categories(is_active) WHERE is_active = TRUE; COMMENT ON TABLE product_categories IS '产品分类表,支持无限层级树形结构'; COMMENT ON COLUMN product_categories.slug IS 'URL友好的唯一标识符'; -- 回滚脚本 V3__rollback.sql -- DROP TABLE IF EXISTS product_categories CASCADE;

改表迁移

修改已有表结构是最危险的迁移类型。Claude Code会在生成改表迁移时自动评估向下兼容性,包括:新增字段是否有默认值、字段类型变更是否导致数据截断、删除字段是否会破坏上游应用。

# Liquibase XML格式:重构用户表 # 将单字段name拆分为first_name和last_name,保持兼容性 <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.0.xsd"> <changeSet id="20260508-01" author="claude-code"> <comment>将users.name拆分为first_name和last_name,保持name字段作为视图兼容</comment> <addColumn tableName="users"> <column name="first_name" type="VARCHAR(100)"> <constraints nullable="true"/> </column> <column name="last_name" type="VARCHAR(100)"> <constraints nullable="true"/> </column> </addColumn> <sql> UPDATE users SET first_name = SPLIT_PART(name, ' ', 1), last_name = SUBSTRING(name FROM POSITION(' ' IN name) + 1) WHERE first_name IS NULL; </sql> <addNotNullConstraint tableName="users" columnName="first_name" columnDataType="VARCHAR(100)"/> <addNotNullConstraint tableName="users" columnName="last_name" columnDataType="VARCHAR(100)"/> <rollback> ALTER TABLE users DROP COLUMN first_name; ALTER TABLE users DROP COLUMN last_name; </rollback> </changeSet> </databaseChangeLog>

索引和约束管理

索引和约束的创建需要权衡查询性能与写入开销。Claude Code会根据查询模式分析建议最优索引策略,并注意并发创建索引以避免锁表。

-- 索引管理最佳实践(PostgreSQL) -- 1. 并发创建索引(不阻塞DML操作) CREATE INDEX CONCURRENTLY idx_orders_user_id ON orders(user_id); -- 2. 部分索引(仅索引活跃数据,节省空间) CREATE INDEX CONCURRENTLY idx_active_subscriptions ON subscriptions(user_id, end_date) WHERE status = 'active'; -- 3. 覆盖索引(避免回表查询) CREATE INDEX CONCURRENTLY idx_orders_lookup ON orders(user_id, status) INCLUDE (total_amount, created_at); -- 4. 表达式索引(支持函数查询) CREATE INDEX CONCURRENTLY idx_users_email_domain ON users(LOWER(SPLIT_PART(email, '@', 2))); -- 5. 删除未使用的索引 -- 先通过pg_stat_user_indexes确认索引使用情况 SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch FROM pg_stat_user_indexes WHERE idx_scan = 0 ORDER BY tablename;

数据迁移

数据迁移是迁移脚本中最复杂的部分,涉及数据格式转换、清洗、去重等操作。Claude Code在生成数据迁移脚本时自动添加分批处理、进度监控和错误处理逻辑。

# 批量数据迁移脚本(带进度报告和断点续传能力) # 场景:将旧版JSON配置字段迁移到规范化关联表 import json import logging from datetime import datetime from sqlalchemy import text from app import db_session logger = logging.getLogger(__name__) BATCH_SIZE = 500 # 每批处理500条 def upgrade(): """将user_preferences中的JSON配置迁移到preference_settings表""" total_processed = 0 total_errors = 0 last_id = 0 start_time = datetime.now() logger.info(f"开始数据迁移: {start_time.isoformat()}") while True: # 分批获取数据,使用last_id游标实现分批 rows = db_session.execute( text(""" SELECT id, user_id, config_json, updated_at FROM user_preferences WHERE id > :last_id ORDER BY id LIMIT :limit """), {"last_id": last_id, "limit": BATCH_SIZE} ).fetchall() if not rows: break for row in rows: try: config = json.loads(row.config_json) # 将JSON配置展开为规范化行 for key, value in config.items(): db_session.execute( text(""" INSERT INTO preference_settings (user_id, pref_key, pref_value, created_at, updated_at) VALUES (:uid, :key, :val, :now, :now) ON CONFLICT (user_id, pref_key) DO UPDATE SET pref_value = :val, updated_at = :now """), { "uid": row.user_id, "key": key, "val": str(value), "now": datetime.utcnow() } ) last_id = row.id total_processed += 1 except (json.JSONDecodeError, Exception) as e: total_errors += 1 logger.error(f"迁移失败 user_id={row.user_id}: {str(e)}") # 记录失败但继续,确保不阻塞整个迁移 db_session.commit() logger.info( f"已迁移 {total_processed} 条记录, " f"错误 {total_errors} 条, " f"当前游标 id={last_id}" ) elapsed = (datetime.now() - start_time).total_seconds() logger.info( f"数据迁移完成: 共{total_processed}条, " f"错误{total_errors}条, " f"耗时{elapsed:.2f}秒" ) def downgrade(): """回滚:清空迁移后的数据""" db_session.execute(text("TRUNCATE TABLE preference_settings")) logger.info("数据迁移已回滚: preference_settings 已清空")

向下兼容性检查清单

生产数据库迁移的黄金法则是向下兼容。Claude Code在每次生成改表迁移时自动执行以下兼容性检查。

检查项合规做法违规后果
新增字段必须允许NULL或有DEFAULT值锁表、写入失败
删除字段先标记废弃,等待下游消费方更新后再删除下游查询崩溃
字段类型变更仅允许向兼容方向扩展(INT→BIGINT)数据截断、隐式转换失败
重命名表/字段创建新对象+数据同步+旧对象做视图全链路断裂
添加NOT NULL先填充数据,再分批添加约束CHECK约束失败导致迁移中断

四、迁移执行策略

迁移执行策略的选型取决于项目阶段、数据库规模和风险承受能力。Claude Code支持三种执行模式,并在执行过程中提供实时监控和自动故障处理。

自动迁移

适用于开发和CI/CD环境。Claude Code自动检测未应用的迁移并按顺序执行,失败时自动触发回滚并提供错误分析。

# Claude Code 自动迁移执行命令 # 1. Alembic 自动迁移 alembic upgrade head # 2. Flyway 自动迁移 flyway migrate # 3. Liquibase 自动迁移 liquibase update # 4. Knex.js 自动迁移 npx knex migrate:latest # Claude Code 自动迁移监控脚本 #!/bin/bash # auto-migrate.sh - 带错误处理的自动迁移 echo "[INFO] 开始数据库迁移: $(date '+%Y-%m-%d %H:%M:%S')" # 前置检查:数据库连接 if ! pg_isready -h $DB_HOST -p $DB_PORT -U $DB_USER; then echo "[ERROR] 数据库不可达,终止迁移" exit 1 fi # 执行迁移 if alembic upgrade head 2>&1; then echo "[INFO] 迁移成功完成" # 记录迁移完成状态 echo "MIGRATION_STATUS=SUCCESS" > migration-status.env else echo "[ERROR] 迁移失败,执行回滚" # 回滚到上一个版本 alembic downgrade -1 echo "MIGRATION_STATUS=ROLLED_BACK" > migration-status.env exit 2 fi # 后置验证:检查关键表是否存在 for table in users orders products; do if ! psql -h $DB_HOST -c "SELECT 1 FROM $table LIMIT 1" &>/dev/null; then echo "[WARN] 表 $table 验证异常" fi done echo "[INFO] 迁移流程结束: $(date '+%Y-%m-%d %H:%M:%S')"

手动审核模式

生产环境推荐的模式。Claude Code生成迁移脚本后,由DBA或高级开发者审核SQL语句,确认无性能风险和兼容性问题后再手动执行。

# Claude Code 生产迁移审核清单 # 审核项1: SQL语法正确性 # Claude Code: 已通过SQL语法验证 (使用 EXPLAIN 预执行) # 审核项2: 锁影响分析 迁移涉及操作: 新增字段 + 创建索引 锁分析: - ALTER TABLE ADD COLUMN: AccessExclusiveLock 影响: 阻塞所有DML操作 建议: 在维护窗口执行,预计执行时间 < 50ms - CREATE INDEX CONCURRENTLY: 仅ShareLock 影响: 不阻塞DML 建议: 始终使用CONCURRENTLY模式 # 审核项3: 大表影响评估 表 users 数据量: 约 500万 行 新增字段: login_count INTEGER DEFAULT 0 评估: 即时DDL操作,不涉及数据重写 # 审核项4: 回滚方案 回滚脚本: V2.1__rollback.sql 回滚类型: 自动回滚 数据丢失风险: 无(新增字段,drop即可) # 审核结论: 审核通过,可在维护窗口执行

生产环境铁律:永远不要在业务高峰期执行数据库迁移。大型表结构的变更(如添加NOT NULL约束、修改列类型)必须在维护窗口内执行,并预留至少一倍的缓冲时间用于回滚。

多环境同步

多环境(开发、测试、预发布、生产)的数据库结构同步是迁移管理的核心挑战。Claude Code通过迁移文件版本控制和环境配置管理,确保各环境的结构一致性。

# 多环境同步策略 # 1. 环境配置管理 # alembic.ini 或 .env 文件按环境加载 # 开发环境 (.env.development) DB_URL=postgresql://dev_user:dev_pass@localhost:5432/dev_db # 测试环境 (.env.testing) DB_URL=postgresql://test_user:test_pass@test-host:5432/test_db # 生产环境 (.env.production) DB_URL=postgresql://prod_user:prod_pass@prod-host:5432/prod_db # 2. 环境一致性校验脚本 #!/bin/bash # check-schema-sync.sh echo "=== 数据库结构一致性检查 ===" # 获取各环境的迁移版本 DEV_VERSION=$(DATABASE_URL=$DEV_DB_URL alembic current 2>/dev/null) STAGING_VERSION=$(DATABASE_URL=$STAGING_DB_URL alembic current 2>/dev/null) PROD_VERSION=$(DATABASE_URL=$PROD_DB_URL alembic current 2>/dev/null) echo "开发环境: $DEV_VERSION" echo "预发布环境: $STAGING_VERSION" echo "生产环境: $PROD_VERSION" if [ "$DEV_VERSION" != "$STAGING_VERSION" ]; then echo "[WARN] 开发环境和预发布环境版本不一致" fi if [ "$STAGING_VERSION" != "$PROD_VERSION" ]; then echo "[WARN] 预发布环境和生产环境版本不一致" fi

五、迁移验证

迁移后的验证是保障数据安全的重要防线。Claude Code从三个维度进行迁移验证:结构一致性、数据完整性和性能影响。每次迁移执行后自动生成验证报告。

迁移前后结构对比

Claude Code自动对比迁移前后的数据库Schema,生成结构差异报告,确保实际变更与预期一致,不会有意外的副作用。

-- Claude Code 结构差异对比报告 -- 迁移前结构摘要: -- public.users -- id BIGSERIAL PK -- email VARCHAR(255) NOT NULL UNIQUE -- name VARCHAR(100) NOT NULL -- created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() -- 索引: idx_users_email -- 迁移后结构摘要: -- public.users -- id BIGSERIAL PK -- email VARCHAR(255) NOT NULL UNIQUE -- name VARCHAR(100) NOT NULL -- first_name VARCHAR(100) -- 新增字段 -- last_name VARCHAR(100) -- 新增字段 -- login_count INTEGER NOT NULL DEFAULT 0 -- 新增字段 -- last_login TIMESTAMPTZ -- 新增字段 -- created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() -- 索引: idx_users_email, idx_users_login_count -- 差异对比: -- [+] 新增字段: first_name, last_name, login_count, last_login -- [+] 新增索引: idx_users_login_count -- [=] 未变更: id, email, name, created_at -- [!] 风险项: 无(所有变更均向下兼容) -- 自动生成的SQL差异确认 SELECT column_name, data_type, is_nullable, column_default, character_maximum_length FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'users' ORDER BY ordinal_position;

数据完整性检查

数据迁移后,Claude Code执行完整性校验脚本,验证行数一致性、外键完整性、业务约束未被破坏。

# 数据完整性校验脚本 def verify_migration_integrity(): """迁移后数据完整性自动校验""" checks = [] # 检查1: 记录数一致性 old_count = db_session.execute( text("SELECT COUNT(*) FROM user_preferences") ).scalar() new_count = db_session.execute( text("SELECT COUNT(DISTINCT user_id) FROM preference_settings") ).scalar() checks.append({ "check": "用户偏好数据迁移", "source_count": old_count, "target_count": new_count, "status": "PASS" if old_count == new_count else "FAIL" }) # 检查2: 外键完整性 orphaned = db_session.execute( text(""" SELECT COUNT(*) FROM preference_settings ps LEFT JOIN users u ON ps.user_id = u.id WHERE u.id IS NULL """) ).scalar() checks.append({ "check": "外键完整性", "orphaned_count": orphaned, "status": "PASS" if orphaned == 0 else "FAIL" }) # 检查3: 非空约束 null_vals = db_session.execute( text(""" SELECT COUNT(*) FROM preference_settings WHERE pref_key IS NULL OR pref_value IS NULL """) ).scalar() checks.append({ "check": "非空约束", "null_count": null_vals, "status": "PASS" if null_vals == 0 else "FAIL" }) # 输出报告 print("=== 数据完整性校验报告 ===") all_pass = True for c in checks: status_icon = "[PASS]" if c["status"] == "PASS" else "[FAIL]" print(f"{status_icon} {c['check']}") if c["status"] == "FAIL": all_pass = False print(f"\n结论: {'所有检查通过' if all_pass else '存在失败的检查项'}") return all_pass

性能影响评估

结构变更可能影响查询性能。Claude Code在迁移后自动执行关键查询的性能基准测试,对比迁移前后的执行计划变化。

-- 迁移前后执行计划对比 -- 查询: 最近30天的活跃用户订单 EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) SELECT u.id, u.email, COUNT(o.id) as order_count, SUM(o.total_amount) as total_spent FROM users u JOIN orders o ON o.user_id = u.id WHERE o.created_at >= NOW() - INTERVAL '30 days' AND o.status IN ('paid', 'shipped', 'completed') GROUP BY u.id, u.email ORDER BY total_spent DESC LIMIT 100; -- 迁移前执行计划摘要: -- Seq Scan on orders (cost=0.00..45231.20 rows=152340 width=32) -- Filter: (status = ANY ('{paid,shipped,completed}'::text[])) -- 总耗时: 1850 ms -- 问题: 缺少(status, created_at)复合索引,导致全表扫描 -- 迁移后执行计划摘要: -- Index Scan using idx_orders_status_date on orders (cost=0.56..1234.50 rows=15234 width=32) -- Index Cond: ((status = ANY ('{paid,shipped,completed}'::text[])) -- AND (created_at >= NOW() - INTERVAL '30 days')) -- 总耗时: 45 ms -- 优化效果: 性能提升约 40 倍

回滚测试

回滚脚本必须经过实际测试才能用于生产。Claude Code在非生产环境中执行完整的"升级-验证-回滚-验证"循环,确保回滚路径畅通。

# Claude Code 回滚测试流程 #!/bin/bash # rollback-test.sh - 完整的回滚测试流程 set -e echo "=== 步骤1: 记录当前迁移版本 ===" BEFORE_VERSION=$(alembic current) echo "当前版本: $BEFORE_VERSION" echo "=== 步骤2: 执行升级 ===" alembic upgrade head echo "=== 步骤3: 升级后验证 ===" # 运行数据完整性校验 python verify_migration.py echo "=== 步骤4: 回滚一个版本 ===" alembic downgrade -1 echo "=== 步骤5: 验证回滚结果 ===" AFTER_DOWNGRADE=$(alembic current) echo "回滚后版本: $AFTER_DOWNGRADE" if [ "$BEFORE_VERSION" = "$AFTER_DOWNGRADE" ]; then echo "[PASS] 回滚版本匹配" else echo "[FAIL] 回滚版本不匹配" echo "期望: $BEFORE_VERSION" echo "实际: $AFTER_DOWNGRADE" exit 1 fi echo "=== 步骤6: 重新升级(验证downgrade未破坏upgrade路径) ===" alembic upgrade head echo "=== 步骤7: 最终验证 ===" FINAL_VERSION=$(alembic current) python verify_migration.py echo "=== 回滚测试完成 ===" echo "升级-回滚-再升级 循环验证通过"

六、数据迁移辅助

数据迁移是数据库迁移工作中最复杂的部分。与单纯的DDL变更不同,数据迁移涉及存量数据的读取、转换、清洗和写入,每一个环节都可能引入错误。Claude Code在数据迁移辅助方面提供数据清洗、格式转换、编码转换、大表分批处理和复杂外键关系处理等能力。

数据清洗与格式转换

旧系统中的数据往往存在空值、格式不一致、编码混乱等问题。Claude Code自动识别数据质量问题并生成对应的清洗脚本。

-- 数据清洗:电话号码格式化 -- 迁移前数据质量问题: -- 1. 手机号格式不一致: 13812345678 / 138-1234-5678 / +8613812345678 -- 2. 固话缺少区号: 12345678 -- 3. 包含非法字符: 138 1234 5678 -- 清洗脚本 UPDATE users SET phone = REGEXP_REPLACE( REGEXP_REPLACE(phone, '[^0-9+]', '', 'g'), -- 移除非数字字符 '^(\+?86)?(\d{3})(\d{4})(\d{4})$', '\1-\2-\3-\4' ), phone_clean_status = 'cleaned' WHERE phone IS NOT NULL AND phone_clean_status IS DISTINCT FROM 'cleaned'; -- 数据清洗:邮箱地址规范化 UPDATE users SET email = LOWER(TRIM(email)), email_clean_status = 'cleaned' WHERE email IS NOT NULL AND email_clean_status IS DISTINCT FROM 'cleaned'; -- 数据清洗:日期格式统一 UPDATE orders SET order_date = CASE WHEN order_date::text ~ '^\d{4}/\d{2}/\d{2}$' THEN TO_DATE(order_date::text, 'YYYY/MM/DD') WHEN order_date::text ~ '^\d{2}-\d{2}-\d{4}$' THEN TO_DATE(order_date::text, 'MM-DD-YYYY') ELSE order_date END, date_clean_status = 'cleaned' WHERE date_clean_status IS DISTINCT FROM 'cleaned';

大表分批迁移

对于包含数百万甚至上亿行记录的大表,一次性迁移会导致长时间锁表和事务日志暴涨。Claude Code自动生成分批迁移策略,支持游标分页和时间窗口限流。

# 大表分批迁移:使用游标分页 + 时间窗口限流 # 场景:将 legacy_orders (约 2 亿行) 迁移到 orders 新表 import time from datetime import datetime, timedelta BATCH_SIZE = 10000 MAX_RUNTIME = timedelta(hours=4) # 维护窗口4小时 RATE_LIMIT = 5000 # 每秒最多处理5000行 def batch_migrate_legacy_orders(): """大表分批迁移,支持断点续传和速率控制""" start_time = datetime.now() last_id = 0 total_migrated = 0 batch_start = time.time() while datetime.now() - start_time < MAX_RUNTIME: batch_start_time = time.time() # 1. 读取一批旧数据 legacy_rows = db_session.execute( text(""" SELECT * FROM legacy_orders WHERE id > :last_id ORDER BY id LIMIT :limit FOR UPDATE SKIP LOCKED """), {"last_id": last_id, "limit": BATCH_SIZE} ).fetchall() if not legacy_rows: logger.info("所有数据迁移完成") break # 2. 数据转换和写入 for row in legacy_rows: try: # 数据格式转换 new_order = { "id": row.id, "user_id": row.customer_id, # 字段重命名 "total": row.amount * 1.0, # 类型转换 "status": map_legacy_status(row.status), # 枚举映射 "created_at": row.order_date, "updated_at": datetime.utcnow() } db_session.execute( text(""" INSERT INTO orders (id, user_id, total, status, created_at, updated_at) VALUES (:id, :user_id, :total, :status, :created_at, :updated_at) ON CONFLICT (id) DO NOTHING """), new_order ) last_id = row.id total_migrated += 1 except Exception as e: logger.error(f"迁移失败 id={row.id}: {str(e)}") # 记录失败行到错误表以便后续处理 db_session.execute( text(""" INSERT INTO migration_errors (table_name, row_id, error_msg, created_at) VALUES ('legacy_orders', :rid, :msg, :now) """), {"rid": row.id, "msg": str(e), "now": datetime.utcnow()} ) db_session.commit() # 3. 速率控制 elapsed = time.time() - batch_start_time batch_rate = BATCH_SIZE / elapsed if elapsed > 0 else BATCH_SIZE logger.info( f"批次完成: last_id={last_id}, " f"累计={total_migrated}, " f"速率={batch_rate:.0f}行/秒" ) if batch_rate > RATE_LIMIT: sleep_time = (BATCH_SIZE / RATE_LIMIT) - elapsed if sleep_time > 0: logger.info(f"速率控制: 暂停 {sleep_time:.1f} 秒") time.sleep(sleep_time) # 迁移总结 total_time = (datetime.now() - start_time).total_seconds() logger.info( f"大表迁移完成: 共{total_migrated}行, " f"耗时{total_time:.0f}秒, " f"平均速率{total_migrated/total_time:.0f}行/秒" ) return total_migrated

外键处理

外键约束是数据迁移中的主要障碍。Claude Code在处理外键时遵循"先迁主表、后迁子表"的原则,并在必要时临时禁用外键约束以加快迁移速度。

# 带外键约束的分层迁移策略 def migrate_with_foreign_keys(): """按外键依赖顺序分层迁移""" # 层级1: 基础表(无外键依赖) tables_level_1 = ["categories", "tags", "users"] for table in tables_level_1: batch_migrate_table(f"legacy_{table}", table) # 层级2: 依赖基础表的表 tables_level_2 = ["products", "posts"] for table in tables_level_2: batch_migrate_table(f"legacy_{table}", table) # 层级3: 依赖层级2的表 tables_level_3 = ["order_items", "comments"] for table in tables_level_3: batch_migrate_table(f"legacy_{table}", table) # 验证外键完整性 for fk_name, fk_info in FOREIGN_KEYS.items(): orphan_count = db_session.execute( text(f""" SELECT COUNT(*) FROM {fk_info['child_table']} c LEFT JOIN {fk_info['parent_table']} p ON c.{fk_info['child_column']} = p.{fk_info['parent_column']} WHERE p.{fk_info['parent_column']} IS NULL """) ).scalar() if orphan_count > 0: logger.warning( f"外键 {fk_name}: 发现 {orphan_count} 条孤立记录" ) else: logger.info(f"外键 {fk_name}: 完整性验证通过") def disable_fks_for_bulk_load(): """批量加载时临时禁用外键""" # 禁用所有外键约束 db_session.execute(text("SET session_replication_role = 'replica';")) try: # 执行批量数据加载 bulk_load_data() db_session.commit() except Exception as e: db_session.rollback() raise e finally: # 重新启用外键约束 db_session.execute(text("SET session_replication_role = 'origin';")) # 重新验证所有外键 db_session.execute(text("ALTER TABLE orders VALIDATE CONSTRAINT fk_orders_users;")) db_session.execute(text("ALTER TABLE order_items VALIDATE CONSTRAINT fk_order_items_orders;"))

七、迁移版本管理

迁移版本管理是数据库长期维护的基础设施。随着项目迭代,迁移文件数量可能增加到数百个,版本管理变得至关重要。Claude Code在迁移版本管理方面提供基线建立、历史梳理、降级路径分析和分支冲突解决等高级功能。

基准线建立

新项目或者接手遗留系统时,需要建立一个清晰的迁移基线。Claude Code通过与当前数据库结构对比,自动生成基准线迁移脚本。

# 基准线建立:Alembic stub模式 # 场景:团队决定从某个版本开始正式使用Alembic管理迁移 # 之前的所有变更通过基准线脚本汇总为一个初始状态 # 步骤1: 创建基准线迁移 alembic revision --autogenerate -m "baseline_v1" # 步骤2: Claude Code生成的自动检测脚本 # 检测当前数据库结构并生成基准线迁移 import sqlalchemy as sa from alembic import op def upgrade(): # 以下SQL由Claude Code根据当前数据库结构自动生成 # 仅当目标数据库不存在该表时才创建 op.execute(""" CREATE TABLE IF NOT EXISTS migration_baseline ( id SERIAL PRIMARY KEY, baseline_version VARCHAR(50) NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), table_count INTEGER, object_hash VARCHAR(64) ); """) # 记录基准线快照 table_count_query = """ SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' """ result = op.get_bind().execute(table_count_query) table_count = result.scalar() op.execute(f""" INSERT INTO migration_baseline (baseline_version, table_count, object_hash) VALUES ('v1.0.0', {table_count}, md5(current_database() || NOW()::text)) """) # 步骤3: 标记现有迁移为已应用(不实际执行) alembic stamp head

降级路径规划

每个迁移都必须有对应的降级脚本。Claude Code在生成升级脚本时同步生成降级脚本,并维护完整的降级路径拓扑图,确保从任何版本都能安全回退。

# 降级路径可视化与验证 # Alembic 降级路径拓扑 # # 001 (初始架构) # | # 002 (添加用户表) # | # 003 (添加订单表) # | \ # | 004a (开发者A: 订单添加折扣字段) # | | # | 005a (开发者A: 订单添加物流信息) # | / # 004b (开发者B: 用户添加手机号) # | # 005 (合并迁移) # | # 006 (当前HEAD: 添加产品目录) # 降级路径示例: # 从 HEAD (006) 降级到 001: # 006 -> 005 -> 004b -> 003 -> 002 -> 001 # 验证所有降级路径 def verify_downgrade_paths(): """验证每个迁移的降级脚本是否可用""" migration_graph = { "001": {"down": None, "up": "002"}, "002": {"down": "001", "up": "003"}, "003": {"down": "002", "up": ["004a", "004b"]}, "004a": {"down": "003", "up": "005a"}, "004b": {"down": "003", "up": "005"}, "005a": {"down": "004a", "up": "005"}, "005": {"down": ["004b", "005a"], "up": "006"}, "006": {"down": "005", "up": None}, } # 从HEAD开始,验证到每个节点的降级路径 for target in migration_graph: if target == "006": continue path = find_downgrade_path("006", target, migration_graph) if path: print(f"[PASS] 006 -> {target}: 降级路径存在") print(f" 路径: {' -> '.join(path)}") else: print(f"[FAIL] 006 -> {target}: 无法找到降级路径") # 执行降级脚本验证(非生产环境) for revision in reversed(sorted(migration_graph.keys())): if revision == "001": continue print(f"验证降级: {revision} -> {migration_graph[revision]['down']}") # 在实际环境中执行: # alembic downgrade {migration_graph[revision]['down']} def find_downgrade_path(current, target, graph, visited=None): """BFS查找降级路径""" if visited is None: visited = set() if current == target: return [current] if current in visited: return None visited.add(current) node = graph.get(current) if not node: return None down = node["down"] if down is None: return None # 支持多个父节点(分支合并) if isinstance(down, list): for parent in down: path = find_downgrade_path(parent, target, graph, visited) if path: return [current] + path else: path = find_downgrade_path(down, target, graph, visited) if path: return [current] + path return None

分支合并冲突

当多个开发者或团队同时进行数据库变更时,分支合并是必然场景。Claude Code通过分析两个分支的迁移文件内容,检测潜在冲突并辅助生成合并迁移。

# Claude Code 分支合并冲突分析与解决 # 检测到的冲突场景: # 分支A (feature/order-discount): # 迁移 004a: 在orders表添加 discount_amount 字段 # 迁移 005a: 在orders表添加 coupon_id 字段 # # 分支B (feature/user-phone): # 迁移 004b: 在users表添加 phone 字段 # 迁移 005b: 在orders表添加 shipping_address 字段 # # 冲突分析结果: # 1. 004a 和 004b 修改不同的表 -> 无冲突 # 2. 005a 和 005b 修改同一张表但不相同字段 -> 无冲突 # 3. 003 -> 004a 和 003 -> 004b 形成分叉 -> 需要合并迁移 # 自动生成的合并迁移: """merge feature/order-discount and feature/user-phone Revision ID: merge_004ab Revises: 004a, 004b Create Date: 2026-05-08 12:00:00.000000 """ revision = 'merge_004ab' down_revision = ('004a', '004b') # 两个父节点 branch_labels = None depends_on = None def upgrade(): # 两个分支的变更是独立的,合并迁移本身不需要额外操作 pass def downgrade(): pass

迁移重置与压缩

当迁移文件积累过多时,会影响迁移执行速度。Claude Code支持迁移压缩,将当前数据库结构导出为单一基准线迁移,并清理历史迁移文件。

# 迁移重置和压缩流程 # 场景: 项目有127个迁移文件,需要压缩为1个基准线 # 流程: # 1. 确保所有迁移已应用到所有环境 # 2. 使用Alembic的 squash 功能压缩迁移 # 步骤1: 生成当前数据库结构的完整转储 pg_dump --schema-only --no-owner --no-acl \ -h $DB_HOST -U $DB_USER -d $DB_NAME > current_schema.sql # 步骤2: 创建新的基准线迁移 alembic revision --autogenerate -m "squash_v2_baseline" # 步骤3: Claude Code验证新旧结构一致 # 比较新旧迁移创建的Schema是否一致 def verify_squash(): old_hash = compute_schema_hash("current_schema.sql") new_hash = compute_schema_hash("squash_schema.sql") assert old_hash == new_hash, "压缩后的Schema与当前不一致" # 步骤4: 标记基线 alembic stamp head # 步骤5: 清理旧的迁移文件(谨慎操作!) # 保留备份,仅在确认所有环境都stamp到新基线后才清理 # 注意: 迁移压缩是一次性的操作,通常在重大版本发布时执行 # 压缩后,旧的迁移文件应该被归档而不是删除

Claude Code提示:迁移压缩是高风险操作,必须确保所有开发环境、CI/CD流水线和生产环境都已经stamp到新的基准线。建议在迁移压缩后运行完整的端到端测试套件,验证所有环境的一致性。

八、核心要点总结

1. 每次迁移必须可回滚:没有回滚脚本的迁移不允许上生产环境。Claude Code在生成任何迁移脚本时都同步生成对应的降级脚本,并通过自动测试验证降级路径的可用性。

2. 向下兼容是底线:新增字段必须允许NULL或提供DEFAULT值,删除字段必须先标记废弃,字段类型变更只能向兼容方向扩展。任何违反向下兼容原则的变更都需要架构评审和多阶段部署计划。

3. 大表操作必须分批:对于超过100万行的大表,任何数据迁移操作都必须使用分批处理策略,配合游标分页、时间窗口限流和断点续传机制。永远不要在生产环境对大表执行一次性UPDATE或DELETE。

4. 迁移验证不可或缺:结构对比、数据完整性检查、性能基准测试和回滚测试是迁移验证的四个必要环节。Claude Code自动生成验证脚本并在非生产环境中执行完整的"升级-验证-回滚-验证"循环。

5. 版本管理需要规划:从项目初期就建立清晰的迁移命名规范、依赖关系和分支策略。定期审查迁移历史,必要时进行迁移压缩。合并迁移要谨慎处理,确保降级路径不中断。

6. 外键处理有顺序:带外键约束的表迁移必须按依赖顺序分层执行(主表先于子表)。批量加载时可临时禁用外键约束,但加载后必须重新验证所有外键完整性。

7. 多环境一致性:开发、测试、预发布、生产环境的数据库结构必须保持一致。使用自动化脚本定期检查各环境的迁移版本,发现偏差及时修复。

8. 生产变更需审核:生产环境的数据库变更必须经过SQL审核、锁影响分析和性能评估。Claude Code生成审核清单,包含变更内容、风险评估、回滚方案和维护窗口建议。