"""日次バッチ処理前DBダンプ取得""" import datetime import os import subprocess import textwrap from src.batch.batch_functions import (get_batch_statuses, update_dump_status_kbn_complete, update_dump_status_kbn_error, update_dump_status_kbn_in_processing) from src.error.exceptions import BatchOperationException from src.logging.get_logger import get_logger from src.system_var import constants, environment logger = get_logger('日次バッチ処理前DBダンプ取得') def exec(): try: logger.info('日次バッチ処理前DBダンプ取得:開始') try: # 日次バッチ処置中フラグ、dump処理状態区分を取得 batch_processing_flag, dump_status_kbn = get_batch_statuses() except BatchOperationException as e: raise BatchOperationException(f'日付テーブル取得エラー(異常終了):{e}') # 日次バッチ処理中の場合、処理は行わない if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING: logger.error('日次バッチ処理中の為、日次バッチ処理前DBダンプ取得を終了します。') return constants.BATCH_EXIT_CODE_SUCCESS # dump処理状態区分が処理中の場合、処理は行わない if dump_status_kbn == constants.DUMP_STATUS_KBN_PROCESSED: logger.error(f'ダンプ取得中の為、日次バッチ処理前DBダンプ取得を終了します。 dump処理状態区分={dump_status_kbn}') return constants.BATCH_EXIT_CODE_SUCCESS # dump処理状態区分がエラーの場合、処理は行わない if dump_status_kbn == constants.DUMP_STATUS_KBN_ERROR: logger.error(f'ダンプ取得が実行不可能な状態の為、日次バッチ処理前DBダンプ取得を終了します。 dump処理状態区分={dump_status_kbn}') return constants.BATCH_EXIT_CODE_SUCCESS # dump処理状態区分を処理中に更新 try: update_dump_status_kbn_in_processing() except BatchOperationException as e: raise BatchOperationException(f'dump処理状態区分更新(未処理→処理中) エラー(異常終了):{e}') # MySQL接続情報を作成する my_cnf_file_content = f""" [client] user={environment.DB_USERNAME} password={environment.DB_PASSWORD} host={environment.DB_HOST} """ # my.cnfファイルのパス my_cnf_path = os.path.join('my.cnf') # my.cnfファイルを生成する with open(my_cnf_path, 'w') as f: f.write(textwrap.dedent(my_cnf_file_content)[1:-1]) dt_now = datetime.datetime.now() converted_value = dt_now.strftime('%Y%m%d%H%M%S') dump_file_name = f'backup_rds_src05_{converted_value}.gz' s3_file_path = f's3://{environment.JSKULT_BACKUP_BUCKET}/{environment.DUMP_BACKUP_FOLDER}/{dt_now.year}/{dt_now.strftime("%m")}/{dt_now.strftime("%d")}/{dump_file_name}' # mysqldumpコマンドを実行し、dumpを取得する command = [ 'mysqldump', f'--defaults-file={my_cnf_path}', '-P', f"{environment.DB_PORT}", '--no-tablespaces', '--skip-column-statistics', '--single-transaction', '--set-gtid-purged=OFF', environment.DB_SCHEMA ] mysqldump_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # gzipコマンドを実行してdump結果を圧縮する gzip_process = subprocess.Popen(['gzip', '-c'], stdin=mysqldump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # aws s3 cpコマンドを実行してアップロードする s3_cp_process = subprocess.Popen(['aws', 's3', 'cp', '-', s3_file_path], stdin=gzip_process.stdout, stderr=subprocess.PIPE) # mysqldumpの標準出力をgzipに接続したため、標準出力をクローズする mysqldump_process.stdout.close() # gzipの標準出力をaws s3 cpに接続したため、標準出力をクローズする gzip_process.stdout.close() # パイプラインを実行し、エラーハンドリング _, error = mysqldump_process.communicate() if mysqldump_process.returncode != 0: raise BatchOperationException(f'`mysqldump`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') _, error = gzip_process.communicate() if gzip_process.returncode != 0: raise BatchOperationException(f'`gzip`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') _, error = s3_cp_process.communicate() if s3_cp_process.returncode != 0: raise BatchOperationException(f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}') # dump処理状態区分を正常終了に更新 try: update_dump_status_kbn_complete() except BatchOperationException as e: raise BatchOperationException(f'dump処理状態区分更新(処理中→正常終了) エラー(異常終了):{e}') logger.info('日次バッチ処理前DBダンプ取得:終了(正常終了)') logger.info(f'出力ファイルパス: {s3_file_path}') return constants.BATCH_EXIT_CODE_SUCCESS except Exception as e: # dump処理状態区分をエラーに更新 try: update_dump_status_kbn_error() except BatchOperationException as e: logger.exception(f'dump処理状態区分更新(処理中→エラー) エラー(異常終了):{e}') return constants.BATCH_EXIT_CODE_SUCCESS logger.exception(f'日次バッチ処理前DBダンプ取得中に想定外のエラーが発生しました :{e}') return constants.BATCH_EXIT_CODE_SUCCESS