feat: アルトマークデータ保管のサンプルコード 第一版

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-04 11:15:33 +09:00
parent 3131422eb8
commit af3dbde89e
11 changed files with 316 additions and 0 deletions

View File

@ -0,0 +1,80 @@
import os.path as path
import tempfile
import boto3
class S3Client:
__s3_client = boto3.client('s3')
_bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0:
return []
contents = response['Contents']
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']} for content in contents if not content['Key'].endswith('/')]
return objects
def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_client.copy(copy_source, dest_bucket, dest_key)
return
def download_file(self, bucket_name: str, file_key: str, file):
self.__s3_client.download_fileobj(
Bucket=bucket_name,
Key=file_key,
Fileobj=file
)
return
def upload_file(self, local_file_path: str, bucket_name: str, file_key: str):
self.__s3_client.upload_file(
local_file_path,
Bucket=bucket_name,
Key=file_key
)
class S3Bucket():
_s3_client = S3Client()
_bucket_name: str = None
def __str__(self) -> str:
return self._bucket_name
class UltmarcBucket(S3Bucket):
# TODO: 環境変数にする
_bucket_name = 'mbj-newdwh2021-staging-jskult-ultmarc'
_folder = 'recv'
def list_edi_file(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def download_edi_file(self, edi_filename: str):
# 一時ファイルとして保存する
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{edi_filename.replace("recv/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, edi_filename, f)
f.seek(0)
return temporary_file_path
def backup_edi_file(self, edi_file_key: str, datetime_key: str):
ultmarc_backup_bucket = UltmarcBackupBucket()
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{edi_file_key.replace(f"{self._folder}/", "")}'
self._s3_client.copy(str(self), edi_file_key, str(ultmarc_backup_bucket), backup_key)
class JskUltBackupBucket(S3Bucket):
# TODO: 環境変数にする
_bucket_name = 'mbj-newdwh2021-staging-jskult-backup'
class UltmarcBackupBucket(JskUltBackupBucket):
# TODO: 環境変数にする
_folder = 'ultmarc'

View File

@ -0,0 +1,57 @@
"""アルトマークデータ保管"""
import json
import os.path as path
from datetime import datetime
from src.aws.s3 import UltmarcBucket
from src.batch.ultmarc.datfile import DatFile
from src.batch.ultmarc.utmp_tables.ultmarc_table_mapper_factory import \
UltmarcTableMapperFactory
from src.logging.get_logger import get_logger
logger = get_logger('アルトマークデータ保管')
ultmarc_bucket = UltmarcBucket()
def dat_insert_control():
try:
logger.info('datInsert START')
# datファイルをS3から取得する
dat_file_list = ultmarc_bucket.list_edi_file()
# ファイルがない場合は処理せず、正常終了とする
if len(dat_file_list) == 0:
logger.info('ファイルがないため、処理をスキップします')
return
# ファイルが複数ある場合はエラーとする
if len(dat_file_list) > 1:
logger.error('複数の取込ファイルがあるため、異常終了')
return
# ファイルの件数は必ず1件になる
dat_file_info = dat_file_list[0]
# 0Byteの場合、
if dat_file_info['size'] == 0:
logger.info('0Byteファイルのため、処理をスキップします')
return
dat_file_name = dat_file_info['filename']
logger.info(f"Get File Name :{dat_file_name}")
now = datetime.now().strftime('%Y/%m/%d')
# ファイルをバックアップ
# 現行は、jobctrl_dailyの先頭でやっている
ultmarc_bucket.backup_edi_file(dat_file_name, now)
# datファイルをダウンロード
local_file_path = ultmarc_bucket.download_edi_file(dat_file_name)
# サンプル実装するのは、3つ
# 511-01(所属学会専門医: COM_専門分野), 101-00(DCF施設: COM_施設), 008(所属部科コード: COM_所属部科)
mapper_factory = UltmarcTableMapperFactory()
dat_file = DatFile.from_path(local_file_path)
# datファイルを1行ずつ処理し、各テーブルへ登録
for line in dat_file:
# 書き込み先のテーブルを特定
com_class = mapper_factory.create(line.layout_class, line.record_id, line.record)
print(com_class)
query = com_class.make_query()
# print(query)
finally:
logger.info('終了')

View File

@ -0,0 +1,45 @@
import binascii
import csv
import os.path as path
from io import TextIOWrapper
from src.batch.ultmarc.mdb_character_set_hex import MDB_CHARACTER_SET_HEX
class DatFileLine:
layout_class: str
record_id: str
record: list[str]
def __init__(self, dat_line: list[str]) -> None:
self.layout_class = dat_line[0]
self.record_id = dat_line[1]
self.record = dat_line[1:]
class DatFile:
lines: list[DatFileLine]
__i: int = 0
def __iter__(self):
return self
def __next__(self) -> DatFileLine:
if self.__i == len(self.lines):
raise StopIteration()
line = self.lines[self.__i]
self.__i += 1
return line
def __init__(self, file: TextIOWrapper) -> None:
reader = csv.reader(file)
csv_rows = [DatFileLine(row) for row in reader]
self.lines = csv_rows
@classmethod
def from_path(cls, local_file_path: str):
# cp932(Shift-JIS Windows拡張)でファイルを読み込む
file = open(local_file_path, encoding='cp932')
instance = cls(file)
file.close()
return instance

View File

@ -0,0 +1,8 @@
import glob
import os
# 同階層内のモジュールを一括でインポート
__all__ = [
os.path.split(os.path.splitext(file)[0])[1]
for file in glob.glob(os.path.join(os.path.dirname(__file__), '[a-zA-Z0-9]*.py'))
]

View File

@ -0,0 +1,7 @@
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
class ComInst(UltmarcTableMapper):
def make_query(self):
return "INSERT INTO src05.com_inst values('hogehoge')"

View File

@ -0,0 +1,8 @@
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
class NullMapper(UltmarcTableMapper):
def make_query(self):
return super().make_query()

View File

@ -0,0 +1,14 @@
from abc import ABCMeta, abstractmethod
class UltmarcTableMapper(metaclass=ABCMeta):
pass
_records: list[str]
def __init__(self, records: list[str]) -> None:
self._records = records
@abstractmethod
def make_query(self):
pass

View File

@ -0,0 +1,97 @@
from src.batch.ultmarc.utmp_tables.table_mapper.concrete import (com_inst,
null_mapper)
from src.batch.ultmarc.utmp_tables.table_mapper.ultmarc_table_mapper import \
UltmarcTableMapper
# テーブルとのマッピング
COM_TABLE_LIST = {
# レコードID固定
# COM_医師学会
"521": {"01": null_mapper.NullMapper},
# COM_施設属性
"111": {"00": null_mapper.NullMapper},
# COM_臨床研修病院
"112": {"00": null_mapper.NullMapper},
# COM_医師
"501": {"01": null_mapper.NullMapper},
# COM_施設
"101": {"00": com_inst.ComInst},
# COM_医師勤務先
"502": {"01": null_mapper.NullMapper},
# COM_専門分野
"511": {"01": null_mapper.NullMapper},
# COM_都道府県医療機能情報(基本)
"132": {"00": null_mapper.NullMapper},
# COM_都道府県医療機能情報(施設設備)
"133": {"00": null_mapper.NullMapper},
# COM_都道府県医療機能情報(疾患治療)
"134": {"00": null_mapper.NullMapper},
# COM_都道府県医療機能情報(短期滞在手術)
"135": {"00": null_mapper.NullMapper},
# COM_都道府県医療機能情報(専門外来)
"136": {"00": null_mapper.NullMapper},
# レコードID浮動
# COM_診療科目
"001": null_mapper.NullMapper,
# COM_病院種別
"002": null_mapper.NullMapper,
# COM_出身校学部識別
"003": null_mapper.NullMapper,
# COM_出身校
"004": null_mapper.NullMapper,
# COM_役職
"005": null_mapper.NullMapper,
# 都道府県マスタ
"006": null_mapper.NullMapper,
# COM_経営体
"007": null_mapper.NullMapper,
# COM_所属部科
"008": null_mapper.NullMapper,
# COM_学会
"009": null_mapper.NullMapper,
# COM_専門医資格
"010": null_mapper.NullMapper,
# COM_施設区分
"011": null_mapper.NullMapper,
# COM_高度先進医療
"021": null_mapper.NullMapper,
# COM_先端医療機器
"022": null_mapper.NullMapper,
# COM_看護種別
"023": null_mapper.NullMapper,
# COM_医療機能評価
"024": null_mapper.NullMapper,
# COM_地域クリティカルパス
"026": null_mapper.NullMapper,
# COM_疾患別リハビリテーション科
"027": null_mapper.NullMapper,
# COM_政策医療
"028": null_mapper.NullMapper,
# COM_医療圏都道府県
"121": null_mapper.NullMapper,
# COM_医療圏次マスタ
"122": null_mapper.NullMapper,
# COM_二次医療圏
"123": null_mapper.NullMapper,
# COM_医療圏都道府県市町村対照表
"124": null_mapper.NullMapper
}
class UltmarcTableMapperFactory:
def create(self, layout_class: str, record_id: str, records: list[str]) -> UltmarcTableMapper:
# レイアウト種別とレコードIDから、マッピング先のテーブルを特定
table_by_layout_class = COM_TABLE_LIST.get(layout_class)
# レイアウト種別が特定できない場合はエラーとする
if table_by_layout_class is None:
raise Exception('特定できませんでした')
mapper_class: UltmarcTableMapper = None
if type(table_by_layout_class) is dict:
mapper_class = table_by_layout_class.get(record_id)
elif issubclass(table_by_layout_class, UltmarcTableMapper):
mapper_class = table_by_layout_class
return mapper_class(records)