# pipeline

class axopy.pipeline.Block(name=None, hooks=None)[source]

Base class for all blocks.

Notes

Blocks should take their parameters in __init__ and provide at least the process method for taking in data and returning some result.

clear()[source]

Clear the state of the block.

Some blocks don’t keep stateful attributes, so clear does nothing by default.

process(data)[source]

Process input data and produce a result.

Subclasses must implement this method; a class that doesn't is not a useful Block.
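The contract can be sketched with a minimal stand-in (hypothetical, for illustration only; the real base class is axopy.pipeline.Block, which also supports hooks):

```python
# Minimal stand-in for the Block contract (hypothetical; not the
# axopy implementation).
class Block:
    def __init__(self, name=None):
        self.name = name or type(self).__name__

    def process(self, data):
        raise NotImplementedError

    def clear(self):
        pass  # stateless blocks have nothing to reset

# Example block: parameters go to __init__, work happens in process.
class Scaler(Block):
    """Multiplies input by a fixed gain."""
    def __init__(self, gain):
        super().__init__()
        self.gain = gain

    def process(self, data):
        return self.gain * data

print(Scaler(2).process(3))  # 6
```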

class axopy.pipeline.Pipeline(blocks, name=None)[source]

Feedforward arrangement of blocks for processing data.

A Pipeline contains a set of Block objects which operate on data to produce a final output.

To create a pipeline, two rules apply: blocks in a list are processed in series, and blocks in a tuple are processed in parallel.

Blocks that are arranged to take multiple inputs should expect to take the corresponding number of inputs in the order they are given. It is up to the user constructing the pipeline to make sure that the arrangement of blocks makes sense.
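The list/tuple rules can be sketched with a tiny recursive evaluator (a hypothetical illustration, not axopy's implementation), using plain functions in place of blocks:

```python
# Hypothetical sketch of the structure rules: lists run in series,
# tuples in parallel. Plain functions stand in for blocks.
def run(structure, data):
    if isinstance(structure, list):      # series: feed output forward
        for item in structure:
            data = run(item, data)
        return data
    if isinstance(structure, tuple):     # parallel: same input to each
        return tuple(run(item, data) for item in structure)
    return structure(data)               # a "block": here, a function

double = lambda x: 2 * x
inc = lambda x: x + 1

# Series, then a parallel split: 3 -> 6 -> (12, 7)
print(run([double, (double, inc)], 3))  # (12, 7)
```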

Parameters: blocks (container) – The blocks in the pipeline, with lists processed in series and tuples processed in parallel.
named_blocks

Dictionary of blocks in the pipeline. Keys are the names given to the blocks in the pipeline and values are the block objects.

Type: dict
clear()[source]

Calls the clear method on each block in the pipeline. The effect depends on the blocks themselves.

process(data)[source]

Calls the process method of each block in the pipeline, passing the outputs around as specified in the block structure.

Parameters: data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it.

Returns: out (object) – The data output by the process method of the last block(s) in the pipeline.
class axopy.pipeline.Passthrough(blocks, expand_output=True, name=None)[source]

Convenience block for passing input along to output.

A passthrough pipeline block is useful when you want to process some data then provide both the processed output as well as the original input to another block downstream:

    x ---+-----------------------> x
         |
         +---> [ subpipeline ] --> y
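Ignoring the expand_output option, the behaviour can be sketched as a plain function (a hypothetical illustration, not axopy's implementation):

```python
# Sketch of a passthrough: the original input is forwarded alongside
# the subpipeline's output (hypothetical stand-in, ignoring the
# expand_output option).
def passthrough(subpipeline, x):
    return x, subpipeline(x)

print(passthrough(lambda v: 2 * v, 5))  # (5, 10)
```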

process(data)[source]

Calls the process method of each block in the pipeline, passing the outputs around as specified in the block structure.

Parameters: data (object) – The input to the first block(s) in the pipeline. The type/format doesn’t matter, as long as the blocks you define accept it.

Returns: out (object) – The data output by the process method of the last block(s) in the pipeline.
class axopy.pipeline.Callable(func, func_args=None, func_kwargs=None, name=None, hooks=None)[source]

A Block that does not require persistent attributes.

Some Block implementations don’t require attributes to update on successive calls to the process method, but instead are essentially a function that can be called repeatedly. This class is for conveniently creating such a block.

If the function you want to use takes additional arguments, such as a keyword argument that configures its behavior, pass them via func_args and func_kwargs.

Note: if you use an anonymous function as the func argument, (e.g. lambda x: 2*x), it is recommended to explicitly give the block a meaningful name.

Parameters:
func (callable(x)) – Function that gets called when the block’s process method is called. Should take a single input and return output that is compatible with whatever is connected to the block.
func_args (list, optional) – List (or tuple) of additional positional arguments to pass to func when calling it for processing. If None (default), no extra arguments are used.
func_kwargs (dict, optional) – Keyword argument name/value pairs to pass to func when calling it for processing. If None (default), no keyword arguments are used.
name (str, optional, default=None) – Name of the block. By default, the name of the processor function is used.
hooks (list, optional, default=None) – List of callables (callbacks) to run after the block’s process method is called.
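What the block does with func, func_args, and func_kwargs can be sketched as a closure (a hypothetical re-implementation for illustration, not axopy's own code):

```python
import numpy as np

# Hypothetical sketch of Callable's behaviour: bind extra arguments
# once, then call func on every process() invocation.
def make_callable_block(func, func_args=None, func_kwargs=None):
    args = func_args or ()
    kwargs = func_kwargs or {}

    def process(data):
        return func(data, *args, **kwargs)

    return process

# Wrap np.mean so each call averages over the last axis.
mean_block = make_callable_block(np.mean, func_kwargs={'axis': -1})
print(mean_block(np.array([[1., 2.], [3., 4.]])))  # [1.5 3.5]
```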
process(data)[source]

Process input data and produce a result.

Subclasses must implement this method; a class that doesn't is not a useful Block.

class axopy.pipeline.Windower(length)[source]

Windows incoming data to a specific length.

Takes new input data and combines it with past data to maintain a sliding window with optional overlap. The window length is specified directly, so the overlap depends on the length of the input.

The input length may change on each iteration, but the Windower must be cleared before the number of channels can change.

Parameters: length (int) – Total number of samples to output on each iteration. This must be at least as large as the number of samples input to the windower on each iteration.

See also

axopy.pipeline.common.Ensure2D – Ensure input to the windower is 2D.

Examples

Basic use of a windower:

>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> win = pipeline.Windower(4)
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])
>>> win.process(np.array([[7, 8], [5, 6]]))
array([[ 1.,  2.,  7.,  8.],
       [ 3.,  4.,  5.,  6.]])
>>> win.clear()
>>> win.process(np.array([[1, 2], [3, 4]]))
array([[ 0.,  0.,  1.,  2.],
       [ 0.,  0.,  3.,  4.]])


If your data is 1-dimensional (shape (n_samples,)), use an Ensure2D block in front of the Windower:

>>> win = pipeline.Windower(4)
>>> p = pipeline.Pipeline([pipeline.Ensure2D(), win])
>>> p.process(np.array([1, 2]))
array([[ 0.,  0.,  1.,  2.]])

clear()[source]

Clear the buffer containing previous input data.

process(data)[source]

Add new data to the end of the window.

Parameters: data (array, shape (n_channels, n_samples)) – Input data. n_samples must be less than or equal to the windower length.

Returns: out (array, shape (n_channels, length)) – Output window with the input data at the end.
class axopy.pipeline.Centerer(name=None, hooks=None)[source]

Centers data by subtracting out its mean.

$\tilde{x} = x - \frac{1}{N}\sum_{i=0}^{N-1} x[i]$
process(data)[source]

Center each row of the input.

Parameters: data (array, shape (n_channels, n_samples)) – Input data.

Returns: out (array, shape (n_channels, n_samples)) – Input data that’s been centered.
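The operation can be reproduced with plain NumPy (a sketch of the math above, not axopy's implementation):

```python
import numpy as np

# Center each row by subtracting its mean (sketch of the Centerer math).
x = np.array([[1., 2., 3.], [10., 20., 30.]])
centered = x - x.mean(axis=-1, keepdims=True)
print(centered)  # each row now has zero mean
```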
class axopy.pipeline.Filter(b, a=1, overlap=0)[source]

Filters incoming data with a time domain filter.

This filter implementation takes filter coefficients that are designed by the user – it merely applies the filter to the input, remembering the final inputs/outputs from the previous update and using them as initial conditions for the current update.

Parameters:
b (ndarray) – Numerator polynomial coefficients of the filter.
a (ndarray, optional) – Denominator polynomial coefficients of the filter. Default is 1, meaning the filter is FIR.
overlap (int, optional) – Number of samples overlapping in consecutive inputs. Needed for correct filter initial conditions in each filtering operation. Default is 0, meaning the final inputs/outputs of the previous update are used.

See also

axopy.pipeline.common.Ensure2D – Ensure input to the filter is 2D.

Examples

Design a filter using scipy and use the coefficients:

>>> import axopy.pipeline as pipeline
>>> import numpy as np
>>> from scipy.signal import butter
>>> b, a = butter(4, 100/1000/2)
>>> f = pipeline.Filter(b, a)
>>> f.process(np.random.randn(1, 5)) # doctest: +ELLIPSIS
array([...


Use a filter in combination with a Windower, making sure to account for overlapping data in consecutive filtering operations. Here, we’ll use a window of length 5 and pass in 3 samples at a time, so there will be an overlap of 2 samples. The overlapping samples in each output will agree:

>>> w = pipeline.Windower(5)
>>> f = pipeline.Filter(b, a, overlap=2)
>>> p = pipeline.Pipeline([w, f])
>>> out1 = p.process(np.random.randn(1, 3))
>>> out2 = p.process(np.random.randn(1, 3))
>>> out1[:, -2:] == out2[:, :2]
array([[ True,  True]], dtype=bool)

clear()[source]

Clears the filter initial conditions.

Clearing the initial conditions is important when starting a new recording if overlap is nonzero.

process(data)[source]

Applies the filter to the input.

Parameters: data (ndarray, shape (n_channels, n_samples)) – Input signals.
class axopy.pipeline.FeatureExtractor(features, hooks=None)[source]

Computes multiple features from the input, concatenating the results.

Each feature should be able to take in the same data and output a 1D array, so the overall output of the FeatureExtractor can be a single 1D array.

This block isn’t strictly necessary, since you could just apply multiple feature blocks in parallel and the result of each will be passed to the next block. However, the block following feature computation typically expects the input to be a single array (or row) per data sample.

Parameters: features (list) – List of (name, feature) tuples, where each feature implements a compute method.
named_features

Dictionary of features accessed by name.

Type: dict
feature_indices

Dictionary of (start, stop) tuples indicating the bounds of each feature, accessed by name. Will be empty until after data is first passed through.

Type: dict
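The compute interface and the concatenation step can be sketched with hypothetical feature objects (MeanAbs and RMS here are illustrations, not axopy built-ins):

```python
import numpy as np

# Hypothetical feature objects implementing the compute() interface
# the FeatureExtractor expects.
class MeanAbs:
    def compute(self, data):
        return np.mean(np.abs(data), axis=-1)

class RMS:
    def compute(self, data):
        return np.sqrt(np.mean(data ** 2, axis=-1))

# Sketch of the concatenation the extractor performs on a
# single-channel window of 4 samples.
data = np.array([[1., -1., 2., -2.]])
features = [('mav', MeanAbs()), ('rms', RMS())]
out = np.hstack([f.compute(data) for _, f in features])
print(out)  # [mav, rms] for the single channel
```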
clear()[source]

Clears the output array.

This should be called if the input is going to change form in some way (e.g. the shape of the input array changes).

process(data)[source]

Runs data through the list of features and concatenates the results.

The first pass (after a clear call) will be a little slow since the extractor needs to allocate the output array.

Parameters: data (array, shape (n_channels, n_samples)) – Input data. Must be appropriate for all features.

Returns: out (array, shape (n_features,)) – Concatenated feature outputs.
class axopy.pipeline.Estimator(estimator, return_proba=False, return_log_proba=False)[source]

A pipeline block wrapper around scikit-learn’s idea of an estimator.

An estimator is an object that can be trained with some data (fit) and, once trained, can output predictions from novel inputs. A common use-case for this block is to utilize a scikit-learn pipeline in the context of an axopy pipeline.

Parameters:
estimator (object) – An object implementing the scikit-learn estimator interface (i.e. implementing fit and predict methods).
return_proba (bool, optional (default: False)) – If True, use the estimator’s predict_proba method instead of predict to return probability estimates.
return_log_proba (bool, optional (default: False)) – If True, use the estimator’s predict_log_proba method instead of predict to return log-probability estimates.
process(data)[source]

Calls the estimator’s predict or predict_proba method and returns the result.
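Any object with fit and predict works; a minimal hand-rolled stand-in (hypothetical, not a scikit-learn model) illustrates the interface the block expects:

```python
import numpy as np

# Hypothetical estimator implementing the scikit-learn interface
# (fit/predict) that Estimator wraps; not a real scikit-learn model.
class NearestMeanClassifier:
    def fit(self, X, y):
        self.labels_ = np.unique(y)
        self.means_ = np.array([X[y == c].mean(axis=0)
                                for c in self.labels_])
        return self

    def predict(self, X):
        # Distance from each sample to each class mean.
        d = np.linalg.norm(X[:, None, :] - self.means_[None, :, :], axis=2)
        return self.labels_[np.argmin(d, axis=1)]

clf = NearestMeanClassifier().fit(np.array([[0., 0.], [1., 1.]]),
                                  np.array([0, 1]))
print(clf.predict(np.array([[0.1, 0.0]])))  # [0]
```

Once fit, such an object can be passed to pipeline.Estimator and used inside a pipeline.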

class axopy.pipeline.Transformer(transformer, inverse=False, hooks=None)[source]

A pipeline block wrapper around scikit-learn’s idea of a transformer.

A transformer is trained with some data (fit) and, once trained, can output projections of the input data to some other space. A common example is projecting data in high-dimensional space to a lower-dimensional space using principal components analysis.

Parameters:
transformer (object) – An object implementing the scikit-learn transformer interface (i.e. implementing fit, transform and inverse_transform methods).
inverse (bool, optional (default: False)) – If True, call inverse_transform instead of transform.
process(data)[source]

Calls the transformer’s transform or inverse_transform method and returns the result.
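A minimal stand-in transformer (hypothetical, not scikit-learn) illustrates the fit/transform/inverse_transform interface the block expects:

```python
import numpy as np

# Hypothetical transformer with the interface Transformer wraps.
class Standardizer:
    def fit(self, X):
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        return self

    def transform(self, X):
        return (X - self.mean_) / self.std_

    def inverse_transform(self, Z):
        return Z * self.std_ + self.mean_

X = np.array([[1., 2.], [3., 6.]])
t = Standardizer().fit(X)
Z = t.transform(X)
print(t.inverse_transform(Z))  # recovers X
```

Such an object can be wrapped as pipeline.Transformer(t), or with inverse=True to run the inverse projection.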

class axopy.pipeline.Ensure2D(orientation='row')[source]

Transforms an array to ensure it has 2 dimensions.

Input with shape (n,) can be made to have shape (n, 1) or (1, n).

Parameters: orientation ({'row', 'col'}, optional) – Orientation of the output. If ‘row’, the output will have shape (1, n), meaning the output is a row vector. This is the default behavior, useful when the data is something like samples of a 1-channel signal. If ‘col’, the output will have shape (n, 1), meaning the output is a column vector.

Examples

Output row data:

>>> import numpy as np
>>> import axopy.pipeline as pipeline
>>> block = pipeline.Ensure2D()
>>> block.process(np.array([1, 2, 3]))
array([[1, 2, 3]])


Output column data:

>>> block = pipeline.Ensure2D(orientation='col')
>>> block.process(np.array([1, 2, 3]))
array([[1],
       [2],
       [3]])

process(data)[source]

Make sure data is 2-dimensional.

If the input already has two dimensions, it is unaffected.

Parameters: data (array, shape (n,)) – Input data.

Returns: out (array, shape (1, n) or (n, 1)) – Output data, with shape specified by orientation.
axopy.pipeline.segment(data, length, overlap=0)[source]

Generate segments of an array.

Each segment is of a specified length, with optional overlap with the previous segment. Only full-length segments are yielded; leftover samples at the end of the data that don’t fill a complete segment are discarded.

Parameters:
data (array, shape (n_channels, n_samples)) – Data to segment.
length (int) – Number of samples to retrieve in each chunk.
overlap (int, optional) – Number of overlapping samples in consecutive chunks.

Yields:
segment (array, shape (n_channels, length)) – Segment of the input array.

Examples

Segment a 2-channel recording:

>>> import numpy as np
>>> from axopy.pipeline import segment
>>> x = np.arange(8).reshape(2, 4)
>>> x
array([[0, 1, 2, 3],
       [4, 5, 6, 7]])
>>> seg = segment(x, 2)
>>> next(seg)
array([[0, 1],
       [4, 5]])
>>> next(seg)
array([[2, 3],
       [6, 7]])


Consecutive segments with overlapping samples agree:

>>> seg = segment(x, 3, overlap=2)
>>> next(seg)
array([[0, 1, 2],
       [4, 5, 6]])
>>> next(seg)
array([[1, 2, 3],
       [5, 6, 7]])

axopy.pipeline.segment_indices(n, length, overlap=0)[source]

Generate indices to segment an array.

Each segment is of a specified length, with optional overlap with the previous segment. Only full-length segments are produced; leftover samples that don’t fill a complete segment are skipped. The indices returned are meant to be used for slicing, e.g. data[:, from:to].

Parameters:
n (int) – Total number of samples to segment.
length (int) – Length of each segment.
overlap (int, optional) – Number of overlapping samples in consecutive segments.

Yields:
from (int) – Index of the beginning of the segment with respect to the input array.
to (int) – Index of the end of the segment with respect to the input array.

Examples

Basic usage – segment a 6-sample recording into segments of length 2:

>>> import numpy as np
>>> from axopy.pipeline import segment_indices
>>> list(segment_indices(6, 2))
[(0, 2), (2, 4), (4, 6)]


Overlapping segments:

>>> list(segment_indices(11, 5, overlap=2))
[(0, 5), (3, 8), (6, 11)]
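The index arithmetic can be sketched directly: segment starts are spaced by length - overlap (a hypothetical re-implementation for illustration, not axopy's code):

```python
# Sketch of segment_indices: starts advance by (length - overlap),
# and only full-length segments are yielded.
def seg_idx(n, length, overlap=0):
    step = length - overlap
    for start in range(0, n - length + 1, step):
        yield start, start + length

print(list(seg_idx(6, 2)))             # [(0, 2), (2, 4), (4, 6)]
print(list(seg_idx(11, 5, overlap=2)))  # [(0, 5), (3, 8), (6, 11)]
```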