diff --git a/ecs/jskult-batch/src/aws/s3.py b/ecs/jskult-batch/src/aws/s3.py index 6203868d..f70cc049 100644 --- a/ecs/jskult-batch/src/aws/s3.py +++ b/ecs/jskult-batch/src/aws/s3.py @@ -54,33 +54,8 @@ class S3Bucket(): _s3_client = S3Client() _bucket_name: str = None - -class UltmarcBucket(S3Bucket): - _bucket_name = environment.ULTMARC_DATA_BUCKET - _folder = environment.ULTMARC_DATA_FOLDER - - def list_dat_file(self): - return self._s3_client.list_objects(self._bucket_name, self._folder) - - def download_dat_file(self, dat_filename: str): - # 一時ファイルとして保存する - temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}') - with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, dat_filename, f) - f.seek(0) - return temporary_file_path - - def backup_dat_file(self, dat_file_key: str, datetime_key: str): - # バックアップバケットにコピー - ultmarc_backup_bucket = UltmarcBackupBucket() - backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}' - self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key) - # コピー元のファイルを削除 - self._s3_client.delete_file(self._bucket_name, dat_file_key) - - class ConfigBucket(S3Bucket): + # TODO 日付更新処理で内容の修正を行う _bucket_name = environment.JSKULT_CONFIG_BUCKET def download_holiday_list(self): @@ -118,68 +93,24 @@ class JskUltBackupBucket(S3Bucket): _bucket_name = environment.JSKULT_BACKUP_BUCKET -class UltmarcBackupBucket(JskUltBackupBucket): - _folder = environment.ULTMARC_BACKUP_FOLDER - -class VjskBackupBucket(JskUltBackupBucket): +# TODO 設定値をecsタスク定義書から確認 +class JskBackupBucket(JskUltBackupBucket): _folder = environment.VJSK_BACKUP_FOLDER +class JskSendBucket(S3Bucket): + _bucket_name = environment.JSKULT_DATA_BUCKET + _send_folder = environment.JSKULT_DATA_SEND_FOLDER -class VjskReceiveBucket(S3Bucket): - _bucket_name = environment.VJSK_DATA_BUCKET - _recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER - - _s3_file_list = None - - def get_s3_file_list(self): - self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder) - return self._s3_file_list - - def download_data_file(self, data_filename: str): - temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}') - with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, data_filename, f) - f.seek(0) - return temporary_file_path - - def unzip_data_file(self, filename: str): - temp_dir = os.path.dirname(filename) - decompress_filename = os.path.basename(filename).replace('.gz', '') - decompress_file_path = os.path.join(temp_dir, decompress_filename) - with gzip.open(filename, 'rb') as gz: - with open(decompress_file_path, 'wb') as decompressed_file: - shutil.copyfileobj(gz, decompressed_file) - - ret = [decompress_file_path] - return ret - - def backup_dat_file(self, target_files: list, datetime_key: str): - jskult_backup_bucket = VjskBackupBucket() - for target_file in target_files: - backup_from_file_path = target_file.get("filename") - backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "") - backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}' - self._s3_client.copy(self._bucket_name, backup_from_file_path, - jskult_backup_bucket._bucket_name, backup_key) - self._s3_client.delete_file(self._bucket_name, backup_from_file_path) - - -class VjskSendBucket(S3Bucket): - _bucket_name = environment.VJSK_DATA_BUCKET - _send_folder = environment.VJSK_DATA_SEND_FOLDER - - def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str): + def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str): # S3バケットにファイルを移動 - csv_file_name = f'{self._send_folder}/{vjsk_create_csv}' - s3_client = S3Client() - s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name) + csv_file_name = f'{self._send_folder}/{jskult_create_csv}' + self._s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name) return - def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str): + def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str): # バックアップバケットにコピー - vjsk_backup_bucket = VjskBackupBucket() + jskult_backup_bucket = JskUltBackupBucket() dat_key = f'{self._send_folder}/{dat_file_key}' - backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' - self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key) + backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' + self._s3_client.copy(self._bucket_name, dat_key, jskult_backup_bucket._bucket_name, backup_key) diff --git a/ecs/jskult-batch/src/batch/common/batch_context.py b/ecs/jskult-batch/src/batch/common/batch_context.py new file mode 100644 index 00000000..b3fc4967 --- /dev/null +++ b/ecs/jskult-batch/src/batch/common/batch_context.py @@ -0,0 +1,48 @@ +class BatchContext: + __instance = None + __syor_date: str # 処理日(yyyy/mm/dd形式) + __is_not_business_day: bool # 日次バッチ起動日フラグ + __is_ultmarc_imported: bool # アルトマーク取込実施済フラグ + __is_vjsk_stock_import_day: bool # 卸在庫データ取込対象フラグ + + def __init__(self) -> None: + self.__is_not_business_day = False + self.__is_ultmarc_imported = False + + @classmethod + def get_instance(cls): + if cls.__instance is None: + cls.__instance = cls() + return cls.__instance + + @property + def syor_date(self): + return self.__syor_date + + @syor_date.setter + def syor_date(self, syor_date_str: str): + self.__syor_date = syor_date_str + + @property + def is_not_business_day(self): + return self.__is_not_business_day + + @is_not_business_day.setter + def is_not_business_day(self, flag: bool): + self.__is_not_business_day = flag + + @property + def is_ultmarc_imported(self): + return self.__is_ultmarc_imported + + @is_ultmarc_imported.setter + def is_ultmarc_imported(self, flag: bool): + self.__is_ultmarc_imported = flag + + @property + def is_vjsk_stock_import_day(self): + return self.__is_vjsk_stock_import_day + + @is_vjsk_stock_import_day.setter + def is_vjsk_stock_import_day(self, flag: bool): + self.__is_vjsk_stock_import_day = flag diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index fcb104bc..5a0199f0 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -1,6 +1,12 @@ import os +import csv +import os.path as path +import tempfile +from src.aws.s3 import JskSendBucket + from src.db.database import Database from src.error.exceptions import BatchOperationException +from src.batch.common.batch_context import BatchContext from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager @@ -9,19 +15,11 @@ from src.logging.get_logger import get_logger logger = get_logger('DCF削除新規マスタ作成') -LOG_LEVEL = os.environ["LOG_LEVEL"] PROCESS_NAME = os.environ["PROCESS_NAME"] POST_PROCESS = os.environ["POST_PROCESS"] MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"] RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] -JSK_DATA_SEND_FOLDER = os.environ["JSK_DATA_SEND_FOLDER"] -JSK_BACKUP_FOLDER = os.environ["JSK_BACKUP_FOLDER"] -TRANSFER_RESULT_FOLDER = os.environ["TRANSFER_RESULT_FOLDER"] -DCF_INST_MERGE_SEND_FILE_NAME = os.environ["DCF_INST_MERGE_SEND_FILE_NAME"] -DB_CONNECTION_MAX_RETRY_ATTEMPT = os.environ["DB_CONNECTION_MAX_RETRY_ATTEMPT"] -DB_CONNECTION_RETRY_INTERVAL_INIT = os.environ["DB_CONNECTION_RETRY_INTERVAL_INIT"] -DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS"] -DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = os.environ["DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS"] +CSV_FILE_NAME = os.environ['CSV_FILE_NAME'] class DcfInstMergeIO(JskultBatchEntrypoint): def __init__(self): @@ -49,125 +47,135 @@ class DcfInstMergeIO(JskultBatchEntrypoint): # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 if jskultBatchStatusManager.is_done_ultmarc_import(): + + (is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) + if is_add_dcf_inst_merge: + + # COM_施設からDCF削除新規マスタに登録 + _output_add_dcf_inst_merge_log(duplication_inst_records) + + # CSV出力 + file_path = _make_csv_data(CSV_FILE_NAME) + + # CSVをS3にアップロード + _upload_dcf_inst_merge_csv_file(CSV_FILE_NAME, file_path) + + def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: + # com_instからdcf_inst_mergeにinsert try: self._db = Database.get_instance() self._db.connect() self._db.begin() self._db.to_jst() - (is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) - if is_add_dcf_inst_merge: - _output_add_dcf_inst_merge_log(duplication_inst_records) + sql ="""\ + SELECT + ci.DCF_DSF_INST_CD, + ci.FORM_INST_NAME_KANJI, + ci.DELETE_SCHE_REASON_CD, + ci.DUP_OPP_CD, + ci.SYS_UPDATE_DATE + FROM + src05.COM_INST AS ci + WHERE + ci.DUP_OPP_CD IS NOT NULL + AND + ci.DELETE_SCHE_REASON_CD = 'D' + AND + ci.DELETE_DATA IS NULL + AND + ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() + AND + NOT EXISTS ( + SELECT + dim.DCF_INST_CD + FROM + src07.DCF_INST_MERGE AS dim + WHERE + dim.DCF_INST_CD = ci.DCF_DSF_INST_CD + ) + AND + + (ci.DCF_DSF_INST_CD EXISTS( + SELECT + mia.INST_CD + FROM + src07.MST_INST_ASSN as mia + WHERE + mia.INST_CD = ci.DCF_DSF_INST_CD + ) + ) + OR ci.DCF_DSF_INST_CD EXISTS( + SELECT + ap.PRSB_INST_CD + FROM + src07.ATC_PHARM AS ap + WHERE + ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD + ) + OR ci.DCF_DSF_INST_CD EXISTS( + SELECT + trd.INST_CD + FROM + src07.TRN_RESULT_DATA AS trd + WHERE + trd.INST_CD = ci.DCF_DSF_INST_CD + ) + ) + ; + + """ + duplication_inst_records = self._db.execute_select(sql) + + # DCF削除新規マスタ取り込み + values_clauses = [] + params = {} + for clauses_no, row in enumerate(duplication_inst_records, start=1): + dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" + dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" + values_clause = f"""(:{dcf_inst_cd_arr}, + :{dup_opp_cd_arr}, + DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), + NULL, + NULL, + NULL, + "Y", + batchuser, + SYSDATE(), + batchuser, + SYSDATE() + )""" + values_clauses.append(values_clause) + params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] + params[dup_opp_cd_arr] = row['DUP_OPP_CD'] + insert_sql = f""" + INSERT INTO + src07.dcf_inst_merge ( + DCF_INST_CD, + DUP_OPP_CD, + START_MONTH, + INVALID_FLG, + REMARKS, + DCF_INST_CD_NEW, + ENABLED_FLG, + CREATER, + CREATE_DATE, + UPDATER, + UPDATE_DATE + ) + VALUES + {','.join(values_clauses)} + """ + + self._db.execute(insert_sql, params) + + return (True, duplication_inst_records) except Exception as e: self._db.rollback() raise BatchOperationException(e) finally: self._db.disconnect() - # TODO DCF施設削除新規マスタをS3に出力 - - - - def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: - sql ="""\ - SELECT - ci.DCF_DSF_INST_CD, - ci.FORM_INST_NAME_KANJI, - ci.DELETE_SCHE_REASON_CD, - ci.DUP_OPP_CD, - ci.SYS_UPDATE_DATE - FROM - COM_INST AS ci - WHERE - ci.DUP_OPP_CD IS NOT NULL - AND - ci.DELETE_SCHE_REASON_CD = 'D' - AND - ci.DELETE_DATA IS NULL - AND - ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW() - AND - NOT EXISTS ( - SELECT - dim.DCF_INST_CD - FROM - DCF_INST_MERGE AS dim - WHERE - dim.DCF_INST_CD = ci.DCF_DSF_INST_CD - ) - AND - (ci.DCF_DSF_INST_CD EXISTS( - SELECT - mia.INST_CD - FROM - MST_INST_ASSN as mia - WHERE - mia.INST_CD = ci.DCF_DSF_INST_CD - ) - ) - OR ci.DCF_DSF_INST_CD EXISTS( - SELECT - ap.PRSB_INST_CD - FROM - ATC_PHARM AS ap - WHERE - ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD - ) - OR ci.DCF_DSF_INST_CD EXISTS( - SELECT - vtrd.INST_CD - FROM - VW_TRN_RESULT_DATA AS vtrd - WHERE - vtrd.INST_CD = ci.DCF_DSF_INST_CD - ) - ) - ; - - """ - duplication_inst_records = self._db.execute_select(sql) - - # DCF施設統合マスタ取り込み - values_clauses = [] - params = {} - for clauses_no, row in enumerate(duplication_inst_records, start=1): - dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}" - dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}" - values_clause = f"""(:{dcf_inst_cd_arr}, - :{dup_opp_cd_arr}, - DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), - NULL, - NULL, - NULL, - "Y", - batchuser, - SYSDATE(), - batchuser, - SYSDATE() - )""" - values_clauses.append(values_clause) - params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD'] - params[dup_opp_cd_arr] = row['DUP_OPP_CD'] - insert_sql = f""" - INSERT INTO - src07.dcf_inst_merge ( - DCF_INST_CD, - DUP_OPP_CD, - START_MONTH, - INVALID_FLG, - REMARKS, - DCF_INST_CD_NEW, - ENABLED_FLG, - CREATER, - CREATE_DATE, - UPDATER, - UPDATE_DATE - ) - VALUES - {','.join(values_clauses)} - """ - return (True, duplication_inst_records) - def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): sys_update_date = duplication_inst_records[0]['sys_update_date'] @@ -181,15 +189,52 @@ class DcfInstMergeIO(JskultBatchEntrypoint): for row in duplication_inst_records: add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row)) add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) - # 顧客報告用にログ出力 logger.info( - f"""DCF削除新規マスタが追加されました。 + f"""DCF施設統合マスタが追加されました。 ********************************************************** 適用月度 {set_year_month} ********************************************************** {add_dct_inst_merge_list} ********************************************************** 合計 {len(duplication_inst_records)}件""" - ) - return \ No newline at end of file + ) + return + + + def _make_csv_data(record_inst: list, csv_file_name: str): + # CSVファイルを作成する + temporary_dir = tempfile.mkdtemp() + csv_file_path = path.join(temporary_dir, csv_file_name) + + head_str = ['DCF_INST_CD','DUP_OPP_CD','START_MONTH', + 'INVALID_FLG','REMARKS','DCF_INST_CD_NEW','ENABLED_FLG', + 'CREATER','CREATE_DATE','UPDATER','UPDATE_DATE'] + + with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: + # ヘッダ行書き込み(くくり文字をつけない為にwriterowではなく、writeを使用しています) + csv_file.write(f"{','.join(head_str)}\n") + + # Shift-JIS、CRLF、価囲いありで書き込む + writer = csv.writer(csv_file, delimiter=',', lineterminator='\n', + quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, + strict=True + ) + + # データ部分書き込み(施設) + for record_inst_data in record_inst: + record_inst_value = list(record_inst_data.values()) + csv_data = ['' if n is None else n for n in record_inst_value] + writer.writerow(csv_data) + + return csv_file_path + + def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): + # S3バケットにファイルを移動 + jsk_send_bucket = JskSendBucket() + # バッチ共通設定を取得 + batch_context = BatchContext.get_instance() + + jsk_send_bucket.upload_dcf_inst_merge_csv_file(csv_file_name, csv_file_path) + jsk_send_bucket.backup_dcf_inst_merge_csv_file(csv_file_name, batch_context.syor_date) + return \ No newline at end of file