Source code for stonesoup.feeder.multi

import heapq
from abc import abstractmethod
from collections.abc import Collection
from functools import cached_property
from queue import Empty, Full, Queue, LifoQueue, PriorityQueue
from threading import Thread

from .base import DetectionFeeder, GroundTruthFeeder
from ..base import Property
from ..buffered_generator import BufferedGenerator
from ..reader import Reader


[docs] class MultiDataFeeder(DetectionFeeder, GroundTruthFeeder): """Multi-data Feeder This returns states from multiple data readers as a single stream, yielding from the reader yielding the lowest timestamp first. """ reader = None readers: Collection[Reader] = Property(doc='Readers to yield from') @BufferedGenerator.generator_method def data_gen(self): yield from heapq.merge(*self.readers)
class _QueueMultiDataFeeder(DetectionFeeder, GroundTruthFeeder): reader = None readers: Collection[Reader] = Property(doc='Readers to yield from') max_size: int = Property( default=0, doc="Max queue size, where it will block more data being added. Default 0, unbounded.") @cached_property @abstractmethod def _queue(self): raise NotImplementedError def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._threads = None @staticmethod def _run(queue: Queue, reader: Reader): for time, data in reader: queue.put((time, data)) @BufferedGenerator.generator_method def data_gen(self): if self._threads is None: self._threads = [ Thread(target=self._run, args=(self._queue, reader), daemon=True) for reader in self.readers ] for thread in self._threads: thread.start() while any(thread.is_alive() for thread in self._threads) \ or not self._queue.empty(): try: time, data = self._queue.get_nowait() except Empty: continue yield time, data self._queue.task_done()
[docs] class FIFOMultiDataFeeder(_QueueMultiDataFeeder): """FIFO Multi-data Feeder This returns data from multiple data readers as a single stream, where each reader is consumed in a separate thread and put into a queue. The data is consumed first in, first out. This is aimed at sources of data that are real-time streams, for example sensors sending data via network. """ @cached_property def _queue(self): return Queue(self.max_size)
[docs] class LIFOMultiDataFeeder(_QueueMultiDataFeeder): """LIFO Multi-data Feeder This returns data from multiple data readers as a single stream, where each reader is consumed in a separate thread and put into a queue. The data is consumed last in, first out. This is aimed at sources of data that are real-time streams, for example sensors sending data via network. """ @cached_property def _queue(self): return LifoQueue(self.max_size)
[docs] class PriorityMultiDataFeeder(_QueueMultiDataFeeder): """Priority Multi-data Feeder This returns data from multiple data readers as a single stream, where each reader is consumed in a separate thread and put into a queue. The data is consumed prioritised by time, earlier to later. This is aimed at sources of data that are real-time streams, for example sensors sending data via network. """ @cached_property def _queue(self): return PriorityQueue(self.max_size)
class _MaxSizePriorityQueue(PriorityQueue): def put(self, item, block=True, timeout=None): try: # Call super, so can at least try or wait timeout before # overwriting. super().put(item, False, timeout) except Full: with self.not_full: self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() def _put(self, item): if self.maxsize <= 0 or self.maxsize > len(self.queue): heapq.heappush(self.queue, item) else: heapq.heappushpop(self.queue, item)
[docs] class MaxSizePriorityMultiDataFeeder(_QueueMultiDataFeeder): """Max Size Priority Multi-data Feeder This returns data from multiple data readers as a single stream, where each reader is consumed in a separate thread and put into a queue. The data is consumed prioritised by time, earlier to later. Unlike :class:`~.PriorityMultiDataFeeder`, rather than blocking when the queue is full, it will drop the oldest data. This is aimed at sources of data that are real-time streams, for example sensors sending data via network. """ max_size: int = Property( default=0, doc="Max queue size, where it will drop oldest data when full. Default 0, unbounded.") @cached_property def _queue(self): return _MaxSizePriorityQueue(self.max_size)