Source code for silq.instrument_interfaces.Tektronix.AWG520_interface

import numpy as np
import logging
from time import sleep
from copy import copy

import silq
from silq.instrument_interfaces import InstrumentInterface, Channel
from silq.pulses import DCPulse, DCRampPulse, TriggerPulse, SinePulse, \
    PulseImplementation
from qcodes.utils.helpers import arreqclose_in_list
from silq.tools.pulse_tools import pulse_to_waveform_sequence
from qcodes.utils.validators import Lists, Enum, Numbers


logger = logging.getLogger(__name__)

[docs]class AWG520Interface(InstrumentInterface): """ Notes: - Sets first point of each waveform to final voltage of previous waveform because this is the value used when the previous waveform ended and is waiting for triggers. - Amplitude significantly decreases above 400 MHz at 1 GHz sampling. - The interface receives a trigger at the end of the sequence, ignoring any final_delay. This means that the pulse sequence will already have restarted during any final delay, which may cause issues. Todo: Check if only channel 2 can be programmed Add marker channels """ max_waveforms = 200 def __init__(self, instrument_name, **kwargs): super().__init__(instrument_name, **kwargs) self._output_channels = { f'ch{k}': Channel(instrument_name=self.instrument_name(), name=f'ch{k}', id=k, output=True) for k in [1,2] } self._channels = { **self._output_channels, 'trig_in': Channel(instrument_name=self.instrument_name(), name='trig_in', input_trigger=True) } self.pulse_implementations = [ DCPulseImplementation(pulse_requirements=[('amplitude', {'min': -1, 'max': 1})]), SinePulseImplementation(pulse_requirements=[('frequency', {'max': 500e6}), ('amplitude', {'min': -1, 'max': 1})]) ] self.add_parameter('pulse_final_delay', unit='s', set_cmd=None, initial_value=.1e-6, docstring='Time subtracted from each waveform to ensure ' 'that it is finished once next trigger arrives.') self.add_parameter('trigger_in_duration', unit='s', set_cmd=None, initial_value=.1e-6) self.add_parameter('active_channels', set_cmd=None, vals=Lists(Enum('ch1', 'ch2'))) self.add_parameter('sampling_rate', unit='1/s', initial_value=1e9, set_cmd=None, vals=Numbers(max_value=1e9)) self.waveforms = {} self.waveform_filenames = {} self.sequence = {} # Create silq folder to place waveforms and sequences in self.instrument.change_folder('/silq', create_if_necessary=True) # Delete all files in folder self.instrument.delete_all_files()
[docs] def get_additional_pulses(self, **kwargs): # Return empty list if no pulses are in the pulse sequence if not self.pulse_sequence or self.is_primary(): return [] active_channels = list(set(pulse.connection.output['channel'].name for pulse in self.pulse_sequence)) connections = {ch: self.pulse_sequence.get_connection( output_instrument=self.instrument.name, output_channel=ch) for ch in active_channels} # If a pulse starts on one channel and needs a trigger while another # pulse is still active on the other channel, this would cause the other # pulse to move onto the next pulse prematurely. This only happens if # the other pulse finished its waveform and is waiting for a trigger. # Here we check that this does not occur. if len(active_channels) > 1: t = 0 gap_pulses = [] for t_start in self.pulse_sequence.t_start_list: if t_start < t: raise RuntimeError( f'Pulse starting before end of previous pulse {t}') elif t_start > t: # Add gap pulses for each channel for ch in active_channels: gap_pulse = DCPulse(t_start=t, t_stop=t_start, amplitude=0, connection=connections[ch]) gap_pulses.append(self.get_pulse_implementation(gap_pulse)) t = t_start pulses = {ch: self.pulse_sequence.get_pulse(t_start=t_start, output_channel=ch) for ch in active_channels} if pulses['ch1'] is None and pulses['ch2'] is None: raise RuntimeError(f"pulse sequence has t_start={t_start}, " "but couldn't find pulse for either channel") elif pulses['ch1'] is not None and pulses['ch2'] is not None: assert pulses['ch1'].t_stop == pulses['ch2'].t_stop, \ f"t_stop of pulses starting at {t_start} must be equal." \ f"This is a current limitation of the AWG interface." t = pulses['ch1'].t_stop elif pulses['ch1'] is not None: # add gap pulse for ch2 gap_pulse = DCPulse(t_start=t_start, t_stop=pulses['ch1'].t_stop, amplitude=0, connection=connections['ch2']) gap_pulses.append(self.get_pulse_implementation(gap_pulse)) t = pulses['ch1'].t_stop else: # add gap pulse for ch1 gap_pulse = DCPulse(t_start=t_start, t_stop=pulses['ch2'].t_stop, amplitude=0, connection=connections['ch1']) gap_pulses.append(self.get_pulse_implementation(gap_pulse)) t = pulses['ch2'].t_stop else: ch = active_channels[0] # only add gap pulses for active channel connection = self.pulse_sequence.get_connection( output_instrument=self.instrument.name, output_channel=ch) gap_pulses = [] t = 0 for t_start in self.pulse_sequence.t_start_list: if t_start < t: raise RuntimeError('Pulse starting before previous pulse ' f'finished {t} s') elif t_start > t: # Add gap pulse gap_pulse = DCPulse(t_start=t, t_stop=t_start, amplitude=0, connection=connection) gap_pulses.append(self.get_pulse_implementation(gap_pulse)) pulse = self.pulse_sequence.get_pulse(t_start=t_start, output_channel=ch) # Pulse will be None if the pulse sequence has a final delay if pulse is not None: t = pulse.t_stop if t != self.pulse_sequence.duration: for ch in active_channels: gap_pulse = DCPulse(t_start=t, t_stop=self.pulse_sequence.duration, amplitude=0, connection=connections[ch]) gap_pulses.append(self.get_pulse_implementation(gap_pulse)) if gap_pulses: self.pulse_sequence.add(*gap_pulses) # TODO test if first waveform needs trigger as well additional_pulses = [ TriggerPulse(t_start=t_start, duration=self.trigger_in_duration(), connection_requirements={ 'input_instrument': self.instrument_name(), 'trigger': True}) for t_start in self.pulse_sequence.t_start_list + [self.pulse_sequence.duration] ] return additional_pulses
[docs] def setup(self, **kwargs): assert not self.is_primary(), 'AWG520 not programmed as primary instrument' self.active_channels(list({pulse.connection.output['channel'].name for pulse in self.pulse_sequence})) # Get waveforms for all channels t = 0 pulses = {ch: self.pulse_sequence.get_pulses(output_channel=ch) for ch in self.active_channels()} if len(self.active_channels()) > 1: assert len(pulses['ch1']) == len(pulses['ch2']), \ "Channel1 and channel2 do not have equal number of pulses" N_instructions = len(pulses[self.active_channels()[0]]) waveforms = {ch: [0] * N_instructions for ch in self.active_channels()} repetitions = [0] * N_instructions for k in range(N_instructions): pulse = {ch: pulses[ch][k] for ch in self.active_channels()} # Get pulse of first channel pulse1 = pulse[self.active_channels()[0]] if len(self.active_channels()) > 1: assert t == pulse["ch1"].t_start == pulse["ch2"].t_start, \ f't={t} does not match pulse.t_start={pulse["ch1"].t_start}' assert pulse["ch1"].t_stop == pulse["ch2"].t_stop, \ f'pulse["ch1"].t_stop != pulse["ch2"].t_stop: ' \ f'{pulse["ch1"].t_stop} != {pulse["ch2"].t_stop}' else: assert t == pulse1.t_start, "t != pulse.t_start: {t} != {pulse1.t_start}" # Waveform points of both channels need to match # Here we find out the number of points needed pulse_pts = [pulse.implementation.pts for pulse in pulse.values()] if None in pulse_pts: waveform_pts = int(round((pulse1.t_stop - self.pulse_final_delay() - pulse1.t_start) * self.sampling_rate())) else: waveform_pts = int(round(max(pulse_pts))) # subtract 0.5 from waveform points to ensure correct length t_list = np.arange(pulse1.t_start, pulse1.t_start + (waveform_pts-0.5)/self.sampling_rate(), 1/self.sampling_rate()) if len(t_list) % 4: # Points need to be increment of 4 t_list = t_list[:len(t_list) - (len(t_list) % 4)] assert len(t_list) >= 256, \ f"Waveform has too few points at pulse.t_start={pulse1.t_start}" single_repetitions = [0 for _ in self.active_channels()] for ch_idx, ch in enumerate(self.active_channels()): waveforms[ch][k], single_repetitions[ch_idx] = pulse[ch].implementation.implement( t_list=t_list, sampling_rate=self.sampling_rate()) if all(repetition == None for repetition in single_repetitions): # None of the channel pulses cares about repetitions, set to 1 repetitions[k] = 1 else: specified_repetitions = {elem for elem in single_repetitions if elem is not None} if len(set(specified_repetitions)) > 1: raise RuntimeError(f'Pulses {pulse} require different number' f'of repetitions: {single_repetitions}') repetitions[k] = next(iter(specified_repetitions)) t = pulse1.t_stop # Set first point of each waveform to endpoint of previous waveform endpoint_voltages = {ch: [waveform[-1] for waveform in ch_waveforms] for ch, ch_waveforms in waveforms.items()} for ch in self.active_channels(): for k, waveform in enumerate(waveforms[ch]): waveform[0] = endpoint_voltages[ch][(k-1) % len(waveforms[ch])] # Create list of unique waveforms and corresponding sequence waveforms_list = [] sequence = {ch: [] for ch in self.active_channels()} for ch in self.active_channels(): for waveform in waveforms[ch]: waveform_idx = arreqclose_in_list(waveform, waveforms_list, rtol=-1e-4, atol=1e-4) if waveform_idx is None: waveforms_list.append(waveform) waveform_idx = len(waveforms_list) - 1 sequence[ch].append(waveform_idx) self.instrument.stop() self.instrument.trigger_mode('ENH') self.instrument.trigger_level(1) self.instrument.clock_freq(self.sampling_rate()) for ch in ['ch1', 'ch2']: self.instrument[f'{ch}_offset'](0) self.instrument[f'{ch}_amp'](2) self.instrument[f'{ch}_status']('OFF') # for ch in [self.instrument.ch1, self.instrument.ch2]: # ch.offset(0) # ch.amplitude(1) # ch.status('OFF') # Create silq folder to place waveforms and sequences in self.instrument.change_folder('/silq', create_if_necessary=True) total_waveform_points = sum(len(waveform) for waveform in waveforms_list) assert total_waveform_points < 3.99e6, \ f'Too many total waveform points: {total_waveform_points}' total_existing_waveform_points = sum( len(waveform) for waveform in self.waveform_filenames.values()) if (total_waveform_points + total_existing_waveform_points > 3.99e6) or \ len(self.waveform_filenames) > self.max_waveforms: logger.info('Deleting existing waveforms from hard disk') self.instrument.delete_all_files(root=False) self.waveform_filenames.clear() waveform_filename_mapping = [] # Copy waveform filenames because we only want to update the attribute # If the setup is complete waveform_filenames = copy(self.waveform_filenames) for waveform in waveforms_list: waveform_idx = arreqclose_in_list(waveform, waveform_filenames.values(), rtol=1e-4, atol=1e-4) if waveform_idx is None: # Waveform has not yet been uploaded waveform_idx = len(waveform_filenames) # Upload waveform filename = f'waveform_{waveform_idx}.wfm' marker1 = marker2 = np.zeros(len(waveform)) self.instrument.send_waveform(waveform, marker1, marker2, filename, clock=self.sampling_rate()) waveform_filenames[filename] = waveform else: filename = list(waveform_filenames)[waveform_idx] # Add waveform mapping waveform_filename_mapping.append(filename) # Upload sequence sequence_waveform_names = [ [waveform_filename_mapping[waveform_idx] for waveform_idx in sequence[ch]] for ch in self.active_channels()] wait_trigger = np.ones(N_instructions) goto_one = np.zeros(N_instructions) logic_jump = np.zeros(N_instructions) if len(self.active_channels()) == 1: sequence_waveform_names = sequence_waveform_names[0] self.instrument.send_sequence('sequence.seq', sequence_waveform_names, repetitions, wait_trigger, goto_one, logic_jump) # Check for errors here because else it adds a 50% overhead because it's # still busy setting the sequence for error in self.instrument.get_errors(): logger.warning(error) self.instrument.set_sequence('sequence.seq') self.waveforms = waveforms self.waveform_filenames = waveform_filenames self.sequence = sequence
[docs] def start(self): for ch in self.active_channels(): self.instrument[f'{ch}_status']('ON') # self.instrument.ch1.status('ON') # self.instrument.ch2.status('ON') self.instrument.start() # SLeep for a short time because else it sometimes misses first trigger sleep(0.2)
[docs] def stop(self): self.instrument.ch1_status('OFF') self.instrument.ch2_status('OFF') self.instrument.stop()
[docs]class DCPulseImplementation(PulseImplementation): # Number of points in a waveform (must be at least 256) pulse_class = DCPulse pts = 256
[docs] def implement(self, t_list: np.ndarray, **kwargs) -> np.ndarray: """Implements the DC pulses for the AWG520 Args: first_point_voltage: Voltage to set first point of waveform to. When the previous waveform ends, the voltage is set to the first point of the next voltage. sampling_rate: AWG sampling rate final_delay: Final part of waveform to skip. If this does not exist, the waveform may not have finished when next trigger arrives, in which case the trigger is ignored. Returns: waveform """ # AWG520 requires waveforms of at least 256 points waveform = self.pulse.amplitude * np.ones(len(t_list)) repetitions = None # Repetitions is irrelevant return waveform, repetitions
[docs]class SinePulseImplementation(PulseImplementation): # Number of points in a waveform (must be at least 256) pulse_class = SinePulse pts = None settings = {}
[docs] def implement(self, t_list: np.ndarray, sampling_rate: float, **kwargs) -> np.ndarray: """Implements Sine pulses for the AWG520 Args: first_point_voltage: Voltage to set first point of waveform to. When the previous waveform ends, the voltage is set to the first point of the next voltage. sampling_rate: AWG sampling rate final_delay: Final part of waveform to skip. If this does not exist, the waveform may not have finished when next trigger arrives, in which case the trigger is ignored. Returns: Waveform array """ settings = {**silq.config.properties.get('sine_waveform_settings', {}), **self.settings} max_points = settings.pop('max_points', 50000) if len(t_list) <= max_points: waveform = self.pulse.get_voltage(t_list) repetitions = 1 else: duration = np.max(t_list) - np.min(t_list) min_points = settings.pop('min_points', np.ceil(duration / 65536 * sampling_rate)) use_modified_frequency = settings.pop('use_modified_frequency', False) results = pulse_to_waveform_sequence(max_points=max_points, frequency=self.pulse.frequency, sampling_rate=sampling_rate, total_duration=duration, min_points=max(256, min_points), sample_points_multiple=4, **settings ) # Store results for debugging purposes self.results = results if use_modified_frequency: # Temporarily change frequency to modified value original_frequency = self.pulse.frequency self.pulse.frequency = results['optimum']['modified_frequency'] t_list_segment = t_list[:results['optimum']['points']] waveform = self.pulse.get_voltage(t_list_segment) repetitions = results['optimum']['repetitions'] if use_modified_frequency: # Revert frequency to original self.pulse.frequency = original_frequency return waveform, repetitions