Merge pull request #36 feature-NEWDWH2021-511 into develop-6crm
This commit is contained in:
commit
5f47171265
5
.gitignore
vendored
5
.gitignore
vendored
@ -1,4 +1,7 @@
|
|||||||
lambda/mbj-newdwh2021-staging-NoticeToSlack/package-lock.json
|
lambda/mbj-newdwh2021-staging-NoticeToSlack/package-lock.json
|
||||||
lambda/mbj-newdwh2021-staging-NoticeToSlack/node_modules/*
|
lambda/mbj-newdwh2021-staging-NoticeToSlack/node_modules/*
|
||||||
lambda/mbj-newdwh2021-staging-PublishFromLog/package-lock.json
|
lambda/mbj-newdwh2021-staging-PublishFromLog/package-lock.json
|
||||||
lambda/mbj-newdwh2021-staging-PublishFromLog/node_modules/*
|
lambda/mbj-newdwh2021-staging-PublishFromLog/node_modules/*
|
||||||
|
__pycache__/
|
||||||
|
.env
|
||||||
|
settings.json
|
||||||
25
.vscode/launch.json
vendored
Normal file
25
.vscode/launch.json
vendored
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
{
|
||||||
|
// IntelliSense を使用して利用可能な属性を学べます。
|
||||||
|
// 既存の属性の説明をホバーして表示します。
|
||||||
|
// 詳細情報は次を確認してください: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python: データ取り込みローカル実行",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
// windowsだと\区切りかも
|
||||||
|
"program": "ecs\\dataimport\\dataimport\\controller.py",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"justMyCode": true,
|
||||||
|
"envFile": "${workspaceFolder}/.env"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Python: Attach using Process Id",
|
||||||
|
"type": "python",
|
||||||
|
"request": "attach",
|
||||||
|
"processId": "${command:pickProcess}",
|
||||||
|
"justMyCode": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
13
ecs/Dockerfile/Pipfile
Normal file
13
ecs/Dockerfile/Pipfile
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[[source]]
|
||||||
|
url = "https://pypi.org/simple"
|
||||||
|
verify_ssl = true
|
||||||
|
name = "pypi"
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
boto3 = "*"
|
||||||
|
pymysql = "*"
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.9"
|
||||||
85
ecs/Dockerfile/Pipfile.lock
generated
Normal file
85
ecs/Dockerfile/Pipfile.lock
generated
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
{
|
||||||
|
"_meta": {
|
||||||
|
"hash": {
|
||||||
|
"sha256": "7c69c0c237f231fcd67984f1d1b171c2eebfe00ed1877894bbbe77e201862057"
|
||||||
|
},
|
||||||
|
"pipfile-spec": 6,
|
||||||
|
"requires": {
|
||||||
|
"python_version": "3.9"
|
||||||
|
},
|
||||||
|
"sources": [
|
||||||
|
{
|
||||||
|
"name": "pypi",
|
||||||
|
"url": "https://pypi.org/simple",
|
||||||
|
"verify_ssl": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"default": {
|
||||||
|
"boto3": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:490f5e88f5551b33ae3019a37412158b76426d63d1fb910968ade9b6a024e5fe",
|
||||||
|
"sha256:e284705da36faa668c715ae1f74ebbff4320dbfbe3a733df3a8ab076d1ed1226"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==1.24.14"
|
||||||
|
},
|
||||||
|
"botocore": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:bb56fa77b8fa1ec367c2e16dee62d60000451aac5140dcce3ebddc167fd5c593",
|
||||||
|
"sha256:df1e9b208ff93daac7c645b0b04fb6dccd7f20262eae24d87941727025cbeece"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.7'",
|
||||||
|
"version": "==1.27.14"
|
||||||
|
},
|
||||||
|
"jmespath": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980",
|
||||||
|
"sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.7'",
|
||||||
|
"version": "==1.0.1"
|
||||||
|
},
|
||||||
|
"pymysql": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:41fc3a0c5013d5f039639442321185532e3e2c8924687abe6537de157d403641",
|
||||||
|
"sha256:816927a350f38d56072aeca5dfb10221fe1dc653745853d30a216637f5d7ad36"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"version": "==1.0.2"
|
||||||
|
},
|
||||||
|
"python-dateutil": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86",
|
||||||
|
"sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'",
|
||||||
|
"version": "==2.8.2"
|
||||||
|
},
|
||||||
|
"s3transfer": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd",
|
||||||
|
"sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '3.7'",
|
||||||
|
"version": "==0.6.0"
|
||||||
|
},
|
||||||
|
"six": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
|
||||||
|
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2'",
|
||||||
|
"version": "==1.16.0"
|
||||||
|
},
|
||||||
|
"urllib3": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14",
|
||||||
|
"sha256:aabaf16477806a5e1dd19aa41f8c2b7950dd3c746362d7e3223dbe6de6ac448e"
|
||||||
|
],
|
||||||
|
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'",
|
||||||
|
"version": "==1.26.9"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"develop": {}
|
||||||
|
}
|
||||||
48
ecs/dataimport/README.md
Normal file
48
ecs/dataimport/README.md
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
# データ取り込み処理
|
||||||
|
|
||||||
|
## 概要
|
||||||
|
|
||||||
|
データ取り込みバケット(`mbj-newdwh2021-<環境名>-data`)に配置されたデータファイルを、設定に基づいてデータベースに登録する処理を行う
|
||||||
|
処理順序等の詳細は設計書を参照のこと
|
||||||
|
|
||||||
|
## 環境構築
|
||||||
|
|
||||||
|
### 事前準備
|
||||||
|
|
||||||
|
- [Wiki - Python環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)の「pipenvの導入」まで完了していること
|
||||||
|
|
||||||
|
### Python仮想環境にパッケージをインストール
|
||||||
|
|
||||||
|
- このドキュメントと同じ階層でコマンドラインを開き、以下のコマンドを実行する
|
||||||
|
|
||||||
|
```sh
|
||||||
|
pipenv install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
- 以降、依存モジュールの追加が発生した場合に、`requirements.txt`に追記した上で、上記のコマンドを実行すること
|
||||||
|
|
||||||
|
## ローカルでの実行手順
|
||||||
|
|
||||||
|
- 当ディレクトリ内に`.vscode/launch.json`を作成し、以下のコードを貼り付ける
|
||||||
|
- 既にある場合は作成不要
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python: データ取り込み処理",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "ecs/dataimport/controller.py",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"justMyCode": true,
|
||||||
|
"envFile": "${workspaceFolder}/.env",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- 当ディレクトリ内に`.env`ファイルを、作成し、環境変数を設定する
|
||||||
|
- 設定する環境変数は設計書を参照のこと
|
||||||
|
- F5キーを押し、処理を実行する
|
||||||
@ -23,6 +23,14 @@ SETTINGS_ITEM = {
|
|||||||
'storageSchemaName': 9,
|
'storageSchemaName': 9,
|
||||||
'loadSchemaName': 10,
|
'loadSchemaName': 10,
|
||||||
'exSqlFileName': 11,
|
'exSqlFileName': 11,
|
||||||
|
'reserved0': 12,
|
||||||
|
'importManner': 13,
|
||||||
|
'reserved1': 14,
|
||||||
|
'reserved2': 15,
|
||||||
|
'reserved3': 16,
|
||||||
|
'reserved4': 17,
|
||||||
|
'reserved5': 18,
|
||||||
|
'reserved6': 19
|
||||||
}
|
}
|
||||||
LINE_FEED_CODE = {
|
LINE_FEED_CODE = {
|
||||||
'CR': '\r',
|
'CR': '\r',
|
||||||
@ -30,6 +38,9 @@ LINE_FEED_CODE = {
|
|||||||
'CRLF': '\r\n',
|
'CRLF': '\r\n',
|
||||||
}
|
}
|
||||||
DIRECTORY_SETTINGS = '/settings/'
|
DIRECTORY_SETTINGS = '/settings/'
|
||||||
|
TRUNCATE_SRC_TABLE_SYMBOL = 'truncate_src_table:'
|
||||||
|
TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT = f'{TRUNCATE_SRC_TABLE_SYMBOL}[蓄積スキーマのテーブル名]'
|
||||||
|
INVALID_CONFIG_EXCEPTION_MESSAGE = f'個別設定ファイルのインポート方法に不備がありました。 インポート方法は "{TRUNCATE_SRC_TABLE_IDENTIFY_SYMBOL_FORMAT}" のように設定してください'
|
||||||
|
|
||||||
# クラス変数
|
# クラス変数
|
||||||
s3_client = boto3.client('s3')
|
s3_client = boto3.client('s3')
|
||||||
@ -76,12 +87,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-03 - タイムゾーンを変更しました')
|
||||||
|
|
||||||
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
# ④ 個別設定ファイルのロードスキーマのテーブル名に記載されているテーブルをTRUNCATEする
|
||||||
|
# 個別設定ファイルの読み込み
|
||||||
settings_obj = s3_resource.Object(bucket_name, settings_key)
|
settings_obj = s3_resource.Object(bucket_name, settings_key)
|
||||||
settings_response = settings_obj.get()
|
settings_response = settings_obj.get()
|
||||||
settings_list = []
|
settings_list = []
|
||||||
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
for line in io.TextIOWrapper(io.BytesIO(settings_response["Body"].read()), encoding='utf-8'):
|
||||||
settings_list.append(line.rstrip('\n'))
|
settings_list.append(line.rstrip('\n'))
|
||||||
|
|
||||||
|
# 予約行挿入のためsetting_listとSETTINGS_ITEMの要素数を揃える
|
||||||
|
for _ in range(len(SETTINGS_ITEM) - len(settings_list)):
|
||||||
|
settings_list.append('')
|
||||||
|
|
||||||
|
# ロードスキーマのTRUNCATE
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["loadSchemaName"]]}'
|
||||||
cur.execute(sql_truncate)
|
cur.execute(sql_truncate)
|
||||||
@ -161,6 +178,18 @@ def main(bucket_name, target_data_source, target_file_name, settings_key, db_inf
|
|||||||
|
|
||||||
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
# ⑦ ロードスキーマのデータを蓄積スキーマにUPSERTする
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-08 - ロードスキーマ({settings_list[SETTINGS_ITEM["loadSchemaName"]]})のデータを蓄積スキーマ({settings_list[SETTINGS_ITEM["storageSchemaName"]]})に登録します')
|
||||||
|
|
||||||
|
# インポート方法判断
|
||||||
|
try:
|
||||||
|
if truncate_judge(settings_list):
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
sql_truncate = f'TRUNCATE table {settings_list[SETTINGS_ITEM["storageSchemaName"]]}'
|
||||||
|
cur.execute(sql_truncate)
|
||||||
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["i"]} I-MAIN-20 - {settings_list[SETTINGS_ITEM["storageSchemaName"]]} をTRUNCATEしました')
|
||||||
|
|
||||||
|
except InvalidConfigException as e:
|
||||||
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-01 - エラー内容:{e}')
|
||||||
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
# SQL文生成
|
# SQL文生成
|
||||||
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
|
sql = f'INSERT INTO {settings_list[SETTINGS_ITEM["storageSchemaName"]]} ('
|
||||||
@ -272,3 +301,32 @@ def connection_close(conn, bucket_name, target_data_source, target_file_name, lo
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
print(f'{datetime.now():%Y-%m-%d %H:%M:%S} {log_info} {LOG_LEVEL["e"]} E-MAIN-99 - エラー内容:{e}')
|
||||||
error(bucket_name, target_data_source, target_file_name, log_info)
|
error(bucket_name, target_data_source, target_file_name, log_info)
|
||||||
|
|
||||||
|
|
||||||
|
def truncate_judge(settings_list):
|
||||||
|
"""TRUNCATE処理対応判定
|
||||||
|
Args:
|
||||||
|
settings_list (list): 個別設定ファイル
|
||||||
|
Raises:
|
||||||
|
InvalidConfigException: 個別設定ファイルのインポート方法の設定ミス
|
||||||
|
Returns:
|
||||||
|
Bool: Truncate対象の場合True。Truncate対象でない場合False
|
||||||
|
"""
|
||||||
|
|
||||||
|
# upsert判定
|
||||||
|
if not settings_list[SETTINGS_ITEM["importManner"]]:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# インポート方法設定チェック
|
||||||
|
if not settings_list[SETTINGS_ITEM["importManner"]].startswith(TRUNCATE_SRC_TABLE_SYMBOL):
|
||||||
|
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||||
|
import_manner_splitted_list = settings_list[SETTINGS_ITEM["importManner"]].split(':')
|
||||||
|
if len(import_manner_splitted_list) != 2:
|
||||||
|
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||||
|
if import_manner_splitted_list[1] != settings_list[SETTINGS_ITEM["storageSchemaName"]]:
|
||||||
|
raise InvalidConfigException(INVALID_CONFIG_EXCEPTION_MESSAGE)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidConfigException(Exception):
|
||||||
|
pass
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user