feat: 並行処理の試作品。例外処理とかの検証がまだ

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-17 11:03:42 +09:00
parent 834bc503dd
commit 1e6856c51e
7 changed files with 68 additions and 14 deletions

View File

@ -0,0 +1,21 @@
from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('生物由来卸販売ロット分解')
def exec():
"""生物由来卸販売ロット分解"""
logger.info('生物由来卸販売ロット分解:起動')
# 営業日ではない場合、処理をスキップする
if batch_context.is_not_business_day:
logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。')
return
pass
logger.info('作成処理')
logger.info('生物由来卸販売ロット分解:終了')
return

View File

@ -2,11 +2,11 @@ from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('メルク施設統合マスタ')
logger = get_logger('メルク施設統合マスタ作成')
def exec():
"""メルク施設統合マスタ"""
"""メルク施設統合マスタ作成"""
# 営業日ではない場合、処理をスキップする
if batch_context.is_not_business_day:

View File

@ -9,7 +9,7 @@ logger = get_logger('実績洗替')
def exec():
"""実績洗替処理"""
logger.info('実績更新:起動')
# 非営業日の場合実績洗替処理は実行しない
if batch_context.is_not_business_day:
logger.info('非営業日のため、実績洗替処理をスキップします。')
@ -17,3 +17,5 @@ def exec():
# 施設担当者洗替
emp_chg_inst_laundering.exec()
logger.info('実績更新:終了')

View File

@ -0,0 +1,35 @@
"""並行処理"""
import concurrent.futures
from src.batch.bio_sales import create_bio_sale_lot
from src.batch.laundering import sales_laundering
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
logger = get_logger('日次処理コントロール(並行処理)')
def exec():
# 並行処理を開始
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 実績更新
future_sales_laundering = executor.submit(sales_laundering.exec)
# 生物由来ロット分解
future_create_bio_sales_lot = executor.submit(create_bio_sale_lot.exec)
# 両方の処理が完了するまで待つ
concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot])
# エラーがあれば呼び出し元でキャッチする
sales_laundering_exc = future_sales_laundering.exception()
create_bio_sales_lot_exc = future_create_bio_sales_lot.exception()
# いずれかにエラーが発生していれば、、1つのエラーとして返す。
if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None:
sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else ''
create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else ''
raise BatchOperationException(f'並行処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}')
return

View File

@ -1,13 +1,13 @@
"""実消化&アルトマーク 日次バッチ処理"""
from src.aws.s3 import ConfigBucket
from src.batch import parallel_processes
from src.batch.batch_functions import (
get_batch_statuses, update_batch_process_complete,
update_batch_processing_flag_in_processing)
from src.batch.common.batch_context import BatchContext
from src.batch.common.calendar_file import CalendarFile
from src.batch.laundering import (create_dcf_inst_merge, create_mst_inst,
sales_laundering)
from src.batch.laundering import create_dcf_inst_merge, create_mst_inst
from src.batch.ultmarc import ultmarc_process
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
@ -88,18 +88,14 @@ def exec():
return constants.BATCH_EXIT_CODE_SUCCESS
try:
# TODO: ここで、生物由来ロット分解と並行処理
logger.info('実績更新:起動')
sales_laundering.exec()
logger.info('生物由来ロット分解:起動')
logger.info('実績更新:終了')
logger.info('生物由来ロット分解:終了')
# 実績生物由来ロット分解と並行処理
logger.info('並行処理(実績更新-生物由来ロット分解):起動')
parallel_processes.exec()
logger.info('並行処理(実績更新-生物由来ロット分解):終了')
except BatchOperationException as e:
logger.exception(f'実績更新処理エラー(異常終了){e}')
logger.exception(f'並行処理(実績更新-生物由来ロット分解)エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# TODO: ここで並行処理合流
try:
logger.info('DCF施設統合マスタ作成起動')
create_dcf_inst_merge.exec()