版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

  1. 首先安装rclone成功后,进行配置,我们需要配置源和目标,有两种配置方法即通过命令行配置和配置文件(实际上命令行配置后也会写入到该配置文件,文件路径为~/.config/rclone/rclone.conf),文件内容如下

    代码块
    languagetext
    # AWS 配置
    [s3]
    type = s3
    provider = AWS
    env_auth = false
    access_key_id = XXXXXXXX # 获取方式自行查阅相关文档
    secret_access_key = XXXXXXXX # 获取方式自行查阅相关文档
    region = ap-northeast-1
    location_constraint = ap-northeast-1
    acl = private
    
    # DigitalOcean 配置
    [spaces]
    type = s3
    provider = DigitalOcean
    env_auth = false
    access_key_id = XXXXXXXX # 获取方式自行查阅相关文档
    secret_access_key = XXXXXXXX # 获取方式自行查阅相关文档
    endpoint = sgp1.digitaloceanspaces.com
    acl = private
  2. 执行以下同步命令,将开始同步指定的AWS s3桶,这里举例`flamegraph.starcoin.org`,需要开一个窗口等待执行完成(这里建议在远程机器或者在本地上面开一个可被detach的命令行,否则当前session会被锁定在这里直到同步执行完成),请确保在执行该命令前,DigitalOcean中Space中对应的桶已经存在s3桶,这里举例`cw-to-feishu-westar`,需要开一个窗口等待执行完成(这里建议在远程机器或者在本地上面开一个可被detach的命令行,否则当前session会被锁定在这里直到同步执行完成),请确保在执行该命令前,DigitalOcean中Space中对应的桶已经存在

    代码块
    languagebash
    # 同步s3 的 cw-to-feishu-westar  桶中的内容到 DigitalOcean中对应的桶
    rclone --no-check-certificate sync s3:cw-to-feishu-westar spaces:cw-to-feishu-westar -vv
    
    # 同步完成后执行该命令做检查
    rclone check s3:cw-to-feishu-westar spaces:cw-to-feishu-westar -vv

...

  1. 执行以下命令进行同步

    代码块
    languagejson
    POST _snapshot/s3_backup_repository/snapshot-20240917/_restore {
      "indices": "barnard.0727.uncle_blocks",
      "ignore_unavailable": true,
      "include_global_state": false
    }
    # 成功
    {
      "acknowledge": true
    }
  2. 查看同步进度,可以通过检查doc的数量与源集群的索引是否一致来判断是否同步完成

    代码块
    languagejson
    GET /barnard.0727.uncle_blocks/_count
    
    {
      "count" : 1109354,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      }
    }
    

Postgresql数据库

pg_dump 格式的迁移

(推荐使用该格式进行迁移)该部分比较简单,由于数据量较小(1.7G),直接使用 pg_dump 工具将其所有数据直接导出即可,之后可以进行导入。

在进行导出之前,先使用以下脚本先清理掉原来库中的一些数据,避免脏数据影响。注意schemaname 要改成需要的schema 名称以免清理错误。

代码块
languagesql
DO $$
DECLARE
    r RECORD;
BEGIN
    -- 禁用所有触发器
    SET session_replication_role = 'replica';
    
    -- 开始事务
    BEGIN
        -- 循环遍历指定schema中的所有表
        FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'barnard')
        LOOP
            -- 执行TRUNCATE
            EXECUTE 'TRUNCATE TABLE barnard' || quote_ident(r.tablename) || ' CASCADE';
        END LOOP;
    
    -- 提交事务
    COMMIT;
    
    -- 重新启用触发器
    SET session_replication_role = 'origin';
END $$;

此处为了影响最小,写了一个分离程序,将导出的部分拆分成了main和barnard两个schema

代码块
languagepy

# How to run
# python postgresql-dump-extractor.py \
#  -s barnard \
#  -i 576184071779_dump_20240923.sql\
#  -o 576184071779_dump_20240923_barnard.sql

import argparse
import re

def extract_copy_statements(input_file, output_file, schema):
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        copy_block = []
        in_copy_block = False
        copy_pattern = re.compile(rf'^COPY {re.escape(schema)}\.(\w+)')

        for line in infile:
            if not in_copy_block:
                if copy_pattern.match(line):
                    in_copy_block = True
                    copy_block = [line]
            else:
                copy_block.append(line)
                if line.strip() == '\.':
                    outfile.writelines(copy_block)
                    outfile.write('\n')
                    in_copy_block = False
                    copy_block = []

def main():
    parser = argparse.ArgumentParser(description='Extract COPY statements from PostgreSQL dump file.')
    parser.add_argument('-i', '--input', required=True, help='Input dump file name')
    parser.add_argument('-o', '--output', required=True, help='Output file name')
    parser.add_argument('-s', '--schema', required=True, help='Schema name')

    args = parser.parse_args()

    extract_copy_statements(args.input, args.output, args.schema)
    print(f"Extraction complete. Output written to {args.output}")

if __name__ == "__main__":
    main()

运行以下命令可以将其数据分离出来,注意每次都要指定 schema 命令,之后可以通过psql命令导入分离出来的sql脚本。

代码块
languagebash
python postgresql-dump-extractor.py \
  -s barnard \
  -i 576184071779_dump_20240923.sql\
  -o 576184071779_dump_20240923_barnard.sql

基于AWS snapshot格式的迁移

在使用AWS的RDS创建快照的时候,RDS会将其创建为基于Parquet格式的快照https://parquet.apache.org/ ,(关于RDS为何创建为Parquet格式,可能为了考虑兼容性和跨不同的数据服务)在基于这个格式的快照下,我们可以通过创建python文件来读取parquet格式,Python文件如下:

代码块
languagepy
import pandas as pd
import psycopg2
from pathlib import Path
import os
import logging
import sys
from datetime import datetime
import traceback

# 设置日志
def setup_logging():
    # 创建日志目录
    log_dir = 'restore_logs'
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # 生成日志文件名,包含时间戳
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f'{log_dir}/restore_{timestamp}.log'
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)

def create_schemas(conn, logger):
    schemas = ['barnard', 'main', 'halley', 'proxima', 'starcoin_user']
    with conn.cursor() as cur:
        for schema in schemas:
            try:
                cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
                logger.info(f"Schema {schema} created or already exists")
            except Exception as e:
                logger.error(f"Error creating schema {schema}: {str(e)}")
                raise
    conn.commit()

def get_table_name(file_path):
    try:
        parts = str(file_path).split('/')
        full_table_name = parts[-3]  # 例如 'barnard.address_holder'
        return full_table_name
    except Exception as e:
        raise ValueError(f"Invalid file path structure: {file_path}. Error: {str(e)}")

def get_postgres_type(pandas_type):
    type_mapping = {
        'int64': 'BIGINT',
        'float64': 'DOUBLE PRECISION',
        'object': 'TEXT',
        'bool': 'BOOLEAN',
        'datetime64[ns]': 'TIMESTAMP',
        'category': 'TEXT'
    }
    pg_type = type_mapping.get(str(pandas_type), 'TEXT')
    return pg_type

def import_parquet_to_postgres(parquet_dir, db_params):
    logger = setup_logging()
    logger.info(f"Starting import process from directory: {parquet_dir}")
    
    # 统计信息
    stats = {
        'total_tables': 0,
        'successful_tables': 0,
        'failed_tables': 0,
        'total_rows': 0
    }
    
    try:
        conn = psycopg2.connect(**db_params)
        logger.info("Successfully connected to database")
        
        create_schemas(conn, logger)
        
        # 获取所有parquet文件
        parquet_files = list(Path(parquet_dir).rglob('*.parquet'))
        stats['total_tables'] = len(parquet_files)
        logger.info(f"Found {stats['total_tables']} tables to import")
        
        for path in parquet_files:
            table_name = get_table_name(path)
            logger.info(f"\nProcessing table: {table_name}")
            
            try:
                # 读取parquet文件
                logger.info(f"Reading parquet file: {path}")
                df = pd.read_parquet(str(path))
                row_count = len(df)
                logger.info(f"Found {row_count} rows in {table_name}")
                
                # 获取列信息
                columns = df.columns
                dtypes = df.dtypes
                logger.info(f"Columns found: {', '.join(columns)}")
                
                # 创建表
                create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ("
                column_defs = []
                for col, dtype in zip(columns, dtypes):
                    pg_type = get_postgres_type(dtype)
                    column_defs.append(f"\"{col}\" {pg_type}")
                    logger.debug(f"Column {col}: {dtype} -> {pg_type}")
                create_table_sql += ", ".join(column_defs) + ")"
                
                with conn.cursor() as cur:
                    # 检查表是否已存在
                    schema, table = table_name.split('.')
                    cur.execute(f"""
                        SELECT COUNT(*) 
                        FROM information_schema.tables 
                        WHERE table_schema = %s 
                        AND table_name = %s
                    """, (schema, table))
                    table_exists = cur.fetchone()[0] > 0
                    
                    if table_exists:
                        logger.warning(f"Table {table_name} already exists. Truncating...")
                        cur.execute(f"TRUNCATE TABLE {table_name}")
                    else:
                        logger.info(f"Creating table {table_name}")
                        cur.execute(create_table_sql)
                    
                    # 使用COPY命令批量插入数据
                    logger.info(f"Starting data import for {table_name}")
                    from io import StringIO
                    buffer = StringIO()
                    df.to_csv(buffer, index=False, header=False, sep='\t', na_rep='\\N')
                    buffer.seek(0)
                    
                    cur.copy_from(buffer, table_name, columns=columns, null='\\N')
                    
                    # 验证导入的行数
                    cur.execute(f"SELECT COUNT(*) FROM {table_name}")
                    imported_rows = cur.fetchone()[0]
                    if imported_rows == row_count:
                        logger.info(f"Successfully imported {imported_rows} rows into {table_name}")
                    else:
                        logger.warning(f"Row count mismatch in {table_name}. Expected: {row_count}, Imported: {imported_rows}")
                
                conn.commit()
                stats['successful_tables'] += 1
                stats['total_rows'] += row_count
                logger.info(f"Successfully imported table {table_name}")
                
            except Exception as e:
                stats['failed_tables'] += 1
                logger.error(f"Error processing table {table_name}:")
                logger.error(traceback.format_exc())
                conn.rollback()
                
        # 打印最终统计信息
        logger.info("\nImport Summary:")
        logger.info(f"Total tables processed: {stats['total_tables']}")
        logger.info(f"Successfully imported tables: {stats['successful_tables']}")
        logger.info(f"Failed tables: {stats['failed_tables']}")
        logger.info(f"Total rows imported: {stats['total_rows']}")
        
    except Exception as e:
        logger.error("Fatal error in import process:")
        logger.error(traceback.format_exc())
        raise
    finally:
        if 'conn' in locals():
            conn.close()
            logger.info("Database connection closed")

def check_postgres_connection(db_params, logger):
    max_attempts = 3
    attempt = 0
    while attempt < max_attempts:
        try:
            conn = psycopg2.connect(**db_params)
            conn.close()
            logger.info("Successfully connected to PostgreSQL")
            return True
        except psycopg2.OperationalError:
            attempt += 1
            logger.warning(f"Connection attempt {attempt} failed. Waiting 5 seconds...")
            time.sleep(5)
    
    logger.error("Could not connect to PostgreSQL after multiple attempts")
    return False

if __name__ == "__main__":
    # 数据库连接参数
    db_params = {
        'dbname': 'starocin-swapinfo',
        'user': os.environ.get('POSTGRES_USER', 'postgres'),
        'password': os.environ.get('POSTGRES_PASSWORD', 'your_password'),
        'host': 'localhost'
    }

    # 检查数据库连接性
    if not check_postgres_connection(db_params, logger):
        logger.error("PostgreSQL is not running or not accessible")
        sys.exit(1)

    try:
        parquet_dir = '/root/starcoin'
        import_parquet_to_postgres(parquet_dir, db_params)
    except Exception as e:
        logging.error("程序执行失败")
        logging.error(traceback.format_exc())
        sys.exit(1)