feat: バッチ実行管理テーブル操作クラスのテスト実装(途中)

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-26 18:10:31 +09:00
parent 9ec8b764ae
commit 61ff9f1159
4 changed files with 60 additions and 20 deletions

View File

@ -6,6 +6,7 @@ DB_SCHEMA=src05
LOG_LEVEL=INFO
# 処理名: 起動する処理に応じて変更する
PROCESS_NAME=*************
BATCH_MANAGE_DYNAMODB_TABLE_NAME=****************
ULTMARC_DATA_BUCKET=****************
ULTMARC_DATA_FOLDER=recv
JSKULT_BACKUP_BUCKET=****************

View File

@ -1,39 +1,43 @@
import os
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
import boto3
TABLE_NAME = 'mbj-newdwh2021-staging-jskult-batch-run-manage'
dynamodb = boto3.client('dynamodb')
class JskultBatchRunManager:
def __init__(self, execution_id: str):
def __init__(self, table_name: str, execution_id: str):
self._table_name = table_name
self._execution_id: str = execution_id
self._dynamodb = boto3.client('dynamodb')
# バッチ処理ステータスをsuccessで登録および更新を行う
def batch_success(self):
"""バッチ処理ステータスをsuccessで登録および更新を行う"""
self._put_dynamodb_record('success')
# バッチ処理ステータスをfailedで登録および更新を行う
def batch_failed(self):
"""バッチ処理ステータスをfailedで登録および更新を行う"""
self._put_dynamodb_record('failed')
# バッチ処理ステータスをretryで登録および更新を行う
def batch_retry(self):
"""バッチ処理ステータスをretryで登録および更新を行う"""
self._put_dynamodb_record('retry')
def _put_dynamodb_record(self, record: str):
# バッチ実行管理テーブルの登録、更新upsert)
now = int(datetime.now(timezone.utc).timestamp() * 1000)
def _put_dynamodb_record(self, batch_run_status: str):
"""バッチ実行管理テーブルの登録、更新upsert)
Args:
batch_run_status (str): バッチ処理ステータス
"""
# レコードの有効期限を現在時刻+24hのタイムスタンプで生成
now = datetime.now(timezone.utc)
later = now + timedelta(hours=24)
timestamp_24h = int(later.timestamp())
options = {
'TableName': TABLE_NAME,
'TableName': self._table_name,
'Item': {
'execution_id': {'S': self._execution_id},
'status': {'S': record},
'createdAt': {'N': str(now)},
'batch_run_status': {'S': batch_run_status},
'record_expiration_time': {'N': str(timestamp_24h)},
},
}
dynamodb.put_item(**options)
self._dynamodb.put_item(**options)

View File

@ -10,6 +10,9 @@ DB_SCHEMA = os.environ['DB_SCHEMA']
# 処理名
PROCESS_NAME = os.environ['PROCESS_NAME']
# AWS
BATCH_MANAGE_DYNAMODB_TABLE_NAME = os.environ.get('BATCH_MANAGE_DYNAMODB_TABLE_NAME')
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))

View File

@ -1,4 +1,36 @@
class TestJskultBatchRunManager:
import boto3
import pytest
def test_1(self):
pass
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.system_var.environment import BATCH_MANAGE_DYNAMODB_TABLE_NAME
UNITTEST_EXECUTION_ID = 'unittest'
class TestJskultBatchRunManager:
@pytest.fixture(scope='function', autouse=True)
def dynamodb_client(self):
dynamodb = boto3.client('dynamodb')
yield dynamodb
dynamodb.delete_item(Key={
'execution_id': UNITTEST_EXECUTION_ID
})
def test_batch_success(self, dynamodb_client):
# Arrange
# Act
sut = JskultBatchRunManager(BATCH_MANAGE_DYNAMODB_TABLE_NAME, UNITTEST_EXECUTION_ID)
sut.batch_success()
# Assert
options = {
'TableName': BATCH_MANAGE_DYNAMODB_TABLE_NAME,
'Key': {
'execution_id': {'S': UNITTEST_EXECUTION_ID},
},
}
actual = dynamodb_client.get_item(**options)
assert actual['Item']['batch_run_status']['S'] == 'success'