feat: ローカルのストレージを経由せず、パイプラインのみで実行。エラーハンドリングは実装できていない。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-07-11 16:31:14 +09:00
parent 2682824863
commit dbfe62e30b

View File

@ -1,17 +1,17 @@
"""日次バッチ処理前DBダンプ取得"""
import datetime
import textwrap
import subprocess
import os
from src.system_var import environment
import subprocess
import textwrap
from src.batch.batch_functions import (get_batch_statuses,
update_dump_status_kbn_in_complete,
update_dump_status_kbn_in_error,
update_dump_status_kbn_in_processing)
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
from src.batch.batch_functions import (
get_batch_statuses, update_dump_status_kbn_in_processing,
update_dump_status_kbn_in_complete, update_dump_status_kbn_in_error)
import gzip
from src.system_var import constants, environment
logger = get_logger('日次バッチ処理前DBダンプ取得')
@ -23,7 +23,7 @@ def exec():
# 日次バッチ処置中フラグ、dump処理状態区分を取得
batch_processing_flag, dump_status_kbn = get_batch_statuses()
except BatchOperationException as e:
logger.exception(f'日次ジョブ取得エラー(異常終了)\n{e}')
logger.exception(f'日次ジョブ取得エラー(異常終了):{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 日次バッチ処理中の場合、処理は行わない
@ -40,7 +40,7 @@ def exec():
try:
update_dump_status_kbn_in_processing()
except BatchOperationException as e:
logger.exception(f'dump処理状態区分更新(未処理→処理中) エラー(異常終了){e}')
logger.exception(f'dump処理状態区分更新(未処理→処理中) エラー(異常終了):{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# MySQL接続情報を作成する
@ -51,7 +51,7 @@ def exec():
host={environment.DB_HOST}
"""
# my.cnfファイルのパス
my_cnf_path = os.path.join('.', 'my.cnf')
my_cnf_path = os.path.join('my.cnf')
# my.cnfファイルを生成する
with open(my_cnf_path, 'w') as f:
@ -62,39 +62,37 @@ def exec():
file_name = f'backup_rds_src05_{converted_value}.gz'
s3_file_name = f's3://{environment.JSKULT_BACKUP_BUCKET}/{environment.DUMP_BACKUP_FOLDER}/{dt_now.year}/{dt_now.strftime("%m")}/{dt_now.strftime("%d")}/{file_name}'
mysqldump_cmd = [
'mysqldump',
'--user={mysql_user}'.format(mysql_user=environment.DB_USERNAME),
'--password={db_pw}'.format(db_pw=environment.DB_PASSWORD),
'--host={db_host}'.format(db_host=environment.DB_HOST),
'--port={db_port}'.format(db_port=environment.DB_PORT),
'--no-tablespaces',
'--skip-column-statistics',
'--single-transaction',
'--set-gtid-purged=OFF',
'{db_name}'.format(db_name=environment.DB_SCHEMA)
# 'mysqldump --login-path=dwhadmin --no-tablespaces --skip-column-statistics --single-transaction --set-gtid-purged=OFF dwh > /data/mountdwh/backup/%s/before/'
# mysqldumpコマンドを実行する
command = [
'mysqldump',
f'--defaults-file={my_cnf_path}',
'-P',
f"{environment.DB_PORT}",
'--no-tablespaces',
'--skip-column-statistics',
'--single-transaction',
'--set-gtid-purged=OFF',
environment.DB_SCHEMA
# 'src05'
]
# mysqldumpコマンドを実行してデータを標準出力に出力
mysqldump_output = subprocess.check_output(mysqldump_cmd)
# gzipで圧縮
compressed_data = gzip.compress(mysqldump_output)
# AWS CLIを使ってS3にアップロード
aws_cli_cmd = [
'aws',
's3',
'cp',
'-',
s3_file_name
]
subprocess.run(aws_cli_cmd, input=compressed_data)
mysqldump_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# gzipコマンドを実行してdump結果を圧縮する
gzip_process = subprocess.Popen(['gzip'], stdin=mysqldump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# aws s3 cpコマンドを実行してアップロードする
s3_cp_process = subprocess.Popen(['aws', 's3', 'cp', '-', s3_file_name], stdin=gzip_process.stdout, stderr=subprocess.PIPE)
# 出力を取得する
mysqldump_process.stdout.close()
gzip_process.stdout.close()
_, error = s3_cp_process.communicate()
if s3_cp_process.returncode != 0:
print("Error: ", error.decode("utf-8"))
raise Exception('Error')
# dump処理状態区分を正常終了に更新
try:
update_dump_status_kbn_in_complete()
except BatchOperationException as e:
logger.exception(f'dump処理状態区分更新(処理中→正常終了) エラー(異常終了)\n{e}')
logger.exception(f'dump処理状態区分更新(処理中→正常終了) エラー(異常終了):{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 正常終了を保守ユーザーに通知
@ -106,7 +104,7 @@ def exec():
try:
update_dump_status_kbn_in_error()
except BatchOperationException as e:
logger.exception(f'dump処理状態区分更新(処理中→エラー) エラー(異常終了)\n{e}')
logger.exception(f'dump処理状態区分更新(処理中→エラー) エラー(異常終了):{e}')
raise constants.BATCH_EXIT_CODE_SUCCESS
logger.exception(f'日次バッチ処理前DBダンプ取得中に想定外のエラーが発生しました \n{e}')
logger.exception(f'日次バッチ処理前DBダンプ取得中に想定外のエラーが発生しました :{e}')
raise e