256 lines
8.9 KiB
Python
256 lines
8.9 KiB
Python
"""テスト用共通処理関数"""
|
|
import csv
|
|
import io
|
|
import tempfile
|
|
from datetime import datetime
|
|
from types import ModuleType
|
|
|
|
from src.batch.ultmarc.datfile import DatFile, DatFileLine
|
|
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
|
|
UltmarcTableMapper
|
|
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
|
|
UltmarcTableMapperFactory
|
|
from src.db.database import Database
|
|
|
|
|
|
def create_ultmarc_test_data_from_csv(file_path: str) -> DatFile:
|
|
"""ファイルから、アルトマーク取り込み用のテストデータを作成する
|
|
|
|
Args:
|
|
file_path (str): csvファイルのパス
|
|
|
|
Returns:
|
|
DatFile: データファイルオブジェクト
|
|
"""
|
|
|
|
# 一度、Shift-JISファイルで書き出す
|
|
with open(file_path, encoding='utf8') as csv_file, tempfile.NamedTemporaryFile('w', encoding='cp932') as tmp_file:
|
|
tmp_file.write(csv_file.read())
|
|
tmp_file.seek(0)
|
|
tmpfile_path = tmp_file.name
|
|
dat_file = DatFile.from_path(tmpfile_path)
|
|
|
|
return dat_file
|
|
|
|
|
|
def create_db_data_from_csv(file_path: str) -> list[dict]:
|
|
"""ファイルから、DBの期待値データを作成する
|
|
|
|
Args:
|
|
file_path (str): csvファイルのパス
|
|
|
|
Returns:
|
|
DatFile: データファイルオブジェクト
|
|
"""
|
|
|
|
with open(file_path, encoding='utf8') as csv_file:
|
|
# ヘッダ行を取得し、改行とクォートを取り除く。
|
|
header = csv_file.readline().strip('\n').replace('"', '').split(',')
|
|
reader = csv.DictReader(csv_file, fieldnames=header)
|
|
rows = [r for r in reader]
|
|
|
|
# データ型変換
|
|
for row in rows:
|
|
for k, v in row.items():
|
|
converted_value = v
|
|
if v == 'NULL':
|
|
converted_value = None
|
|
if is_valid_date_format(v, '%Y/%m/%d') is True: # YYYY/MM/DD
|
|
converted_value = datetime.strptime(v, '%Y/%m/%d').date()
|
|
if is_valid_date_format(v, '%Y-%m-%d') is True: # YYYY-MM-DD
|
|
converted_value = datetime.strptime(v, '%Y-%m-%d').date()
|
|
if is_valid_date_format(v, '%Y/%m/%d %H:%M:%S') is True: # YYYY/MM/DD HH:MM:SS
|
|
converted_value = datetime.strptime(v, '%Y/%m/%d %H:%M:%S')
|
|
if is_valid_date_format(v, '%Y-%m-%d %H:%M:%S') is True: # YYYY-MM-DD HH:MM:SS
|
|
converted_value = datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
|
|
|
|
row[k] = converted_value
|
|
|
|
return rows
|
|
|
|
|
|
def create_ultmarc_test_csv(csv_rows: list[str]) -> DatFile:
|
|
"""アルトマーク取込テストのCSVを作成
|
|
Args:
|
|
csv_rows (tuple[str]): CSV文字列のリスト
|
|
Returns:
|
|
list[list[str]]: CSVデータ
|
|
"""
|
|
string_io = io.StringIO()
|
|
for csv_row in csv_rows:
|
|
string_io.write(csv_row)
|
|
string_io.write('\n')
|
|
string_io.seek(0)
|
|
dat_file = DatFile(string_io)
|
|
|
|
return dat_file
|
|
|
|
|
|
def create_ultmarc_common_column_names() -> list[str]:
|
|
"""アルトマークテーブル共通のカラム名を作成
|
|
Returns:
|
|
list[str]: 共通カラム名
|
|
"""
|
|
|
|
return [
|
|
'regist_ymd',
|
|
'update_ymd',
|
|
'delete_ymd',
|
|
'regist_date',
|
|
'create_user',
|
|
'update_date',
|
|
'update_user',
|
|
'sys_regist_date',
|
|
'regist_prgm_id',
|
|
'sys_update_date',
|
|
'update_prgm_id'
|
|
]
|
|
|
|
|
|
def create_insert_sql_with_parameter(table_name: str, column_names: list[str], test_data: list[str]) -> tuple[str, dict]:
|
|
"""INSERT文と登録値のパラメータを返す
|
|
|
|
Args:
|
|
table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma)
|
|
column_names (list[str]): カラム名のリスト
|
|
test_data (list[str]): 値のリスト
|
|
|
|
Returns:
|
|
tuple[str, dict]: [0]→INSERT文,[1]→値のパラメータ
|
|
"""
|
|
placeholders = ','.join([f':{column_name}' for column_name in column_names])
|
|
insert_sql = f"INSERT INTO {table_name} ({','.join(column_names)}) VALUES({placeholders})"
|
|
parameter = {k: v for k, v in zip(column_names, test_data)}
|
|
|
|
return insert_sql, parameter
|
|
|
|
|
|
def create_delete_sql_with_parameter(table_name: str, delete_parameter: dict[str, str]):
|
|
"""DELETE文と削除条件値のパラメータを返す
|
|
|
|
Args:
|
|
table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma)
|
|
delete_parameter (dict[str, str]): 削除条件に使用するカラム名と値の辞書
|
|
|
|
Returns:
|
|
tuple[str, dict]: [0]→DELETE文,[1]→値のパラメータ
|
|
"""
|
|
where_clause_list = []
|
|
for k in delete_parameter:
|
|
where_clause_list.append(f'{k} = :{k}')
|
|
where_clauses = ' AND '.join(where_clause_list)
|
|
delete_sql = f"DELETE FROM {table_name} WHERE {where_clauses}"
|
|
|
|
return delete_sql, delete_parameter
|
|
|
|
|
|
def create_ultmarc_common_column_values(**kwargs) -> list:
|
|
"""アルトマークテーブル共通のカラムを作成
|
|
|
|
Args:
|
|
kwargs 有効なキー一覧
|
|
regist_ymd (str): 登録年月日
|
|
update_ymd (str): 更新年月日
|
|
delete_ymd (str): 削除年月日
|
|
regist_date (datetime): 登録日時
|
|
create_user (str): 登録者
|
|
update_date (datetime): 更新日時
|
|
update_user (str): 更新者
|
|
sys_regist_date (datetime): システム登録日時
|
|
regist_prgm_id (str): 登録プログラムid
|
|
sys_update_date (datetime): システム更新日時
|
|
update_prgm_id (str): 更新プログラムid
|
|
Returns:
|
|
list: 共通カラム値
|
|
"""
|
|
|
|
return [
|
|
kwargs.get('regist_ymd'), # 登録年月日(regist_ymd)
|
|
kwargs.get('update_ymd'), # 更新年月日(update_ymd)
|
|
kwargs.get('delete_ymd'), # 削除年月日(delete_ymd)
|
|
kwargs.get('regist_date'), # 登録日時(regist_date)
|
|
kwargs.get('create_user'), # 登録者(create_user)
|
|
kwargs.get('update_date'), # 更新日時(update_date)
|
|
kwargs.get('update_user'), # 更新者(update_user)
|
|
kwargs.get('sys_regist_date'), # システム登録日時(sys_regist_date)
|
|
kwargs.get('regist_prgm_id'), # 登録プログラムid(regist_prgm_id)
|
|
kwargs.get('sys_update_date'), # システム更新日時(sys_update_date)
|
|
kwargs.get('update_prgm_id') # 更新プログラムid(update_prgm_id)
|
|
]
|
|
|
|
|
|
def create_ultmarc_table_mapper_sut(line: DatFileLine, db: Database) -> UltmarcTableMapper:
|
|
"""アルトマークテーブルマッパーのインスタンスを返す
|
|
|
|
Args:
|
|
line (DatFileLine): テストデータの1行
|
|
db (Database): 接続済みDBインスタンス
|
|
|
|
Returns:
|
|
UltmarcTableMapper: マッパークラス
|
|
"""
|
|
layout_class = line.layout_class
|
|
record_id = line.record_id
|
|
factory = UltmarcTableMapperFactory()
|
|
sut = factory.create(
|
|
layout_class=layout_class,
|
|
record_id=record_id,
|
|
records=line.records,
|
|
db=db
|
|
)
|
|
|
|
return sut
|
|
|
|
|
|
def get_module_name(module: ModuleType) -> str:
|
|
"""登録プログラムID、更新プログラムIDに登録するモジュール名を作成
|
|
|
|
Args:
|
|
module (ModuleType): pythonモジュール
|
|
|
|
Returns:
|
|
str: モジュール名
|
|
"""
|
|
return module.__name__.split('.')[-1]
|
|
|
|
|
|
def is_valid_date_format(date_str: str, date_format):
|
|
"""日付文字列が、与えられたフォーマットにマッチするかを検査する
|
|
|
|
Args:
|
|
date_str (str): 日付文字列
|
|
date_format (str, optional): 日付のフォーマット
|
|
|
|
Returns:
|
|
_type_: 正しい日付文字列の場合、True、それ以外はFalse
|
|
"""
|
|
try:
|
|
datetime.strptime(date_str, date_format)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def assert_table_results(actual_rows: list[dict], expect_rows: list[dict], ignore_col_name: list = None) -> None:
|
|
"""テーブル同士の取得結果突き合わせ
|
|
|
|
Args:
|
|
actual_rows (list[dict]): テスト結果の辞書リスト
|
|
expect_rows (list[dict]): 期待値の辞書リスト
|
|
"""
|
|
# 取得件数が一致すること
|
|
assert len(actual_rows) == len(expect_rows)
|
|
|
|
# 1カラムずつ調査
|
|
line_number = 0
|
|
for actual_row, expect_row in zip(actual_rows, expect_rows):
|
|
line_number += 1
|
|
for actual_col_name, expect_col_name in zip(actual_row, expect_row):
|
|
# テストメソッド側で個別に確認するものはスキップさせる
|
|
if ignore_col_name is not None and actual_col_name in ignore_col_name:
|
|
continue
|
|
else:
|
|
actual_value = actual_row[actual_col_name]
|
|
expect_value = expect_row[expect_col_name]
|
|
assert actual_value == expect_value, f'{line_number}行目:{actual_col_name}が、期待値と一致すること'
|