fix: ソースコードレビューの指摘反映

This commit is contained in:
Y_SAKAI 2022-07-27 23:28:28 +09:00
parent 440e6c8fd0
commit a41a40004b
29 changed files with 3947 additions and 680 deletions

View File

@ -1,4 +1,5 @@
import src.controller as controller
from src.controller import controller
"""CRMデータ取得処理のエントリーポイント"""
if __name__ == '__main__':
controller.main()
controller()

View File

@ -1,7 +1,7 @@
import json
import boto3
from src.system_var.constants import AWS_CLINET_S3, AWS_RESOURCE_S3, S3_RESPONSE_BODY
from src.system_var.constants import AWS_RESOURCE_S3, S3_RESPONSE_BODY, S3_CHAR_CODE
from src.system_var.environments import (CRM_BACKUP_BUCKET, CRM_CONFIG_BUCKET,
CRM_IMPORT_DATA_BACKUP_FOLDER,
CRM_IMPORT_DATA_FOLDER, IMPORT_DATA_BUCKET,
@ -10,16 +10,6 @@ from src.system_var.environments import (CRM_BACKUP_BUCKET, CRM_CONFIG_BUCKET,
RESPONSE_JSON_BACKUP_FOLDER)
class S3Clinet:
def __init__(self) -> None:
self.__s3_client = boto3.client(AWS_CLINET_S3)
def copy_object(self, src_bucket, src_key, dest_bucket, dest_key):
self.__s3_client.copy(
{"Bucket": src_bucket, "key": src_key}, dest_bucket, dest_key)
return
class S3Resource:
def __init__(self, bucket_name: str) -> None:
self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
@ -28,41 +18,47 @@ class S3Resource:
def get_object(self, object_key: str) -> str:
response = self.__s3_bucket.Object(object_key).get()
body = response[S3_RESPONSE_BODY].read()
return body.decode(S3_CHAR_CODE)
return body.decode('utf-8')
def put_object(self, object_key: str, data) -> str:
def put_object(self, object_key: str, data: str) -> None:
s3_object = self.__s3_bucket.Object(object_key)
s3_object.put(Body=data.encode('utf-8'), ContentEncoding='utf-8')
s3_object.put(Body=data.encode(S3_CHAR_CODE), ContentEncoding=S3_CHAR_CODE)
return
class S3ResourceNonBucket:
def __init__(self) -> None:
self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
def copy(self, src_bucket, src_key, dest_bucket, dest_key) -> None:
def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_resource.meta.client.copy(copy_source, dest_bucket, dest_key)
return
#class S3ResourceNonBucket:
# def __init__(self) -> None:
# self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
#
# def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
# copy_source = {'Bucket': src_bucket, 'Key': src_key}
# self.__s3_resource.meta.client.copy(copy_source, dest_bucket, dest_key)
# return
class ConfigBucket:
__s3_resource: S3Resource = None
def __init__(self) -> None:
self.__s3_resource = S3Resource(CRM_CONFIG_BUCKET)
def __str__(self) -> str:
return CRM_CONFIG_BUCKET
def get_object_info_file(self) -> str:
return self.__s3_resource.get_object(f'{OBJECT_INFO_FOLDER}/{OBJECT_INFO_FILENAME}')
def get_last_fetch_datetime_file(self, file_name) -> str:
return self.__s3_resource.get_object(f'{LAST_FETCH_DATE_FOLDER}/{file_name}')
def get_last_fetch_datetime_file(self, file_path: str) -> str:
return self.__s3_resource.get_object(f'{LAST_FETCH_DATE_FOLDER}/{file_path}')
def put_last_fetch_datetime_file(self, file_name, data) -> None:
def put_last_fetch_datetime_file(self, file_path: str, data: str) -> None:
self.__s3_resource.put_object(
f'{LAST_FETCH_DATE_FOLDER}/{file_name}', data)
f'{LAST_FETCH_DATE_FOLDER}/{file_path}', data)
return
@ -72,11 +68,19 @@ class DataBucket:
def __init__(self) -> None:
self.__s3_resource = S3Resource(IMPORT_DATA_BUCKET)
def put_csv(self, file_name, data) -> None:
object_key = f'{CRM_IMPORT_DATA_FOLDER}/{file_name}'
def __str__(self) -> str:
return IMPORT_DATA_BUCKET
def put_csv(self, file_path: str, data: str) -> None:
object_key = f'{CRM_IMPORT_DATA_FOLDER}/{file_path}'
self.__s3_resource.put_object(object_key, data)
return
def put_csv_from(self, src_bucket: str, src_key: str):
self.__s3_resource.copy(src_bucket, src_key, str(self), CRM_IMPORT_DATA_FOLDER)
return
class BackupBucket:
__s3_resource: S3Resource = None
@ -84,16 +88,20 @@ class BackupBucket:
def __init__(self) -> None:
self.__s3_resource = S3Resource(CRM_BACKUP_BUCKET)
def put_response_json(self, file_name, data) -> None:
object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_name}'
def __str__(self) -> str:
return CRM_BACKUP_BUCKET
def put_response_json(self, file_path: str, data: dict) -> None:
object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_path}'
self.__s3_resource.put_object(object_key, json.dumps(data))
return
def put_csv_bk(self, file_name, data) -> None:
object_key = f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{file_name}'
def put_csv(self, file_path: str, data: str) -> None:
object_key = f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{file_path}'
self.__s3_resource.put_object(object_key, data)
return
def put_result_json(self, file_name, data) -> None:
object_key = f'{PROCESS_RESULT_FOLDER}/{file_name}'
def put_result_json(self, file_path: str, data: dict) -> None:
object_key = f'{PROCESS_RESULT_FOLDER}/{file_path}'
self.__s3_resource.put_object(object_key, json.dumps(data))
return

View File

@ -0,0 +1,58 @@
from src.aws.s3 import BackupBucket
from src.config.objects import TargetObject
from src.system_var.constants import CSVBK_JP_NAME
from src.error.exceptions import FileUploadException
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def backup_crm_csv_data_process(target_object: TargetObject, execute_datetime: ExecuteDateTime, csv_string: str) -> None:
    """Back up the converted CSV data to the CRM backup bucket.

    Parameters
    ----------
    target_object : TargetObject
        Target-object configuration instance.
    execute_datetime : ExecuteDateTime
        Execution-datetime instance. (Renamed from the misspelled
        ``exetute_datetime`` to match the documented name; the visible
        caller passes this argument positionally.)
    csv_string : str
        CSV payload to store.

    Returns
    -------
    None

    Raises
    ------
    FileUploadException
        If the S3 upload of the CSV backup fails.
    """
    # (1) Log the start of the CSV backup step.
    target_object_name = target_object.object_name
    upload_file_name = target_object.upload_file_name
    logger.info(
        f'I-CSVBK-01 [{target_object_name}] のCSVデータのバックアップ処理を開始します ファイル名:[{upload_file_name}.csv]')
    try:
        # (2) Store the converted CSV in the CRM backup bucket under
        #     <execute-datetime path>/<upload file name>.csv.
        backup_bucket = BackupBucket()
        backup_bucket.put_csv(
            f'{execute_datetime.to_path()}/{upload_file_name}.csv', csv_string)
        logger.debug(
            f'D-CSVBK-02 [{target_object_name}] のCSVデータバックアップ 正常終了')
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise FileUploadException(
            'E-CSVBK-01',
            CSVBK_JP_NAME,
            f'[{target_object_name}] CSVデータのバックアップに失敗しました ファイル名:[{upload_file_name}.csv] エラー内容:[{e}]') from e
    # (3) Log the end of the CSV backup step.
    logger.info(
        f'I-CSVBK-03 [{target_object_name}] のCSVデータのバックアップ処理を終了します')
    # (4) Hand control back to the caller.
    return

View File

@ -1,31 +0,0 @@
from src.aws.s3 import BackupBucket
from src.system_var.constants import CSVBK_JP_NAME
from src.error.exceptions import FileUploadException
from src.util.logger import logger_instance as logger
def backup_crm_csvdata(target_object, date_path, csv_object):
    """Back up converted CSV data (``csv_object.csv_buffer``) to the CRM backup bucket.

    NOTE(review): this is the pre-rename version this commit deletes in
    favour of ``backup_crm_csv_data_process``.

    Raises
    ------
    FileUploadException
        If the S3 upload of the CSV backup fails.
    """
    # (1) Log the start of the CSV backup step.
    logger.info(
        f'I-CSVBK-01 [{target_object.object_name}] のCSVデータのバックアップ処理を開始します ファイル名:[{target_object.upload_file_name}.csv]')
    try:
        # (2) Store the CSV backup under <date_path>/<upload_file_name>.csv
        #     in the CRM backup bucket.
        backup_bucket = BackupBucket()
        backup_bucket.put_csv_bk(
            f'{date_path}/{target_object.upload_file_name}.csv', csv_object.csv_buffer)
        logger.debug(
            f'D-CSVBK-02 [{target_object.object_name}] のCSVデータバックアップ 正常終了')
    except Exception as e:
        raise FileUploadException(
            'E-CSVBK-01',
            CSVBK_JP_NAME, f'[{target_object.object_name}] CSVデータのバックアップに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]')
    # (3) Log the end of the CSV backup step.
    logger.info(
        f'I-CSVBK-03 [{target_object.object_name}] のCSVデータのバックアップ処理を終了します')
    # (4) Proceed to the next step.
    return

View File

@ -1,19 +1,42 @@
from src.aws.s3 import BackupBucket
from src.system_var.constants import RESBK_JP_NAME
from src.error.exceptions import FileUploadException
from src.system_var.constants import RESBK_JP_NAME
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def backup_crm_data(object_name, sf_object_jsons, date_path):
def backup_crm_data_process(object_name: str, sf_object_dict: dict, execute_datetime: ExecuteDateTime):
"""
CRM電文データバックアップ処理
Parameters
----------
object_name : str
取得対象オブジェクト情報インスタンス
sf_object_dict : dict
Salesforceオブジェクトデータ
execute_datetime : ExecuteDateTime
実行日次取得インスタンス
Returns
-------
なし
Raises
------
FileUploadException
S3のファイルアップロード失敗
"""
# ① CRM電文データバックアップ処理の開始ログを出力する
logger.info(f'I-RESBK-01 [{object_name}] のCRM電文データバックアップ処理を開始します')
try:
# ② CRMバックアップ保管用バケットに、CRMから取得したJSONの電文データのバックアップを保管する
file_name = f'{date_path}/{object_name}.json'
file_name = f'{execute_datetime.to_path()}/{object_name}.json'
backup_bucket = BackupBucket()
backup_bucket.put_response_json(file_name, sf_object_jsons)
backup_bucket.put_response_json(file_name, sf_object_dict)
logger.debug(f'D-RESBK-02 [{object_name}] のJSONデータバックアップ 正常終了')

View File

@ -1,10 +1,32 @@
from src.config.objects import TargetObject
from src.system_var.constants import CHK_JP_NAME
from src.error.exceptions import InvalidConfigException
from src.util.execute_datetime import ExecuteDateTime
from src.system_var.constants import CHK_JP_NAME
from src.util.logger import logger_instance as logger
def check_object_info(object_info, execute_datetime):
def check_object_info_process(object_info: dict, execute_datetime: ExecuteDateTime):
"""
オブジェクト情報形式チェック処理
Parameters
----------
object_info : dict
取得対象オブジェクト情報
execute_datetime : ExecuteDateTime
実行日次取得インスタンス
Returns
-------
target_object : TargetObject
取得対象オブジェクト情報インスタンス
Raises
------
InvalidConfigException
オブジェクト情報定義が不正だった場合
"""
# ① オブジェクト情報形式チェック処理開始ログを出力する
logger.info('I-CHK-01 オブジェクト情報形式チェック処理を開始します')

View File

@ -1,193 +1,138 @@
from src.system_var.constants import DATE_PATTERN_YYYYMMDDTHHMMSSTZ
from src.util.dict_checker import DictCheck
from src.system_var.constants import ( DATE_PATTERN_YYYYMMDDTHHMMSSTZ,
OBJECTS_KEY,
OBJECTS_TYPE,
OBJECT_NAME_KEY,
OBJECT_NAME_TYPE,
COLUMNS_KEY,
COLUMNS_TYPE,
IS_SKIP_KEY,
IS_SKIP_TYPE,
IS_UPDATE_LAST_FETCH_DATETIME_KEY,
IS_UPDATE_LAST_FETCH_DATETIME_TYPE,
LAST_FETCH_DATETIME_FILE_NAME_KEY,
LAST_FETCH_DATETIME_FILE_NAME_TYPE,
UPLOAD_FILE_NAME_KEY,
UPLOAD_FILE_NAME_TYPE,
DATETIME_COLUMN_KEY,
DATETIME_COLUMN_TYPE,
LAST_FETCH_DATETIME_FROM_KEY,
LAST_FETCH_DATETIME_TO_KEY,
DATETIME_COLUMN_DEFAULT_VALUE
)
from src.util.dict_checker import DictChecker
class FetchTargetObjects():
def __init__(self, object_info_file_dict) -> None:
self.__dict_check = DictCheck()
self.__objects = object_info_file_dict
self.__key = 'objects'
self.check_key_objects()
self.__dict_checker = DictChecker(self.__objects)
self.validate()
self.__i = 0
def __iter__(self):
return self
def __next__(self):
if self.__i == len(self.__objects[self.__key]):
if self.__i == len(self.__objects[OBJECTS_KEY]):
raise StopIteration()
value = self.__objects['objects'][self.__i]
value = self.__objects[OBJECTS_KEY][self.__i]
self.__i += 1
return value
def check_key_objects(self) -> None:
__check_key = self.__key
__check_type = list
self.__dict_check.check_key_exist_and_value_type(
self.__objects, __check_key, __check_type)
def validate(self) -> None:
self.__dict_checker.assert_key_exist(OBJECTS_KEY)
self.__dict_checker.assert_data_type(OBJECTS_KEY, OBJECTS_TYPE)
class TargetObject():
def __init__(self, object_info, execute_datetime) -> None:
self.__dict_check = DictCheck()
self.__dict_checker = DictChecker(object_info)
self.__object_info = object_info
self.execute_datetime = execute_datetime
self.check_key_object_name()
self.check_key_columns()
self.check_key_is_skip()
self.check_key_is_update_last_fetch_datetime()
self.check_key_last_fetch_datetime_file_name()
self.check_key_upload_file_name()
self.object_name = self.__object_info['object_name']
self.columns = self.__object_info['columns']
self.is_skip = self.set_is_skip()
self.is_update_last_fetch_datetime = self.set_is_update_last_fetch_datetime()
self.last_fetch_datetime_file_name = self.set_fetch_datetime_file_name()
self.upload_file_name = self.set_upload_file_name()
self.__execute_datetime = execute_datetime
self.__validate()
def check_key_object_name(self) -> None:
'''
オブジェクト名チェック
'''
__check_key = 'object_name'
__check_type = str
self.__dict_check.check_key_exist_and_value_type(
self.__object_info, __check_key, __check_type)
def __validate(self) -> None:
self.__validate_required_properties()
self.__validate_optional_properties()
return
def check_key_columns(self) -> None:
'''
カラム情報チェック
'''
__key = 'columns'
__type = list
self.__dict_check.check_key_exist_and_value_type(
self.__object_info, __key, __type)
def __validate_required_properties(self) -> None:
self.__dict_checker.assert_key_exist(OBJECT_NAME_KEY)
self.__dict_checker.assert_data_type(OBJECT_NAME_KEY, OBJECT_NAME_TYPE)
self.__dict_checker.assert_key_exist(COLUMNS_KEY)
self.__dict_checker.assert_data_type(COLUMNS_KEY, COLUMNS_TYPE)
return
def check_key_is_skip(self,) -> None:
'''
スキップフラグ型チェック
'''
__check_key = 'is_skip'
__check_type = bool
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
def __validate_optional_properties(self) -> None:
if self.__dict_checker.check_key_exist(IS_SKIP_KEY):
self.__dict_checker.assert_data_type(IS_SKIP_KEY, IS_SKIP_TYPE)
if self.__dict_checker.check_key_exist(IS_UPDATE_LAST_FETCH_DATETIME_KEY):
self.__dict_checker.assert_data_type(IS_UPDATE_LAST_FETCH_DATETIME_KEY, IS_UPDATE_LAST_FETCH_DATETIME_TYPE)
if self.__dict_checker.check_key_exist(LAST_FETCH_DATETIME_FILE_NAME_KEY):
self.__dict_checker.assert_data_type(LAST_FETCH_DATETIME_FILE_NAME_KEY, LAST_FETCH_DATETIME_FILE_NAME_TYPE)
if self.__dict_checker.check_key_exist(UPLOAD_FILE_NAME_KEY):
self.__dict_checker.assert_data_type(UPLOAD_FILE_NAME_KEY, UPLOAD_FILE_NAME_TYPE)
if self.__dict_checker.check_key_exist(DATETIME_COLUMN_KEY):
self.__dict_checker.assert_data_type(DATETIME_COLUMN_KEY, DATETIME_COLUMN_TYPE)
return
def check_key_is_update_last_fetch_datetime(self) -> None:
'''
前回取得日時ファイル更新フラグチェック
'''
__check_key = 'is_update_last_fetch_datetime'
__check_type = bool
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
@property
def object_name(self) -> str:
return self.__object_info[OBJECT_NAME_KEY]
return
@property
def columns(self) -> list:
return self.__object_info[COLUMNS_KEY]
def check_key_last_fetch_datetime_file_name(self) -> None:
'''
前回取得日時ファイル名型チェック
'''
__check_key = 'last_fetch_datetime_file_name'
__check_type = str
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
@property
def is_skip(self) -> bool:
return self.__object_info[IS_SKIP_KEY] if self.__dict_checker.check_key_exist(IS_SKIP_KEY) else False
return
@property
def is_update_last_fetch_datetime(self) -> bool:
return self.__object_info[IS_UPDATE_LAST_FETCH_DATETIME_KEY] if self.__dict_checker.check_key_exist(IS_UPDATE_LAST_FETCH_DATETIME_KEY) else False
def check_key_upload_file_name(self) -> None:
'''
アップロードファイル名称型チェック
'''
__check_key = 'upload_file_name'
__check_type = str
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
@property
def last_fetch_datetime_file_name(self) -> str:
return self.__object_info[LAST_FETCH_DATETIME_FILE_NAME_KEY] if self.__dict_checker.check_key_exist(LAST_FETCH_DATETIME_FILE_NAME_KEY) else f'{self.__object_info[OBJECT_NAME_KEY]}.json'
return
def set_is_skip(self) -> bool:
'''
スキップフラグ設定
'''
__check_key = 'is_skip'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return False
@property
def upload_file_name(self) -> str:
return self.__object_info[UPLOAD_FILE_NAME_KEY].format(execute_datetime=self.__execute_datetime) if self.__dict_checker.check_key_exist(UPLOAD_FILE_NAME_KEY) else f'{self.__object_info[OBJECT_NAME_KEY]}_{self.__execute_datetime}'
def set_is_update_last_fetch_datetime(self) -> bool:
'''
前回取得日時ファイル更新フラグ設定
'''
__check_key = 'is_update_last_fetch_datetime'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return False
def set_fetch_datetime_file_name(self) -> str:
'''
前回取得日時ファイル名設定
'''
__check_key = 'last_fetch_datetime_file_name'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return self.__object_info['object_name'] + '.json'
def set_upload_file_name(self) -> str:
'''
アップロードファイル名称設定
'''
__check_key = 'upload_file_name'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key].format(execute_datetime=self.execute_datetime)
else:
return 'CRM_' + self.__object_info['object_name'] + '_' + self.execute_datetime
@property
def datetime_column(self) -> str:
return self.__object_info[DATETIME_COLUMN_KEY] if self.__dict_checker.check_key_exist(DATETIME_COLUMN_KEY) else DATETIME_COLUMN_DEFAULT_VALUE
class LastFetchDatetime():
def __init__(self, last_fetch_datetime_file_name, last_fetch_datetime_file_dict, execute_datetime) -> None:
self.__dict_check = DictCheck()
self.execute_datetime = execute_datetime
def __init__(self, last_fetch_datetime_file_dict, execute_datetime) -> None:
self.__dict_checker = DictChecker(last_fetch_datetime_file_dict)
self.__execute_datetime = execute_datetime
self.__last_fetch_datetime_file_dict = last_fetch_datetime_file_dict
self.last_fetch_datetime_file_name = last_fetch_datetime_file_name
self.check_key_last_fetch_datetime_from
self.check_key_last_fetch_datetime_to
self.last_fetch_datetime_from = self.__last_fetch_datetime_file_dict[
'last_fetch_datetime_from']
self.last_fetch_datetime_to = self.set_last_fetch_datetime_to()
self.__validate()
def check_key_last_fetch_datetime_from(self) -> None:
'''
データ取得開始日時チェック
'''
__check_key = 'last_fetch_datetime_from'
__regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ
self.__dict_check.check_key_exsit_and_regex(
self.__last_fetch_datetime_file_dict, __check_key, __regex_str)
def __validate(self) -> None:
if self.__dict_checker.check_key_exist(LAST_FETCH_DATETIME_FROM_KEY):
self.__dict_checker.assert_match_pattern(LAST_FETCH_DATETIME_FROM_KEY, DATE_PATTERN_YYYYMMDDTHHMMSSTZ)
if self.__dict_checker.check_key_exist(LAST_FETCH_DATETIME_TO_KEY):
self.__dict_checker.assert_match_pattern(LAST_FETCH_DATETIME_TO_KEY, DATE_PATTERN_YYYYMMDDTHHMMSSTZ)
def check_key_last_fetch_datetime_to(self) -> None:
'''
データ取得終了日時チェック
'''
__check_key = 'last_fetch_datetime_to'
__regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ
self.__dict_check.check_key_exsit_case_regex(
self.__last_fetch_datetime_file_dict, __check_key, __regex_str)
return
def set_last_fetch_datetime_to(self) -> str:
'''
データ取得終了日時設定
'''
__check_key = 'last_fetch_datetime_to'
if self.__dict_check.check_key_exist(self.__last_fetch_datetime_file_dict, __check_key):
return self.__last_fetch_datetime_file_dict[__check_key].format(execute_datetime=self.execute_datetime)
else:
return self.execute_datetime
@property
def last_fetch_datetime_from(self) -> str:
return self.__last_fetch_datetime_file_dict[LAST_FETCH_DATETIME_FROM_KEY]
@property
def last_fetch_datetime_to(self) -> str:
return self.__last_fetch_datetime_file_dict[LAST_FETCH_DATETIME_TO_KEY].format(execute_datetime=self.__execute_datetime) if self.__dict_checker.check_key_exist(LAST_FETCH_DATETIME_FROM_KEY) else self.__execute_datetime

View File

@ -1,125 +1,45 @@
from src.check_object_info_process import check_object_info # オブジェクト情報形式チェック処理
from src.convert_crm_csvdata_process import convert_crm_csvdata # CSV変換処理
from src.backup_crm_csvdata_process import backup_crm_csvdata # CSVバックアップ処理
from src.set_datetime_period_process import set_datetime_period # データ取得期間設定処理
from src.updload_result_data_process import updload_result_data # 取得処理実施結果アップロード処理
import gc
from src.backup_crm_csv_data_process import backup_crm_csv_data_process
from src.backup_crm_data_process import backup_crm_data_process
from src.check_object_info_process import check_object_info_process
from src.config.objects import FetchTargetObjects
from src.convert_crm_csv_data_process import convert_crm_csv_data_process
from src.copy_crm_csv_data_process import copy_crm_csv_data_process
from src.error.exceptions import MeDaCaCRMDataFetchException
from src.fetch_crm_data_process import fetch_crm_data # CRMデータ取得処理
from src.prepare_get_data_process import prepare_get_data # データ取得準備処理
from src.backup_crm_data_process import backup_crm_data # CRM電文データバックアップ処理
from src.upload_last_fetch_datetime_process import upload_last_fetch_datetime # 前回取得日時ファイル更新
from src.copy_crm_csvdata_process import copy_crm_csvdata # CSVアップロード処理
from src.util.execute_datetime import ExecuteDateTime
from src.fetch_crm_data_process import fetch_crm_data_process
from src.prepare_data_fetch_process import prepare_data_fetch_process
from src.set_datetime_period_process import set_datetime_period_process
from src.system_var.constants import OBJECT_NAME_KEY
from src.upload_last_fetch_datetime_process import upload_last_fetch_datetime_process
from src.upload_result_data_process import upload_result_data_process
from src.util.logger import logger_instance as logger
def main() -> None:
def controller() -> None:
"""コントロール処理"""
try:
# ① CRMデータ取得処理開始ログを出力する
logger.info('I-CTRL-01 CRMデータ取得処理を開始します')
fetch_target_objects = None # オブジェクト情報ファイル用オブジェクト
execute_datetime = None # 実行日次文字列
date_path = None # 実行日次のパス文字列
process_result = None # オブジェクトごとの実行結果JSON
# ② データ取得準備処理を呼び出す
logger.info('I-CTRL-02 データ取得準備処理呼び出し')
fetch_target_objects, execute_datetime, date_path, process_result = prepare_get_data()
fetch_target_objects, execute_datetime, process_result = prepare_data_fetch_process()
# ③ object_infoのobjectsキーの値の件数分ループする
logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始')
for object_info in fetch_target_objects:
try:
# 1. オブジェクト処理結果の初期化
target_object = None # オブジェクトごとの情報格納用オブジェクト
last_fetch_datetime = None # オブジェクトごとの取得日付格納用オブジェクト
sf_object_jsons = None # オブジェクトごとのSalesforce取得変数JSON
csv_object = None # CSVオブジェクト
process_result[object_info.get('object_name')] = 'fail' # オブジェクト処理結果
logger.debug(f'D-CTRL-04 対象のオブジェクト情報を出力します オブジェクト情報:{object_info}')
# 2. オブジェクト情報形式チェック処理を呼び出す
logger.info('I-CTRL-05 オブジェクト情報形式チェック処理呼び出し')
target_object = check_object_info(object_info, execute_datetime)
# 3. 処理対象のオブジェクト名をログ出力する
logger.info(
f'I-CTRL-06 [{target_object.object_name}]のデータ取得を開始します')
# 4. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する
if target_object.is_skip is True:
logger.info(
f'I-CTRL-07 [{target_object.object_name}]のデータ取得処理をスキップします')
continue
# 5. データ取得期間設定処理を呼び出す
logger.info(
f'I-CTRL-08 [{target_object.object_name}]のデータ取得期間設定処理呼び出し')
last_fetch_datetime = set_datetime_period(target_object, execute_datetime)
# 6. CRMデータ取得処理を呼び出す
logger.info(
f'I-CTRL-09 [{target_object.object_name}]のデータ取得処理呼び出し')
sf_object_jsons = fetch_crm_data(target_object, last_fetch_datetime)
# 7. 出力ファイル名をログ出力する
logger.info(
f'I-CTRL-10 [{target_object.object_name}] の出力ファイル名は [{target_object.upload_file_name}]となります')
# 8. CRM電文データバックアップ処理を呼び出す
logger.info(
f'I-CTRL-11 [{target_object.object_name}] CRM電文データバックアップ処理呼び出し')
backup_crm_data(target_object.object_name, sf_object_jsons, date_path)
# 9. CSV変換処理を呼び出す
logger.info(
f'I-CTRL-12 [{target_object.object_name}] CSV変換処理呼び出し')
csv_object = convert_crm_csvdata(target_object, sf_object_jsons)
# 10. CSVバックアップ処理を呼び出す
logger.info(
f'I-CTRL-13 [{target_object.object_name}] CSVデータバックアップ処理呼び出し')
backup_crm_csvdata(target_object, date_path, csv_object)
# 11. CSVアップロード処理を呼び出す
logger.info(
f'I-CTRL-14 [{target_object.object_name}] CSVデータアップロード処理呼び出し')
copy_crm_csvdata(target_object, date_path)
# 12. 前回取得日時ファイル更新処理を呼びだす
logger.info(
f'I-CTRL-15 [{target_object.object_name}] 前回取得日時ファイル更新処理呼び出し')
upload_last_fetch_datetime(target_object, last_fetch_datetime)
# 13. オブジェクト処理結果の更新
process_result[target_object.object_name] = 'success'
# 14. オブジェクトのアップロードが完了した旨をログに出力する
logger.info(f'I-CTRL-16 [{target_object.object_name}] 処理正常終了')
except MeDaCaCRMDataFetchException as e:
logger.info(f'{e.error_id} {e}')
logger.info(
f'I-ERR-03 [{object_info.get("object_name")}] の[{e.func_name}]でエラーが発生しました 次のオブジェクトの処理に移行します', exc_info=True)
continue
except Exception as e:
logger.info(
f'I-ERR-04 [{object_info.get("object_name")}] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します', e, exc_info=True)
continue
process_result = fetch_crm_data(fetch_target_objects, execute_datetime, process_result)
# ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する
logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{process_result}]')
# ⑤ 取得処理実施結果アップロード処理を呼び出す
logger.info('I-CTRL-19 CRM_取得処理実施結果ファイルアップロード処理開始')
updload_result_data(process_result, date_path)
upload_result_data_process(process_result, execute_datetime)
# ⑥ 最終結果をチェックし、チェック結果をログに出力
if not all([v == 'success' for v in process_result.values()]):
@ -142,5 +62,136 @@ def main() -> None:
return exit(0)
if __name__ == '__main__':
main()
def fetch_crm_data(fetch_target_objects: FetchTargetObjects, execute_datetime: ExecuteDateTime, process_result: dict):
    """Loop over the target-object definitions and fetch each object's data.

    Each object's result is pre-marked ``'fail'`` and flipped to
    ``'success'`` only after its per-object pipeline completes, so an
    exception part-way through leaves the correct status behind.

    Parameters
    ----------
    fetch_target_objects : FetchTargetObjects
        CRM object-definition iterator.
    execute_datetime : ExecuteDateTime
        Execution-datetime instance.
    process_result : dict
        Per-object result dict, mutated in place.

    Returns
    -------
    process_result : dict
        The same dict, with one 'success'/'fail' entry per object.
    """
    for object_info in fetch_target_objects:
        try:
            process_result[object_info.get(OBJECT_NAME_KEY)] = 'fail'
            # BUG FIX: fetch_crm_data_per_object() takes only
            # (object_info, execute_datetime); the previous call passed
            # process_result as a third positional argument, raising
            # TypeError for every object.
            fetch_crm_data_per_object(object_info, execute_datetime)
            process_result[object_info.get(OBJECT_NAME_KEY)] = 'success'
        except MeDaCaCRMDataFetchException as e:
            # Known pipeline error: log and move on to the next object.
            logger.info(f'{e.error_id} {e}')
            logger.info(
                f'I-ERR-03 [{object_info.get(OBJECT_NAME_KEY)}] の[{e.func_name}]でエラーが発生しました 次のオブジェクトの処理に移行します', exc_info=True)
        except Exception:
            # BUG FIX: the exception object was passed as a stray positional
            # logging argument with no %-placeholder, which itself triggers a
            # logging formatting error; exc_info=True already records it.
            logger.info(
                f'I-ERR-04 [{object_info.get(OBJECT_NAME_KEY)}] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します', exc_info=True)
    return process_result
def fetch_crm_data_per_object(object_info: dict, execute_datetime: ExecuteDateTime) -> None:
    """Fetch one object's CRM data and upload it to the import folder.

    NOTE(review): the visible caller invokes this with a third positional
    argument (process_result) that this signature does not accept — confirm
    which side is correct.

    Parameters
    ----------
    object_info : dict
        Target-object definition to process.
    execute_datetime : ExecuteDateTime
        Execution-datetime instance.

    Returns
    -------
    None

    Raises
    ------
    FileNotFoundException
        If a required file is missing on S3.
    InvalidConfigException
        If the object-definition entry is malformed.
    """
    # 1. Log the raw object definition for debugging.
    logger.debug(f'D-CTRL-04 対象のオブジェクト情報を出力します オブジェクト情報:{object_info}')
    # 2. Validate the object definition and build the TargetObject wrapper.
    logger.info('I-CTRL-05 オブジェクト情報形式チェック処理呼び出し')
    target_object = check_object_info_process(object_info, execute_datetime)
    target_object_name = target_object.object_name
    # 3. Log which object is being processed.
    logger.info(
        f'I-CTRL-06 [{target_object_name}]のデータ取得を開始します')
    # 4. Skip this object entirely when is_skip is set.
    if target_object.is_skip is True:
        logger.info(
            f'I-CTRL-07 [{target_object_name}]のデータ取得処理をスキップします')
        return
    # 5. Determine the fetch period (from/to datetimes) for this object.
    logger.info(
        f'I-CTRL-08 [{target_object_name}]のデータ取得期間設定処理呼び出し')
    last_fetch_datetime = set_datetime_period_process(target_object, execute_datetime)
    # 6. Fetch the object's data from the CRM.
    logger.info(
        f'I-CTRL-09 [{target_object_name}]のデータ取得処理呼び出し')
    crm_data_response = fetch_crm_data_process(target_object, last_fetch_datetime)
    # 7. Log the output file name that will be used.
    logger.info(
        f'I-CTRL-10 [{target_object_name}] の出力ファイル名は [{target_object.upload_file_name}]となります')
    # 8. Back up the raw CRM response (JSON).
    logger.info(
        f'I-CTRL-11 [{target_object_name}] CRM電文データバックアップ処理呼び出し')
    backup_crm_data_process(target_object_name, crm_data_response, execute_datetime)
    # 9. Convert the response to CSV.
    logger.info(
        f'I-CTRL-12 [{target_object.object_name}] CSV変換処理呼び出し')
    csv_string = convert_crm_csv_data_process(target_object, crm_data_response)
    # 10. Back up the converted CSV.
    logger.info(
        f'I-CTRL-13 [{target_object_name}] CSVデータバックアップ処理呼び出し')
    backup_crm_csv_data_process(target_object, execute_datetime, csv_string)
    # 11. Copy the CSV into the import folder.
    logger.info(
        f'I-CTRL-14 [{target_object_name}] CSVデータアップロード処理呼び出し')
    copy_crm_csv_data_process(target_object, execute_datetime)
    # 12. Drop the large payloads before the next object to cap peak memory.
    del crm_data_response
    del csv_string
    gc.collect()
    # 13. Persist the updated last-fetch datetime file.
    logger.info(
        f'I-CTRL-15 [{target_object_name}] 前回取得日時ファイル更新処理呼び出し')
    upload_last_fetch_datetime_process(target_object, last_fetch_datetime)
    # 14. Log successful completion for this object.
    logger.info(f'I-CTRL-16 [{target_object_name}] 処理正常終了')
    return

View File

@ -0,0 +1,51 @@
from src.config.objects import TargetObject
from src.converter.converter import CSVStringConverter
from src.error.exceptions import DataConvertException
from src.system_var.constants import CONV_JP_NAME
from src.util.logger import logger_instance as logger
def convert_crm_csv_data_process(target_object: TargetObject, crm_data_response: dict) -> str:
    """Convert fetched CRM data to a CSV string.

    Parameters
    ----------
    target_object : TargetObject
        Target-object configuration instance.
    crm_data_response : dict
        Salesforce object data.

    Returns
    -------
    csv_string : str
        The converted CSV data.

    Raises
    ------
    DataConvertException
        If the conversion fails.
    """
    # (1) Log the start of the CSV conversion step.
    target_object_name = target_object.object_name
    logger.info(f'I-CONV-01 [{target_object_name}] のCSV変換処理を開始します')
    try:
        # (2) BUG FIX: the converter instance was created and discarded and
        #     convert() was then called on the CLASS (missing self), which
        #     raises TypeError. Call convert() on the instance instead.
        converter = CSVStringConverter(target_object, crm_data_response)
        csv_string = converter.convert()
        logger.debug(f'D-CONV-02 [{target_object_name}] のCSV変換処理 正常終了')
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise DataConvertException(
            'E-CONV-01', CONV_JP_NAME, f'[{target_object_name}] CSV変換に失敗しました エラー内容:[{e}]') from e
    # (3) Log the end of the CSV conversion step.
    logger.info(f'I-CONV-03 [{target_object_name}] のCSV変換処理を終了します')
    # (4) Hand the CSV string back to the caller.
    return csv_string

View File

@ -1,25 +0,0 @@
from src.system_var.constants import CONV_JP_NAME
from src.converter.converter import CSVStringConverter
from src.error.exceptions import DataConvertException
from src.util.logger import logger_instance as logger
def convert_crm_csvdata(target_object, sf_object_jsons):
    """Convert Salesforce JSON records to a CSV object via CSVStringConverter.

    NOTE(review): this is the pre-rename version this commit deletes in
    favour of ``convert_crm_csv_data_process``. In this old API the
    conversion runs inside CSVStringConverter.__init__.

    Raises
    ------
    DataConvertException
        If the conversion fails.
    """
    # (1) Log the start of the CSV conversion step.
    logger.info(f'I-CONV-01 [{target_object.object_name}] のCSV変換処理を開始します')
    try:
        # (2) Constructing the converter performs the conversion eagerly.
        csv_object = CSVStringConverter(target_object, sf_object_jsons)
        logger.debug(f'D-CONV-02 [{target_object.object_name}] のCSV変換処理 正常終了')
    except Exception as e:
        raise DataConvertException(
            'E-CONV-01', CONV_JP_NAME, f'[{target_object.object_name}] CSV変換に失敗しました エラー内容:[{e}]')
    # (3) Log the end of the CSV conversion step.
    logger.info(f'I-CONV-03 [{target_object.object_name}] のCSV変換処理を終了します')
    # (4) Return the converted CSV object.
    return csv_object

View File

@ -0,0 +1,55 @@
import re
from datetime import datetime
from src.config.objects import TargetObject
from src.system_var.constants import (CRM_DATETIME_FORMAT, CSV_FALSE_VALUE,
CSV_TRUE_VALUE, YYYYMMDDHHMMSS, DATE_PATTERN_YYYYMMDDHHMMSSFFF_UTC)
class ConvertStrategyFactory:
    """Selects and applies the appropriate CSV value conversion for a raw value."""

    def __init__(self) -> None:
        # One reusable instance of each stateless conversion strategy.
        self.__none_value_convert_strategy = NoneValueConvertStrategy()
        self.__float_convert_strategy = FloatConvertStrategy()
        self.__boolean_convert_strategy = BooleanConvertStrategy()
        self.__datetime_convert_strategy = DatatimeConvertStrategy()

    def create(self, value):
        """Return *value* converted for CSV output; unknown types pass through unchanged.

        Uses isinstance() instead of ``type(x) == T`` comparisons (idiomatic
        and subclass-tolerant). Note bool is checked on its own branch: bool
        is not a float subclass, so the float branch cannot capture it.
        """
        converted_value = value
        if value is None:
            converted_value = self.__none_value_convert_strategy.convert_value()
        # Floats may arrive in exponent notation; normalise to integer form.
        elif isinstance(value, float):
            converted_value = self.__float_convert_strategy.convert_value(value)
        # Map Python booleans to the SQL-compatible CSV truth values.
        elif isinstance(value, bool):
            converted_value = self.__boolean_convert_strategy.convert_value(value)
        elif isinstance(value, str) and re.fullmatch(DATE_PATTERN_YYYYMMDDHHMMSSFFF_UTC, value):
            converted_value = self.__datetime_convert_strategy.convert_value(value)
        return converted_value
class NoneValueConvertStrategy:
    """Conversion strategy for missing values: render None as an empty CSV field."""

    def convert_value(self) -> str:
        """Return the empty string used to represent a null value in the CSV."""
        return ""
class BooleanConvertStrategy:
    """Conversion strategy mapping Python booleans to the CSV truth-value constants."""

    def convert_value(self, convert_value: bool):
        """Return CSV_TRUE_VALUE for True, CSV_FALSE_VALUE otherwise.

        The original annotations (``convert_value: str``, ``-> bool``) were
        inverted: the input is the boolean, the output is the CSV constant.
        The return type is whatever CSV_TRUE_VALUE/CSV_FALSE_VALUE are
        declared as (presumably str) — left unannotated; TODO confirm.
        """
        return CSV_TRUE_VALUE if convert_value is True else CSV_FALSE_VALUE
class DatatimeConvertStrategy:
    """Conversion strategy reformatting CRM datetime strings for CSV output.

    (Class name spelling is kept as-is: ConvertStrategyFactory references it.)
    """

    def convert_value(self, convert_value: str) -> str:
        """Parse *convert_value* with CRM_DATETIME_FORMAT and re-render it as YYYYMMDDHHMMSS."""
        parsed = datetime.strptime(convert_value, CRM_DATETIME_FORMAT)
        return parsed.strftime(YYYYMMDDHHMMSS)
class FloatConvertStrategy:
    """Conversion strategy collapsing floats (e.g. exponent notation) to plain ints."""

    def convert_value(self, convert_value: float) -> int:
        """Return *convert_value* as an int.

        The original parameter annotation said ``str``, but the factory only
        routes ``float`` values here. int() truncates toward zero — assumes
        upstream values are integral floats (TODO confirm).
        """
        return int(convert_value)

View File

@ -1,33 +1,35 @@
import csv
import io
import re
from datetime import datetime
from src.system_var.constants import (CRM_DATETIME_FORMAT, CSV_FALSE_VALUE,
CSV_TRUE_VALUE, YYYYMMDDHHMMSS)
from src.config.objects import TargetObject
from src.converter.convert_factory import ConvertStrategyFactory
class CSVStringConverter:
def __init__(self, target_object, sf_object_jsons) -> None:
def __init__(self, target_object: TargetObject, sf_object_jsons: dict) -> None:
self.__target_object = target_object
self.__sf_object_jsons = sf_object_jsons
self.__extracted_sf_object_jsons = self.extract_sf_object_jsons()
self.csv_data = self.convert_to_csv()
self.csv_buffer = self.write_csv()
self.__convert_strategy_factory = ConvertStrategyFactory()
def extract_sf_object_jsons(self) -> list:
def convert(self) -> str:
extracted_sf_object_jsons = self.__extract_sf_object_jsons()
csv_data = self.__convert_to_csv(extracted_sf_object_jsons)
csv_string = self.__write_csv_string(csv_data)
return csv_string
def __extract_sf_object_jsons(self) -> list:
try:
extracted_sf_object_jsons = []
for sf_object_json in self.__sf_object_jsons:
extracted_sf_object_jsons.append(
self.extract_necessary_props_from(sf_object_json))
self.__extract_necessary_props_from(sf_object_json))
return extracted_sf_object_jsons
except Exception as e:
raise Exception('必要なjsonのデータ抽出に失敗しました')
raise Exception('必要なjsonのデータ抽出に失敗しました', e)
def extract_necessary_props_from(self, sf_object_json) -> dict:
def __extract_necessary_props_from(self, sf_object_json) -> dict:
try:
clone_sf_object = {**sf_object_json}
@ -39,19 +41,18 @@ class CSVStringConverter:
return uppercase_key_sf_object
except Exception as e:
raise Exception('必要なjsonのデータ成形に失敗しました')
raise Exception('必要なjsonのデータ成形に失敗しました', e)
def convert_to_csv(self) -> list:
def __convert_to_csv(self, extracted_sf_object_jsons) -> list:
try:
columns = self.__target_object.columns
csv_data = []
for i, json_object in enumerate(self.__extracted_sf_object_jsons, 1):
for i, json_object in enumerate(extracted_sf_object_jsons, 1):
csv_row = []
for column in columns:
v = json_object[column.upper()]
converted_value = CSVStringConverterFactory(
v).value_convert()
converted_value = self.__convert_strategy_factory.create(v)
csv_row.append(converted_value)
@ -62,74 +63,16 @@ class CSVStringConverter:
raise Exception(
f'CSV変換に失敗しました カラム名:[{column}] 行番号: [{i}] エラー内容:[{e}]')
def write_csv(self) -> str:
def __write_csv_string(self, csv_data) -> str:
try:
with io.StringIO(newline='') as string_stream:
writer = csv.writer(string_stream, delimiter=',', lineterminator='\r\n',
doublequote=True, quotechar='"', quoting=csv.QUOTE_ALL, strict=True)
writer.writerow(self.__target_object.columns)
writer.writerows(self.csv_data)
writer.writerows(csv_data)
csv_value = string_stream.getvalue()
return csv_value
except Exception as e:
raise Exception('csvデータの取得に失敗しました')
class NoneValueConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> str:
return ''
class BooleanConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> bool:
return CSV_TRUE_VALUE if self.__convert_value is True else CSV_FALSE_VALUE
class DatatimeConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> str:
return datetime.strptime(self.__convert_value, CRM_DATETIME_FORMAT).strftime(YYYYMMDDHHMMSS)
class FloatConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> int:
return int(self.__convert_value)
class CSVStringConverterFactory:
def __init__(self, v) -> None:
self.__value = v
def value_convert(self):
converted_value = self.__value
if self.__value is None:
converted_value = NoneValueConverter(self.__value).value
# 指数表記で取得できるパターン。指数表記を整数表記に変換する。
elif type(self.__value) == float:
converted_value = FloatConverter(self.__value).value
# SQLの真偽値に対応するために変換する
elif type(self.__value) == bool:
converted_value = BooleanConverter(self.__value).value
elif type(self.__value) == str and re.fullmatch(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.000\+0000', self.__value):
converted_value = DatatimeConverter(self.__value).value
return converted_value
raise Exception('csvデータの取得に失敗しました', e)

View File

@ -0,0 +1,57 @@
from src.aws.s3 import BackupBucket, DataBucket
from src.config.objects import TargetObject
from src.error.exceptions import FileUploadException
from src.system_var.constants import UPLD_JP_NAME
from src.system_var.environments import (CRM_BACKUP_BUCKET, CRM_IMPORT_DATA_BACKUP_FOLDER,
CRM_IMPORT_DATA_FOLDER, IMPORT_DATA_BUCKET)
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def copy_crm_csv_data_process(target_object: TargetObject, execute_datetime: ExecuteDateTime):
    """
    CSV upload process: copy the converted CSV for *target_object* from the
    CRM backup bucket into the data-import bucket.

    Parameters
    ----------
    target_object : TargetObject
        Fetch-target object information instance.
    execute_datetime : ExecuteDateTime
        Execution-datetime instance; its to_path() forms the backup key prefix.

    Returns
    -------
    None

    Raises
    ------
    FileUploadException
        Raised when the S3 copy of the CSV file fails.
    """
    # (1) Log the start of the CSV upload step.
    target_object_name = target_object.object_name
    upload_file_name = target_object.upload_file_name
    logger.info(
        f'I-UPLD-01 [{target_object_name}] のCSVデータアップロード処理を開始します ファイル名:[{upload_file_name}.csv]')
    try:
        # (2) Copy the converted CSV stored in the CRM backup bucket into the
        # data-import bucket. NOTE(review): only the source bucket/key are
        # passed here — the destination key is presumably derived inside
        # DataBucket.put_csv_from; confirm against its implementation.
        data_bucket = DataBucket()
        backup_bucket = BackupBucket()
        data_bucket.put_csv_from(str(backup_bucket), f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{execute_datetime.to_path()}/{upload_file_name}.csv')
        logger.debug(
            f'D-UPLD-02 [{target_object_name}] のCSVデータアップロード 正常終了')
    except Exception as e:
        # Any failure is wrapped into the project's upload exception with the
        # E-UPLD-01 error id so the controller can report it uniformly.
        raise FileUploadException(
            'E-UPLD-01', UPLD_JP_NAME, f'[{target_object_name}] CSVデータのアップロードに失敗しました ファイル名:[{upload_file_name}.csv] エラー内容:[{e}]')
    # (3) Log completion of the CSV upload step.
    logger.info(
        f'I-UPLD-03 [{target_object_name}] のCSVデータのアップロード処理を終了します')
    # (4) Hand over to the next step.
    return

View File

@ -1,32 +0,0 @@
from src.aws.s3 import S3ResourceNonBucket
from src.system_var.constants import UPLD_JP_NAME
from src.system_var.environments import (CRM_BACKUP_BUCKET, CRM_IMPORT_DATA_BACKUP_FOLDER,
CRM_IMPORT_DATA_FOLDER, IMPORT_DATA_BUCKET)
from src.error.exceptions import FileUploadException
from src.util.logger import logger_instance as logger
def copy_crm_csvdata(target_object, date_path):
# ① CSVデータアップロード処理の開始ログを出力する
logger.info(
f'I-UPLD-01 [{target_object.object_name}] のCSVデータアップロード処理を開始します ファイル名:[{target_object.upload_file_name}.csv]')
try:
# ② CRMバックアップ保管用バケットに保管した変換後のCSVデータをデータ取込バケットにコピーする
s3_resource_non_bucket = S3ResourceNonBucket()
s3_resource_non_bucket.copy(CRM_BACKUP_BUCKET, f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{date_path}/{target_object.upload_file_name}.csv',
IMPORT_DATA_BUCKET, f'{CRM_IMPORT_DATA_FOLDER}/{target_object.upload_file_name}.csv')
logger.debug(
f'D-UPLD-02 [{target_object.object_name}] のCSVデータアップロード 正常終了')
except Exception as e:
raise FileUploadException(
'E-UPLD-01', UPLD_JP_NAME, f'[{target_object.object_name}] CSVデータのアップロードに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]')
# ③ CSVデータアップロード処理の終了ログを出力する
logger.info(
f'I-UPLD-03 [{target_object.object_name}] のCSVデータのアップロード処理を終了します')
# ④ 次の処理へ移行する
return

View File

@ -4,7 +4,7 @@ from abc import ABCMeta
class MeDaCaCRMDataFetchException(Exception, metaclass=ABCMeta):
"""MeDaCaシステム固有のカスタムエラークラス"""
def __init__(self, error_id: str, func_name, message) -> None:
def __init__(self, error_id: str, func_name: str, message: str) -> None:
super().__init__(message)
self.func_name = func_name
self.error_id = error_id

View File

@ -2,6 +2,10 @@ from requests.exceptions import ConnectTimeout, ReadTimeout
from tenacity import retry, stop_after_attempt
from tenacity.wait import wait_exponential
from src.config.objects import TargetObject, LastFetchDatetime
from src.error.exceptions import DataConvertException, SalesforceAPIException
from src.salesforce.salesforce_api import SalesforceApiClient
from src.salesforce.soql_builder import SOQLBuilder
from src.system_var.constants import FETCH_JP_NAME
from src.system_var.environments import (CRM_AUTH_TIMEOUT,
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT,
@ -14,20 +18,38 @@ from src.system_var.environments import (CRM_AUTH_TIMEOUT,
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL,
CRM_GET_RECORD_COUNT_TIMEOUT)
from src.error.exceptions import DataConvertException, SalesforceAPIException
from src.salesforce.salesforce_api import SalesForceCount, SalesForceData
from src.util.logger import logger_instance as logger
def fetch_crm_data(target_object, last_fetch_datetime):
def fetch_crm_data_process(target_object: TargetObject, last_fetch_datetime: LastFetchDatetime):
"""
CRMデータ取得処理
Parameters
----------
target_object : TargetObject
取得対象オブジェクト情報インスタンス
last_fetch_datetime : LastFetchDatetime
前回取得日時情報インスタンス
Returns
-------
sf_object_dict : dict
Salesforceオブジェクトデータ
Raises
------
SalesforceAPIException
SalseforceのAPI実行失敗が発生した場合
DataConvertException
データ変換が失敗した場合
"""
# ① CRMデータ取得処理開始ログを出力する
logger.info(
f'I-FETCH-01 [{target_object.object_name}] のCRMからのデータ取得処理を開始します')
object_name = target_object.object_name
columns = ','.join(target_object.columns)
last_fetch_datetime_from = last_fetch_datetime.last_fetch_datetime_from
last_fetch_datetime_to = last_fetch_datetime.last_fetch_datetime_to
global count_contime_counter, count_readtime_counter, count_counter, data_contime_counter, data_readtime_counter, data_counter
count_contime_counter = 1
@ -41,8 +63,10 @@ def fetch_crm_data(target_object, last_fetch_datetime):
# ② 取得対象オブジェクトの取得期間内のレコード件数を取得する
logger.info(f'I-FETCH-02 [{object_name}] の件数取得を開始します')
record_count = fetch_sf_count_retry(
object_name, last_fetch_datetime_from, last_fetch_datetime_to)
soql_builder = SOQLBuilder(target_object, last_fetch_datetime)
count_soql = soql_builder.create_count_soql()
record_count = fetch_record_count_retry(count_soql, object_name)
logger.info(f'I-FETCH-03 [{object_name}] の件数:[{record_count}]')
@ -54,8 +78,9 @@ def fetch_crm_data(target_object, last_fetch_datetime):
# ③ 取得対象オブジェクトのレコードを取得する
logger.info(f'I-FETCH-04 [{object_name}] のレコード取得を開始します')
record_generator = fetch_sf_data_retry(
columns, object_name, last_fetch_datetime_from, last_fetch_datetime_to)
fetch_soql = soql_builder.create_fetch_soql()
record_all = fetch_sf_data_retry(fetch_soql, object_name)
except Exception as e:
raise SalesforceAPIException(
@ -65,10 +90,7 @@ def fetch_crm_data(target_object, last_fetch_datetime):
# ④ 取得対象オブジェクトをJSONに変換
logger.info(f'I-FETCH-05 [{object_name}] のレコードをJSONに変換します')
sf_object_jsons = []
for record in record_generator:
sf_object_jsons.append(record)
crm_data_response = [record for record in record_all]
except Exception as e:
raise DataConvertException(
@ -78,19 +100,19 @@ def fetch_crm_data(target_object, last_fetch_datetime):
logger.info(f'I-FETCH-06 [{object_name}] のレコード取得が成功しました')
# ⑥ 次の処理へ移行する
return sf_object_jsons
return crm_data_response
@retry(
wait=wait_exponential(multiplier=CRM_GET_RECORD_COUNT_RETRY_INTERVAL,
min=CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, max=CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL),
stop=stop_after_attempt(CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT))
def fetch_sf_count_retry(object_name, last_update_datetime_from, last_update_datetime_to):
def fetch_record_count_retry(soql: str, object_name: str):
try:
global count_contime_counter, count_readtime_counter, count_counter
saleforce_count = SalesForceCount()
return saleforce_count.fetch_sf_count(object_name, last_update_datetime_from, last_update_datetime_to)
salesforce_api_client = SalesforceApiClient()
return salesforce_api_client.fetch_sf_count(soql)
except ConnectTimeout as e:
if count_contime_counter < CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT:
@ -117,12 +139,12 @@ def fetch_sf_count_retry(object_name, last_update_datetime_from, last_update_dat
wait=wait_exponential(multiplier=CRM_FETCH_RECORD_RETRY_INTERVAL,
min=CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, max=CRM_FETCH_RECORD_RETRY_MAX_INTERVAL),
stop=stop_after_attempt(CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT))
def fetch_sf_data_retry(columns, object_name, last_update_datetime_from, last_update_datetime_to):
def fetch_sf_data_retry(soql: str, object_name: str):
try:
global data_contime_counter, data_readtime_counter, data_counter
saleforce_data = SalesForceData()
return saleforce_data.fetch_sf_data(columns, object_name, last_update_datetime_from, last_update_datetime_to)
salesforce_api_client = SalesforceApiClient()
return salesforce_api_client.fetch_sf_data(soql)
except ConnectTimeout as e:
if data_contime_counter < CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT:

View File

@ -1,24 +1,46 @@
from datetime import datetime
from src.aws.s3 import ConfigBucket
from src.config.objects import FetchTargetObjects
from src.system_var.constants import PRE_JP_NAME, YYYYMMDDTHHMMSSTZ
from src.system_var.environments import (CRM_CONFIG_BUCKET, OBJECT_INFO_FILENAME,
OBJECT_INFO_FOLDER)
from src.error.exceptions import FileNotFoundException, InvalidConfigException
from src.parser.json_parse import JsonParser
from src.system_var.constants import PRE_JP_NAME
from src.system_var.environments import (CRM_CONFIG_BUCKET, OBJECT_INFO_FILENAME,
OBJECT_INFO_FOLDER)
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def prepare_get_data():
def prepare_data_fetch_process():
"""
データ取得準備処理
Parameters
----------
なし
Returns
-------
fetch_target_objects : FetchTargetObjects
CRMオブジェクト情報インスタンス
execute_datetime : ExecuteDateTime
実行日次取得インスタンス
process_result : dict
取得処理実行結果辞書オブジェクト
Raises
------
FileNotFoundException
S3上のファイルが存在しない場合
InvalidConfigException
オブジェクト情報定義が不正だった場合
"""
# ① データ取得準備処理の開始ログを出力する
logger.info('I-PRE-01 データ取得準備処理を開始します')
# ② 取得処理開始年月日時分秒を控える
execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ)
date_path = execute_datetime.rstrip('000Z').translate(
str.maketrans({'-': '/', 'T': '/', ':': None, '.': None}))
execute_datetime = ExecuteDateTime()
logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}')
@ -29,7 +51,7 @@ def prepare_get_data():
f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{object_info_file_s3_path}]')
config_bucket = ConfigBucket()
object_info_file_json = config_bucket.get_object_info_file()
object_info_file_str = config_bucket.get_object_info_file()
logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました')
@ -41,7 +63,7 @@ def prepare_get_data():
# ④ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する
logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします')
json_parser = JsonParser(object_info_file_json)
json_parser = JsonParser(object_info_file_str)
object_info_file_dict = json_parser.json_parser()
logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました')
@ -69,4 +91,4 @@ def prepare_get_data():
logger.info('I-PRE-09 データ取得準備処理を終了します')
# ⑧ 次の処理へ移行する
return(fetch_target_objects, execute_datetime, date_path, process_result)
return(fetch_target_objects, execute_datetime, process_result)

View File

@ -1,61 +1,28 @@
from simple_salesforce import Salesforce
from src.system_var.environments import (CRM_AUTH_DOMAIN, CRM_AUTH_MAX_RETRY_ATTEMPT,
CRM_AUTH_RETRY_INTERVAL,
CRM_AUTH_RETRY_MAX_INTERVAL,
CRM_AUTH_RETRY_MIN_INTERVAL, CRM_AUTH_TIMEOUT,
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT,
CRM_FETCH_RECORD_RETRY_INTERVAL,
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL,
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL,
from src.system_var.environments import (CRM_AUTH_DOMAIN, CRM_AUTH_TIMEOUT,
CRM_FETCH_RECORD_TIMEOUT,
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT,
CRM_GET_RECORD_COUNT_RETRY_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL,
CRM_GET_RECORD_COUNT_TIMEOUT, CRM_USER_NAME,
CRM_USER_PASSWORD, CRM_USER_SECURITY_TOKEN)
FETCH_SOQL = """SELECT {column_or_expression} FROM {object_name}
WHERE SystemModStamp > {last_update_datetime_from}
AND SystemModStamp <= {last_update_datetime_to}
"""
class SalesfoeceApi():
class SalesforceApiClient():
def __init__(self) -> None:
self.__sf = Salesforce(username=CRM_USER_NAME, password=CRM_USER_PASSWORD,
security_token=CRM_USER_SECURITY_TOKEN,
domain=CRM_AUTH_DOMAIN
)
def sf_query(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
def query(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
return self.__sf.query(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout)))
def sf_query_all_iter(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
return self.__sf.query_all_iter(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout)))
def query_all(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
return self.__sf.query_all(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout)))
class SalesForceCount():
def fetch_sf_count(self, object_name, last_update_datetime_from, last_update_datetime_to):
count_soql = FETCH_SOQL.format(
column_or_expression='COUNT(Id)',
object_name=object_name,
last_update_datetime_from=last_update_datetime_from,
last_update_datetime_to=last_update_datetime_to
)
self.__sf = SalesfoeceApi()
count_res = self.__sf.sf_query(
count_soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_GET_RECORD_COUNT_TIMEOUT)
def fetch_sf_count(self, soql: str):
count_res = self.query(soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_GET_RECORD_COUNT_TIMEOUT)
return count_res.get('records')[0].get('expr0')
class SalesForceData():
def fetch_sf_data(self, columns, object_name, last_update_datetime_from, last_update_datetime_to):
soql = FETCH_SOQL.format(
column_or_expression=columns,
object_name=object_name,
last_update_datetime_from=last_update_datetime_from,
last_update_datetime_to=last_update_datetime_to
)
self.__sf = SalesfoeceApi()
return self.__sf.sf_query_all_iter(soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_FETCH_RECORD_TIMEOUT)
def fetch_sf_data(self, soql: str):
return self.query_all(soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_FETCH_RECORD_TIMEOUT)

View File

@ -0,0 +1,34 @@
from src.config.objects import TargetObject, LastFetchDatetime
class SOQLBuilder:
    """Builds the SOQL statements used to count and fetch CRM records whose
    datetime column falls inside the last-fetch window."""

    def __init__(self, target_object: "TargetObject", last_fetch_datetime: "LastFetchDatetime") -> None:
        # Shared template: only the SELECT expression differs between the
        # count query and the record-fetch query.
        self.__SELECT_SOQL = """SELECT {column_or_expression} FROM {object_name}
                        WHERE {datetime_column} > {last_fetch_datetime_from}
                        AND {datetime_column} <= {last_fetch_datetime_to}
                        """
        self.__target_object = target_object
        self.__last_fetch_datetime = last_fetch_datetime

    def __build_soql(self, column_or_expression: str) -> str:
        """Fill the shared template with *column_or_expression* and the
        object/window attributes.

        NOTE(review): values are interpolated directly into the SOQL with no
        escaping — acceptable only because they come from trusted config, not
        user input.
        """
        return self.__SELECT_SOQL.format(
            column_or_expression=column_or_expression,
            object_name=self.__target_object.object_name,
            datetime_column=self.__target_object.datetime_column,
            last_fetch_datetime_from=self.__last_fetch_datetime.last_fetch_datetime_from,
            last_fetch_datetime_to=self.__last_fetch_datetime.last_fetch_datetime_to,
        )

    def create_count_soql(self) -> str:
        """Return a SOQL statement counting records in the fetch window."""
        return self.__build_soql('COUNT(Id)')

    def create_fetch_soql(self) -> str:
        """Return a SOQL statement selecting the configured columns of the
        records in the fetch window."""
        return self.__build_soql(','.join(self.__target_object.columns))

View File

@ -1,13 +1,39 @@
from src.aws.s3 import ConfigBucket
from src.config.objects import LastFetchDatetime
from src.system_var.constants import DATE_JP_NAME
from src.system_var.environments import CRM_CONFIG_BUCKET, LAST_FETCH_DATE_FOLDER
from src.config.objects import TargetObject, LastFetchDatetime
from src.error.exceptions import FileNotFoundException, InvalidConfigException
from src.parser.json_parse import JsonParser
from src.system_var.constants import DATE_JP_NAME
from src.system_var.environments import CRM_CONFIG_BUCKET, LAST_FETCH_DATE_FOLDER
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def set_datetime_period(target_object, execute_datetime):
def set_datetime_period_process(target_object: TargetObject, execute_datetime: ExecuteDateTime):
"""
データ取得期間設定処理
Parameters
----------
target_object : TargetObject
取得対象オブジェクト情報インスタンス
execute_datetime : ExecuteDateTime
実行日時取得インスタンス
Returns
-------
last_fetch_datetime : LastFetchDatetime
前回取得日時情報インスタンス
Raises
------
FileNotFoundException
S3上のファイルが存在しない場合
InvalidConfigException
オブジェクト情報定義が不正だった場合
"""
# ① データ取得期間設定処理の開始ログを出力する
logger.info(
f'I-DATE-01 [{target_object.object_name}] のデータ取得期間設定処理を開始します')
@ -18,7 +44,7 @@ def set_datetime_period(target_object, execute_datetime):
f'I-DATE-02 前回取得日時ファイルの取得開始します ファイルパス:[s3://{CRM_CONFIG_BUCKET}/{LAST_FETCH_DATE_FOLDER}/{target_object.last_fetch_datetime_file_name}]')
s3_config_bucket = ConfigBucket()
last_fetch_datetime_file_json = s3_config_bucket.get_last_fetch_datetime_file(
last_fetch_datetime_file_str = s3_config_bucket.get_last_fetch_datetime_file(
target_object.last_fetch_datetime_file_name)
logger.info(f'I-DATE-03 前回取得日時ファイルの取得成功しました')
@ -32,11 +58,10 @@ def set_datetime_period(target_object, execute_datetime):
# ④ データの取得期間を設定する
logger.debug(f'D-DATE-04 前回取得日時ファイルの形式チェックを開始します')
json_parser = JsonParser(last_fetch_datetime_file_json)
json_parser = JsonParser(last_fetch_datetime_file_str)
last_fetch_datetime_file_dict = json_parser.json_parser()
last_fetch_datetime = LastFetchDatetime(target_object.last_fetch_datetime_file_name,
last_fetch_datetime_file_dict, execute_datetime)
last_fetch_datetime = LastFetchDatetime(last_fetch_datetime_file_dict, execute_datetime)
logger.debug(f'D-DATE-05 前回取得日時ファイルの形式チェック 正常終了')
logger.info(

View File

@ -1,6 +1,5 @@
# environments(task settings file)
# ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
LOG_LEVEL = "LOG_LEVEL"
LOG_LEVEL = 'LOG_LEVEL' # ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
CRM_AUTH_TIMEOUT = 'CRM_AUTH_TIMEOUT' # CRMへの認証処理のタイムアウト秒数
CRM_AUTH_MAX_RETRY_ATTEMPT = 'CRM_AUTH_MAX_RETRY_ATTEMPT' # CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_RETRY_INTERVAL = 'CRM_AUTH_RETRY_INTERVAL' # CRMへの認証処理のリトライ時の初回待ち秒数
@ -17,7 +16,7 @@ CRM_FETCH_RECORD_RETRY_INTERVAL = 'CRM_FETCH_RECORD_RETRY_INTERVAL'
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MIN_INTERVAL' # CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MAX_INTERVAL' # CRMのレコード取得処理のリトライ時の最大待ち秒数
# environments(ECS Task Enviroment)
# environments(ECS Task Environment)
CRM_AUTH_DOMAIN = 'CRM_AUTH_DOMAIN' # CRMのAPI実行のための認証エンドポイントのドメイン
CRM_USER_NAME = 'CRM_USER_NAME' # CRMのAPI実行用ユーザ名
CRM_USER_PASSWORD = 'CRM_USER_PASSWORD' # CRMのAPI実行用ユーザパスワード
@ -40,16 +39,17 @@ CRM_IMPORT_DATA_BACKUP_FOLDER = 'CRM_IMPORT_DATA_BACKUP_FOLDER'
YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z'
CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z'
YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S'
MILLISEC_FORMAT = '000Z'
# aws
AWS_RESOURCE_S3 = 's3'
AWS_CLINET_S3 = 's3'
S3_RESPONSE_BODY = 'Body'
S3_CHAR_CODE = 'utf-8'
# 正規表現チェック
EXCLUDE_SYMBOL = ['#', '/']
DATE_PATTERN_YYYYMMDDTHHMMSSTZ = r'[12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])T([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9].000Z'
DATE_PATTERN_YYYYMMDDTHHMMSSTZ = r'[12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])T([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]\.000Z'
DATE_PATTERN_YYYYMMDDHHMMSSFFF_UTC = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.000\+0000'
# logger
LOG_FORMAT = '[%(levelname)s]\t%(asctime)s\t%(message)s\n'
@ -72,3 +72,24 @@ END_JP_NAME = '取得処理実施結果アップロード処理'
# CSVチェック
CSV_TRUE_VALUE = '1'
CSV_FALSE_VALUE = '0'
# オブジェクト変数
OBJECTS_KEY = 'objects'
OBJECTS_TYPE = list
OBJECT_NAME_KEY = 'object_name'
OBJECT_NAME_TYPE = str
COLUMNS_KEY = 'columns'
COLUMNS_TYPE = list
IS_SKIP_KEY = 'is_skip'
IS_SKIP_TYPE = bool
IS_UPDATE_LAST_FETCH_DATETIME_KEY = 'is_update_last_fetch_datetime'
IS_UPDATE_LAST_FETCH_DATETIME_TYPE = bool
LAST_FETCH_DATETIME_FILE_NAME_KEY = 'last_fetch_datetime_file_name'
LAST_FETCH_DATETIME_FILE_NAME_TYPE = str
UPLOAD_FILE_NAME_KEY = 'upload_file_name'
UPLOAD_FILE_NAME_TYPE = str
DATETIME_COLUMN_KEY = 'datetime_column'
DATETIME_COLUMN_TYPE = str
DATETIME_COLUMN_DEFAULT_VALUE = 'SystemModstamp'
LAST_FETCH_DATETIME_TO_KEY = 'last_fetch_datetime_to'
LAST_FETCH_DATETIME_FROM_KEY = 'last_fetch_datetime_from'

View File

@ -6,37 +6,37 @@ import src.system_var.constants as constants
# ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
LOG_LEVEL = os.environ.get(constants.LOG_LEVEL, constants.LOG_LEVEL_INFO)
# CRMへの認証処理のタイムアウト秒数
CRM_AUTH_TIMEOUT = os.environ[constants.CRM_AUTH_TIMEOUT]
CRM_AUTH_TIMEOUT = os.environ.get(constants.CRM_AUTH_TIMEOUT, 100)
# CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ[constants.CRM_AUTH_MAX_RETRY_ATTEMPT]
CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ.get(constants.CRM_AUTH_MAX_RETRY_ATTEMPT, 3)
# CRMへの認証処理のリトライ時の初回待ち秒数
CRM_AUTH_RETRY_INTERVAL = os.environ[constants.CRM_AUTH_RETRY_INTERVAL]
CRM_AUTH_RETRY_INTERVAL = os.environ.get(constants.CRM_AUTH_RETRY_INTERVAL, 5)
# CRMへの認証処理のリトライ時の最小待ち秒数
CRM_AUTH_RETRY_MIN_INTERVAL = os.environ[constants.CRM_AUTH_RETRY_MIN_INTERVAL]
CRM_AUTH_RETRY_MIN_INTERVAL = os.environ.get(constants.CRM_AUTH_RETRY_MIN_INTERVAL, 5)
# CRMへの認証処理のリトライ時の最大待ち秒数
CRM_AUTH_RETRY_MAX_INTERVAL = os.environ[constants.CRM_AUTH_RETRY_MAX_INTERVAL]
CRM_AUTH_RETRY_MAX_INTERVAL = os.environ.get(constants.CRM_AUTH_RETRY_MAX_INTERVAL, 50)
# CRMのレコード件数取得処理のタイムアウト秒数
CRM_GET_RECORD_COUNT_TIMEOUT = os.environ[constants.CRM_GET_RECORD_COUNT_TIMEOUT]
CRM_GET_RECORD_COUNT_TIMEOUT = os.environ.get(constants.CRM_GET_RECORD_COUNT_TIMEOUT, 300)
# CRMのレコード件数取得処理の最大リトライ試行回数
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ[constants.CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT]
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ.get(constants.CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT, 3)
# CRMのレコード件数取得処理のリトライ時の初回待ち秒数
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ[constants.CRM_GET_RECORD_COUNT_RETRY_INTERVAL]
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ.get(constants.CRM_GET_RECORD_COUNT_RETRY_INTERVAL, 5)
# CRMのレコード件数取得処理のリトライ時の最小待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ[constants.CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL]
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ.get(constants.CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, 5)
# CRMのレコード件数取得処理のリトライ時の最大待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ[constants.CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL]
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ.get(constants.CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL, 50)
# CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_TIMEOUT = os.environ[constants.CRM_FETCH_RECORD_TIMEOUT]
CRM_FETCH_RECORD_TIMEOUT = os.environ.get(constants.CRM_FETCH_RECORD_TIMEOUT, 300)
# CRMのレコード取得処理の最大リトライ試行回数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ[constants.CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT]
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ.get(constants.CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT, 3)
# CRMのレコード取得処理のリトライ時の初回待ち秒数
CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ[constants.CRM_FETCH_RECORD_RETRY_INTERVAL]
CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ.get(constants.CRM_FETCH_RECORD_RETRY_INTERVAL, 5)
# CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ[constants.CRM_FETCH_RECORD_RETRY_MIN_INTERVAL]
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ.get(constants.CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, 5)
# CRMのレコード取得処理のリトライ時の最大待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ[constants.CRM_FETCH_RECORD_RETRY_MAX_INTERVAL]
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ.get(constants.CRM_FETCH_RECORD_RETRY_MAX_INTERVAL, 50)
# environments(ECS Task Enviroment)
# environments(ECS Task Environment)
# CRMのAPI実行のための認証エンドポイントのドメイン
CRM_AUTH_DOMAIN = os.environ[constants.CRM_AUTH_DOMAIN]
# CRMのAPI実行用ユーザ名
@ -52,20 +52,20 @@ CRM_BACKUP_BUCKET = os.environ[constants.CRM_BACKUP_BUCKET]
# CRMの取込データを格納するバケット名
IMPORT_DATA_BUCKET = os.environ[constants.IMPORT_DATA_BUCKET]
# CRM取得対象オブジェクトの情報を格納するフォルダパス
OBJECT_INFO_FOLDER = os.environ[constants.OBJECT_INFO_FOLDER]
OBJECT_INFO_FOLDER = os.environ.get(constants.OBJECT_INFO_FOLDER, 'crm/object_info')
# CRM取得対象オブジェクトの情報のファイル名
OBJECT_INFO_FILENAME = os.environ[constants.OBJECT_INFO_FILENAME]
# CRMデータ取得結果を格納するフォルダパス
PROCESS_RESULT_FOLDER = os.environ[constants.PROCESS_RESULT_FOLDER]
PROCESS_RESULT_FOLDER = os.environ.get(constants.PROCESS_RESULT_FOLDER, 'data_import')
# CRMデータ取得結果を格納するファイル名
PROCESS_RESULT_FILENAME = os.environ[constants.PROCESS_RESULT_FILENAME]
PROCESS_RESULT_FILENAME = os.environ.get(constants.PROCESS_RESULT_FILENAME, 'process_result.json')
# CRMからの最終取得日時ファイルを格納するフォルダパス
LAST_FETCH_DATE_FOLDER = os.environ[constants.LAST_FETCH_DATE_FOLDER]
LAST_FETCH_DATE_FOLDER = os.environ.get(constants.LAST_FETCH_DATE_FOLDER, 'crm/last_fetch_datetime')
# CRMから取得し、取込用に変換したデータを格納するフォルダ
CRM_IMPORT_DATA_FOLDER = os.environ[constants.CRM_IMPORT_DATA_FOLDER]
CRM_IMPORT_DATA_FOLDER = os.environ.get(constants.CRM_IMPORT_DATA_FOLDER, 'crm/target')
# CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ[constants.LAST_FETCH_DATE_BACKUP_FOLDER]
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ.get(constants.LAST_FETCH_DATE_BACKUP_FOLDER, 'last_fetch_datetime')
# CRMから取得した生データのバックアップを格納するフォルダパス
RESPONSE_JSON_BACKUP_FOLDER = os.environ[constants.RESPONSE_JSON_BACKUP_FOLDER]
RESPONSE_JSON_BACKUP_FOLDER = os.environ.get(constants.RESPONSE_JSON_BACKUP_FOLDER, 'response_json')
# CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ[constants.CRM_IMPORT_DATA_BACKUP_FOLDER]
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ.get(constants.CRM_IMPORT_DATA_BACKUP_FOLDER, 'data_import')

View File

@ -1,12 +1,33 @@
import json
from src.aws.s3 import ConfigBucket
from src.system_var.constants import UPD_JP_NAME
from src.config.objects import TargetObject, LastFetchDatetime
from src.error.exceptions import FileUploadException
from src.system_var.constants import UPD_JP_NAME
from src.util.logger import logger_instance as logger
def updload_last_fetch_datetime(target_object, last_fetch_datetime):
def upload_last_fetch_datetime_process(target_object: TargetObject, last_fetch_datetime: LastFetchDatetime):
"""
前回取得日時ファイル更新
Parameters
----------
target_object : TargetObject
取得対象オブジェクト情報インスタンス
last_fetch_datetime : LastFetchDatetime
前回取得日時情報インスタンス
Returns
-------
なし
Raises
------
FileUploadException
S3のファイルアップロード失敗
"""
# ① 前回取得日時ファイル更新処理の開始ログを出力する
logger.info(
f'I-UPD-01 [{target_object.object_name}] の前回取得日時ファイルの更新処理を開始します')
@ -16,20 +37,21 @@ def updload_last_fetch_datetime(target_object, last_fetch_datetime):
# ② オブジェクト情報.is_update_last_fetch_datetimeがfalseの場合、以降の処理をスキップする
logger.info(
f'I-UPD-02 [{target_object.object_name}] の前回取得日時ファイルの更新処理をスキップします')
else:
# ③ 前回取得日時ファイル.last_fetch_datetime_fromに取得処理開始年月日時分秒を設定する
# 前回取得日時ファイル.last_fetch_datetime_toに空文字を設定する
last_fetch_datetime_dict = {
'last_fetch_datetime_from': last_fetch_datetime.last_fetch_datetime_to,
'last_fetch_datetime_to': ''
}
return
config_bucket = ConfigBucket()
config_bucket.put_last_fetch_datetime_file(
target_object.last_fetch_datetime_file_name, json.dumps(last_fetch_datetime_dict))
# ③ 前回取得日時ファイル.last_fetch_datetime_fromに取得処理開始年月日時分秒を設定する
# 前回取得日時ファイル.last_fetch_datetime_toに空文字を設定する
last_fetch_datetime_dict = {
'last_fetch_datetime_from': last_fetch_datetime.last_fetch_datetime_to,
'last_fetch_datetime_to': ''
}
logger.info(
f'D-UPD-03 [{target_object.object_name}] の前回取得日時ファイル更新処理 正常終了')
config_bucket = ConfigBucket()
config_bucket.put_last_fetch_datetime_file(
target_object.last_fetch_datetime_file_name, json.dumps(last_fetch_datetime_dict))
logger.info(
f'D-UPD-03 [{target_object.object_name}] の前回取得日時ファイル更新処理 正常終了')
except Exception as e:
raise FileUploadException(

View File

@ -1,11 +1,32 @@
from src.aws.s3 import BackupBucket
from src.error.exceptions import FileUploadException
from src.system_var.constants import END_JP_NAME
from src.system_var.environments import PROCESS_RESULT_FILENAME
from src.error.exceptions import FileUploadException
from src.util.execute_datetime import ExecuteDateTime
from src.util.logger import logger_instance as logger
def updload_result_data(process_result, date_path):
def upload_result_data_process(process_result: dict, execute_datetime: ExecuteDateTime):
"""
取得処理実施結果アップロード処理
Parameters
----------
process_result : dict
取得処理実行結果辞書オブジェクト
execute_datetime : ExecuteDateTime
実行日時取得インスタンス
Returns
-------
なし
Raises
------
FileUploadException
S3のファイルアップロード失敗
"""
# ① 取得処理実施結果アップロード処理のログを出力する
logger.info(
f'I-END-01 取得処理実施結果アップロード処理を開始します')
@ -14,7 +35,7 @@ def updload_result_data(process_result, date_path):
# ② CRMバックアップ保管用バケットに、取得処理実施結果のJSONデータを保管する
backup_bucket = BackupBucket()
backup_bucket.put_result_json(
f'{date_path}/{PROCESS_RESULT_FILENAME}', process_result)
f'{execute_datetime.to_path()}/{PROCESS_RESULT_FILENAME}', process_result)
logger.debug(f'D-END-02 取得処理実施結果アップロード 正常終了')

View File

@ -1,64 +1,39 @@
import re
class DictChecker:
    """Validation helper for a single dict: non-raising check_* predicates and
    raising assert_* guards over its keys and values."""

    def __init__(self, object_dict: dict) -> None:
        # The dict under inspection; all check/assert methods read from it.
        self.__object_dict = object_dict

    def check_key_exist(self, check_key: str) -> bool:
        """Return True when the key exists and its value is not the empty string."""
        return check_key in self.__object_dict and self.__object_dict[check_key] != ''

    def check_data_type(self, check_key: str, check_type: type) -> bool:
        """Return True when the value stored under check_key is an instance of check_type.

        Raises KeyError if the key is absent — call check_key_exist first.
        """
        return isinstance(self.__object_dict[check_key], check_type)

    def check_match_pattern(self, regex_str: str, check_key: str) -> bool:
        """Return True when the value under check_key fully matches regex_str.

        NOTE(review): parameter order (pattern first) differs from the other
        methods, which take check_key first — kept for caller compatibility.
        """
        # re.fullmatch returns a Match object or None; convert explicitly to bool
        # instead of the redundant `True if ... else False` ternary.
        return re.fullmatch(regex_str, self.__object_dict[check_key]) is not None

    def assert_key_exist(self, check_key: str, check_type: type) -> None:
        """Raise when the key is missing or its value is the empty string.

        check_type is accepted but unused (leftover from the former combined
        key+type check); kept so existing call sites continue to work.
        Use assert_data_type for the type check.
        """
        if not self.check_key_exist(check_key):
            raise Exception(f'「{check_key}」キーは必須です')
        return

    def assert_data_type(self, check_key: str, check_type: type) -> None:
        """Raise when the value under check_key is not an instance of check_type."""
        if not self.check_data_type(check_key, check_type):
            raise Exception(f'「{check_key}」キーの値は「{check_type}」でなければなりません')

    def assert_match_pattern(self, check_key: str, regex_str: str) -> None:
        """Raise when the value under check_key does not fully match regex_str."""
        if not self.check_match_pattern(regex_str, check_key):
            raise Exception(f'「{check_key}」キーの値の正規表現「{regex_str}」チェックに失敗しました')
        return

View File

@ -0,0 +1,17 @@
from datetime import datetime
from src.system_var.constants import(
YYYYMMDDTHHMMSSTZ,
MILLISEC_FORMAT
)
class ExecuteDateTime:
    """Snapshot of the process start time, formatted per YYYYMMDDTHHMMSSTZ,
    reusable as a plain string or as an S3-style path fragment."""

    def __init__(self) -> None:
        # Capture "now" once at construction so every consumer of this instance
        # shares the same timestamp. NOTE(review): datetime.now() is naive local
        # time — confirm the deployment timezone matches what YYYYMMDDTHHMMSSTZ expects.
        self.__execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ)

    def __str__(self) -> str:
        """Return the raw formatted timestamp string."""
        return self.__execute_datetime

    def to_path(self) -> str:
        """Return the timestamp as a slash-delimited path fragment:
        '-' and 'T' become '/', while ':' and '.' are removed.

        NOTE(review): str.rstrip(MILLISEC_FORMAT) strips any trailing characters
        that appear in MILLISEC_FORMAT (a character *set*), not the literal
        suffix — if the intent is to drop a millisecond suffix, str.removesuffix()
        is likely what was meant; confirm against MILLISEC_FORMAT's value.
        """
        return self.__execute_datetime.rstrip(MILLISEC_FORMAT).translate(str.maketrans({'-': '/', 'T': '/', ':': None, '.': None}))

View File

@ -2,6 +2,7 @@ import logging
from src.system_var.environments import LOG_LEVEL
"""boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する"""
for name in ["boto3", "botocore", "s3transfer", "urllib3"]:
logging.getLogger(name).setLevel(logging.WARNING)

View File

@ -0,0 +1,47 @@
{
"objects": [
{
"object_name": "Territory2",
"columns": [
"Id",
"Name",
"Territory2TypeId",
"Territory2ModelId",
"ParentTerritory2Id",
"Description",
"ForecastUserId",
"AccountAccessLevel",
"OpportunityAccessLevel",
"CaseAccessLevel",
"ContactAccessLevel",
"LastModifiedDate",
"LastModifiedById",
"SystemModstamp",
"DeveloperName",
"MSJ_Territory_Type__c",
"MSJ_Level__c"
],
"is_skip": false,
"is_update_last_fetch_datetime": false,
"last_fetch_datetime_file_name": "Territory2_ALL.json",
"upload_file_name": "CRM_Territory2_ALL_{execute_datetime}.csv"
},
{
"object_name": "UserTerritory2Association",
"columns": [
"Id",
"UserId",
"Territory2Id",
"IsActive",
"RoleInTerritory2",
"LastModifiedDate",
"LastModifiedById",
"SystemModstamp"
],
"is_skip": false,
"is_update_last_fetch_datetime": false,
"last_fetch_datetime_file_name": "UserTerritory2Association_ALL.json",
"upload_file_name": "CRM_UserTerritory2Association_ALL_{execute_datetime}.csv"
}
]
}

File diff suppressed because it is too large Load Diff