148 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import io
import sys
from datetime import datetime
import boto3
from common import convert_quotechar, debug_log
from end import end
from error import error
# 定数
DIRECTORY_WORK = '/work/'
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
'charCode': 2,
'quotechar': 3,
'lineFeedCode': 4,
'headerFlag': 5,
'csvNumItems': 6,
'csvNameItems': 7,
'dbColumuName': 8,
'storageSchemaName': 9,
'loadSchemaName': 10,
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,
'reserved4': 17,
'reserved5': 18,
'reserved6': 19
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
# クラス変数
s3_client = boto3.client('s3')
# チェック例外クラス
class CheckError(Exception):
pass
def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode):
"""チェック処理
Args:
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Raises:
CheckError : チェックでエラーがあった場合に発生する例外
"""
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① チェック処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
# データ読込
settings_obj_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_obj_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
work_data = io.TextIOWrapper(io.BytesIO(work_obj_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
work_csv_row = []
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
# ヘッダあり、かつ、1行目の場合
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
work_csv_row.append(line)
continue
work_csv_row.append(line)
break
# ② C-0のデータ件数チェックを開始する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
if is_empty_file(work_csv_row, settings_list):
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
end(bucket_name, target_data_source, target_file_name, '', log_info, mode)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
sys.exit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-0正常終了')
# ③ C-1の項目数チェックを開始する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
work_csv_row_item_len = len(work_csv_row[0])
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1正常終了')
else:
raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
# ④ C-2の項目並び順チェック開始する
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',')
work_header_list = work_csv_row[0]
for i in range(len(settings_header_list)):
if not settings_header_list[i] == work_header_list[i]:
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-09 - C-2正常終了')
# ⑤ チェック処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-10 - チェック処理を終了します')
except CheckError as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def is_empty_file(work_csv_row: list, settings_list: list):
"""② C-0のデータ件数チェック
ヘッダ行がある場合は、1行目を読み飛ばして判定する
Args:
work_csv_row (list): CSVファイルの1行目(ヘッダを含む場合は2行目まで)
settings_list (list): 個別設定ファイルのリスト
Returns:
bool: CSVファイルの1行目が0件だった場合はTrue
"""
has_header = int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
# ヘッダのみのファイルも0バイトファイルをみなす
if has_header:
return len(work_csv_row[1:]) == 0
return len(work_csv_row) == 0