diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index ac6655e1..dd4300b2 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -171,6 +171,7 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します') + # ファイルの末尾行を取得し、ファイル項目数と比較する last_line_work_data_bytes = next(reverse_readline_stream( work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])) last_line_count = len( @@ -273,11 +274,18 @@ def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> byt raise ValueError("圧縮形式が一致しません") -def reverse_readline_stream(f, line_feed: str, chunk_size=4096): - """ - バイトモードのストリームを後ろ向きにチャンク読みし、 - 1行ずつ返すジェネレータ。 +def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096): + """バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。 + 指定されたバイト単位でファイルを読み込み、1行ずつにして返している。 + Args: + f (io.BytesIO): バイトモードのファイルストリーム + line_feed (str): 改行コード + chunk_size (int, optional): 読み込むファイルチャンク数(byte) Defaults to 4096. + + Yields: + bytes: 末尾から1行分のレコード """ + # ファイルのポインタを末尾に移動させてからポインタを取得する f.seek(0, os.SEEK_END) pointer = f.tell() buffer = b'' @@ -285,9 +293,12 @@ def reverse_readline_stream(f, line_feed: str, chunk_size=4096): while pointer > 0: read_size = min(chunk_size, pointer) + # チャンク数分ファイルを読み込むために、ポインタをずらす pointer -= read_size f.seek(pointer) chunk = f.read(read_size) + + # 読み込んだチャンク + バッファ(途中行) buffer = chunk + buffer parts = buffer.split(line_feed.encode()) @@ -299,13 +310,14 @@ def reverse_readline_stream(f, line_feed: str, chunk_size=4096): if not yielded_any and line == b'': # 最初にだけ出てくる空行を飛ばす continue + + # 一度でも結果を返している場合にフラグを立てる yielded_any = True yield line - # ファイル先頭まで来たあとの残り1行 + # ポインターがファイル先頭まで来たあとの残り1行 if not (not yielded_any and buffer == b''): - # → 最初の空行だけスキップしたいなら、 - # buffer==b''かつyielded_any==Falseのときは飛ばす + # 一度でも結果を返しているかつ、バッファにデータが残っている場合、先頭行のデータを返す yield buffer