load処理のカバレッジ確認分を実装

This commit is contained in:
x.azuma.m@nds-tyo.co.jp 2023-05-17 19:24:45 +09:00
parent 7a7d559740
commit c1c9fd68b5
19 changed files with 184 additions and 5 deletions

View File

@ -1,4 +1,4 @@
"""vjsk_file_cheak用テストフィクスチャoverride"""
"""vjsk_file_check用テストフィクスチャoverride"""
import os

View File

@ -0,0 +1,41 @@
"""vjsk_load用テストフィクスチャoverride"""
import os
import boto3
import pytest
from src.batch.vjsk.vjsk_recv_file_mapper import VjskReceiveFileMapper
@pytest.fixture
def s3_client():
    """Yield a boto3 S3 client for exercising bucket operations in tests."""
    client = boto3.client('s3')
    yield client
@pytest.fixture
def bucket_name():
    """Return the test data bucket name taken from the environment."""
    name = os.environ["JSKULT_DATA_BUCKET"]
    return name
@pytest.fixture
def receive_folder():
    """Return the receive-folder key prefix taken from the environment."""
    folder = os.environ["JSKULT_DATA_FOLDER_RECV"]
    return folder
@pytest.fixture
def mapper():
    """Provide a fresh VjskReceiveFileMapper instance for each test."""
    return VjskReceiveFileMapper()
# @pytest.fixture
# def init_Load_ok(s3_client, bucket_name, receive_folder):
# # setup
# s3_client.put_object(Bucket=bucket_name,
# Key=f'{receive_folder}/stock_slip_data_00000000000000.gz', Body=b'aaaaaaaaaaaaaaa')
# s3_client.put_object(Bucket=bucket_name,
# Key=f'{receive_folder}/slip_data_00000000000000.gz', Body=b'aaaaaaaaaaaaaaa')
# s3_client.put_object(Bucket=bucket_name,
# Key=f'{receive_folder}/org_cnv_mst_00000000000000.gz', Body=b'aaaaaaaaaaaaaaa')
# # teardown

View File

@ -1,6 +1,144 @@
def test1():
    """Legacy placeholder test; always passes and returns nothing."""
    return None
from os import path
import pytest
from src.batch.common.batch_context import BatchContext
# from src.batch.vjsk.vjsk_data_load_manager import VjskDataLoadManager
from src.batch.vjsk.vjsk_importer import (_check_received_files,
_import_file_to_db)
from src.db.database import Database
def test2():
    """Legacy placeholder test; always passes and returns nothing."""
    return None
class TestImportFileToDb:
    """Coverage tests for the vjsk_importer load step (_import_file_to_db)."""

    db: Database  # DB connection handle, opened/closed by pre_test
    batch_context: BatchContext  # shared batch context singleton
    test_file_path: str  # directory holding the gzip test data files

    @pytest.fixture(autouse=True, scope='function')
    def pre_test(self, database: Database):
        """Pre/post-test processing: connect to the database before each
        test and disconnect afterwards."""
        # setup
        self.test_file_path = path.join(path.dirname(__file__), "testdata")
        self.batch_context = BatchContext.get_instance()
        self.db = database
        self.db.connect()
        # self.db.begin()
        # testing
        yield
        # teardown
        # self.db.rollback()
        self.db.disconnect()

    def test_import_file_to_db_ok(self, s3_client, bucket_name, receive_folder, mapper):
        """
        Perspective
            Normal case: every received data file can be registered in the database.
        Expected
            No exception is raised.
        """
        # setup
        self.batch_context.is_vjsk_stock_import_day = True
        test_files = [
            "stock_slip_data_202304270000.gz",
            "slip_data_202304270000.gz",
            "org_cnv_mst_202304270000.gz",
            "vop_hco_merge_202304270000.gz",
            "whs_mst_202304270000.gz",
            "hld_mst_202304270000.gz",
            "fcl_mst_202304270000.gz",
            "mkr_org_horizon_202304270000.gz",
            "tran_kbn_mst_202304270000.gz",
            "phm_prd_mst_202304270000.gz",
            "phm_price_mst_202304270000.gz",
            "whs_customer_mst_202304270000.gz",
            "mdb_conv_mst_202304270000.gz",
            "bio_slip_data_202304270000.gz",
            "lot_num_mst_202304270000.gz",
            "dummy.gz"
        ]
        for test_file in test_files:
            file_name = path.join(self.test_file_path, test_file)
            key = f"{receive_folder}/{test_file}"
            s3_client.upload_file(file_name, bucket_name, key)
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_STOCK_SLIP_DATA)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_SLIP_DATA)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_ORG_CNV_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_VOP_HCO_MERGE)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_WHS_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_HLD_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_FCL_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_MKR_ORG_HORIZON)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_TRAN_KBN_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_PHM_PRD_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_PHM_PRICE_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_WHS_CUSTOMER_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_MDB_CONV_MST)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_BIO_SLIP_DATA)}")
        # self.db.execute(f"truncate table {mapper.get_src_table(mapper.CONDKEY_LOT_NUM_MST)}")
        try:
            # assertion
            received_s3_files = _check_received_files()
            _import_file_to_db(received_s3_files)
            # self.db.connect()
            # # verification (wholesale stock slip data file)
            # table_name_org = mapper.get_org_table(mapper.CONDKEY_STOCK_SLIP_DATA)
            # table_name_src = mapper.get_src_table(mapper.CONDKEY_STOCK_SLIP_DATA)
            # result = self.db.execute(f"select * from {table_name_org}")
            # assert result.rowcount == 10
            # result = self.db.execute(f"select * from {table_name_src}")
            # assert result.rowcount == 10
            # # verification (wholesale sales data)
            # table_name_org = mapper.get_org_table(mapper.CONDKEY_SLIP_DATA)
            # table_name_src = mapper.get_src_table(mapper.CONDKEY_SLIP_DATA)
            # result = self.db.execute(f"select * from {table_name_org}")
            # assert result.rowcount == 10
            # result = self.db.execute(f"select * from {table_name_src}")
            # assert result.rowcount == 10
        finally:
            # teardown: always delete the uploaded objects, even when the
            # import or the assertions fail, so later tests start from a
            # clean receive folder
            for test_file in test_files:
                key = f"{receive_folder}/{test_file}"
                s3_client.delete_object(Bucket=bucket_name, Key=key)

    # def test_load_stock_slip_data_ok(self, mapper):
    #     table_name_org = mapper.get_org_table(mapper.CONDKEY_SLIP_DATA)
    #     table_name_src = mapper.get_src_table(mapper.CONDKEY_SLIP_DATA)
    #     # setup
    #     self.batch_context.is_vjsk_stock_import_day = True
    #     self.db.execute(f"truncate table {table_name_src}")
    #     # assertion (insert)
    #     target_dict = {
    #         "condkey": mapper.CONDKEY_STOCK_SLIP_DATA,
    #         "src_file_path": path.join(self.test_file_path, "stock_slip_data_202304280000.tsv")
    #     }
    #     VjskDataLoadManager.load(target_dict)
    #     result = self.db.execute(f"select * from {table_name_org}")
    #     assert result.rowcount == 4
    #     result = self.db.execute(f"select * from {table_name_src}")
    #     assert result.rowcount == 4
    #     # assertion (update)
    #     target_dict = {
    #         "condkey": mapper.CONDKEY_STOCK_SLIP_DATA,
    #         "src_file_path": path.join(self.test_file_path, "stock_slip_data_202304290000.tsv")
    #     }
    #     VjskDataLoadManager.load(target_dict)
    #     result_org = self.db.execute(f"select * from {table_name_org}")
    #     assert result_org.rowcount == 4
    #     result_src1 = self.db.execute(f"select * from {table_name_src}")
    #     assert result_src1.rowcount == 6
    #     # teardown

Binary file not shown.