import os import datetime import boto3 import io import re import csv import logging from abc import * from zoneinfo import ZoneInfo import traceback # 環境変数 CHECK_BUCKET_NAME = os.environ["CHECK_BUCKET_NAME"] CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"] RECEIVE_DAILY_FILE_NAME_LIST_PATH = os.environ["RECEIVE_DAILY_FILE_NAME_LIST_PATH"] NON_BUSINESS_DAY_LIST_PATH = os.environ["NON_BUSINESS_DAY_LIST_PATH"] NOTICE_MAIL_TITLE_TEMPLATE_PATH = os.environ["NOTICE_MAIL_TITLE_TEMPLATE_PATH"] NOTICE_MAIL_BODY_TEMPLATE_PATH = os.environ["NOTICE_MAIL_BODY_TEMPLATE_PATH"] MBJ_SAP_NOTICE_TOPIC = os.environ["MBJ_SAP_NOTICE_TOPIC"] NDS_NOTICE_TOPIC = os.environ["NDS_NOTICE_TOPIC"] NDS_NOTICE_TITLE = os.environ["NDS_NOTICE_TITLE"] LOG_LEVEL = os.environ["LOG_LEVEL"] # 定数 ROW_COMMENT_SYMBOL = '#' INDEX_REGEX = 0 INDEX_DATA_NAME = 1 INDEX_ROW_COMMENT_SYMBOL = 0 INDEX_SPLIT_NUM = 1 INDEX_LAST = -1 # メール本文に出力する不足ファイル名一覧のインデント MAIL_INDENT = '  ' # 変数 s3_client = boto3.client('s3') sns_client = boto3.client('sns') # logger設定 logger = logging.getLogger() def custome_time(*arg): return datetime.datetime.now(ZoneInfo("Asia/Tokyo")).timetuple() formatter = logging.Formatter( '[%(levelname)s]\t%(asctime)s\t%(message)s\n', '%Y-%m-%d %H:%M:%S' ) formatter.converter = custome_time for handler in logger.handlers: handler.setFormatter(formatter) level = logging.getLevelName(LOG_LEVEL) if not isinstance(level, int): level = logging.INFO logger.setLevel(level) def lambda_handler(event, context): try: # ① 処理開始ログを出力する logger.info('I-01-01 処理開始 SAP_supデータ受領チェック処理(日次)') execute_date = datetime.date.today().strftime('%Y/%m/%d') logger.info(f'I-01-02 処理稼働日:{execute_date}') mail_msg = '' # ② 営業日チェック処理を行う logger.info('I-02-01 営業日チェック処理開始') # 1.設定ファイル[メルク社非営業日設定ファイル]を読み込む try: logger.info(f'I-02-02 非営業日設定ファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NON_BUSINESS_DAY_LIST_PATH}') non_business_day_response = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=NON_BUSINESS_DAY_LIST_PATH) logger.info('I-02-03 非営業日設定ファイルを読み込みました') except Exception as e: logger.error(f'E-02-01 非営業日設定ファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-02-01', e) # 2.処理稼働日が「②1.」で読み込んだ「メルク社非営業日設定ファイル」に存在するか確認する try: logger.info(f'I-02-04 本日が非営業日かチェックします チェック日:{execute_date}') none_business_day_list = [] for row in io.TextIOWrapper(io.BytesIO(non_business_day_response["Body"].read()), encoding='utf-8'): if row[INDEX_ROW_COMMENT_SYMBOL] == ROW_COMMENT_SYMBOL: continue non_date = row.rstrip('\n') # 日付妥当性判定 try: datetime.datetime.strptime(non_date, "%Y/%m/%d") except Exception as e: raise e none_business_day_list.append(non_date) if execute_date in none_business_day_list: logger.info('I-02-05 本日は非営業日のため、チェック処理をスキップします') return else: logger.info('I-02-06 本日は営業日のため、チェック処理を実施します') except Exception as e: logger.error(f'E-02-02 メルク社非営業日設定ファイルに不備があります エラー内容:{e}') raise NoneBusinessDayException('E-02-02', e) # ③ 設定ファイル[SAP_supI/Fファイルネーム設定ファイル(日次)]を読み込む try: logger.info(f'I-03-01 日次I/Fファイルネーム設定ファイル読込 読込元:{CONFIG_BUCKET_NAME}/{RECEIVE_DAILY_FILE_NAME_LIST_PATH}') receive_daily_file_name_response = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=RECEIVE_DAILY_FILE_NAME_LIST_PATH) logger.info('I-03-02 日次I/Fファイルネーム設定ファイルを読み込みました') except Exception as e: logger.error(f'E-03-01 日次I/Fファイルネーム設定ファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-03-01', e) # ④ 日次チェック処理を行う logger.info('I-04-01 日次チェック処理開始') # 1.SAP保管用バケットの処理稼働日に該当するサブフォルダにあるファイル一覧を取得する logger.info(f'I-04-02 オブジェクトリストの取得 取得先:{CHECK_BUCKET_NAME}/{execute_date}/') object_prefix = f'{execute_date}/' object_list = s3_client.list_objects_v2(Bucket=CHECK_BUCKET_NAME, Prefix=object_prefix) file_list = [] for obj in object_list.get('Contents', []): obj_key = obj['Key'].rsplit('/', INDEX_SPLIT_NUM) file_list.append(obj_key[INDEX_LAST]) # 2.日次I/Fファイルチェック処理 logger.info('I-04-03 日次I/Fファイルチェック処理開始') logger.info('I-04-04 取得したオブジェクトリストと日次I/Fファイルネーム設定ファイルの突き合わせを開始します') receive_daily_file_name_body = io.TextIOWrapper(io.BytesIO(receive_daily_file_name_response["Body"].read()), encoding='utf-8') match_count = 0 row_count = sum(1 for line in io.BytesIO(s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=RECEIVE_DAILY_FILE_NAME_LIST_PATH)["Body"].read())) for row in csv.reader(receive_daily_file_name_body, delimiter='\t'): file_exists = False for file_name in file_list: match_result = re.fullmatch(row[INDEX_REGEX], file_name) if match_result is not None: file_exists = True break if file_exists == True: match_count += 1 logger.info(f'I-04-05 日次I/Fファイルの受領を確認しました ファイル名:{file_name}') else: logger.error(f'E-04-01 日次I/Fファイルに不足があります ファイル名:{row[INDEX_DATA_NAME]}') mail_msg += f'{MAIL_INDENT}{row[INDEX_DATA_NAME]}\n' if row_count == match_count: logger.info('I-04-06 日次I/Fファイルは全て受領していることを確認しました') # ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する logger.info('I-05-01 メール送信処理開始') if len(mail_msg) > 0: # 1.存在した場合 logger.info(f'I-05-02 {execute_date} 日次I/Fファイルに不足があるため、メール送信処理を開始します') try: logger.info(f'I-05-03 通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_TITLE_TEMPLATE_PATH}') mail_title_obj = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=NOTICE_MAIL_TITLE_TEMPLATE_PATH) mail_title = mail_title_obj['Body'].read().decode('utf-8') logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました') except Exception as e: logger.error(f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-05-01', e) try: logger.info(f'I-05-05 通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{NOTICE_MAIL_BODY_TEMPLATE_PATH}') mail_body_obj = s3_client.get_object(Bucket=CONFIG_BUCKET_NAME, Key=NOTICE_MAIL_BODY_TEMPLATE_PATH) mail_body_response = mail_body_obj['Body'].read().decode('utf-8') # メール本文内のプレースホルダーを置き換える mail_body = substitute_mail_body(mail_body_response, mail_msg) logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました') except Exception as e: logger.error(f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}') raise FileReadException('E-05-02', e) logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_SAP_NOTICE_TOPIC}') mail_title_without_line_break = mail_title.splitlines()[0] params = { 'TopicArn': MBJ_SAP_NOTICE_TOPIC, 'Subject': mail_title_without_line_break, 'Message': mail_body } sns_client.publish(**params) logger.info('I-05-08 メール送信指示をしました') else: # 2.存在しない場合 logger.info(f'I-05-09 {execute_date} 日次I/Fファイルに不足がなかったため、メール送信処理をスキップします') # ⑥ 処理終了ログを出力する logger.info('I-06-01 処理終了 SAP_supデータ受領チェック処理(日次)') except CustomException as e: traceback.print_exc() error_notice(e.id, e.arg) except Exception as e: logger.error(f'E-99 想定外のエラーが発生しました エラー内容:{e}') traceback.print_exc() error_notice('E-99', e) return # 保守要員チーム通知 def error_notice(error_log_id, exception) -> None: try: error_msg = f'{error_log_id} のエラーが発生しました。ご確認ください\n詳細:{exception}' params = { 'TopicArn': NDS_NOTICE_TOPIC, 'Subject': NDS_NOTICE_TITLE, 'Message': error_msg } sns_client.publish(**params) logger.error(f'E-ERR-01 処理異常通知の送信指示をしました 通知先トピック:{NDS_NOTICE_TOPIC}') except Exception as e: logger.error(f'E-98 処理異常通知の送信指示に失敗しました エラー内容:{e}') traceback.print_exc() return def substitute_mail_body(before_mail_body:str, mail_msg: str) -> str: """メール本文のプレースホルダーを置き換えます Args: before_mail_body (str): 置き換え前のメール本文 mail_msg (str): メール本文のプレースホルダーを置き換える文言 Returns: str: 置き換え後のメール本文 """ substitute_dict = { "notice_file_names": mail_msg } mail_body = before_mail_body.format_map(substitute_dict) return mail_body # カスタムExceptionクラス class CustomException(Exception, metaclass=ABCMeta): def __init__(self, id, arg): self.arg = arg self.id = id class FileReadException(CustomException): pass class NoneBusinessDayException(CustomException): pass