refactor: フォーマットを適用し、長すぎる行を適度に改行した。
This commit is contained in:
parent
70d61910cc
commit
e029c5641d
@ -1,6 +1,5 @@
|
|||||||
import csv
|
import csv
|
||||||
import io
|
import io
|
||||||
import re
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
@ -34,11 +33,13 @@ SETTINGS_ITEM = {
|
|||||||
'reserved5': 18,
|
'reserved5': 18,
|
||||||
'reserved6': 19
|
'reserved6': 19
|
||||||
}
|
}
|
||||||
|
|
||||||
LINE_FEED_CODE = {
|
LINE_FEED_CODE = {
|
||||||
'CR': '\r',
|
'CR': '\r',
|
||||||
'LF': '\n',
|
'LF': '\n',
|
||||||
'CRLF': '\r\n',
|
'CRLF': '\r\n',
|
||||||
}
|
}
|
||||||
|
|
||||||
DIRECTORY_SETTINGS = '/settings/'
|
DIRECTORY_SETTINGS = '/settings/'
|
||||||
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
|
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
|
||||||
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
|
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
|
||||||
@ -64,7 +65,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
|
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
|
||||||
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
|
debug_log(
|
||||||
|
f'引数 target_data_source : {target_data_source}', log_info, mode)
|
||||||
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
|
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
|
||||||
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
|
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
|
||||||
debug_log(f'引数 db_info : {db_info}', log_info, mode)
|
debug_log(f'引数 db_info : {db_info}', log_info, mode)
|
||||||
@ -72,24 +74,30 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
debug_log(f'引数 mode : {mode}', log_info, mode)
|
debug_log(f'引数 mode : {mode}', log_info, mode)
|
||||||
|
|
||||||
# ① メイン処理開始ログを出力する
|
# ① メイン処理開始ログを出力する
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
|
||||||
|
|
||||||
# ② DB接続を開始する
|
# ② DB接続を開始する
|
||||||
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
|
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"],
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
|
db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
|
||||||
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ③ タイムゾーンを変更する
|
# ③ タイムゾーンを変更する
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(f'SET time_zone = "+9:00"')
|
cur.execute(f'SET time_zone = "+9:00"')
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
|
||||||
|
|
||||||
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||||||
# 個別設定ファイルの読み込み
|
# 個別設定ファイルの読み込み
|
||||||
settings_response = s3_client.get_object(Bucket=bucket_name, Key=settings_key)
|
settings_response = s3_client.get_object(
|
||||||
|
Bucket=bucket_name, Key=settings_key)
|
||||||
settings_list = []
|
settings_list = []
|
||||||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||||||
settings_list.append(line.rstrip('\n'))
|
settings_list.append(line.rstrip('\n'))
|
||||||
@ -102,27 +110,35 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
||||||
cur.execute(sql_truncate)
|
cur.execute(sql_truncate)
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' +
|
||||||
|
f' I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
|
||||||
|
|
||||||
# ⑤ 投入データファイルを1行ごとにループする
|
# ⑤ 投入データファイルを1行ごとにループする
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
|
||||||
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
work_key = target_data_source + DIRECTORY_WORK + target_file_name
|
||||||
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
|
work_response = s3_client.get_object(Bucket=bucket_name, Key=work_key)
|
||||||
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read(
|
||||||
|
)), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||||
|
|
||||||
process_count = 0 # 処理件数カウンタ
|
process_count = 0 # 処理件数カウンタ
|
||||||
normal_count = 0 # 正常終了件数カウンタ
|
normal_count = 0 # 正常終了件数カウンタ
|
||||||
warning_count = 0 # ワーニング終了件数カウンター
|
warning_count = 0 # ワーニング終了件数カウンター
|
||||||
warning_info = '' # ワーニング情報
|
warning_info = '' # ワーニング情報
|
||||||
index = 0 # ループインデックス
|
index = 0 # ループインデックス
|
||||||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
|
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip(
|
||||||
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',')
|
).split(',')
|
||||||
|
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip(
|
||||||
|
).split(',')
|
||||||
|
|
||||||
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]),
|
||||||
|
delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||||
try:
|
try:
|
||||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0:
|
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and index == 0:
|
||||||
index += 1
|
index += 1
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 処理件数カウント
|
# 処理件数カウント
|
||||||
@ -151,7 +167,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
# データ項目値の変換処理(カンマ除去)
|
# データ項目値の変換処理(カンマ除去)
|
||||||
org_column_value = line[i]
|
org_column_value = line[i]
|
||||||
current_settings_db_column_name = settings_db_columu_list[i]
|
current_settings_db_column_name = settings_db_columu_list[i]
|
||||||
column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
column_value = convert_column_value(
|
||||||
|
org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
||||||
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
||||||
query_parameter_list.append(column_value)
|
query_parameter_list.append(column_value)
|
||||||
sql = f'{sql} %s,'
|
sql = f'{sql} %s,'
|
||||||
@ -175,26 +192,34 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
warning_count += 1
|
warning_count += 1
|
||||||
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
||||||
|
|
||||||
# ⑥ ⑤の処理結果件数をログ出力する
|
# ⑥ ⑤の処理結果件数をログ出力する
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
|
||||||
if warning_info:
|
if warning_info:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}')
|
||||||
|
|
||||||
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ' +
|
||||||
|
f'ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
||||||
|
|
||||||
# インポート方法判断
|
# インポート方法判断
|
||||||
try:
|
try:
|
||||||
if truncate_judge(settings_list):
|
if truncate_judge(settings_list):
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
|
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
|
||||||
cur.execute(sql_truncate)
|
cur.execute(sql_truncate)
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]}' +
|
||||||
|
f' I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
|
||||||
|
|
||||||
except InvalidConfigException as e:
|
except InvalidConfigException as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
# SQL文生成
|
# SQL文生成
|
||||||
@ -222,72 +247,102 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
sql = f'{sql} ON DUPLICATE KEY UPDATE'
|
sql = f'{sql} ON DUPLICATE KEY UPDATE'
|
||||||
for i in range(len(settings_db_columu_list)):
|
for i in range(len(settings_db_columu_list)):
|
||||||
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
|
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
|
||||||
sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名
|
# システム項目:取込ファイル名
|
||||||
sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号
|
sql = f'{sql} file_name=t.file_name,'
|
||||||
sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ
|
# システム項目:取込ファイル行番号
|
||||||
sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,' # システム項目:登録者
|
sql = f'{sql} file_row_cnt=t.file_row_cnt,'
|
||||||
sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,' # システム項目:登録日時
|
# システム項目:論理削除フラグ
|
||||||
sql = f'{sql} upd_user=t.ins_user,' # システム項目:更新者
|
sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,'
|
||||||
sql = f'{sql} upd_date=t.ins_date' # システム項目:更新日時
|
# システム項目:登録者
|
||||||
|
sql = f'{sql} ins_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_user,'
|
||||||
|
# システム項目:登録日時
|
||||||
|
sql = f'{sql} ins_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.ins_date,'
|
||||||
|
# システム項目:更新者
|
||||||
|
sql = f'{sql} upd_user=t.ins_user,'
|
||||||
|
# システム項目:更新日時
|
||||||
|
sql = f'{sql} upd_date=t.ins_date'
|
||||||
|
|
||||||
debug_log(sql, log_info, mode)
|
debug_log(sql, log_info, mode)
|
||||||
|
|
||||||
# トランザクション開始
|
# トランザクション開始
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - ' +
|
||||||
|
f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(sql)
|
cur.execute(sql)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - ' +
|
||||||
|
f'標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました')
|
||||||
|
|
||||||
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
|
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
|
||||||
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
|
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
|
||||||
try:
|
try:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
|
print(
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
|
||||||
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - ' +
|
||||||
|
f'拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
|
||||||
|
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + \
|
||||||
|
settings_list[SETTINGS_ITEM["exSqlFileName"]]
|
||||||
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
|
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
|
||||||
ex_sql_file_exists = True
|
ex_sql_file_exists = True
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
|
print(
|
||||||
except Exception as e:
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
|
||||||
|
except Exception:
|
||||||
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
|
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
|
||||||
ex_sql_file_exists = False
|
ex_sql_file_exists = False
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if ex_sql_file_exists:
|
if ex_sql_file_exists:
|
||||||
# 拡張SQLファイルからSQL文生成
|
# 拡張SQLファイルからSQL文生成
|
||||||
ex_sql_obj_response = s3_client.get_object(Bucket=bucket_name, Key=ex_sql_key)
|
ex_sql_obj_response = s3_client.get_object(
|
||||||
|
Bucket=bucket_name, Key=ex_sql_key)
|
||||||
ex_sql = ''
|
ex_sql = ''
|
||||||
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
|
for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'):
|
||||||
ex_sql = f'{ex_sql} {line.rstrip()}'
|
ex_sql = f'{ex_sql} {line.rstrip()}'
|
||||||
|
|
||||||
# トランザクション開始
|
# トランザクション開始
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - ' +
|
||||||
|
f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(ex_sql)
|
cur.execute(ex_sql)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - ' +
|
||||||
|
f'拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMMIT処理が正常終了しました')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n'
|
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n'
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}')
|
||||||
else:
|
else:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
|
||||||
|
|
||||||
# ⑨ DB接続を終了する
|
# ⑨ DB接続を終了する
|
||||||
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
|
connection_close(conn, bucket_name, target_data_source,
|
||||||
|
target_file_name, log_info)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
print(
|
||||||
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||||
|
connection_close(conn, bucket_name, target_data_source,
|
||||||
|
target_file_name, log_info)
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ⑩ メイン処理終了ログを出力する
|
# ⑩ メイン処理終了ログを出力する
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
|
||||||
|
|
||||||
return warning_info
|
return warning_info
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
|
|
||||||
@ -302,11 +357,14 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
conn.close()
|
conn.close()
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
print(
|
||||||
|
f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
|
|
||||||
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
|
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
|
||||||
"""データ項目値変換処理
|
"""データ項目値変換処理
|
||||||
- 数値内のカンマ除去処理
|
- 数値内のカンマ除去処理
|
||||||
@ -320,8 +378,8 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett
|
|||||||
"""
|
"""
|
||||||
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
|
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
|
||||||
converted_column_value = org_column_value
|
converted_column_value = org_column_value
|
||||||
if current_settings_db_column_name in settings_replace_comma_list:
|
if current_settings_db_column_name in settings_replace_comma_list:
|
||||||
converted_column_value = converted_column_value.replace(',', '')
|
converted_column_value = converted_column_value.replace(',', '')
|
||||||
|
|
||||||
return converted_column_value
|
return converted_column_value
|
||||||
|
|
||||||
@ -339,11 +397,12 @@ def truncate_judge(settings_list):
|
|||||||
# upsert判定
|
# upsert判定
|
||||||
if not settings_list[SETTINGS_ITEM["importManner"]]:
|
if not settings_list[SETTINGS_ITEM["importManner"]]:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# インポート方法設定チェック
|
# インポート方法設定チェック
|
||||||
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
|
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
|
||||||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||||
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
|
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(
|
||||||
|
':')
|
||||||
if len(import_manner_splitted_list) != 2:
|
if len(import_manner_splitted_list) != 2:
|
||||||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||||
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
|
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user