DCF施設削除新規マスタの作成とDCF施設統合マスタ(DCF_INST_MERGE)の取り込み、ログの出力

This commit is contained in:
mori.k 2025-05-26 19:54:23 +09:00
parent b24d1b0eb9
commit cd1e663b4a

View File

@ -1,10 +1,195 @@
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
import os
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.logging.get_logger import get_logger
logger = get_logger('DCF削除新規マスタ作成')
LOG_LEVEL = os.environ["LOG_LEVEL"]
PROCESS_NAME = os.environ["PROCESS_NAME"]
POST_PROCESS = os.environ["POST_PROCESS"]
MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"]
RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"]
JSK_DATA_SEND_FOLDER = os.environ["JSK_DATA_SEND_FOLDER"]
JSK_BACKUP_FOLDER = os.environ["JSK_BACKUP_FOLDER"]
TRANSFER_RESULT_FOLDER = os.environ["TRANSFER_RESULT_FOLDER"]
DCF_INST_MERGE_SEND_FILE_NAME = os.environ["DCF_INST_MERGE_SEND_FILE_NAME"]
DB_CONNECTION_MAX_RETRY_ATTEMPT = os.environ["DB_CONNECTION_MAX_RETRY_ATTEMPT"]
DB_CONNECTION_RETRY_INTERVAL_INIT = os.environ["DB_CONNECTION_RETRY_INTERVAL_INIT"]
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS"]
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS"]
class DcfInstMergeIO(JskultBatchEntrypoint):
def __init__(self):
super().__init__()
def execute(self):
# TODO: ここでDCF削除新規マスタ作成/データ出力処理を実行する
pass
jskultHdkeTblManager = JskultHdkeTblManager()
if not jskultHdkeTblManager.can_run_process():
return
jskultBatchStatusManager = JskultBatchStatusManager(
PROCESS_NAME,
POST_PROCESS,
MAX_RUN_COUNT_FLG,
RECEIVE_FILE_COUNT
)
if not jskultBatchStatusManager.can_run_post_process():
# 処理ステータスを「処理待」に設定
jskultBatchStatusManager.set_process_status("retry")
return
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskultBatchStatusManager.is_done_ultmarc_import():
try:
self._db = Database.get_instance()
self._db.connect()
self._db.begin()
self._db.to_jst()
(is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge:
_output_add_dcf_inst_merge_log(duplication_inst_records)
except Exception as e:
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
# TODO DCF施設削除新規マスタをS3に出力
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
sql ="""\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
FROM
COM_INST AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
AND
ci.DELETE_SCHE_REASON_CD = 'D'
AND
ci.DELETE_DATA IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
FROM
DCF_INST_MERGE AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
SELECT
mia.INST_CD
FROM
MST_INST_ASSN as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
)
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
ap.PRSB_INST_CD
FROM
ATC_PHARM AS ap
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
vtrd.INST_CD
FROM
VW_TRN_RESULT_DATA AS vtrd
WHERE
vtrd.INST_CD = ci.DCF_DSF_INST_CD
)
)
;
"""
duplication_inst_records = self._db.execute_select(sql)
# DCF施設統合マスタ取り込み
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
NULL,
NULL,
NULL,
"Y",
batchuser,
SYSDATE(),
batchuser,
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
)
VALUES
{','.join(values_clauses)}
"""
return (True, duplication_inst_records)
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date']
set_year_month = '{set_year}{set_month}'.format(
set_year=sys_update_date[0:4],
set_month=sys_update_date[-2:]
)
add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}'
add_dct_inst_merge_list = []
for row in duplication_inst_records:
add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row))
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力
logger.info(
f"""DCF削除新規マスタが追加されました。
**********************************************************
適用月度 {set_year_month}
**********************************************************
{add_dct_inst_merge_list}
**********************************************************
合計 {len(duplication_inst_records)}"""
)
return