To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 63474a32 authored by Henrik Menne's avatar Henrik Menne
Browse files

Merge remote-tracking branch 'remotes/origin/devel' into 35-enum-for-labjack

# Conflicts:
#	hvl_ccb/dev/labjack.py
parents d1c49daa 88659a43
Pipeline #53121 passed with stages
in 2 minutes and 57 seconds
......@@ -4,8 +4,9 @@
from __future__ import annotations
import warnings
from typing import List, Union
from typing import Dict, Iterable, List, Union
from aenum import EnumMeta
from labjack.ljm import constants
from ..utils.enum import AutoNumberNameEnum, NameEnum
......@@ -18,6 +19,7 @@ class TSeriesDIOChannel(NameEnum):
NOTE: not all DIO addresses are available on all devices. This is defined as
`dio` attribute of `LabJackDeviceType`.
"""
FIO0 = 0
FIO1 = 1
FIO2 = 2
......@@ -43,12 +45,17 @@ class TSeriesDIOChannel(NameEnum):
MIO2 = 22
LabJackDeviceTypeBase = AutoNumberNameEnum
LabJackDeviceTypeMetaBase = LabJackDeviceTypeBase.__class__
def _build_p_id_lookup_dict(
lab_jack_device_types: Iterable["DeviceType"],
) -> Dict[int, List]:
"""
Build lookup dictionary of `DeviceType` instances based on their `p_id`. Note:
`p_id` is not unique for each device type.
def _build_p_id_lookup_dict(lab_jack_device_types):
ret = dict()
:param lab_jack_device_types: `DeviceType` instances to iterate over
:return: `int`-based lookup dictionary
"""
ret: Dict[int, List] = dict()
for lab_jack_device_type in lab_jack_device_types:
if lab_jack_device_type.p_id not in ret:
ret[lab_jack_device_type.p_id] = list()
......@@ -56,9 +63,10 @@ def _build_p_id_lookup_dict(lab_jack_device_types):
return ret
class DeviceTypeMeta(LabJackDeviceTypeMetaBase):
# NOTE: super metaclass has to match metaclass of `super(DeviceType)`!
class DeviceTypeMeta(EnumMeta):
def __new__(metacls, clsname, bases, clsdict, **kwargs):
cls = LabJackDeviceTypeMetaBase.__new__(
cls = EnumMeta.__new__(
metacls, clsname, bases, clsdict, **kwargs
)
cls._get_by_p_id = _build_p_id_lookup_dict(cls)
......@@ -69,7 +77,7 @@ class AmbiguousProductIdWarning(UserWarning):
pass
class DeviceType(LabJackDeviceTypeBase, metaclass=DeviceTypeMeta):
class DeviceType(AutoNumberNameEnum, metaclass=DeviceTypeMeta):
"""
LabJack device types.
......
......@@ -16,6 +16,7 @@ from .modbus_tcp import ( # noqa: F401
from .opc import ( # noqa: F401
OpcUaCommunication,
OpcUaCommunicationConfig,
OpcUaCommunicationIOError,
OpcUaSubHandler,
)
from .serial import ( # noqa: F401
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for LabJack using the LJM Library.
Originally developed and tested for LabJack T7-PRO.
......@@ -38,7 +40,7 @@ class LJMCommunicationConfig:
DeviceType = labjack.DeviceType
#: Can be either string 'ANY', 'T7_PRO', 'T7', 'T4', or of enum :class:`DeviceType`.
device_type: (str, DeviceType) = "ANY" # type: ignore
device_type: Union[str, labjack.DeviceType] = "ANY"
class ConnectionType(AutoNumberNameEnum):
"""
......@@ -52,7 +54,7 @@ class LJMCommunicationConfig:
WIFI = ()
#: Can be either string or of enum :class:`ConnectionType`.
connection_type: (str, ConnectionType) = "ANY" # type: ignore
connection_type: Union[str, ConnectionType] = "ANY"
identifier: str = "ANY"
"""
......@@ -67,10 +69,12 @@ identifier-parameter) for more information.
Performs value checks on device_type and connection_type.
"""
if not isinstance(self.device_type, self.DeviceType):
self.force_value("device_type", self.DeviceType(self.device_type))
self.force_value( # type: ignore
"device_type", self.DeviceType(self.device_type)
)
if not isinstance(self.connection_type, self.ConnectionType):
self.force_value(
self.force_value( # type: ignore
"connection_type", self.ConnectionType(self.connection_type)
)
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol implementing an OPC UA connection.
This protocol is used to interface with the "Supercube" PLC from Siemens.
"""
import logging
from typing import Iterable, Union
from functools import wraps
from socket import gaierror
from typing import Iterable, Union, Optional
from opcua import Client, Node, Subscription
from opcua.ua import NodeId, DataValue
from opcua.ua import NodeId, DataValue, UaError
from .base import CommunicationProtocol
from ..configuration import configdataclass
......@@ -26,11 +30,11 @@ class OpcUaSubHandler:
def datachange_notification(self, node, val, data):
logging.getLogger(__name__).debug(
'OPCUA Datachange event: {} to value {}'.format(node, val)
f"OPCUA Datachange event: {node} to value {val}"
)
def event_notification(self, event):
logging.getLogger(__name__).debug('OPCUA Event: {}'.format(event))
logging.getLogger(__name__).debug(f"OPCUA Event: {event}")
@configdataclass
......@@ -55,6 +59,59 @@ class OpcUaCommunicationConfig:
update_period: int = 500
class OpcUaCommunicationIOError(IOError):
"""OPC-UA communication I/O error."""
def _wrap_ua_error(method):
"""
Wrap any `UaError` raised from a `OpcUaCommunication` method into
`OpcUaCommunicationIOError`; additionally, log source error.
:param method: `OpcUaCommunication` instance method to wrap
:return: Whatever `method` returns
"""
@wraps(method)
def wrapper(self, *args, **kwargs):
try:
return method(self, *args, **kwargs)
except UaError as e:
self.logger.error(f"UA error: {str(e)}")
raise OpcUaCommunicationIOError from e
except gaierror as e:
self.logger.error(f"Socket address error: {str(e)}")
raise OpcUaCommunicationIOError from e
return wrapper
def _require_ua_opened(method):
"""
Check if `opcua.client.ua_client.UaClient` socket is opened and raise an
`OpcUaCommunicationIOError` if not.
NOTE: this checks should be implemented downstream in
`opcua.client.ua_client.UaClient`; currently you get `AttributeError: 'NoneType'
object has no attribute ...`.
:param method: `OpcUaCommunication` instance method to wrap
:return: Whatever `method` returns
"""
@wraps(method)
def wrapper(self, *args, **kwargs):
# BLAH: this checks should be implemented downstream in
# `opcua.client.ua_client.UaClient`
if self._client.uaclient._uasocket is None:
err_msg = f"Client's socket is not set in {str(self)}. Was it opened?"
self.logger.error(err_msg)
raise OpcUaCommunicationIOError(err_msg)
return method(self, *args, **kwargs)
return wrapper
class OpcUaCommunication(CommunicationProtocol):
"""
Communication protocol implementing an OPC UA connection.
......@@ -72,38 +129,35 @@ class OpcUaCommunication(CommunicationProtocol):
self.logger = logging.getLogger(__name__)
url = (
'opc.tcp://{host}:{port}/{endpoint}'.format(
host=self.config.host,
port=self.config.port,
endpoint=self.config.endpoint_name,
)
)
conf = self.config
url = f"opc.tcp://{conf.host}:{conf.port}/{conf.endpoint_name}"
self.logger.info('Create OPC UA client to URL: {}'.format(url))
self.logger.info(f"Create OPC UA client to URL: {url}")
self._client = Client(url)
# the objects node exists on evere OPC UA server and is the root for all
# objects.
self._objects_node = None # type: Node
# the objects node exists on every OPC UA server and are root for all objects.
self._objects_node: Optional[Node] = None
# subscription handler
self._sub_handler = self.config.sub_handler
# subscription object
self._subscription = None # type: Subscription
self._subscription: Optional[Subscription] = None
@staticmethod
def config_cls():
return OpcUaCommunicationConfig
@_wrap_ua_error
def open(self) -> None:
"""
Open the communication to the OPC UA server.
:raises OpcUaCommunicationIOError: when communication port cannot be opened.
"""
self.logger.info('Open connection to OPC server.')
self.logger.info("Open connection to OPC server.")
with self.access_lock:
self._client.connect()
# in example from opcua, load_type_definitions() is called after connect(
......@@ -112,18 +166,44 @@ class OpcUaCommunication(CommunicationProtocol):
# self._client.load_type_definitions()
self._objects_node = self._client.get_objects_node()
self._subscription = self._client.create_subscription(
self.config.update_period, self._sub_handler)
self.config.update_period, self._sub_handler
)
@property
def is_open(self) -> bool:
"""
Flag indicating if the communication port is open.
:return: `True` if the port is open, otherwise `False`
"""
open_called = self._objects_node or self._subscription
if open_called:
try:
self._client.send_hello()
return True
except UaError as e:
self.logger.info(f"Sending hello returned UA error: {str(e)}")
# try cleanup in case connection was opened before but now is lost
if open_called:
self.close()
return False
def close(self) -> None:
"""
Close the connection to the OPC UA server.
"""
self.logger.info('Close connection to OPC server.')
self.logger.info("Close connection to OPC server.")
with self.access_lock:
self._subscription.delete()
if self._subscription:
self._subscription.delete()
self._subscription = None
if self._objects_node:
self._objects_node = None
self._client.disconnect()
@_require_ua_opened
@_wrap_ua_error
def read(self, node_id, ns_index):
"""
Read a value from a node with id and namespace index.
......@@ -131,12 +211,17 @@ class OpcUaCommunication(CommunicationProtocol):
:param node_id: the ID of the node to read the value from
:param ns_index: the namespace index of the node
:return: the value of the node object.
:raises OpcUaCommunicationIOError: when protocol was not opened or can't
communicate with a OPC UA server
"""
with self.access_lock:
return self._client.get_node(
NodeId(identifier=node_id, namespaceidx=ns_index)).get_value()
NodeId(identifier=node_id, namespaceidx=ns_index)
).get_value()
@_require_ua_opened
@_wrap_ua_error
def write(self, node_id, ns_index, value) -> None:
"""
Write a value to a node with name ``name``.
......@@ -144,6 +229,8 @@ class OpcUaCommunication(CommunicationProtocol):
:param node_id: the id of the node to write the value to.
:param ns_index: the namespace index of the node.
:param value: the value to write.
:raises OpcUaCommunicationIOError: when protocol was not opened or can't
communicate with a OPC UA server
"""
with self.access_lock:
......@@ -151,25 +238,36 @@ class OpcUaCommunication(CommunicationProtocol):
NodeId(identifier=node_id, namespaceidx=ns_index)
).set_value(DataValue(value))
def init_monitored_nodes(self, node_id: Union[str, Iterable[str]],
ns_index: int) -> None:
@_require_ua_opened
@_wrap_ua_error
def init_monitored_nodes(
self, node_id: Union[str, Iterable[str]], ns_index: int
) -> None:
"""
Initialize monitored nodes.
:param node_id: one or more strings of node IDs.
:param ns_index: the namespace index the nodes belong to.
:raises OpcUaCommunicationIOError: when protocol was not opened or can't
communicate with a OPC UA server
"""
if not self._subscription:
err_msg = f"Missing subscription in {str(self)}. Was it opened?"
self.logger.error(err_msg)
raise OpcUaCommunicationIOError(err_msg)
ids: Iterable[str] = ()
if isinstance(node_id, str):
ids = [node_id]
ids = (node_id,)
else:
ids = node_id
nodes = []
for id_ in ids:
nodes.append(self._client.get_node(
NodeId(identifier=id_, namespaceidx=ns_index)
))
nodes.append(
self._client.get_node(NodeId(identifier=id_, namespaceidx=ns_index))
)
with self.access_lock:
self._subscription.subscribe_data_change(nodes)
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for serial ports. Makes use of the `pySerial
<https://pythonhosted.org/pyserial/index.html>`_ library.
"""
from typing import Union, cast
# Note: PyCharm does not recognize the dependency correctly, it is added as pyserial.
import serial
......@@ -10,47 +14,59 @@ from .base import CommunicationProtocol
from ..configuration import configdataclass
from ..utils.enum import ValueEnum, unique
Number = Union[int, float]
class SerialCommunicationIOError(IOError):
"""Serial communication related I/O errors."""
@unique
class SerialCommunicationParity(ValueEnum):
"""
Serial communication parity.
"""
EVEN = serial.PARITY_EVEN
MARK = serial.PARITY_MARK
NAMES = serial.PARITY_NAMES
NONE = serial.PARITY_NONE
ODD = serial.PARITY_ODD
SPACE = serial.PARITY_SPACE
@unique
class SerialCommunicationStopbits(ValueEnum):
"""
Serial communication stopbits.
"""
ONE = serial.STOPBITS_ONE
ONE_POINT_FIVE = serial.STOPBITS_ONE_POINT_FIVE
TWO = serial.STOPBITS_TWO
@unique
class SerialCommunicationBytesize(ValueEnum):
"""
Serial communication bytesize.
"""
FIVEBITS = serial.FIVEBITS
SIXBITS = serial.SIXBITS
SEVENBITS = serial.SEVENBITS
EIGHTBITS = serial.EIGHTBITS
@configdataclass
class SerialCommunicationConfig:
"""
Configuration dataclass for :class:`SerialCommunication`.
"""
@unique
class Parity(ValueEnum):
"""
Serial communication parity.
"""
EVEN = serial.PARITY_EVEN
MARK = serial.PARITY_MARK
NAMES = serial.PARITY_NAMES
NONE = serial.PARITY_NONE
ODD = serial.PARITY_ODD
SPACE = serial.PARITY_SPACE
@unique
class Stopbits(ValueEnum):
"""
Serial communication stopbits.
"""
ONE = serial.STOPBITS_ONE
ONE_POINT_FIVE = serial.STOPBITS_ONE_POINT_FIVE
TWO = serial.STOPBITS_TWO
@unique
class Bytesize(ValueEnum):
"""
Serial communication bytesize.
"""
FIVEBITS = serial.FIVEBITS
SIXBITS = serial.SIXBITS
SEVENBITS = serial.SEVENBITS
EIGHTBITS = serial.EIGHTBITS
Parity = SerialCommunicationParity
Stopbits = SerialCommunicationStopbits
Bytesize = SerialCommunicationBytesize
#: Port is a string referring to a COM-port (e.g. ``'COM3'``) or a URL.
#: The full list of capabilities is found `on the pyserial documentation
......@@ -61,33 +77,33 @@ class SerialCommunicationConfig:
baudrate: int
#: Parity to be used for the connection.
parity: (str, Parity) # type: ignore
parity: Union[str, SerialCommunicationParity]
#: Stopbits setting, can be 1, 1.5 or 2.
stopbits: (int, float, Stopbits) # type: ignore
stopbits: Union[Number, SerialCommunicationStopbits]
#: Size of a byte, 5 to 8
bytesize: (int, Bytesize) # type: ignore
bytesize: Union[int, SerialCommunicationBytesize]
#: The terminator character. Typically this is ``b'\r\n'`` or ``b'\n'``, but can
#: also be ``b'\r'`` or other combinations.
terminator: bytes = b'\r\n'
terminator: bytes = b"\r\n"
#: Timeout in seconds for the serial port
timeout: (int, float) = 2 # type: ignore
timeout: Number = 2
def clean_values(self):
if not isinstance(self.parity, self.Parity):
self.force_value('parity', self.Parity(self.parity))
if not isinstance(self.parity, SerialCommunicationParity):
self.force_value("parity", SerialCommunicationParity(self.parity))
if not isinstance(self.stopbits, self.Stopbits):
self.force_value('stopbits', self.Stopbits(self.stopbits))
if not isinstance(self.stopbits, SerialCommunicationStopbits):
self.force_value("stopbits", SerialCommunicationStopbits(self.stopbits))
if not isinstance(self.bytesize, self.Bytesize):
self.force_value('bytesize', self.Bytesize(self.bytesize))
if not isinstance(self.bytesize, SerialCommunicationBytesize):
self.force_value("bytesize", SerialCommunicationBytesize(self.bytesize))
if self.timeout < 0:
raise ValueError('Timeout has to be >= 0.')
raise ValueError("Timeout has to be >= 0.")
def create_serial_port(self) -> serial.Serial:
"""
......@@ -100,9 +116,9 @@ class SerialCommunicationConfig:
assert not ser.is_open
ser.baudrate = self.baudrate
ser.parity = self.parity.value
ser.stopbits = self.stopbits.value
ser.bytesize = self.bytesize.value
ser.parity = cast(SerialCommunicationParity, self.parity).value
ser.stopbits = cast(SerialCommunicationStopbits, self.stopbits).value
ser.bytesize = cast(SerialCommunicationBytesize, self.bytesize).value
ser.timeout = self.timeout
return ser
......@@ -113,8 +129,8 @@ class SerialCommunication(CommunicationProtocol):
Implements the Communication Protocol for serial ports.
"""
ENCODING = 'utf-8'
UNICODE_HANDLING = 'replace'
ENCODING = "utf-8"
UNICODE_HANDLING = "replace"
def __init__(self, configuration):
"""
......@@ -142,7 +158,7 @@ class SerialCommunication(CommunicationProtocol):
self._serial_port.open()
except serial.SerialException as exc:
# ignore when port is already open
if str(exc) != 'Port is already open.':
if str(exc) != "Port is already open.":
raise SerialCommunicationIOError from exc
def close(self):
......@@ -172,8 +188,10 @@ class SerialCommunication(CommunicationProtocol):
:raises SerialCommunicationIOError: when communication port is not opened
"""
try:
self._serial_port.write(text.encode(self.ENCODING, self.UNICODE_HANDLING)
+ self.config.terminator)
self._serial_port.write(
text.encode(self.ENCODING, self.UNICODE_HANDLING)
+ self.config.terminator
)
except serial.SerialException as exc:
raise SerialCommunicationIOError from exc
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for VISA. Makes use of the pyvisa library.
The backend can be NI-Visa or pyvisa-py.
......@@ -11,7 +13,7 @@ So far only TCPIP SOCKET and TCPIP INSTR interfaces are supported.
import importlib
import logging
from time import sleep
from typing import Tuple, Union, Type
from typing import Tuple, Union, Type, Optional
import visa
from IPy import IP
......@@ -28,6 +30,7 @@ class VisaCommunicationError(Exception):
"""
Base class for VisaCommunication errors.
"""
pass
......@@ -42,7 +45,7 @@ class VisaCommunicationConfig:
Supported VISA Interface types.
"""
_init_ = 'value _address_template'
_init_ = "value _address_template"
#: VISA-RAW protocol
TCPIP_SOCKET = (), "TCPIP{board}::{host}::{port}::SOCKET"
......@@ -60,17 +63,13 @@ class VisaCommunicationConfig:
:return: address string
"""
return self._address_template.format(
board=board,
host=host,
port=port,
)
return self._address_template.format(board=board, host=host, port=port,)
#: IP address of the VISA device. DNS names are currently unsupported.
host: str
#: Interface type of the VISA connection, being one of :class:`InterfaceType`.
interface_type: (str, InterfaceType) # type: ignore
interface_type: Union[str, InterfaceType]
#: Board number is typically 0 and comes from old bus systems.
board: int = 0
......@@ -89,12 +88,12 @@ class VisaCommunicationConfig:
open_timeout: int = 1000
#: Write termination character.
write_termination: str = '\n'
write_termination: str = "\n"
#: Read termination character.
read_termination: str = '\n'