mirror of
https://github.com/bringout/oca-edi.git
synced 2026-04-19 11:32:01 +02:00
294 lines
10 KiB
Python
294 lines
10 KiB
Python
# Copyright 2023 ALBA Software S.L.
|
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl-3.0)
|
|
|
|
|
|
import datetime
|
|
import logging
|
|
|
|
from pydifact.segmentcollection import Interchange, Message
|
|
from pydifact.segments import Segment
|
|
|
|
from odoo import api, models
|
|
|
|
# Module-level logger named after this module; BasePydifact.pydifact_obj uses
# it to trace every parsed message and segment.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# EDIFACT parsing and serialization are delegated to pydifact:
# https://github.com/nerdocs/pydifact
|
|
class BasePydifact(models.AbstractModel):
    """Abstract model with helpers to parse and generate EDIFACT documents.

    Parsing and serialization are delegated to pydifact. The ``map2odoo_*``
    methods translate the elements of a single segment into plain Python
    values (dicts, floats, dates); the ``create_*`` methods build pydifact
    objects from Odoo-style values.

    Segments may be passed either as pydifact ``Segment`` objects or as plain
    lists of elements, where a composite element is itself a list.
    """

    _name = "base.edifact"
    _description = "Generate and parse Edifact documents"

    # Agency code found as third component of a party identification
    # (e.g. "9" in ``5550534000017::9``) -> key under which the party id is
    # returned by map2odoo_partner / map2odoo_address.
    MAP_AGENCY_CODE_2_RES_PARTNER_NAMEREF = {"9": "gln"}
    CURRENCY_SYMBOLS = {
        "EUR": "€",
        "USD": "$",
    }
    PRODUCT_CODE_TYPES = {"EN": "EAN", "UP": "UPC", "SRV": "GTIN"}
    # strptime format chosen from the length of a date/time value; any other
    # length is parsed with DEFAULT_DATE_FORMAT.
    DATE_FORMAT_BY_LENGTH = {
        8: "%Y%m%d",
        10: "%Y%m%d%H",
        12: "%Y%m%d%H%M",
    }
    DEFAULT_DATE_FORMAT = "%Y%m%d%H%M%S"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @api.model
    def _components(self, element):
        """Return ``element`` as a list of components.

        A composite element (list or tuple) is copied into a list; a simple
        element is wrapped into a one-item list so that a bare string is never
        indexed character by character.
        """
        if isinstance(element, (list, tuple)):
            return list(element)
        return [element]

    @api.model
    def _component(self, element, index, default=""):
        """Return component ``index`` of ``element`` or ``default`` if absent."""
        parts = self._components(element)
        return parts[index] if index < len(parts) else default

    @api.model
    def _element_text(self, element):
        """Join the non-empty components of ``element`` with a single space."""
        return " ".join(str(part) for part in self._components(element) if part)

    @api.model
    def _map_party_identification(self, identification):
        """Translate a party identification element into ``{nameref: party_id}``.

        :param identification: e.g. ``['5550534001649', '', '9']``; a bare
            string or a composite without agency code is accepted as well.
        :return: dict with a single key taken from
            ``MAP_AGENCY_CODE_2_RES_PARTNER_NAMEREF`` (``"gln"`` when the
            agency code is missing or unknown).
        """
        party_id = self._component(identification, 0)
        agency_code = self._component(identification, 2)
        nameref = self.MAP_AGENCY_CODE_2_RES_PARTNER_NAMEREF.get(agency_code, "gln")
        return {nameref: party_id}

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    @api.model
    def pydifact_import(self, names):
        """Return the pydifact classes matching ``names``.

        :param names: iterable of class names among ``"Segment"``,
            ``"Message"`` and ``"Interchange"``.
        :return: list with one entry per name, ``None`` for unknown names.
        """
        classes = {
            "Segment": Segment,
            "Message": Message,
            "Interchange": Interchange,
        }
        return [classes.get(name) for name in names]

    @api.model
    def pydifact_obj(self, docu):
        """Parse an EDIFACT document into a list of plain dictionaries.

        :param docu: raw document, as accepted by ``_loads_edifact``.
        :return: list whose first item is ``{header_tag: header_elements}``
            and whose following items describe one message each, with its
            segments under the ``"segments"`` key as ``{tag: elements}`` dicts.
        """
        interchange = self._loads_edifact(docu)
        header_seg = interchange.get_header_segment()
        obj = [{header_seg.tag: header_seg.elements}]
        for message in interchange.get_messages():
            logger.info("%s", message)
            segments = []
            for segment in message.segments:
                # Lazy %-style arguments: the text is only built when the
                # INFO level is actually enabled.
                logger.info(
                    "Segment tag: %s, content: %s", segment.tag, segment.elements
                )
                segments.append({segment.tag: segment.elements})
            obj.append(
                {
                    "reference_number": message.reference_number,
                    "type": message.type,
                    "version": message.version,
                    "identifier": message.identifier,
                    "HEADER_TAG": message.HEADER_TAG,
                    "FOOTER_TAG": message.FOOTER_TAG,
                    "characters": str(message.characters),
                    "extra_header_elements": message.extra_header_elements,
                    "has_una_segment": message.has_una_segment,
                    "segments": segments,
                }
            )
        return obj

    @api.model
    def _loads_edifact(self, order_file):
        """Build a pydifact ``Interchange`` from a raw EDIFACT document.

        :param order_file: document content as ``bytes`` (decoded with the
            default codec, falling back to latin-1) or as already decoded
            ``str``.
        :return: the parsed ``Interchange``.
        """
        # TODO: use chardet library for get encoding
        if isinstance(order_file, str):
            content = order_file
        else:
            try:
                content = order_file.decode()
            except UnicodeDecodeError:
                # latin-1 maps every byte value, so this cannot fail.
                content = order_file.decode("latin-1")
        return Interchange.from_str(content)

    @api.model
    def _get_msg_type(self, interchange):
        """Return ``(message type, release)`` read from the UNH segment.

        The release is the concatenation of the second and third components
        of the message identifier.
        """
        seg = interchange.get_segment("UNH")
        # MSG_TYPE, EDIFACT_MSG_TYPE_RELEASE
        message_identifier = seg[1]
        return (
            message_identifier[0],
            f"{message_identifier[1]}{message_identifier[2]}",
        )

    # ------------------------------------------------------------------
    # Segment -> Odoo values
    # ------------------------------------------------------------------
    @api.model
    def map2odoo_date(self, dt):
        """Convert a date/time composite into a ``datetime.date``.

        :param dt: composite whose second component is the value, e.g.
            ``['137', '20230101', '102']``.
        :return: the date part of the value; 8, 10 and 12 character values
            are parsed up to day, hour and minute respectively, anything else
            as ``%Y%m%d%H%M%S``.
        :raises ValueError: if the value does not match the selected format.
        """
        value = dt[1]
        date_format = self.DATE_FORMAT_BY_LENGTH.get(
            len(value), self.DEFAULT_DATE_FORMAT
        )
        return datetime.datetime.strptime(value, date_format).date()

    @api.model
    def map2odoo_partner(self, seg):
        """
        BY. Party to which merchandise is sold.
            NAD+BY+5550534000017::9'
            NAD segment: ['BY', ['5550534001649', '', '9']]

        SU. Party which manufactures or otherwise has possession of
            goods,and consigns or makes them available in trade.
            NAD+SU+<Supplier GLN>::9'

        :return: ``{nameref: party_id}``, e.g. ``{'gln': '5550534001649'}``
        :raises NotImplementedError: for any qualifier other than BY or SU.
        """
        reference_code = seg[0]
        if reference_code not in ("BY", "SU"):
            raise NotImplementedError(f"Code '{reference_code}' not implemented")
        return self._map_party_identification(seg[1])

    @api.model
    def map2odoo_address(self, seg):
        """
        DP. Party to which goods should be delivered, if not identical with
            consignee.
            NAD+DP+5550534000086::9+++++++DE'
            NAD segment: ['DP', ['5550534022101', '', '9'], '', '', '', '', '', '', 'ES']
        IV. Party to whom an invoice is issued.
            NAD+IV+5450534005838::9++AMAZON EU SARL:NIEDERLASSUNG
            DEUTSCHLAND+MARCEL-BREUER-STR. 12+MUENCHEN++80807+DE

        :returns: ``False`` for any qualifier other than DP or IV, else {
            'type': 'delivery' | 'invoice'
            'partner': {'gln':''}
            'address': {...}
        }
        """
        # A pydifact Segment keeps its data in ``elements``; anything else is
        # read as a plain sequence of elements.
        elements = list(seg.elements if isinstance(seg, Segment) else seg)
        qualifier = elements[0]
        if qualifier not in ("DP", "IV"):
            return False
        address = {
            "type": "delivery" if qualifier == "DP" else "invoice",
            # PARTY IDENTIFICATION DETAILS
            "partner": self._map_party_identification(elements[1]),
            "address": {},
        }
        details = address["address"]

        def element_at(position):
            # Missing trailing elements are treated like empty ones.
            return elements[position] if position < len(elements) else ""

        # PARTY NAME: elements 2 and 3 may both carry (part of) the name and
        # may be composites; their components are joined with a space and
        # the two elements with ". ".
        name_parts = [self._element_text(element_at(pos)) for pos in (2, 3)]
        name = ". ".join(part for part in name_parts if part)
        if name:
            details["name"] = name

        positional_fields = (
            # Street address and/or PO Box number in a structured address:
            # one to three lines.
            (4, "street"),
            (5, "city"),
            # Country sub-entity identification
            (6, "state_code"),
            (7, "zip"),
            # Country, coded ISO 3166
            (8, "country_code"),
        )
        for position, field in positional_fields:
            value = element_at(position)
            if value:
                details[field] = value

        return address

    @api.model
    def map2odoo_currency(self, seg):
        """
        ['2', 'EUR', '9']

        :return: ``{'iso': code, 'symbol': symbol}`` where the symbol is
            ``False`` when the code is not in ``CURRENCY_SYMBOLS``.
        """
        # Identification of the name or symbol of the monetary unit involved
        # in the transaction.
        currency_coded = seg[1]
        return {
            "iso": currency_coded,
            "symbol": self.CURRENCY_SYMBOLS.get(currency_coded, False),
        }

    @api.model
    def map2odoo_product(self, seg, pia=None):
        """
        :seg: LIN segment
            ['1', '', ['8885583503464', 'EN']]
            EN. International Article Numbering Association (EAN)
            UP. UPC (Universal product code)
            SRV. GTIN
        :pia: PIA segment
            ['5', ['1276', 'SA', '', '9']]
            SA. Supplier's Article Number

        :return: dict with ``code`` (from PIA, or from LIN when the item
            number type is SRV) and/or ``barcode`` (any other LIN type);
            empty when neither segment carries an item number.
        """
        res = {}
        # Set default code based on SA if given
        if pia is not None:
            pia_elements = list(pia)
            if len(pia_elements) > 1:
                supplier_code = self._component(pia_elements[1], 0)
                if supplier_code:
                    res["code"] = supplier_code
        lin_elements = list(seg)
        item = lin_elements[2] if len(lin_elements) > 2 else ""
        code = self._component(item, 0)
        if code:
            field = "code" if self._component(item, 1) == "SRV" else "barcode"
            res[field] = code
        return res

    @api.model
    def map2odoo_qty(self, seg):
        """
        'QTY' EDI segment: [['21', '2']]
        '21'. Ordered quantity

        :return: the quantity as a float.
        """
        return float(seg[0][1])

    @api.model
    def map2odoo_unit_price(self, seg=None):
        """
        'PRI' EDI segment: [['AAA', '19.75']]
        Price qualifier:
        * 'AAA'. Calculation net
        * 'AAB'. Calculation gross

        :return: the price as a float, ``0.0`` when no segment is given or
            the qualifier is neither AAA nor AAB.
        """
        if seg:
            pri = seg[0]
            # TODO: Add price calculation formula for 'AAB' (gross); for now
            # both qualifiers return the amount as transmitted.
            if pri[0] in ("AAA", "AAB"):
                return float(pri[1])
        return 0.0

    @api.model
    def map2odoo_description(self, seg):
        """
        'IMD' EDI segment: ['F', '79', ['', '', '', 'Description']]
        F: Label
        79: Other description

        :return: the fourth component of the third element, or ``None`` when
            the segment is empty or does not carry that component.
        """
        if not seg:
            return None
        elements = list(seg)
        if len(elements) < 3:
            return None
        return self._component(elements[2], 3, default=None)

    # ------------------------------------------------------------------
    # Generation
    # ------------------------------------------------------------------
    @api.model
    def _safe_segment_element(self, value):
        """Return ``value`` as text, mapping unset values to an empty string.

        Both ``False`` and ``None`` denote "no value" and must not end up as
        the literal words in the generated document; ``0`` is kept as "0".
        """
        if value is False or value is None:
            return ""
        return str(value)

    @api.model
    def create_segment(self, *elements):
        """Create a pydifact ``Segment`` from raw values.

        :param elements: segment tag followed by its elements; a list or
            tuple is a composite whose components are cleaned one by one.
        :return: ``Segment`` in which every value is a string and unset
            values (``False``/``None``) are empty strings.
        """
        cleaned_elements = [
            [self._safe_segment_element(value) for value in element]
            if isinstance(element, (list, tuple))
            else self._safe_segment_element(element)
            for element in elements
        ]
        return Segment(*cleaned_elements)

    @api.model
    def create_interchange(self, sender, recipient, control_ref, syntax_identifier):
        """
        Create an interchange (started by UNB segment, ended by UNZ segment)

        :param list sender: Identification of the sender of the interchange.
            example: ["40410", "14"]
            - 40410: Identification of the sender of the interchange
            - 14: EAN (European Article Numbering Association)
        :param list recipient: Identification of the recipient of the interchange.
            example: ["40411", "14"]
        :param str control_ref: Unique reference assigned by the sender to an interchange.
            example: "10"
        :param list syntax_identifier: Identification of the agency controlling
            the syntax and indication of syntax level, plus the syntax version number.
            example: ["UNOC", "3"]

        :return: Interchange object representing the created interchange.
        :rtype: Interchange
        :raises ValueError: if any parameter is empty or False.
        """
        if not all((sender, recipient, control_ref, syntax_identifier)):
            raise ValueError("All parameters must have values and not be False")

        return Interchange(
            sender=sender,
            recipient=recipient,
            control_reference=str(control_ref),
            syntax_identifier=syntax_identifier,
        )