32 lines
1.1 KiB
Python

import boto3
import environments
import exceptions
from botocore.exceptions import ClientError
from constants import AWS_RESOURCE_S3, S3_RESPONSE_BODY
class S3Resource:
def __init__(self, bucket_name: str) -> None:
self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
self.__s3_bucket = self.__s3_resource.Bucket(bucket_name)
def get_object(self, object_key: str):
s3_object = self.__s3_bucket.Object(object_key)
response = s3_object.get()
return response[S3_RESPONSE_BODY].read()
class ConfigBucket:
__s3_resource: S3Resource = None
def __init__(self) -> None:
self.__s3_resource = S3Resource(environments.CONFIG_BUCKET_NAME)
def read_check_target_schema_names(self):
try:
return self.__s3_resource.get_object(environments.CHECK_TARGET_SCHEMA_NAMES_PATH)
except ClientError as error:
if error.response['Error']['Code'] == 'NoSuchKey':
raise exceptions.FileNotFoundException('E-02-01', f'チェック対象スキーマ名ファイルの読み込みに失敗しました エラー内容:{error}')