diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index e2f25e41..dc174eac 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -13,7 +13,7 @@ from error import error DIRECTORY_WORK = '/work/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ -LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat' +LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -217,7 +217,7 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: + with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) for row in csv_reader: diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index c888b47c..bf100f4f 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -4,7 +4,7 @@ from datetime import datetime import boto3 import pymysql -from chk import LOCAL_UNCOMPRESSED_FILE_PATH +from chk import LOCAL_TEMPORARY_FILE_PATH from common import convert_quotechar, debug_log from error import error from pymysql.constants import CLIENT @@ -46,6 +46,15 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } +# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する +MYSQL_CHARSET_CODE = { + 'utf-8': 'utf8mb4', + 'utf8': 'utf8mb4', + 'utf-8-sig': 'utf8mb4', + 'shift_jis': 'cp932', + 'cp932': 'cp932', +} + DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' @@ -89,7 +98,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"], - db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) + db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True) print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') except Exception as e: @@ -129,11 +138,14 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] if not bulk_import_flag or int(bulk_import_flag) == 0: # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う - org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode, - settings_list, conn) + org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) elif bulk_import_flag and int(bulk_import_flag) == 1: # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う - org_import_process_result = bulk_import() + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します') + org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) else: # nop pass @@ -298,7 +310,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) -def row_per_commit_import( +def import_data_with_commit_per_row( bucket_name: str, target_data_source: str, target_file_name: str, @@ -335,7 +347,7 @@ def row_per_commit_import( elif compressed_flag and int(compressed_flag) == 1: print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + with open(LOCAL_TEMPORARY_FILE_PATH, 'rb') as f: import_data_bytes = f.read() else: # nop @@ -426,8 +438,109 @@ def row_per_commit_import( } -def bulk_import(): - pass +def import_data_with_bulk( + bucket_name: str, + target_data_source: str, + target_file_name: str, + log_info: str, + mode: str, + settings_list: list[dict], + conn: pymysql.Connection) -> dict: + """一括登録モードでロードスキーマに登録を行います + + Args: + bucket_name (str): バケット名 + target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name (str): 投入データのファイル名 + log_info (str): ログに記載するデータソース名とファイル名 + mode (str): 処理モード + settings_list (list[dict]): 設定ファイル + conn (pymysql.Connection): DB接続 + + Returns: + dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト + """ + # ⑤-2 一括登録を行う + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグがOFFの場合、投入データをtmpディレクトリにダウンロードする + work_key = target_data_source + DIRECTORY_WORK + target_file_name + s3_client.download_file( + Bucket=bucket_name, Key=work_key, Filename=LOCAL_TEMPORARY_FILE_PATH) + elif compressed_flag and int(compressed_flag) == 1: + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読むため、何もしない。 + pass + else: + # nop + pass + + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + + load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]] + has_header = settings_list[SETTINGS_ITEM["headerFlag"]] or int( + settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + enclosed_by = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]) + + sql = f""" + SET @file_row_cnt = 1; + + LOAD DATA LOCAL + INFILE %s + REPLACE + INTO TABLE {load_schema_name} + + CHARACTER SET {MYSQL_CHARSET_CODE[settings_list[SETTINGS_ITEM["charCode"]]]} + FIELDS TERMINATED BY '{settings_list[SETTINGS_ITEM["delimiter"]]}' + ENCLOSED BY '{enclosed_by if enclosed_by != "'" else "\\'"}' + LINES TERMINATED BY '{LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]}' + {'IGNORE 1 LINES' if has_header else ''} + ({','.join([column for column in settings_db_columu_list])}) + SET + -- 取込ファイル名 + file_name = %s, + -- 取込ファイル行番号 + file_row_cnt = (@file_row_cnt := @file_row_cnt + 1), + -- 論理削除フラグ + delete_flg = 0, + -- 登録者 + ins_user = CURRENT_USER(), + -- 登録日時 + ins_date = CURRENT_TIMESTAMP(), + -- 更新者 + upd_user = NULL, + -- 更新日時 + upd_date = NULL + ; + """ + + debug_log(sql, log_info, mode) + + try: + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql, [LOCAL_TEMPORARY_FILE_PATH, target_file_name]) + # 一括登録モードの場合、LOAD文の成功行数を取得してprocess_countにする + cur.execute("SELECT ROW_COUNT()") + process_count = cur.fetchone()[0] + conn.commit() + except Exception as e: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} 一括登録モードのSQL実行に失敗しました。エラー内容: {e}') + error(bucket_name, target_data_source, target_file_name, log_info) + + # 一括登録の場合、クエリ実行に成功したら、処理件数と成功件数は同じにする + normal_count = process_count + return { + "counts": { + "process": process_count, + "normal": normal_count, + "warning": 0 # 一括登録時はワーニングにならずエラーになるため、必ず0件 + }, + "warning_info": '' # 一括登録時はワーニングにならずエラーになるため、空文字を返却 + } def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):