"""Experiment workflow and design."""
from axopy import util
from axopy.storage import Storage
from axopy.daq import DaqStream
from axopy.messaging import Transmitter, TransmitterBase
from axopy.gui.main import _MainWindow, _SessionConfig
from axopy.gui.canvas import Canvas, Text
[docs]class Experiment(TransmitterBase):
"""Experiment workflow manager.
Presents the researcher with a prompt for entering session details and then
presents the appropriate tasks.
Parameters
----------
daq : object, optional
A data acquisition device that follows the AxoPy DAQ protocol. See
:mod:`axopy.daq`. For mutliple devices, a dictionary, list or tuple
is expected.
data : str, optional
Path to the data. The directory is created for you if it doesn't exist.
subject : str, optional
The subject ID to use. If not specified, a configuration screen is
shown before running the tasks so you can enter it there. This is
mostly for experiment writing (to avoid the extra configuration step).
allow_overwrite : bool, optional
If ``True``, overwrite protection in :class:`Storage` is disabled. This
is mostly for experiment writing purposes.
"""
key_pressed = Transmitter(str)
def __init__(self, daq=None, data='data', subject=None,
allow_overwrite=False):
super(Experiment, self).__init__()
self.daq = daq
self.storage = Storage(data, allow_overwrite=allow_overwrite)
self._receive_keys = False
self.subject = subject
# main screen
self.screen = _MainWindow()
# Prepare daqstream(s)
self._prepare_daqstream()
[docs] def run(self, *tasks):
"""Run the experimental tasks."""
if self.subject is None:
self.configure()
self.screen.key_pressed.connect(self.key_press)
# screen to show "Ready" between tasks
self.confirm_screen = Canvas(draw_border=False)
self.confirm_screen.add_item(Text("Ready (enter to start)"))
self.storage.subject_id = self.subject
self.tasks = tasks
self.current_task = None
self.task_iter = iter(self.tasks)
self._task_finished()
self.screen.run()
@property
def status(self):
return "subject: {} | task: {}".format(
self.subject, self.current_task.__class__.__name__)
def _run_task(self):
self._receive_keys = False
# wait for task to finish
self.current_task.finished.connect(self._task_finished)
# forward key presses to the task
self.key_pressed.connect(self.current_task.key_press)
self.screen.set_status(self.status)
# add a task view
con = self.screen.new_container()
self.current_task.prepare_graphics(con)
self.current_task.prepare_daq(self.daqstream)
self.current_task.prepare_storage(self.storage)
self.current_task.run()
def _task_finished(self):
if self.current_task is not None:
self.current_task.disconnect_all()
self.current_task.finished.disconnect(self._task_finished)
self.key_pressed.disconnect(self.current_task.key_press)
try:
self.current_task = next(self.task_iter)
except StopIteration:
self.screen.quit()
self.screen.set_container(self.confirm_screen)
self._receive_keys = True
def key_press(self, key):
if self._receive_keys:
if key == util.key_escape:
self.screen.quit()
elif key == util.key_return:
self._run_task()
else:
self.key_pressed.emit(key)
def _prepare_daqstream(self):
if isinstance(self.daq, (list, tuple)):
self.daqstream = []
for daq_ in self.daq:
self.daqstream.append(DaqStream(daq_))
elif isinstance(self.daq, dict):
self.daqstream = dict()
for daq_name, daq_ in self.daq.items():
self.daqstream[daq_name] = DaqStream(daq_)
else:
self.daqstream = DaqStream(self.daq)