fix: トランザクションが勝手にコミットされるようになっていたのを修正。Webの方も横展開で修正

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-04-06 23:25:36 +09:00
parent f6d0476846
commit 8caef8acec
2 changed files with 69 additions and 20 deletions

View File

@ -1,6 +1,5 @@
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
create_engine, text) create_engine, text)
from sqlalchemy.engine.create import create_engine
from sqlalchemy.engine.url import URL from sqlalchemy.engine.url import URL
from tenacity import retry, stop_after_attempt, wait_exponential from tenacity import retry, stop_after_attempt, wait_exponential
@ -10,6 +9,7 @@ from src.system_var import environment
logger = get_logger(__name__) logger = get_logger(__name__)
class Database: class Database:
"""データベース操作クラス""" """データベース操作クラス"""
__connection: Connection = None __connection: Connection = None
@ -19,8 +19,7 @@ class Database:
__username: str = None __username: str = None
__password: str = None __password: str = None
__schema: str = None __schema: str = None
__connection_string:str = None __connection_string: str = None
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None: def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
"""このクラスの新たなインスタンスを初期化します """このクラスの新たなインスタンスを初期化します
@ -51,8 +50,7 @@ class Database:
self.__engine = create_engine( self.__engine = create_engine(
self.__connection_string, self.__connection_string,
pool_timeout=5, pool_timeout=5,
poolclass=QueuePool, poolclass=QueuePool
isolation_level="AUTOCOMMIT"
) )
@classmethod @classmethod
@ -100,10 +98,18 @@ class Database:
""" """
if self.__connection is None: if self.__connection is None:
raise DBException('DBに接続していません') raise DBException('DBに接続していません')
result = None
try: try:
result = self.__connection.execute(text(select_query), parameters=parameters) # トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(select_query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(select_query, parameters)
except Exception as e: except Exception as e:
raise DBException(e) raise DBException(e)
result_rows = result.mappings().all() result_rows = result.mappings().all()
return result_rows return result_rows
@ -122,10 +128,18 @@ class Database:
""" """
if self.__connection is None: if self.__connection is None:
raise DBException('DBに接続していません') raise DBException('DBに接続していません')
result = None
try: try:
result = self.__connection.execute(text(query), parameters=parameters) # トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(query, parameters)
except Exception as e: except Exception as e:
raise DBException(e) raise DBException(e)
return result return result
def begin(self): def begin(self):
@ -148,3 +162,14 @@ class Database:
if self.__connection is not None: if self.__connection is not None:
self.__connection.close() self.__connection.close()
self.__connection = None self.__connection = None
def __execute_with_transaction(self, query: str, parameters: dict):
# トランザクションを開始してクエリを実行する
with self.__connection.begin():
try:
result = self.__connection.execute(text(query), parameters=parameters)
except Exception as e:
self.__connection.rollback()
raise e
# ここでコミットされる
return result

View File

@ -1,6 +1,5 @@
from sqlalchemy import (Connection, CursorResult, Engine, QueuePool, from sqlalchemy import (Connection, CursorResult, Engine, QueuePool,
create_engine, text) create_engine, text)
from sqlalchemy.engine.create import create_engine
from sqlalchemy.engine.url import URL from sqlalchemy.engine.url import URL
from src.error.exceptions import DBException from src.error.exceptions import DBException
@ -16,8 +15,7 @@ class Database:
__username: str = None __username: str = None
__password: str = None __password: str = None
__schema: str = None __schema: str = None
__connection_string:str = None __connection_string: str = None
def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None: def __init__(self, username: str, password: str, host: str, port: int, schema: str) -> None:
"""このクラスの新たなインスタンスを初期化します """このクラスの新たなインスタンスを初期化します
@ -48,8 +46,7 @@ class Database:
self.__engine = create_engine( self.__engine = create_engine(
self.__connection_string, self.__connection_string,
pool_timeout=5, pool_timeout=5,
poolclass=QueuePool, poolclass=QueuePool
isolation_level="AUTOCOMMIT"
) )
@classmethod @classmethod
@ -97,10 +94,18 @@ class Database:
""" """
if self.__connection is None: if self.__connection is None:
raise DBException('DBに接続していません') raise DBException('DBに接続していません')
result = None
try: try:
result = self.__connection.execute(text(select_query), parameters=parameters) # トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(select_query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(select_query, parameters)
except Exception as e: except Exception as e:
raise DBException(e) raise DBException(e)
result_rows = result.mappings().all() result_rows = result.mappings().all()
return result_rows return result_rows
@ -119,10 +124,18 @@ class Database:
""" """
if self.__connection is None: if self.__connection is None:
raise DBException('DBに接続していません') raise DBException('DBに接続していません')
result = None
try: try:
result = self.__connection.execute(text(query), parameters=parameters) # トランザクションが開始している場合は、トランザクションを引き継ぐ
if self.__connection.in_transaction():
result = self.__connection.execute(text(query), parameters)
else:
# トランザクションが明示的に開始していない場合は、クエリ単位でトランザクションをbegin-commitする。
result = self.__execute_with_transaction(query, parameters)
except Exception as e: except Exception as e:
raise DBException(e) raise DBException(e)
return result return result
def begin(self): def begin(self):
@ -145,3 +158,14 @@ class Database:
if self.__connection is not None: if self.__connection is not None:
self.__connection.close() self.__connection.close()
self.__connection = None self.__connection = None
def __execute_with_transaction(self, query: str, parameters: dict):
# トランザクションを開始してクエリを実行する
with self.__connection.begin():
try:
result = self.__connection.execute(text(query), parameters=parameters)
except Exception as e:
self.__connection.rollback()
raise e
# ここでコミットされる
return result