diff --git a/ecs/jskult-batch-ultmarc-io/src/main.py b/ecs/jskult-batch-ultmarc-io/src/main.py index cd952ddb..fac189ab 100644 --- a/ecs/jskult-batch-ultmarc-io/src/main.py +++ b/ecs/jskult-batch-ultmarc-io/src/main.py @@ -55,18 +55,17 @@ def exec(): batch_status_manager.set_process_status(constants.PROCESS_STATUS_ERROR) return constants.BATCH_EXIT_CODE_SUCCESS + # アルトマーク取込が正常終了していればバッチステータスを処理済に変更 + # DCF/DSFデータ作成でエラーになっても、バッチ処理としては完了したと判断する。 + batch_status_manager.set_process_status(constants.PROCESS_STATUS_DONE) + try: logger.info('実消化用DCF/DSFデータ作成処理:起動') output_dcf_dsf_data.exec() logger.info('実消化用DCF/DSFデータ作成処理:終了') except BatchOperationException as e: logger.exception(f'実消化用施設DCF/DSF作成処理エラー(異常終了){e}') - # バッチステータスをエラーに変更 - batch_status_manager.set_process_status(constants.PROCESS_STATUS_ERROR) return constants.BATCH_EXIT_CODE_SUCCESS - # バッチステータスを処理済に変更 - batch_status_manager.set_process_status(constants.PROCESS_STATUS_DONE) - logger.info('アルトマーク取込/データ出力:終了') return constants.BATCH_EXIT_CODE_SUCCESS diff --git a/ecs/jskult-transfer-receive-file/src/aws/s3.py b/ecs/jskult-transfer-receive-file/src/aws/s3.py index 015992b4..a726c1c3 100644 --- a/ecs/jskult-transfer-receive-file/src/aws/s3.py +++ b/ecs/jskult-transfer-receive-file/src/aws/s3.py @@ -1,11 +1,7 @@ -import gzip import json -import os -import os.path as path -import shutil -import tempfile import boto3 + from src.system_var import environment @@ -20,7 +16,7 @@ class S3Client: return [] contents = response['Contents'] # 末尾がスラッシュで終わるものはフォルダとみなしてスキップする - objects = [{'filename': content['Key'], 'size': content['Size']} + objects = [content['Key'] for content in contents if not content['Key'].endswith('/')] return objects @@ -60,46 +56,26 @@ class JskIOBucket(S3Bucket): self._bucket_name, self._recv_folder) return self._s3_file_list - def download_data_file(self, data_filename: str): - temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join( - temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}') - with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, data_filename, f) - f.seek(0) - return temporary_file_path - - def unzip_data_file(self, filename: str): - temp_dir = os.path.dirname(filename) - decompress_filename = os.path.basename(filename).replace('.gz', '') - decompress_file_path = os.path.join(temp_dir, decompress_filename) - with gzip.open(filename, 'rb') as gz: - with open(decompress_file_path, 'wb') as decompressed_file: - shutil.copyfileobj(gz, decompressed_file) - - ret = [decompress_file_path] - return ret - - def transfer_file_to_import(self, target_file: dict): + def transfer_file_to_import(self, target_file: str): data_import_bucket = DataImportBucket() - transfer_from_file_path = target_file.get("filename") + transfer_from_file_path = target_file transfer_to_filename = transfer_from_file_path.replace( f"{self._recv_folder}/", "") data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}' self._s3_client.copy(self._bucket_name, transfer_from_file_path, data_import_bucket._bucket_name, data_import_key) - def backup_file(self, target_file: dict, datetime_key: str): + def backup_file(self, target_file: str, datetime_key: str): jsk_backup_bucket = JskBackupBucket() - backup_from_file_path = target_file.get("filename") + backup_from_file_path = target_file backup_to_filename = backup_from_file_path.replace( f"{self._recv_folder}/", "") backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}' self._s3_client.copy(self._bucket_name, backup_from_file_path, jsk_backup_bucket._bucket_name, backup_key) - def delete_file(self, target_file: dict): - delete_path = target_file.get("filename") + def delete_file(self, target_file: str): + delete_path = target_file self._s3_client.delete_file( self._bucket_name, delete_path) @@ -116,16 +92,16 @@ class UltmarcBucket(S3Bucket): def get_file_list(self): return self._s3_client.list_objects(self._bucket_name, self._folder) - def backup_file(self, target_file: dict, datetime_key: str): + def backup_file(self, target_file: str, datetime_key: str): # バックアップバケットにコピー ultmarc_backup_bucket = UltmarcBackupBucket() - target_file_path = target_file.get("filename") + target_file_path = target_file backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{target_file_path.replace(f"{self._folder}/", "")}' self._s3_client.copy(self._bucket_name, target_file_path, ultmarc_backup_bucket._bucket_name, backup_key) - def delete_file(self, target_file: dict): - delete_path = target_file.get("filename") + def delete_file(self, target_file: str): + delete_path = target_file self._s3_client.delete_file( self._bucket_name, delete_path) @@ -134,8 +110,8 @@ class UltmarcImportBucket(S3Bucket): _bucket_name = environment.ULTMARC_DATA_BUCKET _folder = environment.ULTMARC_IMPORT_FOLDER - def transfer_file_to_import(self, target_file: dict): - from_file_path = target_file.get("filename") + def transfer_file_to_import(self, target_file: str): + from_file_path = target_file to_filename = from_file_path.replace( f"{UltmarcBucket()._folder}/", "") data_import_key = f'{self._folder}/{to_filename}' diff --git a/ecs/jskult-transfer-receive-file/src/main.py b/ecs/jskult-transfer-receive-file/src/main.py index 55a5f847..180500ba 100644 --- a/ecs/jskult-transfer-receive-file/src/main.py +++ b/ecs/jskult-transfer-receive-file/src/main.py @@ -1,5 +1,9 @@ """実消化&アルトマーク データ転送処理""" +import itertools +import os +import re + from src.aws.s3 import (JskIOBucket, TransferResultOutputBucket, UltmarcBucket, UltmarcImportBucket) from src.error.exceptions import BatchOperationException @@ -32,12 +36,12 @@ def exec(): # 日次バッチ処理中の場合、後続の処理は行わない if batch_processing_flag == constants.BATCH_ACTF_BATCH_START: - logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。') + logger.error('日次バッチ処理中のため、実消化&アルトマークデータ転送を終了します。') return constants.BATCH_EXIT_CODE_SUCCESS # dump取得が正常終了していない場合、後続の処理は行わない if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE: - logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。') + logger.error('dump取得が正常終了していないため、実消化&アルトマークデータ転送を終了します。') return constants.BATCH_EXIT_CODE_SUCCESS logger.info(f'処理日={syor_date}') @@ -54,10 +58,45 @@ def exec(): jsk_receive_file_list = None try: jsk_io_bucket = JskIOBucket() - jsk_receive_file_list: str = jsk_io_bucket.get_file_list() + jsk_receive_file_list: list[str] = jsk_io_bucket.get_file_list() except Exception as e: logger.exception(f'実消化データリスト取得に失敗しました。{e}') return constants.BATCH_EXIT_CODE_SUCCESS + + # 実消化データリストの中で、ファイル種類(ファイル名のプレフィックス)が重複するものがあるかどうかをチェックする。 + # 1) プレフィックスごとにマップを作り、該当するファイル名をリストで集める + # 以下のようなマップが作られる + # { + # "TRN_RESULT_DATA": ["TRN_RESULT_DATA_20250606102030.zip", + # "TRN_RESULT_DATA_20250606112030.zip"], + # "TRN_Recive_Inventry": ["TRN_Recive_Inventry_20250606102030.zip"], + # ... + # } + prefix_map: dict[str, list[str]] = {} + for filename in jsk_receive_file_list: + p = extract_prefix(filename) + prefix_map.setdefault(p, []).append(filename) + + # 2) 重複しているプレフィックスを探す + duplicates = {prefix: file_list for prefix, + file_list in prefix_map.items() if len(file_list) > 1} + + # 3) 重複があれば転送一覧から除外する + if duplicates: + # マップをフラットなリストに変換する + duplicate_files = list( + itertools.chain.from_iterable(duplicates.values())) + logger.warning( + f'W-1 実消化データの中で一部重複データがあります。重複データは転送から除外します。重複データ一覧: {duplicate_files}') + # 転送しなかったファイルもバックアップに移動させる + for filename in duplicate_files: + jsk_io_bucket.backup_file(filename, syor_date) + jsk_io_bucket.delete_file(filename) + + # S3内のファイル数と重複ファイルの差集合を取ることで、要素を削除 + jsk_receive_file_list = list( + set(jsk_receive_file_list) - set(duplicate_files)) + logger.info(f'I-4 実消化データリスト取得終了。取得データ一覧:{jsk_receive_file_list}') # ④ 取得した実消化データのリストでループ開始 @@ -72,7 +111,7 @@ def exec(): # ⑧ 転送が完了したファイル名を転送データリストに追加する # ファイル名のみ切り出して追加 transfer_file_lists['jsk_transfer_list'].append( - receive_file['filename'].split('/')[1]) + receive_file.split('/')[1]) # ⑨ ループ終了後、実消化データ転送終了ログ(I-6)を出力する logger.info(f'I-6 実消化データ転送処理終了') @@ -86,6 +125,17 @@ def exec(): except Exception as e: logger.exception(f'アルトマークデータリスト取得に失敗しました。{e}') return constants.BATCH_EXIT_CODE_SUCCESS + + # アルトマークデータは1件以上送られてくるのが想定外のため、1件より多かったら連携から除外する + if len(ultmarc_receive_file_list) > 1: + logger.warning( + f'W-2 アルトマークデータが複数配置されているため、転送から除外します。重複データ一覧: {ultmarc_receive_file_list}') + # 転送しなかった場合でもバックアップに移動させる + for filename in ultmarc_receive_file_list: + ultmarc_bucket.backup_file(filename, syor_date) + ultmarc_bucket.delete_file(filename) + # 連携しないようにするため、リストを0件に書き換える。 + ultmarc_receive_file_list = [] logger.info( f'I-8 アルトマークデータリスト取得終了。取得データ一覧:{ultmarc_receive_file_list}') @@ -102,12 +152,13 @@ def exec(): # ⑮ 転送が完了したファイル名を転送データリストに追加する # ファイル名のみ切り出して追加 transfer_file_lists['ult_transfer_list'].append( - receive_file['filename'].split('/')[1]) + receive_file.split('/')[1]) # ⑯ ループ終了後、アルトマークデータ転送終了ログ(I-10)を出力する - logger.info(f'I-6 実消化データ転送処理終了') + logger.info(f'I-10 アルトマークデータ転送処理終了') # ⑰ 転送データリストをJSONファイル化し、S3バケットにアップロードする + logger.info(f'I-11 データ転送結果アップロード') TransferResultOutputBucket().put_transfer_result(transfer_file_lists, syor_date) # ⑱ 処理終了ログ(I-12)を出力する @@ -118,3 +169,20 @@ def exec(): except Exception as e: logger.exception(f'実消化&アルトマーク データ転送処理中に想定外のエラーが発生しました {e}') raise e + + +def extract_prefix(filename: str) -> str: + """ + ファイル名から『タイムスタンプ部分 + 拡張子』を除いたプレフィックスを返す。 + 例: "TRN_RESULT_DATA_20250606102030.zip" -> "TRN_RESULT_DATA" + """ + # 拡張子を取り除く + file, ext = os.path.splitext(os.path.basename(filename)) + # ファイル名の末尾がタイムスタンプ(数字14桁)である想定なので、最後の '_' 以降を削除 + # TRN_RESULT_DATA_20250606102030 -> ["TRN_RESULT_DATA", "20250606102030"] + parts = file.rsplit('_', 1) + if len(parts) == 2 and re.fullmatch(r"\d{14}", parts[1]): + return parts[0] + else: + # 「最後がタイムスタンプじゃない」場合、そのままのファイル名全体を返す + return filename