155 lines
5.9 KiB
Python

import secrets
import urllib.parse as parse
from typing import Union
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from starlette import status
from src.depends.auth import code_security
from src.depends.services import get_service
from src.error.exceptions import JWTTokenVerifyException, NotAuthorizeException
from src.model.internal.session import UserSession
from src.model.request.login import LoginModel
from src.model.view.mainte_login_view_model import MainteLoginViewModel
from src.router.session_router import AfterSetCookieSessionRoute
from src.services.login_service import LoginService
from src.services.session_service import set_session
from src.system_var import constants, environment
from src.templates import templates
# Router for the login endpoints defined in this module.
# Routes registered on it after this point are created with
# AfterSetCookieSessionRoute (from src.router.session_router) as their route class.
router = APIRouter()
router.route_class = AfterSetCookieSessionRoute
#########################
# Views #
#########################
@router.get('/userlogin')
def login_user_redirect_view():
    """Redirect the browser to the configured authorize endpoint.

    The query string requests ``response_type=code`` and carries the identity
    provider, client ID and redirect URI read from ``environment``.

    Returns:
        A 303 redirect to the authorize endpoint URL.
    """
    query_params = {
        'response_type': 'code',
        'identity_provider': environment.COGNITO_IDENTITY_PROVIDER,
        'client_id': environment.COGNITO_CLIENT_ID,
        'redirect_uri': environment.COGNITO_REDIRECT_URI,
    }
    encoded_query = parse.urlencode(query_params)

    # Assemble "<auth domain>/<authorize endpoint>?<query>".
    endpoint = f'{environment.COGNITO_AUTH_DOMAIN}/{environment.AUTHORIZE_ENDPOINT}'
    target_url = f'{endpoint}?{encoded_query}'

    return RedirectResponse(url=target_url, status_code=status.HTTP_303_SEE_OTHER)
@router.get('/maintlogin')
def login_maintenance_view(request: Request):
    """Render the maintenance login page.

    Args:
        request: Incoming request, passed through to the template context.

    Returns:
        The ``maintlogin.html`` template response, with a new
        ``MainteLoginViewModel`` available under the ``mainte_login`` key.
    """
    context = {
        'request': request,
        'mainte_login': MainteLoginViewModel(),
    }
    return templates.TemplateResponse('maintlogin.html', context)
#########################
# APIs #
#########################
@router.post('/maintlogin')
def login(
    response: Response,
    request: LoginModel = Depends(LoginModel.as_form),
    login_service: LoginService = Depends(get_service(LoginService))
):
    """Authenticate a maintenance user from the login form and start a session.

    Args:
        response: Injected response object. Unused here; kept so the endpoint
            signature stays unchanged.
        request: Login form data providing ``username`` and ``password``.
        login_service: Service used to authenticate and load the user record.

    Returns:
        A 303 redirect to ``/menu/`` carrying the new session key in the
        ``session_key`` response header.

    Raises:
        HTTPException: 401 when authentication fails, when token verification
            fails, or when the user record is missing, not enabled, or not a
            maintenance user.
    """
    try:
        jwt_token = login_service.login(request.username, request.password)
    except NotAuthorizeException as e:
        # NOTE(review): consider routing this through the project logger.
        print(e)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=constants.LOGOUT_REASON_LOGIN_ERROR
        ) from e
    except JWTTokenVerifyException as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=constants.LOGOUT_REASON_SESSION_EXPIRED
        ) from e

    # Verify the token; a verification failure is reported as 401, the same
    # way the /authorize endpoint handles it.
    try:
        verified_token = jwt_token.verify_token()
    except JWTTokenVerifyException as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=constants.LOGOUT_REASON_SESSION_EXPIRED
        ) from e

    # With ordinary authentication the user ID is stored in `cognito:username`.
    user_id = verified_token.user_id
    user_record = login_service.logged_in_user(user_id)

    # Reject a missing user record first, so the checks below never
    # dereference None.
    if user_record is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
    # Reject users that are not enabled.
    if not user_record.is_enable_user():
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
    # Reject users that are not maintenance users.
    if not user_record.is_maintenance_user():
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # Generate a CSRF token.
    csrf_token = secrets.token_urlsafe(32)

    # Build and persist the session (the original comment says the token ID
    # is stored in DynamoDB).
    session_model: UserSession = UserSession.new(
        user_id=user_id,
        id_token=verified_token.id_token,
        refresh_token=verified_token.refresh_token,
        csrf_token=csrf_token,
        bio_flg=user_record.auth_flg1,
        doc_flg=user_record.auth_flg2,
        inst_flg=user_record.auth_flg3,
        master_mainte_flg=user_record.auth_flg4,
        user_flg=user_record.mntuser_flg
    )
    session_key = set_session(session_model)

    # Named `redirect` so it does not shadow the injected `response` parameter.
    redirect = RedirectResponse(
        url='/menu/',
        status_code=status.HTTP_303_SEE_OTHER,
        headers={'session_key': session_key}
    )
    return redirect
@router.get('/authorize')
def sso_authorize(
    code: Union[str, None] = Depends(code_security),
    login_service: LoginService = Depends(get_service(LoginService))
) -> Response:
    """Complete an SSO login from a security code and start a session.

    Args:
        code: Security code resolved by the ``code_security`` dependency, or
            ``None`` when absent.
        login_service: Service used to obtain tokens and load the user record.

    Returns:
        A 303 redirect to ``/menu/`` carrying the new session key in the
        ``session_key`` response header.

    Raises:
        HTTPException: 403 when no code is supplied; 401 when token
            verification fails, or when the user record is missing, not
            enabled, or not a groupware user.
    """
    if not code:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=constants.LOGOUT_REASON_NOT_LOGIN)

    # Obtain the tokens.
    jwt_token = login_service.login_with_security_code(code)
    try:
        # Verify the token.
        verified_token = jwt_token.verify_token()
    except JWTTokenVerifyException as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=constants.LOGOUT_REASON_SESSION_EXPIRED
        ) from e

    # Get the user ID from the token.
    user_id = verified_token.user_id
    user_record = login_service.logged_in_user(user_id)

    # Reject a missing user record first, so the checks below never
    # dereference None.
    if user_record is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
    # Reject users that are not enabled.
    if not user_record.is_enable_user():
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)
    # Reject users that are not Merck (groupware) users.
    if not user_record.is_groupware_user():
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=constants.LOGOUT_REASON_LOGIN_ERROR)

    # Generate a CSRF token.
    csrf_token = secrets.token_urlsafe(32)

    # Build and persist the session (the original comment says the token ID
    # is stored in DynamoDB).
    session_model: UserSession = UserSession.new(
        user_id=user_id,
        id_token=verified_token.id_token,
        refresh_token=verified_token.refresh_token,
        csrf_token=csrf_token,
        bio_flg=user_record.auth_flg1,
        doc_flg=user_record.auth_flg2,
        inst_flg=user_record.auth_flg3,
        master_mainte_flg=user_record.auth_flg4,
        user_flg=user_record.mntuser_flg
    )
    session_key = set_session(session_model)

    response = RedirectResponse(
        url='/menu/',
        status_code=status.HTTP_303_SEE_OTHER,
        headers={'session_key': session_key}
    )
    return response