diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 91f59d04..7b8b3ba6 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -2,57 +2,16 @@ import csv import io import os import sys -import zipfile from datetime import datetime import boto3 -from common import convert_quotechar, debug_log +from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM, + convert_quotechar, debug_log, uncompress_zip) from end import end from error import error # 定数 DIRECTORY_WORK = '/work/' -LOCAL_DIRECTORY_TMP = '/tmp' -# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ -LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' -LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"} -SETTINGS_ITEM = { - 'dataSource': 0, - 'delimiter': 1, - 'charCode': 2, - 'quotechar': 3, - 'lineFeedCode': 4, - 'headerFlag': 5, - 'csvNumItems': 6, - 'csvNameItems': 7, - 'dbColumuName': 8, - 'storageSchemaName': 9, - 'loadSchemaName': 10, - 'exSqlFileName': 11, - 'commaReplaceColumns': 12, - 'importManner': 13, - 'bulkImportFlag': 14, - 'compressedFlag': 15, - 'compression': 16, - 'reserved1': 17, - 'reserved2': 18, - 'reserved3': 19, - 'reserved4': 20, - 'reserved5': 21, - 'reserved6': 22, - 'reserved7': 23, - 'reserved8': 24 -} - -LINE_FEED_CODE = { - 'CR': '\r', - 'LF': '\n', - 'CRLF': '\r\n', -} - -INFO = LOG_LEVEL["i"] -ERROR = LOG_LEVEL["e"] -WARNING = LOG_LEVEL["w"] # クラス変数 s3_client = boto3.client('s3') @@ -236,50 +195,19 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}') + file_bytes: bytes = None if compression == 'zip': - file_bytes = None - with zipfile.ZipFile(work_data_file, 'r') as zip_ref: - # 昇順でソートする。 - file_list: list[str] = sorted(zip_ref.namelist()) + # zipファイルを展開し、ファイルを書き出す。 + file_bytes = uncompress_zip() - if len(file_list) > 1: - # 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。 - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。') - - target_file_name = file_list[0] - # 末尾が「/」で終わるのはフォルダ - if target_file_name.endswith('/'): - # 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。 - raise Exception( - f'展開したデータはファイルではありません。ファイルパス: {target_file_name}') - - # zipファイル内には1ファイルのみ - with zip_ref.open(target_file_name) as file: - file_bytes = file.read() - - # ファイルを一時ディレクトリに書き出す。 - encoding = settings_list[SETTINGS_ITEM["charCode"]] - line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] - delimiter = settings_list[SETTINGS_ITEM["delimiter"]] - quote_char = convert_quotechar( - settings_list[SETTINGS_ITEM["quotechar"]]) - uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding, - newline=line_feed) - csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), - delimiter=delimiter) - with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: - csv_writer = csv.writer(csvfile, quotechar=quote_char, - delimiter=delimiter) - for row in csv_reader: - csv_writer.writerow(row) - print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました') - return file_bytes # MEMO: zip以外の圧縮形式に対応する際に追記すること else: raise ValueError("圧縮形式が一致しません") + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました') + return file_bytes + def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096): """バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。 diff --git a/ecs/dataimport/dataimport/common.py b/ecs/dataimport/dataimport/common.py index 1361ff53..22adf07d 100644 --- a/ecs/dataimport/dataimport/common.py +++ b/ecs/dataimport/dataimport/common.py @@ -1,16 +1,80 @@ +import csv +import io +import zipfile from datetime import datetime # 定数 -LOG_LEVEL = {"d": 'Debug'} +LOG_LEVEL = { + "d": 'Debug', + 'i': 'Info', + 'e': 'Error', + 'w': "Warning" +} + +INFO = LOG_LEVEL["i"] +ERROR = LOG_LEVEL["e"] +WARNING = LOG_LEVEL["w"] + + +LINE_FEED_CODE = { + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n', +} + +# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する +MYSQL_CHARSET_CODE = { + 'utf-8': 'utf8mb4', + 'utf8': 'utf8mb4', + 'utf-8-sig': 'utf8mb4', + 'shift_jis': 'cp932', + 'cp932': 'cp932', +} + MODE_TYPE = { 'n': 'normal', 'd': 'debug', } +# 設定ファイルのの項目行数のマップ +SETTINGS_ITEM = { + 'dataSource': 0, + 'delimiter': 1, + 'charCode': 2, + 'quotechar': 3, + 'lineFeedCode': 4, + 'headerFlag': 5, + 'csvNumItems': 6, + 'csvNameItems': 7, + 'dbColumuName': 8, + 'storageSchemaName': 9, + 'loadSchemaName': 10, + 'exSqlFileName': 11, + 'commaReplaceColumns': 12, + 'importManner': 13, + 'bulkImportFlag': 14, + 'compressedFlag': 15, + 'compression': 16, + 'reserved1': 17, + 'reserved2': 18, + 'reserved3': 19, + 'reserved4': 20, + 'reserved5': 21, + 'reserved6': 22, + 'reserved7': 23, + 'reserved8': 24 +} + +LOCAL_DIRECTORY_TMP = '/tmp' +# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ +LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' + def debug_log(log, log_info, mode): if MODE_TYPE['d'] == mode: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}') + def convert_quotechar(quotechar): """csvモジュールの囲い文字を変換する @@ -26,3 +90,57 @@ def convert_quotechar(quotechar): return None return quotechar + + +def uncompress_zip(work_data_file: bytes, settings_list: list, log_info) -> bytes: + """zip圧縮されているファイルを展開する + + Args: + work_data_file (bytes): 展開対象のバイナリ + settings_list (list): 個別設定ファイルの情報 + log_info (_type_): ログ情報 + + Raises: + Exception: 展開したものがファイルではなかった場合 + + Returns: + bytes: 展開後ファイルのバイナリ + """ + file_bytes = None + with zipfile.ZipFile(work_data_file, 'r') as zip_ref: + # 昇順でソートする。 + file_list: list[str] = sorted(zip_ref.namelist()) + + if len(file_list) > 1: + # 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING}' + + f'W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。' + ) + + target_file_name = file_list[0] + # 末尾が「/」で終わるのはフォルダ + if target_file_name.endswith('/'): + # 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。 + raise Exception( + f'展開したデータはファイルではありません。ファイルパス: {target_file_name}') + + # zipファイル内には1ファイルのみ + with zip_ref.open(target_file_name) as file: + file_bytes = file.read() + + # ファイルを一時ディレクトリに書き出す。 + encoding = settings_list[SETTINGS_ITEM["charCode"]] + line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] + delimiter = settings_list[SETTINGS_ITEM["delimiter"]] + quote_char = convert_quotechar( + settings_list[SETTINGS_ITEM["quotechar"]]) + uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding, + newline=line_feed) + csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=delimiter) + with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: + csv_writer = csv.writer(csvfile, quotechar=quote_char, + delimiter=delimiter) + for row in csv_reader: + csv_writer.writerow(row) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index aa717847..13f9c0c9 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -5,65 +5,19 @@ from datetime import datetime import boto3 import pymysql from chk import LOCAL_TEMPORARY_FILE_PATH -from common import convert_quotechar, debug_log +from common import (ERROR, INFO, LINE_FEED_CODE, MYSQL_CHARSET_CODE, + SETTINGS_ITEM, WARNING, convert_quotechar, debug_log) from error import error from pymysql.constants import CLIENT # 定数 DIRECTORY_WORK = '/work/' -LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} -SETTINGS_ITEM = { - 'dataSource': 0, - 'delimiter': 1, - 'charCode': 2, - 'quotechar': 3, - 'lineFeedCode': 4, - 'headerFlag': 5, - 'csvNumItems': 6, - 'csvNameItems': 7, - 'dbColumuName': 8, - 'storageSchemaName': 9, - 'loadSchemaName': 10, - 'exSqlFileName': 11, - 'commaReplaceColumns': 12, - 'importManner': 13, - 'bulkImportFlag': 14, - 'compressedFlag': 15, - 'compression': 16, - 'reserved1': 17, - 'reserved2': 18, - 'reserved3': 19, - 'reserved4': 20, - 'reserved5': 21, - 'reserved6': 22, - 'reserved7': 23, - 'reserved8': 24 -} - -LINE_FEED_CODE = { - 'CR': '\r', - 'LF': '\n', - 'CRLF': '\r\n', -} - -# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する -MYSQL_CHARSET_CODE = { - 'utf-8': 'utf8mb4', - 'utf8': 'utf8mb4', - 'utf-8-sig': 'utf8mb4', - 'shift_jis': 'cp932', - 'cp932': 'cp932', -} DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' -INFO = LOG_LEVEL['i'] -WARNING = LOG_LEVEL['w'] -ERROR = LOG_LEVEL['e'] - # クラス変数 s3_client = boto3.client('s3')