diff --git a/ecs/crm-datafetch/Dockerfile b/ecs/crm-datafetch/Dockerfile index e84b79b3..88e588e1 100644 --- a/ecs/crm-datafetch/Dockerfile +++ b/ecs/crm-datafetch/Dockerfile @@ -6,6 +6,13 @@ WORKDIR /usr/src/app COPY Pipfile Pipfile.lock ./ RUN pip install --no-cache-dir -r requirements.txt && \ cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime + +RUN \ + apt update -y && \ + # パッケージのセキュリティアップデートのみを適用するコマンド + apt install -y unattended-upgrades && \ + unattended-upgrades + COPY main.py ./ COPY crm-datafetch /usr/src diff --git a/ecs/crm-datafetch/main.py b/ecs/crm-datafetch/main.py index 0170f413..b0edf545 100644 --- a/ecs/crm-datafetch/main.py +++ b/ecs/crm-datafetch/main.py @@ -1,181 +1,4 @@ -# モジュールロード -import os -import sys -from datetime import datetime -import src.pre as pre # データ取得準備処理 -import src.chk as chk # オブジェクト情報形式チェック処理 -import src.date as date # データ取得期間設定処理 -import src.fetch as fetch # CRMデータ取得処理 -import src.resbk as resbk # CRM電文データバックアップ処理 -import src.conv as conv # CSV変換処理 -import src.csvbk as csvbk # CSVバックアップ処理 -import src.upld as upld # CSVアップロード処理 -import src.upd as upd # 前回取得日時ファイル更新 -import src.end as end # 取得処理実施結果アップロード処理 - -from src.utils.logger import Logger -import src.utils.data_retention as data_retention - - -# 変数 - -# logger設定 -logger = Logger().get_logger() - - -# 処理 -def main() -> None: - """コントロール処理 - - 各処理を呼び出すコントローラー - - """ - - try: - # ① CRMデータ取得処理開始ログを出力する - logger.info('I-CTRL-01 CRMデータ取得処理を開始します') - - # ② データ取得準備処理を呼び出す - try: - logger.info('I-CTRL-02 データ取得準備処理呼び出し') - crm_object_info_var = data_retention.CrmObjectInfoVar() - crm_object_info_dict, execute_datetime, process_result_per_object = pre( - crm_object_info_var) - - except Exception as e: - logger.error('') - raise CustomException(e) - - # ③ object_infoのobjectsキーの値の件数分ループする - logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始') - - for object_info in crm_object_info_dict.get('objects'): - try: - - # 1. オブジェクト情報形式チェック処理を呼び出す - logger.info('I-CTRL-04 オブジェクト情報形式チェック処理呼び出し') - - try: - chk() - - except Exception as e: - logger.error('') - - # 2. 処理対象のオブジェクト名をログ出力する - logger.info('I-CTRL-05 ') - - # 3. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する - logger.info('I-CTRL-06 ') - - # 4. データ取得期間設定処理を呼び出す - logger.info('I-CTRL-07 ') - try: - - except Exception as e: - logger.error('') - - # 5. CRMデータ取得処理を呼び出す - logger.info('I-CTRL-08 ') - try: - - except Exception as e: - logger.error('') - - # 6. CRMデータのジェネレータをループする - # 6-1. 出力ファイル名を生成する - logger.info('I-CTRL-10 ') - try: - - except Exception as e: - logger.error('') - - # 6-2. CRM電文データバックアップ処理を呼び出す - logger.info('I-CTRL-11 ') - try: - - except Exception as e: - logger.error('') - - # 6-3. CSV変換処理を呼び出す - logger.info('I-CTRL-12 ') - try: - - except Exception as e: - logger.error('') - - # 6-4. CSVバックアップ処理を呼び出す - logger.info('I-CTRL-13 ') - try: - - except Exception as e: - logger.error('') - - # 6-5. CSVアップロード処理を呼び出す - logger.info('I-CTRL-14 ') - try: - - except Exception as e: - logger.error('') - - # 7. 前回取得日時ファイル更新処理を呼びだす - logger.info('I-CTRL-15 ') - try: - - except Exception as e: - logger.error('') - - # 8. 1オブジェクトのアップロードが完了した旨をログに出力する - logger.info('I-CTRL-16 ') - - except GetObjectException as e: - logger.error( - '[取得対象のオブジェクト名] の[処理名]でエラーが発生しました 次のオブジェクトの処理に移行します') - - except Exception as e: - logger.error( - '[取得対象のオブジェクト名] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します') - - # ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する - try: - logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{実行結果JSON}]') - - except Exception as e: - logger.error('') - raise CustomException() - - # ⑤ 取得処理実施結果アップロード処理を呼び出す - try: - logger.info('I-CTRL-18 CRM_取得処理実施結果ファイルアップロード処理開始') - - except Exception as e: - logger.error('') - raise CustomException() - - # ⑥ CRMデータ取得処理終了ログを出力する - try: - logger.info('I-CTRL-19 CRMデータ取得処理を終了します') - - except Exception as e: - logger.error('') - raise - - except CustomException as e: - logger.error('I-ERR-01 [処理名]でエラーが発生したため、処理を終了します') - - except Exception as e: - logger.error('I-ERR-02 予期せぬエラーが発生したため、処理を終了します') - - -# カスタムExceptionクラス -class CustomException(Exception, metaclass=ABCMeta): - def __init__(self, id, arg): - self.arg = arg - self.id = id - - -class GetObjectException(CustomException): - pass - +import src.process as process if __name__ == '__main__': - main() + process.main() diff --git a/ecs/crm-datafetch/src/utils/__init__.py b/ecs/crm-datafetch/src/aws/__init__.py similarity index 100% rename from ecs/crm-datafetch/src/utils/__init__.py rename to ecs/crm-datafetch/src/aws/__init__.py diff --git a/ecs/crm-datafetch/src/aws/s3.py b/ecs/crm-datafetch/src/aws/s3.py new file mode 100644 index 00000000..e5235016 --- /dev/null +++ b/ecs/crm-datafetch/src/aws/s3.py @@ -0,0 +1,107 @@ +import boto3 +import json +from src.environments import ( + CRM_CONFIG_BUCKET, + CRM_BACKUP_BUCKET, + IMPORT_DATA_BUCKET, + OBJECT_INFO_FOLDER, + OBJECT_INFO_FILENAME, + CRM_IMPORT_DATA_FOLDER, + CRM_IMPORT_DATA_BACKUP_FOLDER, + LAST_FETCH_DATE_FOLDER, + RESPONSE_JSON_BACKUP_FOLDER +) +from src.constants import ( + AWS_RESOURCE_S3, + AWS_CLINET_S3, + S3_RESPONSE_BODY +) + + +class S3Clinet: + def __init__(self) -> None: + self.__s3_client = boto3.client(AWS_CLINET_S3) + + def copy_object(self, src_bucket, src_key, dest_bucket, dest_key): + self.__s3_client.copy( + {"Bucket": src_bucket, "key": src_key}, dest_bucket, dest_key) + return + + +class S3Resource: + def __init__(self, bucket_name: str) -> None: + self.__s3_resource = boto3.resource(AWS_RESOURCE_S3) + self.__s3_bucket = self.__s3_resource.Bucket(bucket_name) + + def get_object(self, object_key: str) -> str: + response = self.__s3_bucket.Object(object_key).get() + body = response[S3_RESPONSE_BODY].read() + + return body.decode('utf-8') + + def put_object(self, object_key: str, data) -> str: + s3_object = self.__s3_bucket.Object(object_key) + s3_object.put(Body=data.encode('utf-8'), ContentEncoding='utf-8') + return + + +class S3ResourceNonBucket: + def __init__(self) -> None: + self.__s3_resource = boto3.resource(AWS_RESOURCE_S3) + + def copy(self, src_bucket, src_key, dest_bucket, dest_key): + copy_source = {'Bucket': src_bucket, 'Key': src_key} + self.__s3_resource.meta.client.copy(copy_source, dest_bucket, dest_key) + + return + + +class ConfigBucket: + __s3_resource: S3Resource = None + + def __init__(self) -> None: + self.__s3_resource = S3Resource(CRM_CONFIG_BUCKET) + + def get_object_info_file(self): + return self.__s3_resource.get_object(f'{OBJECT_INFO_FOLDER}/{OBJECT_INFO_FILENAME}') + + def get_last_fetch_datetime_file(self, file_name): + return self.__s3_resource.get_object(f'{LAST_FETCH_DATE_FOLDER}/{file_name}') + + def put_last_fetch_datetime_file(self, file_name, data): + self.__s3_resource.put_object( + f'{LAST_FETCH_DATE_FOLDER}/{file_name}', data) + return + + +class DataBucket: + __s3_resource: S3Resource = None + + def __init__(self) -> None: + self.__s3_resource = S3Resource(IMPORT_DATA_BUCKET) + + def put_csv(self, file_name, data): + object_key = f'{CRM_IMPORT_DATA_FOLDER}/{file_name}' + self.__s3_resource.put_object(object_key, data) + return + + +class BackupBucket: + __s3_resource: S3Resource = None + + def __init__(self) -> None: + self.__s3_resource = S3Resource(CRM_BACKUP_BUCKET) + + def put_response_json(self, file_name, data): + object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_name}' + self.__s3_resource.put_object(object_key, json.dumps(data)) + return + + def put_csv_bk(self, file_name, data): + object_key = f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{file_name}' + self.__s3_resource.put_object(object_key, data) + return + + def put_result_json(self, file_name, data): + object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_name}' + self.__s3_resource.put_object(object_key, json.dumps(data)) diff --git a/ecs/crm-datafetch/src/chk.py b/ecs/crm-datafetch/src/chk.py index e69de29b..1342243e 100644 --- a/ecs/crm-datafetch/src/chk.py +++ b/ecs/crm-datafetch/src/chk.py @@ -0,0 +1,30 @@ +from src.util.logger import Logger +from src.constants import( + CHK_JP_NAME +) +from src.error.exceptions import( + InvalidConfigException +) +from src.config.objects import TargetObject + + +logger = Logger().get_logger() + + +def check_object_info(object_info, execute_datetime): + # ① オブジェクト情報形式チェック処理開始ログを出力する + logger.info('I-CHK-01 オブジェクト情報形式チェック処理を開始します') + + try: + # ② オブジェクト情報形式チェック + target_object = TargetObject(object_info, execute_datetime) + + except Exception as e: + raise InvalidConfigException( + 'E-CHK-01', CHK_JP_NAME, f'オブジェクト情報形式チェック処理が失敗しました エラー内容:[{e}]') + + # ③ チェック処理終了ログを出力する + logger.info('I-CHK-02 オブジェクト情報形式チェック処理を終了します') + + # ④ 次の処理へ移行する + return target_object diff --git a/ecs/crm-datafetch/src/config/objects.py b/ecs/crm-datafetch/src/config/objects.py new file mode 100644 index 00000000..edce5b33 --- /dev/null +++ b/ecs/crm-datafetch/src/config/objects.py @@ -0,0 +1,195 @@ +from src.constants import ( + DATE_PATTERN_YYYYMMDDTHHMMSSTZ +) +from src.util.dict_checker import DictCheck + + +class FetchTargetObjects(): + def __init__(self, object_info_file_dict) -> None: + self.__dict_check = DictCheck() + self.__objects = object_info_file_dict + self.__key = 'objects' + self.check_key_objects() + self.__i = 0 + + def __iter__(self): + return self + + def __next__(self): + if self.__i == len(self.__objects[self.__key]): + raise StopIteration() + value = self.__objects['objects'][self.__i] + self.__i += 1 + return value + + def check_key_objects(self) -> None: + __check_key = self.__key + __check_type = list + self.__dict_check.check_key_exist_and_value_type( + self.__objects, __check_key, __check_type) + + +class TargetObject(): + def __init__(self, object_info, execute_datetime) -> None: + self.__dict_check = DictCheck() + self.__object_info = object_info + self.execute_datetime = execute_datetime + self.check_key_object_name() + self.check_key_columns() + self.check_key_is_skip() + self.check_key_is_update_last_fetch_datetime() + self.check_key_last_fetch_datetime_file_name() + self.check_key_upload_file_name() + self.object_name = self.__object_info['object_name'] + self.columns = self.__object_info['columns'] + self.is_skip = self.set_is_skip() + self.is_update_last_fetch_datetime = self.set_is_update_last_fetch_datetime() + self.last_fetch_datetime_file_name = self.set_fetch_datetime_file_name() + self.upload_file_name = self.set_upload_file_name() + + def check_key_object_name(self) -> None: + ''' + オブジェクト名チェック + ''' + __check_key = 'object_name' + __check_type = str + self.__dict_check.check_key_exist_and_value_type( + self.__object_info, __check_key, __check_type) + + return + + def check_key_columns(self) -> None: + ''' + カラム情報チェック + ''' + __key = 'columns' + __type = list + self.__dict_check.check_key_exist_and_value_type( + self.__object_info, __key, __type) + + return + + def check_key_is_skip(self,) -> None: + ''' + スキップフラグ型チェック + ''' + __check_key = 'is_skip' + __check_type = bool + self.__dict_check.check_key_exist_case_value_type( + self.__object_info, __check_key, __check_type) + + return + + def check_key_is_update_last_fetch_datetime(self) -> None: + ''' + 前回取得日時ファイル更新フラグチェック + ''' + __check_key = 'is_update_last_fetch_datetime' + __check_type = bool + self.__dict_check.check_key_exist_case_value_type( + self.__object_info, __check_key, __check_type) + + return + + def check_key_last_fetch_datetime_file_name(self) -> None: + ''' + 前回取得日時ファイル名型チェック + ''' + __check_key = 'last_fetch_datetime_file_name' + __check_type = str + self.__dict_check.check_key_exist_case_value_type( + self.__object_info, __check_key, __check_type) + + return + + def check_key_upload_file_name(self) -> None: + ''' + アップロードファイル名称型チェック + ''' + __check_key = 'upload_file_name' + __check_type = str + self.__dict_check.check_key_exist_case_value_type( + self.__object_info, __check_key, __check_type) + + return + + def set_is_skip(self) -> bool: + ''' + スキップフラグ設定 + ''' + __check_key = 'is_skip' + if self.__dict_check.check_key_exist(self.__object_info, __check_key): + return self.__object_info[__check_key] + else: + return False + + def set_is_update_last_fetch_datetime(self) -> bool: + ''' + 前回取得日時ファイル更新フラグ設定 + ''' + __check_key = 'is_update_last_fetch_datetime' + if self.__dict_check.check_key_exist(self.__object_info, __check_key): + return self.__object_info[__check_key] + else: + return False + + def set_fetch_datetime_file_name(self) -> str: + ''' + 前回取得日時ファイル名設定 + ''' + __check_key = 'last_fetch_datetime_file_name' + if self.__dict_check.check_key_exist(self.__object_info, __check_key): + return self.__object_info[__check_key] + else: + return self.__object_info['object_name'] + '.json' + + def set_upload_file_name(self) -> str: + ''' + アップロードファイル名称設定 + ''' + __check_key = 'upload_file_name' + if self.__dict_check.check_key_exist(self.__object_info, __check_key): + return self.__object_info[__check_key].format(execute_datetime=self.execute_datetime) + else: + return 'CRM_' + self.__object_info['object_name'] + '_' + self.execute_datetime + + +class LastFetchDatetime(): + def __init__(self, last_fetch_datetime_file_name, last_fetch_datetime_file_dict, execute_datetime) -> None: + self.__dict_check = DictCheck() + self.execute_datetime = execute_datetime + self.__last_fetch_datetime_file_dict = last_fetch_datetime_file_dict + self.last_fetch_datetime_file_name = last_fetch_datetime_file_name + self.check_key_last_fetch_datetime_from + self.check_key_last_fetch_datetime_to + self.last_fetch_datetime_from = self.__last_fetch_datetime_file_dict[ + 'last_fetch_datetime_from'] + self.last_fetch_datetime_to = self.set_last_fetch_datetime_to() + + def check_key_last_fetch_datetime_from(self) -> None: + ''' + データ取得開始日時チェック + ''' + __check_key = 'last_fetch_datetime_from' + __regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ + self.__dict_check.check_key_exsit_and_regex( + self.__last_fetch_datetime_file_dict, __check_key, __regex_str) + + def check_key_last_fetch_datetime_to(self) -> None: + ''' + データ取得終了日時チェック + ''' + __check_key = 'last_fetch_datetime_to' + __regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ + self.__dict_check.check_key_exsit_case_regex( + self.__last_fetch_datetime_file_dict, __check_key, __regex_str) + + def set_last_fetch_datetime_to(self) -> str: + ''' + データ取得終了日時設定 + ''' + __check_key = 'last_fetch_datetime_to' + if self.__dict_check.check_key_exist(self.__last_fetch_datetime_file_dict, __check_key): + return self.__last_fetch_datetime_file_dict[__check_key].format(execute_datetime=self.execute_datetime) + else: + return self.execute_datetime diff --git a/ecs/crm-datafetch/src/constants.py b/ecs/crm-datafetch/src/constants.py new file mode 100644 index 00000000..fcd6b4a3 --- /dev/null +++ b/ecs/crm-datafetch/src/constants.py @@ -0,0 +1,73 @@ +# environments(task settings file) +LOG_LEVEL = "LOG_LEVEL" # ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する +CRM_AUTH_TIMEOUT = 'CRM_AUTH_TIMEOUT' # CRMへの認証処理のタイムアウト秒数 +CRM_AUTH_MAX_RETRY_ATTEMPT = 'CRM_AUTH_MAX_RETRY_ATTEMPT' # CRMへの認証処理の最大リトライ試行回数 +CRM_AUTH_RETRY_INTERVAL = 'CRM_AUTH_RETRY_INTERVAL' # CRMへの認証処理のリトライ時の初回待ち秒数 +CRM_AUTH_RETRY_MIN_INTERVAL = 'CRM_AUTH_RETRY_MIN_INTERVAL' # CRMへの認証処理のリトライ時の最小待ち秒数 +CRM_AUTH_RETRY_MAX_INTERVAL = 'CRM_AUTH_RETRY_MAX_INTERVAL' # CRMへの認証処理のリトライ時の最大待ち秒数 +CRM_GET_RECORD_COUNT_TIMEOUT = 'CRM_GET_RECORD_COUNT_TIMEOUT' # CRMのレコード件数取得処理のタイムアウト秒数 +CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = 'CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT' # CRMのレコード件数取得処理の最大リトライ試行回数 +CRM_GET_RECORD_COUNT_RETRY_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_INTERVAL' # CRMのレコード件数取得処理のリトライ時の初回待ち秒数 +CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL' # CRMのレコード件数取得処理のリトライ時の最小待ち秒数 +CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL' # CRMのレコード件数取得処理のリトライ時の最大待ち秒数 +CRM_FETCH_RECORD_TIMEOUT = 'CRM_FETCH_RECORD_TIMEOUT' # CRMのレコード取得処理のタイムアウト秒数 +CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = 'CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT' # CRMのレコード取得処理の最大リトライ試行回数 +CRM_FETCH_RECORD_RETRY_INTERVAL = 'CRM_FETCH_RECORD_RETRY_INTERVAL' # CRMのレコード取得処理のリトライ時の初回待ち秒数 +CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MIN_INTERVAL' # CRMのレコード取得処理のリトライ時の最小待ち秒数 +CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MAX_INTERVAL' # CRMのレコード取得処理のリトライ時の最大待ち秒数 + +# environments(ECS Task Enviroment) +CRM_AUTH_DOMAIN = 'CRM_AUTH_DOMAIN' # CRMのAPI実行のための認証エンドポイントのドメイン +CRM_USER_NAME = 'CRM_USER_NAME' # CRMのAPI実行用ユーザ名 +CRM_USER_PASSWORD = 'CRM_USER_PASSWORD' # CRMのAPI実行用ユーザパスワード +CRM_USER_SECURITY_TOKEN = 'CRM_USER_SECURITY_TOKEN' # CRMのAPI実行用ユーザのセキュリティトークン +CRM_CONFIG_BUCKET = 'CRM_CONFIG_BUCKET' # CRMデータ取得用の設定ファイルを格納するバケット名 +CRM_BACKUP_BUCKET = 'CRM_BACKUP_BUCKET' # CRMのバックアップデータを格納するバケット名 +IMPORT_DATA_BUCKET = 'IMPORT_DATA_BUCKET' # CRMの取込データを格納するバケット名 +OBJECT_INFO_FOLDER = 'OBJECT_INFO_FOLDER' # CRM取得対象オブジェクトの情報を格納するフォルダパス +OBJECT_INFO_FILENAME = 'OBJECT_INFO_FILENAME' # CRM取得対象オブジェクトの情報のファイル名 +PROCESS_RESULT_FOLDER = 'PROCESS_RESULT_FOLDER' # CRMデータ取得結果を格納するフォルダパス +PROCESS_RESULT_FILENAME = 'PROCESS_RESULT_FILENAME' # CRMデータ取得結果を格納するファイル名 +LAST_FETCH_DATE_FOLDER = 'LAST_FETCH_DATE_FOLDER' # CRMからの最終取得日時ファイルを格納するフォルダパス +CRM_IMPORT_DATA_FOLDER ='CRM_IMPORT_DATA_FOLDER' # CRMから取得し、取込用に変換したデータを格納するフォルダ +LAST_FETCH_DATE_BACKUP_FOLDER = 'LAST_FETCH_DATE_BACKUP_FOLDER' # CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス +RESPONSE_JSON_BACKUP_FOLDER = 'RESPONSE_JSON_BACKUP_FOLDER' # CRMから取得した生データのバックアップを格納するフォルダパス +CRM_IMPORT_DATA_BACKUP_FOLDER = 'CRM_IMPORT_DATA_BACKUP_FOLDER' # CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ + +# 時刻フォーマット +# .000ZはUTCを表す。ミリ秒までの考慮は不要なので固定で指定 +YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z' +CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z' +YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S' + +# aws +AWS_RESOURCE_S3 = 's3' +AWS_CLINET_S3 = 's3' +S3_RESPONSE_BODY = 'Body' + + +# 正規表現チェック +EXCLUDE_SYMBOL = ['#', '/'] +DATE_PATTERN_YYYYMMDDTHHMMSSTZ = r'[12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])T([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9].000Z' + +# logger +LOG_FORMAT = '[%(levelname)s]\t%(asctime)s\t%(message)s\n' +LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S' +LOG_LEVEL_INFO = 'INFO' + +# 処理名 +PROCESS_JP_NAME = 'コントロール処理' +PRE_JP_NAME = 'データ取得準備処理' +CHK_JP_NAME = 'オブジェクト情報形式チェック処理' +DATE_JP_NAME = 'データ取得期間設定処理' +FETCH_JP_NAME = 'CRMデータ取得処理' +RESBK_JP_NAME = 'CRM電文データバックアップ処理' +CONV_JP_NAME = 'CSV変換処理' +CSVBK_JP_NAME = 'CSVバックアップ処理' +UPLD_JP_NAME = 'CSVアップロード処理' +UPD_JP_NAME = '前回取得日時ファイル更新' +END_JP_NAME = '取得処理実施結果アップロード処理' + +# CSVチェック +CSV_TRUE_VALUE = '1' +CSV_FALSE_VALUE = '0' \ No newline at end of file diff --git a/ecs/crm-datafetch/src/conv.py b/ecs/crm-datafetch/src/conv.py index e69de29b..88b86ae8 100644 --- a/ecs/crm-datafetch/src/conv.py +++ b/ecs/crm-datafetch/src/conv.py @@ -0,0 +1,33 @@ +from datetime import datetime +from src.util.logger import Logger +from src.constants import ( + CONV_JP_NAME, +) +from src.error.exceptions import( + DataConvertException +) +from src.converter.converter import CSVStringConverter + + +logger = Logger().get_logger() + + +def convert_crm_csvdata(target_object, sf_object_jsons): + # ① CSV変換処理の開始ログを出力する + logger.info(f'I-CONV-01 [{target_object.object_name}] のCSV変換処理を開始します') + + try: + # ② CSV変換 + csv_object = CSVStringConverter(target_object, sf_object_jsons) + + logger.debug(f'D-CONV-02 [{target_object.object_name}] のCSV変換処理 正常終了') + + except Exception as e: + raise DataConvertException( + 'E-CONV-01', CONV_JP_NAME, f'[{target_object.object_name}] CSV変換に失敗しました エラー内容:[{e}]') + + # ③ CSV変換処理の終了ログを出力する + logger.info(f'I-CONV-03 [{target_object.object_name}] のCSV変換処理を終了します') + + # ④ 次の処理へ移行する + return csv_object diff --git a/ecs/crm-datafetch/src/converter/converter.py b/ecs/crm-datafetch/src/converter/converter.py new file mode 100644 index 00000000..1b785c35 --- /dev/null +++ b/ecs/crm-datafetch/src/converter/converter.py @@ -0,0 +1,138 @@ +import re +import io +import csv +from datetime import datetime +from src.constants import( + CSV_TRUE_VALUE, + CSV_FALSE_VALUE, + CRM_DATETIME_FORMAT, + YYYYMMDDHHMMSS +) + + +class CSVStringConverter: + def __init__(self, target_object, sf_object_jsons) -> None: + self.__target_object = target_object + self.__sf_object_jsons = sf_object_jsons + self.__extracted_sf_object_jsons = self.extract_sf_object_jsons() + self.csv_data = self.convert_to_csv() + self.csv_buffer = self.write_csv() + + def extract_sf_object_jsons(self) -> list: + try: + extracted_sf_object_jsons = [] + for sf_object_json in self.__sf_object_jsons: + extracted_sf_object_jsons.append( + self.extract_necessary_props_from(sf_object_json)) + + return extracted_sf_object_jsons + + except Exception as e: + raise Exception('必要なjsonのデータ抽出に失敗しました') + + def extract_necessary_props_from(self, sf_object_json) -> dict: + try: + clone_sf_object = {**sf_object_json} + + del clone_sf_object['attributes'] + + uppercase_key_sf_object = { + k.upper(): v for k, v in clone_sf_object.items()} + + return uppercase_key_sf_object + + except Exception as e: + raise Exception('必要なjsonのデータ成形に失敗しました') + + def convert_to_csv(self) -> list: + try: + columns = self.__target_object.columns + csv_data = [] + for i, json_object in enumerate(self.__extracted_sf_object_jsons, 1): + csv_row = [] + for column in columns: + v = json_object[column.upper()] + + converted_value = CSVStringConverterFactory( + v).value_convert() + + csv_row.append(converted_value) + + csv_data.append(csv_row) + return csv_data + + except Exception as e: + raise Exception( + f'CSV変換に失敗しました カラム名:[{column}] 行番号: [{i}] エラー内容:[{e}]') + + def write_csv(self) -> str: + try: + with io.StringIO(newline='') as string_stream: + writer = csv.writer(string_stream, delimiter=',', lineterminator='\r\n', + doublequote=True, quotechar='"', quoting=csv.QUOTE_ALL, strict=True) + writer.writerow(self.__target_object.columns) + writer.writerows(self.csv_data) + csv_value = string_stream.getvalue() + + return csv_value + + except Exception as e: + raise Exception('csvデータの取得に失敗しました') + + +class NoneValueConverter: + def __init__(self, convert_value) -> None: + self.__convert_value = convert_value + self.value = self.convert_value() + + def convert_value(self) -> str: + return '' + + +class BooleanConverter: + def __init__(self, convert_value) -> None: + self.__convert_value = convert_value + self.value = self.convert_value() + + def convert_value(self) -> bool: + return CSV_TRUE_VALUE if self.__convert_value is True else CSV_FALSE_VALUE + + +class DatatimeConverter: + def __init__(self, convert_value) -> None: + self.__convert_value = convert_value + self.value = self.convert_value() + + def convert_value(self) -> str: + return datetime.strptime(self.__convert_value, CRM_DATETIME_FORMAT).strftime(YYYYMMDDHHMMSS) + + +class FloatConverter: + def __init__(self, convert_value) -> None: + self.__convert_value = convert_value + self.value = self.convert_value() + + def convert_value(self) -> int: + return int(self.__convert_value) + + +class CSVStringConverterFactory: + def __init__(self, v) -> None: + self.__value = v + + def value_convert(self): + + converted_value = self.__value + + if self.__value is None: + converted_value = NoneValueConverter(self.__value).value + # 指数表記で取得できるパターン。指数表記を整数表記に変換する。 + elif type(self.__value) == float: + converted_value = FloatConverter(self.__value).value + # SQLの真偽値に対応するために変換する + elif type(self.__value) == bool: + converted_value = BooleanConverter(self.__value).value + elif type(self.__value) == str and re.fullmatch(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.000\+0000', self.__value): + converted_value = DatatimeConverter(self.__value).value + + return converted_value diff --git a/ecs/crm-datafetch/src/csvbk.py b/ecs/crm-datafetch/src/csvbk.py index e69de29b..db490c63 100644 --- a/ecs/crm-datafetch/src/csvbk.py +++ b/ecs/crm-datafetch/src/csvbk.py @@ -0,0 +1,34 @@ +from src.util.logger import Logger +from src.aws.s3 import BackupBucket +from src.constants import ( + CSVBK_JP_NAME +) +from src.error.exceptions import( + FileUploadException +) + + +logger = Logger().get_logger() + + +def backup_crm_csvdata(target_object, date_path, csv_object): + # ① CSVバックアップ処理の開始ログを出力する + logger.info( + f'I-CSVBK-01 [{target_object.object_name}] のCSVデータのバックアップ処理を開始します ファイル名:[{target_object.upload_file_name}.csv]') + + try: + # ② CRMバックアップ保管用バケットに、変換後のCSVデータのバックアップを保管する + backup_bucket = BackupBucket() + backup_bucket.put_csv_bk( + f'{date_path}/{target_object.upload_file_name}.csv', csv_object.csv_buffer) + + except Exception as e: + raise FileUploadException( + 'E-CSVBK-01', CSVBK_JP_NAME, f'[{target_object.object_name}] CSVデータのバックアップに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]') + + # ③ CSVバックアップ処理の終了ログを出力する + logger.info( + f'I-CSVBK-03 [{target_object.object_name}] のCSVデータのバックアップ処理を終了します') + + # ④ 次の処理へ移行する + return diff --git a/ecs/crm-datafetch/src/date.py b/ecs/crm-datafetch/src/date.py index e69de29b..8fff0d94 100644 --- a/ecs/crm-datafetch/src/date.py +++ b/ecs/crm-datafetch/src/date.py @@ -0,0 +1,65 @@ +from src.util.logger import Logger +from src.constants import( + DATE_JP_NAME +) +from src.environments import ( + CRM_CONFIG_BUCKET, + LAST_FETCH_DATE_FOLDER +) +from src.error.exceptions import( + FileNotFoundException, + InvalidConfigException +) +from src.parser.json_parse import JsonParser +from src.config.objects import LastFetchDatetime +from src.aws.s3 import ConfigBucket + + +logger = Logger().get_logger() + + +def set_datetime_period(target_object, execute_datetime): + # ① データ取得期間設定処理の開始ログを出力する + logger.info( + f'I-DATE-01 [{target_object.object_name}] のデータ取得期間設定処理を開始します') + + try: + # ② S3 設定ファイル保管用バケットから、前回取得日時ファイルを取得する + logger.info( + f'I-DATE-02 前回取得日時ファイルの取得開始します ファイルパス:[s3://{CRM_CONFIG_BUCKET}/{LAST_FETCH_DATE_FOLDER}/{target_object.last_fetch_datetime_file_name}]') + + s3_config_bucket = ConfigBucket() + last_fetch_datetime_file_json = s3_config_bucket.get_last_fetch_datetime_file( + target_object.last_fetch_datetime_file_name) + + logger.info(f'I-DATE-03 前回取得日時ファイルの取得成功しました') + + except Exception as e: + raise FileNotFoundException( + 'E-DATE-01', DATE_JP_NAME, f'前回取得日時ファイルが存在しません ファイル名:[{target_object.last_fetch_datetime_file_name}] エラー内容:[{e}]') + + try: + # ③ 取得した前回取得日時ファイルの形式チェックを行う + # ④ データの取得期間を設定する + logger.debug(f'D-DATE-04 前回取得日時ファイルの形式チェックを開始します') + + json_parser = JsonParser(last_fetch_datetime_file_json) + last_fetch_datetime_file_dict = json_parser.json_parser() + + last_fetch_datetime = LastFetchDatetime(target_object.last_fetch_datetime_file_name, + last_fetch_datetime_file_dict, execute_datetime) + + logger.debug(f'D-DATE-05 前回取得日時ファイルの形式チェック 正常終了') + logger.info( + f'I-DATE-06 取得範囲 From: [{last_fetch_datetime.last_fetch_datetime_from}] To: [{last_fetch_datetime.last_fetch_datetime_to}]') + + except Exception as e: + raise InvalidConfigException( + 'E-DATE-02', DATE_JP_NAME, f'前回取得日時ファイルの形式チェック処理が失敗しました エラー内容:[{e}]') + + # ⑤ データ取得準備処理の終了ログを出力する + logger.info( + f'I-DATE-07 [{target_object.object_name}] のデータ取得期間設定処理を終了します') + + # ⑥ 次の処理へ移行する + return last_fetch_datetime diff --git a/ecs/crm-datafetch/src/end.py b/ecs/crm-datafetch/src/end.py index e69de29b..e2b274b3 100644 --- a/ecs/crm-datafetch/src/end.py +++ b/ecs/crm-datafetch/src/end.py @@ -0,0 +1,37 @@ +from src.util.logger import Logger +from src.constants import ( + END_JP_NAME +) +from src.environments import( + PROCESS_RESULT_FILENAME +) +from src.error.exceptions import( + FileUploadException +) +from src.aws.s3 import BackupBucket + + +logger = Logger().get_logger() + + +def updload_result_data(process_result, date_path): + # ① 取得処理実施結果アップロード処理のログを出力する + logger.info( + f'I-END-01 取得処理実施結果アップロード処理を開始します') + + try: + # ② CRMバックアップ保管用バケットに、取得処理実施結果のJSONデータを保管する + backup_bucket = BackupBucket() + backup_bucket.put_result_json( + f'{date_path}/{PROCESS_RESULT_FILENAME}', process_result) + + logger.debug(f'D-END-02 取得処理実施結果アップロード 正常終了') + + except Exception as e: + raise FileUploadException( + 'E-END-01', END_JP_NAME, f'取得処理実施結果のアップロードに失敗しました ファイル名:[{PROCESS_RESULT_FILENAME}] エラー内容:[{e}]') + + # ③ 取得処理実施結果アップロード処理の終了ログを出力する + logger.info(f'I-END-03 取得処理実施結果アップロード処理を終了します') + + return diff --git a/ecs/crm-datafetch/src/environments.py b/ecs/crm-datafetch/src/environments.py new file mode 100644 index 00000000..8d63cc64 --- /dev/null +++ b/ecs/crm-datafetch/src/environments.py @@ -0,0 +1,73 @@ +import os + +from src.constants import ( + LOG_LEVEL, + LOG_LEVEL_INFO, + CRM_AUTH_TIMEOUT, + CRM_AUTH_MAX_RETRY_ATTEMPT, + CRM_AUTH_RETRY_INTERVAL, + CRM_AUTH_RETRY_MIN_INTERVAL, + CRM_AUTH_RETRY_MAX_INTERVAL, + CRM_GET_RECORD_COUNT_TIMEOUT, + CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT, + CRM_GET_RECORD_COUNT_RETRY_INTERVAL, + CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, + CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL, + CRM_FETCH_RECORD_TIMEOUT, + CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT, + CRM_FETCH_RECORD_RETRY_INTERVAL, + CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, + CRM_FETCH_RECORD_RETRY_MAX_INTERVAL, + CRM_AUTH_DOMAIN, + CRM_USER_NAME, + CRM_USER_PASSWORD, + CRM_USER_SECURITY_TOKEN, + CRM_CONFIG_BUCKET, + CRM_BACKUP_BUCKET, + IMPORT_DATA_BUCKET, + OBJECT_INFO_FOLDER, + OBJECT_INFO_FILENAME, + PROCESS_RESULT_FOLDER, + PROCESS_RESULT_FILENAME, + LAST_FETCH_DATE_FOLDER, + CRM_IMPORT_DATA_FOLDER, + LAST_FETCH_DATE_BACKUP_FOLDER, + RESPONSE_JSON_BACKUP_FOLDER, + CRM_IMPORT_DATA_BACKUP_FOLDER +) + +# environments(task settings file) +LOG_LEVEL = os.environ.get(LOG_LEVEL, LOG_LEVEL_INFO) # ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する +CRM_AUTH_TIMEOUT = os.environ["CRM_AUTH_TIMEOUT"] # CRMへの認証処理のタイムアウト秒数 +CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ["CRM_AUTH_MAX_RETRY_ATTEMPT"] # CRMへの認証処理の最大リトライ試行回数 +CRM_AUTH_RETRY_INTERVAL = os.environ["CRM_AUTH_RETRY_INTERVAL"] # CRMへの認証処理のリトライ時の初回待ち秒数 +CRM_AUTH_RETRY_MIN_INTERVAL = os.environ["CRM_AUTH_RETRY_MIN_INTERVAL"] # CRMへの認証処理のリトライ時の最小待ち秒数 +CRM_AUTH_RETRY_MAX_INTERVAL = os.environ["CRM_AUTH_RETRY_MAX_INTERVAL"] # CRMへの認証処理のリトライ時の最大待ち秒数 +CRM_GET_RECORD_COUNT_TIMEOUT = os.environ["CRM_GET_RECORD_COUNT_TIMEOUT"] # CRMのレコード件数取得処理のタイムアウト秒数 +CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ["CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT"] # CRMのレコード件数取得処理の最大リトライ試行回数 +CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の初回待ち秒数 +CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の最小待ち秒数 +CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の最大待ち秒数 +CRM_FETCH_RECORD_TIMEOUT = os.environ["CRM_FETCH_RECORD_TIMEOUT"] # CRMのレコード取得処理のタイムアウト秒数 +CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ["CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT"] # CRMのレコード取得処理の最大リトライ試行回数 +CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_INTERVAL"] # CRMのレコード取得処理のリトライ時の初回待ち秒数 +CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MIN_INTERVAL"] # CRMのレコード取得処理のリトライ時の最小待ち秒数 +CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MAX_INTERVAL"] # CRMのレコード取得処理のリトライ時の最大待ち秒数 + +# environments(ECS Task Enviroment) +CRM_AUTH_DOMAIN = os.environ["CRM_AUTH_DOMAIN"] # CRMのAPI実行のための認証エンドポイントのドメイン +CRM_USER_NAME = os.environ["CRM_USER_NAME"] # CRMのAPI実行用ユーザ名 +CRM_USER_PASSWORD = os.environ["CRM_USER_PASSWORD"] # CRMのAPI実行用ユーザパスワード +CRM_USER_SECURITY_TOKEN = os.environ["CRM_USER_SECURITY_TOKEN"] # CRMのAPI実行用ユーザのセキュリティトークン +CRM_CONFIG_BUCKET = os.environ["CRM_CONFIG_BUCKET"] # CRMデータ取得用の設定ファイルを格納するバケット名 +CRM_BACKUP_BUCKET = os.environ["CRM_BACKUP_BUCKET"] # CRMのバックアップデータを格納するバケット名 +IMPORT_DATA_BUCKET = os.environ["IMPORT_DATA_BUCKET"] # CRMの取込データを格納するバケット名 +OBJECT_INFO_FOLDER = os.environ["OBJECT_INFO_FOLDER"] # CRM取得対象オブジェクトの情報を格納するフォルダパス +OBJECT_INFO_FILENAME = os.environ["OBJECT_INFO_FILENAME"] # CRM取得対象オブジェクトの情報のファイル名 +PROCESS_RESULT_FOLDER = os.environ["PROCESS_RESULT_FOLDER"] # CRMデータ取得結果を格納するフォルダパス +PROCESS_RESULT_FILENAME = os.environ["PROCESS_RESULT_FILENAME"] # CRMデータ取得結果を格納するファイル名 +LAST_FETCH_DATE_FOLDER = os.environ["LAST_FETCH_DATE_FOLDER"] # CRMからの最終取得日時ファイルを格納するフォルダパス +CRM_IMPORT_DATA_FOLDER = os.environ["CRM_IMPORT_DATA_FOLDER"] # CRMから取得し、取込用に変換したデータを格納するフォルダ +LAST_FETCH_DATE_BACKUP_FOLDER = os.environ["LAST_FETCH_DATE_BACKUP_FOLDER"] # CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス +RESPONSE_JSON_BACKUP_FOLDER = os.environ["FETCH_DATA_BACKUP_FOLDER"] # CRMから取得した生データのバックアップを格納するフォルダパス +CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ["CRM_IMPORT_DATA_BACKUP_FOLDER"] # CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ diff --git a/ecs/crm-datafetch/src/error/__init__.py b/ecs/crm-datafetch/src/error/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/crm-datafetch/src/error/exceptions.py b/ecs/crm-datafetch/src/error/exceptions.py new file mode 100644 index 00000000..528c262d --- /dev/null +++ b/ecs/crm-datafetch/src/error/exceptions.py @@ -0,0 +1,45 @@ +from abc import ABCMeta + + +class MeDaCaCRMDataFetchException(Exception, metaclass=ABCMeta): + """MeDaCaシステム固有のカスタムエラークラス""" + + def __init__(self, error_id: str, func_name, message) -> None: + super().__init__(message) + self.func_name = func_name + self.error_id = error_id + + +class FileNotFoundException(MeDaCaCRMDataFetchException): + """S3のファイルが見つからない場合の例外""" + pass + + +class FileUploadException(MeDaCaCRMDataFetchException): + """S3のファイルアップロード失敗の例外""" + pass + + +class InvalidConfigException(MeDaCaCRMDataFetchException): + """Configのバリデーションチェック失敗の例外""" + pass + + +class DataConvertException(MeDaCaCRMDataFetchException): + """データ変換が失敗した場合の例外""" + pass + + +class TimeOutException(MeDaCaCRMDataFetchException): + """タイムアウトが発生した場合の例外""" + pass + + +class RetryExceededException(MeDaCaCRMDataFetchException): + """リトライ処理が発生した場合の例外""" + pass + + +class SalesforceAPIException(MeDaCaCRMDataFetchException): + """SalseforceのAPI実行失敗が発生した場合の例外""" + pass diff --git a/ecs/crm-datafetch/src/fetch.py b/ecs/crm-datafetch/src/fetch.py index e69de29b..bac313fd 100644 --- a/ecs/crm-datafetch/src/fetch.py +++ b/ecs/crm-datafetch/src/fetch.py @@ -0,0 +1,71 @@ +from src.util.logger import Logger +from src.constants import( + FETCH_JP_NAME +) +from src.error.exceptions import( + SalesforceAPIException, + DataConvertException +) +from src.salesforce.salesforce_api import( + SalesForceCount, + SalesForceData +) + + +logger = Logger().get_logger() + + +def fetch_crm_data(target_object, last_fetch_datetime): + # ① CRMデータ取得処理開始ログを出力する + logger.info( + f'I-FETCH-01 [{target_object.object_name}] のCRMからのデータ取得処理を開始します') + + object_name = target_object.object_name + columns = ','.join(target_object.columns) + last_fetch_datetime_from = last_fetch_datetime.last_fetch_datetime_from + last_fetch_datetime_to = last_fetch_datetime.last_fetch_datetime_to + + try: + # ② 取得対象オブジェクトの取得期間内のレコード件数を取得する + logger.info(f'I-FETCH-02 [{object_name}] の件数取得を開始します') + + saleforce_count = SalesForceCount() + record_count = saleforce_count.fetch_sf_count_retry( + object_name, last_fetch_datetime_from, last_fetch_datetime_to) + + logger.info(f'I-FETCH-03 [{object_name}] の件数:[{record_count}]') + + except Exception as e: + raise SalesforceAPIException( + 'E-FETCH-01', FETCH_JP_NAME, f'[{object_name}] の件数取得に失敗しました [{e}]') + + try: + # ③ 取得対象オブジェクトのレコードを取得する + logger.info(f'I-FETCH-04 [{object_name}] のレコード取得を開始します') + + saleforce_data = SalesForceData() + record_generator = saleforce_data.fetch_sf_data_retry( + columns, object_name, last_fetch_datetime_from, last_fetch_datetime_to) + + except Exception as e: + raise SalesforceAPIException( + 'E-FETCH-02', FETCH_JP_NAME, f'[{object_name}] のレコード取得に失敗しました [{e}]') + + try: + # ④ 取得対象オブジェクトをJSONに変換 + logger.info(f'I-FETCH-05 [{object_name}] のレコードをJSONに変換します') + + sf_object_jsons = [] + + for record in record_generator: + sf_object_jsons.append(record) + + except Exception as e: + raise DataConvertException( + 'E-FETCH-03', FETCH_JP_NAME, f'[{object_name}] のレコードのJSON変換に失敗しました [{e}]') + + # ⑤ CRMデータ取得処理終了ログを出力する + logger.info(f'I-FETCH-06 [{object_name}] のレコード取得が成功しました') + + # ⑥ 次の処理へ移行する + return sf_object_jsons diff --git a/ecs/crm-datafetch/src/parser/__init__.py b/ecs/crm-datafetch/src/parser/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/crm-datafetch/src/parser/json_parse.py b/ecs/crm-datafetch/src/parser/json_parse.py new file mode 100644 index 00000000..4273125d --- /dev/null +++ b/ecs/crm-datafetch/src/parser/json_parse.py @@ -0,0 +1,15 @@ +import re +import json +from src.constants import EXCLUDE_SYMBOL + +class JsonParser(): + def __init__(self, json_str) -> None: + self.__json_str = json_str + + def json_parser(self) -> dict: + for symbol in EXCLUDE_SYMBOL: + # コメントアウトシンボルを含む部分を置き換える正規表現 + replace_comment_regex = rf'\s(?!\"){symbol}[\s\S]*?.*' + self.__json_str = re.sub(replace_comment_regex, '', self.__json_str) + + return json.loads(self.__json_str) diff --git a/ecs/crm-datafetch/src/pre.py b/ecs/crm-datafetch/src/pre.py index 546cbc37..50071fe1 100644 --- a/ecs/crm-datafetch/src/pre.py +++ b/ecs/crm-datafetch/src/pre.py @@ -1,96 +1,83 @@ -# モジュールロード from datetime import datetime -from src.utils.logger import Logger -import src.utils.data_retention as data_retention -import src.utils.s3_access as s3_access -import src.utils.json_parser as json_parser +from src.util.logger import Logger +from src.constants import( + PRE_JP_NAME, + YYYYMMDDTHHMMSSTZ +) +from src.environments import( + CRM_CONFIG_BUCKET, + OBJECT_INFO_FOLDER, + OBJECT_INFO_FILENAME +) +from src.error.exceptions import( + FileNotFoundException, + InvalidConfigException +) +from src.aws.s3 import ConfigBucket +from src.parser.json_parse import JsonParser +from src.config.objects import FetchTargetObjects -# 変数 -# .000ZはUTCを表す。ミリ秒までの考慮は不要なので固定で指定 -YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z' -CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z' -YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S' - - -# logger設定 logger = Logger().get_logger() -# 処理 +def prepare_get_data(): + # ① データ取得準備処理の開始ログを出力する + logger.info('I-PRE-01 データ取得準備処理を開始します') + + # ② 取得処理開始年月日時分秒を控える + execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ) + date_path = execute_datetime.rstrip('000Z').translate( + str.maketrans({'-': '/', 'T': '/', ':': None, '.': None})) + + logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}') -def pre(crm_object_info_var): try: - # ① データ取得準備処理の開始ログを出力する - logger.info('I-PRE-01 データ取得準備処理を開始します') + # ③ S3 設定ファイル保管用バケットから、CRM_取得オブジェクト情報ファイルを取得する + object_info_file_s3_path = f's3://{CRM_CONFIG_BUCKET}{OBJECT_INFO_FOLDER}/{OBJECT_INFO_FILENAME}' + logger.debug( + f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{object_info_file_s3_path}]') - # ② 取得処理開始年月日時分秒を控える - execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ) - logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}') + config_bucket = ConfigBucket() + object_info_file_json = config_bucket.get_object_info_file() - # ⑤ S3 設定ファイル保管用バケットから、CRM_取得オブジェクト情報ファイルを取得する - - try: - crm_object_info_var = data_retention.CrmObjectInfoVar() - s3_fullpath = f's3://{crm_object_info_var.bucket}/{crm_object_info_var.file_name_key}' - logger.debug(f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{s3_fullpath}]') - - get_file_s3 = s3_access.GetFileS3() - crm_object_info_json = get_file_s3.get_file_from_s3(crm_object_info_var.bucket, crm_object_info_var.file_name_key) - - logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました') - - except Exception as e: - logger.error(f'E-PRE-01 CRM_取得オブジェクト情報ファイルが存在しません ファイル名:[{s3_fullpath}]', e) - raise e - - - # ⑥ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する - try: - logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします') - - json_exclusde_comment = json_parser.JsonExculudeComment() - crm_object_info_dict: dict = json_exclusde_comment(crm_object_info_json) - - logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました') - - except Exception as e: - logger.error(f'E-PRE-02 CRM_取得オブジェクト情報ファイルのパースに失敗しました エラー内容:[{e}]') - raise e - - # ⑦ メモリ上のCRM_取得オブジェクト情報のキーobjectsの形式チェックを行う - try: - logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェックを開始します') - - if 'objects' not in crm_object_info_dict: - logger.error(f'E-PRE-03 CRM_取得オブジェクト情報ファイル「objects」キーは必須です ファイル名:[{s3_fullpath}]') - raise e - - if not isinstance(crm_object_info_dict["objects"], list): - logger.error(f'E-PRE-04 CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名:[{s3_fullpath}]') - raise e - - logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェック 正常終了') - - except Exception as e: - logger.error(f'CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名:[{s3_fullpath}]', e) - raise e - - # ⑧ 処理結果出力用のマップを初期化 - process_result_per_object = {} - for sfdc_object in crm_object_info_dict.get('objects'): - object_name = sfdc_object.get('object_name') - process_result_per_object[object_name] = 'fail' - - - - # ⑨ データ取得準備処理の終了ログを出力する - logger.info('I-PRE-09 データ取得準備処理を終了します') - - # ⑩ 次の処理へ移行する - return(crm_object_info_dict, execute_datetime, process_result_per_object) + logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました') except Exception as e: - logger.error(f'データ取得準備処理で想定外のエラーが発生しました エラー内容:[{e}]', e) - raise e + raise FileNotFoundException( + 'E-PRE-01', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルが存在しません ファイル名:[{OBJECT_INFO_FILENAME}] エラー内容:[{e}]') + + try: + # ④ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する + logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします') + + json_parser = JsonParser(object_info_file_json) + object_info_file_dict = json_parser.json_parser() + + logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました') + + except Exception as e: + raise InvalidConfigException( + 'E-PRE-02', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルのパースに失敗しました エラー内容:[{e}]') + + # ⑤ メモリ上のCRM_取得オブジェクト情報のキーobjectsの形式チェックを行う + try: + logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェックを開始します') + + fetch_target_objects = FetchTargetObjects(object_info_file_dict) + + logger.debug('D-PRE-08 CRM_取得オブジェクト情報ファイルの形式チェック 正常終了') + + except Exception as e: + raise InvalidConfigException( + 'E-PRE-03', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルの形式チェックに失敗しました ファイル名:[{OBJECT_INFO_FILENAME}] エラー内容:[{e}]') + + # ⑥ 処理結果出力用のマップを初期化 + process_result = {} + + # ⑦ データ取得準備処理の終了ログを出力する + logger.info('I-PRE-09 データ取得準備処理を終了します') + + # ⑧ 次の処理へ移行する + return(fetch_target_objects, execute_datetime, date_path, process_result) diff --git a/ecs/crm-datafetch/src/process.py b/ecs/crm-datafetch/src/process.py new file mode 100644 index 00000000..5ccbd716 --- /dev/null +++ b/ecs/crm-datafetch/src/process.py @@ -0,0 +1,150 @@ +from src.pre import prepare_get_data # データ取得準備処理 +from src.chk import check_object_info # オブジェクト情報形式チェック処理 +from src.date import set_datetime_period # データ取得期間設定処理 +from src.fetch import fetch_crm_data # CRMデータ取得処理 +from src.resbk import backup_crm_data # CRM電文データバックアップ処理 +from src.conv import convert_crm_csvdata # CSV変換処理 +from src.csvbk import backup_crm_csvdata # CSVバックアップ処理 +from src.upld import copy_crm_csvdata # CSVアップロード処理 +from src.upd import updload_last_fetch_datetime # 前回取得日時ファイル更新 +from src.end import updload_result_data # 取得処理実施結果アップロード処理 +from src.util.logger import Logger +from src.error.exceptions import( + MeDaCaCRMDataFetchException +) + +logger = Logger().get_logger() + + +def main() -> None: + try: + # ① CRMデータ取得処理開始ログを出力する + logger.info('I-CTRL-01 CRMデータ取得処理を開始します') + + fetch_target_objects = None # オブジェクト情報ファイル用オブジェクト + execute_datetime = None # 実行日次文字列 + date_path = None # 実行日次のパス文字列 + process_result = None # オブジェクトごとの実行結果JSON + + # ② データ取得準備処理を呼び出す + logger.info('I-CTRL-02 データ取得準備処理呼び出し') + + fetch_target_objects, execute_datetime, date_path, process_result = prepare_get_data() + + # ③ object_infoのobjectsキーの値の件数分ループする + logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始') + + for object_info in fetch_target_objects: + try: + # 1. オブジェクト処理結果の初期化 + target_object = None # オブジェクトごとの情報格納用オブジェクト + last_fetch_datetime = None # オブジェクトごとの取得日付格納用オブジェクト + sf_object_jsons = None # オブジェクトごとのSalesforce取得変数JSON + csv_object = None # CSVオブジェクト + process_result[object_info.get('object_name')] = 'fail' # オブジェクト処理結果 + + logger.debug(f'D-CTRL-04 対象のオブジェクト情報を出力します オブジェクト情報:{object_info}') + + # 2. オブジェクト情報形式チェック処理を呼び出す + logger.info('I-CTRL-05 オブジェクト情報形式チェック処理呼び出し') + + target_object = check_object_info(object_info, execute_datetime) + + # 3. 処理対象のオブジェクト名をログ出力する + logger.info( + f'I-CTRL-06 [{target_object.object_name}]のデータ取得を開始します') + + # 4. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する + if target_object.is_skip is True: + logger.info( + f'I-CTRL-07 [{target_object.object_name}]のデータ取得処理をスキップします') + continue + + # 5. データ取得期間設定処理を呼び出す + logger.info( + f'I-CTRL-08 [{target_object.object_name}]のデータ取得期間設定処理呼び出し') + + last_fetch_datetime = set_datetime_period(target_object, execute_datetime) + + # 6. CRMデータ取得処理を呼び出す + logger.info( + f'I-CTRL-09 [{target_object.object_name}]のデータ取得処理呼び出し') + + sf_object_jsons = fetch_crm_data(target_object, last_fetch_datetime) + + # 7. 出力ファイル名をログ出力する + logger.info( + f'I-CTRL-10 [{target_object.object_name}] の出力ファイル名は [{target_object.upload_file_name}]となります') + + # 8. CRM電文データバックアップ処理を呼び出す + logger.info( + f'I-CTRL-11 [{target_object.object_name}] CRM電文データバックアップ処理呼び出し') + backup_crm_data(target_object.object_name, sf_object_jsons, date_path) + + # 9. CSV変換処理を呼び出す + logger.info( + f'I-CTRL-12 [{target_object.object_name}] CSV変換処理呼び出し') + csv_object = convert_crm_csvdata(target_object, sf_object_jsons) + + # 10. CSVバックアップ処理を呼び出す + logger.info( + f'I-CTRL-13 [{target_object.object_name}] CSVデータバックアップ処理呼び出し') + backup_crm_csvdata(target_object, date_path, csv_object) + + # 11. CSVアップロード処理を呼び出す + logger.info( + f'I-CTRL-14 [{target_object.object_name}] CSVデータアップロード処理呼び出し') + copy_crm_csvdata(target_object, date_path) + + # 12. 前回取得日時ファイル更新処理を呼びだす + logger.info( + f'I-CTRL-15 [{target_object.object_name}] 前回取得日時ファイル更新処理呼び出し') + updload_last_fetch_datetime(target_object, last_fetch_datetime) + + # 13. オブジェクト処理結果の更新 + process_result[target_object.object_name] = 'success' + + # 14. オブジェクトのアップロードが完了した旨をログに出力する + logger.info(f'I-CTRL-16 [{target_object.object_name}] 処理正常終了') + + except MeDaCaCRMDataFetchException as e: + logger.error(f'{e.error_id} {e}') + logger.exception( + f'I-ERR-03 [{object_info.get("object_name")}] の[{e.func_name}]でエラーが発生しました 次のオブジェクトの処理に移行します') + continue + + except Exception as e: + logger.exception( + f'I-ERR-04 [{object_info.get("object_name")}] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します', e) + continue + + # ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する + logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{process_result}]') + + # ⑤ 取得処理実施結果アップロード処理を呼び出す + logger.info('I-CTRL-19 CRM_取得処理実施結果ファイルアップロード処理開始') + updload_result_data(process_result, date_path) + + # ⑥ 最終結果をチェックし、チェック結果をログに出力 + if not all([v == 'success' for v in process_result.values()]): + logger.error('E-CTRL-01 一部のデータ取得に失敗しています。') + else: + logger.info('I-CTRL-18 すべてのデータの取得に成功しました。') + + # ⑦ CRMデータ取得処理終了ログを出力する + logger.info('I-CTRL-20 CRMデータ取得処理を終了します') + + return exit(0) + + except MeDaCaCRMDataFetchException as e: + logger.error(f'{e.error_id} {e}') + logger.exception(f'I-ERR-01 [{e.func_name}]でエラーが発生したため、処理を終了します') + return exit(0) + + except Exception as e: + logger.exception('I-ERR-02 予期せぬエラーが発生したため、処理を終了します', e) + return exit(0) + + +if __name__ == '__main__': + main() diff --git a/ecs/crm-datafetch/src/resbk.py b/ecs/crm-datafetch/src/resbk.py index e69de29b..02b3c53b 100644 --- a/ecs/crm-datafetch/src/resbk.py +++ b/ecs/crm-datafetch/src/resbk.py @@ -0,0 +1,35 @@ +from src.util.logger import Logger +from src.constants import( + RESBK_JP_NAME +) +from src.error.exceptions import( + FileUploadException +) +from src.aws.s3 import BackupBucket + + +logger = Logger().get_logger() + + +def backup_crm_data(object_name, sf_object_jsons, date_path): + # ① CRM電文データバックアップ処理の開始ログを出力する + logger.info(f'I-RESBK-01 [{object_name}] のCRM電文データバックアップ処理を開始します') + + try: + # ② CRMバックアップ保管用バケットに、CRMから取得したJSONの電文データのバックアップを保管する + file_name = f'{date_path}/{object_name}.json' + + backup_bucket = BackupBucket() + backup_bucket.put_response_json(file_name, sf_object_jsons) + + logger.debug(f'D-RESBK-02 [{object_name}] のJSONデータバックアップ 正常終了') + + except Exception as e: + raise FileUploadException( + 'E-RESBK-01', RESBK_JP_NAME, f'[{object_name}] 電文データのバックアップに失敗しました ファイル名:[{object_name}.json] エラー内容:[{e}]') + + # ③ CRM電文データバックアップ処理の終了ログを出力する + logger.info(f'I-RESBK-03 [{object_name}] のCRM電文データバックアップ処理を終了します') + + # ④ 次の処理へ移行する + return diff --git a/ecs/crm-datafetch/src/salesforce/__init__.py b/ecs/crm-datafetch/src/salesforce/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/crm-datafetch/src/salesforce/salesforce_api.py b/ecs/crm-datafetch/src/salesforce/salesforce_api.py new file mode 100644 index 00000000..ad466782 --- /dev/null +++ b/ecs/crm-datafetch/src/salesforce/salesforce_api.py @@ -0,0 +1,78 @@ +from tenacity import retry, stop_after_attempt, stop_after_delay +from tenacity.wait import wait_exponential +from simple_salesforce import Salesforce +from src.util.async_retry import TimeoutManager, AsyncRetry +from src.environments import( + CRM_AUTH_DOMAIN, + CRM_USER_NAME, + CRM_USER_PASSWORD, + CRM_USER_SECURITY_TOKEN, + CRM_AUTH_TIMEOUT, + CRM_AUTH_MAX_RETRY_ATTEMPT, + CRM_AUTH_RETRY_INTERVAL, + CRM_AUTH_RETRY_MIN_INTERVAL, + CRM_AUTH_RETRY_MAX_INTERVAL, + CRM_GET_RECORD_COUNT_TIMEOUT, + CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT, + CRM_GET_RECORD_COUNT_RETRY_INTERVAL, + CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, + CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL, + CRM_FETCH_RECORD_TIMEOUT, # CRMのレコード取得処理のタイムアウト秒数 + CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT, + CRM_FETCH_RECORD_RETRY_INTERVAL, + CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, + CRM_FETCH_RECORD_RETRY_MAX_INTERVAL +) + +FETCH_SOQL = """SELECT {column_or_expression} FROM {object_name} + WHERE SystemModStamp > {last_update_datetime_from} + AND SystemModStamp <= {last_update_datetime_to} +""" + + +class SalesfoeceApi(): + def __init__(self) -> None: + self.__sf = Salesforce(username=CRM_USER_NAME, password=CRM_USER_PASSWORD, + security_token=CRM_USER_SECURITY_TOKEN, + domain=CRM_AUTH_DOMAIN + ) + + def sf_query(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300): + return self.__sf.query(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout))) + + def sf_query_all_iter(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300): + return self.__sf.query_all_iter(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout))) + + +class SalesForceCount(): + def fetch_sf_count(self, object_name, last_update_datetime_from, last_update_datetime_to): + count_soql = FETCH_SOQL.format( + column_or_expression='COUNT(Id)', + object_name=object_name, + last_update_datetime_from=last_update_datetime_from, + last_update_datetime_to=last_update_datetime_to + ) + self.__sf = SalesfoeceApi() + count_res = self.__sf.sf_query( + count_soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_GET_RECORD_COUNT_TIMEOUT) + return count_res.get('records')[0].get('expr0') + + @retry(wait=wait_exponential(multiplier=CRM_GET_RECORD_COUNT_RETRY_INTERVAL, min=CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, max=CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL), stop=stop_after_attempt(CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT) | stop_after_delay(CRM_GET_RECORD_COUNT_TIMEOUT)) + def fetch_sf_count_retry(self, object_name, last_update_datetime_from, last_update_datetime_to): + return self.fetch_sf_count(object_name, last_update_datetime_from, last_update_datetime_to) + + +class SalesForceData(): + def fetch_sf_data(self, columns, object_name, last_update_datetime_from, last_update_datetime_to): + soql = FETCH_SOQL.format( + column_or_expression=columns, + object_name=object_name, + last_update_datetime_from=last_update_datetime_from, + last_update_datetime_to=last_update_datetime_to + ) + self.__sf = SalesfoeceApi() + return self.__sf.sf_query_all_iter(soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_FETCH_RECORD_TIMEOUT) + + @retry(wait=wait_exponential(multiplier=CRM_FETCH_RECORD_RETRY_INTERVAL, min=CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, max=CRM_FETCH_RECORD_RETRY_MAX_INTERVAL), stop=stop_after_attempt(CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT) | stop_after_delay(CRM_FETCH_RECORD_TIMEOUT)) + def fetch_sf_data_retry(self, columns, object_name, last_update_datetime_from, last_update_datetime_to): + return self.fetch_sf_data(columns, object_name, last_update_datetime_from, last_update_datetime_to) diff --git a/ecs/crm-datafetch/src/upd.py b/ecs/crm-datafetch/src/upd.py index e69de29b..4e3cb90d 100644 --- a/ecs/crm-datafetch/src/upd.py +++ b/ecs/crm-datafetch/src/upd.py @@ -0,0 +1,49 @@ +import json +from src.util.logger import Logger +from src.aws.s3 import ConfigBucket +from src.constants import ( + UPD_JP_NAME +) +from src.error.exceptions import( + FileUploadException +) + + +logger = Logger().get_logger() + + +def updload_last_fetch_datetime(target_object, last_fetch_datetime): + # ① 前回取得日時ファイル更新処理の開始ログを出力する + logger.info( + f'I-UPD-01 [{target_object.object_name}] の前回取得日時ファイルの更新処理を開始します') + + try: + if target_object.is_update_last_fetch_datetime == False: + # ② オブジェクト情報.is_update_last_fetch_datetimeがfalseの場合、以降の処理をスキップする + logger.info( + f'I-UPD-02 [{target_object.object_name}] の前回取得日時ファイルの更新処理をスキップします') + else: + # ③ 前回取得日時ファイル.last_fetch_datetime_fromに取得処理開始年月日時分秒を設定する + # 前回取得日時ファイル.last_fetch_datetime_toに空文字を設定する + last_fetch_datetime_dict = { + 'last_fetch_datetime_from': last_fetch_datetime.last_fetch_datetime_to, + 'last_fetch_datetime_to': '' + } + + config_bucket = ConfigBucket() + config_bucket.put_last_fetch_datetime_file( + target_object.last_fetch_datetime_file_name, json.dumps(last_fetch_datetime_dict)) + + logger.info( + f'D-UPD-03 [{target_object.object_name}] の前回取得日時ファイル更新処理 正常終了') + + except Exception as e: + raise FileUploadException( + 'E-UPD-01', UPD_JP_NAME, f'[{target_object.object_name}] 前回処理日時ファイルのアップロードに失敗しました ファイル名:[{target_object.last_fetch_datetime_file_name}] エラー内容:[{e}]') + + # ④ 前回取得日時ファイル更新処理の終了ログを出力する + logger.info( + f'I-UPD-04 [{target_object.object_name}] の前回取得日時ファイルの更新処理を終了します') + + # ⑤ 次の処理へ移行する + return diff --git a/ecs/crm-datafetch/src/upld.py b/ecs/crm-datafetch/src/upld.py index e69de29b..57e33e25 100644 --- a/ecs/crm-datafetch/src/upld.py +++ b/ecs/crm-datafetch/src/upld.py @@ -0,0 +1,43 @@ +from src.util.logger import Logger +from src.constants import ( + UPLD_JP_NAME +) +from src.environments import( + CRM_BACKUP_BUCKET, + IMPORT_DATA_BUCKET, + CRM_IMPORT_DATA_FOLDER, + CRM_IMPORT_DATA_BACKUP_FOLDER, +) +from src.error.exceptions import( + FileUploadException +) +from src.aws.s3 import S3ResourceNonBucket + + +logger = Logger().get_logger() + + +def copy_crm_csvdata(target_object, date_path): + # ① CSVデータアップロード処理の開始ログを出力する + logger.info( + f'I-UPLD-01 [{target_object.object_name}] のCSVデータアップロード処理を開始します ファイル名:[{target_object.upload_file_name}.csv]') + + try: + # ② CRMバックアップ保管用バケットに保管した変換後のCSVデータをデータ取込バケットにコピーする + s3_resource_non_bucket = S3ResourceNonBucket() + s3_resource_non_bucket.copy(CRM_BACKUP_BUCKET, f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{date_path}/{target_object.upload_file_name}.csv', + IMPORT_DATA_BUCKET, f'{CRM_IMPORT_DATA_FOLDER}/{target_object.upload_file_name}.csv') + + logger.debug( + f'D-UPLD-02 [{target_object.object_name}] のCSVデータアップロード 正常終了') + + except Exception as e: + raise FileUploadException( + 'E-UPLD-01', UPLD_JP_NAME, f'[{target_object.object_name}] CSVデータのアップロードに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]') + + # ③ CSVデータアップロード処理の終了ログを出力する + logger.info( + f'I-UPLD-03 [{target_object.object_name}] のCSVデータのアップロード処理を終了します') + + # ④ 次の処理へ移行する + return diff --git a/ecs/crm-datafetch/src/util/__init__.py b/ecs/crm-datafetch/src/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/crm-datafetch/src/util/dict_checker.py b/ecs/crm-datafetch/src/util/dict_checker.py new file mode 100644 index 00000000..a8bfed62 --- /dev/null +++ b/ecs/crm-datafetch/src/util/dict_checker.py @@ -0,0 +1,64 @@ +import re + + +class DictCheck: + def __init__(self) -> None: + pass + + def check_key_exist(self, object_dict: dict, check_key: str) -> bool: + ''' + 辞書型キー存在チェック + ''' + return True if check_key in object_dict and object_dict[check_key] != '' else False + + def check_value_type(self, object_dict: dict, check_key: str, check_type: type) -> bool: + ''' + 辞書型バリュー型チェック + ''' + return True if isinstance(object_dict[check_key], check_type) else False + + def check_regex(self, regex_str: str, object_dict: dict, check_key: str) -> bool: + ''' + 辞書型バリュー正規表現チェック + ''' + return True if re.fullmatch(regex_str, object_dict[check_key]) else False + + def check_key_exist_and_value_type(self, object_dict: dict, check_key: str, check_type: type) -> None: + ''' + 辞書型キー存在チェック&バリュー型チェック + ''' + if not self.check_key_exist(object_dict, check_key): + raise Exception(f'「{check_key}」キーは必須です') + + elif not self.check_value_type(object_dict, check_key, check_type): + raise Exception(f'「{check_key}」キーの値は「{check_type}」でなければなりません') + + def check_key_exist_case_value_type(self, object_dict: dict, check_key: str, check_type: type): + ''' + 辞書型キー存在した場合のバリュー型チェック + ''' + if not self.check_key_exist(object_dict, check_key): + pass + + elif not self.check_value_type(object_dict, check_key, check_type): + raise Exception(f'「{check_key}」キーの値は「{check_type}」でなければなりません') + + def check_key_exsit_and_regex(self, object_dict: dict, check_key: str, regex_str: str): + ''' + 辞書型キー存在チェック&バリュー正規表現チェック + ''' + if not self.check_key_exist(object_dict, check_key): + raise Exception(f'「{check_key}」キーは必須です') + + elif not self.check_regex(regex_str, object_dict, check_key): + raise Exception(f'「{check_key}」キーの値の正規表現「{regex_str}」チェックに失敗しました') + + def check_key_exsit_case_regex(self, object_dict: dict, check_key: str, regex_str: str): + ''' + 辞書型キー存在した場合のバリュー正規表現チェック + ''' + if not self.check_key_exist(object_dict, check_key): + pass + + elif not self.check_regex(regex_str, object_dict, check_key): + raise Exception(f'「{check_key}」キーの値の正規表現「{regex_str}」チェックに失敗しました') diff --git a/ecs/crm-datafetch/src/util/logger.py b/ecs/crm-datafetch/src/util/logger.py new file mode 100644 index 00000000..d9892da8 --- /dev/null +++ b/ecs/crm-datafetch/src/util/logger.py @@ -0,0 +1,29 @@ +import logging +from src.environments import LOG_LEVEL + + +class Logger(): + __logger: logging.Logger + + def __init__(self): + self.__logger = logging.getLogger() + + level = logging.getLevelName(LOG_LEVEL) + if not isinstance(level, int): + level = logging.INFO + self.__logger.setLevel(level) + + if not self.__logger.hasHandlers(): + handler = logging.StreamHandler() + self.__logger.addHandler(handler) + + formatter = logging.Formatter( + '[%(levelname)s]\t%(asctime)s\t%(message)s\n', + '%Y-%m-%d %H:%M:%S' + ) + + for handler in self.__logger.handlers: + handler.setFormatter(formatter) + + def get_logger(self) -> logging.Logger: + return self.__logger diff --git a/ecs/crm-datafetch/src/utils/data_retention.py b/ecs/crm-datafetch/src/utils/data_retention.py deleted file mode 100644 index 89d7d4c3..00000000 --- a/ecs/crm-datafetch/src/utils/data_retention.py +++ /dev/null @@ -1,169 +0,0 @@ -# モジュールロード -import dataclasses -#from fileinput import filename -import os -#from tracemalloc import DomainFilter - -# 変数 -# ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する -LOG_LEVEL = os.environ["LOG_LEVEL"] - -CRM_AUTH_TIMEOUT = os.environ["CRM_AUTH_TIMEOUT"] # CRMへの認証処理のタイムアウト秒数 -# CRMへの認証処理の最大リトライ試行回数 -CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ["CRM_AUTH_MAX_RETRY_ATTEMPT"] -# CRMへの認証処理のリトライ時の初回待ち秒数 -CRM_AUTH_RETRY_INTERVAL = os.environ["CRM_AUTH_RETRY_INTERVAL"] -# CRMへの認証処理のリトライ時の最小待ち秒数 -CRM_AUTH_RETRY_MIN_INTERVAL = os.environ["CRM_AUTH_RETRY_MIN_INTERVAL"] -# CRMへの認証処理のリトライ時の最大待ち秒数 -CRM_AUTH_RETRY_MAX_INTERVAL = os.environ["CRM_AUTH_RETRY_MAX_INTERVAL"] - -# CRMのレコード件数取得処理のタイムアウト秒数 -CRM_GET_RECORD_COUNT_TIMEOUT = os.environ["CRM_GET_RECORD_COUNT_TIMEOUT"] -# CRMのレコード件数取得処理の最大リトライ試行回数 -CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ["CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT"] -# CRMのレコード件数取得処理のリトライ時の初回待ち秒数 -CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_INTERVAL"] -# CRMのレコード件数取得処理のリトライ時の最小待ち秒数 -CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL"] -# CRMのレコード件数取得処理のリトライ時の最大待ち秒数 -CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL"] - -# CRMのレコード取得処理のタイムアウト秒数 -CRM_FETCH_RECORD_TIMEOUT = os.environ["CRM_FETCH_RECORD_TIMEOUT"] -# CRMのレコード取得処理の最大リトライ試行回数 -CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ["CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT"] -# CRMのレコード取得処理のリトライ時の初回待ち秒数 -CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_INTERVAL"] -# CRMのレコード取得処理のリトライ時の最小待ち秒数 -CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MIN_INTERVAL"] -# CRMのレコード取得処理のリトライ時の最大待ち秒数 -CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MAX_INTERVAL"] - - -CRM_AUTH_DOMAIN = os.environ["CRM_AUTH_DOMAIN"] # CRMのAPI実行のための認証エンドポイントのドメイン -CRM_USER_NAME = os.environ["CRM_USER_NAME"] # CRMのAPI実行用ユーザ名 -CRM_USER_PASSWORD = os.environ["CRM_USER_PASSWORD"] # CRMのAPI実行用ユーザパスワード -# CRMのAPI実行用ユーザのセキュリティトークン -CRM_USER_SECURITY_TOKEN = os.environ["CRM_USER_SECURITY_TOKEN"] - -# CRMデータ取得用の設定ファイルを格納するバケット名 -CRM_CONFIG_BUCKET = os.environ["CRM_CONFIG_BUCKET"] -CRM_BACKUP_BUCKET = os.environ["CRM_BACKUP_BUCKET"] # CRMのバックアップデータを格納するバケット名 -IMPORT_DATA_BUCKET = os.environ["IMPORT_DATA_BUCKET"] # CRMの取込データを格納するバケット名 - -# TASK_SETTING_FOLDER = os.environ["TASK_SETTING_FOLDER"] # CRM取得処理タスクの設定ファイルを格納するフォルダパス -# TASK_SETTING_FILENAME = os.environ["TASK_SETTING_FILENAME"] # CRM取得処理タスクの設定ファイルのファイル名 - -# CRM取得対象オブジェクトの情報を格納するフォルダパス -OBJECT_INFO_FOLDER = os.environ["OBJECT_INFO_FOLDER"] -# CRM取得対象オブジェクトの情報のファイル名 -OBJECT_INFO_FILENAME = os.environ["OBJECT_INFO_FILENAME"] - -# CRMデータ取得結果を格納するフォルダパス -FETCH_RESULT_FOLDER = os.environ["FETCH_RESULT_FOLDER"] -# CRMデータ取得結果を格納するファイル名 -FETCH_RESULT_FILENAME = os.environ["FETCH_RESULT_FILENAME"] - -# CRMからの最終取得日時ファイルを格納するフォルダパス -LAST_FETCH_DATE_FOLDER = os.environ["LAST_FETCH_DATE_FOLDER"] -# CRMから取得し、取込用に変換したデータを格納するフォルダ -CRM_IMPORT_DATA_FOLDER = os.environ["CRM_IMPORT_DATA_FOLDER"] - -# CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス -LAST_FETCH_DATE_BACKUP_FOLDER = os.environ["LAST_FETCH_DATE_BACKUP_FOLDER"] -# CRMから取得した生データのバックアップを格納するフォルダパス -FETCH_DATA_BACKUP_FOLDER = os.environ["FETCH_DATA_BACKUP_FOLDER"] -# CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ -CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ["CRM_IMPORT_DATA_BACKUP_FOLDER"] - - -# 処理 -@dataclasses.dataclass -class LogLevel: - loglevel: str = LOG_LEVEL - - -@dataclasses.dataclass -class CrmAuthVar: - auth_domain: str = CRM_AUTH_DOMAIN - user_name: str = CRM_USER_NAME - user_password: str = CRM_USER_PASSWORD - security_token: str = CRM_USER_SECURITY_TOKEN - timeout: int = CRM_AUTH_TIMEOUT - max_retry_attempt: int = CRM_AUTH_MAX_RETRY_ATTEMPT - retry_interval: int = CRM_AUTH_RETRY_INTERVAL - retry_min_interval: int = CRM_AUTH_RETRY_MIN_INTERVAL - retry_max_interval: int = CRM_AUTH_RETRY_MAX_INTERVAL - - -@dataclasses.dataclass -class CrmGetRecordCountVar: - timeout: int = CRM_GET_RECORD_COUNT_TIMEOUT - max_retry_attempt: int = CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT - retry_interval: int = CRM_GET_RECORD_COUNT_RETRY_INTERVAL - retry_min_interval: int = CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL - retry_max_interval: int = CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL - - -@dataclasses.dataclass -class CrmFetchRecordVar: - timeout: int = CRM_FETCH_RECORD_TIMEOUT - max_retry_attempt: int = CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT - retry_interval: int = CRM_FETCH_RECORD_RETRY_INTERVAL - retry_min_interval: int = CRM_FETCH_RECORD_RETRY_MIN_INTERVAL - retry_max_interval: int = CRM_FETCH_RECORD_RETRY_MAX_INTERVAL - - -@dataclasses.dataclass -class CrmBucketVar: - config_bucket: str = CRM_CONFIG_BUCKET - backup_bucket: str = CRM_BACKUP_BUCKET - data_bucket: str = IMPORT_DATA_BUCKET - - -@dataclasses.dataclass -class CrmObjectInfoVar: - bucket: str = CRM_CONFIG_BUCKET - folder: str = OBJECT_INFO_FOLDER - file_name: str = OBJECT_INFO_FILENAME - file_name_key: str = f'{folder}/{file_name}' - - -@dataclasses.dataclass -class FetchResultVar: - folder: str = FETCH_RESULT_FOLDER - file_name: str = FETCH_RESULT_FILENAME - file_name_key: str = f'{folder}/{file_name}' - - -@dataclasses.dataclass -class FetchResultVar: - folder: str = FETCH_RESULT_FOLDER - file_name: str = FETCH_RESULT_FILENAME - file_name_key: str = f'{folder}/{file_name}' - - -@dataclasses.dataclass -class LastFetchDateVar: - folder: str = LAST_FETCH_DATE_FOLDER - - -@dataclasses.dataclass -class CrmImportDataVar: - folder: str = CRM_IMPORT_DATA_FOLDER - - -@dataclasses.dataclass -class LastFetchDateBackupVar: - folder: str = LAST_FETCH_DATE_BACKUP_FOLDER - - -@dataclasses.dataclass -class FetchDataBackupVar: - folder: str = FETCH_DATA_BACKUP_FOLDER - - -@dataclasses.dataclass -class CrmImportDataBackupVar: - folder: str = CRM_IMPORT_DATA_BACKUP_FOLDER diff --git a/ecs/crm-datafetch/src/utils/json_parser.py b/ecs/crm-datafetch/src/utils/json_parser.py deleted file mode 100644 index b6d288a9..00000000 --- a/ecs/crm-datafetch/src/utils/json_parser.py +++ /dev/null @@ -1,28 +0,0 @@ -# モジュールロード -import re -import json - - -# 変数 -SYMBOL = ['#', '/'] - - -# 処理 -class JsonExculudeComment: - def __init__(self) -> None: - pass - - def json_exclude_comment(self, json_str) -> dict: - for symbol in SYMBOL: - # コメントアウトシンボルを含む部分を置き換える正規表現 - replace_comment_regex = rf'\s(?!\"){symbol}[\s\S]*?.*' - - json_str = self.json_regex_parse(replace_comment_regex, json_str) - - result = json.loads(json_str) - - return result - - def json_regex_parse(replace_comment_regex, json_str) -> str: - json_without_comment = re.sub(replace_comment_regex, '', json_str) - return json_without_comment diff --git a/ecs/crm-datafetch/src/utils/logger.py b/ecs/crm-datafetch/src/utils/logger.py deleted file mode 100644 index 5ec076fd..00000000 --- a/ecs/crm-datafetch/src/utils/logger.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging -import os - -LOG_LEVEL = { - 'critical': logging.CRITICAL, - 'error': logging.ERROR, - 'warn': logging.WARNING, - 'info': logging.INFO, - 'debug': logging.DEBUG -} - - -class Logger(): - __logger: logging.Logger - - def __init__(self): - # logger設定 - self.__logger = logging.getLogger() - formatter = logging.Formatter( - '[%(levelname)s]\t%(asctime)s\t%(message)s\n', - '%Y-%m-%d %H:%M:%S' - ) - handler = logging.StreamHandler() - self.__logger.addHandler(handler) - for handler in self.__logger.handlers: - handler.setFormatter(formatter) - level = logging.getLevelName(os.environ.get('LOG_LEVEL', 'WARN').upper()) - if not isinstance(level, int): - level = logging.INFO - self.__logger.setLevel(level) - - def get_logger(self) -> logging.Logger: - return self.__logger diff --git a/ecs/crm-datafetch/src/utils/s3_access.py b/ecs/crm-datafetch/src/utils/s3_access.py deleted file mode 100644 index 7ffb697d..00000000 --- a/ecs/crm-datafetch/src/utils/s3_access.py +++ /dev/null @@ -1,48 +0,0 @@ -# モジュールロード -import boto3 -from tenacity import retry, stop_after_attempt -from tenacity.wait import wait_exponential - -# 変数 -s3_client = boto3.client('s3') -s3_resource = boto3.resource('s3') - -# 処理 - - -class UploadFileS3: - def __init__(self, bucket_name, local_file_path, file_name_key) -> None: - self.bucket_name: str = bucket_name - self.local_file_path: str = local_file_path - self.file_name_key: str = file_name_key - - def upload_file_to_s3(self) -> None: - bucket = s3_resource.Bucket(self.bucket_name) - bucket.upload_file(self.local_file_path, self.file_name_key) - - -class GetFileS3: - def __init__(self, bucket_name, file_name_key, multiplier=5, min=5, max=50, count=3) -> None: - self.bucket_name: str = bucket_name - self.file_name_key: str = file_name_key - self.multiplier: int = multiplier - self.min: int = min - self.max: int = max - self.count: int = count - - def get_file_from_s3(self) -> str: - bucket = s3_resource.Bucket(self.bucket_name) - response = bucket.Object(self.file_name_key).get() - body = response['Body'].read() - - return body.decode('utf-8') - - def get_file_from_s3_with_retry(self) -> str: - - @retry(wait=wait_exponential(multiplier=self.multiplier, min=self.min, max=self.max), stop=stop_after_attempt(self.count)) - def get_file_from_s3_with_retry_deco(*args, **kwargs) -> str: - return self.get_file_from_s3(self.bucket_name, self.file_name_key) - - response = get_file_from_s3_with_retry_deco(self) - - return response