import copy
from typing import Sequence
import numpy as np
from scipy.special import logsumexp
from ordered_set import OrderedSet
from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..models.transition import TransitionModel
from ..types.prediction import Prediction
from ..types.state import GaussianState
from ..sampler import Sampler
from ..types.array import StateVectors
[docs]
class ParticlePredictor(Predictor):
"""ParticlePredictor class
An implementation of a Particle Filter predictor.
"""
[docs]
@predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.ParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
# Compute time_interval
try:
time_interval = timestamp - prior.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
time_interval = None
new_state_vector = self.transition_model.function(
prior,
noise=True,
time_interval=time_interval,
**kwargs)
return Prediction.from_state(prior,
parent=prior,
state_vector=new_state_vector,
timestamp=timestamp,
transition_model=self.transition_model)
[docs]
class ParticleFlowKalmanPredictor(ParticlePredictor):
"""Gromov Flow Parallel Kalman Particle Predictor
This is a wrapper around the :class:`~.GromovFlowParticlePredictor` which
can use a :class:`~.ExtendedKalmanPredictor` or
:class:`~.UnscentedKalmanPredictor` in parallel in order to maintain a
state covariance, as proposed in [1]_.
This should be used in conjunction with the
:class:`~.ParticleFlowKalmanUpdater`.
Parameters
----------
References
----------
.. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
Exact-Flow Particle Filter" 2012
"""
kalman_predictor: KalmanPredictor = Property(
default=None,
doc="Kalman predictor to use. Default `None` where a new instance of"
":class:`~.ExtendedKalmanPredictor` will be created utilising the"
"same transition model.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.kalman_predictor is None:
self.kalman_predictor = ExtendedKalmanPredictor(
self.transition_model)
[docs]
def predict(self, prior, *args, **kwargs):
particle_prediction = super().predict(prior, *args, **kwargs)
kalman_prediction = self.kalman_predictor.predict(
GaussianState(prior.state_vector, prior.covar, prior.timestamp),
*args, **kwargs)
return Prediction.from_state(prior,
state_vector=particle_prediction.state_vector,
log_weight=particle_prediction.log_weight,
timestamp=particle_prediction.timestamp,
fixed_covar=kalman_prediction.covar,
transition_model=self.transition_model)
[docs]
class MultiModelPredictor(Predictor):
"""MultiModelPredictor class
An implementation of a Particle Filter predictor utilising multiple models.
"""
transition_model = None
transition_models: Sequence[TransitionModel] = Property(
doc="Transition models used to for particle transition, selected by model index on "
"particle. Models dimensions can be subset of the overall state space, by "
"using :attr:`model_mappings`."
)
transition_matrix: np.ndarray = Property(
doc="n-model by n-model transition matrix."
)
model_mappings: Sequence[Sequence[int]] = Property(
doc="Sequence of mappings associated with each transition model. This enables mapping "
"between model and state space, enabling use of models that may have different "
"dimensions (e.g. velocity or acceleration). Parts of the state that aren't mapped "
"are set to zero.")
@property
def probabilities(self):
return np.cumsum(self.transition_matrix, axis=1)
[docs]
@predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.MultiModelParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
new_particle_state = Prediction.from_state(
prior,
state_vector=copy.copy(prior.state_vector),
parent=prior,
dynamic_model=copy.copy(prior.dynamic_model),
timestamp=timestamp)
for model_index, transition_model in enumerate(self.transition_models):
# Change the value of the dynamic value randomly according to the defined
# transition matrix
current_model_indices = prior.dynamic_model == model_index
current_model_count = np.count_nonzero(current_model_indices)
if current_model_count == 0:
continue
new_dynamic_models = np.searchsorted(
self.probabilities[model_index],
np.random.random(size=current_model_count))
new_state_vector = self.apply_model(
prior[current_model_indices], transition_model, timestamp,
self.model_mappings[model_index], noise=True, **kwargs)
new_particle_state.state_vector[:, current_model_indices] = new_state_vector
new_particle_state.dynamic_model[current_model_indices] = new_dynamic_models
return new_particle_state
@staticmethod
def apply_model(prior, transition_model, timestamp, model_mapping, **kwargs):
# Based on given position mapping create a new state vector that contains only the
# required states
orig_ndim = prior.ndim
prior.state_vector = prior.state_vector[model_mapping, :]
new_state_vector = transition_model.function(
prior, time_interval=timestamp - prior.timestamp, **kwargs)
# Calculate the indices removed from the state vector to become compatible with the
# dynamic model
for j in range(orig_ndim):
if j not in model_mapping:
new_state_vector = np.insert(new_state_vector, j, 0, axis=0)
return new_state_vector
[docs]
class RaoBlackwellisedMultiModelPredictor(MultiModelPredictor):
"""Rao-Blackwellised Multi Model Predictor class
An implementation of a Particle Filter predictor utilising multiple models, with per
particle model probabilities.
"""
[docs]
@predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.RaoBlackwellisedParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
new_particle_state = Prediction.from_state(
prior,
state_vector=copy.copy(prior.state_vector),
parent=prior,
timestamp=timestamp)
# Change the value of the dynamic value randomly according to the
# matrix
new_dynamic_models = np.argmax(
prior.model_probabilities.astype(float).cumsum(0) > np.random.rand(len(prior)),
axis=0)
for model_index, transition_model in enumerate(self.transition_models):
new_model_indices = new_dynamic_models == model_index
if np.count_nonzero(new_model_indices) == 0:
continue
new_state_vector = self.apply_model(
prior[new_model_indices], transition_model, timestamp,
self.model_mappings[model_index], noise=True, **kwargs)
new_particle_state.state_vector[:, new_model_indices] = new_state_vector
return new_particle_state
[docs]
class BernoulliParticlePredictor(ParticlePredictor):
"""Bernoulli Particle Filter Predictor class
An implementation of a particle filter predictor utilising the Bernoulli
filter formulation that estimates the spatial distribution of a single
target and estimates its existence, as described in [2]_.
This should be used in conjunction with the
:class:`~.BernoulliParticleUpdater`.
References
----------
.. [2] Ristic, Branko & Vo, Ba-Toung & Vo, Ba-Ngu & Farina, Alfonso, A
Tutorial on Bernoulli Filters: Theory, Implementation and Applications,
2013, IEEE Transactions on Signal Processing, 61(13), 3406-3430.
"""
birth_probability: float = Property(
default=0.01,
doc="Probability of target birth.")
survival_probability: float = Property(
default=0.98,
doc="Probability of target survival")
birth_sampler: Sampler = Property(
default=None,
doc="Sampler object used for sampling birth particles. "
"Currently implementation assumes the :class:`~.DetectionSampler` is used")
[docs]
def predict(self, prior, timestamp=None, **kwargs):
"""Bernoulli Particle Filter prediction step
Parameters
----------
prior : :class:`~.BernoulliParticleState`
A prior state object
timestamp : :class:`~.datetime.datetime`
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state and existence"""
new_particle_state = copy.copy(prior)
nsurv_particles = new_particle_state.state_vector.shape[1]
# Calculate time interval
try:
time_interval = timestamp - new_particle_state.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
time_interval = None
# Sample from birth distribution
detections = self.get_detections(prior)
birth_state = self.birth_sampler.sample(detections)
birth_part = birth_state.state_vector
nbirth_particles = len(birth_state)
# Unite the surviving and birth particle sets in the prior
new_particle_state.state_vector = StateVectors(np.concatenate(
(new_particle_state.state_vector, birth_part), axis=1))
# Extend weights to match length of state_vector
new_log_weight_vector = np.concatenate(
(new_particle_state.log_weight, np.full(nbirth_particles, np.log(1/nbirth_particles))))
new_particle_state.log_weight = new_log_weight_vector - logsumexp(new_log_weight_vector)
untransitioned_state = Prediction.from_state(
new_particle_state,
parent=prior,
)
# Predict particles using the transition model
new_state_vector = self.transition_model.function(
new_particle_state,
noise=True,
time_interval=time_interval,
**kwargs)
# Estimate existence
predicted_existence = self.estimate_existence(
new_particle_state.existence_probability)
predicted_log_weights = self.predict_log_weights(
new_particle_state.existence_probability,
predicted_existence, nsurv_particles,
new_particle_state.log_weight)
# Create the prediction output
new_particle_state = Prediction.from_state(
new_particle_state,
state_vector=new_state_vector,
log_weight=predicted_log_weights,
existence_probability=predicted_existence,
parent=untransitioned_state,
timestamp=timestamp,
transition_model=self.transition_model
)
return new_particle_state
def estimate_existence(self, existence_prior):
existence_estimate = self.birth_probability * (1 - existence_prior) \
+ self.survival_probability * existence_prior
return existence_estimate
def predict_log_weights(self, prior_existence, predicted_existence, surv_part_size,
prior_log_weights):
# Weight prediction function currently assumes that the chosen importance density is the
# transition density. This will need to change if implementing a different importance
# density or incorporating visibility information
surv_weights = np.log((self.survival_probability*prior_existence)/predicted_existence) \
+ prior_log_weights[:surv_part_size]
birth_weights = np.log((self.birth_probability*(1-prior_existence))/predicted_existence) \
+ prior_log_weights[surv_part_size:]
predicted_log_weights = np.concatenate((surv_weights, birth_weights))
# Normalise weights
predicted_log_weights -= logsumexp(predicted_log_weights)
return predicted_log_weights
@staticmethod
def get_detections(prior):
detections = OrderedSet()
if hasattr(prior, 'hypothesis'):
if prior.hypothesis is not None:
for hypothesis in prior.hypothesis:
detections |= {hypothesis.measurement}
return detections