196 lines
11 KiB
Python
196 lines
11 KiB
Python
from datetime import datetime
|
||
import boto3
|
||
import pymysql
|
||
import io
|
||
import csv
|
||
from error import error
|
||
|
||
ecs_client = boto3.client('ecs')
|
||
s3_client = boto3.client('s3')
|
||
s3_resource = boto3.resource('s3')
|
||
|
||
LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'}
|
||
SETTINGS_ITEM = {
|
||
'dataSource': 0,
|
||
'delimiter': 1,
|
||
'charCode': 2,
|
||
'quotechar': 3,
|
||
'lineFeedCode': 4,
|
||
'headerFlag': 5,
|
||
'csvNumItems': 6,
|
||
'csvNameItems': 7,
|
||
'dbColumuName': 8,
|
||
'storageSchemaName': 9,
|
||
'loadSchemaName': 10,
|
||
'exSqlFileName': 11,
|
||
}
|
||
DIRECTORY_SETTINGS = '/settings/'
|
||
|
||
|
||
def init(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info):
|
||
"""メイン処理
|
||
Args:
|
||
bucket_name : バケット名
|
||
target_key : 投入データのフルパス
|
||
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
|
||
target_file_name : 投入データのファイル名
|
||
settings_key : 投入データに該当する個別設定ファイルのフルパス
|
||
db_info : データベース情報
|
||
log_info : ログに記載するデータソース名とファイル名
|
||
Returns:
|
||
warning_info : Warning情報
|
||
"""
|
||
|
||
# ① メイン処理開始ログを出力する
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
|
||
|
||
# ② DB接続を開始する
|
||
try:
|
||
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5)
|
||
except Exception as e:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name)
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
|
||
|
||
# ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||
try:
|
||
# 個別設定ファイルの読込
|
||
settings_obj = s3_resource.Object(bucket_name, settings_key)
|
||
settings_response = settings_obj.get()
|
||
settings_list = []
|
||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||
settings_list.append(line.rstrip())
|
||
|
||
# TRUNCATE実行
|
||
with conn.cursor() as cur:
|
||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
||
cur.execute(sql_truncate)
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
|
||
except Exception as e:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name)
|
||
|
||
# ④ 投入データファイルを1行ごとにループする
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - 投入データ {target_file_name} の読み込みを開始します')
|
||
try:
|
||
target_obj = s3_resource.Object(bucket_name, target_key)
|
||
target_response = target_obj.get()
|
||
target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=settings_list[SETTINGS_ITEM["lineFeedCode"]])
|
||
except Exception as e:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name)
|
||
|
||
process_count = 0 # 処理件数カウンタ
|
||
normal_count = 0 # 正常終了件数カウンタ
|
||
warning_count = 0 # ワーニング終了件数カウンター
|
||
warning_info = '' # ワーニング情報
|
||
index = 0 # ループインデックス
|
||
|
||
for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||
try:
|
||
if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0:
|
||
index += 1
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - ヘッダー行をスキップします')
|
||
continue
|
||
|
||
# 処理件数カウント
|
||
process_count += 1
|
||
|
||
# SQL文生成
|
||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
|
||
for i in range(len(line)):
|
||
sql = f'{sql} "{line[i]}",'
|
||
|
||
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
|
||
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} "0",' # システム項目:論理削除フラグ
|
||
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
|
||
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
|
||
sql = f'{sql} NULL,' # システム項目:更新者
|
||
sql = f'{sql} NULL)' # システム項目:更新日時
|
||
|
||
# ロードスキーマのトランザクション開始
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql)
|
||
conn.commit()
|
||
normal_count += 1
|
||
|
||
index += 1
|
||
except Exception as e:
|
||
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
|
||
|
||
# ⑤ ④の処理結果件数をログ出力する
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}')
|
||
|
||
# ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
||
try:
|
||
# SQL文生成
|
||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
|
||
sql = f'{sql} SELECT t.*'
|
||
sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t'
|
||
sql = f'{sql} ON DUPLICATE KEY UPDATE'
|
||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
|
||
for i in range(len(settings_db_columu_list)):
|
||
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
|
||
sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名
|
||
sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号
|
||
sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者
|
||
sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時
|
||
|
||
# トランザクション開始
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql)
|
||
conn.commit()
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
|
||
except Exception as e:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||
error(bucket_name, target_data_source, target_file_name)
|
||
|
||
# ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 拡張SQL設定が存在するかチェックします')
|
||
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定の存在を確認しました')
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
|
||
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
|
||
try:
|
||
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
|
||
ex_sql_file_exists = True
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名の存在を確認しました')
|
||
except Exception as e:
|
||
warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n'
|
||
ex_sql_file_exists = False
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - 拡張SQLファイルが存在しません')
|
||
|
||
try:
|
||
if ex_sql_file_exists:
|
||
# 拡張SQLファイルからSQL文生成
|
||
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
|
||
ex_sql_response = ex_sqls_obj.get()
|
||
ex_sql = ''
|
||
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
|
||
ex_sql = f'{ex_sql} {line.rstrip()}'
|
||
|
||
# トランザクション開始
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
|
||
with conn.cursor() as cur:
|
||
cur.execute(ex_sql)
|
||
conn.commit()
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
|
||
except Exception as e:
|
||
warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n'
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました:{e}')
|
||
else:
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL設定の存在はありませんでした')
|
||
|
||
# ⑧ DB接続を終了する
|
||
conn.close()
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - DB接続を終了しました')
|
||
|
||
# ⑨ 終了ログの出力
|
||
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - メイン処理を終了します')
|
||
|
||
return warning_info
|