指摘事項修正1

This commit is contained in:
野間 2023-06-28 15:02:16 +09:00
parent be75bf7a9a
commit e38938a243
13 changed files with 403 additions and 486 deletions

View File

@ -1,23 +1,20 @@
DB_HOST=************
DB_PORT=************
DB_PORT=3306
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=src05
ARISJ_DATA_BUCKET=mbj-newdwh2021-staging-jskult-arisj
JSKULT_BACKUP_BUCKET=mbj-newdwh2021-staging-backup-jskult
JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
ULTMARC_BACKUP_FOLDER=************
LOG_LEVEL=INFO
ULTMARC_DATA_BUCKET=****************
ULTMARC_DATA_FOLDER=recv
JSKULT_BACKUP_BUCKET=****************
ULTMARC_BACKUP_FOLDER=ultmarc
JSKULT_CONFIG_BUCKET=**********************
JSKULT_CONFIG_CALENDAR_FOLDER=jskult/calendar
JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME=jskult_holiday_list.txt
ARISJ_DATA_BUCKET=**********
LOG_LEVEL=**************
ARISJ_BACKUP_FOLDER=arisj
ARISJ_DATA_FOLDER=DATA
ARISJ_BACKUP_FOLDER=arisj
JSKULT_CONFIG_CALENDAR_FOLDER=jskult/calendar
JSKULT_CONFIG_CALENDAR_ARISJ_OUTPUT_DAY_LIST_FILE_NAME=jskult_arisj_output_day_list.txt
DB_CONNECTION_MAX_RETRY_ATTEMPT=**************
DB_CONNECTION_RETRY_INTERVAL_INIT=**************
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=**************
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=*************
VJSK_DATA_BUCKET=*************
DB_CONNECTION_MAX_RETRY_ATTEMPT=************
DB_CONNECTION_RETRY_INTERVAL_INIT=************
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=************
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=************

View File

@ -3,10 +3,6 @@ url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[scripts]
"test:ultmarc" = "pytest tests/batch/ultmarc/"
"test:ultmarc:cov" = "pytest --cov=src/batch/ultmarc/ --cov-branch --cov-report=term-missing tests/batch/ultmarc/"
[packages]
boto3 = "*"
sqlalchemy = "*"
@ -16,8 +12,6 @@ pymysql = "*"
[dev-packages]
autopep8 = "*"
flake8 = "*"
pytest = "*"
pytest-cov = "*"
[requires]
python_version = "3.9"

View File

@ -42,7 +42,7 @@
- VSCode 上で「F5」キーを押下すると、バッチ処理が起動する。
- 「entrypoint.py」が、バッチ処理のエントリーポイント。
- 実際の処理は、「src/jobctrl_daily.py」で行っている。
- 実際の処理は、「src/jobctrl_monthly.py」で行っている。
## フォルダ構成(工事中)

View File

@ -1,4 +1,4 @@
"""実消化&アルトマーク 次バッチのエントリーポイント"""
"""実消化&アルトマーク 次バッチのエントリーポイント"""
from src import jobctrl_monthly
if __name__ == '__main__':

View File

@ -51,31 +51,6 @@ class S3Bucket():
_bucket_name: str = None
class UltmarcBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_DATA_FOLDER
def list_dat_file(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def download_dat_file(self, dat_filename: str):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, dat_filename, f)
f.seek(0)
return temporary_file_path
def backup_dat_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー
ultmarc_backup_bucket = UltmarcBackupBucket()
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
# コピー元のファイルを削除
self._s3_client.delete_file(self._bucket_name, dat_file_key)
class ConfigBucket(S3Bucket):
_bucket_name = environment.JSKULT_CONFIG_BUCKET

View File

@ -5,23 +5,21 @@ from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_batch_statuses() -> tuple[str, str, str]:
def get_batch_statuses() -> tuple[str, str]:
"""日付テーブルから、以下を取得して返す。
- バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]バッチ処理中フラグdump取得状況区分
tuple[str, str]: [0]バッチ処理中フラグ[1]処理日
"""
db = Database.get_instance()
sql = 'SELECT bch_actf, dump_sts_kbn, src05.get_syor_date() AS syor_date FROM src05.hdke_tbl'
sql = 'SELECT bch_actf, src05.get_syor_date() AS syor_date FROM src05.hdke_tbl'
try:
db.connect()
hdke_tbl_result = db.execute_select(sql)
@ -36,59 +34,11 @@ def get_batch_statuses() -> tuple[str, str, str]:
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def update_batch_processing_flag_in_processing() -> None:
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = 'UPDATE src05.hdke_tbl SET bch_actf = :in_processing'
try:
db.connect()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_batch_process_complete() -> None:
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src05.get_syor_date() + interval 1 day), '%Y%m%d') -- +1
"""
try:
db.connect()
db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
return batch_processing_flag, syor_date_str
def logging_sql(logger: logging.Logger, sql: str) -> None:

View File

@ -1,10 +1,9 @@
class BatchContext:
__instance = None
__syor_date: str # 処理日(yyyy/mm/dd形式)
__is_not_business_monthly: bool # 月次バッチ起動日フラグ
__is_arisj_output_day: bool # 月次バッチ起動日フラグ
def __init__(self) -> None:
self.__is_not_business_monthly = False
self.__is_arisj_output_day = False
@classmethod
def get_instance(cls):
@ -13,25 +12,9 @@ class BatchContext:
return cls.__instance
@property
def syor_date(self):
return self.__syor_date
def is_arisj_output_day(self):
return self.__is_arisj_output_day
@syor_date.setter
def syor_date(self, syor_date_str: str):
self.__syor_date = syor_date_str
@property
def is_not_business_monthly(self):
return self.__is_not_business_monthly
@is_not_business_monthly.setter
def is_not_business_monthly(self, flag: bool):
self.__is_not_business_monthly = flag
@property
def is_ultmarc_imported(self):
return self.__is_ultmarc_imported
@is_ultmarc_imported.setter
def is_ultmarc_imported(self, flag: bool):
self.__is_ultmarc_imported = flag
@is_arisj_output_day.setter
def is_arisj_output_day(self, flag: bool):
self.__is_arisj_output_day = flag

View File

@ -1,278 +0,0 @@
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
import os
import tempfile
import os.path as path
import boto3
logger = get_logger('実消化&アルトマーク月次バッチ')
# WKテーブルの過去分削除SQL
PHYSICAL_NORMAL_DELETE_QUERY = """\
DELETE FROM src05.wk_inst_aris_if
"""
# 正常系データを取得しWKテーブルに保存SQL
NORMAL_INSERT_SELECT_QUERY = """\
INSERT src05.wk_inst_aris_if
SELECT
TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
,TRIM(' ' FROM TRIM(' ' FROM ci.prefc_cd)) AS pref_cd
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(cp.prefc_name,1,8))) AS pref_name
,TRIM(' ' FROM TRIM(' ' FROM ci.postal_number)) AS postal_cd
,TRIM(' ' FROM TRIM(' ' FROM cc.city_name)) AS city_name
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_addr)) AS address
,TRIM(' ' FROM TRIM(' ' FROM cd.inst_div_name))
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_phone_number)) AS phone_no
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_div_cd))
,TRIM(' ' FROM TRIM(' ' FROM ci.manage_cd))
,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date
,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date
,sysdate()
FROM src05.com_inst ci
LEFT JOIN src05.mst_prefc cp
ON ci.prefc_cd = cp.prefc_cd
LEFT JOIN src05.mst_city cc
ON ci.prefc_cd = cc.prefc_cd
AND ci.city_cd = cc.city_cd
LEFT OUTER JOIN src05.com_inst_div cd
ON ci.inst_div_cd = cd.inst_div_cd
WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
AND ci.dcf_dsf_inst_cd IS NOT NULL
AND ci.form_inst_name_kanji IS NOT NULL
AND ci.prefc_cd IS NOT NULL
AND cp.prefc_name IS NOT NULL
AND cc.city_name IS NOT NULL
AND ci.inst_addr IS NOT NULL
ORDER BY ci.dcf_dsf_inst_cd
"""
# 正常系データの件数を取得SQL
NORMAL_COUNT_QUERY = """\
SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if
"""
# 異常系WKテーブルの過去分削除SQL
PHYSICAL_ABNORMAL_DELETE_QUERY = """\
DELETE FROM src05.wk_inst_aris_if_wrn
"""
# 異常系データを取得しWKテーブルに保存SQL
ABNORMAL_INSERT_SELECT_QUERY = """\
INSERT src05.wk_inst_aris_if_wrn
SELECT
TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
,TRIM(' ' FROM TRIM(' ' from ci.prefc_cd)) AS pref_cd
,TRIM(' ' FROM TRIM(' ' from SUBSTR(cp.prefc_name,1,8))) AS pref_name
,TRIM(' ' FROM TRIM(' ' from ci.postal_number)) AS postal_cd
,TRIM(' ' FROM TRIM(' ' from cc.city_name)) AS city_name
,TRIM(' ' FROM TRIM(' ' from ci.inst_addr)) AS address
,TRIM(' ' FROM TRIM(' ' from cd.inst_div_name))
,TRIM(' ' FROM TRIM(' ' from ci.inst_phone_number)) AS phone_no
,TRIM(' ' FROM TRIM(' ' from ci.inst_div_cd))
,TRIM(' ' FROM TRIM(' ' from ci.manage_cd))
,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date
,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date
,IF(ci.dcf_dsf_inst_cd IS NULL,'bi0402000001', NULL) AS wrnid_dcf_inst_cd
,IF(ci.form_inst_name_kanji IS NULL,'bi0402000002', NULL) AS wrnid_inst_name_form
,IF(ci.prefc_cd IS NULL,'bi0402000003', NULL) AS wrnid_pref_cd
,IF(cp.prefc_name IS NULL,'bi0402000004', NULL) AS wrnid_pref_name
,IF(cc.city_name IS NULL,'bi0402000005', NULL) AS wrnid_city_name
,IF(ci.inst_addr IS NULL,'bi0402000006', NULL) AS wrnid_address
,sysdate()
FROM src05.com_inst ci
LEFT JOIN src05.mst_prefc cp
ON ci.prefc_cd = cp.prefc_cd
LEFT JOIN src05.mst_city cc
ON ci.prefc_cd = cc.prefc_cd
AND ci.city_cd = cc.city_cd
LEFT OUTER JOIN src05.com_inst_div cd
ON ci.inst_div_cd = cd.inst_div_cd
WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
AND( ci.dcf_dsf_inst_cd IS NULL
OR ci.form_inst_name_kanji IS NULL
OR ci.prefc_cd IS NULL
OR cp.prefc_name IS NULL
OR cc.city_name IS NULL
OR ci.inst_addr IS NULL)
ORDER BY ci.dcf_dsf_inst_cd
"""
# 正常系データの件数を取得SQL
ABNORMAL_COUNT_QUERY = """\
SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if_wrn
"""
# CSVファイルの作成用のSQL
SELECT_QUERY = """\
SELECT dcf_inst_cd, inst_name_form, inst_name, inst_name_kana_form, pref_cd, pref_name,
postal_cd, city_name, address, inst_div_name, phone_no, inst_div_cd, manage_cd,
'', inst_delete_date
FROM src05.wk_inst_aris_if ORDER BY dcf_inst_cd
"""
create_date = datetime.now().strftime('%Y%m%d%H%M%S')
create_date_format = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
aris_create_csv = f'D0004_ARIS_M_DCF_{create_date}.csv'
res_log = f'D0004{create_date}.log'
prg_id = 'PrgId:BI0402'
head_str = 'TC_HOSPITAL,TJ_HOSPITAL,TJ_HOSPITALSHORT,TK_HOSPITAL,TC_PREFECTURE,TJ_PREFECTURE,TJ_ZIPCODE,TJ_CITY,TJ_ADDRESS,\
TJ_DEPARTMENT,TJ_TELEPHONENUMBER,TC_HOSPITALCAT,TC_HOSPITALTYPE,TS_UPDATE, TD_UPDATE'
start_msg = "MsgID:BI0000000001 Message:バッチ処理を開始しました。"
err_end_msg = "MsgID:BI0000009998 Message:バッチ処理を異常終了しました。"
csv_err_msg = "MsgID:BI0000000040 Message:ワークデータの作成に失敗しました。"
cnt_msg = "MsgID: Message: LogText:"
move_err_msg = "MsgID:BI0000000041 Message:S3バケットARISへのCSVデータ、実行ログ移動できませんでした。"
def exec():
""" 実消化&アルトマーク月次バッチ """
try:
# 実行ログに書き込む
resLog = make_log_data()
resLog_f = resLog[0]
log_file_path = resLog[1]
resLog_f.write(f'{create_date_format}[DWH][3][INFO]{prg_id} {start_msg}\n')
logger.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {start_msg}')
db = Database.get_instance()
# DB接続
db.connect()
# トランザクションの開始
db.begin()
# 正常系データの反映
# 過去分は不要のため、デリート
db.execute(PHYSICAL_NORMAL_DELETE_QUERY)
# 正常系データを取得しWKテーブルに保存する。
db.execute(NORMAL_INSERT_SELECT_QUERY)
# 正常系データの件数を取得
record_count = db.execute_select(NORMAL_COUNT_QUERY)
suc_count = record_count[0]['countNum']
# 警告系データの反映
# 過去分は不要のため、DWH.WK_INST_ARIS_IF_WRNをデリートする。
db.execute(PHYSICAL_ABNORMAL_DELETE_QUERY)
# 異常系データを取得しWKテーブルに保存する。
db.execute(ABNORMAL_INSERT_SELECT_QUERY)
# 異常系データの件数を取得
record_count = db.execute_select(ABNORMAL_COUNT_QUERY)
wrn_count = record_count[0]['countNum']
# CSVファイルの作成用のSQL実行
record_csv = db.execute_select(SELECT_QUERY)
# CSVファイル作成
csv_file_path = make_csv_data(record_csv, resLog_f)
# トランザクションの終了
db.commit()
# 実行ログファイルの追記
# 実行ログに処理件数を書き込む。
sum_count = suc_count + wrn_count
resLog_f.write(f'{create_date_format}[DWH][3][INFO]{prg_id} {cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})\n')
logger.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})')
# CSVファイル移動処理
s3_csv_upload_data(csv_file_path, resLog_f)
# 実行ログファイルクローズ
resLog_f.close()
# logファイル移動処理
s3_log_upload_data(log_file_path)
logger.info('実消化&アルトマーク月次バッチ処理: 終了')
except Exception as e:
logger.info(f'{create_date_format}[DWH][5][INFO]{e.message}')
raise BatchOperationException(e)
finally:
# 終了時に必ずコミットする
db.commit()
db.disconnect()
def make_csv_data(record_csv: list, resLog_f):
# 一時ファイルとして保存する(CSVファイル)
try:
temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, aris_create_csv)
# ヘッダ行書き込み
fp = open(csv_file_path, mode='w')
fp.write(f'{head_str}\n')
# データ部分書き込み
for record_data in record_csv:
record_value = list(record_data.values())
record_value = ['' if n is None else n for n in record_value]
csv_data = ",".join(map(str, record_value))
fp.write(f'{csv_data}\n')
# ファイルクローズ
fp.close()
except Exception as e:
resLog_f.write(f'{create_date_format}[DWH][5][INFO]{prg_id} {csv_err_msg}\n')
resLog_f.write(f'{create_date_format}[DWH][5][INFO]{prg_id} {err_end_msg}\n')
logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {csv_err_msg}')
logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {err_end_msg}')
raise e
return csv_file_path
def make_log_data():
# 一時ファイルとして保存する(ログファイル)
temporary_dir = tempfile.mkdtemp()
log_file_path = path.join(temporary_dir, res_log)
fp = open(log_file_path, mode='w')
return fp, log_file_path
def s3_csv_upload_data(csv_file_path, resLog_f):
# s3にログファイルとCSVファイルをUPする
Bucket = os.environ['ARISJ_DATA_BUCKET']
folder = os.environ['ARISJ_DATA_FOLDER']
csv_file_name = f'{folder}/{aris_create_csv}'
s3_client = boto3.client('s3')
try:
s3_client.upload_file(csv_file_path, Bucket, csv_file_name)
except Exception as e:
resLog_f.write(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}\n')
logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}')
raise e
return
def s3_log_upload_data(log_file_path):
# s3にログファイルとCSVファイルをUPする
Bucket = os.environ['ARISJ_DATA_BUCKET']
folder = os.environ['ARISJ_DATA_FOLDER']
log_file_name = f'{folder}/{res_log}'
s3_client = boto3.client('s3')
try:
s3_client.upload_file(log_file_path, Bucket, log_file_name)
except Exception as e:
logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}')
raise e
return

View File

@ -0,0 +1,361 @@
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.aws.s3 import S3Client
from src.logging.get_logger import get_logger
import tempfile
import os
import os.path as path
import logging
import csv
# Logger for the monthly batch (JSK consumption & Ultmarc).
logger = get_logger('実消化&アルトマーク月次バッチ')
# Human-readable timestamp used as the prefix of every log line.
create_date_format = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Program id embedded in every log message.
prg_id = 'PrgId:BI0402'
# Compact timestamp embedded in the output file names.
# NOTE(review): datetime.now() is called twice at import time, so
# create_date_format and create_date can straddle a second boundary and
# disagree — confirm this is acceptable.
create_date = datetime.now().strftime('%Y%m%d%H%M%S')
# Names of the CSV extract and of the run-log file uploaded to S3.
aris_create_csv = f'D0004_ARIS_M_DCF_{create_date}.csv'
res_log = f'D0004{create_date}.log'
# Fixed message bodies for SQL failures and S3-transfer failures.
sql_err_msg = "MsgID:999999000002 Message:SQL実行エラーです。"
move_err_msg = "MsgID:BI0000000041 Message:S3バケットARISへのCSVデータ、実行ログ移動できませんでした。"
def exec():
    """Monthly batch for JSK consumption & Ultmarc (実消化&アルトマーク月次バッチ).

    Flow: write a start line to the run log, refresh the normal/warning work
    tables inside one transaction, extract the CSV file, then upload the CSV
    and the run log to S3.

    Raises:
        BatchOperationException: wraps any error raised during the batch.
    """
    # Initialize up-front so the finally block cannot hit a NameError when an
    # early step (e.g. make_log_data or Database.get_instance) fails.
    db = None
    resLog = None
    try:
        start_msg = "MsgID:BI0000000001 Message:バッチ処理を開始しました。"
        cnt_msg = "MsgID: Message: LogText:"
        # Create the run log and record the start of the batch.
        resLog, log_file_path = make_log_data()
        resLog.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {start_msg}')
        logger.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {start_msg}')
        db = Database.get_instance()
        db.connect()
        db.begin()
        # Normal data: drop the previous load, re-insert, then count.
        physical_normal_delete(db)
        normal_insert_into(db)
        suc_count = normal_count(db)
        # Warning data: drop the previous load of wk_inst_aris_if_wrn,
        # re-insert, then count.
        physical_abnormal_delete(db)
        abnormal_insert_into(db)
        wrn_count = abnormal_count(db)
        # Extract the rows for the CSV and write the file to a temp path.
        record_csv = csv_data_select(db)
        csv_file_path = make_csv_data(record_csv, resLog)
        db.commit()
        # Append the processed-record counts to the run log.
        sum_count = suc_count + wrn_count
        resLog.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})')
        logger.info(f'{create_date_format}[DWH][3][INFO]{prg_id} {cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})')
        # Upload the CSV first, then flush/close the run log BEFORE uploading
        # it — otherwise buffered log lines are missing from the uploaded file.
        s3_csv_upload_data(csv_file_path, resLog)
        _close_log_handlers(resLog)
        s3_log_upload_data(log_file_path)
        logger.info('実消化&アルトマーク月次バッチ処理: 終了')
    except Exception as e:
        # Include the exception so the failure cause is not lost.
        logger.info(f'{create_date_format}[DWH][5][INFO]{e}')
        raise BatchOperationException(e)
    finally:
        if resLog is not None:
            _close_log_handlers(resLog)
        # Always commit and disconnect on the way out (original contract).
        if db is not None:
            db.commit()
            db.disconnect()


def _close_log_handlers(res_logger):
    """Flush, close and detach every handler attached to the run-log logger.

    Safe to call more than once: detached handlers are not revisited.
    """
    for handler in list(res_logger.handlers):
        handler.close()
        res_logger.removeHandler(handler)
def physical_normal_delete(db):
    """Clear every previous row from the normal-data work table.

    Args:
        db: database wrapper exposing execute().

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    # Full wipe of src05.wk_inst_aris_if before this run's reload.
    delete_sql = """\
    DELETE FROM src05.wk_inst_aris_if
    """
    try:
        db.execute(delete_sql)
    except Exception as err:
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise err
def normal_insert_into(db):
    """Load the "normal" (fully populated) institution rows into the work table.

    Selects from src05.com_inst joined to the prefecture/city/division
    masters, trims and truncates each column, and inserts the result into
    src05.wk_inst_aris_if.  Rows with any required column NULL are excluded
    (those go to the warning table instead) and dummy institution codes
    ('%9999999%') are skipped.

    Args:
        db: database wrapper exposing execute().

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    try:
        # INSERT ... SELECT that builds the normal-data work table.
        sql = """\
        INSERT src05.wk_inst_aris_if
        SELECT
        TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
        ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
        ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
        ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
        ,TRIM(' ' FROM TRIM(' ' FROM ci.prefc_cd)) AS pref_cd
        ,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(cp.prefc_name,1,8))) AS pref_name
        ,TRIM(' ' FROM TRIM(' ' FROM ci.postal_number)) AS postal_cd
        ,TRIM(' ' FROM TRIM(' ' FROM cc.city_name)) AS city_name
        ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_addr)) AS address
        ,TRIM(' ' FROM TRIM(' ' FROM cd.inst_div_name))
        ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_phone_number)) AS phone_no
        ,TRIM(' ' FROM TRIM(' ' FROM ci.inst_div_cd))
        ,TRIM(' ' FROM TRIM(' ' FROM ci.manage_cd))
        ,DATE_FORMAT(ci.sys_update_date,'%Y%m%d') AS update_date
        ,DATE_FORMAT(ci.abolish_ymd,'%Y%m%d') AS delete_date
        ,sysdate()
        FROM src05.com_inst ci
        LEFT JOIN src05.mst_prefc cp
        ON ci.prefc_cd = cp.prefc_cd
        LEFT JOIN src05.mst_city cc
        ON ci.prefc_cd = cc.prefc_cd
        AND ci.city_cd = cc.city_cd
        LEFT OUTER JOIN src05.com_inst_div cd
        ON ci.inst_div_cd = cd.inst_div_cd
        WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
        AND ci.dcf_dsf_inst_cd IS NOT NULL
        AND ci.form_inst_name_kanji IS NOT NULL
        AND ci.prefc_cd IS NOT NULL
        AND cp.prefc_name IS NOT NULL
        AND cc.city_name IS NOT NULL
        AND ci.inst_addr IS NOT NULL
        ORDER BY ci.dcf_dsf_inst_cd
        """
        db.execute(sql)
        return
    except Exception as e:
        # NOTE(review): logged at DEBUG only — presumably the caller reports
        # the failure at a higher level; confirm this is intentional.
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise e
def normal_count(db):
    """Return the number of rows now in the normal-data work table.

    Args:
        db: database wrapper exposing execute_select().

    Returns:
        The COUNT(*) value for src05.wk_inst_aris_if.

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    count_sql = """\
    SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if
    """
    try:
        result_rows = db.execute_select(count_sql)
        # The aggregate query always yields exactly one row.
        return result_rows[0]['countNum']
    except Exception as err:
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise err
def physical_abnormal_delete(db):
    """Clear every previous row from the warning-data work table.

    Args:
        db: database wrapper exposing execute().

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    # Full wipe of src05.wk_inst_aris_if_wrn before this run's reload.
    delete_sql = """\
    DELETE FROM src05.wk_inst_aris_if_wrn
    """
    try:
        db.execute(delete_sql)
    except Exception as err:
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise err
def abnormal_insert_into(db):
    """Load the "warning" (partially populated) institution rows into the work table.

    Same source join as normal_insert_into, but selects only rows where at
    least one required column is NULL, and tags each missing column with a
    warning id (bi0402000001..bi0402000006) in src05.wk_inst_aris_if_wrn.

    Args:
        db: database wrapper exposing execute().

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    try:
        # INSERT ... SELECT that builds the warning-data work table.
        sql = """\
        INSERT src05.wk_inst_aris_if_wrn
        SELECT
        TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
        ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
        ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
        ,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
        ,TRIM(' ' FROM TRIM(' ' from ci.prefc_cd)) AS pref_cd
        ,TRIM(' ' FROM TRIM(' ' from SUBSTR(cp.prefc_name,1,8))) AS pref_name
        ,TRIM(' ' FROM TRIM(' ' from ci.postal_number)) AS postal_cd
        ,TRIM(' ' FROM TRIM(' ' from cc.city_name)) AS city_name
        ,TRIM(' ' FROM TRIM(' ' from ci.inst_addr)) AS address
        ,TRIM(' ' FROM TRIM(' ' from cd.inst_div_name))
        ,TRIM(' ' FROM TRIM(' ' from ci.inst_phone_number)) AS phone_no
        ,TRIM(' ' FROM TRIM(' ' from ci.inst_div_cd))
        ,TRIM(' ' FROM TRIM(' ' from ci.manage_cd))
        ,DATE_FORMAT(ci.sys_update_date,'%Y%m%d') AS update_date
        ,DATE_FORMAT(ci.abolish_ymd,'%Y%m%d') AS delete_date
        ,IF(ci.dcf_dsf_inst_cd IS NULL,'bi0402000001', NULL) AS wrnid_dcf_inst_cd
        ,IF(ci.form_inst_name_kanji IS NULL,'bi0402000002', NULL) AS wrnid_inst_name_form
        ,IF(ci.prefc_cd IS NULL,'bi0402000003', NULL) AS wrnid_pref_cd
        ,IF(cp.prefc_name IS NULL,'bi0402000004', NULL) AS wrnid_pref_name
        ,IF(cc.city_name IS NULL,'bi0402000005', NULL) AS wrnid_city_name
        ,IF(ci.inst_addr IS NULL,'bi0402000006', NULL) AS wrnid_address
        ,sysdate()
        FROM src05.com_inst ci
        LEFT JOIN src05.mst_prefc cp
        ON ci.prefc_cd = cp.prefc_cd
        LEFT JOIN src05.mst_city cc
        ON ci.prefc_cd = cc.prefc_cd
        AND ci.city_cd = cc.city_cd
        LEFT OUTER JOIN src05.com_inst_div cd
        ON ci.inst_div_cd = cd.inst_div_cd
        WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
        AND( ci.dcf_dsf_inst_cd IS NULL
        OR ci.form_inst_name_kanji IS NULL
        OR ci.prefc_cd IS NULL
        OR cp.prefc_name IS NULL
        OR cc.city_name IS NULL
        OR ci.inst_addr IS NULL)
        ORDER BY ci.dcf_dsf_inst_cd
        """
        db.execute(sql)
        return
    except Exception as e:
        # NOTE(review): logged at DEBUG only — presumably the caller reports
        # the failure at a higher level; confirm this is intentional.
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise e
def abnormal_count(db):
    """Return the number of rows now in the warning-data work table.

    Args:
        db: database wrapper exposing execute_select().

    Returns:
        The COUNT(*) value for src05.wk_inst_aris_if_wrn.

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    count_sql = """\
    SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if_wrn
    """
    try:
        result_rows = db.execute_select(count_sql)
        # The aggregate query always yields exactly one row.
        return result_rows[0]['countNum']
    except Exception as err:
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise err
def csv_data_select(db):
    """Fetch the work-table rows that make up the CSV body.

    Args:
        db: database wrapper exposing execute_select().

    Returns:
        Result rows of the extract query, ordered by dcf_inst_cd.

    Raises:
        Exception: re-raised after a debug log when the SQL fails.
    """
    select_sql = """\
    SELECT dcf_inst_cd, inst_name_form, inst_name, inst_name_kana_form, pref_cd, pref_name,
    postal_cd, city_name, address, inst_div_name, phone_no, inst_div_cd, manage_cd,
    '', inst_delete_date
    FROM src05.wk_inst_aris_if ORDER BY dcf_inst_cd
    """
    try:
        return db.execute_select(select_sql)
    except Exception as err:
        logger.debug(f'{create_date_format}:{prg_id} {sql_err_msg}')
        raise err
def make_csv_data(record_csv: list, resLog):
    """Write the extracted rows to a temporary CSV file and return its path.

    The file is written in Shift-JIS (cp932), with CRLF line endings and every
    field enclosed in double quotes, as the interface comment requires.

    Args:
        record_csv: list of dict rows from csv_data_select().
        resLog: run-log logger; receives error messages on failure.

    Returns:
        str: path of the CSV file created under a fresh temp directory.

    Raises:
        Exception: re-raised after logging when the file cannot be written.
    """
    err_end_msg = "MsgID:BI0000009998 Message:バッチ処理を異常終了しました。"
    csv_err_msg = "MsgID:BI0000000040 Message:ワークデータの作成に失敗しました。"
    try:
        temporary_dir = tempfile.mkdtemp()
        csv_file_path = path.join(temporary_dir, aris_create_csv)
        head_str = ['TC_HOSPITAL', 'TJ_HOSPITAL', 'TJ_HOSPITALSHORT', 'TK_HOSPITAL',
                    'TC_PREFECTURE', 'TJ_PREFECTURE', 'TJ_ZIPCODE', 'TJ_CITY', 'TJ_ADDRESS', 'TJ_DEPARTMENT',
                    'TJ_TELEPHONENUMBER', 'TC_HOSPITALCAT', 'TC_HOSPITALTYPE', 'TS_UPDATE', ' TD_UPDATE']
        # Shift-JIS, CRLF, all fields quoted.  newline='' is required so the
        # csv writer's lineterminator is emitted verbatim; otherwise the text
        # layer translates '\n' per platform and CRLF is not guaranteed.
        with open(csv_file_path, mode='w', encoding='cp932', newline='') as csv_file:
            writer = csv.writer(csv_file, delimiter=',', lineterminator='\r\n',
                                quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
                                strict=True
                                )
            # Header row.
            writer.writerow(head_str)
            # Data rows: NULLs become empty fields.
            for record_data in record_csv:
                record_value = list(record_data.values())
                csv_data = ['' if n is None else n for n in record_value]
                writer.writerow(csv_data)
    except Exception as e:
        resLog.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {csv_err_msg}')
        resLog.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {err_end_msg}')
        logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {csv_err_msg}')
        logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {err_end_msg}')
        raise e
    return csv_file_path
def make_log_data():
    """Create the run-log file in a fresh temp dir and return (logger, path).

    Returns:
        tuple: (logging.Logger writing plain messages to the file,
        absolute path of that log file).
    """
    temporary_dir = tempfile.mkdtemp()
    log_file_path = path.join(temporary_dir, res_log)
    # logging.getLogger returns a process-wide singleton: drop any handlers
    # left over from a previous call, otherwise each run duplicates its lines
    # into the stale files of earlier runs.
    resLog = logging.getLogger('resLog')
    resLog.setLevel(logging.INFO)
    for old_handler in list(resLog.handlers):
        old_handler.close()
        resLog.removeHandler(old_handler)
    # File handler emitting the bare message (no logging metadata).
    resLog_handler = logging.FileHandler(log_file_path)
    resLog_handler.setFormatter(logging.Formatter('%(message)s'))
    resLog.addHandler(resLog_handler)
    return resLog, log_file_path
def s3_csv_upload_data(csv_file_path, resLog):
    """Upload the generated CSV file to the ARIS S3 bucket.

    Args:
        csv_file_path: local path of the CSV created by make_csv_data().
        resLog: run-log logger; receives the error message on failure.

    Raises:
        Exception: re-raised after logging when the upload fails.
    """
    bucket_name = os.environ['ARISJ_DATA_BUCKET']
    folder = os.environ['ARISJ_DATA_FOLDER']
    object_key = f'{folder}/{aris_create_csv}'
    s3_client = S3Client()
    try:
        s3_client.upload_file(csv_file_path, bucket_name, object_key)
    except Exception as err:
        resLog.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}')
        logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}')
        raise err
def s3_log_upload_data(log_file_path):
    """Upload the run-log file to the ARIS S3 bucket.

    Args:
        log_file_path: local path of the log file created by make_log_data().

    Raises:
        Exception: re-raised after logging when the upload fails.
    """
    bucket_name = os.environ['ARISJ_DATA_BUCKET']
    folder = os.environ['ARISJ_DATA_FOLDER']
    object_key = f'{folder}/{res_log}'
    s3_client = S3Client()
    try:
        s3_client.upload_file(log_file_path, bucket_name, object_key)
    except Exception as err:
        logger.info(f'{create_date_format}[DWH][5][INFO]{prg_id} {move_err_msg}')
        raise err

View File

@ -1,32 +0,0 @@
"""並列処理"""
import concurrent.futures
from src.batch.bio_sales import create_bio_sales_lot
from src.batch.laundering import sales_laundering
from src.error.exceptions import BatchOperationException
def exec():
# 並列処理を開始
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# 実績更新
future_sales_laundering = executor.submit(sales_laundering.exec)
# 生物由来ロット分解
future_create_bio_sales_lot = executor.submit(create_bio_sales_lot.exec)
# 両方の処理が完了するまで待つ
concurrent.futures.wait([future_sales_laundering, future_create_bio_sales_lot])
# エラーがあれば呼び出し元でキャッチする
sales_laundering_exc = future_sales_laundering.exception()
create_bio_sales_lot_exc = future_create_bio_sales_lot.exception()
# いずれかにエラーが発生していれば、1つのエラーとして返す。
if sales_laundering_exc is not None or create_bio_sales_lot_exc is not None:
sales_laundering_exc_message = str(sales_laundering_exc) if sales_laundering_exc is not None else ''
create_bio_sales_lot_exc_message = str(create_bio_sales_lot_exc) if create_bio_sales_lot_exc is not None else ''
raise BatchOperationException(f'並列処理中にエラーが発生しました。実績更新="{sales_laundering_exc_message}", 生物由来ロット分解={create_bio_sales_lot_exc_message}')
return

View File

@ -1,15 +1,13 @@
"""実消化&アルトマーク 月次バッチ処理"""
from src.aws.s3 import ConfigBucket
from src.batch.batch_functions import (
get_batch_statuses, update_batch_process_complete,
update_batch_processing_flag_in_processing)
from src.batch.batch_functions import get_batch_statuses
from src.batch.common.batch_context import BatchContext
from src.batch.common.calendar_file import CalendarFile
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
from src.batch import jskult_batch_monthly
from src.batch import output_arisj_file_process
logger = get_logger('月次処理コントロール')
@ -22,64 +20,42 @@ def exec():
logger.info('月次バッチ:開始')
try:
# 月次バッチ処置中フラグ、dump処理状態区分、処理日を取得
batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses()
batch_processing_flag, syor_date = get_batch_statuses()
except BatchOperationException as e:
logger.exception(f'日付テーブル取得(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 次バッチ処理中の場合、後続の処理は行わない
# 次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('バッチ処理中のため、月次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、月次バッチ処理を終了します。')
logger.error('日次バッチ処理中のため、月次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(f'処理日={syor_date}')
# バッチ共通設定に処理日を追加
batch_context.syor_date = syor_date
# 稼働日かかどうかを、実消化&アルトマーク月次バッチ稼働日ファイルをダウンロードして判定
try:
arisj_output_day_list_file_path = ConfigBucket().download_arisj_output_day_list()
arisj_output_day_calendar = CalendarFile(arisj_output_day_list_file_path)
batch_context.is_not_business_monthly = arisj_output_day_calendar.compare_date(syor_date)
batch_context.is_arisj_output_day = arisj_output_day_calendar.compare_date(syor_date)
except Exception as e:
logger.exception(f'実消化&アルトマーク月次バッチ稼働日ファイルの読み込みに失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でV実消化稼働日かどうかをログ出力
logger.debug(f'本日は{"実消化&アルトマーク月次バッチ稼働日です。" if batch_context.is_not_business_monthly else "実消化&アルトマーク月次バッチ非稼働日です。"}')
# バッチ処理中に更新
try:
update_batch_processing_flag_in_processing()
except BatchOperationException as e:
logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}')
# 調査目的で実消化&アルトマーク月次バッチ稼働日かどうかをログ出力
if batch_context.is_arisj_output_day:
logger.info('本日は実消化&アルトマーク月次バッチ稼働日です。')
else:
logger.info('月次バッチは行われませんでした。')
return constants.BATCH_EXIT_CODE_SUCCESS
try:
logger.info('月次バッチ:起動')
jskult_batch_monthly.exec()
output_arisj_file_process.exec()
logger.info('月次バッチ:終了')
except BatchOperationException as e:
logger.exception(f'月次バッチ処理エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的で月次バッチが行われたかどうかをログ出力
logger.debug(f'{"月次バッチが行われました。" if batch_context.is_not_business_monthly else "月次バッチが行われませんでした。"}')
# バッチ処理完了とし、処理日、バッチ処置中フラグ、dump取得状態区分を更新
logger.info('業務日付更新・バッチステータスリフレッシュ:起動')
try:
update_batch_process_complete()
except BatchOperationException as e:
logger.exception(f'業務日付更新・バッチステータスリフレッシュ エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info('業務日付更新・バッチステータスリフレッシュ:終了')
# 正常終了を保守ユーザーに通知
logger.info('[NOTICE]月次バッチ:終了(正常終了)')
return constants.BATCH_EXIT_CODE_SUCCESS

View File

@ -1,17 +1,8 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0
# バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
# dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了
DUMP_STATUS_KBN_COMPLETE = '2'
# カレンダーファイルのコメントシンボル
CALENDAR_COMMENT_SYMBOL = '#'
# 月曜日(datetime.weekday()で月曜日を表す数字)
WEEKDAY_MONDAY = 0

View File

@ -8,14 +8,14 @@ DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET']
ULTMARC_DATA_FOLDER = os.environ['ULTMARC_DATA_FOLDER']
ARISJ_DATA_BUCKET = os.environ['ARISJ_DATA_BUCKET']
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER']
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
ARISJ_DATA_FOLDER = os.environ['ARISJ_DATA_FOLDER']
ARISJ_BACKUP_FOLDER = os.environ['ARISJ_BACKUP_FOLDER']
JSKULT_CONFIG_CALENDAR_FOLDER = os.environ['JSKULT_CONFIG_CALENDAR_FOLDER']
JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME = os.environ['JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME']
JSKULT_CONFIG_CALENDAR_ARISJ_OUTPUT_DAY_LIST_FILE_NAME = os.environ['JSKULT_CONFIG_CALENDAR_ARISJ_OUTPUT_DAY_LIST_FILE_NAME']
ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')