Merge pull request #369 release-8-encise-automation into master

This commit is contained in:
朝倉 明日香 2024-04-01 11:22:04 +09:00
commit 21c7af0c13
47 changed files with 1541 additions and 0 deletions

View File

@ -0,0 +1,12 @@
tests/*
.coverage
.env
.env.example
.report/*
.vscode/*
.pytest_cache/*
*/__pychache__/*
Dockerfile
pytest.ini
README.md
*.sql

View File

@ -0,0 +1,9 @@
DB_HOST=************
DB_PORT=3306
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=*****
DUMP_BACKUP_BUCKET=************
LOG_LEVEL=INFO

11
ecs/export-dbdump/.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
.vscode/settings.json
.env
my.cnf
# python
__pycache__
# python test
.pytest_cache
.coverage
.report/

16
ecs/export-dbdump/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// IntelliSense 使
//
// : https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(DEBUG) export dbdump",
"type": "python",
"request": "launch",
"program": "entrypoint.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@ -0,0 +1,31 @@
{
"[python]": {
"editor.defaultFormatter": null,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},
//
"python.defaultInterpreterPath": "<pythonインタプリターのパス>",
"python.linting.lintOnSave": true,
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.linting.flake8Args": [
"--max-line-length=200",
"--ignore=F541"
],
"python.formatting.provider": "autopep8",
"python.formatting.autopep8Path": "autopep8",
"python.formatting.autopep8Args": [
"--max-line-length", "200",
"--ignore=F541"
],
"python.testing.pytestArgs": [
"tests/batch/ultmarc"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@ -0,0 +1,40 @@
FROM python:3.9-bullseye
ENV TZ="Asia/Tokyo"
WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
# mysql-apt-config をdpkgでインストールする際に標準出力に渡す文字列ファイルをコピー
COPY mysql_dpkg_selection.txt ./
# 必要なパッケージインストール
RUN apt update && apt install -y less vim curl wget gzip unzip sudo lsb-release
# mysqlをインストール
RUN \
wget https://dev.mysql.com/get/mysql-apt-config_0.8.29-1_all.deb && \
dpkg -i mysql-apt-config_0.8.29-1_all.deb < mysql_dpkg_selection.txt && \
apt update && \
apt install -y mysql-client
# aws cli v2 のインストール
RUN \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
sudo ./aws/install
# python関連のライブラリインストール
RUN \
pip install --upgrade pip wheel setuptools && \
pip install pipenv --no-cache-dir && \
pipenv install --system --deploy && \
pip uninstall -y pipenv virtualenv-clone virtualenv
# パッケージのセキュリティアップデートのみを適用するコマンドを実行
RUN \
apt install -y unattended-upgrades && \
unattended-upgrades
COPY src ./src
COPY entrypoint.py entrypoint.py
CMD ["python", "entrypoint.py"]

16
ecs/export-dbdump/Pipfile Normal file
View File

@ -0,0 +1,16 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
autopep8 = "*"
flake8 = "*"
[requires]
python_version = "3.9"
[pipenv]
allow_prereleases = true

71
ecs/export-dbdump/Pipfile.lock generated Normal file
View File

@ -0,0 +1,71 @@
{
"_meta": {
"hash": {
"sha256": "cc5f54bfb2073051a26f113ceac64e12fdd0bf8faa36f1a42210cc9c921c134b"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.9"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {},
"develop": {
"autopep8": {
"hashes": [
"sha256:067959ca4a07b24dbd5345efa8325f5f58da4298dab0dde0443d5ed765de80cb",
"sha256:2913064abd97b3419d1cc83ea71f042cb821f87e45b9c88cad5ad3c4ea87fe0c"
],
"index": "pypi",
"markers": "python_version >= '3.6'",
"version": "==2.0.4"
},
"flake8": {
"hashes": [
"sha256:33f96621059e65eec474169085dc92bf26e7b2d47366b70be2f67ab80dc25132",
"sha256:a6dfbb75e03252917f2473ea9653f7cd799c3064e54d4c8140044c5c065f53c3"
],
"index": "pypi",
"markers": "python_full_version >= '3.8.1'",
"version": "==7.0.0"
},
"mccabe": {
"hashes": [
"sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325",
"sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"
],
"markers": "python_version >= '3.6'",
"version": "==0.7.0"
},
"pycodestyle": {
"hashes": [
"sha256:41ba0e7afc9752dfb53ced5489e89f8186be00e599e712660695b7a75ff2663f",
"sha256:44fe31000b2d866f2e41841b18528a505fbd7fef9017b04eff4e2648a0fadc67"
],
"markers": "python_version >= '3.8'",
"version": "==2.11.1"
},
"pyflakes": {
"hashes": [
"sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f",
"sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"
],
"markers": "python_version >= '3.8'",
"version": "==3.2.0"
},
"tomli": {
"hashes": [
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc",
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"
],
"markers": "python_version < '3.11'",
"version": "==2.0.1"
}
}
}

View File

@ -0,0 +1,48 @@
# 【共通】DBダンプ取得 
## 概要
当処理は特定の機能で利用するものではなく、共通処理として要件に応じて実行することを想定している。
## 環境情報
- Python 3.9
- MySQL 8.23
- VSCode
## 環境構築
- Python の構築
- Merck_NewDWH 開発 2021 の Wiki、[Python 環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)を参照
- 「Pipenv の導入」までを行っておくこと
- 構築完了後、プロジェクト配下で以下のコマンドを実行し、Python の仮想環境を作成する
- `pipenv install --dev --python <pyenvでインストールしたpythonバージョン>`
- この手順で出力される仮想環境のパスは、後述する VSCode の設定手順で使用するため、控えておく
- MySQL の環境構築
- Windows の場合、以下のリンクからダウンロードする
- <https://dev.mysql.com/downloads/installer/>
- Docker を利用する場合、「newsdwh-tools」リポジトリの MySQL 設定を使用すると便利
- 「crm-table-to-ddl」フォルダ内で以下のコマンドを実行すると
- `docker-compose up -d`
- Docker の構築手順は、[Docker のセットアップ手順](https://nds-tyo.backlog.com/alias/wiki/1754332)を参照のこと
- データを投入する
- 立ち上げたデータベースに「src05」スキーマを作成する
- [ローカル開発用データ](https://ndstokyo.sharepoint.com/:f:/r/sites/merck-new-dwh-team/Shared%20Documents/03.NewDWH%E6%A7%8B%E7%AF%89%E3%83%95%E3%82%A7%E3%83%BC%E3%82%BA3/02.%E9%96%8B%E7%99%BA/90.%E9%96%8B%E7%99%BA%E5%85%B1%E6%9C%89/%E3%83%AD%E3%83%BC%E3%82%AB%E3%83%AB%E9%96%8B%E7%99%BA%E7%94%A8%E3%83%87%E3%83%BC%E3%82%BF?csf=1&web=1&e=VVcRUs)をダウンロードし、mysql コマンドを使用して復元する
- `mysql -h <ホスト名> -P <ポート> -u <ユーザー名> -p src05 < src05_dump.sql`
- 環境変数の設定
- 「.env.example」ファイルをコピーし、「.env」ファイルを作成する
- 環境変数を設定する。設定内容は PRJ メンバーより共有を受けてください
- VSCode の設定
- 「.vscode/recommended_settings.json」ファイルをコピーし、「settings.json」ファイルを作成する
- 「python.defaultInterpreterPath」を、Python の構築手順で作成した仮想環境のパスに変更する
## 実行
- VSCode 上で「F5」キーを押下すると、バッチ処理が起動する。
- 「entrypoint.py」が、バッチ処理のエントリーポイント。
- 実際の処理は、「src/jobctrl_dbdump.py」で行っている。
## フォルダ構成(工事中)

View File

@ -0,0 +1,10 @@
"""【共通】DBダンプ取得処理のエントリーポイント"""
from src import jobctrl_dbdump
if __name__ == '__main__':
try:
exit(jobctrl_dbdump.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はbatch_process内でログを出す。
exit(0)

View File

@ -0,0 +1,3 @@
1
1
4

View File

View File

@ -0,0 +1,106 @@
"""DBダンプ取得"""
import datetime
import os
import subprocess
import textwrap
from src.logging.get_logger import get_logger
from src.system_var import constants, environment
logger = get_logger('DBダンプ取得')
def exec():
try:
logger.info('DBダンプ取得開始')
# 事前処理(共通処理としては空振りする)
_pre_exec()
# メイン処理
# MySQL接続情報を作成する
my_cnf_file_content = f"""
[client]
user={environment.DB_USERNAME}
password={environment.DB_PASSWORD}
host={environment.DB_HOST}
"""
# my.cnfファイルのパス
my_cnf_path = os.path.join('my.cnf')
# my.cnfファイルを生成する
with open(my_cnf_path, 'w') as f:
f.write(textwrap.dedent(my_cnf_file_content)[1:-1])
# ファイルのパーミッションが強いとmysqldumpコマンドが実行できないため
# my.cnfファイルのパーミッションをread-onlyに設定
os.chmod(my_cnf_path, 0o444)
dt_now = datetime.datetime.now()
converted_value = dt_now.strftime('%Y%m%d%H%M%S%f')
dump_file_name = f'backup_rds_{environment.DB_SCHEMA}_{converted_value}.gz'
s3_file_path = f's3://{environment.DUMP_BACKUP_BUCKET}/{constants.DUMP_BACKUP_FOLDER}/{dt_now.year}/{dt_now.strftime("%m")}/{dt_now.strftime("%d")}/{dump_file_name}'
# mysqldumpコマンドを実行し、dumpを取得する
command = [
'mysqldump',
f'--defaults-file={my_cnf_path}',
'-P',
f"{environment.DB_PORT}",
'--no-tablespaces',
'--skip-column-statistics',
'--single-transaction',
'--set-gtid-purged=OFF',
environment.DB_SCHEMA
]
mysqldump_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# gzipコマンドを実行してdump結果を圧縮する
gzip_process = subprocess.Popen(['gzip', '-c'], stdin=mysqldump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# aws s3 cpコマンドを実行してアップロードする
s3_cp_process = subprocess.Popen(['aws', 's3', 'cp', '-', s3_file_path], stdin=gzip_process.stdout, stderr=subprocess.PIPE)
# mysqldumpの標準出力をgzipに接続したため、標準出力をクローズする
mysqldump_process.stdout.close()
# gzipの標準出力をaws s3 cpに接続したため、標準出力をクローズする
gzip_process.stdout.close()
# パイプラインを実行し、エラーハンドリング
_, error = mysqldump_process.communicate()
if mysqldump_process.returncode != 0:
raise Exception(f'`mysqldump`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
_, error = gzip_process.communicate()
if gzip_process.returncode != 0:
raise Exception(f'`gzip`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
_, error = s3_cp_process.communicate()
if s3_cp_process.returncode != 0:
raise Exception(f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
# 事後処理(共通処理としては空振りする)
_post_exec()
logger.info('DBダンプ取得終了正常終了')
logger.info(f'出力ファイルパス: {s3_file_path}')
return constants.BATCH_EXIT_CODE_SUCCESS
except Exception as e:
logger.exception(f'DBダンプ取得中に想定外のエラーが発生しました :{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
def _pre_exec():
"""
ダンプ復元 事前処理
共通機能としては事前処理を実装しない
事前処理が必要なダンプ復元処理を実装する場合当ロジックをコピーする
"""
pass
def _post_exec():
"""
ダンプ復元 事後処理
共通機能としては事後処理を実装しない
事後処理が必要なダンプ復元処理を実装する場合当ロジックをコピーする
"""
pass

View File

@ -0,0 +1,37 @@
import logging
from src.system_var.environment import LOG_LEVEL
# boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する
for name in ["boto3", "botocore", "s3transfer", "urllib3"]:
logging.getLogger(name).setLevel(logging.WARNING)
def get_logger(log_name: str) -> logging.Logger:
"""一意のログ出力モジュールを取得します。
Args:
log_name (str): ロガー名
Returns:
_type_: _description_
"""
logger = logging.getLogger(log_name)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
if not logger.hasHandlers():
handler = logging.StreamHandler()
logger.addHandler(handler)
formatter = logging.Formatter(
'%(name)s\t[%(levelname)s]\t%(asctime)s\t%(message)s',
'%Y-%m-%d %H:%M:%S'
)
for handler in logger.handlers:
handler.setFormatter(formatter)
return logger

View File

@ -0,0 +1,5 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0
# ダンプバックアップフォルダー
DUMP_BACKUP_FOLDER = 'dump'

View File

@ -0,0 +1,19 @@
import os
# Database
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
DUMP_BACKUP_BUCKET = os.environ['DUMP_BACKUP_BUCKET']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))

View File

@ -0,0 +1,13 @@
tests/*
.coverage
.env
.env.example
.report/*
.vscode/*
.pytest_cache/*
*/__pychache__/*
Dockerfile
pytest.ini
README.md
*.sql
*.gz

View File

@ -0,0 +1,7 @@
DB_HOST=************
DB_PORT=3306
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=src05
DUMP_FILE_S3_PATH=*******************
LOG_LEVEL=INFO

16
ecs/restore-dbdump/.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
.vscode/settings.json
.env
# python
__pycache__
# python test
.pytest_cache
.coverage
.report/
# mysql config file
my.cnf
# compress file
*.gz

17
ecs/restore-dbdump/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,17 @@
{
// IntelliSense 使
//
// : https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(DEBUG)restore dbdump",
"type": "python",
"request": "launch",
"program": "entrypoint.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@ -0,0 +1,31 @@
{
"[python]": {
"editor.defaultFormatter": null,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},
//
"python.defaultInterpreterPath": "<pythonインタプリターのパス>",
"python.linting.lintOnSave": true,
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.linting.flake8Args": [
"--max-line-length=200",
"--ignore=F541"
],
"python.formatting.provider": "autopep8",
"python.formatting.autopep8Path": "autopep8",
"python.formatting.autopep8Args": [
"--max-line-length", "200",
"--ignore=F541"
],
"python.testing.pytestArgs": [
"tests/batch/ultmarc"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@ -0,0 +1,40 @@
FROM python:3.9-bullseye
ENV TZ="Asia/Tokyo"
WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
# mysql-apt-config をdpkgでインストールする際に標準出力に渡す文字列ファイルをコピー
COPY mysql_dpkg_selection.txt ./
# 必要なパッケージインストール
RUN apt update && apt install -y less vim curl wget gzip unzip sudo lsb-release
# mysqlをインストール
RUN \
wget https://dev.mysql.com/get/mysql-apt-config_0.8.29-1_all.deb && \
dpkg -i mysql-apt-config_0.8.29-1_all.deb < mysql_dpkg_selection.txt && \
apt update && \
apt install -y mysql-client
# aws cli v2 のインストール
RUN \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
sudo ./aws/install
# python関連のライブラリインストール
RUN \
pip install --upgrade pip wheel setuptools && \
pip install pipenv --no-cache-dir && \
pipenv install --system --deploy && \
pip uninstall -y pipenv virtualenv-clone virtualenv
# パッケージのセキュリティアップデートのみを適用するコマンドを実行
RUN \
apt install -y unattended-upgrades && \
unattended-upgrades
COPY src ./src
COPY entrypoint.py entrypoint.py
CMD ["python", "entrypoint.py"]

View File

@ -0,0 +1,16 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
autopep8 = "*"
flake8 = "*"
[requires]
python_version = "3.9"
[pipenv]
allow_prereleases = true

71
ecs/restore-dbdump/Pipfile.lock generated Normal file
View File

@ -0,0 +1,71 @@
{
"_meta": {
"hash": {
"sha256": "cc5f54bfb2073051a26f113ceac64e12fdd0bf8faa36f1a42210cc9c921c134b"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.9"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {},
"develop": {
"autopep8": {
"hashes": [
"sha256:067959ca4a07b24dbd5345efa8325f5f58da4298dab0dde0443d5ed765de80cb",
"sha256:2913064abd97b3419d1cc83ea71f042cb821f87e45b9c88cad5ad3c4ea87fe0c"
],
"index": "pypi",
"markers": "python_version >= '3.6'",
"version": "==2.0.4"
},
"flake8": {
"hashes": [
"sha256:33f96621059e65eec474169085dc92bf26e7b2d47366b70be2f67ab80dc25132",
"sha256:a6dfbb75e03252917f2473ea9653f7cd799c3064e54d4c8140044c5c065f53c3"
],
"index": "pypi",
"markers": "python_full_version >= '3.8.1'",
"version": "==7.0.0"
},
"mccabe": {
"hashes": [
"sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325",
"sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"
],
"markers": "python_version >= '3.6'",
"version": "==0.7.0"
},
"pycodestyle": {
"hashes": [
"sha256:41ba0e7afc9752dfb53ced5489e89f8186be00e599e712660695b7a75ff2663f",
"sha256:44fe31000b2d866f2e41841b18528a505fbd7fef9017b04eff4e2648a0fadc67"
],
"markers": "python_version >= '3.8'",
"version": "==2.11.1"
},
"pyflakes": {
"hashes": [
"sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f",
"sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"
],
"markers": "python_version >= '3.8'",
"version": "==3.2.0"
},
"tomli": {
"hashes": [
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc",
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"
],
"markers": "python_version < '3.11'",
"version": "==2.0.1"
}
}
}

View File

@ -0,0 +1,67 @@
# ダンプ復元スクリプト
## 概要
当処理は特定の機能で利用するものではなく、共通処理として要件に応じて実行することを想定している。
## 環境情報
- Python 3.9
- MySQL 8.23
- VSCode
## 環境構築
- Python の構築
- Merck_NewDWH 開発 2021 の Wiki、[Python 環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)を参照
- 「Pipenv の導入」までを行っておくこと
- 構築完了後、プロジェクト配下で以下のコマンドを実行し、Python の仮想環境を作成する
- `pipenv install --dev --python <pyenvでインストールしたpythonバージョン>`
- この手順で出力される仮想環境のパスは、後述する VSCode の設定手順で使用するため、控えておく
- MySQL の環境構築
- Windows の場合、以下のリンクからダウンロードする
- <https://dev.mysql.com/downloads/installer/>
- Docker を利用する場合、「newsdwh-tools」リポジトリの MySQL 設定を使用すると便利
- 「crm-table-to-ddl」フォルダ内で以下のコマンドを実行すると
- `docker-compose up -d`
- Docker の構築手順は、[Docker のセットアップ手順](https://nds-tyo.backlog.com/alias/wiki/1754332)を参照のこと
- データを投入する
- 立ち上げたデータベースに「src05」スキーマを作成する
- [ローカル開発用データ](https://ndstokyo.sharepoint.com/:f:/r/sites/merck-new-dwh-team/Shared%20Documents/03.NewDWH%E6%A7%8B%E7%AF%89%E3%83%95%E3%82%A7%E3%83%BC%E3%82%BA3/02.%E9%96%8B%E7%99%BA/90.%E9%96%8B%E7%99%BA%E5%85%B1%E6%9C%89/%E3%83%AD%E3%83%BC%E3%82%AB%E3%83%AB%E9%96%8B%E7%99%BA%E7%94%A8%E3%83%87%E3%83%BC%E3%82%BF?csf=1&web=1&e=VVcRUs)をダウンロードし、mysql コマンドを使用して復元する
- `mysql -h <ホスト名> -P <ポート> -u <ユーザー名> -p src05 < src05_dump.sql`
- 環境変数の設定
- 「.env.example」ファイルをコピーし、「.env」ファイルを作成する
- 環境変数を設定する。設定内容は PRJ メンバーより共有を受けてください
- VSCode の設定
- 「.vscode/recommended_settings.json」ファイルをコピーし、「settings.json」ファイルを作成する
- 「python.defaultInterpreterPath」を、Python の構築手順で作成した仮想環境のパスに変更する
## 実行
- VSCode 上で「F5」キーを押下すると、バッチ処理が起動する。
- 「entrypoint.py」が、バッチ処理のエントリーポイント。
- 実際の処理は、「src/restore_backup.py」で行っている。
## フォルダ構成
```txt
.
├── .env.example -- ローカル実行用の環境変数のサンプル値。
├── .dockerignore -- docker build時のコンテキストに含めるファイルの抑制リスト
├── .gitignore -- Git差分管理除外リスト
├── Dockerfile -- Dockerイメージ作成用
├── Pipfile -- pythonの依存関係管理
├── Pipfile.lock -- 依存関係バージョン固定
├── README.md -- 当ファイル
├── entrypoint.py -- エントリーポイントとなるファイル
├── mysql_dpkg_selection.txt -- Dockerイメージでdpkgを使うときに外部から注入する選択値
└── src -- ソースコードフォルダ
├── logging
│ └── get_logger.py -- ロガー
├── restore_backup.py -- dump復元処理本体
└── system_var
├── constants.py -- 定数ファイル
└── environment.py -- 環境変数ファイル
```

View File

@ -0,0 +1,10 @@
"""実消化&アルトマーク DBダンプ復元のエントリーポイント"""
from src import restore_backup
if __name__ == '__main__':
try:
exit(restore_backup.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はbatch_process内でログを出す。
exit(0)

View File

@ -0,0 +1,3 @@
1
1
4

View File

View File

@ -0,0 +1,37 @@
import logging
from src.system_var.environment import LOG_LEVEL
# boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する
for name in ["boto3", "botocore", "s3transfer", "urllib3"]:
logging.getLogger(name).setLevel(logging.WARNING)
def get_logger(log_name: str) -> logging.Logger:
"""一意のログ出力モジュールを取得します。
Args:
log_name (str): ロガー名
Returns:
_type_: _description_
"""
logger = logging.getLogger(log_name)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
if not logger.hasHandlers():
handler = logging.StreamHandler()
logger.addHandler(handler)
formatter = logging.Formatter(
'%(name)s\t[%(levelname)s]\t%(asctime)s\t%(message)s',
'%Y-%m-%d %H:%M:%S'
)
for handler in logger.handlers:
handler.setFormatter(formatter)
return logger

View File

@ -0,0 +1,108 @@
"""ダンプ復元スクリプト"""
import os
import subprocess
import textwrap
from src.logging.get_logger import get_logger
from src.system_var import constants, environment
logger = get_logger('ダンプ復元スクリプト')
def exec():
try:
logger.info('ダンプ復元スクリプト:開始')
# 事前処理(共通処理としては空振りする)
_pre_exec()
# メイン処理
# MySQL接続情報を作成する
my_cnf_file_content = f"""
[client]
user={environment.DB_USERNAME}
password={environment.DB_PASSWORD}
host={environment.DB_HOST}
"""
# my.cnfファイルのパス
my_cnf_path = os.path.join('my.cnf')
# my.cnfファイルを生成する
with open(my_cnf_path, 'w') as f:
f.write(textwrap.dedent(my_cnf_file_content)[1:-1])
os.chmod(my_cnf_path, 0o444)
# DBへの接続エラーを早期に検出するため、事前にMySQLサーバーに接続
mysql_pre_process = subprocess.Popen(
['mysql', f'--defaults-file={my_cnf_path}', '-P', f"{environment.DB_PORT}",
environment.DB_SCHEMA, '-N', '-e', 'SELECT 1;'],
stderr=subprocess.PIPE
)
_, error = mysql_pre_process.communicate()
if mysql_pre_process.returncode != 0:
logger.error(
f'MySQLサーバーへの接続に失敗しました。{"" if error is None else error.decode("utf-8")}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 復元対象のダンプファイルを特定
s3_file_path = environment.DUMP_FILE_S3_PATH
# aws s3 cpコマンドを実行してdumpファイルをローカルにダウンロードする
s3_cp_process = subprocess.Popen(
['aws', 's3', 'cp', s3_file_path, './dump.gz'], stderr=subprocess.PIPE)
_, error = s3_cp_process.communicate()
if s3_cp_process.returncode != 0:
logger.error(
f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
return constants.BATCH_EXIT_CODE_SUCCESS
# S3コマンドの標準エラーはクローズしておく
s3_cp_process.stderr.close()
# gzipコマンドを実行してdumpファイルを解凍する
gzip_process = subprocess.Popen(
['gunzip', '-c', './dump.gz'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# mysqlコマンドを実行し、dumpを復元する
mysql_process = subprocess.Popen(
['mysql', f'--defaults-file={my_cnf_path}', '-P',
f"{environment.DB_PORT}", environment.DB_SCHEMA],
stdin=gzip_process.stdout, stderr=subprocess.PIPE
)
# gzipの標準出力をmysqlに接続したため、標準出力をクローズする
gzip_process.stdout.close()
_, error = mysql_process.communicate()
if mysql_process.returncode != 0:
logger.error(
f'コマンド実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 事後処理(共通処理としては空振りする)
_post_exec()
logger.info('[NOTICE]ダンプ復元スクリプト:終了(正常終了)')
return constants.BATCH_EXIT_CODE_SUCCESS
except Exception as e:
logger.exception(f'ダンプ復元スクリプト中に想定外のエラーが発生しました :{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
def _pre_exec():
"""
ダンプ復元 事前処理
共通機能としては事前処理を実装しない
事前処理が必要なダンプ復元処理を実装する場合当ロジックをコピーする
"""
pass
def _post_exec():
"""
ダンプ復元 事後処理
共通機能としては事後処理を実装しない
事後処理が必要なダンプ復元処理を実装する場合当ロジックをコピーする
"""
pass

View File

@ -0,0 +1,2 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0

View File

@ -0,0 +1,12 @@
import os
# Database
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
DUMP_FILE_S3_PATH = os.environ['DUMP_FILE_S3_PATH']
LOG_LEVEL = os.environ['LOG_LEVEL']

View File

@ -0,0 +1,375 @@
import csv
import datetime
import io
import json
import logging
import os
import re
from zoneinfo import ZoneInfo
import boto3
from dateutil.relativedelta import relativedelta
# 環境変数
CHECK_BUCKET_NAME = os.environ["CHECK_BUCKET_NAME"]
CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH = os.environ["CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH"]
CONFIG_BUCKET_NAME = os.environ["CONFIG_BUCKET_NAME"]
MAIL_TEMPLATE_FOLDER_PATH = os.environ["MAIL_TEMPLATE_FOLDER_PATH"]
MBJ_NOTICE_TOPIC = os.environ["MBJ_NOTICE_TOPIC"]
PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME = os.environ["PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME"]
PROCESSED_MESSAGE_EXPIRES_PERIOD = int(
os.environ["PROCESSED_MESSAGE_EXPIRES_PERIOD"])
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
ROW_COMMENT_SYMBOL = '#'
INDEX_REGEX = 0
INDEX_DATA_NAME = 1
INDEX_ROW_COMMENT_SYMBOL = 0
INDEX_SPLIT_NUM = 1
INDEX_LAST = -1
# メール本文に出力する不足ファイル名一覧のインデント
MAIL_INDENT = '  '
# AWS操作クライアント
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
dynamodb_client = boto3.client('dynamodb')
# logger設定
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
logger = logging.getLogger()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def is_duplicate_message(message_id: str) -> bool:
"""DynamoDBテーブルに処理済みのSQSメッセージIdが存在するかどうかを調査する
Args:
message_id (str): SQSメッセージId
Returns:
bool: 存在する場合はTrue
"""
return dynamodb_client.query(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Select='COUNT',
KeyConditionExpression='message_id = :message_id',
ExpressionAttributeValues={
':message_id': {'S': message_id}
}
)["Count"] != 0
def put_success_messages_to_dynamo_db(batch_item_success: list[str]) -> bool:
"""処理済みのSQSメッセージIdをDynamoDBにPushする
Args:
batch_item_success (list[str]): SQSメッセージIdのリスト
Returns:
bool: 登録成功の場合True
"""
# レコードの有効期限を算出
now = datetime.datetime.now(ZoneInfo(TZ))
record_expiration_datetime = now + \
datetime.timedelta(minutes=PROCESSED_MESSAGE_EXPIRES_PERIOD)
record_expiration_time = record_expiration_datetime.timestamp()
for message_id in batch_item_success:
dynamodb_client.put_item(
TableName=PROCESSED_MESSAGE_DYNAMODB_TABLE_NAME,
Item={
'message_id': {'S': message_id},
'record_expiration_time': {'N': f'{record_expiration_time}'}
}
)
return True
def substitute_mail_template(mail_template: str, receive_timing: str, mail_msg: str) -> str:
"""メールテンプレートのプレースホルダーを置き換える
Args:
mail_template (str): 置き換え前のメールテンプレート
receive_timing (str): メールテンプレートのプレースホルダーを置き換える文言(受信タイミング)
mail_msg (str): メールテンプレートのプレースホルダーを置き換える文言(ファイル一覧)
Returns:
str: 置き換え後のメール本文
"""
substitute_dict = {
"receive_timing": receive_timing,
"notice_file_names": mail_msg
}
mail_str = mail_template.format_map(substitute_dict)
return mail_str
def make_failure_item_on_error(message_id: str) -> dict[str, str]:
"""Report batch item failuresによる処理に失敗したメッセージの判別のためのレスポンスを作成する
@see <https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting>
Args:
message_id (str): SQSメッセージId
Returns:
dict[str, str]: Report batch item failuresで失敗したSQSメッセージを判別するための辞書オブジェクト
"""
return {"itemIdentifier": message_id}
def encise_data_unreceive_check(records: list, execute_month: str) -> tuple[list[dict[str, str]], list[str]]:
"""Enciseデータ未受領チェック
Args:
records (list): SQS Eventのレコードリスト
execute_month (str): 処理起動年月
Returns:
tuple[list[dict[str, str]], list[str]]: 失敗メッセージIdのリスト, 成功メッセージIdのリスト
"""
batch_item_failures = []
batch_item_success = []
for record in records:
# メール挿入用文言を格納するためのメモリを保持する
mail_message = ''
try:
try:
# 1.SQSメッセージIDを取得する
message_id = record["messageId"]
# 2.DynamoDBテーブルからレコードを取得し、処理済みメッセージかどうかを判別する
if is_duplicate_message(message_id):
logger.info(
f'受信したメッセージは既に処理済みのため、処理をスキップします。メッセージID: {message_id}')
continue
except Exception as e:
logger.exception(f"E-02-01 メッセージ重複チェック処理に失敗しました エラー内容:{e}")
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
# SQSパラメータをJSONシリアライズし、Pythonの辞書オブジェクト(イベントパラメータ)を取得する。
event_parameter = json.loads(record['body'])
# ③ 設定ファイル[受領チェック対象ファイルリスト]を読み込む
try:
logger.info(
'I-03-01 ' +
'受領チェック対象ファイルリスト読込 読込元:' +
f'{CONFIG_BUCKET_NAME}/{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
check_target_file_list_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{CHECK_TARGET_FILE_NAME_LIST_FOLDER_PATH}/{event_parameter["check_target_file_list"]}'
)
logger.info('I-03-02 受領チェック対象ファイルリストを読み込みました')
except Exception as e:
logger.exception(
f"E-03-01 受領チェック対象ファイルリストの読み込みに失敗しました エラー内容:{e}")
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
# ④ 受領チェック処理を行う
receive_timing = event_parameter['receive_timing']
logger.info(f'I-04-01 Enciseデータ受領チェック ({receive_timing}) 処理開始')
object_prefix = f'{event_parameter["check_folder_prefix"]}/{execute_month}/'
# 1.Enciseデータバックアップ保管バケットの処理稼働月に該当するサブフォルダにあるファイル一覧を取得する
logger.info(
f'I-04-02 オブジェクトリストの取得 取得先:{CHECK_BUCKET_NAME}/{object_prefix}'
)
receive_file_list_response = s3_client.list_objects_v2(
Bucket=CHECK_BUCKET_NAME, Prefix=object_prefix)
receive_file_list = []
for content in receive_file_list_response.get('Contents', []):
# オブジェクトのキーからファイル名を切り出してリストに追加
obj_key = content['Key'].rsplit('/', INDEX_SPLIT_NUM)
receive_file_list.append(obj_key[INDEX_LAST])
# 2.I/Fファイルチェック処理
logger.info(f'I-04-03 Enciseデータ({receive_timing}) ファイルチェック処理開始')
check_target_file_name_body = io.TextIOWrapper(io.BytesIO(
check_target_file_list_response["Body"].read()), encoding='utf-8')
match_count = 0
row_count = 0
for tsv_row in csv.reader(check_target_file_name_body, delimiter='\t'):
# 「④1.」で取得したリストが「③」で読み込んだファイル内に存在するか確認する
is_file_not_exists = True
for file_name in receive_file_list:
match_result = re.fullmatch(
tsv_row[INDEX_REGEX], file_name)
# 「③」で読み込んだファイルに記載されている全てが「④1.」で取得したリストに存在した場合
if match_result is not None:
is_file_not_exists = False
# 存在したファイルの年月部分を抜き出し、チェック対象年月(処理稼働月-1)である場合
match_group_yyyymm = match_result.group(1)
today = datetime.datetime.now(ZoneInfo(TZ))
minus_1_month = today + relativedelta(months=-1)
check_target_yyyymm = minus_1_month.strftime('%Y%m')
if match_group_yyyymm == check_target_yyyymm:
logger.info(
f'I-04-04 I/Fファイルの受領を確認しました ファイル名{file_name}')
match_count += 1
else:
logger.info(
f'I-04-07 I/Fファイルの年月がチェック対象年月と一致しません ファイル名{file_name}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}(受領年月が不正:{file_name})\n'
break
if is_file_not_exists:
logger.info(
f'E-04-06 月次I/Fファイルに不足があります ファイル名{tsv_row[INDEX_DATA_NAME]}')
mail_message += f'{MAIL_INDENT}{tsv_row[INDEX_DATA_NAME]}\n'
row_count += 1
if row_count == match_count:
logger.info('I-04-05 I/Fファイルは全て受領していることを確認しました')
# ⑤ 「①」でメモリ保持しているメール挿入用文言に出力内容が存在するか確認する
logger.info('I-05-01 メール送信処理開始')
if len(mail_message) == 0:
logger.info(
f'I-05-09 {execute_month} {receive_timing}データI/Fファイルに不足が無いため、メール送信処理をスキップします')
batch_item_success.append(message_id)
continue
# 1.存在した場合
logger.info(
f'I-05-02 {execute_month} {receive_timing}データI/Fファイルに不足があるため、メール送信処理を開始します')
try:
logger.info(
'I-05-03 ' +
f'通知メール(タイトル)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_title_template"]}'
)
mail_title_template = (
mail_title_response['Body'].read().decode('utf-8'))
# 改行を取り除く
mail_title = substitute_mail_template(
mail_title_template, receive_timing, mail_message)
mail_title_without_line_break = mail_title.splitlines()[0]
logger.info('I-05-04 通知メール(タイトル)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(
f'E-05-01 通知メール(タイトル)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
try:
logger.info(
'I-05-05 ' +
f'通知メール(本文)テンプレートファイル読込 読込元:{CONFIG_BUCKET_NAME}/{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template_response = s3_client.get_object(
Bucket=CONFIG_BUCKET_NAME,
Key=f'{MAIL_TEMPLATE_FOLDER_PATH}/{event_parameter["notice_mail_body_template"]}'
)
mail_body_template = (
mail_body_template_response['Body'].read().decode('utf-8'))
# メール本文内のプレースホルダーを置き換える
mail_body = substitute_mail_template(
mail_body_template, receive_timing, mail_message)
logger.info('I-05-06 通知メール(本文)テンプレートファイルを読み込みました')
except Exception as e:
logger.exception(
f'E-05-02 通知メール(本文)テンプレートファイルの読み込みに失敗しました エラー内容:{e}')
batch_item_failures.append(
make_failure_item_on_error(message_id))
continue
logger.info(f'I-05-07 メール送信指示をします 送信先トピック:{MBJ_NOTICE_TOPIC}')
params = {
'TopicArn': MBJ_NOTICE_TOPIC,
'Subject': mail_title_without_line_break,
'Message': mail_body
}
sns_client.publish(**params)
logger.info('I-05-08 メール送信指示をしました')
batch_item_success.append(message_id)
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
batch_item_failures.append(make_failure_item_on_error(message_id))
continue
return batch_item_failures, batch_item_success
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Enciseデータ受領チェック処理')
# 処理稼働年月を取得しメモリに保持する
execute_date_today = datetime.date.today()
execute_month = execute_date_today.strftime('%Y/%m')
logger.info(f'I-01-02 処理稼働月:{execute_month}')
# 処理失敗メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_failures = []
# 処理成功メッセージIDリストをメモリに保持する初期値空のリスト
batch_item_success = []
# ② SQSメッセージ重複排除処理を行う
logger.info('I-02-01 メッセージ処理開始')
batch_item_failures, batch_item_success = encise_data_unreceive_check(
event["Records"], execute_month)
logger.info('I-06-01 すべてのメッセージの処理完了')
# ⑦ メッセージを処理済として、以下のDynamoDBテーブルに記録する
put_success_messages_to_dynamo_db(batch_item_success)
logger.info('I-07-01 処理済みメッセージIDの記録完了')
logger.info('I-07-02 処理終了 Enciseデータ受領チェック処理')
except Exception as e:
logger.exception(f'E-99 想定外のエラーが発生しました エラー内容:{e}')
raise e
return batch_item_failures
# 動作確認用のコード
# if __name__ == '__main__':
# lambda_handler({
# "Records": [
# {
# "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
# "receiptHandle": "MessageReceiptHandle",
# "body": "{\"receive_timing\": \"速報\",\"check_folder_prefix\": \"encise\",\"check_target_file_list\": \"en_quick_file_list.config\",\"notice_mail_title_template\": \"en_unreceive_notice_mail_title.config\",\"notice_mail_body_template\": \"en_unreceive_notice_mail_body.config\"\r\n}",
# "attributes": {
# "ApproximateReceiveCount": "1",
# "SentTimestamp": "1523232000000",
# "SenderId": "123456789012",
# "ApproximateFirstReceiveTimestamp": "1523232000001"
# },
# "messageAttributes": {},
# "md5OfBody": "{{{md5_of_body}}}",
# "eventSource": "aws:sqs",
# "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
# "awsRegion": "us-east-1"
# }
# ]
# }, {})

View File

@ -0,0 +1,105 @@
import datetime
import logging
import os
from zoneinfo import ZoneInfo
import boto3
# 環境変数
DATA_IMPORT_BUCKET = os.environ["DATA_IMPORT_BUCKET"]
ENCISE_BACKUP_BUCKET = os.environ["ENCISE_BACKUP_BUCKET"]
ENCISE_TARGET_FOLDER = os.environ["ENCISE_TARGET_FOLDER"]
LOG_LEVEL = os.environ["LOG_LEVEL"]
TZ = os.environ["TZ"]
# 定数
EXCLUSIVE_CONTROL_FILE_EXT = '.doing'
# S3クライアント
s3_client = boto3.client('s3')
# logger設定
logger = logging.getLogger()
def log_datetime_convert_tz(*arg):
"""ログに出力するタイムスタンプのロケールを変更するJST指定"""
return datetime.datetime.now(ZoneInfo(TZ)).timetuple()
formatter = logging.Formatter(
'[%(levelname)s]\t%(asctime)s\t%(message)s\n',
'%Y-%m-%d %H:%M:%S'
)
formatter.converter = log_datetime_convert_tz
for handler in logger.handlers:
handler.setFormatter(formatter)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
def lambda_handler(event, context):
try:
# ① 処理開始ログを出力する
logger.info('I-01-01 処理開始 Encise受信データ転送処理')
# ② 処理開始時に受け取ったイベント情報をログに出力する
# バケット名・フォルダ名・受信データファイル名をメモリに保持
s3_event = event["Records"][0]["s3"]
event_bucket_name = s3_event["bucket"]["name"]
event_object_key = s3_event["object"]["key"]
event_file_name = os.path.basename(event_object_key)
event_folder_name = os.path.dirname(event_object_key).split('/')[0]
logger.info(f'I-02-01 受信バケット:{event_bucket_name}')
logger.info(f'I-02-01 フォルダ名:{event_folder_name}')
logger.info(f'I-02-01 ファイル名:{event_file_name}')
# ③ S3イベントによるLambdaの重複発火防止の為、メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルが存在するかチェックする
try:
s3_client.head_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.error(f'E-01-01 {event_bucket_name}/{event_object_key}は現在処理中です。処理を終了します。')
return
except Exception:
# .doingファイルが見つからなかった場合は、処理を続行する
# メモリに保持したバケット名/フォルダ名内に、「受信データファイル名.doing」ファイルを作成する
logger.info('I-03-01 Encise受信データを転送します')
s3_client.put_object(
Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}', Body=b'')
# ⑤ 受信データファイルを、Enciseデータバックアップバケットにコピーする
copy_source = {'Bucket': event_bucket_name, 'Key': event_object_key}
execute_date_yyyymm = datetime.date.today().strftime('%Y/%m')
s3_client.copy_object(
Bucket=ENCISE_BACKUP_BUCKET,
Key=f'{event_folder_name}/{execute_date_yyyymm}/{event_file_name}',
CopySource=copy_source,
ContentType='text/csv', # ファイルのタイプを指定する必要があるため、ContentTypeを指定する。
MetadataDirective='REPLACE' # ファイルのメタデータを置き換える必要があるため、MetadataDirectiveを指定する。
)
logger.info(f'I-04-01 Encise受信データのバックアップ完了{ENCISE_BACKUP_BUCKET}/{event_folder_name}/{execute_date_yyyymm}/{event_file_name}')
# ⑥ 受信データファイルを、データ登録バケットのEnciseデータ取込フォルダに移動する
s3_client.copy_object(
Bucket=DATA_IMPORT_BUCKET,
Key=f'{ENCISE_TARGET_FOLDER}/{event_file_name}',
CopySource=copy_source,
ContentType='text/csv', # ファイルのタイプを指定する必要があるため、ContentTypeを指定する。
MetadataDirective='REPLACE' # ファイルのメタデータを置き換える必要があるため、MetadataDirectiveを指定する。
)
# コピー後、元のバケットからは削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=event_object_key)
logger.info(f'I-05-01 Encise受信データの転送完了{DATA_IMPORT_BUCKET}/{ENCISE_TARGET_FOLDER}/{event_file_name}')
# ⑦ メモリに保持したバケット名/フォルダ名内の「受信データファイル名.doing」ファイルを削除する
s3_client.delete_object(Bucket=event_bucket_name, Key=f'{event_object_key}{EXCLUSIVE_CONTROL_FILE_EXT}')
logger.info('I-01-06 処理終了 Encise受信データ転送処理')
except Exception as e:
logger.exception(f'想定外のエラーが発生しました。処理を終了します。 例外内容:{e}')
raise e
return

View File

@ -0,0 +1,9 @@
宛先各位
 Encise {receive_timing}である以下のファイルを受領できておりません。
{notice_file_names}
 QlikSenseサーバー側の送信状況のご確認をお願いいたします。
 尚、本メールはシステム自動送信のため、返信は出来ません。
 本件に関する問い合わせは、MeDaCA Enciseデータ担当者にお願いいたします。

View File

@ -0,0 +1 @@
【MeDaCa連携エラー通知】Encise {receive_timing}ファイル未受領

View File

@ -0,0 +1,9 @@
宛先各位
 Encise {receive_timing}である以下のファイルを受領できておりません。
{notice_file_names}
 Encise社へ送信状況のご確認をお願いいたします。
 尚、本メールはシステム自動送信のため、返信は出来ません。
 本件に関する問い合わせは、MeDaCA Enciseデータ担当者にお願いいたします。

View File

@ -0,0 +1 @@
【MeDaCa連携エラー通知】Encise {receive_timing}ファイル未受領

View File

@ -0,0 +1,3 @@
CLUFD_Merck02_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck02
CLUFD_Merck03_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck03
CLUFD_Merck04_496_([0-9]{6})D\.(CSV|csv) 直販分 En-Clusterデータ Merck04

View File

@ -0,0 +1,6 @@
CLUFD_Merck02_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck02
CLUFD_Merck03_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck03
CLUFD_Merck04_496_([0-9]{6})\.(CSV|csv) En-Cluster データ Merck04
CLUFM_CLUMST_496_01_([0-9]{6})\.(CSV|csv) En-Cluster クラスタマスタ
CLUFM_PROADD_496_([0-9]{6})\.(CSV|csv) En-Cluster 製品定義ファイル
CLUFM_PROLST_496_([0-9]{6})\.(CSV|csv) En-Cluster 製品マスタ

View File

@ -0,0 +1,11 @@
NATFD_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation データ
NATFM_PROADD_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation 製品定義ファイル
NATFM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 確定版 En-Nation セグメントマスタ
MST_BRAND([0-9]{6})\.(CSV|csv) 確定版 En-Nation ブランドマスタ
CITFD_Merck01_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck01
CITFD_Merck06_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck06
CITFD_Merck07_496_([0-9]{6})\.(CSV|csv) 確定版 En-City データ Merck07
CITFM_PROADD_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 製品定義ファイル
CITFM_PROLST_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 製品マスタ
CITFM_REGION_496_([0-9]{6})\.(CSV|csv) 確定版 En-City 地域マスタ
CITFM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 確定版 En-City セグメントマスタ

View File

@ -0,0 +1 @@
NATFD_496_([0-9]{6})D\.(CSV|csv) 確定版 En-Nation データ 直販分

View File

@ -0,0 +1,10 @@
NATQD_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation データ
NATQM_PROADD_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation 製品定義ファイル
NATQM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 速報版 En-Nation セグメントマスタ
CITQD_Merck01_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck01
CITQD_Merck06_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck06
CITQD_Merck07_496_([0-9]{6})\.(CSV|csv) 速報版 En-City データ Merck07
CITQM_PROADD_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 製品定義ファイル
CITQM_PROLST_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 製品マスタ
CITQM_REGION_496_([0-9]{6})\.(CSV|csv) 速報版 En-City 地域マスタ
CITQM_SEGMNT_496_([0-9]{6})\.(CSV|csv) 速報版 En-City セグメントマスタ

View File

@ -26,6 +26,8 @@ resource:
- &STG_SG_JSKULT_DBDUMP "sg-0967779af13538a8e"
# セキュリティグループ(ecs-jskult-batch-laundering)
- &STG_SG_JSKULT_BATCH_LAUNDERING "sg-00b9ea30c5c6bb77a"
# セキュリティグループ(ecs-export-dbdump)
- &STG_SG_EXPORT_DBDUMP "sg-03962e5f52b380186"
# 本番環境
product:
# サブネット(PrivateSubnet1)
@ -42,6 +44,8 @@ resource:
- &PRD_SG_JSKULT_DBDUMP "sg-050ab3bc0d9ed261a"
# セキュリティグループ(ecs-jskult-batch-laundering)
- &PRD_SG_JSKULT_BATCH_LAUNDERING "sg-0d2bc30c1a2939c32"
# セキュリティグループ(ecs-export-dbdump)
- &PRD_SG_EXPORT_DBDUMP "sg-07ce73feffb53fadc"
config:
# CRMデータ取得
r-crm-datafetch-state:
@ -177,3 +181,33 @@ config:
SG_ECS_ALL: *PRD_SG_ECS_ALL
# セキュリティグループ(ecs-jskut-batch-laundering)
SG_JSKULT_BATCH_LAUNDERING: *PRD_SG_JSKULT_BATCH_LAUNDERING
# DBダンプ取得
r-export-dbdump-state:
# ステージング環境
staging:
# AWSアカウントID
AWS_ACCOUNT_ID: *AWS_ACCOUNT_ID
# 東京リージョン
REGION_AP_NORTHEAST_1: *REGION_AP_NORTHEAST_1
# サブネット(PrivateSubnet1)
SUBNET_PRI_1A: *STG_SUBNET_PRI_1A
# サブネット(PrivateSubnet2)
SUBNET_PRI_1D: *STG_SUBNET_PRI_1D
# セキュリティグループ(ecs-all)
SG_ECS_ALL: *STG_SG_ECS_ALL
# セキュリティグループ(ecs-export-dbdump)
SG_EXPORT_DBDUMP: *STG_SG_EXPORT_DBDUMP
# 本番環境
product:
# AWSアカウントID
AWS_ACCOUNT_ID: *AWS_ACCOUNT_ID
# 東京リージョン
REGION_AP_NORTHEAST_1: *REGION_AP_NORTHEAST_1
# サブネット(PrivateSubnet1)
SUBNET_PRI_1A: *PRD_SUBNET_PRI_1A
# サブネット(PrivateSubnet2)
SUBNET_PRI_1D: *PRD_SUBNET_PRI_1D
# セキュリティグループ(ecs-all)
SG_ECS_ALL: *PRD_SG_ECS_ALL
# セキュリティグループ(ecs-export-dbdump)
SG_EXPORT_DBDUMP: *PRD_SG_EXPORT_DBDUMP

View File

@ -0,0 +1,92 @@
{
"Comment": "DBダンプ取得ステートマシン",
"StartAt": "params",
"States": {
"params": {
"Comment": "パラメータ設定",
"Type": "Pass",
"Parameters": {
"ecs": {
"LaunchType": "FARGATE",
"Cluster": "arn:aws:ecs:#{REGION_AP_NORTHEAST_1}:#{AWS_ACCOUNT_ID}:cluster/mbj-newdwh2021-#{ENV_NAME}-export-dbdump-ecs",
"TaskDefinition": "arn:aws:ecs:#{REGION_AP_NORTHEAST_1}:#{AWS_ACCOUNT_ID}:task-definition/mbj-newdwh2021-#{ENV_NAME}-task-export-dbdump",
"NetworkConfiguration": {
"AwsvpcConfiguration": {
"Subnets": [
"#{SUBNET_PRI_1A}",
"#{SUBNET_PRI_1D}"
],
"SecurityGroups": [
"#{SG_ECS_ALL}",
"#{SG_EXPORT_DBDUMP}"
],
"AssignPublicIp": "DISABLED"
}
},
"Overrides": {
"ContainerOverrides": [
{
"Name": "mbj-newdwh2021-#{ENV_NAME}-container-export-dbdump",
"Environment": [
{
"Name": "DB_SCHEMA",
"Value.$": "$.db_schema"
},
{
"Name": "DUMP_BACKUP_BUCKET",
"Value.$": "$.dump_backup_bucket"
}
]
}
]
}
}
},
"ResultPath": "$.params",
"Next": "exec-export-dbdump"
},
"exec-export-dbdump": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"LaunchType.$": "$.params.ecs.LaunchType",
"Cluster.$": "$.params.ecs.Cluster",
"TaskDefinition.$": "$.params.ecs.TaskDefinition",
"NetworkConfiguration.$": "$.params.ecs.NetworkConfiguration",
"Overrides.$": "$.params.ecs.Overrides"
},
"ResultPath": "$.result",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"BackoffRate": 2,
"IntervalSeconds": 3,
"MaxAttempts": 3
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "ErrorEnd",
"ResultPath": "$.result"
}
],
"Next": "NormalEnd",
"Comment": "データ登録処理"
},
"NormalEnd": {
"Comment": "正常終了",
"Type": "Succeed"
},
"ErrorEnd": {
"Comment": "異常終了",
"Type": "Fail",
"Error": "StatesError",
"Cause": "StepFunctions ErrorEnd"
}
}
}