From 9e6403dea521cfc95ced01e5ef3f6231d6ed6630 Mon Sep 17 00:00:00 2001
From: "x.azuma.m@nds-tyo.co.jp"
Date: Fri, 30 Jun 2023 19:23:42 +0900
Subject: [PATCH] =?UTF-8?q?NEWDWH2021-1130=20tsv=E3=83=88=E3=83=81?=
 =?UTF-8?q?=E5=88=87=E3=82=8C=E5=88=A4=E5=AE=9A=E3=82=92LOAD=E5=AE=9F?=
 =?UTF-8?q?=E8=A1=8C=E5=89=8D=E3=81=AB=E8=A6=8B=E3=82=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../src/batch/vjsk/vjsk_data_load_manager.py  | 43 +++++++++++++++++++
 .../src/batch/vjsk/vjsk_recv_file_mapper.py   | 20 +++++++++
 2 files changed, 63 insertions(+)

diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py
index b345b0d8..3ef87186 100644
--- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py
+++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py
@@ -60,6 +60,42 @@ class VjskDataLoadManager:
         logger.debug("_import_to_db done")
         return
 
+    @staticmethod
+    def _get_tsv_last_row_tab_count(src_file_name: str) -> int:
+        """Return the number of TAB characters in the last row of a TSV file.
+
+        Only the tail of the file is read, because the TSV may be several
+        hundred MB. Rows are assumed to be separated by LF (0x0A). The final
+        byte is always treated as part of the last row, so a file ending in
+        an empty row yields 0.
+
+        Args:
+            src_file_name: Path of the TSV file to inspect.
+
+        Returns:
+            TAB count of the last row; 0 for an empty file.
+        """
+        with open(src_file_name, 'rb') as file:
+            # Start from the final byte; it belongs to the last row (normally
+            # its own LF terminator), so the backward scan begins before it.
+            pos = file.seek(0, 2) - 1
+            # Scan backwards for the LF that ends the previous row. Stopping
+            # at offset 0 covers single-row and empty files, where seeking
+            # further back would raise OSError.
+            while pos > 0:
+                pos -= 1
+                file.seek(pos)
+                if file.read(1) == b'\n':
+                    break
+            else:
+                # No earlier LF: the last row starts at the top of the file.
+                file.seek(0)
+            last_line = file.readline()
+
+        # Count on raw bytes: a truncated multi-byte character at the end of
+        # the file must not fail the check with UnicodeDecodeError.
+        return last_line.count(b'\t')
+
     @classmethod
     def load(self, target: dict):
         logger.debug(f'load start target:{target}')
@@ -67,6 +103,13 @@ class VjskDataLoadManager:
         # S3からローカルストレージにdownloadした登録対象のtsvファイルパスを取得
         local_file_name = target["src_file_path"]
 
+        # Raise if the TAB count of the TSV's last row differs from the expected count
+        tsv_tabs = self._get_tsv_last_row_tab_count(local_file_name)
+        expect_tabs = mapper.get_file_column_separators(target["condkey"])
+        if tsv_tabs != expect_tabs:
+            msg = f"受領tsvファイルの末尾行のTABの数が総定数と一致しませんでした local_file_name: {local_file_name}"
+            raise BatchOperationException(msg)
+
         # データベース登録
         self._import_to_db(local_file_name, target["condkey"])
 
diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py
index 352e2f91..19f70067 100644
--- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py
+++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py
@@ -21,6 +21,7 @@ class VjskReceiveFileMapper:
     _KEY_DATA_NAME = "data_name"
     _KEY_FILE_PREFIX = "file_prefix"
     _KEY_FILE_SUFFIX = "file_suffix"
+    _KEY_FILE_COLUMN_SEPARATORS = "file_column_separators"
     _KEY_ORG_TABLE = "org_table"
     _KEY_SRC_TABLE = "src_table"
     _KEY_UPSERT_SQL = "upsert_sql"
@@ -30,6 +31,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "販売実績データ",
             _KEY_FILE_PREFIX: "slip_data_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "82",
             _KEY_ORG_TABLE: "org05.sales",
             _KEY_SRC_TABLE: "src05.sales",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -299,6 +301,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V卸ホールディングスマスタ",
             _KEY_FILE_PREFIX: "hld_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "10",
             _KEY_ORG_TABLE: "org05.hld_mst_v",
             _KEY_SRC_TABLE: "src05.hld_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -352,6 +355,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V卸マスタ",
             _KEY_FILE_PREFIX: "whs_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "15",
             _KEY_ORG_TABLE: "org05.whs_mst_v",
             _KEY_SRC_TABLE: "src05.whs_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -420,6 +424,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "Vメーカー卸組織展開表",
             _KEY_FILE_PREFIX: "mkr_org_horizon_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "45",
             _KEY_ORG_TABLE: "org05.mkr_org_horizon_v",
             _KEY_SRC_TABLE: "src05.mkr_org_horizon_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -578,6 +583,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V卸組織変換マスタ",
             _KEY_FILE_PREFIX: "org_cnv_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "10",
             _KEY_ORG_TABLE: "org05.org_cnv_mst_v",
             _KEY_SRC_TABLE: "src05.org_cnv_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -631,6 +637,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V取引区分マスタ",
             _KEY_FILE_PREFIX: "tran_kbn_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "8",
             _KEY_ORG_TABLE: "org05.tran_kbn_mst_v",
             _KEY_SRC_TABLE: "src05.tran_kbn_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -678,6 +685,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V施設マスタ",
             _KEY_FILE_PREFIX: "fcl_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "23",
             _KEY_ORG_TABLE: "org05.fcl_mst_v",
             _KEY_SRC_TABLE: "src05.fcl_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -770,6 +778,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V製品マスタ",
             _KEY_FILE_PREFIX: "phm_prd_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "27",
             _KEY_ORG_TABLE: "org05.phm_prd_mst_v",
             _KEY_SRC_TABLE: "src05.phm_prd_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -874,6 +883,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V製品価格マスタ",
             _KEY_FILE_PREFIX: "phm_price_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "9",
             _KEY_ORG_TABLE: "org05.phm_price_mst_v",
             _KEY_SRC_TABLE: "src05.phm_price_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -924,6 +934,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V施設統合マスタ",
             _KEY_FILE_PREFIX: "vop_hco_merge_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "3",
             _KEY_ORG_TABLE: "org05.vop_hco_merge_v",
             _KEY_SRC_TABLE: "src05.vop_hco_merge_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -956,6 +967,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "V卸得意先情報マスタ",
             _KEY_FILE_PREFIX: "whs_customer_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "16",
             _KEY_ORG_TABLE: "org05.whs_customer_mst_v",
             _KEY_SRC_TABLE: "src05.whs_customer_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -1027,6 +1039,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "MDBコード変換表",
             _KEY_FILE_PREFIX: "mdb_conv_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "7",
             _KEY_ORG_TABLE: "org05.mdb_cnv_mst_v",
             _KEY_SRC_TABLE: "src05.mdb_cnv_mst_v",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -1071,6 +1084,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "卸在庫データ",
             _KEY_FILE_PREFIX: "stock_slip_data_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "28",
             _KEY_ORG_TABLE: "org05.whole_stock",
             _KEY_SRC_TABLE: "src05.whole_stock",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -1178,6 +1192,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "生物由来データ",
             _KEY_FILE_PREFIX: "bio_slip_data_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "77",
             _KEY_ORG_TABLE: "org05.bio_sales",
             _KEY_SRC_TABLE: "src05.bio_sales",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -1432,6 +1447,7 @@ class VjskReceiveFileMapper:
             _KEY_DATA_NAME: "ロットマスタデータ",
             _KEY_FILE_PREFIX: "lot_num_mst_",
             _KEY_FILE_SUFFIX: ".gz",
+            _KEY_FILE_COLUMN_SEPARATORS: "5",
             _KEY_ORG_TABLE: "org05.lot_num_mst",
             _KEY_SRC_TABLE: "src05.lot_num_mst",
             _KEY_UPSERT_SQL: textwrap.dedent("""\
@@ -1481,6 +1497,10 @@ class VjskReceiveFileMapper:
     def get_file_suffix(self, condkey: str) -> str:
         return self._get_interface_property(condkey, self._KEY_FILE_SUFFIX)
 
+    def get_file_column_separators(self, condkey: str) -> int:
+        """Return the expected number of TAB separators per row for ``condkey``."""
+        return int(self._get_interface_property(condkey, self._KEY_FILE_COLUMN_SEPARATORS))
+
     def get_org_table(self, condkey: str) -> str:
         return self._get_interface_property(condkey, self._KEY_ORG_TABLE)
 