To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 3eb28829 authored by mikolajr's avatar mikolajr
Browse files

In PfeifferTPG device: format (black), copyright notice

parent 69d4a7a6
Pipeline #53118 passed with stages
in 3 minutes and 2 seconds
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for Pfeiffer TPG controllers.
......@@ -32,14 +34,10 @@ class PfeifferTPGSerialCommunicationConfig(SerialCommunicationConfig):
baudrate: int = 9600
#: Pfeiffer TPG controllers do not use parity
parity: Union[
str, SerialCommunicationParity
] = SerialCommunicationParity.NONE
parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE
#: Pfeiffer TPG controllers use one stop bit
stopbits: Union[
int, SerialCommunicationStopbits
] = SerialCommunicationStopbits.ONE
stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE
#: One byte is eight bits long
bytesize: Union[
......@@ -47,7 +45,7 @@ class PfeifferTPGSerialCommunicationConfig(SerialCommunicationConfig):
] = SerialCommunicationBytesize.EIGHTBITS
#: The terminator is <CR><LF>
terminator: bytes = b'\r\n'
terminator: bytes = b"\r\n"
#: use 3 seconds timeout as default
timeout: Number = 3
......@@ -125,11 +123,30 @@ class PfeifferTPGConfig:
"""
class Model(NameEnum):
_init_ = 'full_scale_ranges'
TPG25xA = {1: 0, 10: 1, 100: 2, 1000: 3, 2000: 4, 5000: 5,
10000: 6, 50000: 7, 0.1: 8}
TPGx6x = {0.01: 0, 0.1: 1, 1: 2, 10: 3, 100: 4, 1000: 5,
2000: 6, 5000: 7, 10000: 8, 50000: 9}
_init_ = "full_scale_ranges"
TPG25xA = {
1: 0,
10: 1,
100: 2,
1000: 3,
2000: 4,
5000: 5,
10000: 6,
50000: 7,
0.1: 8,
}
TPGx6x = {
0.01: 0,
0.1: 1,
1: 2,
10: 3,
100: 4,
1000: 5,
2000: 6,
5000: 7,
10000: 8,
50000: 9,
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -139,13 +156,11 @@ class PfeifferTPGConfig:
# model of the TPG (determines which lookup table to use for the
# full scale range)
model: Union[
str, Model
] = Model.TPG25xA # type: ignore
model: Union[str, Model] = Model.TPG25xA # type: ignore
def clean_values(self):
if not isinstance(self.model, self.Model):
self.force_value('model', self.Model(self.model))
self.force_value("model", self.Model(self.model))
class PfeifferTPG(SingleCommDevice):
......@@ -169,30 +184,30 @@ class PfeifferTPG(SingleCommDevice):
Volt = 5
SensorTypes = Enum( # type: ignore
value='SensorTypes',
value="SensorTypes",
names=[
('TPR/PCR Pirani Gauge', 1),
('TPR', 1),
('TPR/PCR', 1),
('IKR Cold Cathode Gauge', 2),
('IKR', 2),
('IKR9', 2),
('IKR11', 2),
('PKR Full range CC', 3),
('PKR', 3),
('APR/CMR Linear Gauge', 4),
('CMR', 4),
('APR/CMR', 4),
('CMR/APR', 4),
('Pirani / High Pressure Gauge', 5),
('IMR', 5),
('Fullrange BA Gauge', 6),
('PBR', 6),
('None', 7),
('no Sensor', 7),
('noSen', 7),
('noSENSOR', 7)
]
("TPR/PCR Pirani Gauge", 1),
("TPR", 1),
("TPR/PCR", 1),
("IKR Cold Cathode Gauge", 2),
("IKR", 2),
("IKR9", 2),
("IKR11", 2),
("PKR Full range CC", 3),
("PKR", 3),
("APR/CMR Linear Gauge", 4),
("CMR", 4),
("APR/CMR", 4),
("CMR/APR", 4),
("Pirani / High Pressure Gauge", 5),
("IMR", 5),
("Fullrange BA Gauge", 6),
("PBR", 6),
("None", 7),
("no Sensor", 7),
("noSen", 7),
("noSENSOR", 7),
],
)
class SensorStatus(IntEnum):
......@@ -273,11 +288,11 @@ class PfeifferTPG(SingleCommDevice):
# try matching the sensors:
sensors = []
for s in answer.split(','):
for s in answer.split(","):
try:
sensors.append(self.SensorTypes[s].name)
except KeyError:
sensors.append('Unknown')
sensors.append("Unknown")
self.sensors = sensors
# identification successful:
logging.info(f"Identified {self}")
......@@ -297,8 +312,10 @@ class PfeifferTPG(SingleCommDevice):
self.com.send_command(f"UNI,{unit.value}")
logging.info(f"Setting display unit to {unit.name}")
except PfeifferTPGError:
logging.error(f"Setting display unit to {unit.name} failed. Not all units"
" are available on all TGP models")
logging.error(
f"Setting display unit to {unit.name} failed. Not all units"
" are available on all TGP models"
)
raise
def measure(self, channel: int) -> Tuple[str, float]:
......@@ -314,8 +331,10 @@ class PfeifferTPG(SingleCommDevice):
"""
if not 1 <= channel <= self.number_of_sensors:
message = (f"{channel} is not a valid channel number, it should be between "
f"1 and {self.number_of_sensors}")
message = (
f"{channel} is not a valid channel number, it should be between "
f"1 and {self.number_of_sensors}"
)
logging.error(message)
raise ValueError(message)
......@@ -325,14 +344,18 @@ class PfeifferTPG(SingleCommDevice):
logging.error(f"Reading sensor {channel} failed.")
raise
status, measurement = answer.split(',')
status, measurement = answer.split(",")
s = self.SensorStatus(int(status))
if s == self.SensorStatus.Ok:
logging.info(f"Channel {channel} successful reading of "
f"pressure: {measurement} mbar.")
logging.info(
f"Channel {channel} successful reading of "
f"pressure: {measurement} mbar."
)
else:
logging.info(f"Channel {channel} no reading of pressure, sensor status is "
f"{self.SensorStatus(s).name}.")
logging.info(
f"Channel {channel} no reading of pressure, sensor status is "
f"{self.SensorStatus(s).name}."
)
return s.name, float(measurement)
def measure_all(self) -> List[Tuple[str, float]]:
......@@ -347,15 +370,19 @@ class PfeifferTPG(SingleCommDevice):
"""
try:
answer = self.com.query('PRX')
answer = self.com.query("PRX")
except PfeifferTPGError:
logging.error("Getting pressure reading from all sensors failed (this "
"command is not available on all TGP models).")
logging.error(
"Getting pressure reading from all sensors failed (this "
"command is not available on all TGP models)."
)
raise
ans = answer.split(',')
ret = [(self.SensorStatus(int(ans[2*i])).name, float(ans[2*i+1]))
for i in range(self.number_of_sensors)]
ans = answer.split(",")
ret = [
(self.SensorStatus(int(ans[2 * i])).name, float(ans[2 * i + 1]))
for i in range(self.number_of_sensors)
]
logging.info(f"Reading all sensors with result: {ret}.")
return ret
......@@ -369,18 +396,21 @@ class PfeifferTPG(SingleCommDevice):
:raises PfeifferTPGError: if command fails
"""
wrong_values = [v for v in fsr
if v not in self.config.model.full_scale_ranges_reversed]
wrong_values = [
v for v in fsr if v not in self.config.model.full_scale_ranges_reversed
]
if len(fsr) != self.number_of_sensors:
raise ValueError(f"Argument fsr should be of length "
f"{self.number_of_sensors}. Received length {len(fsr)}.")
raise ValueError(
f"Argument fsr should be of length "
f"{self.number_of_sensors}. Received length {len(fsr)}."
)
elif wrong_values:
raise ValueError(
f"Argument fsr contains invalid values: {wrong_values}. Accepted "
f"values are {list(self.config.model.full_scale_ranges.values())}"
)
str_fsr = ','.join([str(f) for f in fsr])
str_fsr = ",".join([str(f) for f in fsr])
try:
self.com.send_command(f"FSR,{str_fsr}")
logging.info(f"Set sensors full scale to {fsr} respectively.")
......@@ -404,9 +434,12 @@ class PfeifferTPG(SingleCommDevice):
logging.error("Query full scale range of all sensors failed.")
raise
wrong_values = [v for v in answer.split(',')
if not v.isdigit()
or int(v) not in self.config.model.full_scale_ranges_reversed]
wrong_values = [
v
for v in answer.split(",")
if not v.isdigit()
or int(v) not in self.config.model.full_scale_ranges_reversed
]
if wrong_values:
raise PfeifferTPGError(
f"The controller returned the full scale range values: {answer}. The "
......@@ -414,7 +447,7 @@ class PfeifferTPG(SingleCommDevice):
f"{list(self.config.model.full_scale_ranges.values())}."
)
fsr = [int(i) for i in answer.split(',')]
fsr = [int(i) for i in answer.split(",")]
logging.info(f"Obtained full scale range of all sensors as {fsr}.")
return fsr
......@@ -431,14 +464,15 @@ class PfeifferTPG(SingleCommDevice):
if len(fsr) != self.number_of_sensors:
raise ValueError(
f"Argument fsr should be of length {self.number_of_sensors}. "
f"Received length {len(fsr)}.")
f"Received length {len(fsr)}."
)
elif wrong_values:
raise ValueError(
f"Argument fsr contains invalid values: {wrong_values}. Accepted "
f"values are {list(self.config.model.full_scale_ranges.keys())}"
)
str_fsr = ','.join([str(self.config.model.full_scale_ranges[f]) for f in fsr])
str_fsr = ",".join([str(self.config.model.full_scale_ranges[f]) for f in fsr])
try:
self.com.send_command(f"FSR,{str_fsr}")
......@@ -462,9 +496,12 @@ class PfeifferTPG(SingleCommDevice):
logging.info("Query full scale range of all sensors failed.")
raise
wrong_values = [v for v in answer.split(',')
if not v.isdigit()
or int(v) not in self.config.model.full_scale_ranges_reversed]
wrong_values = [
v
for v in answer.split(",")
if not v.isdigit()
or int(v) not in self.config.model.full_scale_ranges_reversed
]
if wrong_values:
raise PfeifferTPGError(
f"The controller returned the full scale range values: {answer}. The "
......@@ -472,8 +509,10 @@ class PfeifferTPG(SingleCommDevice):
f"{list(self.config.model.full_scale_ranges_reversed.keys())}."
)
fsr = [self.config.model.full_scale_ranges_reversed[int(i)]
for i in answer.split(',')]
fsr = [
self.config.model.full_scale_ranges_reversed[int(i)]
for i in answer.split(",")
]
logging.info(f"Obtained full scale range of sensors as {fsr} mbar.")
return fsr
......@@ -482,4 +521,5 @@ class PfeifferTPGError(Exception):
"""
Error with the Pfeiffer TPG Controller.
"""
pass
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Tests for the .dev.pfeiffer_tpg sub-package.
"""
......@@ -20,14 +22,14 @@ def com_config():
"parity": dev.PfeifferTPGSerialCommunicationConfig.Parity.NONE,
"stopbits": dev.PfeifferTPGSerialCommunicationConfig.Stopbits.ONE,
"bytesize": dev.PfeifferTPGSerialCommunicationConfig.Bytesize.EIGHTBITS,
"terminator": b'\r\n',
"terminator": b"\r\n",
"timeout": 3,
}
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def dev_config():
return {'model': dev.PfeifferTPGConfig.Model.TPG25xA}
return {"model": dev.PfeifferTPGConfig.Model.TPG25xA}
@pytest.fixture
......@@ -35,8 +37,14 @@ def started_pfeiffer_tpg(com_config, dev_config):
com = PfeifferTPGLoopSerialCommunication(com_config)
com.open()
com.put_text(chr(6))
com.put_text(','.join([dev.PfeifferTPG.SensorTypes.CMR.name,
dev.PfeifferTPG.SensorTypes.noSENSOR.name]))
com.put_text(
",".join(
[
dev.PfeifferTPG.SensorTypes.CMR.name,
dev.PfeifferTPG.SensorTypes.noSENSOR.name,
]
)
)
with dev.PfeifferTPG(com, dev_config) as pg:
while com.get_written() is not None:
pass
......@@ -50,11 +58,11 @@ def test_pfeiffer_tpg_instantiation(com_config, dev_config):
# another valid config
config_dict = dict(dev_config)
config_dict['model'] = 'TPGx6x'
config_dict["model"] = "TPGx6x"
dev.PfeifferTPG(com_config, config_dict)
# wrong config
config_dict['model'] = 'wrong_name'
config_dict["model"] = "wrong_name"
with pytest.raises(ValueError):
dev.PfeifferTPG(com_config, config_dict)
......@@ -63,11 +71,11 @@ def test_com_send_command(com_config):
com = PfeifferTPGLoopSerialCommunication(com_config)
com.open()
com.put_text(chr(6))
com.send_command('this is the command')
assert com.get_written() == 'this is the command'
com.put_text('not an acknowledgement')
com.send_command("this is the command")
assert com.get_written() == "this is the command"
com.put_text("not an acknowledgement")
with pytest.raises(dev.PfeifferTPGError):
com.send_command('this command is not acknowledged')
com.send_command("this command is not acknowledged")
com.close()
......@@ -75,17 +83,17 @@ def test_com_query(com_config):
com = PfeifferTPGLoopSerialCommunication(com_config)
com.open()
com.put_text(chr(6))
com.put_text('this is the answer')
assert com.query('this is the query') == 'this is the answer'
assert com.get_written() == 'this is the query'
com.put_text("this is the answer")
assert com.query("this is the query") == "this is the answer"
assert com.get_written() == "this is the query"
assert com.get_written() == chr(5)
com.put_text('not an acknowledgement')
com.put_text("not an acknowledgement")
with pytest.raises(dev.PfeifferTPGError):
com.query('this query is not acknowledged')
com.query("this query is not acknowledged")
com.put_text(chr(6))
com.put_text('')
com.put_text("")
with pytest.raises(dev.PfeifferTPGError):
com.query('the answer to this query is empty')
com.query("the answer to this query is empty")
com.close()
......@@ -93,16 +101,16 @@ def test_pfeiffer_tpg_start(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
# starting again should work
com.put_text(chr(6))
com.put_text(','.join(['Unknown', dev.PfeifferTPG.SensorTypes.CMR.name]))
com.put_text(",".join(["Unknown", dev.PfeifferTPG.SensorTypes.CMR.name]))
pg.start()
assert pg.number_of_sensors == 2
assert pg.sensors[0] == 'Unknown'
assert pg.sensors[1] == 'APR/CMR Linear Gauge'
assert pg.sensors[0] == "Unknown"
assert pg.sensors[1] == "APR/CMR Linear Gauge"
def test_pfeiffer_com_error(com_config, dev_config):
wrong_config = dict(com_config)
wrong_config['port'] = 'NOT A PORT'
wrong_config["port"] = "NOT A PORT"
tpg = dev.PfeifferTPG(wrong_config, dev_config)
assert not tpg.com.is_open
......@@ -133,18 +141,24 @@ def test_identify_sensors(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
# example with 3 sensors
com.put_text(chr(6))
com.put_text(','.join([dev.PfeifferTPG.SensorTypes.PKR.name,
dev.PfeifferTPG.SensorTypes.CMR.name,
dev.PfeifferTPG.SensorTypes.IKR.name]))
com.put_text(
",".join(
[
dev.PfeifferTPG.SensorTypes.PKR.name,
dev.PfeifferTPG.SensorTypes.CMR.name,
dev.PfeifferTPG.SensorTypes.IKR.name,
]
)
)
pg.identify_sensors()
assert com.get_written() == 'TID'
assert com.get_written() == "TID"
assert com.get_written() == chr(5)
assert pg.number_of_sensors == 3
assert pg.sensors[0] == dev.PfeifferTPG.SensorTypes.PKR.name
assert pg.sensors[1] == dev.PfeifferTPG.SensorTypes.CMR.name
assert pg.sensors[2] == dev.PfeifferTPG.SensorTypes.IKR.name
# wrong answer from device
com.put_text('this will make the command fail')
com.put_text("this will make the command fail")
with pytest.raises(dev.PfeifferTPGError):
pg.identify_sensors()
......@@ -158,7 +172,7 @@ def test_set_display_unit(started_pfeiffer_tpg):
com.put_text(chr(6))
pg.set_display_unit(pg.PressureUnits.Pascal)
# valid PressureUnits argument but no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.set_display_unit(pg.PressureUnits.Pascal)
# invalid str argument
......@@ -172,7 +186,7 @@ def test_measure(started_pfeiffer_tpg):
com.put_text(chr(6))
com.put_text(f"{dev.PfeifferTPG.SensorStatus.Ok.value},1.234E-02")
assert pg.measure(2) == (dev.PfeifferTPG.SensorStatus.Ok.name, 1.234e-2)
assert com.get_written() == 'PR2'
assert com.get_written() == "PR2"
# underrange
com.put_text(chr(6))
com.put_text(f"{dev.PfeifferTPG.SensorStatus.Underrange.value},1.234E-02")
......@@ -196,13 +210,15 @@ def test_measure(started_pfeiffer_tpg):
# identification error
com.put_text(chr(6))
com.put_text(f"{dev.PfeifferTPG.SensorStatus.Identification_error.value},1.234E-02")
assert pg.measure(2) == (dev.PfeifferTPG.SensorStatus.Identification_error.name,
1.234e-2)
assert pg.measure(2) == (
dev.PfeifferTPG.SensorStatus.Identification_error.name,
1.234e-2,
)
# wrong channel
with pytest.raises(ValueError):
pg.measure(12)
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.measure(1)
......@@ -211,14 +227,17 @@ def test_measure_all(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
# normal case
com.put_text(chr(6))
com.put_text(f"{dev.PfeifferTPG.SensorStatus.Identification_error.value},1.234E-02,"
f"{dev.PfeifferTPG.SensorStatus.Ok.value},1.234E-02")
assert pg.measure_all() == [(dev.PfeifferTPG.SensorStatus.Identification_error.name,
1.234e-2),
(dev.PfeifferTPG.SensorStatus.Ok.name, 1.234e-2)]
assert com.get_written() == 'PRX'
com.put_text(
f"{dev.PfeifferTPG.SensorStatus.Identification_error.value},1.234E-02,"
f"{dev.PfeifferTPG.SensorStatus.Ok.value},1.234E-02"
)
assert pg.measure_all() == [
(dev.PfeifferTPG.SensorStatus.Identification_error.name, 1.234e-2),
(dev.PfeifferTPG.SensorStatus.Ok.name, 1.234e-2),
]
assert com.get_written() == "PRX"
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.measure_all()
......@@ -227,9 +246,9 @@ def test_set_full_scale_unitless(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
com.put_text(chr(6))
pg.set_full_scale_unitless([1, 2])
assert com.get_written() == 'FSR,1,2'
assert com.get_written() == "FSR,1,2"
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.set_full_scale_unitless([1, 2])
# wrong values
......@@ -245,16 +264,16 @@ def test_set_full_scale_unitless(started_pfeiffer_tpg):
def test_get_full_scale_unitless(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
com.put_text(chr(6))
com.put_text('2,0')
com.put_text("2,0")
assert pg.get_full_scale_unitless() == [2, 0]
assert com.get_written() == 'FSR'
assert com.get_written() == "FSR"
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.get_full_scale_unitless()
# wrong answer from device
com.put_text(chr(6))
com.put_text('12,24')
com.put_text("12,24")
with pytest.raises(dev.PfeifferTPGError):
pg.get_full_scale_unitless()
......@@ -264,9 +283,9 @@ def test_set_full_scale_mbar(started_pfeiffer_tpg):
com.put_text(chr(6))
pg.set_full_scale_mbar([100, 1])
fsr = pg.config.model.full_scale_ranges
assert com.get_written() == f'FSR,{fsr[100]},{fsr[1]}'
assert com.get_written() == f"FSR,{fsr[100]},{fsr[1]}"
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.set_full_scale_mbar([100, 1])
# wrong values
......@@ -282,16 +301,16 @@ def test_set_full_scale_mbar(started_pfeiffer_tpg):
def test_get_full_scale_mbar(started_pfeiffer_tpg):
com, pg = started_pfeiffer_tpg
com.put_text(chr(6))
com.put_text('2,0')
com.put_text("2,0")
fsr_rev = pg.config.model.full_scale_ranges_reversed
assert pg.get_full_scale_mbar() == [fsr_rev[2], fsr_rev[0]]
assert com.get_written() == 'FSR'
assert com.get_written() == "FSR"
# no acknowledgment from device
com.put_text('not an acknowledgment')
com.put_text("not an acknowledgment")
with pytest.raises(dev.PfeifferTPGError):
pg.get_full_scale_mbar()
# wrong answer from device
com.put_text(chr(6))
com.put_text('12,24')
com.put_text("12,24")
with pytest.raises(dev.PfeifferTPGError):
pg.get_full_scale_mbar()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment