feat: 共通処理実装。can_run_processはこれから。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-15 19:11:40 +09:00
parent 0ab0bae0c2
commit fbd3bdb590

View File

@ -0,0 +1,101 @@
import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
class JskultHdkeTblManager:
_db: Database
def __init__(self):
self._db = Database.get_instance()
def get_batch_statuses(self) -> tuple[str, str, str]:
"""日付テーブルから、以下を取得して返す。
- 日次バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]日次バッチ処理中フラグdump取得状況区分
"""
sql = 'SELECT bch_actf, dump_sts_kbn, src07.get_syor_date() AS syor_date FROM src07.hdke_tbl'
try:
self._db.connect()
hdke_tbl_result = self._db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def update_batch_process_start(self):
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :in_processing,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
self._db.connect()
self._db.to_jst()
self._db.execute(
sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
return
def update_batch_process_complete(self):
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src07.get_syor_date() + interval 1 day), '%Y%m%d'), -- +1
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
self._db.connect()
self._db.to_jst()
self._db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
def can_run_process(self):
pass