feat:ワーニングログの出方を変更、エラー時DB切断処理追加、個別設定ファイル読込時に改行コード指定追加、TryExcept不足分修正、デバッグモード追加

This commit is contained in:
*lcOeIaePm0 2021-10-28 11:18:19 +09:00
parent 37e5ca1c3e
commit f1ab105cf5
7 changed files with 221 additions and 135 deletions

View File

@ -3,6 +3,7 @@ import boto3
import io
import csv
from error import error
from common import debug_log
# 定数
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
@ -35,7 +36,7 @@ class CheckError(Exception):
pass
def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info):
def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info, mode):
"""チェック処理
Args:
bucket_name : バケット名
@ -44,19 +45,29 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
target_file_name : 投入データのファイル名
settings_key : 投入データに該当する個別設定ファイルのフルパス
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Raises:
CheckError : チェックでエラーがあった場合に発生する例外
"""
# ① チェック処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① チェック処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します')
# データ読込
settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get()
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip())
settings_list.append(line.rstrip('\n'))
target_obj = s3_resource.Object(bucket_name, target_key)
target_response = target_obj.get()
@ -82,6 +93,8 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2正常終了')
# ④ チェック処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します')
except CheckError as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}')
error(bucket_name, target_data_source, target_file_name, log_info)
@ -89,6 +102,3 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# ④ チェック処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します')

View File

@ -4,12 +4,14 @@ from ini import init
from chk import check
from main import main
from end import end
from error import error
# 引数
BUCKET_NAME = os.environ["BUCKET_NAME"]
TARGET_KEY = os.environ["TARGET_KEY"]
DATA_SOURCE_NAME = os.environ["DATA_SOURCE_NAME"]
FILE_NAME = os.environ["FILE_NAME"]
MODE = os.environ["MODE"]
# 環境変数
DB_HOST = os.environ["DB_HOST"]
@ -29,25 +31,28 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} '
"""
try:
# ① データ取込処理開始ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
# ① データ取込処理開始ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します')
# ② 初期処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE)
# ② 初期処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し')
settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO)
# ③ チェック処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE)
# ③ チェック処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し')
check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO)
# ④ メイン処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE)
# ④ メイン処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し')
warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO)
# ⑤ 終了処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE)
# ⑤ 終了処理を呼び出す
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し')
end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO)
# ⑥ データ取込処理終了ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
# ⑥ データ取込処理終了ログを出力する
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}')
error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO)

View File

@ -1,6 +1,7 @@
from datetime import datetime
import boto3
from error import error
from common import debug_log
# 定数
LOG_LEVEL = {'i': 'Info', 'e': 'Error'}
@ -14,7 +15,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
def end(bucket_name, target_data_source, target_file_name, warning_info, log_info):
def end(bucket_name, target_data_source, target_file_name, warning_info, log_info, mode):
"""終了処理
Args:
bucket_name : バケット名
@ -22,11 +23,20 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf
target_file_name : 投入データのファイル名
warning_info : Warning情報
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
"""
# ① 終了処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します')
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 warning_info : {warning_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① 終了処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します')
# ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key)
@ -71,9 +81,9 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf
result_done_obj = s3_resource.Object(bucket_name, result_done_key)
result_done_obj.put(Body='')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました')
# ⑤ 終了処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑤ 終了処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します')

View File

@ -22,10 +22,10 @@ def error(bucket_name, target_data_source, target_file_name, log_info):
log_info : ログに記載するデータソース名とファイル名
"""
# ① エラー処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します')
try:
# ① エラー処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します')
# ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する
work_key = target_data_source + DIRECTORY_WORK + target_file_name
work_obj = s3_resource.Object(bucket_name, work_key)

View File

@ -5,6 +5,7 @@ import csv
import re
import sys
from error import error
from common import debug_log
# 定数
LOG_LEVEL = {"i": 'Info', "e": 'Error'}
@ -18,7 +19,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
def init(bucket_name, target_key, target_data_source, target_file_name, log_info):
def init(bucket_name, target_key, target_data_source, target_file_name, log_info, mode):
"""初期処理
Args:
bucket_name : バケット名
@ -26,16 +27,27 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns:
settings_key : 投入データに該当する個別設定ファイルのフルパス
"""
# ① 初期処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します')
# ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① 初期処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする
doing_file_name = target_file_name + '.doing'
doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル{doing_file_name} の存在チェック')
@ -64,8 +76,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する
try:
# ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する
# doneファイルの存在確認
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル{target_file_name}.done の存在チェック')
result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done'
@ -95,8 +107,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした')
# ⑥ 個別設定マッピングリストが存在するかチェックする
try:
# ⑥ 個別設定マッピングリストが存在するかチェックする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック')
mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME
s3_client.head_object(Bucket=bucket_name, Key=mapping_key)
@ -105,8 +117,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑦ 個別設定ファイルを特定する
try:
# ⑦ 個別設定ファイルを特定する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します')
mapping_obj = s3_resource.Object(bucket_name, mapping_key)
mapping_response = mapping_obj.get()
@ -126,8 +138,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑧ ⑦の個別設定ファイルが存在するかチェックする
try:
# ⑧ ⑦の個別設定ファイルが存在するかチェックする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック')
settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name
s3_client.head_object(Bucket=bucket_name, Key=settings_key)
@ -136,6 +148,11 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑨ 初期処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します')
try:
# ⑨ 初期処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
return settings_key

View File

@ -5,6 +5,7 @@ from pymysql.constants import CLIENT
import io
import csv
from error import error
from common import debug_log
# 定数
LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'}
@ -34,7 +35,7 @@ s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info):
def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info, mode):
"""メイン処理
Args:
bucket_name : バケット名
@ -44,30 +45,44 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
settings_key : 投入データに該当する個別設定ファイルのフルパス
db_info : データベース情報
log_info : ログに記載するデータソース名とファイル名
mode : 処理モード
Returns:
warning_info : Warning情報
"""
# ① メイン処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
try:
debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode)
debug_log(f'引数 target_key : {target_key}', log_info, mode)
debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode)
debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode)
debug_log(f'引数 settings_key : {settings_key}', log_info, mode)
debug_log(f'引数 db_info : {db_info}', log_info, mode)
debug_log(f'引数 log_info : {log_info}', log_info, mode)
debug_log(f'引数 mode : {mode}', log_info, mode)
# ① メイン処理開始ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します')
# ② DB接続を開始する
conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS)
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ③ タイムゾーンを変更する
with conn.cursor() as cur:
cur.execute(f'SET time_zone = "+9:00"')
result = cur.fetchall()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}')
# ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
# 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
settings_obj = s3_resource.Object(bucket_name, settings_key)
settings_response = settings_obj.get()
settings_list = []
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
settings_list.append(line.rstrip())
settings_list.append(line.rstrip('\n'))
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
@ -75,9 +90,10 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
# 投入データファイルを1行ごとにループする
# 投入データファイルを1行ごとにループする
try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します')
target_obj = s3_resource.Object(bucket_name, target_key)
@ -85,53 +101,61 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0:
index += 1
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
for i in range(len(line)):
sql = f'{sql} "{line[i]}",'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
index += 1
except Exception as e:
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
# ⑤ ④の処理結果件数をログ出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数{warning_count}')
# ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする
try:
process_count = 0 # 処理件数カウンタ
normal_count = 0 # 正常終了件数カウンタ
warning_count = 0 # ワーニング終了件数カウンター
warning_info = '' # ワーニング情報
index = 0 # ループインデックス
for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
try:
if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0:
index += 1
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします')
continue
# 処理件数カウント
process_count += 1
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES ('
for i in range(len(line)):
sql = f'{sql} "{line[i]}",'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
sql = f'{sql} CURRENT_USER(),' # システム項目:登録者
sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時
sql = f'{sql} NULL,' # システム項目:更新者
sql = f'{sql} NULL)' # システム項目:更新日時
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
normal_count += 1
index += 1
except Exception as e:
warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
try:
# ⑥ ⑤の処理結果件数をログ出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}')
if warning_info:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数{warning_count}')
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
# SQL文生成
@ -155,53 +179,71 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - 拡張SQLファイルが存在しません')
try:
# ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします')
if settings_list[SETTINGS_ITEM["exSqlFileName"]]:
try:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました')
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック')
ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]]
s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key)
ex_sql_file_exists = True
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n'
ex_sql_file_exists = False
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません')
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
ex_sql_response = ex_sqls_obj.get()
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
try:
if ex_sql_file_exists:
# 拡張SQLファイルからSQL文生成
ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key)
ex_sql_response = ex_sqls_obj.get()
ex_sql = ''
for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'):
ex_sql = f'{ex_sql} {line.rstrip()}'
# トランザクション開始
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました{e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました{e}')
else:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# トランザクション開始
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します')
with conn.cursor() as cur:
cur.execute(ex_sql)
conn.commit()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました')
except Exception as e:
warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました{e}\n'
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました{e}')
else:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした')
# ⑧ DB接続を終了する
# ⑨ DB接続を終了する
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
# ⑩ メイン処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
connection_close(conn, bucket_name, target_data_source, target_file_name, log_info)
error(bucket_name, target_data_source, target_file_name, log_info)
return warning_info
def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info):
"""DB接続切断処理
Args:
conn : DBコネクション
bucket_name : バケット名
target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分
target_file_name : 投入データのファイル名
log_info : ログに記載するデータソース名とファイル名
"""
try:
conn.close()
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました')
except Exception as e:
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# ⑨ メイン処理終了ログを出力する
print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します')
return warning_info

View File

@ -12,6 +12,7 @@ SECURITY_GROUP_ID_ECRAPI = os.environ["SECURITY_GROUP_ID_ECRAPI"]
SECURITY_GROUP_ID_ECRDKR = os.environ["SECURITY_GROUP_ID_ECRDKR"]
SECURITY_GROUP_ID_LOGS = os.environ["SECURITY_GROUP_ID_LOGS"]
SECURITY_GROUP_ID_RDS = os.environ["SECURITY_GROUP_ID_RDS"]
MODE = os.environ["MODE"]
# クラス変数
ecs_client = boto3.client('ecs')
@ -59,6 +60,7 @@ def lambda_handler(event, context):
{"name": 'TARGET_KEY', "value": event_object_key},
{"name": 'DATA_SOURCE_NAME', "value": event_data_source_name},
{"name": 'FILE_NAME', "value": event_file_name},
{"name": 'MODE', "value": MODE},
],
},
],