diff --git a/lambda/dataimport/dataimport.py b/lambda/dataimport/dataimport.py index 00fe2f86..b9a57494 100644 --- a/lambda/dataimport/dataimport.py +++ b/lambda/dataimport/dataimport.py @@ -1,8 +1,11 @@ +import json import os from datetime import datetime + import boto3 # 環境変数 +SFN_STATE_MACHINE_ARN=os.environ["SFN_STATE_MACHINE_ARN"] CLUSTER_NAME = os.environ["CLUSTER_NAME"] TASK_NAME = os.environ["TASK_NAME"] CONTAINER_NAME = os.environ["CONTAINER_NAME"] @@ -13,7 +16,7 @@ SECURITY_GROUP_ID_ECSDATAIMPORT = os.environ["SECURITY_GROUP_ID_ECSDATAIMPORT"] MODE = os.environ["MODE"] # クラス変数 -ecs_client = boto3.client('ecs') +sfn_client = boto3.client('stepfunctions') def lambda_handler(event, context): @@ -29,38 +32,45 @@ def lambda_handler(event, context): print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-3 ファイル名:{event_file_name}') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-4 データソース名:{event_data_source_name}') - # ECSを起動する - response = ecs_client.run_task( - launchType='FARGATE', - cluster=CLUSTER_NAME, - taskDefinition=TASK_NAME, - networkConfiguration={ - "awsvpcConfiguration": { - "subnets": [ - SUBNET_ID_AP_NORTHEAST_1A, - SUBNET_ID_AP_NORTHEAST_1D, - ], - "securityGroups": [ - SECURITY_GROUP_ID_ECSALL, - SECURITY_GROUP_ID_ECSDATAIMPORT, - ], + # StepFunctionsを起動する + response = sfn_client.start_execution( + stateMachineArn=SFN_STATE_MACHINE_ARN, + input=json.dumps( + { + 'InputParams': { + 'ClusterName': CLUSTER_NAME, + 'TaskName': TASK_NAME, + 'SubNetIdApNorthEast1A': SUBNET_ID_AP_NORTHEAST_1A, + 'SubNetIdApNorthEast1D': SUBNET_ID_AP_NORTHEAST_1D, + 'SecurityGroupIdECSAll': SECURITY_GROUP_ID_ECSALL, + 'SecurityGroupIdECSDataImport': SECURITY_GROUP_ID_ECSDATAIMPORT, + 'ContainerName': CONTAINER_NAME, + 'BucketName': event_bucket_name, + 'TargetKey': event_object_key, + 'DataSourceName': event_data_source_name, + 'FileName': event_file_name, + 'Mode': MODE + } } - }, - overrides={ - "containerOverrides": [ - { - "name": CONTAINER_NAME, - "environment": [ - {"name": 'BUCKET_NAME', "value": event_bucket_name}, - {"name": 'TARGET_KEY', "value": event_object_key}, - {"name": 'DATA_SOURCE_NAME', "value": event_data_source_name}, - {"name": 'FILE_NAME', "value": event_file_name}, - {"name": 'MODE', "value": MODE}, - ], - }, - ], - }, + ) ) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-5 ECS起動レスポンス:{str(response)}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-5 StepFunctions起動レスポンス:{str(response)}') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-6 駆動処理終了') + + + +# ローカルでのデバック実行用。 +# 利用する際にコメントを外してください +# if __name__ == '__main__': +# lambda_handler({"Records": [{ +# # バケット名やファイルキーはよしなに変えてください。 +# "s3": { +# "bucket": { +# "name": "test-shimoda-bucket" +# }, +# "object": { +# "key": "crm/target/CRM_Account_20221116111800.csv" +# } +# } +# }]}, {}) diff --git a/stepfunctions/TOOLS/convert_config.yaml b/stepfunctions/TOOLS/convert_config.yaml index 2b6487ea..f393460f 100644 --- a/stepfunctions/TOOLS/convert_config.yaml +++ b/stepfunctions/TOOLS/convert_config.yaml @@ -62,3 +62,18 @@ config: SG_ECS_ALL: *PRD_SG_ECS_ALL # セキュリティグループ(ecs-crm-datafetch) SG_CRM_DATAFETCH: *PRD_SG_CRM_DATAFETCH + +config: + r-data-import-state: + # ステージング環境 + staging: + # AWSアカウントID + AWS_ACCOUNT_ID: *AWS_ACCOUNT_ID + # 東京リージョン + REGION_AP_NORTHEAST_1: *REGION_AP_NORTHEAST_1 + # 本番環境 + product: + # AWSアカウントID + AWS_ACCOUNT_ID: *AWS_ACCOUNT_ID + # 東京リージョン + REGION_AP_NORTHEAST_1: *REGION_AP_NORTHEAST_1 diff --git a/stepfunctions/r-data-import-state/r-data-import-state.json b/stepfunctions/r-data-import-state/r-data-import-state.json new file mode 100644 index 00000000..67d11016 --- /dev/null +++ b/stepfunctions/r-data-import-state/r-data-import-state.json @@ -0,0 +1,155 @@ +{ + "Comment": "MeDaCa データ取込ECSタスク起動 ステートマシン", + "StartAt": "params", + "States": { + "params": { + "Type": "Pass", + "Parameters": { + "sns": { + "TopicArn": "arn:aws:sns:#{REGION_AP_NORTHEAST_1}:#{AWS_ACCOUNT_ID}:nds-notice-#{ENV_NAME}" + }, + "ecs": { + "LaunchType": "FARGATE", + "Cluster.$": "$.InputParams.ClusterName", + "TaskDefinition.$": "$.InputParams.TaskName", + "NetworkConfiguration": { + "AwsvpcConfiguration": { + "Subnets.$": "States.Array($.InputParams.SubNetIdApNorthEast1A, $.InputParams.SubNetIdApNorthEast1D)", + "SecurityGroups.$": "States.Array($.InputParams.SecurityGroupIdECSAll, $.InputParams.SecurityGroupIdECSDataImport)", + "AssignPublicIp": "DISABLED" + } + }, + "Overrides": { + "ContainerOverrides": [ + { + "Name.$": "$.InputParams.ContainerName", + "Environment": [ + { + "Name": "BUCKET_NAME", + "Value.$": "$.InputParams.BucketName" + }, + { + "Name": "TARGET_KEY", + "Value.$": "$.InputParams.TargetKey" + }, + { + "Name": "DATA_SOURCE_NAME", + "Value.$": "$.InputParams.DataSourceName" + }, + { + "Name": "FILE_NAME", + "Value.$": "$.InputParams.FileName" + }, + { + "Name": "MODE", + "Value.$": "$.InputParams.Mode" + } + ] + } + ] + } + } + }, + "Comment": "パラメータ設定", + "ResultPath": "$.params", + "Next": "CheckExclusive" + }, + "CheckExclusive": { + "Type": "Task", + "Parameters": { + "Bucket.$": "$.InputParams.BucketName", + "Key.$": "States.Format('{}/target/.detected_fatal_error_ecs', $.InputParams.DataSourceName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:headObject", + "ResultPath": "$.result", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "data-import", + "ResultPath": "$.result" + } + ], + "Comment": "ECSタスク起動エラーファイル存在チェック", + "Next": "PendingDataImportNotice" + }, + "PendingDataImportNotice": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:sns:publish", + "Parameters": { + "TopicArn.$": "$.params.sns.TopicArn", + "Subject": "データ登録ECSタスク異常通知", + "Message.$": "States.Format('「{}/{}/target/」に登録処理中断ファイル(.detected_fatal_error_ecs)が見つかったため、データ登録の起動を中断しました。\n', $.InputParams.BucketName, $.InputParams.DataSourceName)" + }, + "End": true, + "Comment": "データ登録処理起動中断通知" + }, + "data-import": { + "Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.sync", + "Parameters": { + "LaunchType.$": "$.params.ecs.LaunchType", + "Cluster.$": "$.params.ecs.Cluster", + "TaskDefinition.$": "$.params.ecs.TaskDefinition", + "NetworkConfiguration.$": "$.params.ecs.NetworkConfiguration", + "Overrides.$": "$.params.ecs.Overrides" + }, + "ResultPath": "$.result", + "Retry": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "BackoffRate": 2, + "IntervalSeconds": 3, + "MaxAttempts": 3 + } + ], + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "CreateImportPendingFile", + "ResultPath": "$.result" + } + ], + "Next": "Success", + "Comment": "データ登録処理" + }, + "CreateImportPendingFile": { + "Type": "Task", + "Parameters": { + "Body": "", + "Bucket.$": "$.InputParams.BucketName", + "Key.$": "States.Format('{}/target/.detected_fatal_error_ecs', $.InputParams.DataSourceName)" + }, + "Resource": "arn:aws:states:::aws-sdk:s3:putObject", + "Next": "FailedNotice", + "Comment": "データ登録処理起動中断ファイル作成", + "ResultPath": "$.result" + }, + "FailedNotice": { + "Type": "Task", + "Resource": "arn:aws:states:::aws-sdk:sns:publish", + "Parameters": { + "TopicArn.$": "$.params.sns.TopicArn", + "Subject": "データ登録ECSタスク異常通知", + "Message.$": "States.Format('【{}】{}のデータ登録が失敗したため、登録処理中断ファイル(.detected_fatal_error_ecs)を作成しました。\n\n バケット名:{}\n取込ファイルパス:{}\n\n', $.InputParams.DataSourceName, $.InputParams.FileName, $.InputParams.BucketName, $.InputParams.TargetKey)" + }, + "Next": "Fail", + "Comment": "データ登録処理起動失敗通知" + }, + "Success": { + "Type": "Succeed", + "Comment": "正常終了" + }, + "Fail": { + "Type": "Fail", + "Comment": "異常終了", + "Error": "StatusError", + "Cause": "StepFunctions ErrorEnd" + } + } +}