pfeiffer_tpg.py 16.4 KB
Newer Older
1
"""
2
Device class for Pfeiffer TPG controllers.
3
4

The Pfeiffer TPG control units are used to control Pfeiffer Compact Gauges.
Chachereau Alise's avatar
Chachereau Alise committed
5
Models: TPG 251 A, TPG 252 A, TPG 256A, TPG 261, TPG 262, TPG 361, TPG 362 and TPG 366.
Chachereau Alise's avatar
Chachereau Alise committed
6

7
8
9
10
11
12
Manufacturer homepage:
https://www.pfeiffer-vacuum.com/en/products/measurement-analysis/
measurement/activeline/controllers/
"""

import logging
Chachereau Alise's avatar
Chachereau Alise committed
13
from enum import Enum, IntEnum
14
from typing import List, Union, Tuple
15
16
17
18

from .base import SingleCommDevice
from ..comm import SerialCommunication, SerialCommunicationConfig
from ..configuration import configdataclass
19
from ..utils.enum import NameEnum
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47


@configdataclass
class PfeifferTPGSerialCommunicationConfig(SerialCommunicationConfig):
    #: Baudrate for Pfeiffer TPG controllers is 9600 baud
    baudrate: int = 9600

    #: Pfeiffer TPG controllers do not use parity
    parity: (str, SerialCommunicationConfig.Parity) = \
        SerialCommunicationConfig.Parity.NONE

    #: Pfeiffer TPG controllers use one stop bit
    stopbits: (int, SerialCommunicationConfig.Stopbits) = \
        SerialCommunicationConfig.Stopbits.ONE

    #: One byte is eight bits long
    bytesize: (int, SerialCommunicationConfig.Bytesize) = \
        SerialCommunicationConfig.Bytesize.EIGHTBITS

    #: The terminator is <CR><LF>
    terminator: bytes = b'\r\n'

    #: use 3 seconds timeout as default
    timeout: (int, float) = 3


class PfeifferTPGSerialCommunication(SerialCommunication):
    """
48
    Specific communication protocol implementation for Pfeiffer TPG controllers.
49
50
51
52
53
54
55
    Already predefines device-specific protocol parameters in config.
    """

    @staticmethod
    def config_cls():
        return PfeifferTPGSerialCommunicationConfig

56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
    def send_command(self, cmd: str) -> None:
        """
        Send a command to the device and check for acknowledgement.

        :param cmd: command to send to the device
        :raises SerialCommunicationIOError: when communication port is not opened
        :raises PfeifferTPGError: if the answer from the device differs from the
        expected acknowledgement character 'chr(6)'.
        """

        with self.access_lock:
            # send the command
            self._write_text_unsafe(cmd)
            # check for acknowledgment char (ASCII 6)
            answer = self._read_text_unsafe()
            if len(answer) == 0 or ord(answer[0]) != 6:
                message = f"Pfeiffer TPG not acknowledging command {cmd}"
                logging.error(message)
                if len(answer) > 0:
                    logging.debug(f"Pfeiffer TPG: {answer}")
                raise PfeifferTPGError(message)

    def query(self, cmd: str) -> str:
        """
        Send a query, then read and returns the first line from the com port.

        :param cmd: query message to send to the device
        :return: first line read on the com
        :raises SerialCommunicationIOError: when communication port is not opened
        :raises PfeifferTPGError: if the device does not acknowledge the command or if
        the answer from the device is empty
        """

        with self.access_lock:
            # send the command
            self._write_text_unsafe(cmd)
            # check for acknowledgment char (ASCII 6)
            answer = self._read_text_unsafe()
            if len(answer) == 0 or ord(answer[0]) != 6:
                message = f"Pfeiffer TPG not acknowledging command {cmd}"
                logging.error(message)
                if len(answer) > 0:
                    logging.debug(f"Pfeiffer TPG: {answer}")
                raise PfeifferTPGError(message)
            # send enquiry
            self._write_text_unsafe(chr(5))
            # read answer
            answer = self._read_text_unsafe().strip()
            if len(answer) == 0:
                message = f"Pfeiffer TPG not answering to command {cmd}"
                logging.error(message)
                raise PfeifferTPGError(message)
            return answer

110
111
112
113
114
115
116

@configdataclass
class PfeifferTPGConfig:
    """
    Device configuration dataclass for Pfeiffer TPG controllers.
    """

117
118
    class Model(NameEnum):
        _init_ = 'full_scale_ranges'
Chachereau Alise's avatar
Chachereau Alise committed
119
120
121
122
123
        TPG25xA = {1: 0, 10: 1, 100: 2, 1000: 3, 2000: 4, 5000: 5,
                   10000: 6, 50000: 7, 0.1: 8}
        TPGx6x = {0.01: 0, 0.1: 1, 1: 2, 10: 3, 100: 4, 1000: 5,
                  2000: 6, 5000: 7, 10000: 8, 50000: 9}

124
125
126
127
128
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.full_scale_ranges_reversed = {
                v: k for k, v in self.full_scale_ranges.items()
            }
Chachereau Alise's avatar
Chachereau Alise committed
129
130
131

    # model of the TPG (determines which lookup table to use for the
    # full scale range)
132
    model: (str, Model) = Model.TPG25xA
Chachereau Alise's avatar
Chachereau Alise committed
133
134

    def clean_values(self):
135
        if not isinstance(self.model, self.Model):
Chachereau Alise's avatar
Chachereau Alise committed
136
            self.force_value('model', self.Model(self.model))
Chachereau Alise's avatar
Chachereau Alise committed
137

138
139
140

class PfeifferTPG(SingleCommDevice):
    """
alisec's avatar
alisec committed
141
    Pfeiffer TPG control unit device class
142
143
    """

144
145
    class PressureUnits(NameEnum):
        """
146
147
148
        Enum of available pressure units for the digital display. "0" corresponds either
        to bar or to mbar depending on the TPG model. In case of doubt, the unit is
        visible on the digital display.
149
150
        """

151
152
153
154
155
156
157
158
        mbar = 0
        bar = 0
        Torr = 1
        Pascal = 2
        Micron = 3
        hPascal = 4
        Volt = 5

159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
    SensorTypes = Enum(
        value='SensorTypes',
        names=[
            ('TPR/PCR Pirani Gauge', 1),
            ('TPR', 1),
            ('TPR/PCR', 1),
            ('IKR Cold Cathode Gauge', 2),
            ('IKR', 2),
            ('IKR9', 2),
            ('IKR11', 2),
            ('PKR Full range CC', 3),
            ('PKR', 3),
            ('APR/CMR Linear Gauge', 4),
            ('CMR', 4),
            ('APR/CMR', 4),
            ('CMR/APR', 4),
            ('Pirani / High Pressure Gauge', 5),
            ('IMR', 5),
            ('Fullrange BA Gauge', 6),
            ('PBR', 6),
            ('None', 7),
            ('no Sensor', 7),
            ('noSen', 7),
            ('noSENSOR', 7)
        ]
    )
Chachereau Alise's avatar
Chachereau Alise committed
185
186
187
188
189
190
191
192
193

    class SensorStatus(IntEnum):
        Ok = 0
        Underrange = 1
        Overrange = 2
        Sensor_error = 3
        Sensor_off = 4
        No_sensor = 5
        Identification_error = 6
alisec's avatar
alisec committed
194

195
196
197
198
199
    def __init__(self, com, dev_config=None):

        # Call superclass constructor
        super().__init__(com, dev_config)

alisec's avatar
alisec committed
200
        # list of sensors connected to the TPG
201
        self.sensors = []
Chachereau Alise's avatar
Chachereau Alise committed
202
203

    def __repr__(self):
alisec's avatar
alisec committed
204
        return f"Pfeiffer TPG with {self.number_of_sensors} sensors: {self.sensors}"
Chachereau Alise's avatar
Chachereau Alise committed
205

206
207
208
209
    @property
    def number_of_sensors(self):
        return len(self.sensors)

Chachereau Alise's avatar
Chachereau Alise committed
210
211
    @property
    def unit(self):
alisec's avatar
alisec committed
212
213
214
215
        """
        The pressure unit of readings is always mbar, regardless of the display unit.
        """
        return "mbar"
Chachereau Alise's avatar
Chachereau Alise committed
216

217
218
219
220
221
222
223
224
225
226
    @staticmethod
    def default_com_cls():
        return PfeifferTPGSerialCommunication

    @staticmethod
    def config_cls():
        return PfeifferTPGConfig

    def start(self) -> None:
        """
Chachereau Alise's avatar
Chachereau Alise committed
227
228
        Start this device. Opens the communication protocol,
        and identify the sensors.
alisec's avatar
alisec committed
229
230

        :raises SerialCommunicationIOError: when communication port cannot be opened
231
232
        """

233
        logging.info("Starting Pfeiffer TPG")
alisec's avatar
alisec committed
234
        super().start()
235

alisec's avatar
alisec committed
236
237
238
239
        # identify the sensors connected to the TPG
        # and also find out the number of channels
        self.identify_sensors()

240
241
242
243
244
    def stop(self) -> None:
        """
        Stop the device. Closes also the communication protocol.
        """

alisec's avatar
alisec committed
245
        logging.info(f"Stopping device {self}")
alisec's avatar
alisec committed
246
        super().stop()
Chachereau Alise's avatar
Chachereau Alise committed
247

248
    def identify_sensors(self) -> None:
alisec's avatar
alisec committed
249
        """
Chachereau Alise's avatar
Chachereau Alise committed
250
        Send identification request TID to sensors on all channels.
251
252

        :raises SerialCommunicationIOError: when communication port is not opened
253
        :raises PfeifferTPGError: if command fails
alisec's avatar
alisec committed
254
        """
255
256

        try:
257
            answer = self.com.query("TID")
258
        except PfeifferTPGError:
alisec's avatar
alisec committed
259
            logging.error("Pressure sensor identification failed.")
260
261
262
            raise

        # try matching the sensors:
263
264
265
266
267
268
269
        sensors = []
        for s in answer.split(','):
            try:
                sensors.append(self.SensorTypes[s].name)
            except KeyError:
                sensors.append('Unknown')
        self.sensors = sensors
270
        # identification successful:
alisec's avatar
alisec committed
271
        logging.info(f"Identified {self}")
272

273
    def set_display_unit(self, unit: (str, PressureUnits)) -> None:
Chachereau Alise's avatar
Chachereau Alise committed
274
275
        """
        Set the unit in which the measurements are shown on the display.
276
277

        :raises SerialCommunicationIOError: when communication port is not opened
278
        :raises PfeifferTPGError: if command fails
Chachereau Alise's avatar
Chachereau Alise committed
279
280
        """

281
        if not isinstance(unit, self.PressureUnits):
282
283
            unit = self.PressureUnits(unit)

284
        try:
285
            self.com.send_command(f"UNI,{unit.value}")
alisec's avatar
alisec committed
286
            logging.info(f"Setting display unit to {unit.name}")
287
        except PfeifferTPGError:
alisec's avatar
alisec committed
288
289
            logging.error(f"Setting display unit to {unit.name} failed. Not all units"
                          " are available on all TGP models")
290
            raise
Chachereau Alise's avatar
Chachereau Alise committed
291

292
    def measure(self, channel: int) -> Tuple[str, float]:
alisec's avatar
alisec committed
293
294
        """
        Get the status and measurement of one sensor
295

296
297
        :param channel: int channel on which the sensor is connected, with
        1 <= channel <= number_of_sensors
298
        :return: measured value as float if measurement successful,
alisec's avatar
alisec committed
299
        sensor status as string if not
300
        :raises SerialCommunicationIOError: when communication port is not opened
301
        :raises PfeifferTPGError: if command fails
alisec's avatar
alisec committed
302
        """
303
304

        if not 1 <= channel <= self.number_of_sensors:
alisec's avatar
alisec committed
305
306
            message = (f"{channel} is not a valid channel number, it should be between "
                       f"1 and {self.number_of_sensors}")
Chachereau Alise's avatar
Chachereau Alise committed
307
308
            logging.error(message)
            raise ValueError(message)
309

310
        try:
311
            answer = self.com.query(f"PR{channel}")
312
        except PfeifferTPGError:
alisec's avatar
alisec committed
313
            logging.error(f"Reading sensor {channel} failed.")
314
315
316
317
318
            raise

        status, measurement = answer.split(',')
        s = self.SensorStatus(int(status))
        if s == self.SensorStatus.Ok:
alisec's avatar
alisec committed
319
320
            logging.info(f"Channel {channel} successful reading of "
                         f"pressure: {measurement} mbar.")
321
        else:
alisec's avatar
alisec committed
322
323
            logging.info(f"Channel {channel} no reading of pressure, sensor status is "
                         f"{self.SensorStatus(s).name}.")
324
325
326
        return s.name, float(measurement)

    def measure_all(self) -> List[Tuple[str, float]]:
alisec's avatar
alisec committed
327
328
329
        """
        Get the status and measurement of all sensors (this command is
        not available on all models)
330

331
        :return: list of measured values as float if measurements successful,
alisec's avatar
alisec committed
332
        and or sensor status as strings if not
333
        :raises SerialCommunicationIOError: when communication port is not opened
334
        :raises PfeifferTPGError: if command fails
335
        """
alisec's avatar
alisec committed
336

337
        try:
338
            answer = self.com.query('PRX')
339
340
341
342
343
344
345
346
        except PfeifferTPGError:
            logging.error("Getting pressure reading from all sensors failed (this "
                          "command is not available on all TGP models).")
            raise

        ans = answer.split(',')
        ret = [(self.SensorStatus(int(ans[2*i])).name, float(ans[2*i+1]))
               for i in range(self.number_of_sensors)]
alisec's avatar
alisec committed
347
        logging.info(f"Reading all sensors with result: {ret}.")
348
349
350
        return ret

    def set_full_scale_unitless(self, fsr: List[int]) -> None:
Chachereau Alise's avatar
Chachereau Alise committed
351
        """
352
        Set the full scale range of the attached sensors. See lookup table between
353
        command and corresponding pressure in the device user manual.
354

Chachereau Alise's avatar
Chachereau Alise committed
355
        :param fsr: list of full scale range values, like [0, 1, 3, 3, 2, 0]
356
        :raises SerialCommunicationIOError: when communication port is not opened
357
        :raises PfeifferTPGError: if command fails
Chachereau Alise's avatar
Chachereau Alise committed
358
        """
alisec's avatar
alisec committed
359

360
361
        wrong_values = [v for v in fsr
                        if v not in self.config.model.full_scale_ranges_reversed]
362
        if len(fsr) != self.number_of_sensors:
363
364
            raise ValueError(f"Argument fsr should be of length "
                             f"{self.number_of_sensors}. Received length {len(fsr)}.")
365
        elif wrong_values:
366
367
368
369
            raise ValueError(
                f"Argument fsr contains invalid values: {wrong_values}. Accepted "
                f"values are {list(self.config.model.full_scale_ranges.values())}"
            )
370
371
372

        str_fsr = ','.join([str(f) for f in fsr])
        try:
373
            self.com.send_command(f"FSR,{str_fsr}")
alisec's avatar
alisec committed
374
            logging.info(f"Set sensors full scale to {fsr} respectively.")
375
        except PfeifferTPGError:
alisec's avatar
alisec committed
376
            logging.error("Setting sensors full scale failed.")
377
378
379
            raise

    def get_full_scale_unitless(self) -> List[int]:
Chachereau Alise's avatar
Chachereau Alise committed
380
        """
381
        Get the full scale range of the attached sensors. See lookup table between
382
        command and corresponding pressure in the device user manual.
383

Chachereau Alise's avatar
Chachereau Alise committed
384
        :return: list of full scale range values, like [0, 1, 3, 3, 2, 0]
385
        :raises SerialCommunicationIOError: when communication port is not opened
386
        :raises PfeifferTPGError: if command fails
Chachereau Alise's avatar
Chachereau Alise committed
387
        """
Chachereau Alise's avatar
Chachereau Alise committed
388

389
        try:
390
            answer = self.com.query("FSR")
391
        except PfeifferTPGError:
alisec's avatar
alisec committed
392
            logging.error("Query full scale range of all sensors failed.")
393
            raise
394
395

        wrong_values = [v for v in answer.split(',')
396
397
                        if not v.isdigit()
                        or int(v) not in self.config.model.full_scale_ranges_reversed]
398
        if wrong_values:
399
400
401
402
403
            raise PfeifferTPGError(
                f"The controller returned the full scale range values: {answer}. The "
                f"values {wrong_values} are invalid. Accepted values are "
                f"{list(self.config.model.full_scale_ranges.values())}."
            )
404

405
        fsr = [int(i) for i in answer.split(',')]
alisec's avatar
alisec committed
406
        logging.info(f"Obtained full scale range of all sensors as {fsr}.")
407
408
409
        return fsr

    def set_full_scale_mbar(self, fsr: List[Union[int, float]]) -> None:
Chachereau Alise's avatar
Chachereau Alise committed
410
411
        """
        Set the full scale range of the attached sensors (in unit mbar)
412

Chachereau Alise's avatar
Chachereau Alise committed
413
        :param fsr: full scale range values in mbar, for example [0.01, 1000]
414
        :raises SerialCommunicationIOError: when communication port is not opened
415
        :raises PfeifferTPGError: if command fails
Chachereau Alise's avatar
Chachereau Alise committed
416
417
        """

418
419
        wrong_values = [v for v in fsr if v not in self.config.model.full_scale_ranges]
        if len(fsr) != self.number_of_sensors:
420
421
422
            raise ValueError(
                f"Argument fsr should be of length {self.number_of_sensors}. "
                f"Received length {len(fsr)}.")
423
        elif wrong_values:
424
425
426
427
            raise ValueError(
                f"Argument fsr contains invalid values: {wrong_values}. Accepted "
                f"values are {list(self.config.model.full_scale_ranges.keys())}"
            )
428

429
        str_fsr = ','.join([str(self.config.model.full_scale_ranges[f]) for f in fsr])
430
431

        try:
432
            self.com.send_command(f"FSR,{str_fsr}")
alisec's avatar
alisec committed
433
            logging.info(f"Set sensors full scale to {fsr} respectively.")
434
        except PfeifferTPGError:
alisec's avatar
alisec committed
435
            logging.error("Setting sensors full scale range failed.")
436
437
438
            raise

    def get_full_scale_mbar(self) -> List[Union[int, float]]:
Chachereau Alise's avatar
Chachereau Alise committed
439
440
        """
        Get the full scale range of the attached sensors
441

Chachereau Alise's avatar
Chachereau Alise committed
442
        :return: full scale range values in mbar, like [0.01, 1, 0.1, 1000, 50000, 10]
443
        :raises SerialCommunicationIOError: when communication port is not opened
444
        :raises PfeifferTPGError: if command fails
Chachereau Alise's avatar
Chachereau Alise committed
445
        """
446

447
        try:
448
            answer = self.com.query("FSR")
449
        except PfeifferTPGError:
alisec's avatar
alisec committed
450
            logging.info("Query full scale range of all sensors failed.")
451
452
            raise

453
        wrong_values = [v for v in answer.split(',')
454
455
                        if not v.isdigit()
                        or int(v) not in self.config.model.full_scale_ranges_reversed]
456
        if wrong_values:
457
458
459
460
461
            raise PfeifferTPGError(
                f"The controller returned the full scale range values: {answer}. The "
                f"values {wrong_values} are invalid. Accepted values are "
                f"{list(self.config.model.full_scale_ranges_reversed.keys())}."
            )
462

463
        fsr = [self.config.model.full_scale_ranges_reversed[int(i)]
464
               for i in answer.split(',')]
alisec's avatar
alisec committed
465
        logging.info(f"Obtained full scale range of sensors as {fsr} mbar.")
466
467
468
469
470
471
472
473
        return fsr


class PfeifferTPGError(Exception):
    """
    Error with the Pfeiffer TPG Controller.
    """
    pass