newdwh2021/ecs/crm-datafetch/tests/test_walk_through.py
2022-08-18 16:57:37 +09:00

93 lines
3.7 KiB
Python

import json
import os
import os.path as path
import pytest
from src.controller import controller
ROOT_DIR = path.abspath(path.dirname(__name__))
DATA_BUCKET = 'mbj-newdwh2021-staging-data'
CONFIG_BUCKET = 'mbj-newdwh2021-staging-config'
BACKUP_BUCKET = 'mbj-newdwh2021-staging-backup-crm'
TARGET_FOLDER = 'crm/target'
OBJECT_INFO_FOLDER = 'crm/object_info'
LAST_FETCH_DATETIME_INFO_FOLDER = 'crm/last_fetch_datetime'
BACKUP_DATA_IMPORT_FOLDER = 'data_import'
BACKUP_RESPONSE_JSON_FOLDER = 'response_json'
@pytest.fixture
def s3_test(s3_client):
s3_client.create_bucket(Bucket=DATA_BUCKET)
s3_client.create_bucket(Bucket=CONFIG_BUCKET)
s3_client.create_bucket(Bucket=BACKUP_BUCKET)
yield
@pytest.mark.walk_through
def test_walk_through(s3_test, s3_client, monkeypatch, caplog):
# Arrange
# バケットにファイルをアップロードしていく
object_info_list = get_object_config_list('object_info')
for object_info in object_info_list:
json_file = read_json(object_info)
json_file = to_upload_json(json_file)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{OBJECT_INFO_FOLDER}/{path.basename(object_info)}')
last_fetch_datetime_list = get_object_config_list('last_fetch_datetime')
for last_fetch_datetime in last_fetch_datetime_list:
json_file = read_json(last_fetch_datetime)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{LAST_FETCH_DATETIME_INFO_FOLDER}/{path.basename(last_fetch_datetime)}')
# 環境変数を設定(CRMの認証情報は別途設定しておくこと)
monkeypatch.setattr('src.aws.s3.IMPORT_DATA_BUCKET', DATA_BUCKET)
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_FOLDER', TARGET_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_CONFIG_BUCKET', CONFIG_BUCKET)
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FOLDER', OBJECT_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FILENAME', 'crm_object_list_diff.json')
monkeypatch.setattr('src.aws.s3.LAST_FETCH_DATE_FOLDER', LAST_FETCH_DATETIME_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_BACKUP_BUCKET', BACKUP_BUCKET)
monkeypatch.setattr('src.upload_result_data_process.PROCESS_RESULT_FILENAME', 'process_result')
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_BACKUP_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.PROCESS_RESULT_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.RESPONSE_JSON_BACKUP_FOLDER', BACKUP_RESPONSE_JSON_FOLDER)
monkeypatch.setattr('src.salesforce.soql_builder.FETCH_LIMIT_CLAUSE', ' LIMIT 10')
# 件数取得はログ出力用なので、0件が返るようにする
monkeypatch.setattr('src.fetch_crm_data_process.fetch_record_count_retry', lambda x, y, z: 0)
# Act
controller()
print(caplog.messages)
assert 0
# Assertion
def get_object_config_list(folder_name: str):
local_s3_path = path.join(ROOT_DIR, '..', '..', 's3', 'config', 'crm', folder_name)
config_list = [os.path.join(local_s3_path, config) for config in os.listdir(local_s3_path) if config.endswith('.json')]
return config_list
def read_json(json_path):
with open(json_path, 'r', encoding='utf8') as f:
json_file = json.load(f)
return json_file
def upload_json(json_file, s3_client, bucket, folder):
json_str = json.dumps(json_file)
s3_client.put_object(Bucket=bucket, Key=folder, Body=json_str)
def to_upload_json(json_file):
for object_info in json_file['objects']:
if object_info['object_name'] != 'User':
continue
columns: list = object_info['columns']
columns.remove('LastPasswordChangeDate')
columns.remove('NumberOfFailedLogins')
columns.remove('UserPreferencesNativeEmailClient')
return json_file