import csv import os import tempfile from datetime import datetime from src.batch.ultmarc.datfile import DatFile, DatFileLine from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \ UltmarcTableMapper from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \ UltmarcTableMapperFactory from src.db.database import Database def create_ultmarc_test_data_from_csv(file_path: str) -> DatFile: """ファイルから、アルトマーク取り込み用のテストデータを作成する Args: file_path (str): csvファイルのパス Returns: DatFile: データファイルオブジェクト """ # 一度、Shift-JISファイルで書き出す with open(file_path, encoding='utf8') as csv_file, tempfile.NamedTemporaryFile('w', encoding='cp932', delete=False) as tmp_file: tmp_file.write(csv_file.read()) tmp_file.seek(0) tmpfile_path = tmp_file.name dat_file = DatFile.from_path(tmpfile_path) os.unlink(tmpfile_path) return dat_file def create_db_data_from_csv(file_path: str) -> list[dict]: """ファイルから、DBの期待値データを作成する Args: file_path (str): csvファイルのパス Returns: DatFile: データファイルオブジェクト """ with open(file_path, encoding='utf8') as csv_file: # ヘッダ行を取得し、改行とクォートを取り除く。 header = csv_file.readline().strip('\n').replace('"', '').split(',') reader = csv.DictReader(csv_file, fieldnames=header) rows = [r for r in reader] # データ型変換 for row in rows: for k, v in row.items(): converted_value = v if v == 'NULL': converted_value = None if is_valid_date_format(v, '%Y/%m/%d') is True: # YYYY/MM/DD converted_value = datetime.strptime(v, '%Y/%m/%d').date() if is_valid_date_format(v, '%Y-%m-%d') is True: # YYYY-MM-DD converted_value = datetime.strptime(v, '%Y-%m-%d').date() if is_valid_date_format(v, '%Y/%m/%d %H:%M:%S') is True: # YYYY/MM/DD HH:MM:SS converted_value = datetime.strptime(v, '%Y/%m/%d %H:%M:%S') if is_valid_date_format(v, '%Y-%m-%d %H:%M:%S') is True: # YYYY-MM-DD HH:MM:SS converted_value = datetime.strptime(v, '%Y-%m-%d %H:%M:%S') row[k] = converted_value return rows def create_insert_sql_with_parameter(table_name: str, column_names: list[str], test_data: list[str]) -> tuple[str, dict]: """INSERT文と登録値のパラメータを返す Args: table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma) column_names (list[str]): カラム名のリスト test_data (list[str]): 値のリスト Returns: tuple[str, dict]: [0]→INSERT文,[1]→値のパラメータ """ placeholders = ','.join([f':{column_name}' for column_name in column_names]) insert_sql = f"INSERT INTO {table_name} ({','.join(column_names)}) VALUES({placeholders})" parameter = {k: v for k, v in zip(column_names, test_data)} return insert_sql, parameter def create_delete_sql_with_parameter(table_name: str, delete_parameter: dict[str, str]): """DELETE文と削除条件値のパラメータを返す Args: table_name (str): スキーマ完全修飾のテーブル名(例:src05.com_alma) delete_parameter (dict[str, str]): 削除条件に使用するカラム名と値の辞書 Returns: tuple[str, dict]: [0]→DELETE文,[1]→値のパラメータ """ where_clause_list = [] for k in delete_parameter: where_clause_list.append(f'{k} = :{k}') where_clauses = ' AND '.join(where_clause_list) delete_sql = f"DELETE FROM {table_name} WHERE {where_clauses}" return delete_sql, delete_parameter def create_ultmarc_table_mapper_sut(line: DatFileLine, db: Database) -> UltmarcTableMapper: """アルトマークテーブルマッパーのインスタンスを返す Args: line (DatFileLine): テストデータの1行 db (Database): 接続済みDBインスタンス Returns: UltmarcTableMapper: マッパークラス """ layout_class = line.layout_class factory = UltmarcTableMapperFactory() sut = factory.create( layout_class=layout_class, records=line.records, db=db ) return sut def is_valid_date_format(date_str: str, date_format): """日付文字列が、与えられたフォーマットにマッチするかを検査する Args: date_str (str): 日付文字列 date_format (str, optional): 日付のフォーマット Returns: _type_: 正しい日付文字列の場合、True、それ以外はFalse """ if date_str is None: return False try: datetime.strptime(date_str, date_format) return True except ValueError: return False def assert_table_results(actual_rows: list[dict], expect_rows: list[dict], ignore_col_name: list = None) -> None: """テーブル同士の取得結果突き合わせ Args: actual_rows (list[dict]): テスト結果の辞書リスト expect_rows (list[dict]): 期待値の辞書リスト ignore_col_name (list): 比較を無視するDBのカラム名. Default None. """ # 取得件数が一致すること assert len(actual_rows) == len(expect_rows) line_number = 0 # 1行ずつ調査 for actual_row, expect_row in zip(actual_rows, expect_rows): line_number += 1 # 1カラムずつ調査 for actual_col_name, expect_col_name in zip(actual_row, expect_row): # テストメソッド側で個別に確認するものはスキップさせる if ignore_col_name is not None and actual_col_name in ignore_col_name: continue else: actual_value = actual_row[actual_col_name] expect_value = expect_row[expect_col_name] assert actual_value == expect_value, f'{line_number}行目:{actual_col_name}が、期待値と一致すること'