newdwh2021/ecs/jskult-batch/tests/manager/test_jskult_batch_run_manager.py

86 lines
2.7 KiB
Python

import boto3
import pytest
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.system_var.environment import BATCH_MANAGE_DYNAMODB_TABLE_NAME
UNITTEST_EXECUTION_ID = 'unittest'
class TestJskultBatchRunManager:
@pytest.fixture(scope='function', autouse=True)
def dynamodb_client(self):
dynamodb = boto3.client('dynamodb')
yield dynamodb
dynamodb.delete_item(
TableName=BATCH_MANAGE_DYNAMODB_TABLE_NAME,
Key={
'execution_id': {'S': UNITTEST_EXECUTION_ID}
}
)
def test_batch_success(self, dynamodb_client):
"""バッチ実行管理テーブルに成功のステータスが書き込まれること
Args:
dynamodb_client (boto3.Client): DynamoDB クライアント
"""
# Arrange
# Act
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
sut.batch_success()
# Assert
options = {
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
'Key': {
'execution_id': {'S': UNITTEST_EXECUTION_ID},
},
}
actual = dynamodb_client.get_item(**options)
assert actual['Item']['batch_run_status']['S'] == 'success'
def test_batch_failed(self, dynamodb_client):
"""バッチ実行管理テーブルに失敗のステータスが書き込まれること
Args:
dynamodb_client (boto3.Client): DynamoDB クライアント
"""
# Arrange
# Act
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
sut.batch_failed()
# Assert
options = {
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
'Key': {
'execution_id': {'S': UNITTEST_EXECUTION_ID},
},
}
actual = dynamodb_client.get_item(**options)
assert actual['Item']['batch_run_status']['S'] == 'failed'
def test_batch_retry(self, dynamodb_client):
"""バッチ実行管理テーブルにリトライのステータスが書き込まれること
Args:
dynamodb_client (boto3.Client): DynamoDB クライアント
"""
# Arrange
# Act
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
sut.batch_retry()
# Assert
options = {
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
'Key': {
'execution_id': {'S': UNITTEST_EXECUTION_ID},
},
}
actual = dynamodb_client.get_item(**options)
assert actual['Item']['batch_run_status']['S'] == 'retry'