Initial commit: Core packages

This commit is contained in:
Ernad Husremovic 2025-08-29 15:20:45 +02:00
commit 12c29a983b
9512 changed files with 8379910 additions and 0 deletions

View file

@ -0,0 +1,9 @@
"""
Odoo unit testing framework, based on Python unittest.
Some files as case.py, resut.py, suite.py are higly modified versions of unitest
See https://github.com/python/cpython/tree/3.10/Lib/unittest for reference files.
"""
from . import common
from .common import *

View file

@ -0,0 +1,72 @@
import contextlib
from typing import Optional
import astroid
from pylint import interfaces, checkers
try:
from pylint.checkers.utils import only_required_for_messages
except ImportError:
from pylint.checkers.utils import check_messages as only_required_for_messages
class OdooBaseChecker(checkers.BaseChecker):
    """Pylint checker flagging ``Markup(...)`` calls built from non-constant input.

    ``Markup`` marks a string as HTML-safe; constructing one from a
    non-constant expression is a potential injection point, hence E8504.
    """
    with contextlib.suppress(AttributeError):  # TODO, remove once pylint minimal version is 3.0.0
        __implements__ = interfaces.IAstroidChecker
        # see https://github.com/pylint-dev/pylint/commit/358264aaf622505f6d2e8bc699618382981a078c
    name = 'odoo'
    msgs = {
        'E8504': (
            'The Markup constructor called with a non-constant argument',
            'non-const-markup',
            '',
        )
    }

    @only_required_for_messages('non-const-markup')
    def visit_call(self, node):
        """Emit ``non-const-markup`` for ``Markup(arg)`` or ``mod.Markup(arg)``
        calls whose first positional argument is not provably constant."""
        func = node.func
        # merged the two previously-duplicated branches (Name vs Attribute)
        is_markup_call = (
            (isinstance(func, astroid.Name) and func.name == "Markup")
            or (isinstance(func, astroid.Attribute) and func.attrname == "Markup")
        )
        # ``node.args`` guard: a bare ``Markup()`` call used to crash the
        # checker with an IndexError
        # NOTE(review): splitting on a literal backslash-n ('\\n'); '\n' may
        # have been intended for col_offset — confirm before changing
        if is_markup_call and node.args and not self._is_constant(node.args[0]):
            self.add_message('non-const-markup', node=node, col_offset=len(node.as_string().split('\\n')))

    def _is_constant(self, node: Optional[astroid.node_classes.NodeNG]) -> bool:
        """Conservatively decide whether *node* evaluates to a constant.

        Recurses through f-strings, name assignments, containers, binary
        operators, conditional expressions and ``.format``/``.join`` calls;
        anything unrecognized is treated as non-constant.
        """
        if isinstance(node, astroid.Const) or node is None:
            return True
        elif isinstance(node, astroid.JoinedStr):  # f-string
            return all(map(self._is_constant, node.values))
        elif isinstance(node, astroid.FormattedValue):  # {…} part of an f-string
            return self._is_constant(node.value)
        elif isinstance(node, astroid.Name):
            # constant iff every assignment feeding this name is constant
            _, assignments = node.lookup(node.name)
            return all(map(self._is_constant, assignments))
        elif isinstance(node, astroid.AssignName):
            return self._is_constant(node.parent)
        elif isinstance(node, astroid.Assign):
            return self._is_constant(node.value)
        elif (isinstance(node, astroid.Call) and
                isinstance(node.func, astroid.Attribute) and
                node.func.attrname in ["format", "join"]):
            return (self._is_constant(node.func.expr) and
                    all(map(self._is_constant, node.args)) and
                    all(map(self._is_constant, node.keywords)))
        elif isinstance(node, astroid.Keyword):
            return self._is_constant(node.value)
        elif isinstance(node, (astroid.List, astroid.Set, astroid.Tuple)):
            return all(map(self._is_constant, node.elts))
        elif isinstance(node, astroid.Dict):
            # NOTE(review): checks node.values only — keys presumably assumed constant
            return all(map(self._is_constant, node.values))
        elif isinstance(node, astroid.BinOp):
            return self._is_constant(node.left) and self._is_constant(node.right)
        elif isinstance(node, astroid.IfExp):
            return self._is_constant(node.body) and self._is_constant(node.orelse)
        return False
def register(linter):
    """Pylint plugin entry point: register this module's checker on *linter*."""
    linter.register_checker(OdooBaseChecker(linter))

View file

@ -0,0 +1,295 @@
"""Test case implementation"""
import contextlib
import inspect
import logging
import sys
from pathlib import PurePath
from unittest import SkipTest
from unittest import TestCase as _TestCase
_logger = logging.getLogger(__name__)

# Modules defining ``__unittest`` have their frames hidden from the
# tracebacks reported by the unittest machinery.
__unittest = True

# Sentinel distinguishing "no message given" from an explicit ``msg=None``
# in :meth:`TestCase.subTest`.
_subtest_msg_sentinel = object()
class _Outcome(object):
    """Per-run bookkeeping object: wraps each test "part" (setUp, the test
    method, tearDown, cleanups) and routes any raised exception to the
    result with a cleaned-up traceback."""

    def __init__(self, test, result):
        self.result = result
        # True until any part of the test errors, fails or skips
        self.success = True
        self.test = test

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, isTest=False):
        # ``isTest`` is kept for signature compatibility with the stdlib
        # _Outcome; it is not used in this implementation.
        try:
            yield
        except KeyboardInterrupt:
            # never swallow a user interrupt
            raise
        except SkipTest as e:
            self.success = False
            self.result.addSkip(test_case, str(e))
        except:  # pylint: disable=bare-except
            exc_info = sys.exc_info()
            self.success = False
            if exc_info is not None:
                exception_type, exception, tb = exc_info
                # rebuild a traceback spanning from the test entry point down
                # to the failing line, dropping runner internals
                tb = self._complete_traceback(tb)
                exc_info = (exception_type, exception, tb)
            self.test._addError(self.result, test_case, exc_info)
            # explicitly break a reference cycle:
            # exc_info -> frame -> exc_info
            exc_info = None

    def _complete_traceback(self, initial_tb):
        # TracebackType is only reachable through an instance
        Traceback = type(initial_tb)
        # make the set of frames in the traceback
        tb_frames = set()
        tb = initial_tb
        while tb:
            tb_frames.add(tb.tb_frame)
            tb = tb.tb_next
        tb = initial_tb

        # find the common frame by searching the last frame of the current_stack present in the traceback.
        current_frame = inspect.currentframe()
        common_frame = None
        while current_frame:
            if current_frame in tb_frames:
                common_frame = current_frame  # we want to find the last frame in common
            current_frame = current_frame.f_back

        if not common_frame:  # not really useful but safer
            _logger.warning('No common frame found with current stack, displaying full stack')
            tb = initial_tb
        else:
            # remove the tb_frames until the common_frame is reached (keep the current_frame tb since the line is more accurate)
            while tb and tb.tb_frame != common_frame:
                tb = tb.tb_next

            # add all current frame elements under the common_frame to tb
            current_frame = common_frame.f_back
            while current_frame:
                tb = Traceback(tb, current_frame, current_frame.f_lasti, current_frame.f_lineno)
                current_frame = current_frame.f_back

        # remove traceback root part (odoo_bin, main, loading, ...), as
        # everything under the testCase is not useful. Using '_callTestMethod',
        # '_callSetUp', '_callTearDown', '_callCleanup' instead of the test
        # method since the error does not come especially from the test method.
        while tb:
            code = tb.tb_frame.f_code
            if PurePath(code.co_filename).name == 'case.py' and code.co_name in ('_callTestMethod', '_callSetUp', '_callTearDown', '_callCleanup'):
                return tb.tb_next
            tb = tb.tb_next

        _logger.warning('No root frame found, displaying full stacks')
        return initial_tb  # this shouldn't be reached
class TestCase(_TestCase):
    """Vendored, slimmed-down variant of :class:`unittest.TestCase`.

    Compared to the stdlib version it drops expected-failure support and
    output buffering, and adds Odoo-specific helpers (:attr:`canonical_tag`,
    :meth:`get_log_metadata`).
    """
    _class_cleanups = []  # needed, backport for versions < 3.8
    __unittest_skip__ = False
    __unittest_skip_why__ = ''
    _moduleSetUpFailed = False

    # pylint: disable=super-init-not-called
    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
        method when executed. Raises a ValueError if the instance does
        not have a method with the specified name.
        """
        self._testMethodName = methodName
        self._outcome = None
        if methodName != 'runTest' and not hasattr(self, methodName):
            # we allow instantiation with no explicit method name
            # but not an *incorrect* or missing method name
            raise ValueError("no such test method in %s: %s" %
                             (self.__class__, methodName))
        self._cleanups = []
        self._subtest = None

        # Map types to custom assertEqual functions that will compare
        # instances of said type in more detail to generate a more useful
        # error message.
        self._type_equality_funcs = {}
        self.addTypeEqualityFunc(dict, 'assertDictEqual')
        self.addTypeEqualityFunc(list, 'assertListEqual')
        self.addTypeEqualityFunc(tuple, 'assertTupleEqual')
        self.addTypeEqualityFunc(set, 'assertSetEqual')
        self.addTypeEqualityFunc(frozenset, 'assertSetEqual')
        self.addTypeEqualityFunc(str, 'assertMultiLineEqual')

    def addCleanup(self, function, *args, **kwargs):
        """Add a function, with arguments, to be called when the test is
        completed. Functions added are called on a LIFO basis and are
        called after tearDown on test failure or success.

        Cleanup items are called even if setUp fails (unlike tearDown)."""
        self._cleanups.append((function, args, kwargs))

    @classmethod
    def addClassCleanup(cls, function, *args, **kwargs):
        """Same as addCleanup, except the cleanup items are called even if
        setUpClass fails (unlike tearDownClass)."""
        cls._class_cleanups.append((function, args, kwargs))

    def shortDescription(self):
        # docstrings are never used as test descriptions by Odoo's runner
        return None

    @contextlib.contextmanager
    def subTest(self, msg=_subtest_msg_sentinel, **params):
        """Return a context manager that will return the enclosed block
        of code in a subtest identified by the optional message and
        keyword parameters. A failure in the subtest marks the test
        case as failed but resumes execution at the end of the enclosed
        block, allowing further test code to be executed.
        """
        parent = self._subtest
        if parent:
            # nested subtest: inherit the parent's params, child keys win
            params = {**params, **{k: v for k, v in parent.params.items() if k not in params}}
        self._subtest = _SubTest(self, msg, params)
        try:
            with self._outcome.testPartExecutor(self._subtest, isTest=True):
                yield
        finally:
            self._subtest = parent

    def _addError(self, result, test, exc_info):
        """
        This method is similar to feed_errors_to_result in python<=3.10
        but only manage one error at a time
        This is also inspired from python 3.11 _addError but still manages
        subtests errors as in python 3.7-3.10 for minimal changes.
        The method remains on the test to easily override it in test_test_suite
        """
        if isinstance(test, _SubTest):
            result.addSubTest(test.test_case, test, exc_info)
        elif exc_info is not None:
            if issubclass(exc_info[0], self.failureException):
                result.addFailure(test, exc_info)
            else:
                result.addError(test, exc_info)

    # The _call* indirections below exist so that tracebacks can be truncated
    # at these well-known frames (see _Outcome._complete_traceback).
    def _callSetUp(self):
        self.setUp()

    def _callTestMethod(self, method):
        method()

    def _callTearDown(self):
        self.tearDown()

    def _callCleanup(self, function, *args, **kwargs):
        function(*args, **kwargs)

    def run(self, result):
        """Run this test, reporting everything into *result*."""
        result.startTest(self)
        testMethod = getattr(self, self._testMethodName)

        skip = False
        skip_why = ''
        try:
            skip = self.__class__.__unittest_skip__ or testMethod.__unittest_skip__
            skip_why = self.__class__.__unittest_skip_why__ or testMethod.__unittest_skip_why__ or ''
        except AttributeError:  # testMethod may not have a __unittest_skip__ or __unittest_skip_why__
            pass
        if skip:
            result.addSkip(self, skip_why)
            result.stopTest(self)
            return

        outcome = _Outcome(self, result)
        try:
            self._outcome = outcome

            with outcome.testPartExecutor(self):
                self._callSetUp()
            # the test method and tearDown only run when setUp succeeded
            if outcome.success:
                with outcome.testPartExecutor(self, isTest=True):
                    self._callTestMethod(testMethod)
                with outcome.testPartExecutor(self):
                    self._callTearDown()

            self.doCleanups()
            if outcome.success:
                result.addSuccess(self)
            return result
        finally:
            result.stopTest(self)
            # clear the outcome, no more needed
            self._outcome = None

    def doCleanups(self):
        """Execute all cleanup functions. Normally called for you after
        tearDown."""
        while self._cleanups:
            function, args, kwargs = self._cleanups.pop()
            with self._outcome.testPartExecutor(self):
                self._callCleanup(function, *args, **kwargs)

    @classmethod
    def doClassCleanups(cls):
        """Execute all class cleanup functions. Normally called for you after
        tearDownClass."""
        # exceptions are collected (not raised) so all cleanups get a chance to run
        cls.tearDown_exceptions = []
        while cls._class_cleanups:
            function, args, kwargs = cls._class_cleanups.pop()
            try:
                function(*args, **kwargs)
            except Exception:
                cls.tearDown_exceptions.append(sys.exc_info())

    @property
    def canonical_tag(self):
        # path-like test identifier, e.g. '/mod/tests/test_x.py:Class.test_method'
        # NOTE(review): assumes the test module lives under odoo.addons or
        # odoo.upgrade; other prefixes are kept as-is — confirm
        module = self.__module__
        for prefix in ('odoo.addons.', 'odoo.upgrade.'):
            if module.startswith(prefix):
                module = module[len(prefix):]
        module = module.replace('.', '/')
        return f'/{module}.py:{self.__class__.__name__}.{self._testMethodName}'

    def get_log_metadata(self):
        """Return metadata to attach to log records emitted during this test."""
        metadata = {
            'canonical_tag': self.canonical_tag,
        }
        return metadata
class _SubTest(TestCase):
    """Pseudo test case standing for one ``subTest()`` block of a real test.

    Never executed on its own: it only carries the parent test case, the
    optional message and the subtest parameters so results can render a
    meaningful description.
    """

    def __init__(self, test_case, message, params):
        super().__init__()
        self._message = message
        self.test_case = test_case
        self.params = params
        self.failureException = test_case.failureException

    def runTest(self):
        raise NotImplementedError("subtests cannot be run directly")

    def _subDescription(self):
        chunks = []
        if self._message is not _subtest_msg_sentinel:
            chunks.append("[{}]".format(self._message))
        if self.params:
            rendered = ', '.join(
                "{}={!r}".format(name, value)
                for name, value in self.params.items())
            chunks.append("({})".format(rendered))
        return " ".join(chunks) or '(<subtest>)'

    def id(self):
        return "{} {}".format(self.test_case.id(), self._subDescription())

    def __str__(self):
        return "{} {}".format(self.test_case, self._subDescription())

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1 @@
/** Should not be read by anyone */

View file

@ -0,0 +1 @@
<!-- Should not be read by anyone -->

View file

@ -0,0 +1,116 @@
import importlib
import importlib.util
import inspect
import itertools
import sys
import threading
import unittest
from pathlib import Path
from .. import tools
from .tag_selector import TagsSelector
from .suite import OdooSuite
from .result import OdooTestResult
def get_test_modules(module):
    """ Return a list of module for the addons potentially containing tests to
    feed unittest.TestLoader.loadTestsFromModule() """
    addon_spec = importlib.util.find_spec(f'odoo.addons.{module}')
    modules = _get_tests_modules(addon_spec)
    modules.extend(_get_upgrade_test_modules(module))
    return modules
def _get_tests_modules(mod):
spec = importlib.util.find_spec('.tests', mod.name)
if not spec:
return []
tests_mod = importlib.import_module(spec.name)
return [
mod_obj
for name, mod_obj in inspect.getmembers(tests_mod, inspect.ismodule)
if name.startswith('test_')
]
def _get_upgrade_test_modules(module):
    """Yield the upgrade/migration test modules of *module*, loading every
    ``tests/test_*.py`` file found under its upgrade packages."""
    candidate_packages = (
        f"odoo.upgrade.{module}",
        f"odoo.addons.{module}.migrations",
        f"odoo.addons.{module}.upgrades",
    )
    for package_name in candidate_packages:
        if not importlib.util.find_spec(package_name):
            continue
        package = importlib.import_module(package_name)
        for package_dir in map(Path, package.__path__):
            for test_file in package_dir.glob("tests/test_*.py"):
                spec = importlib.util.spec_from_file_location(f"{package.__name__}.tests.{test_file.stem}", test_file)
                if not spec:
                    continue
                loaded = importlib.util.module_from_spec(spec)
                # register before exec so relative imports inside the file work
                sys.modules[spec.name] = loaded
                spec.loader.exec_module(loaded)
                yield loaded
def make_suite(module_names, position='at_install'):
    """ Creates a test suite for all the tests in the specified modules,
    filtered by the provided ``position`` and the current test tags

    :param list[str] module_names: modules to load tests from
    :param str position: "at_install" or "post_install"
    """
    config_tags = TagsSelector(tools.config['test_tags'])
    position_tag = TagsSelector(position)
    selected = [
        test
        for module_name in module_names
        for test_module in get_test_modules(module_name)
        for test in unwrap_suite(unittest.TestLoader().loadTestsFromModule(test_module))
        if position_tag.check(test) and config_tags.check(test)
    ]
    return OdooSuite(sorted(selected, key=lambda test: test.test_sequence))
def run_suite(suite, module_name=None, global_report=None):
    """Run *suite*, flagging both the module loader and the current thread as
    'testing' for the duration of the run.

    :param module_name: unused, kept for backward compatibility
    """
    # avoid dependency hell
    from ..modules import module

    current_thread = threading.current_thread()
    module.current_test = True
    current_thread.testing = True

    results = OdooTestResult(global_report=global_report)
    suite(results)

    current_thread.testing = False
    module.current_test = False
    return results
def unwrap_suite(test):
"""
Attempts to unpack testsuites (holding suites or cases) in order to
generate a single stream of terminals (either test cases or customized
test suites). These can then be checked for run/skip attributes
individually.
An alternative would be to use a variant of @unittest.skipIf with a state
flag of some sort e.g. @unittest.skipIf(common.runstate != 'at_install'),
but then things become weird with post_install as tests should *not* run
by default there
"""
if isinstance(test, unittest.TestCase):
yield test
return
subtests = list(test)
## custom test suite (no test cases)
#if not len(subtests):
# yield test
# return
for item in itertools.chain.from_iterable(unwrap_suite(t) for t in subtests):
yield item

View file

@ -0,0 +1,347 @@
"""Test result object"""
import collections
import contextlib
import inspect
import logging
import os
import re
import sys
import time
import traceback
from typing import NamedTuple
from . import case
from .. import sql_db
# Modules defining ``__unittest`` have their frames hidden from reported tracebacks.
__unittest = True

# Templates used when attaching captured output to a failure report.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'

# Abort the whole run once this many tests failed/errored (default: unlimited).
ODOO_TEST_MAX_FAILED_TESTS = max(1, int(os.environ.get('ODOO_TEST_MAX_FAILED_TESTS', sys.maxsize)))

stats_logger = logging.getLogger('odoo.tests.stats')
class Stat(NamedTuple):
time: float = 0.0
queries: int = 0
def __add__(self, other: 'Stat') -> 'Stat':
if other == 0:
return self
if not isinstance(other, Stat):
return NotImplemented
return Stat(
self.time + other.time,
self.queries + other.queries,
)
_logger = logging.getLogger(__name__)

# Splits a standard addon test id ``odoo.addons.<module>.tests.<Class...>.<method>``
# into named groups; ids at other paths (e.g. upgrade tests) do not match.
_TEST_ID = re.compile(r"""
^
odoo\.addons\.
(?P<module>[^.]+)
\.tests\.
(?P<class>.+)
\.
(?P<method>[^.]+)
$
""", re.VERBOSE)
class OdooTestResult(object):
    """
    This class is inspired from TextTestResult and modifies TestResult
    Instead of using a stream, we are using the logger.

    unittest.TestResult: Holder for test result information.

    Test results are automatically managed by the TestCase and TestSuite
    classes, and do not need to be explicitly manipulated by writers of tests.

    This version does not hold a list of failures but just counts, since each
    failure is logged immediately.
    This version is also simplified to better match our use cases.
    """
    # attributes read by the vendored TestSuite to track class-level setup
    _previousTestClass = None
    _moduleSetUpFailed = False

    def __init__(self, stream=None, descriptions=None, verbosity=None, global_report=None):
        # ``stream``/``descriptions``/``verbosity`` are accepted for
        # TextTestResult signature compatibility but unused (logging is used instead)
        self.failures_count = 0
        self.errors_count = 0
        self.testsRun = 0
        self.skipped = 0
        self.tb_locals = False
        # custom
        self.time_start = None
        self.queries_start = None
        self._soft_fail = False
        self.had_failure = False
        self.stats = collections.defaultdict(Stat)
        self.global_report = global_report
        self.shouldStop = self.global_report and self.global_report.shouldStop or False

    def total_errors_count(self):
        """Failures + errors, aggregated with the global report when present."""
        result = self.errors_count + self.failures_count
        if self.global_report:
            result += self.global_report.total_errors_count()
        return result

    def _checkShouldStop(self):
        # halt the whole run once ODOO_TEST_MAX_FAILED_TESTS is reached
        if self.total_errors_count() >= ODOO_TEST_MAX_FAILED_TESTS:
            global_report = self.global_report or self
            if not global_report.shouldStop:
                _logger.error(
                    "Test suite halted: max failed tests already reached (%s). "
                    "Remaining tests will be skipped.", ODOO_TEST_MAX_FAILED_TESTS)
                global_report.shouldStop = True
            self.shouldStop = True

    def printErrors(self):
        "Called by TestRunner after test run"

    def startTest(self, test):
        "Called when the given test is about to be run"
        self.testsRun += 1
        self.log(logging.INFO, 'Starting %s ...', self.getDescription(test), test=test)
        self.time_start = time.time()
        self.queries_start = sql_db.sql_counter

    def stopTest(self, test):
        """Called when the given test has been run"""
        if stats_logger.isEnabledFor(logging.INFO):
            self.stats[test.id()] = Stat(
                time=time.time() - self.time_start,
                queries=sql_db.sql_counter - self.queries_start,
            )

    def addError(self, test, err):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().
        """
        if self._soft_fail:
            self.had_failure = True
        else:
            self.errors_count += 1
        self.logError("ERROR", test, err)
        self._checkShouldStop()

    def addFailure(self, test, err):
        """Called when a failure has occurred. 'err' is a tuple of values as
        returned by sys.exc_info()."""
        if self._soft_fail:
            self.had_failure = True
        else:
            self.failures_count += 1
        self.logError("FAIL", test, err)
        self._checkShouldStop()

    def addSubTest(self, test, subtest, err):
        """Route a subtest error/failure to the matching add* method,
        reported against the subtest pseudo-case."""
        if err is not None:
            if issubclass(err[0], test.failureException):
                self.addFailure(subtest, err)
            else:
                self.addError(subtest, err)

    def addSuccess(self, test):
        "Called when a test has completed successfully"

    def addSkip(self, test, reason):
        """Called when a test is skipped."""
        self.skipped += 1
        self.log(logging.INFO, 'skipped %s : %s', self.getDescription(test), reason, test=test)

    def wasSuccessful(self):
        """Tells whether or not this result was a success."""
        return self.failures_count == self.errors_count == 0

    def _exc_info_to_string(self, err, test):
        """Converts a sys.exc_info()-style tuple of values into a string."""
        exctype, value, tb = err
        # Skip test runner traceback levels
        while tb and self._is_relevant_tb_level(tb):
            tb = tb.tb_next

        if exctype is test.failureException:
            # Skip assert*() traceback levels
            length = self._count_relevant_tb_levels(tb)
        else:
            length = None

        tb_e = traceback.TracebackException(
            exctype, value, tb, limit=length, capture_locals=self.tb_locals)
        msgLines = list(tb_e.format())
        return ''.join(msgLines)

    def _is_relevant_tb_level(self, tb):
        # frames from modules defining ``__unittest`` are runner internals
        return '__unittest' in tb.tb_frame.f_globals

    def _count_relevant_tb_levels(self, tb):
        length = 0
        while tb and not self._is_relevant_tb_level(tb):
            length += 1
            tb = tb.tb_next
        return length

    def __repr__(self):
        # bugfix: the counters are plain ints; the previous len(...) calls
        # raised TypeError whenever __repr__ was evaluated
        return ("<%s.%s run=%i errors=%i failures=%i>" %
                (self.__class__.__module__, self.__class__.__qualname__,
                 self.testsRun, self.errors_count, self.failures_count))

    def __str__(self):
        return f'{self.failures_count} failed, {self.errors_count} error(s) of {self.testsRun} tests'

    @contextlib.contextmanager
    def soft_fail(self):
        """Within this context, failures/errors only set ``had_failure``
        instead of incrementing the counters."""
        self.had_failure = False
        self._soft_fail = True
        try:
            yield
        finally:
            self._soft_fail = False
            self.had_failure = False

    def update(self, other):
        """ Merges an other test result into this one, only updates contents

        :type other: OdooTestResult
        """
        self.failures_count += other.failures_count
        self.errors_count += other.errors_count
        self.testsRun += other.testsRun
        self.skipped += other.skipped
        self.stats.update(other.stats)

    def log(self, level, msg, *args, test=None, exc_info=None, extra=None, stack_info=False, caller_infos=None):
        """
        ``test`` is the running test case, ``caller_infos`` is
        (fn, lno, func, sinfo) (logger.findCaller format), see logger.log for
        the other parameters.
        """
        test = test or self
        while isinstance(test, case._SubTest) and test.test_case:
            test = test.test_case
        logger = logging.getLogger(test.__module__)
        try:
            caller_infos = caller_infos or logger.findCaller(stack_info)
        except ValueError:
            caller_infos = "(unknown file)", 0, "(unknown function)", None
        (fn, lno, func, sinfo) = caller_infos
        # using logger.log makes it difficult to spot-replace findCaller in
        # order to provide useful location information (the problematic spot
        # inside the test function), so use lower-level functions instead
        if logger.isEnabledFor(level):
            record = logger.makeRecord(logger.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
            logger.handle(record)

    def log_stats(self):
        """Log per-module (and, at DEBUG, per-class/per-method) time and
        query statistics collected during the run."""
        if not stats_logger.isEnabledFor(logging.INFO):
            return
        details = stats_logger.isEnabledFor(logging.DEBUG)
        stats_tree = collections.defaultdict(Stat)
        counts = collections.Counter()
        for test, stat in self.stats.items():
            r = _TEST_ID.match(test)
            if not r:  # upgrade has tests at weird paths, ignore them
                continue
            stats_tree[r['module']] += stat
            counts[r['module']] += 1
            if details:
                stats_tree['%(module)s.%(class)s' % r] += stat
                stats_tree['%(module)s.%(class)s.%(method)s' % r] += stat

        if details:
            stats_logger.debug('Detailed Tests Report:\n%s', ''.join(
                f'\t{test}: {stats.time:.2f}s {stats.queries} queries\n'
                for test, stats in sorted(stats_tree.items())
            ))
        else:
            for module, stat in sorted(stats_tree.items()):
                stats_logger.info(
                    "%s: %d tests %.2fs %d queries",
                    module, counts[module],
                    stat.time, stat.queries,
                )

    def getDescription(self, test):
        if isinstance(test, case._SubTest):
            return 'Subtest %s.%s %s' % (test.test_case.__class__.__qualname__, test.test_case._testMethodName, test._subDescription())
        if isinstance(test, case.TestCase):
            # since we have the module name in the logger, this will avoid to duplicate module info in log line
            # we only apply this for TestCase since we can receive error handler or other special case
            return "%s.%s" % (test.__class__.__qualname__, test._testMethodName)
        return str(test)

    @contextlib.contextmanager
    def collectStats(self, test_id):
        """Record the time and query count of the wrapped block under *test_id*."""
        queries_before = sql_db.sql_counter
        time_start = time.time()
        yield
        self.stats[test_id] += Stat(
            time=time.time() - time_start,
            queries=sql_db.sql_counter - queries_before,
        )

    def logError(self, flavour, test, error):
        err = self._exc_info_to_string(error, test)
        caller_infos = self.getErrorCallerInfo(error, test)
        self.log(logging.INFO, '=' * 70, test=test, caller_infos=caller_infos)  # keep this as info !!!!!!
        self.log(logging.ERROR, "%s: %s\n%s", flavour, self.getDescription(test), err, test=test, caller_infos=caller_infos)

    def getErrorCallerInfo(self, error, test):
        """
        :param error: A tuple (exctype, value, tb) as returned by sys.exc_info().
        :param test: A TestCase that created this error.
        :returns: a tuple (fn, lno, func, sinfo) matching the logger findCaller format or None
        """
        # only handle TestCase here. test can be an _ErrorHolder in some case (setup/teardown class errors)
        if not isinstance(test, case.TestCase):
            return None

        _, _, error_traceback = error

        # move upwards the subtest hierarchy to find the real test
        while isinstance(test, case._SubTest) and test.test_case:
            test = test.test_case

        method_tb = None
        file_tb = None
        filename = inspect.getfile(type(test))

        # Note: since _ErrorCatcher was introduced, we could always take the
        # last frame, keeping the check on the test method for safety.
        # Falling back on the test's file should always be correct; a minimal
        # working version would be:
        #
        #   infos_tb = error_traceback
        #   while infos_tb.tb_next:
        #       infos_tb = infos_tb.tb_next
        #
        while error_traceback:
            code = error_traceback.tb_frame.f_code
            if code.co_name in (test._testMethodName, 'setUp', 'tearDown'):
                method_tb = error_traceback
            if code.co_filename == filename:
                file_tb = error_traceback
            error_traceback = error_traceback.tb_next

        # prefer the frame of the test method itself, else the last frame
        # located in the test's own file
        infos_tb = method_tb or file_tb
        if infos_tb:
            code = infos_tb.tb_frame.f_code
            lineno = infos_tb.tb_lineno
            filename = code.co_filename
            method = test._testMethodName
            return (filename, lineno, method, None)
        return None

View file

@ -0,0 +1,193 @@
"""
Vendor unittest.TestSuite
This is a modified version of python 3.8 unitest.TestSuite
Odoo tests customisation combined with the need of a cross version compatibility
started to make TestSuite and other unitest object more complicated than vendoring
the part we need for Odoo. This versions is simplified in order
to minimise the code to maintain
- Removes expected failure support
- Removes module setUp/tearDown support
"""
import logging
import sys
from . import case
from .common import HttpCase
from .result import stats_logger
from unittest import util, BaseTestSuite, TestCase
from odoo.modules import module
__unittest = True
class TestSuite(BaseTestSuite):
    """A test suite is a composite test consisting of a number of TestCases.

    For use, create an instance of TestSuite, then add test case instances.
    When all tests have been added, the suite can be passed to a test
    runner, such as TextTestRunner. It will run the individual test cases
    in the order in which they were added, aggregating the results. When
    subclassing, do not forget to call the base class constructor.
    """

    def run(self, result, debug=False):
        # ``debug`` is kept for BaseTestSuite signature compatibility; unused here
        for test in self:
            if result.shouldStop:
                break
            assert isinstance(test, (TestCase))
            # expose the currently running test to odoo.modules.module
            module.current_test = test
            self._tearDownPreviousClass(test, result)
            self._handleClassSetUp(test, result)
            result._previousTestClass = test.__class__

            if not test.__class__._classSetupFailed:
                test(result)
        # tear down the last class once the suite is exhausted (or stopped)
        self._tearDownPreviousClass(None, result)
        return result

    def _handleClassSetUp(self, test, result):
        # Run setUpClass once per class transition; failures are reported
        # against an _ErrorHolder pseudo-test.
        previousClass = result._previousTestClass
        currentClass = test.__class__
        if currentClass == previousClass:
            return
        if result._moduleSetUpFailed:
            return
        if getattr(currentClass, "__unittest_skip__", False):
            return

        currentClass._classSetupFailed = False
        try:
            currentClass.setUpClass()
        except Exception as e:
            currentClass._classSetupFailed = True
            className = util.strclass(currentClass)
            self._createClassOrModuleLevelException(result, e,
                                                    'setUpClass',
                                                    className)
        finally:
            # on setUpClass failure, run class cleanups right away and
            # report any exception they raised as well
            if currentClass._classSetupFailed is True:
                currentClass.doClassCleanups()
                if len(currentClass.tearDown_exceptions) > 0:
                    for exc in currentClass.tearDown_exceptions:
                        self._createClassOrModuleLevelException(
                                result, exc[1], 'setUpClass', className,
                                info=exc)

    def _createClassOrModuleLevelException(self, result, exception, method_name,
                                           parent, info=None):
        # Report *exception* as if it came from a pseudo test named
        # "<method_name> (<parent>)".
        errorName = f'{method_name} ({parent})'
        error = _ErrorHolder(errorName)
        if isinstance(exception, case.SkipTest):
            result.addSkip(error, str(exception))
        else:
            if not info:
                result.addError(error, sys.exc_info())
            else:
                result.addError(error, info)

    def _tearDownPreviousClass(self, test, result):
        # Run tearDownClass of the previous class when the class changes;
        # ``test`` may be None to force the final teardown.
        previousClass = result._previousTestClass
        currentClass = test.__class__
        if currentClass == previousClass:
            return
        if not previousClass:
            return
        if previousClass._classSetupFailed:
            return
        if getattr(previousClass, "__unittest_skip__", False):
            return

        try:
            previousClass.tearDownClass()
        except Exception as e:
            className = util.strclass(previousClass)
            self._createClassOrModuleLevelException(result, e,
                                                    'tearDownClass',
                                                    className)
        finally:
            # class cleanups always run after tearDownClass
            previousClass.doClassCleanups()
            if len(previousClass.tearDown_exceptions) > 0:
                for exc in previousClass.tearDown_exceptions:
                    className = util.strclass(previousClass)
                    self._createClassOrModuleLevelException(result, exc[1],
                                                            'tearDownClass',
                                                            className,
                                                            info=exc)
class _ErrorHolder(object):
"""
Placeholder for a TestCase inside a result. As far as a TestResult
is concerned, this looks exactly like a unit test. Used to insert
arbitrary errors into a test suite run.
"""
# Inspired by the ErrorHolder from Twisted:
# http://twistedmatrix.com/trac/browser/trunk/twisted/trial/runner.py
# attribute used by TestResult._exc_info_to_string
failureException = None
def __init__(self, description):
self.description = description
def id(self):
return self.description
def shortDescription(self):
return None
def __repr__(self):
return "<ErrorHolder description=%r>" % (self.description,)
def __str__(self):
return self.id()
def run(self, result):
# could call result.addError(...) - but this test-like object
# shouldn't be run anyway
pass
def __call__(self, result):
return self.run(result)
def countTestCases(self):
return 0
class OdooSuite(TestSuite):
    """TestSuite variant that records setUpClass/tearDownClass timings and
    query counts when statistics logging is enabled."""

    def _handleClassSetUp(self, test, result):
        klass = type(test)
        should_collect = (
            result._previousTestClass != klass
            and hasattr(result, 'stats')
            and stats_logger.isEnabledFor(logging.INFO)
        )
        if not should_collect:
            super()._handleClassSetUp(test, result)
            return

        test_id = f'{klass.__module__}.{klass.__qualname__}.setUpClass'
        with result.collectStats(test_id):
            super()._handleClassSetUp(test, result)

    def _tearDownPreviousClass(self, test, result):
        previous_test_class = result._previousTestClass
        should_collect = (
            previous_test_class
            and previous_test_class != type(test)
            and hasattr(result, 'stats')
            and stats_logger.isEnabledFor(logging.INFO)
        )
        if not should_collect:
            super()._tearDownPreviousClass(test, result)
            return

        test_id = f'{previous_test_class.__module__}.{previous_test_class.__qualname__}.tearDownClass'
        with result.collectStats(test_id):
            super()._tearDownPreviousClass(test, result)

    def has_http_case(self):
        # falsy 0 when the suite is empty, otherwise whether any test is an HttpCase
        return self.countTestCases() and any(isinstance(candidate, HttpCase) for candidate in self)

View file

@ -0,0 +1,104 @@
import re
import logging
from odoo.tools.misc import OrderedSet
_logger = logging.getLogger(__name__)
class TagsSelector(object):
    """ Test selector based on tags.

    A spec is a comma-separated list of filters of the form
    ``[-][tag][/module][:class][.method][[params]]``; a leading ``-``
    excludes instead of including, ``*`` matches every tag.
    """
    filter_spec_re = re.compile(r'''
        ^
        ([+-]?)                     # operator_re
        (\*|\w*)                    # tag_re
        (?:\/([\w\/]*(?:\.py)?))?   # module_re (dot escaped: only a literal '.py' suffix denotes a path)
        (?::(\w*))?                 # test_class_re
        (?:\.(\w*))?                # test_method_re
        (?:\[(.*)\])?               # parameters
        $''', re.VERBOSE)  # [-][tag][/module][:class][.method][[params]]

    def __init__(self, spec):
        """ Parse the spec to determine tags to include and exclude. """
        parts = re.split(r',(?![^\[]*\])', spec)  # split on all comma not inside [] (not followed by ])
        filter_specs = [t.strip() for t in parts if t.strip()]
        self.exclude = set()
        self.include = set()
        self.parameters = OrderedSet()

        for filter_spec in filter_specs:
            match = self.filter_spec_re.match(filter_spec)
            if not match:
                _logger.error('Invalid tag %s', filter_spec)
                continue

            sign, tag, module, klass, method, parameters = match.groups()
            is_include = sign != '-'
            is_exclude = not is_include

            if not tag and is_include:
                # including /module:class.method implicitly requires 'standard'
                tag = 'standard'
            elif not tag or tag == '*':
                # '*' indicates all tests (instead of 'standard' tests only)
                tag = None
            module_path = None
            if module and (module.endswith('.py')):
                # a .py path selects by python module path instead of addon name
                module_path = module[:-3].replace('/', '.')
                module = None
            test_filter = (tag, module, klass, method, module_path)
            if parameters:
                # we could check here that test supports negated parameters
                self.parameters.add((test_filter, ('-' if is_exclude else '+', parameters)))
                is_exclude = False

            if is_include:
                self.include.add(test_filter)
            if is_exclude:
                self.exclude.add(test_filter)

        if (self.exclude or self.parameters) and not self.include:
            # pure-exclusion specs implicitly start from the 'standard' set
            self.include.add(('standard', None, None, None, None))

    def check(self, test):
        """ Return whether ``test`` matches the specification: it must have at
        least one tag in ``self.include`` and none in ``self.exclude`` for each tag category.
        """
        if not hasattr(test, 'test_tags'):  # handle the case where the Test does not inherit from BaseCase and has no test_tags
            _logger.debug("Skipping test '%s' because no test_tag found.", test)
            return False

        test_module = getattr(test, 'test_module', None)
        test_class = getattr(test, 'test_class', None)
        test_tags = test.test_tags | {test_module}  # module as test_tags deprecated, keep for retrocompatibility,
        test_method = getattr(test, '_testMethodName', None)
        # parameters of matching filters are collected on the test itself
        test._test_params = []

        def _is_matching(test_filter):
            (tag, module, klass, method, module_path) = test_filter
            if tag and tag not in test_tags:
                return False
            elif module_path and not test.__module__.endswith(module_path):
                return False
            elif not module_path and module and module != test_module:
                return False
            elif klass and klass != test_class:
                return False
            elif method and test_method and method != test_method:
                return False
            return True

        if any(_is_matching(test_filter) for test_filter in self.exclude):
            return False

        if not any(_is_matching(test_filter) for test_filter in self.include):
            return False

        for test_filter, parameter in self.parameters:
            if _is_matching(test_filter):
                test._test_params.append(parameter)
        return True

View file

@ -0,0 +1,234 @@
#!/usr/bin/env python3
import argparse
import logging.config
import os
import sys
import threading
import time
sys.path.append(os.path.abspath(os.path.join(__file__,'../../../')))
import odoo
from odoo.tools import config, topological_sort, unique
from odoo.netsvc import init_logger
from odoo.tests import standalone_tests
import odoo.tests.loader
_logger = logging.getLogger('odoo.tests.test_module_operations')
# Modules that are never cycled nor installed by this script.
BLACKLIST = {
    'auth_ldap', 'document_ftp', 'website_instantclick', 'pad',
    'pad_project', 'note_pad', 'pos_cache', 'pos_blackbox_be',
}
# Module-name prefixes excluded from the cycle (used with str.startswith).
IGNORE = ('hw_', 'theme_', 'l10n_', 'test_')
INSTALL_BLACKLIST = {
    'payment_alipay', 'payment_ogone', 'payment_payulatam', 'payment_payumoney',
} # deprecated modules (cannot be installed manually through button_install anymore)
def install(db_name, module_id, module_name):
    """Install one module (by database id) via its immediate-install button."""
    with odoo.registry(db_name).cursor() as cr:
        env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})
        record = env['ir.module.module'].browse(module_id)
        record.button_immediate_install()
    _logger.info('%s installed', module_name)
def uninstall(db_name, module_id, module_name):
    """Uninstall one module (by database id) via its immediate-uninstall button."""
    with odoo.registry(db_name).cursor() as cr:
        env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})
        record = env['ir.module.module'].browse(module_id)
        record.button_immediate_uninstall()
    _logger.info('%s uninstalled', module_name)
def cycle(db_name, module_id, module_name):
    """Exercise the full install -> uninstall -> reinstall sequence for one module."""
    for step in (install, uninstall, install):
        step(db_name, module_id, module_name)
class CheckAddons(argparse.Action):
    # argparse action that validates --addons-path values through Odoo's own
    # configuration checker before storing them.
    def __call__(self, parser, namespace, values, option_string=None):
        self.values = namespace
        # NOTE(review): `self` is passed both as the option holder and as the
        # last argument expected by _check_addons_path — looks intentional
        # (the checker reads/stores through this action object), but confirm
        # against odoo.tools.config._check_addons_path's signature.
        config._check_addons_path(self, option_string, values, self)
def parse_args():
    """Build the command-line interface and return the parsed arguments.

    Subparser handles are named ``*_cmd`` so they do not shadow the
    module-level ``cycle()`` and ``uninstall()`` helper functions inside
    this scope (the previous locals ``cycle``/``uninstall`` did).
    """
    parser = argparse.ArgumentParser(
        description="Script for testing the install / uninstall / reinstall"
                    " cycle of Odoo modules. Prefer the 'cycle' subcommand to"
                    " running this without anything specified (this is the"
                    " default behaviour).")
    # Defaults apply when no subcommand is given: run the full cycle.
    parser.set_defaults(
        func=test_cycle,
        reinstall=True,
    )
    # Legacy flag-style aliases for the subcommands; mutually exclusive.
    fake_commands = parser.add_mutually_exclusive_group()
    parser.add_argument("--database", "-d", type=str, required=True,
                        help="The database to test (/ run the command on)")
    parser.add_argument("--data-dir", "-D", dest="data_dir", type=str,
                        help="Directory where to store Odoo data"
                        )
    parser.add_argument("--skip", "-s", type=str,
                        help="Comma-separated list of modules to skip (they will only be installed)")
    parser.add_argument("--resume-at", "-r", type=str,
                        help="Skip modules (only install) up to the specified one in topological order")
    parser.add_argument("--addons-path", "-p", type=str, action=CheckAddons,
                        help="Comma-separated list of paths to directories containing extra Odoo modules")

    cmds = parser.add_subparsers(title="subcommands", metavar='')
    cycle_cmd = cmds.add_parser(
        'cycle', help="Full install/uninstall/reinstall cycle.",
        description="Installs, uninstalls, and reinstalls all modules which are"
                    " not skipped or blacklisted, the database should have"
                    " 'base' installed (only).")
    cycle_cmd.set_defaults(func=test_cycle)

    fake_commands.add_argument(
        "--uninstall", "-U", action=UninstallAction,
        help="Comma-separated list of modules to uninstall/reinstall. Prefer the 'uninstall' subcommand."
    )
    uninstall_cmd = cmds.add_parser(
        'uninstall', help="Uninstallation",
        description="Uninstalls then (by default) reinstalls every specified "
                    "module. Modules which are not installed before running "
                    "are ignored.")
    uninstall_cmd.set_defaults(func=test_uninstall)
    uninstall_cmd.add_argument('uninstall', help="comma-separated list of modules to uninstall/reinstall")
    uninstall_cmd.add_argument(
        '-n', '--no-reinstall', dest='reinstall', action='store_false',
        help="Skips reinstalling the module(s) after uninstalling."
    )

    fake_commands.add_argument("--standalone", action=StandaloneAction,
                               help="Launch standalone scripts tagged with @standalone. Accepts a list of "
                                    "module names or tags separated by commas. 'all' will run all available scripts. Prefer the 'standalone' subcommand."
                               )
    standalone_cmd = cmds.add_parser('standalone', help="Run scripts tagged with @standalone")
    standalone_cmd.set_defaults(func=test_standalone)
    standalone_cmd.add_argument('standalone', help="List of module names or tags separated by commas, 'all' will run all available scripts.")

    return parser.parse_args()
class UninstallAction(argparse.Action):
    """Flag-style alias for the 'uninstall' subcommand: stores the value and
    redirects dispatch to test_uninstall."""
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values)
        namespace.func = test_uninstall
class StandaloneAction(argparse.Action):
    """Flag-style alias for the 'standalone' subcommand: stores the value and
    redirects dispatch to test_standalone."""
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, values)
        namespace.func = test_standalone
def test_cycle(args):
    """Run the full install/uninstall/reinstall cycle over every eligible module."""
    with odoo.registry(args.database).cursor() as cr:
        env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})

        def is_candidate(module):
            # Eligible: not blacklisted, not in an ignored family, and not
            # already installed or uninstallable.
            if module.name in BLACKLIST or module.name in INSTALL_BLACKLIST:
                return False
            if module.name.startswith(IGNORE):
                return False
            return module.state not in ('installed', 'uninstallable')

        candidates = env['ir.module.module'].search([]).filtered(is_candidate)
        # process dependencies before their dependents
        ordered_ids = topological_sort({
            module.id: module.dependencies_id.depend_id.ids
            for module in candidates
        })
        candidates = candidates.browse(ordered_ids)
        todo = [(module.id, module.name) for module in candidates]

    resume = args.resume_at
    skipped = set(args.skip.split(',')) if args.skip else set()
    for module_id, module_name in todo:
        if module_name == resume:
            # the resume point itself gets the full cycle
            resume = None
        if resume or module_name in skipped:
            install(args.database, module_id, module_name)
        else:
            cycle(args.database, module_id, module_name)
def test_uninstall(args):
    """Uninstall (and, by default, reinstall) one or more modules."""
    for module_name in args.uninstall.split(','):
        # Short-lived cursor: only read the module's id and state, then close
        # before install/uninstall open their own cursors.
        with odoo.registry(args.database).cursor() as cr:
            env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})
            record = env['ir.module.module'].search([('name', '=', module_name)])
            module_id, module_state = record.id, record.state
        if module_state == 'installed':
            uninstall(args.database, module_id, module_name)
            if args.reinstall and module_name not in INSTALL_BLACKLIST:
                install(args.database, module_id, module_name)
        elif module_state:
            _logger.warning("Module %r is not installed", module_name)
        else:
            _logger.warning("Module %r does not exist", module_name)
def test_standalone(args):
    """Launch the standalone scripts selected by tag or module name."""
    # load the registry once for script discovery
    registry = odoo.registry(args.database)
    for loaded_module in registry._init_modules:
        # importing the test modules registers their @standalone functions
        odoo.tests.loader.get_test_modules(loaded_module)
    # gather the selected scripts, de-duplicated but in selection order
    selected = []
    for tag in args.standalone.split(','):
        selected.extend(standalone_tests[tag])
    scripts = list(unique(selected))
    started_at = time.time()
    total = len(scripts)
    for position, script in enumerate(scripts, start=1):
        with odoo.registry(args.database).cursor() as cr:
            env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {})
            _logger.info("Executing standalone script: %s (%d / %d)",
                         script.__name__, position, total)
            try:
                script(env)
            except Exception:
                # best-effort: log the failure and continue with the next script
                _logger.error("Standalone script %s failed", script.__name__, exc_info=True)
    _logger.info("%d standalone scripts executed in %.2fs", total, time.time() - started_at)
if __name__ == '__main__':
    args = parse_args()
    # Store the database name both in the config and on the current thread,
    # where the Odoo registry machinery expects to find it.
    config['db_name'] = threading.current_thread().dbname = args.database
    # handle paths option: extra addons paths take precedence over the defaults
    if args.addons_path:
        odoo.tools.config['addons_path'] = ','.join([args.addons_path, odoo.tools.config['addons_path']])
    if args.data_dir:
        odoo.tools.config['data_dir'] = args.data_dir
    odoo.modules.module.initialize_sys_path()
    init_logger()
    # Silence the noisiest loggers so repeated install/uninstall output stays readable.
    logging.config.dictConfig({
        'version': 1,
        'incremental': True,
        'disable_existing_loggers': False,
        'loggers': {
            'odoo.modules.loading': {'level': 'CRITICAL'},
            'odoo.sql_db': {'level': 'CRITICAL'},
            'odoo.models.unlink': {'level': 'WARNING'},
            'odoo.addons.base.models.ir_model': {'level': "WARNING"},
        }
    })
    try:
        args.func(args)
    except Exception:
        # args.func is one of test_cycle/test_uninstall/test_standalone;
        # [5:] strips the 'test_' prefix for the log message.
        _logger.error("%s tests failed", args.func.__name__[5:])
        raise

View file

@ -0,0 +1,18 @@
from os import environ, path
import subprocess
import sys
#This test is meant to be standalone, correct usage : python test_security.py file1 file2 file3 ...
if __name__ == '__main__':
HERE = path.dirname(__file__)
if 'PYTHONPATH' not in environ:
environ['PYTHONPATH'] = HERE
else:
environ['PYTHONPATH'] += ':' + HERE
command = ['pylint', '--rcfile=/dev/null', '--disable=all', '--output-format', 'json', '--enable=non-const-markup', '--reports=n', '--load-plugins=_odoo_checker_markup', *sys.argv[1:]]
proc = subprocess.run(command, env=environ, check=True)