From f471a2d444c054f48acb2e8d1afff7032977e2b7 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Wed, 7 May 2025 17:53:45 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E3=83=81=E3=82=A7=E3=83=83=E3=82=AF?= =?UTF-8?q?=E5=87=A6=E7=90=86=E3=81=A7=E5=9C=A7=E7=B8=AE=E3=83=95=E3=82=A1?= =?UTF-8?q?=E3=82=A4=E3=83=AB=E3=81=AE=E8=A7=A3=E5=87=8D=E3=82=92=E5=AE=9F?= =?UTF-8?q?=E8=A3=85=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 121 ++++++++++++++++++++++++------- 1 file changed, 93 insertions(+), 28 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index f6bb97c6..4674416b 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -1,6 +1,7 @@ import csv import io import sys +import zipfile from datetime import datetime import boto3 @@ -10,6 +11,9 @@ from error import error # 定数 DIRECTORY_WORK = '/work/' +LOCAL_DIRECTORY_TMP = '/tmp' +# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ +LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -26,13 +30,19 @@ SETTINGS_ITEM = { 'exSqlFileName': 11, 'commaReplaceColumns': 12, 'importManner': 13, - 'reserved1': 14, - 'reserved2': 15, - 'reserved3': 16, - 'reserved4': 17, - 'reserved5': 18, - 'reserved6': 19 + 'bulkImportFlag': 14, + 'compressedFlag': 15, + 'compression': 16, + 'reserved1': 17, + 'reserved2': 18, + 'reserved3': 19, + 'reserved4': 20, + 'reserved5': 21, + 'reserved6': 22, + 'reserved7': 23, + 'reserved8': 24 } + LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', @@ -63,26 +73,40 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log( + f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) debug_log(f'引数 log_info : {log_info}', log_info, mode) debug_log(f'引数 mode : {mode}', log_info, mode) # ① チェック処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') # データ読込 - settings_obj_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key) + settings_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=settings_key) settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) work_key = target_data_source + DIRECTORY_WORK + target_file_name - work_obj_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_obj_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + work_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=work_key) + work_obj_bytes = work_obj_response["Body"].read() + work_data_file = work_obj_bytes + compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]] + + # 圧縮されている場合は解凍する + if compressed_flag and compressed_flag == '1': + work_data_file = uncompress_file(work_data_file, settings_list) + + work_data = io.TextIOWrapper(io.BytesIO(work_data_file), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) work_csv_row = [] - for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): + for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): # ヘッダあり、かつ、1行目の場合 if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0: work_csv_row.append(line) @@ -91,40 +115,55 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i break # ② C-0のデータ件数チェックを開始する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') if is_empty_file(work_csv_row, settings_list): - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') - end(bucket_name, target_data_source, target_file_name, '', log_info, mode) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') + end(bucket_name, target_data_source, + target_file_name, '', log_info, mode) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') sys.exit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了') # ③ C-1の項目数チェックを開始する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') work_csv_row_item_len = len(work_csv_row[0]) if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') else: - raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') + raise CheckError( + f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') # ④ C-2の項目並び順チェック開始する - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') - settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') + settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip( + ).split(',') work_header_list = work_csv_row[0] for i in range(len(settings_header_list)): if not settings_header_list[i] == work_header_list[i]: - raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') + raise CheckError( + f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了') # ⑤ チェック処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します') except CheckError as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -147,6 +186,32 @@ def is_empty_file(work_csv_row: list, settings_list: list): return len(work_csv_row) == 0 +def uncompress_file(work_data_file: bytes, settings_list: list): + if settings_list[SETTINGS_ITEM['compression']] == 'zip': + file_bytes = None + with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref: + for file_name in zip_ref.namelist(): + # zipファイル内には1ファイルのみ + with zip_ref.open(file_name) as file: + file_bytes = file.read() + print(f"{file_name}: {len(file_bytes)} bytes") + break + # ファイルを一時ディレクトリに書き出す。 + uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]], + newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile: + csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]) + for row in csv_reader: + csv_writer.writerow(row) + return file_bytes + # TODO: zip以外の圧縮形式に対応する際に追記すること + else: + raise ValueError("圧縮形式が一致しません") + + # ローカル実行用コード # 値はよしなに変えてください # if __name__ == '__main__':