Initial commit: Accounting packages

This commit is contained in:
Ernad Husremovic 2025-08-29 15:20:47 +02:00
commit 4ef34c2317
2661 changed files with 1709616 additions and 0 deletions

View file

@ -0,0 +1,11 @@
# -*- encoding: utf-8 -*-
from . import account_move
from . import account_journal
from . import account_edi_format
from . import account_edi_document
from . import account_payment
from . import ir_actions_report
from . import mail_template
from . import ir_attachment
from . import uom

View file

@ -0,0 +1,284 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
from odoo.exceptions import UserError
from psycopg2 import OperationalError
import base64
import logging
from collections import defaultdict
_logger = logging.getLogger(__name__)
DEFAULT_BLOCKING_LEVEL = 'error'
class AccountEdiDocument(models.Model):
_name = 'account.edi.document'
_description = 'Electronic Document for an account.move'
# == Stored fields ==
move_id = fields.Many2one('account.move', required=True, ondelete='cascade', index=True)
edi_format_id = fields.Many2one('account.edi.format', required=True)
attachment_id = fields.Many2one(
comodel_name='ir.attachment',
groups='base.group_system',
help="The file generated by edi_format_id when the invoice is posted (and this document is processed).",
)
state = fields.Selection([('to_send', 'To Send'), ('sent', 'Sent'), ('to_cancel', 'To Cancel'), ('cancelled', 'Cancelled')])
error = fields.Html(help='The text of the last error that happened during Electronic Invoice operation.')
blocking_level = fields.Selection(
selection=[('info', 'Info'), ('warning', 'Warning'), ('error', 'Error')],
help="Blocks the current operation of the document depending on the error severity:\n"
" * Info: the document is not blocked and everything is working as it should.\n"
" * Warning: there is an error that doesn't prevent the current Electronic Invoicing operation to succeed.\n"
" * Error: there is an error that blocks the current Electronic Invoicing operation.")
# == Not stored fields ==
name = fields.Char(related='attachment_id.name')
edi_format_name = fields.Char(string='Format Name', related='edi_format_id.name')
edi_content = fields.Binary(compute='_compute_edi_content', compute_sudo=True)
_sql_constraints = [
(
'unique_edi_document_by_move_by_format',
'UNIQUE(edi_format_id, move_id)',
'Only one edi document by move by format',
),
]
@api.depends('move_id', 'error', 'state')
def _compute_edi_content(self):
for doc in self:
res = b''
if doc.state in ('to_send', 'to_cancel'):
move = doc.move_id
config_errors = doc.edi_format_id._check_move_configuration(move)
if config_errors:
res = base64.b64encode('\n'.join(config_errors).encode('UTF-8'))
else:
move_applicability = doc.edi_format_id._get_move_applicability(move)
if move_applicability and move_applicability.get('edi_content'):
res = base64.b64encode(move_applicability['edi_content'](move))
doc.edi_content = res
def action_export_xml(self):
self.ensure_one()
return {
'type': 'ir.actions.act_url',
'url': '/web/content/account.edi.document/%s/edi_content' % self.id
}
def _prepare_jobs(self):
"""Creates a list of jobs to be performed by '_process_job' for the documents in self.
Each document represent a job, BUT if multiple documents have the same state, edi_format_id,
doc_type (invoice or payment) and company_id AND the edi_format_id supports batching, they are grouped
into a single job.
:returns: [{
'documents': account.edi.document,
'method_to_call': str,
}]
"""
# Classify jobs by (edi_format, edi_doc.state, doc_type, move.company_id, custom_key)
to_process = {}
for state, edi_flow in (('to_send', 'post'), ('to_cancel', 'cancel')):
documents = self.filtered(lambda d: d.state == state and d.blocking_level != 'error')
# This is a dict that for each batching key gives the value that should be appended to the batching key,
# in order to enforce a limit of documents per batch.
batching_limit_keys = defaultdict(lambda: 0)
for edi_doc in documents:
edi_format = edi_doc.edi_format_id
move = edi_doc.move_id
move_applicability = edi_doc.edi_format_id._get_move_applicability(move) or {}
batching_key = [edi_format, state, move.company_id]
custom_batching_key = f'{edi_flow}_batching'
if move_applicability.get(custom_batching_key):
batching_key += list(move_applicability[custom_batching_key](move))
else:
batching_key.append(move.id)
if move_applicability.get('batching_limit'):
batching_key_without_limit = tuple(batching_key)
batching_key.append(batching_limit_keys[batching_key_without_limit])
batch = to_process.setdefault(tuple(batching_key), {
'documents': self.env['account.edi.document'],
'method_to_call': move_applicability.get(edi_flow),
})
batch['documents'] |= edi_doc
if move_applicability.get('batching_limit') and len(batch['documents']) >= move_applicability['batching_limit']:
batching_limit_keys[batching_key_without_limit] += 1
return list(to_process.values())
@api.model
def _process_job(self, job):
"""Post or cancel move_id (invoice or payment) by calling the related methods on edi_format_id.
Invoices are processed before payments.
:param job: {
'documents': account.edi.document,
'method_to_call': str,
}
"""
def _postprocess_post_edi_results(documents, edi_result):
attachments_to_unlink = self.env['ir.attachment']
for document in documents:
move = document.move_id
move_result = edi_result.get(move, {})
if move_result.get('attachment'):
old_attachment = document.sudo().attachment_id
document.sudo().attachment_id = move_result['attachment']
if not old_attachment.res_model or not old_attachment.res_id:
attachments_to_unlink |= old_attachment
if move_result.get('success') is True:
document.write({
'state': 'sent',
'error': False,
'blocking_level': False,
})
if move.is_invoice(include_receipts=True):
reconciled_lines = move.line_ids.filtered(lambda line: line.account_id.account_type in ('asset_receivable', 'liability_payable'))
reconciled_amls = reconciled_lines.mapped('matched_debit_ids.debit_move_id') \
| reconciled_lines.mapped('matched_credit_ids.credit_move_id')
reconciled_amls.move_id._update_payments_edi_documents()
else:
document.write({
'error': move_result.get('error', False),
'blocking_level': move_result.get('blocking_level', DEFAULT_BLOCKING_LEVEL) if 'error' in move_result else False,
})
# Attachments that are not explicitly linked to a business model could be removed because they are not
# supposed to have any traceability from the user.
attachments_to_unlink.sudo().unlink()
def _postprocess_cancel_edi_results(documents, edi_result):
move_ids_to_cancel = set() # Avoid duplicates
attachments_to_unlink = self.env['ir.attachment']
for document in documents:
move = document.move_id
move_result = edi_result.get(move, {})
if move_result.get('success') is True:
old_attachment = document.sudo().attachment_id
document.sudo().write({
'state': 'cancelled',
'error': False,
'attachment_id': False,
'blocking_level': False,
})
if move.state == 'posted' and all(
doc.state == 'cancelled'
or not doc.edi_format_id._needs_web_services()
for doc in move.edi_document_ids
):
# The user requested a cancellation of the EDI and it has been approved. Then, the invoice
# can be safely cancelled.
move_ids_to_cancel.add(move.id)
if not old_attachment.res_model or not old_attachment.res_id:
attachments_to_unlink |= old_attachment
else:
document.write({
'error': move_result.get('error', False),
'blocking_level': move_result.get('blocking_level', DEFAULT_BLOCKING_LEVEL) if move_result.get('error') else False,
})
if move_ids_to_cancel:
invoices = self.env['account.move'].browse(list(move_ids_to_cancel))
invoices.button_draft()
invoices.button_cancel()
# Attachments that are not explicitly linked to a business model could be removed because they are not
# supposed to have any traceability from the user.
attachments_to_unlink.sudo().unlink()
documents = job['documents']
if job['method_to_call']:
method_to_call = job['method_to_call']
else:
method_to_call = lambda moves: {move: {'success': True} for move in moves}
documents.edi_format_id.ensure_one() # All account.edi.document of a job should have the same edi_format_id
documents.move_id.company_id.ensure_one() # All account.edi.document of a job should be from the same company
if len(set(doc.state for doc in documents)) != 1:
raise ValueError('All account.edi.document of a job should have the same state')
state = documents[0].state
documents.move_id.line_ids.flush_recordset() # manual flush for tax details
moves = documents.move_id
if state == 'to_send':
if all(move.is_invoice(include_receipts=True) for move in moves):
with moves._send_only_when_ready():
edi_result = method_to_call(moves)
else:
edi_result = method_to_call(moves)
_postprocess_post_edi_results(documents, edi_result)
elif state == 'to_cancel':
edi_result = method_to_call(moves)
_postprocess_cancel_edi_results(documents, edi_result)
def _process_documents_no_web_services(self):
""" Post and cancel all the documents that don't need a web service.
"""
jobs = self.filtered(lambda d: not d.edi_format_id._needs_web_services())._prepare_jobs()
for job in jobs:
self._process_job(job)
def _process_documents_web_services(self, job_count=None, with_commit=True):
''' Post and cancel all the documents that need a web service.
:param job_count: The maximum number of jobs to process if specified.
:param with_commit: Flag indicating a commit should be made between each job.
:return: The number of remaining jobs to process.
'''
all_jobs = self.filtered(lambda d: d.edi_format_id._needs_web_services())._prepare_jobs()
jobs_to_process = all_jobs[0:job_count] if job_count else all_jobs
for job in jobs_to_process:
documents = job['documents']
move_to_lock = documents.move_id
attachments_potential_unlink = documents.sudo().attachment_id.filtered(lambda a: not a.res_model and not a.res_id)
try:
with self.env.cr.savepoint(flush=False):
self._cr.execute('SELECT * FROM account_edi_document WHERE id IN %s FOR UPDATE NOWAIT', [tuple(documents.ids)])
self._cr.execute('SELECT * FROM account_move WHERE id IN %s FOR UPDATE NOWAIT', [tuple(move_to_lock.ids)])
# Locks the attachments that might be unlinked
if attachments_potential_unlink:
self._cr.execute('SELECT * FROM ir_attachment WHERE id IN %s FOR UPDATE NOWAIT', [tuple(attachments_potential_unlink.ids)])
except OperationalError as e:
if e.pgcode == '55P03':
_logger.debug('Another transaction already locked documents rows. Cannot process documents.')
if not with_commit:
raise UserError(_('This document is being sent by another process already. '))
continue
else:
raise e
self._process_job(job)
if with_commit and len(jobs_to_process) > 1:
self.env.cr.commit()
return len(all_jobs) - len(jobs_to_process)
@api.model
def _cron_process_documents_web_services(self, job_count=None):
''' Method called by the EDI cron processing all web-services.
:param job_count: Limit explicitely the number of web service calls. If not provided, process all.
'''
edi_documents = self.search([
('state', 'in', ('to_send', 'to_cancel')),
('move_id.state', '=', 'posted'),
('blocking_level', '!=', 'error'),
])
nb_remaining_jobs = edi_documents._process_documents_web_services(job_count=job_count)
# Mark the CRON to be triggered again asap since there is some remaining jobs to process.
if nb_remaining_jobs > 0:
self.env.ref('account_edi.ir_cron_edi_network')._trigger()

View file

@ -0,0 +1,576 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
from odoo.tools.pdf import OdooPdfFileReader
from odoo.osv import expression
from odoo.tools import html_escape
from odoo.exceptions import RedirectWarning
try:
from PyPDF2.errors import PdfReadError
except ImportError:
from PyPDF2.utils import PdfReadError
from lxml import etree
from struct import error as StructError
import base64
import io
import logging
import pathlib
import re
_logger = logging.getLogger(__name__)
class AccountEdiFormat(models.Model):
_name = 'account.edi.format'
_description = 'EDI format'
name = fields.Char()
code = fields.Char(required=True)
_sql_constraints = [
('unique_code', 'unique (code)', 'This code already exists')
]
####################################################
# Low-level methods
####################################################
@api.model_create_multi
def create(self, vals_list):
edi_formats = super().create(vals_list)
if not edi_formats:
return edi_formats
# activate by default on journal
if not self.pool.loaded:
# The registry is not totally loaded. We cannot yet recompute the field on jourals as
# The helper methods aren't yet overwritten by all installed `l10n_` modules.
# Delay it in the register hook
self.pool._delay_compute_edi_format_ids = True
else:
journals = self.env['account.journal'].search([])
journals._compute_edi_format_ids()
# activate cron
if any(edi_format._needs_web_services() for edi_format in edi_formats):
self.env.ref('account_edi.ir_cron_edi_network').active = True
return edi_formats
def _register_hook(self):
if hasattr(self.pool, "_delay_compute_edi_format_ids"):
del self.pool._delay_compute_edi_format_ids
journals = self.env['account.journal'].search([])
journals._compute_edi_format_ids()
return super()._register_hook()
####################################################
# Export method to override based on EDI Format
####################################################
def _get_move_applicability(self, move):
""" Core function for the EDI processing: it first checks whether the EDI format is applicable on a given
move, if so, it then returns a dictionary containing the functions to call for this move.
:return: dict mapping str to function (callable)
* post: function called for edi.documents with state 'to_send' (post flow)
* cancel: function called for edi.documents with state 'to_cancel' (cancel flow)
* post_batching: function returning the batching key for the post flow
* cancel_batching: function returning the batching key for the cancel flow
* edi_content: function called when computing the edi_content for an edi.document
"""
self.ensure_one()
def _needs_web_services(self):
""" Indicate if the EDI must be generated asynchronously through to some web services.
:return: True if such a web service is available, False otherwise.
"""
self.ensure_one()
return False
def _is_compatible_with_journal(self, journal):
""" Indicate if the EDI format should appear on the journal passed as parameter to be selected by the user.
If True, this EDI format will appear on the journal.
:param journal: The journal.
:returns: True if this format can appear on the journal, False otherwise.
"""
# TO OVERRIDE
self.ensure_one()
return journal.type == 'sale'
def _is_enabled_by_default_on_journal(self, journal):
""" Indicate if the EDI format should be selected by default on the journal passed as parameter.
If True, this EDI format will be selected by default on the journal.
:param journal: The journal.
:returns: True if this format should be enabled by default on the journal, False otherwise.
"""
return True
def _check_move_configuration(self, move):
""" Checks the move and relevant records for potential error (missing data, etc).
:param move: The move to check.
:returns: A list of error messages.
"""
# TO OVERRIDE
return []
####################################################
# Import methods to override based on EDI Format
####################################################
def _create_invoice_from_xml_tree(self, filename, tree, journal=None):
""" Create a new invoice with the data inside the xml.
:param filename: The name of the xml.
:param tree: The tree of the xml to import.
:param journal: The journal on which importing the invoice.
:returns: The created invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _update_invoice_from_xml_tree(self, filename, tree, invoice):
""" Update an existing invoice with the data inside the xml.
:param filename: The name of the xml.
:param tree: The tree of the xml to import.
:param invoice: The invoice to update.
:returns: The updated invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _create_invoice_from_pdf_reader(self, filename, reader):
""" Create a new invoice with the data inside a pdf.
:param filename: The name of the pdf.
:param reader: The OdooPdfFileReader of the pdf to import.
:returns: The created invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _update_invoice_from_pdf_reader(self, filename, reader, invoice):
""" Update an existing invoice with the data inside the pdf.
:param filename: The name of the pdf.
:param reader: The OdooPdfFileReader of the pdf to import.
:param invoice: The invoice to update.
:returns: The updated invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _create_invoice_from_binary(self, filename, content, extension):
""" Create a new invoice with the data inside a binary file.
:param filename: The name of the file.
:param content: The content of the binary file.
:param extension: The extensions as a string.
:returns: The created invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _update_invoice_from_binary(self, filename, content, extension, invoice):
""" Update an existing invoice with the data inside a binary file.
:param filename: The name of the file.
:param content: The content of the binary file.
:param extension: The extensions as a string.
:param invoice: The invoice to update.
:returns: The updated invoice.
"""
# TO OVERRIDE
self.ensure_one()
return self.env['account.move']
def _prepare_invoice_report(self, pdf_writer, edi_document):
"""
Prepare invoice report to be printed.
:param pdf_writer: The pdf writer with the invoice pdf content loaded.
:param edi_document: The edi document to be added to the pdf file.
"""
# TO OVERRIDE
self.ensure_one()
####################################################
# Import Internal methods (not meant to be overridden)
####################################################
def _decode_xml(self, filename, content):
"""Decodes an xml into a list of one dictionary representing an attachment.
:param filename: The name of the xml.
:param content: The bytes representing the xml.
:returns: A list with a dictionary.
* filename: The name of the attachment.
* content: The content of the attachment.
* type: The type of the attachment.
* xml_tree: The tree of the xml if type is xml.
"""
to_process = []
try:
xml_tree = etree.fromstring(content)
except Exception as e:
_logger.exception("Error when converting the xml content to etree: %s" % e)
return to_process
if len(xml_tree):
to_process.append({
'filename': filename,
'content': content,
'type': 'xml',
'xml_tree': xml_tree,
})
return to_process
def _decode_pdf(self, filename, content):
"""Decodes a pdf and unwrap sub-attachment into a list of dictionary each representing an attachment.
:param filename: The name of the pdf.
:param content: The bytes representing the pdf.
:returns: A list of dictionary for each attachment.
* filename: The name of the attachment.
* content: The content of the attachment.
* type: The type of the attachment.
* xml_tree: The tree of the xml if type is xml.
* pdf_reader: The pdf_reader if type is pdf.
"""
to_process = []
try:
buffer = io.BytesIO(content)
pdf_reader = OdooPdfFileReader(buffer, strict=False)
except Exception as e:
# Malformed pdf
_logger.warning("Error when reading the pdf: %s", e, exc_info=True)
return to_process
# Process embedded files.
try:
for xml_name, content in pdf_reader.getAttachments():
to_process.extend(self._decode_xml(xml_name, content))
except (NotImplementedError, StructError, PdfReadError) as e:
_logger.warning("Unable to access the attachments of %s. Tried to decrypt it, but %s." % (filename, e))
# Process the pdf itself.
to_process.append({
'filename': filename,
'content': content,
'type': 'pdf',
'pdf_reader': pdf_reader,
})
return to_process
def _decode_binary(self, filename, content):
"""Decodes any file into a list of one dictionary representing an attachment.
This is a fallback for all files that are not decoded by other methods.
:param filename: The name of the file.
:param content: The bytes representing the file.
:returns: A list with a dictionary.
* filename: The name of the attachment.
* content: The content of the attachment.
* type: The type of the attachment.
"""
return [{
'filename': filename,
'extension': ''.join(pathlib.Path(filename).suffixes),
'content': content,
'type': 'binary',
}]
def _decode_attachment(self, attachment):
"""Decodes an ir.attachment and unwrap sub-attachment into a list of dictionary each representing an attachment.
:param attachment: An ir.attachment record.
:returns: A list of dictionary for each attachment.
* filename: The name of the attachment.
* content: The content of the attachment.
* type: The type of the attachment.
* xml_tree: The tree of the xml if type is xml.
* pdf_reader: The pdf_reader if type is pdf.
"""
content = base64.b64decode(attachment.with_context(bin_size=False).datas)
to_process = []
# XML attachments received by mail have a 'text/plain' mimetype (cfr. context key: 'attachments_mime_plainxml')
# Therefore, if content start with '<?xml', or if the filename ends with '.xml', it is considered as XML.
is_text_plain_xml = 'text/plain' in attachment.mimetype and (content.startswith(b'<?xml') or attachment.name.endswith('.xml'))
if 'pdf' in attachment.mimetype:
to_process.extend(self._decode_pdf(attachment.name, content))
elif attachment.mimetype.endswith('/xml') or is_text_plain_xml:
to_process.extend(self._decode_xml(attachment.name, content))
else:
to_process.extend(self._decode_binary(attachment.name, content))
return to_process
def _create_document_from_attachment(self, attachment):
"""Decodes an ir.attachment to create an invoice.
:param attachment: An ir.attachment record.
:returns: The invoice where to import data.
"""
for file_data in self._decode_attachment(attachment):
for edi_format in self:
res = False
try:
if file_data['type'] == 'xml':
res = edi_format.with_company(self.env.company)._create_invoice_from_xml_tree(file_data['filename'], file_data['xml_tree'])
elif file_data['type'] == 'pdf':
res = edi_format.with_company(self.env.company)._create_invoice_from_pdf_reader(file_data['filename'], file_data['pdf_reader'])
file_data['pdf_reader'].stream.close()
else:
res = edi_format._create_invoice_from_binary(file_data['filename'], file_data['content'], file_data['extension'])
except RedirectWarning as rw:
raise rw
except Exception as e:
_logger.exception(
"Error importing attachment \"%s\" as invoice with format \"%s\": %s",
file_data['filename'],
edi_format.name,
str(e))
if res:
return res._link_invoice_origin_to_purchase_orders(timeout=4)
return self.env['account.move']
def _update_invoice_from_attachment(self, attachment, invoice):
"""Decodes an ir.attachment to update an invoice.
:param attachment: An ir.attachment record.
:returns: The invoice where to import data.
"""
for file_data in self._decode_attachment(attachment):
for edi_format in self:
res = False
try:
if file_data['type'] == 'xml':
res = edi_format.with_company(invoice.company_id)._update_invoice_from_xml_tree(file_data['filename'], file_data['xml_tree'], invoice)
elif file_data['type'] == 'pdf':
res = edi_format.with_company(invoice.company_id)._update_invoice_from_pdf_reader(file_data['filename'], file_data['pdf_reader'], invoice)
file_data['pdf_reader'].stream.close()
else: # file_data['type'] == 'binary'
res = edi_format._update_invoice_from_binary(file_data['filename'], file_data['content'], file_data['extension'], invoice)
except Exception as e:
_logger.exception(
"Error importing attachment \"%s\" as invoice with format \"%s\": %s",
file_data['filename'],
edi_format.name,
str(e))
if res:
return res._link_invoice_origin_to_purchase_orders(timeout=4)
return self.env['account.move']
####################################################
# Import helpers
####################################################
def _find_value(self, xpath, xml_element, namespaces=None):
element = xml_element.xpath(xpath, namespaces=namespaces)
return element[0].text if element else None
@api.model
def _retrieve_partner_with_vat(self, vat, extra_domain):
if not vat:
return None
# Sometimes, the vat is specified with some whitespaces.
normalized_vat = vat.replace(' ', '')
country_prefix = re.match('^[a-zA-Z]{2}|^', vat).group()
partner = self.env['res.partner'].search(extra_domain + [('vat', 'in', (normalized_vat, vat))], limit=1)
# Try to remove the country code prefix from the vat.
if not partner and country_prefix:
partner = self.env['res.partner'].search(extra_domain + [
('vat', 'in', (normalized_vat[2:], vat[2:])),
('country_id.code', '=', country_prefix.upper()),
], limit=1)
# The country could be not specified on the partner.
if not partner:
partner = self.env['res.partner'].search(extra_domain + [
('vat', 'in', (normalized_vat[2:], vat[2:])),
('country_id', '=', False),
], limit=1)
# The vat could be a string of alphanumeric values without country code but with missing zeros at the
# beginning.
if not partner:
try:
vat_only_numeric = str(int(re.sub(r'^\D{2}', '', normalized_vat) or 0))
except ValueError:
vat_only_numeric = None
if vat_only_numeric:
query = self.env['res.partner']._where_calc(extra_domain + [('active', '=', True)])
tables, where_clause, where_params = query.get_sql()
if country_prefix:
vat_prefix_regex = f'({country_prefix})?'
else:
vat_prefix_regex = '([A-z]{2})?'
self._cr.execute(f'''
SELECT res_partner.id
FROM {tables}
WHERE {where_clause}
AND res_partner.vat ~ %s
LIMIT 1
''', where_params + ['^%s0*%s$' % (vat_prefix_regex, vat_only_numeric)])
partner_row = self._cr.fetchone()
if partner_row:
partner = self.env['res.partner'].browse(partner_row[0])
return partner
@api.model
def _retrieve_partner_with_phone_mail(self, phone, mail, extra_domain):
domains = []
if phone:
domains.append([('phone', '=', phone)])
domains.append([('mobile', '=', phone)])
if mail:
domains.append([('email', '=', mail)])
if not domains:
return None
domain = expression.OR(domains)
if extra_domain:
domain = expression.AND([domain, extra_domain])
return self.env['res.partner'].search(domain, limit=1)
@api.model
def _retrieve_partner_with_name(self, name, extra_domain):
if not name:
return None
return self.env['res.partner'].search([('name', 'ilike', name)] + extra_domain, limit=1)
def _retrieve_partner(self, name=None, phone=None, mail=None, vat=None, domain=None):
'''Search all partners and find one that matches one of the parameters.
:param name: The name of the partner.
:param phone: The phone or mobile of the partner.
:param mail: The mail of the partner.
:param vat: The vat number of the partner.
:returns: A partner or an empty recordset if not found.
'''
def search_with_vat(extra_domain):
return self._retrieve_partner_with_vat(vat, extra_domain)
def search_with_phone_mail(extra_domain):
return self._retrieve_partner_with_phone_mail(phone, mail, extra_domain)
def search_with_name(extra_domain):
return self._retrieve_partner_with_name(name, extra_domain)
def search_with_domain(extra_domain):
if not domain:
return None
return self.env['res.partner'].search(domain + extra_domain, limit=1)
for search_method in (search_with_vat, search_with_domain, search_with_phone_mail, search_with_name):
for extra_domain in ([('company_id', '=', self.env.company.id)], [('company_id', '=', False)]):
partner = search_method(extra_domain)
if partner:
return partner
return self.env['res.partner']
def _retrieve_product(self, name=None, default_code=None, barcode=None):
'''Search all products and find one that matches one of the parameters.
Use the following priority:
1. barcode
2. default_code
3. name (exact match)
4. name (ilike)
:param name: The name of the product.
:param default_code: The default_code of the product.
:param barcode: The barcode of the product.
:returns: A product or an empty recordset if not found.
'''
if name and '\n' in name:
# cut Sales Description from the name
name = name.split('\n')[0]
domains = []
if barcode:
domains.append([('barcode', '=', barcode)])
if default_code:
domains.append([('default_code', '=', default_code)])
if name:
domains += [[('name', '=', name)], [('name', 'ilike', name)]]
products = self.env['product.product'].search(
expression.AND([
expression.OR(domains),
[('company_id', 'in', [False, self.env.company.id])],
]),
)
if products:
for domain in domains:
products_by_domain = products.filtered_domain(domain)
if products_by_domain:
return products_by_domain[0]
return self.env['product.product']
def _retrieve_tax(self, amount, type_tax_use):
'''Search all taxes and find one that matches all of the parameters.
:param amount: The amount of the tax.
:param type_tax_use: The type of the tax.
:returns: A tax or an empty recordset if not found.
'''
domains = [
[('amount', '=', float(amount))],
[('type_tax_use', '=', type_tax_use)],
[('company_id', '=', self.env.company.id)]
]
return self.env['account.tax'].search(expression.AND(domains), order='sequence ASC', limit=1)
def _retrieve_currency(self, code):
'''Search all currencies and find one that matches the code.
:param code: The code of the currency.
:returns: A currency or an empty recordset if not found.
'''
currency = self.env['res.currency'].with_context(active_test=False).search([('name', '=', code.upper())], limit=1)
if currency and not currency.active:
error_msg = _('The currency (%s) of the document you are uploading is not active in this database.\n'
'Please activate it and update the currency rate if needed before trying again to import.',
currency.name)
error_action = {
'view_mode': 'form',
'res_model': 'res.currency',
'type': 'ir.actions.act_window',
'target': 'new',
'res_id': currency.id,
'views': [[False, 'form']]
}
raise RedirectWarning(error_msg, error_action, _('Display the currency'))
return currency
####################################################
# Other helpers
####################################################
@api.model
def _format_error_message(self, error_title, errors):
bullet_list_msg = ''.join('<li>%s</li>' % html_escape(msg) for msg in errors)
return '%s<ul>%s</ul>' % (error_title, bullet_list_msg)

View file

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import api, models, fields, _
from odoo.exceptions import UserError
from collections import defaultdict
class AccountJournal(models.Model):
_inherit = 'account.journal'
edi_format_ids = fields.Many2many(comodel_name='account.edi.format',
string='Electronic invoicing',
help='Send XML/EDI invoices',
domain="[('id', 'in', compatible_edi_ids)]",
compute='_compute_edi_format_ids',
readonly=False, store=True)
compatible_edi_ids = fields.Many2many(comodel_name='account.edi.format',
compute='_compute_compatible_edi_ids',
help='EDI format that support moves in this journal')
def write(self, vals):
# OVERRIDE
# Don't allow the user to deactivate an edi format having at least one document to be processed.
if vals.get('edi_format_ids'):
old_edi_format_ids = self.edi_format_ids
res = super().write(vals)
diff_edi_format_ids = old_edi_format_ids - self.edi_format_ids
documents = self.env['account.edi.document'].search([
('move_id.journal_id', 'in', self.ids),
('edi_format_id', 'in', diff_edi_format_ids.ids),
('state', 'in', ('to_cancel', 'to_send')),
])
# If the formats we are unchecking do not need a webservice, we don't need them to be correctly sent
if documents.filtered(lambda d: d.edi_format_id._needs_web_services()):
raise UserError(_('Cannot deactivate (%s) on this journal because not all documents are synchronized', ', '.join(documents.edi_format_id.mapped('display_name'))))
# remove these documents which: do not need a web service & are linked to the edi formats we are unchecking
if documents:
documents.unlink()
return res
else:
return super().write(vals)
@api.depends('type', 'company_id', 'company_id.account_fiscal_country_id')
def _compute_compatible_edi_ids(self):
edi_formats = self.env['account.edi.format'].search([])
for journal in self:
compatible_edis = edi_formats.filtered(lambda e: e._is_compatible_with_journal(journal))
journal.compatible_edi_ids = compatible_edis
@api.depends('type', 'company_id', 'company_id.account_fiscal_country_id')
def _compute_edi_format_ids(self):
edi_formats = self.env['account.edi.format'].search([])
journal_ids = self.ids
if journal_ids:
self._cr.execute('''
SELECT
move.journal_id,
ARRAY_AGG(doc.edi_format_id) AS edi_format_ids
FROM account_edi_document doc
JOIN account_move move ON move.id = doc.move_id
WHERE doc.state IN ('to_cancel', 'to_send')
AND move.journal_id IN %s
GROUP BY move.journal_id
''', [tuple(journal_ids)])
protected_edi_formats_per_journal = {r[0]: set(r[1]) for r in self._cr.fetchall()}
else:
protected_edi_formats_per_journal = defaultdict(set)
for journal in self:
enabled_edi_formats = edi_formats.filtered(lambda e: e._is_compatible_with_journal(journal) and
(e._is_enabled_by_default_on_journal(journal)
or (e in journal.edi_format_ids)))
# The existing edi formats that are already in use so we can't remove it.
protected_edi_format_ids = protected_edi_formats_per_journal.get(journal.id, set())
protected_edi_formats = journal.edi_format_ids.filtered(lambda e: e.id in protected_edi_format_ids)
journal.edi_format_ids = enabled_edi_formats + protected_edi_formats

View file

@ -0,0 +1,500 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import api, fields, models, _
from odoo.exceptions import UserError
class AccountMove(models.Model):
_inherit = 'account.move'
edi_document_ids = fields.One2many(
comodel_name='account.edi.document',
inverse_name='move_id')
edi_state = fields.Selection(
selection=[('to_send', 'To Send'), ('sent', 'Sent'), ('to_cancel', 'To Cancel'), ('cancelled', 'Cancelled')],
string="Electronic invoicing",
store=True,
compute='_compute_edi_state',
help='The aggregated state of all the EDIs with web-service of this move')
edi_error_count = fields.Integer(
compute='_compute_edi_error_count',
help='How many EDIs are in error for this move ?')
edi_blocking_level = fields.Selection(
selection=[('info', 'Info'), ('warning', 'Warning'), ('error', 'Error')],
compute='_compute_edi_error_message')
edi_error_message = fields.Html(
compute='_compute_edi_error_message')
# Technical field to display the documents that will be processed by the CRON
edi_web_services_to_process = fields.Text(
compute='_compute_edi_web_services_to_process')
edi_show_cancel_button = fields.Boolean(
compute='_compute_edi_show_cancel_button')
edi_show_abandon_cancel_button = fields.Boolean(
compute='_compute_edi_show_abandon_cancel_button')
@api.depends('edi_document_ids.state')
def _compute_edi_state(self):
for move in self:
all_states = set(move.edi_document_ids.filtered(lambda d: d.edi_format_id._needs_web_services()).mapped('state'))
if all_states == {'sent'}:
move.edi_state = 'sent'
elif all_states == {'cancelled'}:
move.edi_state = 'cancelled'
elif 'to_send' in all_states:
move.edi_state = 'to_send'
elif 'to_cancel' in all_states:
move.edi_state = 'to_cancel'
else:
move.edi_state = False
@api.depends('edi_document_ids.error')
def _compute_edi_error_count(self):
for move in self:
move.edi_error_count = len(move.edi_document_ids.filtered(lambda d: d.error))
@api.depends('edi_error_count', 'edi_document_ids.error', 'edi_document_ids.blocking_level')
def _compute_edi_error_message(self):
for move in self:
if move.edi_error_count == 0:
move.edi_error_message = None
move.edi_blocking_level = None
elif move.edi_error_count == 1:
error_doc = move.edi_document_ids.filtered(lambda d: d.error)
move.edi_error_message = error_doc.error
move.edi_blocking_level = error_doc.blocking_level
else:
error_levels = set([doc.blocking_level for doc in move.edi_document_ids])
if 'error' in error_levels:
move.edi_error_message = str(move.edi_error_count) + _(" Electronic invoicing error(s)")
move.edi_blocking_level = 'error'
elif 'warning' in error_levels:
move.edi_error_message = str(move.edi_error_count) + _(" Electronic invoicing warning(s)")
move.edi_blocking_level = 'warning'
else:
move.edi_error_message = str(move.edi_error_count) + _(" Electronic invoicing info(s)")
move.edi_blocking_level = 'info'
@api.depends(
'edi_document_ids',
'edi_document_ids.state',
'edi_document_ids.blocking_level',
'edi_document_ids.edi_format_id',
'edi_document_ids.edi_format_id.name')
def _compute_edi_web_services_to_process(self):
for move in self:
to_process = move.edi_document_ids.filtered(lambda d: d.state in ['to_send', 'to_cancel'] and d.blocking_level != 'error')
format_web_services = to_process.edi_format_id.filtered(lambda f: f._needs_web_services())
move.edi_web_services_to_process = ', '.join(f.name for f in format_web_services)
def _check_edi_documents_for_reset_to_draft(self):
self.ensure_one()
for doc in self.edi_document_ids:
move_applicability = doc.edi_format_id._get_move_applicability(self)
if doc.edi_format_id._needs_web_services() \
and doc.state in ('sent', 'to_cancel') \
and move_applicability \
and move_applicability.get('cancel'):
return False
return True
@api.depends('edi_document_ids.state')
def _compute_show_reset_to_draft_button(self):
# OVERRIDE
super()._compute_show_reset_to_draft_button()
for move in self:
if not move._check_edi_documents_for_reset_to_draft():
move.show_reset_to_draft_button = False
@api.depends('edi_document_ids.state')
def _compute_edi_show_cancel_button(self):
for move in self:
if move.state != 'posted':
move.edi_show_cancel_button = False
continue
move.edi_show_cancel_button = False
for doc in move.edi_document_ids:
move_applicability = doc.edi_format_id._get_move_applicability(move)
if doc.edi_format_id._needs_web_services() \
and doc.state == 'sent' \
and move_applicability \
and move_applicability.get('cancel'):
move.edi_show_cancel_button = True
break
@api.depends('edi_document_ids.state')
def _compute_edi_show_abandon_cancel_button(self):
for move in self:
move.edi_show_abandon_cancel_button = False
for doc in move.edi_document_ids:
move_applicability = doc.edi_format_id._get_move_applicability(move)
if doc.edi_format_id._needs_web_services() \
and doc.state == 'to_cancel' \
and move_applicability \
and move_applicability.get('cancel'):
move.edi_show_abandon_cancel_button = True
break
####################################################
# Export Electronic Document
####################################################
def _prepare_edi_tax_details(self, filter_to_apply=None, filter_invl_to_apply=None, grouping_key_generator=None):
''' Compute amounts related to taxes for the current invoice.
:param filter_to_apply: Optional filter to exclude some tax values from the final results.
The filter is defined as a method getting a dictionary as parameter
representing the tax values for a single repartition line.
This dictionary contains:
'base_line_id': An account.move.line record.
'tax_id': An account.tax record.
'tax_repartition_line_id': An account.tax.repartition.line record.
'base_amount': The tax base amount expressed in company currency.
'tax_amount': The tax amount expressed in company currency.
'base_amount_currency': The tax base amount expressed in foreign currency.
'tax_amount_currency': The tax amount expressed in foreign currency.
If the filter is returning False, it means the current tax values will be
ignored when computing the final results.
:param filter_invl_to_apply: Optional filter to exclude some invoice lines.
:param grouping_key_generator: Optional method used to group tax values together. By default, the tax values
are grouped by tax. This parameter is a method getting a dictionary as parameter
(same signature as 'filter_to_apply').
This method must returns a dictionary where values will be used to create the
grouping_key to aggregate tax values together. The returned dictionary is added
to each tax details in order to retrieve the full grouping_key later.
:param compute_mode: Optional parameter to specify the method used to allocate the tax line amounts
among the invoice lines:
'tax_details' (the default) uses the AccountMove._get_query_tax_details method.
'compute_all' uses the AccountTax._compute_all method.
The 'tax_details' method takes the tax line balance and allocates it among the
invoice lines to which that tax applies, proportionately to the invoice lines'
base amounts. This always ensures that the sum of the tax amounts equals the
tax line's balance, which, depending on the constraints of a particular
localization, can be more appropriate when 'Round Globally' is set.
The 'compute_all' method returns, for each invoice line, the exact tax amounts
corresponding to the taxes applied to the invoice line. Depending on the
constraints of the particular localization, this can be more appropriate when
'Round per Line' is set.
:return: The full tax details for the current invoice and for each invoice line
separately. The returned dictionary is the following:
'base_amount': The total tax base amount in company currency for the whole invoice.
'tax_amount': The total tax amount in company currency for the whole invoice.
'base_amount_currency': The total tax base amount in foreign currency for the whole invoice.
'tax_amount_currency': The total tax amount in foreign currency for the whole invoice.
'tax_details': A mapping of each grouping key (see 'grouping_key_generator') to a dictionary
containing:
'base_amount': The tax base amount in company currency for the current group.
'tax_amount': The tax amount in company currency for the current group.
'base_amount_currency': The tax base amount in foreign currency for the current group.
'tax_amount_currency': The tax amount in foreign currency for the current group.
'group_tax_details': The list of all tax values aggregated into this group.
'tax_details_per_record': A mapping of each invoice line to a dictionary containing:
'base_amount': The total tax base amount in company currency for the whole invoice line.
'tax_amount': The total tax amount in company currency for the whole invoice line.
'base_amount_currency': The total tax base amount in foreign currency for the whole invoice line.
'tax_amount_currency': The total tax amount in foreign currency for the whole invoice line.
'tax_details': A mapping of each grouping key (see 'grouping_key_generator') to a dictionary
containing:
'base_amount': The tax base amount in company currency for the current group.
'tax_amount': The tax amount in company currency for the current group.
'base_amount_currency': The tax base amount in foreign currency for the current group.
'tax_amount_currency': The tax amount in foreign currency for the current group.
'group_tax_details': The list of all tax values aggregated into this group.
'''
return self._prepare_invoice_aggregated_taxes(
filter_invl_to_apply=filter_invl_to_apply,
filter_tax_values_to_apply=filter_to_apply,
grouping_key_generator=grouping_key_generator,
)
def _prepare_edi_vals_to_export(self):
''' The purpose of this helper is to prepare values in order to export an invoice through the EDI system.
This includes the computation of the tax details for each invoice line that could be very difficult to
handle regarding the computation of the base amount.
:return: A python dict containing default pre-processed values.
'''
self.ensure_one()
res = {
'record': self,
'balance_multiplicator': -1 if self.is_inbound() else 1,
'invoice_line_vals_list': [],
}
# Invoice lines details.
for index, line in enumerate(self.invoice_line_ids.filtered(lambda line: line.display_type == 'product'), start=1):
line_vals = line._prepare_edi_vals_to_export()
line_vals['index'] = index
res['invoice_line_vals_list'].append(line_vals)
# Totals.
res.update({
'total_price_subtotal_before_discount': sum(x['price_subtotal_before_discount'] for x in res['invoice_line_vals_list']),
'total_price_discount': sum(x['price_discount'] for x in res['invoice_line_vals_list']),
})
return res
def _update_payments_edi_documents(self):
''' Update the edi documents linked to the current journal entries. These journal entries must be linked to an
account.payment of an account.bank.statement.line. This additional method is needed because the payment flow is
not the same as the invoice one. Indeed, the edi documents must be created when the payment is fully reconciled
with invoices.
'''
payments = self.filtered(lambda move: move.payment_id or move.statement_line_id)
edi_document_vals_list = []
to_remove = self.env['account.edi.document']
for payment in payments:
edi_formats = payment._get_reconciled_invoices().journal_id.edi_format_ids | payment.edi_document_ids.edi_format_id
for edi_format in edi_formats:
# Only recreate document when cancelled before.
existing_edi_document = payment.edi_document_ids.filtered(lambda x: x.edi_format_id == edi_format)
if existing_edi_document.state == 'sent':
continue
move_applicability = edi_format._get_move_applicability(payment)
if move_applicability:
if existing_edi_document:
existing_edi_document.write({
'state': 'to_send',
'error': False,
'blocking_level': False,
})
else:
edi_document_vals_list.append({
'edi_format_id': edi_format.id,
'move_id': payment.id,
'state': 'to_send',
})
elif existing_edi_document:
to_remove |= existing_edi_document
to_remove.unlink()
self.env['account.edi.document'].create(edi_document_vals_list)
payments.edi_document_ids._process_documents_no_web_services()
def _is_ready_to_be_sent(self):
# OVERRIDE
# Prevent a mail to be sent to the customer if the EDI document is not sent.
res = super()._is_ready_to_be_sent()
if not res:
return False
edi_documents_to_send = self.edi_document_ids.filtered(lambda x: x.state == 'to_send')
return not bool(edi_documents_to_send)
def _post(self, soft=True):
# OVERRIDE
# Set the electronic document to be posted and post immediately for synchronous formats.
posted = super()._post(soft=soft)
edi_document_vals_list = []
for move in posted:
for edi_format in move.journal_id.edi_format_ids:
move_applicability = edi_format._get_move_applicability(move)
if move_applicability:
errors = edi_format._check_move_configuration(move)
if errors:
raise UserError(_("Invalid invoice configuration:\n\n%s") % '\n'.join(errors))
existing_edi_document = move.edi_document_ids.filtered(lambda x: x.edi_format_id == edi_format)
if existing_edi_document:
existing_edi_document.sudo().write({
'state': 'to_send',
'attachment_id': False,
})
else:
edi_document_vals_list.append({
'edi_format_id': edi_format.id,
'move_id': move.id,
'state': 'to_send',
})
self.env['account.edi.document'].create(edi_document_vals_list)
posted.edi_document_ids._process_documents_no_web_services()
self.env.ref('account_edi.ir_cron_edi_network')._trigger()
return posted
def button_cancel(self):
# OVERRIDE
# Set the electronic document to be canceled and cancel immediately for synchronous formats.
res = super().button_cancel()
self.edi_document_ids.filtered(lambda doc: doc.state != 'sent').write({'state': 'cancelled', 'error': False, 'blocking_level': False})
self.edi_document_ids.filtered(lambda doc: doc.state == 'sent').write({'state': 'to_cancel', 'error': False, 'blocking_level': False})
self.edi_document_ids._process_documents_no_web_services()
self.env.ref('account_edi.ir_cron_edi_network')._trigger()
return res
def _edi_allow_button_draft(self):
self.ensure_one()
return not self.edi_show_cancel_button
def button_draft(self):
# OVERRIDE
for move in self:
if not move._edi_allow_button_draft():
raise UserError(_(
"You can't edit the following journal entry %s because an electronic document has already been "
"sent. Please use the 'Request EDI Cancellation' button instead."
) % move.display_name)
res = super().button_draft()
self.edi_document_ids.write({'error': False, 'blocking_level': False})
self.edi_document_ids.filtered(lambda doc: doc.state == 'to_send').unlink()
return res
def button_cancel_posted_moves(self):
'''Mark the edi.document related to this move to be canceled.
'''
to_cancel_documents = self.env['account.edi.document']
for move in self:
move._check_fiscalyear_lock_date()
is_move_marked = False
for doc in move.edi_document_ids:
move_applicability = doc.edi_format_id._get_move_applicability(move)
if doc.edi_format_id._needs_web_services() \
and doc.state == 'sent' \
and move_applicability \
and move_applicability.get('cancel'):
to_cancel_documents |= doc
is_move_marked = True
if is_move_marked:
move.message_post(body=_("A cancellation of the EDI has been requested."))
to_cancel_documents.write({'state': 'to_cancel', 'error': False, 'blocking_level': False})
def button_abandon_cancel_posted_posted_moves(self):
'''Cancel the request for cancellation of the EDI.
'''
documents = self.env['account.edi.document']
for move in self:
is_move_marked = False
for doc in move.edi_document_ids:
move_applicability = doc.edi_format_id._get_move_applicability(move)
if doc.state == 'to_cancel' and move_applicability and move_applicability.get('cancel'):
documents |= doc
is_move_marked = True
if is_move_marked:
move.message_post(body=_("A request for cancellation of the EDI has been called off."))
documents.write({'state': 'sent', 'error': False, 'blocking_level': False})
def _get_edi_document(self, edi_format):
return self.edi_document_ids.filtered(lambda d: d.edi_format_id == edi_format)
def _get_edi_attachment(self, edi_format):
return self._get_edi_document(edi_format).sudo().attachment_id
####################################################
# Import Electronic Document
####################################################
def _get_create_document_from_attachment_decoders(self):
# OVERRIDE
res = super()._get_create_document_from_attachment_decoders()
res.append((10, self.env['account.edi.format'].search([])._create_document_from_attachment))
return res
def _get_update_invoice_from_attachment_decoders(self, invoice):
# OVERRIDE
res = super()._get_update_invoice_from_attachment_decoders(invoice)
res.append((10, self.env['account.edi.format'].search([])._update_invoice_from_attachment))
return res
# this override is to make sure that the main attachment is not the edi xml otherwise the attachment viewer will not work correctly
def _message_set_main_attachment_id(self, attachment_ids):
if self.message_main_attachment_id and len(attachment_ids) > 1 and self.message_main_attachment_id in self.edi_document_ids.attachment_id:
self.message_main_attachment_id = self.env['ir.attachment']
super()._message_set_main_attachment_id(attachment_ids)
####################################################
# Business operations
####################################################
def button_process_edi_web_services(self):
self.ensure_one()
self.action_process_edi_web_services(with_commit=False)
def action_process_edi_web_services(self, with_commit=True):
docs = self.edi_document_ids.filtered(lambda d: d.state in ('to_send', 'to_cancel') and d.blocking_level != 'error')
docs._process_documents_web_services(with_commit=with_commit)
def _retry_edi_documents_error_hook(self):
''' Hook called when edi_documents are retried. For example, when it's needed to clean a field.
TO OVERRIDE
'''
return
def action_retry_edi_documents_error(self):
self._retry_edi_documents_error_hook()
self.edi_document_ids.write({'error': False, 'blocking_level': False})
self.action_process_edi_web_services()
class AccountMoveLine(models.Model):
_inherit = 'account.move.line'
####################################################
# Export Electronic Document
####################################################
def _prepare_edi_vals_to_export(self):
''' The purpose of this helper is the same as '_prepare_edi_vals_to_export' but for a single invoice line.
This includes the computation of the tax details for each invoice line or the management of the discount.
Indeed, in some EDI, we need to provide extra values depending the discount such as:
- the discount as an amount instead of a percentage.
- the price_unit but after subtraction of the discount.
:return: A python dict containing default pre-processed values.
'''
self.ensure_one()
if self.discount == 100.0:
gross_price_subtotal = self.currency_id.round(self.price_unit * self.quantity)
else:
gross_price_subtotal = self.currency_id.round(self.price_subtotal / (1 - self.discount / 100.0))
res = {
'line': self,
'price_unit_after_discount': self.currency_id.round(self.price_unit * (1 - (self.discount / 100.0))),
'price_subtotal_before_discount': gross_price_subtotal,
'price_subtotal_unit': self.currency_id.round(self.price_subtotal / self.quantity) if self.quantity else 0.0,
'price_total_unit': self.currency_id.round(self.price_total / self.quantity) if self.quantity else 0.0,
'price_discount': gross_price_subtotal - self.price_subtotal,
'price_discount_unit': (gross_price_subtotal - self.price_subtotal) / self.quantity if self.quantity else 0.0,
'gross_price_total_unit': self.currency_id.round(gross_price_subtotal / self.quantity) if self.quantity else 0.0,
'unece_uom_code': self.product_id.product_tmpl_id.uom_id._get_unece_code(),
}
return res
def reconcile(self):
# OVERRIDE
# In some countries, the payments must be sent to the government under some condition. One of them could be
# there is at least one reconciled invoice to the payment. Then, we need to update the state of the edi
# documents during the reconciliation.
all_lines = self + self.matched_debit_ids.debit_move_id + self.matched_credit_ids.credit_move_id
res = super().reconcile()
all_lines.move_id._update_payments_edi_documents()
return res

View file

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
class AccountPayment(models.Model):
_inherit = 'account.payment'
def action_process_edi_web_services(self):
return self.move_id.action_process_edi_web_services()
def action_retry_edi_documents_error(self):
self.ensure_one()
return self.move_id.action_retry_edi_documents_error()

View file

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import io
from odoo import models
from odoo.tools.pdf import OdooPdfFileReader, OdooPdfFileWriter
class IrActionsReport(models.Model):
_inherit = 'ir.actions.report'
def _render_qweb_pdf_prepare_streams(self, report_ref, data, res_ids=None):
# EXTENDS base
collected_streams = super()._render_qweb_pdf_prepare_streams(report_ref, data, res_ids=res_ids)
if collected_streams \
and res_ids \
and len(res_ids) == 1 \
and self._is_invoice_report(report_ref):
invoice = self.env['account.move'].browse(res_ids)
if invoice.is_sale_document() and invoice.state != 'draft':
to_embed = invoice.edi_document_ids
# Add the attachments to the pdf file
if to_embed:
pdf_stream = collected_streams[invoice.id]['stream']
# Read pdf content.
pdf_content = pdf_stream.getvalue()
reader_buffer = io.BytesIO(pdf_content)
reader = OdooPdfFileReader(reader_buffer, strict=False)
# Post-process and embed the additional files.
writer = OdooPdfFileWriter()
writer.cloneReaderDocumentRoot(reader)
for edi_document in to_embed:
# The attachements on the edi documents are only system readable
# because they don't have res_id and res_model, here we are sure that
# the user has access to the invoice and edi document
edi_document.edi_format_id._prepare_invoice_report(writer, edi_document)
# Replace the current content.
pdf_stream.close()
new_pdf_stream = io.BytesIO()
writer.write(new_pdf_stream)
collected_streams[invoice.id]['stream'] = new_pdf_stream
return collected_streams

View file

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
from odoo import api, models, fields, _
from odoo.exceptions import UserError
class IrAttachment(models.Model):
_inherit = 'ir.attachment'
@api.ondelete(at_uninstall=False)
def _unlink_except_government_document(self):
linked_edi_documents = self.env['account.edi.document'].search([('attachment_id', 'in', self.ids)])
linked_edi_formats_ws = linked_edi_documents.edi_format_id.filtered(lambda edi_format: edi_format._needs_web_services())
if linked_edi_formats_ws:
raise UserError(_("You can't unlink an attachment being an EDI document sent to the government."))

View file

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
from odoo import models
class MailTemplate(models.Model):
_inherit = "mail.template"
def _get_edi_attachments(self, document):
"""
Will either return the information about the attachment of the edi document for adding the attachment in the
mail, or the attachment id to be linked to the 'send & print' wizard.
Can be overridden where e.g. a zip-file needs to be sent with the individual files instead of the entire zip
IMPORTANT:
* If the attachment's id is returned, no new attachment will be created, the existing one on the move is linked
to the wizard (see _onchange_template_id in mail.compose.message).
* If the attachment's content is returned, a new one is created and linked to the wizard. Thus, when sending
the mail (clicking on 'send & print' in the wizard), a new attachment is added to the move (see
_action_send_mail in mail.compose.message).
:param document: an edi document
:return: dict:
{'attachments': tuple with the name and base64 content of the attachment}
OR
{'attachment_ids': list containing the id of the attachment}
"""
attachment_sudo = document.sudo().attachment_id
if not attachment_sudo:
return {}
if not (attachment_sudo.res_model and attachment_sudo.res_id):
# do not return system attachment not linked to a record
return {}
if len(self._context.get('active_ids', [])) > 1:
# In mass mail mode 'attachments_ids' is removed from template values
# as they should not be rendered
return {'attachments': [(attachment_sudo.name, attachment_sudo.datas)]}
return {'attachment_ids': [attachment_sudo.id]}
def generate_email(self, res_ids, fields):
res = super().generate_email(res_ids, fields)
multi_mode = True
if isinstance(res_ids, int):
res_ids = [res_ids]
multi_mode = False
if self.model not in ['account.move', 'account.payment']:
return res
records = self.env[self.model].browse(res_ids)
for record in records:
record_data = (res[record.id] if multi_mode else res)
for doc in record.edi_document_ids:
record_data.setdefault('attachments', [])
attachments = self._get_edi_attachments(doc)
record_data['attachment_ids'] += attachments.get('attachment_ids', [])
record_data['attachments'] += attachments.get('attachments', [])
return res

View file

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
from odoo import models
class UoM(models.Model):
_inherit = 'uom.uom'
def _get_unece_code(self):
""" Returns the UNECE code used for international trading for corresponding to the UoM as per
https://unece.org/fileadmin/DAM/cefact/recommendations/rec20/rec20_rev3_Annex2e.pdf"""
mapping = {
'uom.product_uom_unit': 'C62',
'uom.product_uom_dozen': 'DZN',
'uom.product_uom_kgm': 'KGM',
'uom.product_uom_gram': 'GRM',
'uom.product_uom_day': 'DAY',
'uom.product_uom_hour': 'HUR',
'uom.product_uom_ton': 'TNE',
'uom.product_uom_meter': 'MTR',
'uom.product_uom_km': 'KMT',
'uom.product_uom_cm': 'CMT',
'uom.product_uom_litre': 'LTR',
'uom.product_uom_lb': 'LBR',
'uom.product_uom_oz': 'ONZ',
'uom.product_uom_inch': 'INH',
'uom.product_uom_foot': 'FOT',
'uom.product_uom_mile': 'SMI',
'uom.product_uom_floz': 'OZA',
'uom.product_uom_qt': 'QT',
'uom.product_uom_gal': 'GLL',
'uom.product_uom_cubic_meter': 'MTQ',
'uom.product_uom_cubic_inch': 'INQ',
'uom.product_uom_cubic_foot': 'FTQ',
}
xml_ids = self._get_external_ids().get(self.id, [])
matches = list(set(xml_ids) & set(mapping.keys()))
return matches and mapping[matches[0]] or 'C62'