feat: チェック処理で圧縮ファイルの解凍を実装。
This commit is contained in:
parent
7ca2318277
commit
f471a2d444
@ -1,6 +1,7 @@
|
||||
import csv
|
||||
import io
|
||||
import sys
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
|
||||
import boto3
|
||||
@ -10,6 +11,9 @@ from error import error
|
||||
|
||||
# 定数
|
||||
DIRECTORY_WORK = '/work/'
|
||||
LOCAL_DIRECTORY_TMP = '/tmp'
|
||||
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
|
||||
LOCAL_UNCOMPRESSED_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/uncompressed_file.dat'
|
||||
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
|
||||
SETTINGS_ITEM = {
|
||||
'dataSource': 0,
|
||||
@ -26,13 +30,19 @@ SETTINGS_ITEM = {
|
||||
'exSqlFileName': 11,
|
||||
'commaReplaceColumns': 12,
|
||||
'importManner': 13,
|
||||
'reserved1': 14,
|
||||
'reserved2': 15,
|
||||
'reserved3': 16,
|
||||
'reserved4': 17,
|
||||
'reserved5': 18,
|
||||
'reserved6': 19
|
||||
'bulkImportFlag': 14,
|
||||
'compressedFlag': 15,
|
||||
'compression': 16,
|
||||
'reserved1': 17,
|
||||
'reserved2': 18,
|
||||
'reserved3': 19,
|
||||
'reserved4': 20,
|
||||
'reserved5': 21,
|
||||
'reserved6': 22,
|
||||
'reserved7': 23,
|
||||
'reserved8': 24
|
||||
}
|
||||
|
||||
LINE_FEED_CODE = {
|
||||
'CR': '\r',
|
||||
'LF': '\n',
|
||||
@ -63,26 +73,40 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
|
||||
try:
|
||||
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
|
||||
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
|
||||
debug_log(
|
||||
f'引数 target_data_source : {target_data_source}', log_info, mode)
|
||||
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
|
||||
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
|
||||
debug_log(f'引数 log_info : {log_info}', log_info, mode)
|
||||
debug_log(f'引数 mode : {mode}', log_info, mode)
|
||||
|
||||
# ① チェック処理開始ログを出力する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
|
||||
|
||||
# データ読込
|
||||
settings_obj_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
|
||||
settings_obj_response = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=settings_key)
|
||||
settings_list = []
|
||||
for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'):
|
||||
settings_list.append(line.rstrip('\n'))
|
||||
|
||||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||||
work_obj_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
|
||||
work_data = io.TextIOWrapper(io.BytesIO(work_obj_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
work_obj_response = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=work_key)
|
||||
work_obj_bytes = work_obj_response["Body"].read()
|
||||
work_data_file = work_obj_bytes
|
||||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||||
|
||||
# 圧縮されている場合は解凍する
|
||||
if compressed_flag and compressed_flag == '1':
|
||||
work_data_file = uncompress_file(work_data_file, settings_list)
|
||||
|
||||
work_data = io.TextIOWrapper(io.BytesIO(work_data_file), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
work_csv_row = []
|
||||
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
|
||||
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
|
||||
# ヘッダあり、かつ、1行目の場合
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
|
||||
work_csv_row.append(line)
|
||||
@ -91,40 +115,55 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
break
|
||||
|
||||
# ② C-0のデータ件数チェックを開始する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
|
||||
if is_empty_file(work_csv_row, settings_list):
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||||
end(bucket_name, target_data_source, target_file_name, '', log_info, mode)
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||||
end(bucket_name, target_data_source,
|
||||
target_file_name, '', log_info, mode)
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
|
||||
sys.exit()
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了')
|
||||
|
||||
# ③ C-1の項目数チェックを開始する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
|
||||
work_csv_row_item_len = len(work_csv_row[0])
|
||||
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了')
|
||||
else:
|
||||
raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
|
||||
raise CheckError(
|
||||
f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
|
||||
|
||||
# ④ C-2の項目並び順チェック開始する
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
|
||||
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',')
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
|
||||
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip(
|
||||
).split(',')
|
||||
work_header_list = work_csv_row[0]
|
||||
for i in range(len(settings_header_list)):
|
||||
if not settings_header_list[i] == work_header_list[i]:
|
||||
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了')
|
||||
raise CheckError(
|
||||
f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了')
|
||||
|
||||
# ⑤ チェック処理終了ログを出力する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します')
|
||||
except CheckError as e:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
except Exception as e:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
|
||||
@ -147,6 +186,32 @@ def is_empty_file(work_csv_row: list, settings_list: list):
|
||||
return len(work_csv_row) == 0
|
||||
|
||||
|
||||
def uncompress_file(work_data_file: bytes, settings_list: list):
|
||||
if settings_list[SETTINGS_ITEM['compression']] == 'zip':
|
||||
file_bytes = None
|
||||
with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref:
|
||||
for file_name in zip_ref.namelist():
|
||||
# zipファイル内には1ファイルのみ
|
||||
with zip_ref.open(file_name) as file:
|
||||
file_bytes = file.read()
|
||||
print(f"{file_name}: {len(file_bytes)} bytes")
|
||||
break
|
||||
# ファイルを一時ディレクトリに書き出す。
|
||||
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
|
||||
with open(LOCAL_UNCOMPRESSED_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile:
|
||||
csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
|
||||
for row in csv_reader:
|
||||
csv_writer.writerow(row)
|
||||
return file_bytes
|
||||
# TODO: zip以外の圧縮形式に対応する際に追記すること
|
||||
else:
|
||||
raise ValueError("圧縮形式が一致しません")
|
||||
|
||||
|
||||
# ローカル実行用コード
|
||||
# 値はよしなに変えてください
|
||||
# if __name__ == '__main__':
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user