Source code for silq.instrument_interfaces.keysight.Keysight_81180A_interface
import numpy as np
import logging
from typing import List, Union
from copy import copy
from silq import config
from silq.instrument_interfaces import InstrumentInterface, Channel
from silq.pulses import (
DCPulse,
TriggerPulse,
SinePulse,
FrequencyRampPulse,
PulseImplementation,
)
from silq.tools.general_tools import find_approximate_divisor
from silq.tools.pulse_tools import pulse_to_waveform_sequence
from qcodes import ManualParameter, ParameterNode, MatPlot
from qcodes import validators as vals
from qcodes.utils.helpers import arreqclose_in_list
from qcodes.config.config import DotDict
logger = logging.getLogger(__name__)
[docs]class Keysight81180AInterface(InstrumentInterface):
"""
Notes:
- When the output is turned on, there is a certain ramping time of a few
milliseconds. This negatively impacts the first repetition of a
pulse sequence
- To ensure voltage is fixed at final voltage of the pulse sequence
during any pulse_sequence.final_delay, the first point of the first
waveform of the sequence is set to that final voltage.
- see ``interface.additional_settings`` for instrument settings that should
be set manually
- When the last waveform of the sequence is finished, the time until the
next trigger is spent at the voltages of the first point of the first
waveform in the sequence. This includes any PulseSequence.final_delay.
However, the behaviour should be that during this final_delay, the
voltage is kept at the last point of the last waveform. To ensure this,
the first point of the first waveform is modified to that of the last
point of the last waveform.
- Creation of a sine waveform needs certain settings.
Defaults are given in the SinePulseImplementation, but they can be
overridden by setting the corresponding property in
``silq.config.properties.sine_waveform_settings``
"""
def __init__(self, instrument_name, max_amplitude=1.5, **kwargs):
assert max_amplitude <= 1.5
super().__init__(instrument_name, **kwargs)
self._output_channels = {
f"ch{k}": Channel(
instrument_name=self.instrument_name(), name=f"ch{k}", id=k, output=True
)
for k in [1, 2]
}
# TODO add marker outputs
self._channels = {
**self._output_channels,
"trig_in": Channel(
instrument_name=self.instrument_name(),
name="trig_in",
input_trigger=True,
),
"event_in": Channel(
instrument_name=self.instrument_name(),
name="event_in",
input_trigger=True,
),
"sync": Channel(
instrument_name=self.instrument_name(), name="sync", output=True
),
}
self.pulse_implementations = [
DCPulseImplementation(
pulse_requirements=[
("amplitude", {"max": max_amplitude}),
("duration", {"min": 100e-9}),
]
),
SinePulseImplementation(
pulse_requirements=[
("frequency", {"min": -1.5e9, "max": 1.5e9}),
("amplitude", {"min": 0, "max": max_amplitude}),
("duration", {"min": 100e-9}),
]
),
FrequencyRampPulseImplementation(
pulse_requirements=[
("frequency_start", {"min": -1.5e9, "max": 1.5e9}),
("frequency_stop", {"min": -1.5e9, "max": 1.5e9}),
("amplitude", {"min": 0, "max": max_amplitude}),
("duration", {"min": 100e-9}),
]
),
]
self.add_parameter(
"trigger_in_duration",
parameter_class=ManualParameter,
unit="s",
initial_value=1e-6,
)
self.add_parameter(
"active_channels",
parameter_class=ManualParameter,
initial_value=[],
vals=vals.Lists(vals.Strings()),
)
self.instrument.ch1.clear_waveforms()
self.instrument.ch2.clear_waveforms()
self.waveforms = {} # List of waveform arrays for each channel
# Optional initial waveform for each channel. Used to set the first point
# to equal the last voltage of the final pulse (see docstring for details)
self.waveforms_initial = {}
self.sequences = {} # List of sequence instructions for each channel
# offsets list of actual programmed sample points versus expected points
self.point_offsets = {}
self.max_point_offsets = {} # Maximum absolute sample point offset
self.point = {} # Current sample point, incremented as sequence is programmed
# Maximum tolerable absolute point offset before raising a warning
self.point_offset_limit = 100
# Add parameters that are not set via setup
self.additional_settings = ParameterNode()
for instrument_channel in self.instrument.channels:
channel = ParameterNode(instrument_channel.name)
setattr(self.additional_settings, instrument_channel.name, channel)
channel.output_coupling = instrument_channel.output_coupling
channel.sample_rate = instrument_channel.sample_rate
[docs] def get_additional_pulses(self, **kwargs):
# Currently the only supported sequence_mode is `once`, i.e. one trigger
# at the start of the pulse sequence
# Request a single trigger at the start of the pulse sequence
logger.info(f"Creating trigger for Keysight 81180A: {self.name}")
return [
TriggerPulse(
name=self.name + "_trigger",
t_start=0,
duration=self.trigger_in_duration(),
connection_requirements={
"input_instrument": self.instrument_name(),
"trigger": True,
},
)
]
[docs] def setup(self, **kwargs):
self.active_channels(
list(
{
pulse.connection.output["channel"].name
for pulse in self.pulse_sequence
}
)
)
for ch in self.active_channels():
instrument_channel = getattr(self.instrument, ch)
instrument_channel.off()
instrument_channel.continuous_run_mode(False)
instrument_channel.function_mode("sequenced")
# TODO Are these needed? Or set via sequence?
# instrument_channel.power(5) # If coupling is AC
# instrument_channel.voltage_DAC(voltage) # If coupling is DAC (max 0.5)
instrument_channel.voltage_DC(2) # If coupling is DC (max 2)
if not instrument_channel.output_coupling() == "DC":
logger.warning(
"Keysight 81180 output coupling is not DC. The waveform "
"amplitudes might be off."
)
instrument_channel.voltage_offset(0)
instrument_channel.output_modulation("off")
# Trigger settings
instrument_channel.trigger_input("TTL")
instrument_channel.trigger_source("external")
instrument_channel.trigger_slope("positive")
# Goto next waveform immediately if trigger received
instrument_channel.trigger_mode("override")
instrument_channel.trigger_level(1) # TODO Make into parameter
instrument_channel.trigger_delay(0)
# Only advance a single waveform/sequence per trigger.
instrument_channel.burst_count(1)
# Immediately skip to next waveform. Not sure what the difference is
# with trigger_mode('override')
instrument_channel.waveform_timing("immediate")
# Should be either 'once' (finish entire sequence per trigger)
# or 'stepped' (one trigger per waveform)
instrument_channel.sequence_mode("once")
# Do not repeat sequence multiple times
instrument_channel.sequence_once_count(1)
if self.instrument.is_idle() != "1":
logger.warning("Not idle")
self.instrument.ensure_idle = True
self.generate_waveform_sequences()
self.instrument.ensure_idle = False
[docs] def generate_waveform_sequences(self):
self.waveforms = {ch: [] for ch in self.active_channels()}
self.sequences = {ch: [] for ch in self.active_channels()}
self.point = {ch: 0 for ch in self.active_channels()}
self.point_offsets = {ch: [] for ch in self.active_channels()}
for ch in self.active_channels():
instrument_channel = self.instrument.channels[ch]
sample_rate = instrument_channel.sample_rate()
# Set start time t=0
t_pulse = 0
self.waveforms_initial[ch] = None
# A waveform must have at least 320 points
min_waveform_duration = 320 / sample_rate
# Always begin by waiting for a trigger/event pulse
# Add empty waveform (0V DC), with minimum points (320)
self.add_single_waveform(ch, waveform_array=np.zeros(320))
pulses = self.pulse_sequence.get_pulses(output_channel=ch)
for pulse in pulses:
# Check if there is a gap between next pulse and current time t_pulse
if pulse.t_start + 1e-11 < t_pulse:
raise SyntaxError(
f"Trying to add pulse {pulse} which starts before current "
f"time position in waveform {t_pulse}"
)
elif 1e-11 < pulse.t_start - t_pulse < min_waveform_duration + 1e-11:
# The gap between pulses is smaller than the minimum waveform
# duration. Cannot create DC waveform to bridge the gap
raise SyntaxError(
f"Delay between pulse {pulse} start {pulse.t_start} s "
f"and current time {t_pulse} s is less than minimum "
f"waveform duration. cannot add 0V DC pulse to bridge gap"
)
elif pulse.t_start - t_pulse >= min_waveform_duration + 1e-11:
# Add 0V DC pulse to bridge the gap between pulses
self.sequences[ch] += self._add_DC_waveform(
channel_name=ch,
t_start=t_pulse,
t_stop=pulse.t_start,
amplitude=0,
sample_rate=sample_rate,
pulse_name="DC",
)
# Get waveform of current pulse
waveform = pulse.implementation.implement(sample_rate=sample_rate,)
# Add waveform and sequence steps
sequence_steps = self.add_pulse_waveforms(
ch,
**waveform,
t_stop=pulse.t_stop,
sample_rate=sample_rate,
pulse_name=pulse.name,
)
self.sequences[ch] += sequence_steps
# Set current time to pulse.t_stop
t_pulse = pulse.t_stop
# Add 0V pulse if last pulse does not stop at pulse_sequence.duration
if self.pulse_sequence.duration - t_pulse >= min_waveform_duration + 1e-11:
self.sequences[ch] += self._add_DC_waveform(
channel_name=ch,
t_start=t_pulse,
t_stop=self.pulse_sequence.duration,
amplitude=0,
sample_rate=sample_rate,
pulse_name="final_DC",
)
last_waveform_idx = self.sequences[ch][-1][0]
last_waveform = self.waveforms[ch][last_waveform_idx - 1]
last_voltage = last_waveform[-1]
if self.waveforms_initial[ch] is not None:
waveform_initial_idx, waveform_initial = self.waveforms_initial[ch]
logger.debug(
f"Changing first point of first waveform to {last_voltage}"
)
waveform_initial[0] = last_voltage
self.waveforms[ch][waveform_initial_idx - 1] = waveform_initial
# Ensure there are at least three sequence instructions
while len(self.sequences[ch]) < 3:
waveform_idx = self.add_single_waveform(ch, last_voltage * np.ones(320))
# Add extra blank segment which will automatically run to
# the next segment (~ 70 ns offset)
self.sequences[ch].append((waveform_idx, 1, 0, "final_filler_pulse"))
# Ensure total waveform points are less than memory limit
total_waveform_points = sum(
len(waveform) for waveform in self.waveforms["ch1"]
)
if total_waveform_points > self.instrument.waveform_max_length:
raise RuntimeError(
f"Total waveform points {total_waveform_points} exceeds "
f"limit of 81180A ({self.instrument.waveform_max_length})"
)
# Sequence all loaded waveforms
waveform_idx_mapping = instrument_channel.upload_waveforms(
self.waveforms[ch], allow_existing=True
)
# Update waveform indices since they may correspond to pre-existing waveforms
self.sequences[ch] = [
(waveform_idx_mapping[idx], *instructions)
for idx, *instructions in self.sequences[ch]
]
instrument_channel.set_sequence(self.sequences[ch])
# Check that the sample point offsets do not exceed limit
self.max_point_offsets[ch] = max(np.abs(self.point_offsets[ch]))
if self.max_point_offsets[ch] > self.point_offset_limit:
logger.warning(
f"81180A maximum sample point offset exceeds limit {self.point_offset_limit}. "
f"Current maximum: {self.max_point_offsets}"
)
else:
logger.debug(
f"81180A sample point maximum offset: {self.max_point_offsets}"
)
def _add_DC_waveform(
self,
channel_name: str,
t_start: float,
t_stop: float,
amplitude: float,
sample_rate: float,
pulse_name="DC",
) -> List:
# We fake a DC pulse for improved performance
DC_pulse = DotDict(
dict(
t_start=t_start,
t_stop=t_stop,
duration=round(t_stop - t_start, 11),
amplitude=amplitude,
)
)
waveform = DCPulseImplementation.implement(
pulse=DC_pulse, sample_rate=sample_rate
)
sequence_steps = self.add_pulse_waveforms(
channel_name,
**waveform,
t_stop=DC_pulse.t_stop,
sample_rate=sample_rate,
pulse_name=pulse_name,
)
return sequence_steps
[docs] def add_single_waveform(
self, channel_name: str, waveform_array: np.ndarray, allow_existing: bool = True
) -> int:
"""Add waveform to instrument, uploading if necessary
If the waveform already exists on the instrument and allow_existing=True,
the existing waveform is used and no new waveform is uploaded.
Args:
channel_name: Name of channel for which to upload waveform
waveform_array: Waveform array
allow_existing:
Returns:
Waveform index, used for sequencing
Raises:
SyntaxError if waveform contains less than 320 points
"""
if len(waveform_array) < 320:
raise SyntaxError(f"Waveform length {len(waveform_array)} < 320")
self.waveforms.setdefault(channel_name, [])
# Check if waveform already exists in waveform array
if allow_existing:
waveform_idx = arreqclose_in_list(
waveform_array, self.waveforms[channel_name], atol=1e-3
)
else:
waveform_idx = None
# Check if new waveform needs to be created and uploaded
if waveform_idx is not None:
waveform_idx += 1 # Waveform index is 1-based
else:
# Add waveform to current list of waveforms
self.waveforms[channel_name].append(waveform_array)
# waveform index should be the position of added waveform (1-based)
waveform_idx = len(self.waveforms[channel_name])
return waveform_idx
[docs] def add_pulse_waveforms(
self,
channel_name: str,
waveform: np.ndarray,
loops: int,
waveform_initial: Union[np.ndarray, None],
waveform_tail: Union[np.ndarray, None],
t_stop: float,
sample_rate: float,
pulse_name=None,
):
sequence = []
total_points = 0
if pulse_name is None:
pulse_name = "pulse"
if waveform_initial is not None:
# An initial waveform must be added. This initial waveform corresponds
# to the beginning of a DC waveform at the start of a pulse sequence.
# See interface docstring for details
# Temporarily set waveform to None, will be set later to correct waveform
self.waveforms[channel_name].append(None)
waveform_initial_idx = len(self.waveforms[channel_name])
# Temporarily store initial waveform in separate variable
self.waveforms_initial[channel_name] = (
waveform_initial_idx,
waveform_initial,
)
# Add sequence step (waveform_idx, loops, jump_event)
sequence.append((waveform_initial_idx, 1, 0, f"{pulse_name}_pre"))
total_points += len(waveform_initial) # Update total waveform points
# Upload main waveform
waveform_idx = self.add_single_waveform(channel_name, waveform)
total_points += len(waveform) * loops # Update total waveform points
# Add sequence step (waveform_idx, loops, jump_event, label)
sequence.append((waveform_idx, loops, 0, pulse_name))
# Optionally add waveform tail
if waveform_tail is not None:
waveform_tail_idx = self.add_single_waveform(channel_name, waveform_tail)
sequence.append((waveform_tail_idx, 1, 0, f"{pulse_name}_tail"))
total_points += len(waveform_tail) # Update total waveform points
# Update the total number of sample points after having implemented
# this pulse waveform.
self.point[channel_name] += total_points
# Compare the total number of sample points to the expected number of
# sample points (t_stop * sample_rate). this may differ because waveforms
# must have a multiple of 32 points
expected_stop_point = int(t_stop * sample_rate)
self.point_offsets[channel_name].append(
self.point[channel_name] - expected_stop_point
)
return sequence
[docs] def start(self):
"""Turn all active instrument channels on"""
for ch_name in self.active_channels():
instrument_channel = getattr(self.instrument, ch_name)
instrument_channel.on()
[docs] def stop(self):
"""Turn both instrument channels off"""
self.instrument.ch1.off()
self.instrument.ch2.off()
[docs]class DCPulseImplementation(PulseImplementation):
pulse_class = DCPulse
[docs] @staticmethod
def implement(pulse: DCPulse, sample_rate: float, max_points: int = 6000) -> dict:
# TODO shouldn't the properties of self be from self.pulse instead?
# Number of points must be a multiple of 32
N = 32 * np.floor(pulse.duration * sample_rate / 32)
# Add a waveform_initial if this pulse is the start of the pulse_sequence
# and has a long enough duration. The first point of waveform_initial will
# later on be set to the last point of the last waveform, ensuring that
# any pulse_sequence.final_delay remains at the last voltage
if pulse.t_start == 0 and N > 640:
N -= 320
waveform_initial = pulse.amplitude * np.ones(320)
logger.debug("adding waveform_initial")
else:
waveform_initial = None
if N < 320:
raise RuntimeError(
f'Cannot add pulse because the number of waveform points {N} '
f'is less than the minimum 320. {pulse}'
)
# Find an approximate divisor of the number of points N, allowing us
# to shrink the waveform
approximate_divisor = find_approximate_divisor(
N=N,
max_cycles=1000000,
points_multiple=32, # waveforms must be multiple of 32
min_points=320, # waveform must have at least 320 points
max_points=max_points,
max_remaining_points=1000,
min_remaining_points=320,
)
if approximate_divisor is None:
raise RuntimeError(
f"Could not add DC waveform because no divisor "
f"could be found for the number of points {N}"
)
# Add waveform(s) and sequence steps
waveform = pulse.amplitude * np.ones(approximate_divisor["points"])
# Add separate waveform if there are remaining points left after division
if approximate_divisor["remaining_points"]:
waveform_tail = pulse.amplitude * np.ones(
approximate_divisor["remaining_points"]
)
else:
waveform_tail = None
return {
"waveform": waveform,
"loops": approximate_divisor["cycles"],
"waveform_initial": waveform_initial,
"waveform_tail": waveform_tail,
}
[docs]class SinePulseImplementation(PulseImplementation):
pulse_class = SinePulse
settings = {
"max_points": 50e3,
"frequency_threshold": 30,
}
[docs] def implement(self, sample_rate, plot=False, **kwargs):
# If frequency is zero, use DC pulses instead
if self.pulse.frequency == 0:
DC_pulse = DCPulse(
"DC_sine",
t_start=self.pulse.t_start,
t_stop=self.pulse.t_stop,
amplitude=self.pulse.get_voltage(self.pulse.t_start),
)
return DCPulseImplementation.implement(
pulse=DC_pulse, sample_rate=sample_rate
)
settings = copy(self.settings)
settings.update(**config.properties.get("sine_waveform_settings", {}))
settings.update(
**config.properties.get("keysight_81180A_sine_waveform_settings", {})
)
settings.update(**kwargs)
# Do not approximate frequency if the pulse is sufficiently short
max_points_exact = settings.pop('max_points_exact', 4000)
points = int(self.pulse.duration * sample_rate)
if points > max_points_exact:
self.results = pulse_to_waveform_sequence(
frequency=self.pulse.frequency,
sampling_rate=sample_rate,
total_duration=self.pulse.duration,
min_points=320,
sample_points_multiple=32,
plot=plot,
**settings,
)
else:
self.results = None
if self.results is None:
t_list = np.arange(self.pulse.t_start, self.pulse.t_stop, 1 / sample_rate)
t_list = t_list[: len(t_list) // 32 * 32] # Ensure pulse is multiple of 32
if len(t_list) < 320:
raise RuntimeError(
f"Sine waveform points {len(t_list)} is below "
f"minimum of 320. Increase pulse duration or "
f"sample rate. {self.pulse}"
)
return {
"waveform": self.pulse.get_voltage(t_list),
"loops": 1,
"waveform_initial": None,
"waveform_tail": None,
}
optimum = self.results["optimum"]
waveform_loops = max(optimum["repetitions"], 1)
# Temporarily modify pulse frequency to ensure waveforms have full period
original_frequency = self.pulse.frequency
self.pulse.frequency = optimum["modified_frequency"]
# Get waveform points for repeated segment
t_list = self.pulse.t_start + np.arange(optimum["points"]) / sample_rate
waveform_array = self.pulse.get_voltage(t_list)
# Potentially include a waveform tail
waveform_tail_pts = int(optimum["final_delay"] * sample_rate)
# Waveform must be multiple of 32, if number of points is less than
# this, there is no point in adding the waveform
if waveform_tail_pts >= 32:
if waveform_tail_pts < 320: # Waveform must be at least 320 points
# Find minimum number of loops of main waveform that are needed
# to increase tail to be at least 320 points long
subtract_loops = int(
np.ceil((320 - waveform_tail_pts) / optimum["points"])
)
else:
subtract_loops = 0
if waveform_loops - subtract_loops > 0:
# Safe to subtract loops from the main waveform, add to this one
waveform_loops -= subtract_loops
waveform_tail_pts += subtract_loops * optimum["points"]
t_list_tail = np.arange(
self.pulse.t_start + optimum["duration"] * waveform_loops,
self.pulse.t_stop,
1 / sample_rate,
)
t_list_tail = t_list_tail[: 32 * (len(t_list_tail) // 32)]
waveform_tail_array = self.pulse.get_voltage(t_list_tail)
else:
# Cannot subtract loops from the main waveform because then
# the main waveform would not have any loops remaining
waveform_tail_array = None
else:
waveform_tail_array = None
# Reset pulse frequency to original
self.pulse.frequency = original_frequency
if plot:
plot = MatPlot(subplots=(2, 1), figsize=(10, 6), sharex=True)
ax = plot[0]
t_list = np.arange(self.pulse.t_start, self.pulse.t_stop, 1 / sample_rate)
voltages = self.pulse.get_voltage(t_list)
ax.add(t_list, voltages, color="C0")
# Add recreated sine pulse
wf_voltages_main = np.tile(waveform_array, waveform_loops)
wf_voltages_tail = waveform_tail_array
wf_voltages = np.hstack((wf_voltages_main, wf_voltages_tail))
t_stop = self.pulse.t_start + len(wf_voltages) / sample_rate
wf_t_list = self.pulse.t_start + np.arange(len(wf_voltages)) / sample_rate
ax.add(wf_t_list, wf_voltages, marker="o", ms=2, color="C1")
# Add remaining marker values
new_wf_idxs = np.arange(waveform_loops) * len(waveform_array)
ax.plot(
wf_t_list[new_wf_idxs], wf_voltages[new_wf_idxs], "o", color="C2", ms=4
)
wf_tail_idx = len(waveform_array) * waveform_loops
ax.plot(
wf_t_list[wf_tail_idx], wf_voltages[wf_tail_idx], "o", color="C3", ms=4
)
ax.set_ylabel("Amplitude (V)")
ax = plot[1]
ax.add(wf_t_list, wf_voltages - voltages)
ax.set_ylabel("Amplitude error (V)")
ax.set_xlabel("Time (s)")
plot.tight_layout()
return {
"waveform": waveform_array,
"loops": waveform_loops,
"waveform_initial": None,
"waveform_tail": waveform_tail_array,
}
[docs]class FrequencyRampPulseImplementation(PulseImplementation):
pulse_class = FrequencyRampPulse
[docs] def implement(self, sample_rate, plot=False, **kwargs):
if self.pulse.frequency_deviation == 0:
raise RuntimeError(f"{self.pulse} has no frequency_deviation")
# Convert t_start and t_stop to int to get rid of floating point errors
start_idx = int(round(self.pulse.t_start * sample_rate))
stop_idx = int(round(self.pulse.t_stop * sample_rate))
# Add half of 32 points to ensure good rounding during floor division
t_list = np.arange(start_idx, stop_idx + 16)
# Ensure number of points is multiple of 32
t_list = t_list[: len(t_list) // 32 * 32]
t_list = t_list / sample_rate
if len(t_list) < 320:
raise RuntimeError("Waveform has fewer than minimum 320 points")
waveform_array = self.pulse.get_voltage(t_list)
return {
"waveform": waveform_array,
"loops": 1,
"waveform_initial": None,
"waveform_tail": None,
}