import datetime
from typing import Union
from fastapi import Depends
from fastapi.security import APIKeyCookie, APIKeyQuery
from src.error.exceptions import JWTTokenVerifyException
from src.logging.get_logger import get_logger
from src.model.internal.jwt_token import JWTToken
from src.model.internal.session import UserSession
from src.services.session_service import get_session, set_session
from src.system_var import environment
# Module logger. The name handed to get_logger is a runtime string and is kept verbatim.
logger = get_logger('認証チェック')
# Reads the session key from the 'session' cookie. With auto_error=False a
# missing cookie yields None instead of an error response, so every dependency
# that consumes this value has to handle None itself.
cookie_security = APIKeyCookie(auto_error=False, name='session')
# Reads a 'code' query parameter, also yielding None when it is absent.
# It is not referenced by the functions visible in this part of the module.
code_security = APIKeyQuery(auto_error=False, name='code')
def get_current_session(session_key=Depends(cookie_security)) -> Union[UserSession, None]:
    """Look up the stored session for the session key sent with the request.

    Args:
        session_key: Value supplied by the ``cookie_security`` dependency, or
            None when the request did not carry one.

    Returns:
        Whatever ``get_session`` returns for the key (None when no such session
        exists), or None when no key was supplied. This function never raises
        for a missing key, so callers must treat None as "not authenticated".
    """
    if session_key is not None:
        # get_session yields None when no session exists for this key.
        return get_session(session_key)

    return None
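def check_session_expired(session: Union[UserSession, None] = Depends(get_current_session)) -> Union[UserSession, None]:
    """Return the session only while its idle timeout has not elapsed.

    The last access time stored on the session is compared with the current
    time. Once more than ``environment.SESSION_EXPIRE_MINUTE`` minutes have
    passed since that access, the session is treated as expired.

    Args:
        session: Session resolved by ``get_current_session``, or None when the
            request carried no usable session.

    Returns:
        The same session object while it is inside the idle window, otherwise
        None. Callers must treat None as "not authenticated".
    """
    if session is None:
        return None

    # Both instants are timezone-aware UTC values. Naive local datetimes follow
    # the server's wall clock, so a DST shift between the last access and now
    # would stretch or shorten the idle timeout by the size of the shift.
    utc = datetime.timezone.utc
    last_access_datetime = datetime.datetime.fromtimestamp(session.last_access_time, tz=utc)
    session_expired_period = last_access_datetime + datetime.timedelta(minutes=environment.SESSION_EXPIRE_MINUTE)
    logger.debug(f'last_access_time: {last_access_datetime}')
    logger.debug(f'session_expired_period: {session_expired_period}')

    # NOTE(review): an expired session is only hidden from the caller here;
    # nothing in this function removes it from the session store. Confirm that
    # cleanup of expired sessions happens elsewhere.
    if session_expired_period < datetime.datetime.now(tz=utc):
        return None

    return session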
def verify_session(session: Union[UserSession, None] = Depends(check_session_expired)) -> Union[UserSession, None]:
    """Verify the session's tokens and persist a refreshed ID token.

    A ``JWTToken`` is built from the session's ID token and refresh token and
    verified. When verification reports that the ID token was refreshed, the
    new ID token is written back to the session and the session is stored
    again through ``set_session``.

    Args:
        session: Session that passed ``check_session_expired``, or None.

    Returns:
        The session when verification succeeds, otherwise None (no session
        supplied, or ``JWTTokenVerifyException`` was raised). Callers must
        treat None as "not authenticated".
    """
    if session is None:
        return None

    token_pair = JWTToken(session.id_token, session.refresh_token)
    try:
        verification = token_pair.verify_token()
    except JWTTokenVerifyException as exc:
        # NOTE(review): the exception is logged as-is; confirm its message can
        # never contain token material before relying on this log line.
        logger.info(exc)
        return None

    if not verification.is_refreshed:
        return session

    # The ID token was refreshed: put the new value into the session and save it.
    # Only id_token is written back here; the refresh token is left unchanged.
    session.update(actions=[UserSession.id_token.set(verification.id_token)])
    set_session(session)
    session.id_token = verification.id_token
    return session