import datetime
import numpy as np
from .base import Tracker, _TrackerMixInNext
from ..base import Property
from ..dataassociator import DataAssociator
from ..deleter import Deleter
from ..functions import gm_reduce_single
from ..initiator import Initiator
from ..reader import DetectionReader
from ..types.array import StateVectors
from ..types.prediction import GaussianStatePrediction
from ..types.track import Track
from ..types.update import GaussianStateUpdate
from ..updater import Updater
[docs]
class SingleTargetTracker(_TrackerMixInNext, Tracker):
"""A simple single target tracker.
Track a single object using Stone Soup components. The tracker works by
first calling the :attr:`data_associator` with the active track, and then
either updating the track state with the result of the :attr:`updater` if
a detection is associated, or with the prediction if no detection is
associated to the track. The track is then checked for deletion by the
:attr:`deleter`, and if deleted the :attr:`initiator` is called to generate
a new track. Similarly if no track is present (i.e. tracker is initialised
or deleted in previous iteration), only the :attr:`initiator` is called.
Parameters
----------
Attributes
----------
track : :class:`~.Track`
Current track being maintained. Also accessible as the sole item in
:attr:`tracks`
"""
initiator: Initiator = Property(doc="Initiator used to initialise the track.")
deleter: Deleter = Property(doc="Deleter used to delete tracks.")
detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
data_associator: DataAssociator = Property(
doc="Association algorithm to pair predictions to detections")
updater: Updater = Property(doc="Updater used to update the track object to the new state.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._track = None
@property
def tracks(self) -> set[Track]:
return {self._track} if self._track else set()
def __next__(self) -> tuple[datetime.datetime, set[Track]]:
time, detections = next(self.detector_iter)
if self._track is not None:
associations = self.data_associator.associate(
self.tracks, detections, time)
if associations[self._track]:
state_post = self.updater.update(associations[self._track])
self._track.append(state_post)
else:
self._track.append(
associations[self._track].prediction)
if self._track is None or self.deleter.delete_tracks(self.tracks):
new_tracks = self.initiator.initiate(detections, time)
if new_tracks:
self._track = new_tracks.pop()
else:
self._track = None
return time, self.tracks
[docs]
class SingleTargetMixtureTracker(_TrackerMixInNext, Tracker):
""" A simple single target tracking that receives associations from a
(Gaussian) Mixture associator.
Track single objects using Stone Soup components. The tracker works by
first calling the :attr:`data_associator` with the active track, and then
either updating the track state with the result of the
:attr:`data_associator` that reduces the (Gaussian) Mixture of all
possible track-detection associations, or with the prediction if no
detection is associated to the track.
The track is then checked for deletion
by the :attr:`deleter`, and remaining unassociated detections are passed
to the :attr:`initiator` to generate new track.
Parameters
----------
"""
initiator: Initiator = Property(doc="Initiator used to initialise the track.")
deleter: Deleter = Property(doc="Deleter used to delete tracks.")
detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
data_associator: DataAssociator = Property(
doc="Association algorithm to pair predictions to detections")
updater: Updater = Property(doc="Updater used to update the track object to the new state.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._track = None
@property
def tracks(self) -> set[Track]:
return {self._track} if self._track else set()
def __next__(self) -> tuple[datetime.datetime, set[Track]]:
time, detections = next(self.detector_iter)
if self._track is not None:
associations = self.data_associator.associate(
self.tracks, detections, time)
unassociated_detections = set(detections)
for track, multihypothesis in associations.items():
# calculate the Track's state as a Gaussian Mixture of
# its possible associations with each detection, then
# reduce the Mixture to a single Gaussian State
posterior_states = []
posterior_state_weights = []
for hypothesis in multihypothesis:
if not hypothesis:
posterior_states.append(hypothesis.prediction)
else:
posterior_states.append(
self.updater.update(hypothesis))
posterior_state_weights.append(
hypothesis.probability)
means = StateVectors([state.state_vector for state in posterior_states])
covars = np.stack([state.covar for state in posterior_states], axis=2)
weights = np.asarray(posterior_state_weights)
# Recuce the mixture of states to one posterior estimate Gaussian
post_mean, post_covar = gm_reduce_single(means, covars, weights)
missed_detection_weight = next(hyp.weight for hyp in multihypothesis if not hyp)
# Check if at least one reasonable measurement...
if any(hypothesis.weight > missed_detection_weight
for hypothesis in multihypothesis):
# ...and if so use update type
track.append(GaussianStateUpdate(
post_mean, post_covar,
multihypothesis,
multihypothesis[0].measurement.timestamp))
else:
# ...and if not, treat as a prediction
track.append(GaussianStatePrediction(
post_mean, post_covar,
multihypothesis[0].prediction.timestamp))
# any detections in multihypothesis that had an
# association score (weight) lower than or equal to the
# association score of "MissedDetection" is considered
# unassociated - candidate for initiating a new Track
for hyp in multihypothesis:
if hyp.weight > missed_detection_weight:
if hyp.measurement in unassociated_detections:
unassociated_detections.remove(hyp.measurement)
if self._track is None or self.deleter.delete_tracks(self.tracks):
new_tracks = self.initiator.initiate(detections, time)
if new_tracks:
self._track = new_tracks.pop()
else:
self._track = None
return time, self.tracks
[docs]
class MultiTargetTracker(_TrackerMixInNext, Tracker):
"""A simple multi target tracker.
Track multiple objects using Stone Soup components. The tracker works by
first calling the :attr:`data_associator` with the active tracks, and then
either updating the track state with the result of the :attr:`updater` if
a detection is associated, or with the prediction if no detection is
associated to the track. Tracks are then checked for deletion by the
:attr:`deleter`, and remaining unassociated detections are passed to the
:attr:`initiator` to generate new tracks.
Parameters
----------
"""
initiator: Initiator = Property(doc="Initiator used to initialise the track.")
deleter: Deleter = Property(doc="Deleter used to delete tracks.")
detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
data_associator: DataAssociator = Property(
doc="Association algorithm to pair predictions to detections")
updater: Updater = Property(doc="Updater used to update the track object to the new state.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._tracks = set()
@property
def tracks(self) -> set[Track]:
return self._tracks
def __next__(self) -> tuple[datetime.datetime, set[Track]]:
time, detections = next(self.detector_iter)
associations = self.data_associator.associate(
self.tracks, detections, time)
associated_detections = set()
for track, hypothesis in associations.items():
if hypothesis:
state_post = self.updater.update(hypothesis)
track.append(state_post)
associated_detections.add(hypothesis.measurement)
else:
track.append(hypothesis.prediction)
self._tracks -= self.deleter.delete_tracks(self.tracks)
self._tracks |= self.initiator.initiate(
detections - associated_detections, time)
return time, self.tracks
[docs]
class MultiTargetMixtureTracker(_TrackerMixInNext, Tracker):
"""A simple multi target tracker that receives associations from a
(Gaussian) Mixture associator.
Track multiple objects using Stone Soup components. The tracker works by
first calling the :attr:`data_associator` with the active tracks, and then
either updating the track state with the result of the
:attr:`data_associator` that reduces the (Gaussian) Mixture of all
possible track-detection associations, or with the prediction if no
detection is associated to the track. Tracks are then checked for deletion
by the :attr:`deleter`, and remaining unassociated detections are passed
to the :attr:`initiator` to generate new tracks.
Parameters
----------
"""
initiator: Initiator = Property(doc="Initiator used to initialise the track.")
deleter: Deleter = Property(doc="Deleter used to delete tracks.")
detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
data_associator: DataAssociator = Property(
doc="Association algorithm to pair predictions to detections")
updater: Updater = Property(doc="Updater used to update the track object to the new state.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._tracks = set()
@property
def tracks(self) -> set[Track]:
return self._tracks
def __next__(self) -> tuple[datetime.datetime, set[Track]]:
time, detections = next(self.detector_iter)
associations = self.data_associator.associate(
self.tracks, detections, time)
unassociated_detections = set(detections)
for track, multihypothesis in associations.items():
# calculate each Track's state as a Gaussian Mixture of
# its possible associations with each detection, then
# reduce the Mixture to a single Gaussian State
posterior_states = []
posterior_state_weights = []
for hypothesis in multihypothesis:
if not hypothesis:
posterior_states.append(hypothesis.prediction)
else:
posterior_states.append(
self.updater.update(hypothesis))
posterior_state_weights.append(
hypothesis.probability)
means = StateVectors([state.state_vector for state in posterior_states])
covars = np.stack([state.covar for state in posterior_states], axis=2)
weights = np.asarray(posterior_state_weights)
post_mean, post_covar = gm_reduce_single(means, covars, weights)
missed_detection_weight = next(hyp.weight for hyp in multihypothesis if not hyp)
# Check if at least one reasonable measurement...
if any(hypothesis.weight > missed_detection_weight
for hypothesis in multihypothesis):
# ...and if so use update type
track.append(GaussianStateUpdate(
post_mean, post_covar,
multihypothesis,
multihypothesis[0].measurement.timestamp))
else:
# ...and if not, treat as a prediction
track.append(GaussianStatePrediction(
post_mean, post_covar,
multihypothesis[0].prediction.timestamp))
# any detections in multihypothesis that had an
# association score (weight) lower than or equal to the
# association score of "MissedDetection" is considered
# unassociated - candidate for initiating a new Track
for hyp in multihypothesis:
if hyp.weight > missed_detection_weight:
if hyp.measurement in unassociated_detections:
unassociated_detections.remove(hyp.measurement)
self._tracks -= self.deleter.delete_tracks(self.tracks)
self._tracks |= self.initiator.initiate(
unassociated_detections, time)
return time, self.tracks