Some Task Recipes

The following recipes cover tasks, or pieces of tasks, that come up fairly often. They are for illustration purposes and won’t always run as-is.

The Basics

Storing Data

Storing data within a task is typically a two-step process. First, implement prepare_design() to set up the trial/block structure along with the attributes and arrays (with initial values) for each trial. Then implement prepare_storage() to create a new TaskWriter. When a trial is finished, use the task writer to write the task’s trial attribute to disk.

class CustomTask(Task):

    def prepare_design(self, design):
        block = design.add_block()
        for pos in [0, 0.2, 0.4]:
            block.add_trial(attrs={'pos': pos})
        block.shuffle()

    def prepare_storage(self, storage):
        self.writer = storage.create_task('custom_task')

    # ... task implementation here

    def finish_trial(self):
        self.writer.write(self.trial)

        # call next_trial() to start the next trial
        # could instead start a timer if you want a timeout between trials
        self.next_trial()

Using Input Hardware

To make use of an input device (DAQ), implement prepare_daq() to gain access to the stream interface, get it running, then connect its updated transmitter to a callback that you define.

class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.start()

    def run_trial(self, trial):
        self.daqstream.updated.connect(self.update)

    def update(self, data):
        # do something with the data from the daqstream here
        pass
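
When the connection is made in run_trial as above, it generally needs to be broken again once the trial ends, so that updates arriving between trials are ignored. The sketch below illustrates that make/break pattern using a tiny stand-in for the stream's updated transmitter. FakeTransmitter, TrialLogic, and samples_per_trial are hypothetical names used only for illustration and are not part of the library; the real transmitter may differ in its details.

```python
class FakeTransmitter:
    """Hypothetical stand-in for a stream's `updated` transmitter."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)

    def disconnect(self, receiver):
        self._receivers.remove(receiver)

    def emit(self, data):
        # iterate over a copy so receivers can disconnect while being called
        for receiver in list(self._receivers):
            receiver(data)


class TrialLogic:
    """Collects a fixed number of updates per trial, then disconnects."""

    def __init__(self, updated, samples_per_trial=3):
        self.updated = updated
        self.samples_per_trial = samples_per_trial
        self.collected = []

    def run_trial(self):
        self.collected = []
        # make the connection at the start of the trial
        self.updated.connect(self.update)

    def update(self, data):
        self.collected.append(data)
        if len(self.collected) >= self.samples_per_trial:
            self.finish_trial()

    def finish_trial(self):
        # break the connection so updates between trials are ignored
        self.updated.disconnect(self.update)


updated = FakeTransmitter()
logic = TrialLogic(updated)
logic.run_trial()
for sample in range(10):
    updated.emit(sample)  # simulate the hardware producing data
print(logic.collected)
```

Only the first three simulated samples reach the callback; the rest are emitted after the connection has been broken.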

Alternatively, you can connect the stream once in prepare_daq and then start and stop the stream for each trial, rather than leaving it running and making and breaking the connection to your update callback. The main disadvantage of this approach is that some devices take a couple of seconds to start. The downside of the first approach is that the time from making the connection to the first call of the update callback varies, depending on when the connection is made relative to the most recent update from the hardware.

class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.updated.connect(self.update)

    def run_trial(self, trial):
        self.daqstream.start()

    def update(self, data):
        # do something with the data from the daqstream here
        pass
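
To put rough numbers on the variable delay of the connect-per-trial approach, here is a minimal sketch. It assumes an idealized device that emits updates at exact multiples of a fixed period; delay_to_first_update and the 0.1 s period are illustrative choices, not values or names taken from the library.

```python
import math


def delay_to_first_update(connect_time, update_period):
    """Time from connecting until the next update arrives.

    Assumes updates occur at exact multiples of `update_period`
    (an idealization; real hardware timing will jitter).
    """
    next_update = math.ceil(connect_time / update_period) * update_period
    return next_update - connect_time


update_period = 0.1  # seconds between updates (assumed value)
for connect_time in [0.01, 0.05, 0.09]:
    delay = delay_to_first_update(connect_time, update_period)
    print(f"connected at {connect_time:.2f} s, first update after {delay:.2f} s")
```

Under these assumptions the wait ranges from almost nothing up to nearly a full update period, depending on when the connection happens to be made.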