From f1ab105cf562b336c641304e8499ec916180481e Mon Sep 17 00:00:00 2001 From: *lcOeIaePm0 Date: Thu, 28 Oct 2021 11:18:19 +0900 Subject: [PATCH] =?UTF-8?q?feat:=E3=83=AF=E3=83=BC=E3=83=8B=E3=83=B3?= =?UTF-8?q?=E3=82=B0=E3=83=AD=E3=82=B0=E3=81=AE=E5=87=BA=E6=96=B9=E3=82=92?= =?UTF-8?q?=E5=A4=89=E6=9B=B4=E3=80=81=E3=82=A8=E3=83=A9=E3=83=BC=E6=99=82?= =?UTF-8?q?DB=E5=88=87=E6=96=AD=E5=87=A6=E7=90=86=E8=BF=BD=E5=8A=A0?= =?UTF-8?q?=E3=80=81=E5=80=8B=E5=88=A5=E8=A8=AD=E5=AE=9A=E3=83=95=E3=82=A1?= =?UTF-8?q?=E3=82=A4=E3=83=AB=E8=AA=AD=E8=BE=BC=E6=99=82=E3=81=AB=E6=94=B9?= =?UTF-8?q?=E8=A1=8C=E3=82=B3=E3=83=BC=E3=83=89=E6=8C=87=E5=AE=9A=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0=E3=80=81TryExcept=E4=B8=8D=E8=B6=B3=E5=88=86=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E3=80=81=E3=83=87=E3=83=90=E3=83=83=E3=82=B0=E3=83=A2?= =?UTF-8?q?=E3=83=BC=E3=83=89=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/Dockerfile/dataimport/chk.py | 24 +- ecs/Dockerfile/dataimport/controller.py | 39 +-- ecs/Dockerfile/dataimport/end.py | 22 +- ecs/Dockerfile/dataimport/error.py | 6 +- ecs/Dockerfile/dataimport/ini.py | 41 +++- ecs/Dockerfile/dataimport/main.py | 222 +++++++++++------- ...bj-newdwh2021-staging-lambda-dataimport.py | 2 + 7 files changed, 221 insertions(+), 135 deletions(-) diff --git a/ecs/Dockerfile/dataimport/chk.py b/ecs/Dockerfile/dataimport/chk.py index dd70d836..40bdc101 100644 --- a/ecs/Dockerfile/dataimport/chk.py +++ b/ecs/Dockerfile/dataimport/chk.py @@ -3,6 +3,7 @@ import boto3 import io import csv from error import error +from common import debug_log # 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} @@ -35,7 +36,7 @@ class CheckError(Exception): pass -def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info): +def check(bucket_name, target_key, target_data_source, target_file_name, settings_key, log_info, mode): """チェック処理 Args: bucket_name : バケット名 @@ -44,19 +45,29 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス log_info : ログに記載するデータソース名とファイル名 + mode : 処理モード Raises: CheckError : チェックでエラーがあった場合に発生する例外 """ - # ① チェック処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') try: + debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) + debug_log(f'引数 target_key : {target_key}', log_info, mode) + debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) + debug_log(f'引数 settings_key : {settings_key}', log_info, mode) + debug_log(f'引数 log_info : {log_info}', log_info, mode) + debug_log(f'引数 mode : {mode}', log_info, mode) + + # ① チェック処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-01 - チェック処理を開始します') + # データ読込 settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): - settings_list.append(line.rstrip()) + settings_list.append(line.rstrip('\n')) target_obj = s3_resource.Object(bucket_name, target_key) target_response = target_obj.get() @@ -82,6 +93,8 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{target_header_list[i]}') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-05 - C-2:正常終了') + # ④ チェック処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') except CheckError as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} {e}') error(bucket_name, target_data_source, target_file_name, log_info) @@ -89,6 +102,3 @@ def check(bucket_name, target_key, target_data_source, target_file_name, setting except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - - # ④ チェック処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - チェック処理を終了します') diff --git a/ecs/Dockerfile/dataimport/controller.py b/ecs/Dockerfile/dataimport/controller.py index d04edf53..f16281f8 100644 --- a/ecs/Dockerfile/dataimport/controller.py +++ b/ecs/Dockerfile/dataimport/controller.py @@ -4,12 +4,14 @@ from ini import init from chk import check from main import main from end import end +from error import error # 引数 BUCKET_NAME = os.environ["BUCKET_NAME"] TARGET_KEY = os.environ["TARGET_KEY"] DATA_SOURCE_NAME = os.environ["DATA_SOURCE_NAME"] FILE_NAME = os.environ["FILE_NAME"] +MODE = os.environ["MODE"] # 環境変数 DB_HOST = os.environ["DB_HOST"] @@ -29,25 +31,28 @@ LOG_INFO = f' {DATA_SOURCE_NAME} {FILE_NAME} ' """ +try: + # ① データ取込処理開始ログを出力する + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') -# ① データ取込処理開始ログを出力する -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-01 - データ取込処理を開始します') + # ② 初期処理を呼び出す + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') + settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO, MODE) -# ② 初期処理を呼び出す -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-02 - 初期処理の呼び出し') -settings_key = init(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) + # ③ チェック処理を呼び出す + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') + check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO, MODE) -# ③ チェック処理を呼び出す -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') -check(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, LOG_INFO) + # ④ メイン処理を呼び出す + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') + warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) -# ④ メイン処理を呼び出す -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-04 - メイン処理の呼び出し') -warning_info = main(BUCKET_NAME, TARGET_KEY, DATA_SOURCE_NAME, FILE_NAME, settings_key, DB_INFO, LOG_INFO) + # ⑤ 終了処理を呼び出す + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') + end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO, MODE) -# ⑤ 終了処理を呼び出す -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-05 - 終了処理の呼び出し') -end(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, warning_info, LOG_INFO) - -# ⑥ データ取込処理終了ログを出力する -print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') + # ⑥ データ取込処理終了ログを出力する + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-06 - データ取込処理を終了します') +except Exception as e: + print(f'{str(datetime.now())} {LOG_INFO} {LOG_LEVEL["e"]} E-CTRL-99 - エラー内容:{e}') + error(BUCKET_NAME, DATA_SOURCE_NAME, FILE_NAME, LOG_INFO) diff --git a/ecs/Dockerfile/dataimport/end.py b/ecs/Dockerfile/dataimport/end.py index 3e7025ad..84c34db7 100644 --- a/ecs/Dockerfile/dataimport/end.py +++ b/ecs/Dockerfile/dataimport/end.py @@ -1,6 +1,7 @@ from datetime import datetime import boto3 from error import error +from common import debug_log # 定数 LOG_LEVEL = {'i': 'Info', 'e': 'Error'} @@ -14,7 +15,7 @@ s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') -def end(bucket_name, target_data_source, target_file_name, warning_info, log_info): +def end(bucket_name, target_data_source, target_file_name, warning_info, log_info, mode): """終了処理 Args: bucket_name : バケット名 @@ -22,11 +23,20 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf target_file_name : 投入データのファイル名 warning_info : Warning情報 log_info : ログに記載するデータソース名とファイル名 + mode : 処理モード """ - # ① 終了処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') try: + debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) + debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) + debug_log(f'引数 warning_info : {warning_info}', log_info, mode) + debug_log(f'引数 log_info : {log_info}', log_info, mode) + debug_log(f'引数 mode : {mode}', log_info, mode) + + # ① 終了処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-01 - 終了処理を開始します') + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でdoneディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) @@ -71,9 +81,9 @@ def end(bucket_name, target_data_source, target_file_name, warning_info, log_inf result_done_obj = s3_resource.Object(bucket_name, result_done_key) result_done_obj.put(Body='') print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-09 - targetディレクトリに {result_done_file_name} を作成しました') + + # ⑤ 終了処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-END-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - - # ⑤ 終了処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-END-10 - 終了処理を終了します') diff --git a/ecs/Dockerfile/dataimport/error.py b/ecs/Dockerfile/dataimport/error.py index 08a61238..bba5aa9b 100644 --- a/ecs/Dockerfile/dataimport/error.py +++ b/ecs/Dockerfile/dataimport/error.py @@ -22,10 +22,10 @@ def error(bucket_name, target_data_source, target_file_name, log_info): log_info : ログに記載するデータソース名とファイル名 """ - # ① エラー処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') - try: + # ① エラー処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-ERR-01 - エラー処理を開始します') + # ② 投入データファイルをS3バケット内のworkディレクトリから、以下ファイル名でerrorディレクトリに移動(コピー削除)する work_key = target_data_source + DIRECTORY_WORK + target_file_name work_obj = s3_resource.Object(bucket_name, work_key) diff --git a/ecs/Dockerfile/dataimport/ini.py b/ecs/Dockerfile/dataimport/ini.py index b80af238..53d8c7eb 100644 --- a/ecs/Dockerfile/dataimport/ini.py +++ b/ecs/Dockerfile/dataimport/ini.py @@ -5,6 +5,7 @@ import csv import re import sys from error import error +from common import debug_log # 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error'} @@ -18,7 +19,7 @@ s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') -def init(bucket_name, target_key, target_data_source, target_file_name, log_info): +def init(bucket_name, target_key, target_data_source, target_file_name, log_info, mode): """初期処理 Args: bucket_name : バケット名 @@ -26,16 +27,27 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 log_info : ログに記載するデータソース名とファイル名 + mode : 処理モード Returns: settings_key : 投入データに該当する個別設定ファイルのフルパス """ - # ① 初期処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') - - - # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする try: + debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) + debug_log(f'引数 target_key : {target_key}', log_info, mode) + debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) + debug_log(f'引数 log_info : {log_info}', log_info, mode) + debug_log(f'引数 mode : {mode}', log_info, mode) + + # ① 初期処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-01 - 初期処理を開始します') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) + + try: + # ② S3バケット内のtargetディレクトリに「投入データファイル名.doing」ファイルが存在するかチェックする doing_file_name = target_file_name + '.doing' doing_key = target_data_source + DIRECTORY_TARGET + doing_file_name print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-02 - doingファイル:{doing_file_name} の存在チェック') @@ -64,8 +76,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する try: + # ⑤ S3バケット内のtargetディレクトリの以下ファイル群を削除(前回分の削除)する # doneファイルの存在確認 print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-06 - doneファイル:{target_file_name}.done の存在チェック') result_done_key = target_data_source + DIRECTORY_TARGET + target_file_name + '.done' @@ -95,8 +107,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-14 - errorファイルは存在しませんでした') - # ⑥ 個別設定マッピングリストが存在するかチェックする try: + # ⑥ 個別設定マッピングリストが存在するかチェックする print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-15 - 個別設定マッピングリスト:{MAPPING_FILE_NAME} の存在チェック') mapping_key = target_data_source + DIRECTORY_SETTINGS + MAPPING_FILE_NAME s3_client.head_object(Bucket=bucket_name, Key=mapping_key) @@ -105,8 +117,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-02 - 個別設定マッピングリストが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑦ 個別設定ファイルを特定する try: + # ⑦ 個別設定ファイルを特定する print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-17 - 個別設定ファイルを検索します') mapping_obj = s3_resource.Object(bucket_name, mapping_key) mapping_response = mapping_obj.get() @@ -126,8 +138,8 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑧ ⑦の個別設定ファイルが存在するかチェックする try: + # ⑧ ⑦の個別設定ファイルが存在するかチェックする print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-19 - 個別設定ファイル:{settings_file_name} の存在チェック') settings_key = target_data_source + DIRECTORY_SETTINGS + settings_file_name s3_client.head_object(Bucket=bucket_name, Key=settings_key) @@ -136,6 +148,11 @@ def init(bucket_name, target_key, target_data_source, target_file_name, log_info print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-04 - 個別設定ファイルが存在しません') error(bucket_name, target_data_source, target_file_name, log_info) - # ⑨ 初期処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') + try: + # ⑨ 初期処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-INI-21 - 初期処理を終了します') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-INI-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) + return settings_key diff --git a/ecs/Dockerfile/dataimport/main.py b/ecs/Dockerfile/dataimport/main.py index 92eac609..9f35e8cd 100644 --- a/ecs/Dockerfile/dataimport/main.py +++ b/ecs/Dockerfile/dataimport/main.py @@ -5,6 +5,7 @@ from pymysql.constants import CLIENT import io import csv from error import error +from common import debug_log # 定数 LOG_LEVEL = {"i": 'Info', "e": 'Error', "w": 'Warning'} @@ -34,7 +35,7 @@ s3_client = boto3.client('s3') s3_resource = boto3.resource('s3') -def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info): +def main(bucket_name, target_key, target_data_source, target_file_name, settings_key, db_info, log_info, mode): """メイン処理 Args: bucket_name : バケット名 @@ -44,30 +45,44 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings settings_key : 投入データに該当する個別設定ファイルのフルパス db_info : データベース情報 log_info : ログに記載するデータソース名とファイル名 + mode : 処理モード Returns: warning_info : Warning情報 """ - # ① メイン処理開始ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') - try: + debug_log(f'引数 bucket_name : {bucket_name}', log_info, mode) + debug_log(f'引数 target_key : {target_key}', log_info, mode) + debug_log(f'引数 target_data_source : {target_data_source}', log_info, mode) + debug_log(f'引数 target_file_name : {target_file_name}', log_info, mode) + debug_log(f'引数 settings_key : {settings_key}', log_info, mode) + debug_log(f'引数 db_info : {db_info}', log_info, mode) + debug_log(f'引数 log_info : {log_info}', log_info, mode) + debug_log(f'引数 mode : {mode}', log_info, mode) + + # ① メイン処理開始ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-01 - メイン処理を開始します') + # ② DB接続を開始する conn = pymysql.connect(host=db_info["host"], user=db_info["user"], passwd=db_info["pass"], db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS) print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-02 - DB接続を開始しました') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) + try: # ③ タイムゾーンを変更する with conn.cursor() as cur: cur.execute(f'SET time_zone = "+9:00"') result = cur.fetchall() print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました:{result}') - # ③ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする + # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): - settings_list.append(line.rstrip()) + settings_list.append(line.rstrip('\n')) with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' @@ -75,9 +90,10 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-04 - {settings_list[SETTINGS_ITEM["loadSchemaName"]]} をTRUNCATEしました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) - # ④ 投入データファイルを1行ごとにループする + # ⑤ 投入データファイルを1行ごとにループする try: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-05 - 投入データ {target_file_name} の読み込みを開始します') target_obj = s3_resource.Object(bucket_name, target_key) @@ -85,53 +101,61 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings target_data = io.TextIOWrapper(io.BytesIO(target_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) - process_count = 0 # 処理件数カウンタ - normal_count = 0 # 正常終了件数カウンタ - warning_count = 0 # ワーニング終了件数カウンター - warning_info = '' # ワーニング情報 - index = 0 # ループインデックス - - for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - try: - if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: - index += 1 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') - continue - - # 処理件数カウント - process_count += 1 - - # SQL文生成 - sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES (' - for i in range(len(line)): - sql = f'{sql} "{line[i]}",' - - sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 - sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号 - sql = f'{sql} "0",' # システム項目:論理削除フラグ - sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 - sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 - sql = f'{sql} NULL,' # システム項目:更新者 - sql = f'{sql} NULL)' # システム項目:更新日時 - - # ロードスキーマのトランザクション開始 - with conn.cursor() as cur: - cur.execute(sql) - conn.commit() - normal_count += 1 - - index += 1 - except Exception as e: - warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') - - # ⑤ ④の処理結果件数をログ出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count} Warning終了件数:{warning_count}') - - # ⑥ ロードスキーマのデータを蓄積スキーマにUPSERTする try: + process_count = 0 # 処理件数カウンタ + normal_count = 0 # 正常終了件数カウンタ + warning_count = 0 # ワーニング終了件数カウンター + warning_info = '' # ワーニング情報 + index = 0 # ループインデックス + + for line in csv.reader(target_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + try: + if settings_list[SETTINGS_ITEM["headerFlag"]] and index == 0: + index += 1 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-06 - ヘッダー行をスキップします') + continue + + # 処理件数カウント + process_count += 1 + + # SQL文生成 + sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} VALUES (' + for i in range(len(line)): + sql = f'{sql} "{line[i]}",' + + sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 + sql = f'{sql} "{index}",' # システム項目:取込ファイル行番号 + sql = f'{sql} "0",' # システム項目:論理削除フラグ + sql = f'{sql} CURRENT_USER(),' # システム項目:登録者 + sql = f'{sql} CURRENT_TIMESTAMP(),' # システム項目:登録日時 + sql = f'{sql} NULL,' # システム項目:更新者 + sql = f'{sql} NULL)' # システム項目:更新日時 + + # ロードスキーマのトランザクション開始 + with conn.cursor() as cur: + cur.execute(sql) + conn.commit() + normal_count += 1 + + index += 1 + except Exception as e: + warning_info = f'{warning_info} {index} ロードスキーマ登録時にエラーが発生しました {line} {e}\n' + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-01 {index} ロードスキーマ登録時にエラーが発生しました {line} {e}') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + error(bucket_name, target_data_source, target_file_name, log_info) + + try: + # ⑥ ⑤の処理結果件数をログ出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-07 - 投入データ件数:{process_count} 正常終了件数:{normal_count}') + if warning_info: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - Warning終了件数:{warning_count}') + + # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') # SQL文生成 @@ -155,53 +179,71 @@ def main(bucket_name, target_key, target_data_source, target_file_name, settings print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-10 - 標準SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) error(bucket_name, target_data_source, target_file_name, log_info) - # ⑦ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') - if settings_list[SETTINGS_ITEM["exSqlFileName"]]: - try: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') - ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] - s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) - ex_sql_file_exists = True - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') - except Exception as e: - warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' - ex_sql_file_exists = False - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-02 - 拡張SQLファイルが存在しません') + try: + # ⑧ 個別設定ファイルに拡張SQLファイル名が設定されているかチェック + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-11 - 拡張SQL設定が存在するかチェックします') + if settings_list[SETTINGS_ITEM["exSqlFileName"]]: + try: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-12 - 拡張SQL設定の存在を確認しました') + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-13 - 拡張SQLファイル名:{settings_list[SETTINGS_ITEM["exSqlFileName"]]} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + settings_list[SETTINGS_ITEM["exSqlFileName"]] + s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) + ex_sql_file_exists = True + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-14 - 拡張SQLファイル名の存在を確認しました') + except Exception as e: + warning_info = f'{warning_info} - 拡張SQLファイルが存在しません\n' + ex_sql_file_exists = False + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLファイルが存在しません') - try: - if ex_sql_file_exists: - # 拡張SQLファイルからSQL文生成 - ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key) - ex_sql_response = ex_sqls_obj.get() - ex_sql = '' - for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'): - ex_sql = f'{ex_sql} {line.rstrip()}' + try: + if ex_sql_file_exists: + # 拡張SQLファイルからSQL文生成 + ex_sqls_obj = s3_resource.Object(bucket_name, ex_sql_key) + ex_sql_response = ex_sqls_obj.get() + ex_sql = '' + for line in io.TextIOWrapper(io.BytesIO(ex_sql_response["Body"].read()), encoding='utf-8'): + ex_sql = f'{ex_sql} {line.rstrip()}' - # トランザクション開始 - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') - with conn.cursor() as cur: - cur.execute(ex_sql) - conn.commit() - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') - except Exception as e: - warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-03 - 拡張SQLにエラーが発生しました:{e}') - else: - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') + # トランザクション開始 + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-15 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のトランザクションを開始します') + with conn.cursor() as cur: + cur.execute(ex_sql) + conn.commit() + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-16 - 拡張SQL:{settings_list[SETTINGS_ITEM["storageSchemaName"]]} のCOMIIT処理が正常終了しました') + except Exception as e: + warning_info = f'{warning_info} - 拡張SQLにエラーが発生しました:{e}\n' + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["w"]} W-MAIN-04 - 拡張SQLにエラーが発生しました:{e}') + else: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-17 - 拡張SQL設定の存在はありませんでした') - # ⑧ DB接続を終了する + # ⑨ DB接続を終了する + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + + # ⑩ メイン処理終了ログを出力する + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') + except Exception as e: + print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') + connection_close(conn, bucket_name, target_data_source, target_file_name, log_info) + error(bucket_name, target_data_source, target_file_name, log_info) + + return warning_info + + +def connection_close(conn, bucket_name, target_data_source, target_file_name, log_info): + """DB接続切断処理 + Args: + conn : DBコネクション + bucket_name : バケット名 + target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 + target_file_name : 投入データのファイル名 + log_info : ログに記載するデータソース名とファイル名 + """ try: conn.close() print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-18 - DB接続を終了しました') except Exception as e: print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) - - # ⑨ メイン処理終了ログを出力する - print(f'{str(datetime.now())} {log_info} {LOG_LEVEL["i"]} I-MAIN-19 - メイン処理を終了します') - - return warning_info diff --git a/lambda/mbj-newdwh2021-staging-lambda-dataimport.py b/lambda/mbj-newdwh2021-staging-lambda-dataimport.py index 6b5b061d..c87a7b50 100644 --- a/lambda/mbj-newdwh2021-staging-lambda-dataimport.py +++ b/lambda/mbj-newdwh2021-staging-lambda-dataimport.py @@ -12,6 +12,7 @@ SECURITY_GROUP_ID_ECRAPI = os.environ["SECURITY_GROUP_ID_ECRAPI"] SECURITY_GROUP_ID_ECRDKR = os.environ["SECURITY_GROUP_ID_ECRDKR"] SECURITY_GROUP_ID_LOGS = os.environ["SECURITY_GROUP_ID_LOGS"] SECURITY_GROUP_ID_RDS = os.environ["SECURITY_GROUP_ID_RDS"] +MODE = os.environ["MODE"] # クラス変数 ecs_client = boto3.client('ecs') @@ -59,6 +60,7 @@ def lambda_handler(event, context): {"name": 'TARGET_KEY', "value": event_object_key}, {"name": 'DATA_SOURCE_NAME', "value": event_data_source_name}, {"name": 'FILE_NAME', "value": event_file_name}, + {"name": 'MODE', "value": MODE}, ], }, ],