To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 3354b9df authored by mikolajr's avatar mikolajr
Browse files

Merge branch 'devel' into dev_pfeiffer_tpg

parents bbc85442 88659a43
Pipeline #53117 failed with stages
in 2 minutes and 12 seconds
......@@ -4,6 +4,7 @@
__pycache__
*.egg-info/
docs/_build/
.mypy_cache/
.eggs
.tox
.coverage
......
......@@ -2,6 +2,7 @@ image: python:3.7-stretch
stages:
- style
- typing
- tests
# Note: need to repeat Python installation steps because:
......@@ -23,6 +24,8 @@ tests:
script:
- tox -r -vv -e py37
stage: tests
tags:
- docker-executor
style:
before_script:
......@@ -32,3 +35,16 @@ style:
- tox -r -vv -e flake8
allow_failure: true
stage: style
tags:
- docker-executor
typing:
before_script:
- pip install -U pip setuptools
- pip install tox
script:
- tox -r -vv -e mypy
allow_failure: true
stage: typing
tags:
- docker-executor
......@@ -2,6 +2,22 @@
History
=======
Devel
-----
* New devices using serial connection:
* Heinzinger Digital Interface I/II and a Heinzinger PNC power supply
* Q-switched Pulsed Laser and a laser attenuator from CryLas
* Newport SMC100PP single axis motion controller for 2-phase stepper motors
* Pfeiffer TPG controller (TPG 25x, TPG 26x and TPG 36x) for Compact pressure Gauges
* API refactorings:
* Protected non-thread safe read and write in communication protocols
* Device sequence mixin: start/stop, add/rm and lookup
* PEP 561 compatibility and static type checking corrections
* Refactored `.format()` to f-strings.
* Improved error docstrings (:code:`:raises:` annotations) and extended tests for
errors.
0.3.3 (2019-05-08)
------------------
......
......@@ -64,9 +64,15 @@ clean-test: ## remove test and coverage artifacts
rm -fr htmlcov/
rm -fr .pytest_cache
lint: ## check style with flake8
style: ## check style with flake8
flake8 hvl_ccb tests
format: ## auto-format with black
black hvl_ccb tests
type: ## static code check using typing hints with mypy
mypy --show-error-codes hvl_ccb
test: ## run tests quickly with the default Python
py.test -n auto --dist=loadscope
......
......@@ -50,7 +50,12 @@ the following devices:
* a Elektro-Automatik PSI9000 DC power supply using VISA over TCP for communication
* a Rhode & Schwarz RTO 1024 oscilloscope using VISA interface over :code:`TCP::INSTR`
* a state-of-the-art HVL in-house Supercube device variants using an OPC UA client
* a Heinzinger Digital Interface I/II and a Heinzinger PNC power supply over a serial connection
* a Heinzinger Digital Interface I/II and a Heinzinger PNC power supply over a serial
connection
* a passively Q-switched Pulsed Laser and a laser attenuator from CryLas over a serial
connection
* a Newport SMC100PP single axis motion controller for 2-phase stepper motors over
a serial connection
* a Pfeiffer TPG controller (TPG 25x, TPG 26x and TPG 36x) for Compact pressure Gauges
......
......@@ -13,9 +13,11 @@ from hvl_ccb.dev import LabJack
logging.basicConfig(level=logging.INFO)
# generate configuration dict
config = {"device_type": "T7",
config = {
"device_type": "T7",
"connection_type": "TCP",
"identifier": "Any"}
"identifier": "Any",
}
# create connection
comm = LJMCommunication(config)
......@@ -27,19 +29,20 @@ labj = LabJack(comm)
labj.start()
# get and print the serial number of the connected LabJack device
print('SN: {}'.format(labj.get_serial_number()))
print(f"SN: {labj.get_serial_number()}")
# print temperature and relative humidity of a connected SBUS sensor
print("Temp DI = {:.2f} °C, RH = {:.1f} %".format((labj.get_sbus_temp(0) - 273.15), labj.get_sbus_rh(0) ))
print(
f"Temp DI = {(labj.get_sbus_temp(0) - 273.15):.2f} °C, "
f"RH = {labj.get_sbus_rh(0):.1f}%"
)
# init a thermocouple and read the value
ain_tc = 0
labj.set_ain_thermocouple(ain_tc,
thermocouple='T',
cjc_address=4,
cjc_type='lm34',
unit='C')
print(f'Temp TC = {labj.read_thermocouple(ain_tc):.2f} °C')
labj.set_ain_thermocouple(
ain_tc, thermocouple="T", cjc_address=4, cjc_type="lm34", unit="C",
)
print(f"Temp TC = {labj.read_thermocouple(ain_tc):.2f} °C")
sleep(0.5)
# init force sensor read out
......@@ -51,8 +54,19 @@ labj.set_ain_resolution(ain_force, 12)
max_force = 10_000
max_voltage = 5
print('Force = {:.2f} N'.format(labj.get_ain(ain_force) * max_force / max_voltage))
print(f"Force = {(labj.get_ain(ain_force) * max_force / max_voltage):.2f} N")
sleep(0.5)
print(
f"200 uA current source calibration I_cal = "
f"{(labj.get_cal_current_source('200u')*1e6):.3f} A"
)
print(
f"10 uA current source calibration I_cal = "
f"{(labj.get_cal_current_source('10u')*1e6):.3f} A"
)
print(f"State FIO0: {labj.get_digital_input('FIO0')}")
# stop device
labj.stop()
"""
Example script for the devices CryLasLaser and CrylasAttenuator
"""
import logging
from time import sleep
from hvl_ccb import ExperimentManager
from hvl_ccb.dev import CryLasLaser, CryLasAttenuator, CryLasLaserConfig
logging.basicConfig(level=logging.INFO)
# create devices with configuration
crylas = ExperimentManager({
'las': CryLasLaser({'port': 'COM10'}, {
'on_start_wait_until_ready': False,
'auto_laser_on': True,
'init_shutter_status': CryLasLaserConfig.ShutterStatus.CLOSED,
}),
'att': CryLasAttenuator({'port': 'COM12'}, {'init_attenuation': 0}),
})
print(crylas.status)
# start the laser (will also turn on the laser if 'auto_laser_on' is True, and set the
# shutter in the configured position, ex: CryLasLaserConfig.ShutterStatus.CLOSED)
crylas.start()
print(crylas.status)
# change the repetition rate (internal software trigger): 10, 20 or 60 Hz
crylas.las.set_repetition_rate(10)
# open the shutter
crylas.las.open_shutter()
# wait until the laser is on (this can take up to 5 minutes)
crylas.las.wait_until_ready()
# change the percentage of transmitted light (same as las.set_attenuation(20))
crylas.att.set_transmission(80)
# get the current pulse energy and rate
# rate = 0, means using internal hardware trigger
energy, rate = crylas.las.get_pulse_energy_and_rate()
print(f'Energy: {energy} uJ, rate: {rate} Hz')
# turn off the laser
crylas.las.laser_off()
# ... do stuff
# turn the laser on again
crylas.las.laser_on()
# set the pulse energy in uJ (possible only when using an external hardware trigger)
crylas.las.set_pulse_energy(100)
# stop the laser controller (this turns off the laser and closes the shutter) and the
# attenuator
crylas.stop()
print(crylas.status)
......@@ -3,7 +3,6 @@ Example script for the device HeinzingerPNC
"""
import logging
import sys
from time import sleep
from hvl_ccb.dev import HeinzingerPNC, HeinzingerPNCMaxVoltageExceededException
......
"""
Example script for the device Newport SMC100PP
"""
import logging
from hvl_ccb.dev import NewportSMC100PP, NewportSMC100PPConfig
logging.basicConfig(level=logging.INFO)
# create device object
mot_config = {
"address": 1,
"user_position_offset": 23.987,
"screw_scaling": 1.00188172,
"acceleration": 10,
"backlash_compensation": 0,
"hysteresis_compensation": 0.015,
"micro_step_per_full_step_factor": 100,
"motion_distance_per_full_step": 0.01,
"home_search_type": NewportSMC100PPConfig.HomeSearch.HomeSwitch,
"jerk_time": 0.04,
"home_search_velocity": 4,
"home_search_timeout": 27.5,
"peak_output_current_limit": 0.4,
"rs485_address": 2,
"negative_software_limit": -24,
"positive_software_limit": 26,
"velocity": 4,
"base_velocity": 0,
"stage_configuration": NewportSMC100PPConfig.EspStageConfig.EnableEspStageCheck,
}
mot = NewportSMC100PP({"port": "COM5"}, mot_config)
# start() opens the com, and if the controller is in state NO_REF state:
# * applies the config
# * initialize the controller (put the motor to its home position and the controller in
# READY state)
mot.start()
# move the motor to a certain absolute positive
mot.move_to_absolute_position(40)
mot.wait_until_move_finished()
# move the motor relatively to the current position
mot.move_to_relative_position(-10)
mot.wait_until_move_finished()
# send the motor to its home position, which corresponds to 'user_position_offset'
mot.go_home()
mot.wait_until_move_finished()
# stop() ceases the motion of the motor and closes the com. Use mot.stop_motion() if you
# want only to cease the motion.
mot.stop()
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""Internal devices subpackage to avoid possible circular imports when sharing utilities
between `comm` and `dev` packages."""
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""LabJack device utilities."""
from __future__ import annotations
import warnings
from typing import Dict, Iterable, List, Union
from aenum import EnumMeta
from labjack.ljm import constants
from ..utils.enum import AutoNumberNameEnum, NameEnum
class TSeriesDIOChannel(NameEnum):
"""
Digital Input/Output (DIO) addresses for various LabJack devices from T-Series.
NOTE: not all DIO addresses are available on all devices. This is defined as
`dio` attribute of `LabJackDeviceType`.
"""
FIO0 = 0
FIO1 = 1
FIO2 = 2
FIO3 = 3
FIO4 = 4
FIO5 = 5
FIO6 = 6
FIO7 = 7
EIO0 = 8
EIO1 = 9
EIO2 = 10
EIO3 = 11
EIO4 = 12
EIO5 = 13
EIO6 = 14
EIO7 = 15
CIO0 = 16
CIO1 = 17
CIO2 = 18
CIO3 = 19
MIO0 = 20
MIO1 = 21
MIO2 = 22
def _build_p_id_lookup_dict(
lab_jack_device_types: Iterable["DeviceType"],
) -> Dict[int, List]:
"""
Build lookup dictionary of `DeviceType` instances based on their `p_id`. Note:
`p_id` is not unique for each device type.
:param lab_jack_device_types: `DeviceType` instances to iterate over
:return: `int`-based lookup dictionary
"""
ret: Dict[int, List] = dict()
for lab_jack_device_type in lab_jack_device_types:
if lab_jack_device_type.p_id not in ret:
ret[lab_jack_device_type.p_id] = list()
ret[lab_jack_device_type.p_id].append(lab_jack_device_type)
return ret
# NOTE: super metaclass has to match metaclass of `super(DeviceType)`!
class DeviceTypeMeta(EnumMeta):
def __new__(metacls, clsname, bases, clsdict, **kwargs):
cls = EnumMeta.__new__(
metacls, clsname, bases, clsdict, **kwargs
)
cls._get_by_p_id = _build_p_id_lookup_dict(cls)
return cls
class AmbiguousProductIdWarning(UserWarning):
pass
class DeviceType(AutoNumberNameEnum, metaclass=DeviceTypeMeta):
"""
LabJack device types.
Can be also looked up by ambigious Product ID (`p_id`) or by instance name:
```python
LabJackDeviceType(4) is LabJackDeviceType('T4')
```
"""
_init_ = "value p_id type_str ain_max_resolution dio"
ANY = (), constants.dtANY, "ANY", 0, ()
T4 = (), constants.dtT4, "T4", 4, (
TSeriesDIOChannel.FIO4,
TSeriesDIOChannel.FIO5,
TSeriesDIOChannel.FIO6,
TSeriesDIOChannel.FIO7,
TSeriesDIOChannel.EIO0,
TSeriesDIOChannel.EIO1,
TSeriesDIOChannel.EIO2,
TSeriesDIOChannel.EIO3,
TSeriesDIOChannel.EIO4,
TSeriesDIOChannel.EIO5,
TSeriesDIOChannel.EIO6,
TSeriesDIOChannel.EIO7,
TSeriesDIOChannel.CIO0,
TSeriesDIOChannel.CIO1,
TSeriesDIOChannel.CIO2,
TSeriesDIOChannel.CIO3,
)
T7 = (), constants.dtT7, "T7", 8, (
TSeriesDIOChannel.FIO0,
TSeriesDIOChannel.FIO1,
TSeriesDIOChannel.FIO2,
TSeriesDIOChannel.FIO3,
TSeriesDIOChannel.FIO4,
TSeriesDIOChannel.FIO5,
TSeriesDIOChannel.FIO6,
TSeriesDIOChannel.FIO7,
TSeriesDIOChannel.EIO0,
TSeriesDIOChannel.EIO1,
TSeriesDIOChannel.EIO2,
TSeriesDIOChannel.EIO3,
TSeriesDIOChannel.EIO4,
TSeriesDIOChannel.EIO5,
TSeriesDIOChannel.EIO6,
TSeriesDIOChannel.EIO7,
TSeriesDIOChannel.CIO0,
TSeriesDIOChannel.CIO1,
TSeriesDIOChannel.CIO2,
TSeriesDIOChannel.CIO3,
TSeriesDIOChannel.MIO0,
TSeriesDIOChannel.MIO1,
TSeriesDIOChannel.MIO2,
)
T7_PRO = (), constants.dtT7, "T7", 12, (
TSeriesDIOChannel.FIO0,
TSeriesDIOChannel.FIO1,
TSeriesDIOChannel.FIO2,
TSeriesDIOChannel.FIO3,
TSeriesDIOChannel.FIO4,
TSeriesDIOChannel.FIO5,
TSeriesDIOChannel.FIO6,
TSeriesDIOChannel.FIO7,
TSeriesDIOChannel.EIO0,
TSeriesDIOChannel.EIO1,
TSeriesDIOChannel.EIO2,
TSeriesDIOChannel.EIO3,
TSeriesDIOChannel.EIO4,
TSeriesDIOChannel.EIO5,
TSeriesDIOChannel.EIO6,
TSeriesDIOChannel.EIO7,
TSeriesDIOChannel.CIO0,
TSeriesDIOChannel.CIO1,
TSeriesDIOChannel.CIO2,
TSeriesDIOChannel.CIO3,
TSeriesDIOChannel.MIO0,
TSeriesDIOChannel.MIO1,
TSeriesDIOChannel.MIO2,
)
# DIGIT = (), constants.dtDIGIT, 'DIGIT', 0, ()
# SERIES = (), constants.dtTSERIES, 'SERIES', 0, ()
@classmethod
def get_by_p_id(cls, p_id: int) -> Union[DeviceType, List[DeviceType]]:
"""
Get LabJack device type instance via LabJack product ID.
Note: Product ID is not unambiguous for LabJack devices.
:param p_id: Product ID of a LabJack device
:return: Instance or list of instances of `LabJackDeviceType`
"""
instances_list = cls._get_by_p_id[p_id]
if len(instances_list) > 1:
warnings.warn(
f"Product ID {p_id} matches multiple device types: "
f"{', '.join(instance.name for instance in instances_list)}.",
AmbiguousProductIdWarning,
)
ret = instances_list
else:
ret = instances_list[0]
return ret
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""Communication protocols subpackage."""
from .base import CommunicationProtocol # noqa: F401
......@@ -14,6 +16,7 @@ from .modbus_tcp import ( # noqa: F401
from .opc import ( # noqa: F401
OpcUaCommunication,
OpcUaCommunicationConfig,
OpcUaCommunicationIOError,
OpcUaSubHandler,
)
from .serial import ( # noqa: F401
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Communication protocol for LabJack using the LJM Library.
Originally developed and tested for LabJack T7-PRO.
......@@ -16,6 +18,7 @@ from typing import Union, Sequence, Tuple
from labjack import ljm
from .base import CommunicationProtocol
from .._dev import labjack
from ..configuration import configdataclass
from ..utils.enum import AutoNumberNameEnum
......@@ -24,6 +27,7 @@ class LJMCommunicationError(Exception):
"""
Errors coming from LJMCommunication.
"""
pass
......@@ -33,22 +37,16 @@ class LJMCommunicationConfig:
Configuration dataclass for :class:`LJMCommunication`.
"""
class DeviceType(AutoNumberNameEnum):
"""
LabJack device type.
"""
ANY = ()
T7 = ()
T4 = ()
DIGIT = ()
DeviceType = labjack.DeviceType
#: Can bei either string 'ANY', 'T7', 'T4', 'DIGIT' or of enum :class:`DeviceType`.
device_type: (str, DeviceType) = 'ANY'
#: Can be either string 'ANY', 'T7_PRO', 'T7', 'T4', or of enum :class:`DeviceType`.
device_type: Union[str, labjack.DeviceType] = "ANY"
class ConnectionType(AutoNumberNameEnum):
"""
LabJack connection type.
"""
ANY = ()
USB = ()
TCP = ()
......@@ -56,9 +54,9 @@ class LJMCommunicationConfig:
WIFI = ()
#: Can be either string or of enum :class:`ConnectionType`.
connection_type: (str, ConnectionType) = 'ANY'
connection_type: Union[str, ConnectionType] = "ANY"
identifier: str = 'ANY'
identifier: str = "ANY"
"""
The identifier specifies information for the connection to be used. This can
be an IP address, serial number, or device name. See the LabJack docs (
......@@ -71,10 +69,14 @@ identifier-parameter) for more information.
Performs value checks on device_type and connection_type.
"""
if not isinstance(self.device_type, self.DeviceType):
self.DeviceType(self.device_type)
self.force_value( # type: ignore
"device_type", self.DeviceType(self.device_type)
)
if not isinstance(self.connection_type, self.ConnectionType):
self.ConnectionType(self.connection_type)
self.force_value( # type: ignore
"connection_type", self.ConnectionType(self.connection_type)
)
class LJMCommunication(CommunicationProtocol):
......@@ -102,22 +104,21 @@ class LJMCommunication(CommunicationProtocol):
Open the communication port.
"""
self.logger.info('Open connection')
self.logger.info("Open connection")
# open connection and store handle
# may throw 1227 LJME_DEVICE_NOT_FOUND if device is not found
try:
with self.access_lock:
self._handle = ljm.openS(
str(self.config.device_type),
self.config.device_type.type_str,
str(self.config.connection_type),
str(self.config.identifier),
)
except ljm.LJMError as e:
self.logger.error(e)
# only catch "1229 LJME_DEVICE_ALREADY_OPEN", never observed
if e.errorCode == 1229:
return
if e.errorCode != 1229:
raise LJMCommunicationError from e
def close(self) -> None:
......@@ -125,7 +126,7 @@ class LJMCommunication(CommunicationProtocol):
Close the communication port.
"""
self.logger.info('Closing connection')
self.logger.info("Closing connection")
try:
with self.access_lock:
......@@ -133,9 +134,29 @@ class LJMCommunication(CommunicationProtocol):
except ljm.LJMError as e:
self.logger.error(e)
# only catch "1224 LJME_DEVICE_NOT_OPEN", thrown on invalid handle
if e.errorCode == 1224:
return
if e.errorCode != 1224:
raise LJMCommunicationError from e
self._handle = None
@property
def is_open(self) -> bool:
"""
Flag indicating if the communication port is open.
:return: `True` if the port is open, otherwise `False`
"""
# getHandleInfo does not work with LJM DEMO_MODE - consider it always opened
# if only set
if str(self._handle) == labjack.constants.DEMO_MODE: