LOAD DATA LOCAL INFILEで3948エラー

This commit is contained in:
x.azuma.m@nds-tyo.co.jp 2023-05-08 16:25:11 +09:00
parent 1fd6633bc8
commit aca85704da
5 changed files with 89 additions and 78 deletions

View File

@ -123,7 +123,7 @@ class VjskBucket(S3Bucket):
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._folder}/", "")}')
temporary_file_path = path.join(temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f)
f.seek(0)

View File

@ -1,5 +1,4 @@
from src.batch.vjsk.vjsk_recv_file_manager import (VjskDatFile,
VjskRecvFileManager)
# from src.batch.vjsk.vjsk_recv_file_manager import VjskDatFile
from src.batch.vjsk.vjsk_recv_file_mapper import VjskRecvFileMapper
from src.db.database import Database
from src.logging.get_logger import get_logger
@ -9,20 +8,26 @@ mapper = VjskRecvFileMapper()
class JjskDataLoadManager:
def _import_to_db(dat_file: VjskDatFile, condkey: str):
def __init__(self):
pass
def _import_to_db(src_file_name: str, condkey: str):
db = Database.get_instance()
table_name_org = mapper.get_org_table(condkey)
table_name_src = mapper.get_org_table(condkey)
table_name_src = mapper.get_src_table(condkey)
try:
db.connect() # TODO:接続オプション local_infile = True が必要?
db.begin()
# orgをtruncate
f"TRUNCATE TABLE {table_name_org};"
db.execute(f"TRUNCATE TABLE {table_name_org};")
# load DATA local infileステートメント実行許可設定
db.execute("SET GLOBAL local_infile=on;")
# orgにload ※warningは1148エラーになるらしい
sql = f"LOAD DATA LOCAL INFILE {dat_file} INTO TABLE {table_name_org} FIELDS TERMINATED BY '\t' ENCLOSED BY ""'"" IGNORE 1 LINES;"
sql = f"LOAD DATA LOCAL INFILE '{src_file_name}' INTO TABLE {table_name_org} FIELDS TERMINATED BY '\\t' ENCLOSED BY \"'\" IGNORE 1 LINES;"
cnt = db.execute(sql)
logger.info(f'tsvデータをorgテーブルにLOAD : 件数({cnt})')
@ -39,18 +44,21 @@ class JjskDataLoadManager:
db.disconnect()
return
@classmethod
def Load(self, target: dict):
logger.debug(f'JjskDataLoadManager#load start target:{target}')
# target : {"condkey": key, "src_file_path":local_file_path}
# データファイルオープン
dat_file = VjskRecvFileManager.file_open(target["local_file_path"])
local_file_name = target["src_file_path"]
# dat_file = VjskDatFile.retrieve_from_file(local_file_name)
# TODO: tsvファイルをload投入用のDMLに加工(システム日時つけたり、エンコードをUTF-8に変換したり)
# TODO: ファイルンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58
# TODO: ファイルンコード判定の参考 https://zenn.dev/takedato/articles/c3a491546f8c58
# TODO: エンコード変換の参考 https://dev.classmethod.jp/articles/python-encoding/
dat_file = dat_file
# データベース登録
self._import_to_db(dat_file, target["condkey"])
self._import_to_db(local_file_name, target["condkey"])
logger.debug('JjskDataLoadManager#load end')
return

View File

@ -9,7 +9,7 @@ from src.logging.get_logger import get_logger
# from src.batch.datachange import emp_chg_inst_lau
logger = get_logger('V実消化データ取込')
_logger = get_logger('V実消化データ取込')
batch_context = BatchContext.get_instance()
vjsk_recv_bucket = VjskBucket()
vjsk_mapper = VjskRecvFileMapper()
@ -28,11 +28,11 @@ def _check_if_file_exists(src_list: list, key: str) -> bool:
def _check_received_files():
"""V実消化連携データ存在確認処理"""
logger.debug('V実消化連携データ存在確認処理開始')
_logger.debug('V実消化連携データ存在確認処理開始')
# 実消化&アルトマーク V実消化データ受領バケットにあるファイル一覧を取得
received_files = vjsk_recv_bucket.get_s3_file_list()
logger.debug(f'ファイル一覧{received_files}')
_logger.debug(f'ファイル一覧{received_files}')
# ファイル存在確認 卸在庫データファイル(卸在庫データ処理対象日のみ実施)
if batch_context.is_import_target_vjsk_stockslipdata:
@ -87,13 +87,13 @@ def _check_received_files():
if not _check_if_file_exists(received_files, vjsk_mapper.CONDKEY_MDB_CONV_MST):
raise BatchOperationException(f'MDBコード変換マスタファイルがありません ファイル一覧:{received_files}')
logger.debug('V実消化連携データ存在確認処理終了')
_logger.debug('V実消化連携データ存在確認処理終了')
return True
def _import_file_to_db():
logger.debug('V実消化取込処理開始')
_logger.debug('V実消化取込処理開始')
# 実消化&アルトマーク V実消化データ受領バケットにあるファイルパス一覧を取得
received_s3_files = vjsk_recv_bucket.get_s3_file_list()
@ -101,11 +101,11 @@ def _import_file_to_db():
# ファイルパス一覧にマッピング情報を参照するためのキーを持たせて辞書可する
target_dict = {}
for s3_file_path in received_s3_files:
local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path)
key = vjsk_mapper.get_condkey_by_s3_file_path(local_file_path)
local_file_path = vjsk_recv_bucket.download_data_file(s3_file_path.get('filename'))
key = vjsk_mapper.get_condkey_by_s3_file_path(s3_file_path.get('filename'))
if key is not None:
target_dict[key] = {"condkey": key, "src_file_path": local_file_path}
logger.debug(f'S3ファイルパス辞書{target_dict}')
_logger.debug(f'S3ファイルパス辞書{target_dict}')
# TODO: diff_upsertに変わるやつを呼び出す
# TODO: emp_chg_inst_lau.batch_process() みたいに
@ -150,7 +150,7 @@ def _import_file_to_db():
# # # ファイル存在確認 MDBコード変換マスタ
# JjskDataLoadManager.Load(target_dict[vjsk_mapper.CONDKEY_MDB_CONV_MST])
logger.debug('V実消化取込処理終了')
_logger.debug('V実消化取込処理終了')
def _determine_today_is_stockslipdata_target():
@ -162,37 +162,39 @@ def _determine_today_is_stockslipdata_target():
targetdays = CalendarWholwSalerStockFile(holiday_list_file_path)
ret = targetdays.compare_date(today)
except Exception as e:
logger.error(f'{e}')
_logger.error(f'{e}')
raise e
return ret
def exec():
"""V実消化データ取込"""
logger.info('Start Jitsusyouka Torikomi PGM.')
_logger.info('Start Jitsusyouka Torikomi PGM.')
# 卸在庫データ取込対象日であれば、卸在庫データ処理対象フラグを立てる
logger.debug('卸在庫データ取込対象日であるかを判定')
_logger.debug('卸在庫データ取込対象日であるかを判定')
batch_context.is_import_target_vjsk_stockslipdata = _determine_today_is_stockslipdata_target()
logger.debug(f'判定結果 : {batch_context.is_import_target_vjsk_stockslipdata}')
_logger.debug(f'判定結果 : {batch_context.is_import_target_vjsk_stockslipdata}')
if batch_context.is_import_target_vjsk_stockslipdata:
logger.info('卸在庫データ取込対象日です')
_logger.info('卸在庫データ取込対象日です')
# V実消化データファイル受領チェック
logger.debug('V実消化データファイル受領チェック開始')
_logger.debug('V実消化データファイル受領チェック開始')
try:
# S3バケット上でV実消化データファイルの存在チェックをする
_check_received_files()
except BatchOperationException as e:
logger.error('受領したV実消化データファイルに欠落があります')
_logger.error('受領したV実消化データファイルに欠落があります')
raise e
logger.debug('V実消化データファイル受領チェック終了')
_logger.debug('V実消化データファイル受領チェック終了')
# データベース取込
logger.debug('V実消化データ取込開始')
_logger.debug('V実消化データ取込開始')
try:
_import_file_to_db()
except Exception as e:
logger.error(f'データベース登録失敗 {e}')
logger.debug('V実消化データ取込終了')
_logger.error(f'データベース登録失敗 {e}')
raise e
_logger.debug('V実消化データ取込終了')

View File

@ -1,60 +1,60 @@
import csv
from io import TextIOWrapper
# import csv
# from io import TextIOWrapper
class VjskRecvFileManager:
layout_class: str
records: list[str]
# class VjskRecvFileManager:
# layout_class: str
# records: list[str]
def __init__(self, dat_line: list[str]) -> None:
self.layout_class = dat_line[0]
self.records = dat_line
# def __init__(self, dat_line: list[str]) -> None:
# self.layout_class = dat_line[0]
# self.records = dat_line
class VjskDatFile:
"""V実消化データファイル"""
# class VjskDatFile:
# """V実消化データファイル"""
lines: list[VjskRecvFileManager]
success_count: int = 0
error_count: int = 0
total_count: int = 0
__i: int = 0
# lines: list[VjskRecvFileManager]
# success_count: int = 0
# error_count: int = 0
# total_count: int = 0
# __i: int = 0
def __iter__(self):
return self
# def __iter__(self):
# return self
def __next__(self) -> VjskRecvFileManager:
if self.__i == len(self.lines):
raise StopIteration()
line = self.lines[self.__i]
self.__i += 1
return line
# def __next__(self) -> VjskRecvFileManager:
# if self.__i == len(self.lines):
# raise StopIteration()
# line = self.lines[self.__i]
# self.__i += 1
# return line
def __init__(self, file: TextIOWrapper) -> None:
reader = csv.reader(file)
csv_rows = [VjskRecvFileManager(row) for row in reader]
# def __init__(self, file: TextIOWrapper) -> None:
# reader = csv.reader(file)
# csv_rows = [VjskRecvFileManager(row) for row in reader]
self.lines = csv_rows
self.total_count = len(csv_rows)
# self.lines = csv_rows
# self.total_count = len(csv_rows)
def count_up_success(self):
self.success_count += 1
# def count_up_success(self):
# self.success_count += 1
def count_up_error(self):
self.error_count += 1
# def count_up_error(self):
# self.error_count += 1
@classmethod
def file_open(cls, local_file_path: str):
"""V実消化データファイルを読み込み、新しいインスタンスを作成する
# @classmethod
# def retrieve_from_file(cls, local_file_path: str):
# """V実消化データファイルを読み込み、新しいインスタンスを作成する
Args:
local_file_path (str): ローカルのファイルパス
# Args:
# local_file_path (str): ローカルのファイルパス
Returns:
VjskDatFile: このクラスのインスタンス
"""
# cp932(Shift-JIS Windows拡張)でファイルを読み込む
file = open(local_file_path, encoding='cp932')
instance = cls(file)
file.close()
return instance
# Returns:
# VjskDatFile: このクラスのインスタンス
# """
# # cp932(Shift-JIS Windows拡張)でファイルを読み込む
# file = open(local_file_path, encoding='cp932')
# instance = cls(file)
# file.close()
# return instance

View File

@ -168,8 +168,9 @@ class VjskRecvFileMapper:
def get_condkey_by_s3_file_path(self, s3_file_path: str) -> str:
ret = None
filename = s3_file_path[s3_file_path.rfind("/") + 1:]
for element in self._VJSK_INTERFACE_MAPPING:
for condkey in self._VJSK_INTERFACE_MAPPING:
element = self._VJSK_INTERFACE_MAPPING.get(condkey)
if filename.startswith(element.get(self._KEY_FILE_PREFIX)) and filename.endswith(element.get(self._KEY_FILE_SUFFIX)):
ret = element
ret = condkey
break
return ret