refactor: ファイル情報をdictで取り回していたが、意味ないためstrで取り回すように修正。不要なメソッドを削除。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-06-06 15:52:27 +09:00
parent a3be8d39e3
commit f05ec64f41

View File

@ -1,11 +1,7 @@
import gzip
import json
import os
import os.path as path
import shutil
import tempfile
import boto3
from src.system_var import environment
@ -20,7 +16,7 @@ class S3Client:
return []
contents = response['Contents']
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']}
objects = [content['Key']
for content in contents if not content['Key'].endswith('/')]
return objects
@ -60,46 +56,26 @@ class JskIOBucket(S3Bucket):
self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
    """Download one S3 object into a fresh temporary directory.

    The object key's receive-folder prefix is stripped to form the local
    file name. Returns the absolute path of the downloaded file.

    NOTE(review): an open file object is passed to ``download_file`` —
    boto3's own ``S3.Client.download_file`` expects a *path* string
    (``download_fileobj`` takes file objects); verify the project's
    S3Client wrapper really accepts a file object here.
    """
    work_dir = tempfile.mkdtemp()
    local_name = data_filename.replace(f"{self._recv_folder}/", "")
    local_path = path.join(work_dir, local_name)
    with open(local_path, mode='wb') as out:
        self._s3_client.download_file(self._bucket_name, data_filename, out)
        out.seek(0)
    return local_path
def unzip_data_file(self, filename: str):
    """Decompress a gzip file alongside the original.

    The decompressed file is written in the same directory as *filename*,
    with the trailing ``.gz`` suffix removed from the name.

    Fix: the original used ``replace('.gz', '')``, which strips EVERY
    ``.gz`` occurrence in the basename (``a.gz.txt.gz`` became ``a.txt``);
    only the trailing suffix is removed now.

    :param filename: path to the ``.gz`` file to decompress.
    :return: single-element list containing the decompressed file's path.
    """
    temp_dir = os.path.dirname(filename)
    base = os.path.basename(filename)
    # Strip only a trailing '.gz' suffix, never an interior occurrence.
    decompress_filename = base[:-len('.gz')] if base.endswith('.gz') else base
    decompress_file_path = os.path.join(temp_dir, decompress_filename)
    with gzip.open(filename, 'rb') as gz:
        with open(decompress_file_path, 'wb') as decompressed_file:
            shutil.copyfileobj(gz, decompressed_file)
    return [decompress_file_path]
def transfer_file_to_import(self, target_file: str):
    """Copy *target_file* from this bucket into the data-import bucket.

    The receive-folder prefix is stripped from the key to build the
    destination key under the import bucket's folder.

    Defect fixed: the flattened diff view left BOTH the pre-commit
    (``dict``) and post-commit (``str``) versions of the header and body
    lines in place, which is not valid Python; this is the reconstructed
    post-commit version.

    :param target_file: full S3 object key in this bucket.
    """
    data_import_bucket = DataImportBucket()
    transfer_from_file_path = target_file
    transfer_to_filename = transfer_from_file_path.replace(
        f"{self._recv_folder}/", "")
    data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}'
    self._s3_client.copy(self._bucket_name, transfer_from_file_path,
                         data_import_bucket._bucket_name, data_import_key)
def backup_file(self, target_file: str, datetime_key: str):
    """Copy *target_file* into the backup bucket under a datetime folder.

    The receive-folder prefix is stripped from the key, then the object is
    copied to ``<backup_folder>/<datetime_key>/<name>`` in the backup bucket.

    Defect fixed: the flattened diff view left both the pre-commit
    (``dict``) and post-commit (``str``) versions of the header and body
    lines in place; this is the reconstructed post-commit version.

    :param target_file: full S3 object key in this bucket.
    :param datetime_key: timestamp string used as the backup subfolder name.
    """
    jsk_backup_bucket = JskBackupBucket()
    backup_from_file_path = target_file
    backup_to_filename = backup_from_file_path.replace(
        f"{self._recv_folder}/", "")
    backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
    self._s3_client.copy(self._bucket_name, backup_from_file_path,
                         jsk_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: str):
    """Delete *target_file* from this bucket.

    Defect fixed: the flattened diff view left both the pre-commit
    (``dict``) and post-commit (``str``) versions of the header and body
    lines in place; this is the reconstructed post-commit version.

    :param target_file: full S3 object key to delete.
    """
    self._s3_client.delete_file(self._bucket_name, target_file)
@ -116,16 +92,16 @@ class UltmarcBucket(S3Bucket):
def get_file_list(self):
    """Return the object keys found under this bucket's folder."""
    bucket = self._bucket_name
    folder = self._folder
    return self._s3_client.list_objects(bucket, folder)
def backup_file(self, target_file: str, datetime_key: str):
    """Copy *target_file* into the Ultmarc backup bucket.

    The folder prefix is stripped from the key, then the object is copied
    to ``<backup_folder>/<datetime_key>/<name>`` in the backup bucket.

    Defect fixed: the flattened diff view left both the pre-commit
    (``dict``) and post-commit (``str``) versions of the header and body
    lines in place; this is the reconstructed post-commit version.

    :param target_file: full S3 object key in this bucket.
    :param datetime_key: timestamp string used as the backup subfolder name.
    """
    # Copy to the backup bucket
    ultmarc_backup_bucket = UltmarcBackupBucket()
    target_file_path = target_file
    backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{target_file_path.replace(f"{self._folder}/", "")}'
    self._s3_client.copy(self._bucket_name, target_file_path,
                         ultmarc_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: str):
    """Delete *target_file* from this bucket.

    Defect fixed: the flattened diff view left both the pre-commit
    (``dict``) and post-commit (``str``) versions of the header and body
    lines in place; this is the reconstructed post-commit version.

    :param target_file: full S3 object key to delete.
    """
    self._s3_client.delete_file(self._bucket_name, target_file)
@ -134,8 +110,8 @@ class UltmarcImportBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_IMPORT_FOLDER
def transfer_file_to_import(self, target_file: dict):
from_file_path = target_file.get("filename")
def transfer_file_to_import(self, target_file: str):
from_file_path = target_file
to_filename = from_file_path.replace(
f"{UltmarcBucket()._folder}/", "")
data_import_key = f'{self._folder}/{to_filename}'