Initial commit: Hw packages

This commit is contained in:
Ernad Husremovic 2025-08-29 15:20:52 +02:00
commit a9d00500da
161 changed files with 10506 additions and 0 deletions

View file

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import jinja2
import json
import logging
import netifaces as ni
import os
import subprocess
import threading
import time
import urllib3
from odoo import http
from odoo.addons.hw_drivers.connection_manager import connection_manager
from odoo.addons.hw_drivers.driver import Driver
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.main import iot_devices
from odoo.addons.hw_drivers.tools import helpers
path = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../views'))
loader = jinja2.FileSystemLoader(path)
jinja_env = jinja2.Environment(loader=loader, autoescape=True)
jinja_env.filters["json"] = json.dumps
pos_display_template = jinja_env.get_template('pos_display.html')
_logger = logging.getLogger(__name__)
class DisplayDriver(Driver):
connection_type = 'display'
def __init__(self, identifier, device):
super(DisplayDriver, self).__init__(identifier, device)
self.device_type = 'display'
self.device_connection = 'hdmi'
self.device_name = device['name']
self.event_data = threading.Event()
self.owner = False
self.rendered_html = ''
if self.device_identifier != 'distant_display':
self._x_screen = device.get('x_screen', '0')
self.load_url()
self._actions.update({
'update_url': self._action_update_url,
'display_refresh': self._action_display_refresh,
'take_control': self._action_take_control,
'customer_facing_display': self._action_customer_facing_display,
'get_owner': self._action_get_owner,
})
@classmethod
def supported(cls, device):
return True # All devices with connection_type == 'display' are supported
@classmethod
def get_default_display(cls):
displays = list(filter(lambda d: iot_devices[d].device_type == 'display', iot_devices))
return len(displays) and iot_devices[displays[0]]
def run(self):
while self.device_identifier != 'distant_display' and not self._stopped.is_set():
time.sleep(60)
if self.url != 'http://localhost:8069/point_of_sale/display/' + self.device_identifier:
# Refresh the page every minute
self.call_xdotools('F5')
def update_url(self, url=None):
os.environ['DISPLAY'] = ":0." + self._x_screen
os.environ['XAUTHORITY'] = '/run/lightdm/pi/xauthority'
firefox_env = os.environ.copy()
firefox_env['HOME'] = '/tmp/' + self._x_screen
self.url = url or 'http://localhost:8069/point_of_sale/display/' + self.device_identifier
new_window = subprocess.call(['xdotool', 'search', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox'])
subprocess.Popen(['firefox', self.url], env=firefox_env)
if new_window:
self.call_xdotools('F11')
def load_url(self):
url = None
if helpers.get_odoo_server_url():
# disable certifiacte verification
urllib3.disable_warnings()
http = urllib3.PoolManager(cert_reqs='CERT_NONE')
try:
response = http.request('GET', "%s/iot/box/%s/display_url" % (helpers.get_odoo_server_url(), helpers.get_mac_address()))
if response.status == 200:
data = json.loads(response.data.decode('utf8'))
url = data[self.device_identifier]
except json.decoder.JSONDecodeError:
url = response.data.decode('utf8')
except Exception:
pass
return self.update_url(url)
def call_xdotools(self, keystroke):
os.environ['DISPLAY'] = ":0." + self._x_screen
os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority"
try:
subprocess.call(['xdotool', 'search', '--sync', '--onlyvisible', '--screen', self._x_screen, '--class', 'Firefox', 'key', keystroke])
return "xdotool succeeded in stroking " + keystroke
except:
return "xdotool threw an error, maybe it is not installed on the IoTBox"
def update_customer_facing_display(self, origin, html=None):
if origin == self.owner:
self.rendered_html = html
self.event_data.set()
def get_serialized_order(self):
# IMPLEMENTATION OF LONGPOLLING
# Times out 2 seconds before the JS request does
if self.event_data.wait(28):
self.event_data.clear()
return {'rendered_html': self.rendered_html}
return {'rendered_html': False}
def take_control(self, new_owner, html=None):
# ALLOW A CASHIER TO TAKE CONTROL OVER THE POSBOX, IN CASE OF MULTIPLE CASHIER PER DISPLAY
self.owner = new_owner
self.rendered_html = html
self.data = {
'value': '',
'owner': self.owner,
}
event_manager.device_changed(self)
self.event_data.set()
def _action_update_url(self, data):
if self.device_identifier != 'distant_display':
self.update_url(data.get('url'))
def _action_display_refresh(self, data):
if self.device_identifier != 'distant_display':
self.call_xdotools('F5')
def _action_take_control(self, data):
self.take_control(self.data.get('owner'), data.get('html'))
def _action_customer_facing_display(self, data):
self.update_customer_facing_display(self.data.get('owner'), data.get('html'))
def _action_get_owner(self, data):
self.data = {
'value': '',
'owner': self.owner,
}
event_manager.device_changed(self)
class DisplayController(http.Controller):
@http.route('/hw_proxy/display_refresh', type='json', auth='none', cors='*')
def display_refresh(self):
display = DisplayDriver.get_default_display()
if display and display.device_identifier != 'distant_display':
return display.call_xdotools('F5')
@http.route('/hw_proxy/customer_facing_display', type='json', auth='none', cors='*')
def customer_facing_display(self, html=None):
display = DisplayDriver.get_default_display()
if display:
display.update_customer_facing_display(http.request.httprequest.remote_addr, html)
return {'status': 'updated'}
return {'status': 'failed'}
@http.route('/hw_proxy/take_control', type='json', auth='none', cors='*')
def take_control(self, html=None):
display = DisplayDriver.get_default_display()
if display:
display.take_control(http.request.httprequest.remote_addr, html)
return {
'status': 'success',
'message': 'You now have access to the display',
}
@http.route('/hw_proxy/test_ownership', type='json', auth='none', cors='*')
def test_ownership(self):
display = DisplayDriver.get_default_display()
if display and display.owner == http.request.httprequest.remote_addr:
return {'status': 'OWNER'}
return {'status': 'NOWNER'}
@http.route(['/point_of_sale/get_serialized_order', '/point_of_sale/get_serialized_order/<string:display_identifier>'], type='json', auth='none')
def get_serialized_order(self, display_identifier=None):
if display_identifier:
display = iot_devices.get(display_identifier)
else:
display = DisplayDriver.get_default_display()
if display:
return display.get_serialized_order()
return {
'rendered_html': False,
'error': "No display found",
}
@http.route(['/point_of_sale/display', '/point_of_sale/display/<string:display_identifier>'], type='http', auth='none')
def display(self, display_identifier=None):
cust_js = None
interfaces = ni.interfaces()
with open(os.path.join(os.path.dirname(__file__), "../../static/src/js/worker.js")) as js:
cust_js = js.read()
display_ifaces = []
for iface_id in interfaces:
if 'wlan' in iface_id or 'eth' in iface_id:
iface_obj = ni.ifaddresses(iface_id)
ifconfigs = iface_obj.get(ni.AF_INET, [])
essid = helpers.get_ssid()
for conf in ifconfigs:
if conf.get('addr'):
display_ifaces.append({
'iface_id': iface_id,
'essid': essid,
'addr': conf.get('addr'),
'icon': 'sitemap' if 'eth' in iface_id else 'wifi',
})
if not display_identifier:
display_identifier = DisplayDriver.get_default_display().device_identifier
return pos_display_template.render({
'title': "Odoo -- Point of Sale",
'breadcrumb': 'POS Client display',
'cust_js': cust_js,
'display_ifaces': display_ifaces,
'display_identifier': display_identifier,
'pairing_code': connection_manager.pairing_code,
})

View file

@ -0,0 +1,384 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import ctypes
import evdev
import json
import logging
from lxml import etree
import os
from pathlib import Path
from queue import Queue, Empty
import re
import subprocess
from threading import Lock
import time
import urllib3
from usb import util
from odoo import http, _
from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
from odoo.addons.hw_drivers.driver import Driver
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.main import iot_devices
from odoo.addons.hw_drivers.tools import helpers
_logger = logging.getLogger(__name__)
xlib = ctypes.cdll.LoadLibrary('libX11.so.6')
class KeyboardUSBDriver(Driver):
# The list of devices can be found in /proc/bus/input/devices
# or "ls -l /dev/input/by-path"
# Note: the user running "evdev" commands must be inside the "input" group
# Each device's input will correspond to a file in /dev/input/event*
# The exact file can be found by looking at the "Handlers" line in /proc/bus/input/devices
# Example: "H: Handlers=sysrq kbd leds event0" -> The file used is /dev/input/event0
# If you read the file "/dev/input/event0" you will get the input from the device in real time
# One usb device can have multiple associated event files (like a foot pedal which has 3 event files)
connection_type = 'usb'
keyboard_layout_groups = []
available_layouts = []
input_devices = []
def __init__(self, identifier, device):
if not hasattr(KeyboardUSBDriver, 'display'):
os.environ['XAUTHORITY'] = "/run/lightdm/pi/xauthority"
KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8"))
super(KeyboardUSBDriver, self).__init__(identifier, device)
self.device_connection = 'direct'
self.device_name = self._set_name()
self._actions.update({
'update_layout': self._update_layout,
'update_is_scanner': self._save_is_scanner,
'': self._action_default,
})
# from https://github.com/xkbcommon/libxkbcommon/blob/master/test/evdev-scancodes.h
self._scancode_to_modifier = {
42: 'left_shift',
54: 'right_shift',
58: 'caps_lock',
69: 'num_lock',
100: 'alt_gr', # right alt
}
self._tracked_modifiers = {modifier: False for modifier in self._scancode_to_modifier.values()}
if not KeyboardUSBDriver.available_layouts:
KeyboardUSBDriver.load_layouts_list()
KeyboardUSBDriver.send_layouts_list()
for evdev_device in [evdev.InputDevice(path) for path in evdev.list_devices()]:
if (device.idVendor == evdev_device.info.vendor) and (device.idProduct == evdev_device.info.product):
self.input_devices.append(evdev_device)
self._set_device_type('scanner') if self._is_scanner() else self._set_device_type()
@classmethod
def supported(cls, device):
for cfg in device:
for itf in cfg:
if itf.bInterfaceClass == 3 and itf.bInterfaceProtocol != 2:
device.interface_protocol = itf.bInterfaceProtocol
return True
return False
@classmethod
def get_status(self):
"""Allows `hw_proxy.Proxy` to retrieve the status of the scanners"""
status = 'connected' if any(iot_devices[d].device_type == "scanner" for d in iot_devices) else 'disconnected'
return {'status': status, 'messages': ''}
@classmethod
def send_layouts_list(cls):
server = helpers.get_odoo_server_url()
if server:
urllib3.disable_warnings()
pm = urllib3.PoolManager(cert_reqs='CERT_NONE')
server = server + '/iot/keyboard_layouts'
try:
pm.request('POST', server, fields={'available_layouts': json.dumps(cls.available_layouts)})
except Exception:
_logger.exception('Could not reach configured server to send available layouts')
@classmethod
def load_layouts_list(cls):
tree = etree.parse("/usr/share/X11/xkb/rules/base.xml", etree.XMLParser(ns_clean=True, recover=True))
layouts = tree.xpath("//layout")
for layout in layouts:
layout_name = layout.xpath("./configItem/name")[0].text
layout_description = layout.xpath("./configItem/description")[0].text
KeyboardUSBDriver.available_layouts.append({
'name': layout_description,
'layout': layout_name,
})
for variant in layout.xpath("./variantList/variant"):
variant_name = variant.xpath("./configItem/name")[0].text
variant_description = variant.xpath("./configItem/description")[0].text
KeyboardUSBDriver.available_layouts.append({
'name': variant_description,
'layout': layout_name,
'variant': variant_name,
})
def _set_name(self):
try:
manufacturer = util.get_string(self.dev, self.dev.iManufacturer)
product = util.get_string(self.dev, self.dev.iProduct)
if manufacturer and product:
return re.sub(r"[^\w \-+/*&]", '', "%s - %s" % (manufacturer, product))
except ValueError as e:
_logger.warning(e)
return _('Unknown input device')
def run(self):
try:
for device in self.input_devices:
for event in device.read_loop():
if self._stopped.is_set():
break
if event.type == evdev.ecodes.EV_KEY:
data = evdev.categorize(event)
modifier_name = self._scancode_to_modifier.get(data.scancode)
if modifier_name:
if modifier_name in ('caps_lock', 'num_lock'):
if data.keystate == 1:
self._tracked_modifiers[modifier_name] = not self._tracked_modifiers[modifier_name]
else:
self._tracked_modifiers[modifier_name] = bool(data.keystate) # 1 for keydown, 0 for keyup
elif data.keystate == 1:
self.key_input(data.scancode)
except Exception as err:
_logger.warning(err)
def _change_keyboard_layout(self, new_layout):
"""Change the layout of the current device to what is specified in
new_layout.
Args:
new_layout (dict): A dict containing two keys:
- layout (str): The layout code
- variant (str): An optional key to represent the variant of the
selected layout
"""
if hasattr(self, 'keyboard_layout'):
KeyboardUSBDriver.keyboard_layout_groups.remove(self.keyboard_layout)
if new_layout:
self.keyboard_layout = new_layout.get('layout') or 'us'
if new_layout.get('variant'):
self.keyboard_layout += "(%s)" % new_layout['variant']
else:
self.keyboard_layout = 'us'
KeyboardUSBDriver.keyboard_layout_groups.append(self.keyboard_layout)
subprocess.call(["setxkbmap", "-display", ":0.0", ",".join(KeyboardUSBDriver.keyboard_layout_groups)])
# Close then re-open display to refresh the mapping
xlib.XCloseDisplay(KeyboardUSBDriver.display)
KeyboardUSBDriver.display = xlib.XOpenDisplay(bytes(":0.0", "utf-8"))
def save_layout(self, layout):
"""Save the layout to a file on the box to read it when restarting it.
We need that in order to keep the selected layout after a reboot.
Args:
new_layout (dict): A dict containing two keys:
- layout (str): The layout code
- variant (str): An optional key to represent the variant of the
selected layout
"""
file_path = helpers.path_file('odoo-keyboard-layouts.conf')
if file_path.exists():
data = json.loads(file_path.read_text())
else:
data = {}
data[self.device_identifier] = layout
helpers.write_file('odoo-keyboard-layouts.conf', json.dumps(data))
def load_layout(self):
"""Read the layout from the saved filed and set it as current layout.
If no file or no layout is found we use 'us' by default.
"""
file_path = helpers.path_file('odoo-keyboard-layouts.conf')
if file_path.exists():
data = json.loads(file_path.read_text())
layout = data.get(self.device_identifier, {'layout': 'us'})
else:
layout = {'layout': 'us'}
self._change_keyboard_layout(layout)
def _action_default(self, data):
self.data['value'] = ''
event_manager.device_changed(self)
def _is_scanner(self):
"""Read the device type from the saved filed and set it as current type.
If no file or no device type is found we try to detect it automatically.
"""
device_name = self.device_name.lower()
scanner_name = ['barcode', 'scanner', 'reader']
is_scanner = any(x in device_name for x in scanner_name) or self.dev.interface_protocol == '0'
file_path = helpers.path_file('odoo-keyboard-is-scanner.conf')
if file_path.exists():
data = json.loads(file_path.read_text())
is_scanner = data.get(self.device_identifier, {}).get('is_scanner', is_scanner)
return is_scanner
def _keyboard_input(self, scancode):
"""Deal with a keyboard input. Send the character corresponding to the
pressed key represented by its scancode to the connected Odoo instance.
Args:
scancode (int): The scancode of the pressed key.
"""
self.data['value'] = self._scancode_to_char(scancode)
if self.data['value']:
event_manager.device_changed(self)
def _barcode_scanner_input(self, scancode):
"""Deal with a barcode scanner input. Add the new character scanned to
the current barcode or complete the barcode if "Return" is pressed.
When a barcode is completed, two tasks are performed:
- Send a device_changed update to the event manager to notify the
listeners that the value has changed (used in Enterprise).
- Add the barcode to the list barcodes that are being queried in
Community.
Args:
scancode (int): The scancode of the pressed key.
"""
if scancode == 28: # Return
self.data['value'] = self._current_barcode
event_manager.device_changed(self)
self._barcodes.put((time.time(), self._current_barcode))
self._current_barcode = ''
else:
self._current_barcode += self._scancode_to_char(scancode)
def _save_is_scanner(self, data):
"""Save the type of device.
We need that in order to keep the selected type of device after a reboot.
"""
is_scanner = {'is_scanner': data.get('is_scanner')}
file_path = helpers.path_file('odoo-keyboard-is-scanner.conf')
if file_path.exists():
data = json.loads(file_path.read_text())
else:
data = {}
data[self.device_identifier] = is_scanner
helpers.write_file('odoo-keyboard-is-scanner.conf', json.dumps(data))
self._set_device_type('scanner') if is_scanner.get('is_scanner') else self._set_device_type()
def _update_layout(self, data):
layout = {
'layout': data.get('layout'),
'variant': data.get('variant'),
}
self._change_keyboard_layout(layout)
self.save_layout(layout)
def _set_device_type(self, device_type='keyboard'):
"""Modify the device type between 'keyboard' and 'scanner'
Args:
type (string): Type wanted to switch
"""
if device_type == 'scanner':
self.device_type = 'scanner'
self.key_input = self._barcode_scanner_input
self._barcodes = Queue()
self._current_barcode = ''
for device in self.input_devices:
device.grab()
self.read_barcode_lock = Lock()
else:
self.device_type = 'keyboard'
self.key_input = self._keyboard_input
self.load_layout()
def _scancode_to_char(self, scancode):
"""Translate a received scancode to a character depending on the
selected keyboard layout and the current state of the keyboard's
modifiers.
Args:
scancode (int): The scancode of the pressed key, to be translated to
a character
Returns:
str: The translated scancode.
"""
# Scancode -> Keysym : Depends on the keyboard layout
group = KeyboardUSBDriver.keyboard_layout_groups.index(self.keyboard_layout)
modifiers = self._get_active_modifiers(scancode)
keysym = ctypes.c_int(xlib.XkbKeycodeToKeysym(KeyboardUSBDriver.display, scancode + 8, group, modifiers))
# Translate Keysym to a character
key_pressed = ctypes.create_string_buffer(5)
xlib.XkbTranslateKeySym(KeyboardUSBDriver.display, ctypes.byref(keysym), 0, ctypes.byref(key_pressed), 5, ctypes.byref(ctypes.c_int()))
if key_pressed.value:
return key_pressed.value.decode('utf-8')
return ''
def _get_active_modifiers(self, scancode):
"""Get the state of currently active modifiers.
Args:
scancode (int): The scancode of the key being translated
Returns:
int: The current state of the modifiers:
0 -- Lowercase
1 -- Highercase or (NumLock + key pressed on keypad)
2 -- AltGr
3 -- Highercase + AltGr
"""
modifiers = 0
uppercase = (self._tracked_modifiers['right_shift'] or self._tracked_modifiers['left_shift']) ^ self._tracked_modifiers['caps_lock']
if uppercase or (scancode in [71, 72, 73, 75, 76, 77, 79, 80, 81, 82, 83] and self._tracked_modifiers['num_lock']):
modifiers += 1
if self._tracked_modifiers['alt_gr']:
modifiers += 2
return modifiers
def read_next_barcode(self):
"""Get the value of the last barcode that was scanned but not sent yet
and not older than 5 seconds. This function is used in Community, when
we don't have access to the IoTLongpolling.
Returns:
str: The next barcode to be read or an empty string.
"""
# Previous query still running, stop it by sending a fake barcode
if self.read_barcode_lock.locked():
self._barcodes.put((time.time(), ""))
with self.read_barcode_lock:
try:
timestamp, barcode = self._barcodes.get(True, 55)
if timestamp > time.time() - 5:
return barcode
except Empty:
return ''
proxy_drivers['scanner'] = KeyboardUSBDriver
class KeyboardUSBController(http.Controller):
@http.route('/hw_proxy/scanner', type='json', auth='none', cors='*')
def get_barcode(self):
scanners = [iot_devices[d] for d in iot_devices if iot_devices[d].device_type == "scanner"]
if scanners:
return scanners[0].read_next_barcode()
time.sleep(5)
return None

View file

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import logging
import platform
import json
from passlib.context import CryptContext
from odoo import http
from odoo.tools.config import config
_logger = logging.getLogger(__name__)
try:
import PyKCS11
except ImportError:
PyKCS11 = None
_logger.error('Could not import library PyKCS11')
crypt_context = CryptContext(schemes=['pbkdf2_sha512'])
class EtaUsbController(http.Controller):
def _is_access_token_valid(self, access_token):
stored_hash = config.get('proxy_access_token')
if not stored_hash:
# empty password/hash => authentication forbidden
return False
return crypt_context.verify(access_token, stored_hash)
@http.route('/hw_l10n_eg_eta/certificate', type='http', auth='none', cors='*', csrf=False, save_session=False, methods=['POST'])
def eta_certificate(self, pin, access_token):
"""
Gets the certificate from the token and returns it to the main odoo instance so that we can prepare the
cades-bes object on the main odoo instance rather than this middleware
@param pin: pin of the token
@param access_token: token shared with the main odoo instance
"""
if not PyKCS11:
return self._get_error_template('no_pykcs11')
if not self._is_access_token_valid(access_token):
return self._get_error_template('unauthorized')
session, error = self._get_session(pin)
if error:
return error
try:
cert = session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)])[0]
cert_bytes = bytes(session.getAttributeValue(cert, [PyKCS11.CKA_VALUE])[0])
payload = {
'certificate': base64.b64encode(cert_bytes).decode()
}
return json.dumps(payload)
except Exception as ex:
_logger.exception('Error while getting ETA certificate')
return self._get_error_template(str(ex))
finally:
session.logout()
session.closeSession()
@http.route('/hw_l10n_eg_eta/sign', type='http', auth='none', cors='*', csrf=False, save_session=False, methods=['POST'])
def eta_sign(self, pin, access_token, invoices):
"""
Check if the access_token is valid and sign the invoices accessing the usb key with the pin.
@param pin: pin of the token
@param access_token: token shared with the main odoo instance
@param invoices: dictionary of invoices. Keys are invoices ids, value are the base64 encoded binaries to sign
"""
if not PyKCS11:
return self._get_error_template('no_pykcs11')
if not self._is_access_token_valid(access_token):
return self._get_error_template('unauthorized')
session, error = self._get_session(pin)
if error:
return error
try:
cert = session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_CERTIFICATE)])[0]
cert_id = session.getAttributeValue(cert, [PyKCS11.CKA_ID])[0]
priv_key = session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), (PyKCS11.CKA_ID, cert_id)])[0]
invoice_dict = dict()
invoices = json.loads(invoices)
for invoice, eta_inv in invoices.items():
to_sign = base64.b64decode(eta_inv)
signed_data = session.sign(priv_key, to_sign, PyKCS11.Mechanism(PyKCS11.CKM_SHA256_RSA_PKCS))
invoice_dict[invoice] = base64.b64encode(bytes(signed_data)).decode()
payload = {
'invoices': json.dumps(invoice_dict),
}
return json.dumps(payload)
except Exception as ex:
_logger.exception('Error while signing invoices')
return self._get_error_template(str(ex))
finally:
session.logout()
session.closeSession()
def _get_session(self, pin):
session = False
lib, error = self.get_crypto_lib()
if error:
return session, error
try:
pkcs11 = PyKCS11.PyKCS11Lib()
pkcs11.load(pkcs11dll_filename=lib)
except PyKCS11.PyKCS11Error:
return session, self._get_error_template('missing_dll')
slots = pkcs11.getSlotList(tokenPresent=True)
if not slots:
return session, self._get_error_template('no_drive')
if len(slots) > 1:
return session, self._get_error_template('multiple_drive')
try:
session = pkcs11.openSession(slots[0], PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION)
session.login(pin)
except Exception as ex:
error = self._get_error_template(str(ex))
return session, error
def get_crypto_lib(self):
error = lib = False
system = platform.system()
if system == 'Linux':
lib = '/usr/lib/x86_64-linux-gnu/opensc-pkcs11.so'
elif system == 'Windows':
lib = 'C:/Windows/System32/eps2003csp11.dll'
elif system == 'Darwin':
lib = '/Library/OpenSC/lib/onepin-opensc-pkcs11.so'
else:
error = self._get_error_template('unsupported_system')
return lib, error
def _get_error_template(self, error_str):
return json.dumps({
'error': error_str,
})

View file

@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import serial
import time
import struct
import json
from functools import reduce
from odoo import http
from odoo.addons.hw_drivers.iot_handlers.drivers.SerialBaseDriver import SerialDriver, SerialProtocol, serial_connection
from odoo.addons.hw_drivers.main import iot_devices
_logger = logging.getLogger(__name__)
TremolG03Protocol = SerialProtocol(
name='Tremol G03',
baudrate=115200,
bytesize=serial.EIGHTBITS,
stopbits=serial.STOPBITS_ONE,
parity=serial.PARITY_NONE,
timeout=4,
writeTimeout=0.2,
measureRegexp=None,
statusRegexp=None,
commandTerminator=b'',
commandDelay=0.2,
measureDelay=3,
newMeasureDelay=0.2,
measureCommand=b'',
emptyAnswerValid=False,
)
STX = 0x02
ETX = 0x0A
ACK = 0x06
NACK = 0x15
# Dictionary defining the output size of expected from various commands
COMMAND_OUTPUT_SIZE = {
0x30: 7,
0x31: 7,
0x38: 157,
0x39: 155,
0x60: 40,
0x68: 23,
}
FD_ERRORS = {
0x30: 'OK',
0x32: 'Registers overflow',
0x33: 'Clock failure or incorrect date & time',
0x34: 'Opened fiscal receipt',
0x39: 'Incorrect password',
0x3b: '24 hours block - missing Z report',
0x3d: 'Interrupt power supply in fiscal receipt (one time until status is read)',
0x3e: 'Overflow EJ',
0x3f: 'Insufficient conditions',
}
COMMAND_ERRORS = {
0x30: 'OK',
0x31: 'Invalid command',
0x32: 'Illegal command',
0x33: 'Z daily report is not zero',
0x34: 'Syntax error',
0x35: 'Input registers orverflow',
0x36: 'Zero input registers',
0x37: 'Unavailable transaction for correction',
0x38: 'Insufficient amount on hand',
}
class TremolG03Driver(SerialDriver):
"""Driver for the Kenyan Tremol G03 fiscal device."""
_protocol = TremolG03Protocol
def __init__(self, identifier, device):
super().__init__(identifier, device)
self.device_type = 'fiscal_data_module'
self.message_number = 0
@classmethod
def get_default_device(cls):
fiscal_devices = list(filter(lambda d: iot_devices[d].device_type == 'fiscal_data_module', iot_devices))
return len(fiscal_devices) and iot_devices[fiscal_devices[0]]
@classmethod
def supported(cls, device):
"""Checks whether the device, which port info is passed as argument, is supported by the driver.
:param device: path to the device
:type device: str
:return: whether the device is supported by the driver
:rtype: bool
"""
protocol = cls._protocol
try:
protocol = cls._protocol
with serial_connection(device['identifier'], protocol) as connection:
connection.write(b'\x09')
time.sleep(protocol.commandDelay)
response = connection.read(1)
if response == b'\x40':
return True
except serial.serialutil.SerialTimeoutException:
pass
except Exception:
_logger.exception('Error while probing %s with protocol %s', device, protocol.name)
# ----------------
# HELPERS
# ----------------
@staticmethod
def generate_checksum(message):
""" Generate the checksum bytes for the bytes provided.
:param message: bytes representing the part of the message from which the checksum is calculated
:returns: two checksum bytes calculated from the message
This checksum is calculated as:
1) XOR of all bytes of the bytes
2) Conversion of the one XOR byte into the two bytes of the checksum by
adding 30h to each half-byte of the XOR
eg. to_check = \x12\x23\x34\x45\x56
XOR of all bytes in to_check = \x16
checksum generated as \x16 -> \x31 \x36
"""
xor = reduce(lambda a, b: a ^ b, message)
return bytes([(xor >> 4) + 0x30, (xor & 0xf) + 0x30])
# ----------------
# COMMUNICATION
# ----------------
def send(self, msgs):
""" Send and receive messages to/from the fiscal device over serial connection
Generate the wrapped message from the msgs and send them to the device.
The wrapping contains the <STX> (starting byte) <LEN> (length byte)
and <NBL> (message number byte) at the start and two <CS> (checksum
bytes), and the <ETX> line-feed byte at the end.
:param msgs: A list of byte strings representing the <CMD> and <DATA>
components of the serial message.
:return: A list of the responses (if any) from the device. If the
response is an ack, it wont be part of this list.
"""
with self._device_lock:
replies = []
for msg in msgs:
self.message_number += 1
core_message = struct.pack('BB%ds' % (len(msg)), len(msg) + 34, self.message_number + 32, msg)
request = struct.pack('B%ds2sB' % (len(core_message)), STX, core_message, self.generate_checksum(core_message), ETX)
time.sleep(self._protocol.commandDelay)
self._connection.write(request)
# If we know the expected output size, we can set the read
# buffer to match the size of the output.
output_size = COMMAND_OUTPUT_SIZE.get(msg[0])
if output_size:
try:
response = self._connection.read(output_size)
except serial.serialutil.SerialTimeoutException:
_logger.exception('Timeout error while reading response to command %s', msg)
self.data['status'] = "Device timeout error"
else:
time.sleep(self._protocol.measureDelay)
response = self._connection.read_all()
if not response:
self.data['status'] = "No response"
_logger.error("Sent request: %s,\n Received no response", request)
self.abort_post()
break
if response[0] == ACK:
# In the case where either byte is not 0x30, there has been an error
if response[2] != 0x30 or response[3] != 0x30:
self.data['status'] = response[2:4].decode('cp1251')
_logger.error(
"Sent request: %s,\n Received fiscal device error: %s \n Received command error: %s",
request, FD_ERRORS.get(response[2], 'Unknown fiscal device error'),
COMMAND_ERRORS.get(response[3], 'Unknown command error'),
)
self.abort_post()
break
replies.append('')
elif response[0] == NACK:
self.data['status'] = "Received NACK"
_logger.error("Sent request: %s,\n Received NACK \x15", request)
self.abort_post()
break
elif response[0] == 0x02:
self.data['status'] = "ok"
size = response[1] - 35
reply = response[4:4 + size]
replies.append(reply.decode('cp1251'))
return {'replies': replies, 'status': self.data['status']}
def abort_post(self):
""" Cancel the posting of the invoice
In the event of an error, it is better to try to cancel the posting of
the invoice, since the state of the invoice on the device will remain
open otherwise, blocking further invoices being sent.
"""
self.message_number += 1
abort = struct.pack('BBB', 35, self.message_number + 32, 0x39)
request = struct.pack('B3s2sB', STX, abort, self.generate_checksum(abort), ETX)
self._connection.write(request)
response = self._connection.read(COMMAND_OUTPUT_SIZE[0x39])
if response and response[0] == 0x02:
self.data['status'] += "\n The invoice was successfully cancelled"
_logger.info("Invoice successfully cancelled")
else:
self.data['status'] += "\n The invoice could not be cancelled."
_logger.error("Failed to cancel invoice, received response: %s", response)
class TremolG03Controller(http.Controller):
@http.route('/hw_proxy/l10n_ke_cu_send', type='http', auth='none', cors='*', csrf=False, save_session=False, methods=['POST'])
def l10n_ke_cu_send(self, messages, company_vat):
""" Posts the messages sent to this endpoint to the fiscal device connected to the server
:param messages: The messages (consisting of <CMD> and <DATA>) to
send to the fiscal device.
:returns: Dictionary containing a list of the responses from
fiscal device and status of the fiscal device.
"""
device = TremolG03Driver.get_default_device()
if device:
# First run the command to get the fiscal device numbers
device_numbers = device.send([b'\x60'])
# If the vat doesn't match, abort
if device_numbers['status'] != 'ok':
return device_numbers
serial_number, device_vat, _dummy = device_numbers['replies'][0].split(';')
if device_vat != company_vat:
return json.dumps({'status': 'The company vat number does not match that of the device'})
messages = json.loads(messages)
device.message_number = 0
resp = json.dumps({**device.send([msg.encode('cp1251') for msg in messages]), 'serial_number': serial_number})
return resp
else:
return json.dumps({'status': 'The fiscal device is not connected to the proxy server'})

View file

@ -0,0 +1,382 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from base64 import b64decode
from cups import IPPError, IPP_PRINTER_IDLE, IPP_PRINTER_PROCESSING, IPP_PRINTER_STOPPED
import dbus
import io
import logging
import netifaces as ni
import os
from PIL import Image, ImageOps
import re
import subprocess
import tempfile
from uuid import getnode as get_mac
from odoo import http
from odoo.addons.hw_drivers.connection_manager import connection_manager
from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
from odoo.addons.hw_drivers.driver import Driver
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.iot_handlers.interfaces.PrinterInterface_L import PPDs, conn, cups_lock
from odoo.addons.hw_drivers.main import iot_devices
from odoo.addons.hw_drivers.tools import helpers
_logger = logging.getLogger(__name__)
RECEIPT_PRINTER_COMMANDS = {
'star': {
'center': b'\x1b\x1d\x61\x01', # ESC GS a n
'cut': b'\x1b\x64\x02', # ESC d n
'title': b'\x1b\x69\x01\x01%s\x1b\x69\x00\x00', # ESC i n1 n2
'drawers': [b'\x07', b'\x1a'] # BEL & SUB
},
'escpos': {
'center': b'\x1b\x61\x01', # ESC a n
'cut': b'\x1d\x56\x41\n', # GS V m
'title': b'\x1b\x21\x30%s\x1b\x21\x00', # ESC ! n
'drawers': [b'\x1b\x3d\x01', b'\x1b\x70\x00\x19\x19', b'\x1b\x70\x01\x19\x19'] # ESC = n then ESC p m t1 t2
}
}
def cups_notification_handler(message, uri, device_identifier, state, reason, accepting_jobs):
if device_identifier in iot_devices:
reason = reason if reason != 'none' else None
state_value = {
IPP_PRINTER_IDLE: 'connected',
IPP_PRINTER_PROCESSING: 'processing',
IPP_PRINTER_STOPPED: 'stopped'
}
iot_devices[device_identifier].update_status(state_value[state], message, reason)
# Create a Cups subscription if it doesn't exist yet
try:
conn.getSubscriptions('/printers/')
except IPPError:
conn.createSubscription(
uri='/printers/',
recipient_uri='dbus://',
events=['printer-state-changed']
)
# Listen for notifications from Cups
bus = dbus.SystemBus()
bus.add_signal_receiver(cups_notification_handler, signal_name="PrinterStateChanged", dbus_interface="org.cups.cupsd.Notifier")
class PrinterDriver(Driver):
connection_type = 'printer'
def __init__(self, identifier, device):
super(PrinterDriver, self).__init__(identifier, device)
self.device_type = 'printer'
self.device_connection = device['device-class'].lower()
self.device_name = device['device-make-and-model']
self.state = {
'status': 'connecting',
'message': 'Connecting to printer',
'reason': None,
}
self.send_status()
self._actions.update({
'cashbox': self.open_cashbox,
'print_receipt': self.print_receipt,
'': self._action_default,
})
self.receipt_protocol = 'star' if 'STR_T' in device['device-id'] else 'escpos'
if 'direct' in self.device_connection and any(cmd in device['device-id'] for cmd in ['CMD:STAR;', 'CMD:ESC/POS;']):
self.print_status()
@classmethod
def supported(cls, device):
if device.get('supported', False):
return True
protocol = ['dnssd', 'lpd', 'socket']
if any(x in device['url'] for x in protocol) and device['device-make-and-model'] != 'Unknown' or 'direct' in device['device-class']:
model = cls.get_device_model(device)
ppdFile = ''
for ppd in PPDs:
if model and model in PPDs[ppd]['ppd-product']:
ppdFile = ppd
break
with cups_lock:
if ppdFile:
conn.addPrinter(name=device['identifier'], ppdname=ppdFile, device=device['url'])
else:
conn.addPrinter(name=device['identifier'], device=device['url'])
conn.setPrinterInfo(device['identifier'], device['device-make-and-model'])
conn.enablePrinter(device['identifier'])
conn.acceptJobs(device['identifier'])
conn.setPrinterUsersAllowed(device['identifier'], ['all'])
conn.addPrinterOptionDefault(device['identifier'], "usb-no-reattach", "true")
conn.addPrinterOptionDefault(device['identifier'], "usb-unidir", "true")
return True
return False
@classmethod
def get_device_model(cls, device):
device_model = ""
if device.get('device-id'):
for device_id in [device_lo for device_lo in device['device-id'].split(';')]:
if any(x in device_id for x in ['MDL', 'MODEL']):
device_model = device_id.split(':')[1]
break
elif device.get('device-make-and-model'):
device_model = device['device-make-and-model']
return re.sub(r"[\(].*?[\)]", "", device_model).strip()
@classmethod
def get_status(cls):
status = 'connected' if any(iot_devices[d].device_type == "printer" and iot_devices[d].device_connection == 'direct' for d in iot_devices) else 'disconnected'
return {'status': status, 'messages': ''}
def disconnect(self):
self.update_status('disconnected', 'Printer was disconnected')
super(PrinterDriver, self).disconnect()
def update_status(self, status, message, reason=None):
"""Updates the state of the current printer.
Args:
status (str): The new value of the status
message (str): A comprehensive message describing the status
reason (str): The reason fo the current status
"""
if self.state['status'] != status or self.state['reason'] != reason:
self.state = {
'status': status,
'message': message,
'reason': reason,
}
self.send_status()
def send_status(self):
""" Sends the current status of the printer to the connected Odoo instance.
"""
self.data = {
'value': '',
'state': self.state,
}
event_manager.device_changed(self)
def print_raw(self, data):
process = subprocess.Popen(["lp", "-d", self.device_identifier], stdin=subprocess.PIPE)
process.communicate(data)
if process.returncode != 0:
# The stderr isn't meaningful so we don't log it ('No such file or directory')
_logger.error('Printing failed: printer with the identifier "%s" could not be found',
self.device_identifier)
def print_receipt(self, data):
receipt = b64decode(data['receipt'])
im = Image.open(io.BytesIO(receipt))
# Convert to greyscale then to black and white
im = im.convert("L")
im = ImageOps.invert(im)
im = im.convert("1")
print_command = getattr(self, 'format_%s' % self.receipt_protocol)(im)
self.print_raw(print_command)
def format_star(self, im):
width = int((im.width + 7) / 8)
raster_init = b'\x1b\x2a\x72\x41'
raster_page_length = b'\x1b\x2a\x72\x50\x30\x00'
raster_send = b'\x62'
raster_close = b'\x1b\x2a\x72\x42'
raster_data = b''
dots = im.tobytes()
while len(dots):
raster_data += raster_send + width.to_bytes(2, 'little') + dots[:width]
dots = dots[width:]
return raster_init + raster_page_length + raster_data + raster_close
def format_escpos_bit_image_raster(self, im):
""" prints with the `GS v 0`-command """
width = int((im.width + 7) / 8)
raster_send = b'\x1d\x76\x30\x00'
max_slice_height = 255
raster_data = b''
dots = im.tobytes()
while len(dots):
im_slice = dots[:width*max_slice_height]
slice_height = int(len(im_slice) / width)
raster_data += raster_send + width.to_bytes(2, 'little') + slice_height.to_bytes(2, 'little') + im_slice
dots = dots[width*max_slice_height:]
return raster_data
def extract_columns_from_picture(self, im, line_height):
# Code inspired from python esc pos library:
# https://github.com/python-escpos/python-escpos/blob/4a0f5855ef118a2009b843a3a106874701d8eddf/src/escpos/image.py#L73-L89
width_pixels, height_pixels = im.size
for left in range(0, width_pixels, line_height):
box = (left, 0, left + line_height, height_pixels)
im_chunk = im.transform(
(line_height, height_pixels),
Image.EXTENT,
box
)
yield im_chunk.tobytes()
def format_escpos_bit_image_column(self, im, high_density_vertical=True,
high_density_horizontal=True,
size_scale=100):
""" prints with the `ESC *`-command
reference: https://reference.epson-biz.com/modules/ref_escpos/index.php?content_id=88
:param im: PIL image to print
:param high_density_vertical: print in high density in vertical direction
:param high_density_horizontal: print in high density in horizontal direction
:param size_scale: picture scale in percentage,
e.g: 50 -> half the size (horizontally and vertically)
"""
size_scale_ratio = size_scale / 100
size_scale_width = int(im.width * size_scale_ratio)
size_scale_height = int(im.height * size_scale_ratio)
im = im.resize((size_scale_width, size_scale_height))
# escpos ESC * command print column per column
# (instead of usual row by row).
# So we transpose the picture to ease the calculations
im = im.transpose(Image.ROTATE_270).transpose(Image.FLIP_LEFT_RIGHT)
# Most of the code here is inspired from python escpos library
# https://github.com/python-escpos/python-escpos/blob/4a0f5855ef118a2009b843a3a106874701d8eddf/src/escpos/escpos.py#L237C9-L251
ESC = b'\x1b'
density_byte = (1 if high_density_horizontal else 0) + \
(32 if high_density_vertical else 0)
nL = im.height & 0xFF
nH = (im.height >> 8) & 0xFF
HEADER = ESC + b'*' + bytes([density_byte, nL, nH])
raster_data = ESC + b'3\x10' # Adjust line-feed size
line_height = 24 if high_density_vertical else 8
for column in self.extract_columns_from_picture(im, line_height):
raster_data += HEADER + column + b'\n'
raster_data += ESC + b'2' # Reset line-feed size
return raster_data
def format_escpos(self, im):
# Epson support different command to print pictures.
# We use by default "GS v 0", but it is incompatible with certain
# printer models (like TM-U2x0)
# As we are pretty limited in the information that we have, we will
# use the printer name to parse some configuration value
# Printer name examples:
# EpsonTMM30
# -> Print using raster mode
# TM-U220__IMC_LDV_LDH_SCALE70__
# -> Print using column bit image mode (without vertical and
# horizontal density and a scale of 70%)
# Default image printing mode
image_mode = 'raster'
options_str = self.device_name.split('__')
option_str = ""
if len(options_str) > 2:
option_str = options_str[1].upper()
if option_str.startswith('IMC'):
image_mode = 'column'
if image_mode == 'column':
# Default printing mode parameters
high_density_vertical = True
high_density_horizontal = True
scale = 100
# Parse the printer name to get the needed parameters
# The separator need to not be filtered by `get_identifier`
options = option_str.split('_')
for option in options:
if option == 'LDV':
high_density_vertical = False
elif option == 'LDH':
high_density_horizontal = False
elif option.startswith('SCALE'):
scale_value_str = re.search(r'\d+$', option)
if scale_value_str is not None:
scale = int(scale_value_str.group())
else:
raise ValueError(
"Missing printer SCALE parameter integer "
"value in option: " + option)
res = self.format_escpos_bit_image_column(im,
high_density_vertical,
high_density_horizontal,
scale)
else:
res = self.format_escpos_bit_image_raster(im)
return res + RECEIPT_PRINTER_COMMANDS['escpos']['cut']
def print_status(self):
"""Prints the status ticket of the IoTBox on the current printer."""
wlan = ''
ip = ''
mac = ''
homepage = ''
pairing_code = ''
ssid = helpers.get_ssid()
wlan = '\nWireless network:\n%s\n\n' % ssid
interfaces = ni.interfaces()
ips = []
for iface_id in interfaces:
iface_obj = ni.ifaddresses(iface_id)
ifconfigs = iface_obj.get(ni.AF_INET, [])
for conf in ifconfigs:
if conf.get('addr') and conf.get('addr'):
ips.append(conf.get('addr'))
if len(ips) == 0:
ip = '\nERROR: Could not connect to LAN\n\nPlease check that the IoTBox is correc-\ntly connected with a network cable,\n that the LAN is setup with DHCP, and\nthat network addresses are available'
elif len(ips) == 1:
ip = '\nIP Address:\n%s\n' % ips[0]
else:
ip = '\nIP Addresses:\n%s\n' % '\n'.join(ips)
if len(ips) >= 1:
ips_filtered = [i for i in ips if i != '127.0.0.1']
main_ips = ips_filtered and ips_filtered[0] or '127.0.0.1'
mac = '\nMAC Address:\n%s\n' % helpers.get_mac_address()
homepage = '\nHomepage:\nhttp://%s:8069\n\n' % main_ips
code = connection_manager.pairing_code
if code:
pairing_code = '\nPairing Code:\n%s\n' % code
commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
title = commands['title'] % b'IoTBox Status'
self.print_raw(commands['center'] + title + b'\n' + wlan.encode() + mac.encode() + ip.encode() + homepage.encode() + pairing_code.encode() + commands['cut'])
def open_cashbox(self, data):
"""Sends a signal to the current printer to open the connected cashbox."""
commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
for drawer in commands['drawers']:
self.print_raw(drawer)
def _action_default(self, data):
self.print_raw(b64decode(data['document']))
class PrinterController(http.Controller):
@http.route('/hw_proxy/default_printer_action', type='json', auth='none', cors='*')
def default_printer_action(self, data):
printer = next((d for d in iot_devices if iot_devices[d].device_type == 'printer' and iot_devices[d].device_connection == 'direct'), None)
if printer:
iot_devices[printer].action(data)
return True
return False
proxy_drivers['printer'] = PrinterDriver

View file

@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from PIL import Image, ImageOps
import logging
from base64 import b64decode
import io
import win32print
import ghostscript
from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
from odoo.addons.hw_drivers.driver import Driver
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.main import iot_devices
from odoo.addons.hw_drivers.tools import helpers
from odoo.tools.mimetypes import guess_mimetype
_logger = logging.getLogger(__name__)
RECEIPT_PRINTER_COMMANDS = {
'star': {
'center': b'\x1b\x1d\x61\x01', # ESC GS a n
'cut': b'\x1b\x64\x02', # ESC d n
'title': b'\x1b\x69\x01\x01%s\x1b\x69\x00\x00', # ESC i n1 n2
'drawers': [b'\x07', b'\x1a'] # BEL & SUB
},
'escpos': {
'center': b'\x1b\x61\x01', # ESC a n
'cut': b'\x1d\x56\x41\n', # GS V m
'title': b'\x1b\x21\x30%s\x1b\x21\x00', # ESC ! n
'drawers': [b'\x1b\x3d\x01', b'\x1b\x70\x00\x19\x19', b'\x1b\x70\x01\x19\x19'] # ESC = n then ESC p m t1 t2
}
}
class PrinterDriver(Driver):
connection_type = 'printer'
def __init__(self, identifier, device):
super().__init__(identifier, device)
self.device_type = 'printer'
self.device_connection = 'network'
self.device_name = device.get('identifier')
self.printer_handle = device.get('printer_handle')
self.state = {
'status': 'connecting',
'message': 'Connecting to printer',
'reason': None,
}
self.send_status()
self._actions.update({
'cashbox': self.open_cashbox,
'print_receipt': self.print_receipt,
'': self._action_default,
})
self.receipt_protocol = 'escpos'
@classmethod
def supported(cls, device):
return True
@classmethod
def get_status(cls):
status = 'connected' if any(iot_devices[d].device_type == "printer" and iot_devices[d].device_connection == 'direct' for d in iot_devices) else 'disconnected'
return {'status': status, 'messages': ''}
def disconnect(self):
self.update_status('disconnected', 'Printer was disconnected')
super(PrinterDriver, self).disconnect()
def update_status(self, status, message, reason=None):
"""Updates the state of the current printer.
Args:
status (str): The new value of the status
message (str): A comprehensive message describing the status
reason (str): The reason fo the current status
"""
if self.state['status'] != status or self.state['reason'] != reason:
self.state = {
'status': status,
'message': message,
'reason': reason,
}
self.send_status()
def send_status(self):
""" Sends the current status of the printer to the connected Odoo instance.
"""
self.data = {
'value': '',
'state': self.state,
}
event_manager.device_changed(self)
def print_raw(self, data):
win32print.StartDocPrinter(self.printer_handle, 1, ('', None, "RAW"))
win32print.StartPagePrinter(self.printer_handle)
win32print.WritePrinter(self.printer_handle, data)
win32print.EndPagePrinter(self.printer_handle)
win32print.EndDocPrinter(self.printer_handle)
def print_report(self, data):
helpers.write_file('document.pdf', data, 'wb')
file_name = helpers.path_file('document.pdf')
printer = self.device_name
args = [
"-dPrinted", "-dBATCH", "-dNOPAUSE", "-dNOPROMPT"
"-q",
"-sDEVICE#mswinpr2",
f'-sOutputFile#%printer%{printer}',
f'{file_name}'
]
_logger.debug("Printing report with ghostscript using %s", args)
stderr_buf = io.BytesIO()
stdout_buf = io.BytesIO()
stdout_log_level = logging.DEBUG
try:
ghostscript.Ghostscript(*args, stdout=stdout_buf, stderr=stderr_buf)
except Exception:
_logger.exception("Error while printing report, ghostscript args: %s, error buffer: %s", args, stderr_buf.getvalue())
stdout_log_level = logging.ERROR # some stdout value might contains relevant error information
raise
finally:
_logger.log(stdout_log_level, "Ghostscript stdout: %s", stdout_buf.getvalue())
def print_receipt(self, data):
receipt = b64decode(data['receipt'])
im = Image.open(io.BytesIO(receipt))
# Convert to greyscale then to black and white
im = im.convert("L")
im = ImageOps.invert(im)
im = im.convert("1")
print_command = getattr(self, 'format_%s' % self.receipt_protocol)(im)
self.print_raw(print_command)
def format_escpos(self, im):
width = int((im.width + 7) / 8)
raster_send = b'\x1d\x76\x30\x00'
max_slice_height = 255
raster_data = b''
dots = im.tobytes()
while dots:
im_slice = dots[:width*max_slice_height]
slice_height = int(len(im_slice) / width)
raster_data += raster_send + width.to_bytes(2, 'little') + slice_height.to_bytes(2, 'little') + im_slice
dots = dots[width*max_slice_height:]
return raster_data + RECEIPT_PRINTER_COMMANDS['escpos']['cut']
def open_cashbox(self, data):
"""Sends a signal to the current printer to open the connected cashbox."""
commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
for drawer in commands['drawers']:
self.print_raw(drawer)
def _action_default(self, data):
document = b64decode(data['document'])
mimetype = guess_mimetype(document)
if mimetype == 'application/pdf':
self.print_report(document)
else:
self.print_raw(document)
proxy_drivers['printer'] = PrinterDriver

View file

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import namedtuple
from contextlib import contextmanager
import logging
import serial
from threading import Lock
import time
import traceback
from odoo import _
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.driver import Driver
_logger = logging.getLogger(__name__)
SerialProtocol = namedtuple(
'SerialProtocol',
"name baudrate bytesize stopbits parity timeout writeTimeout measureRegexp statusRegexp "
"commandTerminator commandDelay measureDelay newMeasureDelay "
"measureCommand emptyAnswerValid")
@contextmanager
def serial_connection(path, protocol, is_probing=False):
"""Opens a serial connection to a device and closes it automatically after use.
:param path: path to the device
:type path: string
:param protocol: an object containing the serial protocol to connect to a device
:type protocol: namedtuple
:param is_probing: a flag thet if set to `True` makes the timeouts longer, defaults to False
:type is_probing: bool, optional
"""
PROBING_TIMEOUT = 1
port_config = {
'baudrate': protocol.baudrate,
'bytesize': protocol.bytesize,
'stopbits': protocol.stopbits,
'parity': protocol.parity,
'timeout': PROBING_TIMEOUT if is_probing else protocol.timeout, # longer timeouts for probing
'writeTimeout': PROBING_TIMEOUT if is_probing else protocol.writeTimeout # longer timeouts for probing
}
connection = serial.Serial(path, **port_config)
yield connection
connection.close()
class SerialDriver(Driver):
"""Abstract base class for serial drivers."""
_protocol = None
connection_type = 'serial'
STATUS_CONNECTED = 'connected'
STATUS_ERROR = 'error'
STATUS_CONNECTING = 'connecting'
def __init__(self, identifier, device):
""" Attributes initialization method for `SerialDriver`.
:param device: path to the device
:type device: str
"""
super(SerialDriver, self).__init__(identifier, device)
self._actions.update({
'get_status': self._push_status,
})
self.device_connection = 'serial'
self._device_lock = Lock()
self._status = {'status': self.STATUS_CONNECTING, 'message_title': '', 'message_body': ''}
self._set_name()
def _get_raw_response(connection):
pass
def _push_status(self):
"""Updates the current status and pushes it to the frontend."""
self.data['status'] = self._status
event_manager.device_changed(self)
def _set_name(self):
"""Tries to build the device's name based on its type and protocol name but falls back on a default name if that doesn't work."""
try:
name = ('%s serial %s' % (self._protocol.name, self.device_type)).title()
except Exception:
name = 'Unknown Serial Device'
self.device_name = name
def _take_measure(self):
pass
def _do_action(self, data):
"""Helper function that calls a specific action method on the device.
:param data: the `_actions` key mapped to the action method we want to call
:type data: string
"""
with self._device_lock:
try:
self._actions[data['action']](data)
time.sleep(self._protocol.commandDelay)
except Exception:
msg = _(f'An error occurred while performing action "{data}" on "{self.device_name}"')
_logger.exception(msg)
self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
self._push_status()
self._status = {'status': self.STATUS_CONNECTED, 'message_title': '', 'message_body': ''}
self.data['status'] = self._status
def action(self, data):
"""Establish a connection with the device if needed and have it perform a specific action.
:param data: the `_actions` key mapped to the action method we want to call
:type data: string
"""
if self._connection and self._connection.isOpen():
self._do_action(data)
else:
with serial_connection(self.device_identifier, self._protocol) as connection:
self._connection = connection
self._do_action(data)
def run(self):
"""Continuously gets new measures from the device."""
try:
with serial_connection(self.device_identifier, self._protocol) as connection:
self._connection = connection
self._status['status'] = self.STATUS_CONNECTED
self._push_status()
while not self._stopped.is_set():
self._take_measure()
time.sleep(self._protocol.newMeasureDelay)
except Exception:
msg = _('Error while reading %s', self.device_name)
_logger.exception(msg)
self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
self._push_status()

View file

@ -0,0 +1,316 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from collections import namedtuple
import logging
import re
import serial
import threading
import time
from odoo import http
from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
from odoo.addons.hw_drivers.event_manager import event_manager
from odoo.addons.hw_drivers.iot_handlers.drivers.SerialBaseDriver import SerialDriver, SerialProtocol, serial_connection
_logger = logging.getLogger(__name__)
# Only needed to ensure compatibility with older versions of Odoo
ACTIVE_SCALE = None
new_weight_event = threading.Event()
ScaleProtocol = namedtuple('ScaleProtocol', SerialProtocol._fields + ('zeroCommand', 'tareCommand', 'clearCommand', 'autoResetWeight'))
# 8217 Mettler-Toledo (Weight-only) Protocol, as described in the scale's Service Manual.
# e.g. here: https://www.manualslib.com/manual/861274/Mettler-Toledo-Viva.html?page=51#manual
# Our recommended scale, the Mettler-Toledo "Ariva-S", supports this protocol on
# both the USB and RS232 ports, it can be configured in the setup menu as protocol option 3.
# We use the default serial protocol settings, the scale's settings can be configured in the
# scale's menu anyway.
Toledo8217Protocol = ScaleProtocol(
name='Toledo 8217',
baudrate=9600,
bytesize=serial.SEVENBITS,
stopbits=serial.STOPBITS_ONE,
parity=serial.PARITY_EVEN,
timeout=1,
writeTimeout=1,
measureRegexp=b"\x02\\s*([0-9.]+)N?\\r",
statusRegexp=b"\x02\\s*(\\?.)\\r",
commandDelay=0.2,
measureDelay=0.5,
newMeasureDelay=0.2,
commandTerminator=b'',
measureCommand=b'W',
zeroCommand=b'Z',
tareCommand=b'T',
clearCommand=b'C',
emptyAnswerValid=False,
autoResetWeight=False,
)
# The ADAM scales have their own RS232 protocol, usually documented in the scale's manual
# e.g at https://www.adamequipment.com/media/docs/Print%20Publications/Manuals/PDF/AZEXTRA/AZEXTRA-UM.pdf
# https://www.manualslib.com/manual/879782/Adam-Equipment-Cbd-4.html?page=32#manual
# Only the baudrate and label format seem to be configurable in the AZExtra series.
ADAMEquipmentProtocol = ScaleProtocol(
name='Adam Equipment',
baudrate=4800,
bytesize=serial.EIGHTBITS,
stopbits=serial.STOPBITS_ONE,
parity=serial.PARITY_NONE,
timeout=0.2,
writeTimeout=0.2,
measureRegexp=br"\s*([0-9.]+)kg", # LABEL format 3 + KG in the scale settings, but Label 1/2 should work
statusRegexp=None,
commandTerminator=b"\r\n",
commandDelay=0.2,
measureDelay=0.5,
# AZExtra beeps every time you ask for a weight that was previously returned!
# Adding an extra delay gives the operator a chance to remove the products
# before the scale starts beeping. Could not find a way to disable the beeps.
newMeasureDelay=5,
measureCommand=b'P',
zeroCommand=b'Z',
tareCommand=b'T',
clearCommand=None, # No clear command -> Tare again
emptyAnswerValid=True, # AZExtra does not answer unless a new non-zero weight has been detected
autoResetWeight=True, # AZExtra will not return 0 after removing products
)
# Ensures compatibility with older versions of Odoo
class ScaleReadOldRoute(http.Controller):
@http.route('/hw_proxy/scale_read', type='json', auth='none', cors='*')
def scale_read(self):
if ACTIVE_SCALE:
return {'weight': ACTIVE_SCALE._scale_read_old_route()}
return None
class ScaleDriver(SerialDriver):
"""Abstract base class for scale drivers."""
last_sent_value = None
def __init__(self, identifier, device):
super(ScaleDriver, self).__init__(identifier, device)
self.device_type = 'scale'
self._set_actions()
self._is_reading = True
# Ensures compatibility with older versions of Odoo
# Only the last scale connected is kept
global ACTIVE_SCALE
ACTIVE_SCALE = self
proxy_drivers['scale'] = ACTIVE_SCALE
# Ensures compatibility with older versions of Odoo
# and allows using the `ProxyDevice` in the point of sale to retrieve the status
def get_status(self):
"""Allows `hw_proxy.Proxy` to retrieve the status of the scales"""
status = self._status
return {'status': status['status'], 'messages': [status['message_title'], ]}
def _set_actions(self):
"""Initializes `self._actions`, a map of action keys sent by the frontend to backend action methods."""
self._actions.update({
'read_once': self._read_once_action,
'set_zero': self._set_zero_action,
'set_tare': self._set_tare_action,
'clear_tare': self._clear_tare_action,
'start_reading': self._start_reading_action,
'stop_reading': self._stop_reading_action,
})
def _start_reading_action(self, data):
"""Starts asking for the scale value."""
self._is_reading = True
def _stop_reading_action(self, data):
"""Stops asking for the scale value."""
self._is_reading = False
def _clear_tare_action(self, data):
"""Clears the scale current tare weight."""
# if the protocol has no clear tare command, we can just tare again
clearCommand = self._protocol.clearCommand or self._protocol.tareCommand
self._connection.write(clearCommand + self._protocol.commandTerminator)
def _read_once_action(self, data):
"""Reads the scale current weight value and pushes it to the frontend."""
self._read_weight()
self.last_sent_value = self.data['value']
event_manager.device_changed(self)
def _set_zero_action(self, data):
"""Makes the weight currently applied to the scale the new zero."""
self._connection.write(self._protocol.zeroCommand + self._protocol.commandTerminator)
def _set_tare_action(self, data):
"""Sets the scale's current weight value as tare weight."""
self._connection.write(self._protocol.tareCommand + self._protocol.commandTerminator)
@staticmethod
def _get_raw_response(connection):
"""Gets raw bytes containing the updated value of the device.
:param connection: a connection to the device's serial port
:type connection: pyserial.Serial
:return: the raw response to a weight request
:rtype: str
"""
answer = []
while True:
char = connection.read(1)
if not char:
break
else:
answer.append(bytes(char))
return b''.join(answer)
def _read_weight(self):
"""Asks for a new weight from the scale, checks if it is valid and, if it is, makes it the current value."""
protocol = self._protocol
self._connection.write(protocol.measureCommand + protocol.commandTerminator)
answer = self._get_raw_response(self._connection)
match = re.search(self._protocol.measureRegexp, answer)
if match:
self.data = {
'value': float(match.group(1)),
'status': self._status
}
# Ensures compatibility with older versions of Odoo
def _scale_read_old_route(self):
"""Used when the iot app is not installed"""
with self._device_lock:
self._read_weight()
return self.data['value']
def _take_measure(self):
"""Reads the device's weight value, and pushes that value to the frontend."""
with self._device_lock:
self._read_weight()
if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR:
self.last_sent_value = self.data['value']
event_manager.device_changed(self)
class Toledo8217Driver(ScaleDriver):
"""Driver for the Toldedo 8217 serial scale."""
_protocol = Toledo8217Protocol
def __init__(self, identifier, device):
super(Toledo8217Driver, self).__init__(identifier, device)
self.device_manufacturer = 'Toledo'
@classmethod
def supported(cls, device):
"""Checks whether the device, which port info is passed as argument, is supported by the driver.
:param device: path to the device
:type device: str
:return: whether the device is supported by the driver
:rtype: bool
"""
protocol = cls._protocol
try:
with serial_connection(device['identifier'], protocol, is_probing=True) as connection:
connection.write(b'Ehello' + protocol.commandTerminator)
time.sleep(protocol.commandDelay)
answer = connection.read(8)
if answer == b'\x02E\rhello':
connection.write(b'F' + protocol.commandTerminator)
return True
except serial.serialutil.SerialTimeoutException:
pass
except Exception:
_logger.exception('Error while probing %s with protocol %s' % (device, protocol.name))
return False
class AdamEquipmentDriver(ScaleDriver):
"""Driver for the Adam Equipment serial scale."""
_protocol = ADAMEquipmentProtocol
priority = 0 # Test the supported method of this driver last, after all other serial drivers
def __init__(self, identifier, device):
super(AdamEquipmentDriver, self).__init__(identifier, device)
self._is_reading = False
self._last_weight_time = 0
self.device_manufacturer = 'Adam'
def _check_last_weight_time(self):
"""The ADAM doesn't make the difference between a value of 0 and "the same value as last time":
in both cases it returns an empty string.
With this, unless the weight changes, we give the user `TIME_WEIGHT_KEPT` seconds to log the new weight,
then change it back to zero to avoid keeping it indefinetely, which could cause issues.
In any case the ADAM must always go back to zero before it can weight again.
"""
TIME_WEIGHT_KEPT = 10
if self.data['value'] is None:
if time.time() - self._last_weight_time > TIME_WEIGHT_KEPT:
self.data['value'] = 0
else:
self._last_weight_time = time.time()
def _take_measure(self):
"""Reads the device's weight value, and pushes that value to the frontend."""
if self._is_reading:
with self._device_lock:
self._read_weight()
self._check_last_weight_time()
if self.data['value'] != self.last_sent_value or self._status['status'] == self.STATUS_ERROR:
self.last_sent_value = self.data['value']
event_manager.device_changed(self)
else:
time.sleep(0.5)
# Ensures compatibility with older versions of Odoo
def _scale_read_old_route(self):
"""Used when the iot app is not installed"""
time.sleep(3)
with self._device_lock:
self._read_weight()
self._check_last_weight_time()
return self.data['value']
@classmethod
def supported(cls, device):
"""Checks whether the device at `device` is supported by the driver.
:param device: path to the device
:type device: str
:return: whether the device is supported by the driver
:rtype: bool
"""
protocol = cls._protocol
try:
with serial_connection(device['identifier'], protocol, is_probing=True) as connection:
connection.write(protocol.measureCommand + protocol.commandTerminator)
# Checking whether writing to the serial port using the Adam protocol raises a timeout exception is about the only thing we can do.
return True
except serial.serialutil.SerialTimeoutException:
pass
except Exception:
_logger.exception('Error while probing %s with protocol %s' % (device, protocol.name))
return False

View file

@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from re import sub, finditer
import subprocess
import RPi.GPIO as GPIO
import logging
from odoo.addons.hw_drivers.interface import Interface
_logger = logging.getLogger(__name__)
try:
from vcgencmd import Vcgencmd
except ImportError:
Vcgencmd = None
_logger.warning('Could not import library vcgencmd')
class DisplayInterface(Interface):
_loop_delay = 0
connection_type = 'display'
def get_devices(self):
# If no display connected, create "fake" device to be accessed from another computer
display_devices = {
'distant_display' : {
'name': "Distant Display",
},
}
if Vcgencmd:
return self.get_devices_vcgencmd() or display_devices
else:
return self.get_devices_tvservice() or display_devices
def get_devices_tvservice(self):
display_devices = {}
displays = subprocess.check_output(['tvservice', '-l']).decode()
x_screen = 0
for match in finditer(r'Display Number (\d), type HDMI (\d)', displays):
display_id, hdmi_id = match.groups()
tvservice_output = subprocess.check_output(['tvservice', '-nv', display_id]).decode().strip()
if tvservice_output:
display_name = tvservice_output.split('=')[1]
display_identifier = sub('[^a-zA-Z0-9 ]+', '', display_name).replace(' ', '_') + "_" + str(hdmi_id)
iot_device = {
'identifier': display_identifier,
'name': display_name,
'x_screen': str(x_screen),
}
display_devices[display_identifier] = iot_device
x_screen += 1
return display_devices
def get_devices_vcgencmd(self):
"""
With the new IoT build 23_11 which uses Raspi OS Bookworm,
tvservice is no longer usable.
vcgencmd returns the display power state as on or off of the display whose ID is passed as the parameter.
The display ID for the preceding three methods are determined by the following table.
Display ID
Main LCD 0
Secondary LCD 1
HDMI 0 2
Composite 3
HDMI 1 7
"""
display_devices = {}
x_screen = 0
hdmi_port = {'hdmi_0' : 2} # HDMI 0
rpi_type = GPIO.RPI_INFO.get('TYPE')
# Check if it is a RPI 3B+ beacause he response on for booth hdmi port
if 'Pi 4' in rpi_type:
hdmi_port.update({'hdmi_1': 7}) # HDMI 1
try:
for hdmi in hdmi_port:
power_state_hdmi = Vcgencmd().display_power_state(hdmi_port.get(hdmi))
if power_state_hdmi == 'on':
iot_device = {
'identifier': hdmi,
'name': 'Display hdmi ' + str(x_screen),
'x_screen': str(x_screen),
}
display_devices[hdmi] = iot_device
x_screen += 1
except subprocess.CalledProcessError:
_logger.warning('Vcgencmd "display_power_state" method call failed')
return display_devices

View file

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from cups import Connection as cups_connection
from re import sub
from threading import Lock
from odoo.addons.hw_drivers.interface import Interface
conn = cups_connection()
PPDs = conn.getPPDs()
cups_lock = Lock() # We can only make one call to Cups at a time
class PrinterInterface(Interface):
_loop_delay = 120
connection_type = 'printer'
printer_devices = {}
def get_devices(self):
discovered_devices = {}
with cups_lock:
printers = conn.getPrinters()
devices = conn.getDevices()
for printer_name, printer in printers.items():
path = printer.get('device-uri', False)
if printer_name != self.get_identifier(path):
printer.update({'supported': True}) # these printers are automatically supported
device_class = 'network'
if 'usb' in printer.get('device-uri'):
device_class = 'direct'
printer.update({'device-class': device_class})
printer.update({'device-make-and-model': printer_name}) # give name setted in Cups
printer.update({'device-id': ''})
devices.update({printer_name: printer})
for path, device in devices.items():
identifier = self.get_identifier(path)
device.update({'identifier': identifier})
device.update({'url': path})
device.update({'disconnect_counter': 0})
discovered_devices.update({identifier: device})
self.printer_devices.update(discovered_devices)
# Deal with devices which are on the list but were not found during this call of "get_devices"
# If they aren't detected 3 times consecutively, remove them from the list of available devices
for device in list(self.printer_devices):
if not discovered_devices.get(device):
disconnect_counter = self.printer_devices.get(device).get('disconnect_counter')
if disconnect_counter >= 2:
self.printer_devices.pop(device, None)
else:
self.printer_devices[device].update({'disconnect_counter': disconnect_counter + 1})
return dict(self.printer_devices)
def get_identifier(self, path):
"""
Necessary because the path is not always a valid Cups identifier,
as it may contain characters typically found in URLs or paths.
- Removes characters: ':', '/', '.', '\', and space.
- Removes the exact strings: "uuid=" and "serial=".
Example 1:
Input: "ipp://printers/printer1:1234/abcd"
Output: "ippprintersprinter11234abcd"
Example 2:
Input: "uuid=1234-5678-90ab-cdef"
Output: "1234-5678-90ab-cdef
"""
return sub(r'[:\/\.\\ ]|(uuid=)|(serial=)', '', path)

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import win32print
from odoo.addons.hw_drivers.interface import Interface
class PrinterInterface(Interface):
_loop_delay = 30
connection_type = 'printer'
def get_devices(self):
printer_devices = {}
printers = win32print.EnumPrinters(win32print.PRINTER_ENUM_LOCAL)
for printer in printers:
identifier = printer[2]
handle_printer = win32print.OpenPrinter(identifier)
win32print.GetPrinter(handle_printer, 2)
printer_devices[identifier] = {
'identifier': identifier,
'printer_handle': handle_printer,
}
return printer_devices

View file

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import serial.tools.list_ports
from odoo.addons.hw_drivers.interface import Interface
class SerialInterface(Interface):
connection_type = 'serial'
def get_devices(self):
serial_devices = {}
for port in serial.tools.list_ports.comports():
serial_devices[port.device] = {
'identifier': port.device
}
return serial_devices

View file

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from usb import core
from odoo.addons.hw_drivers.interface import Interface
class USBInterface(Interface):
connection_type = 'usb'
def get_devices(self):
"""
USB devices are identified by a combination of their `idVendor` and
`idProduct`. We can't be sure this combination in unique per equipment.
To still allow connecting multiple similar equipments, we complete the
identifier by a counter. The drawbacks are we can't be sure the equipments
will get the same identifiers after a reboot or a disconnect/reconnect.
"""
usb_devices = {}
devs = core.find(find_all=True)
cpt = 2
for dev in devs:
identifier = "usb_%04x:%04x" % (dev.idVendor, dev.idProduct)
if identifier in usb_devices:
identifier += '_%s' % cpt
cpt += 1
usb_devices[identifier] = dev
return usb_devices