# # (c) Copyright Ascensio System SIA 2024 # import base64 import codecs import io import json import logging import re import zipfile from datetime import datetime from urllib.parse import quote from odoo import http from odoo.http import request from odoo.tools import ( DEFAULT_SERVER_DATE_FORMAT, DEFAULT_SERVER_DATETIME_FORMAT, file_open, get_lang, ) from odoo.addons.eurooffice_odoo.controllers.controllers import Eurooffice_Connector, eurooffice_request from odoo.addons.eurooffice_odoo.utils import config_utils, file_utils, jwt_utils, url_utils from odoo.addons.eurooffice_odoo_templates.utils import config_utils as templates_config_utils logger = logging.getLogger(__name__) class Eurooffice_Inherited_Connector(Eurooffice_Connector): @http.route("/eurooffice/template/template_content/", auth="public") def get_template_content(self, path): try: file_content = request.env["eurooffice.odoo.demo.templates"].get_template_content(path.replace("_", "/")) return request.make_response( file_content, headers=[ ("Content-Type", "application/pdf"), ("Content-Disposition", 'inline; filename="preview.pdf"'), ], ) except Exception as e: return request.not_found(f"Error: {str(e)}") @http.route("/eurooffice/template/editor", auth="user", methods=["POST"], type="json", csrf=False) def override_render_editor(self, attachment_id, access_token=None): attachment = self.get_attachment(attachment_id) if not attachment: return request.not_found() attachment.validate_access(access_token) data = attachment.read(["id", "checksum", "public", "name", "access_token"])[0] filename = data["name"] can_read = attachment.check_access_rights("read", raise_exception=False) and file_utils.can_view(filename) hasAccess = http.request.env.user.has_group("eurooffice_odoo_templates.group_eurooffice_odoo_templates_admin") can_write = ( hasAccess and attachment.check_access_rights("write", raise_exception=False) and file_utils.can_edit(filename) ) if not can_read: raise Exception("cant read") prepare_editor_values = self.prepare_editor_values(attachment, access_token, can_write) return prepare_editor_values class EuroofficeTemplate_Connector(http.Controller): @http.route("/eurooffice/template/fill", auth="user", type="http") def main(self, template_id, record_ids): logger.info("GET /eurooffice/template/fill - template: %s, records: %s", template_id, record_ids) internal_jwt_secret = config_utils.get_internal_jwt_secret(request.env) oo_security_token = jwt_utils.encode_payload(request.env, {"id": request.env.user.id}, internal_jwt_secret) try: templates = self.fill_template(oo_security_token, record_ids, template_id) if len(templates) == 1: url = next(iter(templates.values())) filename = next(iter(templates)) filename = filename.encode("ascii", "ignore").decode("ascii") if not filename: filename = "document.pdf" response = eurooffice_request(url=quote(url, safe="/:?=&"), method="get") if response.status_code == 200: headers = [ ("Content-Type", "application/pdf"), ("X-Content-Type-Options", "nosniff"), ("Content-Length", str(len(response.content))), ("Content-Disposition", f'attachment; filename="{filename}"'), ] logger.info("GET /eurooffice/template/fill - returning single PDF: %s", filename) return request.make_response(response.content, headers) else: e = f"error while downloading the document file, status = {response.status_code}" logger.warning(e) return request.not_found() elif len(templates) > 1: logger.info("GET /eurooffice/template/fill - creating ZIP with %s files", len(templates)) stream = io.BytesIO() with zipfile.ZipFile(stream, "w", zipfile.ZIP_DEFLATED) as archive: for filename, url in templates.items(): response = eurooffice_request(url=url, method="get") if response.status_code == 200: archive.writestr(filename, response.content) else: e = f"error while downloading the document file to be generated zip, status = {response.status_code}" # noqa: E501 logger.warning(e) return request.not_found() stream.seek(0) content = stream.read() stream.flush() filename = f"eurooffice-templates-{datetime.now().strftime('%Y_%m_%d_%H_%M')}.zip" headers = [ ("Content-Type", "application/zip"), ("X-Content-Type-Options", "nosniff"), ("Content-Length", str(len(response.content))), ("Content-Disposition", f'attachment; filename="{filename}"'), ] logger.info("GET /eurooffice/template/fill - returning ZIP: %s", filename) return request.make_response(content, headers) else: logger.warning("no templates found") logger.debug(templates) return request.not_found() except Exception as e: logger.warning(e) return request.not_found() return request.not_found() def fill_template(self, oo_security_token, record_ids, template_id): logger.info("fill_template - template: %s, records: %s", template_id, record_ids) docserver_url = config_utils.get_doc_server_public_url(request.env) docserver_url = url_utils.replace_public_url_to_internal(request.env, docserver_url) docbuilder_url = f"{docserver_url}docbuilder" jwt_header = config_utils.get_jwt_header(request.env) jwt_secret = config_utils.get_jwt_secret(request.env) odoo_url = config_utils.get_base_or_odoo_url(request.env) docbuilder_headers = {"Content-Type": "application/json", "Accept": "application/json"} docbuilder_callback_url = f"{odoo_url}eurooffice/template/callback/docbuilder/fill_template?oo_security_token={oo_security_token}&record_ids={record_ids}&template_id={template_id}" # noqa: E501 docbuilder_payload = {"async": False, "url": docbuilder_callback_url} logger.info("fill_template - docserver_url: %s", docserver_url) logger.info("fill_template - jwt_enabled: %s", bool(jwt_secret)) if jwt_secret: docbuilder_payload["token"] = jwt_utils.encode_payload(request.env, docbuilder_payload, jwt_secret) docbuilder_headers[jwt_header] = "Bearer " + jwt_utils.encode_payload( request.env, {"payload": docbuilder_payload}, jwt_secret ) try: if jwt_secret: docbuilder_response = eurooffice_request( url=docbuilder_url, method="post", opts={ "json": docbuilder_payload, "headers": docbuilder_headers, }, ) else: docbuilder_response = eurooffice_request( url=docbuilder_url, method="post", opts={ "json": docbuilder_payload, }, ) docbuilder_json = docbuilder_response.json() if docbuilder_json.get("error"): e = self.get_docbuilder_error(docbuilder_json.get("error")) logger.warning("fill_template - docbuilder error: %s", e) raise Exception(e) urls = docbuilder_json.get("urls") logger.info("fill_template - success, got %s URLs", len(urls) if urls else 0) return urls except Exception as e: logger.warning("fill_template - error: %s", str(e)) raise @http.route("/eurooffice/template/callback/docbuilder/fill_template", auth="public") def docbuilder_fill_template(self, oo_security_token, record_ids, template_id): logger.info( "GET /eurooffice/template/callback/docbuilder/fill_template - template: %s, records: %s", template_id, record_ids, ) if not oo_security_token or not record_ids or not template_id: logger.warning("oo_security_token or record_ids or template_id not found") return request.not_found() user = self.get_user_from_token(oo_security_token) if not user: logger.warning("user not found") return request.not_found() template = self.get_record("eurooffice.odoo.templates", template_id, user) if not template: logger.warning("template not found: %s", template_id) return request.not_found() attachment_id = template.attachment_id.id if not attachment_id: logger.warning("attachment_id of the template was not found") return request.not_found() model = template.template_model_model if not model: logger.warning("model of the template was not found") return request.not_found() try: record_ids = [int(x) for x in record_ids.split(",")] logger.info( "GET /eurooffice/template/callback/docbuilder/fill_template - processing %s records", len(record_ids) ) url = f"{config_utils.get_base_or_odoo_url(http.request.env)}eurooffice/template/download/{attachment_id}?oo_security_token={oo_security_token}" # noqa: E501 docbuilder_content = "" docbuilder_script_content = "" with file_open("eurooffice_odoo_templates/controllers/fill_template.docbuilder", "r") as f: docbuilder_script_content = f.read() keys = self.get_keys(attachment_id, oo_security_token) logger.info( "GET /eurooffice/template/callback/docbuilder/fill_template - got %s keys", len(keys) if keys else 0 ) for record_id in record_ids: fields = self.get_fields(keys, model, record_id, user) fields = json.dumps(fields, ensure_ascii=False) docbuilder_content += f""" builder.OpenFile("{url}"); var fields = {fields}; """ docbuilder_content += docbuilder_script_content record = self.get_record(model, record_id, user) record_name = getattr(record, "display_name", getattr(record, "name", str(record_id))) template_name = getattr(template, "display_name", getattr(template, "name", "Filled Template")) filename = re.sub(r"[<>:'/\\|?*\x00-\x1f]", " ", f"{template_name} - {record_name}") editable_form_fields = templates_config_utils.get_editable_form_fields(http.request.env) if editable_form_fields: docbuilder_content += f""" builder.SaveFile("pdf", "{filename}.pdf", "{{"isPrint":true}}") builder.CloseFile(); """ # noqa: E501 else: docbuilder_content += f""" builder.SaveFile("pdf", "{filename}.pdf"); builder.CloseFile(); """ headers = { "Content-Disposition": "attachment; filename='fill_template.docbuilder'", "Content-Type": "text/plain", } logger.info("GET /eurooffice/template/callback/docbuilder/fill_template - success") return request.make_response(docbuilder_content, headers) except Exception as e: logger.warning(e) return request.not_found() def get_keys(self, attachment_id, oo_security_token): logger.info("get_keys - attachment: %s", attachment_id) docserver_url = config_utils.get_doc_server_public_url(request.env) docserver_url = url_utils.replace_public_url_to_internal(request.env, docserver_url) docbuilder_url = f"{docserver_url}docbuilder" jwt_header = config_utils.get_jwt_header(request.env) jwt_secret = config_utils.get_jwt_secret(request.env) odoo_url = config_utils.get_base_or_odoo_url(request.env) docbuilder_headers = {"Content-Type": "application/json", "Accept": "application/json"} docbuilder_callback_url = f"{odoo_url}eurooffice/template/callback/docbuilder/get_keys?attachment_id={attachment_id}&oo_security_token={oo_security_token}" # noqa: E501 docbuilder_payload = {"async": False, "url": docbuilder_callback_url} if jwt_secret: docbuilder_payload["token"] = jwt_utils.encode_payload(request.env, docbuilder_payload, jwt_secret) docbuilder_headers[jwt_header] = "Bearer " + jwt_utils.encode_payload( request.env, {"payload": docbuilder_payload}, jwt_secret ) try: if jwt_secret: docbuilder_response = eurooffice_request( url=docbuilder_url, method="post", opts={ "json": docbuilder_payload, "headers": docbuilder_headers, }, ) else: docbuilder_response = eurooffice_request( url=docbuilder_url, method="post", opts={ "json": docbuilder_payload, }, ) docbuilder_json = docbuilder_response.json() if docbuilder_json.get("error"): e = self.get_docbuilder_error(docbuilder_json.get("error")) raise Exception(e) urls = docbuilder_json.get("urls") keys_url = urls.get("keys.txt") keys_response = eurooffice_request( url=keys_url, method="get", ) response_content = codecs.decode(keys_response.content, "utf-8-sig") logger.info("get_keys - success") return json.loads(response_content) except Exception as e: logger.warning("get_keys - error: %s", str(e)) raise @http.route("/eurooffice/template/callback/docbuilder/get_keys", auth="public") def docbuilder_get_keys(self, attachment_id, oo_security_token): logger.info("GET /eurooffice/template/callback/docbuilder/get_keys - attachment: %s", attachment_id) if not attachment_id or not oo_security_token: logger.warning("attachment_id or oo_security_token not found") return request.not_found() url = f"{config_utils.get_base_or_odoo_url(http.request.env)}eurooffice/template/download/{attachment_id}?oo_security_token={oo_security_token}" # noqa: E501 docbuilder_content = f""" builder.OpenFile("{url}"); """ with file_open("eurooffice_odoo_templates/controllers/get_keys.docbuilder", "r") as f: docbuilder_content = docbuilder_content + f.read() headers = { "Content-Disposition": "attachment; filename='get_keys.docbuilder'", "Content-Type": "text/plain", } logger.info("GET /eurooffice/template/callback/docbuilder/get_keys - success") return request.make_response(docbuilder_content, headers) @http.route("/eurooffice/template/download/", auth="public") def download(self, attachment_id, oo_security_token): logger.info("GET /eurooffice/template/download - attachment: %s", attachment_id) if not attachment_id or not oo_security_token: logger.warning("attachment_id or oo_security_token not found") return request.not_found() attachment = self.get_record("ir.attachment", attachment_id, self.get_user_from_token(oo_security_token)) if attachment: content = base64.b64decode(attachment.datas) headers = { "Content-Type": "application/pdf", "Content-Disposition": "attachment; filename=template.pdf", } logger.info("GET /eurooffice/template/download - success") return request.make_response(content, headers) else: logger.warning("attachment not found: %s", attachment_id) return request.not_found() def get_fields(self, keys, model, record_id, user): # noqa: C901 logger.info("get_fields - model: %s, record: %s", model, record_id) def convert_keys(input_list): output_dict = {} for item in input_list: if " " in item: keys = item.split(" ") current_dict = output_dict for key in keys[:-1]: current_dict = current_dict.setdefault(key, {}) current_dict[keys[-1]] = None else: output_dict[item] = None def dict_to_list(input_dict): output_list = [] for key, value in input_dict.items(): if isinstance(value, dict): output_list.append({key: dict_to_list(value)}) else: output_list.append(key) return output_list return dict_to_list(output_dict) def get_related_field(keys, model, record_id): # noqa: C901 result = {} record = self.get_record(model, record_id, user) if not record: logger.warning("Record not found") return for field in keys: try: if isinstance(field, dict): related_field = list(field.keys())[0] if related_field not in record._fields: continue field_type = record._fields[related_field].type related_keys = field[related_field] if field_type in ["one2many", "many2many", "many2one"]: related_model = record._fields[related_field].comodel_name related_record_ids = record.read([related_field])[0][related_field] if not related_record_ids: continue if field_type == "many2one" and isinstance(related_record_ids, tuple): related_data = get_related_field(related_keys, related_model, related_record_ids[0]) else: related_data = [] for record_id in related_record_ids: related_data_temp = get_related_field(related_keys, related_model, record_id) if related_data_temp: related_data.append(related_data_temp) if related_data: result[related_field] = related_data else: if field not in record._fields: continue field_type = record._fields[field].type data = record.read([field])[0][field] if field_type in ["html", "json"]: continue # TODO elif field_type == "boolean": result[field] = str(data).lower() elif isinstance(data, tuple): result[field] = str(data[1]) elif field_type == "binary" and isinstance(data, bytes): img = re.search(r"'(.*?)'", str(data)) if img: result[field] = img.group(1) elif data: if field_type in ["float", "integer", "char", "text"]: result[field] = str(data) elif field_type == "monetary": data = f"{float(data):,.2f}" currency_field_name = record._fields[field].currency_field if currency_field_name: currency = getattr(record, currency_field_name).name result[field] = f"{data} {currency}" if currency else str(data) else: result[field] = str(data) elif field_type == "date": date_format = None lang = request.env["res.lang"].search([("code", "=", user.lang)], limit=1) user_date_format = lang.date_format if user_date_format: date_format = user_date_format else: date_format = get_lang(request.env).date_format format_to_use = date_format or DEFAULT_SERVER_DATE_FORMAT result[field] = str(data.strftime(format_to_use)) elif field_type == "datetime": date_format = None time_format = None lang = request.env["res.lang"].search([("code", "=", user.lang)], limit=1) user_date_format = lang.date_format user_time_format = lang.time_format if user_date_format and user_time_format: date_format = user_date_format time_format = user_time_format else: date_format = get_lang(request.env).date_format time_format = get_lang(request.env).time_format if date_format and time_format: format_to_use = f"{date_format} {time_format}" else: format_to_use = DEFAULT_SERVER_DATETIME_FORMAT result[field] = str(data.strftime(format_to_use)) elif field_type == "selection": selection = record._fields[field].selection if isinstance(selection, list): result[field] = str(dict(selection).get(data)) else: result[field] = str(data) except Exception as e: logger.warning(e) continue return result keys = convert_keys(keys) return get_related_field(keys, model, record_id) def get_record(self, model, record_id, user=None): logger.info("get_record - model: %s, record: %s", model, record_id) if not isinstance(record_id, list): record_id = [int(record_id)] model = request.env[model].sudo() context = {"lang": request.env.context.get("lang", "en_US")} if user: model = model.with_user(user) context["lang"] = user.lang context["uid"] = user.id try: return model.with_context(**context).browse(record_id).exists() # TODO: Add .sudo() except Exception as e: logger.warning(e) raise def get_user_from_token(self, token): if not token: raise Exception("missing security token") user_id = jwt_utils.decode_token(request.env, token, config_utils.get_internal_jwt_secret(request.env))["id"] user = request.env["res.users"].sudo().browse(user_id).exists().ensure_one() logger.info("get_user_from_token - user: %s", user.name) return user def get_docbuilder_error(self, error_code): docbuilder_messages = { -1: "Unknown error.", -2: "Generation timeout error.", -3: "Document generation error.", -4: "Error while downloading the document file to be generated.", -6: "Error while accessing the document generation result database.", -8: "Invalid token.", } return docbuilder_messages.get(error_code, "Error code not recognized.")