Source code for spresso.controller.grant.authentication.relying_party

from json import JSONDecodeError
from urllib.parse import unquote

from cryptography.exceptions import InvalidTag, InvalidSignature
from jsonschema import ValidationError

from spresso.controller.grant.authentication.site_adapter.relying_party import \
    IndexSiteAdapter, StartLoginSiteAdapter, \
    RedirectSiteAdapter, LoginSiteAdapter
from spresso.controller.grant.base import GrantHandler, \
    SiteAdapterMixin, ValidatingGrantHandler, JsonErrorMixin
from spresso.model.authentication.identity_assertion import IdentityAssertion
from spresso.model.authentication.request import IdpInfoRequest
from spresso.model.authentication.session import Session
from spresso.model.base import SettingsMixin, Origin, User
from spresso.utils.base import create_nonce, from_b64
from spresso.utils.error import SpressoInvalidError, UnsupportedAdditionalData
from spresso.view.authentication.relying_party import WaitView, \
    StartLoginView, RedirectView, LoginView
from spresso.view.base import View, Script


[docs]class IndexHandler(GrantHandler, SiteAdapterMixin, SettingsMixin): site_adapter_class = IndexSiteAdapter
[docs] def process(self, request, response, environ): script = Script(settings=self.settings) self.site_adapter.set_javascript(script.render()) data = self.site_adapter.render_page(request, response, environ) view = View() return view.process(data)
[docs]class WaitHandler(GrantHandler, SettingsMixin):
[docs] def process(self, request, response, environ): view = WaitView(settings=self.settings) return view.process(response)
[docs]class StartLoginHandler(ValidatingGrantHandler, SiteAdapterMixin, SettingsMixin, JsonErrorMixin): site_adapter_class = StartLoginSiteAdapter
[docs] def read_validate_params(self, request): self.user = User( request.post_param('email'), regexp=self.settings.regexp ) if not self.user.is_valid: raise SpressoInvalidError( error="invalid_request", message="Missing or malformed email in request", uri=request.path )
[docs] def process(self, request, response, environ): retriever = IdpInfoRequest(self.user.netloc, settings=self.settings) idp_info = retriever.get_content() session = Session(self.user, idp_info, settings=self.settings) try: session.validate() except JSONDecodeError: raise SpressoInvalidError( error="invalid_session", message="JSON decoding failed", uri=request.path ) except (ValidationError, ValueError) as error: raise SpressoInvalidError( error="invalid_session", message=error, uri=request.path ) self.site_adapter.save_session(session) view = StartLoginView(session, settings=self.settings) return view.process(response)
[docs]class RedirectHandler(ValidatingGrantHandler, SiteAdapterMixin, SettingsMixin, JsonErrorMixin): site_adapter_class = RedirectSiteAdapter
[docs] def read_validate_params(self, request): login_session_token = request.get_param('login_session_token') if not login_session_token: raise SpressoInvalidError( error="invalid_request", message="Missing required parameter in request", uri=request.path ) self.login_session_token = from_b64( unquote(login_session_token), return_bytes=True )
[docs] def process(self, request, response, environ): login_session = self.site_adapter.load_session( self.login_session_token ) if not login_session or not isinstance(login_session, Session): raise SpressoInvalidError( error="invalid_request", message="Invalid session", uri=request.path ) view = RedirectView(settings=self.settings) try: login_url = login_session.get_login_url() except ValueError as error: raise SpressoInvalidError( error="invalid_request", message=error, uri=request.path ) view.template_context = dict(url=login_url) return view.process(response)
[docs]class LoginHandler(ValidatingGrantHandler, SiteAdapterMixin, SettingsMixin, JsonErrorMixin): site_adapter_class = LoginSiteAdapter
[docs] def read_validate_params(self, request): login_session_token = request.post_param('login_session_token') self.eia = request.post_param('eia') origin_header = request.header('Origin') if None in [login_session_token, self.eia, origin_header]: raise SpressoInvalidError( error="invalid_request", message="Missing required parameter in request", uri=request.path ) self.login_session_token = from_b64( login_session_token, return_bytes=True ) origin = Origin(origin_header, settings=self.settings) if not origin.valid: raise SpressoInvalidError( error="invalid_request", message="Origin request_header mismatch", uri=request.path )
[docs] def process(self, request, response, environ): login_session = self.site_adapter.load_session( self.login_session_token ) if not login_session or not isinstance(login_session, Session): raise SpressoInvalidError( error="invalid_request", message="Invalid session", uri=request.path ) ia = IdentityAssertion(settings=self.settings) ia.from_session(login_session) try: signature = ia.decrypt(self.eia) except JSONDecodeError: raise SpressoInvalidError( error="invalid_eia", message="JSON decoding failed", uri=request.path ) except ValueError as error: raise SpressoInvalidError( error="invalid_eia", message=error, uri=request.path ) except InvalidTag: raise SpressoInvalidError( error="invalid_eia", message="Decryption of the Identity Assertion failed", uri=request.path ) # Extend IA additional_data = self.site_adapter.get_additional_data() if not isinstance(additional_data, dict): raise UnsupportedAdditionalData( "Additional data must be of type 'dict'" ) ia.expected_signature.update(additional_data) try: ia.verify(signature) except JSONDecodeError: raise SpressoInvalidError( error="invalid_signature", message="JSON decoding failed", uri=request.path ) except ValueError as error: raise SpressoInvalidError( error="invalid_signature", message=error, uri=request.path ) except InvalidSignature: raise SpressoInvalidError( error="invalid_signature", message="Identity Assertion signature verification failed", uri=request.path ) service_token = create_nonce(32) login_session.token = service_token self.site_adapter.save_session(login_session) response = self.site_adapter.set_cookie(service_token, response) view = LoginView(login_session.user.email) return view.process(response)