newdwh2021/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py
2025-05-28 12:09:43 +09:00

286 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os.path as path
import tempfile
from src.aws.s3 import S3Client, JskSendBucket
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.db.database import Database
from src.error.exceptions import BatchOperationException, MaxRunCountReachedException
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.system_var import environment
from src.logging.get_logger import get_logger
logger = get_logger('DCF削除新規マスタ作成')
class DcfInstMergeIO(JskultBatchEntrypoint):
def __init__(self):
super().__init__()
def execute(self):
jskultBatchRunManager = JskultBatchRunManager(
environment.BATCH_EXECUTION_ID)
jskultHdkeTblManager = JskultHdkeTblManager()
# /transfer_result/yyyy/mm/dd/
jskult_backuo_folder_name = f"""/transfer_result/{jskultHdkeTblManager.get_batch_statuses()[2]}"""
receive_file_count = S3Client.list_objects(
environment.JSKULT_BACKUP_BUCKET, jskult_backuo_folder_name).count()
jskultBatchStatusManager = JskultBatchStatusManager(
environment.PROCESS_NAME,
# TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え
environment.POST_PROCESS,
environment.MAX_RUN_COUNT,
receive_file_count
)
try:
if not jskultHdkeTblManager.can_run_process():
logger.error(
'日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
return
jskultBatchStatusManager.set_process_status("start")
try:
if not jskultBatchStatusManager.can_run_post_process():
# 後続処理の起動条件を満たしていない場合
# 処理ステータスを「処理待」に設定
jskultBatchStatusManager.set_process_status("waiting")
# バッチ実行管理テーブルに「retry」で登録
jskultBatchRunManager.batch_retry()
return
except MaxRunCountReachedException as e:
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
jskultBatchStatusManager.set_process_status("doing")
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskultBatchStatusManager.is_done_ultmarc_import():
# COM_施設からDCF削除新規マスタに登録
(is_add_dcf_inst_merge,
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge:
self._output_add_dcf_inst_merge_log(
duplication_inst_records)
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
# CSV出力
file_path = self._make_csv_data(
environment.DCF_INST_MERGE_SEND_FILE_NAME,
dcf_inst_merge_all_records)
# CSVをS3にアップロード
self._upload_dcf_inst_merge_csv_file(
file_path, environment.DCF_INST_MERGE_SEND_FILE_NAME)
# 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
logger.info("DCF削除新規マスタ作成処理を正常終了します。")
jskultBatchRunManager.batch_success()
jskultBatchStatusManager.set_process_status("done")
except Exception as e:
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}')
jskultBatchRunManager.batch_failed()
jskultBatchStatusManager.set_process_status("failed")
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
sql = """\
SELECT
*
FROM
src07.dcf_inst_merge
"""
dcf_inst_merge_all_records = self._db.execute_select(sql)
return dcf_inst_merge_all_records
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
# com_instからdcf_inst_mergeにinsert
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
self._db.begin()
self._db.to_jst()
sql = """\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
FROM
src05.COM_INST AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
AND
ci.DELETE_SCHE_REASON_CD = 'D'
AND
ci.DELETE_DATA IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
FROM
src07.DCF_INST_MERGE AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
SELECT
mia.INST_CD
FROM
src07.MST_INST_ASSN as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
)
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
ap.PRSB_INST_CD
FROM
src07.ATC_PHARM AS ap
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
trd.INST_CD
FROM
src07.TRN_RESULT_DATA AS trd
WHERE
trd.INST_CD = ci.DCF_DSF_INST_CD
)
)
;
"""
duplication_inst_records = self._db.execute_select(sql)
# DCF削除新規マスタ取り込み
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
NULL,
NULL,
NULL,
"Y",
batchuser,
SYSDATE(),
batchuser,
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
)
VALUES
{','.join(values_clauses)}
"""
self._db.execute(insert_sql, params)
return (True, duplication_inst_records)
except Exception as e:
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date']
set_year_month = '{set_year}{set_month}'.format(
set_year=sys_update_date[0:4],
set_month=sys_update_date[-2:]
)
add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}'
add_dct_inst_merge_list = []
for row in duplication_inst_records:
add_dct_inst_merge_list.append(
add_dct_inst_merge.format(**row))
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力
logger.info(
f"""DCF施設統合マスタが追加されました。
**********************************************************
適用月度 {set_year_month}
**********************************************************
{add_dct_inst_merge_list}
**********************************************************
合計 {len(duplication_inst_records)}"""
)
return
def _make_csv_data(csv_file_name: str, record_inst: list):
temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, csv_file_name)
head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',
'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG',
'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE']
with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file:
# ヘッダ行書き込みくくり文字をつけない為にwriterowではなく、writeを使用しています
csv_file.write(f"{','.join(head_str)}\n")
# UTF-8、CRLF、価囲いありで書き込む
writer = csv.writer(csv_file, delimiter=',', lineterminator='\r\n',
quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
strict=True
)
# データ部分書き込み(施設)
for record_inst_data in record_inst:
record_inst_value = list(record_inst_data.values())
csv_data = [
'' if n is None else n for n in record_inst_value]
writer.writerow(csv_data)
return csv_file_path
# CSVファイルをバックアップ
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str):
# S3バケットにファイルを移動
jsk_send_bucket = JskSendBucket()
# 処理日を取得
_, _, syor_date = JskultHdkeTblManager.get_batch_statuses()
jsk_send_bucket.upload_dcf_inst_merge_csv_file(
csv_file_name, csv_file_path)
jsk_send_bucket.backup_dcf_inst_merge_csv_file(
csv_file_name, syor_date)
return