mirror of
https://github.com/bringout/oca-technical.git
synced 2026-04-20 02:32:07 +02:00
2127 lines
78 KiB
Python
2127 lines
78 KiB
Python
# Copyright 2019-20 ForgeFlow S.L. (http://www.forgeflow.com)
|
|
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl.html).
|
|
|
|
|
|
import json
|
|
import logging
|
|
import operator as py_operator
|
|
import threading
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from math import pi
|
|
|
|
from odoo import _, api, exceptions, fields, models
|
|
from odoo.exceptions import ValidationError
|
|
from odoo.tools import float_compare, float_round
|
|
from odoo.tools.misc import split_every
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
try:
|
|
from bokeh.embed import components
|
|
from bokeh.models import (
|
|
ColumnDataSource,
|
|
DatetimeTickFormatter,
|
|
HoverTool,
|
|
LabelSet,
|
|
Legend,
|
|
)
|
|
from bokeh.plotting import figure
|
|
from bokeh.util.serialization import convert_datetime_type
|
|
except (ImportError, IOError) as err:
|
|
_logger.debug(err)
|
|
|
|
|
|
OPERATORS = {
|
|
"<": py_operator.lt,
|
|
">": py_operator.gt,
|
|
"<=": py_operator.le,
|
|
">=": py_operator.ge,
|
|
"==": py_operator.eq,
|
|
"=": py_operator.eq,
|
|
"!=": py_operator.ne,
|
|
}
|
|
|
|
|
|
_PRIORITY_LEVEL = [("1_red", "Red"), ("2_yellow", "Yellow"), ("3_green", "Green")]
|
|
|
|
DDMRP_COLOR = {
|
|
"0_dark_red": "#8B0000",
|
|
"1_red": "#ff0000",
|
|
"2_yellow": "#ffff00",
|
|
"3_green": "#33cc33",
|
|
}
|
|
|
|
|
|
class StockBuffer(models.Model):
|
|
_name = "stock.buffer"
|
|
_description = "Stock Buffer"
|
|
_order = "planning_priority_level asc, net_flow_position_percent asc"
|
|
|
|
CRON_DDMRP_CHUNKS = 50
|
|
|
|
@api.model
|
|
def default_get(self, fields):
|
|
res = super().default_get(fields)
|
|
warehouse = None
|
|
if "warehouse_id" not in res and res.get("company_id"):
|
|
warehouse = self.env["stock.warehouse"].search(
|
|
[("company_id", "=", res["company_id"])], limit=1
|
|
)
|
|
if warehouse:
|
|
res["warehouse_id"] = warehouse.id
|
|
res["location_id"] = warehouse.lot_stock_id.id
|
|
return res
|
|
|
|
name = fields.Char(
|
|
copy=False,
|
|
required=True,
|
|
default=lambda self: self.env["ir.sequence"].next_by_code("stock.buffer"),
|
|
)
|
|
active = fields.Boolean(default=True)
|
|
warehouse_id = fields.Many2one(
|
|
comodel_name="stock.warehouse",
|
|
string="Warehouse",
|
|
ondelete="cascade",
|
|
required=True,
|
|
)
|
|
location_id = fields.Many2one(
|
|
comodel_name="stock.location",
|
|
string="Location",
|
|
ondelete="cascade",
|
|
required=True,
|
|
)
|
|
product_id = fields.Many2one(
|
|
comodel_name="product.product",
|
|
string="Product",
|
|
domain=[("type", "=", "product")],
|
|
ondelete="cascade",
|
|
required=True,
|
|
)
|
|
product_uom = fields.Many2one(
|
|
related="product_id.uom_id",
|
|
)
|
|
product_categ_id = fields.Many2one(
|
|
comodel_name="product.category",
|
|
string="Product Category",
|
|
related="product_id.categ_id",
|
|
store=True,
|
|
readonly=True,
|
|
)
|
|
# TODO: fix in method _compute_procure_recommended_qty.
|
|
# not sure maybe they are useful for tweak batches like in multi level mrp
|
|
procure_min_qty = fields.Float(
|
|
string="Minimum Procure Batch",
|
|
digits="Product Unit of Measure",
|
|
help="Minimum qty for a single procurement",
|
|
)
|
|
procure_max_qty = fields.Float(
|
|
string="Maximum Procure Batch",
|
|
digits="Product Unit of Measure",
|
|
help="Maximum qty for a single procurement",
|
|
)
|
|
qty_multiple = fields.Float(
|
|
digits="Product Unit of Measure",
|
|
default=1,
|
|
required=True,
|
|
help="The procurement quantity will be rounded up to this multiple. "
|
|
"If it is 0, the exact quantity will be used.",
|
|
)
|
|
group_id = fields.Many2one(
|
|
comodel_name="procurement.group",
|
|
string="Procurement Group",
|
|
copy=False,
|
|
help="Moves created through this buffer will be put in this "
|
|
"procurement group. If none is given, the moves generated by "
|
|
"stock rules will be grouped into one big picking.",
|
|
)
|
|
company_id = fields.Many2one(
|
|
comodel_name="res.company",
|
|
string="Company",
|
|
required=True,
|
|
default=lambda self: self.env.company,
|
|
)
|
|
# TODO: rename to manual LT ??
|
|
lead_days = fields.Integer(
|
|
"Lead Time (Distributed)",
|
|
default=1,
|
|
help="Lead time for distributed products.",
|
|
)
|
|
|
|
_sql_constraints = [
|
|
(
|
|
"qty_multiple_check",
|
|
"CHECK( qty_multiple >= 0 )",
|
|
"Qty Multiple must be greater than or equal to zero.",
|
|
),
|
|
(
|
|
"stock_buffer_uniq",
|
|
"unique(product_id, location_id)",
|
|
"The product/location combination must be unique."
|
|
"Remember that the buffer could be archived.",
|
|
),
|
|
]
|
|
|
|
def _quantity_in_progress(self):
|
|
"""Return Quantities that are not yet in virtual stock but should
|
|
be deduced from buffers (example: purchases created from buffers)"""
|
|
res = {}.fromkeys(self.ids, 0.0)
|
|
for buffer in self:
|
|
polines = buffer._get_rfq_dlt(dlt_interval=None)
|
|
for line in polines:
|
|
res[buffer.id] += line.product_uom._compute_quantity(
|
|
line.product_qty, buffer.product_uom, round=False
|
|
)
|
|
return res
|
|
|
|
def action_view_purchase(self):
|
|
action = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
|
|
# Remove the context since the action basically display RFQ and not PO.
|
|
action["context"] = {}
|
|
order_line_ids = self.env["purchase.order.line"].search(
|
|
[("buffer_ids", "in", self.ids)]
|
|
)
|
|
purchase_ids = order_line_ids.mapped("order_id")
|
|
action["domain"] = [("id", "in", purchase_ids.ids)]
|
|
return action
|
|
|
|
def action_view_yearly_consumption(self):
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"ddmrp.stock_move_year_consumption_action"
|
|
)
|
|
locations = (
|
|
self.env["stock.location"]
|
|
.with_context(active_test=False)
|
|
.search([("id", "child_of", self.location_id.ids)])
|
|
)
|
|
date_to = fields.Date.today()
|
|
# We take last five years, even though they will be initially
|
|
# filtered in the action to show only last year.
|
|
date_from = date_to - timedelta(days=5 * 365)
|
|
action["domain"] = self._past_moves_domain(date_from, date_to, locations)
|
|
return action
|
|
|
|
def _demand_estimate_domain(self, locations, date_from=False, date_to=False):
|
|
self.ensure_one()
|
|
domain = [
|
|
("location_id", "in", locations.ids),
|
|
("product_id", "=", self.product_id.id),
|
|
]
|
|
if date_to:
|
|
domain += [("date_from", "<=", date_to)]
|
|
if date_from:
|
|
domain += [("date_to", ">=", date_from)]
|
|
return domain
|
|
|
|
def action_view_stock_demand_estimates(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"stock_demand_estimate.stock_demand_estimate_action"
|
|
)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
domain = self._demand_estimate_domain(locations)
|
|
recs = self.env["stock.demand.estimate"].search(domain)
|
|
result["domain"] = [("id", "in", recs.ids)]
|
|
return result
|
|
|
|
def action_view_bom(self):
|
|
action = self.product_id.action_view_bom()
|
|
boms = self._get_manufactured_bom(limit=100)
|
|
action["domain"] = [("id", "in", boms.ids)]
|
|
action["context"] = {
|
|
"location_id": self.location_id.id,
|
|
"warehouse_id": self.location_id.warehouse_id.id,
|
|
}
|
|
return action
|
|
|
|
@api.constrains("product_id")
|
|
def _check_product_uom(self):
|
|
if any(
|
|
buffer.product_id.uom_id.category_id != buffer.product_uom.category_id
|
|
for buffer in self
|
|
):
|
|
raise ValidationError(
|
|
_(
|
|
"You have to select a product unit of measure that is in"
|
|
"the same category than the default unit of"
|
|
"measure of the product"
|
|
)
|
|
)
|
|
|
|
@api.onchange("warehouse_id")
|
|
def onchange_warehouse_id(self):
|
|
if self.warehouse_id:
|
|
self.location_id = self.warehouse_id.lot_stock_id.id
|
|
|
|
@api.onchange("product_id")
|
|
def onchange_product_id(self):
|
|
if self.product_id:
|
|
self.product_uom = self.product_id.uom_id.id
|
|
return {
|
|
"domain": {
|
|
"product_uom": [
|
|
("category_id", "=", self.product_id.uom_id.category_id.id)
|
|
]
|
|
}
|
|
}
|
|
return {"domain": {"product_uom": []}}
|
|
|
|
def _prepare_procurement_values(
|
|
self,
|
|
product_qty,
|
|
date=False,
|
|
group=False,
|
|
):
|
|
"""Prepare specific key for moves or other components that will be
|
|
created from a stock rule comming from a buffer. This method could
|
|
be override in order to add other custom key that could
|
|
be used in move/po creation.
|
|
"""
|
|
return {
|
|
"date_planned": date or self._get_date_planned(),
|
|
"warehouse_id": self.warehouse_id,
|
|
"buffer_id": self,
|
|
"company_id": self.company_id,
|
|
"group_id": group or self.group_id,
|
|
}
|
|
|
|
# MANUAL PROCUREMENT AND UOM
|
|
|
|
def _get_date_planned(self, force_lt=None):
|
|
self.ensure_one()
|
|
profile = self.buffer_profile_id
|
|
dlt = int(self.dlt)
|
|
if force_lt and isinstance(force_lt, (int, float)):
|
|
dlt = force_lt
|
|
if profile.item_type == "distributed":
|
|
max_proc_time = profile.distributed_reschedule_max_proc_time
|
|
else:
|
|
max_proc_time = 0
|
|
# For purchased items we always consider calendar days,
|
|
# not work days.
|
|
if profile.item_type == "purchased":
|
|
dt_planned = fields.datetime.today() + timedelta(days=dlt)
|
|
else:
|
|
if self.warehouse_id.calendar_id:
|
|
dt_planned = self.warehouse_id.wh_plan_days(fields.datetime.now(), dlt)
|
|
if max_proc_time:
|
|
calendar = self.warehouse_id.calendar_id
|
|
# We found the day with "wh_plan_day", now determine
|
|
# the first available hour in the day (wh_plan_day returns
|
|
# the stop hour), and add the procurement time.
|
|
dt_planned = calendar.plan_hours(
|
|
# expect hours
|
|
max_proc_time / 60,
|
|
# start from the first working hours available
|
|
dt_planned.replace(hour=0, minute=0, second=0),
|
|
)
|
|
|
|
else:
|
|
dt_planned = (
|
|
fields.datetime.now()
|
|
+ timedelta(days=dlt)
|
|
+ timedelta(minutes=max_proc_time)
|
|
)
|
|
return dt_planned
|
|
|
|
procure_recommended_qty = fields.Float(
|
|
string="Procure Recommendation",
|
|
compute="_compute_procure_recommended_qty",
|
|
store=True,
|
|
)
|
|
procure_uom_id = fields.Many2one(
|
|
comodel_name="uom.uom",
|
|
string="Procurement UoM",
|
|
compute="_compute_procure_uom_id",
|
|
readonly=False,
|
|
store=True,
|
|
)
|
|
|
|
@api.constrains("product_id", "procure_uom_id")
|
|
def _check_procure_uom(self):
|
|
if any(
|
|
buffer.product_uom
|
|
and buffer.procure_uom_id
|
|
and buffer.product_uom.category_id != buffer.procure_uom_id.category_id
|
|
for buffer in self
|
|
):
|
|
raise ValidationError(
|
|
_(
|
|
"Error: The product default Unit of Measure and the "
|
|
"procurement Unit of Measure must be in the same category."
|
|
)
|
|
)
|
|
|
|
# STOCK INFORMATION:
|
|
|
|
product_location_qty_available_not_res = fields.Float(
|
|
string="Quantity On Hand (Unreserved)",
|
|
help="Quantity available in this stock buffer, this is the total "
|
|
"quantity on hand minus the outgoing reservations.",
|
|
readonly=True,
|
|
)
|
|
|
|
def _get_outgoing_reservation_qty(self):
|
|
"""Return the qty reserved in operations that move products outside
|
|
of the buffer in the UoM of the product."""
|
|
domain = [
|
|
("product_id", "=", self.product_id.id),
|
|
("state", "in", ("partially_available", "assigned")),
|
|
]
|
|
lines = self.env["stock.move.line"].search(domain)
|
|
lines = lines.filtered(
|
|
lambda line: line.location_id.is_sublocation_of(self.location_id)
|
|
and not line.location_dest_id.is_sublocation_of(self.location_id)
|
|
)
|
|
return sum(lines.mapped("reserved_qty"))
|
|
|
|
def _update_quantities_dict(self, product):
|
|
self.ensure_one()
|
|
reserved_qty = self._get_outgoing_reservation_qty()
|
|
self.update(
|
|
{
|
|
"product_location_qty_available_not_res": product["qty_available"]
|
|
- reserved_qty,
|
|
}
|
|
)
|
|
|
|
def _calc_product_available_qty(self):
|
|
operation_by_location = defaultdict(lambda: self.browse())
|
|
for rec in self:
|
|
operation_by_location[rec.location_id] |= rec
|
|
for location_id, buffer_in_location in operation_by_location.items():
|
|
products = (
|
|
buffer_in_location.mapped("product_id")
|
|
.with_context(location=location_id.id)
|
|
._compute_quantities_dict(
|
|
lot_id=self.env.context.get("lot_id"),
|
|
owner_id=self.env.context.get("owner_id"),
|
|
package_id=self.env.context.get("package_id"),
|
|
)
|
|
)
|
|
for buffer in buffer_in_location:
|
|
product = products[buffer.product_id.id]
|
|
buffer._update_quantities_dict(product)
|
|
|
|
# PURCHASES LINK:
|
|
|
|
purchase_line_ids = fields.Many2many(
|
|
comodel_name="purchase.order.line",
|
|
string="Purchase Order Lines",
|
|
copy=False,
|
|
readonly=True,
|
|
)
|
|
|
|
# MRP LINK:
|
|
|
|
def action_view_mrp_productions(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("mrp.mrp_production_action")
|
|
result["context"] = {}
|
|
mrp_production_ids = self.env["mrp.production"].search(
|
|
[("buffer_id", "=", self.id)]
|
|
)
|
|
result["domain"] = [("id", "in", mrp_production_ids.ids)]
|
|
return result
|
|
|
|
product_type = fields.Selection(related="product_id.type", readonly=True)
|
|
used_in_bom_count = fields.Integer(related="product_id.used_in_bom_count")
|
|
|
|
def action_used_in_bom(self):
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id("mrp.mrp_bom_form_action")
|
|
action["domain"] = [("bom_line_ids.product_id", "=", self.product_id.id)]
|
|
return action
|
|
|
|
# DDMRP SPECIFIC:
|
|
|
|
@api.depends(
|
|
"dlt",
|
|
"extra_lead_time",
|
|
"adu",
|
|
"buffer_profile_id.lead_time_id.factor",
|
|
"red_override",
|
|
"buffer_profile_id.variability_id.factor",
|
|
"product_uom.rounding",
|
|
"lead_days",
|
|
"product_id.seller_ids.delay",
|
|
)
|
|
def _compute_red_zone(self):
|
|
for rec in self:
|
|
if rec.product_id and rec.replenish_method in ["replenish", "min_max"]:
|
|
dlt = rec.dlt + rec.extra_lead_time
|
|
rec.red_base_qty = float_round(
|
|
dlt * rec.adu * rec.buffer_profile_id.lead_time_id.factor,
|
|
precision_rounding=rec.product_uom.rounding,
|
|
)
|
|
rec.red_safety_qty = float_round(
|
|
rec.red_base_qty * rec.buffer_profile_id.variability_id.factor,
|
|
precision_rounding=rec.product_uom.rounding,
|
|
)
|
|
rec.red_zone_qty = rec.red_base_qty + rec.red_safety_qty
|
|
elif rec.product_id and rec.replenish_method == "replenish_override":
|
|
rec.red_zone_qty = rec.red_override
|
|
else:
|
|
rec.red_zone_qty = 0.0
|
|
|
|
@api.depends(
|
|
"dlt",
|
|
"extra_lead_time",
|
|
"adu",
|
|
"buffer_profile_id.lead_time_id.factor",
|
|
"order_cycle",
|
|
"minimum_order_quantity",
|
|
"product_uom.rounding",
|
|
"green_override",
|
|
"top_of_yellow",
|
|
)
|
|
def _compute_green_zone(self):
|
|
for rec in self:
|
|
if rec.product_id and rec.replenish_method in ["replenish", "min_max"]:
|
|
# Using imposed or desired minimum order cycle
|
|
rec.green_zone_oc = float_round(
|
|
rec.order_cycle * rec.adu,
|
|
precision_rounding=rec.product_uom.rounding,
|
|
)
|
|
# Using lead time factor
|
|
dlt = rec.dlt + rec.extra_lead_time
|
|
rec.green_zone_lt_factor = float_round(
|
|
dlt * rec.adu * rec.buffer_profile_id.lead_time_id.factor,
|
|
precision_rounding=rec.product_uom.rounding,
|
|
)
|
|
# Using minimum order quantity
|
|
rec.green_zone_moq = float_round(
|
|
rec.minimum_order_quantity,
|
|
precision_rounding=rec.product_uom.rounding,
|
|
)
|
|
|
|
# The biggest option of the above will be used as the green
|
|
# zone value
|
|
rec.green_zone_qty = max(
|
|
rec.green_zone_oc, rec.green_zone_lt_factor, rec.green_zone_moq
|
|
)
|
|
elif rec.product_id and rec.replenish_method == "replenish_override":
|
|
rec.green_zone_qty = rec.green_override
|
|
else:
|
|
rec.green_zone_qty = 0.0
|
|
rec.top_of_green = rec.green_zone_qty + rec.top_of_yellow
|
|
|
|
@api.depends(
|
|
"dlt",
|
|
"extra_lead_time",
|
|
"adu",
|
|
"buffer_profile_id.lead_time_id.factor",
|
|
"buffer_profile_id.variability_id.factor",
|
|
"buffer_profile_id.replenish_method",
|
|
"order_cycle",
|
|
"minimum_order_quantity",
|
|
"product_uom.rounding",
|
|
"yellow_override",
|
|
"red_zone_qty",
|
|
)
|
|
def _compute_yellow_zone(self):
|
|
for rec in self:
|
|
if rec.product_id and rec.replenish_method == "min_max":
|
|
rec.yellow_zone_qty = 0
|
|
elif rec.product_id and rec.replenish_method == "replenish":
|
|
dlt = rec.dlt + rec.extra_lead_time
|
|
rec.yellow_zone_qty = float_round(
|
|
dlt * rec.adu, precision_rounding=rec.product_uom.rounding
|
|
)
|
|
elif rec.product_id and rec.replenish_method == "replenish_override":
|
|
rec.yellow_zone_qty = rec.yellow_override
|
|
else:
|
|
rec.yellow_zone_qty = 0.0
|
|
rec.top_of_yellow = rec.yellow_zone_qty + rec.red_zone_qty
|
|
|
|
@api.depends(
|
|
"net_flow_position",
|
|
"top_of_green",
|
|
"qty_multiple",
|
|
"product_uom",
|
|
"procure_uom_id",
|
|
"product_uom.rounding",
|
|
)
|
|
def _compute_procure_recommended_qty(self):
|
|
subtract_qty = self.sudo()._quantity_in_progress()
|
|
for rec in self:
|
|
|
|
procure_recommended_qty = 0.0
|
|
# uses _origin because onchange uses a NewId with the record wrapped
|
|
if rec._origin and rec.net_flow_position < rec.top_of_yellow:
|
|
qty = (
|
|
rec.top_of_green
|
|
- rec.net_flow_position
|
|
- subtract_qty[rec._origin.id]
|
|
)
|
|
if qty >= 0.0:
|
|
procure_recommended_qty = qty
|
|
elif rec._origin:
|
|
if subtract_qty[rec._origin.id] > 0.0:
|
|
procure_recommended_qty -= subtract_qty[rec._origin.id]
|
|
|
|
adjusted_qty = 0.0
|
|
if procure_recommended_qty > 0.0:
|
|
adjusted_qty = rec._adjust_procure_qty(procure_recommended_qty)
|
|
rec.procure_recommended_qty = adjusted_qty
|
|
|
|
@api.depends("product_uom")
|
|
def _compute_procure_uom_id(self):
|
|
for rec in self:
|
|
rec.procure_uom_id = rec.product_uom.id
|
|
|
|
def _adjust_procure_qty(self, qty):
|
|
self.ensure_one()
|
|
# If there is a procure UoM we apply it before anything.
|
|
# This means max, min and multiple quantities are relative to
|
|
# the procure UoM.
|
|
if self.procure_uom_id:
|
|
rounding = self.procure_uom_id.rounding
|
|
adjusted_qty = self.product_id.uom_id._compute_quantity(
|
|
qty, self.procure_uom_id
|
|
)
|
|
else:
|
|
rounding = self.product_uom.rounding
|
|
adjusted_qty = qty
|
|
|
|
# Apply qty multiple and minimum quantity (maximum quantity
|
|
# applies on the procure wizard)
|
|
remainder = self.qty_multiple > 0 and adjusted_qty % self.qty_multiple or 0.0
|
|
multiple_tolerance = self.qty_multiple * (
|
|
self.company_id.ddmrp_qty_multiple_tolerance / 100
|
|
)
|
|
if (
|
|
float_compare(remainder, multiple_tolerance, precision_rounding=rounding)
|
|
> 0
|
|
):
|
|
adjusted_qty += self.qty_multiple - remainder
|
|
elif float_compare(remainder, 0.0, precision_rounding=rounding) > 0:
|
|
adjusted_qty -= remainder
|
|
if (
|
|
float_compare(
|
|
adjusted_qty, self.procure_min_qty, precision_rounding=rounding
|
|
)
|
|
< 0
|
|
):
|
|
adjusted_qty = self.procure_min_qty
|
|
return adjusted_qty
|
|
|
|
def _compute_ddmrp_chart_planning(self):
|
|
"""This method use the Bokeh library to create a buffer depiction."""
|
|
for rec in self:
|
|
div, script = rec.get_ddmrp_chart_planning()
|
|
json_data = json.dumps(
|
|
{
|
|
"div": div,
|
|
"script": script,
|
|
}
|
|
)
|
|
rec.ddmrp_chart = json_data
|
|
|
|
def _compute_ddmrp_chart_execution(self):
|
|
for rec in self:
|
|
div, script = rec.get_ddmrp_chart_execution()
|
|
json_data = json.dumps(
|
|
{
|
|
"div": div,
|
|
"script": script,
|
|
}
|
|
)
|
|
rec.ddmrp_chart_execution = json_data
|
|
|
|
def _get_colors_hex_map(self, pallete="planning"):
|
|
return DDMRP_COLOR
|
|
|
|
def get_ddmrp_chart_planning(self):
|
|
p = figure(frame_width=300, frame_height=400, y_axis_label="Quantity")
|
|
p.xaxis.visible = False
|
|
p.toolbar.logo = None
|
|
hex_colors = self._get_colors_hex_map(pallete="planning")
|
|
red = p.vbar(
|
|
x=1,
|
|
bottom=0,
|
|
top=self.top_of_red,
|
|
width=1,
|
|
color=hex_colors.get("1_red", "red"),
|
|
)
|
|
yellow = p.vbar(
|
|
x=1,
|
|
bottom=self.top_of_red,
|
|
top=self.top_of_yellow,
|
|
width=1,
|
|
color=hex_colors.get("2_yellow", "yellow"),
|
|
)
|
|
green = p.vbar(
|
|
x=1,
|
|
bottom=self.top_of_yellow,
|
|
top=self.top_of_green,
|
|
width=1,
|
|
color=hex_colors.get("3_green", "green"),
|
|
)
|
|
net_flow = p.line(
|
|
[0, 2], [self.net_flow_position, self.net_flow_position], line_width=2
|
|
)
|
|
on_hand = p.line(
|
|
[0, 2],
|
|
[
|
|
self.product_location_qty_available_not_res,
|
|
self.product_location_qty_available_not_res,
|
|
],
|
|
line_width=2,
|
|
line_dash="dotted",
|
|
)
|
|
legend = Legend(
|
|
items=[
|
|
("Red zone", [red]),
|
|
("Yellow zone", [yellow]),
|
|
("Green zone", [green]),
|
|
("Net Flow Position", [net_flow]),
|
|
("On-Hand Position (Unreserved)", [on_hand]),
|
|
],
|
|
)
|
|
labels_source_data = {
|
|
"height": [
|
|
self.net_flow_position,
|
|
self.product_location_qty_available_not_res,
|
|
self.top_of_red,
|
|
self.top_of_yellow,
|
|
self.top_of_green,
|
|
],
|
|
"weight": [0.25, 1.75, 1, 1, 1],
|
|
"names": [
|
|
str(self.net_flow_position),
|
|
str(self.product_location_qty_available_not_res),
|
|
str(self.top_of_red),
|
|
str(self.top_of_yellow),
|
|
str(self.top_of_green),
|
|
],
|
|
}
|
|
source = ColumnDataSource(data=labels_source_data)
|
|
labels = LabelSet(
|
|
x="weight",
|
|
y="height",
|
|
text="names",
|
|
y_offset=1,
|
|
text_font_size="8pt",
|
|
source=source,
|
|
text_align="center",
|
|
)
|
|
p.add_layout(labels)
|
|
p.add_layout(legend, "below")
|
|
|
|
script, div = components(p, wrap_script=False)
|
|
return div, script
|
|
|
|
def get_ddmrp_chart_execution(self):
|
|
p = figure(frame_width=300, frame_height=400, y_axis_label="Quantity")
|
|
p.xaxis.visible = False
|
|
p.toolbar.logo = None
|
|
tor_exec = float_round(
|
|
self.top_of_red / 2,
|
|
precision_rounding=self.product_uom.rounding,
|
|
)
|
|
toy_exec = self.top_of_red
|
|
tog_exec = float_round(
|
|
self.top_of_red + self.green_zone_qty,
|
|
precision_rounding=self.product_uom.rounding,
|
|
)
|
|
tor2_exec = self.top_of_green
|
|
toy2_exec = (tor2_exec + tog_exec) / 2
|
|
hex_colors = self._get_colors_hex_map(pallete="execution")
|
|
red = p.vbar(
|
|
x=1,
|
|
bottom=0,
|
|
top=tor_exec,
|
|
width=1,
|
|
color=hex_colors.get("1_red", "red"),
|
|
)
|
|
yellow = p.vbar(
|
|
x=1,
|
|
bottom=tor_exec,
|
|
top=toy_exec,
|
|
width=1,
|
|
color=hex_colors.get("2_yellow", "yellow"),
|
|
)
|
|
green = p.vbar(
|
|
x=1,
|
|
bottom=toy_exec,
|
|
top=tog_exec,
|
|
width=1,
|
|
color=hex_colors.get("3_green", "green"),
|
|
)
|
|
yellow_2 = p.vbar(
|
|
x=1,
|
|
bottom=tog_exec,
|
|
top=toy2_exec,
|
|
width=1,
|
|
color=hex_colors.get("2_yellow", "yellow"),
|
|
)
|
|
red_2 = p.vbar(
|
|
x=1,
|
|
bottom=toy2_exec,
|
|
top=tor2_exec,
|
|
width=1,
|
|
color=hex_colors.get("1_red", "red"),
|
|
)
|
|
on_hand = p.line(
|
|
[0, 2],
|
|
[
|
|
self.product_location_qty_available_not_res,
|
|
self.product_location_qty_available_not_res,
|
|
],
|
|
line_width=2,
|
|
line_dash="dotted",
|
|
)
|
|
legend = Legend(
|
|
items=[
|
|
("Red zone (Execution)", [red, red_2]),
|
|
("Yellow zone (Execution)", [yellow, yellow_2]),
|
|
("Green zone (Execution)", [green]),
|
|
("On-Hand Position (Unreserved)", [on_hand]),
|
|
]
|
|
)
|
|
labels_source_data = {
|
|
"height": [
|
|
self.product_location_qty_available_not_res,
|
|
tor_exec,
|
|
toy_exec,
|
|
tog_exec,
|
|
toy2_exec,
|
|
],
|
|
"weight": [0.25, 1, 1, 1, 1],
|
|
"names": [
|
|
str(self.product_location_qty_available_not_res),
|
|
str(tor_exec),
|
|
str(toy_exec),
|
|
str(tog_exec),
|
|
str(toy2_exec),
|
|
],
|
|
}
|
|
source = ColumnDataSource(data=labels_source_data)
|
|
labels = LabelSet(
|
|
x="weight",
|
|
y="height",
|
|
text="names",
|
|
y_offset=1,
|
|
text_font_size="8pt",
|
|
source=source,
|
|
text_align="center",
|
|
)
|
|
p.add_layout(labels)
|
|
p.add_layout(legend, "below")
|
|
|
|
script, div = components(p, wrap_script=False)
|
|
return div, script
|
|
|
|
def _compute_ddmrp_demand_supply_chart(self):
|
|
for rec in self:
|
|
if not rec.buffer_profile_id:
|
|
# Not a buffer, skip.
|
|
rec.ddmrp_demand_chart = ""
|
|
rec.ddmrp_supply_chart = ""
|
|
continue
|
|
|
|
# Prepare data:
|
|
demand_data = rec._get_demand_by_days(rec.qualified_demand_stock_move_ids)
|
|
mrp_data = rec._get_qualified_mrp_moves(rec.qualified_demand_mrp_move_ids)
|
|
supply_data = rec._get_incoming_by_days()
|
|
width = timedelta(days=0.4)
|
|
date_format = (
|
|
self.env["res.lang"]._lang_get(self.env.lang or "en_US").date_format
|
|
)
|
|
|
|
# Plot demand data:
|
|
if demand_data or mrp_data:
|
|
x_demand = list(convert_datetime_type(x) for x in demand_data.keys())
|
|
y_demand = list(demand_data.values())
|
|
x_mrp = list(convert_datetime_type(x) for x in mrp_data.keys())
|
|
y_mrp = list(mrp_data.values())
|
|
|
|
p = figure(
|
|
frame_width=500,
|
|
frame_height=400,
|
|
y_axis_label="Quantity",
|
|
x_axis_type="datetime",
|
|
)
|
|
p.toolbar.logo = None
|
|
p.sizing_mode = "stretch_both"
|
|
# TODO: # p.xaxis.label_text_font = "helvetica"
|
|
p.xaxis.formatter = DatetimeTickFormatter(
|
|
hours=date_format,
|
|
days=date_format,
|
|
months=date_format,
|
|
years=date_format,
|
|
)
|
|
p.xaxis.major_label_orientation = pi / 4
|
|
|
|
if demand_data:
|
|
p.vbar(
|
|
x=x_demand,
|
|
width=width,
|
|
bottom=0,
|
|
top=y_demand,
|
|
color="firebrick",
|
|
)
|
|
if mrp_data:
|
|
p.vbar(
|
|
x=x_mrp, width=width, bottom=0, top=y_mrp, color="lightsalmon"
|
|
)
|
|
p.line(
|
|
[
|
|
datetime.today() - timedelta(days=1),
|
|
datetime.today() + timedelta(days=rec.order_spike_horizon),
|
|
],
|
|
[rec.order_spike_threshold, rec.order_spike_threshold],
|
|
line_width=2,
|
|
line_dash="dashed",
|
|
)
|
|
|
|
unit = rec.product_uom.name
|
|
hover = HoverTool(
|
|
tooltips=[("qty", "$y %s" % unit)], point_policy="follow_mouse"
|
|
)
|
|
p.add_tools(hover)
|
|
|
|
script, div = components(p, wrap_script=False)
|
|
json_data = json.dumps(
|
|
{
|
|
"div": div,
|
|
"script": script,
|
|
}
|
|
)
|
|
rec.ddmrp_demand_chart = json_data
|
|
else:
|
|
rec.ddmrp_demand_chart = json.dumps(
|
|
{
|
|
"div": _("No demand detected."),
|
|
"script": "",
|
|
}
|
|
)
|
|
|
|
# Plot supply data:
|
|
if supply_data:
|
|
x_supply = list(convert_datetime_type(x) for x in supply_data.keys())
|
|
y_supply = list(supply_data.values())
|
|
|
|
p = figure(
|
|
frame_width=500,
|
|
frame_height=400,
|
|
y_axis_label="Quantity",
|
|
x_axis_type="datetime",
|
|
)
|
|
p.toolbar.logo = None
|
|
p.sizing_mode = "stretch_both"
|
|
p.xaxis.formatter = DatetimeTickFormatter(
|
|
hours=date_format,
|
|
days=date_format,
|
|
months=date_format,
|
|
years=date_format,
|
|
)
|
|
p.xaxis.major_label_orientation = pi / 4
|
|
|
|
# White line to have similar proportion to demand chart.
|
|
p.line(
|
|
[
|
|
datetime.today() - timedelta(days=1),
|
|
datetime.today() + timedelta(days=rec.order_spike_horizon),
|
|
],
|
|
[rec.order_spike_threshold, rec.order_spike_threshold],
|
|
line_width=2,
|
|
line_dash="dashed",
|
|
color="white",
|
|
)
|
|
|
|
p.vbar(x=x_supply, width=width, bottom=0, top=y_supply, color="grey")
|
|
|
|
unit = rec.product_uom.name
|
|
hover = HoverTool(
|
|
tooltips=[("qty", "$y %s" % unit)], point_policy="follow_mouse"
|
|
)
|
|
p.add_tools(hover)
|
|
|
|
script, div = components(p, wrap_script=False)
|
|
json_data = json.dumps(
|
|
{
|
|
"div": div,
|
|
"script": script,
|
|
}
|
|
)
|
|
rec.ddmrp_supply_chart = json_data
|
|
else:
|
|
rec.ddmrp_supply_chart = json.dumps(
|
|
{
|
|
"div": _("No supply detected."),
|
|
"script": "",
|
|
}
|
|
)
|
|
|
|
@api.depends("red_zone_qty")
|
|
def _compute_order_spike_threshold(self):
|
|
for rec in self:
|
|
rec.order_spike_threshold = 0.5 * rec.red_zone_qty
|
|
|
|
def _get_manufactured_bom(self, limit=1):
|
|
return self.env["mrp.bom"].search(
|
|
[
|
|
("type", "=", "normal"),
|
|
"|",
|
|
("product_id", "=", self.product_id.id),
|
|
("product_tmpl_id", "=", self.product_id.product_tmpl_id.id),
|
|
"|",
|
|
("company_id", "=", self.company_id.id),
|
|
("company_id", "=", False),
|
|
],
|
|
limit=limit,
|
|
)
|
|
|
|
@api.depends("lead_days", "product_id.seller_ids.delay")
|
|
def _compute_dlt(self):
|
|
for rec in self:
|
|
if rec.buffer_profile_id.item_type == "manufactured":
|
|
bom = rec._get_manufactured_bom()
|
|
dlt = bom.with_context(location_id=rec.location_id.id).dlt
|
|
elif rec.buffer_profile_id.item_type == "distributed":
|
|
dlt = rec.lead_days
|
|
else:
|
|
sellers = rec._get_product_sellers()
|
|
dlt = sellers and fields.first(sellers).delay or rec.lead_days
|
|
rec.dlt = dlt
|
|
|
|
def _get_product_sellers(self):
|
|
""":returns the default sellers for a single buffer."""
|
|
self.ensure_one()
|
|
all_sellers = self.product_id.seller_ids.filtered(
|
|
lambda r: not r.company_id or r.company_id == self.company_id
|
|
)
|
|
today = fields.Date.context_today(self)
|
|
sellers = all_sellers.filtered(
|
|
lambda s: (
|
|
(s.product_id == self.product_id or not s.product_id)
|
|
and (
|
|
(s.date_start <= today if s.date_start else True)
|
|
and (s.date_end >= today if s.date_end else True)
|
|
)
|
|
)
|
|
)
|
|
if not sellers:
|
|
# fallback to all sellers
|
|
sellers = all_sellers
|
|
# When the current transaction changed the sequence, it may happen that
|
|
# the sellers' recordset is not correctly sorted by default.
|
|
sellers = sellers.sorted(key=lambda s: (s.sequence, -s.min_qty, s.price))
|
|
return sellers
|
|
|
|
@api.depends(
|
|
"buffer_profile_id",
|
|
"item_type",
|
|
"product_id.seller_ids",
|
|
"product_id.seller_ids.company_id",
|
|
"product_id.seller_ids.partner_id",
|
|
"product_id.seller_ids.product_id",
|
|
"product_id.seller_ids.sequence",
|
|
"product_id.seller_ids.min_qty",
|
|
"product_id.seller_ids.price",
|
|
)
|
|
def _compute_main_supplier(self):
|
|
for rec in self:
|
|
if rec.item_type == "purchased":
|
|
suppliers = rec._get_product_sellers()
|
|
rec.main_supplier_id = suppliers[0].partner_id if suppliers else False
|
|
else:
|
|
rec.main_supplier_id = False
|
|
|
|
@api.depends("main_supplier_id", "product_id.seller_ids")
|
|
def _compute_product_vendor_code(self):
|
|
for rec in self:
|
|
if not rec.main_supplier_id:
|
|
rec.product_vendor_code = False
|
|
continue
|
|
supplier_info = rec._get_product_sellers().filtered(
|
|
lambda r: r.partner_id == rec.main_supplier_id
|
|
and r.product_id == rec.product_id
|
|
)
|
|
rec.product_vendor_code = fields.first(supplier_info).product_code
|
|
|
|
buffer_profile_id = fields.Many2one(
|
|
comodel_name="stock.buffer.profile",
|
|
string="Buffer Profile",
|
|
required=True,
|
|
)
|
|
replenish_method = fields.Selection(
|
|
related="buffer_profile_id.replenish_method",
|
|
)
|
|
item_type = fields.Selection(
|
|
related="buffer_profile_id.item_type",
|
|
)
|
|
main_supplier_id = fields.Many2one(
|
|
comodel_name="res.partner",
|
|
string="Main Supplier",
|
|
help=(
|
|
"The main supplier is the first listed supplier defined "
|
|
"on the product that is valid for this product variant. "
|
|
"Any date restrictions are not taken into account."
|
|
),
|
|
compute="_compute_main_supplier",
|
|
store=True,
|
|
index=True,
|
|
)
|
|
green_override = fields.Float(
|
|
string="Green Zone (Override)",
|
|
)
|
|
yellow_override = fields.Float(
|
|
string="Yellow Zone (Override)",
|
|
)
|
|
red_override = fields.Float(
|
|
string="Red Zone (Override)",
|
|
)
|
|
product_vendor_code = fields.Char(
|
|
compute="_compute_product_vendor_code", string="Vendor Code"
|
|
)
|
|
dlt = fields.Float(
|
|
string="DLT (days)",
|
|
compute="_compute_dlt",
|
|
help="Decoupled Lead Time (days)",
|
|
)
|
|
adu = fields.Float(
|
|
string="ADU",
|
|
default=0.0,
|
|
digits="Average Daily Usage",
|
|
readonly=True,
|
|
help="Average Daily Usage",
|
|
)
|
|
adu_calculation_method = fields.Many2one(
|
|
comodel_name="product.adu.calculation.method",
|
|
string="ADU calculation method",
|
|
required=True,
|
|
)
|
|
adu_calculation_method_type = fields.Selection(
|
|
related="adu_calculation_method.method",
|
|
)
|
|
adu_fixed = fields.Float(
|
|
string="Fixed ADU",
|
|
default=1.0,
|
|
digits="Average Daily Usage",
|
|
)
|
|
order_cycle = fields.Float(string="Minimum Order Cycle (days)")
|
|
minimum_order_quantity = fields.Float(
|
|
digits="Product Unit of Measure",
|
|
)
|
|
red_base_qty = fields.Float(
|
|
compute="_compute_red_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
red_safety_qty = fields.Float(
|
|
compute="_compute_red_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
red_zone_qty = fields.Float(
|
|
compute="_compute_red_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
top_of_red = fields.Float(
|
|
string="Top Of Red",
|
|
related="red_zone_qty",
|
|
store=True,
|
|
)
|
|
green_zone_qty = fields.Float(
|
|
compute="_compute_green_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
green_zone_lt_factor = fields.Float(
|
|
string="Green Zone Lead Time Factor",
|
|
compute="_compute_green_zone",
|
|
store=True,
|
|
help="Green zone Lead Time Factor",
|
|
)
|
|
green_zone_moq = fields.Float(
|
|
string="Green Zone Minimum Order Quantity",
|
|
compute="_compute_green_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
help="Green zone qty option considering minimum order quantity",
|
|
)
|
|
green_zone_oc = fields.Float(
|
|
string="Green Zone Order Cycle",
|
|
compute="_compute_green_zone",
|
|
store=True,
|
|
help="Green zone qty option considering desired Order Cycle",
|
|
)
|
|
yellow_zone_qty = fields.Float(
|
|
compute="_compute_yellow_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
top_of_yellow = fields.Float(
|
|
compute="_compute_yellow_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
top_of_green = fields.Float(
|
|
compute="_compute_green_zone",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
order_spike_horizon = fields.Float()
|
|
order_spike_threshold = fields.Float(
|
|
compute="_compute_order_spike_threshold",
|
|
digits="Product Unit of Measure",
|
|
store=True,
|
|
)
|
|
qualified_demand = fields.Float(
|
|
digits="Product Unit of Measure",
|
|
readonly=True,
|
|
)
|
|
qualified_demand_stock_move_ids = fields.Many2many(
|
|
comodel_name="stock.move",
|
|
)
|
|
qualified_demand_mrp_move_ids = fields.Many2many(
|
|
comodel_name="mrp.move",
|
|
)
|
|
incoming_total_qty = fields.Float(
|
|
string="Total Incoming",
|
|
readonly=True,
|
|
)
|
|
incoming_dlt_qty = fields.Float(
|
|
string="Incoming (Within DLT)",
|
|
readonly=True,
|
|
)
|
|
incoming_outside_dlt_qty = fields.Float(
|
|
string="Incoming (Outside DLT)",
|
|
readonly=True,
|
|
)
|
|
rfq_outside_dlt_qty = fields.Float(
|
|
string="RFQ Qty (Outside DLT)",
|
|
readonly=True,
|
|
help="Request for Quotation total quantity that is planned outside of "
|
|
"the DLT horizon.",
|
|
)
|
|
rfq_inside_dlt_qty = fields.Float(
|
|
string="RFQ Qty (Inside DLT)",
|
|
readonly=True,
|
|
help="Request for Quotation total quantity that is planned inside of "
|
|
"the DLT horizon.",
|
|
)
|
|
rfq_total_qty = fields.Float(
|
|
string="RFQ Total Qty",
|
|
readonly=True,
|
|
help="Request for Quotation total quantity that is planned",
|
|
)
|
|
net_flow_position = fields.Float(
|
|
digits="Product Unit of Measure",
|
|
readonly=True,
|
|
)
|
|
net_flow_position_percent = fields.Float(
|
|
string="Net flow position (% of TOG)",
|
|
readonly=True,
|
|
)
|
|
planning_priority_level = fields.Selection(
|
|
selection=_PRIORITY_LEVEL,
|
|
readonly=True,
|
|
)
|
|
execution_priority_level = fields.Selection(
|
|
string="On-Hand Alert Level",
|
|
selection=_PRIORITY_LEVEL,
|
|
store=True,
|
|
readonly=True,
|
|
)
|
|
on_hand_percent = fields.Float(
|
|
string="On Hand/TOR (%)",
|
|
store=True,
|
|
readonly=True,
|
|
)
|
|
on_hand_target_position = fields.Float(
|
|
string="Avg On Hand Target Position",
|
|
help="It denotes what the target stock on hand is. The "
|
|
"computation is: OH Target = TOR + Green Zone / 2 ",
|
|
compute="_compute_on_hand_target_position",
|
|
)
|
|
on_hand_target_max = fields.Float(
|
|
string="Target On Hand (Max)",
|
|
help="It denotes how far you are on average from the target",
|
|
compute="_compute_on_hand_target_max",
|
|
)
|
|
on_hand_target_min = fields.Float(
|
|
related="top_of_red",
|
|
string="On Hand Target Range",
|
|
help="It denotes what the target stock on hand range is.",
|
|
)
|
|
mrp_production_ids = fields.One2many(
|
|
string="Manufacturing Orders",
|
|
comodel_name="mrp.production",
|
|
inverse_name="buffer_id",
|
|
)
|
|
ddmrp_chart = fields.Text(
|
|
string="DDMRP Chart",
|
|
compute=_compute_ddmrp_chart_planning,
|
|
)
|
|
ddmrp_chart_execution = fields.Text(
|
|
string="DDMRP Execution Chart", compute=_compute_ddmrp_chart_execution
|
|
)
|
|
show_execution_chart = fields.Boolean()
|
|
ddmrp_demand_chart = fields.Text(
|
|
string="DDMRP Demand Chart",
|
|
compute="_compute_ddmrp_demand_supply_chart",
|
|
)
|
|
ddmrp_supply_chart = fields.Text(
|
|
string="DDMRP Supply Chart",
|
|
compute="_compute_ddmrp_demand_supply_chart",
|
|
)
|
|
auto_procure = fields.Boolean(
|
|
default=False,
|
|
help="Whenever the buffer is recomputed, if this option is set, it "
|
|
"will procure automatically if needed.",
|
|
)
|
|
auto_procure_option = fields.Selection(
|
|
selection=[
|
|
("standard", "When recommended (NFP below TOY)"),
|
|
("stockout", "When in stockout"),
|
|
],
|
|
default="standard",
|
|
required=True,
|
|
)
|
|
extra_lead_time = fields.Float(
|
|
string="Extra lead time (for Sizing)",
|
|
default=0.0,
|
|
help="When defined, this lead time will be added to the decoupled "
|
|
"lead time for the computation of the zones size (it won't affect "
|
|
"planned date for procurements).\n"
|
|
"This is particularly useful in situations of infrequent but "
|
|
"periodic demand. E.g. We receive a large order every 30 days, "
|
|
"whereas the supplier takes 10 days to supply. \n"
|
|
"In this case the yellow zone must cover for the "
|
|
"entire cycle of 30 days of demand.\n"
|
|
"In situations with infrequent demand the ADU tends to be very"
|
|
" small, and every new order would be treated as a spike, when \n"
|
|
"in reality this is not an exceptional situation.",
|
|
)
|
|
distributed_source_location_id = fields.Many2one(
|
|
string="Replenishment Location",
|
|
comodel_name="stock.location",
|
|
readonly=True,
|
|
index=True,
|
|
help="Source location from where goods will be replenished. "
|
|
"Computed when buffer is refreshed from following the Stock Rules.",
|
|
)
|
|
distributed_source_location_qty = fields.Float(
|
|
string="Source Location Free Quantity (distributed)",
|
|
compute="_compute_distributed_source_location_qty",
|
|
search="_search_distributed_source_location_qty",
|
|
help="Quantity free for distributed buffer in the source location. "
|
|
"When a procurement is requested, if the option is active on the profile,"
|
|
" it will be limited to this quantity.",
|
|
)
|
|
|
|
@api.depends(
|
|
"top_of_green",
|
|
"top_of_yellow",
|
|
"top_of_red",
|
|
)
|
|
def _compute_on_hand_target_position(self):
|
|
for rec in self:
|
|
green_zone_size = rec.top_of_green - rec.top_of_yellow
|
|
rec.on_hand_target_position = rec.top_of_red + green_zone_size / 2
|
|
|
|
@api.depends(
|
|
"top_of_green",
|
|
"top_of_yellow",
|
|
"top_of_red",
|
|
)
|
|
def _compute_on_hand_target_max(self):
|
|
for rec in self:
|
|
green_zone_size = rec.top_of_green - rec.top_of_yellow
|
|
rec.on_hand_target_max = rec.top_of_red + green_zone_size
|
|
|
|
@api.depends(
|
|
"distributed_source_location_id",
|
|
"distributed_source_location_id.quant_ids.quantity",
|
|
"distributed_source_location_id.quant_ids.reserved_quantity",
|
|
)
|
|
def _compute_distributed_source_location_qty(self):
|
|
to_compute_per_location = {}
|
|
for record in self:
|
|
location = record.distributed_source_location_id
|
|
if not location:
|
|
record.distributed_source_location_qty = 0.0
|
|
continue
|
|
to_compute_per_location.setdefault(location.id, set())
|
|
to_compute_per_location[location.id].add(record.id)
|
|
|
|
# batch computation per location
|
|
for location_id, buffer_ids in to_compute_per_location.items():
|
|
buffers = self.browse(buffer_ids).with_context(location=location_id)
|
|
for buf in buffers:
|
|
buf.distributed_source_location_qty = buf.product_id.free_qty
|
|
|
|
def _search_distributed_source_location_qty(self, operator, value):
|
|
if operator not in OPERATORS:
|
|
raise exceptions.UserError(_("Unsupported operator %s") % (operator,))
|
|
buffers = self.search([("distributed_source_location_id", "!=", False)])
|
|
operator_func = OPERATORS[operator]
|
|
buffers = buffers.filtered(
|
|
lambda buf: operator_func(buf.distributed_source_location_qty, value)
|
|
)
|
|
return [("id", "in", buffers.ids)]
|
|
|
|
def _search_open_stock_moves_domain(self):
|
|
self.ensure_one()
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
(
|
|
"state",
|
|
"in",
|
|
["draft", "waiting", "confirmed", "partially_available", "assigned"],
|
|
),
|
|
]
|
|
|
|
@api.model
|
|
def _stock_move_tree_view(self, lines):
|
|
views = []
|
|
tree_view = self.env.ref("stock.view_move_tree", False)
|
|
if tree_view:
|
|
views += [(tree_view.id, "tree")]
|
|
form_view = self.env.ref("stock.view_move_form", False)
|
|
if form_view:
|
|
views += [(form_view.id, "form")]
|
|
|
|
return {
|
|
"name": _("Non-completed Moves"),
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "stock.move",
|
|
"view_type": "form",
|
|
"views": views,
|
|
"view_mode": "tree,form",
|
|
"domain": str([("id", "in", lines.ids)]),
|
|
}
|
|
|
|
def _get_horizon_adu_past_demand(self):
|
|
return self.adu_calculation_method.horizon_past or 0
|
|
|
|
def _get_dates_adu_past_demand(self, horizon):
|
|
date_from = fields.Date.to_string(
|
|
self.warehouse_id.wh_plan_days(datetime.now(), -1 * horizon)
|
|
)
|
|
date_to = fields.Date.to_string(
|
|
self.warehouse_id.wh_plan_days(datetime.now(), -1)
|
|
)
|
|
return date_from, date_to
|
|
|
|
def _past_mrp_move_domain(self, date_from, date_to, locations):
|
|
self.ensure_one()
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
("mrp_date", "<=", date_to),
|
|
("mrp_date", ">=", date_from),
|
|
("mrp_area_id.location_id", "in", locations.ids),
|
|
("mrp_type", "=", "d"),
|
|
("mrp_origin", "in", ["mrp", "mo"]),
|
|
]
|
|
|
|
def _past_moves_domain(self, date_from, date_to, locations):
|
|
self.ensure_one()
|
|
domain = [
|
|
("state", "=", "done"),
|
|
("location_id", "in", locations.ids),
|
|
("location_dest_id", "not in", locations.ids),
|
|
("location_dest_id.usage", "!=", "inventory"),
|
|
("product_id", "=", self.product_id.id),
|
|
("date", ">=", date_from),
|
|
("date", "<=", date_to),
|
|
]
|
|
if not self.env.company.ddmrp_adu_calc_include_scrap:
|
|
domain.append(("location_id.scrap_location", "=", False))
|
|
return domain
|
|
|
|
def _calc_adu_past_demand(self):
|
|
self.ensure_one()
|
|
horizon = self._get_horizon_adu_past_demand()
|
|
# today is excluded to be sure that is a past day and all moves
|
|
# for that day are done (or at least the expected date is in the past).
|
|
date_from, date_to = self._get_dates_adu_past_demand(horizon)
|
|
locations = (
|
|
self.env["stock.location"]
|
|
.with_context(active_test=False)
|
|
.search([("id", "child_of", self.location_id.ids)])
|
|
)
|
|
qty = 0.0
|
|
if self.adu_calculation_method.source_past == "estimates_mrp":
|
|
domain = self._past_mrp_move_domain(date_from, date_to, locations)
|
|
for mrp_move in self.env["mrp.move"].search(domain):
|
|
qty += -mrp_move.mrp_qty
|
|
if self.adu_calculation_method.source_past in ["estimates", "estimates_mrp"]:
|
|
domain = self._demand_estimate_domain(locations, date_from, date_to)
|
|
for estimate in self.env["stock.demand.estimate"].search(domain):
|
|
qty += estimate.get_quantity_by_date_range(
|
|
fields.Date.from_string(date_from), fields.Date.from_string(date_to)
|
|
)
|
|
elif self.adu_calculation_method.source_past == "actual":
|
|
domain = self._past_moves_domain(date_from, date_to, locations)
|
|
for group in self.env["stock.move"].read_group(
|
|
domain, ["product_id", "product_qty"], ["product_id"]
|
|
):
|
|
qty += group["product_qty"]
|
|
return qty / horizon
|
|
|
|
def _get_horizon_adu_future_demand(self):
|
|
return self.adu_calculation_method.horizon_future or 1
|
|
|
|
def _get_dates_adu_future_demand(self, horizon):
|
|
date_from = fields.Datetime.now()
|
|
date_to = self.warehouse_id.wh_plan_days(date_from, horizon)
|
|
date_to = date_to.replace(
|
|
hour=date_from.hour,
|
|
minute=date_from.minute,
|
|
second=date_from.second,
|
|
)
|
|
return date_from, date_to
|
|
|
|
def _future_mrp_move_domain(self, date_from, date_to, locations):
|
|
self.ensure_one()
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
("mrp_date", "<=", date_to),
|
|
("mrp_date", ">=", date_from),
|
|
("mrp_area_id.location_id", "in", locations.ids),
|
|
("mrp_type", "=", "d"),
|
|
("mrp_origin", "in", ["mrp", "mo"]),
|
|
]
|
|
|
|
def _future_moves_domain(self, date_from, date_to, locations):
|
|
self.ensure_one()
|
|
domain = [
|
|
("state", "not in", ["done", "cancel"]),
|
|
("location_id", "in", locations.ids),
|
|
("location_dest_id", "not in", locations.ids),
|
|
("location_dest_id.usage", "!=", "inventory"),
|
|
("product_id", "=", self.product_id.id),
|
|
("date", ">=", date_from),
|
|
("date", "<=", date_to),
|
|
]
|
|
if not self.env.company.ddmrp_adu_calc_include_scrap:
|
|
domain.append(("location_id.scrap_location", "=", False))
|
|
return domain
|
|
|
|
def _calc_adu_future_demand(self):
|
|
self.ensure_one()
|
|
horizon = self._get_horizon_adu_future_demand()
|
|
date_from, date_to = self._get_dates_adu_future_demand(horizon)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
qty = 0.0
|
|
if self.adu_calculation_method.source_future == "estimates_mrp":
|
|
domain = self._future_mrp_move_domain(date_from, date_to, locations)
|
|
for mrp_move in self.env["mrp.move"].search(domain):
|
|
qty += -mrp_move.mrp_qty
|
|
if self.adu_calculation_method.source_future in ["estimates", "estimates_mrp"]:
|
|
domain = self._demand_estimate_domain(locations, date_from, date_to)
|
|
for estimate in self.env["stock.demand.estimate"].search(domain):
|
|
qty += estimate.get_quantity_by_date_range(
|
|
fields.Date.from_string(date_from), fields.Date.from_string(date_to)
|
|
)
|
|
elif self.adu_calculation_method.source_future == "actual":
|
|
domain = self._future_moves_domain(date_from, date_to, locations)
|
|
for group in self.env["stock.move"].read_group(
|
|
domain, ["product_id", "product_qty"], ["product_id"]
|
|
):
|
|
qty += group["product_qty"]
|
|
return qty / horizon
|
|
|
|
def _calc_adu_blended(self):
|
|
self.ensure_one()
|
|
past_comp = self._calc_adu_past_demand()
|
|
fp = self.adu_calculation_method.factor_past
|
|
future_comp = self._calc_adu_future_demand()
|
|
ff = self.adu_calculation_method.factor_future
|
|
return past_comp * fp + future_comp * ff
|
|
|
|
def _calc_adu(self):
|
|
for rec in self:
|
|
if rec.adu_calculation_method.method == "fixed":
|
|
rec.adu = rec.adu_fixed
|
|
elif rec.adu_calculation_method.method == "past":
|
|
rec.adu = rec._calc_adu_past_demand()
|
|
elif rec.adu_calculation_method.method == "future":
|
|
rec.adu = rec._calc_adu_future_demand()
|
|
elif rec.adu_calculation_method.method == "blended":
|
|
rec.adu = rec._calc_adu_blended()
|
|
return True
|
|
|
|
def _search_stock_moves_qualified_demand_domain(self):
|
|
self.ensure_one()
|
|
horizon = self.order_spike_horizon
|
|
date_to = self.warehouse_id.wh_plan_days(datetime.now(), horizon)
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
(
|
|
"state",
|
|
"in",
|
|
["waiting", "confirmed", "partially_available", "assigned"],
|
|
),
|
|
("date", "<=", date_to),
|
|
]
|
|
|
|
def _search_stock_moves_qualified_demand(self):
|
|
domain = self._search_stock_moves_qualified_demand_domain()
|
|
moves = self.env["stock.move"].search(domain)
|
|
moves = moves.filtered(
|
|
lambda move: move.location_id.is_sublocation_of(self.location_id)
|
|
and not move.location_dest_id.is_sublocation_of(self.location_id)
|
|
)
|
|
return moves
|
|
|
|
def _get_incoming_supply_date_limit(self):
|
|
# The safety factor allows to control the date limit
|
|
factor = self.warehouse_id.nfp_incoming_safety_factor or 1
|
|
horizon = int(self.dlt) * factor
|
|
return self._get_date_planned(force_lt=horizon)
|
|
|
|
def _search_stock_moves_incoming_domain(self, outside_dlt=False):
|
|
date_to = self._get_incoming_supply_date_limit()
|
|
date_operator = ">" if outside_dlt else "<="
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
(
|
|
"state",
|
|
"in",
|
|
["waiting", "confirmed", "partially_available", "assigned"],
|
|
),
|
|
("date", date_operator, date_to),
|
|
]
|
|
|
|
def _search_stock_moves_incoming(self, outside_dlt=False):
|
|
domain = self._search_stock_moves_incoming_domain(outside_dlt=outside_dlt)
|
|
moves = self.env["stock.move"].search(domain)
|
|
moves = moves.filtered(
|
|
lambda move: not move.location_id.is_sublocation_of(self.location_id)
|
|
and move.location_dest_id.is_sublocation_of(self.location_id)
|
|
)
|
|
return moves
|
|
|
|
def _get_incoming_by_days(self):
|
|
self.ensure_one()
|
|
moves = self._search_stock_moves_incoming()
|
|
incoming_by_days = {}
|
|
move_dates = [dt.date() for dt in moves.mapped("date")]
|
|
for move_date in move_dates:
|
|
incoming_by_days[move_date] = 0.0
|
|
for move in moves:
|
|
date = move.date.date()
|
|
incoming_by_days[date] += move.product_qty
|
|
return incoming_by_days
|
|
|
|
def _get_demand_by_days(self, moves):
|
|
self.ensure_one()
|
|
demand_by_days = {}
|
|
move_dates = [dt.date() for dt in moves.mapped("date")]
|
|
for move_date in move_dates:
|
|
demand_by_days[move_date] = 0.0
|
|
for move in moves:
|
|
date = move.date.date()
|
|
demand_by_days[
|
|
date
|
|
] += move.product_qty - move.product_uom._compute_quantity(
|
|
move.reserved_availability, move.product_id.uom_id
|
|
)
|
|
return demand_by_days
|
|
|
|
def _search_mrp_moves_qualified_demand_domain(self):
|
|
self.ensure_one()
|
|
horizon = self.order_spike_horizon
|
|
date_to = self.warehouse_id.wh_plan_days(datetime.now(), horizon)
|
|
return [
|
|
("product_id", "=", self.product_id.id),
|
|
("mrp_type", "=", "d"),
|
|
("mrp_date", "<=", date_to),
|
|
]
|
|
|
|
def _search_mrp_moves_qualified_demand(self):
|
|
domain = self._search_mrp_moves_qualified_demand_domain()
|
|
moves = self.env["mrp.move"].search(domain)
|
|
moves = moves.filtered(
|
|
lambda move: move.mrp_area_id.location_id.is_sublocation_of(
|
|
self.location_id
|
|
)
|
|
)
|
|
return moves
|
|
|
|
def _get_qualified_mrp_moves(self, moves):
|
|
self.ensure_one()
|
|
mrp_moves_by_days = {}
|
|
move_dates = [dt for dt in moves.mapped("mrp_date")]
|
|
for move_date in move_dates:
|
|
mrp_moves_by_days[move_date] = 0.0
|
|
for move in moves:
|
|
date = move.mrp_date
|
|
mrp_moves_by_days[date] += abs(move.mrp_qty)
|
|
return mrp_moves_by_days
|
|
|
|
def _calc_qualified_demand(self, current_date=False):
|
|
today = current_date or fields.date.today()
|
|
for rec in self:
|
|
qualified_demand = 0.0
|
|
moves = rec._search_stock_moves_qualified_demand()
|
|
mrp_moves = rec._search_mrp_moves_qualified_demand()
|
|
demand_by_days = rec._get_demand_by_days(moves)
|
|
mrp_moves_by_days = rec._get_qualified_mrp_moves(mrp_moves)
|
|
dates = list(set(demand_by_days.keys()) | set(mrp_moves_by_days.keys()))
|
|
for date in dates:
|
|
if (
|
|
demand_by_days.get(date, 0.0) >= rec.order_spike_threshold
|
|
or date <= today
|
|
):
|
|
qualified_demand += demand_by_days.get(date, 0.0)
|
|
else:
|
|
moves = moves.filtered(lambda x: x.date != date)
|
|
if (
|
|
mrp_moves_by_days.get(date, 0.0) >= rec.order_spike_threshold
|
|
or date <= today
|
|
):
|
|
qualified_demand += mrp_moves_by_days.get(date, 0.0)
|
|
else:
|
|
mrp_moves = mrp_moves.filtered(lambda x: x.mrp_date != date)
|
|
rec.qualified_demand = qualified_demand
|
|
rec.qualified_demand_stock_move_ids = moves
|
|
rec.qualified_demand_mrp_move_ids = mrp_moves
|
|
return True
|
|
|
|
def _calc_incoming_dlt_qty(self):
|
|
for rec in self:
|
|
moves = self._search_stock_moves_incoming()
|
|
rec.incoming_dlt_qty = sum(moves.mapped("product_qty"))
|
|
outside_dlt_moves = self._search_stock_moves_incoming(outside_dlt=True)
|
|
rec.incoming_outside_dlt_qty = sum(outside_dlt_moves.mapped("product_qty"))
|
|
if rec.item_type == "purchased":
|
|
pols_outside_dlt = rec._get_rfq_dlt(dlt_interval="outside")
|
|
rec.rfq_outside_dlt_qty = sum(pols_outside_dlt.mapped("product_qty"))
|
|
pols_inside_dlt = rec._get_rfq_dlt(dlt_interval="inside")
|
|
rec.rfq_inside_dlt_qty = sum(pols_inside_dlt.mapped("product_qty"))
|
|
rec.rfq_total_qty = rec.rfq_inside_dlt_qty + rec.rfq_outside_dlt_qty
|
|
else:
|
|
rec.rfq_outside_dlt_qty = 0.0
|
|
rec.rfq_inside_dlt_qty = 0.0
|
|
rec.rfq_total_qty = 0.0
|
|
rec.incoming_total_qty = rec.incoming_dlt_qty + rec.incoming_outside_dlt_qty
|
|
return True
|
|
|
|
def _calc_net_flow_position(self):
|
|
for rec in self:
|
|
rec.net_flow_position = (
|
|
rec.product_location_qty_available_not_res
|
|
+ rec.incoming_dlt_qty
|
|
- rec.qualified_demand
|
|
)
|
|
usage = 0.0
|
|
if rec.top_of_green:
|
|
usage = round((rec.net_flow_position / rec.top_of_green * 100), 2)
|
|
rec.net_flow_position_percent = usage
|
|
return True
|
|
|
|
def _calc_distributed_source_location(self):
|
|
"""Compute source location used for replenishment of distributed buffer
|
|
|
|
It follows the rules of the default route until it finds a "Take from
|
|
stock" rule. The source location depends on many factors (route on
|
|
warehouse, product, category, ...), that's why it is updated only
|
|
on refresh of the buffer.
|
|
"""
|
|
for record in self:
|
|
if record.item_type != "distributed":
|
|
record.distributed_source_location_id = self.env[
|
|
"stock.location"
|
|
].browse()
|
|
continue
|
|
|
|
source_location = record._source_location_from_route()
|
|
record.distributed_source_location_id = source_location
|
|
|
|
def _calc_planning_priority(self):
|
|
for rec in self:
|
|
if rec.net_flow_position >= rec.top_of_yellow:
|
|
rec.planning_priority_level = "3_green"
|
|
elif rec.net_flow_position >= rec.top_of_red:
|
|
rec.planning_priority_level = "2_yellow"
|
|
else:
|
|
rec.planning_priority_level = "1_red"
|
|
|
|
def _calc_execution_priority(self):
|
|
for rec in self:
|
|
if rec.product_location_qty_available_not_res >= rec.top_of_red:
|
|
rec.execution_priority_level = "3_green"
|
|
elif rec.product_location_qty_available_not_res >= rec.top_of_red * 0.5:
|
|
rec.execution_priority_level = "2_yellow"
|
|
else:
|
|
rec.execution_priority_level = "1_red"
|
|
if rec.top_of_red:
|
|
rec.on_hand_percent = round(
|
|
(
|
|
(rec.product_location_qty_available_not_res / rec.top_of_red)
|
|
* 100
|
|
),
|
|
2,
|
|
)
|
|
else:
|
|
rec.on_hand_percent = 0.0
|
|
|
|
@api.model_create_multi
|
|
def create(self, vals_list):
|
|
records = super().create(vals_list)
|
|
if not self.env.context.get("skip_adu_calculation", False):
|
|
records._calc_adu()
|
|
records._calc_product_available_qty()
|
|
records._calc_distributed_source_location()
|
|
return records
|
|
|
|
def write(self, vals):
|
|
res = super().write(vals)
|
|
if any(f in vals for f in ("adu_fixed", "adu_calculation_method")):
|
|
self._calc_adu()
|
|
return res
|
|
|
|
def _procure_qty_to_order(self):
|
|
qty_to_order = self.procure_recommended_qty
|
|
rounding = self.procure_uom_id.rounding or self.product_uom.rounding
|
|
qty_in_progress = self._quantity_in_progress()[self._origin.id]
|
|
if (
|
|
self.item_type == "distributed"
|
|
and self.buffer_profile_id.replenish_distributed_limit_to_free_qty
|
|
):
|
|
# If we don't procure more than what we have in stock, we prevent
|
|
# backorders on the replenishment
|
|
if (
|
|
float_compare(
|
|
self.distributed_source_location_qty,
|
|
self.procure_min_qty,
|
|
precision_rounding=rounding,
|
|
)
|
|
< 0
|
|
):
|
|
# the free qty is below the minimum we want to move, do not
|
|
# move anything
|
|
return 0
|
|
else:
|
|
# move only what we have in stock
|
|
return min(qty_to_order, self.distributed_source_location_qty)
|
|
elif (
|
|
float_compare(qty_in_progress, 0, precision_rounding=rounding) > 0
|
|
and float_compare(
|
|
qty_to_order, self.green_zone_qty, precision_rounding=rounding
|
|
)
|
|
< 0
|
|
):
|
|
# When there is qty in progress (e.g. RfQ sent), do not keep
|
|
# auto-procuring small quantities, wait for the qty to be at least GZ.
|
|
return 0
|
|
return qty_to_order
|
|
|
|
def do_auto_procure(self):
|
|
if not self.auto_procure:
|
|
return False
|
|
rounding = self.product_uom.rounding
|
|
qty_to_order = self._procure_qty_to_order()
|
|
if float_compare(qty_to_order, 0.0, precision_rounding=rounding) > 0 and (
|
|
(
|
|
self.auto_procure_option == "stockout"
|
|
and float_compare(
|
|
self.net_flow_position, 0.0, precision_rounding=rounding
|
|
)
|
|
< 0
|
|
)
|
|
or self.auto_procure_option == "standard"
|
|
):
|
|
wizard = (
|
|
self.env["make.procurement.buffer"]
|
|
.with_context(
|
|
active_model="stock.buffer", active_ids=self.ids, active_id=self.id
|
|
)
|
|
.create({})
|
|
)
|
|
wizard.make_procurement()
|
|
return True
|
|
|
|
def action_view_supply_moves(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
|
|
result["context"] = {}
|
|
moves = self._search_stock_moves_incoming() + self._search_stock_moves_incoming(
|
|
outside_dlt=True
|
|
)
|
|
result["domain"] = [("id", "in", moves.ids)]
|
|
return result
|
|
|
|
def _get_rfq_dlt(self, dlt_interval=None):
|
|
self.ensure_one()
|
|
cut_date = self._get_incoming_supply_date_limit()
|
|
if dlt_interval == "inside":
|
|
pols = self.purchase_line_ids.filtered(
|
|
lambda l: l.date_planned <= fields.Datetime.to_datetime(cut_date)
|
|
and l.state in ("draft", "sent", "to approve")
|
|
)
|
|
elif dlt_interval == "outside":
|
|
pols = self.purchase_line_ids.filtered(
|
|
lambda l: l.date_planned > fields.Datetime.to_datetime(cut_date)
|
|
and l.state in ("draft", "sent", "to approve")
|
|
)
|
|
else:
|
|
pols = self.purchase_line_ids.filtered(
|
|
lambda l: l.state in ("draft", "sent", "to approve")
|
|
)
|
|
return pols
|
|
|
|
def action_view_supply_moves_inside_dlt_window(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
|
|
moves = self._search_stock_moves_incoming()
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", moves.ids)]
|
|
return result
|
|
|
|
def action_view_supply_moves_outside_dlt_window(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
|
|
moves = self._search_stock_moves_incoming(outside_dlt=True)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", moves.ids)]
|
|
return result
|
|
|
|
def action_view_supply_rfq_inside_dlt_window(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
|
|
pols = self._get_rfq_dlt(dlt_interval="inside")
|
|
pos = pols.mapped("order_id")
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", pos.ids)]
|
|
return result
|
|
|
|
def action_view_supply_rfq_outside_dlt_window(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("purchase.purchase_rfq")
|
|
pols = self._get_rfq_dlt(dlt_interval="outside")
|
|
pos = pols.mapped("order_id")
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", pos.ids)]
|
|
return result
|
|
|
|
def action_view_qualified_demand_moves(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_action")
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", self.qualified_demand_stock_move_ids.ids)]
|
|
return result
|
|
|
|
def action_view_qualified_demand_mrp(self):
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"mrp_multi_level.mrp_move_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", self.qualified_demand_mrp_move_ids.ids)]
|
|
return result
|
|
|
|
def action_view_past_adu_direct_demand(self):
|
|
horizon = self._get_horizon_adu_past_demand()
|
|
date_from, date_to = self._get_dates_adu_past_demand(horizon)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
if self.adu_calculation_method.source_past == "actual":
|
|
domain = self._past_moves_domain(date_from, date_to, locations)
|
|
moves = self.env["stock.move"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"stock.stock_move_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", moves.ids)]
|
|
else:
|
|
domain = self._demand_estimate_domain(locations, date_from, date_to)
|
|
estimates = self.env["stock.demand.estimate"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"stock_demand_estimate.stock_demand_estimate_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", estimates.ids)]
|
|
return result
|
|
|
|
def action_view_past_adu_indirect_demand(self):
|
|
horizon = self._get_horizon_adu_past_demand()
|
|
date_from, date_to = self._get_dates_adu_past_demand(horizon)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
domain = self._past_mrp_move_domain(date_from, date_to, locations)
|
|
mrp_moves = self.env["mrp.move"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"mrp_multi_level.mrp_move_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", mrp_moves.ids)]
|
|
return result
|
|
|
|
def action_view_future_adu_direct_demand(self):
|
|
horizon = self._get_horizon_adu_future_demand()
|
|
date_from, date_to = self._get_dates_adu_future_demand(horizon)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
if self.adu_calculation_method.source_future == "actual":
|
|
domain = self._future_moves_domain(date_from, date_to, locations)
|
|
moves = self.env["stock.move"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"stock.stock_move_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", moves.ids)]
|
|
else:
|
|
domain = self._demand_estimate_domain(locations, date_from, date_to)
|
|
estimates = self.env["stock.demand.estimate"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"stock_demand_estimate.stock_demand_estimate_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", estimates.ids)]
|
|
return result
|
|
|
|
def action_view_future_adu_indirect_demand(self):
|
|
horizon = self._get_horizon_adu_future_demand()
|
|
date_from, date_to = self._get_dates_adu_future_demand(horizon)
|
|
locations = self.env["stock.location"].search(
|
|
[("id", "child_of", [self.location_id.id])]
|
|
)
|
|
domain = self._future_mrp_move_domain(date_from, date_to, locations)
|
|
mrp_moves = self.env["mrp.move"].search(domain)
|
|
result = self.env["ir.actions.actions"]._for_xml_id(
|
|
"mrp_multi_level.mrp_move_action"
|
|
)
|
|
result["context"] = {}
|
|
result["domain"] = [("id", "in", mrp_moves.ids)]
|
|
return result
|
|
|
|
def action_request_procurement(self):
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"ddmrp.act_make_procurement_from_buffer"
|
|
)
|
|
return action
|
|
|
|
@api.model
|
|
def cron_ddmrp_adu(self, automatic=False):
|
|
"""calculate ADU for each DDMRP buffer. Called by cronjob."""
|
|
auto_commit = not getattr(threading.current_thread(), "testing", False)
|
|
_logger.info("Start cron_ddmrp_adu.")
|
|
buffer_ids = self.search([]).ids
|
|
i = 0
|
|
j = len(buffer_ids)
|
|
for buffer_chunk_ids in split_every(self.CRON_DDMRP_CHUNKS, buffer_ids):
|
|
for b in self.browse(buffer_chunk_ids).exists():
|
|
try:
|
|
i += 1
|
|
_logger.debug("ddmrp cron_adu: {}. ({}/{})".format(b.name, i, j))
|
|
if automatic:
|
|
with self.env.cr.savepoint():
|
|
b._calc_adu()
|
|
else:
|
|
b._calc_adu()
|
|
except Exception:
|
|
_logger.exception("Fail to compute ADU for buffer %s", b.name)
|
|
if not automatic:
|
|
raise
|
|
if auto_commit:
|
|
self._cr.commit() # pylint: disable=E8102
|
|
_logger.info("End cron_ddmrp_adu.")
|
|
return True
|
|
|
|
def refresh_buffer(self):
|
|
self.ensure_one()
|
|
self.cron_actions(only_nfp=False)
|
|
return True
|
|
|
|
def cron_actions(self, only_nfp=False):
|
|
"""This method is meant to be inherited by other modules in order to
|
|
enhance extensibility."""
|
|
self.ensure_one()
|
|
self.invalidate_recordset(
|
|
fnames=[
|
|
"product_location_qty_available_not_res",
|
|
"dlt",
|
|
"distributed_source_location_qty",
|
|
"qualified_demand",
|
|
],
|
|
)
|
|
self._calc_product_available_qty()
|
|
if not only_nfp or only_nfp == "out":
|
|
self._calc_qualified_demand()
|
|
if not only_nfp or only_nfp == "in":
|
|
self._calc_incoming_dlt_qty()
|
|
self._calc_net_flow_position()
|
|
self._calc_distributed_source_location()
|
|
self._calc_planning_priority()
|
|
self._calc_execution_priority()
|
|
self.mrp_production_ids._calc_execution_priority()
|
|
self.mapped("purchase_line_ids")._calc_execution_priority()
|
|
if not only_nfp:
|
|
# re-compoute red to force in cascade the recalculation of zones.
|
|
self._compute_red_zone()
|
|
self.do_auto_procure()
|
|
return True
|
|
|
|
@api.model
|
|
def cron_ddmrp(self, automatic=False):
|
|
"""Calculate key DDMRP parameters for each buffer.
|
|
Called by cronjob."""
|
|
auto_commit = not getattr(threading.current_thread(), "testing", False)
|
|
_logger.info("Start cron_ddmrp.")
|
|
buffer_ids = self.search([]).ids
|
|
i = 0
|
|
j = len(buffer_ids)
|
|
for buffer_chunk_ids in split_every(self.CRON_DDMRP_CHUNKS, buffer_ids):
|
|
for b in self.browse(buffer_chunk_ids).exists():
|
|
i += 1
|
|
_logger.debug("ddmrp cron: {}. ({}/{})".format(b.name, i, j))
|
|
try:
|
|
if automatic:
|
|
with self.env.cr.savepoint():
|
|
b.cron_actions()
|
|
else:
|
|
b.cron_actions()
|
|
except Exception:
|
|
_logger.exception("Fail updating buffer %s", b.name)
|
|
if not automatic:
|
|
raise
|
|
if auto_commit:
|
|
self._cr.commit() # pylint: disable=E8102
|
|
_logger.info("End cron_ddmrp.")
|
|
return True
|
|
|
|
def _values_source_location_from_route(self):
|
|
return {"warehouse_id": self.warehouse_id}
|
|
|
|
def _source_location_from_route(self, procure_location=None):
|
|
"""Return the replenishment source location for distributed buffers"""
|
|
current_location = procure_location or self.location_id
|
|
rule_values = self._values_source_location_from_route()
|
|
while current_location:
|
|
rule = self.env["procurement.group"]._get_rule(
|
|
self.product_id, current_location, rule_values
|
|
)
|
|
if rule.procure_method == "make_to_stock":
|
|
return rule.location_src_id
|
|
elif rule.procure_method == "make_to_order":
|
|
# If resupply from another warehouse, this rule can't be retrieved by
|
|
# method _get_rule, because that we try to get this rule bases on previous rule
|
|
pull_rule = self.env["stock.rule"].search(
|
|
[
|
|
("action", "in", ("pull", "pull_push")),
|
|
("route_id", "=", rule.route_id.id),
|
|
("location_dest_id", "=", rule.location_src_id.id),
|
|
]
|
|
)
|
|
if pull_rule:
|
|
if pull_rule.procure_method in ("make_to_stock", "mts_else_mto"):
|
|
return pull_rule.location_src_id
|
|
elif pull_rule.procure_method == "make_to_order":
|
|
current_location = pull_rule.location_src_id
|
|
rule_values.update(
|
|
{
|
|
"warehouse_id": pull_rule.location_src_id.warehouse_id,
|
|
}
|
|
)
|
|
continue
|
|
current_location = rule.location_src_id
|
|
|
|
def action_dummy(self):
|
|
# no action, used to show an image in the tree view
|
|
return
|