feat: CRMデータ取得ファーストコミット

This commit is contained in:
Y_SAKAI 2022-07-04 14:40:57 +09:00
parent dfc681094f
commit f2650c1ac8
22 changed files with 628 additions and 0 deletions

View File

@ -0,0 +1,12 @@
FROM python:3.8
ENV TZ="Asia/Tokyo"
WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
RUN pip install --no-cache-dir -r requirements.txt && \
cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
COPY main.py ./
COPY crm-datafetch /usr/src
CMD [ "python", "./main.py" ]

View File

@ -0,0 +1,4 @@
[packages]
boto3
simple-salesforce
tenacity

0
ecs/crm-datafetch/Pipfile.lock generated Normal file
View File

181
ecs/crm-datafetch/main.py Normal file
View File

@ -0,0 +1,181 @@
# モジュールロード
import os
import sys
from datetime import datetime
import src.pre as pre # データ取得準備処理
import src.chk as chk # オブジェクト情報形式チェック処理
import src.date as date # データ取得期間設定処理
import src.fetch as fetch # CRMデータ取得処理
import src.resbk as resbk # CRM電文データバックアップ処理
import src.conv as conv # CSV変換処理
import src.csvbk as csvbk # CSVバックアップ処理
import src.upld as upld # CSVアップロード処理
import src.upd as upd # 前回取得日時ファイル更新
import src.end as end # 取得処理実施結果アップロード処理
from src.utils.logger import Logger
import src.utils.data_retention as data_retention
# 変数
# logger設定
logger = Logger().get_logger()
# 処理
def main() -> None:
"""コントロール処理
各処理を呼び出すコントローラー
"""
try:
# ① CRMデータ取得処理開始ログを出力する
logger.info('I-CTRL-01 CRMデータ取得処理を開始します')
# ② データ取得準備処理を呼び出す
try:
logger.info('I-CTRL-02 データ取得準備処理呼び出し')
crm_object_info_var = data_retention.CrmObjectInfoVar()
crm_object_info_dict, execute_datetime, process_result_per_object = pre(
crm_object_info_var)
except Exception as e:
logger.error('')
raise CustomException(e)
# ③ object_infoのobjectsキーの値の件数分ループする
logger.info('I-CTRL-03 取得対象オブジェクトのループ処理開始')
for object_info in crm_object_info_dict.get('objects'):
try:
# 1. オブジェクト情報形式チェック処理を呼び出す
logger.info('I-CTRL-04 オブジェクト情報形式チェック処理呼び出し')
try:
chk()
except Exception as e:
logger.error('')
# 2. 処理対象のオブジェクト名をログ出力する
logger.info('I-CTRL-05 ')
# 3. オブジェクト情報.is_skipがTrueの場合、次のオブジェクトの処理に移行する
logger.info('I-CTRL-06 ')
# 4. データ取得期間設定処理を呼び出す
logger.info('I-CTRL-07 ')
try:
except Exception as e:
logger.error('')
# 5. CRMデータ取得処理を呼び出す
logger.info('I-CTRL-08 ')
try:
except Exception as e:
logger.error('')
# 6. CRMデータのジェネレータをループする
# 6-1. 出力ファイル名を生成する
logger.info('I-CTRL-10 ')
try:
except Exception as e:
logger.error('')
# 6-2. CRM電文データバックアップ処理を呼び出す
logger.info('I-CTRL-11 ')
try:
except Exception as e:
logger.error('')
# 6-3. CSV変換処理を呼び出す
logger.info('I-CTRL-12 ')
try:
except Exception as e:
logger.error('')
# 6-4. CSVバックアップ処理を呼び出す
logger.info('I-CTRL-13 ')
try:
except Exception as e:
logger.error('')
# 6-5. CSVアップロード処理を呼び出す
logger.info('I-CTRL-14 ')
try:
except Exception as e:
logger.error('')
# 7. 前回取得日時ファイル更新処理を呼びだす
logger.info('I-CTRL-15 ')
try:
except Exception as e:
logger.error('')
# 8. 1オブジェクトのアップロードが完了した旨をログに出力する
logger.info('I-CTRL-16 ')
except GetObjectException as e:
logger.error(
'[取得対象のオブジェクト名] の[処理名]でエラーが発生しました 次のオブジェクトの処理に移行します')
except Exception as e:
logger.error(
'[取得対象のオブジェクト名] の処理中に予期せぬエラーが発生しました 次のオブジェクトの処理に移行します')
# ④ すべてのオブジェクトの処理が完了したことと、オブジェクト毎の処理結果をログに出力する
try:
logger.info(f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{実行結果JSON}]')
except Exception as e:
logger.error('')
raise CustomException()
# ⑤ 取得処理実施結果アップロード処理を呼び出す
try:
logger.info('I-CTRL-18 CRM_取得処理実施結果ファイルアップロード処理開始')
except Exception as e:
logger.error('')
raise CustomException()
# ⑥ CRMデータ取得処理終了ログを出力する
try:
logger.info('I-CTRL-19 CRMデータ取得処理を終了します')
except Exception as e:
logger.error('')
raise
except CustomException as e:
logger.error('I-ERR-01 [処理名]でエラーが発生したため、処理を終了します')
except Exception as e:
logger.error('I-ERR-02 予期せぬエラーが発生したため、処理を終了します')
# カスタムExceptionクラス
class CustomException(Exception, metaclass=ABCMeta):
def __init__(self, id, arg):
self.arg = arg
self.id = id
class GetObjectException(CustomException):
pass
if __name__ == '__main__':
main()

View File

View File

View File

View File

View File

View File

View File

View File

@ -0,0 +1,96 @@
# モジュールロード
from datetime import datetime
from src.utils.logger import Logger
import src.utils.data_retention as data_retention
import src.utils.s3_access as s3_access
import src.utils.json_parser as json_parser
# 変数
# .000ZはUTCを表す。ミリ秒までの考慮は不要なので固定で指定
YYYYMMDDTHHMMSSTZ = '%Y-%m-%dT%H:%M:%S.000Z'
CRM_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000%z'
YYYYMMDDHHMMSS = '%Y-%m-%d %H:%M:%S'
# logger設定
logger = Logger().get_logger()
# 処理
def pre(crm_object_info_var):
try:
# ① データ取得準備処理の開始ログを出力する
logger.info('I-PRE-01 データ取得準備処理を開始します')
# ② 取得処理開始年月日時分秒を控える
execute_datetime = datetime.now().strftime(YYYYMMDDTHHMMSSTZ)
logger.info(f'I-PRE-02 データ取得処理開始日時:{execute_datetime}')
# ⑤ S3 設定ファイル保管用バケットから、CRM_取得オブジェクト情報ファイルを取得する
try:
crm_object_info_var = data_retention.CrmObjectInfoVar()
s3_fullpath = f's3://{crm_object_info_var.bucket}/{crm_object_info_var.file_name_key}'
logger.debug(f'D-PRE-03 CRM_取得オブジェクト情報ファイルの取得開始します ファイルパス:[{s3_fullpath}]')
get_file_s3 = s3_access.GetFileS3()
crm_object_info_json = get_file_s3.get_file_from_s3(crm_object_info_var.bucket, crm_object_info_var.file_name_key)
logger.debug('D-PRE-04 CRM_取得オブジェクト情報ファイルの取得成功しました')
except Exception as e:
logger.error(f'E-PRE-01 CRM_取得オブジェクト情報ファイルが存在しません ファイル名[{s3_fullpath}]', e)
raise e
# ⑥ CRM_取得オブジェクト情報ファイルをパースし、メモリ上に展開する
try:
logger.debug('D-PRE-05 CRM_取得オブジェクト情報ファイルをパースします')
json_exclusde_comment = json_parser.JsonExculudeComment()
crm_object_info_dict: dict = json_exclusde_comment(crm_object_info_json)
logger.debug('D-PRE-06 CRM_取得オブジェクト情報ファイルのパースに成功しました')
except Exception as e:
logger.error(f'E-PRE-02 CRM_取得オブジェクト情報ファイルのパースに失敗しました エラー内容[{e}]')
raise e
# ⑦ メモリ上のCRM_取得オブジェクト情報のキーobjectsの形式チェックを行う
try:
logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェックを開始します')
if 'objects' not in crm_object_info_dict:
logger.error(f'E-PRE-03 CRM_取得オブジェクト情報ファイル「objects」キーは必須です ファイル名[{s3_fullpath}]')
raise e
if not isinstance(crm_object_info_dict["objects"], list):
logger.error(f'E-PRE-04 CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名[{s3_fullpath}]')
raise e
logger.debug('D-PRE-07 CRM_取得オブジェクト情報ファイルの形式チェック 正常終了')
except Exception as e:
logger.error(f'CRM_取得オブジェクト情報ファイル「objects」キーの値は配列でなければなりません ファイル名[{s3_fullpath}]', e)
raise e
# ⑧ 処理結果出力用のマップを初期化
process_result_per_object = {}
for sfdc_object in crm_object_info_dict.get('objects'):
object_name = sfdc_object.get('object_name')
process_result_per_object[object_name] = 'fail'
# ⑨ データ取得準備処理の終了ログを出力する
logger.info('I-PRE-09 データ取得準備処理を終了します')
# ⑩ 次の処理へ移行する
return(crm_object_info_dict, execute_datetime, process_result_per_object)
except Exception as e:
logger.error(f'データ取得準備処理で想定外のエラーが発生しました エラー内容:[{e}]', e)
raise e

View File

View File

View File

View File

View File

@ -0,0 +1,169 @@
# モジュールロード
import dataclasses
#from fileinput import filename
import os
#from tracemalloc import DomainFilter
# 変数
# ログ出力レベル。DEBUG, INFO, WARNING, ERRORの4つから指定する
LOG_LEVEL = os.environ["LOG_LEVEL"]
CRM_AUTH_TIMEOUT = os.environ["CRM_AUTH_TIMEOUT"] # CRMへの認証処理のタイムアウト秒数
# CRMへの認証処理の最大リトライ試行回数
CRM_AUTH_MAX_RETRY_ATTEMPT = os.environ["CRM_AUTH_MAX_RETRY_ATTEMPT"]
# CRMへの認証処理のリトライ時の初回待ち秒数
CRM_AUTH_RETRY_INTERVAL = os.environ["CRM_AUTH_RETRY_INTERVAL"]
# CRMへの認証処理のリトライ時の最小待ち秒数
CRM_AUTH_RETRY_MIN_INTERVAL = os.environ["CRM_AUTH_RETRY_MIN_INTERVAL"]
# CRMへの認証処理のリトライ時の最大待ち秒数
CRM_AUTH_RETRY_MAX_INTERVAL = os.environ["CRM_AUTH_RETRY_MAX_INTERVAL"]
# CRMのレコード件数取得処理のタイムアウト秒数
CRM_GET_RECORD_COUNT_TIMEOUT = os.environ["CRM_GET_RECORD_COUNT_TIMEOUT"]
# CRMのレコード件数取得処理の最大リトライ試行回数
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT = os.environ["CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT"]
# CRMのレコード件数取得処理のリトライ時の初回待ち秒数
CRM_GET_RECORD_COUNT_RETRY_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_INTERVAL"]
# CRMのレコード件数取得処理のリトライ時の最小待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL"]
# CRMのレコード件数取得処理のリトライ時の最大待ち秒数
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL = os.environ["CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL"]
# CRMのレコード取得処理のタイムアウト秒数
CRM_FETCH_RECORD_TIMEOUT = os.environ["CRM_FETCH_RECORD_TIMEOUT"]
# CRMのレコード取得処理の最大リトライ試行回数
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT = os.environ["CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT"]
# CRMのレコード取得処理のリトライ時の初回待ち秒数
CRM_FETCH_RECORD_RETRY_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_INTERVAL"]
# CRMのレコード取得処理のリトライ時の最小待ち秒数
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MIN_INTERVAL"]
# CRMのレコード取得処理のリトライ時の最大待ち秒数
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL = os.environ["CRM_FETCH_RECORD_RETRY_MAX_INTERVAL"]
CRM_AUTH_DOMAIN = os.environ["CRM_AUTH_DOMAIN"] # CRMのAPI実行のための認証エンドポイントのドメイン
CRM_USER_NAME = os.environ["CRM_USER_NAME"] # CRMのAPI実行用ユーザ名
CRM_USER_PASSWORD = os.environ["CRM_USER_PASSWORD"] # CRMのAPI実行用ユーザパスワード
# CRMのAPI実行用ユーザのセキュリティトークン
CRM_USER_SECURITY_TOKEN = os.environ["CRM_USER_SECURITY_TOKEN"]
# CRMデータ取得用の設定ファイルを格納するバケット名
CRM_CONFIG_BUCKET = os.environ["CRM_CONFIG_BUCKET"]
CRM_BACKUP_BUCKET = os.environ["CRM_BACKUP_BUCKET"] # CRMのバックアップデータを格納するバケット名
IMPORT_DATA_BUCKET = os.environ["IMPORT_DATA_BUCKET"] # CRMの取込データを格納するバケット名
# TASK_SETTING_FOLDER = os.environ["TASK_SETTING_FOLDER"] # CRM取得処理タスクの設定ファイルを格納するフォルダパス
# TASK_SETTING_FILENAME = os.environ["TASK_SETTING_FILENAME"] # CRM取得処理タスクの設定ファイルのファイル名
# CRM取得対象オブジェクトの情報を格納するフォルダパス
OBJECT_INFO_FOLDER = os.environ["OBJECT_INFO_FOLDER"]
# CRM取得対象オブジェクトの情報のファイル名
OBJECT_INFO_FILENAME = os.environ["OBJECT_INFO_FILENAME"]
# CRMデータ取得結果を格納するフォルダパス
FETCH_RESULT_FOLDER = os.environ["FETCH_RESULT_FOLDER"]
# CRMデータ取得結果を格納するファイル名
FETCH_RESULT_FILENAME = os.environ["FETCH_RESULT_FILENAME"]
# CRMからの最終取得日時ファイルを格納するフォルダパス
LAST_FETCH_DATE_FOLDER = os.environ["LAST_FETCH_DATE_FOLDER"]
# CRMから取得し、取込用に変換したデータを格納するフォルダ
CRM_IMPORT_DATA_FOLDER = os.environ["CRM_IMPORT_DATA_FOLDER"]
# CRMからの最終取得日時ファイルのバックアップを格納するフォルダパス
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ["LAST_FETCH_DATE_BACKUP_FOLDER"]
# CRMから取得した生データのバックアップを格納するフォルダパス
FETCH_DATA_BACKUP_FOLDER = os.environ["FETCH_DATA_BACKUP_FOLDER"]
# CRMから取得し、取込用に変換したデータのバックアップを格納するフォルダ
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ["CRM_IMPORT_DATA_BACKUP_FOLDER"]
# 処理
@dataclasses.dataclass
class LogLevel:
loglevel: str = LOG_LEVEL
@dataclasses.dataclass
class CrmAuthVar:
auth_domain: str = CRM_AUTH_DOMAIN
user_name: str = CRM_USER_NAME
user_password: str = CRM_USER_PASSWORD
security_token: str = CRM_USER_SECURITY_TOKEN
timeout: int = CRM_AUTH_TIMEOUT
max_retry_attempt: int = CRM_AUTH_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_AUTH_RETRY_INTERVAL
retry_min_interval: int = CRM_AUTH_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_AUTH_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmGetRecordCountVar:
timeout: int = CRM_GET_RECORD_COUNT_TIMEOUT
max_retry_attempt: int = CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_GET_RECORD_COUNT_RETRY_INTERVAL
retry_min_interval: int = CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmFetchRecordVar:
timeout: int = CRM_FETCH_RECORD_TIMEOUT
max_retry_attempt: int = CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT
retry_interval: int = CRM_FETCH_RECORD_RETRY_INTERVAL
retry_min_interval: int = CRM_FETCH_RECORD_RETRY_MIN_INTERVAL
retry_max_interval: int = CRM_FETCH_RECORD_RETRY_MAX_INTERVAL
@dataclasses.dataclass
class CrmBucketVar:
config_bucket: str = CRM_CONFIG_BUCKET
backup_bucket: str = CRM_BACKUP_BUCKET
data_bucket: str = IMPORT_DATA_BUCKET
@dataclasses.dataclass
class CrmObjectInfoVar:
bucket: str = CRM_CONFIG_BUCKET
folder: str = OBJECT_INFO_FOLDER
file_name: str = OBJECT_INFO_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class FetchResultVar:
folder: str = FETCH_RESULT_FOLDER
file_name: str = FETCH_RESULT_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class FetchResultVar:
folder: str = FETCH_RESULT_FOLDER
file_name: str = FETCH_RESULT_FILENAME
file_name_key: str = f'{folder}/{file_name}'
@dataclasses.dataclass
class LastFetchDateVar:
folder: str = LAST_FETCH_DATE_FOLDER
@dataclasses.dataclass
class CrmImportDataVar:
folder: str = CRM_IMPORT_DATA_FOLDER
@dataclasses.dataclass
class LastFetchDateBackupVar:
folder: str = LAST_FETCH_DATE_BACKUP_FOLDER
@dataclasses.dataclass
class FetchDataBackupVar:
folder: str = FETCH_DATA_BACKUP_FOLDER
@dataclasses.dataclass
class CrmImportDataBackupVar:
folder: str = CRM_IMPORT_DATA_BACKUP_FOLDER

View File

@ -0,0 +1,28 @@
# モジュールロード
import re
import json
# 変数
SYMBOL = ['#', '/']
# 処理
class JsonExculudeComment:
def __init__(self) -> None:
pass
def json_exclude_comment(self, json_str) -> dict:
for symbol in SYMBOL:
# コメントアウトシンボルを含む部分を置き換える正規表現
replace_comment_regex = rf'\s(?!\"){symbol}[\s\S]*?.*'
json_str = self.json_regex_parse(replace_comment_regex, json_str)
result = json.loads(json_str)
return result
def json_regex_parse(replace_comment_regex, json_str) -> str:
json_without_comment = re.sub(replace_comment_regex, '', json_str)
return json_without_comment

View File

@ -0,0 +1,33 @@
import logging
import os
LOG_LEVEL = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warn': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG
}
class Logger():
__logger: logging.Logger
def __init__(self):
# logger設定
self.__logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
handler = logging.StreamHandler()
self.__logger.addHandler(handler)
for handler in self.__logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(os.environ.get('LOG_LEVEL', 'WARN').upper())
if not isinstance(level, int):
level = logging.INFO
self.__logger.setLevel(level)
def get_logger(self) -> logging.Logger:
return self.__logger

View File

@ -0,0 +1,48 @@
# モジュールロード
import boto3
from tenacity import retry, stop_after_attempt
from tenacity.wait import wait_exponential
# 変数
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
# 処理
class UploadFileS3:
def __init__(self, bucket_name, local_file_path, file_name_key) -> None:
self.bucket_name: str = bucket_name
self.local_file_path: str = local_file_path
self.file_name_key: str = file_name_key
def upload_file_to_s3(self) -> None:
bucket = s3_resource.Bucket(self.bucket_name)
bucket.upload_file(self.local_file_path, self.file_name_key)
class GetFileS3:
def __init__(self, bucket_name, file_name_key, multiplier=5, min=5, max=50, count=3) -> None:
self.bucket_name: str = bucket_name
self.file_name_key: str = file_name_key
self.multiplier: int = multiplier
self.min: int = min
self.max: int = max
self.count: int = count
def get_file_from_s3(self) -> str:
bucket = s3_resource.Bucket(self.bucket_name)
response = bucket.Object(self.file_name_key).get()
body = response['Body'].read()
return body.decode('utf-8')
def get_file_from_s3_with_retry(self) -> str:
@retry(wait=wait_exponential(multiplier=self.multiplier, min=self.min, max=self.max), stop=stop_after_attempt(self.count))
def get_file_from_s3_with_retry_deco(*args, **kwargs) -> str:
return self.get_file_from_s3(self.bucket_name, self.file_name_key)
response = get_file_from_s3_with_retry_deco(self)
return response

View File

@ -0,0 +1,41 @@
{
"objects": [
{
#
"object_name": "Account",
"columns": [
"Id",
"Name",
"SystemModStamp",
"NumberOfEmployees",
"IsDeleted"
]
},
{
"object_name": "Territory2",
"is_skip": false,
"can_update_last_update": false,
"upload_file_name": "CRM_Territory2_All_{execute_datetime}_{split_csv_suffix_number:03}",
"last_fetch_datetime_file_name": "Territory2.json",
"columns": [
"Id",
"Name",
"Territory2TypeId",
"SystemModStamp"
]
},
{
"object_name": "AccountShare",
"is_skip": false,
"datetime_column": "LastModifiedDate",
"columns": [
"Id",
"Name",
"Territory2TypeId",
"LastModifiedDate"
]
}
]
}

View File

@ -0,0 +1,16 @@
LOG_LEVEL=INFO
CRM_AUTH_TIMEOUT=100
CRM_AUTH_MAX_RETRY_ATTEMPT=3
CRM_AUTH_RETRY_INTERVAL=5
CRM_AUTH_RETRY_MIN_INTERVAL=5
CRM_AUTH_RETRY_MAX_INTERVAL=50
CRM_GET_RECORD_COUNT_TIMEOUT=300
CRM_GET_RECORD_COUNT_MAX_RETRY_ATTEMPT=3
CRM_GET_RECORD_COUNT_RETRY_INTERVAL=5
CRM_GET_RECORD_COUNT_RETRY_MIN_INTERVAL=5
CRM_GET_RECORD_COUNT_RETRY_MAX_INTERVAL=50
CRM_FETCH_RECORD_TIMEOUT=300
CRM_FETCH_RECORD_MAX_RETRY_ATTEMPT=3
CRM_FETCH_RECORD_RETRY_INTERVAL=5
CRM_FETCH_RECORD_RETRY_MIN_INTERVAL=5
CRM_FETCH_RECORD_RETRY_MAX_INTERVAL=50