19.0 vanilla

This commit is contained in:
Ernad Husremovic 2026-03-09 09:31:47 +01:00
parent accf5918df
commit 6e65e8c877
688 changed files with 225434 additions and 199401 deletions

View file

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
from babel.dates import format_date
from collections import defaultdict
from dateutil import relativedelta
from datetime import timedelta, datetime
from functools import partial
@ -9,15 +12,17 @@ from random import randint
from odoo import api, exceptions, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.addons.resource.models.resource import make_aware, Intervals
from odoo.tools.float_utils import float_compare
from odoo.tools.intervals import Intervals
from odoo.tools.date_utils import start_of, end_of, localized, to_timezone
from odoo.tools.float_utils import float_compare, float_is_zero, float_round
from odoo.tools.misc import get_lang
class MrpWorkcenter(models.Model):
_name = 'mrp.workcenter'
_description = 'Work Center'
_order = "sequence, id"
_inherit = ['resource.mixin']
_inherit = ['mail.thread', 'resource.mixin']
_check_company_auto = True
# resource
@ -28,23 +33,21 @@ class MrpWorkcenter(models.Model):
code = fields.Char('Code', copy=False)
note = fields.Html(
'Description')
default_capacity = fields.Float(
'Capacity', default=1.0,
help="Default number of pieces (in product UoM) that can be produced in parallel (at the same time) at this work center. For example: the capacity is 5 and you need to produce 10 units, then the operation time listed on the BOM will be multiplied by two. However, note that both time before and after production will only be counted once.")
sequence = fields.Integer(
'Sequence', default=1, required=True,
help="Gives the sequence order when displaying a list of work centers.")
color = fields.Integer('Color')
currency_id = fields.Many2one('res.currency', 'Currency', related='company_id.currency_id', readonly=True, required=True)
costs_hour = fields.Float(string='Cost per hour', help='Hourly processing cost.', default=0.0)
costs_hour = fields.Float(string='Cost per hour', help='Hourly processing cost.', default=0.0, tracking=True)
time_start = fields.Float('Setup Time')
time_stop = fields.Float('Cleanup Time')
routing_line_ids = fields.One2many('mrp.routing.workcenter', 'workcenter_id', "Routing Lines")
has_routing_lines = fields.Boolean(compute='_compute_has_routing_lines', help='Technical field for workcenter views')
order_ids = fields.One2many('mrp.workorder', 'workcenter_id', "Orders")
workorder_count = fields.Integer('# Work Orders', compute='_compute_workorder_count')
workorder_ready_count = fields.Integer('# Read Work Orders', compute='_compute_workorder_count')
workorder_ready_count = fields.Integer('# To Do Work Orders', compute='_compute_workorder_count')
workorder_progress_count = fields.Integer('Total Running Orders', compute='_compute_workorder_count')
workorder_pending_count = fields.Integer('Total Pending Orders', compute='_compute_workorder_count')
workorder_blocked_count = fields.Integer('Total Pending Orders', compute='_compute_workorder_count')
workorder_late_count = fields.Integer('Total Late Orders', compute='_compute_workorder_count')
time_ids = fields.One2many('mrp.workcenter.productivity', 'workcenter_id', 'Time Logs')
@ -74,6 +77,15 @@ class MrpWorkcenter(models.Model):
tag_ids = fields.Many2many('mrp.workcenter.tag')
capacity_ids = fields.One2many('mrp.workcenter.capacity', 'workcenter_id', string='Product Capacities',
help="Specific number of pieces that can be produced in parallel per product.", copy=True)
kanban_dashboard_graph = fields.Text(compute='_compute_kanban_dashboard_graph')
resource_calendar_id = fields.Many2one(check_company=True)
def _compute_display_name(self):
super()._compute_display_name()
for workcenter in self:
# Show the red icon(workcenter is blocked) only when the Gantt view is accessed from MRP > Planning > Planning by Workcenter.
if self.env.context.get('group_by') and self.env.context.get('show_workcenter_status') and workcenter.working_state == 'blocked':
workcenter.display_name = f"{workcenter.display_name}\u00A0\u00A0🔴"
@api.constrains('alternative_workcenter_ids')
def _check_alternative_workcenter(self):
@ -81,28 +93,99 @@ class MrpWorkcenter(models.Model):
if workcenter in workcenter.alternative_workcenter_ids:
raise ValidationError(_("Workcenter %s cannot be an alternative of itself.", workcenter.name))
@api.depends('order_ids.duration_expected', 'order_ids.workcenter_id', 'order_ids.state', 'order_ids.date_planned_start')
def _compute_kanban_dashboard_graph(self):
week_range, date_start, date_stop = self._get_week_range_and_first_last_days()
load_data = self._get_workcenter_load_per_week(week_range, date_start, date_stop)
load_graph_data = self._prepare_graph_data(load_data, week_range)
for wc in self:
wc.kanban_dashboard_graph = json.dumps(load_graph_data[wc.id])
def _get_week_range_and_first_last_days(self):
""" We calculate the delta between today and the previous monday,
then add it to the delta between monday and the previous first day
of the week as configured in the language settings.
We use the result to calculate the modulo of 7 to make sure that
we do not take the previous first day of the week from 2 weeks ago.
E.g. today is Thursday, the first of a week is a Tuesday.
The delta between today and Monday is 3 days.
The delta between Monday and the previous Tuesday is 6 days.
(3 + 6) % 7 = 2, so from today, the first day of the current week is 2 days ago.
"""
week_range = {}
locale = get_lang(self.env).code
today = datetime.today()
delta_from_monday_to_today = (today - start_of(today, 'week')).days
first_week_day = int(get_lang(self.env).week_start) - 1
day_offset = ((7 - first_week_day) + delta_from_monday_to_today) % 7
for delta in range(-7, 28, 7):
week_start = start_of(today + relativedelta.relativedelta(days=delta - day_offset), 'day')
week_end = week_start + relativedelta.relativedelta(days=6)
short_name = (format_date(week_start, 'd - ', locale=locale)
+ format_date(week_end, 'd MMM', locale=locale))
if not delta:
short_name = _('This Week')
week_range[week_start] = short_name
date_start = start_of(today + relativedelta.relativedelta(days=-7 - day_offset), 'day')
date_stop = end_of(today + relativedelta.relativedelta(days=27 - day_offset), 'day')
return week_range, date_start, date_stop
def _get_workcenter_load_per_week(self, week_range, date_start, date_stop):
load_data = {rec: {} for rec in self}
# demo data
has_workorder = self.env['mrp.workorder'].search_count([('workcenter_id', 'in', self.ids)], limit=1)
if not has_workorder:
for wc in self:
load_limit = 40 # default max load per week is 40 hours on a new workcenter
load_data[wc] = {week_start: randint(0, int(load_limit * 2)) for week_start in week_range}
return load_data
result = self.env['mrp.workorder']._read_group(
[('workcenter_id', 'in', self.ids), ('state', 'in', ('pending', 'waiting', 'ready', 'progress')),
('production_date', '>=', date_start), ('production_date', '<=', date_stop)],
['workcenter_id', 'production_date:week'], ['duration_expected:sum'])
for r in result:
load_in_hours = round(r[2] / 60, 1)
load_data[r[0]].update({r[1]: load_in_hours})
return load_data
def _prepare_graph_data(self, load_data, week_range):
graph_data = {wid: [] for wid in self._ids}
has_workorder = self.env['mrp.workorder'].search_count([('workcenter_id', 'in', self.ids)], limit=1)
for workcenter in self:
load_limit = sum(workcenter.resource_calendar_id.attendance_ids.mapped('duration_hours'))
wc_data = {'is_sample_data': not has_workorder, 'labels': list(week_range.values())}
load_bar = []
excess_bar = []
for week_start in week_range:
load_bar.append(min(load_data[workcenter].get(week_start, 0), load_limit))
excess_bar.append(max(float_round(load_data[workcenter].get(week_start, 0) - load_limit, precision_digits=1, rounding_method='HALF-UP'), 0))
wc_data['values'] = [load_bar, load_limit, excess_bar]
graph_data[workcenter.id].append(wc_data)
return graph_data
@api.depends('order_ids.duration_expected', 'order_ids.workcenter_id', 'order_ids.state', 'order_ids.date_start')
def _compute_workorder_count(self):
MrpWorkorder = self.env['mrp.workorder']
result = {wid: {} for wid in self._ids}
result_duration_expected = {wid: 0 for wid in self._ids}
# Count Late Workorder
data = MrpWorkorder._read_group(
[('workcenter_id', 'in', self.ids), ('state', 'in', ('pending', 'waiting', 'ready')), ('date_planned_start', '<', datetime.now().strftime('%Y-%m-%d'))],
['workcenter_id'], ['workcenter_id'])
count_data = dict((item['workcenter_id'][0], item['workcenter_id_count']) for item in data)
[('workcenter_id', 'in', self.ids), ('state', 'in', ('blocked', 'ready')), ('date_start', '<', datetime.now().strftime('%Y-%m-%d'))],
['workcenter_id'], ['__count'])
count_data = {workcenter.id: count for workcenter, count in data}
# Count All, Pending, Ready, Progress Workorder
res = MrpWorkorder._read_group(
[('workcenter_id', 'in', self.ids)],
['workcenter_id', 'state', 'duration_expected'], ['workcenter_id', 'state'],
lazy=False)
for res_group in res:
result[res_group['workcenter_id'][0]][res_group['state']] = res_group['__count']
if res_group['state'] in ('pending', 'waiting', 'ready', 'progress'):
result_duration_expected[res_group['workcenter_id'][0]] += res_group['duration_expected']
['workcenter_id', 'state'], ['duration_expected:sum', '__count'])
for workcenter, state, duration_sum, count in res:
result[workcenter.id][state] = count
if state in ('blocked', 'ready', 'progress'):
result_duration_expected[workcenter.id] += duration_sum
for workcenter in self:
workcenter.workorder_count = sum(count for state, count in result[workcenter.id].items() if state not in ('done', 'cancel'))
workcenter.workorder_pending_count = result[workcenter.id].get('pending', 0)
workcenter.workorder_blocked_count = result[workcenter.id].get('blocked', 0)
workcenter.workcenter_load = result_duration_expected[workcenter.id]
workcenter.workorder_ready_count = result[workcenter.id].get('ready', 0)
workcenter.workorder_progress_count = result[workcenter.id].get('progress', 0)
@ -110,14 +193,20 @@ class MrpWorkcenter(models.Model):
@api.depends('time_ids', 'time_ids.date_end', 'time_ids.loss_type')
def _compute_working_state(self):
# We search for a productivity line associated to this workcenter having no `date_end`.
# If we do not find one, the workcenter is not currently being used. If we find one, according
# to its `type_loss`, the workcenter is either being used or blocked.
time_log_by_workcenter = {}
for time_log in self.env['mrp.workcenter.productivity'].search([
('workcenter_id', 'in', self.ids),
('date_end', '=', False),
]):
wc = time_log.workcenter_id
if wc not in time_log_by_workcenter:
time_log_by_workcenter[wc] = time_log
for workcenter in self:
# We search for a productivity line associated to this workcenter having no `date_end`.
# If we do not find one, the workcenter is not currently being used. If we find one, according
# to its `type_loss`, the workcenter is either being used or blocked.
time_log = self.env['mrp.workcenter.productivity'].search([
('workcenter_id', '=', workcenter.id),
('date_end', '=', False)
], limit=1)
time_log = time_log_by_workcenter.get(workcenter._origin)
if not time_log:
# the workcenter is not being used
workcenter.working_state = 'normal'
@ -135,8 +224,8 @@ class MrpWorkcenter(models.Model):
('workcenter_id', 'in', self.ids),
('date_end', '!=', False),
('loss_type', '!=', 'productive')],
['duration', 'workcenter_id'], ['workcenter_id'], lazy=False)
count_data = dict((item['workcenter_id'][0], item['duration']) for item in data)
['workcenter_id'], ['duration:sum'])
count_data = {workcenter.id: duration for workcenter, duration in data}
for workcenter in self:
workcenter.blocked_time = count_data.get(workcenter.id, 0.0) / 60.0
@ -147,36 +236,53 @@ class MrpWorkcenter(models.Model):
('workcenter_id', 'in', self.ids),
('date_end', '!=', False),
('loss_type', '=', 'productive')],
['duration', 'workcenter_id'], ['workcenter_id'], lazy=False)
count_data = dict((item['workcenter_id'][0], item['duration']) for item in data)
['workcenter_id'], ['duration:sum'])
count_data = {workcenter.id: duration for workcenter, duration in data}
for workcenter in self:
workcenter.productive_time = count_data.get(workcenter.id, 0.0) / 60.0
@api.depends('blocked_time', 'productive_time')
def _compute_oee(self):
for order in self:
if order.productive_time:
order.oee = round(order.productive_time * 100.0 / (order.productive_time + order.blocked_time), 2)
time_data = self.env['mrp.workcenter.productivity']._read_group(
domain=[
('date_start', '>=', fields.Datetime.to_string(datetime.now() - relativedelta.relativedelta(months=1))),
('workcenter_id', 'in', self.ids),
('date_end', '!=', False),
],
groupby=['workcenter_id', 'loss_type'],
aggregates=['duration:sum'],
)
time_by_workcenter = defaultdict(lambda: {'productive_time': 0.0, 'blocked_time': 0.0})
for data in time_data:
workcenter, loss_type, duration = data
time_to_update = 'productive_time' if loss_type == 'productive' else 'blocked_time'
time_by_workcenter[workcenter.id][time_to_update] += duration
for workcenter in self:
workcenter_time = time_by_workcenter[workcenter.id]
productive_time = workcenter_time['productive_time']
if productive_time:
blocked_time = workcenter_time['blocked_time']
workcenter.oee = float_round(productive_time * 100.0 / (productive_time + blocked_time), precision_digits=2)
else:
order.oee = 0.0
workcenter.oee = 0.0
def _compute_performance(self):
wo_data = self.env['mrp.workorder']._read_group([
('date_start', '>=', fields.Datetime.to_string(datetime.now() - relativedelta.relativedelta(months=1))),
('workcenter_id', 'in', self.ids),
('state', '=', 'done')], ['duration_expected', 'workcenter_id', 'duration'], ['workcenter_id'], lazy=False)
duration_expected = dict((data['workcenter_id'][0], data['duration_expected']) for data in wo_data)
duration = dict((data['workcenter_id'][0], data['duration']) for data in wo_data)
('state', '=', 'done')], ['workcenter_id'], ['duration_expected:sum', 'duration:sum'])
duration_expected = {workcenter.id: expected for workcenter, expected, __ in wo_data}
duration = {workcenter.id: duration for workcenter, __, duration in wo_data}
for workcenter in self:
if duration.get(workcenter.id):
workcenter.performance = 100 * duration_expected.get(workcenter.id, 0.0) / duration[workcenter.id]
else:
workcenter.performance = 0.0
@api.constrains('default_capacity')
def _check_capacity(self):
if any(workcenter.default_capacity <= 0.0 for workcenter in self):
raise exceptions.UserError(_('The capacity must be strictly positive.'))
@api.depends('routing_line_ids')
def _compute_has_routing_lines(self):
for workcenter in self:
workcenter.has_routing_lines = self.env['mrp.routing.workcenter'].search_count([('workcenter_id', 'in', workcenter.ids)], limit=1)
def unblock(self):
self.ensure_one()
@ -184,7 +290,7 @@ class MrpWorkcenter(models.Model):
raise exceptions.UserError(_("It has already been unblocked."))
times = self.env['mrp.workcenter.productivity'].search([('workcenter_id', '=', self.id), ('date_end', '=', False)])
times.write({'date_end': datetime.now()})
return {'type': 'ir.actions.client', 'tag': 'reload'}
return True
@api.model_create_multi
def create(self, vals_list):
@ -211,6 +317,12 @@ class MrpWorkcenter(models.Model):
action = self.env["ir.actions.actions"]._for_xml_id("mrp.action_work_orders")
return action
def action_work_order_alternatives(self):
action = self.env["ir.actions.actions"]._for_xml_id("mrp.mrp_workorder_todo")
action['domain'] = ['|', ('workcenter_id', 'in', self.alternative_workcenter_ids.ids),
('workcenter_id.alternative_workcenter_ids', '=', self.id)]
return action
def _get_unavailability_intervals(self, start_datetime, end_datetime):
"""Get the unavailabilities intervals for the workcenters in `self`.
@ -224,7 +336,7 @@ class MrpWorkcenter(models.Model):
unavailability_ressources = self.resource_id._get_unavailable_intervals(start_datetime, end_datetime)
return {wc.id: unavailability_ressources.get(wc.resource_id.id, []) for wc in self}
def _get_first_available_slot(self, start_datetime, duration):
def _get_first_available_slot(self, start_datetime, duration, forward=True, leaves_to_ignore=False, extra_leaves_slots=[]):
"""Get the first available interval for the workcenter in `self`.
The available interval is disjoinct with all other workorders planned on this workcenter, but
@ -232,44 +344,68 @@ class MrpWorkcenter(models.Model):
Return the first available interval (start datetime, end datetime) or,
if there is none before 700 days, a tuple error (False, 'error message').
:param start_datetime: begin the search at this datetime
:param duration: minutes needed to make the workorder (float)
:param start_datetime: begin the search at this datetime
:param forward: forward scheduling (search from start_datetime to 700 days after), or backward (from start_datetime to now)
:param leaves_to_ignore: typically, ignore allocated leave when re-planning a workorder
:param extra_leaves_slots: extra time slots (start, stop) to consider
:rtype: tuple
"""
self.ensure_one()
start_datetime, revert = make_aware(start_datetime)
ICP = self.env['ir.config_parameter'].sudo()
max_planning_iterations = max(int(ICP.get_param('mrp.workcenter_max_planning_iterations', '50')), 1)
resource = self.resource_id
get_available_intervals = partial(self.resource_calendar_id._work_intervals_batch, domain=[('time_type', 'in', ['other', 'leave'])], resources=resource, tz=timezone(self.resource_calendar_id.tz))
get_workorder_intervals = partial(self.resource_calendar_id._leave_intervals_batch, domain=[('time_type', '=', 'other')], resources=resource, tz=timezone(self.resource_calendar_id.tz))
revert = to_timezone(start_datetime.tzinfo)
start_datetime = localized(start_datetime)
get_available_intervals = partial(self.resource_calendar_id._work_intervals_batch, resources=resource, tz=timezone(self.resource_calendar_id.tz))
workorder_intervals_leaves_domain = [('time_type', '=', 'other')]
if leaves_to_ignore:
workorder_intervals_leaves_domain.append(('id', 'not in', leaves_to_ignore.ids))
get_workorder_intervals = partial(self.resource_calendar_id._leave_intervals_batch, domain=workorder_intervals_leaves_domain, resources=resource, tz=timezone(self.resource_calendar_id.tz))
extra_leaves_slots_intervals = Intervals([(localized(start), localized(stop), self.env['resource.calendar.attendance']) for start, stop in extra_leaves_slots])
remaining = duration
start_interval = start_datetime
remaining = duration = max(duration, 1 / 60)
now = localized(datetime.now())
delta = timedelta(days=14)
for n in range(50): # 50 * 14 = 700 days in advance (hardcoded)
dt = start_datetime + delta * n
available_intervals = get_available_intervals(dt, dt + delta)[resource.id]
workorder_intervals = get_workorder_intervals(dt, dt + delta)[resource.id]
for start, stop, dummy in available_intervals:
# Shouldn't loop more than 2 times because the available_intervals contains the workorder_intervals
# And remaining == duration can only occur at the first loop and at the interval intersection (cannot happen several time because available_intervals > workorder_intervals
for _i in range(2):
start_interval, stop_interval = None, None
for n in range(max_planning_iterations): # 50 * 14 = 700 days in advance
if forward:
date_start = start_datetime + delta * n
date_stop = date_start + delta
available_intervals = get_available_intervals(date_start, date_stop)[resource.id]
workorder_intervals = get_workorder_intervals(date_start, date_stop)[resource.id]
for start, stop, _records in available_intervals:
start_interval = start_interval or start
interval_minutes = (stop - start).total_seconds() / 60
# If the remaining minutes has never decrease update start_interval
if remaining == duration:
start_interval = start
# If there is a overlap between the possible available interval and a others WO
if Intervals([(start_interval, start + timedelta(minutes=min(remaining, interval_minutes)), dummy)]) & workorder_intervals:
remaining = duration
elif float_compare(interval_minutes, remaining, precision_digits=3) >= 0:
while (interval := Intervals([(start_interval or start, start + timedelta(minutes=min(remaining, interval_minutes)), _records)])) \
and (conflict := interval & workorder_intervals or interval & extra_leaves_slots_intervals):
(_start, start, _records) = conflict._items[0] # restart available interval at conflicting interval stop
interval_minutes = (stop - start).total_seconds() / 60
start_interval, remaining = start if interval_minutes else None, duration
if float_compare(interval_minutes, remaining, precision_digits=3) >= 0:
return revert(start_interval), revert(start + timedelta(minutes=remaining))
else:
# Decrease a part of the remaining duration
remaining -= interval_minutes
# Go to the next available interval because the possible current interval duration has been used
break
return False, 'Not available slot 700 days after the planned start'
remaining -= interval_minutes
else:
# same process but starting from end on reversed intervals
date_stop = start_datetime - delta * n
date_start = date_stop - delta
available_intervals = get_available_intervals(date_start, date_stop)[resource.id]
available_intervals = reversed(available_intervals)
workorder_intervals = get_workorder_intervals(date_start, date_stop)[resource.id]
for start, stop, _records in available_intervals:
stop_interval = stop_interval or stop
interval_minutes = (stop - start).total_seconds() / 60
while (interval := Intervals([(stop - timedelta(minutes=min(remaining, interval_minutes)), stop_interval or stop, _records)])) \
and (conflict := interval & workorder_intervals or interval & extra_leaves_slots_intervals):
(stop, _stop, _records) = conflict._items[0] # restart available interval at conflicting interval start
interval_minutes = (stop - start).total_seconds() / 60
stop_interval, remaining = stop if interval_minutes else None, duration
if float_compare(interval_minutes, remaining, precision_digits=3) >= 0:
return revert(stop - timedelta(minutes=remaining)), revert(stop_interval)
remaining -= interval_minutes
if date_start <= now:
break
return False, 'No available slot 700 days after the planned start'
def action_archive(self):
res = super().action_archive()
@ -288,21 +424,20 @@ class MrpWorkcenter(models.Model):
}
return res
def _get_capacity(self, product):
product_capacity = self.capacity_ids.filtered(lambda capacity: capacity.product_id == product)
return product_capacity.capacity if product_capacity else self.default_capacity
def _get_expected_duration(self, product_id):
"""Compute the expected duration when using this work-center
Always include workcenter startup time and clean-up time.
In case there are specific capacities defined in the workcenter
that matches the product we are producing. Add the extra-time.
"""
capacity = self.capacity_ids.filtered(lambda p: p.product_id == product_id)
return self.time_start + self.time_stop + (capacity.time_start + capacity.time_stop if capacity else 0.0)
def _get_capacity(self, product, unit, default_capacity=1):
capacity = self.capacity_ids.sorted(lambda c: (
not (c.product_id == product and c.product_uom_id == product.uom_id),
not (not c.product_id and c.product_uom_id == unit),
not (not c.product_id and c.product_uom_id == product.uom_id),
))[:1]
if capacity and capacity.product_id in [product, self.env['product.product']] and capacity.product_uom_id in [product.uom_id, unit]:
if float_is_zero(capacity.capacity, 0):
return (default_capacity, capacity.time_start, capacity.time_stop)
return (capacity.product_uom_id._compute_quantity(capacity.capacity, unit), capacity.time_start, capacity.time_stop)
return (default_capacity, self.time_start, self.time_stop)
class WorkcenterTag(models.Model):
class MrpWorkcenterTag(models.Model):
_name = 'mrp.workcenter.tag'
_description = 'Add tag for the workcenter'
_order = 'name'
@ -313,27 +448,24 @@ class WorkcenterTag(models.Model):
name = fields.Char("Tag Name", required=True)
color = fields.Integer("Color Index", default=_get_default_color)
_sql_constraints = [
('tag_name_unique', 'unique(name)',
'The tag name must be unique.'),
]
_tag_name_unique = models.Constraint(
'unique(name)',
'The tag name must be unique.',
)
class MrpWorkcenterProductivityLossType(models.Model):
_name = "mrp.workcenter.productivity.loss.type"
_name = 'mrp.workcenter.productivity.loss.type'
_description = 'MRP Workorder productivity losses'
_rec_name = 'loss_type'
@api.depends('loss_type')
def name_get(self):
def _compute_display_name(self):
""" As 'category' field in form view is a Many2one, its value will be in
lower case. In order to display its value capitalized 'name_get' is
lower case. In order to display its value capitalized 'display_name' is
overrided.
"""
result = []
for rec in self:
result.append((rec.id, rec.loss_type.title()))
return result
rec.display_name = rec.loss_type.title()
loss_type = fields.Selection([
('availability', 'Availability'),
@ -343,15 +475,15 @@ class MrpWorkcenterProductivityLossType(models.Model):
class MrpWorkcenterProductivityLoss(models.Model):
_name = "mrp.workcenter.productivity.loss"
_name = 'mrp.workcenter.productivity.loss'
_description = "Workcenter Productivity Losses"
_order = "sequence, id"
name = fields.Char('Blocking Reason', required=True)
name = fields.Char('Blocking Reason', required=True, translate=True)
sequence = fields.Integer('Sequence', default=1)
manual = fields.Boolean('Is a Blocking Reason', default=True)
loss_id = fields.Many2one('mrp.workcenter.productivity.loss.type', domain=([('loss_type', 'in', ['quality', 'availability'])]), string='Category')
loss_type = fields.Selection(string='Effectiveness Category', related='loss_id.loss_type', store=True, readonly=False)
loss_id = fields.Many2one('mrp.workcenter.productivity.loss.type', domain=[('loss_type', 'in', ['quality', 'availability'])], string='Category')
loss_type = fields.Selection(string='Effectiveness Category', related='loss_id.loss_type', readonly=False)
def _convert_to_duration(self, date_start, date_stop, workcenter=False):
""" Convert a date range into a duration in minutes.
@ -368,8 +500,9 @@ class MrpWorkcenterProductivityLoss(models.Model):
duration = max(duration, (date_stop - date_start).total_seconds() / 60.0)
return round(duration, 2)
class MrpWorkcenterProductivity(models.Model):
_name = "mrp.workcenter.productivity"
_name = 'mrp.workcenter.productivity'
_description = "Workcenter Productivity Log"
_order = "id desc"
_rec_name = "loss_id"
@ -402,7 +535,7 @@ class MrpWorkcenterProductivity(models.Model):
'mrp.workcenter.productivity.loss', "Loss Reason",
ondelete='restrict', required=True)
loss_type = fields.Selection(
string="Effectiveness", related='loss_id.loss_type', store=True, readonly=False)
string="Effectiveness", related='loss_id.loss_type', readonly=False)
description = fields.Text('Description')
date_start = fields.Datetime('Start Date', default=fields.Datetime.now, required=True)
date_end = fields.Datetime('End Date')
@ -416,17 +549,48 @@ class MrpWorkcenterProductivity(models.Model):
else:
blocktime.duration = 0.0
@api.onchange('duration')
def _duration_changed(self):
if not self.date_end:
return
self.date_start = self.date_end - timedelta(minutes=self.duration)
self._loss_type_change()
@api.onchange('date_start')
def _date_start_changed(self):
if not self.date_start:
return
self.date_end = self.date_start + timedelta(minutes=self.duration)
self._loss_type_change()
@api.onchange('date_end')
def _date_end_changed(self):
if not self.date_end:
return
self.date_start = self.date_end - timedelta(minutes=self.duration)
self._loss_type_change()
@api.constrains('workorder_id')
def _check_open_time_ids(self):
for workorder in self.workorder_id:
open_time_ids_by_user = self.env["mrp.workcenter.productivity"].read_group([("id", "in", workorder.time_ids.ids), ("date_end", "=", False)], ["user_id", "open_time_ids_count:count(id)"], ["user_id"])
if any(data["open_time_ids_count"] > 1 for data in open_time_ids_by_user):
open_time_ids_by_user = self.env["mrp.workcenter.productivity"]._read_group(
[("id", "in", workorder.time_ids.ids), ("date_end", "=", False)],
["user_id"], having=[("__count", ">", 1)],
)
if open_time_ids_by_user:
raise ValidationError(_('The Workorder (%s) cannot be started twice!', workorder.display_name))
def button_block(self):
self.ensure_one()
self.workcenter_id.order_ids.end_all()
def _loss_type_change(self):
self.ensure_one()
if self.workorder_id.duration > self.workorder_id.duration_expected:
self.loss_id = self.env.ref("mrp.block_reason4").id
else:
self.loss_id = self.env.ref("mrp.block_reason7").id
def _close(self):
underperformance_timers = self.env['mrp.workcenter.productivity']
for timer in self:
@ -446,19 +610,37 @@ class MrpWorkcenterProductivity(models.Model):
underperformance_timers.write({'loss_id': underperformance_type.id})
class MrpWorkCenterCapacity(models.Model):
class MrpWorkcenterCapacity(models.Model):
_name = 'mrp.workcenter.capacity'
_description = 'Work Center Capacity'
_check_company_auto = True
workcenter_id = fields.Many2one('mrp.workcenter', string='Work Center', required=True)
product_id = fields.Many2one('product.product', string='Product', required=True)
product_uom_id = fields.Many2one('uom.uom', string='Product UoM', related='product_id.uom_id')
capacity = fields.Float('Capacity', default=1.0, help="Number of pieces that can be produced in parallel for this product.")
time_start = fields.Float('Setup Time (minutes)', help="Additional time in minutes for the setup.")
time_stop = fields.Float('Cleanup Time (minutes)', help="Additional time in minutes for the cleaning.")
def _default_time_start(self):
workcenter_id = self.workcenter_id.id or self.env.context.get('default_workcenter_id')
return self.env['mrp.workcenter'].browse(workcenter_id).time_start if workcenter_id else 0.0
_sql_constraints = [
('positive_capacity', 'CHECK(capacity > 0)', 'Capacity should be a positive number.'),
('unique_product', 'UNIQUE(workcenter_id, product_id)', 'Product capacity should be unique for each workcenter.'),
]
def _default_time_stop(self):
workcenter_id = self.workcenter_id.id or self.env.context.get('default_workcenter_id')
return self.env['mrp.workcenter'].browse(workcenter_id).time_stop if workcenter_id else 0.0
workcenter_id = fields.Many2one('mrp.workcenter', string='Work Center', required=True, index=True)
product_id = fields.Many2one('product.product', string='Product')
product_uom_id = fields.Many2one('uom.uom', string='Unit',
compute="_compute_product_uom_id", precompute=True, store=True, readonly=False, required=True)
capacity = fields.Float('Capacity', help="Number of pieces that can be produced in parallel for this product or for all, depending on the unit.")
time_start = fields.Float('Setup Time (minutes)', default=_default_time_start, help="Time in minutes for the setup.")
time_stop = fields.Float('Cleanup Time (minutes)', default=_default_time_stop, help="Time in minutes for the cleaning.")
_positive_capacity = models.Constraint(
'CHECK(capacity >= 0)',
'Capacity should be a non-negative number.',
)
_workcenter_product_product_uom_unique = models.UniqueIndex(
'(workcenter_id, COALESCE(product_id, 0), product_uom_id)',
'Product/Unit capacity should be unique for each workcenter.'
)
@api.depends('product_id')
def _compute_product_uom_id(self):
for capacity in self:
capacity.product_uom_id = capacity.product_id.uom_id or self.env.ref('uom.product_uom_unit')