From 45ba4eab7424f9a71b0f93273f99183f41ad63c9 Mon Sep 17 00:00:00 2001 From: "shimoda.m@nds-tyo.co.jp" Date: Fri, 16 Sep 2022 12:14:52 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20develop6-crm=E3=81=AE=E3=83=87=E3=83=BC?= =?UTF-8?q?=E3=82=BF=E7=99=BB=E9=8C=B2=E5=87=A6=E7=90=86=E9=96=A2=E9=80=A3?= =?UTF-8?q?=E3=81=AE=E5=A4=89=E6=9B=B4=E5=B7=AE=E5=88=86=E3=82=92=E3=83=9E?= =?UTF-8?q?=E3=83=BC=E3=82=B8=20=EF=BC=88=E8=A8=AD=E5=AE=9A=E3=83=95?= =?UTF-8?q?=E3=82=A1=E3=82=A4=E3=83=AB=E3=81=AA=E3=81=A9=E3=81=AF=E3=83=AA?= =?UTF-8?q?=E3=83=AA=E3=83=BC=E3=82=B9=E3=81=97=E3=81=9F=E3=81=8F=E3=81=AA?= =?UTF-8?q?=E3=81=84=E3=81=9F=E3=82=81=E3=80=81=E3=82=B3=E3=83=BC=E3=83=89?= =?UTF-8?q?=E3=81=A0=E3=81=91=E3=82=92=E3=83=9E=E3=83=BC=E3=82=B8=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecs/dataimport/README.md | 48 ++++++++++++++++++++++ ecs/dataimport/dataimport/main.py | 66 +++++++++++++++++++++++++++++-- 2 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 ecs/dataimport/README.md diff --git a/ecs/dataimport/README.md b/ecs/dataimport/README.md new file mode 100644 index 00000000..8502b98e --- /dev/null +++ b/ecs/dataimport/README.md @@ -0,0 +1,48 @@ +# データ取り込み処理 + +## 概要 + +データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う +処理順序等の詳細は設計書を参照のこと + +## 環境構築 + +### 事前準備 + +- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること + +### Python仮想環境にパッケージをインストール + +- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する + + ```sh + pipenv install -r requirements.txt + ``` + +- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること + +## ローカルでの実行手順 + +- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける + - 既にある場合は作成不要 + + ```json + { + "version": "0.2.0", + "configurations": [ + { + "name": "Python: データ取り込み処理", + "type": "python", + "request": "launch", + "program": "ecs/dataimport/controller.py", + "console": "integratedTerminal", + "justMyCode": true, + "envFile": "${workspaceFolder}/.env", + } + ] + } + ``` + +- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する + - 設定する環境変数は設計書を参照のこと +- F5キーを押し、処理を実行する diff --git a/ecs/dataimport/dataimport/main.py b/ecs/dataimport/dataimport/main.py index 730a7be8..281c1728 100644 --- a/ecs/dataimport/dataimport/main.py +++ b/ecs/dataimport/dataimport/main.py @@ -23,6 +23,14 @@ SETTINGS_ITEM = { 'storageSchemaName': 9, 'loadSchemaName': 10, 'exSqlFileName': 11, + 'reserved0': 12, + 'importManner': 13, + 'reserved1': 14, + 'reserved2': 15, + 'reserved3': 16, + 'reserved4': 17, + 'reserved5': 18, + 'reserved6': 19 } LINE_FEED_CODE = { 'CR': '\r', @@ -30,6 +38,9 @@ LINE_FEED_CODE = { 'CRLF': '\r\n', } DIRECTORY_SETTINGS = '/settings/' +TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:' +TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]' +INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください' # クラス変数 s3_client = boto3.client('s3') @@ -76,12 +87,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました') # ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする + # 個別設定ファイルの読み込み settings_obj = s3_resource.Object(bucket_name, settings_key) settings_response = settings_obj.get() settings_list = [] for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'): settings_list.append(line.rstrip('\n')) + # 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える + for _ in range(len(SETTINGS_ITEM) - len(settings_list)): + settings_list.append('') + + # ロードスキーマのTRUNCATE with conn.cursor() as cur: sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}' cur.execute(sql_truncate) @@ -112,6 +129,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf process_count += 1 # SQL文生成 + query_parameter_list = [] sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} (' for i in range(len(settings_db_columu_list)): sql = f'{sql} {settings_db_columu_list[i]},' @@ -127,8 +145,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # データ項目値が0桁より大きいかチェックする if len(line[i]) > 0: # 0桁より大きい場合 - replace_line = line[i].replace('\\', '\\\\') - sql = f'{sql} "{replace_line}",' + # INSERT文のパラメータとそれに対応するプレースホルダーを設定する + query_parameter_list.append(line[i]) + sql = f'{sql} %s,' else: # 上記以外の場合 sql = f'{sql} NULL,' @@ -146,7 +165,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ロードスキーマのトランザクション開始 with conn.cursor() as cur: - cur.execute(sql) + cur.execute(sql, query_parameter_list) conn.commit() normal_count += 1 except Exception as e: @@ -161,6 +180,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf # ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します') + + # インポート方法判断 + try: + if truncate_judge(settings_list): + with conn.cursor() as cur: + sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}' + cur.execute(sql_truncate) + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました') + + except InvalidConfigException as e: + print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}') + error(bucket_name, target_data_source, target_file_name, log_info) # SQL文生成 sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} (' @@ -272,3 +303,32 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo except Exception as e: print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}') error(bucket_name, target_data_source, target_file_name, log_info) + + +def truncate_judge(settings_list): + """TRUNCATE処理対応判定 + Args: + settings_list (list): 個別設定ファイル + Raises: + InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス + Returns: + Bool: Truncate対象の場合True。Truncate対象でない場合False + """ + + # upsert判定 + if not settings_list[SETTINGS_ITEM["importManner"]]: + return False + + # インポート方法設定チェック + if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL): + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':') + if len(import_manner_splitted_list) != 2: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]: + raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE) + return True + + +class InvalidConfigException(Exception): + pass