from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, create_engine, text) from sqlalchemy.engine.url import URL from tenacity import retry, stop_after_attempt, wait_exponential from src.error.exceptions import DBException from src.logging.get_logger import get_logger from src.system_var import environment logger = get_logger(__name__) class Database: """データベース操作クラス""" __connection: Connection = None __transactional_engine: Engine = None __autocommit_engine: Engine = None __host: str = None __port: str = None __username: str = None __password: str = None __schema: str = None __autocommit: bool = None __connection_string: str = None def __init__(self, username: str, password: str, host: str, port: int, schema: str, autocommit: bool = False) -> None: """このクラスの新たなインスタンスを初期化します Args: username (str): DBユーザー名 password (str): DBパスワード host (str): DBホスト名 port (int): DBポート schema (str): DBスキーマ名 autocommit(bool): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False. """ self.__username = username self.__password = password self.__host = host self.__port = int(port) self.__schema = schema self.__autocommit = autocommit self.__connection_string = URL.create( drivername='mysql+pymysql', username=self.__username, password=self.__password, host=self.__host, port=self.__port, database=self.__schema, query={"charset": "utf8mb4"} ) self.__transactional_engine = create_engine( self.__connection_string, pool_timeout=5, poolclass=QueuePool ) self.__autocommit_engine = self.__transactional_engine.execution_options(isolation_level='AUTOCOMMIT') @classmethod def get_instance(cls, autocommit=False): """インスタンスを取得します Args: autocommit (bool, optional): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False. Returns: Database: DB操作クラスインスタンス """ return cls( username=environment.DB_USERNAME, password=environment.DB_PASSWORD, host=environment.DB_HOST, port=environment.DB_PORT, schema=environment.DB_SCHEMA, autocommit=autocommit ) @retry( wait=wait_exponential( multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT, min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS, max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS ), stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT)) def connect(self): """ DBに接続します。接続に失敗した場合、リトライします。\n インスタンスのautocommitがTrueの場合、自動コミットモードで接続する。(明示的なトランザクションも無視される) Raises: DBException: 接続失敗 """ try: self.__connection = ( self.__autocommit_engine.connect() if self.__autocommit is True else self.__transactional_engine.connect()) except Exception as e: raise DBException(e) def execute_select(self, select_query: str, parameters=None) -> list[dict]: """SELECTクエリを実行します。 Args: select_query (str): SELECT文 parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None. Raises: DBException: DBエラー Returns: list[dict]: カラム名: 値の辞書リスト """ if self.__connection is None: raise DBException('DBに接続していません') result = None try: # トランザクションが開始している場合は、トランザクションを引き継ぐ if self.__connection.in_transaction(): result = self.__connection.execute(text(select_query), parameters) else: # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 result = self.__execute_with_transaction(select_query, parameters) except Exception as e: raise DBException(f'SQL Error: {e}') result_rows = result.mappings().all() return result_rows def execute(self, query: str, parameters=None) -> CursorResult: """SQLクエリを実行します。 Args: query (str): SQL文 parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None. Raises: DBException: DBエラー Returns: CursorResult: 取得結果 """ if self.__connection is None: raise DBException('DBに接続していません') result = None try: # トランザクションが開始している場合は、トランザクションを引き継ぐ if self.__connection.in_transaction(): result = self.__connection.execute(text(query), parameters) else: # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 result = self.__execute_with_transaction(query, parameters) except Exception as e: raise DBException(f'SQL Error: {e}') return result def begin(self): """トランザクションを開始します。""" if not self.__connection.in_transaction(): self.__connection.begin() def commit(self): """トランザクションをコミットします""" if self.__connection.in_transaction(): self.__connection.commit() def rollback(self): """トランザクションをロールバックします""" if self.__connection.in_transaction(): self.__connection.rollback() def disconnect(self): """DB接続を切断します。""" if self.__connection is not None: self.__connection.close() self.__connection = None def __execute_with_transaction(self, query: str, parameters: dict): # トランザクションを開始してクエリを実行する with self.__connection.begin(): try: result = self.__connection.execute(text(query), parameters=parameters) except Exception as e: self.__connection.rollback() raise e # ここでコミットされる return result