feat: S3バケット間ファイル転送処理を追加

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2024-07-16 16:06:04 +09:00
parent ef740bf707
commit 838f5122fe
2 changed files with 184 additions and 0 deletions

View File

@ -0,0 +1,168 @@
import datetime
import json
import logging
import os
import re
from zoneinfo import ZoneInfo
import boto3
# 環境変数
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
BUCKET_TRANSFER_SETTING_FILE_FOLDER = os.environ["BUCKET_TRANSFER_SETTING_FILE_FOLDER"]
BUCKET_TRANSFER_SETTING_FILE_NAME = os.environ["BUCKET_TRANSFER_SETTING_FILE_NAME"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
ENV = os.environ["ENV"]
# 定数
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
# S3クライアント
s3_client = boto3.client('s3')
# logger設定
logger = logging.getLogger()
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def get_s3_event_parameter(event: dict) -> tuple[str, str, str, str]:
"""Lambdaに送信されたEvent情報からS3のイベントを取得する"""
s3_event = event["Records"][0]["s3"]
event_bucket_name: str = s3_event["bucket"]["name"]
event_object_key: str = s3_event["object"]["key"]
event_file_name: str = os.path.basename(event_object_key)
event_folder_name: str = os.path.dirname(event_object_key).split('/')[0]
return event_bucket_name, event_object_key, event_file_name, event_folder_name
def delete_doing_file(event: dict) -> None:
""".doingファイルをバケット上から削除する"""
# イベント情報を取得
(
event_bucket_name,
event_object_key,
_,
_
) = get_s3_event_parameter(event)
# ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
s3_client.delete_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
def lambda_handler(event, context):
"""Lambdaハンドラー関数"""
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 S3バケット間ファイル転送処理')
# ② 処理開始時に受け取ったイベント情報をログに出力する
# バケット名・フォルダ名・受信データファイル名をメモリに保持
(
event_bucket_name,
event_object_key,
event_file_name,
event_folder_name
) = get_s3_event_parameter(event)
try:
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
logger.info(f'I-02-01 受信ファイル名:{event_file_name}')
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
try:
s3_client.head_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.error(
f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
return
except Exception:
# .doingファイルが見つからなかった場合は、処理を続行する
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
logger.info(
f'I-03-01 {event_bucket_name}/{event_object_key}を転送します')
s3_client.put_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
# ④ バケット転送設定ファイルを特定する
transfer_config_file_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME, Key=f'{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}')
transfer_config_file = json.loads(
transfer_config_file_response['Body'].read().decode('utf8'))
# ⑤ バケット転送設定ファイルのキー[受信ファイル名正規表現パターン]と、[メモリに保持した受信ファイル名]と一致するものを取得する。
transfer_config = None
for key in transfer_config_file.keys():
match_result = re.fullmatch(event_file_name, key)
if match_result is not None:
transfer_config = transfer_config_file[key]
break
if transfer_config is None:
logger.error(
f'E-03-01 S3バケットの転送設定が見つかりません。{CONFIG_BUCKET_NAME}/{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}')
return
# ⑥ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、バックアップする
if transfer_config.get('backup_setting', None) is not None:
backup_setting = transfer_config['backup_setting']
copy_source = {'Bucket': event_bucket_name,
'Key': event_object_key}
backup_buccet = backup_setting['backup_bucet'].format(env=ENV)
backup_date_pattern = datetime.date.today().strftime(
backup_setting['date_pattern'])
s3_client.copy_object(
Bucket=backup_buccet,
Key=f'{event_folder_name}/{backup_date_pattern}/{event_file_name}',
CopySource=copy_source
)
logger.info(
f'I-04-01 受信ファイルのバックアップ完了::{backup_buccet}/{event_folder_name}/{backup_date_pattern}/{event_file_name}')
# ⑦ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、移動する
destination_bucket = transfer_config['destination_bucket'].format(
env=ENV)
destination_folder = transfer_config['destination_folder']
s3_client.copy_object(
Bucket=destination_bucket,
Key=f"{destination_folder}/{event_file_name}",
CopySource=copy_source
)
# コピー後、元のバケットからは削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
logger.info(
f'I-05-01 受信ファイルの転送完了:{destination_bucket}/{destination_folder}/{event_file_name}')
# ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
delete_doing_file(event)
logger.info('I-01-06 処理終了 S3バケット間ファイル転送処理')
except Exception as e:
logger.exception(
f'想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}'
)
delete_doing_file(event)
raise e
return

View File

@ -0,0 +1,16 @@
{
"hcpweb_accesslog_\\d{4}-\\d{2}-\\d{2}\\.(csv|CSV)": {
"Description": {
"データソース名": "hcp_web",
"送信元システム名": "Palantir",
"データの論理名(説明)": "HCPウェブアクセスログデータ",
"特記事項": "日〜金 AM1:00にMeDaCAへ連携"
},
"destination_bucket": "mbj-newdwh2021-{env}-data",
"destination_folder": "hcp_web/target",
"backup_setting": {
"backup_bucket": "mbj-newdwh2021-{env}-backup-hcp-web",
"date_pattern": "%Y/%m/%d"
}
}
}