Source code for spresso.model.authentication.identity_assertion

from spresso.model.base import Composition, SettingsMixin
from spresso.utils.base import update_existing_keys, to_b64, from_b64
from spresso.utils.crypto import create_signature, decrypt_aes_gcm, \
    verify_signature
from spresso.utils.error import InvalidSettings


class IdentityAssertionBase(Composition, SettingsMixin):
    """
    Foundation class for Identity Assertions (IA).

    Instances are used on both the IdP and the RP side. The two template
    instances ``signature`` and ``expected_signature`` are populated from
    :attr:`template` on construction and can be extended to hold further
    information.
    """
    # Fields that make up the signed part of an Identity Assertion.
    template = Composition(
        tag=None,
        email=None,
        forwarder_domain=None
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Each instance gets its own, separately populated template objects.
        expected = Composition()
        expected.update(self.template)
        self.expected_signature = expected

        actual = Composition()
        actual.update(self.template)
        self.signature = actual

        # Everything else starts out empty and is filled in later by
        # from_session() / from_request() / decrypt().
        for field in (
            'tag',
            'email',
            'forwarder_domain',
            'public_key',
            'iv',
            'cipher_text',
            'ia_key',
        ):
            setattr(self, field, None)

    def from_session(self, session):
        """
        Populate this Identity Assertion from a :class:`Session` object.

        Args:
            session(:class:`Session`): The session instance providing the
                encrypted tag, the user's email, the forwarder domain, the
                IA key and the IdP public key.
        """
        self.tag = session.tag_enc_json
        account = session.user
        self.email = account.email
        self.forwarder_domain = session.forwarder_domain
        self.ia_key = session.ia_key
        well_known = session.idp_wk
        self.public_key = well_known.public_key

    def from_request(self, request):
        """
        Populate this Identity Assertion from a request object.

        Args:
            request(:class:`Request`): The request instance whose POST
                parameters ``email``, ``tag`` and ``forwarder_domain`` are
                read.
        """
        # The attribute name and the POST parameter name coincide.
        for field in ('email', 'tag', 'forwarder_domain'):
            setattr(self, field, request.post_param(field))
class IdentityAssertion(IdentityAssertionBase):
    """Identity Assertion that can be signed, decrypted and verified."""

    def sign(self):
        """
        Create the signature over this Identity Assertion.

        Returns:
            str: The b64-encoded signature.

        Raises:
            ValueError: Attempt of creating a signature from a malformed IA.
            InvalidSettings: The private key is missing.
        """
        # Copy the current values of self into the signature template.
        update_existing_keys(self, self.signature)

        private_key = self.settings.private_key
        if private_key is None:
            raise InvalidSettings(
                "Private key is empty"
            )

        if None in self.signature.values():
            raise ValueError(
                "Empty required parameter during signature creation"
            )

        # Serialize the template to JSON and sign its UTF-8 representation.
        payload = self.signature.to_json().encode('utf-8')
        return to_b64(create_signature(private_key, payload))

    def decrypt(self, data):
        """
        Decrypt an encrypted Identity Assertion.

        Args:
            data(str): The encrypted IA as serialized JSON.

        Returns:
            bytes: The decrypted IA as serialized JSON.

        Raises:
            ValueError: A required parameter of the encrypted IA is missing.
        """
        iv_length = 12
        auth_tag_length = 16

        envelope = Composition()
        envelope.from_json(data)

        self.iv = envelope.iv
        self.cipher_text = envelope.ciphertext

        if None in (self.iv, self.cipher_text, self.ia_key):
            raise ValueError(
                "Empty required parameter in encrypted Identity Assertion"
            )

        self.iv = from_b64(self.iv, return_bytes=True)
        self.cipher_text = from_b64(self.cipher_text, return_bytes=True)

        # The leading bytes form the iv; the trailing bytes of the cipher
        # text are the authentication tag, everything before is the payload.
        nonce = self.iv[:iv_length]
        auth_tag = self.cipher_text[-auth_tag_length:]
        payload = self.cipher_text[:-auth_tag_length]

        # No additional authentication data is used for the decryption.
        return decrypt_aes_gcm(self.ia_key, nonce, auth_tag, payload)

    def verify(self, signature):
        """
        Check that the Identity Assertion was signed by the private key that
        belongs to the stored public key.

        Args:
            signature(bytes): The Identity Assertion as serialized JSON.

        Raises:
            ValueError: A required parameter to perform the verification is
                missing.
            InvalidSignature: The signature verification of the IA failed.
        """
        if signature is None:
            raise ValueError("Empty required parameter during: signature")

        # Extract the b64-encoded signature from the serialized IA.
        received = Composition()
        received.from_json(signature.decode('utf-8'))
        signature_bytes = from_b64(received.ia_signature, return_bytes=True)

        # Copy the current values of self into the expected template.
        update_existing_keys(self, self.expected_signature)

        required = list(self.expected_signature.values())
        required.append(self.public_key)
        if None in required:
            raise ValueError("Empty required parameter in expected signature")

        # Serialize what should have been signed.
        expected_bytes = self.expected_signature.to_json().encode('utf-8')
        public_key_bytes = self.public_key.encode('utf-8')

        # Raises on failure, returns nothing on success.
        verify_signature(
            public_key_bytes,
            signature_bytes,
            expected_bytes
        )