58 lines
2.1 KiB
Python
58 lines
2.1 KiB
Python
import base64
|
|
import hashlib
|
|
import hmac
|
|
|
|
from src.aws.aws_api_client import AWSAPIClient
|
|
from src.aws.cognito import CognitoClient
|
|
from src.error.exceptions import NotAuthorizeException
|
|
from src.model.db.user_master import UserMasterModel
|
|
from src.model.internal.jwt_token import JWTToken
|
|
from src.repositories.base_repository import BaseRepository
|
|
from src.repositories.user_master_repository import UserMasterRepository
|
|
from src.services.base_service import BaseService
|
|
from src.system_var import environment
|
|
|
|
|
|
class LoginService(BaseService):
    """Service that performs login-related operations.

    Wraps a Cognito client (username/password login) and a user master
    repository (lookup of the logged-in user's record).
    """

    # Dependency name -> class mappings.
    # NOTE(review): presumably consumed by the code that instantiates
    # services to build the ``repositories`` / ``clients`` dicts passed to
    # ``__init__`` -- confirm against BaseService / the factory.
    REPOSITORIES = {
        'user_repository': UserMasterRepository
    }

    CLIENTS = {
        'cognito_client': CognitoClient
    }

    def __init__(self, repositories: dict[str, BaseRepository], clients: dict[str, AWSAPIClient]) -> None:
        """Store the injected repository and client instances.

        Args:
            repositories: Must contain the key ``'user_repository'``.
            clients: Must contain the key ``'cognito_client'``.

        Raises:
            KeyError: If either required key is missing.
        """
        super().__init__(repositories, clients)
        self.user_repository = repositories['user_repository']
        self.cognito_client = clients['cognito_client']

    def login(self, username: str, password: str) -> JWTToken:
        """Log in with a username and password and return the issued tokens.

        Args:
            username: Login name passed to the Cognito client.
            password: Password passed to the Cognito client.

        Returns:
            A ``JWTToken`` built from the ID token and refresh token returned
            by ``login_by_user_password_flow``.

        Raises:
            NotAuthorizeException: If the client raised an error whose
                ``response['Error']['Code']`` is ``'NotAuthorizedException'``.
            Exception: Any other error raised by the client is re-raised
                unchanged.
        """
        try:
            id_token, refresh_token = self.cognito_client.login_by_user_password_flow(
                username,
                password,
                self.__secret_hash(username),
            )
        except Exception as e:
            # Not every exception carries a botocore-style ``response`` dict
            # (e.g. connection errors, programming errors). Read the error
            # code defensively so that such exceptions propagate as-is
            # instead of being masked by an AttributeError/KeyError raised
            # from inside this handler.
            response = getattr(e, 'response', None)
            error = response.get('Error') if isinstance(response, dict) else None
            error_code = error.get('Code') if isinstance(error, dict) else None

            if error_code == 'NotAuthorizedException':
                raise NotAuthorizeException(e) from e
            # Bare ``raise`` keeps the original traceback intact.
            raise

        return JWTToken(id_token, refresh_token)

    def login_with_security_code(self, code: str) -> JWTToken:
        """Return the token obtained from ``JWTToken.request(code)``.

        Args:
            code: Security code forwarded to ``JWTToken.request``.
                NOTE(review): presumably an authorization code issued by the
                identity provider -- confirm against ``JWTToken.request``.
        """
        return JWTToken.request(code)

    def logged_in_user(self, user_id) -> UserMasterModel:
        """Fetch the user master record whose ``user_id`` matches.

        Args:
            user_id: Value used as the ``'user_id'`` condition for
                ``user_repository.fetch_one``.

        Returns:
            Whatever ``fetch_one`` returns for that condition (annotated as
            ``UserMasterModel`` in the original code).
        """
        user_record: UserMasterModel = self.user_repository.fetch_one({'user_id': user_id})
        return user_record

    def __secret_hash(self, username: str) -> str:
        """Compute the Cognito secret hash for ``username``.

        The hash is the Base64-encoded HMAC-SHA256 of
        ``username + COGNITO_CLIENT_ID`` keyed with ``COGNITO_CLIENT_SECRET``
        (both read from ``environment``).

        Returns:
            The Base64 digest as a ``str``.
        """
        # see - https://aws.amazon.com/jp/premiumsupport/knowledge-center/cognito-unable-to-verify-secret-hash/  # noqa
        message = bytes(username + environment.COGNITO_CLIENT_ID, 'utf-8')
        key = bytes(environment.COGNITO_CLIENT_SECRET, 'utf-8')
        digest = hmac.new(key, message, digestmod=hashlib.sha256).digest()
        return base64.b64encode(digest).decode()
|