Merge pull request #490 featrue-NEWDWH2021-1869 into develop

This commit is contained in:
森 一真 2025-06-02 18:45:06 +09:00
commit 655e512f0d
2 changed files with 398 additions and 2 deletions

View File

@ -0,0 +1,39 @@
from src.batch.environment.jskult_batch_environment import \
JskultBatchEnvironment
from src.system_var import environment
class MstInstAllEnvironment(JskultBatchEnvironment):
"""実消化&アルトマークのバッチ処理で使用する環境変数を管理するクラス"""
def __init__(self):
self.JSKULT_BACKUP_BUCKET = environment.JSKULT_BACKUP_BUCKET
self.BATCH_MANAGE_DYNAMODB_TABLE_NAME = environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME
self.BATCH_EXECUTION_ID = environment.BATCH_EXECUTION_ID
self.MAX_RUN_COUNT = environment.MAX_RUN_COUNT
self.PROCESS_NAME = environment.PROCESS_NAME
self.TRANSFER_RESULT_FOLDER = environment.TRANSFER_RESULT_FOLDER
self.TRANSFER_RESULT_FILE_NAME = environment.TRANSFER_RESULT_FILE_NAME
def validate(self):
"""
必須の環境変数が設定されているかどうか検査する
DB関連の環境変数は対象外とする
Raises:
EnvironmentVariableNotSetException: 環境変数の設定ミス
"""
super()._assert_variable_not_empty(
self.JSKULT_BACKUP_BUCKET, 'JSKULT_BACKUP_BUCKET')
super()._assert_variable_not_empty(self.BATCH_MANAGE_DYNAMODB_TABLE_NAME,
'BATCH_MANAGE_DYNAMODB_TABLE_NAME')
super()._assert_variable_not_empty(
self.BATCH_EXECUTION_ID, 'BATCH_EXECUTION_ID')
super()._assert_variable_is_int(self.MAX_RUN_COUNT, 'MAX_RUN_COUNT')
# MAX_RUN_COUNTは数値として扱うため、検査後に変換
self.MAX_RUN_COUNT = int(self.MAX_RUN_COUNT)
super()._assert_variable_not_empty(self.PROCESS_NAME, 'PROCESS_NAME')
super()._assert_variable_not_empty(
self.TRANSFER_RESULT_FOLDER, 'TRANSFER_RESULT_FOLDER')
super()._assert_variable_not_empty(
self.TRANSFER_RESULT_FILE_NAME, 'TRANSFER_RESULT_FILE_NAME')

View File

@ -1,10 +1,367 @@
import csv
import json
import os.path as path
import tempfile
from src.aws.s3 import JskSendBucket, JskTransferListBucket
from src.batch.environment.mst_inst_all_environment import \
MstInstAllEnvironment
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.db.database import Database
from src.error.exceptions import (BatchOperationException,
EnvironmentVariableNotSetException,
MaxRunCountReachedException)
from src.logging.get_logger import get_logger
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.system_var import environment
from src.system_var import constants
logger = get_logger('メルク施設マスタ作成')
class MstInstAll(JskultBatchEntrypoint):
def __init__(self):
super().__init__()
self.environment = MstInstAllEnvironment()
# 必須の環境変数が設定されていない場合、エラーにする
try:
self.environment.validate()
except EnvironmentVariableNotSetException as e:
logger.exception(e)
return
def execute(self):
# TODO: ここでメルク施設マスタ作成処理を実行する
pass
logger.info("メルク施設マスタ作成処理を開始します。")
jskult_hdke_tbl_manager = JskultHdkeTblManager()
jskult_batch_run_manager = JskultBatchRunManager(
self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME,
self.environment.BATCH_EXECUTION_ID)
if not jskult_hdke_tbl_manager.can_run_process():
logger.error(
'日次バッチ処理中またはdump取得が正常終了していないため、メルク施設マスタ作成処理を終了します。')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
return
# 業務日付を取得
_, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses()
# 転送ファイル一覧を取得し、転送件数を取得
try:
transfer_list_bucket = JskTransferListBucket()
transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
process_date)
except Exception as e:
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
with open(transfer_list_file_path) as f:
transfer_list = json.load(f)
# 実消化データ + アルトマークデータの転送件数を合算し、受信ファイル件数とする
receive_file_count = len(
transfer_list['jsk_transfer_list']) + len(transfer_list['ult_transfer_list'])
jskult_batch_status_manager = JskultBatchStatusManager(
environment.PROCESS_NAME,
# TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え
'post_process',
environment.MAX_RUN_COUNT,
receive_file_count
)
try:
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_START)
try:
if not jskult_batch_status_manager.can_run_post_process():
# 後続処理の起動条件を満たしていない場合
# 処理ステータスを「処理待」に設定
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_WAITING)
# バッチ実行管理テーブルに「retry」で登録
jskult_batch_run_manager.batch_retry()
return
except MaxRunCountReachedException:
logger.info("最大起動回数に到達したため、メルク施設マスタ作成処理を実行します。")
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_DOING)
db = Database.get_instance()
db.connect()
logger.info("メルク施設マスタ作成処理開始")
# mst_inst_allをTruncate
self._truncate_mst_inst_all(db)
# com_instから、mst_inst_allへInsert
self._insert_mst_inst_all_from_com_inst(db)
# com_pharmから、mst_inst_allへInsert
self._insert_mst_inst_all_from_com_pharm(db)
# mst_inst_merckから、最新レコードをmst_inst_allへInsert
self._insert_mst_inst_all_from_mst_inst_merck(db)
logger.info("メルク施設マスタ作成処理終了")
jskult_batch_run_manager.batch_success()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_DONE)
except Exception as e:
logger.exception(e)
jskult_batch_run_manager.batch_failed()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_ERROR)
raise BatchOperationException(e)
finally:
db.disconnect()
def _truncate_mst_inst_all(self, db: Database):
try:
db.execute("TRUNCATE TABLE src07.mst_inst_all")
except Exception as e:
logger.info("メルク施設マスタの全件削除に失敗")
raise e
logger.info("メルク施設マスタの全件削除に成功")
return
def _insert_mst_inst_all_from_com_inst(self, db: Database):
# オプティマイザのderived_mergeフラグをoffにする
try:
sql = """
SET SESSION optimizer_switch = 'derived_merge=off'
"""
db.execute(sql)
logger.info("オプティマイザのderived_mergeフラグ = Off")
except Exception as e:
logger.info("オプティマイザのderived_mergeフラグの値変更に失敗")
raise e
# com_instから、mst_inst_allへInsert
try:
sql = """
INSERT INTO
src07.mst_inst_all (
inst_cd,
inst_clas_cd,
inst_name_form,
inst_name,
pref_cd,
city_cd,
pref_name,
city_name,
address,
postal_cd,
tel_num,
bed_num,
manage_cd,
manage_name,
delete_date,
inst_div_cd,
inst_div_name,
yobi,
creater,
create_date,
updater,
update_date
)
SELECT
ci.dcf_dsf_inst_cd,
'1',
ci.form_inst_name_kanji,
ci.inst_name_kanji,
ci.prefc_cd,
ci.city_cd,
mp.prefc_name,
LEFT(mc.city_name, 40),
ci.inst_addr,
ci.postal_number,
ci.inst_phone_number,
ci.bed_num,
ci.manage_cd,
LEFT(cm.manage_name, 40),
ci.abolish_ymd,
ci.inst_div_cd,
LEFT(cid.inst_div_name, 40),
null,
ci.create_user,
ci.regist_date,
ci.update_user,
ci.update_date
FROM
src05.com_inst AS ci
LEFT OUTER JOIN src05.mst_prefc AS mp
ON ci.prefc_cd = mp.prefc_cd
LEFT OUTER JOIN src05.mst_city AS mc
ON ci.prefc_cd = mc.prefc_cd
AND ci.city_cd = mc.city_cd
LEFT OUTER JOIN src05.com_manage AS cm
ON ci.manage_cd = cm.manage_cd
LEFT OUTER JOIN src05.com_inst_div AS cid
ON ci.inst_div_cd = cid.inst_div_cd
"""
res = db.execute(sql)
logger.info("COM_施設からメルク施設マスタに登録成功")
except Exception as e:
logger.info("COM_施設からメルク施設マスタに登録失敗")
raise e
return
def _insert_mst_inst_all_from_com_pharm(self, db: Database):
# com_pharmから、mst_inst_allへInsert
try:
sql = """
INSERT INTO
src07.mst_inst_all (
inst_cd,
inst_clas_cd,
inst_name_form,
inst_name,
pref_cd,
city_cd,
pref_name,
city_name,
address,
postal_cd,
tel_num,
bed_num,
manage_cd,
manage_name,
delete_date,
inst_div_cd,
inst_div_name,
yobi,
creater,
create_date,
updater,
update_date
)
SELECT
cp.dcf_dsf_inst_cd,
'2',
cp.form_inst_name_kanji,
cp.inst_name_kanji,
cp.prefc_cd,
cp.city_cd,
mp.prefc_name,
LEFT(mc.city_name, 40),
cp.inst_addr,
cp.postal_number,
cp.inst_phone_number,
null,
null,
null,
cp.abolish_ymd,
cp.inst_div_cd,
LEFT(cid.inst_div_name, 40),
null,
cp.create_user,
cp.regist_date,
cp.update_user,
cp.update_date
FROM
src05.com_pharm AS cp
LEFT OUTER JOIN src05.mst_prefc AS mp
ON cp.prefc_cd = mp.prefc_cd
LEFT OUTER JOIN src05.mst_city AS mc
ON cp.prefc_cd = mc.prefc_cd
AND cp.city_cd = mc.city_cd
LEFT OUTER JOIN src05.com_manage AS cm
ON cp.manage_cd = cm.manage_cd
LEFT OUTER JOIN src05.com_inst_div AS cid
ON cp.inst_div_cd = cid.inst_div_cd
"""
res = db.execute(sql)
logger.info("COM_薬局からメルク施設マスタに登録成功")
except Exception as e:
logger.info("COM_薬局からメルク施設マスタに登録失敗")
raise e
return
def _insert_mst_inst_all_from_mst_inst_merck(self, db:Database):
# mst_inst_merckから、mst_inst_allへInsert
try:
sql = """
INSERT INTO
src07.mst_inst_all (
inst_cd,
inst_clas_cd,
inst_name_form,
inst_name,
pref_cd,
city_cd,
pref_name,
city_name,
address,
postal_cd,
tel_num,
bed_num,
manage_cd,
manage_name,
delete_date,
inst_div_cd,
inst_div_name,
yobi,
creater,
create_date,
updater,
update_date
)
SELECT
mim.inst_cd,
CASE LEFT(mim.inst_cd, 2)
WHEN '59' THEN '2'
ELSE '3'
END,
mim.inst_nm_kj,
mim.inst_nm_kj_s,
mim.jis_pref_cd,
mim.jis_city_cd,
mjp.jis_pref_nm_kj,
LEFT(mjc.jis_city_nm_kj, 40),
CONCAT(mim.addr1_nm_kj, mim.addr2_nm_kj),
mim.postal_no,
mim.tel_no,
null,
null,
null,
CASE mim.eff_end_ym
WHEN '999912' THEN null
ELSE CONCAT(mim.eff_end_ym, DATE_FORMAT(LAST_DAY(STR_TO_DATE(mim.eff_end_ym, '%Y%m')), '%d'))
END,
null,
null,
null,
mim.ins_user,
mim.ins_date,
mim.upd_user,
mim.upd_date
FROM
src07.mst_inst_merck AS mim
INNER JOIN (
SELECT mim.inst_cd, MAX(mim.eff_end_ym) AS max_end_ym
FROM src07.mst_inst_merck AS mim
GROUP BY mim.inst_cd
) AS mim_max_end_ym
ON mim.inst_cd = mim_max_end_ym.inst_cd
AND mim.eff_end_ym = mim_max_end_ym.max_end_ym
LEFT OUTER JOIN src07.mst_jis_pref AS mjp
ON mim.jis_pref_cd = mjp.jis_pref_cd
LEFT OUTER JOIN src07.mst_jis_city AS mjc
ON mim.jis_pref_cd = mjc.jis_pref_cd
AND mim.jis_city_cd = mjc.jis_city_cd;
"""
res = db.execute(sql)
logger.info("メルク独自施設マスタからメルク施設マスタに登録成功")
except Exception as e:
logger.info("メルク独自施設マスタからメルク施設マスタに登録失敗")
raise e
return