import csv import datetime import io import json import logging import os import re from zoneinfo import ZoneInfo import boto3 from dateutil.relativedelta import relativedelta # 環境変数 CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"] MBJ_NOTICE_TOPIC = os.environ["MBJ_NOTICE_TOPIC"] PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME = os.environ["PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME"] PROCESSED_MESSAGE_EXPIRES_PERIOD = int(os.environ["PROCESSED_MESSAGE_EXPIRES_PERIOD"]) LOG_LEVEL = os.environ["LOG_LEVEL"] TZ = os.environ["TZ"] # 定数 ROW_COMMENT_SYMBOL = '#' INDEX_REGEX = 0 INDEX_DATA_NAME = 1 INDEX_ROW_COMMENT_SYMBOL = 0 INDEX_SPLIT_NUM = 1 INDEX_LAST = -1 # メール本文に出力する不足ファイル名一覧のインデント MAIL_INDENT = '  ' # AWS操作クライアント s3_client = boto3.client('s3') sns_client = boto3.client('sns') dynamodb_client = boto3.client('dynamodb') # logger設定 def log_datetime_convert_tz(*arg): """ログに出力するタイムスタンプのロケールを変更する(JST指定)""" return datetime.datetime.now(ZoneInfo(TZ)).timetuple() logger = logging.getLogger() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = log_datetime_convert_tz for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) def is_duplicate_message(message_id: str) -> bool: """DynamoDBテーブルに処理済みのSQSメッセージIdが存在するかどうかを調査する Args: message_id (str): SQSメッセージId Returns: bool: 存在する場合はTrue """ return dynamodb_client.query( TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME, Select='COUNT', KeyConditionExpression='message_id = :message_id', ExpressionAttributeValues={ ':message_id': {'S': message_id} } )["Count"] != 0 def put_success_messages_to_dynamo_db(batch_success_items: list[str]) -> bool: """処理済みのSQSメッセージIdをDynamoDBにPushする Args: batch_success_items (list[str]): SQSメッセージIdのリスト Returns: bool: 登録成功の場合、True """ # レコードの有効期限を算出 now = datetime.datetime.now(ZoneInfo(TZ)) record_expiration_datetime = now + \ datetime.timedelta(minutes=PROCESSED_MESSAGE_EXPIRES_PERIOD) record_expiration_time = record_expiration_datetime.timestamp() for message_id in batch_success_items: dynamodb_client.put_item( TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME, Item={ 'message_id': {'S': message_id}, 'record_expiration_time': {'N': f'{record_expiration_time}'} } ) return True def substitute_mail_template(mail_template: str, mail_msg: str) -> str: """メールテンプレートのプレースホルダーを置き換える Args: mail_template (str): 置き換え前のメールテンプレート mail_msg (str): メールテンプレートのプレースホルダーを置き換える文言(ファイル一覧) Returns: str: 置き換え後のメール本文 """ substitute_dict = { "notice_file_names": mail_msg } mail_str = mail_template.format_map(substitute_dict) return mail_str def make_failure_item_on_error(message_id: str) -> dict[str, str]: """Report batch item failuresによる処理に失敗したメッセージの判別のためのレスポンスを作成する @see Args: message_id (str): SQSメッセージId Returns: dict[str, str]: Report batch item failuresで失敗したSQSメッセージを判別するための辞書オブジェクト """ return {"itemIdentifier": message_id} def daily_data_unreceive_check(records: list, execute_date: str) -> tuple[list[dict[str, str]], list[str]]: """日次データ未受領チェック Args: records (list): SQS Eventのレコードリスト execute_date (str): 処理稼働年月日 Returns: tuple[list[dict[str, str]], list[str]]: 失敗メッセージIdのリスト, 成功メッセージIdのリスト """ batch_failed_items = [] batch_success_items = [] for record in records: # メール挿入用文言を格納するためのメモリを保持する mail_message = '' try: try: # 1.SQSメッセージIDを取得する message_id = record["messageId"] # 2.DynamoDBテーブルからレコードを取得し、処理済みメッセージかどうかを判別する if is_duplicate_message(message_id): logger.info(f'I-02-02 受信したメッセージは既に処理済みのため、処理をスキップします。メッセージID: {message_id}') continue except Exception as e: logger.exception(f"E-02-01 メッセージ重複チェック処理に失敗しました エラー内容:{e}") batch_failed_items.append(make_failure_item_on_error(message_id)) continue # SQSパラメータをJSONシリアライズし、Pythonの辞書オブジェクト(イベントパラメータ)を取得する。 event_parameter = json.loads(record['body']) # ③ 設定ファイル[受領チェック対象ファイルリスト]を読み込む try: logger.info('I-03-01 ' +'受領チェック対象ファイルリスト読込 読込元:' + f'{CONFIG_BUCKET_NAME}/{event_parameter["check_target_file_list"]}') check_target_file_list_response = s3_client.get_object( Bucket=CONFIG_BUCKET_NAME, Key=f'{event_parameter["check_target_file_list"]}' ) logger.info('I-03-02 受領チェック対象ファイルリストを読み込みました') except Exception as e: logger.exception(f"E-03-01 受領チェック対象ファイルリストの読み込みに失敗しました エラー内容:{e}") batch_failed_items.append(make_failure_item_on_error(message_id)) continue # ④ 受領チェック処理を行う receive_date = execute_date.strftime('%Y/%m/%d') logger.info(f'I-04-01 日次データ受領チェック ({event_parameter['data_source_name']}) 処理開始') object_prefix = f'{event_parameter["check_folder_prefix"]}/{receive_date}/' # 1.日次データバックアップ保管バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する logger.info(f'I-04-02 オブジェクトリストの取得 取得先:{event_parameter['check_bucket_name']}/{object_prefix}') receive_file_list_response = s3_client.list_objects_v2(Bucket=event_parameter['check_bucket_name'], Prefix=object_prefix) receive_file_list = [] for content in receive_file_list_response.get('Contents', []): # オブジェクトのキーからファイル名を切り出してリストに追加 obj_key = content['Key'].rsplit('/', INDEX_SPLIT_NUM) receive_file_list.append(obj_key[INDEX_LAST]) # 2.I/Fファイルチェック処理 logger.info(f'I-04-03 日次受信データ({event_parameter['data_source_name']}) 未受領チェック処理開始') check_target_file_name_body = io.TextIOWrapper(io.BytesIO( check_target_file_list_response["Body"].read()), encoding='utf-8') match_count = 0 row_count = 0 for tsv_row in csv.reader(check_target_file_name_body, delimiter='\t'): # 「④1.」で取得したリストが「③」で読み込んだファイル内に存在するか確認する is_file_not_exists = True for file_name in receive_file_list: match_result = re.fullmatch(tsv_row[INDEX_REGEX], file_name) # 「③」で読み込んだファイルに記載されている全てが「④1.」で取得したリストに存在した場合 if match_result is not None: is_file_not_exists = False logger.info(f'I-04-04 I/Fファイルの受領を確認しました ファイル名:{file_name}') match_count += 1 if is_file_not_exists: logger.info(f'I-04-06 月次I/Fファイルに不足があります ファイル名:{tsv_row[INDEX_DATA_NAME]}') mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}\n' row_count += 1 if row_count == match_count: logger.info('I-04-05 I/Fファイルは全て受領していることを確認しました') # ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する logger.info('I-05-01 メール送信処理開始') if len(mail_message) == 0: logger.info( f'I-05-09 {execute_date} {event_parameter["data_source_name"]}データI/Fファイルに不足が無いため、メール送信処理をスキップします') batch_success_items.append(message_id) continue # 1.存在した場合 logger.info( f'I-05-02 {execute_date} {event_parameter["data_source_name"]}データI/Fファイルに不足があるため、メール送信処理を開始します') try: logger.info('I-05-03 ' +f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_title_template"]}') mail_title_response = s3_client.get_object( Bucket=CONFIG_BUCKET_NAME, Key=f'{event_parameter["notice_mail_title_template"]}' ) mail_title_template = (mail_title_response['Body'].read().decode('utf-8')) # 改行を取り除く mail_title_without_line_break = mail_title_template.splitlines()[0] logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました') except Exception as e: logger.exception( f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') batch_failed_items.append( make_failure_item_on_error(message_id)) continue try: logger.info('I-05-05 ' +f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{event_parameter["notice_mail_body_template"]}') mail_body_template_response = s3_client.get_object( Bucket=CONFIG_BUCKET_NAME, Key=f'{event_parameter["notice_mail_body_template"]}' ) mail_body_template = (mail_body_template_response['Body'].read().decode('utf-8')) # メール本文内のプレースホルダーを置き換える mail_body = substitute_mail_template(mail_body_template, mail_message) logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました') except Exception as e: logger.exception(f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') batch_failed_items.append(make_failure_item_on_error(message_id)) continue logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}') params = { 'TopicArn': MBJ_NOTICE_TOPIC, 'Subject': mail_title_without_line_break, 'Message': mail_body } sns_client.publish(**params) logger.info('I-05-08 メール送信指示をしました') batch_success_items.append(message_id) except Exception as e: logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}') batch_failed_items.append(make_failure_item_on_error(message_id)) continue return batch_failed_items, batch_success_items def lambda_handler(event, context): try: # ① 処理開始ログを出力する logger.info('I-01-01 処理開始 日次データ受領チェック処理') # 処理稼働年月を取得しメモリに保持する execute_date = datetime.date.today() # 処理成功メッセージIDリストをメモリに保持する(初期値=空のリスト) batch_success_items = [] # 処理失敗メッセージIDリストをメモリに保持する(初期値=空のリスト) batch_failed_items = [] # ② SQSメッセージ重複排除処理を行う logger.info('I-02-01 メッセージ処理開始') batch_failed_items, batch_success_items = daily_data_unreceive_check(event["Records"], execute_date) logger.info('I-06-01 すべてのメッセージの処理完了') # ⑦ メッセージを処理済として、以下のDynamoDBテーブルに記録する put_success_messages_to_dynamo_db(batch_success_items) logger.info('I-07-01 処理済みメッセージIDの記録完了') logger.info('I-07-02 処理終了 日次データ受領チェック処理') except Exception as e: logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}') raise e return batch_failed_items # 動作確認用のコード # if __name__ == '__main__': # lambda_handler({ # "Records": [ # { # "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", # "receiptHandle": "MessageReceiptHandle", # "body": "{\"data_source_name\":\"data_source_name\",\"check_bucket_name\":\"check_bucket_name\",\"check_folder_prefix\":\"check_folder_prefix\",\"check_target_file_list\":\"check_target_file_list\",\"notice_mail_title_template\":\"notice_mail_title_template\",\"notice_mail_body_template\":\"notice_mail_body_template\"\r\n}", # "attributes": { # "ApproximateReceiveCount": "1", # "SentTimestamp": "1523232000000", # "SenderId": "123456789012", # "ApproximateFirstReceiveTimestamp": "1523232000001" # }, # "messageAttributes": {}, # "md5OfBody": "{{{md5_of_body}}}", # "eventSource": "aws:sqs", # "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue", # "awsRegion": "us-east-1" # } # ] # }, {})