oca-technical/odoo-bringout-oca-rest-framework-model_serializer/model_serializer/core.py
2025-08-29 15:43:03 +02:00

226 lines
8.7 KiB
Python

from odoo import _, models
from odoo.exceptions import ValidationError
from odoo.addons.datamodel.core import Datamodel, MetaDatamodel
from .field_converter import convert_field
class class_or_instancemethod(classmethod):  # pylint: disable=class-camelcase
    """Descriptor usable both on a class and on its instances.

    Accessed on the class, the wrapped function is bound like a regular
    classmethod; accessed on an instance, it is bound to that instance.
    """

    def __get__(self, instance, type_):
        if instance is None:
            # Class-level access: defer to the standard classmethod binding.
            return super().__get__(instance, type_)
        # Instance-level access: bind the underlying function to the instance.
        return self.__func__.__get__(instance, type_)
class MetaModelSerializer(MetaDatamodel):
    """Metaclass used by `ModelSerializer`.

    It currently adds no behaviour of its own: initialisation is delegated
    unchanged to `MetaDatamodel`.
    """

    def __init__(self, name, bases, attrs):
        super().__init__(name, bases, attrs)
class ModelSerializer(Datamodel, metaclass=MetaModelSerializer):
    """Datamodel whose fields are generated from an Odoo model.

    Subclasses declare `_model` (the Odoo model name) and `_model_fields`
    (the names of the model fields to expose). Fields that are not declared
    explicitly on the class are generated from the Odoo field definitions by
    `_extend_from_odoo_model`.
    """

    _inherit = "base"
    # NOTE(review): presumably keeps this base class out of the datamodel
    # registry -- confirm against the datamodel addon.
    _register = False
    # Name of the Odoo model the serializer maps to.
    _model = None
    # Names of the Odoo model fields exposed by the serializer.
    _model_fields = []

    def dump(self, many=None):
        """Serialize `self` through its schema with dump mode switched on."""
        with self.__dump_mode_on__():
            dump = self.__schema__.dump(self, many=many)
        return dump

    @classmethod
    def _check_nested_class(cls, marshmallow_field, registry):
        """If `marshmallow_field` is a nested datamodel (relational field), we check
        if the nested datamodel class exists

        :raises ValidationError: if the nested datamodel name is not in `registry`
        """
        nested_name = getattr(marshmallow_field, "datamodel_name", None)
        if nested_name and nested_name not in registry:
            raise ValidationError(
                _("'{}' datamodel does not exist").format(nested_name)
            )

    @classmethod
    def _extend_from_odoo_model(cls, registry, env):
        """Extend the datamodel to contain the declared Odoo model fields

        :param registry: datamodel registry, indexed by datamodel name
        :param env: Odoo environment used to look up the target model
        :return: a new class, named after `cls` with a ``Child`` suffix, that
            inherits from `cls._name` and carries the generated fields
        :raises ValidationError: if `_model` conflicts with the parent's, if
            `_model` / `_model_fields` are not defined, or if a nested
            datamodel is unknown
        """
        attrs = {
            "_inherit": cls._name,
            "_model_fields": getattr(cls, "_model_fields", []),
            "_model": getattr(cls, "_model", None),
        }
        bases = (ModelSerializer,)
        name = cls.__name__ + "Child"
        parent_class = registry[cls._name]
        # The field list may come from this class or from the registered parent.
        has_model_fields = bool(attrs["_model_fields"])
        if getattr(parent_class, "_model_fields", None):
            has_model_fields = True
        if getattr(parent_class, "_model", None):
            if attrs["_model"] and attrs["_model"] != parent_class._model:
                raise ValidationError(
                    _(
                        "Error in {}: Model Serializers cannot inherit "
                        "from a class having a different '_model' attribute"
                    ).format(cls.__name__)
                )
            attrs["_model"] = parent_class._model
        if not (attrs["_model"] and has_model_fields):
            raise ValidationError(
                _(
                    "Error in {}: Model Serializers require '_model' and "
                    "'_model_fields' attributes to be defined"
                ).format(cls.__name__)
            )
        odoo_model = env[attrs["_model"]]
        for field_name in cls._model_fields:
            if not hasattr(cls, field_name):
                # Not declared on the class: derive it from the Odoo field.
                odoo_field = odoo_model._fields[field_name]
                marshmallow_field = convert_field(odoo_field)
                if marshmallow_field:
                    attrs[field_name] = marshmallow_field
            else:
                # Declared explicitly: keep the existing schema field.
                marshmallow_field = cls.__schema_class__._declared_fields[field_name]
            cls._check_nested_class(marshmallow_field, registry)
        return MetaDatamodel(name, bases, attrs)

    @property
    def _model_name(self):
        """Odoo model name: the context's ``odoo_model`` if set, else `_model`."""
        if self.context.get("odoo_model"):
            return self.context["odoo_model"]
        return self._model

    @_model_name.setter
    def _model_name(self, value):
        if not self.context:
            self.context = {}
        self.context["odoo_model"] = value

    @classmethod
    def from_recordset(cls, recordset, *, many=False):
        """Transform a recordset into a (list of) datamodel(s)

        :param recordset: Odoo recordset to convert
        :param many: when true, convert every record and return a list;
            otherwise convert only the first record
        :return: a list of instances if `many`, else one instance, or None
            when `recordset` is empty
        """

        def convert_null_value(val):
            # `False` and empty recordsets become None; other falsy values
            # (0, "", ...) are kept as they are.
            if val:
                return val
            if val is False or isinstance(val, models.BaseModel):
                return None
            return val

        res = []
        datamodels = recordset.env.datamodels
        recordset = recordset if many else recordset[:1]
        for record in recordset:
            instance = cls(partial=True, context={"odoo_model": record._name})
            for model_field in cls._model_fields:
                schema_field = instance.__schema__.fields[model_field]
                nested_datamodel_name = getattr(schema_field, "datamodel_name", None)
                if nested_datamodel_name:
                    nested_datamodel_class = datamodels[nested_datamodel_name]
                    # Nested datamodels without `from_recordset` are left unset.
                    if hasattr(nested_datamodel_class, "from_recordset"):
                        setattr(
                            instance,
                            model_field,
                            nested_datamodel_class.from_recordset(
                                record[model_field], many=schema_field.many
                            ),
                        )
                else:
                    value = convert_null_value(record[model_field])
                    setattr(instance, model_field, value)
            res.append(instance)
        if not many:
            return res[0] if res else None
        return res

    def get_odoo_record(self):
        """Get an existing record matching `self`. Meant to be overridden

        The default implementation browses `self.id` when ``id`` is one of
        the model fields and is set; otherwise it returns an empty recordset.

        TODO: optimize this to deal with multiple instances at once
        """
        odoo_model = self.env[self._model_name]
        if "id" in self._model_fields and getattr(self, "id", None):
            return odoo_model.browse(self.id)
        return odoo_model.browse([])

    def _new_odoo_record(self):
        """Return `new()` of the current model, fed with its default values."""
        odoo_model = self.env[self._model_name]
        default_values = odoo_model.default_get(odoo_model._fields.keys())
        return odoo_model.new(default_values)

    def _process_model_value(self, value, model_field):
        """Pass `value` through ``validate_<model_field>`` if `self` defines it."""
        if hasattr(self, "validate_{}".format(model_field)):
            return getattr(self, "validate_{}".format(model_field))(value)
        return value

    def _get_partial_fields(self):
        """Return the list of fields actually used to instantiate `self`"""
        res = []
        received_keys = set(self.__schema__._declared_fields) - set(
            self.__missing_fields__
        )
        # Map each data key back to the attribute name of its field.
        actual_field_names = {
            field.data_key: name
            for name, field in self.__schema__._declared_fields.items()
            if field.data_key
        }
        for received_key in received_keys:
            res.append(actual_field_names.get(received_key) or received_key)
        return res

    def convert_to_values(self, model=None):
        """Transform `self` into a dictionary to create or write an odoo record

        :param model: Odoo model name to use; defaults to `_model`
        :return: dict of field values, including ``id`` when a matching
            record exists
        """

        def convert_related_values(dics):
            # Build the x2many command list: a leading (6, 0, ids) command
            # holding the ids of existing records, followed by one command
            # per nested dict.
            res = [(6, 0, [])]
            for dic in dics:
                rec_id = dic.pop("id", None)
                if rec_id:
                    res[0][2].append(rec_id)
                    if dic:
                        res.append((1, rec_id, dic))
                    else:
                        res.append((4, rec_id))
                else:
                    res.append((0, 0, dic))
            return res

        model_name = model or self._model
        self._model_name = model_name
        record = self.get_odoo_record()
        values = {"id": record.id} if record else {}
        # in case of partial, not all fields are considered
        received_fields = self._get_partial_fields()
        model_fields = set(received_fields) & set(self._model_fields)
        for model_field in model_fields:
            schema_field = self.__schema__.fields[model_field]
            if schema_field.dump_only:
                continue
            value = getattr(self, model_field)
            nested_datamodel_name = getattr(schema_field, "datamodel_name", None)
            nested_datamodel = (
                self.env.datamodels[nested_datamodel_name]
                if nested_datamodel_name
                else None
            )
            if nested_datamodel and issubclass(nested_datamodel, ModelSerializer):
                odoo_field = record._fields[model_field]
                if odoo_field.type == "many2one":
                    if value is None:
                        # `from_recordset` sets an empty many2one to None;
                        # there is no nested serializer to convert, so write
                        # Odoo's empty many2one value instead of failing.
                        value = False
                    else:
                        value._model_name = odoo_field.comodel_name
                        value = value.to_recordset().id
                else:
                    nested_values = [
                        instance.convert_to_values(model=odoo_field.comodel_name)
                        for instance in value
                    ]
                    value = convert_related_values(nested_values)
            values[model_field] = self._process_model_value(value, model_field)
        return values

    def to_recordset(self):
        """Transform `self` into a corresponding recordset

        Writes on the record returned by `get_odoo_record` when there is
        one, otherwise creates a new record.
        """
        record = self.get_odoo_record()
        values = self.convert_to_values(model=self._model_name)
        if record:
            record.write(values)
            return record
        else:
            return self.env[self._model_name].create(values)