diff --git a/ecs/jskult-batch/.vscode/launch.json b/ecs/jskult-batch/.vscode/launch.json index bcd1c6dd..8bbb94a0 100644 --- a/ecs/jskult-batch/.vscode/launch.json +++ b/ecs/jskult-batch/.vscode/launch.json @@ -10,7 +10,8 @@ "request": "launch", "program": "entrypoint.py", "console": "integratedTerminal", - "justMyCode": true + "justMyCode": true, + "envFile": "${workspaceFolder}/.env" } ] } \ No newline at end of file diff --git a/ecs/jskult-batch/src/aws/s3.py b/ecs/jskult-batch/src/aws/s3.py index f70cc049..66032e1c 100644 --- a/ecs/jskult-batch/src/aws/s3.py +++ b/ecs/jskult-batch/src/aws/s3.py @@ -14,7 +14,8 @@ class S3Client: _bucket_name: str def list_objects(self, bucket_name: str, folder_name: str): - response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) + response = self.__s3_client.list_objects_v2( + Bucket=bucket_name, Prefix=folder_name) if response['KeyCount'] == 0: return [] contents = response['Contents'] @@ -54,6 +55,7 @@ class S3Bucket(): _s3_client = S3Client() _bucket_name: str = None + class ConfigBucket(S3Bucket): # TODO 日付更新処理で内容の修正を行う _bucket_name = environment.JSKULT_CONFIG_BUCKET @@ -61,30 +63,36 @@ class ConfigBucket(S3Bucket): def download_holiday_list(self): # 一時ファイルとして保存する temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME) + temporary_file_path = path.join( + temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME) holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}' with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, holiday_list_key, f) + self._s3_client.download_file( + self._bucket_name, holiday_list_key, f) f.seek(0) return temporary_file_path def download_wholesaler_stock_input_day_list(self): # 一時ファイルとして保存する temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME) + temporary_file_path = path.join( + temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME) wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}' with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f) + self._s3_client.download_file( + self._bucket_name, wholesaler_stock_input_day_list_key, f) f.seek(0) return temporary_file_path def download_ultmarc_hex_convert_config(self): # 一時ファイルとして保存する temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME) + temporary_file_path = path.join( + temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME) hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}' with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, hex_convert_config_key, f) + self._s3_client.download_file( + self._bucket_name, hex_convert_config_key, f) f.seek(0) return temporary_file_path @@ -93,19 +101,19 @@ class JskUltBackupBucket(S3Bucket): _bucket_name = environment.JSKULT_BACKUP_BUCKET - -# TODO 設定値をecsタスク定義書から確認 class JskBackupBucket(JskUltBackupBucket): - _folder = environment.VJSK_BACKUP_FOLDER + _folder = environment.JSKULT_BACKUP_BUCKET + class JskSendBucket(S3Bucket): - _bucket_name = environment.JSKULT_DATA_BUCKET - _send_folder = environment.JSKULT_DATA_SEND_FOLDER + _bucket_name = environment.JSK_IO_BUCKET + _send_folder = environment.JSK_DATA_SEND_FOLDER def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str): # S3バケットにファイルを移動 csv_file_name = f'{self._send_folder}/{jskult_create_csv}' - self._s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name) + self._s3_client.upload_file( + csv_file_path, self._bucket_name, csv_file_name) return def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str): @@ -113,4 +121,5 @@ class JskSendBucket(S3Bucket): jskult_backup_bucket = JskUltBackupBucket() dat_key = f'{self._send_folder}/{dat_file_key}' backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' - self._s3_client.copy(self._bucket_name, dat_key, jskult_backup_bucket._bucket_name, backup_key) + self._s3_client.copy(self._bucket_name, dat_key, + jskult_backup_bucket._bucket_name, backup_key) diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py index 5a0199f0..e3e7ed12 100644 --- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py +++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py @@ -1,64 +1,110 @@ -import os import csv import os.path as path import tempfile + from src.aws.s3 import JskSendBucket - -from src.db.database import Database -from src.error.exceptions import BatchOperationException from src.batch.common.batch_context import BatchContext - from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint +from src.db.database import Database +from src.error.exceptions import BatchOperationException, MaxRunCountReachedException +from src.manager.jskult_batch_run_manager import JskultBatchRunManager from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager from src.manager.jskult_batch_status_manager import JskultBatchStatusManager +from src.system_var import environment from src.logging.get_logger import get_logger logger = get_logger('DCF削除新規マスタ作成') -PROCESS_NAME = os.environ["PROCESS_NAME"] -POST_PROCESS = os.environ["POST_PROCESS"] -MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"] -RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] -CSV_FILE_NAME = os.environ['CSV_FILE_NAME'] class DcfInstMergeIO(JskultBatchEntrypoint): def __init__(self): super().__init__() - def execute(self): - jskultHdkeTblManager = JskultHdkeTblManager() - - if not jskultHdkeTblManager.can_run_process(): - return - + jskultBatchRunManager = JskultBatchRunManager( + environment.BATCH_EXECUTION_ID) jskultBatchStatusManager = JskultBatchStatusManager( - PROCESS_NAME, - POST_PROCESS, - MAX_RUN_COUNT_FLG, - RECEIVE_FILE_COUNT + environment.PROCESS_NAME, + environment.POST_PROCESS, + environment.MAX_RUN_COUNT_FLG, + environment.RECEIVE_FILE_COUNT ) - if not jskultBatchStatusManager.can_run_post_process(): + try: + jskultHdkeTblManager = JskultHdkeTblManager() - # 処理ステータスを「処理待」に設定 - jskultBatchStatusManager.set_process_status("retry") - return - - # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 - if jskultBatchStatusManager.is_done_ultmarc_import(): - - (is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) + if not jskultHdkeTblManager.can_run_process(): + logger.error( + '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。') + return + + jskultBatchStatusManager.set_process_status("start") + try: + if not jskultBatchStatusManager.can_run_post_process(): + # リトライ判断された場合 + # 処理ステータスを「処理待」に設定 + jskultBatchStatusManager.set_process_status("waiting") + + # バッチ実行管理テーブルに「retry」で登録 + jskultBatchRunManager.batch_retry() + + return + except MaxRunCountReachedException as e: + logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。') + + jskultBatchStatusManager.set_process_status("doing") + + # アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 + if jskultBatchStatusManager.is_done_ultmarc_import(): + + # + (is_add_dcf_inst_merge, + duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) if is_add_dcf_inst_merge: - + # COM_施設からDCF削除新規マスタに登録 _output_add_dcf_inst_merge_log(duplication_inst_records) - - # CSV出力 - file_path = _make_csv_data(CSV_FILE_NAME) + dcf_inst_merge_all_records = _select_dcf_inst_merge_all() + # CSV出力 + file_path = _make_csv_data( + dcf_inst_merge_all_records, environment.CSV_FILE_NAME) - # CSVをS3にアップロード - _upload_dcf_inst_merge_csv_file(CSV_FILE_NAME, file_path) + # CSVをS3にアップロード + _upload_dcf_inst_merge_csv_file( + file_path, environment.CSV_FILE_NAME) + + # 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録 + logger.info("DCF削除新規マスタ作成処理を正常終了します。") + + jskultBatchRunManager.batch_success() + jskultBatchStatusManager.set_process_status("done") + + except: + # 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録 + logger.error("エラーが発生したため、DCF削除新規マスタ作成処理を終了します。") + jskultBatchRunManager.batch_failed() + jskultBatchStatusManager.set_process_status("failed") + + def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]: + try: + self._db = Database.get_instance() + self._db.connect() + self._db.begin() + self._db.to_jst() + sql = """\ + SELECT + * + FROM + src07.dcf_inst_merge + """ + dcf_inst_merge_all_records = self._db.execute_select(sql) + return dcf_inst_merge_all_records + + except Exception as e: + self._db.rollback() + raise BatchOperationException(e) + finally: + self._db.disconnect() def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: # com_instからdcf_inst_mergeにinsert @@ -68,7 +114,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint): self._db.begin() self._db.to_jst() - sql ="""\ + sql = """\ SELECT ci.DCF_DSF_INST_CD, ci.FORM_INST_NAME_KANJI, @@ -176,18 +222,18 @@ class DcfInstMergeIO(JskultBatchEntrypoint): finally: self._db.disconnect() - def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): sys_update_date = duplication_inst_records[0]['sys_update_date'] set_year_month = '{set_year}年{set_month}月'.format( set_year=sys_update_date[0:4], set_month=sys_update_date[-2:] - ) + ) add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}' add_dct_inst_merge_list = [] for row in duplication_inst_records: - add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row)) + add_dct_inst_merge_list.append( + add_dct_inst_merge.format(**row)) add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) # 顧客報告用にログ出力 logger.info( @@ -201,40 +247,42 @@ class DcfInstMergeIO(JskultBatchEntrypoint): ) return + def _make_csv_data(csv_file_name: str, record_inst: list): + # CSVファイルを作成する + temporary_dir = tempfile.mkdtemp() + csv_file_path = path.join(temporary_dir, csv_file_name) - def _make_csv_data(record_inst: list, csv_file_name: str): - # CSVファイルを作成する - temporary_dir = tempfile.mkdtemp() - csv_file_path = path.join(temporary_dir, csv_file_name) + head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH', + 'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG', + 'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE'] - head_str = ['DCF_INST_CD','DUP_OPP_CD','START_MONTH', - 'INVALID_FLG','REMARKS','DCF_INST_CD_NEW','ENABLED_FLG', - 'CREATER','CREATE_DATE','UPDATER','UPDATE_DATE'] + with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: + # ヘッダ行書き込み(くくり文字をつけない為にwriterowではなく、writeを使用しています) + csv_file.write(f"{','.join(head_str)}\n") - with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: - # ヘッダ行書き込み(くくり文字をつけない為にwriterowではなく、writeを使用しています) - csv_file.write(f"{','.join(head_str)}\n") + # Shift-JIS、CRLF、価囲いありで書き込む + writer = csv.writer(csv_file, delimiter=',', lineterminator='\n', + quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, + strict=True + ) - # Shift-JIS、CRLF、価囲いありで書き込む - writer = csv.writer(csv_file, delimiter=',', lineterminator='\n', - quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, - strict=True - ) + # データ部分書き込み(施設) + for record_inst_data in record_inst: + record_inst_value = list(record_inst_data.values()) + csv_data = [ + '' if n is None else n for n in record_inst_value] + writer.writerow(csv_data) - # データ部分書き込み(施設) - for record_inst_data in record_inst: - record_inst_value = list(record_inst_data.values()) - csv_data = ['' if n is None else n for n in record_inst_value] - writer.writerow(csv_data) + return csv_file_path - return csv_file_path - def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): # S3バケットにファイルを移動 jsk_send_bucket = JskSendBucket() # バッチ共通設定を取得 batch_context = BatchContext.get_instance() - jsk_send_bucket.upload_dcf_inst_merge_csv_file(csv_file_name, csv_file_path) - jsk_send_bucket.backup_dcf_inst_merge_csv_file(csv_file_name, batch_context.syor_date) - return \ No newline at end of file + jsk_send_bucket.upload_dcf_inst_merge_csv_file( + csv_file_name, csv_file_path) + jsk_send_bucket.backup_dcf_inst_merge_csv_file( + csv_file_name, batch_context.syor_date) + return diff --git a/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py b/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py index 291a8d1f..47f34952 100644 --- a/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py +++ b/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py @@ -3,6 +3,6 @@ import abc class JskultBatchEntrypoint(metaclass=abc.ABCMeta): - @abc.abstractmethod() + @abc.abstractmethod def execute(self): pass diff --git a/ecs/jskult-batch/src/error/exceptions.py b/ecs/jskult-batch/src/error/exceptions.py index 055c24f6..aa5f9be6 100644 --- a/ecs/jskult-batch/src/error/exceptions.py +++ b/ecs/jskult-batch/src/error/exceptions.py @@ -8,3 +8,7 @@ class DBException(MeDaCaException): class BatchOperationException(MeDaCaException): pass + + +class MaxRunCountReachedException(MeDaCaException): + pass diff --git a/ecs/jskult-batch/src/system_var/environment.py b/ecs/jskult-batch/src/system_var/environment.py index e70d8bb4..2d5d5f41 100644 --- a/ecs/jskult-batch/src/system_var/environment.py +++ b/ecs/jskult-batch/src/system_var/environment.py @@ -6,13 +6,24 @@ DB_PORT = int(os.environ['DB_PORT']) DB_USERNAME = os.environ['DB_USERNAME'] DB_PASSWORD = os.environ['DB_PASSWORD'] DB_SCHEMA = os.environ['DB_SCHEMA'] - -# 処理名 +JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET'] +BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID'] +POST_PROCESS = os.environ["POST_PROCESS"] +MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"] +RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"] +CSV_FILE_NAME = os.environ['CSV_FILE_NAME'] PROCESS_NAME = os.environ['PROCESS_NAME'] +JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET'] +JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET'] +JSK_DATA_SEND_FOLDER = os.environ['JSK_DATA_SEND_FOLDER'] # 初期値がある環境変数 LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') -DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) -DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) -DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) -DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) +DB_CONNECTION_MAX_RETRY_ATTEMPT = int( + os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) +DB_CONNECTION_RETRY_INTERVAL_INIT = int( + os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) +DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) +DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) diff --git a/ecs/jskult-batch/test.py b/ecs/jskult-batch/test.py new file mode 100644 index 00000000..e69de29b