From a3d72c20d3531d479f2279094c46d265a9aedd0d Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 13:57:11 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=80=E6=8B=AC=E7=99=BB=E9=8C=B2?= =?UTF-8?q?=E3=83=95=E3=83=A9=E3=82=B0=E3=81=AB=E3=82=88=E3=82=8A=E4=B8=80?= =?UTF-8?q?=E8=A1=8C=E3=82=B3=E3=83=9F=E3=83=83=E3=83=88=E3=81=8B=E4=B8=80?= =?UTF-8?q?=E6=8B=AC=E7=99=BB=E9=8C=B2=E3=81=8B=E3=82=92=E5=88=86=E5=B2=90?= =?UTF-8?q?=E3=81=99=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=97=E3=81=9F?= =?UTF-8?q?=E3=80=82=E3=81=BE=E3=81=9A=E3=81=AF=E4=B8=80=E8=A1=8C=E3=82=B3?= =?UTF-8?q?=E3=83=9F=E3=83=83=E3=83=88=E3=83=A2=E3=83=BC=E3=83=89=E3=82=92?= =?UTF-8?q?=E9=96=A2=E6=95=B0=E3=81=AB=E7=A7=BB=E6=A4=8D=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 243 +++++++++++++++++++----------- 1 file changed, 151 insertions(+), 92 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 55ea1119..c888b47c 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -125,102 +125,27 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') - # ⑤-1 投入データファイルを1行ごとにループする - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - import_data_bytes = None - compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] - if not compressed_flag or int(compressed_flag) == 0: - # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む - work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_response = s3_client.get_object( - Bucket=bucket_name, Key=work_key) - import_data_bytes = work_response["Body"].read() - # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む - elif compressed_flag and int(compressed_flag) == 1: - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: - import_data_bytes = f.read() + org_import_process_result = None + bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] + if not bulk_import_flag or int(bulk_import_flag) == 0: + # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う + org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) + elif bulk_import_flag and int(bulk_import_flag) == 1: + # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う + org_import_process_result = bulk_import() else: # nop pass - work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - process_count = 0 # 処理件数カウンタ - normal_count = 0 # 正常終了件数カウンタ - warning_count = 0 # ワーニング終了件数カウンター - warning_info = '' # ワーニング情報 - index = 0 # ループインデックス - settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( - ).split(',') - settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( - ).split(',') - - for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - try: - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: - index += 1 - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') - continue - - # 処理件数カウント - process_count += 1 - - # SQL文生成 - query_parameter_list = [] - sql = f'INSERT INTO {load_schema_name} (' - for db_column in settings_db_columu_list: - sql = f'{sql} {db_column},' - sql = f'{sql} file_name,' # システム項目:取込ファイル名 - sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 - sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ - sql = f'{sql} ins_user,' # システム項目:登録者 - sql = f'{sql} ins_date,' # システム項目:登録日時 - sql = f'{sql} upd_user,' # システム項目:更新者 - sql = f'{sql} upd_date)' # システム項目:更新日時 - sql = f'{sql} VALUES (' - for i in range(len(line)): - # データ項目値が0桁より大きいかチェックする - if len(line[i]) == 0: - # 0桁の場合 - sql = f'{sql} NULL,' - continue - - # データ項目値の変換処理(カンマ除去) - org_column_value = line[i] - current_settings_db_column_name = settings_db_columu_list[i] - column_value = convert_column_value( - org_column_value, current_settings_db_column_name, settings_replace_comma_list) - # INSERT文のパラメータとそれに対応するプレースホルダーを設定する - query_parameter_list.append(column_value) - sql = f'{sql} %s,' - sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 - sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 - sql = f'{sql} "0",' # システム項目:論理削除フラグ - sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 - sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 - sql = f'{sql} NULL,' # システム項目:更新者 - sql = f'{sql} NULL)' # システム項目:更新日時 - - index += 1 - - debug_log(sql, log_info, mode) - - # ロードスキーマのトランザクション開始 - with conn.cursor() as cur: - cur.execute(sql, query_parameter_list) - conn.commit() - normal_count += 1 - except Exception as e: - warning_count += 1 - warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') - + # 処理件数カウンタ + process_count = org_import_process_result['counts']['process'] + # 正常終了件数カウンタ + normal_count = org_import_process_result['counts']['normal'] + # ワーニング終了件数カウンター + warning_count = org_import_process_result['counts']['warning'] + # ワーニング情報 + warning_info = org_import_process_result['warning_info'] # ⑥ ⑤の処理結果件数をログ出力する print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') @@ -250,6 +175,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') sql = f'INSERT INTO {storage_schema_name} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' @@ -371,6 +298,138 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) +def row_per_commit_import( + bucket_name: str, + target_data_source: str, + target_file_name: str, + log_info: str, + mode: str, + settings_list: list[dict], + conn: pymysql.Connection) -> dict: + """一行コミットモードでロードスキーマに登録を行います + + Args: + bucket_name (str): バケット名 + target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name (str): 投入データのファイル名 + log_info (str): ログに記載するデータソース名とファイル名 + mode (str): 処理モード + settings_list (list[dict]): 設定ファイル + conn (pymysql.Connection): DB接続 + + Returns: + dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト + """ + # ⑤-1 投入データファイルを1行ごとにループする + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + import_data_bytes = None + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + import_data_bytes = work_response["Body"].read() + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む + elif compressed_flag and int(compressed_flag) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + import_data_bytes = f.read() + else: + # nop + pass + + work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + warning_count = 0 # ワーニング終了件数カウンター + warning_info = '' # ワーニング情報 + index = 0 # ループインデックス + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( + ).split(',') + + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + try: + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: + index += 1 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') + continue + + # 処理件数カウント + process_count += 1 + + # SQL文生成 + query_parameter_list = [] + sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' + for db_column in settings_db_columu_list: + sql = f'{sql} {db_column},' + sql = f'{sql} file_name,' # システム項目:取込ファイル名 + sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 + sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ + sql = f'{sql} ins_user,' # システム項目:登録者 + sql = f'{sql} ins_date,' # システム項目:登録日時 + sql = f'{sql} upd_user,' # システム項目:更新者 + sql = f'{sql} upd_date)' # システム項目:更新日時 + sql = f'{sql} VALUES (' + for i in range(len(line)): + # データ項目値が0桁より大きいかチェックする + if len(line[i]) == 0: + # 0桁の場合 + sql = f'{sql} NULL,' + continue + + # データ項目値の変換処理(カンマ除去) + org_column_value = line[i] + current_settings_db_column_name = settings_db_columu_list[i] + column_value = convert_column_value( + org_column_value, current_settings_db_column_name, settings_replace_comma_list) + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(column_value) + sql = f'{sql} %s,' + sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 + sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 + sql = f'{sql} "0",' # システム項目:論理削除フラグ + sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 + sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 + sql = f'{sql} NULL,' # システム項目:更新者 + sql = f'{sql} NULL)' # システム項目:更新日時 + + index += 1 + + debug_log(sql, log_info, mode) + + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql, query_parameter_list) + conn.commit() + normal_count += 1 + except Exception as e: + warning_count += 1 + warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + + return { + "counts": { + "process": process_count, + "normal": normal_count, + "warning": warning_count + }, + "warning_info": warning_info + } + + +def bulk_import(): + pass + + def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): """DB接続切断処理 Args: