mirror of
https://github.com/bringout/oca-ocb-report.git
synced 2026-04-23 16:22:08 +02:00
19.0 vanilla
This commit is contained in:
parent
62d197ac8b
commit
184bb0e321
667 changed files with 691406 additions and 239886 deletions
|
|
@ -0,0 +1,175 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
import io
|
||||
import zipfile
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from odoo import api, fields, models, _, tools
|
||||
from odoo.exceptions import ValidationError, MissingError
|
||||
|
||||
from odoo.addons.spreadsheet.utils.validate_data import fields_in_spreadsheet, menus_xml_ids_in_spreadsheet
|
||||
|
||||
|
||||
class SpreadsheetMixin(models.AbstractModel):
|
||||
_name = 'spreadsheet.mixin'
|
||||
_description = "Spreadsheet mixin"
|
||||
_auto = False
|
||||
|
||||
spreadsheet_binary_data = fields.Binary(
|
||||
string="Spreadsheet file",
|
||||
default=lambda self: self._empty_spreadsheet_data_base64(),
|
||||
)
|
||||
spreadsheet_data = fields.Text(compute='_compute_spreadsheet_data', inverse='_inverse_spreadsheet_data')
|
||||
spreadsheet_file_name = fields.Char(compute='_compute_spreadsheet_file_name')
|
||||
thumbnail = fields.Binary()
|
||||
|
||||
@api.constrains("spreadsheet_binary_data")
|
||||
def _check_spreadsheet_data(self):
|
||||
for spreadsheet in self.filtered("spreadsheet_binary_data"):
|
||||
try:
|
||||
data = json.loads(base64.b64decode(spreadsheet.spreadsheet_binary_data).decode())
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
raise ValidationError(_("Uh-oh! Looks like the spreadsheet file contains invalid data."))
|
||||
if not (tools.config['test_enable'] or tools.config['test_file']):
|
||||
continue
|
||||
if data.get("[Content_Types].xml"):
|
||||
# this is a xlsx file
|
||||
continue
|
||||
display_name = spreadsheet.display_name
|
||||
errors = []
|
||||
for model, field_chains in fields_in_spreadsheet(data).items():
|
||||
if model not in self.env:
|
||||
errors.append(f"- model '{model}' used in '{display_name}' does not exist")
|
||||
continue
|
||||
for field_chain in field_chains:
|
||||
field_model = model
|
||||
for fname in field_chain.split("."): # field chain 'product_id.channel_ids'
|
||||
if fname not in self.env[field_model]._fields:
|
||||
errors.append(f"- field '{fname}' used in spreadsheet '{display_name}' does not exist on model '{field_model}'")
|
||||
continue
|
||||
field = self.env[field_model]._fields[fname]
|
||||
if field.relational:
|
||||
field_model = field.comodel_name
|
||||
|
||||
for xml_id in menus_xml_ids_in_spreadsheet(data):
|
||||
record = self.env.ref(xml_id, raise_if_not_found=False)
|
||||
if not record:
|
||||
errors.append(f"- xml id '{xml_id}' used in spreadsheet '{display_name}' does not exist")
|
||||
continue
|
||||
# check that the menu has an action. Root menus always have an action.
|
||||
if not record.action and record.parent_id.id:
|
||||
errors.append(f"- menu with xml id '{xml_id}' used in spreadsheet '{display_name}' does not have an action")
|
||||
|
||||
if errors:
|
||||
raise ValidationError(
|
||||
_(
|
||||
"Uh-oh! Looks like the spreadsheet file contains invalid data.\n\n%(errors)s",
|
||||
errors="\n".join(errors),
|
||||
),
|
||||
)
|
||||
|
||||
@api.depends("spreadsheet_binary_data")
|
||||
def _compute_spreadsheet_data(self):
|
||||
attachments = self.env['ir.attachment'].with_context(bin_size=False).search([
|
||||
('res_model', '=', self._name),
|
||||
('res_field', '=', 'spreadsheet_binary_data'),
|
||||
('res_id', 'in', self.ids),
|
||||
])
|
||||
data = {
|
||||
attachment.res_id: attachment.raw
|
||||
for attachment in attachments
|
||||
}
|
||||
for spreadsheet in self:
|
||||
spreadsheet.spreadsheet_data = data.get(spreadsheet.id, False)
|
||||
|
||||
def _inverse_spreadsheet_data(self):
|
||||
for spreadsheet in self:
|
||||
if not spreadsheet.spreadsheet_data:
|
||||
spreadsheet.spreadsheet_binary_data = False
|
||||
else:
|
||||
spreadsheet.spreadsheet_binary_data = base64.b64encode(spreadsheet.spreadsheet_data.encode())
|
||||
|
||||
@api.depends('display_name')
|
||||
def _compute_spreadsheet_file_name(self):
|
||||
for spreadsheet in self:
|
||||
spreadsheet.spreadsheet_file_name = f"{spreadsheet.display_name}.osheet.json"
|
||||
|
||||
@api.onchange('spreadsheet_binary_data')
|
||||
def _onchange_data_(self):
|
||||
self._check_spreadsheet_data()
|
||||
|
||||
@api.readonly
|
||||
@api.model
|
||||
def get_display_names_for_spreadsheet(self, args):
|
||||
ids_per_model = defaultdict(list)
|
||||
for arg in args:
|
||||
ids_per_model[arg["model"]].append(arg["id"])
|
||||
display_names = defaultdict(dict)
|
||||
for model, ids in ids_per_model.items():
|
||||
records = self.env[model].with_context(active_test=False).search([("id", "in", ids)])
|
||||
for record in records:
|
||||
display_names[model][record.id] = record.display_name
|
||||
|
||||
# return the display names in the same order as the input
|
||||
return [
|
||||
display_names[arg["model"]].get(arg["id"])
|
||||
for arg in args
|
||||
]
|
||||
|
||||
def _empty_spreadsheet_data_base64(self):
|
||||
"""Create an empty spreadsheet workbook.
|
||||
Encoded as base64
|
||||
"""
|
||||
data = json.dumps(self._empty_spreadsheet_data())
|
||||
return base64.b64encode(data.encode())
|
||||
|
||||
def _empty_spreadsheet_data(self):
|
||||
"""Create an empty spreadsheet workbook.
|
||||
The sheet name should be the same for all users to allow consistent references
|
||||
in formulas. It is translated for the user creating the spreadsheet.
|
||||
"""
|
||||
lang = self.env["res.lang"]._lang_get(self.env.user.lang)
|
||||
locale = lang._odoo_lang_to_spreadsheet_locale()
|
||||
return {
|
||||
"sheets": [
|
||||
{
|
||||
"id": "sheet1",
|
||||
"name": _("Sheet1"),
|
||||
}
|
||||
],
|
||||
"settings": {
|
||||
"locale": locale,
|
||||
},
|
||||
"revisionId": "START_REVISION",
|
||||
}
|
||||
|
||||
def _zip_xslx_files(self, files):
|
||||
stream = io.BytesIO()
|
||||
with zipfile.ZipFile(stream, 'w', compression=zipfile.ZIP_DEFLATED) as doc_zip:
|
||||
for f in files:
|
||||
# to reduce networking load, only the image path is sent.
|
||||
# It's replaced by the image content here.
|
||||
if 'imageSrc' in f:
|
||||
try:
|
||||
content = self._get_file_content(f['imageSrc'])
|
||||
doc_zip.writestr(f['path'], content)
|
||||
except MissingError:
|
||||
pass
|
||||
else:
|
||||
doc_zip.writestr(f['path'], f['content'])
|
||||
|
||||
return stream.getvalue()
|
||||
|
||||
def _get_file_content(self, file_path):
|
||||
if file_path.startswith('data:image/png;base64,'):
|
||||
return base64.b64decode(file_path.split(',')[1])
|
||||
match = re.match(r'/web/image/(\d+)', file_path)
|
||||
file_record = self.env['ir.binary']._find_record(
|
||||
res_model='ir.attachment',
|
||||
res_id=int(match.group(1)),
|
||||
)
|
||||
return self.env['ir.binary']._get_stream_from(file_record).read()
|
||||
Loading…
Add table
Add a link
Reference in a new issue