271 lines
12 KiB
Python
271 lines
12 KiB
Python
import csv
|
||
import io
|
||
import os
|
||
import sys
|
||
from datetime import datetime
|
||
|
||
import boto3
|
||
from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM,
|
||
convert_quotechar, debug_log, uncompress_zip)
|
||
from end import end
|
||
from error import error
|
||
|
||
# 定数
|
||
DIRECTORY_WORK = '/work/'
|
||
|
||
# クラス変数
|
||
s3_client = boto3.client('s3')
|
||
|
||
|
||
# チェック例外クラス
|
||
class CheckError(Exception):
|
||
pass
|
||
|
||
|
||
def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode):
|
||
"""チェック処理
|
||
Args:
|
||
bucket_name : バケット名
|
||
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name : 投入データのファイル名
|
||
settings_key : 投入データに該当する個別設定ファイルのフルパス
|
||
log_info : ログに記載するデータソース名とファイル名
|
||
mode : 処理モード
|
||
Raises:
|
||
CheckError : チェックでエラーがあった場合に発生する例外
|
||
"""
|
||
|
||
try:
|
||
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
|
||
debug_log(
|
||
f'引数 target_data_source : {target_data_source}', log_info, mode)
|
||
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
|
||
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
|
||
debug_log(f'引数 log_info : {log_info}', log_info, mode)
|
||
debug_log(f'引数 mode : {mode}', log_info, mode)
|
||
|
||
# ① チェック処理開始ログを出力する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します')
|
||
|
||
# データ読込
|
||
settings_obj_response = s3_client.get_object(
|
||
Bucket=bucket_name, Key=settings_key)
|
||
settings_list = []
|
||
for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'):
|
||
settings_list.append(line.rstrip('\n'))
|
||
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
|
||
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
|
||
settings_list.append('')
|
||
|
||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||
work_obj_response = s3_client.get_object(
|
||
Bucket=bucket_name, Key=work_key)
|
||
work_obj_file = work_obj_response["Body"].read()
|
||
work_data_bytes = io.BytesIO(work_obj_file)
|
||
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
|
||
|
||
# ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。
|
||
if compressed_flag and compressed_flag == '1':
|
||
work_data_bytes = io.BytesIO(uncompress_file(
|
||
work_data_bytes, settings_list, log_info))
|
||
|
||
encoding = settings_list[SETTINGS_ITEM["charCode"]]
|
||
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
|
||
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
|
||
|
||
work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding,
|
||
newline=line_feed)
|
||
work_csv_row = []
|
||
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||
delimiter=delimiter)):
|
||
# ヘッダあり、かつ、1行目の場合
|
||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
|
||
work_csv_row.append(line)
|
||
continue
|
||
work_csv_row.append(line)
|
||
break
|
||
|
||
# ③ C-0のデータ件数チェックを開始する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します')
|
||
if is_empty_file(work_csv_row, settings_list):
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||
end(bucket_name, target_data_source,
|
||
target_file_name, '', log_info, mode)
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了')
|
||
sys.exit()
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0:正常終了')
|
||
|
||
# ④ C-1の項目数チェックを開始する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します')
|
||
work_csv_row_item_len = len(work_csv_row[0])
|
||
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了')
|
||
else:
|
||
raise CheckError(
|
||
f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
|
||
|
||
# ⑤ C-2の項目並び順チェック開始する
|
||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します')
|
||
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip(
|
||
).split(',')
|
||
work_header_list = work_csv_row[0]
|
||
for i in range(len(settings_header_list)):
|
||
if not settings_header_list[i] == work_header_list[i]:
|
||
raise CheckError(
|
||
f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了')
|
||
|
||
# ⑥ C-3の末尾行項目数チェック開始する
|
||
if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します')
|
||
|
||
# ファイルの末尾行を取得し、ファイル項目数と比較する
|
||
last_line_work_data_bytes = next(reverse_readline_stream(
|
||
work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]))
|
||
# 区切り文字(例えばカンマ)を文字とデリミタで区別できるように、csvリーダーで読む
|
||
last_line_separated_data = [
|
||
row for row in csv.reader(
|
||
io.TextIOWrapper(io.BytesIO(last_line_work_data_bytes)),
|
||
quotechar=convert_quotechar(
|
||
settings_list[SETTINGS_ITEM["quotechar"]]),
|
||
delimiter=delimiter)
|
||
]
|
||
last_line_count = len(last_line_separated_data[0])
|
||
if last_line_count != int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||
raise CheckError(
|
||
f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_count}')
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-14 - C-3 末尾行項目数チェック 正常終了')
|
||
|
||
# ⑦ チェック処理終了ログを出力する
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します')
|
||
except CheckError as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
except Exception as e:
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||
|
||
|
||
def is_empty_file(work_csv_row: list, settings_list: list):
|
||
"""② C-0のデータ件数チェック
|
||
ヘッダ行がある場合は、1行目を読み飛ばして判定する
|
||
|
||
Args:
|
||
work_csv_row (list): CSVファイルの1行目(ヘッダを含む場合は2行目まで)
|
||
settings_list (list): 個別設定ファイルのリスト
|
||
|
||
Returns:
|
||
bool: CSVファイルの1行目が0件だった場合はTrue
|
||
"""
|
||
has_header = int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
|
||
# ヘッダのみのファイルも0バイトファイルをみなす
|
||
if has_header:
|
||
return len(work_csv_row[1:]) == 0
|
||
|
||
return len(work_csv_row) == 0
|
||
|
||
|
||
def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> bytes:
|
||
"""指定された形式で圧縮ファイルを展開し、ローカルに書き出す。
|
||
Args:
|
||
work_data_file (bytes): S3から読み込んだ登録
|
||
settings_list (list): 個別設定ファイルのリスト
|
||
log_info (str): ログに記載するデータソース名とファイル名
|
||
|
||
Returns:
|
||
bytes: 解凍後のファイルのバイト配列
|
||
"""
|
||
compression = settings_list[SETTINGS_ITEM['compression']]
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}')
|
||
|
||
file_bytes: bytes = None
|
||
if compression == 'zip':
|
||
# zipファイルを展開し、ファイルを書き出す。
|
||
file_bytes = uncompress_zip()
|
||
|
||
# MEMO: zip以外の圧縮形式に対応する際に追記すること
|
||
else:
|
||
raise ValueError("圧縮形式が一致しません")
|
||
|
||
print(
|
||
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました')
|
||
return file_bytes
|
||
|
||
|
||
def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096):
|
||
"""バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。
|
||
指定されたバイト単位でファイルを読み込み、1行ずつにして返している。
|
||
Args:
|
||
f (io.BytesIO): バイトモードのファイルストリーム
|
||
line_feed (str): 改行コード
|
||
chunk_size (int, optional): 読み込むファイルチャンク数(byte) Defaults to 4096.
|
||
|
||
Yields:
|
||
bytes: 末尾から1行分のレコード
|
||
"""
|
||
# ファイルのポインタを末尾に移動させてからポインタを取得する
|
||
f.seek(0, os.SEEK_END)
|
||
pointer = f.tell()
|
||
buffer = b''
|
||
yielded_any = False
|
||
|
||
while pointer > 0:
|
||
read_size = min(chunk_size, pointer)
|
||
# チャンク数分ファイルを読み込むために、ポインタをずらす
|
||
pointer -= read_size
|
||
f.seek(pointer)
|
||
chunk = f.read(read_size)
|
||
|
||
# 読み込んだチャンク + バッファ(途中行)
|
||
buffer = chunk + buffer
|
||
|
||
# 改行文字で区切る
|
||
parts = buffer.split(line_feed.encode())
|
||
# parts[0] は先頭途中行 → 次ループへ残す
|
||
buffer = parts[0]
|
||
# parts[1:] が「完全に揃った行」のリスト
|
||
# 改行文字で区切っているため、行途中のparts[1:] は空文字になるため、以下のループは素通りする。
|
||
for line in reversed(parts[1:]):
|
||
if not yielded_any and line == b'':
|
||
# 最初にだけ出てくる空行を飛ばす
|
||
continue
|
||
|
||
# 一度でも結果を返している場合にフラグを立てる
|
||
yielded_any = True
|
||
yield line
|
||
|
||
# ポインターがファイル先頭まで来たあとの残り1行
|
||
if not (not yielded_any and buffer == b''):
|
||
# 一度でも結果を返しているかつ、バッファにデータが残っている場合、先頭行のデータを返す
|
||
yield buffer
|
||
|
||
|
||
# ローカル実行用コード
|
||
# 値はよしなに変えてください
|
||
# if __name__ == '__main__':
|
||
# check(
|
||
# bucket_name='バケット名',
|
||
# target_data_source='データソース名',
|
||
# target_file_name='targetフォルダ内のファイル名',
|
||
# settings_key='個別設定ファイル名',
|
||
# log_info='Info',
|
||
# mode='i'
|
||
# )
|