feat: とりあえず流れきるところまで。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2022-08-19 12:00:26 +09:00
parent e7aea6c818
commit 75274f60fa
3 changed files with 105 additions and 14 deletions

View File

@ -1,6 +1,7 @@
import textwrap
from src.config.objects import LastFetchDatetime, TargetObject
from src.system_var.environments import FETCH_LIMIT_CLAUSE
class SOQLBuilder:
@ -9,6 +10,7 @@ class SOQLBuilder:
SELECT {column_or_expression} FROM {object_name}
WHERE {datetime_column} > {last_fetch_datetime_from}
AND {datetime_column} <= {last_fetch_datetime_to}
{limit_clause}
""")
self.__target_object = target_object
self.__last_fetch_datetime = last_fetch_datetime
@ -19,7 +21,8 @@ class SOQLBuilder:
object_name=self.__target_object.object_name,
last_fetch_datetime_from=self.__last_fetch_datetime.last_fetch_datetime_from,
last_fetch_datetime_to=self.__last_fetch_datetime.last_fetch_datetime_to,
datetime_column=self.__target_object.datetime_column
datetime_column=self.__target_object.datetime_column,
limit_clause=''
)
return count_soql
@ -31,7 +34,8 @@ class SOQLBuilder:
object_name=self.__target_object.object_name,
last_fetch_datetime_from=self.__last_fetch_datetime.last_fetch_datetime_from,
last_fetch_datetime_to=self.__last_fetch_datetime.last_fetch_datetime_to,
datetime_column=self.__target_object.datetime_column
datetime_column=self.__target_object.datetime_column,
limit_clause=FETCH_LIMIT_CLAUSE
)
return fetch_soql

View File

@ -7,14 +7,14 @@ import src.system_var.constants as constants
# Log level; falls back to constants.LOG_LEVEL_INFO when the variable is not set
LOG_LEVEL = os.environ.get(constants.LOG_LEVEL, constants.LOG_LEVEL_INFO)
# Timeout in seconds for the CRM authentication process
CRM_AUTH_TIMEOUT = int(os.environ.get(constants.CRM_AUTH_TIMEOUT, 100))
# Maximum number of retry attempts for the CRM authentication process
CRM_AUTH_MAX_RETRY_ATTEMPT = int(os.environ.get(constants.CRM_AUTH_MAX_RETRY_ATTEMPT, 3))
# Initial wait in seconds when retrying the CRM authentication process
CRM_AUTH_RETRY_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_INTERVAL, 5))
# Minimum wait in seconds when retrying the CRM authentication process
CRM_AUTH_RETRY_MIN_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_MIN_INTERVAL, 5))
# Maximum wait in seconds when retrying the CRM authentication process
CRM_AUTH_RETRY_MAX_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_MAX_INTERVAL, 50))
# # Maximum number of retry attempts for the CRM authentication process
# CRM_AUTH_MAX_RETRY_ATTEMPT = int(os.environ.get(constants.CRM_AUTH_MAX_RETRY_ATTEMPT, 3))
# # Initial wait in seconds when retrying the CRM authentication process
# CRM_AUTH_RETRY_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_INTERVAL, 5))
# # Minimum wait in seconds when retrying the CRM authentication process
# CRM_AUTH_RETRY_MIN_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_MIN_INTERVAL, 5))
# # Maximum wait in seconds when retrying the CRM authentication process
# CRM_AUTH_RETRY_MAX_INTERVAL = int(os.environ.get(constants.CRM_AUTH_RETRY_MAX_INTERVAL, 50))
# Timeout in seconds for the CRM record-count retrieval process
CRM_GET_RECORD_COUNT_TIMEOUT = int(os.environ.get(constants.CRM_GET_RECORD_COUNT_TIMEOUT, 300))
# Maximum number of retry attempts for the CRM record-count retrieval process
@ -65,9 +65,11 @@ PROCESS_RESULT_FILENAME = os.environ.get(constants.PROCESS_RESULT_FILENAME, 'pro
LAST_FETCH_DATE_FOLDER = os.environ.get(constants.LAST_FETCH_DATE_FOLDER, 'crm/last_fetch_datetime')
# Folder that stores the data fetched from CRM and converted for import
CRM_IMPORT_DATA_FOLDER = os.environ.get(constants.CRM_IMPORT_DATA_FOLDER, 'crm/target')
# Folder path that stores backups of the CRM last-fetch-datetime file
LAST_FETCH_DATE_BACKUP_FOLDER = os.environ.get(constants.LAST_FETCH_DATE_BACKUP_FOLDER, 'last_fetch_datetime')
# # Folder path that stores backups of the CRM last-fetch-datetime file
# LAST_FETCH_DATE_BACKUP_FOLDER = os.environ.get(constants.LAST_FETCH_DATE_BACKUP_FOLDER, 'last_fetch_datetime')
# Folder path that stores backups of the raw data fetched from CRM
RESPONSE_JSON_BACKUP_FOLDER = os.environ.get(constants.RESPONSE_JSON_BACKUP_FOLDER, 'response_json')
# Folder that stores backups of the data fetched from CRM and converted for import
CRM_IMPORT_DATA_BACKUP_FOLDER = os.environ.get(constants.CRM_IMPORT_DATA_BACKUP_FOLDER, 'data_import')
# For the walk-through test: SOQL clause used to limit the number of fetched records
FETCH_LIMIT_CLAUSE = os.environ.get('FETCH_LIMIT_CLAUSE', '')

View File

@ -1,7 +1,92 @@
import json
import os
import os.path as path
import pytest
from src.controller import controller
# Base directory from which the local copies of the config JSON files are located.
# NOTE(review): path.dirname(__name__) is applied to the module name, not to a file
# path — `__file__` may have been intended; confirm before changing, since the
# relative path built in get_object_config_list depends on the current value.
ROOT_DIR = path.abspath(path.dirname(__name__))
# Names of the buckets created by the s3_test fixture
DATA_BUCKET = 'mbj-newdwh2021-staging-data'
CONFIG_BUCKET = 'mbj-newdwh2021-staging-config'
BACKUP_BUCKET = 'mbj-newdwh2021-staging-backup-crm'
# Folder names (key prefixes) used when uploading config files and when
# patching the settings of the code under test
TARGET_FOLDER = 'crm/target'
OBJECT_INFO_FOLDER = 'crm/object_info'
LAST_FETCH_DATETIME_INFO_FOLDER = 'crm/last_fetch_datetime'
BACKUP_DATA_IMPORT_FOLDER = 'data_import'
BACKUP_RESPONSE_JSON_FOLDER = 'response_json'
@pytest.fixture
def s3_test(s3_client):
    """Create the data, config and backup buckets, then hand control to the test."""
    for bucket_name in (DATA_BUCKET, CONFIG_BUCKET, BACKUP_BUCKET):
        s3_client.create_bucket(Bucket=bucket_name)
    yield
# Walk-through test: uploads the local config JSON files to the config bucket,
# patches module-level settings of the code under test, runs controller() and
# prints the captured log messages.
@pytest.mark.walk_through
# NOTE(review): the next two lines are two versions of the same signature; this
# looks like the removed and added sides of a diff shown together — confirm
# that only the second one belongs in the file.
def test_walk_through():
def test_walk_through(s3_test, s3_client, monkeypatch, caplog):
# Arrange
# Upload the config files into the bucket one by one
object_info_list = get_object_config_list('object_info')
for object_info in object_info_list:
json_file = read_json(object_info)
# Drop selected User columns before uploading (see to_upload_json)
json_file = to_upload_json(json_file)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{OBJECT_INFO_FOLDER}/{path.basename(object_info)}')
last_fetch_datetime_list = get_object_config_list('last_fetch_datetime')
for last_fetch_datetime in last_fetch_datetime_list:
json_file = read_json(last_fetch_datetime)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{LAST_FETCH_DATETIME_INFO_FOLDER}/{path.basename(last_fetch_datetime)}')
# Set the environment-derived settings (the CRM credentials must be configured separately)
monkeypatch.setattr('src.aws.s3.IMPORT_DATA_BUCKET', DATA_BUCKET)
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_FOLDER', TARGET_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_CONFIG_BUCKET', CONFIG_BUCKET)
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FOLDER', OBJECT_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FILENAME', 'crm_object_list_diff.json')
monkeypatch.setattr('src.aws.s3.LAST_FETCH_DATE_FOLDER', LAST_FETCH_DATETIME_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_BACKUP_BUCKET', BACKUP_BUCKET)
monkeypatch.setattr('src.upload_result_data_process.PROCESS_RESULT_FILENAME', 'process_result')
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_BACKUP_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.PROCESS_RESULT_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.RESPONSE_JSON_BACKUP_FOLDER', BACKUP_RESPONSE_JSON_FOLDER)
monkeypatch.setattr('src.salesforce.soql_builder.FETCH_LIMIT_CLAUSE', ' LIMIT 10')
# The record count is only used for log output, so make it return 0
monkeypatch.setattr('src.fetch_crm_data_process.fetch_record_count_retry', lambda x, y, z: 0)
# Act
controller()
print(caplog.messages)
# NOTE(review): unconditional failure — presumably a temporary way to make pytest
# show the printed log messages; replace with real assertions once defined.
assert 0
pass
# Assertion
def get_object_config_list(folder_name: str):
    """Return the paths of the ``.json`` files in a local config folder.

    The folder searched is ``<ROOT_DIR>/../../s3/config/crm/<folder_name>``.

    Args:
        folder_name: Name of the sub-folder under ``s3/config/crm``.

    Returns:
        A list with one path (folder joined with the file name) per entry
        whose name ends with ``.json``, in the order ``os.listdir`` yields them.
    """
    config_dir = path.join(ROOT_DIR, '..', '..', 's3', 'config', 'crm', folder_name)
    json_paths = []
    for entry in os.listdir(config_dir):
        if entry.endswith('.json'):
            json_paths.append(path.join(config_dir, entry))
    return json_paths
def read_json(json_path):
    """Parse a UTF-8 encoded JSON file and return the resulting object.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The deserialized content of the file.
    """
    with open(json_path, mode='r', encoding='utf8') as source:
        parsed = json.loads(source.read())
    return parsed
def upload_json(json_file, s3_client, bucket, folder):
    """Serialize an object to a JSON string and store it with ``put_object``.

    Args:
        json_file: JSON-serializable object to upload.
        s3_client: Client object exposing ``put_object(Bucket=, Key=, Body=)``.
        bucket: Name of the destination bucket.
        folder: Value passed as the object ``Key``.
    """
    # Serialize first so a non-serializable payload fails before the client is touched.
    request = {'Bucket': bucket, 'Key': folder, 'Body': json.dumps(json_file)}
    s3_client.put_object(**request)
def to_upload_json(json_file):
    """Strip selected columns from the ``User`` object definition, in place.

    For every entry of ``json_file['objects']`` whose ``object_name`` is
    ``'User'``, the columns ``LastPasswordChangeDate``,
    ``NumberOfFailedLogins`` and ``UserPreferencesNativeEmailClient`` are
    removed from its ``columns`` list. Entries for other objects are left
    untouched.

    Unlike ``list.remove``, a column that is not present is simply skipped
    instead of raising ``ValueError``, and every occurrence of an excluded
    column is dropped.

    Args:
        json_file: Parsed object-info JSON; a dict with an ``objects`` list
            whose items have ``object_name`` and ``columns`` keys.

    Returns:
        The same ``json_file`` object that was passed in, after modification.
    """
    excluded_columns = {
        'LastPasswordChangeDate',
        'NumberOfFailedLogins',
        'UserPreferencesNativeEmailClient',
    }
    for object_info in json_file['objects']:
        if object_info['object_name'] != 'User':
            continue
        columns: list = object_info['columns']
        # Slice assignment keeps the existing list object, so any other
        # reference to it sees the filtered content as before.
        columns[:] = [column for column in columns if column not in excluded_columns]
    return json_file