feat:機能全般の追加

This commit is contained in:
Y_SAKAI 2022-07-19 20:16:40 +09:00
parent f2650c1ac8
commit a7426900b7
32 changed files with 1443 additions and 540 deletions

View File

@ -6,6 +6,13 @@ WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
RUN pip install --no-cache-dir -r requirements.txt && \
cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
RUN \
apt update -y && \
# パッケージのセキュリティアップデートのみを適用するコマンド
apt install -y unattended-upgrades && \
unattended-upgrades
COPY main.py ./
COPY crm-datafetch /usr/src

View File

@ -1,181 +1,4 @@
# モジュールロード
import os
import sys
from datetime import datetime
import src.pre as pre # データ取得準備処理
import src.chk as chk # オブジェクト情報形式チェック処理
import src.date as date # データ取得期間設定処理
import src.fetch as fetch # CRMデータ取得処理
import src.resbk as resbk # CRM電文データバックアップ処理
import src.conv as conv # CSV変換処理
import src.csvbk as csvbk # CSVバックアップ処理
import src.upld as upld # CSVアップロード処理
import src.upd as upd # 前回取得日時ファイル更新
import src.end as end # 取得処理実施結果アップロード処理
from src.utils.logger import Logger
import src.utils.data_retention as data_retention
# 変数
# logger設定
logger = Logger().get_logger()
# 処理
def main() -> None:
"""コントロール処理
各処理を呼び出すコントローラー
"""
try:
# ① CRMデータ取得処理開始ログを出力する
logger.info('I-CTRL-01 CRMデータ取得処理を開始します')
# ② データ取得準備処理を呼び出す
try:
logger.info('I-CTRL-02 データ取得準備処理呼び出し')
crm_object_info_var = data_retention.CrmObjectInfoVar()
crm_object_info_dict, execute_datetime, process_result_per_object = pre(
crm_object_info_var)
except Exception as e:
logger.error('')
raise CustomException(e)
# ③ object_infoのobjectsキーの値の件数分ループする
logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始')
for object_info in crm_object_info_dict.get('objects'):
try:
# 1. オブジェクト情報形式チェック処理を呼び出す
logger.info('I-CTRL-04 オブジェクト情報形式チェック処理呼び出し')
try:
chk()
except Exception as e:
logger.error('')
# 2. 処理対象のオブジェクト名をログ出力する
logger.info('I-CTRL-05 ')
# 3. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する
logger.info('I-CTRL-06 ')
# 4. データ取得期間設定処理を呼び出す
logger.info('I-CTRL-07 ')
try:
except Exception as e:
logger.error('')
# 5. CRMデータ取得処理を呼び出す
logger.info('I-CTRL-08 ')
try:
except Exception as e:
logger.error('')
# 6. CRMデータのジェネレータをループする
# 6-1. 出力ファイル名を生成する
logger.info('I-CTRL-10 ')
try:
except Exception as e:
logger.error('')
# 6-2. CRM電文データバックアップ処理を呼び出す
logger.info('I-CTRL-11 ')
try:
except Exception as e:
logger.error('')
# 6-3. CSV変換処理を呼び出す
logger.info('I-CTRL-12 ')
try:
except Exception as e:
logger.error('')
# 6-4. CSVバックアップ処理を呼び出す
logger.info('I-CTRL-13 ')
try:
except Exception as e:
logger.error('')
# 6-5. CSVアップロード処理を呼び出す
logger.info('I-CTRL-14 ')
try:
except Exception as e:
logger.error('')
# 7. 前回取得日時ファイル更新処理を呼びだす
logger.info('I-CTRL-15 ')
try:
except Exception as e:
logger.error('')
# 8. 1オブジェクトのアップロードが完了した旨をログに出力する
logger.info('I-CTRL-16 ')
except GetObjectException as e:
logger.error(
'[取得対象のオブジェクト名] の[処理名]でエラーが発生しました 次のオブジェクトの処理に移行します')
except Exception as e:
logger.error(
'[取得対象のオブジェクト名] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します')
# ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する
try:
logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{実行結果JSON}]')
except Exception as e:
logger.error('')
raise CustomException()
# ⑤ 取得処理実施結果アップロード処理を呼び出す
try:
logger.info('I-CTRL-18 CRM_取得処理実施結果ファイルアップロード処理開始')
except Exception as e:
logger.error('')
raise CustomException()
# ⑥ CRMデータ取得処理終了ログを出力する
try:
logger.info('I-CTRL-19 CRMデータ取得処理を終了します')
except Exception as e:
logger.error('')
raise
except CustomException as e:
logger.error('I-ERR-01 [処理名]でエラーが発生したため、処理を終了します')
except Exception as e:
logger.error('I-ERR-02 予期せぬエラーが発生したため、処理を終了します')
# カスタムExceptionクラス
class CustomException(Exception, metaclass=ABCMeta):
def __init__(self, id, arg):
self.arg = arg
self.id = id
class GetObjectException(CustomException):
pass
import src.process as process
if __name__ == '__main__':
main()
process.main()

View File

@ -0,0 +1,107 @@
import boto3
import json
from src.environments import (
CRM_CONFIG_BUCKET,
CRM_BACKUP_BUCKET,
IMPORT_DATA_BUCKET,
OBJECT_INFO_FOLDER,
OBJECT_INFO_FILENAME,
CRM_IMPORT_DATA_FOLDER,
CRM_IMPORT_DATA_BACKUP_FOLDER,
LAST_FETCH_DATE_FOLDER,
RESPONSE_JSON_BACKUP_FOLDER
)
from src.constants import (
AWS_RESOURCE_S3,
AWS_CLINET_S3,
S3_RESPONSE_BODY
)
class S3Clinet:
def __init__(self) -> None:
self.__s3_client = boto3.client(AWS_CLINET_S3)
def copy_object(self, src_bucket, src_key, dest_bucket, dest_key):
self.__s3_client.copy(
{"Bucket": src_bucket, "key": src_key}, dest_bucket, dest_key)
return
class S3Resource:
def __init__(self, bucket_name: str) -> None:
self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
self.__s3_bucket = self.__s3_resource.Bucket(bucket_name)
def get_object(self, object_key: str) -> str:
response = self.__s3_bucket.Object(object_key).get()
body = response[S3_RESPONSE_BODY].read()
return body.decode('utf-8')
def put_object(self, object_key: str, data) -> str:
s3_object = self.__s3_bucket.Object(object_key)
s3_object.put(Body=data.encode('utf-8'), ContentEncoding='utf-8')
return
class S3ResourceNonBucket:
def __init__(self) -> None:
self.__s3_resource = boto3.resource(AWS_RESOURCE_S3)
def copy(self, src_bucket, src_key, dest_bucket, dest_key):
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_resource.meta.client.copy(copy_source, dest_bucket, dest_key)
return
class ConfigBucket:
__s3_resource: S3Resource = None
def __init__(self) -> None:
self.__s3_resource = S3Resource(CRM_CONFIG_BUCKET)
def get_object_info_file(self):
return self.__s3_resource.get_object(f'{OBJECT_INFO_FOLDER}/{OBJECT_INFO_FILENAME}')
def get_last_fetch_datetime_file(self, file_name):
return self.__s3_resource.get_object(f'{LAST_FETCH_DATE_FOLDER}/{file_name}')
def put_last_fetch_datetime_file(self, file_name, data):
self.__s3_resource.put_object(
f'{LAST_FETCH_DATE_FOLDER}/{file_name}', data)
return
class DataBucket:
__s3_resource: S3Resource = None
def __init__(self) -> None:
self.__s3_resource = S3Resource(IMPORT_DATA_BUCKET)
def put_csv(self, file_name, data):
object_key = f'{CRM_IMPORT_DATA_FOLDER}/{file_name}'
self.__s3_resource.put_object(object_key, data)
return
class BackupBucket:
__s3_resource: S3Resource = None
def __init__(self) -> None:
self.__s3_resource = S3Resource(CRM_BACKUP_BUCKET)
def put_response_json(self, file_name, data):
object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_name}'
self.__s3_resource.put_object(object_key, json.dumps(data))
return
def put_csv_bk(self, file_name, data):
object_key = f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{file_name}'
self.__s3_resource.put_object(object_key, data)
return
def put_result_json(self, file_name, data):
object_key = f'{RESPONSE_JSON_BACKUP_FOLDER}/{file_name}'
self.__s3_resource.put_object(object_key, json.dumps(data))

View File

@ -0,0 +1,30 @@
from src.util.logger import Logger
from src.constants import(
CHK_JP_NAME
)
from src.error.exceptions import(
InvalidConfigException
)
from src.config.objects import TargetObject
logger = Logger().get_logger()
def check_object_info(object_info, execute_datetime):
# ① オブジェクト情報形式チェック処理開始ログを出力する
logger.info('I-CHK-01 オブジェクト情報形式チェック処理を開始します')
try:
# ② オブジェクト情報形式チェック
target_object = TargetObject(object_info, execute_datetime)
except Exception as e:
raise InvalidConfigException(
'E-CHK-01', CHK_JP_NAME, f'オブジェクト情報形式チェック処理が失敗しました エラー内容:[{e}]')
# ③ チェック処理終了ログを出力する
logger.info('I-CHK-02 オブジェクト情報形式チェック処理を終了します')
# ④ 次の処理へ移行する
return target_object

View File

@ -0,0 +1,195 @@
from src.constants import (
DATE_PATTERN_YYYYMMDDTHHMMSSTZ
)
from src.util.dict_checker import DictCheck
class FetchTargetObjects():
def __init__(self, object_info_file_dict) -> None:
self.__dict_check = DictCheck()
self.__objects = object_info_file_dict
self.__key = 'objects'
self.check_key_objects()
self.__i = 0
def __iter__(self):
return self
def __next__(self):
if self.__i == len(self.__objects[self.__key]):
raise StopIteration()
value = self.__objects['objects'][self.__i]
self.__i += 1
return value
def check_key_objects(self) -> None:
__check_key = self.__key
__check_type = list
self.__dict_check.check_key_exist_and_value_type(
self.__objects, __check_key, __check_type)
class TargetObject():
def __init__(self, object_info, execute_datetime) -> None:
self.__dict_check = DictCheck()
self.__object_info = object_info
self.execute_datetime = execute_datetime
self.check_key_object_name()
self.check_key_columns()
self.check_key_is_skip()
self.check_key_is_update_last_fetch_datetime()
self.check_key_last_fetch_datetime_file_name()
self.check_key_upload_file_name()
self.object_name = self.__object_info['object_name']
self.columns = self.__object_info['columns']
self.is_skip = self.set_is_skip()
self.is_update_last_fetch_datetime = self.set_is_update_last_fetch_datetime()
self.last_fetch_datetime_file_name = self.set_fetch_datetime_file_name()
self.upload_file_name = self.set_upload_file_name()
def check_key_object_name(self) -> None:
'''
オブジェクト名チェック
'''
__check_key = 'object_name'
__check_type = str
self.__dict_check.check_key_exist_and_value_type(
self.__object_info, __check_key, __check_type)
return
def check_key_columns(self) -> None:
'''
カラム情報チェック
'''
__key = 'columns'
__type = list
self.__dict_check.check_key_exist_and_value_type(
self.__object_info, __key, __type)
return
def check_key_is_skip(self,) -> None:
'''
スキップフラグ型チェック
'''
__check_key = 'is_skip'
__check_type = bool
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
return
def check_key_is_update_last_fetch_datetime(self) -> None:
'''
前回取得日時ファイル更新フラグチェック
'''
__check_key = 'is_update_last_fetch_datetime'
__check_type = bool
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
return
def check_key_last_fetch_datetime_file_name(self) -> None:
'''
前回取得日時ファイル名型チェック
'''
__check_key = 'last_fetch_datetime_file_name'
__check_type = str
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
return
def check_key_upload_file_name(self) -> None:
'''
アップロードファイル名称型チェック
'''
__check_key = 'upload_file_name'
__check_type = str
self.__dict_check.check_key_exist_case_value_type(
self.__object_info, __check_key, __check_type)
return
def set_is_skip(self) -> bool:
'''
スキップフラグ設定
'''
__check_key = 'is_skip'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return False
def set_is_update_last_fetch_datetime(self) -> bool:
'''
前回取得日時ファイル更新フラグ設定
'''
__check_key = 'is_update_last_fetch_datetime'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return False
def set_fetch_datetime_file_name(self) -> str:
'''
前回取得日時ファイル名設定
'''
__check_key = 'last_fetch_datetime_file_name'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key]
else:
return self.__object_info['object_name'] + '.json'
def set_upload_file_name(self) -> str:
'''
アップロードファイル名称設定
'''
__check_key = 'upload_file_name'
if self.__dict_check.check_key_exist(self.__object_info, __check_key):
return self.__object_info[__check_key].format(execute_datetime=self.execute_datetime)
else:
return 'CRM_' + self.__object_info['object_name'] + '_' + self.execute_datetime
class LastFetchDatetime():
def __init__(self, last_fetch_datetime_file_name, last_fetch_datetime_file_dict, execute_datetime) -> None:
self.__dict_check = DictCheck()
self.execute_datetime = execute_datetime
self.__last_fetch_datetime_file_dict = last_fetch_datetime_file_dict
self.last_fetch_datetime_file_name = last_fetch_datetime_file_name
self.check_key_last_fetch_datetime_from
self.check_key_last_fetch_datetime_to
self.last_fetch_datetime_from = self.__last_fetch_datetime_file_dict[
'last_fetch_datetime_from']
self.last_fetch_datetime_to = self.set_last_fetch_datetime_to()
def check_key_last_fetch_datetime_from(self) -> None:
'''
データ取得開始日時チェック
'''
__check_key = 'last_fetch_datetime_from'
__regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ
self.__dict_check.check_key_exsit_and_regex(
self.__last_fetch_datetime_file_dict, __check_key, __regex_str)
def check_key_last_fetch_datetime_to(self) -> None:
'''
データ取得終了日時チェック
'''
__check_key = 'last_fetch_datetime_to'
__regex_str = DATE_PATTERN_YYYYMMDDTHHMMSSTZ
self.__dict_check.check_key_exsit_case_regex(
self.__last_fetch_datetime_file_dict, __check_key, __regex_str)
def set_last_fetch_datetime_to(self) -> str:
'''
データ取得終了日時設定
'''
__check_key = 'last_fetch_datetime_to'
if self.__dict_check.check_key_exist(self.__last_fetch_datetime_file_dict, __check_key):
return self.__last_fetch_datetime_file_dict[__check_key].format(execute_datetime=self.execute_datetime)
else:
return self.execute_datetime

View File

@ -0,0 +1,73 @@
# environments(task settings file)
LOG_LEVEL = "LOG_LEVEL" # ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
CRM_AUTH_TIMEOUT = 'CRM_AUTH_TIMEOUT' # CRMへの認証処理のタイムアウト秒数
CRM_AUTH_MAX_RETRY_ATTEMPT = 'CRM_AUTH_MAX_RETRY_ATTEMPT' # CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_RETRY_INTERVAL = 'CRM_AUTH_RETRY_INTERVAL' # CRMへの認証処理のリトライ時の初回待ち秒数
CRM_AUTH_RETRY_MIN_INTERVAL = 'CRM_AUTH_RETRY_MIN_INTERVAL' # CRMへの認証処理のリトライ時の最小待ち秒数
CRM_AUTH_RETRY_MAX_INTERVAL = 'CRM_AUTH_RETRY_MAX_INTERVAL' # CRMへの認証処理のリトライ時の最大待ち秒数
CRM_GET_RECORD_COUNT_TIMEOUT = 'CRM_GET_RECORD_COUNT_TIMEOUT' # CRMのレコード件数取得処理のタイムアウト秒数
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = 'CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT' # CRMのレコード件数取得処理の最大リトライ試行回数
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_INTERVAL' # CRMのレコード件数取得処理のリトライ時の初回待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL' # CRMのレコード件数取得処理のリトライ時の最小待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = 'CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL' # CRMのレコード件数取得処理のリトライ時の最大待ち秒数
CRM_FETCH_RECORD_TIMEOUT = 'CRM_FETCH_RECORD_TIMEOUT' # CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = 'CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT' # CRMのレコード取得処理の最大リトライ試行回数
CRM_FETCH_RECORD_RETRY_INTERVAL = 'CRM_FETCH_RECORD_RETRY_INTERVAL' # CRMのレコード取得処理のリトライ時の初回待ち秒数
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MIN_INTERVAL' # CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = 'CRM_FETCH_RECORD_RETRY_MAX_INTERVAL' # CRMのレコード取得処理のリトライ時の最大待ち秒数
# environments(ECS Task Enviroment)
CRM_AUTH_DOMAIN = 'CRM_AUTH_DOMAIN' # CRMのAPI実行のための認証エンドポイントのドメイン
CRM_USER_NAME = 'CRM_USER_NAME' # CRMのAPI実行用ユーザ名
CRM_USER_PASSWORD = 'CRM_USER_PASSWORD' # CRMのAPI実行用ユーザパスワード
CRM_USER_SECURITY_TOKEN = 'CRM_USER_SECURITY_TOKEN' # CRMのAPI実行用ユーザのセキュリティトークン
CRM_CONFIG_BUCKET = 'CRM_CONFIG_BUCKET' # CRMデータ取得用の設定ファイルを格納するバケット名
CRM_BACKUP_BUCKET = 'CRM_BACKUP_BUCKET' # CRMのバックアップデータを格納するバケット名
IMPORT_DATA_BUCKET = 'IMPORT_DATA_BUCKET' # CRMの取込データを格納するバケット名
OBJECT_INFO_FOLDER = 'OBJECT_INFO_FOLDER' # CRM取得対象オブジェクトの情報を格納するフォルダパス
OBJECT_INFO_FILENAME = 'OBJECT_INFO_FILENAME' # CRM取得対象オブジェクトの情報のファイル名
PROCESS_RESULT_FOLDER = 'PROCESS_RESULT_FOLDER' # CRMデータ取得結果を格納するフォルダパス
PROCESS_RESULT_FILENAME = 'PROCESS_RESULT_FILENAME' # CRMデータ取得結果を格納するファイル名
LAST_FETCH_DATE_FOLDER = 'LAST_FETCH_DATE_FOLDER' # CRMからの最終取得日時ファイルを格納するフォルダパス
CRM_IMPORT_DATA_FOLDER ='CRM_IMPORT_DATA_FOLDER' # CRMから取得し、取込用に変換したデータを格納するフォルダ
LAST_FETCH_DATE_BACKUP_FOLDER = 'LAST_FETCH_DATE_BACKUP_FOLDER' # CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス
RESPONSE_JSON_BACKUP_FOLDER = 'RESPONSE_JSON_BACKUP_FOLDER' # CRMから取得した生データのバックアップを格納するフォルダパス
CRM_IMPORT_DATA_BACKUP_FOLDER = 'CRM_IMPORT_DATA_BACKUP_FOLDER' # CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ
# 時刻フォーマット
# .000ZはUTCを表す。ミリ秒までの考慮は不要なので固定で指定
YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z'
CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z'
YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S'
# aws
AWS_RESOURCE_S3 = 's3'
AWS_CLINET_S3 = 's3'
S3_RESPONSE_BODY = 'Body'
# 正規表現チェック
EXCLUDE_SYMBOL = ['#', '/']
DATE_PATTERN_YYYYMMDDTHHMMSSTZ = r'[12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])T([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9].000Z'
# logger
LOG_FORMAT = '[%(levelname)s]\t%(asctime)s\t%(message)s\n'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
LOG_LEVEL_INFO = 'INFO'
# 処理名
PROCESS_JP_NAME = 'コントロール処理'
PRE_JP_NAME = 'データ取得準備処理'
CHK_JP_NAME = 'オブジェクト情報形式チェック処理'
DATE_JP_NAME = 'データ取得期間設定処理'
FETCH_JP_NAME = 'CRMデータ取得処理'
RESBK_JP_NAME = 'CRM電文データバックアップ処理'
CONV_JP_NAME = 'CSV変換処理'
CSVBK_JP_NAME = 'CSVバックアップ処理'
UPLD_JP_NAME = 'CSVアップロード処理'
UPD_JP_NAME = '前回取得日時ファイル更新'
END_JP_NAME = '取得処理実施結果アップロード処理'
# CSVチェック
CSV_TRUE_VALUE = '1'
CSV_FALSE_VALUE = '0'

View File

@ -0,0 +1,33 @@
from datetime import datetime
from src.util.logger import Logger
from src.constants import (
CONV_JP_NAME,
)
from src.error.exceptions import(
DataConvertException
)
from src.converter.converter import CSVStringConverter
logger = Logger().get_logger()
def convert_crm_csvdata(target_object, sf_object_jsons):
# ① CSV変換処理の開始ログを出力する
logger.info(f'I-CONV-01 [{target_object.object_name}] のCSV変換処理を開始します')
try:
# ② CSV変換
csv_object = CSVStringConverter(target_object, sf_object_jsons)
logger.debug(f'D-CONV-02 [{target_object.object_name}] のCSV変換処理 正常終了')
except Exception as e:
raise DataConvertException(
'E-CONV-01', CONV_JP_NAME, f'[{target_object.object_name}] CSV変換に失敗しました エラー内容:[{e}]')
# ③ CSV変換処理の終了ログを出力する
logger.info(f'I-CONV-03 [{target_object.object_name}] のCSV変換処理を終了します')
# ④ 次の処理へ移行する
return csv_object

View File

@ -0,0 +1,138 @@
import re
import io
import csv
from datetime import datetime
from src.constants import(
CSV_TRUE_VALUE,
CSV_FALSE_VALUE,
CRM_DATETIME_FORMAT,
YYYYMMDDHHMMSS
)
class CSVStringConverter:
def __init__(self, target_object, sf_object_jsons) -> None:
self.__target_object = target_object
self.__sf_object_jsons = sf_object_jsons
self.__extracted_sf_object_jsons = self.extract_sf_object_jsons()
self.csv_data = self.convert_to_csv()
self.csv_buffer = self.write_csv()
def extract_sf_object_jsons(self) -> list:
try:
extracted_sf_object_jsons = []
for sf_object_json in self.__sf_object_jsons:
extracted_sf_object_jsons.append(
self.extract_necessary_props_from(sf_object_json))
return extracted_sf_object_jsons
except Exception as e:
raise Exception('必要なjsonのデータ抽出に失敗しました')
def extract_necessary_props_from(self, sf_object_json) -> dict:
try:
clone_sf_object = {**sf_object_json}
del clone_sf_object['attributes']
uppercase_key_sf_object = {
k.upper(): v for k, v in clone_sf_object.items()}
return uppercase_key_sf_object
except Exception as e:
raise Exception('必要なjsonのデータ成形に失敗しました')
def convert_to_csv(self) -> list:
try:
columns = self.__target_object.columns
csv_data = []
for i, json_object in enumerate(self.__extracted_sf_object_jsons, 1):
csv_row = []
for column in columns:
v = json_object[column.upper()]
converted_value = CSVStringConverterFactory(
v).value_convert()
csv_row.append(converted_value)
csv_data.append(csv_row)
return csv_data
except Exception as e:
raise Exception(
f'CSV変換に失敗しました カラム名:[{column}] 行番号: [{i}] エラー内容:[{e}]')
def write_csv(self) -> str:
try:
with io.StringIO(newline='') as string_stream:
writer = csv.writer(string_stream, delimiter=',', lineterminator='\r\n',
doublequote=True, quotechar='"', quoting=csv.QUOTE_ALL, strict=True)
writer.writerow(self.__target_object.columns)
writer.writerows(self.csv_data)
csv_value = string_stream.getvalue()
return csv_value
except Exception as e:
raise Exception('csvデータの取得に失敗しました')
class NoneValueConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> str:
return ''
class BooleanConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> bool:
return CSV_TRUE_VALUE if self.__convert_value is True else CSV_FALSE_VALUE
class DatatimeConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> str:
return datetime.strptime(self.__convert_value, CRM_DATETIME_FORMAT).strftime(YYYYMMDDHHMMSS)
class FloatConverter:
def __init__(self, convert_value) -> None:
self.__convert_value = convert_value
self.value = self.convert_value()
def convert_value(self) -> int:
return int(self.__convert_value)
class CSVStringConverterFactory:
def __init__(self, v) -> None:
self.__value = v
def value_convert(self):
converted_value = self.__value
if self.__value is None:
converted_value = NoneValueConverter(self.__value).value
# 指数表記で取得できるパターン。指数表記を整数表記に変換する。
elif type(self.__value) == float:
converted_value = FloatConverter(self.__value).value
# SQLの真偽値に対応するために変換する
elif type(self.__value) == bool:
converted_value = BooleanConverter(self.__value).value
elif type(self.__value) == str and re.fullmatch(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.000\+0000', self.__value):
converted_value = DatatimeConverter(self.__value).value
return converted_value

View File

@ -0,0 +1,34 @@
from src.util.logger import Logger
from src.aws.s3 import BackupBucket
from src.constants import (
CSVBK_JP_NAME
)
from src.error.exceptions import(
FileUploadException
)
logger = Logger().get_logger()
def backup_crm_csvdata(target_object, date_path, csv_object):
# ① CSVバックアップ処理の開始ログを出力する
logger.info(
f'I-CSVBK-01 [{target_object.object_name}] のCSVデータのバックアップ処理を開始します ファイル名:[{target_object.upload_file_name}.csv]')
try:
# ② CRMバックアップ保管用バケットに、変換後のCSVデータのバックアップを保管する
backup_bucket = BackupBucket()
backup_bucket.put_csv_bk(
f'{date_path}/{target_object.upload_file_name}.csv', csv_object.csv_buffer)
except Exception as e:
raise FileUploadException(
'E-CSVBK-01', CSVBK_JP_NAME, f'[{target_object.object_name}] CSVデータのバックアップに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]')
# ③ CSVバックアップ処理の終了ログを出力する
logger.info(
f'I-CSVBK-03 [{target_object.object_name}] のCSVデータのバックアップ処理を終了します')
# ④ 次の処理へ移行する
return

View File

@ -0,0 +1,65 @@
from src.util.logger import Logger
from src.constants import(
DATE_JP_NAME
)
from src.environments import (
CRM_CONFIG_BUCKET,
LAST_FETCH_DATE_FOLDER
)
from src.error.exceptions import(
FileNotFoundException,
InvalidConfigException
)
from src.parser.json_parse import JsonParser
from src.config.objects import LastFetchDatetime
from src.aws.s3 import ConfigBucket
logger = Logger().get_logger()
def set_datetime_period(target_object, execute_datetime):
# ① データ取得期間設定処理の開始ログを出力する
logger.info(
f'I-DATE-01 [{target_object.object_name}] のデータ取得期間設定処理を開始します')
try:
# ② S3 設定ファイル保管用バケットから、前回取得日時ファイルを取得する
logger.info(
f'I-DATE-02 前回取得日時ファイルの取得開始します ファイルパス:[s3://{CRM_CONFIG_BUCKET}/{LAST_FETCH_DATE_FOLDER}/{target_object.last_fetch_datetime_file_name}]')
s3_config_bucket = ConfigBucket()
last_fetch_datetime_file_json = s3_config_bucket.get_last_fetch_datetime_file(
target_object.last_fetch_datetime_file_name)
logger.info(f'I-DATE-03 前回取得日時ファイルの取得成功しました')
except Exception as e:
raise FileNotFoundException(
'E-DATE-01', DATE_JP_NAME, f'前回取得日時ファイルが存在しません ファイル名:[{target_object.last_fetch_datetime_file_name}] エラー内容:[{e}]')
try:
# ③ 取得した前回取得日時ファイルの形式チェックを行う
# ④ データの取得期間を設定する
logger.debug(f'D-DATE-04 前回取得日時ファイルの形式チェックを開始します')
json_parser = JsonParser(last_fetch_datetime_file_json)
last_fetch_datetime_file_dict = json_parser.json_parser()
last_fetch_datetime = LastFetchDatetime(target_object.last_fetch_datetime_file_name,
last_fetch_datetime_file_dict, execute_datetime)
logger.debug(f'D-DATE-05 前回取得日時ファイルの形式チェック 正常終了')
logger.info(
f'I-DATE-06 取得範囲 From: [{last_fetch_datetime.last_fetch_datetime_from}] To: [{last_fetch_datetime.last_fetch_datetime_to}]')
except Exception as e:
raise InvalidConfigException(
'E-DATE-02', DATE_JP_NAME, f'前回取得日時ファイルの形式チェック処理が失敗しました エラー内容:[{e}]')
# ⑤ データ取得準備処理の終了ログを出力する
logger.info(
f'I-DATE-07 [{target_object.object_name}] のデータ取得期間設定処理を終了します')
# ⑥ 次の処理へ移行する
return last_fetch_datetime

View File

@ -0,0 +1,37 @@
from src.util.logger import Logger
from src.constants import (
END_JP_NAME
)
from src.environments import(
PROCESS_RESULT_FILENAME
)
from src.error.exceptions import(
FileUploadException
)
from src.aws.s3 import BackupBucket
logger = Logger().get_logger()
def updload_result_data(process_result, date_path):
# ① 取得処理実施結果アップロード処理のログを出力する
logger.info(
f'I-END-01 取得処理実施結果アップロード処理を開始します')
try:
# ② CRMバックアップ保管用バケットに、取得処理実施結果のJSONデータを保管する
backup_bucket = BackupBucket()
backup_bucket.put_result_json(
f'{date_path}/{PROCESS_RESULT_FILENAME}', process_result)
logger.debug(f'D-END-02 取得処理実施結果アップロード 正常終了')
except Exception as e:
raise FileUploadException(
'E-END-01', END_JP_NAME, f'取得処理実施結果のアップロードに失敗しました ファイル名:[{PROCESS_RESULT_FILENAME}] エラー内容:[{e}]')
# ③ 取得処理実施結果アップロード処理の終了ログを出力する
logger.info(f'I-END-03 取得処理実施結果アップロード処理を終了します')
return

View File

@ -0,0 +1,73 @@
import os
from src.constants import (
LOG_LEVEL,
LOG_LEVEL_INFO,
CRM_AUTH_TIMEOUT,
CRM_AUTH_MAX_RETRY_ATTEMPT,
CRM_AUTH_RETRY_INTERVAL,
CRM_AUTH_RETRY_MIN_INTERVAL,
CRM_AUTH_RETRY_MAX_INTERVAL,
CRM_GET_RECORD_COUNT_TIMEOUT,
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT,
CRM_GET_RECORD_COUNT_RETRY_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL,
CRM_FETCH_RECORD_TIMEOUT,
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT,
CRM_FETCH_RECORD_RETRY_INTERVAL,
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL,
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL,
CRM_AUTH_DOMAIN,
CRM_USER_NAME,
CRM_USER_PASSWORD,
CRM_USER_SECURITY_TOKEN,
CRM_CONFIG_BUCKET,
CRM_BACKUP_BUCKET,
IMPORT_DATA_BUCKET,
OBJECT_INFO_FOLDER,
OBJECT_INFO_FILENAME,
PROCESS_RESULT_FOLDER,
PROCESS_RESULT_FILENAME,
LAST_FETCH_DATE_FOLDER,
CRM_IMPORT_DATA_FOLDER,
LAST_FETCH_DATE_BACKUP_FOLDER,
RESPONSE_JSON_BACKUP_FOLDER,
CRM_IMPORT_DATA_BACKUP_FOLDER
)
# environments(task settings file)
LOG_LEVEL = os.environ.get(LOG_LEVEL, LOG_LEVEL_INFO) # ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
CRM_AUTH_TIMEOUT = os.environ["CRM_AUTH_TIMEOUT"] # CRMへの認証処理のタイムアウト秒数
CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ["CRM_AUTH_MAX_RETRY_ATTEMPT"] # CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_RETRY_INTERVAL = os.environ["CRM_AUTH_RETRY_INTERVAL"] # CRMへの認証処理のリトライ時の初回待ち秒数
CRM_AUTH_RETRY_MIN_INTERVAL = os.environ["CRM_AUTH_RETRY_MIN_INTERVAL"] # CRMへの認証処理のリトライ時の最小待ち秒数
CRM_AUTH_RETRY_MAX_INTERVAL = os.environ["CRM_AUTH_RETRY_MAX_INTERVAL"] # CRMへの認証処理のリトライ時の最大待ち秒数
CRM_GET_RECORD_COUNT_TIMEOUT = os.environ["CRM_GET_RECORD_COUNT_TIMEOUT"] # CRMのレコード件数取得処理のタイムアウト秒数
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ["CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT"] # CRMのレコード件数取得処理の最大リトライ試行回数
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の初回待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の最小待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL"] # CRMのレコード件数取得処理のリトライ時の最大待ち秒数
CRM_FETCH_RECORD_TIMEOUT = os.environ["CRM_FETCH_RECORD_TIMEOUT"] # CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ["CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT"] # CRMのレコード取得処理の最大リトライ試行回数
CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_INTERVAL"] # CRMのレコード取得処理のリトライ時の初回待ち秒数
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MIN_INTERVAL"] # CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MAX_INTERVAL"] # CRMのレコード取得処理のリトライ時の最大待ち秒数
# environments(ECS Task Enviroment)
CRM_AUTH_DOMAIN = os.environ["CRM_AUTH_DOMAIN"] # CRMのAPI実行のための認証エンドポイントのドメイン
CRM_USER_NAME = os.environ["CRM_USER_NAME"] # CRMのAPI実行用ユーザ名
CRM_USER_PASSWORD = os.environ["CRM_USER_PASSWORD"] # CRMのAPI実行用ユーザパスワード
CRM_USER_SECURITY_TOKEN = os.environ["CRM_USER_SECURITY_TOKEN"] # CRMのAPI実行用ユーザのセキュリティトークン
CRM_CONFIG_BUCKET = os.environ["CRM_CONFIG_BUCKET"] # CRMデータ取得用の設定ファイルを格納するバケット名
CRM_BACKUP_BUCKET = os.environ["CRM_BACKUP_BUCKET"] # CRMのバックアップデータを格納するバケット名
IMPORT_DATA_BUCKET = os.environ["IMPORT_DATA_BUCKET"] # CRMの取込データを格納するバケット名
OBJECT_INFO_FOLDER = os.environ["OBJECT_INFO_FOLDER"] # CRM取得対象オブジェクトの情報を格納するフォルダパス
OBJECT_INFO_FILENAME = os.environ["OBJECT_INFO_FILENAME"] # CRM取得対象オブジェクトの情報のファイル名
PROCESS_RESULT_FOLDER = os.environ["PROCESS_RESULT_FOLDER"] # CRMデータ取得結果を格納するフォルダパス
PROCESS_RESULT_FILENAME = os.environ["PROCESS_RESULT_FILENAME"] # CRMデータ取得結果を格納するファイル名
LAST_FETCH_DATE_FOLDER = os.environ["LAST_FETCH_DATE_FOLDER"] # CRMからの最終取得日時ファイルを格納するフォルダパス
CRM_IMPORT_DATA_FOLDER = os.environ["CRM_IMPORT_DATA_FOLDER"] # CRMから取得し、取込用に変換したデータを格納するフォルダ
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ["LAST_FETCH_DATE_BACKUP_FOLDER"] # CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス
RESPONSE_JSON_BACKUP_FOLDER = os.environ["FETCH_DATA_BACKUP_FOLDER"] # CRMから取得した生データのバックアップを格納するフォルダパス
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ["CRM_IMPORT_DATA_BACKUP_FOLDER"] # CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ

View File

View File

@ -0,0 +1,45 @@
from abc import ABCMeta
class MeDaCaCRMDataFetchException(Exception, metaclass=ABCMeta):
"""MeDaCaシステム固有のカスタムエラークラス"""
def __init__(self, error_id: str, func_name, message) -> None:
super().__init__(message)
self.func_name = func_name
self.error_id = error_id
class FileNotFoundException(MeDaCaCRMDataFetchException):
"""S3のファイルが見つからない場合の例外"""
pass
class FileUploadException(MeDaCaCRMDataFetchException):
"""S3のファイルアップロード失敗の例外"""
pass
class InvalidConfigException(MeDaCaCRMDataFetchException):
"""Configのバリデーションチェック失敗の例外"""
pass
class DataConvertException(MeDaCaCRMDataFetchException):
"""データ変換が失敗した場合の例外"""
pass
class TimeOutException(MeDaCaCRMDataFetchException):
"""タイムアウトが発生した場合の例外"""
pass
class RetryExceededException(MeDaCaCRMDataFetchException):
"""リトライ処理が発生した場合の例外"""
pass
class SalesforceAPIException(MeDaCaCRMDataFetchException):
"""SalseforceのAPI実行失敗が発生した場合の例外"""
pass

View File

@ -0,0 +1,71 @@
from src.util.logger import Logger
from src.constants import(
FETCH_JP_NAME
)
from src.error.exceptions import(
SalesforceAPIException,
DataConvertException
)
from src.salesforce.salesforce_api import(
SalesForceCount,
SalesForceData
)
logger = Logger().get_logger()
def fetch_crm_data(target_object, last_fetch_datetime):
# ① CRMデータ取得処理開始ログを出力する
logger.info(
f'I-FETCH-01 [{target_object.object_name}] のCRMからのデータ取得処理を開始します')
object_name = target_object.object_name
columns = ','.join(target_object.columns)
last_fetch_datetime_from = last_fetch_datetime.last_fetch_datetime_from
last_fetch_datetime_to = last_fetch_datetime.last_fetch_datetime_to
try:
# ② 取得対象オブジェクトの取得期間内のレコード件数を取得する
logger.info(f'I-FETCH-02 [{object_name}] の件数取得を開始します')
saleforce_count = SalesForceCount()
record_count = saleforce_count.fetch_sf_count_retry(
object_name, last_fetch_datetime_from, last_fetch_datetime_to)
logger.info(f'I-FETCH-03 [{object_name}] の件数:[{record_count}]')
except Exception as e:
raise SalesforceAPIException(
'E-FETCH-01', FETCH_JP_NAME, f'[{object_name}] の件数取得に失敗しました [{e}]')
try:
# ③ 取得対象オブジェクトのレコードを取得する
logger.info(f'I-FETCH-04 [{object_name}] のレコード取得を開始します')
saleforce_data = SalesForceData()
record_generator = saleforce_data.fetch_sf_data_retry(
columns, object_name, last_fetch_datetime_from, last_fetch_datetime_to)
except Exception as e:
raise SalesforceAPIException(
'E-FETCH-02', FETCH_JP_NAME, f'[{object_name}] のレコード取得に失敗しました [{e}]')
try:
# ④ 取得対象オブジェクトをJSONに変換
logger.info(f'I-FETCH-05 [{object_name}] のレコードをJSONに変換します')
sf_object_jsons = []
for record in record_generator:
sf_object_jsons.append(record)
except Exception as e:
raise DataConvertException(
'E-FETCH-03', FETCH_JP_NAME, f'[{object_name}] のレコードのJSON変換に失敗しました [{e}]')
# ⑤ CRMデータ取得処理終了ログを出力する
logger.info(f'I-FETCH-06 [{object_name}] のレコード取得が成功しました')
# ⑥ 次の処理へ移行する
return sf_object_jsons

View File

View File

@ -0,0 +1,15 @@
import re
import json
from src.constants import EXCLUDE_SYMBOL
class JsonParser():
def __init__(self, json_str) -> None:
self.__json_str = json_str
def json_parser(self) -> dict:
for symbol in EXCLUDE_SYMBOL:
# コメントアウトシンボルを含む部分を置き換える正規表現
replace_comment_regex = rf'\s(?!\"){symbol}[\s\S]*?.*'
self.__json_str = re.sub(replace_comment_regex, '', self.__json_str)
return json.loads(self.__json_str)

View File

@ -1,96 +1,83 @@
# モジュールロード
from datetime import datetime
from src.utils.logger import Logger
import src.utils.data_retention as data_retention
import src.utils.s3_access as s3_access
import src.utils.json_parser as json_parser
from src.util.logger import Logger
from src.constants import(
PRE_JP_NAME,
YYYYMMDDTHHMMSSTZ
)
from src.environments import(
CRM_CONFIG_BUCKET,
OBJECT_INFO_FOLDER,
OBJECT_INFO_FILENAME
)
from src.error.exceptions import(
FileNotFoundException,
InvalidConfigException
)
from src.aws.s3 import ConfigBucket
from src.parser.json_parse import JsonParser
from src.config.objects import FetchTargetObjects
# 変数
# .000ZはUTCを表す。ミリ秒までの考慮は不要なので固定で指定
YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z'
CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z'
YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S'
# logger設定
logger = Logger().get_logger()
# 処理
def prepare_get_data():
# ① データ取得準備処理の開始ログを出力する
logger.info('I-PRE-01 データ取得準備処理を開始します')
# ② 取得処理開始年月日時分秒を控える
execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ)
date_path = execute_datetime.rstrip('000Z').translate(
str.maketrans({'-': '/', 'T': '/', ':': None, '.': None}))
logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}')
def pre(crm_object_info_var):
try:
# ① データ取得準備処理の開始ログを出力する
logger.info('I-PRE-01 データ取得準備処理を開始します')
# ③ S3 設定ファイル保管用バケットから、CRM_取得オブジェクト情報ファイルを取得する
object_info_file_s3_path = f's3://{CRM_CONFIG_BUCKET}{OBJECT_INFO_FOLDER}/{OBJECT_INFO_FILENAME}'
logger.debug(
f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{object_info_file_s3_path}]')
# ② 取得処理開始年月日時分秒を控える
execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ)
logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}')
config_bucket = ConfigBucket()
object_info_file_json = config_bucket.get_object_info_file()
# ⑤ S3 設定ファイル保管用バケットから、CRM_取得オブジェクト情報ファイルを取得する
try:
crm_object_info_var = data_retention.CrmObjectInfoVar()
s3_fullpath = f's3://{crm_object_info_var.bucket}/{crm_object_info_var.file_name_key}'
logger.debug(f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{s3_fullpath}]')
get_file_s3 = s3_access.GetFileS3()
crm_object_info_json = get_file_s3.get_file_from_s3(crm_object_info_var.bucket, crm_object_info_var.file_name_key)
logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました')
except Exception as e:
logger.error(f'E-PRE-01 CRM_取得オブジェクト情報ファイルが存在しません ファイル名[{s3_fullpath}]', e)
raise e
# ⑥ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する
try:
logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします')
json_exclusde_comment = json_parser.JsonExculudeComment()
crm_object_info_dict: dict = json_exclusde_comment(crm_object_info_json)
logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました')
except Exception as e:
logger.error(f'E-PRE-02 CRM_取得オブジェクト情報ファイルのパースに失敗しました エラー内容[{e}]')
raise e
# ⑦ メモリ上のCRM_取得オブジェクト情報のキーobjectsの形式チェックを行う
try:
logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェックを開始します')
if 'objects' not in crm_object_info_dict:
logger.error(f'E-PRE-03 CRM_取得オブジェクト情報ファイル「objects」キーは必須です ファイル名[{s3_fullpath}]')
raise e
if not isinstance(crm_object_info_dict["objects"], list):
logger.error(f'E-PRE-04 CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名[{s3_fullpath}]')
raise e
logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェック 正常終了')
except Exception as e:
logger.error(f'CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名[{s3_fullpath}]', e)
raise e
# ⑧ 処理結果出力用のマップを初期化
process_result_per_object = {}
for sfdc_object in crm_object_info_dict.get('objects'):
object_name = sfdc_object.get('object_name')
process_result_per_object[object_name] = 'fail'
# ⑨ データ取得準備処理の終了ログを出力する
logger.info('I-PRE-09 データ取得準備処理を終了します')
# ⑩ 次の処理へ移行する
return(crm_object_info_dict, execute_datetime, process_result_per_object)
logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました')
except Exception as e:
logger.error(f'データ取得準備処理で想定外のエラーが発生しました エラー内容:[{e}]', e)
raise e
raise FileNotFoundException(
'E-PRE-01', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルが存在しません ファイル名:[{OBJECT_INFO_FILENAME}] エラー内容:[{e}]')
try:
# ④ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する
logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします')
json_parser = JsonParser(object_info_file_json)
object_info_file_dict = json_parser.json_parser()
logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました')
except Exception as e:
raise InvalidConfigException(
'E-PRE-02', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルのパースに失敗しました エラー内容:[{e}]')
# ⑤ メモリ上のCRM_取得オブジェクト情報のキーobjectsの形式チェックを行う
try:
logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェックを開始します')
fetch_target_objects = FetchTargetObjects(object_info_file_dict)
logger.debug('D-PRE-08 CRM_取得オブジェクト情報ファイルの形式チェック 正常終了')
except Exception as e:
raise InvalidConfigException(
'E-PRE-03', PRE_JP_NAME, f'CRM_取得オブジェクト情報ファイルの形式チェックに失敗しました ファイル名:[{OBJECT_INFO_FILENAME}] エラー内容:[{e}]')
# ⑥ 処理結果出力用のマップを初期化
process_result = {}
# ⑦ データ取得準備処理の終了ログを出力する
logger.info('I-PRE-09 データ取得準備処理を終了します')
# ⑧ 次の処理へ移行する
return(fetch_target_objects, execute_datetime, date_path, process_result)

View File

@ -0,0 +1,150 @@
from src.pre import prepare_get_data # データ取得準備処理
from src.chk import check_object_info # オブジェクト情報形式チェック処理
from src.date import set_datetime_period # データ取得期間設定処理
from src.fetch import fetch_crm_data # CRMデータ取得処理
from src.resbk import backup_crm_data # CRM電文データバックアップ処理
from src.conv import convert_crm_csvdata # CSV変換処理
from src.csvbk import backup_crm_csvdata # CSVバックアップ処理
from src.upld import copy_crm_csvdata # CSVアップロード処理
from src.upd import updload_last_fetch_datetime # 前回取得日時ファイル更新
from src.end import updload_result_data # 取得処理実施結果アップロード処理
from src.util.logger import Logger
from src.error.exceptions import(
MeDaCaCRMDataFetchException
)
logger = Logger().get_logger()
def main() -> None:
try:
# ① CRMデータ取得処理開始ログを出力する
logger.info('I-CTRL-01 CRMデータ取得処理を開始します')
fetch_target_objects = None # オブジェクト情報ファイル用オブジェクト
execute_datetime = None # 実行日次文字列
date_path = None # 実行日次のパス文字列
process_result = None # オブジェクトごとの実行結果JSON
# ② データ取得準備処理を呼び出す
logger.info('I-CTRL-02 データ取得準備処理呼び出し')
fetch_target_objects, execute_datetime, date_path, process_result = prepare_get_data()
# ③ object_infoのobjectsキーの値の件数分ループする
logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始')
for object_info in fetch_target_objects:
try:
# 1. オブジェクト処理結果の初期化
target_object = None # オブジェクトごとの情報格納用オブジェクト
last_fetch_datetime = None # オブジェクトごとの取得日付格納用オブジェクト
sf_object_jsons = None # オブジェクトごとのSalesforce取得変数JSON
csv_object = None # CSVオブジェクト
process_result[object_info.get('object_name')] = 'fail' # オブジェクト処理結果
logger.debug(f'D-CTRL-04 対象のオブジェクト情報を出力します オブジェクト情報:{object_info}')
# 2. オブジェクト情報形式チェック処理を呼び出す
logger.info('I-CTRL-05 オブジェクト情報形式チェック処理呼び出し')
target_object = check_object_info(object_info, execute_datetime)
# 3. 処理対象のオブジェクト名をログ出力する
logger.info(
f'I-CTRL-06 [{target_object.object_name}]のデータ取得を開始します')
# 4. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する
if target_object.is_skip is True:
logger.info(
f'I-CTRL-07 [{target_object.object_name}]のデータ取得処理をスキップします')
continue
# 5. データ取得期間設定処理を呼び出す
logger.info(
f'I-CTRL-08 [{target_object.object_name}]のデータ取得期間設定処理呼び出し')
last_fetch_datetime = set_datetime_period(target_object, execute_datetime)
# 6. CRMデータ取得処理を呼び出す
logger.info(
f'I-CTRL-09 [{target_object.object_name}]のデータ取得処理呼び出し')
sf_object_jsons = fetch_crm_data(target_object, last_fetch_datetime)
# 7. 出力ファイル名をログ出力する
logger.info(
f'I-CTRL-10 [{target_object.object_name}] の出力ファイル名は [{target_object.upload_file_name}]となります')
# 8. CRM電文データバックアップ処理を呼び出す
logger.info(
f'I-CTRL-11 [{target_object.object_name}] CRM電文データバックアップ処理呼び出し')
backup_crm_data(target_object.object_name, sf_object_jsons, date_path)
# 9. CSV変換処理を呼び出す
logger.info(
f'I-CTRL-12 [{target_object.object_name}] CSV変換処理呼び出し')
csv_object = convert_crm_csvdata(target_object, sf_object_jsons)
# 10. CSVバックアップ処理を呼び出す
logger.info(
f'I-CTRL-13 [{target_object.object_name}] CSVデータバックアップ処理呼び出し')
backup_crm_csvdata(target_object, date_path, csv_object)
# 11. CSVアップロード処理を呼び出す
logger.info(
f'I-CTRL-14 [{target_object.object_name}] CSVデータアップロード処理呼び出し')
copy_crm_csvdata(target_object, date_path)
# 12. 前回取得日時ファイル更新処理を呼びだす
logger.info(
f'I-CTRL-15 [{target_object.object_name}] 前回取得日時ファイル更新処理呼び出し')
updload_last_fetch_datetime(target_object, last_fetch_datetime)
# 13. オブジェクト処理結果の更新
process_result[target_object.object_name] = 'success'
# 14. オブジェクトのアップロードが完了した旨をログに出力する
logger.info(f'I-CTRL-16 [{target_object.object_name}] 処理正常終了')
except MeDaCaCRMDataFetchException as e:
logger.error(f'{e.error_id} {e}')
logger.exception(
f'I-ERR-03 [{object_info.get("object_name")}] の[{e.func_name}]でエラーが発生しました 次のオブジェクトの処理に移行します')
continue
except Exception as e:
logger.exception(
f'I-ERR-04 [{object_info.get("object_name")}] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します', e)
continue
# ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する
logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{process_result}]')
# ⑤ 取得処理実施結果アップロード処理を呼び出す
logger.info('I-CTRL-19 CRM_取得処理実施結果ファイルアップロード処理開始')
updload_result_data(process_result, date_path)
# ⑥ 最終結果をチェックし、チェック結果をログに出力
if not all([v == 'success' for v in process_result.values()]):
logger.error('E-CTRL-01 一部のデータ取得に失敗しています。')
else:
logger.info('I-CTRL-18 すべてのデータの取得に成功しました。')
# ⑦ CRMデータ取得処理終了ログを出力する
logger.info('I-CTRL-20 CRMデータ取得処理を終了します')
return exit(0)
except MeDaCaCRMDataFetchException as e:
logger.error(f'{e.error_id} {e}')
logger.exception(f'I-ERR-01 [{e.func_name}]でエラーが発生したため、処理を終了します')
return exit(0)
except Exception as e:
logger.exception('I-ERR-02 予期せぬエラーが発生したため、処理を終了します', e)
return exit(0)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,35 @@
from src.util.logger import Logger
from src.constants import(
RESBK_JP_NAME
)
from src.error.exceptions import(
FileUploadException
)
from src.aws.s3 import BackupBucket
logger = Logger().get_logger()
def backup_crm_data(object_name, sf_object_jsons, date_path):
# ① CRM電文データバックアップ処理の開始ログを出力する
logger.info(f'I-RESBK-01 [{object_name}] のCRM電文データバックアップ処理を開始します')
try:
# ② CRMバックアップ保管用バケットに、CRMから取得したJSONの電文データのバックアップを保管する
file_name = f'{date_path}/{object_name}.json'
backup_bucket = BackupBucket()
backup_bucket.put_response_json(file_name, sf_object_jsons)
logger.debug(f'D-RESBK-02 [{object_name}] のJSONデータバックアップ 正常終了')
except Exception as e:
raise FileUploadException(
'E-RESBK-01', RESBK_JP_NAME, f'[{object_name}] 電文データのバックアップに失敗しました ファイル名:[{object_name}.json] エラー内容:[{e}]')
# ③ CRM電文データバックアップ処理の終了ログを出力する
logger.info(f'I-RESBK-03 [{object_name}] のCRM電文データバックアップ処理を終了します')
# ④ 次の処理へ移行する
return

View File

@ -0,0 +1,78 @@
from tenacity import retry, stop_after_attempt, stop_after_delay
from tenacity.wait import wait_exponential
from simple_salesforce import Salesforce
from src.util.async_retry import TimeoutManager, AsyncRetry
from src.environments import(
CRM_AUTH_DOMAIN,
CRM_USER_NAME,
CRM_USER_PASSWORD,
CRM_USER_SECURITY_TOKEN,
CRM_AUTH_TIMEOUT,
CRM_AUTH_MAX_RETRY_ATTEMPT,
CRM_AUTH_RETRY_INTERVAL,
CRM_AUTH_RETRY_MIN_INTERVAL,
CRM_AUTH_RETRY_MAX_INTERVAL,
CRM_GET_RECORD_COUNT_TIMEOUT,
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT,
CRM_GET_RECORD_COUNT_RETRY_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL,
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL,
CRM_FETCH_RECORD_TIMEOUT, # CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT,
CRM_FETCH_RECORD_RETRY_INTERVAL,
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL,
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL
)
FETCH_SOQL = """SELECT {column_or_expression} FROM {object_name}
WHERE SystemModStamp > {last_update_datetime_from}
AND SystemModStamp <= {last_update_datetime_to}
"""
class SalesfoeceApi():
def __init__(self) -> None:
self.__sf = Salesforce(username=CRM_USER_NAME, password=CRM_USER_PASSWORD,
security_token=CRM_USER_SECURITY_TOKEN,
domain=CRM_AUTH_DOMAIN
)
def sf_query(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
return self.__sf.query(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout)))
def sf_query_all_iter(self, soql, include_deleted=True, conn_timeout=100, read_timeout=300):
return self.__sf.query_all_iter(soql, include_deleted, timeout=(float(conn_timeout), float(read_timeout)))
class SalesForceCount():
def fetch_sf_count(self, object_name, last_update_datetime_from, last_update_datetime_to):
count_soql = FETCH_SOQL.format(
column_or_expression='COUNT(Id)',
object_name=object_name,
last_update_datetime_from=last_update_datetime_from,
last_update_datetime_to=last_update_datetime_to
)
self.__sf = SalesfoeceApi()
count_res = self.__sf.sf_query(
count_soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_GET_RECORD_COUNT_TIMEOUT)
return count_res.get('records')[0].get('expr0')
@retry(wait=wait_exponential(multiplier=CRM_GET_RECORD_COUNT_RETRY_INTERVAL, min=CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL, max=CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL), stop=stop_after_attempt(CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT) | stop_after_delay(CRM_GET_RECORD_COUNT_TIMEOUT))
def fetch_sf_count_retry(self, object_name, last_update_datetime_from, last_update_datetime_to):
return self.fetch_sf_count(object_name, last_update_datetime_from, last_update_datetime_to)
class SalesForceData():
def fetch_sf_data(self, columns, object_name, last_update_datetime_from, last_update_datetime_to):
soql = FETCH_SOQL.format(
column_or_expression=columns,
object_name=object_name,
last_update_datetime_from=last_update_datetime_from,
last_update_datetime_to=last_update_datetime_to
)
self.__sf = SalesfoeceApi()
return self.__sf.sf_query_all_iter(soql, conn_timeout=CRM_AUTH_TIMEOUT, read_timeout=CRM_FETCH_RECORD_TIMEOUT)
@retry(wait=wait_exponential(multiplier=CRM_FETCH_RECORD_RETRY_INTERVAL, min=CRM_FETCH_RECORD_RETRY_MIN_INTERVAL, max=CRM_FETCH_RECORD_RETRY_MAX_INTERVAL), stop=stop_after_attempt(CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT) | stop_after_delay(CRM_FETCH_RECORD_TIMEOUT))
def fetch_sf_data_retry(self, columns, object_name, last_update_datetime_from, last_update_datetime_to):
return self.fetch_sf_data(columns, object_name, last_update_datetime_from, last_update_datetime_to)

View File

@ -0,0 +1,49 @@
import json
from src.util.logger import Logger
from src.aws.s3 import ConfigBucket
from src.constants import (
UPD_JP_NAME
)
from src.error.exceptions import(
FileUploadException
)
logger = Logger().get_logger()
def updload_last_fetch_datetime(target_object, last_fetch_datetime):
# ① 前回取得日時ファイル更新処理の開始ログを出力する
logger.info(
f'I-UPD-01 [{target_object.object_name}] の前回取得日時ファイルの更新処理を開始します')
try:
if target_object.is_update_last_fetch_datetime == False:
# ② オブジェクト情報.is_update_last_fetch_datetimeがfalseの場合、以降の処理をスキップする
logger.info(
f'I-UPD-02 [{target_object.object_name}] の前回取得日時ファイルの更新処理をスキップします')
else:
# ③ 前回取得日時ファイル.last_fetch_datetime_fromに取得処理開始年月日時分秒を設定する
# 前回取得日時ファイル.last_fetch_datetime_toに空文字を設定する
last_fetch_datetime_dict = {
'last_fetch_datetime_from': last_fetch_datetime.last_fetch_datetime_to,
'last_fetch_datetime_to': ''
}
config_bucket = ConfigBucket()
config_bucket.put_last_fetch_datetime_file(
target_object.last_fetch_datetime_file_name, json.dumps(last_fetch_datetime_dict))
logger.info(
f'D-UPD-03 [{target_object.object_name}] の前回取得日時ファイル更新処理 正常終了')
except Exception as e:
raise FileUploadException(
'E-UPD-01', UPD_JP_NAME, f'[{target_object.object_name}] 前回処理日時ファイルのアップロードに失敗しました ファイル名:[{target_object.last_fetch_datetime_file_name}] エラー内容:[{e}]')
# ④ 前回取得日時ファイル更新処理の終了ログを出力する
logger.info(
f'I-UPD-04 [{target_object.object_name}] の前回取得日時ファイルの更新処理を終了します')
# ⑤ 次の処理へ移行する
return

View File

@ -0,0 +1,43 @@
from src.util.logger import Logger
from src.constants import (
UPLD_JP_NAME
)
from src.environments import(
CRM_BACKUP_BUCKET,
IMPORT_DATA_BUCKET,
CRM_IMPORT_DATA_FOLDER,
CRM_IMPORT_DATA_BACKUP_FOLDER,
)
from src.error.exceptions import(
FileUploadException
)
from src.aws.s3 import S3ResourceNonBucket
logger = Logger().get_logger()
def copy_crm_csvdata(target_object, date_path):
# ① CSVデータアップロード処理の開始ログを出力する
logger.info(
f'I-UPLD-01 [{target_object.object_name}] のCSVデータアップロード処理を開始します ファイル名:[{target_object.upload_file_name}.csv]')
try:
# ② CRMバックアップ保管用バケットに保管した変換後のCSVデータをデータ取込バケットにコピーする
s3_resource_non_bucket = S3ResourceNonBucket()
s3_resource_non_bucket.copy(CRM_BACKUP_BUCKET, f'{CRM_IMPORT_DATA_BACKUP_FOLDER}/{date_path}/{target_object.upload_file_name}.csv',
IMPORT_DATA_BUCKET, f'{CRM_IMPORT_DATA_FOLDER}/{target_object.upload_file_name}.csv')
logger.debug(
f'D-UPLD-02 [{target_object.object_name}] のCSVデータアップロード 正常終了')
except Exception as e:
raise FileUploadException(
'E-UPLD-01', UPLD_JP_NAME, f'[{target_object.object_name}] CSVデータのアップロードに失敗しました ファイル名:[{target_object.upload_file_name}.csv] エラー内容:[{e}]')
# ③ CSVデータアップロード処理の終了ログを出力する
logger.info(
f'I-UPLD-03 [{target_object.object_name}] のCSVデータのアップロード処理を終了します')
# ④ 次の処理へ移行する
return

View File

View File

@ -0,0 +1,64 @@
import re
class DictCheck:
def __init__(self) -> None:
pass
def check_key_exist(self, object_dict: dict, check_key: str) -> bool:
'''
辞書型キー存在チェック
'''
return True if check_key in object_dict and object_dict[check_key] != '' else False
def check_value_type(self, object_dict: dict, check_key: str, check_type: type) -> bool:
'''
辞書型バリュー型チェック
'''
return True if isinstance(object_dict[check_key], check_type) else False
def check_regex(self, regex_str: str, object_dict: dict, check_key: str) -> bool:
'''
辞書型バリュー正規表現チェック
'''
return True if re.fullmatch(regex_str, object_dict[check_key]) else False
def check_key_exist_and_value_type(self, object_dict: dict, check_key: str, check_type: type) -> None:
'''
辞書型キー存在チェックバリュー型チェック
'''
if not self.check_key_exist(object_dict, check_key):
raise Exception(f'{check_key}」キーは必須です')
elif not self.check_value_type(object_dict, check_key, check_type):
raise Exception(f'{check_key}」キーの値は「{check_type}」でなければなりません')
def check_key_exist_case_value_type(self, object_dict: dict, check_key: str, check_type: type):
'''
辞書型キー存在した場合のバリュー型チェック
'''
if not self.check_key_exist(object_dict, check_key):
pass
elif not self.check_value_type(object_dict, check_key, check_type):
raise Exception(f'{check_key}」キーの値は「{check_type}」でなければなりません')
def check_key_exsit_and_regex(self, object_dict: dict, check_key: str, regex_str: str):
'''
辞書型キー存在チェックバリュー正規表現チェック
'''
if not self.check_key_exist(object_dict, check_key):
raise Exception(f'{check_key}」キーは必須です')
elif not self.check_regex(regex_str, object_dict, check_key):
raise Exception(f'{check_key}」キーの値の正規表現「{regex_str}」チェックに失敗しました')
def check_key_exsit_case_regex(self, object_dict: dict, check_key: str, regex_str: str):
'''
辞書型キー存在した場合のバリュー正規表現チェック
'''
if not self.check_key_exist(object_dict, check_key):
pass
elif not self.check_regex(regex_str, object_dict, check_key):
raise Exception(f'{check_key}」キーの値の正規表現「{regex_str}」チェックに失敗しました')

View File

@ -0,0 +1,29 @@
import logging
from src.environments import LOG_LEVEL
class Logger():
__logger: logging.Logger
def __init__(self):
self.__logger = logging.getLogger()
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
self.__logger.setLevel(level)
if not self.__logger.hasHandlers():
handler = logging.StreamHandler()
self.__logger.addHandler(handler)
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
for handler in self.__logger.handlers:
handler.setFormatter(formatter)
def get_logger(self) -> logging.Logger:
return self.__logger

View File

@ -1,169 +0,0 @@
# モジュールロード
import dataclasses
#from fileinput import filename
import os
#from tracemalloc import DomainFilter
# 変数
# ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
LOG_LEVEL = os.environ["LOG_LEVEL"]
CRM_AUTH_TIMEOUT = os.environ["CRM_AUTH_TIMEOUT"] # CRMへの認証処理のタイムアウト秒数
# CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ["CRM_AUTH_MAX_RETRY_ATTEMPT"]
# CRMへの認証処理のリトライ時の初回待ち秒数
CRM_AUTH_RETRY_INTERVAL = os.environ["CRM_AUTH_RETRY_INTERVAL"]
# CRMへの認証処理のリトライ時の最小待ち秒数
CRM_AUTH_RETRY_MIN_INTERVAL = os.environ["CRM_AUTH_RETRY_MIN_INTERVAL"]
# CRMへの認証処理のリトライ時の最大待ち秒数
CRM_AUTH_RETRY_MAX_INTERVAL = os.environ["CRM_AUTH_RETRY_MAX_INTERVAL"]
# CRMのレコード件数取得処理のタイムアウト秒数
CRM_GET_RECORD_COUNT_TIMEOUT = os.environ["CRM_GET_RECORD_COUNT_TIMEOUT"]
# CRMのレコード件数取得処理の最大リトライ試行回数
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ["CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT"]
# CRMのレコード件数取得処理のリトライ時の初回待ち秒数
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_INTERVAL"]
# CRMのレコード件数取得処理のリトライ時の最小待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL"]
# CRMのレコード件数取得処理のリトライ時の最大待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL"]
# CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_TIMEOUT = os.environ["CRM_FETCH_RECORD_TIMEOUT"]
# CRMのレコード取得処理の最大リトライ試行回数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ["CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT"]
# CRMのレコード取得処理のリトライ時の初回待ち秒数
CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_INTERVAL"]
# CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MIN_INTERVAL"]
# CRMのレコード取得処理のリトライ時の最大待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MAX_INTERVAL"]
CRM_AUTH_DOMAIN = os.environ["CRM_AUTH_DOMAIN"] # CRMのAPI実行のための認証エンドポイントのドメイン
CRM_USER_NAME = os.environ["CRM_USER_NAME"] # CRMのAPI実行用ユーザ名
CRM_USER_PASSWORD = os.environ["CRM_USER_PASSWORD"] # CRMのAPI実行用ユーザパスワード
# CRMのAPI実行用ユーザのセキュリティトークン
CRM_USER_SECURITY_TOKEN = os.environ["CRM_USER_SECURITY_TOKEN"]
# CRMデータ取得用の設定ファイルを格納するバケット名
CRM_CONFIG_BUCKET = os.environ["CRM_CONFIG_BUCKET"]
CRM_BACKUP_BUCKET = os.environ["CRM_BACKUP_BUCKET"] # CRMのバックアップデータを格納するバケット名
IMPORT_DATA_BUCKET = os.environ["IMPORT_DATA_BUCKET"] # CRMの取込データを格納するバケット名
# TASK_SETTING_FOLDER = os.environ["TASK_SETTING_FOLDER"] # CRM取得処理タスクの設定ファイルを格納するフォルダパス
# TASK_SETTING_FILENAME = os.environ["TASK_SETTING_FILENAME"] # CRM取得処理タスクの設定ファイルのファイル名
# CRM取得対象オブジェクトの情報を格納するフォルダパス
OBJECT_INFO_FOLDER = os.environ["OBJECT_INFO_FOLDER"]
# CRM取得対象オブジェクトの情報のファイル名
OBJECT_INFO_FILENAME = os.environ["OBJECT_INFO_FILENAME"]
# CRMデータ取得結果を格納するフォルダパス
FETCH_RESULT_FOLDER = os.environ["FETCH_RESULT_FOLDER"]
# CRMデータ取得結果を格納するファイル名
FETCH_RESULT_FILENAME = os.environ["FETCH_RESULT_FILENAME"]
# CRMからの最終取得日時ファイルを格納するフォルダパス
LAST_FETCH_DATE_FOLDER = os.environ["LAST_FETCH_DATE_FOLDER"]
# CRMから取得し、取込用に変換したデータを格納するフォルダ
CRM_IMPORT_DATA_FOLDER = os.environ["CRM_IMPORT_DATA_FOLDER"]
# CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ["LAST_FETCH_DATE_BACKUP_FOLDER"]
# CRMから取得した生データのバックアップを格納するフォルダパス
FETCH_DATA_BACKUP_FOLDER = os.environ["FETCH_DATA_BACKUP_FOLDER"]
# CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ["CRM_IMPORT_DATA_BACKUP_FOLDER"]
# 処理
@dataclasses.dataclass
class LogLevel:
loglevel: str = LOG_LEVEL
@dataclasses.dataclass
class CrmAuthVar:
auth_domain: str = CRM_AUTH_DOMAIN
user_name: str = CRM_USER_NAME
user_password: str = CRM_USER_PASSWORD
security_token: str = CRM_USER_SECURITY_TOKEN
timeout: int = CRM_AUTH_TIMEOUT
max_retry_attempt: int = CRM_AUTH_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_AUTH_RETRY_INTERVAL
retry_min_interval: int = CRM_AUTH_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_AUTH_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmGetRecordCountVar:
timeout: int = CRM_GET_RECORD_COUNT_TIMEOUT
max_retry_attempt: int = CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_GET_RECORD_COUNT_RETRY_INTERVAL
retry_min_interval: int = CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmFetchRecordVar:
timeout: int = CRM_FETCH_RECORD_TIMEOUT
max_retry_attempt: int = CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_FETCH_RECORD_RETRY_INTERVAL
retry_min_interval: int = CRM_FETCH_RECORD_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_FETCH_RECORD_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmBucketVar:
config_bucket: str = CRM_CONFIG_BUCKET
backup_bucket: str = CRM_BACKUP_BUCKET
data_bucket: str = IMPORT_DATA_BUCKET
@dataclasses.dataclass
class CrmObjectInfoVar:
bucket: str = CRM_CONFIG_BUCKET
folder: str = OBJECT_INFO_FOLDER
file_name: str = OBJECT_INFO_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class FetchResultVar:
folder: str = FETCH_RESULT_FOLDER
file_name: str = FETCH_RESULT_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class FetchResultVar:
folder: str = FETCH_RESULT_FOLDER
file_name: str = FETCH_RESULT_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class LastFetchDateVar:
folder: str = LAST_FETCH_DATE_FOLDER
@dataclasses.dataclass
class CrmImportDataVar:
folder: str = CRM_IMPORT_DATA_FOLDER
@dataclasses.dataclass
class LastFetchDateBackupVar:
folder: str = LAST_FETCH_DATE_BACKUP_FOLDER
@dataclasses.dataclass
class FetchDataBackupVar:
folder: str = FETCH_DATA_BACKUP_FOLDER
@dataclasses.dataclass
class CrmImportDataBackupVar:
folder: str = CRM_IMPORT_DATA_BACKUP_FOLDER

View File

@ -1,28 +0,0 @@
# モジュールロード
import re
import json
# 変数
SYMBOL = ['#', '/']
# 処理
class JsonExculudeComment:
def __init__(self) -> None:
pass
def json_exclude_comment(self, json_str) -> dict:
for symbol in SYMBOL:
# コメントアウトシンボルを含む部分を置き換える正規表現
replace_comment_regex = rf'\s(?!\"){symbol}[\s\S]*?.*'
json_str = self.json_regex_parse(replace_comment_regex, json_str)
result = json.loads(json_str)
return result
def json_regex_parse(replace_comment_regex, json_str) -> str:
json_without_comment = re.sub(replace_comment_regex, '', json_str)
return json_without_comment

View File

@ -1,33 +0,0 @@
import logging
import os
LOG_LEVEL = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warn': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG
}
class Logger():
__logger: logging.Logger
def __init__(self):
# logger設定
self.__logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
handler = logging.StreamHandler()
self.__logger.addHandler(handler)
for handler in self.__logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(os.environ.get('LOG_LEVEL', 'WARN').upper())
if not isinstance(level, int):
level = logging.INFO
self.__logger.setLevel(level)
def get_logger(self) -> logging.Logger:
return self.__logger

View File

@ -1,48 +0,0 @@
# モジュールロード
import boto3
from tenacity import retry, stop_after_attempt
from tenacity.wait import wait_exponential
# 変数
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
# 処理
class UploadFileS3:
def __init__(self, bucket_name, local_file_path, file_name_key) -> None:
self.bucket_name: str = bucket_name
self.local_file_path: str = local_file_path
self.file_name_key: str = file_name_key
def upload_file_to_s3(self) -> None:
bucket = s3_resource.Bucket(self.bucket_name)
bucket.upload_file(self.local_file_path, self.file_name_key)
class GetFileS3:
def __init__(self, bucket_name, file_name_key, multiplier=5, min=5, max=50, count=3) -> None:
self.bucket_name: str = bucket_name
self.file_name_key: str = file_name_key
self.multiplier: int = multiplier
self.min: int = min
self.max: int = max
self.count: int = count
def get_file_from_s3(self) -> str:
bucket = s3_resource.Bucket(self.bucket_name)
response = bucket.Object(self.file_name_key).get()
body = response['Body'].read()
return body.decode('utf-8')
def get_file_from_s3_with_retry(self) -> str:
@retry(wait=wait_exponential(multiplier=self.multiplier, min=self.min, max=self.max), stop=stop_after_attempt(self.count))
def get_file_from_s3_with_retry_deco(*args, **kwargs) -> str:
return self.get_file_from_s3(self.bucket_name, self.file_name_key)
response = get_file_from_s3_with_retry_deco(self)
return response