feat: レビュー指摘対応

This commit is contained in:
高木要 2023-05-25 15:30:51 +09:00
parent 3b369b8fc4
commit 7da4be471c
3 changed files with 51 additions and 48 deletions

View File

@ -1,5 +1,4 @@
from datetime import datetime, timedelta
from sqlalchemy import CursorResult
from src.batch.batch_functions import logging_sql
from src.batch.common.batch_context import BatchContext
from src.db.database import Database
@ -8,7 +7,7 @@ from src.logging.get_logger import get_logger
from src.time.elapsed_time import ElapsedTime
batch_context = BatchContext.get_instance()
logger = get_logger('DCF施設統合マスタ日次更新バッチ')
logger = get_logger('DCF施設統合マスタ日次更新')
def exec():
@ -16,7 +15,7 @@ def exec():
try:
db.connect()
db.begin()
logger.debug('DCF施設統合マスタ日次更新バッチ処理開始')
logger.debug('DCF施設統合マスタ日次更新処理開始')
# DCF施設統合マスタ移行先コードのセット(無効フラグが『0(有効)』)
enabled_dst_inst_merge_records = _set_enabled_dct_inst_merge(db)
# DCF施設統合マスタ移行先コードのセット(無効フラグが『1(無効)』)
@ -26,7 +25,7 @@ def exec():
_add_emp_chg_inst(db, enabled_dst_inst_merge_records)
_add_ult_ident_presc(db, enabled_dst_inst_merge_records)
db.commit()
logger.info('DCF施設統合マスタ日次更新バッチ処理終了')
logger.debug('DCF施設統合マスタ日次更新処理終了')
except Exception as e:
db.rollback()
raise BatchOperationException(e)
@ -59,10 +58,11 @@ def _set_disabled_dct_inst_merge(db: Database):
def _add_ult_ident_presc(db: Database, enabled_dst_inst_merge_records: list[dict]):
# 納入先処方元マスタの追加
logger.info('納入先処方元マスタの登録 開始')
for data_inst_cnt, row in enumerate(enabled_dst_inst_merge_records, start=1):
tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month'])
for data_inst_cnt, enabled_merge_record in enumerate(enabled_dst_inst_merge_records, start=1):
tekiyo_month_first_day = _get_first_day_of_month(enabled_merge_record['tekiyo_month'])
ult_ident_presc_records = _select_ult_ident_presc(db, row['dcf_inst_cd'], row['dup_opp_cd'])
ult_ident_presc_records = _select_ult_ident_presc(db, enabled_merge_record['dcf_inst_cd'],
enabled_merge_record['dup_opp_cd'])
for data_cnt, ult_ident_presc_row in enumerate(ult_ident_presc_records, start=1):
logger.info(f'{data_inst_cnt}件目の移行施設の{data_cnt}レコード目処理 開始')
# 処方元コード=重複時相手先コードが発生した場合
@ -80,13 +80,17 @@ def _add_ult_ident_presc(db: Database, enabled_dst_inst_merge_records: list[dict
is_exists_duplicate_key = True
else:
logger.info('納入先処方元マスタの重複予定データなし')
_insert_ult_ident_presc(db, set_start_date, row['dup_opp_cd'], ult_ident_presc_row)
_insert_ult_ident_presc(db, set_start_date, enabled_merge_record['dup_opp_cd'], ult_ident_presc_row)
# 適用終了日 < 適用開始日の場合
if _str_to_date_time(ult_ident_presc_row['end_date']) < start_date:
# 対象レコードを物理削除する
_delete_ult_ident_presc(db, ult_ident_presc_row['start_date'], ult_ident_presc_row,
'開始月>適用開始日のため物理削除')
continue
# 重複予定データが存在しない、且つ、適用終了日 ≧ 適用開始日の場合
if not is_exists_duplicate_key:
# 適用終了日を、DCF施設統合マスタの適用月度の前月末日で更新
last_end_date = tekiyo_month_first_day - timedelta(days=1)
_update_ult_ident_presc_end_date(db, _date_time_to_str(last_end_date), ult_ident_presc_row)
if start_date > last_end_date:
@ -99,9 +103,9 @@ def _add_ult_ident_presc(db: Database, enabled_dst_inst_merge_records: list[dict
def _add_emp_chg_inst(db: Database, enabled_dst_inst_merge_records: list[dict]):
# 従業員担当施設マスタの登録
logger.info('従業員担当施設マスタの登録 開始')
for row in enabled_dst_inst_merge_records:
tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month'])
emp_chg_inst_records = _select_emp_chg_inst(db, row['dcf_inst_cd'], row['dup_opp_cd'])
for enabled_merge_record in enabled_dst_inst_merge_records:
tekiyo_month_first_day = _get_first_day_of_month(enabled_merge_record['tekiyo_month'])
emp_chg_inst_records = _select_emp_chg_inst(db, enabled_merge_record['dcf_inst_cd'], enabled_merge_record['dup_opp_cd'])
for emp_chg_inst_row in emp_chg_inst_records:
# 重複時相手先コードが存在したかのチェック
if emp_chg_inst_row['opp_count'] > 0:
@ -111,22 +115,24 @@ def _add_emp_chg_inst(db: Database, enabled_dst_inst_merge_records: list[dict]):
set_start_date = start_date \
if start_date > tekiyo_month_first_day else tekiyo_month_first_day
_insert_emp_chg_inst(db, row['dup_opp_cd'], _date_time_to_str(set_start_date),
_insert_emp_chg_inst(db, enabled_merge_record['dup_opp_cd'], _date_time_to_str(set_start_date),
emp_chg_inst_row)
# 適用開始日 < DCF施設統合マスタの適用月度の1日の場合
if start_date < tekiyo_month_first_day:
# DCF施設統合マスタの適用月度の前月末日で、適用終了日を更新する
last_end_date = tekiyo_month_first_day - timedelta(days=1)
_update_emp_chg_inst_end_date(db, row['dcf_inst_cd'], _date_time_to_str(last_end_date),
_update_emp_chg_inst_end_date(db, enabled_merge_record['dcf_inst_cd'], _date_time_to_str(last_end_date),
emp_chg_inst_row)
if start_date <= last_end_date:
continue
_update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_chg_inst_row['ta_cd'],
emp_chg_inst_row['start_date'])
continue
# 適用開始日 ≧ DCF施設統合マスタの適用月度の1日の場合、N(論理削除レコード)に設定する
_update_emp_chg_inst_disabled(db, enabled_merge_record['dcf_inst_cd'], emp_chg_inst_row['ta_cd'],
emp_chg_inst_row['start_date'])
logger.info('従業員担当施設マスタの登録 終了')
def _delete_ult_ident_presc(db: Database, start_date: str, ult_ident_presc_row: CursorResult,
def _delete_ult_ident_presc(db: Database, start_date: str, ult_ident_presc_row: dict,
log_message: str):
# ult_ident_prescのDelete
try:
@ -154,8 +160,8 @@ def _delete_ult_ident_presc(db: Database, start_date: str, ult_ident_presc_row:
raise e
def _update_emp_chg_inst_enabled_flg(db: Database, dcf_inst_cd: str, ta_cd: str, start_date: str):
# emp_chg_instを更新
def _update_emp_chg_inst_disabled(db: Database, dcf_inst_cd: str, ta_cd: str, start_date: str):
# emp_chg_instをUPDATE
try:
elapsed_time = ElapsedTime()
sql = """
@ -180,8 +186,8 @@ def _update_emp_chg_inst_enabled_flg(db: Database, dcf_inst_cd: str, ta_cd: str,
def _update_emp_chg_inst_end_date(db: Database, dcf_inst_cd: str, last_end_date: str,
emp_chg_inst_row: CursorResult):
# emp_chg_instを更新
emp_chg_inst_row: dict):
# emp_chg_instをUPDATE
try:
elapsed_time = ElapsedTime()
sql = """
@ -214,8 +220,8 @@ def _update_emp_chg_inst_end_date(db: Database, dcf_inst_cd: str, last_end_date:
def _insert_emp_chg_inst(db: Database, dup_opp_cd: str, set_start_date: str,
emp_chg_inst_row: CursorResult):
# emp_chg_instにInsert
emp_chg_inst_row: dict):
# emp_chg_instにINSERT
try:
elapsed_time = ElapsedTime()
sql = """
@ -268,7 +274,8 @@ def _insert_emp_chg_inst(db: Database, dup_opp_cd: str, set_start_date: str,
def _select_dct_inst_merge(db: Database, muko_flg: int) -> list[dict]:
# dcf_inst_mergeからSelect
# dcf_inst_mergeからSELECT
# muko_flgの値によって、SQLのWHERE条件を変更
try:
sql = """
SELECT
@ -301,9 +308,11 @@ def _select_dct_inst_merge(db: Database, muko_flg: int) -> list[dict]:
def _update_dcf_inst_merge(db: Database, muko_flg: int) -> int:
#
try:
# dcf_inst_mergeをUPDATE
# muko_flgの値によって、SQLのWHERE条件とSET句を変更
try:
elapsed_time = ElapsedTime()
log_message = '更新しました' if muko_flg == 0 else '無効データに戻しました'
sql = """
UPDATE
src05.dcf_inst_merge AS updim
@ -318,9 +327,9 @@ def _update_dcf_inst_merge(db: Database, muko_flg: int) -> int:
src05.dcf_inst_merge AS dim
INNER JOIN
src05.hdke_tbl AS ht
ON dim.tekiyo_month=DATE_FORMAT(ht.syor_date, '%Y%m')
ON dim.tekiyo_month = DATE_FORMAT(ht.syor_date, '%Y%m')
WHERE
dim.muko_flg= :muko_flg
dim.muko_flg = :muko_flg
AND dim.enabled_flg='Y'
AND dim.dcf_inst_cd_new IS {not_null}NULL
) AS bf_dim
@ -343,16 +352,16 @@ def _update_dcf_inst_merge(db: Database, muko_flg: int) -> int:
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'DCF施設統合マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
logger.info(f'DCF施設統合マスタの有効データを{log_message} 成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('DCF施設統合マスタの更新に失敗')
logger.debug(f'DCF施設統合マスタの{log_message} 失敗')
raise e
return res.rowcount
def _update_dcf_inst_cd_new(db: Database, dcf_inst_cd_new_after: str, dcf_inst_cd_new_before: str):
# dcf_inst_mergeをUpdate
# dcf_inst_mergeをUPDATE
try:
elapsed_time = ElapsedTime()
sql = """
@ -379,8 +388,8 @@ def _update_dcf_inst_cd_new(db: Database, dcf_inst_cd_new_after: str, dcf_inst_c
raise e
def _update_ult_ident_presc_end_date(db: Database, last_end_date: str, ult_ident_presc_row: CursorResult):
# ult_ident_presc_endをUpdate
def _update_ult_ident_presc_end_date(db: Database, last_end_date: str, ult_ident_presc_row: dict):
# ult_ident_presc_endをUPDATE
try:
elapsed_time = ElapsedTime()
sql = """
@ -411,8 +420,8 @@ def _update_ult_ident_presc_end_date(db: Database, last_end_date: str, ult_ident
def _insert_ult_ident_presc(db: Database, set_Start_Date: str, dup_opp_cd: str,
ult_ident_presc_row: CursorResult):
# ult_ident_prescにInsert
ult_ident_presc_row: dict):
# ult_ident_prescにINSERT
try:
elapsed_time = ElapsedTime()
sql = """
@ -459,7 +468,7 @@ def _insert_ult_ident_presc(db: Database, set_Start_Date: str, dup_opp_cd: str,
def _select_emp_chg_inst(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]:
# emp_chg_instから取得
# emp_chg_instからSELECT
try:
sql = """
SELECT
@ -497,7 +506,7 @@ def _select_emp_chg_inst(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> lis
def _select_ult_ident_presc(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]:
# ult_ident_prescから取得
# ult_ident_prescからSELECT
try:
sql = """
SELECT
@ -531,7 +540,7 @@ def _select_ult_ident_presc(db: Database, dcf_inst_cd: str, dup_opp_cd: str) ->
def _count_duplicate_ult_ident_presc(db: Database, set_start_date: str,
ult_ident_presc_row: CursorResult) -> int:
ult_ident_presc_row: dict) -> int:
# ult_ident_prescの重複時相手先コードの件数取得
try:
sql = """

View File

@ -1,5 +1,6 @@
from src.batch.common.batch_context import BatchContext
from src.batch.laundering import create_inst_merge_for_laundering, emp_chg_inst_laundering, ult_ident_presc_laundering
from src.batch.dcf_inst_merge import integrate_dcf_inst_merge
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
@ -16,6 +17,8 @@ def exec():
return
# 洗替用マスタ作成
create_inst_merge_for_laundering.exec()
# DCF施設統合マスタ日次更新
integrate_dcf_inst_merge.exec()
# 施設担当者洗替
emp_chg_inst_laundering.exec()
# 納入先処方元マスタ洗替

View File

@ -7,7 +7,6 @@ from src.batch.batch_functions import (
update_batch_processing_flag_in_processing)
from src.batch.common.batch_context import BatchContext
from src.batch.common.calendar_file import CalendarFile
from src.batch.dcf_inst_merge import Integrate_dcf_inst_merge
from src.batch.laundering import create_dcf_inst_merge, mst_inst_laundering
from src.batch.ultmarc import ultmarc_process
from src.error.exceptions import BatchOperationException
@ -115,14 +114,6 @@ def exec():
logger.exception(f'DCF施設統合マスタ作成エラー異常終了{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
logger.info('DCF施設統合マスタ日次更新バッチ起動')
Integrate_dcf_inst_merge.exec()
logger.info('DCF施設統合マスタ日次更新バッチ終了')
except BatchOperationException as e:
logger.exception(f'DCF施設統合マスタ日次更新バッチエラー異常終了{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# バッチ処理完了とし、処理日、バッチ処置中フラグ、dump取得状態区分を更新
logger.info('業務日付更新・バッチステータスリフレッシュ:起動')
try: