feat: DCF施設統合マスタ日次更新バッチ(途中まで)

This commit is contained in:
高木要 2023-05-18 18:04:16 +09:00
parent 5a8d88c259
commit 94016f513d

View File

@ -0,0 +1,547 @@
from datetime import datetime, timedelta
from sqlalchemy import CursorResult
from src.batch.batch_functions import logging_sql
from src.batch.common.batch_context import BatchContext
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.time.elapsed_time import ElapsedTime
logger = get_logger('DCF施設統合マスタ日次更新バッチ')
batch_context = BatchContext.get_instance()
def exec():
db = Database.get_instance()
try:
db.connect()
db.begin()
#
enabled_dst_inst_merge_records = _laundering_enabled_dct_inst_merge(db)
#
_laundering_disabled_dct_inst_merge(db)
#
if len(enabled_dst_inst_merge_records) > 0:
logger.info('')
_add_emp_chg_inst(db, enabled_dst_inst_merge_records)
logger.info('')
_add_ult_ident_presc(db, enabled_dst_inst_merge_records)
db.commit()
#
logger.info('')
except Exception as e:
db.rollback()
raise BatchOperationException(e)
finally:
db.disconnect()
def _laundering_enabled_dct_inst_merge(db: Database) -> list[dict]:
# データ取得無効フラグが『0(有効)』)
valid_dst_inst_merge_records = _select_dct_inst_merge(db, 0, True)
# 移行先DCF施設コードの更新無効フラグが『0(有効)』)
_update_dcf_inst_merge(db, 0)
# DCF施設統合マスタの過去分の洗い替え
if len(valid_dst_inst_merge_records) == 0:
return
for row in valid_dst_inst_merge_records:
_update_dcf_inst_cd_new(db, row['dup_opp_cd'], row['dcf_inst_cd'])
return valid_dst_inst_merge_records
def _laundering_disabled_dct_inst_merge(db: Database):
# データ取得無効フラグが『1(無効)』)
disabled_dst_inst_merge_records = _select_dct_inst_merge(db, 1, False)
# 移行先DCF施設コードの更新無効フラグが『1(無効)』)
_update_dcf_inst_merge(db, 1)
# DCF施設統合マスタの過去分の洗い替え
if len(disabled_dst_inst_merge_records) == 0:
return
for row in disabled_dst_inst_merge_records:
_update_dcf_inst_cd_new(db, row['dcf_inst_cd'], row['dup_opp_cd'])
def _add_ult_ident_presc(db: Database, valid_dst_inst_merge_records: list[dict]):
#
for data_inst_cnt, row in enumerate(valid_dst_inst_merge_records, start=1):
tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month'])
ult_ident_presc_records = _select_ult_ident_presc(db, row['dcf_inst_cd'], row['dup_opp_cd'])
for data_cnt, ult_row in enumerate(ult_ident_presc_records, start=1):
logger.info(f'{data_inst_cnt}件目の移行施設の{data_cnt}レコード目処理 開始')
# 処方元コード=重複時相手先コードが発生した場合
if ult_row['opp_count'] > 0:
break
start_date = _str_to_date_time(ult_row['start_date'])
set_start_date = start_date \
if start_date > tekiyo_month_first_day else tekiyo_month_first_day
set_start_date = _date_time_to_str(set_start_date)
is_delete_duplicate_key = False
if _count_duplicate_ult_ident_presc(db, set_start_date, ult_row):
_delete_ult_ident_presc(db, set_start_date, ult_row)
is_delete_duplicate_key = True
else:
logger.info('納入先処方元マスタの重複予定データなし')
_insert_ult_ident_presc(db, set_start_date, row['dup_opp_cd'], ult_row)
if _str_to_date_time(ult_row['end_date']) < start_date:
_delete_ult_ident_presc(db, ult_row['start_date'], ult_row)
continue
if not is_delete_duplicate_key:
last_end_date = tekiyo_month_first_day - timedelta(days=1)
_update_ult_ident_presc_end_date(db, _date_time_to_str(last_end_date), ult_row)
if start_date > last_end_date:
_delete_ult_ident_presc(db, ult_row['start_date'], ult_row)
def _delete_ult_ident_presc(db: Database, start_date: str, ult_row: CursorResult):
#
try:
elapsed_time = ElapsedTime()
sql = """
DELETE FROM
src05.ult_ident_presc
WHERE
ta_cd = :ta_cd
AND ult_ident_cd = :ult_ident_cd
AND ratio = :ratio
AND start_date = :set_start_date
"""
params = {
'ta_cd': ult_row['ta_cd'],
'ult_ident_cd': ult_row['ult_ident_cd'],
'ratio': ult_row['ratio'],
'start_date': start_date
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info('')
except Exception as e:
logger.debug('')
raise e
def _add_emp_chg_inst(db: Database, valid_dst_inst_merge_records: list[dict]):
#
for row in valid_dst_inst_merge_records:
tekiyo_month_first_day = _get_first_day_of_month(row['tekiyo_month'])
emp_chg_inst_records = _select_emp_chg_inst(db, row['dcf_inst_cd'], row['dup_opp_cd'])
for emp_row in emp_chg_inst_records:
# 重複時相手先コードが存在したかのチェック
if emp_row['opp_count'] > 0:
break
start_date = _str_to_date_time(emp_row['start_date'])
set_start_date = start_date \
if start_date > tekiyo_month_first_day else tekiyo_month_first_day
_insert_emp_chg_inst(db, row['dup_opp_cd'], _date_time_to_str(set_start_date), emp_row)
if start_date < tekiyo_month_first_day:
last_end_date = tekiyo_month_first_day - timedelta(days=1)
_update_emp_chg_inst_end_date(db, row['dcf_inst_cd'], _date_time_to_str(last_end_date), emp_row)
if start_date <= last_end_date:
continue
_update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], emp_row['start_date'])
# if start_date >= tekiyo_month_first_day:
# _update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], start_date)
# continue
# last_end_date = tekiyo_month_first_day - timedelta(days=1)
# _update_emp_chg_inst_end_date(db, row['dcf_inst_cd'], last_end_date, emp_row)
# if start_date > last_end_date:
# _update_emp_chg_inst_enabled_flg(db, row['dcf_inst_cd'], emp_row['ta_cd'], start_date)
def _update_emp_chg_inst_enabled_flg(db: Database, dcf_inst_cd: str, ta_cd: str, start_date: str):
#
try:
elapsed_time = ElapsedTime()
sql = """
UPDATE
src05.emp_chg_inst
SET
enabled_flg = 'N',
updater = CURRENT_USER(),
update_date = SYSDATE()
WHERE
inst_cd = :dcf_inst_cd
AND ta_cd = :ta_cd
AND start_date = :start_date
"""
params = {'dcf_inst_cd': dcf_inst_cd, 'ta_cd': ta_cd, 'start_date': start_date}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'従業員担当施設マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _update_emp_chg_inst_end_date(db: Database, dcf_inst_cd: str, last_end_date: str, emp_row: CursorResult):
#
try:
elapsed_time = ElapsedTime()
sql = """
UPDATE
src05.emp_chg_inst
SET end_date = :end_date,
updater = CURRENT_USER(),
update_date= SYSDATE()
WHERE
inst_cd = :dcf_inst_cd
AND ta_cd = :ta_cd
AND emp_cd = :emp_cd
AND bu_cd = :bu_cd
AND start_date = :start_date
"""
params = {
'end_date': last_end_date,
'dcf_inst_cd': dcf_inst_cd,
'ta_cd': emp_row['ta_cd'],
'emp_cd': emp_row['emp_cd'],
'bu_cd': emp_row['bu_cd'],
'start_date': emp_row['start_date']
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'従業員担当施設マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _insert_emp_chg_inst(db: Database, dup_opp_cd: str, set_start_date: str, emp_row: CursorResult):
#
try:
elapsed_time = ElapsedTime()
sql = """
INSERT INTO
src05.emp_chg_inst(
inst_cd,
ta_cd,
emp_cd,
bu_cd,
start_date,
end_date,
main_chg_flg,
enabled_flg,
creater,
create_date,
updater,
update_date
)
VALUES(
:dup_opp_cd,
:ta_cd,
:emp_cd,
:bu_cd,
:start_date,
:end_date,
:main_chg_flg,
'Y',
CURRENT_USER(),
SYSDATE(),
CURRENT_USER(),
SYSDATE()
)
"""
params = {
'dup_opp_cd': dup_opp_cd,
'ta_cd': emp_row['ta_cd'],
'emp_cd': emp_row['emp_cd'],
'bu_cd': emp_row['bu_cd'],
'start_date': set_start_date,
'end_date': emp_row['end_date'],
'main_chg_flg': emp_row['main_chg_flg'] if emp_row['main_chg_flg'] is None else None
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'従業員担当施設マスタの追加に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _select_dct_inst_merge(db: Database, muko_flg: int, is_null_dcf_inst_cd_new: bool):
#
try:
sql = """
SELECT
dim.dcf_inst_cd,
dim.dup_opp_cd,
dim.tekiyo_month
FROM
src05.dcf_inst_merge AS dim
INNER JOIN
src05.hdke_tbl AS ht
ON dim.tekiyo_month = DATE_FORMAT(ht.syor_date, '%Y%m')
WHERE
dim.muko_flg =: muko_flg
AND dim.enabled_flg = 'Y'
AND dim.dcf_inst_cd_new <= >: is_null_dcf_inst_cd_new
"""
params = {
'muko_flg': muko_flg,
'is_null_dcf_inst_cd_new': None
}
dst_inst_merge_records = db.execute_select(sql, params)
logging_sql(logger, sql)
logger.info('')
except Exception as e:
logger.debug('')
raise e
return dst_inst_merge_records
def _update_dcf_inst_merge(db: Database, muko_flg: int):
#
try:
elapsed_time = ElapsedTime()
sql = """
UPDATE
src05.dcf_inst_merge AS updim
INNER JOIN(
SELECT
dim.dcf_inst_cd AS base_dcf_inst_cd,
dim.dup_opp_cd AS base_dup_opp_cd,
dim.tekiyo_month AS base_tekiyo_month,
dim.muko_flg AS base_muko_flg,
dim.enabled_flg AS base_enabled_flg
FROM
src05.dcf_inst_merge AS dim
INNER JOIN
src05.hdke_tbl AS ht
ON dim.tekiyo_month=DATE_FORMAT(ht.syor_date, '%Y%m')
WHERE
dim.muko_flg= :muko_flg
AND dim.enabled_flg='Y'
AND dim.dcf_inst_cd_new IS {$dcfInstCdNew}NULL
) AS bf_dim
SET
updim.dcf_inst_cd_new = {column},
updim.updater = CURRENT_USER(),
updim.update_date = SYSDATE()
WHERE
updim.dcf_inst_cd = base_dcf_inst_cd
AND updim.dup_opp_cd = base_dup_opp_cd
AND updim.tekiyo_month = base_tekiyo_month
AND updim.muko_flg =base_muko_flg
AND updim.enabled_flg =base_enabled_flg
"""
params = {
'muko_flg': muko_flg
}
res = db.execute(sql.format(
column='base_dup_opp_cd' if muko_flg == 1 else 'NULL'
), params)
logging_sql(logger, sql)
logger.info(f'DCF施設統合マスタの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _update_dcf_inst_cd_new(db: Database, dcf_inst_cd_new_after: str, dcf_inst_cd_new_before: str):
#
try:
elapsed_time = ElapsedTime()
sql = """
UPDATE
src05.dcf_inst_merge
SET
dcf_inst_cd_new = :dcf_inst_cd_new_after,
updater = CURRENT_USER(),
update_date = SYSDATE()
WHERE
dcf_inst_cd_new = :dcf_inst_cd_new_before
AND enabled_flg = 'Y'
AND muko_flg = 0
"""
params = {'dcf_inst_cd_new_after': dcf_inst_cd_new_after, 'dcf_inst_cd_new_before': dcf_inst_cd_new_before}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'移行先DCF施設コードの更新に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _update_ult_ident_presc_end_date(db: Database, last_end_date: str, ult_ident_presc_record: CursorResult):
#
try:
elapsed_time = ElapsedTime()
sql = """
UPDATE
src05.ult_ident_presc
SET end_date = :end_date,
updater = CURRENT_USER(),
update_date= SYSDATE()
WHERE
ta_cd = :ta_cd
AND ult_ident_cd = :ult_ident_cd
AND ratio = :ratio
AND start_date = :start_date
"""
params = {
'end_date': last_end_date,
'ta_cd': ult_ident_presc_record['ta_cd'],
'ult_ident_cd': ult_ident_presc_record['ult_ident_cd'],
'ratio': ult_ident_presc_record['ratio'],
'start_date': ult_ident_presc_record['start_date']
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'終了日 > 開始月のため適用終了日を更新, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('')
raise e
def _insert_ult_ident_presc(db: Database, set_Start_Date: str, dup_opp_cd: str, ult_row: CursorResult):
#
try:
elapsed_time = ElapsedTime()
sql = """
INSERT INTO
src05.ult_ident_presc(
ta_cd,
ult_ident_cd,
ratio,
start_date,
presc_cd,
end_date,
creater,
create_date,
update_date,
updater
)
VALUES(
:ta_cd,
:ult_ident_cd,
:ratio,
:start_date,
:presc_cd,
:end_date,
CURRENT_USER(),
SYSDATE(),
SYSDATE(),
CURRENT_USER()
)
"""
params = {
'ta_cd': ult_row['ta_cd'],
'ult_ident_cd': ult_row['ult_ident_cd'],
'ratio': ult_row['ratio'],
'start_date': set_Start_Date,
'presc_cd': dup_opp_cd,
'end_date': ult_row['end_date']
}
res = db.execute(sql, params)
logging_sql(logger, sql)
logger.info(f'納入先処方元マスタに追加に成功, {res.rowcount} 行更新 ({elapsed_time.of})')
except Exception as e:
logger.debug('納入先処方元マスタに追加に失敗')
raise e
def _select_emp_chg_inst(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]:
#
try:
sql = """
SELECT
eci.inst_cd,
eci.ta_cd,
eci.emp_cd,
eci.bu_cd,
eci.start_date,
eci.end_date,
eci.main_chg_flg,
eci.enabled_flg,
(SELECT COUNT(eciopp.inst_cd) FROM src05.emp_chg_inst AS eciopp WHERE eciopp.inst_cd = :dup_opp_cd) AS opp_count
FROM
src05.emp_chg_inst AS eci
WHERE
eci.inst_cd = :dcf_inst_cd
AND eci.enabled_flg = 'Y'
AND (SELECT ht.syor_date FROM src05.hdke_tbl AS ht) < eci.end_date
"""
params = {'dcf_inst_cd': dcf_inst_cd, 'dup_opp_cd': dup_opp_cd}
emp_chg_inst_records = db.execute_select(sql, params)
logging_sql(logger, sql)
logger.info('')
except Exception as e:
logger.debug('')
raise e
return emp_chg_inst_records
def _select_ult_ident_presc(db: Database, dcf_inst_cd: str, dup_opp_cd: str) -> list[dict]:
#
try:
sql = """
SELECT
uip.ta_cd,
uip.ult_ident_cd,
uip.ratio,
uip.start_date,
uip.end_date,
(SELECT COUNT(uipopp.ta_cd) FROM ult_ident_presc AS uipopp WHERE uipopp.presc_cd = :dup_opp_cd) AS opp_count
FROM
src05.ult_ident_presc AS uip
WHERE
uip.presc_cd = '{$dcfInstCd}'
AND (SELECT ht.syor_date FROM src05.hdke_tbl AS ht) < uip.end_date
"""
params = {'dcf_inst_cd': dcf_inst_cd, 'dup_opp_cd': dup_opp_cd}
ult_ident_presc_records = db.execute_select(sql, params)
logging_sql(logger, sql)
logger.info('')
except Exception as e:
logger.debug('')
raise e
return ult_ident_presc_records
def _count_duplicate_ult_ident_presc(db: Database, set_Start_Date: str, ult_row: CursorResult):
#
try:
sql = """
SELECT
COUNT(ta_cd) AS cnt
FROM
src05.ult_ident_presc
WHERE
ta_cd = :ta_cd
AND ult_ident_cd = :ult_ident_cd
AND ratio = :ratio
AND start_date = :set_Start_Date
"""
params = {
'ta_cd': ult_row['ta_cd'],
'ult_ident_cd': ult_row['ult_ident_cd'],
'ratio': ult_row['ratio'],
'start_date': set_Start_Date
}
result = db.execute_select(sql, params)
logging_sql(logger, sql)
logger.info('')
except Exception as e:
logger.debug('')
raise e
return result[0]['cnt']
def _get_first_day_of_month(month_day: str):
return datetime.datetime.strptime(month_day, '%Y%m01')
def _str_to_date_time(str_date_time: str):
return datetime.datetime.strptime(str_date_time, '%Y%m%d')
def _date_time_to_str(date_time: datetime):
return date_time.strptime('%Y%m%d')