Source code for stonesoup.resampler.particle

import numpy as np
from enum import Enum

from .base import Resampler
from ..base import Property
from ..types.state import ParticleState


[docs] class SystematicResampler(Resampler): """ Traditional style resampler for particle filter. Calculates first random point in (0, 1/nparts], then calculates `nparts` points that are equidistantly distributed across the CDF. Complexity of order O(N) where N is the number of resampled particles. """
[docs] def resample(self, particles, nparts=None): """ Resample the particles Parameters ---------- particles : :class:`~.ParticleState` or list of :class:`~.Particle` The particles or particle state to be resampled according to their weights nparts : int The number of particles to be returned from resampling Returns ------- particle state: :class:`~.ParticleState` The particle state after resampling """ if not isinstance(particles, ParticleState): particles = ParticleState(None, particle_list=particles) if nparts is None: nparts = len(particles) log_weights = particles.log_weight weight_order = np.argsort(log_weights, kind='stable') max_log_value = log_weights[weight_order[-1]] with np.errstate(divide='ignore'): cdf = np.log(np.cumsum(np.exp(log_weights[weight_order] - max_log_value))) cdf += max_log_value # Pick random starting point u_i = np.random.uniform(0, 1 / nparts) # Cycle through the cumulative distribution and copy the particle # that pushed the score over the current value u_j = u_i + (1 / nparts) * np.arange(nparts) index = weight_order[np.searchsorted(cdf, np.log(u_j))] new_particles = particles[index] new_particles.log_weight = np.full((nparts, ), np.log(1/nparts)) return new_particles
[docs] class ESSResampler(Resampler): """ This wrapper uses a :class:`~.Resampler` to resample the particles inside an instance of :class:`~.Particles`, but only after checking if this is necessary by comparing the Effective Sample Size (ESS) with a supplied threshold (numeric). Kish's ESS is used, as recommended in Section 3.5 of this tutorial [1]_. References ---------- .. [1] Doucet A., Johansen A.M., 2009, Tutorial on Particle Filtering \ and Smoothing: Fifteen years later, Handbook of Nonlinear Filtering, Vol. 12. """ threshold: float = Property(default=None, doc='Threshold compared with ESS to decide whether to resample. \ Default is number of particles divided by 2, \ set in resample method') resampler: Resampler = Property(default=SystematicResampler(), doc='Resampler to wrap, which is called \ when ESS below threshold')
[docs] def resample(self, particles, nparts=None): """ Parameters ---------- particles : :class:`~.ParticleState` or list of :class:`~.Particle` The particles to be resampled according to their weight nparts : int The number of particles to be returned from resampling Returns ------- particles : :class:`~.ParticleState` The particles, either unchanged or resampled, depending on weight degeneracy """ if not isinstance(particles, ParticleState): particles = ParticleState(None, particle_list=particles) if nparts is None: nparts = len(particles) if self.threshold is None: self.threshold = len(particles) / 2 # If ESS too small, resample if 1 / np.sum(np.exp(2*particles.log_weight)) < self.threshold: return self.resampler.resample(particles, nparts) else: return particles
[docs] class MultinomialResampler(Resampler): """ Traditional style resampler for particle filter. Calculates a random point in (0, 1] individually for each particle, and picks the corresponding particle from the CDF calculated from particle weights. Complexity is of order O(NM) where N and M are the number of resampled and existing particles respectively. """
[docs] def resample(self, particles, nparts=None): """Resample the particles Parameters ---------- particles : :class:`~.ParticleState` or list of :class:`~.Particle` The particles or particle state to be resampled according to their weights nparts : int The number of particles to be returned from resampling Returns ------- particle state: :class:`~.ParticleState` The particle state after resampling """ if not isinstance(particles, ParticleState): particles = ParticleState(None, particle_list=particles) if nparts is None: nparts = len(particles) log_weights = particles.log_weight weight_order = np.argsort(log_weights, kind='stable') max_log_value = log_weights[weight_order[-1]] with np.errstate(divide='ignore'): cdf = np.log(np.cumsum(np.exp(log_weights[weight_order] - max_log_value))) cdf += max_log_value # Pick random points for each of the particles u_j = np.random.rand(nparts) # Pick particles that represent the chosen point from the CDF index = weight_order[np.searchsorted(cdf, np.log(u_j))] new_particles = particles[index] new_particles.log_weight = np.full((nparts, ), np.log(1/nparts)) return new_particles
[docs] class StratifiedResampler(Resampler): """ Traditional style resampler for particle filter. Splits the CDF into N evenly sized subpopulations ('strata'), then independently picks one value from each stratum. Complexity of order O(N). """
[docs] def resample(self, particles, nparts=None): """Resample the particles Parameters ---------- particles : :class:`~.ParticleState` or list of :class:`~.Particle` The particles or particle state to be resampled according to their weights nparts : int The number of particles to be returned from resampling Returns ------- particle state: :class:`~.ParticleState` The particle state after resampling """ if not isinstance(particles, ParticleState): particles = ParticleState(None, particle_list=particles) if nparts is None: nparts = len(particles) log_weights = particles.log_weight weight_order = np.argsort(log_weights, kind='stable') max_log_value = log_weights[weight_order[-1]] with np.errstate(divide='ignore'): cdf = np.log(np.cumsum(np.exp(log_weights[weight_order] - max_log_value))) cdf += max_log_value # Strata lower bounds: s_l = np.arange(nparts) * (1 / nparts) # Independently pick a point in each stratum u_j = np.random.uniform(s_l, s_l + (1 / nparts)) # Pick particles that represent the chosen point from the CDF index = weight_order[np.searchsorted(cdf, np.log(u_j))] new_particles = particles[index] new_particles.log_weight = np.full((nparts, ), np.log(1/nparts)) return new_particles
[docs] class ResidualMethod(Enum): MULTINOMIAL = 'multinomial' SYSTEMATIC = 'systematic' STRATIFIED = 'stratified'
[docs] class ResidualResampler(Resampler): """ Wrapper around a traditional resampler. Any particle, p with weight W >= 1/N, will be resampled floor(W_p) times, providing N_stage_1 = sum(floor(W_p)) resampled particles. The residual weights of each particle are carried over and passed into another resampler, where the remaining N - N_stage_1 particles are resampled from. Should be a more computationally efficient method than resampling all particles from a CDF. Cannot be used to upsample or downsample. """ residual_method: ResidualMethod = Property(default=ResidualMethod.MULTINOMIAL, doc="Method used to resample particles from the " "residuals.")
[docs] def resample(self, particles, nparts=None): """Resample the particles. For ResidualResampler, `nparts` must equal len(`particles`) Parameters ---------- particles : :class:`~.ParticleState` or list of :class:`~.Particle` The particles or particle state to be resampled according to their weights nparts : int The number of particles to be returned from resampling - must equal number of particles from previous step Returns ------- particle state: :class:`~.ParticleState` The particle state after resampling """ if not isinstance(particles, ParticleState): particles = ParticleState(None, particle_list=particles) if nparts is None: nparts = len(particles) elif nparts != len(particles): raise NotImplementedError("This resampler does not currently support up- or down-" "sampling") log_weights = particles.log_weight # Get the true weight of each particle weights = np.exp(log_weights) # **** Stage 1 **** # Calculate the floor floors = np.floor(weights * nparts) # Generate particle index from stage 1 resampling # There might be a better way to do this, seems a complicated way to do a simple task stage_1_index = np.array([index for sublist in [[index]*int(floor) for index, floor in enumerate(floors) if int(floor) != 0] for index in sublist]) # **** Stage 2 **** # Calculate number of particles to be resampled from residuals n_stage_2_parts = nparts - int(sum(floors)) # Check stage 2 is necessary (Necessary in all cases except where all weights = 1/N) if n_stage_2_parts > 0: # Calculate the residuals r_weights = weights - floors/nparts # Normalise residual weights normalised_r_weights = r_weights * 1/sum(r_weights) r_log_weights = np.log(normalised_r_weights) weight_order = np.argsort(r_log_weights, kind='stable') max_log_value = r_log_weights[weight_order[-1]] with np.errstate(divide='ignore'): cdf = np.log(np.cumsum(np.exp(r_log_weights[weight_order] - max_log_value))) cdf += max_log_value if self.residual_method == ResidualMethod.MULTINOMIAL: # Pick random points for each of the particles u_j = np.random.rand(n_stage_2_parts) elif self.residual_method == ResidualMethod.SYSTEMATIC: # Pick random starting point u_i = np.random.uniform(0, 1 / n_stage_2_parts) # Cycle through the cumulative distribution and copy the particle # that pushed the score over the current value u_j = u_i + (1 / n_stage_2_parts) * np.arange(n_stage_2_parts) elif self.residual_method == ResidualMethod.STRATIFIED: # Strata lower bounds: s_l = np.arange(n_stage_2_parts) * (1 / n_stage_2_parts) # Independently pick a point in each stratum u_j = np.random.uniform(s_l, s_l + (1 / n_stage_2_parts)) else: raise ValueError("Invalid string variable given for stage 2 residual_method") # Pick particles that represent the chosen point from the CDF stage_2_index = weight_order[np.searchsorted(cdf, np.log(u_j))] # Combine the indexes from both stages index = np.concatenate([stage_1_index, stage_2_index]) else: index = stage_1_index new_particles = particles[index] new_particles.log_weight = np.full((nparts, ), np.log(1/nparts)) return new_particles