指摘対応 entrypoint実行済 DB接続の部分は未確認

This commit is contained in:
mori.k 2025-05-27 20:14:03 +09:00
parent e50b827d9f
commit 0086486841
7 changed files with 159 additions and 86 deletions

View File

@ -10,7 +10,8 @@
"request": "launch", "request": "launch",
"program": "entrypoint.py", "program": "entrypoint.py",
"console": "integratedTerminal", "console": "integratedTerminal",
"justMyCode": true "justMyCode": true,
"envFile": "${workspaceFolder}/.env"
} }
] ]
} }

View File

@ -14,7 +14,8 @@ class S3Client:
_bucket_name: str _bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str): def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) response = self.__s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0: if response['KeyCount'] == 0:
return [] return []
contents = response['Contents'] contents = response['Contents']
@ -54,6 +55,7 @@ class S3Bucket():
_s3_client = S3Client() _s3_client = S3Client()
_bucket_name: str = None _bucket_name: str = None
class ConfigBucket(S3Bucket): class ConfigBucket(S3Bucket):
# TODO 日付更新処理で内容の修正を行う # TODO 日付更新処理で内容の修正を行う
_bucket_name = environment.JSKULT_CONFIG_BUCKET _bucket_name = environment.JSKULT_CONFIG_BUCKET
@ -61,30 +63,36 @@ class ConfigBucket(S3Bucket):
def download_holiday_list(self): def download_holiday_list(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME) temporary_file_path = path.join(
temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}' holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, holiday_list_key, f) self._s3_client.download_file(
self._bucket_name, holiday_list_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
def download_wholesaler_stock_input_day_list(self): def download_wholesaler_stock_input_day_list(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME) temporary_file_path = path.join(
temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}' wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f) self._s3_client.download_file(
self._bucket_name, wholesaler_stock_input_day_list_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
def download_ultmarc_hex_convert_config(self): def download_ultmarc_hex_convert_config(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME) temporary_file_path = path.join(
temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME)
hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}' hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, hex_convert_config_key, f) self._s3_client.download_file(
self._bucket_name, hex_convert_config_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
@ -93,19 +101,19 @@ class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.JSKULT_BACKUP_BUCKET _bucket_name = environment.JSKULT_BACKUP_BUCKET
# TODO 設定値をecsタスク定義書から確認
class JskBackupBucket(JskUltBackupBucket): class JskBackupBucket(JskUltBackupBucket):
_folder = environment.VJSK_BACKUP_FOLDER _folder = environment.JSKULT_BACKUP_BUCKET
class JskSendBucket(S3Bucket): class JskSendBucket(S3Bucket):
_bucket_name = environment.JSKULT_DATA_BUCKET _bucket_name = environment.JSK_IO_BUCKET
_send_folder = environment.JSKULT_DATA_SEND_FOLDER _send_folder = environment.JSK_DATA_SEND_FOLDER
def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str): def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str):
# S3バケットにファイルを移動 # S3バケットにファイルを移動
csv_file_name = f'{self._send_folder}/{jskult_create_csv}' csv_file_name = f'{self._send_folder}/{jskult_create_csv}'
self._s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name) self._s3_client.upload_file(
csv_file_path, self._bucket_name, csv_file_name)
return return
def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str): def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str):
@ -113,4 +121,5 @@ class JskSendBucket(S3Bucket):
jskult_backup_bucket = JskUltBackupBucket() jskult_backup_bucket = JskUltBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}' dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key, jskult_backup_bucket._bucket_name, backup_key) self._s3_client.copy(self._bucket_name, dat_key,
jskult_backup_bucket._bucket_name, backup_key)

View File

@ -1,64 +1,110 @@
import os
import csv import csv
import os.path as path import os.path as path
import tempfile import tempfile
from src.aws.s3 import JskSendBucket from src.aws.s3 import JskSendBucket
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.batch.common.batch_context import BatchContext from src.batch.common.batch_context import BatchContext
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.db.database import Database
from src.error.exceptions import BatchOperationException, MaxRunCountReachedException
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.system_var import environment
from src.logging.get_logger import get_logger from src.logging.get_logger import get_logger
logger = get_logger('DCF削除新規マスタ作成') logger = get_logger('DCF削除新規マスタ作成')
PROCESS_NAME = os.environ["PROCESS_NAME"]
POST_PROCESS = os.environ["POST_PROCESS"]
MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"]
RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"]
CSV_FILE_NAME = os.environ['CSV_FILE_NAME']
class DcfInstMergeIO(JskultBatchEntrypoint): class DcfInstMergeIO(JskultBatchEntrypoint):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
def execute(self): def execute(self):
jskultHdkeTblManager = JskultHdkeTblManager() jskultBatchRunManager = JskultBatchRunManager(
environment.BATCH_EXECUTION_ID)
if not jskultHdkeTblManager.can_run_process():
return
jskultBatchStatusManager = JskultBatchStatusManager( jskultBatchStatusManager = JskultBatchStatusManager(
PROCESS_NAME, environment.PROCESS_NAME,
POST_PROCESS, environment.POST_PROCESS,
MAX_RUN_COUNT_FLG, environment.MAX_RUN_COUNT_FLG,
RECEIVE_FILE_COUNT environment.RECEIVE_FILE_COUNT
) )
if not jskultBatchStatusManager.can_run_post_process(): try:
jskultHdkeTblManager = JskultHdkeTblManager()
# 処理ステータスを「処理待」に設定 if not jskultHdkeTblManager.can_run_process():
jskultBatchStatusManager.set_process_status("retry") logger.error(
return '日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
return
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行 jskultBatchStatusManager.set_process_status("start")
if jskultBatchStatusManager.is_done_ultmarc_import(): try:
if not jskultBatchStatusManager.can_run_post_process():
# リトライ判断された場合
# 処理ステータスを「処理待」に設定
jskultBatchStatusManager.set_process_status("waiting")
(is_add_dcf_inst_merge, duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self) # バッチ実行管理テーブルに「retry」で登録
jskultBatchRunManager.batch_retry()
return
except MaxRunCountReachedException as e:
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
jskultBatchStatusManager.set_process_status("doing")
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskultBatchStatusManager.is_done_ultmarc_import():
#
(is_add_dcf_inst_merge,
duplication_inst_records) = _insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge: if is_add_dcf_inst_merge:
# COM_施設からDCF削除新規マスタに登録 # COM_施設からDCF削除新規マスタに登録
_output_add_dcf_inst_merge_log(duplication_inst_records) _output_add_dcf_inst_merge_log(duplication_inst_records)
dcf_inst_merge_all_records = _select_dcf_inst_merge_all()
# CSV出力
file_path = _make_csv_data(
dcf_inst_merge_all_records, environment.CSV_FILE_NAME)
# CSV出力 # CSVをS3にアップロード
file_path = _make_csv_data(CSV_FILE_NAME) _upload_dcf_inst_merge_csv_file(
file_path, environment.CSV_FILE_NAME)
# CSVをS3にアップロード # 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
_upload_dcf_inst_merge_csv_file(CSV_FILE_NAME, file_path) logger.info("DCF削除新規マスタ作成処理を正常終了します。")
jskultBatchRunManager.batch_success()
jskultBatchStatusManager.set_process_status("done")
except:
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
logger.error("エラーが発生したため、DCF削除新規マスタ作成処理を終了します。")
jskultBatchRunManager.batch_failed()
jskultBatchStatusManager.set_process_status("failed")
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
self._db.begin()
self._db.to_jst()
sql = """\
SELECT
*
FROM
src07.dcf_inst_merge
"""
dcf_inst_merge_all_records = self._db.execute_select(sql)
return dcf_inst_merge_all_records
except Exception as e:
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]: def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
# com_instからdcf_inst_mergeにinsert # com_instからdcf_inst_mergeにinsert
@ -68,7 +114,7 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
self._db.begin() self._db.begin()
self._db.to_jst() self._db.to_jst()
sql ="""\ sql = """\
SELECT SELECT
ci.DCF_DSF_INST_CD, ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI, ci.FORM_INST_NAME_KANJI,
@ -176,18 +222,18 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
finally: finally:
self._db.disconnect() self._db.disconnect()
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]): def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date'] sys_update_date = duplication_inst_records[0]['sys_update_date']
set_year_month = '{set_year}{set_month}'.format( set_year_month = '{set_year}{set_month}'.format(
set_year=sys_update_date[0:4], set_year=sys_update_date[0:4],
set_month=sys_update_date[-2:] set_month=sys_update_date[-2:]
) )
add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}' add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}'
add_dct_inst_merge_list = [] add_dct_inst_merge_list = []
for row in duplication_inst_records: for row in duplication_inst_records:
add_dct_inst_merge_list.append(add_dct_inst_merge.format(**row)) add_dct_inst_merge_list.append(
add_dct_inst_merge.format(**row))
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list) add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力 # 顧客報告用にログ出力
logger.info( logger.info(
@ -201,33 +247,33 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
) )
return return
def _make_csv_data(csv_file_name: str, record_inst: list):
# CSVファイルを作成する
temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, csv_file_name)
def _make_csv_data(record_inst: list, csv_file_name: str): head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',
# CSVファイルを作成する 'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG',
temporary_dir = tempfile.mkdtemp() 'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE']
csv_file_path = path.join(temporary_dir, csv_file_name)
head_str = ['DCF_INST_CD','DUP_OPP_CD','START_MONTH', with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file:
'INVALID_FLG','REMARKS','DCF_INST_CD_NEW','ENABLED_FLG', # ヘッダ行書き込みくくり文字をつけない為にwriterowではなく、writeを使用しています
'CREATER','CREATE_DATE','UPDATER','UPDATE_DATE'] csv_file.write(f"{','.join(head_str)}\n")
with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file: # Shift-JIS、CRLF、価囲いありで書き込む
# ヘッダ行書き込みくくり文字をつけない為にwriterowではなく、writeを使用しています writer = csv.writer(csv_file, delimiter=',', lineterminator='\n',
csv_file.write(f"{','.join(head_str)}\n") quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
strict=True
)
# Shift-JIS、CRLF、価囲いありで書き込む # データ部分書き込み(施設)
writer = csv.writer(csv_file, delimiter=',', lineterminator='\n', for record_inst_data in record_inst:
quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL, record_inst_value = list(record_inst_data.values())
strict=True csv_data = [
) '' if n is None else n for n in record_inst_value]
writer.writerow(csv_data)
# データ部分書き込み(施設) return csv_file_path
for record_inst_data in record_inst:
record_inst_value = list(record_inst_data.values())
csv_data = ['' if n is None else n for n in record_inst_value]
writer.writerow(csv_data)
return csv_file_path
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str): def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, csv_file_path: str):
# S3バケットにファイルを移動 # S3バケットにファイルを移動
@ -235,6 +281,8 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
# バッチ共通設定を取得 # バッチ共通設定を取得
batch_context = BatchContext.get_instance() batch_context = BatchContext.get_instance()
jsk_send_bucket.upload_dcf_inst_merge_csv_file(csv_file_name, csv_file_path) jsk_send_bucket.upload_dcf_inst_merge_csv_file(
jsk_send_bucket.backup_dcf_inst_merge_csv_file(csv_file_name, batch_context.syor_date) csv_file_name, csv_file_path)
jsk_send_bucket.backup_dcf_inst_merge_csv_file(
csv_file_name, batch_context.syor_date)
return return

View File

@ -3,6 +3,6 @@ import abc
class JskultBatchEntrypoint(metaclass=abc.ABCMeta): class JskultBatchEntrypoint(metaclass=abc.ABCMeta):
@abc.abstractmethod() @abc.abstractmethod
def execute(self): def execute(self):
pass pass

View File

@ -8,3 +8,7 @@ class DBException(MeDaCaException):
class BatchOperationException(MeDaCaException): class BatchOperationException(MeDaCaException):
pass pass
class MaxRunCountReachedException(MeDaCaException):
pass

View File

@ -6,13 +6,24 @@ DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME'] DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD'] DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA'] DB_SCHEMA = os.environ['DB_SCHEMA']
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
# 処理名 BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID']
POST_PROCESS = os.environ["POST_PROCESS"]
MAX_RUN_COUNT_FLG = os.environ["MAX_RUN_COUNT_FLG"]
RECEIVE_FILE_COUNT = os.environ["RECEIVE_FILE_COUNT"]
CSV_FILE_NAME = os.environ['CSV_FILE_NAME']
PROCESS_NAME = os.environ['PROCESS_NAME'] PROCESS_NAME = os.environ['PROCESS_NAME']
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET']
JSK_DATA_SEND_FOLDER = os.environ['JSK_DATA_SEND_FOLDER']
# 初期値がある環境変数 # 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) DB_CONNECTION_MAX_RETRY_ATTEMPT = int(
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) DB_CONNECTION_RETRY_INTERVAL_INIT = int(
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))

0
ecs/jskult-batch/test.py Normal file
View File