feat: レビュー指摘対応。新たに作った関数はリファクタリングする。
This commit is contained in:
parent
e6ebb1fd32
commit
bd788ed55e
@ -1,5 +1,6 @@
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import sys
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
@ -14,7 +15,7 @@ DIRECTORY_WORK = '/work/'
|
||||
LOCAL_DIRECTORY_TMP = '/tmp'
|
||||
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
|
||||
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'
|
||||
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
|
||||
LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"}
|
||||
SETTINGS_ITEM = {
|
||||
'dataSource': 0,
|
||||
'delimiter': 1,
|
||||
@ -49,6 +50,10 @@ LINE_FEED_CODE = {
|
||||
'CRLF': '\r\n',
|
||||
}
|
||||
|
||||
INFO = LOG_LEVEL["i"]
|
||||
ERROR = LOG_LEVEL["e"]
|
||||
WARNING = LOG_LEVEL["w"]
|
||||
|
||||
# クラス変数
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
@ -82,7 +87,7 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
|
||||
# ① チェック処理開始ログを出力する
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します')
|
||||
|
||||
# データ読込
|
||||
settings_obj_response = s3_client.get_object(
|
||||
@ -98,19 +103,23 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
work_obj_response = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=work_key)
|
||||
work_obj_file = work_obj_response["Body"].read()
|
||||
work_data_bytes = work_obj_file
|
||||
work_data_bytes = io.BytesIO(work_obj_file)
|
||||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||||
|
||||
# 圧縮されている場合は解凍する
|
||||
# ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。
|
||||
if compressed_flag and compressed_flag == '1':
|
||||
work_data_bytes = uncompress_file(
|
||||
work_data_bytes, settings_list, log_info)
|
||||
work_data_bytes = io.BytesIO(uncompress_file(
|
||||
work_data_bytes, settings_list, log_info))
|
||||
|
||||
work_data = io.TextIOWrapper(io.BytesIO(work_data_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
encoding = settings_list[SETTINGS_ITEM["charCode"]]
|
||||
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
|
||||
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
|
||||
|
||||
work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding,
|
||||
newline=line_feed)
|
||||
work_csv_row = []
|
||||
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
|
||||
delimiter=delimiter)):
|
||||
# ヘッダあり、かつ、1行目の場合
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
|
||||
work_csv_row.append(line)
|
||||
@ -118,35 +127,35 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
work_csv_row.append(line)
|
||||
break
|
||||
|
||||
# ② C-0のデータ件数チェックを開始する
|
||||
# ③ C-0のデータ件数チェックを開始する
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します')
|
||||
if is_empty_file(work_csv_row, settings_list):
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||||
end(bucket_name, target_data_source,
|
||||
target_file_name, '', log_info, mode)
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了')
|
||||
sys.exit()
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0:正常終了')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0:正常終了')
|
||||
|
||||
# ③ C-1の項目数チェックを開始する
|
||||
# ④ C-1の項目数チェックを開始する
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します')
|
||||
work_csv_row_item_len = len(work_csv_row[0])
|
||||
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了')
|
||||
else:
|
||||
raise CheckError(
|
||||
f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
|
||||
|
||||
# ④ C-2の項目並び順チェック開始する
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) is True:
|
||||
# ⑤ C-2の項目並び順チェック開始する
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します')
|
||||
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip(
|
||||
).split(',')
|
||||
work_header_list = work_csv_row[0]
|
||||
@ -155,19 +164,34 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
raise CheckError(
|
||||
f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2:正常終了')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了')
|
||||
|
||||
# ⑤ チェック処理終了ログを出力する
|
||||
# ⑥ C-3の末尾行項目数チェック開始する
|
||||
if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します')
|
||||
|
||||
last_line_work_data_bytes = next(reverse_readline_stream(
|
||||
work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]))
|
||||
last_line_count = len(
|
||||
last_line_work_data_bytes.decode(encoding).split(delimiter))
|
||||
if last_line_count != settings_list[SETTINGS_ITEM["csvNumItems"]]:
|
||||
raise CheckError(
|
||||
f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_work_data_bytes}')
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了')
|
||||
|
||||
# ⑦ チェック処理終了ログを出力する
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します')
|
||||
except CheckError as e:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
except Exception as e:
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
|
||||
@ -202,34 +226,89 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt
|
||||
"""
|
||||
compression = settings_list[SETTINGS_ITEM['compression']]
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}')
|
||||
|
||||
if compression == 'zip':
|
||||
file_bytes = None
|
||||
with zipfile.ZipFile(io.BytesIO(work_data_file), 'r') as zip_ref:
|
||||
for file_name in zip_ref.namelist():
|
||||
# zipファイル内には1ファイルのみ
|
||||
with zip_ref.open(file_name) as file:
|
||||
file_bytes = file.read()
|
||||
break
|
||||
with zipfile.ZipFile(work_data_file, 'r') as zip_ref:
|
||||
# 昇順でソートする。
|
||||
file_list: list[str] = sorted(zip_ref.namelist())
|
||||
|
||||
if len(file_list) > 1:
|
||||
# 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。')
|
||||
|
||||
target_file_name = file_list[0]
|
||||
# 末尾が「/」で終わるのはフォルダ
|
||||
if target_file_name.endswith('/'):
|
||||
# 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。
|
||||
raise Exception(
|
||||
f'展開したデータはファイルではありません。ファイルパス: {target_file_name}')
|
||||
|
||||
# zipファイル内には1ファイルのみ
|
||||
with zip_ref.open(target_file_name) as file:
|
||||
file_bytes = file.read()
|
||||
|
||||
# ファイルを一時ディレクトリに書き出す。
|
||||
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=settings_list[SETTINGS_ITEM["charCode"]],
|
||||
newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
encoding = settings_list[SETTINGS_ITEM["charCode"]]
|
||||
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
|
||||
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
|
||||
quote_char = convert_quotechar(
|
||||
settings_list[SETTINGS_ITEM["quotechar"]])
|
||||
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding,
|
||||
newline=line_feed)
|
||||
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
|
||||
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=settings_list[SETTINGS_ITEM['charCode']], newline='') as csvfile:
|
||||
csv_writer = csv.writer(csvfile, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||
delimiter=settings_list[SETTINGS_ITEM["delimiter"]])
|
||||
delimiter=delimiter)
|
||||
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
|
||||
csv_writer = csv.writer(csvfile, quotechar=quote_char,
|
||||
delimiter=delimiter)
|
||||
for row in csv_reader:
|
||||
csv_writer.writerow(row)
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-12 - 投入ファイルの展開が正常終了しました')
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました')
|
||||
return file_bytes
|
||||
# TODO: zip以外の圧縮形式に対応する際に追記すること
|
||||
# MEMO: zip以外の圧縮形式に対応する際に追記すること
|
||||
else:
|
||||
raise ValueError("圧縮形式が一致しません")
|
||||
|
||||
|
||||
def reverse_readline_stream(f, line_feed: str, chunk_size=4096):
|
||||
"""
|
||||
バイトモードのストリームを後ろ向きにチャンク読みし、
|
||||
1行ずつ返すジェネレータ。
|
||||
"""
|
||||
f.seek(0, os.SEEK_END)
|
||||
pointer = f.tell()
|
||||
buffer = b''
|
||||
yielded_any = False
|
||||
|
||||
while pointer > 0:
|
||||
read_size = min(chunk_size, pointer)
|
||||
pointer -= read_size
|
||||
f.seek(pointer)
|
||||
chunk = f.read(read_size)
|
||||
buffer = chunk + buffer
|
||||
|
||||
parts = buffer.split(line_feed.encode())
|
||||
# parts[0] は先頭途中行 → 次ループへ残す
|
||||
buffer = parts[0]
|
||||
|
||||
# parts[1:] が「完全に揃った行」のリスト
|
||||
for line in reversed(parts[1:]):
|
||||
if not yielded_any and line == b'':
|
||||
# 最初にだけ出てくる空行を飛ばす
|
||||
continue
|
||||
yielded_any = True
|
||||
yield line
|
||||
|
||||
# ファイル先頭まで来たあとの残り1行
|
||||
if not (not yielded_any and buffer == b''):
|
||||
# → 最初の空行だけスキップしたいなら、
|
||||
# buffer==b''かつyielded_any==Falseのときは飛ばす
|
||||
yield buffer
|
||||
|
||||
|
||||
# ローカル実行用コード
|
||||
# 値はよしなに変えてください
|
||||
# if __name__ == '__main__':
|
||||
|
||||
@ -137,11 +137,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
org_import_process_result = None
|
||||
bulk_import_flag = settings_list[SETTINGS_ITEM["bulkImportFlag"]]
|
||||
if not bulk_import_flag or int(bulk_import_flag) == 0:
|
||||
# 一括登録フラグが未設定またはOFFの場合、一行コミットモードでロードスキーマに登録を行う
|
||||
# ⑤-1 一括登録フラグが未設定、またはOFFの場合、1行コミットモードでロードスキーマに登録を行う
|
||||
org_import_process_result = import_data_with_commit_per_row(bucket_name, target_data_source, target_file_name, log_info, mode,
|
||||
settings_list, conn)
|
||||
elif bulk_import_flag and int(bulk_import_flag) == 1:
|
||||
# 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
|
||||
# ⑤-2 一括登録フラグがONの場合、一括登録モードでロードスキーマに登録を行う
|
||||
print(
|
||||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-22 - 一括登録モードが有効のため、一括INSERTを実行します')
|
||||
org_import_process_result = import_data_with_bulk(bucket_name, target_data_source, target_file_name, log_info, mode,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user