To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 0b0cbae6 authored by mikolajr's avatar mikolajr
Browse files

In Heinziger devices: wait time config settings, config tests, fix type check...

In Heinziger devices: wait time config settings, config tests, fix type check errors, format (black), and copyright notice
parent b6a398d7
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device classes for Heinzinger Digital Interface I/II and Heinzinger PNC power supply.
......@@ -22,9 +24,16 @@ from typing import Union
from .base import SingleCommDevice
from ..comm import SerialCommunication, SerialCommunicationConfig
from ..comm.serial import (
SerialCommunicationParity,
SerialCommunicationStopbits,
SerialCommunicationBytesize,
)
from ..configuration import configdataclass
from ..utils.enum import AutoNumberNameEnum
Number = Union[int, float]
@configdataclass
class HeinzingerSerialCommunicationConfig(SerialCommunicationConfig):
......@@ -32,22 +41,33 @@ class HeinzingerSerialCommunicationConfig(SerialCommunicationConfig):
baudrate: int = 9600
#: Heinzinger does not use parity
parity: (str, SerialCommunicationConfig.Parity) = \
SerialCommunicationConfig.Parity.NONE
parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE
#: Heinzinger uses one stop bit
stopbits: (int, SerialCommunicationConfig.Stopbits) = \
SerialCommunicationConfig.Stopbits.ONE
stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE
#: One byte is eight bits long
bytesize: (int, SerialCommunicationConfig.Bytesize) = \
SerialCommunicationConfig.Bytesize.EIGHTBITS
bytesize: Union[
int, SerialCommunicationBytesize
] = SerialCommunicationBytesize.EIGHTBITS
#: The terminator is LF
terminator: bytes = b'\n'
terminator: bytes = b"\n"
#: use 3 seconds timeout as default
timeout: (int, float) = 3
timeout: Number = 3
#: time to wait between attempts of reading a non-empty text
wait_sec_read_text_nonempty: Number = 0.5
def clean_values(self):
super().clean_values()
if self.wait_sec_read_text_nonempty <= 0:
raise ValueError(
"Wait time between attempts to read a non-empty text must be be a "
"positive value (in seconds)."
)
class HeinzingerSerialCommunication(SerialCommunication):
......@@ -61,11 +81,7 @@ class HeinzingerSerialCommunication(SerialCommunication):
def config_cls():
return HeinzingerSerialCommunicationConfig
def read_text_nonempty(
self,
n_attempts_max: int = 40,
attempt_interval_sec: Union[int, float] = 0.5,
) -> str:
def read_text_nonempty(self, n_attempts_max: int = 40,) -> str:
"""
Reads from the serial port, until a non-empty line is found, or the number of
attempts is exceeded.
......@@ -77,6 +93,7 @@ class HeinzingerSerialCommunication(SerialCommunication):
"""
answer = self.read_text().strip()
n_attempts_left = n_attempts_max if self.is_open else 0
attempt_interval_sec = self.wait_sec_read_text_nonempty # type: ignore
while len(answer) == 0 and n_attempts_left > 0:
sleep(attempt_interval_sec)
answer = self.read_text().strip()
......@@ -97,21 +114,34 @@ class HeinzingerConfig:
EIGHT = 8
SIXTEEN = 16
# default number of recordings used in averaging the current
# or the voltage [1, 2, 4, 8, 16]
default_number_of_recordings: (int, RecordingsEnum) = 1
#: default number of recordings used in averaging the current
# or the voltage [1, 2, 4, 8, 16]
default_number_of_recordings: Union[int, RecordingsEnum] = 1
# number of decimals sent for setting the current limit or the voltage
#: number of decimals sent for setting the current limit or the voltage, between 1
# and 10
number_of_decimals: int = 6
#: Time to wait after subsequent commands during stop (in seconds)
wait_sec_stop_commands: Number = 0.5
def clean_values(self):
if not isinstance(self.default_number_of_recordings, self.RecordingsEnum):
self.force_value('default_number_of_recordings',
self.RecordingsEnum(self.default_number_of_recordings))
self.force_value(
"default_number_of_recordings",
self.RecordingsEnum(self.default_number_of_recordings),
)
if self.number_of_decimals not in range(1, 11):
raise ValueError('The number of decimals should be '
'an integer between 1 and 10.')
raise ValueError(
"The number of decimals should be " "an integer between 1 and 10."
)
if self.wait_sec_stop_commands <= 0:
raise ValueError(
"Wait time after subsequent commands during stop must be be a "
"positive value (in seconds)."
)
class HeinzingerDI(SingleCommDevice, ABC):
......@@ -128,10 +158,10 @@ class HeinzingerDI(SingleCommDevice, ABC):
super().__init__(com, dev_config)
# Version of the interface (will be retrieved after com is opened)
self._interface_version = ''
self._interface_version = ""
def __repr__(self):
return "HeinzingerDI({})".format(self._interface_version)
return f"HeinzingerDI({self._interface_version})"
@staticmethod
def default_com_cls():
......@@ -161,9 +191,9 @@ class HeinzingerDI(SingleCommDevice, ABC):
logging.info("Stopping device " + str(self))
self.set_voltage(0)
sleep(0.5)
sleep(self.config.wait_sec_stop_commands)
self.output_off()
sleep(0.5)
sleep(self.config.wait_sec_stop_commands)
super().stop()
def reset_interface(self) -> None:
......@@ -173,7 +203,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('*RST')
self.com.write_text("*RST")
def get_interface_version(self) -> str:
"""
......@@ -181,7 +211,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('VERS?')
self.com.write_text("VERS?")
return self.com.read_text_nonempty()
def get_serial_number(self) -> str:
......@@ -191,7 +221,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:return: string containing the device serial number
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('*IDN?')
self.com.write_text("*IDN?")
return self.com.read_text_nonempty()
def output_on(self) -> None:
......@@ -200,7 +230,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('OUTP ON')
self.com.write_text("OUTP ON")
def output_off(self) -> None:
"""
......@@ -208,7 +238,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('OUTP OFF')
self.com.write_text("OUTP OFF")
def get_number_of_recordings(self) -> int:
"""
......@@ -218,13 +248,12 @@ class HeinzingerDI(SingleCommDevice, ABC):
:return: int number of recordings
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('AVER?')
self.com.write_text("AVER?")
answer = self.com.read_text_nonempty()
return int(answer)
def set_number_of_recordings(
self,
value: Union[int, HeinzingerConfig.RecordingsEnum],
self, value: Union[int, HeinzingerConfig.RecordingsEnum],
) -> None:
"""
Sets the number of recordings the device is using for average value
......@@ -233,7 +262,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
value = self.config.RecordingsEnum(value).value
self.com.write_text('AVER {}'.format(value))
self.com.write_text(f"AVER {value}")
def measure_voltage(self) -> float:
"""
......@@ -242,7 +271,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:return: measured voltage as float
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('MEAS:VOLT?')
self.com.write_text("MEAS:VOLT?")
answer = self.com.read_text_nonempty()
return float(answer)
......@@ -254,8 +283,9 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('VOLT {:.{}f}'.format(value,
self.config.number_of_decimals))
self.com.write_text(
f"VOLT {{:.{self.config.number_of_decimals}f}}".format(value)
)
def get_voltage(self) -> float:
"""
......@@ -263,7 +293,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('VOLT?')
self.com.write_text("VOLT?")
answer = self.com.read_text_nonempty()
return float(answer)
......@@ -274,7 +304,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:return: measured current as float
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('MEAS:CURR?')
self.com.write_text("MEAS:CURR?")
answer = self.com.read_text_nonempty()
return float(answer)
......@@ -286,8 +316,9 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('CURR {:.{}f}'.format(value,
self.config.number_of_decimals))
self.com.write_text(
f"CURR {{:.{self.config.number_of_decimals}f}}".format(value)
)
def get_current(self) -> float:
"""
......@@ -295,7 +326,7 @@ class HeinzingerDI(SingleCommDevice, ABC):
:raises SerialCommunicationIOError: when communication port is not opened
"""
self.com.write_text('CURR?')
self.com.write_text("CURR?")
answer = self.com.read_text_nonempty()
return float(answer)
......@@ -323,9 +354,9 @@ class HeinzingerPNC(HeinzingerDI):
super().__init__(com, dev_config)
# Serial number of the device (will be retrieved after com is opened)
self._serial_number = ''
self._serial_number = ""
# model of the device (derived from serial number)
self._model = ''
self._model = ""
# maximum output current of the hardware (unit mA or A, depending on model)
self._max_current_hardware = 0
# maximum output voltage of the hardware (unit V or kV, depending on model)
......@@ -340,8 +371,10 @@ class HeinzingerPNC(HeinzingerDI):
self._unit_voltage = self.UnitVoltage.UNKNOWN
def __repr__(self):
return "HeinzingerPNC({}), with HeinzingerDI({})".format(
self._serial_number, self._interface_version)
return (
f"HeinzingerPNC({self._serial_number}), with "
f"HeinzingerDI({self._interface_version})"
)
@property
def max_current_hardware(self) -> Union[int, float]:
......@@ -366,8 +399,9 @@ class HeinzingerPNC(HeinzingerDI):
@max_current.setter
def max_current(self, value: Union[int, float]):
if not 0 <= value <= self._max_current_hardware:
raise ValueError("max_current must positive "
"and below max_current_hardware.")
raise ValueError(
"max_current must positive " "and below max_current_hardware."
)
self._max_current = value
@property
......@@ -377,8 +411,9 @@ class HeinzingerPNC(HeinzingerDI):
@max_voltage.setter
def max_voltage(self, value: Union[int, float]):
if not 0 <= value <= self._max_voltage_hardware:
raise ValueError("max_voltage must be positive "
"and below max_voltage_hardware.")
raise ValueError(
"max_voltage must be positive " "and below max_voltage_hardware."
)
self._max_voltage = value
def start(self) -> None:
......@@ -403,16 +438,18 @@ class HeinzingerPNC(HeinzingerDI):
"""
serial_number = self.get_serial_number()
# regex to find the model of the device
regex_model = r"PNC.*?\d+-\d+\s?[a-z]{3}"
regex_vc = r"(\d+)-(\d+)" # voltage-current info
regex_model = r"PNC.*?" + regex_vc + r"\s?[a-z]{3}"
result = re.search(regex_model, serial_number)
if result:
self._serial_number = serial_number
model = result.group()
self._model = model
# regex to find the nominal voltage and nominal current
vi = re.search(r"\d+-\d+", model).group().split('-')
voltage = int(vi[0])
current = int(vi[1])
match = re.search(regex_vc, model)
assert match # already matched in regex_model expression
voltage = int(match.group(1))
current = int(match.group(2))
# identifying the units to use for voltage and current
if voltage < 100000:
self._unit_voltage = self.UnitVoltage.V
......@@ -420,17 +457,17 @@ class HeinzingerPNC(HeinzingerDI):
self._max_voltage = voltage
else:
self._unit_voltage = self.UnitVoltage.kV
self._max_voltage_hardware = int(voltage/1000)
self._max_voltage = int(voltage/1000)
self._max_voltage_hardware = int(voltage / 1000)
self._max_voltage = int(voltage / 1000)
if current < 1000:
self._unit_current = self.UnitCurrent.mA
self._max_current_hardware = current
self._max_current = current
else:
self._unit_current = self.UnitCurrent.A
self._max_current_hardware = int(current/1000)
self._max_current = int(current/1000)
logging.info('Device {} successfully identified'.format(model))
self._max_current_hardware = int(current / 1000)
self._max_current = int(current / 1000)
logging.info(f"Device {model} successfully identified")
else:
raise HeinzingerPNCDeviceNotRecognizedException(serial_number)
......@@ -467,6 +504,7 @@ class HeinzingerPNCError(Exception):
"""
General error with the Heinzinger PNC voltage source.
"""
pass
......@@ -475,6 +513,7 @@ class HeinzingerPNCMaxVoltageExceededException(HeinzingerPNCError):
Error indicating that program attempted to set the voltage
to a value exceeding 'max_voltage'.
"""
pass
......@@ -483,6 +522,7 @@ class HeinzingerPNCMaxCurrentExceededException(HeinzingerPNCError):
Error indicating that program attempted to set the current
to a value exceeding 'max_current'.
"""
pass
......@@ -491,4 +531,5 @@ class HeinzingerPNCDeviceNotRecognizedException(HeinzingerPNCError):
Error indicating that the serial number of the device
is not recognized.
"""
pass
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Tests for the .dev.heinzinger_pnc sub-package.
"""
......@@ -20,21 +22,22 @@ def com_config():
"parity": dev.HeinzingerSerialCommunicationConfig.Parity.NONE,
"stopbits": dev.HeinzingerSerialCommunicationConfig.Stopbits.ONE,
"bytesize": dev.HeinzingerSerialCommunicationConfig.Bytesize.EIGHTBITS,
"terminator": b'\r\n',
"terminator": b"\r\n",
"timeout": 3,
"wait_sec_read_text_nonempty": 0.01,
}
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def dev_config():
return {
'default_number_of_recordings': 16,
'number_of_decimals': 6
"default_number_of_recordings": 16,
"number_of_decimals": 6,
"wait_sec_stop_commands": 0.01,
}
class ConcreteHeinzingerDI(dev.HeinzingerDI):
def start(self):
super().start()
......@@ -52,7 +55,6 @@ def started_heinzinger_di(com_config, dev_config):
@pytest.fixture
def start_pnc_device(com_config, dev_config):
def _start_pnc_device(serial_number):
serial_port = HeinzingerLoopSerialCommunication(com_config)
serial_port.open()
......@@ -64,6 +66,7 @@ def start_pnc_device(com_config, dev_config):
while serial_port.get_written() is not None:
pass
yield serial_port, pnc
return started_pnc()
return _start_pnc_device
......@@ -74,9 +77,9 @@ def test_di_number_of_recordings(started_heinzinger_di):
# set a new value
di.set_number_of_recordings(4)
assert com.get_written() == 'AVER 4'
assert com.get_written() == "AVER 4"
com.put_text('4')
com.put_text("4")
assert di.get_number_of_recordings() == 4
# assigning integer not amongst accepted values
......@@ -84,14 +87,56 @@ def test_di_number_of_recordings(started_heinzinger_di):
di.set_number_of_recordings(3)
def test_com_config(com_config):
config = dev.HeinzingerSerialCommunicationConfig(**com_config)
for key, value in com_config.items():
assert getattr(config, key) == value
@pytest.mark.parametrize(
"wrong_config_dict",
[
{"wait_sec_read_text_nonempty": 0},
{"timeout": 1, "wait_sec_read_text_nonempty": -1},
],
)
def test_com_config_invalid(com_config, wrong_config_dict):
invalid_com_config = dict(com_config)
invalid_com_config.update(wrong_config_dict)
with pytest.raises(ValueError):
dev.HeinzingerSerialCommunicationConfig(**invalid_com_config)
def test_dev_config(dev_config):
# currently there are no non-default config values
dev.HeinzingerConfig()
config = dev.HeinzingerConfig(**dev_config)
for key, value in dev_config.items():
assert getattr(config, key) == value
@pytest.mark.parametrize(
"wrong_config_dict",
[
{"default_number_of_recordings": 0},
{"number_of_decimals": 0},
{"number_of_decimals": 11},
{"wait_sec_stop_commands": 0},
{"number_of_decimals": 6, "wait_sec_stop_commands": -1},
],
)
def test_dev_config_invalid(wrong_config_dict):
with pytest.raises(ValueError):
dev.HeinzingerConfig(**wrong_config_dict)
def test_di_instantiation(com_config, dev_config):
di = ConcreteHeinzingerDI(com_config)
assert di is not None
wrong_config = dict(dev_config)
wrong_config['default_number_of_recordings'] = 0
with pytest.raises(ValueError):
ConcreteHeinzingerDI(com_config, wrong_config)
di = ConcreteHeinzingerDI(com_config, dev_config)
assert di is not None
def test_di_start(started_heinzinger_di):
......@@ -99,27 +144,27 @@ def test_di_start(started_heinzinger_di):
# starting again should work
com.put_text("my_interface_version")
di.start()
assert com.get_written() == 'VERS?'
assert com.get_written() == "VERS?"
def test_di_stop(started_heinzinger_di):
com, di = started_heinzinger_di
# starting again should work
di.stop()
assert com.get_written() == 'VOLT 0.000000'
assert com.get_written() == 'OUTP OFF'
assert com.get_written() == "VOLT 0.000000"
assert com.get_written() == "OUTP OFF"
def test_di_output_on(started_heinzinger_di):
com, di = started_heinzinger_di
di.output_on()
assert com.get_written() == 'OUTP ON'
assert com.get_written() == "OUTP ON"
def test_di_output_off(started_heinzinger_di):
com, di = started_heinzinger_di
di.output_off()
assert com.get_written() == 'OUTP OFF'
assert com.get_written() == "OUTP OFF"
def test_di_set_voltage(started_heinzinger_di):
......@@ -127,7 +172,7 @@ def test_di_set_voltage(started_heinzinger_di):
# test if the correct text is sent to the com
di.set_voltage(0.123456789)
assert com.get_written().strip() == 'VOLT 0.123457'
assert com.get_written().strip() == "VOLT 0.123457"
def test_di_measure_voltage(started_heinzinger_di):
......@@ -135,7 +180,7 @@ def test_di_measure_voltage(started_heinzinger_di):
volt = 1.2
com.put_text(str(volt))
result = di.measure_voltage()
assert result == volt and com.get_written() == 'MEAS:VOLT?'
assert result == volt and com.get_written() == "MEAS:VOLT?"
def test_di_set_current(started_heinzinger_di):
......@@ -143,7 +188,7 @@ def test_di_set_current(started_heinzinger_di):
# test if the correct text is sent to the com
di.set_current(0.123456789)
assert com.get_written().strip() == 'CURR 0.123457'
assert com.get_written().strip() == "CURR 0.123457"
def test_di_measure_current(started_heinzinger_di):
......@@ -151,13 +196,13 @@ def test_di_measure_current(started_heinzinger_di):
curr = 0.25
com.put_text(str(curr))
result = di.measure_current()
assert result == curr and com.get_written() == 'MEAS:CURR?'
assert result == curr and com.get_written() == "MEAS:CURR?"
def test_di_com_error(com_config, dev_config):
wrong_config = dict(com_config)
wrong_config['port'] = 'NOT A PORT'
wrong_config["port"] = "NOT A PORT"
di = ConcreteHeinzingerDI(wrong_config, dev_config)
assert not di.com.is_open
......@@ -200,7 +245,7 @@ def test_pnc_instantiation(com_config, dev_config):
assert pnc is not None
wrong_config = dict(dev_config)
wrong_config['default_number_of_recordings'] = 0
wrong_config["default_number_of_recordings"] = 0
with pytest.raises(ValueError):
dev.HeinzingerPNC(com_config, wrong_config)
......@@ -222,11 +267,11 @@ def test_pnc_com_error(com_config, dev_config):
# test different possible serial numbers to check if the info is read correctly
devices_data = [
('PNChp 60000-1neg 354211082', 60000, 1, dev.HeinzingerPNC.UnitVoltage.V, 'mA'),
('314807440/PNChp 60000-1neg.', 60000, 1, 'V', dev.HeinzingerPNC.UnitCurrent.mA),
('PNChp 1500-40 ump. 375214277', 1500, 40, 'V', 'mA'),
('PNC 100000-6pos 375214277', 100, 6, dev.HeinzingerPNC.UnitVoltage.kV, 'mA'),
('PNC 600-3000pos 375214277', 600, 3, 'V', dev.HeinzingerPNC.UnitCurrent.A),
("PNChp 60000-1neg 354211082", 60000, 1, dev.HeinzingerPNC.UnitVoltage.V, "mA"),
("314807440/PNChp 60000-1neg.", 60000, 1, "V", dev.HeinzingerPNC.UnitCurrent.mA),
("PNChp 1500-40 ump. 375214277", 1500, 40, "V", "mA"),
("PNC 100000-6pos 375214277", 100, 6, dev.HeinzingerPNC.UnitVoltage.kV, "mA"),
("PNC 600-3000pos 375214277", 600, 3, "V", dev.HeinzingerPNC.UnitCurrent.A),
]
......@@ -239,34 +284,34 @@ def test_pnc_start(start_pnc_device, com_config, device_data):
com.put_text("my_interface_version")
com.put_text(device_data[0])
pnc.start()
assert com.get_written() == 'VERS?'
assert com.get_written() == '*IDN?'
assert com.get_written() == "VERS?"
assert com.get_written() == "*IDN?"
assert pnc.max_voltage_hardware == pnc.max_voltage == device_data[1]
assert pnc.max_current_hardware == pnc.max_current == device_data[2]
assert pnc.unit_voltage == device_data[3]
assert pnc.unit_current == device_data[4]
assert com.get_written() == 'AVER ' + str(