diff --git a/ecs/jskult-transfer-receive-file/.dockerignore b/ecs/jskult-transfer-receive-file/.dockerignore new file mode 100644 index 00000000..8b9da402 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/.dockerignore @@ -0,0 +1,12 @@ +tests/* +.coverage +.env +.env.example +.report/* +.vscode/* +.pytest_cache/* +*/__pychache__/* +Dockerfile +pytest.ini +README.md +*.sql diff --git a/ecs/jskult-transfer-receive-file/.env.example b/ecs/jskult-transfer-receive-file/.env.example new file mode 100644 index 00000000..b87d728b --- /dev/null +++ b/ecs/jskult-transfer-receive-file/.env.example @@ -0,0 +1,22 @@ +DB_HOST=************ +DB_PORT=************ +DB_USERNAME=************ +DB_PASSWORD=************ +DB_SCHEMA=src07 +JSK_IO_BUCKET=mbj-newdwh2021-staging-jskult-io +ULTMARC_DATA_BUCKET=mbj-newdwh2021-staging-jskult-ultmarc +JSKULT_BACKUP_BUCKET=mbj-newdwh2021-staging-backup-jskult +DATA_IMPORT_BUCKET=mbj-newdwh2021-staging-data +LOG_LEVEL=INFO +JSK_RECEIVE_FOLDER=recv +ULTMARC_RECEIVE_FOLDER=recv +ULTMARC_IMPORT_FOLDER=import +DATA_IMPORT_FOLDER=jsk/target +JSK_BACKUP_FOLDER=jsk/recv +ULTMARC_BACKUP_FOLDER=ultmarc +RESULT_OUTPUT_FOLDER=transfer_result +RESULT_OUTPUT_FILE_NAME=transfer_result.json +DB_CONNECTION_MAX_RETRY_ATTEMPT=4 +DB_CONNECTION_RETRY_INTERVAL_INIT=5 +DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=5 +DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=50 diff --git a/ecs/jskult-transfer-receive-file/.gitignore b/ecs/jskult-transfer-receive-file/.gitignore new file mode 100644 index 00000000..bd0b37f8 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/.gitignore @@ -0,0 +1,10 @@ +.vscode/settings.json +.env + +# python +__pycache__ + +# python test +.pytest_cache +.coverage +.report/ \ No newline at end of file diff --git a/ecs/jskult-transfer-receive-file/.vscode/launch.json b/ecs/jskult-transfer-receive-file/.vscode/launch.json new file mode 100644 index 00000000..f45e88b3 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // IntelliSense を使用して利用可能な属性を学べます。 + // 既存の属性の説明をホバーして表示します。 + // 詳細情報は次を確認してください: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "(DEBUG)jskult transfer receive file", + "type": "python", + "request": "launch", + "program": "entrypoint.py", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} \ No newline at end of file diff --git a/ecs/jskult-transfer-receive-file/.vscode/recommended_settings.json b/ecs/jskult-transfer-receive-file/.vscode/recommended_settings.json new file mode 100644 index 00000000..23ed6f0f --- /dev/null +++ b/ecs/jskult-transfer-receive-file/.vscode/recommended_settings.json @@ -0,0 +1,25 @@ +{ + "[python]": { + "editor.defaultFormatter": null, + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.organizeImports": true + } + }, + // 自身の環境に合わせて変えてください + "python.defaultInterpreterPath": "", + "python.linting.lintOnSave": true, + "python.linting.enabled": true, + "python.linting.pylintEnabled": false, + "python.linting.flake8Enabled": true, + "python.linting.flake8Args": [ + "--max-line-length=200", + "--ignore=F541" + ], + "python.formatting.provider": "autopep8", + "python.formatting.autopep8Path": "autopep8", + "python.formatting.autopep8Args": [ + "--max-line-length", "200", + "--ignore=F541" + ] +} diff --git a/ecs/jskult-transfer-receive-file/Dockerfile b/ecs/jskult-transfer-receive-file/Dockerfile new file mode 100644 index 00000000..674c6c53 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim-bookworm + +ENV TZ="Asia/Tokyo" +# pythonの標準出力をバッファリングしないフラグ +ENV PYTHONUNBUFFERED=1 +# pythonのバイトコードを生成しないフラグ +ENV PYTHONDONTWRITEBYTECODE=1 + +WORKDIR /usr/src/app +COPY Pipfile Pipfile.lock ./ +RUN \ + apt update -y && \ + pip install pipenv --no-cache-dir && \ + pipenv install --system --deploy && \ + pip uninstall -y pipenv virtualenv-clone virtualenv + +COPY src ./src +COPY entrypoint.py entrypoint.py + +CMD ["python", "entrypoint.py"] diff --git a/ecs/jskult-transfer-receive-file/Pipfile b/ecs/jskult-transfer-receive-file/Pipfile new file mode 100644 index 00000000..12961dde --- /dev/null +++ b/ecs/jskult-transfer-receive-file/Pipfile @@ -0,0 +1,23 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +boto3 = "*" +PyMySQL = "*" +sqlalchemy = "*" +tenacity = "*" + +[dev-packages] +autopep8 = "*" +flake8 = "*" +pytest = "*" +pytest-cov = "*" +boto3 = "*" + +[requires] +python_version = "3.12" + +[pipenv] +allow_prereleases = true diff --git a/ecs/jskult-transfer-receive-file/entrypoint.py b/ecs/jskult-transfer-receive-file/entrypoint.py new file mode 100644 index 00000000..294cc731 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/entrypoint.py @@ -0,0 +1,10 @@ +"""実消化&アルトマーク データ転送処理""" +from src import main + +if __name__ == '__main__': + try: + exit(main.exec()) + except Exception: + # エラーが起きても、正常系のコードで返す。 + # エラーが起きた事実はexec内でログを出す。 + exit(0) diff --git a/ecs/jskult-transfer-receive-file/src/__init__.py b/ecs/jskult-transfer-receive-file/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/jskult-transfer-receive-file/src/aws/__init__.py b/ecs/jskult-transfer-receive-file/src/aws/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/jskult-transfer-receive-file/src/aws/s3.py b/ecs/jskult-transfer-receive-file/src/aws/s3.py new file mode 100644 index 00000000..2615db0f --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/aws/s3.py @@ -0,0 +1,170 @@ +import gzip +import json +import os +import os.path as path +import shutil +import tempfile + +import boto3 +from src.system_var import environment + + +class S3Client: + __s3_client = boto3.client('s3') + _bucket_name: str + + def list_objects(self, bucket_name: str, folder_name: str): + response = self.__s3_client.list_objects_v2( + Bucket=bucket_name, Prefix=folder_name) + if response['KeyCount'] == 0: + return [] + contents = response['Contents'] + # 末尾がスラッシュで終わるものはフォルダとみなしてスキップする + objects = [{'filename': content['Key'], 'size': content['Size']} + for content in contents if not content['Key'].endswith('/')] + return objects + + def copy(self, src_bucket: str, src_key: str, dest_bucket: str, dest_key: str) -> None: + copy_source = {'Bucket': src_bucket, 'Key': src_key} + self.__s3_client.copy(copy_source, dest_bucket, dest_key) + return + + def put_object(self, bucket: str, key: str, json_body: str, content_type='application/json') -> None: + self.__s3_client.put_object( + Bucket=bucket, + Key=key, + Body=json_body.encode('utf-8'), + ContentType=content_type + ) + + def delete_file(self, bucket_name: str, file_key: str): + self.__s3_client.delete_object( + Bucket=bucket_name, + Key=file_key + ) + + +class S3Bucket(): + _s3_client = S3Client() + _bucket_name: str = None + + +class JskIOBucket(S3Bucket): + _bucket_name = environment.JSK_IO_BUCKET + _recv_folder = environment.JSK_RECEIVE_FOLDER + + _s3_file_list = None + + def get_file_list(self): + self._s3_file_list = self._s3_client.list_objects( + self._bucket_name, self._recv_folder) + return self._s3_file_list + + def download_data_file(self, data_filename: str): + temporary_dir = tempfile.mkdtemp() + temporary_file_path = path.join( + temporary_dir, f'{data_filename.replace(f"{self._recv_folder}/", "")}') + with open(temporary_file_path, mode='wb') as f: + self._s3_client.download_file(self._bucket_name, data_filename, f) + f.seek(0) + return temporary_file_path + + def unzip_data_file(self, filename: str): + temp_dir = os.path.dirname(filename) + decompress_filename = os.path.basename(filename).replace('.gz', '') + decompress_file_path = os.path.join(temp_dir, decompress_filename) + with gzip.open(filename, 'rb') as gz: + with open(decompress_file_path, 'wb') as decompressed_file: + shutil.copyfileobj(gz, decompressed_file) + + ret = [decompress_file_path] + return ret + + def transfer_file_to_import(self, target_file: dict): + data_import_bucket = DataImportBucket() + transfer_from_file_path = target_file.get("filename") + transfer_to_filename = transfer_from_file_path.replace( + f"{self._recv_folder}/", "") + data_import_key = f'{data_import_bucket._folder}/{transfer_to_filename}' + self._s3_client.copy(self._bucket_name, transfer_from_file_path, + data_import_bucket._bucket_name, data_import_key) + + def backup_file(self, target_file: dict, datetime_key: str): + jsk_backup_bucket = JskBackupBucket() + backup_from_file_path = target_file.get("filename") + backup_to_filename = backup_from_file_path.replace( + f"{self._recv_folder}/", "") + backup_key = f'{jsk_backup_bucket._folder}/{datetime_key}/{backup_to_filename}' + self._s3_client.copy(self._bucket_name, backup_from_file_path, + jsk_backup_bucket._bucket_name, backup_key) + + def delete_file(self, target_file: dict): + delete_path = target_file.get("filename").replace( + f"{self._recv_folder}/", "") + self._s3_client.delete_file( + self._bucket_name, delete_path) + + +class DataImportBucket(S3Bucket): + _bucket_name = environment.DATA_IMPORT_BUCKET + _folder = environment.DATA_IMPORT_FOLDER + + +class UltmarcBucket(S3Bucket): + _bucket_name = environment.ULTMARC_DATA_BUCKET + _folder = environment.ULTMARC_DATA_FOLDER + + def get_file_list(self): + return self._s3_client.list_objects(self._bucket_name, self._folder) + + def backup_file(self, target_file: dict, datetime_key: str): + # バックアップバケットにコピー + ultmarc_backup_bucket = UltmarcBackupBucket() + target_file_path = target_file.get("filename") + backup_key = f'{ultmarc_backup_bucket._folder}/{datetime_key}/{target_file_path.replace(f"{self._folder}/", "")}' + self._s3_client.copy(self._bucket_name, target_file, + ultmarc_backup_bucket._bucket_name, backup_key) + + def delete_file(self, target_file: dict): + delete_path = target_file.get("filename").replace( + f"{self._recv_folder}/", "") + self._s3_client.delete_file( + self._bucket_name, delete_path) + + +class UltmarcImportBucket(S3Bucket): + _bucket_name = environment.ULTMARC_DATA_BUCKET + _folder = environment.ULTMARC_IMPORT_FOLDER + + def transfer_file_to_import(self, target_file: dict): + backup_from_file_path = target_file.get("filename") + backup_to_filename = backup_from_file_path.replace( + f"{self._recv_folder}/", "") + data_import_key = f'{self._folder}/{backup_to_filename}' + self._s3_client.copy(self._bucket_name, backup_from_file_path, + self._bucket_name, data_import_key) + + +class JskUltBackupBucket(S3Bucket): + _bucket_name = environment.JSKULT_BACKUP_BUCKET + + +class UltmarcBackupBucket(JskUltBackupBucket): + _folder = environment.ULTMARC_BACKUP_FOLDER + + +class JskBackupBucket(JskUltBackupBucket): + _folder = environment.JSK_BACKUP_FOLDER + + +class TransferResultOutputBucket(JskUltBackupBucket): + _folder = environment.RESULT_OUTPUT_FOLDER + + def put_transfer_result(self, transfer_result: dict, datetime_key: str): + object_key = f'{self._folder}/{datetime_key}/{environment.RESULT_OUTPUT_FILE_NAME}' + json_body = json.dumps(transfer_result, ensure_ascii=False) + self._s3_client.put_object( + self._bucket_name, + object_key, + json_body + ) diff --git a/ecs/jskult-transfer-receive-file/src/batch/batch_functions.py b/ecs/jskult-transfer-receive-file/src/batch/batch_functions.py new file mode 100644 index 00000000..9202511a --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/batch/batch_functions.py @@ -0,0 +1,100 @@ +"""バッチ処理の共通関数""" +from datetime import datetime + +from src.db.database import Database +from src.error.exceptions import BatchOperationException, DBException +from src.system_var import constants + + +def get_batch_statuses() -> tuple[str, str, str]: + """日付テーブルから、以下を取得して返す。 + - 日次バッチ処理中フラグ + - dump取得状況区分 + - 処理日(YYYY/MM/DD) + + Raises: + BatchOperationException: 日付テーブルが取得できないとき、何らかのエラーが発生したとき + + Returns: + tuple[str, str]: [0]日次バッチ処理中フラグ、dump取得状況区分 + """ + db = Database.get_instance() + sql = 'SELECT bch_actf, dump_sts_kbn, src07.get_syor_date() AS syor_date FROM src07.hdke_tbl' + try: + db.connect() + hdke_tbl_result = db.execute_select(sql) + except DBException as e: + raise BatchOperationException(e) + finally: + db.disconnect() + + if len(hdke_tbl_result) == 0: + raise BatchOperationException('日付テーブルが取得できませんでした') + + # 必ず1件取れる + hdke_tbl_record = hdke_tbl_result[0] + batch_processing_flag = hdke_tbl_record['bch_actf'] + dump_status_kbn = hdke_tbl_record['dump_sts_kbn'] + syor_date = hdke_tbl_record['syor_date'] + # 処理日を文字列に変換する + syor_date_str = datetime.strftime(syor_date, '%Y/%m/%d') + + return batch_processing_flag, dump_status_kbn, syor_date_str + + +def update_batch_processing_flag_in_processing() -> None: + """バッチ処理中フラグを処理中に更新する + + Raises: + BatchOperationException: DB操作の何らかのエラー + """ + db = Database.get_instance() + sql = """\ + UPDATE src07.hdke_tbl + SET + bch_actf = :in_processing, + updater = CURRENT_USER(), + update_date = NOW() + """ + try: + db.connect() + db.to_jst() + db.execute( + sql, {'in_processing': constants.BATCH_ACTF_BATCH_IN_PROCESSING}) + except DBException as e: + raise BatchOperationException(e) + finally: + db.disconnect() + + return + + +def update_batch_process_complete() -> None: + """バッチ処理を完了とし、処理日、バッチ処理中フラグ、dump処理状態区分を更新する + + Raises: + BatchOperationException: DB操作の何らかのエラー + """ + db = Database.get_instance() + sql = """\ + UPDATE src07.hdke_tbl + SET + bch_actf = :batch_complete, + dump_sts_kbn = :dump_unprocessed, + syor_date = DATE_FORMAT((src07.get_syor_date() + interval 1 day), '%Y%m%d'), -- +1日 + updater = CURRENT_USER(), + update_date = NOW() + """ + try: + db.connect() + db.to_jst() + db.execute(sql, { + 'batch_complete': constants.BATCH_ACTF_BATCH_UNPROCESSED, + 'dump_unprocessed': constants.DUMP_STATUS_KBN_UNPROCESSED + }) + except DBException as e: + raise BatchOperationException(e) + finally: + db.disconnect() + + return diff --git a/ecs/jskult-transfer-receive-file/src/db/__init__.py b/ecs/jskult-transfer-receive-file/src/db/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/jskult-transfer-receive-file/src/db/database.py b/ecs/jskult-transfer-receive-file/src/db/database.py new file mode 100644 index 00000000..3f6ce8ea --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/db/database.py @@ -0,0 +1,198 @@ +from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, + create_engine, text) +from sqlalchemy.engine.url import URL +from src.error.exceptions import DBException +from src.logging.get_logger import get_logger +from src.system_var import environment +from tenacity import retry, stop_after_attempt, wait_exponential + +logger = get_logger(__name__) + + +class Database: + """データベース操作クラス""" + __connection: Connection = None + __transactional_engine: Engine = None + __autocommit_engine: Engine = None + __host: str = None + __port: str = None + __username: str = None + __password: str = None + __schema: str = None + __autocommit: bool = None + __connection_string: str = None + + def __init__(self, username: str, password: str, host: str, port: int, schema: str, autocommit: bool = False) -> None: + """このクラスの新たなインスタンスを初期化します + + Args: + username (str): DBユーザー名 + password (str): DBパスワード + host (str): DBホスト名 + port (int): DBポート + schema (str): DBスキーマ名 + autocommit(bool): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False. + """ + self.__username = username + self.__password = password + self.__host = host + self.__port = int(port) + self.__schema = schema + self.__autocommit = autocommit + + self.__connection_string = URL.create( + drivername='mysql+pymysql', + username=self.__username, + password=self.__password, + host=self.__host, + port=self.__port, + database=self.__schema, + query={"charset": "utf8mb4", "local_infile": "1"}, + ) + + self.__transactional_engine = create_engine( + self.__connection_string, + pool_timeout=5, + poolclass=QueuePool + ) + + self.__autocommit_engine = self.__transactional_engine.execution_options( + isolation_level='AUTOCOMMIT') + + @classmethod + def get_instance(cls, autocommit=False): + """インスタンスを取得します + + Args: + autocommit (bool, optional): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False. + Returns: + Database: DB操作クラスインスタンス + """ + return cls( + username=environment.DB_USERNAME, + password=environment.DB_PASSWORD, + host=environment.DB_HOST, + port=environment.DB_PORT, + schema=environment.DB_SCHEMA, + autocommit=autocommit + ) + + @retry( + wait=wait_exponential( + multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT, + min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS, + max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS + ), + stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT), + retry_error_cls=DBException + ) + def connect(self): + """ + DBに接続します。接続に失敗した場合、リトライします。\n + インスタンスのautocommitがTrueの場合、自動コミットモードで接続する。(明示的なトランザクションも無視される) + Raises: + DBException: 接続失敗 + """ + try: + self.__connection = ( + self.__autocommit_engine.connect() if self.__autocommit is True + else self.__transactional_engine.connect()) + except Exception as e: + raise DBException(e) + + def execute_select(self, select_query: str, parameters=None) -> list[dict]: + """SELECTクエリを実行します。 + + Args: + select_query (str): SELECT文 + parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None. + + Raises: + DBException: DBエラー + + Returns: + list[dict]: カラム名: 値の辞書リスト + """ + if self.__connection is None: + raise DBException('DBに接続していません') + + result = None + try: + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute( + text(select_query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction( + select_query, parameters) + except Exception as e: + raise DBException(f'SQL Error: {e}') + + result_rows = result.mappings().all() + return result_rows + + def execute(self, query: str, parameters=None) -> CursorResult: + """SQLクエリを実行します。 + + Args: + query (str): SQL文 + parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None. + + Raises: + DBException: DBエラー + + Returns: + CursorResult: 取得結果 + """ + if self.__connection is None: + raise DBException('DBに接続していません') + + result = None + try: + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute(text(query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction(query, parameters) + except Exception as e: + raise DBException(f'SQL Error: {e}') + + return result + + def begin(self): + """トランザクションを開始します。""" + if not self.__connection.in_transaction(): + self.__connection.begin() + + def commit(self): + """トランザクションをコミットします""" + if self.__connection.in_transaction(): + self.__connection.commit() + + def rollback(self): + """トランザクションをロールバックします""" + if self.__connection.in_transaction(): + self.__connection.rollback() + + def disconnect(self): + """DB接続を切断します。""" + if self.__connection is not None: + self.__connection.close() + self.__connection = None + + def to_jst(self): + self.execute('SET time_zone = "+9:00"') + + def __execute_with_transaction(self, query: str, parameters: dict): + # トランザクションを開始してクエリを実行する + with self.__connection.begin(): + try: + result = self.__connection.execute( + text(query), parameters=parameters) + except Exception as e: + self.__connection.rollback() + raise e + # ここでコミットされる + return result diff --git a/ecs/jskult-transfer-receive-file/src/error/__init__.py b/ecs/jskult-transfer-receive-file/src/error/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/jskult-transfer-receive-file/src/error/exceptions.py b/ecs/jskult-transfer-receive-file/src/error/exceptions.py new file mode 100644 index 00000000..055c24f6 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/error/exceptions.py @@ -0,0 +1,10 @@ +class MeDaCaException(Exception): + pass + + +class DBException(MeDaCaException): + pass + + +class BatchOperationException(MeDaCaException): + pass diff --git a/ecs/jskult-transfer-receive-file/src/logging/get_logger.py b/ecs/jskult-transfer-receive-file/src/logging/get_logger.py new file mode 100644 index 00000000..f36f1199 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/logging/get_logger.py @@ -0,0 +1,37 @@ +import logging + +from src.system_var.environment import LOG_LEVEL + +# boto3関連モジュールのログレベルを事前に個別指定し、モジュール内のDEBUGログの表示を抑止する +for name in ["boto3", "botocore", "s3transfer", "urllib3"]: + logging.getLogger(name).setLevel(logging.WARNING) + + +def get_logger(log_name: str) -> logging.Logger: + """一意のログ出力モジュールを取得します。 + + Args: + log_name (str): ロガー名 + + Returns: + _type_: _description_ + """ + logger = logging.getLogger(log_name) + level = logging.getLevelName(LOG_LEVEL) + if not isinstance(level, int): + level = logging.INFO + logger.setLevel(level) + + if not logger.hasHandlers(): + handler = logging.StreamHandler() + logger.addHandler(handler) + + formatter = logging.Formatter( + '%(name)s\t[%(levelname)s]\t%(asctime)s\t%(message)s', + '%Y-%m-%d %H:%M:%S' + ) + + for handler in logger.handlers: + handler.setFormatter(formatter) + + return logger diff --git a/ecs/jskult-transfer-receive-file/src/main.py b/ecs/jskult-transfer-receive-file/src/main.py new file mode 100644 index 00000000..676ab7f3 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/main.py @@ -0,0 +1,118 @@ +"""実消化&アルトマーク データ転送処理""" + +from src.aws.s3 import (JskIOBucket, TransferResultOutputBucket, UltmarcBucket, + UltmarcImportBucket) +from src.batch.batch_functions import ( + get_batch_statuses, update_batch_processing_flag_in_processing) +from src.error.exceptions import BatchOperationException +from src.logging.get_logger import get_logger +from src.system_var import constants + +logger = get_logger('実消化&アルトマークデータ転送') + + +def exec(): + try: + # ① 処理開始ログを出力する + logger.info('I-1 処理開始: 実消化&アルトマークデータ転送') + + # 転送データリストを初期化する。 + transfer_file_lists = {"transfer_list": []} + + # ② 日付テーブルのステータスを確認する + try: + # 日次バッチ処置中フラグ、dump処理状態区分、処理日を取得 + batch_processing_flag, dump_status_kbn, syor_date = get_batch_statuses() + except BatchOperationException as e: + logger.exception(f'日付テーブル取得(異常終了){e}') + return constants.BATCH_EXIT_CODE_SUCCESS + + # 日次バッチ処理中の場合、後続の処理は行わない + if batch_processing_flag == constants.BATCH_ACTF_BATCH_IN_PROCESSING: + logger.error('日次バッチ処理中のため、日次バッチ処理を終了します。') + return constants.BATCH_EXIT_CODE_SUCCESS + + # dump取得が正常終了していない場合、後続の処理は行わない + if dump_status_kbn != constants.DUMP_STATUS_KBN_COMPLETE: + logger.error('dump取得が正常終了していないため、日次バッチ処理を終了します。') + return constants.BATCH_EXIT_CODE_SUCCESS + + logger.info(f'処理日={syor_date}') + + logger.info('I-2 日次バッチ処理中フラグ更新') + # バッチ処理中に更新 + try: + update_batch_processing_flag_in_processing() + except BatchOperationException as e: + logger.exception(f'処理フラグ更新(未処理→処理中) エラー(異常終了){e}') + return constants.BATCH_EXIT_CODE_SUCCESS + + # ③ 実消化データのリストを取得する + logger.info('I-3 実消化データリスト取得開始') + jsk_receive_file_list = None + try: + jsk_io_bucket = JskIOBucket() + jsk_receive_file_list: str = jsk_io_bucket.get_file_list() + except Exception as e: + logger.exception(f'実消化データリスト取得に失敗しました。{e}') + return constants.BATCH_EXIT_CODE_SUCCESS + logger.info(f'I-4 実消化データリスト取得終了。取得データ一覧:{jsk_receive_file_list}') + + # ④ 取得した実消化データのリストでループ開始 + logger.info(f'I-5 実消化データ転送処理開始') + for receive_file in jsk_receive_file_list: + # ⑤ 取得したデータを実消化&アルトマークバックアップバケットにコピーする + jsk_io_bucket.backup_file(receive_file, syor_date) + # ⑥ 取得したデータをデータ登録バケットにコピーする + jsk_io_bucket.transfer_file_to_import(receive_file) + # ⑦ 転送が完了したデータを元のバケットからファイルを削除する + jsk_io_bucket.delete_file(receive_file) + # ⑧ 転送が完了したファイル名を転送データリストに追加する + # ファイル名のみ切り出して追加 + transfer_file_lists['transfer_list'].append( + receive_file['filename'].split('/')[1]) + + # ⑨ ループ終了後、実消化データ転送終了ログ(I-6)を出力する + logger.info(f'I-6 実消化データ転送処理終了') + + # ⑩ アルトマークデータのリストを取得する + logger.info('I-7 アルトマークデータリスト取得開始') + ultmarc_receive_file_list = None + try: + ultmarc_bucket = UltmarcBucket() + ultmarc_receive_file_list: str = ultmarc_bucket.get_file_list() + except Exception as e: + logger.exception(f'アルトマークデータリスト取得に失敗しました。{e}') + return constants.BATCH_EXIT_CODE_SUCCESS + logger.info( + f'I-8 アルトマークデータリスト取得終了。取得データ一覧:{ultmarc_receive_file_list}') + + # ⑪ 取得したアルトマークデータのリストでループ開始 + logger.info(f'I-9 アルトマークデータ転送処理開始') + ultmarc_bucket_import_folder = UltmarcImportBucket() + for receive_file in ultmarc_receive_file_list: + # ⑫ 取得したデータを実消化&アルトマークバックアップバケットにコピーする + ultmarc_bucket.backup_file(receive_file, syor_date) + # ⑬ 取得したデータをデータ登録バケットにコピーする + ultmarc_bucket_import_folder.transfer_file_to_import(receive_file) + # ⑭ 転送が完了したデータを元のバケットからファイルを削除する + ultmarc_bucket.delete_file(receive_file) + # ⑮ 転送が完了したファイル名を転送データリストに追加する + # ファイル名のみ切り出して追加 + transfer_file_lists['transfer_list'].append( + receive_file['filename'].split('/')[1]) + + # ⑯ ループ終了後、アルトマークデータ転送終了ログ(I-10)を出力する + logger.info(f'I-6 実消化データ転送処理終了') + + # ⑰ 転送データリストをJSONファイル化し、S3バケットにアップロードする + TransferResultOutputBucket().put_transfer_result(transfer_file_lists, syor_date) + + # ⑱ 処理終了ログ(I-12)を出力する + logger.info(f'I-12 処理終了: 実消化&アルトマークデータ転送') + + return constants.BATCH_EXIT_CODE_SUCCESS + + except Exception as e: + logger.exception(f'実消化&アルトマーク データ転送処理中に想定外のエラーが発生しました {e}') + raise e diff --git a/ecs/jskult-transfer-receive-file/src/system_var/__init__.py b/ecs/jskult-transfer-receive-file/src/system_var/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ecs/jskult-transfer-receive-file/src/system_var/constants.py b/ecs/jskult-transfer-receive-file/src/system_var/constants.py new file mode 100644 index 00000000..d5538a80 --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/system_var/constants.py @@ -0,0 +1,11 @@ +# バッチ正常終了コード +BATCH_EXIT_CODE_SUCCESS = 0 + +# バッチ処理中フラグ:未処理 +BATCH_ACTF_BATCH_UNPROCESSED = '0' +# バッチ処理中フラグ:処理中 +BATCH_ACTF_BATCH_IN_PROCESSING = '1' +# dump取得状態区分:未処理 +DUMP_STATUS_KBN_UNPROCESSED = '0' +# dump取得状態区分:dump取得正常終了 +DUMP_STATUS_KBN_COMPLETE = '2' diff --git a/ecs/jskult-transfer-receive-file/src/system_var/environment.py b/ecs/jskult-transfer-receive-file/src/system_var/environment.py new file mode 100644 index 00000000..386d150b --- /dev/null +++ b/ecs/jskult-transfer-receive-file/src/system_var/environment.py @@ -0,0 +1,33 @@ +import os + +# Database +DB_HOST = os.environ['DB_HOST'] +DB_PORT = int(os.environ['DB_PORT']) +DB_USERNAME = os.environ['DB_USERNAME'] +DB_PASSWORD = os.environ['DB_PASSWORD'] +DB_SCHEMA = os.environ['DB_SCHEMA'] + +# AWS +JSK_IO_BUCKET = os.environ['JSK_IO_BUCKET'] +JSK_RECEIVE_FOLDER = os.environ['JSK_RECEIVE_FOLDER'] +DATA_IMPORT_BUCKET = os.environ['DATA_IMPORT_BUCKET'] +DATA_IMPORT_FOLDER = os.environ['DATA_IMPORT_FOLDER'] +ULTMARC_DATA_BUCKET = os.environ['ULTMARC_DATA_BUCKET'] +ULTMARC_RECEIVE_FOLDER = os.environ['ULTMARC_RECEIVE_FOLDER'] +ULTMARC_IMPORT_FOLDER = os.environ['ULTMARC_IMPORT_FOLDER'] +JSKULT_BACKUP_BUCKET = os.environ['JSKULT_BACKUP_BUCKET'] +JSK_BACKUP_FOLDER = os.environ['JSK_BACKUP_FOLDER'] +ULTMARC_BACKUP_FOLDER = os.environ['ULTMARC_BACKUP_FOLDER'] +RESULT_OUTPUT_FOLDER = os.environ['RESULT_OUTPUT_FOLDER'] +RESULT_OUTPUT_FILE_NAME = os.environ['RESULT_OUTPUT_FILE_NAME'] + +# 初期値がある環境変数 +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') +DB_CONNECTION_MAX_RETRY_ATTEMPT = int( + os.environ.get('DB_CONNECTION_MAX_RETRY_ATTEMPT', 4)) +DB_CONNECTION_RETRY_INTERVAL_INIT = int( + os.environ.get('DB_CONNECTION_RETRY_INTERVAL', 5)) +DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MIN_SECONDS', 5)) +DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS = int( + os.environ.get('DB_CONNECTION_RETRY_MAX_SECONDS', 50)) diff --git a/s3/config/jskult/task_settings/transfer_receive_file_task_settings.env b/s3/config/jskult/task_settings/transfer_receive_file_task_settings.env new file mode 100644 index 00000000..65e7a660 --- /dev/null +++ b/s3/config/jskult/task_settings/transfer_receive_file_task_settings.env @@ -0,0 +1,14 @@ +# task environment file. +LOG_LEVEL=INFO +JSK_RECEIVE_FOLDER=recv +ULTMARC_RECEIVE_FOLDER=recv +ULTMARC_IMPORT_FOLDER=import +DATA_IMPORT_FOLDER=jsk/target +JSK_BACKUP_FOLDER=jsk/recv +ULTMARC_BACKUP_FOLDER=ultmarc +RESULT_OUTPUT_FOLDER=transfer_result +RESULT_OUTPUT_FILE_NAME=transfer_result.json +DB_CONNECTION_MAX_RETRY_ATTEMPT=4 +DB_CONNECTION_RETRY_INTERVAL_INIT=5 +DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS=5 +DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS=50