# Source code for silq.tools.general_tools

from typing import Any, List, Union
import operator
from functools import wraps
import numpy as np
import logging
import logging.handlers
import os
import time

from qcodes import config
from qcodes.config.config import DotDict
from qcodes.instrument.parameter import Parameter

# Public API of this module.
__all__ = ['execfile', 'is_between', 'get_truth', 'get_memory_usage',
           'SettingsClass', 'UpdateDotDict',
           'attribute_from_config', 'clear_single_settings', 'JSONEncoder',
           'JSONListEncoder', 'run_code', 'get_exponent', 'get_first_digit',
           'ParallelTimedRotatingFileHandler', 'convert_setpoints',
           'Singleton', 'property_ignore_setter', 'freq_to_str']

# Registry of named code snippets, executed by `run_code`.
code_labels = {}
# User 'properties' section of the qcodes config; empty dict when absent.
properties_config = config['user'].get('properties', {})


def execfile(filename: str, globals: dict = None, locals: dict = None):
    """Execute code in a .py file, adding new variables to globals/locals.

    Args:
        filename: Path of the Python file to execute.
        globals: Global variables dictionary. If not specified, uses globals
            of the calling frame.
        locals: Local variables dictionary. If not specified, uses locals
            of the calling frame.
    """
    # Local import: ``sys`` is otherwise only imported much further down in
    # this file, which made this function fragile in isolation.
    import sys

    if globals is None:
        globals = sys._getframe(1).f_globals
    if locals is None:
        locals = sys._getframe(1).f_locals
    with open(filename, "r") as fh:
        exec(fh.read() + "\n", globals, locals)
"""Comparison operators, used for `get_truth`""" ops = {'>': operator.gt, '<': operator.lt, '>=': operator.ge, '<=': operator.le, '==': operator.eq}
def is_between(val: float,
               min_val: float = None,
               max_val: float = None,
               tolerance: float = 1e-13):
    """Check whether ``val`` lies between ``min_val`` and ``max_val``,
    taking machine precision into account via ``tolerance``.

    Array-like values are accepted: the extreme elements are compared.
    """
    below_minimum = (min_val is not None
                     and np.min(val) < min_val - tolerance)
    above_maximum = (max_val is not None
                     and np.max(val) > max_val + tolerance)
    return not (below_minimum or above_maximum)
def get_truth(test_val: Any, target_val: Any, relation: str = '==') -> bool:
    """Test whether ``test_val`` satisfies ``relation`` to ``target_val``.

    Args:
        test_val: Left-hand side of the comparison.
        target_val: Right-hand side of the comparison.
        relation: One of '>', '<', '>=', '<=', '=='. Default is '=='.

    Returns:
        True if the relation holds.
    """
    comparison = ops[relation]
    return comparison(test_val, target_val)
def get_memory_usage():
    """Return the memory usage (RSS) of the current process in MB."""
    import os
    import psutil

    rss_bytes = psutil.Process(os.getpid()).memory_info()[0]
    return rss_bytes / float(2 ** 20)
class SettingsClass:
    """Class used to temporarily override attributes.

    This can be done through ``obj.single_settings()`` and
    ``obj.temporary_settings()``. Any settings specified here will override
    the actual values of the object until settings are cleared.

    Settings can be cleared in two ways:

    - Using the decorator ``@clear_single_settings`` on a method, which will
      delete the single_settings.
    - Using ``obj.clear_settings()``, which will clear both the single
      settings and temporary settings.

    Furthermore, attribute names can be added to ``ignore_if_set``. If the
    object's value of that attribute is not equal to None, [], or (), it
    cannot be overridden through single or temporary settings.

    Note that all attributes must be set in the object before they can be
    overridden by single/temporary settings.
    """
    # Class-level fallbacks; every instance gets its own containers in
    # __init__, these only matter before/without __init__ running.
    _single_settings = {}
    _temporary_settings = {}
    _ignore_if_set = {}

    def __init__(self, ignore_if_set=(), **kwargs):
        # BUG FIX: the default used to be a shared mutable list ([]).
        # An immutable tuple avoids cross-instance aliasing while `in`
        # membership tests behave identically.
        self._temporary_settings = {}
        self._single_settings = {}
        self._ignore_if_set = ignore_if_set

    def __getattribute__(self, item):
        """Attribute lookup with settings overrides.

        The attribute is successively searched in the following places:

        1. If the attribute is in ``_ignore_if_set`` and its actual value is
           set (not None/[]/()), the actual value always wins.
        2. single_settings
        3. temporary_settings
        4. The object's own attribute.

        Args:
            item: Attribute to be retrieved.

        Returns:
            The (possibly overridden) attribute value.
        """
        if item in ['_temporary_settings', '_single_settings',
                    '_ignore_if_set', '__setstate__', '__dict__']:
            # Bookkeeping attributes are looked up directly to avoid
            # infinite recursion through this method.
            return object.__getattribute__(self, item)
        elif item in self._ignore_if_set and \
                object.__getattribute__(self, item) not in (None, [], ()):
            return object.__getattribute__(self, item)
        elif item in self._single_settings:
            return self._single_settings[item]
        elif item in self._temporary_settings:
            return self._temporary_settings[item]
        else:
            value = object.__getattribute__(self, item)
            if value is not None:
                return value
            # NOTE(review): implicitly returns None here. The class docs
            # elsewhere mention a properties_config fallback that is not
            # implemented in this code path.

    def settings(self, **kwargs):
        """Set attributes directly, raising if an attribute doesn't exist."""
        for item, value in kwargs.items():
            if hasattr(self, item):
                setattr(self, item, value)
            else:
                raise ValueError('Setting {} not found'.format(item))

    def temporary_settings(self, append=True, **kwargs):
        """Override attributes until ``clear_settings`` is called.

        Args:
            append: If False, previously set temporary settings are cleared
                first.
            **kwargs: Attribute overrides; attributes must already exist.

        Returns:
            The temporary settings dict if no kwargs are given, otherwise
            self (allows chaining).
        """
        if not kwargs:
            return self._temporary_settings

        if not append:
            self._temporary_settings.clear()

        for item, value in kwargs.items():
            if hasattr(self, item):
                self._temporary_settings[item] = value
            else:
                raise ValueError('Setting {} not found'.format(item))
        return self

    def single_settings(self, **kwargs):
        """Override attributes until cleared via ``@clear_single_settings``.

        None values are skipped; previously set single settings are always
        cleared first.

        Returns:
            The single settings dict if no kwargs are given, otherwise self
            (allows chaining).
        """
        if not kwargs:
            return self._single_settings

        self._single_settings.clear()
        for item, value in kwargs.items():
            if value is None:
                continue
            elif hasattr(self, item):
                self._single_settings[item] = value
            else:
                raise ValueError('Setting {} not found'.format(item))
        return self

    def clear_settings(self):
        """Clears temporary and single settings."""
        self._temporary_settings.clear()
        self._single_settings.clear()
class UpdateDotDict(DotDict):
    """DotDict that can evaluate a function upon being updated.

    Args:
        update_function: Function that is called every time a value changes.
        **kwargs: Initial key/value pairs; these are set without triggering
            ``update_function``.
    """
    # Attribute names that should not be treated as dict items.
    # NOTE(review): presumably consumed by DotDict's attribute handling —
    # confirm against qcodes' DotDict implementation.
    exclude_from_dict = ['update_function', 'exclude_from_dict']

    def __init__(self, update_function=None, **kwargs):
        self.update_function = update_function
        super().__init__()
        # Bypass our own __setitem__ so update_function is NOT called while
        # populating the initial values.
        for key, val in kwargs.items():
            DotDict.__setitem__(self, key, val)

    def __setitem__(self, key, value):
        # Store the value, then notify via update_function (if any).
        super().__setitem__(key, value)
        if self.update_function is not None:
            self.update_function()
def attribute_from_config(item: str, config: dict):
    """Retrieve an item from a config dict.

    Args:
        item: Key in config to look up.
        config: Config (dict-like) to check.

    Returns:
        Value in config. If the value is a DotDict, it is converted to a
        plain dict.

    Raises:
        AttributeError: ``item`` not found in ``config``.
    """
    if item not in config:
        # BUG FIX: previously raised a bare AttributeError with no message,
        # which made failures hard to diagnose.
        raise AttributeError(f'{item} not found in config')

    value = config[item]
    if type(value) is DotDict:
        value = dict(value)
    return value
def clear_single_settings(f):
    """`SettingsClass` wrapper that clears single_settings after a call.

    Args:
        f: function after which to clear the ``_single_settings`` attribute.
    """
    @wraps(f)
    def wrapped(self, *args, **kwargs):
        # Clear the single settings whether or not f raises.
        try:
            result = f(self, *args, **kwargs)
        finally:
            self._single_settings.clear()
        return result
    return wrapped
def JSONListEncoder(l):
    """Encode the elements of a list/tuple for JSON serialization.

    Elements providing a ``_JSONEncoder`` method use it; nested
    lists/tuples are encoded recursively; everything else uses ``repr``.
    """
    def _encode(element):
        if hasattr(element, '_JSONEncoder'):
            return element._JSONEncoder()
        if isinstance(element, (list, tuple)):
            return JSONListEncoder(element)
        return repr(element)

    return [_encode(element) for element in l]
def JSONEncoder(obj, ignore_attrs: List[str] = (), ignore_vals: List[str] = ()):
    """Encode object as dict for JSON encoding.

    Args:
        obj: Object whose ``vars()`` are encoded.
        ignore_attrs: Attributes that should not be included.
        ignore_vals: Values to be ignored (compared via ``in``).

    Returns:
        dict representation of object, with an extra 'class' entry holding
        the class name.

    Notes:
        * If an attribute is an object containing the method
          ``_JSONEncoder``, that method is called to get its JSON
          representation.
        * Lists are encoded using `JSONListEncoder`, which performs the same
          ``_JSONEncoder`` check per element.

    Todo:
        Ensure parameters etc. are encoded using their snapshot.
    """
    # BUG FIX: defaults were shared mutable lists ([]); immutable tuples
    # behave identically for `in` membership tests.
    return_dict = {}
    for attr, val in vars(obj).items():
        if attr in ignore_attrs:
            continue
        if hasattr(val, '_JSONEncoder'):
            val = val._JSONEncoder()
        elif isinstance(val, (list, tuple)):
            val = JSONListEncoder(val)

        if isinstance(val, Parameter):
            # TODO: is this right? Possibly the snapshot should be used.
            return_dict[attr] = val.name
        elif val not in ignore_vals:
            return_dict[attr] = val

    return_dict['class'] = obj.__class__.__name__
    return return_dict
def run_code(label, **kwargs):
    """Create and execute a notebook cell from the global ``code_labels``.

    ``code_labels`` is a dictionary in which each key has a corresponding
    value that is a string representation of executable code. Note that the
    module variable ``code_labels`` must be set to equal the global variable
    ``code_labels``.

    Args:
        label: Label referring to code in dict ``code_labels``.
        **kwargs: Optional kwargs that are replaced in the code, i.e. for a
            given kwarg {var}=5, a line matching "{var} = {val}" will be
            replaced by "{var} = 5" (note the whitespace).

    Returns:
        Creates cell at bottom of notebook and executes it.

    Note:
        This function is not used anymore, though it should still work.
    """
    from silq.tools.notebook_tools import create_cell

    snippet = code_labels[label]
    for name, replacement in kwargs.items():
        snippet = re.sub(r'{} = .+'.format(name),
                         r'{} = {}'.format(name, replacement),
                         snippet,
                         count=1)
    create_cell(snippet, 'bottom', execute=True)
def get_exponent(val: float):
    """Get the decimal (base-10) exponent of a positive number.

    Example:
        >>> get_exponent(0.032)
        -2

    Args:
        val: Value of which to get the exponent. Must be larger than zero.

    Returns:
        Base-10 exponent (floor of log10).

    Raises:
        ValueError: If ``val`` is not larger than zero.
    """
    if val <= 0:
        # BUG FIX: previously raised SyntaxError, which is reserved for
        # parser errors; ValueError is the correct type for a bad argument.
        raise ValueError(f'Val {val} must be larger than zero')
    return int(np.floor(np.log10(val)))
def get_first_digit(val: float):
    """Get first nonzero digit.

    Example:
        >>> get_first_digit(0.032)
        3

    Args:
        val: Value for which to get the first nonzero digit.

    Returns:
        First nonzero digit.
    """
    # Scale so the first significant digit lands in the ones place.
    scaled = val * 10 ** -get_exponent(val)
    return int(np.floor(scaled))
class ParallelTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """Logging handler that creates a new log file every day.

    Files are stored with .log suffix, and dates are included in filename
    from the beginning. Also allows multiple processes to access same log.

    Note:
        - From: https://stackoverflow.com/questions/24649789/how-to-force-a
          -rotating-name-with-pythons-timedrotatingfilehandler
        - Essentially a TimedRotatingFileHandler with some modifications.
        - Haven't gone through the code, but it does the trick.
    """

    def __init__(self, filename, when='h', interval=1, backupCount=0,
                 encoding=None, delay=False, utc=False, postfix = ".log"):
        # Base filename without the date suffix; dated names derive from it.
        self.origFileName = filename
        self.when = when.upper()
        self.interval = interval
        self.backupCount = backupCount
        self.utc = utc
        self.postfix = postfix

        # Seems to be needed for self.computeRollover
        self.atTime = None

        # Per-interval configuration: seconds per rollover, strftime suffix
        # for the dated filename, and a regex recognising rotated files.
        if self.when == 'S':
            self.interval = 1  # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}$"
        elif self.when == 'M':
            self.interval = 60  # one minute
            self.suffix = "%Y-%m-%d_%H-%M"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}$"
        elif self.when == 'H':
            self.interval = 60 * 60  # one hour
            self.suffix = "%Y-%m-%d_%H"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}$"
        elif self.when == 'D' or self.when == 'MIDNIGHT':
            self.interval = 60 * 60 * 24  # one day
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}$"
        elif self.when.startswith('W'):
            self.interval = 60 * 60 * 24 * 7  # one week
            if len(self.when) != 2:
                raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when)
            if self.when[1] < '0' or self.when[1] > '6':
                raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
            self.dayOfWeek = int(self.when[1])
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}$"
        else:
            raise ValueError("Invalid rollover interval specified: %s" % self.when)

        currenttime = int(time.time())
        # Open the dated filename directly (instead of renaming at rollover,
        # like the stock handler does); this is what allows several
        # processes to append to the same day's file.
        logging.handlers.BaseRotatingHandler.__init__(self, self.calculateFileName(currenttime), 'a', encoding, delay)

        # NOTE(review): relies on module-level ``re``, which this file only
        # imports further down — works at runtime, but fragile.
        self.extMatch = re.compile(self.extMatch)
        self.interval = self.interval * interval  # multiply by units requested
        self.rolloverAt = self.computeRollover(currenttime)

    def calculateFileName(self, currenttime):
        # Build the dated filename: <origFileName>.<date suffix><postfix>
        if self.utc:
            timeTuple = time.gmtime(currenttime)
        else:
            timeTuple = time.localtime(currenttime)

        return self.origFileName + "." + time.strftime(self.suffix, timeTuple) + self.postfix

    def getFilesToDelete(self, newFileName):
        """Return rotated log files eligible for deletion (oldest first),
        keeping the newest ``backupCount`` files and the new file itself."""
        dirName, fName = os.path.split(self.origFileName)
        dName, newFileName = os.path.split(newFileName)

        fileNames = os.listdir(dirName)
        result = []
        prefix = fName + "."
        postfix = self.postfix
        prelen = len(prefix)
        postlen = len(postfix)
        for fileName in fileNames:
            # Match files shaped like <prefix><date><postfix>, excluding the
            # file currently being opened.
            if fileName[:prelen] == prefix \
                    and fileName[-postlen:] == postfix \
                    and len(fileName) - postlen > prelen \
                    and fileName != newFileName:
                suffix = fileName[prelen:len(fileName) - postlen]
                if self.extMatch.match(suffix):
                    result.append(os.path.join(dirName, fileName))
        # Lexicographic sort equals chronological sort for these suffixes.
        result.sort()
        if len(result) < self.backupCount:
            result = []
        else:
            result = result[:len(result) - self.backupCount]
        return result

    def doRollover(self):
        """Switch to the next dated file and prune old backups."""
        if self.stream:
            self.stream.close()
            self.stream = None

        # Compute the new dated filename from the scheduled rollover time
        # and open it; no renaming of the old file is needed.
        currentTime = self.rolloverAt
        newFileName = self.calculateFileName(currentTime)
        newBaseFileName = os.path.abspath(newFileName)
        self.baseFilename = newBaseFileName
        self.mode = 'a'
        self.stream = self._open()

        if self.backupCount > 0:
            for s in self.getFilesToDelete(newFileName):
                try:
                    os.remove(s)
                # NOTE(review): bare except silently ignores any removal
                # failure (permissions, file in use by another process).
                except:
                    pass

        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt = newRolloverAt + self.interval

        # If DST changes and midnight or weekly rollover, adjust for this.
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) \
                and not self.utc:
            dstNow = time.localtime(currentTime)[-1]
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            if dstNow != dstAtRollover:
                if not dstNow:
                    # DST kicks in before next rollover, need to deduct an hour
                    newRolloverAt = newRolloverAt - 3600
                else:
                    # DST bows out before next rollover, need to add an hour
                    newRolloverAt = newRolloverAt + 3600

        self.rolloverAt = newRolloverAt
def convert_setpoints(*args):
    """Convert setpoints to tuples, supporting multidimensional setpoints.

    Temporary solution to make setpoints work (see issue #627). Currently,
    setpoints need to be tuples (arrays etc. give issues). Further, the
    second setpoint array needs to be 2D etc.

    Args:
        *args: 1D setpoint arrays. Each successive setpoints array gains an
            extra dimension.

    Returns:
        Setpoint arrays converted to (nested) tuples.
    """
    if not args:
        return ()

    head = tuple(args[0])
    # Recurse over the remaining arrays, then replicate each converted
    # array once per element of the current one (adding a dimension).
    tail = convert_setpoints(*args[1:])
    tail = tuple((elem,) * len(head) for elem in tail)
    return (head,) + tail
class Singleton(type):
    """Meta-class for classes that can only have a single instance.

    If a second instance is created, it will instead return the
    already-existing instance.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        instance = cls._instances.get(cls)
        if instance is None:
            instance = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = instance
        return instance
class property_ignore_setter(object):
    """Decorator similar to ``@property`` that silently ignores assignment.

    No setter should be defined; any attempt to set the attribute is a
    no-op rather than an error.
    """

    def __init__(self, func, doc=None):
        self.func = func
        if doc is None:
            doc = func.__doc__
        self.__doc__ = doc

    def __get__(self, obj, objtype):
        return self.func(obj)

    def __set__(self, obj, value):
        # Deliberately empty: writes to the attribute are discarded.
        pass
def freq_to_str(frequency, fmt='{:.15g}'):
    """Format a frequency in Hz using the appropriate SI magnitude.

    Args:
        frequency: Frequency in Hz.
        fmt: Format string applied to the scaled value.

    Returns:
        Human-readable string such as ``'2.5 GHz'``.

    Note:
        This will be superseded later on when all attributes are converted
        to parameters.
    """
    abs_frequency = abs(frequency)
    # BUG FIX: boundaries are now inclusive (>=), so exactly 1e9 Hz is
    # rendered as '1 GHz' rather than '1000 MHz' (similarly for MHz/kHz).
    if abs_frequency >= 1e9:
        frequency_string = fmt.format(frequency / 1e9) + ' GHz'
    elif abs_frequency >= 1e6:
        frequency_string = fmt.format(frequency / 1e6) + ' MHz'
    elif abs_frequency >= 1e3:
        frequency_string = fmt.format(frequency / 1e3) + ' kHz'
    else:
        frequency_string = fmt.format(frequency) + ' Hz'
    return frequency_string
import re, sys, types, inspect


def formatAllArgs(args, kwds):
    """Return a nice string representation of all the arguments,
    truncated to at most 150 characters."""
    allargs = []
    for item in args:
        allargs.append('%s' % str(item))
    for key, item in kwds.items():
        allargs.append('%s=%s' % (key, str(item)))
    formattedArgs = ', '.join(allargs)
    if len(formattedArgs) > 150:
        return formattedArgs[:146] + " ..."
    return formattedArgs


def logfunction(theFunction, log, displayName=None):
    """Decorator to log whenever a function is called, including arguments.

    Args:
        theFunction: Function to wrap.
        log: Writable stream the call record is printed to.
        displayName: Name shown in the log; defaults to the function name.
    """
    if not displayName:
        displayName = theFunction.__name__

    def _wrapper(*args, **kwds):
        argstr = formatAllArgs(args, kwds)
        # Log the entry into the function
        print("%s(%s) " % (displayName, argstr), file=log)
        log.flush()
        returnval = theFunction(*args, **kwds)
        return returnval
    return _wrapper


def logmethod(theMethod, log, displayName=None):
    """Decorator to log whenever a method is called, including arguments.

    Args:
        theMethod: Unbound method/function to wrap (use for instance or
            class methods).
        log: Writable stream the call record is printed to.
        displayName: Prefix (typically the class name) shown in the log.
    """
    def _methodWrapper(self, *args, **kwds):
        argstr = formatAllArgs(args, kwds)
        # BUG FIX: previously printed to stdout while flushing ``log``;
        # now writes to the provided log stream, like ``logfunction``.
        print(f'{displayName}.{theMethod.__name__}({argstr})', file=log)
        log.flush()
        returnval = theMethod(self, *args, **kwds)
        return returnval
    return _methodWrapper


def logclass(cls, methodsAsFunctions=False, log=sys.stdout,
             logMatch=".*", logNotMatch="asdfnomatch"):
    """A class "decorator". But python doesn't support decorator syntax for
    classes, so do it manually::

        class C(object):
           ...
        C = logclass(C)

    :param methodsAsFunctions: set to True if you always want methodname
        first in the display. Probably breaks if you're using
        class/staticmethods?
    """
    allow = lambda s: re.match(logMatch, s) and not re.match(logNotMatch, s) and \
                      s not in ('__str__', '__repr__')

    # BUG FIX: snapshot the keys before mutating the class with setattr;
    # iterating a live dict view while reassigning attributes is fragile.
    namesToCheck = list(cls.__dict__.keys())

    for name in namesToCheck:
        if not allow(name):
            continue
        # Unbound methods show up as mere functions in the values of
        # cls.__dict__, so we have to go through getattr
        value = getattr(cls, name)

        if methodsAsFunctions and callable(value):
            setattr(cls, name, logfunction(value, log=log))
        elif isinstance(value, types.FunctionType) and hasattr(cls, value.__name__):
            setattr(cls, name,
                    logmethod(value, log=log, displayName=cls.__name__))
        elif isinstance(value, types.FunctionType):
            w = logfunction(value, log=log,
                            displayName="%s.%s" % (cls.__name__, value.__name__))
            setattr(cls, name, staticmethod(w))
        elif inspect.ismethod(value) and value.__self__ is cls:
            setattr(cls, name, classmethod(
                logmethod(value.__func__, log=log, displayName=cls.__name__)))
    return cls


def find_approximate_divisor(
        N: int,
        max_cycles: int = 65535,
        points_multiple: int = 1,
        min_points: int = 15,
        max_points: int = 6000,
        max_remaining_points: int = 1000,
        min_remaining_points: int = 0,
) -> Union[dict, None]:
    """Find an approximate divisor for a number.

    The divisor (points) is chosen such that points * cycles <= N, with
    cycles as close as possible to max_cycles, with a low number of
    remaining points.

    Args:
        N: Number for which to find a divisor.
        max_cycles: Maximum number of cycles (for points * cycles).
        points_multiple: Optional value that points must be a multiple of.
        min_points: Minimum number of waveform points.
        max_points: Maximum number of waveform points.
        max_remaining_points: Maximum number of remaining points.
            Set to 0 to find an exact divisor.
        min_remaining_points: Minimum number of remaining points when not
            zero.

    Returns:
        If successful, a dict containing
        {'points', 'cycles', 'remaining_points'}.
        If unsuccessful, None.
    """
    # Minimum points can't be less than N/max_cycles
    min_points = max(int(np.ceil(N / max_cycles)), min_points)
    # Minimum points must be a multiple of points_multiple
    min_points += (points_multiple - min_points) % points_multiple

    # BUG FIX: range end is max_points + 1 so that max_points itself is a
    # candidate, matching the inclusive bound checked below.
    for points in range(min_points, max_points + 1, points_multiple):
        cycles = N // points
        remaining_points = N - points * cycles

        # Increase remaining_points if there are remaining points and they
        # are less than min_remaining_points
        if remaining_points and remaining_points < min_remaining_points:
            subtract_cycles = np.ceil(
                (min_remaining_points - remaining_points) / points
            )
            remaining_points += subtract_cycles * points
            if cycles - subtract_cycles < 1:
                # Remaining points cannot be incorporated
                continue
            cycles -= subtract_cycles

        if (
            min_points <= points <= max_points
            and remaining_points <= max_remaining_points
        ):
            return {
                "points": int(points),
                "cycles": int(cycles),
                "remaining_points": int(remaining_points),
            }
    return None