import copy
from functools import lru_cache
import numpy as np
from scipy.linalg import inv
from scipy.special import logsumexp
from .base import Updater
from .kalman import KalmanUpdater, ExtendedKalmanUpdater
from ..base import Property
from ..functions import cholesky_eps, sde_euler_maruyama_integration
from ..predictor.particle import MultiModelPredictor, RaoBlackwellisedMultiModelPredictor
from ..resampler import Resampler
from ..regulariser import Regulariser
from ..types.prediction import (
    Prediction, ParticleMeasurementPrediction, GaussianStatePrediction, MeasurementPrediction)
from ..types.update import ParticleStateUpdate, Update


class ParticleUpdater(Updater):
"""Particle Updater
Perform an update by multiplying particle weights by PDF of measurement
model (either :attr:`~.Detection.measurement_model` or
:attr:`measurement_model`), and normalising the weights. If provided, a
:attr:`resampler` will be used to take a new sample of particles (this is
called every time, and it's up to the resampler to decide if resampling is
required).
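
    The core of the weight update is log-space arithmetic along these lines
    (a minimal sketch, independent of any Stone Soup types; the values are
    assumed purely for illustration):

    >>> import numpy as np
    >>> from scipy.special import logsumexp
    >>> log_weight = np.full(4, np.log(1/4))    # uniform prior weights
    >>> log_lik = np.log([0.1, 0.4, 0.4, 0.1])  # per-particle measurement pdf
    >>> log_weight = log_weight + log_lik
    >>> log_weight -= logsumexp(log_weight)     # normalise
    >>> np.exp(log_weight).round(2)
    array([0.1, 0.4, 0.4, 0.1])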
"""
resampler: Resampler = Property(default=None, doc='Resampler to prevent particle degeneracy')
regulariser: Regulariser = Property(default=None, doc="Regulariser to prevent particle "
"impoverishment")

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.ParticleState`
            The state posterior
        """
        predicted_state = Update.from_state(
            state=hypothesis.prediction,
            hypothesis=hypothesis,
            timestamp=hypothesis.prediction.timestamp
        )

        if hypothesis.measurement.measurement_model is None:
            measurement_model = self.measurement_model
        else:
            measurement_model = hypothesis.measurement.measurement_model

        new_weight = predicted_state.log_weight + measurement_model.logpdf(
            hypothesis.measurement, predicted_state, **kwargs)

        # Normalise the weights
        new_weight -= logsumexp(new_weight)
        predicted_state.log_weight = new_weight

        # Resample
        if self.resampler is not None:
            predicted_state = self.resampler.resample(predicted_state)

        if self.regulariser is not None:
            prior = hypothesis.prediction.parent
            predicted_state = self.regulariser.regularise(prior,
                                                          predicted_state)

        return predicted_state

    @lru_cache()
    def predict_measurement(self, state_prediction, measurement_model=None,
                            **kwargs):
        """Predict the measurement by mapping the particle state prediction
        through the measurement model"""
        if measurement_model is None:
            measurement_model = self.measurement_model

        new_state_vector = measurement_model.function(state_prediction, **kwargs)

        return MeasurementPrediction.from_state(
            state_prediction, state_vector=new_state_vector,
            timestamp=state_prediction.timestamp)


class GromovFlowParticleUpdater(Updater):
"""Gromov Flow Particle Updater
This is implementation of Gromov method for stochastic particle flow
filters [#]_. The Euler Maruyama method is used for integration, over 20
steps using an exponentially increase step size.
Parameters
----------
References
----------
.. [#] Daum, Fred & Huang, Jim & Noushin, Arjang. "Generalized Gromov
method for stochastic particle flow filters." 2017
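
    The integration grid used by :meth:`update` can be reproduced on its own
    (a sketch using the same constants as the implementation; the step sizes
    form a geometric series that sums to one):

    >>> import numpy as np
    >>> num_steps, b = 20, 2
    >>> s0 = (b - 1) / (b**num_steps - 1)
    >>> steps = [s0 * b**n for n in range(num_steps)]
    >>> bool(np.isclose(sum(steps), 1.0))
    True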
"""

    def update(self, hypothesis, **kwargs):
        if hypothesis.measurement.measurement_model is None:
            measurement_model = self.measurement_model
        else:
            measurement_model = hypothesis.measurement.measurement_model

        # Exponentially increasing integration step sizes, summing to 1
        num_steps = 20
        b = 2
        s0 = (b-1) / (b**num_steps - 1)
        steps = [s0*b**n for n in range(num_steps)]
        time_steps = np.zeros((len(steps) + 1, ))
        time_steps[1:] = np.cumsum(steps)

        P = hypothesis.prediction.covar
        R = measurement_model.covar()
        inv_R = inv(R)

        # Start by making our own copy of the particles before we move them...
        particles = [copy.copy(particle) for particle in hypothesis.prediction.particles]

        def function(state, lambda_):
            # Drift f and diffusion B for the Euler-Maruyama integration
            try:
                H = measurement_model.matrix()
            except AttributeError:
                H = measurement_model.jacobian(state)

            # Eq. (12) Ref [1]
            a = P - lambda_*P@H.T@inv(R + lambda_*H@P@H.T)@H@P
            b = a @ H.T @ inv_R
            measurement_particle_state_vector = measurement_model.function(state, **kwargs)
            f = -b @ (measurement_particle_state_vector - hypothesis.measurement.state_vector)
            Q = b @ H @ a
            B = cholesky_eps((Q+Q.T)/2)
            return f, B

        for particle in particles:
            particle.state_vector = sde_euler_maruyama_integration(function, time_steps, particle)

        return ParticleStateUpdate(
            None,
            hypothesis,
            particle_list=particles,
            timestamp=hypothesis.measurement.timestamp)

    predict_measurement = ParticleUpdater.predict_measurement


class GromovFlowKalmanParticleUpdater(GromovFlowParticleUpdater):
"""Gromov Flow Parallel Kalman Particle Updater
This is a wrapper around the :class:`~.GromovFlowParticleUpdater` which
can use a :class:`~.ExtendedKalmanUpdater` or
:class:`~.UnscentedKalmanUpdater` in parallel in order to maintain a state
covariance, as proposed in [#]_. In this implementation, the mean of the
:class:`~.ParticleState` is used the EKF/UKF update.
This should be used in conjunction with the
:class:`~.ParticleFlowKalmanPredictor`.
Parameters
----------
References
----------
.. [#] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
Exact-Flow Particle Filter" 2012
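
    A minimal construction sketch (the linear Gaussian measurement model
    here is an assumption for illustration; any measurement model would do):

    >>> import numpy as np
    >>> from stonesoup.models.measurement.linear import LinearGaussian
    >>> model = LinearGaussian(ndim_state=4, mapping=(0, 2),
    ...                        noise_covar=np.eye(2))
    >>> updater = GromovFlowKalmanParticleUpdater(measurement_model=model)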
"""

    kalman_updater: KalmanUpdater = Property(
        default=None,
        doc="Kalman updater to use. Default `None` where a new instance of "
            ":class:`~.ExtendedKalmanUpdater` will be created utilising the "
            "same measurement model.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.kalman_updater is None:
            self.kalman_updater = ExtendedKalmanUpdater(self.measurement_model)

    def update(self, hypothesis, **kwargs):
        particle_update = super().update(hypothesis, **kwargs)

        particle_pred = hypothesis.prediction

        kalman_hypothesis = copy.copy(hypothesis)
        # Convert to GaussianState
        kalman_hypothesis.prediction = GaussianStatePrediction(
            particle_pred.mean, particle_pred.covar, particle_pred.timestamp)
        # Needed for cross covar
        kalman_hypothesis.measurement_prediction = None
        kalman_update = self.kalman_updater.update(kalman_hypothesis, **kwargs)

        return ParticleStateUpdate(
            particle_update.state_vector,
            hypothesis,
            weight=particle_update.weight,
            fixed_covar=kalman_update.covar,
            timestamp=particle_update.timestamp)

    def predict_measurement(self, state_prediction, *args, **kwargs):
        particle_prediction = super().predict_measurement(
            state_prediction, *args, **kwargs)

        kalman_prediction = self.kalman_updater.predict_measurement(
            Prediction.from_state(
                state_prediction, state_prediction.state_vector, state_prediction.covar,
                target_type=GaussianStatePrediction),
            *args, **kwargs)

        return ParticleMeasurementPrediction(
            state_vector=particle_prediction.state_vector,
            weight=state_prediction.weight,
            fixed_covar=kalman_prediction.covar,
            timestamp=particle_prediction.timestamp)


class MultiModelParticleUpdater(ParticleUpdater):
"""Particle Updater for the Multi Model system"""

    predictor: MultiModelPredictor = Property(
        doc="Predictor which holds the transition matrix")

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.MultiModelParticleStateUpdate`
            The state posterior
        """
        if hypothesis.measurement.measurement_model is None:
            measurement_model = self.measurement_model
        else:
            measurement_model = hypothesis.measurement.measurement_model

        update = Update.from_state(
            hypothesis.prediction,
            hypothesis=hypothesis,
            timestamp=hypothesis.measurement.timestamp,
        )

        transition_matrix = np.asanyarray(self.predictor.transition_matrix)

        update.log_weight = update.log_weight \
            + measurement_model.logpdf(hypothesis.measurement, update, **kwargs) \
            + np.log(transition_matrix[update.parent.dynamic_model, update.dynamic_model])

        # Normalise the weights
        update.log_weight -= logsumexp(update.log_weight)

        if self.resampler:
            update = self.resampler.resample(update)
        return update


class RaoBlackwellisedParticleUpdater(MultiModelParticleUpdater):
"""Particle Updater for the Raoblackwellised scheme"""

    predictor: RaoBlackwellisedMultiModelPredictor = Property(
        doc="Predictor which holds the transition matrix, models and mappings")

    def update(self, hypothesis, **kwargs):
        """Particle Filter update step

        Parameters
        ----------
        hypothesis : :class:`~.Hypothesis`
            Hypothesis with predicted state and associated detection used for
            updating.

        Returns
        -------
        : :class:`~.RaoBlackwellisedParticleStateUpdate`
            The state posterior
        """
        if hypothesis.measurement.measurement_model is None:
            measurement_model = self.measurement_model
        else:
            measurement_model = hypothesis.measurement.measurement_model

        update = Update.from_state(
            hypothesis.prediction,
            log_weight=copy.copy(hypothesis.prediction.log_weight),
            hypothesis=hypothesis,
            timestamp=hypothesis.measurement.timestamp,
        )

        update.model_probabilities = self.calculate_model_probabilities(
            hypothesis.prediction, self.predictor)

        update.log_weight = update.log_weight + measurement_model.logpdf(
            hypothesis.measurement, update, **kwargs)

        # Normalise the weights
        update.log_weight -= logsumexp(update.log_weight)

        if self.resampler:
            update = self.resampler.resample(update)
        return update

    @staticmethod
    def calculate_model_probabilities(prediction, predictor):
        """Calculates the new model probabilities based
        on the ones calculated in the previous time step"""

        denominator_components = []
        # Loop over the previous models m_k-1
        for model_index, transition_model in enumerate(predictor.transition_models):
            required_space_prior = copy.copy(prediction.parent)
            required_space_prior.state_vector = \
                required_space_prior.state_vector[predictor.model_mappings[model_index], :]
            required_space_pred = copy.copy(prediction)
            required_space_pred.state_vector = \
                required_space_pred.state_vector[predictor.model_mappings[model_index], :]

            prob_position_given_model_and_old_position = transition_model.pdf(
                required_space_pred, required_space_prior,
                time_interval=prediction.timestamp - prediction.parent.timestamp
            )

            # Looks up p(m_k|m_k-1)
            log_prob_of_transition = np.log(
                np.asarray(predictor.transition_matrix, dtype=float)[model_index, :])

            log_product_of_probs = \
                np.asarray(np.log(prob_position_given_model_and_old_position), dtype=float) \
                + log_prob_of_transition[:, np.newaxis] \
                + np.asarray(np.log(prediction.model_probabilities), dtype=float)  # p(m_k-1|x_1:k-1)

            denominator_components.append(logsumexp(log_product_of_probs, axis=0))

        # Calculate the denominator
        new_log_probabilities = np.stack(denominator_components)
        new_log_probabilities -= logsumexp(new_log_probabilities, axis=0)

        return np.exp(new_log_probabilities)


class BernoulliParticleUpdater(ParticleUpdater):
"""Bernoulli Particle Filter Updater class
An implementation of a particle filter updater utilising the
Bernoulli filter formulation that estimates the spatial distribution
of a single target and estimates its existence, as described in [1]_.
Due to the nature of the Bernoulli particle
filter prediction process, resampling is required at every
time step to reduce the number of particles back down to the
desired size.
References
----------
.. [1] Ristic, Branko & Vo, Ba-Toung & Vo, Ba-Ngu & Farina, Alfonso, A
Tutorial on Bernoulli Filters: Theory, Implementation and Applications,
2013, IEEE Transactions on Signal Processing, 61(13), 3406-3430.
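
    The existence-probability update applied in :meth:`update` follows this
    pattern (a sketch of the arithmetic only; the numbers are assumed purely
    for illustration):

    >>> existence_probability, delta = 0.5, 0.2
    >>> round((1 - delta) / (1 - existence_probability * delta)
    ...       * existence_probability, 4)
    0.4444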
"""

    birth_probability: float = Property(
        default=0.01,
        doc="Probability of target birth.")

    survival_probability: float = Property(
        default=0.98,
        doc="Probability of target survival.")

    clutter_rate: int = Property(
        default=1,
        doc="Average number of clutter measurements per time step. Implementation assumes number "
            "of clutter measurements follows a Poisson distribution")

    clutter_distribution: float = Property(
        default=None,
        doc="Distribution used to describe clutter measurements. This is usually assumed uniform "
            "in the measurement space.")

    detection_probability: float = Property(
        default=None,
        doc="Probability of detection assigned to the generated samples of the birth distribution."
            " If None, it will inherit from the input.")

    nsurv_particles: float = Property(
        default=None,
        doc="Number of particles describing the surviving distribution, which will be output from "
            "the update algorithm.")

    def update(self, hypotheses, **kwargs):
        """Bernoulli Particle Filter update step

        Parameters
        ----------
        hypotheses : :class:`~.MultipleHypothesis`
            Hypotheses containing the sequence of single hypotheses
            that contain the prediction and unassociated measurements.

        Returns
        -------
        : :class:`~.BernoulliParticleStateUpdate`
            The state posterior.
        """
        # Copy the prediction into an Update object
        prediction = hypotheses.single_hypotheses[0].prediction
        updated_state = Update.from_state(
            state=prediction,
            hypothesis=hypotheses,
            timestamp=prediction.timestamp
        )

        if any(hypotheses):
            detections = [single_hypothesis.measurement
                          for single_hypothesis in hypotheses.single_hypotheses]

            # Evaluate measurement likelihood and approximate integrals
            log_meas_likelihood = []
            delta_part2 = []
            log_detection_probability = np.full(len(prediction),
                                                np.log(self.detection_probability))

            for detection in detections:
                measurement_model = detection.measurement_model or self.measurement_model
                log_meas_likelihood.append(measurement_model.logpdf(detection, updated_state))
                delta_part2.append(self._log_space_product(
                    log_detection_probability,
                    log_meas_likelihood[-1]
                    - np.log(self.clutter_rate * self.clutter_distribution)
                    + updated_state.log_weight))

            delta = \
                np.exp(self._log_space_product(log_detection_probability,
                                               updated_state.log_weight)) \
                - np.exp(logsumexp(delta_part2))

            updated_state.existence_probability = \
                (1 - delta) \
                / (1 - updated_state.existence_probability * delta) \
                * updated_state.existence_probability

            updated_state.log_weight = \
                np.logaddexp(log_detection_probability
                             + logsumexp(log_meas_likelihood, axis=0)
                             - np.log(self.clutter_rate * self.clutter_distribution),
                             np.log(1 - self.detection_probability)) \
                + updated_state.log_weight

            # Normalise weights
            updated_state.log_weight -= logsumexp(updated_state.log_weight)

        # Resampling
        if self.resampler is not None:
            updated_state = self.resampler.resample(updated_state,
                                                    self.nsurv_particles)

        if any(hypotheses):
            # Regularisation
            if self.regulariser is not None:
                updated_state = self.regulariser.regularise(updated_state.parent,
                                                            updated_state)

        return updated_state

    @staticmethod
    def _log_space_product(A, B):
        """Compute ``log(exp(A) @ exp(B))`` without leaving log space.

        The operands are stacked so that every pairwise sum
        ``A[i, k] + B[k, j]`` is formed, then reduced over ``k`` with
        ``logsumexp``. 1-D inputs are promoted to a row (``A``) or
        column (``B``) vector first.
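
        A quick numerical check (operand values assumed purely for
        illustration):

        >>> import numpy as np
        >>> A = np.log([[1., 2.], [3., 4.]])
        >>> B = np.log([[5., 6.], [7., 8.]])
        >>> res = BernoulliParticleUpdater._log_space_product(A, B)
        >>> bool(np.allclose(np.exp(res), [[19., 22.], [43., 50.]]))
        True
        """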
        if A.ndim < 2:
            A = np.atleast_2d(A)
        if B.ndim < 2:
            B = np.atleast_2d(B).T
        Astack = np.stack([A] * B.shape[1]).transpose(1, 0, 2)
        Bstack = np.stack([B] * A.shape[0]).transpose(0, 2, 1)
        return np.squeeze(logsumexp(Astack + Bstack, axis=2))