diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index 6c043e76..dd70d836 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -47,7 +47,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting Raises: CheckError : チェックでエラーがあった場合に発生する例外 """ - # ① 開始ログの出力 + # ① チェック処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') try: @@ -65,7 +65,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting target_header_list = line break - # ② C-1 項目数チェック + # ② C-1の項目数チェックを開始する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-1のチェックを開始します') target_header_list_len = len(target_header_list) if target_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): @@ -73,7 +73,7 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting else: raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{target_header_list_len}') - # ③ C-2 項目並び順チェック + # ③ C-2の項目並び順チェック開始する if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - C-2のチェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') @@ -90,5 +90,5 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ④ 終了ログの出力 + # ④ チェック処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index e4b0d637..d04edf53 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -30,24 +30,24 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' """ -# ① 開始ログの出力 +# ① データ取込処理開始ログを出力する print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') -# ② 初期処理 +# ② 初期処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) -# ③ チェック処理 +# ③ チェック処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) -# ④ メイン処理 +# ④ メイン処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) -# ⑤ 終了処理 +# ⑤ 終了処理を呼び出す print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) -# ⑥ 終了ログの出力 +# ⑥ データ取込処理終了ログを出力する print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 06bad863..3e7025ad 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -23,11 +23,11 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf warning_info : Warning情報 log_info : ログに記載するデータソース名とファイル名 """ - # ① 開始ログの出力 + # ① 終了処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') try: - # ② 投入データファイルをdoneディレクトリに移動 + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() @@ -39,13 +39,13 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-02 - workディレクトリの {target_file_name} をdoneディレクトリに移動しました 移動後ファイル名:{done_file_name}') - # ③ doingファイルの削除 + # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-03 - targetディレクトリの {doing_file_name} を削除しました') - # ④ Warning情報の存在確認 + # ④ Warning情報が存在するか確認する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-04 - Warning情報の存在チェック') if warning_info: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-05 - Warning情報は存在しました') @@ -75,5 +75,5 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑤ 終了ログの出力 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-0 - 終了処理を終了します') + # ⑤ 終了処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index d8d17c27..08a61238 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -22,11 +22,11 @@ def error(bucket_name, target_data_source, target_file_name, log_info): log_info : ログに記載するデータソース名とファイル名 """ - # ① 開始ログの出力 + # ① エラー処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') try: - # ② 投入データファイルをerrorディレクトリに移動 + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() @@ -38,13 +38,13 @@ def error(bucket_name, target_data_source, target_file_name, log_info): s3_client.delete_object(Bucket=bucket_name, Key=work_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-02 - workディレクトリの {target_file_name} をerrorディレクトリに移動しました 移動後ファイル名:{error_file_name}') - # ③ doingファイルの削除 + # ③ S3バケット内のtargetディレクトリに存在する「投入データファイル名.doing」ファイルを削除する doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name s3_client.delete_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-03 - targetディレクトリの {doing_file_name} を削除しました') - # ④ error処理結果ファイルの作成 + # ④ S3バケット内のtargetディレクトリに、「投入データファイル名.error」ファイルを作成する result_error_file_name = target_file_name + '.error' result_error_key = target_data_source + DIRECTORY_TARGET + result_error_file_name result_error_obj = s3_resource.Object(bucket_name, result_error_key) @@ -53,8 +53,8 @@ def error(bucket_name, target_data_source, target_file_name, log_info): except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-ERR-99 - エラー内容:{e}') finally: - # ⑤ 終了ログの出力 + # ⑤ 終了処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-05 - エラー処理を終了します') - # ⑥ 処理終了 + # ⑥ 処理を終了する sys.exit() diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index d3720db5..5953895b 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -29,88 +29,84 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info settings_key : 投入データに該当する個別設定ファイルのフルパス """ - # ① 開始ログの出力 + # ① 初期処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') - # doingファイル情報作成 - doing_file_name = target_file_name + '.doing' - doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name - # ② doingファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') + # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする try: + doing_file_name = target_file_name + '.doing' + doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') s3_client.head_object(Bucket=bucket_name, Key=doing_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-01 - 投入データ {target_file_name} は既に処理中です') error(bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-03 - doingファイルは存在しませんでした') - # ③ doingファイルの作成 try: + # ③ S3バケット内のtargetディレクトリに、「投入データファイル名.doing」ファイルを作成する doing_obj = s3_resource.Object(bucket_name, doing_key) doing_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-04 - targetディレクトリに {doing_file_name} を作成しました') - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name, log_info) - # ④ 投入データファイルをworkディレクトリに移動 - try: + # ④ 投入データファイルをS3バケット内のtargetディレクトリから、workディレクトリに移動(コピー削除)する target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() work_key = target_data_source + DIRECTORY_WORK + target_file_name work_body = target_response["Body"].read() work_obj = s3_resource.Object(bucket_name, work_key) work_obj.put(Body=work_body) + s3_client.delete_object(Bucket=bucket_name, Key=target_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-05 - 投入データ {target_file_name} をworkディレクトリに移動しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑤ 処理結果ファイルの削除 - # doneファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') - result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' + # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する try: + # doneファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') + result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' s3_client.head_object(Bucket=bucket_name, Key=result_done_key) s3_client.delete_object(Bucket=bucket_name, Key=result_done_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-07 - doneファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-08 - doneファイルは存在しませんでした') - # warningファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') - result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' try: + # warningファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-09 - warningファイル:{target_file_name}.warning の存在チェック') + result_warning_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.warning' s3_client.head_object(Bucket=bucket_name, Key=result_warning_key) s3_client.delete_object(Bucket=bucket_name, Key=result_warning_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-10 - warningファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-11 - warningファイルは存在しませんでした') - # errorファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') - result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' try: + # errorファイルの存在確認 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-12 - errorファイル:{target_file_name}.error の存在チェック') + result_error_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.error' s3_client.head_object(Bucket=bucket_name, Key=result_error_key) s3_client.delete_object(Bucket=bucket_name, Key=result_error_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-13 - errorファイルが存在したため削除しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') - # ⑥ 個別設定マッピングリストの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') - mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME + # ⑥ 個別設定マッピングリストが存在するかチェックする try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') + mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-16 - 個別設定マッピングリストの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑦ 個別設定ファイルの特定 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') + # ⑦ 個別設定ファイルを特定する try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() mapping_body = io.TextIOWrapper(io.BytesIO(mapping_response["Body"].read()), encoding='utf-8') @@ -129,16 +125,16 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑧ 個別設定ファイルの存在確認 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') - settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name + # ⑧ ⑦の個別設定ファイルが存在するかチェックする try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') + settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-20 - 個別設定ファイルの存在を確認しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑨ 終了ログの出力 + # ⑨ 初期処理終了ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') return settings_key diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index a2551d83..ad77fdb4 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -50,35 +50,35 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings # ① メイン処理開始ログを出力する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') - # ② DB接続を開始する try: + # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5) - except Exception as e: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - error(bucket_name, target_data_source, target_file_name, log_info) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') - # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする - try: - # 個別設定ファイルの読込 + # ③ タイムゾーンを変更する + with conn.cursor() as cur: + cur.execute(f'SET time_zone = "+9:00"') + result = cur.fetchall() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}') + + # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip()) - # TRUNCATE実行 with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ④ 投入データファイルを1行ごとにループする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します') try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) @@ -96,7 +96,7 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings try: if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: index += 1 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - ヘッダー行をスキップします') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -127,11 +127,12 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑤ ④の処理結果件数をログ出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') # ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' sql = f'{sql} SELECT t.*' @@ -146,25 +147,25 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時 # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定の存在を確認しました') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception as e: warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False @@ -180,22 +181,26 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました:{e}') else: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL設定の存在はありませんでした') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑧ DB接続を終了する - conn.close() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - DB接続を終了しました') + try: + conn.close() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) - # ⑨ 終了ログの出力 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - メイン処理を終了します') + # ⑨ メイン処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') return warning_info