Source code for stonesoup.feeder.time

import datetime
import heapq
from warnings import warn

from ..base import Property
from ..buffered_generator import BufferedGenerator
from .base import DetectionFeeder, GroundTruthFeeder

[docs] class TimeBufferedFeeder(DetectionFeeder, GroundTruthFeeder): """Buffer data so it can be yielded in time order. Any "old" data (where the time is earlier than the head of the buffer) shall be dropped, producing a :class:`UserWarning`. """ buffer_size: int = Property(default=1000, doc="Max size of buffer") @BufferedGenerator.generator_method def data_gen(self): time_data_buffer = [] for time_data in self.reader: # Drop "old" detections if len(time_data_buffer) >= self.buffer_size and \ time_data < time_data_buffer[0]: warn('"Old" detection dropped') continue # Yield oldest when buffer full if len(time_data_buffer) >= self.buffer_size: yield heapq.heappushpop(time_data_buffer, time_data) else: # Else just insert heapq.heappush(time_data_buffer, time_data) # No more new data: yield remaining buffer while time_data_buffer: yield heapq.heappop(time_data_buffer)
[docs] class TimeSyncFeeder(DetectionFeeder, GroundTruthFeeder): """Synchronise the data into selected time window. Assumes that states from :attr:`Reader` are in time order. The :class:`~.TimeBufferedFeeder` can be used in conjunction to ensure this is the case. """ time_window: datetime.timedelta = Property( default=datetime.timedelta(seconds=1), doc="Time window to group detections") @BufferedGenerator.generator_method def data_gen(self): data_iter = iter(self.reader) prev_time, states = next(data_iter) data_buffer = set(states) prev_time -= self.time_window for time, states in data_iter: if time > prev_time + self.time_window: yield prev_time + self.time_window, data_buffer # Increment time and yield empty data set until latest # state within time window. prev_time += self.time_window while time > prev_time + self.time_window: prev_time += self.time_window yield prev_time, set() data_buffer = set(states) else: data_buffer.update(states) # No more new states: yield remaining buffer yield prev_time + self.time_window, data_buffer