"""バッチ処理の共通関数""" import logging import textwrap from src.db.database import Database from src.error.exceptions import BatchOperationException, DBException from src.system_var import constants def get_batch_statuses() -> tuple[str, str]: """日付テーブルから、以下を取得して返す。 - バッチ処理中フラグ - dump取得状況区分 Raises: BatchOperationException: 日付テーブルが取得できないとき、何らかのエラーが発生したとき Returns: tuple[str, str]: [0]バッチ処理中フラグ,[1]dump取得状況区分 """ db = Database.get_instance() sql = 'SELECT bch_actf, dump_sts_kbn FROM src07.hdke_tbl' try: db.connect() hdke_tbl_result = db.execute_select(sql) except DBException as e: raise BatchOperationException(e) finally: db.disconnect() if len(hdke_tbl_result) == 0: raise BatchOperationException('日付テーブルが取得できませんでした') # 必ず1件取れる hdke_tbl_record = hdke_tbl_result[0] batch_processing_flag = hdke_tbl_record['bch_actf'] dump_status_kbn = hdke_tbl_record['dump_sts_kbn'] return batch_processing_flag, dump_status_kbn def update_dump_status_kbn_in_processing() -> None: """dump取得状況区分を処理中に更新する Raises: BatchOperationException: DB操作の何らかのエラー """ db = Database.get_instance() sql = """\ UPDATE src07.hdke_tbl SET dump_sts_kbn = :in_processing, updater = CURRENT_USER(), update_date = NOW() """ try: db.connect() db.to_jst() db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING}) except DBException as e: raise BatchOperationException(e) finally: db.disconnect() return def update_dump_status_kbn_error() -> None: """dump取得状況区分をエラーに更新する Raises: BatchOperationException: DB操作の何らかのエラー """ db = Database.get_instance() sql = """\ UPDATE src07.hdke_tbl SET dump_sts_kbn = :dump_unprocessed, updater = CURRENT_USER(), update_date = NOW() """ try: db.connect() db.to_jst() db.execute(sql, { 'dump_unprocessed': constants.DUMP_STATUS_KBN_ERROR }) except DBException as e: raise BatchOperationException(e) finally: db.disconnect() return def update_dump_status_kbn_complete() -> None: """dump取得状況区分を正常終了に更新する Raises: BatchOperationException: DB操作の何らかのエラー """ db = Database.get_instance() sql = """\ UPDATE src07.hdke_tbl SET dump_sts_kbn = :dump_unprocessed, updater = CURRENT_USER(), update_date = NOW() """ try: db.connect() db.to_jst() db.execute(sql, { 'dump_unprocessed': constants.DUMP_STATUS_KBN_COMPLETE }) except DBException as e: raise BatchOperationException(e) finally: db.disconnect() return def logging_sql(logger: logging.Logger, sql: str) -> None: """SQL文をデバッグログで出力する Args: logger (logging.Logger): ロガー sql (str): SQL文 """ logger.debug(f'\n{"-" * 15}\n{textwrap.dedent(sql)[1:-1]}\n{"-" * 15}')