# Part of Odoo. See LICENSE file for full copyright and licensing details. import re from collections import defaultdict from pytz import timezone, UTC, utc from datetime import datetime, time, timedelta, date from random import choice from string import digits from dateutil.relativedelta import relativedelta from markupsafe import Markup from odoo import api, fields, models, _, tools from odoo.fields import Domain from odoo.exceptions import ValidationError, AccessError, RedirectWarning, UserError from odoo.tools import convert, format_time, email_normalize, SQL, Query from odoo.tools.intervals import Intervals from odoo.addons.hr.models.hr_version import format_date_abbr from odoo.addons.mail.tools.discuss import Store from odoo.tools.float_utils import float_is_zero # This sentinel object, when in the context, provides read access to the # model 'hr.employee' in certain situations, like when setting a many2many # field for users that don't have access to `hr.employee`. _ALLOW_READ_HR_EMPLOYEE = object() class HrEmployee(models.Model): """ NB: Any field only available on the model hr.employee (i.e. not on the hr.employee.public model) should have `groups="hr.group_hr_user"` on its definition to avoid being prefetched when the user hasn't access to the hr.employee model. Indeed, the prefetch loads the data for all the fields that are available according to the group defined on them. """ _name = 'hr.employee' _description = "Employee" _order = 'name' _inherit = ['mail.thread.main.attachment', 'mail.activity.mixin', 'resource.mixin', 'avatar.mixin'] _mail_post_access = 'read' _primary_email = 'work_email' _inherits = {'hr.version': 'version_id'} # versions version_id = fields.Many2one( 'hr.version', compute='_compute_version_id', search='_search_version_id', ondelete='cascade', required=True, store=False, compute_sudo=True, groups="hr.group_hr_user") current_version_id = fields.Many2one( 'hr.version', compute='_compute_current_version_id', store=True, bypass_search_access=True, ) current_date_version = fields.Date( related="current_version_id.date_version", string="Current Date Version", groups="hr.group_hr_user" ) version_ids = fields.One2many( 'hr.version', 'employee_id', string='Employee Versions', groups="hr.group_hr_user", required=True ) versions_count = fields.Integer(compute='_compute_versions_count', groups="hr.group_hr_user") @api.model def _lang_get(self): return self.env['res.lang'].get_installed() # resource and user # required on the resource, make sure required="True" set in the view name = fields.Char(string="Employee Name", related='resource_id.name', store=True, readonly=False, tracking=True) resource_id = fields.Many2one('resource.resource', required=True) # required because the mixin already creates it so it is not related to the version_id resource_calendar_id = fields.Many2one(related='version_id.resource_calendar_id', inherited=True, index=False, store=False, check_company=True) user_id = fields.Many2one( 'res.users', 'User', related='resource_id.user_id', store=True, readonly=False, check_company=True, precompute=True, index='btree_not_null', ondelete='restrict') user_partner_id = fields.Many2one(related="user_id.partner_id", related_sudo=False, string="User's partner") share = fields.Boolean(related="user_id.share") phone = fields.Char(related="user_id.phone") im_status = fields.Char(related="user_id.im_status") email = fields.Char(related="user_id.email") hr_presence_state = fields.Selection([ ('present', 'Present'), ('absent', 'Absent'), ('archive', 'Archived'), ('out_of_working_hour', 'Off-Hours')], compute='_compute_presence_state', default='out_of_working_hour') last_activity = fields.Date(compute="_compute_last_activity") last_activity_time = fields.Char(compute="_compute_last_activity") hr_icon_display = fields.Selection([ ('presence_present', 'Present'), ('presence_out_of_working_hour', 'Off-Hours'), ('presence_absent', 'Absent'), ('presence_archive', 'Archived'), ('presence_undetermined', 'Undetermined')], compute='_compute_presence_icon') show_hr_icon_display = fields.Boolean(compute='_compute_presence_icon') newly_hired = fields.Boolean('Newly Hired', compute='_compute_newly_hired', search='_search_newly_hired') active = fields.Boolean('Active', related='resource_id.active', default=True, store=True, readonly=False) company_id = fields.Many2one('res.company', required=True, tracking=True) company_country_id = fields.Many2one('res.country', 'Company Country', related='company_id.country_id', readonly=True, groups="base.group_system,hr.group_hr_user") company_country_code = fields.Char(related='company_country_id.code', depends=['company_country_id'], readonly=True, groups="base.group_system,hr.group_hr_user", string='Company Country Code') work_phone = fields.Char('Work Phone', store=True, readonly=False, tracking=True, compute="_compute_work_contact_details", inverse='_inverse_work_contact_details') mobile_phone = fields.Char('Work Mobile') work_email = fields.Char('Work Email', compute="_compute_work_contact_details", store=True, inverse='_inverse_work_contact_details') work_contact_id = fields.Many2one('res.partner', 'Work Contact', copy=False, index='btree_not_null') # private info legal_name = fields.Char(compute='_compute_legal_name', store=True, readonly=False, groups="hr.group_hr_user") is_user_active = fields.Boolean(related='user_id.active', string="User's active", groups="hr.group_hr_user") private_phone = fields.Char(string="Private Phone", groups="hr.group_hr_user") private_email = fields.Char(string="Private Email", groups="hr.group_hr_user") lang = fields.Selection(selection=_lang_get, string="Lang", groups="hr.group_hr_user") place_of_birth = fields.Char('Place of Birth', groups="hr.group_hr_user", tracking=True) country_of_birth = fields.Many2one('res.country', string="Country of Birth", groups="hr.group_hr_user", tracking=True) birthday = fields.Date('Birthday', groups="hr.group_hr_user", tracking=True) birthday_public_display = fields.Boolean('Show to all employees', groups="hr.group_hr_user", default=False) birthday_public_display_string = fields.Char("Public Date of Birth", compute="_compute_birthday_public_display_string", default="hidden") bank_account_ids = fields.Many2many( 'res.partner.bank', relation='employee_bank_account_rel', column1='employee_id', column2='bank_account_id', domain="[('partner_id', '=', work_contact_id), '|', ('company_id', '=', False), ('company_id', '=', company_id)]", groups="hr.group_hr_user", tracking=True, string='Bank Accounts', help='Employee bank accounts to pay salaries') is_trusted_bank_account = fields.Boolean(compute="_compute_is_trusted_bank_account", groups="hr.group_hr_user") primary_bank_account_id = fields.Many2one('res.partner.bank', compute="_compute_primary_bank_account_id", groups="hr.group_hr_user") has_multiple_bank_accounts = fields.Boolean(compute="_compute_has_multiple_bank_accounts", default=False, groups="hr.group_hr_user") salary_distribution = fields.Json(string="Salary Distribution", compute='_sync_salary_distribution', groups='hr.group_hr_user', store=True, readonly=False) """ { `bank_account_id`: { 'sequence': int, 'amount': float, 'amount_is_percentage': boolean, } } """ permit_no = fields.Char('Work Permit No', groups="hr.group_hr_user", tracking=True) visa_no = fields.Char('Visa No', groups="hr.group_hr_user", tracking=True) visa_expire = fields.Date('Visa Expiration Date', groups="hr.group_hr_user", tracking=True) work_permit_expiration_date = fields.Date('Work Permit Expiration Date', groups="hr.group_hr_user", tracking=True) has_work_permit = fields.Binary(string="Work Permit", groups="hr.group_hr_user") work_permit_scheduled_activity = fields.Boolean(default=False, groups="hr.group_hr_user") work_permit_name = fields.Char('work_permit_name', compute='_compute_work_permit_name', groups="hr.group_hr_user") certificate = fields.Selection(selection='_get_certificate_selection', string='Certificate Level', groups="hr.group_hr_user", tracking=True) study_field = fields.Char("Field of Study", groups="hr.group_hr_user", tracking=True) study_school = fields.Char("School", groups="hr.group_hr_user", tracking=True) emergency_contact = fields.Char(groups="hr.group_hr_user", tracking=True) emergency_phone = fields.Char(groups="hr.group_hr_user", tracking=True) work_location_name = fields.Char("Work Location Name", compute="_compute_work_location_name") work_location_type = fields.Selection([ ("home", "Home"), ("office", "Office"), ("other", "Other")], compute="_compute_work_location_type", tracking=True) # All version fields needing a specific group to be accessible should also have `inherited=True` set on its definition to make sure those fields are linked to `_inherits` on `hr.version` contract_date_start = fields.Date(readonly=False, related="version_id.contract_date_start", inherited=True, groups="hr.group_hr_manager") contract_date_end = fields.Date(readonly=False, related="version_id.contract_date_end", inherited=True, groups="hr.group_hr_manager") trial_date_end = fields.Date(readonly=False, related="version_id.trial_date_end", inherited=True, groups="hr.group_hr_manager") contract_wage = fields.Monetary(related="version_id.contract_wage", inherited=True, groups="hr.group_hr_manager") date_start = fields.Date(related='version_id.date_start', inherited=True, groups="hr.group_hr_manager") date_end = fields.Date(related='version_id.date_end', inherited=True, groups="hr.group_hr_manager") is_current = fields.Boolean(related='version_id.is_current', inherited=True, groups="hr.group_hr_manager") is_past = fields.Boolean(related='version_id.is_past', inherited=True, groups="hr.group_hr_manager") is_future = fields.Boolean(related='version_id.is_future', inherited=True, groups="hr.group_hr_manager") is_in_contract = fields.Boolean(related='version_id.is_in_contract', inherited=True, groups="hr.group_hr_manager") structure_type_id = fields.Many2one(readonly=False, related='version_id.structure_type_id', inherited=True, groups="hr.group_hr_manager") contract_type_id = fields.Many2one(readonly=False, related='version_id.contract_type_id', inherited=True, groups="hr.group_hr_manager") # employee in company parent_id = fields.Many2one('hr.employee', 'Manager', tracking=True, index=True, domain="['|', ('company_id', '=', False), ('company_id', 'in', allowed_company_ids)]") child_ids = fields.One2many('hr.employee', 'parent_id', string='Direct subordinates') coach_id = fields.Many2one( 'hr.employee', 'Coach', compute='_compute_coach', store=True, readonly=False, domain="['|', ('company_id', '=', False), ('company_id', 'in', allowed_company_ids)]", help='Select the "Employee" who is the coach of this employee.\n' 'The "Coach" has no specific rights or responsibilities by default.') category_ids = fields.Many2many( 'hr.employee.category', 'employee_category_rel', 'employee_id', 'category_id', groups="hr.group_hr_user", string='Tags') tz = fields.Selection(tracking=True) # misc color = fields.Integer('Color Index', default=0) barcode = fields.Char(string="Badge ID", help="ID used for employee identification.", groups="hr.group_hr_user", copy=False) pin = fields.Char(string="PIN", groups="hr.group_hr_user", copy=False, help="PIN used to Check In/Out in the Kiosk Mode of the Attendance application (if enabled in Configuration) and to change the cashier in the Point of Sale application.") message_main_attachment_id = fields.Many2one(groups="hr.group_hr_user") id_card = fields.Binary(string="ID Card Copy", groups="hr.group_hr_user") driving_license = fields.Binary(string="Driving License", groups="hr.group_hr_user") private_car_plate = fields.Char(groups="hr.group_hr_user", help="If you have more than one car, just separate the plates by a space.") currency_id = fields.Many2one('res.currency', related='company_id.currency_id', readonly=True, groups="hr.group_hr_user") related_partners_count = fields.Integer(compute="_compute_related_partners_count", groups="hr.group_hr_user") # properties employee_properties = fields.Properties('Properties', definition='company_id.employee_properties_definition', precompute=False, groups="hr.group_hr_user") # mail.activity.mixin activity_ids = fields.One2many(groups="hr.group_hr_user") activity_state = fields.Selection(groups="hr.group_hr_user") activity_user_id = fields.Many2one(groups="hr.group_hr_user") activity_type_id = fields.Many2one(groups="hr.group_hr_user") activity_type_icon = fields.Char(groups="hr.group_hr_user") activity_date_deadline = fields.Date(groups="hr.group_hr_user") my_activity_date_deadline = fields.Date(groups="hr.group_hr_user") activity_summary = fields.Char(groups="hr.group_hr_user") activity_exception_decoration = fields.Selection(groups="hr.group_hr_user") activity_exception_icon = fields.Char(groups="hr.group_hr_user") # mail.thread mixin message_is_follower = fields.Boolean(groups="hr.group_hr_user") message_follower_ids = fields.One2many(groups="hr.group_hr_user") message_partner_ids = fields.Many2many(groups="hr.group_hr_user") message_ids = fields.One2many(groups="hr.group_hr_user") has_message = fields.Boolean(groups="hr.group_hr_user") message_needaction = fields.Boolean(groups="hr.group_hr_user") message_needaction_counter = fields.Integer(groups="hr.group_hr_user") message_has_error = fields.Boolean(groups="hr.group_hr_user") message_has_error_counter = fields.Integer(groups="hr.group_hr_user") message_attachment_count = fields.Integer(groups="hr.group_hr_user") _barcode_uniq = models.Constraint( 'unique (barcode)', 'The Badge ID must be unique, this one is already assigned to another employee.', ) _user_uniq = models.Constraint( 'unique (user_id, company_id)', 'A user cannot be linked to multiple employees in the same company.', ) def _prepare_create_values(self, vals_list): result = super()._prepare_create_values(vals_list) new_vals_list = [] Version = self.env['hr.version'] version_fields = [fname for fname, field in Version._fields.items() if Version._has_field_access(field, 'write')] for vals in result: employee_vals = {} version_vals = {} for fname, value in vals.items(): employee_field = self._fields.get(fname) if not (employee_field and employee_field.inherited and employee_field.related_field.model_name == 'hr.version'): employee_vals[fname] = value else: version_vals[fname] = value new_vals_list.append({ **employee_vals, **{k: v for k, v in version_vals.items() if k in version_fields}, }) return new_vals_list @api.depends('bank_account_ids.allow_out_payment') def _compute_is_trusted_bank_account(self): for employee in self: employee.is_trusted_bank_account = employee.primary_bank_account_id.allow_out_payment @api.depends('bank_account_ids') def _compute_has_multiple_bank_accounts(self): for employee in self: if employee.bank_account_ids and len(employee.bank_account_ids) > 1: employee.has_multiple_bank_accounts = True else: employee.has_multiple_bank_accounts = False @api.depends('bank_account_ids') def _sync_salary_distribution(self): for employee in self: current_salary_distribution = employee.salary_distribution or {} current_ids = set(map(int, current_salary_distribution.keys())) account_ids = set(employee.bank_account_ids.ids) added_ids = account_ids - current_ids removed_ids = current_ids - account_ids unchanged_ids = account_ids & current_ids # Preserve existing data and order ordered = sorted([ (int(i), data) for i, data in current_salary_distribution.items() if int(i) in unchanged_ids ], key=lambda x: (not x[1].get('amount_is_percentage'), x[1].get('sequence', float('inf')))) new_salary_distribution = {str(i): data for i, data in ordered} # Redistribute removed % to first item removed_percentage = sum(current_salary_distribution[str(i)]['amount'] for i in removed_ids if str(i) in current_salary_distribution and current_salary_distribution[str(i)]['amount_is_percentage']) if removed_percentage and ordered: first_id = str(ordered[0][0]) if new_salary_distribution[first_id]['amount_is_percentage']: new_salary_distribution[first_id]['amount'] += removed_percentage # Add new entries with remaining % total_allocated = sum(d['amount'] for d in new_salary_distribution.values() if d['amount_is_percentage']) remaining = max(0.0, 100.0 - total_allocated) seq = max((d.get('sequence', 0) for d in new_salary_distribution.values()), default=0) amount = employee.currency_id.round(remaining / len(added_ids)) if added_ids else 0.0 for i, new_id in enumerate(added_ids): seq += 1 if i == len(added_ids) - 1: amount = remaining new_salary_distribution[str(new_id)] = { 'amount': amount, 'amount_is_percentage': True, 'sequence': seq, } remaining -= amount employee.salary_distribution = new_salary_distribution @api.constrains('salary_distribution') def _check_salary_distribution(self): for employee in self: dist = employee.salary_distribution if not dist: continue total = 0 check_total = False for ba_values in dist.values(): amount = ba_values.get('amount') is_percentage = ba_values.get('amount_is_percentage', True) if is_percentage and (not isinstance(amount, (float, int)) or not (0 <= amount <= 100)): raise ValidationError(self.env._("Each amount percentage must be a number between 0 and 100.")) if is_percentage: check_total = True total += amount if check_total and not float_is_zero(total - 100.0, precision_digits=4): raise ValidationError(self.env._("Total salary distribution on bank accounts must be exactly 100%.")) @api.model def _create(self, data_list): versions = [vals['stored'].pop('version_id', None) for vals in data_list] result = super()._create(data_list) for (employee, version_id, vals) in zip(result, versions, data_list): version = self.env['hr.version'].browse(version_id) version.employee_id = employee.id version.write({**vals.get('inherited', {})['hr.version'], 'employee_id': employee.id}) return result @api.model @api.deprecated("Override of a deprecated method") def check_field_access_rights(self, operation, field_names): # DISCLAIMER: Dirty hack to avoid having to create a bridge module to override only a # groups on a field which is not prefetched (because not stored) but would crash anyway # if we try to read them directly (very uncommon use case). Don't add your field on this # list if you can specify the group on the field directly (as all the other fields). result = super().check_field_access_rights(operation, field_names) if not self.env.user.has_group("hr.group_hr_user"): result = [field for field in result if field not in ['activity_calendar_event_id', 'rating_ids', 'website_message_ids', 'message_has_sms_error']] return result def _has_field_access(self, field, operation): # DISCLAIMER: Dirty hack to avoid having to create a bridge module to override only a # groups on a field which is not prefetched (because not stored) but would crash anyway # if we try to read them directly (very uncommon use case). Don't add your field on this # list if you can specify the group on the field directly (as all the other fields). return super()._has_field_access(field, operation) and ( self.env.su or self.env.user.has_group("hr.group_hr_user") or field.name not in ('activity_calendar_event_id', 'rating_ids', 'website_message_ids', 'message_has_sms_error') ) def check_no_existing_contract(self, date): if isinstance(date, str): date = fields.Date.from_string(date) if self._is_in_contract(date): raise ValidationError(self.env._("The employee is already in contract on %s. " "Please select a date outside existing contracts", format_date_abbr(self.env, date))) @api.onchange('contract_template_id') def _onchange_contract_template_id(self): if self.contract_template_id: whitelist = self.env['hr.version']._get_whitelist_fields_from_template() for field in self.contract_template_id._fields: if field in whitelist and not self.env['hr.version']._fields[field].related: self[field] = self.contract_template_id[field] @api.onchange('contract_date_start') def _onchange_contract_date_start(self): if not self.contract_date_start: self.contract_date_end = False @api.onchange("private_state_id") def _onchange_private_state_id(self): if self.private_state_id: self.private_country_id = self.private_state_id.country_id @api.onchange('work_phone', 'mobile_phone', 'company_country_id', 'company_id') def _onchange_phone_validation_employee(self): if self.work_phone: self.work_phone = self._phone_format(number=self.work_phone, force_format='INTERNATIONAL') or self.work_phone if self.mobile_phone: self.mobile_phone = self._phone_format(number=self.mobile_phone, force_format='INTERNATIONAL') or self.mobile_phone @api.model def _get_new_hire_field(self): return 'create_date' def _compute_newly_hired(self): new_hire_field = self._get_new_hire_field() new_hire_date = fields.Datetime.now() - timedelta(days=90) for employee in self: if not employee[new_hire_field]: employee.newly_hired = False elif not isinstance(employee[new_hire_field], datetime): employee.newly_hired = employee[new_hire_field] > new_hire_date.date() else: employee.newly_hired = employee[new_hire_field] > new_hire_date @api.depends('resource_calendar_id', 'hr_presence_state') def _compute_presence_icon(self): """ This method compute the state defining the display icon in the kanban view. It can be overriden to add other possibilities, like time off or attendances recordings. """ for employee in self: employee.hr_icon_display = 'presence_' + employee.hr_presence_state employee.show_hr_icon_display = bool(employee.user_id) @api.model def _get_certificate_selection(self): return [ ('graduate', self.env._('Graduate')), ('bachelor', self.env._('Bachelor')), ('master', self.env._('Master')), ('doctor', self.env._('Doctor')), ('other', self.env._('Other')), ] def _get_first_versions(self): self.ensure_one() versions = self.version_ids if self.env.context.get('before_date'): versions = versions.filtered(lambda c: c.date_start <= self.env.context['before_date']) return versions def _get_first_versions_filtered(self, no_gap=True): self.ensure_one() if not self.env.su and not self.env.user.has_group("hr.group_hr_user"): raise AccessError(_("Only HR users can access first version date on an employee.")) def remove_gap(versions): # We do not consider a gap of more than 4 days to be a same occupation # versions are considered to be ordered correctly if not versions: return self.env['hr.version'] if len(versions) == 1: return versions current_version = versions[0] older_versions = versions[1:] current_date = current_version.date_start for i, other_version in enumerate(older_versions): # Consider current_version.date_end being false as an error and cut the loop gap = (current_date - (other_version.date_end or date(2100, 1, 1))).days current_date = other_version.date_start if gap >= 4: return older_versions[0:i] + current_version return older_versions + current_version versions = self._get_first_versions().sorted('date_start', reverse=True) if no_gap: versions = remove_gap(versions) return versions def _get_first_version_date(self, no_gap=True): versions = self._get_first_versions_filtered(no_gap=no_gap) return min(versions.mapped('date_start')) if versions else False def _get_first_contract_date(self, no_gap=True): versions = self._get_first_versions_filtered(no_gap=no_gap).filtered(lambda x: x.contract_date_start) return min(versions.mapped('contract_date_start')) if versions else False @api.depends('name') def _compute_legal_name(self): for employee in self: if not employee.legal_name: employee.legal_name = employee.name @api.depends('current_version_id') @api.depends_context('version_id') def _compute_version_id(self): context_version_id = self.env.context.get('version_id', False) context_version = self.env['hr.version'].browse(context_version_id).exists() if context_version_id else self.env['hr.version'] for employee in self: if context_version.employee_id == self: version = context_version else: version = employee.current_version_id employee.version_id = version @api.depends("version_id.work_location_id.name") def _compute_work_location_name(self): for employee in self: employee.work_location_name = employee.version_id.work_location_id.name or None @api.depends("version_id.work_location_id.location_type") def _compute_work_location_type(self): for employee in self: employee.work_location_type = employee.version_id.work_location_id.location_type or 'other' @api.depends('version_ids.date_version', 'version_ids.active', 'active') def _compute_current_version_id(self): for employee in self: version = self.env['hr.version'].search( [('employee_id', 'in', employee.ids), ('date_version', '<=', fields.Date.today())], order='date_version desc', limit=1, ) new_current_version = False if version: new_current_version = version elif employee.version_ids: new_current_version = employee.version_ids[0] # To not trigger computed properties if still the same version if employee.current_version_id != new_current_version: employee.current_version_id = new_current_version def _cron_update_current_version_id(self): self.search([])._compute_current_version_id() def _search_version_id(self, operator, value): if operator in ('any', 'any!'): return Domain('current_version_id', operator, value) domain = Domain('id', operator, value) return Domain('id', 'in', self.env['hr.version']._search(domain).select('employee_id')) def _field_to_sql(self, alias: str, field_expr: str, query: (Query | None) = None) -> SQL: """This is required to search for the related fields of version_id as version_id is not stored""" if field_expr == 'version_id': field_expr = 'current_version_id' return super()._field_to_sql(alias, field_expr, query) def _get_version(self, date=fields.Date.today()): """ Return the version that should be used for the given date. If no valid version is found, we return the very first version of the employee. """ self.ensure_one() active_versions = self.version_ids.filtered(lambda v: v.active) versions = active_versions.filtered_domain([('date_version', '<=', date)]) return max(versions, key=lambda v: v.date_version) if versions else active_versions[0] def create_version(self, values): self.ensure_one() date = values.get('date_version', False) if not date: raise ValueError("date_version is required") if isinstance(date, str): date = fields.Date.to_date(date) elif isinstance(date, datetime): date = date.date() version_to_copy = self._get_version(date) if not version_to_copy: version_to_copy = self.env['hr.version'].search([('employee_id', '=', self.id)], limit=1) if version_to_copy.date_version == date: return version_to_copy date_from, date_to = self.sudo()._get_contract_dates(date) contract_date_start = values.get('contract_date_start', date_from) contract_date_end = values.get('contract_date_end', date_to) employee_id = values.get('employee_id', self.id) if isinstance(contract_date_start, str): contract_date_start = fields.Date.to_date(contract_date_start) if isinstance(contract_date_end, str): contract_date_end = fields.Date.to_date(contract_date_end) if contract_date_start == date_from and contract_date_end != date_to: versions_sudo_to_sync = self.env['hr.version'].with_context(sync_contract_dates=True).sudo().search([ ('employee_id', '=', employee_id), ('contract_date_start', '=', date_from), ]) if versions_sudo_to_sync: versions_sudo_to_sync.write({ 'contract_date_end': contract_date_end, }) self.check_access('write') version_to_copy.check_access('write') # to be sure even if the user has no access to certain fields, we can still copy the verison without any issues. copy_vals = { 'date_version': date, 'employee_id': employee_id, 'contract_date_start': contract_date_start, 'contract_date_end': contract_date_end, } if 'active' in values: copy_vals['active'] = values['active'] if calendar_id := values.get('resource_calendar_id'): copy_vals['resource_calendar_id'] = calendar_id # apply the changes on the new versions. new_version_vals = { field_name: field_value for field_name, field_value in values.items() if field_name not in copy_vals } version_fields = self.env['hr.version']._fields copy_vals = { k: v for k, v in version_to_copy.sudo().copy_data()[0].items() if not (k in new_version_vals and version_fields[k].type in ['one2many', 'many2many']) } | copy_vals new_version = self.env['hr.version'].sudo().create(copy_vals).sudo(False) with self.env.protecting([f for f_name, f in version_fields.items() if f_name not in new_version_vals and f.copy], new_version): properties_fields_vals = { field_name: field_value for field_name, field_value in copy_vals.items() if version_fields[field_name].type == 'properties' and field_name not in new_version_vals } if properties_fields_vals: # make sure properties vals are correctly copied. new_version.sudo().write(properties_fields_vals) new_version.write(new_version_vals) return new_version def create_contract(self, date): # Here we can assume that there is no existing contract on the date given self.ensure_one() if date and isinstance(date, str): date = fields.Date.to_date(date) contracts = self._get_contract_versions(date)[self.id] future_contract_dates = [d for d in list(contracts.keys()) if d > date] new_contract_date_end = min(future_contract_dates) + relativedelta(days=-1) if future_contract_dates else False # There is already a version but with no contract defined on it so we simply write on it the dates if version_same_date := self.version_ids.filtered(lambda v: v.date_version == date): version_same_date.write({ 'contract_date_start': date, 'contract_date_end': new_contract_date_end }) return version_same_date return self.create_version({ 'date_version': date, 'contract_date_start': date, 'contract_date_end': new_contract_date_end }) def _is_in_contract(self, date): return self._get_contract_dates(date) != (False, False) def _get_contracts(self, date_start=None, date_end=None, use_latest_version=True, domain=None): """ Retrieve the contracts for employees within a specified date range and based on specified criteria, such as domain filtering and version selection. This method is used to collect and organize employee contracts based on their versions, date ranges, and other specified options. The resulting contracts are grouped by employee, and their selection logic depends on whether the latest version should be used or not. It supports flexibility in contract retrieval by allowing optional filters for date range and domain. Args: date_start (Optional[datetime.date]): The start date to filter the contracts by. If provided, only contract versions <= this date are considered based on the selection logic. date_end (Optional[datetime.date]): The end date to filter the contracts by. Only contract versions within the range will be retrieved. Defaults to None if not specified. domain (Optional[dict]): A dictionary representing additional filters or constraints to apply to the contract versions retrieved. Defaults to None. use_latest_version (bool): Indicates whether to retrieve the version effective at the end of the contract (or before the date_end) for each employee (True) or at the start of the contract (before the date_start) (False). Defaults to True. Returns: collections.defaultdict: A dictionary mapping each employee's identifier (employee.id) to a set of their corresponding contracts. Each set contains version records retrieved and filtered based on the specified criteria. """ contract_versions_by_employee = self._get_contract_versions(date_start, date_end, domain) contracts_by_employee = defaultdict(lambda: self.env["hr.version"]) for employee_id in contract_versions_by_employee: for contract_versions in contract_versions_by_employee[employee_id].values(): effective_date = date_end if use_latest_version else date_start if use_latest_version: if effective_date: correct_versions = contract_versions.filtered(lambda v: v.date_version <= effective_date) contracts_by_employee[employee_id] |= correct_versions[-1] if correct_versions else contract_versions[0] else: contracts_by_employee[employee_id] |= contract_versions[-1] if use_latest_version else contract_versions[0] return contracts_by_employee def _get_contract_versions(self, date_start=None, date_end=None, domain=None): """ Retrieves contract versions for employees within the specified date range and domain. The function constructs a dynamic domain to filter contracts based on the provided arguments and retrieves grouped results. The grouping ensures organization by employee and date, and the results are stored in a structured format for ease of use. Args: date_start (datetime.date | None): The start date for filtering contracts. date_end (datetime.date | None): The end date for filtering contracts. domain (list | None): Additional domain constraints for filtering. Returns: dict: A dictionary where keys are employee IDs and values are lists of contract version records organized by contract date start and date range. """ version_domain = Domain('contract_date_start', '!=', False) if self.ids: version_domain &= Domain('employee_id', 'in', self.ids) elif not any(self._ids): # onchange version_domain &= Domain('employee_id', 'in', self._origin.ids) if date_start: version_domain &= Domain('contract_date_end', '=', False) | Domain('contract_date_end', '>=', date_start) if date_end: version_domain &= Domain('contract_date_start', '<=', date_end) if domain: version_domain &= domain all_versions = self.env['hr.version']._read_group( domain=version_domain, groupby=['employee_id', 'date_version:day'], aggregates=['id:recordset'], ) contract_versions_by_employee = defaultdict(lambda: defaultdict(lambda: self.env["hr.version"])) for employee, _date_version, version in all_versions: first_version = next(iter(version), version) contract_versions_by_employee[employee.id][first_version.contract_date_start] |= version return contract_versions_by_employee def _get_all_contract_dates(self): """ Return a list of intervals (date_from, date_to) where the employee is in contract. For a permanent contract, the interval is (date_from, False). """ self.ensure_one() return self.env['hr.version']._read_group( [('employee_id', '=', self.id), ('contract_date_start', '!=', False)], ['contract_date_start:day', 'contract_date_end:day']) def _get_contract_dates(self, date): """ Return a tuple (date_from, date_to) of the contract at the date given. (False, False) if the employee is not in contract at that date. """ self.ensure_one() for date_from, date_to in self._get_all_contract_dates(): if date_from <= date and (date_to is False or date_to >= date): return date_from, date_to return False, False def _compute_versions_count(self): version_count_per_employee = dict( self.env['hr.version']._read_group( [('employee_id', 'in', self.ids)], ['employee_id'], ['id:count'], ), ) for employee in self: employee.versions_count = version_count_per_employee.get(employee, 0) def _search_newly_hired(self, operator, value): if operator not in ('in', 'not in'): return NotImplemented new_hire_field = self._get_new_hire_field() new_hires = self.env['hr.employee'].sudo().search([ (new_hire_field, '>', fields.Datetime.now() - timedelta(days=90)) ]) return [('id', operator, new_hires.ids)] def _create_work_contacts(self): if any(employee.work_contact_id for employee in self): raise UserError(_('Some employee already have a work contact')) work_contacts = self.env['res.partner'].create([{ 'email': employee.work_email, 'phone': employee.work_phone, 'name': employee.name, 'image_1920': employee.image_1920, 'company_id': employee.company_id.id } for employee in self]) for employee, work_contact in zip(self, work_contacts): employee.work_contact_id = work_contact @api.depends('parent_id') def _compute_coach(self): for version in self: manager = version.parent_id previous_manager = version._origin.parent_id if manager and (version.coach_id == previous_manager or not version.coach_id): version.coach_id = manager elif not version.coach_id: version.coach_id = False @api.depends('work_contact_id', 'work_contact_id.phone', 'work_contact_id.email') def _compute_work_contact_details(self): for employee in self: if employee.work_contact_id: if len(employee.work_contact_id.employee_ids) <= 1: employee.work_phone = employee.work_contact_id.phone employee.work_email = employee.work_contact_id.email def _inverse_work_contact_details(self): employees_without_work_contact = self.env['hr.employee'] for employee in self: if not employee.work_contact_id: employees_without_work_contact += employee else: if len(employee.work_contact_id.employee_ids) <= 1: employee.work_contact_id.sudo().write({ 'email': employee.work_email, 'phone': employee.work_phone, }) if employees_without_work_contact: employees_without_work_contact.sudo()._create_work_contacts() @api.model def _get_employee_working_now(self): """ Sudo needed to get resource_calendar_id as its normally only accessible by hr_users on version model (accessible on employee by inherits).""" working_now = [] # We loop over all the employee tz and the resource calendar_id to detect working hours in batch. for tz_info, employee_ids in self.filtered('resource_calendar_id').grouped('tz').items(): calendar_by_employee = employee_ids.grouped('resource_calendar_id') tz = timezone(tz_info or 'UTC') from_datetime = utc.localize(fields.Datetime.now()).astimezone(tz) to_datetime = from_datetime + timedelta(hours=1) for calendar_id, res_employee_ids in calendar_by_employee.items(): # Getting work interval of the first is working. Functions called on resource_calendar_id # are waiting for singleton work_interval = calendar_id._work_intervals_batch(from_datetime, to_datetime)[False] # Employee that is not supposed to work have empty items. if len(work_interval._items) > 0: # The employees should be working now according to their work schedule working_now += res_employee_ids.ids return working_now @api.depends('user_id.im_status') def _compute_presence_state(self): """ This method is overritten in several other modules which add additional presence criterions. e.g. hr_attendance, hr_holidays """ # sudo: res.users - can access presence of accessible user employee_to_check_working = self.filtered( lambda e: (e.user_id.sudo().presence_ids.status or "offline") == "offline" ) working_now_list = employee_to_check_working._get_employee_working_now() for employee in self: state = 'out_of_working_hour' if employee.company_id.sudo().hr_presence_control_login: # sudo: res.users - can access presence of accessible user presence_status = employee.user_id.sudo().presence_ids.status or "offline" if presence_status == "online": state = 'present' elif presence_status == "offline" and employee.id in working_now_list: state = 'absent' if not employee.active: state = 'archive' employee.hr_presence_state = state @api.depends('user_id') def _compute_last_activity(self): for employee in self: tz = employee.tz # sudo: res.users - can access presence of accessible user if last_presence := employee.user_id.sudo().presence_ids.last_presence: last_activity_datetime = last_presence.replace(tzinfo=UTC).astimezone(timezone(tz)).replace(tzinfo=None) employee.last_activity = last_activity_datetime.date() if employee.last_activity == fields.Date.today(): employee.last_activity_time = format_time(self.env, last_presence, time_format='short') else: employee.last_activity_time = False else: employee.last_activity = False employee.last_activity_time = False @api.depends('name', 'user_id.avatar_1920', 'image_1920') def _compute_avatar_1920(self): super()._compute_avatar_1920() @api.depends('name', 'user_id.avatar_1024', 'image_1024') def _compute_avatar_1024(self): super()._compute_avatar_1024() @api.depends('name', 'user_id.avatar_512', 'image_512') def _compute_avatar_512(self): super()._compute_avatar_512() @api.depends('name', 'user_id.avatar_256', 'image_256') def _compute_avatar_256(self): super()._compute_avatar_256() @api.depends('name', 'user_id.avatar_128', 'image_128') def _compute_avatar_128(self): super()._compute_avatar_128() def _compute_avatar(self, avatar_field, image_field): employee_wo_user_and_image = self.env['hr.employee'] for employee in self: if not employee.user_id and not employee._origin[image_field]: employee_wo_user_and_image += employee continue avatar = employee._origin[image_field] if not avatar and employee.user_id: avatar = employee.user_id.sudo()[avatar_field] employee[avatar_field] = avatar super(HrEmployee, employee_wo_user_and_image)._compute_avatar(avatar_field, image_field) @api.depends('birthday_public_display') def _compute_birthday_public_display_string(self): for employee in self: if employee.birthday and employee.birthday_public_display: employee.birthday_public_display_string = datetime.strftime(employee.birthday, "%d %B") else: employee.birthday_public_display_string = "hidden" @api.depends('name', 'permit_no') def _compute_work_permit_name(self): for employee in self: name = employee.name.replace(' ', '_') + '_' if employee.name else '' permit_no = '_' + employee.permit_no if employee.permit_no else '' employee.work_permit_name = "%swork_permit%s" % (name, permit_no) def _get_partner_count_depends(self): return ['user_id'] @api.depends(lambda self: self._get_partner_count_depends()) def _compute_related_partners_count(self): self.related_partners_count = len(self._get_related_partners()) def _get_related_partners(self): return self.work_contact_id | self.user_id.partner_id def action_related_contacts(self): related_partners = self._get_related_partners() action = { 'name': _("Related Contacts"), 'type': 'ir.actions.act_window', 'res_model': 'res.partner', 'view_mode': 'form', } if len(related_partners) > 1: action['view_mode'] = 'kanban,list,form' action['domain'] = [('id', 'in', related_partners.ids)] return action else: action['res_id'] = related_partners.id return action def action_create_user(self): self.ensure_one() if self.user_id: raise ValidationError(_("This employee already has an user.")) return { 'name': _('Create User'), 'type': 'ir.actions.act_window', 'res_model': 'res.users', 'view_mode': 'form', 'view_id': self.env.ref('hr.view_users_simple_form').id, 'target': 'new', 'context': { **self.env.context, 'default_create_employee_id': self.id, 'default_name': self.name, 'default_phone': self.work_phone, 'default_mobile': self.mobile_phone, 'default_login': self.work_email, 'default_partner_id': self.work_contact_id.id, }, } def action_create_users_confirmation(self): raise RedirectWarning( message=_("You're about to invite new users. %s users will be created with the default user template's rights. " "Adding new users may increase your subscription cost. Do you wish to continue?", len(self.ids)), action=self.env.ref('hr.action_hr_employee_create_users').id, button_text=_('Confirm'), additional_context={ 'selected_ids': self.ids, }, ) def action_create_users(self): def _get_user_creation_notification_action(message, message_type, next_action): return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': self.env._("User Creation Notification"), 'type': message_type, 'message': message, 'next': next_action } } employee_emails = [ normalized_email for employee in self for normalized_email in tools.mail.email_normalize_all(employee.work_email) ] conflicting_users = self.env['res.users'] if employee_emails: conflicting_users = self.env['res.users'].search([ '|', ('email_normalized', 'in', employee_emails), ('login', 'in', employee_emails), ]) emp_by_email = self.grouped(lambda employee: email_normalize(employee.work_email)) duplicate_emails = [email for email, employees in emp_by_email.items() if email and len(employees) > 1] old_users = [] new_users = [] users_without_emails = [] users_with_invalid_emails = [] users_with_existing_email = [] employees_with_duplicate_email = [] for employee in self: normalized_email = email_normalize(employee.work_email) if employee.user_id: old_users.append(employee.name) continue if not employee.work_email: users_without_emails.append(employee.name) continue if not normalized_email: users_with_invalid_emails.append(employee.name) continue if normalized_email in conflicting_users.mapped('email_normalized'): users_with_existing_email.append(employee.name) continue if normalized_email in duplicate_emails: employees_with_duplicate_email.append(employee.name) continue new_users.append({ 'create_employee_id': employee.id, 'name': employee.name, 'phone': employee.work_phone, 'login': normalized_email, 'partner_id': employee.work_contact_id.id, }) next_action = {'type': 'ir.actions.act_window_close'} if new_users: self.env['res.users'].create(new_users) message = _('Users %s creation successful', ', '.join([user['name'] for user in new_users])) next_action = _get_user_creation_notification_action(message, 'success', { "type": "ir.actions.client", "tag": "soft_reload", "params": {"next": next_action}, }) if old_users: message = _('User already exists for Those Employees %s', ', '.join(old_users)) next_action = _get_user_creation_notification_action(message, 'warning', next_action) if users_without_emails: message = _("You need to set the work email address for %s", ', '.join(users_without_emails)) next_action = _get_user_creation_notification_action(message, 'danger', next_action) if users_with_invalid_emails: message = _("You need to set a valid work email address for %s", ', '.join(users_with_invalid_emails)) next_action = _get_user_creation_notification_action(message, 'danger', next_action) if users_with_existing_email: message = _('User already exists with the same email for Employees %s', ', '.join(users_with_existing_email)) next_action = _get_user_creation_notification_action(message, 'warning', next_action) if employees_with_duplicate_email: message = _('The following employees have the same work email address: %s', ', '.join(employees_with_duplicate_email)) next_action = _get_user_creation_notification_action(message, 'warning', next_action) return next_action def _compute_display_name(self): if self.browse().has_access('read'): return super()._compute_display_name() for employee_private, employee_public in zip(self, self.env['hr.employee.public'].browse(self.ids)): employee_private.display_name = employee_public.display_name @api.model def search_fetch(self, domain, field_names=None, offset=0, limit=None, order=None): if self.browse().has_access('read'): return super().search_fetch(domain, field_names, offset, limit, order) # HACK: retrieve publicly available values from hr.employee.public and # copy them to the cache of self; non-public data will be missing from # cache, and interpreted as an access error if field_names is None: field_names = [field.name for field in self._determine_fields_to_fetch()] field_names = [f_name for f_name in field_names if f_name != 'current_version_id'] self._check_private_fields(field_names) self.flush_model(field_names) public = self.env['hr.employee.public'].search_fetch(domain, field_names, offset, limit, order) employees = self.browse(public._ids) employees._copy_cache_from(public, field_names) return employees def fetch(self, field_names=None): if self.browse().has_access('read'): return super().fetch(field_names) # HACK: retrieve publicly available values from hr.employee.public and # copy them to the cache of self; non-public data will be missing from # cache, and interpreted as an access error if field_names is None: field_names = [field.name for field in self._determine_fields_to_fetch()] field_names = [f_name for f_name in field_names if f_name != 'current_version_id'] self._check_private_fields(field_names) self.flush_recordset(field_names) public = self.env['hr.employee.public'].browse(self._ids) public.fetch(field_names) # make sure all related fields from employee are in cache for field_name in field_names: public_field = self.env['hr.employee.public']._fields[field_name] private_field = self.env['hr.employee']._fields[field_name] if (public_field.related and public_field.related_field.model_name == 'hr.employee' or private_field.inherited and private_field.inherited_field.model_name == 'hr.version'): public.mapped(field_name) self._copy_cache_from(public, field_names) def _check_access(self, operation): # This method override provides read access to 'hr.employee' in some # situations, like setting a many2many field to comodel 'hr.employee'. # Since Odoo 19, one must have read access to the comodel to modify the # relation. if operation == 'read' and self.env.context.get('_allow_read_hr_employee') is _ALLOW_READ_HR_EMPLOYEE: return None return super()._check_access(operation) def _check_private_fields(self, field_names): """ Check whether ``field_names`` contain private fields. """ public_fields = self.env['hr.employee.public']._fields private_fields = [fname for fname in field_names if fname not in public_fields] if private_fields: raise AccessError(_('The fields ā€œ%sā€, which you are trying to read, are not available for employee public profiles.', ','.join(private_fields))) def _copy_cache_from(self, public, field_names): # HACK: retrieve publicly available values from hr.employee.public and # copy them to the cache of self; non-public data will be missing from # cache, and interpreted as an access error for fname in field_names: values = self.env.cache.get_values(public, public._fields[fname]) if self._fields[fname].translate: values = [(value.copy() if value else None) for value in values] self.env.cache.update_raw(self, self._fields[fname], values) @api.model def notify_expiring_contract_work_permit(self): companies = self.env['res.company'].search([]) employees_contract_expiring = self.env['hr.employee'] employees_work_permit_expiring = self.env['hr.employee'] for company in companies: employees_contract_expiring += self.env['hr.employee'].search([ ('company_id', '=', company.id), ('contract_date_start', '!=', False), ('contract_date_start', '<', fields.Date.today()), ('contract_date_end', '=', fields.Date.today() + relativedelta(days=company.contract_expiration_notice_period)), ]) employees_work_permit_expiring += self.env['hr.employee'].search([ ('company_id', '=', company.id), ('work_permit_expiration_date', '!=', False), ('work_permit_expiration_date', '=', fields.Date.today() + relativedelta(days=company.work_permit_expiration_notice_period)), ]) for employee in employees_contract_expiring: employee.with_context(mail_activity_quick_update=True).activity_schedule( 'mail.mail_activity_data_todo', employee.contract_date_end, _("The contract of %s is about to expire.", employee.name), user_id=employee.hr_responsible_id.id or self.env.uid) for employee in employees_work_permit_expiring: employee.with_context(mail_activity_quick_update=True).activity_schedule( 'mail.mail_activity_data_todo', employee.work_permit_expiration_date, _("The work permit of %s is about to expire.", employee.name), user_id=employee.hr_responsible_id.id or self.env.uid) return True @api.model def get_view(self, view_id=None, view_type='form', **options): if self.browse().has_access('read'): return super().get_view(view_id, view_type, **options) return self.env['hr.employee.public'].get_view(view_id, view_type, **options) @api.model def get_views(self, views, options=None): if self.browse().has_access('read'): return super().get_views(views, options) # returning public employee data would cause a traceback when building # the private employee xml view raise RedirectWarning( message=_( """You are not allowed to access "Employee" (hr.employee) records. We can redirect you to the public employee list.""" ), action=self.env.ref('hr.hr_employee_public_action').id, button_text=_("Employees profile"), ) @api.model def _search(self, domain, offset=0, limit=None, order=None, *, bypass_access=False, **kwargs): """ We override the _search because it is the method that checks the access rights This is correct to override the _search. That way we enforce the fact that calling search on an hr.employee returns a hr.employee recordset, even if you don't have access to this model, as the result of _search (the ids of the public employees) is to be browsed on the hr.employee model. This can be trusted as the ids of the public employees exactly match the ids of the related hr.employee. """ if self.browse().has_access('read') or bypass_access: return super()._search(domain, offset, limit, order, bypass_access=bypass_access, **kwargs) domain = Domain(domain) # HACK Some fields are inherited from the `current_version_id` and may have been already # optimized, showing current_version_id in the domain, but public employee does not have # that field and may have fields directly on the model, just change the condition to `id` in # that case. domain = domain.map_conditions(lambda cond: Domain('id', cond.operator, cond.value) if cond.field_expr == 'current_version_id' else cond) try: ids = self.env['hr.employee.public']._search(domain, offset, limit, order, **kwargs) except ValueError as e: raise AccessError(self.env._('You do not have access to this document.')) from e # the result is expected from this table, so we should link tables return super(HrEmployee, self.sudo())._search([('id', 'in', ids)], order=order) def _load_demo_data(self): dep_rd = self.env.ref('hr.dep_rd', raise_if_not_found=False) action_reload = { 'type': 'ir.actions.client', 'tag': 'reload', } if dep_rd: return action_reload convert.convert_file(env=self.sudo().env, module='hr', filename='data/scenarios/hr_scenario.xml', idref=None, mode='init') if 'resume_line_ids' in self: convert.convert_file(env=self.env, module='hr_skills', filename='data/scenarios/hr_skills_scenario.xml', idref=None, mode='init') return action_reload def get_formview_id(self, access_uid=None): """ Override this method in order to redirect many2one towards the right model depending on access_uid """ user = self.env.user if access_uid: user = self.env['res.users'].browse(access_uid).sudo() if user.has_group('hr.group_hr_user'): return super().get_formview_id(access_uid=access_uid) # Hardcode the form view for public employee return self.env.ref('hr.hr_employee_public_view_form').id def get_formview_action(self, access_uid=None): """ Override this method in order to redirect many2one towards the right model depending on access_uid """ res = super().get_formview_action(access_uid=access_uid) user = self.env.user if access_uid: user = self.env['res.users'].browse(access_uid).sudo() if not user.has_group('hr.group_hr_user'): res['res_model'] = 'hr.employee.public' return res @api.constrains('pin') def _verify_pin(self): for employee in self: if employee.pin and not employee.pin.isdigit(): raise ValidationError(_("The PIN must be a sequence of digits.")) @api.constrains('barcode') def _verify_barcode(self): for employee in self: if employee.barcode: if not (re.match(r'^[A-Za-z0-9]+$', employee.barcode) and len(employee.barcode) <= 18): raise ValidationError(_("The Badge ID must be alphanumeric without any accents and no longer than 18 characters.")) @api.onchange('user_id') def _onchange_user(self): self.update(self._sync_user(self.user_id, (bool(self.image_1920)))) if not self.name: self.name = self.user_id.name @api.onchange('resource_calendar_id') def _onchange_timezone(self): if self.resource_calendar_id and not self.tz: self.tz = self.resource_calendar_id.tz def _remove_work_contact_id(self, user, employee_company): """ Remove work_contact_id for previous employee if the user is assigned to a new employee """ employee_company = employee_company or self.company_id.id # For employees with a user_id, the constraint (user can't be linked to multiple employees) is triggered old_partner_employee_ids = user.partner_id.employee_ids.filtered(lambda e: not e.user_id and e.company_id.id == employee_company and e != self ) old_partner_employee_ids.work_contact_id = None def _sync_user(self, user, employee_has_image=False): vals = dict( work_contact_id=user.partner_id.id if user else self.work_contact_id.id, user_id=user.id, ) if not employee_has_image: vals['image_1920'] = user.image_1920 if user.tz: vals['tz'] = user.tz return vals def _prepare_resource_values(self, vals, tz): resource_vals = super()._prepare_resource_values(vals, tz) vals.pop('name') # Already considered by super call but no popped # We need to pop it to avoid useless resource update (& write) call # on every newly created resource (with the correct name already) user_id = vals.pop('user_id', None) if user_id: resource_vals['user_id'] = user_id active_status = vals.get('active') if active_status is not None: resource_vals['active'] = active_status return resource_vals @api.model def new(self, values=None, origin=None, ref=None): if not values: values = {} new_vals = values.copy() version_vals = {val: new_vals.pop(val) for val in values if val in self._fields and self._fields[val].inherited} employee = super().new(new_vals, origin, ref) version_vals['employee_id'] = employee self.env['hr.version'].new({ f_name: value for f_name, value in version_vals.items() if self.env['hr.version']._has_field_access(self.env['hr.version']._fields[f_name], 'read') }) return employee @api.model_create_multi def create(self, vals_list): vals_per_company = defaultdict(list) for idx, vals in enumerate(vals_list): if vals.get('user_id'): user = self.env['res.users'].browse(vals['user_id']) vals.update(self._sync_user(user, bool(vals.get('image_1920')))) vals['name'] = vals.get('name', user.name) self._remove_work_contact_id(user, vals.get('company_id')) # Having one create per company is necessary to pass the company in the context to correctly set it in # the underlying version created by the framework vals_per_company[vals.get('company_id', self.env.company)].append((idx, vals)) index_per_employee = {} employees = self.env['hr.employee'] for company, vals_list in vals_per_company.items(): idxs, vals_list = zip(*vals_list) new_employees = super(HrEmployee, self.with_company(company)).create(vals_list) index_per_employee.update(dict(zip(new_employees, idxs))) employees |= new_employees # As we do a custom batch by company, we must reorder the records to respect the original order. employees = employees.sorted(key=lambda employee: index_per_employee[employee]) # Sudo in case HR officer doesn't have the Contact Creation group employees.filtered(lambda e: not e.work_contact_id).sudo()._create_work_contacts() if self.env.context.get('salary_simulation'): return employees for employee_sudo in employees.sudo(): # creating 'svg/xml' attachments requires specific rights if not employee_sudo.image_1920 and self.env['ir.ui.view'].sudo(False).has_access('write'): employee_sudo.image_1920 = employee_sudo._avatar_generate_svg() employee_sudo.work_contact_id.image_1920 = employee_sudo.image_1920 employee_departments = employees.department_id if employee_departments: self.env['discuss.channel'].sudo().search([ ('subscription_department_ids', 'in', employee_departments.ids) ])._subscribe_users_automatically() onboarding_notes_bodies = {} hr_root_menu = self.env.ref('hr.menu_hr_root') for employee in employees: # Launch onboarding plans url = '/odoo/%s/action-hr.plan_wizard_action?active_model=hr.employee&menu_id=%s' % (employee.id, hr_root_menu.id) onboarding_notes_bodies[employee.id] = Markup(_( 'Congratulations! May I recommend you to setup an onboarding plan?', )) % url employees._message_log_batch(onboarding_notes_bodies) employees.invalidate_recordset() return employees def write(self, vals): if 'work_contact_id' in vals: self.message_unsubscribe(self.work_contact_id.ids) if 'user_id' in vals: # Update the profile pictures with user, except if provided user = self.env['res.users'].browse(vals['user_id']) vals.update(self._sync_user(user, (bool(all(emp.image_1920 for emp in self))))) self._remove_work_contact_id(user, vals.get('company_id')) if 'work_permit_expiration_date' in vals: vals['work_permit_scheduled_activity'] = False if vals.get('tz'): users_to_update = self.env['res.users'] for employee in self: if employee.user_id and employee.company_id == employee.user_id.company_id and vals['tz'] != employee.user_id.tz: users_to_update |= employee.user_id if users_to_update: users_to_update.write({'tz': vals['tz']}) if vals.get('department_id') or vals.get('user_id'): department_id = vals['department_id'] if vals.get('department_id') else self[:1].department_id.id # When added to a department or changing user, subscribe to the channels auto-subscribed by department self.env['discuss.channel'].sudo().search([ ('subscription_department_ids', 'in', department_id) ])._subscribe_users_automatically() if vals.get('departure_description'): for employee in self: employee.message_post(body=_( 'Additional Information: \n %(description)s', description=vals.get('departure_description'))) # Only one write call for all the fields from hr.version new_vals = vals.copy() version_vals = {val: new_vals.pop(val) for val in vals if val in self._fields and self._fields[val].inherited} res = super().write(new_vals) if 'work_contact_id' in vals: account_ids = self.bank_account_ids.ids if account_ids: bank_accounts = self.env['res.partner.bank'].sudo().browse(account_ids) for bank_account in bank_accounts: if vals['work_contact_id'] != bank_account.partner_id.id: if bank_account.allow_out_payment: bank_account.allow_out_payment = False if vals['work_contact_id']: bank_account.partner_id = vals['work_contact_id'] if version_vals: version_vals['last_modified_date'] = fields.Datetime.now() version_vals['last_modified_uid'] = self.env.uid self.version_id.write(version_vals) for employee in self: employee._track_set_log_message(Markup("Modified on the Version '%s'") % employee.version_id.display_name) if res and 'resource_calendar_id' in vals: resources_per_calendar_id = defaultdict(lambda: self.env['resource.resource']) for employee in self: if employee.version_id == employee.current_version_id: resources_per_calendar_id[employee.resource_calendar_id.id] += employee.resource_id for calendar_id, resources in resources_per_calendar_id.items(): resources.write({'calendar_id': calendar_id}) return res def unlink(self): resources = self.mapped('resource_id') super().unlink() return resources.unlink() def _get_employee_m2o_to_empty_on_archived_employees(self): return ['parent_id', 'coach_id'] def _get_user_m2o_to_empty_on_archived_employees(self): return [] def action_unarchive(self): res = super().action_unarchive() self.write({ 'departure_reason_id': False, 'departure_description': False, 'departure_date': False }) return res def action_archive(self): archived_employees = self.filtered('active') res = super().action_archive() if archived_employees: # Empty links to this employees (example: manager, coach, time off responsible, ...) employee_fields_to_empty = self._get_employee_m2o_to_empty_on_archived_employees() user_fields_to_empty = self._get_user_m2o_to_empty_on_archived_employees() employee_domain = Domain.OR(Domain(field, 'in', archived_employees.ids) for field in employee_fields_to_empty) user_domain = Domain.OR(Domain(field, 'in', archived_employees.user_id.ids) for field in user_fields_to_empty) employees = self.env['hr.employee'].search(employee_domain | user_domain) for employee in employees: for field in employee_fields_to_empty: if employee[field] in archived_employees: employee[field] = False for field in user_fields_to_empty: if employee[field] in archived_employees.user_id: employee[field] = False if len(archived_employees) == 1 and not self.env.context.get('no_wizard', False): return { 'type': 'ir.actions.act_window', 'name': _('Register Departure'), 'res_model': 'hr.departure.wizard', 'view_mode': 'form', 'target': 'new', 'context': {'active_id': self.id}, 'views': [[False, 'form']] } return res @api.onchange('company_id') def _onchange_company_id(self): if self._origin: return {'warning': { 'title': _("Warning"), 'message': _("To avoid multi company issues (losing the access to your previous contracts, leaves, ...), you should create another employee in the new company instead.") }} def _load_scenario(self): demo_tag = self.env.ref('hr.employee_category_demo', raise_if_not_found=False) if demo_tag: return convert.convert_file(self.env, 'hr', 'data/scenarios/hr_scenario.xml', None, mode='init') # --------------------------------------------------------- # Business Methods # --------------------------------------------------------- def generate_random_barcode(self): for employee in self: employee.barcode = '041'+"".join(choice(digits) for i in range(9)) def _get_tz(self): self.ensure_one() return self.resource_calendar_id.tz or\ self.tz or\ self.company_id.resource_calendar_id.tz or\ 'UTC' def _get_tz_batch(self): # Finds the first valid timezone in his tz, his work hours tz, # the company calendar tz or UTC # Returns a dict {employee_id: tz} return {emp.id: emp._get_tz() for emp in self} def _get_calendar_tz_batch(self, dt=None): """ Return a mapping { employee id : employee's effective schedule's (at dt) timezone } """ employees_by_id = self.grouped('id') if not dt: calendars = self._get_calendars() return { emp_id: calendar.sudo().tz or employees_by_id[emp_id].tz \ for emp_id, calendar in calendars.items() } employees_by_tz = self.grouped(lambda emp: emp._get_tz()) employee_timezones = {} for tz, employee_ids in employees_by_tz.items(): date_at = timezone(tz).localize(dt).date() calendars = self._get_calendars(date_at) employee_timezones |= { emp_id: cal.sudo().tz or employees_by_id[emp_id].tz \ for emp_id, cal in calendars.items() } return employee_timezones def _get_calendars(self, date_from=None): res = super()._get_calendars(date_from=date_from) if not date_from: return res date_from = fields.Date.to_date(date_from) for employee in self: employee_versions_sudo = employee.sudo().version_ids.filtered(lambda v: v._is_in_contract(date_from)) if employee_versions_sudo: res[employee.id] = employee_versions_sudo[0].resource_calendar_id.sudo(False) return res def _get_version_periods(self, start, stop, field=None, check_contract=False): if field and field not in self: raise UserError(self.env._( "This field %(field_name)s doesn't exist on this model (hr.version).", field_name=field )) version_periods_by_employee = defaultdict(list) if check_contract: versions = self._get_versions_with_contract_overlap_with_period(start.date(), stop.date()) else: versions = self.version_ids.filtered_domain([ ('date_start', '<=', stop), '|', ('date_end', '=', False), ('date_end', '>=', start) ]) for version in versions: # if employee is under fully flexible contract, use timezone of the employee calendar_tz = timezone(version.resource_calendar_id.tz) if version.resource_calendar_id else timezone(version.employee_id.resource_id.tz) date_start = datetime.combine(version.date_start, time.min).replace(tzinfo=calendar_tz).astimezone(utc) end_date = version.date_end if end_date: date_end = datetime.combine( end_date + relativedelta(days=1), time.min, ).replace(tzinfo=calendar_tz).astimezone(utc) else: date_end = stop version_periods_by_employee[version.employee_id].append( (max(date_start, start), min(date_end, stop), version[field] if field else version)) return version_periods_by_employee def _get_calendar_periods(self, start, stop, check_contract=True): """ :param datetime start: the start of the period :param datetime stop: the stop of the period """ return self.sudo()._get_version_periods(start, stop, 'resource_calendar_id', check_contract) @api.model def _get_all_versions_with_contract_overlap_with_period(self, date_from, date_to): """ Returns the versions of all employees between date_from and date_to that have at least 1 day in contract during that period """ all_employees = self.search(['|', ('active', '=', True), ('active', '=', False)]) return all_employees._get_versions_with_contract_overlap_with_period(date_from, date_to) def _get_unusual_days(self, date_from, date_to=None): date_from_date = datetime.strptime(date_from, '%Y-%m-%d %H:%M:%S').date() date_to_date = datetime.strptime(date_to, '%Y-%m-%d %H:%M:%S').date() if date_to else None employee_versions = self.env['hr.version'].sudo().search([('employee_id', '=', self.id)]).filtered( lambda v: v._is_overlapping_period(date_from_date, date_to_date)) if not employee_versions: # Checking the calendar directly allows to not grey out the leaves taken # by the employee or fallback to the company calendar return (self.resource_calendar_id or self.env.company.resource_calendar_id)._get_unusual_days( datetime.combine(fields.Date.from_string(date_from), time.min).replace(tzinfo=UTC), datetime.combine(fields.Date.from_string(date_to), time.max).replace(tzinfo=UTC), self.company_id, ) unusual_days = {} for version in employee_versions: tmp_date_from = max(date_from_date, version.date_start) tmp_date_to = min(date_to_date, version.date_end) if version.date_end else date_to_date unusual_days.update(version.resource_calendar_id.sudo(False)._get_unusual_days( datetime.combine(fields.Date.from_string(tmp_date_from), time.min).replace(tzinfo=UTC), datetime.combine(fields.Date.from_string(tmp_date_to), time.max).replace(tzinfo=UTC), self.company_id, )) return unusual_days def _employee_attendance_intervals(self, start, stop, lunch=False): self.ensure_one() if not lunch: return self._get_expected_attendances(start, stop) else: valid_versions = self.sudo()._get_versions_with_contract_overlap_with_period(start.date(), stop.date()) if not valid_versions: calendar = self.resource_calendar_id or self.company_id.resource_calendar_id return calendar._attendance_intervals_batch(start, stop, self.resource_id, lunch=True)[self.resource_id.id] employee_tz = timezone(self.tz) if self.tz else None duration_data = Intervals() for version in valid_versions: version_start = datetime.combine(version.date_start, time.min, employee_tz) version_end = datetime.combine(version.date_end or date.max, time.max, employee_tz) calendar = version.resource_calendar_id or version.company_id.resource_calendar_id lunch_intervals = calendar._attendance_intervals_batch( max(start, version_start), min(stop, version_end), resources=self.resource_id, lunch=True)[self.resource_id.id] duration_data = duration_data | lunch_intervals return duration_data def _get_expected_attendances(self, date_from, date_to): self.ensure_one() valid_versions = self.sudo()._get_versions_with_contract_overlap_with_period(date_from.date(), date_to.date()) employee_tz = timezone(self.tz) if self.tz else None if not valid_versions: calendar = self.resource_calendar_id or self.company_id.resource_calendar_id calendar_intervals = calendar._work_intervals_batch( date_from, date_to, tz=employee_tz, resources=self.resource_id, compute_leaves=True, domain=[('company_id', 'in', [False, self.company_id.id])])[self.resource_id.id] return calendar_intervals duration_data = Intervals() version_prev = datetime.combine(valid_versions[0].date_start, time.min, employee_tz) for version in valid_versions: version_start = datetime.combine(version.date_start, time.min, employee_tz) contract_start = datetime.combine(version.contract_date_start, time.min, employee_tz) version_end = datetime.combine(version.date_end or date.max, time.max, employee_tz) calendar = version.resource_calendar_id or version.company_id.resource_calendar_id start_date = version_start if version_prev < version_start else contract_start version_intervals = calendar._work_intervals_batch( max(date_from, start_date), min(date_to, version_end), tz=employee_tz, resources=self.resource_id, compute_leaves=True, domain=[('company_id', 'in', [False, self.company_id.id]), ('time_type', '=', 'leave')])[self.resource_id.id] duration_data = duration_data | version_intervals return duration_data def _get_calendar_attendances(self, date_from, date_to): self.ensure_one() valid_versions = self.sudo()._get_versions_with_contract_overlap_with_period(date_from.date(), date_to.date()) employee_tz = timezone(self.tz) if self.tz else None if not valid_versions: calendar = self.resource_calendar_id or self.company_id.resource_calendar_id return calendar.with_context(employee_timezone=employee_tz).get_work_duration_data( date_from, date_to, domain=[('company_id', 'in', [False, self.company_id.id])]) duration_data = {'days': 0, 'hours': 0} for version in valid_versions: version_start = datetime.combine(version.date_start, time.min, employee_tz) version_end = datetime.combine(version.date_end or date.max, time.max, employee_tz) calendar = version.resource_calendar_id or version.company_id.resource_calendar_id version_duration_data = calendar\ .with_context(employee_timezone=employee_tz)\ .get_work_duration_data( max(date_from, version_start), min(date_to, version_end), domain=[('company_id', 'in', [False, version.company_id.id])]) duration_data['days'] += version_duration_data['days'] duration_data['hours'] += version_duration_data['hours'] return duration_data @api.model def get_import_templates(self): return [{ 'label': _('Import Template for Employees'), 'template': '/hr/static/xls/hr_employee.xls' }] def _get_age(self, target_date=None): self.ensure_one() if target_date is None: target_date = fields.Date.context_today(self.env.user) return relativedelta(target_date, self.birthday).years if self.birthday else 0 def _get_departure_date(self): # Primarily used in the archive wizard # to pick a good default for the departure date self.ensure_one() if self.date_end and self.date_end < fields.Date.today(): return self.departure_date return False def _get_versions_with_contract_overlap_with_period(self, date_from, date_to): """ Returns the versions of the employee between date_from and date_to that have at least 1 day in contract during that period """ return self.version_ids.filtered_domain([ ('contract_date_start', '!=', False), ('contract_date_start', '<=', date_to), '|', ('contract_date_end', '>=', date_from), ('contract_date_end', '=', False), ]) def get_avatar_card_data(self, fields): return self.read(fields) # --------------------------------------------------------- # Messaging # --------------------------------------------------------- def _phone_get_number_fields(self): return ['mobile_phone'] def _mail_get_partner_fields(self, introspect_fields=False): return ['work_contact_id', 'user_partner_id'] def action_open_versions(self): self.ensure_one() return { 'type': 'ir.actions.act_window', 'name': self.employee_id.name + self.env._(' Records'), 'path': 'versions', 'res_model': 'hr.version', 'view_mode': 'list,graph,pivot', 'views': [(self.env.ref('hr.hr_version_list_view').id, 'list'), (False, 'graph'), (False, 'pivot')], 'domain': [('employee_id', '=', self.employee_id.id)], 'search_view_id': self.env.ref('hr.hr_version_search_view').id } def _get_store_avatar_card_fields(self, target): employee_fields = [ "company_id", Store.One("department_id", ["name"]), "work_email", Store.One("work_location_id", ["location_type", "name"]), "work_phone", ] user = target.get_user(self.env) if user.has_group("hr.group_hr_user"): # job_title is not a field of hr.employee.public, but it is a field of hr.employee employee_fields.append("job_title") # HACK: fetch the employee fields from employees to retrieve hr.employee.public fields if no access to hr.employee if len(self) > 0: self.fetch([ field.field_name if isinstance(field, Store.Attr) else field for field in employee_fields ]) return employee_fields @api.depends('bank_account_ids') def _compute_primary_bank_account_id(self): for employee in self: if employee.bank_account_ids: primary_account = min( employee.bank_account_ids, key=lambda acc: employee.salary_distribution.get(str(acc.id), {}).get("sequence", float("inf")), ) employee.primary_bank_account_id = primary_account else: employee.primary_bank_account_id = False def get_accounts_with_fixed_allocations(self): self.ensure_one() return self.bank_account_ids.filtered( lambda a: not self.salary_distribution.get(str(a.id), {}).get('amount_is_percentage', True) ) def get_bank_account_salary_allocation(self, account_id): ba_info = self.salary_distribution.get(str(account_id), {}) return ba_info.get('amount', 0), ba_info.get('amount_is_percentage') def get_remaining_percentage(self): self.ensure_one() distribution = self.salary_distribution or {} allocated = 0.0 for ba_id, vals in distribution.items(): if vals.get('amount_is_percentage'): allocated += vals.get('amount', 0.0) remaining = 100.0 - allocated return max(0.0, remaining) def action_open_allocation_wizard(self): self.ensure_one() wizard = self.env['hr.bank.account.allocation.wizard'].create({ 'employee_id': self.id, }) return { 'type': 'ir.actions.act_window', 'name': self.env._('Bank Account Allocation'), 'res_model': 'hr.bank.account.allocation.wizard', 'res_id': wizard.id, 'view_mode': 'form', 'target': 'new', } def action_toggle_primary_bank_account_trust(self): self.ensure_one() current_val = self.primary_bank_account_id.allow_out_payment self.primary_bank_account_id.allow_out_payment = not current_val