diff --git a/ecs/jskult-batch/.env.example b/ecs/jskult-batch/.env.example
index 500f843d..b5ac18f8 100644
--- a/ecs/jskult-batch/.env.example
+++ b/ecs/jskult-batch/.env.example
@@ -1,26 +1,24 @@
-DB_HOST=************
-DB_PORT=************
-DB_USERNAME=************
-DB_PASSWORD=************
+DB_HOST=*****************
+DB_PORT=*****************
+DB_USERNAME=*************
+DB_PASSWORD=*************
 DB_SCHEMA=src05
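+# S3 buckets and batch-run management (staging example values)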
+JSK_IO_BUCKET=mbj-newdwh2021-staging-jskult-io
+JSKULT_BACKUP_BUCKET=mbj-newdwh2021-staging-backup-jskult
+BATCH_MANAGE_DYNAMODB_TABLE_NAME=mbj-newdwh2021-staging-jskult-batch-run-manage
+BATCH_EXECUTION_ID=localtest
+MAX_RUN_COUNT=3
 LOG_LEVEL=INFO
-ULTMARC_DATA_BUCKET=****************
-ULTMARC_DATA_FOLDER=recv
-JSKULT_BACKUP_BUCKET=****************
-ULTMARC_BACKUP_FOLDER=ultmarc
-VJSK_BACKUP_FOLDER=vjsk
-JSKULT_CONFIG_BUCKET=**********************
-JSKULT_CONFIG_CALENDAR_FOLDER=jskult/calendar
-JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME=jskult_holiday_list.txt
-VJSK_DATA_SEND_FOLDER=send
-VJSK_DATA_RECEIVE_FOLDER=recv
-VJSK_DATA_BUCKET=*************
-JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME=jskult_wholesaler_stock_input_day_list.txt
-JSKULT_CONFIG_CONVERT_FOLDER=jskult/convert
-JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME=ultmarc_hex_convert_config.json
-# Extraction period for linked data
-SALES_LAUNDERING_EXTRACT_DATE_PERIOD=0
-# Table targeted by the sales refresh
-SALES_LAUNDERING_TARGET_TABLE_NAME=src05.sales_lau
-# Period (in years) of the data created by the wholesaler-results refresh
-SALES_LAUNDERING_TARGET_YEAR_OFFSET=5
+PROCESS_NAME=jskult-batch-dcf-inst-merge-io
+JSK_DATA_SEND_FOLDER=send
+JSK_BACKUP_FOLDER=jsk/send
+TRANSFER_RESULT_FOLDER=transfer_result
+TRANSFER_RESULT_FILE_NAME=transfer_result.json
+DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv
+JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
+
+# DB connection retry settings
+DB_CONNECTION_MAX_RETRY_ATTEMPT=1
+DB_CONNECTION_RETRY_INTERVAL_INIT=1
+DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1
+DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=1
diff --git a/ecs/jskult-batch/.vscode/launch.json b/ecs/jskult-batch/.vscode/launch.json
index bcd1c6dd..8bbb94a0 100644
--- a/ecs/jskult-batch/.vscode/launch.json
+++ b/ecs/jskult-batch/.vscode/launch.json
@@ -10,7 +10,8 @@
             "request": "launch",
             "program": "entrypoint.py",
             "console": "integratedTerminal",
-            "justMyCode": true
+            "justMyCode": true,
+            "envFile": "${workspaceFolder}/.env"
         }
     ]
 }
\ No newline at end of file
diff --git a/ecs/jskult-batch/src/aws/s3.py b/ecs/jskult-batch/src/aws/s3.py
index 6203868d..6e5755be 100644
--- a/ecs/jskult-batch/src/aws/s3.py
+++ b/ecs/jskult-batch/src/aws/s3.py
@@ -1,11 +1,7 @@
-import gzip
-import os
 import os.path as path
-import shutil
 import tempfile
 
 import boto3
-
 from src.system_var import environment
@@ -14,7 +10,8 @@ class S3Client:
     _bucket_name: str
 
     def list_objects(self, bucket_name: str, folder_name: str):
-        response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
+        response = self.__s3_client.list_objects_v2(
+            Bucket=bucket_name, Prefix=folder_name)
         if response['KeyCount'] == 0:
             return []
         contents = response['Contents']
@@ -55,61 +52,45 @@ class S3Bucket():
     _bucket_name: str = None
 
 
-class UltmarcBucket(S3Bucket):
-    _bucket_name = environment.ULTMARC_DATA_BUCKET
-    _folder = environment.ULTMARC_DATA_FOLDER
-
-    def list_dat_file(self):
-        return self._s3_client.list_objects(self._bucket_name, self._folder)
-
-    def download_dat_file(self, dat_filename: str):
-        # Save as a temporary file
-        temporary_dir = tempfile.mkdtemp()
-        temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}')
-        with open(temporary_file_path, mode='wb') as f:
-            self._s3_client.download_file(self._bucket_name, dat_filename, f)
-            f.seek(0)
-        return temporary_file_path
-
-    def backup_dat_file(self, dat_file_key: str, datetime_key: str):
-        # Copy to the backup bucket
-        ultmarc_backup_bucket = UltmarcBackupBucket()
-        backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
-        self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
-        # Delete the source file
-        self._s3_client.delete_file(self._bucket_name, dat_file_key)
-
-
 class ConfigBucket(S3Bucket):
+    # TODO: revise the contents when implementing the date-update process
     _bucket_name = environment.JSKULT_CONFIG_BUCKET
 
     def download_holiday_list(self):
         # Save as a temporary file
         temporary_dir = tempfile.mkdtemp()
-        temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
+        temporary_file_path = path.join(
+            temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
         holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
         with open(temporary_file_path, mode='wb') as f:
-            self._s3_client.download_file(self._bucket_name, holiday_list_key, f)
+            self._s3_client.download_file(
+                self._bucket_name, holiday_list_key, f)
             f.seek(0)
         return temporary_file_path
 
     def download_wholesaler_stock_input_day_list(self):
         # Save as a temporary file
         temporary_dir = tempfile.mkdtemp()
-        temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
-        wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
+        temporary_file_path = path.join(
+            temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
+        wholesaler_stock_input_day_list_key = \
+            f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
+
         with open(temporary_file_path, mode='wb') as f:
-            self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f)
+            self._s3_client.download_file(
+                self._bucket_name, wholesaler_stock_input_day_list_key, f)
             f.seek(0)
         return temporary_file_path
 
     def download_ultmarc_hex_convert_config(self):
         # Save as a temporary file
         temporary_dir = tempfile.mkdtemp()
-        temporary_file_path = path.join(temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME)
+        temporary_file_path = path.join(
+            temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME)
         hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}'
         with open(temporary_file_path, mode='wb') as f:
-            self._s3_client.download_file(self._bucket_name, hex_convert_config_key, f)
+            self._s3_client.download_file(
+                self._bucket_name, hex_convert_config_key, f)
             f.seek(0)
         return temporary_file_path
 
@@ -118,68 +99,42 @@ class JskUltBackupBucket(S3Bucket):
     _bucket_name = environment.JSKULT_BACKUP_BUCKET
 
 
-class UltmarcBackupBucket(JskUltBackupBucket):
-    _folder = environment.ULTMARC_BACKUP_FOLDER
+class JskBackupBucket(JskUltBackupBucket):
+    _folder = environment.JSK_BACKUP_FOLDER
 
 
-class VjskBackupBucket(JskUltBackupBucket):
-    _folder = environment.VJSK_BACKUP_FOLDER
+class JskTransferListBucket(JskUltBackupBucket):
+    _folder = environment.TRANSFER_RESULT_FOLDER
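+    # Transfer-result manifests are stored per business date: <_folder>/<yyyymmdd>/<file name>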
 
-class VjskReceiveBucket(S3Bucket):
-    _bucket_name = environment.VJSK_DATA_BUCKET
-    _recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER
-
-    _s3_file_list = None
-
-    def get_s3_file_list(self):
-        self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
-        return self._s3_file_list
-
-    def download_data_file(self, data_filename: str):
+    def download_transfer_result_file(self, process_date_yyyymmdd: str):
+        file_name = environment.TRANSFER_RESULT_FILE_NAME
+        # Save as a temporary file
         temporary_dir = tempfile.mkdtemp()
-        temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
+        temporary_file_path = path.join(
+            temporary_dir, file_name)
+        transfer_result_key = f'{self._folder}/{process_date_yyyymmdd}/{file_name}'
         with open(temporary_file_path, mode='wb') as f:
-            self._s3_client.download_file(self._bucket_name, data_filename, f)
+            self._s3_client.download_file(
+                self._bucket_name, transfer_result_key, f)
             f.seek(0)
         return temporary_file_path
 
-    def unzip_data_file(self, filename: str):
-        temp_dir = os.path.dirname(filename)
-        decompress_filename = os.path.basename(filename).replace('.gz', '')
-        decompress_file_path = os.path.join(temp_dir, decompress_filename)
-        with gzip.open(filename, 'rb') as gz:
-            with open(decompress_file_path, 'wb') as decompressed_file:
-                shutil.copyfileobj(gz, decompressed_file)
-        ret = [decompress_file_path]
-        return ret
-
-    def backup_dat_file(self, target_files: list, datetime_key: str):
-        jskult_backup_bucket = VjskBackupBucket()
-        for target_file in target_files:
-            backup_from_file_path = target_file.get("filename")
-            backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "")
-            backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
-            self._s3_client.copy(self._bucket_name, backup_from_file_path,
-                                 jskult_backup_bucket._bucket_name, backup_key)
-            self._s3_client.delete_file(self._bucket_name, backup_from_file_path)
-
-
-class VjskSendBucket(S3Bucket):
-    _bucket_name = environment.VJSK_DATA_BUCKET
-    _send_folder = environment.VJSK_DATA_SEND_FOLDER
+class JskSendBucket(S3Bucket):
+    _bucket_name = environment.JSK_IO_BUCKET
+    _send_folder = environment.JSK_DATA_SEND_FOLDER
 
-    def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
+    def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str):
         # Move the file to the S3 bucket
-        csv_file_name = f'{self._send_folder}/{vjsk_create_csv}'
-        s3_client = S3Client()
-        s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
+        csv_file_name = f'{self._send_folder}/{jskult_create_csv}'
+        self._s3_client.upload_file(
+            csv_file_path, self._bucket_name, csv_file_name)
         return
 
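+    # Copy the sent CSV into the backup bucket; the source object in the send folder is left in place.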
-    def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str):
+    def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str):
         # Copy to the backup bucket
-        vjsk_backup_bucket = VjskBackupBucket()
+        jskult_backup_bucket = JskBackupBucket()
         dat_key = f'{self._send_folder}/{dat_file_key}'
-        backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
-        self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key)
+        backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
+        self._s3_client.copy(self._bucket_name, dat_key,
+                             jskult_backup_bucket._bucket_name, backup_key)
diff --git a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py
index 710380c9..ad311d42 100644
--- a/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py
+++ b/ecs/jskult-batch/src/batch/dcf_inst_merge_io.py
@@ -1,4 +1,20 @@
+import csv
+import json
+import os.path as path
+import tempfile
+
+from src.aws.s3 import JskSendBucket, JskTransferListBucket
 from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
+from src.db.database import Database
+from src.error.exceptions import (BatchOperationException,
+                                  MaxRunCountReachedException)
+from src.logging.get_logger import get_logger
+from src.manager.jskult_batch_run_manager import JskultBatchRunManager
+from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
+from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
+from src.system_var import environment
+
+logger = get_logger('DCF delete/new master creation')
 
 
 class DcfInstMergeIO(JskultBatchEntrypoint):
@@ -6,5 +22,279 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
         super().__init__()
 
     def execute(self):
-        # TODO: run the DCF delete/new master creation and data output here
-        pass
+        jskult_hdke_tbl_manager = JskultHdkeTblManager()
+        jskult_batch_run_manager = JskultBatchRunManager(
+            environment.BATCH_EXECUTION_ID)
+        if not jskult_hdke_tbl_manager.can_run_process():
+            logger.error(
+                'Aborting DCF delete/new master creation: the daily batch is running or the dump did not finish normally.')
+            # Record the run as "failed" in the batch-run management table
+            jskult_batch_run_manager.batch_failed()
+            return
+
+        # Get the business date
+        _, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses()
+
+        # Fetch the transfer-result list and derive the received-file count
+        try:
+            transfer_list_bucket = JskTransferListBucket()
+            transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
+                process_date)
+        except Exception as e:
+            logger.exception(f'Failed to fetch the transfer-result list. {e}')
+            # Record the run as "failed" in the batch-run management table
+            jskult_batch_run_manager.batch_failed()
+            return
+
+        with open(transfer_list_file_path) as f:
+            transfer_list = json.load(f)
+
+        # The received-file count is the sum of the JSK (actual-consumption) and Ultmarc transfer counts
+        receive_file_count = len(
+            transfer_list['jsk_transfer_list']) + len(transfer_list['ult_transfer_list'])
+
+        jskult_batch_status_manager = JskultBatchStatusManager(
+            environment.PROCESS_NAME,
+            # TODO: replace with the constant created for ticket NEWDWH2021-1847
+            'post_process',
+            environment.MAX_RUN_COUNT,
+            receive_file_count
+        )
+        try:
+            jskult_batch_status_manager.set_process_status("start")
+            try:
+                if not jskult_batch_status_manager.can_run_post_process():
+                    # Launch conditions for the post-process are not met:
+                    # set the process status to "waiting"
+                    jskult_batch_status_manager.set_process_status("waiting")
+
+                    # Record the run as "retry" in the batch-run management table
+                    jskult_batch_run_manager.batch_retry()
+
+                    return
+            except MaxRunCountReachedException:
+                logger.info('Max run count reached; running DCF delete/new master creation anyway.')
+
+            jskult_batch_status_manager.set_process_status("doing")
+
+            # Build the DCF institution delete/new master only when the Ultmarc import has run
+            if jskult_batch_status_manager.is_done_ultmarc_import():
+                # Register rows from COM_INST into the DCF delete/new master
+                (is_add_dcf_inst_merge,
+                 duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst()
+                if is_add_dcf_inst_merge:
+                    self._output_add_dcf_inst_merge_log(
+                        duplication_inst_records)
+
+            # Write the CSV
+            dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
+            file_path = self._make_csv_data(
+                environment.DCF_INST_MERGE_SEND_FILE_NAME,
+                dcf_inst_merge_all_records)
+
+            # Upload the CSV to S3
+            self._upload_dcf_inst_merge_csv_file(
+                environment.DCF_INST_MERGE_SEND_FILE_NAME, process_date, file_path)
+
+            # Record the run as "success" once everything finishes normally
+            logger.info("DCF delete/new master creation finished normally.")
+            jskult_batch_run_manager.batch_success()
+            jskult_batch_status_manager.set_process_status("done")
+
+            return
+
+        except Exception as e:
+            # Record the run as "failed" when any error occurs
+            logger.exception(f'Unexpected error; aborting DCF delete/new master creation. {e}')
+            jskult_batch_run_manager.batch_failed()
+            jskult_batch_status_manager.set_process_status("failed")
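+
+    # Each DB helper below opens its own connection and always disconnects in a finally block.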
+    def _select_dcf_inst_merge_all(self) -> list[dict]:
+        try:
+            self._db = Database.get_instance()
+            self._db.connect()
+            sql = """\
+                SELECT
+                    *
+                FROM
+                    src07.dcf_inst_merge
+            """
+            dcf_inst_merge_all_records = self._db.execute_select(sql)
+            return dcf_inst_merge_all_records
+        except Exception as e:
+            raise BatchOperationException(e)
+        finally:
+            self._db.disconnect()
+
+    # Insert rows from com_inst into dcf_inst_merge
+    def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
+        try:
+            self._db = Database.get_instance()
+            self._db.connect()
+            self._db.begin()
+            self._db.to_jst()
+            sql = """\
+                SELECT
+                    ci.DCF_DSF_INST_CD,
+                    ci.FORM_INST_NAME_KANJI,
+                    ci.DELETE_SCHE_REASON_CD,
+                    ci.DUP_OPP_CD,
+                    ci.SYS_UPDATE_DATE
+                FROM
+                    src05.COM_INST AS ci
+                WHERE
+                    ci.DUP_OPP_CD IS NOT NULL
+                AND
+                    ci.DELETE_SCHE_REASON_CD = 'D'
+                AND
+                    ci.DELETE_DATA IS NULL
+                AND
+                    ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
+                AND
+                    NOT EXISTS (
+                        SELECT
+                            dim.DCF_INST_CD
+                        FROM
+                            src07.DCF_INST_MERGE AS dim
+                        WHERE
+                            dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
+                    )
+                AND
+                    (
+                        EXISTS (
+                            SELECT
+                                mia.INST_CD
+                            FROM
+                                src07.MST_INST_ASSN AS mia
+                            WHERE
+                                mia.INST_CD = ci.DCF_DSF_INST_CD
+                        )
+                        OR EXISTS (
+                            SELECT
+                                ap.PRSB_INST_CD
+                            FROM
+                                src07.ATC_PHARM AS ap
+                            WHERE
+                                ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
+                        )
+                        OR EXISTS (
+                            SELECT
+                                trd.INST_CD
+                            FROM
+                                src07.TRN_RESULT_DATA AS trd
+                            WHERE
+                                trd.INST_CD = ci.DCF_DSF_INST_CD
+                        )
+                    )
+                ;
+            """
+            duplication_inst_records = self._db.execute_select(sql)
+            if not duplication_inst_records:
+                # Nothing to register; skip the insert
+                return (False, [])
+            # Load into the DCF delete/new master
+            values_clauses = []
+            params = {}
+            for clauses_no, row in enumerate(duplication_inst_records, start=1):
+                dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
+                dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
+                # START_MONTH: the month after the processing date (YYYYMM format is assumed here)
+                values_clause = f"""(:{dcf_inst_cd_arr},
+                    :{dup_opp_cd_arr},
+                    DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH), '%Y%m'),
+                    NULL,
+                    NULL,
+                    NULL,
+                    'Y',
+                    'batchuser',
+                    SYSDATE(),
+                    'batchuser',
+                    SYSDATE()
+                    )"""
+                values_clauses.append(values_clause)
+                params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
+                params[dup_opp_cd_arr] = row['DUP_OPP_CD']
+            insert_sql = f"""
+                INSERT INTO
+                    src07.dcf_inst_merge (
+                        DCF_INST_CD,
+                        DUP_OPP_CD,
+                        START_MONTH,
+                        INVALID_FLG,
+                        REMARKS,
+                        DCF_INST_CD_NEW,
+                        ENABLED_FLG,
+                        CREATER,
+                        CREATE_DATE,
+                        UPDATER,
+                        UPDATE_DATE
+                    )
+                VALUES
+                    {','.join(values_clauses)}
+            """
+            self._db.execute(insert_sql, params)
+            self._db.commit()
+            return (True, duplication_inst_records)
+        except Exception as e:
+            self._db.rollback()
+            raise BatchOperationException(e)
+        finally:
+            self._db.disconnect()
+
+    def _output_add_dcf_inst_merge_log(self, duplication_inst_records: list[dict]):
+        sys_update_date = duplication_inst_records[0]['SYS_UPDATE_DATE']
+        set_year_month = '{set_year}-{set_month}'.format(
+            set_year=sys_update_date[0:4],
+            set_month=sys_update_date[-2:]
+        )
+        # Only columns returned by the SELECT in _insert_dcf_inst_merge_from_com_inst are available here
+        add_dct_inst_merge = 'DCF institution code {DCF_DSF_INST_CD} {FORM_INST_NAME_KANJI},  counterpart code on duplication {DUP_OPP_CD}'
+        add_dct_inst_merge_list = []
+        for row in duplication_inst_records:
+            add_dct_inst_merge_list.append(
+                add_dct_inst_merge.format(**row))
+        add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
+        # Logged for the customer report
+        logger.info(
+            f"""DCF institution merge master rows were added.
+**********************************************************
+Applicable month {set_year_month}
+**********************************************************
+{add_dct_inst_merge_list}
+**********************************************************
+Total {len(duplication_inst_records)} records"""
+        )
+
+        return
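+
+    # CSV layout: unquoted header row, then data rows with every field double-quoted (UTF-8, CRLF).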
+    def _make_csv_data(self, csv_file_name: str, record_inst: list):
+        temporary_dir = tempfile.mkdtemp()
+        csv_file_path = path.join(temporary_dir, csv_file_name)
+        head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',
+                    'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG',
+                    'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE']
+        with open(csv_file_path, mode='w', encoding='UTF-8', newline='') as csv_file:
+            # Write the header row (write() instead of writerow() so the header fields are not quoted)
+            csv_file.write(f"{','.join(head_str)}\r\n")
+            # Written as UTF-8, CRLF, with every field quoted
+            writer = csv.writer(csv_file, delimiter=',', lineterminator='\r\n',
+                                quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
+                                strict=True
+                                )
+            # Write the data rows (institutions)
+            for record_inst_data in record_inst:
+                record_inst_value = list(record_inst_data.values())
+                csv_data = [
+                    '' if n is None else n for n in record_inst_value]
+                writer.writerow(csv_data)
+
+        return csv_file_path
+
+    def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, process_date: str, csv_file_path: str):
+        jsk_send_bucket = JskSendBucket()
+
+        # Upload the file to the S3 bucket
+        jsk_send_bucket.upload_dcf_inst_merge_csv_file(
+            csv_file_name, csv_file_path)
+
+        # Back up the CSV file
+        jsk_send_bucket.backup_dcf_inst_merge_csv_file(
+            csv_file_name, process_date)
+
+        return
diff --git a/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py b/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py
index 291a8d1f..47f34952 100644
--- a/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py
+++ b/ecs/jskult-batch/src/batch/jskult_batch_entrypoint.py
@@ -3,6 +3,6 @@ import abc
 
 class JskultBatchEntrypoint(metaclass=abc.ABCMeta):
 
-    @abc.abstractmethod()
+    @abc.abstractmethod
     def execute(self):
         pass
diff --git a/ecs/jskult-batch/src/error/exceptions.py b/ecs/jskult-batch/src/error/exceptions.py
index 055c24f6..aa5f9be6 100644
--- a/ecs/jskult-batch/src/error/exceptions.py
+++ b/ecs/jskult-batch/src/error/exceptions.py
@@ -8,3 +8,7 @@ class DBException(MeDaCaException):
 
 class BatchOperationException(MeDaCaException):
     pass
+
+
+class MaxRunCountReachedException(MeDaCaException):
+    pass
diff --git a/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py b/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py
index 4a804ef3..f6c8a9f0 100644
--- a/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py
+++ b/ecs/jskult-batch/src/manager/jskult_hdke_tbl_manager.py
@@ -113,7 +113,7 @@ class JskultHdkeTblManager:
         finally:
             self._db.disconnect()
         # If the daily batch is not running, skip the subsequent processing
-        if batch_processing_flag != constants.BATCH_ACTF_BATCH_IN_PROCESSING:
+        if batch_processing_flag != constants.BATCH_ACTF_BATCH_START:
             return False
         # If the dump did not finish normally, skip the subsequent processing
         if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
diff --git a/ecs/jskult-batch/src/system_var/constants.py b/ecs/jskult-batch/src/system_var/constants.py
index 8a0ccbb3..10cd7fb8 100644
--- a/ecs/jskult-batch/src/system_var/constants.py
+++ b/ecs/jskult-batch/src/system_var/constants.py
@@ -4,7 +4,7 @@ BATCH_EXIT_CODE_SUCCESS = 0
 # Batch-in-progress flag: unprocessed
 BATCH_ACTF_BATCH_UNPROCESSED = '0'
 # Batch-in-progress flag: in progress
-BATCH_ACTF_BATCH_IN_PROCESSING = '1'
+BATCH_ACTF_BATCH_START = '1'
 # Dump status kubun: unprocessed
 DUMP_STATUS_KBN_UNPROCESSED = '0'
 # Dump status kubun: dump finished normally
diff --git a/ecs/jskult-batch/src/system_var/environment.py b/ecs/jskult-batch/src/system_var/environment.py
index e70d8bb4..4e220ba0 100644
--- a/ecs/jskult-batch/src/system_var/environment.py
+++ b/ecs/jskult-batch/src/system_var/environment.py
@@ -7,12 +7,26 @@ DB_USERNAME = os.environ['DB_USERNAME']
 DB_PASSWORD = os.environ['DB_PASSWORD']
 DB_SCHEMA = os.environ['DB_SCHEMA']
 
-# Process name
+# AWS
+JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
+BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID']
+MAX_RUN_COUNT = int(os.environ['MAX_RUN_COUNT'])
+TRANSFER_RESULT_FOLDER = os.environ['TRANSFER_RESULT_FOLDER']
+TRANSFER_RESULT_FILE_NAME = os.environ['TRANSFER_RESULT_FILE_NAME']
+DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME']
 PROCESS_NAME = os.environ['PROCESS_NAME']
+JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
+JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET']
+JSK_BACKUP_FOLDER = os.environ['JSK_BACKUP_FOLDER']
+JSK_DATA_SEND_FOLDER = os.environ['JSK_DATA_SEND_FOLDER']
 
 # Environment variables with defaults
 LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
-DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
-DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
-DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
-DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))
+DB_CONNECTION_MAX_RETRY_ATTEMPT = int(
+    os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
+DB_CONNECTION_RETRY_INTERVAL_INIT = int(
+    os.environ.get('DB_CONNECTION_RETRY_INTERVAL_INIT', 5))
+DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(
+    os.environ.get('DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS', 5))
+DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(
+    os.environ.get('DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS', 50))
diff --git a/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env b/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env
new file mode 100644
index 00000000..8469e66f
--- /dev/null
+++ b/s3/config/jskult/task_settings/dcf_inst_merge_io_task_settings.env
@@ -0,0 +1,13 @@
+# task environment file.
+LOG_LEVEL=INFO
+PROCESS_NAME=jskult-batch-dcf-inst-merge-io
+JSK_DATA_SEND_FOLDER=send
+JSK_BACKUP_FOLDER=jsk/send
+TRANSFER_RESULT_FOLDER=transfer_result
+TRANSFER_RESULT_FILE_NAME=transfer_result.json
+DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv
+JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
+DB_CONNECTION_MAX_RETRY_ATTEMPT=1
+DB_CONNECTION_RETRY_INTERVAL_INIT=1
+DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1
+DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=1
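+# The retry values above override the defaults read in src/system_var/environment.py.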