oca-server-auth/odoo-bringout-oca-server-auth-auth_saml/auth_saml/tests/test_pysaml.py
2025-08-29 15:43:06 +02:00

488 lines
18 KiB
Python

# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import base64
import html
import os
import os.path as osp
from copy import deepcopy
from unittest.mock import patch
import responses
from saml2.sigver import SignatureError
from odoo.exceptions import AccessDenied, UserError, ValidationError
from odoo.tests import HttpCase, tagged
from odoo.tools import mute_logger
from .fake_idp import CONFIG, FakeIDP, UnsignedFakeIDP
@tagged("saml", "post_install", "-at_install")
class TestPySaml(HttpCase):
def setUp(self):
super().setUp()
sp_pem_public = None
sp_pem_private = None
with open(
os.path.join(os.path.dirname(__file__), "data", "sp.pem"),
"r",
encoding="UTF-8",
) as file:
sp_pem_public = file.read()
with open(
os.path.join(os.path.dirname(__file__), "data", "sp.key"),
"r",
encoding="UTF-8",
) as file:
sp_pem_private = file.read()
self.saml_provider = self.env["auth.saml.provider"].create(
{
"name": "SAML Provider Demo",
"idp_metadata": FakeIDP().get_metadata(),
"sp_pem_public": base64.b64encode(sp_pem_public.encode()),
"sp_pem_private": base64.b64encode(sp_pem_private.encode()),
"body": "Login with Authentic",
"active": True,
"sig_alg": "SIG_RSA_SHA1",
"matching_attribute": "mail",
}
)
self.url_saml_request = (
"/auth_saml/get_auth_request?pid=%d" % self.saml_provider.id
)
self.idp = FakeIDP([self.saml_provider._metadata_string()])
# Create a user with only password, and another with both password and saml id
self.user, self.user2 = (
self.env["res.users"]
.with_context(no_reset_password=True, tracking_disable=True)
.create(
[
{
"name": "User",
"email": "test@example.com",
"login": "test@example.com",
"password": "Lu,ums-7vRU>0i]=YDLa",
},
{
"name": "User with SAML",
"email": "user@example.com",
"login": "user@example.com",
"password": "NesTNSte9340D720te>/-A",
"saml_ids": [
(
0,
0,
{
"saml_provider_id": self.saml_provider.id,
"saml_uid": "user@example.com",
},
)
],
},
]
)
)
def test_ensure_provider_appears_on_login(self):
# SAML provider should be listed in the login page
response = self.url_open("/web/login")
self.assertIn("Login with Authentic", response.text)
self.assertIn(self.url_saml_request, response.text)
def test_ensure_provider_appears_on_login_with_redirect_param(self):
"""Test that SAML provider is listed in the login page keeping the redirect"""
response = self.url_open(
"/web/login?redirect=%2Fweb%23action%3D37%26model%3Dir.module.module%26view"
"_type%3Dkanban%26menu_id%3D5"
)
self.assertIn("Login with Authentic", response.text)
self.assertIn(
"/auth_saml/get_auth_request?pid={}&redirect=%2Fweb%23action%3D37%26mod"
"el%3Dir.module.module%26view_type%3Dkanban%26menu_id%3D5".format(
self.saml_provider.id
),
response.text,
)
def test_ensure_metadata_present(self):
response = self.url_open(
"/auth_saml/metadata?p=%d&d=%s"
% (self.saml_provider.id, self.env.cr.dbname)
)
self.assertTrue(response.ok)
self.assertTrue("xml" in response.headers.get("Content-Type"))
def test_ensure_get_auth_request_redirects(self):
response = self.url_open(
"/auth_saml/get_auth_request?pid=%d" % self.saml_provider.id,
allow_redirects=False,
)
self.assertTrue(response.ok)
self.assertEqual(response.status_code, 303)
self.assertIn(
"http://localhost:8000/sso/redirect?SAMLRequest=",
response.headers.get("Location"),
)
def test_login_no_saml(self):
"""
Login with a user account, but without any SAML provider setup
against the user
"""
# Standard login using password
self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa")
self.assertEqual(self.session.uid, self.user.id)
self.logout()
# Try to log in with a non-existing SAML token
with self.assertRaises(AccessDenied):
self.authenticate(user="test@example.com", password="test_saml_token")
redirect_url = self.saml_provider._get_auth_request()
self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url)
response = self.idp.fake_login(redirect_url)
self.assertEqual(200, response.status_code)
unpacked_response = response._unpack()
with self.assertRaises(AccessDenied):
self.env["res.users"].sudo().auth_saml(
self.saml_provider.id, unpacked_response.get("SAMLResponse"), None
)
def add_provider_to_user(self):
"""Add a provider to self.user"""
self.user.write(
{
"saml_ids": [
(
0,
0,
{
"saml_provider_id": self.saml_provider.id,
"saml_uid": "test@example.com",
},
)
]
}
)
def test_login_with_saml(self):
self.add_provider_to_user()
redirect_url = self.saml_provider._get_auth_request()
self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url)
response = self.idp.fake_login(redirect_url)
self.assertEqual(200, response.status_code)
unpacked_response = response._unpack()
(database, login, token) = (
self.env["res.users"]
.sudo()
.auth_saml(
self.saml_provider.id, unpacked_response.get("SAMLResponse"), None
)
)
self.assertEqual(database, self.env.cr.dbname)
self.assertEqual(login, self.user.login)
# We should not be able to log in with the wrong token
with self.assertRaises(AccessDenied):
self.authenticate(
user="test@example.com", password="{}-WRONG".format(token)
)
# User should now be able to log in with the token
self.authenticate(user="test@example.com", password=token)
def test_disallow_user_password_when_changing_ir_config_parameter(self):
"""Test that disabling users from having both a password and SAML ids remove
users password."""
# change the option
self.browse_ref(
"auth_saml.allow_saml_uid_and_internal_password"
).value = "False"
# The password should be blank and the user should not be able to connect
with self.assertRaises(AccessDenied):
self.authenticate(
user="user@example.com", password="NesTNSte9340D720te>/-A"
)
def test_disallow_user_password_new_user(self):
"""Test that a new user can not be set up with both password and SAML ids when
the disallow option is set."""
# change the option
self.browse_ref(
"auth_saml.allow_saml_uid_and_internal_password"
).value = "False"
with self.assertRaises(UserError):
self.env["res.users"].with_context(no_reset_password=True).create(
{
"name": "New user with SAML",
"email": "user2@example.com",
"login": "user2@example.com",
"password": "NesTNSte9340D720te>/-A",
"saml_ids": [
(
0,
0,
{
"saml_provider_id": self.saml_provider.id,
"saml_uid": "user2",
},
)
],
}
)
def test_disallow_user_password_no_password_set(self):
"""Test that a new user with SAML ids can not have its password set up when the
disallow option is set."""
# change the option
self.browse_ref(
"auth_saml.allow_saml_uid_and_internal_password"
).value = "False"
# Create a new user with only SAML ids
user = (
self.env["res.users"]
.with_context(no_reset_password=True, tracking_disable=True)
.create(
{
"name": "New user with SAML",
"email": "user2@example.com",
"login": "user2@example.com",
"saml_ids": [
(
0,
0,
{
"saml_provider_id": self.saml_provider.id,
"saml_uid": "unused",
},
)
],
}
)
)
# Assert that the user password can not be set
with self.assertRaises(ValidationError):
user.password = "new password"
def test_disallow_user_password(self):
"""Test that existing user password is deleted when adding an SAML provider when
the disallow option is set."""
# change the option
self.browse_ref(
"auth_saml.allow_saml_uid_and_internal_password"
).value = "False"
# Test that existing user password is deleted when adding an SAML provider
self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa")
self.add_provider_to_user()
with self.assertRaises(AccessDenied):
self.authenticate(user="test@example.com", password="Lu,ums-7vRU>0i]=YDLa")
def test_disallow_user_admin_can_have_password(self):
"""Test that admin can have its password set even if the disallow option is set."""
# change the option
self.browse_ref(
"auth_saml.allow_saml_uid_and_internal_password"
).value = "False"
# Test base.user_admin exception
self.env.ref("base.user_admin").password = "nNRST4j*->sEatNGg._!"
def test_db_filtering(self):
# change filter to only allow our db.
with patch("odoo.http.db_filter", new=lambda *args, **kwargs: []):
self.add_provider_to_user()
redirect_url = self.saml_provider._get_auth_request()
response = self.idp.fake_login(redirect_url)
unpacked_response = response._unpack()
for key in unpacked_response:
unpacked_response[key] = html.unescape(unpacked_response[key])
response = self.url_open("/auth_saml/signin", data=unpacked_response)
self.assertFalse(response.ok)
self.assertIn(response.status_code, [400, 404])
def test_redirect_after_login(self):
"""Test that providing a redirect will be kept after SAML login."""
self.add_provider_to_user()
redirect_url = self.saml_provider._get_auth_request(
{
"r": "%2Fweb%23action%3D37%26model%3Dir.module.module%26view_type%3Dkan"
"ban%26menu_id%3D5"
}
)
response = self.idp.fake_login(redirect_url)
unpacked_response = response._unpack()
for key in unpacked_response:
unpacked_response[key] = html.unescape(unpacked_response[key])
response = self.url_open(
"/auth_saml/signin",
data=unpacked_response,
allow_redirects=True,
timeout=300,
)
self.assertTrue(response.ok)
self.assertEqual(
response.url,
self.base_url()
+ "/web#action=37&model=ir.module.module&view_type=kanban&menu_id=5",
)
def test_disallow_user_password_when_changing_settings(self):
"""Test that disabling the setting will remove passwords from related users"""
# We activate the settings to allow password login
self.env["res.config.settings"].create(
{
"allow_saml_uid_and_internal_password": True,
}
).execute()
# Test the user can login with the password
self.authenticate(user="user@example.com", password="NesTNSte9340D720te>/-A")
self.env["res.config.settings"].create(
{
"allow_saml_uid_and_internal_password": False,
}
).execute()
with self.assertRaises(AccessDenied):
self.authenticate(
user="user@example.com", password="NesTNSte9340D720te>/-A"
)
@responses.activate
def test_download_metadata(self):
expected_metadata = self.idp.get_metadata()
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=200,
content_type="text/xml",
body=expected_metadata,
)
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
self.saml_provider.idp_metadata = ""
self.saml_provider.action_refresh_metadata_from_url()
self.assertEqual(self.saml_provider.idp_metadata, expected_metadata)
@responses.activate
def test_download_metadata_no_provider(self):
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
self.saml_provider.idp_metadata = ""
self.saml_provider.active = False
self.saml_provider.action_refresh_metadata_from_url()
self.assertFalse(self.saml_provider.idp_metadata)
@responses.activate
def test_download_metadata_error(self):
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=500,
content_type="text/xml",
)
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
self.saml_provider.idp_metadata = ""
with self.assertRaises(UserError):
self.saml_provider.action_refresh_metadata_from_url()
self.assertFalse(self.saml_provider.idp_metadata)
@responses.activate
def test_download_metadata_no_update(self):
expected_metadata = self.idp.get_metadata()
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=200,
content_type="text/xml",
body=expected_metadata,
)
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
self.saml_provider.idp_metadata = expected_metadata
self.saml_provider.action_refresh_metadata_from_url()
self.assertEqual(self.saml_provider.idp_metadata, expected_metadata)
@responses.activate
def test_login_with_saml_metadata_empty(self):
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
self.saml_provider.idp_metadata = ""
expected_metadata = self.idp.get_metadata()
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=200,
content_type="text/xml",
body=expected_metadata,
)
self.test_login_with_saml()
self.assertEqual(self.saml_provider.idp_metadata, expected_metadata)
@responses.activate
def test_login_with_saml_metadata_key_changed(self):
settings = deepcopy(CONFIG)
settings["key_file"] = osp.join(
osp.dirname(__file__), "data", "key_idp_expired.pem"
)
settings["cert"] = osp.join(
osp.dirname(__file__), "data", "key_idp_expired.pem"
)
expired_idp = FakeIDP(settings=settings)
self.saml_provider.idp_metadata = expired_idp.get_metadata()
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
up_to_date_metadata = self.idp.get_metadata()
self.assertNotEqual(self.saml_provider.idp_metadata, up_to_date_metadata)
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=200,
content_type="text/xml",
body=up_to_date_metadata,
)
self.test_login_with_saml()
@responses.activate
def test_login_with_saml_unsigned_response(self):
self.add_provider_to_user()
self.saml_provider.idp_metadata_url = "http://localhost:8000/metadata"
unsigned_idp = UnsignedFakeIDP([self.saml_provider._metadata_string()])
redirect_url = self.saml_provider._get_auth_request()
self.assertIn("http://localhost:8000/sso/redirect?SAMLRequest=", redirect_url)
response = unsigned_idp.fake_login(redirect_url)
self.assertEqual(200, response.status_code)
unpacked_response = response._unpack()
responses.add(
responses.GET,
"http://localhost:8000/metadata",
status=200,
content_type="text/xml",
body=self.saml_provider.idp_metadata,
)
with (
self.assertRaises(SignatureError),
mute_logger("saml2.entity"),
mute_logger("saml2.client_base"),
):
(database, login, token) = (
self.env["res.users"]
.sudo()
.auth_saml(
self.saml_provider.id, unpacked_response.get("SAMLResponse"), None
)
)