import boto3

import environments
from constants import AWS_RESOURCE_SNS


class SNSClient:
    """Thin wrapper around a boto3 SNS client that publishes messages."""

    def __init__(self) -> None:
        """Create the underlying boto3 client for the SNS service."""
        self.__sns_client = boto3.client(AWS_RESOURCE_SNS)

    @staticmethod
    def _to_single_line(subject: str) -> str:
        """Return *subject* with line breaks removed.

        Trailing CR/LF characters are dropped; line breaks inside the
        text are replaced with a single space so the subject stays on one
        line. Subjects that contain no CR/LF are returned unchanged.
        """
        trimmed = subject.rstrip('\r\n')
        # Replace CRLF first so a Windows line ending becomes one space,
        # not two.
        return (
            trimmed.replace('\r\n', ' ')
            .replace('\r', ' ')
            .replace('\n', ' ')
        )

    def publish(self, sns_topic_arn: str, subject: str, message: str) -> None:
        """Publish *message* with *subject* to the topic *sns_topic_arn*.

        Args:
            sns_topic_arn: ARN of the destination SNS topic.
            subject: Subject of the notification; line breaks are removed
                before sending because an SNS subject must be a single
                line.
            message: Body of the notification, sent as-is.

        Any exception raised by the boto3 ``publish`` call propagates to
        the caller.
        """
        publish_params = {
            'TopicArn': sns_topic_arn,
            'Subject': self._to_single_line(subject),
            'Message': message,
        }
        self.__sns_client.publish(**publish_params)


class SNSNotifier:
    """Publishes notifications to the topics configured in ``environments``."""

    # Class-level default; replaced by a real SNSClient in __init__.
    __sns_client: SNSClient = None

    def __init__(self) -> None:
        """Create the SNSClient used for every publish call."""
        self.__sns_client = SNSClient()

    def publish_to_mbj(self, subject: str, message: str) -> None:
        """Publish *message* with *subject* to ``environments.MBJ_NOTICE_TOPIC``."""
        self.__sns_client.publish(
            environments.MBJ_NOTICE_TOPIC,
            subject,
            message,
        )

    def publish_to_nds(self, error_id: str, exception: Exception) -> None:
        """Publish an error report to ``environments.NDS_NOTICE_TOPIC``.

        The subject is ``environments.NDS_NOTICE_TITLE``. The body is a
        fixed Japanese sentence (roughly: "An error <error_id> occurred.
        Please check.") followed by a details line containing the string
        form of *exception*.

        Args:
            error_id: Identifier of the error, embedded in the message.
            exception: The exception whose string form is appended as the
                details.
        """
        error_message = f'{error_id} のエラーが発生しました。ご確認ください\n詳細:{exception}'
        self.__sns_client.publish(
            environments.NDS_NOTICE_TOPIC,
            environments.NDS_NOTICE_TITLE,
            error_message,
        )