feat: gz解凍に対応
This commit is contained in:
parent
6d7917a7f7
commit
363a177559
@ -6,7 +6,8 @@ from datetime import datetime
|
|||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM,
|
from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM,
|
||||||
convert_quotechar, debug_log, uncompress_zip)
|
convert_quotechar, debug_log, uncompress_gzip,
|
||||||
|
uncompress_zip)
|
||||||
from end import end
|
from end import end
|
||||||
from error import error
|
from error import error
|
||||||
|
|
||||||
@ -181,7 +182,7 @@ def is_empty_file(work_csv_row: list, settings_list: list):
|
|||||||
return len(work_csv_row) == 0
|
return len(work_csv_row) == 0
|
||||||
|
|
||||||
|
|
||||||
def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> bytes:
|
def uncompress_file(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes:
|
||||||
"""指定された形式で圧縮ファイルを展開し、ローカルに書き出す。
|
"""指定された形式で圧縮ファイルを展開し、ローカルに書き出す。
|
||||||
Args:
|
Args:
|
||||||
work_data_file (bytes): S3から読み込んだ登録
|
work_data_file (bytes): S3から読み込んだ登録
|
||||||
@ -198,7 +199,9 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt
|
|||||||
file_bytes: bytes = None
|
file_bytes: bytes = None
|
||||||
if compression == 'zip':
|
if compression == 'zip':
|
||||||
# zipファイルを展開し、ファイルを書き出す。
|
# zipファイルを展開し、ファイルを書き出す。
|
||||||
file_bytes = uncompress_zip()
|
file_bytes = uncompress_zip(work_data_file, settings_list, log_info)
|
||||||
|
elif compression == 'gzip':
|
||||||
|
file_bytes = uncompress_gzip(work_data_file, settings_list, log_info)
|
||||||
|
|
||||||
# MEMO: zip以外の圧縮形式に対応する際に追記すること
|
# MEMO: zip以外の圧縮形式に対応する際に追記すること
|
||||||
else:
|
else:
|
||||||
@ -259,12 +262,12 @@ def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096):
|
|||||||
|
|
||||||
# ローカル実行用コード
|
# ローカル実行用コード
|
||||||
# 値はよしなに変えてください
|
# 値はよしなに変えてください
|
||||||
# if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# check(
|
check(
|
||||||
# bucket_name='バケット名',
|
bucket_name='test-shimoda-bucket',
|
||||||
# target_data_source='データソース名',
|
target_data_source='test',
|
||||||
# target_file_name='targetフォルダ内のファイル名',
|
target_file_name='compress_test.gz',
|
||||||
# settings_key='個別設定ファイル名',
|
settings_key='test/settings/compress_test_gzip.txt',
|
||||||
# log_info='Info',
|
log_info='Info',
|
||||||
# mode='i'
|
mode='i'
|
||||||
# )
|
)
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import csv
|
import csv
|
||||||
|
import gzip
|
||||||
import io
|
import io
|
||||||
import zipfile
|
import zipfile
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@ -92,11 +93,11 @@ def convert_quotechar(quotechar):
|
|||||||
return quotechar
|
return quotechar
|
||||||
|
|
||||||
|
|
||||||
def uncompress_zip(work_data_file: bytes, settings_list: list, log_info) -> bytes:
|
def uncompress_zip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes:
|
||||||
"""zip圧縮されているファイルを展開する
|
"""zip圧縮されているファイルを展開する
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
work_data_file (bytes): 展開対象のバイナリ
|
work_data_file (io.BytesIO): 展開対象のバイナリ(.zip)
|
||||||
settings_list (list): 個別設定ファイルの情報
|
settings_list (list): 個別設定ファイルの情報
|
||||||
log_info (_type_): ログ情報
|
log_info (_type_): ログ情報
|
||||||
|
|
||||||
@ -144,3 +145,51 @@ def uncompress_zip(work_data_file: bytes, settings_list: list, log_info) -> byte
|
|||||||
delimiter=delimiter)
|
delimiter=delimiter)
|
||||||
for row in csv_reader:
|
for row in csv_reader:
|
||||||
csv_writer.writerow(row)
|
csv_writer.writerow(row)
|
||||||
|
|
||||||
|
|
||||||
|
def uncompress_gzip(work_data_file: io.BytesIO, settings_list: list, log_info) -> bytes:
|
||||||
|
"""gzip 圧縮されたファイルを展開する
|
||||||
|
|
||||||
|
Args:
|
||||||
|
work_data_file (io.BytesIO): 展開対象のバイナリ (.gz)
|
||||||
|
settings_list (list): 個別設定ファイルの情報
|
||||||
|
log_info (_type_): ログ情報
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: 展開に失敗した場合
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: 展開後ファイルのバイナリ
|
||||||
|
"""
|
||||||
|
|
||||||
|
gz_bytes = work_data_file.getvalue()
|
||||||
|
try:
|
||||||
|
# GZIP をバイト列ごと展開する
|
||||||
|
file_bytes = gzip.decompress(gz_bytes)
|
||||||
|
except (OSError, EOFError) as e:
|
||||||
|
raise Exception(f"GZIP 解凍に失敗しました: {e}")
|
||||||
|
|
||||||
|
# ファイルを一時ディレクトリに書き出す。
|
||||||
|
encoding = settings_list[SETTINGS_ITEM["charCode"]]
|
||||||
|
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
|
||||||
|
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
|
||||||
|
quote_char = convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]])
|
||||||
|
|
||||||
|
uncompressed_file = io.TextIOWrapper(
|
||||||
|
io.BytesIO(file_bytes),
|
||||||
|
encoding=encoding,
|
||||||
|
newline=line_feed
|
||||||
|
)
|
||||||
|
csv_reader = csv.reader(
|
||||||
|
uncompressed_file,
|
||||||
|
quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||||
|
delimiter=delimiter
|
||||||
|
)
|
||||||
|
|
||||||
|
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
|
||||||
|
csv_writer = csv.writer(
|
||||||
|
csvfile, quotechar=quote_char, delimiter=delimiter)
|
||||||
|
for row in csv_reader:
|
||||||
|
csv_writer.writerow(row)
|
||||||
|
|
||||||
|
return file_bytes
|
||||||
|
|||||||
@ -4,9 +4,9 @@ from datetime import datetime
|
|||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
import pymysql
|
import pymysql
|
||||||
from chk import LOCAL_TEMPORARY_FILE_PATH
|
from common import (ERROR, INFO, LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH,
|
||||||
from common import (ERROR, INFO, LINE_FEED_CODE, MYSQL_CHARSET_CODE,
|
MYSQL_CHARSET_CODE, SETTINGS_ITEM, WARNING,
|
||||||
SETTINGS_ITEM, WARNING, convert_quotechar, debug_log)
|
convert_quotechar, debug_log)
|
||||||
from error import error
|
from error import error
|
||||||
from pymysql.constants import CLIENT
|
from pymysql.constants import CLIENT
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user