diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 730a7be8..5540f886 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,4 +1,5 @@ from datetime import datetime +from pickle import FALSE import boto3 import pymysql from pymysql.constants import CLIENT @@ -23,6 +24,7 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, + 'importManner': 12, } LINE_FEED_CODE = { 'CR': '\r', @@ -30,6 +32,10 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } DIRECTORY_SETTINGS = '/settings/' +TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' +TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' +TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' +INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' # クラス変数 s3_client = boto3.client('s3') @@ -161,6 +167,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + + # インポート方法判断 + try: + if truncate_judge(settings_list): + with conn.cursor() as cur: + sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + cur.execute(sql_truncate) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} E-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + + except InvalidConfigException as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' @@ -272,3 +290,34 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + + +def truncate_judge(settings_list): + """TRUNCATE処理対応判定 + Args: + settings_list (list): 個別設定ファイル + Raises: + InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス + Returns: + Bool: Truncate対象の場合True。Truncate対象でない場合False + """ + + # upsert判定 + if len(settings_list) < 13: + return False + if not settings_list[SETTINGS_ITEM["importManner"]]: + return False + + # インポート方法設定チェック + if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + if len(import_manner_splitted_list) != 2: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + if not import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + return True + + +class InvalidConfigException(Exception): + pass