Initial commit: Sale packages

This commit is contained in:
Ernad Husremovic 2025-08-29 15:20:49 +02:00
commit 14e3d26998
6469 changed files with 2479670 additions and 0 deletions

View file

@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from . import account_move
from . import product_template
from . import res_company
from . import res_config_settings
from . import res_users
from . import sale_order
from . import sale_order_line
from . import stock

View file

@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import defaultdict
from odoo import fields, models
from odoo.tools import float_is_zero, float_compare
from odoo.tools.misc import formatLang
class AccountMove(models.Model):
_inherit = 'account.move'
def _stock_account_get_last_step_stock_moves(self):
""" Overridden from stock_account.
Returns the stock moves associated to this invoice."""
rslt = super(AccountMove, self)._stock_account_get_last_step_stock_moves()
for invoice in self:
if invoice.move_type not in ['out_invoice', 'out_refund']:
continue
if (invoice.move_type == 'out_invoice' or (
invoice.move_type == 'out_refund' and any(invoice.invoice_line_ids.sale_line_ids.mapped('is_downpayment')))
):
rslt += invoice.mapped('invoice_line_ids.sale_line_ids.move_ids').filtered(lambda x: x.state == 'done' and x.location_dest_id.usage == 'customer')
else:
rslt += invoice.mapped('reversed_entry_id.invoice_line_ids.sale_line_ids.move_ids').filtered(lambda x: x.state == 'done' and x.location_id.usage == 'customer')
# Add refunds generated from the SO
rslt += invoice.mapped('invoice_line_ids.sale_line_ids.move_ids').filtered(lambda x: x.state == 'done' and x.location_id.usage == 'customer')
return rslt
def _get_invoiced_lot_values(self):
""" Get and prepare data to show a table of invoiced lot on the invoice's report. """
self.ensure_one()
res = super(AccountMove, self)._get_invoiced_lot_values()
if self.state == 'draft' or not self.invoice_date or self.move_type not in ('out_invoice', 'out_refund'):
return res
current_invoice_amls = self.invoice_line_ids.filtered(lambda aml: aml.display_type == 'product' and aml.product_id and aml.product_id.type in ('consu', 'product') and aml.quantity)
all_invoices_amls = current_invoice_amls.sale_line_ids.invoice_lines.filtered(lambda aml: aml.move_id.state == 'posted').sorted(lambda aml: (aml.date, aml.move_name, aml.id))
index = all_invoices_amls.ids.index(current_invoice_amls[:1].id) if current_invoice_amls[:1] in all_invoices_amls else 0
previous_amls = all_invoices_amls[:index]
invoiced_qties = current_invoice_amls._get_invoiced_qty_per_product()
invoiced_products = invoiced_qties.keys()
if self.move_type == 'out_invoice':
# filter out the invoices that have been fully refund and re-invoice otherwise, the quantities would be
# consumed by the reversed invoice and won't be print on the new draft invoice
previous_amls = previous_amls.filtered(lambda aml: aml.move_id.payment_state != 'reversed')
previous_qties_invoiced = previous_amls._get_invoiced_qty_per_product()
if self.move_type == 'out_refund':
# we swap the sign because it's a refund, and it would print negative number otherwise
for p in previous_qties_invoiced:
previous_qties_invoiced[p] = -previous_qties_invoiced[p]
for p in invoiced_qties:
invoiced_qties[p] = -invoiced_qties[p]
qties_per_lot = defaultdict(float)
previous_qties_delivered = defaultdict(float)
stock_move_lines = current_invoice_amls.sale_line_ids.move_ids.move_line_ids.filtered(lambda sml: sml.state == 'done' and sml.lot_id).sorted(lambda sml: (sml.date, sml.id))
for sml in stock_move_lines:
if sml.product_id not in invoiced_products or 'customer' not in {sml.location_id.usage, sml.location_dest_id.usage}:
continue
product = sml.product_id
product_uom = product.uom_id
qty_done = sml.product_uom_id._compute_quantity(sml.qty_done, product_uom)
# is it a stock return considering the document type (should it be it thought of as positively or negatively?)
is_stock_return = (
self.move_type == 'out_invoice' and (sml.location_id.usage, sml.location_dest_id.usage) == ('customer', 'internal')
or
self.move_type == 'out_refund' and (sml.location_id.usage, sml.location_dest_id.usage) == ('internal', 'customer')
)
if is_stock_return:
returned_qty = min(qties_per_lot[sml.lot_id], qty_done)
qties_per_lot[sml.lot_id] -= returned_qty
qty_done = returned_qty - qty_done
previous_qty_invoiced = previous_qties_invoiced[product]
previous_qty_delivered = previous_qties_delivered[product]
# If we return more than currently delivered (i.e., qty_done < 0), we remove the surplus
# from the previously delivered (and qty_done becomes zero). If it's a delivery, we first
# try to reach the previous_qty_invoiced
if float_compare(qty_done, 0, precision_rounding=product_uom.rounding) < 0 or \
float_compare(previous_qty_delivered, previous_qty_invoiced, precision_rounding=product_uom.rounding) < 0:
previously_done = qty_done if is_stock_return else min(previous_qty_invoiced - previous_qty_delivered, qty_done)
previous_qties_delivered[product] += previously_done
qty_done -= previously_done
qties_per_lot[sml.lot_id] += qty_done
for lot, qty in qties_per_lot.items():
# access the lot as a superuser in order to avoid an error
# when a user prints an invoice without having the stock access
lot = lot.sudo()
if float_is_zero(invoiced_qties[lot.product_id], precision_rounding=lot.product_uom_id.rounding) \
or float_compare(qty, 0, precision_rounding=lot.product_uom_id.rounding) <= 0:
continue
invoiced_lot_qty = min(qty, invoiced_qties[lot.product_id])
invoiced_qties[lot.product_id] -= invoiced_lot_qty
res.append({
'product_name': lot.product_id.display_name,
'quantity': formatLang(self.env, invoiced_lot_qty, dp='Product Unit of Measure'),
'uom_name': lot.product_uom_id.name,
'lot_name': lot.name,
# The lot id is needed by localizations to inherit the method and add custom fields on the invoice's report.
'lot_id': lot.id,
})
return res
def _get_anglo_saxon_price_ctx(self):
ctx = super()._get_anglo_saxon_price_ctx()
move_is_downpayment = self.invoice_line_ids.filtered(
lambda line: any(line.sale_line_ids.mapped("is_downpayment"))
)
return dict(ctx, move_is_downpayment=move_is_downpayment)
class AccountMoveLine(models.Model):
_inherit = "account.move.line"
def _sale_can_be_reinvoice(self):
self.ensure_one()
return self.move_type != 'entry' and self.display_type != 'cogs' and super(AccountMoveLine, self)._sale_can_be_reinvoice()
def _stock_account_get_anglo_saxon_price_unit(self):
self.ensure_one()
price_unit = super(AccountMoveLine, self)._stock_account_get_anglo_saxon_price_unit()
so_line = self.sale_line_ids and self.sale_line_ids[-1] or False
if so_line:
price_unit = self._deduce_anglo_saxon_unit_price(so_line.invoice_lines.move_id, so_line.move_ids)
return price_unit

View file

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import api, models
class ProductTemplate(models.Model):
_inherit = 'product.template'
@api.depends('type')
def _compute_expense_policy(self):
super()._compute_expense_policy()
self.filtered(lambda t: t.type == 'product').expense_policy = 'no'
@api.depends('type')
def _compute_service_type(self):
super()._compute_service_type()
self.filtered(lambda t: t.type == 'product').service_type = 'manual'

View file

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import fields, models
class company(models.Model):
_inherit = 'res.company'
security_lead = fields.Float(
'Sales Safety Days', default=0.0, required=True,
help="Margin of error for dates promised to customers. "
"Products will be scheduled for procurement and delivery "
"that many days earlier than the actual promised date, to "
"cope with unexpected delays in the supply chain.")

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import api, fields, models
class ResConfigSettings(models.TransientModel):
_inherit = 'res.config.settings'
security_lead = fields.Float(related='company_id.security_lead', string="Security Lead Time", readonly=False)
group_display_incoterm = fields.Boolean("Incoterms", implied_group='sale_stock.group_display_incoterm')
use_security_lead = fields.Boolean(
string="Security Lead Time for Sales",
config_parameter='sale_stock.use_security_lead',
help="Margin of error for dates promised to customers. Products will be scheduled for delivery that many days earlier than the actual promised date, to cope with unexpected delays in the supply chain.")
default_picking_policy = fields.Selection([
('direct', 'Ship products as soon as available, with back orders'),
('one', 'Ship all products at once')
], "Picking Policy", default='direct', default_model="sale.order", required=True)
@api.onchange('use_security_lead')
def _onchange_use_security_lead(self):
if not self.use_security_lead:
self.security_lead = 0.0

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields
class Users(models.Model):
_inherit = ['res.users']
property_warehouse_id = fields.Many2one('stock.warehouse', string='Default Warehouse', company_dependent=True, check_company=True)
def _get_default_warehouse_id(self):
if self.property_warehouse_id:
return self.property_warehouse_id
# !!! Any change to the following search domain should probably
# be also applied in sale_stock/models/sale_order.py/_init_column.
return self.env['stock.warehouse'].search([('company_id', '=', self.env.company.id)], limit=1)
@property
def SELF_READABLE_FIELDS(self):
return super().SELF_READABLE_FIELDS + ['property_warehouse_id']
@property
def SELF_WRITEABLE_FIELDS(self):
return super().SELF_WRITEABLE_FIELDS + ['property_warehouse_id']

View file

@ -0,0 +1,260 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
import logging
from odoo import api, fields, models, _
from odoo.tools import float_compare
_logger = logging.getLogger(__name__)
class SaleOrder(models.Model):
_inherit = "sale.order"
incoterm = fields.Many2one(
'account.incoterms', 'Incoterm',
help="International Commercial Terms are a series of predefined commercial terms used in international transactions.")
incoterm_location = fields.Char(string='Incoterm Location')
picking_policy = fields.Selection([
('direct', 'As soon as possible'),
('one', 'When all products are ready')],
string='Shipping Policy', required=True, readonly=True, default='direct',
states={'draft': [('readonly', False)], 'sent': [('readonly', False)]},
help="If you deliver all products at once, the delivery order will be scheduled based on the greatest "
"product lead time. Otherwise, it will be based on the shortest.")
warehouse_id = fields.Many2one(
'stock.warehouse', string='Warehouse', required=True,
compute='_compute_warehouse_id', store=True, readonly=False, precompute=True,
states={'sale': [('readonly', True)], 'done': [('readonly', True)], 'cancel': [('readonly', False)]},
check_company=True)
picking_ids = fields.One2many('stock.picking', 'sale_id', string='Transfers')
delivery_count = fields.Integer(string='Delivery Orders', compute='_compute_picking_ids')
delivery_status = fields.Selection([
('pending', 'Not Delivered'),
('started', 'Started'),
('partial', 'Partially Delivered'),
('full', 'Fully Delivered'),
], string='Delivery Status', compute='_compute_delivery_status', store=True)
procurement_group_id = fields.Many2one('procurement.group', 'Procurement Group', copy=False)
effective_date = fields.Datetime("Effective Date", compute='_compute_effective_date', store=True, help="Completion date of the first delivery order.")
expected_date = fields.Datetime( help="Delivery date you can promise to the customer, computed from the minimum lead time of "
"the order lines in case of Service products. In case of shipping, the shipping policy of "
"the order will be taken into account to either use the minimum or maximum lead time of "
"the order lines.")
json_popover = fields.Char('JSON data for the popover widget', compute='_compute_json_popover')
show_json_popover = fields.Boolean('Has late picking', compute='_compute_json_popover')
def _init_column(self, column_name):
""" Ensure the default warehouse_id is correctly assigned
At column initialization, the ir.model.fields for res.users.property_warehouse_id isn't created,
which means trying to read the property field to get the default value will crash.
We therefore enforce the default here, without going through
the default function on the warehouse_id field.
"""
if column_name != "warehouse_id":
return super(SaleOrder, self)._init_column(column_name)
default_warehouse = self.env["stock.warehouse"].search([], limit=1)
query = """
UPDATE sale_order so
SET warehouse_id = COALESCE(wh.id, %s)
FROM stock_warehouse wh
WHERE so.company_id = wh.company_id and so.warehouse_id IS NULL AND wh.active
"""
params = [default_warehouse.id]
_logger.debug("Initializing column '%s' in table '%s'", column_name, self._table)
self._cr.execute(query, params)
@api.depends('picking_ids.date_done')
def _compute_effective_date(self):
for order in self:
pickings = order.picking_ids.filtered(lambda x: x.state == 'done' and x.location_dest_id.usage == 'customer')
dates_list = [date for date in pickings.mapped('date_done') if date]
order.effective_date = min(dates_list, default=False)
@api.depends('picking_ids', 'picking_ids.state')
def _compute_delivery_status(self):
for order in self:
if not order.picking_ids or all(p.state == 'cancel' for p in order.picking_ids):
order.delivery_status = False
elif all(p.state in ['done', 'cancel'] for p in order.picking_ids):
order.delivery_status = 'full'
elif any(p.state == 'done' for p in order.picking_ids) and any(
l.qty_delivered for l in order.order_line):
order.delivery_status = 'partial'
elif any(p.state == 'done' for p in order.picking_ids):
order.delivery_status = 'started'
else:
order.delivery_status = 'pending'
@api.depends('picking_policy')
def _compute_expected_date(self):
super(SaleOrder, self)._compute_expected_date()
def _select_expected_date(self, expected_dates):
if self.picking_policy == "direct":
return super()._select_expected_date(expected_dates)
return max(expected_dates)
def write(self, values):
if values.get('order_line') and self.state == 'sale':
for order in self:
pre_order_line_qty = {order_line: order_line.product_uom_qty for order_line in order.mapped('order_line') if not order_line.is_expense}
if values.get('partner_shipping_id'):
new_partner = self.env['res.partner'].browse(values.get('partner_shipping_id'))
for record in self:
picking = record.mapped('picking_ids').filtered(lambda x: x.state not in ('done', 'cancel'))
addresses = (record.partner_shipping_id.display_name, new_partner.display_name)
message = _("""The delivery address has been changed on the Sales Order<br/>
From <strong>"%s"</strong> To <strong>"%s"</strong>,
You should probably update the partner on this document.""") % addresses
picking.activity_schedule('mail.mail_activity_data_warning', note=message, user_id=self.env.user.id)
if 'commitment_date' in values:
# protagate commitment_date as the deadline of the related stock move.
# TODO: Log a note on each down document
deadline_datetime = values.get('commitment_date')
for order in self:
order.order_line.move_ids.date_deadline = deadline_datetime or order.expected_date
res = super(SaleOrder, self).write(values)
if values.get('order_line') and self.state == 'sale':
rounding = self.env['decimal.precision'].precision_get('Product Unit of Measure')
for order in self:
to_log = {}
for order_line in order.order_line:
if order_line.display_type:
continue
if float_compare(order_line.product_uom_qty, pre_order_line_qty.get(order_line, 0.0), precision_rounding=order_line.product_uom.rounding or rounding) < 0:
to_log[order_line] = (order_line.product_uom_qty, pre_order_line_qty.get(order_line, 0.0))
if to_log:
documents = self.env['stock.picking'].sudo()._log_activity_get_documents(to_log, 'move_ids', 'UP')
documents = {k: v for k, v in documents.items() if k[0].state != 'cancel'}
order._log_decrease_ordered_quantity(documents)
return res
def _compute_json_popover(self):
for order in self:
late_stock_picking = order.picking_ids.filtered(lambda p: p.delay_alert_date)
order.json_popover = json.dumps({
'popoverTemplate': 'sale_stock.DelayAlertWidget',
'late_elements': [{
'id': late_move.id,
'name': late_move.display_name,
'model': 'stock.picking',
} for late_move in late_stock_picking
]
})
order.show_json_popover = bool(late_stock_picking)
def _action_confirm(self):
self.order_line._action_launch_stock_rule()
return super(SaleOrder, self)._action_confirm()
@api.depends('picking_ids')
def _compute_picking_ids(self):
for order in self:
order.delivery_count = len(order.picking_ids)
@api.depends('user_id', 'company_id')
def _compute_warehouse_id(self):
for order in self:
default_warehouse_id = self.env['ir.default'].with_company(
order.company_id.id).get_model_defaults('sale.order').get('warehouse_id')
if order.state in ['draft', 'sent'] or not order.ids:
# Should expect empty
if default_warehouse_id is not None:
order.warehouse_id = default_warehouse_id
else:
order.warehouse_id = order.user_id.with_company(order.company_id.id)._get_default_warehouse_id()
@api.onchange('partner_shipping_id')
def _onchange_partner_shipping_id(self):
res = {}
pickings = self.picking_ids.filtered(
lambda p: p.state not in ['done', 'cancel'] and p.partner_id != self.partner_shipping_id
)
if pickings:
res['warning'] = {
'title': _('Warning!'),
'message': _(
'Do not forget to change the partner on the following delivery orders: %s'
) % (','.join(pickings.mapped('name')))
}
return res
def action_view_delivery(self):
return self._get_action_view_picking(self.picking_ids)
def _action_cancel(self):
documents = None
for sale_order in self:
if sale_order.state == 'sale' and sale_order.order_line:
sale_order_lines_quantities = {order_line: (order_line.product_uom_qty, 0) for order_line in sale_order.order_line}
documents = self.env['stock.picking'].with_context(include_draft_documents=True)._log_activity_get_documents(sale_order_lines_quantities, 'move_ids', 'UP')
self.picking_ids.filtered(lambda p: p.state != 'done').action_cancel()
if documents:
filtered_documents = {}
for (parent, responsible), rendering_context in documents.items():
if parent._name == 'stock.picking':
if parent.state == 'cancel':
continue
filtered_documents[(parent, responsible)] = rendering_context
self._log_decrease_ordered_quantity(filtered_documents, cancel=True)
return super()._action_cancel()
def _get_action_view_picking(self, pickings):
'''
This function returns an action that display existing delivery orders
of given sales order ids. It can either be a in a list or in a form
view, if there is only one delivery order to show.
'''
action = self.env["ir.actions.actions"]._for_xml_id("stock.action_picking_tree_all")
if len(pickings) > 1:
action['domain'] = [('id', 'in', pickings.ids)]
elif pickings:
form_view = [(self.env.ref('stock.view_picking_form').id, 'form')]
if 'views' in action:
action['views'] = form_view + [(state,view) for state,view in action['views'] if view != 'form']
else:
action['views'] = form_view
action['res_id'] = pickings.id
# Prepare the context.
picking_id = pickings.filtered(lambda l: l.picking_type_id.code == 'outgoing')
if picking_id:
picking_id = picking_id[0]
else:
picking_id = pickings[0]
action['context'] = dict(self._context, default_partner_id=self.partner_id.id, default_picking_type_id=picking_id.picking_type_id.id, default_origin=self.name, default_group_id=picking_id.group_id.id)
return action
def _prepare_invoice(self):
invoice_vals = super(SaleOrder, self)._prepare_invoice()
invoice_vals['invoice_incoterm_id'] = self.incoterm.id
return invoice_vals
def _log_decrease_ordered_quantity(self, documents, cancel=False):
def _render_note_exception_quantity_so(rendering_context):
order_exceptions, visited_moves = rendering_context
visited_moves = list(visited_moves)
visited_moves = self.env[visited_moves[0]._name].concat(*visited_moves)
order_line_ids = self.env['sale.order.line'].browse([order_line.id for order in order_exceptions.values() for order_line in order[0]])
sale_order_ids = order_line_ids.mapped('order_id')
impacted_pickings = visited_moves.filtered(lambda m: m.state not in ('done', 'cancel')).mapped('picking_id')
values = {
'sale_order_ids': sale_order_ids,
'order_exceptions': order_exceptions.values(),
'impacted_pickings': impacted_pickings,
'cancel': cancel
}
return self.env['ir.qweb']._render('sale_stock.exception_on_so', values)
self.env['stock.picking']._log_activity(_render_note_exception_quantity_so, documents)

View file

@ -0,0 +1,377 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from datetime import timedelta
from collections import defaultdict
from odoo import api, fields, models, _
from odoo.tools import float_compare
from odoo.exceptions import UserError
class SaleOrderLine(models.Model):
_inherit = 'sale.order.line'
qty_delivered_method = fields.Selection(selection_add=[('stock_move', 'Stock Moves')])
route_id = fields.Many2one('stock.route', string='Route', domain=[('sale_selectable', '=', True)], ondelete='restrict', check_company=True)
move_ids = fields.One2many('stock.move', 'sale_line_id', string='Stock Moves')
virtual_available_at_date = fields.Float(compute='_compute_qty_at_date', digits='Product Unit of Measure')
scheduled_date = fields.Datetime(compute='_compute_qty_at_date')
forecast_expected_date = fields.Datetime(compute='_compute_qty_at_date')
free_qty_today = fields.Float(compute='_compute_qty_at_date', digits='Product Unit of Measure')
qty_available_today = fields.Float(compute='_compute_qty_at_date')
warehouse_id = fields.Many2one(related='order_id.warehouse_id')
qty_to_deliver = fields.Float(compute='_compute_qty_to_deliver', digits='Product Unit of Measure')
is_mto = fields.Boolean(compute='_compute_is_mto')
display_qty_widget = fields.Boolean(compute='_compute_qty_to_deliver')
customer_lead = fields.Float(
compute='_compute_customer_lead', store=True, readonly=False, precompute=True,
inverse='_inverse_customer_lead')
@api.depends('product_type', 'product_uom_qty', 'qty_delivered', 'state', 'move_ids', 'product_uom')
def _compute_qty_to_deliver(self):
"""Compute the visibility of the inventory widget."""
for line in self:
line.qty_to_deliver = line.product_uom_qty - line.qty_delivered
if line.state in ('draft', 'sent', 'sale') and line.product_type == 'product' and line.product_uom and line.qty_to_deliver > 0:
if line.state == 'sale' and not line.move_ids:
line.display_qty_widget = False
else:
line.display_qty_widget = True
else:
line.display_qty_widget = False
@api.depends(
'product_id', 'customer_lead', 'product_uom_qty', 'product_uom', 'order_id.commitment_date',
'move_ids', 'move_ids.forecast_expected_date', 'move_ids.forecast_availability')
def _compute_qty_at_date(self):
""" Compute the quantity forecasted of product at delivery date. There are
two cases:
1. The quotation has a commitment_date, we take it as delivery date
2. The quotation hasn't commitment_date, we compute the estimated delivery
date based on lead time"""
treated = self.browse()
all_move_ids = {
move.id
for line in self
if line.state == 'sale'
for move in line.move_ids | self.env['stock.move'].browse(line.move_ids._rollup_move_origs())
if move.product_id == line.product_id
}
all_moves = self.env['stock.move'].browse(all_move_ids)
forecast_expected_date_per_move = dict(all_moves.mapped(lambda m: (m.id, m.forecast_expected_date)))
# If the state is already in sale the picking is created and a simple forecasted quantity isn't enough
# Then used the forecasted data of the related stock.move
for line in self.filtered(lambda l: l.state == 'sale'):
if not line.display_qty_widget:
continue
moves = line.move_ids | self.env['stock.move'].browse(line.move_ids._rollup_move_origs())
moves = moves.filtered(
lambda m: m.product_id == line.product_id and m.state not in ('cancel', 'done'))
line.forecast_expected_date = max(
(
forecast_expected_date_per_move[move.id]
for move in moves
if forecast_expected_date_per_move[move.id]
),
default=False,
)
line.qty_available_today = 0
line.free_qty_today = 0
for move in moves:
line.qty_available_today += move.product_uom._compute_quantity(move.reserved_availability, line.product_uom)
line.free_qty_today += move.product_id.uom_id._compute_quantity(move.forecast_availability, line.product_uom)
line.scheduled_date = line.order_id.commitment_date or line._expected_date()
line.virtual_available_at_date = False
treated |= line
qty_processed_per_product = defaultdict(lambda: 0)
grouped_lines = defaultdict(lambda: self.env['sale.order.line'])
# We first loop over the SO lines to group them by warehouse and schedule
# date in order to batch the read of the quantities computed field.
for line in self.filtered(lambda l: l.state in ('draft', 'sent')):
if not (line.product_id and line.display_qty_widget):
continue
grouped_lines[(line.warehouse_id.id, line.order_id.commitment_date or line._expected_date())] |= line
for (warehouse, scheduled_date), lines in grouped_lines.items():
product_qties = lines.mapped('product_id').with_context(to_date=scheduled_date, warehouse=warehouse).read([
'qty_available',
'free_qty',
'virtual_available',
])
qties_per_product = {
product['id']: (product['qty_available'], product['free_qty'], product['virtual_available'])
for product in product_qties
}
for line in lines:
line.scheduled_date = scheduled_date
qty_available_today, free_qty_today, virtual_available_at_date = qties_per_product[line.product_id.id]
line.qty_available_today = qty_available_today - qty_processed_per_product[line.product_id.id]
line.free_qty_today = free_qty_today - qty_processed_per_product[line.product_id.id]
line.virtual_available_at_date = virtual_available_at_date - qty_processed_per_product[line.product_id.id]
line.forecast_expected_date = False
product_qty = line.product_uom_qty
if line.product_uom and line.product_id.uom_id and line.product_uom != line.product_id.uom_id:
line.qty_available_today = line.product_id.uom_id._compute_quantity(line.qty_available_today, line.product_uom)
line.free_qty_today = line.product_id.uom_id._compute_quantity(line.free_qty_today, line.product_uom)
line.virtual_available_at_date = line.product_id.uom_id._compute_quantity(line.virtual_available_at_date, line.product_uom)
product_qty = line.product_uom._compute_quantity(product_qty, line.product_id.uom_id)
qty_processed_per_product[line.product_id.id] += product_qty
treated |= lines
remaining = (self - treated)
remaining.virtual_available_at_date = False
remaining.scheduled_date = False
remaining.forecast_expected_date = False
remaining.free_qty_today = False
remaining.qty_available_today = False
@api.depends('product_id', 'route_id', 'order_id.warehouse_id', 'product_id.route_ids')
def _compute_is_mto(self):
""" Verify the route of the product based on the warehouse
set 'is_available' at True if the product availability in stock does
not need to be verified, which is the case in MTO, Cross-Dock or Drop-Shipping
"""
self.is_mto = False
for line in self:
if not line.display_qty_widget:
continue
product = line.product_id
product_routes = line.route_id or (product.route_ids + product.categ_id.total_route_ids)
# Check MTO
mto_route = line.order_id.warehouse_id.mto_pull_id.route_id
if not mto_route:
try:
mto_route = self.env['stock.warehouse']._find_global_route('stock.route_warehouse0_mto', _('Make To Order'))
except UserError:
# if route MTO not found in ir_model_data, we treat the product as in MTS
pass
if mto_route and mto_route in product_routes:
line.is_mto = True
else:
line.is_mto = False
@api.depends('product_id')
def _compute_qty_delivered_method(self):
""" Stock module compute delivered qty for product [('type', 'in', ['consu', 'product'])]
For SO line coming from expense, no picking should be generate: we don't manage stock for
those lines, even if the product is a storable.
"""
super(SaleOrderLine, self)._compute_qty_delivered_method()
for line in self:
if not line.is_expense and line.product_id.type in ['consu', 'product']:
line.qty_delivered_method = 'stock_move'
@api.depends('move_ids.state', 'move_ids.scrapped', 'move_ids.quantity_done', 'move_ids.product_uom')
def _compute_qty_delivered(self):
super(SaleOrderLine, self)._compute_qty_delivered()
for line in self: # TODO: maybe one day, this should be done in SQL for performance sake
if line.qty_delivered_method == 'stock_move':
qty = 0.0
outgoing_moves, incoming_moves = line._get_outgoing_incoming_moves()
for move in outgoing_moves:
if move.state != 'done':
continue
qty += move.product_uom._compute_quantity(move.quantity_done, line.product_uom, rounding_method='HALF-UP')
for move in incoming_moves:
if move.state != 'done':
continue
qty -= move.product_uom._compute_quantity(move.quantity_done, line.product_uom, rounding_method='HALF-UP')
line.qty_delivered = qty
@api.model_create_multi
def create(self, vals_list):
lines = super(SaleOrderLine, self).create(vals_list)
lines.filtered(lambda line: line.state == 'sale')._action_launch_stock_rule()
return lines
def write(self, values):
lines = self.env['sale.order.line']
if 'product_uom_qty' in values:
lines = self.filtered(lambda r: r.state == 'sale' and not r.is_expense)
if 'product_packaging_id' in values:
self.move_ids.filtered(
lambda m: m.state not in ['cancel', 'done']
).product_packaging_id = values['product_packaging_id']
previous_product_uom_qty = {line.id: line.product_uom_qty for line in lines}
res = super(SaleOrderLine, self).write(values)
if lines:
lines._action_launch_stock_rule(previous_product_uom_qty)
return res
@api.depends('order_id.state')
def _compute_invoice_status(self):
def check_moves_state(moves):
# All moves states are either 'done' or 'cancel', and there is at least one 'done'
at_least_one_done = False
for move in moves:
if move.state not in ['done', 'cancel']:
return False
at_least_one_done = at_least_one_done or move.state == 'done'
return at_least_one_done
super(SaleOrderLine, self)._compute_invoice_status()
for line in self:
# We handle the following specific situation: a physical product is partially delivered,
# but we would like to set its invoice status to 'Fully Invoiced'. The use case is for
# products sold by weight, where the delivered quantity rarely matches exactly the
# quantity ordered.
if line.order_id.state == 'done'\
and line.invoice_status == 'no'\
and line.product_id.type in ['consu', 'product']\
and line.product_id.invoice_policy == 'delivery'\
and line.move_ids \
and check_moves_state(line.move_ids):
line.invoice_status = 'invoiced'
@api.depends('move_ids')
def _compute_product_updatable(self):
super()._compute_product_updatable()
for line in self:
if line.move_ids.filtered(lambda m: m.state != 'cancel'):
line.product_updatable = False
@api.depends('product_id')
def _compute_customer_lead(self):
super()._compute_customer_lead() # Reset customer_lead when the product is modified
for line in self:
line.customer_lead = line.product_id.sale_delay
def _inverse_customer_lead(self):
for line in self:
if line.state == 'sale' and not line.order_id.commitment_date:
# Propagate deadline on related stock move
line.move_ids.date_deadline = line.order_id.date_order + timedelta(days=line.customer_lead or 0.0)
def _prepare_procurement_values(self, group_id=False):
""" Prepare specific key for moves or other components that will be created from a stock rule
coming from a sale order line. This method could be override in order to add other custom key that could
be used in move/po creation.
"""
values = super(SaleOrderLine, self)._prepare_procurement_values(group_id)
self.ensure_one()
# Use the delivery date if there is else use date_order and lead time
date_deadline = self.order_id.commitment_date or self._expected_date()
date_planned = date_deadline - timedelta(days=self.order_id.company_id.security_lead)
values.update({
'group_id': group_id,
'sale_line_id': self.id,
'date_planned': date_planned,
'date_deadline': date_deadline,
'route_ids': self.route_id,
'warehouse_id': self.order_id.warehouse_id or False,
'partner_id': self.order_id.partner_shipping_id.id,
'product_description_variants': self.with_context(lang=self.order_id.partner_id.lang)._get_sale_order_line_multiline_description_variants(),
'company_id': self.order_id.company_id,
'product_packaging_id': self.product_packaging_id,
'sequence': self.sequence,
})
return values
def _get_qty_procurement(self, previous_product_uom_qty=False):
self.ensure_one()
qty = 0.0
outgoing_moves, incoming_moves = self._get_outgoing_incoming_moves()
for move in outgoing_moves:
qty += move.product_uom._compute_quantity(move.product_uom_qty, self.product_uom, rounding_method='HALF-UP')
for move in incoming_moves:
qty -= move.product_uom._compute_quantity(move.product_uom_qty, self.product_uom, rounding_method='HALF-UP')
return qty
def _get_outgoing_incoming_moves(self):
outgoing_moves_ids = set()
incoming_moves_ids = set()
moves = self.move_ids.filtered(lambda r: r.state != 'cancel' and not r.scrapped and self.product_id == r.product_id)
if self._context.get('accrual_entry_date'):
moves = moves.filtered(lambda r: fields.Date.context_today(r, r.date) <= self._context['accrual_entry_date'])
for move in moves:
if move.location_dest_id.usage == "customer":
if not move.origin_returned_move_id or (move.origin_returned_move_id and move.to_refund):
outgoing_moves_ids.add(move.id)
elif move.location_dest_id.usage != "customer" and move.to_refund:
incoming_moves_ids.add(move.id)
return self.env['stock.move'].browse(outgoing_moves_ids), self.env['stock.move'].browse(incoming_moves_ids)
def _get_procurement_group(self):
return self.order_id.procurement_group_id
def _prepare_procurement_group_vals(self):
return {
'name': self.order_id.name,
'move_type': self.order_id.picking_policy,
'sale_id': self.order_id.id,
'partner_id': self.order_id.partner_shipping_id.id,
}
def _action_launch_stock_rule(self, previous_product_uom_qty=False):
"""
Launch procurement group run method with required/custom fields generated by a
sale order line. procurement group will launch '_run_pull', '_run_buy' or '_run_manufacture'
depending on the sale order line product rule.
"""
if self._context.get("skip_procurement"):
return True
precision = self.env['decimal.precision'].precision_get('Product Unit of Measure')
procurements = []
for line in self:
line = line.with_company(line.company_id)
if line.state != 'sale' or not line.product_id.type in ('consu', 'product'):
continue
qty = line._get_qty_procurement(previous_product_uom_qty)
if float_compare(qty, line.product_uom_qty, precision_digits=precision) == 0:
continue
group_id = line._get_procurement_group()
if not group_id:
group_id = self.env['procurement.group'].create(line._prepare_procurement_group_vals())
line.order_id.procurement_group_id = group_id
else:
# In case the procurement group is already created and the order was
# cancelled, we need to update certain values of the group.
updated_vals = {}
if group_id.partner_id != line.order_id.partner_shipping_id:
updated_vals.update({'partner_id': line.order_id.partner_shipping_id.id})
if group_id.move_type != line.order_id.picking_policy:
updated_vals.update({'move_type': line.order_id.picking_policy})
if updated_vals:
group_id.write(updated_vals)
values = line._prepare_procurement_values(group_id=group_id)
product_qty = line.product_uom_qty - qty
line_uom = line.product_uom
quant_uom = line.product_id.uom_id
product_qty, procurement_uom = line_uom._adjust_uom_quantities(product_qty, quant_uom)
procurements.append(self.env['procurement.group'].Procurement(
line.product_id, product_qty, procurement_uom,
line.order_id.partner_shipping_id.property_stock_customer,
line.product_id.display_name, line.order_id.name, line.order_id.company_id, values))
if procurements:
procurement_group = self.env['procurement.group']
if self.env.context.get('import_file'):
procurement_group = procurement_group.with_context(import_file=False)
procurement_group.run(procurements)
# This next block is currently needed only because the scheduler trigger is done by picking confirmation rather than stock.move confirmation
orders = self.mapped('order_id')
for order in orders:
pickings_to_confirm = order.picking_ids.filtered(lambda p: p.state not in ['cancel', 'done'])
if pickings_to_confirm:
# Trigger the Scheduler for Pickings
pickings_to_confirm.action_confirm()
return True
def _update_line_quantity(self, values):
precision = self.env['decimal.precision'].precision_get('Product Unit of Measure')
line_products = self.filtered(lambda l: l.product_id.type in ['product', 'consu'])
if line_products.mapped('qty_delivered') and float_compare(values['product_uom_qty'], max(line_products.mapped('qty_delivered')), precision_digits=precision) == -1:
raise UserError(_('The ordered quantity of a sale order line cannot be decreased below the amount already delivered. Instead, create a return in your inventory.'))
super(SaleOrderLine, self)._update_line_quantity(values)

View file

@ -0,0 +1,179 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import defaultdict
from odoo import api, fields, models, _
from odoo.tools.sql import column_exists, create_column
class StockRoute(models.Model):
_inherit = "stock.route"
sale_selectable = fields.Boolean("Selectable on Sales Order Line")
class StockMove(models.Model):
_inherit = "stock.move"
sale_line_id = fields.Many2one('sale.order.line', 'Sale Line', index='btree_not_null')
@api.model
def _prepare_merge_moves_distinct_fields(self):
distinct_fields = super(StockMove, self)._prepare_merge_moves_distinct_fields()
distinct_fields.append('sale_line_id')
return distinct_fields
def _get_related_invoices(self):
""" Overridden from stock_account to return the customer invoices
related to this stock move.
"""
rslt = super(StockMove, self)._get_related_invoices()
invoices = self.mapped('picking_id.sale_id.invoice_ids').filtered(lambda x: x.state == 'posted')
rslt += invoices
#rslt += invoices.mapped('reverse_entry_ids')
return rslt
def _get_source_document(self):
res = super()._get_source_document()
return self.sudo().sale_line_id.order_id or res
def _assign_picking_post_process(self, new=False):
super(StockMove, self)._assign_picking_post_process(new=new)
if new:
picking_id = self.mapped('picking_id')
sale_order_ids = self.mapped('sale_line_id.order_id')
for sale_order_id in sale_order_ids:
picking_id.message_post_with_view(
'mail.message_origin_link',
values={'self': picking_id, 'origin': sale_order_id},
subtype_id=self.env.ref('mail.mt_note').id)
def _get_all_related_sm(self, product):
return super()._get_all_related_sm(product) | self.filtered(lambda m: m.sale_line_id.product_id == product)
class ProcurementGroup(models.Model):
_inherit = 'procurement.group'
sale_id = fields.Many2one('sale.order', 'Sale Order')
class StockRule(models.Model):
_inherit = 'stock.rule'
def _get_custom_move_fields(self):
fields = super(StockRule, self)._get_custom_move_fields()
fields += ['sale_line_id', 'partner_id', 'sequence', 'to_refund']
return fields
class StockPicking(models.Model):
_inherit = 'stock.picking'
sale_id = fields.Many2one(related="group_id.sale_id", string="Sales Order", store=True, readonly=False, index='btree_not_null')
def _auto_init(self):
"""
Create related field here, too slow
when computing it afterwards through _compute_related.
Since group_id.sale_id is created in this module,
no need for an UPDATE statement.
"""
if not column_exists(self.env.cr, 'stock_picking', 'sale_id'):
create_column(self.env.cr, 'stock_picking', 'sale_id', 'int4')
return super()._auto_init()
def _action_done(self):
res = super()._action_done()
sale_order_lines_vals = []
for move in self.move_ids:
sale_order = move.picking_id.sale_id
# Creates new SO line only when pickings linked to a sale order and
# for moves with qty. done and not already linked to a SO line.
if not sale_order or move.location_dest_id.usage != 'customer' or move.sale_line_id or not move.quantity_done:
continue
product = move.product_id
so_line_vals = {
'move_ids': [(4, move.id, 0)],
'name': product.display_name,
'order_id': sale_order.id,
'product_id': product.id,
'product_uom_qty': 0,
'qty_delivered': move.quantity_done,
'product_uom': move.product_uom.id,
}
if product.invoice_policy == 'delivery':
# Check if there is already a SO line for this product to get
# back its unit price (in case it was manually updated).
so_line = sale_order.order_line.filtered(lambda sol: sol.product_id == product)
if so_line:
so_line_vals['price_unit'] = so_line[0].price_unit
elif product.invoice_policy == 'order':
# No unit price if the product is invoiced on the ordered qty.
so_line_vals['price_unit'] = 0
sale_order_lines_vals.append(so_line_vals)
if sale_order_lines_vals:
self.env['sale.order.line'].with_context(skip_procurement=True).create(sale_order_lines_vals)
return res
def _log_less_quantities_than_expected(self, moves):
""" Log an activity on sale order that are linked to moves. The
note summarize the real processed quantity and promote a
manual action.
:param dict moves: a dict with a move as key and tuple with
new and old quantity as value. eg: {move_1 : (4, 5)}
"""
def _keys_in_groupby(sale_line):
""" group by order_id and the sale_person on the order """
return (sale_line.order_id, sale_line.order_id.user_id)
def _render_note_exception_quantity(moves_information):
""" Generate a note with the picking on which the action
occurred and a summary on impacted quantity that are
related to the sale order where the note will be logged.
:param moves_information dict:
{'move_id': ['sale_order_line_id', (new_qty, old_qty)], ..}
:return: an html string with all the information encoded.
:rtype: str
"""
origin_moves = self.env['stock.move'].browse([move.id for move_orig in moves_information.values() for move in move_orig[0]])
origin_picking = origin_moves.mapped('picking_id')
values = {
'origin_moves': origin_moves,
'origin_picking': origin_picking,
'moves_information': moves_information.values(),
}
return self.env['ir.qweb']._render('sale_stock.exception_on_picking', values)
documents = self.sudo()._log_activity_get_documents(moves, 'sale_line_id', 'DOWN', _keys_in_groupby)
self._log_activity(_render_note_exception_quantity, documents)
return super(StockPicking, self)._log_less_quantities_than_expected(moves)
class StockLot(models.Model):
_inherit = 'stock.lot'
sale_order_ids = fields.Many2many('sale.order', string="Sales Orders", compute='_compute_sale_order_ids')
sale_order_count = fields.Integer('Sale order count', compute='_compute_sale_order_ids')
@api.depends('name')
def _compute_sale_order_ids(self):
sale_orders = defaultdict(lambda: self.env['sale.order'])
for move_line in self.env['stock.move.line'].search([('lot_id', 'in', self.ids), ('state', '=', 'done')]):
move = move_line.move_id
if move.picking_id.location_dest_id.usage == 'customer' and move.sale_line_id.order_id:
sale_orders[move_line.lot_id.id] |= move.sale_line_id.order_id
for lot in self:
lot.sale_order_ids = sale_orders[lot.id]
lot.sale_order_count = len(lot.sale_order_ids)
def action_view_so(self):
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id("sale.action_orders")
action['domain'] = [('id', 'in', self.mapped('sale_order_ids.id'))]
action['context'] = dict(self._context, create=False)
return action