86 lines
2.7 KiB
Python
86 lines
2.7 KiB
Python
import boto3
|
|
import pytest
|
|
|
|
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
|
|
from src.system_var.environment import BATCH_MANAGE_DYNAMODB_TABLE_NAME
|
|
|
|
UNITTEST_EXECUTION_ID = 'unittest'
|
|
|
|
|
|
class TestJskultBatchRunManager:
|
|
@pytest.fixture(scope='function', autouse=True)
|
|
def dynamodb_client(self):
|
|
dynamodb = boto3.client('dynamodb')
|
|
|
|
yield dynamodb
|
|
|
|
dynamodb.delete_item(
|
|
TableName=BATCH_MANAGE_DYNAMODB_TABLE_NAME,
|
|
Key={
|
|
'execution_id': {'S': UNITTEST_EXECUTION_ID}
|
|
}
|
|
)
|
|
|
|
def test_batch_success(self, dynamodb_client):
|
|
"""バッチ実行管理テーブルに成功のステータスが書き込まれること
|
|
|
|
Args:
|
|
dynamodb_client (boto3.Client): DynamoDB クライアント
|
|
"""
|
|
# Arrange
|
|
# Act
|
|
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
|
|
sut.batch_success()
|
|
|
|
# Assert
|
|
options = {
|
|
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
|
|
'Key': {
|
|
'execution_id': {'S': UNITTEST_EXECUTION_ID},
|
|
},
|
|
}
|
|
actual = dynamodb_client.get_item(**options)
|
|
assert actual['Item']['batch_run_status']['S'] == 'success'
|
|
|
|
def test_batch_failed(self, dynamodb_client):
|
|
"""バッチ実行管理テーブルに失敗のステータスが書き込まれること
|
|
|
|
Args:
|
|
dynamodb_client (boto3.Client): DynamoDB クライアント
|
|
"""
|
|
# Arrange
|
|
# Act
|
|
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
|
|
sut.batch_failed()
|
|
|
|
# Assert
|
|
options = {
|
|
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
|
|
'Key': {
|
|
'execution_id': {'S': UNITTEST_EXECUTION_ID},
|
|
},
|
|
}
|
|
actual = dynamodb_client.get_item(**options)
|
|
assert actual['Item']['batch_run_status']['S'] == 'failed'
|
|
|
|
def test_batch_retry(self, dynamodb_client):
|
|
"""バッチ実行管理テーブルにリトライのステータスが書き込まれること
|
|
|
|
Args:
|
|
dynamodb_client (boto3.Client): DynamoDB クライアント
|
|
"""
|
|
# Arrange
|
|
# Act
|
|
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
|
|
sut.batch_retry()
|
|
|
|
# Assert
|
|
options = {
|
|
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
|
|
'Key': {
|
|
'execution_id': {'S': UNITTEST_EXECUTION_ID},
|
|
},
|
|
}
|
|
actual = dynamodb_client.get_item(**options)
|
|
assert actual['Item']['batch_run_status']['S'] == 'retry'
|