Merge pull request #346 feature-NEWDWH2021-1439 into develop

This commit is contained in:
朝倉 明日香 2024-02-27 11:15:30 +09:00
commit 01356a8735
10 changed files with 425 additions and 0 deletions

View File

@ -0,0 +1,374 @@
import csv
import datetime
import io
import json
import logging
import os
import re
from zoneinfo import ZoneInfo
import boto3
from dateutil.relativedelta import relativedelta
# 環境変数
CHECK_BUCKET_NAME = os.environ["CHECK_BUCKET_NAME"]
CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH = os.environ["CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH"]
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
MAIL_TEMPLATE_FOLDER_PATH = os.environ["MAIL_TEMPLATE_FOLDER_PATH"]
MBJ_NOTICE_TOPIC = os.environ["MBJ_NOTICE_TOPIC"]
PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME = os.environ["PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME"]
PROCESSED_MESSAGE_EXPIRES_PERIOD = int(
os.environ["PROCESSED_MESSAGE_EXPIRES_PERIOD"])
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
ROW_COMMENT_SYMBOL = '#'
INDEX_REGEX = 0
INDEX_DATA_NAME = 1
INDEX_ROW_COMMENT_SYMBOL = 0
INDEX_SPLIT_NUM = 1
INDEX_LAST = -1
# メール本文に出力する不足ファイル名一覧のインデント
MAIL_INDENT = '  '
# AWS操作クライアント
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
dynamodb_client = boto3.client('dynamodb')
# logger設定
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def is_duplicate_message(message_id: str) -> bool:
"""DynamoDBテーブルに処理済みのSQSメッセージIdが存在するかどうかを調査する
Args:
message_id (str): SQSメッセージId
Returns:
bool: 存在する場合はTrue
"""
return dynamodb_client.query(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Select='COUNT',
KeyConditionExpression='message_id = :message_id',
ExpressionAttributeValues={
':message_id': {'S': message_id}
}
)["Count"] != 0
def put_success_messages_to_dynamo_db(batch_item_success: list[str]) -> bool:
"""処理済みのSQSメッセージIdをDynamoDBにPushする
Args:
batch_item_success (list[str]): SQSメッセージIdのリスト
Returns:
bool: 登録成功の場合True
"""
# レコードの有効期限を算出
now = datetime.datetime.now(ZoneInfo(TZ))
record_expiration_datetime = now + \
datetime.timedelta(minutes=PROCESSED_MESSAGE_EXPIRES_PERIOD)
record_expiration_time = record_expiration_datetime.timestamp()
for message_id in batch_item_success:
dynamodb_client.put_item(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Item={
'message_id': {'S': message_id},
'record_expiration_time': {'N': f'{record_expiration_time}'}
}
)
return True
def substitute_mail_template(mail_template: str, receive_timing: str, mail_msg: str) -> str:
"""メールテンプレートのプレースホルダーを置き換える
Args:
mail_template (str): 置き換え前のメールテンプレート
receive_timing (str): メールテンプレートのプレースホルダーを置き換える文言(受信タイミング)
mail_msg (str): メールテンプレートのプレースホルダーを置き換える文言(ファイル一覧)
Returns:
str: 置き換え後のメール本文
"""
substitute_dict = {
"receive_timing": receive_timing,
"notice_file_names": mail_msg
}
mail_str = mail_template.format_map(substitute_dict)
return mail_str
def make_failure_item_on_error(message_id: str) -> dict[str, str]:
"""Report batch item failuresによる処理に失敗したメッセージの判別のためのレスポンスを作成する
@see <https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting>
Args:
message_id (str): SQSメッセージId
Returns:
dict[str, str]: Report batch item failuresで失敗したSQSメッセージを判別するための辞書オブジェクト
"""
return {"itemIdentifier": message_id}
def encise_data_unreceive_check(records: list, execute_month: str) -> tuple[list[dict[str, str]], list[str]]:
"""Enciseデータ未受領チェック
Args:
records (list): SQS Eventのレコードリスト
execute_month (str): 処理起動年月
Returns:
tuple[list[dict[str, str]], list[str]]: 失敗メッセージIdのリスト, 成功メッセージIdのリスト
"""
batch_item_failures = []
batch_item_success = []
for record in records:
# メール挿入用文言を格納するためのメモリを保持する
mail_message = ''
try:
try:
# 1.SQSメッセージIDを取得する
message_id = record["messageId"]
# 2.DynamoDBテーブルからレコードを取得し、処理済みメッセージかどうかを判別する
if is_duplicate_message(message_id):
logger.info(
f'受信したメッセージは既に処理済みのため、処理をスキップします。メッセージID: {message_id}')
continue
except Exception as e:
logger.exception(f"E-02-01 メッセージ重複チェック処理に失敗しました エラー内容:{e}")
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
# SQSパラメータをJSONシリアライズし、Pythonの辞書オブジェクト(イベントパラメータ)を取得する。
event_parameter = json.loads(record['body'])
# ③ 設定ファイル[受領チェック対象ファイルリスト]を読み込む
try:
logger.info(
'I-03-01 ' +
'受領チェック対象ファイルリスト読込 読込元:' +
f'{CONFIG_BUCKET_NAME}/{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
check_target_file_list_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
logger.info('I-03-02 受領チェック対象ファイルリストを読み込みました')
except Exception as e:
logger.exception(
f"E-03-01 受領チェック対象ファイルリストの読み込みに失敗しました エラー内容:{e}")
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
# ④ 受領チェック処理を行う
receive_timing = event_parameter['receive_timing']
logger.info(f'I-04-01 Enciseデータ受領チェック ({receive_timing}) 処理開始')
object_prefix = f'{event_parameter["check_folder_prefix"]}/{execute_month}/'
# 1.Enciseデータバックアップ保管バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する
logger.info(
f'I-04-02 オブジェクトリストの取得 取得先:{CHECK_BUCKET_NAME}/{object_prefix}'
)
receive_file_list_response = s3_client.list_objects_v2(
Bucket=CHECK_BUCKET_NAME, Prefix=object_prefix)
receive_file_list = []
for content in receive_file_list_response.get('Contents', []):
# オブジェクトのキーからファイル名を切り出してリストに追加
obj_key = content['Key'].rsplit('/', INDEX_SPLIT_NUM)
receive_file_list.append(obj_key[INDEX_LAST])
# 2.I/Fファイルチェック処理
logger.info(f'I-04-03 Enciseデータ({receive_timing}) ファイルチェック処理開始')
check_target_file_name_body = io.TextIOWrapper(io.BytesIO(
check_target_file_list_response["Body"].read()), encoding='utf-8')
match_count = 0
row_count = 0
for tsv_row in csv.reader(check_target_file_name_body, delimiter='\t'):
# 「④1.」で取得したリストが「③」で読み込んだファイル内に存在するか確認する
is_file_not_exists = True
for file_name in receive_file_list:
match_result = re.fullmatch(
tsv_row[INDEX_REGEX], file_name)
# 「③」で読み込んだファイルに記載されている全てが「④1.」で取得したリストに存在した場合
if match_result is not None:
is_file_not_exists = False
# 存在したファイルの年月部分を抜き出し、チェック対象年月(処理稼働月-1)である場合
match_group_yyyymm = match_result.group(1)
check_target_yyyymm = (datetime.datetime.now(
ZoneInfo(TZ)) - relativedelta(month=1)).strftime('%Y%m')
if match_group_yyyymm == check_target_yyyymm:
logger.info(
f'I-04-04 I/Fファイルの受領を確認しました ファイル名{file_name}')
match_count += 1
else:
logger.info(
f'I-04-07 I/Fファイルの年月がチェック対象年月と一致しません ファイル名{file_name}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}(受領年月が不正:{file_name})\n'
break
if is_file_not_exists:
logger.info(
f'E-04-06 月次I/Fファイルに不足があります ファイル名{tsv_row[INDEX_DATA_NAME]}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}\n'
row_count += 1
if row_count == match_count:
logger.info('I-04-05 I/Fファイルは全て受領していることを確認しました')
# ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する
logger.info('I-05-01 メール送信処理開始')
if len(mail_message) == 0:
logger.info(
f'I-05-09 {execute_month} {receive_timing}データI/Fファイルに不足が無いため、メール送信処理をスキップします')
batch_item_success.append(message_id)
continue
# 1.存在した場合
logger.info(
f'I-05-02 {execute_month} {receive_timing}データI/Fファイルに不足があるため、メール送信処理を開始します')
try:
logger.info(
'I-05-03 ' +
f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_template = (
mail_title_response['Body'].read().decode('utf-8'))
# 改行を取り除く
mail_title = substitute_mail_template(
mail_title_template, receive_timing, mail_message)
mail_title_without_line_break = mail_title.splitlines()[0]
logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(
f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
try:
logger.info(
'I-05-05 ' +
f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template = (
mail_body_template_response['Body'].read().decode('utf-8'))
# メール本文内のプレースホルダーを置き換える
mail_body = substitute_mail_template(
mail_body_template, receive_timing, mail_message)
logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(
f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}')
params = {
'TopicArn': MBJ_NOTICE_TOPIC,
'Subject': mail_title_without_line_break,
'Message': mail_body
}
sns_client.publish(**params)
logger.info('I-05-08 メール送信指示をしました')
batch_item_success.append(message_id)
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
return batch_item_failures, batch_item_success
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Enciseデータ受領チェック処理')
# 処理稼働年月を取得しメモリに保持する
execute_date_today = datetime.date.today()
execute_month = execute_date_today.strftime('%Y/%m')
logger.info(f'I-01-02 処理稼働月:{execute_month}')
# 処理失敗メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_failures = []
# 処理成功メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_success = []
# ② SQSメッセージ重複排除処理を行う
logger.info('I-02-01 メッセージ処理開始')
batch_item_failures, batch_item_success = encise_data_unreceive_check(
event["Records"], execute_month)
logger.info('I-06-01 すべてのメッセージの処理完了')
# ⑦ メッセージを処理済として、以下のDynamoDBテーブルに記録する
put_success_messages_to_dynamo_db(batch_item_success)
logger.info('I-07-01 処理済みメッセージIDの記録完了')
logger.info('I-07-02 処理終了 Enciseデータ受領チェック処理')
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
raise e
return batch_item_failures
# 動作確認用のコード
# if __name__ == '__main__':
# lambda_handler({
# "Records": [
# {
# "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
# "receiptHandle": "MessageReceiptHandle",
# "body": "{\"receive_timing\": \"速報\",\"check_folder_prefix\": \"encise\",\"check_target_file_list\": \"en_quick_file_list.config\",\"notice_mail_title_template\": \"en_unreceive_notice_mail_title.config\",\"notice_mail_body_template\": \"en_unreceive_notice_mail_body.config\"\r\n}",
# "attributes": {
# "ApproximateReceiveCount": "1",
# "SentTimestamp": "1523232000000",
# "SenderId": "123456789012",
# "ApproximateFirstReceiveTimestamp": "1523232000001"
# },
# "messageAttributes": {},
# "md5OfBody": "{{{md5_of_body}}}",
# "eventSource": "aws:sqs",
# "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
# "awsRegion": "us-east-1"
# }
# ]
# }, {})

View File

@ -0,0 +1,9 @@
宛先各位
 Encise {receive_timing}である以下のファイルを受領できておりません。
{notice_file_names}
 QlikSenseサーバー側の送信状況のご確認をお願いいたします。
 尚、本メールはシステム自動送信のため、返信は出来ません。
 本件に関する問い合わせは、MeDaCA Enciseデータ担当者にお願いいたします。

View File

@ -0,0 +1 @@
【MeDaCa連携エラー通知】Encise {receive_timing}ファイル未受領

View File

@ -0,0 +1,9 @@
宛先各位
 Encise {receive_timing}である以下のファイルを受領できておりません。
{notice_file_names}
 Encise社へ送信状況のご確認をお願いいたします。
 尚、本メールはシステム自動送信のため、返信は出来ません。
 本件に関する問い合わせは、MeDaCA Enciseデータ担当者にお願いいたします。

View File

@ -0,0 +1 @@
【MeDaCa連携エラー通知】Encise {receive_timing}ファイル未受領

View File

@ -0,0 +1,3 @@
CLUFD_Merck02_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck02
CLUFD_Merck03_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck03
CLUFD_Merck04_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck04

View File

@ -0,0 +1,6 @@
CLUFD_Merck02_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck02
CLUFD_Merck03_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck03
CLUFD_Merck04_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck04
CLUFM_CLUMST_496_01_([0-9]{6})\.(CSV|csv) En-Cluster クラスタマスタ
CLUFM_PROADD_496_([0-9]{6})\.(CSV|csv) En-Cluster 製品定義ファイル
CLUFM_PROLST_496_([0-9]{6})\.(CSV|csv) En-Cluster 製品マスタ

View File

@ -0,0 +1,11 @@
NATFD_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation データ
NATFM_PROADD_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation 製品定義ファイル
NATFM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation セグメントマスタ
MST_BRAND([0-9]{6})\.(CSV|csv) 確定版 En-Nation ブランドマスタ
CITFD_Merck01_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck01
CITFD_Merck06_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck06
CITFD_Merck07_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck07
CITFM_PROADD_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 製品定義ファイル
CITFM_PROLST_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 製品マスタ
CITFM_REGION_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 地域マスタ
CITFM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 確定版 En-City セグメントマスタ

View File

@ -0,0 +1 @@
NATFD_496_([0-9]{6})D\.(CSV|csv) 確定版 En-Nation データ 直販分

View File

@ -0,0 +1,10 @@
NATQD_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation データ
NATQM_PROADD_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation 製品定義ファイル
NATQM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation セグメントマスタ
CITQD_Merck01_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck01
CITQD_Merck06_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck06
CITQD_Merck07_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck07
CITQM_PROADD_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 製品定義ファイル
CITQM_PROLST_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 製品マスタ
CITQM_REGION_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 地域マスタ
CITQM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 速報版 En-City セグメントマスタ