From bd788ed55e843e0460571d1370a94a71227f5b69 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Tue, 13 May 2025 18:09:50 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E3=83=AC=E3=83=93=E3=83=A5=E3=83=BC?= =?UTF-8?q?=E6=8C=87=E6=91=98=E5=AF=BE=E5=BF=9C=E3=80=82=E6=96=B0=E3=81=9F?= =?UTF-8?q?=E3=81=AB=E4=BD=9C=E3=81=A3=E3=81=9F=E9=96=A2=E6=95=B0=E3=81=AF?= =?UTF-8?q?=E3=83=AA=E3=83=95=E3=82=A1=E3=82=AF=E3=82=BF=E3=83=AA=E3=83=B3?= =?UTF-8?q?=E3=82=B0=E3=81=99=E3=82=8B=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 159 ++++++++++++++++++++++-------- ecs/dataimport/dataimport/main.py | 4 +- 2 files changed, 121 insertions(+), 42 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index dc174eac..ac6655e1 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -1,5 +1,6 @@ import csv import io +import os import sys import zipfile from datetime import datetime @@ -14,7 +15,7 @@ DIRECTORY_WORK = '/work/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' -LOG_LEVEL = {'i': 'Info', 'e': 'Error'} +LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"} SETTINGS_ITEM = { 'dataSource': 0, 'delimiter': 1, @@ -49,6 +50,10 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } +INFO = LOG_LEVEL["i"] +ERROR = LOG_LEVEL["e"] +WARNING = LOG_LEVEL["w"] + # クラス変数 s3_client = boto3.client('s3') @@ -82,7 +87,7 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i # ① チェック処理開始ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します') # データ読込 settings_obj_response = s3_client.get_object( @@ -98,19 +103,23 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_obj_response = s3_client.get_object( Bucket=bucket_name, Key=work_key) work_obj_file = work_obj_response["Body"].read() - work_data_bytes = work_obj_file + work_data_bytes = io.BytesIO(work_obj_file) compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] - # 圧縮されている場合は解凍する + # ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。 if compressed_flag and compressed_flag == '1': - work_data_bytes = uncompress_file( - work_data_bytes, settings_list, log_info) + work_data_bytes = io.BytesIO(uncompress_file( + work_data_bytes, settings_list, log_info)) - work_data = io.TextIOWrapper(io.BytesIO(work_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + encoding = settings_list[SETTINGS_ITEM["charCode"]] + line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] + delimiter = settings_list[SETTINGS_ITEM["delimiter"]] + + work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding, + newline=line_feed) work_csv_row = [] for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): + delimiter=delimiter)): # ヘッダあり、かつ、1行目の場合 if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0: work_csv_row.append(line) @@ -118,35 +127,35 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_csv_row.append(line) break - # ② C-0のデータ件数チェックを開始する + # ③ C-0のデータ件数チェックを開始する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します') if is_empty_file(work_csv_row, settings_list): print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') end(bucket_name, target_data_source, target_file_name, '', log_info, mode) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了') sys.exit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0:正常終了') - # ③ C-1の項目数チェックを開始する + # ④ C-1の項目数チェックを開始する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します') work_csv_row_item_len = len(work_csv_row[0]) if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了') else: raise CheckError( f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') - # ④ C-2の項目並び順チェック開始する - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True: + # ⑤ C-2の項目並び順チェック開始する + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip( ).split(',') work_header_list = work_csv_row[0] @@ -155,19 +164,34 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i raise CheckError( f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了') - # ⑤ チェック処理終了ログを出力する + # ⑥ C-3の末尾行項目数チェック開始する + if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します') + + last_line_work_data_bytes = next(reverse_readline_stream( + work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])) + last_line_count = len( + last_line_work_data_bytes.decode(encoding).split(delimiter)) + if last_line_count != settings_list[SETTINGS_ITEM["csvNumItems"]]: + raise CheckError( + f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_work_data_bytes}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了') + + # ⑦ チェック処理終了ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します') except CheckError as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -202,34 +226,89 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt """ compression = settings_list[SETTINGS_ITEM['compression']] print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') if compression == 'zip': file_bytes = None - with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref: - for file_name in zip_ref.namelist(): - # zipファイル内には1ファイルのみ - with zip_ref.open(file_name) as file: - file_bytes = file.read() - break + with zipfile.ZipFile(work_data_file, 'r') as zip_ref: + # 昇順でソートする。 + file_list: list[str] = sorted(zip_ref.namelist()) + + if len(file_list) > 1: + # 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。') + + target_file_name = file_list[0] + # 末尾が「/」で終わるのはフォルダ + if target_file_name.endswith('/'): + # 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。 + raise Exception( + f'展開したデータはファイルではありません。ファイルパス: {target_file_name}') + + # zipファイル内には1ファイルのみ + with zip_ref.open(target_file_name) as file: + file_bytes = file.read() + # ファイルを一時ディレクトリに書き出す。 - uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], - newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + encoding = settings_list[SETTINGS_ITEM["charCode"]] + line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] + delimiter = settings_list[SETTINGS_ITEM["delimiter"]] + quote_char = convert_quotechar( + settings_list[SETTINGS_ITEM["quotechar"]]) + uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding, + newline=line_feed) csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) - with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: - csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + delimiter=delimiter) + with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: + csv_writer = csv.writer(csvfile, quotechar=quote_char, + delimiter=delimiter) for row in csv_reader: csv_writer.writerow(row) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-12 - 投入ファイルの展開が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました') return file_bytes - # TODO: zip以外の圧縮形式に対応する際に追記すること + # MEMO: zip以外の圧縮形式に対応する際に追記すること else: raise ValueError("圧縮形式が一致しません") +def reverse_readline_stream(f, line_feed: str, chunk_size=4096): + """ + バイトモードのストリームを後ろ向きにチャンク読みし、 + 1行ずつ返すジェネレータ。 + """ + f.seek(0, os.SEEK_END) + pointer = f.tell() + buffer = b'' + yielded_any = False + + while pointer > 0: + read_size = min(chunk_size, pointer) + pointer -= read_size + f.seek(pointer) + chunk = f.read(read_size) + buffer = chunk + buffer + + parts = buffer.split(line_feed.encode()) + # parts[0] は先頭途中行 → 次ループへ残す + buffer = parts[0] + + # parts[1:] が「完全に揃った行」のリスト + for line in reversed(parts[1:]): + if not yielded_any and line == b'': + # 最初にだけ出てくる空行を飛ばす + continue + yielded_any = True + yield line + + # ファイル先頭まで来たあとの残り1行 + if not (not yielded_any and buffer == b''): + # → 最初の空行だけスキップしたいなら、 + # buffer==b''かつyielded_any==Falseのときは飛ばす + yield buffer + + # ローカル実行用コード # 値はよしなに変えてください # if __name__ == '__main__': diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index bf100f4f..adea2f8e 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -137,11 +137,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf org_import_process_result = None bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]] if not bulk_import_flag or int(bulk_import_flag) == 0: - # 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う + # ⑤-1 一括登録フラグが未設定、またはOFFの場合、1行コミットモードでロードスキーマに登録を行う org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode, settings_list, conn) elif bulk_import_flag and int(bulk_import_flag) == 1: - # 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う + # ⑤-2 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します') org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode,