"""Protocol and threaded interface for data acquisition."""
import time
import numpy
from PyQt5 import QtCore
from axopy.messaging import Transmitter
from axopy.gui.main import get_qtapp, qt_key_map
from axopy.pipeline import Filter
class DaqStream(QtCore.QThread):
    """Asynchronous interface to an input device.

    Runs a persistent loop in a separate thread wherein the device is
    repeatedly polled for data. When the data becomes available, it is emitted
    and the loop continues.

    There are effectively two methods of this class: start and stop. These
    methods do as their names suggest -- they start and stop the underlying
    device from sampling new data.

    The device used to create the DaqStream is also accessible via the
    ``device`` attribute so you can change settings on the underlying device
    any time (e.g. sampling rate, number of samples per update, etc.).

    Parameters
    ----------
    device : daq
        Any object implementing the AxoPy data acquisition interface. See
        :class:`NoiseGenerator` for an example.

    Attributes
    ----------
    updated : Transmitter
        Transmitted when the latest chunk of data is available. The data type
        depends on the underlying input device, but it is often a numpy
        ndarray.
    disconnected : Transmitter
        Transmitted if the device cannot be read from (it has disconnected
        somehow).
    finished : Transmitter
        Transmitted when the device has stopped and sampling is finished.
    """

    updated = Transmitter(object)
    disconnected = Transmitter()
    finished = Transmitter()

    def __init__(self, device):
        super(DaqStream, self).__init__()
        self.device = device
        self._running = False

    @property
    def running(self):
        """Boolean value indicating whether or not the stream is running."""
        return self._running

    def start(self):
        """Start the device and begin reading from it."""
        # Raise the flag *before* the thread is launched. If it were set
        # inside run(), a stop() issued right after start() could be
        # overwritten by the worker thread, leaving the loop running forever
        # and blocking stop(wait=True).
        self._running = True
        super(DaqStream, self).start()

    def run(self):
        """Implementation for the underlying QThread.

        Starts the device, then reads from it and emits ``updated`` for each
        chunk until :meth:`stop` is called. If a read raises ``IOError``,
        ``disconnected`` is emitted and the thread exits without stopping the
        device or emitting ``finished``.

        Don't call this method directly -- use :meth:`start` instead.
        """
        self.device.start()

        while self._running:
            try:
                data = self.device.read()
            except IOError:
                # The stream is no longer running once the device is lost;
                # keep the ``running`` property truthful.
                self._running = False
                self.disconnected.emit()
                return

            # stop() may have been requested while read() was blocking; in
            # that case the chunk is dropped rather than emitted.
            if self._running:
                self.updated.emit(data)

        self.device.stop()
        self.finished.emit()

    def stop(self, wait=True):
        """Stop the stream.

        Parameters
        ----------
        wait : bool, optional
            Whether or not to wait for the underlying device to stop before
            returning.
        """
        self._running = False
        if wait:
            self.wait()
class NoiseGenerator(object):
    """An emulated data acquisition device which generates random data.

    Each sample of the generated data is sampled from a zero-mean Gaussian
    distribution with variance determined by the amplitude specified, which
    corresponds to three standard deviations. That is, approximately 99.7% of
    the samples should be within the desired peak amplitude.

    :class:`NoiseGenerator` is meant to emulate data acquisition devices that
    block on each request for data until the data is available. See
    :meth:`read` for details.

    The ``rate``, ``num_channels``, ``amplitude`` and ``read_size`` attributes
    may be changed after construction; the new values take effect on the next
    :meth:`read` call.

    Parameters
    ----------
    rate : int, optional
        Sample rate in Hz. Default is 1000.
    num_channels : int, optional
        Number of "channels" to generate. Default is 1.
    amplitude : float, optional
        Approximate peak amplitude of the signal to generate. Specifically, the
        amplitude represents three standard deviations for generating the
        Gaussian distributed data. Default is 1.
    read_size : int, optional
        Number of samples to generate per :meth:`read()` call. Default is 100.
    """

    def __init__(self, rate=1000, num_channels=1, amplitude=1.0,
                 read_size=100):
        self.rate = rate
        self.num_channels = num_channels
        self.amplitude = amplitude
        self.read_size = read_size

        # 3.0 (not 3) so an integer amplitude never triggers integer division.
        self._sigma = amplitude / 3.0
        self.sleeper = _Sleeper(self._read_period())

    def _read_period(self):
        """Return the duration, in seconds, of one ``read_size`` chunk."""
        # Convert *before* dividing: float(a / b) is too late if a / b has
        # already been truncated by integer division.
        return float(self.read_size) / self.rate

    def start(self):
        """Does nothing for this device. Implemented to follow device API."""
        pass

    def read(self):
        """
        Generates zero-mean Gaussian data.

        This method blocks (calls ``time.sleep()``) to emulate other data
        acquisition units which wait for the requested number of samples to be
        read. The amount of time to block is calculated such that consecutive
        calls will always return with constant frequency, assuming the calls
        occur faster than required (i.e. processing doesn't fall behind).

        Returns
        -------
        data : ndarray, shape (num_channels, read_size)
            The generated data.
        """
        # Re-derive the dependent quantities on every call so that changes to
        # the public settings are honoured consistently. Otherwise a new
        # ``read_size`` would alter the returned shape but not the blocking
        # period, silently changing the effective sample rate.
        self._sigma = self.amplitude / 3.0
        self.sleeper.read_time = self._read_period()

        self.sleeper.sleep()
        return self._sigma * numpy.random.randn(self.num_channels,
                                                self.read_size)

    def stop(self):
        """Does nothing for this device. Implemented to follow device API."""
        pass

    def reset(self):
        """Reset the device back to its initialized state."""
        self.sleeper.reset()
class Keyboard(QtCore.QObject):
    """Keyboard input device.

    The device periodically samples (at the given rate) whether each watched
    key has been pressed since the previous sample. Each read yields a numpy
    array of shape ``(n_keys, 1)`` holding a 1 for every key pressed in that
    interval and a 0 otherwise.

    Parameters
    ----------
    rate : int, optional
        Sampling rate, in Hz.
    keys : container of str, optional
        Keys to watch and use as input signals. The keys used here should not
        conflict with the key used by the ``Experiment`` to start the next
        task. Defaults to ``w``, ``a``, ``s`` and ``d``.

    Notes
    -----
    There are a couple of reasonable alternatives to this design. One would be
    to sample the key *states* at a given rate and produce segments of key
    state data, much like a regular data acquisition device. The difficulty is
    that the physical key state doesn't seem to be feasible to obtain with Qt:
    key press and key release events are available, but they are subject to
    repeat delay and repeat rate.

    Another option would be a device driven by the key press events themselves
    instead of by a sampling event. Qt supports event-based keyboard handling,
    but the sampled approach used here fits the input device model, which
    makes this device easily swappable with other input devices.
    """

    def __init__(self, rate=10, keys=None):
        super(Keyboard, self).__init__()
        self.rate = rate

        self.keys = list('wasd') if keys is None else keys
        # Qt key codes, in the same order as ``self.keys``.
        self._qkeys = [qt_key_map[name] for name in self.keys]

        self._sleeper = _Sleeper(1.0/rate)
        # One row per watched key; set to 1 by eventFilter, cleared by read.
        self._data = numpy.zeros((len(self.keys), 1))

    def start(self):
        """Start the keyboard input device."""
        # Key events reach this object through an application-wide filter.
        get_qtapp().installEventFilter(self)

    def read(self):
        """Read which keys have just been pressed.

        Blocks until the next sampling instant, then returns the presses
        recorded since the previous read and clears them.

        Returns
        -------
        data : ndarray, shape (n_keys, 1)
            Array with a 1 indicating the corresponding key has been pressed
            and a 0 indicating it has not.
        """
        self._sleeper.sleep()
        pressed = self._data.copy()
        # Clear in place so the buffer object written by eventFilter is kept.
        self._data.fill(0)
        return pressed

    def stop(self):
        """Stop the keyboard input device.

        You may need to stop the device in case you want to be able to use the
        keys watched by the device for another purpose.
        """
        # Without the filter, the watched keys propagate normally again.
        get_qtapp().removeEventFilter(self)

    def reset(self):
        """Reset the input device."""
        self._sleeper.reset()

    def eventFilter(self, obj, event):
        """Record presses of watched keys and consume those events.

        Returns True (event handled) for a key press of a watched key and
        False for every other event.
        """
        if event.type() != QtCore.QEvent.KeyPress:
            return False

        key = event.key()
        if key not in self._qkeys:
            return False

        self._data[self._qkeys.index(key)] = 1
        return True
class Mouse(QtCore.QObject):
    """Mouse input device.

    The device periodically samples (at the given rate) the mouse position
    within the AxoPy experiment window. Each read yields a numpy array of
    shape ``(2, 1)`` containing either the change in position since the last
    sample (default) or the absolute position in the window.

    Parameters
    ----------
    rate : int, optional
        Sampling rate, in Hz.
    position : bool, optional
        Whether or not to return the mouse's position (instead of the position
        difference from the previous sample).

    Notes
    -----
    In Qt's coordinate system, the positive y direction is *downward*. Here,
    this is inverted as a convenience (upward movement of the mouse produces a
    positive "velocity").

    Mouse events are intercepted here but they are not *consumed*, meaning you
    can still use the mouse to manipulate widgets in the experiment window.
    """

    def __init__(self, rate=10, position=False):
        super(Mouse, self).__init__()
        self.rate = rate
        self._sleeper = _Sleeper(1.0/rate)

        # Filter coefficients: 1 for position output, (1, -1) for the
        # sample-to-sample difference described in the class docstring.
        coefficients = 1 if position else (1, -1)
        self._filter = Filter(coefficients)

        # Creates the position buffer and clears filter/sleeper state.
        self.reset()

    def start(self):
        """Start sampling mouse movements."""
        get_qtapp().installEventFilter(self)

    def read(self):
        """Read the last-updated mouse position.

        Blocks until the next sampling instant, then passes a copy of the most
        recently recorded position through the internal filter.

        Returns
        -------
        data : ndarray, shape (2, 1)
            The mouse "velocity" or position (x, y).
        """
        self._sleeper.sleep()
        snapshot = self._data.copy()
        return self._filter.process(snapshot)

    def stop(self):
        """Stop sampling mouse movements."""
        get_qtapp().removeEventFilter(self)

    def reset(self):
        """Clear the input device."""
        self._data = numpy.zeros((2, 1), dtype=float)
        self._filter.clear()
        self._sleeper.reset()

    def eventFilter(self, obj, event):
        """Record the position of mouse-move events without consuming them."""
        if event.type() == QtCore.QEvent.MouseMove:
            # y is negated so that upward movement is positive.
            self._data[:, 0] = (event.x(), -event.y())
        # Always let the event continue to its normal target.
        return False
class _Sleeper(object):
    """Paces repeated calls so that they return at a fixed period.

    Each call to :meth:`sleep` blocks for whatever remains of ``read_time``
    seconds since the previous call returned. If more than ``read_time`` has
    already elapsed, the call returns without blocking.

    Parameters
    ----------
    read_time : float
        Desired period between returns of :meth:`sleep`, in seconds.

    Attributes
    ----------
    read_time : float
        The pacing period, in seconds. May be changed between calls.
    last_read_time : float or None
        Clock reading taken when :meth:`sleep` last returned, or None if
        :meth:`sleep` has not been called since construction or the last
        :meth:`reset`.
    """

    # Intervals must be measured with a clock that cannot jump: a wall-clock
    # adjustment (e.g. an NTP step) would otherwise make sleep() block far too
    # long or not at all. Fall back to time.time() only where a monotonic
    # clock is unavailable.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, read_time):
        self.read_time = read_time
        self.last_read_time = None

    def sleep(self):
        """Block until ``read_time`` seconds after the previous return."""
        if self.last_read_time is None:
            # First call: there is no reference point, wait one full period.
            time.sleep(self.read_time)
        else:
            remaining = self.read_time - (self._clock() - self.last_read_time)
            # A negative value means the caller is not meeting the real-time
            # requirement, so don't wait at all.
            if remaining >= 0:
                time.sleep(remaining)

        self.last_read_time = self._clock()

    def reset(self):
        """Forget the previous call so the next sleep waits a full period."""
        self.last_read_time = None