# 194 lines
# 6.8 KiB
# Python

from src.constants import DATE_PATTERN_YYYYMMDDTHHMMSSTZ
from src.util.dict_checker import DictCheck
class FetchTargetObjects():
    """Iterator over the list stored under ``'objects'`` in an object-info dict.

    The instance is its own iterator: ``__iter__`` rewinds an internal cursor
    and ``__next__`` returns the list entries one by one.
    """

    def __init__(self, object_info_file_dict) -> None:
        """Keep the dictionary and validate its ``'objects'`` entry.

        Args:
            object_info_file_dict: Dictionary that is indexed with the key
                ``'objects'``; the value is iterated as a sequence.

        Raises:
            Whatever ``DictCheck.check_key_exist_and_value_type`` raises.
            NOTE(review): presumably an error when the key is missing or the
            value is not a ``list`` -- confirm against ``DictCheck``.
        """
        self.__dict_check = DictCheck()
        self.__objects = object_info_file_dict
        self.__key = 'objects'
        self.check_key_objects()
        self.__i = 0

    def __iter__(self):
        """Return ``self`` as the iterator, rewound to the first entry.

        Rewinding here lets the same instance be looped over more than once;
        without it a second ``for`` loop would silently yield nothing.
        """
        self.__i = 0
        return self

    def __next__(self):
        """Return the next entry of the list, or raise ``StopIteration``."""
        entries = self.__objects[self.__key]
        # ``>=`` (not ``==``) so a list that shrank mid-iteration still stops
        # cleanly instead of raising IndexError.
        if self.__i >= len(entries):
            raise StopIteration
        value = entries[self.__i]
        self.__i += 1
        return value

    def check_key_objects(self) -> None:
        """Ask ``DictCheck`` to verify the ``'objects'`` key against ``list``."""
        self.__dict_check.check_key_exist_and_value_type(
            self.__objects, self.__key, list)
class TargetObject():
    """Validated view of one object-info entry plus the values derived from it.

    Public attributes set by ``__init__``: ``execute_datetime``,
    ``object_name``, ``columns``, ``is_skip``,
    ``is_update_last_fetch_datetime``, ``last_fetch_datetime_file_name`` and
    ``upload_file_name``.
    """

    def __init__(self, object_info, execute_datetime) -> None:
        """Run every key check, then populate the public attributes.

        Args:
            object_info: Dictionary describing one object; read with the keys
                ``object_name``, ``columns`` and, when present, ``is_skip``,
                ``is_update_last_fetch_datetime``,
                ``last_fetch_datetime_file_name`` and ``upload_file_name``.
            execute_datetime: Value substituted into / appended to the upload
                file name (concatenated as a string in the default case).
        """
        self.__dict_check = DictCheck()
        self.__object_info = object_info
        self.execute_datetime = execute_datetime

        # All validation runs before any attribute is derived.
        self.check_key_object_name()
        self.check_key_columns()
        self.check_key_is_skip()
        self.check_key_is_update_last_fetch_datetime()
        self.check_key_last_fetch_datetime_file_name()
        self.check_key_upload_file_name()

        info = self.__object_info
        self.object_name = info['object_name']
        self.columns = info['columns']
        self.is_skip = self.set_is_skip()
        self.is_update_last_fetch_datetime = self.set_is_update_last_fetch_datetime()
        self.last_fetch_datetime_file_name = self.set_fetch_datetime_file_name()
        self.upload_file_name = self.set_upload_file_name()

    def check_key_object_name(self) -> None:
        """Check the object name: ``object_name`` checked against ``str``."""
        self.__dict_check.check_key_exist_and_value_type(
            self.__object_info, 'object_name', str)

    def check_key_columns(self) -> None:
        """Check the column information: ``columns`` checked against ``list``."""
        self.__dict_check.check_key_exist_and_value_type(
            self.__object_info, 'columns', list)

    def check_key_is_skip(self) -> None:
        """Type-check the skip flag: ``is_skip`` checked against ``bool``."""
        self.__dict_check.check_key_exist_case_value_type(
            self.__object_info, 'is_skip', bool)

    def check_key_is_update_last_fetch_datetime(self) -> None:
        """Check the "update last-fetch-datetime file" flag against ``bool``."""
        self.__dict_check.check_key_exist_case_value_type(
            self.__object_info, 'is_update_last_fetch_datetime', bool)

    def check_key_last_fetch_datetime_file_name(self) -> None:
        """Type-check the last-fetch-datetime file name against ``str``."""
        self.__dict_check.check_key_exist_case_value_type(
            self.__object_info, 'last_fetch_datetime_file_name', str)

    def check_key_upload_file_name(self) -> None:
        """Type-check the upload file name against ``str``."""
        self.__dict_check.check_key_exist_case_value_type(
            self.__object_info, 'upload_file_name', str)

    def set_is_skip(self) -> bool:
        """Return the configured skip flag, or ``False`` when it is not set."""
        info = self.__object_info
        key = 'is_skip'
        return info[key] if self.__dict_check.check_key_exist(info, key) else False

    def set_is_update_last_fetch_datetime(self) -> bool:
        """Return the configured update flag, or ``False`` when it is not set."""
        info = self.__object_info
        key = 'is_update_last_fetch_datetime'
        return info[key] if self.__dict_check.check_key_exist(info, key) else False

    def set_fetch_datetime_file_name(self) -> str:
        """Return the configured last-fetch-datetime file name.

        Falls back to ``<object_name>.json`` when the key is not present.
        """
        info = self.__object_info
        key = 'last_fetch_datetime_file_name'
        if not self.__dict_check.check_key_exist(info, key):
            return info['object_name'] + '.json'
        return info[key]

    def set_upload_file_name(self) -> str:
        """Return the upload file name.

        A configured name is passed through ``str.format`` with
        ``execute_datetime``; otherwise the name is
        ``CRM_<object_name>_<execute_datetime>``.
        """
        info = self.__object_info
        key = 'upload_file_name'
        if not self.__dict_check.check_key_exist(info, key):
            return 'CRM_' + info['object_name'] + '_' + self.execute_datetime
        template = info[key]
        return template.format(execute_datetime=self.execute_datetime)
class LastFetchDatetime():
    """Validated fetch window read from a last-fetch-datetime dictionary.

    Public attributes set by ``__init__``: ``execute_datetime``,
    ``last_fetch_datetime_file_name``, ``last_fetch_datetime_from`` and
    ``last_fetch_datetime_to``.
    """

    def __init__(self, last_fetch_datetime_file_name, last_fetch_datetime_file_dict, execute_datetime) -> None:
        """Validate the dictionary and derive the from/to values.

        Args:
            last_fetch_datetime_file_name: Stored unchanged on the instance.
            last_fetch_datetime_file_dict: Dictionary read with the keys
                ``last_fetch_datetime_from`` (always) and
                ``last_fetch_datetime_to`` (when present).
            execute_datetime: Used as the "to" value when the dictionary has
                none, and as the ``execute_datetime`` format argument when it
                has one.

        Raises:
            Whatever the ``DictCheck`` regex checks raise for invalid input,
            or ``KeyError`` if ``last_fetch_datetime_from`` is absent and the
            check did not already reject it.
        """
        self.__dict_check = DictCheck()
        self.execute_datetime = execute_datetime
        self.__last_fetch_datetime_file_dict = last_fetch_datetime_file_dict
        self.last_fetch_datetime_file_name = last_fetch_datetime_file_name

        # The validators must actually be *called*; a bare attribute reference
        # (no parentheses) is a no-op and lets invalid data through.
        self.check_key_last_fetch_datetime_from()
        self.check_key_last_fetch_datetime_to()

        self.last_fetch_datetime_from = self.__last_fetch_datetime_file_dict[
            'last_fetch_datetime_from']
        self.last_fetch_datetime_to = self.set_last_fetch_datetime_to()

    def check_key_last_fetch_datetime_from(self) -> None:
        """Check the fetch start datetime against the datetime pattern."""
        # NOTE(review): the helper name is spelled "exsit" here; it is kept
        # as-is because DictCheck is not visible -- confirm it matches.
        self.__dict_check.check_key_exsit_and_regex(
            self.__last_fetch_datetime_file_dict,
            'last_fetch_datetime_from',
            DATE_PATTERN_YYYYMMDDTHHMMSSTZ)

    def check_key_last_fetch_datetime_to(self) -> None:
        """Check the fetch end datetime against the datetime pattern."""
        # NOTE(review): same "exsit" spelling as above -- confirm.
        # NOTE(review): the raw stored value is validated here, before
        # set_last_fetch_datetime_to applies ``str.format``; confirm that
        # stored values are expected to match the pattern as written.
        self.__dict_check.check_key_exsit_case_regex(
            self.__last_fetch_datetime_file_dict,
            'last_fetch_datetime_to',
            DATE_PATTERN_YYYYMMDDTHHMMSSTZ)

    def set_last_fetch_datetime_to(self) -> str:
        """Return the fetch end datetime.

        Uses the stored value (passed through ``str.format`` with
        ``execute_datetime``) when the key exists, else ``execute_datetime``.
        """
        source = self.__last_fetch_datetime_file_dict
        key = 'last_fetch_datetime_to'
        if self.__dict_check.check_key_exist(source, key):
            return source[key].format(execute_datetime=self.execute_datetime)
        return self.execute_datetime