diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index 3cd42ca0..c46564b0 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -6,6 +6,7 @@ from error import error from common import debug_log # 定数 +DIRECTORY_WORK = '/work/' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -36,11 +37,10 @@ class CheckError(Exception): pass -def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info, mode): +def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode): """チェック処理 Args: bucket_name : バケット名 - target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス @@ -52,7 +52,6 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_key : {target_key}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) @@ -69,28 +68,29 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) - target_obj = s3_resource.Object(bucket_name, target_key) - target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - target_header_list = line + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + work_header_list = line break # ② C-1の項目数チェックを開始する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') - target_header_list_len = len(target_header_list) - if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + work_header_list_len = len(work_header_list) + if work_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') else: - raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') + raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_header_list_len}') # ③ C-2の項目並び順チェック開始する if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') for i in range(len(settings_header_list)): - if not settings_header_list[i] == target_header_list[i]: - raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') + if not settings_header_list[i] == work_header_list[i]: + raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') # ④ チェック処理終了ログを出力する diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index 518c6fe1..10823061 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -41,11 +41,11 @@ try: # ③ チェック処理を呼び出す print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) + check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) # ④ メイン処理を呼び出す print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) + warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) # ⑤ 終了処理を呼び出す print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index a846c916..00f9e431 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -8,6 +8,7 @@ from error import error from common import debug_log # 定数 +DIRECTORY_WORK = '/work/' LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} SETTINGS_ITEM = { 'dataSource': 0, @@ -35,11 +36,10 @@ s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') -def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info, mode): +def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode): """メイン処理 Args: bucket_name : バケット名 - target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス @@ -52,7 +52,6 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_key : {target_key}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) @@ -91,9 +90,10 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings # ⑤ 投入データファイルを1行ごとにループする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - target_obj = s3_resource.Object(bucket_name, target_key) - target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ @@ -101,7 +101,7 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings warning_info = '' # ワーニング情報 index = 0 # ループインデックス - for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: index += 1