コメント整理
This commit is contained in:
parent
ecbff882bf
commit
241838ef71
@ -11,10 +11,21 @@ mapper = VjskReceiveFileMapper()
|
|||||||
|
|
||||||
|
|
||||||
class VjskDataLoadManager:
|
class VjskDataLoadManager:
|
||||||
|
"""
|
||||||
|
V実消化データ取込機能クラス
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _import_to_db(src_file_name: str, condkey: str):
|
def _import_to_db(src_file_name: str, condkey: str):
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
指定されたtsvファイル src_file_name を策席スキーマに登録する
|
||||||
|
引数
|
||||||
|
src_file_name: ローカルストレージにある取込対象tsvファイルパス
|
||||||
|
condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値)
|
||||||
|
"""
|
||||||
logger.debug(f"_import_to_db start (src_file_name : {src_file_name}, condkey : {condkey})")
|
logger.debug(f"_import_to_db start (src_file_name : {src_file_name}, condkey : {condkey})")
|
||||||
|
|
||||||
db = Database.get_instance()
|
db = Database.get_instance()
|
||||||
@ -24,6 +35,7 @@ class VjskDataLoadManager:
|
|||||||
upsert_sql = mapper.get_upsert_sql(condkey)
|
upsert_sql = mapper.get_upsert_sql(condkey)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# データベース接続
|
||||||
db.connect()
|
db.connect()
|
||||||
db.execute("SET SESSION sql_mode = 'TRADITIONAL';")
|
db.execute("SET SESSION sql_mode = 'TRADITIONAL';")
|
||||||
|
|
||||||
@ -52,17 +64,27 @@ class VjskDataLoadManager:
|
|||||||
# MEMO: https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount
|
# MEMO: https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount
|
||||||
logger.info(f'{table_name_org}を{table_name_src}にUPSERT')
|
logger.info(f'{table_name_org}を{table_name_src}にUPSERT')
|
||||||
|
|
||||||
|
# データベースコミット
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
db.rollback()
|
db.rollback()
|
||||||
raise BatchOperationException(e)
|
raise BatchOperationException(e)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# データベース切断
|
||||||
db.disconnect()
|
db.disconnect()
|
||||||
|
|
||||||
logger.debug("_import_to_db done")
|
logger.debug("_import_to_db done")
|
||||||
return
|
return
|
||||||
|
|
||||||
def _get_tsv_last_row_tab_count(src_file_name: str) -> int:
|
def _get_tsv_last_row_tab_count(src_file_name: str) -> int:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
指定されたtsvファイル src_file_name の末尾行に含まれるタブ文字数を取得する
|
||||||
|
引数
|
||||||
|
src_file_name: ローカルストレージにある取込対象tsvファイルパス
|
||||||
|
"""
|
||||||
# memo: tsvファイルが数百MBに及ぶことを想定して、末尾から1行分を参照する
|
# memo: tsvファイルが数百MBに及ぶことを想定して、末尾から1行分を参照する
|
||||||
# memo: 前提1 行区切りは LF('\n')
|
# memo: 前提1 行区切りは LF('\n')
|
||||||
# memo: 前提2 正常時のファイル終端にある文字は、末尾行の LF('\n')
|
# memo: 前提2 正常時のファイル終端にある文字は、末尾行の LF('\n')
|
||||||
@ -103,12 +125,25 @@ class VjskDataLoadManager:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(self, target: dict):
|
def load(self, target: dict):
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
取込対象受領ファイル target をデータベースに登録する
|
||||||
|
引数
|
||||||
|
target: {
|
||||||
|
condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値)
|
||||||
|
src_file_path: ローカルストレージにある取込対象tsvファイルパス
|
||||||
|
}
|
||||||
|
"""
|
||||||
logger.debug(f'load start target:{target}')
|
logger.debug(f'load start target:{target}')
|
||||||
|
|
||||||
# S3からローカルストレージにdownloadした登録対象のtsvファイルパスを取得
|
# S3からローカルストレージにdownloadした登録対象のtsvファイルパスを取得
|
||||||
local_file_name = target["src_file_path"]
|
local_file_name = target["src_file_path"]
|
||||||
|
|
||||||
# tsvファイル末尾行のTABの数が総定数と一致しない場合は例外をスロー
|
# tsvファイル末尾行のTABの数が総定数と一致しない場合は例外をスロー
|
||||||
|
# memo:
|
||||||
|
# 対向元システムで生成されるファイルは稀に途中欠落が発生することがある。
|
||||||
|
# これを、ファイルMySQL8.0のLOADステートメントで発生するWARNING/ERRORでは検知不可能なので、
|
||||||
|
# LOADステートメント実行前に、物理的に途中欠落があるかを検知してエラーとすることが目的。
|
||||||
tsv_tabs = self._get_tsv_last_row_tab_count(local_file_name)
|
tsv_tabs = self._get_tsv_last_row_tab_count(local_file_name)
|
||||||
expect_tabs = mapper.get_file_column_separators(target["condkey"])
|
expect_tabs = mapper.get_file_column_separators(target["condkey"])
|
||||||
if tsv_tabs != expect_tabs:
|
if tsv_tabs != expect_tabs:
|
||||||
|
|||||||
@ -2,6 +2,9 @@ import textwrap
|
|||||||
|
|
||||||
|
|
||||||
class VjskReceiveFileMapper:
|
class VjskReceiveFileMapper:
|
||||||
|
"""
|
||||||
|
V実消化データファイルI/Fマッピング定義クラス
|
||||||
|
"""
|
||||||
CONDKEY_SLIP_DATA = "SLIP_DATA" # 販売実績データ
|
CONDKEY_SLIP_DATA = "SLIP_DATA" # 販売実績データ
|
||||||
CONDKEY_HLD_MST = "HLD_MST" # V卸ホールディングスマスタ
|
CONDKEY_HLD_MST = "HLD_MST" # V卸ホールディングスマスタ
|
||||||
CONDKEY_WHS_MST = "WHS_MST" # V卸マスタ
|
CONDKEY_WHS_MST = "WHS_MST" # V卸マスタ
|
||||||
@ -1489,27 +1492,75 @@ class VjskReceiveFileMapper:
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def get_data_name(self, condkey: str) -> str:
|
def get_data_name(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、ファイル論理名を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_DATA_NAME)
|
return self._get_interface_property(condkey, self._KEY_DATA_NAME)
|
||||||
|
|
||||||
def get_file_prefix(self, condkey: str) -> str:
|
def get_file_prefix(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、ファイル名接頭辞を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_FILE_PREFIX)
|
return self._get_interface_property(condkey, self._KEY_FILE_PREFIX)
|
||||||
|
|
||||||
def get_file_suffix(self, condkey: str) -> str:
|
def get_file_suffix(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、ファイル拡張子を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_FILE_SUFFIX)
|
return self._get_interface_property(condkey, self._KEY_FILE_SUFFIX)
|
||||||
|
|
||||||
def get_file_column_separators(self, condkey: str) -> int:
|
def get_file_column_separators(self, condkey: str) -> int:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、1行あたりのタブ文字数を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return int(self._get_interface_property(condkey, self._KEY_FILE_COLUMN_SEPARATORS))
|
return int(self._get_interface_property(condkey, self._KEY_FILE_COLUMN_SEPARATORS))
|
||||||
|
|
||||||
def get_org_table(self, condkey: str) -> str:
|
def get_org_table(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、LOAD先ロードスキーマテーブル名を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_ORG_TABLE)
|
return self._get_interface_property(condkey, self._KEY_ORG_TABLE)
|
||||||
|
|
||||||
def get_src_table(self, condkey: str) -> str:
|
def get_src_table(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、登録先蓄積スキーマテーブル名を取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_SRC_TABLE)
|
return self._get_interface_property(condkey, self._KEY_SRC_TABLE)
|
||||||
|
|
||||||
def get_upsert_sql(self, condkey: str) -> str:
|
def get_upsert_sql(self, condkey: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
受領ファイルI/Fが想定する、upsert (ロードスキーマ→蓄積スキーマ) CMLステートメントを取得する
|
||||||
|
引数
|
||||||
|
condkey: 受領データの種類を一意に示す値(このクラスのメンバ CONDKEY_* の値)
|
||||||
|
"""
|
||||||
return self._get_interface_property(condkey, self._KEY_UPSERT_SQL)
|
return self._get_interface_property(condkey, self._KEY_UPSERT_SQL)
|
||||||
|
|
||||||
def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str:
|
def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str:
|
||||||
|
"""
|
||||||
|
概要
|
||||||
|
S3受領バケットに受領したファイル名から、I/F想定に該当する condkey 値を取得する
|
||||||
|
引数
|
||||||
|
s3_file_path: S3受領バケットにある受領したファイルパス
|
||||||
|
"""
|
||||||
ret = None
|
ret = None
|
||||||
filename = s3_file_path[s3_file_path.rfind("/") + 1:]
|
filename = s3_file_path[s3_file_path.rfind("/") + 1:]
|
||||||
for condkey in self._VJSK_INTERFACE_MAPPING:
|
for condkey in self._VJSK_INTERFACE_MAPPING:
|
||||||
|
|||||||
@ -23,7 +23,6 @@ def receive_folder():
|
|||||||
return os.environ["VJSK_DATA_RECEIVE_FOLDER"]
|
return os.environ["VJSK_DATA_RECEIVE_FOLDER"]
|
||||||
|
|
||||||
|
|
||||||
# TODO 共通fixtureにして15個固定でput/delete、各個別fixtureで15個から引き算でdeleteする
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def init_check_received_files_ok1(s3_client, bucket_name, receive_folder):
|
def init_check_received_files_ok1(s3_client, bucket_name, receive_folder):
|
||||||
# setup
|
# setup
|
||||||
|
|||||||
@ -17,6 +17,9 @@ from tests.testing_vjsk_utility import (assert_table_results,
|
|||||||
|
|
||||||
|
|
||||||
class TestImportFileToDb:
|
class TestImportFileToDb:
|
||||||
|
"""
|
||||||
|
V実消化データ取込-データベース登録処理 テストクラス
|
||||||
|
"""
|
||||||
db: Database
|
db: Database
|
||||||
batch_context: BatchContext
|
batch_context: BatchContext
|
||||||
test_file_path_import_all: str
|
test_file_path_import_all: str
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user