From d243768e5858400f6341bdd1d6730f23971ed682 Mon Sep 17 00:00:00 2001 From: "mori.k" Date: Wed, 28 May 2025 12:09:43 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=AC=E3=83=93=E3=83=A5=E3=83=BC=E6=8C=87?= =?UTF-8?q?=E6=91=98=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/batch/common/batch_context.py | 48 --- .../src/batch/dcf_inst_merge_io.py | 403 +++++++++--------- .../src/system_var/environment.py | 4 +- 3 files changed, 202 insertions(+), 253 deletions(-) delete mode 100644 ecs/jskult-batch/src/batch/common/batch_context.py diff --git a/ecs/jskult-batch/src/batch/common/batch_context.py b/ecs/jskult-batch/src/batch/common/batch_context.py deleted file mode 100644 index b3fc4967..00000000 --- a/ecs/jskult-batch/src/batch/common/batch_context.py +++ /dev/null @@ -1,48 +0,0 @@ -class BatchContext: - __instance = None - __syor_date: str # 処理日(yyyy/mm/dd形式) - __is_not_business_day: bool # 日次バッチ起動日フラグ - __is_ultmarc_imported: bool # アルトマーク取込実施済フラグ - __is_vjsk_stock_import_day: bool # 卸在庫データ取込対象フラグ - - def __init__(self) -> None: - self.__is_not_business_day = False - self.__is_ultmarc_imported = False - - @classmethod - def get_instance(cls): - if cls.__instance is None: - cls.__instance = cls() - return cls.__instance - - @property - def syor_date(self): - return self.__syor_date - - @syor_date.setter - def syor_date(self, syor_date_str: str): - self.__syor_date = syor_date_str - - @property - def is_not_business_day(self): - return self.__is_not_business_day - - @is_not_business_day.setter - def is_not_business_day(self, flag: bool): - self.__is_not_business_day = flag - - @property - def is_ultmarc_imported(self): - return self.__is_ultmarc_imported - - @is_ultmarc_imported.setter - def is_ultmarc_imported(self, flag: bool): - self.__is_ultmarc_imported = flag - - @property - def is_vjsk_stock_import_day(self): - return self.__is_vjsk_stock_import_day - - @is_vjsk_stock_import_day.setter - def is_vjsk_stock_import_day(self, flag: bool): - self.__is_vjsk_stock_import_day = flag diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index e3e7ed12..a7298153 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -2,8 +2,7 @@ import csv import os.path as path import tempfile -from src.aws.s3 import JskSendBucket -from src.batch.common.batch_context import BatchContext +from src.aws.s3 import S3Client, JskSendBucket from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.db.database import Database from src.error.exceptions import BatchOperationException, MaxRunCountReachedException @@ -23,16 +22,24 @@ class DcfInstMergeIO(JskultBatchEntrypoint): def execute(self): jskultBatchRunManager = JskultBatchRunManager( environment.BATCH_EXECUTION_ID) + jskultHdkeTblManager = JskultHdkeTblManager() + + # /transfer_result/yyyy/mm/dd/ + jskult_backuo_folder_name = f"""/transfer_result/{jskultHdkeTblManager.get_batch_statuses()[2]}""" + + receive_file_count = S3Client.list_objects( + environment.JSKULT_BACKUP_BUCKET, jskult_backuo_folder_name).count() + jskultBatchStatusManager = JskultBatchStatusManager( environment.PROCESS_NAME, + + # TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え environment.POST_PROCESS, - environment.MAX_RUN_COUNT_FLG, - environment.RECEIVE_FILE_COUNT + environment.MAX_RUN_COUNT, + receive_file_count ) try: - jskultHdkeTblManager = JskultHdkeTblManager() - if not jskultHdkeTblManager.can_run_process(): logger.error( '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。') @@ -41,7 +48,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): jskultBatchStatusManager.set_process_status("start") try: if not jskultBatchStatusManager.can_run_post_process(): - # リトライ判断された場合 + # 後続処理の起動条件を満たしていない場合 # 処理ステータスを「処理待」に設定 jskultBatchStatusManager.set_process_status("waiting") @@ -57,21 +64,23 @@ class DcfInstMergeIO(JskultBatchEntrypoint): # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 if jskultBatchStatusManager.is_done_ultmarc_import(): - # + # COM_施設からDCF削除新規マスタに登録 (is_add_dcf_inst_merge, - duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) + duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self) if is_add_dcf_inst_merge: - # COM_施設からDCF削除新規マスタに登録 - _output_add_dcf_inst_merge_log(duplication_inst_records) - dcf_inst_merge_all_records = _select_dcf_inst_merge_all() + self._output_add_dcf_inst_merge_log( + duplication_inst_records) + dcf_inst_merge_all_records = self._select_dcf_inst_merge_all() # CSV出力 - file_path = _make_csv_data( - dcf_inst_merge_all_records, environment.CSV_FILE_NAME) + file_path = self._make_csv_data( + environment.DCF_INST_MERGE_SEND_FILE_NAME, + dcf_inst_merge_all_records) # CSVをS3にアップロード - _upload_dcf_inst_merge_csv_file( - file_path, environment.CSV_FILE_NAME) + + self._upload_dcf_inst_merge_csv_file( + file_path, environment.DCF_INST_MERGE_SEND_FILE_NAME) # 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録 logger.info("DCF削除新規マスタ作成処理を正常終了します。") @@ -79,210 +88,198 @@ class DcfInstMergeIO(JskultBatchEntrypoint): jskultBatchRunManager.batch_success() jskultBatchStatusManager.set_process_status("done") - except: + except Exception as e: # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 - logger.error("エラーが発生したため、DCF削除新規マスタ作成処理を終了します。") + logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}') + jskultBatchRunManager.batch_failed() jskultBatchStatusManager.set_process_status("failed") - def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: - try: - self._db = Database.get_instance() - self._db.connect() - self._db.begin() - self._db.to_jst() - sql = """\ - SELECT - * - FROM - src07.dcf_inst_merge - """ - dcf_inst_merge_all_records = self._db.execute_select(sql) - return dcf_inst_merge_all_records + def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: + try: + self._db = Database.get_instance() + self._db.connect() + sql = """\ + SELECT + * + FROM + src07.dcf_inst_merge + """ + dcf_inst_merge_all_records = self._db.execute_select(sql) + return dcf_inst_merge_all_records + except Exception as e: + raise BatchOperationException(e) + finally: + self._db.disconnect() - except Exception as e: - self._db.rollback() - raise BatchOperationException(e) - finally: - self._db.disconnect() + # com_instからdcf_inst_mergeにinsert + def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: - def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: - # com_instからdcf_inst_mergeにinsert - try: - self._db = Database.get_instance() - self._db.connect() - self._db.begin() - self._db.to_jst() - - sql = """\ - SELECT - ci.DCF_DSF_INST_CD, - ci.FORM_INST_NAME_KANJI, - ci.DELETE_SCHE_REASON_CD, - ci.DUP_OPP_CD, - ci.SYS_UPDATE_DATE - FROM - src05.COM_INST AS ci - WHERE - ci.DUP_OPP_CD IS NOT NULL - AND - ci.DELETE_SCHE_REASON_CD = 'D' - AND - ci.DELETE_DATA IS NULL - AND - ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() - AND - NOT EXISTS ( - SELECT - dim.DCF_INST_CD - FROM - src07.DCF_INST_MERGE AS dim - WHERE - dim.DCF_INST_CD = ci.DCF_DSF_INST_CD + try: + self._db = Database.get_instance() + self._db.connect() + self._db.begin() + self._db.to_jst() + sql = """\ + SELECT + ci.DCF_DSF_INST_CD, + ci.FORM_INST_NAME_KANJI, + ci.DELETE_SCHE_REASON_CD, + ci.DUP_OPP_CD, + ci.SYS_UPDATE_DATE + FROM + src05.COM_INST AS ci + WHERE + ci.DUP_OPP_CD IS NOT NULL + AND + ci.DELETE_SCHE_REASON_CD = 'D' + AND + ci.DELETE_DATA IS NULL + AND + ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() + AND + NOT EXISTS ( + SELECT + dim.DCF_INST_CD + FROM + src07.DCF_INST_MERGE AS dim + WHERE + dim.DCF_INST_CD = ci.DCF_DSF_INST_CD + ) + AND + + (ci.DCF_DSF_INST_CD EXISTS( + SELECT + mia.INST_CD + FROM + src07.MST_INST_ASSN as mia + WHERE + mia.INST_CD = ci.DCF_DSF_INST_CD + ) ) - AND - - (ci.DCF_DSF_INST_CD EXISTS( - SELECT - mia.INST_CD - FROM - src07.MST_INST_ASSN as mia - WHERE - mia.INST_CD = ci.DCF_DSF_INST_CD - ) + OR ci.DCF_DSF_INST_CD EXISTS( + SELECT + ap.PRSB_INST_CD + FROM + src07.ATC_PHARM AS ap + WHERE + ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD ) - OR ci.DCF_DSF_INST_CD EXISTS( - SELECT - ap.PRSB_INST_CD - FROM - src07.ATC_PHARM AS ap - WHERE - ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD - ) - OR ci.DCF_DSF_INST_CD EXISTS( - SELECT - trd.INST_CD - FROM - src07.TRN_RESULT_DATA AS trd - WHERE - trd.INST_CD = ci.DCF_DSF_INST_CD - ) + OR ci.DCF_DSF_INST_CD EXISTS( + SELECT + trd.INST_CD + FROM + src07.TRN_RESULT_DATA AS trd + WHERE + trd.INST_CD = ci.DCF_DSF_INST_CD ) - ; + ) + ; + """ + duplication_inst_records = self._db.execute_select(sql) + # DCF削除新規マスタ取り込み + values_clauses = [] + params = {} + for clauses_no, row in enumerate(duplication_inst_records, start=1): + dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" + dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" + values_clause = f"""(:{dcf_inst_cd_arr}, + :{dup_opp_cd_arr}, + DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), + NULL, + NULL, + NULL, + "Y", + batchuser, + SYSDATE(), + batchuser, + SYSDATE() + )""" + values_clauses.append(values_clause) + params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] + params[dup_opp_cd_arr] = row['DUP_OPP_CD'] + insert_sql = f""" + INSERT INTO + src07.dcf_inst_merge ( + DCF_INST_CD, + DUP_OPP_CD, + START_MONTH, + INVALID_FLG, + REMARKS, + DCF_INST_CD_NEW, + ENABLED_FLG, + CREATER, + CREATE_DATE, + UPDATER, + UPDATE_DATE + ) + VALUES + {','.join(values_clauses)} + """ + self._db.execute(insert_sql, params) + return (True, duplication_inst_records) + except Exception as e: + self._db.rollback() + raise BatchOperationException(e) + finally: + self._db.disconnect() - """ - duplication_inst_records = self._db.execute_select(sql) - - # DCF削除新規マスタ取り込み - values_clauses = [] - params = {} - for clauses_no, row in enumerate(duplication_inst_records, start=1): - dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" - dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" - values_clause = f"""(:{dcf_inst_cd_arr}, - :{dup_opp_cd_arr}, - DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), - NULL, - NULL, - NULL, - "Y", - batchuser, - SYSDATE(), - batchuser, - SYSDATE() - )""" - values_clauses.append(values_clause) - params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] - params[dup_opp_cd_arr] = row['DUP_OPP_CD'] - insert_sql = f""" - INSERT INTO - src07.dcf_inst_merge ( - DCF_INST_CD, - DUP_OPP_CD, - START_MONTH, - INVALID_FLG, - REMARKS, - DCF_INST_CD_NEW, - ENABLED_FLG, - CREATER, - CREATE_DATE, - UPDATER, - UPDATE_DATE - ) - VALUES - {','.join(values_clauses)} - """ - - self._db.execute(insert_sql, params) - - return (True, duplication_inst_records) - except Exception as e: - self._db.rollback() - raise BatchOperationException(e) - finally: - self._db.disconnect() - - def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): - sys_update_date = duplication_inst_records[0]['sys_update_date'] - set_year_month = '{set_year}年{set_month}月'.format( - set_year=sys_update_date[0:4], - set_month=sys_update_date[-2:] - ) - - add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}' - add_dct_inst_merge_list = [] - for row in duplication_inst_records: - add_dct_inst_merge_list.append( - add_dct_inst_merge.format(**row)) - add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) - # 顧客報告用にログ出力 - logger.info( - f"""DCF施設統合マスタが追加されました。 + def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): + sys_update_date = duplication_inst_records[0]['sys_update_date'] + set_year_month = '{set_year}年{set_month}月'.format( + set_year=sys_update_date[0:4], + set_month=sys_update_date[-2:] + ) + add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}' + add_dct_inst_merge_list = [] + for row in duplication_inst_records: + add_dct_inst_merge_list.append( + add_dct_inst_merge.format(**row)) + add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) + # 顧客報告用にログ出力 + logger.info( + f"""DCF施設統合マスタが追加されました。 ********************************************************** 適用月度 {set_year_month} ********************************************************** {add_dct_inst_merge_list} ********************************************************** 合計 {len(duplication_inst_records)}件""" - ) - return + ) + return - def _make_csv_data(csv_file_name: str, record_inst: list): - # CSVファイルを作成する - temporary_dir = tempfile.mkdtemp() - csv_file_path = path.join(temporary_dir, csv_file_name) + def _make_csv_data(csv_file_name: str, record_inst: list): + temporary_dir = tempfile.mkdtemp() + csv_file_path = path.join(temporary_dir, csv_file_name) + head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH', + 'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG', + 'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE'] + with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: + # ヘッダ行書き込み(くくり文字をつけない為にwriterowではなく、writeを使用しています) + csv_file.write(f"{','.join(head_str)}\n") + # UTF-8、CRLF、価囲いありで書き込む + writer = csv.writer(csv_file, delimiter=',', lineterminator='\r\n', + quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, + strict=True + ) + # データ部分書き込み(施設) + for record_inst_data in record_inst: + record_inst_value = list(record_inst_data.values()) + csv_data = [ + '' if n is None else n for n in record_inst_value] + writer.writerow(csv_data) + return csv_file_path - head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH', - 'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG', - 'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE'] + # CSVファイルをバックアップ + def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): + # S3バケットにファイルを移動 + jsk_send_bucket = JskSendBucket() - with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: - # ヘッダ行書き込み(くくり文字をつけない為にwriterowではなく、writeを使用しています) - csv_file.write(f"{','.join(head_str)}\n") + # 処理日を取得 + _, _, syor_date = JskultHdkeTblManager.get_batch_statuses() - # Shift-JIS、CRLF、価囲いありで書き込む - writer = csv.writer(csv_file, delimiter=',', lineterminator='\n', - quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, - strict=True - ) - - # データ部分書き込み(施設) - for record_inst_data in record_inst: - record_inst_value = list(record_inst_data.values()) - csv_data = [ - '' if n is None else n for n in record_inst_value] - writer.writerow(csv_data) - - return csv_file_path - - def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): - # S3バケットにファイルを移動 - jsk_send_bucket = JskSendBucket() - # バッチ共通設定を取得 - batch_context = BatchContext.get_instance() - - jsk_send_bucket.upload_dcf_inst_merge_csv_file( - csv_file_name, csv_file_path) - jsk_send_bucket.backup_dcf_inst_merge_csv_file( - csv_file_name, batch_context.syor_date) - return + jsk_send_bucket.upload_dcf_inst_merge_csv_file( + csv_file_name, csv_file_path) + jsk_send_bucket.backup_dcf_inst_merge_csv_file( + csv_file_name, syor_date) + return diff --git a/ecs/jskult-batch/src/system_var/environment.py b/ecs/jskult-batch/src/system_var/environment.py index 2d5d5f41..91a060e6 100644 --- a/ecs/jskult-batch/src/system_var/environment.py +++ b/ecs/jskult-batch/src/system_var/environment.py @@ -9,9 +9,9 @@ DB_SCHEMA = os.environ['DB_SCHEMA'] JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET'] BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID'] POST_PROCESS = os.environ["POST_PROCESS"] -MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"] +MAX_RUN_COUNT = os.environ["MAX_RUN_COUNT"] RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] -CSV_FILE_NAME = os.environ['CSV_FILE_NAME'] +DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME'] PROCESS_NAME = os.environ['PROCESS_NAME'] JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET'] JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET']