import csv import gzip import io import zipfile from datetime import datetime # 定数 LOG_LEVEL = { "d": 'Debug', 'i': 'Info', 'e': 'Error', 'w': "Warning" } INFO = LOG_LEVEL["i"] ERROR = LOG_LEVEL["e"] WARNING = LOG_LEVEL["w"] LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', 'CRLF': '\r\n', } # LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する MYSQL_CHARSET_CODE = { 'utf-8': 'utf8mb4', 'utf8': 'utf8mb4', 'utf-8-sig': 'utf8mb4', 'shift_jis': 'cp932', 'cp932': 'cp932', } MODE_TYPE = { 'n': 'normal', 'd': 'debug', } # 設定ファイルのの項目行数のマップ SETTINGS_ITEM = { 'dataSource': 0, 'delimiter': 1, 'charCode': 2, 'quotechar': 3, 'lineFeedCode': 4, 'headerFlag': 5, 'csvNumItems': 6, 'csvNameItems': 7, 'dbColumuName': 8, 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, 'commaReplaceColumns': 12, 'importManner': 13, 'bulkImportFlag': 14, 'compressedFlag': 15, 'compression': 16, 'executeExSqlIfFileEmptyFlag': 17, 'reserved1': 18, 'reserved2': 19, 'reserved3': 20, 'reserved4': 21, 'reserved5': 22, 'reserved6': 23, 'reserved7': 24 } DIRECTORY_WORK = '/work/' DIRECTORY_SETTINGS = '/settings/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' def debug_log(log, log_info, mode): if MODE_TYPE['d'] == mode: print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}') def convert_quotechar(quotechar): """csvモジュールの囲い文字を変換する Args: quotechar : 項目囲い文字の設定値 Returns: 空文字、空白文字の場合→None それ以外→設定値をそのまま帰す """ if (quotechar.strip(' ') == ''): return None return quotechar def uncompress_zip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes: """zip圧縮されているファイルを展開する Args: work_data_file (io.BytesIO): 展開対象のバイナリ(.zip) settings_list (list): 個別設定ファイルの情報 log_info (_type_): ログ情報 Raises: Exception: 展開したものがファイルではなかった場合 Returns: bytes: 展開後ファイルのバイナリ """ file_bytes = None with zipfile.ZipFile(work_data_file, 'r') as zip_ref: # 昇順でソートする。 file_list: list[str] = sorted(zip_ref.namelist()) if len(file_list) > 1: # 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。 print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING}' + f'W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。' ) target_file_name = file_list[0] # 末尾が「/」で終わるのはフォルダ if target_file_name.endswith('/'): # 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。 raise Exception( f'展開したデータはファイルではありません。ファイルパス: {target_file_name}') # zipファイル内には1ファイルのみ with zip_ref.open(target_file_name) as file: file_bytes = file.read() # ファイルを一時ディレクトリに書き出す。 encoding = settings_list[SETTINGS_ITEM["charCode"]] line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] delimiter = settings_list[SETTINGS_ITEM["delimiter"]] quote_char = convert_quotechar( settings_list[SETTINGS_ITEM["quotechar"]]) uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding, newline=line_feed) csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=delimiter) with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: csv_writer = csv.writer(csvfile, quotechar=quote_char, delimiter=delimiter) for row in csv_reader: csv_writer.writerow(row) def uncompress_gzip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes: """gzip 圧縮されたファイルを展開する Args: work_data_file (io.BytesIO): 展開対象のバイナリ (.gz) settings_list (list): 個別設定ファイルの情報 log_info (_type_): ログ情報 Raises: Exception: 展開に失敗した場合 Returns: bytes: 展開後ファイルのバイナリ """ gz_bytes = work_data_file.getvalue() try: # GZIP をバイト列ごと展開する file_bytes = gzip.decompress(gz_bytes) except (OSError, EOFError) as e: raise Exception(f"GZIP 解凍に失敗しました: {e}") # ファイルを一時ディレクトリに書き出す。 encoding = settings_list[SETTINGS_ITEM["charCode"]] line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]] delimiter = settings_list[SETTINGS_ITEM["delimiter"]] quote_char = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]) uncompressed_file = io.TextIOWrapper( io.BytesIO(file_bytes), encoding=encoding, newline=line_feed ) csv_reader = csv.reader( uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=delimiter ) with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile: csv_writer = csv.writer( csvfile, quotechar=quote_char, delimiter=delimiter) for row in csv_reader: csv_writer.writerow(row) return file_bytes