AxoPy¶
AxoPy is a system for creating human-computer interface experiments involving the use of electrophysiological signals, such as electromyography (EMG) or electroencephalography (EEG). It is intended to provide an infrastructure for rapidly developing simple experiments while allowing for more complex designs.
This documentation is split into two major components: a user guide and API documentation. The API documentation covers what the different modules of AxoPy can do, and the user guide is intended to describe how to use these modules and how they work together.
Documentation Contents¶
Installation¶
There are two major options for installing AxoPy: pip and (Ana)conda.
See also
If you’re interested in developing AxoPy itself, see the Development documentation.
pip¶
If you like to use a systemwide Python installation (such as the Python provided by your package manager on Linux or the official installer for Windows), you can use pip to install AxoPy from PyPI:
$ pip install axopy
You may also want to make use of venv to create a virtual environment first. This would allow you to install several different versions of AxoPy for different projects, for example:
$ python -m venv .venv
$ source .venv/bin/activate
(.venv) $ pip install axopy
Note that the second command above (activating the environment) depends on your platform; on Windows, for example, it is .venv\Scripts\activate. See the venv documentation for more information.
conda¶
AxoPy can also be installed with miniconda or Anaconda via the conda-forge channel:
$ conda install -c conda-forge axopy
As with the pip instructions above, you may want to create a separate conda environment before installing:
$ conda create -c conda-forge -n axopy axopy
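After creating the environment, activate it before use (with conda 4.4 or later):
$ conda activate axopy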
User Guide¶
This is a guide that covers most of AxoPy’s functionality in a narrative style. If you are new to AxoPy, this section of the documentation should be helpful in getting started implementing experiments.
Experiment Setup¶
The overall structure of an AxoPy application is handled by the Experiment. You can think of the Experiment as a manager of a number of tasks that, when run in succession, form an actual experimental protocol. Let's get started with AxoPy by immediately writing a bit of code to produce a running experiment. We can then re-run the application after making a number of changes to get a feel for how to set up an Experiment.
Hello, Experiment¶
AxoPy is written for experiments that involve collecting data from a hardware input device and producing visual [1] feedback to the subject. For most of our examples, we'll make use of the built-in Oscilloscope task and a built-in device that works without requiring special hardware, like the NoiseGenerator. So here's how we use those two items to put together a simple but functioning experiment:
import axopy
daq = axopy.daq.NoiseGenerator()
exp = axopy.experiment.Experiment(daq=daq)
exp.run(axopy.task.Oscilloscope())
We create the Experiment object with a NoiseGenerator as the input device (or DAQ, short for data acquisition), then run the experiment with Oscilloscope as the sole task.
It’s worth noting here that AxoPy’s submodules (e.g. experiment, daq, etc.) are useful for organizing the package into logical parts, but it can be annoying to type the module names repeatedly. You can write the above example with more verbose imports like the following so the code itself is a little more succinct:
from axopy.daq import NoiseGenerator
from axopy.experiment import Experiment
from axopy.task import Oscilloscope
daq = NoiseGenerator()
exp = Experiment(daq=daq)
exp.run(Oscilloscope())
When you run this example, you'll notice the first thing that happens is a dialog window pops up prompting you to enter a subject ID. The Experiment needs a subject ID so that it can set up data storage. Once the subject ID is entered and accepted, you'll see a screen that says "Ready". This screen is shown in between all tasks in the experiment—hit the Enter or Return key to accept the prompt and start the task. You should then see an oscilloscope widget displaying a randomly generated signal in real time. You can press Enter again to finish the task (this is specific to Oscilloscope, which is a "free-running" task). When the task finishes, the Experiment looks for the next task to run. Since there aren't any more, the application exits.
[1] At least visual. For now, AxoPy doesn't have a standardized way to talk to other kinds of feedback-producing devices (an audio output module would be cool, PRs welcome). That said, AxoPy doesn't do anything to prevent you from working with them either.
Experiment Configuration¶
Human-computer interface study designs often include one or more of the following complications:
- subjects are split into groups
- subjects are tested over multiple sessions
- subjects fall into categories that require different configuration (e.g. mirror the screen contents for left-hand dominant subjects)
For these cases, Experiment provides the option to run a configuration step between creation of the experiment object and running the tasks. The options are entered in the same dialog window where you entered the subject ID in the example above. This allows you to set options on your tasks before running them, or even to run an entirely different list of tasks. It also means the person running an experiment (who isn't necessarily the person who wrote the experiment code) doesn't need to know how to write a configuration file or anything like that—they just run the experiment application and enter the details in a graphical widget.
The Experiment.configure() method accepts as many configuration options as you want. You specify each one by providing a keyword argument with the option's type (e.g. str, int, float) as the value, and it returns a dictionary with the values entered.
For example, say we want to input the subject's age. We can do that with an int option called age:
from axopy.experiment import Experiment
exp = Experiment()
config = exp.configure(age=int)
print(config['age'])
If you run the code above, a dialog box will pop up just like it did for the first example, but now text boxes for both the subject ID and the age are shown. Note that you do not have to specify subject as an option—this is done for you. It's up to you to handle the configuration options and modify how the experiment runs based on them.
Aside from primitive types like int, str, or float, you can enumerate all possible values for a configuration option, and these will be available to select in a combo box (drop-down menu). This way, the researcher running the experiment can't enter an invalid value:
exp.configure(hand=('right', 'left'))
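Acting on the returned options is up to your code. Here is a minimal sketch of doing so (the mirrored flag and CursorTask are hypothetical, not part of AxoPy):

from axopy.experiment import Experiment

exp = Experiment()
config = exp.configure(hand=('right', 'left'))

# hypothetical: mirror the display for left-hand-dominant subjects
mirrored = config['hand'] == 'left'
# exp.run(CursorTask(mirrored=mirrored))  # CursorTask is hypothetical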
Tips for Experiment Writing¶
The Experiment class accepts a couple of other keyword arguments that can be useful when debugging and/or developing an experiment application. You can specify a subject argument so that the configuration dialog isn't shown when the Experiment is run:
from axopy.experiment import Experiment
from axopy.task import Oscilloscope
from axopy.daq import NoiseGenerator
exp = Experiment(daq=NoiseGenerator(), subject='test')
exp.run(Oscilloscope())
By default, if you run any tasks that write data to storage, AxoPy will complain and exit if you attempt to overwrite any data that exists. This will happen if you repeatedly run the Experiment with the same subject ID, so it can be useful (in conjunction with the subject keyword argument) to set allow_overwrite=True as well, quelling the error about overwriting data:
exp = Experiment(subject='test', allow_overwrite=True)
This setup is pretty handy when developing an experiment; just remember to switch it off! One way to make this a little more robust is to add a flag to your application so you have to explicitly enable this "debugging mode", as sketched below.
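A minimal sketch of such a flag using argparse (the --debug option itself is illustrative, not part of AxoPy):

import argparse

from axopy.daq import NoiseGenerator
from axopy.experiment import Experiment
from axopy.task import Oscilloscope

parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true',
                    help='Use a fixed subject ID and allow overwriting data.')
args = parser.parse_args()

if args.debug:
    exp = Experiment(daq=NoiseGenerator(), subject='test',
                     allow_overwrite=True)
else:
    exp = Experiment(daq=NoiseGenerator())

exp.run(Oscilloscope())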
How It Works¶
Skippable unless you want to dig into working on AxoPy itself
The Experiment manages a PyQt5 application and is responsible for giving each task a graphical container within the Qt application, access to hardware inputs, and data storage. The task implementation is responsible for making use of these experiment-wide resources and then handing control back to the experiment so it can run the next task.
Next Up¶
Now that we have an experiment running and the ability to set up some configuration options if needed, let’s look at how to write tasks.
Implementing Tasks¶
In AxoPy, the individual components of an experiment are tasks. In essence, a task does any or all of the following:
- Takes in data from previous tasks (read)
- Streams data from a data acquisition device (daq)
- Processes the streaming or task data (proc)
- Draws things to the screen (ui)
- Outputs data of its own (write)
One example of a task is the Oscilloscope, which we saw on the previous page. It is responsible for streaming data from a data acquisition device (daq) and displaying it in a dynamic plot (ui). The purpose of this task is usually just to allow the researcher to visually verify the integrity of the data coming in from the device before proceeding with the rest of the experiment.
Another example of a task is a cursor control task (the subject produces input in an attempt to hit targets on the screen). This kind of task reads in and processes data from an input device (daq), displays some information on screen to give feedback to the subject (ui), and records some data for post-experiment analysis (write). It may also require some calibration parameters from a previous task (read). This is a fairly complex task with an enormous number of possible implementations, so there's no built-in CursorControlTask.
There isn’t really a strict definition of what a single task is or what it should or shouldn’t do, but it’s a good idea to make tasks as simple as possible — any given task should do just a couple things and do them well. This encourages modularity so you can re-use task implementations in different experiments, etc.
In terms of the AxoPy API, a task looks like the following:
(Diagram: a task and its four connections: read, daq, ui, and write.)
In this part of the user guide, we’ll go through how to make each of the four connections in the diagram and refer to separate documents for the details of working with those four components.
The Task Lifecycle¶
AxoPy experiments are event-driven, following the way graphical user interface
frameworks tend to operate. This can be an unfamiliar way of writing programs,
so it’s important to understand the overall idea before seeing some of the
details. Tasks in an experiment all go through the same lifecycle, shown below.
First, the Task instances are created (by you) and then they're handed off to an Experiment, like we saw in the previous section:
exp = Experiment(...)
exp.run(Oscilloscope())
Once you call run(), the Experiment collects the task objects passed in, sets up the shared resources (data storage, data acquisition, graphical backend), then proceeds to prepare and run each task in sequence. That means pretty much all of the functionality of the experiment is defined in the Task classes.
The most important thing to understand about tasks is that they’re written by
defining what should happen in response to certain events. For example, the
Oscilloscope
task defines a method that gets called every time there
is new data available from the data acquisition stream, allowing it to update
the signals displayed on the screen. This is sometimes referred to as
a callback. You can think of the Experiment as running an infinite loop
checking for events that occur, then dispatching the data from those events to
the task if appropriate.
There is a standard set of methods that are automatically run by the Experiment the task belongs to, and you can optionally implement these methods to make use of the shared resources that the Experiment manages. These are the prepare methods: prepare_design, prepare_storage, prepare_daq, and prepare_graphics.
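A bare skeleton showing all four prepare methods (bodies elided; the class name is arbitrary):

from axopy.task import Task

class SketchTask(Task):
    def prepare_design(self, design):
        # build blocks and trials here
        ...

    def prepare_storage(self, storage):
        # create reader/writer objects here
        ...

    def prepare_daq(self, daqstream):
        # keep a reference to the stream for use during trials
        self.daqstream = daqstream

    def prepare_graphics(self, container):
        # set the container's main widget here
        ...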
Say you're writing a task that makes use of data storage only (read and write). A common example of this is processing some data to make it suitable for other tasks later on in the experiment. To interact with the storage functionality set up by the Experiment, your class should implement the Task.prepare_storage() method. If you click on the link to the API documentation for that method, you'll see that a Storage object is passed into this method, which is provided by the Experiment. We'll see more details about setting up storage specifically later on, but for the sake of the example, it's sufficient to point out that the storage object lets you create reader and/or writer objects that you can save for use later on in your task:
from axopy.task import Task

class MyTask1(Task):
    def prepare_storage(self, storage):
        # here's where we can use the storage object to read data from
        # other tasks or write our own data to storage; the task names
        # below are hypothetical
        self.reader = storage.require_task('previous_task')
        self.writer = storage.create_task('my_task1')
The Task.prepare_design() method is slightly different from the others in that it's not actually for setting up a shared resource. Instead, it's an opportunity for your task to build a Design so that it can easily be written in terms of a series of repeated trials.
After the rest of the prepare methods are called, the Task.run() method is called. This is where your task should set up its own events and start running. In "experiment tasks" (those implementing prepare_design), the flow of the task proceeds through its Design by calling Task.next_trial().
There are two main ways for a task to end. One is by calling Task.finished() somewhere in your task implementation. This signals to the Experiment that the task is done, then the Task.finish() method is called so you can clean up anything you need to before the next task runs. A common example of cleanup is making sure the DAQ is stopped.
The flowchart below shows the lifecycle of a Task when it's run by an Experiment.
(Flowchart: the Task lifecycle when run by an Experiment.)
Data Acquisition¶
Traditionally, data acquisition (DAQ) refers to the process of capturing and conditioning signals for recording by a computer. In AxoPy, any source of data generated or influenced by the subject of the experiment is referred to as a DAQ. AxoPy doesn’t include built-in support for hardware aside from things commonly available with just a desktop computer (mouse, keyboard).
AxoPy assumes a fairly simple model for collecting data, based on polling. First, the interface is set up – this might involve initializing a USB interface, connecting to a TCP server, setting up initial parameters, etc. Next, data acquisition is started; some devices don't require an explicit start command, but some do. Next, you request data from the device. This is a blocking operation, meaning the request won't return until the data is ready. You're then free to process, display, or save this data, and then you request the next batch of data. It is important to make sure consecutive requests occur frequently enough that you don't fall behind.
For example, imagine you set up a device to acquire data at 1000 Hz in bunches of 100 samples:
from axopy.daq import NoiseGenerator

daq = NoiseGenerator(rate=1000, read_size=100)

daq.start()  # NoiseGenerator doesn't require this, but most devices do
for i in range(10):
    data = daq.read()
    process_data(data)
daq.stop()  # again, NoiseGenerator doesn't require this
Here, you'll want to ensure that the process_data() function does not take longer than 100 ms to complete, or data acquisition will fall behind the rate at which the data is generated.
Some DAQs are built in to AxoPy, but of course not all of them can be. Check out pymcc and pytrigno for a couple examples of working with real data acquisition hardware.
The DaqStream¶
One thing to notice about the code above is that every time the daq.read() operation occurs, no other code runs while waiting for the device to return the new data. This is sometimes referred to as a blocking operation. In AxoPy, we usually want some things to be happening while the device is reading in data in the background. This is where the DaqStream comes in – a threaded interface to the underlying hardware.
You'll usually set up your DAQ as above (e.g. daq = NoiseGenerator(...)), pass it to the Experiment as a shared resource, then make use of the DaqStream object made available by the Experiment in your Task implementation. The DaqStream has a uniform interface, so no matter what kind of hardware you're using, your task implementation doesn't need to care about how it all works. You just start/stop and connect/disconnect from the stream. In order to facilitate this uniform interface, the device the DaqStream wraps needs to expose a specific API as well. This is defined below:
DAQ API
- start(): Called once before the first read.
- read(): Request a new buffer of data from the hardware. Parameters (like the size of the buffer or number of samples to read) should be set up in the DAQ constructor. Canonical AxoPy devices generate a NumPy ndarray with shape (n_channels, n_samples), but the only real restriction is that your Pipelines can consume the data generated by your device.
- stop(): Called when the user wants the device to stop reading data.
In addition, the DAQ implementation should raise an IOError if something goes wrong during data acquisition.
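As a sketch of the device side, here is a minimal generator satisfying this API (a real device would block in read() until samples arrive; this one computes them immediately):

import numpy as np

class SineGenerator(object):
    """Hypothetical device emitting sinusoids, following the DAQ API above."""

    def __init__(self, rate=1000, num_channels=2, read_size=100):
        self.rate = rate
        self.num_channels = num_channels
        self.read_size = read_size
        self._t = 0

    def start(self):
        # nothing to initialize for this fake device
        pass

    def read(self):
        # next read_size time points, one sinusoid per channel
        t = (self._t + np.arange(self.read_size)) / self.rate
        self._t += self.read_size
        freqs = np.arange(1, self.num_channels + 1)[:, None]
        return np.sin(2 * np.pi * freqs * t)  # shape (num_channels, read_size)

    def stop(self):
        pass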
An example of setting up a DaqStream in the context of a custom Task is given in the recipes page.
Data Storage¶
The AxoPy data storage model was created so that your experimental dataset can be easily passed on to others and read using a variety of tools on any platform, meaning AxoPy is not required to use the raw dataset. It was also designed so that you can browse the dataset using a standard file browser so you do not need to write any code to have a peek.
Data is stored in a hierarchical fashion using a specific file structure and common file types. There are two types of files: comma separated value (CSV) files for trial data (one row per trial) and HDF5 files for array data (one file per trial). Here’s the file structure in abstract terms:
data/
    subject_id/
        task_name/
            file: trials.csv
            file: array_type1.hdf5
            file: array_type2.hdf5
You have the root of the entire dataset, containing a subfolder for each subject. Each subject folder contains a set of subfolders, one per task. Each task subfolder contains a single CSV file holding all of the attributes (scalars) for each trial, plus HDF5 files which store the array data, one file for each kind of array (e.g. an emg.hdf5 file containing raw electrophysiological recordings for each trial and a cursor.hdf5 file containing cursor positions for each trial).
As a concrete example, suppose you are running an experiment where subjects are tasked with contracting a muscle to a specified level, determined using electromyography (EMG). For each trial, you want to store the following:
- the time it took for the subject to reach the desired contraction level for each trial
- the number of times the contraction level went past the desired level (overshoots)
- the raw EMG signals, which are recorded at 2 kHz
- the current “level of contraction,” which is computed by processing the EMG signals through some processing pipeline you have set up at 10 Hz
The trial data variables here are time to target and overshoots, so these are placed in a CSV file with one row per trial:
trial | time_to_target | overshoots
------|----------------|-----------
0     | 3.271942       | 1
1     | 2.159271       | 0
2     | 3.212450       | 2
Since you have two different array-like entities to store (raw EMG data at 2 kHz and processed contraction level at 10 Hz), you create two different array types: emg and level. Each one is stored in its own HDF5 file in the task folder, with one HDF5 dataset (in the root group) per trial. The result of all of this is a structure that looks like:
data_root/
    subject_id/
        contraction_level_task/
            file: trials.csv
            file: emg.hdf5
            file: level.hdf5
The HDF5 format was chosen for all array data because it naturally works with NumPy arrays, which are the assumed container for data as it goes from a hardware device through processing code to computer interaction. It also saves the arrays in a binary format instead of converting them to strings as something like numpy.savetxt would do, potentially reducing the size of a whole experiment's dataset significantly if you store many arrays representing high-frequency electrophysiological recordings.
The goals of this storage layout are to be simple to implement and reason about, to allow for manual browsing of the dataset, and to enable simultaneous sessions (i.e. multiple researchers running the experiment with different subjects) with a very simple and intuitive data merging procedure (i.e. just designate a single root folder and move all subject data there). The layout is not optimized for processing and analyzing data once an experiment is complete, however. For that, see Data Consolidation.
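Because the layout is just CSV and HDF5, you can have a look at a dataset without AxoPy at all. A quick sketch using pandas and h5py, following the example layout above:

import h5py
import pandas as pd

root = 'data_root/subject_id/contraction_level_task'

# one row per trial
trials = pd.read_csv(root + '/trials.csv')
print(trials['time_to_target'].mean())

# one HDF5 dataset (in the root group) per trial
with h5py.File(root + '/emg.hdf5', 'r') as f:
    for name in f:
        emg = f[name][:]  # ndarray of raw EMG for this trial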
Experiment-Level Storage¶
The top-level Storage class handles the first two layers of the storage hierarchy: subjects and tasks. It is initialized at the beginning of each session (e.g. once per subject for a single-session experiment), and the data storage hierarchy is built for each subject. Initializing and adding subjects is typically handled for you by axopy.experiment.Experiment in the context of running an experiment. Once a task is given access to the Storage object, however, it is up to the task implementation to set up TaskReader objects for reading data from other tasks and TaskWriter objects for storing its own data. This is done by calling Storage.require_task() and Storage.create_task(), respectively.
Task Storage¶
Task storage is designed to make implementing a task’s data reading and writing as simple as possible, while being flexible enough to accommodate different kinds of experimental design. If you are interested in processing data after an experiment has been run, see the Data Consolidation documentation.
Reading Data from Another Task¶
The TaskReader is used for reading in data from another task. In the context of an experiment, you would access a reader with Storage.require_task(), passing in the name of the task (i.e. the name of the directory corresponding to the task). You can then access the trial data (attrs) with the trials attribute, which returns a pandas DataFrame object. You can also access array data, either by reading it all at once (arrays for each trial are stacked) or by iterating over each trial's array.
Keeping with our example above, suppose we want to run the EMG data from the contraction_level_task through a processing pipeline:
# storage can either be created for post-processing
# or it can be given to us if this is another task implementation
reader = storage.require_task('contraction_level_task')

for emg in reader.iterarray('emg'):
    # emg holds the EMG data for a single trial
    out = pipeline.process(emg)
    ...
It is also common to need the trial attributes while iterating over the trial arrays, and this can be achieved using zip and the DataFrame.iterrows method:
for (i, trial_attrs), emg in zip(reader.trials.iterrows(),
                                 reader.iterarray('emg')):
    if trial_attrs['time_to_target'] > 5.0:
        continue
    out = pipeline.process(emg)
    ...
Data Consolidation¶
Most of the data reading and writing functionality implemented in AxoPy is designed to make implementing an experiment as easy as possible, but there are some convenience functions for compiling an experiment’s dataset into something more amenable to post-processing and analysis.
Archiving Raw Data¶
In most cases, you'll want to archive your entire untouched dataset once an experiment is complete, or maybe even periodically as an experiment is performed. For this purpose, there is the storage_to_zip() function, which creates a ZIP archive of the data contained in the root storage directory. Its usage is fairly simple, since it does a simple task. You pass it the path to your data storage root directory, which can be relative to the directory you run the function from. Let's say you have some data in a folder called experiment01_data:
>>> from axopy.storage import storage_to_zip
>>> storage_to_zip('experiment01_data')
There should now be a file called experiment01_data.zip in the current directory, containing a copy of the whole dataset hierarchy. You can also specify an output file if you don't like the default:
>>> from axopy.storage import storage_to_zip
>>> storage_to_zip('experiment01_data', outfile='dataset.zip')
Graphics¶
Each task in an AxoPy experiment is given a Container. The container is effectively an empty QWidget from Qt, so you can set up its contents quite flexibly. That is, any valid QWidget or QLayout can be used as the container's contents, so you can create arbitrarily complex graphics for a task.
To set up graphics for a task, override the Task.prepare_graphics() method, which takes the Container as an input argument, then use Container.set_widget() to establish the main widget for the task.
from axopy.task import Task

class CanvasTask(Task):
    def prepare_graphics(self, container):
        # set up graphical widget/layout here
        widget = ...
        container.set_widget(widget)
While you can always set up completely custom graphics using PyQt5 classes directly, AxoPy includes some graphical elements commonly used in human-computer interface experiments, making it possible to write experiments without knowing how to use Qt.
Note
In the examples below, get_qtapp() will be used to demonstrate different graphical widgets and layouts. This function creates or retrieves a QApplication instance. We can then use app.exec_() to run the Qt event loop and test out the graphics code.
Built-In Graphics Widgets¶
Canvas Graphics¶
The axopy.gui.canvas module contains a Canvas class which can be directly inserted into a container. You can then add items like a Circle or Text to the canvas. In the context of a task, you can create a canvas as follows:
from axopy.gui.main import get_qtapp
from axopy.gui.canvas import Canvas, Circle
app = get_qtapp()
canvas = Canvas()
canvas.add_item(Circle(0.1, color='red'))
canvas.show()
app.exec_()
All of the built-in items inherit from the Item class, which means they all have a number of properties that can be set, such as the position and visibility.
canvas = Canvas()
circle = Circle(0.1)
canvas.add_item(circle)
# set the x coordinate
circle.x = 0.5
# read the y coordinate
y = circle.y
# hide the circle
circle.hide()
All of the Item classes are actually just wrappers around QGraphicsItem classes. In general, the various subclasses of QGraphicsItem (e.g. QGraphicsEllipseItem) have a large number of methods that may not be exposed by AxoPy, so all items have a qitem attribute pointing to the underlying QGraphicsItem. For example, the Line item wraps a QGraphicsLineItem. In AxoPy, a line is just a solid line with a specific cap style. If you need to customize this behavior, you can use the qitem attribute and dig into the Qt API:
from axopy.gui.canvas import Line
# horizontal line 0.4 units long
line = Line(-0.2, 0, 0.2, 0)
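For instance, here is a sketch that restyles the line through the wrapped QGraphicsLineItem (a dashed pen, assuming PyQt5 as the backend):

from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPen

from axopy.gui.canvas import Line

line = Line(-0.2, 0, 0.2, 0)

# dig into the Qt API via the underlying QGraphicsLineItem
pen = QPen(Qt.white)
pen.setStyle(Qt.DashLine)
line.qitem.setPen(pen)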
Processing Data¶
AxoPy's pipeline subpackage is a small infrastructure for processing data in a pipeline style. You create pipeline blocks, then connect them up with an efficient (but still readable) syntax. It was originally created for flexibly creating pipelines in real-time signal processing applications, but it can be useful in offline applications as well.
In pipeline, data processing is implemented through a Pipeline. A pipeline is a series of processing routines for transforming raw input data (e.g. electrophysiological data such as EMG) into useful output, such as the velocity of a cursor on the screen. These routines can usually be broken down into blocks which have common functionality.
Common Blocks¶
The typical picture for an electrophysiological signal processing pipeline looks something like:
Input
↓
┌──────────────────────┐
│ Windowing │
└──────────────────────┘
↓
┌──────────────────────┐
│ Conditioning │
└──────────────────────┘
↓
┌──────────────────────┐
│ Feature Extraction │
└──────────────────────┘
↓
┌──────────────────────┐
│ Intent Recognition │
└──────────────────────┘
↓
┌──────────────────────┐
│ Output Mapping │
└──────────────────────┘
↓
Output
Each block in this example is really a type of processing block, and the actual processing involved in each can vary. Some common cases are implemented, but creating custom blocks and connecting them together in a pipeline structure is simple. Also, the picture above shows a simple series structure, where each block takes input only from the block before it. More complex structures are sometimes convenient or necessary, and some complexity is supported.
Windowing¶
Windowing involves specifying a time window over which the rest of the pipeline will operate. That is, a windower keeps track of the current input data and optionally some data from the past, concatenating the two and passing the result along. This is useful for calculating statistics over a sufficient sample size while updating the pipeline output at a rapid rate, achieved by overlapping windows. In an offline processing context (i.e. processing static recordings), windowing also specifies how much data to read in on each iteration through the recording.
Windowing is handled by a Windower.
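A minimal sketch of the overlap this produces, assuming a single-channel device delivering 100 samples per read:

import numpy as np
from axopy.pipeline import Windower

# keep the most recent 400 samples; with 100 new samples per update,
# consecutive windows overlap by 300 samples
windower = Windower(400)

for _ in range(5):
    chunk = np.random.randn(1, 100)  # stand-in for a DAQ read
    win = windower.process(chunk)    # shape (1, 400)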
Conditioning¶
Raw data conditioning (or pre-processing) usually involves things like filtering and normalization. Usually the output of a conditioning block does not fundamentally change the representation of the input.
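For example, a conditioning stage might combine a Windower with an overlap-aware Filter, as in the adaptive cursor example later in these docs (the rate and cutoffs here are illustrative):

from scipy.signal import butter
from axopy.pipeline import Filter, Pipeline, Windower

fs = 2000  # sampling rate, Hz

# 4th-order Butterworth bandpass (10-450 Hz), normalized by Nyquist
b, a = butter(4, (10 / (fs / 2), 450 / (fs / 2)), 'bandpass')

# 400-sample windows with 200 samples of overlap (matching a read size
# of 200), so filter state carries across window boundaries
conditioner = Pipeline([Windower(400), Filter(b, a=a, overlap=200)])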
Feature Extraction¶
Features are statistics computed on a window of input data. Generally, they should represent the information contained in the raw input in a compact way. For example, you might take 100 samples of data from six channels of EMG and calculate the root-mean-square value of each channel during that 100-sample window of time. This results in an array of length 6 which represents the amplitude of each channel in the high-dimensional raw data. A feature extractor is just a collection of features to compute from the input.
Features in pipeline are classes that take all of their parameters in __init__ and perform their operation on the input in a compute method. Features are typically used by adding a handful of them to a FeatureExtractor and putting that extractor in a Pipeline.
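A sketch, under the assumption that FeatureExtractor takes (name, feature) pairs where each feature exposes a compute method (the RMS class here is a custom feature written for this example):

import numpy as np
from axopy.pipeline import FeatureExtractor, Pipeline

class RMS(object):
    """Custom feature: root mean square along the time axis."""

    def compute(self, data):
        # data has shape (n_channels, n_samples)
        return np.sqrt(np.mean(data**2, axis=1))

extractor = FeatureExtractor([('rms', RMS())])
pipeline = Pipeline([extractor])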
Intent Recognition¶
Intent recognition is the prediction or estimation of what the user intends to do based on the signals generated. An example would be a large signal sensed at the group of extensor muscles in the forearm for communicating “wrist extension.” Sometimes this mapping can be specified a priori, but most of the time we rely on machine learning techniques to infer this mapping from training data.
Connecting Blocks¶
The core module is a small infrastructure for processing data in a pipeline style. You create or use the built-in Block objects, then connect them up with an efficient (but still readable) syntax with a Pipeline.
The syntax for expressing pipeline structure is based on lists and tuples. Lists hold elements that are connected in series:
[a, b]:
─a─b─
The input is whatever a takes, and the output is whatever b outputs.
Tuples hold elements that are connected in parallel:
(a, b):
┌─a─┐
─┤ ┝━
└─b─┘
The input goes to both a and b, and the output is whatever a and b output, in a list. If we connect another element in series with a parallel block, it must be a block that handles multiple inputs:
[(a, b), c]:
┌─a─┐
─┤ ┝━c─
└─b─┘
The bottom line is: pipeline blocks accept input types and they specify the output types. You are responsible for ensuring that pipeline blocks can be connected as specified.
Sometimes, you might want to pass the output of a block both to some block structure and somewhere downstream. To handle this case, there is a PassthroughPipeline that you can use as a block within another pipeline:
passthrough pipeline p ← (b, c):
┌─────┐
├─b─┐ │
─┤ ┝━┷━
└─c─┘
[a, p, d]:
┌─────┐
├─b─┐ │
─a─p━d─ → ─a─┤ ┝━┷━d─
└─c─┘
The pass-through pipeline places its own output(s) after its input, so the input is accessible on the other side. There are cases where this type of structure is possible with a list/tuple expression, but sometimes the pass-through pipeline as a block is needed. The above example is one of those cases.
Implementing Pipeline Blocks¶
Pipeline blocks are simple to implement. It is only expected that you implement a process() method which takes one argument (data) and returns something. For multi-input blocks, you'll probably want to just expand the inputs right off the bat (e.g. in_a, in_b = data). Usually, the output is some processed form of the input data:
import axopy.pipeline as pipeline

class FooBlock(pipeline.Block):
    def process(self, data):
        return data + 1

class BarBlock(pipeline.Block):
    def process(self, data):
        return 2 * data
With some blocks implemented, the list/tuple syntax described above is used for specifying how they are connected:
a = FooBlock()
b = BarBlock()
p = pipeline.Pipeline([a, b])
Now, you just give the pipeline input and get its output:
data = 3
result = p.process(data)
In this case, the result would be 2 * (data + 1) == 8.
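Based on the tuple (parallel) semantics described above, the same blocks can also be connected in parallel, with the outputs collected in a list:

# input goes to both blocks; the output is a list of each branch's output
p2 = pipeline.Pipeline([(a, b)])
result = p2.process(3)  # [4, 6]: FooBlock gives 3 + 1, BarBlock gives 2 * 3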
Post-Process Hooks¶
Sometimes, it’s useful to be able to hook into some block in the pipeline to retrieve its data in the middle of a run through the pipeline. For instance, let’s say you have a simple pipeline:
[a, b]:
─a─b─
You run some data through the pipeline to get the result from block b, but you also want to run some function with the output of a. Block takes a hooks keyword argument which accepts a list of functions to execute after the block's process method finishes. To use hooks, make sure your custom block calls the parent Block __init__ method. For example:
import axopy.pipeline as pipeline

class FooBlock(pipeline.Block):
    def __init__(self, hooks=None):
        super(FooBlock, self).__init__(hooks=hooks)

    def process(self, data):
        return data + 1

class BarBlock(pipeline.Block):
    def process(self, data):
        return 2 * data

def foo_hook(data):
    print("FooBlock output is {}".format(data))

a = FooBlock(hooks=[foo_hook])
b = BarBlock()
p = pipeline.Pipeline([a, b])

result = p.process(3)
Now, the call to process on the pipeline will input 3 to block a; block a will add 1 and then print FooBlock output is 4, then 4 will be passed to block b, which will return 8.
Some Task Recipes¶
The following are just some recipes for tasks or pieces of tasks that are somewhat common. Note that these are for illustration purposes and won’t always run as-is.
The Basics¶
Storing Data¶
Storing data within a task is typically a two-step process. First, you implement prepare_design() to set up the attributes and arrays (with initial values) for each trial and the trial/block structure. Then you implement prepare_storage() to get access to a new TaskWriter. When a trial is finished, you can use the task's trial attribute to write the trial data to disk using the task writer.
class CustomTask(Task):

    def prepare_design(self, design):
        block = design.add_block()
        for pos in [0, 0.2, 0.4]:
            block.add_trial(attrs={'pos': pos})
        block.shuffle()

    def prepare_storage(self, storage):
        self.writer = storage.create_task('custom_task')

    # ... task implementation here

    def finish_trial(self):
        self.writer.write(self.trial)
        # call next_trial() to start the next trial
        # could instead start a timer if you want a timeout between trials
        self.next_trial()
Using Input Hardware¶
To make use of an input device (DAQ), implement prepare_daq() to gain access to the stream interface, get it running, then connect its updated transmitter to a callback that you define.
class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.start()

    def run_trial(self, trial):
        self.daqstream.updated.connect(self.update)

    def update(self, data):
        # do something with the data from the daqstream here
        ...
You may instead want to connect the stream in prepare_daq and then start and stop the stream in run_trial (as opposed to letting it run and making/breaking the connection to your update callback). The main disadvantage of this approach is that some devices may take a couple of seconds to start. The downside of the other approach, though, is that the time from making the connection to the first call of the update callback varies depending on when exactly the connection is made with respect to the most recent update from the hardware.
class CustomTask(Task):

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.updated.connect(self.update)

    def run_trial(self, trial):
        self.daqstream.start()

    def update(self, data):
        # do something with the data from the daqstream here
        ...
Examples¶
Here are some complete examples that can be run. Click on an example to see the source code and some comments about the example.
Experiment Setup Options¶
Demonstration of several ways to instantiate an experiment.
- simple: The most straightforward usage. You pass the hardware device to create the experiment, then run a task. Subject configuration is handled automatically.
- customized: A customized experiment setup. A "config" step is used before Experiment.run() to allow the researcher to select the subject group for the current session ("A" or "B").
import sys
import argparse

from axopy.experiment import Experiment
from axopy.task import Oscilloscope
from axopy.daq import NoiseGenerator

daq = NoiseGenerator(rate=2000, num_channels=6, read_size=200)


def run():
    """Main function of the example. Runs each demo and then exits."""
    customized()


def simple():
    # subject is not given, so it is configured in run
    exp = Experiment(daq=daq).run(Oscilloscope())


def customized():
    exp = Experiment(daq=daq)
    # optional config step, subject field is implied
    config = exp.configure(group=('A', 'B'))
    # here you can retrieve the selected group via `config['group']`
    # run list of tasks
    exp.run(Oscilloscope())


def debug():
    # subject is given, so no configure step is needed
    exp = Experiment(daq=daq, data='/tmp/data', subject='test').run(
        Oscilloscope())


if __name__ == '__main__':
    functions = {
        'simple': simple,
        'customized': customized,
    }

    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument(
        'function',
        help='Function in the example script to run.')
    args = parser.parse_args()

    if args.function not in functions:
        print("{} isn't a function in the example.".format(args.function))
        sys.exit(-1)
    else:
        functions[args.function]()
Built-In Devices¶
This example demonstrates some input devices built into AxoPy for testing. Pass the following options to try out different devices:
- rainbow: Basic use of a NoiseGenerator to show lots of colorful random data.
- keyboard: Basic use of a Keyboard to show roughly-timed keyboard inputs.
- keystick: Neat use of a filter to get joystick-like inputs from a keyboard.
- mouse: Basic use of a Mouse for velocity input.
import sys
import argparse

import numpy as np

from axopy.task import Oscilloscope
from axopy.experiment import Experiment
from axopy.daq import NoiseGenerator, Keyboard, Mouse
from axopy.pipeline import Pipeline, Callable, Windower, Filter


def rainbow():
    dev = NoiseGenerator(rate=2000, num_channels=16, read_size=200)
    run(dev)


def keyboard():
    dev = Keyboard()
    # need a windower to show something interesting in the oscilloscope
    pipeline = Pipeline([Windower(10)])
    run(dev, pipeline)


def keystick():
    dev = Keyboard(rate=20, keys=list('wasd'))
    pipeline = Pipeline([
        # window to average over
        Windower(10),
        # mean along rows
        Callable(lambda x: np.mean(x, axis=1, keepdims=True)),
        # window to show in the oscilloscope
        Windower(60)
    ])
    run(dev, pipeline)


def mouse():
    dev = Mouse(rate=20)
    pipeline = Pipeline([
        # just for scaling the input since it's in pixels
        Callable(lambda x: x/100),
        # window to show in the oscilloscope
        Windower(40)
    ])
    run(dev, pipeline)


def run(dev, pipeline=None):
    # run an experiment with just an oscilloscope task
    Experiment(daq=dev, subject='test').run(Oscilloscope(pipeline))


if __name__ == '__main__':
    functions = {
        'rainbow': rainbow,
        'keyboard': keyboard,
        'keystick': keystick,
        'mouse': mouse,
    }

    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument(
        'function',
        help='Function in the example script to run.')
    args = parser.parse_args()

    if args.function not in functions:
        print("{} isn't a function in the example.".format(args.function))
        sys.exit(-1)
    else:
        functions[args.function]()
Adaptive Cursor Control Mapping¶
This example contains a 2D cursor-to-target task which processes input signals from a data acquisition device (e.g. from EMG hardware) and adaptively learns a linear mapping from input magnitude to cursor position via the recursive least squares (RLS) algorithm.
Once the cursor interface is shown, press the “Enter” key to begin. The target will move to some location on the screen and the subject should attempt to move the cursor toward the target. As input data is collected, the recursive least squares algorithm updates the weights of a linear mapping from input amplitude to cursor position. Once this training procedure is finished, the target changes color and the subject can attempt to hit the targets with the mapping now fixed.
import numpy
import random
from scipy.signal import butter

from axopy import pipeline
from axopy.features import mean_absolute_value
from axopy.experiment import Experiment
from axopy import util
from axopy.task import Task, Oscilloscope
from axopy.daq import NoiseGenerator
from axopy.timing import Counter
from axopy.gui.canvas import Canvas, Circle, Cross


class RLSMapping(pipeline.Block):
    """Linear mapping of EMG amplitude to position updated by RLS.

    Parameters
    ----------
    m : int
        Number of vectors in the mapping.
    k : int
        Dimensionality of the mapping vectors.
    lam : float
        Forgetting factor.
    """

    def __init__(self, m, k, lam, delta=0.001):
        super(RLSMapping, self).__init__()
        self.m = m
        self.k = k
        self.lam = lam
        self.delta = delta
        self._init()

    @classmethod
    def from_weights(cls, weights):
        """Construct an RLSMapping from static weights."""
        obj = cls(1, 1, 1)
        obj.weights = weights
        return obj

    def _init(self):
        self.w = numpy.zeros((self.k, self.m))
        self.P = numpy.eye(self.m) / self.delta

    def process(self, data):
        """Just applies the current weights to the input."""
        self.y = data
        self.xhat = self.y.dot(self.w.T)
        return self.xhat

    def update(self, x):
        """Update the weights with the teaching signal."""
        z = self.P.dot(self.y.T)
        g = z / (self.lam + self.y.dot(z))
        e = x - self.xhat
        self.w = self.w + numpy.outer(e, g)
        self.P = (self.P - numpy.outer(g, z)) / self.lam


class CursorFollowing(Task):
    # TODO split this into two tasks (a "training" task and a "practice"
    # task). This would involve storing the RLS weights and loading them for
    # the practice task. Probably a good idea to write a simple cursor
    # interface class to share common code between the two tasks.

    target_dist = 0.8

    def __init__(self, pipeline):
        super(CursorFollowing, self).__init__()
        self.pipeline = pipeline

    def prepare_design(self, design):
        d = self.target_dist
        target_positions = [(d, 0), (0, d), (-d, 0), (0, -d), (0, 0)]
        for training in [True, False]:
            block = design.add_block()
            for x, y in target_positions:
                block.add_trial(attrs={
                    'target_x': x,
                    'target_y': y,
                    'training': training
                })
            block.shuffle()

    def prepare_graphics(self, container):
        self.canvas = Canvas()
        self.cursor = Circle(0.05, color='#aa1212')
        self.target = Circle(0.1, color='#32b124')
        self.canvas.add_item(self.target)
        self.canvas.add_item(self.cursor)
        self.canvas.add_item(Cross())
        container.set_widget(self.canvas)

    def prepare_daq(self, daqstream):
        self.daqstream = daqstream
        self.daqstream.start()

        self.timer = Counter(50)
        self.timer.timeout.connect(self.finish_trial)

    def run_trial(self, trial):
        if not trial.attrs['training']:
            self.target.color = '#3224b1'

        self._reset()
        self.target.pos = trial.attrs['target_x'], trial.attrs['target_y']
        self.target.show()
        self.pipeline.clear()
        self.connect(self.daqstream.updated, self.update)

    def update(self, data):
        xhat = self.pipeline.process(data)
        self.cursor.pos = xhat

        target_pos = numpy.array([self.trial.attrs['target_x'],
                                  self.trial.attrs['target_y']])
        if self.trial.attrs['training']:
            self.pipeline.named_blocks['RLSMapping'].update(target_pos)

        if self.cursor.collides_with(self.target):
            self.finish_trial()
        else:
            self.timer.increment()

    def finish_trial(self):
        self.disconnect(self.daqstream.updated, self.update)
        self._reset()
        self.next_trial()

    def _reset(self):
        self.cursor.pos = 0, 0
        self.timer.reset()
        self.target.hide()

    def finish(self):
        self.daqstream.stop()

    def key_press(self, key):
        if key == util.key_escape:
            self.finish()
        else:
            super().key_press(key)


if __name__ == '__main__':
    dev = NoiseGenerator(rate=2000, num_channels=4, read_size=200)

    b, a = butter(4, (10/2000./2., 450/2000./2.), 'bandpass')
    preproc_pipeline = pipeline.Pipeline([
        pipeline.Windower(400),
        pipeline.Centerer(),
        pipeline.Filter(b, a=a, overlap=200),
    ])
    main_pipeline = pipeline.Pipeline([
        preproc_pipeline,
        pipeline.Callable(mean_absolute_value),
        RLSMapping(4, 2, 0.99)
    ])

    Experiment(daq=dev, subject='test').run(
        Oscilloscope(preproc_pipeline),
        CursorFollowing(main_pipeline)
    )
API¶
These are the modules/subpackages which constitute AxoPy.
daq¶
Protocol and threaded interface for data acquisition.
class axopy.daq.DaqStream(device)[source]¶
    Asynchronous interface to an input device.

    Runs a persistent while loop wherein the device is repeatedly polled for data. When the data becomes available, it is emitted and the loop continues.

    There are effectively two methods of this class: start and stop. These methods do as their names suggest – they start and stop the underlying device from sampling new data.

    The device used to create the DaqStream is also accessible via the device attribute, so you can change settings on the underlying device at any time (e.g. sampling rate, number of samples per update, etc.).

    Parameters: device (daq) – Any object implementing the AxoPy data acquisition interface. See NoiseGenerator for an example.

    updated : Transmitter
        Transmitted when the latest chunk of data is available. The data type depends on the underlying input device, but it is often a numpy ndarray.

    disconnected : Transmitter
        Transmitted if the device cannot be read from (it has disconnected somehow).

    finished : Transmitter
        Transmitted when the device has stopped and sampling is finished.

    run()[source]
        Implementation for the underlying QThread. Don't call this method directly – use start() instead.

    running
        Boolean value indicating whether or not the stream is running.
class axopy.daq.Keyboard(rate=10, keys=None)[source]¶
    Keyboard input device.

    The keyboard device works by periodically sampling (at the rate specified) whether or not the watched keys have been pressed since the last sampling event. The output is a numpy array of shape (n_keys, 1), where the numerical values are booleans indicating whether or not the corresponding keys have been pressed.

    Parameters:
    - rate (int, optional) – Sampling rate, in Hz.
    - keys (container of str, optional) – Keys to watch and use as input signals. The keys used here should not conflict with the key used by the Experiment to start the next task.

    Notes

    There are a couple of reasonable alternatives to the way the keyboard device is currently implemented. One way to do it might be sampling the key states at a given rate and producing segments of sampled key state data, much like a regular data acquisition device. One issue is that actual key state (whether the key is being physically pressed or not) doesn't seem to be feasible to find out with Qt. You can hook into key press and key release events, but these are subject to repeat delay and repeat rate.

    Another possible keyboard device would be responsive to key press events themselves rather than an input sampling event. While Qt enables event-based keyboard handling, the method used here fits the input device model, making it easily swappable with other input devices.
class axopy.daq.Mouse(rate=10, position=False)[source]¶
    Mouse input device.

    The mouse device works by periodically sampling (at the rate specified) the mouse position within the AxoPy experiment window. The output is in the form of a numpy array of shape (2, 1), representing either the change in position (default) or the absolute position in the window.

    Parameters:
    - rate (int, optional) – Sampling rate, in Hz.
    - position (bool, optional) – If True, output the absolute position in the window instead of the change in position.

    Notes

    In Qt's coordinate system, the positive y direction is downward. Here, this is inverted as a convenience (upward movement of the mouse produces a positive "velocity").

    Mouse events are intercepted here but they are not consumed, meaning you can still use the mouse to manipulate widgets in the experiment window.
class axopy.daq.NoiseGenerator(rate=1000, num_channels=1, amplitude=1.0, read_size=100)[source]¶
    An emulated data acquisition device which generates random data.

    Each sample of the generated data is sampled from a zero-mean Gaussian distribution with variance determined by the amplitude specified, which corresponds to three standard deviations. That is, approximately 99.7% of the samples should be within the desired peak amplitude.

    NoiseGenerator is meant to emulate data acquisition devices that block on each request for data until the data is available. See read() for details.

    Parameters:
    - rate (int, optional) – Sample rate in Hz. Default is 1000.
    - num_channels (int, optional) – Number of "channels" to generate. Default is 1.
    - amplitude (float, optional) – Approximate peak amplitude of the signal to generate. Specifically, the amplitude represents three standard deviations for generating the Gaussian distributed data. Default is 1.
    - read_size (int, optional) – Number of samples to generate per read() call. Default is 100.

    read()[source]
        Generates zero-mean Gaussian data.

        This method blocks (calls time.sleep()) to emulate other data acquisition units which wait for the requested number of samples to be read. The amount of time to block is calculated such that consecutive calls will always return with constant frequency, assuming the calls occur faster than required (i.e. processing doesn't fall behind).

        Returns: data – The generated data.
        Return type: ndarray, shape (num_channels, read_size)
design¶
Task design containers.
class axopy.design.Design[source]¶
    Top-level task design container.

    The Design is a list of Block objects, which themselves are lists of Trial objects.

    add_block()[source]
        Add a block to the design.

        Returns: block – The created block.
        Return type: design.Block

class axopy.design.Block(index, *args, **kwargs)[source]¶
    List of trials.

    Experiments often consist of a set of blocks, each containing the same set of trials in randomized order. You usually shouldn't need to create a block directly – use Design.add_block() instead.

    Parameters: index (int) – Index of the block in the design. This is required to pass along to each trial in the block, so that the trial knows which block it belongs to.

    add_trial(attrs=None)[source]
        Add a trial to the block.

        A Trial object is created and added to the block. You can optionally provide a dictionary of attribute name/value pairs to initialize the trial.

        Parameters: attrs (dict, optional) – Dictionary of attribute name/value pairs.
        Returns: trial – The trial object created. This can be used to add new attributes or arrays. See Trial.
        Return type: Trial

class axopy.design.Trial(attrs)[source]¶
    Container of trial data.

    There are two kinds of data typically needed during a trial: attributes and arrays. Attributes are scalar quantities or primitives like integers, floating point numbers, booleans, strings, etc. Arrays are NumPy arrays, useful for holding things like cursor trajectories.

    There are two primary purposes for each of these two kinds of data. First, it's useful to design a task with pre-determined values, such as the target location or the cursor trajectory to follow. The other purpose is to temporarily hold runtime data using the same interface, such as the final cursor position or the time-to-target.

    You shouldn't normally need to create a trial directly – instead, use Block.add_trial().

class axopy.design.Array(data=None, stack_axis=1)[source]¶
    Trial array.

    The array is not much more than a NumPy array with a stack() method for conveniently adding new data to the array. This is useful in cases where you iteratively collect new segments of data and want to concatenate them. For example, you could use an Array to collect the samples from a data acquisition device as they come in.

    You usually don't need to create an array manually – instead, use Trial.add_array().

    Parameters: data (ndarray, optional) – Data to initialize the array with. stack_axis (int, optional) – The axis along which new data is stacked.

    data : ndarray, optional
        The NumPy array holding the data.
experiment¶
Experiment workflow and design.
class axopy.experiment.Experiment(daq=None, data='data', subject=None, allow_overwrite=False)[source]¶
    Experiment workflow manager.

    Presents the researcher with a prompt for entering session details and then presents the appropriate tasks.

    Parameters:
    - daq (object, optional) – A data acquisition device that follows the AxoPy DAQ protocol. See axopy.daq.
    - data (str, optional) – Path to the data. The directory is created for you if it doesn't exist.
    - subject (str, optional) – The subject ID to use. If not specified, a configuration screen is shown before running the tasks so you can enter it there. This is mostly for experiment writing (to avoid the extra configuration step).
    - allow_overwrite (bool, optional) – If True, overwrite protection in Storage is disabled. This is mostly for experiment writing purposes.

    configure(**options)[source]
        Configure the experiment with custom options.

        This method allows you to specify a number of options that you want to configure with a graphical interface prior to running the tasks. Use keyword arguments to specify which options you want to configure. The options selected/specified in the graphical interface are then returned by this method so that you can alter setup before running the experiment.

        Each keyword argument should list the data type to configure, such as float, str, or int. You can also provide a list or tuple of available choices for that option.

        You do not need to add an option for the subject name/ID – that is added automatically if the subject ID was not specified when creating the experiment.
features¶
Time-domain features.
Notation:
- \(x_i\): value of a signal at time index \(i\)
- \(N\): length of the signal
axopy.features.time.integrated_emg(x, axis=-1, keepdims=False)[source]¶
    Sum over the rectified signal.

    \[\text{IEMG} = \sum_{i=1}^{N} |x_i|\]

    Parameters:
    - x (ndarray) – Input data. Use the axis argument to specify the "time axis".
    - axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
    - keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.

    Returns: y – IEMG of each channel.
    Return type: ndarray, shape (n_channels,)
axopy.features.time.logvar(x, axis=-1, keepdims=False)[source]¶
    Log of the variance of the signal.

    \[\text{log-var} = \log \left( \frac{1}{N} \sum_{i=1}^{N} \left( x_i - \mu \right)^2 \right)\]

    For electrophysiological signals that are mean-zero, this is the log of the mean square value, making it similar to root_mean_square() but scaling differently (slower) with \(x\).

    For EMG data recorded from forearm muscles, log-var has been found to relate to wrist angle fairly linearly [1]_.

    Note: the base-10 logarithm is used, though the base is not specified in [1]_.

    Parameters:
    - x (ndarray) – Input data. Use the axis argument to specify the "time axis".
    - axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
    - keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.

    Returns: y – log-var of each channel.
    Return type: ndarray, shape (n_channels,)

    References

    [1] J. M. Hahne, F. Bießmann, N. Jiang, H. Rehbaum, D. Farina, F. C. Meinecke, K.-R. Müller, and L. C. Parra, "Linear and Nonlinear Regression Techniques for Simultaneous and Proportional Myoelectric Control," IEEE Transactions on Neural Systems and Rehabilitation Engineering, vol. 22, no. 2, pp. 269-279, 2014.
-
axopy.features.time.
mean_absolute_value
(x, weights='mav', axis=-1, keepdims=False)[source]¶ Computes the mean absolute value (MAV) of each signal.
Mean absolute value is a popular feature for obtaining amplitude information from EMG, especially in gesture classification contexts [1].
There is an optional windowing function applied to the rectified signal, described as MAV1 and MAV2 in some references. A custom window can also be used. The general definition is given as:
\[\text{MAV} = \frac{1}{N} \sum_{i=1}^{N} w_i |x_i|\]Normal MAV does not use a windowing function, equivalent to setting all \(w_i = 1\).
MAV1 refers to a rectangular window which de-emphasizes the beginning and ending of an input window. The first quarter of the input samples receive a weight of 0.5, the middle half of the input samples receive a weight of 1, and the final quarter receives a weight of 0.5:
\[\begin{split}w_i = \begin{cases} 1, & \frac{N}{4} \leq i \leq \frac{3N}{4} \\ 0.5, & \text{otherwise} \end{cases}\end{split}\]MAV2 uses a similar window structure to MAV1 (i.e. broken into first quarter, middle half, and final quarter), but the window is trapezoidal in shape, ramping from 0 to 1 over the first quarter and from 1 to 0 over the last quarter:
\[\begin{split}w_i = \begin{cases} 1, & \frac{N}{4} \leq i \leq \frac{3N}{4} \\ \frac{4i}{N}, & i < \frac{N}{4} \\ \frac{4(N - i)}{N}, & i > \frac{3N}{4} \end{cases}\end{split}\]Parameters: - x (ndarray) – Input data. Use the
axis
argument to specify the “time axis”. - weights (str or ndarray, optional) –
Weights to use. Possible values:
- ’mav’ : all samples in the signal are weighted equally (default).
- ’mav1’ : rectangular window with the middle half of the signal receiving unit weight and the first and last quarters of the signal receiving half weight.
- ’mav2’ : similar to ‘mav1’, but weights on the first and last quarters increase and decrease between 0 and 1 respectively, forming a trapezoidal window.
- [ndarray] : user-supplied weights to apply. Must be a 1D array
with the same length as the signals received in the
compute
method.
- axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
- keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns: y – MAV of each channel.
Return type: ndarray, shape (n_channels,)
See also
axopy.features.util.inverted_t_window()
- Generates the window for MAV1
axopy.features.util.trapezoidal_window()
- Generates the window for MAV2
References
[1] B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
[2] A. Phinyomark, P. Phukpattaranont, and C. Limsakul, “Feature Reduction and Selection for EMG Signal Classification,” Expert Systems with Applications, vol. 39, no. 8, pp. 7420-7431, 2012.
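A short sketch of calling this feature with the weighting options described above:
import numpy as np
from axopy.features.time import mean_absolute_value

x = np.random.randn(2, 100)                    # (n_channels, n_samples)

mav = mean_absolute_value(x)                   # uniform weights
mav1 = mean_absolute_value(x, weights='mav1')  # rectangular window
mav2 = mean_absolute_value(x, weights='mav2')  # trapezoidal window

# custom weights: a 1D array with the same length as the time axis
w = np.hamming(100)
mav_w = mean_absolute_value(x, weights=w)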
-
axopy.features.time.
root_mean_square
(x, axis=-1, keepdims=False)[source]¶ Computes the root mean square of each signal.
RMS is a commonly used feature for extracting amplitude information from physiological signals.
\[\text{RMS} = \sqrt{\frac{1}{N} \sum_{i=1}^N x_i^2}\]Parameters: - x (ndarray) – Input data. Use the
axis
argument to specify the “time axis”. - axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
- keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns: y – RMS of each channel.
Return type: ndarray, shape (n_channels,)
-
axopy.features.time.
slope_sign_changes
(x, threshold=0, axis=-1, keepdims=False)[source]¶ Computes the number of slope sign changes (SSC) of each signal.
A slope sign change occurs when the middle value of a group of three adjacent values in the signal is either greater than or less than both of the other two.
Parameters: - x (ndarray) – Input data. Use the
axis
argument to specify the “time axis”. - threshold (float, optional) – A threshold for discriminating true slope sign changes from those caused by low-level noise fluctuating about a specific value. By default, no threshold is used, so every slope sign change in the signal is counted.
- axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
- keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns: y – SSC of each channel.
Return type: ndarray, shape (n_channels,)
References
[1] B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
-
axopy.features.time.
waveform_length
(x, axis=-1, keepdims=False)[source]¶ Computes the waveform length (WL) of each signal.
Waveform length is the sum of the absolute value of the deltas between adjacent values (in time) of the signal:
\[\text{WL} = \sum_{i=1}^{N-1} | x_{i+1} - x_i |\]Parameters: - x (ndarray) – Input data. Use the
axis
argument to specify the “time axis”. - axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
- keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns: y – WL of each channel.
Return type: ndarray, shape (n_channels,)
References
[1] B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
-
axopy.features.time.
zero_crossings
(x, threshold=0, axis=-1, keepdims=False)[source]¶ Computes the number of zero crossings (ZC) of each signal.
A zero crossing occurs when two adjacent values (in time) of the signal have opposite sign. A threshold is used to mitigate the effect of noise around zero. It is used as a measure of frequency information.
Parameters: - x (ndarray) – Input data. Use the
axis
argument to specify the “time axis”. - threshold (float, optional) – A threshold for discriminating true zero crossings from those caused by low-level noise situated about zero. By default, no threshold is used, so every sign change in the signal is counted.
- axis (int, optional) – The axis to compute the feature along. By default, it is computed along rows, so the input is assumed to be shape (n_channels, n_samples).
- keepdims (bool, optional) – Whether or not to keep the dimensionality of the input. That is, if the input is 2D, the output will be 2D even if a dimension collapses to size 1.
Returns: y – ZC of each channel.
Return type: ndarray, shape (n_channels,)
References
[1] B. Hudgins, P. Parker, and R. N. Scott, “A New Strategy for Multifunction Myoelectric Control,” IEEE Transactions on Biomedical Engineering, vol. 40, no. 1, pp. 82-94, 1993.
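The definition is straightforward to express in NumPy. A sketch of one plausible reading (not AxoPy's implementation): count adjacent pairs whose signs differ and whose difference also exceeds the threshold.
import numpy as np

def zero_crossings_sketch(x, threshold=0, axis=-1):
    """Count sign changes whose step size exceeds a noise threshold."""
    x = np.asarray(x)
    sign_change = np.diff(np.sign(x), axis=axis) != 0
    above_noise = np.abs(np.diff(x, axis=axis)) > threshold
    return np.sum(sign_change & above_noise, axis=axis)

zero_crossings_sketch(np.array([[1.0, -0.5, 0.3, 0.4]]))  # array([2])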
gui¶
main¶
-
class
axopy.gui.main.
Container
[source]¶ Graphics container for tasks.
-
set_layout
(layout)[source]¶ Set the layout of the container.
Parameters: layout (QLayout) – Any QLayout is OK to add.
-
set_widget
(widget)[source]¶ Set the widget containing all graphical elements.
Parameters: widget (QWidget) – Any QWidget is OK to add. See also
axopy.gui.canvas()
- Canvas widget and canvas items that can be added to the container.
axopy.gui.graph()
- Plotting widgets that can be added to the container.
-
show
()[source]¶ Show the container in the active application.
This is not normally needed, unless you’re testing out a GUI and using
gui_check()
.
-
-
axopy.gui.main.
get_qtapp
()[source]¶ Get a QApplication instance running.
Returns the current
QApplication
instance if it exists and creates it otherwise.Examples
This function is primarily for internal usage, but it is exposed to make it convenient to test graphical code without all of the experiment/task machinery.
from axopy.gui.main import get_qtapp, Container

# get the application instance first, before creating widgets etc.
app = get_qtapp()

con = Container()

# call show() to show the widget, then run the application
con.show()
app.exec_()
-
axopy.gui.main.
gui_check
()[source]¶ Check graphical interface code interactively.
This function makes it convenient to test graphical code without all of the experiment/task machinery. You can create a
Container
, add things to the container, and then run this function as a context manager to show the GUI and try it out.Note
Be sure to call
Container.show()
at the end to display the container.Examples
Minimal example
from axopy.gui.main import Container, gui_check

with gui_check():
    con = Container()
    con.show()
canvas¶
2D canvas style graphics functionality backed by Qt’s QGraphicsView.
-
class
axopy.gui.canvas.
Canvas
(draw_border=True, bg_color=None, border_color=None, parent=None, invert_x=False, invert_y=False)[source]¶ A 2D canvas interface implemented using a QGraphicsView.
This view essentially just holds a QGraphicsScene that grows to fit the size of the view, keeping the aspect ratio square. The scene is displayed with a gray (by default) border.
See Qt’s documentation for more information about working with QGraphicsView (https://doc.qt.io/Qt-5/qgraphicsview.html).
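A sketch of trying out a canvas with the gui_check helper from above; the add_item call is an assumption, as it isn't shown in this excerpt:
from axopy.gui.canvas import Canvas, Circle
from axopy.gui.main import Container, gui_check

with gui_check():
    canvas = Canvas()
    canvas.add_item(Circle(0.1, color='#aa1212'))  # assumption: add_item exists
    con = Container()
    con.set_widget(canvas)
    con.show()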
-
class
axopy.gui.canvas.
Circle
(diameter, color='#333333')[source]¶ Circular item.
The coordinates of this item correspond to the center of the circle.
Parameters: - diameter (float) – Diameter of the circle. - color (str, optional) – Color of the circle, as a hex string (default '#333333').
-
class
axopy.gui.canvas.
Cross
(size=0.05, linewidth=0.01, color='#333333')[source]¶ Collection of two lines oriented as a “plus sign”.
The coordinates of this item correspond to the center of the cross. This item’s
qitem
attribute is a QGraphicsItemGroup
(a group of two lines).Parameters: - size (float, optional) – Length of each line. - linewidth (float, optional) – Width of each line. - color (str, optional) – Color of the lines. -
color
¶ Color of the lines in the cross.
-
-
class
axopy.gui.canvas.
Item
(qitem)[source]¶ Canvas item base class.
This is simply a wrapper around any kind of
QGraphicsItem
, adding the ability to set some properties of the underlying item with a more Pythonic API. You can always access the QGraphicsItem
with the qitem
attribute. Once you know what kind of QGraphicsItem
is being wrapped, you can use the corresponding Qt documentation to make use of more complete functionality.-
qitem
¶ The QGraphicsItem being wrapped. You can use this attribute to access methods and properties of the item not exposed by the wrapper class. If you find yourself routinely using a method of the QGraphicsItem, consider recommending it for addition to AxoPy.
Type: QGraphicsItem
-
color
¶ Color of the item.
-
opacity
¶ Opacity of the item (between 0 and 1).
-
pos
¶ Both X and Y coordinates of the item in the canvas.
-
visible
¶ Visibility of the item.
-
x
¶ X coordinate of the item in the canvas.
-
y
¶ Y coordinate of the item in the canvas.
-
-
class
axopy.gui.canvas.
Line
(x1, y1, x2, y2, width=0.01, color='#333333')[source]¶ Line item.
-
color
¶ Color of the item.
-
pipeline¶
-
class
axopy.pipeline.
Block
(name=None, hooks=None)[source]¶ Base class for all blocks.
Notes
Blocks should take their parameters in
__init__
and provide at least theprocess
method for taking in data and returning some result.
-
class
axopy.pipeline.
Pipeline
(blocks, name=None)[source]¶ Feedforward arrangement of blocks for processing data.
A
Pipeline
contains a set of Block
objects which operate on data to produce a final output. To create a pipeline, only two rules are needed: blocks in a list are processed in series, and blocks in a tuple are processed in parallel (see the sketch below).
Blocks that are arranged to take multiple inputs should expect to take the corresponding number of inputs in the order they are given. It is up to the user constructing the pipeline to make sure that the arrangement of blocks makes sense.
Parameters: blocks (container) – The blocks in the pipeline, with lists processed in series and tuples processed in parallel. -
named_blocks
¶ Dictionary of blocks in the pipeline. Keys are the names given to the blocks in the pipeline and values are the block objects.
Type: dict
-
clear
()[source]¶ Calls the
clear
method on each block in the pipeline. The effect depends on the blocks themselves.
-
process
(data)[source]¶ Calls the
process
method of each block in the pipeline, passing the outputs around as specified in the block structure.Parameters: data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it. Returns: out – The data output by the process
method of the last block(s) in the pipeline.Return type: object
-
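A minimal sketch of the series/parallel rules using the Callable block documented below (the container type of a parallel stage's output, a list here, is an assumption):
from axopy.pipeline import Callable, Pipeline

double = Callable(lambda x: 2 * x, name='double')
add_one = Callable(lambda x: x + 1, name='add_one')

# blocks in a list run in series: add_one(double(3)) -> 7
series = Pipeline([double, add_one])
print(series.process(3))

# blocks in a tuple run in parallel: both blocks receive the same input
parallel = Pipeline([(double, add_one)])
print(parallel.process(3))  # assumption: outputs collected together, e.g. [6, 4]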
-
class
axopy.pipeline.
Passthrough
(blocks, expand_output=True, name=None)[source]¶ Convenience block for passing input along to output.
A passthrough pipeline block is useful when you want to process some data and then provide both the processed output and the original input to another block downstream:
        -----------------------> x
        |
x ---> [ subpipeline ] ----> y
-
process
(data)[source]¶ Calls the
process
method of each block in the pipeline, passing the outputs around as specified in the block structure.Parameters: data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it. Returns: out – The data output by the process
method of the last block(s) in the pipeline.Return type: object
-
-
class
axopy.pipeline.
Callable
(func, func_args=None, func_kwargs=None, name=None, hooks=None)[source]¶ A Block that does not require persistent attributes.
Some Block implementations don’t require attributes to update on successive calls to the process method, but instead are essentially a function that can be called repeatedly. This class is for conveniently creating such a block.
If the function you want to use takes additional arguments, such as a keyword argument that configures its behavior, you can pass them in with the func_args and func_kwargs parameters.
Note: if you use an anonymous function as the func argument, (e.g.
lambda x: 2*x
), it is recommended to explicitly give the block a meaningful name.Parameters: - func (callable(x)) – Function that gets called when the block’s process method is called. Should take a single input and return output which is compatible with whatever is connected to the block.
- func_args (list, optional) – List (or tuple) of additional arguments to pass to func when calling it for processing. If None (default), no arguments are used.
- func_kwargs (dict) – Keyword argument name/value pairs to pass to func when calling it for processing. If None (default), no keyword arguments are used.
- name (str, optional, default=None) – Name of the block. By default, the name of the processor function is used.
- hooks (list, optional, default=None) – List of callables (callbacks) to run after the block’s process method is called.
-
class
axopy.pipeline.
Windower
(length)[source]¶ Windows incoming data to a specific length.
Takes new input data and combines it with past data to maintain a sliding window with optional overlap. The window length is specified directly, so the overlap depends on the length of the input.
The input length may change on each iteration, but the
Windower
must be cleared before the number of channels can change.Parameters: length (int) – Total number of samples to output on each iteration. This must be at least as large as the number of samples input to the windower on each iteration. See also
axopy.pipeline.common.Ensure2D
- Ensure input to the windower is 2D.
Examples
Basic use of a windower:
>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> win = pipeline.Windower(4)
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])
>>> win.process(np.array([[7, 8], [5, 6]]))
array([[ 1.,  2.,  7.,  8.],
       [ 3.,  4.,  5.,  6.]])
>>> win.clear()
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])
If your data is 1-dimensional (shape
(n_samples,)
), use an Ensure2D
block in front of the Windower
:
>>> win = pipeline.Windower(4)
>>> p = pipeline.Pipeline([pipeline.Ensure2D(), win])
>>> p.process(np.array([1, 2]))
array([[ 0.,  0.,  1.,  2.]])
-
class
axopy.pipeline.
Centerer
(name=None, hooks=None)[source]¶ Centers data by subtracting out its mean.
\[\tilde{x}_i = x_i - \frac{1}{N} \sum_{j=0}^{N-1} x_j\]
-
class
axopy.pipeline.
Filter
(b, a=1, overlap=0)[source]¶ Filters incoming data with a time domain filter.
This filter implementation takes filter coefficients that are designed by the user – it merely applies the filter to the input, remembering the final inputs/outputs from the previous update and using them as initial conditions for the current update.
Parameters: - b (ndarray) – Numerator polynomial coefficients of the filter.
- a (ndarray, optional) – Denominator polynomial coefficients of the filter. Default is 1, meaning the filter is FIR.
- overlap (int, optional) – Number of samples overlapping in consecutive inputs. Needed for correct filter initial conditions in each filtering operation. Default is 0, meaning the final inputs/outputs of the previous update are used.
See also
axopy.pipeline.common.Ensure2D
- Ensure input to the filter is 2D.
Examples
Design a filter using scipy and use the coefficients:
>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> from scipy.signal import butter
>>> b, a = butter(4, 100/1000/2)
>>> f = pipeline.Filter(b, a)
>>> f.process(np.random.randn(1, 5))  # doctest: +ELLIPSIS
array([...
Use a filter in combination with a
Windower
, making sure to account for overlapping data in consecutive filtering operations. Here, we’ll use a window of length 5 and pass in 3 samples at a time, so there will be an overlap of 2 samples. The overlapping samples in each output will agree:
>>> w = pipeline.Windower(5)
>>> f = pipeline.Filter(b, a, overlap=2)
>>> p = pipeline.Pipeline([w, f])
>>> out1 = p.process(np.random.randn(1, 3))
>>> out2 = p.process(np.random.randn(1, 3))
>>> out1[:, -2:] == out2[:, :2]
array([[ True,  True]], dtype=bool)
-
class
axopy.pipeline.
FeatureExtractor
(features, hooks=None)[source]¶ Computes multiple features from the input, concatenating the results.
Each feature should be able to take in the same data and output a 1D array, so the overall output of the FeatureExtractor can be a single 1D array.
This block isn’t strictly necessary, since you could just apply multiple feature blocks in parallel and the result of each will be passed to the next block. However, the block following feature computation typically expects the input to be a single array (or row) per data sample.
Parameters: features (list) – List of (name, feature) tuples (i.e. implementing a compute
method).-
feature_indices
¶ Dictionary of (start, stop) tuples indicating the bounds of each feature, accessed by name. Will be empty until after data is first passed through.
Type: dict
-
clear
()[source]¶ Clears the output array.
This should be called if the input is going to change form in some way (i.e. the shape of the input array changes).
-
process
(data)[source]¶ Runs data through the list of features and concatenates the results.
The first pass (after a
clear
call) will be a little slow since the extractor needs to allocate the output array.Parameters: data (array, shape (n_channels, n_samples)) – Input data. Must be appropriate for all features. Returns: out Return type: array, shape (n_features,)
-
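A sketch of the extractor with two tiny feature objects (the MAV and RMS wrappers below are hypothetical, written just for illustration; any object with a compatible compute method should work):
import numpy as np

from axopy.pipeline import FeatureExtractor

class MAV(object):
    """Hypothetical feature: mean absolute value per channel."""
    def compute(self, x):
        return np.mean(np.abs(x), axis=-1)

class RMS(object):
    """Hypothetical feature: root mean square per channel."""
    def compute(self, x):
        return np.sqrt(np.mean(x ** 2, axis=-1))

fe = FeatureExtractor([('mav', MAV()), ('rms', RMS())])
out = fe.process(np.random.randn(2, 100))  # 2 channels, 100 samples
# out is 1D with features concatenated, e.g. shape (4,), and
# fe.feature_indices would then be {'mav': (0, 2), 'rms': (2, 4)}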
-
class
axopy.pipeline.
Estimator
(estimator)[source]¶ A pipeline block wrapper around scikit-learn’s idea of an estimator.
An estimator is an object that can be trained with some data (
fit
) and, once trained, can output predictions from novel inputs. A common use-case for this block is to utilize a scikit-learn pipeline in the context of an AxoPy pipeline.Parameters: estimator (object) – An object implementing the scikit-learn Estimator interface (i.e. implementing fit
and predict
methods).
-
class
axopy.pipeline.
Transformer
(transformer, hooks=None)[source]¶ A pipeline block wrapper around scikit-learn’s idea of a transformer.
A transformer is trained with some data (
fit
) and, once trained, can output projections of the input data to some other space. A common example is projecting data in high-dimensional space to a lower-dimensional space using principal components analysis.Parameters: transformer (object) – An object implementing the scikit-learn Transformer interface (i.e. implementing fit
and transform
methods).
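A sketch covering both wrappers with scikit-learn objects. Training happens outside the pipeline; it is assumed here that process forwards to predict (or transform) on the wrapped object:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression

from axopy.pipeline import Estimator, Transformer

X = np.random.randn(20, 3)
y = np.array([0, 1] * 10)

# fit the scikit-learn objects ahead of time
clf = LogisticRegression().fit(X, y)
pca = PCA(n_components=2).fit(X)

Estimator(clf).process(np.random.randn(1, 3))    # assumed to call clf.predict
Transformer(pca).process(np.random.randn(1, 3))  # assumed to call pca.transform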
-
class
axopy.pipeline.
Ensure2D
(orientation='row')[source]¶ Transforms an array to ensure it has 2 dimensions.
Input with shape
(n,)
can be made to have shape(n, 1)
or(1, n)
.Parameters: orientation ({'row', 'col'}, optional) – Orientation of the output. If ‘row’, the output will have shape (1, n)
, meaning the output is a row vector. This is the default behavior, useful when the data is something like samples of a 1-channel signal. If ‘col’, the output will have shape(n, 1)
, meaning the output is a column vector.Examples
Output row data:
>>> import numpy as np
>>> import axopy.pipeline as pipeline
>>> block = pipeline.Ensure2D()
>>> block.process(np.array([1, 2, 3]))
array([[1, 2, 3]])
Output column data:
>>> block = pipeline.Ensure2D(orientation='col')
>>> block.process(np.array([1, 2, 3]))
array([[1],
       [2],
       [3]])
-
axopy.pipeline.
segment
(data, length, overlap=0)[source]¶ Generate segments of an array.
Each segment is of a specified length and optional overlap with the previous segment. Only segments of the specified length are retrieved (if segments don’t fit evenly into the data).
Parameters: - data (ndarray) – Data to segment, with the time axis last (e.g. shape (n_channels, n_samples)). - length (int) – Number of samples per segment. - overlap (int, optional) – Number of overlapping samples in consecutive segments. Default is 0. Yields: segment (array (n_channels, length)) – Segment of the input array.
Examples
Segment a 2-channel recording:
>>> import numpy as np
>>> from axopy.pipeline import segment
>>> x = np.arange(8).reshape(2, 4)
>>> x
array([[0, 1, 2, 3],
       [4, 5, 6, 7]])
>>> seg = segment(x, 2)
>>> next(seg)
array([[0, 1],
       [4, 5]])
>>> next(seg)
array([[2, 3],
       [6, 7]])
Consecutive segments with overlapping samples agree:
>>> seg = segment(x, 3, overlap=2)
>>> next(seg)
array([[0, 1, 2],
       [4, 5, 6]])
>>> next(seg)
array([[1, 2, 3],
       [5, 6, 7]])
-
axopy.pipeline.
segment_indices
(n, length, overlap=0)[source]¶ Generate indices to segment an array.
Each segment is of a specified length with optional overlap with the previous segment. Only segments of the specified length are retrieved (leftover samples that don't fill a complete segment are skipped). The indices returned are meant to be used for slicing, e.g.
data[:, from:to]
.Parameters: - n (int) – Total number of samples to segment. - length (int) – Number of samples per segment. - overlap (int, optional) – Number of overlapping samples in consecutive segments. Default is 0. Yields: - from (int) – Index of the beginning of the segment with respect to the input array.
- to (int) – Index of the end of the segment with respect to the input array.
Examples
Basic usage – segment a 6-sample recording into segments of length 2:
>>> import numpy as np
>>> from axopy.pipeline import segment_indices
>>> list(segment_indices(6, 2))
[(0, 2), (2, 4), (4, 6)]
Overlapping segments:
>>> list(segment_indices(11, 5, overlap=2))
[(0, 5), (3, 8), (6, 11)]
storage¶
Experiment data storage.
There are two main use cases for the functionality in this module: reading/writing data during an experiment session, and reading data once an experiment is complete (i.e. for analysis). See the user guide for information on these use cases.
-
class
axopy.storage.
Storage
(root='data', allow_overwrite=False)[source]¶ Top-level data storage maintainer.
See the user guide for more information.
Parameters: - root (str, optional) – Path to the root of the data storage filestructure. By default, ‘data’ is used. If the directory doesn’t exist, it is created.
- allow_overwrite (bool, optional) – Specifies whether or not the storage interface allows you to overwrite a task’s data for a subject if it already exists.
-
create_task
(task_id)[source]¶ Create a task for the current subject.
Parameters: task_id (str) – The ID of the task to add. The name must not have been used for another task for the current subject. Returns: writer – A new TaskWriter for storing task data. Return type: TaskWriter
-
require_task
(task_id)[source]¶ Retrieves a task for the current subject.
Parameters: task_id (str) – The ID of the task to look for. The task must have already been run with the current subject. Returns: reader – A new TaskReader for working with the existing task data. Return type: TaskReader
-
subject_id
¶ The current subject ID.
When setting the subject ID for a new subject (i.e. one that doesn’t exist already), storage for that subject is created.
-
subject_ids
¶ Generate subject IDs found in storage sorted in alphabetical order.
Returns: subject_id – ID of the subject found. Return type: str
-
task_ids
¶ Generate names of tasks found for the current subject.
Note that there may be no tasks found if the subject_id has not been set or if the subject hasn’t started any tasks. In this case, nothing is yielded.
-
to_zip
(outfile)[source]¶ Create a ZIP archive from a data storage hierarchy.
For more information, see
storage_to_zip()
.
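Putting these methods together, a sketch of a typical session (the subject and task IDs are hypothetical):
from axopy.storage import Storage

storage = Storage(root='data')  # creates ./data if it doesn't exist
storage.subject_id = 'p0'       # creates storage for a new subject

# a task stores its data under data/p0/calibration
writer = storage.create_task('calibration')

# a later task can read that data back
reader = storage.require_task('calibration')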
-
class
axopy.storage.
TaskReader
(root)[source]¶ High-level interface to task storage.
Parameters: root (str) – Path to task’s root directory. This is the directory specific to a task which contains a trials.csv
file and HDF5 array files.-
iterarray
(name)[source]¶ Iteratively retrieve an array for each trial.
Parameters: name (str) – Name of the array type.
-
pickle
(name)[source]¶ Load a pickled object from storage.
Parameters: name (str) – Name of the pickled object (no extension).
-
trials
¶ A Pandas DataFrame representing the trial data.
-
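Continuing the sketch above, reading back task data for analysis (the 'emg' array name and 'model' pickle name are hypothetical):
reader = storage.require_task('calibration')

df = reader.trials                   # trial attributes as a pandas DataFrame
for arr in reader.iterarray('emg'):  # one array per trial
    print(arr.shape)

model = reader.pickle('model')       # load a pickled object by name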
-
class
axopy.storage.
TaskWriter
(root)[source]¶ The main interface for storing data from a task.
Usually you get a
TaskWriter
from Storage
, so you don’t normally need to create one yourself.Parameters: root (str) – Path to the task root (e.g. ‘data/subject_1/taskname’). -
trials
¶ TrialWriter
for storing trial data.Type: TrialWriter
-
pickle
(obj, name)[source]¶ Write a generic object to storage.
This can be useful to persist an object from one task to another, or to store something that doesn’t easily fit into the AxoPy storage model (trial attributes and arrays). Be cautious, however, as pickles are not the best way to store things long-term nor securely. See the advice given here, for example: http://scikit-learn.org/stable/modules/model_persistence.html
Parameters: - obj (object) – The object to serialize and store. - name (str) – Name of the pickle file to write (no extension).
-
write
(trial)[source]¶ Write trial data.
This must be the last thing done for the current trial. That is, make sure all arrays have accumulated all data required. This method flushes trial and array data to files for you.
Important note: The trial’s arrays are cleared after writing.
Parameters: trial (Trial) – Trial data. See
andTrial
for details.
-
-
class
axopy.storage.
TrialWriter
(filepath)[source]¶ Writes trial data to a CSV file line by line.
Parameters: filepath (str) – Path to the file to create.
-
axopy.storage.
makedirs
(path, exist_ok=False)[source]¶ Recursively create directories.
This is needed for Python versions earlier than 3.2, otherwise
os.makedirs(path, exist_ok=True)
would suffice.Parameters: - path (str) – Path of the directory to create. - exist_ok (bool, optional) – If False (default), an error is raised when the directory already exists.
-
axopy.storage.
read_hdf5
(filepath, dataset='data')[source]¶ Read the contents of a dataset.
This function assumes the dataset in the HDF5 file exists at the root of the file (i.e. at ‘/’). It is primarily for internal usage but you may find it useful for quickly grabbing an array from an HDF5 file.
Parameters: - filepath (str) – Path to the HDF5 file to read. - dataset (str, optional) – Name of the dataset to read. Default is 'data'. Returns: data – The data (read into memory) as a NumPy array. The dtype, shape, etc. is all determined by whatever is in the file.
Return type: ndarray
-
axopy.storage.
storage_to_zip
(path, outfile=None)[source]¶ Create a ZIP archive from a data storage hierarchy.
The contents of the data storage hierarchy are all placed in the archive, with the top-level folder in the archive being the data storage root folder itself. That is, all paths within the ZIP file are relative to the dataset root folder.
Parameters: - path (str) – Path to the root of the data storage hierarchy. - outfile (str, optional) – Name of the ZIP file to create. Returns: outfile – The name of the ZIP file created.
Return type: str
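For example, archiving a storage root named data (the output filename is arbitrary):
from axopy.storage import storage_to_zip

storage_to_zip('data', outfile='data.zip')  # returns 'data.zip'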
-
axopy.storage.
write_hdf5
(filepath, data, dataset='data')[source]¶ Write data to an HDF5 file.
The data is written to a new file with a single dataset called “data” in the root group. It is primarily for internal usage but you may find it useful for quickly writing an array to an HDF5 file.
Parameters: - filepath (str) – Path to the file to create. - data (ndarray) – Array of data to write. - dataset (str, optional) – Name of the dataset to create in the file. Default is 'data'.
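A quick round trip with the two HDF5 helpers (the scratch filename is arbitrary):
import numpy as np

from axopy.storage import read_hdf5, write_hdf5

write_hdf5('scratch.hdf5', np.arange(4))
print(read_hdf5('scratch.hdf5'))  # [0 1 2 3]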
task¶
Base task implementation and some generic tasks.
See the user guide for information on implementing tasks.
-
class
axopy.task.
Task
[source]¶ Base class for tasks.
This base class handles iteration through the trials of the task in blocks.
Most task implementations will want to override the prepare and run_trial methods, while the rest can be left to default behavior.
If you need to implement a custom constructor (
__init__
), you must call the base task__init__
:
class CustomTask(Task):
    def __init__(self, custom_param):
        super(CustomTask, self).__init__()
-
advance_block_key
¶ Key for the user to press in order to advance to the next block. Can be set to
None
to disable the feature (next block starts immediately after one finishes).Type: str
-
finished
¶ Emitted when the last trial of the last block has run. This is primarily for the
axopy.experiment.Experiment
to know when the task has finished so it can run the next one. You shouldn’t need to use this transmitter at all.Type: Transmitter
-
connect
(transmitter, receiver)[source]¶ Connect a transmitter to a receiver.
This method helps the task keep track of connections so that all of the manually specified connections can be torn down by the
axopy.experiment.Experiment
.
-
finish
()[source]¶ Clean up at the end of the task.
Override if you need to clean up once the task is completely finished. If you do override this method, you should call the base
Task.finish()
method or call the finished
transmitter yourself.
-
finish_block
()[source]¶ Finishes the block and starts the next one.
Override if you need to do some cleanup between blocks.
-
key_press
(key)[source]¶ Handle key press events.
Override this method to receive key press events. Available keys can be found in
axopy.util
(named key_<keyname>, e.g. key_k).Important note: if relying on the
advance_block_key
to advance the task, make sure to call this super implementation.
-
next_block
()[source]¶ Gets the next block of trials and starts running them.
Before starting the block, a prompt is shown to verify that the user is ready to proceed. If there are no more blocks to run, the finish method is called. You usually do not need to override this method.
-
next_trial
()[source]¶ Gets the next trial in the block and starts running it.
If there are no more trials in the block, the finish_block method is called.
-
prepare_daq
(daqstream)[source]¶ Set up the input device, if applicable.
Parameters: daqstream (DaqStream) – Interface to the data acquisition device.
-
prepare_design
(design)[source]¶ Callback for setting up the task design.
See
axopy.design.Design
for details on how to design the task. By default, nothing is added to the design.Parameters: design (Design) – The task design object you can use to add blocks and trials.
-
prepare_graphics
(container)[source]¶ Initialize graphical elements and messaging connections.
This method should be overridden if the task uses any graphics (which most do). It is important to defer initializing any graphical elements until this method is called so that the graphical backend has a chance to start.
Parameters: container (axopy.gui.Container) – The graphical container you can add objects to.
-
prepare_storage
(storage)[source]¶ Initialize data storage.
Override to read or write task data. An
axopy.storage.Storage
object is given, which can be used to create a new axopy.storage.TaskWriter
for storing new data or an axopy.storage.TaskReader
for reading in existing data. Note that the subject ID has already been set.Parameters: storage (Storage) – The top-level storage object with which new storage can be allocated and existing data can be read.
-
run
()[source]¶ Start running the task.
Simply calls next_block to start running trials in the first block. This method is called automatically if the task is added to an
Experiment
. Tasks that have a block design shouldn’t normally need to override this method. Tasks that are “free-running” for experimenter interaction (e.g. a plot visualization task that the experimenter controls) should override.
-
-
class
axopy.task.
Oscilloscope
(pipeline=None)[source]¶ A visualizer for data acquisition devices.
This task connects to the experiment input DAQ and displays each of its channels on a separate plot. You can optionally pass a
Pipeline
object to preprocess the input data before displaying it.Parameters: pipeline (Pipeline, optional) – Pipeline to run the input data through before displaying it. Often this is some preprocessing like filtering. It is often useful to use a Windower
in the pipeline to display a larger chunk of data than is given on each input update of the DAQ. This gives a “scrolling” view of the input data, which can be helpful for experiment setup (e.g. placing electrodes, making sure the device is recording properly, etc.).-
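As a sketch of that setup (the window length and update size are made-up numbers):
from axopy.pipeline import Pipeline, Windower
from axopy.task import Oscilloscope

# e.g. with a device producing ~100 samples per update, a 5000-sample
# window shows the last several seconds of data scrolling by
scope = Oscilloscope(Pipeline([Windower(5000)]))
# `scope` would then be passed to an Experiment's run() method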
key_press
(key)[source]¶ Handle key press events.
Override this method to receive key press events. Available keys can be found in
axopy.util
(named key_<keyname>, e.g. key_k).Important note: if relying on the
advance_block_key
to advance the task, make sure to call this super implementation.
-
prepare_daq
(daqstream)[source]¶ Set up the input device, if applicable.
Parameters: daqstream (DaqStream) – Interface to the data acquisition device.
-
prepare_graphics
(container)[source]¶ Initialize graphical elements and messaging connections.
This method should be overridden if the task uses any graphics (which most do). It is important to defer initializing any graphical elements until this method is called so that the graphical backend has a chance to start.
Parameters: container (axopy.gui.Container) – The graphical container you can add objects to.
-
run
()[source]¶ Start running the task.
Simply calls next_block to start running trials in the first block. This method is called automatically if the task is added to an
Experiment
. Tasks that have a block design shouldn’t normally need to override this method. Tasks that are “free-running” for experimenter interaction (e.g. a plot visualization task that the experimenter controls) should override.
-
timing¶
Utilities for keeping track of time in a task.
-
class
axopy.timing.
Counter
(max_count=1, reset_on_timeout=True)[source]¶ Counts to a given number then transmits a timeout event.
Parameters: - max_count (int, optional) – Number of counts (increments) until the timeout event is transmitted. Default is 1. - reset_on_timeout (bool, optional) – Whether or not the counter resets when max_count is reached. Default is True. -
timeout
¶ Transmitted when
max_count
has been reached.Type: Transmitter
Examples
Basic usage:
>>> from axopy.timing import Counter
>>> timer = Counter(2)
>>> timer.increment()
>>> timer.count
1
>>> timer.progress
0.5
>>> timer.increment()
>>> timer.count
0
-
increment
()[source]¶ Increment the counter.
If max_count is reached, the
timeout
event is transmitted. If reset_on_timeout has been set to True (default), the timer is also reset.
-
progress
¶ Progress toward timeout, from 0 to 1.
-
-
class
axopy.timing.
Timer
(duration)[source]¶ Real-time one-shot timer.
This is useful in situations where you want to wait for some amount of time and locking the timing to data acquisition updates is not important. For example, inserting a waiting period between trials of a task can be done by connecting the
timeout
transmitter to your task’s next_trial()
method.Parameters: duration (float) – Duration of the timer, in seconds. -
timeout
¶ Transmitted when the timer has finished.
Type: Transmitter
-
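A sketch of that inter-trial wait inside a Task subclass. The hook names and the Timer.start() call are assumptions, since they aren't documented in this excerpt:
from axopy.task import Task
from axopy.timing import Timer

class MyTask(Task):

    def prepare_design(self, design):
        # hypothetical setup: create the timer and wire it up once
        self.timer = Timer(2.0)  # 2-second pause between trials
        # Task.connect is documented above; the Experiment tears it down
        self.connect(self.timer.timeout, self.next_trial)

    def run_trial(self, trial):  # assumption: run_trial receives the trial
        # ... run the trial, then start the wait before the next one
        self.timer.start()  # assumption: Timer exposes start()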
util¶
Development¶
Install¶
Retrieve the source code:
$ git clone git@github.com:axopy/axopy.git
$ cd axopy
A virtual environment is a good way to set up a development environment:
$ python -m venv .venv-dev
$ source .venv-dev/bin/activate
Once in the virtual environment, you can install AxoPy in “development mode” along with the development dependencies:
(.venv-dev) $ pip install -e .[dev]
If you take a look at the setup.py
file, you’ll see that this installs
everything from the requirements.txt
file as well as the
requirements-dev.txt
file. This should be everything needed to run the
tests and build the documentation.
The Python Packaging Authority has much more detailed instructions here: https://packaging.python.org/guides/installing-using-pip-and-virtualenv/
Document¶
To build the documentation locally, you can activate your dev environment,
cd
into the docs/
directory, and run one of the build rules, like:
(.venv-dev) $ cd docs/
(.venv-dev) $ make html
If you aren’t able to use make
, you could run the sphinx commands manually.
Look in the docs/Makefile
to be sure, but it should be something like:
(.venv-dev) $ sphinx-build -b html . _build/html
Once the build completes, you can open _build/html/index.html
with your
browser to check the output.
Release¶
This section is relevant only if you’re an AxoPy maintainer. If you’re just interested in contributing to AxoPy, you can stop here.
PyPI¶
To cut a release, you’ll need the wheel and twine packages (these are not included in the dev requirements which are for every-day development and CI).
Start by bumping the version number in the axopy.version
module, then build
the source and wheel distributions:
(.venv-dev) $ python setup.py sdist bdist_wheel
Optional: If you want to check first that all is well before pushing to PyPI, you can upload the release packages to the test PyPI server first:
(.venv-dev) $ twine upload --repository-url https://test.pypi.org/legacy dist/*
Now you can use twine to upload the release to PyPI. Note that you should
either remove everything from dist/
first (if just using the command below)
or specify which files to upload:
(.venv-dev) $ twine upload dist/*
Once everything looks good, you can tag the version bump commit and push the tag up to GitHub.
conda-forge¶
After releasing on PyPI, you can update the release on conda-forge. Check their docs for insight into their process, but the following is sufficient now that the infrastructure is in place.
Start by forking the axopy-feedstock repo on GitHub. Edit the
recipe/meta.yaml
file so its version string matches the PyPI version and
copy the SHA256 hash for the source dist (sdist) package (the tar.gz
file)
from PyPI and paste it into the line below that. Commit the changes to your
fork then make a pull request against the conda-forge repository. If you’re
a maintainer, you have push access to the repository so once CI passes, go
ahead and merge. The rest is automated.