343 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import io
import os
import sys
import zipfile
from datetime import datetime
import boto3
from common import convert_quotechar, debug_log
from end import end
from error import error
# 定数
DIRECTORY_WORK = '/work/'
LOCAL_DIRECTORY_TMP = '/tmp'
# チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ
LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat'
LOG_LEVEL = {'i': 'Info', 'e': 'Error', 'w': "Warning"}
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
'charCode': 2,
'quotechar': 3,
'lineFeedCode': 4,
'headerFlag': 5,
'csvNumItems': 6,
'csvNameItems': 7,
'dbColumuName': 8,
'storageSchemaName': 9,
'loadSchemaName': 10,
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'importManner': 13,
'bulkImportFlag': 14,
'compressedFlag': 15,
'compression': 16,
'reserved1': 17,
'reserved2': 18,
'reserved3': 19,
'reserved4': 20,
'reserved5': 21,
'reserved6': 22,
'reserved7': 23,
'reserved8': 24
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
INFO = LOG_LEVEL["i"]
ERROR = LOG_LEVEL["e"]
WARNING = LOG_LEVEL["w"]
# クラス変数
s3_client = boto3.client('s3')
# チェック例外クラス
class CheckError(Exception):
pass
def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode):
"""チェック処理
Args:
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Raises:
CheckError : チェックでエラーがあった場合に発生する例外
"""
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(
f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① チェック処理開始ログを出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-01 - チェック処理を開始します')
# データ読込
settings_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=settings_key)
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
settings_list.append('')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj_response = s3_client.get_object(
Bucket=bucket_name, Key=work_key)
work_obj_file = work_obj_response["Body"].read()
work_data_bytes = io.BytesIO(work_obj_file)
compressed_flag = settings_list[SETTINGS_ITEM["compressedFlag"]]
# ② ファイル圧縮フラグがONの場合、チェック対象ファイルの展開処理を行う。
if compressed_flag and compressed_flag == '1':
work_data_bytes = io.BytesIO(uncompress_file(
work_data_bytes, settings_list, log_info))
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
work_data = io.TextIOWrapper(work_data_bytes, encoding=encoding,
newline=line_feed)
work_csv_row = []
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)):
# ヘッダあり、かつ、1行目の場合
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
work_csv_row.append(line)
continue
work_csv_row.append(line)
break
# ③ C-0のデータ件数チェックを開始する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します')
if is_empty_file(work_csv_row, settings_list):
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
end(bucket_name, target_data_source,
target_file_name, '', log_info, mode)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-04 - 終了処理完了')
sys.exit()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-05 - C-0正常終了')
# ④ C-1の項目数チェックを開始する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-06 - C-1 項目数チェックを開始します')
work_csv_row_item_len = len(work_csv_row[0])
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-07 - C-1 項目数チェック:正常終了')
else:
raise CheckError(
f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
# ⑤ C-2の項目並び順チェック開始する
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-08 - C-2 項目並び順チェックを開始します')
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip(
).split(',')
work_header_list = work_csv_row[0]
for i in range(len(settings_header_list)):
if not settings_header_list[i] == work_header_list[i]:
raise CheckError(
f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-2 項目並び順チェック:正常終了')
# ⑥ C-3の末尾行項目数チェック開始する
if settings_list[SETTINGS_ITEM["bulkImportFlag"]] and int(settings_list[SETTINGS_ITEM["bulkImportFlag"]]) == 1:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-13 - C-3 末尾行項目数チェックを開始します')
# ファイルの末尾行を取得し、ファイル項目数と比較する
last_line_work_data_bytes = next(reverse_readline_stream(
work_data_bytes, LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]))
# 区切り文字例えばカンマを文字とデリミタで区別できるように、csvリーダーで読む
last_line_separated_data = [
row for row in csv.reader(
io.TextIOWrapper(io.BytesIO(last_line_work_data_bytes)),
quotechar=convert_quotechar(
settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)
]
last_line_count = len(last_line_separated_data[0])
if last_line_count != int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
raise CheckError(
f'E-CHK-03 - 投入データ末尾の項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{last_line_count}')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-09 - C-3 末尾行項目数チェック 正常終了')
# ⑦ チェック処理終了ログを出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-10 - チェック処理を終了します')
except CheckError as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-CHK-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def is_empty_file(work_csv_row: list, settings_list: list):
"""② C-0のデータ件数チェック
ヘッダ行がある場合は、1行目を読み飛ばして判定する
Args:
work_csv_row (list): CSVファイルの1行目(ヘッダを含む場合は2行目まで)
settings_list (list): 個別設定ファイルのリスト
Returns:
bool: CSVファイルの1行目が0件だった場合はTrue
"""
has_header = int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
# ヘッダのみのファイルも0バイトファイルをみなす
if has_header:
return len(work_csv_row[1:]) == 0
return len(work_csv_row) == 0
def uncompress_file(work_data_file: bytes, settings_list: list, log_info) -> bytes:
"""指定された形式で圧縮ファイルを展開し、ローカルに書き出す。
Args:
work_data_file (bytes): S3から読み込んだ登録
settings_list (list): 個別設定ファイルのリスト
log_info (str): ログに記載するデータソース名とファイル名
Returns:
bytes: 解凍後のファイルのバイト配列
"""
compression = settings_list[SETTINGS_ITEM['compression']]
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-11 - 投入ファイルが圧縮されているため、展開処理を行います。圧縮形式:{compression}')
if compression == 'zip':
file_bytes = None
with zipfile.ZipFile(work_data_file, 'r') as zip_ref:
# 昇順でソートする。
file_list: list[str] = sorted(zip_ref.namelist())
if len(file_list) > 1:
# 圧縮ファイル内に複数ファイルが存在する場合、warningログを出力する。
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-01 - 圧縮データ内に複数ファイルが存在したため、{file_list[0]}のみ登録を行います。')
target_file_name = file_list[0]
# 末尾が「/」で終わるのはフォルダ
if target_file_name.endswith('/'):
# 圧縮ファイル内の先頭がフォルダの場合、エラー処理を行う。
raise Exception(
f'展開したデータはファイルではありません。ファイルパス: {target_file_name}')
# zipファイル内には1ファイルのみ
with zip_ref.open(target_file_name) as file:
file_bytes = file.read()
# ファイルを一時ディレクトリに書き出す。
encoding = settings_list[SETTINGS_ITEM["charCode"]]
line_feed = LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]
delimiter = settings_list[SETTINGS_ITEM["delimiter"]]
quote_char = convert_quotechar(
settings_list[SETTINGS_ITEM["quotechar"]])
uncompressed_file = io.TextIOWrapper(io.BytesIO(file_bytes), encoding=encoding,
newline=line_feed)
csv_reader = csv.reader(uncompressed_file, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
delimiter=delimiter)
with open(LOCAL_TEMPORARY_FILE_PATH, 'w', encoding=encoding, newline='') as csvfile:
csv_writer = csv.writer(csvfile, quotechar=quote_char,
delimiter=delimiter)
for row in csv_reader:
csv_writer.writerow(row)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-12 - 投入ファイルの展開が正常終了しました')
return file_bytes
# MEMO: zip以外の圧縮形式に対応する際に追記すること
else:
raise ValueError("圧縮形式が一致しません")
def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096):
"""バイトモードのファイルストリームを後ろから読み、1行ずつ返すジェネレータ。
指定されたバイト単位でファイルを読み込み、1行ずつにして返している。
Args:
f (io.BytesIO): バイトモードのファイルストリーム
line_feed (str): 改行コード
chunk_size (int, optional): 読み込むファイルチャンク数(byte) Defaults to 4096.
Yields:
bytes: 末尾から1行分のレコード
"""
# ファイルのポインタを末尾に移動させてからポインタを取得する
f.seek(0, os.SEEK_END)
pointer = f.tell()
buffer = b''
yielded_any = False
while pointer > 0:
read_size = min(chunk_size, pointer)
# チャンク数分ファイルを読み込むために、ポインタをずらす
pointer -= read_size
f.seek(pointer)
chunk = f.read(read_size)
# 読み込んだチャンク + バッファ(途中行)
buffer = chunk + buffer
# 改行文字で区切る
parts = buffer.split(line_feed.encode())
# parts[0] は先頭途中行 → 次ループへ残す
buffer = parts[0]
# parts[1:] が「完全に揃った行」のリスト
# 改行文字で区切っているため、行途中のparts[1:] は空文字になるため、以下のループは素通りする。
for line in reversed(parts[1:]):
if not yielded_any and line == b'':
# 最初にだけ出てくる空行を飛ばす
continue
# 一度でも結果を返している場合にフラグを立てる
yielded_any = True
yield line
# ポインターがファイル先頭まで来たあとの残り1行
if not (not yielded_any and buffer == b''):
# 一度でも結果を返しているかつ、バッファにデータが残っている場合、先頭行のデータを返す
yield buffer
# ローカル実行用コード
# 値はよしなに変えてください
# if __name__ == '__main__':
# check(
# bucket_name='バケット名',
# target_data_source='データソース名',
# target_file_name='targetフォルダ内のファイル名',
# settings_key='個別設定ファイル名',
# log_info='Info',
# mode='i'
# )