"""Experiment data storage.
There are two main use cases for the functionality in this module:
reading/writing data during an experiment session, and reading data once an
experiment is complete (i.e. for analysis). See the :ref:`user guide <storage>`
for information on these use cases and the API.
"""
import os
import h5py
import numpy
import pandas
import zipfile
import shutil
import pickle
import logging
#
# Highest layer. Used by tasks to obtain task readers/writers
#
class Storage(object):
    """Top-level data storage maintainer.

    See the :ref:`user guide <storage>` for more information.

    The on-disk layout is ``<root>/<subject_id>/<task_id>/``: every directory
    directly under the root is treated as a subject and every directory
    directly under a subject is treated as a task.

    Parameters
    ----------
    root : str, optional
        Path to the root of the data storage filestructure. By default, 'data'
        is used. If the directory doesn't exist, it is created.
    allow_overwrite : bool, optional
        Specifies whether or not the storage interface allows you to overwrite
        a task's data for a subject if it already exists.
    """

    def __init__(self, root='data', allow_overwrite=False):
        self.root = root
        self.allow_overwrite = allow_overwrite
        makedirs(root, exist_ok=True)
        self._subject_id = None

    @staticmethod
    def _iter_subdirs(parent):
        """Yield names of directories directly under `parent`, sorted."""
        for name in sorted(os.listdir(parent)):
            if os.path.isdir(os.path.join(parent, name)):
                yield name

    @property
    def subject_ids(self):
        """Generate subject IDs found in storage sorted in alphabetical order.

        Returns
        -------
        subject_id : str
            ID of the subject found.
        """
        return self._iter_subdirs(self.root)

    @property
    def subject_id(self):
        """The current subject ID.

        When setting the subject ID for a new subject (i.e. one that doesn't
        exist already), storage for that subject is created.
        """
        return self._subject_id

    @subject_id.setter
    def subject_id(self, val):
        makedirs(os.path.join(self.root, val), exist_ok=True)
        self._subject_id = val

    @property
    def task_ids(self):
        """Generate names of tasks found for the current subject.

        Note that there may be no tasks found if the `subject_id` has not been
        set or if the subject hasn't started any tasks. In this case, nothing
        is yielded.
        """
        if self.subject_id is None:
            return
        subj_path = os.path.join(self.root, self.subject_id)
        for name in self._iter_subdirs(subj_path):
            yield name

    def create_task(self, task_id):
        """Create a task for the current subject.

        The subject ID must be set before calling this method, since the task
        directory is located underneath the subject's directory.

        Parameters
        ----------
        task_id : str
            The ID of the task to add. The name must not have been used for
            another task for the current subject.

        Returns
        -------
        writer : TaskWriter
            A new TaskWriter for storing task data.

        Raises
        ------
        ValueError
            If the task already exists for the current subject and
            `allow_overwrite` is False.
        OSError
            If the task directory could not be created for any reason other
            than it already existing.
        """
        path = self._task_path(task_id)
        try:
            makedirs(path)
        except OSError:
            # Only an existing directory means "task already started". Any
            # other failure (permissions, invalid path, ...) is a genuine
            # filesystem error and must not be reported as a name clash.
            if not os.path.isdir(path):
                raise
            if not self.allow_overwrite:
                raise ValueError(
                    "Subject {} has already started \"{}\". Only unique task "
                    "names are allowed.".format(self.subject_id, task_id))
            # Overwriting: throw away the previous data and start fresh.
            shutil.rmtree(path)
            makedirs(path)
        return TaskWriter(path)

    def require_task(self, task_id):
        """Retrieves a task for the current subject.

        Parameters
        ----------
        task_id : str
            The ID of the task to look for. The task must have already been run
            with the current subject.

        Returns
        -------
        reader : TaskReader
            A new TaskReader for working with the existing task data.

        Raises
        ------
        ValueError
            If no task with the given ID is found for the current subject.
        """
        if task_id not in self.task_ids:
            raise ValueError(
                "Subject {} has not started \"{}\" yet. Use `create_task` to "
                "create it first.".format(self.subject_id, task_id))
        path = self._task_path(task_id)
        return TaskReader(path)

    def to_zip(self, outfile):
        """Create a ZIP archive from a data storage hierarchy.

        For more information, see :func:`storage_to_zip`.
        """
        storage_to_zip(self.root, outfile)

    def _task_path(self, task_id):
        """Return the directory path of `task_id` for the current subject."""
        return os.path.join(self.root, self.subject_id, task_id)
#
# Middle layer. Used by tasks to read/write data.
#
class TaskWriter(object):
    """The main interface for storing data from a task.

    A :class:`TaskWriter` is usually handed out by :class:`Storage`, so there
    is normally no need to construct one directly.

    Parameters
    ----------
    root : str
        Path to the task root (e.g. 'data/subject_1/taskname').

    Attributes
    ----------
    trials : TrialWriter
        :class:`TrialWriter` for storing trial data.
    """

    def __init__(self, root):
        self.root = root
        trials_file = _trials_path(self.root)
        self.trials = TrialWriter(trials_file)

    def write(self, trial):
        """Write trial data.

        Call this as the final step of a trial, once all of its arrays hold
        the data they are supposed to hold. The trial attributes are passed to
        the trial writer and each array is written to the HDF5 file named
        after it, in a dataset named after the trial's row index.

        **Important note**: The trial's arrays are cleared after writing.

        Parameters
        ----------
        trial : Trial
            Trial data. See :meth:`TrialWriter.write` and :class:`Trial` for
            details.
        """
        message = 'saving trial {}:{}\n{}'.format(
            trial.attrs['block'], trial.attrs['trial'], str(trial))
        logging.info(message)

        self.trials.write(trial.attrs)

        # The row index of the trial just written names the dataset that
        # holds this trial's data inside each array file.
        dataset_name = str(self.trials.df.index[-1])
        for array_name, array in trial.arrays.items():
            array_file = _array_path(self.root, array_name)
            write_hdf5(array_file, array.data, dataset=dataset_name)
            array.clear()

    def pickle(self, obj, name):
        """Write a generic object to storage.

        Useful for handing an object from one task to another, or for keeping
        something that doesn't map well onto trial attributes and arrays.
        Pickles are neither a robust long-term format nor a secure one, so use
        this with care. See, for example, the advice given here:
        http://scikit-learn.org/stable/modules/model_persistence.html

        Parameters
        ----------
        obj : object
            The object to pickle.
        name : str
            Name of the pickle to save (no extension).
        """
        target = _pickle_path(self.root, name)
        with open(target, 'wb') as fh:
            pickle.dump(obj, fh)
class TaskReader(object):
    """High-level interface to task storage.

    Parameters
    ----------
    root : str
        Path to task's root directory. This is the directory specific to a task
        which contains a ``trials.csv`` file and HDF5 array files.
    """

    def __init__(self, root):
        self.root = root
        # Trial data is loaded lazily on first access of `trials`.
        self._trials = None

    @property
    def trials(self):
        """A Pandas DataFrame representing the trial data.

        The CSV file is read on first access and cached afterwards.
        """
        if self._trials is None:
            self._trials = pandas.read_csv(_trials_path(self.root))
        return self._trials

    def iterarray(self, name):
        """Iteratively retrieve an array for each trial.

        One array is yielded per row of `trials`, read from the dataset named
        after that row's index.

        Parameters
        ----------
        name : str
            Name of the array type.
        """
        for ind in self.trials.index:
            dset = str(ind)
            yield read_hdf5(_array_path(self.root, name), dataset=dset)

    def array(self, name):
        """Retrieve an array type's data for all trials.

        Parameters
        ----------
        name : str
            Name of the array type.

        Returns
        -------
        data : ndarray
            The per-trial arrays stacked vertically with ``numpy.vstack``.
        """
        # numpy.vstack requires a sequence; passing the generator directly is
        # rejected by current NumPy releases, so materialise it first.
        return numpy.vstack(list(self.iterarray(name)))

    def pickle(self, name):
        """Load a pickled object from storage.

        Unpickling can execute arbitrary code, so only load pickles from data
        directories you trust.

        Parameters
        ----------
        name : str
            Name of the pickled object (no extension).

        Returns
        -------
        obj : object
            The unpickled object.
        """
        with open(_pickle_path(self.root, name), 'rb') as f:
            obj = pickle.load(f)
        return obj
#
# Lowest layer. Used by TaskReader/TaskWriter.
#
class TrialWriter(object):
    """Writes trial data to a CSV file, one row per trial.

    Parameters
    ----------
    filepath : str
        Path to the file to create.

    Attributes
    ----------
    data : dict
        Dictionary containing all trial data written so far, mapping each
        column name to the list of values written for it.
    df : pandas.DataFrame
        DataFrame built from `data` on the most recent call to :meth:`write`.
        It is empty until the first trial is written.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        self.data = {}
        self.df = pandas.DataFrame()

    def write(self, data):
        """Add a single row to the trials dataset.

        The file on disk is rewritten in full with all rows accumulated so
        far, so it is up to date as soon as this method returns.

        Parameters
        ----------
        data : dict
            Data values to add. Once columns have been established, every
            non-empty `data` must provide exactly those columns.

        Raises
        ------
        ValueError
            If `data` has a different set of columns than the trials already
            written. Nothing is stored in that case.
        """
        # Validate before mutating: appending first would leave the columns
        # with unequal lengths (or silently merge two trials into one row),
        # corrupting every subsequent write.
        if data and self.data and set(data) != set(self.data):
            raise ValueError(
                "Trial columns {} do not match the columns already written "
                "{}.".format(sorted(data), sorted(self.data)))

        for col, val in data.items():
            self.data.setdefault(col, []).append(val)

        self.df = pandas.DataFrame(self.data)
        self.df.to_csv(self.filepath, index=False)
#
# Utilities
#
def _trials_path(taskroot):
return os.path.join(taskroot, 'trials.csv')
def _array_path(taskroot, arrayname):
return os.path.join(taskroot, '{}.hdf5'.format(arrayname))
def _pickle_path(taskroot, picklename):
return os.path.join(taskroot, '{}.pkl'.format(picklename))
def read_hdf5(filepath, dataset='data'):
    """Read the contents of a dataset.

    The dataset is looked up at the root of the HDF5 file (i.e. at '/').
    Although mainly an internal helper, it is handy for quickly pulling a
    single array out of an HDF5 file.

    Parameters
    ----------
    filepath : str
        Path to the file to read from.
    dataset : str, optional
        Name of the dataset to retrieve. By default, 'data' is used.

    Returns
    -------
    data : ndarray
        The data (read into memory) as a NumPy array. The dtype, shape, etc. is
        all determined by whatever is in the file.
    """
    dataset_path = '/{}'.format(dataset)
    with h5py.File(filepath, 'r') as h5file:
        node = h5file.get(dataset_path)
        # Slicing copies the contents into memory before the file is closed.
        return node[:]
def write_hdf5(filepath, data, dataset='data'):
    """Write data to an hdf5 file.

    The file is opened in append mode and a dataset with the given name is
    created from `data`. Although mainly an internal helper, it is handy for
    quickly writing an array to an HDF5 file.

    Parameters
    ----------
    filepath : str
        Path to the file to be written.
    data : ndarray
        NumPy array containing the data to write. The dtype, shape, etc. of the
        resulting dataset in storage is determined by this array directly.
    dataset : str, optional
        Name of the dataset to create. Default is 'data'.
    """
    # NOTE(review): h5py is expected to raise if a dataset with this name
    # already exists in the file -- confirm against the h5py documentation.
    with h5py.File(filepath, 'a') as h5file:
        h5file.create_dataset(dataset, data=data)
def storage_to_zip(path, outfile=None):
    """Create a ZIP archive from a data storage hierarchy.

    The contents of the data storage hierarchy are all placed in the archive,
    with the top-level folder in the archive being the data storage root folder
    itself. That is, all paths within the ZIP file are relative to the parent
    of the dataset root folder and the subject/task directory structure is
    preserved.

    Parameters
    ----------
    path : str
        Path to the root of the dataset. A trailing path separator is
        tolerated.
    outfile : str, optional
        Name of the ZIP file to create. If not specified, the file is created
        in the same directory as the data root with the same name as the
        dataset root directory (with ".zip" added).

    Returns
    -------
    outfile : str
        The name of the ZIP file created.
    """
    datapath, datadir = os.path.split(path)
    if not datadir:
        # `path` ended with a separator (e.g. 'data/'); split again so the
        # root directory name is not empty.
        datapath, datadir = os.path.split(datapath)

    if outfile is None:
        # parent of data root + dataset name + .zip
        outfile = os.path.join(datapath, datadir + '.zip')

    archive_abspath = os.path.abspath(outfile)
    with zipfile.ZipFile(outfile, 'w') as zipf:
        for dirpath, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                # Never add the archive to itself if it lives inside the
                # data root.
                if os.path.abspath(filepath) == archive_abspath:
                    continue
                # Keep the file's location relative to the data root so that
                # identically named files of different subjects/tasks do not
                # collide in the archive.
                relpath = os.path.relpath(filepath, path)
                zipf.write(filepath, arcname=os.path.join(datadir, relpath))
    return outfile
def makedirs(path, exist_ok=False):
    """Recursively create directories.

    This is needed for Python versions earlier than 3.2, otherwise
    ``os.makedirs(path, exist_ok=True)`` would suffice.

    Parameters
    ----------
    path : str
        Path to directory to create.
    exist_ok : bool, optional
        If False (default), an exception is raised when the directory already
        exists. Set to True if it is acceptable that the directory already
        exists.

    Raises
    ------
    OSError
        If the directory cannot be created, or if it already exists and
        `exist_ok` is False. An existing path that is not a directory is
        always an error.
    """
    try:
        os.makedirs(path)
    except OSError:
        # Only "the directory is already there" may be tolerated. Any other
        # failure (permission denied, a regular file in the way, ...) must
        # still reach the caller.
        if not (exist_ok and os.path.isdir(path)):
            raise