From 838f5122fe999bf9600c006e524a0bdc56aecf89 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 16 Jul 2024 16:06:04 +0900 Subject: [PATCH 1/7] =?UTF-8?q?feat:=20S3=E3=83=90=E3=82=B1=E3=83=83?= =?UTF-8?q?=E3=83=88=E9=96=93=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB=E8=BB=A2?= =?UTF-8?q?=E9=80=81=E5=87=A6=E7=90=86=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transfer-receive-file-between-s3.py | 168 ++++++++++++++++++ .../bucket_transfer_config.json | 16 ++ 2 files changed, 184 insertions(+) create mode 100644 lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py create mode 100644 s3/config/bucket_transfer_config/bucket_transfer_config.json diff --git a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py new file mode 100644 index 00000000..7cf53295 --- /dev/null +++ b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py @@ -0,0 +1,168 @@ +import datetime +import json +import logging +import os +import re +from zoneinfo import ZoneInfo + +import boto3 + +# 環境変数 +CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"] +BUCKET_TRANSFER_SETTING_FILE_FOLDER = os.environ["BUCKET_TRANSFER_SETTING_FILE_FOLDER"] +BUCKET_TRANSFER_SETTING_FILE_NAME = os.environ["BUCKET_TRANSFER_SETTING_FILE_NAME"] +LOG_LEVEL = os.environ["LOG_LEVEL"] +TZ = os.environ["TZ"] +ENV = os.environ["ENV"] + +# 定数 +EXCLUSIVE_CONTROL_FILE_EXT = '.doing' + +# S3クライアント +s3_client = boto3.client('s3') + +# logger設定 +logger = logging.getLogger() + + +def log_datetime_convert_tz(*arg): + """ログに出力するタイムスタンプのロケールを変更する(JST指定)""" + return datetime.datetime.now(ZoneInfo(TZ)).timetuple() + + +formatter = logging.Formatter( + '[%(levelname)s]\t%(asctime)s\t%(message)s\n', + '%Y-%m-%d %H:%M:%S' +) +formatter.converter = log_datetime_convert_tz +for handler in logger.handlers: + handler.setFormatter(formatter) + +level = logging.getLevelName(LOG_LEVEL) +if not isinstance(level, int): + level = logging.INFO +logger.setLevel(level) + + +def get_s3_event_parameter(event: dict) -> tuple[str, str, str, str]: + """Lambdaに送信されたEvent情報からS3のイベントを取得する""" + s3_event = event["Records"][0]["s3"] + event_bucket_name: str = s3_event["bucket"]["name"] + event_object_key: str = s3_event["object"]["key"] + event_file_name: str = os.path.basename(event_object_key) + event_folder_name: str = os.path.dirname(event_object_key).split('/')[0] + + return event_bucket_name, event_object_key, event_file_name, event_folder_name + + +def delete_doing_file(event: dict) -> None: + """.doingファイルをバケット上から削除する""" + # イベント情報を取得 + ( + event_bucket_name, + event_object_key, + _, + _ + ) = get_s3_event_parameter(event) + # ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する + s3_client.delete_object( + Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') + + +def lambda_handler(event, context): + """Lambdaハンドラー関数""" + # ① 処理開始ログを出力する + logger.info('I-01-01 処理開始 S3バケット間ファイル転送処理') + + # ② 処理開始時に受け取ったイベント情報をログに出力する + # バケット名・フォルダ名・受信データファイル名をメモリに保持 + ( + event_bucket_name, + event_object_key, + event_file_name, + event_folder_name + ) = get_s3_event_parameter(event) + + try: + logger.info(f'I-02-01 受信バケット:{event_bucket_name}') + logger.info(f'I-02-01 フォルダ名:{event_folder_name}') + logger.info(f'I-02-01 受信ファイル名:{event_file_name}') + + # ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする + try: + s3_client.head_object( + Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}') + logger.error( + f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。') + return + except Exception: + # .doingファイルが見つからなかった場合は、処理を続行する + # メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する + logger.info( + f'I-03-01 {event_bucket_name}/{event_object_key}を転送します') + s3_client.put_object( + Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'') + + # ④ バケット転送設定ファイルを特定する + transfer_config_file_response = s3_client.get_object( + Bucket=CONFIG_BUCKET_NAME, Key=f'{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}') + transfer_config_file = json.loads( + transfer_config_file_response['Body'].read().decode('utf8')) + + # ⑤ バケット転送設定ファイルのキー[受信ファイル名正規表現パターン]と、[メモリに保持した受信ファイル名]と一致するものを取得する。 + + transfer_config = None + for key in transfer_config_file.keys(): + match_result = re.fullmatch(event_file_name, key) + if match_result is not None: + transfer_config = transfer_config_file[key] + break + + if transfer_config is None: + logger.error( + f'E-03-01 S3バケットの転送設定が見つかりません。{CONFIG_BUCKET_NAME}/{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}') + return + + # ⑥ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、バックアップする + if transfer_config.get('backup_setting', None) is not None: + backup_setting = transfer_config['backup_setting'] + copy_source = {'Bucket': event_bucket_name, + 'Key': event_object_key} + backup_buccet = backup_setting['backup_bucet'].format(env=ENV) + backup_date_pattern = datetime.date.today().strftime( + backup_setting['date_pattern']) + s3_client.copy_object( + Bucket=backup_buccet, + Key=f'{event_folder_name}/{backup_date_pattern}/{event_file_name}', + CopySource=copy_source + ) + logger.info( + f'I-04-01 受信ファイルのバックアップ完了::{backup_buccet}/{event_folder_name}/{backup_date_pattern}/{event_file_name}') + + # ⑦ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、移動する + destination_bucket = transfer_config['destination_bucket'].format( + env=ENV) + destination_folder = transfer_config['destination_folder'] + s3_client.copy_object( + Bucket=destination_bucket, + Key=f"{destination_folder}/{event_file_name}", + CopySource=copy_source + ) + # コピー後、元のバケットからは削除する + s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key) + logger.info( + f'I-05-01 受信ファイルの転送完了:{destination_bucket}/{destination_folder}/{event_file_name}') + + # ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する + delete_doing_file(event) + + logger.info('I-01-06 処理終了 S3バケット間ファイル転送処理') + + except Exception as e: + logger.exception( + f'想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}' + ) + delete_doing_file(event) + raise e + + return diff --git a/s3/config/bucket_transfer_config/bucket_transfer_config.json b/s3/config/bucket_transfer_config/bucket_transfer_config.json new file mode 100644 index 00000000..5f315d88 --- /dev/null +++ b/s3/config/bucket_transfer_config/bucket_transfer_config.json @@ -0,0 +1,16 @@ +{ + "hcpweb_accesslog_\\d{4}-\\d{2}-\\d{2}\\.(csv|CSV)": { + "Description": { + "データソース名": "hcp_web", + "送信元システム名": "Palantir", + "データの論理名(説明)": "HCPウェブアクセスログデータ", + "特記事項": "日〜金 AM1:00にMeDaCAへ連携" + }, + "destination_bucket": "mbj-newdwh2021-{env}-data", + "destination_folder": "hcp_web/target", + "backup_setting": { + "backup_bucket": "mbj-newdwh2021-{env}-backup-hcp-web", + "date_pattern": "%Y/%m/%d" + } + } +} From 42fe7804c4fe9569195ad6687a3fcafc9952136b Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 16 Jul 2024 16:09:00 +0900 Subject: [PATCH 2/7] =?UTF-8?q?fix:=20Palantir=E3=81=AE=E3=83=95=E3=82=A1?= =?UTF-8?q?=E3=82=A4=E3=83=AB=E5=90=8D=E6=AD=A3=E8=A6=8F=E8=A1=A8=E7=8F=BE?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- s3/config/bucket_transfer_config/bucket_transfer_config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/s3/config/bucket_transfer_config/bucket_transfer_config.json b/s3/config/bucket_transfer_config/bucket_transfer_config.json index 5f315d88..96c957ab 100644 --- a/s3/config/bucket_transfer_config/bucket_transfer_config.json +++ b/s3/config/bucket_transfer_config/bucket_transfer_config.json @@ -1,5 +1,5 @@ { - "hcpweb_accesslog_\\d{4}-\\d{2}-\\d{2}\\.(csv|CSV)": { + "hcpweb_accesslog_\\d{4}-\\d{2}-\\d{2}-\\d{6}\\.(csv|CSV)": { "Description": { "データソース名": "hcp_web", "送信元システム名": "Palantir", From 3c761cd038b000444cafaa80bc2e87892352b5a9 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 17 Jul 2024 10:29:59 +0900 Subject: [PATCH 3/7] =?UTF-8?q?feat:=20=E6=AD=A3=E8=A6=8F=E8=A1=A8?= =?UTF-8?q?=E7=8F=BE=E3=81=AE=E8=A7=A3=E9=87=88=E6=96=B9=E6=B3=95=E3=82=92?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E3=80=82=E3=82=B3=E3=83=B3=E3=83=95=E3=82=A3?= =?UTF-8?q?=E3=82=B0=E3=81=AE=E3=82=AD=E3=83=BC=E3=81=AE=E8=AA=A4=E3=82=8A?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transfer-receive-file-between-s3.py | 54 +++++++++++++++++-- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py index 7cf53295..17f4cc4d 100644 --- a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py +++ b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py @@ -113,7 +113,8 @@ def lambda_handler(event, context): transfer_config = None for key in transfer_config_file.keys(): - match_result = re.fullmatch(event_file_name, key) + filename_regex = re.compile(key) + match_result = filename_regex.fullmatch(event_file_name) if match_result is not None: transfer_config = transfer_config_file[key] break @@ -121,6 +122,7 @@ def lambda_handler(event, context): if transfer_config is None: logger.error( f'E-03-01 S3バケットの転送設定が見つかりません。{CONFIG_BUCKET_NAME}/{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}') + delete_doing_file(event) return # ⑥ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、バックアップする @@ -128,16 +130,16 @@ def lambda_handler(event, context): backup_setting = transfer_config['backup_setting'] copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key} - backup_buccet = backup_setting['backup_bucet'].format(env=ENV) + backup_bucket = backup_setting['backup_bucket'].format(env=ENV) backup_date_pattern = datetime.date.today().strftime( backup_setting['date_pattern']) s3_client.copy_object( - Bucket=backup_buccet, + Bucket=backup_bucket, Key=f'{event_folder_name}/{backup_date_pattern}/{event_file_name}', CopySource=copy_source ) logger.info( - f'I-04-01 受信ファイルのバックアップ完了::{backup_buccet}/{event_folder_name}/{backup_date_pattern}/{event_file_name}') + f'I-04-01 受信ファイルのバックアップ完了::{backup_bucket}/{event_folder_name}/{backup_date_pattern}/{event_file_name}') # ⑦ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、移動する destination_bucket = transfer_config['destination_bucket'].format( @@ -166,3 +168,47 @@ def lambda_handler(event, context): raise e return + +# 動作確認用のコード +# if __name__ == '__main__': +# lambda_handler( +# { +# "Records": [ +# { +# "eventVersion": "2.1", +# "eventSource": "aws:s3", +# "awsRegion": "ap-northeast-1", +# "eventTime": "2024-07-16T07:10:33.021Z", +# "eventName": "ObjectCreated:Put", +# "userIdentity": { +# "principalId": "AWS:AIDA4A3J5AIPDAT6MUJPZ" +# }, +# "requestParameters": { +# "sourceIPAddress": "118.238.231.215" +# }, +# "responseElements": { +# "x-amz-request-id": "0BST21P92A15BH55", +# "x-amz-id-2": "db9n9RpQxHEnq5o5ZLCeIGpuka54ghMHcbJ2Rj9aCcpjf111D4dyTZn5w5VvzV6W56rU89cSx/ihzkEHs8wk30ckbtRMYQ0byJn0UfK6bjg=" +# }, +# "s3": { +# "s3SchemaVersion": "1.0", +# "configurationId": "accesslog-receive-event2", +# "bucket": { +# "name": "mbj-newdwh2021-staging-hcp-web-receive", +# "ownerIdentity": { +# "principalId": "A1YQ10QIZBI5OE" +# }, +# "arn": "arn:aws:s3:::mbj-newdwh2021-staging-hcp-web-receive" +# }, +# "object": { +# "key": "palantir/hcpweb_accesslog_2024-07-16-071045.csv", +# "size": 597820, +# "eTag": "94299e880925b6f655c090521ff83d7a", +# "sequencer": "0066961CE8E7A670F2" +# } +# } +# } +# ] +# }, +# None +# ) From a058ef2d7521db76c4d4bce0e7dec22cc9e8d9dc Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 17 Jul 2024 10:38:30 +0900 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20=E3=83=AD=E3=82=B0=E3=81=8B=E3=82=89?= =?UTF-8?q?SNS=E3=81=ABPublish=E3=81=99=E3=82=8B=E6=A9=9F=E8=83=BD?= =?UTF-8?q?=E3=81=AESubject=E3=81=8C=E9=95=B7=E3=81=99=E3=81=8E=E3=82=8B?= =?UTF-8?q?=E3=81=A8=E3=82=A8=E3=83=A9=E3=83=BC=E3=81=AB=E3=81=AA=E3=82=8B?= =?UTF-8?q?=E5=95=8F=E9=A1=8C=E3=82=92=E4=BF=AE=E6=AD=A3=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lambda/publish-from-log/publish_from_log.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lambda/publish-from-log/publish_from_log.py b/lambda/publish-from-log/publish_from_log.py index b6200cc7..aa2c4529 100644 --- a/lambda/publish-from-log/publish_from_log.py +++ b/lambda/publish-from-log/publish_from_log.py @@ -8,8 +8,9 @@ import boto3 sns_client = boto3.client('sns') + def lambda_handler(event, context): - awslogs_dict = event.get('awslogs') + awslogs_dict = event.get('awslogs') base64_data = awslogs_dict.get('data') try: decoded_gzip_data = base64.b64decode(base64_data) @@ -18,12 +19,16 @@ def lambda_handler(event, context): log_event_str = gzip.GzipFile(fileobj=BytesIO(decoded_gzip_data)).read() log_event = json.loads(log_event_str) + # SNSのSubjectパラメータは100文字までという制限があるため、100文字に切り出す + subject = f'Detect Error(or Warning) in {log_event.get("logGroup")}' + subject = subject[:97] + '...' if len(subject) > 100 else subject + publish_message = { - 'Subject': f'Detect Error(or Warning) in {log_event.get("logGroup")}', + 'Subject': subject, 'Message': '\n'.join([log.get('message') for log in log_event.get('logEvents')]), 'TopicArn': os.environ.get('SNS_TOPIC_ARN') } - + print(publish_message) - + sns_client.publish(**publish_message) From cada4f7b09d487c0992f635bc42d55720c55f41e Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 17 Jul 2024 10:43:56 +0900 Subject: [PATCH 5/7] =?UTF-8?q?style:=20=E3=82=B3=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lambda/publish-from-log/publish_from_log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda/publish-from-log/publish_from_log.py b/lambda/publish-from-log/publish_from_log.py index aa2c4529..79f105b2 100644 --- a/lambda/publish-from-log/publish_from_log.py +++ b/lambda/publish-from-log/publish_from_log.py @@ -19,7 +19,7 @@ def lambda_handler(event, context): log_event_str = gzip.GzipFile(fileobj=BytesIO(decoded_gzip_data)).read() log_event = json.loads(log_event_str) - # SNSのSubjectパラメータは100文字までという制限があるため、100文字に切り出す + # SNSのSubjectパラメータは100文字までという制限があるため、100文字に切り出す(切り捨てた分は「...」に変換) subject = f'Detect Error(or Warning) in {log_event.get("logGroup")}' subject = subject[:97] + '...' if len(subject) > 100 else subject From 1b3995d4651b92802bc84c5f79a4445d2f7dc556 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 18 Jul 2024 13:54:37 +0900 Subject: [PATCH 6/7] =?UTF-8?q?fix:=20=E3=83=AD=E3=82=B0=E6=96=87=E8=A8=80?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transfer-receive-file-between-s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py index 17f4cc4d..d63cf064 100644 --- a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py +++ b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py @@ -158,7 +158,7 @@ def lambda_handler(event, context): # ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する delete_doing_file(event) - logger.info('I-01-06 処理終了 S3バケット間ファイル転送処理') + logger.info('I-06-01 処理終了 S3バケット間ファイル転送処理') except Exception as e: logger.exception( From e0ed18efaf6a640b99e027266174ea2353a68de7 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 18 Jul 2024 14:16:48 +0900 Subject: [PATCH 7/7] =?UTF-8?q?fix:=20=E3=82=A8=E3=83=A9=E3=83=BC=E3=83=AD?= =?UTF-8?q?=E3=82=B0=E3=81=AEID=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transfer-receive-file-between-s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py index d63cf064..46011138 100644 --- a/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py +++ b/lambda/transfer-receive-file-between-s3/transfer-receive-file-between-s3.py @@ -162,7 +162,7 @@ def lambda_handler(event, context): except Exception as e: logger.exception( - f'想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}' + f'E-99 想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}' ) delete_doing_file(event) raise e