From aca85704dafb90c5bbb68e6ed3d11b45222fe433 Mon Sep 17 00:00:00 2001 From: "x.azuma.m@nds-tyo.co.jp" Date: Mon, 8 May 2023 16:25:11 +0900 Subject: [PATCH] =?UTF-8?q?LOAD=20DATA=20=20LOCAL=20INFILE=E3=81=A73948?= =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/jskult-batch-daily/src/aws/s3.py | 2 +- .../src/batch/vjsk/vjsk_data_load_manager.py | 28 +++--- .../src/batch/vjsk/vjsk_importer.py | 42 ++++----- .../src/batch/vjsk/vjsk_recv_file_manager.py | 90 +++++++++---------- .../src/batch/vjsk/vjsk_recv_file_mapper.py | 5 +- 5 files changed, 89 insertions(+), 78 deletions(-) diff --git a/ecs/jskult-batch-daily/src/aws/s3.py b/ecs/jskult-batch-daily/src/aws/s3.py index 79c80db9..2ee93eb8 100644 --- a/ecs/jskult-batch-daily/src/aws/s3.py +++ b/ecs/jskult-batch-daily/src/aws/s3.py @@ -123,7 +123,7 @@ class VjskBucket(S3Bucket): def download_data_file(self, data_filename: str): temporary_dir = tempfile.mkdtemp() - temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}') + temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}') with open(temporary_file_path, mode='wb') as f: self._s3_client.download_file(self._bucket_name, data_filename, f) f.seek(0) diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py index a412c3c7..9aa80070 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py @@ -1,5 +1,4 @@ -from src.batch.vjsk.vjsk_recv_file_manager import (VjskDatFile, - VjskRecvFileManager) +# from src.batch.vjsk.vjsk_recv_file_manager import VjskDatFile from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper from src.db.database import Database from src.logging.get_logger import get_logger @@ -9,20 +8,26 @@ mapper = VjskRecvFileMapper() class JjskDataLoadManager: - def _import_to_db(dat_file: VjskDatFile, condkey: str): + def __init__(self): + pass + + def _import_to_db(src_file_name: str, condkey: str): db = Database.get_instance() table_name_org = mapper.get_org_table(condkey) - table_name_src = mapper.get_org_table(condkey) + table_name_src = mapper.get_src_table(condkey) try: db.connect() # TODO:接続オプション local_infile = True が必要? db.begin() # orgをtruncate - f"TRUNCATE TABLE {table_name_org};" + db.execute(f"TRUNCATE TABLE {table_name_org};") + + # load DATA local infileステートメント実行許可設定 + db.execute("SET GLOBAL local_infile=on;") # orgにload ※warningは1148エラーになるらしい - sql = f"LOAD DATA LOCAL INFILE {dat_file} INTO TABLE {table_name_org} FIELDS TERMINATED BY '\t' ENCLOSED BY ""'"" IGNORE 1 LINES;" + sql = f"LOAD DATA LOCAL INFILE '{src_file_name}' INTO TABLE {table_name_org} FIELDS TERMINATED BY '\\t' ENCLOSED BY \"'\" IGNORE 1 LINES;" cnt = db.execute(sql) logger.info(f'tsvデータをorgテーブルにLOAD : 件数({cnt})') @@ -39,18 +44,21 @@ class JjskDataLoadManager: db.disconnect() return + @classmethod def Load(self, target: dict): + logger.debug(f'JjskDataLoadManager#load start target:{target}') # target : {"condkey": key, "src_file_path":local_file_path} # データファイルオープン - dat_file = VjskRecvFileManager.file_open(target["local_file_path"]) + local_file_name = target["src_file_path"] + # dat_file = VjskDatFile.retrieve_from_file(local_file_name) # TODO: tsvファイルをload投入用のDMLに加工(システム日時つけたり、エンコードをUTF-8に変換したり) - # TODO: ファイルオンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58 + # TODO: ファイルエンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58 # TODO: エンコード変換の参考 https://dev.classmethod.jp/articles/python-encoding/ - dat_file = dat_file # データベース登録 - self._import_to_db(dat_file, target["condkey"]) + self._import_to_db(local_file_name, target["condkey"]) + logger.debug('JjskDataLoadManager#load end') return diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py index 839058ab..7db2983e 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_importer.py @@ -9,7 +9,7 @@ from src.logging.get_logger import get_logger # from src.batch.datachange import emp_chg_inst_lau -logger = get_logger('V実消化データ取込') +_logger = get_logger('V実消化データ取込') batch_context = BatchContext.get_instance() vjsk_recv_bucket = VjskBucket() vjsk_mapper = VjskRecvFileMapper() @@ -28,11 +28,11 @@ def _check_if_file_exists(src_list: list, key: str) -> bool: def _check_received_files(): """V実消化連携データ存在確認処理""" - logger.debug('V実消化連携データ存在確認処理:開始') + _logger.debug('V実消化連携データ存在確認処理:開始') # 実消化&アルトマーク V実消化データ受領バケットにあるファイル一覧を取得 received_files = vjsk_recv_bucket.get_s3_file_list() - logger.debug(f'ファイル一覧{received_files}') + _logger.debug(f'ファイル一覧{received_files}') # ファイル存在確認 卸在庫データファイル(卸在庫データ処理対象日のみ実施) if batch_context.is_import_target_vjsk_stockslipdata: @@ -87,13 +87,13 @@ def _check_received_files(): if not _check_if_file_exists(received_files, vjsk_mapper.CONDKEY_MDB_CONV_MST): raise BatchOperationException(f'MDBコード変換マスタファイルがありません ファイル一覧:{received_files}') - logger.debug('V実消化連携データ存在確認処理:終了') + _logger.debug('V実消化連携データ存在確認処理:終了') return True def _import_file_to_db(): - logger.debug('V実消化取込処理:開始') + _logger.debug('V実消化取込処理:開始') # 実消化&アルトマーク V実消化データ受領バケットにあるファイルパス一覧を取得 received_s3_files = vjsk_recv_bucket.get_s3_file_list() @@ -101,11 +101,11 @@ def _import_file_to_db(): # ファイルパス一覧にマッピング情報を参照するためのキーを持たせて辞書可する target_dict = {} for s3_file_path in received_s3_files: - local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path) - key = vjsk_mapper.get_condkey_by_s3_file_path(local_file_path) + local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path.get('filename')) + key = vjsk_mapper.get_condkey_by_s3_file_path(s3_file_path.get('filename')) if key is not None: target_dict[key] = {"condkey": key, "src_file_path": local_file_path} - logger.debug(f'S3ファイルパス辞書{target_dict}') + _logger.debug(f'S3ファイルパス辞書{target_dict}') # TODO: diff_upsertに変わるやつを呼び出す # TODO: emp_chg_inst_lau.batch_process() みたいに @@ -150,7 +150,7 @@ def _import_file_to_db(): # # # ファイル存在確認 MDBコード変換マスタ # JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MDB_CONV_MST]) - logger.debug('V実消化取込処理:終了') + _logger.debug('V実消化取込処理:終了') def _determine_today_is_stockslipdata_target(): @@ -162,37 +162,39 @@ def _determine_today_is_stockslipdata_target(): targetdays = CalendarWholwSalerStockFile(holiday_list_file_path) ret = targetdays.compare_date(today) except Exception as e: - logger.error(f'{e}') + _logger.error(f'{e}') raise e return ret def exec(): """V実消化データ取込""" - logger.info('Start Jitsusyouka Torikomi PGM.') + _logger.info('Start Jitsusyouka Torikomi PGM.') # 卸在庫データ取込対象日であれば、卸在庫データ処理対象フラグを立てる - logger.debug('卸在庫データ取込対象日であるかを判定') + _logger.debug('卸在庫データ取込対象日であるかを判定') batch_context.is_import_target_vjsk_stockslipdata = _determine_today_is_stockslipdata_target() - logger.debug(f'判定結果 : {batch_context.is_import_target_vjsk_stockslipdata}') + _logger.debug(f'判定結果 : {batch_context.is_import_target_vjsk_stockslipdata}') if batch_context.is_import_target_vjsk_stockslipdata: - logger.info('卸在庫データ取込対象日です') + _logger.info('卸在庫データ取込対象日です') # V実消化データファイル受領チェック - logger.debug('V実消化データファイル受領チェック:開始') + _logger.debug('V実消化データファイル受領チェック:開始') try: # S3バケット上でV実消化データファイルの存在チェックをする _check_received_files() except BatchOperationException as e: - logger.error('受領したV実消化データファイルに欠落があります') + _logger.error('受領したV実消化データファイルに欠落があります') raise e - logger.debug('V実消化データファイル受領チェック:終了') + _logger.debug('V実消化データファイル受領チェック:終了') # データベース取込 - logger.debug('V実消化データ取込:開始') + _logger.debug('V実消化データ取込:開始') try: _import_file_to_db() except Exception as e: - logger.error(f'データベース登録失敗 {e}') - logger.debug('V実消化データ取込:終了') + _logger.error(f'データベース登録失敗 {e}') + raise e + + _logger.debug('V実消化データ取込:終了') diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py index 5a0dd54f..7ddbb766 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_manager.py @@ -1,60 +1,60 @@ -import csv -from io import TextIOWrapper +# import csv +# from io import TextIOWrapper -class VjskRecvFileManager: - layout_class: str - records: list[str] +# class VjskRecvFileManager: +# layout_class: str +# records: list[str] - def __init__(self, dat_line: list[str]) -> None: - self.layout_class = dat_line[0] - self.records = dat_line +# def __init__(self, dat_line: list[str]) -> None: +# self.layout_class = dat_line[0] +# self.records = dat_line -class VjskDatFile: - """V実消化データファイル""" +# class VjskDatFile: +# """V実消化データファイル""" - lines: list[VjskRecvFileManager] - success_count: int = 0 - error_count: int = 0 - total_count: int = 0 - __i: int = 0 +# lines: list[VjskRecvFileManager] +# success_count: int = 0 +# error_count: int = 0 +# total_count: int = 0 +# __i: int = 0 - def __iter__(self): - return self +# def __iter__(self): +# return self - def __next__(self) -> VjskRecvFileManager: - if self.__i == len(self.lines): - raise StopIteration() - line = self.lines[self.__i] - self.__i += 1 - return line +# def __next__(self) -> VjskRecvFileManager: +# if self.__i == len(self.lines): +# raise StopIteration() +# line = self.lines[self.__i] +# self.__i += 1 +# return line - def __init__(self, file: TextIOWrapper) -> None: - reader = csv.reader(file) - csv_rows = [VjskRecvFileManager(row) for row in reader] +# def __init__(self, file: TextIOWrapper) -> None: +# reader = csv.reader(file) +# csv_rows = [VjskRecvFileManager(row) for row in reader] - self.lines = csv_rows - self.total_count = len(csv_rows) +# self.lines = csv_rows +# self.total_count = len(csv_rows) - def count_up_success(self): - self.success_count += 1 +# def count_up_success(self): +# self.success_count += 1 - def count_up_error(self): - self.error_count += 1 +# def count_up_error(self): +# self.error_count += 1 - @classmethod - def file_open(cls, local_file_path: str): - """V実消化データファイルを読み込み、新しいインスタンスを作成する +# @classmethod +# def retrieve_from_file(cls, local_file_path: str): +# """V実消化データファイルを読み込み、新しいインスタンスを作成する - Args: - local_file_path (str): ローカルのファイルパス +# Args: +# local_file_path (str): ローカルのファイルパス - Returns: - VjskDatFile: このクラスのインスタンス - """ - # cp932(Shift-JIS Windows拡張)でファイルを読み込む - file = open(local_file_path, encoding='cp932') - instance = cls(file) - file.close() - return instance +# Returns: +# VjskDatFile: このクラスのインスタンス +# """ +# # cp932(Shift-JIS Windows拡張)でファイルを読み込む +# file = open(local_file_path, encoding='cp932') +# instance = cls(file) +# file.close() +# return instance diff --git a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py index d89f5063..4aeea24f 100644 --- a/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py +++ b/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_recv_file_mapper.py @@ -168,8 +168,9 @@ class VjskRecvFileMapper: def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str: ret = None filename = s3_file_path[s3_file_path.rfind("/") + 1:] - for element in self._VJSK_INTERFACE_MAPPING: + for condkey in self._VJSK_INTERFACE_MAPPING: + element = self._VJSK_INTERFACE_MAPPING.get(condkey) if filename.startswith(element.get(self._KEY_FILE_PREFIX)) and filename.endswith(element.get(self._KEY_FILE_SUFFIX)): - ret = element + ret = condkey break return ret