import datetime import json import logging import os import re from zoneinfo import ZoneInfo import boto3 # 環境変数 CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"] BUCKET_TRANSFER_SETTING_FILE_FOLDER = os.environ["BUCKET_TRANSFER_SETTING_FILE_FOLDER"] BUCKET_TRANSFER_SETTING_FILE_NAME = os.environ["BUCKET_TRANSFER_SETTING_FILE_NAME"] LOG_LEVEL = os.environ["LOG_LEVEL"] TZ = os.environ["TZ"] ENV = os.environ["ENV"] # 定数 EXCLUSIVE_CONTROL_FILE_EXT = '.doing' # S3クライアント s3_client = boto3.client('s3') # logger設定 logger = logging.getLogger() def log_datetime_convert_tz(*arg): """ログに出力するタイムスタンプのロケールを変更する(JST指定)""" return datetime.datetime.now(ZoneInfo(TZ)).timetuple() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = log_datetime_convert_tz for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) def get_s3_event_parameter(event: dict) -> tuple[str, str, str, str]: """Lambdaに送信されたEvent情報からS3のイベントを取得する""" s3_event = event["Records"][0]["s3"] event_bucket_name: str = s3_event["bucket"]["name"] event_object_key: str = s3_event["object"]["key"] event_file_name: str = os.path.basename(event_object_key) event_folder_name: str = os.path.dirname(event_object_key).split('/')[0] return event_bucket_name, event_object_key, event_file_name, event_folder_name def delete_doing_file(event: dict) -> None: """.doingファイルをバケット上から削除する""" # イベント情報を取得 ( event_bucket_name, event_object_key, _, _ ) = get_s3_event_parameter(event) # ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する s3_client.delete_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') def lambda_handler(event, context): """Lambdaハンドラー関数""" # ① 処理開始ログを出力する logger.info('I-01-01 処理開始 S3バケット間ファイル転送処理') # ② 処理開始時に受け取ったイベント情報をログに出力する # バケット名・フォルダ名・受信データファイル名をメモリに保持 ( event_bucket_name, event_object_key, event_file_name, event_folder_name ) = get_s3_event_parameter(event) try: logger.info(f'I-02-01 受信バケット:{event_bucket_name}') logger.info(f'I-02-01 フォルダ名:{event_folder_name}') logger.info(f'I-02-01 受信ファイル名:{event_file_name}') # ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする try: s3_client.head_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') logger.error( f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。') return except Exception: # .doingファイルが見つからなかった場合は、処理を続行する # メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する logger.info( f'I-03-01 {event_bucket_name}/{event_object_key}を転送します') s3_client.put_object( Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'') # ④ バケット転送設定ファイルを特定する transfer_config_file_response = s3_client.get_object( Bucket=CONFIG_BUCKET_NAME, Key=f'{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}') transfer_config_file = json.loads( transfer_config_file_response['Body'].read().decode('utf8')) # ⑤ バケット転送設定ファイルのキー[受信ファイル名正規表現パターン]と、[メモリに保持した受信ファイル名]と一致するものを取得する。 transfer_config = None for key in transfer_config_file.keys(): filename_regex = re.compile(key) match_result = filename_regex.fullmatch(event_file_name) if match_result is not None: transfer_config = transfer_config_file[key] break if transfer_config is None: logger.error( f'E-03-01 S3バケットの転送設定が見つかりません。{CONFIG_BUCKET_NAME}/{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}') delete_doing_file(event) return # ⑥ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、バックアップする if transfer_config.get('backup_setting', None) is not None: backup_setting = transfer_config['backup_setting'] copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key} backup_bucket = backup_setting['backup_bucket'].format(env=ENV) backup_date_pattern = datetime.date.today().strftime( backup_setting['date_pattern']) s3_client.copy_object( Bucket=backup_bucket, Key=f'{event_folder_name}/{backup_date_pattern}/{event_file_name}', CopySource=copy_source ) logger.info( f'I-04-01 受信ファイルのバックアップ完了::{backup_bucket}/{event_folder_name}/{backup_date_pattern}/{event_file_name}') # ⑦ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、移動する destination_bucket = transfer_config['destination_bucket'].format( env=ENV) destination_folder = transfer_config['destination_folder'] s3_client.copy_object( Bucket=destination_bucket, Key=f"{destination_folder}/{event_file_name}", CopySource=copy_source ) # コピー後、元のバケットからは削除する s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key) logger.info( f'I-05-01 受信ファイルの転送完了:{destination_bucket}/{destination_folder}/{event_file_name}') # ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する delete_doing_file(event) logger.info('I-06-01 処理終了 S3バケット間ファイル転送処理') except Exception as e: logger.exception( f'E-99 想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}' ) delete_doing_file(event) raise e return # 動作確認用のコード # if __name__ == '__main__': # lambda_handler( # { # "Records": [ # { # "eventVersion": "2.1", # "eventSource": "aws:s3", # "awsRegion": "ap-northeast-1", # "eventTime": "2024-07-16T07:10:33.021Z", # "eventName": "ObjectCreated:Put", # "userIdentity": { # "principalId": "AWS:AIDA4A3J5AIPDAT6MUJPZ" # }, # "requestParameters": { # "sourceIPAddress": "118.238.231.215" # }, # "responseElements": { # "x-amz-request-id": "0BST21P92A15BH55", # "x-amz-id-2": "db9n9RpQxHEnq5o5ZLCeIGpuka54ghMHcbJ2Rj9aCcpjf111D4dyTZn5w5VvzV6W56rU89cSx/ihzkEHs8wk30ckbtRMYQ0byJn0UfK6bjg=" # }, # "s3": { # "s3SchemaVersion": "1.0", # "configurationId": "accesslog-receive-event2", # "bucket": { # "name": "mbj-newdwh2021-staging-hcp-web-receive", # "ownerIdentity": { # "principalId": "A1YQ10QIZBI5OE" # }, # "arn": "arn:aws:s3:::mbj-newdwh2021-staging-hcp-web-receive" # }, # "object": { # "key": "palantir/hcpweb_accesslog_2024-07-16-071045.csv", # "size": 597820, # "eTag": "94299e880925b6f655c090521ff83d7a", # "sequencer": "0066961CE8E7A670F2" # } # } # } # ] # }, # None # )