import os import datetime import boto3 import io import re import csv import logging from abc import * from zoneinfo import ZoneInfo import traceback # 環境変数 CHECK_BUCKET_NAME = os.environ["CHECK_BUCKET_NAME"] CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"] RECEIVE_MONTHLY_FILE_NAME_LIST_PATH = os.environ["RECEIVE_MONTHLY_FILE_NAME_LIST_PATH"] MONTHLY_CEHCK_DAY_LIST_PATH = os.environ["MONTHLY_CEHCK_DAY_LIST_PATH"] NOTICE_MAIL_TITLE_TEMPLATE_PATH = os.environ["NOTICE_MAIL_TITLE_TEMPLATE_PATH"] NOTICE_MAIL_BODY_TEMPLATE_PATH = os.environ["NOTICE_MAIL_BODY_TEMPLATE_PATH"] MBJ_SAP_NOTICE_TOPIC = os.environ["MBJ_SAP_NOTICE_TOPIC"] NDS_NOTICE_TOPIC = os.environ["NDS_NOTICE_TOPIC"] NDS_NOTICE_TITLE = os.environ["NDS_NOTICE_TITLE"] LOG_LEVEL = os.environ["LOG_LEVEL"] # 定数 ROW_COMMENT_SYMBOL = '#' INDEX_REGEX = 0 INDEX_DATA_NAME = 1 INDEX_ROW_COMMENT_SYMBOL = 0 INDEX_SPLIT_NUM = 1 INDEX_LAST = -1 # メール本文に出力する不足ファイル名一覧のインデント MAIL_INDENT = '  ' # 変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') sns_client = boto3.client('sns') # logger設定 logger = logging.getLogger() def custome_time(*arg): return datetime.datetime.now(ZoneInfo("Asia/Tokyo")).timetuple() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = custome_time for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) def lambda_handler(event, context): try: # ① 処理開始ログを出力する logger.info('I-01-01 処理開始 SAP_supデータ受領チェック処理(月次)') execute_datetoday = datetime.date.today() execute_date = execute_datetoday.strftime('%Y/%m/%d') execute_month = f'{execute_datetoday.strftime("%Y")}/{execute_datetoday.strftime("%m")}' logger.info(f'I-01-02 処理稼働日:{execute_date}') mail_msg = '' # ② チェック処理実施指定日か確認を行う logger.info('I-02-01 チェック処理実施指定日確認処理開始') # 1.設定ファイル[チェック処理実施指定日ファイル]を読み込む try: logger.info(f'I-02-02 チェック処理実施指定日ファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MONTHLY_CEHCK_DAY_LIST_PATH}') monthly_day_obj = s3_resource.Object(CONFIG_BUCKET_NAME, MONTHLY_CEHCK_DAY_LIST_PATH) monthly_day_response = monthly_day_obj.get() logger.info('I-02-03 チェック処理実施指定日ファイルを読み込みました') except Exception as e: logger.error(f'E-02-01 チェック処理実施指定日設定ファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-02-01', e) # 2.処理稼働日が「②1.」で読み込んだ「チェック処理実施指定日ファイル」に存在するか確認する try: logger.info(f'I-02-04 本日がチェック処理実施指定日か確認します 確認日:{execute_date}') check_date_list = [] for row in io.TextIOWrapper(io.BytesIO(monthly_day_response["Body"].read()), encoding='utf-8'): if row[INDEX_ROW_COMMENT_SYMBOL] == ROW_COMMENT_SYMBOL: continue check_date = row.rstrip('\n') # 日付妥当性判定 try: datetime.datetime.strptime(check_date, "%Y/%m/%d") except Exception as e: raise e check_date_list.append(check_date) if execute_date in check_date_list: logger.info('I-02-05 本日はチェック処理実施指定日のため、チェック処理を実施します') else: logger.info('I-02-06 本日はチェック処理実施指定日ではないため、チェック処理をスキップします') return except Exception as e: logger.error(f'E-02-02 チェック処理実施指定日設定ファイルに不備があります エラー内容:{e}') raise CheckDayException('E-02-02', e) # ③ 設定ファイル[SAP_supI/Fファイルネーム設定ファイル(月次)]を読み込む try: logger.info(f'I-03-01 月次I/Fファイルネーム設定ファイル読込 読込元:{CONFIG_BUCKET_NAME}/{RECEIVE_MONTHLY_FILE_NAME_LIST_PATH}') receive_monthly_file_name_obj = s3_resource.Object(CONFIG_BUCKET_NAME, RECEIVE_MONTHLY_FILE_NAME_LIST_PATH) receive_monthly_file_name_response = receive_monthly_file_name_obj.get() logger.info('I-03-02 月次I/Fファイルネーム設定ファイルを読み込みました') except Exception as e: logger.error(f'E-03-01 月次I/Fファイルネーム設定ファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-03-01', e) # ④ 月次チェック処理を行う logger.info('I-04-01 月次チェック処理開始') # 1.SAP保管用バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する logger.info(f'I-04-02 オブジェクトリストの取得 取得先:{CHECK_BUCKET_NAME}/{execute_month}/') object_prefix = f'{execute_month}/' object_list = s3_resource.Bucket(CHECK_BUCKET_NAME).objects.filter(Prefix=object_prefix) file_list = [] for obj in object_list: obj_key = obj.key.rsplit('/', INDEX_SPLIT_NUM) file_list.append(obj_key[INDEX_LAST]) # 2.月次I/Fファイルチェック処理 logger.info('I-04-03 月次I/Fファイルチェック処理開始') logger.info('I-04-04 取得したオブジェクトリストと月次I/Fファイルネーム設定ファイルの突き合わせを開始します') receive_monthly_file_name_body = io.TextIOWrapper(io.BytesIO(receive_monthly_file_name_response["Body"].read()), encoding='utf-8') match_count = 0 row_count = sum(1 for line in io.BytesIO(receive_monthly_file_name_obj.get()["Body"].read())) for row in csv.reader(receive_monthly_file_name_body, delimiter='\t'): file_exists = False for file_name in file_list: match_result = re.fullmatch(row[INDEX_REGEX], file_name) if match_result is not None: file_exists = True break if file_exists == True: match_count += 1 logger.info(f'I-04-05 月次I/Fファイルの受領を確認しました ファイル名:{file_name}') else: logger.error(f'E-04-01 月次I/Fファイルに不足があります ファイル名:{row[INDEX_DATA_NAME]}') mail_msg += f'{MAIL_INDENT}{row[INDEX_DATA_NAME]}\n' if row_count == match_count: logger.info('I-04-06 月次I/Fファイルは全て受領していることを確認しました') # ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する logger.info('I-05-01 メール送信処理開始') if len(mail_msg) > 0: # 1.存在した場合 logger.info(f'I-05-02 {execute_month} 月次I/Fファイルに不足があるため、メール送信処理を開始します') try: logger.info(f'I-05-03 通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_TITLE_TEMPLATE_PATH}') mail_title_obj = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=NOTICE_MAIL_TITLE_TEMPLATE_PATH) mail_title = mail_title_obj['Body'].read().decode('utf-8') logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました') except Exception as e: logger.error(f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-05-01', e) try: logger.info(f'I-05-05 通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_BODY_TEMPLATE_PATH}') mail_body_obj = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=NOTICE_MAIL_BODY_TEMPLATE_PATH) mail_body_response = mail_body_obj['Body'].read().decode('utf-8') # メール本文内のプレースホルダーを置き換える mail_body = substitute_mail_body(mail_body_response, mail_msg) logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました') except Exception as e: logger.error(f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-05-02', e) logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_SAP_NOTICE_TOPIC}') mail_title_without_line_break = mail_title.splitlines()[0] params = { 'TopicArn': MBJ_SAP_NOTICE_TOPIC, 'Subject': mail_title_without_line_break, 'Message': mail_body } sns_client.publish(**params) logger.info('I-05-08 メール送信指示をしました') else: # 2.存在しない場合 logger.info(f'I-05-09 {execute_month} 月次I/Fファイルに不足がなかったため、メール送信処理をスキップします') # ⑥ 処理終了ログを出力する logger.info('I-06-01 処理終了 SAP_supデータ受領チェック処理(月次)') except CustomException as e: traceback.print_exc() error_notice(e.id, e.arg) except Exception as e: logger.error(f'E-99 想定外のエラーが発生しました エラー内容:{e}') traceback.print_exc() error_notice('E-99', e) return # 保守要員チーム通知 def error_notice(error_log_id, exception) -> None: try: error_msg = f'{error_log_id} のエラーが発生しました。ご確認ください\n詳細:{exception}' params = { 'TopicArn': NDS_NOTICE_TOPIC, 'Subject': NDS_NOTICE_TITLE, 'Message': error_msg } sns_client.publish(**params) logger.error(f'E-ERR-01 処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}') except Exception as e: logger.error(f'E-98 処理異常通知の送信指示に失敗しました エラー内容:{e}') traceback.print_exc() return def substitute_mail_body(before_mail_body:str, mail_msg: str) -> str: """メール本文のプレースホルダーを置き換えます Args: before_mail_body (str): 置き換え前のメール本文 mail_msg (str): メール本文のプレースホルダーを置き換える文言 Returns: str: 置き換え後のメール本文 """ substitute_dict = { "notice_file_names": mail_msg } mail_body = before_mail_body.format_map(substitute_dict) return mail_body # カスタムExceptionクラス class CustomException(Exception, metaclass=ABCMeta): def __init__(self, id, arg): self.arg = arg self.id = id class FileReadException(CustomException): pass class CheckDayException(CustomException): pass