newdwh2021/lambda/transfer-encise-data/transfer-encise-data.py

106 lines
5.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import logging
import os
from zoneinfo import ZoneInfo
import boto3
# 環境変数
DATA_IMPORT_BUCKET = os.environ["DATA_IMPORT_BUCKET"]
ENCISE_BACKUP_BUCKET = os.environ["ENCISE_BACKUP_BUCKET"]
ENCISE_TARGET_FOLDER = os.environ["ENCISE_TARGET_FOLDER"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
# S3クライアント
s3_client = boto3.client('s3')
# logger設定
logger = logging.getLogger()
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Encise受信データ転送処理')
# ② 処理開始時に受け取ったイベント情報をログに出力する
# バケット名・フォルダ名・受信データファイル名をメモリに保持
s3_event = event["Records"][0]["s3"]
event_bucket_name = s3_event["bucket"]["name"]
event_object_key = s3_event["object"]["key"]
event_file_name = os.path.basename(event_object_key)
event_folder_name = os.path.dirname(event_object_key).split('/')[0]
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
logger.info(f'I-02-01 ファイル名:{event_file_name}')
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
try:
s3_client.head_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.error(f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
return
except Exception:
# .doingファイルが見つからなかった場合は、処理を続行する
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
logger.info('I-03-01 Encise受信データを転送します')
s3_client.put_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
# ⑤ 受信データファイルを、Enciseデータバックアップバケットにコピーする
copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key}
execute_date_yyyymm = datetime.date.today().strftime('%Y/%m')
s3_client.copy_object(
Bucket=ENCISE_BACKUP_BUCKET,
Key=f'{event_folder_name}/{execute_date_yyyymm}/{event_file_name}',
CopySource=copy_source,
ContentType='text/csv', # ファイルのタイプを指定する必要があるため、ContentTypeを指定する。
MetadataDirective='REPLACE' # ファイルのメタデータを置き換える必要があるため、MetadataDirectiveを指定する。
)
logger.info(f'I-04-01 Encise受信データのバックアップ完了{ENCISE_BACKUP_BUCKET}/{event_folder_name}/{execute_date_yyyymm}/{event_file_name}')
# ⑥ 受信データファイルを、データ登録バケットのEnciseデータ取込フォルダに移動する
s3_client.copy_object(
Bucket=DATA_IMPORT_BUCKET,
Key=f'{ENCISE_TARGET_FOLDER}/{event_file_name}',
CopySource=copy_source,
ContentType='text/csv', # ファイルのタイプを指定する必要があるため、ContentTypeを指定する。
MetadataDirective='REPLACE' # ファイルのメタデータを置き換える必要があるため、MetadataDirectiveを指定する。
)
# コピー後、元のバケットからは削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
logger.info(f'I-05-01 Encise受信データの転送完了{DATA_IMPORT_BUCKET}/{ENCISE_TARGET_FOLDER}/{event_file_name}')
# ⑦ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.info('I-01-06 処理終了 Encise受信データ転送処理')
except Exception as e:
logger.exception(f'想定外のエラーが発生しました。処理を終了します。 例外内容:{e}')
raise e
return