import boto3 import environments import exceptions from botocore.exceptions import ClientError from constants import AWS_RESOURCE_S3, S3_RESPONSE_BODY class S3Resource: def __init__(self, bucket_name: str) -> None: self.__s3_resource = boto3.resource(AWS_RESOURCE_S3) self.__s3_bucket = self.__s3_resource.Bucket(bucket_name) def get_object(self, object_key: str): s3_object = self.__s3_bucket.Object(object_key) response = s3_object.get() return response[S3_RESPONSE_BODY].read() class ConfigBucket: __s3_resource: S3Resource = None def __init__(self) -> None: self.__s3_resource = S3Resource(environments.CONFIG_BUCKET_NAME) def read_check_target_schema_names(self): try: return self.__s3_resource.get_object(environments.CHECK_TARGET_SCHEMA_NAMES_PATH) except ClientError as error: if error.response['Error']['Code'] == 'NoSuchKey': raise exceptions.FileNotFoundException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{error}')