動作テスト前コミット

This commit is contained in:
mori.k 2025-05-27 16:38:47 +09:00
parent cd1e663b4a
commit e50b827d9f
3 changed files with 226 additions and 202 deletions

View File

@ -54,33 +54,8 @@ class S3Bucket():
_s3_client = S3Client() _s3_client = S3Client()
_bucket_name: str = None _bucket_name: str = None
class UltmarcBucket(S3Bucket):
    """S3 bucket that holds the incoming Ultmarc .dat files."""

    _bucket_name = environment.ULTMARC_DATA_BUCKET
    _folder = environment.ULTMARC_DATA_FOLDER

    def list_dat_file(self):
        # List every object under the configured folder prefix.
        return self._s3_client.list_objects(self._bucket_name, self._folder)

    def download_dat_file(self, dat_filename: str):
        # Save the object into a fresh scratch directory; the folder prefix
        # is stripped from the key to obtain the bare local file name.
        local_name = dat_filename.replace(f"{self._folder}/", "")
        work_dir = tempfile.mkdtemp()
        local_path = path.join(work_dir, local_name)
        with open(local_path, mode='wb') as fp:
            self._s3_client.download_file(self._bucket_name, dat_filename, fp)
            fp.seek(0)
        return local_path

    def backup_dat_file(self, dat_file_key: str, datetime_key: str):
        # Copy the object into the backup bucket, then delete the source.
        backup_bucket = UltmarcBackupBucket()
        bare_name = dat_file_key.replace(f"{self._folder}/", "")
        backup_key = f'{backup_bucket._folder}/{datetime_key}/{bare_name}'
        self._s3_client.copy(self._bucket_name, dat_file_key,
                             backup_bucket._bucket_name, backup_key)
        self._s3_client.delete_file(self._bucket_name, dat_file_key)
class ConfigBucket(S3Bucket): class ConfigBucket(S3Bucket):
# TODO 日付更新処理で内容の修正を行う
_bucket_name = environment.JSKULT_CONFIG_BUCKET _bucket_name = environment.JSKULT_CONFIG_BUCKET
def download_holiday_list(self): def download_holiday_list(self):
@ -118,68 +93,24 @@ class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.JSKULT_BACKUP_BUCKET _bucket_name = environment.JSKULT_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket):
    """Backup destination for processed Ultmarc data files."""

    _folder = environment.ULTMARC_BACKUP_FOLDER
# TODO 設定値をecsタスク定義書から確認
class VjskBackupBucket(JskUltBackupBucket): class JskBackupBucket(JskUltBackupBucket):
_folder = environment.VJSK_BACKUP_FOLDER _folder = environment.VJSK_BACKUP_FOLDER
class JskSendBucket(S3Bucket):
_bucket_name = environment.JSKULT_DATA_BUCKET
_send_folder = environment.JSKULT_DATA_SEND_FOLDER
class VjskReceiveBucket(S3Bucket): def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str):
_bucket_name = environment.VJSK_DATA_BUCKET
_recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER
_s3_file_list = None
# Lists the objects under the receive folder, caches the listing on the
# instance (self._s3_file_list), and returns it to the caller.
def get_s3_file_list(self):
self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
return self._s3_file_list
# Downloads one object from the receive folder into a freshly created
# temporary directory and returns the local file path.
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
# NOTE(review): the open file object is passed as the third argument —
# confirm S3Client.download_file expects a file object here, not a path.
self._s3_client.download_file(self._bucket_name, data_filename, f)
# NOTE(review): seek(0) on a write-only handle that is closed right after
# has no observable effect; presumably leftover from a read-back variant.
f.seek(0)
return temporary_file_path
def unzip_data_file(self, filename: str):
    """Decompress a gzip file next to the original file.

    Args:
        filename: path to a gzip-compressed file, expected to end in ``.gz``.

    Returns:
        A one-element list containing the path of the decompressed file.
    """
    temp_dir = os.path.dirname(filename)
    # removesuffix strips only a trailing ".gz"; the previous
    # str.replace('.gz', '') removed EVERY occurrence, mangling names such
    # as "a.gz.txt" (-> "a.txt") or "my.gz.gz" (-> "my").
    decompress_filename = os.path.basename(filename).removesuffix('.gz')
    decompress_file_path = os.path.join(temp_dir, decompress_filename)
    with gzip.open(filename, 'rb') as gz:
        with open(decompress_file_path, 'wb') as decompressed_file:
            shutil.copyfileobj(gz, decompressed_file)
    return [decompress_file_path]
# Copies each received file into the VJSK backup bucket under
# <backup folder>/<datetime_key>/ and then deletes the source object.
# target_files entries are dicts carrying the S3 key under "filename".
# NOTE(review): the local variable is named jskult_backup_bucket but holds a
# VjskBackupBucket — confirm the intended backup destination.
def backup_dat_file(self, target_files: list, datetime_key: str):
jskult_backup_bucket = VjskBackupBucket()
for target_file in target_files:
backup_from_file_path = target_file.get("filename")
backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "")
backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
jskult_backup_bucket._bucket_name, backup_key)
self._s3_client.delete_file(self._bucket_name, backup_from_file_path)
class VjskSendBucket(S3Bucket):
_bucket_name = environment.VJSK_DATA_BUCKET
_send_folder = environment.VJSK_DATA_SEND_FOLDER
def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
# S3バケットにファイルを移動 # S3バケットにファイルを移動
csv_file_name = f'{self._send_folder}/{vjsk_create_csv}' csv_file_name = f'{self._send_folder}/{jskult_create_csv}'
s3_client = S3Client() self._s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
return return
def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str): def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー # バックアップバケットにコピー
vjsk_backup_bucket = VjskBackupBucket() jskult_backup_bucket = JskUltBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}' dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key) self._s3_client.copy(self._bucket_name, dat_key, jskult_backup_bucket._bucket_name, backup_key)

View File

@ -0,0 +1,48 @@
class BatchContext:
    """Process-wide batch context, shared through a singleton.

    Holds the values every batch step needs: the processing date and the
    execution flags. Obtain it via ``BatchContext.get_instance()`` instead of
    constructing it directly, so every step sees the same state.
    """

    __instance = None
    __syor_date: str               # processing date (yyyy/mm/dd string)
    __is_not_business_day: bool    # daily-batch launched on a non-business day
    __is_ultmarc_imported: bool    # Ultmarc import already executed
    __is_vjsk_stock_import_day: bool  # wholesaler stock data import target day

    def __init__(self) -> None:
        # All flags default to False. Previously __is_vjsk_stock_import_day
        # was never initialized, so reading it before the setter ran raised
        # AttributeError — unlike the other two flags.
        self.__is_not_business_day = False
        self.__is_ultmarc_imported = False
        self.__is_vjsk_stock_import_day = False

    @classmethod
    def get_instance(cls):
        """Return the shared instance, creating it on first use."""
        if cls.__instance is None:
            cls.__instance = cls()
        return cls.__instance

    @property
    def syor_date(self):
        return self.__syor_date

    @syor_date.setter
    def syor_date(self, syor_date_str: str):
        self.__syor_date = syor_date_str

    @property
    def is_not_business_day(self):
        return self.__is_not_business_day

    @is_not_business_day.setter
    def is_not_business_day(self, flag: bool):
        self.__is_not_business_day = flag

    @property
    def is_ultmarc_imported(self):
        return self.__is_ultmarc_imported

    @is_ultmarc_imported.setter
    def is_ultmarc_imported(self, flag: bool):
        self.__is_ultmarc_imported = flag

    @property
    def is_vjsk_stock_import_day(self):
        return self.__is_vjsk_stock_import_day

    @is_vjsk_stock_import_day.setter
    def is_vjsk_stock_import_day(self, flag: bool):
        self.__is_vjsk_stock_import_day = flag

View File

@ -1,6 +1,12 @@
import os import os
import csv
import os.path as path
import tempfile
from src.aws.s3 import JskSendBucket
from src.db.database import Database from src.db.database import Database
from src.error.exceptions import BatchOperationException from src.error.exceptions import BatchOperationException
from src.batch.common.batch_context import BatchContext
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
@ -9,19 +15,11 @@ from src.logging.get_logger import get_logger
logger = get_logger('DCF削除新規マスタ作成') logger = get_logger('DCF削除新規マスタ作成')
LOG_LEVEL = os.environ["LOG_LEVEL"]
PROCESS_NAME = os.environ["PROCESS_NAME"] PROCESS_NAME = os.environ["PROCESS_NAME"]
POST_PROCESS = os.environ["POST_PROCESS"] POST_PROCESS = os.environ["POST_PROCESS"]
MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"] MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"]
RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"]
JSK_DATA_SEND_FOLDER = os.environ["JSK_DATA_SEND_FOLDER"] CSV_FILE_NAME = os.environ['CSV_FILE_NAME']
JSK_BACKUP_FOLDER = os.environ["JSK_BACKUP_FOLDER"]
TRANSFER_RESULT_FOLDER = os.environ["TRANSFER_RESULT_FOLDER"]
DCF_INST_MERGE_SEND_FILE_NAME = os.environ["DCF_INST_MERGE_SEND_FILE_NAME"]
DB_CONNECTION_MAX_RETRY_ATTEMPT = os.environ["DB_CONNECTION_MAX_RETRY_ATTEMPT"]
DB_CONNECTION_RETRY_INTERVAL_INIT = os.environ["DB_CONNECTION_RETRY_INTERVAL_INIT"]
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS"]
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS"]
class DcfInstMergeIO(JskultBatchEntrypoint): class DcfInstMergeIO(JskultBatchEntrypoint):
def __init__(self): def __init__(self):
@ -49,125 +47,135 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskultBatchStatusManager.is_done_ultmarc_import(): if jskultBatchStatusManager.is_done_ultmarc_import():
(is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge:
# COM_施設からDCF削除新規マスタに登録
_output_add_dcf_inst_merge_log(duplication_inst_records)
# CSV出力
file_path = _make_csv_data(CSV_FILE_NAME)
# CSVをS3にアップロード
_upload_dcf_inst_merge_csv_file(CSV_FILE_NAME, file_path)
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
# com_instからdcf_inst_mergeにinsert
try: try:
self._db = Database.get_instance() self._db = Database.get_instance()
self._db.connect() self._db.connect()
self._db.begin() self._db.begin()
self._db.to_jst() self._db.to_jst()
(is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge:
_output_add_dcf_inst_merge_log(duplication_inst_records)
sql ="""\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
FROM
src05.COM_INST AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
AND
ci.DELETE_SCHE_REASON_CD = 'D'
AND
ci.DELETE_DATA IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
FROM
src07.DCF_INST_MERGE AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
SELECT
mia.INST_CD
FROM
src07.MST_INST_ASSN as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
)
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
ap.PRSB_INST_CD
FROM
src07.ATC_PHARM AS ap
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
trd.INST_CD
FROM
src07.TRN_RESULT_DATA AS trd
WHERE
trd.INST_CD = ci.DCF_DSF_INST_CD
)
)
;
"""
duplication_inst_records = self._db.execute_select(sql)
# DCF削除新規マスタ取り込み
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
NULL,
NULL,
NULL,
"Y",
batchuser,
SYSDATE(),
batchuser,
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
)
VALUES
{','.join(values_clauses)}
"""
self._db.execute(insert_sql, params)
return (True, duplication_inst_records)
except Exception as e: except Exception as e:
self._db.rollback() self._db.rollback()
raise BatchOperationException(e) raise BatchOperationException(e)
finally: finally:
self._db.disconnect() self._db.disconnect()
# TODO DCF施設削除新規マスタをS3に出力
# Selects COM_INST rows flagged for delete ('D') that have a duplicate
# counterpart code and are referenced by at least one downstream table, and
# builds an INSERT for src07.dcf_inst_merge from them.
# Returns (True, selected_rows).
#
# NOTE(review): insert_sql is built but never executed — the sibling
# version of this function calls self._db.execute(insert_sql, params)
# before returning; confirm whether the execute call was lost here.
# NOTE(review): returns True unconditionally, even when no rows matched;
# with zero rows, values_clauses is empty and the generated INSERT would be
# syntactically invalid anyway.
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
# NOTE(review): "ci.DCF_DSF_INST_CD EXISTS(...)" is not valid SQL —
# presumably IN (...) or a bare EXISTS was intended; also the AND/OR
# grouping of the three reference checks looks suspect. Verify against
# the target database before relying on this statement.
sql ="""\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
FROM
COM_INST AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
AND
ci.DELETE_SCHE_REASON_CD = 'D'
AND
ci.DELETE_DATA IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
FROM
DCF_INST_MERGE AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
SELECT
mia.INST_CD
FROM
MST_INST_ASSN as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
)
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
ap.PRSB_INST_CD
FROM
ATC_PHARM AS ap
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
vtrd.INST_CD
FROM
VW_TRN_RESULT_DATA AS vtrd
WHERE
vtrd.INST_CD = ci.DCF_DSF_INST_CD
)
)
;
"""
duplication_inst_records = self._db.execute_select(sql)
# Import into the DCF institution merge master: build one parameterized
# VALUES tuple per selected row.
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
# NOTE(review): DATE_FORMAT((... + INTERVAL 1 MONTH), appears to be
# missing its format-string argument, and the parentheses in this VALUES
# fragment look unbalanced — verify the generated SQL.
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
NULL,
NULL,
NULL,
"Y",
batchuser,
SYSDATE(),
batchuser,
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
)
VALUES
{','.join(values_clauses)}
"""
return (True, duplication_inst_records)
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date'] sys_update_date = duplication_inst_records[0]['sys_update_date']
@ -181,15 +189,52 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
for row in duplication_inst_records: for row in duplication_inst_records:
add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row)) add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row))
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力 # 顧客報告用にログ出力
logger.info( logger.info(
f"""DCF削除新規マスタが追加されました。 f"""DCF施設統合マスタが追加されました。
********************************************************** **********************************************************
適用月度 {set_year_month} 適用月度 {set_year_month}
********************************************************** **********************************************************
{add_dct_inst_merge_list} {add_dct_inst_merge_list}
********************************************************** **********************************************************
合計 {len(duplication_inst_records)}""" 合計 {len(duplication_inst_records)}"""
) )
return return
def _make_csv_data(record_inst: list, csv_file_name: str):
    """Write the given records to a CSV file in a fresh temporary directory.

    Args:
        record_inst: list of row mappings; values are written in dict order.
        csv_file_name: file name to give the generated CSV.

    Returns:
        The path of the written CSV file.
    """
    temporary_dir = tempfile.mkdtemp()
    csv_file_path = path.join(temporary_dir, csv_file_name)
    head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',
                'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG',
                'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE']
    # newline='' is required by the csv module so the writer controls the
    # line endings itself; without it '\r\n' would be emitted on Windows.
    with open(csv_file_path, mode='w', encoding='UTF-8', newline='') as csv_file:
        # Header row written via write() on purpose: the header must stay
        # unquoted, while the data rows below use QUOTE_ALL.
        csv_file.write(f"{','.join(head_str)}\n")
        # NOTE(review): the original comment claimed Shift-JIS/CRLF output,
        # but the file is opened as UTF-8 with LF line endings — confirm
        # which encoding the downstream consumer actually expects.
        writer = csv.writer(csv_file, delimiter=',', lineterminator='\n',
                            quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
                            strict=True
                            )
        # Data rows (institutions): None is written as an empty string.
        for record_inst_data in record_inst:
            record_inst_value = list(record_inst_data.values())
            csv_data = ['' if n is None else n for n in record_inst_value]
            writer.writerow(csv_data)
    return csv_file_path
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str):
    """Send the generated CSV to the S3 send bucket, then back it up.

    The backup key is derived from the processing date held by the shared
    batch context. (`self` is accepted for call-compatibility but unused.)
    """
    send_bucket = JskSendBucket()
    # Fetch the shared batch settings (processing date) for the backup key.
    context = BatchContext.get_instance()
    send_bucket.upload_dcf_inst_merge_csv_file(csv_file_name, csv_file_path)
    send_bucket.backup_dcf_inst_merge_csv_file(csv_file_name, context.syor_date)