AxoPy

AxoPy is a system for creating human-computer interface experiments involving the use of electrophysiological signals, such as electromyography (EMG) or electroencephalography (EEG). It is intended to provide an infrastructure for rapidly developing simple experiments while allowing for more complex designs.

This documentation is split into two major components: a user guide and API documentation. The API documentation covers what the different modules of AxoPy can do, and the user guide is intended to describe how to use these modules and how they work together.

Documentation Contents

Installation

There are two major options for installing AxoPy: pip and (Ana)conda.

See also

If you’re interested in developing AxoPy itself, see the Development documentation.

pip

If you like to use a systemwide Python installation (such as the Python provided by your package manager on Linux or the official installer for Windows), you can use pip to install AxoPy from PyPI:

$ pip install axopy

You may also want to make use of venv to create a virtual environment first. This would allow you to install several different versions of AxoPy for different projects, for example:

$ python -m venv .venv
$ source .venv/bin/activate
(.venv) $ pip install axopy

Note that the second command above depends on your platform. See the venv documentation for more information.

conda

AxoPy can also be installed with miniconda or Anaconda via the conda-forge channel:

$ conda install -c conda-forge axopy

Similarly to the instructions above for pip installation, you may want to create a separate conda environment before installing:

$ conda create -c conda-forge -n axopy axopy

User Guide

This is a guide that covers most of AxoPy’s functionality in a narrative style. If you are new to AxoPy, this section of the documentation should be helpful in getting started implementing experiments.

Experiment Setup

The overall structure of an AxoPy application is handled by the Experiment. You can think of the Experiment as a manager of a number of tasks that, when run in succession, form an actual experimental protocol. Let’s get started with AxoPy by immediately writing a bit of code to produce a running experiment. We can then re-run the application after making a number of changes to get a feel for how to set up an Experiment.

Hello, Experiment

AxoPy is written for experiments that involve collecting data from a hardware input device and producing visual [1] feedback to the subject. For most of our examples, we’ll make use of the built-in Oscilloscope task and a built-in device that works without requiring special hardware, like the NoiseGenerator. So here’s how we use those two items to put together a simple but functioning experiment:

import axopy

daq = axopy.daq.NoiseGenerator()
exp = axopy.experiment.Experiment(daq=daq)
exp.run(axopy.task.Oscilloscope())

We create the Experiment object with a NoiseGenerator as the input device (or DAQ, short for data acquisition), then run the experiment with Oscilloscope as the sole task to run.

It’s worth noting here that AxoPy’s submodules (e.g. experiment, daq, etc.) are useful for organizing the package into logical parts, but it can be annoying to type the module names repeatedly. You can write the above example with more verbose imports like the following so the code itself is a little more succinct:

from axopy.daq import NoiseGenerator
from axopy.experiment import Experiment
from axopy.task import Oscilloscope

daq = NoiseGenerator()
exp = Experiment(daq=daq)
exp.run(Oscilloscope())

When you run this example, you’ll notice the first thing that happens is a dialog window pops up prompting you to enter a subject ID. The Experiment needs a subject ID so that it can set up data storage. Once the subject ID is entered and accepted, you’ll see a screen that says “Ready”. This screen is shown in between all tasks in the experiment—hit the Enter or Return key to accept the prompt and start the task. You should then see an oscilloscope widget displaying a randomly generated signal in real time. You can press Enter again to finish the task (this is specific to Oscilloscope which is a “free-running” task). When the task finishes, the Experiment looks for the next task to run. Since there aren’t any more, the application exits.

[1]At least visual. For now, AxoPy doesn’t have a standardized way to talk to other kinds of feedback-producing devices (an audio output module would be cool, PRs welcome). That said, AxoPy doesn’t do anything to prevent you from working with them either.
Experiment Configuration

Human-computer interface study designs often include one or more of the following complications:

  • subjects are split into groups
  • subjects are tested over multiple sessions
  • subjects fall into categories that require different configuration (e.g. mirror the screen contents for left-hand dominant subjects)

For these cases, Experiment provides the option to run a configuration step between creation of the experiment object and running the tasks. The options are entered in the same dialog window where you entered the subject ID in the example above. This allows you to set options on your tasks before running them or even run an entirely different list of tasks. It also means the person running an experiment (which isn’t necessarily the person who wrote the experiment code) doesn’t need to know how to write some configuration file or anything — they just run the experiment application and can enter the details in a graphical widget.

The Experiment.configure() method accepts as many configuration options as you want. You specify each one by providing a keyword argument with the option’s type (e.g. str, int, float) as the value, and it returns a dictionary with the values entered.

For example, say we want to input the subject’s age. We can do that with an int option called age:

from axopy.experiment import Experiment

exp = Experiment()
config = exp.configure(age=int)

print(config['age'])

If you run the code above, a dialog box will pop up just like it did for the first example, but now a text box for the subject ID and the age is shown. Note that you do not have to specify subject as an option—this is done for you. It’s up to you to handle the configuration options and modify how the experiment runs based on them.

Aside from primitive types like int, str, or float, you can enumerate all possible values for a configuration option, and these will be available to select in a combo box (drop-down menu). This way, the researcher running the experiment can’t enter an invalid value:

exp.configure(hand=('right', 'left'))
Tips for Experiment Writing

The Experiment class accepts a couple other keyword arguments that can be useful when debugging and/or developing an experiment application. You can specify a subject argument so that the configuration dialog isn’t shown when the Experiment is run:

from axopy.experiment import Experiment
from axopy.task import Oscilloscope
from axopy.daq import NoiseGenerator

exp = Experiment(daq=NoiseGenerator(), subject='test')
exp.run(Oscilloscope())

By default, if you run any tasks that write data to storage, AxoPy will complain and exit if you attempt to overwrite any data that exists. This will happen if you repeatedly run the Experiment with the same subject ID, so it can be useful (in conjunction with the subject keyword argument) to set allow_overwrite=True as well, quelling the error regarding overwriting data:

exp = Experiment(subject='test', allow_overwrite=True)

This setup is pretty handy when developing an experiment, just remember to switch it off! One way to make this a little more robust is to add a flag to your application so you have to explicitly enable this “debugging mode”.

How It Works

Skippable unless you want to dig into working on AxoPy itself

The Experiment manages a PyQt5 application and is responsible for giving each task a graphical container within the Qt application, access to hardware inputs, and data storage. The task implementation is responsible for making use of these experiment-wide resources and then handing control back to the experiment so it can run the next task.

Next Up

Now that we have an experiment running and the ability to set up some configuration options if needed, let’s look at how to write tasks.

Implementing Tasks

In AxoPy, the individual components of an experiment are tasks. In essence, a task does any or all of the following:

  • Takes in data from previous tasks (read)
  • Streams data from a data acquisition device (daq)
  • Processes the streaming or task data (proc)
  • Draws things to the screen (ui)
  • Outputs data of its own (write)

One example of a task is the Oscilloscope which we saw on the previous page. It is responsible for streaming data from a data acquisition device (daq) and displaying it in a dynamic plot (ui). The purpose of this task is usually just to allow the researcher to visually verify the integrity of the data coming in from the device before proceeding with the rest of the experiment.

Another example of a task is a cursor control task (subject produces input in attempt to hit targets on the screen). This kind of task reads in and processes data from an input device (daq), displays some information on screen to give feedback to the subject (ui), and records some data for post-experiment analysis (write). It may also require some calibration parameters from a previous task (read). This is a fairly complex task with an enormous number of possible implementations, so there’s no built-in CursorControlTask.

There isn’t really a strict definition of what a single task is or what it should or shouldn’t do, but it’s a good idea to make tasks as simple as possible — any given task should do just a couple things and do them well. This encourages modularity so you can re-use task implementations in different experiments, etc.

In terms of the AxoPy API, a task looks like the following:

_images/task_diagram.png

In this part of the user guide, we’ll go through how to make each of the four connections in the diagram and refer to separate documents for the details of working with those four components.

The Task Lifecycle

AxoPy experiments are event-driven, following the way graphical user interface frameworks tend to operate. This can be an unfamiliar way of writing programs, so it’s important to understand the overall idea before seeing some of the details. Tasks in an experiment all go through the same lifecycle, shown below. First, the Task instances are created (by you) and then they’re handed off to an Experiment, like we saw in the previous section:

exp = Experiment(...)
exp.run(Oscilloscope())

Once you call run(), the Experiment collects the task objects passed in, sets up the shared resources (data storage, data acquisition, graphical backend), then proceeds to prepare and run each task in sequence. That means pretty much all of the functionality of the experiment is defined in the Task classes.

The most important thing to understand about tasks is that they’re written by defining what should happen in response to certain events. For example, the Oscilloscope task defines a method that gets called every time there is new data available from the data acquisition stream, allowing it to update the signals displayed on the screen. This is sometimes referred to as a callback. You can think of the Experiment as running an infinite loop checking for events that occur, then dispatching the data from those events to the task if appropriate.

There are a standard set of methods that are automatically run by the Experiment the task belongs to, and you can optionally implement these methods to make use of the shared resources that the Experiment manages. These are the prepare methods: prepare_design, prepare_storage, prepare_input_stream, and prepare_graphics.

Say you’re writing a task that makes use of data storage only (read and write). A common example of this is processing some data to make it suitable for other tasks later on in the experiment. To interact with the storage functionality set up by the Experiment, your class should implement the Task.prepare_storage() method. If you click on the link to the API documentation for that method, you’ll see that a Storage object is passed into this method, which is provided by the Experiment. We’ll see more details about setting up storage specifically later on, but for the sake of the example, it’s sufficient to point out that the storage object lets you create reader and/or writer objects that you can save for use later on in your task:

from axopy.task import Task

class MyTask1(Task):

   def prepare_storage(self, storage):
      # here's where we can use the storage object to read data from other
      # tasks or write our own data to storage

The Task.prepare_design() is slightly different from the others in that it’s not actually for setting up a shared resource. It’s actually just an opportunity for your task to build a Design so that it can easily be written in terms of a series of repeated trials.

After the rest of the prepare methods are called, the Task.run() method is called. This is where your task should set up its own events and start running. In “experiment tasks” (those implementing prepare_design), the flow of the task proceeds through it’s Design by calling Task.next_trial`.

There are two main ways for a task to end. One is by calling Task.finished() somewhere in your task implementation. This signals to the Experiment that the task is done, then the Task.finish() method is called so you can clean up anything you need to before the next task runs. A common example of cleanup is to make sure the DAQ is stopped.

The flowchart below shows the lifecycle of a Task when it’s run by an Experiment.

_images/task_flowchart.png

Data Acquisition

Traditionally, data acquisition (DAQ) refers to the process of capturing and conditioning signals for recording by a computer. In AxoPy, any source of data generated or influenced by the subject of the experiment is referred to as a DAQ. AxoPy doesn’t include built-in support for hardware aside from things commonly available with just a desktop computer (mouse, keyboard).

AxoPy assumes a fairly simple model for collecting data, based on polling. First, the interface is set up – this might involve initializing a USB interface, connecting to a TCP server, setting up initial parameters, etc. Next, data acquisition is started. Some devices don’t require an explicit start command, but some do. Next, you request data from the device. This is a blocking operation, meaning the request won’t return the data until the data is ready. You’re then free to process, display, or save this data. Then, you request the next batch of data with another request. It is important to make sure consecutive requests occur frequently enough that you don’t fall behind.

For example, imagine you set up a device to acquire data at 1000 Hz in bunches of 100 samples:

from axopy.daq import NoiseGenerator

daq = NoiseGenerator(rate=1000, read_size=100)

daq.start() # NoiseGenerator doesn't require this, but most do
for i in range(10):
    data = daq.read()
    process_data(data)
daq.stop() # again, NoiseGenerator doesn't require this

Here, you’ll want to ensure that the process_data() function does not take longer than 100 ms to complete, or data acquisition will fall behind the rate at which it is generated.

Some DAQs are built in to AxoPy, but of course not all of them can be. Check out pymcc and pytrigno for a couple examples of working with real data acquisition hardware.

The DaqStream

One thing to notice about the code above is that every time the daq.read() operation occurs, no other code is being run while waiting for the device to return the new data. This is sometimes referred to as a blocking operation. In AxoPy, we usually want some things to be happening while the device is reading in data in the background. This where the DaqStream comes in – a threaded interface to the underlying hardware.

You’ll usually set up your DAQ as above (e.g. daq = NoiseGenerator(...)), pass it to the Experiment as a shared resource, then make use of the DaqStream object made available by the Experiment in your Task implementation. The DaqStream has a uniform interface so no matter what kind of hardware you’re using, your task implementation doesn’t need to care about how that all works. You just start/stop and connect/disconnect from the stream. In order to facilitate this uniform interface, the device the DaqStream wraps needs to expose a specific API as well. This is defined below:

DAQ API
   start() - Called once before the first read.

   read()  - Request a new buffer of data from the hardware. Parameters (like
             the size of the buffer or number of samples to read) should be
             set up in the daq constructor. Canonical AxoPy devices generate
             a NumPy ndarray with shape (n_channels, n_samples), but the only
             real restriction is that your Pipelines can consume the data
             generated by your device.

   stop()  - Called when the user wants the device to stop reading data.

In addition, the DAQ implementation should raise an IOError if something goes wrong during data acquisition.

An example of setting up a DaqStream in the context of a custom Task is given in the recipes page.

Data Storage

The AxoPy data storage model was created so that your experimental dataset can be easily passed on to others and read using a variety of tools on any platform, meaning AxoPy is not required to use the raw dataset. It was also designed so that you can browse the dataset using a standard file browser so you do not need to write any code to have a peek.

Data is stored in a hierarchical fashion using a specific file structure and common file types. There are two types of files: comma separated value (CSV) files for trial data (one row per trial) and HDF5 files for array data (one file per trial). Here’s the file structure in abstract terms:

data/
    subject_id/
        task_name/
            file: trials.csv
            file: array_type1.hdf5
            file: array_type2.hdf5

You have the root of the entire dataset, containing a subfolder for each subject. Each subject folder contains a set of subfolders, one per task. The task subfolders contain a single CSV file which contains all of the attributes (scalars) for each trial, and it contains HDF5 files which store array data, one for each kind of array (e.g. an emg.hdf5 file containing raw electrophysiological recordings for each trial and a cursor.hdf5 file containing cursor positions for each trial).

As an concrete example, suppose you are running an experiment where subjects are tasked with contracting a muscle to a specified level, determined using electromyography (EMG). For each trial, you want to store the following:

  • the time it took for the subject to reach the desired contraction level for each trial
  • the number of times the contraction level went past the desired level (overshoots)
  • the raw EMG signals, which are recorded at 2 kHz
  • the current “level of contraction,” which is computed by processing the EMG signals through some processing pipeline you have set up at 10 Hz

The trial data variables here are time to target and overshoots, so these are placed in a CSV file with one row per trial:

trial time_to_target overshoots
0 3.271942 1
1 2.159271 0
2 3.212450 2

Since you have two different array-like entities to store (raw EMG data at 2 kHz and processed position at 10 Hz), you create two different array types: emg and level. They are placed in separate subfolders of the task and each one is stored as an array in a HDF5 file, with one HDF5 dataset (in the root group) per trial. The result of all of this is a structure that looks like:

data_root/
    subject_id/
        contraction_level_task/
            file: trials.csv
            file: emg.hdf5
            file: level.hdf5

The HDF5 format was chosen for all array data because it naturally works with NumPy arrays, which are the assumed container for data as it goes from a hardware device through processing code to computer interaction. It also saves the arrays in a binary format instead of converting to strings as something like numpy.savetxt would do, potentially reducing the size of a whole experiment’s dataset significantly if you store many arrays representing high-frequency electrophysiological recordings.

The goals of this storage layout are to be simple to implement and reason about, to allow for manual browsing of the dataset, and to enable simultaneous sessions (i.e. multiple researchers running the experiment with different subjects) with a very simple and intuitive data merging procedure (i.e. just designate a single root folder and move all subject data there). The layout is not optimized for processing and analyzing data once an experiment is complete, however. For that, see Data Consolidation.

Experiment-Level Storage

The top-level Storage class handles the first two layers of the storage hierarchy: subjects and tasks. It is initialized at the beginning of each session and (e.g. once per subject for a single-session experiment) and the data storage hierarchy is built for each subject. Initializing and adding subjects is typically handled for you by axopy.experiment.Experiment in the context of running an experiment. Once a task is given access to the Storage object, however, it is up to the task implementation to set up TaskReader objects for reading data from other tasks and TaskWriter objects for storing its own data. This is done by calling Storage.require_task() and Storage.create_task(), respectively.

Task Storage

Task storage is designed to make implementing a task’s data reading and writing as simple as possible, while being flexible enough to accommodate different kinds of experimental design. If you are interested in processing data after an experiment has been run, see the Data Consolidation documentation.

Reading Data from Another Task

The TaskReader is used for reading in data from another task. In the context of an experiment, you would access a reader with Storage.require_task(), passing in the name of the task (i.e. the name of the directory corresponding to the task). You can then access the trial data (attrs) with the trials attribute, which returns a pandas DataFrame object. You can also access array data either by reading it all at once (arrays for each trial are stacked) or by iterating over each trial’s array.

Keeping with our example above, suppose we want to run the EMG data from the contraction_level_task through a processing pipeline.

# storage can either be created for post-processing
# or it can be given to us if this is another task implementation
reader = storage.require_task('contraction_level_task')
for emg in reader.iterarray('emg'):
    # emg holds the EMG data for a single trial
    out = pipeline.process(emg)
    ...

It is also common to need the trial attributes while iterating over the trial arrays, and this can be achieved using zip and the DataFrame.iterrows method:

for (i, trial_attrs), emg in zip(reader.trials.iterrows(),
                                 reader.iterarray('emg')):
    if trial_attrs['time_to_target'] > 5.0:
        continue
    out = pipeline.process(emg)
    ...
Data Consolidation

Most of the data reading and writing functionality implemented in AxoPy is designed to make implementing an experiment as easy as possible, but there are some convenience functions for compiling an experiment’s dataset into something more amenable to post-processing and analysis.

Archiving Raw Data

In most cases, you’ll want to archive your entire untouched dataset once an experiment is complete, or maybe even periodically as an experiment is performed. For this purpose, there is the storage_to_zip() function, which creates a ZIP archive of the data contained in the root storage directory. It’s usage is fairly simple, since it does a simple task. You pass it the path to your data storage root directory, which can be relative to the directory you run the function from. Let’s say you have some data in a folder called experiment01_data:

>>> from axopy.storage import storage_to_zip
>>> storage_to_zip('experiment01_data')

There should now be a file called experiment01_data.zip in the current directory, containing a copy of the whole dataset hierarchy. You can also specify an output file if you don’t like the default:

>>> from axopy.storage import storage_to_zip
>>> storage_to_zip('experiment01_data', outfile='dataset.zip')

Graphics

Each task in an AxoPy experiment is given a Container. The container is effectively an empty QWidget from Qt, so you can set up its contents quite flexibly. That is, any valid QWidget or QLayout can be used as the container’s contents, so you can create arbitrarily complex graphics for a task.

To set up graphics for a task, override the Task.prepare_graphics method, which takes the Container as an input argument, then use Container.set_widget() to establish the main widget for the task.

from axopy.task import Task

class CanvasTask(Task):

    def prepare_graphics(self, container):
        # set up graphical widget/layout here
        widget = ...
        container.set_widget(widget)

While you can always set up completely custom graphics using PyQt5 classes directly, AxoPy includes some graphical elements commonly used in human-computer interface experiments, making it possible to write experiments without knowing how to use Qt.

Note

In the examples below, get_qtapp() will be used to demonstrate different graphical widgets and layouts. This function creates or retrieves a QApplication instance. We can then use app.exec_() to run the Qt event loop and test out the graphics code.

Built-In Graphics Widgets
Canvas Graphics

The axopy.gui.canvas module contains a Canvas class which can be directly inserted into a container. You can then add items like a Circle or Text to the canvas. In the context of a task, you can create a canvas as follows:

from axopy.gui.main import get_qtapp
from axopy.gui.canvas import Canvas, Circle

app = get_qtapp()

canvas = Canvas()
canvas.add_item(Circle(0.1, color='red'))

canvas.show()
app.exec_()

All of the built-in items inherit from the Item class, which means they all have a number of properties that can be set, such as the position and visibility.

canvas = Canvas()
circle = Circle(0.1)
canvas.add_item(circle)

# set the x coordinate
circle.x = 0.5
# read the y coordinate
y = circle.y
# hide the circle
circle.hide()

All of the Item classes are actually just wrappers around QGraphicsItem classes. In general, the various subclasses of QGraphicsItem (e.g. QGraphicsEllipseItem) have a large number of methods that may not be exposed by AxoPy, so all items have a qitem attribute pointing to the underlying QGraphicsItem. For example, the Line item wraps a QGraphicsLineItem. In AxoPy, a line is just a solid line with a specific cap style. If you need to customize this behavior, you can use the qitem attribute and dig into the Qt API:

from axopy.gui.canvas import Line

# horizontal line 0.4 units long
line = Line(-0.2, 0, 0.2, 0)
Custom Items

Processing Data

AxoPy’s pipeline subpackage is a small infrastructure for processing data in a pipeline style. You create pipeline blocks, then connect them up with an efficient (but still readable) syntax. It was originally created for flexibly creating pipelines in real-time signal processing applications, but it can be useful in offline applications as well.

In pipeline, data processing is implemented through a Pipeline. A pipeline is a series of processing routines for transforming raw input data (e.g. electrophysiological data such as EMG) into useful output, such as the velocity of a cursor on the screen. These routines can usually be broken down into blocks which have common functionality.

Common Blocks

The typical picture for an electrophysiological signal processing pipeline looks something like:

         Input
           ↓
┌──────────────────────┐
│       Windowing      │
└──────────────────────┘
           ↓
┌──────────────────────┐
│     Conditioning     │
└──────────────────────┘
           ↓
┌──────────────────────┐
│  Feature Extraction  │
└──────────────────────┘
           ↓
┌──────────────────────┐
│  Intent Recognition  │
└──────────────────────┘
           ↓
┌──────────────────────┐
│    Output Mapping    │
└──────────────────────┘
           ↓
         Output

Each block in this example is really a type of processing block, and the actual processing involved in each can vary. Some common cases are implemented, but creating custom blocks and connecting them together in a pipeline structure is simple. Also, the picture above shows a simple series structure, where each block takes input only from the block before it. More complex structures are sometimes convenient or necessary, and some complexity is supported.

Windowing

Windowing involves specifying a time window over which the rest of the pipeline will operate. That is, a windower keeps track of the current input data and optionally some data from the past, concatentating the two and passing it along. This is useful for calculating statistics over a sufficient sample size while updating the pipeline output at a rapid rate, achieved by overlapping windows. In an offline processing context (i.e. processing static recordings), windowing also specifies how much data to read in on each iteration through the recording.

Windowing is handled by a Windower.

Conditioning

Raw data conditioning (or pre-processing) usually involves things like filtering and normalization. Usually the output of a conditioning block does not fundamentally change the representation of the input.

Feature Extraction

Features are statistics computed on a window of input data. Generally, they should represent the information contained in the raw input in a compact way. For example, you might take 100 samples of data from six channels of EMG and calculate the root-mean-square value of each channel during that 100-sample window of time. This results in an array of length 6 which represents the amplitude of each channel in the high-dimensional raw data. A feature extractor is just a collection of features to compute from the input.

Features in pipeline are classes that take all of their parameters in __init__ and perform their operation on the input in a compute method.

Features are typically used by adding a handful of them to a FeatureExtractor and putting that extractor in a Pipeline.

Intent Recognition

Intent recognition is the prediction or estimation of what the user intends to do based on the signals generated. An example would be a large signal sensed at the group of extensor muscles in the forearm for communicating “wrist extension.” Sometimes this mapping can be specified a priori, but most of the time we rely on machine learning techniques to infer this mapping from training data.

Connecting Blocks

The core module is a small infrastructure for processing data in a pipeline style. You create or use the built-in Block objects, then connect them up with an efficient (but still readable) syntax with a Pipeline.

The syntax for expressing pipeline structure is based on lists and tuples. Lists hold elements that are connected in series:

[a, b]:

    ─a─b─

The input is whatever a takes, and the output is whatever b outputs. Tuples hold elements that are connected in parallel:

(a, b):

     ┌─a─┐
    ─┤   ┝━
     └─b─┘

The input goes to both a and b, and the output is whatever a and b output in a list. If we connect another element in series with a parallel block, it must be a block that handles multiple inputs:

[(a, b), c]:

     ┌─a─┐
    ─┤   ┝━c─
     └─b─┘

The bottom line is: pipeline blocks accept input types and they specify the output types. You are responsible for ensuring that pipeline blocks can be connected as specified.

Sometimes, you might want to pass the output of a block to some block structure and somewhere downstream. To handle this case, there is a PassthroughPipeline that you can use as a block within another pipeline:

passthrough pipeline p ← (b, c):

     ┌─────┐
     ├─b─┐ │
    ─┤   ┝━┷━
     └─c─┘

[a, p, d]:
                   ┌─────┐
                   ├─b─┐ │
    ─a─p━d─  →  ─a─┤   ┝━┷━d─
                   └─c─┘

The pass-through pipeline places its own output(s) after its input, so the input is accesible on the other side. There are cases where this type of structure is possible with a list/tuple expression, but sometimes the pass-through pipeline as a block is needed. The above example is one of those cases.

Implementing Pipeline Blocks

Pipeline blocks are simple to implement. It is only expected that you implement a process() method which takes one argument (data) and returns something. For multi-input blocks, you’ll probably want to just expand the inputs right off the bat (e.g. in_a, in_b = data). Usually, the output is some processed form of the input data:

import axopy.pipeline as pipeline

class FooBlock(pipeline.Block):
    def process(self, data):
        return data + 1

class BarBlock(pipeline.Block):
    def process(self, data):
        return 2 * data

With some blocks implemented, the list/tuple syntax described above is used for specifying how they are connected:

a = FooBlock()
b = BarBlock()
p = pipeline.Pipeline([a, b])

Now, you just give the pipeline input and get its output:

data = 3
result = p.process(data)

In this case, the result would be 2 * (data + 1) == 8.

Post-Process Hooks

Sometimes, it’s useful to be able to hook into some block in the pipeline to retrieve its data in the middle of a run through the pipeline. For instance, let’s say you have a simple pipeline:

[a, b]:

    ─a─b─

You run some data through the pipeline to get the result from block b, but you also want to run some function with the output of a. Block takes a hooks keword argument which takes a list of functions to execute after the block’s process method finishes. To use hooks, make sure your custom block calls the parent Block __init__ method. For example:

import axopy.pipeline as pipeline

class FooBlock(pipeline.Block):
    def __init__(self, hooks=None):
        super(FooBlock, self).__init__(hooks=hooks)

    def process(self, data):
        return data + 1

class BarBlock(pipeline.Block):
    def process(self, data):
        return 2 * data

def foo_hook(data):
    print("FooBlock output is %d".format(data))

a = FooBlock(hooks=[foo_hook])
b = BarBlock()

p = pipeline.Pipeline([a, b])
result = p.process(3)

Now, the call to process on the pipeline will input 3 to block a, block a will add 1 then print FooBlock output is 4, and then 4 will be passed to block b, which will return 8.

Some Task Recipes

The following are just some recipes for tasks or pieces of tasks that are somewhat common. Note that these are for illustration purposes and won’t always run as-is.

The Basics
Storing Data

Storing data within a task is typically a two-step process. First, you implement prepare_design() to set up the attributes and arrays (with initial values) for each trial and the trial/block structure. Then you implement prepare_storage() to get access to a new TaskWriter. When your trial is finished, you can use the task’s trial attribute to write the trial data to disk using the task writer.

class CustomTask(Task):

    def prepare_design(self, design):
        block = design.add_block()
        for pos in [0, 0.2, 0.4]:
            block.add_trial(attrs={'pos': pos})
        block.shuffle()

    def prepare_storage(self, storage):
        self.writer = storage.create_task('custom_task')

    # ... task implementation here

    def finish_trial(self):
        self.writer.write(self.trial)

        # call next_trial() to start the next trial
        # could instead start a timer if you want a timeout between trials
        self.next_trial()
Using Input Hardware

To make use of an input device (DAQ), implement prepare_daq() to gain access to the stream interface, get it running, then connect its updated transmitter to a callback that you define.

class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.start()

    def run_trial(self, trial):
        self.daqstream.updated.connect(self.update)

    def update(self, data):
        # do something with the data from the daqstream here

You may instead want to connect the stream in prepare_daq and start and stop the stream (as opposed to letting it run and making/breaking the connection to your update callback). The main disadvantage to this approach is some devices may take a couple seconds to start. The downside of the other approach though is the time from making the connection to the first call of the update callback is variable depending on when exactly the connection is made with respect to the most recent update from the hardware.

class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.updated.connect(self.update)

    def run_trial(self, trial):
        self.daqstream.start()

    def update(self, data):
        # do something with the data from the daqstream here

Examples

Here are some complete examples that can be run. Click on an example to see the source code and some comments about the example.

Experiment Setup Options

Demonstration of several ways to instantiate an experiment.

simple
The most straightforward usage. You pass the hardware device to create the experiment, then run a task. Subject configuration is handled automatically.
customized
A customized experiment setup. A “config” step is used before Experiment.run() to allow the researcher to select the subject group for the current session (“A” or “B”).
import argparse
from axopy.experiment import Experiment
from axopy.task import Oscilloscope
from axopy.daq import NoiseGenerator

daq = NoiseGenerator(rate=2000, num_channels=6, read_size=200)


def run():
    """Main function of the example. Runs each demo and then exits."""
    customized()


def simple():
    # subject is not given, so it is configured in run
    exp = Experiment(daq=daq).run(Oscilloscope())


def customized():
    exp = Experiment(daq=daq)

    # optional config step, subject field is implied
    config = exp.configure(group=('A', 'B'))

    # here you can retrieve the selected group via `config['group']`

    # run list of tasks
    exp.run(Oscilloscope())


def debug():
    # subject is given, so no configure step is needed
    exp = Experiment(daq=daq, data='/tmp/data', subject='test').run(
        Oscilloscope())


if __name__ == '__main__':
    functions = {
        'simple': simple,
        'customized': customized,
    }

    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument(
        'function',
        help='Function in the example script to run.')
    args = parser.parse_args()

    if args.function not in functions:
        print("{} isn't a function in the example.".format(args.function))
        sys.exit(-1)
    else:
        functions[args.function]()

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery

Built-In Devices

This example demonstrates some input devices built into AxoPy for testing. Pass the following options to try out different devices:

rainbow
Basic use of an NoiseGenerator to show lots of colorful random data.
keyboard
Basic use of a Keyboard to show roughly-timed keyboard inputs.
keystick
Neat use of a filter to get joystick-like inputs from a keyboard.
mouse
Basic use of a Mouse for velocity input.
import sys
import argparse
import numpy as np
from axopy.task import Oscilloscope
from axopy.experiment import Experiment
from axopy.daq import NoiseGenerator, Keyboard, Mouse
from axopy.pipeline import Pipeline, Callable, Windower, Filter


def rainbow():
    dev = NoiseGenerator(rate=2000, num_channels=16, read_size=200)
    run(dev)


def keyboard():
    dev = Keyboard()
    # need a windower to show something interesting in the oscilloscope
    pipeline = Pipeline([Windower(10)])
    run(dev, pipeline)


def keystick():
    dev = Keyboard(rate=20, keys=list('wasd'))
    pipeline = Pipeline([
        # window to average over
        Windower(10),
        # mean along rows
        Callable(lambda x: np.mean(x, axis=1, keepdims=True)),
        # window to show in the oscilloscope
        Windower(60)
    ])
    run(dev, pipeline)


def mouse():
    dev = Mouse(rate=20)
    pipeline = Pipeline([
        # just for scaling the input since it's in pixels
        Callable(lambda x: x/100),
        # window to show in the oscilloscope
        Windower(40)
    ])
    run(dev, pipeline)


def run(dev, pipeline=None):
    # run an experiment with just an oscilloscope task
    Experiment(daq=dev, subject='test').run(Oscilloscope(pipeline))


if __name__ == '__main__':
    functions = {
        'rainbow': rainbow,
        'keyboard': keyboard,
        'keystick': keystick,
        'mouse': mouse,
    }

    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument(
        'function',
        help='Function in the example script to run.')
    args = parser.parse_args()

    if args.function not in functions:
        print("{} isn't a function in the example.".format(args.function))
        sys.exit(-1)
    else:
        functions[args.function]()

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery

Adaptive Cursor Control Mapping

This example contains a 2D cursor-to-target task which processes input signals from a data acquisition device (e.g. from EMG hardware) and adaptively learns a linear mapping from input magnitude to cursor position via the recursive least squares (RLS) algorithm.

Once the cursor interface is shown, press the “Enter” key to begin. The target will move to some location on the screen and the subject should attempt to move the cursor toward the target. As input data is collected, the recursive least squares algorithm updates the weights of a linear mapping from input amplitude to cursor position. Once this training procedure is finished, the target changes color and the subject can attempt to hit the targets with the mapping now fixed.

import numpy
import random
from scipy.signal import butter

from axopy import pipeline
from axopy.features import mean_absolute_value
from axopy.experiment import Experiment
from axopy import util
from axopy.task import Task, Oscilloscope
from axopy.daq import NoiseGenerator
from axopy.timing import Counter
from axopy.gui.canvas import Canvas, Circle, Cross


class RLSMapping(pipeline.Block):
    """Linear mapping of EMG amplitude to position updated by RLS.

    Parameters
    ----------
    m : int
        Number of vectors in the mapping.
    k : int
        Dimensionality of the mapping vectors.
    lam : float
        Forgetting factor.
    """

    def __init__(self, m, k, lam, delta=0.001):
        super(RLSMapping, self).__init__()
        self.m = m
        self.k = k
        self.lam = lam
        self.delta = delta
        self._init()

    @classmethod
    def from_weights(cls, weights):
        """Construct an RLSMapping static weights."""
        obj = cls(1, 1, 1)
        obj.weights = weights
        return obj

    def _init(self):
        self.w = numpy.zeros((self.k, self.m))
        self.P = numpy.eye(self.m) / self.delta

    def process(self, data):
        """Just applies the current weights to the input."""
        self.y = data
        self.xhat = self.y.dot(self.w.T)
        return self.xhat

    def update(self, x):
        """Update the weights with the teaching signal."""
        z = self.P.dot(self.y.T)
        g = z / (self.lam + self.y.dot(z))
        e = x - self.xhat
        self.w = self.w + numpy.outer(e, g)
        self.P = (self.P - numpy.outer(g, z)) / self.lam


class CursorFollowing(Task):
    # TODO split this into two tasks (a "training" task and a "practice" task).
    # This would involve storing the RLS weights and loading them for the
    # practice task. Probably a good idea to write a simple cursor interface
    # class to share common code between the two tasks.

    target_dist = 0.8

    def __init__(self, pipeline):
        super(CursorFollowing, self).__init__()
        self.pipeline = pipeline

    def prepare_design(self, design):
        d = self.target_dist
        target_positions = [(d, 0), (0, d), (-d, 0), (0, -d), (0, 0)]
        for training in [True, False]:
            block = design.add_block()
            for x, y in target_positions:
                block.add_trial(attrs={
                    'target_x': x,
                    'target_y': y,
                    'training': training
                })
            block.shuffle()

    def prepare_graphics(self, container):
        self.canvas = Canvas()
        self.cursor = Circle(0.05, color='#aa1212')
        self.target = Circle(0.1, color='#32b124')
        self.canvas.add_item(self.target)
        self.canvas.add_item(self.cursor)
        self.canvas.add_item(Cross())
        container.set_widget(self.canvas)

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.start()

        self.timer = Counter(50)
        self.timer.timeout.connect(self.finish_trial)

    def run_trial(self, trial):
        if not trial.attrs['training']:
            self.target.color = '#3224b1'
        self._reset()
        self.target.pos = trial.attrs['target_x'], trial.attrs['target_y']
        self.target.show()
        self.pipeline.clear()
        self.connect(self.daqstream.updated, self.update)

    def update(self, data):
        xhat = self.pipeline.process(data)
        self.cursor.pos = xhat

        target_pos = numpy.array([self.trial.attrs['target_x'],
                                  self.trial.attrs['target_y']])
        if self.trial.attrs['training']:
            self.pipeline.named_blocks['RLSMapping'].update(target_pos)

        if self.cursor.collides_with(self.target):
            self.finish_trial()

        self.timer.increment()

    def finish_trial(self):
        self.disconnect(self.daqstream.updated, self.update)
        self._reset()
        self.next_trial()

    def _reset(self):
        self.cursor.pos = 0, 0
        self.timer.reset()
        self.target.hide()

    def finish(self):
        self.daqstream.stop()

    def key_press(self, key):
        if key == util.key_escape:
            self.finish()
        else:
            super().key_press(key)


if __name__ == '__main__':
    dev = NoiseGenerator(rate=2000, num_channels=4, read_size=200)

    b, a = butter(4, (10/2000./2., 450/2000./2.), 'bandpass')
    preproc_pipeline = pipeline.Pipeline([
        pipeline.Windower(400),
        pipeline.Centerer(),
        pipeline.Filter(b, a=a, overlap=200),
    ])
    main_pipeline = pipeline.Pipeline([
        preproc_pipeline,
        pipeline.Callable(mean_absolute_value),
        RLSMapping(4, 2, 0.99)
    ])

    Experiment(daq=dev, subject='test').run(
        Oscilloscope(preproc_pipeline),
        CursorFollowing(main_pipeline)
    )

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery

Gallery generated by Sphinx-Gallery

API

These are the modules/subpackages which constitute AxoPy.

daq

Protocol and threaded interface for data acquisition.

class axopy.daq.DaqStream(device)[source]

Asynchronous interface to an input device.

Runs a persistent while loop wherein the device is repeatedly polled for data. When the data becomes available, it is emitted and the loop continues.

There are effectively two methods of this class: start and stop. These methods do as their names suggest – they start and stop the underlying device from sampling new data.

The device used to create the DaqStream is also accessible via the device attribute so you can change settings on the underlying device any time (e.g. sampling rate, number of samples per update, etc.).

Parameters:device (daq) – Any object implementing the AxoPy data acquisition interface. See NoiseGenerator for an example.
updated

Transmitted when the latest chunk of data is available. The data type depends on the underlying input device, but it is often a numpy ndarray.

Type:Transmitter
disconnected

Transmitted if the device cannot be read from (it has disconnected somehow).

Type:Transmitter
finished

Transmitted when the device has stopped and samping is finished.

Type:Transmitter
run()[source]

Implementation for the underlying QThread.

Don’t call this method directly – use start() instead.

running

Boolean value indicating whether or not the stream is running.

start()[source]

Start the device and begin reading from it.

stop(wait=True)[source]

Stop the stream.

Parameters:wait (bool, optional) – Whether or not to wait for the underlying device to stop before returning.
class axopy.daq.Keyboard(rate=10, keys=None)[source]

Keyboard input device.

The keyboard device works by periodically sampling (with the rate specified) whether or not the watched keys have been pressed since the last sampling event. The output is a numpy array of shape (n_keys, 1), where the numerical values are booleans indicating whether or not the corresponding keys have been pressed.

Parameters:
  • rate (int, optional) – Sampling rate, in Hz.
  • keys (container of str, optional) – Keys to watch and use as input signals. The keys used here should not conflict with the key used by the Experiment to start the next task.

Notes

There are a couple reasonable alternatives to the way the keyboard device is currently implemented. One way to do it might be sampling the key states at a given rate and producing segments of sampled key state data, much like a regular data acquisition device. One issue is that actual key state (whether the key is being physically pressed or not) doesn’t seem to be feasible to find out with Qt. You can hook into key press and key release events, but these are subject to repeat delay and repeat rate.

Another possible keyboard device would be responsive to key press events themselves rather than an input sampling event. While Qt enables event-based keyboard handling, the method used here fits the input device model, making it easily swappable with other input devices.

eventFilter(self, QObject, QEvent) → bool[source]
read()[source]

Read which keys have just been pressed.

Returns:data – A boolean array with a 1 indicating the corresponding key has been pressed and a 0 indicating it has not.
Return type:ndarray, shape (n_keys, 1)
reset()[source]

Reset the input device.

start()[source]

Start the keyboard input device.

stop()[source]

Stop the keyboard input device.

You may need to stop the device in case you want to be able to use the keys watched by the device for another purpose.

class axopy.daq.Mouse(rate=10, position=False)[source]

Mouse input device.

The mouse device works by periodically sampling (with the rate specified) the mouse position within the AxoPy experiment window. The output is in the form of a numpy array of shape (2, 1), representing either the change in position (default) or the absolute position in the window.

Parameters:
  • rate (int, optional) – Sampling rate, in Hz.
  • position (bool, optional) – Whether or not to return the mouse’s position (instead of the position difference from the prevoius sample).

Notes

In Qt’s coordinate system, the positive y direction is downward. Here, this is inverted as a convenience (upward movement of the mouse produces a positive “velocity”).

Mouse events are intercepted here but they are not consumed, meaning you can still use the mouse to manipulate widgets in the experiment window.

eventFilter(self, QObject, QEvent) → bool[source]
read()[source]

Read the last-updated mouse position.

Returns:data – The mouse “velocity” or position (x, y).
Return type:ndarray, shape (2, 1)
reset()[source]

Clear the input device.

start()[source]

Start sampling mouse movements.

stop()[source]

Stop sampling mouse movements.

class axopy.daq.NoiseGenerator(rate=1000, num_channels=1, amplitude=1.0, read_size=100)[source]

An emulated data acquisition device which generates random data.

Each sample of the generated data is sampled from a zero-mean Gaussian distribution with variance determined by the amplitude specified, which corresponds to three standard deviations. That is, approximately 99.7% of the samples should be within the desired peak amplitude.

NoiseGenerator is meant to emulate data acquisition devices that block on each request for data until the data is available. See read() for details.

Parameters:
  • rate (int, optional) – Sample rate in Hz. Default is 1000.
  • num_channels (int, optional) – Number of “channels” to generate. Default is 1.
  • amplitude (float, optional) – Approximate peak amplitude of the signal to generate. Specifically, the amplitude represents three standard deviations for generating the Gaussian distributed data. Default is 1.
  • read_size (int, optional) – Number of samples to generate per read() call. Default is 100.
read()[source]

Generates zero-mean Gaussian data.

This method blocks (calls time.sleep()) to emulate other data acquisition units which wait for the requested number of samples to be read. The amount of time to block is calculated such that consecutive calls will always return with constant frequency, assuming the calls occur faster than required (i.e. processing doesn’t fall behind).

Returns:data – The generated data.
Return type:ndarray, shape (num_channels, read_size)
reset()[source]

Reset the device back to its initialized state.

start()[source]

Does nothing for this device. Implemented to follow device API.

stop()[source]

Does nothing for this device. Implemented to follow device API.

design

Task design containers.

class axopy.design.Design[source]

Top-level task design container.

The Design is a list of Block objects, which themselves are lists of Trial objects.

add_block()[source]

Add a block to the design.

Returns:block – The created block.
Return type:design.Block
class axopy.design.Block(index, *args, **kwargs)[source]

List of trials.

Experiments often consist of a set of blocks, each containing the same set of trials in randomized order. You usually shouldn’t need to create a block directly – use Design.add_block() instead.

Parameters:index (int) – Index of the block in the design. This is required to pass along to each trial in the block, so that the trial knows which block it belongs to.
add_trial(attrs=None)[source]

Add a trial to the block.

A Trial object is created and added to the block. You can optionally provide a dictionary of attribute name/value pairs to initialize the trial.

Parameters:attrs (dict, optional) – Dictionary of attribute name/value pairs.
Returns:trial – The trial object created. This can be used to add new attributes or arrays. See Trial.
Return type:Trial
shuffle(reset_index=True)[source]

Shuffle the block’s trials in random order.

Parameters:reset_index (bool, optional) – Whether or not to set the trial attribute of each trial such that they remain in sequential order after shuffling. This is the default.
class axopy.design.Trial(attrs)[source]

Container of trial data.

There are two kinds of data typically needed during a trial: attributes and arrays. Attributes are scalar quantities or primitives like integers, floating point numbers, booleans, strings, etc. Arrays are NumPy arrays, useful for holding things like cursor trajectories.

There are two primary purposes for each of these two kinds of data. First, it’s useful to design a task with pre-determined values, such as the target location or the cursor trajectory to follow. The other purpose is to temporarily hold runtime data using the same interface, such as the final cursor position or the time-to-target.

You shouldn’t normally need to create a trial directly – instead, use Block.add_trial().

attrs

Dictionary mapping attribute names to their values.

Type:dict
arrays

Dictionary mapping array names to Array objects, which contain the array data.

Type:dict
add_array(name, **kwargs)[source]

Add an array to the trial.

Parameters:
  • name (str) – Name of the array.
  • kwargs (dict) – Keyword arguments passed along to Array.
class axopy.design.Array(data=None, stack_axis=1)[source]

Trial array.

The array is not much more than a NumPy array with a stack() method for conveniently adding new data to the array. This is useful in cases where you iteratively collect new segments of data and want to concatenate them. For example, you could use an Array to collect the samples from a data acquisition device as they come in.

You usually don’t need to create an array manually – instead, use Trial.add_array().

Parameters:
  • data (ndarray, optional) – Data to initialize the array with. If None, the first array passed to stack() is used for initialization.
  • stack_axis (int, optional) – Axis to stack the data along.
data

The NumPy array holding the data.

Type:ndarray, optional
clear()[source]

Clears the buffer.

Anything that was in the buffer is not retrievable.

stack(data)[source]

Stack new data onto the array.

Parameters:data (ndarray) – New data to add. The direction to stack along is specified in the array’s constructor (stack_axis).

experiment

Experiment workflow and design.

class axopy.experiment.Experiment(daq=None, data='data', subject=None, allow_overwrite=False)[source]

Experiment workflow manager.

Presents the researcher with a prompt for entering session details and then presents the appropriate tasks.

Parameters:
  • daq (object, optional) – A data acquisition device that follows the AxoPy DAQ protocol. See axopy.daq.
  • data (str, optional) – Path to the data. The directory is created for you if it doesn’t exist.
  • subject (str, optional) – The subject ID to use. If not specified, a configuration screen is shown before running the tasks so you can enter it there. This is mostly for experiment writing (to avoid the extra configuration step).
  • allow_overwrite (bool, optional) – If True, overwrite protection in Storage is disabled. This is mostly for experiment writing purposes.
configure(**options)[source]

Configure the experiment with custom options.

This method allows you to specify a number of options that you want to configure with a graphical interface prior to running the tasks. Use keyword arguments to specify which options you want to configure. The options selected/specified in the graphical interface are then returned by this method so that you can alter setup before running the experiment.

Each keyword argument should list the data type to configure, such as float, str, or int. You can also provide a list or tuple of available choices for that option.

You do not need to add an option for the subject name/ID – that is added automatically if the subject ID was not specified when creating the experiment.

run(*tasks)[source]

Run the experimental tasks.

features

Time-domain features.

Notation:
  • \(x_i\) : value of a signal at time index \(i\)
  • \(N\) : length of the signal
axopy.features.time.integrated_emg(x, axis=-1, keepdims=False)[source]

Sum over the rectified signal.

\[\text{IEMG} = \sum_{i=1}^{N} | x_{i} |\]
Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – IEMG of each channel.

Return type:

ndarray, shape (n_channels,)

axopy.features.time.logvar(x, axis=-1, keepdims=False)[source]

Log of the variance of the signal.

\[\text{log-var} = \log \left( \frac{1}{N} \sum_{i=1}^{N} \left(x_i - \mu \right)^2 \right)\]

For electrophysiological signals that are mean-zero, this is the log of the mean square value, making it similar to root_mean_square() but scaling differently (slower) with \(x\).

For EMG data recorded from forearm muscles, log-var has been found to relate to wrist angle fairly linearly [1]_.

Note: base-10 logarithm is used, though the base is not specified in [1]_.

Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – log-var of each channel.

Return type:

ndarray, shape (n_channels,)

References

[1]J. M. Hahne, F. Bießmann, N. Jiang, H. Rehbaum, D. Farina, F. C. Meinecke, K.-R. Müller, and L. C. Parra, “Linear and Nonlinear Regression Techniques for Simultaneous and Proportional Myoelectric Control,” IEEE Transactions on Neural Systems and Rehabilitation Engineering, vol. 22, no. 2, pp. 269–279, 2014.
axopy.features.time.mean_absolute_value(x, weights='mav', axis=-1, keepdims=False)[source]

Computes the mean absolute value (MAV) of each signal.

Mean absolute value is a popular feature for obtaining amplitude information from EMG, especially in gesture classification contexts [1]_.

There is an optional windowing function applied to the rectified signal, described as MAV1 and MAV2 in some references. A custom window can also be used. The general definition is given as:

\[\text{MAV} = \frac{1}{N} \sum_{i=1}^{N} w_i |x_i|\]

Normal MAV does not use a windowing function, equivalent to setting all \(w_i = 1\).

MAV1 refers to a rectangular window which de-emphasizes the beginning and ending of an input window. The first quarter of the input samples receive a weight of 0.5, the middle half of the input samples receive a weight of 1, and the final quarter recieves a weight of 0.5:

\[\begin{split}w_i = \begin{cases} 1, & \frac{N}{4} \leq i \leq \frac{3N}{4} \\ 0.5, & \text{otherwise} \end{cases}\end{split}\]

MAV2 uses a similar window structure to MAV1 (i.e. broken into first quarter, middle half, and final quarter), but the window is trapezoidal in shape, ramping from 0 to 1 over the first quarter and from 1 to 0 over the last quarter:

\[\begin{split}w_i = \begin{cases} 1, & \frac{N}{4} \leq i \leq \frac{3N}{4} \\ \frac{4i}{N}, & i < \frac{N}{4} \\ \frac{4(i - N)}{N}, & i > \frac{3N}{4} \end{cases}\end{split}\]
Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • weights (str or ndarray, optional) –

    Weights to use. Possible values:

    • ’mav’ : all samples in the signal are weighted equally (default).
    • ’mav1’ : rectangular window with the middle half of the signal receiving unit weight and the first and last quarters of the signal receiving half weight.
    • ’mav2’ : similar to ‘mav1’, but weights on the first and last quarters increase and decrease between 0 and 1 respectively, forming a trapezoidal window.
    • [ndarray] : user-supplied weights to apply. Must be a 1D array with the same length as the signals received in the compute method.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – MAV of each channel.

Return type:

ndarray, shape (n_channels,)

See also

axopy.features.util.inverted_t_window()
Generates the window for MAV1
axopy.features.util.trapezoidal_window()
Generates the window for MAV2

References

[1]B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
[2]A. Phinyomark, P. Phukpattaranont, and C. Limsakul, “Feature Reduction and Selection for EMG Signal Classification,” Expert Systems with Applications, vol. 39, no. 8, pp. 7420-7431, 2012.
axopy.features.time.root_mean_square(x, axis=-1, keepdims=False)[source]

Computes the root mean square of each signal.

RMS is a commonly used feature for extracting amplitude information from physiological signals.

\[\text{RMS} = \sqrt{\frac{1}{N} \sum_{i=1}^N x_i^2}\]
Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – RMS of each channel.

Return type:

ndarray, shape (n_channels,)

axopy.features.time.slope_sign_changes(x, threshold=0, axis=-1, keepdims=False)[source]

Computes the number of slope sign changes (SSC) of each signal.

A slope sign change occurs when the middle value of a group of three adjacent values in the signal is either greater than or less than both of the other two.

Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • threshold (float, optional) – A threshold for discriminating true slope sign changes from those caused by low-level noise fluctuating about a specific value. By default, no threshold is used, so every slope sign change in the signal is counted.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – SSC of each channel.

Return type:

ndarray, shape (n_channels,)

References

[1]B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
axopy.features.time.waveform_length(x, axis=-1, keepdims=False)[source]

Computes the waveform length (WL) of each signal.

Waveform length is the sum of the absolute value of the deltas between adjacent values (in time) of the signal:

\[\text{WL} = \sum_{i=1}^{N-1} | x_{i+1} - x_i |\]
Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – WL of each channel.

Return type:

ndarray, shape (n_channels,)

References

[1]B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
axopy.features.time.zero_crossings(x, threshold=0, axis=-1, keepdims=False)[source]

Computes the number of zero crossings (ZC) of each signal.

A zero crossing occurs when two adjacent values (in time) of the signal have opposite sign. A threshold is used to mitigate the effect of noise around zero. It is used as a measure of frequency information.

Parameters:
  • x (ndarray) – Input data. Use the axis argument to specify the “time axis”.
  • threshold (float, optional) – A threshold for discriminating true zero crossings from those caused by low-level noise situated about zero. By default, no threshold is used, so every sign change in the signal is counted.
  • axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
  • keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns:

y – ZC of each channel.

Return type:

ndarray, shape (n_channels,)

References

[1]B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.

gui

main
class axopy.gui.main.Container[source]

Graphics container for tasks.

set_layout(layout)[source]

Set the layout of the container.

Parameters:layout (QLayout) – Any QLayout is OK to add.
set_widget(widget)[source]

Set the widget containing all graphical elements.

Parameters:widget (QWidget) – Any QWidget is OK to add.

See also

axopy.gui.canvas()
Canvas widget and canvas items that can be added to the container.
axopy.gui.graph()
Plotting widgets that can be added to the container.
show()[source]

Show the container in the active application.

This is not normally needed, unless you’re testing out a GUI and using gui_check().

axopy.gui.main.get_qtapp()[source]

Get a QApplication instance running.

Returns the current QApplication instance if it exists and creates it otherwise.

Examples

This function is primarily for internal usage, but it is exposed to make it convenient to test graphical code without all of the experiment/task machinery.

from axopy.gui.main import get_qtapp, Container

# get the application instance first, before creating widgets etc.
app = get_qtapp()

con = Container()

# call show() to show the widget, then run the application
con.show()
app.exec_()
axopy.gui.main.gui_check()[source]

Check graphical interface code interactively.

This function makes it convenient to test graphical code without all of the experiment/task machinery. You can create a Container, add things to the container, and then call this function with the container to run the GUI and try it out.

Note

Be sure to call Container.show() at the end to display the container.

Examples

Minimal example

from axopy.gui.main import Container, gui_check

with gui_check():
    con = Container()
    con.show()
canvas

2D canvas style graphics functionality backed by Qt’s QGraphicsView.

class axopy.gui.canvas.Canvas(draw_border=True, bg_color=None, border_color=None, parent=None, invert_x=False, invert_y=False)[source]

A 2D canvas interface implemented using a QGraphicsView.

This view essentially just holds a QGraphicsScene that grows to fit the size of the view, keeping the aspect ratio square. The scene is displayed with a gray (by default) border.

See Qt’s documentation for more information about working with QGraphicsView (https://doc.qt.io/Qt-5/qgraphicsview.html).

add_item(item)[source]

Add an item to the canvas.

Parameters:item (Item or QGraphicsItem) – The item to add to the canvas. This can be either one of AxoPy’s built-in items (Circle, Text, etc.) or any QGraphicsItem.
resizeEvent(self, QResizeEvent)[source]
class axopy.gui.canvas.Circle(diameter, color='#333333')[source]

Circular item.

The coordinates of this item correspond to the center of the circle.

Parameters:
  • dia (float) – Diameter of the circle with respect to the scene coordinate system.
  • color (str) – Hex string to set the color of the circle. You can use the underlying qitem attribute to get the underlying QGraphicsEllipseItem to set stroke color vs. fill color, etc. if needed.
class axopy.gui.canvas.Cross(size=0.05, linewidth=0.01, color='#333333')[source]

Collection of two lines oriented as a “plus sign”.

The coordinates of this item correspond to the center of the cross. This item’s qitem attribute is a QGraphicsItemGroup (a group of two lines).

Parameters:
  • size (float) – The size is the length of each line making up the cross.
  • linewidth (float) – Thickness of each line making up the cross.
  • color (str) – Color of the lines making up the cross.
color

Color of the lines in the cross.

class axopy.gui.canvas.Item(qitem)[source]

Canvas item base class.

This is simply a wrapper around any kind of QGraphicsItem, adding the ability to set some properties of the underlying item with a more Pythonic API. You can always access the QGraphicsItem with the qitem attribute. Once you know what kind of QGraphicsItem is being wrapped, you can use the corresponding Qt documentation to make use of more complete functionality.

qitem

The QGraphicsItem being wrapped. You can use this attribute to access methods and properties of the item not exposed by the wrapper class. If you find yourself routinely using a method of the QGraphicsItem, consider recommending it for addition to AxoPy.

Type:QGraphicsItem
collides_with(item)[source]

Determine if the item intersects with another item.

color

Color of the item.

get(prop, *args, **kwargs)[source]

Get any property of the underlying QGraphicsItem.

hide()[source]

Set the item to invisible.

opacity

Opacity of the item (between 0 and 1).

pos

Both X and Y coordinates of the item in the canvas.

set(**kwargs)[source]

Set any properties of the underlying QGraphicsItem.

show()[source]

Set the item to visible.

visible

Visibility of the item.

x

X coordinate of the item in the canvas.

y

Y coordinate of the item in the canvas.

class axopy.gui.canvas.Line(x1, y1, x2, y2, width=0.01, color='#333333')[source]

Line item.

color

Color of the item.

class axopy.gui.canvas.Rectangle(width, height, x=0, y=0, color='#333333', penwidth=0.01)[source]

Rectangular item.

This is a filled retangle that allows you to set the size, color, position, etc. By default, the item’s position is its center.

color

Color of the rectangle.

class axopy.gui.canvas.Text(text, color='#333333')[source]

Text item.

graph

Widgets for plotting multi-channel signals.

class axopy.gui.graph.SignalWidget[source]

Scrolling oscilloscope-like widget for displaying real-time signals.

Intended for multi-channel viewing, each channel gets its own row in the widget, and all channels share y-axis zoom.

plot(data)[source]

Adds a window of data to the widget.

Previous windows are scrolled to the left, and the new data is added to the end.

Parameters:data (ndarray, shape = (n_channels, n_samples)) – Window of data to add to the end of the currently-shown data.

pipeline

class axopy.pipeline.Block(name=None, hooks=None)[source]

Base class for all blocks.

Notes

Blocks should take their parameters in __init__ and provide at least the process method for taking in data and returning some result.

clear()[source]

Clear the state of the block.

Some blocks don’t keep stateful attributes, so clear does nothing by default.

process(data)[source]

Process input data and produce a result.

Subclasses must implement this method, otherwise it shouldn’t really be a Block.

class axopy.pipeline.Pipeline(blocks, name=None)[source]

Feedforward arrangement of blocks for processing data.

A Pipeline contains a set of Block objects which operate on data to produce a final output.

To create a pipeline, the following two rules are needed: blocks in a list processed in series, and blocks in a tuple are processed in parallel.

Blocks that are arranged to take multiple inputs should expect to take the corresponding number of inputs in the order they are given. It is up to the user constructing the pipeline to make sure that the arrangement of blocks makes sense.

Parameters:blocks (container) – The blocks in the pipline, with lists processed in series and tuples processed in parallel.
named_blocks

Dictionary of blocks in the pipeline. Keys are the names given to the blocks in the pipeline and values are the block objects.

Type:dict
clear()[source]

Calls the clear method on each block in the pipeline. The effect depends on the blocks themselves.

process(data)[source]

Calls the process method of each block in the pipeline, passing the outputs around as specified in the block structure.

Parameters:data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it.
Returns:out – The data output by the process method of the last block(s) in the pipeline.
Return type:object
class axopy.pipeline.Passthrough(blocks, expand_output=True, name=None)[source]

Convenience block for passing input along to output.

A passthrough pipeline block is useful when you want to process some data then provide both the processed output as well as the original input to another block downstream:

    -----------------------> x
   |
x ---> [ subpipeline ] ----> y
process(data)[source]

Calls the process method of each block in the pipeline, passing the outputs around as specified in the block structure.

Parameters:data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it.
Returns:out – The data output by the process method of the last block(s) in the pipeline.
Return type:object
class axopy.pipeline.Callable(func, func_args=None, func_kwargs=None, name=None, hooks=None)[source]

A Block that does not require persistent attributes.

Some Block implementations don’t require attributes to update on successive calls to the process method, but instead are essentially a function that can be called repeatedly. This class is for conveniently creating such a block.

If the function you want to use takes additional arguments, such as a keyword argument that

Note: if you use an anonymous function as the func argument, (e.g. lambda x: 2*x), it is recommended to explicitly give the block a meaningful name.

Parameters:
  • func (callable(x)) – Function that gets called when the block’s process method is called. Should take a single input and return output which is compatible with whatever is connected to the block.
  • func_args (list, optional) – List (or tuple) of additional arguments to pass to func when calling it for processing. If None (default), no arguments are used.
  • func_kwargs (dict) – Keyword argument name/value pairs to pass to func when calling it for processing. If None (default), no keyword arguments are used.
  • name (str, optional, default=None) – Name of the block. By default, the name of the processor function is used.
  • hooks (list, optional, default=None) – List of callables (callbacks) to run when after the block’s process method is called.
process(data)[source]

Process input data and produce a result.

Subclasses must implement this method, otherwise it shouldn’t really be a Block.

class axopy.pipeline.Windower(length)[source]

Windows incoming data to a specific length.

Takes new input data and combines with past data to maintain a sliding window with optional overlap. The window length is specified directly, so the overlap depends on the length of the input.

The input length may change on each iteration, but the Windower must be cleared before the number of channels can change.

Parameters:length (int) – Total number of samples to output on each iteration. This must be at least as large as the number of samples input to the windower on each iteration.

See also

axopy.pipeline.common.Ensure2D
Ensure input to the windower is 2D.

Examples

Basic use of a windower:

>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> win = pipeline.Windower(4)
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])
>>> win.process(np.array([[7, 8], [5, 6]]))
array([[ 1.,  2.,  7.,  8.],
       [ 3.,  4.,  5.,  6.]])
>>> win.clear()
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])

If your data is 1-dimensional (shape (n_samples,)), use an Ensure2D block in front of the Windower:

>>> win = pipeline.Windower(4)
>>> p = pipeline.Pipeline([pipeline.Ensure2D(), win])
>>> p.process(np.array([1, 2]))
array([[ 0.,  0.,  1.,  2.]])
clear()[source]

Clear the buffer containing previous input data.

process(data)[source]

Add new data to the end of the window.

Parameters:data (array, shape (n_channels, n_samples)) – Input data. n_samples must be less than or equal to the windower length.
Returns:out – Output window with the input data at the end.
Return type:array, shape (n_channels, length)
class axopy.pipeline.Centerer(name=None, hooks=None)[source]

Centers data by subtracting out its mean.

\[\tilde{x} = x - \sum_{i=0}^{N-1} x[i]\]
process(data)[source]

Center each row of the input.

Parameters:data (array, shape (n_channels, n_samples)) – Input data.
Returns:out – Input data that’s been centered.
Return type:array, shape (n_channels, n_samples)
class axopy.pipeline.Filter(b, a=1, overlap=0)[source]

Filters incoming data with a time domain filter.

This filter implementation takes filter coefficients that are designed by the user – it merely applies the filter to the input, remembering the final inputs/outputs from the previous update and using them as initial conditions for the current update.

Parameters:
  • b (ndarray) – Numerator polynomial coefficients of the filter.
  • a (ndarray, optional) – Denominator polynomial coefficients of the filter. Default is 1, meaning the filter is FIR.
  • overlap (int, optional) – Number of samples overlapping in consecutive inputs. Needed for correct filter initial conditions in each filtering operation. Default is 0, meaning the final inputs/outputs of the previous update are used.

See also

axopy.pipeline.common.Ensure2D
Ensure input to the filter is 2D.

Examples

Design a filter using scipy and use the coefficients:

>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> from scipy.signal import butter
>>> b, a = butter(4, 100/1000/2)
>>> f = pipeline.Filter(b, a)
>>> f.process(np.random.randn(1, 5)) # doctest: +ELLIPSIS
array([...

Use a filter in combination with a Windower, making sure to account for overlapping data in consecutive filtering operations. Here, we’ll use a window of length 5 and pass in 3 samples at a time, so there will be an overlap of 2 samples. The overlapping samples in each output will agree:

>>> w = pipeline.Windower(5)
>>> f = pipeline.Filter(b, a, overlap=2)
>>> p = pipeline.Pipeline([w, f])
>>> out1 = p.process(np.random.randn(1, 3))
>>> out2 = p.process(np.random.randn(1, 3))
>>> out1[:, -2:] == out2[:, :2]
array([[ True,  True]], dtype=bool)
clear()[source]

Clears the filter initial conditions.

Clearing the initial conditions is important when starting a new recording if overlap is nonzero.

process(data)[source]

Applies the filter to the input.

Parameters:data (ndarray, shape (n_channels, n_samples)) – Input signals.
class axopy.pipeline.FeatureExtractor(features, hooks=None)[source]

Computes multiple features from the input, concatenating the results.

Each feature should be able to take in the same data and output a 1D array, so overall output of the FeatureExtractor can be a single 1D array.

This block isn’t strictly necessary, since you could just apply multiple feature blocks in parallel and the result of each will be passed to the next block. However, the block following feature computation typically expects the input to be a single array (or row) per data sample.

Parameters:features (list) – List of (name, feature) tuples (i.e. implementing a compute method).
named_features

Dictionary of features accessed by name.

Type:dict
feature_indices

Dictionary of (start, stop) tuples indicating the bounds of each feature, accessed by name. Will be empty until after data is first passed through.

Type:dict
clear()[source]

Clears the output array.

This should be called if the input is going to change form in some way (i.e. the shape of the input array changes).

process(data)[source]

Run data through the list of features and concatenates the results.

The first pass (after a clear call) will be a little slow since the extractor needs to allocate the output array.

Parameters:data (array, shape (n_channels, n_samples)) – Input data. Must be appropriate for all features.
Returns:out
Return type:array, shape (n_features,)
class axopy.pipeline.Estimator(estimator)[source]

A pipeline block wrapper around scikit-learn’s idea of an estimator.

An estimator is an object that can be trained with some data (fit) and, once trained, can output predictions from novel inputs. A common use-case for this block is to utilize a scikit-learn pipeline in the context of a axopy pipeline.

Parameters:estimator (object) – An object implementing the scikit-learn Estimator interface (i.e. implementing fit and predict methods).
process(data)[source]

Calls the estimator’s predict method and returns the result.

class axopy.pipeline.Transformer(transformer, hooks=None)[source]

A pipeline block wrapper around scikit-learn’s idea of a transformer.

A transformer is trained with some data (fit) and, once trained, can output projections of the input data to some other space. A common example is projecting data in high-dimensional space to a lower-dimensional space using principal components analysis.

Parameters:transformer (object) – An object implementing the scikit-learn Transformer interface (i.e. implementing fit and transform methods).
process(data)[source]

Calls the transformer’s transform method and returns the result.

class axopy.pipeline.Ensure2D(orientation='row')[source]

Transforms an array to ensure it has 2 dimensions.

Input with shape (n,) can be made to have shape (n, 1) or (1, n).

Parameters:orientation ({'row', 'col'}, optional) – Orientation of the output. If ‘row’, the output will have shape (1, n), meaning the output is a row vector. This is the default behavior, useful when the data is something like samples of a 1-channel signal. If ‘col’, the output will have shape (n, 1), meaning the output is a column vector.

Examples

Output row data:

>>> import numpy as np
>>> import axopy.pipeline as pipeline
>>> block = pipeline.Ensure2D()
>>> block.process(np.array([1, 2, 3]))
array([[1, 2, 3]])

Output column data:

>>> block = pipeline.Ensure2D(orientation='col')
>>> block.process(np.array([1, 2, 3]))
array([[1],
       [2],
       [3]])
process(data)[source]

Make sure data is 2-dimensional.

If the input already has two dimensions, it is unaffected.

Parameters:data (array, shape (n,)) – Input data.
Returns:out – Output data, with shape specified by orientation.
Return type:array, shape (1, n) or (n, 1)
axopy.pipeline.segment(data, length, overlap=0)[source]

Generate segments of an array.

Each segment is of a specified length and optional overlap with the previous segment. Only segments of the specified length are retrieved (if segments don’t fit evenly into the data).

Parameters:
  • data (array, shape (n_channels, n_samples)) – Data to segment.
  • length (int) – Number of samples to retrieve in each chunk.
  • overlap (int, optional) – Number of overlapping samples in consecutive chunks.
Yields:

segment (array (n_channels, length)) – Segment of the input array.

Examples

Segment a 2-channel recording:

>>> import numpy as np
>>> from axopy.pipeline import segment
>>> x = np.arange(8).reshape(2, 4)
>>> x
array([[0, 1, 2, 3],
       [4, 5, 6, 7]])
>>> seg = segment(x, 2)
>>> next(seg)
array([[0, 1],
       [4, 5]])
>>> next(seg)
array([[2, 3],
       [6, 7]])

Consecutive segments with overlapping samples agree:

>>> seg = segment(x, 3, overlap=2)
>>> next(seg)
array([[0, 1, 2],
       [4, 5, 6]])
>>> next(seg)
array([[1, 2, 3],
       [5, 6, 7]])
axopy.pipeline.segment_indices(n, length, overlap=0)[source]

Generate indices to segment an array.

Each segment is of a specified length with optional overlap with the previous segment. Only segments of the specified length are retrieved if they don’t fit evenly into the the total length. The indices returned are meant to be used for slicing, e.g. data[:, from:to].

Parameters:
  • n (int) – Number of samples to segment up.
  • length (int) – Length of each segment.
  • overlap (int, optional) – Number of overlapping samples in consecutive segments.
Yields:
  • from (int) – Index of the beginning of the segment with respect to the input array.
  • to (int) – Index of the end of the segement with respect to the input array.

Examples

Basic usage – segment a 6-sample recording into segments of length 2:

>>> import numpy as np
>>> from axopy.pipeline import segment_indices
>>> list(segment_indices(6, 2))
[(0, 2), (2, 4), (4, 6)]

Overlapping segments:

>>> list(segment_indices(11, 5, overlap=2))
[(0, 5), (3, 8), (6, 11)]

storage

Experiment data storage.

There are two main use cases for the functionality in this module: reading/writing data during an experiment session, and reading data once an experiment is complete (i.e. for analysis). See the user guide for information on these use cases/api.jpeg/api.jpeg/api.jpeg.

class axopy.storage.Storage(root='data', allow_overwrite=False)[source]

Top-level data storage maintainer.

See the user guide for more information.

Parameters:
  • root (str, optional) – Path to the root of the data storage filestructure. By default, ‘data’ is used. If the directory doesn’t exist, it is created.
  • allow_overwrite (bool, optional) – Specifies whether or not the storage interface allows you to overwrite a task’s data for a subject if it already exists.
create_task(task_id)[source]

Create a task for the current subject.

Parameters:task_id (str) – The ID of the task to add. The name must not have been used for another task for the current subject.
Returns:writer – A new TaskWriter for storing task data.
Return type:TaskWriter
require_task(task_id)[source]

Retrieves a task for the current subject.

Parameters:task_id (str) – The ID of the task to look for. The task must have already been run with the current subject.
Returns:reader – A new TaskReader for working with the existing task data.
Return type:TaskReader
subject_id

The current subject ID.

When setting the subject ID for a new subject (i.e. one that doesn’t exist already), storage for that subject is created.

subject_ids

Generate subject IDs found in storage sorted in alphabetical order.

Returns:subject_id – ID of the subject found.
Return type:str
task_ids

Generate names of tasks found for the current subject.

Note that there may be no tasks found if the subject_id has not been set or if the subject hasn’t started any tasks. In this case, nothing is yielded.

to_zip(outfile)[source]

Create a ZIP archive from a data storage hierarchy.

For more information, see storage_to_zip().

class axopy.storage.TaskReader(root)[source]

High-level interface to task storage.

Parameters:root (str) – Path to task’s root directory. This is the directory specific to a task which contains a trials.csv file and HDF5 array files.
array(name)[source]

Retrieve an array type’s data for all trials.

iterarray(name)[source]

Iteratively retrieve an array for each trial.

Parameters:name (str) – Name of the array type.
pickle(name)[source]

Load a pickled object from storage.

Parameters:name (str) – Name of the pickled object (no extension).
trials

A Pandas DataFrame representing the trial data.

class axopy.storage.TaskWriter(root)[source]

The main interface for storing data from a task.

Usually you get a Taskwriter from Storage, so you don’t normally need to create one yourself.

Parameters:root (str) – Path to the task root (e.g. ‘data/subject_1/taskname’).
trials

TrialWriter for storing trial data.

Type:TrialWriter
pickle(obj, name)[source]

Write a generic object to storage.

This can be useful to persist an object from one task to another, or to store something that doesn’t easily fit into the AxoPy storage model (trial attributes and arrays). Be cautious, however, as pickles are not the best way to store things long-term nor securely. See the advice given here, for example: http://scikit-learn.org/stable/modules/model_persistence.html

Parameters:
  • obj (object) – The object to pickle.
  • name (str) – Name of the pickle to save (no extension).
write(trial)[source]

Write trial data.

This must be the last thing done for the current trial. That is, make sure all arrays have accumulated all data required. This method flushes trial and array data to files for you.

Important note: The trial’s arrays are cleared after writing.

Parameters:trial (Trial) – Tral data. See TrialWriter.write() and Trial for details.
class axopy.storage.TrialWriter(filepath)[source]

Writes trial data to a CSV file line by line.

Parameters:filepath (str) – Path to the file to create.
data

Dictionary containing all trial data written so far.

Type:dict
write(data)[source]

Add a single row to the trials dataset.

Data is immediately added to the file on disk.

Parameters:data (dict) – Data values to add.
axopy.storage.makedirs(path, exist_ok=False)[source]

Recursively create directories.

This is needed for Python versions earlier than 3.2, otherwise os.makedirs(path, exist_ok=True) would suffice.

Parameters:
  • path (str) – Path to directory to create.
  • exist_ok (bool, optional) – If exist_ok is False (default), an exception is raised. Set to True if it is acceptable that the directory already exists.
axopy.storage.read_hdf5(filepath, dataset='data')[source]

Read the contents of a dataset.

This function assumes the dataset in the HDF5 file exists at the root of the file (i.e. at ‘/’). It is primarily for internal usage but you may find it useful for quickly grabbing an array from an HDF5 file.

Parameters:
  • filepath (str) – Path to the file to read from.
  • dataset (str, optional) – Name of the dataset to retrieve. By default, ‘data’ is used.
Returns:

data – The data (read into memory) as a NumPy array. The dtype, shape, etc. is all determined by whatever is in the file.

Return type:

ndarray

axopy.storage.storage_to_zip(path, outfile=None)[source]

Create a ZIP archive from a data storage hierarchy.

The contents of the data storage hierarchy are all placed in the archive, with the top-level folder in the archive being the data storage root folder itself. That is, all paths within the ZIP file are relative to the dataset root folder.

Parameters:
  • path (str) – Path to the root of the dataset.
  • outfile (str, optional) – Name of the ZIP file to create. If not specified, the file is created in the same directory as the data root with the same name as the dataset root directory (with “.zip” added).
Returns:

outfile – The name of the ZIP file created.

Return type:

str

axopy.storage.write_hdf5(filepath, data, dataset='data')[source]

Write data to an hdf5 file.

The data is written to a new file with a single dataset called “data” in the root group. It is primarily for internal usage but you may find it useful for quickly writing an array to an HDF5 file.

Parameters:
  • filepath (str) – Path to the file to be written.
  • data (ndarray) – NumPy array containing the data to write. The dtype, shape, etc. of the resulting dataset in storage is determined by this array directly.
  • dataset (str, optional) – Name of the dataset to create. Default is ‘data’.

task

Base task implementation and some generic tasks.

See the user guide for information on implementing tasks.

class axopy.task.Task[source]

Base class for tasks.

This base class handles iteration through the trials of the task in blocks.

Most task implementations will want to override the prepare and run_trial methods, while the rest can be left to default behavior.

If you need to implement a custom constructor (__init__), you must call the base task __init__:

class CustomTask(Task):

    def __init__(self, custom_param):
        super(CustomTask, self).__init__()
trial

Dictionary containing the current trial’s attributes.

Type:dict
advance_block_key

Key for the user to press in order to advance to the next block. Can set to None to disable the feature (next block starts immediately after one finishes).

Type:str
finished

Emitted when the last trial of the last block has run. This is primarily for the axopy.experiment.Experiment to know when the task has finished so it can run the next one. You shouldn’t need to use this transmitter at all.

Type:Transmitter
connect(transmitter, receiver)[source]

Connect a transmitter to a receiver.

This method helps the task keep track of connections so that all of the manually specified connections can be torn down by the axopy.experiment.Experiment.

disconnect(transmitter, receiver)[source]

Disconnect a transmitter from a receiver.

disconnect_all()[source]

Disconnect all of the task’s manually-created connections.

finish()[source]

Clean up at the end of the task.

Override if you need to clean up once the task is completely finished. If you do override this method, you should call the base Task.finish() method or call the finished transmitter yourself.

finish_block()[source]

Finishes the block and starts the next one.

Override if you need to do some cleanup between blocks.

key_press(key)[source]

Handle key press events.

Override this method to receive key press events. Available keys can be found in axopy.util (named key_<keyname>, e.g. key_k).

Important note: if relying on the advance_block_key to advance the task, make sure to call this super implementation.

next_block()[source]

Get the next block of trials and starts running them.

Before starting the block, a prompt is shown to verify that the user is ready to proceed. If there are no more blocks to run, the finish method is called. You usually do not need to override this method.

next_trial()[source]

Get the next trial in the block and starts running it.

If there are no more trials in the block, the finish_block method is called.

prepare_daq(daqstream)[source]

Set up the input device, if applicable.

Parameters:daqstream (DaqStream) – Interface to the data acquisition device.
prepare_design(design)[source]

Callback for setting up the task design.

See axopy.design.Design for details on how to design the task. By default, nothing is added to the design.

Parameters:design (Design) – The task design object you can use to add blocks and trials.
prepare_graphics(container)[source]

Initialize graphical elements and messaging connections.

This method should be overridden if the task uses any graphics (which most do). It is important to defer initializing any graphical elements until this method is called so that the graphical backend has a chance to start.

Parameters:container (axopy.gui.Container) – The graphical container you can add objects to.
prepare_storage(storage)[source]

Initialize data storage.

Override to read or write task data. A axopy.storage.Storage object is given, which can be used to create a new axopy.storage.TaskWriter for storing new data or a axopy.storage.TaskReader for reading in existing data. Note that the subject ID has already been set.

Parameters:storage (Storage) – The top-level storage object with which new storage can be allocated and existing data can be read.
run()[source]

Start running the task.

Simply calls next_block to start running trials in the first block. This method is called automatically if the task is added to an Experiment. Tasks that have a block design shouldn’t normally need to override this method. Tasks that are “free-running” for experimenter interaction (e.g. a plot visualization task that the experimenter controls) should override.

run_trial(trial)[source]

Initiate a trial.

By default, this method does nothing. Override to implement what happens in a trial. When a trial is complete, use next_trial to start the next.

Parameters:trial (object) – Trial data. This is whatever data is put into the design passed in.
class axopy.task.Oscilloscope(pipeline=None)[source]

A visualizer for data acquisition devices.

This task connects to the experiment input DAQ and displays each of its channels on a separate plot. You can optionally pass a Pipeline object to preprocess the input data before displaying it.

Parameters:pipeline (Pipeline, optional) – Pipeline to run the input data through before displaying it. Often this is some preprocessing like filtering. It is often useful to use a Windower in the pipeline to display a larger chunk of data than is given on each input update of the DAQ. This gives a “scrolling” view of the input data, which can be helpful for experiment setup (e.g. placing electrodes, making sure the device is recording properly, etc.).
key_press(key)[source]

Handle key press events.

Override this method to receive key press events. Available keys can be found in axopy.util (named key_<keyname>, e.g. key_k).

Important note: if relying on the advance_block_key to advance the task, make sure to call this super implementation.

prepare_daq(daqstream)[source]

Set up the input device, if applicable.

Parameters:daqstream (DaqStream) – Interface to the data acquisition device.
prepare_graphics(container)[source]

Initialize graphical elements and messaging connections.

This method should be overridden if the task uses any graphics (which most do). It is important to defer initializing any graphical elements until this method is called so that the graphical backend has a chance to start.

Parameters:container (axopy.gui.Container) – The graphical container you can add objects to.
run()[source]

Start running the task.

Simply calls next_block to start running trials in the first block. This method is called automatically if the task is added to an Experiment. Tasks that have a block design shouldn’t normally need to override this method. Tasks that are “free-running” for experimenter interaction (e.g. a plot visualization task that the experimenter controls) should override.

timing

Utilities for keeping track of time in a task.

class axopy.timing.Counter(max_count=1, reset_on_timeout=True)[source]

Counts to a given number then transmits a timeout event.

Parameters:
  • max_count (int) – Number of iterations to go through before transmitting the timeout event. Must be greater than 1.
  • reset_on_timeout (bool, optional) – Specifies whether or not the timer should reset its count back to zero once the timeout event occurs. The default behavior is to reset.
count

Current count.

Type:int
timeout

Transmitted when max_count has been reached.

Type:Transmitter

Examples

Basic usage:

>>> from axopy.timing import Counter
>>> timer = Counter(2)
>>> timer.increment()
>>> timer.count
1
>>> timer.progress
0.5
>>> timer.increment()
>>> timer.count
0
increment()[source]

Increment the counter.

If max_count is reached, the timeout event is transmitted. If reset_on_timeout has been set to True (default), the timer is also reset.

progress

Progress toward timeout, from 0 to 1.

reset()[source]

Resets the count to 0 to start over.

class axopy.timing.Timer(duration)[source]

Real-time one-shot timer.

This is useful in situations where you want to wait for some amount of time and locking the timing to data acquisition updates is not important. For example, inserting a waiting period between trials of a task can be done by connecting the timeout transmitter to your task’s next_trial() method.

Parameters:duration (float) – Duration of the timer, in seconds.
timeout

Transmitted when the timer has finished.

Type:Transmitter
start()[source]

Start the timer.

stop()[source]

Stop the timer.

If you stop the timer early, the timeout event won’t be transmitted.

util

Development

Install

Retrieve the source code:

$ git clone git@github.com:axopy/axopy.git
$ cd axopy

A virtual environment is a good way to set up a development environment:

$ python -m venv .venv-dev
$ source .venv-dev/bin/activate

Once in the virtual environment, you can install AxoPy in “development mode” along with the development dependencies:

(.venv-dev) $ pip intall -e .[dev]

If you take a look at the setup.py file, you’ll see that this installs everything from the requirements.txt file as well as the requirements-dev.txt file. This should be everything needed to run the tests and build the documentation.

The Python Packaging Authority has much more detailed instructions here: https://packaging.python.org/guides/installing-using-pip-and-virtualenv/

Test

pytest is used to find tests and run them:

(.venv-dev) $ pytest

Document

To build the documentation locally, you can activate your dev environment, cd into the docs/ directory, and run one of the build rules, like:

(.venv-dev) $ cd docs/
(.venv-dev) $ make html

If you aren’t able to use make, you could run the sphinx commands manually. Look in the docs/Makefile to be sure, but it should be something like:

(.venv-dev) $ sphinx-build -b html . _build/html

Once the build completes, you can open _build/html/index.html with your browser to check the output.

Release

This section is relevant only if you’re an AxoPy maintainer. If you’re just interested in contributing to AxoPy, you can stop here.

PyPI

To cut a release, you’ll need the wheel and twine packages (these are not included in the dev requirements which are for every-day development and CI).

Start by bumping the version number in the axopy.version module, then build the source and wheel distributions:

(.venv-dev) $ python setup.py sdist bdist_wheel

Optional: If you want to check first that all is well before pushing to PyPI, you can upload the release packages to the test PyPI server first:

(.venv-dev) $ twine upload --repository-url https://test.pypi.org/legacy dist/*

Now you can use twine to upload the release to PyPI. Note that you should either remove everything from dist/ first (if just using the command below) or specify which files to upload:

(.venv-dev) $ twine upload dist/*

Once everything looks good, you can tag the version bump commit and push the tag up to GitHub.

conda-forge

After releasing on PyPI, you can update the release on conda-forge. Check their docs for insight into their process, but the following is sufficient now that the infrastructure is in place.

Start by forking the axopy-feedstock repo on GitHub. Edit the recipe/meta.yml file so its version string matches the PyPI version and copy the SHA256 hash for the source dist (sdist) package (the tar.gz file) from PyPI and paste it into the line below that. Commit the changes to your fork then make a pull request against the conda-forge repository. If you’re a maintainer, you have push access to the repository so once CI passes, go ahead and merge. The rest is automated.