diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 3233472c..1969fb9f 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,6 +1,5 @@ import csv import io -import re from datetime import datetime import boto3 @@ -34,11 +33,13 @@ SETTINGS_ITEM = { 'reserved5': 18, 'reserved6': 19 } + LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', 'CRLF': '\r\n', } + DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' @@ -64,7 +65,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) - debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log( + f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) debug_log(f'引数 db_info : {db_info}', log_info, mode) @@ -72,24 +74,30 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf debug_log(f'引数 mode : {mode}', log_info, mode) # ① メイン処理開始ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する - conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], + db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ③ タイムゾーンを変更する with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする # 個別設定ファイルの読み込み - settings_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key) + settings_response = s3_client.get_object( + Bucket=bucket_name, Key=settings_key) settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) @@ -102,27 +110,35 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + + f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') # ⑤ 投入データファイルを1行ごとにループする - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key) - work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) + work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read( + )), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター warning_info = '' # ワーニング情報 index = 0 # ループインデックス - settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') - settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',') + settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip( + ).split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip( + ).split(',') - for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), + delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: - if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0: index += 1 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント @@ -151,7 +167,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # データ項目値の変換処理(カンマ除去) org_column_value = line[i] current_settings_db_column_name = settings_db_columu_list[i] - column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) + column_value = convert_column_value( + org_column_value, current_settings_db_column_name, settings_replace_comma_list) # INSERT文のパラメータとそれに対応するプレースホルダーを設定する query_parameter_list.append(column_value) sql = f'{sql} %s,' @@ -175,26 +192,34 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf except Exception as e: warning_count += 1 warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑥ ⑤の処理結果件数をログ出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') - + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' + + f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + # インポート方法判断 try: if truncate_judge(settings_list): with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' cur.execute(sql_truncate) - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' + + f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') except InvalidConfigException as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 @@ -222,72 +247,102 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} ON DUPLICATE KEY UPDATE' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' - sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名 - sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号 - sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ - sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' # システム項目:登録者 - sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' # システム項目:登録日時 - sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新者 - sql = f'{sql} upd_date=t.ins_date' # システム項目:更新日時 + # システム項目:取込ファイル名 + sql = f'{sql} file_name=t.file_name,' + # システム項目:取込ファイル行番号 + sql = f'{sql} file_row_cnt=t.file_row_cnt,' + # システム項目:論理削除フラグ + sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' + # システム項目:登録者 + sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' + # システム項目:登録日時 + sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' + # システム項目:更新者 + sql = f'{sql} upd_user=t.ins_user,' + # システム項目:更新日時 + sql = f'{sql} upd_date=t.ins_date' debug_log(sql, log_info, mode) # トランザクション開始 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' + + f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' + + f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: try: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' + + f'拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \ + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') - except Exception as e: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + except Exception: warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: # 拡張SQLファイルからSQL文生成 - ex_sql_obj_response = s3_client.get_object(Bucket=bucket_name, Key=ex_sql_key) + ex_sql_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=ex_sql_key) ex_sql = '' for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'): ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' + + f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' + + f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + connection_close(conn, bucket_name, target_data_source, + target_file_name, log_info) except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') - connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, + target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑩ メイン処理終了ログを出力する - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') return warning_info except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -302,11 +357,14 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo """ try: conn.close() - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') except Exception as e: - print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): """データ項目値変換処理 - 数値内のカンマ除去処理 @@ -320,8 +378,8 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett """ # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く converted_column_value = org_column_value - if current_settings_db_column_name in settings_replace_comma_list: - converted_column_value = converted_column_value.replace(',', '') + if current_settings_db_column_name in settings_replace_comma_list: + converted_column_value = converted_column_value.replace(',', '') return converted_column_value @@ -339,11 +397,12 @@ def truncate_judge(settings_list): # upsert判定 if not settings_list[SETTINGS_ITEM["importManner"]]: return False - + # インポート方法設定チェック if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) - import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split( + ':') if len(import_manner_splitted_list) != 2: raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: