"""ダンプ復元スクリプト""" import os import subprocess import textwrap from src.logging.get_logger import get_logger from src.system_var import constants, environment logger = get_logger('ダンプ復元スクリプト') def exec(): try: logger.info('ダンプ復元スクリプト:開始') # 事前処理(共通処理としては空振りする) _pre_exec() # メイン処理 # MySQL接続情報を作成する my_cnf_file_content = f""" [client] user={environment.DB_USERNAME} password={environment.DB_PASSWORD} host={environment.DB_HOST} """ # my.cnfファイルのパス my_cnf_path = os.path.join('my.cnf') # my.cnfファイルを生成する with open(my_cnf_path, 'w') as f: f.write(textwrap.dedent(my_cnf_file_content)[1:-1]) os.chmod(my_cnf_path, 0o444) # DBへの接続エラーを早期に検出するため、事前にMySQLサーバーに接続 mysql_pre_process = subprocess.Popen( ['mysql', f'--defaults-file={my_cnf_path}', '-P', f"{environment.DB_PORT}", environment.DB_SCHEMA, '-N', '-e', 'SELECT 1;'], stderr=subprocess.PIPE ) _, error = mysql_pre_process.communicate() if mysql_pre_process.returncode != 0: logger.error( f'MySQLサーバーへの接続に失敗しました。{"" if error is None else error.decode("utf-8")}') return constants.BATCH_EXIT_CODE_SUCCESS # 復元対象のダンプファイルを特定 s3_file_path = environment.DUMP_FILE_S3_PATH # aws s3 cpコマンドを実行してdumpファイルをローカルにダウンロードする s3_cp_process = subprocess.Popen( ['aws', 's3', 'cp', s3_file_path, './dump.gz'], stderr=subprocess.PIPE) _, error = s3_cp_process.communicate() if s3_cp_process.returncode != 0: logger.error( f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') return constants.BATCH_EXIT_CODE_SUCCESS # S3コマンドの標準エラーはクローズしておく s3_cp_process.stderr.close() # gzipコマンドを実行してdumpファイルを解凍する gzip_process = subprocess.Popen( ['gunzip', '-c', './dump.gz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # mysqlコマンドを実行し、dumpを復元する mysql_process = subprocess.Popen( ['mysql', f'--defaults-file={my_cnf_path}', '-P', f"{environment.DB_PORT}", environment.DB_SCHEMA], stdin=gzip_process.stdout, stderr=subprocess.PIPE ) # gzipの標準出力をmysqlに接続したため、標準出力をクローズする gzip_process.stdout.close() _, error = mysql_process.communicate() if mysql_process.returncode != 0: logger.error( f'コマンド実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') return constants.BATCH_EXIT_CODE_SUCCESS # 事後処理(共通処理としては空振りする) _post_exec() logger.info('[NOTICE]ダンプ復元スクリプト:終了(正常終了)') return constants.BATCH_EXIT_CODE_SUCCESS except Exception as e: logger.exception(f'ダンプ復元スクリプト中に想定外のエラーが発生しました :{e}') return constants.BATCH_EXIT_CODE_SUCCESS def _pre_exec(): """ ダンプ復元 事前処理 共通機能としては事前処理を実装しない。 事前処理が必要なダンプ復元処理を実装する場合、当ロジックをコピーする。 """ pass def _post_exec(): """ ダンプ復元 事後処理 共通機能としては事後処理を実装しない。 事後処理が必要なダンプ復元処理を実装する場合、当ロジックをコピーする。 """ pass