import copy
from functools import lru_cache
import numpy as np
from scipy.linalg import inv
from scipy.special import logsumexp
from .base import Updater
from .kalman import KalmanUpdater, ExtendedKalmanUpdater
from ..base import Property
from ..functions import cholesky_eps, sde_euler_maruyama_integration
from ..predictor.particle import MultiModelPredictor, RaoBlackwellisedMultiModelPredictor
from ..resampler import Resampler
from ..types.prediction import (
Prediction, ParticleMeasurementPrediction, GaussianStatePrediction, MeasurementPrediction)
from ..types.update import ParticleStateUpdate, Update
class ParticleUpdater(Updater):
    """Particle Updater

    Perform an update by multiplying particle weights by PDF of measurement
    model (either :attr:`~.Detection.measurement_model` or
    :attr:`measurement_model`), and normalising the weights. If provided, a
    :attr:`resampler` will be used to take a new sample of particles (this is
    called every time, and it's up to the resampler to decide if resampling is
    required).
    """
    resampler: Resampler = Property(default=None, doc='Resampler to prevent particle degeneracy')

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.ParticleState`
            The state posterior
        """
        # Shallow copy so reassigning log_weight doesn't touch the prediction
        posterior = copy.copy(hypothesis.prediction)

        detection = hypothesis.measurement
        measurement_model = detection.measurement_model
        if measurement_model is None:
            measurement_model = self.measurement_model

        # Bayes update in log space: prior log-weights plus measurement
        # log-likelihood, then normalise so weights sum to one.
        unnormalised = posterior.log_weight + measurement_model.logpdf(
            detection, posterior, **kwargs)
        posterior.log_weight = unnormalised - logsumexp(unnormalised)

        # Resampler decides internally whether resampling is actually needed
        if self.resampler is not None:
            posterior = self.resampler.resample(posterior)

        return Update.from_state(
            state=hypothesis.prediction,
            state_vector=posterior.state_vector,
            log_weight=posterior.log_weight,
            hypothesis=hypothesis,
            timestamp=detection.timestamp,
        )

    @lru_cache()
    def predict_measurement(self, state_prediction, measurement_model=None,
                            **kwargs):
        # Push each particle through the (possibly default) measurement model
        if measurement_model is None:
            measurement_model = self.measurement_model
        projected = measurement_model.function(state_prediction, **kwargs)
        return MeasurementPrediction.from_state(
            state_prediction, state_vector=projected, timestamp=state_prediction.timestamp)
class GromovFlowParticleUpdater(Updater):
    """Gromov Flow Particle Updater

    This is implementation of Gromov method for stochastic particle flow
    filters [#]_. The Euler Maruyama method is used for integration, over 20
    steps using an exponentially increase step size.

    Parameters
    ----------

    References
    ----------
    .. [#] Daum, Fred & Huang, Jim & Noushin, Arjang. "Generalized Gromov
           method for stochastic particle flow filters." 2017
    """

    def update(self, hypothesis, **kwargs):
        measurement_model = hypothesis.measurement.measurement_model
        if measurement_model is None:
            measurement_model = self.measurement_model

        # Exponentially increasing pseudo-time steps, summing to 1:
        # first step s0 chosen so that s0 * (ratio^n - 1)/(ratio - 1) == 1.
        num_steps = 20
        ratio = 2
        first_step = (ratio - 1) / (ratio**num_steps - 1)
        step_sizes = [first_step * ratio**k for k in range(num_steps)]
        time_steps = np.zeros((num_steps + 1, ))
        time_steps[1:] = np.cumsum(step_sizes)

        prior_covar = hypothesis.prediction.covar
        meas_covar = measurement_model.covar()
        meas_covar_inv = inv(meas_covar)

        # Work on copies so the prediction's particles are left untouched
        particles = [copy.copy(particle) for particle in hypothesis.prediction.particles]

        def flow_terms(state, lambda_):
            # Linear models expose a constant matrix; otherwise linearise
            try:
                jac = measurement_model.matrix()
            except AttributeError:
                jac = measurement_model.jacobian(state)

            # Eq. (12) Ref [1]
            a = prior_covar - lambda_*prior_covar@jac.T@inv(
                meas_covar + lambda_*jac@prior_covar@jac.T)@jac@prior_covar
            gain = a @ jac.T @ meas_covar_inv

            predicted_measurement = measurement_model.function(state, **kwargs)
            drift = -gain @ (predicted_measurement - hypothesis.measurement.state_vector)

            # Symmetrise before the Cholesky factorisation for numerical safety
            q_matrix = gain @ jac @ a
            diffusion = cholesky_eps((q_matrix + q_matrix.T)/2)
            return drift, diffusion

        for particle in particles:
            particle.state_vector = sde_euler_maruyama_integration(
                flow_terms, time_steps, particle)

        return ParticleStateUpdate(
            None,
            hypothesis,
            particle_list=particles,
            timestamp=hypothesis.measurement.timestamp)

    predict_measurement = ParticleUpdater.predict_measurement
class GromovFlowKalmanParticleUpdater(GromovFlowParticleUpdater):
    """Gromov Flow Parallel Kalman Particle Updater

    This is a wrapper around the :class:`~.GromovFlowParticleUpdater` which
    can use a :class:`~.ExtendedKalmanUpdater` or
    :class:`~.UnscentedKalmanUpdater` in parallel in order to maintain a state
    covariance, as proposed in [#]_. In this implementation, the mean of the
    :class:`~.ParticleState` is used the EKF/UKF update.

    This should be used in conjunction with the
    :class:`~.ParticleFlowKalmanPredictor`.

    Parameters
    ----------

    References
    ----------
    .. [#] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
           Exact-Flow Particle Filter" 2012
    """
    kalman_updater: KalmanUpdater = Property(
        default=None,
        doc="Kalman updater to use. Default `None` where a new instance of"
            ":class:`~.ExtendedKalmanUpdater` will be created utilising the"
            "same measurement model.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Fall back to an EKF sharing this updater's measurement model
        if self.kalman_updater is None:
            self.kalman_updater = ExtendedKalmanUpdater(self.measurement_model)

    def update(self, hypothesis, **kwargs):
        # Run the particle flow update first
        flow_update = super().update(hypothesis, **kwargs)

        # Build a Gaussian view of the prediction for the Kalman pass
        prediction = hypothesis.prediction
        gaussian_hypothesis = copy.copy(hypothesis)
        gaussian_hypothesis.prediction = GaussianStatePrediction(
            prediction.mean, prediction.covar, prediction.timestamp)
        # Needed for cross covar
        gaussian_hypothesis.measurement_prediction = None

        gaussian_update = self.kalman_updater.update(gaussian_hypothesis, **kwargs)

        # Particles from the flow, covariance from the Kalman update
        return ParticleStateUpdate(
            flow_update.state_vector,
            hypothesis,
            weight=flow_update.weight,
            fixed_covar=gaussian_update.covar,
            timestamp=flow_update.timestamp)

    def predict_measurement(self, state_prediction, *args, **kwargs):
        flow_measurement = super().predict_measurement(
            state_prediction, *args, **kwargs)
        gaussian_measurement = self.kalman_updater.predict_measurement(
            Prediction.from_state(
                state_prediction, state_prediction.state_vector,
                state_prediction.covar, target_type=GaussianStatePrediction),
            *args, **kwargs)
        return ParticleMeasurementPrediction(
            state_vector=flow_measurement.state_vector,
            weight=state_prediction.weight,
            fixed_covar=gaussian_measurement.covar,
            timestamp=flow_measurement.timestamp)
class MultiModelParticleUpdater(ParticleUpdater):
    """Particle Updater for the Multi Model system"""
    predictor: MultiModelPredictor = Property(
        doc="Predictor which hold holds transition matrix")

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.MultiModelParticleStateUpdate`
            The state posterior
        """
        detection = hypothesis.measurement
        measurement_model = detection.measurement_model
        if measurement_model is None:
            measurement_model = self.measurement_model

        posterior = Update.from_state(
            hypothesis.prediction,
            hypothesis=hypothesis,
            timestamp=detection.timestamp,
        )

        # Per-particle log-weight update: measurement likelihood plus the
        # probability of each particle's model switch (parent -> current)
        transition_matrix = np.asanyarray(self.predictor.transition_matrix)
        switch_log_prob = np.log(
            transition_matrix[posterior.parent.dynamic_model, posterior.dynamic_model])
        unnormalised = (posterior.log_weight
                        + measurement_model.logpdf(detection, posterior, **kwargs)
                        + switch_log_prob)

        # Normalise so the weights sum to one
        posterior.log_weight = unnormalised - logsumexp(unnormalised)

        if self.resampler:
            posterior = self.resampler.resample(posterior)
        return posterior
class RaoBlackwellisedParticleUpdater(MultiModelParticleUpdater):
    """Particle Updater for the Raoblackwellised scheme"""
    predictor: RaoBlackwellisedMultiModelPredictor = Property(
        doc="Predictor which hold holds transition matrix, models and mappings")

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.RaoBlackwellisedParticleStateUpdate`
            The state posterior
        """
        if hypothesis.measurement.measurement_model is None:
            measurement_model = self.measurement_model
        else:
            measurement_model = hypothesis.measurement.measurement_model

        # Copy the log weights so mutating the update cannot alias the prediction
        update = Update.from_state(
            hypothesis.prediction,
            log_weight=copy.copy(hypothesis.prediction.log_weight),
            hypothesis=hypothesis,
            timestamp=hypothesis.measurement.timestamp,
        )

        # Refresh each particle's per-model probabilities before reweighting
        update.model_probabilities = self.calculate_model_probabilities(
            hypothesis.prediction, self.predictor)

        update.log_weight = update.log_weight + measurement_model.logpdf(
            hypothesis.measurement, update, **kwargs)

        # Normalise the weights
        update.log_weight -= logsumexp(update.log_weight)

        if self.resampler:
            update = self.resampler.resample(update)
        return update

    @staticmethod
    def calculate_model_probabilities(prediction, predictor):
        """Calculates the new model probabilities based
        on the ones calculated in the previous time step

        Parameters
        ----------
        prediction : :class:`~.Prediction`
            Prediction carrying ``parent`` (prior state), timestamps and the
            previous ``model_probabilities``.
        predictor : :class:`~.RaoBlackwellisedMultiModelPredictor`
            Supplies ``transition_models``, ``model_mappings`` and the
            ``transition_matrix``.

        Returns
        -------
        : :class:`numpy.ndarray`
            Normalised model probabilities, one row per model.
        """
        denominator_components = []
        # Hoisted: the same interval applies to every model
        time_interval = prediction.timestamp - prediction.parent.timestamp
        # Loop over the previous models m_k-1
        for model_index, transition_model in enumerate(predictor.transition_models):
            # Restrict prior and prediction to the state dimensions this
            # model operates on
            mapping = predictor.model_mappings[model_index]
            required_space_prior = copy.copy(prediction.parent)
            required_space_prior.state_vector = \
                required_space_prior.state_vector[mapping, :]
            required_space_pred = copy.copy(prediction)
            required_space_pred.state_vector = \
                required_space_pred.state_vector[mapping, :]

            prob_position_given_model_and_old_position = transition_model.pdf(
                required_space_pred, required_space_prior,
                time_interval=time_interval
            )

            # Looks up p(m_k|m_k-1). NOTE: np.asfarray was deprecated and
            # removed in NumPy 2.0; np.asarray with an explicit float dtype
            # is the exact equivalent.
            log_prob_of_transition = np.log(
                np.asarray(predictor.transition_matrix, dtype=np.float64)[model_index, :])

            log_product_of_probs = \
                np.asarray(np.log(prob_position_given_model_and_old_position),
                           dtype=np.float64) \
                + log_prob_of_transition[:, np.newaxis] \
                + np.asarray(np.log(prediction.model_probabilities),
                             dtype=np.float64)  # p(m_k-1|x_1:k-1)
            denominator_components.append(logsumexp(log_product_of_probs, axis=0))

        # Normalise across models in log space, then return probabilities
        new_log_probabilities = np.stack(denominator_components)
        new_log_probabilities -= logsumexp(new_log_probabilities, axis=0)
        return np.exp(new_log_probabilities)