oca-technical/odoo-bringout-oca-automation-automation_oca/automation_oca/models/automation_configuration_step.py
2025-08-29 15:43:03 +02:00

566 lines
21 KiB
Python

# Copyright 2024 Dixmit
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
import json
from collections import defaultdict
import babel.dates
from dateutil.relativedelta import relativedelta
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from odoo.osv import expression
from odoo.tools import get_lang
from odoo.tools.safe_eval import safe_eval
class AutomationConfigurationStep(models.Model):
_name = "automation.configuration.step"
_description = "Automation Steps"
_order = "trigger_interval_hours ASC"
name = fields.Char(required=True)
configuration_id = fields.Many2one(
"automation.configuration", required=True, auto_join=True
)
domain = fields.Char(
required=True, default="[]", help="Filter to apply specifically"
)
apply_parent_domain = fields.Boolean(default=True)
applied_domain = fields.Char(
compute="_compute_applied_domain",
recursive=True,
)
parent_id = fields.Many2one("automation.configuration.step", ondelete="cascade")
model_id = fields.Many2one(related="configuration_id.model_id")
model = fields.Char(related="model_id.model")
child_ids = fields.One2many(
"automation.configuration.step", inverse_name="parent_id"
)
step_type = fields.Selection(
[("mail", "Mail"), ("action", "Server Action"), ("activity", "Activity")],
required=True,
default="mail",
)
step_icon = fields.Char(compute="_compute_step_info")
step_name = fields.Char(compute="_compute_step_info")
trigger_date_kind = fields.Selection(
[
("offset", "Offset"),
("date", "Force on Record Date"),
],
required=True,
default="offset",
)
trigger_interval_hours = fields.Integer(
compute="_compute_trigger_interval_hours", store=True
)
trigger_interval = fields.Integer(
help="""Set a negative time trigger if you want the step to be executed
immediately, in parallel with the previous step, without waiting for it to
finish."""
)
trigger_interval_type = fields.Selection(
[("hours", "Hour(s)"), ("days", "Day(s)")], required=True, default="hours"
)
trigger_date_field_id = fields.Many2one(
"ir.model.fields",
domain="[('model_id', '=', model_id), ('ttype', 'in', ['date', 'datetime'])]",
)
trigger_date_field = fields.Char(related="trigger_date_field_id.field_description")
allow_expiry = fields.Boolean(compute="_compute_allow_expiry")
expiry = fields.Boolean(compute="_compute_expiry", store=True, readonly=False)
expiry_interval = fields.Integer()
expiry_interval_type = fields.Selection(
[("hours", "Hour(s)"), ("days", "Day(s)")], required=True, default="hours"
)
trigger_type = fields.Selection(
selection="_trigger_type_selection",
required=True,
default="start",
)
trigger_child_types = fields.Json(compute="_compute_trigger_child_types")
trigger_type_data = fields.Json(compute="_compute_trigger_type_data")
mail_author_id = fields.Many2one(
"res.partner", required=True, default=lambda r: r.env.user.id
)
mail_template_id = fields.Many2one(
"mail.template", domain="[('model_id', '=', model_id)]"
)
server_action_id = fields.Many2one(
"ir.actions.server", domain="[('model_id', '=', model_id)]"
)
server_context = fields.Text(default="{}")
activity_type_id = fields.Many2one(
"mail.activity.type",
string="Activity",
domain="['|', ('res_model', '=', False), ('res_model', '=', model)]",
compute="_compute_activity_info",
readonly=False,
store=True,
ondelete="restrict",
)
activity_summary = fields.Char(
"Summary", compute="_compute_activity_info", readonly=False, store=True
)
activity_note = fields.Html(
"Note", compute="_compute_activity_info", readonly=False, store=True
)
activity_date_deadline_range = fields.Integer(
string="Due Date In",
compute="_compute_activity_info",
readonly=False,
store=True,
)
activity_date_deadline_range_type = fields.Selection(
[("days", "Day(s)"), ("weeks", "Week(s)"), ("months", "Month(s)")],
string="Due type",
default="days",
compute="_compute_activity_info",
readonly=False,
store=True,
)
activity_user_type = fields.Selection(
[("specific", "Specific User"), ("generic", "Generic User From Record")],
compute="_compute_activity_info",
readonly=False,
store=True,
help="""Use 'Specific User' to always assign the same user on the next activity.
Use 'Generic User From Record' to specify the field name of the user
to choose on the record.""",
)
activity_user_id = fields.Many2one(
"res.users",
string="Responsible",
compute="_compute_activity_info",
readonly=False,
store=True,
)
activity_user_field_id = fields.Many2one(
"ir.model.fields",
"User field name",
compute="_compute_activity_info",
readonly=False,
store=True,
)
parent_position = fields.Integer(
compute="_compute_parent_position", recursive=True, store=True
)
graph_data = fields.Json(compute="_compute_graph_data")
graph_done = fields.Integer(compute="_compute_total_graph_data")
graph_error = fields.Integer(compute="_compute_total_graph_data")
@api.constrains("server_context")
def _check_server_context(self):
for record in self:
if record.server_context:
try:
json.loads(record.server_context)
except Exception as e:
raise ValidationError(_("Server Context is not wellformed")) from e
@api.onchange("trigger_type")
def _onchange_trigger_type(self):
if self.trigger_type == "start":
# Theoretically, only start allows no parent, so we will keep it this way
self.parent_id = False
########################################
# Graph computed fields ################
########################################
@api.depends()
def _compute_graph_data(self):
total = self.env["automation.record.step"].read_group(
[
("configuration_step_id", "in", self.ids),
(
"processed_on",
">=",
fields.Date.context_today(self) + relativedelta(days=-14),
),
("is_test", "=", False),
],
["configuration_step_id"],
["configuration_step_id", "processed_on:day"],
lazy=False,
)
done = self.env["automation.record.step"].read_group(
[
("configuration_step_id", "in", self.ids),
(
"processed_on",
">=",
fields.Date.context_today(self) + relativedelta(days=-14),
),
("state", "=", "done"),
("is_test", "=", False),
],
["configuration_step_id"],
["configuration_step_id", "processed_on:day"],
lazy=False,
)
now = fields.Datetime.now()
date_map = {
babel.dates.format_datetime(
now + relativedelta(days=i - 14),
format="dd MMM yyy",
tzinfo=self._context.get("tz", None),
locale=get_lang(self.env).code,
): 0
for i in range(0, 15)
}
result = defaultdict(
lambda: {"done": date_map.copy(), "error": date_map.copy()}
)
for line in total:
result[line["configuration_step_id"][0]]["error"][
line["processed_on:day"]
] += line["__count"]
for line in done:
result[line["configuration_step_id"][0]]["done"][
line["processed_on:day"]
] += line["__count"]
result[line["configuration_step_id"][0]]["error"][
line["processed_on:day"]
] -= line["__count"]
for record in self:
graph_info = dict(result[record.id])
record.graph_data = {
"error": [
{"x": key[:-5], "y": value, "name": key}
for (key, value) in graph_info["error"].items()
],
"done": [
{"x": key[:-5], "y": value, "name": key}
for (key, value) in graph_info["done"].items()
],
}
@api.depends()
def _compute_total_graph_data(self):
for record in self:
record.graph_done = self.env["automation.record.step"].search_count(
[
("configuration_step_id", "=", record.id),
("state", "=", "done"),
("is_test", "=", False),
]
)
record.graph_error = self.env["automation.record.step"].search_count(
[
("configuration_step_id", "=", record.id),
("state", "in", ["expired", "rejected", "error", "cancel"]),
("is_test", "=", False),
]
)
@api.depends("step_type")
def _compute_activity_info(self):
for to_reset in self.filtered(lambda act: act.step_type != "activity"):
to_reset.activity_summary = False
to_reset.activity_note = False
to_reset.activity_date_deadline_range = False
to_reset.activity_date_deadline_range_type = False
to_reset.activity_user_type = False
to_reset.activity_user_id = False
to_reset.activity_user_field_id = False
for activity in self.filtered(lambda act: act.step_type == "activity"):
if not activity.activity_date_deadline_range_type:
activity.activity_date_deadline_range_type = "days"
if not activity.activity_user_id:
activity.activity_user_id = self.env.user.id
@api.depends("trigger_interval", "trigger_interval_type")
def _compute_trigger_interval_hours(self):
for record in self:
record.trigger_interval_hours = record._get_trigger_interval_hours()
def _get_trigger_interval_hours(self):
if self.trigger_interval_type == "days":
return self.trigger_interval * 24
return self.trigger_interval
@api.depends("parent_id", "parent_id.parent_position", "trigger_type")
def _compute_parent_position(self):
for record in self:
record.parent_position = (
(record.parent_id.parent_position + 1) if record.parent_id else 0
)
@api.depends(
"domain",
"configuration_id.domain",
"parent_id",
"parent_id.applied_domain",
"apply_parent_domain",
)
def _compute_applied_domain(self):
for record in self:
eval_context = record.configuration_id._get_eval_context()
if record.apply_parent_domain:
record.applied_domain = expression.AND(
[
safe_eval(record.domain, eval_context),
safe_eval(
(record.parent_id and record.parent_id.applied_domain)
or record.configuration_id.domain,
eval_context,
),
]
)
else:
record.applied_domain = safe_eval(record.domain, eval_context)
@api.model
def _trigger_type_selection(self):
return [
(trigger_id, trigger.get("name", trigger_id))
for trigger_id, trigger in self._trigger_types().items()
]
@api.model
def _trigger_types(self):
"""
This function will return a dictionary that map trigger_types to its configurations.
Each trigger_type can contain:
- name (Required field)
- step type: List of step types that succeed after this.
If it is false, it will work for all step types,
otherwise only for the ones on the list
- color: Color of the icon
- icon: Icon to show
- message_configuration: Message to show on the step configuration
- allow_expiry: True if it allows expiration of activity
- message: Message to show on the record if expected is not date defined
"""
return {
"start": {
"name": _("start of workflow"),
"step_type": [],
"message_configuration": False,
"message": False,
"allow_parent": True,
},
"after_step": {
"name": _("execution of another step"),
"color": "text-success",
"icon": "fa fa-code-fork fa-rotate-180 fa-flip-vertical",
"message_configuration": False,
"message": False,
},
"mail_open": {
"name": _("Mail opened"),
"allow_expiry": True,
"step_type": ["mail"],
"color": "text-success",
"icon": "fa fa-envelope-open-o",
"message_configuration": _("Opened after"),
"message": _("Not opened yet"),
},
"mail_not_open": {
"name": _("Mail not opened"),
"step_type": ["mail"],
"color": "text-danger",
"icon": "fa fa-envelope-open-o",
"message_configuration": _("Not opened within"),
"message": False,
},
"mail_reply": {
"name": _("Mail replied"),
"allow_expiry": True,
"step_type": ["mail"],
"color": "text-success",
"icon": "fa fa-reply",
"message_configuration": _("Replied after"),
"message": _("Not replied yet"),
},
"mail_not_reply": {
"name": _("Mail not replied"),
"step_type": ["mail"],
"color": "text-danger",
"icon": "fa fa-reply",
"message_configuration": _("Not replied within"),
"message": False,
},
"mail_click": {
"name": _("Mail clicked"),
"allow_expiry": True,
"step_type": ["mail"],
"color": "text-success",
"icon": "fa fa-hand-pointer-o",
"message_configuration": _("Clicked after"),
"message": _("Not clicked yet"),
},
"mail_not_clicked": {
"name": _("Mail not clicked"),
"step_type": ["mail"],
"color": "text-danger",
"icon": "fa fa-hand-pointer-o",
"message_configuration": _("Not clicked within"),
"message": False,
},
"mail_bounce": {
"name": _("Mail bounced"),
"allow_expiry": True,
"step_type": ["mail"],
"color": "text-danger",
"icon": "fa fa-exclamation-circle",
"message_configuration": _("Bounced after"),
"message": _("Not bounced yet"),
},
"activity_done": {
"name": _("Activity has been finished"),
"step_type": ["activity"],
"color": "text-success",
"icon": "fa fa-clock-o",
"message_configuration": _("After finished"),
"message": _("Activity not done"),
},
"activity_cancel": {
"name": _("Activity has been cancelled"),
"step_type": ["activity"],
"color": "text-warning",
"icon": "fa fa-ban",
"message_configuration": _("After finished"),
"message": _("Activity not cancelled"),
},
"activity_not_done": {
"name": _("Activity has not been finished"),
"allow_expiry": True,
"step_type": ["activity"],
"color": "text-danger",
"icon": "fa fa-clock-o",
"message_configuration": _("Not finished within"),
"message": False,
},
}
@api.model
def _step_icons(self):
"""
This function will return a dictionary that maps step types and icons
"""
return {
"mail": "fa fa-envelope",
"activity": "fa fa-clock-o",
"action": "fa fa-cogs",
}
@api.depends("step_type")
def _compute_step_info(self):
step_icons = self._step_icons()
step_name_map = dict(self._fields["step_type"].selection)
for record in self:
record.step_icon = step_icons.get(record.step_type, "")
record.step_name = step_name_map.get(record.step_type, "")
@api.depends("trigger_type")
def _compute_trigger_type_data(self):
trigger_types = self._trigger_types()
for record in self:
record.trigger_type_data = trigger_types[record.trigger_type]
@api.depends("trigger_type")
def _compute_allow_expiry(self):
trigger_types = self._trigger_types()
for record in self:
record.allow_expiry = trigger_types[record.trigger_type].get(
"allow_expiry", False
)
@api.depends("trigger_type")
def _compute_expiry(self):
trigger_types = self._trigger_types()
for record in self:
record.expiry = (
trigger_types[record.trigger_type].get("allow_expiry", False)
and record.expiry
)
@api.depends("step_type")
def _compute_trigger_child_types(self):
trigger_types = self._trigger_types()
for record in self:
trigger_child_types = {}
for trigger_type_id, trigger_type in trigger_types.items():
if "step_type" not in trigger_type:
# All are allowed
trigger_child_types[trigger_type_id] = trigger_type
elif record.step_type in trigger_type["step_type"]:
trigger_child_types[trigger_type_id] = trigger_type
record.trigger_child_types = trigger_child_types
def _check_configuration(self):
trigger_conf = self._trigger_types()[self.trigger_type]
if not self.parent_id and not trigger_conf.get("allow_parent"):
raise ValidationError(
_("%s configurations needs a parent") % trigger_conf["name"]
)
if (
self.parent_id
and "step_type" in trigger_conf
and self.parent_id.step_type not in trigger_conf["step_type"]
):
step_types = dict(self._fields["step_type"].selection)
raise ValidationError(
_("To use a %(name)s trigger type we need a parent of type %(parents)s")
% {
"name": trigger_conf["name"],
"parents": ",".join(
[
name
for step_type, name in step_types.items()
if step_type in trigger_conf["step_type"]
]
),
}
)
@api.constrains("parent_id", "parent_id.step_type", "trigger_type")
def _check_parent_configuration(self):
for record in self:
record._check_configuration()
def _get_record_activity_scheduled_date(self, record, force=False):
if not force and self.trigger_type in [
"mail_open",
"mail_bounce",
"mail_click",
"mail_not_clicked",
"mail_reply",
"mail_not_reply",
"activity_done",
"activity_cancel",
]:
return False
if (
self.trigger_date_kind == "date"
and self.trigger_date_field_id
and record[self.trigger_date_field_id.name]
):
date = fields.Datetime.to_datetime(record[self.trigger_date_field_id.name])
else:
date = fields.Datetime.now()
return date + relativedelta(
**{self.trigger_interval_type: self.trigger_interval}
)
def _get_expiry_date(self):
if not self.expiry:
return False
return fields.Datetime.now() + relativedelta(
**{self.expiry_interval_type: self.expiry_interval}
)
def _create_record_activity_vals(self, record, **kwargs):
scheduled_date = self._get_record_activity_scheduled_date(record)
do_not_wait = scheduled_date and scheduled_date < fields.Datetime.now()
return {
"configuration_step_id": self.id,
"do_not_wait": do_not_wait,
"expiry_date": self._get_expiry_date(),
"scheduled_date": scheduled_date,
**kwargs,
}