79 lines
3.0 KiB
Python

import base64
import hashlib
import hmac
from src.aws.aws_api_client import AWSAPIClient
from src.aws.cognito import CognitoClient
from src.error.exceptions import NotAuthorizeException
from src.model.db.user_master import UserMasterModel
from src.model.internal.jwt_token import JWTToken
from src.repositories.base_repository import BaseRepository
from src.repositories.user_master_repository import UserMasterRepository
from src.services.base_service import BaseService
from src.system_var import environment
class LoginService(BaseService):
REPOSITORIES = {
'user_repository': UserMasterRepository
}
CLIENTS = {
'cognito_client': CognitoClient
}
def __init__(self, repositories: dict[str, BaseRepository], clients: dict[str, AWSAPIClient]) -> None:
super().__init__(repositories, clients)
self.user_repository = repositories['user_repository']
self.cognito_client = clients['cognito_client']
def login(self, username: str, password: str) -> JWTToken:
try:
id_token, refresh_token = self.cognito_client.login_by_user_password_flow(
username,
password,
self.__secret_hash(username)
)
except Exception as e:
if e.response['Error']['Code'] == 'NotAuthorizedException':
raise NotAuthorizeException(e)
else:
raise e
return JWTToken(id_token, refresh_token)
def login_with_security_code(self, code: str) -> JWTToken:
return JWTToken.request(code)
def logged_in_user(self, user_id):
user_record: UserMasterModel = self.user_repository.fetch_one({'user_id': user_id})
return user_record
def increase_login_failed_count(self, user_id: str):
try:
# セッション内のタイムゾーン変更のため、明示的にトランザクションを開始する
self.user_repository.begin()
self.user_repository.to_jst()
self.user_repository.increase_login_failed_count({'user_id': user_id})
self.user_repository.commit()
except Exception as e:
self.user_repository.rollback()
raise e
def on_login_fail_limit_exceeded(self, user_id: str):
self.user_repository.disable_mnt_user({'user_id': user_id})
def is_login_failed_limit_exceeded(self, user_id: str):
user_record: UserMasterModel = self.user_repository.fetch_one({'user_id': user_id})
if user_record is None:
return False
return user_record.is_login_failed_limit_exceeded()
def __secret_hash(self, username: str):
# see - https://aws.amazon.com/jp/premiumsupport/knowledge-center/cognito-unable-to-verify-secret-hash/ # noqa
message = bytes(username + environment.COGNITO_CLIENT_ID, 'utf-8')
key = bytes(environment.COGNITO_CLIENT_SECRET, 'utf-8')
digest = hmac.new(key, message, digestmod=hashlib.sha256).digest()
return base64.b64encode(digest).decode()