"""Utilities for keeping track of time in a task."""
from __future__ import division
from PyQt5 import QtCore
from axopy.messaging import Transmitter, TransmitterBase
[docs]class Counter(TransmitterBase):
"""Counts to a given number then transmits a timeout event.
Parameters
----------
max_count : int
Number of iterations to go through before transmitting the `timeout`
event. Must be greater than 1.
reset_on_timeout : bool, optional
Specifies whether or not the timer should reset its count back to zero
once the timeout event occurs. The default behavior is to reset.
Attributes
----------
count : int
Current count.
timeout : Transmitter
Transmitted when ``max_count`` has been reached.
Examples
--------
Basic usage:
>>> from axopy.timing import Counter
>>> timer = Counter(2)
>>> timer.increment()
>>> timer.count
1
>>> timer.progress
0.5
>>> timer.increment()
>>> timer.count
0
"""
timeout = Transmitter()
def __init__(self, max_count=1, reset_on_timeout=True):
super(Counter, self).__init__()
max_count = int(max_count)
if max_count < 1:
raise ValueError('max_count must be > 1')
self.reset_on_timeout = reset_on_timeout
self.max_count = max_count
self.count = 0
@property
def progress(self):
"""Progress toward timeout, from 0 to 1."""
return self.count / self.max_count
[docs] def increment(self):
"""Increment the counter.
If `max_count` is reached, the ``timeout`` event is transmitted. If
`reset_on_timeout` has been set to True (default), the timer is also
reset.
"""
self.count += 1
if self.count == self.max_count:
if self.reset_on_timeout:
self.reset()
self.timeout.emit()
[docs] def reset(self):
"""Resets the count to 0 to start over."""
self.count = 0
[docs]class Timer(TransmitterBase):
"""Real-time one-shot timer.
This is useful in situations where you want to wait for some amount of time
and locking the timing to data acquisition updates is not important. For
example, inserting a waiting period between trials of a task can be done by
connecting the ``timeout`` transmitter to your task's
:meth:`~axopy.task.Task.next_trial` method.
Parameters
----------
duration : float
Duration of the timer, in seconds.
Attributes
----------
timeout : Transmitter
Transmitted when the timer has finished.
"""
timeout = Transmitter()
def __init__(self, duration):
super(Timer, self).__init__()
self.duration = duration
self._qtimer = QtCore.QTimer()
self._qtimer.setInterval(int(1000*self.duration))
self._qtimer.setSingleShot(True)
self._qtimer.timeout.connect(self.timeout)
[docs] def start(self):
"""Start the timer."""
self._qtimer.start()
[docs] def stop(self):
"""Stop the timer.
If you stop the timer early, the timeout event won't be transmitted.
"""
self._qtimer.stop()