diff --git a/ecs/jskult-batch-daily/entrypoint.py b/ecs/jskult-batch-daily/entrypoint.py index 472efd9f..5b06ca48 100644 --- a/ecs/jskult-batch-daily/entrypoint.py +++ b/ecs/jskult-batch-daily/entrypoint.py @@ -1,10 +1,21 @@ """実消化&アルトマーク 日次バッチのエントリーポイント""" +import asyncio + from src import jobctrl_daily -if __name__ == '__main__': + +# 一部並行実行のため、非同期関数化 +async def run(): try: - exit(jobctrl_daily.exec()) + task = asyncio.create_task(jobctrl_daily.exec()) + result_code = await task + exit(result_code) except Exception: # エラーが起きても、正常系のコードで返す。 # エラーが起きた事実はbatch_process内でログを出す。 exit(0) + + +if __name__ == '__main__': + # 非同期関数を実行 + asyncio.run(run()) diff --git a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py similarity index 97% rename from ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py rename to ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py index 7d166157..02080d76 100644 --- a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py +++ b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py @@ -5,7 +5,7 @@ batch_context = BatchContext.get_instance() logger = get_logger('生物由来卸販売ロット分解') -def exec(): +async def exec(): """生物由来卸販売ロット分解""" logger.info('生物由来卸販売ロット分解:起動') diff --git a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py index 16ff0c79..57ef56be 100644 --- a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py +++ b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py @@ -9,7 +9,7 @@ logger = get_logger('48-施設担当者マスタ洗替') batch_context = BatchContext.get_instance() -def exec(): +async def exec(): db = Database.get_instance() try: db.connect() diff --git a/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py b/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py index 7b77d8ac..3265440f 100644 --- a/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py +++ b/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py @@ -7,15 +7,15 @@ batch_context = BatchContext.get_instance() logger = get_logger('実績洗替') -def exec(): +async def exec(): """実績洗替処理""" - logger.info('実績更新:起動') + logger.debug('実績更新:起動') # 営業日ではない場合、実績洗替処理は実行しない if batch_context.is_not_business_day: logger.info('営業日ではないため、実績洗替処理をスキップします。') return # 施設担当者洗替 - emp_chg_inst_laundering.exec() + await emp_chg_inst_laundering.exec() - logger.info('実績更新:終了') + logger.debug('実績更新:終了') diff --git a/ecs/jskult-batch-daily/src/batch/parallel_processes.py b/ecs/jskult-batch-daily/src/batch/parallel_processes.py deleted file mode 100644 index 1b7db6c7..00000000 --- a/ecs/jskult-batch-daily/src/batch/parallel_processes.py +++ /dev/null @@ -1,32 +0,0 @@ -"""並行処理""" - -import concurrent.futures - -from src.batch.bio_sales import create_bio_sale_lot -from src.batch.laundering import sales_laundering -from src.error.exceptions import BatchOperationException - - -def exec(): - # 並行処理を開始 - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - - # 実績更新 - future_sales_laundering = executor.submit(sales_laundering.exec) - # 生物由来ロット分解 - future_create_bio_sales_lot = executor.submit(create_bio_sale_lot.exec) - - # 両方の処理が完了するまで待つ - concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot]) - - # エラーがあれば呼び出し元でキャッチする - sales_laundering_exc = future_sales_laundering.exception() - create_bio_sales_lot_exc = future_create_bio_sales_lot.exception() - - # いずれかにエラーが発生していれば、1つのエラーとして返す。 - if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None: - sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else '' - create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else '' - raise BatchOperationException(f'並行処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}') - - return diff --git a/ecs/jskult-batch-daily/src/batch/parallel_tasks.py b/ecs/jskult-batch-daily/src/batch/parallel_tasks.py new file mode 100644 index 00000000..afb58820 --- /dev/null +++ b/ecs/jskult-batch-daily/src/batch/parallel_tasks.py @@ -0,0 +1,34 @@ +"""並行処理""" + +import asyncio + +from src.batch.bio_sales import create_bio_sales_lot +from src.batch.laundering import sales_laundering +from src.error.exceptions import BatchOperationException + + +async def exec(): + tasks = { + '実績更新': sales_laundering.exec(), + '生物由来ロット分解': create_bio_sales_lot.exec() + } + + # 並行処理を開始 + # 両方の処理が完了するまで待つ + done, _ = await asyncio.wait(tasks.values(), return_when=asyncio.ALL_COMPLETED) + task_results = {} + # 実行結果を確認する + for task_key, task in zip(tasks.keys(), done): + try: + task.result() + task_results[task_key] = True + except Exception as e: + task_results[task_key] = str(e) + + # エラーが発生しているかを確認し、エラーがあれば例外とする + if_error_tasks = {k: v for k, v in task_results.items() if v is not True} + if len(if_error_tasks.keys()) != 0: + messages = ', '.join([f'{k}={v}' for k, v in if_error_tasks.items()]) + raise BatchOperationException(f'並行処理中にエラーが発生しました。{messages}') + + return diff --git a/ecs/jskult-batch-daily/src/jobctrl_daily.py b/ecs/jskult-batch-daily/src/jobctrl_daily.py index 3788d368..5c178220 100644 --- a/ecs/jskult-batch-daily/src/jobctrl_daily.py +++ b/ecs/jskult-batch-daily/src/jobctrl_daily.py @@ -1,7 +1,7 @@ """実消化&アルトマーク 日次バッチ処理""" from src.aws.s3 import ConfigBucket -from src.batch import parallel_processes +from src.batch import parallel_tasks from src.batch.batch_functions import ( get_batch_statuses, update_batch_process_complete, update_batch_processing_flag_in_processing) @@ -19,7 +19,7 @@ logger = get_logger('日次処理コントロール') batch_context = BatchContext.get_instance() -def exec(): +async def exec(): try: logger.info('日次バッチ:開始') try: @@ -100,7 +100,7 @@ def exec(): try: # 実績生物由来ロット分解と並行処理 logger.info('並行処理(実績更新-生物由来ロット分解):起動') - parallel_processes.exec() + await parallel_tasks.exec() logger.info('並行処理(実績更新-生物由来ロット分解):終了') except BatchOperationException as e: logger.exception(f'並行処理(実績更新-生物由来ロット分解)エラー(異常終了){e}') @@ -130,3 +130,5 @@ def exec(): except Exception as e: logger.exception(f'日次バッチ処理中に想定外のエラーが発生しました {e}') raise e + finally: + logger.info('日次バッチ:終了')