feat: 一括登録モードを実装

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-09 18:30:43 +09:00
parent a3d72c20d3
commit e6ebb1fd32
2 changed files with 124 additions and 11 deletions

View File

@ -13,7 +13,7 @@ from error import error
DIRECTORY_WORK = '/work/'
LOCAL_DIRECTORY_TMP = '/tmp'
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat'
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
SETTINGS_ITEM = {
'dataSource': 0,
@ -217,7 +217,7 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile:
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile:
csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
for row in csv_reader:

View File

@ -4,7 +4,7 @@ from datetime import datetime
import boto3
import pymysql
from chk import LOCAL_UNCOMPRESSED_FILE_PATH
from chk import LOCAL_TEMPORARY_FILE_PATH
from common import convert_quotechar, debug_log
from error import error
from pymysql.constants import CLIENT
@ -46,6 +46,15 @@ LINE_FEED_CODE = {
'CRLF': '\r\n',
}
# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する
MYSQL_CHARSET_CODE = {
'utf-8': 'utf8mb4',
'utf8': 'utf8mb4',
'utf-8-sig': 'utf8mb4',
'shift_jis': 'cp932',
'cp932': 'cp932',
}
DIRECTORY_SETTINGS = '/settings/'
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
@ -89,7 +98,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"],
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
@ -129,11 +138,14 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]]
if not bulk_import_flag or int(bulk_import_flag) == 0:
# 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う
org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode,
settings_list, conn)
org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode,
settings_list, conn)
elif bulk_import_flag and int(bulk_import_flag) == 1:
# 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
org_import_process_result = bulk_import()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します')
org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode,
settings_list, conn)
else:
# nop
pass
@ -298,7 +310,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
error(bucket_name, target_data_source, target_file_name, log_info)
def row_per_commit_import(
def import_data_with_commit_per_row(
bucket_name: str,
target_data_source: str,
target_file_name: str,
@ -335,7 +347,7 @@ def row_per_commit_import(
elif compressed_flag and int(compressed_flag) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f:
with open(LOCAL_TEMPORARY_FILE_PATH, 'rb') as f:
import_data_bytes = f.read()
else:
# nop
@ -426,8 +438,109 @@ def row_per_commit_import(
}
def bulk_import():
pass
def import_data_with_bulk(
bucket_name: str,
target_data_source: str,
target_file_name: str,
log_info: str,
mode: str,
settings_list: list[dict],
conn: pymysql.Connection) -> dict:
"""一括登録モードでロードスキーマに登録を行います
Args:
bucket_name (str): バケット名
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name (str): 投入データのファイル名
log_info (str): ログに記載するデータソース名とファイル名
mode (str): 処理モード
settings_list (list[dict]): 設定ファイル
conn (pymysql.Connection): DB接続
Returns:
dict: 処理件数投入データ件数正常終了件数ワーニング件数とワーニング情報の辞書オブジェクト
"""
# ⑤-2 一括登録を行う
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
if not compressed_flag or int(compressed_flag) == 0:
# 圧縮フラグがOFFの場合、投入データをtmpディレクトリにダウンロードする
work_key = target_data_source + DIRECTORY_WORK + target_file_name
s3_client.download_file(
Bucket=bucket_name, Key=work_key, Filename=LOCAL_TEMPORARY_FILE_PATH)
elif compressed_flag and int(compressed_flag) == 1:
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読むため、何もしない。
pass
else:
# nop
pass
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
has_header = settings_list[SETTINGS_ITEM["headerFlag"]] or int(
settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
).split(',')
enclosed_by = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]])
sql = f"""
SET @file_row_cnt = 1;
LOAD DATA LOCAL
INFILE %s
REPLACE
INTO TABLE {load_schema_name}
CHARACTER SET {MYSQL_CHARSET_CODE[settings_list[SETTINGS_ITEM["charCode"]]]}
FIELDS TERMINATED BY '{settings_list[SETTINGS_ITEM["delimiter"]]}'
ENCLOSED BY '{enclosed_by if enclosed_by != "'" else "\\'"}'
LINES TERMINATED BY '{LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]}'
{'IGNORE 1 LINES' if has_header else ''}
({','.join([column for column in settings_db_columu_list])})
SET
-- 取込ファイル名
file_name = %s,
-- 取込ファイル行番号
file_row_cnt = (@file_row_cnt := @file_row_cnt + 1),
-- 論理削除フラグ
delete_flg = 0,
-- 登録者
ins_user = CURRENT_USER(),
-- 登録日時
ins_date = CURRENT_TIMESTAMP(),
-- 更新者
upd_user = NULL,
-- 更新日時
upd_date = NULL
;
"""
debug_log(sql, log_info, mode)
try:
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql, [LOCAL_TEMPORARY_FILE_PATH, target_file_name])
# 一括登録モードの場合、LOAD文の成功行数を取得してprocess_countにする
cur.execute("SELECT ROW_COUNT()")
process_count = cur.fetchone()[0]
conn.commit()
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} 一括登録モードのSQL実行に失敗しました。エラー内容 {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# 一括登録の場合、クエリ実行に成功したら、処理件数と成功件数は同じにする
normal_count = process_count
return {
"counts": {
"process": process_count,
"normal": normal_count,
"warning": 0 # 一括登録時はワーニングにならずエラーになるため、必ず0件
},
"warning_info": '' # 一括登録時はワーニングにならずエラーになるため、空文字を返却
}
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):