19.0 vanilla

This commit is contained in:
Ernad Husremovic 2026-03-09 09:32:28 +01:00
parent 20ddc1b4a3
commit c0efcc53f5
1162 changed files with 125577 additions and 105287 deletions

View file

@ -2,6 +2,6 @@
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from . import auth_totp
from . import ir_http
from . import auth_totp_rate_limit_log
from . import res_users
from . import totp

View file

@ -1,19 +1,19 @@
# -*- coding: utf-8 -*-
from odoo import api, models
from odoo.addons.auth_totp.controllers.home import TRUSTED_DEVICE_AGE
from odoo import models
from odoo.addons.auth_totp.controllers.home import TRUSTED_DEVICE_AGE_DAYS
import logging
_logger = logging.getLogger(__name__)
class AuthTotpDevice(models.Model):
class Auth_TotpDevice(models.Model):
# init is overriden in res.users.apikeys to create a secret column 'key'
# use a different model to benefit from the secured methods while not mixing
# two different concepts
_name = "auth_totp.device"
_inherit = "res.users.apikeys"
_name = 'auth_totp.device'
_inherit = ["res.users.apikeys"]
_description = "Authentication Device"
_auto = False
@ -22,10 +22,17 @@ class AuthTotpDevice(models.Model):
assert uid, "uid is required"
return self._check_credentials(scope=scope, key=key) == uid
@api.autovacuum
def _gc_device(self):
self._cr.execute("""
DELETE FROM auth_totp_device
WHERE create_date < (NOW() AT TIME ZONE 'UTC' - INTERVAL '%s SECONDS')
""", [TRUSTED_DEVICE_AGE])
_logger.info("GC'd %d totp devices entries", self._cr.rowcount)
def _get_trusted_device_age(self):
ICP = self.env['ir.config_parameter'].sudo()
try:
nbr_days = int(ICP.get_param('auth_totp.trusted_device_age', TRUSTED_DEVICE_AGE_DAYS))
if nbr_days <= 0:
nbr_days = None
except ValueError:
nbr_days = None
if nbr_days is None:
_logger.warning("Invalid value for 'auth_totp.trusted_device_age', using default value.")
nbr_days = TRUSTED_DEVICE_AGE_DAYS
return nbr_days * 86400 # seconds

View file

@ -0,0 +1,15 @@
from odoo import fields, models
class AuthTotpRateLimitLog(models.TransientModel):
_name = 'auth.totp.rate.limit.log'
_description = 'TOTP rate limit logs'
_user_id_limit_type_create_date_idx = models.Index("(user_id, limit_type, create_date)")
user_id = fields.Many2one('res.users', required=True, readonly=True)
ip = fields.Char(readonly=True)
limit_type = fields.Selection([
('send_email', 'Send Email'),
('code_check', 'Code Checking'),
], readonly=True)

View file

@ -1,13 +0,0 @@
# -*- coding: utf-8 -*-
from odoo import models
from odoo.http import request
class IrHttp(models.AbstractModel):
_inherit = 'ir.http'
def session_info(self):
info = super().session_info()
# because frontend session_info uses this key and is embedded in
# the view source
info["user_id"] = request.session.uid,
return info

View file

@ -7,6 +7,8 @@ import logging
import os
import re
from datetime import datetime, timedelta
from odoo import _, api, fields, models
from odoo.addons.base.models.res_users import check_identity
from odoo.exceptions import AccessDenied, UserError
@ -18,23 +20,30 @@ from odoo.addons.auth_totp.models.totp import TOTP, TOTP_SECRET_SIZE
_logger = logging.getLogger(__name__)
compress = functools.partial(re.sub, r'\s', '')
class Users(models.Model):
TOTP_RATE_LIMITS = {
'send_email': (5, 3600),
'code_check': (5, 3600),
}
class ResUsers(models.Model):
_inherit = 'res.users'
totp_secret = fields.Char(copy=False, groups=fields.NO_ACCESS, compute='_compute_totp_secret', inverse='_inverse_totp_secret', search='_search_totp_enable')
totp_enabled = fields.Boolean(string="Two-factor authentication", compute='_compute_totp_enabled', search='_search_totp_enable')
totp_secret = fields.Char(copy=False, groups=fields.NO_ACCESS, compute='_compute_totp_secret', inverse='_inverse_token')
totp_last_counter = fields.Integer(copy=False, groups=fields.NO_ACCESS)
totp_enabled = fields.Boolean(string="Two-factor authentication", compute='_compute_totp_enabled', search='_totp_enable_search')
totp_trusted_device_ids = fields.One2many('auth_totp.device', 'user_id', string="Trusted Devices")
def init(self):
super().init()
if not sql.column_exists(self.env.cr, self._table, "totp_secret"):
self.env.cr.execute("ALTER TABLE res_users ADD COLUMN totp_secret varchar")
@property
def SELF_READABLE_FIELDS(self):
return super().SELF_READABLE_FIELDS + ['totp_enabled', 'totp_trusted_device_ids']
def init(self):
init_res = super().init()
if not sql.column_exists(self.env.cr, self._table, "totp_secret"):
self.env.cr.execute("ALTER TABLE res_users ADD COLUMN totp_secret varchar")
return init_res
def _mfa_type(self):
r = super()._mfa_type()
if r is not None:
@ -62,14 +71,29 @@ class Users(models.Model):
def _get_session_token_fields(self):
return super()._get_session_token_fields() | {'totp_secret'}
def _totp_check(self, code):
sudo = self.sudo()
key = base64.b32decode(sudo.totp_secret)
match = TOTP(key).match(code)
if match is None:
_logger.info("2FA check: FAIL for %s %r", self, sudo.login)
raise AccessDenied(_("Verification failed, please double-check the 6-digit code"))
_logger.info("2FA check: SUCCESS for %s %r", self, sudo.login)
def _check_credentials(self, credentials, env):
if credentials['type'] == 'totp':
self._totp_rate_limit('code_check')
sudo = self.sudo()
key = base64.b32decode(sudo.totp_secret)
match = TOTP(key).match(credentials['token'])
if match is None:
_logger.info("2FA check: FAIL for %s %r", self, sudo.login)
raise AccessDenied(_("Verification failed, please double-check the 6-digit code"))
if sudo.totp_last_counter and match <= sudo.totp_last_counter:
_logger.warning("2FA check: REUSE for %s %r", self, sudo.login)
raise AccessDenied(_("Verification failed, please use the latest 6-digit code"))
sudo.totp_last_counter = match
_logger.info("2FA check: SUCCESS for %s %r", self, sudo.login)
self._totp_rate_limit_purge('code_check')
return {
'uid': self.env.user.id,
'auth_method': 'totp',
'mfa': 'default',
}
return super()._check_credentials(credentials, env)
def _totp_try_setting(self, secret, code):
if self.totp_enabled or self != self.env.user:
@ -83,6 +107,7 @@ class Users(models.Model):
return False
self.sudo().totp_secret = secret
self.sudo().totp_last_counter = match
if request:
self.env.flush_all()
# update session token so the user does not get logged out (cache cleared by change)
@ -92,6 +117,40 @@ class Users(models.Model):
_logger.info("2FA enable: SUCCESS for %s %r", self, self.login)
return True
def _totp_rate_limit(self, limit_type):
self.ensure_one()
assert request, "A request is required to be able to rate limit TOTP related actions"
limit, interval = TOTP_RATE_LIMITS[limit_type]
RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
ip = request.httprequest.environ['REMOTE_ADDR']
domain = [
('user_id', '=', self.id),
('create_date', '>=', datetime.now() - timedelta(seconds=interval)),
('limit_type', '=', limit_type),
]
count = RateLimitLog.search_count(domain)
if count >= limit:
descriptions = {
'send_email': _('You reached the limit of authentication mails sent for your account, please try again later.'),
'code_check': _('You reached the limit of code verifications for your account, please try again later.'),
}
description = descriptions[limit_type]
raise AccessDenied(description)
RateLimitLog.create({
'user_id': self.id,
'ip': ip,
'limit_type': limit_type,
})
def _totp_rate_limit_purge(self, limit_type):
self.ensure_one()
assert request, "A request is required to be able to rate limit TOTP related actions"
RateLimitLog = self.env['auth.totp.rate.limit.log'].sudo()
RateLimitLog.search([
('user_id', '=', self.id),
('limit_type', '=', limit_type),
]).unlink()
@check_identity
def action_totp_disable(self):
logins = ', '.join(map(repr, self.mapped('login')))
@ -142,7 +201,7 @@ class Users(models.Model):
'name': _("Two-Factor Authentication Activation"),
'res_id': w.id,
'views': [(False, 'form')],
'context': self.env.context,
'context': self.env.context | {'dialog_size': 'medium'},
}
@check_identity
@ -158,16 +217,20 @@ class Users(models.Model):
return super().change_password(old_passwd, new_passwd)
def _compute_totp_secret(self):
for user in self.filtered('id'):
for user in self:
if not user.id:
user.totp_secret = user._origin.totp_secret
continue
self.env.cr.execute('SELECT totp_secret FROM res_users WHERE id=%s', (user.id,))
user.totp_secret = self.env.cr.fetchone()[0]
def _inverse_totp_secret(self):
for user in self.filtered('id'):
def _inverse_token(self):
self.sudo().totp_last_counter = False
for user in self:
secret = user.totp_secret if user.totp_secret else None
self.env.cr.execute('UPDATE res_users SET totp_secret = %s WHERE id=%s', (secret, user.id))
def _search_totp_enable(self, operator, value):
def _totp_enable_search(self, operator, value):
value = not value if operator == '!=' else value
if value:
self.env.cr.execute("SELECT id FROM res_users WHERE totp_secret IS NOT NULL")