Merge pull request #103 feature-NEWDWH2021-731 into develop
This commit is contained in:
commit
1da7a46e34
48
ecs/dataimport/README.md
Normal file
48
ecs/dataimport/README.md
Normal file
@ -0,0 +1,48 @@
|
||||
# データ取り込み処理
|
||||
|
||||
## 概要
|
||||
|
||||
データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う
|
||||
処理順序等の詳細は設計書を参照のこと
|
||||
|
||||
## 環境構築
|
||||
|
||||
### 事前準備
|
||||
|
||||
- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること
|
||||
|
||||
### Python仮想環境にパッケージをインストール
|
||||
|
||||
- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する
|
||||
|
||||
```sh
|
||||
pipenv install -r requirements.txt
|
||||
```
|
||||
|
||||
- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること
|
||||
|
||||
## ローカルでの実行手順
|
||||
|
||||
- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける
|
||||
- 既にある場合は作成不要
|
||||
|
||||
```json
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python: データ取り込み処理",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"program": "ecs/dataimport/controller.py",
|
||||
"console": "integratedTerminal",
|
||||
"justMyCode": true,
|
||||
"envFile": "${workspaceFolder}/.env",
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する
|
||||
- 設定する環境変数は設計書を参照のこと
|
||||
- F5キーを押し、処理を実行する
|
||||
@ -1,11 +1,13 @@
|
||||
from datetime import datetime
|
||||
import boto3
|
||||
import io
|
||||
import csv
|
||||
import io
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import boto3
|
||||
|
||||
from common import convert_quotechar, debug_log
|
||||
from end import end
|
||||
from error import error
|
||||
from common import debug_log
|
||||
|
||||
# 定数
|
||||
DIRECTORY_WORK = '/work/'
|
||||
@ -23,6 +25,14 @@ SETTINGS_ITEM = {
|
||||
'storageSchemaName': 9,
|
||||
'loadSchemaName': 10,
|
||||
'exSqlFileName': 11,
|
||||
'commaReplaceColumns': 12,
|
||||
'importManner': 13,
|
||||
'reserved1': 14,
|
||||
'reserved2': 15,
|
||||
'reserved3': 16,
|
||||
'reserved4': 17,
|
||||
'reserved5': 18,
|
||||
'reserved6': 19
|
||||
}
|
||||
LINE_FEED_CODE = {
|
||||
'CR': '\r',
|
||||
@ -74,14 +84,18 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
work_obj = s3_resource.Object(bucket_name, work_key)
|
||||
work_response = work_obj.get()
|
||||
work_data = io.TextIOWrapper(io.BytesIO(work_response["Body"].read()), encoding=settings_list[SETTINGS_ITEM["charCode"]], newline=LINE_FEED_CODE[settings_list[SETTINGS_ITEM["lineFeedCode"]]])
|
||||
work_header_list = []
|
||||
for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||
work_header_list = line
|
||||
work_csv_row = []
|
||||
for i, line in enumerate(csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]])):
|
||||
# ヘッダあり、かつ、1行目の場合
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1 and i == 0:
|
||||
work_csv_row.append(line)
|
||||
continue
|
||||
work_csv_row.append(line)
|
||||
break
|
||||
|
||||
# ② C-0のデータ件数チェックを開始する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-02 - C-0のチェックを開始します')
|
||||
if not len(work_header_list):
|
||||
if is_empty_file(work_csv_row, settings_list):
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-03 - 投入ファイルが0バイトのため処理を終了します')
|
||||
end(bucket_name, target_data_source, target_file_name, '', log_info, mode)
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-04 - 終了処理完了')
|
||||
@ -90,16 +104,17 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
|
||||
# ③ C-1の項目数チェックを開始する
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-06 - C-1のチェックを開始します')
|
||||
work_header_list_len = len(work_header_list)
|
||||
if work_header_list_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||||
work_csv_row_item_len = len(work_csv_row[0])
|
||||
if work_csv_row_item_len == int(settings_list[SETTINGS_ITEM["csvNumItems"]]):
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-07 - C-1:正常終了')
|
||||
else:
|
||||
raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_header_list_len}')
|
||||
raise CheckError(f'E-CHK-01 - 項目数が一致しません 個別設定ファイル項目数:{settings_list[SETTINGS_ITEM["csvNumItems"]]} 投入データ項目数:{work_csv_row_item_len}')
|
||||
|
||||
# ④ C-2の項目並び順チェック開始する
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-CHK-08 - C-2のチェックを開始します')
|
||||
settings_header_list = settings_list[SETTINGS_ITEM["csvNameItems"]].rstrip().split(',')
|
||||
work_header_list = work_csv_row[0]
|
||||
for i in range(len(settings_header_list)):
|
||||
if not settings_header_list[i] == work_header_list[i]:
|
||||
raise CheckError(f'E-CHK-02 - 項目順序が一致しません {i + 1}番目の項目 個別設定ファイル項目:{settings_header_list[i]} 投入データ項目:{work_header_list[i]}')
|
||||
@ -114,3 +129,22 @@ def check(bucket_name, target_data_source, target_file_name, settings_key, log_i
|
||||
except Exception as e:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-CHK-99 - エラー内容:{e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
|
||||
def is_empty_file(work_csv_row: list, settings_list: list):
|
||||
"""② C-0のデータ件数チェック
|
||||
ヘッダ行がある場合は、1行目を読み飛ばして判定する
|
||||
|
||||
Args:
|
||||
work_csv_row (list): CSVファイルの1行目(ヘッダを含む場合は2行目まで)
|
||||
settings_list (list): 個別設定ファイルのリスト
|
||||
|
||||
Returns:
|
||||
bool: CSVファイルの1行目が0件だった場合はTrue
|
||||
"""
|
||||
has_header = int(settings_list[SETTINGS_ITEM["headerFlag"]]) == 1
|
||||
# ヘッダのみのファイルも0バイトファイルをみなす
|
||||
if has_header:
|
||||
return len(work_csv_row[1:]) == 0
|
||||
|
||||
return len(work_csv_row) == 0
|
||||
|
||||
@ -11,3 +11,18 @@ MODE_TYPE = {
|
||||
def debug_log(log, log_info, mode):
|
||||
if MODE_TYPE['d'] == mode:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["d"]} {log}')
|
||||
|
||||
def convert_quotechar(quotechar):
|
||||
"""csvモジュールの囲い文字を変換する
|
||||
|
||||
Args:
|
||||
quotechar : 項目囲い文字の設定値
|
||||
|
||||
Returns:
|
||||
空文字、空白文字の場合→None
|
||||
それ以外→設定値をそのまま帰す
|
||||
"""
|
||||
if (quotechar.strip(' ') == ''):
|
||||
return None
|
||||
|
||||
return quotechar
|
||||
|
||||
@ -1,11 +1,14 @@
|
||||
import csv
|
||||
import io
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
import boto3
|
||||
import pymysql
|
||||
from pymysql.constants import CLIENT
|
||||
import io
|
||||
import csv
|
||||
|
||||
from common import convert_quotechar, debug_log
|
||||
from error import error
|
||||
from common import debug_log
|
||||
|
||||
# 定数
|
||||
DIRECTORY_WORK = '/work/'
|
||||
@ -23,6 +26,14 @@ SETTINGS_ITEM = {
|
||||
'storageSchemaName': 9,
|
||||
'loadSchemaName': 10,
|
||||
'exSqlFileName': 11,
|
||||
'commaReplaceColumns': 12,
|
||||
'importManner': 13,
|
||||
'reserved1': 14,
|
||||
'reserved2': 15,
|
||||
'reserved3': 16,
|
||||
'reserved4': 17,
|
||||
'reserved5': 18,
|
||||
'reserved6': 19
|
||||
}
|
||||
LINE_FEED_CODE = {
|
||||
'CR': '\r',
|
||||
@ -30,6 +41,9 @@ LINE_FEED_CODE = {
|
||||
'CRLF': '\r\n',
|
||||
}
|
||||
DIRECTORY_SETTINGS = '/settings/'
|
||||
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
|
||||
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
|
||||
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
|
||||
|
||||
# クラス変数
|
||||
s3_client = boto3.client('s3')
|
||||
@ -76,12 +90,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
|
||||
|
||||
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||||
# 個別設定ファイルの読み込み
|
||||
settings_obj = s3_resource.Object(bucket_name, settings_key)
|
||||
settings_response = settings_obj.get()
|
||||
settings_list = []
|
||||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||||
settings_list.append(line.rstrip('\n'))
|
||||
|
||||
# 設定ファイルに記載のない行を空文字として扱い、予約行とする
|
||||
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
|
||||
settings_list.append('')
|
||||
|
||||
# ロードスキーマのTRUNCATE
|
||||
with conn.cursor() as cur:
|
||||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
||||
cur.execute(sql_truncate)
|
||||
@ -100,8 +120,9 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
warning_info = '' # ワーニング情報
|
||||
index = 0 # ループインデックス
|
||||
settings_db_columu_list = settings_list[SETTINGS_ITEM["dbColumuName"]].rstrip().split(',')
|
||||
settings_replace_comma_list = settings_list[SETTINGS_ITEM["commaReplaceColumns"]].rstrip().split(',')
|
||||
|
||||
for line in csv.reader(work_data, quotechar=settings_list[SETTINGS_ITEM["quotechar"]], delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||
for line in csv.reader(work_data, quotechar=convert_quotechar(settings_list[SETTINGS_ITEM["quotechar"]]), delimiter=settings_list[SETTINGS_ITEM["delimiter"]]):
|
||||
try:
|
||||
if int(settings_list[SETTINGS_ITEM["headerFlag"]]) == True and index == 0:
|
||||
index += 1
|
||||
@ -112,9 +133,10 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
process_count += 1
|
||||
|
||||
# SQL文生成
|
||||
query_parameter_list = []
|
||||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["loadSchemaName"]]} ('
|
||||
for i in range(len(settings_db_columu_list)):
|
||||
sql = f'{sql} {settings_db_columu_list[i]},'
|
||||
for db_column in settings_db_columu_list:
|
||||
sql = f'{sql} {db_column},'
|
||||
sql = f'{sql} file_name,' # システム項目:取込ファイル名
|
||||
sql = f'{sql} file_row_cnt,' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} delete_flg,' # システム項目:論理削除フラグ
|
||||
@ -125,13 +147,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
sql = f'{sql} VALUES ('
|
||||
for i in range(len(line)):
|
||||
# データ項目値が0桁より大きいかチェックする
|
||||
if len(line[i]) > 0:
|
||||
# 0桁より大きい場合
|
||||
replace_line = line[i].replace('\\', '\\\\')
|
||||
sql = f'{sql} "{replace_line}",'
|
||||
else:
|
||||
# 上記以外の場合
|
||||
if len(line[i]) == 0:
|
||||
# 0桁の場合
|
||||
sql = f'{sql} NULL,'
|
||||
continue
|
||||
|
||||
# データ項目値の変換処理(カンマ除去)
|
||||
org_column_value = line[i]
|
||||
current_settings_db_column_name = settings_db_columu_list[i]
|
||||
column_value = convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list)
|
||||
# INSERT文のパラメータとそれに対応するプレースホルダーを設定する
|
||||
query_parameter_list.append(column_value)
|
||||
sql = f'{sql} %s,'
|
||||
sql = f'{sql} "{target_file_name}",' # システム項目:取込ファイル名
|
||||
sql = f'{sql} "{index + 1}",' # システム項目:取込ファイル行番号
|
||||
sql = f'{sql} "0",' # システム項目:論理削除フラグ
|
||||
@ -146,7 +173,7 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
|
||||
# ロードスキーマのトランザクション開始
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(sql)
|
||||
cur.execute(sql, query_parameter_list)
|
||||
conn.commit()
|
||||
normal_count += 1
|
||||
except Exception as e:
|
||||
@ -162,6 +189,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
||||
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
||||
|
||||
# インポート方法判断
|
||||
try:
|
||||
if truncate_judge(settings_list):
|
||||
with conn.cursor() as cur:
|
||||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
|
||||
cur.execute(sql_truncate)
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
|
||||
|
||||
except InvalidConfigException as e:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
# SQL文生成
|
||||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
|
||||
for i in range(len(settings_db_columu_list)):
|
||||
@ -272,3 +311,50 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
|
||||
except Exception as e:
|
||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||
|
||||
def convert_column_value(org_column_value, current_settings_db_column_name, settings_replace_comma_list):
|
||||
"""データ項目値変換処理
|
||||
- 数値内のカンマ除去処理
|
||||
Args:
|
||||
org_column_value : 投入データの値
|
||||
current_settings_db_column_name : 投入データのDBカラム物理名
|
||||
settings_replace_comma_list : 投入データの数値型のDBカラム物理名のリスト
|
||||
|
||||
Returns:
|
||||
converted_column_value:変換処理を行った投入データの値
|
||||
"""
|
||||
# 投入データのDB物理カラム名が設定ファイルの数値型のDBカラム物理名に含まれている場合、データ項目値の「,」を取り除く
|
||||
converted_column_value = org_column_value
|
||||
if current_settings_db_column_name in settings_replace_comma_list:
|
||||
converted_column_value = converted_column_value.replace(',', '')
|
||||
|
||||
return converted_column_value
|
||||
|
||||
|
||||
def truncate_judge(settings_list):
|
||||
"""TRUNCATE処理対応判定
|
||||
Args:
|
||||
settings_list (list): 個別設定ファイル
|
||||
Raises:
|
||||
InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス
|
||||
Returns:
|
||||
Bool: Truncate対象の場合True。Truncate対象でない場合False
|
||||
"""
|
||||
|
||||
# upsert判定
|
||||
if not settings_list[SETTINGS_ITEM["importManner"]]:
|
||||
return False
|
||||
|
||||
# インポート方法設定チェック
|
||||
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
|
||||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
|
||||
if len(import_manner_splitted_list) != 2:
|
||||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
|
||||
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||
return True
|
||||
|
||||
|
||||
class InvalidConfigException(Exception):
|
||||
pass
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user