From 94016f513d9eb9004f9883dbf3db52fac76a8a81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E6=9C=A8=E8=A6=81?= Date: Thu, 18 May 2023 18:04:16 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20DCF=E6=96=BD=E8=A8=AD=E7=B5=B1=E5=90=88?= =?UTF-8?q?=E3=83=9E=E3=82=B9=E3=82=BF=E6=97=A5=E6=AC=A1=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E3=83=90=E3=83=83=E3=83=81(=E9=80=94=E4=B8=AD=E3=81=BE?= =?UTF-8?q?=E3=81=A7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Integrate_dcf_inst_merge.py | 547 ++++++++++++++++++ 1 file changed, 547 insertions(+) create mode 100644 ecs/jskult-batch-daily/src/batch/dcf_inst_merge/Integrate_dcf_inst_merge.py diff --git a/ecs/jskult-batch-daily/src/batch/dcf_inst_merge/Integrate_dcf_inst_merge.py b/ecs/jskult-batch-daily/src/batch/dcf_inst_merge/Integrate_dcf_inst_merge.py new file mode 100644 index 00000000..fa6ce77a --- /dev/null +++ b/ecs/jskult-batch-daily/src/batch/dcf_inst_merge/Integrate_dcf_inst_merge.py @@ -0,0 +1,547 @@ +from datetime import datetime, timedelta +from sqlalchemy import CursorResult +from src.batch.batch_functions import logging_sql +from src.batch.common.batch_context import BatchContext +from src.db.database import Database +from src.error.exceptions import BatchOperationException +from src.logging.get_logger import get_logger +from src.time.elapsed_time import ElapsedTime + +logger = get_logger('DCF施設統合マスタ日次更新バッチ') +batch_context = BatchContext.get_instance() + + +def exec(): + db = Database.get_instance() + try: + db.connect() + db.begin() + # + enabled_dst_inst_merge_records = _laundering_enabled_dct_inst_merge(db) + # + _laundering_disabled_dct_inst_merge(db) + # + if len(enabled_dst_inst_merge_records) > 0: + logger.info('') + _add_emp_chg_inst(db, enabled_dst_inst_merge_records) + logger.info('') + _add_ult_ident_presc(db, enabled_dst_inst_merge_records) + db.commit() + # + logger.info('') + except Exception as e: + db.rollback() + raise BatchOperationException(e) + finally: + db.disconnect() + + +def _laundering_enabled_dct_inst_merge(db: Database) -> list[dict]: + # データ取得(無効フラグが『0(有効)』) + valid_dst_inst_merge_records = _select_dct_inst_merge(db, 0, True) + # 移行先DCF施設コードの更新(無効フラグが『0(有効)』) + _update_dcf_inst_merge(db, 0) + # DCF施設統合マスタの過去分の洗い替え + if len(valid_dst_inst_merge_records) == 0: + return + for row in valid_dst_inst_merge_records: + _update_dcf_inst_cd_new(db, row['dup_opp_cd'], row['dcf_inst_cd']) + + return valid_dst_inst_merge_records + + +def _laundering_disabled_dct_inst_merge(db: Database): + # データ取得(無効フラグが『1(無効)』) + disabled_dst_inst_merge_records = _select_dct_inst_merge(db, 1, False) + # 移行先DCF施設コードの更新(無効フラグが『1(無効)』) + _update_dcf_inst_merge(db, 1) + # DCF施設統合マスタの過去分の洗い替え + if len(disabled_dst_inst_merge_records) == 0: + return + for row in disabled_dst_inst_merge_records: + _update_dcf_inst_cd_new(db, row['dcf_inst_cd'], row['dup_opp_cd']) + + +def _add_ult_ident_presc(db: Database, valid_dst_inst_merge_records: list[dict]): + # + for data_inst_cnt, row in enumerate(valid_dst_inst_merge_records, start=1): + tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month']) + + ult_ident_presc_records = _select_ult_ident_presc(db, row['dcf_inst_cd'], row['dup_opp_cd']) + for data_cnt, ult_row in enumerate(ult_ident_presc_records, start=1): + logger.info(f'{data_inst_cnt}件目の移行施設の{data_cnt}レコード目処理 開始') + # 処方元コード=重複時相手先コードが発生した場合 + if ult_row['opp_count'] > 0: + break + + start_date = _str_to_date_time(ult_row['start_date']) + set_start_date = start_date \ + if start_date > tekiyo_month_first_day else tekiyo_month_first_day + set_start_date = _date_time_to_str(set_start_date) + is_delete_duplicate_key = False + if _count_duplicate_ult_ident_presc(db, set_start_date, ult_row): + _delete_ult_ident_presc(db, set_start_date, ult_row) + is_delete_duplicate_key = True + else: + logger.info('納入先処方元マスタの重複予定データなし') + _insert_ult_ident_presc(db, set_start_date, row['dup_opp_cd'], ult_row) + + if _str_to_date_time(ult_row['end_date']) < start_date: + _delete_ult_ident_presc(db, ult_row['start_date'], ult_row) + continue + if not is_delete_duplicate_key: + last_end_date = tekiyo_month_first_day - timedelta(days=1) + _update_ult_ident_presc_end_date(db, _date_time_to_str(last_end_date), ult_row) + if start_date > last_end_date: + _delete_ult_ident_presc(db, ult_row['start_date'], ult_row) + + +def _delete_ult_ident_presc(db: Database, start_date: str, ult_row: CursorResult): + # + try: + elapsed_time = ElapsedTime() + sql = """ + DELETE FROM + src05.ult_ident_presc + WHERE + ta_cd = :ta_cd + AND ult_ident_cd = :ult_ident_cd + AND ratio = :ratio + AND start_date = :set_start_date + """ + params = { + 'ta_cd': ult_row['ta_cd'], + 'ult_ident_cd': ult_row['ult_ident_cd'], + 'ratio': ult_row['ratio'], + 'start_date': start_date + } + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info('') + except Exception as e: + logger.debug('') + raise e + + +def _add_emp_chg_inst(db: Database, valid_dst_inst_merge_records: list[dict]): + # + for row in valid_dst_inst_merge_records: + tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month']) + emp_chg_inst_records = _select_emp_chg_inst(db, row['dcf_inst_cd'], row['dup_opp_cd']) + for emp_row in emp_chg_inst_records: + # 重複時相手先コードが存在したかのチェック + if emp_row['opp_count'] > 0: + break + + start_date = _str_to_date_time(emp_row['start_date']) + set_start_date = start_date \ + if start_date > tekiyo_month_first_day else tekiyo_month_first_day + + _insert_emp_chg_inst(db, row['dup_opp_cd'], _date_time_to_str(set_start_date), emp_row) + + if start_date < tekiyo_month_first_day: + last_end_date = tekiyo_month_first_day - timedelta(days=1) + _update_emp_chg_inst_end_date(db, row['dcf_inst_cd'], _date_time_to_str(last_end_date), emp_row) + if start_date <= last_end_date: + continue + _update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], emp_row['start_date']) + + # if start_date >= tekiyo_month_first_day: + # _update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], start_date) + # continue + # last_end_date = tekiyo_month_first_day - timedelta(days=1) + # _update_emp_chg_inst_end_date(db, row['dcf_inst_cd'], last_end_date, emp_row) + # if start_date > last_end_date: + # _update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], start_date) + + +def _update_emp_chg_inst_enabled_flg(db: Database, dcf_inst_cd: str, ta_cd: str, start_date: str): + # + try: + elapsed_time = ElapsedTime() + sql = """ + UPDATE + src05.emp_chg_inst + SET + enabled_flg = 'N', + updater = CURRENT_USER(), + update_date = SYSDATE() + WHERE + inst_cd = :dcf_inst_cd + AND ta_cd = :ta_cd + AND start_date = :start_date + """ + params = {'dcf_inst_cd': dcf_inst_cd, 'ta_cd': ta_cd, 'start_date': start_date} + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'従業員担当施設マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _update_emp_chg_inst_end_date(db: Database, dcf_inst_cd: str, last_end_date: str, emp_row: CursorResult): + # + try: + elapsed_time = ElapsedTime() + sql = """ + UPDATE + src05.emp_chg_inst + SET end_date = :end_date, + updater = CURRENT_USER(), + update_date= SYSDATE() + WHERE + inst_cd = :dcf_inst_cd + AND ta_cd = :ta_cd + AND emp_cd = :emp_cd + AND bu_cd = :bu_cd + AND start_date = :start_date + """ + params = { + 'end_date': last_end_date, + 'dcf_inst_cd': dcf_inst_cd, + 'ta_cd': emp_row['ta_cd'], + 'emp_cd': emp_row['emp_cd'], + 'bu_cd': emp_row['bu_cd'], + 'start_date': emp_row['start_date'] + } + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'従業員担当施設マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _insert_emp_chg_inst(db: Database, dup_opp_cd: str, set_start_date: str, emp_row: CursorResult): + # + try: + elapsed_time = ElapsedTime() + sql = """ + INSERT INTO + src05.emp_chg_inst( + inst_cd, + ta_cd, + emp_cd, + bu_cd, + start_date, + end_date, + main_chg_flg, + enabled_flg, + creater, + create_date, + updater, + update_date + ) + VALUES( + :dup_opp_cd, + :ta_cd, + :emp_cd, + :bu_cd, + :start_date, + :end_date, + :main_chg_flg, + 'Y', + CURRENT_USER(), + SYSDATE(), + CURRENT_USER(), + SYSDATE() + ) + """ + params = { + 'dup_opp_cd': dup_opp_cd, + 'ta_cd': emp_row['ta_cd'], + 'emp_cd': emp_row['emp_cd'], + 'bu_cd': emp_row['bu_cd'], + 'start_date': set_start_date, + 'end_date': emp_row['end_date'], + 'main_chg_flg': emp_row['main_chg_flg'] if emp_row['main_chg_flg'] is None else None + } + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'従業員担当施設マスタの追加に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _select_dct_inst_merge(db: Database, muko_flg: int, is_null_dcf_inst_cd_new: bool): + # + try: + sql = """ + SELECT + dim.dcf_inst_cd, + dim.dup_opp_cd, + dim.tekiyo_month + FROM + src05.dcf_inst_merge AS dim + INNER JOIN + src05.hdke_tbl AS ht + ON dim.tekiyo_month = DATE_FORMAT(ht.syor_date, '%Y%m') + WHERE + dim.muko_flg =: muko_flg + AND dim.enabled_flg = 'Y' + AND dim.dcf_inst_cd_new <= >: is_null_dcf_inst_cd_new + """ + params = { + 'muko_flg': muko_flg, + 'is_null_dcf_inst_cd_new': None + } + dst_inst_merge_records = db.execute_select(sql, params) + logging_sql(logger, sql) + logger.info('') + except Exception as e: + logger.debug('') + raise e + + return dst_inst_merge_records + + +def _update_dcf_inst_merge(db: Database, muko_flg: int): + # + try: + elapsed_time = ElapsedTime() + sql = """ + UPDATE + src05.dcf_inst_merge AS updim + INNER JOIN( + SELECT + dim.dcf_inst_cd AS base_dcf_inst_cd, + dim.dup_opp_cd AS base_dup_opp_cd, + dim.tekiyo_month AS base_tekiyo_month, + dim.muko_flg AS base_muko_flg, + dim.enabled_flg AS base_enabled_flg + FROM + src05.dcf_inst_merge AS dim + INNER JOIN + src05.hdke_tbl AS ht + ON dim.tekiyo_month=DATE_FORMAT(ht.syor_date, '%Y%m') + WHERE + dim.muko_flg= :muko_flg + AND dim.enabled_flg='Y' + AND dim.dcf_inst_cd_new IS {$dcfInstCdNew}NULL + ) AS bf_dim + SET + updim.dcf_inst_cd_new = {column}, + updim.updater = CURRENT_USER(), + updim.update_date = SYSDATE() + WHERE + updim.dcf_inst_cd = base_dcf_inst_cd + AND updim.dup_opp_cd = base_dup_opp_cd + AND updim.tekiyo_month = base_tekiyo_month + AND updim.muko_flg =base_muko_flg + AND updim.enabled_flg =base_enabled_flg + """ + params = { + 'muko_flg': muko_flg + } + res = db.execute(sql.format( + column='base_dup_opp_cd' if muko_flg == 1 else 'NULL' + ), params) + logging_sql(logger, sql) + logger.info(f'DCF施設統合マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _update_dcf_inst_cd_new(db: Database, dcf_inst_cd_new_after: str, dcf_inst_cd_new_before: str): + # + try: + elapsed_time = ElapsedTime() + sql = """ + UPDATE + src05.dcf_inst_merge + SET + dcf_inst_cd_new = :dcf_inst_cd_new_after, + updater = CURRENT_USER(), + update_date = SYSDATE() + WHERE + dcf_inst_cd_new = :dcf_inst_cd_new_before + AND enabled_flg = 'Y' + AND muko_flg = 0 + """ + params = {'dcf_inst_cd_new_after': dcf_inst_cd_new_after, 'dcf_inst_cd_new_before': dcf_inst_cd_new_before} + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'移行先DCF施設コードの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _update_ult_ident_presc_end_date(db: Database, last_end_date: str, ult_ident_presc_record: CursorResult): + # + try: + elapsed_time = ElapsedTime() + sql = """ + UPDATE + src05.ult_ident_presc + SET end_date = :end_date, + updater = CURRENT_USER(), + update_date= SYSDATE() + WHERE + ta_cd = :ta_cd + AND ult_ident_cd = :ult_ident_cd + AND ratio = :ratio + AND start_date = :start_date + """ + params = { + 'end_date': last_end_date, + 'ta_cd': ult_ident_presc_record['ta_cd'], + 'ult_ident_cd': ult_ident_presc_record['ult_ident_cd'], + 'ratio': ult_ident_presc_record['ratio'], + 'start_date': ult_ident_presc_record['start_date'] + } + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'終了日 > 開始月のため適用終了日を更新, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('') + raise e + + +def _insert_ult_ident_presc(db: Database, set_Start_Date: str, dup_opp_cd: str, ult_row: CursorResult): + # + try: + elapsed_time = ElapsedTime() + sql = """ + INSERT INTO + src05.ult_ident_presc( + ta_cd, + ult_ident_cd, + ratio, + start_date, + presc_cd, + end_date, + creater, + create_date, + update_date, + updater + ) + VALUES( + :ta_cd, + :ult_ident_cd, + :ratio, + :start_date, + :presc_cd, + :end_date, + CURRENT_USER(), + SYSDATE(), + SYSDATE(), + CURRENT_USER() + ) + """ + params = { + 'ta_cd': ult_row['ta_cd'], + 'ult_ident_cd': ult_row['ult_ident_cd'], + 'ratio': ult_row['ratio'], + 'start_date': set_Start_Date, + 'presc_cd': dup_opp_cd, + 'end_date': ult_row['end_date'] + } + res = db.execute(sql, params) + logging_sql(logger, sql) + logger.info(f'納入先処方元マスタに追加に成功, {res.rowcount} 行更新 ({elapsed_time.of})') + except Exception as e: + logger.debug('納入先処方元マスタに追加に失敗') + raise e + + +def _select_emp_chg_inst(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]: + # + try: + sql = """ + SELECT + eci.inst_cd, + eci.ta_cd, + eci.emp_cd, + eci.bu_cd, + eci.start_date, + eci.end_date, + eci.main_chg_flg, + eci.enabled_flg, + (SELECT COUNT(eciopp.inst_cd) FROM src05.emp_chg_inst AS eciopp WHERE eciopp.inst_cd = :dup_opp_cd) AS opp_count + FROM + src05.emp_chg_inst AS eci + WHERE + eci.inst_cd = :dcf_inst_cd + AND eci.enabled_flg = 'Y' + AND (SELECT ht.syor_date FROM src05.hdke_tbl AS ht) < eci.end_date + """ + params = {'dcf_inst_cd': dcf_inst_cd, 'dup_opp_cd': dup_opp_cd} + emp_chg_inst_records = db.execute_select(sql, params) + logging_sql(logger, sql) + logger.info('') + except Exception as e: + logger.debug('') + raise e + return emp_chg_inst_records + + +def _select_ult_ident_presc(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]: + # + try: + sql = """ + SELECT + uip.ta_cd, + uip.ult_ident_cd, + uip.ratio, + uip.start_date, + uip.end_date, + (SELECT COUNT(uipopp.ta_cd) FROM ult_ident_presc AS uipopp WHERE uipopp.presc_cd = :dup_opp_cd) AS opp_count + FROM + src05.ult_ident_presc AS uip + WHERE + uip.presc_cd = '{$dcfInstCd}' + AND (SELECT ht.syor_date FROM src05.hdke_tbl AS ht) < uip.end_date + """ + params = {'dcf_inst_cd': dcf_inst_cd, 'dup_opp_cd': dup_opp_cd} + ult_ident_presc_records = db.execute_select(sql, params) + logging_sql(logger, sql) + logger.info('') + except Exception as e: + logger.debug('') + raise e + return ult_ident_presc_records + + +def _count_duplicate_ult_ident_presc(db: Database, set_Start_Date: str, ult_row: CursorResult): + # + try: + sql = """ + SELECT + COUNT(ta_cd) AS cnt + FROM + src05.ult_ident_presc + WHERE + ta_cd = :ta_cd + AND ult_ident_cd = :ult_ident_cd + AND ratio = :ratio + AND start_date = :set_Start_Date + """ + params = { + 'ta_cd': ult_row['ta_cd'], + 'ult_ident_cd': ult_row['ult_ident_cd'], + 'ratio': ult_row['ratio'], + 'start_date': set_Start_Date + } + result = db.execute_select(sql, params) + logging_sql(logger, sql) + logger.info('') + except Exception as e: + logger.debug('') + raise e + return result[0]['cnt'] + + +def _get_first_day_of_month(month_day: str): + return datetime.datetime.strptime(month_day, '%Y%m01') + + +def _str_to_date_time(str_date_time: str): + return datetime.datetime.strptime(str_date_time, '%Y%m%d') + + +def _date_time_to_str(date_time: datetime): + return date_time.strptime('%Y%m%d')