To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 20c56fb9 authored by mikolajr's avatar mikolajr
Browse files

Merge branch '35-enum-for-labjack' into 'devel'

Resolve "Enum for Labjack"

Closes #35

See merge request !35
parents 7602bf71 3b0221b3
Pipeline #53145 passed with stages
in 3 minutes and 1 second
...@@ -15,15 +15,15 @@ from __future__ import annotations ...@@ -15,15 +15,15 @@ from __future__ import annotations
import logging import logging
from collections.abc import Sequence from collections.abc import Sequence
from numbers import Real
from typing import Union, Tuple, Optional, List from typing import Union, Tuple, Optional, List
from aenum import Enum from aenum import Enum, IntEnum
from typing_extensions import Literal # in `typing` only since Py 3.8
from .._dev import labjack from .._dev import labjack
from ..comm import LJMCommunication from ..comm import LJMCommunication
from ..dev import SingleCommDevice from ..dev import SingleCommDevice
from ..utils.enum import NameEnum from ..utils.enum import NameEnum, StrEnumBase
class LabJackError(Exception): class LabJackError(Exception):
...@@ -119,6 +119,20 @@ class LabJack(SingleCommDevice): ...@@ -119,6 +119,20 @@ class LabJack(SingleCommDevice):
return float(self.com.read_name(f"SBUS{number}_RH")) return float(self.com.read_name(f"SBUS{number}_RH"))
class AInRange(StrEnumBase):
_init_ = "value_str"
TEN = "10"
ONE = "1"
ONE_TENTH = "0.1"
ONE_HUNDREDTH = "0.01"
def __str__(self) -> str:
return self.value_str
@property
def value(self) -> float:
return float(self.value_str)
def get_ain(self, *channels: int) -> Union[float, Tuple[float, ...]]: def get_ain(self, *channels: int) -> Union[float, Tuple[float, ...]]:
""" """
Read currently measured value (voltage, resistance, ...) from one or more Read currently measured value (voltage, resistance, ...) from one or more
...@@ -136,25 +150,15 @@ class LabJack(SingleCommDevice): ...@@ -136,25 +150,15 @@ class LabJack(SingleCommDevice):
return tuple(float(val) for val in ret_val) return tuple(float(val) for val in ret_val)
return float(ret_val) return float(ret_val)
def set_ain_range(self, channel: int, ain_range: float) -> None: def set_ain_range(self, channel: int, vrange: Union[Real, AInRange]) -> None:
r""" """
Set the range of an analog input port. Set the range of an analog input port.
Possible values for ``ain_range`` are:
* 10 => +- 10 V
* 1 => +- 1 V
* 0.1 => +- 0.1 V
* 0.01 => +- 0.01 V
:param channel: is the AIN number (0..254) :param channel: is the AIN number (0..254)
:param ain_range: is the range specifier :param vrange: is the voltage range to be set
""" """
vrange = self.AInRange(str(vrange))
if ain_range not in (10, 1, 0.1, 0.01): self.com.write_name(f"AIN{channel}_RANGE", vrange.value)
raise LabJackError(f"Not supported range: {ain_range}")
self.com.write_name(f"AIN{channel}_RANGE", ain_range)
def set_ain_resolution(self, channel: int, resolution: int) -> None: def set_ain_resolution(self, channel: int, resolution: int) -> None:
""" """
...@@ -244,7 +248,9 @@ class LabJack(SingleCommDevice): ...@@ -244,7 +248,9 @@ class LabJack(SingleCommDevice):
cjc_type: Union[str, CjcType] = ( cjc_type: Union[str, CjcType] = (
CjcType.internal # type: ignore CjcType.internal # type: ignore
), ),
vrange: float = 0.01, vrange: Union[Real, AInRange] = (
AInRange.ONE_HUNDREDTH # type: ignore
),
resolution: int = 10, resolution: int = 10,
unit: Union[str, TemperatureUnit] = ( unit: Union[str, TemperatureUnit] = (
TemperatureUnit.K # type: ignore TemperatureUnit.K # type: ignore
...@@ -259,7 +265,7 @@ class LabJack(SingleCommDevice): ...@@ -259,7 +265,7 @@ class LabJack(SingleCommDevice):
the thermocouple type the thermocouple type
:param cjc_address: modbus register address to read the CJC temperature :param cjc_address: modbus register address to read the CJC temperature
:param cjc_type: determines cjc slope and offset, 'internal' or 'lm34' :param cjc_type: determines cjc slope and offset, 'internal' or 'lm34'
:param vrange: measurement voltage range (10, 1, 0.1, 0.01) :param vrange: measurement voltage range
:param resolution: resolution index (T7-Pro: 0-12) :param resolution: resolution index (T7-Pro: 0-12)
:param unit: is the temperature unit to be returned ('K', 'C' or 'F') :param unit: is the temperature unit to be returned ('K', 'C' or 'F')
:raises LabJackError: if parameters are unsupported :raises LabJackError: if parameters are unsupported
...@@ -270,6 +276,10 @@ class LabJack(SingleCommDevice): ...@@ -270,6 +276,10 @@ class LabJack(SingleCommDevice):
thermocouple = self.ThermocoupleType(thermocouple) thermocouple = self.ThermocoupleType(thermocouple)
# validate separately from `set_ain_range` to fail before any write happens
# (in `set_ain_differential` first)
vrange = self.AInRange(str(vrange))
unit = self.TemperatureUnit(unit) unit = self.TemperatureUnit(unit)
cjc_type = self.CjcType(cjc_type) cjc_type = self.CjcType(cjc_type)
...@@ -305,31 +315,41 @@ class LabJack(SingleCommDevice): ...@@ -305,31 +315,41 @@ class LabJack(SingleCommDevice):
return round(self.com.read_name(f"AIN{pos_channel}_EF_READ_A"), 2) return round(self.com.read_name(f"AIN{pos_channel}_EF_READ_A"), 2)
def set_digital_output(self, address: str, state) -> None: class DIOStatus(IntEnum):
"""
State of a digital I/O channel.
"""
LOW = 0
HIGH = 1
def set_digital_output(self, address: str, state: Union[int, DIOStatus]) -> None:
""" """
Set the value of a digital output. Set the value of a digital output.
:param address: name of the output -> 'FIO0' :param address: name of the output -> `'FIO0'`
:param state: state of the output -> 1 or 0 :param state: state of the output -> `DIOStatus` instance or corresponding `int`
value
""" """
dt = self.get_product_type() dt = self.get_product_type()
if address not in ( if address not in (
dt.dio # type: ignore dt.dio # type: ignore
): ):
raise LabJackIdentifierDIOError raise LabJackIdentifierDIOError
state = self.DIOStatus(state)
self.com.write_name(address, state) self.com.write_name(address, state)
DIOChannel = labjack.TSeriesDIOChannel DIOChannel = labjack.TSeriesDIOChannel
def get_digital_input( def get_digital_input(
self, address: Union[str, labjack.TSeriesDIOChannel] self, address: Union[str, labjack.TSeriesDIOChannel]
) -> Union[Literal[0], Literal[1]]: ) -> LabJack.DIOStatus:
""" """
Get the value of a digital input. Get the value of a digital input.
allowed names for T7 (Pro): FIO0 - FIO7, EIO0 - EIO 7, CIO0- CIO3, MIO0 - MIO2 allowed names for T7 (Pro): FIO0 - FIO7, EIO0 - EIO 7, CIO0- CIO3, MIO0 - MIO2
:param address: name of the output -> 'FIO0' :param address: name of the output -> 'FIO0'
:return: `1` when `address` DIO is high, otherwise `0`. :return: HIGH when `address` DIO is high, and LOW when `address` DIO is low
""" """
if not isinstance(address, self.DIOChannel): if not isinstance(address, self.DIOChannel):
address = self.DIOChannel(address) address = self.DIOChannel(address)
...@@ -341,12 +361,11 @@ class LabJack(SingleCommDevice): ...@@ -341,12 +361,11 @@ class LabJack(SingleCommDevice):
raise LabJackIdentifierDIOError( raise LabJackIdentifierDIOError(
f"DIO {address.name} is not available for this device type: {dt_name}." f"DIO {address.name} is not available for this device type: {dt_name}."
) )
ret = int(self.com.read_name(address.name)) try:
if ret == 0: ret = int(self.com.read_name(address.name))
return 0 return self.DIOStatus(ret)
if ret == 1: except ValueError:
return 1 raise LabJackIdentifierDIOError(f"Expected 0 or 1 return value, got {ret}.")
raise LabJackIdentifierDIOError(f"Expected 0 or 1 return value, got {ret}.")
class CalMicroAmpere(Enum): class CalMicroAmpere(Enum):
""" """
...@@ -430,13 +449,15 @@ class LabJack(SingleCommDevice): ...@@ -430,13 +449,15 @@ class LabJack(SingleCommDevice):
""" """
return self.get_product_type(force_query_id=force_query_id).name return self.get_product_type(force_query_id=force_query_id).name
def set_ain_resistance(self, channel: int, vrange: float, resolution: int) -> None: def set_ain_resistance(
self, channel: int, vrange: Union[Real, AInRange], resolution: int
) -> None:
""" """
Set the specified channel to resistance mode. It utilized the 200uA current Set the specified channel to resistance mode. It utilized the 200uA current
source of the LabJack. source of the LabJack.
:param channel: channel that should measure the resistance :param channel: channel that should measure the resistance
:param vrange: voltage range of the channel (10, 1, 0.1, 0.01) :param vrange: voltage range of the channel
:param resolution: resolution index of the channel T4: 0-5, T7: 0-8, T7-Pro 0-12 :param resolution: resolution index of the channel T4: 0-5, T7: 0-8, T7-Pro 0-12
""" """
self.set_ain_range(channel, vrange) self.set_ain_range(channel, vrange)
......
...@@ -23,7 +23,6 @@ requirements = [ ...@@ -23,7 +23,6 @@ requirements = [
"aenum>=2.1.2", "aenum>=2.1.2",
"opcua>=0.98.6", "opcua>=0.98.6",
"cryptography>=2.6.1", # optional dependency of the opcua package "cryptography>=2.6.1", # optional dependency of the opcua package
"typing-extensions>=3.7.4.1", # rm when min support Py 3.8
] ]
dependency_links = [] dependency_links = []
......
...@@ -89,13 +89,13 @@ def test_get_ain(started_dev_comm): ...@@ -89,13 +89,13 @@ def test_get_ain(started_dev_comm):
def test_set_ain_range(started_dev_comm): def test_set_ain_range(started_dev_comm):
lj, com = started_dev_comm lj, com = started_dev_comm
lj.set_ain_range(0, 10) lj.set_ain_range(0, lj.AInRange.TEN)
assert com.get_written() == ("AIN0_RANGE", 10) assert com.get_written() == ("AIN0_RANGE", 10)
lj.set_ain_range(0, 0.1) lj.set_ain_range(0, lj.AInRange.ONE_TENTH)
assert com.get_written() == ("AIN0_RANGE", 0.1) assert com.get_written() == ("AIN0_RANGE", 0.1)
with pytest.raises(LabJackError): with pytest.raises(ValueError):
lj.set_ain_range(0, 0.2) lj.set_ain_range(0, 0.2)
assert com.get_written() is None assert com.get_written() is None
...@@ -143,19 +143,19 @@ def test_set_ain_thermocouple(started_dev_comm_with_t7pro): ...@@ -143,19 +143,19 @@ def test_set_ain_thermocouple(started_dev_comm_with_t7pro):
lj, com = started_dev_comm_with_t7pro lj, com = started_dev_comm_with_t7pro
lj.set_ain_thermocouple(0, None) lj.set_ain_thermocouple(0, None)
lj.set_ain_thermocouple(0, LabJack.ThermocoupleType.NONE) lj.set_ain_thermocouple(0, lj.ThermocoupleType.NONE)
lj.set_ain_thermocouple(0, "K") lj.set_ain_thermocouple(0, "K")
lj.set_ain_thermocouple(0, LabJack.ThermocoupleType.K) lj.set_ain_thermocouple(0, lj.ThermocoupleType.K)
with pytest.raises(ValueError): with pytest.raises(ValueError):
lj.set_ain_thermocouple(0, "B") lj.set_ain_thermocouple(0, "B")
lj.set_ain_thermocouple(0, thermocouple="T", unit="F") lj.set_ain_thermocouple(0, thermocouple="T", unit="F")
lj.set_ain_thermocouple(0, thermocouple="T", unit=LabJack.TemperatureUnit.F) lj.set_ain_thermocouple(0, thermocouple="T", unit=lj.TemperatureUnit.F)
with pytest.raises(ValueError): with pytest.raises(ValueError):
lj.set_ain_thermocouple(0, "K", unit="B") lj.set_ain_thermocouple(0, "K", unit="B")
lj.set_ain_thermocouple(0, thermocouple="T", cjc_type="lm34") lj.set_ain_thermocouple(0, thermocouple="T", cjc_type="lm34")
lj.set_ain_thermocouple(0, thermocouple="T", cjc_type=LabJack.CjcType.lm34) lj.set_ain_thermocouple(0, thermocouple="T", cjc_type=lj.CjcType.lm34)
with pytest.raises(ValueError): with pytest.raises(ValueError):
lj.set_ain_thermocouple(0, "K", cjc_type="LM35") lj.set_ain_thermocouple(0, "K", cjc_type="LM35")
...@@ -174,7 +174,7 @@ def test_read_resistance(started_dev_comm): ...@@ -174,7 +174,7 @@ def test_read_resistance(started_dev_comm):
def test_set_ain_resistance(started_dev_comm_with_t7pro): def test_set_ain_resistance(started_dev_comm_with_t7pro):
lj, com = started_dev_comm_with_t7pro lj, com = started_dev_comm_with_t7pro
lj.set_ain_resistance(2, 1, 8) lj.set_ain_resistance(channel=2, vrange=lj.AInRange.ONE, resolution=8)
def test_get_product_id(started_dev_comm): def test_get_product_id(started_dev_comm):
...@@ -228,10 +228,10 @@ def test_get_cal_current_source(started_dev_comm): ...@@ -228,10 +228,10 @@ def test_get_cal_current_source(started_dev_comm):
def test_get_digital_input(started_dev_comm_with_t4): def test_get_digital_input(started_dev_comm_with_t4):
lj, com = started_dev_comm_with_t4 lj, com = started_dev_comm_with_t4
com.put_name("EIO0", 0.0) com.put_name("EIO0", lj.DIOStatus.LOW)
assert lj.get_digital_input("EIO0") == 0 assert lj.get_digital_input("EIO0") == lj.DIOStatus.LOW
com.put_name("EIO0", 1.0) com.put_name("EIO0", lj.DIOStatus.HIGH)
assert lj.get_digital_input("EIO0") == 1 assert lj.get_digital_input("EIO0") == lj.DIOStatus.HIGH
com.put_name("EIO0", 2.0) com.put_name("EIO0", 2.0)
with pytest.raises(LabJackIdentifierDIOError): with pytest.raises(LabJackIdentifierDIOError):
lj.get_digital_input("EIO0") lj.get_digital_input("EIO0")
...@@ -248,8 +248,8 @@ def test_get_digital_input(started_dev_comm_with_t4): ...@@ -248,8 +248,8 @@ def test_get_digital_input(started_dev_comm_with_t4):
def test_set_digital_output(started_dev_comm_with_t7pro): def test_set_digital_output(started_dev_comm_with_t7pro):
lj, com = started_dev_comm_with_t7pro lj, com = started_dev_comm_with_t7pro
lj.set_digital_output("FIO0", 1) lj.set_digital_output("FIO0", lj.DIOStatus.HIGH)
assert com.get_written() == ("FIO0", 1) assert com.get_written() == ("FIO0", lj.DIOStatus.HIGH)
with pytest.raises(LabJackIdentifierDIOError): with pytest.raises(LabJackIdentifierDIOError):
lj.set_digital_output("AIO0", 1) lj.set_digital_output("AIO0", 1)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment