diff --git a/ecs/jskult-batch-ultmarc-io/src/aws/s3.py b/ecs/jskult-batch-ultmarc-io/src/aws/s3.py index 9c5b8f71..e5dbcb77 100644 --- a/ecs/jskult-batch-ultmarc-io/src/aws/s3.py +++ b/ecs/jskult-batch-ultmarc-io/src/aws/s3.py @@ -109,5 +109,5 @@ class VjskSendBucket(S3Bucket): # バックアップバケットにコピー vjsk_backup_bucket = VjskBackupBucket() dat_key = f'{self._send_folder}/{dat_file_key}' - backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' + backup_key = f'{vjsk_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key) diff --git a/ecs/jskult-batch/Dockerfile b/ecs/jskult-batch/Dockerfile index fc0fde90..674c6c53 100644 --- a/ecs/jskult-batch/Dockerfile +++ b/ecs/jskult-batch/Dockerfile @@ -15,6 +15,6 @@ RUN \ pip uninstall -y pipenv virtualenv-clone virtualenv COPY src ./src -COPY entrypoint.py entrypoint.py +COPY entrypoint.py entrypoint.py CMD ["python", "entrypoint.py"] diff --git a/ecs/jskult-batch/src/aws/s3.py b/ecs/jskult-batch/src/aws/s3.py index 27701f7b..c5868f68 100644 --- a/ecs/jskult-batch/src/aws/s3.py +++ b/ecs/jskult-batch/src/aws/s3.py @@ -85,7 +85,7 @@ class JskUltBackupBucket(S3Bucket): class JskBackupBucket(JskUltBackupBucket): - _folder = environment.JSKULT_BACKUP_BUCKET + _folder = environment.JSK_BACKUP_FOLDER class JskTransferListBucket(JskUltBackupBucket): @@ -118,8 +118,8 @@ class JskSendBucket(S3Bucket): def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str): # バックアップバケットにコピー - jskult_backup_bucket = JskUltBackupBucket() + jskult_backup_bucket = JskBackupBucket() dat_key = f'{self._send_folder}/{dat_file_key}' - backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' + backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' self._s3_client.copy(self._bucket_name, dat_key, jskult_backup_bucket._bucket_name, backup_key) \ No newline at end of file diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index 499cfa98..88c3b6c7 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -17,7 +17,7 @@ from src.manager.jskult_batch_status_manager import JskultBatchStatusManager from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager from src.system_var import constants -logger = get_logger('DCF削除新規マスタ作成') +logger = get_logger('DCF削除新規マスタ作成/データ出力') class DcfInstMergeIO(JskultBatchEntrypoint): @@ -26,9 +26,6 @@ class DcfInstMergeIO(JskultBatchEntrypoint): # 環境変数をimport self.environment = DCFInstMergeEnvironment() - def execute(self): - logger.info("DCF削除新規マスタ作成処理を開始します。") - # 必須の環境変数が設定されていない場合、エラーにする try: self.environment.validate() @@ -36,9 +33,14 @@ class DcfInstMergeIO(JskultBatchEntrypoint): logger.exception(e) return + def execute(self): + logger.info("DCF削除新規マスタ作成/データ出力処理を開始します。") + jskult_hdke_tbl_manager = JskultHdkeTblManager() jskult_batch_run_manager = JskultBatchRunManager( + self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME, self.environment.BATCH_EXECUTION_ID) + if not jskult_hdke_tbl_manager.can_run_process(): logger.error( '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。') @@ -58,6 +60,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}') # バッチ実行管理テーブルをfailedで登録 jskult_batch_run_manager.batch_failed() + return with open(transfer_list_file_path) as f: transfer_list = json.load(f) @@ -73,7 +76,6 @@ class DcfInstMergeIO(JskultBatchEntrypoint): receive_file_count ) try: - jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_START) try: @@ -85,7 +87,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): # バッチ実行管理テーブルに「retry」で登録 jskult_batch_run_manager.batch_retry() - + logger.info("起動条件を満たしていないため、DCF削除新規マスタ作成処理を終了します。") return except MaxRunCountReachedException: logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。') @@ -93,27 +95,39 @@ class DcfInstMergeIO(JskultBatchEntrypoint): jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_DOING) + # DCF削除新規マスタ作成、出力用にDB接続を開始。 + # トランザクションも開始。 + db = Database.get_instance() + db.connect() + db.to_jst() # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 if jskult_batch_status_manager.is_done_ultmarc_import(): + logger.info("アルトマークデータが取り込まれているため、DCF削除新規マスタ作成処理を開始します。") + db.begin() # COM_施設からDCF削除新規マスタに登録 (is_add_dcf_inst_merge, - duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self) + duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(db) if is_add_dcf_inst_merge: + logger.info('[NOTICE]DCF施設削除新規マスタが追加されました。') self._output_add_dcf_inst_merge_log( duplication_inst_records) + logger.info("DCF削除新規マスタ作成処理が正常終了しました。") + db.commit() - # CSV出力 - dcf_inst_merge_all_records = self._select_dcf_inst_merge_all() + # DCF施設削除新規マスタ出力 + logger.info('DCF施設削除新規マスタ出力を開始します。') + dcf_inst_merge_all_records = self._select_dcf_inst_merge_all(db) file_path = self._make_csv_data( self.environment.DCF_INST_MERGE_SEND_FILE_NAME, dcf_inst_merge_all_records) # CSVをS3にアップロード self._upload_dcf_inst_merge_csv_file( - file_path, process_date, self.environment.DCF_INST_MERGE_SEND_FILE_NAME) + self.environment.DCF_INST_MERGE_SEND_FILE_NAME, process_date, file_path) + logger.info("DCF施設削除新規マスタ出力が正常終了しました。") # 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録 - logger.info("DCF削除新規マスタ作成処理を正常終了します。") + logger.info("DCF削除新規マスタ作成/データ出力処理を終了します。") jskult_batch_run_manager.batch_success() jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_DONE) @@ -121,146 +135,136 @@ class DcfInstMergeIO(JskultBatchEntrypoint): return except Exception as e: + db.rollback() # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}') jskult_batch_run_manager.batch_failed() jskult_batch_status_manager.set_process_status( constants.PROCESS_STATUS_ERROR) - - def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: - try: - self._db = Database.get_instance() - self._db.connect() - sql = """\ - SELECT - * - FROM - src07.dcf_inst_merge - """ - dcf_inst_merge_all_records = self._db.execute_select(sql) - return dcf_inst_merge_all_records - except Exception as e: - raise BatchOperationException(e) finally: - self._db.disconnect() + db.disconnect() # com_instからdcf_inst_mergeにinsert - def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: + def _insert_dcf_inst_merge_from_com_inst(self, db: Database) -> tuple[bool, list[dict]]: try: - self._db = Database.get_instance() - self._db.connect() - self._db.begin() - self._db.to_jst() sql = """\ SELECT - ci.DCF_DSF_INST_CD, - ci.FORM_INST_NAME_KANJI, - ci.DELETE_SCHE_REASON_CD, - ci.DUP_OPP_CD, - ci.SYS_UPDATE_DATE + ci.dcf_dsf_inst_cd AS dcf_dsf_inst_cd, + ci.form_inst_name_kanji AS form_inst_name_kanji, + ci.dup_opp_cd AS dup_opp_cd, + ( + SELECT + dupci.form_inst_name_kanji + FROM + src05.com_inst AS dupci + WHERE + dupci.dcf_dsf_inst_cd = ci.dup_opp_cd + ) AS dup_inst_name_kanji, + DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), '%Y%m') AS start_month FROM - src05.COM_INST AS ci + src05.com_inst AS ci WHERE - ci.DUP_OPP_CD IS NOT NULL + (ci.dup_opp_cd IS NOT NULL OR CHAR_LENGTH(ci.dup_opp_cd) > 0) AND - ci.DELETE_SCHE_REASON_CD = 'D' + ci.delete_sche_reason_cd = 'D' AND - ci.DELETE_DATA IS NULL + ci.abolish_ymd IS NULL AND - ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() + ci.sys_update_date BETWEEN src07.get_syor_date() AND NOW() AND NOT EXISTS ( SELECT - dim.DCF_INST_CD + 1 FROM - src07.DCF_INST_MERGE AS dim + src07.dcf_inst_merge AS dim WHERE - dim.DCF_INST_CD = ci.DCF_DSF_INST_CD + dim.dcf_inst_cd = ci.dcf_dsf_inst_cd ) - AND - - (ci.DCF_DSF_INST_CD EXISTS( + AND( + EXISTS( SELECT - mia.INST_CD + 1 FROM - src07.MST_INST_ASSN as mia + src07.mst_inst_assn as mia WHERE - mia.INST_CD = ci.DCF_DSF_INST_CD + mia.inst_cd = ci.dcf_dsf_inst_cd ) + OR EXISTS( + SELECT + 1 + FROM + src07.atc_pharm AS ap + WHERE + ap.prsb_inst_cd = ci.dcf_dsf_inst_cd ) - OR ci.DCF_DSF_INST_CD EXISTS( + OR EXISTS( SELECT - ap.PRSB_INST_CD + 1 FROM - src07.ATC_PHARM AS ap + view07.vw_tebra_sales_refreshed AS vtsr WHERE - ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD - ) - OR ci.DCF_DSF_INST_CD EXISTS( - SELECT - trd.INST_CD - FROM - src07.TRN_RESULT_DATA AS trd - WHERE - trd.INST_CD = ci.DCF_DSF_INST_CD - ) + vtsr.cnvs_inst_cd = ci.dcf_dsf_inst_cd ) - ; + ); """ - duplication_inst_records = self._db.execute_select(sql) + duplication_inst_records = db.execute_select(sql) + + if len(duplication_inst_records) == 0: + logger.info('施設統合対象データはありません') + return (False, None) + # DCF削除新規マスタ取り込み values_clauses = [] params = {} for clauses_no, row in enumerate(duplication_inst_records, start=1): - dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" - dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" + dcf_inst_cd_arr = f"dcf_inst_cd{clauses_no}" + dup_opp_cd_arr = f"dup_opp_cd{clauses_no}" + start_month_arr = f'start_month{clauses_no}' values_clause = f"""(:{dcf_inst_cd_arr}, :{dup_opp_cd_arr}, - DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), + :{start_month_arr}, NULL, NULL, NULL, "Y", - batchuser, + CURRENT_USER(), SYSDATE(), - batchuser, + CURRENT_USER(), SYSDATE() )""" values_clauses.append(values_clause) - params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] - params[dup_opp_cd_arr] = row['DUP_OPP_CD'] + params[dcf_inst_cd_arr] = row['dcf_dsf_inst_cd'] + params[dup_opp_cd_arr] = row['dup_opp_cd'] + params[start_month_arr] = row['start_month'] insert_sql = f""" INSERT INTO src07.dcf_inst_merge ( - DCF_INST_CD, - DUP_OPP_CD, - START_MONTH, - INVALID_FLG, - REMARKS, - DCF_INST_CD_NEW, - ENABLED_FLG, - CREATER, - CREATE_DATE, - UPDATER, - UPDATE_DATE + dcf_inst_cd, + dup_opp_cd, + start_month, + invalid_flg, + remarks, + dcf_inst_cd_new, + enabled_flg, + creater, + create_date, + updater, + update_date ) VALUES {','.join(values_clauses)} """ - self._db.execute(insert_sql, params) + db.execute(insert_sql, params) return (True, duplication_inst_records) except Exception as e: - self._db.rollback() raise BatchOperationException(e) - finally: - self._db.disconnect() - def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): - sys_update_date = duplication_inst_records[0]['sys_update_date'] + def _output_add_dcf_inst_merge_log(self, duplication_inst_records: list[dict]): + start_month = duplication_inst_records[0]['start_month'] set_year_month = '{set_year}年{set_month}月'.format( - set_year=sys_update_date[0:4], - set_month=sys_update_date[-2:] + set_year=start_month[0:4], + set_month=start_month[-2:] ) add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}' add_dct_inst_merge_list = [] @@ -270,7 +274,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) # 顧客報告用にログ出力 logger.info( - f"""DCF施設統合マスタが追加されました。 + f"""DCF施設削除新規マスタが追加されました。 ********************************************************** 適用月度 {set_year_month} ********************************************************** @@ -281,7 +285,20 @@ class DcfInstMergeIO(JskultBatchEntrypoint): return - def _make_csv_data(csv_file_name: str, record_inst: list): + def _select_dcf_inst_merge_all(self, db: Database) -> tuple[bool, list[dict]]: + try: + sql = """\ + SELECT + * + FROM + src07.dcf_inst_merge + """ + dcf_inst_merge_all_records = db.execute_select(sql) + return dcf_inst_merge_all_records + except Exception as e: + raise BatchOperationException(e) + + def _make_csv_data(self, csv_file_name: str, record_inst: list): temporary_dir = tempfile.mkdtemp() csv_file_path = path.join(temporary_dir, csv_file_name) head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH', diff --git a/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env b/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env index 8469e66f..09723686 100644 --- a/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env +++ b/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env @@ -6,7 +6,6 @@ JSK_BACKUP_FOLDER=jsk/send TRANSFER_RESULT_FOLDER=transfer_result TRANSFER_RESULT_FILE_NAME=transfer_result.json DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv -JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config DB_CONNECTION_MAX_RETRY_ATTEMPT=1 DB_CONNECTION_RETRY_INTERVAL_INIT=1 DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1