class JskIOBucket(S3Bucket):
    """S3 bucket used to exchange files with the JSK system (receive side)."""

    _bucket_name = environment.JSK_IO_BUCKET
    _recv_folder = environment.JSK_RECEIVE_FOLDER

    # Cache of the most recent list_objects result (None until first call).
    _s3_file_list = None

    def get_file_list(self):
        """List the objects under the receive folder, cache and return them."""
        self._s3_file_list = self._s3_client.list_objects(
            self._bucket_name, self._recv_folder)
        return self._s3_file_list

    def download_data_file(self, data_filename: str) -> str:
        """Download *data_filename* into a fresh temp directory.

        Returns:
            Path of the downloaded local file.
        """
        temporary_dir = tempfile.mkdtemp()
        temporary_file_path = path.join(
            temporary_dir, data_filename.replace(f"{self._recv_folder}/", ""))
        # NOTE(review): the open file handle is passed to download_file —
        # assumes the S3Client wrapper accepts a writable file object rather
        # than a destination path (boto3's download_file takes a path);
        # confirm against S3Client.download_file. The original also did a
        # pointless f.seek(0) right before closing; removed.
        with open(temporary_file_path, mode='wb') as f:
            self._s3_client.download_file(self._bucket_name, data_filename, f)
        return temporary_file_path

    def unzip_data_file(self, filename: str) -> list:
        """Gunzip *filename* next to itself; return the decompressed paths."""
        temp_dir = os.path.dirname(filename)
        decompress_filename = os.path.basename(filename).replace('.gz', '')
        decompress_file_path = os.path.join(temp_dir, decompress_filename)
        with gzip.open(filename, 'rb') as gz:
            with open(decompress_file_path, 'wb') as decompressed_file:
                shutil.copyfileobj(gz, decompressed_file)
        # Returned as a one-element list so callers can iterate uniformly.
        return [decompress_file_path]

    def transfer_file_to_import(self, target_file: dict):
        """Copy *target_file* ({"filename": key}) into the data-import bucket."""
        data_import_bucket = DataImportBucket()
        transfer_from_file_path = target_file.get("filename")
        transfer_to_filename = transfer_from_file_path.replace(
            f"{self._recv_folder}/", "")
        data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}'
        self._s3_client.copy(self._bucket_name, transfer_from_file_path,
                             data_import_bucket._bucket_name, data_import_key)

    def backup_file(self, target_file: dict, datetime_key: str):
        """Copy *target_file* into the backup bucket under *datetime_key*."""
        jsk_backup_bucket = JskBackupBucket()
        backup_from_file_path = target_file.get("filename")
        backup_to_filename = backup_from_file_path.replace(
            f"{self._recv_folder}/", "")
        backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
        self._s3_client.copy(self._bucket_name, backup_from_file_path,
                             jsk_backup_bucket._bucket_name, backup_key)

    def delete_file(self, target_file: dict):
        """Delete *target_file* ({"filename": key}) from this bucket."""
        delete_path = target_file.get("filename")
        self._s3_client.delete_file(self._bucket_name, delete_path)


class JskultArchiveBucket(S3Bucket):
    """S3 bucket that stores archived (zipped) past data."""

    _bucket_name = environment.JSKULT_ARCHIVE_BUCKET

    def upload_archive_zip_file(self, archive_zip: str, archive_zip_path: str,
                                send_folder: str) -> str:
        """Upload the zip at *archive_zip_path* under *send_folder*.

        Returns:
            "<bucket>/<key>" of the uploaded object.
        """
        # Move the file into the S3 bucket.
        archive_zip_name = f'{send_folder}/{archive_zip}'
        # NOTE(review): sibling bucket classes use self._s3_client; a fresh
        # S3Client() is created here instead — confirm this is intentional
        # and that S3Client is imported in this module.
        s3_client = S3Client()
        s3_client.upload_file(
            archive_zip_path, self._bucket_name, archive_zip_name)
        return f"{self._bucket_name}/{archive_zip_name}"


class DataImportBucket(S3Bucket):
    """Destination bucket/folder for files handed to the data-import step."""

    _bucket_name = environment.DATA_IMPORT_BUCKET
    _folder = environment.DATA_IMPORT_FOLDER
def exec():
    """実消化_過去データアーカイブ処理.

    For every row of the archive-manage table: export rows older than the
    cut-off date to CSV, zip it, upload the zip to S3, delete the archived
    rows from the DB, then advance the manage-table dates.
    """
    try:
        logger.info("処理開始:実消化_過去データアーカイブ処理")
        jskult_archive_manager = JskultArchiveManager()
        # Target table, filter column, cut-off date, interval (months),
        # previous cut-off and storage destination for every archive job.
        jskult_archive_manage_data_list = jskult_archive_manager.get_archive_manage()

        for jskult_archive_manage_data in jskult_archive_manage_data_list:
            # Rows of the target table whose filter column is on or before
            # the cut-off date.
            archive_data = jskult_archive_manager.get_archive_data(
                jskult_archive_manage_data["target_table"],
                jskult_archive_manage_data["filter_column"],
                jskult_archive_manage_data["filter_date"])
            # Nothing to archive for this table: skip it.
            if not archive_data:
                logger.info(
                    f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
                continue

            with tempfile.TemporaryDirectory() as temporary_dir:
                # File name: <table>_<range start YYYYMMDD>_<range end YYYYMMDD>.
                # strftime results are computed up front: re-using single
                # quotes inside a single-quoted f-string ('%Y%m%d') is a
                # SyntaxError on Python < 3.12 (PEP 701).
                day_after_prev_filter_date = (
                    jskult_archive_manage_data["prev_filter_date"]
                    + timedelta(days=1))
                range_start = day_after_prev_filter_date.strftime("%Y%m%d")
                range_end = jskult_archive_manage_data["filter_date"].strftime("%Y%m%d")
                file_name = f'{jskult_archive_manage_data["target_table"]}_{range_start}_{range_end}'
                csv_file_path = path.join(temporary_dir, f"{file_name}.csv")
                headers = archive_data[0].keys()
                # Explicit UTF-8 so the output does not depend on the
                # container's locale.
                with open(csv_file_path, 'w', newline='', encoding='utf-8') as file:
                    writer = csv.DictWriter(
                        file, fieldnames=headers, quoting=csv.QUOTE_ALL)
                    writer.writeheader()
                    writer.writerows(archive_data)
                logger.info(f"CSVファイル作成に成功しました。{file_name}.csv")

                # Compress the CSV into a zip. arcname keeps the temp-dir
                # path out of the archive (the original stored the absolute
                # /tmp/... path inside the zip).
                zip_file_path = path.join(temporary_dir, f"{file_name}.zip")
                with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    zipf.write(csv_file_path, arcname=f"{file_name}.csv")
                logger.info(f"zip形式への圧縮に成功しました。{file_name}.zip")

                # Upload the zip to the configured destination.
                archive_bucket = JskultArchiveBucket()
                upload_file_path = archive_bucket.upload_archive_zip_file(
                    f"{file_name}.zip", zip_file_path,
                    jskult_archive_manage_data["archive_storage"])
                logger.info(f"{upload_file_path}へのアップロードに成功しました。")

                # Only after a successful upload: purge the archived rows.
                jskult_archive_manager.delete_archive_data(
                    jskult_archive_manage_data["target_table"],
                    jskult_archive_manage_data["filter_column"],
                    jskult_archive_manage_data["filter_date"])
                logger.info(
                    f"アーカイブしたデータのDBから削除に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")

                # Advance prev/next cut-off dates for the next run.
                jskult_archive_manager.update_archive_manage(
                    jskult_archive_manage_data["target_table"])
                logger.info(
                    f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}")
        logger.info("処理終了:実消化_過去データアーカイブ処理")
    except Exception as e:
        # Top-level boundary of the batch: log with traceback and swallow.
        logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}")
'BATCH_MANAGE_DYNAMODB_TABLE_NAME') + super()._assert_variable_not_empty( + self.JSKULT_BACKUP_BUCKET, 'JSKULT_BACKUP_BUCKET') + super()._assert_variable_not_empty( + self.BATCH_MANAGE_DYNAMODB_TABLE_NAME, 'BATCH_MANAGE_DYNAMODB_TABLE_NAME') super()._assert_variable_not_empty(self.BATCH_EXECUTION_ID, 'BATCH_EXECUTION_ID') super()._assert_variable_is_int(self.MAX_RUN_COUNT, 'MAX_RUN_COUNT') # MAX_RUN_COUNTは数値として扱うため、検査後に変換 self.MAX_RUN_COUNT = int(self.MAX_RUN_COUNT) super()._assert_variable_not_empty(self.PROCESS_NAME, 'PROCESS_NAME') - super()._assert_variable_not_empty(self.TRANSFER_RESULT_FOLDER, 'TRANSFER_RESULT_FOLDER') - super()._assert_variable_not_empty(self.TRANSFER_RESULT_FILE_NAME, 'TRANSFER_RESULT_FILE_NAME') - + super()._assert_variable_not_empty( + self.TRANSFER_RESULT_FOLDER, 'TRANSFER_RESULT_FOLDER') + super()._assert_variable_not_empty( + self.TRANSFER_RESULT_FILE_NAME, 'TRANSFER_RESULT_FILE_NAME') + super()._assert_variable_not_empty(self.DATA_IMPORT_BUCKET, 'DATA_IMPORT_BUCKET') + super()._assert_variable_not_empty(self.DATA_IMPORT_FOLDER, 'DATA_IMPORT_FOLDER') diff --git a/ecs/jskult-batch/src/batch/update_business_day.py b/ecs/jskult-batch/src/batch/update_business_day.py index c689cc48..deafcde2 100644 --- a/ecs/jskult-batch/src/batch/update_business_day.py +++ b/ecs/jskult-batch/src/batch/update_business_day.py @@ -1,20 +1,23 @@ +import json +from datetime import datetime from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint -from src.aws.s3 import JskTransferListBucket -from src.batch.environment.trn_result_data_bio_lot_environment import \ - UpdateBusinessDayEnvironment -from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint +from src.aws.s3 import (JskTransferListBucket, JskIOBucket) +from src.batch.environment.update_business_day_environment import \ + UpdateBusinessDayEnvironment +from src.batch import archive_jsk_data from src.manager.jskult_batch_run_manager import JskultBatchRunManager from 
src.manager.jskult_batch_status_manager import JskultBatchStatusManager from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager -from src.db.database import Database -from src.error.exceptions import (BatchOperationException, +from src.manager.jskult_archive_manager import JskultArchiveManager +from src.error.exceptions import (NotReadyException, EnvironmentVariableNotSetException, - MaxRunCountReachedException) + ) from src.logging.get_logger import get_logger from src.system_var import constants logger = get_logger('日付テーブル更新') + class UpdateBusinessDay(JskultBatchEntrypoint): def __init__(self): super().__init__() @@ -28,5 +31,105 @@ class UpdateBusinessDay(JskultBatchEntrypoint): return def execute(self): - # TODO: ここで日付更新処理を実行する - pass + """日付テーブル更新""" + logger.info('I-1 処理開始: 実消化&アルトマーク_日付テーブル更新') + + jskult_hdke_tbl_manager = JskultHdkeTblManager() + jskult_archive_manager = JskultArchiveManager( + ) + + jskult_batch_status_manager = JskultBatchStatusManager( + self.environment.PROCESS_NAME, + # TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え + 'update_business_day', + self.environment.MAX_RUN_COUNT, + 0 + ) + + jskult_batch_run_manager = JskultBatchRunManager( + self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME, + self.environment.BATCH_EXECUTION_ID) + + jskult_batch_status_manager.set_process_status( + constants.PROCESS_STATUS_START) + + if not jskult_hdke_tbl_manager.can_run_process(): + logger.error( + '日次バッチ処理中でない、もしくはdump取得が正常終了していないため、日次バッチ処理を異常終了します。') + # バッチ実行管理テーブルをfailedで登録 + jskult_batch_run_manager.batch_failed() + raise NotReadyException() + + if not jskult_batch_status_manager.can_run_business_day_update(): + # 後続処理の起動条件を満たしていない場合 + # 処理ステータスを「処理待」に設定 + jskult_batch_status_manager.set_process_status( + constants.PROCESS_STATUS_WAITING) + + # バッチ実行管理テーブルに「retry」で登録 + jskult_batch_run_manager.batch_retry() + + logger.info('処理待:実消化&アルトマーク_日付テーブル更新') + + return + + _, _, archive_date, _, _, _ = jskult_archive_manager.get_archive_manage() + + 
_, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses() + + try: + if archive_date == process_date: + logger.info('[NOTICE]実消化データアーカイブ取得処理を実行します。') + # 処理ステータスを「処理中」に設定 + jskult_batch_status_manager.set_process_status( + constants.PROCESS_STATUS_DOING) + + jskult_archive_manager.get_archive_data() + archive_jsk_data.exec() + + dt = datetime.strptime(process_date, "%Y/%m/%d") + # 日付テーブルの処理年月日が月曜日の場合 + if dt.weekday() == 0: + if not JskultBatchStatusManager.is_done_ultmarc_import(): + logger.info('[NOTICE]アルトマーク取込稼働日でしたが、アルトマーク取込が行われませんでした。') + + # 転送ファイル一覧を取得 + try: + transfer_list_bucket = JskTransferListBucket() + transfer_list_file_path = transfer_list_bucket.download_transfer_result_file( + process_date) + except Exception as e: + logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}') + # バッチ実行管理テーブルをfailedで登録 + jskult_batch_run_manager.batch_failed() + + with open(transfer_list_file_path) as f: + transfer_list = json.load(f) + + try: + jsk_io_bucket = JskIOBucket() + jsk_receive_file_list: str = jsk_io_bucket.get_file_list() + except Exception as e: + logger.exception(f'実消化データリスト取得に失敗しました。{e}') + return constants.BATCH_EXIT_CODE_SUCCESS + + if transfer_list is not None: + # TODO jsk_receive_file_listと比較して結果不足ファイルがあった場合(I-5)ログ出力 + + # TODO jsk_receive_file_listと比較して受領予定にないファイルがあった場合(I-6)ログ出力 + pass + + JskultHdkeTblManager.update_batch_process_complete() + # 処理ステータスを「処理済」に設定 + jskult_batch_status_manager.set_process_status( + constants.PROCESS_STATUS_DONE) + + JskultBatchRunManager.batch_success() + + logger.info('[NOTICE]処理終了: 実消化&アルトマーク_日付テーブル更新') + except Exception as e: + # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 + jskult_batch_run_manager.batch_failed() + jskult_batch_status_manager.set_process_status( + constants.PROCESS_STATUS_ERROR) + logger.exception(f'異常終了:実消化&アルトマーク_日付テーブル更新。{e}') diff --git a/ecs/jskult-batch/src/error/exceptions.py b/ecs/jskult-batch/src/error/exceptions.py index a6db7ed7..b4673b28 100644 --- 
class JskultArchiveManager:
    """Data access for the archive-manage table and the archive targets."""

    _db: Database = None

    def __init__(self):
        self._db = Database.get_instance()

    def get_archive_manage(self):
        """Return every archive-manage row.

        Returns:
            List of dicts with target_table, filter_column, filter_date,
            run_interval_months, prev_filter_date, archive_storage.
        """
        try:
            logger.info("処理開始:get_archive_manage")
            sql = """
                select
                    target_table
                    , filter_column
                    , filter_date
                    , run_interval_months
                    , prev_filter_date
                    , archive_storage
                from
                    internal07.jskult_archive_manage;
            """
            self._db.connect()
            jskult_archive_manage_data = self._db.execute_select(sql)
            logger.info("処理終了:get_archive_manage")
            return jskult_archive_manage_data
        except Exception:
            # exception() keeps the message and records the traceback.
            logger.exception("異常終了:get_archive_manage")
            raise
        finally:
            self._db.disconnect()

    def get_archive_data(self, target_table: str, filter_column: str,
                         filter_date: str):
        """Return rows of *target_table* whose *filter_column* <= *filter_date*.

        NOTE(review): target_table/filter_column are interpolated as SQL
        identifiers (bound parameters cannot name tables/columns). They come
        from the internal archive-manage table — never pass user input here.
        """
        try:
            logger.info("処理開始:get_archive_data")
            sql = f"""
                select
                    *
                from
                    src07.{target_table}
                where
                    str_to_date({filter_column},'%Y%m%d') <= :filter_date;
            """
            self._db.connect()
            parameter_dict = {'filter_date': filter_date}
            target_table_data = self._db.execute_select(sql, parameter_dict)
            logger.info("処理終了:get_archive_data")
            return target_table_data
        except Exception:
            logger.exception("異常終了:get_archive_data")
            raise
        finally:
            self._db.disconnect()

    def delete_archive_data(self, target_table: str, filter_column: str,
                            filter_date: str):
        """Delete the rows that get_archive_data selects (same predicate)."""
        try:
            logger.info("処理開始:delete_archive_data")
            # Identifier interpolation: see the note on get_archive_data.
            sql = f"""
                delete from
                    src07.{target_table}
                where
                    str_to_date({filter_column},'%Y%m%d') <= :filter_date;
            """
            self._db.connect()
            self._db.begin()
            parameter_dict = {'filter_date': filter_date}
            self._db.execute(sql, parameter_dict)
            self._db.commit()
            logger.info("処理終了:delete_archive_data")
            return
        except Exception:
            self._db.rollback()
            logger.exception("異常終了:delete_archive_data")
            raise
        finally:
            self._db.disconnect()

    def update_archive_manage(self, target_table: str):
        """Advance prev_filter_date/filter_date for *target_table*'s row."""
        try:
            logger.info("処理開始:update_archive_manage")
            # target_table is a plain column VALUE here, so it is passed as a
            # bound parameter instead of being spliced into the SQL string.
            sql = """
                update internal07.jskult_archive_manage
                set
                    prev_filter_date = filter_date
                    , filter_date = LAST_DAY(
                        DATE_ADD(filter_date, INTERVAL run_interval_months MONTH)
                    )
                    , upd_user = CURRENT_USER()
                    , upd_date = NOW()
                where
                    target_table = :target_table;
            """
            self._db.connect()
            self._db.begin()
            self._db.execute(sql, {'target_table': target_table})
            self._db.commit()
            logger.info("処理終了:update_archive_manage")
            return
        except Exception:
            self._db.rollback()
            logger.exception("異常終了:update_archive_manage")
            raise
        finally:
            self._db.disconnect()
# Data-import destination and archive bucket; None when the variable is unset.
DATA_IMPORT_BUCKET = os.environ.get('DATA_IMPORT_BUCKET')
DATA_IMPORT_FOLDER = os.environ.get('DATA_IMPORT_FOLDER')
JSKULT_ARCHIVE_BUCKET = os.environ.get('JSKULT_ARCHIVE_BUCKET')

# 初期値がある環境変数