"""DBダンプ取得""" import datetime import os import subprocess import textwrap from src.logging.get_logger import get_logger from src.system_var import constants, environment logger = get_logger('DBダンプ取得') def exec(): try: logger.info('DBダンプ取得:開始') # 事前処理(共通処理としては空振りする) _pre_exec() # メイン処理 # MySQL接続情報を作成する my_cnf_file_content = f""" [client] user={environment.DB_USERNAME} password={environment.DB_PASSWORD} host={environment.DB_HOST} """ # my.cnfファイルのパス my_cnf_path = os.path.join('my.cnf') # my.cnfファイルを生成する with open(my_cnf_path, 'w') as f: f.write(textwrap.dedent(my_cnf_file_content)[1:-1]) # ファイルのパーミッションが強いとmysqldumpコマンドが実行できないため # my.cnfファイルのパーミッションをread-onlyに設定 os.chmod(my_cnf_path, 0o444) dt_now = datetime.datetime.now() converted_value = dt_now.strftime('%Y%m%d%H%M%S%f') dump_file_name = f'backup_rds_{environment.DB_SCHEMA}_{converted_value}.gz' s3_file_path = f's3://{environment.DUMP_BACKUP_BUCKET}/{constants.DUMP_BACKUP_FOLDER}/{dt_now.year}/{dt_now.strftime("%m")}/{dt_now.strftime("%d")}/{dump_file_name}' # mysqldumpコマンドを実行し、dumpを取得する command = [ 'mysqldump', f'--defaults-file={my_cnf_path}', '-P', f"{environment.DB_PORT}", '--no-tablespaces', '--skip-column-statistics', '--single-transaction', '--set-gtid-purged=OFF', environment.DB_SCHEMA ] mysqldump_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # gzipコマンドを実行してdump結果を圧縮する gzip_process = subprocess.Popen(['gzip', '-c'], stdin=mysqldump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # aws s3 cpコマンドを実行してアップロードする s3_cp_process = subprocess.Popen(['aws', 's3', 'cp', '-', s3_file_path], stdin=gzip_process.stdout, stderr=subprocess.PIPE) # mysqldumpの標準出力をgzipに接続したため、標準出力をクローズする mysqldump_process.stdout.close() # gzipの標準出力をaws s3 cpに接続したため、標準出力をクローズする gzip_process.stdout.close() # パイプラインを実行し、エラーハンドリング _, error = mysqldump_process.communicate() if mysqldump_process.returncode != 0: raise Exception(f'`mysqldump`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') _, error = gzip_process.communicate() if gzip_process.returncode != 0: raise Exception(f'`gzip`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') _, error = s3_cp_process.communicate() if s3_cp_process.returncode != 0: raise Exception(f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') # 事後処理(共通処理としては空振りする) _post_exec() logger.info('DBダンプ取得:終了(正常終了)') logger.info(f'出力ファイルパス: {s3_file_path}') return constants.BATCH_EXIT_CODE_SUCCESS except Exception as e: logger.exception(f'DBダンプ取得中に想定外のエラーが発生しました :{e}') return constants.BATCH_EXIT_CODE_SUCCESS def _pre_exec(): """ ダンプ復元 事前処理 共通機能としては事前処理を実装しない。 事前処理が必要なダンプ復元処理を実装する場合、当ロジックをコピーする。 """ pass def _post_exec(): """ ダンプ復元 事後処理 共通機能としては事後処理を実装しない。 事後処理が必要なダンプ復元処理を実装する場合、当ロジックをコピーする。 """ pass