import sys from time import sleep import nidaqmx from datetime import datetime from pylablib.devices import Thorlabs from pymeasure.experiment import Procedure, IntegerParameter, unique_filename, FloatParameter, Parameter from pymeasure.experiment import Results from pymeasure.display.Qt import QtWidgets from pymeasure.display.windows import ManagedWindow import logging from src.classes.controltab import ControlTab from src.classes.stage import Stage log = logging.getLogger('') log.addHandler(logging.NullHandler()) class MokeProcedure(Procedure): acq_time = FloatParameter('Aquisition time', units='s', default=5) time_step = FloatParameter('Time Step', units='ms', default=200) x_min = FloatParameter('X min', units='mm') x_max = FloatParameter('X max', units='mm') x_steps = IntegerParameter('X steps', default=10) y = FloatParameter('Y', units='mm') field = FloatParameter('Magnetic field', units='V') DATA_COLUMNS = ['X Position', 'Voltage'] def startup(self): log.info("Setting up the data aquisition") def execute(self): log.info("Starting the voltage acquisition") try: nidaqmx.Task() except: raise RuntimeError("Failed communicating with the DAC, aborting task") x_step_size = (self.x_max - self.x_min) / self.x_steps with nidaqmx.Task() as task_field: task_field.ao_channels.add_ao_voltage_chan("Dev1/ao1") task_field.write(self.field, timeout=10) sleep(1) # control_tab.stage.move_y_to(self.y) # Start sweeping loop for i in range(self.x_steps + 1): x = self.x_min + i * x_step_size # control_tab.stage.move_x_to(x) with nidaqmx.Task() as task: task.ai_channels.add_ai_voltage_chan("Dev1/ai0") data = { 'X Position': x, 'Voltage': task.read() } log.debug("Produced numbers: %s" % data) self.emit('results', data) self.emit('progress', 100 * i / self.iterations) sleep(self.delay / 1000.) if self.should_stop(): log.warning("Catch stop command in procedure") break task_field.write(0., timeout=10) task_field.stop() def shutdown(self): log.info("Finished acquisition") class MainWindow(ManagedWindow): def __init__(self): super().__init__( procedure_class=MokeProcedure, inputs=['acq_time', 'time_step', 'x_min', 'x_max', 'x_steps', 'y', 'field'], displays=['acq_time', 'time_step', 'x_min', 'x_max', 'x_steps', 'y', 'field'], x_axis='X Position', y_axis='Voltage', sequencer=True, widget_list=tuple([ControlTab("Manual control")]), directory_input=True ) self.setWindowTitle('Mini MOKE') self.directory = 'data' for device in Thorlabs.list_kinesis_devices(): log.info(f"Thorlab device connected: {device[1]} with serial number: {device[0]}") def queue(self, procedure=None): direc = self.directory + '/' + datetime.now().strftime('%Y-%m') file = unique_filename(direc, 'miniMoke') print(self.inputs) if procedure is None: procedure = self.make_procedure() results = Results(procedure, file) experiment = self.new_experiment(results) self.manager.queue(experiment) if __name__ == "__main__": app = QtWidgets.QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec())