To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 5011ec34 authored by mikolajr's avatar mikolajr
Browse files

Merge branch 'dev_pfeiffer_tpg' into 'devel'

Added device Pfeiffer TPG controller

See merge request !25
parents 88659a43 d81ce585
Pipeline #53140 passed with stages
in 2 minutes and 59 seconds
......@@ -9,10 +9,12 @@ Devel
* Heinzinger Digital Interface I/II and a Heinzinger PNC power supply
* Q-switched Pulsed Laser and a laser attenuator from CryLas
* Newport SMC100PP single axis motion controller for 2-phase stepper motors
* Pfeiffer TPG controller (TPG 25x, TPG 26x and TPG 36x) for Compact pressure Gauges
* API refactorings:
* Protected non-thread safe read and write in communication protocols
* Device sequence mixin: start/stop, add/rm and lookup
* PEP 561 compatibility and static type checking corrections
* Refactored `.format()` to f-strings.
* Improved error docstrings (:code:`:raises:` annotations) and extended tests for
errors.
......
......@@ -56,6 +56,7 @@ the following devices:
connection
* a Newport SMC100PP single axis motion controller for 2-phase stepper motors over
a serial connection
* a Pfeiffer TPG controller (TPG 25x, TPG 26x and TPG 36x) for Compact pressure Gauges
Credits
......
"""
Example script for the device Pfeiffer TPG
"""
import logging
from hvl_ccb.dev import PfeifferTPG
logging.basicConfig(level=logging.INFO)
# create device object
pg = PfeifferTPG({'port': 'COM4'})
# start the device
pg.start()
# get the full scale range of all channels
# see model-dependent lookup table with: help(pg.get_full_scale)
fsr = pg.get_full_scale_mbar()
# set the full scale of sensor 5 to 100 mbar
fsr[4] = 100
pg.set_full_scale_mbar(fsr)
# get pressure reading from channel 1
pressure = pg.measure(1)
# get pressure reading from all channels
# (not available on all TPG models)
pressures = pg.measure_all()
# stop the device
pg.stop()
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""Devices subpackage."""
from .base import ( # noqa: F401
......@@ -8,6 +6,19 @@ from .base import ( # noqa: F401
DeviceSequenceMixin,
DeviceExistingException,
)
from .crylas import ( # noqa: F401
CryLasLaser,
CryLasLaserConfig,
CryLasLaserSerialCommunication,
CryLasLaserSerialCommunicationConfig,
CryLasLaserError,
CryLasLaserNotReadyError,
CryLasAttenuator,
CryLasAttenuatorConfig,
CryLasAttenuatorSerialCommunication,
CryLasAttenuatorSerialCommunicationConfig,
CryLasAttenuatorError,
)
from .ea_psi9000 import ( # noqa: F401
PSI9000,
PSI9000Config,
......@@ -26,29 +37,6 @@ from .heinzinger import ( # noqa: F401
HeinzingerSerialCommunication,
HeinzingerSerialCommunicationConfig,
)
from .crylas import ( # noqa: F401
CryLasLaser,
CryLasLaserConfig,
CryLasLaserSerialCommunication,
CryLasLaserSerialCommunicationConfig,
CryLasLaserError,
CryLasLaserNotReadyError,
CryLasAttenuator,
CryLasAttenuatorConfig,
CryLasAttenuatorSerialCommunication,
CryLasAttenuatorSerialCommunicationConfig,
CryLasAttenuatorError,
)
from .newport import ( # noqa: F401
NewportSMC100PP,
NewportSMC100PPConfig,
NewportSMC100PPSerialCommunication,
NewportSMC100PPSerialCommunicationConfig,
NewportConfigCommands,
NewportMotorError,
NewportControllerError,
NewportSerialCommunicationError,
)
from .labjack import ( # noqa: F401
LabJack,
LabJackError,
......@@ -63,6 +51,23 @@ from .mbw973 import ( # noqa: F401
MBW973SerialCommunication,
MBW973SerialCommunicationConfig,
)
from .newport import ( # noqa: F401
NewportSMC100PP,
NewportSMC100PPConfig,
NewportSMC100PPSerialCommunication,
NewportSMC100PPSerialCommunicationConfig,
NewportConfigCommands,
NewportMotorError,
NewportControllerError,
NewportSerialCommunicationError,
)
from .pfeiffer_tpg import ( # noqa: F401
PfeifferTPG,
PfeifferTPGConfig,
PfeifferTPGSerialCommunication,
PfeifferTPGSerialCommunicationConfig,
PfeifferTPGError,
)
from .rs_rto1024 import ( # noqa: F401
RTO1024,
RTO1024Error,
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for Pfeiffer TPG controllers.
The Pfeiffer TPG control units are used to control Pfeiffer Compact Gauges.
Models: TPG 251 A, TPG 252 A, TPG 256A, TPG 261, TPG 262, TPG 361, TPG 362 and TPG 366.
Manufacturer homepage:
https://www.pfeiffer-vacuum.com/en/products/measurement-analysis/
measurement/activeline/controllers/
"""
import logging
from enum import Enum, IntEnum
from typing import Dict, List, Tuple, Union, cast
from .base import SingleCommDevice
from ..comm import SerialCommunication, SerialCommunicationConfig
from ..comm.serial import (
SerialCommunicationParity,
SerialCommunicationStopbits,
SerialCommunicationBytesize,
)
from ..configuration import configdataclass
from ..utils.enum import NameEnum
Number = Union[int, float]
class PfeifferTPGError(Exception):
"""
Error with the Pfeiffer TPG Controller.
"""
pass
@configdataclass
class PfeifferTPGSerialCommunicationConfig(SerialCommunicationConfig):
#: Baudrate for Pfeiffer TPG controllers is 9600 baud
baudrate: int = 9600
#: Pfeiffer TPG controllers do not use parity
parity: Union[str, SerialCommunicationParity] = SerialCommunicationParity.NONE
#: Pfeiffer TPG controllers use one stop bit
stopbits: Union[int, SerialCommunicationStopbits] = SerialCommunicationStopbits.ONE
#: One byte is eight bits long
bytesize: Union[
int, SerialCommunicationBytesize
] = SerialCommunicationBytesize.EIGHTBITS
#: The terminator is <CR><LF>
terminator: bytes = b"\r\n"
#: use 3 seconds timeout as default
timeout: Number = 3
class PfeifferTPGSerialCommunication(SerialCommunication):
"""
Specific communication protocol implementation for Pfeiffer TPG controllers.
Already predefines device-specific protocol parameters in config.
"""
@staticmethod
def config_cls():
return PfeifferTPGSerialCommunicationConfig
def send_command(self, cmd: str) -> None:
"""
Send a command to the device and check for acknowledgement.
:param cmd: command to send to the device
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if the answer from the device differs from the
expected acknowledgement character 'chr(6)'.
"""
with self.access_lock:
# send the command
self._write_text_unsafe(cmd)
# check for acknowledgment char (ASCII 6)
answer = self._read_text_unsafe()
if len(answer) == 0 or ord(answer[0]) != 6:
message = f"Pfeiffer TPG not acknowledging command {cmd}"
logging.error(message)
if len(answer) > 0:
logging.debug(f"Pfeiffer TPG: {answer}")
raise PfeifferTPGError(message)
def query(self, cmd: str) -> str:
"""
Send a query, then read and returns the first line from the com port.
:param cmd: query message to send to the device
:return: first line read on the com
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if the device does not acknowledge the command or if
the answer from the device is empty
"""
with self.access_lock:
# send the command
self._write_text_unsafe(cmd)
# check for acknowledgment char (ASCII 6)
answer = self._read_text_unsafe()
if len(answer) == 0 or ord(answer[0]) != 6:
message = f"Pfeiffer TPG not acknowledging command {cmd}"
logging.error(message)
if len(answer) > 0:
logging.debug(f"Pfeiffer TPG: {answer}")
raise PfeifferTPGError(message)
# send enquiry
self._write_text_unsafe(chr(5))
# read answer
answer = self._read_text_unsafe().strip()
if len(answer) == 0:
message = f"Pfeiffer TPG not answering to command {cmd}"
logging.error(message)
raise PfeifferTPGError(message)
return answer
@configdataclass
class PfeifferTPGConfig:
"""
Device configuration dataclass for Pfeiffer TPG controllers.
"""
class Model(NameEnum):
_init_ = "full_scale_ranges"
TPG25xA = {
1: 0,
10: 1,
100: 2,
1000: 3,
2000: 4,
5000: 5,
10000: 6,
50000: 7,
0.1: 8,
}
TPGx6x = {
0.01: 0,
0.1: 1,
1: 2,
10: 3,
100: 4,
1000: 5,
2000: 6,
5000: 7,
10000: 8,
50000: 9,
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.full_scale_ranges_reversed: Dict[int, int] = {
v: k for k, v in self.full_scale_ranges.items()
}
def is_valid_scale_range_reversed_str(self, v: str) -> bool:
"""
Check if given string represents a valid reversed scale range of a model.
:param v: Reversed scale range string.
:return: `True` if valid, `False` otherwise.
"""
# Explicit check because otherwise we get `True` for instance for `float`
if not isinstance(v, str):
raise TypeError(f"Expected `str`, got `{type(v)}` instead.")
try:
return int(v) in self.full_scale_ranges_reversed
except ValueError:
return False
# model of the TPG (determines which lookup table to use for the
# full scale range)
model: Union[str, Model] = Model.TPG25xA # type: ignore
def clean_values(self):
if not isinstance(self.model, self.Model):
self.force_value("model", self.Model(self.model))
class PfeifferTPG(SingleCommDevice):
"""
Pfeiffer TPG control unit device class
"""
class PressureUnits(NameEnum):
"""
Enum of available pressure units for the digital display. "0" corresponds either
to bar or to mbar depending on the TPG model. In case of doubt, the unit is
visible on the digital display.
"""
mbar = 0
bar = 0
Torr = 1
Pascal = 2
Micron = 3
hPascal = 4
Volt = 5
SensorTypes = Enum( # type: ignore
value="SensorTypes",
names=[
("TPR/PCR Pirani Gauge", 1),
("TPR", 1),
("TPR/PCR", 1),
("IKR Cold Cathode Gauge", 2),
("IKR", 2),
("IKR9", 2),
("IKR11", 2),
("PKR Full range CC", 3),
("PKR", 3),
("APR/CMR Linear Gauge", 4),
("CMR", 4),
("APR/CMR", 4),
("CMR/APR", 4),
("Pirani / High Pressure Gauge", 5),
("IMR", 5),
("Fullrange BA Gauge", 6),
("PBR", 6),
("None", 7),
("no Sensor", 7),
("noSen", 7),
("noSENSOR", 7),
],
)
class SensorStatus(IntEnum):
Ok = 0
Underrange = 1
Overrange = 2
Sensor_error = 3
Sensor_off = 4
No_sensor = 5
Identification_error = 6
def __init__(self, com, dev_config=None):
# Call superclass constructor
super().__init__(com, dev_config)
# list of sensors connected to the TPG
self.sensors: List[str] = []
def __repr__(self):
return f"Pfeiffer TPG with {self.number_of_sensors} sensors: {self.sensors}"
@property
def number_of_sensors(self):
return len(self.sensors)
@property
def unit(self):
"""
The pressure unit of readings is always mbar, regardless of the display unit.
"""
return "mbar"
@staticmethod
def default_com_cls():
return PfeifferTPGSerialCommunication
@staticmethod
def config_cls():
return PfeifferTPGConfig
def start(self) -> None:
"""
Start this device. Opens the communication protocol,
and identify the sensors.
:raises SerialCommunicationIOError: when communication port cannot be opened
"""
logging.info("Starting Pfeiffer TPG")
super().start()
# identify the sensors connected to the TPG
# and also find out the number of channels
self.identify_sensors()
def stop(self) -> None:
"""
Stop the device. Closes also the communication protocol.
"""
logging.info(f"Stopping device {self}")
super().stop()
def identify_sensors(self) -> None:
"""
Send identification request TID to sensors on all channels.
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
try:
answer = self.com.query("TID")
except PfeifferTPGError:
logging.error("Pressure sensor identification failed.")
raise
# try matching the sensors:
sensors = []
for s in answer.split(","):
try:
sensors.append(self.SensorTypes[s].name)
except KeyError:
sensors.append("Unknown")
self.sensors = sensors
# identification successful:
logging.info(f"Identified {self}")
def set_display_unit(self, unit: Union[str, PressureUnits]) -> None:
"""
Set the unit in which the measurements are shown on the display.
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
if not isinstance(unit, self.PressureUnits):
unit = self.PressureUnits(unit)
try:
self.com.send_command(f"UNI,{unit.value}")
logging.info(f"Setting display unit to {unit.name}")
except PfeifferTPGError:
logging.error(
f"Setting display unit to {unit.name} failed. Not all units"
" are available on all TGP models"
)
raise
def measure(self, channel: int) -> Tuple[str, float]:
"""
Get the status and measurement of one sensor
:param channel: int channel on which the sensor is connected, with
1 <= channel <= number_of_sensors
:return: measured value as float if measurement successful,
sensor status as string if not
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
if not 1 <= channel <= self.number_of_sensors:
message = (
f"{channel} is not a valid channel number, it should be between "
f"1 and {self.number_of_sensors}"
)
logging.error(message)
raise ValueError(message)
try:
answer = self.com.query(f"PR{channel}")
except PfeifferTPGError:
logging.error(f"Reading sensor {channel} failed.")
raise
status, measurement = answer.split(",")
s = self.SensorStatus(int(status))
if s == self.SensorStatus.Ok:
logging.info(
f"Channel {channel} successful reading of "
f"pressure: {measurement} mbar."
)
else:
logging.info(
f"Channel {channel} no reading of pressure, sensor status is "
f"{self.SensorStatus(s).name}."
)
return s.name, float(measurement)
def measure_all(self) -> List[Tuple[str, float]]:
"""
Get the status and measurement of all sensors (this command is
not available on all models)
:return: list of measured values as float if measurements successful,
and or sensor status as strings if not
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
try:
answer = self.com.query("PRX")
except PfeifferTPGError:
logging.error(
"Getting pressure reading from all sensors failed (this "
"command is not available on all TGP models)."
)
raise
ans = answer.split(",")
ret = [
(self.SensorStatus(int(ans[2 * i])).name, float(ans[2 * i + 1]))
for i in range(self.number_of_sensors)
]
logging.info(f"Reading all sensors with result: {ret}.")
return ret
def _set_full_scale(self, fsr: List[Number], unitless: bool) -> None:
"""
Set the full scale range of the attached sensors. See lookup table between
command and corresponding pressure in the device user manual.
:param fsr: list of full scale range values, like `[0, 1, 3, 3, 2, 0]` for
`unitless = True` scale or `[0.01, 1000]` otherwise (mbar units scale)
:param unitless: flag to indicate scale of range values; if `False` then mbar
units scale
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
if len(fsr) != self.number_of_sensors:
raise ValueError(
f"Argument fsr should be of length {self.number_of_sensors}. "
f"Received length {len(fsr)}."
)
possible_values_map = (
self.config.model.full_scale_ranges_reversed if unitless
else self.config.model.full_scale_ranges
)
wrong_values = [v for v in fsr if v not in possible_values_map]
if wrong_values:
raise ValueError(
f"Argument fsr contains invalid values: {wrong_values}. Accepted "
f"values are {list(possible_values_map.items())}"
f"{'' if unitless else ' mbar'}."
)
str_fsr = ",".join([
str(f if unitless else possible_values_map[f]) for f in fsr
])
try:
self.com.send_command(f"FSR,{str_fsr}")
logging.info(f"Set sensors full scale to {fsr} (unitless) respectively.")
except PfeifferTPGError as e:
logging.error("Setting sensors full scale failed.")
raise e
def _get_full_scale(self, unitless: bool) -> List[Number]:
"""
Get the full scale range of the attached sensors. See lookup table between
command and corresponding pressure in the device user manual.
:param unitless: flag to indicate scale of range values; if `False` then mbar
units scale
:return: list of full scale range values, like `[0, 1, 3, 3, 2, 0]` for
`unitless = True` scale or `[0.01, 1000]` otherwise (mbar units scale)
:raises SerialCommunicationIOError: when communication port is not opened
:raises PfeifferTPGError: if command fails
"""
try:
answer = self.com.query("FSR")
except PfeifferTPGError:
logging.error("Query full scale range of all sensors failed.")
raise
answer_values = answer.split(",")
wrong_values = [
v for v in answer_values
if not self.config.model.is_valid_scale_range_reversed_str(v)
]
if wrong_values:
raise PfeifferTPGError(
f"The controller returned the full unitless scale range values: "
f"{answer}. The values {wrong_values} are invalid. Accepted values are "
f"{list(self.config.model.full_scale_ranges_reversed.keys())}."
)
fsr = [
int(v) if unitless else self.config.model.full_scale_ranges_reversed[int(v)]