Source code for silq.parameters.measurement_parameters

import numpy as np
import logging
from collections import Iterable
import numbers
from scipy.interpolate import interp1d

import qcodes as qc
from qcodes import Instrument
from qcodes.loops import active_dataset, Loop, BreakIf
from qcodes.data import hdf5_format
from qcodes.instrument.parameter import MultiParameter

from silq import config
from silq.tools.general_tools import SettingsClass, clear_single_settings, \
    attribute_from_config, convert_setpoints, property_ignore_setter
from silq.tools.parameter_tools import create_set_vals
from silq.measurements.measurement_types import Loop0DMeasurement, \
    Loop1DMeasurement, Loop2DMeasurement, ConditionSet, TruthCondition
from silq.measurements.measurement_modules import MeasurementSequence
from silq.parameters import DCParameter, CombinedParameter

__all__ = ['MeasurementParameter', 'RetuneBlipsParameter',
           'CoulombPeakParameter', 'DCMultisweepParameter',
           'MeasurementSequenceParameter', 'SelectFrequencyParameter',
           'TrackPeakParameter']

logger = logging.getLogger(__name__)

properties_config = config.get('properties', {})
parameter_config = config.properties.get('parameters', {})
measurement_config = config.get('measurements', {})


[docs]class MeasurementParameter(SettingsClass, MultiParameter): """Base class for parameters that perform measurements. A `MeasurementParameter` usually consists of several acquisitions, which it uses for complex sequences. A `MeasurementParameter` usually uses a ``qcodes.Loop`` or a ``qcodes.Measure`` or several in succession. The results in the ``DataSet`` are analysed and often some post-action is performed. Example: An example of a `MeasurementParameter` is a retuning sequence, which uses an `AcquisitionParameter`, and from that determines how much voltages have to be modified to retune the system (e.g. `RetuneBlipsParameter`). Note: The MeasurementParameter needs to be updated. It was originally created to be used with the `MeasurementSequence`, but this class turned out to be too rigid. Instead, measurements should be programmed by subclassing the MeasurementParameter. Args: Name: Parameter name acquisition_parameter: Acquisition_parameter to use. Fails in case of multiple or no acquisition parameters discriminant: data array in dataset to discriminate. Fails if there is no single discriminant. Parameters: silent (str): Print results during .get() Todo: * Clean up MeasurementParameter, remove attributes ``MeasurementParameter.discriminant`` and ``MeasurementParameter.acquisition_parameter``. """ layout = None def __init__(self, name, acquisition_parameter=None, discriminant=None, silent=True, **kwargs): SettingsClass.__init__(self) MultiParameter.__init__(self, name, snapshot_value=False, **kwargs) if self.layout is None: try: MeasurementParameter.layout = Instrument.find_instrument( 'layout') except KeyError: logger.warning(f'No layout found for {self}') self.discriminant = discriminant self.silent = silent self.acquisition_parameter = acquisition_parameter self._meta_attrs.extend(['acquisition_parameter_name']) def __repr__(self): return f'{self.name} measurement parameter' def __getattribute__(self, item): try: return super().__getattribute__(item) except AttributeError: return attribute_from_config(item, config.properties) @property def loc_provider(self): if self.base_folder is None: fmt = '{date}/#{counter}_{name}_{time}' else: fmt = self.base_folder + '/#{counter}_{name}_{time}' return qc.data.location.FormatLocation(fmt=fmt) @property def acquisition_parameter_name(self): return self.acquisition_parameter.name @property def base_folder(self): """ Obtain measurement base folder (if any). Returns: If in a measurement, the base folder is the relative path of the data folder. Otherwise None """ active_dataset = active_dataset() if active_dataset is None: return None elif getattr(active_dataset, 'location', None): return active_dataset.location elif hasattr(active_dataset, '_location'): return active_dataset._location @property def discriminant(self): if self._discriminant is not None: return self._discriminant else: return self.acquisition_parameter.name @discriminant.setter def discriminant(self, val): self._discriminant = val @property def discriminant_idx(self): if self.acquisition_parameter.name is not None: return self.acquisition_parameter.names.index(self.discriminant) else: return None
[docs] def print_results(self): if getattr(self, 'names', None) is not None: for name, result in self.results.items(): if isinstance(result, numbers.Number): print(f'{name}: {result:.3f}') else: print(f'{name}: {result}') elif hasattr(self, 'results'): print(f'{self.name}: {self.results[self.name]:.3f}')
[docs]class RetuneBlipsParameter(MeasurementParameter): """Parameter that retunes by analysing blips using a neural network The first (optional) stage is to use a CoulombPeakParameter to find the center of the Coulomb peak. Second, a sweep parameter is varied for a range of sweep values. For each sweep point, a trace is acquired, and its blips measured in a BlipsParameter. This information is then analysed by a neural network, from which the optimal tuning position is predicted. The Neural network is a Keras model that needs to be pre-trained with data. More info: Experiments/personal/Serwan/Neural networks/Retune blips.ipynb The following Neural Network seems to produce decent results: >>> model = Sequential() >>> model.add(Dense(3, activation='linear', input_shape=(21,3))) >>> model.add(Flatten()) >>> model.add(Dense(1, activation='linear')) """ def __init__(self, name='retune_blips', coulomb_peak_parameter=None, blips_parameter=None, sweep_parameter=None, sweep_vals=None, tune_to_coulomb_peak=True, tune_to_optimum=True, optimum_DC_offset=None, model_filepath=None, voltage_limit=None, optimum_method='neural_network', **kwargs): """ Args: name: name of parameter (default `retune_blips`) coulomb_peak_parameter: CoulombPeakParameter that tunes to the center of the Coulomb peak. Should have all its settings predefined, including sweep_range, and DC_offset blips_parameter: BlipsParameter that measures blips properties from a trace, to be analysed by a neural network. Should have all its settings predefined, including duration sweep_parameter: sweep CombinedParameter with scales defined to remain compensated on the Coulomb peak. The offsets must be set such that zero approximately corresponds to the tuning position. sweep_vals: Range of sweep values to sweep the sweep_parameter and measure blips_parameter. It is important that both the number of sweep values and distance between sweep points is the same as in the training data. tune_to_coulomb_peak: Whether to use the coulomb_peak_parameter. Otherwise, it will always assume the system remains on the Coulomb peak tune_to_optimum: Tune to optimum values predicted by neural network. If True, the offsets of the sweep parameter are also zeroed. optimum_DC_offset: Additional offset to optimum determined from neural network model. Used if the model is not trained properly and has a constant offset in predictions. If single value, combined parameter will be offset. If tuple, offsets are per parameter in combined parameter. model_filepath: Filepath of Keras neural network (.h5 extension). voltage_limit: Maximum voltage difference from initial offsets to avoid the system . The first time this parameter is called, the sweep parameter's initial offsets are stored. If one of the optimal values is more than voltage_limit away from its initial offset, a warning is raised and the gates are returned to the initial offsets. **kwargs: kwargs passed to MeasurementParameter """ # Load model here because it takes quite a while to load from keras.models import load_model import tensorflow as tf super().__init__(name=name, names=['optimal_vals', 'offsets'], units=['V', 'V'], shapes=((), ()), **kwargs) self.sweep_parameter = sweep_parameter self.coulomb_peak_parameter = coulomb_peak_parameter self.blips_parameter = blips_parameter self.sweep_vals = sweep_vals self.tune_to_coulomb_peak = tune_to_coulomb_peak self.tune_to_optimum = tune_to_optimum self.optimum_DC_offset = optimum_DC_offset self.initial_offsets = None self.voltage_limit = voltage_limit assert optimum_method in ['neural_network', 'max_blips'] self.optimum_method = optimum_method self.model_filepath = model_filepath if model_filepath is not None: self.model = load_model(self.model_filepath) self.model._make_predict_function() self.graph = tf.get_default_graph() else: logger.warning(f'No neural network model loaded for {self}') self.model = None self.continuous = True self.results = {} # Tools to only execute every nth call self.every_nth = None self.idx = 0 self._meta_attrs.extend(['sweep_vals', 'tune_to_coulomb_peak', 'tune_to_optimum', 'initial_offsets', 'voltage_limit', 'every_nth']) @property_ignore_setter def shapes(self): if isinstance(self.sweep_parameter, CombinedParameter): shape = (len(self.sweep_parameter.parameters),) else: shape = () return (shape,) * len(self.names)
[docs] def create_loop(self): """ Create loop that sweep sweep_parameter over sweep_vals and measures blips_parameter at each sweep point. Returns: loop """ loop = Loop(self.sweep_parameter[self.sweep_vals]).each( self.blips_parameter) return loop
[docs] def calculate_optimum(self): """ Calculate optimum of dataset from neural network Returns: optimal voltage of combined set parameter """ blips_per_second = self.data.blips_per_second.ndarray blips_per_second = np.nan_to_num(blips_per_second) mean_low_blip_duration = self.data.mean_low_blip_duration.ndarray mean_low_blip_duration = np.nan_to_num(mean_low_blip_duration) mean_high_blip_duration = self.data.mean_high_blip_duration.ndarray mean_high_blip_duration = np.nan_to_num(mean_high_blip_duration) if self.optimum_method == 'max_blips': if not np.nanmax(blips_per_second): return None else: max_idx = np.nanargmax(blips_per_second) return self.sweep_vals[max_idx] if self.model is None: logger.warning('No Neural network model provided. skipping retune') return None if len(blips_per_second) != 21: raise RuntimeError( f'Must have 21 sweep vals, not {len(blips_per_second)}') data = np.zeros((len(blips_per_second), 3)) # normalize data # Blips per second gets a gaussian normalization if np.std(blips_per_second): data[:, 0] = (blips_per_second - np.mean(blips_per_second)) / np.std( blips_per_second) else: return None # blip durations get a logarithmic normalization, since the region # of interest has a low value log_offset = 1e-3 # add offset since otherwise log(0) raises an error data[:, 1] = np.log10(mean_low_blip_duration + log_offset) data[:, 2] = np.log10(mean_high_blip_duration + log_offset) # Center logarithmic blip durations around zero for k in range(1, 3): data[:, k] += 1.5 data = np.expand_dims(data, 0) # Predict optimum value try: with self.graph.as_default(): self.neural_network_results = self.model.predict(data)[0, 0] except Exception as e: import traceback logger.error(traceback.print_exc()) return # Scale results # Neural network output is between -1 (sweep_vals[0]) and +1 (sweep_vals[-1]) scale_factor = (self.sweep_vals[-1] - self.sweep_vals[0]) / 2 self.optimal_val = self.neural_network_results * scale_factor return self.optimal_val
[docs] @clear_single_settings def get_raw(self): assert self.model_filepath is not None, "Must provide model filepath" self.idx += 1 if (self.every_nth is not None and (self.idx - 1) % self.every_nth != 0 and self.results is not None): logger.debug(f'skipping iteration {self.idx} % {self.every_nth}') # Skip this iteration, return old results return [self.results[name] for name in self.names] initial_set_val = self.sweep_parameter() initial_offsets = self.sweep_parameter.offsets if self.initial_offsets is None: # Set initial offsets to ensure tuning does not reach out of bounds self.initial_offsets = initial_offsets # Get Coulomb peak if self.tune_to_coulomb_peak: self.coulomb_peak_parameter() self.loop = self.create_loop() self.data = self.loop.get_data_set(name='count_blips', location=self.loc_provider) try: self.loop.run(set_active=False, quiet=True) finally: self.sweep_parameter(initial_set_val) optimum = self.calculate_optimum() if optimum is None: tune_to_optimum = False optimal_vals = initial_offsets offsets = [0] * len(initial_offsets) else: optimal_vals = self.sweep_parameter.calculate_individual_values(optimum) offsets = [offset - initial_offset for offset, initial_offset in zip(optimal_vals, initial_offsets)] if not self.tune_to_optimum: tune_to_optimum = False elif self.voltage_limit is None: tune_to_optimum = True else: voltage_differences = np.array(self.initial_offsets) - optimal_vals if max(abs(voltage_differences)) < self.voltage_limit: tune_to_optimum = True else: logging.warning(f'tune voltage {optimal_vals} outside ' f'range, tuning back to initial value') # self.sweep_parameter.offsets = self.initial_offsets # self.sweep_parameter(0) tune_to_optimum = False if tune_to_optimum: self.sweep_parameter(optimum) self.sweep_parameter.zero_offset() if isinstance(self.optimum_DC_offset, float): self.sweep_parameter(self.optimum_DC_offset) elif self.optimum_DC_offset: for parameter, offset in zip(self.sweep_parameter.parameters, self.optimum_DC_offset): parameter(parameter() + offset) self.results = { 'optimal_vals': optimal_vals, 'offsets': offsets } return [self.results[name] for name in self.names]
[docs]class CoulombPeakParameter(MeasurementParameter): """ Parameter that finds Coulomb peak and can tune to it. Finding the Coulomb peak is done by sweeping a gate and measuring the DC voltage at each point. """ def __init__(self, name='coulomb_peak', sweep_parameter=None, acquisition_parameter=None, combined_set_parameter=None, DC_peak_offset=None, tune_to_peak=True, min_voltage=0.5, interpolate: bool = True, **kwargs): """ Args: name: name of parameter (default coulomb_peak) sweep_parameter: gate parameter to sweep over Coulomb peak acquisition_parameter: parameter that measures DC voltage. If not provided, a default DCParameter is created. combined_set_parameter: CombinedParameter of gate parameters whose scale must be set to remain compensated on the Coulomb peak. Only needed if a DC_peak_offset is provided. Offsets must be set such that DC_peak_offset is relative to zero. DC_peak_offset: Any DC peak offset to set the combined_set_parameter to. Useful if you want to tune away from the transition when performing Coulomb peak scan. tune_to_peak: Tune to peak after scan is complete. If a combined_set_parameter and DC_peak_offset is provided, the offsets are zeroed after the scan. Otherwise, sweep_parameter is set to optimum min_voltage: Minimum voltage that Coulomb peak must have. If not satisfied, measurement has failed, and system is reset to initial value. **kwargs: kwargs passed to MeasurementParameter """ if acquisition_parameter is None: acquisition_parameter = DCParameter() self.sweep_parameter = sweep_parameter self.sweep = {'range': [], 'step_percentage': None, 'num': None} self.min_voltage = min_voltage self.combined_set_parameter = combined_set_parameter self.DC_peak_offset = DC_peak_offset self.tune_to_peak = tune_to_peak self.interpolate = interpolate self.continuous = True self.results = {} super().__init__(name=name, names=['peak_optimum', 'peak_offset', 'max_voltage', 'DC_voltage'], units=['V', 'V', 'V', 'V'], shapes=((), (), (), ()), acquisition_parameter=acquisition_parameter, wrap_set=False, **kwargs) self._meta_attrs += ['min_voltage']
[docs] def calculate_sweep_vals(self): if self.sweep_parameter is None: return None elif self.sweep.get('range', None): return self.sweep_parameter[self.sweep['range']] elif self.sweep['step_percentage'] and self.sweep['num']: return self.sweep_parameter.sweep( step_percentage=self.sweep['step_percentage'], num=self.sweep['num']) else: return None
@property_ignore_setter def names(self): sweep_vals = self.calculate_sweep_vals() if sweep_vals is not None: sweep_parameter = sweep_vals.parameter return [f'{sweep_parameter.name}_optimum', f'{sweep_parameter.name}_offset', 'max_voltage', 'DC_voltage'] else: return ['peak_optimum', 'peak_offset', 'max_voltage', 'DC_voltage'] @property_ignore_setter def shapes(self): sweep_vals = self.calculate_sweep_vals() if sweep_vals is not None: return ((), (), (), (len(sweep_vals),)) else: return ((), (), (), ())
[docs] def create_loop(self, sweep_vals): if sweep_vals is None: raise RuntimeError('Must define self.sweep_vals') loop = Loop(sweep_vals).each(self.acquisition_parameter) return loop
[docs] @clear_single_settings def get_raw(self): if self.DC_peak_offset is not None: if self.combined_set_parameter is None: raise RuntimeError('Must specify combined_set_parameter') # Add DC offset self.combined_set_parameter(self.DC_peak_offset) # Calculate sweep vals initial_set_val = self.sweep_parameter() sweep_vals = self.calculate_sweep_vals() self.loop = self.create_loop(sweep_vals=sweep_vals) self.data = self.loop.get_data_set(name=self.name, location=self.loc_provider) # Perform measurement if ( self.layout.pulse_sequence != self.acquisition_parameter.pulse_sequence or self.layout.samples() != self.acquisition_parameter.samples): self.acquisition_parameter.setup() try: self.loop.run(set_active=False, quiet=True, stop=not self.continuous) except: # Error occurred, reset to initial values and raise error if self.DC_peak_offset is None: self.sweep_parameter(initial_set_val) else: self.combined_set_parameter(0) raise # Analyse measurement results if self.min_voltage is not None and np.max( self.data.DC_voltage) < self.min_voltage: # Could not find coulomb peak self.results[self.names[0]] = np.nan self.results[self.names[1]] = np.nan # Tune back to original position if self.DC_peak_offset is None: self.sweep_parameter(initial_set_val) else: self.combined_set_parameter(0) else: # Found coulomb peak # Perform some smoothing of data self.smoothed_arr = self.data.DC_voltage.smooth(5) if self.interpolate: self.interpolation = interp1d(sweep_vals, self.smoothed_arr, 'cubic') self.interpolation_x_vals = np.linspace(sweep_vals[0], sweep_vals[-1], 100) self.interpolation_y_vals = self.interpolation( self.interpolation_x_vals) max_idx = np.argmax(self.interpolation_y_vals) max_set_val = self.interpolation_x_vals[max_idx] else: max_idx = np.argmax(self.smoothed_arr) max_set_val = sweep_vals[max_idx] self.results[self.names[0]] = max_set_val self.results[self.names[1]] = max_set_val - initial_set_val if self.tune_to_peak: # Update parameter values if self.DC_peak_offset is None: self.sweep_parameter(max_set_val) else: # Update combined_set_parameter offset peak_offset = max_set_val - initial_set_val sweep_parameter_idx = next( index for index, item in enumerate(self.combined_set_parameter.parameters) if item is self.sweep_parameter) self.combined_set_parameter.offsets[ sweep_parameter_idx] += peak_offset self.combined_set_parameter(0) self.results['max_voltage'] = np.max(self.data.DC_voltage) self.results['DC_voltage'] = self.data.DC_voltage return [self.results[name] for name in self.names]
class MeasureFlipNucleusParameter(MeasurementParameter): def __init__(self, name='measure_flip_nucleus', measure_nucleus_parameter=None, flip_nucleus_parameter=None, max_attempts=3, target_state=None, final_measure=False, silent=True, condition=None, **kwargs): self.measure_nucleus_parameter = measure_nucleus_parameter self.flip_nucleus_parameter = flip_nucleus_parameter self.max_attempts = max_attempts self.target_state = target_state self.final_measure = final_measure self.silent = silent self.condition = condition self.results = {} super().__init__(name=name, names=self.names, shapes=self.shapes, wrap_set=False, **kwargs) @property_ignore_setter def names(self): names = ['nuclear_states', 'nucleus_up_proportions', 'nucleus_pulse_sequences', 'nucleus_flip_success'] if self.final_measure: names.append('final state') return names @property_ignore_setter def shapes(self): shapes = [(self.max_attempts,), (self.max_attempts, len(self.measure_nucleus_parameter.frequency_vals)), (self.max_attempts,), ()] if self.final_measure: shapes.append(()) return tuple(shapes) @clear_single_settings def get_raw(self): if self.target_state is None: raise SyntaxError('Must provide MeasureFlipNucleusParameter.target_state') nuclear_states = np.nan * np.ones(self.max_attempts) nucleus_up_proportions = np.nan * np.ones(( self.max_attempts, len(self.measure_nucleus_parameter.frequency_vals))) nucleus_pulse_sequences = np.nan * np.ones(self.max_attempts) nucleus_flip_success = False nucleus_state = np.nan if self.condition is None or self.condition(): for k in range(self.max_attempts): self.measure_nucleus_parameter() results = self.measure_nucleus_parameter.results nucleus_state = nuclear_states[k] = results['nucleus_state'] nucleus_up_proportions[k] = next(val for key, val in results.items() if key.startswith('nucleus_up_proportion')) if not results['found_nucleus_state']: logger.info('Could not determine nucleus state. perhaps in a ' 'state for which ESR frequency is not known') continue if nucleus_state == self.target_state: logger.info(f'Nucleus is in target state {self.target_state}') nucleus_flip_success = True break else: logger.info(f'Flipping nucleus from {nucleus_state} ' f'to {self.target_state}') self.flip_nucleus_parameter(nucleus_state, self.target_state) else: nucleus_flip_success = False if not self.silent: self.print_results() self.results = {'nuclear_states': nuclear_states, 'nucleus_up_proportions': nucleus_up_proportions, 'nucleus_pulse_sequences': nucleus_pulse_sequences, 'nucleus_flip_success': nucleus_flip_success, 'final_state': nucleus_state} return [self.results[name] for name in self.names] def set(self, target_state, **kwargs): self.single_settings(target_state=target_state, **kwargs) return self() class MeasureNucleusParameter(MeasurementParameter): def __init__(self, name='measure_nucleus', discriminant='contrast_ESR', acquisition_parameter=None, frequency_set_parameter=None, frequency_vals=None, frequency_order='descending', threshold=0.5, samples=None, break_if_satisfied=True): super().__init__(name=name, acquisition_parameter=acquisition_parameter, names=['found_nucleus_state', 'nucleus_state', 'max_nucleus_' + discriminant, 'nucleus_' + discriminant, 'average_' + discriminant], discriminant=discriminant, shapes=((), (), (), (), ()), wrap_set=False) self.frequency_set_parameter = frequency_set_parameter self.frequency_vals = frequency_vals self.frequency_order = frequency_order self.threshold = threshold self.break_if_satisfied = break_if_satisfied self.samples = samples self.continuous = False self.silent = True self.results = {} self.loop = None self.data = None @property_ignore_setter def labels(self): return [name.replace('_', ' ').capitalize() for name in self.names] @property_ignore_setter def shapes(self): return ((), (), (), (len(self.frequency_vals), ), ()) @property def frequency_vals(self): if self._frequency_vals is not None: frequency_vals = self._frequency_vals else: frequency_vals = {int(key): val for key, val in config[f'environment:properties.ESR_vals'].items()} return frequency_vals @frequency_vals.setter def frequency_vals(self, vals): if vals is not None and not isinstance(vals, dict): raise SyntaxError('frequency_vals must be None or dict') else: self._frequency_vals = vals @property def frequency_order(self): frequency_vals = self.frequency_vals if self._frequency_order == 'descending': frequency_order = sorted(frequency_vals.keys(), reverse=True) elif self._frequency_order == 'ascending': frequency_order = sorted(frequency_vals.keys(), reverse=False) else: # Frequency order is a list frequency_order = self._frequency_order return frequency_order @frequency_order.setter def frequency_order(self, order): if isinstance(order, str) and order in ['ascending', 'descending']: self._frequency_order = order elif isinstance(order, Iterable): self._frequency_order = list(order) else: raise TypeError('frequency order must either be an iterable, ' 'or `ascending` or `descending`') @property def frequency_vals_sorted(self): frequency_vals = self.frequency_vals return [frequency_vals[key] for key in self.frequency_order] def create_loop(self): def above_threshold(): val = self.acquisition_parameter.results[self.discriminant] is_above_threshold = val > self.threshold logger.debug(f'{self.discriminant} {val} > {self.threshold}: {is_above_threshold}') return is_above_threshold if self.acquisition_parameter is None: raise RuntimeError('Must specify acquisition_parameter') if self.frequency_set_parameter is None: raise RuntimeError('Must specify frequency_set_parameter') actions = [self.acquisition_parameter] if self.break_if_satisfied: actions.append(BreakIf(above_threshold)) loop = Loop( self.frequency_set_parameter[self.frequency_vals_sorted] ).each( *actions ) return loop @clear_single_settings def get_raw(self): self.loop = self.create_loop() self.data = self.loop.get_data_set(name=self.name, location=self.loc_provider) temporary_settings = {"continuous": True, "base_folder": self.data.location, "subfolder": 'traces'} if self.samples is not None: temporary_settings['samples'] = self.samples self.acquisition_parameter.temporary_settings(**temporary_settings) if (self.layout.pulse_sequence != self.acquisition_parameter.pulse_sequence or self.layout.samples() != self.acquisition_parameter.samples): self.acquisition_parameter.setup() self.loop.run(set_active=False, quiet=True, stop=not self.continuous) discriminant_vals = getattr(self.data, self.discriminant).ndarray with np.errstate(invalid='ignore'): if np.nansum(discriminant_vals >= self.threshold) == 1: # Successfully found nucleus state frequency_idx = np.nanargmax(discriminant_vals) self.results['found_nucleus_state'] = True self.results['nucleus_state'] = int( self.frequency_order[frequency_idx]) else: # Could not localize nucleus state self.results['found_nucleus_state'] = False self.results['nucleus_state'] = np.nan self.results['max_nucleus_' + self.discriminant] = np.nanmax(discriminant_vals) self.results['nucleus_' + self.discriminant] = discriminant_vals self.results['average_' + self.discriminant] = np.nanmean(discriminant_vals) if not self.silent: self.print_results() return [self.results[name] for name in self.names] def set(self, **kwargs): # Set single settings return self.single_settings(**kwargs)()
[docs]class DCMultisweepParameter(MeasurementParameter): formatter = None def __init__(self, name, acquisition_parameter, x_gate, y_gate, **kwargs): super().__init__(name=name, names=['DC_voltage'], labels=['DC voltage'], units=['V'], shapes=((1, 1),), setpoint_names=((y_gate.name, x_gate.name),), setpoint_units=(('V', 'V'),), acquisition_parameter=acquisition_parameter) self.x_gate = x_gate self.y_gate = y_gate self.x_range = None self.y_range = None self.AC_range = 0.2 self.pts = 120 self.continuous = False @property def x_sweeps(self): return np.ceil((self.x_range[1] - self.x_range[0]) / self.AC_range) @property def y_sweeps(self): return np.ceil((self.y_range[1] - self.y_range[0]) / self.AC_range) @property def x_sweep_range(self): return (self.x_range[1] - self.x_range[0]) / self.x_sweeps @property def y_sweep_range(self): return (self.y_range[1] - self.y_range[0]) / self.y_sweeps @property def AC_x_vals(self): return np.linspace(-self.x_sweep_range / 2, self.x_sweep_range / 2, self.pts + 1)[:-1] @property def AC_y_vals(self): return np.linspace(-self.y_sweep_range / 2, self.y_sweep_range / 2, self.pts + 1)[:-1] @property def DC_x_vals(self): return np.linspace(self.x_range[0] + self.x_sweep_range / 2, self.x_range[1] - self.x_sweep_range / 2, self.x_sweeps).tolist() @property def DC_y_vals(self): return np.linspace(self.y_range[0] + self.y_sweep_range / 2, self.y_range[1] - self.y_sweep_range / 2, self.y_sweeps).tolist() @property_ignore_setter def setpoints(self): return convert_setpoints(np.linspace(self.y_range[0], self.y_range[1], self.pts * self.y_sweeps), np.linspace(self.x_range[0], self.x_range[1], self.pts * self.x_sweeps)), @property_ignore_setter def shapes(self): return (len(self.DC_y_vals) * self.pts, len(self.DC_x_vals) * self.pts),
[docs] def setup(self): self.acquisition_parameter.sweep_parameters.clear() self.acquisition_parameter.add_sweep(self.x_gate.name, self.AC_x_vals, connection_label=self.x_gate.name, offset_parameter=self.x_gate) self.acquisition_parameter.add_sweep(self.y_gate.name, self.AC_y_vals, connection_label=self.y_gate.name, offset_parameter=self.y_gate) self.acquisition_parameter.setup()
[docs] def get(self): self.loop = qc.Loop(self.y_gate[self.DC_y_vals]).loop( self.x_gate[self.DC_x_vals]).each(self.acquisition_parameter) self.acquisition_parameter.temporary_settings(continuous=True) try: if not self.continuous: self.setup() self.data = self.loop.run(name=f'multi_2D_scan', set_active=False, formatter=self.formatter, save_metadata=False) # except: # logger.debug('except stopping') # self.layout.stop() # self.acquisition_parameter.clear_settings() # raise finally: if not self.continuous: logger.debug('finally stopping') self.layout.stop() self.acquisition_parameter.clear_settings() arr = np.zeros((len(self.DC_y_vals) * self.pts, len(self.DC_x_vals) * self.pts)) for y_idx in range(len(self.DC_y_vals)): for x_idx in range(len(self.DC_x_vals)): DC_data = self.data.DC_voltage[y_idx, x_idx] arr[y_idx * self.pts:(y_idx + 1) * self.pts, x_idx * self.pts:(x_idx + 1) * self.pts] = DC_data return arr,
[docs]class MeasurementSequenceParameter(MeasurementParameter): def __init__(self, name, measurement_sequence=None, set_parameters=None, discriminant=None, start_condition=None, **kwargs): """ Args: name: set_parameters: acquisition_parameter: operations: discriminant: conditions: Must be of one of the following forms {'mode': 'measure'} {'mode': '1D_scan', 'span', 'set_points', 'set_parameter', 'center_val'(optional) **kwargs: """ SettingsClass.__init__(self) self.discriminant = discriminant self.set_parameters = set_parameters self.start_condition = start_condition if isinstance(measurement_sequence, str): # Load sequence from dict load_dict = measurement_config[measurement_sequence] measurement_sequence = MeasurementSequence.load_from_dict(load_dict) self.measurement_sequence = measurement_sequence self.acquisition_parameter = measurement_sequence.acquisition_parameter super().__init__( name=name, names=[name + '_msmts', 'optimal_set_vals', self.discriminant], shapes=((), (len(self.set_parameters),), ()), discriminant=self.discriminant, acquisition_parameter=self.acquisition_parameter, **kwargs) self._meta_attrs.extend(['discriminant'])
[docs] @clear_single_settings def get(self): if self.start_condition is None: start_condition_satisfied = True elif isinstance(self.start_condition, TruthCondition): if self.acquisition_parameter.results is None: logger.info('Start TruthCondition satisfied because ' 'acquisition parameter has no results') start_condition_satisfied = True else: start_condition_satisfied = \ self.start_condition.check_satisfied( self.acquisition_parameter.results)[0] logger.info(f'Start Truth condition {self.start_condition} ' f'satisfied: {start_condition_satisfied}') else: start_condition_satisfied = self.start_condition() logger.info(f'Start condition function {self.start_condition} ' f'satisfied: {start_condition_satisfied}') if not start_condition_satisfied: num_measurements = -1 optimal_set_vals = [p() for p in self.set_parameters] optimal_val = self.measurement_sequence.optimal_val return num_measurements, optimal_set_vals, optimal_val else: self.measurement_sequence.base_folder = self.base_folder result = self.measurement_sequence() num_measurements = self.measurement_sequence.num_measurements logger.info(f"Measurements performed: {num_measurements}") if result['action'] == 'success': # Retrieve dict of {param.name: val} of optimal set vals optimal_set_vals = self.measurement_sequence.optimal_set_vals # Convert dict to list of set vals optimal_set_vals = [optimal_set_vals.get(p.name, p()) for p in self.set_parameters] else: optimal_set_vals = [p() for p in self.set_parameters] optimal_val = self.measurement_sequence.optimal_val return num_measurements, optimal_set_vals, optimal_val
[docs]class SelectFrequencyParameter(MeasurementParameter): def __init__(self, threshold=0.5, discriminant=None, frequencies=None, mode=None, acquisition_parameter=None, update_frequency=True, **kwargs): # Initialize SettingsClass first because its needed for # self.spin_states, self.discriminant etc. SettingsClass.__init__(self) self.mode = mode self._discriminant = discriminant names = ['{}_{}'.format(self.discriminant, spin_state) for spin_state in self.spin_states] names.append('frequency') super().__init__(self, name='select_frequency', label='Select frequency', names=names, discriminant=self.discriminant, **kwargs) self.acquisition_parameter = acquisition_parameter self.update_frequency = update_frequency self.threshold = threshold self.frequencies = frequencies self.frequency = None self.samples = None self.measurement = None self._meta_attrs.extend(['frequencies', 'frequency', 'update_frequency', 'spin_states', 'threshold', 'discriminant']) @property def spin_states(self): spin_states_unsorted = self.frequencies.keys() return sorted(spin_states_unsorted)
[docs] @clear_single_settings def get(self): # Initialize frequency to the current frequency (default in case none # of the measured frequencies satisfy conditions) frequency = self.acquisition_parameter.frequency self.acquisition_parameter.temporary_settings(samples=self.samples) frequencies = [self.frequencies[spin_state] for spin_state in self.spin_states] self.condition_sets = ConditionSet( (self.discriminant, '>', self.threshold)) # Create Measurement object and perform measurement self.measurement = Loop1DMeasurement( name=self.name, acquisition_parameter=self.acquisition_parameter, set_parameter=self.acquisition_parameter, base_folder=self.base_folder, condition_sets=self.condition_sets) self.measurement(frequencies) self.measurement() self.results = {self.discriminant: getattr(self.measurement.dataset, self.discriminant)} # Determine optimal frequency and update config entry if needed self.condition_result = self.measurement.check_condition_sets() if self.condition_result['is_satsisfied']: frequency = self.measurement.optimal_set_vals[0] if self.update_frequency: properties_config['frequency'] = frequency else: if not self.silent: logger.warning("Could not find frequency with high enough " "contrast") self['frequency'] = frequency self.acquisition_parameter.clear_settings() # Print results if not self.silent: self.print_results() return [self.results[name] for name in self.names]
[docs]class TrackPeakParameter(MeasurementParameter): def __init__(self, name, set_parameter=None, acquisition_parameter=None, step_percentage=None, peak_width=None, points=None, discriminant=None, threshold=None, **kwargs): SettingsClass.__init__(self) self.set_parameter = set_parameter self.acquisition_parameter = acquisition_parameter self._discriminant = discriminant names = ['optimal_set_vals', self.set_parameter.name + '_set', self.discriminant] super().__init__(name=name, names=names, discriminant=self.discriminant, acquisition_parameter=acquisition_parameter, **kwargs) self.step_percentage = step_percentage self.peak_width = peak_width self.points = points self.threshold = threshold self.condition_sets = None self.measurement = None @property def set_vals(self): if self.peak_width is None and \ (self.points is None or self.step_percentage is None): # Retrieve peak_width from parameter config only if above # conditions are satisfied self.peak_width = parameter_config[self.set_parameter]['peak_width'] return create_set_vals(num_parameters=1, step_percentage=self.step_percentage, points=self.points, window=self.peak_width, set_parameters=self.set_parameter) @property def shapes(self): return [(), (len(self.set_vals),), (len(self.set_vals),)]
[docs] @clear_single_settings def get(self): # Create measurement object if self.threshold is not None: # Set condition set self.condition_sets = \ [ConditionSet((self.discriminant, '>', self.threshold))] self.measurement = Loop1DMeasurement( name=self.name, acquisition_parameter=self.acquisition_parameter, set_parameter=self.set_parameter, base_folder=self.base_folder, condition_sets=self.condition_sets, discriminant=self.discriminant, silent=self.silent, update=True) # Set loop values self.measurement(self.set_vals) # Obtain set vals as a list instead of a parameter iterable set_vals = self.set_vals[:] self.measurement() trace = getattr(self.measurement.dataset, self.discriminant) optimal_set_val = self.measurement.optimal_set_vals[ self.set_parameter.name] self.result = [optimal_set_val, set_vals, trace] return self.result