import json import logging import os from datetime import datetime from zoneinfo import ZoneInfo import boto3 # 環境変数 SFN_STATE_MACHINE_ARN = os.environ["SFN_STATE_MACHINE_ARN"] TZ = os.environ["TZ"] LOG_LEVEL = os.environ["LOG_LEVEL"] # logger設定 logger = logging.getLogger() def log_datetime_convert_tz(*arg): """ログに出力するタイムスタンプのロケールを変更する(JST指定)""" return datetime.now(ZoneInfo(TZ)).timetuple() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = log_datetime_convert_tz for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) # boto3クライアント sfn_client = boto3.client('stepfunctions') def lambda_handler(event, context): logger.info('I-1 駆動処理開始') # イベント情報を取得する s3_event = event["Records"][0]["s3"] event_bucket_name = s3_event["bucket"]["name"] event_object_key = s3_event["object"]["key"] logger.info(f'I-2 バケット名:{event_bucket_name}') logger.info(f'I-3 キー名:{event_object_key}') # StepFunctionsを起動する response = sfn_client.start_execution( stateMachineArn=SFN_STATE_MACHINE_ARN, input=json.dumps( { 'InputParams': { 'BucketName': event_bucket_name, 'TargetKey': event_object_key, } } ) ) logger.info(f'I-4 StepFunctions起動レスポンス:{str(response)}') logger.info(f'I-5 駆動処理終了') # ローカルでのデバック実行用。 # 利用する際にコメントを外してください # if __name__ == '__main__': # lambda_handler({"Records": [{ # # バケット名やファイルキーはよしなに変えてください。 # "s3": { # "bucket": { # "name": "test-shimoda-bucket" # }, # "object": { # "key": "recv/SRC_ULT_MDB_ALL_20250414070000.csv" # } # } # }]}, {})