refactor: 重複している記述を変数化してまとめた。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-08 10:17:41 +09:00
parent e029c5641d
commit 841fc95651

View File

@ -45,6 +45,10 @@ TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
INFO = LOG_LEVEL['i']
WARNING = LOG_LEVEL['w']
ERROR = LOG_LEVEL['e']
# クラス変数
s3_client = boto3.client('s3')
@ -75,16 +79,16 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# ① メイン処理開始ログを出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"],
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
@ -92,7 +96,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-03 - タイムゾーンを変更しました')
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
# 個別設定ファイルの読み込み
@ -107,16 +111,17 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
settings_list.append('')
# ロードスキーマのTRUNCATE
load_schema_name = settings_list[SETTINGS_ITEM["loadSchemaName"]]
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
sql_truncate = f'TRUNCATE table {load_schema_name}'
cur.execute(sql_truncate)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' +
f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
f' I-MAIN-04 - {load_schema_name} をTRUNCATEしました')
# ⑤ 投入データファイルを1行ごとにループする
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read(
@ -138,7 +143,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
index += 1
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
@ -146,7 +151,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# SQL文生成
query_parameter_list = []
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
sql = f'INSERT INTO {load_schema_name} ('
for db_column in settings_db_columu_list:
sql = f'{sql} {db_column},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
@ -193,37 +198,38 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
warning_count += 1
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
# ⑥ ⑤の処理結果件数をログ出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
storage_schema_name = settings_list[SETTINGS_ITEM["storageSchemaName"]]
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' +
f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-08 - ' +
f'ロードスキーマ({load_schema_name})のデータを蓄積スキーマ({storage_schema_name})に登録します')
# インポート方法判断
try:
if truncate_judge(settings_list):
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
sql_truncate = f'TRUNCATE table {storage_schema_name}'
cur.execute(sql_truncate)
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' +
f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO}' +
f' I-MAIN-20 - {storage_schema_name} をTRUNCATEしました')
except InvalidConfigException as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-01 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
sql = f'INSERT INTO {storage_schema_name} ('
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
@ -243,7 +249,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
sql = f'{sql} t.ins_date,' # システム項目:登録日時
sql = f'{sql} t.upd_user,' # システム項目:更新者
sql = f'{sql} t.upd_date' # システム項目:更新日時
sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t'
sql = f'{sql} FROM {load_schema_name} as t'
sql = f'{sql} ON DUPLICATE KEY UPDATE'
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
@ -252,11 +258,11 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# システム項目:取込ファイル行番号
sql = f'{sql} file_row_cnt=t.file_row_cnt,'
# システム項目:論理削除フラグ
sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,'
sql = f'{sql} delete_flg={storage_schema_name}.delete_flg,'
# システム項目:登録者
sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,'
sql = f'{sql} ins_user={storage_schema_name}.ins_user,'
# システム項目:登録日時
sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,'
sql = f'{sql} ins_date={storage_schema_name}.ins_date,'
# システム項目:更新者
sql = f'{sql} upd_user=t.ins_user,'
# システム項目:更新日時
@ -266,36 +272,36 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# トランザクション開始
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' +
f'標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-09 - ' +
f'標準SQL{storage_schema_name} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' +
f'標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-10 - ' +
f'標準SQL{storage_schema_name} のCOMMIT処理が正常終了しました')
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]]
if ex_sql_file_name:
try:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' +
f'拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \
settings_list[SETTINGS_ITEM["exSqlFileName"]]
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-13 - 拡張SQLファイル名{ex_sql_file_name} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception:
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-03 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
@ -308,28 +314,26 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# トランザクション開始
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' +
f'拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-15 - 拡張SQL{storage_schema_name} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' +
f'拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-16 - 拡張SQL{storage_schema_name} のCOMMIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました{e}\n'
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source,
target_file_name, log_info)
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source,
target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
@ -337,12 +341,12 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
try:
# ⑩ メイン処理終了ログを出力する
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-19 - メイン処理を終了します')
return warning_info
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
@ -358,10 +362,10 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
try:
conn.close()
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-MAIN-18 - DB接続を終了しました')
except Exception as e:
print(
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {ERROR} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
@ -393,15 +397,15 @@ def truncate_judge(settings_list):
Returns:
Bool: Truncate対象の場合TrueTruncate対象でない場合False
"""
import_manner = settings_list[SETTINGS_ITEM["importManner"]]
# upsert判定
if not settings_list[SETTINGS_ITEM["importManner"]]:
if not import_manner:
return False
# インポート方法設定チェック
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
if not import_manner.startswith(TRUNCATE_SRC_TABLE_SYMBOL):
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(
import_manner_splitted_list = import_manner.split(
':')
if len(import_manner_splitted_list) != 2:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)