DigitalOcean Service Migration Notes


Overview

After roughly two weeks of work, the data under AWS account 576184071779 has finally been migrated to DigitalOcean. The migration covered the following parts: the S3 buckets, the frontend projects, the Elasticsearch data (main and barnard), and the PostgreSQL database.

S3 Migration

The goal is to migrate the S3 data in AWS to DigitalOcean Spaces, using rclone (see https://github.com/rclone/rclone ; the tool supports copying between most mainstream storage services).

The steps are as follows:

  1. First install rclone, then configure it. Both a source and a target need to be configured, and there are two ways to do so: interactively on the command line, or by editing the config file directly (the interactive flow also writes to the same file, located at ~/.config/rclone/rclone.conf). The file contents are as follows:

    # AWS configuration
    [s3]
    type = s3
    provider = AWS
    env_auth = false
    access_key_id = XXXXXXXX     # see the relevant docs for how to obtain this
    secret_access_key = XXXXXXXX # see the relevant docs for how to obtain this
    region = ap-northeast-1
    location_constraint = ap-northeast-1
    acl = private

    # DigitalOcean configuration
    [spaces]
    type = s3
    provider = DigitalOcean
    env_auth = false
    access_key_id = XXXXXXXX     # see the relevant docs for how to obtain this
    secret_access_key = XXXXXXXX # see the relevant docs for how to obtain this
    endpoint = sgp1.digitaloceanspaces.com
    acl = private
  2. Run the following sync command to start copying the specified AWS S3 bucket, using `cw-to-feishu-westar` as the example. The command blocks until the sync finishes, so run it on a remote machine or in a detachable session such as tmux, otherwise the current session stays locked until the sync completes (see the sketch after this list). Make sure the corresponding bucket already exists in the DigitalOcean Space before running the command.

    # Sync the contents of the S3 bucket cw-to-feishu-westar to the corresponding DigitalOcean bucket
    rclone --no-check-certificate sync s3:cw-to-feishu-westar spaces:cw-to-feishu-westar -vv
    # After the sync finishes, run this command to verify
    rclone check s3:cw-to-feishu-westar spaces:cw-to-feishu-westar -vv
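
The notes above suggest running the sync in a detachable session. Below is a minimal sketch of one way to do that and to sanity-check the remotes first; it assumes the remotes are named s3 and spaces as in the config file above and uses the same example bucket.

    # Confirm both remotes are configured and reachable
    rclone listremotes
    rclone lsd spaces:

    # Run the sync in the background so the session is not blocked (tmux/screen work equally well)
    nohup rclone --no-check-certificate sync s3:cw-to-feishu-westar spaces:cw-to-feishu-westar -vv --log-file=rclone-sync.log &

    # Follow progress
    tail -f rclone-sync.log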

 

Elasticsearch Data Migration

The goal is to migrate the Elasticsearch data from AWS (the source cluster, hereafter) to DigitalOcean (the target cluster, hereafter). A snapshot-based migration was chosen for the following reasons:

  1. It is minimally invasive and the data stays reasonably complete

  2. Only the desired indices need to be restored

  3. Mounting the snapshot repository avoids copying the snapshot data

The main workflow is:

  1. Create a snapshot on the source cluster and store it in the corresponding S3 bucket

  2. Mount the S3 repository on the target cluster

  3. Restore the data from the snapshot

1. Create a snapshot on the source cluster

The AWS side is the more involved part: permissions have to be configured first (see https://docs.aws.amazon.com/zh_cn/opensearch-service/latest/developerguide/managedomains-snapshots.html ). If you are not familiar with AWS permissions, read the related documentation first. In short, AWS has accounts and roles, and the usual approach is to create a role and attach security policies to it. For creating policies and roles, see the following documents (a CLI sketch follows the links):

https://docs.aws.amazon.com/zh_cn/IAM/latest/UserGuide/access_policies_create-console.html

https://docs.aws.amazon.com/zh_cn/IAM/latest/UserGuide/id_roles_create_for-user.html
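
The links above describe the console workflow; for reference, the same role and policies can also be created from the AWS CLI. The sketch below is only illustrative: trust-policy.json and s3-access-policy.json are placeholder file names for the JSON documents shown in the following steps, while the role and policy names match the ones used in this migration.

    # Sketch only: trust-policy.json / s3-access-policy.json are placeholders for the JSON documents below
    aws iam create-role \
        --role-name es-snapshot \
        --assume-role-policy-document file://trust-policy.json

    aws iam create-policy \
        --policy-name es-snapshot-s3-access \
        --policy-document file://s3-access-policy.json

    aws iam attach-role-policy \
        --role-name es-snapshot \
        --policy-arn arn:aws:iam::576184071779:policy/es-snapshot-s3-access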

  1. First, create a role dedicated to snapshotting and grant it two IAM policies (both are described in the snapshot documentation linked above)

    1. The role has a trust relationship; edit it so that the role trusts the OpenSearch service:

      { "Version": "2012-10-17", "Statement": [{ "Sid": "", "Effect": "Allow", "Principal": { "Service": "es.amazonaws.com" }, "Action": "sts:AssumeRole" }] }
    2. A policy that allows reading and writing the bucket; the policy ARN is arn:aws:iam::576184071779:policy/es-snapshot-s3-access:

      { "Version": "2012-10-17", "Statement": [ { "Action": [ "s3:ListBucket" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::elasticserch-snapshot-backup" ] }, { "Action": [ "s3:GetObject", "s3:PutObject", "s3:DeleteObject" ], "Effect": "Allow", "Resource": [ "arn:aws:s3:::elasticserch-snapshot-backup/*" ] } ] }
    3. A policy that hands this role's bucket read/write permissions over to Elasticsearch (called OpenSearch inside AWS), i.e. role passing: the role arn:aws:iam::576184071779:role/es-snapshot is passed to the ES service:

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iam:PassRole", "Resource": "arn:aws:iam::576184071779:role/es-snapshot" }, { "Effect": "Allow", "Action": "es:ESHttpPut", "Resource": "arn:aws:es:ap-northeast-1:576184071779:domain/starcoin-es2/*" } ] }
  2. Use awscurl to send the request that registers the snapshot repository (awscurl must be used here, otherwise the OpenSearch service cannot identify the requester). If the AWS CLI is already configured, awscurl can use the same credentials file (usually ~/.aws/credentials). Snapshots can only be taken once the repository exists.

    # Register the S3 snapshot repository
    awscurl --service es --region ap-northeast-1 -XPUT \
      'https://search-starcoin-es2-47avtmhexhbg7qtynzebcnnu64.ap-northeast-1.es.amazonaws.com/_snapshot/my-snapshot-repo?pretty' \
      -H 'Content-Type: application/json' \
      -d '{"type": "s3", "settings": {"role_arn": "arn:aws:iam::576184071779:role/es-snapshot", "region": "ap-northeast-1", "bucket": "elasticserch-snapshot-backup"}}'

    { "acknowledge": true }

    # Create the snapshot
    PUT _snapshot/my-snapshot-repo/snapshot-20240917

    { "acknowledge": true }
  3. Check the snapshot's progress in the Kibana Dev Tools console; a state of SUCCESS means it was created successfully.

    GET _snapshot/my-snapshot-repo/snapshot-20240917

    {
      "snapshots" : [
        {
          "snapshot" : "snapshot-20240917",
          "uuid" : "TVlHLRoMSXupw60xQgsWcA",
          "version_id" : 7100299,
          "version" : "7.10.2",
          "indices" : [ "halley.0727.transfer_journal", "vega.0727.block_ids", "vega.0727.txn_events", "vega.0727.dag_inspector_block", "vega.0727.pending_txns", "halley.0727.block_ids", ".opendistro-anomaly-detector-jobs", "halley.0727.token_info", "barnard.0727.blocks", ".tasks", "proxima.0727.pending_txns", "barnard.0727.txn_events", "vega.0727.dag_inspector_height_group", "main.0727.market_cap", "barnard.0727.txn_infos", "txn_infos", "barnard.0727.market_cap_bak", "opendistro-sample-http-responses", "halley.0727.txn_events", "main.0727.pending_txns", "vega.0727.txn_infos", "proxima.0727.transfer_journal", "proxima.0727.address_holder", "halley.0727.txn_infos", "barnard.0727.txn_payloads", "vega.0727.transfer_journal", "barnard.0727.918.address_holder", ".opendistro-anomaly-detectors", "barnard.0914.txn_infos", ".opendistro-reports-definitions", ".opendistro_security", "main.0727.txn_payloads", "main.0727.token_info", ".opendistro-job-scheduler-lock", "halley.0727.txn_payloads", "main.0727.txn_infos", ".opendistro-anomaly-results-history-2021.05.07-1", "proxima.0727.token_info", "barnard.0727.market_cap", ".opendistro-reports-instances", "barnard.0727.block_ids", "main.0727.transfer_journal", "halley.0727.transfer", "vega.0727.txn_payloads", "halley.0727.address_holder", "vega.0727.market_cap", "proxima.0727.transfer", "vega.0727.uncle_blocks", "vega.0727.address_holder", ".opendistro-anomaly-checkpoints", "vega.0727.token_info", "halley.0727.blocks", "barnard.0727.txn_infos_0915", "main.0727.transfer", "halley.0727.uncle_blocks", ".kibana_1", "barnard.0727.address_holder", "proxima.0727.txn_infos", "proxima.0727.blocks", "halley.0727.market_cap", "proxima.0727.uncle_blocks", "barnard.0727.transfer_journal", "barnard.0727.token_info", "main.0727.uncle_blocks", "barnard.0727.uncle_blocks", "main.0727.block_ids", "vega.0727.blocks", "proxima.0727.market_cap", "barnard.0401.txn_infos", "halley.0727.pending_txns", ".opendistro-anomaly-detection-state", "vega.0727.transfer", "proxima.0727.txn_payloads", "barnard.0727.pending_txns", "main.0727.txn_events", "test_index", "main.0727.blocks", "barnard.0727.transfer", "proxima.0727.block_ids", "main.0727.address_holder", ".kibana_-1666338091_elastic_1", "vega.0727.dag_inspector_edge", "proxima.0727.txn_events" ],
          "data_streams" : [ ],
          "include_global_state" : true,
          "state" : "SUCCESS",
          "start_time" : "2024-09-17T05:04:52.562Z",
          "start_time_in_millis" : 1726549492562,
          "end_time" : "2024-09-17T07:08:33.370Z",
          "end_time_in_millis" : 1726556913370,
          "duration_in_millis" : 7420808,
          "failures" : [ ],
          "shards" : {
            "total" : 381,
            "failed" : 0,
            "successful" : 381
          }
        }
      ]
    }

2. Mount S3 on the target cluster

  1. Add configuration. The startup hook below does two things: a. installs the repository-s3 plugin into the Elasticsearch service; b. adds the AWS S3 access credentials to the Elasticsearch keystore.

    # elasticsearch-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: elasticsearch
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: elasticsearch
      template:
        metadata:
          labels:
            app: elasticsearch
        spec:
          ...
          ##################
          # Added section (under the elasticsearch container)
          lifecycle:
            postStart:
              exec:
                command: ["/bin/bash", "-c",
                  "/usr/share/elasticsearch/bin/elasticsearch-plugin list | grep -q repository-s3 || /usr/share/elasticsearch/bin/elasticsearch-plugin install --batch repository-s3 && \
                  echo ${S3_CLIENT_ACCESS_KEY} | /usr/share/elasticsearch/bin/elasticsearch-keystore add s3.client.default.access_key --stdin && \
                  echo ${S3_CLIENT_SECRET_KEY} | /usr/share/elasticsearch/bin/elasticsearch-keystore add s3.client.default.secret_key --stdin"]
          ##################
          ...
    ---
    # Elasticsearch Configuration
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: elasticsearch-config
    data:
      elasticsearch.yml: |
        ...
        ##################
        # Added section
        s3.client.default.endpoint: "s3.ap-northeast-1.amazonaws.com"
        s3.client.default.protocol: https
        s3.client.default.read_timeout: 50s
        s3.client.default.max_retries: 3
        s3.client.default.use_throttle_retries: true
        ##################
        ...
  2. After the cluster starts, register the mounted snapshot repository from the target cluster's Kibana Dev Tools console:

    PUT _snapshot/s3_backup_repository
    {
      "type": "s3",
      "settings": {
        "region": "ap-northeast-1",
        "bucket": "elasticserch-snapshot-backup",
        "compress": true,
        "server_side_encryption": true,
        "storage_class": "standard"
      }
    }

    # On success, the S3 snapshot repository is registered
    { "acknowledge": true }

    # On failure, check whether S3_CLIENT_ACCESS_KEY and S3_CLIENT_SECRET_KEY from the previous step were added successfully
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "repository_exception",
            "reason" : "[s3_backup_repository] Could not determine repository generation from root blobs"
          }
        ],
        "type" : "repository_exception",
        "reason" : "[s3_backup_repository] Could not determine repository generation from root blobs",
        "caused_by" : {
          "type" : "i_o_exception",
          "reason" : "Exception when listing blobs by prefix [index-]",
          "caused_by" : {
            "type" : "sdk_client_exception",
            "reason" : "The requested metadata is not found at http://169.254.169.254/latest/meta-data/iam/security-credentials/ "
          }
        }
      },
      "status" : 500
    }
  3. Check whether the snapshot exists in the repository; if it does, the mounted snapshot is usable and data restoration can begin.

    GET _snapshot/s3_backup_repository/_all

    {
      "snapshots" : [
        {
          "snapshot" : "snapshot-20240917",
          "uuid" : "TVlHLRoMSXupw60xQgsWcA",
          "repository" : "s3_backup_repository",
          "version_id" : 7100299,
          "version" : "7.10.2",
          "indices" : [ "halley.0727.transfer_journal", "vega.0727.block_ids", "vega.0727.txn_events", "vega.0727.dag_inspector_block", "vega.0727.pending_txns", "halley.0727.block_ids", ".opendistro-anomaly-detector-jobs", "halley.0727.token_info", "barnard.0727.blocks", ".tasks", "proxima.0727.pending_txns", "barnard.0727.txn_events", "vega.0727.dag_inspector_height_group", "main.0727.market_cap", "barnard.0727.txn_infos", "txn_infos", "barnard.0727.market_cap_bak", "opendistro-sample-http-responses", "halley.0727.txn_events", "main.0727.pending_txns", "vega.0727.txn_infos", "proxima.0727.transfer_journal", "proxima.0727.address_holder", "halley.0727.txn_infos", "barnard.0727.txn_payloads", "vega.0727.transfer_journal", "barnard.0727.918.address_holder", ".opendistro-anomaly-detectors", "barnard.0914.txn_infos", ".opendistro-reports-definitions", ".opendistro_security", "main.0727.txn_payloads", "main.0727.token_info", ".opendistro-job-scheduler-lock", "halley.0727.txn_payloads", "main.0727.txn_infos", ".opendistro-anomaly-results-history-2021.05.07-1", "proxima.0727.token_info", "barnard.0727.market_cap", ".opendistro-reports-instances", "barnard.0727.block_ids", "main.0727.transfer_journal", "halley.0727.transfer", "vega.0727.txn_payloads", "halley.0727.address_holder", "vega.0727.market_cap", "proxima.0727.transfer", "vega.0727.uncle_blocks", "vega.0727.address_holder", ".opendistro-anomaly-checkpoints", "vega.0727.token_info", "halley.0727.blocks", "barnard.0727.txn_infos_0915", "main.0727.transfer", "halley.0727.uncle_blocks", ".kibana_1", "barnard.0727.address_holder", "proxima.0727.txn_infos", "proxima.0727.blocks", "halley.0727.market_cap", "proxima.0727.uncle_blocks", "barnard.0727.transfer_journal", "barnard.0727.token_info", "main.0727.uncle_blocks", "barnard.0727.uncle_blocks", "main.0727.block_ids", "vega.0727.blocks", "proxima.0727.market_cap", "barnard.0401.txn_infos", "halley.0727.pending_txns", ".opendistro-anomaly-detection-state", "vega.0727.transfer", "proxima.0727.txn_payloads", "barnard.0727.pending_txns", "main.0727.txn_events", "test_index", "main.0727.blocks", "barnard.0727.transfer", "proxima.0727.block_ids", "main.0727.address_holder", ".kibana_-1666338091_elastic_1", "vega.0727.dag_inspector_edge", "proxima.0727.txn_events" ],
          "data_streams" : [ ],
          "include_global_state" : true,
          "state" : "SUCCESS",
          "start_time" : "2024-09-17T05:04:52.562Z",
          "start_time_in_millis" : 1726549492562,
          "end_time" : "2024-09-17T07:08:33.370Z",
          "end_time_in_millis" : 1726556913370,
          "duration_in_millis" : 7420808,
          "failures" : [ ],
          "shards" : {
            "total" : 381,
            "failed" : 0,
            "successful" : 381
          },
          "feature_states" : [ ]
        }
      ],
      "total" : 1,
      "remaining" : 0
    }

 

3. Restore the mounted snapshot data on the target cluster

  1. Run the following command to start the restore:

    POST _snapshot/s3_backup_repository/snapshot-20240917/_restore
    {
      "indices": "barnard.0727.uncle_blocks",
      "ignore_unavailable": true,
      "include_global_state": false
    }

    # Success
    { "acknowledge": true }
  2. Check the restore progress. Comparing the document count of the restored index with the same index on the source cluster is a simple way to confirm the restore has finished (a sketch of this check follows the list).

    GET /barnard.0727.uncle_blocks/_count

    {
      "count" : 1109354,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      }
    }
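
To make the check above repeatable, the sketch below compares the document count of one index on the source and target clusters and watches the ongoing recovery. The target address is a placeholder; the source (AWS OpenSearch) is queried with awscurl because it requires signed requests.

    INDEX=barnard.0727.uncle_blocks
    SRC='https://search-starcoin-es2-47avtmhexhbg7qtynzebcnnu64.ap-northeast-1.es.amazonaws.com'
    DST='http://localhost:9200'   # placeholder for the target cluster address

    # Document count on the source cluster (signed request via awscurl)
    awscurl --service es --region ap-northeast-1 "${SRC}/${INDEX}/_count?pretty"

    # Document count on the target cluster
    curl -s "${DST}/${INDEX}/_count?pretty"

    # Optionally watch shard recovery of the restored index on the target cluster
    curl -s "${DST}/_cat/recovery/${INDEX}?v"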

 

PostgreSQL Database

Migration using the pg_dump format

(This format is recommended for the migration.) This part is straightforward: since the data volume is small (1.7 GB), all of the data can simply be exported with the pg_dump tool and then imported afterwards.
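
The notes do not record the exact dump command; below is a minimal sketch of the export step, assuming a plain-SQL dump (which matches the 576184071779_dump_20240923.sql file used later). The endpoint, user and database name are placeholders. Importing is covered after the splitter script below.

    # Sketch only: endpoint, user and database are placeholders
    pg_dump -h <aws-rds-endpoint> -U <user> -d <database> \
            -f 576184071779_dump_20240923.sql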

Before importing, first run the following script to clear out the data already present in the existing database, so that stale data does not interfere. Note that schemaname must be changed to the schema you actually want to clean, to avoid truncating the wrong tables.

DO $$
DECLARE
    r RECORD;
BEGIN
    -- Disable all triggers
    SET session_replication_role = 'replica';

    -- Loop over all tables in the target schema (change 'barnard' in both places as needed)
    FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'barnard') LOOP
        -- Run TRUNCATE
        EXECUTE 'TRUNCATE TABLE barnard.' || quote_ident(r.tablename) || ' CASCADE';
    END LOOP;

    -- Re-enable triggers
    SET session_replication_role = 'origin';
END $$;

To keep the impact minimal, a small splitter script was written to separate the exported data into the main and barnard schemas:

# How to run
# python postgresql-dump-extractor.py \
#   -s barnard \
#   -i 576184071779_dump_20240923.sql \
#   -o 576184071779_dump_20240923_barnard.sql
import argparse
import re


def extract_copy_statements(input_file, output_file, schema):
    with open(input_file, 'r') as infile, open(output_file, 'w') as outfile:
        copy_block = []
        in_copy_block = False
        copy_pattern = re.compile(rf'^COPY {re.escape(schema)}\.(\w+)')

        for line in infile:
            if not in_copy_block:
                if copy_pattern.match(line):
                    in_copy_block = True
                    copy_block = [line]
            else:
                copy_block.append(line)
                # '\.' terminates a COPY data block in a plain-SQL dump
                if line.strip() == '\\.':
                    outfile.writelines(copy_block)
                    outfile.write('\n')
                    in_copy_block = False
                    copy_block = []


def main():
    parser = argparse.ArgumentParser(description='Extract COPY statements from PostgreSQL dump file.')
    parser.add_argument('-i', '--input', required=True, help='Input dump file name')
    parser.add_argument('-o', '--output', required=True, help='Output file name')
    parser.add_argument('-s', '--schema', required=True, help='Schema name')
    args = parser.parse_args()

    extract_copy_statements(args.input, args.output, args.schema)
    print(f"Extraction complete. Output written to {args.output}")


if __name__ == "__main__":
    main()

Run the following command to split out the data for a given schema (the schema has to be specified on each run). The extracted SQL file can then be imported with the psql command; a sketch of the import follows.

python postgresql-dump-extractor.py \
    -s barnard \
    -i 576184071779_dump_20240923.sql \
    -o 576184071779_dump_20240923_barnard.sql
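
Importing the extracted file is then a plain psql run; a minimal sketch, with the DigitalOcean host, user and database as placeholders:

    # Sketch only: host, user and database are placeholders
    psql -h <do-postgres-host> -U <user> -d <database> \
         -v ON_ERROR_STOP=1 \
         -f 576184071779_dump_20240923_barnard.sql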

 

Migration based on the AWS snapshot format

When a snapshot is exported from AWS RDS, RDS produces it in the Parquet format (https://parquet.apache.org/ ); presumably Parquet was chosen for compatibility across different data services. With a snapshot in this format, the data can be read by a Python script that parses the Parquet files, such as the following:

import pandas as pd
import psycopg2
from pathlib import Path
import os
import logging
import sys
import time
from datetime import datetime
from io import StringIO
import traceback


# Set up logging
def setup_logging():
    # Create the log directory
    log_dir = 'restore_logs'
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Build a timestamped log file name
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f'{log_dir}/restore_{timestamp}.log'

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)


def create_schemas(conn, logger):
    schemas = ['barnard', 'main', 'halley', 'proxima', 'starcoin_user']
    with conn.cursor() as cur:
        for schema in schemas:
            try:
                cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
                logger.info(f"Schema {schema} created or already exists")
            except Exception as e:
                logger.error(f"Error creating schema {schema}: {str(e)}")
                raise
    conn.commit()


def get_table_name(file_path):
    try:
        parts = str(file_path).split('/')
        full_table_name = parts[-3]  # e.g. 'barnard.address_holder'
        return full_table_name
    except Exception as e:
        raise ValueError(f"Invalid file path structure: {file_path}. Error: {str(e)}")


def get_postgres_type(pandas_type):
    type_mapping = {
        'int64': 'BIGINT',
        'float64': 'DOUBLE PRECISION',
        'object': 'TEXT',
        'bool': 'BOOLEAN',
        'datetime64[ns]': 'TIMESTAMP',
        'category': 'TEXT'
    }
    pg_type = type_mapping.get(str(pandas_type), 'TEXT')
    return pg_type


def import_parquet_to_postgres(parquet_dir, db_params):
    logger = setup_logging()
    logger.info(f"Starting import process from directory: {parquet_dir}")

    # Statistics
    stats = {
        'total_tables': 0,
        'successful_tables': 0,
        'failed_tables': 0,
        'total_rows': 0
    }

    try:
        conn = psycopg2.connect(**db_params)
        logger.info("Successfully connected to database")

        create_schemas(conn, logger)

        # Collect all parquet files
        parquet_files = list(Path(parquet_dir).rglob('*.parquet'))
        stats['total_tables'] = len(parquet_files)
        logger.info(f"Found {stats['total_tables']} tables to import")

        for path in parquet_files:
            table_name = get_table_name(path)
            logger.info(f"\nProcessing table: {table_name}")

            try:
                # Read the parquet file
                logger.info(f"Reading parquet file: {path}")
                df = pd.read_parquet(str(path))
                row_count = len(df)
                logger.info(f"Found {row_count} rows in {table_name}")

                # Collect column information
                columns = df.columns
                dtypes = df.dtypes
                logger.info(f"Columns found: {', '.join(columns)}")

                # Build the CREATE TABLE statement
                create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ("
                column_defs = []
                for col, dtype in zip(columns, dtypes):
                    pg_type = get_postgres_type(dtype)
                    column_defs.append(f"\"{col}\" {pg_type}")
                    logger.debug(f"Column {col}: {dtype} -> {pg_type}")
                create_table_sql += ", ".join(column_defs) + ")"

                with conn.cursor() as cur:
                    # Check whether the table already exists
                    schema, table = table_name.split('.')
                    cur.execute("""
                        SELECT COUNT(*)
                        FROM information_schema.tables
                        WHERE table_schema = %s AND table_name = %s
                    """, (schema, table))
                    table_exists = cur.fetchone()[0] > 0

                    if table_exists:
                        logger.warning(f"Table {table_name} already exists. Truncating...")
                        cur.execute(f"TRUNCATE TABLE {table_name}")
                    else:
                        logger.info(f"Creating table {table_name}")
                        cur.execute(create_table_sql)

                    # Bulk-insert the data with COPY
                    logger.info(f"Starting data import for {table_name}")
                    buffer = StringIO()
                    df.to_csv(buffer, index=False, header=False, sep='\t', na_rep='\\N')
                    buffer.seek(0)
                    cur.copy_from(buffer, table_name, columns=columns, null='\\N')

                    # Verify the number of imported rows
                    cur.execute(f"SELECT COUNT(*) FROM {table_name}")
                    imported_rows = cur.fetchone()[0]
                    if imported_rows == row_count:
                        logger.info(f"Successfully imported {imported_rows} rows into {table_name}")
                    else:
                        logger.warning(f"Row count mismatch in {table_name}. Expected: {row_count}, Imported: {imported_rows}")

                conn.commit()
                stats['successful_tables'] += 1
                stats['total_rows'] += row_count
                logger.info(f"Successfully imported table {table_name}")

            except Exception as e:
                stats['failed_tables'] += 1
                logger.error(f"Error processing table {table_name}:")
                logger.error(traceback.format_exc())
                conn.rollback()

        # Print the final statistics
        logger.info("\nImport Summary:")
        logger.info(f"Total tables processed: {stats['total_tables']}")
        logger.info(f"Successfully imported tables: {stats['successful_tables']}")
        logger.info(f"Failed tables: {stats['failed_tables']}")
        logger.info(f"Total rows imported: {stats['total_rows']}")

    except Exception as e:
        logger.error("Fatal error in import process:")
        logger.error(traceback.format_exc())
        raise
    finally:
        if 'conn' in locals():
            conn.close()
            logger.info("Database connection closed")


def check_postgres_connection(db_params, logger):
    max_attempts = 3
    attempt = 0
    while attempt < max_attempts:
        try:
            conn = psycopg2.connect(**db_params)
            conn.close()
            logger.info("Successfully connected to PostgreSQL")
            return True
        except psycopg2.OperationalError:
            attempt += 1
            logger.warning(f"Connection attempt {attempt} failed. Waiting 5 seconds...")
            time.sleep(5)
    logger.error("Could not connect to PostgreSQL after multiple attempts")
    return False


if __name__ == "__main__":
    logger = setup_logging()

    # Database connection parameters
    db_params = {
        'dbname': 'starocin-swapinfo',
        'user': os.environ.get('POSTGRES_USER', 'postgres'),
        'password': os.environ.get('POSTGRES_PASSWORD', 'your_password'),
        'host': 'localhost'
    }

    # Check database connectivity
    if not check_postgres_connection(db_params, logger):
        logger.error("PostgreSQL is not running or not accessible")
        sys.exit(1)

    try:
        parquet_dir = '/root/starcoin'
        import_parquet_to_postgres(parquet_dir, db_params)
    except Exception as e:
        logger.error("Import failed")
        logger.error(traceback.format_exc())
        sys.exit(1)