Merge pull request #483 feature-NEWDWH2021-1863 into develop

This commit is contained in:
下田雅人 2025-05-28 14:56:42 +09:00
commit 7cff756d5a
11 changed files with 397 additions and 122 deletions

View File

@ -1,26 +1,24 @@
DB_HOST=************ DB_HOST******************
DB_PORT=************ DB_PORT=*****************
DB_USERNAME=************ DB_USERNAME=*************
DB_PASSWORD=************ DB_PASSWORD=*************
DB_SCHEMA=src05 DB_SCHEMA=src05
JSK_IO_BUCKET=mbj-newdwh2021-staging-jskult-io
JSKULT_BACKUP_BUCKET=mbj-newdwh2021-staging-backup-jskult
BATCH_MANAGE_DYNAMODB_TABLE_NAME=mbj-newdwh2021-staging-jskult-batch-run-manage
BATCH_EXECUTION_ID=localtest
MAX_RUN_COUNT=3
LOG_LEVEL=INFO LOG_LEVEL=INFO
ULTMARC_DATA_BUCKET=**************** PROCESS_NAME=jskult-batch-dcf-inst-merge-io
ULTMARC_DATA_FOLDER=recv JSK_DATA_SEND_FOLDER=send
JSKULT_BACKUP_BUCKET=**************** JSK_BACKUP_FOLDER=jsk/send
ULTMARC_BACKUP_FOLDER=ultmarc TRANSFER_RESULT_FOLDER=transfer_result
VJSK_BACKUP_FOLDER=vjsk TRANSFER_RESULT_FILE_NAME=transfer_result.json
JSKULT_CONFIG_BUCKET=********************** DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv
JSKULT_CONFIG_CALENDAR_FOLDER=jskult/calendar JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME=jskult_holiday_list.txt
VJSK_DATA_SEND_FOLDER=send # DB接続リトライ設定
VJSK_DATA_RECEIVE_FOLDER=recv DB_CONNECTION_MAX_RETRY_ATTEMPT=1
VJSK_DATA_BUCKET=************* DB_CONNECTION_RETRY_INTERVAL_INIT=1
JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME=jskult_wholesaler_stock_input_day_list.txt DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1
JSKULT_CONFIG_CONVERT_FOLDER=jskult/convert DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=1
JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME=ultmarc_hex_convert_config.json
# 連携データ抽出期間
SALES_LAUNDERING_EXTRACT_DATE_PERIOD=0
# 洗替対象テーブル名
SALES_LAUNDERING_TARGET_TABLE_NAME=src05.sales_lau
# 卸実績洗替で作成するデータの期間(年単位)
SALES_LAUNDERING_TARGET_YEAR_OFFSET=5

View File

@ -10,7 +10,8 @@
"request": "launch", "request": "launch",
"program": "entrypoint.py", "program": "entrypoint.py",
"console": "integratedTerminal", "console": "integratedTerminal",
"justMyCode": true "justMyCode": true,
"envFile": "${workspaceFolder}/.env"
} }
] ]
} }

View File

@ -1,11 +1,7 @@
import gzip
import os
import os.path as path import os.path as path
import shutil
import tempfile import tempfile
import boto3 import boto3
from src.system_var import environment from src.system_var import environment
@ -14,7 +10,8 @@ class S3Client:
_bucket_name: str _bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str): def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) response = self.__s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0: if response['KeyCount'] == 0:
return [] return []
contents = response['Contents'] contents = response['Contents']
@ -55,61 +52,45 @@ class S3Bucket():
_bucket_name: str = None _bucket_name: str = None
class UltmarcBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_DATA_FOLDER
def list_dat_file(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def download_dat_file(self, dat_filename: str):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{dat_filename.replace(f"{self._folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, dat_filename, f)
f.seek(0)
return temporary_file_path
def backup_dat_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー
ultmarc_backup_bucket = UltmarcBackupBucket()
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{dat_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_file_key, ultmarc_backup_bucket._bucket_name, backup_key)
# コピー元のファイルを削除
self._s3_client.delete_file(self._bucket_name, dat_file_key)
class ConfigBucket(S3Bucket): class ConfigBucket(S3Bucket):
# TODO 日付更新処理で内容の修正を行う
_bucket_name = environment.JSKULT_CONFIG_BUCKET _bucket_name = environment.JSKULT_CONFIG_BUCKET
def download_holiday_list(self): def download_holiday_list(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME) temporary_file_path = path.join(
temporary_dir, environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME)
holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}' holiday_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_HOLIDAY_LIST_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, holiday_list_key, f) self._s3_client.download_file(
self._bucket_name, holiday_list_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
def download_wholesaler_stock_input_day_list(self): def download_wholesaler_stock_input_day_list(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME) temporary_file_path = path.join(
wholesaler_stock_input_day_list_key = f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}' temporary_dir, environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME)
wholesaler_stock_input_day_list_key = \
f'{environment.JSKULT_CONFIG_CALENDAR_FOLDER}/{environment.JSKULT_CONFIG_CALENDAR_WHOLESALER_STOCK_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, wholesaler_stock_input_day_list_key, f) self._s3_client.download_file(
self._bucket_name, wholesaler_stock_input_day_list_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
def download_ultmarc_hex_convert_config(self): def download_ultmarc_hex_convert_config(self):
# 一時ファイルとして保存する # 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME) temporary_file_path = path.join(
temporary_dir, environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME)
hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}' hex_convert_config_key = f'{environment.JSKULT_CONFIG_CONVERT_FOLDER}/{environment.JSKULT_ULTMARC_HEX_CONVERT_CONFIG_FILE_NAME}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, hex_convert_config_key, f) self._s3_client.download_file(
self._bucket_name, hex_convert_config_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
@ -118,68 +99,42 @@ class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.JSKULT_BACKUP_BUCKET _bucket_name = environment.JSKULT_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket): class JskBackupBucket(JskUltBackupBucket):
_folder = environment.ULTMARC_BACKUP_FOLDER _folder = environment.JSKULT_BACKUP_BUCKET
class VjskBackupBucket(JskUltBackupBucket): class JskTransferListBucket(JskUltBackupBucket):
_folder = environment.VJSK_BACKUP_FOLDER _folder = environment.TRANSFER_RESULT_FOLDER
def download_transfer_result_file(self, process_date_yyyymmdd: str):
class VjskReceiveBucket(S3Bucket): file_name = environment.TRANSFER_RESULT_FILE_NAME
_bucket_name = environment.VJSK_DATA_BUCKET # 一時ファイルとして保存する
_recv_folder = environment.VJSK_DATA_RECEIVE_FOLDER
_s3_file_list = None
def get_s3_file_list(self):
self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}') temporary_file_path = path.join(
temporary_dir, file_name)
holiday_list_key = f'{self._folder}/{process_date_yyyymmdd}/{file_name}'
with open(temporary_file_path, mode='wb') as f: with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f) self._s3_client.download_file(
self._bucket_name, holiday_list_key, f)
f.seek(0) f.seek(0)
return temporary_file_path return temporary_file_path
def unzip_data_file(self, filename: str):
temp_dir = os.path.dirname(filename)
decompress_filename = os.path.basename(filename).replace('.gz', '')
decompress_file_path = os.path.join(temp_dir, decompress_filename)
with gzip.open(filename, 'rb') as gz:
with open(decompress_file_path, 'wb') as decompressed_file:
shutil.copyfileobj(gz, decompressed_file)
ret = [decompress_file_path] class JskSendBucket(S3Bucket):
return ret _bucket_name = environment.JSK_IO_BUCKET
_send_folder = environment.JSK_DATA_SEND_FOLDER
def backup_dat_file(self, target_files: list, datetime_key: str): def upload_dcf_inst_merge_csv_file(self, jskult_create_csv: str, csv_file_path: str):
jskult_backup_bucket = VjskBackupBucket()
for target_file in target_files:
backup_from_file_path = target_file.get("filename")
backup_to_filename = backup_from_file_path.replace(f"{self._recv_folder}/", "")
backup_key = f'{jskult_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
jskult_backup_bucket._bucket_name, backup_key)
self._s3_client.delete_file(self._bucket_name, backup_from_file_path)
class VjskSendBucket(S3Bucket):
_bucket_name = environment.VJSK_DATA_BUCKET
_send_folder = environment.VJSK_DATA_SEND_FOLDER
def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
# S3バケットにファイルを移動 # S3バケットにファイルを移動
csv_file_name = f'{self._send_folder}/{vjsk_create_csv}' csv_file_name = f'{self._send_folder}/{jskult_create_csv}'
s3_client = S3Client() self._s3_client.upload_file(
s3_client.upload_file(csv_file_path, self._bucket_name, csv_file_name) csv_file_path, self._bucket_name, csv_file_name)
return return
def backup_inst_pharm_csv_file(self, dat_file_key: str, datetime_key: str): def backup_dcf_inst_merge_csv_file(self, dat_file_key: str, datetime_key: str):
# バックアップバケットにコピー # バックアップバケットにコピー
vjsk_backup_bucket = VjskBackupBucket() jskult_backup_bucket = JskUltBackupBucket()
dat_key = f'{self._send_folder}/{dat_file_key}' dat_key = f'{self._send_folder}/{dat_file_key}'
backup_key = f'{vjsk_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}' backup_key = f'{jskult_backup_bucket._folder}/{self._send_folder}/{datetime_key}/{dat_file_key.replace(f"{self._send_folder}/", "")}'
self._s3_client.copy(self._bucket_name, dat_key, vjsk_backup_bucket._bucket_name, backup_key) self._s3_client.copy(self._bucket_name, dat_key,
jskult_backup_bucket._bucket_name, backup_key)

View File

@ -1,4 +1,20 @@
import csv
import json
import os.path as path
import tempfile
from src.aws.s3 import JskSendBucket, JskTransferListBucket
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.db.database import Database
from src.error.exceptions import (BatchOperationException,
MaxRunCountReachedException)
from src.logging.get_logger import get_logger
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.system_var import environment
logger = get_logger('DCF削除新規マスタ作成')
class DcfInstMergeIO(JskultBatchEntrypoint): class DcfInstMergeIO(JskultBatchEntrypoint):
@ -6,5 +22,279 @@ class DcfInstMergeIO(JskultBatchEntrypoint):
super().__init__() super().__init__()
def execute(self): def execute(self):
# TODO: ここでDCF削除新規マスタ作成/データ出力処理を実行する jskult_hdke_tbl_manager = JskultHdkeTblManager()
pass jskult_batch_run_manager = JskultBatchRunManager(
environment.BATCH_EXECUTION_ID)
if not jskult_hdke_tbl_manager.can_run_process():
logger.error(
'日次バッチ処理中またはdump取得が正常終了していないため、DCF削除新規マスタ作成を終了します。')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
return
# 業務日付を取得
_, _, process_date = jskult_hdke_tbl_manager.get_batch_statuses()
# 転送ファイル一覧を取得し、転送件数を取得
try:
transfer_list_bucket = JskTransferListBucket()
transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
process_date)
except Exception as e:
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
with open(transfer_list_file_path) as f:
transfer_list = json.load(f)
# 実消化データ + アルトマークデータの転送件数を合算し、受信ファイル件数とする
receive_file_count = len(
transfer_list['jsk_transfer_list']) + len(transfer_list['ult_transfer_list'])
jskult_batch_status_manager = JskultBatchStatusManager(
environment.PROCESS_NAME,
# TODO チケットNEWDWH2021-1847の実装で作成した定数に置き換え
'post_process',
environment.MAX_RUN_COUNT,
receive_file_count
)
try:
jskult_batch_status_manager.set_process_status("start")
try:
if not jskult_batch_status_manager.can_run_post_process():
# 後続処理の起動条件を満たしていない場合
# 処理ステータスを「処理待」に設定
jskult_batch_status_manager.set_process_status("waiting")
# バッチ実行管理テーブルに「retry」で登録
jskult_batch_run_manager.batch_retry()
return
except MaxRunCountReachedException:
logger.info('最大起動回数に到達したため、DCF削除新規マスタ作成処理を実行します。')
jskult_batch_status_manager.set_process_status("doing")
# アルトマーク取込が実行されていた場合にDCF施設削除新規マスタの作成処理を実行
if jskult_batch_status_manager.is_done_ultmarc_import():
# COM_施設からDCF削除新規マスタに登録
(is_add_dcf_inst_merge,
duplication_inst_records) = self._insert_dcf_inst_merge_from_com_inst(self)
if is_add_dcf_inst_merge:
self._output_add_dcf_inst_merge_log(
duplication_inst_records)
# CSV出力
dcf_inst_merge_all_records = self._select_dcf_inst_merge_all()
file_path = self._make_csv_data(
environment.DCF_INST_MERGE_SEND_FILE_NAME,
dcf_inst_merge_all_records)
# CSVをS3にアップロード
self._upload_dcf_inst_merge_csv_file(
file_path, process_date, environment.DCF_INST_MERGE_SEND_FILE_NAME)
# 処理が全て正常終了した際に、バッチ実行管理テーブルに「success」で登録
logger.info("DCF削除新規マスタ作成処理を正常終了します。")
jskult_batch_run_manager.batch_success()
jskult_batch_status_manager.set_process_status("done")
return
except Exception as e:
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
logger.exception(f'予期せぬエラーが発生したため、DCF削除新規マスタ作成処理を終了します。{e}')
jskult_batch_run_manager.batch_failed()
jskult_batch_status_manager.set_process_status("failed")
def _select_dcf_inst_merge_all(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
sql = """\
SELECT
*
FROM
src07.dcf_inst_merge
"""
dcf_inst_merge_all_records = self._db.execute_select(sql)
return dcf_inst_merge_all_records
except Exception as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
# com_instからdcf_inst_mergeにinsert
def _insert_dcf_inst_merge_from_com_inst(self) -> tuple[bool, list[dict]]:
try:
self._db = Database.get_instance()
self._db.connect()
self._db.begin()
self._db.to_jst()
sql = """\
SELECT
ci.DCF_DSF_INST_CD,
ci.FORM_INST_NAME_KANJI,
ci.DELETE_SCHE_REASON_CD,
ci.DUP_OPP_CD,
ci.SYS_UPDATE_DATE
FROM
src05.COM_INST AS ci
WHERE
ci.DUP_OPP_CD IS NOT NULL
AND
ci.DELETE_SCHE_REASON_CD = 'D'
AND
ci.DELETE_DATA IS NULL
AND
ci.SYS_UPDATE_DATE BETWEEN src07.get_syor_date() AND NOW()
AND
NOT EXISTS (
SELECT
dim.DCF_INST_CD
FROM
src07.DCF_INST_MERGE AS dim
WHERE
dim.DCF_INST_CD = ci.DCF_DSF_INST_CD
)
AND
(ci.DCF_DSF_INST_CD EXISTS(
SELECT
mia.INST_CD
FROM
src07.MST_INST_ASSN as mia
WHERE
mia.INST_CD = ci.DCF_DSF_INST_CD
)
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
ap.PRSB_INST_CD
FROM
src07.ATC_PHARM AS ap
WHERE
ap.PRSB_INST_CD = ci.DCF_DSF_INST_CD
)
OR ci.DCF_DSF_INST_CD EXISTS(
SELECT
trd.INST_CD
FROM
src07.TRN_RESULT_DATA AS trd
WHERE
trd.INST_CD = ci.DCF_DSF_INST_CD
)
)
;
"""
duplication_inst_records = self._db.execute_select(sql)
# DCF削除新規マスタ取り込み
values_clauses = []
params = {}
for clauses_no, row in enumerate(duplication_inst_records, start=1):
dcf_inst_cd_arr = f"DCF_INST_CD{clauses_no}"
dup_opp_cd_arr = f"DUP_OPP_CD{clauses_no}"
values_clause = f"""(:{dcf_inst_cd_arr},
:{dup_opp_cd_arr},
DATE_FORMAT((src07.get_syor_date() + INTERVAL 1 MONTH),
NULL,
NULL,
NULL,
"Y",
batchuser,
SYSDATE(),
batchuser,
SYSDATE()
)"""
values_clauses.append(values_clause)
params[dcf_inst_cd_arr] = row['DCF_DSF_INST_CD']
params[dup_opp_cd_arr] = row['DUP_OPP_CD']
insert_sql = f"""
INSERT INTO
src07.dcf_inst_merge (
DCF_INST_CD,
DUP_OPP_CD,
START_MONTH,
INVALID_FLG,
REMARKS,
DCF_INST_CD_NEW,
ENABLED_FLG,
CREATER,
CREATE_DATE,
UPDATER,
UPDATE_DATE
)
VALUES
{','.join(values_clauses)}
"""
self._db.execute(insert_sql, params)
return (True, duplication_inst_records)
except Exception as e:
self._db.rollback()
raise BatchOperationException(e)
finally:
self._db.disconnect()
def _output_add_dcf_inst_merge_log(duplication_inst_records: list[dict]):
sys_update_date = duplication_inst_records[0]['sys_update_date']
set_year_month = '{set_year}{set_month}'.format(
set_year=sys_update_date[0:4],
set_month=sys_update_date[-2:]
)
add_dct_inst_merge = 'DCF施設コード {dcf_dsf_inst_cd} {form_inst_name_kanji},  重複時相手先コード {dup_opp_cd} {dup_inst_name_kanji}'
add_dct_inst_merge_list = []
for row in duplication_inst_records:
add_dct_inst_merge_list.append(
add_dct_inst_merge.format(**row))
add_dct_inst_merge_list = '\n'.join(add_dct_inst_merge_list)
# 顧客報告用にログ出力
logger.info(
f"""DCF施設統合マスタが追加されました。
**********************************************************
適用月度 {set_year_month}
**********************************************************
{add_dct_inst_merge_list}
**********************************************************
合計 {len(duplication_inst_records)}"""
)
return
def _make_csv_data(csv_file_name: str, record_inst: list):
temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, csv_file_name)
head_str = ['DCF_INST_CD', 'DUP_OPP_CD', 'START_MONTH',
'INVALID_FLG', 'REMARKS', 'DCF_INST_CD_NEW', 'ENABLED_FLG',
'CREATER', 'CREATE_DATE', 'UPDATER', 'UPDATE_DATE']
with open(csv_file_path, mode='w', encoding='UTF-8') as csv_file:
# ヘッダ行書き込みくくり文字をつけない為にwriterowではなく、writeを使用しています
csv_file.write(f"{','.join(head_str)}\n")
# UTF-8、CRLF、価囲いありで書き込む
writer = csv.writer(csv_file, delimiter=',', lineterminator='\r\n',
quotechar='"', doublequote=True, quoting=csv.QUOTE_ALL,
strict=True
)
# データ部分書き込み(施設)
for record_inst_data in record_inst:
record_inst_value = list(record_inst_data.values())
csv_data = [
'' if n is None else n for n in record_inst_value]
writer.writerow(csv_data)
return csv_file_path
def _upload_dcf_inst_merge_csv_file(self, csv_file_name: str, process_date: str, csv_file_path: str):
jsk_send_bucket = JskSendBucket()
# S3バケットにファイルをアップロード
jsk_send_bucket.upload_dcf_inst_merge_csv_file(
csv_file_name, csv_file_path)
# CSVファイルをバックアップ
jsk_send_bucket.backup_dcf_inst_merge_csv_file(
csv_file_name, process_date)
return

View File

@ -3,6 +3,6 @@ import abc
class JskultBatchEntrypoint(metaclass=abc.ABCMeta): class JskultBatchEntrypoint(metaclass=abc.ABCMeta):
@abc.abstractmethod() @abc.abstractmethod
def execute(self): def execute(self):
pass pass

View File

@ -8,3 +8,7 @@ class DBException(MeDaCaException):
class BatchOperationException(MeDaCaException): class BatchOperationException(MeDaCaException):
pass pass
class MaxRunCountReachedException(MeDaCaException):
pass

View File

@ -4,7 +4,7 @@ BATCH_EXIT_CODE_SUCCESS = 0
# バッチ処理中フラグ:未処理 # バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0' BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中 # バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1' BATCH_ACTF_BATCH_START = '1'
# dump取得状態区分未処理 # dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0' DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了 # dump取得状態区分dump取得正常終了

View File

@ -7,12 +7,26 @@ DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD'] DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA'] DB_SCHEMA = os.environ['DB_SCHEMA']
# 処理名 # AWS
JSKULT_CONFIG_BUCKET = os.environ['JSKULT_CONFIG_BUCKET']
BATCH_EXECUTION_ID = os.environ['BATCH_EXECUTION_ID']
MAX_RUN_COUNT = os.environ['MAX_RUN_COUNT']
TRANSFER_RESULT_FOLDER = os.environ['TRANSFER_RESULT_FOLDER']
TRANSFER_RESULT_FILE_NAME = os.environ['TRANSFER_RESULT_FILE_NAME']
DCF_INST_MERGE_SEND_FILE_NAME = os.environ['DCF_INST_MERGE_SEND_FILE_NAME']
PROCESS_NAME = os.environ['PROCESS_NAME'] PROCESS_NAME = os.environ['PROCESS_NAME']
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET']
JSK_BACKUP_FOLDER = os.environ['JSK_BACKUP_FOLDER']
JSK_DATA_SEND_FOLDER = os.environ['JSK_DATA_SEND_FOLDER']
# 初期値がある環境変数 # 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) DB_CONNECTION_MAX_RETRY_ATTEMPT = int(
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) DB_CONNECTION_RETRY_INTERVAL_INIT = int(
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))

0
ecs/jskult-batch/test.py Normal file
View File

View File

@ -0,0 +1,13 @@
# task environment file.
LOG_LEVEL=INFO
PROCESS_NAME=jskult-batch-dcf-inst-merge-io
JSK_DATA_SEND_FOLDER=send
JSK_BACKUP_FOLDER=jsk/send
TRANSFER_RESULT_FOLDER=transfer_result
TRANSFER_RESULT_FILE_NAME=transfer_result.json
DCF_INST_MERGE_SEND_FILE_NAME=dcf_inst_merge.csv
JSKULT_CONFIG_BUCKET=mbj-newdwh2021-staging-config
DB_CONNECTION_MAX_RETRY_ATTEMPT=1
DB_CONNECTION_RETRY_INTERVAL_INIT=1
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=1
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=1