feat: 日次バッチの制御、日付更新関連の処理追加。やらない処理を間引いた。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-14 11:53:57 +09:00
parent 4751d887f8
commit 2b57687697
5 changed files with 151 additions and 96 deletions

View File

@ -5,48 +5,90 @@ from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_syor_date() -> str:
"""DBから処理日を取得します
def get_batch_statuses() -> tuple[str, str, str]:
"""日付テーブルから、以下を取得して返す。
- 日次バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
str: hdke_tbl.syor_date
tuple[str, str]: [0]日次バッチ処理中フラグdump取得状況区分
"""
db = Database.get_instance()
db.connect()
sql = 'SELECT syor_date FROM src05.hdke_tbl'
sql = 'SELECT bch_actf, dump_sts_kbn, src05.get_syor_date() AS syor_date FROM src05.hdke_tbl'
try:
syor_date_result = db.execute_select(sql)
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
if len(syor_date_result) == 0:
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず一件取れる
syor_date_record = syor_date_result[0]
syor_date_str = syor_date_record['syor_date']
return syor_date_str
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['dump_sts_kbn']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def get_syor_date_as_date_format() -> str:
"""DBから処理日を取得し、yyyy/mm/ddのフォーマットにして返します
def update_batch_processing_flag_in_processing() -> None:
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
str: hdke_tbl.syor_dateをyyyy/mm/ddにフォーマットした文字列
BatchOperationException: DB操作の何らかのエラー
"""
syor_date_str = get_syor_date()
syor_date = datetime.strptime(syor_date_str, '%Y%m%d')
return syor_date.strftime('%Y/%m/%d')
db = Database.get_instance()
sql = 'UPDATE src05.hdke_tbl SET bch_actf = :in_processing'
try:
db.connect()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
return
def logging_sql(logger, sql):
def update_batch_process_complete() -> None:
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src05.get_syor_date() + interval 1 day), '%Y%m%d) -- +1日
"""
try:
db.connect()
db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
return
def logging_sql(logger, sql) -> None:
"""SQL文をデバッグログで出力する
Args:

View File

@ -6,6 +6,7 @@ from src.batch.ultmarc.datfile import DatFile
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
UltmarcTableMapperFactory
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
logger = get_logger('アルトマークデータ保管')
@ -52,7 +53,7 @@ def exec_import():
ultmarc_bucket.delete_dat_file(dat_file_name)
except Exception as e:
logger.exception(e)
raise e
raise BatchOperationException(e)
finally:
logger.info('アルトマーク取込処理: 終了')

View File

@ -1,6 +1,3 @@
from tenacity import RetryError
class MeDaCaException(Exception):
pass
@ -11,7 +8,3 @@ class DBException(MeDaCaException):
class BatchOperationException(MeDaCaException):
pass
class MaxRetryExceededException(MeDaCaException, RetryError):
pass

View File

@ -1,11 +1,14 @@
from src.batch import jissekiaraigae
from src.batch.batch_functions import get_syor_date_as_date_format
from src.batch.batch_functions import (
get_batch_statuses, update_batch_process_complete,
update_batch_processing_flag_in_processing)
from src.batch.common.batch_config import BatchConfig
from src.batch.ultmarc import ultmarc_process
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('日次処理コントロール') # ここを処理IDとかにするといいかもしれない
logger = get_logger('日次処理コントロール')
# バッチ共通設定を取得
batch_config = BatchConfig.get_instance()
@ -14,62 +17,71 @@ batch_config = BatchConfig.get_instance()
def batch_process():
try:
logger.info('日次ジョブ:開始')
# logger.info('S3マウント状況確認')
# logger.error('S3マウントエラー:DWH異常終了')
# logger.error('S3マウントエラー:BIO異常終了')
# logger.info('データベース接続') # 実際には、ここでつなげているわけではないので、いらないと思う
# logger.error('データベース接続エラー(異常終了)') # 検査例外を捕まえて、共通的に出せばいいと思う
try:
logger.info('処理日取得')
syor_date = get_syor_date_as_date_format()
# 日次バッチ処置中フラグ、dump処理状態区分、処理日を取得
batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses()
except BatchOperationException as e:
logger.error(f'処理日取得エラー(異常終了){e}')
logger.exception(f'日付テーブル取得(異常終了){e}')
raise e
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。')
return 0
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。')
return 0 # エラーで返せば、StepFunctionsでリトライしてくれるので、どう返すかは要検討
logger.info(f'処理日={syor_date}')
# バッチ共通設定に処理日を追加
batch_config.syor_date = syor_date
# 休日判定ファイルを読み込み
# バッチ処理中に更新
try:
update_batch_processing_flag_in_processing()
except BatchOperationException as e:
logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}')
# 休日判定ファイルを読み込み(ここは、各処理内に押し込むつもり)
logger.info('休日判定処理')
if True: # 休日判定
logger.info('非営業日かつ月、火、水以外です。') # 分岐
try:
# 処理中フラグ判定。ここでdumpのフラグも見る
logger.info('処理フラグ更新中')
logger.info('処理フラグ更新終了')
except BatchOperationException as e:
logger.error(f'処理フラグ更新処理エラー(異常終了){e}')
# try:
# # 処理中フラグ判定。ここでdumpのフラグも見る
# logger.info('処理フラグ更新中')
# logger.info('処理フラグ更新終了')
# except BatchOperationException as e:
# logger.error(f'処理フラグ更新処理エラー(異常終了){e}')
logger.info('日次ジョブ:終了(正常終了)')
try:
logger.info('日次ジョブ処理中判定')
if True: # 処理中判定
logger.error('処理フラグ処理中(異常終了)')
logger.info('処理中フラグの更新:起動')
logger.info('処理中フラグの更新:終了')
except BatchOperationException as e:
logger.error(f'日次ジョブ処理中エラー(異常終了){e}')
# ↓ここから、やらない↓
# logger.info('処理前バックアップ実行')
# logger.info('処理前バックアップ:起動')
# logger.info('処理前バックアップ:終了')
# logger.error('処理前バックアップ処理エラー(異常終了)', $ex->getMessage())
# ↑ここまで↑
logger.info('卸在庫データ取込判定')
if True: # 卸在庫日判定
logger.info('卸在庫データ取込日です')
logger.debug('卸在庫データファイル名: {_PATH_OROSHI_ZAIKO}')
if True: # 卸在庫ファイル存在確認なければ異常終了
logger.error('卸在庫データ存在確認エラー(異常終了)')
logger.info('卸在庫データ存在確認:取込処理開始')
logger.debug('卸在庫データファイル名作成: {read_filename}')
logger.debug('ファイル移動OK{_MOVE_OROSHI_ZAIKO}') # S3からダウンロード
logger.debug('ファイル解凍OK{sprintf(_ZIP_OROSHI_ZAIKO, $read_filename)}') # gunzip -fなので、gzipを使う
logger.debug('ファイル名変更OK {sprintf(_RENAME_OROSHI_ZAIKO, $read_filename)}') # S3にアップロード
try:
logger.info('卸在庫データ取込:起動')
logger.info('卸在庫データ取込:終了')
except BatchOperationException as e:
logger.error(f'卸在庫データ取込処理エラー(異常終了){e}')
# try:
# logger.info('日次ジョブ処理中判定')
# if True: # 処理中判定
# logger.error('処理フラグ処理中(異常終了)')
# logger.info('処理中フラグの更新:起動')
# logger.info('処理中フラグの更新:終了')
# except BatchOperationException as e:
# logger.error(f'日次ジョブ処理中エラー(異常終了){e}')
# TODO: 以下、卸在庫取り込み処理は、実消化取り込み内で行う
# logger.info('卸在庫データ取込判定')
# if True: # 卸在庫日判定
# logger.info('卸在庫データ取込日です')
# logger.debug('卸在庫データファイル名: {_PATH_OROSHI_ZAIKO}')
# if True: # 卸在庫ファイル存在確認なければ異常終了
# logger.error('卸在庫データ存在確認エラー(異常終了)')
# logger.info('卸在庫データ存在確認:取込処理開始')
# logger.debug('卸在庫データファイル名作成: {read_filename}')
# logger.debug('ファイル移動OK{_MOVE_OROSHI_ZAIKO}') # S3からダウンロード
# logger.debug('ファイル解凍OK{sprintf(_ZIP_OROSHI_ZAIKO, $read_filename)}') # gunzip -fなので、gzipを使う
# logger.debug('ファイル名変更OK {sprintf(_RENAME_OROSHI_ZAIKO, $read_filename)}') # S3にアップロード
# try:
# logger.info('卸在庫データ取込:起動')
# logger.info('卸在庫データ取込:終了')
# except BatchOperationException as e:
# logger.error(f'卸在庫データ取込処理エラー(異常終了){e}')
logger.info('日次処理(アルトマーク)')
if True: # アルトマークなければ
if True: # アルトマーク取り込み処理内で実装
logger.info('日次処理(アルトマーク)実行対象日でない為未実行')
try:
logger.info('アルトマーク取込:起動')
@ -77,7 +89,7 @@ def batch_process():
logger.info('アルトマーク取込:終了')
except BatchOperationException as e:
logger.error(f'アルトマーク取込処理エラー(異常終了){e}')
if True: # 休日判定
if True: # 休日判定、メルク施設マスタ作成内で行いたい
try:
logger.info('メルク施設マスタ作成')
logger.info('メルク施設マスタ作成終了')
@ -88,10 +100,11 @@ def batch_process():
logger.info('DCF施設統合マスタ作成終了')
except BatchOperationException as e:
logger.error(f'DCF施設統合マスタ作成エラー異常終了{e}')
# if False: # ($holiday === FALSE) とにかく毎日動かす
logger.info('V実消化連携データ存在確認')
if True:
logger.error('V実消化連携データ存在確認異常終了')
# if False: # ($holiday === FALSE) # DCF施設統合マスタを作成する日は必ず休日なので、V実消化の取り込みは行わない。
# TODO: データ存在確認は実消化取り込み内で行う
# logger.info('V実消化連携データ存在確認')
# if True:
# logger.error('V実消化連携データ存在確認異常終了')
logger.info('日次処理V実消化')
try:
@ -99,28 +112,26 @@ def batch_process():
logger.info('V実消化取込終了')
except BatchOperationException as e:
logger.exception(f'V実消化取込処理エラー異常終了{e}')
logger.info('日次処理(実績更新)')
# ここも、休日判定を内側で行う
try:
# ここで、生物由来ロット分解と並行処理
logger.info('実績更新:起動')
jissekiaraigae.batch_process()
logger.info('生物由来ロット分解:起動')
logger.info('実績更新:終了')
logger.info('生物由来ロット分解:終了')
except BatchOperationException as e:
logger.exception(f'実績更新処理エラー(異常終了){e}')
# ↓以下、ファイルのバックアップ以外はやらない↓
# logger.info('処理後バックアップ実行')
# logger.info('処理後バックアップ:起動')
# logger.info('処理後バックアップ:終了')
# logger.error('処理後バックアップ処理エラー(異常終了)', $ex->getMessage())
# ↑ここまでやらない↑
logger.info('処理中フラグの更新:非処理中')
try:
logger.info('処理中フラグの更新:起動')
logger.info('処理中フラグの更新:終了')
except BatchOperationException as e:
logger.exception(f'処理中フラグ更新エラー(異常終了){e}')
logger.info('ワークディレクトリクリーニング')
logger.info('日次ジョブ:終了(正常終了)')
# バッチ処理完了とし、処理日、バッチ処置中フラグ、dump取得状態区分を更新
try:
update_batch_process_complete()
except BatchOperationException as e:
logger.exception(f'日次バッチ完了処理 エラー(異常終了){e}')
logger.info('日次ジョブ:終了(正常終了)')
return 0
except Exception as e:
raise e

View File

@ -0,0 +1,8 @@
# バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
# dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了
DUMP_STATUS_KBN_COMPLETE = '2'