# Copyright 2023 ACSONE SA/NV # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). # pylint: disable=method-required-super import base64 import itertools import mimetypes import os.path from io import BytesIO, IOBase from odoo import fields from odoo.tools.mimetypes import guess_mimetype from odoo.addons.fs_attachment.models.ir_attachment import IrAttachment class FSFileValue: def __init__( self, attachment: IrAttachment = None, name: str = None, value: bytes | IOBase = None, ) -> None: """ This class holds the information related to FSFile field. It can be used to assign a value to a FSFile field. In such a case, you can pass the name and the file content as parameters. When :param attachment: the attachment to use to store the file. :param name: the name of the file. If not provided, the name will be taken from the attachment or the io.IOBase. :param value: the content of the file. It can be bytes or an io.IOBase. """ self._is_new: bool = attachment is None self._buffer: IOBase = None self._attachment: IrAttachment = attachment if name and attachment: raise ValueError("Cannot set name and attachment at the same time") if value: if isinstance(value, IOBase): self._buffer = value if not hasattr(value, "name"): if name: self._buffer.name = name else: raise ValueError( "name must be set when value is an io.IOBase " "and is not provided by the io.IOBase" ) elif isinstance(value, bytes): self._buffer = BytesIO(value) if not name: raise ValueError("name must be set when value is bytes") self._buffer.name = name else: raise ValueError("value must be bytes or io.BytesIO") elif name: self._buffer = BytesIO(b"") self._buffer.name = name @property def write_buffer(self) -> BytesIO: if self._buffer is None: name = self._attachment.name if self._attachment else None self._buffer = BytesIO() self._buffer.name = name return self._buffer @property def name(self) -> str | None: name = ( self._attachment.name if self._attachment else self._buffer.name if self._buffer else None ) if name: return os.path.basename(name) return None @name.setter def name(self, value: str) -> None: # the name should only be updatable while the file is not yet stored # TODO, we could also allow to update the name of the file and rename # the file in the external file system if self._is_new: self.write_buffer.name = value else: raise ValueError( "The name of the file can only be updated while the file is not " "yet stored" ) @property def is_new(self) -> bool: return self._is_new @property def mimetype(self) -> str | None: """Return the mimetype of the file. If an attachment is set, the mimetype is taken from the attachment. If no attachment is set, the mimetype is guessed from the name of the file. If no name is set or if the mimetype cannot be guessed from the name, the mimetype is guessed from the content of the file. """ mimetype = None if self._attachment: mimetype = self._attachment.mimetype elif self.name: mimetype = mimetypes.guess_type(self.name)[0] # at last, try to guess the mimetype from the content return mimetype or guess_mimetype(self.getvalue()) @property def size(self) -> int: if self._attachment: return self._attachment.file_size # check if the object supports len try: return len(self._buffer) except TypeError: # pylint: disable=except-pass # the object does not support len pass # if we are on a BytesIO, we can get the size from the buffer if isinstance(self._buffer, BytesIO): return self._buffer.getbuffer().nbytes # we cannot get the size return 0 @property def url(self) -> str | None: return self._attachment.fs_url or None if self._attachment else None @property def internal_url(self) -> str | None: return self._attachment.internal_url or None if self._attachment else None @property def url_path(self) -> str | None: return self._attachment.fs_url_path or None if self._attachment else None @property def attachment(self) -> IrAttachment | None: return self._attachment @attachment.setter def attachment(self, value: IrAttachment) -> None: self._attachment = value self._buffer = None @property def extension(self) -> str | None: # get extension from mimetype ext = os.path.splitext(self.name)[1] if not ext: ext = mimetypes.guess_extension(self.mimetype) ext = ext and ext[1:] return ext @property def read_buffer(self) -> BytesIO: if self._buffer is None: content = b"" name = None if self._attachment: content = self._attachment.raw or b"" name = self._attachment.name self._buffer = BytesIO(content) self._buffer.name = name return self._buffer def getvalue(self) -> bytes: buffer = self.read_buffer current_pos = buffer.tell() buffer.seek(0) value = buffer.read() buffer.seek(current_pos) return value def open( self, mode="rb", block_size=None, cache_options=None, compression=None, new_version=True, **kwargs ) -> IOBase: """ Return a file-like object that can be used to read and write the file content. See the documentation of open() into the ir.attachment model from the fs_attachment module for more information. """ if not self._attachment: raise ValueError("Cannot open a file that is not stored") return self._attachment.open( mode=mode, block_size=block_size, cache_options=cache_options, compression=compression, new_version=new_version, **kwargs, ) class FSFile(fields.Binary): """ This field is a binary field that stores the file content in an external filesystem storage referenced by a storage code. A major difference with the standard Odoo binary field is that the value is not encoded in base64 but is a bytes object. Moreover, the field is designed to always return an instance of :class:`FSFileValue` when reading the value. This class is a file-like object that can be used to read the file content and to get information about the file (filename, mimetype, url, ...). To update the value of the field, the following values are accepted: - a bytes object (e.g. ``b"..."``) - a dict with the following keys: - ``filename``: the filename of the file - ``content``: the content of the file encoded in base64 - a FSFileValue instance - a file-like object (e.g. an instance of :class:`io.BytesIO`) When the value is provided is a bytes object the filename is set to the name of the field. You can override this behavior by providing specifying a fs_filename key in the context. For example: .. code-block:: python record.with_context(fs_filename='my_file.txt').write({ 'field': b'...', }) The same applies when the value is provided as a file-like object but the filename is set to the name of the file-like object or not a property of the file-like object. (e.g. ``io.BytesIO(b'...')``). When the value is converted to the read format, it's always an instance of dict with the following keys: - ``filename``: the filename of the file - ``mimetype``: the mimetype of the file - ``size``: the size of the file - ``url``: the url to access the file """ type = "fs_file" attachment: bool = True def __init__(self, *args, **kwargs): kwargs["attachment"] = True super().__init__(*args, **kwargs) def read(self, records): domain = [ ("res_model", "=", records._name), ("res_field", "=", self.name), ("res_id", "in", records.ids), ] data = { att.res_id: self._convert_attachment_to_cache(att) for att in records.env["ir.attachment"].sudo().search(domain) } records.env.cache.insert_missing(records, self, map(data.get, records._ids)) def create(self, record_values): if not record_values: return env = record_values[0][0].env with env.norecompute(): for record, value in record_values: if value: cache_value = self.convert_to_cache(value, record) attachment = self._create_attachment(record, cache_value) cache_value = self._convert_attachment_to_cache(attachment) record.env.cache.update( record, self, [cache_value], dirty=False, ) def _create_attachment(self, record, cache_value: FSFileValue): ir_attachment = ( record.env["ir.attachment"] .sudo() .with_context( binary_field_real_user=record.env.user, ) ) create_value = self._prepare_attachment_create_values(record, cache_value) return ir_attachment.create(create_value) def _prepare_attachment_create_values(self, record, cache_value: FSFileValue): return { "name": cache_value.name, "raw": cache_value.getvalue(), "res_model": record._name, "res_field": self.name, "res_id": record.id, "type": "binary", } def write(self, records, value): # the code is copied from the standard Odoo Binary field # with the following changes: # - the value is not encoded in base64 and we therefore write on # ir.attachment.raw instead of ir.attachment.datas # discard recomputation of self on records records.env.remove_to_compute(self, records) # update the cache, and discard the records that are not modified cache = records.env.cache cache_value = self.convert_to_cache(value, records) records = cache.get_records_different_from(records, self, cache_value) if not records: return records if self.store: # determine records that are known to be not null not_null = cache.get_records_different_from(records, self, None) if self.store: # Be sure to invalidate the cache for the modified records since # the value of the field has changed and the new value will be linked # to the attachment record used to store the file in the storage. cache.remove(records, self) else: # if the field is not stored and a value is set, we need to # set the value in the cache since the value (the case for computed # fields) cache.update(records, self, itertools.repeat(cache_value)) # retrieve the attachments that store the values, and adapt them if self.store and any(records._ids): real_records = records.filtered("id") atts = ( records.env["ir.attachment"] .sudo() .with_context( binary_field_real_user=records.env.user, ) ) if not_null: atts = atts.search( [ ("res_model", "=", self.model_name), ("res_field", "=", self.name), ("res_id", "in", real_records.ids), ] ) if value: filename = cache_value.name content = cache_value.getvalue() # update the existing attachments atts.write({"raw": content, "name": filename}) atts_records = records.browse(atts.mapped("res_id")) # set new value in the cache since we have the reference to the # attachment record and a new access to the field will nomore # require to load the attachment record for record in atts_records: new_cache_value = self._convert_attachment_to_cache( atts.filtered(lambda att: att.res_id == record.id) ) cache.update(record, self, [new_cache_value], dirty=False) # create the missing attachments missing = real_records - atts_records if missing: created = atts.browse() for record in missing: created |= self._create_attachment(record, cache_value) for att in created: record = records.browse(att.res_id) new_cache_value = self._convert_attachment_to_cache(att) record.env.cache.update( record, self, [new_cache_value], dirty=False ) else: atts.unlink() return records def _convert_attachment_to_cache(self, attachment: IrAttachment) -> FSFileValue: return FSFileValue(attachment=attachment) def _get_filename(self, record): return record.env.context.get("fs_filename", self.name) def convert_to_cache(self, value, record, validate=True): if value is None or value is False: return None if isinstance(value, FSFileValue): return value if isinstance(value, dict): if "content" not in value and value.get("url"): # we come from an onchange # The id is the third element of the url att_id = value["url"].split("/")[3] attachment = record.env["ir.attachment"].sudo().browse(int(att_id)) return self._convert_attachment_to_cache(attachment) return FSFileValue( name=value["filename"], value=base64.b64decode(value["content"]) ) if isinstance(value, IOBase): name = getattr(value, "name", None) if name is None: name = self._get_filename(record) return FSFileValue(name=name, value=value) if isinstance(value, bytes): return FSFileValue( name=self._get_filename(record), value=base64.b64decode(value) ) raise ValueError( "Invalid value for %s: %r\n" "Should be base64 encoded bytes or a file-like object" % (self, value) ) def convert_to_write(self, value, record): return self.convert_to_cache(value, record) def convert_to_read(self, value, record, use_name_get=True): if value is None or value is False: return None if isinstance(value, FSFileValue): res = { "filename": value.name, "size": value.size, "mimetype": value.mimetype, } if value.attachment: res["url"] = value.internal_url else: res["content"] = base64.b64encode(value.getvalue()).decode("ascii") return res raise ValueError( "Invalid value for %s: %r\n" "Should be base64 encoded bytes or a file-like object" % (self, value) )