feat: async-awaitで非同期処理を制御できるようにした

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-19 13:18:12 +09:00
parent 243373e647
commit d8b6dd9978
3 changed files with 75 additions and 21 deletions

View File

@ -13,6 +13,11 @@ async def exec():
logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。') logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。')
return return
# 並列処理の確認用コード
# import asyncio
# for i in range(50):
# logger.debug('生物由来卸販売ロット分解:処理中')
# await asyncio.sleep(0.5)
# TODO: ここに処理を追記していく # TODO: ここに処理を追記していく
logger.debug('生物由来卸販売ロット分解:終了') logger.debug('生物由来卸販売ロット分解:終了')

View File

@ -1,51 +1,52 @@
from src.batch.batch_functions import logging_sql from src.batch.batch_functions import logging_sql
from src.batch.common.batch_context import BatchContext from src.batch.common.batch_context import BatchContext
from src.db.database import Database from src.db.async_database import AsyncDatabase
from src.error.exceptions import BatchOperationException from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger from src.logging.get_logger import get_logger
from src.time.elapsed_time import ElapsedTime from src.time.elapsed_time import ElapsedTime
logger = get_logger('48-施設担当者マスタ洗替') logger = get_logger('施設担当者マスタ洗替')
batch_context = BatchContext.get_instance() batch_context = BatchContext.get_instance()
async def exec(): async def exec():
db = Database.get_instance() db = AsyncDatabase.get_instance()
try: try:
db.connect() await db.connect()
logger.debug('##########################') logger.debug('##########################')
logger.debug('START Changing Employee in charge of institution PGM.') logger.debug('START Changing Employee in charge of institution PGM.')
# 業務日付を取得 # 業務日付を取得
syor_date = batch_context.syor_date syor_date = batch_context.syor_date
# `emp_chg_inst_lau`をTruncate # `emp_chg_inst_lau`をTruncate
_truncate_emp_chg_inst_lau(db) await _truncate_emp_chg_inst_lau(db)
# emp_chg_inst から、`emp_chg_inst_lau`へInsert # emp_chg_inst から、`emp_chg_inst_lau`へInsert
_insert_into_emp_chg_inst_lau_from_emp_chg_inst(db) await _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db)
# vop_hco_merge_vから、emp_chg_inst_lauをUpdate # vop_hco_merge_vから、emp_chg_inst_lauをUpdate
_update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date) await _update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date)
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate # dcf_inst_mergeから、emp_chg_inst_lauをUpdate
_update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date) await _update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date)
logger.debug('##########################') logger.debug('##########################')
logger.debug('End All Processing PGM.') logger.debug('End All Processing PGM.')
except Exception as e: except Exception as e:
raise BatchOperationException(e) raise BatchOperationException(e)
finally: finally:
db.disconnect() await db.disconnect()
def _truncate_emp_chg_inst_lau(db: Database): async def _truncate_emp_chg_inst_lau(db: AsyncDatabase):
logger.debug("##########################") logger.debug("##########################")
try: try:
db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau") await db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau")
except Exception as e: except Exception as e:
logger.debug("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!") logger.debug("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!")
logger.exception(e)
raise e raise e
logger.debug("Table `emp_chg_inst_lau` was truncated!") logger.debug("Table `emp_chg_inst_lau` was truncated!")
return return
def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database): async def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: AsyncDatabase):
logger.debug("##########################") logger.debug("##########################")
try: try:
elapsed_time = ElapsedTime() elapsed_time = ElapsedTime()
@ -70,22 +71,21 @@ def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
WHERE WHERE
enabled_flg = 'Y' enabled_flg = 'Y'
""" """
res = db.execute(sql) res = await db.execute(sql)
logging_sql(logger, sql) logging_sql(logger, sql)
logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})') logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})')
except Exception as e: except Exception as e:
logger.debug("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!") logger.debug("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!")
raise e raise e
logger.debug("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!") logger.debug("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!")
return return
def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str): async def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: AsyncDatabase, syor_date: str):
# vop_hco_merge_vはデータが作られないため、この洗い替え処理は基本空振りする # vop_hco_merge_vはデータが作られないため、この洗い替え処理は基本空振りする
logger.debug("##########################") logger.debug("##########################")
try: try:
select_result = db.execute_select( select_result = await db.execute_select(
""" """
SELECT SELECT
COUNT(v_inst_cd) AS row_count COUNT(v_inst_cd) AS row_count
@ -106,7 +106,7 @@ def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
logger.info('vop_hco_merge_v Table Data is exists!') logger.info('vop_hco_merge_v Table Data is exists!')
# vop_hco_merge_v から、emp_chg_inst_lauをUpdateします # vop_hco_merge_v から、emp_chg_inst_lauをUpdateします
result = db.execute_select( result = await db.execute_select(
""" """
SELECT SELECT
v_inst_cd, v_inst_cd,
@ -144,15 +144,16 @@ def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
logger.debug(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!") logger.debug(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!")
raise e raise e
logger.debug(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!") logger.debug(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!")
return return
def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str): async def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: AsyncDatabase, syor_date: str):
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate # dcf_inst_mergeから、emp_chg_inst_lauをUpdate
# Get count from DCF_INST_MERGE # Get count from DCF_INST_MERGE
logger.debug("##########################") logger.debug("##########################")
try: try:
select_result = db.execute_select( select_result = await db.execute_select(
""" """
SELECT SELECT
COUNT(dcf_inst_cd) AS row_count COUNT(dcf_inst_cd) AS row_count
@ -166,6 +167,7 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
""", """,
{'syor_date': syor_date} {'syor_date': syor_date}
) )
except Exception as e: except Exception as e:
logger.debug("Error! Getting Count of dcf_inst_merge was failed!") logger.debug("Error! Getting Count of dcf_inst_merge was failed!")
raise e raise e
@ -203,7 +205,7 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
WHERE WHERE
el.inst_cd = dm.dcf_inst_cd el.inst_cd = dm.dcf_inst_cd
""" """
res = db.execute( res = await db.execute(
update_sql, update_sql,
{'syor_date': syor_date} {'syor_date': syor_date}
) )
@ -214,5 +216,4 @@ def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
raise e raise e
logger.debug("emp_chg_inst_lau.v_inst_cd was set!") logger.debug("emp_chg_inst_lau.v_inst_cd was set!")
return return

View File

@ -0,0 +1,48 @@
import asyncio
from sqlalchemy import CursorResult
from tenacity import retry, stop_after_attempt, wait_exponential
from src.db.database import Database
from src.logging.get_logger import get_logger
from src.system_var import environment
logger = get_logger(__name__)
class AsyncDatabase(Database):
"""データベース非同期操作クラス"""
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
super().__init__(username, password, host, port, schema)
@classmethod
def get_instance(cls):
return cls(
username=environment.DB_USERNAME,
password=environment.DB_PASSWORD,
host=environment.DB_HOST,
port=environment.DB_PORT,
schema=environment.DB_SCHEMA
)
@retry(
wait=wait_exponential(
multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT,
min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS,
max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS
),
stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT))
async def connect(self):
await asyncio.get_event_loop().run_in_executor(None, super().connect)
async def execute_select(self, select_query: str, parameters=None) -> list[dict]:
res = await asyncio.get_event_loop().run_in_executor(None, super().execute_select, select_query, parameters)
return res
async def execute(self, query: str, parameters=None) -> CursorResult:
res = await asyncio.get_event_loop().run_in_executor(None, super().execute, query, parameters)
return res
async def disconnect(self):
await asyncio.get_event_loop().run_in_executor(None, super().disconnect)