import logging
from time import sleep

import numpy as np
from pymeasure.experiment import Procedure, IntegerParameter, FloatParameter

from ..classes.stage import Stage
from ..classes.dac import DAC

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())


def _sweep_positions(x_min, x_max, x_step):
    """Return evenly spaced x positions from ``x_min`` to ``x_max`` inclusive.

    The number of intervals is the span divided by the step, rounded to the
    nearest integer. Rounding (instead of floor division) avoids losing a
    point to floating-point error: ``5 // 0.1`` evaluates to ``49.0`` even
    though the intended number of intervals is 50.

    Because both end points are always included, the actual spacing differs
    from ``x_step`` when the span is not a multiple of the step.

    Raises:
        ValueError: if ``x_step`` is zero.
    """
    if x_step == 0:
        raise ValueError("x_step must be non-zero")
    n_intervals = int(round(abs(x_max - x_min) / abs(x_step)))
    return np.linspace(x_min, x_max, n_intervals + 1, endpoint=True)


class X_Sweep(Procedure):
    """Sweep the stage along x at a fixed y and record the mean voltage.

    For every x position the analog input is read repeatedly at ``freq``
    for ``acq_time`` seconds and the mean of the readings is emitted as one
    result row. The ``field`` value is written to the analog output before
    the sweep and the output is set back to 0 when the sweep ends.
    """

    name = "X-Sweep"

    # NOTE: parameter names are written into result-file headers, so their
    # spelling is kept unchanged for compatibility with existing files.
    acq_time = FloatParameter('Aquisition time', units='s', default=1)
    freq = FloatParameter('Aquisition Frequency', units='Hz', default=50)
    x_min = FloatParameter('From x', units='mm', default=0)
    x_max = FloatParameter('To x', units='mm', default=5)
    x_step = FloatParameter('Step', units='mm', default=0.1)
    y = FloatParameter('Position y', units='mm', default=0)
    field = FloatParameter('Magnetic field', units='V', default=0)

    DATA_COLUMNS = ['Iteration', 'X Position', 'Y Position', 'Voltage',
                    'Magnetic Field [A]', 'Magnetic Field [mT]']

    def startup(self):
        """Log that the acquisition is being set up."""
        log.info("Setting up the data aquisition")

    def execute(self):
        """Run the sweep and emit one result row per x position.

        Raises:
            ValueError: if ``freq`` is not positive or ``x_step`` is zero.
            Exception: whatever ``Stage()`` / ``enable_connections()``
                raised when the stage could not be connected.
        """
        log.info("Starting the voltage acquisition")

        # Validate before touching any hardware.
        if self.freq <= 0:
            raise ValueError("freq must be positive")
        x_positions = _sweep_positions(self.x_min, self.x_max, self.x_step)
        n_positions = len(x_positions)

        voltage_input = DAC("ai0")
        field_output = DAC("ao1")

        field_output.write(self.field)
        stage = None
        try:
            sleep(1)
            stage = self._connect_stage()

            stage.move_y_to(self.y)
            stage.move_x_to(self.x_min)
            stage.wait_stable()

            # Start sweeping loop
            for i, x_position in enumerate(x_positions):
                stage.move_x_to(x_position)

                data = {
                    'Iteration': i,
                    'X Position': x_position,
                    'Y Position': self.y,
                    'Voltage': self._read_mean_voltage(voltage_input),
                    # NOTE(review): no V -> A / V -> mT conversion is visible
                    # in this file, so these declared columns are filled with
                    # NaN placeholders rather than left out of the record.
                    'Magnetic Field [A]': np.nan,
                    'Magnetic Field [mT]': np.nan,
                }

                log.debug("Produced numbers: %s", data)
                self.emit('results', data)
                # Count the point just finished so the last one reports 100.
                self.emit('progress', 100 * (i + 1) / n_positions)
                if self.should_stop():
                    break
        finally:
            # Always release the hardware, even when the sweep fails midway.
            field_output.write(0.)
            if stage is not None:
                stage.close_connections()

    def shutdown(self):
        """Log that the acquisition has finished."""
        log.info("Finished acquisition")

    def _connect_stage(self):
        """Create the stage and enable its connections, or log and re-raise."""
        try:
            stage = Stage()
            stage.enable_connections()
        except Exception:
            log.exception("Close connection with motors before running an experiment, motors can not be used")
            raise
        return stage

    def _read_mean_voltage(self, voltage_input):
        """Read the input at ``freq`` for ``acq_time`` s and return the mean."""
        # At least one sample, so the mean is never taken over an empty array.
        n_samples = max(1, int(round(self.acq_time * self.freq)))
        sample_period = 1 / self.freq
        samples = np.empty(n_samples, dtype='float64')
        for k in range(n_samples):
            samples[k] = voltage_input.read()
            sleep(sample_period)
        return np.mean(samples)