newdwh2021/ecs/jskult-batch/src/batch/update_business_day.py

200 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import boto3
from datetime import datetime
from src.batch.jskult_batch_entrypoint import JskultBatchEntrypoint
from src.aws.s3 import (ConfigBucket,
JskTransferListBucket)
from src.batch.common.calendar_file import CalendarFile
from src.batch.environment.update_business_day_environment import \
UpdateBusinessDayEnvironment
from src.manager.jskult_batch_run_manager import JskultBatchRunManager
from src.manager.jskult_batch_status_manager import JskultBatchStatusManager
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.error.exceptions import (EnvironmentVariableNotSetException,
MaxRunCountReachedException)
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('日付テーブル更新')
class UpdateBusinessDay(JskultBatchEntrypoint):
def __init__(self):
super().__init__()
self.environment = UpdateBusinessDayEnvironment()
# 必須の環境変数が設定されていない場合、エラーにする
try:
self.environment.validate()
except EnvironmentVariableNotSetException as e:
logger.exception(e)
return
def execute(self):
"""日付テーブル更新"""
logger.info('I-1 処理開始: 実消化アルトマーク_日付テーブル更新')
jskult_hdke_tbl_manager = JskultHdkeTblManager()
jskult_batch_status_manager = JskultBatchStatusManager(
self.environment.PROCESS_NAME,
constants.PROCESS_NAME_UPDATE_BUSINESS_DAY,
self.environment.MAX_RUN_COUNT,
# 日付テーブル更新では連携ファイル数を参照しないため、0を指定する。
0
)
jskult_batch_run_manager = JskultBatchRunManager(
self.environment.BATCH_MANAGE_DYNAMODB_TABLE_NAME,
self.environment.BATCH_EXECUTION_ID)
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_START)
if not jskult_hdke_tbl_manager.can_run_process():
logger.error(
'日次バッチ処理中でない、もしくはdump取得が正常終了していないため、日次バッチ処理を異常終了します。')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
# バッチステータス管理テーブルをエラーに更新
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_ERROR)
return
try:
if not jskult_batch_status_manager.can_run_post_process():
# 後続処理の起動条件を満たしていない場合
# 処理ステータスを「処理待」に設定
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_WAITING)
# バッチ実行管理テーブルに「retry」で登録
jskult_batch_run_manager.batch_retry()
return
except MaxRunCountReachedException:
logger.error('後続処理の実行が完了していないため、日付テーブル更新処理を異常終了します。')
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
jskult_batch_run_manager.batch_failed()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_ERROR)
logger.error(f'E-1 異常終了実消化アルトマーク_日付テーブル更新。{e}')
return
try:
# 日付テーブルの処理年月日を取得する
_, _, syor_date = jskult_hdke_tbl_manager.get_batch_statuses()
if self._is_archive_run_day(syor_date):
logger.info('I-7 [NOTICE]実消化データアーカイブ取得処理を実行します。')
# 実消化データアーカイブ取得処理を起動する
sfn_client = boto3.client('stepfunctions')
# StepFunctionsを起動する
sfn_client.start_execution(
stateMachineArn=self.environment.ARCHIVE_STATEMACHINE_ARN
)
dt = datetime.strptime(syor_date, "%Y/%m/%d")
# 日付テーブルの処理年月日が月曜日の場合
if dt.weekday() == constants.WEEKDAY_MONDAY:
if not jskult_batch_status_manager.is_done_ultmarc_import():
logger.info(
'I-3 [NOTICE]アルトマーク取込稼働日でしたが、アルトマーク取込が行われませんでした。')
# 実消化転送データ一覧を取得する
try:
transfer_list_bucket = JskTransferListBucket()
transfer_list_file_path = transfer_list_bucket.download_transfer_result_file(
syor_date)
except Exception as e:
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
with open(transfer_list_file_path) as f:
transfer_data_list = json.load(f)
jsk_transfer_data_list = transfer_data_list['jsk_transfer_list']
# 実消化&アルトマーク_実消化受領予定データリストを取得する
try:
config_bucket = ConfigBucket()
jsk_expected_data_path = config_bucket.download_jsk_expected_data_list()
except Exception as e:
logger.exception(f'転送ファイル一覧の取得に失敗しました。 {e}')
# バッチ実行管理テーブルをfailedで登録
jskult_batch_run_manager.batch_failed()
with open(jsk_expected_data_path) as f:
jsk_expected_data_list = json.load(f)
jsk_transfer_expected_data_list = jsk_expected_data_list['jsk_transfer_list']
if len(jsk_transfer_data_list) != 0:
# 不足ファイルがあった場合ログ出力
self._check_missing_received_files(
jsk_transfer_expected_data_list, jsk_transfer_data_list)
# 受領予定にないファイルがあった場合ログ出力
self._check_extra_received_files(
jsk_transfer_expected_data_list, jsk_transfer_data_list)
jskult_hdke_tbl_manager.update_batch_process_complete()
# 処理ステータスを「処理済」に設定
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_DONE)
jskult_batch_run_manager.batch_success()
logger.info('I-4 [NOTICE]処理終了: 実消化アルトマーク_日付テーブル更新')
except Exception as e:
# 何らかのエラーが発生した際に、バッチ実行管理テーブルに「failed」で登録
jskult_batch_run_manager.batch_failed()
jskult_batch_status_manager.set_process_status(
constants.PROCESS_STATUS_ERROR)
logger.error(f'E-1 異常終了実消化アルトマーク_日付テーブル更新。{e}')
def _is_archive_run_day(self, syor_date: str):
# アーカイブ取得日カレンダーを取得する
archive_calendar_file_path = ConfigBucket().download_jsk_archive_run_day_list()
# 設定ファイル「実消化データアーカイブ起動日カレンダーファイル」の定義内容を取得する
target_days = CalendarFile(archive_calendar_file_path)
# 処理日付が、設定ファイル「実消化データアーカイブ起動日カレンダーファイル」の定義に該当するかを判定する
return target_days.compare_date(syor_date)
def _check_missing_received_files(self, jsk_transfer_expected_data_list: list, jsk_transfer_data_list: list):
# jsk_transfer_data_listと比較し、結果不足ファイルがあった場合ログ出力
missing_files = [
jsk_transfer_expected_data
for jsk_transfer_expected_data in jsk_transfer_expected_data_list
if not any(
jsk_transfer_data.startswith(
jsk_transfer_expected_data)
for jsk_transfer_data in jsk_transfer_data_list
)
]
if missing_files != 0:
logger.info(
f'I-5 [NOTICE]日次連携受領ファイルに不足がありました。ファイル名:{missing_files}')
def _check_extra_received_files(self, jsk_transfer_expected_data_list: list, jsk_transfer_data_list: list):
unexpected_data = [
jsk_transfer_data
for jsk_transfer_data in jsk_transfer_data_list
if not any(
jsk_transfer_data.startswith(
jsk_transfer_expected_data)
for jsk_transfer_expected_data in jsk_transfer_expected_data_list
)
]
if len(unexpected_data) != 0:
logger.info(
f'I-6 [NOTICE]受領ファイルに日次連携ファイルでないファイルがありました。ファイル名:{unexpected_data}')