newdwh2021/ecs/jskult-dbdump/src/batch/batch_functions.py

132 lines
3.4 KiB
Python

"""バッチ処理の共通関数"""
import logging
import textwrap
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_batch_statuses() -> tuple[str, str]:
"""日付テーブルから、以下を取得して返す。
- バッチ処理中フラグ
- dump取得状況区分
Raises:
BatchOperationException: 日付テーブルが取得できないとき、何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]バッチ処理中フラグ,[1]dump取得状況区分
"""
db = Database.get_instance()
sql = 'SELECT bch_actf, dump_sts_kbn FROM src05.hdke_tbl'
try:
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
return batch_processing_flag, dump_status_kbn
def update_dump_status_kbn_in_processing() -> None:
"""dump取得状況区分を処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :in_processing,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_dump_status_kbn_error() -> None:
"""dump取得状況区分をエラーに更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :dump_unprocessed,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'dump_unprocessed': constants.DUMP_STATUS_KBN_ERROR
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_dump_status_kbn_complete() -> None:
"""dump取得状況区分を正常終了に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :dump_unprocessed,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'dump_unprocessed': constants.DUMP_STATUS_KBN_COMPLETE
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def logging_sql(logger: logging.Logger, sql: str) -> None:
"""SQL文をデバッグログで出力する
Args:
logger (logging.Logger): ロガー
sql (str): SQL文
"""
logger.debug(f'\n{"-" * 15}\n{textwrap.dedent(sql)[1:-1]}\n{"-" * 15}')