feat: ステータス管理テーブル操作クラスの実装を正式化。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-27 12:36:25 +09:00
parent 418b6e417e
commit 0de1b06e3e
2 changed files with 382 additions and 232 deletions

View File

@ -1,239 +1,323 @@
import os
from src.error.exceptions import MaxRunCountReachedException
from src.db.database import Database
# 実消化&アルトマーク_バッチステータス管理テーブルを管理するクラス
from src.error.exceptions import (BatchOperationException,
MaxRunCountReachedException)
from src.system_var import constants
class JskultBatchStatusManager:
"""実消化&アルトマーク_バッチステータス管理テーブルを管理するクラス"""
def __init__(self, process_name: str, process_type: str, max_run_count_flg: int, receive_file_count: int):
"""コンストラクタ
Args:
process_name (str): 処理名
process_type (str): 管理区分
max_run_count_flg (int): 最大起動回数
receive_file_count (int): 受信ファイル数
"""
# 処理名
self._process_name: str = process_name
# 管理区分
self._process_type: str = process_type
# 最大起動回数
self._max_run_count_flg: str = max_run_count_flg
# 受信ファイル数
self._receive_file_count: str = receive_file_count
# DB接続モジュール(バッチ)のget_instanceを呼び出し設定
# DB接続モジュールを初期化
self._db = Database.get_instance()
# 処理ステータスの登録および更新を行う
def set_process_status(self, process_status: str):
"""
処理ステータスの登録および更新を行う
Args:
process_status (str): 処理ステータス
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
try:
# DB接続開始
self._db.connect()
self._db.begin()
self._db.to_jst()
self._db.execute(
f"""
"""
CALL
internal07.upsert_jskult_batch_status_manage(
{self._process_name},
{self._process_type},
{process_status},
:process_name,
:process_type,
:process_status,
NULL,
NULL
);"""
);""",
{
'process_name': self._process_name,
'process_type': self._process_type,
'process_status': process_status,
}
)
self._db.commit()
except Exception as e:
# Exceptionをcatchした場合はrollback
print(self._db)
# 例外発生時はrollback
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def can_run_post_process(self) -> bool:
"""
後続処理を実行してよいか判定する
Raises:
BatchOperationException: DB操作の何らかのエラー
MaxRunCountReachedException: 最大起動回数到達時の例外
Returns:
bool 後続処理を実行してよい場合はTrue
"""
try:
self._db.connect()
# 自身(後続処理 or 日付テーブル更新)のレコードを取得する
record = self._db.execute_select(
"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = :process_name
AND process_date = src07.get_syor_date();
""",
{'process_name': self._process_name}
)
record_count = record[0]['count']
if record_count == 0:
raise BatchOperationException("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント
self._increment_run_count()
# データ取込が完了していた場合
if self._is_done_data_import():
return True
# 最大起動回数に到達していない場合
if not self._is_max_run_count_reached():
return False
# 最大起動回数に到達していた場合
# 最大起動回数フラグを立てて例外を送出する
self._activate_max_run_count_flg()
raise MaxRunCountReachedException("最大起動回数に到達しました")
except MaxRunCountReachedException as e:
raise e
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
# 後続処理を実行してよいか判定する
def can_run_post_process(self):
def can_run_business_day_update(self) -> bool:
"""
日付テーブル更新を実行してよいか判定する
# SELECTの結果からレコード数を取得
Raises:
BatchOperationException: DB操作の何らかのエラー
MaxRunCountReachedException: 最大起動回数到達時の例外
Returns:
bool: 日付テーブル更新を実行してよい場合はTrue
"""
try:
# 自身(後続処理 or 日付テーブル更新)のレコードを取得する
record = self._db.execute_select(
"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = :process_name
AND process_date = src07.get_syor_date();
""",
{'process_name': self._process_name}
)
record_count = record[0]['count']
if record_count == 0:
raise ValueError("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント
self._increment_run_count()
# 後続処理が完了していた場合
if self._is_done_post_process():
return True
# 最大起動回数に到達していない場合
if not self._is_max_run_count_reached():
return False
# 最大起動回数フラグを立てる
self._activate_max_run_count_flg()
# 最大起動回数に到達した場合にメッセージをスロー
raise MaxRunCountReachedException("最大起動回数に到達しました")
except MaxRunCountReachedException as e:
raise e
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
def is_done_ultmarc_import(self) -> bool:
"""アルトマークデータ連携があったかを確認する
Raises:
BatchOperationException: DB操作の何らかのエラー
Returns:
bool アルトマークデータ連携があった場合はTrue
"""
try:
self._db.connect()
record = self._db.execute_select(
"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = :process_name
AND
process_date = src07.get_syor_date();
""",
{'process_name': constants.PROCESS_NAME_ULTMARC_IO}
)
record_count = record[0]['count']
return record_count != 0
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _increment_run_count(self):
"""起動回数をインクリメントする
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
try:
self._db.connect()
self._db.begin()
self._db.to_jst()
# 自身(後続処理 or 日付テーブル更新)のレコードを取得する
record = self._db.execute_select(
"""
SELECT
run_count
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = :process_name
AND process_date = src07.get_syor_date();
""",
{'process_name': self._process_name}
)
if len(record) == 0:
raise BatchOperationException("レコードの取得が出来ませんでした。")
run_count = record[0]['run_count']
self._db.execute(
"""
CALL
internal07.upsert_jskult_batch_status_manage(
:process_name,{self._process_name},
:process_type,{self._process_type},
NULL
:run_count,
NULL
)
""",
{
'process_name': self._process_name,
'process_type': self._process_type,
'run_count': run_count + 1,
}
)
self._db.commit()
except Exception as e:
# 例外発生時はrollback
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _is_done_data_import(self) -> bool:
"""データ取込処理が完了しているかを判定する
Raises:
BatchOperationException: DB操作の何らかのエラー
MaxRunCountReachedException: 最大起動回数到達時の例外
Returns:
bool 後続処理を実行してよい場合はTrue
"""
# 自身(後続処理 or 日付テーブル更新)のレコードを取得する
record = self._db.execute_select(
f"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_date = src07.get_syor_date();
"""
)
record_count = record[0]['count']
if record_count == 0:
raise ValueError("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント
self._increment_run_count()
# データ取込が完了していた場合
if self._is_done_data_import():
return True
# 最大起動回数に到達していない場合
if not self._is_max_run_count_reached:
return False
# 最大起動回数フラグを立てる
self._activate_max_run_count_flg()
return True
# 日付テーブル更新を実行してよいか判定する
def can_run_business_day_update(self):
# SELECTの結果からレコード数を取得
record = self._db.execute_select(
f"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_date = src07.get_syor_date();
"""
)
record_count = record[0]['count']
if record_count == 0:
raise ValueError("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント
self._increment_run_count()
# 後続処理が完了していた場合
if self._is_done_post_process():
return True
# 最大起動回数に到達していない場合
if not self._is_max_run_count_reached():
return False
# 最大起動回数フラグを立てる
self._activate_max_run_count_flg()
# 最大起動回数に到達した場合にメッセージをスロー
raise MaxRunCountReachedException("最大起動回数に到達しました")
# アルトマークデータ連携があったかを確認する
def is_done_ultmarc_import(self):
# SELECTの結果からレコード数を取得
record_count = self._db.execute_select(
f"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = 'jskult-batch-ultmarc-io'
AND
process_date = src07.get_syor_date();
"""
)
# アルトマークデータ連携が無かった場合
if record_count.pop() == 0:
return False
return True
# 起動回数をインクリメントする
def _increment_run_count(self):
try:
# DB接続開始
self._db.connect()
self._db.begin()
self._db.to_jst()
# SELECTの結果からレコードを取得
record = self._db.execute_select(
f"""
SELECT
*
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_date = src07.get_syor_date();
"""
)
run_count += record[0]['run_count']
self._db.execute(
f"""
CALL
upsert_jskult_batch_status_manage(
{self._process_name},
{self._process_type},
NULL,
{run_count},
NULL);
"""
)
self._db.commit()
except Exception as e:
# Exceptionをcatchした場合はrollbakc
self._db.rollback()
raise e
# データ取込処理が完了しているかを判定する
def _is_done_data_import(self):
# SELECTの結果からレコード数を取得
record = self._db.execute_select(
f"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_type = {self._process_type}
AND
process_status = 'done'
AND
process_date = src07.get_syor_date();
"""
process_type = :process_type
AND process_status = :process_status
AND process_date = src07.get_syor_date();
""",
{
'process_type': constants.PROCESS_TYPE_DATA_IMPORT,
'process_status': constants.PROCESS_STATUS_DONE
}
)
record_count = record[0]['count']
# データ取込数が一致している場合
if (self._receive_file_count == record_count):
return True
# データ取込数とデータ登録の件数が一致しているか確認
return self._receive_file_count == record_count
return False
def _is_done_post_process(self) -> bool:
"""後続処理のすべての処理が完了しているかを判定する
# 後続処理のすべての処理が完了しているかを判定する
def _is_done_post_process(self):
Returns:
bool: 後続処理がすべて完了していたらTrue
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
# 生物由来データロット分解のチェック
if not self._is_done_process("jskult-batch-trn-result-data-bio-lot"):
if not self._is_done_process(constants.PROCESS_NAME_TRN_RESULT_DATA_BIO_LOT):
return False
# メルク施設マスタ作成のチェック
if not self._is_done_process("jskult-batch-mst-inst"):
if not self._is_done_process(constants.PROCESS_NAME_MST_INST_ALL):
return False
# DCF削除新規マスタ作成のチェック
if not self._is_done_process("jskult-batch-dcf-inst-merge-io"):
if not self._is_done_process(constants.PROCESS_NAME_DCF_INST_MERGE_IO):
return False
# 全ての後続処理が完了している場合Trueを返す
@ -241,80 +325,110 @@ class JskultBatchStatusManager:
# データ取込処理が完了しているかを判定する
def _is_done_process(self):
def _is_done_process(self, process_name: str) -> bool:
"""指定された処理名の処理が完了しているかを判定する
# SELECTの結果からレコード数を取得
record = self._db.execute_select(
f"""
SELECT
*
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_status = 'done'
AND
process_date = src07.get_syor_date();
"""
)
Args:
process_name (str): 処理名
record_count = record[0]['count']
Returns:
bool: 処理名に一致する処理が完了していたらTrue
if (record_count == 0):
return False
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
try:
self._db.connect()
record = self._db.execute_select(
"""
SELECT
COUNT(*)
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = :process_name
AND process_status = process_status
AND process_date = src07.get_syor_date();
""",
{
'process_name': process_name,
'process_status': constants.PROCESS_STATUS_DONE
}
)
return True
record_count = record[0]['count']
# 起動回数が最大回数に到達しているか判定する
return record_count != 0
def _is_max_run_count_reached(self):
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
# SELECTの結果からレコードを取得
record = self._db.execute_select(
f"""
SELECT
*
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_date = src07.get_syor_date();
"""
)
def _is_max_run_count_reached(self) -> bool:
"""起動回数が最大回数に到達しているか判定する
run_count = record[0]['run_count']
Returns:
bool: 最大起動回数に到達していたらTrue
# 取得した起動回数とフィールド変数の最大起動回数が一致するか確認
if run_count == self._max_run_count_flg:
return True
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
try:
self._db.connect()
record = self._db.execute_select(
"""
SELECT
run_count
FROM
internal07.jskult_batch_status_manage
WHERE
process_name = {self._process_name}
AND
process_date = src07.get_syor_date();
"""
)
return False
run_count = record[0]['run_count']
# 取得した起動回数とフィールド変数の最大起動回数が一致を確認
return run_count == self._max_run_count_flg
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _activate_max_run_count_flg(self):
"""最大起動回数フラグにフラグを立てる
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
try:
# DB接続開始
self._db.connect()
self._db.begin()
self._db.to_jst()
# 最大起動回数フラグにフラグを立てる
self._db.execute(
f"""
"""
CALL
upsert_jskult_batch_status_manage(
{self._process_name},
{self._process_type},
:process_name,
:process_type,
NULL,
NULL,
1);
"""
:max_run_count_flag);
""",
{
'process_name': self._process_name,
'process_type': self._process_type,
'max_run_count_flag': constants.MAX_RUN_COUNT_FLAG_ON
}
)
self._db.commit()
except Exception as e:
self._db.rollback()
raise e
raise BatchOperationException(e)
finally:
self._db.disconnect()

View File

@ -10,6 +10,42 @@ DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了
DUMP_STATUS_KBN_COMPLETE = '2'
# バッチステータス管理
# 処理名:
# アルトマーク取込
PROCESS_NAME_ULTMARC_IO = 'jskult-batch-ultmarc-io'
# 生物由来ロット分解
PROCESS_NAME_TRN_RESULT_DATA_BIO_LOT = 'jskult-batch-trn-result-data-bio-lot'
# メルク施設マスタ作成
PROCESS_NAME_MST_INST_ALL = 'jskult-batch-mst-inst-all'
# DCF削除新規マスタ作成
PROCESS_NAME_DCF_INST_MERGE_IO = 'jskult-batch-dcf-inst-merge-io'
# 日付テーブル更新
PROCESS_NAME_UPDATE_BUSINESS_DAY = 'jskult-batch-update-business-day'
# 管理区分:
# データ取込
PROCESS_TYPE_DATA_IMPORT = 'data_import'
# 後続処理
PROCESS_TYPE_POST_PROCESS = 'post_process'
# 日付テーブル更新
PROCESS_TYPE_DATA_IMPORT = 'update_business_day'
# 処理ステータス:
# 処理開始
PROCESS_STATUS_START = 'start'
# 処理待
PROCESS_STATUS_WAITING = 'waiting'
# 処理中
PROCESS_STATUS_DOING = 'doing'
# 処理済
PROCESS_STATUS_DONE = 'done'
# エラー
PROCESS_STATUS_ERROR = 'error'
# 最大起動回数フラグ: ON
MAX_RUN_COUNT_FLAG_ON = 1
# カレンダーファイルのコメントシンボル
CALENDAR_COMMENT_SYMBOL = '#'