Merge pull request #345 feature-NEWDWH2021-1445 into develop

This commit is contained in:
nik afiq 2024-02-15 14:46:03 +09:00
commit 3dd7b2482d

View File

@ -0,0 +1,101 @@
import datetime
import logging
import os
from zoneinfo import ZoneInfo
import boto3
# 環境変数
DATA_IMPORT_BUCKET = os.environ["DATA_IMPORT_BUCKET"]
ENCISE_BACKUP_BUCKET = os.environ["ENCISE_BACKUP_BUCKET"]
ENCISE_TARGET_FOLDER = os.environ["ENCISE_TARGET_FOLDER"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
# S3クライアント
s3_client = boto3.client('s3')
# logger設定
logger = logging.getLogger()
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Encise受信データ転送処理')
# ② 処理開始時に受け取ったイベント情報をログに出力する
# バケット名・フォルダ名・受信データファイル名をメモリに保持
s3_event = event["Records"][0]["s3"]
event_bucket_name = s3_event["bucket"]["name"]
event_object_key = s3_event["object"]["key"]
event_file_name = os.path.basename(event_object_key)
event_folder_name = os.path.dirname(event_object_key).split('/')[0]
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
logger.info(f'I-02-01 ファイル名:{event_file_name}')
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
try:
s3_client.head_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.error(f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
return
except Exception:
# .doingファイルが見つからなかった場合は、処理を続行する
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
logger.info('I-03-01 Encise受信データを転送します')
s3_client.put_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
# ⑤ 受信データファイルを、Enciseデータバックアップバケットにコピーする
copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key}
execute_date_yyyymm = datetime.date.today().strftime('%Y/%m')
s3_client.copy_object(
Bucket=ENCISE_BACKUP_BUCKET,
Key=f'{event_folder_name}/{execute_date_yyyymm}/{event_file_name}',
CopySource=copy_source
)
logger.info(f'I-04-01 Encise受信データのバックアップ完了{ENCISE_BACKUP_BUCKET}/{event_folder_name}/{execute_date_yyyymm}/{event_file_name}')
# ⑥ 受信データファイルを、データ登録バケットのEnciseデータ取込フォルダに移動する
s3_client.copy_object(
Bucket=DATA_IMPORT_BUCKET,
Key=f'{ENCISE_TARGET_FOLDER}/{event_file_name}',
CopySource=copy_source
)
# コピー後、元のバケットからは削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
logger.info(f'I-05-01 Encise受信データの転送完了{DATA_IMPORT_BUCKET}/{ENCISE_TARGET_FOLDER}/{event_file_name}')
# ⑦ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.info('I-01-06 処理終了 Encise受信データ転送処理')
except Exception as e:
logger.exception(f'想定外のエラーが発生しました。処理を終了します。 例外内容:{e}')
raise e
return