diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 1969fb9f..859419c9 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -45,6 +45,10 @@ TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' +INFO = LOG_LEVEL['i'] +WARNING = LOG_LEVEL['w'] +ERROR = LOG_LEVEL['e'] + # クラス変数 s3_client = boto3.client('s3') @@ -75,16 +79,16 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ① メイン処理開始ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました') except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: @@ -92,7 +96,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする # 個別設定ファイルの読み込み @@ -107,16 +111,17 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf settings_list.append('') # ロードスキーマのTRUNCATE + load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]] with conn.cursor() as cur: - sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' + sql_truncate = f'TRUNCATE table {load_schema_name}' cur.execute(sql_truncate) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + - f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + + f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました') # ⑤ 投入データファイルを1行ごとにループする print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( @@ -138,7 +143,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: index += 1 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -146,7 +151,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # SQL文生成 query_parameter_list = [] - sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' + sql = f'INSERT INTO {load_schema_name} (' for db_column in settings_db_columu_list: sql = f'{sql} {db_column},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 @@ -193,37 +198,38 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf warning_count += 1 warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑥ ⑤の処理結果件数をログ出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする + storage_schema_name = settings_list[SETTINGS_ITEM["storageSchemaName"]] print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' + - f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-08 - ' + + f'ロードスキーマ({load_schema_name})のデータを蓄積スキーマ({storage_schema_name})に登録します') # インポート方法判断 try: if truncate_judge(settings_list): with conn.cursor() as cur: - sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + sql_truncate = f'TRUNCATE table {storage_schema_name}' cur.execute(sql_truncate) print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + - f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' + + f' I-MAIN-20 - {storage_schema_name} をTRUNCATEしました') except InvalidConfigException as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-01 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 - sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' + sql = f'INSERT INTO {storage_schema_name} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 @@ -243,7 +249,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} t.ins_date,' # システム項目:登録日時 sql = f'{sql} t.upd_user,' # システム項目:更新者 sql = f'{sql} t.upd_date' # システム項目:更新日時 - sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t' + sql = f'{sql} FROM {load_schema_name} as t' sql = f'{sql} ON DUPLICATE KEY UPDATE' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' @@ -252,11 +258,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # システム項目:取込ファイル行番号 sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:論理削除フラグ - sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' + sql = f'{sql} delete_flg={storage_schema_name}.delete_flg,' # システム項目:登録者 - sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' + sql = f'{sql} ins_user={storage_schema_name}.ins_user,' # システム項目:登録日時 - sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' + sql = f'{sql} ins_date={storage_schema_name}.ins_date,' # システム項目:更新者 sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新日時 @@ -266,36 +272,36 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # トランザクション開始 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' + - f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-09 - ' + + f'標準SQL:{storage_schema_name} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' + - f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-10 - ' + + f'標準SQL:{storage_schema_name} のCOMMIT処理が正常終了しました') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') - if settings_list[SETTINGS_ITEM["exSqlFileName"]]: + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]] + + if ex_sql_file_name: try: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-12 - 拡張SQL設定の存在を確認しました') print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' + - f'拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \ - settings_list[SETTINGS_ITEM["exSqlFileName"]] + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-13 - 拡張SQLファイル名:{ex_sql_file_name} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception: warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: @@ -308,28 +314,26 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # トランザクション開始 print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' + - f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-15 - 拡張SQL:{storage_schema_name} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' + - f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-16 - 拡張SQL:{storage_schema_name} のCOMMIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) @@ -337,12 +341,12 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf try: # ⑩ メイン処理終了ログを出力する print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-19 - メイン処理を終了します') return warning_info except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -358,10 +362,10 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo try: conn.close() print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-18 - DB接続を終了しました') except Exception as e: print( - f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -393,15 +397,15 @@ def truncate_judge(settings_list): Returns: Bool: Truncate対象の場合True。Truncate対象でない場合False """ - + import_manner = settings_list[SETTINGS_ITEM["importManner"]] # upsert判定 - if not settings_list[SETTINGS_ITEM["importManner"]]: + if not import_manner: return False # インポート方法設定チェック - if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + if not import_manner.startswith(TRUNCATE_SRC_TABLE_SYMBOL): raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) - import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split( + import_manner_splitted_list = import_manner.split( ':') if len(import_manner_splitted_list) != 2: raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)