mirror of
https://github.com/bringout/oca-ocb-test.git
synced 2026-04-22 22:22:00 +02:00
Initial commit: Test packages
This commit is contained in:
commit
080accd21c
338 changed files with 32413 additions and 0 deletions
|
|
@ -0,0 +1,10 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from . import test_odoobot
|
||||
from . import test_mail_performance
|
||||
from . import test_mail_thread_internals
|
||||
from . import test_mass_mailing
|
||||
from . import test_portal
|
||||
from . import test_rating
|
||||
from . import test_res_users
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from odoo.addons.test_mass_mailing.tests.common import TestMassMailCommon
|
||||
|
||||
|
||||
class TestMailFullCommon(TestMassMailCommon):
|
||||
""" Keep a single entry point, notably for backward compatibility """
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from odoo.addons.mail.tests.common import mail_new_test_user
|
||||
from odoo.addons.test_mail.tests.test_performance import BaseMailPerformance
|
||||
from odoo.tests.common import users, warmup
|
||||
from odoo.tests import tagged
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
|
||||
@tagged('mail_performance', 'post_install', '-at_install')
|
||||
class TestMailPerformance(BaseMailPerformance):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestMailPerformance, cls).setUpClass()
|
||||
|
||||
# users / followers
|
||||
cls.user_emp_email = mail_new_test_user(
|
||||
cls.env,
|
||||
company_id=cls.user_admin.company_id.id,
|
||||
company_ids=[(4, cls.user_admin.company_id.id)],
|
||||
email='user.emp.email@test.example.com',
|
||||
login='user_emp_email',
|
||||
groups='base.group_user,base.group_partner_manager',
|
||||
name='Emmanuel Email',
|
||||
notification_type='email',
|
||||
signature='--\nEmmanuel',
|
||||
)
|
||||
cls.user_portal = mail_new_test_user(
|
||||
cls.env,
|
||||
company_id=cls.user_admin.company_id.id,
|
||||
company_ids=[(4, cls.user_admin.company_id.id)],
|
||||
email='user.portal@test.example.com',
|
||||
login='user_portal',
|
||||
groups='base.group_portal',
|
||||
name='Paul Portal',
|
||||
)
|
||||
cls.customers = cls.env['res.partner'].create([
|
||||
{'country_id': cls.env.ref('base.be').id,
|
||||
'email': 'customer.full.test.1@example.com',
|
||||
'name': 'Test Full Customer 1',
|
||||
'mobile': '0456112233',
|
||||
'phone': '0456112233',
|
||||
},
|
||||
{'country_id': cls.env.ref('base.be').id,
|
||||
'email': 'customer.full.test.2@example.com',
|
||||
'name': 'Test Full Customer 2',
|
||||
'mobile': '0456223344',
|
||||
'phone': '0456112233',
|
||||
},
|
||||
])
|
||||
|
||||
# record
|
||||
cls.record_container = cls.env['mail.test.container.mc'].create({
|
||||
'alias_name': 'test-alias',
|
||||
'customer_id': cls.customer.id,
|
||||
'name': 'Test Container',
|
||||
})
|
||||
cls.record_ticket = cls.env['mail.test.ticket.mc'].create({
|
||||
'email_from': 'email.from@test.example.com',
|
||||
'container_id': cls.record_container.id,
|
||||
'customer_id': False,
|
||||
'name': 'Test Ticket',
|
||||
'user_id': cls.user_emp_email.id,
|
||||
})
|
||||
cls.record_ticket.message_subscribe(cls.customers.ids + cls.user_admin.partner_id.ids + cls.user_portal.partner_id.ids)
|
||||
|
||||
def test_initial_values(self):
|
||||
""" Simply ensure some values through all tests """
|
||||
record_ticket = self.env['mail.test.ticket.mc'].browse(self.record_ticket.ids)
|
||||
self.assertEqual(record_ticket.message_partner_ids,
|
||||
self.user_emp_email.partner_id + self.user_admin.partner_id + self.customers + self.user_portal.partner_id)
|
||||
self.assertEqual(len(record_ticket.message_ids), 1)
|
||||
|
||||
@mute_logger('odoo.tests', 'odoo.addons.mail.models.mail_mail', 'odoo.models.unlink')
|
||||
@users('employee')
|
||||
@warmup
|
||||
def test_message_post_w_followers(self):
|
||||
""" Aims to cover as much features of message_post as possible """
|
||||
record_ticket = self.env['mail.test.ticket.mc'].browse(self.record_ticket.ids)
|
||||
attachments = self.env['ir.attachment'].create(self.test_attachments_vals)
|
||||
|
||||
with self.assertQueryCount(employee=91): # tmf: 60
|
||||
new_message = record_ticket.message_post(
|
||||
attachment_ids=attachments.ids,
|
||||
body='<p>Test Content</p>',
|
||||
message_type='comment',
|
||||
subject='Test Subject',
|
||||
subtype_xmlid='mail.mt_comment',
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
new_message.notified_partner_ids,
|
||||
self.user_emp_email.partner_id + self.user_admin.partner_id + self.customers + self.user_portal.partner_id
|
||||
)
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from werkzeug.urls import url_parse
|
||||
|
||||
from odoo.addons.test_mail_full.tests.common import TestMailFullCommon
|
||||
from odoo.addons.test_mail_sms.tests.common import TestSMSRecipients
|
||||
from odoo.tests import tagged, users
|
||||
|
||||
|
||||
class TestMailThreadInternalsCommon(TestMailFullCommon, TestSMSRecipients):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestMailThreadInternalsCommon, cls).setUpClass()
|
||||
|
||||
cls.test_portal_records, cls.test_portal_partners = cls._create_records_for_batch(
|
||||
'mail.test.portal',
|
||||
2,
|
||||
)
|
||||
cls.test_portal_nop_records, _ = cls._create_records_for_batch(
|
||||
'mail.test.portal.no.partner',
|
||||
2,
|
||||
)
|
||||
cls.test_rating_records, cls.test_rating_partners = cls._create_records_for_batch(
|
||||
'mail.test.rating',
|
||||
2,
|
||||
)
|
||||
cls.test_simple_records, _ = cls._create_records_for_batch(
|
||||
'mail.test.simple',
|
||||
2,
|
||||
)
|
||||
|
||||
|
||||
@tagged('mail_thread', 'portal')
|
||||
class TestMailThreadInternals(TestMailThreadInternalsCommon):
|
||||
|
||||
@users('employee')
|
||||
def test_notify_get_recipients_groups(self):
|
||||
""" Test redirection of portal-enabled records """
|
||||
test_records = [
|
||||
self.test_portal_records[0].with_env(self.env),
|
||||
self.test_portal_nop_records[0].with_env(self.env),
|
||||
self.test_rating_records[0].with_env(self.env),
|
||||
self.test_simple_records[0].with_env(self.env),
|
||||
]
|
||||
for test_record in test_records:
|
||||
with self.subTest(test_record=test_record):
|
||||
is_portal = test_record._name != 'mail.test.simple'
|
||||
has_customer = test_record._name != 'mail.test.portal.no.partner'
|
||||
partner_fnames = test_record._mail_get_partner_fields()
|
||||
|
||||
if is_portal:
|
||||
self.assertFalse(
|
||||
test_record.access_token,
|
||||
'By default access tokens are False with portal'
|
||||
)
|
||||
|
||||
groups = test_record._notify_get_recipients_groups()
|
||||
portal_customer_group = next(
|
||||
(group for group in groups if group[0] == 'portal_customer'),
|
||||
False
|
||||
)
|
||||
|
||||
if is_portal and has_customer:
|
||||
# should have generated the access token, required for portal links
|
||||
self.assertTrue(
|
||||
test_record.access_token,
|
||||
'Portal should generate access token'
|
||||
)
|
||||
# check portal_customer content and link
|
||||
self.assertTrue(
|
||||
portal_customer_group,
|
||||
'Portal Mixin should add portal customer notification group'
|
||||
)
|
||||
portal_url = portal_customer_group[2]['button_access']['url']
|
||||
parameters = url_parse(portal_url).decode_query()
|
||||
self.assertEqual(parameters['access_token'], test_record.access_token)
|
||||
self.assertEqual(parameters['model'], test_record._name)
|
||||
self.assertEqual(parameters['pid'], str(test_record[partner_fnames[0]].id))
|
||||
self.assertEqual(parameters['res_id'], str(test_record.id))
|
||||
else:
|
||||
self.assertFalse(
|
||||
portal_customer_group,
|
||||
'Portal Mixin should not add portal customer notification group'
|
||||
)
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
import werkzeug
|
||||
|
||||
from odoo.addons.test_mail_full.tests.common import TestMailFullCommon
|
||||
from odoo.tests.common import users
|
||||
from odoo.tools import mute_logger
|
||||
from odoo.tests import tagged
|
||||
|
||||
|
||||
@tagged('mass_mailing')
|
||||
class TestMassMailing(TestMailFullCommon):
|
||||
|
||||
@users('user_marketing')
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_mailing_w_blacklist_opt_out(self):
|
||||
mailing = self.env['mailing.mailing'].browse(self.mailing_bl.ids)
|
||||
mailing.write({'subject': 'Subject {{ object.name }}'})
|
||||
|
||||
mailing.write({'mailing_model_id': self.env['ir.model']._get('mailing.test.optout').id})
|
||||
recipients = self._create_mailing_test_records(model='mailing.test.optout', count=10)
|
||||
|
||||
# optout records 1 and 2
|
||||
(recipients[1] | recipients[2]).write({'opt_out': True})
|
||||
recipients[1].email_from = f'"Format Me" <{recipients[1].email_from}>'
|
||||
# blacklist records 3 and 4
|
||||
self.env['mail.blacklist'].create({'email': recipients[3].email_normalized})
|
||||
self.env['mail.blacklist'].create({'email': recipients[4].email_normalized})
|
||||
recipients[3].email_from = f'"Format Me" <{recipients[3].email_from}>'
|
||||
# have a duplicate email for 9
|
||||
recipients[9].email_from = f'"Format Me" <{recipients[9].email_from}>'
|
||||
recipient_dup_1 = recipients[9].copy()
|
||||
recipient_dup_1.email_from = f'"Format Me" <{recipient_dup_1.email_from}>'
|
||||
# have another duplicate for 9, but with multi emails already done
|
||||
recipient_dup_2 = recipients[9].copy()
|
||||
recipient_dup_2.email_from += f'; "TestDupe" <{recipients[8].email_from}>'
|
||||
# have another duplicate for 9, but with multi emails, one is different
|
||||
recipient_dup_3 = recipients[9].copy() # this one will passthrough (best-effort)
|
||||
recipient_dup_3.email_from += '; "TestMulti" <test.multi@test.example.com>'
|
||||
recipient_dup_4 = recipient_dup_2.copy() # this one will be discarded (youpi)
|
||||
# have a void mail
|
||||
recipient_void_1 = self.env['mailing.test.optout'].create({'name': 'TestRecord_void_1'})
|
||||
# have a falsy mail
|
||||
recipient_falsy_1 = self.env['mailing.test.optout'].create({
|
||||
'name': 'TestRecord_falsy_1',
|
||||
'email_from': 'falsymail'
|
||||
})
|
||||
recipients_all = (
|
||||
recipients + recipient_dup_1 + recipient_dup_2 + recipient_dup_3 + recipient_dup_4
|
||||
+ recipient_void_1 + recipient_falsy_1
|
||||
)
|
||||
|
||||
mailing.write({'mailing_domain': [('id', 'in', recipients_all.ids)]})
|
||||
mailing.action_put_in_queue()
|
||||
with self.mock_mail_gateway(mail_unlink_sent=False):
|
||||
mailing.action_send_mail()
|
||||
|
||||
for recipient in recipients_all:
|
||||
recipient_info = {
|
||||
'email': recipient.email_normalized,
|
||||
'content': f'Hello {recipient.name}',
|
||||
'mail_values': {
|
||||
'subject': f'Subject {recipient.name}',
|
||||
},
|
||||
}
|
||||
|
||||
# opt-out: cancel (cancel mail)
|
||||
if recipient in recipients[1] | recipients[2]:
|
||||
recipient_info['trace_status'] = "cancel"
|
||||
recipient_info['failure_type'] = "mail_optout"
|
||||
# blacklisted: cancel (cancel mail)
|
||||
elif recipient in recipients[3] | recipients[4]:
|
||||
recipient_info['trace_status'] = "cancel"
|
||||
recipient_info['failure_type'] = "mail_bl"
|
||||
# duplicates: cancel (cancel mail)
|
||||
elif recipient in (recipient_dup_1, recipient_dup_2, recipient_dup_4):
|
||||
recipient_info['trace_status'] = "cancel"
|
||||
recipient_info['failure_type'] = "mail_dup"
|
||||
# void: error (failed mail)
|
||||
elif recipient == recipient_void_1:
|
||||
recipient_info['trace_status'] = 'cancel'
|
||||
recipient_info['failure_type'] = "mail_email_missing"
|
||||
# falsy: error (failed mail)
|
||||
elif recipient == recipient_falsy_1:
|
||||
recipient_info['trace_status'] = "cancel"
|
||||
recipient_info['failure_type'] = "mail_email_invalid"
|
||||
recipient_info['email'] = recipient.email_from # normalized is False but email should be falsymail
|
||||
else:
|
||||
# multi email -> outgoing email contains all emails
|
||||
if recipient == recipient_dup_3:
|
||||
email = self._find_sent_email(self.user_marketing.email_formatted, ['test.record.09@test.example.com', 'test.multi@test.example.com'])
|
||||
else:
|
||||
email = self._find_sent_email(self.user_marketing.email_formatted, [recipient.email_normalized])
|
||||
# preview correctly integrated rendered qweb
|
||||
self.assertIn(
|
||||
'Hi %s :)' % recipient.name,
|
||||
email['body'])
|
||||
# rendered unsubscribe
|
||||
self.assertIn(
|
||||
'%s/mailing/%s/confirm_unsubscribe' % (mailing.get_base_url(), mailing.id),
|
||||
email['body'])
|
||||
unsubscribe_href = self._get_href_from_anchor_id(email['body'], "url6")
|
||||
unsubscribe_url = werkzeug.urls.url_parse(unsubscribe_href)
|
||||
unsubscribe_params = unsubscribe_url.decode_query().to_dict(flat=True)
|
||||
self.assertEqual(int(unsubscribe_params['res_id']), recipient.id)
|
||||
self.assertEqual(unsubscribe_params['email'], recipient.email_normalized)
|
||||
self.assertEqual(
|
||||
mailing._unsubscribe_token(unsubscribe_params['res_id'], (unsubscribe_params['email'])),
|
||||
unsubscribe_params['token']
|
||||
)
|
||||
# rendered view
|
||||
self.assertIn(
|
||||
'%s/mailing/%s/view' % (mailing.get_base_url(), mailing.id),
|
||||
email['body'])
|
||||
view_href = self._get_href_from_anchor_id(email['body'], "url6")
|
||||
view_url = werkzeug.urls.url_parse(view_href)
|
||||
view_params = view_url.decode_query().to_dict(flat=True)
|
||||
self.assertEqual(int(view_params['res_id']), recipient.id)
|
||||
self.assertEqual(view_params['email'], recipient.email_normalized)
|
||||
self.assertEqual(
|
||||
mailing._unsubscribe_token(view_params['res_id'], (view_params['email'])),
|
||||
view_params['token']
|
||||
)
|
||||
|
||||
self.assertMailTraces(
|
||||
[recipient_info], mailing, recipient,
|
||||
mail_links_info=[[
|
||||
('url0', 'https://www.odoo.tz/my/%s' % recipient.name, True, {}),
|
||||
('url1', 'https://www.odoo.be', True, {}),
|
||||
('url2', 'https://www.odoo.com', True, {}),
|
||||
('url3', 'https://www.odoo.eu', True, {}),
|
||||
('url4', 'https://www.example.com/foo/bar?baz=qux', True, {'baz': 'qux'}),
|
||||
('url5', '%s/event/dummy-event-0' % mailing.get_base_url(), True, {}),
|
||||
# view is not shortened and parsed at sending
|
||||
('url6', '%s/view' % mailing.get_base_url(), False, {}),
|
||||
('url7', 'mailto:test@odoo.com', False, {}),
|
||||
# unsubscribe is not shortened and parsed at sending
|
||||
('url8', '%s/unsubscribe_from_list' % mailing.get_base_url(), False, {}),
|
||||
]],
|
||||
check_mail=True,
|
||||
)
|
||||
|
||||
# sent: 15, 2 bl, 3 opt-out, 3 invalid -> 7 remaining
|
||||
# ignored: 2 bl + 3 optout + 2 invalid + 1 duplicate; failed: 0
|
||||
self.assertMailingStatistics(mailing, expected=16, delivered=7, sent=7, canceled=9, failed=0)
|
||||
|
|
@ -0,0 +1,142 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from odoo.addons.test_mail.tests.common import TestMailCommon, TestRecipients
|
||||
from odoo.tests import tagged
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
|
||||
@tagged("odoobot")
|
||||
class TestOdoobot(TestMailCommon, TestRecipients):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestOdoobot, cls).setUpClass()
|
||||
cls.test_record = cls.env['mail.test.simple'].with_context(cls._test_context).create({'name': 'Test', 'email_from': 'ignasse@example.com'})
|
||||
|
||||
cls.odoobot = cls.env.ref("base.partner_root")
|
||||
cls.message_post_default_kwargs = {
|
||||
'body': '',
|
||||
'attachment_ids': [],
|
||||
'message_type': 'comment',
|
||||
'partner_ids': [],
|
||||
'subtype_xmlid': 'mail.mt_comment'
|
||||
}
|
||||
cls.odoobot_ping_body = '<a href="http://odoo.com/web#model=res.partner&id=%s" class="o_mail_redirect" data-oe-id="%s" data-oe-model="res.partner" target="_blank">@OdooBot</a>' % (cls.odoobot.id, cls.odoobot.id)
|
||||
cls.test_record_employe = cls.test_record.with_user(cls.user_employee)
|
||||
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_fetch_listener(self):
|
||||
channel = self.user_employee.with_user(self.user_employee)._init_odoobot()
|
||||
odoobot = self.env.ref("base.partner_root")
|
||||
odoobot_in_fetch_listeners = self.env['mail.channel.member'].search([('channel_id', '=', channel.id), ('partner_id', '=', odoobot.id)])
|
||||
self.assertEqual(len(odoobot_in_fetch_listeners), 1, 'odoobot should appear only once in channel_fetch_listeners')
|
||||
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_odoobot_ping(self):
|
||||
kwargs = self.message_post_default_kwargs.copy()
|
||||
kwargs.update({'body': self.odoobot_ping_body, 'partner_ids': [self.odoobot.id, self.user_admin.partner_id.id]})
|
||||
|
||||
with patch('random.choice', lambda x: x[0]):
|
||||
self.assertNextMessage(
|
||||
self.test_record_employe.with_context({'mail_post_autofollow': True}).message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=False
|
||||
)
|
||||
# Odoobot should not be a follower but user_employee and user_admin should
|
||||
follower = self.test_record.message_follower_ids.mapped('partner_id')
|
||||
self.assertNotIn(self.odoobot, follower)
|
||||
self.assertIn(self.user_employee.partner_id, follower)
|
||||
self.assertIn(self.user_admin.partner_id, follower)
|
||||
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_onboarding_flow(self):
|
||||
kwargs = self.message_post_default_kwargs.copy()
|
||||
channel = self.user_employee.with_user(self.user_employee)._init_odoobot()
|
||||
|
||||
kwargs['body'] = 'tagada 😊'
|
||||
last_message = self.assertNextMessage(
|
||||
channel.message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=("help",)
|
||||
)
|
||||
channel.execute_command_help()
|
||||
self.assertNextMessage(
|
||||
last_message, # no message will be post with command help, use last odoobot message instead
|
||||
sender=self.odoobot,
|
||||
answer=("@OdooBot",)
|
||||
)
|
||||
kwargs['body'] = ''
|
||||
kwargs['partner_ids'] = [self.env['ir.model.data']._xmlid_to_res_id("base.partner_root")]
|
||||
self.assertNextMessage(
|
||||
channel.message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=("attachment",)
|
||||
)
|
||||
kwargs['body'] = ''
|
||||
attachment = self.env['ir.attachment'].with_user(self.user_employee).create({
|
||||
'datas': 'bWlncmF0aW9uIHRlc3Q=',
|
||||
'name': 'picture_of_your_dog.doc',
|
||||
'res_model': 'mail.compose.message',
|
||||
})
|
||||
kwargs['attachment_ids'] = [attachment.id]
|
||||
# For the end of the flow, we only test that the state changed, but not to which
|
||||
# one since it depends on the intalled apps, which can add more steps (like livechat)
|
||||
channel.message_post(**kwargs)
|
||||
self.assertNotEqual(self.user_employee.odoobot_state, 'onboarding_attachement')
|
||||
|
||||
# Test miscellaneous messages
|
||||
self.user_employee.odoobot_state = "idle"
|
||||
kwargs['partner_ids'] = []
|
||||
kwargs['body'] = "I love you"
|
||||
self.assertNextMessage(
|
||||
channel.message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=("too human for me",)
|
||||
)
|
||||
kwargs['body'] = "Go fuck yourself"
|
||||
self.assertNextMessage(
|
||||
channel.message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=("I have feelings",)
|
||||
)
|
||||
kwargs['body'] = "help me"
|
||||
self.assertNextMessage(
|
||||
channel.message_post(**kwargs),
|
||||
sender=self.odoobot,
|
||||
answer=("If you need help",)
|
||||
)
|
||||
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_odoobot_no_default_answer(self):
|
||||
kwargs = self.message_post_default_kwargs.copy()
|
||||
kwargs.update({'body': "I'm not talking to @odoobot right now", 'partner_ids': []})
|
||||
self.assertNextMessage(
|
||||
self.test_record_employe.message_post(**kwargs),
|
||||
answer=False
|
||||
)
|
||||
|
||||
def assertNextMessage(self, message, answer=None, sender=None):
|
||||
last_message = self.env['mail.message'].search([('id', '=', message.id + 1)])
|
||||
if last_message:
|
||||
body = last_message.body.replace('<p>', '').replace('</p>', '')
|
||||
else:
|
||||
self.assertFalse(answer, "No last message found when an answer was expect")
|
||||
if answer is not None:
|
||||
if answer and not last_message:
|
||||
self.assertTrue(False, "No last message found")
|
||||
if isinstance(answer, list):
|
||||
self.assertIn(body, answer)
|
||||
elif isinstance(answer, tuple):
|
||||
for elem in answer:
|
||||
self.assertIn(elem, body)
|
||||
elif not answer:
|
||||
self.assertFalse(last_message, "No answer should have been post")
|
||||
return
|
||||
else:
|
||||
self.assertEqual(body, answer)
|
||||
if sender:
|
||||
self.assertEqual(sender, last_message.author_id)
|
||||
return last_message
|
||||
|
|
@ -0,0 +1,465 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from werkzeug.urls import url_parse, url_decode, url_encode
|
||||
import json
|
||||
|
||||
from odoo import http
|
||||
from odoo.addons.test_mail_full.tests.common import TestMailFullCommon
|
||||
from odoo.addons.test_mail_sms.tests.common import TestSMSRecipients
|
||||
from odoo.exceptions import AccessError
|
||||
from odoo.tests import tagged, users
|
||||
from odoo.tests.common import HttpCase
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
|
||||
@tagged('portal')
|
||||
class TestPortal(HttpCase, TestMailFullCommon, TestSMSRecipients):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPortal, self).setUp()
|
||||
|
||||
self.record_portal = self.env['mail.test.portal'].create({
|
||||
'partner_id': self.partner_1.id,
|
||||
'name': 'Test Portal Record',
|
||||
})
|
||||
|
||||
self.record_portal._portal_ensure_token()
|
||||
|
||||
|
||||
@tagged('-at_install', 'post_install', 'portal', 'mail_controller')
|
||||
class TestPortalControllers(TestPortal):
|
||||
|
||||
def test_portal_avatar_with_access_token(self):
|
||||
mail_record = self.env['mail.message'].create({
|
||||
'author_id': self.record_portal.partner_id.id,
|
||||
'model': self.record_portal._name,
|
||||
'res_id': self.record_portal.id,
|
||||
})
|
||||
response = self.url_open(f'/mail/avatar/mail.message/{mail_record.id}/author_avatar/50x50?access_token={self.record_portal.access_token}')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.headers.get('Content-Type'), 'image/png')
|
||||
self.assertRegex(response.headers.get('Content-Disposition', ''), r'mail_message-\d+-author_avatar\.png')
|
||||
|
||||
placeholder_response = self.url_open(f'/mail/avatar/mail.message/{mail_record.id}/author_avatar/50x50?access_token={self.record_portal.access_token + "a"}') # false token
|
||||
self.assertEqual(placeholder_response.status_code, 200)
|
||||
self.assertEqual(placeholder_response.headers.get('Content-Type'), 'image/png')
|
||||
self.assertRegex(placeholder_response.headers.get('Content-Disposition', ''), r'placeholder\.png')
|
||||
|
||||
no_token_response = self.url_open(f'/mail/avatar/mail.message/{mail_record.id}/author_avatar/50x50')
|
||||
self.assertEqual(no_token_response.status_code, 200)
|
||||
self.assertEqual(no_token_response.headers.get('Content-Type'), 'image/png')
|
||||
self.assertRegex(no_token_response.headers.get('Content-Disposition', ''), r'placeholder\.png')
|
||||
|
||||
def test_portal_avatar_with_hash_pid(self):
|
||||
self.authenticate(None, None)
|
||||
post_url = f"{self.record_portal.get_base_url()}/mail/chatter_post"
|
||||
res = self.opener.post(
|
||||
url=post_url,
|
||||
json={
|
||||
'params': {
|
||||
'csrf_token': http.Request.csrf_token(self),
|
||||
'message': 'Test',
|
||||
'res_model': self.record_portal._name,
|
||||
'res_id': self.record_portal.id,
|
||||
'hash': self.record_portal._sign_token(self.partner_2.id),
|
||||
'pid': self.partner_2.id,
|
||||
},
|
||||
},
|
||||
)
|
||||
res.raise_for_status()
|
||||
self.assertNotIn("error", res.json())
|
||||
message = self.record_portal.message_ids[0]
|
||||
response = self.url_open(
|
||||
f'/mail/avatar/mail.message/{message.id}/author_avatar/50x50?_hash={self.record_portal._sign_token(self.partner_2.id)}&pid={self.partner_2.id}')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.headers.get('Content-Type'), 'image/png')
|
||||
self.assertRegex(response.headers.get('Content-Disposition', ''), r'mail_message-\d+-author_avatar\.png')
|
||||
|
||||
placeholder_response = self.url_open(
|
||||
f'/mail/avatar/mail.message/{message.id}/author_avatar/50x50?_hash={self.record_portal._sign_token(self.partner_2.id) + "a"}&pid={self.partner_2.id}') # false hash
|
||||
self.assertEqual(placeholder_response.status_code, 200)
|
||||
self.assertEqual(placeholder_response.headers.get('Content-Type'), 'image/png')
|
||||
self.assertRegex(placeholder_response.headers.get('Content-Disposition', ''), r'placeholder\.png')
|
||||
|
||||
def test_portal_message_fetch(self):
|
||||
"""Test retrieving chatter messages through the portal controller"""
|
||||
self.authenticate(None, None)
|
||||
message_fetch_url = '/mail/chatter_fetch'
|
||||
payload = json.dumps({
|
||||
'jsonrpc': '2.0',
|
||||
'method': 'call',
|
||||
'id': 0,
|
||||
'params': {
|
||||
'res_model': 'mail.test.portal',
|
||||
'res_id': self.record_portal.id,
|
||||
'token': self.record_portal.access_token,
|
||||
},
|
||||
})
|
||||
|
||||
def get_chatter_message_count():
|
||||
res = self.url_open(
|
||||
url=message_fetch_url,
|
||||
data=payload,
|
||||
headers={'Content-Type': 'application/json'}
|
||||
)
|
||||
return res.json().get('result', {}).get('message_count', 0)
|
||||
|
||||
self.assertEqual(get_chatter_message_count(), 0)
|
||||
|
||||
for _ in range(8):
|
||||
self.record_portal.message_post(
|
||||
body='Test',
|
||||
author_id=self.partner_1.id,
|
||||
message_type='comment',
|
||||
subtype_id=self.env.ref('mail.mt_comment').id,
|
||||
)
|
||||
|
||||
self.assertEqual(get_chatter_message_count(), 8)
|
||||
|
||||
# Empty the body of a few messages
|
||||
for i in (2, 5, 6):
|
||||
self.record_portal.message_ids[i].body = ""
|
||||
|
||||
# Empty messages should be ignored
|
||||
self.assertEqual(get_chatter_message_count(), 5)
|
||||
|
||||
def test_portal_share_comment(self):
|
||||
""" Test posting through portal controller allowing to use a hash to
|
||||
post wihtout access rights. """
|
||||
self.authenticate(None, None)
|
||||
post_url = f"{self.record_portal.get_base_url()}/mail/chatter_post"
|
||||
|
||||
# test as not logged
|
||||
self.opener.post(
|
||||
url=post_url,
|
||||
json={
|
||||
'params': {
|
||||
'csrf_token': http.Request.csrf_token(self),
|
||||
'hash': self.record_portal._sign_token(self.partner_2.id),
|
||||
'message': 'Test',
|
||||
'pid': self.partner_2.id,
|
||||
'redirect': '/',
|
||||
'res_model': self.record_portal._name,
|
||||
'res_id': self.record_portal.id,
|
||||
'token': self.record_portal.access_token,
|
||||
},
|
||||
},
|
||||
)
|
||||
message = self.record_portal.message_ids[0]
|
||||
|
||||
self.assertIn('Test', message.body)
|
||||
self.assertEqual(message.author_id, self.partner_2)
|
||||
|
||||
|
||||
@tagged('-at_install', 'post_install', 'portal', 'mail_controller')
|
||||
class TestPortalFlow(TestMailFullCommon, HttpCase):
|
||||
""" Test shared links, mail/view links and redirection (backend, customer
|
||||
portal or frontend for specific addons). """
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.customer = cls.env['res.partner'].create({
|
||||
'country_id': cls.env.ref('base.fr').id,
|
||||
'email': 'mdelvaux34@example.com',
|
||||
'lang': 'en_US',
|
||||
'mobile': '+33639982325',
|
||||
'name': 'Mathias Delvaux',
|
||||
'phone': '+33353011823',
|
||||
})
|
||||
# customer portal enabled
|
||||
cls.record_portal = cls.env['mail.test.portal'].create({
|
||||
'name': 'Test Portal Record',
|
||||
'partner_id': cls.customer.id,
|
||||
'user_id': cls.user_admin.id,
|
||||
})
|
||||
# internal only
|
||||
cls.record_internal = cls.env['mail.test.track'].create({
|
||||
'name': 'Test Internal Record',
|
||||
})
|
||||
# readable (aka portal can read but no specific action)
|
||||
cls.record_read = cls.env['mail.test.simple'].create({
|
||||
'name': 'Test Readable Record',
|
||||
})
|
||||
# 'public' target_type act_url (e.g. blog, forum, ...) -> redirection to a public page
|
||||
cls.record_public_act_url = cls.env['mail.test.portal.public.access.action'].create({
|
||||
'name': 'Public ActUrl',
|
||||
})
|
||||
|
||||
cls.mail_template = cls.env['mail.template'].create({
|
||||
'auto_delete': True,
|
||||
'body_html': '<p>Hello <t t-out="object.partner_id.name"/>, your quotation is ready for review.</p>',
|
||||
'email_from': '{{ (object.user_id.email_formatted or user.email_formatted) }}',
|
||||
'model_id': cls.env.ref('test_mail_full.model_mail_test_portal').id,
|
||||
'name': 'Quotation template',
|
||||
'partner_to': '{{ object.partner_id.id }}',
|
||||
'subject': 'Your quotation "{{ object.name }}"',
|
||||
})
|
||||
cls._create_portal_user()
|
||||
|
||||
# prepare access URLs on self to ease tests
|
||||
# ------------------------------------------------------------
|
||||
base_url = cls.record_portal.get_base_url()
|
||||
cls.test_base_url = base_url
|
||||
|
||||
cls.record_internal_url_base = f'{base_url}/mail/view?model={cls.record_internal._name}&res_id={cls.record_internal.id}'
|
||||
cls.record_portal_url_base = f'{base_url}/mail/view?model={cls.record_portal._name}&res_id={cls.record_portal.id}'
|
||||
cls.record_read_url_base = f'{base_url}/mail/view?model={cls.record_read._name}&res_id={cls.record_read.id}'
|
||||
cls.record_public_act_url_base = f'{base_url}/mail/view?model={cls.record_public_act_url._name}&res_id={cls.record_public_act_url.id}'
|
||||
|
||||
max_internal_id = cls.env['mail.test.track'].search([], order="id desc", limit=1).id
|
||||
max_portal_id = cls.env['mail.test.portal'].search([], order="id desc", limit=1).id
|
||||
max_read_id = cls.env['mail.test.simple'].search([], order="id desc", limit=1).id
|
||||
max_public_act_url_id = cls.env['mail.test.portal.public.access.action'].search([], order="id desc", limit=1).id
|
||||
cls.record_internal_url_no_exists = f'{base_url}/mail/view?model={cls.record_internal._name}&res_id={max_internal_id + 1}'
|
||||
cls.record_portal_url_no_exists = f'{base_url}/mail/view?model={cls.record_portal._name}&res_id={max_portal_id + 1}'
|
||||
cls.record_read_url_no_exists = f'{base_url}/mail/view?model={cls.record_read._name}&res_id={max_read_id + 1}'
|
||||
cls.record_public_act_url_url_no_exists = f'{base_url}/mail/view?model={cls.record_public_act_url._name}&res_id={max_public_act_url_id + 1}'
|
||||
|
||||
cls.record_url_no_model = f'{cls.record_portal.get_base_url()}/mail/view?model=this.should.not.exists&res_id=1'
|
||||
|
||||
# find portal + auth data url
|
||||
for group_name, group_func, group_data in cls.record_portal.sudo()._notify_get_recipients_groups(False):
|
||||
if group_name == 'portal_customer' and group_func(cls.customer):
|
||||
cls.record_portal_url_auth = group_data['button_access']['url']
|
||||
break
|
||||
else:
|
||||
raise AssertionError('Record access URL not found')
|
||||
# build altered access_token URL for testing security
|
||||
parsed_url = url_parse(cls.record_portal_url_auth)
|
||||
query_params = url_decode(parsed_url.query)
|
||||
cls.record_portal_hash = query_params['hash']
|
||||
cls.record_portal_url_auth_wrong_token = parsed_url.replace(
|
||||
query=url_encode({
|
||||
**query_params,
|
||||
'access_token': query_params['access_token'].translate(
|
||||
str.maketrans('0123456789abcdef', '9876543210fedcba')
|
||||
)
|
||||
}, sort=True)
|
||||
).to_url()
|
||||
|
||||
# prepare result URLs on self to ease tests
|
||||
# ------------------------------------------------------------
|
||||
cls.portal_web_url = f'{base_url}/my/test_portal/{cls.record_portal.id}'
|
||||
cls.portal_web_url_with_token = f'{base_url}/my/test_portal/{cls.record_portal.id}?{url_encode({"access_token": cls.record_portal.access_token, "pid": cls.customer.id, "hash": cls.record_portal_hash}, sort=True)}'
|
||||
cls.public_act_url_share = f'{base_url}/test_portal/public_type/{cls.record_public_act_url.id}'
|
||||
cls.internal_backend_local_url = f'/web#{url_encode({"model": cls.record_internal._name, "id": cls.record_internal.id, "active_id": cls.record_internal.id, "cids": cls.company_admin.id}, sort=True)}'
|
||||
cls.portal_backend_local_url = f'/web#{url_encode({"model": cls.record_portal._name, "id": cls.record_portal.id, "active_id": cls.record_portal.id, "cids": cls.company_admin.id}, sort=True)}'
|
||||
cls.read_backend_local_url = f'/web#{url_encode({"model": cls.record_read._name, "id": cls.record_read.id, "active_id": cls.record_read.id, "cids": cls.company_admin.id}, sort=True)}'
|
||||
cls.public_act_url_backend_local_url = f'/web#{url_encode({"model": cls.record_public_act_url._name, "id": cls.record_public_act_url.id, "active_id": cls.record_public_act_url.id, "cids": cls.company_admin.id}, sort=True)}'
|
||||
cls.discuss_local_url = '/web#action=mail.action_discuss'
|
||||
|
||||
def test_assert_initial_data(self):
|
||||
""" Test some initial values. Test that record_access_url is a valid URL
|
||||
to view the record_portal and that record_access_url_wrong_token only differs
|
||||
from record_access_url by a different access_token. """
|
||||
self.record_internal.with_user(self.user_employee).check_access_rule('read')
|
||||
self.record_portal.with_user(self.user_employee).check_access_rule('read')
|
||||
self.record_read.with_user(self.user_employee).check_access_rule('read')
|
||||
|
||||
with self.assertRaises(AccessError):
|
||||
self.record_internal.with_user(self.user_portal).check_access_rights('read')
|
||||
with self.assertRaises(AccessError):
|
||||
self.record_portal.with_user(self.user_portal).check_access_rights('read')
|
||||
self.record_read.with_user(self.user_portal).check_access_rights('read')
|
||||
|
||||
self.assertNotEqual(self.record_portal_url_auth, self.record_portal_url_auth_wrong_token)
|
||||
url_params = []
|
||||
for url in (
|
||||
self.record_portal_url_auth, self.record_portal_url_auth_wrong_token,
|
||||
):
|
||||
with self.subTest(url=url):
|
||||
parsed = url_parse(url)
|
||||
self.assertEqual(parsed.path, '/mail/view')
|
||||
params = url_decode(parsed.query)
|
||||
url_params.append(params)
|
||||
# Note that pid, hash and auth_signup_token are not tested by this test but may be present in the URL (config).
|
||||
self.assertEqual(params.get('model'), 'mail.test.portal')
|
||||
self.assertEqual(int(params.get('res_id')), self.record_portal.id)
|
||||
self.assertTrue(params.get('access_token'))
|
||||
self.assertNotEqual(url_params[0]['access_token'], url_params[1]['access_token'])
|
||||
self.assertEqual(
|
||||
{k: v for k, v in url_params[0].items() if k != 'access_token'},
|
||||
{k: v for k, v in url_params[1].items() if k != 'access_token'},
|
||||
'URLs should be the same, except for access token'
|
||||
)
|
||||
|
||||
@users('employee')
|
||||
def test_employee_access(self):
|
||||
""" Check internal employee behavior when accessing mail/view """
|
||||
self.authenticate(self.env.user.login, self.env.user.login)
|
||||
for url_name, url, exp_url in [
|
||||
# accessible records
|
||||
("Internal record mail/view", self.record_internal_url_base, self.internal_backend_local_url),
|
||||
("Portal record mail/view", self.record_portal_url_base, self.portal_backend_local_url),
|
||||
("Portal readable record mail/view", self.record_read_url_base, self.read_backend_local_url),
|
||||
("Public with act_url", self.record_public_act_url_base, self.public_act_url_backend_local_url),
|
||||
# even with token -> backend
|
||||
("Portal record with token", self.record_portal_url_auth, self.portal_backend_local_url),
|
||||
# invalid token is not an issue for employee -> backend, has access
|
||||
("Portal record with wrong token", self.record_portal_url_auth_wrong_token, self.portal_backend_local_url),
|
||||
# not existing -> redirect to discuss
|
||||
("Not existing record (internal)", self.record_internal_url_no_exists, self.discuss_local_url),
|
||||
("Not existing record (portal enabled)", self.record_portal_url_no_exists, self.discuss_local_url),
|
||||
("Not existign model", self.record_url_no_model, self.discuss_local_url),
|
||||
]:
|
||||
with self.subTest(name=url_name, url=url):
|
||||
res = self.url_open(url)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertURLEqual(res.url, exp_url)
|
||||
|
||||
@mute_logger('werkzeug')
|
||||
@users('portal_test')
|
||||
def test_portal_access_logged(self):
|
||||
""" Check portal behavior when accessing mail/view, notably check token
|
||||
support and propagation. """
|
||||
my_url = f'{self.test_base_url}/my'
|
||||
|
||||
self.authenticate(self.env.user.login, self.env.user.login)
|
||||
for url_name, url, exp_url in [
|
||||
# valid token -> ok -> redirect to portal URL
|
||||
(
|
||||
"No access (portal enabled), token", self.record_portal_url_auth,
|
||||
self.portal_web_url_with_token,
|
||||
),
|
||||
# invalid token -> ko -> redirect to my
|
||||
(
|
||||
"No access (portal enabled), invalid token", self.record_portal_url_auth_wrong_token,
|
||||
my_url,
|
||||
),
|
||||
# std url, read record -> redirect to my with parameters being record portal action parameters (???)
|
||||
(
|
||||
'Access record (no customer portal)', self.record_read_url_base,
|
||||
f'{self.test_base_url}/my#{url_encode({"model": self.record_read._name, "id": self.record_read.id, "active_id": self.record_read.id, "cids": self.company_admin.id}, sort=True)}',
|
||||
),
|
||||
# std url, no access to record -> redirect to my
|
||||
(
|
||||
'No access record (internal)', self.record_internal_url_base,
|
||||
my_url,
|
||||
),
|
||||
# missing token -> redirect to my
|
||||
(
|
||||
'No access record (portal enabled)', self.record_portal_url_base,
|
||||
my_url,
|
||||
),
|
||||
# public_type act_url -> share users are redirected to frontend url
|
||||
(
|
||||
"Public with act_url -> frontend url", self.record_public_act_url_base,
|
||||
self.public_act_url_share
|
||||
),
|
||||
# not existing -> redirect to my
|
||||
(
|
||||
'Not existing record (internal)', self.record_internal_url_no_exists,
|
||||
my_url,
|
||||
),
|
||||
(
|
||||
'Not existing record (portal enabled)', self.record_portal_url_no_exists,
|
||||
my_url,
|
||||
),
|
||||
(
|
||||
'Not existing model', self.record_url_no_model,
|
||||
my_url,
|
||||
),
|
||||
]:
|
||||
with self.subTest(name=url_name, url=url):
|
||||
res = self.url_open(url)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertURLEqual(res.url, exp_url)
|
||||
|
||||
@mute_logger('werkzeug')
|
||||
def test_portal_access_not_logged(self):
|
||||
""" Check customer behavior when accessing mail/view, notably check token
|
||||
support and propagation. """
|
||||
self.authenticate(None, None)
|
||||
login_url = f'{self.test_base_url}/web/login'
|
||||
|
||||
for url_name, url, exp_url in [
|
||||
# valid token -> ok -> redirect to portal URL
|
||||
(
|
||||
"No access (portal enabled), token", self.record_portal_url_auth,
|
||||
self.portal_web_url_with_token,
|
||||
),
|
||||
# invalid token -> ko -> redirect to login with redirect to original link, will be rejected after login
|
||||
(
|
||||
"No access (portal enabled), invalid token", self.record_portal_url_auth_wrong_token,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_portal_url_auth_wrong_token.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
# std url, no access to record -> redirect to login with redirect to original link, will be rejected after login
|
||||
(
|
||||
'No access record (internal)', self.record_internal_url_base,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_internal_url_base.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
# std url, no access to record but portal -> redirect to login, original (local) URL kept as redirection post login to try again (even if faulty)
|
||||
(
|
||||
'No access record (portal enabled)', self.record_portal_url_base,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_portal_url_base.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
(
|
||||
'No access record (portal can read, no customer portal)', self.record_read_url_base,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_read_url_base.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
# public_type act_url -> share users are redirected to frontend url
|
||||
(
|
||||
"Public with act_url -> frontend url", self.record_public_act_url_base,
|
||||
self.public_act_url_share
|
||||
),
|
||||
# not existing -> redirect to login, original (internal) URL kept as redirection post login to try again (even if faulty)
|
||||
(
|
||||
'Not existing record (internal)', self.record_internal_url_no_exists,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_internal_url_no_exists.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
(
|
||||
'Not existing record (portal enabled)', self.record_portal_url_no_exists,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_portal_url_no_exists.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
(
|
||||
'Not existing model', self.record_url_no_model,
|
||||
f'{login_url}?{url_encode({"redirect": self.record_url_no_model.replace(self.test_base_url, "")})}',
|
||||
),
|
||||
]:
|
||||
with self.subTest(name=url_name, url=url):
|
||||
res = self.url_open(url)
|
||||
self.assertEqual(res.status_code, 200)
|
||||
self.assertURLEqual(res.url, exp_url)
|
||||
|
||||
def test_redirect_to_records_norecord(self):
|
||||
""" Check specific use case of missing model, should directly redirect
|
||||
to login page. """
|
||||
for model, res_id in [
|
||||
(False, self.record_portal.id),
|
||||
('', self.record_portal.id),
|
||||
(self.record_portal._name, False),
|
||||
(self.record_portal._name, ''),
|
||||
(False, False),
|
||||
('wrong.model', self.record_portal.id),
|
||||
(self.record_portal._name, -4),
|
||||
]:
|
||||
response = self.url_open(
|
||||
'/mail/view?model=%s&res_id=%s' % (model, res_id),
|
||||
timeout=15
|
||||
)
|
||||
path = url_parse(response.url).path
|
||||
self.assertEqual(
|
||||
path, '/web/login',
|
||||
'Failed with %s - %s' % (model, res_id)
|
||||
)
|
||||
|
||||
|
||||
@tagged('portal')
|
||||
class TestPortalMixin(TestPortal):
|
||||
|
||||
@users('employee')
|
||||
def test_portal_mixin(self):
|
||||
""" Test internals of portal mixin """
|
||||
customer = self.partner_1.with_env(self.env)
|
||||
record_portal = self.env['mail.test.portal'].create({
|
||||
'partner_id': customer.id,
|
||||
'name': 'Test Portal Record',
|
||||
})
|
||||
|
||||
self.assertFalse(record_portal.access_token)
|
||||
self.assertEqual(record_portal.access_url, '/my/test_portal/%s' % record_portal.id)
|
||||
|
||||
record_portal._portal_ensure_token()
|
||||
self.assertTrue(record_portal.access_token)
|
||||
|
|
@ -0,0 +1,237 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
import lxml
|
||||
from datetime import datetime
|
||||
|
||||
from odoo import http
|
||||
from odoo.addons.test_mail_full.tests.common import TestMailFullCommon
|
||||
from odoo.addons.test_mail_sms.tests.common import TestSMSRecipients
|
||||
from odoo.tests import tagged
|
||||
from odoo.tests.common import HttpCase, users, warmup
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
|
||||
class TestRatingCommon(TestMailFullCommon, TestSMSRecipients):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestRatingCommon, cls).setUpClass()
|
||||
|
||||
cls.record_rating = cls.env['mail.test.rating'].create({
|
||||
'customer_id': cls.partner_1.id,
|
||||
'name': 'Test Rating',
|
||||
'user_id': cls.user_admin.id,
|
||||
})
|
||||
|
||||
|
||||
@tagged('rating')
|
||||
class TestRatingFlow(TestRatingCommon):
|
||||
|
||||
def test_initial_values(self):
|
||||
record_rating = self.record_rating.with_env(self.env)
|
||||
self.assertFalse(record_rating.rating_ids)
|
||||
self.assertEqual(record_rating.message_partner_ids, self.partner_admin)
|
||||
self.assertEqual(len(record_rating.message_ids), 1)
|
||||
|
||||
@users('employee')
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_rating_prepare(self):
|
||||
record_rating = self.record_rating.with_env(self.env)
|
||||
|
||||
# prepare rating token
|
||||
access_token = record_rating._rating_get_access_token()
|
||||
|
||||
# check rating creation
|
||||
rating = record_rating.rating_ids
|
||||
self.assertEqual(rating.access_token, access_token)
|
||||
self.assertFalse(rating.consumed)
|
||||
self.assertFalse(rating.is_internal)
|
||||
self.assertEqual(rating.partner_id, self.partner_1)
|
||||
self.assertEqual(rating.rated_partner_id, self.user_admin.partner_id)
|
||||
self.assertFalse(rating.rating)
|
||||
|
||||
@users('employee')
|
||||
@mute_logger('odoo.addons.mail.models.mail_mail')
|
||||
def test_rating_rating_apply(self):
|
||||
record_rating = self.record_rating.with_env(self.env)
|
||||
record_messages = record_rating.message_ids
|
||||
|
||||
# prepare rating token
|
||||
access_token = record_rating._rating_get_access_token()
|
||||
|
||||
# simulate an email click: notification should be delayed
|
||||
with self.mock_mail_gateway(mail_unlink_sent=False), self.mock_mail_app():
|
||||
record_rating.rating_apply(5, token=access_token, feedback='Top Feedback', notify_delay_send=True)
|
||||
message = record_rating.message_ids[0]
|
||||
rating = record_rating.rating_ids
|
||||
|
||||
# check posted message
|
||||
self.assertEqual(record_rating.message_ids, record_messages + message)
|
||||
self.assertIn('Top Feedback', message.body)
|
||||
self.assertIn('/rating/static/src/img/rating_5.png', message.body)
|
||||
self.assertEqual(message.author_id, self.partner_1)
|
||||
self.assertEqual(message.rating_ids, rating)
|
||||
self.assertFalse(message.notified_partner_ids)
|
||||
self.assertEqual(message.subtype_id, self.env.ref('test_mail_full.mt_mail_test_rating_rating_done'))
|
||||
|
||||
# check rating update
|
||||
self.assertTrue(rating.consumed)
|
||||
self.assertEqual(rating.feedback, 'Top Feedback')
|
||||
self.assertEqual(rating.message_id, message)
|
||||
self.assertEqual(rating.rating, 5)
|
||||
self.assertEqual(record_rating.rating_last_value, 5)
|
||||
|
||||
# give a feedback: send notifications (notify_delay_send set to False)
|
||||
with self.mock_mail_gateway(mail_unlink_sent=False), self.mock_mail_app():
|
||||
record_rating.rating_apply(1, token=access_token, feedback='Bad Feedback')
|
||||
|
||||
# check posted message: message is updated
|
||||
update_message = record_rating.message_ids[0]
|
||||
self.assertEqual(update_message, message, 'Should update first message')
|
||||
self.assertEqual(record_rating.message_ids, record_messages + update_message)
|
||||
self.assertIn('Bad Feedback', update_message.body)
|
||||
self.assertIn('/rating/static/src/img/rating_1.png', update_message.body)
|
||||
self.assertEqual(update_message.author_id, self.partner_1)
|
||||
self.assertEqual(update_message.rating_ids, rating)
|
||||
self.assertEqual(update_message.notified_partner_ids, self.partner_admin)
|
||||
self.assertEqual(update_message.subtype_id, self.env.ref("test_mail_full.mt_mail_test_rating_rating_done"))
|
||||
|
||||
# check rating update
|
||||
new_rating = record_rating.rating_ids
|
||||
self.assertEqual(new_rating, rating, 'Should update first rating')
|
||||
self.assertTrue(new_rating.consumed)
|
||||
self.assertEqual(new_rating.feedback, 'Bad Feedback')
|
||||
self.assertEqual(new_rating.message_id, update_message)
|
||||
self.assertEqual(new_rating.rating, 1)
|
||||
self.assertEqual(record_rating.rating_last_value, 1)
|
||||
|
||||
|
||||
@tagged('rating')
|
||||
class TestRatingMixin(TestRatingCommon):
|
||||
|
||||
@users('employee')
|
||||
@warmup
|
||||
def test_rating_values(self):
|
||||
record_rating = self.record_rating.with_env(self.env)
|
||||
|
||||
# prepare rating token
|
||||
access_0 = record_rating._rating_get_access_token()
|
||||
last_rating = record_rating.rating_apply(3, token=access_0, feedback="This record is meh but it's cheap.")
|
||||
# Make sure to update the write_date which is used to retrieve the last rating
|
||||
last_rating.write_date = datetime(2022, 1, 1, 14, 00)
|
||||
access_1 = record_rating._rating_get_access_token()
|
||||
last_rating = record_rating.rating_apply(1, token=access_1, feedback="This record sucks so much. I want to speak to the manager !")
|
||||
last_rating.write_date = datetime(2022, 2, 1, 14, 00)
|
||||
access_2 = record_rating._rating_get_access_token()
|
||||
last_rating = record_rating.rating_apply(5, token=access_2, feedback="This is the best record ever ! I wish I read the documentation before complaining !")
|
||||
last_rating.write_date = datetime(2022, 3, 1, 14, 00)
|
||||
record_rating.rating_ids.flush_model(['write_date'])
|
||||
|
||||
self.assertEqual(record_rating.rating_last_value, 5, "The last rating is kept.")
|
||||
self.assertEqual(record_rating.rating_avg, 3, "The average should be equal to 3")
|
||||
|
||||
|
||||
@tagged('rating', 'mail_performance', 'post_install', '-at_install')
|
||||
class TestRatingPerformance(TestRatingCommon):
|
||||
|
||||
@users('employee')
|
||||
@warmup
|
||||
def test_rating_last_value_perfs(self):
|
||||
RECORD_COUNT = 100
|
||||
partners = self.env['res.partner'].sudo().create([
|
||||
{'name': 'Jean-Luc %s' % (idx), 'email': 'jean-luc-%s@opoo.com' % (idx)} for idx in range(RECORD_COUNT)])
|
||||
|
||||
with self.assertQueryCount(employee=1516): # tmf 1516 / com 5510
|
||||
record_ratings = self.env['mail.test.rating'].create([{
|
||||
'customer_id': partners[idx].id,
|
||||
'name': 'Test Rating',
|
||||
'user_id': self.user_admin.id,
|
||||
} for idx in range(RECORD_COUNT)])
|
||||
self.flush_tracking()
|
||||
|
||||
with self.assertQueryCount(employee=2004): # tmf 2004
|
||||
for record in record_ratings:
|
||||
access_token = record._rating_get_access_token()
|
||||
record.rating_apply(1, token=access_token)
|
||||
self.flush_tracking()
|
||||
|
||||
with self.assertQueryCount(employee=2003): # tmf 2003
|
||||
for record in record_ratings:
|
||||
access_token = record._rating_get_access_token()
|
||||
record.rating_apply(5, token=access_token)
|
||||
self.flush_tracking()
|
||||
|
||||
with self.assertQueryCount(employee=1):
|
||||
record_ratings._compute_rating_last_value()
|
||||
vals = [val == 5 for val in record_ratings.mapped('rating_last_value')]
|
||||
self.assertTrue(all(vals), "The last rating is kept.")
|
||||
|
||||
|
||||
@tagged('rating')
|
||||
class TestRatingRoutes(HttpCase, TestRatingCommon):
|
||||
|
||||
def test_open_rating_route(self):
|
||||
"""
|
||||
16.0 + expected behavior
|
||||
1) Clicking on the smiley image triggers the /rate/<string:token>/<int:rate>
|
||||
route should not update the rating of the record but simply redirect
|
||||
to the feedback form
|
||||
2) Customer interacts with webpage and submits FORM. Triggers /rate/<string:token>/submit_feedback
|
||||
route. Should update the rating of the record with the data in the POST request
|
||||
"""
|
||||
self.authenticate(None, None) # set up session for public user
|
||||
access_token = self.record_rating._rating_get_access_token()
|
||||
|
||||
# First round of clicking the URL and then submitting FORM data
|
||||
response_click_one = self.url_open(f"/rate/{access_token}/5")
|
||||
response_click_one.raise_for_status()
|
||||
|
||||
# there should be a form to post to validate the feedback and avoid one-click anyway
|
||||
forms = lxml.html.fromstring(response_click_one.content).xpath('//form')
|
||||
self.assertEqual(forms[0].get('method'), 'post')
|
||||
self.assertEqual(forms[0].get('action', ''), f'/rate/{access_token}/submit_feedback')
|
||||
|
||||
# rating should not change, i.e. default values
|
||||
rating = self.record_rating.rating_ids
|
||||
self.assertFalse(rating.consumed)
|
||||
self.assertEqual(rating.rating, 0)
|
||||
self.assertFalse(rating.feedback)
|
||||
self.assertEqual(self.record_rating.rating_last_value, 0)
|
||||
|
||||
response_submit_one = self.url_open(
|
||||
f"/rate/{access_token}/submit_feedback",
|
||||
data={
|
||||
"rate": 5,
|
||||
"csrf_token": http.Request.csrf_token(self),
|
||||
"feedback": "good",
|
||||
}
|
||||
)
|
||||
response_submit_one.raise_for_status()
|
||||
|
||||
rating_post_submit_one = self.record_rating.rating_ids
|
||||
self.assertTrue(rating_post_submit_one.consumed)
|
||||
self.assertEqual(rating_post_submit_one.rating, 5)
|
||||
self.assertEqual(rating_post_submit_one.feedback, "good")
|
||||
self.assertEqual(self.record_rating.rating_last_value, 5)
|
||||
|
||||
# Second round of clicking the URL and then submitting FORM data
|
||||
response_click_two = self.url_open(f"/rate/{access_token}/1")
|
||||
response_click_two.raise_for_status()
|
||||
self.assertEqual(self.record_rating.rating_last_value, 5) # should not be updated to 1
|
||||
|
||||
# check returned form
|
||||
forms = lxml.html.fromstring(response_click_two.content).xpath('//form')
|
||||
self.assertEqual(forms[0].get('method'), 'post')
|
||||
self.assertEqual(forms[0].get('action', ''), f'/rate/{access_token}/submit_feedback')
|
||||
|
||||
response_submit_two = self.url_open(f"/rate/{access_token}/submit_feedback",
|
||||
data={"rate": 1,
|
||||
"csrf_token": http.Request.csrf_token(self),
|
||||
"feedback": "bad job"})
|
||||
response_submit_two.raise_for_status()
|
||||
|
||||
rating_post_submit_second = self.record_rating.rating_ids
|
||||
self.assertTrue(rating_post_submit_second.consumed)
|
||||
self.assertEqual(rating_post_submit_second.rating, 1)
|
||||
self.assertEqual(rating_post_submit_second.feedback, "bad job")
|
||||
self.assertEqual(self.record_rating.rating_last_value, 1)
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||||
|
||||
from odoo.addons.mail.tests.common import mail_new_test_user
|
||||
from odoo.addons.test_mail_full.tests.common import TestMailFullCommon
|
||||
|
||||
|
||||
class TestResUsers(TestMailFullCommon):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestResUsers, cls).setUpClass()
|
||||
cls.portal_user = mail_new_test_user(
|
||||
cls.env,
|
||||
login='portal_user',
|
||||
mobile='+32 494 12 34 56',
|
||||
phone='+32 494 12 34 89',
|
||||
password='password',
|
||||
name='Portal User',
|
||||
email='portal@test.example.com',
|
||||
groups='base.group_portal',
|
||||
)
|
||||
|
||||
cls.portal_user_2 = mail_new_test_user(
|
||||
cls.env,
|
||||
login='portal_user_2',
|
||||
mobile='+32 494 12 34 22',
|
||||
phone='invalid phone',
|
||||
password='password',
|
||||
name='Portal User 2',
|
||||
email='portal_2@test.example.com',
|
||||
groups='base.group_portal',
|
||||
)
|
||||
|
||||
# Remove existing blacklisted email / phone (they will be sanitized, so we avoid to sanitize them here)
|
||||
cls.env['mail.blacklist'].search([]).unlink()
|
||||
cls.env['phone.blacklist'].search([]).unlink()
|
||||
|
||||
def test_deactivate_portal_users_blacklist(self):
|
||||
"""Test that the email and the phone are blacklisted
|
||||
when a portal user deactivate his own account.
|
||||
"""
|
||||
(self.portal_user | self.portal_user_2)._deactivate_portal_user(request_blacklist=True)
|
||||
|
||||
self.assertFalse(self.portal_user.active, 'Should have archived the user')
|
||||
self.assertFalse(self.portal_user.partner_id.active, 'Should have archived the partner')
|
||||
self.assertFalse(self.portal_user_2.active, 'Should have archived the user')
|
||||
self.assertFalse(self.portal_user_2.partner_id.active, 'Should have archived the partner')
|
||||
|
||||
blacklist = self.env['mail.blacklist'].search([
|
||||
('email', 'in', ('portal@test.example.com', 'portal_2@test.example.com')),
|
||||
])
|
||||
self.assertEqual(len(blacklist), 2, 'Should have blacklisted the users email')
|
||||
|
||||
blacklists = self.env['phone.blacklist'].search([
|
||||
('number', 'in', ('+32494123489', '+32494123456', '+32494123422')),
|
||||
])
|
||||
self.assertEqual(len(blacklists), 3, 'Should have blacklisted the user phone and mobile')
|
||||
|
||||
blacklist = self.env['phone.blacklist'].search([('number', '=', 'invalid phone')])
|
||||
self.assertFalse(blacklist, 'Should have skipped invalid phone')
|
||||
|
||||
def test_deactivate_portal_users_no_blacklist(self):
|
||||
"""Test the case when the user do not want to blacklist his email / phone."""
|
||||
(self.portal_user | self.portal_user_2)._deactivate_portal_user(request_blacklist=False)
|
||||
|
||||
self.assertFalse(self.portal_user.active, 'Should have archived the user')
|
||||
self.assertFalse(self.portal_user.partner_id.active, 'Should have archived the partner')
|
||||
self.assertFalse(self.portal_user_2.active, 'Should have archived the user')
|
||||
self.assertFalse(self.portal_user_2.partner_id.active, 'Should have archived the partner')
|
||||
|
||||
blacklists = self.env['mail.blacklist'].search([])
|
||||
self.assertFalse(blacklists, 'Should not have blacklisted the users email')
|
||||
|
||||
blacklists = self.env['phone.blacklist'].search([])
|
||||
self.assertFalse(blacklists, 'Should not have blacklisted the user phone and mobile')
|
||||
Loading…
Add table
Add a link
Reference in a new issue