Source code for qcodes.data.hdf5_format

import numpy as np
import logging
import h5py
import os
import json

from .data_array import DataArray
from .format import Formatter


class HDF5Format(Formatter):
    """
    HDF5 formatter for saving qcodes datasets.

    Capable of storing (write) and recovering (read) qcodes datasets.
    """

    def close_file(self, data_set):
        """
        Closes the hdf5 file open in the dataset.
        """
        if hasattr(data_set, '_h5_base_group'):
            data_set._h5_base_group.close()
            # Removes reference to closed file
            del data_set._h5_base_group
        else:
            logging.warning(
                'Cannot close file, data_set has no open hdf5 file')

    def _create_file(self, filepath):
        """
        Creates an hdf5 file (data_object) at the location specified by
        ``filepath``.
        """
        folder, _filename = os.path.split(filepath)
        if not os.path.isdir(folder):
            os.makedirs(folder)
        file = h5py.File(filepath, 'a')
        return file

    def _open_file(self, data_set, location=None):
        if location is None:
            location = data_set.location
        filepath = self._filepath_from_location(location,
                                                io_manager=data_set.io)
        data_set._h5_base_group = h5py.File(filepath, 'r+')

    def read(self, data_set, location=None):
        """
        Reads an hdf5 file specified by location into a data_set object.
        If no location is provided, the location specified in the
        data_set is used.
        """
        self._open_file(data_set, location)

        for array_id in data_set._h5_base_group['Data Arrays'].keys():
            dat_arr = data_set._h5_base_group['Data Arrays'][array_id]

            # write() ensures these attributes always exist; decoding the
            # byte strings is needed because of h5py/issues/379
            name = dat_arr.attrs['name'].decode()
            label = dat_arr.attrs['label'].decode()

            # get unit from units if no unit field, for backward compatibility
            if 'unit' in dat_arr.attrs:
                unit = dat_arr.attrs['unit'].decode()
            else:
                unit = dat_arr.attrs['units'].decode()

            is_setpoint = str_to_bool(dat_arr.attrs['is_setpoint'].decode())
            set_arrays = [s.decode() for s in dat_arr.attrs['set_arrays']]
            vals = dat_arr.value[:, 0]
            if 'shape' in dat_arr.attrs.keys():
                vals = vals.reshape(dat_arr.attrs['shape'])
            if array_id not in data_set.arrays.keys():  # create new array
                d_array = DataArray(
                    name=name, array_id=array_id, label=label, parameter=None,
                    unit=unit,
                    is_setpoint=is_setpoint, set_arrays=(),
                    preset_data=vals)
                data_set.add_array(d_array)
            else:  # update existing array with extracted values
                d_array = data_set.arrays[array_id]
                d_array.name = name
                d_array.label = label
                d_array.unit = unit
                d_array.is_setpoint = is_setpoint
                d_array.ndarray = vals
                d_array.shape = dat_arr.attrs['shape']
            # needed because I cannot add set_arrays at this point
            data_set.arrays[array_id]._sa_array_ids = set_arrays

        # Add references to the actual set arrays (not just their array ids).
        # Note, this is not pretty but a result of how the dataset works.
        for array_id, d_array in data_set.arrays.items():
            for sa_id in d_array._sa_array_ids:
                d_array.set_arrays += (data_set.arrays[sa_id], )
        data_set = self.read_metadata(data_set)
        return data_set

    def _filepath_from_location(self, location, io_manager):
        filename = os.path.split(location)[-1]
        filepath = io_manager.to_path(location +
                                      '/{}.hdf5'.format(filename))
        return filepath
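
    # For example (a sketch; the exact path depends on the io manager): a
    # location such as 'data/#001_test' maps to the file
    # '<io base dir>/data/#001_test/#001_test.hdf5', i.e. the hdf5 file is
    # placed inside a folder with the same name as the dataset.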

    def _create_data_object(self, data_set, io_manager=None,
                            location=None):
        # Create the file if it is not there yet
        if io_manager is None:
            io_manager = data_set.io
        if location is None:
            location = data_set.location
        filepath = self._filepath_from_location(location, io_manager)
        # note that this creates an hdf5 file in a folder with the same
        # name. This is useful for saving e.g. images in the same folder
        # I think this is a sane default (MAR).
        data_set._h5_base_group = self._create_file(filepath)
        return data_set._h5_base_group

    def write(self, data_set, io_manager=None, location=None,
              force_write=False, flush=True, write_metadata=True,
              only_complete=False):
        """
        Writes a data_set to an hdf5 file.

        Args:
            data_set: qcodes data_set to write to hdf5 file
            io_manager: io_manager used for providing the path
            location: location can be used to specify a custom location
            force_write (bool): if True creates a new file to write to
            flush (bool): whether to flush after writing, can be disabled
                for testing or performance reasons
            write_metadata (bool): whether to also write the metadata group
            only_complete (bool): Not used by this formatter, but must be
                included in the call signature to avoid an "unexpected
                keyword argument" TypeError.

        N.B. It is recommended to close the file after writing; this can be
        done by calling ``HDF5Format.close_file(data_set)`` or
        ``data_set.finalize()`` if the data_set formatter is set to an
        hdf5 formatter. Note that this is not required if the dataset
        is created from a Loop, as that includes a ``data_set.finalize()``
        statement.

        The write function consists of two parts, writing DataArrays and
        writing metadata.

            - The main part of write consists of writing and resizing arrays,
              the resizing providing support for incremental writes.

            - write_metadata is called at the end of write and dumps a
              dictionary to an hdf5 file. If there already is metadata it will
              delete this and overwrite it with current metadata.

        """
        if not hasattr(data_set, '_h5_base_group') or force_write:
            data_set._h5_base_group = self._create_data_object(
                data_set, io_manager, location)

        data_name = 'Data Arrays'

        if data_name not in data_set._h5_base_group.keys():
            arr_group = data_set._h5_base_group.create_group(data_name)
        else:
            arr_group = data_set._h5_base_group[data_name]

        for array_id in data_set.arrays.keys():
            if array_id not in arr_group.keys() or force_write:
                self._create_dataarray_dset(array=data_set.arrays[array_id],
                                            group=arr_group)
            dset = arr_group[array_id]
            # resize the hdf5 dataset and append the new values
            datasetshape = dset.shape
            old_dlen = datasetshape[0]
            x = data_set.arrays[array_id]
            new_dlen = len(x[~np.isnan(x)])
            new_datasetshape = (new_dlen,
                                datasetshape[1])
            dset.resize(new_datasetshape)
            new_data_shape = (new_dlen - old_dlen, datasetshape[1])
            dset[old_dlen:new_dlen] = x.flatten()[old_dlen:new_dlen].reshape(
                new_data_shape)
            # store the array shape so extracted data can be reshaped on
            # read; set here so it is also written for incremental writes
            dset.attrs['shape'] = x.shape
        if write_metadata:
            self.write_metadata(
                data_set, io_manager=io_manager, location=location)

        # flush ensures buffers are written to disk
        # (useful for ensuring openable by other files)
        if flush:
            data_set._h5_base_group.file.flush()

    def _create_dataarray_dset(self, array, group):
        """
        Creates an hdf5 dataset that represents the data array.

        Args:
            array: qcodes DataArray to represent
            group: group in the hdf5 file where the dset will be created
        """
        # Check for empty meta attributes, use array_id if name and/or label
        # is not specified
        if array.label is not None:
            label = array.label
        else:
            label = array.array_id

        if array.name is not None:
            name = array.name
        else:
            name = array.array_id

        # Create the hdf5 dataset
        dset = group.create_dataset(
            array.array_id, (0, 1),
            maxshape=(None, 1))
        dset.attrs['label'] = _encode_to_utf8(str(label))
        dset.attrs['name'] = _encode_to_utf8(str(name))
        dset.attrs['unit'] = _encode_to_utf8(str(array.unit or ''))
        dset.attrs['is_setpoint'] = _encode_to_utf8(str(array.is_setpoint))

        set_arrays = []
        # list will remain empty if array does not have set_array
        for i in range(len(array.set_arrays)):
            set_arrays += [_encode_to_utf8(
                str(array.set_arrays[i].array_id))]
        dset.attrs['set_arrays'] = set_arrays

        return dset
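
    # Resulting file layout (a sketch of what the code above and write()
    # produce; names are illustrative):
    #
    #     /Data Arrays/<array_id>   resizable dataset of shape (n_points, 1)
    #         attrs: 'label', 'name', 'unit', 'is_setpoint' (utf-8 bytes),
    #                'set_arrays' (array_ids of the setpoint arrays),
    #                'shape' (added by write(), the in-memory array shape)
    #     /metadata                 group written by write_metadata()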

    def write_metadata(self, data_set, io_manager=None, location=None, read_first=True):
        """
        Writes the metadata of the dataset to file using the
        write_dict_to_hdf5 method.

        Note that io_manager and location are arguments that are only here
        because of backwards compatibility with the loop; this formatter
        uses the io and location specified for the main dataset.
        The read_first argument is ignored.
        """
        if not hasattr(data_set, '_h5_base_group'):
            # added here because loop writes metadata before data itself
            data_set._h5_base_group = self._create_data_object(data_set)
        if 'metadata' in data_set._h5_base_group.keys():
            del data_set._h5_base_group['metadata']
        metadata_group = data_set._h5_base_group.create_group('metadata')
        self.write_dict_to_hdf5(data_set.metadata, metadata_group)

    @staticmethod
    def write_dict_to_hdf5(data_dict, entry_point):
        for key, item in data_dict.items():
            if isinstance(item, (str, bool, tuple, float, int)):
                entry_point.attrs[key] = item
            elif isinstance(item, np.ndarray):
                entry_point.create_dataset(key, data=item)
            elif item is None:
                # h5py does not support saving None as an attribute, so a
                # sentinel string is stored instead; note that this can give
                # unexpected behaviour if someone saves a string with this
                # exact value
                entry_point.attrs[key] = 'NoneType:__None__'
            elif isinstance(item, dict):
                entry_point.create_group(key)
                HDF5Format.write_dict_to_hdf5(data_dict=item,
                                              entry_point=entry_point[key])
            elif isinstance(item, list):
                if len(item) > 0:
                    elt_type = type(item[0])

                    # If elt_type is either int or float, allow both types
                    if elt_type in [int, float]:
                        elt_type = (int, float)

                    if all(isinstance(x, elt_type) for x in item):
                        if isinstance(item[0], (int, float,
                                                np.int32, np.int64)):

                            entry_point.create_dataset(key,
                                                       data=np.array(item))
                            entry_point[key].attrs['list_type'] = 'array'
                        elif isinstance(item[0], str):
                            dt = h5py.special_dtype(vlen=str)
                            data = np.array(item)
                            data = data.reshape((-1, 1))
                            ds = entry_point.create_dataset(
                                key, (len(data), 1), dtype=dt)
                            ds[:] = data
                        elif isinstance(item[0], dict):
                            entry_point.create_group(key)
                            group_attrs = entry_point[key].attrs
                            group_attrs['list_type'] = 'dict'
                            base_list_key = 'list_idx_{}'
                            group_attrs['base_list_key'] = base_list_key
                            group_attrs['list_length'] = len(item)
                            for i, list_item in enumerate(item):
                                list_item_grp = entry_point[key].create_group(
                                    base_list_key.format(i))
                                HDF5Format.write_dict_to_hdf5(
                                    data_dict=list_item,
                                    entry_point=list_item_grp)
                        else:
                            logging.warning(
                                'List of type "{}" for "{}":"{}" not '
                                'supported, storing as string'.format(
                                    elt_type, key, item))
                            entry_point.attrs[key] = str(item)
                    else:
                        logging.warning(
                            'List of mixed type for "{}":"{}" not supported, '
                            'storing as string'.format(key, item))
                        entry_point.attrs[key] = str(item)
                else:
                    # an empty list is stored as a sentinel string,
                    # mirroring the None case above
                    entry_point.attrs[key] = 'NoneType:__emptylist__'

            else:
                logging.warning(
                    'Type "{}" for "{}":"{}" not supported, '
                    'storing as string'.format(type(item), key, item))
                entry_point.attrs[key] = str(item)
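
    # In short, the mapping implemented above is:
    #   str/bool/tuple/float/int  ->  hdf5 attribute
    #   numpy array               ->  hdf5 dataset
    #   None                      ->  attribute 'NoneType:__None__'
    #   dict                      ->  hdf5 group (recursively)
    #   list of numbers/strings   ->  hdf5 dataset
    #   list of dicts             ->  group of per-element groups, tagged
    #                                 with 'list_type', 'base_list_key'
    #                                 and 'list_length' attributes
    #   anything else             ->  str(item) stored as an attribute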

    def read_metadata(self, data_set):
        """
        Reads in the metadata, this is also called at the end of a read
        statement so there should be no need to call this explicitly.
        """
        # checks if there is an open file in the dataset as load_data does
        # reading of metadata before reading the complete dataset
        if not hasattr(data_set, '_h5_base_group'):
            self._open_file(data_set)
        if 'metadata' in data_set._h5_base_group.keys():
            metadata_group = data_set._h5_base_group['metadata']
            self.read_dict_from_hdf5(data_set.metadata, metadata_group)
        return data_set

    @staticmethod
    def read_dict_from_hdf5(data_dict, h5_group):
        if 'list_type' not in h5_group.attrs:
            for key, item in h5_group.items():
                if isinstance(item, h5py.Group):
                    data_dict[key] = {}
                    data_dict[key] = HDF5Format.read_dict_from_hdf5(
                        data_dict[key], item)
                else:  # item is a dataset
                    if 'list_type' not in item.attrs:
                        data_dict[key] = item.value
                    else:
                        data_dict[key] = list(item.value)
            for key, item in h5_group.attrs.items():
                if type(item) is str:
                    # Extracts "None" as an exception as h5py does not support
                    # storing None, nested if statement to avoid elementwise
                    # comparison warning
                    if item == 'NoneType:__None__':
                        item = None
                    elif item == 'NoneType:__emptylist__':
                        item = []
                data_dict[key] = item
        elif h5_group.attrs['list_type'] == 'dict':
            # preallocate empty list
            list_to_be_filled = [None] * h5_group.attrs['list_length']
            base_list_key = h5_group.attrs['base_list_key']
            for i in range(h5_group.attrs['list_length']):
                list_to_be_filled[i] = {}
                HDF5Format.read_dict_from_hdf5(
                    data_dict=list_to_be_filled[i],
                    h5_group=h5_group[base_list_key.format(i)])

            # the resulting list replaces data_dict entirely, so callers
            # must use the return value rather than rely on in-place
            # mutation of the passed-in dict
            data_dict = list_to_be_filled
        else:
            raise NotImplementedError('cannot read "list_type":"{}"'.format(
                h5_group.attrs['list_type']))
        return data_dict
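
    # A round-trip sketch (illustrative only; the file name and dict below
    # are not part of this module):
    #
    #     f = h5py.File('demo.hdf5', 'a', driver='core', backing_store=False)
    #     grp = f.create_group('metadata')
    #     HDF5Format.write_dict_to_hdf5({'t0': 1.0, 'counts': [1, 2, 3]}, grp)
    #     recovered = HDF5Format.read_dict_from_hdf5({}, grp)
    #     # recovered -> {'t0': 1.0, 'counts': [1, 2, 3]}
    #     # (numeric values come back as numpy scalars)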


def _encode_to_utf8(s):
    """
    Required because h5py does not support python3 strings
    converts byte type to string
    """
    return s.encode('utf-8')


def str_to_bool(s):
    if s == 'True':
        return True
    elif s == 'False':
        return False
    else:
        raise ValueError("Cannot covert {} to a bool".format(s))

from qcodes.utils.helpers import deep_update, NumpyJSONEncoder


class HDF5FormatMetadata(HDF5Format):
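    """
    HDF5 formatter that stores the dataset metadata as a separate JSON file
    (``snapshot.json``) alongside the hdf5 file, instead of inside the hdf5
    file itself.
    """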

    metadata_file = 'snapshot.json'

    def write_metadata(self, data_set, io_manager=None, location=None, read_first=False):
        """
        Write all metadata in this DataSet to storage.

        Args:
            data_set (DataSet): the data we're storing

            io_manager (io_manager): the base location to write to

            location (str): the file location within io_manager

            read_first (bool, optional): read previously saved metadata before
                writing? The current metadata will still be used if there
                are changes, but if the saved metadata has information not
                present in the current metadata, it will be retained.
                Default False.
        """

        # this statement is here to make the linter happy
        if io_manager is None or location is None:
            raise Exception('please set the io_manager and location arguments')

        if read_first:
            # In case the saved file has more metadata than we have here,
            # read it in first. But any changes to the in-memory copy should
            # override the saved file data.
            memory_metadata = data_set.metadata
            data_set.metadata = {}
            self.read_metadata(data_set)
            deep_update(data_set.metadata, memory_metadata)

        fn = io_manager.join(location, self.metadata_file)
        with io_manager.open(fn, 'w', encoding='utf8') as snap_file:
            json.dump(data_set.metadata, snap_file, sort_keys=True,
                      indent=4, ensure_ascii=False, cls=NumpyJSONEncoder)

    def read_metadata(self, data_set):
        io_manager = data_set.io
        location = data_set.location
        fn = io_manager.join(location, self.metadata_file)
        if io_manager.list(fn):
            with io_manager.open(fn, 'r', encoding='utf8') as snap_file:
                metadata = json.load(snap_file)
            data_set.metadata.update(metadata)