From 26c4170643e723f9b8ed68a833ab43d185e4a9d8 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 12 Apr 2023 15:14:05 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E3=83=AC=E3=83=93=E3=83=A5=E3=83=BC?= =?UTF-8?q?=E6=8C=87=E6=91=98=E5=AF=BE=E5=BF=9C=E3=80=82=E5=8F=96=E3=82=8A?= =?UTF-8?q?=E8=BE=BC=E3=81=BF=E5=BE=8C=E3=81=AE=E3=83=95=E3=82=A1=E3=82=A4?= =?UTF-8?q?=E3=83=AB=E5=89=8A=E9=99=A4=E3=82=82=E8=BF=BD=E5=8A=A0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/jskult-batch-daily/.env.example | 6 +- ecs/jskult-batch-daily/.vscode/launch.json | 2 +- ecs/jskult-batch-daily/src/aws/s3.py | 54 +++++----- .../src/batch/ultmarc/ultmarc_process.py | 102 ++++++++++-------- .../src/system_var/environment.py | 10 +- 5 files changed, 102 insertions(+), 72 deletions(-) diff --git a/ecs/jskult-batch-daily/.env.example b/ecs/jskult-batch-daily/.env.example index d1f4ddf4..91ae5f74 100644 --- a/ecs/jskult-batch-daily/.env.example +++ b/ecs/jskult-batch-daily/.env.example @@ -3,4 +3,8 @@ DB_PORT=************ DB_USERNAME=************ DB_PASSWORD=************ DB_SCHEMA=src05 -LOG_LEVEL=INFO \ No newline at end of file +LOG_LEVEL=INFO +ULTMARC_DATA_BUCKET=**************** +ULTMARC_DATA_FOLDER=recv +ULTMARC_BACKUP_BUCKET=**************** +ULTMARC_BACKUP_FOLDER=ultmarc diff --git a/ecs/jskult-batch-daily/.vscode/launch.json b/ecs/jskult-batch-daily/.vscode/launch.json index 2df4d8bf..9dbaa9c6 100644 --- a/ecs/jskult-batch-daily/.vscode/launch.json +++ b/ecs/jskult-batch-daily/.vscode/launch.json @@ -5,7 +5,7 @@ "version": "0.2.0", "configurations": [ { - "name": "Batch Sample", + "name": "(DEBUG)jskult batch daily", "type": "python", "request": "launch", "program": "entrypoint.py", diff --git a/ecs/jskult-batch-daily/src/aws/s3.py b/ecs/jskult-batch-daily/src/aws/s3.py index ef3e2b36..43460f8d 100644 --- a/ecs/jskult-batch-daily/src/aws/s3.py +++ b/ecs/jskult-batch-daily/src/aws/s3.py @@ -3,6 +3,8 @@ import tempfile import boto3 +from src.system_var import environment + class S3Client: __s3_client = boto3.client('s3') @@ -16,13 +18,12 @@ class S3Client: # 末尾がスラッシュで終わるものはフォルダとみなしてスキップする objects = [{'filename': content['Key'], 'size': content['Size']} for content in contents if not content['Key'].endswith('/')] return objects - + def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None: copy_source = {'Bucket': src_bucket, 'Key': src_key} self.__s3_client.copy(copy_source, dest_bucket, dest_key) return - def download_file(self, bucket_name: str, file_key: str, file): self.__s3_client.download_fileobj( Bucket=bucket_name, @@ -31,50 +32,53 @@ class S3Client: ) return - def upload_file(self, local_file_path: str, bucket_name: str, file_key: str): self.__s3_client.upload_file( - local_file_path, - Bucket=bucket_name, - Key=file_key - ) + local_file_path, + Bucket=bucket_name, + Key=file_key + ) + + def delete_file(self, bucket_name: str, file_key: str): + self.__s3_client.delete_object( + Bucket=bucket_name, + Key=file_key + ) class S3Bucket(): _s3_client = S3Client() _bucket_name: str = None - def __str__(self) -> str: - return self._bucket_name class UltmarcBucket(S3Bucket): - # TODO: 環境変数にする - _bucket_name = 'mbj-newdwh2021-staging-jskult-ultmarc' - _folder = 'recv' + _bucket_name = environment.ULTMARC_DATA_BUCKET + _folder = environment.ULTMARC_DATA_FOLDER - def list_edi_file(self): + def list_dat_file(self): return self._s3_client.list_objects(self._bucket_name, self._folder) - - def download_edi_file(self, edi_filename: str): + + def download_dat_file(self, dat_filename: str): # 一時ファイルとして保存する temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, f'{edi_filename.replace("recv/", "")}') + temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}') with open(temporary_file_path, mode='wb') as f: - self._s3_client.download_file(self._bucket_name, edi_filename, f) + self._s3_client.download_file(self._bucket_name, dat_filename, f) f.seek(0) return temporary_file_path - - def backup_edi_file(self, edi_file_key: str, datetime_key: str): + + def backup_dat_file(self, dat_file_key: str, datetime_key: str): ultmarc_backup_bucket = UltmarcBackupBucket() - backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{edi_file_key.replace(f"{self._folder}/", "")}' - self._s3_client.copy(str(self), edi_file_key, str(ultmarc_backup_bucket), backup_key) + backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}' + self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key) + + def delete_dat_file(self, dat_file_key: str): + self._s3_client.delete_file(self._bucket_name, dat_file_key) class JskUltBackupBucket(S3Bucket): - # TODO: 環境変数にする - _bucket_name = 'mbj-newdwh2021-staging-jskult-backup' + _bucket_name = environment.ULTMARC_BACKUP_BUCKET class UltmarcBackupBucket(JskUltBackupBucket): - # TODO: 環境変数にする - _folder = 'ultmarc' + _folder = environment.ULTMARC_BACKUP_FOLDER diff --git a/ecs/jskult-batch-daily/src/batch/ultmarc/ultmarc_process.py b/ecs/jskult-batch-daily/src/batch/ultmarc/ultmarc_process.py index f4d8fafe..42bc6af6 100644 --- a/ecs/jskult-batch-daily/src/batch/ultmarc/ultmarc_process.py +++ b/ecs/jskult-batch-daily/src/batch/ultmarc/ultmarc_process.py @@ -1,8 +1,7 @@ """アルトマークデータ保管""" -from datetime import datetime - from src.aws.s3 import UltmarcBucket +from src.batch.common.batch_config import BatchConfig from src.batch.ultmarc.datfile import DatFile from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \ UltmarcTableMapperFactory @@ -11,20 +10,16 @@ from src.logging.get_logger import get_logger logger = get_logger('アルトマークデータ保管') ultmarc_bucket = UltmarcBucket() +batch_config = BatchConfig.get_instance() def exec_import(): """アルトマーク取り込み処理""" try: - logger.info('ultmarc import process START') - # DBセットアップ - db = Database.get_instance() - db.connect() - # ファイル単位でトランザクションを行う - db.begin() - logger.info('Transaction BEGIN') + logger.info('アルトマーク取込処理: 開始') # datファイルをS3から取得する - dat_file_list = ultmarc_bucket.list_edi_file() + dat_file_list = ultmarc_bucket.list_dat_file() + # ファイルがない場合は処理せず、正常終了とする if len(dat_file_list) == 0: logger.info('ファイルがないため、アルトマーク取込処理をスキップします') @@ -34,52 +29,71 @@ def exec_import(): if len(dat_file_list) > 1: logger.error(f'複数の取込ファイルがあるため、異常終了 ファイル一覧:{dat_file_list}') return + # ファイルの件数は必ず1件になる dat_file_info = dat_file_list[0] # 0Byteの場合、 if dat_file_info['size'] == 0: logger.info(f'0Byteファイルのため、処理をスキップします。ファイル名={dat_file_info["filename"]}') return + dat_file_name = dat_file_info['filename'] - logger.info(f"Get File Name :{dat_file_name}") - now = datetime.now().strftime('%Y/%m/%d') + logger.info(f"{dat_file_name}を取り込みます") # ファイルをバックアップ # 現行は、jobctrl_dailyの先頭でやっている - ultmarc_bucket.backup_edi_file(dat_file_name, now) + ultmarc_bucket.backup_dat_file(dat_file_name, batch_config.syor_date) # datファイルをダウンロード - local_file_path = ultmarc_bucket.download_edi_file(dat_file_name) - - mapper_factory = UltmarcTableMapperFactory() + local_file_path = ultmarc_bucket.download_dat_file(dat_file_name) dat_file = DatFile.from_path(local_file_path) - # datファイルを1行ずつ処理し、各テーブルへ登録 - for line in dat_file: - try: - # 書き込み先のテーブルを特定 - mapper_class = mapper_factory.create( - line.layout_class, - line.records, - db - ) - mapper_class.make_query() - mapper_class.execute_queries() - dat_file.count_up_success() - except Exception as e: - logger.warning(e) - record = line.records - log_message = ','.join([f'"{r}"' for r in record]) - logger.warning(log_message) - dat_file.count_up_error() - # すべての行を登録終えたらコミットする - db.commit() - # 処理結果をログに出力する - logger.info('Transaction COMMIT') - logger.info(f'ultmarc import process RESULT') - logger.info(f'SUCCESS_COUNT={dat_file.success_count}') - logger.info(f'ERROR_COUNT={dat_file.error_count}') - logger.info(f'ALL_COUNT={dat_file.total_count}') + # アルトマーク取り込み実行 + _import_to_ultmarc_table(dat_file) + # 処理後、ファイルをS3から削除する + logger.info(f'取り込み処理が完了したため、datファイルを削除。ファイル名={dat_file_name}') + ultmarc_bucket.delete_dat_file(dat_file_name) except Exception as e: logger.exception(e) raise e finally: - db.disconnect() - logger.info('終了') + logger.info('アルトマーク取込処理: 終了') + + +def _import_to_ultmarc_table(dat_file: DatFile): + db = Database.get_instance() + # DB接続 + db.connect() + # ファイル単位でトランザクションを行う + db.begin() + logger.info('Transaction BEGIN') + mapper_factory = UltmarcTableMapperFactory() + # datファイルを1行ずつ処理し、各テーブルへ登録 + for line in dat_file: + try: + # 書き込み先のテーブルを特定 + mapper_class = mapper_factory.create( + line.layout_class, + line.records, + db + ) + mapper_class.make_query() + mapper_class.execute_queries() + dat_file.count_up_success() + except Exception as e: + logger.warning(e) + record = line.records + log_message = ','.join([f'"{r}"' for r in record]) + logger.warning(f'ERROR_LINE: {log_message}') + dat_file.count_up_error() + # すべての行を登録終えたらコミットする + db.commit() + db.disconnect() + # 処理結果をログに出力する + logger.info('Transaction COMMIT') + logger.info(f'ultmarc import process RESULT') + logger.info(f'SUCCESS_COUNT={dat_file.success_count}') + logger.info(f'ERROR_COUNT={dat_file.error_count}') + logger.info(f'ALL_COUNT={dat_file.total_count}') + + # 1件でもエラーがあれば、通知用にログに出力する + if dat_file.error_count > 0: + logger.warning('取り込みに失敗した行があります。詳細は`ERROR_LINE:`の行を確認してください。') + return diff --git a/ecs/jskult-batch-daily/src/system_var/environment.py b/ecs/jskult-batch-daily/src/system_var/environment.py index 143d2d26..e7248191 100644 --- a/ecs/jskult-batch-daily/src/system_var/environment.py +++ b/ecs/jskult-batch-daily/src/system_var/environment.py @@ -1,12 +1,20 @@ import os +# Database DB_HOST = os.environ['DB_HOST'] DB_PORT = int(os.environ['DB_PORT']) DB_USERNAME = os.environ['DB_USERNAME'] DB_PASSWORD = os.environ['DB_PASSWORD'] DB_SCHEMA = os.environ['DB_SCHEMA'] -LOG_LEVEL = os.environ['LOG_LEVEL'] +# AWS +ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET'] +ULTMARC_DATA_FOLDER = os.environ['ULTMARC_DATA_FOLDER'] +ULTMARC_BACKUP_BUCKET = os.environ['ULTMARC_BACKUP_BUCKET'] +ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER'] + +# 初期値がある環境変数 +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))