151 lines
4.9 KiB
Python
151 lines
4.9 KiB
Python
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
|
|
create_engine, text)
|
|
from sqlalchemy.engine.create import create_engine
|
|
from sqlalchemy.engine.url import URL
|
|
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
|
from src.error.exceptions import DBException
|
|
from src.logging.get_logger import get_logger
|
|
from src.system_var import environment
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
class Database:
|
|
"""データベース操作クラス"""
|
|
__connection: Connection = None
|
|
__engine: Engine = None
|
|
__host: str = None
|
|
__port: str = None
|
|
__username: str = None
|
|
__password: str = None
|
|
__schema: str = None
|
|
__connection_string:str = None
|
|
|
|
|
|
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
|
|
"""このクラスの新たなインスタンスを初期化します
|
|
|
|
Args:
|
|
username (str): DBユーザー名
|
|
password (str): DBパスワード
|
|
host (str): DBホスト名
|
|
port (int): DBポート
|
|
schema (str): DBスキーマ名
|
|
"""
|
|
self.__username = username
|
|
self.__password = password
|
|
self.__host = host
|
|
self.__port = int(port)
|
|
self.__schema = schema
|
|
|
|
self.__connection_string = URL.create(
|
|
drivername='mysql+pymysql',
|
|
username=self.__username,
|
|
password=self.__password,
|
|
host=self.__host,
|
|
port=self.__port,
|
|
database=self.__schema,
|
|
query={"charset": "utf8mb4"}
|
|
)
|
|
|
|
self.__engine = create_engine(
|
|
self.__connection_string,
|
|
pool_timeout=5,
|
|
poolclass=QueuePool,
|
|
isolation_level="AUTOCOMMIT"
|
|
)
|
|
|
|
@classmethod
|
|
def get_instance(cls):
|
|
"""インスタンスを取得します
|
|
|
|
Returns:
|
|
Database: DB操作クラスインスタンス
|
|
"""
|
|
return cls(
|
|
username=environment.DB_USERNAME,
|
|
password=environment.DB_PASSWORD,
|
|
host=environment.DB_HOST,
|
|
port=environment.DB_PORT,
|
|
schema=environment.DB_SCHEMA
|
|
)
|
|
|
|
@retry(
|
|
wait=wait_exponential(
|
|
multiplier=environment.DB_CONNECTION_RETRY_INTERVAL_INIT,
|
|
min=environment.DB_CONNECTION_RETRY_INTERVAL_MIN_SECONDS,
|
|
max=environment.DB_CONNECTION_RETRY_INTERVAL_MAX_SECONDS
|
|
),
|
|
stop=stop_after_attempt(environment.DB_CONNECTION_MAX_RETRY_ATTEMPT))
|
|
def connect(self):
|
|
"""
|
|
DBに接続します。接続に失敗した場合、リトライします。
|
|
Raises:
|
|
DBException: 接続失敗
|
|
"""
|
|
self.__connection = self.__engine.connect()
|
|
|
|
def execute_select(self, select_query: str, parameters=None) -> list[dict]:
|
|
"""SELECTクエリを実行します。
|
|
|
|
Args:
|
|
select_query (str): SELECT文
|
|
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
|
|
|
Raises:
|
|
DBException: DBエラー
|
|
|
|
Returns:
|
|
list[dict]: カラム名: 値の辞書リスト
|
|
"""
|
|
if self.__connection is None:
|
|
raise DBException('DBに接続していません')
|
|
try:
|
|
result = self.__connection.execute(text(select_query), parameters=parameters)
|
|
except Exception as e:
|
|
raise DBException(e)
|
|
result_rows = result.mappings().all()
|
|
return result_rows
|
|
|
|
def execute(self, query: str, parameters=None) -> CursorResult:
|
|
"""SQLクエリを実行します。
|
|
|
|
Args:
|
|
query (str): SQL文
|
|
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
|
|
|
Raises:
|
|
DBException: DBエラー
|
|
|
|
Returns:
|
|
CursorResult: 取得結果
|
|
"""
|
|
if self.__connection is None:
|
|
raise DBException('DBに接続していません')
|
|
try:
|
|
result = self.__connection.execute(text(query), parameters=parameters)
|
|
except Exception as e:
|
|
raise DBException(e)
|
|
return result
|
|
|
|
def begin(self):
|
|
"""トランザクションを開始します。"""
|
|
if not self.__connection.in_transaction():
|
|
self.__connection.begin()
|
|
|
|
def commit(self):
|
|
"""トランザクションをコミットします"""
|
|
if self.__connection.in_transaction():
|
|
self.__connection.commit()
|
|
|
|
def rollback(self):
|
|
"""トランザクションをロールバックします"""
|
|
if self.__connection.in_transaction():
|
|
self.__connection.rollback()
|
|
|
|
def disconnect(self):
|
|
"""DB接続を切断します。"""
|
|
if self.__connection is not None:
|
|
self.__connection.close()
|
|
self.__connection = None
|