コードの修正

This commit is contained in:
mori.k 2025-05-22 19:51:33 +09:00
parent 800f79b825
commit fa408d2a83

View File

@ -1,10 +1,19 @@
import os
from src.error.exceptions import MaxRunCountReachedException from src.error.exceptions import MaxRunCountReachedException
from src.db.database import Database from src.db.database import Database
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT")
DB_SCHEMA = os.environ.get("DB_SCHEMA")
# 実消化&アルトマーク_バッチステータス管理テーブルを管理するクラス # 実消化&アルトマーク_バッチステータス管理テーブルを管理するクラス
class JskultBatchStatusManager: class JskultBatchStatusManager:
def __init__(self, process_name : str, process_type : str, max_run_count_flg : int, receive_file_count : int): def __init__(self, process_name: str, process_type: str, max_run_count_flg: int, receive_file_count: int):
# 処理名 # 処理名
self._process_name: str = process_name self._process_name: str = process_name
@ -18,43 +27,46 @@ class JskultBatchStatusManager:
self._receive_file_count: str = receive_file_count self._receive_file_count: str = receive_file_count
# DB接続モジュール(バッチ)のget_instanceを呼び出し設定 # DB接続モジュール(バッチ)のget_instanceを呼び出し設定
self._db = Database().get_instance() self._db = Database(DB_USERNAME, DB_PASSWORD, DB_HOST,
DB_PORT, DB_SCHEMA).get_instance()
# 処理ステータスの登録および更新を行う # 処理ステータスの登録および更新を行う
def set_process_status (self, process_status : str): def set_process_status(self, process_status: str):
try: try:
self._db.begin() self._db.begin()
self._db.execute(f"CALL upsert_jskult_batch_status_manage({self._process_name} {self._process_type} {process_status} NULL NULL);") self._db.execute(
f"CALL upsert_jskult_batch_status_manage({self._process_name}, {self._process_type}, {process_status}, NULL, NULL);"
)
self._db.commit() self._db.commit()
except Exception as e: except Exception as e:
# Exceptionをcatchした場合はrollback # Exceptionをcatchした場合はrollback
self._db.rollback() self._db.rollback()
raise e raise e
# 後続処理を実行してよいか判定する # 後続処理を実行してよいか判定する
def can_run_post_process(self): def can_run_post_process(self):
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record_count = self._db.execute_select( record_count = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();"
).count() ).count()
if record_count == 0: if record_count == 0:
raise ValueError("レコードの取得が出来ませんでした。") raise ValueError("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント # 起動回数のインクリメント
self._increment_run_count() self._increment_run_count()
# データ取込が完了していた場合 # データ取込が完了していた場合
if self._is_done_data_import(): if self._is_done_data_import():
return True return True
#最大起動回数に到達していない場合 # 最大起動回数に到達していない場合
if not self._is_max_run_count_reached: if not self._is_max_run_count_reached:
return False return False
# 最大起動回数フラグを立てる # 最大起動回数フラグを立てる
self._activate_max_run_count_flg() self._activate_max_run_count_flg()
@ -65,41 +77,41 @@ class JskultBatchStatusManager:
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record_count = self._db.execute_select( record_count = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();"
).count() ).count()
if record_count == 0: if record_count == 0:
raise ValueError("レコードの取得が出来ませんでした。") raise ValueError("レコードの取得が出来ませんでした。")
# 起動回数のインクリメント # 起動回数のインクリメント
self._increment_run_count() self._increment_run_count()
# 後続処理が完了していた場合 # 後続処理が完了していた場合
if self._is_done_post_process(): if self._is_done_post_process():
return True return True
#最大起動回数に到達していない場合 # 最大起動回数に到達していない場合
if not self._is_max_run_count_reached(): if not self._is_max_run_count_reached():
return False return False
# 最大起動回数フラグを立てる # 最大起動回数フラグを立てる
self._activate_max_run_count_flg() self._activate_max_run_count_flg()
# 最大起動回数に到達した場合にメッセージをスロー # 最大起動回数に到達した場合にメッセージをスロー
raise MaxRunCountReachedException("最大起動回数に到達しました") raise MaxRunCountReachedException("最大起動回数に到達しました")
# アルトマークデータ連携があったかを確認する # アルトマークデータ連携があったかを確認する
def is_done_ultmarc_import(self): def is_done_ultmarc_import(self):
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record_count = self._db.execute_select( record_count = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = 'jskult-batch-ultmarc-io' AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = 'jskult-batch-ultmarc-io' AND process_date = src07.get_syor_date();"
).count() ).count()
# アルトマークデータ連携が無かった場合 # アルトマークデータ連携が無かった場合
if record_count == 0: if record_count == 0:
return False return False
return True return True
# 起動回数をインクリメントする # 起動回数をインクリメントする
@ -108,12 +120,13 @@ class JskultBatchStatusManager:
self._db.begin() self._db.begin()
# SELECTの結果からレコードを取得 # SELECTの結果からレコードを取得
record = self._db.execute_select( record = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();"
) )
run_count += record[0]['run_count'] run_count += record[0]['run_count']
self._db.execute(f"CALL upsert_jskult_batch_status_manage({self._process_name} {self._process_type} NULL {run_count} NULL);") self._db.execute(
f"CALL upsert_jskult_batch_status_manage({self._process_name}, {self._process_type}, NULL, {run_count}, NULL);")
self._db.commit() self._db.commit()
@ -122,78 +135,78 @@ class JskultBatchStatusManager:
# Exceptionをcatchした場合はrollbakc # Exceptionをcatchした場合はrollbakc
self._db.rollback() self._db.rollback()
raise e raise e
# データ取込処理が完了しているかを判定する # データ取込処理が完了しているかを判定する
def _is_done_data_import(self): def _is_done_data_import(self):
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record_count = self._db.execute_select( record_count = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_type = {self._process_type} AND process_status = 'done' AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_type = {self._process_type} AND process_status = 'done' AND process_date = src07.get_syor_date();"
).count() ).count()
# データ取込数が一致している場合 # データ取込数が一致している場合
if(self._receive_file_count == record_count): if (self._receive_file_count == record_count):
return True return True
return False return False
# 後続処理のすべての処理が完了しているかを判定する # 後続処理のすべての処理が完了しているかを判定する
def _is_done_post_process(self): def _is_done_post_process(self):
if not self._is_done_process("jskult-batch-trn-result-data-bio-lot"): if not self._is_done_process("jskult-batch-trn-result-data-bio-lot"):
return False return False
if not self._is_done_process("jskult-batch-mst-inst"): if not self._is_done_process("jskult-batch-mst-inst"):
return False return False
if not self._is_done_process("jskult-batch-dcf-inst-merge-io"): if not self._is_done_process("jskult-batch-dcf-inst-merge-io"):
return False return False
# 全ての後続処理が完了している場合Trueを返す # 全ての後続処理が完了している場合Trueを返す
return True return True
# データ取込処理が完了しているかを判定する # データ取込処理が完了しているかを判定する
def _is_done_process(self, process_name : str):
def _is_done_process(self):
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record_count = self._db.execute_select( record_count = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_status = 'done' AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_status = 'done' AND process_date = src07.get_syor_date();"
).count() ).count()
if(record_count == 0): if (record_count == 0):
return False return False
return True return True
# 起動回数が最大回数に到達しているか判定する # 起動回数が最大回数に到達しているか判定する
def _is_max_run_count_reached(self): def _is_max_run_count_reached(self):
# SELECTの結果からレコード数を取得 # SELECTの結果からレコード数を取得
record = self._db.execute_select( record = self._db.execute_select(
f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();" f"SELECT * FROM internal07.jskult_batch_status_manage WHERE process_name = {self._process_name} AND process_date = src07.get_syor_date();"
) )
run_count = record[0]['run_count'] run_count = record[0]['run_count']
# 取得した起動回数とフィールド変数の最大起動回数が一致するか確認 # 取得した起動回数とフィールド変数の最大起動回数が一致するか確認
if run_count == self._max_run_count_flg: if run_count == self._max_run_count_flg:
return True return True
return False
return False
def _activate_max_run_count_flg(self): def _activate_max_run_count_flg(self):
try: try:
self._db.begin() self._db.begin()
# 最大起動回数フラグにフラグを立てる # 最大起動回数フラグにフラグを立てる
self._db.execute(f"CALL upsert_jskult_batch_status_manage({self._process_name} {self._process_type} NULL NULL 1);") self._db.execute(
f"CALL upsert_jskult_batch_status_manage({self._process_name}, {self._process_type}, NULL, NULL, 1);")
self._db.commit() self._db.commit()
except Exception as e: except Exception as e:
self._db.rollback() self._db.rollback()
raise e raise e