oca-workflow-process/odoo-bringout-oca-purchase-workflow-purchase_merge/purchase_merge/wizard/purchase_merge.py

230 lines
8.2 KiB
Python

# Copyright 2022 Camptocamp SA
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from openupgradelib import openupgrade_merge_records
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
class MergePurchaseAutomatic(models.TransientModel):
"""
The idea behind this wizard is to create a list of potential purchases
to merge. We use two objects, the first one is the wizard for
the end-user. And the second will contain the purchase list to merge.
"""
_name = "purchase.merge.automatic.wizard"
_description = "Purchase Merge Automatic Wizard"
purchase_ids = fields.Many2many(
comodel_name="purchase.order",
)
dst_purchase_id = fields.Many2one(
comodel_name="purchase.order",
string="Destination",
)
@api.model
def default_get(self, fields_list):
res = super(MergePurchaseAutomatic, self).default_get(fields_list)
active_ids = self.env.context.get("active_ids")
purchase_orders = self.purchase_ids.browse(active_ids)
self._check_all_values(purchase_orders)
if (
self.env.context.get("active_model") == "purchase.order"
and len(active_ids) >= 1
):
res["purchase_ids"] = [(6, 0, active_ids)]
res["dst_purchase_id"] = self._get_ordered_purchase(active_ids)[-1].id
return res
# ----------------------------------------
# Check method
# ----------------------------------------
def _check_all_values(self, purchase_orders):
"""Contain all check method"""
self._check_state(purchase_orders)
self._check_content(purchase_orders)
def _check_state(self, purchase_orders):
non_draft_po = purchase_orders.filtered(lambda p: p.state != "draft")
if non_draft_po:
po_names = non_draft_po.mapped("name")
raise ValidationError(
_(
"You can't merge purchase orders that aren't in draft state like: {}"
).format(po_names)
)
def _check_content(self, purchase_orders):
error_messages = []
currencies = purchase_orders.currency_id
if len(currencies) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different currencies: %s",
", ".join(currencies.mapped("name")),
)
)
picking_types = purchase_orders.picking_type_id
if len(picking_types) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different picking types: %s",
", ".join(picking_types.mapped("name")),
)
)
incoterms = purchase_orders.incoterm_id
if len(incoterms) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different incoterms: %s",
", ".join(incoterms.mapped("name")),
)
)
payment_terms = purchase_orders.payment_term_id
if len(payment_terms) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different payment terms: %s",
", ".join(payment_terms.mapped("name")),
)
)
fiscal_positions = purchase_orders.fiscal_position_id
if len(fiscal_positions) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different fiscal positions: %s",
", ".join(fiscal_positions.mapped("name")),
)
)
suppliers = purchase_orders.partner_id
if len(suppliers) > 1:
error_messages.append(
_(
"You can't merge purchase orders with different suppliers: %s",
", ".join(suppliers.mapped("name")),
)
)
if error_messages:
raise ValidationError("\n".join(error_messages))
# ----------------------------------------
# Update method
# ----------------------------------------
@api.model
def _update_values(self, src_purchase, dst_purchase):
"""Update values of dst_purchase with the ones from the src_purchase.
:param src_purchase : recordset of source purchase.order
:param dst_purchase : record of destination purchase.order
"""
# merge all order lines + set origin and partner_ref
dst_purchase.write(self._get_update_values(src_purchase, dst_purchase))
for po in src_purchase:
self._add_message("to", [dst_purchase.name], po)
po_names = src_purchase.mapped("name")
self._add_message("from", po_names, dst_purchase)
@api.model
def _get_update_values(self, src_purchase, dst_purchase):
"""Generate values of dst_purchase with the ones from the src_purchase.
:param src_purchase : recordset of source purchase.order
:param dst_purchase : record of destination purchase.order
"""
# initialize destination origin and partner_ref
origin = {dst_purchase.origin or ""}
origin.update({x.origin for x in src_purchase if x.origin})
partner_ref = {dst_purchase.partner_ref or ""}
partner_ref.update({x.partner_ref for x in src_purchase if x.partner_ref})
# Generate destination origin and partner_ref
src_order_line = src_purchase.mapped("order_line")
return {
"order_line": [(4, line, 0) for line in src_order_line.ids],
"origin": ", ".join(origin),
"partner_ref": ", ".join(partner_ref),
}
def _add_message(self, way, po_name, po):
"""Send a message post with to advise the po about the merge.
:param way : choice between 'from' or 'to'
:param po_name : list of purchase order name to add in the body
:param po_name : the po where the message will be posted
"""
subject = "Merge purchase order"
body = _(
"This purchase order lines have been merged %(way)s : %(po_names)s",
way=way,
po_names=" ,".join(po_name),
)
po.message_post(body=body, subject=subject, content_subtype="plaintext")
def _merge(self, purchases, dst_purchase=None):
"""private implementation of merge purchase
:param purchases : ids of purchase to merge
:param dst_purchase : record of destination purchase.order
"""
if len(purchases) < 2:
return
record_ids = purchases - dst_purchase
openupgrade_merge_records.merge_records(
env=self.env,
model_name=self._name,
record_ids=record_ids.ids,
target_record_id=dst_purchase.id,
)
self._check_all_values(purchases)
# remove dst_purchase from purchases to merge
if dst_purchase and dst_purchase in purchases:
src_purchase = purchases - dst_purchase
else:
dst_purchase = self.purchase_ids[-1]
src_purchase = self.purchase_ids[:-1]
# call sub methods to do the merge
self._update_values(src_purchase, dst_purchase)
# cancel source purchase, since they are merged
src_purchase.button_cancel()
# ----------------------------------------
# Helpers
# ----------------------------------------
@api.model
def _get_ordered_purchase(self, purchase_ids):
"""Helper returns a `purchase.order` recordset ordered by create_date
:param purchase_ids : list of purchase ids to sort
"""
return (
self.env["purchase.order"]
.browse(purchase_ids)
.sorted(
key=lambda p: (p.create_date or ""),
reverse=True,
)
)
# ----------------------------------------
# Actions
# ----------------------------------------
def action_merge(self):
"""Merge Quotation button. Merge the selected purchases."""
if not self.purchase_ids:
return False
self._merge(self.purchase_ids, self.dst_purchase_id)
return True