Initial commit: OCA Report packages (45 packages)

This commit is contained in:
Ernad Husremovic 2025-08-29 15:43:05 +02:00
commit 2f4db400df
2543 changed files with 469120 additions and 0 deletions

View file

@ -0,0 +1,11 @@
# Copyright 2014 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from . import mis_report
from . import mis_report_subreport
from . import mis_report_instance
from . import mis_report_style
from . import aep
from . import mis_kpi_data
from . import prorata_read_group_mixin
from . import mis_report_instance_annotation

View file

@ -0,0 +1,215 @@
# Copyright 2016 Thomas Binsfeld
# Copyright 2016 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
"""
Provides the AccountingNone singleton.
AccountingNone is a null value that dissolves in basic arithmetic operations,
as illustrated in the examples below. In comparisons, AccountingNone behaves
the same as zero.
>>> 1 + 1
2
>>> 1 + AccountingNone
1
>>> AccountingNone + 1
1
>>> AccountingNone + None
AccountingNone
>>> None + AccountingNone
AccountingNone
>>> +AccountingNone
AccountingNone
>>> -AccountingNone
AccountingNone
>>> -(AccountingNone)
AccountingNone
>>> AccountingNone - 1
-1
>>> 1 - AccountingNone
1
>>> abs(AccountingNone)
AccountingNone
>>> AccountingNone - None
AccountingNone
>>> None - AccountingNone
AccountingNone
>>> AccountingNone / 2
0.0
>>> 2 / AccountingNone
Traceback (most recent call last):
...
ZeroDivisionError
>>> AccountingNone / AccountingNone
AccountingNone
>>> AccountingNone // 2
0.0
>>> 2 // AccountingNone
Traceback (most recent call last):
...
ZeroDivisionError
>>> AccountingNone // AccountingNone
AccountingNone
>>> AccountingNone * 2
0.0
>>> 2 * AccountingNone
0.0
>>> AccountingNone * AccountingNone
AccountingNone
>>> AccountingNone * None
AccountingNone
>>> None * AccountingNone
AccountingNone
>>> str(AccountingNone)
''
>>> bool(AccountingNone)
False
>>> AccountingNone > 0
False
>>> AccountingNone < 0
False
>>> AccountingNone < 1
True
>>> AccountingNone > 1
False
>>> 0 < AccountingNone
False
>>> 0 > AccountingNone
False
>>> 1 < AccountingNone
False
>>> 1 > AccountingNone
True
>>> AccountingNone == 0
True
>>> AccountingNone == 0.0
True
>>> AccountingNone == None
True
>>> AccountingNone >= AccountingNone
True
>>> AccountingNone <= AccountingNone
True
>>> round(AccountingNone, 2)
0.0
>>> float(AccountingNone)
0.0
>>> int(AccountingNone)
0
"""
__all__ = ["AccountingNone"]
class AccountingNoneType:
def __add__(self, other):
if other is None:
return AccountingNone
return other
__radd__ = __add__
def __sub__(self, other):
if other is None:
return AccountingNone
return -other
def __rsub__(self, other):
if other is None:
return AccountingNone
return other
def __iadd__(self, other):
if other is None:
return AccountingNone
return other
def __isub__(self, other):
if other is None:
return AccountingNone
return -other
def __abs__(self):
return self
def __pos__(self):
return self
def __neg__(self):
return self
def __div__(self, other):
if other is AccountingNone:
return AccountingNone
return 0.0
def __rdiv__(self, other):
raise ZeroDivisionError
def __floordiv__(self, other):
if other is AccountingNone:
return AccountingNone
return 0.0
def __rfloordiv__(self, other):
raise ZeroDivisionError
def __truediv__(self, other):
if other is AccountingNone:
return AccountingNone
return 0.0
def __rtruediv__(self, other):
raise ZeroDivisionError
def __mul__(self, other):
if other is None or other is AccountingNone:
return AccountingNone
return 0.0
__rmul__ = __mul__
def __repr__(self):
return "AccountingNone"
def __str__(self):
return ""
def __nonzero__(self):
return False
def __bool__(self):
return False
def __eq__(self, other):
return other == 0 or other is None or other is AccountingNone
def __lt__(self, other):
return other > 0
def __gt__(self, other):
return other < 0
def __le__(self, other):
return other >= 0
def __ge__(self, other):
return other <= 0
def __float__(self):
return 0.0
def __int__(self):
return 0
def __round__(self, ndigits):
return 0.0
AccountingNone = AccountingNoneType()
if __name__ == "__main__": # pragma: no cover
import doctest
doctest.testmod()

View file

@ -0,0 +1,660 @@
# Copyright 2014 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import logging
import re
from collections import defaultdict
from odoo import _, fields
from odoo.exceptions import UserError
from odoo.models import expression
from odoo.tools.float_utils import float_is_zero
from odoo.tools.safe_eval import datetime, dateutil, safe_eval, time
from .accounting_none import AccountingNone
_logger = logging.getLogger(__name__)
_DOMAIN_START_RE = re.compile(r"\(|(['\"])[!&|]\1")
def _is_domain(s):
"""Test if a string looks like an Odoo domain"""
return _DOMAIN_START_RE.match(s)
class Accumulator:
"""A simple class to accumulate debit, credit and custom field values.
>>> acc1 = Accumulator(["f1", "f2"])
>>> acc1.debit
AccountingNone
>>> acc1.credit
AccountingNone
>>> acc1.custom_fields
{'f1': AccountingNone, 'f2': AccountingNone}
>>> acc1.add_debit_credit(10, 20)
>>> acc1.debit, acc1.credit
(10, 20)
>>> acc1.add_custom_field("f1", 10)
>>> acc1.custom_fields
{'f1': 10, 'f2': AccountingNone}
>>> acc2 = Accumulator(["f1", "f2"])
>>> acc2.add_debit_credit(21, 31)
>>> acc2.add_custom_field("f2", 41)
>>> acc1 += acc2
>>> acc1.debit, acc1.credit
(31, 51)
>>> acc1.custom_fields
{'f1': 10, 'f2': 41}
"""
def __init__(self, custom_field_names=()):
self.debit = AccountingNone
self.credit = AccountingNone
self.custom_fields = {
custom_field: AccountingNone for custom_field in custom_field_names
}
def has_data(self):
return (
self.debit is not AccountingNone
or self.credit is not AccountingNone
or any(v is not AccountingNone for v in self.custom_fields.values())
)
def add_debit_credit(self, debit, credit):
self.debit += debit
self.credit += credit
def add_custom_field(self, field, value):
self.custom_fields[field] += value
def __iadd__(self, other):
self.debit += other.debit
self.credit += other.credit
for field in self.custom_fields:
self.custom_fields[field] += other.custom_fields[field]
return self
class AccountingExpressionProcessor:
"""Processor for accounting expressions.
Expressions of the form
<field><mode>(.fieldname)?[accounts][optional move line domain]
are supported, where:
* field is bal, crd, deb, pbal (positive balances only),
nbal (negative balance only), fld (custom field)
* mode is i (initial balance), e (ending balance),
p (moves over period)
* .fieldname is used only with fldp and specifies the field name to sum
* there is also a special u mode (unallocated P&L) which computes
the sum from the beginning until the beginning of the fiscal year
of the period; it is only meaningful for P&L accounts
* accounts is a list of accounts, possibly containing % wildcards,
or a domain expression on account.account
* an optional domain on move lines allowing filters on eg analytic
accounts or journal
Examples:
* bal[70]: variation of the balance of moves on account 70
over the period (it is the same as balp[70]);
* bali[70,60]: balance of accounts 70 and 60 at the start of period;
* bale[1%]: balance of accounts starting with 1 at end of period.
* fldp.quantity[60%]: sum of the quantity field of moves on accounts 60
How to use:
* repeatedly invoke parse_expr() for each expression containing
accounting variables as described above; this lets the processor
group domains and modes and accounts;
* when all expressions have been parsed, invoke done_parsing()
to notify the processor that it can prepare to query (mainly
search all accounts - children, consolidation - that will need to
be queried;
* for each period, call do_queries(), then call replace_expr() for each
expression to replace accounting variables with their resulting value
for the given period.
How it works:
* by accumulating the expressions before hand, it ensures to do the
strict minimum number of queries to the database (for each period,
one query per domain and mode);
* it queries using the orm read_group which reduces to a query with
sum on debit and credit and group by on account_id and company_id,
(note: it seems the orm then does one query per account to fetch
the account name...);
* additionally, one query per view/consolidation account is done to
discover the children accounts.
"""
MODE_VARIATION = "p"
MODE_INITIAL = "i"
MODE_END = "e"
MODE_UNALLOCATED = "u"
_ACC_RE = re.compile(
r"(?P<field>\bbal|\bpbal|\bnbal|\bcrd|\bdeb|\bfld)"
r"(?P<mode>[piseu])?"
r"(?P<fld_name>\.[a-zA-Z0-9_]+)?"
r"\s*"
r"(?P<account_sel>_[a-zA-Z0-9]+|\[.*?\])"
r"\s*"
r"(?P<ml_domain>\[.*?\])?"
)
def __init__(self, companies, currency=None, account_model="account.account"):
self.env = companies.env
self.companies = companies
if not currency:
self.currency = companies.mapped("currency_id")
if len(self.currency) > 1:
raise UserError(
_(
"If currency_id is not provided, "
"all companies must have the same currency."
)
)
else:
self.currency = currency
self.dp = self.currency.decimal_places
# before done_parsing: {(ml_domain, mode): set(acc_domain)}
# after done_parsing: {(ml_domain, mode): list(account_ids)}
self._map_account_ids = defaultdict(set)
# {account_domain: set(account_ids)}
self._account_ids_by_acc_domain = defaultdict(set)
# smart ending balance (returns AccountingNone if there
# are no moves in period and 0 initial balance), implies
# a first query to get the initial balance and another
# to get the variation, so it's a bit slower
self.smart_end = True
# custom field to query and sum
self._custom_fields = set()
# Account model
self._account_model = self.env[account_model].with_context(active_test=False)
def _account_codes_to_domain(self, account_codes):
"""Convert a comma separated list of account codes
(possibly with wildcards) to a domain on account.account.
"""
elems = []
for account_code in account_codes.split(","):
account_code = account_code.strip()
if "%" in account_code:
elems.append([("code", "=like", account_code)])
else:
elems.append([("code", "=", account_code)])
return tuple(expression.OR(elems))
def _parse_match_object(self, mo):
"""Split a match object corresponding to an accounting variable
Returns field, mode, fld_name, account domain, move line domain.
"""
domain_eval_context = {
"ref": self.env.ref,
"user": self.env.user,
"time": time,
"datetime": datetime,
"dateutil": dateutil,
}
field, mode, fld_name, account_sel, ml_domain = mo.groups()
# handle some legacy modes
if not mode:
mode = self.MODE_VARIATION
elif mode == "s":
mode = self.MODE_END
# custom fields
if fld_name:
assert fld_name[0] == "."
fld_name = fld_name[1:] # strip leading dot
# convert account selector to account domain
if account_sel.startswith("_"):
# legacy bal_NNN%
acc_domain = self._account_codes_to_domain(account_sel[1:])
else:
assert account_sel[0] == "[" and account_sel[-1] == "]"
inner_account_sel = account_sel[1:-1].strip()
if not inner_account_sel:
# empty selector: select all accounts
acc_domain = tuple()
elif _is_domain(inner_account_sel):
# account selector is a domain
acc_domain = tuple(safe_eval(account_sel, domain_eval_context))
else:
# account selector is a list of account codes
acc_domain = self._account_codes_to_domain(inner_account_sel)
# move line domain
if ml_domain:
assert ml_domain[0] == "[" and ml_domain[-1] == "]"
ml_domain = tuple(safe_eval(ml_domain, domain_eval_context))
else:
ml_domain = tuple()
return field, mode, fld_name, acc_domain, ml_domain
def parse_expr(self, expr):
"""Parse an expression, extracting accounting variables.
Move line domains and account selectors are extracted and
stored in the map so when all expressions have been parsed,
we know which account domains to query for each move line domain
and mode.
"""
for mo in self._ACC_RE.finditer(expr):
field, mode, fld_name, acc_domain, ml_domain = self._parse_match_object(mo)
if mode == self.MODE_END and self.smart_end:
modes = (self.MODE_INITIAL, self.MODE_VARIATION, self.MODE_END)
else:
modes = (mode,)
for mode in modes:
key = (ml_domain, mode)
self._map_account_ids[key].add(acc_domain)
if field == "fld":
if mode != self.MODE_VARIATION:
raise UserError(
_(
"`fld` can only be used with mode `p` (variation) "
"in expression %s",
expr,
)
)
if not fld_name:
raise UserError(
_("`fld` must have a field name in exression %s", expr)
)
self._custom_fields.add(fld_name)
else:
if fld_name:
raise UserError(
_(
"`%(field)s` cannot have a field name "
"in expression %(expr)s",
field=field,
expr=expr,
)
)
def done_parsing(self):
"""Replace account domains by account ids in map"""
for key, acc_domains in self._map_account_ids.items():
all_account_ids = set()
for acc_domain in acc_domains:
acc_domain_with_company = expression.AND(
[acc_domain, [("company_id", "in", self.companies.ids)]]
)
account_ids = self._account_model.search(acc_domain_with_company).ids
self._account_ids_by_acc_domain[acc_domain].update(account_ids)
all_account_ids.update(account_ids)
self._map_account_ids[key] = list(all_account_ids)
@classmethod
def has_account_var(cls, expr):
"""Test if an string contains an accounting variable."""
return bool(cls._ACC_RE.search(expr))
def get_account_ids_for_expr(self, expr):
"""Get a set of account ids that are involved in an expression.
Prerequisite: done_parsing() must have been invoked.
"""
account_ids = set()
for mo in self._ACC_RE.finditer(expr):
_, _, _, acc_domain, _ = self._parse_match_object(mo)
account_ids.update(self._account_ids_by_acc_domain[acc_domain])
return account_ids
def get_aml_domain_for_expr(self, expr, date_from, date_to, account_id=None):
"""Get a domain on account.move.line for an expression.
Prerequisite: done_parsing() must have been invoked.
Returns a domain that can be used to search on account.move.line.
"""
aml_domains = []
date_domain_by_mode = {}
for mo in self._ACC_RE.finditer(expr):
field, mode, fld_name, acc_domain, ml_domain = self._parse_match_object(mo)
aml_domain = list(ml_domain)
account_ids = set()
account_ids.update(self._account_ids_by_acc_domain[acc_domain])
if not account_id:
aml_domain.append(("account_id", "in", tuple(account_ids)))
else:
# filter on account_id
if account_id in account_ids:
aml_domain.append(("account_id", "=", account_id))
else:
continue
if field == "crd":
aml_domain.append(("credit", "<>", 0.0))
elif field == "deb":
aml_domain.append(("debit", "<>", 0.0))
elif fld_name:
aml_domain.append((fld_name, "!=", False))
aml_domains.append(expression.normalize_domain(aml_domain))
if mode not in date_domain_by_mode:
date_domain_by_mode[mode] = self.get_aml_domain_for_dates(
date_from, date_to, mode
)
assert aml_domains
# TODO we could do this for more precision:
# AND(OR(aml_domains[mode]), date_domain[mode]) for each mode
return expression.OR(aml_domains) + expression.OR(date_domain_by_mode.values())
def get_aml_domain_for_dates(self, date_from, date_to, mode):
if mode == self.MODE_VARIATION:
domain = [("date", ">=", date_from), ("date", "<=", date_to)]
elif mode in (self.MODE_INITIAL, self.MODE_END):
# for income and expense account, sum from the beginning
# of the current fiscal year only, for balance sheet accounts
# sum from the beginning of time
date_from_date = fields.Date.from_string(date_from)
# TODO this takes the fy from the first company
# make that user controllable (nice to have)?
fy_date_from = self.companies[0].compute_fiscalyear_dates(date_from_date)[
"date_from"
]
domain = [
"|",
("date", ">=", fields.Date.to_string(fy_date_from)),
("account_id.include_initial_balance", "=", True),
]
if mode == self.MODE_INITIAL:
domain.append(("date", "<", date_from))
elif mode == self.MODE_END:
domain.append(("date", "<=", date_to))
elif mode == self.MODE_UNALLOCATED:
date_from_date = fields.Date.from_string(date_from)
# TODO this takes the fy from the first company
# make that user controllable (nice to have)?
fy_date_from = self.companies[0].compute_fiscalyear_dates(date_from_date)[
"date_from"
]
domain = [
("date", "<", fields.Date.to_string(fy_date_from)),
("account_id.include_initial_balance", "=", False),
]
return expression.normalize_domain(domain)
def _get_company_rates(self, date):
# get exchange rates for each company with its rouding
company_rates = {}
target_rate = self.currency.with_context(date=date).rate
for company in self.companies:
if company.currency_id != self.currency:
rate = target_rate / company.currency_id.with_context(date=date).rate
else:
rate = 1.0
company_rates[company.id] = (rate, company.currency_id.decimal_places)
return company_rates
def do_queries(
self,
date_from,
date_to,
additional_move_line_filter=None,
aml_model=None,
):
"""Query sums of debit and credit for all accounts and domains
used in expressions.
This method must be executed after done_parsing().
"""
if not aml_model:
aml_model = self.env["account.move.line"]
else:
aml_model = self.env[aml_model]
aml_model = aml_model.with_context(active_test=False)
company_rates = self._get_company_rates(date_to)
# {(domain, mode): {account_id: Accumulator}}
self._data = defaultdict(
lambda: defaultdict(
lambda: Accumulator(self._custom_fields),
)
)
domain_by_mode = {}
ends = []
for key in self._map_account_ids:
domain, mode = key
if mode == self.MODE_END and self.smart_end:
# postpone computation of ending balance
ends.append((domain, mode))
continue
if mode not in domain_by_mode:
domain_by_mode[mode] = self.get_aml_domain_for_dates(
date_from, date_to, mode
)
domain = list(domain) + domain_by_mode[mode]
domain.append(("account_id", "in", self._map_account_ids[key]))
if additional_move_line_filter:
domain.extend(additional_move_line_filter)
# fetch sum of debit/credit, grouped by account_id
_logger.debug("read_group domain: %s", domain)
try:
accs = aml_model.read_group(
domain,
[
"debit",
"credit",
"account_id",
"company_id",
*self._custom_fields,
],
["account_id", "company_id"],
lazy=False,
)
except ValueError as e:
raise UserError(
_(
'Error while querying move line source "%(model_name)s". '
"This is likely due to a filter or expression referencing "
"a field that does not exist in the model.\n\n"
"The technical error message is: %(exception)s. "
)
% dict(
model_name=aml_model._description,
exception=e,
)
) from e
for acc in accs:
rate, dp = company_rates[acc["company_id"][0]]
debit = acc["debit"] or 0.0
credit = acc["credit"] or 0.0
if mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED) and float_is_zero(
debit - credit, precision_digits=self.dp
):
# in initial mode, ignore accounts with 0 balance
continue
# due to branches, it's possible to have multiple groups
# with the same account_id, because multiple companies can
# use the same account
account_data = self._data[key][acc["account_id"][0]]
account_data.add_debit_credit(debit * rate, credit * rate)
for field_name in self._custom_fields:
account_data.add_custom_field(
field_name, acc[field_name] or AccountingNone
)
# compute ending balances by summing initial and variation
for key in ends:
domain, mode = key
initial_data = self._data[(domain, self.MODE_INITIAL)]
variation_data = self._data[(domain, self.MODE_VARIATION)]
account_ids = set(initial_data.keys()) | set(variation_data.keys())
for account_id in account_ids:
self._data[key][account_id] += initial_data[account_id]
self._data[key][account_id] += variation_data[account_id]
def replace_expr(self, expr):
"""Replace accounting variables in an expression by their amount.
Returns a new expression string.
This method must be executed after do_queries().
"""
def f(mo):
field, mode, fld_name, acc_domain, ml_domain = self._parse_match_object(mo)
key = (ml_domain, mode)
account_ids_data = self._data[key]
v = AccountingNone
account_ids = self._account_ids_by_acc_domain[acc_domain]
for account_id in account_ids:
entry = account_ids_data[account_id]
debit = entry.debit
credit = entry.credit
if field == "bal":
v += debit - credit
elif field == "pbal":
if debit >= credit:
v += debit - credit
elif field == "nbal":
if debit < credit:
v += debit - credit
elif field == "deb":
v += debit
elif field == "crd":
v += credit
else:
assert field == "fld"
v += entry.custom_fields[fld_name]
# in initial balance mode, assume 0 is None
# as it does not make sense to distinguish 0 from "no data"
if (
v is not AccountingNone
and mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED)
and float_is_zero(v, precision_digits=self.dp)
):
v = AccountingNone
return "(" + repr(v) + ")"
return self._ACC_RE.sub(f, expr)
def replace_exprs_by_account_id(self, exprs):
"""Replace accounting variables in a list of expression
by their amount, iterating by accounts involved in the expression.
yields account_id, replaced_expr
This method must be executed after do_queries().
"""
def f(mo):
field, mode, fld_name, acc_domain, ml_domain = self._parse_match_object(mo)
key = (ml_domain, mode)
# first check if account_id is involved in
# the current expression part
if account_id not in self._account_ids_by_acc_domain[acc_domain]:
return "(AccountingNone)"
# here we know account_id is involved in acc_domain
account_ids_data = self._data[key]
entry = account_ids_data[account_id]
debit = entry.debit
credit = entry.credit
if field == "bal":
v = debit - credit
elif field == "pbal":
if debit >= credit:
v = debit - credit
else:
v = AccountingNone
elif field == "nbal":
if debit < credit:
v = debit - credit
else:
v = AccountingNone
elif field == "deb":
v = debit
elif field == "crd":
v = credit
else:
assert field == "fld"
v = entry.custom_fields[fld_name]
# in initial balance mode, assume 0 is None
# as it does not make sense to distinguish 0 from "no data"
if (
v is not AccountingNone
and mode in (self.MODE_INITIAL, self.MODE_UNALLOCATED)
and float_is_zero(v, precision_digits=self.dp)
):
v = AccountingNone
return "(" + repr(v) + ")"
account_ids = set()
for expr in exprs:
for mo in self._ACC_RE.finditer(expr):
_, mode, _, acc_domain, ml_domain = self._parse_match_object(mo)
key = (ml_domain, mode)
account_ids_data = self._data[key]
for account_id in self._account_ids_by_acc_domain[acc_domain]:
if account_ids_data[account_id].has_data():
account_ids.add(account_id)
for account_id in account_ids:
yield account_id, [self._ACC_RE.sub(f, expr) for expr in exprs]
@classmethod
def _get_balances(cls, mode, companies, date_from, date_to):
expr = f"deb{mode}[], crd{mode}[]"
aep = AccountingExpressionProcessor(companies)
# disable smart_end to have the data at once, instead
# of initial + variation
aep.smart_end = False
aep.parse_expr(expr)
aep.done_parsing()
aep.do_queries(date_from, date_to)
return {k: (v.debit, v.credit) for k, v in aep._data[((), mode)].items()}
@classmethod
def get_balances_initial(cls, companies, date):
"""A convenience method to obtain the initial balances of all accounts
at a given date.
It is the same as get_balances_end(date-1).
:param companies:
:param date:
Returns a dictionary: {account_id, (debit, credit)}
"""
return cls._get_balances(cls.MODE_INITIAL, companies, date, date)
@classmethod
def get_balances_end(cls, companies, date):
"""A convenience method to obtain the ending balances of all accounts
at a given date.
It is the same as get_balances_initial(date+1).
:param companies:
:param date:
Returns a dictionary: {account_id, (debit, credit)}
"""
return cls._get_balances(cls.MODE_END, companies, date, date)
@classmethod
def get_balances_variation(cls, companies, date_from, date_to):
"""A convenience method to obtain the variation of the
balances of all accounts over a period.
:param companies:
:param date:
Returns a dictionary: {account_id, (debit, credit)}
"""
return cls._get_balances(cls.MODE_VARIATION, companies, date_from, date_to)
@classmethod
def get_unallocated_pl(cls, companies, date):
"""A convenience method to obtain the unallocated profit/loss
of the previous fiscal years at a given date.
:param companies:
:param date:
Returns a tuple (debit, credit)
"""
# TODO shoud we include here the accounts of type "unaffected"
# or leave that to the caller?
bals = cls._get_balances(cls.MODE_UNALLOCATED, companies, date, date)
return tuple(map(sum, zip(*bals.values()))) # noqa: B905

View file

@ -0,0 +1,129 @@
# Copyright 2014 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
def _sum(lst):
"""Same as stdlib sum but returns None instead of 0
in case of empty sequence.
>>> sum([1])
1
>>> _sum([1])
1
>>> sum([1, 2])
3
>>> _sum([1, 2])
3
>>> sum([])
0
>>> _sum([])
"""
if not lst:
return None
return sum(lst)
def _avg(lst):
"""Arithmetic mean of a sequence. Returns None in case of empty sequence.
>>> _avg([1])
1.0
>>> _avg([1, 2])
1.5
>>> _avg([])
"""
if not lst:
return None
return sum(lst) / float(len(lst))
def _min(*args):
"""Same as stdlib min but returns None instead of exception
in case of empty sequence.
>>> min(1, 2)
1
>>> _min(1, 2)
1
>>> min([1, 2])
1
>>> _min([1, 2])
1
>>> min(1)
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: 'int' object is not iterable
>>> _min(1)
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: 'int' object is not iterable
>>> min([1])
1
>>> _min([1])
1
>>> min()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: min expected at least 1 argument, got 0
>>> _min()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: min expected at least 1 argument, got 0
>>> min([])
Traceback (most recent call last):
File "<stdin>", line 1, in ?
ValueError: min() arg is an empty sequence
>>> _min([])
"""
if len(args) == 1 and not args[0]:
return None
return min(*args)
def _max(*args):
"""Same as stdlib max but returns None instead of exception
in case of empty sequence.
>>> max(1, 2)
2
>>> _max(1, 2)
2
>>> max([1, 2])
2
>>> _max([1, 2])
2
>>> max(1)
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: 'int' object is not iterable
>>> _max(1)
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: 'int' object is not iterable
>>> max([1])
1
>>> _max([1])
1
>>> max()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: max expected at least 1 argument, got 0
>>> _max()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
TypeError: max expected at least 1 argument, got 0
>>> max([])
Traceback (most recent call last):
File "<stdin>", line 1, in ?
ValueError: max() arg is an empty sequence
>>> _max([])
"""
if len(args) == 1 and not args[0]:
return None
return max(*args)
if __name__ == "__main__": # pragma: no cover
import doctest
doctest.testmod()

View file

@ -0,0 +1,17 @@
# Copyright 2016 ACSONE SA/NV (<http://acsone.eu>)
# Copyright 2016 Akretion (<http://akretion.com>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
class DataError(Exception):
def __init__(self, name, msg):
super().__init__()
self.name = name
self.msg = msg
def __repr__(self):
return f"{self.__class__.__name__}({repr(self.name)})"
class NameDataError(DataError):
pass

View file

@ -0,0 +1,68 @@
# Copyright 2020 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from .mis_safe_eval import NameDataError, mis_safe_eval
class ExpressionEvaluator:
def __init__(
self,
aep,
date_from,
date_to,
additional_move_line_filter=None,
aml_model=None,
):
self.aep = aep
self.date_from = date_from
self.date_to = date_to
self.additional_move_line_filter = additional_move_line_filter
self.aml_model = aml_model
self._aep_queries_done = False
def aep_do_queries(self):
if self.aep and not self._aep_queries_done:
self.aep.do_queries(
self.date_from,
self.date_to,
self.additional_move_line_filter,
self.aml_model,
)
self._aep_queries_done = True
def eval_expressions(self, expressions, locals_dict):
vals = []
drilldown_args = []
name_error = False
for expression in expressions:
expr = expression and expression.name or "AccountingNone"
if self.aep:
replaced_expr = self.aep.replace_expr(expr)
else:
replaced_expr = expr
val = mis_safe_eval(replaced_expr, locals_dict)
vals.append(val)
if isinstance(val, NameDataError):
name_error = True
if replaced_expr != expr:
drilldown_args.append({"expr": expr})
else:
drilldown_args.append(None)
return vals, drilldown_args, name_error
def eval_expressions_by_account(self, expressions, locals_dict):
if not self.aep:
return
exprs = [e and e.name or "AccountingNone" for e in expressions]
for account_id, replaced_exprs in self.aep.replace_exprs_by_account_id(exprs):
vals = []
drilldown_args = []
name_error = False
for expr, replaced_expr in zip(exprs, replaced_exprs): # noqa: B905
val = mis_safe_eval(replaced_expr, locals_dict)
vals.append(val)
if replaced_expr != expr:
drilldown_args.append({"expr": expr, "account_id": account_id})
else:
drilldown_args.append(None)
yield account_id, vals, drilldown_args, name_error

View file

@ -0,0 +1,576 @@
# Copyright 2014 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from __future__ import annotations
import logging
from collections import OrderedDict, defaultdict
from odoo import _
from odoo.exceptions import UserError
from .accounting_none import AccountingNone
from .mis_kpi_data import ACC_SUM
from .mis_safe_eval import DataError, mis_safe_eval
from .simple_array import SimpleArray
_logger = logging.getLogger(__name__)
class KpiMatrixRow:
# TODO: ultimately, the kpi matrix will become ignorant of KPI's and
# accounts and know about rows, columns, sub columns and styles only.
# It is already ignorant of period and only knowns about columns.
# This will require a correct abstraction for expanding row details.
def __init__(self, matrix, kpi, account_id=None, parent_row=None):
self._matrix = matrix
self.kpi = kpi
self.account_id = account_id
self.description = ""
self.parent_row = parent_row
if not self.account_id:
self.style_props = self._matrix._style_model.merge(
[self.kpi.report_id.style_id, self.kpi.style_id]
)
else:
self.style_props = self._matrix._style_model.merge(
[self.kpi.report_id.style_id, self.kpi.auto_expand_accounts_style_id]
)
@property
def label(self):
if not self.account_id:
return self.kpi.description
else:
return self._matrix.get_account_name(self.account_id)
@property
def row_id(self):
self._matrix._make_row_id(self.kpi.id, self.account_id)
def iter_cell_tuples(self, cols=None):
if cols is None:
cols = self._matrix.iter_cols()
for col in cols:
yield col.get_cell_tuple_for_row(self)
def iter_cells(self, subcols=None):
if subcols is None:
subcols = self._matrix.iter_subcols()
for subcol in subcols:
yield subcol.get_cell_for_row(self)
def is_empty(self):
for cell in self.iter_cells():
if cell and cell.val not in (AccountingNone, None):
return False
return True
class KpiMatrixCol:
def __init__(self, key, label, description, locals_dict, subkpis):
self.key = key
self.label = label
self.description = description
self.locals_dict = locals_dict
self.colspan = subkpis and len(subkpis) or 1
self._subcols = []
self.subkpis = subkpis
if not subkpis:
subcol = KpiMatrixSubCol(self, "", "", 0)
self._subcols.append(subcol)
else:
for i, subkpi in enumerate(subkpis):
subcol = KpiMatrixSubCol(self, subkpi.description, "", i)
self._subcols.append(subcol)
self._cell_tuples_by_row = {} # {row: (cells tuple)}
def _set_cell_tuple(self, row, cell_tuple):
self._cell_tuples_by_row[row] = cell_tuple
def iter_subcols(self):
return self._subcols
def iter_cell_tuples(self):
return self._cell_tuples_by_row.values()
def get_cell_tuple_for_row(self, row):
return self._cell_tuples_by_row.get(row)
class KpiMatrixSubCol:
def __init__(self, col, label, description, index=0):
self.col = col
self.label = label
self.description = description
self.index = index
@property
def subkpi(self):
if self.col.subkpis:
return self.col.subkpis[self.index]
def iter_cells(self):
for cell_tuple in self.col.iter_cell_tuples():
yield cell_tuple[self.index]
def get_cell_for_row(self, row):
cell_tuple = self.col.get_cell_tuple_for_row(row)
if cell_tuple is None:
return None
return cell_tuple[self.index]
class KpiMatrixCell: # noqa: B903 (immutable data class)
def __init__(
self,
row,
subcol,
val,
val_rendered,
val_comment,
style_props,
drilldown_arg,
val_type,
):
self.row = row
self.subcol = subcol
self.val = val
self.val_rendered = val_rendered
self.val_comment = val_comment
self.style_props = style_props
self.drilldown_arg = drilldown_arg
self.val_type = val_type
self.cell_id = KpiMatrix._pack_cell_id(self)
class KpiMatrix:
def __init__(self, env, multi_company=False, account_model="account.account"):
# cache language id for faster rendering
lang_model = env["res.lang"]
self.lang = lang_model._lang_get(env.user.lang)
self._style_model = env["mis.report.style"]
self._account_model = env[account_model]
# data structures
# { kpi: KpiMatrixRow }
self._kpi_rows = OrderedDict()
# { kpi: {account_id: KpiMatrixRow} }
self._detail_rows = {}
# { col_key: KpiMatrixCol }
self._cols = OrderedDict()
# { col_key (left of comparison): [(col_key, base_col_key)] }
self._comparison_todo = defaultdict(list)
# { col_key (left of sum): (col_key, [(sign, sum_col_key)])
self._sum_todo = {}
# { account_id: account_name }
self._account_names = {}
self._multi_company = multi_company
def declare_kpi(self, kpi):
"""Declare a new kpi (row) in the matrix.
Invoke this first for all kpi, in display order.
"""
self._kpi_rows[kpi] = KpiMatrixRow(self, kpi)
self._detail_rows[kpi] = {}
def declare_col(self, col_key, label, description, locals_dict, subkpis):
"""Declare a new column, giving it an identifier (key).
Invoke the declare_* methods in display order.
"""
col = KpiMatrixCol(col_key, label, description, locals_dict, subkpis)
self._cols[col_key] = col
return col
def declare_comparison(
self, cmpcol_key, col_key, base_col_key, label, description=None
):
"""Declare a new comparison column.
Invoke the declare_* methods in display order.
"""
self._comparison_todo[cmpcol_key] = (col_key, base_col_key, label, description)
self._cols[cmpcol_key] = None # reserve slot in insertion order
def declare_sum(
self, sumcol_key, col_to_sum_keys, label, description=None, sum_accdet=False
):
"""Declare a new summation column.
Invoke the declare_* methods in display order.
:param col_to_sum_keys: [(sign, col_key)]
"""
self._sum_todo[sumcol_key] = (col_to_sum_keys, label, description, sum_accdet)
self._cols[sumcol_key] = None # reserve slot in insertion order
def set_values(self, kpi, col_key, vals, drilldown_args, tooltips=True):
"""Set values for a kpi and a colum.
Invoke this after declaring the kpi and the column.
"""
self.set_values_detail_account(
kpi, col_key, None, vals, drilldown_args, tooltips
)
def set_values_detail_account(
self, kpi, col_key, account_id, vals, drilldown_args, tooltips=True
):
"""Set values for a kpi and a column and a detail account.
Invoke this after declaring the kpi and the column.
"""
if not account_id:
row = self._kpi_rows[kpi]
else:
kpi_row = self._kpi_rows[kpi]
if account_id in self._detail_rows[kpi]:
row = self._detail_rows[kpi][account_id]
else:
row = KpiMatrixRow(self, kpi, account_id, parent_row=kpi_row)
self._detail_rows[kpi][account_id] = row
col = self._cols[col_key]
cell_tuple = []
assert len(vals) == col.colspan
assert len(drilldown_args) == col.colspan
for val, drilldown_arg, subcol in zip(vals, drilldown_args, col.iter_subcols()): # noqa: B905
if isinstance(val, DataError):
val_rendered = val.name
val_comment = val.msg
else:
val_rendered = self._style_model.render(
self.lang, row.style_props, kpi.type, val
)
if row.kpi.multi and subcol.subkpi:
val_comment = "{}.{} = {}".format(
row.kpi.name,
subcol.subkpi.name,
row.kpi._get_expression_str_for_subkpi(subcol.subkpi),
)
else:
val_comment = f"{row.kpi.name} = {row.kpi.expression}"
cell_style_props = row.style_props
if row.kpi.style_expression:
# evaluate style expression
try:
style_name = mis_safe_eval(
row.kpi.style_expression, col.locals_dict
)
except Exception:
_logger.error(
"Error evaluating style expression <%s>",
row.kpi.style_expression,
exc_info=True,
)
if style_name:
style = self._style_model.search([("name", "=", style_name)])
if style:
cell_style_props = self._style_model.merge(
[row.style_props, style[0]]
)
else:
_logger.error("Style '%s' not found.", style_name)
cell = KpiMatrixCell(
row,
subcol,
val,
val_rendered,
tooltips and val_comment or None,
cell_style_props,
drilldown_arg,
kpi.type,
)
cell_tuple.append(cell)
assert len(cell_tuple) == col.colspan
col._set_cell_tuple(row, cell_tuple)
def _common_subkpis(self, cols):
if not cols:
return set()
common_subkpis = set(cols[0].subkpis)
for col in cols[1:]:
common_subkpis = common_subkpis & set(col.subkpis)
return common_subkpis
def compute_comparisons(self):
"""Compute comparisons.
Invoke this after setting all values.
"""
for (
cmpcol_key,
(col_key, base_col_key, label, description),
) in self._comparison_todo.items():
col = self._cols[col_key]
base_col = self._cols[base_col_key]
common_subkpis = self._common_subkpis([col, base_col])
if (col.subkpis or base_col.subkpis) and not common_subkpis:
raise UserError(
_(
"Columns %(descr)s and %(base_descr)s are not comparable",
descr=col.description,
base_descr=base_col.description,
)
)
if not label:
label = f"{col.label} vs {base_col.label}"
comparison_col = KpiMatrixCol(
cmpcol_key,
label,
description,
{},
sorted(common_subkpis, key=lambda s: s.sequence),
)
self._cols[cmpcol_key] = comparison_col
for row in self.iter_rows():
cell_tuple = col.get_cell_tuple_for_row(row)
base_cell_tuple = base_col.get_cell_tuple_for_row(row)
if cell_tuple is None and base_cell_tuple is None:
continue
if cell_tuple is None:
vals = [AccountingNone] * (len(common_subkpis) or 1)
else:
vals = [
cell.val
for cell in cell_tuple
if not common_subkpis or cell.subcol.subkpi in common_subkpis
]
if base_cell_tuple is None:
base_vals = [AccountingNone] * (len(common_subkpis) or 1)
else:
base_vals = [
cell.val
for cell in base_cell_tuple
if not common_subkpis or cell.subcol.subkpi in common_subkpis
]
comparison_cell_tuple = []
for val, base_val, comparison_subcol in zip( # noqa: B905
vals,
base_vals,
comparison_col.iter_subcols(),
):
# TODO FIXME average factors
comparison = self._style_model.compare_and_render(
self.lang,
row.style_props,
row.kpi.type,
row.kpi.compare_method,
val,
base_val,
1,
1,
)
delta, delta_r, delta_style, delta_type = comparison
comparison_cell_tuple.append(
KpiMatrixCell(
row,
comparison_subcol,
delta,
delta_r,
None,
delta_style,
None,
delta_type,
)
)
comparison_col._set_cell_tuple(row, comparison_cell_tuple)
def compute_sums(self):
"""Compute comparisons.
Invoke this after setting all values.
"""
for (
sumcol_key,
(col_to_sum_keys, label, description, sum_accdet),
) in self._sum_todo.items():
sumcols = [self._cols[k] for (sign, k) in col_to_sum_keys]
# TODO check all sumcols are resolved; we need a kind of
# recompute queue here so we don't depend on insertion
# order
common_subkpis = self._common_subkpis(sumcols)
if any(c.subkpis for c in sumcols) and not common_subkpis:
raise UserError(
_(
"Sum cannot be computed in column {} "
"because the columns to sum have no "
"common subkpis"
).format(label)
)
sum_col = KpiMatrixCol(
sumcol_key,
label,
description,
{},
sorted(common_subkpis, key=lambda s: s.sequence),
)
self._cols[sumcol_key] = sum_col
for row in self.iter_rows():
acc = SimpleArray([AccountingNone] * (len(common_subkpis) or 1))
if row.kpi.accumulation_method == ACC_SUM and not (
row.account_id and not sum_accdet
):
for sign, col_to_sum in col_to_sum_keys:
cell_tuple = self._cols[col_to_sum].get_cell_tuple_for_row(row)
if cell_tuple is None:
vals = [AccountingNone] * (len(common_subkpis) or 1)
else:
vals = [
cell.val
for cell in cell_tuple
if not common_subkpis
or cell.subcol.subkpi in common_subkpis
]
if sign == "+":
acc += SimpleArray(vals)
else:
acc -= SimpleArray(vals)
self.set_values_detail_account(
row.kpi,
sumcol_key,
row.account_id,
acc,
[None] * (len(common_subkpis) or 1),
tooltips=False,
)
def iter_rows(self):
"""Iterate rows in display order.
yields KpiMatrixRow.
"""
for kpi_row in self._kpi_rows.values():
yield kpi_row
detail_rows = self._detail_rows[kpi_row.kpi].values()
detail_rows = sorted(detail_rows, key=lambda r: r.label)
yield from detail_rows
def iter_cols(self):
"""Iterate columns in display order.
yields KpiMatrixCol: one for each column or comparison.
"""
for _col_key, col in self._cols.items():
yield col
def iter_subcols(self):
"""Iterate sub columns in display order.
yields KpiMatrixSubCol: one for each subkpi in each column
and comparison.
"""
for col in self.iter_cols():
yield from col.iter_subcols()
def _load_account_names(self):
account_ids = set()
for detail_rows in self._detail_rows.values():
account_ids.update(detail_rows.keys())
accounts = self._account_model.search([("id", "in", list(account_ids))])
self._account_names = {a.id: self._get_account_name(a) for a in accounts}
def _get_account_name(self, account):
result = f"{account.code} {account.name}"
if self._multi_company:
result = f"{result} [{account.company_id.name}]"
return result
def get_account_name(self, account_id):
if account_id not in self._account_names:
self._load_account_names()
return self._account_names[account_id]
def as_dict(self):
header = [{"cols": []}, {"cols": []}]
for col in self.iter_cols():
header[0]["cols"].append(
{
"label": col.label,
"description": col.description,
"colspan": col.colspan,
}
)
for subcol in col.iter_subcols():
header[1]["cols"].append(
{
"label": subcol.label,
"description": subcol.description,
"colspan": 1,
}
)
body = []
for row in self.iter_rows():
if (
row.style_props.hide_empty and row.is_empty()
) or row.style_props.hide_always:
continue
row_data = {
"row_id": row.row_id,
"parent_row_id": (row.parent_row and row.parent_row.row_id or None),
"label": row.label,
"description": row.description,
"style": self._style_model.to_css_style(row.style_props),
"cells": [],
}
for cell in row.iter_cells():
if cell is None:
# TODO use subcol style here
row_data["cells"].append({})
else:
if cell.val is AccountingNone or isinstance(cell.val, DataError):
val = None
else:
val = cell.val
col_data = {
"cell_id": cell.cell_id,
"val": val,
"val_r": cell.val_rendered,
"val_c": cell.val_comment,
"style": self._style_model.to_css_style(
cell.style_props, no_indent=True
),
# notes can not be added on 'details by account' lines
"can_be_annotated": not cell.row.account_id,
}
if cell.drilldown_arg:
col_data["drilldown_arg"] = cell.drilldown_arg
row_data["cells"].append(col_data)
body.append(row_data)
return {"header": header, "body": body}
# Logic to convert semantic coordinates (period, kpi, subkpi)
# to visual coordinates (cell id) and back. The rendering logic musn't know
# about semantic concepts such as periods and kpis. Having these well identified
# methods allow us to easily spot where the conversion between the rendering and
# semantic domain occur.
@classmethod
def _make_row_id(cls, kpi_id: int, account_id: int | None) -> str:
return f"{kpi_id}:{account_id or ''}"
@classmethod
def _make_cell_id(
cls, kpi_id: int, account_id: int | None, period_id: int, subkpi_id: int | None
) -> str:
return f"{kpi_id}#{account_id or ''}#{period_id}#{subkpi_id or ''}"
@classmethod
def _pack_cell_id(cls, cell: KpiMatrixCell) -> str:
return cls._make_cell_id(
cell.row.kpi.id,
cell.row.account_id,
cell.subcol.col.key,
cell.subcol.subkpi and cell.subcol.subkpi.id,
)
@classmethod
def _unpack_cell_id(cls, cell_id: str) -> tuple[int, int | None, int, int | None]:
kpi_id, account_id, col_key, subkpi_id = cell_id.split("#")
kpi_id = int(kpi_id)
account_id = int(account_id) if account_id else None
period_id = int(col_key)
subkpi_id = int(subkpi_id) if subkpi_id else None
return kpi_id, account_id, period_id, subkpi_id

View file

@ -0,0 +1,115 @@
# Copyright 2017 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from collections import defaultdict
from odoo import _, api, fields, models
from odoo.exceptions import UserError
from odoo.osv import expression
ACC_SUM = "sum"
ACC_AVG = "avg"
ACC_NONE = "none"
def intersect_days(item_dt_from, item_dt_to, dt_from, dt_to):
item_days = (item_dt_to - item_dt_from).days + 1.0
i_dt_from = max(dt_from, item_dt_from)
i_dt_to = min(dt_to, item_dt_to)
i_days = (i_dt_to - i_dt_from).days + 1.0
return i_days, item_days
class MisKpiData(models.AbstractModel):
"""Abstract class for manually entered KPI values."""
_name = "mis.kpi.data"
_description = "MIS Kpi Data Abtract class"
name = fields.Char(compute="_compute_name", required=False, readonly=True)
kpi_expression_id = fields.Many2one(
comodel_name="mis.report.kpi.expression",
required=True,
ondelete="restrict",
string="KPI",
)
date_from = fields.Date(required=True, string="From")
date_to = fields.Date(required=True, string="To")
amount = fields.Float()
seq1 = fields.Integer(
related="kpi_expression_id.kpi_id.sequence",
store=True,
readonly=True,
string="KPI Sequence",
)
seq2 = fields.Integer(
related="kpi_expression_id.subkpi_id.sequence",
store=True,
readonly=True,
string="Sub-KPI Sequence",
)
@api.depends(
"kpi_expression_id.subkpi_id.name",
"kpi_expression_id.kpi_id.name",
"date_from",
"date_to",
)
def _compute_name(self):
for rec in self:
subkpi_name = rec.kpi_expression_id.subkpi_id.name
if subkpi_name:
subkpi_name = "." + subkpi_name
else:
subkpi_name = ""
rec.name = "{}{}: {} - {}".format(
rec.kpi_expression_id.kpi_id.name,
subkpi_name,
rec.date_from,
rec.date_to,
)
@api.model
def _intersect_days(self, item_dt_from, item_dt_to, dt_from, dt_to):
return intersect_days(item_dt_from, item_dt_to, dt_from, dt_to)
@api.model
def _query_kpi_data(self, date_from, date_to, base_domain):
"""Query mis.kpi.data over a time period.
Returns {mis.report.kpi.expression: amount}
"""
dt_from = fields.Date.from_string(date_from)
dt_to = fields.Date.from_string(date_to)
# all data items within or overlapping [date_from, date_to]
date_domain = [("date_from", "<=", date_to), ("date_to", ">=", date_from)]
domain = expression.AND([date_domain, base_domain])
res = defaultdict(float)
res_avg = defaultdict(list)
for item in self.search(domain):
item_dt_from = fields.Date.from_string(item.date_from)
item_dt_to = fields.Date.from_string(item.date_to)
i_days, item_days = self._intersect_days(
item_dt_from, item_dt_to, dt_from, dt_to
)
if item.kpi_expression_id.kpi_id.accumulation_method == ACC_SUM:
# accumulate pro-rata overlap between item and reporting period
res[item.kpi_expression_id] += item.amount * i_days / item_days
elif item.kpi_expression_id.kpi_id.accumulation_method == ACC_AVG:
# memorize the amount and number of days overlapping
# the reporting period (used as weight in average)
res_avg[item.kpi_expression_id].append((i_days, item.amount))
else:
raise UserError(
_(
"Unexpected accumulation method %(method)s for %(name)s.",
method=item.kpi_expression_id.kpi_id.accumulation_method,
name=item.name,
)
)
# compute weighted average for ACC_AVG
for kpi_expression, amounts in res_avg.items():
res[kpi_expression] = sum(d * a for d, a in amounts) / sum(
d for d, a in amounts
)
return res

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,113 @@
# Copyright 2025 ACSONE SA/NV
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
from odoo import _, api, fields, models
from odoo.exceptions import AccessError
from .kpimatrix import KpiMatrix
class MisReportInstanceAnnotation(models.Model):
_name = "mis.report.instance.annotation"
_description = "Mis Report Instance Annotation"
period_id = fields.Many2one(
comodel_name="mis.report.instance.period",
ondelete="cascade",
required=True,
)
kpi_id = fields.Many2one(
comodel_name="mis.report.kpi",
ondelete="cascade",
required=True,
)
subkpi_id = fields.Many2one(
comodel_name="mis.report.subkpi",
ondelete="cascade",
)
note = fields.Char()
annotation_context = fields.Json(
help="""
Context used when adding annotation
"""
)
def init(self):
self.env.cr.execute(
"""
CREATE INDEX IF NOT EXISTS
mis_report_instance_annotation_period_id_kpi_id_subkpi_id_idx
ON mis_report_instance_annotation(period_id,kpi_id,subkpi_id);
"""
)
@api.model
def _get_first_matching_annotation(self, cell_id, instance_id):
"""
Return first annoation
matching exactly the period,kpi,subkpi and annotation context
"""
kpi_id, _, period_id, subkpi_id = KpiMatrix._unpack_cell_id(cell_id)
annotations = self.env["mis.report.instance.annotation"].search(
[
("period_id", "=", period_id),
("kpi_id", "=", kpi_id),
("subkpi_id", "=", subkpi_id),
],
)
annotation_context = (
self.env["mis.report.instance"]
.browse(instance_id)
._get_annotation_context()
)
annotation = fields.first(
annotations.filtered(
lambda rec: rec.annotation_context == annotation_context
)
)
return annotation
@api.model
def set_annotation(self, cell_id, instance_id, note):
if (
not self.env["mis.report.instance"]
.browse(instance_id)
.user_can_edit_annotation
):
raise AccessError(_("You do not have the rights to edit annotations"))
annotation = self._get_first_matching_annotation(cell_id, instance_id)
if annotation:
annotation.note = note
else:
kpi_id, _account_id, period_id, subkpi_id = KpiMatrix._unpack_cell_id(
cell_id
)
self.env["mis.report.instance.annotation"].create(
{
"period_id": period_id,
"kpi_id": kpi_id,
"subkpi_id": subkpi_id,
"note": note,
"annotation_context": self.env["mis.report.instance"]
.browse(instance_id)
._get_annotation_context(),
}
)
@api.model
def remove_annotation(self, cell_id, instance_id):
if (
not self.env["mis.report.instance"]
.browse(instance_id)
.user_can_edit_annotation
):
raise AccessError(_("You do not have the rights to edit annotations"))
annotation = self._get_first_matching_annotation(cell_id, instance_id)
if annotation:
annotation.unlink()

View file

@ -0,0 +1,314 @@
# Copyright 2016 Therp BV (<http://therp.nl>)
# Copyright 2016 ACSONE SA/NV (<http://acsone.eu>)
# Copyright 2020 CorporateHub (https://corporatehub.eu)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import sys
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from .accounting_none import AccountingNone
from .data_error import DataError
if sys.version_info.major >= 3:
unicode = str
class PropertyDict(dict):
def __getattr__(self, name):
return self.get(name)
def copy(self): # pylint: disable=copy-wo-api-one,method-required-super
return PropertyDict(self)
PROPS = [
"color",
"background_color",
"font_style",
"font_weight",
"font_size",
"indent_level",
"prefix",
"suffix",
"dp",
"divider",
"hide_empty",
"hide_always",
]
TYPE_NUM = "num"
TYPE_PCT = "pct"
TYPE_STR = "str"
CMP_DIFF = "diff"
CMP_PCT = "pct"
CMP_NONE = "none"
class MisReportKpiStyle(models.Model):
_name = "mis.report.style"
_description = "MIS Report Style"
@api.constrains("indent_level")
def check_positive_val(self):
for record in self:
if record.indent_level < 0:
raise ValidationError(
_("Indent level must be greater than " "or equal to 0")
)
_font_style_selection = [("normal", "Normal"), ("italic", "Italic")]
_font_weight_selection = [("nornal", "Normal"), ("bold", "Bold")]
_font_size_selection = [
("medium", "medium"),
("xx-small", "xx-small"),
("x-small", "x-small"),
("small", "small"),
("large", "large"),
("x-large", "x-large"),
("xx-large", "xx-large"),
]
_font_size_to_xlsx_size = {
"medium": 11,
"xx-small": 5,
"x-small": 7,
"small": 9,
"large": 13,
"x-large": 15,
"xx-large": 17,
}
# style name
name = fields.Char(string="Style name", required=True)
# color
color_inherit = fields.Boolean(default=True)
color = fields.Char(
string="Text color",
help="Text color in valid RGB code (from #000000 to #FFFFFF)",
default="#000000",
)
background_color_inherit = fields.Boolean(default=True)
background_color = fields.Char(
help="Background color in valid RGB code (from #000000 to #FFFFFF)",
default="#FFFFFF",
)
# font
font_style_inherit = fields.Boolean(default=True)
font_style = fields.Selection(selection=_font_style_selection)
font_weight_inherit = fields.Boolean(default=True)
font_weight = fields.Selection(selection=_font_weight_selection)
font_size_inherit = fields.Boolean(default=True)
font_size = fields.Selection(selection=_font_size_selection)
# indent
indent_level_inherit = fields.Boolean(default=True)
indent_level = fields.Integer()
# number format
prefix_inherit = fields.Boolean(default=True)
prefix = fields.Char()
suffix_inherit = fields.Boolean(default=True)
suffix = fields.Char()
dp_inherit = fields.Boolean(default=True)
dp = fields.Integer(string="Rounding", default=0)
divider_inherit = fields.Boolean(default=True)
divider = fields.Selection(
[
("1e-6", _("µ")),
("1e-3", _("m")),
("1", _("1")),
("1e3", _("k")),
("1e6", _("M")),
],
string="Factor",
default="1",
)
hide_empty_inherit = fields.Boolean(default=True)
hide_empty = fields.Boolean(default=False)
hide_always_inherit = fields.Boolean(default=True)
hide_always = fields.Boolean(default=False)
_sql_constraints = [
("style_name_uniq", "unique(name)", "Style name should be unique")
]
@api.model
def merge(self, styles):
"""Merge several styles, giving priority to the last.
Returns a PropertyDict of style properties.
"""
r = PropertyDict()
for style in styles:
if not style:
continue
if isinstance(style, dict):
r.update(style)
else:
for prop in PROPS:
inherit = getattr(style, prop + "_inherit", None)
if not inherit:
value = getattr(style, prop)
r[prop] = value
return r
@api.model
def render(self, lang, style_props, var_type, value, sign="-"):
if var_type == TYPE_NUM:
return self.render_num(
lang,
value,
style_props.divider,
style_props.dp,
style_props.prefix,
style_props.suffix,
sign=sign,
)
elif var_type == TYPE_PCT:
return self.render_pct(lang, value, style_props.dp, sign=sign)
else:
return self.render_str(lang, value)
@api.model
def render_num(
self, lang, value, divider=1.0, dp=0, prefix=None, suffix=None, sign="-"
):
# format number following user language
if value is None or value is AccountingNone:
return ""
value = round(value / float(divider or 1), dp or 0) or 0
r = lang.format("%%%s.%df" % (sign, dp or 0), value, grouping=True)
r = r.replace("-", "\N{NON-BREAKING HYPHEN}")
if prefix:
r = prefix + "\N{NO-BREAK SPACE}" + r
if suffix:
r = r + "\N{NO-BREAK SPACE}" + suffix
return r
@api.model
def render_pct(self, lang, value, dp=1, sign="-"):
return self.render_num(lang, value, divider=0.01, dp=dp, suffix="%", sign=sign)
@api.model
def render_str(self, lang, value):
if value is None or value is AccountingNone:
return ""
return unicode(value)
@api.model
def compare_and_render(
self,
lang,
style_props,
var_type,
compare_method,
value,
base_value,
average_value=1,
average_base_value=1,
):
"""
:param lang: res.lang record
:param style_props: PropertyDict with style properties
:param var_type: num, pct or str
:param compare_method: diff, pct, none
:param value: value to compare (value - base_value)
:param base_value: value compared with (value - base_value)
:param average_value: value = value / average_value
:param average_base_value: base_value = base_value / average_base_value
:return: tuple with 4 elements
- delta = comparison result (Float or AccountingNone)
- delta_r = delta rendered in formatted string (String)
- delta_style = PropertyDict with style properties
- delta_type = Type of the comparison result (num or pct)
"""
delta = AccountingNone
delta_r = ""
delta_style = style_props.copy()
delta_type = TYPE_NUM
if isinstance(value, DataError) or isinstance(base_value, DataError):
return AccountingNone, "", delta_style, delta_type
if value is None:
value = AccountingNone
if base_value is None:
base_value = AccountingNone
if var_type == TYPE_PCT:
delta = value - base_value
if delta and round(delta, (style_props.dp or 0) + 2) != 0:
delta_style.update(divider=0.01, prefix="", suffix=_("pp"))
else:
delta = AccountingNone
elif var_type == TYPE_NUM:
if value and average_value:
# pylint: disable=redefined-variable-type
value = value / float(average_value)
if base_value and average_base_value:
# pylint: disable=redefined-variable-type
base_value = base_value / float(average_base_value)
if compare_method == CMP_DIFF:
delta = value - base_value
if delta and round(delta, style_props.dp or 0) != 0:
pass
else:
delta = AccountingNone
elif compare_method == CMP_PCT:
if base_value and round(base_value, style_props.dp or 0) != 0:
delta = (value - base_value) / abs(base_value)
if delta and round(delta, 3) != 0:
delta_style.update(dp=1)
delta_type = TYPE_PCT
else:
delta = AccountingNone
if delta is not AccountingNone:
delta_r = self.render(lang, delta_style, delta_type, delta, sign="+")
return delta, delta_r, delta_style, delta_type
@api.model
def to_xlsx_style(self, var_type, props, no_indent=False):
xlsx_attributes = [
("italic", props.font_style == "italic"),
("bold", props.font_weight == "bold"),
("font_size", self._font_size_to_xlsx_size.get(props.font_size, 11)),
("font_color", props.color),
("bg_color", props.background_color),
]
if var_type == TYPE_NUM:
num_format = "#,##0"
if props.dp:
num_format += "."
num_format += "0" * props.dp
if props.prefix:
num_format = f'"{props.prefix} "{num_format}'
if props.suffix:
num_format = f'{num_format}" {props.suffix}"'
xlsx_attributes.append(("num_format", num_format))
elif var_type == TYPE_PCT:
num_format = "0"
if props.dp:
num_format += "."
num_format += "0" * props.dp
num_format += "%"
xlsx_attributes.append(("num_format", num_format))
if props.indent_level is not None and not no_indent:
xlsx_attributes.append(("indent", props.indent_level))
return dict([a for a in xlsx_attributes if a[1] is not None])
@api.model
def to_css_style(self, props, no_indent=False):
css_attributes = [
("font-style", props.font_style),
("font-weight", props.font_weight),
("font-size", props.font_size),
("color", props.color),
("background-color", props.background_color),
]
if props.indent_level is not None and not no_indent:
css_attributes.append(("text-indent", f"{props.indent_level}em"))
return (
"; ".join(["{}: {}".format(*a) for a in css_attributes if a[1] is not None])
or None
)

View file

@ -0,0 +1,74 @@
# Copyright 2020 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from .mis_report import _is_valid_python_var
class ParentLoopError(ValidationError):
pass
class InvalidNameError(ValidationError):
pass
class MisReportSubReport(models.Model):
_name = "mis.report.subreport"
_description = "MIS Report - Sub Reports Relation"
name = fields.Char(required=True)
report_id = fields.Many2one(
comodel_name="mis.report",
required=True,
ondelete="cascade",
)
subreport_id = fields.Many2one(
comodel_name="mis.report",
required=True,
ondelete="restrict",
)
_sql_constraints = [
(
"name_unique",
"unique(name, report_id)",
"Subreport name should be unique by report",
),
(
"subreport_unique",
"unique(subreport_id, report_id)",
"Should not include the same report more than once as sub report "
"of a given report",
),
]
@api.constrains("name")
def _check_name(self):
for rec in self:
if not _is_valid_python_var(rec.name):
raise InvalidNameError(
_("Subreport name ({}) must be a valid python identifier").format(
rec.name
)
)
@api.constrains("report_id", "subreport_id")
def _check_loop(self):
def _has_subreport(reports, report):
if not reports:
return False
if report in reports:
return True
return any(
_has_subreport(r.subreport_ids.mapped("subreport_id"), report)
for r in reports
)
for rec in self:
if _has_subreport(rec.subreport_id, rec.report_id):
raise ParentLoopError(_("Subreport loop detected"))
# TODO check subkpi compatibility in subreports

View file

@ -0,0 +1,33 @@
# Copyright 2016 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import traceback
from odoo.tools.safe_eval import _BUILTINS, _SAFE_OPCODES, test_expr
from .data_error import DataError, NameDataError
__all__ = ["mis_safe_eval"]
def mis_safe_eval(expr, locals_dict):
"""Evaluate an expression using safe_eval
Returns the evaluated value or DataError.
Raises NameError if the evaluation depends on a variable that is not
present in local_dict.
"""
try:
c = test_expr(expr, _SAFE_OPCODES, mode="eval")
globals_dict = {"__builtins__": _BUILTINS}
# pylint: disable=eval-used,eval-referenced
val = eval(c, globals_dict, locals_dict)
except NameError:
val = NameDataError("#NAME", traceback.format_exc())
except ZeroDivisionError:
# pylint: disable=redefined-variable-type
val = DataError("#DIV/0", traceback.format_exc())
except Exception:
val = DataError("#ERR", traceback.format_exc())
return val

View file

@ -0,0 +1,96 @@
# Copyright 2020 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import _, api, fields, models
from odoo.exceptions import UserError
from odoo.fields import Date
from .mis_kpi_data import intersect_days
class ProRataReadGroupMixin(models.AbstractModel):
_name = "prorata.read_group.mixin"
_description = "Adapt model with date_from/date_to for pro-rata temporis read_group"
date_from = fields.Date(required=True)
date_to = fields.Date(required=True)
date = fields.Date(
compute=lambda self: None,
search="_search_date",
help=(
"Dummy field that adapts searches on date "
"to searches on date_from/date_to."
),
)
def _search_date(self, operator, value):
if operator in (">=", ">"):
return [("date_to", operator, value)]
elif operator in ("<=", "<"):
return [("date_from", operator, value)]
raise UserError(
_("Unsupported operator %s for searching on date") % (operator,)
)
@api.model
def _intersect_days(self, item_dt_from, item_dt_to, dt_from, dt_to):
return intersect_days(item_dt_from, item_dt_to, dt_from, dt_to)
@api.model
def read_group(
self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True
):
"""Override read_group to perform pro-rata temporis adjustments.
When read_group is invoked with a domain that filters on
a time period (date >= from and date <= to, or
date_from <= to and date_to >= from), adjust the accumulated
values pro-rata temporis.
"""
date_from = None
date_to = None
assert isinstance(domain, list)
for domain_item in domain:
if isinstance(domain_item, list | tuple):
field, op, value = domain_item
if field == "date" and op == ">=":
date_from = value
elif field == "date_to" and op == ">=":
date_from = value
elif field == "date" and op == "<=":
date_to = value
elif field == "date_from" and op == "<=":
date_to = value
if (
date_from is not None
and date_to is not None
and not any(":" in f for f in fields)
):
dt_from = Date.from_string(date_from)
dt_to = Date.from_string(date_to)
res = {}
sum_fields = set(fields) - set(groupby)
read_fields = set(fields + ["date_from", "date_to"])
for item in self.search(domain).read(read_fields):
key = tuple(item[k] for k in groupby)
if key not in res:
res[key] = {k: item[k] for k in groupby}
res[key].update({k: 0.0 for k in sum_fields})
res_item = res[key]
for sum_field in sum_fields:
item_dt_from = Date.from_string(item["date_from"])
item_dt_to = Date.from_string(item["date_to"])
i_days, item_days = self._intersect_days(
item_dt_from, item_dt_to, dt_from, dt_to
)
res_item[sum_field] += item[sum_field] * i_days / item_days
return res.values()
return super().read_group(
domain,
fields,
groupby,
offset=offset,
limit=limit,
orderby=orderby,
lazy=lazy,
)

View file

@ -0,0 +1,184 @@
# Copyright 2014 ACSONE SA/NV (<http://acsone.eu>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
""" A trivial immutable array that supports basic arithmetic operations.
>>> a = SimpleArray((1.0, 2.0, 3.0))
>>> b = SimpleArray((4.0, 5.0, 6.0))
>>> t = (4.0, 5.0, 6.0)
>>> +a
SimpleArray((1.0, 2.0, 3.0))
>>> -a
SimpleArray((-1.0, -2.0, -3.0))
>>> a + b
SimpleArray((5.0, 7.0, 9.0))
>>> b + a
SimpleArray((5.0, 7.0, 9.0))
>>> a + t
SimpleArray((5.0, 7.0, 9.0))
>>> t + a
SimpleArray((5.0, 7.0, 9.0))
>>> a - b
SimpleArray((-3.0, -3.0, -3.0))
>>> a - t
SimpleArray((-3.0, -3.0, -3.0))
>>> t - a
SimpleArray((3.0, 3.0, 3.0))
>>> a * b
SimpleArray((4.0, 10.0, 18.0))
>>> b * a
SimpleArray((4.0, 10.0, 18.0))
>>> a * t
SimpleArray((4.0, 10.0, 18.0))
>>> t * a
SimpleArray((4.0, 10.0, 18.0))
>>> a / b
SimpleArray((0.25, 0.4, 0.5))
>>> b / a
SimpleArray((4.0, 2.5, 2.0))
>>> a / t
SimpleArray((0.25, 0.4, 0.5))
>>> t / a
SimpleArray((4.0, 2.5, 2.0))
>>> b / 2
SimpleArray((2.0, 2.5, 3.0))
>>> 2 * b
SimpleArray((8.0, 10.0, 12.0))
>>> 1 - b
SimpleArray((-3.0, -4.0, -5.0))
>>> b += 2 ; b
SimpleArray((6.0, 7.0, 8.0))
>>> a / ((1.0, 0.0, 1.0))
SimpleArray((1.0, DataError('#DIV/0'), 3.0))
>>> a / 0.0
SimpleArray((DataError('#DIV/0'), DataError('#DIV/0'), DataError('#DIV/0')))
>>> a * ((1.0, 'a', 1.0))
SimpleArray((1.0, DataError('#ERR'), 3.0))
>>> 6.0 / a
SimpleArray((6.0, 3.0, 2.0))
>>> Vector = named_simple_array('Vector', ('x', 'y'))
>>> p1 = Vector((1, 2))
>>> print(p1.x, p1.y, p1)
1 2 Vector((1, 2))
>>> p2 = Vector((2, 3))
>>> print(p2.x, p2.y, p2)
2 3 Vector((2, 3))
>>> p3 = p1 + p2
>>> print(p3.x, p3.y, p3)
3 5 Vector((3, 5))
>>> p4 = (4, 5) + p2
>>> print(p4.x, p4.y, p4)
6 8 Vector((6, 8))
>>> p1 * 2
Vector((2, 4))
>>> 2 * p1
Vector((2, 4))
>>> p1 - 1
Vector((0, 1))
>>> 1 - p1
Vector((0, -1))
>>> p1 / 2.0
Vector((0.5, 1.0))
>>> v = 2.0 / p1
>>> print(v.x, v.y, v)
2.0 1.0 Vector((2.0, 1.0))
"""
import itertools
import operator
import traceback
from .data_error import DataError
__all__ = ["SimpleArray", "named_simple_array"]
class SimpleArray(tuple):
def _op(self, op, other):
def _o2(x, y):
try:
return op(x, y)
except ZeroDivisionError:
return DataError("#DIV/0", traceback.format_exc())
except Exception:
return DataError("#ERR", traceback.format_exc())
if isinstance(other, tuple):
if len(other) != len(self):
raise TypeError("tuples must have same length for %s" % op)
return self.__class__(map(_o2, self, other))
else:
return self.__class__(_o2(z, other) for z in self)
def _cast(self, other):
if isinstance(other, self.__class__):
return other
elif isinstance(other, tuple):
return self.__class__(other)
else:
# other is a scalar
return self.__class__(itertools.repeat(other, len(self)))
def __add__(self, other):
return self._op(operator.add, other)
__radd__ = __add__
def __pos__(self):
return self.__class__(map(operator.pos, self))
def __neg__(self):
return self.__class__(map(operator.neg, self))
def __sub__(self, other):
return self._op(operator.sub, other)
def __rsub__(self, other):
return self._cast(other)._op(operator.sub, self)
def __mul__(self, other):
return self._op(operator.mul, other)
__rmul__ = __mul__
def __div__(self, other):
return self._op(operator.div, other)
def __floordiv__(self, other):
return self._op(operator.floordiv, other)
def __truediv__(self, other):
return self._op(operator.truediv, other)
def __rdiv__(self, other):
return self._cast(other)._op(operator.div, self)
def __rfloordiv__(self, other):
return self._cast(other)._op(operator.floordiv, self)
def __rtruediv__(self, other):
return self._cast(other)._op(operator.truediv, self)
def __repr__(self):
return f"{self.__class__.__name__}({tuple.__repr__(self)})"
def named_simple_array(typename, field_names):
"""Return a subclass of SimpleArray, with named properties.
This method is to SimpleArray what namedtuple is to tuple.
It's less sophisticated than namedtuple so some namedtuple
advanced use cases may not work, but it's good enough for
our needs in mis_builder, ie referring to subkpi values
by name.
"""
props = {
field_name: property(operator.itemgetter(i))
for i, field_name in enumerate(field_names)
}
return type(typename, (SimpleArray,), props)
if __name__ == "__main__": # pragma: no cover
import doctest
doctest.testmod()