Revert "feat: 並行処理見直し。ECS タスクのvCPU数最適化のため、async-awaitで並列処理を実装"

This reverts commit e27757e3ee7ef20a218a354549bef89737660bae.
This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-19 13:53:17 +09:00
parent ac374afb85
commit 3299d88ac7
7 changed files with 43 additions and 58 deletions

View File

@ -1,21 +1,10 @@
"""実消化&アルトマーク 日次バッチのエントリーポイント"""
import asyncio
from src import jobctrl_daily
# 一部並行実行のため、非同期関数化
async def run():
if __name__ == '__main__':
try:
task = asyncio.create_task(jobctrl_daily.exec())
result_code = await task
exit(result_code)
exit(jobctrl_daily.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はbatch_process内でログを出す。
exit(0)
if __name__ == '__main__':
# 非同期関数を実行
asyncio.run(run())

View File

@ -5,7 +5,7 @@ batch_context = BatchContext.get_instance()
logger = get_logger('生物由来卸販売ロット分解')
async def exec():
def exec():
"""生物由来卸販売ロット分解"""
logger.debug('生物由来卸販売ロット分解:起動')
# 営業日ではない場合、処理をスキップする

View File

@ -9,7 +9,7 @@ logger = get_logger('48-施設担当者マスタ洗替')
batch_context = BatchContext.get_instance()
async def exec():
def exec():
db = Database.get_instance()
try:
db.connect()

View File

@ -7,15 +7,15 @@ batch_context = BatchContext.get_instance()
logger = get_logger('実績洗替')
async def exec():
def exec():
"""実績洗替処理"""
logger.debug('実績更新:起動')
logger.info('実績更新:起動')
# 営業日ではない場合、実績洗替処理は実行しない
if batch_context.is_not_business_day:
logger.info('営業日ではないため、実績洗替処理をスキップします。')
return
# 施設担当者洗替
await emp_chg_inst_laundering.exec()
emp_chg_inst_laundering.exec()
logger.debug('実績更新:終了')
logger.info('実績更新:終了')

View File

@ -0,0 +1,32 @@
"""並行処理"""
import concurrent.futures
from src.batch.bio_sales import create_bio_sale_lot
from src.batch.laundering import sales_laundering
from src.error.exceptions import BatchOperationException
def exec():
# 並行処理を開始
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 実績更新
future_sales_laundering = executor.submit(sales_laundering.exec)
# 生物由来ロット分解
future_create_bio_sales_lot = executor.submit(create_bio_sale_lot.exec)
# 両方の処理が完了するまで待つ
concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot])
# エラーがあれば呼び出し元でキャッチする
sales_laundering_exc = future_sales_laundering.exception()
create_bio_sales_lot_exc = future_create_bio_sales_lot.exception()
# いずれかにエラーが発生していれば、1つのエラーとして返す。
if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None:
sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else ''
create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else ''
raise BatchOperationException(f'並行処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}')
return

View File

@ -1,34 +0,0 @@
"""並行処理"""
import asyncio
from src.batch.bio_sales import create_bio_sales_lot
from src.batch.laundering import sales_laundering
from src.error.exceptions import BatchOperationException
async def exec():
tasks = {
'実績更新': sales_laundering.exec(),
'生物由来ロット分解': create_bio_sales_lot.exec()
}
# 並行処理を開始
# 両方の処理が完了するまで待つ
done, _ = await asyncio.wait(tasks.values(), return_when=asyncio.ALL_COMPLETED)
task_results = {}
# 実行結果を確認する
for task_key, task in zip(tasks.keys(), done):
try:
task.result()
task_results[task_key] = True
except Exception as e:
task_results[task_key] = str(e)
# エラーが発生しているかを確認し、エラーがあれば例外とする
if_error_tasks = {k: v for k, v in task_results.items() if v is not True}
if len(if_error_tasks.keys()) != 0:
messages = ', '.join([f'{k}={v}' for k, v in if_error_tasks.items()])
raise BatchOperationException(f'並行処理中にエラーが発生しました。{messages}')
return

View File

@ -1,7 +1,7 @@
"""実消化&アルトマーク 日次バッチ処理"""
from src.aws.s3 import ConfigBucket
from src.batch import parallel_tasks
from src.batch import parallel_processes
from src.batch.batch_functions import (
get_batch_statuses, update_batch_process_complete,
update_batch_processing_flag_in_processing)
@ -19,7 +19,7 @@ logger = get_logger('日次処理コントロール')
batch_context = BatchContext.get_instance()
async def exec():
def exec():
try:
logger.info('日次バッチ:開始')
try:
@ -100,7 +100,7 @@ async def exec():
try:
# 実績生物由来ロット分解と並行処理
logger.info('並行処理(実績更新-生物由来ロット分解):起動')
await parallel_tasks.exec()
parallel_processes.exec()
logger.info('並行処理(実績更新-生物由来ロット分解):終了')
except BatchOperationException as e:
logger.exception(f'並行処理(実績更新-生物由来ロット分解)エラー(異常終了){e}')
@ -130,5 +130,3 @@ async def exec():
except Exception as e:
logger.exception(f'日次バッチ処理中に想定外のエラーが発生しました {e}')
raise e
finally:
logger.info('日次バッチ:終了')