# Copyright 2020 Camptocamp SA (http://www.camptocamp.com) # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html). # from urllib.parse import urlparse import json from unittest import mock from odoo import exceptions from odoo.http import Response from odoo.tools import mute_logger from odoo.addons.base_rest.controllers.main import _PseudoCollection from odoo.addons.base_rest.tests.common import TransactionRestServiceRegistryCase from odoo.addons.component.tests.common import new_rollbacked_env from odoo.addons.rest_log import exceptions as log_exceptions # pylint: disable=W7950 from .common import FakeConcurrentUpdateError, TestDBLoggingMixin class TestDBLogging(TransactionRestServiceRegistryCase, TestDBLoggingMixin): @classmethod def setUpClass(cls): super().setUpClass() cls._setup_registry(cls) cls.service = cls._get_service(cls) cls.log_model = cls.env["rest.log"].sudo() @classmethod def tearDownClass(cls): # pylint: disable=W8110 cls._teardown_registry(cls) super().tearDownClass() def test_log_enabled_conf_parsing(self): key1 = "coll1.service1.endpoint" key2 = "coll1.service2.endpoint:failed" key3 = "coll2.service1.endpoint:success" self.env["ir.config_parameter"].sudo().set_param( "rest.log.active", ",".join((key1, key2, key3)) ) expected = { # fmt:off "coll1.service1.endpoint": ("success", "failed"), "coll1.service2.endpoint": ("failed", ), "coll2.service1.endpoint": ("success", ), # fmt: on } self.assertEqual(self.env["rest.log"]._get_log_active_conf(), expected) def test_log_enabled(self): self.service._log_calls_in_db = False with self._get_mocked_request(): # no conf no flag self.assertFalse(self.service._db_logging_active("avg_endpoint")) # by conf for collection self.env["ir.config_parameter"].sudo().set_param( "rest.log.active", self.service._collection ) self.assertTrue(self.service._db_logging_active("avg_endpoint")) # by conf for usage self.env["ir.config_parameter"].sudo().set_param( "rest.log.active", self.service._collection + "." + self.service._usage ) self.assertTrue(self.service._db_logging_active("avg_endpoint")) # by conf for usage and endpoint self.env["ir.config_parameter"].sudo().set_param( "rest.log.active", self.service._collection + "." + self.service._usage + ".avg_endpoint", ) self.assertTrue(self.service._db_logging_active("avg_endpoint")) self.assertFalse(self.service._db_logging_active("not_so_avg_endpoint")) # no conf, service class flag self.env["ir.config_parameter"].sudo().set_param("rest.log.active", "") self.service._log_calls_in_db = True self.assertTrue(self.service._db_logging_active("avg_endpoint")) def test_no_log_entry(self): self.service._log_calls_in_db = False log_entry_count = self.log_model.search_count([]) with self._get_mocked_request(): resp = self.service.dispatch("get", 100) self.assertNotIn("log_entry_url", resp) self.assertFalse(self.log_model.search_count([]) > log_entry_count) def test_log_entry(self): log_entry_count = self.log_model.search_count([]) with self._get_mocked_request(): resp = self.service.dispatch("get", 100) self.assertIn("log_entry_url", resp) self.assertTrue(self.log_model.search_count([]) > log_entry_count) def test_log_entry_values_success(self): params = {"some": "value"} kw = {"result": {"data": "worked!"}} # test full data request only once, other tests will skip this part httprequest = mock.Mock( url="https://my.odoo.test/service/endpoint", method="POST" ) extra_headers = {"KEEP-ME": "FOO"} with self._get_mocked_request( httprequest=httprequest, extra_headers=extra_headers ) as mocked_request: entry = self.service._log_call_in_db( self.env, mocked_request, "avg_method", params=params, **kw ) expected = { "collection": self.service._collection, "request_url": httprequest.url, "request_method": httprequest.method, "state": "success", "error": False, "exception_name": False, "severity": False, } self.assertRecordValues(entry, [expected]) expected_json = { "result": {"data": "worked!"}, "params": dict(params), "headers": { "Cookie": "", "Api-Key": "", "KEEP-ME": "FOO", }, } for k, v in expected_json.items(): self.assertEqual(json.loads(entry[k]), v) def test_log_entry_values_failed(self): params = {"some": "value"} # no result, will fail kw = {"result": {}} with self._get_mocked_request() as mocked_request: entry = self.service._log_call_in_db( self.env, mocked_request, "avg_method", params=params, **kw ) expected = { "collection": self.service._collection, "state": "failed", "result": "{}", "error": False, "exception_name": False, "severity": False, } self.assertRecordValues(entry, [expected]) def _test_log_entry_values_failed_with_exception_default(self, severity=None): params = {"some": "value"} fake_tb = """ [...] File "/somewhere/in/your/custom/code/file.py", line 503, in write [...] ValueError: Ops, something went wrong """ orig_exception = ValueError("Ops, something went wrong") kw = {"result": {}, "traceback": fake_tb, "orig_exception": orig_exception} with self._get_mocked_request() as mocked_request: entry = self.service._log_call_in_db( self.env, mocked_request, "avg_method", params=params, **kw ) expected = { "collection": self.service._collection, "state": "failed", "result": "{}", "error": fake_tb, "exception_name": "ValueError", "exception_message": "Ops, something went wrong", "severity": severity or "severe", } self.assertRecordValues(entry, [expected]) def test_log_entry_values_failed_with_exception_default(self): self._test_log_entry_values_failed_with_exception_default() def test_log_entry_values_failed_with_exception_functional(self): params = {"some": "value"} fake_tb = """ [...] File "/somewhere/in/your/custom/code/file.py", line 503, in write [...] UserError: You are doing something wrong Dave! """ orig_exception = exceptions.UserError("You are doing something wrong Dave!") kw = {"result": {}, "traceback": fake_tb, "orig_exception": orig_exception} with self._get_mocked_request() as mocked_request: entry = self.service._log_call_in_db( self.env, mocked_request, "avg_method", params=params, **kw ) expected = { "collection": self.service._collection, "state": "failed", "result": "{}", "error": fake_tb, "exception_name": "odoo.exceptions.UserError", "exception_message": "You are doing something wrong Dave!", "severity": "functional", } self.assertRecordValues(entry, [expected]) # test that we can still change severity as we like entry.severity = "severe" self.assertEqual(entry.severity, "severe") def test_log_entry_severity_mapping_param(self): # test override of mapping via config param mapping = self.log_model._get_exception_severity_mapping() self.assertEqual(mapping, self.log_model.EXCEPTION_SEVERITY_MAPPING) self.assertEqual(mapping["ValueError"], "severe") self.assertEqual(mapping["odoo.exceptions.UserError"], "functional") value = "ValueError: warning, odoo.exceptions.UserError: severe" self.env["ir.config_parameter"].sudo().create( {"key": "rest.log.severity.exception.mapping", "value": value} ) mapping = self.log_model._get_exception_severity_mapping() self.assertEqual(mapping["ValueError"], "warning") self.assertEqual(mapping["odoo.exceptions.UserError"], "severe") self._test_log_entry_values_failed_with_exception_default("warning") @mute_logger("odoo.addons.rest_log.models.rest_log") def test_log_entry_severity_mapping_param_bad_values(self): # bad values are discarded value = """ ValueError: warning, odoo.exceptions.UserError::badvalue, VeryBadValue|error """ self.env["ir.config_parameter"].sudo().create( {"key": "rest.log.severity.exception.mapping", "value": value} ) mapping = self.log_model._get_exception_severity_mapping() expected = self.log_model.EXCEPTION_SEVERITY_MAPPING.copy() expected["ValueError"] = "warning" self.assertEqual(mapping, expected) def test_log_entry_values_success_with_response(self): with self._get_mocked_request() as mocked_request: res = Response( b"A test .pdf file to download", headers=[ ("Content-Type", "application/pdf"), ("X-Content-Type-Options", "nosniff"), ("Content-Disposition", "attachment; filename*=UTF-8''test.pdf"), ("Content-Length", 28), ], ) res.status_code = 200 entry = self.service._log_call_in_db( self.env, mocked_request, "method", result=res ) self.assertEqual(entry.state, "success") self.assertEqual( json.loads(entry.result), { "headers": { "Content-Disposition": "attachment; filename*=UTF-8''test.pdf", "Content-Length": "28", "Content-Type": "application/pdf", "X-Content-Type-Options": "nosniff", }, "status": 200, }, ) def test_log_entry_values_failure_with_response(self): with self._get_mocked_request() as mocked_request: res = Response(b"", headers=[]) res.status_code = 418 entry = self.service._log_call_in_db( self.env, mocked_request, "method", result=res ) self.assertEqual(entry.state, "failed") self.assertEqual( json.loads(entry.result), { "headers": { "Content-Length": "0", "Content-Type": "text/html; charset=utf-8", }, "status": 418, }, ) class TestDBLoggingExceptionBase( TransactionRestServiceRegistryCase, TestDBLoggingMixin ): @classmethod def setUpClass(cls): super().setUpClass() cls._setup_registry(cls) @classmethod def tearDownClass(cls): # pylint: disable=W8110 cls._teardown_registry(cls) super().tearDownClass() def _test_exception(self, test_type, wrapping_exc, exc_name, severity): log_model = self.env["rest.log"].sudo() initial_entries = log_model.search([]) entry_url_from_exc = None # Context: we are running in a transaction case which uses savepoints. # The log machinery is going to rollback the transation when catching errors. # Hence we need a completely separated env for the service. with new_rollbacked_env() as new_env: # Init fake collection w/ new env collection = _PseudoCollection(self._collection_name, new_env) service = self._get_service(self, collection=collection) with self._get_mocked_request(env=new_env): try: service.dispatch("fail", test_type) except Exception as err: # Not using `assertRaises` to inspect the exception directly self.assertTrue(isinstance(err, wrapping_exc)) self.assertEqual( service._get_exception_message(err), "Failed as you wanted!" ) entry_url_from_exc = err.rest_json_info["log_entry_url"] with new_rollbacked_env() as new_env: log_model = new_env["rest.log"].sudo() entry = log_model.search([]) - initial_entries expected = { "collection": service._collection, "state": "failed", "result": "null", "exception_name": exc_name, "exception_message": "Failed as you wanted!", "severity": severity, } self.assertRecordValues(entry, [expected]) self.assertEqual(entry_url_from_exc, service._get_log_entry_url(entry)) class TestDBLoggingExceptionUserError(TestDBLoggingExceptionBase): @staticmethod def _get_test_controller(class_or_instance, root_path=None): # Override to avoid registering twice the same controller route. return super()._get_test_controller( class_or_instance, root_path="/test_log_exception_user/" ) def test_log_exception_user(self): self._test_exception( "user", log_exceptions.RESTServiceUserErrorException, "odoo.exceptions.UserError", "functional", ) class TestDBLoggingExceptionValidationError(TestDBLoggingExceptionBase): @staticmethod def _get_test_controller(class_or_instance, root_path=None): return super()._get_test_controller( class_or_instance, root_path="/test_log_exception_validation/" ) def test_log_exception_validation(self): self._test_exception( "validation", log_exceptions.RESTServiceValidationErrorException, "odoo.exceptions.ValidationError", "functional", ) class TestDBLoggingExceptionValueError(TestDBLoggingExceptionBase): @staticmethod def _get_test_controller(class_or_instance, root_path=None): return super()._get_test_controller( class_or_instance, root_path="/test_log_exception_value/" ) def test_log_exception_value(self): self._test_exception( "value", log_exceptions.RESTServiceDispatchException, "ValueError", "severe" ) class TestDBLoggingRetryableError( TransactionRestServiceRegistryCase, TestDBLoggingMixin ): @classmethod def setUpClass(cls): super().setUpClass() cls._setup_registry(cls) @classmethod def tearDownClass(cls): # pylint: disable=W8110 cls._teardown_registry(cls) super().tearDownClass() def _test_exception(self, test_type, wrapping_exc, exc_name, severity): log_model = self.env["rest.log"].sudo() initial_entries = log_model.search([]) # Context: we are running in a transaction case which uses savepoints. # The log machinery is going to rollback the transation when catching errors. # Hence we need a completely separated env for the service. with new_rollbacked_env() as new_env: # Init fake collection w/ new env collection = _PseudoCollection(self._collection_name, new_env) service = self._get_service(self, collection=collection) with self._get_mocked_request(env=new_env): try: service.dispatch("fail", test_type) except Exception as err: # Not using `assertRaises` to inspect the exception directly self.assertTrue(isinstance(err, wrapping_exc)) self.assertEqual( service._get_exception_message(err), "Failed as you wanted!" ) with new_rollbacked_env() as new_env: log_model = new_env["rest.log"].sudo() entry = log_model.search([]) - initial_entries expected = { "collection": service._collection, "state": "failed", "result": "null", "exception_name": exc_name, "exception_message": "Failed as you wanted!", "severity": severity, } self.assertRecordValues(entry, [expected]) @staticmethod def _get_test_controller(class_or_instance, root_path=None): return super()._get_test_controller( class_or_instance, root_path="/test_log_exception_retryable/" ) def test_log_exception_retryable(self): # retryable error must bubble up to the retrying mechanism self._test_exception( "retryable", FakeConcurrentUpdateError, "odoo.addons.rest_log.tests.common.FakeConcurrentUpdateError", "warning", )