vanilla 18.0

This commit is contained in:
Ernad Husremovic 2025-10-08 10:48:04 +02:00
parent 0a7ae8db93
commit 5454004ff9
1963 changed files with 1187893 additions and 919508 deletions

View file

@ -5,5 +5,9 @@ from . import ir_qweb_fields
from . import ir_http
from . import ir_model
from . import ir_ui_menu
from . import ir_ui_view
from . import models
from . import base_document_layout
from . import res_config_settings
from . import res_partner
from . import res_users

View file

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
import markupsafe
import os
from markupsafe import Markup
from math import ceil
from odoo import api, fields, models, tools
from odoo import api, fields, models
from odoo.addons.base.models.ir_qweb_fields import nl2br
from odoo.modules import get_resource_path
from odoo.tools import file_path, html2plaintext, is_html_empty
from odoo.tools import html2plaintext, is_html_empty, image as tools
from odoo.tools.misc import file_path
try:
import sass as libsass
@ -47,7 +47,7 @@ class BaseDocumentLayout(models.TransientModel):
if 'company_name' not in address_format:
address_format = '%(company_name)s\n' + address_format
company_data['company_name'] = company_data['company_name'] or company.name
return Markup(nl2br(address_format)) % company_data
return nl2br(address_format) % company_data
def _clean_address_format(self, address_format, company_data):
missing_company_data = [k for k, v in company_data.items() if not v]
@ -126,22 +126,29 @@ class BaseDocumentLayout(models.TransientModel):
for wizard in self:
if wizard.report_layout_id:
# guarantees that bin_size is always set to False,
# so the logo always contains the bin data instead of the binary size
if wizard.env.context.get('bin_size'):
wizard_with_logo = wizard.with_context(bin_size=False)
else:
wizard_with_logo = wizard
preview_css = markupsafe.Markup(self._get_css_for_preview(styles, wizard_with_logo.id))
ir_ui_view = wizard_with_logo.env['ir.ui.view']
wizard.preview = ir_ui_view._render_template('web.report_invoice_wizard_preview', {
'company': wizard_with_logo,
'preview_css': preview_css,
'is_html_empty': is_html_empty,
})
# guarantees that bin_size is always set to False,
# so the logo always contains the bin data instead of the binary size
wizard = wizard.with_context(bin_size=False)
wizard.preview = wizard.env['ir.ui.view']._render_template(
wizard._get_preview_template(),
wizard._get_render_information(styles),
)
else:
wizard.preview = False
def _get_preview_template(self):
    # Hook returning the xmlid of the QWeb template used to render the
    # document-layout preview; overridable by other modules to swap in a
    # different preview template.
    return 'web.report_invoice_wizard_preview'
def _get_render_information(self, styles):
    """Build the QWeb rendering context for the layout preview template.

    :param styles: style data forwarded to ``_get_css_for_preview`` to
        produce the inline CSS of the preview
    :return: dict of values consumed by the preview template
    """
    self.ensure_one()
    return {
        'company': self,
        'preview_css': self._get_css_for_preview(styles, self.id),
        'is_html_empty': is_html_empty,
    }
@api.onchange('company_id')
def _onchange_company_id(self):
for wizard in self:
@ -217,7 +224,7 @@ class BaseDocumentLayout(models.TransientModel):
return False, False
base_w, base_h = image.size
w = int(50 * base_w / base_h)
w = ceil(50 * base_w / base_h)
h = 50
# Converts to RGBA (if already RGBA, this is a noop)
@ -251,14 +258,6 @@ class BaseDocumentLayout(models.TransientModel):
return tools.rgb_to_hex(primary), tools.rgb_to_hex(secondary)
@api.model
def action_open_base_document_layout(self, action_ref=None):
if not action_ref:
action_ref = 'web.action_base_document_layout_configurator'
res = self.env["ir.actions.actions"]._for_xml_id(action_ref)
self.env[res["res_model"]].check_access_rights('write')
return res
def document_layout_save(self):
    # meant to be overridden
    # Default behavior: resume the report action the wizard was opened
    # from (if any), otherwise just close the wizard window.
    return self.env.context.get('report_action') or {'type': 'ir.actions.act_window_close'}
@ -306,10 +305,10 @@ class BaseDocumentLayout(models.TransientModel):
precision = 8
output_style = 'expanded'
bootstrap_path = get_resource_path('web', 'static', 'lib', 'bootstrap', 'scss')
bootstrap_path = file_path('web/static/lib/bootstrap/scss')
try:
return libsass.compile(
compiled_css = libsass.compile(
string=scss_source,
include_paths=[
bootstrap_path,
@ -318,6 +317,7 @@ class BaseDocumentLayout(models.TransientModel):
output_style=output_style,
precision=precision,
)
return Markup(compiled_css) if isinstance(compiled_css, Markup) else compiled_css
except libsass.CompileError as e:
raise libsass.CompileError(e.args[0])

View file

@ -2,17 +2,14 @@
import hashlib
import json
import logging
import odoo
from odoo import api, http, models
from odoo.http import request
from odoo.tools import file_open, image_process, ustr
from odoo import api, models, fields
from odoo.http import request, DEFAULT_MAX_CONTENT_LENGTH
from odoo.tools import ormcache, config
from odoo.tools.misc import str2bool
_logger = logging.getLogger(__name__)
"""
Debug mode is stored in session and should always be a string.
It can be activated with an URL query string `debug=<mode>` where mode
@ -42,6 +39,12 @@ class Http(models.AbstractModel):
# timeit has been done to check the optimum method
return any(bot in user_agent for bot in cls.bots)
@classmethod
def _sanitize_cookies(cls, cookies):
    # Normalize the `cids` (selected company ids) cookie in place:
    # legacy clients sent a comma-separated list, the current format is
    # dash-separated.
    super()._sanitize_cookies(cookies)
    if cids := cookies.get('cids'):
        cookies['cids'] = '-'.join(cids.split(','))
@classmethod
def _handle_debug(cls):
debug = request.httprequest.args.get('debug')
@ -58,6 +61,11 @@ class Http(models.AbstractModel):
super()._pre_dispatch(rule, args)
cls._handle_debug()
@classmethod
def _post_logout(cls):
    # Expire the `cids` cookie on logout (max_age=0 deletes it) so the
    # next session does not inherit the previous user's company selection.
    super()._post_logout()
    request.future_response.set_cookie('cids', max_age=0)
def webclient_rendering_context(self):
return {
'menu_data': request.env['ir.ui.menu'].load_menus(request.session.debug),
@ -79,24 +87,29 @@ class Http(models.AbstractModel):
IrConfigSudo = self.env['ir.config_parameter'].sudo()
max_file_upload_size = int(IrConfigSudo.get_param(
'web.max_file_upload_size',
default=128 * 1024 * 1024, # 128MiB
default=DEFAULT_MAX_CONTENT_LENGTH,
))
mods = odoo.conf.server_wide_modules or []
if request.db:
mods = list(request.registry._init_modules) + mods
is_internal_user = user._is_internal()
session_info = {
"uid": session_uid,
"is_system": user._is_system() if session_uid else False,
"is_admin": user._is_admin() if session_uid else False,
"is_public": user._is_public(),
"is_internal_user": is_internal_user,
"user_context": user_context,
"db": self.env.cr.dbname,
"user_settings": self.env['res.users.settings']._find_or_create_for_user(user)._res_users_settings_format(),
"server_version": version_info.get('server_version'),
"server_version_info": version_info.get('server_version_info'),
"support_url": "https://www.odoo.com/buy",
"name": user.name,
"username": user.login,
"quick_login": str2bool(IrConfigSudo.get_param('web.quick_login', default=True), True),
"partner_write_date": fields.Datetime.to_string(user.partner_id.write_date),
"partner_display_name": user.partner_id.display_name,
"company_id": user.company_id.id if session_uid else None, # YTI TODO: Remove this from the user context
"partner_id": user.partner_id.id if session_uid and user.partner_id else None,
"web.base.url": IrConfigSudo.get_param('web.base.url', default=''),
"active_ids_limit": int(IrConfigSudo.get_param('web.active_ids_limit', default='20000')),
@ -114,20 +127,25 @@ class Http(models.AbstractModel):
'bundle_params': {
'lang': request.session.context['lang'],
},
'test_mode': bool(config['test_enable'] or config['test_file']),
'view_info': self.env['ir.ui.view'].get_view_info(),
}
if request.session.debug:
session_info['bundle_params']['debug'] = request.session.debug
if self.env.user.has_group('base.group_user'):
if is_internal_user:
# the following is only useful in the context of a webclient bootstrapping
# but is still included in some other calls (e.g. '/web/session/authenticate')
# to avoid access errors and unnecessary information, it is only included for users
# with access to the backend ('internal'-type users)
menus = self.env['ir.ui.menu'].with_context(lang=request.session.context['lang']).load_menus(request.session.debug)
ordered_menus = {str(k): v for k, v in menus.items()}
menu_json_utf8 = json.dumps(ordered_menus, default=ustr, sort_keys=True).encode()
menu_json_utf8 = json.dumps(ordered_menus, sort_keys=True).encode()
session_info['cache_hashes'].update({
"load_menus": hashlib.sha512(menu_json_utf8).hexdigest()[:64], # sha512/256
})
# We need sudo since a user may not have access to ancestor companies
disallowed_ancestor_companies_sudo = user.company_ids.sudo().parent_ids - user.company_ids
all_companies_in_hierarchy_sudo = disallowed_ancestor_companies_sudo + user.company_ids
session_info.update({
# current_company should be default_company
"user_companies": {
@ -137,8 +155,19 @@ class Http(models.AbstractModel):
'id': comp.id,
'name': comp.name,
'sequence': comp.sequence,
'child_ids': (comp.child_ids & all_companies_in_hierarchy_sudo).ids,
'parent_id': comp.parent_id.id,
} for comp in user.company_ids
},
'disallowed_ancestor_companies': {
comp.id: {
'id': comp.id,
'name': comp.name,
'sequence': comp.sequence,
'child_ids': (comp.child_ids & all_companies_in_hierarchy_sudo).ids,
'parent_id': comp.parent_id.id,
} for comp in disallowed_ancestor_companies_sudo
},
},
"show_effect": True,
"display_switch_company_menu": user.has_group('base.group_multi_company') and len(user.company_ids) > 1,
@ -152,16 +181,21 @@ class Http(models.AbstractModel):
session_info = {
'is_admin': user._is_admin() if session_uid else False,
'is_system': user._is_system() if session_uid else False,
'is_public': user._is_public(),
"is_internal_user": user._is_internal(),
'is_website_user': user._is_public() if session_uid else False,
'user_id': user.id if session_uid else False,
'uid': session_uid,
'is_frontend': True,
'profile_session': request.session.profile_session,
'profile_collectors': request.session.profile_collectors,
'profile_params': request.session.profile_params,
'show_effect': bool(request.env['ir.config_parameter'].sudo().get_param('base_setup.show_effect')),
'currencies': self.get_currencies(),
'quick_login': str2bool(request.env['ir.config_parameter'].sudo().get_param('web.quick_login', default=True), True),
'bundle_params': {
'lang': request.session.context['lang'],
},
'test_mode': bool(config['test_enable'] or config['test_file']),
}
if request.session.debug:
session_info['bundle_params']['debug'] = request.session.debug
@ -173,7 +207,11 @@ class Http(models.AbstractModel):
})
return session_info
@ormcache()
def get_currencies(self):
Currency = self.env['res.currency']
currencies = Currency.search([]).read(['symbol', 'position', 'decimal_places'])
return {c['id']: {'symbol': c['symbol'], 'position': c['position'], 'digits': [69,c['decimal_places']]} for c in currencies}
currencies = Currency.search_fetch([], ['symbol', 'position', 'decimal_places'])
return {
c.id: {'symbol': c.symbol, 'position': c.position, 'digits': [69, c.decimal_places]}
for c in currencies
}

View file

@ -19,7 +19,7 @@ class IrModel(models.Model):
accessible_models = []
not_accessible_models = []
for model in models:
if self._check_model_access(model):
if self._is_valid_for_model_selector(model):
accessible_models.append(model)
else:
not_accessible_models.append({"display_name": model, "model": model})
@ -34,9 +34,15 @@ class IrModel(models.Model):
} for model in records]
@api.model
def _check_model_access(self, model):
return (self.env.user._is_internal() and model in self.env
and self.env[model].check_access_rights("read", raise_exception=False))
def _is_valid_for_model_selector(self, model):
    """Return whether ``model`` (a model name) may be shown in the model selector.

    A model qualifies when the current user is an internal user, the model
    exists in the registry, the user has read access on it, and it is a
    regular persisted model (neither transient nor abstract).
    """
    model = self.env.get(model)
    return (
        self.env.user._is_internal()
        and model is not None
        and model.has_access("read")
        and not model._transient
        and not model._abstract
    )
@api.model
def get_available_models(self):
@ -44,5 +50,48 @@ class IrModel(models.Model):
Return the list of models the current user has access to, with their
corresponding display name.
"""
accessible_models = [model for model in self.pool.keys() if self._check_model_access(model)]
accessible_models = [model for model in self.pool if self._is_valid_for_model_selector(model)]
return self._display_name_for(accessible_models)
def _get_definitions(self, model_names):
    """Return field/metadata definitions for each model in ``model_names``.

    :param model_names: iterable of model names; relational and related
        fields are only kept when their comodel (or the root of their
        ``related`` path) is itself part of ``model_names``
    :return: dict mapping each model name to a dict with its description,
        kept fields, inherited models, order, parent_name and rec_name
    """
    model_definitions = {}
    for model_name in model_names:
        model = self.env[model_name]
        # get fields, relational fields are kept only if the related model is in model_names
        fields_data_by_fname = {
            fname: field_data
            for fname, field_data in model.fields_get(
                attributes={
                    'definition_record_field', 'definition_record', 'aggregator',
                    'name', 'readonly', 'related', 'relation', 'required', 'searchable',
                    'selection', 'sortable', 'store', 'string', 'tracking', 'type',
                },
            ).items()
            if field_data.get('selectable', True) and (
                not field_data.get('relation') or field_data['relation'] in model_names
            )
        }
        # second pass: drop related fields whose source field was filtered out above
        fields_data_by_fname = {
            fname: field_data
            for fname, field_data in fields_data_by_fname.items()
            if not field_data.get('related') or field_data['related'].split('.')[0] in fields_data_by_fname
        }
        for fname, field_data in fields_data_by_fname.items():
            if fname in model._fields:
                # expose inverse field names, restricted to models in model_names
                inverse_fields = [
                    field for field in model.pool.field_inverses[model._fields[fname]]
                    if field.model_name in model_names
                ]
                if inverse_fields:
                    field_data['inverse_fname_by_model_name'] = {field.model_name: field.name for field in inverse_fields}
                if field_data['type'] == 'many2one_reference':
                    # name of the companion field holding the target model name
                    field_data['model_name_ref_fname'] = model._fields[fname].model_field
        model_definitions[model_name] = {
            'description': model._description,
            'fields': fields_data_by_fname,
            'inherit': [model_name for model_name in model._inherit_module if model_name in model_names],
            'order': model._order,
            'parent_name': model._parent_name,
            'rec_name': model._rec_name,
        }
    return model_definitions

View file

@ -6,8 +6,7 @@ from collections import OrderedDict
from werkzeug.urls import url_quote
from markupsafe import Markup
from odoo import api, models
from odoo.tools import pycompat
from odoo import api, models, fields
from odoo.tools import html_escape as escape
@ -36,7 +35,7 @@ class Image(models.AbstractModel):
if max_width or max_height:
max_size = '%sx%s' % (max_width, max_height)
sha = hashlib.sha512(str(getattr(record, '__last_update')).encode('utf-8')).hexdigest()[:7]
sha = hashlib.sha512(str(getattr(record, 'write_date', fields.Datetime.now())).encode('utf-8')).hexdigest()[:7]
max_size = '' if max_size is None else '/%s' % max_size
if options.get('filename-field') and options['filename-field'] in record and record[options['filename-field']]:
@ -106,9 +105,9 @@ class Image(models.AbstractModel):
for name, value in atts.items():
if value:
img.append(' ')
img.append(escape(pycompat.to_text(name)))
img.append(escape(name))
img.append('="')
img.append(escape(pycompat.to_text(value)))
img.append(escape(value))
img.append('"')
img.append('/>')

View file

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import re
from odoo import models
@ -30,12 +32,16 @@ class IrUiMenu(models.Model):
"xmlid": "",
"actionID": False,
"actionModel": False,
"actionPath": False,
"webIcon": None,
"webIconData": None,
"webIconDataMimetype": None,
"backgroundImage": menu.get('backgroundImage'),
}
else:
action = menu['action']
web_icon = menu['web_icon']
web_icon_data = menu['web_icon_data']
if menu['id'] == menu['app_id']:
# if it's an app take action of first (sub)child having one defined
@ -44,8 +50,28 @@ class IrUiMenu(models.Model):
action = child['action']
child = menus[child['children'][0]] if child['children'] else False
webIcon = menu.get('web_icon', '')
webIconlist = webIcon and webIcon.split(',')
iconClass = color = backgroundColor = None
if webIconlist:
if len(webIconlist) >= 2:
iconClass, color = webIconlist[:2]
if len(webIconlist) == 3:
backgroundColor = webIconlist[2]
if menu.get('web_icon_data'):
web_icon_data = re.sub(r'\s/g', "", ('data:%s;base64,%s' % (menu['web_icon_data_mimetype'], menu['web_icon_data'])))
elif backgroundColor is not None: # Could split in three parts?
web_icon = ",".join([iconClass or "", color or "", backgroundColor])
else:
web_icon_data = '/web/static/img/default_icon_app.png'
action_model, action_id = action.split(',') if action else (False, False)
action_id = int(action_id) if action_id else False
if action_model and action_id:
action_path = self.env[action_model].browse(action_id).sudo().path
else:
action_path = False
web_menus[menu['id']] = {
"id": menu['id'],
@ -55,8 +81,10 @@ class IrUiMenu(models.Model):
"xmlid": menu['xmlid'],
"actionID": action_id,
"actionModel": action_model,
"webIcon": menu['web_icon'],
"webIconData": menu['web_icon_data'],
"actionPath": action_path,
"webIcon": web_icon,
"webIconData": web_icon_data,
"webIconDataMimetype": menu['web_icon_data_mimetype'],
}
return web_menus

View file

@ -1,17 +1,20 @@
# -*- coding: utf-8 -*-
import babel.dates
import pytz
from lxml import etree
from __future__ import annotations
import base64
import itertools
import json
from odoo import _, _lt, api, fields, models
from odoo import api, models
from odoo.fields import Command
from odoo.models import BaseModel, NewId
from odoo.osv.expression import AND, TRUE_DOMAIN, normalize_domain
from odoo.tools import date_utils, lazy, OrderedSet
from odoo.tools.misc import get_lang
from odoo.exceptions import UserError
from odoo.tools import unique, OrderedSet
from odoo.exceptions import AccessError, UserError
from collections import defaultdict
from odoo.tools.translate import LazyTranslate
_lt = LazyTranslate(__name__)
SEARCH_PANEL_ERROR_MESSAGE = _lt("Too many items to display.")
def is_true_domain(domain):
@ -33,37 +36,21 @@ DISPLAY_DATE_FORMATS = {
}
class IrActionsActWindowView(models.Model):
_inherit = 'ir.actions.act_window.view'
view_mode = fields.Selection(selection_add=[
('qweb', 'QWeb')
], ondelete={'qweb': 'cascade'})
class Base(models.AbstractModel):
_inherit = 'base'
@api.model
def web_search_read(self, domain=None, fields=None, offset=0, limit=None, order=None, count_limit=None):
"""
Performs a search_read and a search_count.
@api.readonly
def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None):
    # Search the records matching ``domain`` (fetching only the fields named
    # in ``specification``), format them with web_read, and wrap the result
    # with the total length information expected by the web client.
    records = self.search_fetch(domain, specification.keys(), offset=offset, limit=limit, order=order)
    values_records = records.web_read(specification)
    return self._format_web_search_read_results(domain, values_records, offset, limit, count_limit)
:param domain: search domain
:param fields: list of fields to read
:param limit: maximum number of records to read
:param offset: number of records to skip
:param order: columns to sort results
:return: {
'records': array of read records (result of a call to 'search_read')
'length': number of records matching the domain (result of a call to 'search_count')
}
"""
records = self.search_read(domain, fields, offset=offset, limit=limit, order=order)
def _format_web_search_read_results(self, domain, records, offset=0, limit=None, count_limit=None):
if not records:
return {
'length': 0,
'records': []
'records': [],
}
current_length = len(records) + offset
limit_reached = len(records) == limit
@ -75,15 +62,174 @@ class Base(models.AbstractModel):
length = current_length
return {
'length': length,
'records': records
'records': records,
}
def web_save(self, vals, specification: dict[str, dict], next_id=None) -> list[dict]:
    """Write ``vals`` on ``self`` (or create a record when ``self`` is empty)
    and return the saved record formatted with ``web_read``.

    :param vals: field values to write/create
    :param specification: web_read specification used to format the result
    :param next_id: optional id of another record to read back instead of
        the saved one
    :return: list with one dict of formatted record values
    """
    if self:
        self.write(vals)
    else:
        self = self.create(vals)
    if next_id:
        self = self.browse(next_id)
    # bin_size=True: binary fields come back as their size, not their
    # payload, keeping the response small.
    return self.with_context(bin_size=True).web_read(specification)
@api.readonly
def web_read(self, specification: dict[str, dict]) -> list[dict]:
fields_to_read = list(specification) or ['id']
if fields_to_read == ['id']:
# if we request to read only the ids, we have them already so we can build the return dictionaries immediately
# this also avoid a call to read on the co-model that might have different access rules
values_list = [{'id': id_} for id_ in self._ids]
else:
values_list: list[dict] = self.read(fields_to_read, load=None)
if not values_list:
return values_list
def cleanup(vals: dict) -> dict:
""" Fixup vals['id'] of a new record. """
if not vals['id']:
vals['id'] = vals['id'].origin or False
return vals
for field_name, field_spec in specification.items():
field = self._fields.get(field_name)
if field is None:
continue
if field.type == 'many2one':
if 'fields' not in field_spec:
for values in values_list:
if isinstance(values[field_name], NewId):
values[field_name] = values[field_name].origin
continue
co_records = self[field_name]
if 'context' in field_spec:
co_records = co_records.with_context(**field_spec['context'])
extra_fields = dict(field_spec['fields'])
extra_fields.pop('display_name', None)
many2one_data = {
vals['id']: cleanup(vals)
for vals in co_records.web_read(extra_fields)
}
if 'display_name' in field_spec['fields']:
for rec in co_records.sudo():
many2one_data[rec.id]['display_name'] = rec.display_name
for values in values_list:
if values[field_name] is False:
continue
vals = many2one_data[values[field_name]]
values[field_name] = vals['id'] and vals
elif field.type in ('one2many', 'many2many'):
if not field_spec:
continue
co_records = self[field_name]
if 'order' in field_spec and field_spec['order']:
co_records = co_records.with_context(active_test=False).search(
[('id', 'in', co_records.ids)], order=field_spec['order'],
).with_context(co_records.env.context) # Reapply previous context
order_key = {
co_record.id: index
for index, co_record in enumerate(co_records)
}
for values in values_list:
# filter out inaccessible corecords in case of "cache pollution"
values[field_name] = [id_ for id_ in values[field_name] if id_ in order_key]
values[field_name] = sorted(values[field_name], key=order_key.__getitem__)
if 'context' in field_spec:
co_records = co_records.with_context(**field_spec['context'])
if 'fields' in field_spec:
if field_spec.get('limit') is not None:
limit = field_spec['limit']
ids_to_read = OrderedSet(
id_
for values in values_list
for id_ in values[field_name][:limit]
)
co_records = co_records.browse(ids_to_read)
x2many_data = {
vals['id']: vals
for vals in co_records.web_read(field_spec['fields'])
}
for values in values_list:
values[field_name] = [x2many_data.get(id_) or {'id': id_} for id_ in values[field_name]]
elif field.type in ('reference', 'many2one_reference'):
if not field_spec:
continue
values_by_id = {
vals['id']: vals
for vals in values_list
}
for record in self:
if not record[field_name]:
continue
record_values = values_by_id[record.id]
if field.type == 'reference':
co_record = record[field_name]
else: # field.type == 'many2one_reference'
if not record[field.model_field]:
record_values[field_name] = False
continue
co_record = self.env[record[field.model_field]].browse(record[field_name])
if 'context' in field_spec:
co_record = co_record.with_context(**field_spec['context'])
if 'fields' in field_spec:
try:
reference_read = co_record.web_read(field_spec['fields'])
except AccessError:
reference_read = [{'id': co_record.id, 'display_name': self.env._("You don't have access to this record")}]
if any(fname != 'id' for fname in field_spec['fields']):
# we can infer that if we can read fields for the co-record, it exists
co_record_exists = bool(reference_read)
else:
co_record_exists = co_record.exists()
else:
# If there are no fields to read (field_spec.get('fields') --> None) and we web_read ids, it will
# not actually read the records so we do not know if they exist.
# This ensures the record actually exists
co_record_exists = co_record.exists()
if not co_record_exists:
record_values[field_name] = False
if field.type == 'many2one_reference':
record_values[field.model_field] = False
continue
if 'fields' in field_spec:
record_values[field_name] = reference_read[0]
if field.type == 'reference':
record_values[field_name]['id'] = {
'id': co_record.id,
'model': co_record._name
}
return values_list
@api.model
def web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False,
lazy=True, expand=False, expand_limit=None, expand_orderby=False):
@api.readonly
def web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False, lazy=True):
"""
Returns the result of a read_group (and optionally search for and read records inside each
group), and the total number of groups matching the search domain.
Returns the result of a read_group and the total number of groups matching the search domain.
:param domain: search domain
:param fields: list of fields to read (see ``fields``` param of ``read_group``)
@ -92,29 +238,23 @@ class Base(models.AbstractModel):
:param offset: see ``offset`` param of ``read_group``
:param orderby: see ``orderby`` param of ``read_group``
:param lazy: see ``lazy`` param of ``read_group``
:param expand: if true, and groupby only contains one field, read records inside each group
:param expand_limit: maximum number of records to read in each group
:param expand_orderby: order to apply when reading records in each group
:return: {
'groups': array of read groups
'length': total number of groups
}
"""
groups = self._web_read_group(domain, fields, groupby, limit, offset, orderby, lazy, expand,
expand_limit, expand_orderby)
groups = self._web_read_group(domain, fields, groupby, limit, offset, orderby, lazy)
if not groups:
length = 0
elif limit and len(groups) == limit:
# We need to fetch all groups to know the total number
# this cannot be done all at once to avoid MemoryError
length = limit
chunk_size = 100000
while True:
more = len(self.read_group(domain, ['display_name'], groupby, offset=length, limit=chunk_size, lazy=True))
length += more
if more < chunk_size:
break
annotated_groupby = self._read_group_get_annotated_groupby(groupby, lazy=lazy)
length = limit + len(self._read_group(
domain,
groupby=annotated_groupby.values(),
offset=limit,
))
else:
length = len(groups) + offset
return {
@ -123,26 +263,18 @@ class Base(models.AbstractModel):
}
@api.model
def _web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False,
lazy=True, expand=False, expand_limit=None, expand_orderby=False):
def _web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False, lazy=True):
"""
Performs a read_group and optionally a web_search_read for each group.
See ``web_read_group`` for params description.
:returns: array of groups
"""
groups = self.read_group(domain, fields, groupby, offset=offset, limit=limit,
orderby=orderby, lazy=lazy)
if expand and len(groupby) == 1:
for group in groups:
group['__data'] = self.web_search_read(domain=group['__domain'], fields=fields,
offset=0, limit=expand_limit,
order=expand_orderby)
return groups
@api.model
@api.readonly
def read_progress_bar(self, domain, group_by, progress_bar):
"""
Gets the data needed for all the kanban column progressbars.
@ -156,20 +288,13 @@ class Base(models.AbstractModel):
:return a dictionnary mapping group_by values to dictionnaries mapping
progress bar field values to the related number of records
"""
group_by_fname = group_by.partition(':')[0]
field_type = self._fields[group_by_fname].type
if field_type == 'selection':
selection_labels = dict(self.fields_get()[group_by]['selection'])
def adapt(value):
if field_type == 'selection':
value = selection_labels.get(value, False)
if isinstance(value, tuple):
value = value[1] # FIXME should use technical value (0)
value = value[0]
return value
result = {}
for group in self._read_progress_bar(domain, group_by, progress_bar):
for group in self.read_group(domain, ['__count'], [group_by, progress_bar['field']], lazy=False):
group_by_value = str(adapt(group[group_by]))
field_value = group[progress_bar['field']]
if group_by_value not in result:
@ -178,79 +303,6 @@ class Base(models.AbstractModel):
result[group_by_value][field_value] += group['__count']
return result
def _read_progress_bar(self, domain, group_by, progress_bar):
""" Implementation of read_progress_bar() that returns results in the
format of read_group().
"""
try:
fname = progress_bar['field']
return self.read_group(domain, [fname], [group_by, fname], lazy=False)
except UserError:
# possibly failed because of grouping on or aggregating non-stored
# field; fallback on alternative implementation
pass
# Workaround to match read_group's infrastructure
# TO DO in master: harmonize this function and readgroup to allow factorization
group_by_name = group_by.partition(':')[0]
group_by_modifier = group_by.partition(':')[2] or 'month'
records_values = self.search_read(domain or [], [progress_bar['field'], group_by_name])
field_type = self._fields[group_by_name].type
for record_values in records_values:
group_by_value = record_values.pop(group_by_name)
# Again, imitating what _read_group_format_result and _read_group_prepare_data do
if group_by_value and field_type in ['date', 'datetime']:
locale = get_lang(self.env).code
group_by_value = date_utils.start_of(fields.Datetime.to_datetime(group_by_value), group_by_modifier)
group_by_value = pytz.timezone('UTC').localize(group_by_value)
tz_info = None
if field_type == 'datetime' and self._context.get('tz') in pytz.all_timezones:
tz_info = self._context.get('tz')
group_by_value = babel.dates.format_datetime(
group_by_value, format=DISPLAY_DATE_FORMATS[group_by_modifier],
tzinfo=tz_info, locale=locale)
else:
group_by_value = babel.dates.format_date(
group_by_value, format=DISPLAY_DATE_FORMATS[group_by_modifier],
locale=locale)
if field_type == 'many2many' and isinstance(group_by_value, list):
group_by_value = str(tuple(group_by_value)) or False
record_values[group_by] = group_by_value
record_values['__count'] = 1
return records_values
##### qweb view hooks #####
@api.model
def qweb_render_view(self, view_id, domain):
assert view_id
return self.env['ir.qweb']._render(
view_id,
{
'model': self,
'domain': domain,
# not necessarily necessary as env is already part of the
# non-minimal qcontext
'context': self.env.context,
'records': lazy(self.search, domain),
})
@api.model
def _get_view(self, view_id=None, view_type='form', **options):
arch, view = super()._get_view(view_id, view_type, **options)
# avoid leaking the raw (un-rendered) template, also avoids bloating
# the response payload for no reason. Only send the root node,
# to send attributes such as `js_class`.
if view_type == 'qweb':
root = arch
arch = etree.Element('qweb', root.attrib)
return arch, view
@api.model
def _search_panel_field_image(self, field_name, **kwargs):
"""
@ -502,7 +554,7 @@ class Base(models.AbstractModel):
supported_types = ['many2one', 'selection']
if field.type not in supported_types:
types = dict(self.env["ir.model.fields"]._fields["ttype"]._description_selection(self.env))
raise UserError(_(
raise UserError(self.env._(
'Only types %(supported_types)s are supported for category (found type %(field_type)s)',
supported_types=", ".join(types[t] for t in supported_types),
field_type=types[field.type],
@ -631,8 +683,9 @@ class Base(models.AbstractModel):
field = self._fields[field_name]
supported_types = ['many2one', 'many2many', 'selection']
if field.type not in supported_types:
raise UserError(_('Only types %(supported_types)s are supported for filter (found type %(field_type)s)',
supported_types=supported_types, field_type=field.type))
raise UserError(self.env._(
'Only types %(supported_types)s are supported for filter (found type %(field_type)s)',
supported_types=supported_types, field_type=field.type))
model_domain = kwargs.get('search_domain', [])
extra_domain = AND([
@ -658,19 +711,19 @@ class Base(models.AbstractModel):
if group_by_field.type == 'many2one':
def group_id_name(value):
return value or (False, _("Not Set"))
return value or (False, self.env._("Not Set"))
elif group_by_field.type == 'selection':
desc = Comodel.fields_get([group_by])[group_by]
group_by_selection = dict(desc['selection'])
group_by_selection[False] = _("Not Set")
group_by_selection[False] = self.env._("Not Set")
def group_id_name(value):
return value, group_by_selection[value]
else:
def group_id_name(value):
return (value, value) if value else (False, _("Not Set"))
return (value, value) if value else (False, self.env._("Not Set"))
comodel_domain = kwargs.get('comodel_domain', [])
enable_counters = kwargs.get('enable_counters')
@@ -678,16 +731,8 @@ class Base(models.AbstractModel):
if field.type == 'many2many':
if not expand:
if field.base_field.groupable:
domain_image = self._search_panel_domain_image(field_name, model_domain, limit=limit)
image_element_ids = list(domain_image.keys())
else:
model_records = self.search_read(model_domain, [field_name])
image_element_ids = OrderedSet()
for rec in model_records:
if rec[field_name]:
image_element_ids.update(rec[field_name])
image_element_ids = list(image_element_ids)
domain_image = self._search_panel_domain_image(field_name, model_domain, limit=limit)
image_element_ids = list(domain_image.keys())
comodel_domain = AND([
comodel_domain,
[('id', 'in', image_element_ids)],
@@ -779,6 +824,236 @@ class Base(models.AbstractModel):
return { 'values': field_range, }
def onchange(self, values: dict, field_names: list[str], fields_spec: dict) -> dict:
    """
    Perform an onchange on the given fields, and return the result.

    :param values: dictionary mapping field names to values on the form view,
        giving the current state of modification
    :param field_names: names of the modified fields
    :param fields_spec: dictionary specifying the fields in the view,
        just like the one used by :meth:`web_read`; it is used to format
        the resulting values

    When creating a record from scratch, the client should call this with an
    empty list as ``field_names``. In that case, the method first adds
    default values to ``values``, computes the remaining fields, applies
    onchange methods to them, and return all the fields in ``fields_spec``.

    The result is a dictionary with two optional keys. The key ``"value"``
    is used to return field values that should be modified on the caller.
    The corresponding value is a dict mapping field names to their value,
    in the format of :meth:`web_read`, except for x2many fields, where the
    value is a list of commands to be applied on the caller's field value.

    The key ``"warning"`` provides a warning message to the caller. The
    corresponding value is a dictionary like::

        {
            "title": "Be careful!", # subject of message
            "message": "Blah blah blah.", # full warning message
            "type": "dialog", # how to display the warning
        }
    """
    # this is for tests using `Form`
    self.env.flush_all()

    env = self.env
    cache = env.cache
    # an empty `field_names` means "initialize a brand new record"
    first_call = not field_names

    # unknown field names: nothing sensible can be computed, bail out
    if any(fname not in self._fields for fname in field_names):
        return {}

    if first_call:
        field_names = [fname for fname in values if fname != 'id']
        missing_names = [fname for fname in fields_spec if fname not in values]
        defaults = self.default_get(missing_names)
        for field_name in missing_names:
            values[field_name] = defaults.get(field_name, False)
            if field_name in defaults:
                field_names.append(field_name)

    # prefetch x2many lines: this speeds up the initial snapshot by avoiding
    # computing fields on new records as much as possible, as that can be
    # costly and is not necessary at all
    self.fetch(fields_spec.keys())
    for field_name, field_spec in fields_spec.items():
        field = self._fields[field_name]
        if field.type not in ('one2many', 'many2many'):
            continue
        sub_fields_spec = field_spec.get('fields') or {}
        if sub_fields_spec and values.get(field_name):
            # retrieve all line ids in commands
            line_ids = OrderedSet(self[field_name].ids)
            for cmd in values[field_name]:
                if cmd[0] in (Command.UPDATE, Command.LINK):
                    line_ids.add(cmd[1])
                elif cmd[0] == Command.SET:
                    line_ids.update(cmd[2])
            # prefetch stored fields on lines
            lines = self[field_name].browse(line_ids)
            lines.fetch(sub_fields_spec.keys())
            # copy the cache of lines to their corresponding new records;
            # this avoids computing computed stored fields on new_lines
            new_lines = lines.browse(map(NewId, line_ids))
            # NOTE: this inner loop deliberately reuses the names
            # `field_name`/`field`; the outer loop reassigns them on its
            # next iteration, so no state leaks between iterations
            for field_name in sub_fields_spec:
                field = lines._fields[field_name]
                line_values = [
                    field.convert_to_cache(line[field_name], new_line, validate=False)
                    for new_line, line in zip(new_lines, lines)
                ]
                cache.update(new_lines, field, line_values)

    # Isolate changed values, to handle inconsistent data sent from the
    # client side: when a form view contains two one2many fields that
    # overlap, the lines that appear in both fields may be sent with
    # different data. Consider, for instance:
    #
    #   foo_ids: [line with value=1, ...]
    #   bar_ids: [line with value=1, ...]
    #
    # If value=2 is set on 'line' in 'bar_ids', the client sends
    #
    #   foo_ids: [line with value=1, ...]
    #   bar_ids: [line with value=2, ...]
    #
    # The idea is to put 'foo_ids' in cache first, so that the snapshot
    # contains value=1 for line in 'foo_ids'. The snapshot is then updated
    # with the value of `bar_ids`, which will contain value=2 on line.
    #
    # The issue also occurs with other fields. For instance, an onchange on
    # a move line has a value for the field 'move_id' that contains the
    # values of the move, among which the one2many that contains the line
    # itself, with old values!
    #
    initial_values = dict(values)
    changed_values = {fname: initial_values.pop(fname) for fname in field_names}
    # do not force delegate fields to False
    for parent_name in self._inherits.values():
        if not initial_values.get(parent_name, True):
            initial_values.pop(parent_name)

    # create a new record with initial values
    if self:
        # fill in the cache of record with the values of self
        cache_values = {fname: self[fname] for fname in fields_spec}
        record = self.new(cache_values, origin=self)
        # apply initial values on top of the values of self
        record._update_cache(initial_values)
    else:
        # set changed values to null in initial_values; not setting them
        # triggers default_get() on the new record when creating snapshot0
        initial_values.update(dict.fromkeys(field_names, False))
        record = self.new(initial_values, origin=self)

    # make parent records match with the form values; this ensures that
    # computed fields on parent records have all their dependencies at
    # their expected value
    for field_name in initial_values:
        field = self._fields.get(field_name)
        if field and field.inherited:
            parent_name, field_name = field.related.split('.', 1)
            if parent := record[parent_name]:
                parent._update_cache({field_name: record[field_name]})

    # make a snapshot based on the initial values of record
    snapshot0 = RecordSnapshot(record, fields_spec, fetch=(not first_call))

    # store changed values in cache; also trigger recomputations based on
    # subfields (e.g., line.a has been modified, line.b is computed stored
    # and depends on line.a, but line.b is not in the form view)
    record._update_cache(changed_values)

    # update snapshot0 with changed values
    for field_name in field_names:
        snapshot0.fetch(field_name)

    # Determine which field(s) should be triggered an onchange. On the first
    # call, 'names' only contains fields with a default. If 'self' is a new
    # line in a one2many field, 'names' also contains the one2many's inverse
    # field, and that field may not be in nametree.
    todo = list(unique(itertools.chain(field_names, fields_spec))) if first_call else list(field_names)
    done = set()

    # mark fields to do as modified to trigger recomputations
    protected = [
        field
        for mod_field in [self._fields[fname] for fname in field_names]
        for field in self.pool.field_computed.get(mod_field) or [mod_field]
    ]
    with self.env.protecting(protected, record):
        record.modified(todo)
        for field_name in todo:
            field = self._fields[field_name]
            if field.inherited:
                # modifying an inherited field should modify the parent
                # record accordingly; because we don't actually assign the
                # modified field on the record, the modification on the
                # parent record has to be done explicitly
                parent = record[field.related.split('.')[0]]
                parent[field_name] = record[field_name]

    result = {'warnings': OrderedSet()}

    # process names in order
    while todo:
        # apply field-specific onchange methods
        for field_name in todo:
            record._apply_onchange_methods(field_name, result)
            done.add(field_name)

        if not env.context.get('recursive_onchanges', True):
            break

        # determine which fields to process for the next pass
        todo = [
            field_name
            for field_name in fields_spec
            if field_name not in done and snapshot0.has_changed(field_name)
        ]

    # make the snapshot with the final values of record
    snapshot1 = RecordSnapshot(record, fields_spec)

    # determine values that have changed by comparing snapshots
    result['value'] = snapshot1.diff(snapshot0, force=first_call)

    # format warnings
    warnings = result.pop('warnings')
    if len(warnings) == 1:
        title, message, type_ = warnings.pop()
        if not type_:
            type_ = 'dialog'
        result['warning'] = dict(title=title, message=message, type=type_)
    elif len(warnings) > 1:
        # concatenate warning titles and messages
        title = self.env._("Warnings")
        message = '\n\n'.join([warn_title + '\n\n' + warn_message for warn_title, warn_message, warn_type in warnings])
        result['warning'] = dict(title=title, message=message, type='dialog')

    return result
def web_override_translations(self, values):
    """
    Override every stored translation of the given translated fields with a
    single value per field.

    For each field in ``values`` whose ``translate`` attribute is exactly
    ``True``, all installed languages are reset to ``False`` and both
    ``en_US`` and the environment's active language are set to the provided
    value.

    :param values: dictionary of the translations to apply for each field name
                   ex: { "field_name": "new_value" }
    """
    self.ensure_one()
    for field_name, new_value in values.items():
        if self._fields[field_name].translate is not True:
            # skip non-translated fields and callable-translate fields
            continue
        translations = dict.fromkeys(
            (code for code, _name in self.env['res.lang'].get_installed()),
            False,
        )
        translations['en_US'] = new_value
        translations[self.env.lang or 'en_US'] = new_value
        self.update_field_translations(field_name, translations)
class ResCompany(models.Model):
_inherit = 'res.company'
@@ -815,3 +1090,114 @@ class ResCompany(models.Model):
b64_val = self._get_asset_style_b64()
if b64_val != asset_attachment.datas:
asset_attachment.write({'datas': b64_val})
class RecordSnapshot(dict):
    """ A dict with the values of a record, following a prefix tree.

    Keys are field names from ``fields_spec``; for x2many fields the value
    is itself a dict mapping line ids to nested ``RecordSnapshot`` objects.
    Two snapshots of the same record taken before/after an onchange can be
    compared with :meth:`diff` to produce the values to send back to the
    client.
    """

    # avoid a per-instance __dict__: record/fields_spec are the only attributes
    __slots__ = ['record', 'fields_spec']

    def __init__(self, record: BaseModel, fields_spec: dict, fetch=True):
        # put record in dict to include it when comparing snapshots
        super().__init__()
        self.record = record
        self.fields_spec = fields_spec
        if fetch:
            for name in fields_spec:
                self.fetch(name)

    def __eq__(self, other: 'RecordSnapshot'):
        # equal only if both wrap the same record AND hold equal field values
        return self.record == other.record and super().__eq__(other)

    def fetch(self, field_name):
        """ Set the value of field ``name`` from the record's value. """
        if self.record._fields[field_name].type in ('one2many', 'many2many'):
            # x2many fields are serialized as a dict of line snapshots
            lines = self.record[field_name]
            if 'context' in self.fields_spec[field_name]:
                lines = lines.with_context(**self.fields_spec[field_name]['context'])
            sub_fields_spec = self.fields_spec[field_name].get('fields') or {}
            self[field_name] = {line.id: RecordSnapshot(line, sub_fields_spec) for line in lines}
        else:
            self[field_name] = self.record[field_name]

    def has_changed(self, field_name) -> bool:
        """ Return whether a field on the record has changed.

        A field that was never fetched into the snapshot is considered
        changed. For x2many fields, a change in line membership or in any
        sub-field of any line counts as a change.
        """
        if field_name not in self:
            return True
        if self.record._fields[field_name].type not in ('one2many', 'many2many'):
            return self[field_name] != self.record[field_name]
        return self[field_name].keys() != set(self.record[field_name]._ids) or any(
            line_snapshot.has_changed(subname)
            for line_snapshot in self[field_name].values()
            for subname in self.fields_spec[field_name].get('fields') or {}
        )

    def diff(self, other: 'RecordSnapshot', force=False):
        """ Return the values in ``self`` that differ from ``other``.

        Simple fields are formatted through :meth:`web_read`; x2many fields
        are serialized as a list of ``Command`` tuples (delete/unlink,
        update, create, link) describing how to turn ``other``'s lines into
        ``self``'s lines. With ``force=True``, all fields are returned
        regardless of ``other``.
        """
        # determine fields to return
        simple_fields_spec = {}
        x2many_fields_spec = {}
        for field_name, field_spec in self.fields_spec.items():
            if field_name == 'id':
                continue
            if not force and other.get(field_name) == self[field_name]:
                continue
            field = self.record._fields[field_name]
            if field.type in ('one2many', 'many2many'):
                x2many_fields_spec[field_name] = field_spec
            else:
                simple_fields_spec[field_name] = field_spec

        # use web_read() for simple fields
        [result] = self.record.web_read(simple_fields_spec)

        # discard the NewId from the dict
        result.pop('id')

        # for x2many fields: serialize value as commands
        for field_name, field_spec in x2many_fields_spec.items():
            commands = []

            self_value = self[field_name]
            other_value = {} if force else other.get(field_name) or {}
            # NOTE(review): `any(other_value)` tests the truthiness of the
            # line ids — presumably it detects snapshots of *real* records
            # (NewId appears to be falsy) so their ids can be adapted below;
            # confirm against the NewId implementation
            if any(other_value):
                # other may be a snapshot for a real record, adapt its x2many ids
                other_value = {NewId(id_): snap for id_, snap in other_value.items()}

            # commands for removed lines
            field = self.record._fields[field_name]
            remove = Command.delete if field.type == 'one2many' else Command.unlink
            for id_ in other_value:
                if id_ not in self_value:
                    # fall back to the ref, then 0, for ids with no db origin
                    commands.append(remove(id_.origin or id_.ref or 0))

            # commands for modified or extra lines
            for id_, line_snapshot in self_value.items():
                if not force and id_ in other_value:
                    # existing line: check diff and send update
                    line_diff = line_snapshot.diff(other_value[id_])
                    if line_diff:
                        commands.append(Command.update(id_.origin or id_.ref or 0, line_diff))

                elif not id_.origin:
                    # new line: send diff from scratch
                    line_diff = line_snapshot.diff({})
                    commands.append((Command.CREATE, id_.origin or id_.ref or 0, line_diff))

                else:
                    # link line: send data to client
                    base_line = line_snapshot.record._origin
                    [base_data] = base_line.web_read(field_spec.get('fields') or {})
                    commands.append((Command.LINK, base_line.id, base_data))

                    # check diff and send update
                    base_snapshot = RecordSnapshot(base_line, field_spec.get('fields') or {})
                    line_diff = line_snapshot.diff(base_snapshot)
                    if line_diff:
                        commands.append(Command.update(id_.origin, line_diff))

            if commands:
                result[field_name] = commands

        return result