mirror of
https://github.com/bringout/euro-office.git
synced 2026-04-18 04:01:59 +02:00
Based on onlyoffice_odoo by Ascensio System SIA (ONLYOFFICE, LGPL-3). Rebranded and adapted for Euro-Office by bring.out d.o.o. Modules: - eurooffice_odoo: base integration - eurooffice_odoo_templates: document templates - eurooffice_odoo_oca_dms: OCA DMS integration (replaces Enterprise documents) All references renamed: onlyoffice -> eurooffice, ONLYOFFICE -> Euro-Office. Original copyright notices preserved.
529 lines
25 KiB
Python
529 lines
25 KiB
Python
#
|
|
# (c) Copyright Ascensio System SIA 2024
|
|
#
|
|
import base64
|
|
import codecs
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import zipfile
|
|
from datetime import datetime
|
|
from urllib.parse import quote
|
|
|
|
from odoo import http
|
|
from odoo.http import request
|
|
from odoo.tools import (
|
|
DEFAULT_SERVER_DATE_FORMAT,
|
|
DEFAULT_SERVER_DATETIME_FORMAT,
|
|
file_open,
|
|
get_lang,
|
|
)
|
|
|
|
from odoo.addons.eurooffice_odoo.controllers.controllers import Eurooffice_Connector, eurooffice_request
|
|
from odoo.addons.eurooffice_odoo.utils import config_utils, file_utils, jwt_utils, url_utils
|
|
from odoo.addons.eurooffice_odoo_templates.utils import config_utils as templates_config_utils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Eurooffice_Inherited_Connector(Eurooffice_Connector):
|
|
@http.route("/eurooffice/template/template_content/<string:path>", auth="public")
|
|
def get_template_content(self, path):
|
|
try:
|
|
file_content = request.env["eurooffice.odoo.demo.templates"].get_template_content(path.replace("_", "/"))
|
|
|
|
return request.make_response(
|
|
file_content,
|
|
headers=[
|
|
("Content-Type", "application/pdf"),
|
|
("Content-Disposition", 'inline; filename="preview.pdf"'),
|
|
],
|
|
)
|
|
except Exception as e:
|
|
return request.not_found(f"Error: {str(e)}")
|
|
|
|
@http.route("/eurooffice/template/editor", auth="user", methods=["POST"], type="json", csrf=False)
|
|
def override_render_editor(self, attachment_id, access_token=None):
|
|
attachment = self.get_attachment(attachment_id)
|
|
if not attachment:
|
|
return request.not_found()
|
|
|
|
attachment.validate_access(access_token)
|
|
|
|
data = attachment.read(["id", "checksum", "public", "name", "access_token"])[0]
|
|
filename = data["name"]
|
|
|
|
can_read = attachment.check_access_rights("read", raise_exception=False) and file_utils.can_view(filename)
|
|
hasAccess = http.request.env.user.has_group("eurooffice_odoo_templates.group_eurooffice_odoo_templates_admin")
|
|
can_write = (
|
|
hasAccess
|
|
and attachment.check_access_rights("write", raise_exception=False)
|
|
and file_utils.can_edit(filename)
|
|
)
|
|
|
|
if not can_read:
|
|
raise Exception("cant read")
|
|
|
|
prepare_editor_values = self.prepare_editor_values(attachment, access_token, can_write)
|
|
return prepare_editor_values
|
|
|
|
|
|
class EuroofficeTemplate_Connector(http.Controller):
|
|
@http.route("/eurooffice/template/fill", auth="user", type="http")
|
|
def main(self, template_id, record_ids):
|
|
logger.info("GET /eurooffice/template/fill - template: %s, records: %s", template_id, record_ids)
|
|
internal_jwt_secret = config_utils.get_internal_jwt_secret(request.env)
|
|
oo_security_token = jwt_utils.encode_payload(request.env, {"id": request.env.user.id}, internal_jwt_secret)
|
|
|
|
try:
|
|
templates = self.fill_template(oo_security_token, record_ids, template_id)
|
|
if len(templates) == 1:
|
|
url = next(iter(templates.values()))
|
|
filename = next(iter(templates))
|
|
filename = filename.encode("ascii", "ignore").decode("ascii")
|
|
if not filename:
|
|
filename = "document.pdf"
|
|
response = eurooffice_request(url=quote(url, safe="/:?=&"), method="get")
|
|
if response.status_code == 200:
|
|
headers = [
|
|
("Content-Type", "application/pdf"),
|
|
("X-Content-Type-Options", "nosniff"),
|
|
("Content-Length", str(len(response.content))),
|
|
("Content-Disposition", f'attachment; filename="{filename}"'),
|
|
]
|
|
logger.info("GET /eurooffice/template/fill - returning single PDF: %s", filename)
|
|
return request.make_response(response.content, headers)
|
|
else:
|
|
e = f"error while downloading the document file, status = {response.status_code}"
|
|
logger.warning(e)
|
|
return request.not_found()
|
|
elif len(templates) > 1:
|
|
logger.info("GET /eurooffice/template/fill - creating ZIP with %s files", len(templates))
|
|
stream = io.BytesIO()
|
|
with zipfile.ZipFile(stream, "w", zipfile.ZIP_DEFLATED) as archive:
|
|
for filename, url in templates.items():
|
|
response = eurooffice_request(url=url, method="get")
|
|
if response.status_code == 200:
|
|
archive.writestr(filename, response.content)
|
|
else:
|
|
e = f"error while downloading the document file to be generated zip, status = {response.status_code}" # noqa: E501
|
|
logger.warning(e)
|
|
return request.not_found()
|
|
stream.seek(0)
|
|
content = stream.read()
|
|
stream.flush()
|
|
|
|
filename = f"eurooffice-templates-{datetime.now().strftime('%Y_%m_%d_%H_%M')}.zip"
|
|
headers = [
|
|
("Content-Type", "application/zip"),
|
|
("X-Content-Type-Options", "nosniff"),
|
|
("Content-Length", str(len(response.content))),
|
|
("Content-Disposition", f'attachment; filename="{filename}"'),
|
|
]
|
|
logger.info("GET /eurooffice/template/fill - returning ZIP: %s", filename)
|
|
return request.make_response(content, headers)
|
|
else:
|
|
logger.warning("no templates found")
|
|
logger.debug(templates)
|
|
return request.not_found()
|
|
except Exception as e:
|
|
logger.warning(e)
|
|
return request.not_found()
|
|
|
|
return request.not_found()
|
|
|
|
def fill_template(self, oo_security_token, record_ids, template_id):
|
|
logger.info("fill_template - template: %s, records: %s", template_id, record_ids)
|
|
docserver_url = config_utils.get_doc_server_public_url(request.env)
|
|
docserver_url = url_utils.replace_public_url_to_internal(request.env, docserver_url)
|
|
docbuilder_url = f"{docserver_url}docbuilder"
|
|
jwt_header = config_utils.get_jwt_header(request.env)
|
|
jwt_secret = config_utils.get_jwt_secret(request.env)
|
|
odoo_url = config_utils.get_base_or_odoo_url(request.env)
|
|
|
|
docbuilder_headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
|
docbuilder_callback_url = f"{odoo_url}eurooffice/template/callback/docbuilder/fill_template?oo_security_token={oo_security_token}&record_ids={record_ids}&template_id={template_id}" # noqa: E501
|
|
docbuilder_payload = {"async": False, "url": docbuilder_callback_url}
|
|
|
|
logger.info("fill_template - docserver_url: %s", docserver_url)
|
|
logger.info("fill_template - jwt_enabled: %s", bool(jwt_secret))
|
|
|
|
if jwt_secret:
|
|
docbuilder_payload["token"] = jwt_utils.encode_payload(request.env, docbuilder_payload, jwt_secret)
|
|
docbuilder_headers[jwt_header] = "Bearer " + jwt_utils.encode_payload(
|
|
request.env, {"payload": docbuilder_payload}, jwt_secret
|
|
)
|
|
|
|
try:
|
|
if jwt_secret:
|
|
docbuilder_response = eurooffice_request(
|
|
url=docbuilder_url,
|
|
method="post",
|
|
opts={
|
|
"json": docbuilder_payload,
|
|
"headers": docbuilder_headers,
|
|
},
|
|
)
|
|
else:
|
|
docbuilder_response = eurooffice_request(
|
|
url=docbuilder_url,
|
|
method="post",
|
|
opts={
|
|
"json": docbuilder_payload,
|
|
},
|
|
)
|
|
docbuilder_json = docbuilder_response.json()
|
|
if docbuilder_json.get("error"):
|
|
e = self.get_docbuilder_error(docbuilder_json.get("error"))
|
|
logger.warning("fill_template - docbuilder error: %s", e)
|
|
raise Exception(e)
|
|
|
|
urls = docbuilder_json.get("urls")
|
|
logger.info("fill_template - success, got %s URLs", len(urls) if urls else 0)
|
|
return urls
|
|
except Exception as e:
|
|
logger.warning("fill_template - error: %s", str(e))
|
|
raise
|
|
|
|
@http.route("/eurooffice/template/callback/docbuilder/fill_template", auth="public")
|
|
def docbuilder_fill_template(self, oo_security_token, record_ids, template_id):
|
|
logger.info(
|
|
"GET /eurooffice/template/callback/docbuilder/fill_template - template: %s, records: %s",
|
|
template_id,
|
|
record_ids,
|
|
)
|
|
if not oo_security_token or not record_ids or not template_id:
|
|
logger.warning("oo_security_token or record_ids or template_id not found")
|
|
return request.not_found()
|
|
|
|
user = self.get_user_from_token(oo_security_token)
|
|
if not user:
|
|
logger.warning("user not found")
|
|
return request.not_found()
|
|
|
|
template = self.get_record("eurooffice.odoo.templates", template_id, user)
|
|
if not template:
|
|
logger.warning("template not found: %s", template_id)
|
|
return request.not_found()
|
|
|
|
attachment_id = template.attachment_id.id
|
|
if not attachment_id:
|
|
logger.warning("attachment_id of the template was not found")
|
|
return request.not_found()
|
|
|
|
model = template.template_model_model
|
|
if not model:
|
|
logger.warning("model of the template was not found")
|
|
return request.not_found()
|
|
|
|
try:
|
|
record_ids = [int(x) for x in record_ids.split(",")]
|
|
logger.info(
|
|
"GET /eurooffice/template/callback/docbuilder/fill_template - processing %s records", len(record_ids)
|
|
)
|
|
url = f"{config_utils.get_base_or_odoo_url(http.request.env)}eurooffice/template/download/{attachment_id}?oo_security_token={oo_security_token}" # noqa: E501
|
|
|
|
docbuilder_content = ""
|
|
docbuilder_script_content = ""
|
|
with file_open("eurooffice_odoo_templates/controllers/fill_template.docbuilder", "r") as f:
|
|
docbuilder_script_content = f.read()
|
|
|
|
keys = self.get_keys(attachment_id, oo_security_token)
|
|
logger.info(
|
|
"GET /eurooffice/template/callback/docbuilder/fill_template - got %s keys", len(keys) if keys else 0
|
|
)
|
|
for record_id in record_ids:
|
|
fields = self.get_fields(keys, model, record_id, user)
|
|
fields = json.dumps(fields, ensure_ascii=False)
|
|
|
|
docbuilder_content += f"""
|
|
builder.OpenFile("{url}");
|
|
var fields = {fields};
|
|
"""
|
|
docbuilder_content += docbuilder_script_content
|
|
|
|
record = self.get_record(model, record_id, user)
|
|
record_name = getattr(record, "display_name", getattr(record, "name", str(record_id)))
|
|
template_name = getattr(template, "display_name", getattr(template, "name", "Filled Template"))
|
|
filename = re.sub(r"[<>:'/\\|?*\x00-\x1f]", " ", f"{template_name} - {record_name}")
|
|
|
|
editable_form_fields = templates_config_utils.get_editable_form_fields(http.request.env)
|
|
if editable_form_fields:
|
|
docbuilder_content += f"""
|
|
builder.SaveFile("pdf", "{filename}.pdf", "<m_sJsonParams>{{"isPrint":true}}</m_sJsonParams>")
|
|
builder.CloseFile();
|
|
""" # noqa: E501
|
|
else:
|
|
docbuilder_content += f"""
|
|
builder.SaveFile("pdf", "{filename}.pdf");
|
|
builder.CloseFile();
|
|
"""
|
|
|
|
headers = {
|
|
"Content-Disposition": "attachment; filename='fill_template.docbuilder'",
|
|
"Content-Type": "text/plain",
|
|
}
|
|
|
|
logger.info("GET /eurooffice/template/callback/docbuilder/fill_template - success")
|
|
return request.make_response(docbuilder_content, headers)
|
|
|
|
except Exception as e:
|
|
logger.warning(e)
|
|
return request.not_found()
|
|
|
|
def get_keys(self, attachment_id, oo_security_token):
|
|
logger.info("get_keys - attachment: %s", attachment_id)
|
|
docserver_url = config_utils.get_doc_server_public_url(request.env)
|
|
docserver_url = url_utils.replace_public_url_to_internal(request.env, docserver_url)
|
|
docbuilder_url = f"{docserver_url}docbuilder"
|
|
jwt_header = config_utils.get_jwt_header(request.env)
|
|
jwt_secret = config_utils.get_jwt_secret(request.env)
|
|
odoo_url = config_utils.get_base_or_odoo_url(request.env)
|
|
|
|
docbuilder_headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
|
docbuilder_callback_url = f"{odoo_url}eurooffice/template/callback/docbuilder/get_keys?attachment_id={attachment_id}&oo_security_token={oo_security_token}" # noqa: E501
|
|
docbuilder_payload = {"async": False, "url": docbuilder_callback_url}
|
|
|
|
if jwt_secret:
|
|
docbuilder_payload["token"] = jwt_utils.encode_payload(request.env, docbuilder_payload, jwt_secret)
|
|
docbuilder_headers[jwt_header] = "Bearer " + jwt_utils.encode_payload(
|
|
request.env, {"payload": docbuilder_payload}, jwt_secret
|
|
)
|
|
|
|
try:
|
|
if jwt_secret:
|
|
docbuilder_response = eurooffice_request(
|
|
url=docbuilder_url,
|
|
method="post",
|
|
opts={
|
|
"json": docbuilder_payload,
|
|
"headers": docbuilder_headers,
|
|
},
|
|
)
|
|
else:
|
|
docbuilder_response = eurooffice_request(
|
|
url=docbuilder_url,
|
|
method="post",
|
|
opts={
|
|
"json": docbuilder_payload,
|
|
},
|
|
)
|
|
docbuilder_json = docbuilder_response.json()
|
|
if docbuilder_json.get("error"):
|
|
e = self.get_docbuilder_error(docbuilder_json.get("error"))
|
|
raise Exception(e)
|
|
|
|
urls = docbuilder_json.get("urls")
|
|
keys_url = urls.get("keys.txt")
|
|
keys_response = eurooffice_request(
|
|
url=keys_url,
|
|
method="get",
|
|
)
|
|
response_content = codecs.decode(keys_response.content, "utf-8-sig")
|
|
|
|
logger.info("get_keys - success")
|
|
return json.loads(response_content)
|
|
except Exception as e:
|
|
logger.warning("get_keys - error: %s", str(e))
|
|
raise
|
|
|
|
@http.route("/eurooffice/template/callback/docbuilder/get_keys", auth="public")
|
|
def docbuilder_get_keys(self, attachment_id, oo_security_token):
|
|
logger.info("GET /eurooffice/template/callback/docbuilder/get_keys - attachment: %s", attachment_id)
|
|
if not attachment_id or not oo_security_token:
|
|
logger.warning("attachment_id or oo_security_token not found")
|
|
return request.not_found()
|
|
|
|
url = f"{config_utils.get_base_or_odoo_url(http.request.env)}eurooffice/template/download/{attachment_id}?oo_security_token={oo_security_token}" # noqa: E501
|
|
docbuilder_content = f"""
|
|
builder.OpenFile("{url}");
|
|
"""
|
|
|
|
with file_open("eurooffice_odoo_templates/controllers/get_keys.docbuilder", "r") as f:
|
|
docbuilder_content = docbuilder_content + f.read()
|
|
|
|
headers = {
|
|
"Content-Disposition": "attachment; filename='get_keys.docbuilder'",
|
|
"Content-Type": "text/plain",
|
|
}
|
|
|
|
logger.info("GET /eurooffice/template/callback/docbuilder/get_keys - success")
|
|
return request.make_response(docbuilder_content, headers)
|
|
|
|
@http.route("/eurooffice/template/download/<int:attachment_id>", auth="public")
|
|
def download(self, attachment_id, oo_security_token):
|
|
logger.info("GET /eurooffice/template/download - attachment: %s", attachment_id)
|
|
if not attachment_id or not oo_security_token:
|
|
logger.warning("attachment_id or oo_security_token not found")
|
|
return request.not_found()
|
|
|
|
attachment = self.get_record("ir.attachment", attachment_id, self.get_user_from_token(oo_security_token))
|
|
if attachment:
|
|
content = base64.b64decode(attachment.datas)
|
|
headers = {
|
|
"Content-Type": "application/pdf",
|
|
"Content-Disposition": "attachment; filename=template.pdf",
|
|
}
|
|
logger.info("GET /eurooffice/template/download - success")
|
|
return request.make_response(content, headers)
|
|
else:
|
|
logger.warning("attachment not found: %s", attachment_id)
|
|
return request.not_found()
|
|
|
|
def get_fields(self, keys, model, record_id, user): # noqa: C901
|
|
logger.info("get_fields - model: %s, record: %s", model, record_id)
|
|
|
|
def convert_keys(input_list):
|
|
output_dict = {}
|
|
for item in input_list:
|
|
if " " in item:
|
|
keys = item.split(" ")
|
|
current_dict = output_dict
|
|
for key in keys[:-1]:
|
|
current_dict = current_dict.setdefault(key, {})
|
|
current_dict[keys[-1]] = None
|
|
else:
|
|
output_dict[item] = None
|
|
|
|
def dict_to_list(input_dict):
|
|
output_list = []
|
|
for key, value in input_dict.items():
|
|
if isinstance(value, dict):
|
|
output_list.append({key: dict_to_list(value)})
|
|
else:
|
|
output_list.append(key)
|
|
return output_list
|
|
|
|
return dict_to_list(output_dict)
|
|
|
|
def get_related_field(keys, model, record_id): # noqa: C901
|
|
result = {}
|
|
record = self.get_record(model, record_id, user)
|
|
if not record:
|
|
logger.warning("Record not found")
|
|
return
|
|
for field in keys:
|
|
try:
|
|
if isinstance(field, dict):
|
|
related_field = list(field.keys())[0]
|
|
if related_field not in record._fields:
|
|
continue
|
|
field_type = record._fields[related_field].type
|
|
related_keys = field[related_field]
|
|
if field_type in ["one2many", "many2many", "many2one"]:
|
|
related_model = record._fields[related_field].comodel_name
|
|
related_record_ids = record.read([related_field])[0][related_field]
|
|
if not related_record_ids:
|
|
continue
|
|
if field_type == "many2one" and isinstance(related_record_ids, tuple):
|
|
related_data = get_related_field(related_keys, related_model, related_record_ids[0])
|
|
else:
|
|
related_data = []
|
|
for record_id in related_record_ids:
|
|
related_data_temp = get_related_field(related_keys, related_model, record_id)
|
|
if related_data_temp:
|
|
related_data.append(related_data_temp)
|
|
if related_data:
|
|
result[related_field] = related_data
|
|
else:
|
|
if field not in record._fields:
|
|
continue
|
|
field_type = record._fields[field].type
|
|
data = record.read([field])[0][field]
|
|
if field_type in ["html", "json"]:
|
|
continue # TODO
|
|
elif field_type == "boolean":
|
|
result[field] = str(data).lower()
|
|
elif isinstance(data, tuple):
|
|
result[field] = str(data[1])
|
|
elif field_type == "binary" and isinstance(data, bytes):
|
|
img = re.search(r"'(.*?)'", str(data))
|
|
if img:
|
|
result[field] = img.group(1)
|
|
elif data:
|
|
if field_type in ["float", "integer", "char", "text"]:
|
|
result[field] = str(data)
|
|
elif field_type == "monetary":
|
|
data = f"{float(data):,.2f}"
|
|
currency_field_name = record._fields[field].currency_field
|
|
if currency_field_name:
|
|
currency = getattr(record, currency_field_name).name
|
|
result[field] = f"{data} {currency}" if currency else str(data)
|
|
else:
|
|
result[field] = str(data)
|
|
elif field_type == "date":
|
|
date_format = None
|
|
lang = request.env["res.lang"].search([("code", "=", user.lang)], limit=1)
|
|
user_date_format = lang.date_format
|
|
if user_date_format:
|
|
date_format = user_date_format
|
|
else:
|
|
date_format = get_lang(request.env).date_format
|
|
format_to_use = date_format or DEFAULT_SERVER_DATE_FORMAT
|
|
result[field] = str(data.strftime(format_to_use))
|
|
elif field_type == "datetime":
|
|
date_format = None
|
|
time_format = None
|
|
lang = request.env["res.lang"].search([("code", "=", user.lang)], limit=1)
|
|
user_date_format = lang.date_format
|
|
user_time_format = lang.time_format
|
|
if user_date_format and user_time_format:
|
|
date_format = user_date_format
|
|
time_format = user_time_format
|
|
else:
|
|
date_format = get_lang(request.env).date_format
|
|
time_format = get_lang(request.env).time_format
|
|
if date_format and time_format:
|
|
format_to_use = f"{date_format} {time_format}"
|
|
else:
|
|
format_to_use = DEFAULT_SERVER_DATETIME_FORMAT
|
|
result[field] = str(data.strftime(format_to_use))
|
|
elif field_type == "selection":
|
|
selection = record._fields[field].selection
|
|
if isinstance(selection, list):
|
|
result[field] = str(dict(selection).get(data))
|
|
else:
|
|
result[field] = str(data)
|
|
except Exception as e:
|
|
logger.warning(e)
|
|
continue
|
|
return result
|
|
|
|
keys = convert_keys(keys)
|
|
return get_related_field(keys, model, record_id)
|
|
|
|
def get_record(self, model, record_id, user=None):
|
|
logger.info("get_record - model: %s, record: %s", model, record_id)
|
|
if not isinstance(record_id, list):
|
|
record_id = [int(record_id)]
|
|
model = request.env[model].sudo()
|
|
context = {"lang": request.env.context.get("lang", "en_US")}
|
|
if user:
|
|
model = model.with_user(user)
|
|
context["lang"] = user.lang
|
|
context["uid"] = user.id
|
|
try:
|
|
return model.with_context(**context).browse(record_id).exists() # TODO: Add .sudo()
|
|
except Exception as e:
|
|
logger.warning(e)
|
|
raise
|
|
|
|
def get_user_from_token(self, token):
|
|
if not token:
|
|
raise Exception("missing security token")
|
|
user_id = jwt_utils.decode_token(request.env, token, config_utils.get_internal_jwt_secret(request.env))["id"]
|
|
user = request.env["res.users"].sudo().browse(user_id).exists().ensure_one()
|
|
logger.info("get_user_from_token - user: %s", user.name)
|
|
return user
|
|
|
|
def get_docbuilder_error(self, error_code):
|
|
docbuilder_messages = {
|
|
-1: "Unknown error.",
|
|
-2: "Generation timeout error.",
|
|
-3: "Document generation error.",
|
|
-4: "Error while downloading the document file to be generated.",
|
|
-6: "Error while accessing the document generation result database.",
|
|
-8: "Invalid token.",
|
|
}
|
|
return docbuilder_messages.get(error_code, "Error code not recognized.")
|