feat: 一括登録フラグにより一行コミットか一括登録かを分岐するようにした。まずは一行コミットモードを関数に移植。
This commit is contained in:
parent
06d741e994
commit
a3d72c20d3
@ -125,102 +125,27 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
|
||||
f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました')
|
||||
|
||||
# ⑤-1 投入データファイルを1行ごとにループする
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
||||
import_data_bytes = None
|
||||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||||
if not compressed_flag or int(compressed_flag) == 0:
|
||||
# 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む
|
||||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||||
work_response = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=work_key)
|
||||
import_data_bytes = work_response["Body"].read()
|
||||
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む
|
||||
elif compressed_flag and int(compressed_flag) == 1:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
|
||||
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f:
|
||||
import_data_bytes = f.read()
|
||||
org_import_process_result = None
|
||||
bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]]
|
||||
if not bulk_import_flag or int(bulk_import_flag) == 0:
|
||||
# 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う
|
||||
org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode,
|
||||
settings_list, conn)
|
||||
elif bulk_import_flag and int(bulk_import_flag) == 1:
|
||||
# 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
|
||||
org_import_process_result = bulk_import()
|
||||
else:
|
||||
# nop
|
||||
pass
|
||||
|
||||
work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
process_count = 0 # 処理件数カウンタ
|
||||
normal_count = 0 # 正常終了件数カウンタ
|
||||
warning_count = 0 # ワーニング終了件数カウンター
|
||||
warning_info = '' # ワーニング情報
|
||||
index = 0 # ループインデックス
|
||||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||||
).split(',')
|
||||
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip(
|
||||
).split(',')
|
||||
|
||||
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||
try:
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
|
||||
index += 1
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします')
|
||||
continue
|
||||
|
||||
# 処理件数カウント
|
||||
process_count += 1
|
||||
|
||||
# SQL文生成
|
||||
query_parameter_list = []
|
||||
sql = f'INSERT INTO {load_schema_name} ('
|
||||
for db_column in settings_db_columu_list:
|
||||
sql = f'{sql} {db_column},'
|
||||
sql = f'{sql} file_name,' # システム項目:取込ファイル名
|
||||
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
|
||||
sql = f'{sql} ins_user,' # システム項目:登録者
|
||||
sql = f'{sql} ins_date,' # システム項目:登録日時
|
||||
sql = f'{sql} upd_user,' # システム項目:更新者
|
||||
sql = f'{sql} upd_date)' # システム項目:更新日時
|
||||
sql = f'{sql} VALUES ('
|
||||
for i in range(len(line)):
|
||||
# データ項目値が0桁より大きいかチェックする
|
||||
if len(line[i]) == 0:
|
||||
# 0桁の場合
|
||||
sql = f'{sql} NULL,'
|
||||
continue
|
||||
|
||||
# データ項目値の変換処理(カンマ除去)
|
||||
org_column_value = line[i]
|
||||
current_settings_db_column_name = settings_db_columu_list[i]
|
||||
column_value = convert_column_value(
|
||||
org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
||||
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
||||
query_parameter_list.append(column_value)
|
||||
sql = f'{sql} %s,'
|
||||
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
|
||||
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} "0",' # システム項目:論理削除フラグ
|
||||
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
|
||||
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
|
||||
sql = f'{sql} NULL,' # システム項目:更新者
|
||||
sql = f'{sql} NULL)' # システム項目:更新日時
|
||||
|
||||
index += 1
|
||||
|
||||
debug_log(sql, log_info, mode)
|
||||
|
||||
# ロードスキーマのトランザクション開始
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, query_parameter_list)
|
||||
conn.commit()
|
||||
normal_count += 1
|
||||
except Exception as e:
|
||||
warning_count += 1
|
||||
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
||||
|
||||
# 処理件数カウンタ
|
||||
process_count = org_import_process_result['counts']['process']
|
||||
# 正常終了件数カウンタ
|
||||
normal_count = org_import_process_result['counts']['normal']
|
||||
# ワーニング終了件数カウンター
|
||||
warning_count = org_import_process_result['counts']['warning']
|
||||
# ワーニング情報
|
||||
warning_info = org_import_process_result['warning_info']
|
||||
# ⑥ ⑤の処理結果件数をログ出力する
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
|
||||
@ -250,6 +175,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
# SQL文生成
|
||||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||||
).split(',')
|
||||
sql = f'INSERT INTO {storage_schema_name} ('
|
||||
for i in range(len(settings_db_columu_list)):
|
||||
sql = f'{sql} {settings_db_columu_list[i]},'
|
||||
@ -371,6 +298,138 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
|
||||
def row_per_commit_import(
|
||||
bucket_name: str,
|
||||
target_data_source: str,
|
||||
target_file_name: str,
|
||||
log_info: str,
|
||||
mode: str,
|
||||
settings_list: list[dict],
|
||||
conn: pymysql.Connection) -> dict:
|
||||
"""一行コミットモードでロードスキーマに登録を行います
|
||||
|
||||
Args:
|
||||
bucket_name (str): バケット名
|
||||
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
|
||||
target_file_name (str): 投入データのファイル名
|
||||
log_info (str): ログに記載するデータソース名とファイル名
|
||||
mode (str): 処理モード
|
||||
settings_list (list[dict]): 設定ファイル
|
||||
conn (pymysql.Connection): DB接続
|
||||
|
||||
Returns:
|
||||
dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト
|
||||
"""
|
||||
# ⑤-1 投入データファイルを1行ごとにループする
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
||||
import_data_bytes = None
|
||||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||||
if not compressed_flag or int(compressed_flag) == 0:
|
||||
# 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む
|
||||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||||
work_response = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=work_key)
|
||||
import_data_bytes = work_response["Body"].read()
|
||||
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む
|
||||
elif compressed_flag and int(compressed_flag) == 1:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
|
||||
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f:
|
||||
import_data_bytes = f.read()
|
||||
else:
|
||||
# nop
|
||||
pass
|
||||
|
||||
work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
process_count = 0 # 処理件数カウンタ
|
||||
normal_count = 0 # 正常終了件数カウンタ
|
||||
warning_count = 0 # ワーニング終了件数カウンター
|
||||
warning_info = '' # ワーニング情報
|
||||
index = 0 # ループインデックス
|
||||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||||
).split(',')
|
||||
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip(
|
||||
).split(',')
|
||||
|
||||
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||
try:
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
|
||||
index += 1
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします')
|
||||
continue
|
||||
|
||||
# 処理件数カウント
|
||||
process_count += 1
|
||||
|
||||
# SQL文生成
|
||||
query_parameter_list = []
|
||||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
|
||||
for db_column in settings_db_columu_list:
|
||||
sql = f'{sql} {db_column},'
|
||||
sql = f'{sql} file_name,' # システム項目:取込ファイル名
|
||||
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
|
||||
sql = f'{sql} ins_user,' # システム項目:登録者
|
||||
sql = f'{sql} ins_date,' # システム項目:登録日時
|
||||
sql = f'{sql} upd_user,' # システム項目:更新者
|
||||
sql = f'{sql} upd_date)' # システム項目:更新日時
|
||||
sql = f'{sql} VALUES ('
|
||||
for i in range(len(line)):
|
||||
# データ項目値が0桁より大きいかチェックする
|
||||
if len(line[i]) == 0:
|
||||
# 0桁の場合
|
||||
sql = f'{sql} NULL,'
|
||||
continue
|
||||
|
||||
# データ項目値の変換処理(カンマ除去)
|
||||
org_column_value = line[i]
|
||||
current_settings_db_column_name = settings_db_columu_list[i]
|
||||
column_value = convert_column_value(
|
||||
org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
||||
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
||||
query_parameter_list.append(column_value)
|
||||
sql = f'{sql} %s,'
|
||||
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
|
||||
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} "0",' # システム項目:論理削除フラグ
|
||||
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
|
||||
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
|
||||
sql = f'{sql} NULL,' # システム項目:更新者
|
||||
sql = f'{sql} NULL)' # システム項目:更新日時
|
||||
|
||||
index += 1
|
||||
|
||||
debug_log(sql, log_info, mode)
|
||||
|
||||
# ロードスキーマのトランザクション開始
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql, query_parameter_list)
|
||||
conn.commit()
|
||||
normal_count += 1
|
||||
except Exception as e:
|
||||
warning_count += 1
|
||||
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
||||
|
||||
return {
|
||||
"counts": {
|
||||
"process": process_count,
|
||||
"normal": normal_count,
|
||||
"warning": warning_count
|
||||
},
|
||||
"warning_info": warning_info
|
||||
}
|
||||
|
||||
|
||||
def bulk_import():
|
||||
pass
|
||||
|
||||
|
||||
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
|
||||
"""DB接続切断処理
|
||||
Args:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user