コードの修正

This commit is contained in:
mori.k 2025-05-22 19:50:47 +09:00
parent 8214de77e4
commit 800f79b825

View File

@ -1,28 +1,39 @@
import os
from datetime import datetime, timezone
import boto3
TABLE_NAME = 'mbj-newdwh2021-staging-jskult-batch-run-manage'
dynamodb = boto3.client('dynamodb')
class JskultBatchRunManager:
def __init__(self, execution_id: str):
self._execution_id: str = execution_id
# バッチ処理ステータスをsuccessで登録および更新を行う
def batch_success(self):
try:
self._put_dynamodb_record('success')
except Exception as e:
raise e
# バッチ処理ステータスをfailedで登録および更新を行う
def batch_failed(self):
try:
self._put_dynamodb_record('failed')
except Exception as e:
raise e
# バッチ処理ステータスをretryで登録および更新を行う
def batch_retry(self):
try:
self._put_dynamodb_record('retry')
except Exception as e:
raise e
def _put_dynamodb_record(self, execution_id: str):
try:
self._execution_id = execution_id
# TODO バッチ実行管理テーブルの登録、更新upsert)
except Exception as e:
raise e
def _put_dynamodb_record(self, record: str):
# バッチ実行管理テーブルの登録、更新upsert)
now = int(datetime.now(timezone.utc).timestamp() * 1000)
options = {
'TableName': TABLE_NAME,
'Item': {
'execution_id': {'S': self._execution_id},
'status': {'S': record},
'createdAt': {'N': str(now)},
},
}
dynamodb.put_item(**options)