Source code for stonesoup.predictor.particle

# -*- coding: utf-8 -*-

from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..types.particle import Particles
from ..types.prediction import Prediction, ParticleStatePrediction
from ..types.state import GaussianState


class ParticlePredictor(Predictor):
    """Predictor for particle filters.

    Each call to :meth:`predict` passes the prior's particles through the
    predictor's transition model and wraps the result in a prediction.
    """

    @predict_lru_cache()
    def predict(self, prior, control_input=None, timestamp=None, **kwargs):
        """Carry out the particle filter prediction step.

        Parameters
        ----------
        prior : :class:`~.ParticleState`
            The prior state; its ``particles`` and ``timestamp`` are read.
        control_input : :class:`~.State`, optional
            The control input (the default is `None`). It is not referenced
            in the body of this method.
        timestamp : :class:`datetime.datetime`, optional
            The time the prediction is made for (the default is `None`).
        **kwargs
            Forwarded unchanged to the transition model's ``function``.

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state, built from ``prior`` with the propagated
            particles, the requested ``timestamp`` and the transition model.
        """
        # The interval is only defined when both timestamps are set;
        # subtracting with a `None` operand raises TypeError.
        try:
            interval = timestamp - prior.timestamp
        except TypeError:
            interval = None

        # Gather the transition call's keyword arguments in one place; the
        # transition model is invoked with noise=True and one sample per
        # prior particle.
        transition_kwargs = dict(
            noise=True,
            time_interval=interval,
            num_samples=len(prior.particles),
            **kwargs)
        propagated_vectors = self.transition_model.function(
            prior.particles, **transition_kwargs)

        # Weights and parents are taken over from the prior particles as-is.
        propagated = Particles(
            state_vector=propagated_vectors,
            weight=prior.particles.weight,
            parent=prior.particles.parent)

        return Prediction.from_state(
            prior,
            particles=propagated,
            timestamp=timestamp,
            transition_model=self.transition_model)
class ParticleFlowKalmanPredictor(ParticlePredictor):
    """Gromov Flow Parallel Kalman Particle Predictor

    This is a wrapper around the :class:`~.GromovFlowParticlePredictor` which
    can use a :class:`~.ExtendedKalmanPredictor` or
    :class:`~.UnscentedKalmanPredictor` in parallel in order to maintain a
    state covariance, as proposed in [1]_.

    This should be used in conjunction with the
    :class:`~.ParticleFlowKalmanUpdater`.

    Parameters
    ----------

    References
    ----------
    .. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
       Exact-Flow Particle Filter" 2012
    """

    # NOTE: adjacent string literals are concatenated verbatim, so each
    # fragment must carry its own trailing space.
    kalman_predictor: KalmanPredictor = Property(
        default=None,
        doc="Kalman predictor to use. Default `None` where a new instance of "
            ":class:`~.ExtendedKalmanPredictor` will be created utilising the "
            "same transition model.")

    def __init__(self, *args, **kwargs):
        """Initialise the predictor, creating a default Kalman predictor.

        All arguments are forwarded to the parent initialiser. If no
        ``kalman_predictor`` was supplied, an
        :class:`~.ExtendedKalmanPredictor` is built from this predictor's
        ``transition_model``.
        """
        super().__init__(*args, **kwargs)
        if self.kalman_predictor is None:
            self.kalman_predictor = ExtendedKalmanPredictor(
                self.transition_model)

    def predict(self, prior, *args, **kwargs):
        """Particle prediction with a covariance from a parallel Kalman step.

        Parameters
        ----------
        prior : :class:`~.ParticleState`
            The prior state. Its ``state_vector``, ``covar`` and
            ``timestamp`` are used to build the Gaussian state handed to
            the Kalman predictor.
        *args, **kwargs
            Forwarded unchanged to both the particle prediction and the
            Kalman prediction.

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            Prediction holding the particles from the particle prediction
            and the covariance from the Kalman prediction.
        """
        particle_prediction = super().predict(prior, *args, **kwargs)

        # Run the Kalman predictor on a Gaussian summary of the prior so a
        # covariance is propagated alongside the particles.
        kalman_prediction = self.kalman_predictor.predict(
            GaussianState(prior.state_vector, prior.covar, prior.timestamp),
            *args, **kwargs)

        # Attach the transition model, as the parent predictor does for its
        # own prediction, so this information is not lost in the result.
        return ParticleStatePrediction(
            particle_prediction.particles,
            kalman_prediction.covar,
            timestamp=particle_prediction.timestamp,
            transition_model=self.transition_model)