187 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import secrets
import urllib.parse as parse
from typing import Union
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from starlette import status
from src.depends.auth import code_security
from src.depends.services import get_service
from src.error.exceptions import JWTTokenVerifyException, NotAuthorizeException
from src.logging.get_logger import get_logger
from src.model.internal.session import UserSession
from src.model.request.login import LoginModel
from src.model.view.mainte_login_view_model import MainteLoginViewModel
from src.router.session_router import AfterSetCookieSessionRoute
from src.services.login_service import LoginService
from src.services.session_service import set_session
from src.system_var import constants, environment
from src.templates import templates
# Router that collects the login views and APIs defined in this module.
router = APIRouter()
# Use the project's custom route class for the routes registered on this router.
# It is assigned before any @router.* decorator below runs.
# NOTE(review): presumably this route class turns the 'session_key' response
# header set by the handlers into a session cookie - confirm in
# src.router.session_router.
router.route_class = AfterSetCookieSessionRoute
# Module-level logger; the name 'ログイン' means "login".
logger = get_logger('ログイン')
#########################
# Views #
#########################
@router.get('/userlogin')
def login_user_redirect_view():
    """Redirect the browser to the Cognito authorize endpoint.

    The query string asks for an authorization code (``response_type=code``)
    and carries the identity provider, client ID and redirect URI taken from
    the environment settings.

    Returns:
        A 303 See Other redirect to the authorize endpoint URL.
    """
    # urlencode accepts a sequence of (key, value) pairs; order is preserved.
    query_pairs = (
        ('response_type', 'code'),
        ('identity_provider', environment.COGNITO_IDENTITY_PROVIDER),
        ('client_id', environment.COGNITO_CLIENT_ID),
        ('redirect_uri', environment.COGNITO_REDIRECT_URI),
    )
    encoded_query = parse.urlencode(query_pairs)
    endpoint = f'{environment.COGNITO_AUTH_DOMAIN}/{environment.AUTHORIZE_ENDPOINT}'
    return RedirectResponse(
        url=f'{endpoint}?{encoded_query}',
        status_code=status.HTTP_303_SEE_OTHER
    )
@router.get('/maintlogin')
def login_maintenance_view(request: Request):
    """Render the maintenance-user login page.

    Args:
        request: The incoming request, passed through to the template context.

    Returns:
        The rendered 'maintlogin.html' template response.
    """
    view_model = MainteLoginViewModel()
    template_context = {'request': request, 'mainte_login': view_model}
    return templates.TemplateResponse('maintlogin.html', template_context)
#########################
# APIs #
#########################
@router.post('/maintlogin')
def login(
    response: Response,
    request: LoginModel = Depends(LoginModel.as_form),
    login_service: LoginService = Depends(get_service(LoginService))
):
    """Authenticate a maintenance user with username/password and start a session.

    Args:
        response: Response object injected by FastAPI. It is not used; the
            handler builds and returns its own redirect response.
        request: Login form data (``username`` and ``password``).
        login_service: Service used for authentication, failed-login
            bookkeeping and user lookup.

    Returns:
        A 303 See Other redirect to '/menu/' carrying the new session key in
        the 'session_key' response header.

    Raises:
        HTTPException: 401 when authentication or token verification fails,
            when the failed-login limit is exceeded, or when the user is
            missing from the master, disabled, or not a maintenance user.
    """
    try:
        jwt_token = login_service.login(request.username, request.password)
    except NotAuthorizeException as e:
        logger.info(f'ログイン失敗:{e}')
        # Count this failed login attempt.
        login_service.increase_login_failed_count(request.username)
        # Once the failure limit is exceeded, report a different reason.
        if login_service.is_login_failed_limit_exceeded(request.username):
            login_service.on_login_fail_limit_exceeded(request.username)
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_FAILED_LIMIT_EXCEEDED)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
    except JWTTokenVerifyException as e:
        logger.info(f'ログイン失敗:{e}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # Even when the credentials were accepted, reject the login if the stored
    # failed-login count is over the limit (10, per the log message below).
    if login_service.is_login_failed_limit_exceeded(request.username):
        logger.info(f'ログイン失敗回数が10回以上: {request.username}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_FAILED_LIMIT_EXCEEDED)

    # Verify the token. A verification failure is answered with 401, the same
    # way the SSO handler in this module treats it, instead of escaping as an
    # unhandled exception.
    try:
        verified_token = jwt_token.verify_token()
    except JWTTokenVerifyException as e:
        logger.info(f'ログイン失敗:{e}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # Per the original note: with normal authentication the user ID is held
    # in `cognito:username`.
    user_id = verified_token.user_id
    user_record = login_service.logged_in_user(user_id)

    # The user does not exist in the master: reject the login.
    if user_record is None:
        logger.info(f'存在しないユーザー: {user_id}, ユーザーID: {user_id}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # The user is not enabled: reject the login.
    if not user_record.is_enable_user():
        logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # The user is not a maintenance user: reject the login.
    # (user_record cannot be None here; that case was handled above.)
    if not user_record.is_maintenance_user():
        logger.info(f'メンテナンスユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    logger.info(f'メンテナンスユーザー認証成功: {user_id}')

    # Generate a CSRF token for the new session.
    csrf_token = secrets.token_urlsafe(32)

    # Build and store the session record.
    # NOTE(review): the original comment says it is stored in DynamoDB -
    # confirm in src.services.session_service.
    session_model: UserSession = UserSession.new(
        user_id=user_id,
        id_token=verified_token.id_token,
        refresh_token=verified_token.refresh_token,
        csrf_token=csrf_token,
        bio_flg=user_record.bio_sales_inq_auth_flg,
        doc_flg=user_record.ult_doctor_inq_auth_flg,
        inst_flg=user_record.ult_inst_inq_auth_flg,
        master_mainte_flg=user_record.auth_flg4,
        user_flg=user_record.mntuser_flg
    )
    session_key = set_session(session_model)

    # Hand the session key back in a header on the redirect to the menu page.
    redirect = RedirectResponse(
        url='/menu/',
        status_code=status.HTTP_303_SEE_OTHER,
        headers={'session_key': session_key}
    )
    return redirect
@router.get('/authorize')
def sso_authorize(
    code: Union[str, None] = Depends(code_security),
    login_service: LoginService = Depends(get_service(LoginService))
) -> Response:
    """Complete an SSO login from an authorization code and start a session.

    Args:
        code: Authorization code resolved by the ``code_security`` dependency,
            or None/empty when no code was supplied.
        login_service: Service used to exchange the code for a token and to
            look up the user.

    Returns:
        A 303 See Other redirect to '/menu/' carrying the new session key in
        the 'session_key' response header.

    Raises:
        HTTPException: 403 when no code is supplied; 401 when token
            verification fails or when the user is missing from the master,
            disabled, or not a groupware user.
    """
    if not code:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=constants.LOGOUT_REASON_NOT_LOGIN)

    # Exchange the authorization code for a token.
    jwt_token = login_service.login_with_security_code(code)
    try:
        # Verify the token.
        verified_token = jwt_token.verify_token()
    except JWTTokenVerifyException as e:
        logger.info(f'SSOログイン失敗{e}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    # Take the user ID from the verified token.
    user_id = verified_token.user_id
    user_record = login_service.logged_in_user(user_id)

    # The user does not exist in the master: reject the login.
    if user_record is None:
        logger.info(f'存在しないユーザー: {user_id}, ユーザーID: {user_id}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # The user is not enabled: reject the login.
    if not user_record.is_enable_user():
        logger.info(f'無効なユーザー: {user_id}, 有効フラグ: {user_record.enabled_flg}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # The user is not a groupware user: reject the login.
    # (user_record cannot be None here; that case was handled above.)
    # The log text names the groupware check that actually failed, rather
    # than the maintenance-user wording used by the maintenance login.
    if not user_record.is_groupware_user():
        logger.info(f'グループウェアユーザーではない: {user_id}, メンテナンスユーザーフラグ: {user_record.mntuser_flg}')
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    logger.info(f'顧客ユーザー認証成功: {user_id}')

    # Generate a CSRF token for the new session.
    csrf_token = secrets.token_urlsafe(32)

    # Build and store the session record.
    # NOTE(review): the original comment says it is stored in DynamoDB -
    # confirm in src.services.session_service.
    session_model: UserSession = UserSession.new(
        user_id=user_id,
        id_token=verified_token.id_token,
        refresh_token=verified_token.refresh_token,
        csrf_token=csrf_token,
        bio_flg=user_record.bio_sales_inq_auth_flg,
        doc_flg=user_record.ult_doctor_inq_auth_flg,
        inst_flg=user_record.ult_inst_inq_auth_flg,
        master_mainte_flg=user_record.auth_flg4,
        user_flg=user_record.mntuser_flg
    )
    session_key = set_session(session_model)

    # Hand the session key back in a header on the redirect to the menu page.
    redirect = RedirectResponse(
        url='/menu/',
        status_code=status.HTTP_303_SEE_OTHER,
        headers={'session_key': session_key}
    )
    return redirect