feat: 日付テーブルチェックを共通処理で行うように修正。DCFDSFファイル出力処理を修正。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-20 12:38:27 +09:00
parent 46e9828fdf
commit 0e191bf686
6 changed files with 324 additions and 370 deletions

View File

@ -170,7 +170,7 @@ class VjskSendBucket(S3Bucket):
_bucket_name = environment.VJSK_DATA_BUCKET _bucket_name = environment.VJSK_DATA_BUCKET
_send_folder = environment.VJSK_DATA_SEND_FOLDER _send_folder = environment.VJSK_DATA_SEND_FOLDER
def upload_inst_pharm_csv_file(self, vjsk_create_csv: str, csv_file_path: str): def upload_dcf_dsf_csv_file(self, vjsk_create_csv: str, csv_file_path: str):
# S3バケットにファイルを移動 # S3バケットにファイルを移動
csv_file_name = f'{self._send_folder}/{vjsk_create_csv}' csv_file_name = f'{self._send_folder}/{vjsk_create_csv}'
s3_client = S3Client() s3_client = S3Client()

View File

@ -1,111 +0,0 @@
"""バッチ処理の共通関数"""
import logging
import textwrap
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_batch_statuses() -> tuple[str, str, str]:
"""日付テーブルから、以下を取得して返す。
- 日次バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]日次バッチ処理中フラグdump取得状況区分
"""
db = Database.get_instance()
sql = 'SELECT bch_actf, dump_sts_kbn, src05.get_syor_date() AS syor_date FROM src05.hdke_tbl'
try:
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def update_batch_processing_flag_in_processing() -> None:
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
bch_actf = :in_processing,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_batch_process_complete() -> None:
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src05.get_syor_date() + interval 1 day), '%Y%m%d'), -- +1
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def logging_sql(logger: logging.Logger, sql: str) -> None:
"""SQL文をデバッグログで出力する
Args:
logger (logging.Logger): ロガー
sql (str): SQL文
"""
logger.debug(f'\n{"-" * 15}\n{textwrap.dedent(sql)[1:-1]}\n{"-" * 15}')

View File

@ -1,76 +1,55 @@
"""output_vjsk_inst_pharm_data""" """DCF DSFデータ作成処理"""
import csv
import os.path as path
import tempfile
from src.aws.s3 import VjskSendBucket from src.aws.s3 import VjskSendBucket
from src.batch.common.batch_context import BatchContext from src.batch.common.batch_context import BatchContext
from src.db.database import Database from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger from src.logging.get_logger import get_logger
import tempfile
import os.path as path
import csv
logger = get_logger('V実消化用施設データ作成処理') logger = get_logger('DCF_DSFデータ作成処理')
sql_err_msg = "SQL実行エラーです。"
def exec(): def exec():
vjsk_csv_file_name = 'ComInst.csv' csv_file_name = 'ult_dcf_dsf.csv'
# バッチ共通設定を取得 # バッチ共通設定を取得
batch_context = BatchContext.get_instance() batch_context = BatchContext.get_instance()
if not batch_context.is_ultmarc_imported:
logger.info('アルトマーク取込が行われていないため、V実消化用施設データ作成処理をスキップします。')
return
db = Database.get_instance() db = Database.get_instance()
try: try:
logger.info('処理開始') logger.info('処理開始')
try:
# DB接続 # DB接続
db.connect() db.connect()
except Exception as e:
logger.info('DB接続エラーです。')
raise e
# CSVファイルの作成用のSQL実行(施設) # CSVファイルの作成用のSQL実行(施設)
record_inst = select_inst_record(db) record_dcf = select_dcf_record(db)
# CSVファイルの作成用のSQL実行(薬局) # CSVファイルの作成用のSQL実行(薬局)
record_pharm = select_pharm_record(db) record_dsf = select_dsf_record(db)
# CSVファイル作成 # CSVファイル作成
csv_file_path = make_csv_data(record_inst, record_pharm, vjsk_csv_file_name) csv_file_path = make_csv_data(record_dcf, record_dsf, csv_file_name)
vjsk_bucket = VjskSendBucket() # s3へアップロード
try: jsk_bucket = VjskSendBucket()
# s3へデータ移動 jsk_bucket.upload_dcf_dsf_csv_file(csv_file_name, csv_file_path)
vjsk_bucket.upload_inst_pharm_csv_file(vjsk_csv_file_name, csv_file_path)
except Exception as e:
logger.info('S3バケットにCSVデータを作成できませんでした。')
raise e
try: # 連携ファイルをバックアップ
# 処理後ファイルをバックアップ jsk_bucket.backup_inst_pharm_csv_file(csv_file_name, batch_context.syor_date)
vjsk_bucket.backup_inst_pharm_csv_file(vjsk_csv_file_name, batch_context.syor_date)
except Exception as e:
logger.info('バックアップバケットへCSVデータをコピーできませんでした。')
raise e
csv_count = len(record_inst) + len(record_pharm) csv_count = len(record_dcf) + len(record_dsf)
logger.info(f'CSV出力件数: {csv_count}') logger.info(f'CSV出力件数: {csv_count}')
logger.info('正常終了') logger.info('正常終了')
except Exception as e: except Exception as e:
raise e raise BatchOperationException(e)
finally: finally:
db.disconnect() db.disconnect()
return return
def select_inst_record(db): def select_dcf_record(db):
# CSVファイル作成用のSQL実行(施設) # CSVファイル作成用のSQL実行(DCF施設)
try:
# 施設テーブル検索SQL
sql = """\ sql = """\
SELECT dcf_dsf_inst_cd, SELECT dcf_dsf_inst_cd,
inst_div_cd, inst_div_cd,
@ -140,15 +119,10 @@ def select_inst_record(db):
FROM src05.com_inst ORDER BY dcf_dsf_inst_cd FROM src05.com_inst ORDER BY dcf_dsf_inst_cd
""" """
return db.execute_select(sql) return db.execute_select(sql)
except Exception as e:
logger.debug(sql_err_msg)
raise e
def select_pharm_record(db): def select_dsf_record(db):
# CSVファイル作成用のSQL実行(薬局) # CSVファイル作成用のSQL実行(DSF薬局)
try:
# 薬局テーブル検索SQL
sql = """\ sql = """\
SELECT dcf_dsf_inst_cd, SELECT dcf_dsf_inst_cd,
inst_div_cd, inst_div_cd,
@ -218,17 +192,12 @@ def select_pharm_record(db):
FROM src05.com_pharm ORDER BY dcf_dsf_inst_cd FROM src05.com_pharm ORDER BY dcf_dsf_inst_cd
""" """
return db.execute_select(sql) return db.execute_select(sql)
except Exception as e:
logger.debug(sql_err_msg)
raise e
def make_csv_data(record_inst: list, record_pharm: list, vjsk_csv_file_name: str): def make_csv_data(record_inst: list, record_pharm: list, csv_file_name: str):
# 一時ファイルとして保存する(CSVファイル) # CSVファイルを作成する
try:
temporary_dir = tempfile.mkdtemp() temporary_dir = tempfile.mkdtemp()
csv_file_path = path.join(temporary_dir, vjsk_csv_file_name) csv_file_path = path.join(temporary_dir, csv_file_name)
head_str = ['DCF_DSF_INST_CD', 'INST_DIV_CD', 'ADDR_UNKNOWN_REASON_CD', 'FORM_INST_NAME_KANA', 'INST_NAME_KANA', head_str = ['DCF_DSF_INST_CD', 'INST_DIV_CD', 'ADDR_UNKNOWN_REASON_CD', 'FORM_INST_NAME_KANA', 'INST_NAME_KANA',
'FORM_INST_NAME_KANJI', 'INST_NAME_KANJI', 'RLTD_UNIV_PRNT_CD', 'BED_NUM', 'CLOSE_FLG', 'ESTAB_SCHE_FLG', 'FORM_INST_NAME_KANJI', 'INST_NAME_KANJI', 'RLTD_UNIV_PRNT_CD', 'BED_NUM', 'CLOSE_FLG', 'ESTAB_SCHE_FLG',
@ -264,8 +233,4 @@ def make_csv_data(record_inst: list, record_pharm: list, vjsk_csv_file_name: str
csv_data = ['' if n is None else n for n in record_pharm_value] csv_data = ['' if n is None else n for n in record_pharm_value]
writer.writerow(csv_data) writer.writerow(csv_data)
except Exception as e:
logger.info('CSVデータの作成に失敗しました。')
raise e
return csv_file_path return csv_file_path

View File

@ -1,11 +1,10 @@
"""実消化&アルトマーク アルトマーク取込/データ出力処理""" """実消化&アルトマーク アルトマーク取込/データ出力処理"""
from src.batch.batch_functions import (
get_batch_statuses, update_batch_processing_flag_in_processing)
from src.batch.common.batch_context import BatchContext from src.batch.common.batch_context import BatchContext
from src.batch.ultmarc import import_ultmarc_process, output_dcf_dsf_data from src.batch.ultmarc import import_ultmarc_process, output_dcf_dsf_data
from src.error.exceptions import BatchOperationException from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger from src.logging.get_logger import get_logger
from src.manager.jskult_hdke_tbl_manager import JskultHdkeTblManager
from src.system_var import constants from src.system_var import constants
logger = get_logger('アルトマーク取込/データ出力') logger = get_logger('アルトマーク取込/データ出力')
@ -17,34 +16,16 @@ batch_context = BatchContext.get_instance()
def exec(): def exec():
try: try:
logger.info('アルトマーク取込/データ出力:開始') logger.info('アルトマーク取込/データ出力:開始')
try: hdke_tbl_manager = JskultHdkeTblManager()
# 日次バッチ処置中フラグ、dump処理状態区分、処理日を取得 if not hdke_tbl_manager.can_run_process():
batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses() logger.error('日次バッチ処理中日次バッチ処理中またはdump取得が正常終了していないため、日次バッチ処理を終了します。')
except BatchOperationException as e:
logger.exception(f'日付テーブル取得(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS return constants.BATCH_EXIT_CODE_SUCCESS
_, _, syor_date = hdke_tbl_manager.get_batch_statuses()
logger.info(f'処理日={syor_date}') logger.info(f'処理日={syor_date}')
# バッチ共通設定に処理日を追加 # バッチ共通設定に処理日を追加
batch_context.syor_date = syor_date batch_context.syor_date = syor_date
# バッチ処理中に更新
try:
update_batch_processing_flag_in_processing()
except BatchOperationException as e:
logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
try: try:
logger.info('アルトマーク取込:起動') logger.info('アルトマーク取込:起動')
import_ultmarc_process.exec_import() import_ultmarc_process.exec_import()
@ -53,9 +34,6 @@ def exec():
logger.exception(f'アルトマーク取込処理エラー(異常終了){e}') logger.exception(f'アルトマーク取込処理エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS return constants.BATCH_EXIT_CODE_SUCCESS
# 調査目的でアルトマーク取込が行われたかどうかをログ出力
logger.debug(f'{"アルトマーク取込が行われました。" if batch_context.is_ultmarc_imported else "アルトマーク取込が行われませんでした。"}')
try: try:
logger.info('V実消化用施設データ作成処理起動') logger.info('V実消化用施設データ作成処理起動')
output_dcf_dsf_data.exec() output_dcf_dsf_data.exec()

View File

@ -0,0 +1,122 @@
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
class JskultHdkeTblManager:
_db: Database
def __init__(self):
self._db = Database.get_instance()
def get_batch_statuses(self) -> tuple[str, str, str]:
"""日次バッチ処理中フラグ、dump取得状況区分、処理日を取得する
Raises:
BatchOperationException: DB操作の何らかのエラー
Returns:
tuple[str, str, str]: [0]日次バッチ処理中フラグ[1]dump取得状況区分[2]処理日
"""
sql = 'SELECT bch_actf, dump_sts_kbn, src07.get_syor_date() AS syor_date FROM src07.hdke_tbl'
try:
self._db.connect()
hdke_tbl_result = self._db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def update_batch_process_start(self):
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :start,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
self._db.connect()
self._db.to_jst()
self._db.execute(
sql, {'start': constants.BATCH_ACTF_BATCH_START})
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
return
def update_batch_process_complete(self) -> None:
"""バッチ正常終了処理時の更新処理
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src07.get_syor_date() + interval 1 day), '%Y%m%d'), -- +1
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
self._db.connect()
self._db.to_jst()
self._db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
def can_run_process(self) -> bool:
"""バッチ処理を起動してよいかを判定する
Raises:
BatchOperationException: DB操作の何らかのエラー
Returns:
bool: バッチ処理を実行して良い場合はTrue
"""
try:
# 日次バッチ処置中フラグ、dump処理状態区分を取得
batch_processing_flag, dump_status_kbn, _ = self.get_batch_statuses()
except DBException as e:
raise BatchOperationException(e)
finally:
self._db.disconnect()
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_START:
return False
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
return False
return True