Source code for stonesoup.updater.particle

import copy
from functools import lru_cache
from typing import Callable
import warnings

import numpy as np
from scipy.linalg import inv
from scipy.special import logsumexp

from .base import Updater
from .kalman import KalmanUpdater, ExtendedKalmanUpdater
from ..base import Property
from ..functions import cholesky_eps, sde_euler_maruyama_integration
from ..predictor.particle import MultiModelPredictor, RaoBlackwellisedMultiModelPredictor
from ..resampler import Resampler
from ..regulariser import Regulariser
from ..types.numeric import Probability
from ..types.prediction import (
    Prediction, ParticleMeasurementPrediction, GaussianStatePrediction, MeasurementPrediction)
from ..types.update import ParticleStateUpdate, Update


[docs] class ParticleUpdater(Updater): """Particle Updater Perform an update by multiplying particle weights by PDF of measurement model (either :attr:`~.Detection.measurement_model` or :attr:`measurement_model`), and normalising the weights. If provided, a :attr:`resampler` will be used to take a new sample of particles (this is called every time, and it's up to the resampler to decide if resampling is required). """ resampler: Resampler = Property(default=None, doc='Resampler to prevent particle degeneracy') regulariser: Regulariser = Property( default=None, doc='Regulariser to prevent particle impoverishment. The regulariser ' 'is normally used after resampling. If a :class:`~.Resampler` is defined, ' 'then regularisation will only take place if the particles have been ' 'resampled. If the :class:`~.Resampler` is not defined but a ' ':class:`~.Regulariser` is, then regularisation will be conducted under the ' 'assumption that the user intends for this to occur.') constraint_func: Callable = Property( default=None, doc="Callable, user defined function for applying " "constraints to the states. This is done by setting the weights " "of particles to 0 for particles that are not correctly constrained. " "This function provides indices of the unconstrained particles and " "should accept a :class:`~.ParticleState` object and return an array-like " "object of logical indices. " ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.resampler is None and self.regulariser is not None: warnings.warn('`regulariser` has been defined but a `resampler` has not. This' ' is not normal procedure.')
[docs] def update(self, hypothesis, **kwargs): """Particle Filter update step Parameters ---------- hypothesis : :class:`~.Hypothesis` Hypothesis with predicted state and associated detection used for updating. Returns ------- : :class:`~.ParticleState` The state posterior """ predicted_state = Update.from_state( state=hypothesis.prediction, hypothesis=hypothesis, timestamp=hypothesis.prediction.timestamp ) if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model new_weight = predicted_state.log_weight + measurement_model.logpdf( hypothesis.measurement, predicted_state, **kwargs) # Apply constraints if defined if self.constraint_func is not None: part_indx = self.constraint_func(predicted_state) new_weight[part_indx] = -1*np.inf # Normalise the weights new_weight -= logsumexp(new_weight) predicted_state.log_weight = new_weight # Resample resample_flag = True if self.resampler is not None: resampled_state = self.resampler.resample(predicted_state) if resampled_state == predicted_state: resample_flag = False predicted_state = resampled_state if self.regulariser is not None and resample_flag: predicted_state = self.regulariser.regularise(predicted_state.parent, predicted_state) return predicted_state
[docs] @lru_cache() def predict_measurement(self, state_prediction, measurement_model=None, **kwargs): if measurement_model is None: measurement_model = self.measurement_model new_state_vector = measurement_model.function(state_prediction, **kwargs) return MeasurementPrediction.from_state( state_prediction, state_vector=new_state_vector, timestamp=state_prediction.timestamp)
[docs] class GromovFlowParticleUpdater(Updater): """Gromov Flow Particle Updater This is implementation of Gromov method for stochastic particle flow filters [#]_. The Euler Maruyama method is used for integration, over 20 steps using an exponentially increase step size. Parameters ---------- References ---------- .. [#] Daum, Fred & Huang, Jim & Noushin, Arjang. "Generalized Gromov method for stochastic particle flow filters." 2017 """
[docs] def update(self, hypothesis, **kwargs): if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model num_steps = 20 b = 2 s0 = (b-1) / (b**num_steps - 1) steps = [s0*b**n for n in range(num_steps)] time_steps = np.zeros((len(steps) + 1, )) time_steps[1:] = np.cumsum(steps) P = hypothesis.prediction.covar R = measurement_model.covar() inv_R = inv(R) # Start by making our own copy of the particle before we move them... particles = [copy.copy(particle) for particle in hypothesis.prediction.particles] def function(state, lambda_): try: H = measurement_model.matrix() except AttributeError: H = measurement_model.jacobian(state) # Eq. (12) Ref [1] a = P - lambda_*P@H.T@inv(R + lambda_*H@P@H.T)@H@P b = a @ H.T @ inv_R measurement_particle_state_vector = measurement_model.function(state, **kwargs) f = -b @ (measurement_particle_state_vector - hypothesis.measurement.state_vector) Q = b @ H @ a B = cholesky_eps((Q+Q.T)/2) return f, B for particle in particles: particle.state_vector = sde_euler_maruyama_integration(function, time_steps, particle) return ParticleStateUpdate( None, hypothesis, particle_list=particles, timestamp=hypothesis.measurement.timestamp)
predict_measurement = ParticleUpdater.predict_measurement
[docs] class GromovFlowKalmanParticleUpdater(GromovFlowParticleUpdater): """Gromov Flow Parallel Kalman Particle Updater This is a wrapper around the :class:`~.GromovFlowParticleUpdater` which can use a :class:`~.ExtendedKalmanUpdater` or :class:`~.UnscentedKalmanUpdater` in parallel in order to maintain a state covariance, as proposed in [#]_. In this implementation, the mean of the :class:`~.ParticleState` is used the EKF/UKF update. This should be used in conjunction with the :class:`~.ParticleFlowKalmanPredictor`. Parameters ---------- References ---------- .. [#] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang Exact-Flow Particle Filter" 2012 """ kalman_updater: KalmanUpdater = Property( default=None, doc="Kalman updater to use. Default `None` where a new instance of" ":class:`~.ExtendedKalmanUpdater` will be created utilising the" "same measurement model.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.kalman_updater is None: self.kalman_updater = ExtendedKalmanUpdater(self.measurement_model)
[docs] def update(self, hypothesis, **kwargs): particle_update = super().update(hypothesis, **kwargs) particle_pred = hypothesis.prediction kalman_hypothesis = copy.copy(hypothesis) # Convert to GaussianState kalman_hypothesis.prediction = GaussianStatePrediction( particle_pred.mean, particle_pred.covar, particle_pred.timestamp) # Needed for cross covar kalman_hypothesis.measurement_prediction = None kalman_update = self.kalman_updater.update(kalman_hypothesis, **kwargs) return ParticleStateUpdate( particle_update.state_vector, hypothesis, weight=particle_update.weight, fixed_covar=kalman_update.covar, timestamp=particle_update.timestamp)
[docs] def predict_measurement(self, state_prediction, *args, **kwargs): particle_prediction = super().predict_measurement( state_prediction, *args, **kwargs) kalman_prediction = self.kalman_updater.predict_measurement( Prediction.from_state( state_prediction, state_prediction.state_vector, state_prediction.covar, target_type=GaussianStatePrediction), *args, **kwargs) return ParticleMeasurementPrediction( state_vector=particle_prediction.state_vector, weight=state_prediction.weight, fixed_covar=kalman_prediction.covar, timestamp=particle_prediction.timestamp)
[docs] class MultiModelParticleUpdater(ParticleUpdater): """Particle Updater for the Multi Model system""" predictor: MultiModelPredictor = Property( doc="Predictor which hold holds transition matrix")
[docs] def update(self, hypothesis, **kwargs): """Particle Filter update step Parameters ---------- hypothesis : :class:`~.Hypothesis` Hypothesis with predicted state and associated detection used for updating. Returns ------- : :class:`~.MultiModelParticleStateUpdate` The state posterior """ if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model update = Update.from_state( hypothesis.prediction, hypothesis=hypothesis, timestamp=hypothesis.measurement.timestamp, ) transition_matrix = np.asanyarray(self.predictor.transition_matrix) update.log_weight = update.log_weight \ + measurement_model.logpdf(hypothesis.measurement, update, **kwargs) \ + np.log(transition_matrix[update.parent.dynamic_model, update.dynamic_model]) # Normalise the weights update.log_weight -= logsumexp(update.log_weight) if self.resampler: update = self.resampler.resample(update) return update
[docs] class RaoBlackwellisedParticleUpdater(MultiModelParticleUpdater): """Particle Updater for the Raoblackwellised scheme""" predictor: RaoBlackwellisedMultiModelPredictor = Property( doc="Predictor which hold holds transition matrix, models and mappings")
[docs] def update(self, hypothesis, **kwargs): """Particle Filter update step Parameters ---------- hypothesis : :class:`~.Hypothesis` Hypothesis with predicted state and associated detection used for updating. Returns ------- : :class:`~.RaoBlackwellisedParticleStateUpdate` The state posterior """ if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model update = Update.from_state( hypothesis.prediction, log_weight=copy.copy(hypothesis.prediction.log_weight), hypothesis=hypothesis, timestamp=hypothesis.measurement.timestamp, ) update.model_probabilities = self.calculate_model_probabilities( hypothesis.prediction, self.predictor) update.log_weight = update.log_weight + measurement_model.logpdf(hypothesis.measurement, update, **kwargs) # Normalise the weights update.log_weight -= logsumexp(update.log_weight) if self.resampler: update = self.resampler.resample(update) return update
[docs] @staticmethod def calculate_model_probabilities(prediction, predictor): """Calculates the new model probabilities based on the ones calculated in the previous time step""" denominator_components = [] # Loop over the previous models m_k-1 for model_index, transition_model in enumerate(predictor.transition_models): required_space_prior = copy.copy(prediction.parent) required_space_prior.state_vector = \ required_space_prior.state_vector[predictor.model_mappings[model_index], :] required_space_pred = copy.copy(prediction) required_space_pred.state_vector = \ required_space_pred.state_vector[predictor.model_mappings[model_index], :] prob_position_given_model_and_old_position = transition_model.pdf( required_space_pred, required_space_prior, time_interval=prediction.timestamp - prediction.parent.timestamp ) # Looks up p(m_k|m_k-1) log_prob_of_transition = np.log( np.asarray(predictor.transition_matrix)[model_index, :], dtype=np.float64) log_product_of_probs = \ np.asarray(np.log(prob_position_given_model_and_old_position), dtype=np.float64) \ + log_prob_of_transition[:, np.newaxis] \ + np.asarray(np.log(prediction.model_probabilities), dtype=np.float64) denominator_components.append(logsumexp(log_product_of_probs, axis=0)) # Calculate the denominator new_log_probabilities = np.stack(denominator_components) new_log_probabilities -= logsumexp(new_log_probabilities, axis=0) return np.exp(new_log_probabilities)
[docs] class BernoulliParticleUpdater(ParticleUpdater): """Bernoulli Particle Filter Updater class An implementation of a particle filter updater utilising the Bernoulli filter formulation that estimates the spatial distribution of a single target and estimates its existence, as described in [1]_. Due to the nature of the Bernoulli particle filter prediction process, resampling is required at every time step to reduce the number of particles back down to the desired size. References ---------- .. [1] Ristic, Branko & Vo, Ba-Toung & Vo, Ba-Ngu & Farina, Alfonso, A Tutorial on Bernoulli Filters: Theory, Implementation and Applications, 2013, IEEE Transactions on Signal Processing, 61(13), 3406-3430. """ birth_probability: float = Property( default=0.01, doc="Probability of target birth.") survival_probability: float = Property( default=0.98, doc="Probability of target survival") clutter_rate: int = Property( default=1, doc="Average number of clutter measurements per time step. Implementation assumes number " "of clutter measurements follows a Poisson distribution") clutter_distribution: float = Property( default=None, doc="Distribution used to describe clutter measurements. This is usually assumed uniform " "in the measurement space.") detection_probability: float = Property( default=None, doc="Probability of detection assigned to the generated samples of the birth distribution." " If None, it will inherit from the input.") nsurv_particles: float = Property( default=None, doc="Number of particles describing the surviving distribution, which will be output from " "the update algorithm.")
[docs] def update(self, hypotheses, **kwargs): """Bernoulli Particle Filter update step Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` Hypotheses containing the sequence of single hypotheses that contain the prediction and unassociated measurements. Returns ------- : :class:`~.BernoulliParticleStateUpdate` The state posterior. """ # copy prediction prediction = hypotheses.single_hypotheses[0].prediction # updated_state = copy.copy(prediction) updated_state = Update.from_state( state=prediction, hypothesis=hypotheses, timestamp=prediction.timestamp ) if any(hypotheses): detections = [single_hypothesis.measurement for single_hypothesis in hypotheses.single_hypotheses] # Evaluate measurement likelihood and approximate integrals log_meas_likelihood = [] delta_part2 = [] log_detection_probability = np.full(len(prediction), np.log(self.detection_probability)) for detection in detections: measurement_model = detection.measurement_model or self.measurement_model log_meas_likelihood.append(measurement_model.logpdf(detection, updated_state)) delta_part2.append(self._log_space_product( log_detection_probability, log_meas_likelihood[-1] - np.log(self.clutter_rate * self.clutter_distribution) + updated_state.log_weight)) delta = \ np.exp(self._log_space_product(log_detection_probability, updated_state.log_weight)) \ - np.exp(logsumexp(delta_part2)) updated_state.existence_probability = \ (1 - delta) \ / (1 - updated_state.existence_probability * delta) \ * updated_state.existence_probability updated_state.log_weight = \ np.logaddexp(log_detection_probability + logsumexp(log_meas_likelihood, axis=0) - np.log(self.clutter_rate * self.clutter_distribution), np.log(1 - self.detection_probability)) \ + updated_state.log_weight # Normalise weights updated_state.log_weight -= logsumexp(updated_state.log_weight) # Resampling if self.resampler is not None: updated_state = self.resampler.resample(updated_state, self.nsurv_particles) if any(hypotheses): # Regularisation if self.regulariser is not None: updated_state = self.regulariser.regularise(updated_state.parent, updated_state) return updated_state
@staticmethod def _log_space_product(A, B): if A.ndim < 2: A = np.atleast_2d(A) if B.ndim < 2: B = np.atleast_2d(B).T Astack = np.stack([A] * B.shape[1]).transpose(1, 0, 2) Bstack = np.stack([B] * A.shape[0]).transpose(0, 2, 1) return np.squeeze(logsumexp(Astack + Bstack, axis=2))
[docs] class SMCPHDUpdater(ParticleUpdater): """Sequential Monte Carlo Probability Hypothesis Density (SMC-PHD) Updater class An implementation of a particle updater that estimates only the first-order moment (i.e. the Probability Hypothesis Density) of the multi-target state density based on [#phd1]_ and [#phd2]_. .. note:: - It is assumed that the proposal distribution is the same as the dynamics - Target "spawning" is not implemented Parameters ---------- References ---------- .. [#phd1] Ba-Ngu Vo, S. Singh and A. Doucet, "Sequential monte carlo implementation of the phd filter for multi-target tracking," Sixth International Conference of Information Fusion, 2003. Proceedings of the, Cairns, QLD, Australia, 2003, pp. 792-799, doi: 10.1109/ICIF.2003.177320. .. [#phd2] P. Horridge and S. Maskell, “Using a probabilistic hypothesis density filter to confirm tracks in a multi-target environment,” in 2011 Jahrestagung der Gesellschaft fr Informatik, October 2011. """ prob_detect: Probability = Property( default=Probability(0.85), doc="Target Detection Probability") clutter_intensity: float = Property( doc="Average number of clutter measurements per time step, per unit volume") num_samples: int = Property( default=None, doc="The number of particles to be output by the updater, after resampling. If the " "corresponding predictor has been configured in ``'expansion'`` mode, users should set" "this to the number of particles they want to output, otherwise the number of " "particles will continuously grow. Default is ``None``, which will output the same " "number of particles as the input prediction.")
[docs] def update(self, hypotheses, **kwargs): """ SMC-PHD update step Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` A container of :class:`~SingleHypothesis` objects. All hypotheses are assumed to have the same prediction (and hence same timestamp). Returns ------- : :class:`~.ParticleStateUpdate` The state posterior """ prediction = hypotheses[0].prediction num_samples = len(prediction) if self.num_samples is None else self.num_samples # Calculate w^{n,i} Eq. (20) of [#phd2] log_weights_per_hyp = self.get_log_weights_per_hypothesis(hypotheses) # Update weights Eq. (8) of [phd1] # w_k^i = \sum_{z \in Z_k}{w^{n,i}}, where i is the index of z in Z_k log_post_weights = logsumexp(log_weights_per_hyp, axis=1) # Apply constraints if defined if self.constraint_func is not None: part_indx = self.constraint_func(prediction) log_post_weights[part_indx] = -1 * np.inf # Create the updated state updated_state = Update.from_state( state=prediction, hypothesis=hypotheses, timestamp=prediction.timestamp ) # Resample if self.resampler is not None: # Normalise weights log_num_targets = logsumexp(log_post_weights) # N_{k|k} updated_state.log_weight = log_post_weights - log_num_targets # Resample updated_state = self.resampler.resample(updated_state, num_samples) # De-normalise updated_state.log_weight += log_num_targets return updated_state
[docs] def get_log_weights_per_hypothesis(self, hypotheses): """Calculate the log particle weights per hypothesis Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` A container of :class:`~SingleHypothesis` objects. All hypotheses are assumed to have the same prediction (and hence same timestamp). Returns ------- : :class:`~numpy.ndarray` The log weights per hypothesis, where the first dimension is the number of particles and the second dimension is the number of hypotheses. The first hypothesis (column) is always the missed detection hypothesis. """ prediction = hypotheses[0].prediction detections = [hypothesis.measurement for hypothesis in hypotheses if hypothesis] num_samples = prediction.state_vector.shape[1] # Compute g(z|x) matrix as in [#phd1] g = self._get_measurement_loglikelihoods(prediction, detections) # Calculate w^{n,i} Eq. (20) of [#phd2] Ck = np.log(self.prob_detect) + g + prediction.log_weight[:, np.newaxis] C = logsumexp(Ck, axis=0) k = np.log(self.clutter_intensity) C_plus = np.logaddexp(C, k) log_weights_per_hyp = np.full((num_samples, len(detections) + 1), -np.inf) log_weights_per_hyp[:, 0] = np.log(1 - self.prob_detect) + prediction.log_weight if len(detections): log_weights_per_hyp[:, 1:] = Ck - C_plus return log_weights_per_hyp
def _get_measurement_loglikelihoods(self, prediction, detections): num_samples = prediction.state_vector.shape[1] # Compute g(z|x) matrix as in [1] g = np.zeros((num_samples, len(detections))) for i, detection in enumerate(detections): measurement_model = self._check_measurement_model(detection.measurement_model) g[:, i] = measurement_model.logpdf(detection, prediction, noise=True) return g