feat: 実消化アルトマークデータ転送処理を実装。共通処理の実装に合わせてbatch_functionsはリファクタリングする

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2025-05-14 19:44:56 +09:00
parent 7ca2318277
commit 0ab0bae0c2
22 changed files with 829 additions and 0 deletions

View File

@ -0,0 +1,12 @@
tests/*
.coverage
.env
.env.example
.report/*
.vscode/*
.pytest_cache/*
*/__pychache__/*
Dockerfile
pytest.ini
README.md
*.sql

View File

@ -0,0 +1,22 @@
DB_HOST=************
DB_PORT=************
DB_USERNAME=************
DB_PASSWORD=************
DB_SCHEMA=src07
JSK_IO_BUCKET=mbj-newdwh2021-staging-jskult-io
ULTMARC_DATA_BUCKET=mbj-newdwh2021-staging-jskult-ultmarc
JSKULT_BACKUP_BUCKET=mbj-newdwh2021-staging-backup-jskult
DATA_IMPORT_BUCKET=mbj-newdwh2021-staging-data
LOG_LEVEL=INFO
JSK_RECEIVE_FOLDER=recv
ULTMARC_RECEIVE_FOLDER=recv
ULTMARC_IMPORT_FOLDER=import
DATA_IMPORT_FOLDER=jsk/target
JSK_BACKUP_FOLDER=jsk/recv
ULTMARC_BACKUP_FOLDER=ultmarc
RESULT_OUTPUT_FOLDER=transfer_result
RESULT_OUTPUT_FILE_NAME=transfer_result.json
DB_CONNECTION_MAX_RETRY_ATTEMPT=4
DB_CONNECTION_RETRY_INTERVAL_INIT=5
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=5
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=50

View File

@ -0,0 +1,10 @@
.vscode/settings.json
.env
# python
__pycache__
# python test
.pytest_cache
.coverage
.report/

View File

@ -0,0 +1,16 @@
{
// IntelliSense 使
//
// : https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "(DEBUG)jskult transfer receive file",
"type": "python",
"request": "launch",
"program": "entrypoint.py",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@ -0,0 +1,25 @@
{
"[python]": {
"editor.defaultFormatter": null,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},
//
"python.defaultInterpreterPath": "<pythonインタプリターのパス>",
"python.linting.lintOnSave": true,
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.linting.flake8Args": [
"--max-line-length=200",
"--ignore=F541"
],
"python.formatting.provider": "autopep8",
"python.formatting.autopep8Path": "autopep8",
"python.formatting.autopep8Args": [
"--max-line-length", "200",
"--ignore=F541"
]
}

View File

@ -0,0 +1,20 @@
FROM python:3.12-slim-bookworm
ENV TZ="Asia/Tokyo"
# pythonの標準出力をバッファリングしないフラグ
ENV PYTHONUNBUFFERED=1
# pythonのバイトコードを生成しないフラグ
ENV PYTHONDONTWRITEBYTECODE=1
WORKDIR /usr/src/app
COPY Pipfile Pipfile.lock ./
RUN \
apt update -y && \
pip install pipenv --no-cache-dir && \
pipenv install --system --deploy && \
pip uninstall -y pipenv virtualenv-clone virtualenv
COPY src ./src
COPY entrypoint.py entrypoint.py
CMD ["python", "entrypoint.py"]

View File

@ -0,0 +1,23 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
boto3 = "*"
PyMySQL = "*"
sqlalchemy = "*"
tenacity = "*"
[dev-packages]
autopep8 = "*"
flake8 = "*"
pytest = "*"
pytest-cov = "*"
boto3 = "*"
[requires]
python_version = "3.12"
[pipenv]
allow_prereleases = true

View File

@ -0,0 +1,10 @@
"""実消化&アルトマーク データ転送処理"""
from src import main
if __name__ == '__main__':
try:
exit(main.exec())
except Exception:
# エラーが起きても、正常系のコードで返す。
# エラーが起きた事実はexec内でログを出す。
exit(0)

View File

@ -0,0 +1,170 @@
import gzip
import json
import os
import os.path as path
import shutil
import tempfile
import boto3
from src.system_var import environment
class S3Client:
__s3_client = boto3.client('s3')
_bucket_name: str
def list_objects(self, bucket_name: str, folder_name: str):
response = self.__s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=folder_name)
if response['KeyCount'] == 0:
return []
contents = response['Contents']
# 末尾がスラッシュで終わるものはフォルダとみなしてスキップする
objects = [{'filename': content['Key'], 'size': content['Size']}
for content in contents if not content['Key'].endswith('/')]
return objects
def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None:
copy_source = {'Bucket': src_bucket, 'Key': src_key}
self.__s3_client.copy(copy_source, dest_bucket, dest_key)
return
def put_object(self, bucket: str, key: str, json_body: str, content_type='application/json') -> None:
self.__s3_client.put_object(
Bucket=bucket,
Key=key,
Body=json_body.encode('utf-8'),
ContentType=content_type
)
def delete_file(self, bucket_name: str, file_key: str):
self.__s3_client.delete_object(
Bucket=bucket_name,
Key=file_key
)
class S3Bucket():
_s3_client = S3Client()
_bucket_name: str = None
class JskIOBucket(S3Bucket):
_bucket_name = environment.JSK_IO_BUCKET
_recv_folder = environment.JSK_RECEIVE_FOLDER
_s3_file_list = None
def get_file_list(self):
self._s3_file_list = self._s3_client.list_objects(
self._bucket_name, self._recv_folder)
return self._s3_file_list
def download_data_file(self, data_filename: str):
temporary_dir = tempfile.mkdtemp()
temporary_file_path = path.join(
temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}')
with open(temporary_file_path, mode='wb') as f:
self._s3_client.download_file(self._bucket_name, data_filename, f)
f.seek(0)
return temporary_file_path
def unzip_data_file(self, filename: str):
temp_dir = os.path.dirname(filename)
decompress_filename = os.path.basename(filename).replace('.gz', '')
decompress_file_path = os.path.join(temp_dir, decompress_filename)
with gzip.open(filename, 'rb') as gz:
with open(decompress_file_path, 'wb') as decompressed_file:
shutil.copyfileobj(gz, decompressed_file)
ret = [decompress_file_path]
return ret
def transfer_file_to_import(self, target_file: dict):
data_import_bucket = DataImportBucket()
transfer_from_file_path = target_file.get("filename")
transfer_to_filename = transfer_from_file_path.replace(
f"{self._recv_folder}/", "")
data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}'
self._s3_client.copy(self._bucket_name, transfer_from_file_path,
data_import_bucket._bucket_name, data_import_key)
def backup_file(self, target_file: dict, datetime_key: str):
jsk_backup_bucket = JskBackupBucket()
backup_from_file_path = target_file.get("filename")
backup_to_filename = backup_from_file_path.replace(
f"{self._recv_folder}/", "")
backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
jsk_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: dict):
delete_path = target_file.get("filename").replace(
f"{self._recv_folder}/", "")
self._s3_client.delete_file(
self._bucket_name, delete_path)
class DataImportBucket(S3Bucket):
_bucket_name = environment.DATA_IMPORT_BUCKET
_folder = environment.DATA_IMPORT_FOLDER
class UltmarcBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_DATA_FOLDER
def get_file_list(self):
return self._s3_client.list_objects(self._bucket_name, self._folder)
def backup_file(self, target_file: dict, datetime_key: str):
# バックアップバケットにコピー
ultmarc_backup_bucket = UltmarcBackupBucket()
target_file_path = target_file.get("filename")
backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{target_file_path.replace(f"{self._folder}/", "")}'
self._s3_client.copy(self._bucket_name, target_file,
ultmarc_backup_bucket._bucket_name, backup_key)
def delete_file(self, target_file: dict):
delete_path = target_file.get("filename").replace(
f"{self._recv_folder}/", "")
self._s3_client.delete_file(
self._bucket_name, delete_path)
class UltmarcImportBucket(S3Bucket):
_bucket_name = environment.ULTMARC_DATA_BUCKET
_folder = environment.ULTMARC_IMPORT_FOLDER
def transfer_file_to_import(self, target_file: dict):
backup_from_file_path = target_file.get("filename")
backup_to_filename = backup_from_file_path.replace(
f"{self._recv_folder}/", "")
data_import_key = f'{self._folder}/{backup_to_filename}'
self._s3_client.copy(self._bucket_name, backup_from_file_path,
self._bucket_name, data_import_key)
class JskUltBackupBucket(S3Bucket):
_bucket_name = environment.JSKULT_BACKUP_BUCKET
class UltmarcBackupBucket(JskUltBackupBucket):
_folder = environment.ULTMARC_BACKUP_FOLDER
class JskBackupBucket(JskUltBackupBucket):
_folder = environment.JSK_BACKUP_FOLDER
class TransferResultOutputBucket(JskUltBackupBucket):
_folder = environment.RESULT_OUTPUT_FOLDER
def put_transfer_result(self, transfer_result: dict, datetime_key: str):
object_key = f'{self._folder}/{datetime_key}/{environment.RESULT_OUTPUT_FILE_NAME}'
json_body = json.dumps(transfer_result, ensure_ascii=False)
self._s3_client.put_object(
self._bucket_name,
object_key,
json_body
)

View File

@ -0,0 +1,100 @@
"""バッチ処理の共通関数"""
from datetime import datetime
from src.db.database import Database
from src.error.exceptions import BatchOperationException, DBException
from src.system_var import constants
def get_batch_statuses() -> tuple[str, str, str]:
"""日付テーブルから、以下を取得して返す。
- 日次バッチ処理中フラグ
- dump取得状況区分
- 処理日YYYY/MM/DD
Raises:
BatchOperationException: 日付テーブルが取得できないとき何らかのエラーが発生したとき
Returns:
tuple[str, str]: [0]日次バッチ処理中フラグdump取得状況区分
"""
db = Database.get_instance()
sql = 'SELECT bch_actf, dump_sts_kbn, src07.get_syor_date() AS syor_date FROM src07.hdke_tbl'
try:
db.connect()
hdke_tbl_result = db.execute_select(sql)
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
if len(hdke_tbl_result) == 0:
raise BatchOperationException('日付テーブルが取得できませんでした')
# 必ず1件取れる
hdke_tbl_record = hdke_tbl_result[0]
batch_processing_flag = hdke_tbl_record['bch_actf']
dump_status_kbn = hdke_tbl_record['dump_sts_kbn']
syor_date = hdke_tbl_record['syor_date']
# 処理日を文字列に変換する
syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d')
return batch_processing_flag, dump_status_kbn, syor_date_str
def update_batch_processing_flag_in_processing() -> None:
"""バッチ処理中フラグを処理中に更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :in_processing,
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(
sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return
def update_batch_process_complete() -> None:
"""バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する
Raises:
BatchOperationException: DB操作の何らかのエラー
"""
db = Database.get_instance()
sql = """\
UPDATE src07.hdke_tbl
SET
bch_actf = :batch_complete,
dump_sts_kbn = :dump_unprocessed,
syor_date = DATE_FORMAT((src07.get_syor_date() + interval 1 day), '%Y%m%d'), -- +1
updater = CURRENT_USER(),
update_date = NOW()
"""
try:
db.connect()
db.to_jst()
db.execute(sql, {
'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED,
'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED
})
except DBException as e:
raise BatchOperationException(e)
finally:
db.disconnect()
return

View File

@ -0,0 +1,198 @@
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
create_engine, text)
from sqlalchemy.engine.url import URL
from src.error.exceptions import DBException
from src.logging.get_logger import get_logger
from src.system_var import environment
from tenacity import retry, stop_after_attempt, wait_exponential
logger = get_logger(__name__)
class Database:
"""データベース操作クラス"""
__connection: Connection = None
__transactional_engine: Engine = None
__autocommit_engine: Engine = None
__host: str = None
__port: str = None
__username: str = None
__password: str = None
__schema: str = None
__autocommit: bool = None
__connection_string: str = None
def __init__(self, username: str, password: str, host: str, port: int, schema: str, autocommit: bool = False) -> None:
"""このクラスの新たなインスタンスを初期化します
Args:
username (str): DBユーザー名
password (str): DBパスワード
host (str): DBホスト名
port (int): DBポート
schema (str): DBスキーマ名
autocommit(bool): 自動コミットモードで接続するかどうか(Trueの場合トランザクションの有無に限らず即座にコミットされる). Defaults to False.
"""
self.__username = username
self.__password = password
self.__host = host
self.__port = int(port)
self.__schema = schema
self.__autocommit = autocommit
self.__connection_string = URL.create(
drivername='mysql+pymysql',
username=self.__username,
password=self.__password,
host=self.__host,
port=self.__port,
database=self.__schema,
query={"charset": "utf8mb4", "local_infile": "1"},
)
self.__transactional_engine = create_engine(
self.__connection_string,
pool_timeout=5,
poolclass=QueuePool
)
self.__autocommit_engine = self.__transactional_engine.execution_options(
isolation_level='AUTOCOMMIT')
@classmethod
def get_instance(cls, autocommit=False):
"""インスタンスを取得します
Args:
autocommit (bool, optional): 自動コミットモードで接続するかどうか(Trueの場合トランザクションの有無に限らず即座にコミットされる). Defaults to False.
Returns:
Database: DB操作クラスインスタンス
"""
return cls(
username=environment.DB_USERNAME,
password=environment.DB_PASSWORD,
host=environment.DB_HOST,
port=environment.DB_PORT,
schema=environment.DB_SCHEMA,
autocommit=autocommit
)
@retry(
wait=wait_exponential(
multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT,
min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS,
max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS
),
stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT),
retry_error_cls=DBException
)
def connect(self):
"""
DBに接続します接続に失敗した場合リトライします\n
インスタンスのautocommitがTrueの場合自動コミットモードで接続する明示的なトランザクションも無視される
Raises:
DBException: 接続失敗
"""
try:
self.__connection = (
self.__autocommit_engine.connect() if self.__autocommit is True
else self.__transactional_engine.connect())
except Exception as e:
raise DBException(e)
def execute_select(self, select_query: str, parameters=None) -> list[dict]:
"""SELECTクエリを実行します。
Args:
select_query (str): SELECT文
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
Raises:
DBException: DBエラー
Returns:
list[dict]: カラム名: 値の辞書リスト
"""
if self.__connection is None:
raise DBException('DBに接続していません')
result = None
try:
# トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(
text(select_query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(
select_query, parameters)
except Exception as e:
raise DBException(f'SQL Error: {e}')
result_rows = result.mappings().all()
return result_rows
def execute(self, query: str, parameters=None) -> CursorResult:
"""SQLクエリを実行します。
Args:
query (str): SQL文
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
Raises:
DBException: DBエラー
Returns:
CursorResult: 取得結果
"""
if self.__connection is None:
raise DBException('DBに接続していません')
result = None
try:
# トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(query, parameters)
except Exception as e:
raise DBException(f'SQL Error: {e}')
return result
def begin(self):
"""トランザクションを開始します。"""
if not self.__connection.in_transaction():
self.__connection.begin()
def commit(self):
"""トランザクションをコミットします"""
if self.__connection.in_transaction():
self.__connection.commit()
def rollback(self):
"""トランザクションをロールバックします"""
if self.__connection.in_transaction():
self.__connection.rollback()
def disconnect(self):
"""DB接続を切断します。"""
if self.__connection is not None:
self.__connection.close()
self.__connection = None
def to_jst(self):
self.execute('SET time_zone = "+9:00"')
def __execute_with_transaction(self, query: str, parameters: dict):
# トランザクションを開始してクエリを実行する
with self.__connection.begin():
try:
result = self.__connection.execute(
text(query), parameters=parameters)
except Exception as e:
self.__connection.rollback()
raise e
# ここでコミットされる
return result

View File

@ -0,0 +1,10 @@
class MeDaCaException(Exception):
pass
class DBException(MeDaCaException):
pass
class BatchOperationException(MeDaCaException):
pass

View File

@ -0,0 +1,37 @@
import logging
from src.system_var.environment import LOG_LEVEL
# boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する
for name in ["boto3", "botocore", "s3transfer", "urllib3"]:
logging.getLogger(name).setLevel(logging.WARNING)
def get_logger(log_name: str) -> logging.Logger:
"""一意のログ出力モジュールを取得します。
Args:
log_name (str): ロガー名
Returns:
_type_: _description_
"""
logger = logging.getLogger(log_name)
level = logging.getLevelName(LOG_LEVEL)
if not isinstance(level, int):
level = logging.INFO
logger.setLevel(level)
if not logger.hasHandlers():
handler = logging.StreamHandler()
logger.addHandler(handler)
formatter = logging.Formatter(
'%(name)s\t[%(levelname)s]\t%(asctime)s\t%(message)s',
'%Y-%m-%d %H:%M:%S'
)
for handler in logger.handlers:
handler.setFormatter(formatter)
return logger

View File

@ -0,0 +1,118 @@
"""実消化&アルトマーク データ転送処理"""
from src.aws.s3 import (JskIOBucket, TransferResultOutputBucket, UltmarcBucket,
UltmarcImportBucket)
from src.batch.batch_functions import (
get_batch_statuses, update_batch_processing_flag_in_processing)
from src.error.exceptions import BatchOperationException
from src.logging.get_logger import get_logger
from src.system_var import constants
logger = get_logger('実消化&アルトマークデータ転送')
def exec():
try:
# ① 処理開始ログを出力する
logger.info('I-1 処理開始: 実消化&アルトマークデータ転送')
# 転送データリストを初期化する。
transfer_file_lists = {"transfer_list": []}
# ② 日付テーブルのステータスを確認する
try:
# 日次バッチ処置中フラグ、dump処理状態区分、処理日を取得
batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses()
except BatchOperationException as e:
logger.exception(f'日付テーブル取得(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# 日次バッチ処理中の場合、後続の処理は行わない
if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING:
logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
# dump取得が正常終了していない場合、後続の処理は行わない
if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE:
logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(f'処理日={syor_date}')
logger.info('I-2 日次バッチ処理中フラグ更新')
# バッチ処理中に更新
try:
update_batch_processing_flag_in_processing()
except BatchOperationException as e:
logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}')
return constants.BATCH_EXIT_CODE_SUCCESS
# ③ 実消化データのリストを取得する
logger.info('I-3 実消化データリスト取得開始')
jsk_receive_file_list = None
try:
jsk_io_bucket = JskIOBucket()
jsk_receive_file_list: str = jsk_io_bucket.get_file_list()
except Exception as e:
logger.exception(f'実消化データリスト取得に失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(f'I-4 実消化データリスト取得終了。取得データ一覧:{jsk_receive_file_list}')
# ④ 取得した実消化データのリストでループ開始
logger.info(f'I-5 実消化データ転送処理開始')
for receive_file in jsk_receive_file_list:
# ⑤ 取得したデータを実消化&アルトマークバックアップバケットにコピーする
jsk_io_bucket.backup_file(receive_file, syor_date)
# ⑥ 取得したデータをデータ登録バケットにコピーする
jsk_io_bucket.transfer_file_to_import(receive_file)
# ⑦ 転送が完了したデータを元のバケットからファイルを削除する
jsk_io_bucket.delete_file(receive_file)
# ⑧ 転送が完了したファイル名を転送データリストに追加する
# ファイル名のみ切り出して追加
transfer_file_lists['transfer_list'].append(
receive_file['filename'].split('/')[1])
# ⑨ ループ終了後、実消化データ転送終了ログI-6)を出力する
logger.info(f'I-6 実消化データ転送処理終了')
# ⑩ アルトマークデータのリストを取得する
logger.info('I-7 アルトマークデータリスト取得開始')
ultmarc_receive_file_list = None
try:
ultmarc_bucket = UltmarcBucket()
ultmarc_receive_file_list: str = ultmarc_bucket.get_file_list()
except Exception as e:
logger.exception(f'アルトマークデータリスト取得に失敗しました。{e}')
return constants.BATCH_EXIT_CODE_SUCCESS
logger.info(
f'I-8 アルトマークデータリスト取得終了。取得データ一覧:{ultmarc_receive_file_list}')
# ⑪ 取得したアルトマークデータのリストでループ開始
logger.info(f'I-9 アルトマークデータ転送処理開始')
ultmarc_bucket_import_folder = UltmarcImportBucket()
for receive_file in ultmarc_receive_file_list:
# ⑫ 取得したデータを実消化&アルトマークバックアップバケットにコピーする
ultmarc_bucket.backup_file(receive_file, syor_date)
# ⑬ 取得したデータをデータ登録バケットにコピーする
ultmarc_bucket_import_folder.transfer_file_to_import(receive_file)
# ⑭ 転送が完了したデータを元のバケットからファイルを削除する
ultmarc_bucket.delete_file(receive_file)
# ⑮ 転送が完了したファイル名を転送データリストに追加する
# ファイル名のみ切り出して追加
transfer_file_lists['transfer_list'].append(
receive_file['filename'].split('/')[1])
# ⑯ ループ終了後、アルトマークデータ転送終了ログI-10)を出力する
logger.info(f'I-6 実消化データ転送処理終了')
# ⑰ 転送データリストをJSONファイル化し、S3バケットにアップロードする
TransferResultOutputBucket().put_transfer_result(transfer_file_lists, syor_date)
# ⑱ 処理終了ログ(I-12)を出力する
logger.info(f'I-12 処理終了: 実消化&アルトマークデータ転送')
return constants.BATCH_EXIT_CODE_SUCCESS
except Exception as e:
logger.exception(f'実消化&アルトマーク データ転送処理中に想定外のエラーが発生しました {e}')
raise e

View File

@ -0,0 +1,11 @@
# バッチ正常終了コード
BATCH_EXIT_CODE_SUCCESS = 0
# バッチ処理中フラグ:未処理
BATCH_ACTF_BATCH_UNPROCESSED = '0'
# バッチ処理中フラグ:処理中
BATCH_ACTF_BATCH_IN_PROCESSING = '1'
# dump取得状態区分未処理
DUMP_STATUS_KBN_UNPROCESSED = '0'
# dump取得状態区分dump取得正常終了
DUMP_STATUS_KBN_COMPLETE = '2'

View File

@ -0,0 +1,33 @@
import os
# Database
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ['DB_PORT'])
DB_USERNAME = os.environ['DB_USERNAME']
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_SCHEMA = os.environ['DB_SCHEMA']
# AWS
JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET']
JSK_RECEIVE_FOLDER = os.environ['JSK_RECEIVE_FOLDER']
DATA_IMPORT_BUCKET = os.environ['DATA_IMPORT_BUCKET']
DATA_IMPORT_FOLDER = os.environ['DATA_IMPORT_FOLDER']
ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET']
ULTMARC_RECEIVE_FOLDER = os.environ['ULTMARC_RECEIVE_FOLDER']
ULTMARC_IMPORT_FOLDER = os.environ['ULTMARC_IMPORT_FOLDER']
JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET']
JSK_BACKUP_FOLDER = os.environ['JSK_BACKUP_FOLDER']
ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER']
RESULT_OUTPUT_FOLDER = os.environ['RESULT_OUTPUT_FOLDER']
RESULT_OUTPUT_FILE_NAME = os.environ['RESULT_OUTPUT_FILE_NAME']
# 初期値がある環境変数
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
DB_CONNECTION_MAX_RETRY_ATTEMPT = int(
os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4))
DB_CONNECTION_RETRY_INTERVAL_INIT = int(
os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5))
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5))
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int(
os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50))

View File

@ -0,0 +1,14 @@
# task environment file.
LOG_LEVEL=INFO
JSK_RECEIVE_FOLDER=recv
ULTMARC_RECEIVE_FOLDER=recv
ULTMARC_IMPORT_FOLDER=import
DATA_IMPORT_FOLDER=jsk/target
JSK_BACKUP_FOLDER=jsk/recv
ULTMARC_BACKUP_FOLDER=ultmarc
RESULT_OUTPUT_FOLDER=transfer_result
RESULT_OUTPUT_FILE_NAME=transfer_result.json
DB_CONNECTION_MAX_RETRY_ATTEMPT=4
DB_CONNECTION_RETRY_INTERVAL_INIT=5
DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=5
DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=50