Merge branch 'develop' into feature-NEWDWH2021-731-merge-from-develop

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2022-09-20 09:20:40 +09:00
commit c1113137bb
3 changed files with 111 additions and 18 deletions

48
ecs/dataimport/README.md Normal file
View File

@ -0,0 +1,48 @@
# データ取り込み処理
## 概要
データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う
処理順序等の詳細は設計書を参照のこと
## 環境構築
### 事前準備
- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること
### Python仮想環境にパッケージをインストール
- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する
```sh
pipenv install -r requirements.txt
```
- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること
## ローカルでの実行手順
- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける
- 既にある場合は作成不要
```json
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: データ取り込み処理",
"type": "python",
"request": "launch",
"program": "ecs/dataimport/controller.py",
"console": "integratedTerminal",
"justMyCode": true,
"envFile": "${workspaceFolder}/.env",
}
]
}
```
- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する
- 設定する環境変数は設計書を参照のこと
- F5キーを押し、処理を実行する

View File

@ -26,7 +26,7 @@ SETTINGS_ITEM = {
'loadSchemaName': 10,
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'reserved0': 13,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,

View File

@ -1,12 +1,14 @@
from datetime import datetime
import csv
import io
import re
from datetime import datetime
import boto3
import pymysql
from pymysql.constants import CLIENT
import io
import csv
from common import convert_quotechar, debug_log
from error import error
from common import debug_log, convert_quotechar
# 定数
DIRECTORY_WORK = '/work/'
@ -25,7 +27,7 @@ SETTINGS_ITEM = {
'loadSchemaName': 10,
'exSqlFileName': 11,
'commaReplaceColumns': 12,
'reserved0': 13,
'importManner': 13,
'reserved1': 14,
'reserved2': 15,
'reserved3': 16,
@ -39,6 +41,9 @@ LINE_FEED_CODE = {
'CRLF': '\r\n',
}
DIRECTORY_SETTINGS = '/settings/'
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
# クラス変数
s3_client = boto3.client('s3')
@ -95,7 +100,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
settings_list.append('')
# ロードスキーマのTRUNCATE
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
@ -128,6 +133,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
process_count += 1
# SQL文生成
query_parameter_list = []
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
for db_column in settings_db_columu_list:
sql = f'{sql} {db_column},'
@ -145,13 +151,14 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# 0桁の場合
sql = f'{sql} NULL,'
continue
# データ項目値の変換処理
# データ項目値の変換処理(カンマ除去)
org_column_value = line[i]
current_settings_db_column_name = settings_db_columu_list[i]
column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list)
sql = f'{sql} "{column_value}",'
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
query_parameter_list.append(column_value)
sql = f'{sql} %s,'
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
sql = f'{sql} "0",' # システム項目:論理削除フラグ
@ -166,7 +173,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# ロードスキーマのトランザクション開始
with conn.cursor() as cur:
cur.execute(sql)
cur.execute(sql, query_parameter_list)
conn.commit()
normal_count += 1
except Exception as e:
@ -181,6 +188,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
# インポート方法判断
try:
if truncate_judge(settings_list):
with conn.cursor() as cur:
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
cur.execute(sql_truncate)
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
except InvalidConfigException as e:
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
error(bucket_name, target_data_source, target_file_name, log_info)
# SQL文生成
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
@ -295,7 +314,7 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
"""データ項目値変換処理
- 数値内のカンマ除去処理
Args:
org_column_value : 投入データの値
current_settings_db_column_name : 投入データのDBカラム物理名
@ -304,12 +323,38 @@ def convert_column_value(org_column_value, current_settings_db_column_name, sett
Returns:
converted_column_value:変換処理を行った投入データの値
"""
# データ内の「\\」という文字がpythonで読んだ場合に「\\\\」となるため、「\\」に戻す
converted_column_value = org_column_value.replace('\\', '\\\\')
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
converted_column_value = org_column_value
if current_settings_db_column_name in settings_replace_comma_list:
converted_column_value = converted_column_value.replace(',', '')
return converted_column_value
def truncate_judge(settings_list):
"""TRUNCATE処理対応判定
Args:
settings_list (list): 個別設定ファイル
Raises:
InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス
Returns:
Bool: Truncate対象の場合TrueTruncate対象でない場合False
"""
# upsert判定
if not settings_list[SETTINGS_ITEM["importManner"]]:
return False
# インポート方法設定チェック
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
if len(import_manner_splitted_list) != 2:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
return True
class InvalidConfigException(Exception):
pass