Merge pull request #496 feature-NEWDWH2021-1916 into develop

This commit is contained in:
朝倉 明日香 2025-06-06 18:29:55 +09:00
commit 2091623df3
3 changed files with 92 additions and 49 deletions

View File

@ -55,18 +55,17 @@ def exec():
batch_status_manager.set_process_status(constants.PROCESS_STATUS_ERROR)
return constants.BATCH_EXIT_CODE_SUCCESS
# アルトマーク取込が正常終了していればバッチステータスを処理済に変更
# DCF/DSFデータ作成でエラーになっても、バッチ処理としては完了したと判断する。
batch_status_manager.set_process_status(constants.PROCESS_STATUS_DONE)
try:
logger.info('実消化用DCF/DSFデータ作成処理起動')
output_dcf_dsf_data.exec()
logger.info('実消化用DCF/DSFデータ作成処理終了')
except BatchOperationException as e:
logger.exception(f'実消化用施設DCF/DSF作成処理エラー異常終了{e}')
# バッチステータスをエラーに変更
batch_status_manager.set_process_status(constants.PROCESS_STATUS_ERROR)
return constants.BATCH_EXIT_CODE_SUCCESS
# バッチステータスを処理済に変更
batch_status_manager.set_process_status(constants.PROCESS_STATUS_DONE)
logger.info('アルトマーク取込/データ出力:終了')
return constants.BATCH_EXIT_CODE_SUCCESS

View File

@ -1,11 +1,7 @@
import gzip
import json
import os
import os.path as path
import shutil
import tempfile
import boto3
from src.system_var import environment
@ -20,7 +16,7 @@ class S3Client:
return []
contents = response['Contents']
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']}
objects = [content['Key']
for content in contents if not content['Key'].endswith('/')]
return objects
@ -60,46 +56,26 @@ class JskIOBucket(S3Bucket):
self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(
temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f)
f.seek(0)
return temporary_file_path
def unzip_data_file(self, filename: str):
temp_dir = os.path.dirname(filename)
decompress_filename = os.path.basename(filename).replace('.gz', '')
decompress_file_path = os.path.join(temp_dir, decompress_filename)
with gzip.open(filename, 'rb') as gz:
with open(decompress_file_path, 'wb') as decompressed_file:
shutil.copyfileobj(gz, decompressed_file)
ret = [decompress_file_path]
return ret
def transfer_file_to_import(self, target_file: dict):
def transfer_file_to_import(self, target_file: str):
data_import_bucket = DataImportBucket()
transfer_from_file_path = target_file.get("filename")
transfer_from_file_path = target_file
transfer_to_filename = transfer_from_file_path.replace(
f"{self._recv_folder}/", "")
data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}'
self._s3_client.copy(self._bucket_name, transfer_from_file_path,
data_import_bucket._bucket_name, data_import_key)
def backup_file(self, target_file: dict, datetime_key: str):
def backup_file(self, target_file: str, datetime_key: str):
jsk_backup_bucket = JskBackupBucket()
backup_from_file_path = target_file.get("filename")
backup_from_file_path = target_file
backup_to_filename = backup_from_file_path.replace(
f"{self._recv_folder}/", "")
backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
jsk_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: dict):
delete_path = target_file.get("filename")
def delete_file(self, target_file: str):
delete_path = target_file
self._s3_client.delete_file(
self._bucket_name, delete_path)
@ -116,16 +92,16 @@ class UltmarcBucket(S3Bucket):
def get_file_list(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def backup_file(self, target_file: dict, datetime_key: str):
def backup_file(self, target_file: str, datetime_key: str):
# バックアップバケットにコピー
ultmarc_backup_bucket = UltmarcBackupBucket()
target_file_path = target_file.get("filename")
target_file_path = target_file
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{target_file_path.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, target_file_path,
ultmarc_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: dict):
delete_path = target_file.get("filename")
def delete_file(self, target_file: str):
delete_path = target_file
self._s3_client.delete_file(
self._bucket_name, delete_path)
@ -134,8 +110,8 @@ class UltmarcImportBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_IMPORT_FOLDER
def transfer_file_to_import(self, target_file: dict):
from_file_path = target_file.get("filename")
def transfer_file_to_import(self, target_file: str):
from_file_path = target_file
to_filename = from_file_path.replace(
f"{UltmarcBucket()._folder}/", "")
data_import_key = f'{self._folder}/{to_filename}'

View File

@ -1,5 +1,9 @@
"""実消化&アルトマーク データ転送処理"""
import itertools
import os
import re
from src.aws.s3 import (JskIOBucket, TransferResultOutputBucket, UltmarcBucket,
UltmarcImportBucket)
from src.error.exceptions import BatchOperationException
@ -32,12 +36,12 @@ def exec():
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_START:
logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。')
logger.error('日次バッチ処理中のため、実消化&アルトマークデータ転送を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。')
logger.error('dump取得が正常終了していないため、実消化&アルトマークデータ転送を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(f'処理日={syor_date}')
@ -54,10 +58,45 @@ def exec():
jsk_receive_file_list = None
try:
jsk_io_bucket = JskIOBucket()
jsk_receive_file_list: str = jsk_io_bucket.get_file_list()
jsk_receive_file_list: list[str] = jsk_io_bucket.get_file_list()
except Exception as e:
logger.exception(f'実消化データリスト取得に失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 実消化データリストの中で、ファイル種類(ファイル名のプレフィックス)が重複するものがあるかどうかをチェックする。
# 1) プレフィックスごとにマップを作り、該当するファイル名をリストで集める
# 以下のようなマップが作られる
# {
# "TRN_RESULT_DATA": ["TRN_RESULT_DATA_20250606102030.zip",
# "TRN_RESULT_DATA_20250606112030.zip"],
# "TRN_Recive_Inventry": ["TRN_Recive_Inventry_20250606102030.zip"],
# ...
# }
prefix_map: dict[str, list[str]] = {}
for filename in jsk_receive_file_list:
p = extract_prefix(filename)
prefix_map.setdefault(p, []).append(filename)
# 2) 重複しているプレフィックスを探す
duplicates = {prefix: file_list for prefix,
file_list in prefix_map.items() if len(file_list) > 1}
# 3) 重複があれば転送一覧から除外する
if duplicates:
# マップをフラットなリストに変換する
duplicate_files = list(
itertools.chain.from_iterable(duplicates.values()))
logger.warning(
f'W-1 実消化データの中で一部重複データがあります。重複データは転送から除外します。重複データ一覧: {duplicate_files}')
# 転送しなかったファイルもバックアップに移動させる
for filename in duplicate_files:
jsk_io_bucket.backup_file(filename, syor_date)
jsk_io_bucket.delete_file(filename)
# S3内のファイル数と重複ファイルの差集合を取ることで、要素を削除
jsk_receive_file_list = list(
set(jsk_receive_file_list) - set(duplicate_files))
logger.info(f'I-4 実消化データリスト取得終了。取得データ一覧:{jsk_receive_file_list}')
# ④ 取得した実消化データのリストでループ開始
@ -72,7 +111,7 @@ def exec():
# ⑧ 転送が完了したファイル名を転送データリストに追加する
# ファイル名のみ切り出して追加
transfer_file_lists['jsk_transfer_list'].append(
receive_file['filename'].split('/')[1])
receive_file.split('/')[1])
# ⑨ ループ終了後、実消化データ転送終了ログI-6)を出力する
logger.info(f'I-6 実消化データ転送処理終了')
@ -86,6 +125,17 @@ def exec():
except Exception as e:
logger.exception(f'アルトマークデータリスト取得に失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# アルトマークデータは1件以上送られてくるのが想定外のため、1件より多かったら連携から除外する
if len(ultmarc_receive_file_list) > 1:
logger.warning(
f'W-2 アルトマークデータが複数配置されているため、転送から除外します。重複データ一覧: {ultmarc_receive_file_list}')
# 転送しなかった場合でもバックアップに移動させる
for filename in ultmarc_receive_file_list:
ultmarc_bucket.backup_file(filename, syor_date)
ultmarc_bucket.delete_file(filename)
# 連携しないようにするため、リストを0件に書き換える。
ultmarc_receive_file_list = []
logger.info(
f'I-8 アルトマークデータリスト取得終了。取得データ一覧:{ultmarc_receive_file_list}')
@ -102,12 +152,13 @@ def exec():
# ⑮ 転送が完了したファイル名を転送データリストに追加する
# ファイル名のみ切り出して追加
transfer_file_lists['ult_transfer_list'].append(
receive_file['filename'].split('/')[1])
receive_file.split('/')[1])
# ⑯ ループ終了後、アルトマークデータ転送終了ログI-10)を出力する
logger.info(f'I-6 実消化データ転送処理終了')
logger.info(f'I-10 アルトマークデータ転送処理終了')
# ⑰ 転送データリストをJSONファイル化し、S3バケットにアップロードする
logger.info(f'I-11 データ転送結果アップロード')
TransferResultOutputBucket().put_transfer_result(transfer_file_lists, syor_date)
# ⑱ 処理終了ログ(I-12)を出力する
@ -118,3 +169,20 @@ def exec():
except Exception as e:
logger.exception(f'実消化&アルトマーク データ転送処理中に想定外のエラーが発生しました {e}')
raise e
def extract_prefix(filename: str) -> str:
"""
ファイル名からタイムスタンプ部分 + 拡張子を除いたプレフィックスを返す
: "TRN_RESULT_DATA_20250606102030.zip" -> "TRN_RESULT_DATA"
"""
# 拡張子を取り除く
file, ext = os.path.splitext(os.path.basename(filename))
# ファイル名の末尾がタイムスタンプ数字14桁である想定なので、最後の '_' 以降を削除
# TRN_RESULT_DATA_20250606102030 -> ["TRN_RESULT_DATA", "20250606102030"]
parts = file.rsplit('_', 1)
if len(parts) == 2 and re.fullmatch(r"\d{14}", parts[1]):
return parts[0]
else:
# 「最後がタイムスタンプじゃない」場合、そのままのファイル名全体を返す
return filename