2022-07-08 11:32:18 +09:00

333 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
import boto3
import pymysql
from pymysql.constants import CLIENT
import io
import csv
from error import error
from common import debug_log
# 定数
DIRECTORY_WORK = '/work/'
LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'}
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
'charCode': 2,
'quotechar': 3,
'lineFeedCode': 4,
'headerFlag': 5,
'csvNumItems': 6,
'csvNameItems': 7,
'dbColumuName': 8,
'storageSchemaName': 9,
'loadSchemaName': 10,
'exSqlFileName': 11,
'reserved0': 12,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,
'reserved4': 17,
'reserved5': 18,
'reserved6': 19
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
DIRECTORY_SETTINGS = '/settings/'
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
# クラス変数
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
"""メイン処理
Args:
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス
db_info : データベース情報
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns:
warning_info : Warning情報
"""
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 db_info : {db_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① メイン処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ③ タイムゾーンを変更する
with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
# 個別設定ファイルの読み込み
settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get()
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
# 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
settings_list.append('')
# ロードスキーマのTRUNCATE
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
# ⑤ 投入データファイルを1行ごとにループする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key)
work_response = work_obj.get()
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0:
index += 1
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user,' # システム項目:登録者
sql = f'{sql} ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user,' # システム項目:更新者
sql = f'{sql} upd_date)' # システム項目:更新日時
sql = f'{sql} VALUES ('
for i in range(len(line)):
# データ項目値が0桁より大きいかチェックする
if len(line[i]) > 0:
# 0桁より大きい場合
replace_line = line[i].replace('\\', '\\\\')
sql = f'{sql} "{replace_line}",'
else:
# 上記以外の場合
sql = f'{sql} NULL,'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
index += 1
debug_log(sql, log_info, mode)
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
except Exception as e:
warning_count += 1
warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
# ⑥ ⑤の処理結果件数をログ出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
# インポート方法判断
try:
if truncate_judge(settings_list):
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
except InvalidConfigException as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]},'
sql = f'{sql} file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user,' # システム項目:登録者
sql = f'{sql} ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user,' # システム項目:更新者
sql = f'{sql} upd_date)' # システム項目:更新日時
sql = f'{sql} SELECT'
for i in range(len(settings_db_columu_list)):
sql = f'{sql} t.{settings_db_columu_list[i]},'
sql = f'{sql} t.file_name,' # システム項目:取込ファイル名
sql = f'{sql} t.file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} t.delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} t.ins_user,' # システム項目:登録者
sql = f'{sql} t.ins_date,' # システム項目:登録日時
sql = f'{sql} t.upd_user,' # システム項目:更新者
sql = f'{sql} t.upd_date' # システム項目:更新日時
sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t'
sql = f'{sql} ON DUPLICATE KEY UPDATE'
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ
sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者
sql = f'{sql} ins_date=t.ins_date,' # システム項目:登録日時
sql = f'{sql} upd_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.upd_user,' # システム項目:更新者
sql = f'{sql} upd_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.upd_date' # システム項目:更新日時
debug_log(sql, log_info, mode)
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
try:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception as e:
warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
ex_sql_response = ex_sqls_obj.get()
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました{e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ⑩ メイン処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
return warning_info
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
"""DB接続切断処理
Args:
conn : DBコネクション
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名
"""
try:
conn.close()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def truncate_judge(settings_list):
"""TRUNCATE処理対応判定
Args:
settings_list (list): 個別設定ファイル
Raises:
InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス
Returns:
Bool: Truncate対象の場合True。Truncate対象でない場合False
"""
# upsert判定
if not settings_list[SETTINGS_ITEM["importManner"]]:
return False
# インポート方法設定チェック
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
if len(import_manner_splitted_list) != 2:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
return True
class InvalidConfigException(Exception):
pass