newdwh2021/ecs/crm-datafetch/tests/test_walk_through.py
2022-08-19 12:00:26 +09:00

229 lines
12 KiB
Python

import json
import os
import os.path as path
from datetime import datetime, timezone
import pytest
from src.controller import controller
from src.parser.json_parser import JsonParser
from src.system_var.constants import YYYYMMDDTHHMMSSTZ
from src.util.execute_datetime import ExecuteDateTime
ROOT_DIR = path.abspath(path.dirname(__name__))
DATA_BUCKET = 'mbj-newdwh2021-staging-data'
CONFIG_BUCKET = 'mbj-newdwh2021-staging-config'
BACKUP_BUCKET = 'mbj-newdwh2021-staging-backup-crm'
TARGET_FOLDER = 'crm/target'
OBJECT_INFO_FOLDER = 'crm/object_info'
LAST_FETCH_DATETIME_INFO_FOLDER = 'crm/last_fetch_datetime'
BACKUP_DATA_IMPORT_FOLDER = 'data_import'
BACKUP_RESPONSE_JSON_FOLDER = 'response_json'
@pytest.fixture
def s3_test(s3_client):
s3_client.create_bucket(Bucket=DATA_BUCKET)
s3_client.create_bucket(Bucket=CONFIG_BUCKET)
s3_client.create_bucket(Bucket=BACKUP_BUCKET)
yield
@pytest.mark.walk_through
def test_walk_through(s3_test, s3_client, monkeypatch, caplog):
# Arrange
# バケットにファイルをアップロードしていく
object_info_files = []
object_info_list = get_object_config_list('object_info')
for object_info in object_info_list:
json_file = read_json(object_info)
json_file = to_upload_json(json_file)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{OBJECT_INFO_FOLDER}/{path.basename(object_info)}')
object_info_files.append(json_file)
last_fetch_datetime_list = get_object_config_list('last_fetch_datetime')
for last_fetch_datetime in last_fetch_datetime_list:
json_file = read_json(last_fetch_datetime)
upload_json(json_file, s3_client, CONFIG_BUCKET, f'{LAST_FETCH_DATETIME_INFO_FOLDER}/{path.basename(last_fetch_datetime)}')
# 環境変数を設定(CRMの認証情報は別途設定しておくこと)
set_environment(monkeypatch)
# 差分取得用に環境変数を設定
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FILENAME', 'crm_object_list_diff.json')
# 一気通貫テスト用に件数の制限をかける
monkeypatch.setattr('src.salesforce.soql_builder.FETCH_LIMIT_CLAUSE', ' LIMIT 10')
# 件数取得はログ出力用なので、0件が返るようにする
monkeypatch.setattr('src.fetch_crm_data_process.fetch_record_count_retry', lambda x, y, z: 10)
# 実行日時を固定する
now = datetime.now(timezone.utc).strftime(YYYYMMDDTHHMMSSTZ)
format_now = datetime.strptime(now, YYYYMMDDTHHMMSSTZ).strftime('%Y%m%d%H%M%S')
class MockExecuteDateTime(ExecuteDateTime):
def __init__(self):
super().__init__()
self._ExecuteDateTime__execute_datetime = now
monkeypatch.setattr('src.prepare_data_fetch_process.ExecuteDateTime', MockExecuteDateTime)
# Act
controller()
# Assertion
log_messages = caplog.messages
# ループ前のログ確認
assert 'I-CTRL-01 CRMデータ取得処理を開始します' in log_messages
assert 'I-CTRL-02 データ取得準備処理呼び出し' in log_messages
assert_prepare_process_log(log_messages, now)
assert 'I-CTRL-03 取得対象オブジェクトのループ処理開始' in log_messages
# オブジェクト情報を取得する(diff)
object_info_list = object_info_files[0]
for object_info in object_info_list['objects']:
target_object_name = object_info['object_name']
upload_file_name = f'CRM_{target_object_name}_{format_now}'
assert 'I-CTRL-05 オブジェクト情報形式チェック処理呼び出し' in log_messages
assert_check_process_log(log_messages)
assert f'I-CTRL-06 [{target_object_name}]のデータ取得を開始します' in log_messages
assert f'I-CTRL-08 [{target_object_name}]のデータ取得期間設定処理呼び出し' in log_messages
assert_period_process_log(log_messages, target_object_name, target_object_name, now)
assert f'I-CTRL-09 [{target_object_name}]のデータ取得処理呼び出し' in log_messages
assert_fetch_process_log(log_messages, target_object_name)
assert f'I-CTRL-10 [{target_object_name}] の出力ファイル名は [{upload_file_name}] となります' in log_messages
assert f'I-CTRL-11 [{target_object_name}] CRM電文データバックアップ処理呼び出し' in log_messages
assert_backup_response_process_log(log_messages, target_object_name)
assert f'I-CTRL-12 [{target_object_name}] CSV変換処理呼び出し' in log_messages
assert_convert_process_log(log_messages, target_object_name)
assert f'I-CTRL-13 [{target_object_name}] CSVデータバックアップ処理呼び出し' in log_messages
assert_backup_csv_process_log(log_messages, target_object_name, upload_file_name)
assert f'I-CTRL-14 [{target_object_name}] CSVデータアップロード処理呼び出し' in log_messages
assert_upload_csv_process_log(log_messages, target_object_name, upload_file_name)
assert f'I-CTRL-15 [{target_object_name}] 前回取得日時ファイル更新処理呼び出し' in log_messages
assert_upload_fetch_datetime_process_log(log_messages, target_object_name, False)
assert f'I-CTRL-16 [{target_object_name}] 処理正常終了' in log_messages
# ループ終了後のログの確認
process_result_json = {obj['object_name']: 'success' for obj in object_info_list['objects']}
assert f'I-CTRL-17 すべてのオブジェクトの処理が終了しました 実行結果:[{process_result_json}]' in log_messages
assert f'I-CTRL-18 CRM_取得処理実施結果ファイルアップロード処理開始' in log_messages
assert f'I-CTRL-19 すべてのデータの取得に成功しました' in log_messages
assert f'I-CTRL-20 CRMデータ取得処理を終了します' in log_messages
assert 0
"""
以下、アサーション関数
"""
def assert_prepare_process_log(log_messages, now):
assert 'I-PRE-01 データ取得準備処理を開始します' in log_messages
assert f'I-PRE-02 データ取得処理開始日時:{now}' in log_messages
assert 'I-PRE-09 データ取得準備処理を終了します' in log_messages
def assert_check_process_log(log_messages):
assert 'I-CHK-01 オブジェクト情報形式チェック処理を開始します' in log_messages
assert 'I-CHK-02 オブジェクト情報形式チェック処理を終了します' in log_messages
def assert_period_process_log(log_messages, target_object_name, datetime_file_name, now):
assert f'I-DATE-01 [{target_object_name}] のデータ取得期間設定処理を開始します' in log_messages
assert f'I-DATE-02 前回取得日時ファイルの取得開始します ファイルパス:[s3://{CONFIG_BUCKET}/{LAST_FETCH_DATETIME_INFO_FOLDER}/{datetime_file_name}.json]' in log_messages
assert 'I-DATE-03 前回取得日時ファイルの取得成功しました' in log_messages
assert f'I-DATE-06 取得範囲 From: [1900-01-01T00:00:00.000Z] To: [{now}]' in log_messages
assert f'I-DATE-07 [{target_object_name}] のデータ取得期間設定処理を終了します' in log_messages
def assert_fetch_process_log(log_messages, target_object_name):
assert f'I-FETCH-01 [{target_object_name}] のCRMからのデータ取得処理を開始します' in log_messages
assert f'I-FETCH-02 [{target_object_name}] の件数取得を開始します' in log_messages
assert f'I-FETCH-03 [{target_object_name}] の件数:[10]' in log_messages
assert f'I-FETCH-04 [{target_object_name}] のレコード取得を開始します' in log_messages
assert f'I-FETCH-05 [{target_object_name}] のレコード取得が成功しました' in log_messages
assert f'I-FETCH-06 [{target_object_name}] のCRMからのデータ取得処理を終了します' in log_messages
def assert_backup_response_process_log(log_messages, target_object_name):
assert f'I-RESBK-01 [{target_object_name}] のCRM電文データバックアップ処理を開始します' in log_messages
assert f'I-RESBK-03 [{target_object_name}] のCRM電文データバックアップ処理を終了します' in log_messages
def assert_convert_process_log(log_messages, target_object_name):
assert f'I-CONV-01 [{target_object_name}] のCSV変換処理を開始します' in log_messages
assert f'I-CONV-03 [{target_object_name}] のCSV変換処理を終了します' in log_messages
def assert_backup_csv_process_log(log_messages, target_object_name, upload_file_name):
assert f'I-CSVBK-01 [{target_object_name}] のCSVデータのバックアップ処理を開始します ファイル名:[{upload_file_name}.csv]' in log_messages
assert f'I-CSVBK-03 [{target_object_name}] のCSVデータのバックアップ処理を終了します' in log_messages
def assert_upload_csv_process_log(log_messages, target_object_name, upload_file_name):
assert f'I-UPLD-01 [{target_object_name}] のCSVデータアップロード処理を開始します ファイル名:[{upload_file_name}.csv]' in log_messages
assert f'I-UPLD-03 [{target_object_name}] のCSVデータのアップロード処理を終了します' in log_messages
def assert_upload_fetch_datetime_process_log(log_messages, target_object_name, is_all=False):
assert f'I-UPD-01 [{target_object_name}] の前回取得日時ファイルの更新処理を開始します' in log_messages
if is_all:
assert f'I-UPD-02 [{target_object_name}] の前回取得日時ファイルの更新処理をスキップします' in log_messages
assert f'I-UPD-04 [{target_object_name}] の前回取得日時ファイルの更新処理を終了します' not in log_messages
else:
assert f'I-UPD-02 [{target_object_name}] の前回取得日時ファイルの更新処理をスキップします' not in log_messages
assert f'I-UPD-04 [{target_object_name}] の前回取得日時ファイルの更新処理を終了します' in log_messages
"""
以下、取得準備関数
"""
def get_object_config_list(folder_name: str):
local_s3_path = path.join(ROOT_DIR, '..', '..', 's3', 'config', 'crm', folder_name)
config_list = [os.path.join(local_s3_path, config) for config in os.listdir(local_s3_path) if config.endswith('.json')]
return config_list
def read_json(json_path):
with open(json_path, 'r', encoding='utf8') as f:
json_str = f.read()
JsonParser
json_file = JsonParser(json_str).parse()
return json_file
def upload_json(json_file, s3_client, bucket, folder):
json_str = json.dumps(json_file)
s3_client.put_object(Bucket=bucket, Key=folder, Body=json_str)
def to_upload_json(json_file):
"""Userオブジェクトの取得できないプロパティを取り除く
TODO: Userオブジェクトの恒久対応が確定したらこのメソッドは消す
"""
for object_info in json_file['objects']:
if object_info['object_name'] != 'User':
continue
columns: list = object_info['columns']
columns.remove('LastPasswordChangeDate')
columns.remove('NumberOfFailedLogins')
columns.remove('UserPreferencesNativeEmailClient')
return json_file
def set_environment(monkeypatch):
# 環境変数を設定(CRMの認証情報は別途設定しておくこと)
monkeypatch.setattr('src.aws.s3.IMPORT_DATA_BUCKET', DATA_BUCKET)
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_FOLDER', TARGET_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_CONFIG_BUCKET', CONFIG_BUCKET)
monkeypatch.setattr('src.aws.s3.OBJECT_INFO_FOLDER', OBJECT_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.LAST_FETCH_DATE_FOLDER', LAST_FETCH_DATETIME_INFO_FOLDER)
monkeypatch.setattr('src.aws.s3.CRM_BACKUP_BUCKET', BACKUP_BUCKET)
monkeypatch.setattr('src.upload_result_data_process.PROCESS_RESULT_FILENAME', 'process_result')
monkeypatch.setattr('src.copy_crm_csv_data_process.CRM_IMPORT_DATA_BACKUP_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.set_datetime_period_process.CRM_CONFIG_BUCKET', CONFIG_BUCKET)
monkeypatch.setattr('src.aws.s3.CRM_IMPORT_DATA_BACKUP_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.PROCESS_RESULT_FOLDER', BACKUP_DATA_IMPORT_FOLDER)
monkeypatch.setattr('src.aws.s3.RESPONSE_JSON_BACKUP_FOLDER', BACKUP_RESPONSE_JSON_FOLDER)