Merge pull request #491 feature-NEWDWH2021-1874 into develop

This commit is contained in:
下田雅人 2025-06-05 20:03:36 +09:00
commit 53e0c0148d
5 changed files with 116 additions and 100 deletions

View File

@ -109,5 +109,5 @@ class VjskSendBucket(S3Bucket):
# バックアップバケットにコピー
vjsk_backup_bucket = VjskBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
backup_key = f'{vjsk_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key)

View File

@ -15,6 +15,6 @@ RUN \
pip uninstall -y pipenv virtualenv-clone virtualenv
COPY src ./src
COPY entrypoint.py entrypoint.py
COPY entrypoint.py entrypoint.py
CMD ["python", "entrypoint.py"]

View File

@ -85,7 +85,7 @@ class JskUltBackupBucket(S3Bucket):
class JskBackupBucket(JskUltBackupBucket):
_folder = environment.JSKULT_BACKUP_BUCKET
_folder = environment.JSK_BACKUP_FOLDER
class JskTransferListBucket(JskUltBackupBucket):
@ -118,8 +118,8 @@ class JskSendBucket(S3Bucket):
def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー
jskult_backup_bucket = JskUltBackupBucket()
jskult_backup_bucket = JskBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key,
jskult_backup_bucket._bucket_name, backup_key)

View File

@ -17,7 +17,7 @@ from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.system_var import constants
logger = get_logger('DCF削除新規マスタ作成')
logger = get_logger('DCF削除新規マスタ作成/データ出力')
class DcfInstMergeIO(JskultBatchEntrypoint):
@ -26,9 +26,6 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
# 環境変数をimport
self.environment = DCFInstMergeEnvironment()
def execute(self):
logger.info("DCF削除新規マスタ作成処理を開始します。")
# 必須の環境変数が設定されていない場合、エラーにする
try:
self.environment.validate()
@ -36,9 +33,14 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
logger.exception(e)
return
def execute(self):
logger.info("DCF削除新規マスタ作成/データ出力処理を開始します。")
jskult_hdke_tbl_manager = JskultHdkeTblManager()
jskult_batch_run_manager = JskultBatchRunManager(
self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME,
self.environment.BATCH_EXECUTION_ID)
if not jskult_hdke_tbl_manager.can_run_process():
logger.error(
'日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
@ -58,6 +60,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
return
with open(transfer_list_file_path) as f:
transfer_list = json.load(f)
@ -73,7 +76,6 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
receive_file_count
)
try:
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_START)
try:
@ -85,7 +87,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
# バッチ実行管理テーブルに「retry」で登録
jskult_batch_run_manager.batch_retry()
logger.info("起動条件を満たしていないため、DCF削除新規マスタ作成処理を終了します。")
return
except MaxRunCountReachedException:
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
@ -93,27 +95,39 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_DOING)
# DCF削除新規マスタ作成、出力用にDB接続を開始。
# トランザクションも開始。
db = Database.get_instance()
db.connect()
db.to_jst()
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskult_batch_status_manager.is_done_ultmarc_import():
logger.info("アルトマークデータが取り込まれているため、DCF削除新規マスタ作成処理を開始します。")
db.begin()
# COM_施設からDCF削除新規マスタに登録
(is_add_dcf_inst_merge,
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self)
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(db)
if is_add_dcf_inst_merge:
logger.info('[NOTICE]DCF施設削除新規マスタが追加されました。')
self._output_add_dcf_inst_merge_log(
duplication_inst_records)
logger.info("DCF削除新規マスタ作成処理が正常終了しました。")
db.commit()
# CSV出力
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
# DCF施設削除新規マスタ出力
logger.info('DCF施設削除新規マスタ出力を開始します。')
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all(db)
file_path = self._make_csv_data(
self.environment.DCF_INST_MERGE_SEND_FILE_NAME,
dcf_inst_merge_all_records)
# CSVをS3にアップロード
self._upload_dcf_inst_merge_csv_file(
file_path, process_date, self.environment.DCF_INST_MERGE_SEND_FILE_NAME)
self.environment.DCF_INST_MERGE_SEND_FILE_NAME, process_date, file_path)
logger.info("DCF施設削除新規マスタ出力が正常終了しました。")
# 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
logger.info("DCF削除新規マスタ作成処理を正常終了します。")
logger.info("DCF削除新規マスタ作成/データ出力処理を終了します。")
jskult_batch_run_manager.batch_success()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_DONE)
@ -121,146 +135,136 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
return
except Exception as e:
db.rollback()
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}')
jskult_batch_run_manager.batch_failed()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_ERROR)
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
sql = """\
SELECT
*
FROM
src07.dcf_inst_merge
"""
dcf_inst_merge_all_records = self._db.execute_select(sql)
return dcf_inst_merge_all_records
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
db.disconnect()
# com_instからdcf_inst_mergeにinsert
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
def _insert_dcf_inst_merge_from_com_inst(self, db: Database) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
self._db.begin()
self._db.to_jst()
sql = """\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
ci.dcf_dsf_inst_cd AS dcf_dsf_inst_cd,
ci.form_inst_name_kanji AS form_inst_name_kanji,
ci.dup_opp_cd AS dup_opp_cd,
(
SELECT
dupci.form_inst_name_kanji
FROM
src05.com_inst AS dupci
WHERE
dupci.dcf_dsf_inst_cd = ci.dup_opp_cd
) AS dup_inst_name_kanji,
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), '%Y%m') AS start_month
FROM
src05.COM_INST AS ci
src05.com_inst AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
(ci.dup_opp_cd IS NOT NULL OR CHAR_LENGTH(ci.dup_opp_cd) > 0)
AND
ci.DELETE_SCHE_REASON_CD = 'D'
ci.delete_sche_reason_cd = 'D'
AND
ci.DELETE_DATA IS NULL
ci.abolish_ymd IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
ci.sys_update_date BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
1
FROM
src07.DCF_INST_MERGE AS dim
src07.dcf_inst_merge AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
dim.dcf_inst_cd = ci.dcf_dsf_inst_cd
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
AND(
EXISTS(
SELECT
mia.INST_CD
1
FROM
src07.MST_INST_ASSN as mia
src07.mst_inst_assn as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
mia.inst_cd = ci.dcf_dsf_inst_cd
)
OR EXISTS(
SELECT
1
FROM
src07.atc_pharm AS ap
WHERE
ap.prsb_inst_cd = ci.dcf_dsf_inst_cd
)
OR ci.DCF_DSF_INST_CD EXISTS(
OR EXISTS(
SELECT
ap.PRSB_INST_CD
1
FROM
src07.ATC_PHARM AS ap
view07.vw_tebra_sales_refreshed AS vtsr
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
trd.INST_CD
FROM
src07.TRN_RESULT_DATA AS trd
WHERE
trd.INST_CD = ci.DCF_DSF_INST_CD
)
vtsr.cnvs_inst_cd = ci.dcf_dsf_inst_cd
)
;
);
"""
duplication_inst_records = self._db.execute_select(sql)
duplication_inst_records = db.execute_select(sql)
if len(duplication_inst_records) == 0:
logger.info('施設統合対象データはありません')
return (False, None)
# DCF削除新規マスタ取り込み
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
dcf_inst_cd_arr = f"dcf_inst_cd{clauses_no}"
dup_opp_cd_arr = f"dup_opp_cd{clauses_no}"
start_month_arr = f'start_month{clauses_no}'
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
:{start_month_arr},
NULL,
NULL,
NULL,
"Y",
batchuser,
CURRENT_USER(),
SYSDATE(),
batchuser,
CURRENT_USER(),
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
params[dcf_inst_cd_arr] = row['dcf_dsf_inst_cd']
params[dup_opp_cd_arr] = row['dup_opp_cd']
params[start_month_arr] = row['start_month']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
dcf_inst_cd,
dup_opp_cd,
start_month,
invalid_flg,
remarks,
dcf_inst_cd_new,
enabled_flg,
creater,
create_date,
updater,
update_date
)
VALUES
{','.join(values_clauses)}
"""
self._db.execute(insert_sql, params)
db.execute(insert_sql, params)
return (True, duplication_inst_records)
except Exception as e:
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date']
def _output_add_dcf_inst_merge_log(self, duplication_inst_records: list[dict]):
start_month = duplication_inst_records[0]['start_month']
set_year_month = '{set_year}{set_month}'.format(
set_year=sys_update_date[0:4],
set_month=sys_update_date[-2:]
set_year=start_month[0:4],
set_month=start_month[-2:]
)
add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}'
add_dct_inst_merge_list = []
@ -270,7 +274,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力
logger.info(
f"""DCF施設統合マスタが追加されました。
f"""DCF施設削除新規マスタが追加されました。
**********************************************************
適用月度 {set_year_month}
**********************************************************
@ -281,7 +285,20 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
return
def _make_csv_data(csv_file_name: str, record_inst: list):
def _select_dcf_inst_merge_all(self, db: Database) -> tuple[bool, list[dict]]:
try:
sql = """\
SELECT
*
FROM
src07.dcf_inst_merge
"""
dcf_inst_merge_all_records = db.execute_select(sql)
return dcf_inst_merge_all_records
except Exception as e:
raise BatchOperationException(e)
def _make_csv_data(self, csv_file_name: str, record_inst: list):
temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, csv_file_name)
head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',

View File

@ -6,7 +6,6 @@ JSK_BACKUP_FOLDER=jsk/send
TRANSFER_RESULT_FOLDER=transfer_result
TRANSFER_RESULT_FILE_NAME=transfer_result.json
DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv
JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
DB_CONNECTION_MAX_RETRY_ATTEMPT=1
DB_CONNECTION_RETRY_INTERVAL_INIT=1
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1