170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
import csv
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
|
|
from src.batch.ultmarc.datfile import DatFile, DatFileLine
|
|
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
|
|
UltmarcTableMapper
|
|
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
|
|
UltmarcTableMapperFactory
|
|
from src.db.database import Database
|
|
|
|
|
|
def create_ultmarc_test_data_from_csv(file_path: str) -> DatFile:
|
|
"""ファイルから、アルトマーク取り込み用のテストデータを作成する
|
|
|
|
Args:
|
|
file_path (str): csvファイルのパス
|
|
|
|
Returns:
|
|
DatFile: データファイルオブジェクト
|
|
"""
|
|
|
|
# 一度、Shift-JISファイルで書き出す
|
|
with open(file_path, encoding='utf8') as csv_file, tempfile.NamedTemporaryFile('w', encoding='cp932', delete=False) as tmp_file:
|
|
tmp_file.write(csv_file.read())
|
|
tmp_file.seek(0)
|
|
tmpfile_path = tmp_file.name
|
|
|
|
dat_file = DatFile.from_path(tmpfile_path)
|
|
os.unlink(tmpfile_path)
|
|
return dat_file
|
|
|
|
|
|
def create_db_data_from_csv(file_path: str) -> list[dict]:
|
|
"""ファイルから、DBの期待値データを作成する
|
|
|
|
Args:
|
|
file_path (str): csvファイルのパス
|
|
|
|
Returns:
|
|
DatFile: データファイルオブジェクト
|
|
"""
|
|
|
|
with open(file_path, encoding='utf8') as csv_file:
|
|
# ヘッダ行を取得し、改行とクォートを取り除く。
|
|
header = csv_file.readline().strip('\n').replace('"', '').split(',')
|
|
reader = csv.DictReader(csv_file, fieldnames=header)
|
|
rows = [r for r in reader]
|
|
|
|
# データ型変換
|
|
for row in rows:
|
|
for k, v in row.items():
|
|
converted_value = v
|
|
if v == 'NULL':
|
|
converted_value = None
|
|
if is_valid_date_format(v, '%Y/%m/%d') is True: # YYYY/MM/DD
|
|
converted_value = datetime.strptime(v, '%Y/%m/%d').date()
|
|
if is_valid_date_format(v, '%Y-%m-%d') is True: # YYYY-MM-DD
|
|
converted_value = datetime.strptime(v, '%Y-%m-%d').date()
|
|
if is_valid_date_format(v, '%Y/%m/%d %H:%M:%S') is True: # YYYY/MM/DD HH:MM:SS
|
|
converted_value = datetime.strptime(v, '%Y/%m/%d %H:%M:%S')
|
|
if is_valid_date_format(v, '%Y-%m-%d %H:%M:%S') is True: # YYYY-MM-DD HH:MM:SS
|
|
converted_value = datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
|
|
|
|
row[k] = converted_value
|
|
|
|
return rows
|
|
|
|
|
|
def create_insert_sql_with_parameter(table_name: str, column_names: list[str], test_data: list[str]) -> tuple[str, dict]:
|
|
"""INSERT文と登録値のパラメータを返す
|
|
|
|
Args:
|
|
table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma)
|
|
column_names (list[str]): カラム名のリスト
|
|
test_data (list[str]): 値のリスト
|
|
|
|
Returns:
|
|
tuple[str, dict]: [0]→INSERT文,[1]→値のパラメータ
|
|
"""
|
|
placeholders = ','.join([f':{column_name}' for column_name in column_names])
|
|
insert_sql = f"INSERT INTO {table_name} ({','.join(column_names)}) VALUES({placeholders})"
|
|
parameter = {k: v for k, v in zip(column_names, test_data)}
|
|
|
|
return insert_sql, parameter
|
|
|
|
|
|
def create_delete_sql_with_parameter(table_name: str, delete_parameter: dict[str, str]):
|
|
"""DELETE文と削除条件値のパラメータを返す
|
|
|
|
Args:
|
|
table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma)
|
|
delete_parameter (dict[str, str]): 削除条件に使用するカラム名と値の辞書
|
|
|
|
Returns:
|
|
tuple[str, dict]: [0]→DELETE文,[1]→値のパラメータ
|
|
"""
|
|
where_clause_list = []
|
|
for k in delete_parameter:
|
|
where_clause_list.append(f'{k} = :{k}')
|
|
where_clauses = ' AND '.join(where_clause_list)
|
|
delete_sql = f"DELETE FROM {table_name} WHERE {where_clauses}"
|
|
|
|
return delete_sql, delete_parameter
|
|
|
|
|
|
def create_ultmarc_table_mapper_sut(line: DatFileLine, db: Database) -> UltmarcTableMapper:
|
|
"""アルトマークテーブルマッパーのインスタンスを返す
|
|
|
|
Args:
|
|
line (DatFileLine): テストデータの1行
|
|
db (Database): 接続済みDBインスタンス
|
|
|
|
Returns:
|
|
UltmarcTableMapper: マッパークラス
|
|
"""
|
|
layout_class = line.layout_class
|
|
factory = UltmarcTableMapperFactory()
|
|
sut = factory.create(
|
|
layout_class=layout_class,
|
|
records=line.records,
|
|
db=db
|
|
)
|
|
|
|
return sut
|
|
|
|
|
|
def is_valid_date_format(date_str: str, date_format):
|
|
"""日付文字列が、与えられたフォーマットにマッチするかを検査する
|
|
|
|
Args:
|
|
date_str (str): 日付文字列
|
|
date_format (str, optional): 日付のフォーマット
|
|
|
|
Returns:
|
|
_type_: 正しい日付文字列の場合、True、それ以外はFalse
|
|
"""
|
|
try:
|
|
datetime.strptime(date_str, date_format)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def assert_table_results(actual_rows: list[dict], expect_rows: list[dict], ignore_col_name: list = None) -> None:
|
|
"""テーブル同士の取得結果突き合わせ
|
|
|
|
Args:
|
|
actual_rows (list[dict]): テスト結果の辞書リスト
|
|
expect_rows (list[dict]): 期待値の辞書リスト
|
|
ignore_col_name (list): 比較を無視するDBのカラム名. Default None.
|
|
"""
|
|
# 取得件数が一致すること
|
|
assert len(actual_rows) == len(expect_rows)
|
|
|
|
line_number = 0
|
|
# 1行ずつ調査
|
|
for actual_row, expect_row in zip(actual_rows, expect_rows):
|
|
line_number += 1
|
|
# 1カラムずつ調査
|
|
for actual_col_name, expect_col_name in zip(actual_row, expect_row):
|
|
# テストメソッド側で個別に確認するものはスキップさせる
|
|
if ignore_col_name is not None and actual_col_name in ignore_col_name:
|
|
continue
|
|
else:
|
|
actual_value = actual_row[actual_col_name]
|
|
expect_value = expect_row[expect_col_name]
|
|
assert actual_value == expect_value, f'{line_number}行目:{actual_col_name}が、期待値と一致すること'
|