186 lines
7.6 KiB
Python

import gzip
import os
import os.path as path
import shutil
import tempfile
import boto3
from src.system_var import environment
class S3Client:
__s3_client = boto3.client('s3')
_bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0:
return []
contents = response['Contents']
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']}
for content in contents if not content['Key'].endswith('/')]
return objects
def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_client.copy(copy_source, dest_bucket, dest_key)
return
def download_file(self, bucket_name: str, file_key: str, file):
self.__s3_client.download_fileobj(
Bucket=bucket_name,
Key=file_key,
Fileobj=file
)
return
def upload_file(self, local_file_path: str, bucket_name: str, file_key: str):
self.__s3_client.upload_file(
local_file_path,
Bucket=bucket_name,
Key=file_key
)
def delete_file(self, bucket_name: str, file_key: str):
self.__s3_client.delete_object(
Bucket=bucket_name,
Key=file_key
)
class S3Bucket():
_s3_client = S3Client()
_bucket_name: str = None
class UltmarcBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_DATA_FOLDER
def list_dat_file(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def download_dat_file(self, dat_filename: str):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, dat_filename, f)
f.seek(0)
return temporary_file_path
def backup_dat_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー
ultmarc_backup_bucket = UltmarcBackupBucket()
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
# コピー元のファイルを削除
self._s3_client.delete_file(self._bucket_name, dat_file_key)
class ConfigBucket(S3Bucket):
_bucket_name = environment.JSKULT_CONFIG_BUCKET
def download_holiday_list(self):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, holiday_list_key, f)
f.seek(0)
return temporary_file_path
def download_wholesaler_stock_input_day_list(self):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f)
f.seek(0)
return temporary_file_path
def download_ultmarc_hex_convert_config(self):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME)
hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, hex_convert_config_key, f)
f.seek(0)
return temporary_file_path
class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.JSKULT_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket):
_folder = environment.ULTMARC_BACKUP_FOLDER
class VjskBackupBucket(JskUltBackupBucket):
_folder = environment.VJSK_BACKUP_FOLDER
class VjskReceiveBucket(S3Bucket):
_bucket_name = environment.VJSK_DATA_BUCKET
_recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER
_s3_file_list = None
def get_s3_file_list(self):
self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f)
f.seek(0)
return temporary_file_path
def unzip_data_file(self, filename: str):
temp_dir = os.path.dirname(filename)
decompress_filename = os.path.basename(filename).replace('.gz', '')
decompress_file_path = os.path.join(temp_dir, decompress_filename)
with gzip.open(filename, 'rb') as gz:
with open(decompress_file_path, 'wb') as decompressed_file:
shutil.copyfileobj(gz, decompressed_file)
ret = [decompress_file_path]
return ret
def backup_dat_file(self, target_files: list, datetime_key: str):
jskult_backup_bucket = VjskBackupBucket()
for target_file in target_files:
backup_from_file_path = target_file.get("filename")
backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "")
backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
jskult_backup_bucket._bucket_name, backup_key)
self._s3_client.delete_file(self._bucket_name, backup_from_file_path)
class VjskSendBucket(S3Bucket):
_bucket_name = environment.VJSK_DATA_BUCKET
_send_folder = environment.VJSK_DATA_SEND_FOLDER
def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
# S3バケットにファイルを移動
csv_file_name = f'{self._send_folder}/{vjsk_create_csv}'
s3_client = S3Client()
s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
return
def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー
vjsk_backup_bucket = VjskBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key)