From f471a2d444c054f48acb2e8d1afff7032977e2b7 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 7 May 2025 17:53:45 +0900 Subject: [PATCH 01/14] =?UTF-8?q?feat:=20=E3=83=81=E3=82=A7=E3=83=83?= =?UTF-8?q?=E3=82=AF=E5=87=A6=E7=90=86=E3=81=A7=E5=9C=A7=E7=B8=AE=E3=83=95?= =?UTF-8?q?=E3=82=A1=E3=82=A4=E3=83=AB=E3=81=AE=E8=A7=A3=E5=87=8D=E3=82=92?= =?UTF-8?q?=E5=AE=9F=E8=A3=85=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 121 ++++++++++++++++++++++++------- 1 file changed, 93 insertions(+), 28 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index f6bb97c6..4674416b 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -1,6 +1,7 @@ import csv import io import sys +import zipfile from datetime import datetime import boto3 @@ -10,6 +11,9 @@ from error import error # 定数 DIRECTORY_WORK = '/work/' +LOCAL_DIRECTORY_TMP = '/tmp' +# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ +LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -26,13 +30,19 @@ SETTINGS_ITEM = { 'exSqlFileName': 11, 'commaReplaceColumns': 12, 'importManner': 13, - 'reserved1': 14, - 'reserved2': 15, - 'reserved3': 16, - 'reserved4': 17, - 'reserved5': 18, - 'reserved6': 19 + 'bulkImportFlag': 14, + 'compressedFlag': 15, + 'compression': 16, + 'reserved1': 17, + 'reserved2': 18, + 'reserved3': 19, + 'reserved4': 20, + 'reserved5': 21, + 'reserved6': 22, + 'reserved7': 23, + 'reserved8': 24 } + LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', @@ -63,26 +73,40 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log( + f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) debug_log(f'引数 log_info : {log_info}', log_info, mode) debug_log(f'引数 mode : {mode}', log_info, mode) # ① チェック処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') # データ読込 - settings_obj_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key) + settings_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=settings_key) settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_obj_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_obj_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + work_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + work_obj_bytes = work_obj_response["Body"].read() + work_data_file = work_obj_bytes + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + + # 圧縮されている場合は解凍する + if compressed_flag and compressed_flag == '1': + work_data_file = uncompress_file(work_data_file, settings_list) + + work_data = io.TextIOWrapper(io.BytesIO(work_data_file), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) work_csv_row = [] - for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): + for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): # ヘッダあり、かつ、1行目の場合 if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0: work_csv_row.append(line) @@ -91,40 +115,55 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i break # ② C-0のデータ件数チェックを開始する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') if is_empty_file(work_csv_row, settings_list): - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') - end(bucket_name, target_data_source, target_file_name, '', log_info, mode) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') + end(bucket_name, target_data_source, + target_file_name, '', log_info, mode) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') sys.exit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') # ③ C-1の項目数チェックを開始する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') work_csv_row_item_len = len(work_csv_row[0]) if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') else: - raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') + raise CheckError( + f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') # ④ C-2の項目並び順チェック開始する - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') - settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') + settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip( + ).split(',') work_header_list = work_csv_row[0] for i in range(len(settings_header_list)): if not settings_header_list[i] == work_header_list[i]: - raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') + raise CheckError( + f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') # ⑤ チェック処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') except CheckError as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -147,6 +186,32 @@ def is_empty_file(work_csv_row: list, settings_list: list): return len(work_csv_row) == 0 +def uncompress_file(work_data_file: bytes, settings_list: list): + if settings_list[SETTINGS_ITEM['compression']] == 'zip': + file_bytes = None + with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref: + for file_name in zip_ref.namelist(): + # zipファイル内には1ファイルのみ + with zip_ref.open(file_name) as file: + file_bytes = file.read() + print(f"{file_name}: {len(file_bytes)} bytes") + break + # ファイルを一時ディレクトリに書き出す。 + uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: + csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + for row in csv_reader: + csv_writer.writerow(row) + return file_bytes + # TODO: zip以外の圧縮形式に対応する際に追記すること + else: + raise ValueError("圧縮形式が一致しません") + + # ローカル実行用コード # 値はよしなに変えてください # if __name__ == '__main__': From e31b20692c0915f13a56e810afe871ba3e0af8d7 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 7 May 2025 18:04:33 +0900 Subject: [PATCH 02/14] =?UTF-8?q?feat:=20=E3=83=AD=E3=82=B0=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0.=E5=A4=89=E6=95=B0=E5=90=8D=E8=A6=8B=E7=9B=B4?= =?UTF-8?q?=E3=81=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 4674416b..834207ef 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -94,15 +94,16 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj_response = s3_client.get_object( Bucket=bucket_name, Key=work_key) - work_obj_bytes = work_obj_response["Body"].read() - work_data_file = work_obj_bytes + work_obj_file = work_obj_response["Body"].read() + work_data_bytes = work_obj_file compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] # 圧縮されている場合は解凍する if compressed_flag and compressed_flag == '1': - work_data_file = uncompress_file(work_data_file, settings_list) + work_data_bytes = uncompress_file( + work_data_bytes, settings_list, log_info) - work_data = io.TextIOWrapper(io.BytesIO(work_data_file), encoding=settings_list[SETTINGS_ITEM["charCode"]], + work_data = io.TextIOWrapper(io.BytesIO(work_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) work_csv_row = [] for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), @@ -186,8 +187,21 @@ def is_empty_file(work_csv_row: list, settings_list: list): return len(work_csv_row) == 0 -def uncompress_file(work_data_file: bytes, settings_list: list): - if settings_list[SETTINGS_ITEM['compression']] == 'zip': +def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> bytes: + """指定された形式で圧縮ファイルを展開し、ローカルに書き出す。 + Args: + work_data_file (bytes): S3から読み込んだ登録 + settings_list (list): 個別設定ファイルのリスト + log_info (str): ログに記載するデータソース名とファイル名 + + Returns: + bytes: 解凍後のファイルのバイト配列 + """ + compression = settings_list[SETTINGS_ITEM['compression']] + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') + + if compression == 'zip': file_bytes = None with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref: for file_name in zip_ref.namelist(): @@ -206,6 +220,8 @@ def uncompress_file(work_data_file: bytes, settings_list: list): delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) for row in csv_reader: csv_writer.writerow(row) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-12 - 投入ファイルの展開が正常終了しました') return file_bytes # TODO: zip以外の圧縮形式に対応する際に追記すること else: From ff26e52b862903311c8aa0c8594e666a9782896b Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 7 May 2025 18:09:11 +0900 Subject: [PATCH 03/14] =?UTF-8?q?fix:=20=E4=BA=88=E7=B4=84=E8=A1=8C?= =?UTF-8?q?=E3=82=92=E7=A9=BA=E8=A1=8C=E3=81=A7=E5=9F=8B=E3=82=81=E3=82=8B?= =?UTF-8?q?=E5=87=A6=E7=90=86=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 834207ef..6f71a0de 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -90,6 +90,9 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) + # 設定ファイルに記載のない行を空文字として扱い、予約行とする + for _ in range(len(SETTINGS_ITEM) - len(settings_list)): + settings_list.append('') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj_response = s3_client.get_object( From 70d61910cc587148d646187861261fab2e76fd30 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 7 May 2025 18:14:51 +0900 Subject: [PATCH 04/14] =?UTF-8?q?feat:=20=E4=B8=8D=E8=A6=81=E3=81=AA?= =?UTF-8?q?=E3=83=AD=E3=82=B0=E5=87=BA=E5=8A=9B=E3=82=92=E5=89=8A=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 6f71a0de..e2f25e41 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -211,7 +211,6 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt # zipファイル内には1ファイルのみ with zip_ref.open(file_name) as file: file_bytes = file.read() - print(f"{file_name}: {len(file_bytes)} bytes") break # ファイルを一時ディレクトリに書き出す。 uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], From e029c5641d2e57142b9c4ed8c2eeb384a1ad627b Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 09:44:28 +0900 Subject: [PATCH 05/14] =?UTF-8?q?refactor:=20=E3=83=95=E3=82=A9=E3=83=BC?= =?UTF-8?q?=E3=83=9E=E3=83=83=E3=83=88=E3=82=92=E9=81=A9=E7=94=A8=E3=81=97?= =?UTF-8?q?=E3=80=81=E9=95=B7=E3=81=99=E3=81=8E=E3=82=8B=E8=A1=8C=E3=82=92?= =?UTF-8?q?=E9=81=A9=E5=BA=A6=E3=81=AB=E6=94=B9=E8=A1=8C=E3=81=97=E3=81=9F?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 171 ++++++++++++++++++++---------- 1 file changed, 115 insertions(+), 56 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 3233472c..1969fb9f 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,6 +1,5 @@ import csv import io -import re from datetime import datetime import boto3 @@ -34,11 +33,13 @@ SETTINGS_ITEM = { 'reserved5': 18, 'reserved6': 19 } + LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', 'CRLF': '\r\n', } + DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' @@ -64,7 +65,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log( + f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) debug_log(f'引数 db_info : {db_info}', log_info, mode) @@ -72,24 +74,30 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf debug_log(f'引数 mode : {mode}', log_info, mode) # ① メイン処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する - conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], + db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ③ タイムゾーンを変更する with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする # 個別設定ファイルの読み込み - settings_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key) + settings_response = s3_client.get_object( + Bucket=bucket_name, Key=settings_key) settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) @@ -102,27 +110,35 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + + f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') # ⑤ 投入データファイルを1行ごとにループする - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( + )), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター warning_info = '' # ワーニング情報 index = 0 # ループインデックス - settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') - settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',') + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( + ).split(',') - for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: index += 1 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -151,7 +167,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # データ項目値の変換処理(カンマ除去) org_column_value = line[i] current_settings_db_column_name = settings_db_columu_list[i] - column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) + column_value = convert_column_value( + org_column_value, current_settings_db_column_name, settings_replace_comma_list) # INSERT文のパラメータとそれに対応するプレースホルダーを設定する query_parameter_list.append(column_value) sql = f'{sql} %s,' @@ -175,26 +192,34 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf except Exception as e: warning_count += 1 warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑥ ⑤の処理結果件数をログ出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') - + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' + + f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + # インポート方法判断 try: if truncate_judge(settings_list): with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' cur.execute(sql_truncate) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + + f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') except InvalidConfigException as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 @@ -222,72 +247,102 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} ON DUPLICATE KEY UPDATE' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' - sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名 - sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号 - sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ - sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' # システム項目:登録者 - sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' # システム項目:登録日時 - sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新者 - sql = f'{sql} upd_date=t.ins_date' # システム項目:更新日時 + # システム項目:取込ファイル名 + sql = f'{sql} file_name=t.file_name,' + # システム項目:取込ファイル行番号 + sql = f'{sql} file_row_cnt=t.file_row_cnt,' + # システム項目:論理削除フラグ + sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' + # システム項目:登録者 + sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' + # システム項目:登録日時 + sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' + # システム項目:更新者 + sql = f'{sql} upd_user=t.ins_user,' + # システム項目:更新日時 + sql = f'{sql} upd_date=t.ins_date' debug_log(sql, log_info, mode) # トランザクション開始 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' + + f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' + + f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: try: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' + + f'拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \ + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') - except Exception as e: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + except Exception: warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: # 拡張SQLファイルからSQL文生成 - ex_sql_obj_response = s3_client.get_object(Bucket=bucket_name, Key=ex_sql_key) + ex_sql_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=ex_sql_key) ex_sql = '' for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'): ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' + + f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' + + f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + connection_close(conn, bucket_name, target_data_source, + target_file_name, log_info) except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, + target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑩ メイン処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') return warning_info except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -302,11 +357,14 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo """ try: conn.close() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): """データ項目値変換処理 - 数値内のカンマ除去処理 @@ -320,8 +378,8 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett """ # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く converted_column_value = org_column_value - if current_settings_db_column_name in settings_replace_comma_list: - converted_column_value = converted_column_value.replace(',', '') + if current_settings_db_column_name in settings_replace_comma_list: + converted_column_value = converted_column_value.replace(',', '') return converted_column_value @@ -339,11 +397,12 @@ def truncate_judge(settings_list): # upsert判定 if not settings_list[SETTINGS_ITEM["importManner"]]: return False - + # インポート方法設定チェック if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) - import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split( + ':') if len(import_manner_splitted_list) != 2: raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: From 841fc95651c21b9d8c560af51c3d85c721640889 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 10:17:41 +0900 Subject: [PATCH 06/14] =?UTF-8?q?refactor:=20=E9=87=8D=E8=A4=87=E3=81=97?= =?UTF-8?q?=E3=81=A6=E3=81=84=E3=82=8B=E8=A8=98=E8=BF=B0=E3=82=92=E5=A4=89?= =?UTF-8?q?=E6=95=B0=E5=8C=96=E3=81=97=E3=81=A6=E3=81=BE=E3=81=A8=E3=82=81?= =?UTF-8?q?=E3=81=9F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 108 ++++++++++++++++-------------- 1 file changed, 56 insertions(+), 52 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 1969fb9f..859419c9 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -45,6 +45,10 @@ TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' +INFO = LOG_LEVEL['i'] +WARNING = LOG_LEVEL['w'] +ERROR = LOG_LEVEL['e'] + # クラス変数 s3_client = boto3.client('s3') @@ -75,16 +79,16 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ① メイン処理開始ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: @@ -92,7 +96,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする # 個別設定ファイルの読み込み @@ -107,16 +111,17 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf settings_list.append('') # ロードスキーマのTRUNCATE + load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]] with conn.cursor() as cur: - sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' + sql_truncate = f'TRUNCATE table {load_schema_name}' cur.execute(sql_truncate) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + - f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') # ⑤ 投入データファイルを1行ごとにループする print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( @@ -138,7 +143,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: index += 1 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -146,7 +151,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # SQL文生成 query_parameter_list = [] - sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' + sql = f'INSERT INTO {load_schema_name} (' for db_column in settings_db_columu_list: sql = f'{sql} {db_column},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 @@ -193,37 +198,38 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf warning_count += 1 warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑥ ⑤の処理結果件数をログ出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする + storage_schema_name = settings_list[SETTINGS_ITEM["storageSchemaName"]] print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' + - f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-08 - ' + + f'ロードスキーマ({load_schema_name})のデータを蓄積スキーマ({storage_schema_name})に登録します') # インポート方法判断 try: if truncate_judge(settings_list): with conn.cursor() as cur: - sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + sql_truncate = f'TRUNCATE table {storage_schema_name}' cur.execute(sql_truncate) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + - f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + + f' I-MAIN-20 - {storage_schema_name} をTRUNCATEしました') except InvalidConfigException as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-01 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 - sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' + sql = f'INSERT INTO {storage_schema_name} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 @@ -243,7 +249,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} t.ins_date,' # システム項目:登録日時 sql = f'{sql} t.upd_user,' # システム項目:更新者 sql = f'{sql} t.upd_date' # システム項目:更新日時 - sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t' + sql = f'{sql} FROM {load_schema_name} as t' sql = f'{sql} ON DUPLICATE KEY UPDATE' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' @@ -252,11 +258,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # システム項目:取込ファイル行番号 sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:論理削除フラグ - sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' + sql = f'{sql} delete_flg={storage_schema_name}.delete_flg,' # システム項目:登録者 - sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' + sql = f'{sql} ins_user={storage_schema_name}.ins_user,' # システム項目:登録日時 - sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' + sql = f'{sql} ins_date={storage_schema_name}.ins_date,' # システム項目:更新者 sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新日時 @@ -266,36 +272,36 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # トランザクション開始 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' + - f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-09 - ' + + f'標準SQL:{storage_schema_name} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' + - f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-10 - ' + + f'標準SQL:{storage_schema_name} のCOMMIT処理が正常終了しました') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') - if settings_list[SETTINGS_ITEM["exSqlFileName"]]: + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]] + + if ex_sql_file_name: try: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-12 - 拡張SQL設定の存在を確認しました') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' + - f'拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \ - settings_list[SETTINGS_ITEM["exSqlFileName"]] + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-13 - 拡張SQLファイル名:{ex_sql_file_name} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception: warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: @@ -308,28 +314,26 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # トランザクション開始 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' + - f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-15 - 拡張SQL:{storage_schema_name} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' + - f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-16 - 拡張SQL:{storage_schema_name} のCOMMIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) @@ -337,12 +341,12 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf try: # ⑩ メイン処理終了ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-19 - メイン処理を終了します') return warning_info except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -358,10 +362,10 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo try: conn.close() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-18 - DB接続を終了しました') except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -393,15 +397,15 @@ def truncate_judge(settings_list): Returns: Bool: Truncate対象の場合True。Truncate対象でない場合False """ - + import_manner = settings_list[SETTINGS_ITEM["importManner"]] # upsert判定 - if not settings_list[SETTINGS_ITEM["importManner"]]: + if not import_manner: return False # インポート方法設定チェック - if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + if not import_manner.startswith(TRUNCATE_SRC_TABLE_SYMBOL): raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) - import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split( + import_manner_splitted_list = import_manner.split( ':') if len(import_manner_splitted_list) != 2: raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) From df1da34628ff47f129558bc79f8c16bc9f7749fb Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 10:19:06 +0900 Subject: [PATCH 07/14] =?UTF-8?q?feat:=20=E8=A8=AD=E5=AE=9A=E3=83=95?= =?UTF-8?q?=E3=82=A1=E3=82=A4=E3=83=AB=E3=82=AA=E3=83=96=E3=82=B8=E3=82=A7?= =?UTF-8?q?=E3=82=AF=E3=83=88=E3=81=AB=E3=83=97=E3=83=AD=E3=83=91=E3=83=86?= =?UTF-8?q?=E3=82=A3=E3=81=A8=E4=BA=88=E7=B4=84=E8=A1=8C=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 859419c9..3a39c7d6 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -26,12 +26,17 @@ SETTINGS_ITEM = { 'exSqlFileName': 11, 'commaReplaceColumns': 12, 'importManner': 13, - 'reserved1': 14, - 'reserved2': 15, - 'reserved3': 16, - 'reserved4': 17, - 'reserved5': 18, - 'reserved6': 19 + 'bulkImportFlag': 14, + 'compressedFlag': 15, + 'compression': 16, + 'reserved1': 17, + 'reserved2': 18, + 'reserved3': 19, + 'reserved4': 20, + 'reserved5': 21, + 'reserved6': 22, + 'reserved7': 23, + 'reserved8': 24 } LINE_FEED_CODE = { From 06d741e994dbc313c0743d3212ace7915f54a1cd Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 13:48:40 +0900 Subject: [PATCH 08/14] =?UTF-8?q?feat:=20=E5=9C=A7=E7=B8=AE=E3=83=95?= =?UTF-8?q?=E3=83=A9=E3=82=B0ON/OFF=E3=81=AB=E3=82=88=E3=82=8A=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=E3=81=99=E3=82=8B=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB?= =?UTF-8?q?=E3=83=90=E3=82=A4=E3=83=88=E3=82=92=E5=88=A4=E5=AE=9A=E3=81=99?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E4=BF=AE=E6=AD=A3=E3=80=82?= =?UTF-8?q?=20refactor:=20DB=E6=8E=A5=E7=B6=9A=E6=99=82=E3=81=AB=E4=BB=BB?= =?UTF-8?q?=E6=84=8F=E3=81=AEPORT=E3=82=92=E6=8C=87=E5=AE=9A=E3=81=A7?= =?UTF-8?q?=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=97=E3=81=9F?= =?UTF-8?q?=E3=80=82=E6=8C=87=E5=AE=9A=E3=81=95=E3=82=8C=E3=81=A6=E3=81=84?= =?UTF-8?q?=E3=81=AA=E3=81=84=E5=A0=B4=E5=90=88=E3=81=AF3306=E3=81=8C?= =?UTF-8?q?=E3=83=87=E3=83=95=E3=82=A9=E3=83=AB=E3=83=88=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/controller.py | 39 ++++++++++++++++--------- ecs/dataimport/dataimport/main.py | 28 ++++++++++++++---- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/ecs/dataimport/dataimport/controller.py b/ecs/dataimport/dataimport/controller.py index 79b54717..d9a639f3 100644 --- a/ecs/dataimport/dataimport/controller.py +++ b/ecs/dataimport/dataimport/controller.py @@ -1,10 +1,11 @@ import os from datetime import datetime -from ini import init + from chk import check -from main import main from end import end from error import error +from ini import init +from main import main # 引数 BUCKET_NAME = os.environ["BUCKET_NAME"] @@ -18,7 +19,9 @@ DB_HOST = os.environ["DB_HOST"] DB_NAME = os.environ["DB_NAME"] DB_PASS = os.environ["DB_PASS"] DB_USER = os.environ["DB_USER"] -DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} +DB_PORT = int(os.environ.get("DB_PORT", 3306)) +DB_INFO = {"host": DB_HOST, "port": DB_PORT, + "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} # 定数 LOG_LEVEL = {"i": 'Info'} @@ -33,26 +36,36 @@ LOG_INFO = f'{DATA_SOURCE_NAME} {FILE_NAME}' try: # ① データ取込処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') # ② 初期処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') - settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') + settings_key = init(BUCKET_NAME, TARGET_KEY, + DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) # ③ チェック処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') + check(BUCKET_NAME, DATA_SOURCE_NAME, + FILE_NAME, settings_key, LOG_INFO, MODE) # ④ メイン処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') + warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, + FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) # ⑤ 終了処理を呼び出す - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE) # ⑥ データ取込処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 3a39c7d6..55ea1119 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -4,6 +4,7 @@ from datetime import datetime import boto3 import pymysql +from chk import LOCAL_UNCOMPRESSED_FILE_PATH from common import convert_quotechar, debug_log from error import error from pymysql.constants import CLIENT @@ -87,7 +88,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する - conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], + conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') @@ -124,14 +125,29 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') - # ⑤ 投入データファイルを1行ごとにループする + # ⑤-1 投入データファイルを1行ごとにループする print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( - )), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + import_data_bytes = None + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + import_data_bytes = work_response["Body"].read() + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む + elif compressed_flag and int(compressed_flag) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + import_data_bytes = f.read() + else: + # nop + pass + work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター From a3d72c20d3531d479f2279094c46d265a9aedd0d Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Thu, 8 May 2025 13:57:11 +0900 Subject: [PATCH 09/14] =?UTF-8?q?feat:=20=E4=B8=80=E6=8B=AC=E7=99=BB?= =?UTF-8?q?=E9=8C=B2=E3=83=95=E3=83=A9=E3=82=B0=E3=81=AB=E3=82=88=E3=82=8A?= =?UTF-8?q?=E4=B8=80=E8=A1=8C=E3=82=B3=E3=83=9F=E3=83=83=E3=83=88=E3=81=8B?= =?UTF-8?q?=E4=B8=80=E6=8B=AC=E7=99=BB=E9=8C=B2=E3=81=8B=E3=82=92=E5=88=86?= =?UTF-8?q?=E5=B2=90=E3=81=99=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=97?= =?UTF-8?q?=E3=81=9F=E3=80=82=E3=81=BE=E3=81=9A=E3=81=AF=E4=B8=80=E8=A1=8C?= =?UTF-8?q?=E3=82=B3=E3=83=9F=E3=83=83=E3=83=88=E3=83=A2=E3=83=BC=E3=83=89?= =?UTF-8?q?=E3=82=92=E9=96=A2=E6=95=B0=E3=81=AB=E7=A7=BB=E6=A4=8D=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 243 +++++++++++++++++++----------- 1 file changed, 151 insertions(+), 92 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 55ea1119..c888b47c 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -125,102 +125,27 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') - # ⑤-1 投入データファイルを1行ごとにループする - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - import_data_bytes = None - compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] - if not compressed_flag or int(compressed_flag) == 0: - # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む - work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_response = s3_client.get_object( - Bucket=bucket_name, Key=work_key) - import_data_bytes = work_response["Body"].read() - # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む - elif compressed_flag and int(compressed_flag) == 1: - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: - import_data_bytes = f.read() + org_import_process_result = None + bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] + if not bulk_import_flag or int(bulk_import_flag) == 0: + # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う + org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) + elif bulk_import_flag and int(bulk_import_flag) == 1: + # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う + org_import_process_result = bulk_import() else: # nop pass - work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - process_count = 0 # 処理件数カウンタ - normal_count = 0 # 正常終了件数カウンタ - warning_count = 0 # ワーニング終了件数カウンター - warning_info = '' # ワーニング情報 - index = 0 # ループインデックス - settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( - ).split(',') - settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( - ).split(',') - - for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - try: - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: - index += 1 - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') - continue - - # 処理件数カウント - process_count += 1 - - # SQL文生成 - query_parameter_list = [] - sql = f'INSERT INTO {load_schema_name} (' - for db_column in settings_db_columu_list: - sql = f'{sql} {db_column},' - sql = f'{sql} file_name,' # システム項目:取込ファイル名 - sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 - sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ - sql = f'{sql} ins_user,' # システム項目:登録者 - sql = f'{sql} ins_date,' # システム項目:登録日時 - sql = f'{sql} upd_user,' # システム項目:更新者 - sql = f'{sql} upd_date)' # システム項目:更新日時 - sql = f'{sql} VALUES (' - for i in range(len(line)): - # データ項目値が0桁より大きいかチェックする - if len(line[i]) == 0: - # 0桁の場合 - sql = f'{sql} NULL,' - continue - - # データ項目値の変換処理(カンマ除去) - org_column_value = line[i] - current_settings_db_column_name = settings_db_columu_list[i] - column_value = convert_column_value( - org_column_value, current_settings_db_column_name, settings_replace_comma_list) - # INSERT文のパラメータとそれに対応するプレースホルダーを設定する - query_parameter_list.append(column_value) - sql = f'{sql} %s,' - sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 - sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 - sql = f'{sql} "0",' # システム項目:論理削除フラグ - sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 - sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 - sql = f'{sql} NULL,' # システム項目:更新者 - sql = f'{sql} NULL)' # システム項目:更新日時 - - index += 1 - - debug_log(sql, log_info, mode) - - # ロードスキーマのトランザクション開始 - with conn.cursor() as cur: - cur.execute(sql, query_parameter_list) - conn.commit() - normal_count += 1 - except Exception as e: - warning_count += 1 - warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') - + # 処理件数カウンタ + process_count = org_import_process_result['counts']['process'] + # 正常終了件数カウンタ + normal_count = org_import_process_result['counts']['normal'] + # ワーニング終了件数カウンター + warning_count = org_import_process_result['counts']['warning'] + # ワーニング情報 + warning_info = org_import_process_result['warning_info'] # ⑥ ⑤の処理結果件数をログ出力する print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') @@ -250,6 +175,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') sql = f'INSERT INTO {storage_schema_name} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' @@ -371,6 +298,138 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) +def row_per_commit_import( + bucket_name: str, + target_data_source: str, + target_file_name: str, + log_info: str, + mode: str, + settings_list: list[dict], + conn: pymysql.Connection) -> dict: + """一行コミットモードでロードスキーマに登録を行います + + Args: + bucket_name (str): バケット名 + target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name (str): 投入データのファイル名 + log_info (str): ログに記載するデータソース名とファイル名 + mode (str): 処理モード + settings_list (list[dict]): 設定ファイル + conn (pymysql.Connection): DB接続 + + Returns: + dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト + """ + # ⑤-1 投入データファイルを1行ごとにループする + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + import_data_bytes = None + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグが未設定、またはOFFの場合、S3バケット上の投入データファイルを読む + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + import_data_bytes = work_response["Body"].read() + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読む + elif compressed_flag and int(compressed_flag) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + import_data_bytes = f.read() + else: + # nop + pass + + work_data = io.TextIOWrapper(io.BytesIO(import_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + warning_count = 0 # ワーニング終了件数カウンター + warning_info = '' # ワーニング情報 + index = 0 # ループインデックス + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( + ).split(',') + + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + try: + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: + index += 1 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') + continue + + # 処理件数カウント + process_count += 1 + + # SQL文生成 + query_parameter_list = [] + sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' + for db_column in settings_db_columu_list: + sql = f'{sql} {db_column},' + sql = f'{sql} file_name,' # システム項目:取込ファイル名 + sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 + sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ + sql = f'{sql} ins_user,' # システム項目:登録者 + sql = f'{sql} ins_date,' # システム項目:登録日時 + sql = f'{sql} upd_user,' # システム項目:更新者 + sql = f'{sql} upd_date)' # システム項目:更新日時 + sql = f'{sql} VALUES (' + for i in range(len(line)): + # データ項目値が0桁より大きいかチェックする + if len(line[i]) == 0: + # 0桁の場合 + sql = f'{sql} NULL,' + continue + + # データ項目値の変換処理(カンマ除去) + org_column_value = line[i] + current_settings_db_column_name = settings_db_columu_list[i] + column_value = convert_column_value( + org_column_value, current_settings_db_column_name, settings_replace_comma_list) + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(column_value) + sql = f'{sql} %s,' + sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 + sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 + sql = f'{sql} "0",' # システム項目:論理削除フラグ + sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 + sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 + sql = f'{sql} NULL,' # システム項目:更新者 + sql = f'{sql} NULL)' # システム項目:更新日時 + + index += 1 + + debug_log(sql, log_info, mode) + + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql, query_parameter_list) + conn.commit() + normal_count += 1 + except Exception as e: + warning_count += 1 + warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + + return { + "counts": { + "process": process_count, + "normal": normal_count, + "warning": warning_count + }, + "warning_info": warning_info + } + + +def bulk_import(): + pass + + def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): """DB接続切断処理 Args: From e6ebb1fd32b79b8a6821d5872f08e3e443f2bc88 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 9 May 2025 18:30:43 +0900 Subject: [PATCH 10/14] =?UTF-8?q?feat:=20=E4=B8=80=E6=8B=AC=E7=99=BB?= =?UTF-8?q?=E9=8C=B2=E3=83=A2=E3=83=BC=E3=83=89=E3=82=92=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 4 +- ecs/dataimport/dataimport/main.py | 131 ++++++++++++++++++++++++++++-- 2 files changed, 124 insertions(+), 11 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index e2f25e41..dc174eac 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -13,7 +13,7 @@ from error import error DIRECTORY_WORK = '/work/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ -LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat' +LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -217,7 +217,7 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: + with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) for row in csv_reader: diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index c888b47c..bf100f4f 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -4,7 +4,7 @@ from datetime import datetime import boto3 import pymysql -from chk import LOCAL_UNCOMPRESSED_FILE_PATH +from chk import LOCAL_TEMPORARY_FILE_PATH from common import convert_quotechar, debug_log from error import error from pymysql.constants import CLIENT @@ -46,6 +46,15 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } +# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する +MYSQL_CHARSET_CODE = { + 'utf-8': 'utf8mb4', + 'utf8': 'utf8mb4', + 'utf-8-sig': 'utf8mb4', + 'shift_jis': 'cp932', + 'cp932': 'cp932', +} + DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' @@ -89,7 +98,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"], - db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) + db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True) print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') except Exception as e: @@ -129,11 +138,14 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] if not bulk_import_flag or int(bulk_import_flag) == 0: # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う - org_import_process_result = row_per_commit_import(bucket_name, target_data_source, target_file_name, log_info, mode, - settings_list, conn) + org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) elif bulk_import_flag and int(bulk_import_flag) == 1: # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う - org_import_process_result = bulk_import() + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します') + org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode, + settings_list, conn) else: # nop pass @@ -298,7 +310,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf error(bucket_name, target_data_source, target_file_name, log_info) -def row_per_commit_import( +def import_data_with_commit_per_row( bucket_name: str, target_data_source: str, target_file_name: str, @@ -335,7 +347,7 @@ def row_per_commit_import( elif compressed_flag and int(compressed_flag) == 1: print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-21 - ファイルが圧縮されていたため、展開済みのファイルを利用します') - with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'rb') as f: + with open(LOCAL_TEMPORARY_FILE_PATH, 'rb') as f: import_data_bytes = f.read() else: # nop @@ -426,8 +438,109 @@ def row_per_commit_import( } -def bulk_import(): - pass +def import_data_with_bulk( + bucket_name: str, + target_data_source: str, + target_file_name: str, + log_info: str, + mode: str, + settings_list: list[dict], + conn: pymysql.Connection) -> dict: + """一括登録モードでロードスキーマに登録を行います + + Args: + bucket_name (str): バケット名 + target_data_source (str): 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name (str): 投入データのファイル名 + log_info (str): ログに記載するデータソース名とファイル名 + mode (str): 処理モード + settings_list (list[dict]): 設定ファイル + conn (pymysql.Connection): DB接続 + + Returns: + dict: 処理件数(投入データ件数、正常終了件数、ワーニング件数)とワーニング情報の辞書オブジェクト + """ + # ⑤-2 一括登録を行う + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + if not compressed_flag or int(compressed_flag) == 0: + # 圧縮フラグがOFFの場合、投入データをtmpディレクトリにダウンロードする + work_key = target_data_source + DIRECTORY_WORK + target_file_name + s3_client.download_file( + Bucket=bucket_name, Key=work_key, Filename=LOCAL_TEMPORARY_FILE_PATH) + elif compressed_flag and int(compressed_flag) == 1: + # 圧縮フラグがONの場合、チェック処理で展開済みのtmpディレクトリ内のファイルを読むため、何もしない。 + pass + else: + # nop + pass + + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + + load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]] + has_header = settings_list[SETTINGS_ITEM["headerFlag"]] or int( + settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + enclosed_by = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]) + + sql = f""" + SET @file_row_cnt = 1; + + LOAD DATA LOCAL + INFILE %s + REPLACE + INTO TABLE {load_schema_name} + + CHARACTER SET {MYSQL_CHARSET_CODE[settings_list[SETTINGS_ITEM["charCode"]]]} + FIELDS TERMINATED BY '{settings_list[SETTINGS_ITEM["delimiter"]]}' + ENCLOSED BY '{enclosed_by if enclosed_by != "'" else "\\'"}' + LINES TERMINATED BY '{LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]}' + {'IGNORE 1 LINES' if has_header else ''} + ({','.join([column for column in settings_db_columu_list])}) + SET + -- 取込ファイル名 + file_name = %s, + -- 取込ファイル行番号 + file_row_cnt = (@file_row_cnt := @file_row_cnt + 1), + -- 論理削除フラグ + delete_flg = 0, + -- 登録者 + ins_user = CURRENT_USER(), + -- 登録日時 + ins_date = CURRENT_TIMESTAMP(), + -- 更新者 + upd_user = NULL, + -- 更新日時 + upd_date = NULL + ; + """ + + debug_log(sql, log_info, mode) + + try: + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql, [LOCAL_TEMPORARY_FILE_PATH, target_file_name]) + # 一括登録モードの場合、LOAD文の成功行数を取得してprocess_countにする + cur.execute("SELECT ROW_COUNT()") + process_count = cur.fetchone()[0] + conn.commit() + except Exception as e: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} 一括登録モードのSQL実行に失敗しました。エラー内容: {e}') + error(bucket_name, target_data_source, target_file_name, log_info) + + # 一括登録の場合、クエリ実行に成功したら、処理件数と成功件数は同じにする + normal_count = process_count + return { + "counts": { + "process": process_count, + "normal": normal_count, + "warning": 0 # 一括登録時はワーニングにならずエラーになるため、必ず0件 + }, + "warning_info": '' # 一括登録時はワーニングにならずエラーになるため、空文字を返却 + } def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): From bd788ed55e843e0460571d1370a94a71227f5b69 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 13 May 2025 18:09:50 +0900 Subject: [PATCH 11/14] =?UTF-8?q?feat:=20=E3=83=AC=E3=83=93=E3=83=A5?= =?UTF-8?q?=E3=83=BC=E6=8C=87=E6=91=98=E5=AF=BE=E5=BF=9C=E3=80=82=E6=96=B0?= =?UTF-8?q?=E3=81=9F=E3=81=AB=E4=BD=9C=E3=81=A3=E3=81=9F=E9=96=A2=E6=95=B0?= =?UTF-8?q?=E3=81=AF=E3=83=AA=E3=83=95=E3=82=A1=E3=82=AF=E3=82=BF=E3=83=AA?= =?UTF-8?q?=E3=83=B3=E3=82=B0=E3=81=99=E3=82=8B=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 159 ++++++++++++++++++++++-------- ecs/dataimport/dataimport/main.py | 4 +- 2 files changed, 121 insertions(+), 42 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index dc174eac..ac6655e1 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -1,5 +1,6 @@ import csv import io +import os import sys import zipfile from datetime import datetime @@ -14,7 +15,7 @@ DIRECTORY_WORK = '/work/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' -LOG_LEVEL = {'i': 'Info', 'e': 'Error'} +LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"} SETTINGS_ITEM = { 'dataSource': 0, 'delimiter': 1, @@ -49,6 +50,10 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } +INFO = LOG_LEVEL["i"] +ERROR = LOG_LEVEL["e"] +WARNING = LOG_LEVEL["w"] + # クラス変数 s3_client = boto3.client('s3') @@ -82,7 +87,7 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i # ① チェック処理開始ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します') # データ読込 settings_obj_response = s3_client.get_object( @@ -98,19 +103,23 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_obj_response = s3_client.get_object( Bucket=bucket_name, Key=work_key) work_obj_file = work_obj_response["Body"].read() - work_data_bytes = work_obj_file + work_data_bytes = io.BytesIO(work_obj_file) compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] - # 圧縮されている場合は解凍する + # ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。 if compressed_flag and compressed_flag == '1': - work_data_bytes = uncompress_file( - work_data_bytes, settings_list, log_info) + work_data_bytes = io.BytesIO(uncompress_file( + work_data_bytes, settings_list, log_info)) - work_data = io.TextIOWrapper(io.BytesIO(work_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + encoding = settings_list[SETTINGS_ITEM["charCode"]] + line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] + delimiter = settings_list[SETTINGS_ITEM["delimiter"]] + + work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding, + newline=line_feed) work_csv_row = [] for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): + delimiter=delimiter)): # ヘッダあり、かつ、1行目の場合 if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0: work_csv_row.append(line) @@ -118,35 +127,35 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_csv_row.append(line) break - # ② C-0のデータ件数チェックを開始する + # ③ C-0のデータ件数チェックを開始する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します') if is_empty_file(work_csv_row, settings_list): print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') end(bucket_name, target_data_source, target_file_name, '', log_info, mode) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了') sys.exit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0:正常終了') - # ③ C-1の項目数チェックを開始する + # ④ C-1の項目数チェックを開始する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します') work_csv_row_item_len = len(work_csv_row[0]) if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了') else: raise CheckError( f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') - # ④ C-2の項目並び順チェック開始する - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True: + # ⑤ C-2の項目並び順チェック開始する + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip( ).split(',') work_header_list = work_csv_row[0] @@ -155,19 +164,34 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i raise CheckError( f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了') - # ⑤ チェック処理終了ログを出力する + # ⑥ C-3の末尾行項目数チェック開始する + if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します') + + last_line_work_data_bytes = next(reverse_readline_stream( + work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])) + last_line_count = len( + last_line_work_data_bytes.decode(encoding).split(delimiter)) + if last_line_count != settings_list[SETTINGS_ITEM["csvNumItems"]]: + raise CheckError( + f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_work_data_bytes}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了') + + # ⑦ チェック処理終了ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します') except CheckError as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -202,34 +226,89 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt """ compression = settings_list[SETTINGS_ITEM['compression']] print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') if compression == 'zip': file_bytes = None - with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref: - for file_name in zip_ref.namelist(): - # zipファイル内には1ファイルのみ - with zip_ref.open(file_name) as file: - file_bytes = file.read() - break + with zipfile.ZipFile(work_data_file, 'r') as zip_ref: + # 昇順でソートする。 + file_list: list[str] = sorted(zip_ref.namelist()) + + if len(file_list) > 1: + # 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。') + + target_file_name = file_list[0] + # 末尾が「/」で終わるのはフォルダ + if target_file_name.endswith('/'): + # 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。 + raise Exception( + f'展開したデータはファイルではありません。ファイルパス: {target_file_name}') + + # zipファイル内には1ファイルのみ + with zip_ref.open(target_file_name) as file: + file_bytes = file.read() + # ファイルを一時ディレクトリに書き出す。 - uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + encoding = settings_list[SETTINGS_ITEM["charCode"]] + line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] + delimiter = settings_list[SETTINGS_ITEM["delimiter"]] + quote_char = convert_quotechar( + settings_list[SETTINGS_ITEM["quotechar"]]) + uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding, + newline=line_feed) csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) - with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: - csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + delimiter=delimiter) + with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: + csv_writer = csv.writer(csvfile, quotechar=quote_char, + delimiter=delimiter) for row in csv_reader: csv_writer.writerow(row) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-12 - 投入ファイルの展開が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました') return file_bytes - # TODO: zip以外の圧縮形式に対応する際に追記すること + # MEMO: zip以外の圧縮形式に対応する際に追記すること else: raise ValueError("圧縮形式が一致しません") +def reverse_readline_stream(f, line_feed: str, chunk_size=4096): + """ + バイトモードのストリームを後ろ向きにチャンク読みし、 + 1行ずつ返すジェネレータ。 + """ + f.seek(0, os.SEEK_END) + pointer = f.tell() + buffer = b'' + yielded_any = False + + while pointer > 0: + read_size = min(chunk_size, pointer) + pointer -= read_size + f.seek(pointer) + chunk = f.read(read_size) + buffer = chunk + buffer + + parts = buffer.split(line_feed.encode()) + # parts[0] は先頭途中行 → 次ループへ残す + buffer = parts[0] + + # parts[1:] が「完全に揃った行」のリスト + for line in reversed(parts[1:]): + if not yielded_any and line == b'': + # 最初にだけ出てくる空行を飛ばす + continue + yielded_any = True + yield line + + # ファイル先頭まで来たあとの残り1行 + if not (not yielded_any and buffer == b''): + # → 最初の空行だけスキップしたいなら、 + # buffer==b''かつyielded_any==Falseのときは飛ばす + yield buffer + + # ローカル実行用コード # 値はよしなに変えてください # if __name__ == '__main__': diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index bf100f4f..adea2f8e 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -137,11 +137,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf org_import_process_result = None bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] if not bulk_import_flag or int(bulk_import_flag) == 0: - # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う + # ⑤-1 一括登録フラグが未設定、またはOFFの場合、1行コミットモードでロードスキーマに登録を行う org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode, settings_list, conn) elif bulk_import_flag and int(bulk_import_flag) == 1: - # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う + # ⑤-2 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します') org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode, From 8e910e48f467983b703268532bfe6b2d8a5ac585 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 13 May 2025 19:04:06 +0900 Subject: [PATCH 12/14] =?UTF-8?q?style:=20=E3=82=B3=E3=83=A1=E3=83=B3?= =?UTF-8?q?=E3=83=88=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index ac6655e1..dd4300b2 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -171,6 +171,7 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します') + # ファイルの末尾行を取得し、ファイル項目数と比較する last_line_work_data_bytes = next(reverse_readline_stream( work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])) last_line_count = len( @@ -273,11 +274,18 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt raise ValueError("圧縮形式が一致しません") -def reverse_readline_stream(f, line_feed: str, chunk_size=4096): - """ - バイトモードのストリームを後ろ向きにチャンク読みし、 - 1行ずつ返すジェネレータ。 +def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096): + """バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。 + 指定されたバイト単位でファイルを読み込み、1行ずつにして返している。 + Args: + f (io.BytesIO): バイトモードのファイルストリーム + line_feed (str): 改行コード + chunk_size (int, optional): 読み込むファイルチャンク数(byte) Defaults to 4096. + + Yields: + bytes: 末尾から1行分のレコード """ + # ファイルのポインタを末尾に移動させてからポインタを取得する f.seek(0, os.SEEK_END) pointer = f.tell() buffer = b'' @@ -285,9 +293,12 @@ def reverse_readline_stream(f, line_feed: str, chunk_size=4096): while pointer > 0: read_size = min(chunk_size, pointer) + # チャンク数分ファイルを読み込むために、ポインタをずらす pointer -= read_size f.seek(pointer) chunk = f.read(read_size) + + # 読み込んだチャンク + バッファ(途中行) buffer = chunk + buffer parts = buffer.split(line_feed.encode()) @@ -299,13 +310,14 @@ def reverse_readline_stream(f, line_feed: str, chunk_size=4096): if not yielded_any and line == b'': # 最初にだけ出てくる空行を飛ばす continue + + # 一度でも結果を返している場合にフラグを立てる yielded_any = True yield line - # ファイル先頭まで来たあとの残り1行 + # ポインターがファイル先頭まで来たあとの残り1行 if not (not yielded_any and buffer == b''): - # → 最初の空行だけスキップしたいなら、 - # buffer==b''かつyielded_any==Falseのときは飛ばす + # 一度でも結果を返しているかつ、バッファにデータが残っている場合、先頭行のデータを返す yield buffer From 11aee0f43a76ccd2aee144e3d9714e2ce50a12dc Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 13 May 2025 19:06:47 +0900 Subject: [PATCH 13/14] =?UTF-8?q?style:=20=E6=9B=B4=E3=81=AB=E3=82=B3?= =?UTF-8?q?=E3=83=A1=E3=83=B3=E3=83=88=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index dd4300b2..13e8f88b 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -301,11 +301,12 @@ def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096): # 読み込んだチャンク + バッファ(途中行) buffer = chunk + buffer + # 改行文字で区切る parts = buffer.split(line_feed.encode()) # parts[0] は先頭途中行 → 次ループへ残す buffer = parts[0] - # parts[1:] が「完全に揃った行」のリスト + # 改行文字で区切っているため、行途中のparts[1:] は空文字になるため、以下のループは素通りする。 for line in reversed(parts[1:]): if not yielded_any and line == b'': # 最初にだけ出てくる空行を飛ばす From 72e0a18431dac05cb0ec9d2c861c06bb26d15092 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 14 May 2025 14:03:35 +0900 Subject: [PATCH 14/14] =?UTF-8?q?feat:=20=E6=9C=AB=E5=B0=BE=E8=A1=8C?= =?UTF-8?q?=E3=81=AFcsv=E3=83=A9=E3=82=A4=E3=83=96=E3=83=A9=E3=83=AA?= =?UTF-8?q?=E3=82=92=E4=BD=BF=E3=81=A3=E3=81=A6=E8=AA=AD=E3=81=BF=E8=BE=BC?= =?UTF-8?q?=E3=82=80=E3=80=82=E5=8D=98=E7=B4=94=E3=81=AB=E3=82=B9=E3=83=97?= =?UTF-8?q?=E3=83=AA=E3=83=83=E3=83=88=E3=81=97=E3=81=9F=E5=A0=B4=E5=90=88?= =?UTF-8?q?=E3=81=AB=E3=82=AB=E3=83=A9=E3=83=A0=E5=86=85=E3=81=AE=E3=82=AB?= =?UTF-8?q?=E3=83=B3=E3=83=9E=E3=82=92=E5=88=A4=E5=88=A5=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=81=AA=E3=81=84=E3=81=9F=E3=82=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 13e8f88b..87cdc9f9 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -174,11 +174,18 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i # ファイルの末尾行を取得し、ファイル項目数と比較する last_line_work_data_bytes = next(reverse_readline_stream( work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])) - last_line_count = len( - last_line_work_data_bytes.decode(encoding).split(delimiter)) - if last_line_count != settings_list[SETTINGS_ITEM["csvNumItems"]]: + # 区切り文字(例えばカンマ)を文字とデリミタで区別できるように、csvリーダーで読む + last_line_separated_data = [ + row for row in csv.reader( + io.TextIOWrapper(io.BytesIO(last_line_work_data_bytes)), + quotechar=convert_quotechar( + settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=delimiter) + ] + last_line_count = len(last_line_separated_data[0]) + if last_line_count != int(settings_list[SETTINGS_ITEM["csvNumItems"]]): raise CheckError( - f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_work_data_bytes}') + f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_count}') print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了')