# newdwh2021/ecs/crm-datafetch/tests/config/test_objects_fetch_target_objects.py
# 2022-08-16 15:24:04 +09:00

# 468 lines
# 17 KiB
# Python

import pytest
from src.config.objects import FetchTargetObjects
from src.parser.json_parser import JsonParser
class TestFetchTargetObjects:
    """Tests for constructing and iterating ``FetchTargetObjects``."""

    @staticmethod
    def _target_objects() -> list:
        """Return a fresh list of the three object definitions used as fixture data.

        A new list (with new nested dicts/lists) is built on every call so
        that one test cannot leak mutations into another.
        """
        return [
            {
                "object_name": "AccountShare",
                "columns": [
                    "Id",
                    "AccountId",
                    "UserOrGroupId",
                    "AccountAccessLevel",
                    "OpportunityAccessLevel",
                    "CaseAccessLevel",
                    "ContactAccessLevel",
                    "RowCause",
                    "LastModifiedDate",
                    "LastModifiedById",
                    "IsDeleted"
                ],
                "is_skip": False,
                "is_update_last_fetch_datetime": False,
                "datetime_column": "LastModifiedDate"
            },
            {
                "object_name": "Contact",
                "columns": [
                    "Id",
                    "IsDeleted",
                    "MasterRecordId",
                    "AccountId",
                    "IsPersonAccount",
                    "LastName",
                    "FirstName",
                    "Salutation",
                    "Name",
                    "OtherStreet",
                    "OtherCity",
                    "OtherState",
                    "OtherPostalCode",
                    "OtherCountry",
                    "OtherLatitude",
                    "OtherLongitude",
                    "OtherGeocodeAccuracy",
                    "OtherAddress",
                    "MailingStreet",
                    "MailingCity",
                    "MailingState",
                    "MailingPostalCode",
                    "MailingCountry",
                    "MailingLatitude",
                    "MailingLongitude",
                    "MailingGeocodeAccuracy",
                    "MailingAddress",
                    "Phone",
                    "Fax",
                    "MobilePhone",
                    "HomePhone",
                    "OtherPhone",
                    "AssistantPhone",
                    "ReportsToId",
                    "Email",
                    "Title",
                    "Department",
                    "AssistantName",
                    "Birthdate",
                    "Description",
                    "OwnerId",
                    "HasOptedOutOfEmail",
                    "HasOptedOutOfFax",
                    "DoNotCall",
                    "CreatedDate",
                    "CreatedById",
                    "LastModifiedDate",
                    "LastModifiedById",
                    "SystemModstamp",
                    "LastActivityDate",
                    "LastCURequestDate",
                    "LastCUUpdateDate",
                    "MayEdit",
                    "IsLocked",
                    "LastViewedDate",
                    "LastReferencedDate",
                    "EmailBouncedReason",
                    "EmailBouncedDate",
                    "IsEmailBounced",
                    "PhotoUrl",
                    "Jigsaw",
                    "JigsawContactId",
                    "IndividualId",
                    "Mobile_ID_vod__c",
                    "H1Insights__H1_NPI_Value_for_Testing__c",
                    "H1Insights__H1_Person_ID__c",
                    "H1Insights__H1_Request_Status__c",
                    "H1Insights__H1_URL__c",
                    "H1Insights__NPI_Number__c",
                    "H1Insights__NPI_Number_for_H1_Insights__c",
                    "MSJ_Marketing_Cloud_Integration__c"
                ],
                "is_skip": False,
                "is_update_last_fetch_datetime": True
            },
            {
                "object_name": "Territory2",
                "columns": [
                    "Id",
                    "Name",
                    "Territory2TypeId",
                    "Territory2ModelId",
                    "ParentTerritory2Id",
                    "Description",
                    "ForecastUserId",
                    "AccountAccessLevel",
                    "OpportunityAccessLevel",
                    "CaseAccessLevel",
                    "ContactAccessLevel",
                    "LastModifiedDate",
                    "LastModifiedById",
                    "SystemModstamp",
                    "DeveloperName",
                    "MSJ_Territory_Type__c",
                    "MSJ_Level__c"
                ],
                "is_skip": False,
                "is_update_last_fetch_datetime": True
            }
        ]

    def test_constructor(self) -> None:
        """
        Cases:
            Instance creation test.
            Checks that the dictionary data contains the key and that the
            key's value has the correct type.
        Arranges:
            - Prepare the object information string
            - Parse the object information into a dictionary
        Expects:
            - No exception is raised
        """
        # Arranges
        # The JSON text is kept flush-left so the string handed to JsonParser
        # carries no extra leading whitespace.
        fetch_objects = '''{
"objects": [
{
"object_name": "AccountShare",
"columns": [
"Id",
"AccountId",
"UserOrGroupId",
"AccountAccessLevel",
"OpportunityAccessLevel",
"CaseAccessLevel",
"ContactAccessLevel",
"RowCause",
"LastModifiedDate",
"LastModifiedById",
"IsDeleted"
],
"is_skip": false,
"is_update_last_fetch_datetime": true,
"datetime_column": "LastModifiedDate"
},
{
"object_name": "Contact",
"columns": [
"Id",
"IsDeleted",
"MasterRecordId",
"AccountId",
"IsPersonAccount",
"LastName",
"FirstName",
"Salutation",
"Name",
"OtherStreet",
"OtherCity",
"OtherState",
"OtherPostalCode",
"OtherCountry",
"OtherLatitude",
"OtherLongitude",
"OtherGeocodeAccuracy",
"OtherAddress",
"MailingStreet",
"MailingCity",
"MailingState",
"MailingPostalCode",
"MailingCountry",
"MailingLatitude",
"MailingLongitude",
"MailingGeocodeAccuracy",
"MailingAddress",
"Phone",
"Fax",
"MobilePhone",
"HomePhone",
"OtherPhone",
"AssistantPhone",
"ReportsToId",
"Email",
"Title",
"Department",
"AssistantName",
"Birthdate",
"Description",
"OwnerId",
"HasOptedOutOfEmail",
"HasOptedOutOfFax",
"DoNotCall",
"CreatedDate",
"CreatedById",
"LastModifiedDate",
"LastModifiedById",
"SystemModstamp",
"LastActivityDate",
"LastCURequestDate",
"LastCUUpdateDate",
"MayEdit",
"IsLocked",
"LastViewedDate",
"LastReferencedDate",
"EmailBouncedReason",
"EmailBouncedDate",
"IsEmailBounced",
"PhotoUrl",
"Jigsaw",
"JigsawContactId",
"IndividualId",
"Mobile_ID_vod__c",
"H1Insights__H1_NPI_Value_for_Testing__c",
"H1Insights__H1_Person_ID__c",
"H1Insights__H1_Request_Status__c",
"H1Insights__H1_URL__c",
"H1Insights__NPI_Number__c",
"H1Insights__NPI_Number_for_H1_Insights__c",
"MSJ_Marketing_Cloud_Integration__c"
],
"is_skip": false,
"is_update_last_fetch_datetime": true
},
{
"object_name": "Territory2",
"columns": [
"Id",
"Name",
"Territory2TypeId",
"Territory2ModelId",
"ParentTerritory2Id",
"Description",
"ForecastUserId",
"AccountAccessLevel",
"OpportunityAccessLevel",
"CaseAccessLevel",
"ContactAccessLevel",
"LastModifiedDate",
"LastModifiedById",
"SystemModstamp",
"DeveloperName",
"MSJ_Territory_Type__c",
"MSJ_Level__c"
],
"is_skip": false,
"is_update_last_fetch_datetime": true
}
]
}'''
        json_parser = JsonParser(fetch_objects)
        fetch_objects_dict = json_parser.parse()

        # Act
        FetchTargetObjects(fetch_objects_dict)

        # Expects
        # Reaching this point without an exception is the success condition.

    def test_raise_constructor_no_key(self) -> None:
        """
        Cases:
            Instance creation test.
            An exception is raised when the dictionary data lacks the
            required key.
        Arranges:
            - Prepare the object information
        Expects:
            - An exception is raised and its message matches the expected value
        """
        # Arranges
        # The root key is "test_objects" rather than "objects" on purpose.
        fetch_objects_dict = {"test_objects": self._target_objects()}

        # Act
        with pytest.raises(Exception) as e:
            FetchTargetObjects(fetch_objects_dict)

        # Expects
        assert str(e.value) == '「objects」キーは必須です'

    def test_raise_constructor_no_type(self) -> None:
        """
        Cases:
            Instance creation test.
            An exception is raised when the value of the target key in the
            dictionary data has an unexpected type.
        Arranges:
            - Prepare the object information
        Expects:
            - An exception is raised and its message matches the expected value
        """
        # Arranges
        # "objects" holds a string here instead of a list.
        fetch_objects_dict = {
            "objects": "test_value"
        }

        # Act
        with pytest.raises(Exception) as e:
            FetchTargetObjects(fetch_objects_dict)

        # Expects
        assert str(e.value) == '「objects」キーの値は「<class \'list\'>」でなければなりません'

    def test_constructor_iterator(self) -> None:
        """
        Cases:
            Instance creation test.
            Every registered object can be retrieved by iterating.
        Arranges:
            - Prepare the object information
        Expects:
            - The loop runs through to the last element
        """
        # Arranges
        fetch_objects_dict = {"objects": self._target_objects()}

        # Act
        sut = FetchTargetObjects(fetch_objects_dict)
        # Start from zero so that an empty iteration fails the count assertion
        # below instead of raising NameError on an unbound loop variable.
        fetched_count = 0
        for fetched_count, item in enumerate(sut, 1):
            assert item is not None

        # Expects
        assert fetched_count == 3