Source code for stonesoup.initiator.simple

import numpy as np
from scipy.stats import multivariate_normal

from .base import GaussianInitiator, ParticleInitiator, Initiator
from ..base import Property
from ..dataassociator import DataAssociator
from ..deleter import Deleter
from ..models.base import LinearModel, ReversibleModel
from ..models.measurement import MeasurementModel
from ..types.hypothesis import SingleHypothesis
from ..types.mixture import GaussianMixture
from ..types.numeric import Probability
from ..types.particle import Particle
from ..types.state import State, GaussianState, ParticleState, TaggedWeightedGaussianState, \
    ASDGaussianState, EnsembleState
from ..types.track import Track
from ..types.update import ParticleStateUpdate, Update, \
    GaussianMixtureUpdate, ASDGaussianStateUpdate, EnsembleStateUpdate
from ..updater import Updater
from ..updater.kalman import ExtendedKalmanUpdater


[docs] class SinglePointInitiator(GaussianInitiator): """SinglePointInitiator class This uses an :class:`~.ExtendedKalmanUpdater` to carry out an update using provided :attr:`prior_state` for each unassociated detection. """ prior_state: GaussianState = Property(doc="Prior state information") measurement_model: MeasurementModel = Property( default=None, doc="Measurement model. Can be left as None if all detections have a " "valid measurement model.")
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with an initial :class:`~.GaussianState` """ updater = ExtendedKalmanUpdater(self.measurement_model) tracks = set() for detection in detections: measurement_prediction = updater.predict_measurement( self.prior_state, detection.measurement_model) track_state = updater.update(SingleHypothesis( self.prior_state, detection, measurement_prediction)) track = Track([track_state]) tracks.add(track) return tracks
[docs] class SimpleMeasurementInitiator(GaussianInitiator): """Initiator that maps measurement space to state space Works for both linear and non-linear co-ordinate input This initiator utilises the :class:`~.MeasurementModel` matrix to convert :class:`~.Detection` state vector and model covariance into state space. It either takes the :class:`~.MeasurementModel` from the given detection or uses the :attr:`measurement_model`. Utilises the ReversibleModel inverse function to convert non-linear spherical co-ordinates into Cartesian x/y co-ordinates for use in predictions and mapping. This then replaces mapped values in the :attr:`prior_state` to form the initial :class:`~.GaussianState` of the :class:`~.Track`. The diagonal loading value is used to try to ensure that the estimated covariance matrix is positive definite, especially for subsequent Cholesky decompositions. """ prior_state: GaussianState = Property(doc="Prior state information") measurement_model: MeasurementModel = Property( default=None, doc="Measurement model. Can be left as None if all detections have a " "valid measurement model.") skip_non_reversible: bool = Property(default=False) diag_load: float = Property(default=0.0, doc="Positive float value for diagonal loading") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.diag_load < 0: raise ValueError( "diag_load value can't be less than 0.0")
[docs] def initiate(self, detections, timestamp, **kwargs): tracks = set() for detection in detections: if detection.measurement_model is not None: measurement_model = detection.measurement_model else: if self.measurement_model is None: raise ValueError("No measurement model specified") else: measurement_model = self.measurement_model if isinstance(measurement_model, LinearModel): model_matrix = measurement_model.matrix() inv_model_matrix = np.linalg.pinv(model_matrix) state_vector = inv_model_matrix @ detection.state_vector else: if isinstance(measurement_model, ReversibleModel): try: state_vector = measurement_model.inverse_function( detection) except NotImplementedError: if not self.skip_non_reversible: raise else: continue model_matrix = measurement_model.jacobian(State( state_vector)) inv_model_matrix = np.linalg.pinv(model_matrix) elif self.skip_non_reversible: continue else: raise Exception("Invalid measurement model used.\ Must be instance of linear or reversible.") model_covar = measurement_model.covar() prior_state_vector = self.prior_state.state_vector.copy() prior_covar = self.prior_state.covar.copy() mapped_dimensions = measurement_model.mapping prior_state_vector[mapped_dimensions, :] = 0 prior_covar[mapped_dimensions, :] = 0 C0 = inv_model_matrix @ model_covar @ inv_model_matrix.T C0 = C0 + prior_covar + np.diag(np.array([self.diag_load] * C0.shape[0])) tracks.add(Track([Update.from_state( self.prior_state, state_vector=prior_state_vector + state_vector, covar=C0, hypothesis=SingleHypothesis(None, detection), timestamp=detection.timestamp) ])) return tracks
[docs] class MultiMeasurementInitiator(GaussianInitiator): """Multi-measurement initiator. Utilises features of the tracker to initiate and hold tracks temporarily within the initiator itself, releasing them to the tracker once there are multiple detections associated with them enough to determine that they are 'sure' tracks. Utilises simple initiator to initiate tracks to hold -> prevents code duplication. Solves issue of short-lived single detection tracks being initiated only to then be removed shortly after. Does cause slight delay in initiation to tracker.""" prior_state: GaussianState = Property(doc="Prior state information") deleter: Deleter = Property(doc="Deleter used to delete the track.") data_associator: DataAssociator = Property( doc="Association algorithm to pair predictions to detections.") updater: Updater = Property( doc="Updater used to update the track object to the new state.") measurement_model: MeasurementModel = Property( default=None, doc="Measurement model. Can be left as None if all detections have a " "valid measurement model.") min_points: int = Property( default=2, doc="Minimum number of track points required to confirm a track.") updates_only: bool = Property( default=True, doc="Whether :attr:`min_points` only counts :class:`~.Update` states.") initiator: Initiator = Property( default=None, doc="Initiator used to create tracks. If None, a :class:`SimpleMeasurementInitiator` will " "be created using :attr:`prior_state` and :attr:`measurement_model`. Otherwise, these " "attributes are ignored.") skip_non_reversible: bool = Property( default=False, doc="Skip measurements that do not have a reversible measurement model. " "Only allow measurements with a measurement model that is an instance " "of a :class:`~.LinearModel` or a :class:`~.ReversibleModel`.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.holding_tracks = set() if self.initiator is None: self.initiator = SimpleMeasurementInitiator(self.prior_state, self.measurement_model)
[docs] def initiate(self, detections, timestamp, **kwargs): sure_tracks = set() associated_detections = set() if self.skip_non_reversible: detections = {det for det in detections if isinstance(det.measurement_model, (ReversibleModel, LinearModel))} if self.holding_tracks: associations = self.data_associator.associate( self.holding_tracks, detections, timestamp) for track, hypothesis in associations.items(): if hypothesis: state_post = self.updater.update(hypothesis) track.append(state_post) associated_detections.add(hypothesis.measurement) else: track.append(hypothesis.prediction) if sum(1 for state in track if not self.updates_only or isinstance(state, Update))\ >= self.min_points: sure_tracks.add(track) self.holding_tracks.remove(track) self.holding_tracks -= self.deleter.delete_tracks(self.holding_tracks) self.holding_tracks |= self.initiator.initiate( detections - associated_detections, timestamp) return sure_tracks
[docs] class NoHistoryMultiMeasurementInitiator(MultiMeasurementInitiator): """ This initiator is very similar to :class:`MultiMeasurementInitiator`. The only difference being that the holding track’s history is moved to the metadata so that initialised tracks only have one state. """
[docs] def initiate(self, *args, **kwargs): tracks = super().initiate(*args, **kwargs) return {Track(id=track.id, states=[track.state], init_metadata=dict(holding_track=track, **track.metadata)) for track in tracks}
[docs] class GaussianParticleInitiator(ParticleInitiator): """Gaussian Particle Initiator class Utilising Gaussian Initiator, sample from the resultant track's state to generate a number of particles, overwriting with a :class:`~.ParticleState`. """ initiator: GaussianInitiator = Property( doc="Gaussian Initiator which will be used to generate tracks.") number_particles: int = Property( default=200, doc="Number of particles for initial track state") use_fixed_covar: bool = Property( default=False, doc="If `True`, the Gaussian state covariance is used for the " ":class:`~.ParticleState` as a fixed covariance. Default `False`.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Create prior particle state try: samples = multivariate_normal.rvs(self.initiator.prior_state.state_vector.ravel(), self.initiator.prior_state.covar, size=self.number_particles) except AttributeError: raise AttributeError("No prior state") particles = [ Particle(sample.reshape(-1, 1), weight=self.weight) for sample in samples] self.prior_state = ParticleState( particles, fixed_covar=self.initiator.prior_state.covar if self.use_fixed_covar else None ) @property def weight(self): return Probability(1 / self.number_particles)
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with a initial :class:`~.ParticleState` """ tracks = self.initiator.initiate(detections, timestamp, **kwargs) for track in tracks: samples = multivariate_normal.rvs(track.state_vector.ravel(), track.covar, size=self.number_particles) particles = [ Particle(sample.reshape(-1, 1), weight=self.weight) for sample in samples] track[-1] = ParticleStateUpdate( None, track.hypothesis, particle_list=particles, fixed_covar=track.covar if self.use_fixed_covar else None, timestamp=track.timestamp) return tracks
[docs] class GaussianMixtureInitiator(GaussianInitiator): """Gaussian Mixture Initiator class Utilising Gaussian Initiator, applying the resultant track's state to generate a Tagged Weighted Gaussian State, overwriting with a :class:`~.GaussianMixture`. """ initiator: GaussianInitiator = Property( doc="Gaussian Initiator which will be used to generate tracks.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Create prior particle state try: state = self.initiator.prior_state.state_vector covar = self.initiator.prior_state.covar except AttributeError: raise AttributeError("No prior state") self.prior_state = GaussianMixture([ TaggedWeightedGaussianState( state_vector=state, covar=covar, weight=Probability(1), tag=[])])
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with an initial :class:`~.GaussianMixture` """ tracks = self.initiator.initiate(detections, timestamp, **kwargs) for track in tracks: for n, state in enumerate(track): mixture = [ TaggedWeightedGaussianState( state_vector=state.state_vector, covar=state.covar, weight=Probability(1), timestamp=state.timestamp, tag=[])] track[n] = GaussianMixtureUpdate( hypothesis=getattr(state, 'hypothesis', None), components=mixture) return tracks
[docs] class ASDGaussianInitiator(GaussianInitiator): """ASD Gaussian State Initiator class Utilising Gaussian Initiator, sample from the resultant track's state to generate an ASD Gaussian State, overwriting with a :class:`~.ASDGaussianState`. """ initiator: GaussianInitiator = Property( doc="Gaussian Initiator which will be used to generate tracks.") max_nstep: int = Property( default=0, doc="Decides when the state is pruned in a prediction step. If 0 then there is no pruning") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Create prior particle state try: state = self.initiator.prior_state.state_vector covar = self.initiator.prior_state.covar except AttributeError: raise AttributeError("No prior state") self.prior_state = ASDGaussianState(multi_state_vector=state, timestamps=None, max_nstep=self.max_nstep, multi_covar=covar)
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with an initial :class:`~.ASDGaussianState` """ tracks = self.initiator.initiate(detections, timestamp, **kwargs) for track in tracks: state = track.state_vector covar = track.covar timestamp = track.timestamp track[-1] = ASDGaussianStateUpdate( multi_state_vector=state, timestamps=timestamp, max_nstep=self.max_nstep, multi_covar=covar, hypothesis=track.hypothesis) return tracks
[docs] class EnsembleInitiator(GaussianInitiator): """Ensemble State Initiator class Utilising Gaussian Initiator, sample from the resultant track's state to generate an Ensemble, overwriting with a :class:`~.EnsembleState`. """ initiator: GaussianInitiator = Property( doc="Gaussian Initiator which will be used to generate tracks.") ensemble_size: int = Property( default=100, doc="Integer to determine the size of the Gaussian Ensemble State.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Create prior particle state try: state = self.initiator.prior_state except AttributeError: raise AttributeError("No prior state") self.prior_state = EnsembleState.from_gaussian_state(state, self.ensemble_size)
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with an initial :class:`~.EnsembleState` """ tracks = self.initiator.initiate(detections, timestamp, **kwargs) for track in tracks: gaussian_state = GaussianState(track.state_vector, track.covar, track.timestamp) track[-1] = EnsembleStateUpdate.from_gaussian_state( gaussian_state, self.ensemble_size, hypothesis=track.hypothesis) return tracks
[docs] class ParticleGaussianInitiator(GaussianInitiator): """Particle Gaussian Initiator class Utilising Particle Initiator, convert the resultant track's state to generate a Gaussian state, overwriting with a :class:`~.GaussianState`. """ initiator: ParticleInitiator = Property( doc="Particle Initiator which will be used to generate tracks.")
[docs] def initiate(self, detections, timestamp, **kwargs): """Initiates tracks given unassociated measurements Parameters ---------- detections : set of :class:`~.Detection` A list of unassociated detections timestamp: datetime.datetime Current timestamp Returns ------- : set of :class:`~.Track` A list of new tracks with an initial :class:`~.GaussianState` """ tracks = self.initiator.initiate(detections, timestamp, **kwargs) for track in tracks: mu = track.mean covar = track.covar timestamp = track.timestamp track[-1] = State.from_state( state=track.state, state_vector=mu, covar=covar, timestamp=timestamp, target_type=GaussianState ) return tracks