fix: SSOログインユーザー判定のロジック修正。ログも仕込んだ。

This commit is contained in:
shimoda.m@nds-tyo.co.jp 2023-06-06 16:55:22 +09:00
parent 5ab495de81
commit c874596c86
4 changed files with 20 additions and 13 deletions

View File

@ -9,6 +9,7 @@ from starlette import status
from src.depends.auth import code_security
from src.depends.services import get_service
from src.error.exceptions import JWTTokenVerifyException, NotAuthorizeException
from src.logging.get_logger import get_logger
from src.model.internal.session import UserSession
from src.model.request.login import LoginModel
from src.model.view.mainte_login_view_model import MainteLoginViewModel
@ -21,6 +22,8 @@ from src.templates import templates
router = APIRouter()
router.route_class = AfterSetCookieSessionRoute
logger = get_logger('ログイン')
#########################
# Views #
#########################
@ -66,9 +69,10 @@ def login(
try:
jwt_token = login_service.login(request.username, request.password)
except NotAuthorizeException as e:
print(e)
logger.exception(e)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
except JWTTokenVerifyException:
except JWTTokenVerifyException as e:
logger.exception(e)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_SESSION_EXPIRED)
verified_token = jwt_token.verify_token()
@ -77,10 +81,13 @@ def login(
user_record = login_service.logged_in_user(user_id)
# ユーザーが有効ではない場合、ログアウトにリダイレクトする
if not user_record.is_enable_user():
logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
# メンテユーザーではない場合、ログアウトにリダイレクトする
if user_record is None or not user_record.is_maintenance_user():
logger.info(f'メンテナンスユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
logger.info(f'メンテナンスユーザー認証成功: {user_id}')
# CSRFトークンを生成
csrf_token = secrets.token_urlsafe(32)
# DynamoDBにトークンIDを設定する
@ -118,7 +125,8 @@ def sso_authorize(
try:
# トークン検証
verified_token = jwt_token.verify_token()
except JWTTokenVerifyException:
except JWTTokenVerifyException as e:
logger.exception(e)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_SESSION_EXPIRED)
# トークンからユーザーIDを取得
@ -126,11 +134,13 @@ def sso_authorize(
user_record = login_service.logged_in_user(user_id)
# ユーザーが有効ではない場合、ログアウトにリダイレクトする
if not user_record.is_enable_user():
logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
# Merckユーザーではない場合、ログアウトにリダイレクトする
if user_record is None or not user_record.is_groupware_user():
logger.info(f'メンテナンスユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
logger.info(f'顧客ユーザー認証成功: {user_id}')
# CSRFトークンを生成
csrf_token = secrets.token_urlsafe(32)
# DynamoDBにトークンIDを設定する

View File

@ -33,4 +33,4 @@ class UserMasterModel(BaseDBModel):
return self.mntuser_flg == '1'
def is_groupware_user(self):
return self.mntuser_flg == '0'
return self.mntuser_flg == '0' or self.mntuser_flg is None

View File

@ -14,11 +14,11 @@ class UserSession(DynamoDBTableModel):
session_key = UnicodeAttribute(hash_key=True)
user_id = UnicodeAttribute()
id_token = UnicodeAttribute()
doc_flg = UnicodeAttribute()
inst_flg = UnicodeAttribute()
bio_flg = UnicodeAttribute()
master_mainte_flg = UnicodeAttribute()
user_flg = UnicodeAttribute()
doc_flg = UnicodeAttribute(null=True)
inst_flg = UnicodeAttribute(null=True)
bio_flg = UnicodeAttribute(null=True)
master_mainte_flg = UnicodeAttribute(null=True)
user_flg = UnicodeAttribute(null=True)
refresh_token = UnicodeAttribute()
csrf_token = UnicodeAttribute()
last_access_time = NumberAttribute()

View File

@ -21,6 +21,3 @@ class UserViewModel(BaseModel):
def has_master_maintenance_permission(self):
return self.master_mainte_flg == '1'
def is_maintenance_user(self):
return self.user_flg == '1'