Merge pull request #161 feature-NEWDWH2021-1053 into develop

This commit is contained in:
下田雅人 2023-04-19 14:27:08 +09:00
commit 44bef7d91b
27 changed files with 659 additions and 270 deletions

View File

@ -6,5 +6,8 @@ DB_SCHEMA=src05
LOG_LEVEL=INFO
ULTMARC_DATA_BUCKET=****************
ULTMARC_DATA_FOLDER=recv
ULTMARC_BACKUP_BUCKET=****************
JSKULT_BACKUP_BUCKET=****************
ULTMARC_BACKUP_FOLDER=ultmarc
JSKULT_CONFIG_BUCKET=**********************
JSKULT_CONFIG_CALENDAR_FOLDER=jskult/calendar
JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME=jskult_holiday_list.txt

View File

@ -1,9 +1,9 @@
"""実消化&アルトマーク 日次バッチのエントリーポイント"""
from src.jobctrl_daily import batch_process
from src import jobctrl_daily
if __name__ == '__main__':
try:
exit(batch_process())
exit(jobctrl_daily.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はbatch_process内でログを出す。

View File

@ -76,8 +76,22 @@ class UltmarcBucket(S3Bucket):
self._s3_client.delete_file(self._bucket_name, dat_file_key)
class ConfigBucket(S3Bucket):
_bucket_name = environment.JSKULT_CONFIG_BUCKET
def download_holiday_list(self):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, holiday_list_key, f)
f.seek(0)
return temporary_file_path
class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.ULTMARC_BACKUP_BUCKET
_bucket_name = environment.JSKULT_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket):

View File

@ -1,56 +1,101 @@
"""バッチ処理の共通関数"""
import logging
import textwrap
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_syor_date() -> str:
"""DBから処理日を取得します
def get_batch_statuses() -> tuple[str, str, str]:
"""日付テーブルから、以下を取得して返す。
- 日次バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
str: hdke_tbl.syor_date
tuple[str, str]: [0]日次バッチ処理中フラグdump取得状況区分
"""
db = Database.get_instance()
db.connect()
sql = 'SELECT syor_date FROM src05.hdke_tbl'
sql = 'SELECT bch_actf, dump_sts_kbn, src05.get_syor_date() AS syor_date FROM src05.hdke_tbl'
try:
syor_date_result = db.execute_select(sql)
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
db.disconnect()
if len(syor_date_result) == 0:
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず一件取れる
syor_date_record = syor_date_result[0]
syor_date_str = syor_date_record['syor_date']
return syor_date_str
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def get_syor_date_as_date_format() -> str:
"""DBから処理日を取得し、yyyy/mm/ddのフォーマットにして返します
def update_batch_processing_flag_in_processing() -> None:
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
str: hdke_tbl.syor_dateをyyyy/mm/ddにフォーマットした文字列
BatchOperationException: DB操作の何らかのエラー
"""
syor_date_str = get_syor_date()
syor_date = datetime.strptime(syor_date_str, '%Y%m%d')
return syor_date.strftime('%Y/%m/%d')
db = Database.get_instance()
sql = 'UPDATE src05.hdke_tbl SET bch_actf = :in_processing'
try:
db.connect()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def logging_sql(logger, sql):
def update_batch_process_complete() -> None:
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src05.get_syor_date() + interval 1 day), '%Y%m%d') -- +1
"""
try:
db.connect()
db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def logging_sql(logger: logging.Logger, sql: str) -> None:
"""SQL文をデバッグログで出力する
Args:
logger (logging.Logger): ロガー
sql (str): SQL文
"""
logger.debug(f'\n{"-"*15}\n{textwrap.dedent(sql)[1:-1]}\n{"-"*15}')
logger.debug(f'\n{"-" * 15}\n{textwrap.dedent(sql)[1:-1]}\n{"-" * 15}')

View File

@ -0,0 +1,25 @@
from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('生物由来卸販売ロット分解')
def exec():
"""生物由来卸販売ロット分解"""
logger.debug('生物由来卸販売ロット分解:起動')
# 営業日ではない場合、処理をスキップする
if batch_context.is_not_business_day:
logger.info('営業日ではないため、生物由来卸販売ロット分解処理をスキップします。')
return
# # 非同期処理のサンプル
# import time
# for _ in range(50):
# logger.info('処理中')
# time.sleep(0.5)
# TODO: ここに処理を追記していく
logger.debug('生物由来卸販売ロット分解:終了')
return

View File

@ -1,10 +0,0 @@
class BatchConfig:
# 処理日(yyyy/mm/dd形式)
syor_date: str
__instance = None
@classmethod
def get_instance(cls):
if cls.__instance is None:
cls.__instance = cls()
return cls.__instance

View File

@ -0,0 +1,39 @@
class BatchContext:
__instance = None
__syor_date: str # 処理日(yyyy/mm/dd形式)
__is_not_business_day: bool # 日次バッチ起動日フラグ
__is_ultmarc_imported: bool # アルトマーク取込実施済フラグ
def __init__(self) -> None:
self.__is_not_business_day = False
self.__is_ultmarc_imported = False
@classmethod
def get_instance(cls):
if cls.__instance is None:
cls.__instance = cls()
return cls.__instance
@property
def syor_date(self):
return self.__syor_date
@syor_date.setter
def syor_date(self, syor_date_str: str):
self.__syor_date = syor_date_str
@property
def is_not_business_day(self):
return self.__is_not_business_day
@is_not_business_day.setter
def is_not_business_day(self, flag: bool):
self.__is_not_business_day = flag
@property
def is_ultmarc_imported(self):
return self.__is_ultmarc_imported
@is_ultmarc_imported.setter
def is_ultmarc_imported(self, flag: bool):
self.__is_ultmarc_imported = flag

View File

@ -0,0 +1,32 @@
from src.system_var import constants
class CalendarFile:
"""カレンダーファイル"""
__calendar_file_lines: list[str]
def __init__(self, calendar_file_path):
with open(calendar_file_path) as f:
self.__calendar_file_lines: list[str] = f.readlines()
def compare_date(self, date_str: str) -> bool:
"""与えられた日付がカレンダーファイル内に含まれているかどうか
カレンダーファイル内の日付はyyyy/mm/ddで書かれている前提
コメント#)が含まれている行は無視される
Args:
date_str (str): yyyy/mm/dd文字列
Returns:
bool: 含まれていればTrue
"""
for calendar_date in self.__calendar_file_lines:
# コメント行が含まれている場合はスキップ
if constants.CALENDAR_COMMENT_SYMBOL in calendar_date:
continue
if date_str in calendar_date:
return True
return False

View File

@ -1,11 +0,0 @@
from src.batch.datachange import emp_chg_inst_lau
from src.logging.get_logger import get_logger
logger = get_logger('実績洗替')
def batch_process():
"""実績洗替処理"""
logger.info('Start Jisseki Araigae Batch PGM.')
# 施設担当者洗替
emp_chg_inst_lau.batch_process()

View File

@ -0,0 +1,15 @@
from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('DCF施設統合マスタ作成')
def exec():
"""DCF施設統合マスタ作成"""
# アルトマーク取込が行われていない場合は処理をスキップする
if not batch_context.is_ultmarc_imported:
logger.info('アルトマーク取込が行われていないため、DCF施設統合マスタ作成処理をスキップします。')
pass

View File

@ -0,0 +1,16 @@
from src.batch.common.batch_context import BatchContext
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('メルク施設マスタ作成')
def exec():
"""メルク施設マスタ作成"""
# 営業日ではないかつ、アルトマーク取込が行われていない場合は処理をスキップする
if batch_context.is_not_business_day is True and batch_context.is_ultmarc_imported is not True:
logger.info('営業日ではない、かつ、アルトマーク取込が行われていないため、メルク施設マスタ作成処理をスキップします。')
return
pass

View File

@ -1,48 +1,52 @@
from src.batch.batch_functions import logging_sql
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.time.elapsed_time import ElapsedTime
logger = get_logger('48-施設担当者マスタ洗替')
batch_config = BatchConfig.get_instance()
logger = get_logger('施設担当者マスタ洗替')
batch_context = BatchContext.get_instance()
def batch_process():
def exec():
db = Database.get_instance()
db.connect()
logger.info('##########################')
logger.info('START Changing Employee in charge of institution PGM.')
# 業務日付を取得
syor_date = batch_config.syor_date
# `emp_chg_inst_lau`をTruncate
truncate_emp_chg_inst_lau(db)
# emp_chg_inst から、`emp_chg_inst_lau`へInsert
insert_into_emp_chg_inst_lau_from_emp_chg_inst(db)
# vop_hco_merge_vから、emp_chg_inst_lauをUpdate
update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date)
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date)
db.disconnect()
logger.info('##########################')
logger.info('End All Processing PGM.')
try:
db.connect()
logger.debug('##########################')
logger.debug('START Changing Employee in charge of institution PGM.')
# 業務日付を取得
syor_date = batch_context.syor_date
# `emp_chg_inst_lau`をTruncate
_truncate_emp_chg_inst_lau(db)
# emp_chg_inst から、`emp_chg_inst_lau`へInsert
_insert_into_emp_chg_inst_lau_from_emp_chg_inst(db)
# vop_hco_merge_vから、emp_chg_inst_lauをUpdate
_update_emp_chg_inst_lau_from_vop_hco_merge_v(db, syor_date)
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
_update_dcf_inst_merge_from_emp_chg_inst_lau(db, syor_date)
logger.debug('##########################')
logger.debug('End All Processing PGM.')
except Exception as e:
raise BatchOperationException(e)
finally:
db.disconnect()
def truncate_emp_chg_inst_lau(db: Database):
logger.info("##########################")
def _truncate_emp_chg_inst_lau(db: Database):
logger.debug("##########################")
try:
db.execute("TRUNCATE TABLE src05.emp_chg_inst_lau")
except Exception as e:
logger.info("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!")
raise BatchOperationException(e)
logger.debug("Error! Truncate Table `emp_chg_inst_lau` is Failed!!!")
raise e
logger.info("Table `emp_chg_inst_lau` was truncated!")
logger.debug("Table `emp_chg_inst_lau` was truncated!")
return
def insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logger.info("##########################")
def _insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logger.debug("##########################")
try:
elapsed_time = ElapsedTime()
sql = """
@ -70,16 +74,16 @@ def insert_into_emp_chg_inst_lau_from_emp_chg_inst(db: Database):
logging_sql(logger, sql)
logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!")
raise BatchOperationException(e)
logger.info("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!")
logger.debug("Error! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was failed!!!")
raise e
logger.debug("Success! Insert into `emp_chg_inst_lau` from `emp_chg_inst` was inserted!")
return
def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
def _update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
# vop_hco_merge_vはデータが作られないため、この洗い替え処理は基本空振りする
logger.info("##########################")
logger.debug("##########################")
try:
select_result = db.execute_select(
"""
@ -93,8 +97,8 @@ def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
{'syor_date': syor_date}
)
except Exception as e:
logger.info("Error! `vop_hco_merge_v` Table count error!")
raise BatchOperationException(e)
logger.debug("Error! `vop_hco_merge_v` Table count error!")
raise e
count = [row for row in select_result][0]['row_count']
if count == 0:
logger.info('vop_hco_merge_v Table Data is not exists!')
@ -137,16 +141,16 @@ def update_emp_chg_inst_lau_from_vop_hco_merge_v(db: Database, syor_date: str):
logging_sql(logger, update_sql)
logger.info(f'Query OK, {update_result.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!")
raise BatchOperationException(e)
logger.info(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!")
logger.debug(f"emp_chg_inst_lau v_inst_cd could not set from {v_inst_cd_merge} to {v_inst_cd_merge}!")
raise e
logger.debug(f"Success! emp_chg_inst_lau v_inst_cd was set from {v_inst_cd} to {v_inst_cd_merge}!")
return
def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
def _update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
# Get count from DCF_INST_MERGE
logger.info("##########################")
logger.debug("##########################")
try:
select_result = db.execute_select(
"""
@ -163,18 +167,19 @@ def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
{'syor_date': syor_date}
)
except Exception as e:
logger.info("Error! Getting Count of dcf_inst_merge was failed!")
raise BatchOperationException(e)
logger.debug("Error! Getting Count of dcf_inst_merge was failed!")
raise e
count = [row for row in select_result][0]['row_count']
if count == 0:
logger.info('dcf_inst_merge Table Data is not exists!')
return
logger.info('dcf_inst_merge Table Data is exists!')
logger.debug('dcf_inst_merge Table Data is exists!')
# dcf_inst_mergeから、emp_chg_inst_lauをUpdate
logger.info("##########################")
logger.info("#### UPDATE DATA #########")
logger.info("##########################")
logger.debug("##########################")
logger.debug("#### UPDATE DATA #########")
logger.debug("##########################")
try:
elapsed_time = ElapsedTime()
update_sql = """
@ -205,9 +210,9 @@ def update_dcf_inst_merge_from_emp_chg_inst_lau(db: Database, syor_date: str):
logging_sql(logger, update_sql)
logger.info(f'Query OK, {res.rowcount} rows affected ({elapsed_time.of})')
except Exception as e:
logger.info("emp_chg_inst_lau.v_inst_cd could not set!")
raise BatchOperationException(e)
logger.debug("emp_chg_inst_lau.v_inst_cd could not set!")
raise e
logger.info("emp_chg_inst_lau.v_inst_cd was set!")
logger.debug("emp_chg_inst_lau.v_inst_cd was set!")
return

View File

@ -0,0 +1,21 @@
from src.batch.common.batch_context import BatchContext
from src.batch.laundering import emp_chg_inst_laundering
from src.logging.get_logger import get_logger
batch_context = BatchContext.get_instance()
logger = get_logger('実績洗替')
def exec():
"""実績洗替処理"""
logger.info('実績更新:起動')
# 営業日ではない場合、実績洗替処理は実行しない
if batch_context.is_not_business_day:
logger.info('営業日ではないため、実績洗替処理をスキップします。')
return
# 施設担当者洗替
emp_chg_inst_laundering.exec()
logger.info('実績更新:終了')

View File

@ -0,0 +1,32 @@
"""並行処理"""
import concurrent.futures
from src.batch.bio_sales import create_bio_sales_lot
from src.batch.laundering import sales_laundering
from src.error.exceptions import BatchOperationException
def exec():
# 並行処理を開始
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 実績更新
future_sales_laundering = executor.submit(sales_laundering.exec)
# 生物由来ロット分解
future_create_bio_sales_lot = executor.submit(create_bio_sales_lot.exec)
# 両方の処理が完了するまで待つ
concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot])
# エラーがあれば呼び出し元でキャッチする
sales_laundering_exc = future_sales_laundering.exception()
create_bio_sales_lot_exc = future_create_bio_sales_lot.exception()
# いずれかにエラーが発生していれば、1つのエラーとして返す。
if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None:
sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else ''
create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else ''
raise BatchOperationException(f'並行処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}')
return

View File

@ -1,16 +1,20 @@
"""アルトマークデータ保管"""
"""アルトマークデータ処理"""
from datetime import datetime
from src.aws.s3 import UltmarcBucket
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc.datfile import DatFile
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
UltmarcTableMapperFactory
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('アルトマークデータ保管')
logger = get_logger('アルトマークデータ処理')
ultmarc_bucket = UltmarcBucket()
batch_config = BatchConfig.get_instance()
batch_context = BatchContext.get_instance()
def exec_import():
@ -22,13 +26,15 @@ def exec_import():
# ファイルがない場合は処理せず、正常終了とする
if len(dat_file_list) == 0:
logger.info('ファイルがないため、アルトマーク取込処理をスキップします')
logger.info('取込ファイルがないため、アルトマーク取込処理をスキップします')
# アルトマークデータ受信予定曜日である月曜日は、保守ユーザーに通知する
if datetime.strptime(batch_context.syor_date, '%Y/%m/%d').weekday() == constants.WEEKDAY_MONDAY:
logger.info('[NOTICE]アルトマークデータの受信がありませんでした')
return
# ファイルが複数ある場合はエラーとする
if len(dat_file_list) > 1:
logger.error(f'複数の取込ファイルがあるため、異常終了 ファイル一覧:{dat_file_list}')
return
raise BatchOperationException(f'複数の取込ファイルがあるため、異常終了 ファイル一覧:{dat_file_list}')
# ファイルの件数は必ず1件になる
dat_file_info = dat_file_list[0]
@ -41,7 +47,7 @@ def exec_import():
logger.info(f"{dat_file_name}を取り込みます")
# ファイルをバックアップ
# 現行は、jobctrl_dailyの先頭でやっている
ultmarc_bucket.backup_dat_file(dat_file_name, batch_config.syor_date)
ultmarc_bucket.backup_dat_file(dat_file_name, batch_context.syor_date)
# datファイルをダウンロード
local_file_path = ultmarc_bucket.download_dat_file(dat_file_name)
dat_file = DatFile.from_path(local_file_path)
@ -50,50 +56,57 @@ def exec_import():
# 処理後、ファイルをS3から削除する
logger.info(f'取り込み処理が完了したため、datファイルを削除。ファイル名={dat_file_name}')
ultmarc_bucket.delete_dat_file(dat_file_name)
except Exception as e:
logger.exception(e)
raise e
finally:
# アルトマーク取込済をマーク
batch_context.is_ultmarc_imported = True
logger.info('アルトマーク取込処理: 終了')
except Exception as e:
raise BatchOperationException(e)
def exec_export():
"""V実消化用施設・薬局薬店データ作成処理"""
if not batch_context.is_ultmarc_imported:
logger.info('アルトマーク取込が行われていないため、V実消化用施設・薬局薬店データ作成処理をスキップします。')
return
def _import_to_ultmarc_table(dat_file: DatFile):
db = Database.get_instance()
# DB接続
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
logger.info('Transaction BEGIN')
mapper_factory = UltmarcTableMapperFactory()
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(f'ERROR_LINE: {log_message}')
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
db.disconnect()
# 処理結果をログに出力する
logger.info('Transaction COMMIT')
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
try:
# DB接続
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
mapper_factory = UltmarcTableMapperFactory()
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.info(f'ERROR_LINE: {log_message}')
dat_file.count_up_error()
# 処理結果をログに出力する
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
# 1件でもエラーがあれば、通知用にログに出力する
if dat_file.error_count > 0:
logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。')
# 1件でもエラーがあれば、通知用にログに出力する
if dat_file.error_count > 0:
logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。')
finally:
# 終了時に必ずコミットする
db.commit()
db.disconnect()
return

View File

@ -1,12 +1,12 @@
from abc import ABCMeta, abstractmethod
from datetime import datetime
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc.utmp_tables.tables.ultmarc_table import UltmarcTable
from src.db.database import Database
# 処理日を使用するために、configを使用
batch_config = BatchConfig.get_instance()
batch_context = BatchContext.get_instance()
class UltmarcTableMapper(metaclass=ABCMeta):
@ -32,7 +32,7 @@ class UltmarcTableMapper(metaclass=ABCMeta):
'execute_date_str_ymd': execute_date_str_ymd,
'execute_datetime': execute_datetime,
# バッチ共通設定から処理日を取得
'syor_date': batch_config.syor_date
'syor_date': batch_context.syor_date
}
@abstractmethod

View File

@ -1,6 +1,3 @@
from tenacity import RetryError
class MeDaCaException(Exception):
pass
@ -11,7 +8,3 @@ class DBException(MeDaCaException):
class BatchOperationException(MeDaCaException):
pass
class MaxRetryExceededException(MeDaCaException, RetryError):
pass

View File

@ -1,97 +1,85 @@
from src.batch import jissekiaraigae
from src.batch.batch_functions import get_syor_date_as_date_format
from src.batch.common.batch_config import BatchConfig
"""実消化&アルトマーク 日次バッチ処理"""
from src.aws.s3 import ConfigBucket
from src.batch import parallel_processes
from src.batch.batch_functions import (
get_batch_statuses, update_batch_process_complete,
update_batch_processing_flag_in_processing)
from src.batch.common.batch_context import BatchContext
from src.batch.common.calendar_file import CalendarFile
from src.batch.laundering import create_dcf_inst_merge, create_mst_inst
from src.batch.ultmarc import ultmarc_process
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('日次処理コントロール') # ここを処理IDとかにするといいかもしれない
logger = get_logger('日次処理コントロール')
# バッチ共通設定を取得
batch_config = BatchConfig.get_instance()
batch_context = BatchContext.get_instance()
def batch_process():
def exec():
try:
logger.info('日次ジョブ:開始')
# logger.info('S3マウント状況確認')
# logger.error('S3マウントエラー:DWH異常終了')
# logger.error('S3マウントエラー:BIO異常終了')
# logger.info('データベース接続') # 実際には、ここでつなげているわけではないので、いらないと思う
# logger.error('データベース接続エラー(異常終了)') # 検査例外を捕まえて、共通的に出せばいいと思う
logger.info('日次バッチ:開始')
try:
logger.info('処理日取得')
syor_date = get_syor_date_as_date_format()
# 日次バッチ処置中フラグ、dump処理状態区分、処理日を取得
batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses()
except BatchOperationException as e:
logger.error(f'処理日取得エラー(異常終了){e}')
logger.exception(f'日付テーブル取得(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(f'処理日={syor_date}')
# バッチ共通設定に処理日を追加
batch_config.syor_date = syor_date
# 休日判定ファイルを読み込み
logger.info('休日判定処理')
if True: # 休日判定
logger.info('非営業日かつ月、火、水以外です。') # 分岐
try:
# 処理中フラグ判定。ここでdumpのフラグも見る
logger.info('処理フラグ更新中')
logger.info('処理フラグ更新終了')
except BatchOperationException as e:
logger.error(f'処理フラグ更新処理エラー(異常終了){e}')
logger.info('日次ジョブ:終了(正常終了)')
batch_context.syor_date = syor_date
# 稼働日かかどうかを、V実消化非稼働日ファイルをダウンロードして判定
try:
logger.info('日次ジョブ処理中判定')
if True: # 処理中判定
logger.error('処理フラグ処理中(異常終了)')
logger.info('処理中フラグの更新:起動')
logger.info('処理中フラグの更新:終了')
holiday_list_file_path = ConfigBucket().download_holiday_list()
holiday_calendar = CalendarFile(holiday_list_file_path)
batch_context.is_not_business_day = holiday_calendar.compare_date(syor_date)
except Exception as e:
logger.exception(f'V実消化非稼働日ファイルの読み込みに失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でV実消化稼働日かどうかをログ出力
logger.debug(f'本日は{"V実消化非稼働日です。" if batch_context.is_not_business_day else "V実消化稼働日です。"}')
# バッチ処理中に更新
try:
update_batch_processing_flag_in_processing()
except BatchOperationException as e:
logger.error(f'日次ジョブ処理中エラー(異常終了){e}')
# ↓ここから、やらない↓
# logger.info('処理前バックアップ実行')
# logger.info('処理前バックアップ:起動')
# logger.info('処理前バックアップ:終了')
# logger.error('処理前バックアップ処理エラー(異常終了)', $ex->getMessage())
# ↑ここまで↑
logger.info('卸在庫データ取込判定')
if True: # 卸在庫日判定
logger.info('卸在庫データ取込日です')
logger.debug('卸在庫データファイル名: {_PATH_OROSHI_ZAIKO}')
if True: # 卸在庫ファイル存在確認なければ異常終了
logger.error('卸在庫データ存在確認エラー(異常終了)')
logger.info('卸在庫データ存在確認:取込処理開始')
logger.debug('卸在庫データファイル名作成: {read_filename}')
logger.debug('ファイル移動OK{_MOVE_OROSHI_ZAIKO}') # S3からダウンロード
logger.debug('ファイル解凍OK{sprintf(_ZIP_OROSHI_ZAIKO, $read_filename)}') # gunzip -fなので、gzipを使う
logger.debug('ファイル名変更OK {sprintf(_RENAME_OROSHI_ZAIKO, $read_filename)}') # S3にアップロード
try:
logger.info('卸在庫データ取込:起動')
logger.info('卸在庫データ取込:終了')
except BatchOperationException as e:
logger.error(f'卸在庫データ取込処理エラー(異常終了){e}')
logger.info('日次処理(アルトマーク)')
if True: # アルトマークなければ
logger.info('日次処理(アルトマーク)実行対象日でない為未実行')
logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
logger.info('アルトマーク取込:起動')
ultmarc_process.exec_import()
logger.info('アルトマーク取込:終了')
except BatchOperationException as e:
logger.error(f'アルトマーク取込処理エラー(異常終了){e}')
if True: # 休日判定
try:
logger.info('メルク施設マスタ作成')
logger.info('メルク施設マスタ作成終了')
except BatchOperationException as e:
logger.error(f'メルク施設マスタ作成エラー(異常終了){e}')
try:
logger.info('DCF施設統合マスタ作成')
logger.info('DCF施設統合マスタ作成終了')
except BatchOperationException as e:
logger.error(f'DCF施設統合マスタ作成エラー異常終了{e}')
# if False: # ($holiday === FALSE) とにかく毎日動かす
logger.info('V実消化連携データ存在確認')
if True:
logger.error('V実消化連携データ存在確認異常終了')
logger.exception(f'アルトマーク取込処理エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でアルトマーク取込が行われたかどうかをログ出力
logger.debug(f'{"アルトマーク取込が行われました。" if batch_context.is_ultmarc_imported else "アルトマーク取込が行われませんでした。"}')
try:
logger.info('V実消化用施設・薬局薬店データ作成処理起動')
ultmarc_process.exec_export()
logger.info('V実消化用施設・薬局薬店データ作成処理終了')
except BatchOperationException as e:
logger.exception(f'V実消化用施設・薬局薬店データ作成処理エラー異常終了{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info('日次処理V実消化')
try:
@ -99,28 +87,46 @@ def batch_process():
logger.info('V実消化取込終了')
except BatchOperationException as e:
logger.exception(f'V実消化取込処理エラー異常終了{e}')
logger.info('日次処理(実績更新)')
try:
logger.info('実績更新:起動')
jissekiaraigae.batch_process()
logger.info('実績更新:終了')
except BatchOperationException as e:
logger.exception(f'実績更新処理エラー(異常終了){e}')
# ↓以下、ファイルのバックアップ以外はやらない↓
# logger.info('処理後バックアップ実行')
# logger.info('処理後バックアップ:起動')
# logger.info('処理後バックアップ:終了')
# logger.error('処理後バックアップ処理エラー(異常終了)', $ex->getMessage())
# ↑ここまでやらない↑
logger.info('処理中フラグの更新:非処理中')
try:
logger.info('処理中フラグの更新:起動')
logger.info('処理中フラグの更新:終了')
except BatchOperationException as e:
logger.exception(f'処理中フラグ更新エラー(異常終了){e}')
logger.info('ワークディレクトリクリーニング')
logger.info('日次ジョブ:終了(正常終了)')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
logger.info('メルク施設マスタ作成:起動')
create_mst_inst.exec()
logger.info('メルク施設マスタ作成:終了')
except BatchOperationException as e:
logger.exception(f'メルク施設マスタ作成 エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
# 実績生物由来ロット分解と並行処理
logger.info('並行処理(実績更新-生物由来ロット分解):起動')
parallel_processes.exec()
logger.info('並行処理(実績更新-生物由来ロット分解):終了')
except BatchOperationException as e:
logger.exception(f'並行処理(実績更新-生物由来ロット分解)エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
logger.info('DCF施設統合マスタ作成起動')
create_dcf_inst_merge.exec()
logger.info('DCF施設統合マスタ作成終了')
except BatchOperationException as e:
logger.exception(f'DCF施設統合マスタ作成エラー異常終了{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# バッチ処理完了とし、処理日、バッチ処置中フラグ、dump取得状態区分を更新
logger.info('業務日付更新・バッチステータスリフレッシュ:起動')
try:
update_batch_process_complete()
except BatchOperationException as e:
logger.exception(f'業務日付更新・バッチステータスリフレッシュ エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info('業務日付更新・バッチステータスリフレッシュ:終了')
# 正常終了を保守ユーザーに通知
logger.info('[NOTICE]日次バッチ:終了(正常終了)')
return constants.BATCH_EXIT_CODE_SUCCESS
return 0
except Exception as e:
logger.exception(f'日次バッチ処理中に想定外のエラーが発生しました {e}')
raise e

View File

@ -0,0 +1,17 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0
# バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
# dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了
DUMP_STATUS_KBN_COMPLETE = '2'
# カレンダーファイルのコメントシンボル
CALENDAR_COMMENT_SYMBOL = '#'
# 月曜日(datetime.weekday()で月曜日を表す数字)
WEEKDAY_MONDAY = 0

View File

@ -10,8 +10,11 @@ DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET']
ULTMARC_DATA_FOLDER = os.environ['ULTMARC_DATA_FOLDER']
ULTMARC_BACKUP_BUCKET = os.environ['ULTMARC_BACKUP_BUCKET']
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER']
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
JSKULT_CONFIG_CALENDAR_FOLDER = os.environ['JSKULT_CONFIG_CALENDAR_FOLDER']
JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME = os.environ['JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')

View File

@ -3,7 +3,7 @@ from datetime import datetime
import pytest
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import com_alma_mapper
from src.db.database import Database
from tests.testing_utility import (assert_table_results,
@ -18,13 +18,13 @@ class TestComAlmaMapper:
"""レイアウト区分004: COM_出身校"""
db: Database
batch_config: BatchConfig
batch_context: BatchContext
test_file_path: str = path.dirname(__file__)
@pytest.fixture(autouse=True, scope='function')
def pre_test(self, database: Database):
"""テスト実行前後処理"""
self.batch_config = BatchConfig.get_instance()
self.batch_context = BatchContext.get_instance()
# setup
self.db = database
self.db.connect()
@ -50,7 +50,7 @@ class TestComAlmaMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_alma_insert.csv'))
# 一旦全データをDBから削除
@ -95,7 +95,7 @@ class TestComAlmaMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_alma_update.csv'))
# 一旦全データをDBから削除
@ -152,7 +152,7 @@ class TestComAlmaMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_alma_delete.csv'))
# 一旦全データをDBから削除

View File

@ -2,7 +2,7 @@ import os.path as path
import pytest
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import \
com_dr_wrkplace_mapper
from src.db.database import Database
@ -18,13 +18,13 @@ class TestComDrWrkplaceMapper:
"""COM_DCF医師勤務先"""
db: Database
batch_config: BatchConfig
batch_context: BatchContext
test_file_path: str = path.dirname(__file__)
@ pytest.fixture(autouse=True, scope='function')
def pre_test(self, database: Database):
"""テスト実行前後処理"""
self.batch_config = BatchConfig.get_instance()
self.batch_context = BatchContext.get_instance()
# setup
self.db = database
@ -53,7 +53,7 @@ class TestComDrWrkplaceMapper:
# Arrange
# 処理日設定
# 適用開始日と同値になる
self.batch_config.syor_date = '2020/02/22'
self.batch_context.syor_date = '2020/02/22'
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_dr_wrkplace_insert.csv'))
# 一旦全データをDBから削除
@ -147,7 +147,7 @@ class TestComDrWrkplaceMapper:
# Arrange
# 処理日設定
# 適用開始日と同値、適用終了日の+1日になる
self.batch_config.syor_date = '2020/02/23'
self.batch_context.syor_date = '2020/02/23'
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_dr_wrkplace_update.csv'))
# 一旦全データをDBから削除
@ -264,7 +264,7 @@ class TestComDrWrkplaceMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = '2020/02/24'
self.batch_context.syor_date = '2020/02/24'
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_dr_wrkplace_delete.csv'))
# 一旦全データをDBから削除

View File

@ -3,7 +3,7 @@ from datetime import datetime
import pytest
from src.batch.common.batch_config import BatchConfig
from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import \
com_hamtec_mapper
from src.db.database import Database
@ -19,13 +19,13 @@ class TestComHamtecMapper:
"""レイアウト区分021: COM_高度先進医療"""
db: Database
batch_config: BatchConfig
batch_context: BatchContext
test_file_path: str = path.dirname(__file__)
@pytest.fixture(autouse=True, scope='function')
def pre_test(self, database: Database):
"""テスト実行前後処理"""
self.batch_config = BatchConfig.get_instance()
self.batch_context = BatchContext.get_instance()
# setup
self.db = database
self.db.connect()
@ -51,7 +51,7 @@ class TestComHamtecMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_hamtec_insert.csv'))
# 一旦全データをDBから削除
@ -96,7 +96,7 @@ class TestComHamtecMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_hamtec_update.csv'))
# 一旦全データをDBから削除
@ -153,7 +153,7 @@ class TestComHamtecMapper:
# Arrange
# 処理日設定
self.batch_config.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
self.batch_context.syor_date = datetime.strftime(datetime.now(), '%Y/%m/%d')
# テスト用のCSVを読み込む
test_dat_file = create_ultmarc_test_data_from_csv(path.join(self.test_file_path, 'com_hamtec_delete.csv'))
# 一旦全データをDBから削除

View File

@ -0,0 +1,131 @@
2022/12/03
2022/12/04
2022/12/10
2022/12/11
2022/12/17
2022/12/18
2022/12/24
2022/12/25
2022/12/29
2022/12/30
2022/12/31
2023/01/01
2023/01/02
2023/01/03
2023/01/07
2023/01/08
2023/01/09
2023/01/14
2023/01/15
2023/01/21
2023/01/22
2023/01/28
2023/01/29
2023/02/04
2023/02/05
2023/02/11
2023/02/12
2023/02/18
2023/02/19
2023/02/23
2023/02/25
2023/02/26
2023/03/04
2023/03/05
2023/03/11
2023/03/12
2023/03/18
2023/03/19
2023/03/21
2023/03/25
2023/03/26
2023/04/01
2023/04/02
2023/04/08
2023/04/09
2023/04/15
2023/04/16
2023/04/22
2023/04/23
2023/04/29
2023/04/30
2023/05/03
2023/05/04
2023/05/05
2023/05/06
2023/05/07
2023/05/13
2023/05/14
2023/05/20
2023/05/21
2023/05/27
2023/05/28
2023/06/03
2023/06/04
2023/06/10
2023/06/11
2023/06/17
2023/06/18
2023/06/24
2023/06/25
2023/07/01
2023/07/02
2023/07/08
2023/07/09
2023/07/15
2023/07/16
2023/07/17
2023/07/22
2023/07/23
2023/07/29
2023/07/30
2023/08/05
2023/08/06
2023/08/11
2023/08/12
2023/08/13
2023/08/19
2023/08/20
2023/08/26
2023/08/27
2023/09/02
2023/09/03
2023/09/09
2023/09/10
2023/09/16
2023/09/17
2023/09/18
2023/09/23
2023/09/24
2023/09/30
2023/10/01
2023/10/07
2023/10/08
2023/10/09
2023/10/14
2023/10/15
2023/10/21
2023/10/22
2023/10/28
2023/10/29
2023/11/03
2023/11/04
2023/11/05
2023/11/11
2023/11/12
2023/11/18
2023/11/19
2023/11/23
2023/11/25
2023/11/26
2023/12/02
2023/12/03
2023/12/09
2023/12/10
2023/12/16
2023/12/17
2023/12/23
2023/12/24
2023/12/29
2023/12/30
2023/12/31