191 lines
7.2 KiB
Python
191 lines
7.2 KiB
Python
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
|
||
create_engine, text)
|
||
from sqlalchemy.engine.url import URL
|
||
from tenacity import retry, stop_after_attempt, wait_exponential
|
||
|
||
from src.error.exceptions import DBException
|
||
from src.logging.get_logger import get_logger
|
||
from src.system_var import environment
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class Database:
|
||
"""データベース操作クラス"""
|
||
__connection: Connection = None
|
||
__transactional_engine: Engine = None
|
||
__autocommit_engine: Engine = None
|
||
__host: str = None
|
||
__port: str = None
|
||
__username: str = None
|
||
__password: str = None
|
||
__schema: str = None
|
||
__autocommit: bool = None
|
||
__connection_string: str = None
|
||
|
||
def __init__(self, username: str, password: str, host: str, port: int, schema: str, autocommit: bool = False) -> None:
|
||
"""このクラスの新たなインスタンスを初期化します
|
||
|
||
Args:
|
||
username (str): DBユーザー名
|
||
password (str): DBパスワード
|
||
host (str): DBホスト名
|
||
port (int): DBポート
|
||
schema (str): DBスキーマ名
|
||
autocommit(bool): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False.
|
||
"""
|
||
self.__username = username
|
||
self.__password = password
|
||
self.__host = host
|
||
self.__port = int(port)
|
||
self.__schema = schema
|
||
self.__autocommit = autocommit
|
||
|
||
self.__connection_string = URL.create(
|
||
drivername='mysql+pymysql',
|
||
username=self.__username,
|
||
password=self.__password,
|
||
host=self.__host,
|
||
port=self.__port,
|
||
database=self.__schema,
|
||
query={"charset": "utf8mb4", "local_infile": "1"},
|
||
)
|
||
|
||
self.__transactional_engine = create_engine(
|
||
self.__connection_string,
|
||
pool_timeout=5,
|
||
poolclass=QueuePool
|
||
)
|
||
|
||
self.__autocommit_engine = self.__transactional_engine.execution_options(isolation_level='AUTOCOMMIT')
|
||
|
||
@classmethod
|
||
def get_instance(cls, autocommit=False):
|
||
"""インスタンスを取得します
|
||
|
||
Args:
|
||
autocommit (bool, optional): 自動コミットモードで接続するかどうか(Trueの場合、トランザクションの有無に限らず即座にコミットされる). Defaults to False.
|
||
Returns:
|
||
Database: DB操作クラスインスタンス
|
||
"""
|
||
return cls(
|
||
username=environment.DB_USERNAME,
|
||
password=environment.DB_PASSWORD,
|
||
host=environment.DB_HOST,
|
||
port=environment.DB_PORT,
|
||
schema=environment.DB_SCHEMA,
|
||
autocommit=autocommit
|
||
)
|
||
|
||
@retry(
|
||
wait=wait_exponential(
|
||
multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT,
|
||
min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS,
|
||
max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS
|
||
),
|
||
stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT))
|
||
def connect(self):
|
||
"""
|
||
DBに接続します。接続に失敗した場合、リトライします。\n
|
||
インスタンスのautocommitがTrueの場合、自動コミットモードで接続する。(明示的なトランザクションも無視される)
|
||
Raises:
|
||
DBException: 接続失敗
|
||
"""
|
||
try:
|
||
self.__connection = (
|
||
self.__autocommit_engine.connect() if self.__autocommit is True
|
||
else self.__transactional_engine.connect())
|
||
except Exception as e:
|
||
raise DBException(e)
|
||
|
||
def execute_select(self, select_query: str, parameters=None) -> list[dict]:
|
||
"""SELECTクエリを実行します。
|
||
|
||
Args:
|
||
select_query (str): SELECT文
|
||
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
||
|
||
Raises:
|
||
DBException: DBエラー
|
||
|
||
Returns:
|
||
list[dict]: カラム名: 値の辞書リスト
|
||
"""
|
||
if self.__connection is None:
|
||
raise DBException('DBに接続していません')
|
||
|
||
result = None
|
||
try:
|
||
# トランザクションが開始している場合は、トランザクションを引き継ぐ
|
||
if self.__connection.in_transaction():
|
||
result = self.__connection.execute(text(select_query), parameters)
|
||
else:
|
||
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
|
||
result = self.__execute_with_transaction(select_query, parameters)
|
||
except Exception as e:
|
||
raise DBException(f'SQL Error: {e}')
|
||
|
||
result_rows = result.mappings().all()
|
||
return result_rows
|
||
|
||
def execute(self, query: str, parameters=None) -> CursorResult:
|
||
"""SQLクエリを実行します。
|
||
|
||
Args:
|
||
query (str): SQL文
|
||
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
||
|
||
Raises:
|
||
DBException: DBエラー
|
||
|
||
Returns:
|
||
CursorResult: 取得結果
|
||
"""
|
||
if self.__connection is None:
|
||
raise DBException('DBに接続していません')
|
||
|
||
result = None
|
||
try:
|
||
# トランザクションが開始している場合は、トランザクションを引き継ぐ
|
||
if self.__connection.in_transaction():
|
||
result = self.__connection.execute(text(query), parameters)
|
||
else:
|
||
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
|
||
result = self.__execute_with_transaction(query, parameters)
|
||
except Exception as e:
|
||
raise DBException(f'SQL Error: {e}')
|
||
|
||
return result
|
||
|
||
def begin(self):
|
||
"""トランザクションを開始します。"""
|
||
if not self.__connection.in_transaction():
|
||
self.__connection.begin()
|
||
|
||
def commit(self):
|
||
"""トランザクションをコミットします"""
|
||
if self.__connection.in_transaction():
|
||
self.__connection.commit()
|
||
|
||
def rollback(self):
|
||
"""トランザクションをロールバックします"""
|
||
if self.__connection.in_transaction():
|
||
self.__connection.rollback()
|
||
|
||
def disconnect(self):
|
||
"""DB接続を切断します。"""
|
||
if self.__connection is not None:
|
||
self.__connection.close()
|
||
self.__connection = None
|
||
|
||
def __execute_with_transaction(self, query: str, parameters: dict):
|
||
# トランザクションを開始してクエリを実行する
|
||
with self.__connection.begin():
|
||
try:
|
||
result = self.__connection.execute(text(query), parameters=parameters)
|
||
except Exception as e:
|
||
self.__connection.rollback()
|
||
raise e
|
||
# ここでコミットされる
|
||
return result
|