Initial commit: Security packages

This commit is contained in:
Ernad Husremovic 2025-08-29 15:20:51 +02:00
commit bb469e4763
1399 changed files with 278378 additions and 0 deletions

View file

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from . import res_config_settings
from . import ir_http
from . import res_partner
from . import res_users

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models
from odoo.http import request
class Http(models.AbstractModel):
_inherit = 'ir.http'
@classmethod
def _pre_dispatch(cls, rule, args):
super()._pre_dispatch(rule, args)
# add signup token or login to the session if given
for key in ('auth_signup_token', 'auth_login'):
val = request.httprequest.args.get(key)
if val is not None:
request.session[key] = val

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import fields, models
class ResConfigSettings(models.TransientModel):
_inherit = 'res.config.settings'
auth_signup_reset_password = fields.Boolean(
string='Enable password reset from Login page',
config_parameter='auth_signup.reset_password')
auth_signup_uninvited = fields.Selection(
selection=[
('b2b', 'On invitation'),
('b2c', 'Free sign up'),
],
string='Customer Account',
default='b2c',
config_parameter='auth_signup.invitation_scope')
auth_signup_template_user_id = fields.Many2one(
'res.users',
string='Template user for new users created through signup',
config_parameter='base.template_portal_user_id')

View file

@ -0,0 +1,197 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import random
import werkzeug.urls
from collections import defaultdict
from datetime import datetime, timedelta
from odoo import api, exceptions, fields, models, _
from odoo.tools import sql
class SignupError(Exception):
pass
def random_token():
# the token has an entropy of about 120 bits (6 bits/char * 20 chars)
chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
return ''.join(random.SystemRandom().choice(chars) for _ in range(20))
def now(**kwargs):
return datetime.now() + timedelta(**kwargs)
class ResPartner(models.Model):
_inherit = 'res.partner'
signup_token = fields.Char(copy=False, groups="base.group_erp_manager", compute='_compute_token', inverse='_inverse_token')
signup_type = fields.Char(string='Signup Token Type', copy=False, groups="base.group_erp_manager")
signup_expiration = fields.Datetime(copy=False, groups="base.group_erp_manager")
signup_valid = fields.Boolean(compute='_compute_signup_valid', string='Signup Token is Valid')
signup_url = fields.Char(compute='_compute_signup_url', string='Signup URL')
def init(self):
super().init()
if not sql.column_exists(self.env.cr, self._table, "signup_token"):
self.env.cr.execute("ALTER TABLE res_partner ADD COLUMN signup_token varchar")
@api.depends('signup_token', 'signup_expiration')
def _compute_signup_valid(self):
dt = now()
for partner, partner_sudo in zip(self, self.sudo()):
partner.signup_valid = bool(partner_sudo.signup_token) and \
(not partner_sudo.signup_expiration or dt <= partner_sudo.signup_expiration)
def _compute_signup_url(self):
""" proxy for function field towards actual implementation """
result = self.sudo()._get_signup_url_for_action()
for partner in self:
if any(u._is_internal() for u in partner.user_ids if u != self.env.user):
self.env['res.users'].check_access_rights('write')
if any(u.has_group('base.group_portal') for u in partner.user_ids if u != self.env.user):
self.env['res.partner'].check_access_rights('write')
partner.signup_url = result.get(partner.id, False)
def _compute_token(self):
for partner in self.filtered('id'):
self.env.cr.execute('SELECT signup_token FROM res_partner WHERE id=%s', (partner._origin.id,))
partner.signup_token = self.env.cr.fetchone()[0]
def _inverse_token(self):
for partner in self.filtered('id'):
self.env.cr.execute('UPDATE res_partner SET signup_token = %s WHERE id=%s', (partner.signup_token or None, partner.id))
def _get_signup_url_for_action(self, url=None, action=None, view_type=None, menu_id=None, res_id=None, model=None):
""" generate a signup url for the given partner ids and action, possibly overriding
the url state components (menu_id, id, view_type) """
res = dict.fromkeys(self.ids, False)
for partner in self:
base_url = partner.get_base_url()
# when required, make sure the partner has a valid signup token
if self.env.context.get('signup_valid') and not partner.user_ids:
partner.sudo().signup_prepare()
route = 'login'
# the parameters to encode for the query
query = {'db': self.env.cr.dbname}
if self.env.context.get('create_user'):
query['signup_email'] = partner.email
signup_type = self.env.context.get('signup_force_type_in_url', partner.sudo().signup_type or '')
if signup_type:
route = 'reset_password' if signup_type == 'reset' else signup_type
if partner.sudo().signup_token and signup_type:
query['token'] = partner.sudo().signup_token
elif partner.user_ids:
query['login'] = partner.user_ids[0].login
else:
continue # no signup token, no user, thus no signup url!
if url:
query['redirect'] = url
else:
fragment = dict()
base = '/web#'
if action == '/mail/view':
base = '/mail/view?'
elif action:
fragment['action'] = action
if view_type:
fragment['view_type'] = view_type
if menu_id:
fragment['menu_id'] = menu_id
if model:
fragment['model'] = model
if res_id:
fragment['res_id'] = res_id
if fragment:
query['redirect'] = base + werkzeug.urls.url_encode(fragment)
signup_url = "/web/%s?%s" % (route, werkzeug.urls.url_encode(query))
if not self.env.context.get('relative_url'):
signup_url = werkzeug.urls.url_join(base_url, signup_url)
res[partner.id] = signup_url
return res
def action_signup_prepare(self):
return self.signup_prepare()
def signup_get_auth_param(self):
""" Get a signup token related to the partner if signup is enabled.
If the partner already has a user, get the login parameter.
"""
if not self.env.user._is_internal() and not self.env.is_admin():
raise exceptions.AccessDenied()
res = defaultdict(dict)
allow_signup = self.env['res.users']._get_signup_invitation_scope() == 'b2c'
for partner in self:
partner = partner.sudo()
if allow_signup and not partner.user_ids:
partner.signup_prepare()
res[partner.id]['auth_signup_token'] = partner.signup_token
elif partner.user_ids:
res[partner.id]['auth_login'] = partner.user_ids[0].login
return res
def signup_cancel(self):
return self.write({'signup_token': False, 'signup_type': False, 'signup_expiration': False})
def signup_prepare(self, signup_type="signup", expiration=False):
""" generate a new token for the partners with the given validity, if necessary
:param expiration: the expiration datetime of the token (string, optional)
"""
for partner in self:
if expiration or not partner.signup_valid:
token = random_token()
while self._signup_retrieve_partner(token):
token = random_token()
partner.write({'signup_token': token, 'signup_type': signup_type, 'signup_expiration': expiration})
return True
@api.model
def _signup_retrieve_partner(self, token, check_validity=False, raise_exception=False):
""" find the partner corresponding to a token, and possibly check its validity
:param token: the token to resolve
:param check_validity: if True, also check validity
:param raise_exception: if True, raise exception instead of returning False
:return: partner (browse record) or False (if raise_exception is False)
"""
self.env.cr.execute("SELECT id FROM res_partner WHERE signup_token = %s AND active", (token,))
partner_id = self.env.cr.fetchone()
partner = self.browse(partner_id[0]) if partner_id else None
if not partner:
if raise_exception:
raise exceptions.UserError(_("Signup token '%s' is not valid", token))
return False
if check_validity and not partner.signup_valid:
if raise_exception:
raise exceptions.UserError(_("Signup token '%s' is no longer valid", token))
return False
return partner
@api.model
def signup_retrieve_info(self, token):
""" retrieve the user info about the token
:return: a dictionary with the user information:
- 'db': the name of the database
- 'token': the token, if token is valid
- 'name': the name of the partner, if token is valid
- 'login': the user login, if the user already exists
- 'email': the partner email, if the user does not exist
"""
partner = self._signup_retrieve_partner(token, raise_exception=True)
res = {'db': self.env.cr.dbname}
if partner.signup_valid:
res['token'] = token
res['name'] = partner.name
if partner.user_ids:
res['login'] = partner.user_ids[0].login
else:
res['email'] = res['login'] = partner.email or ''
return res

View file

@ -0,0 +1,262 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
from ast import literal_eval
from collections import defaultdict
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools.misc import ustr
from odoo.addons.base.models.ir_mail_server import MailDeliveryException
from odoo.addons.auth_signup.models.res_partner import SignupError, now
_logger = logging.getLogger(__name__)
class ResUsers(models.Model):
_inherit = 'res.users'
state = fields.Selection(compute='_compute_state', search='_search_state', string='Status',
selection=[('new', 'Never Connected'), ('active', 'Confirmed')])
def _search_state(self, operator, value):
negative = operator in expression.NEGATIVE_TERM_OPERATORS
# In case we have no value
if not value:
return expression.TRUE_DOMAIN if negative else expression.FALSE_DOMAIN
if operator in ['in', 'not in']:
if len(value) > 1:
return expression.FALSE_DOMAIN if negative else expression.TRUE_DOMAIN
if value[0] == 'new':
comp = '!=' if negative else '='
if value[0] == 'active':
comp = '=' if negative else '!='
return [('log_ids', comp, False)]
if operator in ['=', '!=']:
# In case we search against anything else than new, we have to invert the operator
if value != 'new':
operator = expression.TERM_OPERATORS_NEGATION[operator]
return [('log_ids', operator, False)]
return expression.TRUE_DOMAIN
def _compute_state(self):
for user in self:
user.state = 'active' if user.login_date else 'new'
@api.model
def signup(self, values, token=None):
""" signup a user, to either:
- create a new user (no token), or
- create a user for a partner (with token, but no user for partner), or
- change the password of a user (with token, and existing user).
:param values: a dictionary with field values that are written on user
:param token: signup token (optional)
:return: (dbname, login, password) for the signed up user
"""
if token:
# signup with a token: find the corresponding partner id
partner = self.env['res.partner']._signup_retrieve_partner(token, check_validity=True, raise_exception=True)
# invalidate signup token
partner.write({'signup_token': False, 'signup_type': False, 'signup_expiration': False})
partner_user = partner.user_ids and partner.user_ids[0] or False
# avoid overwriting existing (presumably correct) values with geolocation data
if partner.country_id or partner.zip or partner.city:
values.pop('city', None)
values.pop('country_id', None)
if partner.lang:
values.pop('lang', None)
if partner_user:
# user exists, modify it according to values
values.pop('login', None)
values.pop('name', None)
partner_user.write(values)
if not partner_user.login_date:
partner_user._notify_inviter()
return (partner_user.login, values.get('password'))
else:
# user does not exist: sign up invited user
values.update({
'name': partner.name,
'partner_id': partner.id,
'email': values.get('email') or values.get('login'),
})
if partner.company_id:
values['company_id'] = partner.company_id.id
values['company_ids'] = [(6, 0, [partner.company_id.id])]
partner_user = self._signup_create_user(values)
partner_user._notify_inviter()
else:
# no token, sign up an external user
values['email'] = values.get('email') or values.get('login')
self._signup_create_user(values)
return (values.get('login'), values.get('password'))
@api.model
def _get_signup_invitation_scope(self):
return self.env['ir.config_parameter'].sudo().get_param('auth_signup.invitation_scope', 'b2b')
@api.model
def _signup_create_user(self, values):
""" signup a new user using the template user """
# check that uninvited users may sign up
if 'partner_id' not in values:
if self._get_signup_invitation_scope() != 'b2c':
raise SignupError(_('Signup is not allowed for uninvited users'))
return self._create_user_from_template(values)
def _notify_inviter(self):
for user in self:
invite_partner = user.create_uid.partner_id
if invite_partner:
# notify invite user that new user is connected
self.env['bus.bus']._sendone(invite_partner, 'res.users/connection', {
'username': user.name,
'partnerId': user.partner_id.id,
})
def _create_user_from_template(self, values):
template_user_id = literal_eval(self.env['ir.config_parameter'].sudo().get_param('base.template_portal_user_id', 'False'))
template_user = self.browse(template_user_id)
if not template_user.exists():
raise ValueError(_('Signup: invalid template user'))
if not values.get('login'):
raise ValueError(_('Signup: no login given for new user'))
if not values.get('partner_id') and not values.get('name'):
raise ValueError(_('Signup: no name or partner given for new user'))
# create a copy of the template user (attached to a specific partner_id if given)
values['active'] = True
try:
with self.env.cr.savepoint():
return template_user.with_context(no_reset_password=True).copy(values)
except Exception as e:
# copy may failed if asked login is not available.
raise SignupError(ustr(e))
def reset_password(self, login):
""" retrieve the user corresponding to login (login or email),
and reset their password
"""
users = self.search(self._get_login_domain(login))
if not users:
users = self.search(self._get_email_domain(login))
if not users:
raise Exception(_('No account found for this login'))
if len(users) > 1:
raise Exception(_('Multiple accounts found for this login'))
return users.action_reset_password()
def action_reset_password(self):
""" create signup token for each user, and send their signup url by email """
if self.env.context.get('install_mode', False):
return
if self.filtered(lambda user: not user.active):
raise UserError(_("You cannot perform this action on an archived user."))
# prepare reset password signup
create_mode = bool(self.env.context.get('create_user'))
# no time limit for initial invitation, only for reset password
expiration = False if create_mode else now(days=+1)
self.mapped('partner_id').signup_prepare(signup_type="reset", expiration=expiration)
# send email to users with their signup url
template = False
if create_mode:
try:
template = self.env.ref('auth_signup.set_password_email', raise_if_not_found=False)
except ValueError:
pass
if not template:
template = self.env.ref('auth_signup.reset_password_email')
assert template._name == 'mail.template'
email_values = {
'email_cc': False,
'auto_delete': True,
'message_type': 'user_notification',
'recipient_ids': [],
'partner_ids': [],
'scheduled_date': False,
}
for user in self:
if not user.email:
raise UserError(_("Cannot send email: user %s has no email address.", user.name))
email_values['email_to'] = user.email
# TDE FIXME: make this template technical (qweb)
with self.env.cr.savepoint():
force_send = not(self.env.context.get('import_file', False))
template.send_mail(user.id, force_send=force_send, raise_exception=True, email_values=email_values)
_logger.info("Password reset email sent for user <%s> to <%s>", user.login, user.email)
def send_unregistered_user_reminder(self, after_days=5):
email_template = self.env.ref('auth_signup.mail_template_data_unregistered_users', raise_if_not_found=False)
if not email_template:
_logger.warning("Template 'auth_signup.mail_template_data_unregistered_users' was not found. Cannot send reminder notifications.")
return
datetime_min = fields.Datetime.today() - relativedelta(days=after_days)
datetime_max = datetime_min + relativedelta(hours=23, minutes=59, seconds=59)
res_users_with_details = self.env['res.users'].search_read([
('share', '=', False),
('create_uid.email', '!=', False),
('create_date', '>=', datetime_min),
('create_date', '<=', datetime_max),
('log_ids', '=', False)], ['create_uid', 'name', 'login'])
# group by invited by
invited_users = defaultdict(list)
for user in res_users_with_details:
invited_users[user.get('create_uid')[0]].append("%s (%s)" % (user.get('name'), user.get('login')))
# For sending mail to all the invitors about their invited users
for user in invited_users:
template = email_template.with_context(dbname=self._cr.dbname, invited_users=invited_users[user])
template.send_mail(user, email_layout_xmlid='mail.mail_notification_light', force_send=False)
@api.model
def web_create_users(self, emails):
inactive_users = self.search([('state', '=', 'new'), '|', ('login', 'in', emails), ('email', 'in', emails)])
new_emails = set(emails) - set(inactive_users.mapped('email'))
res = super(ResUsers, self).web_create_users(list(new_emails))
if inactive_users:
inactive_users.with_context(create_user=True).action_reset_password()
return res
@api.model_create_multi
def create(self, vals_list):
# overridden to automatically invite user to sign up
users = super(ResUsers, self).create(vals_list)
if not self.env.context.get('no_reset_password'):
users_with_email = users.filtered('email')
if users_with_email:
try:
users_with_email.with_context(create_user=True).action_reset_password()
except MailDeliveryException:
users_with_email.partner_id.with_context(create_user=True).signup_cancel()
return users
@api.returns('self', lambda value: value.id)
def copy(self, default=None):
self.ensure_one()
sup = super(ResUsers, self)
if not default or not default.get('email'):
# avoid sending email to the user we are duplicating
sup = super(ResUsers, self.with_context(no_reset_password=True))
return sup.copy(default=default)