diff --git a/ecs/jskult-batch/.env.example b/ecs/jskult-batch/.env.example index 500f843d..d0bf48d3 100644 --- a/ecs/jskult-batch/.env.example +++ b/ecs/jskult-batch/.env.example @@ -18,6 +18,8 @@ VJSK_DATA_BUCKET=************* JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME=jskult_wholesaler_stock_input_day_list.txt JSKULT_CONFIG_CONVERT_FOLDER=jskult/convert JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME=ultmarc_hex_convert_config.json +TRANSFER_RESULT_FOLDER=transfer_result +TRANSFER_RESULT_FILE_NAME=transfer_result.json # 連携データ抽出期間 SALES_LAUNDERING_EXTRACT_DATE_PERIOD=0 # 洗替対象テーブル名 diff --git a/ecs/jskult-batch/src/aws/s3.py b/ecs/jskult-batch/src/aws/s3.py index 66032e1c..6e5755be 100644 --- a/ecs/jskult-batch/src/aws/s3.py +++ b/ecs/jskult-batch/src/aws/s3.py @@ -1,11 +1,7 @@ -import gzip -import os import os.path as path -import shutil import tempfile import boto3 - from src.system_var import environment @@ -77,7 +73,9 @@ class ConfigBucket(S3Bucket): temporary_dir = tempfile.mkdtemp() temporary_file_path = path.join( temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME) - wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}' + wholesaler_stock_input_day_list_key = \ + f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}' + with open(temporary_file_path, mode='wb') as f: self._s3_client.download_file( self._bucket_name, wholesaler_stock_input_day_list_key, f) @@ -105,6 +103,23 @@ class JskBackupBucket(JskUltBackupBucket): _folder = environment.JSKULT_BACKUP_BUCKET +class JskTransferListBucket(JskUltBackupBucket): + _folder = environment.TRANSFER_RESULT_FOLDER + + def download_transfer_result_file(self, process_date_yyyymmdd: str): + file_name = environment.TRANSFER_RESULT_FILE_NAME + # 一時ファイルとして保存する + temporary_dir = tempfile.mkdtemp() + temporary_file_path = path.join( + temporary_dir, 
file_name) + transfer_result_key = f'{self._folder}/{process_date_yyyymmdd}/{file_name}' + with open(temporary_file_path, mode='wb') as f: + self._s3_client.download_fileobj( + self._bucket_name, transfer_result_key, f) + f.seek(0) + return temporary_file_path + + class JskSendBucket(S3Bucket): _bucket_name = environment.JSK_IO_BUCKET _send_folder = environment.JSK_DATA_SEND_FOLDER diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index a7298153..ad311d42 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -1,16 +1,18 @@ import csv +import json import os.path as path import tempfile -from src.aws.s3 import S3Client, JskSendBucket +from src.aws.s3 import JskSendBucket, JskTransferListBucket from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.db.database import Database -from src.error.exceptions import BatchOperationException, MaxRunCountReachedException -from src.manager.jskult_batch_run_manager import JskultBatchRunManager -from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager -from src.manager.jskult_batch_status_manager import JskultBatchStatusManager -from src.system_var import environment +from src.error.exceptions import (BatchOperationException, + MaxRunCountReachedException) from src.logging.get_logger import get_logger +from src.manager.jskult_batch_run_manager import JskultBatchRunManager +from src.manager.jskult_batch_status_manager import JskultBatchStatusManager +from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager +from src.system_var import environment logger = get_logger('DCF削除新規マスタ作成') @@ -20,80 +22,92 @@ class DcfInstMergeIO(JskultBatchEntrypoint): super().__init__() def execute(self): - jskultBatchRunManager = JskultBatchRunManager( + jskult_hdke_tbl_manager = JskultHdkeTblManager() + jskult_batch_run_manager = JskultBatchRunManager( environment.BATCH_EXECUTION_ID) - 
jskultHdkeTblManager = JskultHdkeTblManager() + if not jskult_hdke_tbl_manager.can_run_process(): + logger.error( + '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。') + # バッチ実行管理テーブルをfailedで登録 + jskult_batch_run_manager.batch_failed() + return - # /transfer_result/yyyy/mm/dd/ - jskult_backuo_folder_name = f"""/transfer_result/{jskultHdkeTblManager.get_batch_statuses()[2]}""" + # 業務日付を取得 + _, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses() - receive_file_count = S3Client.list_objects( - environment.JSKULT_BACKUP_BUCKET, jskult_backuo_folder_name).count() + # 転送ファイル一覧を取得し、転送件数を取得 + try: + transfer_list_bucket = JskTransferListBucket() + transfer_list_file_path = transfer_list_bucket.download_transfer_result_file( + process_date) + except Exception as e: + logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}') + # バッチ実行管理テーブルをfailedで登録 + return jskult_batch_run_manager.batch_failed() - jskultBatchStatusManager = JskultBatchStatusManager( + with open(transfer_list_file_path) as f: + transfer_list = json.load(f) + + # 実消化データ + アルトマークデータの転送件数を合算し、受信ファイル件数とする + receive_file_count = len( + transfer_list['jsk_transfer_list']) + len(transfer_list['ult_transfer_list']) + + jskult_batch_status_manager = JskultBatchStatusManager( environment.PROCESS_NAME, - # TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え - environment.POST_PROCESS, + 'post_process', environment.MAX_RUN_COUNT, receive_file_count ) - try: - if not jskultHdkeTblManager.can_run_process(): - logger.error( - '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。') - return - jskultBatchStatusManager.set_process_status("start") + jskult_batch_status_manager.set_process_status("start") try: - if not jskultBatchStatusManager.can_run_post_process(): + if not jskult_batch_status_manager.can_run_post_process(): # 後続処理の起動条件を満たしていない場合 # 処理ステータスを「処理待」に設定 - jskultBatchStatusManager.set_process_status("waiting") + jskult_batch_status_manager.set_process_status("waiting") # バッチ実行管理テーブルに「retry」で登録 - jskultBatchRunManager.batch_retry() + 
jskult_batch_run_manager.batch_retry() return - except MaxRunCountReachedException as e: + except MaxRunCountReachedException: logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。') - jskultBatchStatusManager.set_process_status("doing") + jskult_batch_status_manager.set_process_status("doing") # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 - if jskultBatchStatusManager.is_done_ultmarc_import(): - + if jskult_batch_status_manager.is_done_ultmarc_import(): # COM_施設からDCF削除新規マスタに登録 (is_add_dcf_inst_merge, duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self) if is_add_dcf_inst_merge: - self._output_add_dcf_inst_merge_log( duplication_inst_records) - dcf_inst_merge_all_records = self._select_dcf_inst_merge_all() + # CSV出力 + dcf_inst_merge_all_records = self._select_dcf_inst_merge_all() file_path = self._make_csv_data( environment.DCF_INST_MERGE_SEND_FILE_NAME, dcf_inst_merge_all_records) # CSVをS3にアップロード - self._upload_dcf_inst_merge_csv_file( - file_path, environment.DCF_INST_MERGE_SEND_FILE_NAME) + file_path, process_date, environment.DCF_INST_MERGE_SEND_FILE_NAME) # 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録 logger.info("DCF削除新規マスタ作成処理を正常終了します。") + jskult_batch_run_manager.batch_success() + jskult_batch_status_manager.set_process_status("done") - jskultBatchRunManager.batch_success() - jskultBatchStatusManager.set_process_status("done") + return except Exception as e: # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}') - - jskultBatchRunManager.batch_failed() - jskultBatchStatusManager.set_process_status("failed") + jskult_batch_run_manager.batch_failed() + jskult_batch_status_manager.set_process_status("failed") def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: try: @@ -121,7 +135,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): self._db.begin() self._db.to_jst() sql = """\ - SELECT + SELECT ci.DCF_DSF_INST_CD, ci.FORM_INST_NAME_KANJI, ci.DELETE_SCHE_REASON_CD, @@ -137,7 +151,7 @@ class 
DcfInstMergeIO(JskultBatchEntrypoint): ci.DELETE_DATA IS NULL AND ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() - AND + AND NOT EXISTS ( SELECT dim.DCF_INST_CD @@ -147,11 +161,11 @@ class DcfInstMergeIO(JskultBatchEntrypoint): dim.DCF_INST_CD = ci.DCF_DSF_INST_CD ) AND - + (ci.DCF_DSF_INST_CD EXISTS( SELECT mia.INST_CD - FROM + FROM src07.MST_INST_ASSN as mia WHERE mia.INST_CD = ci.DCF_DSF_INST_CD @@ -160,7 +174,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): OR ci.DCF_DSF_INST_CD EXISTS( SELECT ap.PRSB_INST_CD - FROM + FROM src07.ATC_PHARM AS ap WHERE ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD @@ -201,7 +215,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): insert_sql = f""" INSERT INTO src07.dcf_inst_merge ( - DCF_INST_CD, + DCF_INST_CD, DUP_OPP_CD, START_MONTH, INVALID_FLG, @@ -246,6 +260,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): ********************************************************** 合計 {len(duplication_inst_records)}件""" ) + return def _make_csv_data(csv_file_name: str, record_inst: list): @@ -268,18 +283,18 @@ class DcfInstMergeIO(JskultBatchEntrypoint): csv_data = [ '' if n is None else n for n in record_inst_value] writer.writerow(csv_data) + return csv_file_path - # CSVファイルをバックアップ - def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): - # S3バケットにファイルを移動 + def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, process_date: str, csv_file_path: str): jsk_send_bucket = JskSendBucket() - # 処理日を取得 - _, _, syor_date = JskultHdkeTblManager.get_batch_statuses() - + # S3バケットにファイルをアップロード jsk_send_bucket.upload_dcf_inst_merge_csv_file( csv_file_name, csv_file_path) + + # CSVファイルをバックアップ jsk_send_bucket.backup_dcf_inst_merge_csv_file( - csv_file_name, syor_date) + csv_file_name, process_date) + return diff --git a/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py b/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py index 4a804ef3..f6c8a9f0 100644 --- 
a/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py +++ b/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py @@ -113,7 +113,7 @@ class JskultHdkeTblManager: finally: self._db.disconnect() # 日次バッチ処理中ではない場合、後続の処理は行わない - if batch_processing_flag != constants.BATCH_ACTF_BATCH_START: + if batch_processing_flag != constants.BATCH_ACTF_BATCH_START: return False # dump取得が正常終了していない場合、後続の処理は行わない if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE: diff --git a/ecs/jskult-batch/src/system_var/constants.py b/ecs/jskult-batch/src/system_var/constants.py index 8a0ccbb3..10cd7fb8 100644 --- a/ecs/jskult-batch/src/system_var/constants.py +++ b/ecs/jskult-batch/src/system_var/constants.py @@ -4,7 +4,7 @@ BATCH_EXIT_CODE_SUCCESS = 0 # バッチ処理中フラグ:未処理 BATCH_ACTF_BATCH_UNPROCESSED = '0' # バッチ処理中フラグ:処理中 -BATCH_ACTF_BATCH_IN_PROCESSING = '1' +BATCH_ACTF_BATCH_START = '1' # dump取得状態区分:未処理 DUMP_STATUS_KBN_UNPROCESSED = '0' # dump取得状態区分:dump取得正常終了 diff --git a/ecs/jskult-batch/src/system_var/environment.py b/ecs/jskult-batch/src/system_var/environment.py index 91a060e6..e4a230c3 100644 --- a/ecs/jskult-batch/src/system_var/environment.py +++ b/ecs/jskult-batch/src/system_var/environment.py @@ -8,9 +8,9 @@ DB_PASSWORD = os.environ['DB_PASSWORD'] DB_SCHEMA = os.environ['DB_SCHEMA'] JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET'] BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID'] -POST_PROCESS = os.environ["POST_PROCESS"] -MAX_RUN_COUNT = os.environ["MAX_RUN_COUNT"] -RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] +MAX_RUN_COUNT = os.environ['MAX_RUN_COUNT'] +TRANSFER_RESULT_FOLDER = os.environ['TRANSFER_RESULT_FOLDER'] +TRANSFER_RESULT_FILE_NAME = os.environ['TRANSFER_RESULT_FILE_NAME'] DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME'] PROCESS_NAME = os.environ['PROCESS_NAME'] JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']