feat: レビュー指摘対応。取り込み後のファイル削除も追加。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-12 15:14:05 +09:00
parent 54c411b3c3
commit 26c4170643
5 changed files with 102 additions and 72 deletions

View File

@ -3,4 +3,8 @@ DB_PORT=************
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=src05
LOG_LEVEL=INFO
LOG_LEVEL=INFO
ULTMARC_DATA_BUCKET=****************
ULTMARC_DATA_FOLDER=recv
ULTMARC_BACKUP_BUCKET=****************
ULTMARC_BACKUP_FOLDER=ultmarc

View File

@ -5,7 +5,7 @@
"version": "0.2.0",
"configurations": [
{
"name": "Batch Sample",
"name": "(DEBUG)jskult batch daily",
"type": "python",
"request": "launch",
"program": "entrypoint.py",

View File

@ -3,6 +3,8 @@ import tempfile
import boto3
from src.system_var import environment
class S3Client:
__s3_client = boto3.client('s3')
@ -16,13 +18,12 @@ class S3Client:
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']} for content in contents if not content['Key'].endswith('/')]
return objects
def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_client.copy(copy_source, dest_bucket, dest_key)
return
def download_file(self, bucket_name: str, file_key: str, file):
self.__s3_client.download_fileobj(
Bucket=bucket_name,
@ -31,50 +32,53 @@ class S3Client:
)
return
def upload_file(self, local_file_path: str, bucket_name: str, file_key: str):
self.__s3_client.upload_file(
local_file_path,
Bucket=bucket_name,
Key=file_key
)
local_file_path,
Bucket=bucket_name,
Key=file_key
)
def delete_file(self, bucket_name: str, file_key: str):
self.__s3_client.delete_object(
Bucket=bucket_name,
Key=file_key
)
class S3Bucket():
_s3_client = S3Client()
_bucket_name: str = None
def __str__(self) -> str:
return self._bucket_name
class UltmarcBucket(S3Bucket):
# TODO: 環境変数にする
_bucket_name = 'mbj-newdwh2021-staging-jskult-ultmarc'
_folder = 'recv'
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_DATA_FOLDER
def list_edi_file(self):
def list_dat_file(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def download_edi_file(self, edi_filename: str):
def download_dat_file(self, dat_filename: str):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{edi_filename.replace("recv/", "")}')
temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, edi_filename, f)
self._s3_client.download_file(self._bucket_name, dat_filename, f)
f.seek(0)
return temporary_file_path
def backup_edi_file(self, edi_file_key: str, datetime_key: str):
def backup_dat_file(self, dat_file_key: str, datetime_key: str):
ultmarc_backup_bucket = UltmarcBackupBucket()
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{edi_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(str(self), edi_file_key, str(ultmarc_backup_bucket), backup_key)
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
def delete_dat_file(self, dat_file_key: str):
self._s3_client.delete_file(self._bucket_name, dat_file_key)
class JskUltBackupBucket(S3Bucket):
# TODO: 環境変数にする
_bucket_name = 'mbj-newdwh2021-staging-jskult-backup'
_bucket_name = environment.ULTMARC_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket):
# TODO: 環境変数にする
_folder = 'ultmarc'
_folder = environment.ULTMARC_BACKUP_FOLDER

View File

@ -1,8 +1,7 @@
"""アルトマークデータ保管"""
from datetime import datetime
from src.aws.s3 import UltmarcBucket
from src.batch.common.batch_config import BatchConfig
from src.batch.ultmarc.datfile import DatFile
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
UltmarcTableMapperFactory
@ -11,20 +10,16 @@ from src.logging.get_logger import get_logger
logger = get_logger('アルトマークデータ保管')
ultmarc_bucket = UltmarcBucket()
batch_config = BatchConfig.get_instance()
def exec_import():
"""アルトマーク取り込み処理"""
try:
logger.info('ultmarc import process START')
# DBセットアップ
db = Database.get_instance()
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
logger.info('Transaction BEGIN')
logger.info('アルトマーク取込処理: 開始')
# datファイルをS3から取得する
dat_file_list = ultmarc_bucket.list_edi_file()
dat_file_list = ultmarc_bucket.list_dat_file()
# ファイルがない場合は処理せず、正常終了とする
if len(dat_file_list) == 0:
logger.info('ファイルがないため、アルトマーク取込処理をスキップします')
@ -34,52 +29,71 @@ def exec_import():
if len(dat_file_list) > 1:
logger.error(f'複数の取込ファイルがあるため、異常終了 ファイル一覧:{dat_file_list}')
return
# ファイルの件数は必ず1件になる
dat_file_info = dat_file_list[0]
# 0Byteの場合、
if dat_file_info['size'] == 0:
logger.info(f'0Byteファイルのため、処理をスキップします。ファイル名={dat_file_info["filename"]}')
return
dat_file_name = dat_file_info['filename']
logger.info(f"Get File Name :{dat_file_name}")
now = datetime.now().strftime('%Y/%m/%d')
logger.info(f"{dat_file_name}を取り込みます")
# ファイルをバックアップ
# 現行は、jobctrl_dailyの先頭でやっている
ultmarc_bucket.backup_edi_file(dat_file_name, now)
ultmarc_bucket.backup_dat_file(dat_file_name, batch_config.syor_date)
# datファイルをダウンロード
local_file_path = ultmarc_bucket.download_edi_file(dat_file_name)
mapper_factory = UltmarcTableMapperFactory()
local_file_path = ultmarc_bucket.download_dat_file(dat_file_name)
dat_file = DatFile.from_path(local_file_path)
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(log_message)
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
# 処理結果をログに出力する
logger.info('Transaction COMMIT')
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
# アルトマーク取り込み実行
_import_to_ultmarc_table(dat_file)
# 処理後、ファイルをS3から削除する
logger.info(f'取り込み処理が完了したため、datファイルを削除。ファイル名={dat_file_name}')
ultmarc_bucket.delete_dat_file(dat_file_name)
except Exception as e:
logger.exception(e)
raise e
finally:
db.disconnect()
logger.info('終了')
logger.info('アルトマーク取込処理: 終了')
def _import_to_ultmarc_table(dat_file: DatFile):
db = Database.get_instance()
# DB接続
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
logger.info('Transaction BEGIN')
mapper_factory = UltmarcTableMapperFactory()
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.records,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
except Exception as e:
logger.warning(e)
record = line.records
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(f'ERROR_LINE: {log_message}')
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
db.disconnect()
# 処理結果をログに出力する
logger.info('Transaction COMMIT')
logger.info(f'ultmarc import process RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
# 1件でもエラーがあれば、通知用にログに出力する
if dat_file.error_count > 0:
logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。')
return

View File

@ -1,12 +1,20 @@
import os
# Database
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
LOG_LEVEL = os.environ['LOG_LEVEL']
# AWS
ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET']
ULTMARC_DATA_FOLDER = os.environ['ULTMARC_DATA_FOLDER']
ULTMARC_BACKUP_BUCKET = os.environ['ULTMARC_BACKUP_BUCKET']
ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))