newdwh2021/lambda/daily-data-unreceive-check/daily-data-unreceive-check.py
2024-07-19 16:30:51 +09:00

330 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import datetime
import io
import json
import logging
import os
import re
from zoneinfo import ZoneInfo
import boto3
from dateutil.relativedelta import relativedelta
# 環境変数
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
MBJ_NOTICE_TOPIC = os.environ["MBJ_NOTICE_TOPIC"]
PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME = os.environ["PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME"]
PROCESSED_MESSAGE_EXPIRES_PERIOD = int(os.environ["PROCESSED_MESSAGE_EXPIRES_PERIOD"])
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
ROW_COMMENT_SYMBOL = '#'
INDEX_REGEX = 0
INDEX_DATA_NAME = 1
INDEX_ROW_COMMENT_SYMBOL = 0
INDEX_SPLIT_NUM = 1
INDEX_LAST = -1
# メール本文に出力する不足ファイル名一覧のインデント
MAIL_INDENT = '  '
# AWS操作クライアント
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
dynamodb_client = boto3.client('dynamodb')
# logger設定
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def is_duplicate_message(message_id: str) -> bool:
"""DynamoDBテーブルに処理済みのSQSメッセージIdが存在するかどうかを調査する
Args:
message_id (str): SQSメッセージId
Returns:
bool: 存在する場合はTrue
"""
return dynamodb_client.query(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Select='COUNT',
KeyConditionExpression='message_id = :message_id',
ExpressionAttributeValues={
':message_id': {'S': message_id}
}
)["Count"] != 0
def put_success_messages_to_dynamo_db(batch_success_items: list[str]) -> bool:
"""処理済みのSQSメッセージIdをDynamoDBにPushする
Args:
batch_success_items (list[str]): SQSメッセージIdのリスト
Returns:
bool: 登録成功の場合、True
"""
# レコードの有効期限を算出
now = datetime.datetime.now(ZoneInfo(TZ))
record_expiration_datetime = now + \
datetime.timedelta(minutes=PROCESSED_MESSAGE_EXPIRES_PERIOD)
record_expiration_time = record_expiration_datetime.timestamp()
for message_id in batch_success_items:
dynamodb_client.put_item(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Item={
'message_id': {'S': message_id},
'record_expiration_time': {'N': f'{record_expiration_time}'}
}
)
return True
def substitute_mail_template(mail_template: str, mail_msg: str) -> str:
"""メールテンプレートのプレースホルダーを置き換える
Args:
mail_template (str): 置き換え前のメールテンプレート
mail_msg (str): メールテンプレートのプレースホルダーを置き換える文言(ファイル一覧)
Returns:
str: 置き換え後のメール本文
"""
substitute_dict = {
"notice_file_names": mail_msg
}
mail_str = mail_template.format_map(substitute_dict)
return mail_str
def make_failure_item_on_error(message_id: str) -> dict[str, str]:
"""Report batch item failuresによる処理に失敗したメッセージの判別のためのレスポンスを作成する
@see <https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting>
Args:
message_id (str): SQSメッセージId
Returns:
dict[str, str]: Report batch item failuresで失敗したSQSメッセージを判別するための辞書オブジェクト
"""
return {"itemIdentifier": message_id}
def daily_data_unreceive_check(records: list, execute_date: str) -> tuple[list[dict[str, str]], list[str]]:
"""日次データ未受領チェック
Args:
records (list): SQS Eventのレコードリスト
execute_date (str): 処理稼働年月日
Returns:
tuple[list[dict[str, str]], list[str]]: 失敗メッセージIdのリスト, 成功メッセージIdのリスト
"""
batch_failed_items = []
batch_success_items = []
for record in records:
# メール挿入用文言を格納するためのメモリを保持する
mail_message = ''
try:
# SQSパラメータをJSONシリアライズし、Pythonの辞書オブジェクト(イベントパラメータ)を取得する。
event_parameter = json.loads(record['body'])
receive_date = execute_date.strftime('%Y/%m/%d')
try:
# 1.SQSメッセージIDを取得する
message_id = record["messageId"]
# 2.DynamoDBテーブルからレコードを取得し、処理済みメッセージかどうかを判別する
if is_duplicate_message(message_id):
logger.info(f'I-02-02 受信したメッセージは既に処理済みのため、処理をスキップします。メッセージID: {message_id} バケットディレクトリ: {event_parameter["check_bucket_name"]}/{event_parameter["check_folder_prefix"]}/{receive_date}/')
continue
except Exception as e:
logger.exception(f"E-02-01 メッセージ重複チェック処理に失敗しました エラー内容:{e}")
batch_failed_items.append(make_failure_item_on_error(message_id))
continue
# ③ 設定ファイル[受領チェック対象ファイルリスト]を読み込む
try:
logger.info('I-03-01 ' +'受領チェック対象ファイルリスト読込 読込元:' + f'{CONFIG_BUCKET_NAME}/{event_parameter["check_target_file_list"]}')
check_target_file_list_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{event_parameter["check_target_file_list"]}'
)
logger.info('I-03-02 受領チェック対象ファイルリストを読み込みました')
except Exception as e:
logger.exception(f"E-03-01 受領チェック対象ファイルリストの読み込みに失敗しました エラー内容:{e} 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["check_target_file_list"]}")
batch_failed_items.append(make_failure_item_on_error(message_id))
continue
# ④ 受領チェック処理を行う
logger.info(f'I-04-01 日次データ受領チェック ({event_parameter['data_source_name']}) 処理開始')
object_prefix = f'{event_parameter["check_folder_prefix"]}/{receive_date}/'
# 1.日次データバックアップ保管バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する
logger.info(f'I-04-02 オブジェクトリストの取得 取得先:{event_parameter['check_bucket_name']}/{object_prefix}')
receive_file_list_response = s3_client.list_objects_v2(Bucket=event_parameter['check_bucket_name'], Prefix=object_prefix)
receive_file_list = []
for content in receive_file_list_response.get('Contents', []):
# オブジェクトのキーからファイル名を切り出してリストに追加
obj_key = content['Key'].rsplit('/', INDEX_SPLIT_NUM)
receive_file_list.append(obj_key[INDEX_LAST])
# 2.I/Fファイルチェック処理
logger.info(f'I-04-03 日次受信データ({event_parameter['data_source_name']}) 未受領チェック処理開始')
check_target_file_name_body = io.TextIOWrapper(io.BytesIO(
check_target_file_list_response["Body"].read()), encoding='utf-8')
match_count = 0
row_count = 0
for tsv_row in csv.reader(check_target_file_name_body, delimiter='\t'):
# 「④1.」で取得したリストが「③」で読み込んだファイル内に存在するか確認する
is_file_not_exists = True
for file_name in receive_file_list:
match_result = re.fullmatch(tsv_row[INDEX_REGEX], file_name)
# 「③」で読み込んだファイルに記載されている全てが「④1.」で取得したリストに存在した場合
if match_result is not None:
is_file_not_exists = False
logger.info(f'I-04-04 I/Fファイルの受領を確認しました ファイル名{file_name}')
match_count += 1
if is_file_not_exists:
logger.info(f'I-04-06 I/Fファイルに不足があります ファイル名{tsv_row[INDEX_DATA_NAME]}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}\n'
row_count += 1
if row_count == match_count:
logger.info('I-04-05 I/Fファイルは全て受領していることを確認しました')
# ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する
logger.info('I-05-01 メール送信処理開始')
if len(mail_message) == 0:
logger.info(
f'I-05-09 {execute_date} {event_parameter["data_source_name"]}データI/Fファイルに不足が無いため、メール送信処理をスキップします')
batch_success_items.append(message_id)
continue
# 1.存在した場合
logger.info(f'I-05-02 {execute_date} {event_parameter["data_source_name"]} データI/Fファイルに不足があるため、メール送信処理を開始します')
try:
logger.info('I-05-03 ' +f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_title_template"]}')
mail_title_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{event_parameter["notice_mail_title_template"]}'
)
mail_title_template = (mail_title_response['Body'].read().decode('utf-8'))
# 改行を取り除く
mail_title_without_line_break = mail_title_template.splitlines()[0]
logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(
f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e} 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_title_template"]}')
batch_failed_items.append(
make_failure_item_on_error(message_id))
continue
try:
logger.info('I-05-05 ' +f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_body_template"]}')
mail_body_template_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{event_parameter["notice_mail_body_template"]}'
)
mail_body_template = (mail_body_template_response['Body'].read().decode('utf-8'))
# メール本文内のプレースホルダーを置き換える
mail_body = substitute_mail_template(mail_body_template, mail_message)
logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e} 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_body_template"]}')
batch_failed_items.append(make_failure_item_on_error(message_id))
continue
logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}')
params = {
'TopicArn': MBJ_NOTICE_TOPIC,
'Subject': mail_title_without_line_break,
'Message': mail_body
}
sns_client.publish(**params)
logger.info('I-05-08 メール送信指示をしました')
batch_success_items.append(message_id)
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
batch_failed_items.append(make_failure_item_on_error(message_id))
continue
return batch_failed_items, batch_success_items
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 日次データ受領チェック処理')
# 処理稼働年月を取得しメモリに保持する
execute_date = datetime.date.today()
# 処理成功メッセージIDリストをメモリに保持する初期値空のリスト
batch_success_items = []
# 処理失敗メッセージIDリストをメモリに保持する初期値空のリスト
batch_failed_items = []
# ② SQSメッセージ重複排除処理を行う
logger.info('I-02-01 メッセージ処理開始')
batch_failed_items, batch_success_items = daily_data_unreceive_check(event["Records"], execute_date)
logger.info('I-06-01 すべてのメッセージの処理完了')
# ⑦ メッセージを処理済として、以下のDynamoDBテーブルに記録する
put_success_messages_to_dynamo_db(batch_success_items)
logger.info('I-07-01 処理済みメッセージIDの記録完了')
logger.info('I-07-02 処理終了 日次データ受領チェック処理')
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
raise e
return batch_failed_items
# 動作確認用のコード
# if __name__ == '__main__':
# lambda_handler({
# "Records": [
# {
# "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
# "receiptHandle": "MessageReceiptHandle",
# "body": "{\"data_source_name\":\"data_source_name\",\"check_bucket_name\":\"check_bucket_name\",\"check_folder_prefix\":\"check_folder_prefix\",\"check_target_file_list\":\"check_target_file_list\",\"notice_mail_title_template\":\"notice_mail_title_template\",\"notice_mail_body_template\":\"notice_mail_body_template\"\r\n}",
# "attributes": {
# "ApproximateReceiveCount": "1",
# "SentTimestamp": "1523232000000",
# "SenderId": "123456789012",
# "ApproximateFirstReceiveTimestamp": "1523232000001"
# },
# "messageAttributes": {},
# "md5OfBody": "{{{md5_of_body}}}",
# "eventSource": "aws:sqs",
# "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
# "awsRegion": "us-east-1"
# }
# ]
# }, {})