mirror of
https://github.com/bringout/euro-office.git
synced 2026-04-18 04:22:01 +02:00
init: Euro-Office Odoo 16.0 modules
Based on onlyoffice_odoo by Ascensio System SIA (ONLYOFFICE, LGPL-3). Rebranded and adapted for Euro-Office by bring.out d.o.o. Modules: - eurooffice_odoo: base integration - eurooffice_odoo_templates: document templates - eurooffice_odoo_oca_dms: OCA DMS integration (replaces Enterprise documents) All references renamed: onlyoffice -> eurooffice, ONLYOFFICE -> Euro-Office. Original copyright notices preserved.
This commit is contained in:
commit
b59a9dc6bb
347 changed files with 16699 additions and 0 deletions
|
|
@ -0,0 +1 @@
|
|||
from . import controllers
|
||||
|
|
@ -0,0 +1,645 @@
|
|||
#
|
||||
# (c) Copyright Ascensio System SIA 2024
|
||||
#
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import string
|
||||
import time
|
||||
from mimetypes import guess_type
|
||||
from urllib.request import urlopen
|
||||
|
||||
import markupsafe
|
||||
import requests
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
from odoo import _, http
|
||||
from odoo.exceptions import AccessError, UserError
|
||||
from odoo.http import request
|
||||
|
||||
from odoo.addons.eurooffice_odoo.utils import config_utils, file_utils, jwt_utils, url_utils
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
_mobile_regex = r"android|avantgo|playbook|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od|ad)|iris|kindle|lge |maemo|midp|mmp|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\\/|plucker|pocket|psp|symbian|treo|up\\.(browser|link)|vodafone|wap|windows (ce|phone)|xda|xiino" # noqa: E501
|
||||
|
||||
|
||||
def eurooffice_urlopen(url, timeout=120, context=None):
|
||||
url = url_utils.replace_public_url_to_internal(request.env, url)
|
||||
cert_verify_disabled = config_utils.get_certificate_verify_disabled(request.env)
|
||||
|
||||
if cert_verify_disabled and url.startswith("https://"):
|
||||
import ssl
|
||||
|
||||
context = context or ssl._create_unverified_context()
|
||||
|
||||
return urlopen(url, timeout=timeout, context=context)
|
||||
|
||||
|
||||
def eurooffice_request(url, method, opts=None):
|
||||
_logger.info("External request: %s %s", method.upper(), url)
|
||||
url = url_utils.replace_public_url_to_internal(request.env, url)
|
||||
cert_verify_disabled = config_utils.get_certificate_verify_disabled(request.env)
|
||||
if opts is None:
|
||||
opts = {}
|
||||
|
||||
if url.startswith("https://") and cert_verify_disabled and "verify" not in opts:
|
||||
opts["verify"] = False
|
||||
|
||||
if "timeout" not in opts and "timeout" not in url:
|
||||
opts["timeout"] = 120
|
||||
|
||||
try:
|
||||
if method.lower() == "post":
|
||||
response = requests.post(url, **opts)
|
||||
else:
|
||||
response = requests.get(url, **opts)
|
||||
|
||||
_logger.info("External request completed: %s %s - status: %s", method.upper(), url, response.status_code)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
error_details = {
|
||||
"error_type": type(e).__name__,
|
||||
"url": url,
|
||||
"method": method.upper(),
|
||||
"request_options": opts,
|
||||
"original_error": str(e),
|
||||
}
|
||||
|
||||
_logger.error("Euro-Office request failed: %s", error_details)
|
||||
raise requests.exceptions.RequestException(
|
||||
f"Euro-Office request failed to {method.upper()} {url}: {str(e)}"
|
||||
) from e
|
||||
|
||||
except Exception as e:
|
||||
error_details = {
|
||||
"error_type": type(e).__name__,
|
||||
"url": url,
|
||||
"method": method.upper(),
|
||||
"request_options": opts,
|
||||
"original_error": str(e),
|
||||
}
|
||||
|
||||
_logger.error("Unexpected error in Euro-Office request: %s", error_details)
|
||||
raise requests.exceptions.RequestException(
|
||||
f"Unexpected error in Euro-Office request to {method.upper()} {url}: {str(e)}"
|
||||
) from e
|
||||
|
||||
|
||||
class Eurooffice_Connector(http.Controller):
|
||||
@http.route("/eurooffice/editor/get_config", auth="user", methods=["POST"], type="json", csrf=False)
|
||||
def get_config(self, document_id=None, attachment_id=None, access_token=None):
|
||||
_logger.info("POST /eurooffice/editor/get_config - document: %s, attachment: %s", document_id, attachment_id)
|
||||
document = None
|
||||
if document_id:
|
||||
document = request.env["documents.document"].browse(int(document_id))
|
||||
attachment_id = document.attachment_id.id
|
||||
|
||||
attachment = self.get_attachment(attachment_id)
|
||||
if not attachment:
|
||||
_logger.warning("POST /eurooffice/editor/get_config - attachment not found: %s", attachment_id)
|
||||
return request.not_found()
|
||||
|
||||
attachment.validate_access(access_token)
|
||||
|
||||
if attachment.res_model == "documents.document" and not document:
|
||||
document = request.env["documents.document"].browse(int(attachment.res_id))
|
||||
|
||||
if document:
|
||||
self._check_document_access(document)
|
||||
|
||||
data = attachment.read(["id", "checksum", "public", "name", "access_token"])[0]
|
||||
filename = data["name"]
|
||||
|
||||
can_read = attachment.check_access_rights("read", raise_exception=False) and file_utils.can_view(filename)
|
||||
|
||||
if not can_read:
|
||||
_logger.warning("POST /eurooffice/editor/get_config - no read access: %s", attachment_id)
|
||||
raise Exception("cant read")
|
||||
|
||||
can_write = attachment.check_access_rights("write", raise_exception=False) and file_utils.can_edit(filename)
|
||||
|
||||
config = self.prepare_editor_values(attachment, access_token, can_write)
|
||||
_logger.info("POST /eurooffice/editor/get_config - success: %s", attachment_id)
|
||||
return config
|
||||
|
||||
@http.route("/eurooffice/file/content/test.txt", auth="public")
|
||||
def get_test_file(self):
|
||||
_logger.info("GET /eurooffice/file/content/test.txt")
|
||||
content = "test"
|
||||
headers = [
|
||||
("Content-Length", len(content)),
|
||||
("Content-Type", "text/plain"),
|
||||
("Content-Disposition", "attachment; filename=test.txt"),
|
||||
]
|
||||
response = request.make_response(content, headers)
|
||||
return response
|
||||
|
||||
@http.route("/eurooffice/file/content/<int:attachment_id>", auth="public")
|
||||
def get_file_content(self, attachment_id, oo_security_token=None, access_token=None):
|
||||
_logger.info("GET /eurooffice/file/content/%s", attachment_id)
|
||||
attachment = self.get_attachment(attachment_id, self.get_user_from_token(oo_security_token))
|
||||
if not attachment:
|
||||
_logger.warning("GET /eurooffice/file/content/%s - attachment not found", attachment_id)
|
||||
return request.not_found()
|
||||
|
||||
attachment.validate_access(access_token)
|
||||
attachment.check_access_rights("read")
|
||||
|
||||
if jwt_utils.is_jwt_enabled(request.env):
|
||||
token = request.httprequest.headers.get(config_utils.get_jwt_header(request.env))
|
||||
if token:
|
||||
token = token[len("Bearer ") :]
|
||||
|
||||
if not token:
|
||||
_logger.warning("GET /eurooffice/file/content/%s - JWT token missing", attachment_id)
|
||||
raise Exception("expected JWT")
|
||||
|
||||
jwt_utils.decode_token(request.env, token)
|
||||
|
||||
stream = request.env["ir.binary"]._get_stream_from(attachment, "datas", None, "name", None)
|
||||
|
||||
send_file_kwargs = {"as_attachment": True, "max_age": None}
|
||||
|
||||
_logger.info("GET /eurooffice/file/content/%s - success", attachment_id)
|
||||
return stream.get_response(**send_file_kwargs)
|
||||
|
||||
@http.route("/eurooffice/editor/<int:attachment_id>", auth="public", type="http", website=True)
|
||||
def render_editor(self, attachment_id, access_token=None):
|
||||
_logger.info("GET /eurooffice/editor/%s", attachment_id)
|
||||
attachment = self.get_attachment(attachment_id)
|
||||
if not attachment:
|
||||
_logger.warning("GET /eurooffice/editor/%s - attachment not found", attachment_id)
|
||||
return request.not_found()
|
||||
|
||||
attachment.validate_access(access_token)
|
||||
|
||||
if attachment.res_model == "documents.document":
|
||||
document = request.env["documents.document"].browse(int(attachment.res_id))
|
||||
self._check_document_access(document)
|
||||
|
||||
data = attachment.read(["id", "checksum", "public", "name", "access_token"])[0]
|
||||
filename = data["name"]
|
||||
|
||||
can_read = attachment.check_access_rights("read", raise_exception=False) and file_utils.can_view(filename)
|
||||
can_write = attachment.check_access_rights("write", raise_exception=False) and file_utils.can_edit(filename)
|
||||
|
||||
if not can_read:
|
||||
_logger.warning("GET /eurooffice/editor/%s - no read access", attachment_id)
|
||||
raise Exception("cant read")
|
||||
|
||||
_logger.info("GET /eurooffice/editor/%s - success", attachment_id)
|
||||
return request.render(
|
||||
"eurooffice_odoo.eurooffice_editor", self.prepare_editor_values(attachment, access_token, can_write)
|
||||
)
|
||||
|
||||
@http.route(
|
||||
"/eurooffice/editor/callback/<int:attachment_id>", auth="public", methods=["POST"], type="http", csrf=False
|
||||
)
|
||||
def editor_callback(self, attachment_id, oo_security_token=None, access_token=None):
|
||||
_logger.info("POST /eurooffice/editor/callback/%s", attachment_id)
|
||||
response_json = {"error": 0}
|
||||
|
||||
try:
|
||||
body = request.get_json_data()
|
||||
user = self.get_user_from_token(oo_security_token)
|
||||
attachment = self.get_attachment(attachment_id, user)
|
||||
if not attachment:
|
||||
_logger.warning("POST /eurooffice/editor/callback/%s - attachment not found", attachment_id)
|
||||
raise Exception("attachment not found")
|
||||
|
||||
attachment.validate_access(access_token)
|
||||
attachment.check_access_rights("write")
|
||||
|
||||
if jwt_utils.is_jwt_enabled(request.env):
|
||||
token = body.get("token")
|
||||
|
||||
if not token:
|
||||
token = request.httprequest.headers.get(config_utils.get_jwt_header(request.env))
|
||||
if token:
|
||||
token = token[len("Bearer ") :]
|
||||
|
||||
if not token:
|
||||
_logger.warning("POST /eurooffice/editor/callback/%s - JWT token missing", attachment_id)
|
||||
raise Exception("expected JWT")
|
||||
|
||||
body = jwt_utils.decode_token(request.env, token)
|
||||
if body.get("payload"):
|
||||
body = body["payload"]
|
||||
|
||||
status = body["status"]
|
||||
_logger.info("POST /eurooffice/editor/callback/%s - status: %s", attachment_id, status)
|
||||
|
||||
if (status == 2) | (status == 3): # mustsave, corrupted
|
||||
file_url = url_utils.replace_public_url_to_internal(request.env, body.get("url"))
|
||||
datas = eurooffice_urlopen(file_url).read()
|
||||
if attachment.res_model == "documents.document":
|
||||
datas = base64.encodebytes(datas)
|
||||
document = request.env["documents.document"].browse(int(attachment.res_id))
|
||||
document.with_user(user).write(
|
||||
{
|
||||
"name": attachment.name,
|
||||
"datas": datas,
|
||||
"mimetype": guess_type(file_url)[0],
|
||||
}
|
||||
)
|
||||
|
||||
attachment_version = attachment.oo_attachment_version
|
||||
attachment.write({"oo_attachment_version": attachment_version + 1})
|
||||
previous_attachments = (
|
||||
request.env["ir.attachment"]
|
||||
.sudo()
|
||||
.search(
|
||||
[
|
||||
("res_model", "=", "documents.document"),
|
||||
("res_id", "=", document.id),
|
||||
("oo_attachment_version", "=", attachment_version),
|
||||
],
|
||||
limit=1,
|
||||
)
|
||||
)
|
||||
name = attachment.name
|
||||
filename, ext = os.path.splitext(attachment.name)
|
||||
name = f"{filename} ({attachment_version}){ext}"
|
||||
previous_attachments.sudo().write({"name": name})
|
||||
else:
|
||||
attachment.write({"raw": datas, "mimetype": guess_type(file_url)[0]})
|
||||
|
||||
_logger.info("POST /eurooffice/editor/callback/%s - file saved successfully", attachment_id)
|
||||
|
||||
except Exception as ex:
|
||||
_logger.error("POST /eurooffice/editor/callback/%s - error: %s", attachment_id, str(ex))
|
||||
response_json["error"] = 1
|
||||
response_json["message"] = http.serialize_exception(ex)
|
||||
|
||||
return request.make_response(
|
||||
data=json.dumps(response_json),
|
||||
status=500 if response_json["error"] == 1 else 200,
|
||||
headers=[("Content-Type", "application/json")],
|
||||
)
|
||||
|
||||
def prepare_editor_values(self, attachment, access_token, can_write):
|
||||
_logger.info("prepare_editor_values - attachment: %s", attachment.id)
|
||||
data = attachment.read(["id", "checksum", "public", "name", "access_token"])[0]
|
||||
key = str(data["id"]) + str(data["checksum"])
|
||||
docserver_url = config_utils.get_doc_server_public_url(request.env)
|
||||
odoo_url = config_utils.get_base_or_odoo_url(request.env)
|
||||
|
||||
filename = self.filter_xss(data["name"])
|
||||
|
||||
security_token = jwt_utils.encode_payload(
|
||||
request.env, {"id": request.env.user.id}, config_utils.get_internal_jwt_secret(request.env)
|
||||
)
|
||||
security_token = security_token.decode("utf-8") if isinstance(security_token, bytes) else security_token
|
||||
access_token = access_token.decode("utf-8") if isinstance(access_token, bytes) else access_token
|
||||
path_part = (
|
||||
str(data["id"])
|
||||
+ "?oo_security_token="
|
||||
+ security_token
|
||||
+ ("&access_token=" + access_token if access_token else "")
|
||||
+ "&shardkey="
|
||||
+ key
|
||||
)
|
||||
|
||||
document_type = file_utils.get_file_type(filename)
|
||||
|
||||
is_mobile = bool(re.search(_mobile_regex, request.httprequest.headers.get("User-Agent"), re.IGNORECASE))
|
||||
|
||||
root_config = {
|
||||
"width": "100%",
|
||||
"height": "100%",
|
||||
"type": "mobile" if is_mobile else "desktop",
|
||||
"documentType": document_type,
|
||||
"document": {
|
||||
"title": filename,
|
||||
"url": odoo_url + "eurooffice/file/content/" + path_part,
|
||||
"fileType": file_utils.get_file_ext(filename),
|
||||
"key": key,
|
||||
"permissions": {},
|
||||
},
|
||||
"editorConfig": {
|
||||
"lang": request.env.user.lang,
|
||||
"user": {"id": str(request.env.user.id), "name": request.env.user.name},
|
||||
"customization": {},
|
||||
},
|
||||
}
|
||||
|
||||
if can_write:
|
||||
root_config["editorConfig"]["callbackUrl"] = odoo_url + "eurooffice/editor/callback/" + path_part
|
||||
|
||||
if attachment.res_model != "documents.document":
|
||||
root_config["editorConfig"]["mode"] = "edit" if can_write else "view"
|
||||
root_config["document"]["permissions"]["edit"] = can_write
|
||||
elif attachment.res_model == "documents.document":
|
||||
root_config = self.get_documents_permissions(attachment, can_write, root_config)
|
||||
|
||||
if jwt_utils.is_jwt_enabled(request.env):
|
||||
root_config["token"] = jwt_utils.encode_payload(request.env, root_config)
|
||||
|
||||
_logger.info("prepare_editor_values - success: %s", attachment.id)
|
||||
return {
|
||||
"docTitle": filename,
|
||||
"docIcon": f"/eurooffice_odoo/static/description/editor_icons/{document_type}.ico",
|
||||
"docApiJS": docserver_url + "web-apps/apps/api/documents/api.js",
|
||||
"editorConfig": markupsafe.Markup(json.dumps(root_config)),
|
||||
}
|
||||
|
||||
def get_documents_permissions(self, attachment, can_write, root_config): # noqa: C901
|
||||
_logger.info("get_documents_permissions - attachment: %s", attachment.id)
|
||||
role = None
|
||||
document = request.env["documents.document"].browse(int(attachment.res_id))
|
||||
|
||||
if document.attachment_id.id != attachment.id: # history files
|
||||
root_config["editorConfig"]["mode"] = "view"
|
||||
root_config["document"]["permissions"]["edit"] = False
|
||||
return root_config
|
||||
|
||||
if document.owner_id.id == request.env.user.id:
|
||||
if can_write:
|
||||
role = "editor"
|
||||
else:
|
||||
role = "viewer"
|
||||
else:
|
||||
access_user = request.env["eurooffice.odoo.documents.access.user"].search(
|
||||
[("document_id", "=", document.id), ("user_id", "=", request.env.user.id)], limit=1
|
||||
)
|
||||
if access_user:
|
||||
if access_user.role == "none":
|
||||
raise AccessError(_("User has no read access rights to open this document"))
|
||||
elif access_user.role == "editor" and can_write:
|
||||
role = "editor"
|
||||
else:
|
||||
role = access_user.role
|
||||
if not role:
|
||||
access = request.env["eurooffice.odoo.documents.access"].search(
|
||||
[("document_id", "=", document.id)], limit=1
|
||||
)
|
||||
if access:
|
||||
if access.internal_users == "none":
|
||||
raise AccessError(_("User has no read access rights to open this document"))
|
||||
elif access.internal_users == "editor" and can_write:
|
||||
role = "editor"
|
||||
else:
|
||||
role = access.internal_users
|
||||
else:
|
||||
role = "viewer" # default role for internal users
|
||||
|
||||
if not role:
|
||||
raise AccessError(_("User has no read access rights to open this document"))
|
||||
elif role == "viewer":
|
||||
root_config["editorConfig"]["mode"] = "view"
|
||||
root_config["document"]["permissions"]["edit"] = False
|
||||
elif role == "commenter":
|
||||
root_config["editorConfig"]["mode"] = "edit"
|
||||
root_config["document"]["permissions"]["edit"] = False
|
||||
root_config["document"]["permissions"]["comment"] = True
|
||||
elif role == "reviewer":
|
||||
root_config["editorConfig"]["mode"] = "edit"
|
||||
root_config["document"]["permissions"]["edit"] = False
|
||||
root_config["document"]["permissions"]["review"] = True
|
||||
elif role == "editor":
|
||||
root_config["editorConfig"]["mode"] = "edit"
|
||||
root_config["document"]["permissions"]["edit"] = True
|
||||
elif role == "form_filling":
|
||||
root_config["editorConfig"]["mode"] = "edit"
|
||||
root_config["document"]["permissions"]["edit"] = False
|
||||
root_config["document"]["permissions"]["fillForms"] = True
|
||||
elif role == "custom_filter":
|
||||
root_config["editorConfig"]["mode"] = "edit"
|
||||
root_config["document"]["permissions"]["edit"] = True
|
||||
root_config["document"]["permissions"]["modifyFilter"] = False
|
||||
|
||||
_logger.info("get_documents_permissions - role: %s", role)
|
||||
return root_config
|
||||
|
||||
def get_attachment(self, attachment_id, user=None):
|
||||
IrAttachment = request.env["ir.attachment"]
|
||||
if user:
|
||||
IrAttachment = IrAttachment.with_user(user)
|
||||
try:
|
||||
attachment = IrAttachment.browse([attachment_id]).exists().ensure_one()
|
||||
_logger.debug("get_attachment - found: %s", attachment_id)
|
||||
return attachment
|
||||
except Exception:
|
||||
_logger.debug("get_attachment - not found: %s", attachment_id)
|
||||
return None
|
||||
|
||||
def get_user_from_token(self, token):
|
||||
_logger.info("get_user_from_token")
|
||||
if not token:
|
||||
raise Exception("missing security token")
|
||||
|
||||
user_id = jwt_utils.decode_token(request.env, token, config_utils.get_internal_jwt_secret(request.env))["id"]
|
||||
user = request.env["res.users"].sudo().browse(user_id).exists().ensure_one()
|
||||
_logger.info("get_user_from_token - user: %s", user.name)
|
||||
return user
|
||||
|
||||
def filter_xss(self, text):
|
||||
allowed_symbols = set(string.digits + " _-,.:@+")
|
||||
text = "".join(char for char in text if char.isalpha() or char in allowed_symbols)
|
||||
return text
|
||||
|
||||
def _check_document_access(self, document):
|
||||
if document.is_locked and document.lock_uid.id != request.env.user.id:
|
||||
_logger.error("Document is locked by another user")
|
||||
raise Forbidden()
|
||||
try:
|
||||
document.check_access_rule("read")
|
||||
except AccessError as e:
|
||||
_logger.error("User has no read access rights to open this document")
|
||||
raise Forbidden() from e
|
||||
|
||||
@http.route("/eurooffice/preview", type="http", auth="user")
|
||||
def preview(self, url, title):
|
||||
_logger.info("GET /eurooffice/preview - url: %s, title: %s", url, title)
|
||||
docserver_url = config_utils.get_doc_server_public_url(request.env)
|
||||
odoo_url = config_utils.get_base_or_odoo_url(request.env)
|
||||
|
||||
if url and url.startswith("/eurooffice/file/content/"):
|
||||
internal_jwt_secret = config_utils.get_internal_jwt_secret(request.env)
|
||||
user_id = request.env.user.id
|
||||
security_token = jwt_utils.encode_payload(request.env, {"id": user_id}, internal_jwt_secret)
|
||||
security_token = security_token.decode("utf-8") if isinstance(security_token, bytes) else security_token
|
||||
url = url + "?oo_security_token=" + security_token
|
||||
|
||||
if url and not url.startswith(("http://", "https://")):
|
||||
url = odoo_url.rstrip("/") + "/" + url.lstrip("/")
|
||||
|
||||
document_type = file_utils.get_file_type(title)
|
||||
key = str(int(time.time() * 1000))
|
||||
|
||||
root_config = {
|
||||
"width": "100%",
|
||||
"height": "100%",
|
||||
"type": "embedded",
|
||||
"documentType": document_type,
|
||||
"document": {
|
||||
"title": self.filter_xss(title),
|
||||
"url": url,
|
||||
"fileType": file_utils.get_file_ext(title),
|
||||
"key": key,
|
||||
"permissions": {"edit": False},
|
||||
},
|
||||
"editorConfig": {
|
||||
"mode": "view",
|
||||
"lang": request.env.user.lang,
|
||||
"user": {"id": str(request.env.user.id), "name": request.env.user.name},
|
||||
"customization": {},
|
||||
},
|
||||
}
|
||||
|
||||
if jwt_utils.is_jwt_enabled(request.env):
|
||||
root_config["token"] = jwt_utils.encode_payload(request.env, root_config)
|
||||
|
||||
_logger.info("GET /eurooffice/preview - success")
|
||||
return request.render(
|
||||
"eurooffice_odoo.eurooffice_editor",
|
||||
{
|
||||
"docTitle": title,
|
||||
"docIcon": f"/eurooffice_odoo/static/description/editor_icons/{document_type}.ico",
|
||||
"docApiJS": docserver_url + "web-apps/apps/api/documents/api.js",
|
||||
"editorConfig": markupsafe.Markup(json.dumps(root_config)),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class EuroOfficeOFormsDocumentsController(http.Controller):
|
||||
CMSOFORMS_URL = "https://cmsoforms.eurooffice.com/api"
|
||||
OFORMS_URL = "https://oforms.eurooffice.com/dashboard/api"
|
||||
TIMEOUT = 20 # seconds
|
||||
|
||||
def _make_api_request(self, url, endpoint, params=None, method="GET", data=None, files=None):
|
||||
url = f"{url}/{endpoint}"
|
||||
try:
|
||||
if method == "GET":
|
||||
response = requests.get(url, params=params, timeout=self.TIMEOUT)
|
||||
elif method == "POST":
|
||||
response = requests.post(url, data=data, files=files, timeout=self.TIMEOUT)
|
||||
else:
|
||||
raise ValueError(f"Unsupported HTTP method: {method}")
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.exceptions.RequestException as e:
|
||||
_logger.error(f"API request failed to {url}: {str(e)}")
|
||||
raise UserError(f"Failed to connect to Forms API: {str(e)}") from e
|
||||
|
||||
@http.route("/eurooffice/oforms/locales", type="json", auth="user")
|
||||
def get_oform_locales(self):
|
||||
url = self.OFORMS_URL
|
||||
endpoint = "i18n/locales"
|
||||
response = self._make_api_request(url, endpoint)
|
||||
locales = response if isinstance(response, list) else []
|
||||
return {
|
||||
"data": [
|
||||
{
|
||||
"code": locale.get("code"),
|
||||
"name": locale.get("name", locale.get("code")),
|
||||
}
|
||||
for locale in locales
|
||||
]
|
||||
}
|
||||
|
||||
@http.route("/eurooffice/oforms/category-types", type="json", auth="user")
|
||||
def get_category_types(self, locale="en"):
|
||||
url = self.OFORMS_URL
|
||||
endpoint = "menu-translations"
|
||||
params = {"populate": "*", "locale": locale}
|
||||
response = self._make_api_request(url, endpoint, params=params)
|
||||
|
||||
categories = []
|
||||
for item in response.get("data", []):
|
||||
attrs = item.get("attributes", {})
|
||||
localized_name = next(
|
||||
(
|
||||
loc["attributes"]["name"]
|
||||
for loc in attrs.get("localizations", {}).get("data", [])
|
||||
if loc["attributes"]["locale"] == locale
|
||||
),
|
||||
None,
|
||||
) or attrs.get("name", "")
|
||||
|
||||
categories.append(
|
||||
{
|
||||
"id": item["id"],
|
||||
"categoryId": attrs.get("categoryId"),
|
||||
"name": localized_name,
|
||||
"type": attrs.get("categoryTitle"),
|
||||
}
|
||||
)
|
||||
|
||||
return {"data": categories}
|
||||
|
||||
@http.route("/eurooffice/oforms/subcategories", type="json", auth="user")
|
||||
def get_subcategories(self, category_type, locale="en"):
|
||||
url = self.OFORMS_URL
|
||||
endpoint_map = {"categorie": "categories", "type": "types", "compilation": "compilations"}
|
||||
|
||||
if category_type not in endpoint_map:
|
||||
return {"data": []}
|
||||
|
||||
endpoint = f"{endpoint_map[category_type]}"
|
||||
params = {"populate": "*", "locale": locale}
|
||||
response = self._make_api_request(url, endpoint, params=params)
|
||||
|
||||
subcategories = []
|
||||
for item in response.get("data", []):
|
||||
attrs = item.get("attributes", {})
|
||||
localized_name = next(
|
||||
(
|
||||
loc["attributes"][category_type]
|
||||
for loc in attrs.get("localizations", {}).get("data", [])
|
||||
if loc["attributes"]["locale"] == locale
|
||||
),
|
||||
None,
|
||||
) or attrs.get(category_type, "")
|
||||
|
||||
subcategories.append(
|
||||
{
|
||||
"id": item["id"],
|
||||
"name": localized_name,
|
||||
"category_type": endpoint_map[category_type],
|
||||
}
|
||||
)
|
||||
|
||||
return {"data": subcategories}
|
||||
|
||||
@http.route("/eurooffice/oforms", type="json", auth="user")
|
||||
def get_oforms(self, params=None, **kwargs):
|
||||
url = self.CMSOFORMS_URL
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
api_params = {
|
||||
"fields[0]": "name_form",
|
||||
"fields[1]": "updatedAt",
|
||||
"fields[2]": "description_card",
|
||||
"fields[3]": "template_desc",
|
||||
"filters[form_exts][ext][$eq]": params.get("type", "pdf"),
|
||||
"locale": params.get("locale", "en"),
|
||||
"pagination[page]": params.get("pagination[page]", 1),
|
||||
"pagination[pageSize]": params.get("pagination[pageSize]", 12),
|
||||
"populate[card_prewiew][fields][0]": "url",
|
||||
"populate[template_image][fields][0]": "formats",
|
||||
"populate[file_oform][fields][0]": "url",
|
||||
"populate[file_oform][fields][1]": "name",
|
||||
"populate[file_oform][fields][2]": "ext",
|
||||
"populate[file_oform][filters][url][$endsWith]": "." + params.get("type", "pdf"),
|
||||
}
|
||||
|
||||
if "filters[name_form][$containsi]" in params:
|
||||
api_params["filters[name_form][$containsi]"] = params["filters[name_form][$containsi]"]
|
||||
|
||||
if "filters[categories][$eq]" in params:
|
||||
api_params["filters[categories][id][$eq]"] = params["filters[categories][$eq]"]
|
||||
elif "filters[types][$eq]" in params:
|
||||
api_params["filters[types][id][$eq]"] = params["filters[types][$eq]"]
|
||||
elif "filters[compilations][$eq]" in params:
|
||||
api_params["filters[compilations][id][$eq]"] = params["filters[compilations][$eq]"]
|
||||
|
||||
response = self._make_api_request(url, "oforms", params=api_params)
|
||||
return response
|
||||
Loading…
Add table
Add a link
Reference in a new issue