Source code for stonesoup.tracker.simple

import datetime

import numpy as np

from .base import Tracker, _TrackerMixInNext
from ..base import Property
from ..dataassociator import DataAssociator
from ..deleter import Deleter
from ..functions import gm_reduce_single
from ..initiator import Initiator
from ..reader import DetectionReader
from ..types.array import StateVectors
from ..types.prediction import GaussianStatePrediction
from ..types.track import Track
from ..types.update import GaussianStateUpdate
from ..updater import Updater


class SingleTargetTracker(_TrackerMixInNext, Tracker):
    """A simple single target tracker.

    Track a single object using Stone Soup components. The tracker works by
    first calling the :attr:`data_associator` with the active track, and then
    either updating the track state with the result of the :attr:`updater` if
    a detection is associated, or with the prediction if no detection is
    associated to the track. The track is then checked for deletion by the
    :attr:`deleter`, and if deleted the :attr:`initiator` is called to generate
    a new track. Similarly if no track is present (i.e. tracker is initialised
    or deleted in previous iteration), only the :attr:`initiator` is called.

    Parameters
    ----------

    Attributes
    ----------
    track : :class:`~.Track`
        Current track being maintained. Also accessible as the sole item in
        :attr:`tracks`
    """
    initiator: Initiator = Property(doc="Initiator used to initialise the track.")
    deleter: Deleter = Property(doc="Deleter used to delete tracks.")
    detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
    data_associator: DataAssociator = Property(
        doc="Association algorithm to pair predictions to detections")
    updater: Updater = Property(doc="Updater used to update the track object to the new state.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # No track exists until the initiator produces one in ``__next__``.
        self._track = None

    @property
    def tracks(self) -> set[Track]:
        """Set holding the current track, or an empty set if there is none."""
        # Compare against None explicitly (as ``__next__`` does) so that the
        # presence of a track never depends on the track object's truthiness.
        return set() if self._track is None else {self._track}

    def __next__(self) -> tuple[datetime.datetime, set[Track]]:
        """Process the next batch of detections.

        Returns
        -------
        : tuple of :class:`datetime.datetime` and set of :class:`~.Track`
            The time yielded by ``detector_iter`` and the current
            :attr:`tracks`.
        """
        time, detections = next(self.detector_iter)

        if self._track is not None:
            associations = self.data_associator.associate(
                self.tracks, detections, time)
            hypothesis = associations[self._track]
            if hypothesis:
                # A detection was associated: append the updated state.
                self._track.append(self.updater.update(hypothesis))
            else:
                # Falsy hypothesis: fall back to the predicted state.
                self._track.append(hypothesis.prediction)

        # (Re-)initiate when there is no track, or when the deleter returns a
        # non-empty result for the current track.
        if self._track is None or self.deleter.delete_tracks(self.tracks):
            new_tracks = self.initiator.initiate(detections, time)
            # Only a single track is kept; any others initiated are dropped.
            self._track = new_tracks.pop() if new_tracks else None

        return time, self.tracks
class SingleTargetMixtureTracker(_TrackerMixInNext, Tracker):
    """A simple single target tracker that receives associations from a
    (Gaussian) Mixture associator.

    Track single objects using Stone Soup components. The tracker works by
    first calling the :attr:`data_associator` with the active track, and then
    either updating the track state with the result of the
    :attr:`data_associator` that reduces the (Gaussian) Mixture of all
    possible track-detection associations, or with the prediction if no
    detection is associated to the track. The track is then checked for
    deletion by the :attr:`deleter`, and if it is deleted (or no track is
    present) the detections of the current step are passed to the
    :attr:`initiator` to generate a new track.

    Parameters
    ----------
    """
    initiator: Initiator = Property(doc="Initiator used to initialise the track.")
    deleter: Deleter = Property(doc="Deleter used to delete tracks.")
    detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
    data_associator: DataAssociator = Property(
        doc="Association algorithm to pair predictions to detections")
    updater: Updater = Property(doc="Updater used to update the track object to the new state.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # No track exists until the initiator produces one in ``__next__``.
        self._track = None

    @property
    def tracks(self) -> set[Track]:
        """Set holding the current track, or an empty set if there is none."""
        # Compare against None explicitly (as ``__next__`` does) so that the
        # presence of a track never depends on the track object's truthiness.
        return set() if self._track is None else {self._track}

    def __next__(self) -> tuple[datetime.datetime, set[Track]]:
        """Process the next batch of detections.

        Returns
        -------
        : tuple of :class:`datetime.datetime` and set of :class:`~.Track`
            The time yielded by ``detector_iter`` and the current
            :attr:`tracks`.

        Raises
        ------
        ValueError
            If a multi-hypothesis returned by the :attr:`data_associator`
            contains no missed detection (falsy) hypothesis.
        """
        time, detections = next(self.detector_iter)

        if self._track is not None:
            associations = self.data_associator.associate(
                self.tracks, detections, time)

            for track, multihypothesis in associations.items():
                # calculate the Track's state as a Gaussian Mixture of
                # its possible associations with each detection, then
                # reduce the Mixture to a single Gaussian State
                posterior_states = []
                posterior_state_weights = []
                for hypothesis in multihypothesis:
                    if not hypothesis:
                        posterior_states.append(hypothesis.prediction)
                    else:
                        posterior_states.append(
                            self.updater.update(hypothesis))
                    posterior_state_weights.append(hypothesis.probability)

                means = StateVectors(
                    [state.state_vector for state in posterior_states])
                covars = np.stack(
                    [state.covar for state in posterior_states], axis=2)
                weights = np.asarray(posterior_state_weights)

                # Reduce the mixture of states to one posterior estimate
                # Gaussian
                post_mean, post_covar = gm_reduce_single(
                    means, covars, weights)

                # A bare ``next()`` would raise StopIteration here, which,
                # inside ``__next__``, silently ends iteration of the tracker.
                missed_detection_weight = next(
                    (hyp.weight for hyp in multihypothesis if not hyp), None)
                if missed_detection_weight is None:
                    raise ValueError(
                        "data_associator returned a multi-hypothesis without "
                        "a missed detection hypothesis")

                # Check if at least one reasonable measurement...
                if any(hypothesis.weight > missed_detection_weight
                       for hypothesis in multihypothesis):
                    # ...and if so use update type
                    track.append(GaussianStateUpdate(
                        post_mean, post_covar,
                        multihypothesis,
                        multihypothesis[0].measurement.timestamp))
                else:
                    # ...and if not, treat as a prediction
                    track.append(GaussianStatePrediction(
                        post_mean, post_covar,
                        multihypothesis[0].prediction.timestamp))

        # (Re-)initiate when there is no track, or when the deleter returns a
        # non-empty result for the current track. All detections of this step
        # are offered to the initiator.
        if self._track is None or self.deleter.delete_tracks(self.tracks):
            new_tracks = self.initiator.initiate(detections, time)
            # Only a single track is kept; any others initiated are dropped.
            self._track = new_tracks.pop() if new_tracks else None

        return time, self.tracks
class MultiTargetTracker(_TrackerMixInNext, Tracker):
    """A simple multi target tracker.

    Tracks several objects at once using Stone Soup components. On every step
    the :attr:`data_associator` is called with the active tracks; each track
    is then extended either with the output of the :attr:`updater` (when a
    detection was associated) or with the prediction (when none was). Next the
    :attr:`deleter` removes tracks, and the detections that were not
    associated are handed to the :attr:`initiator` to create new tracks.

    Parameters
    ----------
    """
    initiator: Initiator = Property(doc="Initiator used to initialise the track.")
    deleter: Deleter = Property(doc="Deleter used to delete tracks.")
    detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
    data_associator: DataAssociator = Property(
        doc="Association algorithm to pair predictions to detections")
    updater: Updater = Property(doc="Updater used to update the track object to the new state.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Starts with no tracks; the initiator populates this set.
        self._tracks = set()

    @property
    def tracks(self) -> set[Track]:
        """The set of tracks currently maintained by the tracker."""
        return self._tracks

    def __next__(self) -> tuple[datetime.datetime, set[Track]]:
        """Process the next batch of detections.

        Returns
        -------
        : tuple of :class:`datetime.datetime` and set of :class:`~.Track`
            The time yielded by ``detector_iter`` and the current
            :attr:`tracks`.
        """
        time, detections = next(self.detector_iter)

        hypotheses_by_track = self.data_associator.associate(
            self.tracks, detections, time)

        used_detections = set()
        for track, hypothesis in hypotheses_by_track.items():
            if not hypothesis:
                # Nothing associated: carry the prediction forward.
                track.append(hypothesis.prediction)
                continue
            updated_state = self.updater.update(hypothesis)
            track.append(updated_state)
            used_detections.add(hypothesis.measurement)

        # Drop deleted tracks first, then add tracks initiated from the
        # detections that no track made use of.
        self._tracks -= self.deleter.delete_tracks(self.tracks)
        leftover_detections = detections - used_detections
        self._tracks |= self.initiator.initiate(leftover_detections, time)

        return time, self.tracks
class MultiTargetMixtureTracker(_TrackerMixInNext, Tracker):
    """A simple multi target tracker that receives associations from a
    (Gaussian) Mixture associator.

    Track multiple objects using Stone Soup components. The tracker works by
    first calling the :attr:`data_associator` with the active tracks, and then
    either updating the track state with the result of the
    :attr:`data_associator` that reduces the (Gaussian) Mixture of all
    possible track-detection associations, or with the prediction if no
    detection is associated to the track. Tracks are then checked for deletion
    by the :attr:`deleter`, and remaining unassociated detections are passed
    to the :attr:`initiator` to generate new tracks.

    Parameters
    ----------
    """
    initiator: Initiator = Property(doc="Initiator used to initialise the track.")
    deleter: Deleter = Property(doc="Deleter used to delete tracks.")
    detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
    data_associator: DataAssociator = Property(
        doc="Association algorithm to pair predictions to detections")
    updater: Updater = Property(doc="Updater used to update the track object to the new state.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Starts with no tracks; the initiator populates this set.
        self._tracks = set()

    @property
    def tracks(self) -> set[Track]:
        """The set of tracks currently maintained by the tracker."""
        return self._tracks

    def __next__(self) -> tuple[datetime.datetime, set[Track]]:
        """Process the next batch of detections.

        Returns
        -------
        : tuple of :class:`datetime.datetime` and set of :class:`~.Track`
            The time yielded by ``detector_iter`` and the current
            :attr:`tracks`.

        Raises
        ------
        ValueError
            If a multi-hypothesis returned by the :attr:`data_associator`
            contains no missed detection (falsy) hypothesis.
        """
        time, detections = next(self.detector_iter)

        associations = self.data_associator.associate(
            self.tracks, detections, time)
        unassociated_detections = set(detections)

        for track, multihypothesis in associations.items():
            # calculate each Track's state as a Gaussian Mixture of
            # its possible associations with each detection, then
            # reduce the Mixture to a single Gaussian State
            posterior_states = []
            posterior_state_weights = []
            for hypothesis in multihypothesis:
                if not hypothesis:
                    posterior_states.append(hypothesis.prediction)
                else:
                    posterior_states.append(
                        self.updater.update(hypothesis))
                posterior_state_weights.append(hypothesis.probability)

            means = StateVectors(
                [state.state_vector for state in posterior_states])
            covars = np.stack(
                [state.covar for state in posterior_states], axis=2)
            weights = np.asarray(posterior_state_weights)

            post_mean, post_covar = gm_reduce_single(means, covars, weights)

            # A bare ``next()`` would raise StopIteration here, which,
            # inside ``__next__``, silently ends iteration of the tracker.
            missed_detection_weight = next(
                (hyp.weight for hyp in multihypothesis if not hyp), None)
            if missed_detection_weight is None:
                raise ValueError(
                    "data_associator returned a multi-hypothesis without "
                    "a missed detection hypothesis")

            # Check if at least one reasonable measurement...
            if any(hypothesis.weight > missed_detection_weight
                   for hypothesis in multihypothesis):
                # ...and if so use update type
                track.append(GaussianStateUpdate(
                    post_mean, post_covar,
                    multihypothesis,
                    multihypothesis[0].measurement.timestamp))
            else:
                # ...and if not, treat as a prediction
                track.append(GaussianStatePrediction(
                    post_mean, post_covar,
                    multihypothesis[0].prediction.timestamp))

            # any detections in multihypothesis that had an
            # association score (weight) lower than or equal to the
            # association score of "MissedDetection" is considered
            # unassociated - candidate for initiating a new Track
            for hyp in multihypothesis:
                if hyp.weight > missed_detection_weight:
                    # discard() is a no-op when another track has already
                    # claimed this detection.
                    unassociated_detections.discard(hyp.measurement)

        self._tracks -= self.deleter.delete_tracks(self.tracks)
        self._tracks |= self.initiator.initiate(
            unassociated_detections, time)

        return time, self.tracks