From 626ee291c36245e5fc6562f3114c29a0261998a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E9=96=93?= Date: Mon, 26 Jun 2023 08:54:13 +0900 Subject: [PATCH] =?UTF-8?q?=E4=BD=9C=E6=A5=AD=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/batch/jskult_batch_monthly.py | 236 ++++++++++++++++++ .../src/jobctrl_monthly.py | 5 +- 2 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py diff --git a/ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py b/ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py new file mode 100644 index 00000000..3cbecd15 --- /dev/null +++ b/ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py @@ -0,0 +1,236 @@ + +from datetime import datetime + +# from src.aws.s3 import UltmarcBucket +# from src.batch.common.batch_context import BatchContext +from src.db.database import Database +from src.error.exceptions import BatchOperationException +from src.logging.get_logger import get_logger +# from src.system_var import constants +import pathlib +import os + +logger = get_logger('実消化&アルトマーク月次バッチ') + + +class JskultBathcMonthly(): + """ 実消化&アルトマーク月次バッチ """ + + # WKテーブルの過去分削除SQL + PHYSICAL_NORMAL_DELETE_QUERY = """\ + DELETE FROM src05.wk_inst_aris_if + """ + + # 正常系データを取得しWKテーブルに保存SQL + NORMAL_INSERT_SELECT_QUERY = """\ + INSERT src05.wk_inst_aris_if + SELECT + TRIM(' ' FROM TRIM(' ' FROM SUBSTRIN(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd + ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form + ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name + ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form + ,TRIM(' ' FROM TRIM(' ' FROM ci.prefc_cd)) AS pref_cd + ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(cp.prefc_name,1,8))) AS pref_name + ,TRIM(' ' FROM TRIM(' ' FROM ci.postal_number)) AS postal_cd + ,TRIM(' ' FROM TRIM(' ' FROM cc.city_name)) AS city_name + ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_addr)) AS address + ,TRIM(' ' FROM TRIM(' ' FROM cd.inst_div_name)) + ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_phone_number)) AS phone_no + ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_div_cd)) + ,TRIM(' ' FROM TRIM(' ' FROM ci.manage_cd)) + ,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date + ,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date + ,sysdate() + FROM src05.com_inst ci + LEFT JOIN src05.mst_prefc cp + ON ci.prefc_cd = cp.prefc_cd + LEFT JOIN src05.mst_city cc + ON ci.prefc_cd = cc.prefc_cd + AND ci.city_cd = cc.city_cd + LEFT OUTER JOIN src05.JOIN com_inst_div cd + ON ci.inst_div_cd = cd.inst_div_cd + WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%' + AND ci.dcf_dsf_inst_cd IS NOT NULL + AND ci.form_inst_name_kanji IS NOT NULL + AND ci.prefc_cd IS NOT NULL + AND cp.prefc_name IS NOT NULL + AND cc.city_name IS NOT NULL + AND ci.inst_addr IS NOT NULL + ORDER BY ci.dcf_dsf_inst_cd + """ + + # 正常系データの件数を取得SQL + NORMAL_COUNT_QUERY = """\ + SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if + """ + + # 異常系WKテーブルの過去分削除SQL + PHYSICAL_ABNORMAL_DELETE_QUERY = """\ + DELETE FROM src05.wk_inst_aris_if_wrn + """ + + # 異常系データを取得しWKテーブルに保存SQL + ABNORMAL_INSERT_SELECT_QUERY = """\ + INSERT src05.wk_inst_aris_if_wrn + SELECT + TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd + ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form + ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name + ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form + ,TRIM(' ' FROM TRIM(' ' from ci.prefc_cd)) AS pref_cd + ,TRIM(' ' FROM TRIM(' ' from SUBSTR(cp.prefc_name,1,8))) AS pref_name + ,TRIM(' ' FROM TRIM(' ' from ci.postal_number)) AS postal_cd + ,TRIM(' ' FROM TRIM(' ' from cc.city_name)) AS city_name + ,TRIM(' ' FROM TRIM(' ' from ci.inst_addr)) AS address + ,TRIM(' ' FROM TRIM(' ' from cd.inst_div_name)) + ,TRIM(' ' FROM TRIM(' ' from ci.inst_phone_number)) AS phone_no + ,TRIM(' ' FROM TRIM(' ' from ci.inst_div_cd)) + ,TRIM(' ' FROM TRIM(' ' from ci.manage_cd)) + ,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date + ,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date + ,IF(ci.dcf_dsf_inst_cd IS NULL,'bi0402000001', NULL) AS wrnid_dcf_inst_cd + ,IF(ci.form_inst_name_kanji IS NULL,'bi0402000002', NULL) AS wrnid_inst_name_form + ,IF(ci.prefc_cd IS NULL,'bi0402000003', NULL) AS wrnid_pref_cd + ,IF(cp.prefc_name IS NULL,'bi0402000004', NULL) AS wrnid_pref_name + ,IF(cc.city_name IS NULL,'bi0402000005', NULL) AS wrnid_city_name + ,IF(ci.inst_addr IS NULL,'bi0402000006', NULL) AS wrnid_address + ,sysdate() + FROM src05.com_inst ci + LEFT JOIN src05.mst_prefc cp + ON ci.prefc_cd = cp.prefc_cd + LEFT JOIN src05.mst_city cc + ON ci.prefc_cd = cc.prefc_cd + AND ci.city_cd = cc.city_cd + LEFT OUTER JOIN src05.com_inst_div cd + ON ci.inst_div_cd = cd.inst_div_cd + WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%' + AND( ci.dcf_dsf_inst_cd IS NULL + OR ci.form_inst_name_kanji IS NULL + OR ci.prefc_cd IS NULL + OR cp.prefc_name IS NULL + OR cc.city_name IS NULL + OR ci.inst_addr IS NULL) + ORDER BY ci.dcf_dsf_inst_cd + """ + + # 正常系データの件数を取得SQL + ABNORMAL_COUNT_QUERY = """\ + SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if_wrn + """ + # CSVファイルの作成用のSQL + SELECT_QUERY = """\ + SELECT dcf_inst_cd, inst_name_form, inst_name, inst_name_kana_form, pref_cd, pref_name, + postal_cd, city_name, address, inst_div_name, phone_no, inst_div_cd, manage_cd, + '', inst_delete_date + FROM src05.wk_inst_aris_if ORDER BY dcf_inst_cd + """ + + aris_log = '/var/log/temporarydwh/' + move_file_path = '/data/mountaris/DATA/' + create_date = datetime.now().strftime('%Y%m%d%H%M%S') + create_date_format = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + aris_create_csv = f'/home/nds_dwh/tmpcsv/D0004_ARIS_M_DCF_{create_date}csv' + aris_move_csv = f'{move_file_path}D0004_ARIS_M_DCF_{create_date}.csv' + res_log = f'{aris_log}D0004{create_date}.log' + move_res_og = f'{move_file_path}D0004{create_date}log' + prg_id = 'PrgId:BI0402' + head_str = 'TC_HOSPITAL, TJ_HOSPITAL, TJ_HOSPITALSHORT, TK_HOSPITAL, \ + TC_PREFECTURE, TJ_PREFECTURE, TJ_ZIPCODE, TJ_CITY, TJ_ADDRESS, TJ_DEPARTMENT, \ + TJ_TELEPHONENUMBER, TC_HOSPITALCAT, TC_HOSPITALTYPE, TS_UPDATE, TD_UPDATE' + + start_msg = "MsgID:BI0000000001 Message:バッチ処理を開始しました。\n" + dbConnect_err_msg = "MsgID:999999000002 Message:DB接続エラーです。\n" + err_end_msg = "MsgID:BI0000009998 Message:バッチ処理を異常終了しました。\n" + move_err_msg = "MsgID:BI0000000041 Message:S3バケットARISへのCSVデータ、実行ログ移動できませんでした。\n" + sql_err_msg = "MsgID:999999000002 Message:SQL実行エラーです。\n" + csv_err_msg = "MsgID:BI0000000040 Message:ワークデータの作成に失敗しました。\n" + cnt_msg = "MsgID: Message: LogText:" + suc_end_msg = "MsgID:BI0000009999 Message:バッチ処理を正常に終了しました。\n" + + def exec_batch_monthly(self): + """ 実消化&アルトマーク月次バッチ """ + try: + # 実行ログに書き込む + res_log_p = pathlib.Path(self.res_log) + res_log_p.touch() + os.chmod(self.res_log, '0664') + + # 実行ログ + resLog_f = open(self.res_log) + print(f'{self.create_date_format}[DWH][3][INFO]{self.prg_id} {self.start_msg}') + + db = Database.get_instance() + # DB接続 + db.connect() + # トランザクションの開始 + db.begin() + + # 正常系データの反映 + # 過去分は不要のため、デリート + db.execute(self.PHYSICAL_NORMAL_DELETE_QUERY) + + # 正常系データを取得しWKテーブルに保存する。 + db.execute(self.NORMAL_INSERT_SELECT_QUERY) + + # 正常系データの件数を取得 + record_count = db.execute_select(self.NORMAL_COUNT_QUERY) + suc_count = record_count[0]['countNum'] + + # 警告系データの反映 + # 過去分は不要のため、DWH.WK_INST_ARIS_IF_WRNをデリートする。 + db.execute(self.PHYSICAL_ABNORMAL_DELETE_QUERY) + + # 異常系データを取得しWKテーブルに保存する。 + db.execute(self.ABNORMAL_INSERT_SELECT_QUERY) + + # 異常系データの件数を取得 + record_count = db.execute_select(self.ABNORMAL_COUNT_QUERY) + wrn_count = record_count[0]['countNum'] + + # CSVファイルの作成用のSQL実行 + record_csv = db.execute_select(self.SELECT_QUERY) + + # CSVファイル作成 + arisCreateCsv_p = pathlib.Path(self.arisCreateCsv) + arisCreateCsv_p.touch() + if not os.path.exists(self.arisCreateCsv): + print(f'{self.create_date_format}[DWH][5][ERROR]{self.prg_id} {self.csv_err_msg}') + print(f'{self.create_date_format}[DWH][5][ERROR]{self.prg_id} {self.err_end_msg}') + + # ヘッダ行書き込み + resLog_f = open(self.aris_create_csv) + print(f'{self.head_str}\r\n') + + # データ部分書き込み + for record_data in record_csv: + csv_data = ",".join(record_data).encode('shift_jis') + print(f'{csv_data}\r\n') + + logger.info('use memory--->') + logger.info('memory_get_usage 与えられたメモリの量') + logger.info('\n') + logger.info('max memory--->') + logger.info('memory_get_peak_usage  メモリの最大値') + logger.info('\n') + + resLog_f.close() + + # トランザクションの終了 + db.commit() + + # 実行ログファイルの追記 + # 実行ログに処理件数を書き込む。 + sum_count = suc_count + wrn_count + print(f'{self.create_date_format}[DWH][3][INFO]{self.prg_id} {self.cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})\n') + + # ファイル移動処理 + + logger.info('実消化&アルトマーク月次バッチ処理: 終了') + except Exception as e: + raise BatchOperationException(e) + + finally: + # 終了時に必ずコミットする + db.commit() + db.disconnect() + return diff --git a/ecs/jskult-batch-monthly/src/jobctrl_monthly.py b/ecs/jskult-batch-monthly/src/jobctrl_monthly.py index b58c4601..302993a2 100644 --- a/ecs/jskult-batch-monthly/src/jobctrl_monthly.py +++ b/ecs/jskult-batch-monthly/src/jobctrl_monthly.py @@ -9,6 +9,7 @@ from src.batch.common.calendar_file import CalendarFile from src.error.exceptions import BatchOperationException from src.logging.get_logger import get_logger from src.system_var import constants +from src.batch.jskult_batch_monthly import JskultBathcMonthly logger = get_logger('月次処理コントロール') @@ -34,7 +35,7 @@ def exec(): # dump取得が正常終了していない場合、後続の処理は行わない if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE: logger.error('dump取得が正常終了していないため、月次バッチ処理を終了します。') - return constants.BATCH_EXIT_CODE_SUCCESS + # 戻すんだよ return constants.BATCH_EXIT_CODE_SUCCESS logger.info(f'処理日={syor_date}') # バッチ共通設定に処理日を追加 @@ -61,7 +62,7 @@ def exec(): try: logger.info('月次バッチ:起動') - # ultmarc_process.exec_import() + JskultBathcMonthly.exec_batch_monthly() logger.info('月次バッチ:終了') except BatchOperationException as e: logger.exception(f'月次バッチ処理エラー(異常終了){e}')