mirror of
https://github.com/bringout/oca-ocb-mail.git
synced 2026-04-18 14:22:00 +02:00
Initial commit: Mail packages
This commit is contained in:
commit
4e53507711
1948 changed files with 751201 additions and 0 deletions
347
odoo-bringout-oca-ocb-mass_mailing/mass_mailing/tests/common.py
Normal file
347
odoo-bringout-oca-ocb-mass_mailing/mass_mailing/tests/common.py
Normal file
|
|
@ -0,0 +1,347 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
import datetime
|
||||
import random
|
||||
import re
|
||||
import werkzeug
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from odoo import tools
|
||||
from odoo.addons.link_tracker.tests.common import MockLinkTracker
|
||||
from odoo.addons.mail.tests.common import MailCase, MailCommon, mail_new_test_user
|
||||
from odoo.sql_db import Cursor
|
||||
|
||||
|
||||
class MassMailCase(MailCase, MockLinkTracker):
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# ASSERTS
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def assertMailingStatistics(self, mailing, **kwargs):
|
||||
""" Helper to assert mailing statistics fields. As we have many of them
|
||||
it helps lessening test asserts. """
|
||||
if not kwargs.get('expected'):
|
||||
kwargs['expected'] = len(mailing.mailing_trace_ids)
|
||||
if not kwargs.get('delivered'):
|
||||
kwargs['delivered'] = len(mailing.mailing_trace_ids)
|
||||
for fname in ['scheduled', 'expected', 'sent', 'delivered',
|
||||
'opened', 'replied', 'clicked',
|
||||
'canceled', 'failed', 'bounced']:
|
||||
self.assertEqual(
|
||||
mailing[fname], kwargs.get(fname, 0),
|
||||
'Mailing %s statistics failed: got %s instead of %s' % (fname, mailing[fname], kwargs.get(fname, 0))
|
||||
)
|
||||
|
||||
def assertMailTraces(self, recipients_info, mailing, records,
|
||||
check_mail=True, sent_unlink=False,
|
||||
author=None, mail_links_info=None):
|
||||
""" Check content of traces. Traces are fetched based on a given mailing
|
||||
and records. Their content is compared to recipients_info structure that
|
||||
holds expected information. Links content may be checked, notably to
|
||||
assert shortening or unsubscribe links. Mail.mail records may optionally
|
||||
be checked.
|
||||
|
||||
:param recipients_info: list[{
|
||||
# TRACE
|
||||
'partner': res.partner record (may be empty),
|
||||
'email': email used when sending email (may be empty, computed based on partner),
|
||||
'trace_status': outgoing / sent / open / reply / bounce / error / cancel (sent by default),
|
||||
'record: linked record,
|
||||
# MAIL.MAIL
|
||||
'content': optional content that should be present in mail.mail body_html;
|
||||
'email_to_mail': optional email used for the mail, when different from the
|
||||
one stored on the trace itself;
|
||||
'email_to_recipients': optional, see '_assertMailMail';
|
||||
'failure_type': optional failure reason;
|
||||
}, { ... }]
|
||||
|
||||
:param mailing: a mailing.mailing record from which traces have been
|
||||
generated;
|
||||
:param records: records given to mailing that generated traces. It is
|
||||
used notably to find traces using their IDs;
|
||||
:param check_mail: if True, also check mail.mail records that should be
|
||||
linked to traces;
|
||||
:param sent_unlink: it True, sent mail.mail are deleted and we check gateway
|
||||
output result instead of actual mail.mail records;
|
||||
:param mail_links_info: if given, should follow order of ``recipients_info``
|
||||
and give details about links. See ``assertLinkShortenedHtml`` helper for
|
||||
more details about content to give;
|
||||
:param author: author of sent mail.mail;
|
||||
"""
|
||||
# map trace state to email state
|
||||
state_mapping = {
|
||||
'sent': 'sent',
|
||||
'open': 'sent', # opened implies something has been sent
|
||||
'reply': 'sent', # replied implies something has been sent
|
||||
'error': 'exception',
|
||||
'cancel': 'cancel',
|
||||
'bounce': 'cancel',
|
||||
}
|
||||
|
||||
traces = self.env['mailing.trace'].search([
|
||||
('mass_mailing_id', 'in', mailing.ids),
|
||||
('res_id', 'in', records.ids)
|
||||
])
|
||||
debug_info = '\n'.join(
|
||||
(
|
||||
f'Trace: to {t.email} - state {t.trace_status} - res_id {t.res_id}'
|
||||
for t in traces
|
||||
)
|
||||
)
|
||||
|
||||
# ensure trace coherency
|
||||
self.assertTrue(all(s.model == records._name for s in traces))
|
||||
self.assertEqual(set(s.res_id for s in traces), set(records.ids))
|
||||
|
||||
# check each traces
|
||||
if not mail_links_info:
|
||||
mail_links_info = [None] * len(recipients_info)
|
||||
for recipient_info, link_info, record in zip(recipients_info, mail_links_info, records):
|
||||
partner = recipient_info.get('partner', self.env['res.partner'])
|
||||
email = recipient_info.get('email')
|
||||
email_to_mail = recipient_info.get('email_to_mail') or email
|
||||
email_to_recipients = recipient_info.get('email_to_recipients')
|
||||
status = recipient_info.get('trace_status', 'sent')
|
||||
record = record or recipient_info.get('record')
|
||||
content = recipient_info.get('content')
|
||||
if email is None and partner:
|
||||
email = partner.email_normalized
|
||||
|
||||
recipient_trace = traces.filtered(
|
||||
lambda t: (t.email == email or (not email and not t.email)) and \
|
||||
t.trace_status == status and \
|
||||
(t.res_id == record.id if record else True)
|
||||
)
|
||||
self.assertTrue(
|
||||
len(recipient_trace) == 1,
|
||||
'MailTrace: email %s (recipient %s, status: %s, record: %s): found %s records (1 expected)\n%s' % (
|
||||
email, partner, status, record,
|
||||
len(recipient_trace), debug_info)
|
||||
)
|
||||
self.assertTrue(bool(recipient_trace.mail_mail_id_int))
|
||||
if 'failure_type' in recipient_info or status in ('error', 'cancel', 'bounce'):
|
||||
self.assertEqual(recipient_trace.failure_type, recipient_info['failure_type'])
|
||||
|
||||
if check_mail:
|
||||
if author is None:
|
||||
author = self.env.user.partner_id
|
||||
|
||||
# mail.mail specific values to check
|
||||
fields_values = {'mailing_id': mailing}
|
||||
if recipient_info.get('mail_values'):
|
||||
fields_values.update(recipient_info['mail_values'])
|
||||
if 'failure_reason' in recipient_info:
|
||||
fields_values['failure_reason'] = recipient_info['failure_reason']
|
||||
if 'email_to_mail' in recipient_info:
|
||||
fields_values['email_to'] = recipient_info['email_to_mail']
|
||||
|
||||
# specific for partner: email_formatted is used
|
||||
if partner:
|
||||
if status == 'sent' and sent_unlink:
|
||||
self.assertSentEmail(author, [partner])
|
||||
else:
|
||||
self.assertMailMail(
|
||||
partner, state_mapping[status],
|
||||
author=author,
|
||||
content=content,
|
||||
email_to_recipients=email_to_recipients,
|
||||
fields_values=fields_values,
|
||||
)
|
||||
# specific if email is False -> could have troubles finding it if several falsy traces
|
||||
elif not email and status in ('cancel', 'bounce'):
|
||||
self.assertMailMailWId(
|
||||
recipient_trace.mail_mail_id_int, state_mapping[status],
|
||||
author=author,
|
||||
content=content,
|
||||
email_to_recipients=email_to_recipients,
|
||||
fields_values=fields_values,
|
||||
)
|
||||
else:
|
||||
self.assertMailMailWEmails(
|
||||
[email_to_mail], state_mapping[status],
|
||||
author=author,
|
||||
content=content,
|
||||
email_to_recipients=email_to_recipients,
|
||||
fields_values=fields_values,
|
||||
)
|
||||
|
||||
if link_info:
|
||||
trace_mail = self._find_mail_mail_wrecord(record)
|
||||
for (anchor_id, url, is_shortened, add_link_params) in link_info:
|
||||
link_params = {'utm_medium': 'Email', 'utm_source': mailing.name}
|
||||
if add_link_params:
|
||||
link_params.update(**add_link_params)
|
||||
self.assertLinkShortenedHtml(
|
||||
trace_mail.body_html,
|
||||
(anchor_id, url, is_shortened),
|
||||
link_params=link_params,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# TOOLS
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def gateway_mail_bounce(self, mailing, record, bounce_base_values=None):
|
||||
""" Generate a bounce at mailgateway level.
|
||||
|
||||
:param mailing: a ``mailing.mailing`` record on which we find a trace
|
||||
to bounce;
|
||||
:param record: record which should bounce;
|
||||
:param bounce_base_values: optional values given to routing;
|
||||
"""
|
||||
trace = mailing.mailing_trace_ids.filtered(
|
||||
lambda t: t.model == record._name and t.res_id == record.id
|
||||
)
|
||||
|
||||
parsed_bounce_values = {
|
||||
'email_from': 'some.email@external.example.com', # TDE check: email_from -> trace email ?
|
||||
'to': 'bounce@test.example.com', # TDE check: bounce alias ?
|
||||
'message_id': tools.generate_tracking_message_id('MailTest'),
|
||||
'bounced_partner': self.env['res.partner'].sudo(),
|
||||
'bounced_message': self.env['mail.message'].sudo()
|
||||
}
|
||||
if bounce_base_values:
|
||||
parsed_bounce_values.update(bounce_base_values)
|
||||
parsed_bounce_values.update({
|
||||
'bounced_email': trace.email,
|
||||
'bounced_msg_id': [trace.message_id],
|
||||
})
|
||||
self.env['mail.thread']._routing_handle_bounce(False, parsed_bounce_values)
|
||||
return trace
|
||||
|
||||
def gateway_mail_click(self, mailing, record, click_label):
|
||||
""" Simulate a click on a sent email.
|
||||
|
||||
:param mailing: a ``mailing.mailing`` record on which we find a trace
|
||||
to click;
|
||||
:param record: record which should click;
|
||||
:param click_label: label of link on which we should click;
|
||||
"""
|
||||
trace = mailing.mailing_trace_ids.filtered(
|
||||
lambda t: t.model == record._name and t.res_id == record.id
|
||||
)
|
||||
|
||||
email = self._find_sent_mail_wemail(trace.email)
|
||||
self.assertTrue(bool(email))
|
||||
for (_url_href, link_url, _dummy, label) in re.findall(tools.HTML_TAG_URL_REGEX, email['body']):
|
||||
if label == click_label and '/r/' in link_url: # shortened link, like 'http://localhost:8069/r/LBG/m/53'
|
||||
parsed_url = werkzeug.urls.url_parse(link_url)
|
||||
path_items = parsed_url.path.split('/')
|
||||
code, trace_id = path_items[2], int(path_items[4])
|
||||
self.assertEqual(trace.id, trace_id)
|
||||
|
||||
self.env['link.tracker.click'].sudo().add_click(
|
||||
code,
|
||||
ip='100.200.300.%3f' % random.random(),
|
||||
country_code='BE',
|
||||
mailing_trace_id=trace.id
|
||||
)
|
||||
break
|
||||
else:
|
||||
raise AssertionError('url %s not found in mailing %s for record %s' % (click_label, mailing, record))
|
||||
return trace
|
||||
|
||||
def gateway_mail_open(self, mailing, record):
|
||||
""" Simulate opening an email through blank.gif icon access. As we
|
||||
don't want to use the whole Http layer just for that we will just
|
||||
call 'set_opened()' on trace, until having a better option.
|
||||
|
||||
:param mailing: a ``mailing.mailing`` record on which we find a trace
|
||||
to open;
|
||||
:param record: record which should open;
|
||||
"""
|
||||
trace = mailing.mailing_trace_ids.filtered(
|
||||
lambda t: t.model == record._name and t.res_id == record.id
|
||||
)
|
||||
trace.set_opened()
|
||||
return trace
|
||||
|
||||
@classmethod
|
||||
def _create_bounce_trace(cls, mailing, records, dt=None):
|
||||
if dt is None:
|
||||
dt = datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
return cls._create_traces(mailing, records, dt, trace_status='bounce')
|
||||
|
||||
@classmethod
|
||||
def _create_sent_traces(cls, mailing, records, dt=None):
|
||||
if dt is None:
|
||||
dt = datetime.datetime.now() - datetime.timedelta(days=1)
|
||||
return cls._create_traces(mailing, records, dt, trace_status='sent')
|
||||
|
||||
@classmethod
|
||||
def _create_traces(cls, mailing, records, dt, **values):
|
||||
if 'email_normalized' in records:
|
||||
fname = 'email_normalized'
|
||||
elif 'email_from' in records:
|
||||
fname = 'email_from'
|
||||
else:
|
||||
fname = 'email'
|
||||
randomized = random.random()
|
||||
# Cursor.now() uses transaction's timestamp and not datetime lib -> freeze_time
|
||||
# is not sufficient
|
||||
with patch.object(Cursor, 'now', lambda *args, **kwargs: dt):
|
||||
traces = cls.env['mailing.trace'].sudo().create([
|
||||
dict({'mass_mailing_id': mailing.id,
|
||||
'model': record._name,
|
||||
'res_id': record.id,
|
||||
'trace_status': values.get('trace_status', 'bounce'),
|
||||
# TDE FIXME: improve this with a mail-enabled heuristics
|
||||
'email': record[fname],
|
||||
'message_id': '<%5f@gilbert.boitempomils>' % randomized,
|
||||
}, **values)
|
||||
for record in records
|
||||
])
|
||||
return traces
|
||||
|
||||
@classmethod
|
||||
def _create_mailing_list(cls):
|
||||
""" Shortcut to create mailing lists. Currently hardcoded, maybe evolve
|
||||
in a near future. """
|
||||
cls.mailing_list_1 = cls.env['mailing.list'].with_context(cls._test_context).create({
|
||||
'name': 'List1',
|
||||
'contact_ids': [
|
||||
(0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
|
||||
(0, 0, {'name': 'Gorramts', 'email': 'gorramts@example.com'}),
|
||||
(0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
|
||||
]
|
||||
})
|
||||
cls.mailing_list_2 = cls.env['mailing.list'].with_context(cls._test_context).create({
|
||||
'name': 'List2',
|
||||
'contact_ids': [
|
||||
(0, 0, {'name': 'Gilberte', 'email': 'gilberte@example.com'}),
|
||||
(0, 0, {'name': 'Gilberte En Mieux', 'email': 'gilberte@example.com'}),
|
||||
(0, 0, {'name': 'Norbert', 'email': 'norbert@example.com'}),
|
||||
(0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
|
||||
]
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def _create_mailing_list_of_x_contacts(cls, contacts_nbr):
|
||||
""" Shortcut to create a mailing list that contains a defined number
|
||||
of contacts. """
|
||||
return cls.env['mailing.list'].with_context(cls._test_context).create({
|
||||
'name': 'Test List',
|
||||
'contact_ids': [
|
||||
(0, 0, {'name': 'Contact %s' % i, 'email': 'contact%s@example.com' % i})
|
||||
for i in range(contacts_nbr)
|
||||
],
|
||||
})
|
||||
|
||||
|
||||
class MassMailCommon(MailCommon, MassMailCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(MassMailCommon, cls).setUpClass()
|
||||
|
||||
cls.user_marketing = mail_new_test_user(
|
||||
cls.env, login='user_marketing',
|
||||
groups='base.group_user,base.group_partner_manager,mass_mailing.group_mass_mailing_user',
|
||||
name='Martial Marketing', signature='--\nMartial')
|
||||
|
||||
cls.email_reply_to = 'MyCompany SomehowAlias <test.alias@test.mycompany.com>'
|
||||
|
||||
cls.env.flush_all()
|
||||
Loading…
Add table
Add a link
Reference in a new issue