# Part of Odoo. See LICENSE file for full copyright and licensing details. from odoo import api, fields, models, _, tools from odoo.addons.mail.tools.discuss import Store from odoo.tools import email_normalize, email_split, html2plaintext, plaintext2html from odoo.tools.mimetypes import get_extension import json from markupsafe import Markup from pytz import timezone def is_livechat_channel(channel): """Predicate to filter channels for which the channel type is 'livechat'. :returns: Whether the channel is a live chat channel. :rtype: bool """ return channel.channel_type == "livechat" class DiscussChannel(models.Model): """ Chat Session Reprensenting a conversation between users. It extends the base method for anonymous usage. """ _name = 'discuss.channel' _inherit = ['rating.mixin', 'discuss.channel'] channel_type = fields.Selection(selection_add=[('livechat', 'Livechat Conversation')], ondelete={'livechat': 'cascade'}) duration = fields.Float('Duration', compute='_compute_duration', help='Duration of the session in hours') livechat_lang_id = fields.Many2one("res.lang", string="Language", help="Lang of the visitor of the channel.") livechat_end_dt = fields.Datetime( "Session end date", help="Session is closed when either the visitor or the last agent leaves the conversation.", ) livechat_channel_id = fields.Many2one('im_livechat.channel', 'Channel', index='btree_not_null') livechat_operator_id = fields.Many2one('res.partner', string='Operator', index='btree_not_null') livechat_channel_member_history_ids = fields.One2many("im_livechat.channel.member.history", "channel_id") livechat_expertise_ids = fields.Many2many( "im_livechat.expertise", "discuss_channel_im_livechat_expertise_rel", "discuss_channel_id", "im_livechat_expertise_id", store=True, ) livechat_agent_history_ids = fields.One2many( "im_livechat.channel.member.history", string="Agents (History)", compute="_compute_livechat_agent_history_ids", search="_search_livechat_agent_history_ids", ) livechat_bot_history_ids = fields.One2many( "im_livechat.channel.member.history", string="Bots (History)", compute="_compute_livechat_bot_history_ids", search="_search_livechat_bot_history_ids", ) livechat_customer_history_ids = fields.One2many( "im_livechat.channel.member.history", string="Customers (History)", compute="_compute_livechat_customer_history_ids", search="_search_livechat_customer_history_ids", ) livechat_agent_partner_ids = fields.Many2many( "res.partner", "im_livechat_channel_member_history_discuss_channel_agent_rel", string="Agents", compute="_compute_livechat_agent_partner_ids", store=True, ) livechat_bot_partner_ids = fields.Many2many( "res.partner", "im_livechat_channel_member_history_discuss_channel_bot_rel", string="Bots", compute="_compute_livechat_bot_partner_ids", context={"active_test": False}, store=True, ) livechat_customer_partner_ids = fields.Many2many( "res.partner", "im_livechat_channel_member_history_discuss_channel_customer_rel", string="Customers (Partners)", compute="_compute_livechat_customer_partner_ids", store=True, ) livechat_customer_guest_ids = fields.Many2many( "mail.guest", string="Customers (Guests)", compute="_compute_livechat_customer_guest_ids", ) livechat_agent_requesting_help_history = fields.Many2one( "im_livechat.channel.member.history", string="Help Requested (Agent)", compute="_compute_livechat_agent_requesting_help_history", store=True, ) livechat_agent_providing_help_history = fields.Many2one( "im_livechat.channel.member.history", string="Help Provided (Agent)", compute="_compute_livechat_agent_providing_help_history", store=True, ) livechat_note = fields.Html( "Live Chat Note", sanitize_style=True, groups="base.group_user", help="Note about the session, visible to all internal users having access to the session.", ) livechat_status = fields.Selection( selection=[ ("in_progress", "In progress"), ("waiting", "Waiting for customer"), ("need_help", "Looking for help"), ], compute="_compute_livechat_status", groups="base.group_user", readonly=False, store=True, ) livechat_outcome = fields.Selection( [ ("no_answer", "Never Answered"), ("no_agent", "No one Available"), ("no_failure", "Success"), ("escalated", "Escalated"), ], compute="_compute_livechat_outcome", store=True, ) livechat_conversation_tag_ids = fields.Many2many( "im_livechat.conversation.tag", "livechat_conversation_tag_rel", groups="im_livechat.im_livechat_group_user", string="Live Chat Conversation Tags", help="Tags to qualify the conversation.", ) livechat_start_hour = fields.Float( "Session Start Hour", compute="_compute_livechat_start_hour", store=True ) livechat_week_day = fields.Selection( [ ("0", "Monday"), ("1", "Tuesday"), ("2", "Wednesday"), ("3", "Thursday"), ("4", "Friday"), ("5", "Saturday"), ("6", "Sunday"), ], string="Day of the Week", compute="_compute_livechat_week_day", store=True, ) livechat_matches_self_lang = fields.Boolean( compute="_compute_livechat_matches_self_lang", search="_search_livechat_matches_self_lang" ) livechat_matches_self_expertise = fields.Boolean( compute="_compute_livechat_matches_self_expertise", search="_search_livechat_matches_self_expertise", ) chatbot_current_step_id = fields.Many2one('chatbot.script.step', string='Chatbot Current Step') chatbot_message_ids = fields.One2many('chatbot.message', 'discuss_channel_id', string='Chatbot Messages') country_id = fields.Many2one('res.country', string="Country", help="Country of the visitor of the channel") livechat_failure = fields.Selection( selection=[ ("no_answer", "Never Answered"), ("no_agent", "No one Available"), ("no_failure", "No Failure"), ], string="Live Chat Session Failure", ) livechat_is_escalated = fields.Boolean("Is session escalated", compute="_compute_livechat_is_escalated", store=True) rating_last_text = fields.Selection(store=True) _livechat_operator_id = models.Constraint( "CHECK((channel_type = 'livechat' and livechat_operator_id is not null) or (channel_type != 'livechat'))", 'Livechat Operator ID is required for a channel of type livechat.', ) _livechat_end_dt_status_constraint = models.Constraint( "CHECK(livechat_end_dt IS NULL or livechat_status IS NULL)", "Closed Live Chat session should not have a status.", ) _livechat_end_dt_idx = models.Index("(livechat_end_dt) WHERE livechat_end_dt IS NULL") _livechat_failure_idx = models.Index( "(livechat_failure) WHERE livechat_failure IN ('no_answer', 'no_agent')" ) _livechat_is_escalated_idx = models.Index( "(livechat_is_escalated) WHERE livechat_is_escalated IS TRUE" ) _livechat_channel_type_create_date_idx = models.Index( "(channel_type, create_date) WHERE channel_type = 'livechat'" ) def write(self, vals): if "livechat_status" not in vals and "livechat_expertise_ids" not in vals: return super().write(vals) need_help_before = self.filtered(lambda c: c.livechat_status == "need_help") result = super().write(vals) need_help_after = self.filtered(lambda c: c.livechat_status == "need_help") group_livechat_user = self.env.ref("im_livechat.im_livechat_group_user") store = Store(bus_channel=group_livechat_user, bus_subchannel="LOOKING_FOR_HELP") added_need_help = need_help_after - need_help_before removed_need_help = need_help_before - need_help_after store.add(added_need_help) store.add(removed_need_help, ["livechat_status"]) if "livechat_expertise_ids" in vals: store.add(self, Store.Many("livechat_expertise_ids")) if added_need_help or removed_need_help: group_livechat_user._bus_send( "im_livechat.looking_for_help/update", { "added_channel_ids": added_need_help.ids, "removed_channel_ids": removed_need_help.ids, }, subchannel="LOOKING_FOR_HELP", ) store.bus_send() return result @api.depends("livechat_end_dt") def _compute_duration(self): for record in self: end = record.livechat_end_dt or fields.Datetime.now() start = record.create_date or fields.Datetime.now() record.duration = (end - start).total_seconds() / 3600 @api.depends("livechat_end_dt") def _compute_livechat_status(self): for channel in self.filtered(lambda c: c.livechat_end_dt): channel.livechat_status = False @api.depends("livechat_agent_history_ids") def _compute_livechat_is_escalated(self): for channel in self: channel.livechat_is_escalated = len(channel.livechat_agent_history_ids) > 1 @api.depends("livechat_channel_member_history_ids.livechat_member_type") def _compute_livechat_agent_history_ids(self): for channel in self: channel.livechat_agent_history_ids = ( channel.livechat_channel_member_history_ids.filtered( lambda h: h.livechat_member_type == "agent", ) ) @api.depends("livechat_channel_member_history_ids.livechat_member_type") def _compute_livechat_bot_history_ids(self): for channel in self: channel.livechat_bot_history_ids = channel.livechat_channel_member_history_ids.filtered( lambda h: h.livechat_member_type == "bot", ) def _search_livechat_bot_history_ids(self, operator, value): if operator != "in": return NotImplemented bot_history_query = self.env["im_livechat.channel.member.history"]._search( [ ("livechat_member_type", "=", "bot"), ("id", "in", value), ], ) return [("id", "in", bot_history_query.subselect("channel_id"))] @api.depends("livechat_channel_member_history_ids.livechat_member_type") def _compute_livechat_customer_history_ids(self): for channel in self: channel.livechat_customer_history_ids = ( channel.livechat_channel_member_history_ids.filtered( lambda h: h.livechat_member_type == "visitor", ) ) def _search_livechat_customer_history_ids(self, operator, value): if operator != "in": return NotImplemented customer_history_query = self.env["im_livechat.channel.member.history"]._search( [ ("livechat_member_type", "=", "visitor"), ("id", "in", value), ], ) return [("id", "in", customer_history_query.subselect("channel_id"))] @api.depends("livechat_agent_history_ids.partner_id") def _compute_livechat_agent_partner_ids(self): for channel in self: channel.livechat_agent_partner_ids = ( channel.livechat_agent_history_ids.partner_id ) def _search_livechat_agent_history_ids(self, operator, value): if operator not in ("any", "in"): return NotImplemented if operator == "in" and len(value) == 1 and not next(iter(value)): return [ ( "id", "not in", self.env["im_livechat.channel.member.history"] ._search([("livechat_member_type", "=", "agent")]) .subselect("channel_id"), ), ] query = ( self.env["im_livechat.channel.member.history"]._search(value) if isinstance(value, fields.Domain) else value ) agent_history_query = self.env["im_livechat.channel.member.history"]._search( [ ("livechat_member_type", "=", "agent"), ("id", "in", query), ], ) return [("id", "in", agent_history_query.subselect("channel_id"))] @api.depends("livechat_bot_history_ids.partner_id") def _compute_livechat_bot_partner_ids(self): for channel in self: channel.livechat_bot_partner_ids = ( channel.livechat_bot_history_ids.partner_id ) @api.depends("livechat_customer_history_ids.partner_id") def _compute_livechat_customer_partner_ids(self): for channel in self: channel.livechat_customer_partner_ids = ( channel.livechat_customer_history_ids.partner_id ) # @api.depends("livechat_customer_history_ids.guest_id") def _compute_livechat_customer_guest_ids(self): for channel in self: channel.livechat_customer_guest_ids = ( channel.livechat_customer_history_ids.guest_id ) @api.depends("livechat_agent_history_ids") def _compute_livechat_agent_requesting_help_history(self): for channel in self: channel.livechat_agent_requesting_help_history = ( channel.livechat_agent_history_ids.sorted(lambda h: (h.create_date, h.id))[0] if channel.livechat_is_escalated else None ) @api.depends("livechat_agent_history_ids") def _compute_livechat_agent_providing_help_history(self): for channel in self: channel.livechat_agent_providing_help_history = ( channel.livechat_agent_history_ids.sorted( lambda h: (h.create_date, h.id), reverse=True )[0] if channel.livechat_is_escalated else None ) @api.depends("livechat_is_escalated", "livechat_failure") def _compute_livechat_outcome(self): for channel in self: self.livechat_outcome = ( "escalated" if channel.livechat_is_escalated else channel.livechat_failure ) @api.depends_context("user") def _compute_livechat_matches_self_lang(self): for channel in self: channel.livechat_matches_self_lang = ( channel.livechat_lang_id in self.env.user.livechat_lang_ids or channel.livechat_lang_id.code == self.env.user.lang ) def _search_livechat_matches_self_lang(self, operator, value): if operator != "in" or value not in ({True}, {False}): return NotImplemented operator = "in" if value == {True} else "not in" lang_codes = self.env.user.livechat_lang_ids.mapped("code") lang_codes.append(self.env.user.lang) return [("livechat_lang_id.code", operator, lang_codes)] @api.depends_context("user") def _compute_livechat_matches_self_expertise(self): for channel in self: channel.livechat_matches_self_expertise = bool( channel.livechat_expertise_ids & self.env.user.livechat_expertise_ids ) def _search_livechat_matches_self_expertise(self, operator, value): if operator != "in" or value not in ({True}, {False}): return NotImplemented operator = "in" if value == {True} else "not in" return [("livechat_expertise_ids", operator, self.env.user.livechat_expertise_ids.ids)] @api.depends("create_date") def _compute_livechat_start_hour(self): for channel in self: channel.livechat_start_hour = channel.create_date.hour @api.depends("create_date") def _compute_livechat_week_day(self): for channel in self: channel.livechat_week_day = str(channel.create_date.weekday()) def _sync_field_names(self): field_names = super()._sync_field_names() field_names[None].append( Store.One( "livechat_operator_id", self.env["discuss.channel"]._store_livechat_operator_id_fields(), predicate=is_livechat_channel, ), ) field_names["internal_users"].extend( [ Store.Attr("description", predicate=is_livechat_channel), Store.Attr("livechat_note", predicate=is_livechat_channel), Store.Attr("livechat_status", predicate=is_livechat_channel), Store.Many("livechat_expertise_ids", ["name"], predicate=is_livechat_channel), # sudo: internal users having access to the channel can read its tags Store.Many( "livechat_conversation_tag_ids", ["name", "color"], predicate=is_livechat_channel, sudo=True, ), ], ) return field_names def _store_livechat_operator_id_fields(self): """Return the standard fields to include in Store for livechat_operator_id.""" return ["avatar_128", *self.env["res.partner"]._get_store_livechat_username_fields()] def _to_store_defaults(self, target: Store.Target): fields = [ "chatbot_current_step", Store.One("country_id", ["code", "name"], predicate=is_livechat_channel), Store.One( "livechat_lang_id", ["name"], predicate=is_livechat_channel, ), Store.Attr("livechat_end_dt", predicate=is_livechat_channel), # sudo - res.partner: accessing livechat operator is allowed Store.One( "livechat_operator_id", self.env["discuss.channel"]._store_livechat_operator_id_fields(), predicate=is_livechat_channel, sudo=True, ), ] if target.is_internal(self.env): fields.append( Store.One( "livechat_channel_id", ["name"], predicate=is_livechat_channel, sudo=True ) ) fields.extend( [ Store.Attr("description", predicate=is_livechat_channel), Store.Attr("livechat_note", predicate=is_livechat_channel), Store.Attr("livechat_outcome", predicate=is_livechat_channel), Store.Attr("livechat_status", predicate=is_livechat_channel), Store.Many("livechat_expertise_ids", ["name"], predicate=is_livechat_channel), # sudo: internal users having access to the channel can read its tags Store.Many( "livechat_conversation_tag_ids", ["name", "color"], predicate=is_livechat_channel, sudo=True, ), ], ) return super()._to_store_defaults(target) + fields def _to_store(self, store: Store, fields): """Extends the channel header by adding the livechat operator and the 'anonymous' profile""" super()._to_store(store, [f for f in fields if f != "chatbot_current_step"]) if "chatbot_current_step" not in fields: return lang = self.env["chatbot.script"]._get_chatbot_language() for channel in self.filtered(lambda channel: channel.chatbot_current_step_id): # sudo: chatbot.script.step - returning the current script/step of the channel current_step_sudo = channel.chatbot_current_step_id.sudo().with_context(lang=lang) chatbot_script = current_step_sudo.chatbot_script_id step_message = self.env["chatbot.message"] if not current_step_sudo.is_forward_operator: step_message = channel.sudo().chatbot_message_ids.filtered( lambda m: m.script_step_id == current_step_sudo and m.mail_message_id.author_id == chatbot_script.operator_partner_id )[:1] current_step = { "scriptStep": current_step_sudo.id, "message": step_message.mail_message_id.id, "operatorFound": current_step_sudo.is_forward_operator and channel.livechat_operator_id != chatbot_script.operator_partner_id, } store.add(current_step_sudo) store.add(chatbot_script) chatbot_data = { "script": chatbot_script.id, "steps": [current_step], "currentStep": current_step, } store.add(channel, {"chatbot": chatbot_data}) @api.autovacuum def _gc_empty_livechat_sessions(self): hours = 1 # never remove empty session created within the last hour self.env.cr.execute(""" SELECT id as id FROM discuss_channel C WHERE NOT EXISTS ( SELECT 1 FROM mail_message M WHERE M.res_id = C.id AND m.model = 'discuss.channel' ) AND C.channel_type = 'livechat' AND livechat_channel_id IS NOT NULL AND COALESCE(write_date, create_date, (now() at time zone 'UTC'))::timestamp < ((now() at time zone 'UTC') - interval %s)""", ("%s hours" % hours,)) empty_channel_ids = [item['id'] for item in self.env.cr.dictfetchall()] self.browse(empty_channel_ids).unlink() @api.autovacuum def _gc_bot_only_ongoing_sessions(self): """Garbage collect bot-only livechat sessions with no activity for over 1 day.""" stale_sessions = self.search([ ("channel_type", "=", "livechat"), ("livechat_end_dt", "=", False), ("last_interest_dt", "<=", "-1d"), ("livechat_agent_partner_ids", "=", False), ]) stale_sessions.livechat_end_dt = fields.Datetime.now() def execute_command_history(self, **kwargs): self._bus_send( "im_livechat.history_command", {"id": self.id, "partner_id": self.env.user.partner_id.id}, ) def _get_visitor_leave_message(self, operator=False, cancel=False): return _('Visitor left the conversation.') def _close_livechat_session(self, **kwargs): """ Set deactivate the livechat channel and notify (the operator) the reason of closing the session.""" self.ensure_one() if not self.livechat_end_dt: member = self.channel_member_ids.filtered(lambda m: m.is_self) if member: # sudo: discuss.channel.rtc.session - member of current user can leave call member.sudo()._rtc_leave_call() # sudo: discuss.channel - visitor left the conversation, state must be updated self.sudo().livechat_end_dt = fields.Datetime.now() Store(bus_channel=self).add(self, "livechat_end_dt").bus_send() # avoid useless notification if the channel is empty if not self.message_ids: return # Notify that the visitor has left the conversation # sudo: mail.message - posting visitor leave message is allowed self.sudo().message_post( author_id=self.env.ref('base.partner_root').id, body=Markup('
') % self._get_visitor_leave_message(**kwargs), message_type='notification', subtype_xmlid='mail.mt_comment' ) # Rating Mixin def _rating_get_parent_field_name(self): return 'livechat_channel_id' def _email_livechat_transcript(self, email): company = self.env.user.company_id tz = "UTC" # sudo: discuss.channel - access partner's/guest's timezone for customer in self.sudo().livechat_customer_history_ids: customer_tz = customer.partner_id.tz or customer.guest_id.timezone if customer_tz: tz = customer_tz break render_context = { "company": company, "channel": self, "tz": timezone(tz), } mail_body = self.env['ir.qweb']._render('im_livechat.livechat_email_template', render_context, minimal_qcontext=True) mail_body = self.env['mail.render.mixin']._replace_local_links(mail_body) mail = self.env['mail.mail'].sudo().create({ 'subject': _('Conversation with %s', self.livechat_operator_id.user_livechat_username or self.livechat_operator_id.name), 'email_from': company.catchall_formatted or company.email_formatted, 'author_id': self.env.user.partner_id.id, 'email_to': email_split(email)[0], 'body_html': mail_body, }) mail.send() def _attachment_to_html(self, attachment): if attachment.mimetype.startswith("image/"): return Markup( "