newdwh2021/ecs/jskult-webapp/src/model/internal/master_mainte_csv.py

573 lines
25 KiB
Python

import csv
import json
from io import TextIOWrapper
from datetime import datetime
from abc import ABCMeta, abstractmethod
from src.system_var import constants
from src.util.string_util import is_not_empty
from src.repositories.mst_inst_repository import MstInstRepository
from src.repositories.bu_master_cd_repository import BuMasterRepository
from src.repositories.emp_master_repository import EmpMasterRepository
from src.repositories.emp_chg_inst_repository import EmpChgInstRepository
from src.logging.get_logger import get_logger
logger = get_logger('マスターメンテ')
class MasterMainteCSVItem(metaclass=ABCMeta):
csv_row: list[str]
table_name: str
line_num: str
mst_inst_repository: MstInstRepository
emp_master_repository: EmpMasterRepository
bu_master_repository: BuMasterRepository
emp_chginst_repository: EmpChgInstRepository
def __init__(
self,
csv_row: list[str],
table_name: str,
line_num: str,
mst_inst_repository: MstInstRepository,
emp_master_repository: EmpMasterRepository,
bu_master_repository: BuMasterRepository,
emp_chginst_repository: EmpChgInstRepository
):
self.csv_row = csv_row
self.table_name = table_name
self.line_num = line_num
self.mst_inst_repository = mst_inst_repository
self.emp_master_repository = emp_master_repository
self.bu_master_repository = bu_master_repository
self.emp_chginst_repository = emp_chginst_repository
def validate(self) -> list[str]:
"""
項目のバリデーションを行うテンプレートメソッド\n
各チェックロジックはサブクラスで実装する
エラーが有る場合、[行数、項目名: エラー内容]のリストを返す
"""
error_list = []
# 項目数チェック
error_list.extend(self.check_item_count())
if len(error_list) == 0:
# 必須チェック 及び コメントエラーチェック
error_list.extend(self.check_require())
# 施設コード存在チェック
error_list.extend(self.check_inst_cd_exists())
# MUID存在チェック
error_list.extend(self.check_emp_cd_exists())
# BuCd存在チェック
error_list.extend(self.check_bu_cd_exists())
# 適用開始日 < 適用終了日、実在日チェック
error_list.extend(self.check_existing_date())
# データ存在チェック
error_list.extend(self.check_data_exists())
# エラーのないリストを省いて返す
error_list = [error for error in error_list if len(error) != 0]
return error_list
def check_csv_item_count(self, item_count: int) -> list[str]:
error_list = []
if not len(self.csv_row) == item_count:
error_list.append(f'{self.line_num}行目の項目数が一致しません。項目数を確認してください。')
return error_list
def emp_chg_inst_count(self, start_date: str):
return self.emp_chginst_repository.fetch_count(self.inst_cd, self.ta_cd, start_date, self.table_name)
def is_exist_emp_cd(self, start_date: str) -> bool:
if start_date is None or len(start_date) == 0:
return False
if self.emp_master_repository.fetch_count(self.emp_cd, start_date) == 0:
return True
return False
def is_exist_inst_cd(self) -> bool:
return True if self.mst_inst_repository.fetch_count(self.inst_cd) > 0 else False
def is_exist_bu_cd(self) -> bool:
return True if self.bu_master_repository.fetch_count(self.bu_cd) > 0 else False
def make_require_error_message(self, line_num: str, col_name: str) -> str:
return f'{line_num}行目の{col_name}が入力されておりません。'
def __parse_str_to_date(self, check_date: str) -> tuple[bool, datetime]:
try:
check_date_time: datetime = datetime.strptime(check_date, '%Y%m%d')
except Exception as e:
logger.debug(f'適用期間の日付が不正{e}')
return (False, None)
try:
reverse_check_date: str = check_date_time.strftime('%Y%m%d')
except Exception as e:
logger.debug(f'適用期間の日付が不正{e}')
return (False, None)
if check_date != reverse_check_date:
return (False, None)
return (True, check_date_time)
def check_term_date(self,
start_date: str,
end_date: str,
start_date_col_name: str,
end_date_col_name: str) -> tuple[list[str], datetime, datetime]:
error_list = []
start_date_time: datetime = None
end_date_time: datetime = None
if is_not_empty(start_date):
(result, start_date_time) = self.__parse_str_to_date(start_date)
if result is False:
error_list.append(f'{self.line_num}行目の{start_date_col_name}が実在しない日付になっています。')
if is_not_empty(end_date):
(result, end_date_time) = self.__parse_str_to_date(end_date)
if result is False:
error_list.append(f'{self.line_num}行目の{end_date_col_name}が実在しない日付になっています。')
return (error_list, start_date_time, end_date_time)
def get_csv_value(self, column_no: int):
try:
column_value = self.csv_row[column_no]
except IndexError:
column_value = ''
return column_value
@abstractmethod
def csv_row_data(self) -> dict:
pass
...
@abstractmethod
def check_require(self) -> list[str]:
"""必須チェック"""
pass
...
@abstractmethod
def check_inst_cd_exists(self) -> list[str]:
"""InstCD存在チェック"""
pass
...
@abstractmethod
def check_emp_cd_exists(self) -> list[str]:
"""MUID存在チェック"""
pass
...
@abstractmethod
def check_bu_cd_exists(self) -> list[str]:
"""BuCd存在チェック"""
pass
...
@abstractmethod
def check_existing_date(self) -> list[str]:
"""適用開始日 < 適用終了日、実在日チェック"""
@abstractmethod
def check_item_count(self) -> list[str]:
"""項目数チェック"""
pass
...
@abstractmethod
def check_data_exists(self) -> list[str]:
"""データ存在チェック"""
pass
...
class MasterMainteNewInstEmpCSVItem(MasterMainteCSVItem):
"""新規施設担当者登録CSV"""
inst_name: str
emp_name_family: str
emp_name_first: str
start_date: str
end_date: str
def __init__(
self,
csv_row: list[str],
table_name: str,
line_num: str,
mst_inst_repository: MstInstRepository,
emp_master_repository: EmpMasterRepository,
bu_master_repository: BuMasterRepository,
emp_chginst_repository: EmpChgInstRepository
):
super().__init__(
csv_row,
table_name,
line_num,
mst_inst_repository,
emp_master_repository,
bu_master_repository,
emp_chginst_repository
)
self.inst_cd = super().get_csv_value(constants.CSV_NEW_INST_CD_COL_NO)
self.inst_name = super().get_csv_value(constants.CSV_NEW_INST_NAME_COL_NO)
self.ta_cd = super().get_csv_value(constants.CSV_NEW_TA_CD_COL_NO)
self.emp_cd = super().get_csv_value(constants.CSV_NEW_EMP_CD_COL_NO)
self.emp_name_family = super().get_csv_value(constants.CSV_NEW_EMP_NAME_FAMILY_COL_NO)
self.emp_name_first = super().get_csv_value(constants.CSV_NEW_EMP_NAME_FIRST_COL_NO)
self.bu_cd = super().get_csv_value(constants.CSV_NEW_BU_CD_COL_NO)
self.start_date = super().get_csv_value(constants.CSV_NEW_START_DATE)
self.end_date = super().get_csv_value(constants.CSV_NEW_END_DATE)
def csv_row_data(self) -> dict:
return {constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[i]: self.csv_row[i] for i in range(len(self.csv_row))}
def check_require(self) -> list[str]:
error_list = []
if len(self.inst_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_INST_CD_COL_NO]))
if len(self.ta_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_TA_CD_COL_NO]))
if len(self.emp_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_EMP_CD_COL_NO]))
if len(self.bu_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_BU_CD_COL_NO]))
if len(self.start_date) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_START_DATE]))
if len(self.end_date) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_END_DATE]))
return error_list
def check_inst_cd_exists(self) -> list[str]:
error_list = []
if is_not_empty(self.inst_cd) and super().is_exist_inst_cd() is False:
error_list.append(
f'{self.line_num}行目の{constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_INST_CD_COL_NO]}\
は施設マスタに存在しないコードです。')
return error_list
def check_emp_cd_exists(self) -> list[str]:
error_list = []
if not self.start_date or not self.emp_cd:
return error_list
if super().is_exist_emp_cd(self.start_date) is True:
error_list.append(f'{self.line_num}行目の{constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_EMP_CD_COL_NO]}\
は従業員マスタに存在しない もしくは 適用期間外のIDです。')
return error_list
def check_bu_cd_exists(self) -> list[str]:
error_list = []
if is_not_empty(self.bu_cd) and super().is_exist_bu_cd() is False:
error_list.append(f'{self.line_num}行目の{constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_BU_CD_COL_NO]}\
はビジネスユニットマスタに存在しないコードです。')
return error_list
def check_existing_date(self) -> list[str]:
error_list = []
if not self.start_date or not self.end_date:
return error_list
(error_list, start_date_time, end_date_time) = super().check_term_date(
self.start_date,
self.end_date,
constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_START_DATE],
constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_END_DATE])
if len(error_list) > 0:
return error_list
if start_date_time > end_date_time:
error_list.append(f'{self.line_num}行目の{constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_START_DATE]}\
{constants.NEW_INST_EMP_CSV_LOGICAL_NAMES[constants.CSV_NEW_END_DATE]}よりも後の日付になっています。')
return error_list
def check_item_count(self) -> list[str]:
return super().check_csv_item_count(len(constants.NEW_INST_EMP_CSV_LOGICAL_NAMES))
def check_data_exists(self) -> list[str]:
error_list = []
if super().emp_chg_inst_count(self.start_date) > 0:
error_list.append(f'{self.line_num}行目の施設コード、領域コード、適用開始日がすべて同一のデータが既に登録されています。')
return error_list
class MasterMainteChangeInstEmpCSVItem(MasterMainteCSVItem):
"""施設担当者変更登録CSV"""
bu_name: str
org_cd: str
org_short_name: str
inst_name: str
explain: str
emp_full_name: str
inst_emp_start_date: str
inst_emp_end_date: str
change_end_date: str
comment: str
def __init__(
self,
csv_row: list[str],
table_name: str,
line_num: str,
mst_inst_repository: MstInstRepository,
emp_master_repository: EmpMasterRepository,
bu_master_repository: BuMasterRepository,
emp_chginst_repository: EmpChgInstRepository
):
super().__init__(
csv_row,
table_name,
line_num,
mst_inst_repository,
emp_master_repository,
bu_master_repository,
emp_chginst_repository
)
self.bu_cd = super().get_csv_value(constants.CSV_CHANGE_BU_CD_COL_NO)
self.bu_name = super().get_csv_value(constants.CSV_CHANGE_BU_NAME_COL_NO)
self.org_cd = super().get_csv_value(constants.CSV_CHANGE_ORG_CD_COL_NO)
self.org_short_name = super().get_csv_value(constants.CSV_CHANGE_ORG_SHORT_NAME_COL_NO)
self.inst_cd = super().get_csv_value(constants.CSV_CHANGE_INST_CD_COL_NO)
self.inst_name = super().get_csv_value(constants.CSV_CHANGE_INST_NAME_COL_NO)
self.ta_cd = super().get_csv_value(constants.CSV_CHANGE_TA_CD_COL_NO)
self.explain = super().get_csv_value(constants.CSV_CHANGE_EXPLAIN_COL_NO)
self.emp_cd = super().get_csv_value(constants.CSV_CHANGE_EMP_CD_COL_NO)
self.emp_full_name = super().get_csv_value(constants.CSV_CHANGE_EMP_FULL_NAME_COL_NO)
self.inst_emp_start_date = super().get_csv_value(constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO)
self.inst_emp_end_date = super().get_csv_value(constants.CSV_CHANGE_INST_EMP_END_DATE_COL_NO)
self.change_end_date = super().get_csv_value(constants.CSV_CHANGE_CHANGE_END_DATE_COL_NO)
self.comment = super().get_csv_value(constants.CSV_CHANGE_COMMENT)
def csv_row_data(self) -> dict:
return {constants.CHANGE_INST_CSV_LOGICAL_NAMES[i]: self.csv_row[i] for i in range(len(self.csv_row))}
def check_require(self) -> list[str]:
error_list = []
if self.comment == '追加':
if len(self.bu_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_BU_CD_COL_NO]))
if len(self.inst_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_CD_COL_NO]))
if len(self.ta_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_TA_CD_COL_NO]))
if len(self.emp_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_EMP_CD_COL_NO]))
if len(self.inst_emp_start_date) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[
constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO]))
if len(self.inst_emp_end_date) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[
constants.CSV_CHANGE_INST_EMP_END_DATE_COL_NO]))
elif self.comment == '終了':
if len(self.inst_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_CD_COL_NO]))
if len(self.ta_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_TA_CD_COL_NO]))
if len(self.inst_emp_start_date) == 0:
error_list.append(self.make_require_error_message(
self.line_num,
constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO]))
if len(self.change_end_date) == 0:
error_list.append(self.make_require_error_message(self.line_num,
constants.CHANGE_INST_CSV_LOGICAL_NAMES[
constants.CSV_CHANGE_CHANGE_END_DATE_COL_NO]))
elif self.comment == '担当者修正':
if len(self.inst_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_CD_COL_NO]))
if len(self.ta_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_TA_CD_COL_NO]))
if len(self.emp_cd) == 0:
error_list.append(self.make_require_error_message(
self.line_num, constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_EMP_CD_COL_NO]))
if len(self.inst_emp_start_date) == 0:
error_list.append(self.make_require_error_message(self.line_num,
constants.CHANGE_INST_CSV_LOGICAL_NAMES[
constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO]))
else:
error_list.append(f'{self.line_num}行目のコメントが不正です。 「追加」「終了」「担当者修正」のいずれかを入力してください。')
return error_list
def check_inst_cd_exists(self) -> list[str]:
error_list = []
if is_not_empty(self.inst_cd) and super().is_exist_inst_cd() is False:
error_list.append(f'{self.line_num}行目の{constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_CD_COL_NO]}\
は施設マスタに存在しないコードです。')
return error_list
def check_emp_cd_exists(self) -> list[str]:
error_list = []
if not self.inst_emp_start_date or not self.emp_cd:
return error_list
if self.comment != '追加' and self.comment != '担当者修正':
return error_list
if super().is_exist_emp_cd(self.inst_emp_start_date) is True:
error_list.append(f'{self.line_num}行目の{constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_EMP_CD_COL_NO]}\
は従業員マスタに存在しない もしくは 適用期間外のIDです。')
return error_list
def check_bu_cd_exists(self) -> list[str]:
error_list = []
if is_not_empty(self.bu_cd) and self.comment == '追加'\
and super().is_exist_bu_cd() is False:
error_list.append(f'{self.line_num}行目の{constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_BU_CD_COL_NO]}\
はビジネスユニットマスタに存在しないコードです。')
return error_list
def check_existing_date(self) -> list[str]:
error_list = []
start_date = self.inst_emp_start_date
if self.comment == '追加' or self.comment == '終了':
if self.comment == '追加':
end_date = self.inst_emp_end_date
end_date_col_name = constants.CHANGE_INST_CSV_LOGICAL_NAMES[
constants.CSV_CHANGE_INST_EMP_END_DATE_COL_NO]
compare_error_message = f'\
{constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO]}\
{end_date_col_name}よりも後の日付になっています。'
else:
end_date = self.change_end_date
end_date_col_name = constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_CHANGE_END_DATE_COL_NO]
compare_error_message = f'\
{constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO]}\
{end_date_col_name}よりも後の日付になっています。'
if not start_date or not end_date:
return error_list
(error_list, start_date_time, end_date_time) = super().check_term_date(
start_date,
end_date,
constants.CHANGE_INST_CSV_LOGICAL_NAMES[constants.CSV_CHANGE_INST_EMP_START_DATE_COL_NO],
end_date_col_name)
if len(error_list) > 0:
return error_list
if start_date_time > end_date_time:
error_list.append(f'{self.line_num}行目の{compare_error_message}')
return error_list
def check_item_count(self) -> list[str]:
return super().check_csv_item_count(len(constants.CHANGE_INST_CSV_LOGICAL_NAMES))
def check_data_exists(self) -> list[str]:
error_list = []
emp_chg_inst_count = super().emp_chg_inst_count(self.inst_emp_start_date)
if self.comment == '追加' and emp_chg_inst_count > 0:
error_list.append(f'{self.line_num}行目の施設コード、領域コード、施設担当_開始日がすべて同一のデータが既に登録されています。')
elif (self.comment == '終了' or self.comment == '担当者修正') and emp_chg_inst_count == 0:
error_list.append(f'{self.line_num}行目の施設コード、領域コード、施設担当_開始日がすべて同一のデータが存在しないため更新できません。')
return error_list
class MasterMainteCSVItems:
"""施設担当者CSVをループで回すためのもの"""
lines: list[MasterMainteCSVItem]
__i: int = 0
def to_json(self):
# CSVをjsonに変換
csv_row_dict_list: list[dict] = self.to_dict()
# json作成
return json.dumps(csv_row_dict_list, ensure_ascii=False)
def to_dict(self):
return [row_item.csv_row_data() for row_item in self.lines]
def __iter__(self):
return self
def __next__(self) -> MasterMainteCSVItem:
if self.__i == len(self.lines):
raise StopIteration()
line = self.lines[self.__i]
self.__i += 1
return line
def __init__(
self,
file: TextIOWrapper,
select_function: str,
table_name: str,
mst_inst_repository: MstInstRepository,
emp_master_repository: EmpMasterRepository,
bu_master_repository: BuMasterRepository,
emp_chginst_repository: EmpChgInstRepository
) -> None:
reader = csv.reader(file)
csv_rows = []
for line_num, row in enumerate(reader, start=0):
if line_num == 0:
continue
csv_rows.append(self.__select_function(
select_function,
row,
table_name,
line_num,
mst_inst_repository,
emp_master_repository,
bu_master_repository,
emp_chginst_repository))
self.lines = csv_rows
def __select_function(self,
function_type: str,
row: list[str],
table_name: str,
line_num: int,
mst_inst_repository: MstInstRepository,
emp_master_repository: EmpMasterRepository,
bu_master_repository: BuMasterRepository,
emp_chginst_repository: EmpChgInstRepository) -> MasterMainteCSVItem:
if function_type == 'new':
return MasterMainteNewInstEmpCSVItem(
row,
table_name,
str(line_num),
mst_inst_repository,
emp_master_repository,
bu_master_repository,
emp_chginst_repository)
elif function_type == 'change':
return MasterMainteChangeInstEmpCSVItem(
row,
table_name,
str(line_num),
mst_inst_repository,
emp_master_repository,
bu_master_repository,
emp_chginst_repository)