Commit 76d21c07 authored by mikolajr's avatar mikolajr
Browse files

In ILS2T device: wait time config opts, clean config values, tests for config...

In ILS2T device: wait time config opts, clean config values, tests for config errors, refactor inline max int32, fixed type check errors, format (black), and copyright notice
parent c09db518
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Device class for controlling a Schneider Electric ILS2T stepper drive over modbus TCP.
"""
import logging
import numbers
from datetime import timedelta
from enum import Flag, IntEnum
from numbers import Integral
from time import sleep
from typing import Dict, List, Any
from typing import Dict, List, Any, Union, cast
import aenum
from bitstring import BitArray
......@@ -22,6 +24,8 @@ from ..comm import (
)
from ..configuration import configdataclass
Number = Union[float, int]
class ILS2TException(Exception):
"""
......@@ -78,7 +82,28 @@ class ILS2TConfig:
#: initial maximum RPM for the motor, can be set up to 3000 RPM. The user is
#: allowed to set a new max RPM at runtime using :meth:`ILS2T.set_max_rpm`,
#: but the value must never exceed this configuration setting.
rpm_max_init: int = 1500
rpm_max_init: Integral = cast(Integral, 1500)
wait_sec_post_enable: Number = 1
wait_sec_post_relative_step: Number = 2
wait_sec_post_absolute_position: Number = 2
def clean_values(self):
if not 0 < self.rpm_max_init <= 3000:
raise ValueError(
"Maximum RPM for the motor must be integer number between 1 and 3000."
)
if self.wait_sec_post_enable <= 0:
raise ValueError(
"Wait time post motor enabling must be a positive value (in seconds)."
)
if self.wait_sec_post_relative_step <= 0:
raise ValueError(
"Wait time post motor enabling must be a positive value (in seconds)."
)
if self.wait_sec_post_absolute_position <= 0:
raise ValueError(
"Wait time post motor enabling must be a positive value (in seconds)."
)
class ILS2T(SingleCommDevice):
......@@ -105,7 +130,8 @@ class ILS2T(SingleCommDevice):
=========== =========== ============== =============
"""
_init_ = 'min max'
_init_ = "min max"
INT32 = -2_147_483_648, 2_147_483_647
def is_in_range(self, value: int) -> bool:
......@@ -115,6 +141,7 @@ class ILS2T(SingleCommDevice):
"""
ILS2T Modbus Register Adresses
"""
POSITION = 7706 # INT32 position of the motor in user defined units
IO_SCANNING = 6922 # BITS start register for IO scanning control
# and status
......@@ -139,6 +166,7 @@ class ILS2T(SingleCommDevice):
"""
ILS2T device modes
"""
PTP = 3 # point to point
JOG = 1
......@@ -146,6 +174,7 @@ class ILS2T(SingleCommDevice):
"""
Allowed actions in the point to point mode (`ILS2T.Mode.PTP`).
"""
ABSOLUTE_POSITION = 0
RELATIVE_POSITION_TARGET = 1
RELATIVE_POSITION_MOTOR = 2
......@@ -161,6 +190,7 @@ class ILS2T(SingleCommDevice):
Allowed values for ILS2T ref_16 register (the shown values are the integer
representation of the bits), all in Jog mode = 1
"""
NONE = 0
POS = 1
NEG = 2
......@@ -173,22 +203,23 @@ class ILS2T(SingleCommDevice):
"""
State machine status values
"""
QUICKSTOP = 7
READY = 4
ON = 6
DEFAULT_IO_SCANNING_CONTROL_VALUES = {
'action': ActionsPtp.RELATIVE_POSITION_MOTOR.value,
'mode': Mode.PTP.value,
'disable_driver_di': 0,
'enable_driver_en': 0,
'quick_stop_qs': 0,
'fault_reset_fr': 0,
'execute_stop_sh': 0,
'reset_stop_ch': 0,
'continue_after_stop_cu': 0,
'ref_16': ILS2TConfig.rpm_max_init,
'ref_32': 0,
"action": ActionsPtp.RELATIVE_POSITION_MOTOR.value,
"mode": Mode.PTP.value,
"disable_driver_di": 0,
"enable_driver_en": 0,
"quick_stop_qs": 0,
"fault_reset_fr": 0,
"execute_stop_sh": 0,
"reset_stop_ch": 0,
"continue_after_stop_cu": 0,
"ref_16": ILS2TConfig.rpm_max_init,
"ref_32": 0,
}
"""
Default IO Scanning control mode values
......@@ -206,7 +237,8 @@ class ILS2T(SingleCommDevice):
# toggle reminder bit
self._mode_toggle_mt = 0
self.flt_list = []
self.flt_list: List[Dict[int, Dict[str, Any]]] = []
@staticmethod
def default_com_cls():
......@@ -231,8 +263,7 @@ class ILS2T(SingleCommDevice):
# writing 1 to register ACCESS_ENABLE allows to use the IO scanning mode.
# This is not documented in the manual!
self.com.write_registers(self.RegAddr.ACCESS_ENABLE.value,
[0, 1])
self.com.write_registers(self.RegAddr.ACCESS_ENABLE.value, [0, 1])
# set maximum RPM from init config
self.set_max_rpm(self.config.rpm_max_init)
......@@ -245,8 +276,7 @@ class ILS2T(SingleCommDevice):
logging.info("Stopping device " + str(self))
self.disable()
self.com.write_registers(self.RegAddr.ACCESS_ENABLE.value,
[0, 0])
self.com.write_registers(self.RegAddr.ACCESS_ENABLE.value, [0, 0])
super().stop()
def get_status(self) -> Dict[str, int]:
......@@ -256,8 +286,7 @@ class ILS2T(SingleCommDevice):
:return: dict with status information.
"""
registers = self.com \
.read_holding_registers(self.RegAddr.IO_SCANNING.value, 8)
registers = self.com.read_holding_registers(self.RegAddr.IO_SCANNING.value, 8)
return self._decode_status_registers(registers)
def do_ioscanning_write(self, **kwargs: int) -> None:
......@@ -271,8 +300,7 @@ class ILS2T(SingleCommDevice):
self._toggle()
values = self._generate_control_registers(**kwargs)
self.com.write_registers(self.RegAddr.IO_SCANNING.value,
values)
self.com.write_registers(self.RegAddr.IO_SCANNING.value, values)
def _generate_control_registers(self, **kwargs: int) -> List[int]:
"""
......@@ -286,21 +314,21 @@ class ILS2T(SingleCommDevice):
cleaned_io_scanning_mode = self._clean_ioscanning_mode_values(kwargs)
action_bits = '{0:03b}'.format(cleaned_io_scanning_mode['action'])
mode_bits = '{0:04b}'.format(cleaned_io_scanning_mode['mode'])
action_bits = "{0:03b}".format(cleaned_io_scanning_mode["action"])
mode_bits = "{0:04b}".format(cleaned_io_scanning_mode["mode"])
builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Big)
# add the first byte: Drive control
builder.add_bits(
[
cleaned_io_scanning_mode['disable_driver_di'],
cleaned_io_scanning_mode['enable_driver_en'],
cleaned_io_scanning_mode['quick_stop_qs'],
cleaned_io_scanning_mode['fault_reset_fr'],
cleaned_io_scanning_mode["disable_driver_di"],
cleaned_io_scanning_mode["enable_driver_en"],
cleaned_io_scanning_mode["quick_stop_qs"],
cleaned_io_scanning_mode["fault_reset_fr"],
0, # has to be 0 per default, no meaning
cleaned_io_scanning_mode['execute_stop_sh'],
cleaned_io_scanning_mode['reset_stop_ch'],
cleaned_io_scanning_mode['continue_after_stop_cu'],
cleaned_io_scanning_mode["execute_stop_sh"],
cleaned_io_scanning_mode["reset_stop_ch"],
cleaned_io_scanning_mode["continue_after_stop_cu"],
]
)
......@@ -320,15 +348,16 @@ class ILS2T(SingleCommDevice):
# add the third and fourth byte:
# Ref_16 (either JOG direction/speed, or RPM...)
builder.add_16bit_uint(cleaned_io_scanning_mode['ref_16'])
builder.add_16bit_uint(cleaned_io_scanning_mode["ref_16"])
# add 4 bytes Ref_32, Target position
builder.add_32bit_int(cleaned_io_scanning_mode['ref_32'])
builder.add_32bit_int(cleaned_io_scanning_mode["ref_32"])
return builder.to_registers()
def _clean_ioscanning_mode_values(self, io_scanning_values: Dict[str, int]) \
-> Dict[str, int]:
def _clean_ioscanning_mode_values(
self, io_scanning_values: Dict[str, int]
) -> Dict[str, int]:
"""
Checks if the constructed mode is valid.
......@@ -345,62 +374,63 @@ class ILS2T(SingleCommDevice):
all_keys = set(self.DEFAULT_IO_SCANNING_CONTROL_VALUES.keys())
superfluous_keys = io_scanning_keys.difference(all_keys)
if superfluous_keys:
raise ValueError("Unrecognized mode keys: {}".format(
list(superfluous_keys)))
raise ValueError(
"Unrecognized mode keys: {}".format(list(superfluous_keys))
)
# fill up io_scanning_values with defaults, if they are not set
for mode_key, default_value in self.DEFAULT_IO_SCANNING_CONTROL_VALUES.items():
if mode_key not in io_scanning_values:
io_scanning_values[mode_key] = default_value
io_scanning_values[mode_key] = cast(int, default_value)
# perform checks depending on mode
# JOG mode
if io_scanning_values['mode'] == self.Mode.JOG:
if io_scanning_values["mode"] == self.Mode.JOG:
if not io_scanning_values['action'] == self.ACTION_JOG_VALUE:
if not io_scanning_values["action"] == self.ACTION_JOG_VALUE:
raise IoScanningModeValueError(
'Wrong action: {}'.format(io_scanning_values['action'])
"Wrong action: {}".format(io_scanning_values["action"])
)
try:
self.Ref16Jog(io_scanning_values['ref_16'])
self.Ref16Jog(io_scanning_values["ref_16"])
except ValueError:
raise IoScanningModeValueError(
'Wrong value in ref_16 ({})'.format(io_scanning_values['ref_16'])
"Wrong value in ref_16 ({})".format(io_scanning_values["ref_16"])
)
if not io_scanning_values['ref_32'] == 0:
if not io_scanning_values["ref_32"] == 0:
raise IoScanningModeValueError(
'Wrong value in ref_32 ({})'.format(io_scanning_values['ref_32'])
"Wrong value in ref_32 ({})".format(io_scanning_values["ref_32"])
)
return io_scanning_values
# PTP mode
if io_scanning_values['mode'] == self.Mode.PTP:
if io_scanning_values["mode"] == self.Mode.PTP:
try:
self.ActionsPtp(io_scanning_values['action'])
self.ActionsPtp(io_scanning_values["action"])
except ValueError:
raise IoScanningModeValueError(
'Wrong action: {}'.format(io_scanning_values['action'])
"Wrong action: {}".format(io_scanning_values["action"])
)
if not self._is_valid_rpm(io_scanning_values['ref_16']):
if not self._is_valid_rpm(io_scanning_values["ref_16"]):
raise IoScanningModeValueError(
'Wrong value in ref_16 ({})'.format(io_scanning_values['ref_16'])
"Wrong value in ref_16 ({})".format(io_scanning_values["ref_16"])
)
if not self._is_int32(io_scanning_values['ref_32']):
if not self._is_int32(io_scanning_values["ref_32"]):
raise IoScanningModeValueError(
'Wrong value in ref_32 ({})'.format(io_scanning_values['ref_32'])
"Wrong value in ref_32 ({})".format(io_scanning_values["ref_32"])
)
return io_scanning_values
# default
raise IoScanningModeValueError(
'Wrong mode: {}'.format(io_scanning_values['mode'])
"Wrong mode: {}".format(io_scanning_values["mode"])
)
def _is_valid_rpm(self, num: int) -> bool:
......@@ -411,7 +441,7 @@ class ILS2T(SingleCommDevice):
:return: `True` if `num` is a valid RPM value, `False` otherwise
"""
return isinstance(num, numbers.Integral) and 0 < num <= self.config.rpm_max_init
return isinstance(num, Integral) and 0 < num <= self.config.rpm_max_init
@classmethod
def _is_int32(cls, num: int) -> bool:
......@@ -422,8 +452,7 @@ class ILS2T(SingleCommDevice):
:return: check result.
"""
return (isinstance(num, numbers.Integral)
and cls.RegDatatype.INT32.is_in_range(num))
return isinstance(num, Integral) and cls.RegDatatype.INT32.is_in_range(num)
@staticmethod
def _decode_status_registers(registers: List[int]) -> Dict[str, int]:
......@@ -436,32 +465,32 @@ class ILS2T(SingleCommDevice):
decoder = BinaryPayloadDecoder.fromRegisters(registers, byteorder=Endian.Big)
decoded = {
'drive_control': decoder.decode_bits(),
'mode_control': decoder.decode_bits(),
'ref_16': decoder.decode_16bit_int(),
'ref_32': decoder.decode_32bit_int(),
'drive_status_1': decoder.decode_bits(),
'drive_status_2': decoder.decode_bits(),
'mode_status': decoder.decode_bits(),
'drive_input': decoder.decode_bits(),
'action_word_1': decoder.decode_bits(),
'action_word_2': decoder.decode_bits(),
'special_function_1': decoder.decode_bits(),
'special_function_2': decoder.decode_bits(),
"drive_control": decoder.decode_bits(),
"mode_control": decoder.decode_bits(),
"ref_16": decoder.decode_16bit_int(),
"ref_32": decoder.decode_32bit_int(),
"drive_status_1": decoder.decode_bits(),
"drive_status_2": decoder.decode_bits(),
"mode_status": decoder.decode_bits(),
"drive_input": decoder.decode_bits(),
"action_word_1": decoder.decode_bits(),
"action_word_2": decoder.decode_bits(),
"special_function_1": decoder.decode_bits(),
"special_function_2": decoder.decode_bits(),
}
return {
'mode': BitArray(decoded['mode_status'][3::-1]).int,
'action': BitArray(decoded['mode_control'][6:3:-1]).int,
'ref_16': decoded['ref_16'],
'ref_32': decoded['ref_32'],
'state': BitArray(decoded['drive_status_2'][3::-1]).int,
'fault': decoded['drive_status_2'][6],
'warn': decoded['drive_status_2'][7],
'halt': decoded['drive_status_1'][0],
'motion_zero': decoded['action_word_2'][6],
'turning_positive': decoded['action_word_2'][7],
'turning_negative': decoded['action_word_1'][0],
"mode": BitArray(decoded["mode_status"][3::-1]).int,
"action": BitArray(decoded["mode_control"][6:3:-1]).int,
"ref_16": decoded["ref_16"],
"ref_32": decoded["ref_32"],
"state": BitArray(decoded["drive_status_2"][3::-1]).int,
"fault": decoded["drive_status_2"][6],
"warn": decoded["drive_status_2"][7],
"halt": decoded["drive_status_1"][0],
"motion_zero": decoded["action_word_2"][6],
"turning_positive": decoded["action_word_2"][7],
"turning_negative": decoded["action_word_1"][0],
}
def _toggle(self) -> None:
......@@ -469,7 +498,7 @@ class ILS2T(SingleCommDevice):
To activate a command it is necessary to toggle the MT bit first.
"""
self._mode_toggle_mt = (0 if self._mode_toggle_mt else 1)
self._mode_toggle_mt = 0 if self._mode_toggle_mt else 1
def relative_step(self, steps: int) -> None:
"""
......@@ -481,16 +510,18 @@ class ILS2T(SingleCommDevice):
:param steps: Number of steps to turn the motor.
"""
if not abs(steps) < 2_147_483_647:
logging.warning('number of steps is too big: {}'.format(steps))
max_step = self.RegDatatype.INT32.max # type: ignore
# use _is_int32 instead?
if not abs(steps) < max_step:
logging.warning("number of steps is too big: {}".format(steps))
logging.info('Perform number of steps: {}'.format(steps))
logging.info("Perform number of steps: {}".format(steps))
self.do_ioscanning_write(
enable_driver_en=1,
mode=self.Mode.PTP.value,
action=self.ActionsPtp.RELATIVE_POSITION_MOTOR.value,
ref_32=steps
ref_32=steps,
)
def absolute_position(self, position: int) -> None:
......@@ -501,16 +532,18 @@ class ILS2T(SingleCommDevice):
:param position: absolute position of motor in user defined steps.
"""
if not abs(position) < 2_147_483_647:
logging.warning('position is out of range: {}'.format(position))
max_position = self.RegDatatype.INT32.max # type: ignore
# use _is_int32 instead?
if not abs(position) < max_position:
logging.warning("position is out of range: {}".format(position))
logging.info('Absolute position: {}'.format(position))
logging.info("Absolute position: {}".format(position))
self.do_ioscanning_write(
enable_driver_en=1,
mode=self.Mode.PTP.value,
action=self.ActionsPtp.ABSOLUTE_POSITION.value,
ref_32=position
ref_32=position,
)
def relative_step_and_wait(self, steps: int) -> None:
......@@ -520,17 +553,17 @@ class ILS2T(SingleCommDevice):
:param steps: Number of steps.
"""
logging.info('Motor steps requested: {}'.format(steps))
logging.info("Motor steps requested: {}".format(steps))
position_before = self.get_position()
self.enable()
sleep(1)
sleep(self.config.wait_sec_post_enable)
self.relative_step(steps)
sleep(2)
sleep(self.config.wait_sec_post_relative_step)
while True:
if self.get_status()['motion_zero']:
if self.get_status()["motion_zero"]:
self.disable()
break
......@@ -539,13 +572,14 @@ class ILS2T(SingleCommDevice):
if position_before + steps != position_after:
flt_dict = self.get_error_code()
self.flt_list.append(flt_dict)
if 'empty' in flt_dict[0].keys():
logging.warning('no error in drive, '
'something different must have gone wrong')
if "empty" in flt_dict[0].keys():
logging.warning(
'The position does not align with the requested step '
'number. Before: {0}, after: {1}, requested: {2}, '
'real difference: {3}'.format(
"no error in drive, " "something different must have gone wrong"
)
logging.warning(
"The position does not align with the requested step "
"number. Before: {0}, after: {1}, requested: {2}, "
"real difference: {3}".format(
position_before,
position_after,
steps,
......@@ -555,11 +589,11 @@ class ILS2T(SingleCommDevice):
else:
# Despite drive error/malfunction don't break the code/experiment
# execution by raising an error; continuing as nothing happened.
logging.critical('error in drive, drive is know maybe locked')
logging.critical("error in drive, drive is know maybe locked")
logging.critical(
'The position does not align with the requested step '
'number. Before: {0}, after: {1}, requested: {2}, '
'real difference: {3}'.format(
"The position does not align with the requested step "
"number. Before: {0}, after: {1}, requested: {2}, "
"real difference: {3}".format(
position_before,
position_after,
steps,
......@@ -574,17 +608,17 @@ class ILS2T(SingleCommDevice):
:param position: absolute position of motor in user defined steps.
"""
logging.info('absolute position requested: {}'.format(position))
logging.info("absolute position requested: {}".format(position))
position_before = self.get_position()
self.enable()
sleep(1)
sleep(self.config.wait_sec_post_enable)
self.absolute_position(position)
sleep(2)
sleep(self.config.wait_sec_post_absolute_position)
while True:
if self.get_status()['motion_zero']:
if self.get_status()["motion_zero"]:
self.disable()
break
......@@ -593,27 +627,22 @@ class ILS2T(SingleCommDevice):
if position != position_after:
flt_dict = self.get_error_code()
self.flt_list.append(flt_dict)
if 'empty' in flt_dict[0].keys():
logging.warning('no error in drive, '
'something different must have gone wrong')
if "empty" in flt_dict[0].keys():
logging.warning(
'The position does not align with the requested absolute position. '
'Before: {0}, after: {1}, requested: {2}'
.format(
position_before,
position_after,
position,
"no error in drive, " "something different must have gone wrong"
)
logging.warning(
"The position does not align with the requested absolute position. "
"Before: {0}, after: {1}, requested: {2}".format(
position_before, position_after, position,
)
)
else:
logging.critical('error in drive, drive is know maybe locked')
logging.critical("error in drive, drive is know maybe locked")
logging.critical(
'The position does not align with the requested absolute position. '
'Before: {0}, after: {1}, requested: {2}'
.format(
position_before,
position_after,
position,
"The position does not align with the requested absolute position. "
"Before: {0}, after: {1}, requested: {2}".format(
position_before, position_after, position,
)
)
......@@ -622,11 +651,11 @@ class ILS2T(SingleCommDevice):
Disable the driver of the stepper motor and enable the brake.
"""
if self.get_status()['motion_zero']:
logging.info('Disable motor, brake.')
if self.get_status()["motion_zero"]:
logging.info("Disable motor, brake.")
self.do_ioscanning_write(enable_driver_en=0, disable_driver_di=1)
else:
logging.warning('Cannot disable motor, still running!')
logging.warning("Cannot disable motor, still running!")
def enable(self) -> None:
"""
......@@ -634,7 +663,7 @@ class ILS2T(SingleCommDevice):
"""
self.do_ioscanning_write(enable_driver_en=1, disable_driver_di=0)
logging.info('Enable motor, disable brake.')
logging.info("Enable motor, disable brake.")
def get_position(self) -> int:
"""
......@@ -643,8 +672,7 @@ class ILS2T(SingleCommDevice):
:return: Position step value
"""
value = self.com.read_input_registers(
self.RegAddr.POSITION.value, 2)
value = self.com.read_input_registers(self.RegAddr.POSITION.value, 2)
return self._decode_32bit(value, True)
def get_temperature(self) -> int:
......@@ -654,8 +682,7 @@ class ILS2T(SingleCommDevice):
:return: Temperature in degrees Celsius.
"""
value = self.com.read_input_registers(
self.RegAddr.TEMP.value, 2)
value = self.com.read_input_registers(self.RegAddr.TEMP.value, 2)
return self._decode_32bit(value, True)
def get_dc_volt(self) -> float:
......@@ -665,8 +692,7 @@ class ILS2T(SingleCommDevice):
:return: DC input voltage.
"""
value = self.com.read_input_registers(
self.RegAddr.VOLT.value, 2)
value = self.com.read_input_registers(self.RegAddr.VOLT.value, 2)
return self._decode_32bit(value, True) / 10
@staticmethod
......@@ -696,12 +722,12 @@ class ILS2T(SingleCommDevice):
"""
if not self._is_int32(revolutions):
err_msg = 'Wrong scaling factor: revolutions = {}'.format(revolutions)
err_msg = "Wrong scaling factor: revolutions = {}".format(revolutions)
logging.error(err_msg)
raise ScalingFactorValueError(err_msg)