作業中
This commit is contained in:
parent
7f82cc9e48
commit
626ee291c3
236
ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py
Normal file
236
ecs/jskult-batch-monthly/src/batch/jskult_batch_monthly.py
Normal file
@ -0,0 +1,236 @@
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
# from src.aws.s3 import UltmarcBucket
|
||||
# from src.batch.common.batch_context import BatchContext
|
||||
from src.db.database import Database
|
||||
from src.error.exceptions import BatchOperationException
|
||||
from src.logging.get_logger import get_logger
|
||||
# from src.system_var import constants
|
||||
import pathlib
|
||||
import os
|
||||
|
||||
logger = get_logger('実消化&アルトマーク月次バッチ')
|
||||
|
||||
|
||||
class JskultBathcMonthly():
|
||||
""" 実消化&アルトマーク月次バッチ """
|
||||
|
||||
# WKテーブルの過去分削除SQL
|
||||
PHYSICAL_NORMAL_DELETE_QUERY = """\
|
||||
DELETE FROM src05.wk_inst_aris_if
|
||||
"""
|
||||
|
||||
# 正常系データを取得しWKテーブルに保存SQL
|
||||
NORMAL_INSERT_SELECT_QUERY = """\
|
||||
INSERT src05.wk_inst_aris_if
|
||||
SELECT
|
||||
TRIM(' ' FROM TRIM(' ' FROM SUBSTRIN(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
|
||||
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
|
||||
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
|
||||
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.prefc_cd)) AS pref_cd
|
||||
,TRIM(' ' FROM TRIM(' ' FROM SUBSTR(cp.prefc_name,1,8))) AS pref_name
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.postal_number)) AS postal_cd
|
||||
,TRIM(' ' FROM TRIM(' ' FROM cc.city_name)) AS city_name
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_addr)) AS address
|
||||
,TRIM(' ' FROM TRIM(' ' FROM cd.inst_div_name))
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_phone_number)) AS phone_no
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.inst_div_cd))
|
||||
,TRIM(' ' FROM TRIM(' ' FROM ci.manage_cd))
|
||||
,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date
|
||||
,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date
|
||||
,sysdate()
|
||||
FROM src05.com_inst ci
|
||||
LEFT JOIN src05.mst_prefc cp
|
||||
ON ci.prefc_cd = cp.prefc_cd
|
||||
LEFT JOIN src05.mst_city cc
|
||||
ON ci.prefc_cd = cc.prefc_cd
|
||||
AND ci.city_cd = cc.city_cd
|
||||
LEFT OUTER JOIN src05.JOIN com_inst_div cd
|
||||
ON ci.inst_div_cd = cd.inst_div_cd
|
||||
WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
|
||||
AND ci.dcf_dsf_inst_cd IS NOT NULL
|
||||
AND ci.form_inst_name_kanji IS NOT NULL
|
||||
AND ci.prefc_cd IS NOT NULL
|
||||
AND cp.prefc_name IS NOT NULL
|
||||
AND cc.city_name IS NOT NULL
|
||||
AND ci.inst_addr IS NOT NULL
|
||||
ORDER BY ci.dcf_dsf_inst_cd
|
||||
"""
|
||||
|
||||
# 正常系データの件数を取得SQL
|
||||
NORMAL_COUNT_QUERY = """\
|
||||
SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if
|
||||
"""
|
||||
|
||||
# 異常系WKテーブルの過去分削除SQL
|
||||
PHYSICAL_ABNORMAL_DELETE_QUERY = """\
|
||||
DELETE FROM src05.wk_inst_aris_if_wrn
|
||||
"""
|
||||
|
||||
# 異常系データを取得しWKテーブルに保存SQL
|
||||
ABNORMAL_INSERT_SELECT_QUERY = """\
|
||||
INSERT src05.wk_inst_aris_if_wrn
|
||||
SELECT
|
||||
TRIM(' ' FROM TRIM(' ' FROM SUBSTRING(ci.dcf_dsf_inst_cd,3))) AS dcf_inst_cd
|
||||
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kanji,1,50))) AS inst_name_form
|
||||
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.inst_name_kanji,1,10))) AS inst_name
|
||||
,TRIM(' ' FROM TRIM(' ' from SUBSTR(ci.form_inst_name_kana,1,80))) AS inst_name_kana_form
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.prefc_cd)) AS pref_cd
|
||||
,TRIM(' ' FROM TRIM(' ' from SUBSTR(cp.prefc_name,1,8))) AS pref_name
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.postal_number)) AS postal_cd
|
||||
,TRIM(' ' FROM TRIM(' ' from cc.city_name)) AS city_name
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.inst_addr)) AS address
|
||||
,TRIM(' ' FROM TRIM(' ' from cd.inst_div_name))
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.inst_phone_number)) AS phone_no
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.inst_div_cd))
|
||||
,TRIM(' ' FROM TRIM(' ' from ci.manage_cd))
|
||||
,DATE_FORMAT(ci.sys_update_date,'%y%m%d') AS update_date
|
||||
,DATE_FORMAT(ci.abolish_ymd,'%y%m%d') AS delete_date
|
||||
,IF(ci.dcf_dsf_inst_cd IS NULL,'bi0402000001', NULL) AS wrnid_dcf_inst_cd
|
||||
,IF(ci.form_inst_name_kanji IS NULL,'bi0402000002', NULL) AS wrnid_inst_name_form
|
||||
,IF(ci.prefc_cd IS NULL,'bi0402000003', NULL) AS wrnid_pref_cd
|
||||
,IF(cp.prefc_name IS NULL,'bi0402000004', NULL) AS wrnid_pref_name
|
||||
,IF(cc.city_name IS NULL,'bi0402000005', NULL) AS wrnid_city_name
|
||||
,IF(ci.inst_addr IS NULL,'bi0402000006', NULL) AS wrnid_address
|
||||
,sysdate()
|
||||
FROM src05.com_inst ci
|
||||
LEFT JOIN src05.mst_prefc cp
|
||||
ON ci.prefc_cd = cp.prefc_cd
|
||||
LEFT JOIN src05.mst_city cc
|
||||
ON ci.prefc_cd = cc.prefc_cd
|
||||
AND ci.city_cd = cc.city_cd
|
||||
LEFT OUTER JOIN src05.com_inst_div cd
|
||||
ON ci.inst_div_cd = cd.inst_div_cd
|
||||
WHERE ci.dcf_dsf_inst_cd NOT LIKE '%9999999%'
|
||||
AND( ci.dcf_dsf_inst_cd IS NULL
|
||||
OR ci.form_inst_name_kanji IS NULL
|
||||
OR ci.prefc_cd IS NULL
|
||||
OR cp.prefc_name IS NULL
|
||||
OR cc.city_name IS NULL
|
||||
OR ci.inst_addr IS NULL)
|
||||
ORDER BY ci.dcf_dsf_inst_cd
|
||||
"""
|
||||
|
||||
# 正常系データの件数を取得SQL
|
||||
ABNORMAL_COUNT_QUERY = """\
|
||||
SELECT COUNT(*) AS countNum FROM src05.wk_inst_aris_if_wrn
|
||||
"""
|
||||
# CSVファイルの作成用のSQL
|
||||
SELECT_QUERY = """\
|
||||
SELECT dcf_inst_cd, inst_name_form, inst_name, inst_name_kana_form, pref_cd, pref_name,
|
||||
postal_cd, city_name, address, inst_div_name, phone_no, inst_div_cd, manage_cd,
|
||||
'', inst_delete_date
|
||||
FROM src05.wk_inst_aris_if ORDER BY dcf_inst_cd
|
||||
"""
|
||||
|
||||
aris_log = '/var/log/temporarydwh/'
|
||||
move_file_path = '/data/mountaris/DATA/'
|
||||
create_date = datetime.now().strftime('%Y%m%d%H%M%S')
|
||||
create_date_format = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
aris_create_csv = f'/home/nds_dwh/tmpcsv/D0004_ARIS_M_DCF_{create_date}csv'
|
||||
aris_move_csv = f'{move_file_path}D0004_ARIS_M_DCF_{create_date}.csv'
|
||||
res_log = f'{aris_log}D0004{create_date}.log'
|
||||
move_res_og = f'{move_file_path}D0004{create_date}log'
|
||||
prg_id = 'PrgId:BI0402'
|
||||
head_str = 'TC_HOSPITAL, TJ_HOSPITAL, TJ_HOSPITALSHORT, TK_HOSPITAL, \
|
||||
TC_PREFECTURE, TJ_PREFECTURE, TJ_ZIPCODE, TJ_CITY, TJ_ADDRESS, TJ_DEPARTMENT, \
|
||||
TJ_TELEPHONENUMBER, TC_HOSPITALCAT, TC_HOSPITALTYPE, TS_UPDATE, TD_UPDATE'
|
||||
|
||||
start_msg = "MsgID:BI0000000001 Message:バッチ処理を開始しました。\n"
|
||||
dbConnect_err_msg = "MsgID:999999000002 Message:DB接続エラーです。\n"
|
||||
err_end_msg = "MsgID:BI0000009998 Message:バッチ処理を異常終了しました。\n"
|
||||
move_err_msg = "MsgID:BI0000000041 Message:S3バケットARISへのCSVデータ、実行ログ移動できませんでした。\n"
|
||||
sql_err_msg = "MsgID:999999000002 Message:SQL実行エラーです。\n"
|
||||
csv_err_msg = "MsgID:BI0000000040 Message:ワークデータの作成に失敗しました。\n"
|
||||
cnt_msg = "MsgID: Message: LogText:"
|
||||
suc_end_msg = "MsgID:BI0000009999 Message:バッチ処理を正常に終了しました。\n"
|
||||
|
||||
def exec_batch_monthly(self):
|
||||
""" 実消化&アルトマーク月次バッチ """
|
||||
try:
|
||||
# 実行ログに書き込む
|
||||
res_log_p = pathlib.Path(self.res_log)
|
||||
res_log_p.touch()
|
||||
os.chmod(self.res_log, '0664')
|
||||
|
||||
# 実行ログ
|
||||
resLog_f = open(self.res_log)
|
||||
print(f'{self.create_date_format}[DWH][3][INFO]{self.prg_id} {self.start_msg}')
|
||||
|
||||
db = Database.get_instance()
|
||||
# DB接続
|
||||
db.connect()
|
||||
# トランザクションの開始
|
||||
db.begin()
|
||||
|
||||
# 正常系データの反映
|
||||
# 過去分は不要のため、デリート
|
||||
db.execute(self.PHYSICAL_NORMAL_DELETE_QUERY)
|
||||
|
||||
# 正常系データを取得しWKテーブルに保存する。
|
||||
db.execute(self.NORMAL_INSERT_SELECT_QUERY)
|
||||
|
||||
# 正常系データの件数を取得
|
||||
record_count = db.execute_select(self.NORMAL_COUNT_QUERY)
|
||||
suc_count = record_count[0]['countNum']
|
||||
|
||||
# 警告系データの反映
|
||||
# 過去分は不要のため、DWH.WK_INST_ARIS_IF_WRNをデリートする。
|
||||
db.execute(self.PHYSICAL_ABNORMAL_DELETE_QUERY)
|
||||
|
||||
# 異常系データを取得しWKテーブルに保存する。
|
||||
db.execute(self.ABNORMAL_INSERT_SELECT_QUERY)
|
||||
|
||||
# 異常系データの件数を取得
|
||||
record_count = db.execute_select(self.ABNORMAL_COUNT_QUERY)
|
||||
wrn_count = record_count[0]['countNum']
|
||||
|
||||
# CSVファイルの作成用のSQL実行
|
||||
record_csv = db.execute_select(self.SELECT_QUERY)
|
||||
|
||||
# CSVファイル作成
|
||||
arisCreateCsv_p = pathlib.Path(self.arisCreateCsv)
|
||||
arisCreateCsv_p.touch()
|
||||
if not os.path.exists(self.arisCreateCsv):
|
||||
print(f'{self.create_date_format}[DWH][5][ERROR]{self.prg_id} {self.csv_err_msg}')
|
||||
print(f'{self.create_date_format}[DWH][5][ERROR]{self.prg_id} {self.err_end_msg}')
|
||||
|
||||
# ヘッダ行書き込み
|
||||
resLog_f = open(self.aris_create_csv)
|
||||
print(f'{self.head_str}\r\n')
|
||||
|
||||
# データ部分書き込み
|
||||
for record_data in record_csv:
|
||||
csv_data = ",".join(record_data).encode('shift_jis')
|
||||
print(f'{csv_data}\r\n')
|
||||
|
||||
logger.info('use memory--->')
|
||||
logger.info('memory_get_usage 与えられたメモリの量')
|
||||
logger.info('\n')
|
||||
logger.info('max memory--->')
|
||||
logger.info('memory_get_peak_usage メモリの最大値')
|
||||
logger.info('\n')
|
||||
|
||||
resLog_f.close()
|
||||
|
||||
# トランザクションの終了
|
||||
db.commit()
|
||||
|
||||
# 実行ログファイルの追記
|
||||
# 実行ログに処理件数を書き込む。
|
||||
sum_count = suc_count + wrn_count
|
||||
print(f'{self.create_date_format}[DWH][3][INFO]{self.prg_id} {self.cnt_msg}(対象件数:{sum_count}/正常件数:{suc_count}/警告件数:{wrn_count})\n')
|
||||
|
||||
# ファイル移動処理
|
||||
|
||||
logger.info('実消化&アルトマーク月次バッチ処理: 終了')
|
||||
except Exception as e:
|
||||
raise BatchOperationException(e)
|
||||
|
||||
finally:
|
||||
# 終了時に必ずコミットする
|
||||
db.commit()
|
||||
db.disconnect()
|
||||
return
|
||||
@ -9,6 +9,7 @@ from src.batch.common.calendar_file import CalendarFile
|
||||
from src.error.exceptions import BatchOperationException
|
||||
from src.logging.get_logger import get_logger
|
||||
from src.system_var import constants
|
||||
from src.batch.jskult_batch_monthly import JskultBathcMonthly
|
||||
|
||||
logger = get_logger('月次処理コントロール')
|
||||
|
||||
@ -34,7 +35,7 @@ def exec():
|
||||
# dump取得が正常終了していない場合、後続の処理は行わない
|
||||
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
|
||||
logger.error('dump取得が正常終了していないため、月次バッチ処理を終了します。')
|
||||
return constants.BATCH_EXIT_CODE_SUCCESS
|
||||
# 戻すんだよ return constants.BATCH_EXIT_CODE_SUCCESS
|
||||
|
||||
logger.info(f'処理日={syor_date}')
|
||||
# バッチ共通設定に処理日を追加
|
||||
@ -61,7 +62,7 @@ def exec():
|
||||
|
||||
try:
|
||||
logger.info('月次バッチ:起動')
|
||||
# ultmarc_process.exec_import()
|
||||
JskultBathcMonthly.exec_batch_monthly()
|
||||
logger.info('月次バッチ:終了')
|
||||
except BatchOperationException as e:
|
||||
logger.exception(f'月次バッチ処理エラー(異常終了){e}')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user