oca-technical/odoo-bringout-oca-ddmrp-ddmrp/ddmrp/models/stock_buffer.py
2025-08-29 15:43:03 +02:00

2127 lines
78 KiB
Python

# Copyright 2019-20 ForgeFlow S.L. (http://www.forgeflow.com)
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl.html).
import json
import logging
import operator as py_operator
import threading
from collections import defaultdict
from datetime import datetime, timedelta
from math import pi
from odoo import _, api, exceptions, fields, models
from odoo.exceptions import ValidationError
from odoo.tools import float_compare, float_round
from odoo.tools.misc import split_every
_logger = logging.getLogger(__name__)
try:
from bokeh.embed import components
from bokeh.models import (
ColumnDataSource,
DatetimeTickFormatter,
HoverTool,
LabelSet,
Legend,
)
from bokeh.plotting import figure
from bokeh.util.serialization import convert_datetime_type
except (ImportError, IOError) as err:
_logger.debug(err)
OPERATORS = {
"<": py_operator.lt,
">": py_operator.gt,
"<=": py_operator.le,
">=": py_operator.ge,
"==": py_operator.eq,
"=": py_operator.eq,
"!=": py_operator.ne,
}
_PRIORITY_LEVEL = [("1_red", "Red"), ("2_yellow", "Yellow"), ("3_green", "Green")]
DDMRP_COLOR = {
"0_dark_red": "#8B0000",
"1_red": "#ff0000",
"2_yellow": "#ffff00",
"3_green": "#33cc33",
}
class StockBuffer(models.Model):
_name = "stock.buffer"
_description = "Stock Buffer"
_order = "planning_priority_level asc, net_flow_position_percent asc"
CRON_DDMRP_CHUNKS = 50
@api.model
def default_get(self, fields):
res = super().default_get(fields)
warehouse = None
if "warehouse_id" not in res and res.get("company_id"):
warehouse = self.env["stock.warehouse"].search(
[("company_id", "=", res["company_id"])], limit=1
)
if warehouse:
res["warehouse_id"] = warehouse.id
res["location_id"] = warehouse.lot_stock_id.id
return res
name = fields.Char(
copy=False,
required=True,
default=lambda self: self.env["ir.sequence"].next_by_code("stock.buffer"),
)
active = fields.Boolean(default=True)
warehouse_id = fields.Many2one(
comodel_name="stock.warehouse",
string="Warehouse",
ondelete="cascade",
required=True,
)
location_id = fields.Many2one(
comodel_name="stock.location",
string="Location",
ondelete="cascade",
required=True,
)
product_id = fields.Many2one(
comodel_name="product.product",
string="Product",
domain=[("type", "=", "product")],
ondelete="cascade",
required=True,
)
product_uom = fields.Many2one(
related="product_id.uom_id",
)
product_categ_id = fields.Many2one(
comodel_name="product.category",
string="Product Category",
related="product_id.categ_id",
store=True,
readonly=True,
)
# TODO: fix in method _compute_procure_recommended_qty.
# not sure maybe they are useful for tweak batches like in multi level mrp
procure_min_qty = fields.Float(
string="Minimum Procure Batch",
digits="Product Unit of Measure",
help="Minimum qty for a single procurement",
)
procure_max_qty = fields.Float(
string="Maximum Procure Batch",
digits="Product Unit of Measure",
help="Maximum qty for a single procurement",
)
qty_multiple = fields.Float(
digits="Product Unit of Measure",
default=1,
required=True,
help="The procurement quantity will be rounded up to this multiple. "
"If it is 0, the exact quantity will be used.",
)
group_id = fields.Many2one(
comodel_name="procurement.group",
string="Procurement Group",
copy=False,
help="Moves created through this buffer will be put in this "
"procurement group. If none is given, the moves generated by "
"stock rules will be grouped into one big picking.",
)
company_id = fields.Many2one(
comodel_name="res.company",
string="Company",
required=True,
default=lambda self: self.env.company,
)
# TODO: rename to manual LT ??
lead_days = fields.Integer(
"Lead Time (Distributed)",
default=1,
help="Lead time for distributed products.",
)
_sql_constraints = [
(
"qty_multiple_check",
"CHECK( qty_multiple >= 0 )",
"Qty Multiple must be greater than or equal to zero.",
),
(
"stock_buffer_uniq",
"unique(product_id, location_id)",
"The product/location combination must be unique."
"Remember that the buffer could be archived.",
),
]
def _quantity_in_progress(self):
"""Return Quantities that are not yet in virtual stock but should
be deduced from buffers (example: purchases created from buffers)"""
res = {}.fromkeys(self.ids, 0.0)
for buffer in self:
polines = buffer._get_rfq_dlt(dlt_interval=None)
for line in polines:
res[buffer.id] += line.product_uom._compute_quantity(
line.product_qty, buffer.product_uom, round=False
)
return res
def action_view_purchase(self):
action = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
# Remove the context since the action basically display RFQ and not PO.
action["context"] = {}
order_line_ids = self.env["purchase.order.line"].search(
[("buffer_ids", "in", self.ids)]
)
purchase_ids = order_line_ids.mapped("order_id")
action["domain"] = [("id", "in", purchase_ids.ids)]
return action
def action_view_yearly_consumption(self):
action = self.env["ir.actions.actions"]._for_xml_id(
"ddmrp.stock_move_year_consumption_action"
)
locations = (
self.env["stock.location"]
.with_context(active_test=False)
.search([("id", "child_of", self.location_id.ids)])
)
date_to = fields.Date.today()
# We take last five years, even though they will be initially
# filtered in the action to show only last year.
date_from = date_to - timedelta(days=5 * 365)
action["domain"] = self._past_moves_domain(date_from, date_to, locations)
return action
def _demand_estimate_domain(self, locations, date_from=False, date_to=False):
self.ensure_one()
domain = [
("location_id", "in", locations.ids),
("product_id", "=", self.product_id.id),
]
if date_to:
domain += [("date_from", "<=", date_to)]
if date_from:
domain += [("date_to", ">=", date_from)]
return domain
def action_view_stock_demand_estimates(self):
result = self.env["ir.actions.actions"]._for_xml_id(
"stock_demand_estimate.stock_demand_estimate_action"
)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
domain = self._demand_estimate_domain(locations)
recs = self.env["stock.demand.estimate"].search(domain)
result["domain"] = [("id", "in", recs.ids)]
return result
def action_view_bom(self):
action = self.product_id.action_view_bom()
boms = self._get_manufactured_bom(limit=100)
action["domain"] = [("id", "in", boms.ids)]
action["context"] = {
"location_id": self.location_id.id,
"warehouse_id": self.location_id.warehouse_id.id,
}
return action
@api.constrains("product_id")
def _check_product_uom(self):
if any(
buffer.product_id.uom_id.category_id != buffer.product_uom.category_id
for buffer in self
):
raise ValidationError(
_(
"You have to select a product unit of measure that is in"
"the same category than the default unit of"
"measure of the product"
)
)
@api.onchange("warehouse_id")
def onchange_warehouse_id(self):
if self.warehouse_id:
self.location_id = self.warehouse_id.lot_stock_id.id
@api.onchange("product_id")
def onchange_product_id(self):
if self.product_id:
self.product_uom = self.product_id.uom_id.id
return {
"domain": {
"product_uom": [
("category_id", "=", self.product_id.uom_id.category_id.id)
]
}
}
return {"domain": {"product_uom": []}}
def _prepare_procurement_values(
self,
product_qty,
date=False,
group=False,
):
"""Prepare specific key for moves or other components that will be
created from a stock rule comming from a buffer. This method could
be override in order to add other custom key that could
be used in move/po creation.
"""
return {
"date_planned": date or self._get_date_planned(),
"warehouse_id": self.warehouse_id,
"buffer_id": self,
"company_id": self.company_id,
"group_id": group or self.group_id,
}
# MANUAL PROCUREMENT AND UOM
def _get_date_planned(self, force_lt=None):
self.ensure_one()
profile = self.buffer_profile_id
dlt = int(self.dlt)
if force_lt and isinstance(force_lt, (int, float)):
dlt = force_lt
if profile.item_type == "distributed":
max_proc_time = profile.distributed_reschedule_max_proc_time
else:
max_proc_time = 0
# For purchased items we always consider calendar days,
# not work days.
if profile.item_type == "purchased":
dt_planned = fields.datetime.today() + timedelta(days=dlt)
else:
if self.warehouse_id.calendar_id:
dt_planned = self.warehouse_id.wh_plan_days(fields.datetime.now(), dlt)
if max_proc_time:
calendar = self.warehouse_id.calendar_id
# We found the day with "wh_plan_day", now determine
# the first available hour in the day (wh_plan_day returns
# the stop hour), and add the procurement time.
dt_planned = calendar.plan_hours(
# expect hours
max_proc_time / 60,
# start from the first working hours available
dt_planned.replace(hour=0, minute=0, second=0),
)
else:
dt_planned = (
fields.datetime.now()
+ timedelta(days=dlt)
+ timedelta(minutes=max_proc_time)
)
return dt_planned
procure_recommended_qty = fields.Float(
string="Procure Recommendation",
compute="_compute_procure_recommended_qty",
store=True,
)
procure_uom_id = fields.Many2one(
comodel_name="uom.uom",
string="Procurement UoM",
compute="_compute_procure_uom_id",
readonly=False,
store=True,
)
@api.constrains("product_id", "procure_uom_id")
def _check_procure_uom(self):
if any(
buffer.product_uom
and buffer.procure_uom_id
and buffer.product_uom.category_id != buffer.procure_uom_id.category_id
for buffer in self
):
raise ValidationError(
_(
"Error: The product default Unit of Measure and the "
"procurement Unit of Measure must be in the same category."
)
)
# STOCK INFORMATION:
product_location_qty_available_not_res = fields.Float(
string="Quantity On Hand (Unreserved)",
help="Quantity available in this stock buffer, this is the total "
"quantity on hand minus the outgoing reservations.",
readonly=True,
)
def _get_outgoing_reservation_qty(self):
"""Return the qty reserved in operations that move products outside
of the buffer in the UoM of the product."""
domain = [
("product_id", "=", self.product_id.id),
("state", "in", ("partially_available", "assigned")),
]
lines = self.env["stock.move.line"].search(domain)
lines = lines.filtered(
lambda line: line.location_id.is_sublocation_of(self.location_id)
and not line.location_dest_id.is_sublocation_of(self.location_id)
)
return sum(lines.mapped("reserved_qty"))
def _update_quantities_dict(self, product):
self.ensure_one()
reserved_qty = self._get_outgoing_reservation_qty()
self.update(
{
"product_location_qty_available_not_res": product["qty_available"]
- reserved_qty,
}
)
def _calc_product_available_qty(self):
operation_by_location = defaultdict(lambda: self.browse())
for rec in self:
operation_by_location[rec.location_id] |= rec
for location_id, buffer_in_location in operation_by_location.items():
products = (
buffer_in_location.mapped("product_id")
.with_context(location=location_id.id)
._compute_quantities_dict(
lot_id=self.env.context.get("lot_id"),
owner_id=self.env.context.get("owner_id"),
package_id=self.env.context.get("package_id"),
)
)
for buffer in buffer_in_location:
product = products[buffer.product_id.id]
buffer._update_quantities_dict(product)
# PURCHASES LINK:
purchase_line_ids = fields.Many2many(
comodel_name="purchase.order.line",
string="Purchase Order Lines",
copy=False,
readonly=True,
)
# MRP LINK:
def action_view_mrp_productions(self):
result = self.env["ir.actions.actions"]._for_xml_id("mrp.mrp_production_action")
result["context"] = {}
mrp_production_ids = self.env["mrp.production"].search(
[("buffer_id", "=", self.id)]
)
result["domain"] = [("id", "in", mrp_production_ids.ids)]
return result
product_type = fields.Selection(related="product_id.type", readonly=True)
used_in_bom_count = fields.Integer(related="product_id.used_in_bom_count")
def action_used_in_bom(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("mrp.mrp_bom_form_action")
action["domain"] = [("bom_line_ids.product_id", "=", self.product_id.id)]
return action
# DDMRP SPECIFIC:
@api.depends(
"dlt",
"extra_lead_time",
"adu",
"buffer_profile_id.lead_time_id.factor",
"red_override",
"buffer_profile_id.variability_id.factor",
"product_uom.rounding",
"lead_days",
"product_id.seller_ids.delay",
)
def _compute_red_zone(self):
for rec in self:
if rec.product_id and rec.replenish_method in ["replenish", "min_max"]:
dlt = rec.dlt + rec.extra_lead_time
rec.red_base_qty = float_round(
dlt * rec.adu * rec.buffer_profile_id.lead_time_id.factor,
precision_rounding=rec.product_uom.rounding,
)
rec.red_safety_qty = float_round(
rec.red_base_qty * rec.buffer_profile_id.variability_id.factor,
precision_rounding=rec.product_uom.rounding,
)
rec.red_zone_qty = rec.red_base_qty + rec.red_safety_qty
elif rec.product_id and rec.replenish_method == "replenish_override":
rec.red_zone_qty = rec.red_override
else:
rec.red_zone_qty = 0.0
@api.depends(
"dlt",
"extra_lead_time",
"adu",
"buffer_profile_id.lead_time_id.factor",
"order_cycle",
"minimum_order_quantity",
"product_uom.rounding",
"green_override",
"top_of_yellow",
)
def _compute_green_zone(self):
for rec in self:
if rec.product_id and rec.replenish_method in ["replenish", "min_max"]:
# Using imposed or desired minimum order cycle
rec.green_zone_oc = float_round(
rec.order_cycle * rec.adu,
precision_rounding=rec.product_uom.rounding,
)
# Using lead time factor
dlt = rec.dlt + rec.extra_lead_time
rec.green_zone_lt_factor = float_round(
dlt * rec.adu * rec.buffer_profile_id.lead_time_id.factor,
precision_rounding=rec.product_uom.rounding,
)
# Using minimum order quantity
rec.green_zone_moq = float_round(
rec.minimum_order_quantity,
precision_rounding=rec.product_uom.rounding,
)
# The biggest option of the above will be used as the green
# zone value
rec.green_zone_qty = max(
rec.green_zone_oc, rec.green_zone_lt_factor, rec.green_zone_moq
)
elif rec.product_id and rec.replenish_method == "replenish_override":
rec.green_zone_qty = rec.green_override
else:
rec.green_zone_qty = 0.0
rec.top_of_green = rec.green_zone_qty + rec.top_of_yellow
@api.depends(
"dlt",
"extra_lead_time",
"adu",
"buffer_profile_id.lead_time_id.factor",
"buffer_profile_id.variability_id.factor",
"buffer_profile_id.replenish_method",
"order_cycle",
"minimum_order_quantity",
"product_uom.rounding",
"yellow_override",
"red_zone_qty",
)
def _compute_yellow_zone(self):
for rec in self:
if rec.product_id and rec.replenish_method == "min_max":
rec.yellow_zone_qty = 0
elif rec.product_id and rec.replenish_method == "replenish":
dlt = rec.dlt + rec.extra_lead_time
rec.yellow_zone_qty = float_round(
dlt * rec.adu, precision_rounding=rec.product_uom.rounding
)
elif rec.product_id and rec.replenish_method == "replenish_override":
rec.yellow_zone_qty = rec.yellow_override
else:
rec.yellow_zone_qty = 0.0
rec.top_of_yellow = rec.yellow_zone_qty + rec.red_zone_qty
@api.depends(
"net_flow_position",
"top_of_green",
"qty_multiple",
"product_uom",
"procure_uom_id",
"product_uom.rounding",
)
def _compute_procure_recommended_qty(self):
subtract_qty = self.sudo()._quantity_in_progress()
for rec in self:
procure_recommended_qty = 0.0
# uses _origin because onchange uses a NewId with the record wrapped
if rec._origin and rec.net_flow_position < rec.top_of_yellow:
qty = (
rec.top_of_green
- rec.net_flow_position
- subtract_qty[rec._origin.id]
)
if qty >= 0.0:
procure_recommended_qty = qty
elif rec._origin:
if subtract_qty[rec._origin.id] > 0.0:
procure_recommended_qty -= subtract_qty[rec._origin.id]
adjusted_qty = 0.0
if procure_recommended_qty > 0.0:
adjusted_qty = rec._adjust_procure_qty(procure_recommended_qty)
rec.procure_recommended_qty = adjusted_qty
@api.depends("product_uom")
def _compute_procure_uom_id(self):
for rec in self:
rec.procure_uom_id = rec.product_uom.id
def _adjust_procure_qty(self, qty):
self.ensure_one()
# If there is a procure UoM we apply it before anything.
# This means max, min and multiple quantities are relative to
# the procure UoM.
if self.procure_uom_id:
rounding = self.procure_uom_id.rounding
adjusted_qty = self.product_id.uom_id._compute_quantity(
qty, self.procure_uom_id
)
else:
rounding = self.product_uom.rounding
adjusted_qty = qty
# Apply qty multiple and minimum quantity (maximum quantity
# applies on the procure wizard)
remainder = self.qty_multiple > 0 and adjusted_qty % self.qty_multiple or 0.0
multiple_tolerance = self.qty_multiple * (
self.company_id.ddmrp_qty_multiple_tolerance / 100
)
if (
float_compare(remainder, multiple_tolerance, precision_rounding=rounding)
> 0
):
adjusted_qty += self.qty_multiple - remainder
elif float_compare(remainder, 0.0, precision_rounding=rounding) > 0:
adjusted_qty -= remainder
if (
float_compare(
adjusted_qty, self.procure_min_qty, precision_rounding=rounding
)
< 0
):
adjusted_qty = self.procure_min_qty
return adjusted_qty
def _compute_ddmrp_chart_planning(self):
"""This method use the Bokeh library to create a buffer depiction."""
for rec in self:
div, script = rec.get_ddmrp_chart_planning()
json_data = json.dumps(
{
"div": div,
"script": script,
}
)
rec.ddmrp_chart = json_data
def _compute_ddmrp_chart_execution(self):
for rec in self:
div, script = rec.get_ddmrp_chart_execution()
json_data = json.dumps(
{
"div": div,
"script": script,
}
)
rec.ddmrp_chart_execution = json_data
def _get_colors_hex_map(self, pallete="planning"):
return DDMRP_COLOR
def get_ddmrp_chart_planning(self):
p = figure(frame_width=300, frame_height=400, y_axis_label="Quantity")
p.xaxis.visible = False
p.toolbar.logo = None
hex_colors = self._get_colors_hex_map(pallete="planning")
red = p.vbar(
x=1,
bottom=0,
top=self.top_of_red,
width=1,
color=hex_colors.get("1_red", "red"),
)
yellow = p.vbar(
x=1,
bottom=self.top_of_red,
top=self.top_of_yellow,
width=1,
color=hex_colors.get("2_yellow", "yellow"),
)
green = p.vbar(
x=1,
bottom=self.top_of_yellow,
top=self.top_of_green,
width=1,
color=hex_colors.get("3_green", "green"),
)
net_flow = p.line(
[0, 2], [self.net_flow_position, self.net_flow_position], line_width=2
)
on_hand = p.line(
[0, 2],
[
self.product_location_qty_available_not_res,
self.product_location_qty_available_not_res,
],
line_width=2,
line_dash="dotted",
)
legend = Legend(
items=[
("Red zone", [red]),
("Yellow zone", [yellow]),
("Green zone", [green]),
("Net Flow Position", [net_flow]),
("On-Hand Position (Unreserved)", [on_hand]),
],
)
labels_source_data = {
"height": [
self.net_flow_position,
self.product_location_qty_available_not_res,
self.top_of_red,
self.top_of_yellow,
self.top_of_green,
],
"weight": [0.25, 1.75, 1, 1, 1],
"names": [
str(self.net_flow_position),
str(self.product_location_qty_available_not_res),
str(self.top_of_red),
str(self.top_of_yellow),
str(self.top_of_green),
],
}
source = ColumnDataSource(data=labels_source_data)
labels = LabelSet(
x="weight",
y="height",
text="names",
y_offset=1,
text_font_size="8pt",
source=source,
text_align="center",
)
p.add_layout(labels)
p.add_layout(legend, "below")
script, div = components(p, wrap_script=False)
return div, script
def get_ddmrp_chart_execution(self):
p = figure(frame_width=300, frame_height=400, y_axis_label="Quantity")
p.xaxis.visible = False
p.toolbar.logo = None
tor_exec = float_round(
self.top_of_red / 2,
precision_rounding=self.product_uom.rounding,
)
toy_exec = self.top_of_red
tog_exec = float_round(
self.top_of_red + self.green_zone_qty,
precision_rounding=self.product_uom.rounding,
)
tor2_exec = self.top_of_green
toy2_exec = (tor2_exec + tog_exec) / 2
hex_colors = self._get_colors_hex_map(pallete="execution")
red = p.vbar(
x=1,
bottom=0,
top=tor_exec,
width=1,
color=hex_colors.get("1_red", "red"),
)
yellow = p.vbar(
x=1,
bottom=tor_exec,
top=toy_exec,
width=1,
color=hex_colors.get("2_yellow", "yellow"),
)
green = p.vbar(
x=1,
bottom=toy_exec,
top=tog_exec,
width=1,
color=hex_colors.get("3_green", "green"),
)
yellow_2 = p.vbar(
x=1,
bottom=tog_exec,
top=toy2_exec,
width=1,
color=hex_colors.get("2_yellow", "yellow"),
)
red_2 = p.vbar(
x=1,
bottom=toy2_exec,
top=tor2_exec,
width=1,
color=hex_colors.get("1_red", "red"),
)
on_hand = p.line(
[0, 2],
[
self.product_location_qty_available_not_res,
self.product_location_qty_available_not_res,
],
line_width=2,
line_dash="dotted",
)
legend = Legend(
items=[
("Red zone (Execution)", [red, red_2]),
("Yellow zone (Execution)", [yellow, yellow_2]),
("Green zone (Execution)", [green]),
("On-Hand Position (Unreserved)", [on_hand]),
]
)
labels_source_data = {
"height": [
self.product_location_qty_available_not_res,
tor_exec,
toy_exec,
tog_exec,
toy2_exec,
],
"weight": [0.25, 1, 1, 1, 1],
"names": [
str(self.product_location_qty_available_not_res),
str(tor_exec),
str(toy_exec),
str(tog_exec),
str(toy2_exec),
],
}
source = ColumnDataSource(data=labels_source_data)
labels = LabelSet(
x="weight",
y="height",
text="names",
y_offset=1,
text_font_size="8pt",
source=source,
text_align="center",
)
p.add_layout(labels)
p.add_layout(legend, "below")
script, div = components(p, wrap_script=False)
return div, script
def _compute_ddmrp_demand_supply_chart(self):
for rec in self:
if not rec.buffer_profile_id:
# Not a buffer, skip.
rec.ddmrp_demand_chart = ""
rec.ddmrp_supply_chart = ""
continue
# Prepare data:
demand_data = rec._get_demand_by_days(rec.qualified_demand_stock_move_ids)
mrp_data = rec._get_qualified_mrp_moves(rec.qualified_demand_mrp_move_ids)
supply_data = rec._get_incoming_by_days()
width = timedelta(days=0.4)
date_format = (
self.env["res.lang"]._lang_get(self.env.lang or "en_US").date_format
)
# Plot demand data:
if demand_data or mrp_data:
x_demand = list(convert_datetime_type(x) for x in demand_data.keys())
y_demand = list(demand_data.values())
x_mrp = list(convert_datetime_type(x) for x in mrp_data.keys())
y_mrp = list(mrp_data.values())
p = figure(
frame_width=500,
frame_height=400,
y_axis_label="Quantity",
x_axis_type="datetime",
)
p.toolbar.logo = None
p.sizing_mode = "stretch_both"
# TODO: # p.xaxis.label_text_font = "helvetica"
p.xaxis.formatter = DatetimeTickFormatter(
hours=date_format,
days=date_format,
months=date_format,
years=date_format,
)
p.xaxis.major_label_orientation = pi / 4
if demand_data:
p.vbar(
x=x_demand,
width=width,
bottom=0,
top=y_demand,
color="firebrick",
)
if mrp_data:
p.vbar(
x=x_mrp, width=width, bottom=0, top=y_mrp, color="lightsalmon"
)
p.line(
[
datetime.today() - timedelta(days=1),
datetime.today() + timedelta(days=rec.order_spike_horizon),
],
[rec.order_spike_threshold, rec.order_spike_threshold],
line_width=2,
line_dash="dashed",
)
unit = rec.product_uom.name
hover = HoverTool(
tooltips=[("qty", "$y %s" % unit)], point_policy="follow_mouse"
)
p.add_tools(hover)
script, div = components(p, wrap_script=False)
json_data = json.dumps(
{
"div": div,
"script": script,
}
)
rec.ddmrp_demand_chart = json_data
else:
rec.ddmrp_demand_chart = json.dumps(
{
"div": _("No demand detected."),
"script": "",
}
)
# Plot supply data:
if supply_data:
x_supply = list(convert_datetime_type(x) for x in supply_data.keys())
y_supply = list(supply_data.values())
p = figure(
frame_width=500,
frame_height=400,
y_axis_label="Quantity",
x_axis_type="datetime",
)
p.toolbar.logo = None
p.sizing_mode = "stretch_both"
p.xaxis.formatter = DatetimeTickFormatter(
hours=date_format,
days=date_format,
months=date_format,
years=date_format,
)
p.xaxis.major_label_orientation = pi / 4
# White line to have similar proportion to demand chart.
p.line(
[
datetime.today() - timedelta(days=1),
datetime.today() + timedelta(days=rec.order_spike_horizon),
],
[rec.order_spike_threshold, rec.order_spike_threshold],
line_width=2,
line_dash="dashed",
color="white",
)
p.vbar(x=x_supply, width=width, bottom=0, top=y_supply, color="grey")
unit = rec.product_uom.name
hover = HoverTool(
tooltips=[("qty", "$y %s" % unit)], point_policy="follow_mouse"
)
p.add_tools(hover)
script, div = components(p, wrap_script=False)
json_data = json.dumps(
{
"div": div,
"script": script,
}
)
rec.ddmrp_supply_chart = json_data
else:
rec.ddmrp_supply_chart = json.dumps(
{
"div": _("No supply detected."),
"script": "",
}
)
@api.depends("red_zone_qty")
def _compute_order_spike_threshold(self):
for rec in self:
rec.order_spike_threshold = 0.5 * rec.red_zone_qty
def _get_manufactured_bom(self, limit=1):
return self.env["mrp.bom"].search(
[
("type", "=", "normal"),
"|",
("product_id", "=", self.product_id.id),
("product_tmpl_id", "=", self.product_id.product_tmpl_id.id),
"|",
("company_id", "=", self.company_id.id),
("company_id", "=", False),
],
limit=limit,
)
@api.depends("lead_days", "product_id.seller_ids.delay")
def _compute_dlt(self):
for rec in self:
if rec.buffer_profile_id.item_type == "manufactured":
bom = rec._get_manufactured_bom()
dlt = bom.with_context(location_id=rec.location_id.id).dlt
elif rec.buffer_profile_id.item_type == "distributed":
dlt = rec.lead_days
else:
sellers = rec._get_product_sellers()
dlt = sellers and fields.first(sellers).delay or rec.lead_days
rec.dlt = dlt
def _get_product_sellers(self):
""":returns the default sellers for a single buffer."""
self.ensure_one()
all_sellers = self.product_id.seller_ids.filtered(
lambda r: not r.company_id or r.company_id == self.company_id
)
today = fields.Date.context_today(self)
sellers = all_sellers.filtered(
lambda s: (
(s.product_id == self.product_id or not s.product_id)
and (
(s.date_start <= today if s.date_start else True)
and (s.date_end >= today if s.date_end else True)
)
)
)
if not sellers:
# fallback to all sellers
sellers = all_sellers
# When the current transaction changed the sequence, it may happen that
# the sellers' recordset is not correctly sorted by default.
sellers = sellers.sorted(key=lambda s: (s.sequence, -s.min_qty, s.price))
return sellers
@api.depends(
"buffer_profile_id",
"item_type",
"product_id.seller_ids",
"product_id.seller_ids.company_id",
"product_id.seller_ids.partner_id",
"product_id.seller_ids.product_id",
"product_id.seller_ids.sequence",
"product_id.seller_ids.min_qty",
"product_id.seller_ids.price",
)
def _compute_main_supplier(self):
for rec in self:
if rec.item_type == "purchased":
suppliers = rec._get_product_sellers()
rec.main_supplier_id = suppliers[0].partner_id if suppliers else False
else:
rec.main_supplier_id = False
@api.depends("main_supplier_id", "product_id.seller_ids")
def _compute_product_vendor_code(self):
for rec in self:
if not rec.main_supplier_id:
rec.product_vendor_code = False
continue
supplier_info = rec._get_product_sellers().filtered(
lambda r: r.partner_id == rec.main_supplier_id
and r.product_id == rec.product_id
)
rec.product_vendor_code = fields.first(supplier_info).product_code
buffer_profile_id = fields.Many2one(
comodel_name="stock.buffer.profile",
string="Buffer Profile",
required=True,
)
replenish_method = fields.Selection(
related="buffer_profile_id.replenish_method",
)
item_type = fields.Selection(
related="buffer_profile_id.item_type",
)
main_supplier_id = fields.Many2one(
comodel_name="res.partner",
string="Main Supplier",
help=(
"The main supplier is the first listed supplier defined "
"on the product that is valid for this product variant. "
"Any date restrictions are not taken into account."
),
compute="_compute_main_supplier",
store=True,
index=True,
)
green_override = fields.Float(
string="Green Zone (Override)",
)
yellow_override = fields.Float(
string="Yellow Zone (Override)",
)
red_override = fields.Float(
string="Red Zone (Override)",
)
product_vendor_code = fields.Char(
compute="_compute_product_vendor_code", string="Vendor Code"
)
dlt = fields.Float(
string="DLT (days)",
compute="_compute_dlt",
help="Decoupled Lead Time (days)",
)
adu = fields.Float(
string="ADU",
default=0.0,
digits="Average Daily Usage",
readonly=True,
help="Average Daily Usage",
)
adu_calculation_method = fields.Many2one(
comodel_name="product.adu.calculation.method",
string="ADU calculation method",
required=True,
)
adu_calculation_method_type = fields.Selection(
related="adu_calculation_method.method",
)
adu_fixed = fields.Float(
string="Fixed ADU",
default=1.0,
digits="Average Daily Usage",
)
order_cycle = fields.Float(string="Minimum Order Cycle (days)")
minimum_order_quantity = fields.Float(
digits="Product Unit of Measure",
)
red_base_qty = fields.Float(
compute="_compute_red_zone",
digits="Product Unit of Measure",
store=True,
)
red_safety_qty = fields.Float(
compute="_compute_red_zone",
digits="Product Unit of Measure",
store=True,
)
red_zone_qty = fields.Float(
compute="_compute_red_zone",
digits="Product Unit of Measure",
store=True,
)
top_of_red = fields.Float(
string="Top Of Red",
related="red_zone_qty",
store=True,
)
green_zone_qty = fields.Float(
compute="_compute_green_zone",
digits="Product Unit of Measure",
store=True,
)
green_zone_lt_factor = fields.Float(
string="Green Zone Lead Time Factor",
compute="_compute_green_zone",
store=True,
help="Green zone Lead Time Factor",
)
green_zone_moq = fields.Float(
string="Green Zone Minimum Order Quantity",
compute="_compute_green_zone",
digits="Product Unit of Measure",
store=True,
help="Green zone qty option considering minimum order quantity",
)
green_zone_oc = fields.Float(
string="Green Zone Order Cycle",
compute="_compute_green_zone",
store=True,
help="Green zone qty option considering desired Order Cycle",
)
yellow_zone_qty = fields.Float(
compute="_compute_yellow_zone",
digits="Product Unit of Measure",
store=True,
)
top_of_yellow = fields.Float(
compute="_compute_yellow_zone",
digits="Product Unit of Measure",
store=True,
)
top_of_green = fields.Float(
compute="_compute_green_zone",
digits="Product Unit of Measure",
store=True,
)
order_spike_horizon = fields.Float()
order_spike_threshold = fields.Float(
compute="_compute_order_spike_threshold",
digits="Product Unit of Measure",
store=True,
)
qualified_demand = fields.Float(
digits="Product Unit of Measure",
readonly=True,
)
qualified_demand_stock_move_ids = fields.Many2many(
comodel_name="stock.move",
)
qualified_demand_mrp_move_ids = fields.Many2many(
comodel_name="mrp.move",
)
incoming_total_qty = fields.Float(
string="Total Incoming",
readonly=True,
)
incoming_dlt_qty = fields.Float(
string="Incoming (Within DLT)",
readonly=True,
)
incoming_outside_dlt_qty = fields.Float(
string="Incoming (Outside DLT)",
readonly=True,
)
rfq_outside_dlt_qty = fields.Float(
string="RFQ Qty (Outside DLT)",
readonly=True,
help="Request for Quotation total quantity that is planned outside of "
"the DLT horizon.",
)
rfq_inside_dlt_qty = fields.Float(
string="RFQ Qty (Inside DLT)",
readonly=True,
help="Request for Quotation total quantity that is planned inside of "
"the DLT horizon.",
)
rfq_total_qty = fields.Float(
string="RFQ Total Qty",
readonly=True,
help="Request for Quotation total quantity that is planned",
)
net_flow_position = fields.Float(
digits="Product Unit of Measure",
readonly=True,
)
net_flow_position_percent = fields.Float(
string="Net flow position (% of TOG)",
readonly=True,
)
planning_priority_level = fields.Selection(
selection=_PRIORITY_LEVEL,
readonly=True,
)
execution_priority_level = fields.Selection(
string="On-Hand Alert Level",
selection=_PRIORITY_LEVEL,
store=True,
readonly=True,
)
on_hand_percent = fields.Float(
string="On Hand/TOR (%)",
store=True,
readonly=True,
)
on_hand_target_position = fields.Float(
string="Avg On Hand Target Position",
help="It denotes what the target stock on hand is. The "
"computation is: OH Target = TOR + Green Zone / 2 ",
compute="_compute_on_hand_target_position",
)
on_hand_target_max = fields.Float(
string="Target On Hand (Max)",
help="It denotes how far you are on average from the target",
compute="_compute_on_hand_target_max",
)
on_hand_target_min = fields.Float(
related="top_of_red",
string="On Hand Target Range",
help="It denotes what the target stock on hand range is.",
)
mrp_production_ids = fields.One2many(
string="Manufacturing Orders",
comodel_name="mrp.production",
inverse_name="buffer_id",
)
ddmrp_chart = fields.Text(
string="DDMRP Chart",
compute=_compute_ddmrp_chart_planning,
)
ddmrp_chart_execution = fields.Text(
string="DDMRP Execution Chart", compute=_compute_ddmrp_chart_execution
)
show_execution_chart = fields.Boolean()
ddmrp_demand_chart = fields.Text(
string="DDMRP Demand Chart",
compute="_compute_ddmrp_demand_supply_chart",
)
ddmrp_supply_chart = fields.Text(
string="DDMRP Supply Chart",
compute="_compute_ddmrp_demand_supply_chart",
)
auto_procure = fields.Boolean(
default=False,
help="Whenever the buffer is recomputed, if this option is set, it "
"will procure automatically if needed.",
)
auto_procure_option = fields.Selection(
selection=[
("standard", "When recommended (NFP below TOY)"),
("stockout", "When in stockout"),
],
default="standard",
required=True,
)
extra_lead_time = fields.Float(
string="Extra lead time (for Sizing)",
default=0.0,
help="When defined, this lead time will be added to the decoupled "
"lead time for the computation of the zones size (it won't affect "
"planned date for procurements).\n"
"This is particularly useful in situations of infrequent but "
"periodic demand. E.g. We receive a large order every 30 days, "
"whereas the supplier takes 10 days to supply. \n"
"In this case the yellow zone must cover for the "
"entire cycle of 30 days of demand.\n"
"In situations with infrequent demand the ADU tends to be very"
" small, and every new order would be treated as a spike, when \n"
"in reality this is not an exceptional situation.",
)
distributed_source_location_id = fields.Many2one(
string="Replenishment Location",
comodel_name="stock.location",
readonly=True,
index=True,
help="Source location from where goods will be replenished. "
"Computed when buffer is refreshed from following the Stock Rules.",
)
distributed_source_location_qty = fields.Float(
string="Source Location Free Quantity (distributed)",
compute="_compute_distributed_source_location_qty",
search="_search_distributed_source_location_qty",
help="Quantity free for distributed buffer in the source location. "
"When a procurement is requested, if the option is active on the profile,"
" it will be limited to this quantity.",
)
@api.depends(
"top_of_green",
"top_of_yellow",
"top_of_red",
)
def _compute_on_hand_target_position(self):
for rec in self:
green_zone_size = rec.top_of_green - rec.top_of_yellow
rec.on_hand_target_position = rec.top_of_red + green_zone_size / 2
@api.depends(
"top_of_green",
"top_of_yellow",
"top_of_red",
)
def _compute_on_hand_target_max(self):
for rec in self:
green_zone_size = rec.top_of_green - rec.top_of_yellow
rec.on_hand_target_max = rec.top_of_red + green_zone_size
@api.depends(
"distributed_source_location_id",
"distributed_source_location_id.quant_ids.quantity",
"distributed_source_location_id.quant_ids.reserved_quantity",
)
def _compute_distributed_source_location_qty(self):
to_compute_per_location = {}
for record in self:
location = record.distributed_source_location_id
if not location:
record.distributed_source_location_qty = 0.0
continue
to_compute_per_location.setdefault(location.id, set())
to_compute_per_location[location.id].add(record.id)
# batch computation per location
for location_id, buffer_ids in to_compute_per_location.items():
buffers = self.browse(buffer_ids).with_context(location=location_id)
for buf in buffers:
buf.distributed_source_location_qty = buf.product_id.free_qty
def _search_distributed_source_location_qty(self, operator, value):
if operator not in OPERATORS:
raise exceptions.UserError(_("Unsupported operator %s") % (operator,))
buffers = self.search([("distributed_source_location_id", "!=", False)])
operator_func = OPERATORS[operator]
buffers = buffers.filtered(
lambda buf: operator_func(buf.distributed_source_location_qty, value)
)
return [("id", "in", buffers.ids)]
def _search_open_stock_moves_domain(self):
self.ensure_one()
return [
("product_id", "=", self.product_id.id),
(
"state",
"in",
["draft", "waiting", "confirmed", "partially_available", "assigned"],
),
]
@api.model
def _stock_move_tree_view(self, lines):
views = []
tree_view = self.env.ref("stock.view_move_tree", False)
if tree_view:
views += [(tree_view.id, "tree")]
form_view = self.env.ref("stock.view_move_form", False)
if form_view:
views += [(form_view.id, "form")]
return {
"name": _("Non-completed Moves"),
"type": "ir.actions.act_window",
"res_model": "stock.move",
"view_type": "form",
"views": views,
"view_mode": "tree,form",
"domain": str([("id", "in", lines.ids)]),
}
def _get_horizon_adu_past_demand(self):
return self.adu_calculation_method.horizon_past or 0
def _get_dates_adu_past_demand(self, horizon):
date_from = fields.Date.to_string(
self.warehouse_id.wh_plan_days(datetime.now(), -1 * horizon)
)
date_to = fields.Date.to_string(
self.warehouse_id.wh_plan_days(datetime.now(), -1)
)
return date_from, date_to
def _past_mrp_move_domain(self, date_from, date_to, locations):
self.ensure_one()
return [
("product_id", "=", self.product_id.id),
("mrp_date", "<=", date_to),
("mrp_date", ">=", date_from),
("mrp_area_id.location_id", "in", locations.ids),
("mrp_type", "=", "d"),
("mrp_origin", "in", ["mrp", "mo"]),
]
def _past_moves_domain(self, date_from, date_to, locations):
self.ensure_one()
domain = [
("state", "=", "done"),
("location_id", "in", locations.ids),
("location_dest_id", "not in", locations.ids),
("location_dest_id.usage", "!=", "inventory"),
("product_id", "=", self.product_id.id),
("date", ">=", date_from),
("date", "<=", date_to),
]
if not self.env.company.ddmrp_adu_calc_include_scrap:
domain.append(("location_id.scrap_location", "=", False))
return domain
def _calc_adu_past_demand(self):
self.ensure_one()
horizon = self._get_horizon_adu_past_demand()
# today is excluded to be sure that is a past day and all moves
# for that day are done (or at least the expected date is in the past).
date_from, date_to = self._get_dates_adu_past_demand(horizon)
locations = (
self.env["stock.location"]
.with_context(active_test=False)
.search([("id", "child_of", self.location_id.ids)])
)
qty = 0.0
if self.adu_calculation_method.source_past == "estimates_mrp":
domain = self._past_mrp_move_domain(date_from, date_to, locations)
for mrp_move in self.env["mrp.move"].search(domain):
qty += -mrp_move.mrp_qty
if self.adu_calculation_method.source_past in ["estimates", "estimates_mrp"]:
domain = self._demand_estimate_domain(locations, date_from, date_to)
for estimate in self.env["stock.demand.estimate"].search(domain):
qty += estimate.get_quantity_by_date_range(
fields.Date.from_string(date_from), fields.Date.from_string(date_to)
)
elif self.adu_calculation_method.source_past == "actual":
domain = self._past_moves_domain(date_from, date_to, locations)
for group in self.env["stock.move"].read_group(
domain, ["product_id", "product_qty"], ["product_id"]
):
qty += group["product_qty"]
return qty / horizon
def _get_horizon_adu_future_demand(self):
return self.adu_calculation_method.horizon_future or 1
def _get_dates_adu_future_demand(self, horizon):
date_from = fields.Datetime.now()
date_to = self.warehouse_id.wh_plan_days(date_from, horizon)
date_to = date_to.replace(
hour=date_from.hour,
minute=date_from.minute,
second=date_from.second,
)
return date_from, date_to
def _future_mrp_move_domain(self, date_from, date_to, locations):
self.ensure_one()
return [
("product_id", "=", self.product_id.id),
("mrp_date", "<=", date_to),
("mrp_date", ">=", date_from),
("mrp_area_id.location_id", "in", locations.ids),
("mrp_type", "=", "d"),
("mrp_origin", "in", ["mrp", "mo"]),
]
def _future_moves_domain(self, date_from, date_to, locations):
self.ensure_one()
domain = [
("state", "not in", ["done", "cancel"]),
("location_id", "in", locations.ids),
("location_dest_id", "not in", locations.ids),
("location_dest_id.usage", "!=", "inventory"),
("product_id", "=", self.product_id.id),
("date", ">=", date_from),
("date", "<=", date_to),
]
if not self.env.company.ddmrp_adu_calc_include_scrap:
domain.append(("location_id.scrap_location", "=", False))
return domain
def _calc_adu_future_demand(self):
self.ensure_one()
horizon = self._get_horizon_adu_future_demand()
date_from, date_to = self._get_dates_adu_future_demand(horizon)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
qty = 0.0
if self.adu_calculation_method.source_future == "estimates_mrp":
domain = self._future_mrp_move_domain(date_from, date_to, locations)
for mrp_move in self.env["mrp.move"].search(domain):
qty += -mrp_move.mrp_qty
if self.adu_calculation_method.source_future in ["estimates", "estimates_mrp"]:
domain = self._demand_estimate_domain(locations, date_from, date_to)
for estimate in self.env["stock.demand.estimate"].search(domain):
qty += estimate.get_quantity_by_date_range(
fields.Date.from_string(date_from), fields.Date.from_string(date_to)
)
elif self.adu_calculation_method.source_future == "actual":
domain = self._future_moves_domain(date_from, date_to, locations)
for group in self.env["stock.move"].read_group(
domain, ["product_id", "product_qty"], ["product_id"]
):
qty += group["product_qty"]
return qty / horizon
def _calc_adu_blended(self):
self.ensure_one()
past_comp = self._calc_adu_past_demand()
fp = self.adu_calculation_method.factor_past
future_comp = self._calc_adu_future_demand()
ff = self.adu_calculation_method.factor_future
return past_comp * fp + future_comp * ff
def _calc_adu(self):
for rec in self:
if rec.adu_calculation_method.method == "fixed":
rec.adu = rec.adu_fixed
elif rec.adu_calculation_method.method == "past":
rec.adu = rec._calc_adu_past_demand()
elif rec.adu_calculation_method.method == "future":
rec.adu = rec._calc_adu_future_demand()
elif rec.adu_calculation_method.method == "blended":
rec.adu = rec._calc_adu_blended()
return True
def _search_stock_moves_qualified_demand_domain(self):
self.ensure_one()
horizon = self.order_spike_horizon
date_to = self.warehouse_id.wh_plan_days(datetime.now(), horizon)
return [
("product_id", "=", self.product_id.id),
(
"state",
"in",
["waiting", "confirmed", "partially_available", "assigned"],
),
("date", "<=", date_to),
]
def _search_stock_moves_qualified_demand(self):
domain = self._search_stock_moves_qualified_demand_domain()
moves = self.env["stock.move"].search(domain)
moves = moves.filtered(
lambda move: move.location_id.is_sublocation_of(self.location_id)
and not move.location_dest_id.is_sublocation_of(self.location_id)
)
return moves
def _get_incoming_supply_date_limit(self):
# The safety factor allows to control the date limit
factor = self.warehouse_id.nfp_incoming_safety_factor or 1
horizon = int(self.dlt) * factor
return self._get_date_planned(force_lt=horizon)
def _search_stock_moves_incoming_domain(self, outside_dlt=False):
date_to = self._get_incoming_supply_date_limit()
date_operator = ">" if outside_dlt else "<="
return [
("product_id", "=", self.product_id.id),
(
"state",
"in",
["waiting", "confirmed", "partially_available", "assigned"],
),
("date", date_operator, date_to),
]
def _search_stock_moves_incoming(self, outside_dlt=False):
domain = self._search_stock_moves_incoming_domain(outside_dlt=outside_dlt)
moves = self.env["stock.move"].search(domain)
moves = moves.filtered(
lambda move: not move.location_id.is_sublocation_of(self.location_id)
and move.location_dest_id.is_sublocation_of(self.location_id)
)
return moves
def _get_incoming_by_days(self):
self.ensure_one()
moves = self._search_stock_moves_incoming()
incoming_by_days = {}
move_dates = [dt.date() for dt in moves.mapped("date")]
for move_date in move_dates:
incoming_by_days[move_date] = 0.0
for move in moves:
date = move.date.date()
incoming_by_days[date] += move.product_qty
return incoming_by_days
def _get_demand_by_days(self, moves):
self.ensure_one()
demand_by_days = {}
move_dates = [dt.date() for dt in moves.mapped("date")]
for move_date in move_dates:
demand_by_days[move_date] = 0.0
for move in moves:
date = move.date.date()
demand_by_days[
date
] += move.product_qty - move.product_uom._compute_quantity(
move.reserved_availability, move.product_id.uom_id
)
return demand_by_days
def _search_mrp_moves_qualified_demand_domain(self):
self.ensure_one()
horizon = self.order_spike_horizon
date_to = self.warehouse_id.wh_plan_days(datetime.now(), horizon)
return [
("product_id", "=", self.product_id.id),
("mrp_type", "=", "d"),
("mrp_date", "<=", date_to),
]
def _search_mrp_moves_qualified_demand(self):
domain = self._search_mrp_moves_qualified_demand_domain()
moves = self.env["mrp.move"].search(domain)
moves = moves.filtered(
lambda move: move.mrp_area_id.location_id.is_sublocation_of(
self.location_id
)
)
return moves
def _get_qualified_mrp_moves(self, moves):
self.ensure_one()
mrp_moves_by_days = {}
move_dates = [dt for dt in moves.mapped("mrp_date")]
for move_date in move_dates:
mrp_moves_by_days[move_date] = 0.0
for move in moves:
date = move.mrp_date
mrp_moves_by_days[date] += abs(move.mrp_qty)
return mrp_moves_by_days
def _calc_qualified_demand(self, current_date=False):
today = current_date or fields.date.today()
for rec in self:
qualified_demand = 0.0
moves = rec._search_stock_moves_qualified_demand()
mrp_moves = rec._search_mrp_moves_qualified_demand()
demand_by_days = rec._get_demand_by_days(moves)
mrp_moves_by_days = rec._get_qualified_mrp_moves(mrp_moves)
dates = list(set(demand_by_days.keys()) | set(mrp_moves_by_days.keys()))
for date in dates:
if (
demand_by_days.get(date, 0.0) >= rec.order_spike_threshold
or date <= today
):
qualified_demand += demand_by_days.get(date, 0.0)
else:
moves = moves.filtered(lambda x: x.date != date)
if (
mrp_moves_by_days.get(date, 0.0) >= rec.order_spike_threshold
or date <= today
):
qualified_demand += mrp_moves_by_days.get(date, 0.0)
else:
mrp_moves = mrp_moves.filtered(lambda x: x.mrp_date != date)
rec.qualified_demand = qualified_demand
rec.qualified_demand_stock_move_ids = moves
rec.qualified_demand_mrp_move_ids = mrp_moves
return True
def _calc_incoming_dlt_qty(self):
for rec in self:
moves = self._search_stock_moves_incoming()
rec.incoming_dlt_qty = sum(moves.mapped("product_qty"))
outside_dlt_moves = self._search_stock_moves_incoming(outside_dlt=True)
rec.incoming_outside_dlt_qty = sum(outside_dlt_moves.mapped("product_qty"))
if rec.item_type == "purchased":
pols_outside_dlt = rec._get_rfq_dlt(dlt_interval="outside")
rec.rfq_outside_dlt_qty = sum(pols_outside_dlt.mapped("product_qty"))
pols_inside_dlt = rec._get_rfq_dlt(dlt_interval="inside")
rec.rfq_inside_dlt_qty = sum(pols_inside_dlt.mapped("product_qty"))
rec.rfq_total_qty = rec.rfq_inside_dlt_qty + rec.rfq_outside_dlt_qty
else:
rec.rfq_outside_dlt_qty = 0.0
rec.rfq_inside_dlt_qty = 0.0
rec.rfq_total_qty = 0.0
rec.incoming_total_qty = rec.incoming_dlt_qty + rec.incoming_outside_dlt_qty
return True
def _calc_net_flow_position(self):
for rec in self:
rec.net_flow_position = (
rec.product_location_qty_available_not_res
+ rec.incoming_dlt_qty
- rec.qualified_demand
)
usage = 0.0
if rec.top_of_green:
usage = round((rec.net_flow_position / rec.top_of_green * 100), 2)
rec.net_flow_position_percent = usage
return True
def _calc_distributed_source_location(self):
"""Compute source location used for replenishment of distributed buffer
It follows the rules of the default route until it finds a "Take from
stock" rule. The source location depends on many factors (route on
warehouse, product, category, ...), that's why it is updated only
on refresh of the buffer.
"""
for record in self:
if record.item_type != "distributed":
record.distributed_source_location_id = self.env[
"stock.location"
].browse()
continue
source_location = record._source_location_from_route()
record.distributed_source_location_id = source_location
def _calc_planning_priority(self):
for rec in self:
if rec.net_flow_position >= rec.top_of_yellow:
rec.planning_priority_level = "3_green"
elif rec.net_flow_position >= rec.top_of_red:
rec.planning_priority_level = "2_yellow"
else:
rec.planning_priority_level = "1_red"
def _calc_execution_priority(self):
for rec in self:
if rec.product_location_qty_available_not_res >= rec.top_of_red:
rec.execution_priority_level = "3_green"
elif rec.product_location_qty_available_not_res >= rec.top_of_red * 0.5:
rec.execution_priority_level = "2_yellow"
else:
rec.execution_priority_level = "1_red"
if rec.top_of_red:
rec.on_hand_percent = round(
(
(rec.product_location_qty_available_not_res / rec.top_of_red)
* 100
),
2,
)
else:
rec.on_hand_percent = 0.0
@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
if not self.env.context.get("skip_adu_calculation", False):
records._calc_adu()
records._calc_product_available_qty()
records._calc_distributed_source_location()
return records
def write(self, vals):
res = super().write(vals)
if any(f in vals for f in ("adu_fixed", "adu_calculation_method")):
self._calc_adu()
return res
def _procure_qty_to_order(self):
qty_to_order = self.procure_recommended_qty
rounding = self.procure_uom_id.rounding or self.product_uom.rounding
qty_in_progress = self._quantity_in_progress()[self._origin.id]
if (
self.item_type == "distributed"
and self.buffer_profile_id.replenish_distributed_limit_to_free_qty
):
# If we don't procure more than what we have in stock, we prevent
# backorders on the replenishment
if (
float_compare(
self.distributed_source_location_qty,
self.procure_min_qty,
precision_rounding=rounding,
)
< 0
):
# the free qty is below the minimum we want to move, do not
# move anything
return 0
else:
# move only what we have in stock
return min(qty_to_order, self.distributed_source_location_qty)
elif (
float_compare(qty_in_progress, 0, precision_rounding=rounding) > 0
and float_compare(
qty_to_order, self.green_zone_qty, precision_rounding=rounding
)
< 0
):
# When there is qty in progress (e.g. RfQ sent), do not keep
# auto-procuring small quantities, wait for the qty to be at least GZ.
return 0
return qty_to_order
def do_auto_procure(self):
if not self.auto_procure:
return False
rounding = self.product_uom.rounding
qty_to_order = self._procure_qty_to_order()
if float_compare(qty_to_order, 0.0, precision_rounding=rounding) > 0 and (
(
self.auto_procure_option == "stockout"
and float_compare(
self.net_flow_position, 0.0, precision_rounding=rounding
)
< 0
)
or self.auto_procure_option == "standard"
):
wizard = (
self.env["make.procurement.buffer"]
.with_context(
active_model="stock.buffer", active_ids=self.ids, active_id=self.id
)
.create({})
)
wizard.make_procurement()
return True
def action_view_supply_moves(self):
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
result["context"] = {}
moves = self._search_stock_moves_incoming() + self._search_stock_moves_incoming(
outside_dlt=True
)
result["domain"] = [("id", "in", moves.ids)]
return result
def _get_rfq_dlt(self, dlt_interval=None):
self.ensure_one()
cut_date = self._get_incoming_supply_date_limit()
if dlt_interval == "inside":
pols = self.purchase_line_ids.filtered(
lambda l: l.date_planned <= fields.Datetime.to_datetime(cut_date)
and l.state in ("draft", "sent", "to approve")
)
elif dlt_interval == "outside":
pols = self.purchase_line_ids.filtered(
lambda l: l.date_planned > fields.Datetime.to_datetime(cut_date)
and l.state in ("draft", "sent", "to approve")
)
else:
pols = self.purchase_line_ids.filtered(
lambda l: l.state in ("draft", "sent", "to approve")
)
return pols
def action_view_supply_moves_inside_dlt_window(self):
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
moves = self._search_stock_moves_incoming()
result["context"] = {}
result["domain"] = [("id", "in", moves.ids)]
return result
def action_view_supply_moves_outside_dlt_window(self):
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
moves = self._search_stock_moves_incoming(outside_dlt=True)
result["context"] = {}
result["domain"] = [("id", "in", moves.ids)]
return result
def action_view_supply_rfq_inside_dlt_window(self):
result = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
pols = self._get_rfq_dlt(dlt_interval="inside")
pos = pols.mapped("order_id")
result["context"] = {}
result["domain"] = [("id", "in", pos.ids)]
return result
def action_view_supply_rfq_outside_dlt_window(self):
result = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
pols = self._get_rfq_dlt(dlt_interval="outside")
pos = pols.mapped("order_id")
result["context"] = {}
result["domain"] = [("id", "in", pos.ids)]
return result
def action_view_qualified_demand_moves(self):
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
result["context"] = {}
result["domain"] = [("id", "in", self.qualified_demand_stock_move_ids.ids)]
return result
def action_view_qualified_demand_mrp(self):
result = self.env["ir.actions.actions"]._for_xml_id(
"mrp_multi_level.mrp_move_action"
)
result["context"] = {}
result["domain"] = [("id", "in", self.qualified_demand_mrp_move_ids.ids)]
return result
def action_view_past_adu_direct_demand(self):
horizon = self._get_horizon_adu_past_demand()
date_from, date_to = self._get_dates_adu_past_demand(horizon)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
if self.adu_calculation_method.source_past == "actual":
domain = self._past_moves_domain(date_from, date_to, locations)
moves = self.env["stock.move"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"stock.stock_move_action"
)
result["context"] = {}
result["domain"] = [("id", "in", moves.ids)]
else:
domain = self._demand_estimate_domain(locations, date_from, date_to)
estimates = self.env["stock.demand.estimate"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"stock_demand_estimate.stock_demand_estimate_action"
)
result["context"] = {}
result["domain"] = [("id", "in", estimates.ids)]
return result
def action_view_past_adu_indirect_demand(self):
horizon = self._get_horizon_adu_past_demand()
date_from, date_to = self._get_dates_adu_past_demand(horizon)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
domain = self._past_mrp_move_domain(date_from, date_to, locations)
mrp_moves = self.env["mrp.move"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"mrp_multi_level.mrp_move_action"
)
result["context"] = {}
result["domain"] = [("id", "in", mrp_moves.ids)]
return result
def action_view_future_adu_direct_demand(self):
horizon = self._get_horizon_adu_future_demand()
date_from, date_to = self._get_dates_adu_future_demand(horizon)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
if self.adu_calculation_method.source_future == "actual":
domain = self._future_moves_domain(date_from, date_to, locations)
moves = self.env["stock.move"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"stock.stock_move_action"
)
result["context"] = {}
result["domain"] = [("id", "in", moves.ids)]
else:
domain = self._demand_estimate_domain(locations, date_from, date_to)
estimates = self.env["stock.demand.estimate"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"stock_demand_estimate.stock_demand_estimate_action"
)
result["context"] = {}
result["domain"] = [("id", "in", estimates.ids)]
return result
def action_view_future_adu_indirect_demand(self):
horizon = self._get_horizon_adu_future_demand()
date_from, date_to = self._get_dates_adu_future_demand(horizon)
locations = self.env["stock.location"].search(
[("id", "child_of", [self.location_id.id])]
)
domain = self._future_mrp_move_domain(date_from, date_to, locations)
mrp_moves = self.env["mrp.move"].search(domain)
result = self.env["ir.actions.actions"]._for_xml_id(
"mrp_multi_level.mrp_move_action"
)
result["context"] = {}
result["domain"] = [("id", "in", mrp_moves.ids)]
return result
def action_request_procurement(self):
action = self.env["ir.actions.actions"]._for_xml_id(
"ddmrp.act_make_procurement_from_buffer"
)
return action
@api.model
def cron_ddmrp_adu(self, automatic=False):
"""calculate ADU for each DDMRP buffer. Called by cronjob."""
auto_commit = not getattr(threading.current_thread(), "testing", False)
_logger.info("Start cron_ddmrp_adu.")
buffer_ids = self.search([]).ids
i = 0
j = len(buffer_ids)
for buffer_chunk_ids in split_every(self.CRON_DDMRP_CHUNKS, buffer_ids):
for b in self.browse(buffer_chunk_ids).exists():
try:
i += 1
_logger.debug("ddmrp cron_adu: {}. ({}/{})".format(b.name, i, j))
if automatic:
with self.env.cr.savepoint():
b._calc_adu()
else:
b._calc_adu()
except Exception:
_logger.exception("Fail to compute ADU for buffer %s", b.name)
if not automatic:
raise
if auto_commit:
self._cr.commit() # pylint: disable=E8102
_logger.info("End cron_ddmrp_adu.")
return True
def refresh_buffer(self):
self.ensure_one()
self.cron_actions(only_nfp=False)
return True
def cron_actions(self, only_nfp=False):
"""This method is meant to be inherited by other modules in order to
enhance extensibility."""
self.ensure_one()
self.invalidate_recordset(
fnames=[
"product_location_qty_available_not_res",
"dlt",
"distributed_source_location_qty",
"qualified_demand",
],
)
self._calc_product_available_qty()
if not only_nfp or only_nfp == "out":
self._calc_qualified_demand()
if not only_nfp or only_nfp == "in":
self._calc_incoming_dlt_qty()
self._calc_net_flow_position()
self._calc_distributed_source_location()
self._calc_planning_priority()
self._calc_execution_priority()
self.mrp_production_ids._calc_execution_priority()
self.mapped("purchase_line_ids")._calc_execution_priority()
if not only_nfp:
# re-compoute red to force in cascade the recalculation of zones.
self._compute_red_zone()
self.do_auto_procure()
return True
@api.model
def cron_ddmrp(self, automatic=False):
"""Calculate key DDMRP parameters for each buffer.
Called by cronjob."""
auto_commit = not getattr(threading.current_thread(), "testing", False)
_logger.info("Start cron_ddmrp.")
buffer_ids = self.search([]).ids
i = 0
j = len(buffer_ids)
for buffer_chunk_ids in split_every(self.CRON_DDMRP_CHUNKS, buffer_ids):
for b in self.browse(buffer_chunk_ids).exists():
i += 1
_logger.debug("ddmrp cron: {}. ({}/{})".format(b.name, i, j))
try:
if automatic:
with self.env.cr.savepoint():
b.cron_actions()
else:
b.cron_actions()
except Exception:
_logger.exception("Fail updating buffer %s", b.name)
if not automatic:
raise
if auto_commit:
self._cr.commit() # pylint: disable=E8102
_logger.info("End cron_ddmrp.")
return True
def _values_source_location_from_route(self):
return {"warehouse_id": self.warehouse_id}
def _source_location_from_route(self, procure_location=None):
"""Return the replenishment source location for distributed buffers"""
current_location = procure_location or self.location_id
rule_values = self._values_source_location_from_route()
while current_location:
rule = self.env["procurement.group"]._get_rule(
self.product_id, current_location, rule_values
)
if rule.procure_method == "make_to_stock":
return rule.location_src_id
elif rule.procure_method == "make_to_order":
# If resupply from another warehouse, this rule can't be retrieved by
# method _get_rule, because that we try to get this rule bases on previous rule
pull_rule = self.env["stock.rule"].search(
[
("action", "in", ("pull", "pull_push")),
("route_id", "=", rule.route_id.id),
("location_dest_id", "=", rule.location_src_id.id),
]
)
if pull_rule:
if pull_rule.procure_method in ("make_to_stock", "mts_else_mto"):
return pull_rule.location_src_id
elif pull_rule.procure_method == "make_to_order":
current_location = pull_rule.location_src_id
rule_values.update(
{
"warehouse_id": pull_rule.location_src_id.warehouse_id,
}
)
continue
current_location = rule.location_src_id
def action_dummy(self):
# no action, used to show an image in the tree view
return