newdwh2021/lambda/encise-data-unreceive-check/encise-data-unreceive-check.py
2024-02-13 13:34:46 +09:00

306 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import datetime
import io
import json
import logging
import os
import re
from zoneinfo import ZoneInfo
import boto3
# 環境変数
CHECK_BUCKET_NAME = os.environ["CHECK_BUCKET_NAME"]
CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH = os.environ["CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH"]
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
MAIL_TEMPLATE_FOLDER_PATH = os.environ["MAIL_TEMPLATE_FOLDER_PATH"]
MBJ_NOTICE_TOPIC = os.environ["MBJ_NOTICE_TOPIC"]
NDS_NOTICE_TITLE = os.environ["NDS_NOTICE_TITLE"]
NDS_NOTICE_TOPIC = os.environ["NDS_NOTICE_TOPIC"]
PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME = os.environ["PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME"]
PROCESSED_MESSAGE_EXPIRES_PERIOD = os.environ["PROCESSED_MESSAGE_EXPIRES_PERIOD"]
TZ = os.environ["TZ"]
# 定数
ROW_COMMENT_SYMBOL = '#'
INDEX_REGEX = 0
INDEX_DATA_NAME = 1
INDEX_ROW_COMMENT_SYMBOL = 0
INDEX_SPLIT_NUM = 1
INDEX_LAST = -1
# メール本文に出力する不足ファイル名一覧のインデント
MAIL_INDENT = '  '
# AWS操作クライアント
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
dynamodb_client = boto3.client('dynamodb')
# logger設定
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def is_duplicate_message(message_id: str) -> bool:
"""DynamoDBテーブルに処理済みのSQSメッセージIdが存在するかどうかを調査する
Args:
message_id (str): SQSメッセージId
Returns:
bool: 存在する場合はTrue
"""
return dynamodb_client.query(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Select='COUNT',
KeyConditionExpression='message_id = :message_id',
ExpressionAttributeValues={
':message_id': {'S': message_id}
}
)["Count"] != 0
def push_success_messages_to_dynamo_db(batch_item_success: list[str]) -> bool:
"""処理済みのSQSメッセージIdをDynamoDBにPushする
Args:
batch_item_success (list[str]): SQSメッセージIdのリスト
Returns:
bool: 登録成功の場合、True
"""
for message_id in batch_item_success:
dynamodb_client.put_item(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Item={'message_id': {'S': message_id}}
)
return True
def substitute_mail_template(mail_template: str, receive_timing: str, mail_msg: str) -> str:
"""メールテンプレートのプレースホルダーを置き換える
Args:
mail_template (str): 置き換え前のメールテンプレート
receive_timing (str): メールテンプレートのプレースホルダーを置き換える文言(受信タイミング)
mail_msg (str): メールテンプレートのプレースホルダーを置き換える文言(ファイル一覧)
Returns:
str: 置き換え後のメール本文
"""
substitute_dict = {
"receive_timing": receive_timing,
"notice_file_names": mail_msg
}
mail_str = mail_template.format_map(substitute_dict)
return mail_str
def make_failure_item_on_error(message_id: str) -> dict[str, str]:
"""Report batch item failuresによる処理に失敗したメッセージの判別のためのレスポンスを作成する
@see <https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting>
Args:
message_id (str): SQSメッセージId
Returns:
dict[str, str]: Report batch item failuresで失敗したSQSメッセージを判別するための辞書オブジェクト
"""
return {"itemIdentifier": message_id}
def unreceive_check(records: list, execute_month: str) -> tuple[list[dict[str, str]], list[str]]:
batch_item_failures = []
batch_item_success = []
for record in records:
# メール挿入用文言を格納するためのメモリを保持する
mail_message = ''
try:
try:
# 1.SQSメッセージIDを取得する
message_id = record["messageId"]
# 2.DynamoDBテーブルからレコードを取得し、処理済みメッセージかどうかを判別する
if is_duplicate_message(message_id):
logger.info(f'受信したメッセージは既に処理済みのため、処理をスキップします。メッセージID: {message_id}')
continue
except Exception as e:
logger.error(f"E-02-01 メッセージ重複チェック処理に失敗しました エラー内容:{e}")
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
# SQSパラメータをJSONシリアライズし、Pythonの辞書オブジェクト(イベントパラメータ)を取得する。
event_parameter = json.loads(record['body'])
# ③ 設定ファイル[受領チェック対象ファイルリスト]を読み込む
try:
logger.info(
'I-03-01 ' +
'受領チェック対象ファイルリスト読込 読込元:' +
f'{CONFIG_BUCKET_NAME}/{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
check_target_file_list_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
logger.info('I-03-02 受領チェック対象ファイルリストを読み込みました')
except Exception as e:
logger.error(f"E-03-01 受領チェック対象ファイルリストの読み込みに失敗しました エラー内容:{e}")
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
# ④ 受領チェック処理を行う
receive_timing = event_parameter['receive_timing']
logger.info(f'I-04-01 Enciseデータ受領チェック ({receive_timing}) 処理開始')
object_prefix = f'{event_parameter["check_folder_prefix"]}{execute_month}/'
# 1.Enciseデータバックアップ保管バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する
logger.info(
f'I-04-02 オブジェクトリストの取得 取得先:{CHECK_BUCKET_NAME}/{object_prefix}'
)
check_target_file_list_response = s3_client.list_objects_v2(CHECK_BUCKET_NAME, Prefix=object_prefix)
check_target_file_list = []
for content in check_target_file_list_response[['Contents']]:
# オブジェクトのキーからファイル名を切り出してリストに追加
obj_key = content['Key'].rsplit('/', INDEX_SPLIT_NUM)
check_target_file_list.append(obj_key[INDEX_LAST])
# 2.I/Fファイルチェック処理
logger.info(f'I-04-03 Enciseデータ({receive_timing}) ファイルチェック処理開始')
receive_monthly_file_name_body = io.TextIOWrapper(io.BytesIO(check_target_file_list_response["Body"].read()), encoding='utf-8')
match_count = 0
row_count = 0
for tsv_row in csv.reader(receive_monthly_file_name_body, delimiter='\t'):
# 「④1.」で取得したリストが「③」で読み込んだファイル内に存在するか確認する
is_file_not_exists = True
for file_name in check_target_file_list:
match_result = re.fullmatch(tsv_row[INDEX_REGEX], file_name)
# 「③」で読み込んだファイルに記載されている全てが「④1.」で取得したリストに存在した場合
if match_result is not None:
# 存在したファイルの年月部分を抜き出し、チェック対象年月(処理稼働月-1)である場合
if True: # TODO
logger.info(f'I-04-04 I/Fファイルの受領を確認しました ファイル名{file_name}')
is_file_not_exists = False
match_count += 1
else:
logger.info(f'I-04-07 I/Fファイルの年月がチェック対象年月と一致しません ファイル名{file_name}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}(受領年月が不正:{file_name})\n'
break
if is_file_not_exists:
logger.info(f'E-04-06 月次I/Fファイルに不足があります ファイル名{tsv_row[INDEX_DATA_NAME]}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}\n'
row_count += 1
if row_count == match_count:
logger.info('I-04-05 I/Fファイルは全て受領していることを確認しました')
# ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する
logger.info('I-05-01 メール送信処理開始')
if len(mail_message) == 0:
logger.info(f'I-05-09 {execute_month} {receive_timing}データI/Fファイルに不足が無いため、メール送信処理をスキップします')
batch_item_success.append(message_id)
continue
# 1.存在した場合
logger.info(f'I-05-02 {execute_month} {receive_timing}データI/Fファイルに不足があるため、メール送信処理を開始します')
try:
logger.info(
'I-05-03 ' +
f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_template = mail_title_response['Body'].read().decode('utf-8')
# 改行を取り除く
mail_title = substitute_mail_template(mail_title_template, receive_timing, mail_message)
mail_title_without_line_break = mail_title.splitlines()[0]
logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました')
except Exception as e:
logger.error(f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
try:
logger.info(
'I-05-05 ' +
f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template = mail_body_template_response['Body'].read().decode('utf-8')
# メール本文内のプレースホルダーを置き換える
mail_body = substitute_mail_template(mail_body_template, receive_timing, mail_message)
logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました')
except Exception as e:
logger.error(f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}')
params = {
'TopicArn': MBJ_NOTICE_TOPIC,
'Subject': mail_title_without_line_break,
'Message': mail_body
}
sns_client.publish(**params)
logger.info('I-05-08 メール送信指示をしました')
batch_item_success.append(message_id)
except Exception as e:
logger.error(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
return batch_item_failures, batch_item_success
def lambda_handler(event, context):
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Enciseデータ受領チェック処理')
# 処理稼働年月を取得しメモリに保持する
execute_date_today = datetime.date.today()
execute_month = execute_date_today.strftime('%Y/%m')
logger.info(f'I-01-02 処理稼働月:{execute_month}')
# 処理失敗メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_failures = []
# 処理成功メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_success = []
# ② SQSメッセージ重複排除処理を行う
logger.info('I-02-01 メッセージ処理開始')
batch_item_failures, batch_item_success = unreceive_check(event["Records"], execute_month)
logger.info('I-06-01 すべてのメッセージの処理完了')
# ⑦ メッセージを処理済として、以下のDynamoDBテーブルに記録する
push_success_messages_to_dynamo_db(batch_item_success)
logger.info('I-07-01 処理済みメッセージIDの記録完了')
logger.info('I-07-02 処理終了 Enciseデータ受領チェック処理')
return batch_item_failures