feat: COM_先端医療機器の登録処理を実装、周辺のログ出力も

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-05 14:42:32 +09:00
parent ec2a4a9470
commit 5f67c8f008
9 changed files with 213 additions and 27 deletions

View File

@ -8,6 +8,7 @@ from src.aws.s3 import UltmarcBucket
from src.batch.ultmarc.datfile import DatFile
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
UltmarcTableMapperFactory
from src.db.database import Database
from src.logging.get_logger import get_logger
logger = get_logger('アルトマークデータ保管')
@ -16,9 +17,13 @@ ultmarc_bucket = UltmarcBucket()
def dat_insert_control():
try:
logger.info('datInsert START')
# DBセットアップ
db = Database.get_instance()
db.connect()
# ファイル単位でトランザクションを行う
db.begin()
# datファイルをS3から取得する
dat_file_list = ultmarc_bucket.list_edi_file()
# ファイルがない場合は処理せず、正常終了とする
if len(dat_file_list) == 0:
logger.info('ファイルがないため、処理をスキップします')
@ -42,16 +47,41 @@ def dat_insert_control():
ultmarc_bucket.backup_edi_file(dat_file_name, now)
# datファイルをダウンロード
local_file_path = ultmarc_bucket.download_edi_file(dat_file_name)
# サンプル実装するのは、3つ
# 511-01(所属学会専門医: COM_専門分野), 101-00(DCF施設: COM_施設), 008(所属部科コード: COM_所属部科)
mapper_factory = UltmarcTableMapperFactory()
dat_file = DatFile.from_path(local_file_path)
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
# 書き込み先のテーブルを特定
com_class = mapper_factory.create(line.layout_class, line.record_id, line.record)
print(com_class)
query = com_class.make_query()
# print(query)
for log_count, line in enumerate(dat_file):
try:
# 書き込み先のテーブルを特定
mapper_class = mapper_factory.create(
line.layout_class,
line.record_id,
line.record,
db
)
mapper_class.make_query()
mapper_class.execute_queries()
dat_file.count_up_success()
# 5000件ごとにログ記録
# これいる??
if log_count % 5000 == 0:
logger.info(f'Count: {log_count}')
except Exception as e:
# TODO: ログちゃんとする
record = line.record
log_message = ','.join([f'"{r}"' for r in record])
logger.warning(log_message)
dat_file.count_up_error()
# すべての行を登録終えたらコミットする
db.commit()
logger.info(f'datInsert RESULT')
logger.info(f'SUCCESS_COUNT={dat_file.success_count}')
logger.info(f'ERROR_COUNT={dat_file.error_count}')
logger.info(f'ALL_COUNT={dat_file.total_count}')
except Exception as e:
logger.exception(e)
raise e
finally:
db.disconnect()
logger.info('終了')

View File

@ -1,10 +1,6 @@
import binascii
import csv
import os.path as path
from io import TextIOWrapper
from src.batch.ultmarc.mdb_character_set_hex import MDB_CHARACTER_SET_HEX
class DatFileLine:
layout_class: str
@ -14,10 +10,13 @@ class DatFileLine:
def __init__(self, dat_line: list[str]) -> None:
self.layout_class = dat_line[0]
self.record_id = dat_line[1]
self.record = dat_line[1:]
self.record = dat_line
class DatFile:
lines: list[DatFileLine]
success_count: int = 0
error_count: int = 0
total_count: int = 0
__i: int = 0
def __iter__(self):
@ -35,6 +34,13 @@ class DatFile:
csv_rows = [DatFileLine(row) for row in reader]
self.lines = csv_rows
self.total_count = len(csv_rows)
def count_up_success(self):
self.success_count += 1
def count_up_error(self):
self.error_count += 1
@classmethod
def from_path(cls, local_file_path: str):

View File

@ -0,0 +1,96 @@
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
from src.batch.ultmarc.utmp_tables.tables.com_hamtec import ComHamtec
class ComHamtecMapper(UltmarcTableMapper):
# レコード存在確認SQL
RECORD_EXISTS_QUERY = """\
SELECT
COUNT(*) AS count_num
FROM
src05.com_hamtec
WHERE
hamtec_cd = :hamtec_cd
"""
# データ登録・更新用のSQL
INSERT_QUERY = """\
INSERT INTO src05.com_hamtec
(
hamtec_cd,
hamtec_div,
hamtec_name,
regist_ymd,
sys_regist_date,
regist_prgm_id,
sys_update_date,
update_prgm_id
)
VALUES (
:hamtec_cd,
:hamtec_div,
:hamtec_name,
:execute_date_str_ymd,
:execute_datetime,
:program_name,
:execute_datetime,
:program_name
)
"""
UPDATE_QUERY = """\
UPDATE
src05.com_hamtec
SET
hamtec_div = :hamtec_div,
hamtec_name = :hamtec_name,
update_ymd = :execute_date_str_ymd,
sys_update_date = :execute_datetime,
update_prgm_id = :program_name
WHERE
hamtec_cd = :hamtec_cd
"""
# 修正区分が「C(削除)」の場合の更新SQL
LOGICAL_DELETE_QUERY = """\
UPDATE
src05.com_hamtec
SET
delete_ymd = :execute_date_str_ymd,
sys_update_date = :execute_date_str_ymd,
update_prgm_id = :program_name
WHERE
hamtec_cd = :hamtec_cd
"""
record: ComHamtec
def __init__(self, record: list[str], db) -> None:
super().__init__(record, db, ComHamtec)
program_name = __name__.split('.')[-1] # 当モジュール名(現行から変わっている)
# モジュール名をクエリパラメータに設定
self.query_parameters[0]['program_name'] = program_name
# 読み込んだレコード値もクエリパラメータに追加
self.query_parameters[0] = {**self.query_parameters[0], **self.record.to_sql_parameter()}
def make_query(self):
# 修正区分がC(削除)の場合、論理削除
if self.record.maint_flag == 'C':
self.queries.append(self.LOGICAL_DELETE_QUERY)
return
# 追加、更新の場合
self.queries.append(self.__make_upsert_query())
return
def __make_upsert_query(self):
# レコードの存在確認
record_count = self.db.execute_select(self.RECORD_EXISTS_QUERY, self.query_parameters[0])
# 存在しない場合はInsert
if record_count[0]['count_num'] == 0:
return self.INSERT_QUERY
# 存在する場合はUpdate
# 更新データがある場合のみ更新
if self.record.hamtec_div != '' or self.record.hamtec_name != '':
return self.UPDATE_QUERY
else:
return None

View File

@ -2,6 +2,6 @@ from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
class ComInst(UltmarcTableMapper):
class ComInstMapper(UltmarcTableMapper):
def make_query(self):
return "INSERT INTO src05.com_inst values('hogehoge')"
self.queries.append(None)

View File

@ -1,14 +1,42 @@
from abc import ABCMeta, abstractmethod
from datetime import datetime
from src.batch.ultmarc.utmp_tables.tables.ultmarc_table import UltmarcTable
from src.db.database import Database
class UltmarcTableMapper(metaclass=ABCMeta):
pass
_records: list[str]
record: UltmarcTable
db: Database
queries: list[str]
query_parameters: list[dict]
def __init__(self, records: list[str]) -> None:
self._records = records
def __init__(self, record: list[str], db: Database, table_class: type[UltmarcTable]) -> None:
self.record = table_class(record)
self.db = db
# 実行年月日(文字列)、実行年月日時分秒を設定
now = datetime.now()
execute_date_str_ymd = now.strftime('%Y%m%d')
execute_datetime = now.strftime('%Y/%m/%d %H:%M:%S')
# クエリリストを初期化
self.queries = []
# 共通クエリパラメータを設定
self.query_parameters = [{
'execute_date_str_ymd': execute_date_str_ymd,
'execute_datetime': execute_datetime
}]
@abstractmethod
def make_query(self):
pass
def execute_queries(self):
if len(self.queries) == 0:
raise Exception('make_queryを呼び出してから実行してください')
for i, query in enumerate(self.queries):
if query is None:
continue
self.db.execute(query, self.query_parameters[i])

View File

@ -0,0 +1,17 @@
from src.batch.ultmarc.utmp_tables.tables.ultmarc_table import UltmarcTable
class ComHamtec(UltmarcTable):
maint_flag: str # 修正区分
hamtec_cd: str # 高度先進医療コード
hamtec_div: str # 高度先進医療区分
hamtec_name: str # 高度先進医療名
def __init__(self, record: list[str]):
super().__init__(record)
self.maint_flag = record[3]
self.hamtec_cd = record[1]
# 0埋めでデータが来るので、0を削除する
self.hamtec_div = record[2].replace('0', '')
self.hamtec_name = record[6]

View File

@ -0,0 +1,8 @@
class UltmarcTable:
record: list
def __init__(self, record: list):
self.record = record
def to_sql_parameter(self):
return vars(self)

View File

@ -1,7 +1,8 @@
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import (com_inst,
null_mapper)
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import (
com_hamtec_mapper, com_inst_mapper, null_mapper)
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
from src.db.database import Database
# テーブルとのマッピング
COM_TABLE_LIST = {
@ -15,7 +16,7 @@ COM_TABLE_LIST = {
# COM_医師
"501": {"01": null_mapper.NullMapper},
# COM_施設
"101": {"00": com_inst.ComInst},
"101": {"00": com_inst_mapper.ComInstMapper},
# COM_薬局
"102": {"03": null_mapper.NullMapper},
# COM_医師勤務先
@ -57,7 +58,7 @@ COM_TABLE_LIST = {
# COM_施設区分
"011": null_mapper.NullMapper,
# COM_高度先進医療
"021": null_mapper.NullMapper,
"021": com_hamtec_mapper.ComHamtecMapper,
# COM_先端医療機器
"022": null_mapper.NullMapper,
# COM_看護種別
@ -82,7 +83,7 @@ COM_TABLE_LIST = {
class UltmarcTableMapperFactory:
def create(self, layout_class: str, record_id: str, records: list[str]) -> UltmarcTableMapper:
def create(self, layout_class: str, record_id: str, records: list[str], db: Database) -> UltmarcTableMapper:
# レイアウト種別とレコードIDから、マッピング先のテーブルを特定
table_by_layout_class = COM_TABLE_LIST.get(layout_class)
@ -96,4 +97,4 @@ class UltmarcTableMapperFactory:
elif issubclass(table_by_layout_class, UltmarcTableMapper):
mapper_class = table_by_layout_class
return mapper_class(records)
return mapper_class(records, db)