newdwh2021/ecs/jskult-batch-daily/src/batch/vjsk/vjsk_data_load_manager.py
2023-07-18 13:52:58 +09:00

163 lines
7.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# from src.batch.vjsk.vjsk_recv_file_manager import VjskDatFile
import os
from src.batch.vjsk.vjsk_recv_file_mapper import VjskReceiveFileMapper
from src.db.database import Database
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
logger = get_logger('V実消化データ取込(DB登録)')
mapper = VjskReceiveFileMapper()
class VjskDataLoadManager:
"""
V実消化データ取込機能クラス
"""
def __init__(self):
pass
def _import_to_db(src_file_name: str, condkey: str):
"""
概要
指定されたtsvファイル src_file_name を策席スキーマに登録する
引数
src_file_name: ローカルストレージにある取込対象tsvファイルパス
condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値)
"""
logger.debug(f"_import_to_db start (src_file_name : {src_file_name}, condkey : {condkey})")
db = Database.get_instance()
data_name = mapper.get_data_name(condkey)
table_name_org = mapper.get_org_table(condkey)
table_name_src = mapper.get_src_table(condkey)
upsert_sql = mapper.get_upsert_sql(condkey)
try:
# データベース接続
db.connect()
db.execute("SET SESSION sql_mode = 'TRADITIONAL';")
# orgをtruncate
db.execute(f"TRUNCATE TABLE {table_name_org};")
# orgにload ※warningが発生すれば異常終了させる
sql = f"""\
LOAD DATA LOCAL INFILE :src_file_name
INTO TABLE {table_name_org}
FIELDS TERMINATED BY '\\t'
ENCLOSED BY '\"'
IGNORE 1 LINES;
"""
db.begin()
result = db.execute(sql, {"src_file_name": src_file_name})
logger.info(f'{data_name}tsvファイルを{table_name_org}にLOAD : 件数({result.rowcount})')
db.commit()
# org→srcにinsert select
db.begin()
logger.debug(upsert_sql)
db.execute(upsert_sql)
# MEMO: insert+selectの結果件数は、LOAD結果と必ず等しいので、executeの結果件数はログ出力しない
# MEMO: insert+select 実質10件なのに、result.rowcountは20件になってしまう ※sqlalchemyの仕様
# MEMO: https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount
logger.info(f'{table_name_org}{table_name_src}にUPSERT')
# データベースコミット
db.commit()
except Exception as e:
db.rollback()
raise BatchOperationException(e)
finally:
# データベース切断
db.disconnect()
logger.debug("_import_to_db done")
return
def _get_tsv_last_row_tab_count(src_file_name: str) -> int:
"""
概要
指定されたtsvファイル src_file_name の末尾行に含まれるタブ文字数を取得する
引数
src_file_name: ローカルストレージにある取込対象tsvファイルパス
"""
# memo: tsvファイルが数百MBに及ぶことを想定して、末尾から1行分を参照する
# memo: 前提1 行区切りは LF('\n')
# memo: 前提2 正常時のファイル終端にある文字は、末尾行の LF('\n')
# memo: 前提3 ファイルエンコードはBOM付UTF-8(先頭3byteが b'\xEF' + b'\xBB' + b'\xBF' )
buf_count = 0
# ファイルサイズ取得
file_size = os.path.getsize(src_file_name)
# ファイルサイズが0byteなら処理終了
if file_size == 0:
return buf_count
# バイナリモードでファイルオープン
with open(src_file_name, 'rb') as file:
# ファイルポインタを末尾に移動
file.seek(0, os.SEEK_END)
# ファイルポインタが先頭+1になるまで逆方向にシークする
while file.tell() > 1:
# 2byte戻って
file.seek(-2, os.SEEK_CUR)
# 1byte読む(同時に+1シークする)
char = file.read(1)
# 行区切りを検出したらループ終了
# memo: UTF-8 バイトシーケンスとして、b'\n' が全角文字の一部にはならない
if char == b'\n':
break
# ファイル先頭のBOM3byte目の BF を検出したらループ終了
# memo: UTF-8 バイトシーケンスとして、b'\xbf' が全角文字の一部の可能性がある(例:全角片仮名の「タ」)
# memo: charに代入したときのfile.read(1)によって、ファイルポインタは2→3になっている前提のロジック
if char == b'\xbf' and file.tell() == 3:
break
last_line = file.readline().decode('utf-8-sig').rstrip('\n')
buf_count = last_line.count('\t')
return buf_count
@classmethod
def load(self, target: dict):
"""
概要
取込対象受領ファイル target をデータベースに登録する
引数
target: {
condkey: 受領データの種類を一意に示す値(VjskReceiveFileMapperクラスで管理されているCONDKEY値)
src_file_path: ローカルストレージにある取込対象tsvファイルパス
}
"""
logger.debug(f'load start target:{target}')
# S3からローカルストレージにdownloadした登録対象のtsvファイルパスを取得
local_file_name = target["src_file_path"]
# tsvファイル末尾行のTABの数が総定数と一致しない場合は例外をスロー
# memo:
# 対向元システムで生成されるファイルは稀に途中欠落が発生することがある。
# これを、ファイルMySQL8.0のLOADステートメントで発生するWARNING/ERRORでは検知不可能なので、
# LOADステートメント実行前に、物理的に途中欠落があるかを検知してエラーとすることが目的。
tsv_tabs = self._get_tsv_last_row_tab_count(local_file_name)
expect_tabs = mapper.get_file_column_separators(target["condkey"])
if tsv_tabs != expect_tabs:
msg = [
"受領tsvファイルの末尾行のTABの数が総定数と一致しませんでした",
f"local_file_name: {local_file_name}",
f"末尾行のtab数: {tsv_tabs}",
f"tab想定数: {expect_tabs}"
]
raise BatchOperationException(' '.join(msg))
# データベース登録
self._import_to_db(local_file_name, target["condkey"])
logger.debug('load done')
return