...
执行以下命令进行同步
代码块 language json POST _snapshot/s3_backup_repository/snapshot-20240917/_restore { "indices": "barnard.0727.uncle_blocks", "ignore_unavailable": true, "include_global_state": false } # 成功 { "acknowledge": true }
查看同步进度:可以比较该索引的文档(doc)数量与源集群中对应索引的文档数量是否一致,以此判断同步是否完成。
代码块 language json GET /barnard.0727.uncle_blocks/_count { "count" : 1109354, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 } }
Postgresql数据库
...
pg_dump 格式的迁移
(推荐使用该格式进行迁移)该部分比较简单,由于数据量较小(1.7G),直接使用 pg_dump 工具将其所有数据直接导出即可,之后可以进行导入。
...
代码块 | ||
---|---|---|
| ||
python postgresql-dump-extractor.py -s barnard -i 576184071779_dump_20240923.sql -o 576184071779_dump_20240923_barnard.sql |
基于AWS snapshot格式的迁移
在使用 AWS RDS 创建并导出快照时,RDS 会将数据保存为 Parquet 格式(https://parquet.apache.org/)。(RDS 之所以采用 Parquet 格式,可能是出于兼容性以及便于在不同数据服务之间使用的考虑。)对于这种格式的快照,我们可以编写 Python 脚本读取 Parquet 文件并将数据导入 PostgreSQL,脚本如下:
代码块 | ||
---|---|---|
| ||
import pandas as pd
import psycopg2
from pathlib import Path
import os
import logging
import sys
from datetime import datetime
import traceback
# Logging setup
def setup_logging():
    """Configure root logging to a timestamped file plus stdout and return this module's logger.

    The log directory ``restore_logs`` is created if it is missing. Root
    logging is only configured when the root logger has no handlers yet:
    ``logging.basicConfig`` ignores repeat calls anyway, so building a new
    ``FileHandler`` on every call would just leave a stray, never-used open
    log file behind.

    Returns:
        logging.Logger: the logger named after this module.
    """
    log_dir = 'restore_logs'
    # exist_ok avoids the check-then-create race of an explicit exists() test.
    os.makedirs(log_dir, exist_ok=True)

    if not logging.getLogger().handlers:
        # Log file name carries a timestamp so each run gets its own file.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_file = f'{log_dir}/restore_{timestamp}.log'
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit UTF-8: the script logs non-ASCII messages, which
                # would fail to encode under a non-UTF-8 locale default.
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout),
            ],
        )
    return logging.getLogger(__name__)
def create_schemas(conn, logger):
    """Create every schema the import targets (if absent) and commit.

    Args:
        conn: open database connection providing ``cursor()`` and ``commit()``.
        logger: logger used to report each schema's outcome.

    Raises:
        Exception: whatever the database raised for a failed CREATE SCHEMA;
            the error is logged first and then re-raised unchanged.
    """
    schemas = ('barnard', 'main', 'halley', 'proxima', 'starcoin_user')
    with conn.cursor() as cursor:
        for schema in schemas:
            try:
                cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
                logger.info(f"Schema {schema} created or already exists")
            except Exception as exc:
                logger.error(f"Error creating schema {schema}: {str(exc)}")
                raise
    conn.commit()
def get_table_name(file_path):
    """Return the table name encoded in a parquet file's path.

    The name is the directory two levels above the file, i.e. for
    ``.../barnard.address_holder/1/part-0.parquet`` the result is
    ``'barnard.address_holder'``.

    Args:
        file_path: ``str`` or ``pathlib.Path`` pointing at a parquet file.

    Returns:
        str: the name of the grandparent directory.

    Raises:
        ValueError: if the path is not path-like or has no named
            grandparent directory (too few components).
    """
    try:
        # Path handles the platform's separators; splitting on '/' by hand
        # breaks on Windows-style paths.
        table_dir = Path(file_path).parent.parent
    except TypeError as e:
        raise ValueError(f"Invalid file path structure: {file_path}. Error: {str(e)}") from e

    full_table_name = table_dir.name
    if not full_table_name:
        # The grandparent is the root or the current directory: there is no
        # table directory, so fail loudly instead of returning ''.
        raise ValueError(
            f"Invalid file path structure: {file_path}. "
            "Error: expected <table_dir>/<sub_dir>/<file>"
        )
    return full_table_name
def get_postgres_type(pandas_type):
    """Map a pandas/numpy dtype (or its string name) to a PostgreSQL column type.

    Args:
        pandas_type: a dtype object or dtype name; it is compared by ``str()``.

    Returns:
        str: the PostgreSQL type name. Unknown dtypes fall back to ``'TEXT'``.
    """
    type_mapping = {
        'int8': 'SMALLINT',
        'int16': 'SMALLINT',
        'int32': 'INTEGER',
        'int64': 'BIGINT',
        'float32': 'REAL',
        'float64': 'DOUBLE PRECISION',
        'object': 'TEXT',
        'bool': 'BOOLEAN',
        'category': 'TEXT',
    }
    dtype_name = str(pandas_type)

    # Timestamps may arrive in any resolution ("datetime64[us]", "[ms]", ...)
    # and may be tz-aware ("datetime64[ns, UTC]"), so match on the prefix
    # rather than on the single "[ns]" spelling.
    if dtype_name.startswith('datetime64['):
        return 'TIMESTAMP WITH TIME ZONE' if ',' in dtype_name else 'TIMESTAMP'

    return type_mapping.get(dtype_name, 'TEXT')
def _group_parquet_files_by_table(parquet_dir):
    """Return {table_name: [parquet part files]} for every parquet file under parquet_dir."""
    files_by_table = {}
    for path in sorted(Path(parquet_dir).rglob('*.parquet')):
        files_by_table.setdefault(get_table_name(path), []).append(path)
    return files_by_table


def _prepare_table(cur, schema, table, qualified_table, df, logger):
    """Truncate the target table if it exists, otherwise create it from df's columns."""
    from psycopg2 import sql

    cur.execute(
        """
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema = %s
        AND table_name = %s
        """,
        (schema, table),
    )
    if cur.fetchone()[0] > 0:
        logger.warning(f"Table {schema}.{table} already exists. Truncating...")
        cur.execute(sql.SQL("TRUNCATE TABLE {}").format(qualified_table))
        return

    logger.info(f"Creating table {schema}.{table}")
    column_defs = []
    for col, dtype in zip(df.columns, df.dtypes):
        pg_type = get_postgres_type(dtype)
        logger.debug(f"Column {col}: {dtype} -> {pg_type}")
        # pg_type comes from get_postgres_type's fixed vocabulary, so it is
        # safe to embed as raw SQL; the column name is quoted as an identifier.
        column_defs.append(
            sql.SQL("{} {}").format(sql.Identifier(str(col)), sql.SQL(pg_type))
        )
    cur.execute(
        sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
            qualified_table, sql.SQL(", ").join(column_defs)
        )
    )


def _copy_dataframe(cur, qualified_table, df):
    """Bulk-load df into the table with COPY ... FROM STDIN in CSV format."""
    from io import StringIO

    from psycopg2 import sql

    buffer = StringIO()
    # pandas writes real CSV (quoted fields, doubled quotes), so the server
    # must parse it as CSV too; COPY's text format would misread the quoting.
    df.to_csv(buffer, index=False, header=False, na_rep='\\N')
    buffer.seek(0)
    copy_sql = sql.SQL(
        "COPY {} ({}) FROM STDIN WITH (FORMAT csv, NULL '\\N')"
    ).format(
        qualified_table,
        sql.SQL(", ").join(sql.Identifier(str(col)) for col in df.columns),
    )
    # copy_expert instead of copy_from: copy_from quotes its table argument
    # as a single identifier in psycopg2 >= 2.9, which breaks "schema.table".
    cur.copy_expert(copy_sql, buffer)


def _import_table(conn, table_name, part_files, logger):
    """Load every part file of one table and return the number of rows read.

    The table is truncated/created once, before the first part is copied, so
    later parts append instead of wiping what earlier parts loaded.
    """
    from psycopg2 import sql

    schema, table = table_name.split('.')
    qualified_table = sql.SQL("{}.{}").format(
        sql.Identifier(schema), sql.Identifier(table)
    )
    expected_rows = 0
    with conn.cursor() as cur:
        for index, path in enumerate(part_files):
            logger.info(f"Reading parquet file: {path}")
            df = pd.read_parquet(str(path))
            logger.info(f"Found {len(df)} rows in {path}")
            logger.info(f"Columns found: {', '.join(map(str, df.columns))}")
            if index == 0:
                _prepare_table(cur, schema, table, qualified_table, df, logger)
            logger.info(f"Starting data import for {table_name}")
            _copy_dataframe(cur, qualified_table, df)
            expected_rows += len(df)

        # Verify the imported row count against what was read from disk.
        cur.execute(sql.SQL("SELECT COUNT(*) FROM {}").format(qualified_table))
        imported_rows = cur.fetchone()[0]

    if imported_rows == expected_rows:
        logger.info(f"Successfully imported {imported_rows} rows into {table_name}")
    else:
        logger.warning(
            f"Row count mismatch in {table_name}. "
            f"Expected: {expected_rows}, Imported: {imported_rows}"
        )
    return expected_rows


def import_parquet_to_postgres(parquet_dir, db_params):
    """Import every parquet file under parquet_dir into PostgreSQL.

    Files are grouped by the table name derived from their path
    (``get_table_name``). Each table is loaded in its own transaction: it is
    committed on success and rolled back on failure, and a failing table does
    not stop the remaining tables.

    Args:
        parquet_dir: directory searched recursively for ``*.parquet`` files.
        db_params: keyword arguments passed to ``psycopg2.connect``.

    Raises:
        Exception: any error outside the per-table import (connecting,
            creating schemas, deriving table names) is logged and re-raised.
    """
    logger = setup_logging()
    logger.info(f"Starting import process from directory: {parquet_dir}")

    # Running totals reported in the final summary.
    stats = {
        'total_tables': 0,
        'successful_tables': 0,
        'failed_tables': 0,
        'total_rows': 0
    }

    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        logger.info("Successfully connected to database")
        create_schemas(conn, logger)

        files_by_table = _group_parquet_files_by_table(parquet_dir)
        stats['total_tables'] = len(files_by_table)
        logger.info(f"Found {stats['total_tables']} tables to import")

        for table_name, part_files in files_by_table.items():
            logger.info(f"\nProcessing table: {table_name}")
            try:
                row_count = _import_table(conn, table_name, part_files, logger)
                conn.commit()
                stats['successful_tables'] += 1
                stats['total_rows'] += row_count
                logger.info(f"Successfully imported table {table_name}")
            except Exception:
                # Best effort per table: record the failure, undo the partial
                # load and continue with the next table.
                stats['failed_tables'] += 1
                logger.error(f"Error processing table {table_name}:")
                logger.error(traceback.format_exc())
                conn.rollback()

        # Final summary.
        logger.info("\nImport Summary:")
        logger.info(f"Total tables processed: {stats['total_tables']}")
        logger.info(f"Successfully imported tables: {stats['successful_tables']}")
        logger.info(f"Failed tables: {stats['failed_tables']}")
        logger.info(f"Total rows imported: {stats['total_rows']}")
    except Exception:
        logger.error("Fatal error in import process:")
        logger.error(traceback.format_exc())
        raise
    finally:
        if conn is not None:
            conn.close()
            logger.info("Database connection closed")
def check_postgres_connection(db_params, logger):
    """Check that PostgreSQL accepts a connection, retrying a few times.

    Args:
        db_params: keyword arguments passed to ``psycopg2.connect``.
        logger: logger used to report each attempt.

    Returns:
        bool: True as soon as one connection succeeds, False after
        ``max_attempts`` consecutive ``psycopg2.OperationalError`` failures.
    """
    # Imported locally because the file's top-level imports do not include it.
    import time

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(**db_params)
        except psycopg2.OperationalError:
            if attempt < max_attempts:
                logger.warning(f"Connection attempt {attempt} failed. Waiting 5 seconds...")
                time.sleep(5)
            else:
                # No point in sleeping when there is no further attempt.
                logger.warning(f"Connection attempt {attempt} failed.")
        else:
            conn.close()
            logger.info("Successfully connected to PostgreSQL")
            return True

    logger.error("Could not connect to PostgreSQL after multiple attempts")
    return False
if __name__ == "__main__":
    # Database connection parameters; user and password can be overridden
    # through the POSTGRES_USER / POSTGRES_PASSWORD environment variables.
    db_params = {
        # NOTE(review): spelled "starocin" here while other names in this
        # script use "starcoin" - confirm it matches the real database name.
        'dbname': 'starocin-swapinfo',
        'user': os.environ.get('POSTGRES_USER', 'postgres'),
        'password': os.environ.get('POSTGRES_PASSWORD', 'your_password'),
        'host': 'localhost'
    }

    # The logger must exist before the connectivity check below uses it.
    logger = setup_logging()

    # Check database connectivity before starting the import.
    if not check_postgres_connection(db_params, logger):
        logger.error("PostgreSQL is not running or not accessible")
        sys.exit(1)

    try:
        parquet_dir = '/root/starcoin'
        import_parquet_to_postgres(parquet_dir, db_params)
    except Exception:
        logger.error("程序执行失败")
        logger.error(traceback.format_exc())
        sys.exit(1)
|