Merge branch 'feature-NEWDWH2021-1902' of nds-tyo.git.backlog.com:/NEWDWH2021/newsdwh2021 into feature-NEWDWH2021-1902

This commit is contained in:
yono 2025-05-28 10:39:37 +09:00
commit ca36186e6f
5 changed files with 65 additions and 44 deletions

View File

@ -1,8 +1,3 @@
import gzip
import os
import os.path as path
import shutil
import tempfile
import boto3
from src.system_var import environment
@ -12,7 +7,8 @@ class S3Client:
_bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
response = self.__s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0:
return []
contents = response['Contents']
@ -47,16 +43,19 @@ class S3Client:
Key=file_key
)
class S3Bucket():
_s3_client = S3Client()
_bucket_name: str = None
class JskultArchiveBucket(S3Bucket):
class JskultArchiveBucket(S3Bucket):
_bucket_name = environment.JSKULT_ARCHIVE_BUCKET
def upload_archive_zip_file(self, archive_zip: str, archive_zip_path: str, send_folder: str):
# S3バケットにファイルを移動
archive_zip_name = f'{send_folder}/{archive_zip}'
s3_client = S3Client()
s3_client.upload_file(archive_zip_path, self._bucket_name, archive_zip_name)
return f"{self._bucket_name}/{archive_zip_name}"
s3_client.upload_file(
archive_zip_path, self._bucket_name, archive_zip_name)
return f"{self._bucket_name}/{archive_zip_name}"

View File

@ -1,15 +1,16 @@
from src.logging.get_logger import get_logger
from src.batch.jskult_archive_manager import JskultArchiveManager
from src.aws.s3 import JskultArchiveBucket
import os.path as path
from datetime import timedelta
import tempfile
import csv
import os.path as path
import tempfile
import zipfile
from datetime import timedelta
from src.aws.s3 import JskultArchiveBucket
from src.batch.jskult_archive_manager import JskultArchiveManager
from src.logging.get_logger import get_logger
logger = get_logger("実消化_過去データアーカイブ処理")
def exec():
"""実消化_過去データアーカイブ処理"""
try:
@ -21,25 +22,29 @@ def exec():
# 取得したレコード分繰り返す
for jskult_archive_manage_data in jskult_archive_manage_data_list:
# 対象テーブルで条件項目が条件年月以前のデータを取得
archive_data = jskult_archive_manager.get_archive_data(jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"])
archive_data = jskult_archive_manager.get_archive_data(
jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"])
# 取得データが0件の場合、スキップする
if not archive_data:
logger.info(f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
logger.info(
f"アーカイブ対象データがありませんでした。対象テーブル:{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
continue
# 一時フォルダ作成
with tempfile.TemporaryDirectory() as temporary_dir:
with tempfile.TemporaryDirectory() as temporary_dir:
# 取得したデータをCSVに出力
day_after_prev_filter_date = jskult_archive_manage_data["prev_filter_date"] + timedelta(days=1)
day_after_prev_filter_date = jskult_archive_manage_data["prev_filter_date"] + timedelta(
days=1)
file_name = f'{jskult_archive_manage_data["target_table"]}_{day_after_prev_filter_date.strftime('%Y%m%d')}_{jskult_archive_manage_data["filter_date"].strftime('%Y%m%d')}'
csv_file_path = path.join(temporary_dir, f"{file_name}.csv")
headers = archive_data[0].keys()
with open(csv_file_path, 'w', newline='') as file:
writer = csv.DictWriter(file, fieldnames=headers, quoting=csv.QUOTE_ALL)
writer = csv.DictWriter(
file, fieldnames=headers, quoting=csv.QUOTE_ALL)
writer.writeheader()
writer.writerows(archive_data)
logger.info(f"CSVファイル作成に成功しました。{file_name}.csv")
# 作成したCSVをzip形式に圧縮
zip_file_path = path.join(temporary_dir, f"{file_name}.zip")
with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
@ -48,16 +53,23 @@ def exec():
# 圧縮したCSVを保存先へアップロード
archive_bucket = JskultArchiveBucket()
upload_file_path = archive_bucket.upload_archive_zip_file(f"{file_name}.zip", zip_file_path, jskult_archive_manage_data["archive_storage"])
upload_file_path = archive_bucket.upload_archive_zip_file(
f"{file_name}.zip", zip_file_path, jskult_archive_manage_data["archive_storage"])
logger.info(f"{upload_file_path}へのアップロードに成功しました。")
# アーカイブしたデータをDBから削除
jskult_archive_manager.delete_archive_data(jskult_archive_manage_data["target_table"], jskult_archive_manage_data["filter_column"], jskult_archive_manage_data["filter_date"])
logger.info(f"アーカイブしたデータのDBから削除に成功しました。対象テーブル{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
jskult_archive_manager.delete_archive_data(
jskult_archive_manage_data["target_table"],
jskult_archive_manage_data["filter_column"],
jskult_archive_manage_data["filter_date"])
logger.info(
f"アーカイブしたデータのDBから削除に成功しました。対象テーブル{jskult_archive_manage_data['target_table']} 条件年月:{jskult_archive_manage_data['filter_date']}")
# 次回に向けてアーカイブ管理テーブルを更新する
jskult_archive_manager.update_archive_manage(jskult_archive_manage_data["target_table"])
logger.info(f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}")
jskult_archive_manager.update_archive_manage(
jskult_archive_manage_data["target_table"])
logger.info(
f"アーカイブ管理テーブルの更新に成功しました。対象テーブル:{jskult_archive_manage_data['target_table']}")
logger.info("処理終了:実消化_過去データアーカイブ処理")
except Exception as e:
logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}")
logger.exception(f"異常終了:実消化_過去データアーカイブ処理 {e}")

View File

@ -1,9 +1,12 @@
from src.db.database import Database
from src.logging.get_logger import get_logger
logger = get_logger("アーカイブ管理テーブル操作")
class JskultArchiveManager:
_db : Database = None
_db: Database = None
def __init__(self):
self._db = Database.get_instance()
@ -18,7 +21,7 @@ class JskultArchiveManager:
, filter_date
, run_interval_months
, prev_filter_date
, archive_storage
, archive_storage
from
internal07.jskult_archive_manage;
"""
@ -72,7 +75,7 @@ class JskultArchiveManager:
self._db.commit()
logger.info("処理終了delete_archive_data")
return
except:
except Exception as e:
self._db.rollback()
logger.info("異常終了delete_archive_data")
raise
@ -84,10 +87,10 @@ class JskultArchiveManager:
try:
logger.info("処理開始update_archive_manage")
sql = f"""
update internal07.jskult_archive_manage
update internal07.jskult_archive_manage
set
prev_filter_date = filter_date
, filter_date = LAST_DAY(
, filter_date = LAST_DAY(
DATE_ADD(filter_date, INTERVAL run_interval_months MONTH)
)
, upd_user = CURRENT_USER()
@ -101,7 +104,7 @@ class JskultArchiveManager:
self._db.commit()
logger.info("処理終了update_archive_manage")
return
except:
except Exception as e:
self._db.rollback()
logger.info("異常終了update_archive_manage")
raise

View File

@ -1,11 +1,10 @@
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
create_engine, text)
from sqlalchemy.engine.url import URL
from tenacity import retry, stop_after_attempt, wait_exponential
from src.error.exceptions import DBException
from src.logging.get_logger import get_logger
from src.system_var import environment
from tenacity import retry, stop_after_attempt, wait_exponential
logger = get_logger(__name__)
@ -57,7 +56,8 @@ class Database:
poolclass=QueuePool
)
self.__autocommit_engine = self.__transactional_engine.execution_options(isolation_level='AUTOCOMMIT')
self.__autocommit_engine = self.__transactional_engine.execution_options(
isolation_level='AUTOCOMMIT')
@classmethod
def get_instance(cls, autocommit=False):
@ -120,10 +120,12 @@ class Database:
try:
# トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(select_query), parameters)
result = self.__connection.execute(
text(select_query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(select_query, parameters)
result = self.__execute_with_transaction(
select_query, parameters)
except Exception as e:
raise DBException(f'SQL Error: {e}')
@ -181,13 +183,14 @@ class Database:
self.__connection = None
def to_jst(self):
self.execute('SET time_zone = "+9:00"')
self.execute('SET time_zone = "+9:00"')
def __execute_with_transaction(self, query: str, parameters: dict):
# トランザクションを開始してクエリを実行する
with self.__connection.begin():
try:
result = self.__connection.execute(text(query), parameters=parameters)
result = self.__connection.execute(
text(query), parameters=parameters)
except Exception as e:
self.__connection.rollback()
raise e

View File

@ -12,7 +12,11 @@ JSKULT_ARCHIVE_BUCKET = os.environ['JSKULT_ARCHIVE_BUCKET']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(
os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(
os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))