Merge pull request #414 feature-NEWDWH2021-1620 into develop-v4.6.0
This commit is contained in:
commit
5a2fa9864a
@ -8,6 +8,7 @@ import boto3
|
|||||||
|
|
||||||
sns_client = boto3.client('sns')
|
sns_client = boto3.client('sns')
|
||||||
|
|
||||||
|
|
||||||
def lambda_handler(event, context):
|
def lambda_handler(event, context):
|
||||||
awslogs_dict = event.get('awslogs')
|
awslogs_dict = event.get('awslogs')
|
||||||
base64_data = awslogs_dict.get('data')
|
base64_data = awslogs_dict.get('data')
|
||||||
@ -18,8 +19,12 @@ def lambda_handler(event, context):
|
|||||||
log_event_str = gzip.GzipFile(fileobj=BytesIO(decoded_gzip_data)).read()
|
log_event_str = gzip.GzipFile(fileobj=BytesIO(decoded_gzip_data)).read()
|
||||||
log_event = json.loads(log_event_str)
|
log_event = json.loads(log_event_str)
|
||||||
|
|
||||||
|
# SNSのSubjectパラメータは100文字までという制限があるため、100文字に切り出す(切り捨てた分は「...」に変換)
|
||||||
|
subject = f'Detect Error(or Warning) in {log_event.get("logGroup")}'
|
||||||
|
subject = subject[:97] + '...' if len(subject) > 100 else subject
|
||||||
|
|
||||||
publish_message = {
|
publish_message = {
|
||||||
'Subject': f'Detect Error(or Warning) in {log_event.get("logGroup")}',
|
'Subject': subject,
|
||||||
'Message': '\n'.join([log.get('message') for log in log_event.get('logEvents')]),
|
'Message': '\n'.join([log.get('message') for log in log_event.get('logEvents')]),
|
||||||
'TopicArn': os.environ.get('SNS_TOPIC_ARN')
|
'TopicArn': os.environ.get('SNS_TOPIC_ARN')
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,214 @@
|
|||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
|
||||||
|
# 環境変数
|
||||||
|
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
|
||||||
|
BUCKET_TRANSFER_SETTING_FILE_FOLDER = os.environ["BUCKET_TRANSFER_SETTING_FILE_FOLDER"]
|
||||||
|
BUCKET_TRANSFER_SETTING_FILE_NAME = os.environ["BUCKET_TRANSFER_SETTING_FILE_NAME"]
|
||||||
|
LOG_LEVEL = os.environ["LOG_LEVEL"]
|
||||||
|
TZ = os.environ["TZ"]
|
||||||
|
ENV = os.environ["ENV"]
|
||||||
|
|
||||||
|
# 定数
|
||||||
|
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
|
||||||
|
|
||||||
|
# S3クライアント
|
||||||
|
s3_client = boto3.client('s3')
|
||||||
|
|
||||||
|
# logger設定
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
def log_datetime_convert_tz(*arg):
|
||||||
|
"""ログに出力するタイムスタンプのロケールを変更する(JST指定)"""
|
||||||
|
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
|
||||||
|
|
||||||
|
|
||||||
|
formatter = logging.Formatter(
|
||||||
|
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
|
||||||
|
'%Y-%m-%d %H:%M:%S'
|
||||||
|
)
|
||||||
|
formatter.converter = log_datetime_convert_tz
|
||||||
|
for handler in logger.handlers:
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
|
||||||
|
level = logging.getLevelName(LOG_LEVEL)
|
||||||
|
if not isinstance(level, int):
|
||||||
|
level = logging.INFO
|
||||||
|
logger.setLevel(level)
|
||||||
|
|
||||||
|
|
||||||
|
def get_s3_event_parameter(event: dict) -> tuple[str, str, str, str]:
|
||||||
|
"""Lambdaに送信されたEvent情報からS3のイベントを取得する"""
|
||||||
|
s3_event = event["Records"][0]["s3"]
|
||||||
|
event_bucket_name: str = s3_event["bucket"]["name"]
|
||||||
|
event_object_key: str = s3_event["object"]["key"]
|
||||||
|
event_file_name: str = os.path.basename(event_object_key)
|
||||||
|
event_folder_name: str = os.path.dirname(event_object_key).split('/')[0]
|
||||||
|
|
||||||
|
return event_bucket_name, event_object_key, event_file_name, event_folder_name
|
||||||
|
|
||||||
|
|
||||||
|
def delete_doing_file(event: dict) -> None:
|
||||||
|
""".doingファイルをバケット上から削除する"""
|
||||||
|
# イベント情報を取得
|
||||||
|
(
|
||||||
|
event_bucket_name,
|
||||||
|
event_object_key,
|
||||||
|
_,
|
||||||
|
_
|
||||||
|
) = get_s3_event_parameter(event)
|
||||||
|
# ⑨ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
|
||||||
|
s3_client.delete_object(
|
||||||
|
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
|
||||||
|
|
||||||
|
|
||||||
|
def lambda_handler(event, context):
|
||||||
|
"""Lambdaハンドラー関数"""
|
||||||
|
# ① 処理開始ログを出力する
|
||||||
|
logger.info('I-01-01 処理開始 S3バケット間ファイル転送処理')
|
||||||
|
|
||||||
|
# ② 処理開始時に受け取ったイベント情報をログに出力する
|
||||||
|
# バケット名・フォルダ名・受信データファイル名をメモリに保持
|
||||||
|
(
|
||||||
|
event_bucket_name,
|
||||||
|
event_object_key,
|
||||||
|
event_file_name,
|
||||||
|
event_folder_name
|
||||||
|
) = get_s3_event_parameter(event)
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
|
||||||
|
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
|
||||||
|
logger.info(f'I-02-01 受信ファイル名:{event_file_name}')
|
||||||
|
|
||||||
|
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
|
||||||
|
try:
|
||||||
|
s3_client.head_object(
|
||||||
|
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
|
||||||
|
logger.error(
|
||||||
|
f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
# .doingファイルが見つからなかった場合は、処理を続行する
|
||||||
|
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
|
||||||
|
logger.info(
|
||||||
|
f'I-03-01 {event_bucket_name}/{event_object_key}を転送します')
|
||||||
|
s3_client.put_object(
|
||||||
|
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
|
||||||
|
|
||||||
|
# ④ バケット転送設定ファイルを特定する
|
||||||
|
transfer_config_file_response = s3_client.get_object(
|
||||||
|
Bucket=CONFIG_BUCKET_NAME, Key=f'{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}')
|
||||||
|
transfer_config_file = json.loads(
|
||||||
|
transfer_config_file_response['Body'].read().decode('utf8'))
|
||||||
|
|
||||||
|
# ⑤ バケット転送設定ファイルのキー[受信ファイル名正規表現パターン]と、[メモリに保持した受信ファイル名]と一致するものを取得する。
|
||||||
|
|
||||||
|
transfer_config = None
|
||||||
|
for key in transfer_config_file.keys():
|
||||||
|
filename_regex = re.compile(key)
|
||||||
|
match_result = filename_regex.fullmatch(event_file_name)
|
||||||
|
if match_result is not None:
|
||||||
|
transfer_config = transfer_config_file[key]
|
||||||
|
break
|
||||||
|
|
||||||
|
if transfer_config is None:
|
||||||
|
logger.error(
|
||||||
|
f'E-03-01 S3バケットの転送設定が見つかりません。{CONFIG_BUCKET_NAME}/{BUCKET_TRANSFER_SETTING_FILE_FOLDER}/{BUCKET_TRANSFER_SETTING_FILE_NAME}')
|
||||||
|
delete_doing_file(event)
|
||||||
|
return
|
||||||
|
|
||||||
|
# ⑥ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、バックアップする
|
||||||
|
if transfer_config.get('backup_setting', None) is not None:
|
||||||
|
backup_setting = transfer_config['backup_setting']
|
||||||
|
copy_source = {'Bucket': event_bucket_name,
|
||||||
|
'Key': event_object_key}
|
||||||
|
backup_bucket = backup_setting['backup_bucket'].format(env=ENV)
|
||||||
|
backup_date_pattern = datetime.date.today().strftime(
|
||||||
|
backup_setting['date_pattern'])
|
||||||
|
s3_client.copy_object(
|
||||||
|
Bucket=backup_bucket,
|
||||||
|
Key=f'{event_folder_name}/{backup_date_pattern}/{event_file_name}',
|
||||||
|
CopySource=copy_source
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f'I-04-01 受信ファイルのバックアップ完了::{backup_bucket}/{event_folder_name}/{backup_date_pattern}/{event_file_name}')
|
||||||
|
|
||||||
|
# ⑦ 受信ファイルを、⑤でメモリ上に保持したJSONオブジェクトの設定内容に基づき、移動する
|
||||||
|
destination_bucket = transfer_config['destination_bucket'].format(
|
||||||
|
env=ENV)
|
||||||
|
destination_folder = transfer_config['destination_folder']
|
||||||
|
s3_client.copy_object(
|
||||||
|
Bucket=destination_bucket,
|
||||||
|
Key=f"{destination_folder}/{event_file_name}",
|
||||||
|
CopySource=copy_source
|
||||||
|
)
|
||||||
|
# コピー後、元のバケットからは削除する
|
||||||
|
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
|
||||||
|
logger.info(
|
||||||
|
f'I-05-01 受信ファイルの転送完了:{destination_bucket}/{destination_folder}/{event_file_name}')
|
||||||
|
|
||||||
|
# ⑧ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
|
||||||
|
delete_doing_file(event)
|
||||||
|
|
||||||
|
logger.info('I-06-01 処理終了 S3バケット間ファイル転送処理')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(
|
||||||
|
f'E-99 想定外のエラーが発生しました。処理を終了します。ファイル名フルパス: {event_bucket_name}/{event_object_key} 例外内容:{e}'
|
||||||
|
)
|
||||||
|
delete_doing_file(event)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# 動作確認用のコード
|
||||||
|
# if __name__ == '__main__':
|
||||||
|
# lambda_handler(
|
||||||
|
# {
|
||||||
|
# "Records": [
|
||||||
|
# {
|
||||||
|
# "eventVersion": "2.1",
|
||||||
|
# "eventSource": "aws:s3",
|
||||||
|
# "awsRegion": "ap-northeast-1",
|
||||||
|
# "eventTime": "2024-07-16T07:10:33.021Z",
|
||||||
|
# "eventName": "ObjectCreated:Put",
|
||||||
|
# "userIdentity": {
|
||||||
|
# "principalId": "AWS:AIDA4A3J5AIPDAT6MUJPZ"
|
||||||
|
# },
|
||||||
|
# "requestParameters": {
|
||||||
|
# "sourceIPAddress": "118.238.231.215"
|
||||||
|
# },
|
||||||
|
# "responseElements": {
|
||||||
|
# "x-amz-request-id": "0BST21P92A15BH55",
|
||||||
|
# "x-amz-id-2": "db9n9RpQxHEnq5o5ZLCeIGpuka54ghMHcbJ2Rj9aCcpjf111D4dyTZn5w5VvzV6W56rU89cSx/ihzkEHs8wk30ckbtRMYQ0byJn0UfK6bjg="
|
||||||
|
# },
|
||||||
|
# "s3": {
|
||||||
|
# "s3SchemaVersion": "1.0",
|
||||||
|
# "configurationId": "accesslog-receive-event2",
|
||||||
|
# "bucket": {
|
||||||
|
# "name": "mbj-newdwh2021-staging-hcp-web-receive",
|
||||||
|
# "ownerIdentity": {
|
||||||
|
# "principalId": "A1YQ10QIZBI5OE"
|
||||||
|
# },
|
||||||
|
# "arn": "arn:aws:s3:::mbj-newdwh2021-staging-hcp-web-receive"
|
||||||
|
# },
|
||||||
|
# "object": {
|
||||||
|
# "key": "palantir/hcpweb_accesslog_2024-07-16-071045.csv",
|
||||||
|
# "size": 597820,
|
||||||
|
# "eTag": "94299e880925b6f655c090521ff83d7a",
|
||||||
|
# "sequencer": "0066961CE8E7A670F2"
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# ]
|
||||||
|
# },
|
||||||
|
# None
|
||||||
|
# )
|
||||||
16
s3/config/bucket_transfer_config/bucket_transfer_config.json
Normal file
16
s3/config/bucket_transfer_config/bucket_transfer_config.json
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
"hcpweb_accesslog_\\d{4}-\\d{2}-\\d{2}-\\d{6}\\.(csv|CSV)": {
|
||||||
|
"Description": {
|
||||||
|
"データソース名": "hcp_web",
|
||||||
|
"送信元システム名": "Palantir",
|
||||||
|
"データの論理名(説明)": "HCPウェブアクセスログデータ",
|
||||||
|
"特記事項": "日〜金 AM1:00にMeDaCAへ連携"
|
||||||
|
},
|
||||||
|
"destination_bucket": "mbj-newdwh2021-{env}-data",
|
||||||
|
"destination_folder": "hcp_web/target",
|
||||||
|
"backup_setting": {
|
||||||
|
"backup_bucket": "mbj-newdwh2021-{env}-backup-hcp-web",
|
||||||
|
"date_pattern": "%Y/%m/%d"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user