feat: S3周りの処理を修正。
This commit is contained in:
parent
9215c2a44d
commit
89a57a1e04
@ -1,7 +1,4 @@
|
|||||||
import gzip
|
|
||||||
import os
|
|
||||||
import os.path as path
|
import os.path as path
|
||||||
import shutil
|
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
@ -71,38 +68,13 @@ class UltmarcBucket(S3Bucket):
|
|||||||
f.seek(0)
|
f.seek(0)
|
||||||
return temporary_file_path
|
return temporary_file_path
|
||||||
|
|
||||||
def backup_dat_file(self, dat_file_key: str, datetime_key: str):
|
def delete_dat_file(self, dat_file_key: str):
|
||||||
# バックアップバケットにコピー
|
|
||||||
ultmarc_backup_bucket = UltmarcBackupBucket()
|
|
||||||
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
|
|
||||||
self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
|
|
||||||
# コピー元のファイルを削除
|
|
||||||
self._s3_client.delete_file(self._bucket_name, dat_file_key)
|
self._s3_client.delete_file(self._bucket_name, dat_file_key)
|
||||||
|
|
||||||
|
|
||||||
class ConfigBucket(S3Bucket):
|
class ConfigBucket(S3Bucket):
|
||||||
_bucket_name = environment.JSKULT_CONFIG_BUCKET
|
_bucket_name = environment.JSKULT_CONFIG_BUCKET
|
||||||
|
|
||||||
def download_holiday_list(self):
|
|
||||||
# 一時ファイルとして保存する
|
|
||||||
temporary_dir = tempfile.mkdtemp()
|
|
||||||
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
|
|
||||||
holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
|
|
||||||
with open(temporary_file_path, mode='wb') as f:
|
|
||||||
self._s3_client.download_file(self._bucket_name, holiday_list_key, f)
|
|
||||||
f.seek(0)
|
|
||||||
return temporary_file_path
|
|
||||||
|
|
||||||
def download_wholesaler_stock_input_day_list(self):
|
|
||||||
# 一時ファイルとして保存する
|
|
||||||
temporary_dir = tempfile.mkdtemp()
|
|
||||||
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
|
|
||||||
wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
|
|
||||||
with open(temporary_file_path, mode='wb') as f:
|
|
||||||
self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f)
|
|
||||||
f.seek(0)
|
|
||||||
return temporary_file_path
|
|
||||||
|
|
||||||
def download_ultmarc_hex_convert_config(self):
|
def download_ultmarc_hex_convert_config(self):
|
||||||
# 一時ファイルとして保存する
|
# 一時ファイルとして保存する
|
||||||
temporary_dir = tempfile.mkdtemp()
|
temporary_dir = tempfile.mkdtemp()
|
||||||
@ -118,57 +90,13 @@ class JskUltBackupBucket(S3Bucket):
|
|||||||
_bucket_name = environment.JSKULT_BACKUP_BUCKET
|
_bucket_name = environment.JSKULT_BACKUP_BUCKET
|
||||||
|
|
||||||
|
|
||||||
class UltmarcBackupBucket(JskUltBackupBucket):
|
|
||||||
_folder = environment.ULTMARC_BACKUP_FOLDER
|
|
||||||
|
|
||||||
|
|
||||||
class VjskBackupBucket(JskUltBackupBucket):
|
class VjskBackupBucket(JskUltBackupBucket):
|
||||||
_folder = environment.VJSK_BACKUP_FOLDER
|
_folder = environment.JSK_BACKUP_FOLDER
|
||||||
|
|
||||||
|
|
||||||
class VjskReceiveBucket(S3Bucket):
|
|
||||||
_bucket_name = environment.VJSK_DATA_BUCKET
|
|
||||||
_recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER
|
|
||||||
|
|
||||||
_s3_file_list = None
|
|
||||||
|
|
||||||
def get_s3_file_list(self):
|
|
||||||
self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
|
|
||||||
return self._s3_file_list
|
|
||||||
|
|
||||||
def download_data_file(self, data_filename: str):
|
|
||||||
temporary_dir = tempfile.mkdtemp()
|
|
||||||
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
|
|
||||||
with open(temporary_file_path, mode='wb') as f:
|
|
||||||
self._s3_client.download_file(self._bucket_name, data_filename, f)
|
|
||||||
f.seek(0)
|
|
||||||
return temporary_file_path
|
|
||||||
|
|
||||||
def unzip_data_file(self, filename: str):
|
|
||||||
temp_dir = os.path.dirname(filename)
|
|
||||||
decompress_filename = os.path.basename(filename).replace('.gz', '')
|
|
||||||
decompress_file_path = os.path.join(temp_dir, decompress_filename)
|
|
||||||
with gzip.open(filename, 'rb') as gz:
|
|
||||||
with open(decompress_file_path, 'wb') as decompressed_file:
|
|
||||||
shutil.copyfileobj(gz, decompressed_file)
|
|
||||||
|
|
||||||
ret = [decompress_file_path]
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def backup_dat_file(self, target_files: list, datetime_key: str):
|
|
||||||
jskult_backup_bucket = VjskBackupBucket()
|
|
||||||
for target_file in target_files:
|
|
||||||
backup_from_file_path = target_file.get("filename")
|
|
||||||
backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "")
|
|
||||||
backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
|
|
||||||
self._s3_client.copy(self._bucket_name, backup_from_file_path,
|
|
||||||
jskult_backup_bucket._bucket_name, backup_key)
|
|
||||||
self._s3_client.delete_file(self._bucket_name, backup_from_file_path)
|
|
||||||
|
|
||||||
|
|
||||||
class VjskSendBucket(S3Bucket):
|
class VjskSendBucket(S3Bucket):
|
||||||
_bucket_name = environment.VJSK_DATA_BUCKET
|
_bucket_name = environment.JSK_IO_BUCKET
|
||||||
_send_folder = environment.VJSK_DATA_SEND_FOLDER
|
_send_folder = environment.JSK_DATA_SEND_FOLDER
|
||||||
|
|
||||||
def upload_dcf_dsf_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
|
def upload_dcf_dsf_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
|
||||||
# S3バケットにファイルを移動
|
# S3バケットにファイルを移動
|
||||||
@ -177,7 +105,7 @@ class VjskSendBucket(S3Bucket):
|
|||||||
s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
|
s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name)
|
||||||
return
|
return
|
||||||
|
|
||||||
def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str):
|
def backup_dcf_dsf_csv_file(self, dat_file_key: str, datetime_key: str):
|
||||||
# バックアップバケットにコピー
|
# バックアップバケットにコピー
|
||||||
vjsk_backup_bucket = VjskBackupBucket()
|
vjsk_backup_bucket = VjskBackupBucket()
|
||||||
dat_key = f'{self._send_folder}/{dat_file_key}'
|
dat_key = f'{self._send_folder}/{dat_file_key}'
|
||||||
|
|||||||
@ -3,7 +3,6 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
from src.aws.s3 import ConfigBucket, UltmarcBucket
|
from src.aws.s3 import ConfigBucket, UltmarcBucket
|
||||||
from src.batch.common.batch_context import BatchContext
|
|
||||||
from src.batch.ultmarc.datfile import DatFile
|
from src.batch.ultmarc.datfile import DatFile
|
||||||
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
|
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
|
||||||
UltmarcTableMapperFactory
|
UltmarcTableMapperFactory
|
||||||
@ -14,7 +13,6 @@ from src.logging.get_logger import get_logger
|
|||||||
logger = get_logger('アルトマーク取込')
|
logger = get_logger('アルトマーク取込')
|
||||||
ultmarc_bucket = UltmarcBucket()
|
ultmarc_bucket = UltmarcBucket()
|
||||||
config_bucket = ConfigBucket()
|
config_bucket = ConfigBucket()
|
||||||
batch_context = BatchContext.get_instance()
|
|
||||||
|
|
||||||
|
|
||||||
def exec_import():
|
def exec_import():
|
||||||
@ -50,11 +48,8 @@ def exec_import():
|
|||||||
dat_file = DatFile.from_path(converted_file_path)
|
dat_file = DatFile.from_path(converted_file_path)
|
||||||
# アルトマーク取り込み実行
|
# アルトマーク取り込み実行
|
||||||
_import_to_ultmarc_table(dat_file)
|
_import_to_ultmarc_table(dat_file)
|
||||||
# 処理後ファイルをバックアップ
|
# 取込完了後、S3からファイルを削除
|
||||||
ultmarc_bucket.backup_dat_file(dat_file_name, batch_context.syor_date)
|
ultmarc_bucket.delete_dat_file(dat_file_name)
|
||||||
logger.info(f'取り込み処理が完了したため、datファイルをバックアップ。ファイル名={dat_file_name}')
|
|
||||||
# アルトマーク取込済をマーク
|
|
||||||
batch_context.is_ultmarc_imported = True
|
|
||||||
logger.info('アルトマーク取込処理: 終了')
|
logger.info('アルトマーク取込処理: 終了')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BatchOperationException(e)
|
raise BatchOperationException(e)
|
||||||
|
|||||||
@ -37,7 +37,7 @@ def exec():
|
|||||||
jsk_bucket.upload_dcf_dsf_csv_file(csv_file_name, csv_file_path)
|
jsk_bucket.upload_dcf_dsf_csv_file(csv_file_name, csv_file_path)
|
||||||
|
|
||||||
# 連携ファイルをバックアップ
|
# 連携ファイルをバックアップ
|
||||||
jsk_bucket.backup_inst_pharm_csv_file(csv_file_name, batch_context.syor_date)
|
jsk_bucket.backup_dcf_dsf_csv_file(csv_file_name, batch_context.syor_date)
|
||||||
|
|
||||||
csv_count = len(record_dcf) + len(record_dsf)
|
csv_count = len(record_dcf) + len(record_dsf)
|
||||||
logger.info(f'CSV出力件数: {csv_count}')
|
logger.info(f'CSV出力件数: {csv_count}')
|
||||||
|
|||||||
@ -4,7 +4,7 @@ BATCH_EXIT_CODE_SUCCESS = 0
|
|||||||
# バッチ処理中フラグ:未処理
|
# バッチ処理中フラグ:未処理
|
||||||
BATCH_ACTF_BATCH_UNPROCESSED = '0'
|
BATCH_ACTF_BATCH_UNPROCESSED = '0'
|
||||||
# バッチ処理中フラグ:処理中
|
# バッチ処理中フラグ:処理中
|
||||||
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
|
BATCH_ACTF_BATCH_START = '1'
|
||||||
# dump取得状態区分:未処理
|
# dump取得状態区分:未処理
|
||||||
DUMP_STATUS_KBN_UNPROCESSED = '0'
|
DUMP_STATUS_KBN_UNPROCESSED = '0'
|
||||||
# dump取得状態区分:dump取得正常終了
|
# dump取得状態区分:dump取得正常終了
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user