from datetime import datetime import boto3 import pymysql from pymysql.constants import CLIENT import io import csv from error import error from common import debug_log # 定数 DIRECTORY_WORK = '/work/' LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} SETTINGS_ITEM = { 'dataSource': 0, 'delimiter': 1, 'charCode': 2, 'quotechar': 3, 'lineFeedCode': 4, 'headerFlag': 5, 'csvNumItems': 6, 'csvNameItems': 7, 'dbColumuName': 8, 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, } LINE_FEED_CODE = { 'CR': '\r', 'LF': '\n', 'CRLF': '\r\n', } DIRECTORY_SETTINGS = '/settings/' # クラス変数 s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') def main(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode): """メイン処理 Args: bucket_name : バケット名 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス db_info : データベース情報 log_info : ログに記載するデータソース名とファイル名 mode : 処理モード Returns: warning_info : Warning情報 """ try: debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) debug_log(f'引数 settings_key : {settings_key}', log_info, mode) debug_log(f'引数 db_info : {db_info}', log_info, mode) debug_log(f'引数 log_info : {log_info}', log_info, mode) debug_log(f'引数 mode : {mode}', log_info, mode) # ① メイン処理開始ログを出力する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) try: # ③ タイムゾーンを変更する with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') # ⑤ 投入データファイルを1行ごとにループする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) process_count = 0 # 処理件数カウンタ normal_count = 0 # 正常終了件数カウンタ warning_count = 0 # ワーニング終了件数カウンター warning_info = '' # ワーニング情報 index = 0 # ループインデックス settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: index += 1 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') continue # 処理件数カウント process_count += 1 # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ sql = f'{sql} ins_user,' # システム項目:登録者 sql = f'{sql} ins_date,' # システム項目:登録日時 sql = f'{sql} upd_user,' # システム項目:更新者 sql = f'{sql} upd_date)' # システム項目:更新日時 sql = f'{sql} VALUES (' for i in range(len(line)): if line[i]: replace_line = line[i].replace('\\', '\\\\') sql = f'{sql} "{replace_line}",' else: sql = f'{sql} NULL,' sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 sql = f'{sql} "0",' # システム項目:論理削除フラグ sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 sql = f'{sql} NULL,' # システム項目:更新者 sql = f'{sql} NULL)' # システム項目:更新日時 index += 1 debug_log(sql, log_info, mode) # ロードスキーマのトランザクション開始 with conn.cursor() as cur: cur.execute(sql) conn.commit() normal_count += 1 except Exception as e: warning_count += 1 warning_info = f'{warning_info}{index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') # ⑥ ⑤の処理結果件数をログ出力する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') if warning_info: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ sql = f'{sql} ins_user,' # システム項目:登録者 sql = f'{sql} ins_date,' # システム項目:登録日時 sql = f'{sql} upd_user,' # システム項目:更新者 sql = f'{sql} upd_date)' # システム項目:更新日時 sql = f'{sql} SELECT' for i in range(len(settings_db_columu_list)): sql = f'{sql} t.{settings_db_columu_list[i]},' sql = f'{sql} t.file_name,' # システム項目:取込ファイル名 sql = f'{sql} t.file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} t.delete_flg,' # システム項目:論理削除フラグ sql = f'{sql} t.ins_user,' # システム項目:登録者 sql = f'{sql} t.ins_date,' # システム項目:登録日時 sql = f'{sql} t.upd_user,' # システム項目:更新者 sql = f'{sql} t.upd_date' # システム項目:更新日時 sql = f'{sql} FROM {settings_list[SETTINGS_ITEM["loadSchemaName"]]} as t' sql = f'{sql} ON DUPLICATE KEY UPDATE' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]}=t.{settings_db_columu_list[i]},' sql = f'{sql} file_name=t.file_name,' # システム項目:取込ファイル名 sql = f'{sql} file_row_cnt=t.file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} delete_flg={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.delete_flg,' # システム項目:論理削除フラグ sql = f'{sql} ins_user=t.ins_user,' # システム項目:登録者 sql = f'{sql} ins_date=t.ins_date,' # システム項目:登録日時 sql = f'{sql} upd_user={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.upd_user,' # システム項目:更新者 sql = f'{sql} upd_date={settings_list[SETTINGS_ITEM["storageSchemaName"]]}.upd_date' # システム項目:更新日時 debug_log(sql, log_info, mode) # トランザクション開始 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-09 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(sql) conn.commit() print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') if settings_list[SETTINGS_ITEM["exSqlFileName"]]: try: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) ex_sql_file_exists = True print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLファイルが存在しません\n' ex_sql_file_exists = False print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') try: if ex_sql_file_exists: # 拡張SQLファイルからSQL文生成 ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key) ex_sql_response = ex_sqls_obj.get() ex_sql = '' for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'): ex_sql = f'{ex_sql} {line.rstrip()}' # トランザクション開始 print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') with conn.cursor() as cur: cur.execute(ex_sql) conn.commit() print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: warning_info = f'{warning_info}- 拡張SQLにエラーが発生しました:{e}\n' print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') else: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') # ⑨ DB接続を終了する connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) try: # ⑩ メイン処理終了ログを出力する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') return warning_info except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): """DB接続切断処理 Args: conn : DBコネクション bucket_name : バケット名 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 log_info : ログに記載するデータソース名とファイル名 """ try: conn.close() print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info)