不足ファイルの比較と受領予定にないファイルの比較以外の部分の実装とアーカイブ関連の取り込み

This commit is contained in:
mori.k 2025-06-03 17:46:41 +09:00
parent 2783da3328
commit e8253f27da
7 changed files with 392 additions and 15 deletions

View File

@ -1,5 +1,9 @@
import os.path as path
import tempfile
import shutil
import os
import os.path as path
import gzip
import boto3
from src.system_var import environment
@ -138,3 +142,71 @@ class JskSendBucket(S3Bucket):
backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key,
jskult_backup_bucket._bucket_name, backup_key)
class JskIOBucket(S3Bucket):
    """S3 bucket used for JSK file exchange (receive side).

    Wraps the bucket named by ``JSK_IO_BUCKET`` and provides helpers to
    list, download, decompress, hand off, back up and delete files found
    under the receive folder.
    """
    _bucket_name = environment.JSK_IO_BUCKET
    _recv_folder = environment.JSK_RECEIVE_FOLDER
    # Cache of the most recent listing returned by get_file_list().
    _s3_file_list = None

    def get_file_list(self):
        """Return (and cache) the object list under the receive folder."""
        self._s3_file_list = self._s3_client.list_objects(
            self._bucket_name, self._recv_folder)
        return self._s3_file_list

    def download_data_file(self, data_filename: str):
        """Download *data_filename* into a fresh temp directory.

        Returns the local file path. The caller is responsible for
        cleaning up the temporary directory.

        NOTE(review): the open file object is passed to ``download_file``
        — assumes the project S3 client accepts a writable file object;
        confirm against ``S3Client``.
        """
        temporary_dir = tempfile.mkdtemp()
        temporary_file_path = path.join(
            temporary_dir, data_filename.replace(f"{self._recv_folder}/", ""))
        with open(temporary_file_path, mode='wb') as f:
            self._s3_client.download_file(self._bucket_name, data_filename, f)
            f.seek(0)
        return temporary_file_path

    def unzip_data_file(self, filename: str):
        """Decompress a ``.gz`` file next to the original.

        Returns a one-element list containing the decompressed file path.
        Only a trailing ``.gz`` suffix is stripped; the previous
        ``replace('.gz', '')`` removed every occurrence and corrupted
        names such as ``a.gz.b.gz``.
        """
        temp_dir = os.path.dirname(filename)
        basename = os.path.basename(filename)
        # Strip only the final ".gz" extension, not every occurrence.
        if basename.endswith('.gz'):
            decompress_filename = basename[:-len('.gz')]
        else:
            decompress_filename = basename
        decompress_file_path = os.path.join(temp_dir, decompress_filename)
        with gzip.open(filename, 'rb') as gz:
            with open(decompress_file_path, 'wb') as decompressed_file:
                shutil.copyfileobj(gz, decompressed_file)
        return [decompress_file_path]

    def transfer_file_to_import(self, target_file: dict):
        """Copy *target_file* ("filename" key) into the data-import bucket,
        rewriting the receive-folder prefix to the import folder."""
        data_import_bucket = DataImportBucket()
        transfer_from_file_path = target_file.get("filename")
        transfer_to_filename = transfer_from_file_path.replace(
            f"{self._recv_folder}/", "")
        data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}'
        self._s3_client.copy(self._bucket_name, transfer_from_file_path,
                             data_import_bucket._bucket_name, data_import_key)

    def backup_file(self, target_file: dict, datetime_key: str):
        """Copy *target_file* into the backup bucket under *datetime_key*."""
        jsk_backup_bucket = JskBackupBucket()
        backup_from_file_path = target_file.get("filename")
        backup_to_filename = backup_from_file_path.replace(
            f"{self._recv_folder}/", "")
        backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
        self._s3_client.copy(self._bucket_name, backup_from_file_path,
                             jsk_backup_bucket._bucket_name, backup_key)

    def delete_file(self, target_file: dict):
        """Delete *target_file* ("filename" key) from this bucket."""
        delete_path = target_file.get("filename")
        self._s3_client.delete_file(
            self._bucket_name, delete_path)
class JskultArchiveBucket(S3Bucket):
    """S3 bucket that stores archived (zipped) JSK historical data."""
    _bucket_name = environment.JSKULT_ARCHIVE_BUCKET

    def upload_archive_zip_file(self, archive_zip: str, archive_zip_path: str, send_folder: str):
        """Upload a local zip archive to this bucket.

        Args:
            archive_zip: object file name (e.g. ``table_from_to.zip``).
            archive_zip_path: local path of the zip to upload.
            send_folder: destination key prefix inside the bucket.

        Returns:
            ``"<bucket>/<key>"`` of the uploaded object.
        """
        # Move the file into the S3 bucket.
        archive_zip_name = f'{send_folder}/{archive_zip}'
        # Use the shared client from the S3Bucket base, consistent with the
        # other bucket classes, instead of constructing a new S3Client per
        # call as the previous implementation did.
        self._s3_client.upload_file(
            archive_zip_path, self._bucket_name, archive_zip_name)
        return f"{self._bucket_name}/{archive_zip_name}"
class DataImportBucket(S3Bucket):
    """S3 bucket that receives files handed off for the data-import step."""
    # Target bucket and folder, configured via environment variables.
    _bucket_name = environment.DATA_IMPORT_BUCKET
    _folder = environment.DATA_IMPORT_FOLDER

View File

@ -0,0 +1,75 @@
import csv
import os.path as path
import tempfile
import zipfile
from datetime import timedelta
from src.aws.s3 import JskultArchiveBucket
from src.manager.jskult_archive_manager import JskultArchiveManager
from src.logging.get_logger import get_logger
# Module-level logger for the JSK historical-data archive batch.
logger = get_logger("実消化_過去データアーカイブ処理")
def exec():
    """JSK historical-data archive batch ("実消化_過去データアーカイブ処理").

    For every row in the archive-management table: export rows older than
    the cut-off date to CSV, zip the CSV, upload the zip to S3, delete the
    archived rows from the DB, then advance the management record for the
    next run. Any exception is logged and swallowed at this top level.
    """
    try:
        logger.info("処理開始:実消化_過去データアーカイブ処理")
        jskult_archive_manager = JskultArchiveManager()
        # Fetch target table, filter column, cut-off date, interval (months),
        # previous cut-off date and storage destination per record.
        jskult_archive_manage_data_list = jskult_archive_manager.get_archive_manage()
        for jskult_archive_manage_data in jskult_archive_manage_data_list:
            # Rows whose filter column is on or before the cut-off date.
            archive_data = jskult_archive_manager.get_archive_data(
                jskult_archive_manage_data["target_table"],
                jskult_archive_manage_data["filter_column"],
                jskult_archive_manage_data["filter_date"])
            # Nothing to archive for this table — skip it.
            if not archive_data:
                logger.info(
                    f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
                continue
            with tempfile.TemporaryDirectory() as temporary_dir:
                # File name covers the day after the previous cut-off date
                # through the current cut-off date.
                day_after_prev_filter_date = jskult_archive_manage_data["prev_filter_date"] + timedelta(
                    days=1)
                # Pre-format the dates: nesting the same quote character
                # inside an f-string is a SyntaxError before Python 3.12.
                range_start = day_after_prev_filter_date.strftime("%Y%m%d")
                range_end = jskult_archive_manage_data["filter_date"].strftime("%Y%m%d")
                file_name = f'{jskult_archive_manage_data["target_table"]}_{range_start}_{range_end}'
                csv_file_path = path.join(temporary_dir, f"{file_name}.csv")
                # NOTE(review): no explicit encoding — output depends on the
                # locale default; confirm whether UTF-8 should be forced.
                headers = archive_data[0].keys()
                with open(csv_file_path, 'w', newline='') as file:
                    writer = csv.DictWriter(
                        file, fieldnames=headers, quoting=csv.QUOTE_ALL)
                    writer.writeheader()
                    writer.writerows(archive_data)
                logger.info(f"CSVファイル作成に成功しました。{file_name}.csv")
                # Compress the CSV into a zip archive.
                zip_file_path = path.join(temporary_dir, f"{file_name}.zip")
                with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    # arcname keeps the temporary directory path out of the
                    # archive; previously the full temp path was stored.
                    zipf.write(csv_file_path, arcname=f"{file_name}.csv")
                logger.info(f"zip形式への圧縮に成功しました。{file_name}.zip")
                # Upload the compressed CSV to the storage destination.
                archive_bucket = JskultArchiveBucket()
                upload_file_path = archive_bucket.upload_archive_zip_file(
                    f"{file_name}.zip", zip_file_path,
                    jskult_archive_manage_data["archive_storage"])
                logger.info(f"{upload_file_path}へのアップロードに成功しました。")
                # Delete the archived rows from the database.
                jskult_archive_manager.delete_archive_data(
                    jskult_archive_manage_data["target_table"],
                    jskult_archive_manage_data["filter_column"],
                    jskult_archive_manage_data["filter_date"])
                logger.info(
                    f"アーカイブしたデータのDBから削除に成功しました。対象テーブル{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
                # Advance the management record for the next run.
                jskult_archive_manager.update_archive_manage(
                    jskult_archive_manage_data["target_table"])
                logger.info(
                    f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}")
        logger.info("処理終了:実消化_過去データアーカイブ処理")
    except Exception as e:
        logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}")

View File

@ -3,7 +3,7 @@ from src.batch.environment.jskult_batch_environment import \
from src.system_var import environment
class TrnResultDataBioLotEnvironment(JskultBatchEnvironment):
class UpdateBusinessDayEnvironment(JskultBatchEnvironment):
"""実消化&アルトマークのバッチ処理で使用する環境変数を管理するクラス"""
def __init__(self):
@ -14,6 +14,9 @@ class TrnResultDataBioLotEnvironment(JskultBatchEnvironment):
self.PROCESS_NAME = environment.PROCESS_NAME
self.TRANSFER_RESULT_FOLDER = environment.TRANSFER_RESULT_FOLDER
self.TRANSFER_RESULT_FILE_NAME = environment.TRANSFER_RESULT_FILE_NAME
self.JSK_RECEIVE_FOLDER = environment.JSK_RECEIVE_FOLDER
self.DATA_IMPORT_BUCKET = environment.DATA_IMPORT_BUCKET
self.DATA_IMPORT_FOLDER = environment.DATA_IMPORT_FOLDER
def validate(self):
"""
@ -23,13 +26,18 @@ class TrnResultDataBioLotEnvironment(JskultBatchEnvironment):
Raises:
EnvironmentVariableNotSetException: 環境変数の設定ミス
"""
super()._assert_variable_not_empty(self.JSKULT_BACKUP_BUCKET, 'JSKULT_BACKUP_BUCKET')
super()._assert_variable_not_empty(self.BATCH_MANAGE_DYNAMODB_TABLE_NAME, 'BATCH_MANAGE_DYNAMODB_TABLE_NAME')
super()._assert_variable_not_empty(
self.JSKULT_BACKUP_BUCKET, 'JSKULT_BACKUP_BUCKET')
super()._assert_variable_not_empty(
self.BATCH_MANAGE_DYNAMODB_TABLE_NAME, 'BATCH_MANAGE_DYNAMODB_TABLE_NAME')
super()._assert_variable_not_empty(self.BATCH_EXECUTION_ID, 'BATCH_EXECUTION_ID')
super()._assert_variable_is_int(self.MAX_RUN_COUNT, 'MAX_RUN_COUNT')
# MAX_RUN_COUNTは数値として扱うため、検査後に変換
self.MAX_RUN_COUNT = int(self.MAX_RUN_COUNT)
super()._assert_variable_not_empty(self.PROCESS_NAME, 'PROCESS_NAME')
super()._assert_variable_not_empty(self.TRANSFER_RESULT_FOLDER, 'TRANSFER_RESULT_FOLDER')
super()._assert_variable_not_empty(self.TRANSFER_RESULT_FILE_NAME, 'TRANSFER_RESULT_FILE_NAME')
super()._assert_variable_not_empty(
self.TRANSFER_RESULT_FOLDER, 'TRANSFER_RESULT_FOLDER')
super()._assert_variable_not_empty(
self.TRANSFER_RESULT_FILE_NAME, 'TRANSFER_RESULT_FILE_NAME')
super()._assert_variable_not_empty(self.DATA_IMPORT_BUCKET, 'DATA_IMPORT_BUCKET')
super()._assert_variable_not_empty(self.DATA_IMPORT_FOLDER, 'DATA_IMPORT_FOLDER')

View File

@ -1,20 +1,23 @@
import json
from datetime import datetime
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.aws.s3 import JskTransferListBucket
from src.batch.environment.trn_result_data_bio_lot_environment import \
UpdateBusinessDayEnvironment
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.aws.s3 import (JskTransferListBucket, JskIOBucket)
from src.batch.environment.update_business_day_environment import \
UpdateBusinessDayEnvironment
from src.batch import archive_jsk_data
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.db.database import Database
from src.error.exceptions import (BatchOperationException,
from src.manager.jskult_archive_manager import JskultArchiveManager
from src.error.exceptions import (NotReadyException,
EnvironmentVariableNotSetException,
MaxRunCountReachedException)
)
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('日付テーブル更新')
class UpdateBusinessDay(JskultBatchEntrypoint):
def __init__(self):
super().__init__()
@ -28,5 +31,105 @@ class UpdateBusinessDay(JskultBatchEntrypoint):
return
def execute(self):
    """Business-day table update batch ("日付テーブル更新").

    Checks run preconditions, runs the JSK data-archive batch when the
    archive date matches the processing date, verifies the ultmarc import
    on Mondays, compares the transfer list with the received files, and
    records batch status throughout.
    """
    logger.info('I-1 処理開始: 実消化アルトマーク_日付テーブル更新')
    jskult_hdke_tbl_manager = JskultHdkeTblManager()
    jskult_archive_manager = JskultArchiveManager()
    jskult_batch_status_manager = JskultBatchStatusManager(
        self.environment.PROCESS_NAME,
        # TODO: replace with the constant introduced by ticket NEWDWH2021-1847
        'update_business_day',
        self.environment.MAX_RUN_COUNT,
        0
    )
    jskult_batch_run_manager = JskultBatchRunManager(
        self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME,
        self.environment.BATCH_EXECUTION_ID)
    jskult_batch_status_manager.set_process_status(
        constants.PROCESS_STATUS_START)
    if not jskult_hdke_tbl_manager.can_run_process():
        logger.error(
            '日次バッチ処理中でない、もしくはdump取得が正常終了していないため、日次バッチ処理を異常終了します。')
        # Record the run as failed in the batch-run management table.
        jskult_batch_run_manager.batch_failed()
        raise NotReadyException()
    if not jskult_batch_status_manager.can_run_business_day_update():
        # Start conditions not met yet: mark "waiting" and register a retry.
        jskult_batch_status_manager.set_process_status(
            constants.PROCESS_STATUS_WAITING)
        jskult_batch_run_manager.batch_retry()
        logger.info('処理待実消化アルトマーク_日付テーブル更新')
        return
    # NOTE(review): get_archive_manage() returns a result set of rows;
    # unpacking six scalars assumes a single fixed-shape row — confirm
    # against JskultArchiveManager.
    _, _, archive_date, _, _, _ = jskult_archive_manager.get_archive_manage()
    _, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses()
    try:
        if archive_date == process_date:
            logger.info('[NOTICE]実消化データアーカイブ取得処理を実行します。')
            # Mark the process as running.
            jskult_batch_status_manager.set_process_status(
                constants.PROCESS_STATUS_DOING)
            # archive_jsk_data.exec() performs the whole archive flow
            # (fetch, export, upload, delete). The previous bare
            # get_archive_data() call here passed no arguments and would
            # raise TypeError, so it was removed.
            archive_jsk_data.exec()
        dt = datetime.strptime(process_date, "%Y/%m/%d")
        # If the processing date in the date table is a Monday, the
        # ultmarc import is expected to have run.
        if dt.weekday() == 0:
            # Instance call: is_done_ultmarc_import was invoked on the
            # class, unlike every other status-manager call here.
            if not jskult_batch_status_manager.is_done_ultmarc_import():
                logger.info('[NOTICE]アルトマーク取込稼働日でしたが、アルトマーク取込が行われませんでした。')
        # Fetch the transfer-file list for the processing date.
        try:
            transfer_list_bucket = JskTransferListBucket()
            transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
                process_date)
        except Exception as e:
            logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
            # Record the run as failed in the batch-run management table.
            jskult_batch_run_manager.batch_failed()
            # Without the list file we cannot continue: the path below
            # would be unbound (previously this fell through to a
            # NameError on transfer_list_file_path).
            return
        with open(transfer_list_file_path) as f:
            transfer_list = json.load(f)
        try:
            jsk_io_bucket = JskIOBucket()
            jsk_receive_file_list = jsk_io_bucket.get_file_list()
        except Exception as e:
            logger.exception(f'実消化データリスト取得に失敗しました。{e}')
            return constants.BATCH_EXIT_CODE_SUCCESS
        if transfer_list is not None:
            # TODO: log (I-5) when files expected in transfer_list are
            # missing from jsk_receive_file_list.
            # TODO: log (I-6) when jsk_receive_file_list contains files
            # not present in transfer_list.
            pass
        # Instance calls: these were invoked on the classes, unlike the
        # sibling batch_failed()/batch_retry() instance calls above.
        jskult_hdke_tbl_manager.update_batch_process_complete()
        # Mark the process as done.
        jskult_batch_status_manager.set_process_status(
            constants.PROCESS_STATUS_DONE)
        jskult_batch_run_manager.batch_success()
        logger.info('[NOTICE]処理終了: 実消化アルトマーク_日付テーブル更新')
    except Exception as e:
        # On any unexpected error: register "failed" in the batch-run
        # management table and set the process status to "error".
        jskult_batch_run_manager.batch_failed()
        jskult_batch_status_manager.set_process_status(
            constants.PROCESS_STATUS_ERROR)
        logger.exception(f'異常終了実消化アルトマーク_日付テーブル更新。{e}')

View File

@ -16,3 +16,6 @@ class MaxRunCountReachedException(MeDaCaException):
class EnvironmentVariableNotSetException(MeDaCaException):
    """Raised when a required environment variable is missing or misconfigured."""
    pass
class NotReadyException(MeDaCaException):
    """Raised when a batch's run preconditions are not satisfied."""
    pass

View File

@ -0,0 +1,112 @@
from src.db.database import Database
from src.logging.get_logger import get_logger
# Module-level logger for archive-management table operations.
logger = get_logger("アーカイブ管理テーブル操作")
class JskultArchiveManager:
    """DB access for the archive-management table and archive target tables.

    Every method opens a connection, runs its statement(s) and always
    disconnects; write operations run inside an explicit transaction with
    rollback on failure.
    """
    _db: Database = None

    def __init__(self):
        self._db = Database.get_instance()

    def get_archive_manage(self):
        """Return all archive-management rows: target table, filter column,
        cut-off date, run interval (months), previous cut-off date and
        storage destination."""
        try:
            logger.info("処理開始get_archive_manage")
            sql = """
                select
                    target_table
                    , filter_column
                    , filter_date
                    , run_interval_months
                    , prev_filter_date
                    , archive_storage
                from
                    internal07.jskult_archive_manage;
            """
            self._db.connect()
            jskult_archive_manage_data = self._db.execute_select(sql)
            logger.info("処理終了get_archive_manage")
            return jskult_archive_manage_data
        except Exception:
            # Log with traceback (was a bare info line), then re-raise.
            logger.exception("異常終了get_archive_manage")
            raise
        finally:
            self._db.disconnect()

    def get_archive_data(self, target_table: str, filter_column: str, filter_date: str):
        """Return every row of ``src07.<target_table>`` whose
        *filter_column* (YYYYMMDD text) is on or before *filter_date*."""
        try:
            logger.info("処理開始get_archive_data")
            # NOTE(review): table/column identifiers cannot be bound
            # parameters, so they are interpolated directly. Both values
            # come from the internal archive-management table (see
            # get_archive_manage), not user input — keep it that way.
            sql = f"""
                select
                    *
                from
                    src07.{target_table}
                where
                    str_to_date({filter_column},'%Y%m%d') <= :filter_date;
            """
            self._db.connect()
            parameter_dict = {'filter_date': filter_date}
            target_table_data = self._db.execute_select(sql, parameter_dict)
            logger.info("処理終了get_archive_data")
            return target_table_data
        except Exception:
            logger.exception("異常終了get_archive_data")
            raise
        finally:
            self._db.disconnect()

    def delete_archive_data(self, target_table: str, filter_column: str, filter_date: str):
        """Delete the archived rows of ``src07.<target_table>`` whose
        *filter_column* (YYYYMMDD text) is on or before *filter_date*."""
        try:
            logger.info("処理開始delete_archive_data")
            # NOTE(review): identifiers interpolated as in get_archive_data —
            # values originate from the internal management table only.
            sql = f"""
                delete from
                    src07.{target_table}
                where
                    str_to_date({filter_column},'%Y%m%d') <= :filter_date;
            """
            self._db.connect()
            self._db.begin()
            parameter_dict = {'filter_date': filter_date}
            self._db.execute(sql, parameter_dict)
            self._db.commit()
            logger.info("処理終了delete_archive_data")
            return
        except Exception:
            self._db.rollback()
            logger.exception("異常終了delete_archive_data")
            raise
        finally:
            self._db.disconnect()

    def update_archive_manage(self, target_table: str):
        """Advance the management row for *target_table*: move the current
        cut-off date into prev_filter_date and recompute filter_date by the
        configured interval."""
        try:
            logger.info("処理開始update_archive_manage")
            # Bind target_table as a parameter, consistent with the other
            # methods; the previous f-string interpolation of the value was
            # an injection-prone inconsistency.
            sql = """
                update internal07.jskult_archive_manage
                set
                    prev_filter_date = filter_date
                    , filter_date = LAST_DAY(
                        DATE_ADD(filter_date, INTERVAL run_interval_months MONTH)
                    )
                    , upd_user = CURRENT_USER()
                    , upd_date = NOW()
                where
                    target_table = :target_table;
            """
            self._db.connect()
            self._db.begin()
            self._db.execute(sql, {'target_table': target_table})
            self._db.commit()
            logger.info("処理終了update_archive_manage")
            return
        except Exception:
            self._db.rollback()
            logger.exception("異常終了update_archive_manage")
            raise
        finally:
            self._db.disconnect()

View File

@ -22,8 +22,12 @@ JSKULT_BACKUP_BUCKET = os.environ.get('JSKULT_BACKUP_BUCKET', None)
# S3 bucket / folder names used for JSK file exchange.
JSK_IO_BUCKET = os.environ.get('JSK_IO_BUCKET', None)
JSK_BACKUP_FOLDER = os.environ.get('JSK_BACKUP_FOLDER', None)
JSK_DATA_SEND_FOLDER = os.environ.get('JSK_DATA_SEND_FOLDER', None)
JSK_RECEIVE_FOLDER = os.environ.get('JSK_RECEIVE_FOLDER', None)
# DynamoDB table used for batch run management.
BATCH_MANAGE_DYNAMODB_TABLE_NAME = os.environ.get(
    'BATCH_MANAGE_DYNAMODB_TABLE_NAME', None)
# Destination bucket/folder for the data-import hand-off and the archive bucket.
DATA_IMPORT_BUCKET = os.environ.get('DATA_IMPORT_BUCKET', None)
DATA_IMPORT_FOLDER = os.environ.get('DATA_IMPORT_FOLDER', None)
JSKULT_ARCHIVE_BUCKET = os.environ.get('JSKULT_ARCHIVE_BUCKET', None)
# 初期値がある環境変数