Source code for stonesoup.updater.particle

# -*- coding: utf-8 -*-
import copy
from functools import lru_cache

import numpy as np
from scipy.linalg import inv

from .base import Updater
from .kalman import KalmanUpdater, ExtendedKalmanUpdater
from ..base import Property
from ..functions import cholesky_eps, sde_euler_maruyama_integration
from ..resampler import Resampler
from ..types.numeric import Probability
from ..types.particle import Particle
from ..types.prediction import (
    Prediction, ParticleMeasurementPrediction, GaussianStatePrediction, MeasurementPrediction)
from ..types.update import ParticleStateUpdate, Update

[docs]class ParticleUpdater(Updater): """Particle Updater Perform an update by multiplying particle weights by PDF of measurement model (either :attr:`~.Detection.measurement_model` or :attr:`measurement_model`), and normalising the weights. If provided, a :attr:`resampler` will be used to take a new sample of particles (this is called every time, and it's up to the resampler to decide if resampling is required). """ resampler: Resampler = Property(default=None, doc='Resampler to prevent particle degeneracy')
[docs] def update(self, hypothesis, **kwargs): """Particle Filter update step Parameters ---------- hypothesis : :class:`~.Hypothesis` Hypothesis with predicted state and associated detection used for updating. Returns ------- : :class:`~.ParticleState` The state posterior """ particles = copy.copy(hypothesis.prediction.particles) if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model particles.weight = particles.weight * measurement_model.pdf( hypothesis.measurement, particles, num_samples=len(particles), **kwargs) # Normalise the weights sum_w = np.array(Probability.sum(particles.weight)) particles.weight = particles.weight / sum_w # Resample if self.resampler is not None: particles = self.resampler.resample(particles) return Update.from_state( hypothesis.prediction, particles=particles, hypothesis=hypothesis, timestamp=hypothesis.measurement.timestamp)
[docs] @lru_cache() def predict_measurement(self, state_prediction, measurement_model=None, **kwargs): if measurement_model is None: measurement_model = self.measurement_model new_particles = [] for particle in state_prediction.particles: new_state_vector = measurement_model.function(particle, **kwargs) new_particles.append( Particle(new_state_vector, weight=particle.weight, parent=particle.parent)) return MeasurementPrediction.from_state( state_prediction, particles=new_particles, timestamp=state_prediction.timestamp)
[docs]class GromovFlowParticleUpdater(Updater): """Gromov Flow Particle Updater This is implementation of Gromov method for stochastic particle flow filters [#]_. The Euler Maruyama method is used for integration, over 20 steps using an exponentially increase step size. Parameters ---------- References ---------- .. [#] Daum, Fred & Huang, Jim & Noushin, Arjang. "Generalized Gromov method for stochastic particle flow filters." 2017 """
[docs] def update(self, hypothesis, **kwargs): if hypothesis.measurement.measurement_model is None: measurement_model = self.measurement_model else: measurement_model = hypothesis.measurement.measurement_model num_steps = 20 b = 2 s0 = (b-1) / (b**num_steps - 1) steps = [s0*b**n for n in range(num_steps)] time_steps = np.zeros((len(steps) + 1, )) time_steps[1:] = np.cumsum(steps) P = hypothesis.prediction.covar R = measurement_model.covar() inv_R = inv(R) # Start by making our own copy of the particle before we move them... particles = [copy.copy(particle) for particle in hypothesis.prediction.particles] def function(state, lambda_): try: H = measurement_model.matrix() except AttributeError: H = measurement_model.jacobian(state) # Eq. (12) Ref [1] a = P - lambda_*P@H.T@inv(R + lambda_*H@P@H.T)@H@P b = a @ H.T @ inv_R measurement_particle_state_vector = measurement_model.function(state, **kwargs) f = -b @ (measurement_particle_state_vector - hypothesis.measurement.state_vector) Q = b @ H @ a B = cholesky_eps((Q+Q.T)/2) return f, B for particle in particles: particle.state_vector = sde_euler_maruyama_integration(function, time_steps, particle) return ParticleStateUpdate( particles, hypothesis, timestamp=hypothesis.measurement.timestamp)
predict_measurement = ParticleUpdater.predict_measurement
[docs]class GromovFlowKalmanParticleUpdater(GromovFlowParticleUpdater): """Gromov Flow Parallel Kalman Particle Updater This is a wrapper around the :class:`~.GromovFlowParticleUpdater` which can use a :class:`~.ExtendedKalmanUpdater` or :class:`~.UnscentedKalmanUpdater` in parallel in order to maintain a state covariance, as proposed in [#]_. In this implementation, the mean of the :class:`~.ParticleState` is used the EKF/UKF update. This should be used in conjunction with the :class:`~.ParticleFlowKalmanPredictor`. Parameters ---------- References ---------- .. [#] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang Exact-Flow Particle Filter" 2012 """ kalman_updater: KalmanUpdater = Property( default=None, doc="Kalman updater to use. Default `None` where a new instance of" ":class:`~.ExtendedKalmanUpdater` will be created utilising the" "same measurement model.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.kalman_updater is None: self.kalman_updater = ExtendedKalmanUpdater(self.measurement_model)
[docs] def update(self, hypothesis, **kwargs): particle_update = super().update(hypothesis, **kwargs) particle_pred = hypothesis.prediction kalman_hypothesis = copy.copy(hypothesis) # Convert to GaussianState kalman_hypothesis.prediction = GaussianStatePrediction( particle_pred.state_vector, particle_pred.covar, particle_pred.timestamp) # Needed for cross covar kalman_hypothesis.measurement_prediction = None kalman_update = self.kalman_updater.update(kalman_hypothesis, **kwargs) return ParticleStateUpdate( particle_update.particles, hypothesis, kalman_update.covar, timestamp=particle_update.timestamp)
[docs] def predict_measurement( self, state_prediction, *args, **kwargs): particle_prediction = super().predict_measurement( state_prediction, *args, **kwargs) kalman_prediction = self.kalman_updater.predict_measurement( Prediction.from_state( state_prediction, state_prediction.state_vector, state_prediction.covar, prediction_type=GaussianStatePrediction), *args, **kwargs) return ParticleMeasurementPrediction( particle_prediction.particles, kalman_prediction.covar, timestamp=particle_prediction.timestamp)