To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit e0938b7b authored by mikolajr's avatar mikolajr
Browse files

In RTO1024 device: wait time config opts, refactor invalid config tests, fix...

In RTO1024 device: wait time config opts, refactor invalid config tests, fix type check errors, format (black), and copyright notice
parent 8aed4925
......@@ -59,12 +59,35 @@ class _RTO1024ConfigDefaultsBase(_VisaDeviceConfigDefaultsBase):
Timeout to wait for asynchronous commands to complete, in seconds. This timeout
is respected for long operations such as storing waveforms.
"""
wait_sec_short_pause: Number = 0.1
"""Time for short wait periods, in seconds (depends on both device and
network/connection)."""
wait_sec_enable_history: Number = 1
"""Time to wait after enabling history, in seconds."""
wait_sec_post_acquisition_start: Number = 2
"""Time to wait after start of continuous acquisition, in seconds."""
def clean_values(self):
super().clean_values()
if self.command_timeout_seconds <= 0:
raise ValueError("command_timeout_seconds needs to be positive.")
raise ValueError(
"Timeout to wait for asynchronous commands to complete must be a "
"positive value (in seconds)."
)
if self.wait_sec_short_pause <= 0:
raise ValueError(
"Wait time for a short pause must be a positive value (in seconds)."
)
if self.wait_sec_enable_history <= 0:
raise ValueError(
"Wait time for enabling history must be a positive value (in seconds)."
)
if self.wait_sec_post_acquisition_start <= 0:
raise ValueError(
"Wait time after acquisition start must be a positive value (in "
"seconds)."
)
@configdataclass
......@@ -85,7 +108,7 @@ class RTO1024VisaCommunicationConfig(VisaCommunicationConfig):
interface_type: Union[
str, VisaCommunicationConfig.InterfaceType
] = VisaCommunicationConfig.InterfaceType.TCPIP_INSTR
] = VisaCommunicationConfig.InterfaceType.TCPIP_INSTR # type: ignore
class RTO1024VisaCommunication(VisaCommunication):
......@@ -121,9 +144,6 @@ class RTO1024(VisaDevice):
return list(map(lambda x: x.name, cls))
#: time for short wait periods (depends on both device and network/connection)
SHORT_PAUSE_SECONDS = 0.1
@staticmethod
def config_cls():
return RTO1024Config
......@@ -346,7 +366,7 @@ class RTO1024(VisaDevice):
if isinstance(mode, str):
try:
mode = self.TriggerModes[mode.upper()]
mode = self.TriggerModes[mode.upper()] # type: ignore
except KeyError:
err_msg = (
f'"{mode}" is not an allowed trigger mode out of '
......@@ -443,7 +463,7 @@ class RTO1024(VisaDevice):
# enable history
self.com.write("CHAN:HIST ON")
sleep(1)
sleep(self.config.wait_sec_enable_history)
# turn off display
self.local_display(False)
......@@ -463,7 +483,7 @@ class RTO1024(VisaDevice):
# clear status, to get *OPC working
self.com.write("*CLS")
sleep(self.SHORT_PAUSE_SECONDS)
sleep(self.config.wait_sec_short_pause)
# play waveform to start exporting
self.com.write("CHANnel:HISTory:PLAY", "*OPC")
......@@ -524,11 +544,11 @@ class RTO1024(VisaDevice):
self.set_trigger_mode("AUTO")
# pause a little bit
sleep(self.SHORT_PAUSE_SECONDS)
sleep(self.config.wait_sec_short_pause)
# start acquisition and wait for two seconds
self.run_continuous_acquisition()
sleep(2)
sleep(self.config.wait_sec_post_acquisition_start)
# stop acquisition
self.stop_acquisition()
......@@ -543,7 +563,7 @@ class RTO1024(VisaDevice):
self.com.write("ACQuire:SEGMented:MAX ON")
# final pause to secure the state
sleep(self.SHORT_PAUSE_SECONDS)
sleep(self.config.wait_sec_short_pause)
def save_configuration(self, filename: str) -> None:
r"""
......@@ -612,7 +632,7 @@ class RTO1024(VisaDevice):
self.com.write(f"CHANnel:WAVeform:HISTory:CURRent {index}")
# wait until frame is loaded
sleep(self.SHORT_PAUSE_SECONDS)
sleep(self.config.wait_sec_short_pause)
# store relative timestamp
timestamps_relative.append(
......@@ -620,7 +640,7 @@ class RTO1024(VisaDevice):
)
# wait until timestamp is stored
sleep(self.SHORT_PAUSE_SECONDS)
sleep(self.config.wait_sec_short_pause)
# re-enable local display
self.local_display(True)
......
# Copyright (c) 2019 ETH Zurich, SIS ID and HVL D-ITET
#
"""
Tests for the R&S RTO1024 digital storage oscilloscope device class.
"""
import pytest
from hvl_ccb.dev import RTO1024, RTO1024Error, RTO1024VisaCommunicationConfig
from hvl_ccb.dev import (
RTO1024,
RTO1024Config,
RTO1024Error,
RTO1024VisaCommunicationConfig,
)
from tests.masked_comm import MaskedVisaCommunication
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def com_config():
return {
'interface_type': RTO1024VisaCommunicationConfig.InterfaceType.TCPIP_INSTR,
'host': '127.0.0.1',
'open_timeout': 10,
'timeout': 50,
"interface_type": RTO1024VisaCommunicationConfig.InterfaceType.TCPIP_INSTR,
"host": "127.0.0.1",
"open_timeout": 10,
"timeout": 50,
}
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def dev_config():
return {
'waveforms_path': 'C:\\Data\\DavidGraber\\02_waveforms',
'settings_path': 'C:\\Data\\DavidGraber\\01_settings',
'backup_path': 'D:\\backups',
'command_timeout_seconds': 0.05,
'spoll_interval': 0.01,
'spoll_start_delay': 0,
"waveforms_path": "C:\\Data\\DavidGraber\\02_waveforms",
"settings_path": "C:\\Data\\DavidGraber\\01_settings",
"backup_path": "D:\\backups",
"spoll_interval": 0.01,
"spoll_start_delay": 0,
"command_timeout_seconds": 0.05,
"wait_sec_short_pause": 0.01,
"wait_sec_short_pause": 0.01,
"wait_sec_post_acquisition_start": 0.01,
}
......@@ -38,162 +48,168 @@ def testdev(com_config, dev_config):
dev.SPOLL_START_DELAY = 0
assert dev is not None
dev.start()
assert dev.com.get_written() == '&GTR'
assert dev.com.get_written() == '*RST'
assert dev.com.get_written() == '*CLS'
assert dev.com.get_written() == 'SYST:DISP:UPD ON'
assert dev.com.get_written() == '*ESE 127'
assert dev.com.get_written() == "&GTR"
assert dev.com.get_written() == "*RST"
assert dev.com.get_written() == "*CLS"
assert dev.com.get_written() == "SYST:DISP:UPD ON"
assert dev.com.get_written() == "*ESE 127"
yield dev
dev.stop()
def test_instantiation(com_config, dev_config):
copy_config = dict(com_config)
copy_config.pop('interface_type')
dev = RTO1024(copy_config, dev_config)
assert dev is not None
dev_config_dict = dev_config
assert dev.config.spoll_interval == dev_config_dict['spoll_interval']
invalid_dev_config = dict(dev_config_dict)
invalid_dev_config['spoll_interval'] = 0
def test_dev_config(dev_config):
config = RTO1024Config(**dev_config)
for key, value in dev_config.items():
assert getattr(config, key) == value
@pytest.mark.parametrize(
"wrong_config_dict",
[
{"spoll_interval": 0},
{"spoll_interval": -1},
{"spoll_start_delay": -1},
{"command_timeout_seconds": 0},
{"wait_sec_short_pause": 0},
{"wait_sec_short_pause": -1},
{"wait_sec_enable_history": 0},
{"wait_sec_post_acquisition_start": 0},
],
)
def test_dev_config_invalid(dev_config, wrong_config_dict):
invalid_dev_config = dict(dev_config)
invalid_dev_config.update(wrong_config_dict)
with pytest.raises(ValueError):
RTO1024(com_config, invalid_dev_config)
RTO1024Config(**invalid_dev_config)
assert dev.config.spoll_start_delay == dev_config_dict['spoll_start_delay']
invalid_dev_config = dict(dev_config_dict)
invalid_dev_config['spoll_start_delay'] = -1
with pytest.raises(ValueError):
RTO1024(com_config, invalid_dev_config)
assert dev.config.command_timeout_seconds == dev_config_dict[
'command_timeout_seconds']
def test_instantiation(com_config, dev_config):
dev = RTO1024(com_config, dev_config)
assert dev is not None
invalid_dev_config = dict(dev_config_dict)
invalid_dev_config['command_timeout_seconds'] = 0
with pytest.raises(ValueError):
RTO1024(com_config, invalid_dev_config)
copy_config = dict(com_config)
copy_config.pop("interface_type")
dev = RTO1024(copy_config, dev_config)
assert dev is not None
def test_local_display(testdev: RTO1024):
testdev.local_display(True)
assert testdev.com.get_written() == 'SYST:DISP:UPD ON'
assert testdev.com.get_written() == "SYST:DISP:UPD ON"
testdev.local_display(False)
assert testdev.com.get_written() == 'SYST:DISP:UPD OFF'
assert testdev.com.get_written() == "SYST:DISP:UPD OFF"
def test_set_acquire_length(testdev: RTO1024):
testdev.set_acquire_length(10)
assert testdev.com.get_written() == 'TIMebase:RANGe 10'
assert testdev.com.get_written() == "TIMebase:RANGe 10"
def test_set_reference_point(testdev: RTO1024):
testdev.set_reference_point(50)
assert testdev.com.get_written() == 'TIMebase:REFerence 50'
assert testdev.com.get_written() == "TIMebase:REFerence 50"
def test_set_repetitions(testdev: RTO1024):
testdev.set_repetitions(10)
assert testdev.com.get_written() == 'ACQuire:COUNt 10'
assert testdev.com.get_written() == "ACQuire:COUNt 10"
def test_set_channel_state(testdev: RTO1024):
testdev.set_channel_state(2, True)
assert testdev.com.get_written() == 'CHANnel2:STATe ON'
assert testdev.com.get_written() == "CHANnel2:STATe ON"
testdev.set_channel_state(3, False)
assert testdev.com.get_written() == 'CHANnel3:STATe OFF'
assert testdev.com.get_written() == "CHANnel3:STATe OFF"
def test_set_channel_scale(testdev: RTO1024):
testdev.set_channel_scale(1, 1e-3)
assert testdev.com.get_written() == 'CHANnel1:SCALe 0.001'
assert testdev.com.get_written() == "CHANnel1:SCALe 0.001"
def test_set_channel_range(testdev: RTO1024):
testdev.set_channel_range(2, 1.5)
assert testdev.com.get_written() == 'CHANnel2:RANGe 1.500'
assert testdev.com.get_written() == "CHANnel2:RANGe 1.500"
def test_set_channel_position(testdev: RTO1024):
testdev.set_channel_position(4, 4)
assert testdev.com.get_written() == 'CHANnel4:POSition 4.00'
assert testdev.com.get_written() == "CHANnel4:POSition 4.00"
def test_set_trigger_source(testdev: RTO1024):
testdev.set_trigger_source(1) # to channel 1
assert testdev.com.get_written() == 'TRIGger1:SOURce CHAN1'
assert testdev.com.get_written() == "TRIGger1:SOURce CHAN1"
testdev.set_trigger_source(3, 2) # to channel 3, B-Event
assert testdev.com.get_written() == 'TRIGger2:SOURce CHAN3'
assert testdev.com.get_written() == "TRIGger2:SOURce CHAN3"
def test_set_trigger_level(testdev: RTO1024):
testdev.set_trigger_level(1, 2e-3)
assert testdev.com.get_written() == 'TRIGger1:LEVel1 0.002'
assert testdev.com.get_written() == "TRIGger1:LEVel1 0.002"
def test_set_trigger_mode(testdev: RTO1024):
testdev.set_trigger_mode(RTO1024.TriggerModes.AUTO)
assert testdev.com.get_written() == 'TRIGger1:MODE AUTO'
assert testdev.com.get_written() == "TRIGger1:MODE AUTO"
testdev.set_trigger_mode(RTO1024.TriggerModes.NORMAL)
assert testdev.com.get_written() == 'TRIGger1:MODE NORMAL'
assert testdev.com.get_written() == "TRIGger1:MODE NORMAL"
testdev.set_trigger_mode(RTO1024.TriggerModes.FREERUN)
assert testdev.com.get_written() == 'TRIGger1:MODE FREERUN'
assert testdev.com.get_written() == "TRIGger1:MODE FREERUN"
testdev.set_trigger_mode('Auto')
assert testdev.com.get_written() == 'TRIGger1:MODE AUTO'
testdev.set_trigger_mode("Auto")
assert testdev.com.get_written() == "TRIGger1:MODE AUTO"
testdev.set_trigger_mode('normal')
assert testdev.com.get_written() == 'TRIGger1:MODE NORMAL'
testdev.set_trigger_mode("normal")
assert testdev.com.get_written() == "TRIGger1:MODE NORMAL"
testdev.set_trigger_mode('FREERUN')
assert testdev.com.get_written() == 'TRIGger1:MODE FREERUN'
testdev.set_trigger_mode("FREERUN")
assert testdev.com.get_written() == "TRIGger1:MODE FREERUN"
with pytest.raises(RTO1024Error):
testdev.set_trigger_mode('myunknownmode')
testdev.set_trigger_mode("myunknownmode")
assert testdev.com.get_written() is None
def test_file_copy(testdev: RTO1024):
testdev.com.stb = 32
testdev.file_copy('C:\\Data\\test.txt', 'D:\\test.txt')
testdev.file_copy("C:\\Data\\test.txt", "D:\\test.txt")
testdev.com.stb = 0
assert testdev.com.get_written() == '*CLS'
assert (testdev.com.get_written() ==
"MMEMory:COPY 'C:\\Data\\test.txt', 'D:\\test.txt'")
assert testdev.com.get_written() == '*OPC'
assert testdev.com.get_written() == "*CLS"
assert (
testdev.com.get_written() == "MMEMory:COPY 'C:\\Data\\test.txt', 'D:\\test.txt'"
)
assert testdev.com.get_written() == "*OPC"
# test timeout
with pytest.raises(RTO1024Error):
testdev.file_copy('C:\\Data\\test.txt', 'D:\\test.txt')
testdev.file_copy("C:\\Data\\test.txt", "D:\\test.txt")
def test_backup_waveform(testdev: RTO1024):
testdev.com.stb = 32
testdev.backup_waveform('test')
assert testdev.com.get_written() == '*CLS'
testdev.backup_waveform("test")
assert testdev.com.get_written() == "*CLS"
assert (
testdev.com.get_written() ==
"MMEMory:COPY 'C:\\Data\\DavidGraber\\02_waveforms\\test.Wfm.bin', "
testdev.com.get_written()
== "MMEMory:COPY 'C:\\Data\\DavidGraber\\02_waveforms\\test.Wfm.bin', "
"'D:\\backups\\test.Wfm.bin'"
)
assert testdev.com.get_written() == '*OPC'
assert testdev.com.get_written() == '*CLS'
assert testdev.com.get_written() == "*OPC"
assert testdev.com.get_written() == "*CLS"
assert (
testdev.com.get_written() ==
"MMEMory:COPY 'C:\\Data\\DavidGraber\\02_waveforms\\test.bin', "
testdev.com.get_written()
== "MMEMory:COPY 'C:\\Data\\DavidGraber\\02_waveforms\\test.bin', "
"'D:\\backups\\test.bin'"
)
assert testdev.com.get_written() == '*OPC'
assert testdev.com.get_written() == "*OPC"
testdev.com.stb = 0
......@@ -201,14 +217,14 @@ def test_list_directory(testdev: RTO1024):
testdev.com.put_name(
"MMEMory:CATalog? 'C:/Data'",
'142422792,147771314176,".,DIR,0","..,DIR,0","DavidGraber,DIR,0","Myriam,DIR,'
'0","test.Wfm.bin,BIN,142422792"'
'0","test.Wfm.bin,BIN,142422792"',
)
dir_list = testdev.list_directory('C:/Data')
dir_list = testdev.list_directory("C:/Data")
expected = [
['DavidGraber', 'DIR', '0'],
['Myriam', 'DIR', '0'],
['test.Wfm.bin', 'BIN', '142422792']
["DavidGraber", "DIR", "0"],
["Myriam", "DIR", "0"],
["test.Wfm.bin", "BIN", "142422792"],
]
assert dir_list == expected
......@@ -218,58 +234,56 @@ def test_save_waveform_history(testdev: RTO1024):
testdev.com.put_name(
"MMEMory:CATalog? '{}'".format(testdev.config.waveforms_path),
'142422792,147771314176,".,DIR,0","..,DIR,0","DavidGraber,DIR,0","Myriam,DIR,'
'0","test.Wfm.bin,BIN,142422792","test.bin,BIN,1234"'
'0","test.Wfm.bin,BIN,142422792","test.bin,BIN,1234"',
)
testdev.com.stb = 32
testdev.save_waveform_history('test', 1)
testdev.save_waveform_history("test", 1)
testdev.com.stb = 0
assert (testdev.com.get_written() ==
'EXPort:WAVeform:FASTexport ON')
assert testdev.com.get_written() == 'CHAN:HIST ON'
assert testdev.com.get_written() == 'SYST:DISP:UPD OFF'
assert (testdev.com.get_written() ==
'EXPort:WAVeform:MULTichannel OFF')
assert (testdev.com.get_written() ==
'EXPort:WAVeform:SOURce C1W1')
assert (testdev.com.get_written() ==
"EXPort:WAVeform:NAME 'C:\\Data\\DavidGraber\\02_waveforms\\test.bin'")
assert testdev.com.get_written() == 'EXPort:WAVeform:DLOGging ON'
assert testdev.com.get_written() == '*CLS'
assert testdev.com.get_written() == 'CHANnel:HISTory:PLAY'
assert testdev.com.get_written() == '*OPC'
assert (testdev.com.get_written() ==
'EXPort:WAVeform:FASTexport OFF')
assert testdev.com.get_written() == 'SYST:DISP:UPD ON'
assert testdev.com.get_written() == "EXPort:WAVeform:FASTexport ON"
assert testdev.com.get_written() == "CHAN:HIST ON"
assert testdev.com.get_written() == "SYST:DISP:UPD OFF"
assert testdev.com.get_written() == "EXPort:WAVeform:MULTichannel OFF"
assert testdev.com.get_written() == "EXPort:WAVeform:SOURce C1W1"
assert (
testdev.com.get_written()
== "EXPort:WAVeform:NAME 'C:\\Data\\DavidGraber\\02_waveforms\\test.bin'"
)
assert testdev.com.get_written() == "EXPort:WAVeform:DLOGging ON"
assert testdev.com.get_written() == "*CLS"
assert testdev.com.get_written() == "CHANnel:HISTory:PLAY"
assert testdev.com.get_written() == "*OPC"
assert testdev.com.get_written() == "EXPort:WAVeform:FASTexport OFF"
assert testdev.com.get_written() == "SYST:DISP:UPD ON"
# with timeout
with pytest.raises(RTO1024Error):
testdev.save_waveform_history('test', 1)
testdev.save_waveform_history("test", 1)
# with "file not found"
testdev.com.put_name(
"MMEMory:CATalog? '{}'".format(testdev.config.waveforms_path),
'142422792,147771314176,".,DIR,0","..,DIR,0","DavidGraber,DIR,0","Myriam,DIR,'
'0"'
'0"',
)
testdev.com.stb = 32
with pytest.raises(RTO1024Error):
testdev.save_waveform_history('test', 1)
testdev.save_waveform_history("test", 1)
testdev.com.stb = 0
def test_run_continuous_acquisition(testdev: RTO1024):
testdev.run_continuous_acquisition()
assert testdev.com.get_written() == 'RUN'
assert testdev.com.get_written() == "RUN"
def test_run_single_acquisition(testdev: RTO1024):
testdev.run_single_acquisition()
assert testdev.com.get_written() == 'SINGle'
assert testdev.com.get_written() == "SINGle"
def test_stop_acquisition(testdev: RTO1024):
testdev.stop_acquisition()
assert testdev.com.get_written() == 'STOP'
assert testdev.com.get_written() == "STOP"
def test_prepare_ultra_segmentation(testdev: RTO1024):
......@@ -277,37 +291,37 @@ def test_prepare_ultra_segmentation(testdev: RTO1024):
def test_save_configuration(testdev: RTO1024):
testdev.save_configuration('test_configuration')
assert testdev.com.get_written() == '*SAV 49'
testdev.save_configuration("test_configuration")
assert testdev.com.get_written() == "*SAV 49"
assert (
testdev.com.get_written() ==
"MMEMory:STOR:STAT 49,'C:\\Data\\DavidGraber\\01_settings\\test_configuration"
testdev.com.get_written()
== "MMEMory:STOR:STAT 49,'C:\\Data\\DavidGraber\\01_settings\\test_configuration"
".dfl'"
)
def test_load_configuration(testdev: RTO1024):
testdev.load_configuration('test_configuration')
testdev.load_configuration("test_configuration")
assert (
testdev.com.get_written() ==
"MMEMory:LOAD:STAT 41,'C:\\Data\\DavidGraber\\01_settings\\test_configuration"
testdev.com.get_written()
== "MMEMory:LOAD:STAT 41,'C:\\Data\\DavidGraber\\01_settings\\test_configuration"
".dfl'"
)
assert testdev.com.get_written() == '*RCL 41'
assert testdev.com.get_written() == "*RCL 41"
def test_get_timestamps(testdev: RTO1024):
testdev.com.put_name('ACQuire:AVAilable?', '3')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '-1.24')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '-0.87')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '0')
testdev.com.put_name("ACQuire:AVAilable?", "3")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "-1.24")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "-0.87")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "0")
ts = testdev.get_timestamps()
assert ts == [-1.24, -0.87, 0]
# with read errors
testdev.com.put_name('ACQuire:AVAilable?', '3')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '-1.24')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '-1.24')
testdev.com.put_name('CHANnel:WAVeform:HISTory:TSRelative?', '0')
testdev.com.put_name("ACQuire:AVAilable?", "3")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "-1.24")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "-1.24")
testdev.com.put_name("CHANnel:WAVeform:HISTory:TSRelative?", "0")
with pytest.raises(RTO1024Error):
testdev.get_timestamps()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment