diff --git a/ecs/dataimport/README.md b/ecs/dataimport/README.md new file mode 100644 index 00000000..8502b98e --- /dev/null +++ b/ecs/dataimport/README.md @@ -0,0 +1,48 @@ +# データ取り込み処理 + +## 概要 + +データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う +処理順序等の詳細は設計書を参照のこと + +## 環境構築 + +### 事前準備 + +- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること + +### Python仮想環境にパッケージをインストール + +- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する + + ```sh + pipenv install -r requirements.txt + ``` + +- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること + +## ローカルでの実行手順 + +- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける + - 既にある場合は作成不要 + + ```json + { + "version": "0.2.0", + "configurations": [ + { + "name": "Python: データ取り込み処理", + "type": "python", + "request": "launch", + "program": "ecs/dataimport/controller.py", + "console": "integratedTerminal", + "justMyCode": true, + "envFile": "${workspaceFolder}/.env", + } + ] + } + ``` + +- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する + - 設定する環境変数は設計書を参照のこと +- F5キーを押し、処理を実行する diff --git a/ecs/dataimport/dataimport/chk.py b/ecs/dataimport/dataimport/chk.py index d90e2fb5..7e050357 100644 --- a/ecs/dataimport/dataimport/chk.py +++ b/ecs/dataimport/dataimport/chk.py @@ -26,7 +26,7 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, 'commaReplaceColumns': 12, - 'reserved0': 13, + 'importManner': 13, 'reserved1': 14, 'reserved2': 15, 'reserved3': 16, diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 23932c6f..641dc8f1 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -1,12 +1,14 @@ -from datetime import datetime +import csv +import io import re +from datetime import datetime + import boto3 import pymysql from pymysql.constants import CLIENT -import io -import csv + +from common import convert_quotechar, debug_log from error import error -from common import debug_log, convert_quotechar # 定数 DIRECTORY_WORK = '/work/' @@ -25,7 +27,7 @@ SETTINGS_ITEM = { 'loadSchemaName': 10, 'exSqlFileName': 11, 'commaReplaceColumns': 12, - 'reserved0': 13, + 'importManner': 13, 'reserved1': 14, 'reserved2': 15, 'reserved3': 16, @@ -39,6 +41,9 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } DIRECTORY_SETTINGS = '/settings/' +TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' +TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' +INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' # クラス変数 s3_client = boto3.client('s3') @@ -95,7 +100,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # 設定ファイルに記載のない行を空文字として扱い、予約行とする for _ in range(len(SETTINGS_ITEM) - len(settings_list)): settings_list.append('') - + # ロードスキーマのTRUNCATE with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' @@ -128,6 +133,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf process_count += 1 # SQL文生成 + query_parameter_list = [] sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' for db_column in settings_db_columu_list: sql = f'{sql} {db_column},' @@ -145,13 +151,14 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # 0桁の場合 sql = f'{sql} NULL,' continue - - # データ項目値の変換処理 + + # データ項目値の変換処理(カンマ除去) org_column_value = line[i] current_settings_db_column_name = settings_db_columu_list[i] column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list) - sql = f'{sql} "{column_value}",' - + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(column_value) + sql = f'{sql} %s,' sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名 sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号 sql = f'{sql} "0",' # システム項目:論理削除フラグ @@ -166,7 +173,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ロードスキーマのトランザクション開始 with conn.cursor() as cur: - cur.execute(sql) + cur.execute(sql, query_parameter_list) conn.commit() normal_count += 1 except Exception as e: @@ -181,6 +188,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + + # インポート方法判断 + try: + if truncate_judge(settings_list): + with conn.cursor() as cur: + sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + cur.execute(sql_truncate) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + + except InvalidConfigException as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' @@ -295,7 +314,7 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list): """データ項目値変換処理 - + - 数値内のカンマ除去処理 Args: org_column_value : 投入データの値 current_settings_db_column_name : 投入データのDBカラム物理名 @@ -304,12 +323,38 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett Returns: converted_column_value:変換処理を行った投入データの値 """ - # データ内の「\\」という文字がpythonで読んだ場合に「\\\\」となるため、「\\」に戻す - converted_column_value = org_column_value.replace('\\', '\\\\') - - # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + # 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く + converted_column_value = org_column_value if current_settings_db_column_name in settings_replace_comma_list: converted_column_value = converted_column_value.replace(',', '') return converted_column_value - \ No newline at end of file + + +def truncate_judge(settings_list): + """TRUNCATE処理対応判定 + Args: + settings_list (list): 個別設定ファイル + Raises: + InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス + Returns: + Bool: Truncate対象の場合True。Truncate対象でない場合False + """ + + # upsert判定 + if not settings_list[SETTINGS_ITEM["importManner"]]: + return False + + # インポート方法設定チェック + if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + if len(import_manner_splitted_list) != 2: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + return True + + +class InvalidConfigException(Exception): + pass