diff --git a/ecs/jskult-batch-daily/src/db/database.py b/ecs/jskult-batch-daily/src/db/database.py index 7e8f845c..288fc4ff 100644 --- a/ecs/jskult-batch-daily/src/db/database.py +++ b/ecs/jskult-batch-daily/src/db/database.py @@ -1,6 +1,5 @@ from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, create_engine, text) -from sqlalchemy.engine.create import create_engine from sqlalchemy.engine.url import URL from tenacity import retry, stop_after_attempt, wait_exponential @@ -10,6 +9,7 @@ from src.system_var import environment logger = get_logger(__name__) + class Database: """データベース操作クラス""" __connection: Connection = None @@ -19,8 +19,7 @@ class Database: __username: str = None __password: str = None __schema: str = None - __connection_string:str = None - + __connection_string: str = None def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None: """このクラスの新たなインスタンスを初期化します @@ -37,9 +36,9 @@ class Database: self.__host = host self.__port = int(port) self.__schema = schema - + self.__connection_string = URL.create( - drivername='mysql+pymysql', + drivername='mysql+pymysql', username=self.__username, password=self.__password, host=self.__host, @@ -47,12 +46,11 @@ class Database: database=self.__schema, query={"charset": "utf8mb4"} ) - + self.__engine = create_engine( self.__connection_string, pool_timeout=5, - poolclass=QueuePool, - isolation_level="AUTOCOMMIT" + poolclass=QueuePool ) @classmethod @@ -100,10 +98,18 @@ class Database: """ if self.__connection is None: raise DBException('DBに接続していません') + + result = None try: - result = self.__connection.execute(text(select_query), parameters=parameters) + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute(text(select_query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction(select_query, parameters) except Exception as e: raise DBException(e) + result_rows = result.mappings().all() return result_rows @@ -122,10 +128,18 @@ class Database: """ if self.__connection is None: raise DBException('DBに接続していません') + + result = None try: - result = self.__connection.execute(text(query), parameters=parameters) + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute(text(query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction(query, parameters) except Exception as e: raise DBException(e) + return result def begin(self): @@ -148,3 +162,14 @@ class Database: if self.__connection is not None: self.__connection.close() self.__connection = None + + def __execute_with_transaction(self, query: str, parameters: dict): + # トランザクションを開始してクエリを実行する + with self.__connection.begin(): + try: + result = self.__connection.execute(text(query), parameters=parameters) + except Exception as e: + self.__connection.rollback() + raise e + # ここでコミットされる + return result diff --git a/ecs/jskult-webapp/src/db/database.py b/ecs/jskult-webapp/src/db/database.py index 8308438d..c639a82a 100644 --- a/ecs/jskult-webapp/src/db/database.py +++ b/ecs/jskult-webapp/src/db/database.py @@ -1,6 +1,5 @@ from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, create_engine, text) -from sqlalchemy.engine.create import create_engine from sqlalchemy.engine.url import URL from src.error.exceptions import DBException @@ -16,8 +15,7 @@ class Database: __username: str = None __password: str = None __schema: str = None - __connection_string:str = None - + __connection_string: str = None def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None: """このクラスの新たなインスタンスを初期化します @@ -34,9 +32,9 @@ class Database: self.__host = host self.__port = int(port) self.__schema = schema - + self.__connection_string = URL.create( - drivername='mysql+pymysql', + drivername='mysql+pymysql', username=self.__username, password=self.__password, host=self.__host, @@ -44,12 +42,11 @@ class Database: database=self.__schema, query={"charset": "utf8mb4"} ) - + self.__engine = create_engine( self.__connection_string, pool_timeout=5, - poolclass=QueuePool, - isolation_level="AUTOCOMMIT" + poolclass=QueuePool ) @classmethod @@ -97,10 +94,18 @@ class Database: """ if self.__connection is None: raise DBException('DBに接続していません') + + result = None try: - result = self.__connection.execute(text(select_query), parameters=parameters) + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute(text(select_query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction(select_query, parameters) except Exception as e: raise DBException(e) + result_rows = result.mappings().all() return result_rows @@ -119,10 +124,18 @@ class Database: """ if self.__connection is None: raise DBException('DBに接続していません') + + result = None try: - result = self.__connection.execute(text(query), parameters=parameters) + # トランザクションが開始している場合は、トランザクションを引き継ぐ + if self.__connection.in_transaction(): + result = self.__connection.execute(text(query), parameters) + else: + # トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。 + result = self.__execute_with_transaction(query, parameters) except Exception as e: raise DBException(e) + return result def begin(self): @@ -145,3 +158,14 @@ class Database: if self.__connection is not None: self.__connection.close() self.__connection = None + + def __execute_with_transaction(self, query: str, parameters: dict): + # トランザクションを開始してクエリを実行する + with self.__connection.begin(): + try: + result = self.__connection.execute(text(query), parameters=parameters) + except Exception as e: + self.__connection.rollback() + raise e + # ここでコミットされる + return result