diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index 40bdc101..c39ed8c3 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -6,6 +6,7 @@ from error import error from common import debug_log # 定数 +DIRECTORY_WORK = '/work/' LOG_LEVEL = {'i': 'Info', 'e': 'Error'} SETTINGS_ITEM = { 'dataSource': 0, @@ -36,11 +37,10 @@ class CheckError(Exception): pass -def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info, mode): +def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode): """チェック処理 Args: bucket_name : バケット名 - target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス @@ -52,7 +52,6 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_key : {target_key}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) @@ -60,7 +59,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting debug_log(f'引数 mode : {mode}', log_info, mode) # ① チェック処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') # データ読込 settings_obj = s3_resource.Object(bucket_name, settings_key) @@ -69,36 +68,37 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) - target_obj = s3_resource.Object(bucket_name, target_key) - target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - target_header_list = line + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + work_header_list = line break # ② C-1の項目数チェックを開始する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') - target_header_list_len = len(target_header_list) - if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') + work_header_list_len = len(work_header_list) + if work_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - C-1:正常終了') else: - raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') + raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_header_list_len}') # ③ C-2の項目並び順チェック開始する if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') for i in range(len(settings_header_list)): - if not settings_header_list[i] == target_header_list[i]: - raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') + if not settings_header_list[i] == work_header_list[i]: + raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') # ④ チェック処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') except CheckError as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} {e}') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) diff --git a/ecs/Dockerfile/dataimport/common.py b/ecs/Dockerfile/dataimport/common.py index 1c21238a..8b8eeed6 100644 --- a/ecs/Dockerfile/dataimport/common.py +++ b/ecs/Dockerfile/dataimport/common.py @@ -1,5 +1,7 @@ from datetime import datetime +# 定数 +LOG_LEVEL = {"d": 'Debug'} MODE_TYPE = { 'n': 'normal', 'd': 'debug', @@ -8,4 +10,4 @@ MODE_TYPE = { def debug_log(log, log_info, mode): if MODE_TYPE['d'] == mode: - print(f'{str(datetime.now())} {log_info} Debug {log}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index f16281f8..79b54717 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -22,7 +22,7 @@ DB_INFO = {"host": DB_HOST, "name": DB_NAME, "pass": DB_PASS, "user": DB_USER} # 定数 LOG_LEVEL = {"i": 'Info'} -LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' +LOG_INFO = f'{DATA_SOURCE_NAME} {FILE_NAME}' """コントロール処理 @@ -33,26 +33,26 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' try: # ① データ取込処理開始ログを出力する - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') # ② 初期処理を呼び出す - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) # ③ チェック処理を呼び出す - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') - check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') + check(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) # ④ メイン処理を呼び出す - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') - warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') + warning_info = main(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) # ⑤ 終了処理を呼び出す - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE) # ⑥ データ取込処理終了ログを出力する - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') except Exception as e: - print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 84c34db7..dfb129cc 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -35,55 +35,55 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf debug_log(f'引数 mode : {mode}', log_info, mode) # ① 終了処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_body = work_response["Body"].read() - done_file_name = str(datetime.now()) + '_' + target_file_name + done_file_name = f'{datetime.now():%Y%m%d%H%M%S}_{target_file_name}' done_key = target_data_source + DIRECTORY_DONE + done_file_name done_obj = s3_resource.Object(bucket_name, done_key) done_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=work_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') # ④ Warning情報が存在するか確認する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') if warning_info: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') # warningファイルの作成 - warning_file_name = str(datetime.now()) + '_' + target_file_name + '_war.log' + warning_file_name = f'{datetime.now():%Y%m%d%H%M%S}_{target_file_name}_war.log' warning_key = target_data_source + DIRECTORY_WARNING + warning_file_name warning_obj = s3_resource.Object(bucket_name, warning_key) warning_obj.put(Body=warning_info) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-06 - warningディレクトリに {warning_file_name} を作成しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-06 - warningディレクトリに {warning_file_name} を作成しました') # warning処理結果ファイルの作成 result_warning_file_name = target_file_name + '.warning' result_warning_key = target_data_source + DIRECTORY_TARGET + result_warning_file_name result_warning_obj = s3_resource.Object(bucket_name, result_warning_key) result_warning_obj.put(Body='') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-07 - targetディレクトリに {result_warning_file_name} を作成しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-07 - targetディレクトリに {result_warning_file_name} を作成しました') else: # done処理結果ファイルの作成 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-08 - Warning情報は存在しませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-08 - Warning情報は存在しませんでした') result_done_file_name = target_file_name + '.done' result_done_key = target_data_source + DIRECTORY_TARGET + result_done_file_name result_done_obj = s3_resource.Object(bucket_name, result_done_key) result_done_obj.put(Body='') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') # ⑤ 終了処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index bba5aa9b..2d7cb55b 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -24,37 +24,37 @@ def error(bucket_name, target_data_source, target_file_name, log_info): try: # ① エラー処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_body = work_response["Body"].read() - error_file_name = str(datetime.now()) + '_' + target_file_name + error_file_name = f'{datetime.now():%Y%m%d%H%M%S}_{target_file_name}' error_key = target_data_source + DIRECTORY_ERROR + error_file_name error_obj = s3_resource.Object(bucket_name, error_key) error_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=work_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') # ④ S3バケット内のtargetディレクトリに、「投入データファイル名.error」ファイルを作成する result_error_file_name = target_file_name + '.error' result_error_key = target_data_source + DIRECTORY_TARGET + result_error_file_name result_error_obj = s3_resource.Object(bucket_name, result_error_key) result_error_obj.put(Body='') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-04 - targetディレクトリに {result_error_file_name} を作成しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-ERR-04 - targetディレクトリに {result_error_file_name} を作成しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') finally: # ⑤ 終了処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') # ⑥ 処理を終了する sys.exit() diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index 53d8c7eb..c6d97c50 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -41,27 +41,27 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info debug_log(f'引数 mode : {mode}', log_info, mode) # ① 初期処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') sys.exit() except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') try: # ③ S3バケット内のtargetディレクトリに、「投入データファイル名.doing」ファイルを作成する doing_obj = s3_resource.Object(bucket_name, doing_key) doing_obj.put(Body='') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') # ④ 投入データファイルをS3バケット内のtargetディレクトリから、workディレクトリに移動(コピー削除)する target_obj = s3_resource.Object(bucket_name, target_key) @@ -71,55 +71,55 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info work_obj = s3_resource.Object(bucket_name, work_key) work_obj.put(Body=work_body) s3_client.delete_object(Bucket=bucket_name, Key=target_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する # doneファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' s3_client.head_object(Bucket=bucket_name, Key=result_done_key) s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') try: # warningファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') try: # errorファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' s3_client.head_object(Bucket=bucket_name, Key=result_error_key) s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') try: # ⑥ 個別設定マッピングリストが存在するかチェックする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑦ 個別設定ファイルを特定する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') @@ -128,31 +128,31 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info match_result = re.fullmatch(row[0], target_file_name) if match_result is not None: settings_file_name = row[1].rstrip() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-18 - 個別設定ファイル:{settings_file_name} を特定しました') break if not settings_file_name: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-03 - 個別設定ファイルが特定出来ません') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑧ ⑦の個別設定ファイルが存在するかチェックする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑨ 初期処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name, log_info) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') - return settings_key + return settings_key + except Exception as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index 9f35e8cd..f2f9f2dc 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -8,6 +8,10 @@ from error import error from common import debug_log # 定数 +DIRECTORY_WORK = '/work/' +DELETE_FLG = 'delete_flg' +UPDATE_USER = 'upd_user' +UPDATE_DATE = 'upd_date' LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} SETTINGS_ITEM = { 'dataSource': 0, @@ -35,11 +39,10 @@ s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') -def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info, mode): +def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode): """メイン処理 Args: bucket_name : バケット名 - target_key : 投入データのフルパス target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス @@ -52,7 +55,6 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_key : {target_key}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) @@ -61,21 +63,20 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings debug_log(f'引数 mode : {mode}', log_info, mode) # ① メイン処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ③ タイムゾーンを変更する with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') - result = cur.fetchall() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする settings_obj = s3_resource.Object(bucket_name, settings_key) @@ -87,35 +88,26 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) - error(bucket_name, target_data_source, target_file_name, log_info) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') - # ⑤ 投入データファイルを1行ごとにループする - try: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') - target_obj = s3_resource.Object(bucket_name, target_key) - target_response = target_obj.get() - target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) - error(bucket_name, target_data_source, target_file_name, log_info) + # ⑤ 投入データファイルを1行ごとにループする + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + work_key = target_data_source + DIRECTORY_WORK + target_file_name + work_obj = s3_resource.Object(bucket_name, work_key) + work_response = work_obj.get() + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - try: process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター warning_info = '' # ワーニング情報 index = 0 # ループインデックス - for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: - if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: index += 1 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -134,29 +126,25 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings sql = f'{sql} NULL,' # システム項目:更新者 sql = f'{sql} NULL)' # システム項目:更新日時 + index += 1 + # ロードスキーマのトランザクション開始 with conn.cursor() as cur: cur.execute(sql) conn.commit() normal_count += 1 - - index += 1 except Exception as e: - warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) - error(bucket_name, target_data_source, target_file_name, log_info) + warning_count += 1 + warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') - try: # ⑥ ⑤の処理結果件数をログ出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' @@ -165,38 +153,36 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings sql = f'{sql} ON DUPLICATE KEY UPDATE' settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') for i in range(len(settings_db_columu_list)): - sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' - sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名 - sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号 - sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者 - sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時 + if settings_db_columu_list[i] == DELETE_FLG: + sql = f'{sql} {settings_db_columu_list[i]}={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.{settings_db_columu_list[i]},' # システム項目:論理削除フラグ + elif settings_db_columu_list[i] == UPDATE_USER: + sql = f'{sql} {settings_db_columu_list[i]}={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.{settings_db_columu_list[i]},' # システム項目:更新者 + elif settings_db_columu_list[i] == UPDATE_DATE: + sql = f'{sql} {settings_db_columu_list[i]}={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.{settings_db_columu_list[i]}' # システム項目:更新日時 + else: + sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) - error(bucket_name, target_data_source, target_file_name, log_info) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') - try: # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: try: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception as e: - warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' + warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: @@ -208,28 +194,32 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: - warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) - - # ⑩ メイン処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) - return warning_info + try: + # ⑩ メイン処理終了ログを出力する + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + + return warning_info + except Exception as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): @@ -243,7 +233,7 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo """ try: conn.close() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) diff --git a/lambda/mbj-newdwh2021-staging-lambda-dataimport.py b/lambda/mbj-newdwh2021-staging-lambda-dataimport.py index c87a7b50..fb79ede5 100644 --- a/lambda/mbj-newdwh2021-staging-lambda-dataimport.py +++ b/lambda/mbj-newdwh2021-staging-lambda-dataimport.py @@ -19,7 +19,7 @@ ecs_client = boto3.client('ecs') def lambda_handler(event, context): - print(f'{str(datetime.now())} Info I-1 駆動処理開始') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-1 駆動処理開始') # イベント情報を取得する s3_event = event["Records"][0]["s3"] @@ -27,9 +27,9 @@ def lambda_handler(event, context): event_object_key = s3_event["object"]["key"] event_file_name = os.path.basename(event_object_key) event_data_source_name = os.path.dirname(event_object_key).split('/')[0] - print(f'{str(datetime.now())} Info I-2 バケット名:{event_bucket_name}') - print(f'{str(datetime.now())} Info I-3 ファイル名:{event_file_name}') - print(f'{str(datetime.now())} Info I-4 データソース名:{event_data_source_name}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-2 バケット名:{event_bucket_name}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-3 ファイル名:{event_file_name}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-4 データソース名:{event_data_source_name}') # ECSを起動する response = ecs_client.run_task( @@ -66,6 +66,6 @@ def lambda_handler(event, context): ], }, ) - print(f'{str(datetime.now())} Info I-5 ECS起動レスポンス:{str(response)}') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-5 ECS起動レスポンス:{str(response)}') - print(f'{str(datetime.now())} Info I-6 駆動処理終了') + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} Info I-6 駆動処理終了')