fix: レビュー指摘反映

feat: レビュー指摘対応 db.disconnectのタイミング修正
https://nds-tyo.backlog.com/git/NEWDWH2021/newsdwh2021/pullRequests/161#comment-1843452

feat: レビュー指摘反映に伴い、検査例外を出す部分は一箇所に限定
https://nds-tyo.backlog.com/git/NEWDWH2021/newsdwh2021/pullRequests/161#comment-1843452

feat: レビュー指摘対応
https://nds-tyo.backlog.com/git/NEWDWH2021/newsdwh2021/pullRequests/161#comment-1843505
https://nds-tyo.backlog.com/git/NEWDWH2021/newsdwh2021/pullRequests/161#comment-1843511

feat: レビュー指摘反映
https://nds-tyo.backlog.com/git/NEWDWH2021/newsdwh2021/pullRequests/161#comment-1843529

feat: ログ出力レベルの調整。内部関数に_をつけた
This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-18 16:51:22 +09:00
parent f0b91c6a74
commit 4431e5bbaa
5 changed files with 110 additions and 108 deletions

View File

@ -27,7 +27,8 @@ def get_batch_statuses() -> tuple[str, str, str]:
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
@ -56,7 +57,8 @@ def update_batch_processing_flag_in_processing() -> None:
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
finally:
db.disconnect()
return
@ -83,7 +85,8 @@ def update_batch_process_complete() -> None:
})
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
finally:
db.disconnect()
return

View File

@ -1,20 +0,0 @@
from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('生物由来卸販売ロット分解')
def exec():
"""生物由来卸販売ロット分解"""
logger.info('生物由来卸販売ロット分解:起動')
# 営業日ではない場合、処理をスキップする
if batch_context.is_not_business_day:
logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。')
return
# TODO: ここに処理を追記していく
logger.info('生物由来卸販売ロット分解:終了')
return

View File

@ -11,38 +11,42 @@ batch_context = BatchContext.get_instance()
def exec():
db = Database.get_instance()
db.connect()
logger.info('##########################')
logger.info('START Changing Employee in charge of institution PGM.')
# 業務日付を取得
syor_date = batch_context.syor_date
# `emp_chg_inst_lau`をTruncate
truncate_emp_chg_inst_lau(db)
# emp_chg_inst から、`emp_chg_inst_lau`へInsert
insert_into_emp_chg_inst_lau_from_emp_chg_inst(db)
# vop_hco_merge_vから、emp_chg_inst_lauをUpdate
update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date)
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date)
db.disconnect()
logger.info('##########################')
logger.info('End All Processing PGM.')
try:
db.connect()
logger.debug('##########################')
logger.debug('START Changing Employee in charge of institution PGM.')
# 業務日付を取得
syor_date = batch_context.syor_date
# `emp_chg_inst_lau`をTruncate
_truncate_emp_chg_inst_lau(db)
# emp_chg_inst から、`emp_chg_inst_lau`へInsert
_insert_into_emp_chg_inst_lau_from_emp_chg_inst(db)
# vop_hco_merge_vから、emp_chg_inst_lauをUpdate
_update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date)
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
_update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date)
logger.debug('##########################')
logger.debug('End All Processing PGM.')
except Exception as e:
raise BatchOperationException(e)
finally:
db.disconnect()
def truncate_emp_chg_inst_lau(db: Database):
logger.info("##########################")
def _truncate_emp_chg_inst_lau(db: Database):
logger.debug("##########################")
try:
db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau")
except Exception as e:
logger.info("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!")
raise BatchOperationException(e)
logger.debug("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!")
raise e
logger.info("Table `emp_chg_inst_lau` was truncated!")
logger.debug("Table `emp_chg_inst_lau` was truncated!")
return
def insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logger.info("##########################")
def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logger.debug("##########################")
try:
elapsed_time = ElapsedTime()
sql = """
@ -70,16 +74,16 @@ def insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logging_sql(logger, sql)
logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!")
raise BatchOperationException(e)
logger.info("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!")
logger.debug("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!")
raise e
logger.debug("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!")
return
def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
# vop_hco_merge_vはデータが作られないため、この洗い替え処理は基本空振りする
logger.info("##########################")
logger.debug("##########################")
try:
select_result = db.execute_select(
"""
@ -93,8 +97,8 @@ def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
{'syor_date': syor_date}
)
except Exception as e:
logger.info("Error! `vop_hco_merge_v` Table count error!")
raise BatchOperationException(e)
logger.debug("Error! `vop_hco_merge_v` Table count error!")
raise e
count = [row for row in select_result][0]['row_count']
if count == 0:
logger.info('vop_hco_merge_v Table Data is not exists!')
@ -137,16 +141,16 @@ def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
logging_sql(logger, update_sql)
logger.info(f'Query OK, {update_result.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!")
raise BatchOperationException(e)
logger.info(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!")
logger.debug(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!")
raise e
logger.debug(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!")
return
def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
# Get count from DCF_INST_MERGE
logger.info("##########################")
logger.debug("##########################")
try:
select_result = db.execute_select(
"""
@ -163,18 +167,19 @@ def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
{'syor_date': syor_date}
)
except Exception as e:
logger.info("Error! Getting Count of dcf_inst_merge was failed!")
raise BatchOperationException(e)
logger.debug("Error! Getting Count of dcf_inst_merge was failed!")
raise e
count = [row for row in select_result][0]['row_count']
if count == 0:
logger.info('dcf_inst_merge Table Data is not exists!')
return
logger.info('dcf_inst_merge Table Data is exists!')
logger.debug('dcf_inst_merge Table Data is exists!')
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
logger.info("##########################")
logger.info("#### UPDATE DATA #########")
logger.info("##########################")
logger.debug("##########################")
logger.debug("#### UPDATE DATA #########")
logger.debug("##########################")
try:
elapsed_time = ElapsedTime()
update_sql = """
@ -205,9 +210,9 @@ def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
logging_sql(logger, update_sql)
logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info("emp_chg_inst_lau.v_inst_cd could not set!")
raise BatchOperationException(e)
logger.debug("emp_chg_inst_lau.v_inst_cd could not set!")
raise e
logger.info("emp_chg_inst_lau.v_inst_cd was set!")
logger.debug("emp_chg_inst_lau.v_inst_cd was set!")
return

View File

@ -61,7 +61,6 @@ def exec_import():
batch_context.is_ultmarc_imported = True
logger.info('アルトマーク取込処理: 終了')
except Exception as e:
logger.exception(e)
raise BatchOperationException(e)
@ -74,41 +73,46 @@ def exec_export():
def _import_to_ultmarc_table(dat_file: DatFile):
db = Database.get_instance()
# DB接続
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
logger.info('Transaction BEGIN')
mapper_factory = UltmarcTableMapperFactory()
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(f'ERROR_LINE: {log_message}')
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
db.disconnect()
# 処理結果をログに出力する
logger.info('Transaction COMMIT')
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
try:
# DB接続
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
logger.info('Transaction BEGIN')
mapper_factory = UltmarcTableMapperFactory()
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(f'ERROR_LINE: {log_message}')
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
db.disconnect()
# 処理結果をログに出力する
logger.info('Transaction COMMIT')
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
# 1件でもエラーがあれば、通知用にログに出力する
if dat_file.error_count > 0:
logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。')
# 1件でもエラーがあれば、通知用にログに出力する
if dat_file.error_count > 0:
logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。')
except Exception as e:
raise e
finally:
db.disconnect()
return

View File

@ -43,10 +43,17 @@ def exec():
# バッチ共通設定に処理日を追加
batch_context.syor_date = syor_date
# 稼働日かかどうかを、非営業日ファイルをダウンロードして判定
holiday_list_file_path = ConfigBucket().download_holiday_list()
holiday_calendar = CalendarFile(holiday_list_file_path)
batch_context.is_not_business_day = holiday_calendar.compare_date(syor_date)
# 稼働日かかどうかを、V実消化非稼働日ファイルをダウンロードして判定
try:
holiday_list_file_path = ConfigBucket().download_holiday_list()
holiday_calendar = CalendarFile(holiday_list_file_path)
batch_context.is_not_business_day = holiday_calendar.compare_date(syor_date)
except Exception as e:
logger.exception(f'V実消化非稼働日ファイルの読み込みに失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でV実消化稼働日かどうかをログ出力
logger.debug(f'本日は{"V実消化非稼働日です。" if batch_context.is_not_business_day else "V実消化稼働日です。"}')
# バッチ処理中に更新
try:
@ -63,6 +70,9 @@ def exec():
logger.exception(f'アルトマーク取込処理エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でアルトマーク取込が行われたかどうかをログ出力
logger.debug(f'{"アルトマーク取込が行われました。" if batch_context.is_ultmarc_imported else "アルトマーク取込が行われませんでした。"}')
try:
logger.info('V実消化用施設・薬局薬店データ作成処理起動')
ultmarc_process.exec_export()