diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index 499cfa98..6937de3d 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -73,6 +73,11 @@ class DcfInstMergeIO(JskultBatchEntrypoint): receive_file_count ) try: + # DCF削除新規マスタ作成、出力用にDB接続を開始。 + # トランザクションも開始。 + db = Database.get_instance() + db.connect() + db.to_jst() jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_START) @@ -92,18 +97,19 @@ class DcfInstMergeIO(JskultBatchEntrypoint): jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_DOING) - # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 if jskult_batch_status_manager.is_done_ultmarc_import(): + db.begin() # COM_施設からDCF削除新規マスタに登録 (is_add_dcf_inst_merge, - duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self) + duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(db) if is_add_dcf_inst_merge: + logger.info('[NOTICE]DCF施設削除新規マスタが追加されました。') self._output_add_dcf_inst_merge_log( duplication_inst_records) - + db.commit() # CSV出力 - dcf_inst_merge_all_records = self._select_dcf_inst_merge_all() + dcf_inst_merge_all_records = self._select_dcf_inst_merge_all(db) file_path = self._make_csv_data( self.environment.DCF_INST_MERGE_SEND_FILE_NAME, dcf_inst_merge_all_records) @@ -121,100 +127,82 @@ class DcfInstMergeIO(JskultBatchEntrypoint): return except Exception as e: + db.rollback() # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}') jskult_batch_run_manager.batch_failed() jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_ERROR) - - def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: - try: - self._db = Database.get_instance() - self._db.connect() - sql = """\ - SELECT - * - FROM - src07.dcf_inst_merge - """ - dcf_inst_merge_all_records = self._db.execute_select(sql) - return dcf_inst_merge_all_records - except Exception as e: - raise BatchOperationException(e) finally: - self._db.disconnect() + db.disconnect() # com_instからdcf_inst_mergeにinsert - def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: + def _insert_dcf_inst_merge_from_com_inst(self, db: Database) -> tuple[bool, list[dict]]: try: - self._db = Database.get_instance() - self._db.connect() - self._db.begin() - self._db.to_jst() sql = """\ SELECT - ci.DCF_DSF_INST_CD, - ci.FORM_INST_NAME_KANJI, - ci.DELETE_SCHE_REASON_CD, - ci.DUP_OPP_CD, - ci.SYS_UPDATE_DATE + ci.dcf_dsf_inst_cd AS dcf_dsf_inst_cd, + ci.form_inst_name_kanji AS form_inst_name_kanji, + ci.delete_sche_reason_cd AS delete_sche_reason_cd, + ci.dup_opp_cd AS dup_opp_cd, + ci.sys_update_date AS sys_update_date FROM - src05.COM_INST AS ci + src05.com_inst AS ci WHERE - ci.DUP_OPP_CD IS NOT NULL + ci.dup_opp_cd IS NOT NULL AND - ci.DELETE_SCHE_REASON_CD = 'D' + ci.delete_sche_reason_cd = 'D' AND - ci.DELETE_DATA IS NULL + ci.delete_data IS NULL AND - ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() + ci.sys_update_date BETWEEN src07.get_syor_date() AND NOW() AND NOT EXISTS ( SELECT - dim.DCF_INST_CD + dim.dcf_inst_cd FROM - src07.DCF_INST_MERGE AS dim + src07.dcf_inst_merge AS dim WHERE - dim.DCF_INST_CD = ci.DCF_DSF_INST_CD + dim.dcf_inst_cd = ci.dcf_dsf_inst_cd ) AND - (ci.DCF_DSF_INST_CD EXISTS( + (ci.dcf_dsf_inst_cd EXISTS( SELECT - mia.INST_CD + mia.inst_cd FROM - src07.MST_INST_ASSN as mia + src07.mst_inst_assn as mia WHERE - mia.INST_CD = ci.DCF_DSF_INST_CD + mia.inst_cd = ci.dcf_dsf_inst_cd ) ) - OR ci.DCF_DSF_INST_CD EXISTS( + OR ci.dcf_dsf_inst_cd EXISTS( SELECT - ap.PRSB_INST_CD + ap.prsb_inst_cd FROM - src07.ATC_PHARM AS ap + src07.atc_pharm AS ap WHERE - ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD + ap.prsb_inst_cd = ci.dcf_dsf_inst_cd ) - OR ci.DCF_DSF_INST_CD EXISTS( + OR ci.dcf_dsf_inst_cd EXISTS( SELECT - trd.INST_CD + vtsr.inst_cd FROM - src07.TRN_RESULT_DATA AS trd + src07.vw_tebra_sales_refreshed AS vtsr WHERE - trd.INST_CD = ci.DCF_DSF_INST_CD + vtsr.inst_cd = ci.dcf_dsf_inst_cd ) ) ; """ - duplication_inst_records = self._db.execute_select(sql) + duplication_inst_records = db.execute_select(sql) # DCF削除新規マスタ取り込み values_clauses = [] params = {} for clauses_no, row in enumerate(duplication_inst_records, start=1): - dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" - dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" + dcf_inst_cd_arr = f"dcf_inst_cd{clauses_no}" + dup_opp_cd_arr = f"dup_opp_cd{clauses_no}" values_clause = f"""(:{dcf_inst_cd_arr}, :{dup_opp_cd_arr}, DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), @@ -228,33 +216,30 @@ class DcfInstMergeIO(JskultBatchEntrypoint): SYSDATE() )""" values_clauses.append(values_clause) - params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] - params[dup_opp_cd_arr] = row['DUP_OPP_CD'] + params[dcf_inst_cd_arr] = row['dcf_dsf_inst_cd'] + params[dup_opp_cd_arr] = row['dup_opp_cd'] insert_sql = f""" INSERT INTO src07.dcf_inst_merge ( - DCF_INST_CD, - DUP_OPP_CD, - START_MONTH, - INVALID_FLG, - REMARKS, - DCF_INST_CD_NEW, - ENABLED_FLG, - CREATER, - CREATE_DATE, - UPDATER, - UPDATE_DATE + dcf_inst_cd, + dup_opp_cd, + start_month, + invalid_flg, + remarks, + dcf_inst_cd_new, + enabled_flg, + creater, + create_date, + updater, + update_date ) VALUES {','.join(values_clauses)} """ - self._db.execute(insert_sql, params) + db.execute(insert_sql, params) return (True, duplication_inst_records) except Exception as e: - self._db.rollback() raise BatchOperationException(e) - finally: - self._db.disconnect() def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): sys_update_date = duplication_inst_records[0]['sys_update_date'] @@ -270,7 +255,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) # 顧客報告用にログ出力 logger.info( - f"""DCF施設統合マスタが追加されました。 + f"""DCF施設削除新規マスタが追加されました。 ********************************************************** 適用月度 {set_year_month} ********************************************************** @@ -281,6 +266,19 @@ class DcfInstMergeIO(JskultBatchEntrypoint): return + def _select_dcf_inst_merge_all(self, db: Database) -> tuple[bool, list[dict]]: + try: + sql = """\ + SELECT + * + FROM + src07.dcf_inst_merge + """ + dcf_inst_merge_all_records = db.execute_select(sql) + return dcf_inst_merge_all_records + except Exception as e: + raise BatchOperationException(e) + def _make_csv_data(csv_file_name: str, record_inst: list): temporary_dir = tempfile.mkdtemp() csv_file_path = path.join(temporary_dir, csv_file_name)