175 lines
5.8 KiB
Python
175 lines
5.8 KiB
Python
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
|
|
create_engine, text)
|
|
from sqlalchemy.engine.url import URL
|
|
|
|
from src.error.exceptions import DBException
|
|
from src.system_var import environment
|
|
|
|
|
|
class Database:
|
|
"""データベース操作クラス"""
|
|
__connection: Connection = None
|
|
__engine: Engine = None
|
|
__host: str = None
|
|
__port: str = None
|
|
__username: str = None
|
|
__password: str = None
|
|
__schema: str = None
|
|
__connection_string: str = None
|
|
|
|
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
|
|
"""このクラスの新たなインスタンスを初期化します
|
|
|
|
Args:
|
|
username (str): DBユーザー名
|
|
password (str): DBパスワード
|
|
host (str): DBホスト名
|
|
port (int): DBポート
|
|
schema (str): DBスキーマ名
|
|
"""
|
|
self.__username = username
|
|
self.__password = password
|
|
self.__host = host
|
|
self.__port = int(port)
|
|
self.__schema = schema
|
|
|
|
self.__connection_string = URL.create(
|
|
drivername='mysql+pymysql',
|
|
username=self.__username,
|
|
password=self.__password,
|
|
host=self.__host,
|
|
port=self.__port,
|
|
database=self.__schema,
|
|
query={"charset": "utf8mb4"}
|
|
)
|
|
|
|
self.__engine = create_engine(
|
|
self.__connection_string,
|
|
pool_timeout=5,
|
|
poolclass=QueuePool
|
|
)
|
|
|
|
@classmethod
|
|
def get_instance(cls):
|
|
"""インスタンスを取得します
|
|
|
|
Returns:
|
|
Database: DB操作クラスインスタンス
|
|
"""
|
|
return cls(
|
|
username=environment.DB_USERNAME,
|
|
password=environment.DB_PASSWORD,
|
|
host=environment.DB_HOST,
|
|
port=environment.DB_PORT,
|
|
schema=environment.DB_SCHEMA
|
|
)
|
|
|
|
@property
|
|
def connection(self):
|
|
"""
|
|
DBの接続を返します。
|
|
"""
|
|
return self.__connection
|
|
|
|
def connect(self):
|
|
"""
|
|
DBに接続します。接続に失敗した場合、リトライします。
|
|
Raises:
|
|
DBException: 接続失敗
|
|
"""
|
|
try:
|
|
self.__connection = self.__engine.connect()
|
|
except Exception as e:
|
|
raise DBException(e)
|
|
|
|
def execute_select(self, select_query: str, parameters=None) -> list[dict]:
|
|
"""SELECTクエリを実行します。
|
|
|
|
Args:
|
|
select_query (str): SELECT文
|
|
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
|
|
|
Raises:
|
|
DBException: DBエラー
|
|
|
|
Returns:
|
|
list[dict]: カラム名: 値の辞書リスト
|
|
"""
|
|
if self.__connection is None:
|
|
raise DBException('DBに接続していません')
|
|
|
|
result = None
|
|
try:
|
|
# トランザクションが開始している場合は、トランザクションを引き継ぐ
|
|
if self.__connection.in_transaction():
|
|
result = self.__connection.execute(text(select_query), parameters)
|
|
else:
|
|
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
|
|
result = self.__execute_with_transaction(select_query, parameters)
|
|
except Exception as e:
|
|
raise DBException(e)
|
|
|
|
result_rows = result.mappings().all()
|
|
return result_rows
|
|
|
|
def execute(self, query: str, parameters=None) -> CursorResult:
|
|
"""SQLクエリを実行します。
|
|
|
|
Args:
|
|
query (str): SQL文
|
|
parameters (dict, optional): クエリのプレースホルダーに埋め込む変数の辞書. Defaults to None.
|
|
|
|
Raises:
|
|
DBException: DBエラー
|
|
|
|
Returns:
|
|
CursorResult: 取得結果
|
|
"""
|
|
if self.__connection is None:
|
|
raise DBException('DBに接続していません')
|
|
|
|
result = None
|
|
try:
|
|
# トランザクションが開始している場合は、トランザクションを引き継ぐ
|
|
if self.__connection.in_transaction():
|
|
result = self.__connection.execute(text(query), parameters)
|
|
else:
|
|
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
|
|
result = self.__execute_with_transaction(query, parameters)
|
|
except Exception as e:
|
|
raise DBException(e)
|
|
|
|
return result
|
|
|
|
def begin(self):
|
|
"""トランザクションを開始します。"""
|
|
if not self.__connection.in_transaction():
|
|
self.__connection.begin()
|
|
|
|
def commit(self):
|
|
"""トランザクションをコミットします"""
|
|
if self.__connection.in_transaction():
|
|
self.__connection.commit()
|
|
|
|
def rollback(self):
|
|
"""トランザクションをロールバックします"""
|
|
if self.__connection.in_transaction():
|
|
self.__connection.rollback()
|
|
|
|
def disconnect(self):
|
|
"""DB接続を切断します。"""
|
|
if self.__connection is not None:
|
|
self.__connection.close()
|
|
self.__connection = None
|
|
|
|
def __execute_with_transaction(self, query: str, parameters: dict):
|
|
# トランザクションを開始してクエリを実行する
|
|
with self.__connection.begin():
|
|
try:
|
|
result = self.__connection.execute(text(query), parameters=parameters)
|
|
except Exception as e:
|
|
self.__connection.rollback()
|
|
raise e
|
|
# ここでコミットされる
|
|
return result
|