feat: 転送データリストを取得する部分を修正。
This commit is contained in:
parent
d243768e58
commit
95ce00a122
@ -18,6 +18,8 @@ VJSK_DATA_BUCKET=*************
|
|||||||
JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME=jskult_wholesaler_stock_input_day_list.txt
|
JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME=jskult_wholesaler_stock_input_day_list.txt
|
||||||
JSKULT_CONFIG_CONVERT_FOLDER=jskult/convert
|
JSKULT_CONFIG_CONVERT_FOLDER=jskult/convert
|
||||||
JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME=ultmarc_hex_convert_config.json
|
JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME=ultmarc_hex_convert_config.json
|
||||||
|
TRANSFER_RESULT_FOLDER=transfer_result
|
||||||
|
TRANSFER_RESULT_FILE_NAME=transfer_result.json
|
||||||
# 連携データ抽出期間
|
# 連携データ抽出期間
|
||||||
SALES_LAUNDERING_EXTRACT_DATE_PERIOD=0
|
SALES_LAUNDERING_EXTRACT_DATE_PERIOD=0
|
||||||
# 洗替対象テーブル名
|
# 洗替対象テーブル名
|
||||||
|
|||||||
@ -1,11 +1,7 @@
|
|||||||
import gzip
|
|
||||||
import os
|
|
||||||
import os.path as path
|
import os.path as path
|
||||||
import shutil
|
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
|
|
||||||
from src.system_var import environment
|
from src.system_var import environment
|
||||||
|
|
||||||
|
|
||||||
@ -77,7 +73,9 @@ class ConfigBucket(S3Bucket):
|
|||||||
temporary_dir = tempfile.mkdtemp()
|
temporary_dir = tempfile.mkdtemp()
|
||||||
temporary_file_path = path.join(
|
temporary_file_path = path.join(
|
||||||
temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
|
temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
|
||||||
wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
|
wholesaler_stock_input_day_list_key = \
|
||||||
|
f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
|
||||||
|
|
||||||
with open(temporary_file_path, mode='wb') as f:
|
with open(temporary_file_path, mode='wb') as f:
|
||||||
self._s3_client.download_file(
|
self._s3_client.download_file(
|
||||||
self._bucket_name, wholesaler_stock_input_day_list_key, f)
|
self._bucket_name, wholesaler_stock_input_day_list_key, f)
|
||||||
@ -105,6 +103,23 @@ class JskBackupBucket(JskUltBackupBucket):
|
|||||||
_folder = environment.JSKULT_BACKUP_BUCKET
|
_folder = environment.JSKULT_BACKUP_BUCKET
|
||||||
|
|
||||||
|
|
||||||
|
class JskTransferListBucket(JskUltBackupBucket):
|
||||||
|
_folder = environment.TRANSFER_RESULT_FOLDER
|
||||||
|
|
||||||
|
def download_transfer_result_file(self, process_date_yyyymmdd: str):
|
||||||
|
file_name = environment.TRANSFER_RESULT_FILE_NAME
|
||||||
|
# 一時ファイルとして保存する
|
||||||
|
temporary_dir = tempfile.mkdtemp()
|
||||||
|
temporary_file_path = path.join(
|
||||||
|
temporary_dir, file_name)
|
||||||
|
holiday_list_key = f'{self._folder}/{process_date_yyyymmdd}/{file_name}'
|
||||||
|
with open(temporary_file_path, mode='wb') as f:
|
||||||
|
self._s3_client.download_file(
|
||||||
|
self._bucket_name, holiday_list_key, f)
|
||||||
|
f.seek(0)
|
||||||
|
return temporary_file_path
|
||||||
|
|
||||||
|
|
||||||
class JskSendBucket(S3Bucket):
|
class JskSendBucket(S3Bucket):
|
||||||
_bucket_name = environment.JSK_IO_BUCKET
|
_bucket_name = environment.JSK_IO_BUCKET
|
||||||
_send_folder = environment.JSK_DATA_SEND_FOLDER
|
_send_folder = environment.JSK_DATA_SEND_FOLDER
|
||||||
|
|||||||
@ -1,16 +1,18 @@
|
|||||||
import csv
|
import csv
|
||||||
|
import json
|
||||||
import os.path as path
|
import os.path as path
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from src.aws.s3 import S3Client, JskSendBucket
|
from src.aws.s3 import JskSendBucket, JskTransferListBucket
|
||||||
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
|
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
|
||||||
from src.db.database import Database
|
from src.db.database import Database
|
||||||
from src.error.exceptions import BatchOperationException, MaxRunCountReachedException
|
from src.error.exceptions import (BatchOperationException,
|
||||||
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
|
MaxRunCountReachedException)
|
||||||
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
|
|
||||||
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
|
|
||||||
from src.system_var import environment
|
|
||||||
from src.logging.get_logger import get_logger
|
from src.logging.get_logger import get_logger
|
||||||
|
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
|
||||||
|
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
|
||||||
|
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
|
||||||
|
from src.system_var import environment
|
||||||
|
|
||||||
logger = get_logger('DCF削除新規マスタ作成')
|
logger = get_logger('DCF削除新規マスタ作成')
|
||||||
|
|
||||||
@ -20,80 +22,92 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
|
|||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
jskultBatchRunManager = JskultBatchRunManager(
|
jskult_hdke_tbl_manager = JskultHdkeTblManager()
|
||||||
|
jskult_batch_run_manager = JskultBatchRunManager(
|
||||||
environment.BATCH_EXECUTION_ID)
|
environment.BATCH_EXECUTION_ID)
|
||||||
jskultHdkeTblManager = JskultHdkeTblManager()
|
if not jskult_hdke_tbl_manager.can_run_process():
|
||||||
|
logger.error(
|
||||||
|
'日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
|
||||||
|
# バッチ実行管理テーブルをfailedで登録
|
||||||
|
jskult_batch_run_manager.batch_failed()
|
||||||
|
return
|
||||||
|
|
||||||
# /transfer_result/yyyy/mm/dd/
|
# 業務日付を取得
|
||||||
jskult_backuo_folder_name = f"""/transfer_result/{jskultHdkeTblManager.get_batch_statuses()[2]}"""
|
_, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses()
|
||||||
|
|
||||||
receive_file_count = S3Client.list_objects(
|
# 転送ファイル一覧を取得し、転送件数を取得
|
||||||
environment.JSKULT_BACKUP_BUCKET, jskult_backuo_folder_name).count()
|
try:
|
||||||
|
transfer_list_bucket = JskTransferListBucket()
|
||||||
|
transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
|
||||||
|
process_date)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
|
||||||
|
# バッチ実行管理テーブルをfailedで登録
|
||||||
|
jskult_batch_run_manager.batch_failed()
|
||||||
|
|
||||||
jskultBatchStatusManager = JskultBatchStatusManager(
|
with open(transfer_list_file_path) as f:
|
||||||
|
transfer_list = json.load(f)
|
||||||
|
|
||||||
|
# 実消化データ + アルトマークデータの転送件数を合算し、受信ファイル件数とする
|
||||||
|
receive_file_count = len(
|
||||||
|
transfer_list['jsk_transfer_list']) + len(transfer_list['ult_transfer_list'])
|
||||||
|
|
||||||
|
jskult_batch_status_manager = JskultBatchStatusManager(
|
||||||
environment.PROCESS_NAME,
|
environment.PROCESS_NAME,
|
||||||
|
|
||||||
# TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え
|
# TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え
|
||||||
environment.POST_PROCESS,
|
'post_process',
|
||||||
environment.MAX_RUN_COUNT,
|
environment.MAX_RUN_COUNT,
|
||||||
receive_file_count
|
receive_file_count
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not jskultHdkeTblManager.can_run_process():
|
|
||||||
logger.error(
|
|
||||||
'日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
|
|
||||||
return
|
|
||||||
|
|
||||||
jskultBatchStatusManager.set_process_status("start")
|
jskult_batch_status_manager.set_process_status("start")
|
||||||
try:
|
try:
|
||||||
if not jskultBatchStatusManager.can_run_post_process():
|
if not jskult_batch_status_manager.can_run_post_process():
|
||||||
# 後続処理の起動条件を満たしていない場合
|
# 後続処理の起動条件を満たしていない場合
|
||||||
# 処理ステータスを「処理待」に設定
|
# 処理ステータスを「処理待」に設定
|
||||||
jskultBatchStatusManager.set_process_status("waiting")
|
jskult_batch_status_manager.set_process_status("waiting")
|
||||||
|
|
||||||
# バッチ実行管理テーブルに「retry」で登録
|
# バッチ実行管理テーブルに「retry」で登録
|
||||||
jskultBatchRunManager.batch_retry()
|
jskult_batch_run_manager.batch_retry()
|
||||||
|
|
||||||
return
|
return
|
||||||
except MaxRunCountReachedException as e:
|
except MaxRunCountReachedException:
|
||||||
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
|
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
|
||||||
|
|
||||||
jskultBatchStatusManager.set_process_status("doing")
|
jskult_batch_status_manager.set_process_status("doing")
|
||||||
|
|
||||||
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
|
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
|
||||||
if jskultBatchStatusManager.is_done_ultmarc_import():
|
if jskult_batch_status_manager.is_done_ultmarc_import():
|
||||||
|
|
||||||
# COM_施設からDCF削除新規マスタに登録
|
# COM_施設からDCF削除新規マスタに登録
|
||||||
(is_add_dcf_inst_merge,
|
(is_add_dcf_inst_merge,
|
||||||
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self)
|
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self)
|
||||||
if is_add_dcf_inst_merge:
|
if is_add_dcf_inst_merge:
|
||||||
|
|
||||||
self._output_add_dcf_inst_merge_log(
|
self._output_add_dcf_inst_merge_log(
|
||||||
duplication_inst_records)
|
duplication_inst_records)
|
||||||
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
|
|
||||||
# CSV出力
|
# CSV出力
|
||||||
|
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
|
||||||
file_path = self._make_csv_data(
|
file_path = self._make_csv_data(
|
||||||
environment.DCF_INST_MERGE_SEND_FILE_NAME,
|
environment.DCF_INST_MERGE_SEND_FILE_NAME,
|
||||||
dcf_inst_merge_all_records)
|
dcf_inst_merge_all_records)
|
||||||
|
|
||||||
# CSVをS3にアップロード
|
# CSVをS3にアップロード
|
||||||
|
|
||||||
self._upload_dcf_inst_merge_csv_file(
|
self._upload_dcf_inst_merge_csv_file(
|
||||||
file_path, environment.DCF_INST_MERGE_SEND_FILE_NAME)
|
file_path, process_date, environment.DCF_INST_MERGE_SEND_FILE_NAME)
|
||||||
|
|
||||||
# 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
|
# 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
|
||||||
logger.info("DCF削除新規マスタ作成処理を正常終了します。")
|
logger.info("DCF削除新規マスタ作成処理を正常終了します。")
|
||||||
|
jskult_batch_run_manager.batch_success()
|
||||||
|
jskult_batch_status_manager.set_process_status("done")
|
||||||
|
|
||||||
jskultBatchRunManager.batch_success()
|
return
|
||||||
jskultBatchStatusManager.set_process_status("done")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
|
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
|
||||||
logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}')
|
logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}')
|
||||||
|
jskult_batch_run_manager.batch_failed()
|
||||||
jskultBatchRunManager.batch_failed()
|
jskult_batch_status_manager.set_process_status("failed")
|
||||||
jskultBatchStatusManager.set_process_status("failed")
|
|
||||||
|
|
||||||
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
|
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
|
||||||
try:
|
try:
|
||||||
@ -246,6 +260,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
|
|||||||
**********************************************************
|
**********************************************************
|
||||||
合計 {len(duplication_inst_records)}件"""
|
合計 {len(duplication_inst_records)}件"""
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def _make_csv_data(csv_file_name: str, record_inst: list):
|
def _make_csv_data(csv_file_name: str, record_inst: list):
|
||||||
@ -268,18 +283,18 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
|
|||||||
csv_data = [
|
csv_data = [
|
||||||
'' if n is None else n for n in record_inst_value]
|
'' if n is None else n for n in record_inst_value]
|
||||||
writer.writerow(csv_data)
|
writer.writerow(csv_data)
|
||||||
|
|
||||||
return csv_file_path
|
return csv_file_path
|
||||||
|
|
||||||
# CSVファイルをバックアップ
|
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, process_date: str, csv_file_path: str):
|
||||||
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str):
|
|
||||||
# S3バケットにファイルを移動
|
|
||||||
jsk_send_bucket = JskSendBucket()
|
jsk_send_bucket = JskSendBucket()
|
||||||
|
|
||||||
# 処理日を取得
|
# S3バケットにファイルをアップロード
|
||||||
_, _, syor_date = JskultHdkeTblManager.get_batch_statuses()
|
|
||||||
|
|
||||||
jsk_send_bucket.upload_dcf_inst_merge_csv_file(
|
jsk_send_bucket.upload_dcf_inst_merge_csv_file(
|
||||||
csv_file_name, csv_file_path)
|
csv_file_name, csv_file_path)
|
||||||
|
|
||||||
|
# CSVファイルをバックアップ
|
||||||
jsk_send_bucket.backup_dcf_inst_merge_csv_file(
|
jsk_send_bucket.backup_dcf_inst_merge_csv_file(
|
||||||
csv_file_name, syor_date)
|
csv_file_name, process_date)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@ -113,7 +113,7 @@ class JskultHdkeTblManager:
|
|||||||
finally:
|
finally:
|
||||||
self._db.disconnect()
|
self._db.disconnect()
|
||||||
# 日次バッチ処理中ではない場合、後続の処理は行わない
|
# 日次バッチ処理中ではない場合、後続の処理は行わない
|
||||||
if batch_processing_flag != constants.BATCH_ACTF_BATCH_START:
|
if batch_processing_flag != constants.BATCH_ACTF_BATCH_START:
|
||||||
return False
|
return False
|
||||||
# dump取得が正常終了していない場合、後続の処理は行わない
|
# dump取得が正常終了していない場合、後続の処理は行わない
|
||||||
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
|
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
|
||||||
|
|||||||
@ -4,7 +4,7 @@ BATCH_EXIT_CODE_SUCCESS = 0
|
|||||||
# バッチ処理中フラグ:未処理
|
# バッチ処理中フラグ:未処理
|
||||||
BATCH_ACTF_BATCH_UNPROCESSED = '0'
|
BATCH_ACTF_BATCH_UNPROCESSED = '0'
|
||||||
# バッチ処理中フラグ:処理中
|
# バッチ処理中フラグ:処理中
|
||||||
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
|
BATCH_ACTF_BATCH_START = '1'
|
||||||
# dump取得状態区分:未処理
|
# dump取得状態区分:未処理
|
||||||
DUMP_STATUS_KBN_UNPROCESSED = '0'
|
DUMP_STATUS_KBN_UNPROCESSED = '0'
|
||||||
# dump取得状態区分:dump取得正常終了
|
# dump取得状態区分:dump取得正常終了
|
||||||
|
|||||||
@ -8,9 +8,9 @@ DB_PASSWORD = os.environ['DB_PASSWORD']
|
|||||||
DB_SCHEMA = os.environ['DB_SCHEMA']
|
DB_SCHEMA = os.environ['DB_SCHEMA']
|
||||||
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
|
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
|
||||||
BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID']
|
BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID']
|
||||||
POST_PROCESS = os.environ["POST_PROCESS"]
|
MAX_RUN_COUNT = os.environ['MAX_RUN_COUNT']
|
||||||
MAX_RUN_COUNT = os.environ["MAX_RUN_COUNT"]
|
TRANSFER_RESULT_FOLDER = os.environ['TRANSFER_RESULT_FOLDER']
|
||||||
RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"]
|
TRANSFER_RESULT_FILE_NAME = os.environ['TRANSFER_RESULT_FILE_NAME']
|
||||||
DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME']
|
DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME']
|
||||||
PROCESS_NAME = os.environ['PROCESS_NAME']
|
PROCESS_NAME = os.environ['PROCESS_NAME']
|
||||||
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
|
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user