Source code for stonesoup.initiator.simple

import numpy as np
from scipy.stats import multivariate_normal

from .base import GaussianInitiator, ParticleInitiator, Initiator
from ..base import Property
from ..dataassociator import DataAssociator
from ..deleter import Deleter
from ..models.base import LinearModel, ReversibleModel
from ..models.measurement import MeasurementModel
from ..types.hypothesis import SingleHypothesis
from ..types.numeric import Probability
from ..types.particle import Particle
from ..types.state import State, GaussianState, ParticleState
from ..types.track import Track
from ..types.update import GaussianStateUpdate, ParticleStateUpdate, Update
from ..updater import Updater
from ..updater.kalman import ExtendedKalmanUpdater


class SinglePointInitiator(GaussianInitiator):
    """Initiate one track per detection from a fixed Gaussian prior.

    Each unassociated detection is combined with :attr:`prior_state` in a
    single :class:`~.ExtendedKalmanUpdater` update step; the resulting state
    becomes the first (and only) state of a new :class:`~.Track`.
    """

    prior_state: GaussianState = Property(doc="Prior state information")
    measurement_model: MeasurementModel = Property(
        default=None,
        doc="Measurement model. Can be left as None if all detections have a "
            "valid measurement model.")

    def initiate(self, detections, timestamp, **kwargs):
        """Create a new track for every unassociated detection.

        Parameters
        ----------
        detections : set of :class:`~.Detection`
            Unassociated detections to start tracks from.
        timestamp : datetime.datetime
            Current timestamp. Not used by this implementation.

        Returns
        -------
        : set of :class:`~.Track`
            One single-state track per detection, holding the state returned
            by the updater.
        """
        ekf = ExtendedKalmanUpdater(self.measurement_model)

        def first_state(detection):
            # Predict the measurement from the prior with the detection's own
            # model, then run one update against that detection.
            predicted_measurement = ekf.predict_measurement(
                self.prior_state, detection.measurement_model)
            hypothesis = SingleHypothesis(
                self.prior_state, detection, predicted_measurement)
            return ekf.update(hypothesis)

        return {Track([first_state(detection)]) for detection in detections}
class SimpleMeasurementInitiator(GaussianInitiator):
    """Initiator that maps measurement space to state space

    Works for both linear and non-linear co-ordinate input

    This initiator utilises the :class:`~.MeasurementModel` matrix to convert
    :class:`~.Detection` state vector and model covariance into state space.
    It either takes the :class:`~.MeasurementModel` from the given detection
    or uses the :attr:`measurement_model`.

    For non-linear models the :class:`~.ReversibleModel` inverse function is
    used to map the detection into state space, and the model Jacobian
    (evaluated at that state) takes the place of the measurement matrix.

    The mapped values then replace the corresponding entries of the
    :attr:`prior_state` to form the initial :class:`~.GaussianState` of the
    :class:`~.Track`.

    The diagonal loading value is used to try to ensure that the estimated
    covariance matrix is positive definite, especially for subsequent Cholesky
    decompositions.
    """

    prior_state: GaussianState = Property(doc="Prior state information")
    measurement_model: MeasurementModel = Property(
        default=None,
        doc="Measurement model. Can be left as None if all detections have a "
            "valid measurement model.")
    skip_non_reversible: bool = Property(
        default=False,
        doc="If `True`, detections whose measurement model is neither linear "
            "nor reversible, or whose inverse function raises "
            ":class:`NotImplementedError`, are skipped instead of raising. "
            "Default `False`.")
    diag_load: float = Property(default=0.0, doc="Positive float value for diagonal loading")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.diag_load < 0:
            raise ValueError(
                "diag_load value can't be less than 0.0")

    def initiate(self, detections, timestamp, **kwargs):
        """Initiates tracks given unassociated measurements

        Parameters
        ----------
        detections : set of :class:`~.Detection`
            Unassociated detections to start tracks from.
        timestamp : datetime.datetime
            Current timestamp. Not used here: each new state carries the
            timestamp of its detection.

        Returns
        -------
        : set of :class:`~.Track`
            One track per usable detection, each holding a single
            :class:`~.GaussianStateUpdate`.

        Raises
        ------
        ValueError
            If neither the detection nor this initiator provides a
            measurement model.
        TypeError
            If the measurement model is neither linear nor reversible and
            :attr:`skip_non_reversible` is `False`.
        NotImplementedError
            Propagated from the model's inverse function when
            :attr:`skip_non_reversible` is `False`.
        """
        tracks = set()
        for detection in detections:
            # The detection's own model wins; fall back to the initiator's.
            measurement_model = detection.measurement_model
            if measurement_model is None:
                measurement_model = self.measurement_model
            if measurement_model is None:
                raise ValueError("No measurement model specified")

            if isinstance(measurement_model, LinearModel):
                model_matrix = measurement_model.matrix()
                inv_model_matrix = np.linalg.pinv(model_matrix)
                state_vector = inv_model_matrix @ detection.state_vector
            elif isinstance(measurement_model, ReversibleModel):
                try:
                    state_vector = measurement_model.inverse_function(detection)
                except NotImplementedError:
                    if self.skip_non_reversible:
                        continue
                    raise
                # Linearise about the inverted state to obtain a matrix that
                # can be pseudo-inverted like the linear case.
                model_matrix = measurement_model.jacobian(State(state_vector))
                inv_model_matrix = np.linalg.pinv(model_matrix)
            elif self.skip_non_reversible:
                continue
            else:
                # TypeError is still caught by ``except Exception``.
                raise TypeError(
                    "Invalid measurement model used. "
                    "Must be instance of linear or reversible.")

            model_covar = measurement_model.covar()

            # Work on copies: the prior is shared by every detection.
            prior_state_vector = self.prior_state.state_vector.copy()
            prior_covar = self.prior_state.covar.copy()

            # Entries observed by the measurement are taken from the
            # detection, so the prior's contribution there is blanked out.
            # NOTE(review): only the *rows* of the prior covariance are
            # zeroed; with a non-diagonal prior the sum below looks like it
            # can come out asymmetric -- confirm whether columns should be
            # zeroed as well.
            mapped_dimensions = measurement_model.mapping
            prior_state_vector[mapped_dimensions, :] = 0
            prior_covar[mapped_dimensions, :] = 0

            # Measurement noise mapped to state space, plus the remaining
            # prior uncertainty, plus diagonal loading.
            mapped_covar = inv_model_matrix @ model_covar @ inv_model_matrix.T
            loading = np.diag(np.full(mapped_covar.shape[0], self.diag_load))
            initial_covar = mapped_covar + prior_covar + loading

            tracks.add(Track([GaussianStateUpdate(
                prior_state_vector + state_vector,
                initial_covar,
                SingleHypothesis(None, detection),
                timestamp=detection.timestamp)
            ]))
        return tracks
class MultiMeasurementInitiator(GaussianInitiator):
    """Multi-measurement initiator.

    Holds candidate tracks inside the initiator itself and only releases them
    to the caller once they have accumulated at least :attr:`min_points`
    states (optionally counting :class:`~.Update` states only).

    Candidate tracks are created by a nested initiator, so no initiation logic
    is duplicated here. This avoids handing out short-lived single-detection
    tracks that would be deleted again almost immediately, at the cost of a
    slight delay before a track is reported.
    """

    prior_state: GaussianState = Property(doc="Prior state information")
    deleter: Deleter = Property(doc="Deleter used to delete the track.")
    data_associator: DataAssociator = Property(
        doc="Association algorithm to pair predictions to detections.")
    updater: Updater = Property(
        doc="Updater used to update the track object to the new state.")
    measurement_model: MeasurementModel = Property(
        default=None,
        doc="Measurement model. Can be left as None if all detections have a "
            "valid measurement model.")
    min_points: int = Property(
        default=2, doc="Minimum number of track points required to confirm a track.")
    updates_only: bool = Property(
        default=True, doc="Whether :attr:`min_points` only counts :class:`~.Update` states.")
    initiator: Initiator = Property(
        default=None,
        doc="Initiator used to create tracks. If None, a :class:`SimpleMeasurementInitiator` will "
            "be created using :attr:`prior_state` and :attr:`measurement_model`. Otherwise, these "
            "attributes are ignored.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Candidate tracks that have not yet been confirmed.
        self.holding_tracks = set()
        if self.initiator is None:
            self.initiator = SimpleMeasurementInitiator(self.prior_state, self.measurement_model)

    def initiate(self, detections, timestamp, **kwargs):
        """Advance held tracks, release confirmed ones and start new ones.

        Parameters
        ----------
        detections : set of :class:`~.Detection`
            Unassociated detections for this time step.
        timestamp : datetime.datetime
            Current timestamp, passed to the data associator and the nested
            initiator.

        Returns
        -------
        : set of :class:`~.Track`
            Held tracks that reached :attr:`min_points` during this call.
        """
        confirmed = set()
        used_detections = set()

        if self.holding_tracks:
            pairings = self.data_associator.associate(
                self.holding_tracks, detections, timestamp)

            for held_track, hyp in pairings.items():
                if hyp:
                    # A detection was associated: update and mark it as used.
                    held_track.append(self.updater.update(hyp))
                    used_detections.add(hyp.measurement)
                else:
                    # Nothing associated: carry the prediction forward.
                    held_track.append(hyp.prediction)

                point_count = sum(
                    1 for state in held_track
                    if not self.updates_only or isinstance(state, Update))
                if point_count >= self.min_points:
                    confirmed.add(held_track)
                    self.holding_tracks.remove(held_track)

            # Drop whatever the deleter rejects among the still-held tracks.
            self.holding_tracks -= self.deleter.delete_tracks(self.holding_tracks)

        # Detections not consumed by a held track seed new candidates.
        leftover = detections - used_detections
        self.holding_tracks |= self.initiator.initiate(leftover, timestamp)

        return confirmed
class GaussianParticleInitiator(ParticleInitiator):
    """Gaussian Particle Initiator class

    Utilising Gaussian Initiator, sample from the resultant track's state
    to generate a number of particles, overwriting with a
    :class:`~.ParticleState`.
    """

    initiator: GaussianInitiator = Property(
        doc="Gaussian Initiator which will be used to generate tracks.")
    number_particles: int = Property(
        default=200, doc="Number of particles for initial track state")
    use_fixed_covar: bool = Property(
        default=False,
        doc="If `True`, the Gaussian state covariance is used for the "
            ":class:`~.ParticleState` as a fixed covariance. Default `False`.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Only the attribute lookups are guarded, so an unrelated error raised
        # while sampling is not relabelled as a missing prior.
        try:
            prior = self.initiator.prior_state
            mean = prior.state_vector.ravel()
            covar = prior.covar
        except AttributeError as err:
            raise AttributeError("No prior state") from err

        # Create prior particle state
        self.prior_state = ParticleState(
            self._sample_particles(mean, covar),
            fixed_covar=covar if self.use_fixed_covar else None)

    @property
    def weight(self):
        """Uniform particle weight, ``1 / number_particles``."""
        return Probability(1 / self.number_particles)

    def _sample_particles(self, mean, covar):
        """Draw :attr:`number_particles` particles from ``N(mean, covar)``.

        Parameters
        ----------
        mean : array-like
            Flattened (1-D) mean vector.
        covar : array-like
            Covariance matrix.

        Returns
        -------
        : list of :class:`~.Particle`
            Particles with column state vectors and uniform weights.
        """
        samples = multivariate_normal.rvs(mean, covar, size=self.number_particles)
        # ``rvs`` squeezes singleton axes: a single particle (or a 1-D state)
        # comes back without the leading/trailing axis. Restore the
        # (particles, dimensions) layout so iteration yields one row each.
        samples = np.reshape(samples, (self.number_particles, np.size(mean)))
        return [
            Particle(sample.reshape(-1, 1), weight=self.weight)
            for sample in samples]

    def initiate(self, detections, timestamp, **kwargs):
        """Initiates tracks given unassociated measurements

        Parameters
        ----------
        detections : set of :class:`~.Detection`
            A list of unassociated detections
        timestamp: datetime.datetime
            Current timestamp

        Returns
        -------
        : set of :class:`~.Track`
            The tracks produced by :attr:`initiator`, with their latest state
            replaced by a :class:`~.ParticleStateUpdate`
        """
        tracks = self.initiator.initiate(detections, timestamp, **kwargs)
        for track in tracks:
            covar = track.covar
            particles = self._sample_particles(track.state_vector.ravel(), covar)
            # Overwrite the Gaussian state with its sampled equivalent,
            # keeping the original hypothesis and timestamp.
            track[-1] = ParticleStateUpdate(
                particles,
                track.hypothesis,
                fixed_covar=covar if self.use_fixed_covar else None,
                timestamp=track.timestamp)
        return tracks