Source code for stonesoup.predictor.particle

from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..types.prediction import Prediction
from ..types.state import GaussianState

class ParticlePredictor(Predictor):
    """ParticlePredictor class

    Predictor for a Particle Filter: the prior is pushed through the
    transition model with noise enabled.
    """

    @predict_lru_cache()
    def predict(self, prior, timestamp=None, **kwargs):
        """Particle Filter prediction step

        Parameters
        ----------
        prior : :class:`~.ParticleState`
            A prior state object
        timestamp: :class:`datetime.datetime`, optional
            The time the prediction is made for (the default is `None`)
        **kwargs
            Forwarded unchanged to the transition model's ``function``.

        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state, built from the prior with the propagated
            state vector and the prior's weights.
        """
        # The subtraction raises TypeError when either timestamp is None;
        # in that case the transition model is called without an interval.
        try:
            elapsed = timestamp - prior.timestamp
        except TypeError:
            elapsed = None

        propagate = self.transition_model.function
        propagated = propagate(
            prior, noise=True, time_interval=elapsed, **kwargs)

        return Prediction.from_state(
            prior,
            state_vector=propagated,
            weight=prior.weight,
            timestamp=timestamp,
            particle_list=None,
            transition_model=self.transition_model,
        )
class ParticleFlowKalmanPredictor(ParticlePredictor):
    """Gromov Flow Parallel Kalman Particle Predictor

    This extends :class:`~.ParticlePredictor` by running a Kalman predictor
    (e.g. :class:`~.ExtendedKalmanPredictor` or
    :class:`~.UnscentedKalmanPredictor`) in parallel in order to maintain a
    state covariance, as proposed in [1]_.

    This should be used in conjunction with the
    :class:`~.ParticleFlowKalmanUpdater`.

    Parameters
    ----------

    References
    ----------
    .. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
       Exact-Flow Particle Filter" 2012
    """

    # Each fragment ends with a space: adjacent string literals are joined
    # verbatim, so without it the words (and the :class: role) run together.
    kalman_predictor: KalmanPredictor = Property(
        default=None,
        doc="Kalman predictor to use. Default `None` where a new instance of "
            ":class:`~.ExtendedKalmanPredictor` will be created utilising the "
            "same transition model.")

    def __init__(self, *args, **kwargs):
        """Initialise the predictor, creating a default Kalman predictor.

        All arguments are forwarded to the parent initialiser. If no
        ``kalman_predictor`` was supplied, an
        :class:`~.ExtendedKalmanPredictor` is created with this predictor's
        transition model.
        """
        super().__init__(*args, **kwargs)
        if self.kalman_predictor is None:
            self.kalman_predictor = ExtendedKalmanPredictor(
                self.transition_model)

    def predict(self, prior, *args, **kwargs):
        """Particle prediction step with a parallel Kalman covariance.

        Parameters
        ----------
        prior
            The prior state. Its ``state_vector``, ``covar`` and
            ``timestamp`` are read to build the Gaussian state handed to
            the Kalman predictor.
        *args, **kwargs
            Forwarded unchanged to both the particle prediction and the
            Kalman prediction.

        Returns
        -------
        :
            A prediction built from the prior, taking state vector, weight
            and timestamp from the particle prediction and ``fixed_covar``
            from the Kalman prediction's covariance.
        """
        # Particle propagation is delegated to the parent predictor.
        particle_prediction = super().predict(prior, *args, **kwargs)

        # The Kalman predictor runs on a Gaussian view of the same prior;
        # only its predicted covariance is used below.
        kalman_prediction = self.kalman_predictor.predict(
            GaussianState(prior.state_vector, prior.covar, prior.timestamp),
            *args, **kwargs)

        return Prediction.from_state(
            prior,
            state_vector=particle_prediction.state_vector,
            weight=particle_prediction.weight,
            timestamp=particle_prediction.timestamp,
            fixed_covar=kalman_prediction.covar,
            particle_list=None,
            transition_model=self.transition_model)