diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index de3feca6..207e898b 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -5,14 +5,13 @@ import sys from datetime import datetime import boto3 -from common import (ERROR, INFO, LINE_FEED_CODE, SETTINGS_ITEM, - convert_quotechar, debug_log, uncompress_gzip, - uncompress_zip) +import pymysql +from common import (DIRECTORY_SETTINGS, DIRECTORY_WORK, ERROR, INFO, + LINE_FEED_CODE, SETTINGS_ITEM, WARNING, convert_quotechar, + debug_log, uncompress_gzip, uncompress_zip) from end import end from error import error - -# 定数 -DIRECTORY_WORK = '/work/' +from pymysql.constants import CLIENT # クラス変数 s3_client = boto3.client('s3') @@ -23,13 +22,14 @@ class CheckError(Exception): pass -def check(bucket_name, target_data_source, target_file_name, settings_key, log_info, mode): +def check(bucket_name, target_data_source, target_file_name, settings_key, db_info, log_info, mode): """チェック処理 Args: bucket_name : バケット名 target_data_source : 投入データのディレクトリ名よりデータソースに該当する部分 target_file_name : 投入データのファイル名 settings_key : 投入データに該当する個別設定ファイルのフルパス + db_info : データベース情報 log_info : ログに記載するデータソース名とファイル名 mode : 処理モード Raises: @@ -91,6 +91,12 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-02 - C-0のチェックを開始します') if is_empty_file(work_csv_row, settings_list): + # 拡張SQL実行フラグがONになっている場合は拡張SQLを実行して処理終了する。 + if settings_list[SETTINGS_ITEM["executeExSqlIfFileEmptyFlag"]] == '1': + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-15 - 投入ファイルが0バイトでしたが、拡張SQLを実行します。') + execute_ex_sql(bucket_name, target_data_source, + settings_list, db_info, log_info) print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') end(bucket_name, target_data_source, @@ -260,6 +266,57 @@ def reverse_readline_stream(f: io.BytesIO, line_feed: str, chunk_size=4096): yield buffer +def execute_ex_sql(bucket_name, target_data_source, settings_list, db_info, log_info): + # 個別設定ファイルに拡張SQLファイル名が設定されているかチェック + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-16 - 拡張SQL設定が存在するかチェックします') + ex_sql_file_name = settings_list[SETTINGS_ITEM["exSqlFileName"]] + + if ex_sql_file_name: + try: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-17 - 拡張SQL設定の存在を確認しました') + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-18 - 拡張SQLファイル名:{ex_sql_file_name} の存在チェック') + ex_sql_key = target_data_source + DIRECTORY_SETTINGS + ex_sql_file_name + s3_client.head_object(Bucket=bucket_name, Key=ex_sql_key) + ex_sql_file_exists = True + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-19 - 拡張SQLファイル名の存在を確認しました') + except Exception: + ex_sql_file_exists = False + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-02 - 拡張SQLファイルが存在しません') + + try: + if ex_sql_file_exists: + # 拡張SQLファイルからSQL文生成 + ex_sql_obj_response = s3_client.get_object( + Bucket=bucket_name, Key=ex_sql_key) + ex_sql = '' + for line in io.TextIOWrapper(io.BytesIO(ex_sql_obj_response["Body"].read()), encoding='utf-8'): + ex_sql = f'{ex_sql} {line.rstrip()}' + + # DB接続を開始する + conn = pymysql.connect(host=db_info["host"], port=db_info["port"], user=db_info["user"], passwd=db_info["pass"], + db=db_info["name"], connect_timeout=5, client_flag=CLIENT.MULTI_STATEMENTS, local_infile=True) + # トランザクション開始 + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-20 - 拡張SQL:{ex_sql_file_name} のトランザクションを開始します') + with conn.cursor() as cur: + cur.execute(ex_sql) + conn.commit() + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-21 - 拡張SQL:{ex_sql_file_name} のCOMMIT処理が正常終了しました') + conn.close() + except Exception as e: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {WARNING} W-CHK-03 - 拡張SQLにエラーが発生しました:{e}') + else: + print( + f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {INFO} I-CHK-22 - 拡張SQL設定の存在はありませんでした') + + # ローカル実行用コード # 値はよしなに変えてください if __name__ == '__main__': diff --git a/ecs/dataimport/dataimport/common.py b/ecs/dataimport/dataimport/common.py index 0afea617..0f5eb967 100644 --- a/ecs/dataimport/dataimport/common.py +++ b/ecs/dataimport/dataimport/common.py @@ -56,16 +56,18 @@ SETTINGS_ITEM = { 'bulkImportFlag': 14, 'compressedFlag': 15, 'compression': 16, - 'reserved1': 17, - 'reserved2': 18, - 'reserved3': 19, - 'reserved4': 20, - 'reserved5': 21, - 'reserved6': 22, - 'reserved7': 23, - 'reserved8': 24 + 'executeExSqlIfFileEmptyFlag': 17, + 'reserved1': 18, + 'reserved2': 19, + 'reserved3': 20, + 'reserved4': 21, + 'reserved5': 22, + 'reserved6': 23, + 'reserved7': 24 } +DIRECTORY_WORK = '/work/' +DIRECTORY_SETTINGS = '/settings/' LOCAL_DIRECTORY_TMP = '/tmp' # チェック処理で解凍した圧縮ファイルの中身を格納するフォルダ LOCAL_TEMPORARY_FILE_PATH = f'{LOCAL_DIRECTORY_TMP}/temporary_file.dat' diff --git a/ecs/dataimport/dataimport/controller.py b/ecs/dataimport/dataimport/controller.py index d9a639f3..450e328b 100644 --- a/ecs/dataimport/dataimport/controller.py +++ b/ecs/dataimport/dataimport/controller.py @@ -49,7 +49,7 @@ try: print( f'{datetime.now():%Y-%m-%d %H:%M:%S} {LOG_INFO} {LOG_LEVEL["i"]} I-CTRL-03 - チェック処理の呼び出し') check(BUCKET_NAME, DATA_SOURCE_NAME, - FILE_NAME, settings_key, LOG_INFO, MODE) + FILE_NAME, settings_key, DB_INFO, LOG_INFO, MODE) # ④ メイン処理を呼び出す print( diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index d51a6759..a1197a89 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -4,16 +4,15 @@ from datetime import datetime import boto3 import pymysql -from common import (ERROR, INFO, LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH, +from common import (DIRECTORY_SETTINGS, DIRECTORY_WORK, ERROR, INFO, + LINE_FEED_CODE, LOCAL_TEMPORARY_FILE_PATH, MYSQL_CHARSET_CODE, SETTINGS_ITEM, WARNING, convert_quotechar, debug_log) from error import error from pymysql.constants import CLIENT # 定数 -DIRECTORY_WORK = '/work/' -DIRECTORY_SETTINGS = '/settings/' TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'