198 lines
6.1 KiB
Python

import csv
import gzip
import io
import zipfile
from datetime import datetime
# 定数
LOG_LEVEL = {
"d": 'Debug',
'i': 'Info',
'e': 'Error',
'w': "Warning"
}
INFO = LOG_LEVEL["i"]
ERROR = LOG_LEVEL["e"]
WARNING = LOG_LEVEL["w"]
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
# LOAD DATA文で文字コードを指定するために、個別設定ファイルの文字コード指定をMySQLの文字コード表記に変換する
MYSQL_CHARSET_CODE = {
'utf-8': 'utf8mb4',
'utf8': 'utf8mb4',
'utf-8-sig': 'utf8mb4',
'shift_jis': 'cp932',
'cp932': 'cp932',
}
MODE_TYPE = {
'n': 'normal',
'd': 'debug',
}
# 設定ファイルのの項目行数のマップ
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
'charCode': 2,
'quotechar': 3,
'lineFeedCode': 4,
'headerFlag': 5,
'csvNumItems': 6,
'csvNameItems': 7,
'dbColumuName': 8,
'storageSchemaName': 9,
'loadSchemaName': 10,
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'importManner': 13,
'bulkImportFlag': 14,
'compressedFlag': 15,
'compression': 16,
'executeExSqlIfFileEmptyFlag': 17,
'reserved1': 18,
'reserved2': 19,
'reserved3': 20,
'reserved4': 21,
'reserved5': 22,
'reserved6': 23,
'reserved7': 24
}
DIRECTORY_WORK = '/work/'
DIRECTORY_SETTINGS = '/settings/'
LOCAL_DIRECTORY_TMP = '/tmp'
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'
def debug_log(log, log_info, mode):
if MODE_TYPE['d'] == mode:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}')
def convert_quotechar(quotechar):
"""csvモジュールの囲い文字を変換する
Args:
quotechar : 項目囲い文字の設定値
Returns:
空文字、空白文字の場合→None
それ以外→設定値をそのまま帰す
"""
if (quotechar.strip(' ') == ''):
return None
return quotechar
def uncompress_zip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes:
"""zip圧縮されているファイルを展開する
Args:
work_data_file (io.BytesIO): 展開対象のバイナリ(.zip)
settings_list (list): 個別設定ファイルの情報
log_info (_type_): ログ情報
Raises:
Exception: 展開したものがファイルではなかった場合
Returns:
bytes: 展開後ファイルのバイナリ
"""
file_bytes = None
with zipfile.ZipFile(work_data_file, 'r') as zip_ref:
# 昇順でソートする。
file_list: list[str] = sorted(zip_ref.namelist())
if len(file_list) > 1:
# 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING}' +
f'W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。'
)
target_file_name = file_list[0]
# 末尾が「/」で終わるのはフォルダ
if target_file_name.endswith('/'):
# 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。
raise Exception(
f'展開したデータはファイルではありません。ファイルパス: {target_file_name}')
# zipファイル内には1ファイルのみ
with zip_ref.open(target_file_name) as file:
file_bytes = file.read()
# ファイルを一時ディレクトリに書き出す。
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
quote_char = convert_quotechar(
settings_list[SETTINGS_ITEM["quotechar"]])
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding,
newline=line_feed)
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
csv_writer = csv.writer(csvfile, quotechar=quote_char,
delimiter=delimiter)
for row in csv_reader:
csv_writer.writerow(row)
def uncompress_gzip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes:
"""gzip 圧縮されたファイルを展開する
Args:
work_data_file (io.BytesIO): 展開対象のバイナリ (.gz)
settings_list (list): 個別設定ファイルの情報
log_info (_type_): ログ情報
Raises:
Exception: 展開に失敗した場合
Returns:
bytes: 展開後ファイルのバイナリ
"""
gz_bytes = work_data_file.getvalue()
try:
# GZIP をバイト列ごと展開する
file_bytes = gzip.decompress(gz_bytes)
except (OSError, EOFError) as e:
raise Exception(f"GZIP 解凍に失敗しました: {e}")
# ファイルを一時ディレクトリに書き出す。
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
quote_char = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]])
uncompressed_file = io.TextIOWrapper(
io.BytesIO(file_bytes),
encoding=encoding,
newline=line_feed
)
csv_reader = csv.reader(
uncompressed_file,
quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter
)
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
csv_writer = csv.writer(
csvfile, quotechar=quote_char, delimiter=delimiter)
for row in csv_reader:
csv_writer.writerow(row)
return file_bytes