597 lines
29 KiB
Python
597 lines
29 KiB
Python
import csv
|
||
import io
|
||
from datetime import datetime
|
||
|
||
import boto3
|
||
import pymysql
|
||
from common import (DIRECTORY_SETTINGS, DIRECTORY_WORK, ERROR, INFO,
|
||
LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH,
|
||
MYSQL_CHARSET_CODE, SETTINGS_ITEM, WARNING,
|
||
convert_quotechar, debug_log)
|
||
from error import error
|
||
from pymysql.constants import CLIENT
|
||
|
||
# 定数
|
||
|
||
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
|
||
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
|
||
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
|
||
|
||
# クラス変数
|
||
s3_client = boto3.client('s3')
|
||
|
||
|
||
def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
|
||
"""メイン処理
|
||
Args:
|
||
bucket_name : バケット名
|
||
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name : 投入データのファイル名
|
||
settings_key : 投入データに該当する個別設定ファイルのフルパス
|
||
db_info : データベース情報
|
||
log_info : ログに記載するデータソース名とファイル名
|
||
mode : 処理モード
|
||
Returns:
|
||
warning_info : Warning情報
|
||
"""
|
||
|
||
try:
|
||
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
|
||
debug_log(
|
||
f'引数 target_data_source : {target_data_source}', log_info, mode)
|
||
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
|
||
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
|
||
debug_log(f'引数 db_info : {db_info}', log_info, mode)
|
||
debug_log(f'引数 log_info : {log_info}', log_info, mode)
|
||
debug_log(f'引数 mode : {mode}', log_info, mode)
|
||
|
||
# ① メイン処理開始ログを出力する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します')
|
||
|
||
# ② DB接続を開始する
|
||
conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"],
|
||
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True)
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました')
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
try:
|
||
# ③ タイムゾーンを変更する
|
||
with conn.cursor() as cur:
|
||
cur.execute(f'SET time_zone = "+9:00"')
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-03 - タイムゾーンを変更しました')
|
||
|
||
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||
# 個別設定ファイルの読み込み
|
||
settings_response = s3_client.get_object(
|
||
Bucket=bucket_name, Key=settings_key)
|
||
settings_list = []
|
||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||
settings_list.append(line.rstrip('\n'))
|
||
|
||
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
|
||
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
|
||
settings_list.append('')
|
||
|
||
# ロードスキーマのTRUNCATE
|
||
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
|
||
with conn.cursor() as cur:
|
||
sql_truncate = f'TRUNCATE table {load_schema_name}'
|
||
cur.execute(sql_truncate)
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
|
||
f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました')
|
||
|
||
org_import_process_result = None
|
||
bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]]
|
||
if not bulk_import_flag or int(bulk_import_flag) == 0:
|
||
# ⑤-1 一括登録フラグが未設定、またはOFFの場合、1行コミットモードでロードスキーマに登録を行う
|
||
org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode,
|
||
settings_list, conn)
|
||
elif bulk_import_flag and int(bulk_import_flag) == 1:
|
||
# ⑤-2 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します')
|
||
org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode,
|
||
settings_list, conn)
|
||
else:
|
||
# nop
|
||
pass
|
||
|
||
# 処理件数カウンタ
|
||
process_count = org_import_process_result['counts']['process']
|
||
# 正常終了件数カウンタ
|
||
normal_count = org_import_process_result['counts']['normal']
|
||
# ワーニング終了件数カウンター
|
||
warning_count = org_import_process_result['counts']['warning']
|
||
# ワーニング情報
|
||
warning_info = org_import_process_result['warning_info']
|
||
# ⑥ ⑤の処理結果件数をログ出力する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
|
||
if warning_info:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-02 - Warning終了件数:{warning_count}')
|
||
|
||
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
||
storage_schema_name = settings_list[SETTINGS_ITEM["storageSchemaName"]]
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-08 - ' +
|
||
f'ロードスキーマ({load_schema_name})のデータを蓄積スキーマ({storage_schema_name})に登録します')
|
||
|
||
# インポート方法判断
|
||
try:
|
||
if truncate_judge(settings_list):
|
||
with conn.cursor() as cur:
|
||
sql_truncate = f'TRUNCATE table {storage_schema_name}'
|
||
cur.execute(sql_truncate)
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
|
||
f' I-MAIN-20 - {storage_schema_name} をTRUNCATEしました')
|
||
|
||
except InvalidConfigException as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-01 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
# SQL文生成
|
||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||
).split(',')
|
||
sql = f'INSERT INTO {storage_schema_name} ('
|
||
for i in range(len(settings_db_columu_list)):
|
||
sql = f'{sql} {settings_db_columu_list[i]},'
|
||
sql = f'{sql} file_name,' # システム項目:取込ファイル名
|
||
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
|
||
sql = f'{sql} ins_user,' # システム項目:登録者
|
||
sql = f'{sql} ins_date,' # システム項目:登録日時
|
||
sql = f'{sql} upd_user,' # システム項目:更新者
|
||
sql = f'{sql} upd_date)' # システム項目:更新日時
|
||
sql = f'{sql} SELECT'
|
||
for i in range(len(settings_db_columu_list)):
|
||
sql = f'{sql} t.{settings_db_columu_list[i]},'
|
||
sql = f'{sql} t.file_name,' # システム項目:取込ファイル名
|
||
sql = f'{sql} t.file_row_cnt,' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} t.delete_flg,' # システム項目:論理削除フラグ
|
||
sql = f'{sql} t.ins_user,' # システム項目:登録者
|
||
sql = f'{sql} t.ins_date,' # システム項目:登録日時
|
||
sql = f'{sql} t.upd_user,' # システム項目:更新者
|
||
sql = f'{sql} t.upd_date' # システム項目:更新日時
|
||
sql = f'{sql} FROM {load_schema_name} as t'
|
||
sql = f'{sql} ON DUPLICATE KEY UPDATE'
|
||
for i in range(len(settings_db_columu_list)):
|
||
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
|
||
# システム項目:取込ファイル名
|
||
sql = f'{sql} file_name=t.file_name,'
|
||
# システム項目:取込ファイル行番号
|
||
sql = f'{sql} file_row_cnt=t.file_row_cnt,'
|
||
# システム項目:論理削除フラグ
|
||
sql = f'{sql} delete_flg={storage_schema_name}.delete_flg,'
|
||
# システム項目:登録者
|
||
sql = f'{sql} ins_user={storage_schema_name}.ins_user,'
|
||
# システム項目:登録日時
|
||
sql = f'{sql} ins_date={storage_schema_name}.ins_date,'
|
||
# システム項目:更新者
|
||
sql = f'{sql} upd_user=t.ins_user,'
|
||
# システム項目:更新日時
|
||
sql = f'{sql} upd_date=t.ins_date'
|
||
|
||
debug_log(sql, log_info, mode)
|
||
|
||
# トランザクション開始
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-09 - ' +
|
||
f'標準SQL:{storage_schema_name} のトランザクションを開始します')
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql)
|
||
conn.commit()
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-10 - ' +
|
||
f'標準SQL:{storage_schema_name} のCOMMIT処理が正常終了しました')
|
||
|
||
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
|
||
ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]]
|
||
|
||
if ex_sql_file_name:
|
||
try:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-13 - 拡張SQLファイル名:{ex_sql_file_name} の存在チェック')
|
||
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name
|
||
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
|
||
ex_sql_file_exists = True
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
|
||
except Exception:
|
||
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
|
||
ex_sql_file_exists = False
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-03 - 拡張SQLファイルが存在しません')
|
||
|
||
try:
|
||
if ex_sql_file_exists:
|
||
# 拡張SQLファイルからSQL文生成
|
||
ex_sql_obj_response = s3_client.get_object(
|
||
Bucket=bucket_name, Key=ex_sql_key)
|
||
ex_sql = ''
|
||
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
|
||
ex_sql = f'{ex_sql} {line.rstrip()}'
|
||
|
||
# トランザクション開始
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-15 - 拡張SQL:{storage_schema_name} のトランザクションを開始します')
|
||
with conn.cursor() as cur:
|
||
cur.execute(ex_sql)
|
||
conn.commit()
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-16 - 拡張SQL:{storage_schema_name} のCOMMIT処理が正常終了しました')
|
||
except Exception as e:
|
||
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n'
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}')
|
||
else:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
|
||
|
||
# ⑨ DB接続を終了する
|
||
connection_close(conn, bucket_name, target_data_source,
|
||
target_file_name, log_info)
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
|
||
connection_close(conn, bucket_name, target_data_source,
|
||
target_file_name, log_info)
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
try:
|
||
# ⑩ メイン処理終了ログを出力する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-19 - メイン処理を終了します')
|
||
|
||
return warning_info
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
|
||
def import_data_with_commit_per_row(
|
||
bucket_name: str,
|
||
target_data_source: str,
|
||
target_file_name: str,
|
||
log_info: str,
|
||
mode: str,
|
||
settings_list: list[dict],
|
||
conn: pymysql.Connection) -> dict:
|
||
"""一行コミットモードでロードスキーマに登録を行います
|
||
|
||
Args:
|
||
bucket_name (str): バケット名
|
||
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name (str): 投入データのファイル名
|
||
log_info (str): ログに記載するデータソース名とファイル名
|
||
mode (str): 処理モード
|
||
settings_list (list[dict]): 設定ファイル
|
||
conn (pymysql.Connection): DB接続
|
||
|
||
Returns:
|
||
dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト
|
||
"""
|
||
# ⑤-1 投入データファイルを1行ごとにループする
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
||
import_data_bytes = None
|
||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||
if not compressed_flag or int(compressed_flag) == 0:
|
||
# 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む
|
||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||
work_response = s3_client.get_object(
|
||
Bucket=bucket_name, Key=work_key)
|
||
import_data_bytes = work_response["Body"].read()
|
||
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む
|
||
elif compressed_flag and int(compressed_flag) == 1:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します')
|
||
with open(LOCAL_TEMPORARY_FILE_PATH, 'rb') as f:
|
||
import_data_bytes = f.read()
|
||
else:
|
||
# nop
|
||
pass
|
||
|
||
work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||
process_count = 0 # 処理件数カウンタ
|
||
normal_count = 0 # 正常終了件数カウンタ
|
||
warning_count = 0 # ワーニング終了件数カウンター
|
||
warning_info = '' # ワーニング情報
|
||
index = 0 # ループインデックス
|
||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||
).split(',')
|
||
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip(
|
||
).split(',')
|
||
|
||
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||
try:
|
||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
|
||
index += 1
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします')
|
||
continue
|
||
|
||
# 処理件数カウント
|
||
process_count += 1
|
||
|
||
# SQL文生成
|
||
query_parameter_list = []
|
||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
|
||
for db_column in settings_db_columu_list:
|
||
sql = f'{sql} {db_column},'
|
||
sql = f'{sql} file_name,' # システム項目:取込ファイル名
|
||
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
|
||
sql = f'{sql} ins_user,' # システム項目:登録者
|
||
sql = f'{sql} ins_date,' # システム項目:登録日時
|
||
sql = f'{sql} upd_user,' # システム項目:更新者
|
||
sql = f'{sql} upd_date)' # システム項目:更新日時
|
||
sql = f'{sql} VALUES ('
|
||
for i in range(len(line)):
|
||
# データ項目値が0桁より大きいかチェックする
|
||
if len(line[i]) == 0:
|
||
# 0桁の場合
|
||
sql = f'{sql} NULL,'
|
||
continue
|
||
|
||
# データ項目値の変換処理(カンマ除去)
|
||
org_column_value = line[i]
|
||
current_settings_db_column_name = settings_db_columu_list[i]
|
||
column_value = convert_column_value(
|
||
org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
||
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
||
query_parameter_list.append(column_value)
|
||
sql = f'{sql} %s,'
|
||
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
|
||
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} "0",' # システム項目:論理削除フラグ
|
||
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
|
||
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
|
||
sql = f'{sql} NULL,' # システム項目:更新者
|
||
sql = f'{sql} NULL)' # システム項目:更新日時
|
||
|
||
index += 1
|
||
|
||
debug_log(sql, log_info, mode)
|
||
|
||
# ロードスキーマのトランザクション開始
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, query_parameter_list)
|
||
conn.commit()
|
||
normal_count += 1
|
||
except Exception as e:
|
||
warning_count += 1
|
||
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
||
|
||
return {
|
||
"counts": {
|
||
"process": process_count,
|
||
"normal": normal_count,
|
||
"warning": warning_count
|
||
},
|
||
"warning_info": warning_info
|
||
}
|
||
|
||
|
||
def import_data_with_bulk(
|
||
bucket_name: str,
|
||
target_data_source: str,
|
||
target_file_name: str,
|
||
log_info: str,
|
||
mode: str,
|
||
settings_list: list[dict],
|
||
conn: pymysql.Connection) -> dict:
|
||
"""一括登録モードでロードスキーマに登録を行います
|
||
|
||
Args:
|
||
bucket_name (str): バケット名
|
||
target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name (str): 投入データのファイル名
|
||
log_info (str): ログに記載するデータソース名とファイル名
|
||
mode (str): 処理モード
|
||
settings_list (list[dict]): 設定ファイル
|
||
conn (pymysql.Connection): DB接続
|
||
|
||
Returns:
|
||
dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト
|
||
"""
|
||
# ⑤-2 一括登録を行う
|
||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||
if not compressed_flag or int(compressed_flag) == 0:
|
||
# 圧縮フラグがOFFの場合、投入データをtmpディレクトリにダウンロードする
|
||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||
s3_client.download_file(
|
||
Bucket=bucket_name, Key=work_key, Filename=LOCAL_TEMPORARY_FILE_PATH)
|
||
elif compressed_flag and int(compressed_flag) == 1:
|
||
# 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読むため、何もしない。
|
||
pass
|
||
else:
|
||
# nop
|
||
pass
|
||
|
||
process_count = 0 # 処理件数カウンタ
|
||
normal_count = 0 # 正常終了件数カウンタ
|
||
warning_count = 0 # ワーニング終了件数カウンター
|
||
warning_info = '' # ワーニング情報
|
||
|
||
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
|
||
has_header = settings_list[SETTINGS_ITEM["headerFlag"]] or int(
|
||
settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
|
||
enclosed_by = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]])
|
||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]]\
|
||
.rstrip().split(',')
|
||
|
||
# データファイル内の空文字をNULLに変換するため、各カラムをパラメータ化
|
||
variables = [f"@{col_name}" for col_name in settings_db_columu_list]
|
||
sets = [f"`{col}` = NULLIF(@{col}, '')"
|
||
for col in settings_db_columu_list]
|
||
|
||
sql = f"""
|
||
SET @file_row_cnt = 1;
|
||
|
||
LOAD DATA LOCAL
|
||
INFILE %s
|
||
IGNORE
|
||
INTO TABLE {load_schema_name}
|
||
|
||
CHARACTER SET {MYSQL_CHARSET_CODE[settings_list[SETTINGS_ITEM["charCode"]]]}
|
||
FIELDS TERMINATED BY {repr(settings_list[SETTINGS_ITEM["delimiter"]])}
|
||
ENCLOSED BY {repr(enclosed_by)}
|
||
LINES TERMINATED BY {repr(LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])}
|
||
{'IGNORE 1 LINES' if has_header else ''}
|
||
({','.join(variables)})
|
||
SET
|
||
{','.join(sets)},
|
||
-- 取込ファイル名
|
||
file_name = %s,
|
||
-- 取込ファイル行番号
|
||
file_row_cnt = (@file_row_cnt := @file_row_cnt + 1),
|
||
-- 論理削除フラグ
|
||
delete_flg = 0,
|
||
-- 登録者
|
||
ins_user = CURRENT_USER(),
|
||
-- 登録日時
|
||
ins_date = CURRENT_TIMESTAMP(),
|
||
-- 更新者
|
||
upd_user = NULL,
|
||
-- 更新日時
|
||
upd_date = NULL
|
||
;
|
||
"""
|
||
|
||
debug_log(sql, log_info, mode)
|
||
|
||
try:
|
||
# ロードスキーマのトランザクション開始
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, [LOCAL_TEMPORARY_FILE_PATH, target_file_name])
|
||
# 処理件数と成功件数を取得する
|
||
cur.execute("SELECT ROW_COUNT()")
|
||
normal_count = cur.fetchone()[0]
|
||
cur.execute("SELECT @file_row_cnt")
|
||
file_row_cnt = cur.fetchone()[0]
|
||
# ファイル行番号はヘッダがある時は-1して、データ行の行数と合わせる。
|
||
process_count = file_row_cnt - 1 if has_header else file_row_cnt
|
||
|
||
conn.commit()
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-02 - 一括登録モードのSQL実行に失敗しました。エラー内容: {e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
# 処理件数 - 登録件数をワーニング件数とする
|
||
warning_count = process_count - normal_count
|
||
# ワーニングが1件でもある場合、一部登録ができていないため、ワーニング情報を登録する
|
||
if warning_count > 0:
|
||
warning_info = f'一部レコードの登録に失敗しています。行が重複している可能性があります。 投入データ件数:{process_count}, 正常終了件数:{normal_count}'
|
||
|
||
return {
|
||
"counts": {
|
||
"process": process_count,
|
||
"normal": normal_count,
|
||
"warning": warning_count
|
||
},
|
||
"warning_info": warning_info
|
||
}
|
||
|
||
|
||
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
|
||
"""DB接続切断処理
|
||
Args:
|
||
conn : DBコネクション
|
||
bucket_name : バケット名
|
||
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name : 投入データのファイル名
|
||
log_info : ログに記載するデータソース名とファイル名
|
||
"""
|
||
try:
|
||
conn.close()
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-18 - DB接続を終了しました')
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
|
||
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
|
||
"""データ項目値変換処理
|
||
- 数値内のカンマ除去処理
|
||
Args:
|
||
org_column_value : 投入データの値
|
||
current_settings_db_column_name : 投入データのDBカラム物理名
|
||
settings_replace_comma_list : 投入データの数値型のDBカラム物理名のリスト
|
||
|
||
Returns:
|
||
converted_column_value:変換処理を行った投入データの値
|
||
"""
|
||
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
|
||
converted_column_value = org_column_value
|
||
if current_settings_db_column_name in settings_replace_comma_list:
|
||
converted_column_value = converted_column_value.replace(',', '')
|
||
|
||
return converted_column_value
|
||
|
||
|
||
def truncate_judge(settings_list):
|
||
"""TRUNCATE処理対応判定
|
||
Args:
|
||
settings_list (list): 個別設定ファイル
|
||
Raises:
|
||
InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス
|
||
Returns:
|
||
Bool: Truncate対象の場合True。Truncate対象でない場合False
|
||
"""
|
||
import_manner = settings_list[SETTINGS_ITEM["importManner"]]
|
||
# upsert判定
|
||
if not import_manner:
|
||
return False
|
||
|
||
# インポート方法設定チェック
|
||
if not import_manner.startswith(TRUNCATE_SRC_TABLE_SYMBOL):
|
||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||
import_manner_splitted_list = import_manner.split(
|
||
':')
|
||
if len(import_manner_splitted_list) != 2:
|
||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
|
||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||
return True
|
||
|
||
|
||
class InvalidConfigException(Exception):
|
||
pass
|
||
|
||
|
||
# ローカル実行用コード
|
||
# 値はよしなに変えてください
|
||
# if __name__ == '__main__':
|
||
# DB_INFO = {"host": '127.0.0.1', "name": 'org02', "pass": 'user', "user": 'user'}
|
||
# main(
|
||
# bucket_name='バケット名',
|
||
# target_data_source='投入データのディレクトリ名よりデータソースに該当する部分',
|
||
# target_file_name='投入データのファイル名',
|
||
# settings_key='投入データに該当する個別設定ファイルのフルパス',
|
||
# db_info=DB_INFO,
|
||
# log_info='info',
|
||
# mode='i'
|
||
# )
|