diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py index 11764edf..af23490b 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py @@ -11,10 +11,21 @@ mapper = VjskReceiveFileMapper() class VjskDataLoadManager: + """ + V実消化データ取込機能クラス + """ + def __init__(self): pass def _import_to_db(src_file_name: str, condkey: str): + """ + 概要 + 指定されたtsvファイル src_file_name を策席スキーマに登録する + 引数 + src_file_name: ローカルストレージにある取込対象tsvファイルパス + condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値) + """ logger.debug(f"_import_to_db start (src_file_name : {src_file_name}, condkey : {condkey})") db = Database.get_instance() @@ -24,6 +35,7 @@ class VjskDataLoadManager: upsert_sql = mapper.get_upsert_sql(condkey) try: + # データベース接続 db.connect() db.execute("SET SESSION sql_mode = 'TRADITIONAL';") @@ -52,17 +64,27 @@ class VjskDataLoadManager: # MEMO: https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount logger.info(f'{table_name_org}を{table_name_src}にUPSERT') + # データベースコミット db.commit() + except Exception as e: db.rollback() raise BatchOperationException(e) + finally: + # データベース切断 db.disconnect() logger.debug("_import_to_db done") return def _get_tsv_last_row_tab_count(src_file_name: str) -> int: + """ + 概要 + 指定されたtsvファイル src_file_name の末尾行に含まれるタブ文字数を取得する + 引数 + src_file_name: ローカルストレージにある取込対象tsvファイルパス + """ # memo: tsvファイルが数百MBに及ぶことを想定して、末尾から1行分を参照する # memo: 前提1 行区切りは LF('\n') # memo: 前提2 正常時のファイル終端にある文字は、末尾行の LF('\n') @@ -103,12 +125,25 @@ class VjskDataLoadManager: @classmethod def load(self, target: dict): + """ + 概要 + 取込対象受領ファイル target をデータベースに登録する + 引数 + target: { + condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値) + src_file_path: ローカルストレージにある取込対象tsvファイルパス + } + """ logger.debug(f'load start target:{target}') # S3からローカルストレージにdownloadした登録対象のtsvファイルパスを取得 local_file_name = target["src_file_path"] # tsvファイル末尾行のTABの数が総定数と一致しない場合は例外をスロー + # memo: + # 対向元システムで生成されるファイルは稀に途中欠落が発生することがある。 + # これを、ファイルMySQL8.0のLOADステートメントで発生するWARNING/ERRORでは検知不可能なので、 + # LOADステートメント実行前に、物理的に途中欠落があるかを検知してエラーとすることが目的。 tsv_tabs = self._get_tsv_last_row_tab_count(local_file_name) expect_tabs = mapper.get_file_column_separators(target["condkey"]) if tsv_tabs != expect_tabs: diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py index 19f70067..21264c42 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py @@ -2,6 +2,9 @@ import textwrap class VjskReceiveFileMapper: + """ + V実消化データファイルI/Fマッピング定義クラス + """ CONDKEY_SLIP_DATA = "SLIP_DATA" # 販売実績データ CONDKEY_HLD_MST = "HLD_MST" # V卸ホールディングスマスタ CONDKEY_WHS_MST = "WHS_MST" # V卸マスタ @@ -1489,27 +1492,75 @@ class VjskReceiveFileMapper: return ret def get_data_name(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、ファイル論理名を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_DATA_NAME) def get_file_prefix(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、ファイル名接頭辞を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_FILE_PREFIX) def get_file_suffix(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、ファイル拡張子を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_FILE_SUFFIX) def get_file_column_separators(self, condkey: str) -> int: + """ + 概要 + 受領ファイルI/Fが想定する、1行あたりのタブ文字数を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return int(self._get_interface_property(condkey, self._KEY_FILE_COLUMN_SEPARATORS)) def get_org_table(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、LOAD先ロードスキーマテーブル名を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_ORG_TABLE) def get_src_table(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、登録先蓄積スキーマテーブル名を取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_SRC_TABLE) def get_upsert_sql(self, condkey: str) -> str: + """ + 概要 + 受領ファイルI/Fが想定する、upsert (ロードスキーマ→蓄積スキーマ) CMLステートメントを取得する + 引数 + condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値) + """ return self._get_interface_property(condkey, self._KEY_UPSERT_SQL) def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str: + """ + 概要 + S3受領バケットに受領したファイル名から、I/F想定に該当する condkey 値を取得する + 引数 + s3_file_path: S3受領バケットにある受領したファイルパス + """ ret = None filename = s3_file_path[s3_file_path.rfind("/") + 1:] for condkey in self._VJSK_INTERFACE_MAPPING: diff --git a/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_file_check/conftest.py b/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_file_check/conftest.py index dccdd0df..bf9b2629 100644 --- a/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_file_check/conftest.py +++ b/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_file_check/conftest.py @@ -23,7 +23,6 @@ def receive_folder(): return os.environ["VJSK_DATA_RECEIVE_FOLDER"] -# TODO 共通fixtureにして15個固定でput/delete、各個別fixtureで15個から引き算でdeleteする @pytest.fixture def init_check_received_files_ok1(s3_client, bucket_name, receive_folder): # setup diff --git a/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_load/test_vjsk_load.py b/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_load/test_vjsk_load.py index 70504795..d2fb07ac 100644 --- a/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_load/test_vjsk_load.py +++ b/ecs/jskult-batch-daily/tests/batch/vjsk/vjsk_load/test_vjsk_load.py @@ -17,6 +17,9 @@ from tests.testing_vjsk_utility import (assert_table_results, class TestImportFileToDb: + """ + V実消化データ取込-データベース登録処理 テストクラス + """ db: Database batch_context: BatchContext test_file_path_import_all: str