ファイルロードからDB登録の実装(実行確認は未)

This commit is contained in:
x.azuma.m@nds-tyo.co.jp 2023-04-28 19:51:59 +09:00
parent 9ba4eda8a3
commit 1fd6633bc8
5 changed files with 196 additions and 9 deletions

View File

@ -115,9 +115,16 @@ class VjskBucket(S3Bucket):
_bucket_name = environment.JSKULT_DATA_BUCKET
_recv_folder = environment.JSKULT_DATA_FOLDER_RECV
def get_file_list(self):
return self._s3_client.list_objects(self._bucket_name, self._recv_folder)
_s3_file_list = None
# def download_data_file(self, data_filename: str):
# temporary_dir = tempfile.mkdtemp()
# temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}')
def get_s3_file_list(self):
self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f)
f.seek(0)
return temporary_file_path

View File

@ -0,0 +1,56 @@
from src.batch.vjsk.vjsk_recv_file_manager import (VjskDatFile,
VjskRecvFileManager)
from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper
from src.db.database import Database
from src.logging.get_logger import get_logger
logger = get_logger('V実消化データ取込(DB登録)')
mapper = VjskRecvFileMapper()
class JjskDataLoadManager:
def _import_to_db(dat_file: VjskDatFile, condkey: str):
db = Database.get_instance()
table_name_org = mapper.get_org_table(condkey)
table_name_src = mapper.get_org_table(condkey)
try:
db.connect() # TODO:接続オプション local_infile = True が必要?
db.begin()
# orgをtruncate
f"TRUNCATE TABLE {table_name_org};"
# orgにload ※warningは1148エラーになるらしい
sql = f"LOAD DATA LOCAL INFILE {dat_file} INTO TABLE {table_name_org} FIELDS TERMINATED BY '\t' ENCLOSED BY ""'"" IGNORE 1 LINES;"
cnt = db.execute(sql)
logger.info(f'tsvデータをorgテーブルにLOAD : 件数({cnt})')
# org→srcにinsert select
# TODO: INTO句とSELECT句はmapperに持たせてcondkeyで引っ張ってくるようにしたい
f"INSERT INTO {table_name_src} SELECT * FROM {table_name_org};"
db.commit()
except Exception as e: # TODO:DB例外だけキャッチしたい
db.rollback()
logger.error(e)
raise e
finally:
db.disconnect()
return
def Load(self, target: dict):
# target : {"condkey": key, "src_file_path":local_file_path}
# データファイルオープン
dat_file = VjskRecvFileManager.file_open(target["local_file_path"])
# TODO: tsvファイルをload投入用のDMLに加工(システム日時つけたり、エンコードをUTF-8に変換したり)
# TODO: ファイルオンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58
# TODO: エンコード変換の参考 https://dev.classmethod.jp/articles/python-encoding/
dat_file = dat_file
# データベース登録
self._import_to_db(dat_file, target["condkey"])
return

View File

@ -2,6 +2,7 @@ from src.aws.s3 import ConfigBucket, VjskBucket
from src.batch.common.batch_context import BatchContext
from src.batch.common.calendar_wholestocksaler_file import \
CalendarWholwSalerStockFile
from src.batch.vjsk.vjsk_data_load_manager import JjskDataLoadManager
from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
@ -30,7 +31,7 @@ def _check_received_files():
logger.debug('V実消化連携データ存在確認処理開始')
# 実消化&アルトマーク V実消化データ受領バケットにあるファイル一覧を取得
received_files = vjsk_recv_bucket.get_file_list()
received_files = vjsk_recv_bucket.get_s3_file_list()
logger.debug(f'ファイル一覧{received_files}')
# ファイル存在確認 卸在庫データファイル(卸在庫データ処理対象日のみ実施)
@ -94,15 +95,67 @@ def _check_received_files():
def _import_file_to_db():
logger.debug('V実消化取込処理開始')
# diff_upsertに変わるやつを呼び出す
# emp_chg_inst_lau.batch_process() みたいに
# 実消化&アルトマーク V実消化データ受領バケットにあるファイルパス一覧を取得
received_s3_files = vjsk_recv_bucket.get_s3_file_list()
# ファイルパス一覧にマッピング情報を参照するためのキーを持たせて辞書可する
target_dict = {}
for s3_file_path in received_s3_files:
local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path)
key = vjsk_mapper.get_condkey_by_s3_file_path(local_file_path)
if key is not None:
target_dict[key] = {"condkey": key, "src_file_path": local_file_path}
logger.debug(f'S3ファイルパス辞書{target_dict}')
# TODO: diff_upsertに変わるやつを呼び出す
# TODO: emp_chg_inst_lau.batch_process() みたいに
# DB登録 卸在庫データファイル(卸在庫データ処理対象日のみ実施)
if batch_context.is_import_target_vjsk_stockslipdata:
JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_STOCK_SLIP_DATA])
# # # ファイル存在確認 卸販売データ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_SLIP_DATA])
# # # ファイル存在確認 卸組織変換マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_ORG_CNV_MST])
# # # ファイル存在確認 施設統合マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_VOP_HCO_MERGE])
# # # ファイル存在確認 卸マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_WHS_MST])
# # # ファイル存在確認 卸ホールディングスマスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_HLD_MST])
# # # ファイル存在確認 施設マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_FCL_MST])
# # # ファイル存在確認 メーカー卸組織展開表
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MKR_ORG_HORIZON])
# # # ファイル存在確認 取引区分マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_TRAN_KBN_MST])
# # # ファイル存在確認 製品マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_PHM_PRD_MST])
# # # ファイル存在確認 製品価格マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_PHM_PRICE_MST])
# # # ファイル存在確認 卸得意先情報マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_WHS_CUSTOMER_MST])
# # # ファイル存在確認 MDBコード変換マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MDB_CONV_MST])
logger.debug('V実消化取込処理終了')
def _determine_today_is_stockslipdata_target():
"""設定ファイル「V実消化卸在庫データ連携日ファイル」の内容を取得して、処理日が該当していればTrueを返却する"""
try:
# 設定ファイル「V実消化卸在庫データ連携日ファイル」の内容を取得して、処理日が該当していればTrueを返却する
today = batch_context.syor_date
holiday_list_file_path = ConfigBucket().download_wholesaler_stock_list()
@ -128,7 +181,9 @@ def exec():
# V実消化データファイル受領チェック
logger.debug('V実消化データファイル受領チェック開始')
try:
# S3バケット上でV実消化データファイルの存在チェックをする
_check_received_files()
except BatchOperationException as e:
logger.error('受領したV実消化データファイルに欠落があります')
raise e

View File

@ -0,0 +1,60 @@
import csv
from io import TextIOWrapper
class VjskRecvFileManager:
layout_class: str
records: list[str]
def __init__(self, dat_line: list[str]) -> None:
self.layout_class = dat_line[0]
self.records = dat_line
class VjskDatFile:
"""V実消化データファイル"""
lines: list[VjskRecvFileManager]
success_count: int = 0
error_count: int = 0
total_count: int = 0
__i: int = 0
def __iter__(self):
return self
def __next__(self) -> VjskRecvFileManager:
if self.__i == len(self.lines):
raise StopIteration()
line = self.lines[self.__i]
self.__i += 1
return line
def __init__(self, file: TextIOWrapper) -> None:
reader = csv.reader(file)
csv_rows = [VjskRecvFileManager(row) for row in reader]
self.lines = csv_rows
self.total_count = len(csv_rows)
def count_up_success(self):
self.success_count += 1
def count_up_error(self):
self.error_count += 1
@classmethod
def file_open(cls, local_file_path: str):
"""V実消化データファイルを読み込み、新しいインスタンスを作成する
Args:
local_file_path (str): ローカルのファイルパス
Returns:
VjskDatFile: このクラスのインスタンス
"""
# cp932(Shift-JIS Windows拡張)でファイルを読み込む
file = open(local_file_path, encoding='cp932')
instance = cls(file)
file.close()
return instance

View File

@ -164,3 +164,12 @@ class VjskRecvFileMapper:
if condkey in self._VJSK_INTERFACE_MAPPING:
ret = self._VJSK_INTERFACE_MAPPING.get(condkey).get(self._KEY_SRC_TABLE)
return ret
def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str:
ret = None
filename = s3_file_path[s3_file_path.rfind("/") + 1:]
for element in self._VJSK_INTERFACE_MAPPING:
if filename.startswith(element.get(self._KEY_FILE_PREFIX)) and filename.endswith(element.get(self._KEY_FILE_SUFFIX)):
ret = element
break
return ret