165 lines
6.6 KiB
Python
165 lines
6.6 KiB
Python
import secrets
|
|
import urllib.parse as parse
|
|
from typing import Union
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, Response
|
|
from fastapi.responses import RedirectResponse
|
|
from starlette import status
|
|
|
|
from src.depends.auth import code_security
|
|
from src.depends.services import get_service
|
|
from src.error.exceptions import JWTTokenVerifyException, NotAuthorizeException
|
|
from src.logging.get_logger import get_logger
|
|
from src.model.internal.session import UserSession
|
|
from src.model.request.login import LoginModel
|
|
from src.model.view.mainte_login_view_model import MainteLoginViewModel
|
|
from src.router.session_router import AfterSetCookieSessionRoute
|
|
from src.services.login_service import LoginService
|
|
from src.services.session_service import set_session
|
|
from src.system_var import constants, environment
|
|
from src.templates import templates
|
|
|
|
router = APIRouter()
|
|
router.route_class = AfterSetCookieSessionRoute
|
|
|
|
logger = get_logger('ログイン')
|
|
|
|
#########################
|
|
# Views #
|
|
#########################
|
|
|
|
|
|
@router.get('/userlogin')
|
|
def login_user_redirect_view():
|
|
auth_query_string = parse.urlencode(
|
|
{
|
|
'response_type': 'code',
|
|
'identity_provider': environment.COGNITO_IDENTITY_PROVIDER,
|
|
'client_id': environment.COGNITO_CLIENT_ID,
|
|
'redirect_uri': environment.COGNITO_REDIRECT_URI
|
|
}
|
|
)
|
|
authorize_endpoint_url = f'{environment.COGNITO_AUTH_DOMAIN}/{environment.AUTHORIZE_ENDPOINT}?{auth_query_string}'
|
|
|
|
return RedirectResponse(url=authorize_endpoint_url, status_code=status.HTTP_303_SEE_OTHER)
|
|
|
|
|
|
@router.get('/maintlogin')
|
|
def login_maintenance_view(request: Request):
|
|
mainte_login = MainteLoginViewModel()
|
|
return templates.TemplateResponse(
|
|
'maintlogin.html',
|
|
{
|
|
'request': request,
|
|
'mainte_login': mainte_login
|
|
}
|
|
)
|
|
|
|
#########################
|
|
# APIs #
|
|
#########################
|
|
|
|
|
|
@router.post('/maintlogin')
|
|
def login(
|
|
response: Response,
|
|
request: LoginModel = Depends(LoginModel.as_form),
|
|
login_service: LoginService = Depends(get_service(LoginService))
|
|
):
|
|
try:
|
|
jwt_token = login_service.login(request.username, request.password)
|
|
except NotAuthorizeException as e:
|
|
logger.exception(e)
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
|
|
except JWTTokenVerifyException as e:
|
|
logger.exception(e)
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_SESSION_EXPIRED)
|
|
|
|
verified_token = jwt_token.verify_token()
|
|
# 普通の認証だと、`cognito:username`に入る。
|
|
user_id = verified_token.user_id
|
|
user_record = login_service.logged_in_user(user_id)
|
|
# ユーザーが有効ではない場合、ログアウトにリダイレクトする
|
|
if not user_record.is_enable_user():
|
|
logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
|
|
# メンテユーザーではない場合、ログアウトにリダイレクトする
|
|
if user_record is None or not user_record.is_maintenance_user():
|
|
logger.info(f'メンテナンスユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
|
|
logger.info(f'メンテナンスユーザー認証成功: {user_id}')
|
|
# CSRFトークンを生成
|
|
csrf_token = secrets.token_urlsafe(32)
|
|
# DynamoDBにトークンIDを設定する
|
|
session_model: UserSession = UserSession.new(
|
|
user_id=user_id,
|
|
id_token=verified_token.id_token,
|
|
refresh_token=verified_token.refresh_token,
|
|
csrf_token=csrf_token,
|
|
bio_flg=user_record.auth_flg1,
|
|
doc_flg=user_record.auth_flg2,
|
|
inst_flg=user_record.auth_flg3,
|
|
master_mainte_flg=user_record.auth_flg4,
|
|
user_flg=user_record.mntuser_flg
|
|
)
|
|
session_key = set_session(session_model)
|
|
|
|
response = RedirectResponse(
|
|
url='/menu/',
|
|
status_code=status.HTTP_303_SEE_OTHER,
|
|
headers={'session_key': session_key}
|
|
)
|
|
return response
|
|
|
|
|
|
@router.get('/authorize')
|
|
def sso_authorize(
|
|
code: Union[str, None] = Depends(code_security),
|
|
login_service: LoginService = Depends(get_service(LoginService))
|
|
) -> Response:
|
|
if not code:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=constants.LOGOUT_REASON_NOT_LOGIN)
|
|
|
|
# トークン取得
|
|
jwt_token = login_service.login_with_security_code(code)
|
|
try:
|
|
# トークン検証
|
|
verified_token = jwt_token.verify_token()
|
|
except JWTTokenVerifyException as e:
|
|
logger.exception(e)
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_SESSION_EXPIRED)
|
|
|
|
# トークンからユーザーIDを取得
|
|
user_id = verified_token.user_id
|
|
user_record = login_service.logged_in_user(user_id)
|
|
# ユーザーが有効ではない場合、ログアウトにリダイレクトする
|
|
if not user_record.is_enable_user():
|
|
logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
|
|
# Merckユーザーではない場合、ログアウトにリダイレクトする
|
|
if user_record is None or not user_record.is_groupware_user():
|
|
logger.info(f'メンテナンスユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
|
|
logger.info(f'顧客ユーザー認証成功: {user_id}')
|
|
# CSRFトークンを生成
|
|
csrf_token = secrets.token_urlsafe(32)
|
|
# DynamoDBにトークンIDを設定する
|
|
session_model: UserSession = UserSession.new(
|
|
user_id=user_id,
|
|
id_token=verified_token.id_token,
|
|
refresh_token=verified_token.refresh_token,
|
|
csrf_token=csrf_token,
|
|
bio_flg=user_record.auth_flg1,
|
|
doc_flg=user_record.auth_flg2,
|
|
inst_flg=user_record.auth_flg3,
|
|
master_mainte_flg=user_record.auth_flg4,
|
|
user_flg=user_record.mntuser_flg
|
|
)
|
|
session_key = set_session(session_model)
|
|
response = RedirectResponse(
|
|
url='/menu/',
|
|
status_code=status.HTTP_303_SEE_OTHER,
|
|
headers={'session_key': session_key}
|
|
)
|
|
return response
|