From d8b6dd9978f72eb227e4b4e2fa15ae871691bd71 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 19 Apr 2023 13:18:12 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20async-await=E3=81=A7=E9=9D=9E=E5=90=8C?= =?UTF-8?q?=E6=9C=9F=E5=87=A6=E7=90=86=E3=82=92=E5=88=B6=E5=BE=A1=E3=81=A7?= =?UTF-8?q?=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=97=E3=81=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/bio_sales/create_bio_sales_lot.py | 5 ++ .../laundering/emp_chg_inst_laundering.py | 43 +++++++++-------- .../src/db/async_database.py | 48 +++++++++++++++++++ 3 files changed, 75 insertions(+), 21 deletions(-) create mode 100644 ecs/jskult-batch-daily/src/db/async_database.py diff --git a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py index be9f2164..53b6a2c4 100644 --- a/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py +++ b/ecs/jskult-batch-daily/src/batch/bio_sales/create_bio_sales_lot.py @@ -13,6 +13,11 @@ async def exec(): logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。') return + # 並列処理の確認用コード + # import asyncio + # for i in range(50): + # logger.debug('生物由来卸販売ロット分解:処理中') + # await asyncio.sleep(0.5) # TODO: ここに処理を追記していく logger.debug('生物由来卸販売ロット分解:終了') diff --git a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py index 57ef56be..9cd5ba07 100644 --- a/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py +++ b/ecs/jskult-batch-daily/src/batch/laundering/emp_chg_inst_laundering.py @@ -1,51 +1,52 @@ from src.batch.batch_functions import logging_sql from src.batch.common.batch_context import BatchContext -from src.db.database import Database +from src.db.async_database import AsyncDatabase from src.error.exceptions import BatchOperationException from src.logging.get_logger import get_logger from src.time.elapsed_time import ElapsedTime -logger = get_logger('48-施設担当者マスタ洗替') +logger = get_logger('施設担当者マスタ洗替') batch_context = BatchContext.get_instance() async def exec(): - db = Database.get_instance() + db = AsyncDatabase.get_instance() try: - db.connect() + await db.connect() logger.debug('##########################') logger.debug('START Changing Employee in charge of institution PGM.') # 業務日付を取得 syor_date = batch_context.syor_date # `emp_chg_inst_lau`をTruncate - _truncate_emp_chg_inst_lau(db) + await _truncate_emp_chg_inst_lau(db) # emp_chg_inst から、`emp_chg_inst_lau`へInsert - _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db) + await _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db) # vop_hco_merge_vから、emp_chg_inst_lauをUpdate - _update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date) + await _update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date) # dcf_inst_mergeから、emp_chg_inst_lauをUpdate - _update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date) + await _update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date) logger.debug('##########################') logger.debug('End All Processing PGM.') except Exception as e: raise BatchOperationException(e) finally: - db.disconnect() + await db.disconnect() -def _truncate_emp_chg_inst_lau(db: Database): +async def _truncate_emp_chg_inst_lau(db: AsyncDatabase): logger.debug("##########################") try: - db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau") + await db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau") except Exception as e: logger.debug("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!") + logger.exception(e) raise e logger.debug("Table `emp_chg_inst_lau` was truncated!") return -def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database): +async def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: AsyncDatabase): logger.debug("##########################") try: elapsed_time = ElapsedTime() @@ -70,22 +71,21 @@ def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database): WHERE enabled_flg = 'Y' """ - res = db.execute(sql) + res = await db.execute(sql) logging_sql(logger, sql) logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})') except Exception as e: logger.debug("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!") raise e logger.debug("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!") - return -def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str): +async def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: AsyncDatabase, syor_date: str): # vop_hco_merge_vはデータが作られないため、この洗い替え処理は基本空振りする logger.debug("##########################") try: - select_result = db.execute_select( + select_result = await db.execute_select( """ SELECT COUNT(v_inst_cd) AS row_count @@ -106,7 +106,7 @@ def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str): logger.info('vop_hco_merge_v Table Data is exists!') # vop_hco_merge_v から、emp_chg_inst_lauをUpdateします - result = db.execute_select( + result = await db.execute_select( """ SELECT v_inst_cd, @@ -144,15 +144,16 @@ def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str): logger.debug(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!") raise e logger.debug(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!") + return -def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str): +async def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: AsyncDatabase, syor_date: str): # dcf_inst_mergeから、emp_chg_inst_lauをUpdate # Get count from DCF_INST_MERGE logger.debug("##########################") try: - select_result = db.execute_select( + select_result = await db.execute_select( """ SELECT COUNT(dcf_inst_cd) AS row_count @@ -166,6 +167,7 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str): """, {'syor_date': syor_date} ) + except Exception as e: logger.debug("Error! Getting Count of dcf_inst_merge was failed!") raise e @@ -203,7 +205,7 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str): WHERE el.inst_cd = dm.dcf_inst_cd """ - res = db.execute( + res = await db.execute( update_sql, {'syor_date': syor_date} ) @@ -214,5 +216,4 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str): raise e logger.debug("emp_chg_inst_lau.v_inst_cd was set!") - return diff --git a/ecs/jskult-batch-daily/src/db/async_database.py b/ecs/jskult-batch-daily/src/db/async_database.py new file mode 100644 index 00000000..fb026fa4 --- /dev/null +++ b/ecs/jskult-batch-daily/src/db/async_database.py @@ -0,0 +1,48 @@ +import asyncio + +from sqlalchemy import CursorResult +from tenacity import retry, stop_after_attempt, wait_exponential + +from src.db.database import Database +from src.logging.get_logger import get_logger +from src.system_var import environment + +logger = get_logger(__name__) + + +class AsyncDatabase(Database): + """データベース非同期操作クラス""" + + def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None: + super().__init__(username, password, host, port, schema) + + @classmethod + def get_instance(cls): + return cls( + username=environment.DB_USERNAME, + password=environment.DB_PASSWORD, + host=environment.DB_HOST, + port=environment.DB_PORT, + schema=environment.DB_SCHEMA + ) + + @retry( + wait=wait_exponential( + multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT, + min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS, + max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS + ), + stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT)) + async def connect(self): + await asyncio.get_event_loop().run_in_executor(None, super().connect) + + async def execute_select(self, select_query: str, parameters=None) -> list[dict]: + res = await asyncio.get_event_loop().run_in_executor(None, super().execute_select, select_query, parameters) + return res + + async def execute(self, query: str, parameters=None) -> CursorResult: + res = await asyncio.get_event_loop().run_in_executor(None, super().execute, query, parameters) + return res + + async def disconnect(self): + await asyncio.get_event_loop().run_in_executor(None, super().disconnect)