dbdump-first-commit

This commit is contained in:
nik.n 2024-02-20 18:00:49 +09:00
parent 3dd7b2482d
commit 4b5197bebe
23 changed files with 935 additions and 0 deletions

View File

@ -0,0 +1,12 @@
tests/*
.coverage
.env
.env.example
.report/*
.vscode/*
.pytest_cache/*
*/__pychache__/*
Dockerfile
pytest.ini
README.md
*.sql

View File

@ -0,0 +1,14 @@
DB_HOST=************
DB_PORT=3306
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=*****
DUMP_BACKUP_BUCKET=************
DUMP_BACKUP_FOLDER=dump
LOG_LEVEL=INFO
DB_CONNECTION_MAX_RETRY_ATTEMPT=************
DB_CONNECTION_RETRY_INTERVAL_INIT=************
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=************
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=************

11
ecs/export-dbdump/.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
.vscode/settings.json
.env
my.cnf
# python
__pycache__
# python test
.pytest_cache
.coverage
.report/

16
ecs/export-dbdump/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// IntelliSense 使
//
// : https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(DEBUG)jskult batch laundering",
"type": "python",
"request": "launch",
"program": "entrypoint.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@ -0,0 +1,31 @@
{
"[python]": {
"editor.defaultFormatter": null,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},
//
"python.defaultInterpreterPath": "<pythonインタプリターのパス>",
"python.linting.lintOnSave": true,
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.linting.flake8Args": [
"--max-line-length=200",
"--ignore=F541"
],
"python.formatting.provider": "autopep8",
"python.formatting.autopep8Path": "autopep8",
"python.formatting.autopep8Args": [
"--max-line-length", "200",
"--ignore=F541"
],
"python.testing.pytestArgs": [
"tests/batch/ultmarc"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@ -0,0 +1,40 @@
FROM python:3.9-bullseye
ENV TZ="Asia/Tokyo"
WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
# mysql-apt-config をdpkgでインストールする際に標準出力に渡す文字列ファイルをコピー
COPY mysql_dpkg_selection.txt ./
# 必要なパッケージインストール
RUN apt update && apt install -y less vim curl wget gzip unzip sudo lsb-release
# mysqlをインストール
RUN \
wget https://dev.mysql.com/get/mysql-apt-config_0.8.29-1_all.deb && \
dpkg -i mysql-apt-config_0.8.29-1_all.deb < mysql_dpkg_selection.txt && \
apt update && \
apt install -y mysql-client
# aws cli v2 のインストール
RUN \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
sudo ./aws/install
# python関連のライブラリインストール
RUN \
pip install --upgrade pip wheel setuptools && \
pip install pipenv --no-cache-dir && \
pipenv install --system --deploy && \
pip uninstall -y pipenv virtualenv-clone virtualenv
# パッケージのセキュリティアップデートのみを適用するコマンドを実行
RUN \
apt install -y unattended-upgrades && \
unattended-upgrades
COPY src ./src
COPY entrypoint.py entrypoint.py
CMD ["python", "entrypoint.py"]

19
ecs/export-dbdump/Pipfile Normal file
View File

@ -0,0 +1,19 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
sqlalchemy = "*"
tenacity = "*"
pymysql = "*"
[dev-packages]
autopep8 = "*"
flake8 = "*"
[requires]
python_version = "3.9"
[pipenv]
allow_prereleases = true

213
ecs/export-dbdump/Pipfile.lock generated Normal file
View File

@ -0,0 +1,213 @@
{
"_meta": {
"hash": {
"sha256": "e2e2efd7ebd6ad719931983f277105dd94c4ebb6363f7c00e75dd8ca05523713"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.9"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"greenlet": {
"hashes": [
"sha256:01bc7ea167cf943b4c802068e178bbf70ae2e8c080467070d01bfa02f337ee67",
"sha256:0448abc479fab28b00cb472d278828b3ccca164531daab4e970a0458786055d6",
"sha256:086152f8fbc5955df88382e8a75984e2bb1c892ad2e3c80a2508954e52295257",
"sha256:098d86f528c855ead3479afe84b49242e174ed262456c342d70fc7f972bc13c4",
"sha256:149e94a2dd82d19838fe4b2259f1b6b9957d5ba1b25640d2380bea9c5df37676",
"sha256:1551a8195c0d4a68fac7a4325efac0d541b48def35feb49d803674ac32582f61",
"sha256:15d79dd26056573940fcb8c7413d84118086f2ec1a8acdfa854631084393efcc",
"sha256:1996cb9306c8595335bb157d133daf5cf9f693ef413e7673cb07e3e5871379ca",
"sha256:1a7191e42732df52cb5f39d3527217e7ab73cae2cb3694d241e18f53d84ea9a7",
"sha256:1ea188d4f49089fc6fb283845ab18a2518d279c7cd9da1065d7a84e991748728",
"sha256:1f672519db1796ca0d8753f9e78ec02355e862d0998193038c7073045899f305",
"sha256:2516a9957eed41dd8f1ec0c604f1cdc86758b587d964668b5b196a9db5bfcde6",
"sha256:2797aa5aedac23af156bbb5a6aa2cd3427ada2972c828244eb7d1b9255846379",
"sha256:2dd6e660effd852586b6a8478a1d244b8dc90ab5b1321751d2ea15deb49ed414",
"sha256:3ddc0f794e6ad661e321caa8d2f0a55ce01213c74722587256fb6566049a8b04",
"sha256:3ed7fb269f15dc662787f4119ec300ad0702fa1b19d2135a37c2c4de6fadfd4a",
"sha256:419b386f84949bf0e7c73e6032e3457b82a787c1ab4a0e43732898a761cc9dbf",
"sha256:43374442353259554ce33599da8b692d5aa96f8976d567d4badf263371fbe491",
"sha256:52f59dd9c96ad2fc0d5724107444f76eb20aaccb675bf825df6435acb7703559",
"sha256:57e8974f23e47dac22b83436bdcf23080ade568ce77df33159e019d161ce1d1e",
"sha256:5b51e85cb5ceda94e79d019ed36b35386e8c37d22f07d6a751cb659b180d5274",
"sha256:649dde7de1a5eceb258f9cb00bdf50e978c9db1b996964cd80703614c86495eb",
"sha256:64d7675ad83578e3fc149b617a444fab8efdafc9385471f868eb5ff83e446b8b",
"sha256:68834da854554926fbedd38c76e60c4a2e3198c6fbed520b106a8986445caaf9",
"sha256:6b66c9c1e7ccabad3a7d037b2bcb740122a7b17a53734b7d72a344ce39882a1b",
"sha256:70fb482fdf2c707765ab5f0b6655e9cfcf3780d8d87355a063547b41177599be",
"sha256:7170375bcc99f1a2fbd9c306f5be8764eaf3ac6b5cb968862cad4c7057756506",
"sha256:73a411ef564e0e097dbe7e866bb2dda0f027e072b04da387282b02c308807405",
"sha256:77457465d89b8263bca14759d7c1684df840b6811b2499838cc5b040a8b5b113",
"sha256:7f362975f2d179f9e26928c5b517524e89dd48530a0202570d55ad6ca5d8a56f",
"sha256:81bb9c6d52e8321f09c3d165b2a78c680506d9af285bfccbad9fb7ad5a5da3e5",
"sha256:881b7db1ebff4ba09aaaeae6aa491daeb226c8150fc20e836ad00041bcb11230",
"sha256:894393ce10ceac937e56ec00bb71c4c2f8209ad516e96033e4b3b1de270e200d",
"sha256:99bf650dc5d69546e076f413a87481ee1d2d09aaaaaca058c9251b6d8c14783f",
"sha256:9da2bd29ed9e4f15955dd1595ad7bc9320308a3b766ef7f837e23ad4b4aac31a",
"sha256:afaff6cf5200befd5cec055b07d1c0a5a06c040fe5ad148abcd11ba6ab9b114e",
"sha256:b1b5667cced97081bf57b8fa1d6bfca67814b0afd38208d52538316e9422fc61",
"sha256:b37eef18ea55f2ffd8f00ff8fe7c8d3818abd3e25fb73fae2ca3b672e333a7a6",
"sha256:b542be2440edc2d48547b5923c408cbe0fc94afb9f18741faa6ae970dbcb9b6d",
"sha256:b7dcbe92cc99f08c8dd11f930de4d99ef756c3591a5377d1d9cd7dd5e896da71",
"sha256:b7f009caad047246ed379e1c4dbcb8b020f0a390667ea74d2387be2998f58a22",
"sha256:bba5387a6975598857d86de9eac14210a49d554a77eb8261cc68b7d082f78ce2",
"sha256:c5e1536de2aad7bf62e27baf79225d0d64360d4168cf2e6becb91baf1ed074f3",
"sha256:c5ee858cfe08f34712f548c3c363e807e7186f03ad7a5039ebadb29e8c6be067",
"sha256:c9db1c18f0eaad2f804728c67d6c610778456e3e1cc4ab4bbd5eeb8e6053c6fc",
"sha256:d353cadd6083fdb056bb46ed07e4340b0869c305c8ca54ef9da3421acbdf6881",
"sha256:d46677c85c5ba00a9cb6f7a00b2bfa6f812192d2c9f7d9c4f6a55b60216712f3",
"sha256:d4d1ac74f5c0c0524e4a24335350edad7e5f03b9532da7ea4d3c54d527784f2e",
"sha256:d73a9fe764d77f87f8ec26a0c85144d6a951a6c438dfe50487df5595c6373eac",
"sha256:da70d4d51c8b306bb7a031d5cff6cc25ad253affe89b70352af5f1cb68e74b53",
"sha256:daf3cb43b7cf2ba96d614252ce1684c1bccee6b2183a01328c98d36fcd7d5cb0",
"sha256:dca1e2f3ca00b84a396bc1bce13dd21f680f035314d2379c4160c98153b2059b",
"sha256:dd4f49ae60e10adbc94b45c0b5e6a179acc1736cf7a90160b404076ee283cf83",
"sha256:e1f145462f1fa6e4a4ae3c0f782e580ce44d57c8f2c7aae1b6fa88c0b2efdb41",
"sha256:e3391d1e16e2a5a1507d83e4a8b100f4ee626e8eca43cf2cadb543de69827c4c",
"sha256:fcd2469d6a2cf298f198f0487e0a5b1a47a42ca0fa4dfd1b6862c999f018ebbf",
"sha256:fd096eb7ffef17c456cfa587523c5f92321ae02427ff955bebe9e3c63bc9f0da",
"sha256:fe754d231288e1e64323cfad462fcee8f0288654c10bdf4f603a39ed923bef33"
],
"markers": "platform_machine == 'aarch64' or (platform_machine == 'ppc64le' or (platform_machine == 'x86_64' or (platform_machine == 'amd64' or (platform_machine == 'AMD64' or (platform_machine == 'win32' or platform_machine == 'WIN32')))))",
"version": "==3.0.3"
},
"pymysql": {
"hashes": [
"sha256:4f13a7df8bf36a51e81dd9f3605fede45a4878fe02f9236349fd82a3f0612f96",
"sha256:8969ec6d763c856f7073c4c64662882675702efcb114b4bcbb955aea3a069fa7"
],
"index": "pypi",
"version": "==1.1.0"
},
"sqlalchemy": {
"hashes": [
"sha256:0d3cab3076af2e4aa5693f89622bef7fa770c6fec967143e4da7508b3dceb9b9",
"sha256:0dacf67aee53b16f365c589ce72e766efaabd2b145f9de7c917777b575e3659d",
"sha256:10331f129982a19df4284ceac6fe87353ca3ca6b4ca77ff7d697209ae0a5915e",
"sha256:14a6f68e8fc96e5e8f5647ef6cda6250c780612a573d99e4d881581432ef1669",
"sha256:1b1180cda6df7af84fe72e4530f192231b1f29a7496951db4ff38dac1687202d",
"sha256:29049e2c299b5ace92cbed0c1610a7a236f3baf4c6b66eb9547c01179f638ec5",
"sha256:342d365988ba88ada8af320d43df4e0b13a694dbd75951f537b2d5e4cb5cd002",
"sha256:420362338681eec03f53467804541a854617faed7272fe71a1bfdb07336a381e",
"sha256:4344d059265cc8b1b1be351bfb88749294b87a8b2bbe21dfbe066c4199541ebd",
"sha256:4f7a7d7fcc675d3d85fbf3b3828ecd5990b8d61bd6de3f1b260080b3beccf215",
"sha256:555651adbb503ac7f4cb35834c5e4ae0819aab2cd24857a123370764dc7d7e24",
"sha256:59a21853f5daeb50412d459cfb13cb82c089ad4c04ec208cd14dddd99fc23b39",
"sha256:5fdd402169aa00df3142149940b3bf9ce7dde075928c1886d9a1df63d4b8de62",
"sha256:605b6b059f4b57b277f75ace81cc5bc6335efcbcc4ccb9066695e515dbdb3900",
"sha256:665f0a3954635b5b777a55111ababf44b4fc12b1f3ba0a435b602b6387ffd7cf",
"sha256:6f9e2e59cbcc6ba1488404aad43de005d05ca56e069477b33ff74e91b6319735",
"sha256:736ea78cd06de6c21ecba7416499e7236a22374561493b456a1f7ffbe3f6cdb4",
"sha256:74b080c897563f81062b74e44f5a72fa44c2b373741a9ade701d5f789a10ba23",
"sha256:75432b5b14dc2fff43c50435e248b45c7cdadef73388e5610852b95280ffd0e9",
"sha256:75f99202324383d613ddd1f7455ac908dca9c2dd729ec8584c9541dd41822a2c",
"sha256:790f533fa5c8901a62b6fef5811d48980adeb2f51f1290ade8b5e7ba990ba3de",
"sha256:798f717ae7c806d67145f6ae94dc7c342d3222d3b9a311a784f371a4333212c7",
"sha256:7c88f0c7dcc5f99bdb34b4fd9b69b93c89f893f454f40219fe923a3a2fd11625",
"sha256:7d505815ac340568fd03f719446a589162d55c52f08abd77ba8964fbb7eb5b5f",
"sha256:84daa0a2055df9ca0f148a64fdde12ac635e30edbca80e87df9b3aaf419e144a",
"sha256:87d91043ea0dc65ee583026cb18e1b458d8ec5fc0a93637126b5fc0bc3ea68c4",
"sha256:87f6e732bccd7dcf1741c00f1ecf33797383128bd1c90144ac8adc02cbb98643",
"sha256:884272dcd3ad97f47702965a0e902b540541890f468d24bd1d98bcfe41c3f018",
"sha256:8b8cb63d3ea63b29074dcd29da4dc6a97ad1349151f2d2949495418fd6e48db9",
"sha256:91f7d9d1c4dd1f4f6e092874c128c11165eafcf7c963128f79e28f8445de82d5",
"sha256:a2c69a7664fb2d54b8682dd774c3b54f67f84fa123cf84dda2a5f40dcaa04e08",
"sha256:a3be4987e3ee9d9a380b66393b77a4cd6d742480c951a1c56a23c335caca4ce3",
"sha256:a86b4240e67d4753dc3092d9511886795b3c2852abe599cffe108952f7af7ac3",
"sha256:aa9373708763ef46782d10e950b49d0235bfe58facebd76917d3f5cbf5971aed",
"sha256:b64b183d610b424a160b0d4d880995e935208fc043d0302dd29fee32d1ee3f95",
"sha256:b801154027107461ee992ff4b5c09aa7cc6ec91ddfe50d02bca344918c3265c6",
"sha256:bb209a73b8307f8fe4fe46f6ad5979649be01607f11af1eb94aa9e8a3aaf77f0",
"sha256:bc8b7dabe8e67c4832891a5d322cec6d44ef02f432b4588390017f5cec186a84",
"sha256:c51db269513917394faec5e5c00d6f83829742ba62e2ac4fa5c98d58be91662f",
"sha256:c55731c116806836a5d678a70c84cb13f2cedba920212ba7dcad53260997666d",
"sha256:cf18ff7fc9941b8fc23437cc3e68ed4ebeff3599eec6ef5eebf305f3d2e9a7c2",
"sha256:d24f571990c05f6b36a396218f251f3e0dda916e0c687ef6fdca5072743208f5",
"sha256:db854730a25db7c956423bb9fb4bdd1216c839a689bf9cc15fada0a7fb2f4570",
"sha256:dc55990143cbd853a5d038c05e79284baedf3e299661389654551bd02a6a68d7",
"sha256:e607cdd99cbf9bb80391f54446b86e16eea6ad309361942bf88318bcd452363c",
"sha256:ecf6d4cda1f9f6cb0b45803a01ea7f034e2f1aed9475e883410812d9f9e3cfcf",
"sha256:f2a159111a0f58fb034c93eeba211b4141137ec4b0a6e75789ab7a3ef3c7e7e3",
"sha256:f37c0caf14b9e9b9e8f6dbc81bc56db06acb4363eba5a633167781a48ef036ed",
"sha256:f5693145220517b5f42393e07a6898acdfe820e136c98663b971906120549da5"
],
"index": "pypi",
"version": "==2.0.25"
},
"tenacity": {
"hashes": [
"sha256:5398ef0d78e63f40007c1fb4c0bff96e1911394d2fa8d194f77619c05ff6cc8a",
"sha256:ce510e327a630c9e1beaf17d42e6ffacc88185044ad85cf74c0a8887c6a0f88c"
],
"index": "pypi",
"version": "==8.2.3"
},
"typing-extensions": {
"hashes": [
"sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783",
"sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd"
],
"markers": "python_version >= '3.8'",
"version": "==4.9.0"
}
},
"develop": {
"autopep8": {
"hashes": [
"sha256:067959ca4a07b24dbd5345efa8325f5f58da4298dab0dde0443d5ed765de80cb",
"sha256:2913064abd97b3419d1cc83ea71f042cb821f87e45b9c88cad5ad3c4ea87fe0c"
],
"index": "pypi",
"version": "==2.0.4"
},
"flake8": {
"hashes": [
"sha256:33f96621059e65eec474169085dc92bf26e7b2d47366b70be2f67ab80dc25132",
"sha256:a6dfbb75e03252917f2473ea9653f7cd799c3064e54d4c8140044c5c065f53c3"
],
"index": "pypi",
"version": "==7.0.0"
},
"mccabe": {
"hashes": [
"sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325",
"sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"
],
"markers": "python_version >= '3.6'",
"version": "==0.7.0"
},
"pycodestyle": {
"hashes": [
"sha256:41ba0e7afc9752dfb53ced5489e89f8186be00e599e712660695b7a75ff2663f",
"sha256:44fe31000b2d866f2e41841b18528a505fbd7fef9017b04eff4e2648a0fadc67"
],
"markers": "python_version >= '3.8'",
"version": "==2.11.1"
},
"pyflakes": {
"hashes": [
"sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f",
"sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"
],
"markers": "python_version >= '3.8'",
"version": "==3.2.0"
},
"tomli": {
"hashes": [
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc",
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"
],
"markers": "python_version < '3.11'",
"version": "==2.0.1"
}
}
}

View File

@ -0,0 +1,48 @@
# 日次バッチ処理前DBダンプ取得 
## 概要
日次バッチ処理前DBダンプ取得処理。
## 環境情報
- Python 3.9
- MySQL 8.23
- VSCode
## 環境構築
- Python の構築
- Merck_NewDWH 開発 2021 の Wiki、[Python 環境構築](https://nds-tyo.backlog.com/alias/wiki/1874930)を参照
- 「Pipenv の導入」までを行っておくこと
- 構築完了後、プロジェクト配下で以下のコマンドを実行し、Python の仮想環境を作成する
- `pipenv install --dev --python <pyenvでインストールしたpythonバージョン>`
- この手順で出力される仮想環境のパスは、後述する VSCode の設定手順で使用するため、控えておく
- MySQL の環境構築
- Windows の場合、以下のリンクからダウンロードする
- <https://dev.mysql.com/downloads/installer/>
- Docker を利用する場合、「newsdwh-tools」リポジトリの MySQL 設定を使用すると便利
- 「crm-table-to-ddl」フォルダ内で以下のコマンドを実行すると
- `docker-compose up -d`
- Docker の構築手順は、[Docker のセットアップ手順](https://nds-tyo.backlog.com/alias/wiki/1754332)を参照のこと
- データを投入する
- 立ち上げたデータベースに「src05」スキーマを作成する
- [ローカル開発用データ](https://ndstokyo.sharepoint.com/:f:/r/sites/merck-new-dwh-team/Shared%20Documents/03.NewDWH%E6%A7%8B%E7%AF%89%E3%83%95%E3%82%A7%E3%83%BC%E3%82%BA3/02.%E9%96%8B%E7%99%BA/90.%E9%96%8B%E7%99%BA%E5%85%B1%E6%9C%89/%E3%83%AD%E3%83%BC%E3%82%AB%E3%83%AB%E9%96%8B%E7%99%BA%E7%94%A8%E3%83%87%E3%83%BC%E3%82%BF?csf=1&web=1&e=VVcRUs)をダウンロードし、mysql コマンドを使用して復元する
- `mysql -h <ホスト名> -P <ポート> -u <ユーザー名> -p src05 < src05_dump.sql`
- 環境変数の設定
- 「.env.example」ファイルをコピーし、「.env」ファイルを作成する
- 環境変数を設定する。設定内容は PRJ メンバーより共有を受けてください
- VSCode の設定
- 「.vscode/recommended_settings.json」ファイルをコピーし、「settings.json」ファイルを作成する
- 「python.defaultInterpreterPath」を、Python の構築手順で作成した仮想環境のパスに変更する
## 実行
- VSCode 上で「F5」キーを押下すると、バッチ処理が起動する。
- 「entrypoint.py」が、バッチ処理のエントリーポイント。
- 実際の処理は、「src/jobctrl_dbdump.py」で行っている。
## フォルダ構成(工事中)

View File

@ -0,0 +1,10 @@
"""実消化&アルトマーク 日次バッチ処理前DBダンプ取得のエントリーポイント"""
from src import jobctrl_dbdump
if __name__ == '__main__':
try:
exit(jobctrl_dbdump.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はbatch_process内でログを出す。
exit(0)

View File

@ -0,0 +1,3 @@
1
1
4

View File

View File

@ -0,0 +1,131 @@
"""バッチ処理の共通関数"""
import logging
import textwrap
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_batch_statuses() -> tuple[str, str]:
"""日付テーブルから、以下を取得して返す。
- バッチ処理中フラグ
- dump取得状況区分
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]バッチ処理中フラグ[1]dump取得状況区分
"""
db = Database.get_instance()
sql = 'SELECT bch_actf, dump_sts_kbn FROM src05.hdke_tbl'
try:
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
return batch_processing_flag, dump_status_kbn
def update_dump_status_kbn_in_processing() -> None:
"""dump取得状況区分を処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :in_processing,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_dump_status_kbn_error() -> None:
"""dump取得状況区分をエラーに更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :dump_unprocessed,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'dump_unprocessed': constants.DUMP_STATUS_KBN_ERROR
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_dump_status_kbn_complete() -> None:
"""dump取得状況区分を正常終了に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src05.hdke_tbl
SET
dump_sts_kbn = :dump_unprocessed,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'dump_unprocessed': constants.DUMP_STATUS_KBN_COMPLETE
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def logging_sql(logger: logging.Logger, sql: str) -> None:
"""SQL文をデバッグログで出力する
Args:
logger (logging.Logger): ロガー
sql (str): SQL文
"""
logger.debug(f'\n{"-" * 15}\n{textwrap.dedent(sql)[1:-1]}\n{"-" * 15}')

View File

View File

@ -0,0 +1,180 @@
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
create_engine, text)
from sqlalchemy.engine.url import URL
from tenacity import retry, stop_after_attempt, wait_exponential
from src.error.exceptions import DBException
from src.system_var import environment
class Database:
"""データベース操作クラス"""
__connection: Connection = None
__engine: Engine = None
__host: str = None
__port: str = None
__username: str = None
__password: str = None
__schema: str = None
__connection_string: str = None
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
"""このクラスの新たなインスタンスを初期化します
Args:
username (str): DBユーザー名
password (str): DBパスワード
host (str): DBホスト名
port (int): DBポート
schema (str): DBスキーマ名
"""
self.__username = username
self.__password = password
self.__host = host
self.__port = int(port)
self.__schema = schema
self.__connection_string = URL.create(
drivername='mysql+pymysql',
username=self.__username,
password=self.__password,
host=self.__host,
port=self.__port,
database=self.__schema,
query={"charset": "utf8mb4"}
)
self.__engine = create_engine(
self.__connection_string,
pool_timeout=5,
poolclass=QueuePool
)
@classmethod
def get_instance(cls):
"""インスタンスを取得します
Returns:
Database: DB操作クラスインスタンス
"""
return cls(
username=environment.DB_USERNAME,
password=environment.DB_PASSWORD,
host=environment.DB_HOST,
port=environment.DB_PORT,
schema=environment.DB_SCHEMA
)
@retry(
wait=wait_exponential(
multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT,
min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS,
max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS
),
stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT),
retry_error_cls=DBException
)
def connect(self):
"""
DBに接続します接続に失敗した場合リトライします
Raises:
DBException: 接続失敗
"""
try:
self.__connection = self.__engine.connect()
except Exception as e:
raise DBException(e)
def execute_select(self, select_query: str, parameters=None) -> list[dict]:
"""SELECTクエリを実行します。
Args:
select_query (str): SELECT文
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
Raises:
DBException: DBエラー
Returns:
list[dict]: カラム名: 値の辞書リスト
"""
if self.__connection is None:
raise DBException('DBに接続していません')
result = None
try:
# トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(select_query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(select_query, parameters)
except Exception as e:
raise DBException(f'SQL Error: {e}')
result_rows = result.mappings().all()
return result_rows
def execute(self, query: str, parameters=None) -> CursorResult:
"""SQLクエリを実行します。
Args:
query (str): SQL文
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
Raises:
DBException: DBエラー
Returns:
CursorResult: 取得結果
"""
if self.__connection is None:
raise DBException('DBに接続していません')
result = None
try:
# トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(query, parameters)
except Exception as e:
raise DBException(f'SQL Error: {e}')
return result
def begin(self):
"""トランザクションを開始します。"""
if not self.__connection.in_transaction():
self.__connection.begin()
def commit(self):
"""トランザクションをコミットします"""
if self.__connection.in_transaction():
self.__connection.commit()
def rollback(self):
"""トランザクションをロールバックします"""
if self.__connection.in_transaction():
self.__connection.rollback()
def disconnect(self):
"""DB接続を切断します。"""
if self.__connection is not None:
self.__connection.close()
self.__connection = None
def to_jst(self):
self.execute('SET time_zone = "+9:00"')
def __execute_with_transaction(self, query: str, parameters: dict):
# トランザクションを開始してクエリを実行する
with self.__connection.begin():
try:
result = self.__connection.execute(text(query), parameters=parameters)
except Exception as e:
self.__connection.rollback()
raise e
# ここでコミットされる
return result

View File

View File

@ -0,0 +1,10 @@
class MeDaCaException(Exception):
pass
class DBException(MeDaCaException):
pass
class BatchOperationException(MeDaCaException):
pass

View File

@ -0,0 +1,126 @@
"""DBダンプ取得"""
import datetime
import os
import subprocess
import textwrap
from src.batch.batch_functions import (get_batch_statuses,
update_dump_status_kbn_complete,
update_dump_status_kbn_error,
update_dump_status_kbn_in_processing)
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants, environment
logger = get_logger('DBダンプ取得')
def exec():
try:
logger.info('DBダンプ取得開始')
try:
# 日次バッチ処置中フラグ、dump処理状態区分を取得
batch_processing_flag, dump_status_kbn = get_batch_statuses()
except BatchOperationException as e:
raise BatchOperationException(f'日付テーブル取得エラー(異常終了):{e}')
# 日次バッチ処理中の場合、処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('日次バッチ処理中の為、DBダンプ取得を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump処理状態区分が処理中の場合、処理は行わない
if dump_status_kbn == constants.DUMP_STATUS_KBN_PROCESSED:
logger.error(f'ダンプ取得中の為、DBダンプ取得を終了します。 dump処理状態区分={dump_status_kbn}')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump処理状態区分がエラーの場合、処理は行わない
if dump_status_kbn == constants.DUMP_STATUS_KBN_ERROR:
logger.error(f'ダンプ取得が実行不可能な状態の為、DBダンプ取得を終了します。 dump処理状態区分={dump_status_kbn}')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump処理状態区分を処理中に更新
try:
update_dump_status_kbn_in_processing()
except BatchOperationException as e:
raise BatchOperationException(f'dump処理状態区分更新(未処理→処理中) エラー(異常終了):{e}')
# MySQL接続情報を作成する
my_cnf_file_content = f"""
[client]
user={environment.DB_USERNAME}
password={environment.DB_PASSWORD}
host={environment.DB_HOST}
"""
# my.cnfファイルのパス
my_cnf_path = os.path.join('my.cnf')
# my.cnfファイルを生成する
with open(my_cnf_path, 'w') as f:
f.write(textwrap.dedent(my_cnf_file_content)[1:-1])
# ファイルのパーミッションが強いとmysqldumpコマンドが実行できないため
# my.cnfファイルのパーミッションをread-onlyに設定
os.chmod(my_cnf_path, 0o444)
dt_now = datetime.datetime.now()
converted_value = dt_now.strftime('%Y%m%d%H%M%S')
dump_file_name = f'backup_rds_{environment.DB_SCHEMA}_{converted_value}.gz'
s3_file_path = f's3://{environment.DUMP_BACKUP_BUCKET}/dump/{dt_now.year}/{dt_now.strftime("%m")}/{dt_now.strftime("%d")}/{dump_file_name}'
# mysqldumpコマンドを実行し、dumpを取得する
command = [
'mysqldump',
f'--defaults-file={my_cnf_path}',
'-P',
f"{environment.DB_PORT}",
'--no-tablespaces',
'--skip-column-statistics',
'--single-transaction',
'--set-gtid-purged=OFF',
environment.DB_SCHEMA
]
mysqldump_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# gzipコマンドを実行してdump結果を圧縮する
gzip_process = subprocess.Popen(['gzip', '-c'], stdin=mysqldump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# aws s3 cpコマンドを実行してアップロードする
s3_cp_process = subprocess.Popen(['aws', 's3', 'cp', '-', s3_file_path], stdin=gzip_process.stdout, stderr=subprocess.PIPE)
# mysqldumpの標準出力をgzipに接続したため、標準出力をクローズする
mysqldump_process.stdout.close()
# gzipの標準出力をaws s3 cpに接続したため、標準出力をクローズする
gzip_process.stdout.close()
# パイプラインを実行し、エラーハンドリング
_, error = mysqldump_process.communicate()
if mysqldump_process.returncode != 0:
raise BatchOperationException(f'`mysqldump`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
_, error = gzip_process.communicate()
if gzip_process.returncode != 0:
raise BatchOperationException(f'`gzip`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
_, error = s3_cp_process.communicate()
if s3_cp_process.returncode != 0:
raise BatchOperationException(f'`aws s3 cp`実行時にエラーが発生しました。{"" if error is None else error.decode("utf-8")}')
# dump処理状態区分を正常終了に更新
try:
update_dump_status_kbn_complete()
except BatchOperationException as e:
raise BatchOperationException(f'dump処理状態区分更新(処理中→正常終了) エラー(異常終了):{e}')
logger.info('DBダンプ取得終了正常終了')
logger.info(f'出力ファイルパス: {s3_file_path}')
return constants.BATCH_EXIT_CODE_SUCCESS
except Exception as e:
# dump処理状態区分をエラーに更新
try:
update_dump_status_kbn_error()
except BatchOperationException as e:
logger.exception(f'dump処理状態区分更新(処理中→エラー) エラー(異常終了):{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.exception(f'DBダンプ取得中に想定外のエラーが発生しました :{e}')
return constants.BATCH_EXIT_CODE_SUCCESS

View File

@ -0,0 +1,37 @@
import logging
from src.system_var.environment import LOG_LEVEL
# boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する
for name in ["boto3", "botocore", "s3transfer", "urllib3"]:
logging.getLogger(name).setLevel(logging.WARNING)
def get_logger(log_name: str) -> logging.Logger:
"""一意のログ出力モジュールを取得します。
Args:
log_name (str): ロガー名
Returns:
_type_: _description_
"""
logger = logging.getLogger(log_name)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
if not logger.hasHandlers():
handler = logging.StreamHandler()
logger.addHandler(handler)
formatter = logging.Formatter(
'%(name)s\t[%(levelname)s]\t%(asctime)s\t%(message)s',
'%Y-%m-%d %H:%M:%S'
)
for handler in logger.handlers:
handler.setFormatter(formatter)
return logger

View File

@ -0,0 +1,15 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0
# バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
# dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分処理中
DUMP_STATUS_KBN_PROCESSED = '1'
# dump取得状態区分正常終了
DUMP_STATUS_KBN_COMPLETE = '2'
# dump取得状態区分エラー
DUMP_STATUS_KBN_ERROR = '9'

View File

@ -0,0 +1,19 @@
import os
# Database
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
DUMP_BACKUP_BUCKET = os.environ['DUMP_BACKUP_BUCKET']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))