diff --git a/ecs/jskult-batch-archive-jsk-data/src/aws/s3.py b/ecs/jskult-batch-archive-jsk-data/src/aws/s3.py index a6e0074a..3f40d8ef 100644 --- a/ecs/jskult-batch-archive-jsk-data/src/aws/s3.py +++ b/ecs/jskult-batch-archive-jsk-data/src/aws/s3.py @@ -1,8 +1,3 @@ -import gzip -import os -import os.path as path -import shutil -import tempfile import boto3 from src.system_var import environment @@ -12,7 +7,8 @@ class S3Client: _bucket_name: str def list_objects(self, bucket_name: str, folder_name: str): - response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) + response = self.__s3_client.list_objects_v2( + Bucket=bucket_name, Prefix=folder_name) if response['KeyCount'] == 0: return [] contents = response['Contents'] @@ -47,16 +43,19 @@ class S3Client: Key=file_key ) + class S3Bucket(): _s3_client = S3Client() _bucket_name: str = None -class JskultArchiveBucket(S3Bucket): + +class JskultArchiveBucket(S3Bucket): _bucket_name = environment.JSKULT_ARCHIVE_BUCKET def upload_archive_zip_file(self, archive_zip: str, archive_zip_path: str, send_folder: str): # S3バケットにファイルを移動 archive_zip_name = f'{send_folder}/{archive_zip}' s3_client = S3Client() - s3_client.upload_file(archive_zip_path, self._bucket_name, archive_zip_name) - return f"{self._bucket_name}/{archive_zip_name}" \ No newline at end of file + s3_client.upload_file( + archive_zip_path, self._bucket_name, archive_zip_name) + return f"{self._bucket_name}/{archive_zip_name}" diff --git a/ecs/jskult-batch-archive-jsk-data/src/batch/archive_jsk_data.py b/ecs/jskult-batch-archive-jsk-data/src/batch/archive_jsk_data.py index e02b3da8..3db4804b 100644 --- a/ecs/jskult-batch-archive-jsk-data/src/batch/archive_jsk_data.py +++ b/ecs/jskult-batch-archive-jsk-data/src/batch/archive_jsk_data.py @@ -1,15 +1,16 @@ -from src.logging.get_logger import get_logger -from src.batch.jskult_archive_manager import JskultArchiveManager -from src.aws.s3 import JskultArchiveBucket -import os.path as path -from datetime import timedelta -import tempfile import csv +import os.path as path +import tempfile import zipfile +from datetime import timedelta +from src.aws.s3 import JskultArchiveBucket +from src.batch.jskult_archive_manager import JskultArchiveManager +from src.logging.get_logger import get_logger logger = get_logger("実消化_過去データアーカイブ処理") + def exec(): """実消化_過去データアーカイブ処理""" try: @@ -21,25 +22,29 @@ def exec(): # 取得したレコード分繰り返す for jskult_archive_manage_data in jskult_archive_manage_data_list: # 対象テーブルで条件項目が条件年月以前のデータを取得 - archive_data = jskult_archive_manager.get_archive_data(jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"]) + archive_data = jskult_archive_manager.get_archive_data( + jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"]) # 取得データが0件の場合、スキップする if not archive_data: - logger.info(f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}") + logger.info( + f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}") continue # 一時フォルダ作成 - with tempfile.TemporaryDirectory() as temporary_dir: + with tempfile.TemporaryDirectory() as temporary_dir: # 取得したデータをCSVに出力 - day_after_prev_filter_date = jskult_archive_manage_data["prev_filter_date"] + timedelta(days=1) + day_after_prev_filter_date = jskult_archive_manage_data["prev_filter_date"] + timedelta( + days=1) file_name = f'{jskult_archive_manage_data["target_table"]}_{day_after_prev_filter_date.strftime('%Y%m%d')}_{jskult_archive_manage_data["filter_date"].strftime('%Y%m%d')}' csv_file_path = path.join(temporary_dir, f"{file_name}.csv") headers = archive_data[0].keys() with open(csv_file_path, 'w', newline='') as file: - writer = csv.DictWriter(file, fieldnames=headers, quoting=csv.QUOTE_ALL) + writer = csv.DictWriter( + file, fieldnames=headers, quoting=csv.QUOTE_ALL) writer.writeheader() writer.writerows(archive_data) logger.info(f"CSVファイル作成に成功しました。{file_name}.csv") - + # 作成したCSVをzip形式に圧縮 zip_file_path = path.join(temporary_dir, f"{file_name}.zip") with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf: @@ -48,16 +53,23 @@ def exec(): # 圧縮したCSVを保存先へアップロード archive_bucket = JskultArchiveBucket() - upload_file_path = archive_bucket.upload_archive_zip_file(f"{file_name}.zip", zip_file_path, jskult_archive_manage_data["archive_storage"]) + upload_file_path = archive_bucket.upload_archive_zip_file( + f"{file_name}.zip", zip_file_path, jskult_archive_manage_data["archive_storage"]) logger.info(f"{upload_file_path}へのアップロードに成功しました。") # アーカイブしたデータをDBから削除 - jskult_archive_manager.delete_archive_data(jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"]) - logger.info(f"アーカイブしたデータのDBから削除に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}") + jskult_archive_manager.delete_archive_data( + jskult_archive_manage_data["target_table"], + jskult_archive_manage_data["filter_column"], + jskult_archive_manage_data["filter_date"]) + logger.info( + f"アーカイブしたデータのDBから削除に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}") # 次回に向けてアーカイブ管理テーブルを更新する - jskult_archive_manager.update_archive_manage(jskult_archive_manage_data["target_table"]) - logger.info(f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}") + jskult_archive_manager.update_archive_manage( + jskult_archive_manage_data["target_table"]) + logger.info( + f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}") logger.info("処理終了:実消化_過去データアーカイブ処理") except Exception as e: - logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}") \ No newline at end of file + logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}") diff --git a/ecs/jskult-batch-archive-jsk-data/src/batch/jskult_archive_manager.py b/ecs/jskult-batch-archive-jsk-data/src/batch/jskult_archive_manager.py index 3d027aa9..9bb25341 100644 --- a/ecs/jskult-batch-archive-jsk-data/src/batch/jskult_archive_manager.py +++ b/ecs/jskult-batch-archive-jsk-data/src/batch/jskult_archive_manager.py @@ -1,9 +1,12 @@ from src.db.database import Database from src.logging.get_logger import get_logger + logger = get_logger("アーカイブ管理テーブル操作") + class JskultArchiveManager: - _db : Database = None + _db: Database = None + def __init__(self): self._db = Database.get_instance() @@ -18,7 +21,7 @@ class JskultArchiveManager: , filter_date , run_interval_months , prev_filter_date - , archive_storage + , archive_storage from internal07.jskult_archive_manage; """ @@ -72,7 +75,7 @@ class JskultArchiveManager: self._db.commit() logger.info("処理終了:delete_archive_data") return - except: + except Exception as e: self._db.rollback() logger.info("異常終了:delete_archive_data") raise @@ -84,10 +87,10 @@ class JskultArchiveManager: try: logger.info("処理開始:update_archive_manage") sql = f""" - update internal07.jskult_archive_manage + update internal07.jskult_archive_manage set prev_filter_date = filter_date - , filter_date = LAST_DAY( + , filter_date = LAST_DAY( DATE_ADD(filter_date, INTERVAL run_interval_months MONTH) ) , upd_user = CURRENT_USER() @@ -101,7 +104,7 @@ class JskultArchiveManager: self._db.commit() logger.info("処理終了:update_archive_manage") return - except: + except Exception as e: self._db.rollback() logger.info("異常終了:update_archive_manage") raise diff --git a/ecs/jskult-batch-archive-jsk-data/src/db/database.py b/ecs/jskult-batch-archive-jsk-data/src/db/database.py index 5ddaba4e..3f6ce8ea 100644 --- a/ecs/jskult-batch-archive-jsk-data/src/db/database.py +++ b/ecs/jskult-batch-archive-jsk-data/src/db/database.py @@ -1,11 +1,10 @@ from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, create_engine, text) from sqlalchemy.engine.url import URL -from tenacity import retry, stop_after_attempt, wait_exponential - from src.error.exceptions import DBException from src.logging.get_logger import get_logger from src.system_var import environment +from tenacity import retry, stop_after_attempt, wait_exponential logger = get_logger(__name__) @@ -57,7 +56,8 @@ class Database: poolclass=QueuePool ) - self.__autocommit_engine = self.__transactional_engine.execution_options(isolation_level='AUTOCOMMIT') + self.__autocommit_engine = self.__transactional_engine.execution_options( + isolation_level='AUTOCOMMIT') @classmethod def get_instance(cls, autocommit=False): @@ -120,10 +120,12 @@ class Database: try: # トランザクションが開始している場合は、トランザクションを引き継ぐ if self.__connection.in_transaction(): - result = self.__connection.execute(text(select_query), parameters) + result = self.__connection.execute( + text(select_query), parameters) else: # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 - result = self.__execute_with_transaction(select_query, parameters) + result = self.__execute_with_transaction( + select_query, parameters) except Exception as e: raise DBException(f'SQL Error: {e}') @@ -181,13 +183,14 @@ class Database: self.__connection = None def to_jst(self): - self.execute('SET time_zone = "+9:00"') + self.execute('SET time_zone = "+9:00"') def __execute_with_transaction(self, query: str, parameters: dict): # トランザクションを開始してクエリを実行する with self.__connection.begin(): try: - result = self.__connection.execute(text(query), parameters=parameters) + result = self.__connection.execute( + text(query), parameters=parameters) except Exception as e: self.__connection.rollback() raise e diff --git a/ecs/jskult-batch-archive-jsk-data/src/system_var/environment.py b/ecs/jskult-batch-archive-jsk-data/src/system_var/environment.py index 249aa4f0..41ac760f 100644 --- a/ecs/jskult-batch-archive-jsk-data/src/system_var/environment.py +++ b/ecs/jskult-batch-archive-jsk-data/src/system_var/environment.py @@ -12,7 +12,11 @@ JSKULT_ARCHIVE_BUCKET = os.environ['JSKULT_ARCHIVE_BUCKET'] # 初期値がある環境変数 LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') -DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) -DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) -DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) -DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) +DB_CONNECTION_MAX_RETRY_ATTEMPT = int( + os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) +DB_CONNECTION_RETRY_INTERVAL_INIT = int( + os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) +DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) +DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))