from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..types.prediction import Prediction
from ..types.state import GaussianState
[docs]class ParticlePredictor(Predictor):
"""ParticlePredictor class
An implementation of a Particle Filter predictor.
"""
[docs] @predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.ParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
# Compute time_interval
try:
time_interval = timestamp - prior.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
time_interval = None
new_state_vector = self.transition_model.function(
prior,
noise=True,
time_interval=time_interval,
**kwargs)
return Prediction.from_state(prior, state_vector=new_state_vector, weight=prior.weight,
timestamp=timestamp, particle_list=None,
transition_model=self.transition_model)
[docs]class ParticleFlowKalmanPredictor(ParticlePredictor):
"""Gromov Flow Parallel Kalman Particle Predictor
This is a wrapper around the :class:`~.GromovFlowParticlePredictor` which
can use a :class:`~.ExtendedKalmanPredictor` or
:class:`~.UnscentedKalmanPredictor` in parallel in order to maintain a
state covariance, as proposed in [1]_.
This should be used in conjunction with the
:class:`~.ParticleFlowKalmanUpdater`.
Parameters
----------
References
----------
.. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
Exact-Flow Particle Filter" 2012
"""
kalman_predictor: KalmanPredictor = Property(
default=None,
doc="Kalman predictor to use. Default `None` where a new instance of"
":class:`~.ExtendedKalmanPredictor` will be created utilising the"
"same transition model.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.kalman_predictor is None:
self.kalman_predictor = ExtendedKalmanPredictor(
self.transition_model)
[docs] def predict(self, prior, *args, **kwargs):
particle_prediction = super().predict(prior, *args, **kwargs)
kalman_prediction = self.kalman_predictor.predict(
GaussianState(prior.state_vector, prior.covar, prior.timestamp),
*args, **kwargs)
return Prediction.from_state(prior, state_vector=particle_prediction.state_vector,
weight=particle_prediction.weight,
timestamp=particle_prediction.timestamp,
fixed_covar=kalman_prediction.covar, particle_list=None,
transition_model=self.transition_model)