Source code for stonesoup.reader.yaml

from pathlib import Path

from ..base import Property
from ..buffered_generator import BufferedGenerator
from ..serialise import YAML
from .base import DetectionReader, GroundTruthReader, SensorDataReader
from ..tracker import Tracker
from .file import FileReader


class YAMLReader(FileReader, BufferedGenerator):
    """Base reader that streams the documents held in a YAML file.

    Every document produced by the loader is expected to behave like a
    mapping with a ``'time'`` entry; that entry is split off and yielded
    alongside whatever remains of the document.
    """

    path: Path = Property(doc="File to read data from")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One loader per reader instance, created with typ='safe'.
        self._yaml = YAML(typ='safe')

    @BufferedGenerator.generator_method
    def data_gen(self):
        """Yield ``(time, document)`` for each document loaded from ``path``.

        ``'time'`` is removed from the document with ``pop`` before the
        pair is yielded, so the yielded document no longer contains it.
        A document without a ``'time'`` entry makes ``pop`` raise.
        """
        loaded_documents = self._yaml.load_all(self.path)
        for entry in loaded_documents:
            timestamp = entry.pop('time')
            yield timestamp, entry
class YAMLDetectionReader(YAMLReader, DetectionReader):
    """Reader that pulls the ``'detections'`` entry out of each YAML document."""

    def data_gen(self):
        """Delegate to the parent ``data_gen`` unchanged."""
        # NOTE(review): re-declared here without the
        # ``BufferedGenerator.generator_method`` decorator, presumably so
        # that ``detections_gen`` is the only decorated generator on this
        # class -- confirm against BufferedGenerator.
        parent_stream = super().data_gen()
        yield from parent_stream

    @BufferedGenerator.generator_method
    def detections_gen(self):
        """Yield ``(time, detections)`` for every document.

        When a document has no ``'detections'`` entry an empty ``set`` is
        yielded in its place.
        """
        for timestamp, entry in self.data_gen():
            found = entry.get('detections', set())
            yield timestamp, found
class YAMLGroundTruthReader(YAMLReader, GroundTruthReader):
    """Reader that tracks the ``'groundtruth_paths'`` entry of each document."""

    def data_gen(self):
        """Delegate to the parent ``data_gen`` unchanged."""
        # NOTE(review): re-declared here without the
        # ``BufferedGenerator.generator_method`` decorator, presumably so
        # that ``groundtruth_paths_gen`` is the only decorated generator on
        # this class -- confirm against BufferedGenerator.
        parent_stream = super().data_gen()
        yield from parent_stream

    @BufferedGenerator.generator_method
    def groundtruth_paths_gen(self):
        """Yield ``(time, updated_paths)`` for every document.

        Paths are remembered by ``id`` for the lifetime of the generator.
        The first object seen for an ``id`` is the one kept; later
        documents carrying the same ``id`` only replace its ``states``.
        ``updated_paths`` holds the kept objects touched by the current
        document, and is an empty ``set`` when the document has no
        ``'groundtruth_paths'`` entry.
        """
        known = {}
        for timestamp, entry in self.data_gen():
            touched = set()
            for incoming in entry.get('groundtruth_paths', set()):
                key = incoming.id
                if key not in known:
                    # First sighting: this object becomes the kept path.
                    known[key] = incoming
                else:
                    # Seen before: keep the original object, refresh states.
                    known[key].states = incoming.states
                touched.add(known[key])
            yield timestamp, touched
class YAMLSensorDataReader(YAMLReader, SensorDataReader):
    """Reader that pulls the ``'sensor_data'`` entry out of each YAML document."""

    def data_gen(self):
        """Delegate to the parent ``data_gen`` unchanged."""
        # NOTE(review): re-declared here without the
        # ``BufferedGenerator.generator_method`` decorator, presumably so
        # that ``sensor_data_gen`` is the only decorated generator on this
        # class -- confirm against BufferedGenerator.
        parent_stream = super().data_gen()
        yield from parent_stream

    @BufferedGenerator.generator_method
    def sensor_data_gen(self):
        """Yield ``(time, sensor_data)`` for every document.

        When a document has no ``'sensor_data'`` entry an empty ``set`` is
        yielded in its place.
        """
        for timestamp, entry in self.data_gen():
            payload = entry.get('sensor_data', set())
            yield timestamp, payload
class YAMLTrackReader(YAMLReader, Tracker):
    """Reader that replays the ``'tracks'`` entry of each YAML document.

    Iteration is driven through ``__iter__`` / ``__next__`` rather than a
    decorated generator method. Within this class, ``_tracks`` and
    ``data_iter`` are only created by ``__iter__``.
    """

    def data_gen(self):
        """Delegate to the parent ``data_gen`` unchanged."""
        # NOTE(review): re-declared here without the
        # ``BufferedGenerator.generator_method`` decorator -- presumably
        # intentional; confirm against BufferedGenerator.
        parent_stream = super().data_gen()
        yield from parent_stream

    @property
    def tracks(self):
        """Mapping of track ``id`` to the track object kept for that id."""
        return self._tracks

    def __iter__(self):
        """Start a fresh pass over the file and clear the remembered tracks."""
        self.data_iter = iter(self.data_gen())
        self._tracks = {}
        return super().__iter__()

    def __next__(self):
        """Return ``(time, updated_tracks)`` for the next document.

        The first object seen for an ``id`` is the one kept; later
        documents carrying the same ``id`` only replace its ``states``.
        ``updated_tracks`` holds the kept objects touched by this
        document, and is an empty ``set`` when the document has no
        ``'tracks'`` entry. Exhaustion of the underlying iterator
        propagates out of the ``next`` call.
        """
        timestamp, entry = next(self.data_iter)
        registry = self._tracks
        touched = set()
        for incoming in entry.get('tracks', set()):
            key = incoming.id
            if key not in registry:
                # First sighting: this object becomes the kept track.
                registry[key] = incoming
            else:
                # Seen before: keep the original object, refresh states.
                registry[key].states = incoming.states
            touched.add(registry[key])
        return timestamp, touched