diff --git a/ecs/jskult-batch-daily/src/aws/s3.py b/ecs/jskult-batch-daily/src/aws/s3.py index 2aebff4b..79c80db9 100644 --- a/ecs/jskult-batch-daily/src/aws/s3.py +++ b/ecs/jskult-batch-daily/src/aws/s3.py @@ -115,9 +115,16 @@ class VjskBucket(S3Bucket): _bucket_name = environment.JSKULT_DATA_BUCKET _recv_folder = environment.JSKULT_DATA_FOLDER_RECV - def get_file_list(self): - return self._s3_client.list_objects(self._bucket_name, self._recv_folder) + _s3_file_list = None - # def download_data_file(self, data_filename: str): - # temporary_dir = tempfile.mkdtemp() - # temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}') + def get_s3_file_list(self): + self._s3_file_list = self._s3_client.list_objects(self._bucket_name, self._recv_folder) + return self._s3_file_list + + def download_data_file(self, data_filename: str): + temporary_dir = tempfile.mkdtemp() + temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}') + with open(temporary_file_path, mode='wb') as f: + self._s3_client.download_file(self._bucket_name, data_filename, f) + f.seek(0) + return temporary_file_path diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py new file mode 100644 index 00000000..a412c3c7 --- /dev/null +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py @@ -0,0 +1,56 @@ +from src.batch.vjsk.vjsk_recv_file_manager import (VjskDatFile, + VjskRecvFileManager) +from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper +from src.db.database import Database +from src.logging.get_logger import get_logger + +logger = get_logger('V実消化データ取込(DB登録)') +mapper = VjskRecvFileMapper() + + +class JjskDataLoadManager: + def _import_to_db(dat_file: VjskDatFile, condkey: str): + db = Database.get_instance() + table_name_org = mapper.get_org_table(condkey) + table_name_src = mapper.get_org_table(condkey) + + try: + db.connect() # TODO:接続オプション local_infile = True が必要? + db.begin() + + # orgをtruncate + f"TRUNCATE TABLE {table_name_org};" + + # orgにload ※warningは1148エラーになるらしい + sql = f"LOAD DATA LOCAL INFILE {dat_file} INTO TABLE {table_name_org} FIELDS TERMINATED BY '\t' ENCLOSED BY ""'"" IGNORE 1 LINES;" + cnt = db.execute(sql) + logger.info(f'tsvデータをorgテーブルにLOAD : 件数({cnt})') + + # org→srcにinsert select + # TODO: INTO句とSELECT句はmapperに持たせてcondkeyで引っ張ってくるようにしたい + f"INSERT INTO {table_name_src} SELECT * FROM {table_name_org};" + + db.commit() + except Exception as e: # TODO:DB例外だけキャッチしたい + db.rollback() + logger.error(e) + raise e + finally: + db.disconnect() + return + + def Load(self, target: dict): + # target : {"condkey": key, "src_file_path":local_file_path} + + # データファイルオープン + dat_file = VjskRecvFileManager.file_open(target["local_file_path"]) + + # TODO: tsvファイルをload投入用のDMLに加工(システム日時つけたり、エンコードをUTF-8に変換したり) + # TODO: ファイルオンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58 + # TODO: エンコード変換の参考 https://dev.classmethod.jp/articles/python-encoding/ + dat_file = dat_file + + # データベース登録 + self._import_to_db(dat_file, target["condkey"]) + + return diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py index 4c2cb0ee..839058ab 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py @@ -2,6 +2,7 @@ from src.aws.s3 import ConfigBucket, VjskBucket from src.batch.common.batch_context import BatchContext from src.batch.common.calendar_wholestocksaler_file import \ CalendarWholwSalerStockFile +from src.batch.vjsk.vjsk_data_load_manager import JjskDataLoadManager from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper from src.error.exceptions import BatchOperationException from src.logging.get_logger import get_logger @@ -30,7 +31,7 @@ def _check_received_files(): logger.debug('V実消化連携データ存在確認処理:開始') # 実消化&アルトマーク V実消化データ受領バケットにあるファイル一覧を取得 - received_files = vjsk_recv_bucket.get_file_list() + received_files = vjsk_recv_bucket.get_s3_file_list() logger.debug(f'ファイル一覧{received_files}') # ファイル存在確認 卸在庫データファイル(卸在庫データ処理対象日のみ実施) @@ -94,15 +95,67 @@ def _check_received_files(): def _import_file_to_db(): logger.debug('V実消化取込処理:開始') - # diff_upsertに変わるやつを呼び出す - # emp_chg_inst_lau.batch_process() みたいに + # 実消化&アルトマーク V実消化データ受領バケットにあるファイルパス一覧を取得 + received_s3_files = vjsk_recv_bucket.get_s3_file_list() + + # ファイルパス一覧にマッピング情報を参照するためのキーを持たせて辞書可する + target_dict = {} + for s3_file_path in received_s3_files: + local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path) + key = vjsk_mapper.get_condkey_by_s3_file_path(local_file_path) + if key is not None: + target_dict[key] = {"condkey": key, "src_file_path": local_file_path} + logger.debug(f'S3ファイルパス辞書{target_dict}') + + # TODO: diff_upsertに変わるやつを呼び出す + # TODO: emp_chg_inst_lau.batch_process() みたいに + + # DB登録 卸在庫データファイル(卸在庫データ処理対象日のみ実施) + if batch_context.is_import_target_vjsk_stockslipdata: + JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_STOCK_SLIP_DATA]) + + # # # ファイル存在確認 卸販売データ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_SLIP_DATA]) + + # # # ファイル存在確認 卸組織変換マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_ORG_CNV_MST]) + + # # # ファイル存在確認 施設統合マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_VOP_HCO_MERGE]) + + # # # ファイル存在確認 卸マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_WHS_MST]) + + # # # ファイル存在確認 卸ホールディングスマスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_HLD_MST]) + + # # # ファイル存在確認 施設マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_FCL_MST]) + + # # # ファイル存在確認 メーカー卸組織展開表 + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MKR_ORG_HORIZON]) + + # # # ファイル存在確認 取引区分マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_TRAN_KBN_MST]) + + # # # ファイル存在確認 製品マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_PHM_PRD_MST]) + + # # # ファイル存在確認 製品価格マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_PHM_PRICE_MST]) + + # # # ファイル存在確認 卸得意先情報マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_WHS_CUSTOMER_MST]) + + # # # ファイル存在確認 MDBコード変換マスタ + # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MDB_CONV_MST]) logger.debug('V実消化取込処理:終了') def _determine_today_is_stockslipdata_target(): + """設定ファイル「V実消化卸在庫データ連携日ファイル」の内容を取得して、処理日が該当していればTrueを返却する""" try: - # 設定ファイル「V実消化卸在庫データ連携日ファイル」の内容を取得して、処理日が該当していればTrueを返却する today = batch_context.syor_date holiday_list_file_path = ConfigBucket().download_wholesaler_stock_list() @@ -128,7 +181,9 @@ def exec(): # V実消化データファイル受領チェック logger.debug('V実消化データファイル受領チェック:開始') try: + # S3バケット上でV実消化データファイルの存在チェックをする _check_received_files() + except BatchOperationException as e: logger.error('受領したV実消化データファイルに欠落があります') raise e diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py new file mode 100644 index 00000000..5a0dd54f --- /dev/null +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py @@ -0,0 +1,60 @@ +import csv +from io import TextIOWrapper + + +class VjskRecvFileManager: + layout_class: str + records: list[str] + + def __init__(self, dat_line: list[str]) -> None: + self.layout_class = dat_line[0] + self.records = dat_line + + +class VjskDatFile: + """V実消化データファイル""" + + lines: list[VjskRecvFileManager] + success_count: int = 0 + error_count: int = 0 + total_count: int = 0 + __i: int = 0 + + def __iter__(self): + return self + + def __next__(self) -> VjskRecvFileManager: + if self.__i == len(self.lines): + raise StopIteration() + line = self.lines[self.__i] + self.__i += 1 + return line + + def __init__(self, file: TextIOWrapper) -> None: + reader = csv.reader(file) + csv_rows = [VjskRecvFileManager(row) for row in reader] + + self.lines = csv_rows + self.total_count = len(csv_rows) + + def count_up_success(self): + self.success_count += 1 + + def count_up_error(self): + self.error_count += 1 + + @classmethod + def file_open(cls, local_file_path: str): + """V実消化データファイルを読み込み、新しいインスタンスを作成する + + Args: + local_file_path (str): ローカルのファイルパス + + Returns: + VjskDatFile: このクラスのインスタンス + """ + # cp932(Shift-JIS Windows拡張)でファイルを読み込む + file = open(local_file_path, encoding='cp932') + instance = cls(file) + file.close() + return instance diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py index 6a997c9f..d89f5063 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py @@ -164,3 +164,12 @@ class VjskRecvFileMapper: if condkey in self._VJSK_INTERFACE_MAPPING: ret = self._VJSK_INTERFACE_MAPPING.get(condkey).get(self._KEY_SRC_TABLE) return ret + + def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str: + ret = None + filename = s3_file_path[s3_file_path.rfind("/") + 1:] + for element in self._VJSK_INTERFACE_MAPPING: + if filename.startswith(element.get(self._KEY_FILE_PREFIX)) and filename.endswith(element.get(self._KEY_FILE_SUFFIX)): + ret = element + break + return ret