oca-warehouse/odoo-bringout-oca-stock-logistics-barcode-stock_barcodes/stock_barcodes/wizard/stock_barcodes_read_picking.py
2025-08-29 15:43:06 +02:00

1021 lines
42 KiB
Python

# Copyright 2019 Sergio Teruel <sergio.teruel@tecnativa.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from collections import OrderedDict, defaultdict
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
from odoo.fields import first
from odoo.tools.float_utils import float_compare, float_round
from odoo.tools.safe_eval import safe_eval
_logger = logging.getLogger(__name__)
class WizStockBarcodesReadPicking(models.TransientModel):
# Transient wizard used to read barcodes against a stock picking.
# It extends the "wiz.stock.barcodes.read" wizard through ``_inherit`` while
# registering its own model name.
_name = "wiz.stock.barcodes.read.picking"
_inherit = "wiz.stock.barcodes.read"
_description = "Wizard to read barcode on picking"
@property
@api.depends("picking_mode")
def _field_candidate_ids(self):
    """Name of the candidate field matching the current picking mode.

    The name is built as ``candidate_<picking_mode>_ids``.
    """
    return "candidate_{}_ids".format(self.picking_mode)
# Picking handled by the wizard and its state.
picking_id = fields.Many2one(
comodel_name="stock.picking", string="Picking", readonly=True
)
picking_state = fields.Selection(related="picking_id.state")
picking_ids = fields.Many2many(
comodel_name="stock.picking", string="Pickings", readonly=True
)
# Candidate pickings: wiz.candidate.picking rows linked through
# wiz_barcode_id, plus the picking they point to.
candidate_picking_id = fields.Many2one(
comodel_name="stock.picking", related="candidate_picking_ids.picking_id"
)
candidate_picking_ids = fields.One2many(
comodel_name="wiz.candidate.picking",
inverse_name="wiz_barcode_id",
string="Candidate pickings",
readonly=True,
)
picking_product_qty = fields.Float(
string="Picking quantities", digits="Product Unit of Measure", readonly=True
)
picking_type_code = fields.Selection(
[("incoming", "Vendors"), ("outgoing", "Customers"), ("internal", "Internal")],
"Type of Operation",
)
# Computed by _compute_move_line_ids from the picking move lines.
move_line_ids = fields.One2many(
comodel_name="stock.move.line", compute="_compute_move_line_ids"
)
# To-do records of the wizard; todo_line_id is the one currently selected and
# todo_line_display_ids mirrors it (see _compute_todo_line_display_ids).
todo_line_ids = fields.One2many(
string="To Do Lines",
comodel_name="wiz.stock.barcodes.read.todo",
inverse_name="wiz_barcode_id",
)
todo_line_display_ids = fields.Many2many(
comodel_name="wiz.stock.barcodes.read.todo",
compute="_compute_todo_line_display_ids",
)
todo_line_id = fields.Many2one(comodel_name="wiz.stock.barcodes.read.todo")
# The value is used to build the "candidate_<mode>_ids" field name and the
# "_search_candidate_<mode>" method name elsewhere in this class.
picking_mode = fields.Selection([("picking", "Picking mode")])
pending_move_ids = fields.Many2many(
comodel_name="wiz.stock.barcodes.read.todo",
compute="_compute_pending_move_ids",
)
selected_pending_move_id = fields.Many2one(
comodel_name="wiz.stock.barcodes.read.todo"
)
# Options mirrored from the option group.
show_detailed_operations = fields.Boolean(
related="option_group_id.show_detailed_operations", default=True, store=True
)
keep_screen_values = fields.Boolean(related="option_group_id.keep_screen_values")
# Extended from stock_barcodes_read base model
total_product_uom_qty = fields.Float(compute="_compute_total_product")
total_product_qty_done = fields.Float(compute="_compute_total_product")
# Technical fields to compute locations domain based on picking location
picking_location_id = fields.Many2one(related="picking_id.location_id")
picking_location_dest_id = fields.Many2one(related="picking_id.location_dest_id")
company_id = fields.Many2one(related="picking_id.company_id")
todo_line_is_extra_line = fields.Boolean(related="todo_line_id.is_extra_line")
# String form of the group key of the todo line to select again after the
# todo records are rebuilt (written in fill_records, read in action_done).
forced_todo_key = fields.Char()
qty_available = fields.Float(compute="_compute_qty_available")
partner_id = fields.Many2one("res.partner", related="picking_id.partner_id")
enable_add_product = fields.Boolean(compute="_compute_enable_add_product")
def action_show_detailed_operations(self):
    """Toggle the ``show_detailed_operations`` flag of the wizard."""
    currently_shown = self.show_detailed_operations
    self.show_detailed_operations = not currently_shown
@api.depends("picking_state")
def _compute_enable_add_product(self):
    """Enable adding products for every picking state except ``done``."""
    for wizard in self:
        picking_is_done = wizard.picking_state == "done"
        wizard.enable_add_product = not picking_is_done
@api.depends("todo_line_id")
def _compute_todo_line_display_ids(self):
    """Technical field to display only the first record in kanban view.

    The value is computed record by record so the method also works when
    the ORM calls it on a recordset with more than one wizard.
    """
    for rec in self:
        rec.todo_line_display_ids = rec.todo_line_id
@api.depends("todo_line_ids", "picking_id.move_line_ids.qty_done")
def _compute_pending_move_ids(self):
    """Compute the todo lines exposed as pending moves.

    When the option group has ``show_pending_moves`` set, only todo lines
    in state ``pending`` having at least one stock move whose
    ``barcode_backorder_action`` is ``pending`` are kept; otherwise every
    todo line is used.

    The value is assigned record by record: reading
    ``option_group_id.show_pending_moves`` on a multi-record set would
    raise a singleton error.
    """
    for rec in self:
        if rec.option_group_id.show_pending_moves:
            rec.pending_move_ids = rec.todo_line_ids.filtered(
                lambda t: t.state == "pending"
                and any(
                    sm.barcode_backorder_action == "pending"
                    for sm in t.stock_move_ids
                )
            )
        else:
            rec.pending_move_ids = rec.todo_line_ids
@api.depends(
    "todo_line_ids", "todo_line_ids.qty_done", "picking_id.move_line_ids.qty_done"
)
def _compute_move_line_ids(self):
    """Compute the picking move lines that already have a done quantity.

    Lines are ordered by ``(write_date, create_date)`` descending. The
    value is assigned for each record of ``self`` so every wizard gets the
    lines of its own picking.
    """
    for rec in self:
        done_lines = rec.picking_id.move_line_ids.filtered("qty_done")
        rec.move_line_ids = done_lines.sorted(
            key=lambda sml: (sml.write_date, sml.create_date), reverse=True
        )
@api.depends("picking_id.move_line_ids.qty_done")
def _compute_total_product(self):
    """Compute demanded and done totals of the wizard product in the picking.

    Only non-cancelled moves of the picking whose product matches the
    product of the wizard are summed. The comparison uses the product of
    the record being computed (``rec``), not the whole recordset, so the
    totals stay correct when several wizards are computed together.
    """
    for rec in self:
        product_moves = rec.picking_id.move_ids.filtered(
            lambda mv: mv.product_id.ids == rec.product_id.ids
            and mv.state != "cancel"
        )
        rec.total_product_uom_qty = sum(product_moves.mapped("product_uom_qty"))
        rec.total_product_qty_done = sum(product_moves.mapped("quantity_done"))
@api.depends("location_id", "product_id", "lot_id")
def _compute_qty_available(self):
    """Compute the quantity available for the scanned product and location.

    The quantity is 0.0 when no product is set or the location usage is
    not ``internal``. Otherwise it is the summed ``quantity`` of the quants
    matching product, location and (when set) lot, reduced by the part of
    each done move line that exceeds its reserved quantity.

    Each record is computed on its own, so the method is safe for
    multi-record sets, and a missing ``read_group`` row is treated as 0.0.
    """
    Quant = self.env["stock.quant"]
    for rec in self:
        if not rec.product_id or rec.location_id.usage != "internal":
            rec.qty_available = 0.0
            continue
        domain_quant = [
            ("product_id", "=", rec.product_id.id),
            ("location_id", "=", rec.location_id.id),
        ]
        if rec.lot_id:
            domain_quant.append(("lot_id", "=", rec.lot_id.id))
        # if rec.package_id:
        #     domain_quant.append(('package_id', '=', rec.package_id.id))
        groups = Quant.read_group(domain_quant, ["quantity"], [], orderby="id")
        rec.qty_available = groups[0]["quantity"] if groups else 0.0
        # Unexpected done quantities must reduce qty_available
        done_move_lines = rec.move_line_ids.filtered(
            lambda m: m.product_id == rec.product_id
            and (not rec.lot_id or m.lot_id == rec.lot_id)
        )
        for sml in done_move_lines:
            over_done_qty = float_round(
                sml.qty_done - sml.reserved_uom_qty,
                precision_rounding=sml.product_uom_id.rounding,
            )
            if over_done_qty > 0.0:
                rec.qty_available -= over_done_qty
def name_get(self):
    """Return ``(id, label)`` pairs for the wizards.

    The label joins the translated "Barcode reader" text, the picking name
    (or the picking type code when there is no name) and the current user
    name.
    """
    names = []
    for wizard in self:
        reference = wizard.picking_id.name or wizard.picking_type_code
        label = "{} - {} - {}".format(
            _("Barcode reader"), reference, self.env.user.name
        )
        names.append((wizard.id, label))
    return names
def _set_default_picking(self):
    """Use the ``default_picking_id`` context key as candidate picking."""
    default_picking_id = self.env.context.get("default_picking_id", False)
    if not default_picking_id:
        return
    default_picking = self.env["stock.picking"].browse(default_picking_id)
    self._set_candidate_pickings(default_picking)
@api.model_create_multi
def create(self, vals_list):
    """Create the wizards and rebuild their candidate pickings.

    When the user clicks any view button the wizard record is created and
    the candidate pickings are lost, so they are set again from the
    picking of each new wizard.
    """
    new_wizards = super().create(vals_list)
    for wizard in new_wizards.filtered("picking_id"):
        wizard._set_candidate_pickings(wizard.picking_id)
    return new_wizards
@api.onchange("picking_id")
def onchange_picking_id(self):
    """Refresh candidates, todo records and todo action on picking change.

    The wizard is shown in a wizard view, so the candidate picking for the
    default picking has to be created here, in the onchange.
    """
    self._set_default_picking()
    self.fill_pending_moves()
    self.determine_todo_action()
def get_sorted_move_lines(self, move_lines):
    """Sort moves or move lines by the position of one of their locations.

    The location field comes from the option group
    (``location_field_to_sort``); when it is not set, the destination
    location is used for incoming and internal pickings and the source
    location otherwise. Records are ordered by ``(posx, posy, posz, name)``
    of that location. For stock moves the location is read from the first
    move line when there is one, else from the move itself.
    """
    options = self.option_group_id
    location_field = options.location_field_to_sort
    if not location_field:
        inbound = self.picking_id.picking_type_code in ["incoming", "internal"]
        location_field = "location_dest_id" if inbound else "location_id"

    def _position(location):
        return (location.posx, location.posy, location.posz, location.name)

    if options.source_pending_moves == "move_line_ids":
        return move_lines.sorted(lambda sml: _position(sml[location_field]))
    # Stock moves
    return move_lines.sorted(
        lambda sm: _position((sm.move_line_ids[:1] or sm)[location_field])
    )
def _get_stock_move_lines_todo(self):
    """Return picking move lines still to scan.

    A line qualifies when its scan state is empty or ``pending`` and its
    done quantity is lower than its reserved quantity.
    """

    def _still_todo(ml):
        if ml.barcode_scan_state and ml.barcode_scan_state != "pending":
            return False
        return ml.qty_done < ml.reserved_qty

    return self.picking_id.move_line_ids.filtered(_still_todo)
def fill_pending_moves(self):
    """Rebuild the todo records (delegates to ``fill_todo_records``)."""
    self.fill_todo_records()
def get_moves_or_move_lines(self):
    """Return the records the todo lines are built from.

    These are the picking move lines linked to a move when the option
    group source is ``move_line_ids``, otherwise the picking moves.
    """
    if self.option_group_id.source_pending_moves != "move_line_ids":
        return self.picking_id.move_ids
    return self.picking_id.move_line_ids.filtered(lambda sml: sml.move_id)
def get_moves(self):
    """Return the stock moves of the wizard picking."""
    picking = self.picking_id
    return picking.move_ids
def fill_todo_records(self):
    """Sort the picking moves or move lines and build the todo records."""
    source_records = self.get_moves_or_move_lines()
    ordered_records = self.get_sorted_move_lines(source_records)
    self.fill_records([ordered_records])
@api.model
def _get_fields_filled_special(self):
    """Return the field names filled explicitly by ``determine_todo_action``.

    Options for these fields are skipped by its generic option loop.
    """
    special_fields = ["location_id", "location_dest_id"]
    special_fields += ["package_id", "result_package_id"]
    special_fields.append("product_qty")
    return special_fields
def determine_todo_action(self, forced_todo_line=False):
# Select the todo line to work on and preload the wizard fields from it.
# :param forced_todo_line: todo line to use instead of the first todo line
#     whose stored (``_origin``) state is "pending".
# :return: False when the option group is not in guided mode, else None.
self.visible_force_done = self.env.context.get("visible_force_done", False)
if not self.option_group_id.barcode_guided_mode == "guided":
return False
self.todo_line_id = (
forced_todo_line
or self.todo_line_ids.filtered(lambda t: t._origin.state == "pending")[:1]
)
self.todo_line_id._compute_qty_done()
# NOTE: despite its name, ``move_line`` is the selected todo line record.
move_line = self.todo_line_id
# Copy the todo line values to the guided_* fields.
self.guided_location_id = move_line.location_id
self.guided_location_dest_id = move_line.location_dest_id
self.guided_product_id = move_line.product_id
self.guided_lot_id = move_line.lot_id
# Source location: prefilled when the option asks for it, otherwise cleared
# except for incoming pickings.
if self.option_group_id.get_option_value("location_id", "filled_default"):
self.location_id = move_line.location_id
elif self.picking_type_code != "incoming":
self.location_id = False
# Destination location: same rule, cleared except for outgoing pickings.
if self.option_group_id.get_option_value("location_dest_id", "filled_default"):
self.location_dest_id = move_line.location_dest_id
elif self.picking_type_code != "outgoing":
self.location_dest_id = False
if self.option_group_id.get_option_value("package_id", "filled_default"):
self.package_id = move_line.package_id
if not self.keep_result_package and self.option_group_id.get_option_value(
"result_package_id", "filled_default"
):
self.result_package_id = move_line.result_package_id
# Quantity: remaining quantity of the todo line, or reset to 0.0 unless the
# "visible_force_done" context flag was set.
if self.option_group_id.get_option_value("product_qty", "filled_default"):
self.product_qty = move_line.product_uom_qty - move_line.qty_done
else:
if not self.visible_force_done:
self.product_qty = 0.0
# Try to fill data of any field defined in options
processed_fields = self._get_fields_filled_special()
for option in self.option_group_id.option_ids:
if option.field_name in processed_fields:
continue
if option.filled_default:
self[option.field_name] = move_line[option.field_name]
else:
# Fields are only cleared when "skip_clean_values" is not in context.
if not self.env.context.get("skip_clean_values", False):
self[option.field_name] = False
self.update_fields_after_determine_todo(move_line)
self.action_show_step()
def update_fields_after_determine_todo(self, move_line):
    """Copy the done quantity of ``move_line`` to ``picking_product_qty``."""
    done_qty = move_line.qty_done
    self.picking_product_qty = done_qty
def action_done(self):
# Process the scanned data on the picking once the parent ``action_done``
# succeeded.
# :return: the falsy result of ``super().action_done()`` when it fails,
#     otherwise the result of ``_process_stock_move_line`` (a dict keyed by
#     stock move line id, or False).
res = super().action_done()
if res:
move_dic = self.with_context(**self.env.context)._process_stock_move_line()
if move_dic:
# Increase the scan counter of the candidate records for the current mode.
self[self._field_candidate_ids].scan_count += 1
if self.env.context.get("force_create_move"):
self.move_line_ids.barcode_scan_state = "done_forced"
# Screen values are kept only when the option is set and the current todo
# line is still pending; otherwise they are cleaned (unless the
# "skip_clean_values" context key is set).
if not self.keep_screen_values or self.todo_line_id.state != "pending":
if not self.env.context.get("skip_clean_values", False):
self.action_clean_values()
keep_vals = {}
else:
keep_vals = self._convert_to_write(self._cache)
# Rebuild the todo records, then select again the todo line whose group key
# matches the key stored in ``forced_todo_key``.
self.fill_todo_records()
if self.forced_todo_key:
self.todo_line_id = self.pending_move_ids.filtered(
lambda ln: str(self._group_key(ln)) == self.forced_todo_key
)[:1]
self.selected_pending_move_id = self.todo_line_id
self.determine_todo_action(self.todo_line_id)
else:
self.determine_todo_action()
self.action_show_step()
if keep_vals:
self.update_keep_values(keep_vals)
# Force refresh candidate pickings to show green if not pending moves
if not self.pending_move_ids:
self._set_candidate_pickings(self.picking_id)
return move_dic
return res
def update_keep_values(self, keep_vals):
    """Restore kept screen values for every non-float option field.

    :param keep_vals: mapping of field name to the value to restore.
    """
    kept_names = [
        option.field_name
        for option in self.option_group_id.option_ids
        if self._fields[option.field_name].type != "float"
    ]
    restored = {}
    for field_name in kept_names:
        restored[field_name] = keep_vals[field_name]
    self.update(restored)
def action_manual_entry(self):
    """Run the parent manual entry and, when it succeeds, ``action_done``.

    :return: the result of the parent ``action_manual_entry``.
    """
    result = super().action_manual_entry()
    if not result:
        return result
    self.action_done()
    return result
def _prepare_move_line_values(self, candidate_move, available_qty):
    """Build the values of a new stock move line from the wizard data.

    For an out picking the logical workflow is that the scanned location
    is the location the stock is taken from.

    :param candidate_move: stock move the new line is linked to.
    :param available_qty: quantity written as ``qty_done``.
    :return: dict of values for ``stock.move.line``.
    :raises ValidationError: when no picking is available in the context
        (``picking`` key) or on the wizard.
    """
    picking = self.env.context.get("picking", self.picking_id)
    if not picking:
        raise ValidationError(
            _("You can not add extra moves if you have not set a picking")
        )
    # If we move all package units the result package is the same
    if self.package_id and not self.result_package_id:
        packaged_qty = sum(self.package_id.quant_ids.mapped("quantity"))
        if packaged_qty <= self.product_qty:
            self.result_package_id = self.package_id
    # The packaging UoM wins; otherwise the move UoM, then the product UoM.
    if self.packaging_id:
        uom_id = self.packaging_id.product_uom_id.id
    else:
        uom_id = candidate_move.product_uom.id or self.product_id.uom_id.id
    vals = {
        "picking_id": picking.id,
        "move_id": candidate_move.id,
        "qty_done": available_qty,
        "product_uom_id": uom_id,
        "product_id": self.product_id.id,
        "location_id": self.location_id.id,
        "location_dest_id": self.location_dest_id.id,
        "lot_id": self.lot_id.id,
        "lot_name": self.lot_id.name,
        "barcode_scan_state": "done_forced",
        "package_id": self.package_id.id,
        "result_package_id": self.result_package_id.id,
    }
    if self.owner_id:
        vals["owner_id"] = self.owner_id.id
    return vals
def _states_move_allowed(self):
move_states = ["assigned", "partially_available"]
if self.confirmed_moves:
move_states.append("confirmed")
return move_states
def _prepare_stock_moves_domain(self):
domain = [
("product_id", "=", self.product_id.id),
("picking_id.picking_type_id.code", "=", self.picking_type_code),
("state", "in", self._states_move_allowed()),
]
if self.picking_id:
domain.append(("picking_id", "=", self.picking_id.id))
return domain
def _set_candidate_pickings(self, candidate_pickings):
vals = [(5, 0, 0)]
vals.extend([(0, 0, {"picking_id": p.id}) for p in candidate_pickings])
self.candidate_picking_ids = vals
def _search_candidate_picking(self, moves_todo=False):
    """Find the picking the scan belongs to when none is set yet.

    :param moves_todo: stock moves to take the pickings from; when falsy
        they are searched with ``_prepare_stock_moves_domain``.
    :return: False when more than one candidate picking exists (the
        candidates are stored so the user can choose), True otherwise.

    With exactly one candidate the picking is assigned to the wizard.
    "No picking assigned" is logged only when no candidate was found;
    previously it was also logged right after a picking had been assigned.
    """
    if not moves_todo:
        moves_todo = self.env["stock.move"].search(
            self._prepare_stock_moves_domain()
        )
    if self.picking_id:
        return True
    candidate_pickings = moves_todo.mapped("picking_id")
    if len(candidate_pickings) > 1:
        self._set_candidate_pickings(candidate_pickings)
        return False
    if candidate_pickings:
        self.picking_id = candidate_pickings
        self._set_candidate_pickings(candidate_pickings)
    else:
        _logger.info("No picking assigned")
    return True
def _check_guided_restrictions(self):
# Check restrictions in guided mode
if self.option_group_id.barcode_guided_mode == "guided":
if (
self.option_group_id.get_option_value("product_id", "forced")
and self.product_id != self.todo_line_id.product_id
):
self._set_messagge_info("more_match", _("Wrong product"))
return False
return True
def _get_candidate_stock_move_lines(self, moves_todo, sml_vals):
    """Find the move lines of ``moves_todo`` that can receive the scan.

    Lines matching the wizard source location, destination location and
    product are preferred. When there are none, existing lines are reused
    by relaxing one location at a time (unless the matching option is
    forced): first lines still at the picking source location, then lines
    still at the picking destination location. In those cases the scanned
    location is added to ``sml_vals`` so it is written on the line.

    :param moves_todo: stock moves whose move lines are inspected.
    :param sml_vals: dict of move line values, updated in place.
    :return: the candidate stock move lines (possibly empty).
    """

    def _search(predicate):
        return moves_todo.mapped("move_line_ids").filtered(predicate)

    def _option(field_name):
        return self.option_group_id.option_ids.filtered(
            lambda op: op.field_name == field_name
        )

    found = _search(
        lambda sml: (
            # sml.picking_id == self.picking_id and
            sml.location_id == self.location_id
            and sml.location_dest_id == self.location_dest_id
            and sml.product_id == self.product_id
        )
    )
    if found:
        return found
    # Try to reuse existing stock move lines updating locations
    if not _option("location_id").forced:
        found = _search(
            lambda sml: (
                sml.location_dest_id == self.location_dest_id
                and sml.product_id == self.product_id
                and sml.location_id == self.picking_location_id
            )
        )
        if found and self.location_id:
            sml_vals.update({"location_id": self.location_id.id})
    if found:
        return found
    if not _option("location_dest_id").forced:
        found = _search(
            lambda sml: (
                sml.location_id == self.location_id
                and sml.product_id == self.product_id
                and sml.location_dest_id == self.picking_location_dest_id
            )
        )
        if found and self.location_dest_id:
            sml_vals.update({"location_dest_id": self.location_dest_id.id})
    return found
def _get_candidate_line_domain(self):
"""To be extended for other modules"""
# Returns a domain (list of leaves) applied by the caller with
# ``filtered_domain`` on the candidate move lines; it only holds package
# leaves and stays empty when none are added.
# NOTE(review): the package handling below depends on the user belonging to
# "stock.group_tracking_lot"; confirm the intended nesting of
# ``domain.extend`` against the upstream module.
domain = []
if self.env.user.has_group("stock.group_tracking_lot"):
# Check if sml is created with complete content so we fill result package to
# set the complete package
if (
len(self.package_id.quant_ids) == 1
and float_compare(
self.package_id.quant_ids.quantity,
self.product_qty,
precision_rounding=self.product_id.uom_id.rounding,
)
== 0
):
# Side effect: the wizard result package is set to the scanned package.
self.result_package_id = self.package_id
domain.extend(
[
("package_id", "=", self.package_id.id),
("result_package_id", "=", self.result_package_id.id),
]
)
return domain
def _process_stock_move_line(self): # noqa: C901
"""
Search assigned or confirmed stock moves from a picking operation type
or a picking. If there is more than one picking with demand from
scanned product the interface allow to select what picking to work.
If only there is one picking the scan data is assigned to it.
"""
# Returns False when processing stops early (no ``_search_candidate_<mode>``
# method, serial number already used, or scanned quantity too high);
# otherwise a dict mapping stock move line id -> quantity assigned to it.
StockMove = self.env["stock.move"]
domain = self._prepare_stock_moves_domain()
# Moves to process: the moves of the current todo line in guided mode, else
# the picking moves of the scanned product, else the result of a search.
if self.option_group_id.barcode_guided_mode == "guided":
moves_todo = self.todo_line_id.stock_move_ids
elif self.picking_id:
moves_todo = self.picking_id.move_ids.filtered(
lambda sm: sm.product_id == self.product_id
)
else:
moves_todo = StockMove.search(domain)
# Dispatch to the ``_search_candidate_<picking_mode>`` method.
# NOTE(review): the ``except AttributeError`` also hides an AttributeError
# raised inside the called method, not only a missing method.
try:
getattr(
self,
"_search_candidate_%s" % self.picking_mode,
)(moves_todo)
except AttributeError:
return False
# Values written on every matched move line; the helper below may add
# location keys to it.
sml_vals = {}
candidate_lines = self._get_candidate_stock_move_lines(moves_todo, sml_vals)
lines = candidate_lines.filtered(
lambda l: (l.lot_id == self.lot_id and l.barcode_scan_state == "pending")
)
# Check if exists lines with lot created if product has tracking serial
if self.product_id.tracking == "serial":
serial_lines = self.picking_id.move_line_ids.filtered(
lambda sml: (
sml.lot_id == self.lot_id or sml.lot_name == self.lot_id.name
)
and sml.qty_done >= 1.0
)
if serial_lines:
self._set_messagge_info("more_match", _("S/N Already in picking"))
return False
# For incoming pickings the lot is not filled so we try fill it with
# the lot scanned
if (
not lines
and self.picking_type_code == "incoming"
and self.product_id.tracking != "none"
):
if (
self.option_group_id.create_lot
and self.product_id.tracking == "serial"
and candidate_lines.filtered(lambda ln: ln.lot_name == self.lot_id.name)
):
self.lot_id = False
self._set_messagge_info("more_match", _("S/N already created"))
return False
lines = candidate_lines.filtered(
lambda l: (not l.lot_id and l.barcode_scan_state == "pending")
)
if lines:
sml_vals.update(
{"lot_id": self.lot_id.id, "lot_name": self.lot_id.name}
)
candidate_domain = self._get_candidate_line_domain()
if candidate_domain:
lines = lines.filtered_domain(candidate_domain)
# Take into account all smls to get a line to update
if not lines:
lines = candidate_lines.filtered(lambda ln: (ln.lot_id == self.lot_id))
if candidate_domain:
lines = lines.filtered_domain(candidate_domain)
available_qty = self.product_qty
# Remaining demand over all the moves to process.
max_quantity = sum(sm.product_uom_qty - sm.quantity_done for sm in moves_todo)
# Refuse quantities above the remaining demand unless the option group code
# is "REL" or the "force_create_move" / "manual_picking" context keys are set.
if (
not self.option_group_id.code == "REL"
and not self.env.context.get("force_create_move", False)
and not self.env.context.get("manual_picking", False)
and float_compare(
available_qty,
max_quantity,
precision_rounding=self.product_id.uom_id.rounding,
)
> 0
):
self._set_messagge_info(
"more_match", _("Quantities scanned are higher than necessary")
)
self.visible_force_done = True
self._set_focus_on_qty_input("product_qty")
return False
move_lines_dic = {}
context = self.env.context
# Spread the scanned quantity over the matched lines.
for line in lines:
# With several lines each one takes at most its remaining reserved
# quantity; a single line (or a line without reservation) takes it all.
if line.reserved_uom_qty and len(lines) > 1:
assigned_qty = min(
max(line.reserved_uom_qty - line.qty_done, 0.0), available_qty
)
else:
assigned_qty = available_qty
# Not increase qty done if user reads a complete package
if (
self.result_package_id
and self.package_id
and self.result_package_id == self.package_id
):
qty_done = assigned_qty
elif context.get("no_increase_qty_done", False) and assigned_qty > 0:
# Do not increase the quantity, if the quantity is > 0
qty_done = assigned_qty
else:
qty_done = line.qty_done + assigned_qty
sml_vals.update(
{
"qty_done": qty_done,
"result_package_id": self.result_package_id.id,
}
)
# Add or remove result_package_id
package_qty_available = sum(
self.package_id.quant_ids.filtered(
lambda q: q.lot_id == self.lot_id
).mapped("quantity")
)
if sml_vals["qty_done"] >= package_qty_available:
if not self.result_package_id:
sml_vals.update({"result_package_id": self.package_id.id})
elif line.result_package_id == line.package_id:
sml_vals.update({"result_package_id": False})
self._update_stock_move_line(line, sml_vals)
if line.qty_done >= line.reserved_uom_qty:
line.barcode_scan_state = "done"
elif self.env.context.get("done_forced"):
line.barcode_scan_state = "done_forced"
available_qty -= assigned_qty
if assigned_qty:
move_lines_dic[line.id] = assigned_qty
# Stop as soon as nothing is left to assign (float_compare result < 1
# means available_qty <= 0).
if (
float_compare(
available_qty,
0.0,
precision_rounding=line.product_id.uom_id.rounding,
)
< 1
):
break
# Quantity still unassigned after the loop: create a new move line for it.
if (
float_compare(
available_qty, 0, precision_rounding=self.product_id.uom_id.rounding
)
> 0
):
# Create an extra stock move line if this product has an
# initial demand.
# When the sml is created we need to link to a stock move but user can read
# any other product in guided mode so we must ensure that the sm linked to
# moves todo records have the same product. If not we search any sm linked
# to the picking.
moves_to_link = moves_todo.filtered(
lambda mv: mv.product_id == self.product_id
)
move_to_link_in_todo_line = True
if not moves_to_link:
move_to_link_in_todo_line = False
moves_to_link = self.picking_id.move_ids.filtered(
lambda mv: mv.product_id == self.product_id
)
stock_move_lines = self.create_new_stock_move_line(
moves_to_link, available_qty
)
for sml in stock_move_lines:
# A line without move gets a new move; with "force_create_move" the demand
# of the linked move is overwritten with the scanned quantity.
if not sml.move_id:
self.create_new_stock_move(sml)
elif sml.move_id and context.get("force_create_move", False):
sml.move_id.product_uom_qty = self.product_qty
move_lines_dic[sml.id] = sml.qty_done
# Ensure that the state of stock_move linked to the sml read is assigned
stock_move_lines.move_id.filtered(
lambda sm: sm.state == "draft"
).state = "assigned"
# When create new stock move lines and we are in guided mode we need
# link this new lines to the todo line details
# If user scan a product distinct of the todo line we need link to other
# alternative move
if self.option_group_id.source_pending_moves != "move_line_ids":
if move_to_link_in_todo_line and self.todo_line_id:
todo_line = self.todo_line_id
else:
todo_line = self.todo_line_ids.filtered(
lambda ln: ln.product_id == self.product_id
)
todo_line.line_ids = [(4, sml.id) for sml in stock_move_lines]
self.update_fields_after_process_stock(moves_todo)
return move_lines_dic
def _update_stock_move_line(self, line, sml_vals):
    """Write ``sml_vals`` on the given stock move line.

    Kept as a separate helper so other modules can inherit it.
    """
    values_to_write = sml_vals
    line.write(values_to_write)
def create_new_stock_move_line(self, moves_todo, available_qty):
    """Create a stock move line from the wizard values.

    Used when no existing stock move line can take the scanned quantity.
    The line is linked to the first move of ``moves_todo``.
    """
    move_to_link = moves_todo[:1]
    line_values = self._prepare_move_line_values(move_to_link, available_qty)
    return self.env["stock.move.line"].create(line_values)
def create_new_stock_move(self, sml):
    """Create an additional, assigned stock move for ``sml`` and link it.

    The move copies product, UoM, locations and picking from the move line
    and takes the done quantity of the line as its demand.
    """
    move_name = _("New Move:") + sml.product_id.display_name
    move_values = dict(
        name=move_name,
        product_uom=sml.product_uom_id.id,
        product_uom_qty=sml.qty_done,
        state="assigned",
        additional=True,
        product_id=sml.product_id.id,
        location_id=sml.location_id.id,
        location_dest_id=sml.location_dest_id.id,
        picking_id=sml.picking_id.id,
    )
    sml.move_id = self.env["stock.move"].create(move_values)
def update_fields_after_process_stock(self, moves):
    """Store the summed done quantity of ``moves`` in ``picking_product_qty``."""
    done_quantities = moves.mapped("quantity_done")
    self.picking_product_qty = sum(done_quantities)
def _candidate_picking_selected(self):
    """Return the picking of the only candidate row.

    An empty ``stock.picking`` recordset is returned when there is not
    exactly one candidate row.
    """
    if len(self.candidate_picking_ids) != 1:
        return self.env["stock.picking"].browse()
    return self.candidate_picking_ids.picking_id
def check_done_conditions(self):
# Extend the parent checks with stock availability and picking selection.
# :return: False when a check fails (a message is set on the wizard),
#     otherwise the result of the parent ``check_done_conditions``.
res = super().check_done_conditions()
# For non-incoming pickings the scanned quantity must not exceed the
# available quantity, unless "force_create_move" is in the context or the
# option group allows negative quants.
if (
self.picking_type_code != "incoming"
and float_compare(
self.product_qty,
self.qty_available,
precision_rounding=self.product_id.uom_id.rounding,
)
> 0
and not self.env.context.get("force_create_move", False)
and not self.option_group_id.allow_negative_quant
):
self._set_messagge_info(
"more_match", _("Quantities not available in location")
)
# NOTE(review): the enclosing condition already requires
# ``allow_negative_quant`` to be false, so this test looks unreachable if it
# is nested inside that block - confirm the intended behavior.
if self.option_group_id.allow_negative_quant:
self.visible_force_done = True
# Set focus on product_qty input box
self._set_focus_on_qty_input("product_qty")
return False
# The picking selection checks below are skipped in "picking_batch" mode.
if self.picking_mode == "picking_batch":
return res
if not self.picking_id:
if not self._search_candidate_picking():
self._set_messagge_info(
"info", _("Click on picking pushpin to lock it")
)
return False
# The wizard picking must be the picking of the single candidate row.
if self.picking_id and self.picking_id != self._candidate_picking_selected():
self._set_messagge_info("info", _("Click on picking pushpin to lock it"))
return False
return res
def get_lot_by_removal_strategy(self):
    """Set the wizard lot from the first quant gathered for the product.

    Quants are gathered with ``stock.quant._gather`` for the wizard
    product and location.
    """
    gathered_quants = self.env["stock.quant"]._gather(
        self.product_id, self.location_id
    )
    first_quant = first(gathered_quants)
    # TODO: Perhaps update location_id from quant??
    self.lot_id = first_quant.lot_id
def action_product_scaned_post(self, product):
    """Run the parent hook, then pick a lot automatically when required.

    The lot is chosen by ``get_lot_by_removal_strategy`` when ``auto_lot``
    is set and the picking type is not incoming.
    """
    res = super().action_product_scaned_post(product)
    needs_auto_lot = self.auto_lot and self.picking_type_code != "incoming"
    if needs_auto_lot:
        self.get_lot_by_removal_strategy()
    return res
def action_assign_serial(self):
    """Open the serial assignment of the move matching the scanned product.

    When several moves match, the move of the first pending move line is
    used.

    :raises ValidationError: when no move is found.
    """
    moves = self.env["stock.move"].search(self._prepare_stock_moves_domain())
    if len(moves) > 1:
        pending_lines = moves.move_line_ids.filtered(
            lambda sml: sml.barcode_scan_state == "pending"
        )
        moves = pending_lines[:1].move_id
    if not moves:
        raise ValidationError(_("No pending lines for this product"))
    return moves.action_assign_serial()
def action_put_in_pack(self):
    """Call ``action_put_in_pack`` on every picking of the wizards."""
    pickings = self.mapped("picking_id")
    for current_picking in pickings:
        current_picking.action_put_in_pack()
def action_clean_values(self):
    """Clean the parent values plus the picking specific screen state.

    The selected pending move, the force-done flag and the manual entry
    flag are reset, and a bus message with ``manual_entry`` False is sent.
    """
    res = super().action_clean_values()
    self.selected_pending_move_id = False
    self.visible_force_done = False
    # Hide Form Edit
    self.manual_entry = False
    payload = {"manual_entry": False}
    self.send_bus_done(
        "stock_barcodes_scan", "stock_barcodes_edit_manual", payload
    )
    return res
def _option_required_hook(self, option_required):
    """Fill the destination location through the putaway strategy.

    Only applies to the ``location_dest_id`` option when the option group
    uses ``use_location_dest_putaway``; every other case is delegated to
    the parent hook.

    :return: True when a destination location could be set.
    """
    applies_putaway = (
        option_required.field_name == "location_dest_id"
        and self.option_group_id.use_location_dest_putaway
    )
    if not applies_putaway:
        return super()._option_required_hook(option_required)
    picking_destination = self.picking_id.location_dest_id.with_context(
        avoid_location_with_reserve=True
    )
    self.location_dest_id = picking_destination._get_putaway_strategy(
        self.product_id,
        self.product_qty,
        self.result_package_id,
        self.packaging_id,
    )
    return bool(self.location_dest_id)
def _group_key(self, line):
group_key_for_todo_records = self.option_group_id.group_key_for_todo_records
if group_key_for_todo_records:
return safe_eval(group_key_for_todo_records, globals_dict={"object": line})
if self.option_group_id.source_pending_moves == "move_line_ids":
return (
line.location_id.id,
line.product_id.id,
line.lot_id.id,
line.package_id.id,
)
else:
return (line.location_id.id, line.product_id.id)
def _get_all_products_quantities_in_package(self, package):
res = {}
# TODO: Check if domain is applied and we must recover _get_contained_quants
for quant in package.quant_ids:
if quant.product_id not in res:
res[quant.product_id] = 0
res[quant.product_id] += quant.quantity
return res
def _prepare_fill_record_values(self, line, position):
vals = {
"wiz_barcode_id": self.id,
"product_id": line.product_id.id,
"name": "To do action",
"position_index": position,
"picking_code": line.picking_code,
}
if line._name == "stock.move.line":
package_product_dic = self._get_all_products_quantities_in_package(
line.package_id
)
vals.update(
{
"location_id": line.location_id.id,
"location_dest_id": line.location_dest_id.id,
"lot_id": line.lot_id.id,
"package_id": line.package_id.id,
"result_package_id": line.result_package_id.id,
"uom_id": line.product_uom_id.id,
"product_uom_qty": line.reserved_uom_qty,
"product_qty_reserved": line.reserved_qty,
"line_ids": [(6, 0, line.ids)],
"stock_move_ids": [(6, 0, line.move_id.ids)],
"package_product_qty": package_product_dic
and package_product_dic[line.product_id]
or 0.0,
"is_stock_move_line_origin": True,
}
)
else:
vals.update(
{
"location_id": (line.move_line_ids[:1] or line).location_id.id,
"location_dest_id": (
line.move_line_ids[:1] or line
).location_dest_id.id,
"uom_id": line.product_uom.id,
"product_uom_qty": line.product_uom_qty,
"product_qty_reserved": line.move_line_ids
# TODO: Use reserved_qty or reserved_uom_qty
and sum(line.move_line_ids.mapped("reserved_qty"))
or line.product_uom_qty,
"line_ids": [(6, 0, line.move_line_ids.ids)],
"stock_move_ids": [(6, 0, line.ids)],
"is_stock_move_line_origin": False,
}
)
return vals
def _update_fill_record_values(self, line, vals):
if vals["is_stock_move_line_origin"]:
vals["product_uom_qty"] += line.reserved_uom_qty
vals["product_qty_reserved"] += line.reserved_qty
vals["line_ids"][0][2].append(line.id)
vals["stock_move_ids"][0][2].append(line.move_id.id)
else:
vals["product_uom_qty"] += line.product_uom_qty
vals["product_qty_reserved"] += (
line.move_line_ids
# TODO: Use reserved_qty or reserved_uom_qty
and sum(line.move_line_ids.mapped("reserved_qty"))
or line.product_uom_qty
)
vals["line_ids"][0][2].extend(line.move_line_ids.ids)
vals["stock_move_ids"][0][2].extend(line.ids)
return vals
# NOTE(review): decorated with @api.model although the method reads and
# writes fields of ``self`` - confirm this is intended.
@api.model
def fill_records(self, lines_list):
"""
:param lines_list: browse list
:return:
"""
# Rebuilds the todo records of the wizard from the given recordsets, grouping
# the lines by ``_group_key``.
# Remember the group key of the current (or selected) todo line, as a string,
# before the todo lines are deleted.
self.forced_todo_key = str(
self._group_key(self.todo_line_id or self.selected_pending_move_id)
)
self.todo_line_ids.unlink()
self.todo_line_id = False
# self.position_index = 0
# Group key -> values of the todo record, in insertion order.
todo_vals = OrderedDict()
position = 0
# Stock move -> quantity already covered by the grouped lines.
move_qty_dic = defaultdict(float)
is_stock_move_line_origin = lines_list[0]._name == "stock.move.line"
for lines in lines_list:
for line in lines:
key = self._group_key(line)
# First line of a group creates the values, later ones are merged in.
if key not in todo_vals:
todo_vals[key] = self._prepare_fill_record_values(line, position)
position += 1
else:
todo_vals[key] = self._update_fill_record_values(
line, todo_vals[key]
)
if is_stock_move_line_origin:
move_qty_dic[line.move_id] += max(
line.reserved_uom_qty, line.qty_done
)
else:
move_qty_dic[line] += max(line.product_uom_qty, line.quantity_done)
# Moves whose backorder action is "pending" and whose demand exceeds the
# quantity accumulated above get an extra todo line for the difference.
for move in self.get_moves():
qty = move_qty_dic[move]
if (
move.barcode_backorder_action == "pending"
and move.product_uom_qty > qty
):
vals = self._prepare_fill_record_values(move, position)
vals.update(
{
"product_uom_qty": move.product_uom_qty - qty,
"product_qty_reserved": 0.0,
"line_ids": False,
"is_extra_line": True,
}
)
# The (move, "M") tuple is the dictionary key of the extra line.
todo_vals[
(
move,
"M",
)
] = vals
position += 1
self.todo_line_ids = self.env["wiz.stock.barcodes.read.todo"].create(
list(todo_vals.values())
)
def action_validate_picking(self):
    """Validate the candidate pickings and return the follow-up action.

    :return: the result given by the candidate rows when validation is not
        completed; otherwise the
        ``stock_barcodes.stock_barcodes_action_picking_tree_ready`` action,
        with its context extended to filter on the picking type when the
        wizard picking has one.
    """
    candidates = self.candidate_picking_ids.with_context(
        wiz_barcode_id=self.id,
        picking_id=self.picking_id.id,
        skip_sms=True,
        skip_immediate=True,
    )
    valid, result = candidates.action_validate_picking()
    if not valid:
        return result
    action = self.env["ir.actions.actions"]._for_xml_id(
        "stock_barcodes.stock_barcodes_action_picking_tree_ready"
    )
    if self.picking_id and self.picking_id.picking_type_id:
        merged_context = dict(self.env.context)
        merged_context.update(safe_eval(action["context"]))
        merged_context[
            "search_default_picking_type_id"
        ] = self.picking_id.picking_type_id.id
        action["context"] = merged_context
    return action
def action_open_picking(self):
    """Call ``action_open_picking`` on each candidate picking row.

    The wizard id and picking id are passed through the context.
    """
    for candidate in self.candidate_picking_ids:
        extra_context = {"wiz_barcode_id": self.id, "picking_id": self.picking_id.id}
        candidate.with_context(**extra_context).action_open_picking()
def action_unlock_picking(self):
    """Call ``action_unlock_picking`` on each candidate picking row.

    The wizard id is passed through the context.
    """
    for candidate in self.candidate_picking_ids:
        extra_context = {"wiz_barcode_id": self.id}
        candidate.with_context(**extra_context).action_unlock_picking()
def action_lock_picking(self):
    """Call ``action_lock_picking`` on each candidate picking row.

    The wizard id and picking id are passed through the context.
    """
    for candidate in self.candidate_picking_ids:
        extra_context = {"wiz_barcode_id": self.id, "picking_id": self.picking_id.id}
        candidate.with_context(**extra_context).action_lock_picking()