diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 281c1728..816b601a 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,11 +1,12 @@ from datetime import datetime +import re import boto3 import pymysql from pymysql.constants import CLIENT import io import csv from error import error -from common import debug_log +from common import debug_log, convert_quotechar # 定数 DIRECTORY_WORK = '/work/' @@ -23,7 +24,7 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, - 'reserved0': 12, + 'commaReplaceColumns': 12, 'importManner': 13, 'reserved1': 14, 'reserved2': 15, @@ -94,7 +95,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) - # 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える + # 設定ファイルに記載のない行を空文字として扱い、予約行とする for _ in range(len(SETTINGS_ITEM) - len(settings_list)): settings_list.append('') @@ -117,8 +118,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf warning_info = '' # ワーニング情報 index = 0 # ループインデックス settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',') - for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: index += 1 @@ -131,8 +133,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # SQL文生成 query_parameter_list = [] sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' - for i in range(len(settings_db_columu_list)): - sql = f'{sql} {settings_db_columu_list[i]},' + for db_column in settings_db_columu_list: + sql = f'{sql} {db_column},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ @@ -143,14 +145,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} VALUES (' for i in range(len(line)): # データ項目値が0桁より大きいかチェックする - if len(line[i]) > 0: - # 0桁より大きい場合 - # INSERT文のパラメータとそれに対応するプレースホルダーを設定する - query_parameter_list.append(line[i]) - sql = f'{sql} %s,' - else: - # 上記以外の場合 + if len(line[i]) == 0: + # 0桁の場合 sql = f'{sql} NULL,' + continue + + # データ項目値の変換処理 + org_column_value = line[i] + current_settings_db_column_name = settings_db_columu_list[i] + column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(column_value) + sql = f'{sql} %s,' sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 sql = f'{sql} "0",' # システム項目:論理削除フラグ @@ -304,7 +310,24 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) +def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): + """データ項目値変換処理 + Args: + org_column_value : 投入データの値 + current_settings_db_column_name : 投入データのDBカラム物理名 + settings_replace_comma_list : 投入データの数値型のDBカラム物理名のリスト + + Returns: + converted_column_value:変換処理を行った投入データの値 + """ + # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + if current_settings_db_column_name in settings_replace_comma_list: + converted_column_value = converted_column_value.replace(',', '') + + return converted_column_value + + def truncate_judge(settings_list): """TRUNCATE処理対応判定 Args: