Source code for stonesoup.predictor.particle

import copy
from typing import Sequence

import numpy as np

from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..models.transition import TransitionModel
from ..types.prediction import Prediction
from ..types.state import GaussianState


class ParticlePredictor(Predictor):
    """ParticlePredictor class

    Predictor carrying out the prediction step of a Particle Filter.
    """

    @predict_lru_cache()
    def predict(self, prior, timestamp=None, **kwargs):
        """Particle Filter prediction step

        Parameters
        ----------
        prior : :class:`~.ParticleState`
            The prior state to propagate
        timestamp : :class:`datetime.datetime`, optional
            Time at which the prediction is made (the default is `None`)
        **kwargs
            Additional keyword arguments, forwarded unchanged to the
            transition model's ``function``

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state
        """
        try:
            elapsed = timestamp - prior.timestamp
        except TypeError:
            # Raised when timestamp or prior.timestamp is None: there is no
            # interval to compute, so the model is called with None.
            elapsed = None

        model = self.transition_model
        # noise=True is passed so the model adds its own noise to the particles.
        propagated = model.function(
            prior, noise=True, time_interval=elapsed, **kwargs)

        return Prediction.from_state(
            prior,
            state_vector=propagated,
            weight=prior.weight,
            timestamp=timestamp,
            particle_list=None,
            transition_model=model)
class ParticleFlowKalmanPredictor(ParticlePredictor):
    """Gromov Flow Parallel Kalman Particle Predictor

    This is a wrapper around the :class:`~.GromovFlowParticlePredictor` which
    can use a :class:`~.ExtendedKalmanPredictor` or
    :class:`~.UnscentedKalmanPredictor` in parallel in order to maintain a
    state covariance, as proposed in [1]_.

    This should be used in conjunction with the
    :class:`~.ParticleFlowKalmanUpdater`.

    Parameters
    ----------

    References
    ----------
    .. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
       Exact-Flow Particle Filter" 2012
    """

    # Each fragment ends with a space: adjacent string literals are joined
    # verbatim, and without the separator the Sphinx role and the following
    # word run into the preceding text.
    kalman_predictor: KalmanPredictor = Property(
        default=None,
        doc="Kalman predictor to use. Default `None` where a new instance of "
            ":class:`~.ExtendedKalmanPredictor` will be created utilising the "
            "same transition model.")

    def __init__(self, *args, **kwargs):
        """Initialise the predictor, creating a default Kalman predictor.

        When no ``kalman_predictor`` is supplied, an
        :class:`~.ExtendedKalmanPredictor` is built on this predictor's
        transition model.
        """
        super().__init__(*args, **kwargs)
        if self.kalman_predictor is None:
            self.kalman_predictor = ExtendedKalmanPredictor(
                self.transition_model)

    def predict(self, prior, *args, **kwargs):
        """Particle prediction with a Kalman-predicted covariance

        Runs the parent class particle prediction and, separately, a Kalman
        prediction of a :class:`~.GaussianState` built from the prior's state
        vector, covariance and timestamp. The covariance of the Kalman
        prediction is attached to the result as ``fixed_covar``.

        Parameters
        ----------
        prior : state
            A prior state object; its ``state_vector``, ``covar`` and
            ``timestamp`` attributes are read
        *args, **kwargs
            Forwarded unchanged to both the parent ``predict`` and the Kalman
            predictor's ``predict``

        Returns
        -------
        : :class:`~.Prediction`
            Prediction built from ``prior``, carrying the particle
            prediction's state vector, weight and timestamp
        """
        particle_prediction = super().predict(prior, *args, **kwargs)

        kalman_prediction = self.kalman_predictor.predict(
            GaussianState(prior.state_vector, prior.covar, prior.timestamp),
            *args, **kwargs)

        return Prediction.from_state(
            prior,
            state_vector=particle_prediction.state_vector,
            weight=particle_prediction.weight,
            timestamp=particle_prediction.timestamp,
            fixed_covar=kalman_prediction.covar,
            particle_list=None,
            transition_model=self.transition_model)
class MultiModelPredictor(Predictor):
    """MultiModelPredictor class

    An implementation of a Particle Filter predictor utilising multiple models.
    """
    # A single transition model is not used by this predictor; see
    # `transition_models` instead.
    transition_model = None
    transition_models: Sequence[TransitionModel] = Property(
        doc="Transition models used for particle transition, selected by model index on "
            "particle. Models dimensions can be subset of the overall state space, by "
            "using :attr:`model_mappings`."
    )
    transition_matrix: np.ndarray = Property(
        doc="n-model by n-model transition matrix."
    )
    model_mappings: Sequence[Sequence[int]] = Property(
        doc="Sequence of mappings associated with each transition model. This enables mapping "
            "between model and state space, enabling use of models that may have different "
            "dimensions (e.g. velocity or acceleration). Parts of the state that aren't mapped "
            "are set to zero.")

    @property
    def probabilities(self):
        """Row-wise cumulative sum of :attr:`transition_matrix`.

        Recomputed on every access.
        """
        return np.cumsum(self.transition_matrix, axis=1)

    @predict_lru_cache()
    def predict(self, prior, timestamp=None, **kwargs):
        """Particle Filter prediction step

        Parameters
        ----------
        prior : :class:`~.MultiModelParticleState`
            A prior state object
        timestamp: :class:`datetime.datetime`, optional
            A timestamp signifying when the prediction is performed
            (the default is `None`)
        **kwargs
            Forwarded to each transition model's ``function``

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state
        """
        new_particle_state = Prediction.from_state(
            prior,
            state_vector=copy.copy(prior.state_vector),
            parent=prior,
            particle_list=None,
            dynamic_model=copy.copy(prior.dynamic_model),
            timestamp=timestamp)

        # The cumulative matrix is the same for every model, so build it once
        # rather than on each loop iteration.
        cumulative_probabilities = self.probabilities

        for model_index, transition_model in enumerate(self.transition_models):
            # Particles currently assigned to this model
            current_model_indices = prior.dynamic_model == model_index
            current_model_count = np.count_nonzero(current_model_indices)
            if current_model_count == 0:
                continue

            # Draw the next model of each of these particles from this
            # model's row of the cumulative transition matrix
            new_dynamic_models = np.searchsorted(
                cumulative_probabilities[model_index],
                np.random.random(size=current_model_count))

            # Particles are propagated with the model they currently hold;
            # the newly drawn model is only recorded on the prediction.
            new_state_vector = self.apply_model(
                prior[current_model_indices], transition_model, timestamp,
                self.model_mappings[model_index], noise=True, **kwargs)

            new_particle_state.state_vector[:, current_model_indices] = new_state_vector
            new_particle_state.dynamic_model[current_model_indices] = new_dynamic_models

        return new_particle_state

    @staticmethod
    def apply_model(prior, transition_model, timestamp, model_mapping, **kwargs):
        """Propagate ``prior`` with one model and expand to the full state space

        Parameters
        ----------
        prior : state
            State to propagate. Its ``state_vector`` is overwritten in place
            with the rows selected by ``model_mapping``, so pass a state that
            is not needed afterwards.
        transition_model : :class:`~.TransitionModel`
            Model whose ``function`` is applied to the reduced state
        timestamp : :class:`datetime.datetime` or None
            Time to predict to. If the interval to ``prior.timestamp`` cannot
            be computed (either value is `None`), the model is called with
            ``time_interval=None``.
        model_mapping : sequence of int
            Rows of the full state vector used by ``transition_model``
        **kwargs
            Forwarded to ``transition_model.function``

        Returns
        -------
        : array
            Propagated state vector with ``prior``'s original number of rows;
            rows not in ``model_mapping`` are zero
        """
        # Based on given position mapping create a new state vector that
        # contains only the required states
        orig_ndim = prior.ndim
        prior.state_vector = prior.state_vector[model_mapping, :]

        try:
            time_interval = timestamp - prior.timestamp
        except TypeError:
            # timestamp or prior.timestamp is None; handled the same way as
            # ParticlePredictor.predict instead of raising here
            time_interval = None

        new_state_vector = transition_model.function(
            prior, time_interval=time_interval, **kwargs)

        # Re-insert a zero row for every index removed from the state vector.
        # NOTE(review): inserting in ascending order only restores the
        # original layout if model_mapping is sorted ascending - confirm.
        for j in range(orig_ndim):
            if j not in model_mapping:
                new_state_vector = np.insert(new_state_vector, j, 0, axis=0)

        return new_state_vector
class RaoBlackwellisedMultiModelPredictor(MultiModelPredictor):
    """Rao-Blackwellised Multi Model Predictor class

    A multiple-model Particle Filter predictor in which the model
    probabilities are held per particle.
    """

    @predict_lru_cache()
    def predict(self, prior, timestamp=None, **kwargs):
        """Particle Filter prediction step

        Parameters
        ----------
        prior : :class:`~.RaoBlackwellisedParticleState`
            The prior state to propagate
        timestamp : :class:`datetime.datetime`, optional
            Time at which the prediction is made (the default is `None`)
        **kwargs
            Additional keyword arguments, forwarded to each transition
            model's ``function``

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state
        """
        prediction = Prediction.from_state(
            prior,
            state_vector=copy.copy(prior.state_vector),
            parent=prior,
            particle_list=None,
            timestamp=timestamp)

        # Sample a model for every particle: compare the cumulative model
        # probabilities (summed along axis 0) with one uniform draw per
        # particle and take the first index at which the draw is exceeded.
        cumulative = prior.model_probabilities.astype(float).cumsum(0)
        draws = np.random.rand(len(prior))
        sampled_models = np.argmax(cumulative > draws, axis=0)

        for model_index, transition_model in enumerate(self.transition_models):
            selected = sampled_models == model_index
            if not np.count_nonzero(selected):
                # No particle drew this model
                continue
            prediction.state_vector[:, selected] = self.apply_model(
                prior[selected],
                transition_model,
                timestamp,
                self.model_mappings[model_index],
                noise=True,
                **kwargs)

        return prediction