Source code for silq.instrument_interfaces.keysight.Keysight_SD_DIG_interface

from typing import List
import numpy as np

from silq.instrument_interfaces import InstrumentInterface, Channel
from silq.pulses.pulse_types import Pulse, TriggerPulse

from qcodes.utils import validators as vals
from qcodes.instrument_drivers.Keysight.SD_common.SD_acquisition_controller \
    import Triggered_Controller

import logging
logger = logging.getLogger(__name__)


[docs]class Keysight_SD_DIG_Interface(InstrumentInterface): def __init__(self, instrument_name, **kwargs): super().__init__(instrument_name, **kwargs) self.pulse_sequence.allow_untargeted_pulses = True self.pulse_sequence.allow_pulse_overlap = True # Initialize channels self._acquisition_channels = { f'ch{k}': Channel(instrument_name=self.instrument_name(), name=f'ch{k}', id=k, input=True) for k in range(self.instrument.n_channels)} self._pxi_channels = { f'pxi{k}': Channel(instrument_name=self.instrument_name(), name=f'pxi{k}', id=4000 + k, input_trigger=True, output=True, input=True) for k in range(self.instrument.n_triggers)} self._channels = { **self._acquisition_channels , **self._pxi_channels, 'trig_in': Channel(instrument_name=self.instrument_name(), name='trig_in', input=True), } self.add_parameter(name='acquisition_controller', set_cmd=None, docstring='Acquisition controller for acquiring ' 'data with SD digitizer. ' 'Must be acquisition controller object.') self.add_parameter(name='acquisition_channels', initial_value=[], set_cmd=None, vals=vals.Lists(), docstring='Names of acquisition channels ' '[chA, chB, etc.]. Set by the layout') self.add_parameter('sample_rate', vals=vals.Numbers(), set_cmd=None, docstring='Acquisition sampling rate (Hz)') self.add_parameter('samples', vals=vals.Numbers(), set_cmd=None, docstring='Number of times to acquire the pulse ' 'sequence.') self.add_parameter('points_per_trace', get_cmd=lambda: self.acquisition_controller().samples_per_trace(), docstring='Number of points in a trace.') self.add_parameter('channel_selection', set_cmd=None, docstring='Active channel indices to acquire. ' 'Zero-based index (chA -> 0, etc.). ' 'Set during setup and should not be set' 'manually.') self.add_parameter('minimum_timeout_interval', unit='s', vals=vals.Numbers(), initial_value=5, set_cmd=None, docstring='Minimum value for timeout when acquiring ' 'data. If 2.1 * pulse_sequence.duration ' 'is lower than this value, it will be ' 'used instead') self.add_parameter('trigger_in_duration', unit='s', vals=vals.Numbers(), initial_value=15e-6, set_cmd=None, docstring="Duration for a receiving trigger signal. " "This is passed the the interface that is " "sending the triggers to this instrument.") self.add_parameter('capture_full_trace', initial_value=False, vals=vals.Bool(), set_cmd=None, docstring='Capture from t=0 to end of pulse ' 'sequence. False by default, in which ' 'case start and stop times correspond to ' 'min(t_start) and max(t_stop) of all ' 'pulses with the flag acquire=True, ' 'respectively.') # dict of raw unsegmented traces {ch_name: ch_traces} self.traces = {} # Segmented traces per pulse, {pulse_name: {channel_name: {ch_pulse_traces}} self.pulse_traces = {} # Set up the driver to a known default state self.initialize_driver()
[docs] def acquisition(self): """Perform acquisition""" traces = self.acquisition_controller().acquisition() self.stop() self.traces = {ch: ch_traces for ch, ch_traces in zip(self.acquisition_channels(), traces)} # The start of acquisition if self.capture_full_trace(): t0 = 0 else: t0 = min(pulse.t_start for pulse in self.pulse_sequence.get_pulses(acquire=True)) # Split data into pulse traces pulse_traces = {} for pulse in self.pulse_sequence.get_pulses(acquire=True): name = pulse.full_name pulse_traces[name] = {} for k, ch_name in enumerate(self.acquisition_channels()): ch_data = self.traces[ch_name] start_idx = int(round((pulse.t_start - t0) * self.sample_rate())) stop_idx = start_idx + int(round(pulse.duration * self.sample_rate())) # Extract pulse data from the channel data pulse_traces[name][ch_name] = ch_data[:, start_idx:stop_idx] # Further average the pulse data if pulse.average == 'none': pass elif pulse.average == 'trace': pulse_traces[name][ch_name] = np.mean(pulse_traces[name][ch_name], axis=0) elif pulse.average == 'point': pulse_traces[name][ch_name] = np.mean(pulse_traces[name][ch_name]) elif 'point_segment' in pulse.average: segments = int(pulse.average.split(':')[1]) split_arrs = np.array_split(pulse_traces[name][ch_name], segments, axis=1) pulse_traces[name][ch_name] = np.mean(split_arrs, axis=(1,2)) else: raise SyntaxError(f'average mode {pulse.average} not configured') self.pulse_traces = pulse_traces return pulse_traces
[docs] def initialize_driver(self): """ Puts driver into a known initial state. Further configuration will be done in the configure_driver and get_additional_pulses functions. """ self.instrument.channels.impedance('50') # 50 Ohm impedance self.instrument.channels.coupling('DC') # DC Coupled self.instrument.channels.full_scale(3.0) # 3.0 Volts
[docs] def get_additional_pulses(self, connections) -> List[Pulse]: """Additional pulses needed by instrument after targeting of main pulses Args: connections: List of all connections in the layout Returns: List of additional pulses, empty by default. """ if self.input_pulse_sequence.get_pulses(trigger=True): logger.warning('SD digitizer has a manual trigger pulse defined.' 'This should normally be done automatically instead.') return [] elif not self.pulse_sequence.get_pulses(acquire=True): # No pulses need to be acquired return [] else: # A trigger pulse is needed try: trigger_connection = next(connection for connection in connections if connection.trigger or connection.trigger_start) except StopIteration: logger.error('Could not find trigger connection for SD digitizer') connection_requirements = {'input_instrument': self.instrument_name()} if trigger_connection.trigger: connection_requirements['trigger'] = True t_start = min(pulse.t_start for pulse in self.pulse_sequence.get_pulses(acquire=True)) else: # connection.trigger_start or capture full trace connection_requirements['trigger_start'] = True t_start = 0 if self.capture_full_trace(): # Override t_start to capture full trace t_start = 0 return [TriggerPulse(t_start=t_start, duration=self.trigger_in_duration(), connection_requirements=connection_requirements)]
[docs] def setup(self, samples=1, input_connections=(), **kwargs): self.samples(samples) # Select the channels to acquire channel_selection = [int(ch_name[-1]) for ch_name in self.acquisition_channels()] self.channel_selection(channel_selection) self.acquisition_controller().channel_selection(channel_selection) # Pulse averaging is done in the interface, not the controller self.acquisition_controller().average_mode('none') if isinstance(self.acquisition_controller(), Triggered_Controller): self.acquisition_controller().sample_rate(self.sample_rate()) self.acquisition_controller().traces_per_acquisition(self.samples()) # Capture maximum number of samples on all channels if not self.capture_full_trace(): t_start = min(self.pulse_sequence.t_start_list) t_stop = max(self.pulse_sequence.t_stop_list) else: # Capture full trace, even if no pulses have acquire=True # Mainly done so that the full trace can be stored. t_start = 0 t_stop = self.pulse_sequence.duration # !!! Changed np.ceil to np.round !!! samples_per_trace = int(np.round((t_stop - t_start) * self.sample_rate())) samples_per_trace += samples_per_trace % 2 self.acquisition_controller().samples_per_trace(samples_per_trace) # Set read timeout interval # This is the interval for requesting an acquisition. # This allows us to interrupt an acquisition prematurely. timeout_interval = max(2.1 * self.pulse_sequence.duration, self.minimum_timeout_interval()) self.acquisition_controller().timeout_interval(timeout_interval) # An error is raised if no data has been acquired within 64 seconds. self.acquisition_controller().timeout(64.0) # Traces per read should not be longer than the timeout interval traces_per_read = max(timeout_interval // self.pulse_sequence.duration, 1) self.acquisition_controller().traces_per_read(traces_per_read) self.setup_trigger(t_start, input_connections) else: raise RuntimeError('No setup configured for acquisition controller ' f'{self.acquisition_controller()}')
[docs] def setup_trigger(self, t_start, input_connections): # Setup triggering trigger_pulse = self.input_pulse_sequence.get_pulse(trigger=True) if trigger_pulse is None: trigger_pulse = self.input_pulse_sequence.get_pulse(trigger_start=True) assert trigger_pulse is not None, "No trigger pulse found for digitizer" trigger_channel = trigger_pulse.connection.input['channel'].name # Also sets trigger mode, etc. self.acquisition_controller().trigger_channel(trigger_channel) if trigger_channel.startswith('ch'): self.acquisition_controller().analog_trigger_edge('rising') self.acquisition_controller().analog_trigger_threshold( trigger_pulse.amplitude / 2) elif trigger_channel == 'trig_in': self.acquisition_controller().digital_trigger_mode('rising') else: # PXI channel self.acquisition_controller().digital_trigger_mode('rising') if self.input_pulse_sequence.get_pulses(name='Bayes'): bayesian_pulse = self.input_pulse_sequence.get_pulse(name='Bayes') trigger_delay = int(bayesian_pulse.t_start * 2 * self.sample_rate()) trigger_delay_samples = int(round(trigger_delay * self.sample_rate())) elif any(connection.trigger_start for connection in input_connections): trigger_delay_samples = int(round(t_start * self.sample_rate())) else: trigger_delay_samples = 0 self.acquisition_controller().trigger_delay_samples(trigger_delay_samples)
[docs] def stop(self): # Stop all DAQs self.instrument.stop_channels(range(8))