# Copyright (C) 2020 Glodo UK
# Copyright (C) 2010-2016, 2022 XCG Consulting
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import base64
import json
import logging
import os
import tempfile
import urllib.parse

import requests

# dependency name is pysaml2 # pylint: disable=W7936
import saml2
import saml2.xmldsig as ds
from saml2.client import Saml2Client
from saml2.config import Config as Saml2Config
from saml2.sigver import SignatureError

from odoo import api, fields, models
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)


class AuthSamlProvider(models.Model):
    """Configuration values of a SAML2 provider"""

    _name = "auth.saml.provider"
    _description = "SAML2 Provider"
    _order = "sequence, name"

    name = fields.Char("Provider Name", required=True, index="trigram")
    entity_id = fields.Char(
        "Entity ID",
        help="EntityID passed to IDP, used to identify the Odoo",
        required=True,
        default="odoo",
    )
    idp_metadata = fields.Text(
        string="Identity Provider Metadata",
        help=(
            "Configuration for this Identity Provider. Supplied by the"
            " provider, in XML format."
        ),
        required=True,
    )
    idp_metadata_url = fields.Char(
        string="Identity Provider Metadata URL",
        help="Some SAML providers, notably Office365 can have a metadata "
        "document which changes over time, and they provide a URL to the "
        "document instead. When this field is set, the metadata can be "
        "fetched from the provided URL.",
    )

    sp_baseurl = fields.Text(
        string="Override Base URL",
        help=(
            "Base URL sent to Odoo with this, rather than automatically "
            "detecting from request or system parameter web.base.url"
        ),
    )
    sp_pem_public = fields.Binary(
        string="Odoo Public Certificate",
        attachment=True,
        required=True,
    )
    sp_pem_public_filename = fields.Char("Odoo Public Certificate File Name")
    sp_pem_private = fields.Binary(
        string="Odoo Private Key",
        attachment=True,
        required=True,
    )
    sp_pem_private_filename = fields.Char("Odoo Private Key File Name")
    sp_metadata_url = fields.Char(
        compute="_compute_sp_metadata_url",
        string="Metadata URL",
        readonly=True,
    )
    matching_attribute = fields.Char(
        string="Identity Provider matching attribute",
        default="subject.nameId",
        required=True,
        help=(
            "Attribute to look for in the returned IDP response to match"
            " against an Odoo user."
        ),
    )
    matching_attribute_to_lower = fields.Boolean(
        string="Lowercase IDP Matching Attribute",
        help="Force matching_attribute to lower case before passing back to Odoo.",
    )
    attribute_mapping_ids = fields.One2many(
        "auth.saml.attribute.mapping",
        "provider_id",
        string="Attribute Mapping",
    )
    active = fields.Boolean(default=True)
    sequence = fields.Integer(index=True)
    css_class = fields.Char(
        string="Button Icon CSS class",
        help="Add a CSS class that serves you to style the login button.",
        default="fa fa-fw fa-sign-in text-primary",
    )
    body = fields.Char(
        string="Login button label", help="Link text in Login Dialog", translate=True
    )
    autoredirect = fields.Boolean(
        "Automatic Redirection",
        default=False,
        help="Only the provider with the higher priority will be automatically "
        "redirected",
    )
    sig_alg = fields.Selection(
        selection=lambda s: s._sig_alg_selection(),
        required=True,
        string="Signature Algorithm",
    )
    # help string is from pysaml2 documentation
    authn_requests_signed = fields.Boolean(
        default=True,
        help="Indicates if the Authentication Requests sent by this SP should be signed"
        " by default.",
    )
    logout_requests_signed = fields.Boolean(
        default=True,
        help="Indicates if this entity will sign the Logout Requests originated from it"
        ".",
    )
    want_assertions_signed = fields.Boolean(
        default=True,
        help="Indicates if this SP wants the IdP to send the assertions signed.",
    )
    want_response_signed = fields.Boolean(
        default=True,
        help="Indicates that Authentication Responses to this SP must be signed.",
    )
    want_assertions_or_response_signed = fields.Boolean(
        default=True,
        help="Indicates that either the Authentication Response or the assertions "
        "contained within the response to this SP must be signed.",
    )
    # this one is used in Saml2Client.prepare_for_authenticate
    sign_authenticate_requests = fields.Boolean(
        default=True,
        help="Whether the request should be signed or not",
    )
    sign_metadata = fields.Boolean(
        default=True,
        help="Whether metadata should be signed or not",
    )

    @api.model
    def _sig_alg_selection(self):
        """Return the selection list for ``sig_alg``.

        Each entry uses the first element of the tuples found in
        ``saml2.xmldsig.SIG_ALLOWED_ALG`` as both value and label.
        """
        return [(sig[0], sig[0]) for sig in ds.SIG_ALLOWED_ALG]

    @api.onchange("name")
    def _onchange_name(self):
        """Default the login button label to the provider name when empty."""
        if not self.body:
            self.body = self.name

    @api.depends("sp_baseurl")
    def _compute_sp_metadata_url(self):
        """Compute the URL of the metadata endpoint for each provider.

        The base URL is ``sp_baseurl`` when set, otherwise the
        ``web.base.url`` system parameter. Records that are not saved yet
        (``NewId``) get ``False`` because the URL embeds the record id.
        """
        icp_base_url = (
            self.env["ir.config_parameter"].sudo().get_param("web.base.url", "")
        )

        for record in self:
            if isinstance(record.id, models.NewId):
                record.sp_metadata_url = False
                continue

            base_url = record.sp_baseurl or icp_base_url
            qs = urllib.parse.urlencode({"p": record.id, "d": self.env.cr.dbname})

            record.sp_metadata_url = urllib.parse.urljoin(
                base_url, ("/auth_saml/metadata?%s" % qs)
            )

    def _get_cert_key_path(self, field="sp_pem_public"):
        """Return a filesystem path holding the content of a binary field.

        :param field: name of the attachment-backed binary field to look up
            (``sp_pem_public`` or ``sp_pem_private``).
        :return: path of a file containing the decoded field content.
        """
        self.ensure_one()

        model_attachment = self.env["ir.attachment"].sudo()
        keys = model_attachment.search(
            [
                ("res_model", "=", self._name),
                ("res_field", "=", field),
                ("res_id", "=", self.id),
            ],
            limit=1,
        )

        if model_attachment._storage() != "file":
            # For non-file locations we need to create a temp file to pass to pysaml.
            fd, keys_path = tempfile.mkstemp()
            # Wrap the descriptor returned by mkstemp instead of reopening the
            # path, so it is closed even when decoding or writing fails.
            # NOTE(review): the temporary file is not removed here because the
            # returned path is handed to the caller — confirm whether cleanup
            # is handled elsewhere.
            with os.fdopen(fd, "wb") as key_file:
                key_file.write(base64.b64decode(keys.datas))
        else:
            keys_path = model_attachment._full_path(keys.store_fname)

        return keys_path

    def _get_config_for_provider(self, base_url: str = None) -> Saml2Config:
        """
        Internal helper to get a loaded Saml2Config for this provider

        :param base_url: base URL used to build the assertion consumer
            service URL; overridden by ``sp_baseurl`` when that field is set
            and defaulting to the ``web.base.url`` system parameter.
        :return: the loaded configuration.
        :raises saml2.SAMLError: when loading fails again after one metadata
            refresh attempt.
        """
        self.ensure_one()

        if self.sp_baseurl:
            base_url = self.sp_baseurl

        if not base_url:
            base_url = (
                self.env["ir.config_parameter"].sudo().get_param("web.base.url", "")
            )

        acs_url = urllib.parse.urljoin(base_url, "/auth_saml/signin")
        settings = {
            "metadata": {"inline": [self.idp_metadata]},
            "entityid": self.entity_id,
            "service": {
                "sp": {
                    "endpoints": {
                        # NOTE(review): both bindings are listed twice; kept
                        # as-is because dropping entries would change the
                        # endpoint list — confirm whether this is intentional.
                        "assertion_consumer_service": [
                            (acs_url, saml2.BINDING_HTTP_REDIRECT),
                            (acs_url, saml2.BINDING_HTTP_POST),
                            (acs_url, saml2.BINDING_HTTP_REDIRECT),
                            (acs_url, saml2.BINDING_HTTP_POST),
                        ],
                    },
                    "allow_unsolicited": False,
                    "authn_requests_signed": self.authn_requests_signed,
                    "logout_requests_signed": self.logout_requests_signed,
                    "want_assertions_signed": self.want_assertions_signed,
                    "want_response_signed": self.want_response_signed,
                    "want_assertions_or_response_signed": (
                        self.want_assertions_or_response_signed
                    ),
                },
            },
            "cert_file": self._get_cert_key_path("sp_pem_public"),
            "key_file": self._get_cert_key_path("sp_pem_private"),
        }
        try:
            sp_config = Saml2Config()
            sp_config.load(settings)
            sp_config.allow_unknown_attributes = True
            return sp_config
        except saml2.SAMLError:
            if self.env.context.get("saml2_retry_after_refresh_metadata", False):
                # Already retried once: let the error reach the caller.
                raise
            # Retry after refresh metadata
            self.action_refresh_metadata_from_url()
            # The key set here must be the one tested above, otherwise the
            # retry would recurse without bound on a persistent error.
            return self.with_context(
                saml2_retry_after_refresh_metadata=1
            )._get_config_for_provider(base_url)

    def _get_client_for_provider(self, base_url: str = None) -> Saml2Client:
        """Return a ``Saml2Client`` built from this provider's configuration.

        :param base_url: forwarded to ``_get_config_for_provider``.
        """
        sp_config = self._get_config_for_provider(base_url)
        saml_client = Saml2Client(config=sp_config)
        return saml_client

    def _get_auth_request(self, extra_state=None, url_root=None):
        """
        build an authentication request and give it back to our client

        :param extra_state: optional dict merged into the relay state, which
            otherwise holds the database name (``d``) and provider id (``p``).
        :param url_root: base URL forwarded to ``_get_client_for_provider``.
        :return: the value of the ``Location`` header of the prepared
            request, or ``None`` when no such header is present.
        """
        self.ensure_one()

        if extra_state is None:
            extra_state = {}
        state = {
            "d": self.env.cr.dbname,
            "p": self.id,
        }
        state.update(extra_state)

        sig_alg = ds.SIG_RSA_SHA1
        if self.sig_alg:
            # The selection value is the name of a constant of saml2.xmldsig.
            sig_alg = getattr(ds, self.sig_alg)

        saml_client = self._get_client_for_provider(url_root)
        reqid, info = saml_client.prepare_for_authenticate(
            sign=self.sign_authenticate_requests,
            relay_state=json.dumps(state),
            sigalg=sig_alg,
        )

        redirect_url = None
        # Select the IdP URL to send the AuthN request to
        for key, value in info["headers"]:
            if key == "Location":
                redirect_url = value

        # Remember the request id so that the response can be matched to it.
        self._store_outstanding_request(reqid)

        return redirect_url

    def _validate_auth_response(self, token: str, base_url: str = None):
        """return the validation data corresponding to the access token

        :param token: response passed to
            ``Saml2Client.parse_authn_request_response``.
        :param base_url: forwarded to ``_get_client_for_provider``.
        :return: dict with ``user_id`` set to the matching value, updated
            with whatever ``_hook_validate_auth_response`` returns.
        :raises SignatureError: when the signature check fails and either no
            metadata URL is configured or the retry fails as well.
        :raises Exception: when the configured matching attribute is missing
            from, or empty in, the identity attributes of the response.
        """
        self.ensure_one()

        try:
            client = self._get_client_for_provider(base_url)
            response = client.parse_authn_request_response(
                token,
                saml2.entity.BINDING_HTTP_POST,
                self._get_outstanding_requests_dict(),
            )
        except SignatureError:
            # we have a metadata url: try to refresh the metadata document
            if self.idp_metadata_url:
                self.action_refresh_metadata_from_url()
                # retry: if it fails again, we let the exception flow
                client = self._get_client_for_provider(base_url)
                response = client.parse_authn_request_response(
                    token,
                    saml2.entity.BINDING_HTTP_POST,
                    self._get_outstanding_requests_dict(),
                )
            else:
                raise
        matching_value = None

        if self.matching_attribute == "subject.nameId":
            matching_value = response.name_id.text
        else:
            attrs = response.get_identity()

            for k, v in attrs.items():
                if k == self.matching_attribute:
                    matching_value = v
                    break

            if not matching_value:
                raise Exception(
                    "Matching attribute %s not found in user attrs: %s"
                    % (self.matching_attribute, attrs)
                )

        # A list value is reduced to its first element.
        if matching_value and isinstance(matching_value, list):
            matching_value = next(iter(matching_value), None)

        if isinstance(matching_value, str) and self.matching_attribute_to_lower:
            matching_value = matching_value.lower()

        vals = {"user_id": matching_value}

        post_vals = self._hook_validate_auth_response(response, matching_value)
        if post_vals:
            vals.update(post_vals)

        return vals

    def _get_outstanding_requests_dict(self):
        """Return ``{saml_request_id: record id}`` for this provider's
        ``auth_saml.request`` records."""
        self.ensure_one()

        # Not named ``requests`` so the imported module is not shadowed.
        outstanding_requests = (
            self.env["auth_saml.request"]
            .sudo()
            .search([("saml_provider_id", "=", self.id)])
        )
        return {
            record.saml_request_id: record.id for record in outstanding_requests
        }

    def _store_outstanding_request(self, reqid):
        """Create an ``auth_saml.request`` record for ``reqid`` linked to this
        provider."""
        self.ensure_one()

        self.env["auth_saml.request"].sudo().create(
            {"saml_provider_id": self.id, "saml_request_id": reqid}
        )

    def _metadata_string(self, valid=None, base_url: str = None):
        """Return the metadata of this service provider.

        :param valid: forwarded as ``valid`` to
            ``saml2.metadata.create_metadata_string``.
        :param base_url: forwarded to ``_get_config_for_provider``.
        :return: the result of ``saml2.metadata.create_metadata_string``,
            signed when ``sign_metadata`` is set.
        """
        self.ensure_one()

        sp_config = self._get_config_for_provider(base_url)
        return saml2.metadata.create_metadata_string(
            None,
            config=sp_config,
            valid=valid,
            cert=self._get_cert_key_path("sp_pem_public"),
            keyfile=self._get_cert_key_path("sp_pem_private"),
            sign=self.sign_metadata,
        )

    def _hook_validate_auth_response(self, response, matching_value):
        """Map identity attributes of the response to field values.

        :param response: parsed response providing ``get_identity()``.
        :param matching_value: value matched against the user; not used by
            this implementation.
        :return: ``{"mapped_attrs": {field_name: value}}`` built from
            ``attribute_mapping_ids``; attributes missing from the response
            are skipped and list values are reduced to their first element.
        """
        self.ensure_one()
        vals = {}
        attrs = response.get_identity()

        for attribute in self.attribute_mapping_ids:
            if attribute.attribute_name not in attrs:
                _logger.debug(
                    "SAML attribute '%s' not found in response %s",
                    attribute.attribute_name,
                    attrs,
                )
                continue

            attribute_value = attrs[attribute.attribute_name]
            if isinstance(attribute_value, list):
                attribute_value = attribute_value[0]

            vals[attribute.field_name] = attribute_value

        return {"mapped_attrs": vals}

    def action_refresh_metadata_from_url(self):
        """Download the IdP metadata again and store it when it changed.

        Only the providers of ``self`` whose ``idp_metadata_url`` matches
        ``http%`` are considered.

        :return: ``True`` when at least one provider was updated, ``False``
            otherwise.
        :raises UserError: when a download does not answer with status 200.
        """
        providers = self.search(
            [("idp_metadata_url", "ilike", "http%"), ("id", "in", self.ids)]
        )
        if not providers:
            return False

        providers_to_update = {}
        for provider in providers:
            document = requests.get(provider.idp_metadata_url, timeout=5)
            if document.status_code != 200:
                raise UserError(
                    f"Unable to download the metadata for {provider.name}: {document.reason}"
                )
            if document.text != provider.idp_metadata:
                providers_to_update[provider.id] = document.text

        if not providers_to_update:
            return False

        # lock the records we might update, so that multiple simultaneous login
        # attempts will not cause concurrent updates
        provider_ids = tuple(providers_to_update.keys())
        self.env.cr.execute(
            "SELECT id FROM auth_saml_provider WHERE id in %s FOR UPDATE",
            (provider_ids,),
        )
        updated = False
        for provider in providers:
            if provider.id in providers_to_update:
                provider.idp_metadata = providers_to_update[provider.id]
                # Two placeholders in the message, hence two arguments.
                _logger.info(
                    "Updated metadata for provider %s from %s",
                    provider.name,
                    provider.idp_metadata_url,
                )
                updated = True

        return updated