From 3299d88ac7231125cab057f4dac85e3197078a6a Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 19 Apr 2023 13:53:17 +0900 Subject: [PATCH] =?UTF-8?q?Revert=20"feat:=20=E4=B8=A6=E8=A1=8C=E5=87=A6?= =?UTF-8?q?=E7=90=86=E8=A6=8B=E7=9B=B4=E3=81=97=E3=80=82ECS=20=E3=82=BF?= =?UTF-8?q?=E3=82=B9=E3=82=AF=E3=81=AEvCPU=E6=95=B0=E6=9C=80=E9=81=A9?= =?UTF-8?q?=E5=8C=96=E3=81=AE=E3=81=9F=E3=82=81=E3=80=81async-await?= =?UTF-8?q?=E3=81=A7=E4=B8=A6=E5=88=97=E5=87=A6=E7=90=86=E3=82=92=E5=AE=9F?= =?UTF-8?q?=E8=A3=85"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit e27757e3ee7ef20a218a354549bef89737660bae. --- ecs/jskult-batch-daily/entrypoint.py | 15 ++------ ...io_sales_lot.py => create_bio_sale_lot.py} | 2 +- .../laundering/emp_chg_inst_laundering.py | 2 +- .../src/batch/laundering/sales_laundering.py | 8 ++--- .../src/batch/parallel_processes.py | 32 +++++++++++++++++ .../src/batch/parallel_tasks.py | 34 ------------------- ecs/jskult-batch-daily/src/jobctrl_daily.py | 8 ++--- 7 files changed, 43 insertions(+), 58 deletions(-) rename ecs/jskult-batch-daily/src/batch/bio_sales/{create_bio_sales_lot.py => create_bio_sale_lot.py} (97%) create mode 100644 ecs/jskult-batch-daily/src/batch/parallel_processes.py delete mode 100644 ecs/jskult-batch-daily/src/batch/parallel_tasks.py diff --git a/ecs/jskult-batch-daily/entrypoint.py b/ecs/jskult-batch-daily/entrypoint.py index 5b06ca48..472efd9f 100644 --- a/ecs/jskult-batch-daily/entrypoint.py +++ b/ecs/jskult-batch-daily/entrypoint.py @@ -1,21 +1,10 @@ """実消化&アルトマーク 日次バッチのエントリーポイント""" -import asyncio - from src import jobctrl_daily - -# 一部並行実行のため、非同期関数化 -async def run(): +if __name__ == '__main__': try: - task = asyncio.create_task(jobctrl_daily.exec()) - result_code = await task - exit(result_code) + exit(jobctrl_daily.exec()) except Exception: # エラーが起きても、正常系のコードで返す。 # エラーが起きた事実はbatch_process内でログを出す。 exit(0) - - -if __name__ == '__main__': - # 非同期関数を実行 - asyncio.run(run()) diff --git a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py similarity index 97% rename from ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py rename to ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py index be9f2164..49bd19f1 100644 --- a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py +++ b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sale_lot.py @@ -5,7 +5,7 @@ batch_context = BatchContext.get_instance() logger = get_logger('生物由来卸販売ロット分解') -async def exec(): +def exec(): """生物由来卸販売ロット分解""" logger.debug('生物由来卸販売ロット分解:起動') # 営業日ではない場合、処理をスキップする diff --git a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py index 57ef56be..16ff0c79 100644 --- a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py +++ b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py @@ -9,7 +9,7 @@ logger = get_logger('48-施設担当者マスタ洗替') batch_context = BatchContext.get_instance() -async def exec(): +def exec(): db = Database.get_instance() try: db.connect() diff --git a/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py b/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py index 3265440f..7b77d8ac 100644 --- a/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py +++ b/ecs/jskult-batch-daily/src/batch/laundering/sales_laundering.py @@ -7,15 +7,15 @@ batch_context = BatchContext.get_instance() logger = get_logger('実績洗替') -async def exec(): +def exec(): """実績洗替処理""" - logger.debug('実績更新:起動') + logger.info('実績更新:起動') # 営業日ではない場合、実績洗替処理は実行しない if batch_context.is_not_business_day: logger.info('営業日ではないため、実績洗替処理をスキップします。') return # 施設担当者洗替 - await emp_chg_inst_laundering.exec() + emp_chg_inst_laundering.exec() - logger.debug('実績更新:終了') + logger.info('実績更新:終了') diff --git a/ecs/jskult-batch-daily/src/batch/parallel_processes.py b/ecs/jskult-batch-daily/src/batch/parallel_processes.py new file mode 100644 index 00000000..1b7db6c7 --- /dev/null +++ b/ecs/jskult-batch-daily/src/batch/parallel_processes.py @@ -0,0 +1,32 @@ +"""並行処理""" + +import concurrent.futures + +from src.batch.bio_sales import create_bio_sale_lot +from src.batch.laundering import sales_laundering +from src.error.exceptions import BatchOperationException + + +def exec(): + # 並行処理を開始 + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + + # 実績更新 + future_sales_laundering = executor.submit(sales_laundering.exec) + # 生物由来ロット分解 + future_create_bio_sales_lot = executor.submit(create_bio_sale_lot.exec) + + # 両方の処理が完了するまで待つ + concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot]) + + # エラーがあれば呼び出し元でキャッチする + sales_laundering_exc = future_sales_laundering.exception() + create_bio_sales_lot_exc = future_create_bio_sales_lot.exception() + + # いずれかにエラーが発生していれば、1つのエラーとして返す。 + if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None: + sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else '' + create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else '' + raise BatchOperationException(f'並行処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}') + + return diff --git a/ecs/jskult-batch-daily/src/batch/parallel_tasks.py b/ecs/jskult-batch-daily/src/batch/parallel_tasks.py deleted file mode 100644 index afb58820..00000000 --- a/ecs/jskult-batch-daily/src/batch/parallel_tasks.py +++ /dev/null @@ -1,34 +0,0 @@ -"""並行処理""" - -import asyncio - -from src.batch.bio_sales import create_bio_sales_lot -from src.batch.laundering import sales_laundering -from src.error.exceptions import BatchOperationException - - -async def exec(): - tasks = { - '実績更新': sales_laundering.exec(), - '生物由来ロット分解': create_bio_sales_lot.exec() - } - - # 並行処理を開始 - # 両方の処理が完了するまで待つ - done, _ = await asyncio.wait(tasks.values(), return_when=asyncio.ALL_COMPLETED) - task_results = {} - # 実行結果を確認する - for task_key, task in zip(tasks.keys(), done): - try: - task.result() - task_results[task_key] = True - except Exception as e: - task_results[task_key] = str(e) - - # エラーが発生しているかを確認し、エラーがあれば例外とする - if_error_tasks = {k: v for k, v in task_results.items() if v is not True} - if len(if_error_tasks.keys()) != 0: - messages = ', '.join([f'{k}={v}' for k, v in if_error_tasks.items()]) - raise BatchOperationException(f'並行処理中にエラーが発生しました。{messages}') - - return diff --git a/ecs/jskult-batch-daily/src/jobctrl_daily.py b/ecs/jskult-batch-daily/src/jobctrl_daily.py index 5c178220..3788d368 100644 --- a/ecs/jskult-batch-daily/src/jobctrl_daily.py +++ b/ecs/jskult-batch-daily/src/jobctrl_daily.py @@ -1,7 +1,7 @@ """実消化&アルトマーク 日次バッチ処理""" from src.aws.s3 import ConfigBucket -from src.batch import parallel_tasks +from src.batch import parallel_processes from src.batch.batch_functions import ( get_batch_statuses, update_batch_process_complete, update_batch_processing_flag_in_processing) @@ -19,7 +19,7 @@ logger = get_logger('日次処理コントロール') batch_context = BatchContext.get_instance() -async def exec(): +def exec(): try: logger.info('日次バッチ:開始') try: @@ -100,7 +100,7 @@ async def exec(): try: # 実績生物由来ロット分解と並行処理 logger.info('並行処理(実績更新-生物由来ロット分解):起動') - await parallel_tasks.exec() + parallel_processes.exec() logger.info('並行処理(実績更新-生物由来ロット分解):終了') except BatchOperationException as e: logger.exception(f'並行処理(実績更新-生物由来ロット分解)エラー(異常終了){e}') @@ -130,5 +130,3 @@ async def exec(): except Exception as e: logger.exception(f'日次バッチ処理中に想定外のエラーが発生しました {e}') raise e - finally: - logger.info('日次バッチ:終了')