234 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
import boto3
import pymysql
from pymysql.constants import CLIENT
import io
import csv
from error import error
from common import debug_log
# 定数
DIRECTORY_WORK = '/work/'
LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'}
SETTINGS_ITEM = {
'dataSource': 0,
'delimiter': 1,
'charCode': 2,
'quotechar': 3,
'lineFeedCode': 4,
'headerFlag': 5,
'csvNumItems': 6,
'csvNameItems': 7,
'dbColumuName': 8,
'storageSchemaName': 9,
'loadSchemaName': 10,
'exSqlFileName': 11,
}
LINE_FEED_CODE = {
'CR': '\r',
'LF': '\n',
'CRLF': '\r\n',
}
DIRECTORY_SETTINGS = '/settings/'
# クラス変数
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
"""メイン処理
Args:
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス
db_info : データベース情報
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns:
warning_info : Warning情報
"""
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 db_info : {db_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① メイン処理開始ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ③ タイムゾーンを変更する
with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get()
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip('\n'))
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
# ⑤ 投入データファイルを1行ごとにループする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key)
work_response = work_obj.get()
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0:
index += 1
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
for i in range(len(line)):
sql = f'{sql} "{line[i]}",'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
index += 1
except Exception as e:
warning_count += 1
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
# ⑥ ⑤の処理結果件数をログ出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
sql = f'{sql} SELECT t.*'
sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t'
sql = f'{sql} ON DUPLICATE KEY UPDATE'
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
for i in range(len(settings_db_columu_list)):
sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},'
sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名
sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号
sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者
sql = f'{sql} ins_date=t.ins_date' # システム項目:登録日時
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
try:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
ex_sql_response = ex_sqls_obj.get()
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
# トランザクション開始
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました{e}\n'
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ⑩ メイン処理終了ログを出力する
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
return warning_info
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
"""DB接続切断処理
Args:
conn : DBコネクション
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名
"""
try:
conn.close()
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
except Exception as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)