From 45ba4eab7424f9a71b0f93273f99183f41ad63c9 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 12:14:52 +0900 Subject: [PATCH 1/6] =?UTF-8?q?feat:=20develop6-crm=E3=81=AE=E3=83=87?= =?UTF-8?q?=E3=83=BC=E3=82=BF=E7=99=BB=E9=8C=B2=E5=87=A6=E7=90=86=E9=96=A2?= =?UTF-8?q?=E9=80=A3=E3=81=AE=E5=A4=89=E6=9B=B4=E5=B7=AE=E5=88=86=E3=82=92?= =?UTF-8?q?=E3=83=9E=E3=83=BC=E3=82=B8=20=EF=BC=88=E8=A8=AD=E5=AE=9A?= =?UTF-8?q?=E3=83=95=E3=82=A1=E3=82=A4=E3=83=AB=E3=81=AA=E3=81=A9=E3=81=AF?= =?UTF-8?q?=E3=83=AA=E3=83=AA=E3=83=BC=E3=82=B9=E3=81=97=E3=81=9F=E3=81=8F?= =?UTF-8?q?=E3=81=AA=E3=81=84=E3=81=9F=E3=82=81=E3=80=81=E3=82=B3=E3=83=BC?= =?UTF-8?q?=E3=83=89=E3=81=A0=E3=81=91=E3=82=92=E3=83=9E=E3=83=BC=E3=82=B8?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/README.md | 48 ++++++++++++++++++++++ ecs/dataimport/dataimport/main.py | 66 +++++++++++++++++++++++++++++-- 2 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 ecs/dataimport/README.md diff --git a/ecs/dataimport/README.md b/ecs/dataimport/README.md new file mode 100644 index 00000000..8502b98e --- /dev/null +++ b/ecs/dataimport/README.md @@ -0,0 +1,48 @@ +# データ取り込み処理 + +## 概要 + +データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う +処理順序等の詳細は設計書を参照のこと + +## 環境構築 + +### 事前準備 + +- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること + +### Python仮想環境にパッケージをインストール + +- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する + + ```sh + pipenv install -r requirements.txt + ``` + +- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること + +## ローカルでの実行手順 + +- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける + - 既にある場合は作成不要 + + ```json + { + "version": "0.2.0", + "configurations": [ + { + "name": "Python: データ取り込み処理", + "type": "python", + "request": "launch", + "program": "ecs/dataimport/controller.py", + "console": "integratedTerminal", + "justMyCode": true, + "envFile": "${workspaceFolder}/.env", + } + ] + } + ``` + +- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する + - 設定する環境変数は設計書を参照のこと +- F5キーを押し、処理を実行する diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 730a7be8..281c1728 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -23,6 +23,14 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, + 'reserved0': 12, + 'importManner': 13, + 'reserved1': 14, + 'reserved2': 15, + 'reserved3': 16, + 'reserved4': 17, + 'reserved5': 18, + 'reserved6': 19 } LINE_FEED_CODE = { 'CR': '\r', @@ -30,6 +38,9 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } DIRECTORY_SETTINGS = '/settings/' +TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' +TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' +INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' # クラス変数 s3_client = boto3.client('s3') @@ -76,12 +87,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする + # 個別設定ファイルの読み込み settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) + # 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える + for _ in range(len(SETTINGS_ITEM) - len(settings_list)): + settings_list.append('') + + # ロードスキーマのTRUNCATE with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) @@ -112,6 +129,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf process_count += 1 # SQL文生成 + query_parameter_list = [] sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' @@ -127,8 +145,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # データ項目値が0桁より大きいかチェックする if len(line[i]) > 0: # 0桁より大きい場合 - replace_line = line[i].replace('\\', '\\\\') - sql = f'{sql} "{replace_line}",' + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(line[i]) + sql = f'{sql} %s,' else: # 上記以外の場合 sql = f'{sql} NULL,' @@ -146,7 +165,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ロードスキーマのトランザクション開始 with conn.cursor() as cur: - cur.execute(sql) + cur.execute(sql, query_parameter_list) conn.commit() normal_count += 1 except Exception as e: @@ -161,6 +180,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + + # インポート方法判断 + try: + if truncate_judge(settings_list): + with conn.cursor() as cur: + sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + cur.execute(sql_truncate) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + + except InvalidConfigException as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' @@ -272,3 +303,32 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + + +def truncate_judge(settings_list): + """TRUNCATE処理対応判定 + Args: + settings_list (list): 個別設定ファイル + Raises: + InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス + Returns: + Bool: Truncate対象の場合True。Truncate対象でない場合False + """ + + # upsert判定 + if not settings_list[SETTINGS_ITEM["importManner"]]: + return False + + # インポート方法設定チェック + if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + if len(import_manner_splitted_list) != 2: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + return True + + +class InvalidConfigException(Exception): + pass From f78fb03d98df598c68cbcdf29d8f7a6d693f86aa Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 12:54:32 +0900 Subject: [PATCH 2/6] =?UTF-8?q?fix:=20develop-6crm=E5=81=B4=E3=81=AB?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E3=81=95=E3=82=8C=E3=81=A6=E3=81=84=E3=81=AA?= =?UTF-8?q?=E3=81=8B=E3=81=A3=E3=81=9FSETTINGS=5FITEM=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index a0bf2ced..96178194 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -23,6 +23,14 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, + 'reserved0': 12, + 'importManner': 13, + 'reserved1': 14, + 'reserved2': 15, + 'reserved3': 16, + 'reserved4': 17, + 'reserved5': 18, + 'reserved6': 19 } LINE_FEED_CODE = { 'CR': '\r', From d11c5ccb496be3acc1743fccc0a64af154160ac1 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 13:08:17 +0900 Subject: [PATCH 3/6] =?UTF-8?q?feat:=20develop-6crm=E3=81=A7=E3=81=AE?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E5=88=86=E3=82=92=E3=83=9E=E3=83=BC=E3=82=B8?= =?UTF-8?q?=EF=BC=88main=E5=87=A6=E7=90=86=E4=BB=A5=E5=A4=96=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 35 +++++++++++++++++++++++------ ecs/dataimport/dataimport/common.py | 15 +++++++++++++ 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 96178194..0c54b103 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -1,11 +1,13 @@ -from datetime import datetime -import boto3 -import io import csv +import io import sys +from datetime import datetime + +import boto3 + +from common import convert_quotechar, debug_log from end import end from error import error -from common import debug_log # 定数 DIRECTORY_WORK = '/work/' @@ -23,7 +25,7 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, - 'reserved0': 12, + 'commaReplaceColumns': 12, 'importManner': 13, 'reserved1': 14, 'reserved2': 15, @@ -83,13 +85,13 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_response = work_obj.get() work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) work_header_list = [] - for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): work_header_list = line break # ② C-0のデータ件数チェックを開始する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') - if not len(work_header_list): + if is_empty_file(work_header_list, settings_list): print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') end(bucket_name, target_data_source, target_file_name, '', log_info, mode) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') @@ -122,3 +124,22 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + + +def is_empty_file(work_header_list: list, settings_list: list): + """② C-0のデータ件数チェック + ヘッダ行がある場合は、1行目を読み飛ばして判定する + + Args: + work_header_list (list): CSVファイルの1行目 + settings_list (list): 個別設定ファイルのリスト + + Returns: + bool: CSVファイルの1行目が0件だった場合はTrue + """ + has_header = settings_list[SETTINGS_ITEM["headerFlag"]] + # ヘッダのみのファイルも0バイトファイルをみなす + if has_header: + return len(work_header_list[1:]) == 0 + + return len(work_header_list) == 0 diff --git a/ecs/dataimport/dataimport/common.py b/ecs/dataimport/dataimport/common.py index 8b8eeed6..1361ff53 100644 --- a/ecs/dataimport/dataimport/common.py +++ b/ecs/dataimport/dataimport/common.py @@ -11,3 +11,18 @@ MODE_TYPE = { def debug_log(log, log_info, mode): if MODE_TYPE['d'] == mode: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}') + +def convert_quotechar(quotechar): + """csvモジュールの囲い文字を変換する + + Args: + quotechar : 項目囲い文字の設定値 + + Returns: + 空文字、空白文字の場合→None + それ以外→設定値をそのまま帰す + """ + if (quotechar.strip(' ') == ''): + return None + + return quotechar From 7909ad7034672b1b5ba672984b7e8e71d88d8c7f Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 13:09:04 +0900 Subject: [PATCH 4/6] =?UTF-8?q?feat:=20develop-6crm=E3=81=A7=E3=81=AE?= =?UTF-8?q?=E8=BF=BD=E5=8A=A0=E5=88=86=E3=82=92=E3=83=9E=E3=83=BC=E3=82=B8?= =?UTF-8?q?=EF=BC=88main=E5=87=A6=E7=90=86=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 49 +++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 281c1728..816b601a 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,11 +1,12 @@ from datetime import datetime +import re import boto3 import pymysql from pymysql.constants import CLIENT import io import csv from error import error -from common import debug_log +from common import debug_log, convert_quotechar # 定数 DIRECTORY_WORK = '/work/' @@ -23,7 +24,7 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, - 'reserved0': 12, + 'commaReplaceColumns': 12, 'importManner': 13, 'reserved1': 14, 'reserved2': 15, @@ -94,7 +95,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) - # 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える + # 設定ファイルに記載のない行を空文字として扱い、予約行とする for _ in range(len(SETTINGS_ITEM) - len(settings_list)): settings_list.append('') @@ -117,8 +118,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf warning_info = '' # ワーニング情報 index = 0 # ループインデックス settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',') + settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',') - for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): + for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): try: if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0: index += 1 @@ -131,8 +133,8 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # SQL文生成 query_parameter_list = [] sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' - for i in range(len(settings_db_columu_list)): - sql = f'{sql} {settings_db_columu_list[i]},' + for db_column in settings_db_columu_list: + sql = f'{sql} {db_column},' sql = f'{sql} file_name,' # システム項目:取込ファイル名 sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号 sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ @@ -143,14 +145,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} VALUES (' for i in range(len(line)): # データ項目値が0桁より大きいかチェックする - if len(line[i]) > 0: - # 0桁より大きい場合 - # INSERT文のパラメータとそれに対応するプレースホルダーを設定する - query_parameter_list.append(line[i]) - sql = f'{sql} %s,' - else: - # 上記以外の場合 + if len(line[i]) == 0: + # 0桁の場合 sql = f'{sql} NULL,' + continue + + # データ項目値の変換処理 + org_column_value = line[i] + current_settings_db_column_name = settings_db_columu_list[i] + column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(column_value) + sql = f'{sql} %s,' sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 sql = f'{sql} "0",' # システム項目:論理削除フラグ @@ -304,7 +310,24 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) +def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): + """データ項目値変換処理 + Args: + org_column_value : 投入データの値 + current_settings_db_column_name : 投入データのDBカラム物理名 + settings_replace_comma_list : 投入データの数値型のDBカラム物理名のリスト + + Returns: + converted_column_value:変換処理を行った投入データの値 + """ + # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + if current_settings_db_column_name in settings_replace_comma_list: + converted_column_value = converted_column_value.replace(',', '') + + return converted_column_value + + def truncate_judge(settings_list): """TRUNCATE処理対応判定 Args: From 9280a7cdab026e88c801f184ed8b8c2ef09768dc Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 13:12:53 +0900 Subject: [PATCH 5/6] =?UTF-8?q?fix:=20=E6=95=B0=E5=80=A4=E3=81=AE=E3=82=AB?= =?UTF-8?q?=E3=83=B3=E3=83=9E=E9=99=A4=E5=8E=BB=E5=87=A6=E7=90=86=E3=82=92?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3=20=E8=BB=BD=E5=BE=AE=E3=81=AA=E4=BF=AE?= =?UTF-8?q?=E6=AD=A3=E3=82=82=E4=B8=80=E7=B7=92=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/main.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 816b601a..641dc8f1 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,12 +1,14 @@ -from datetime import datetime +import csv +import io import re +from datetime import datetime + import boto3 import pymysql from pymysql.constants import CLIENT -import io -import csv + +from common import convert_quotechar, debug_log from error import error -from common import debug_log, convert_quotechar # 定数 DIRECTORY_WORK = '/work/' @@ -150,7 +152,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf sql = f'{sql} NULL,' continue - # データ項目値の変換処理 + # データ項目値の変換処理(カンマ除去) org_column_value = line[i] current_settings_db_column_name = settings_db_columu_list[i] column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) @@ -312,7 +314,7 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): """データ項目値変換処理 - + - 数値内のカンマ除去処理 Args: org_column_value : 投入データの値 current_settings_db_column_name : 投入データのDBカラム物理名 @@ -321,13 +323,14 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett Returns: converted_column_value:変換処理を行った投入データの値 """ - # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + converted_column_value = org_column_value if current_settings_db_column_name in settings_replace_comma_list: converted_column_value = converted_column_value.replace(',', '') return converted_column_value - + def truncate_judge(settings_list): """TRUNCATE処理対応判定 Args: From 7ccbe2183d29e347beb10c9b61f34e082d28150f Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 13:39:59 +0900 Subject: [PATCH 6/6] =?UTF-8?q?feat:=20=E5=88=A5=E5=AF=BE=E5=BF=9C?= =?UTF-8?q?=E3=81=AE=E4=BF=AE=E6=AD=A3=E5=86=85=E5=AE=B9=E3=82=92=E5=8F=8D?= =?UTF-8?q?=E6=98=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/dataimport/chk.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index 0c54b103..7e050357 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -84,14 +84,18 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i work_obj = s3_resource.Object(bucket_name, work_key) work_response = work_obj.get() work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]]) - work_header_list = [] - for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]): - work_header_list = line + work_csv_row = [] + for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])): + # ヘッダあり、かつ、1行目の場合 + if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0: + work_csv_row.append(line) + continue + work_csv_row.append(line) break # ② C-0のデータ件数チェックを開始する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します') - if is_empty_file(work_header_list, settings_list): + if is_empty_file(work_csv_row, settings_list): print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します') end(bucket_name, target_data_source, target_file_name, '', log_info, mode) print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了') @@ -100,16 +104,17 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i # ③ C-1の項目数チェックを開始する print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します') - work_header_list_len = len(work_header_list) - if work_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): + work_csv_row_item_len = len(work_csv_row[0]) + if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]): print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了') else: - raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_header_list_len}') + raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}') # ④ C-2の項目並び順チェック開始する if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します') settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',') + work_header_list = work_csv_row[0] for i in range(len(settings_header_list)): if not settings_header_list[i] == work_header_list[i]: raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}') @@ -126,20 +131,20 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i error(bucket_name, target_data_source, target_file_name, log_info) -def is_empty_file(work_header_list: list, settings_list: list): +def is_empty_file(work_csv_row: list, settings_list: list): """② C-0のデータ件数チェック ヘッダ行がある場合は、1行目を読み飛ばして判定する Args: - work_header_list (list): CSVファイルの1行目 + work_csv_row (list): CSVファイルの1行目(ヘッダを含む場合は2行目まで) settings_list (list): 個別設定ファイルのリスト Returns: bool: CSVファイルの1行目が0件だった場合はTrue """ - has_header = settings_list[SETTINGS_ITEM["headerFlag"]] + has_header = int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 # ヘッダのみのファイルも0バイトファイルをみなす if has_header: - return len(work_header_list[1:]) == 0 + return len(work_csv_row[1:]) == 0 - return len(work_header_list) == 0 + return len(work_csv_row) == 0