import copy
from collections.abc import Sequence
from enum import Enum
import numpy as np
from scipy.special import logsumexp
from ordered_set import OrderedSet
from .base import Predictor
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor, ExtendedKalmanPredictor
from ..base import Property
from ..models.transition import TransitionModel
from ..proposal.base import Proposal
from ..proposal.simple import PriorAsProposal
from ..sampler.particle import ParticleSampler
from ..types.numeric import Probability
from ..types.prediction import Prediction
from ..types.state import GaussianState
from ..sampler import Sampler
from ..types.array import StateVectors
class ParticlePredictor(Predictor):
"""ParticlePredictor class
An implementation of a Particle Filter predictor.
"""
proposal: Proposal = Property(
default=None,
doc="A proposal object that generates samples from the proposal distribution. If `None`,"
"the transition model is used to generate samples.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.proposal is None:
self.proposal = PriorAsProposal(self.transition_model)
@predict_lru_cache()
def predict(self, prior, timestamp=None, measurement=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.ParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
        measurement: :class:`~.Detection`, optional
            Measurement used in a Kalman Filter proposal to update
            the prediction
            (the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
# Compute time_interval
try:
time_interval = timestamp - prior.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
time_interval = None
return self.proposal.rvs(prior,
noise=True,
time_interval=time_interval,
measurement=measurement,
**kwargs)
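# Illustrative sketch (not part of the library API): when ``proposal`` is left
# as `None`, prediction reduces to drawing each particle from the transition
# density, which a plain numpy random walk can stand in for here. The noise
# scale ``q`` and all shapes below are assumptions for demonstration only.
def _demo_prior_as_proposal_prediction():
    rng = np.random.default_rng(seed=1)
    particles = rng.normal(0.0, 1.0, size=(1, 100))  # prior particle cloud
    q, time_interval = 0.5, 1.0                      # assumed noise scale and step
    # Propagate every particle through a random-walk "transition model"
    return particles + rng.normal(0.0, q * np.sqrt(time_interval),
                                  size=particles.shape)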
class ParticleFlowKalmanPredictor(ParticlePredictor):
"""Gromov Flow Parallel Kalman Particle Predictor
    This is a wrapper around the :class:`~.GromovFlowParticlePredictor` which
    can use an :class:`~.ExtendedKalmanPredictor` or
    :class:`~.UnscentedKalmanPredictor` in parallel in order to maintain a
    state covariance, as proposed in [1]_.
This should be used in conjunction with the
:class:`~.ParticleFlowKalmanUpdater`.
Parameters
----------
References
----------
    .. [1] Ding, Tao & Coates, Mark J., "Implementation of the Daum-Huang
       Exact-Flow Particle Filter", 2012
"""
kalman_predictor: KalmanPredictor = Property(
default=None,
doc="Kalman predictor to use. Default `None` where a new instance of"
":class:`~.ExtendedKalmanPredictor` will be created utilising the"
"same transition model.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.kalman_predictor is None:
self.kalman_predictor = ExtendedKalmanPredictor(
self.transition_model)
def predict(self, prior, *args, **kwargs):
particle_prediction = super().predict(prior, *args, **kwargs)
kalman_prediction = self.kalman_predictor.predict(
GaussianState(prior.state_vector, prior.covar, prior.timestamp),
*args, **kwargs)
return Prediction.from_state(prior,
state_vector=particle_prediction.state_vector,
log_weight=particle_prediction.log_weight,
timestamp=particle_prediction.timestamp,
fixed_covar=kalman_prediction.covar,
transition_model=self.transition_model,
prior=prior)
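# Illustrative sketch (not library code): the predictor above runs a Kalman
# prediction alongside the particle prediction so that a state covariance is
# maintained. A hypothetical constant-velocity matrix ``F`` and process noise
# ``Q`` stand in for the Kalman predictor's transition model.
def _demo_parallel_kalman_covariance():
    rng = np.random.default_rng(seed=2)
    F = np.array([[1.0, 1.0],
                  [0.0, 1.0]])              # assumed transition matrix
    Q = 0.1 * np.eye(2)                     # assumed process noise covariance
    particles = rng.normal(size=(2, 50))    # prior particle cloud
    P = np.cov(particles)                   # prior covariance from the cloud
    noise = rng.multivariate_normal(np.zeros(2), Q, size=50).T
    predicted_particles = F @ particles + noise
    predicted_covar = F @ P @ F.T + Q       # analogue of ``fixed_covar`` above
    return predicted_particles, predicted_covar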
class MultiModelPredictor(Predictor):
"""MultiModelPredictor class
An implementation of a Particle Filter predictor utilising multiple models.
"""
    transition_model = None  # Not used; :attr:`transition_models` is used instead
    transition_models: Sequence[TransitionModel] = Property(
        doc="Transition models used for particle transition, selected by the model index on "
            "each particle. Model dimensions can be a subset of the overall state space, by "
            "using :attr:`model_mappings`."
    )
transition_matrix: np.ndarray = Property(
doc="n-model by n-model transition matrix."
)
model_mappings: Sequence[Sequence[int]] = Property(
doc="Sequence of mappings associated with each transition model. This enables mapping "
"between model and state space, enabling use of models that may have different "
"dimensions (e.g. velocity or acceleration). Parts of the state that aren't mapped "
"are set to zero.")
@property
def probabilities(self):
return np.cumsum(self.transition_matrix, axis=1)
@predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.MultiModelParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
new_particle_state = Prediction.from_state(
prior,
state_vector=copy.copy(prior.state_vector),
parent=prior,
dynamic_model=copy.copy(prior.dynamic_model),
timestamp=timestamp)
for model_index, transition_model in enumerate(self.transition_models):
            # Change the dynamic model of each particle randomly, according to the defined
            # transition matrix
current_model_indices = prior.dynamic_model == model_index
current_model_count = np.count_nonzero(current_model_indices)
if current_model_count == 0:
continue
new_dynamic_models = np.searchsorted(
self.probabilities[model_index],
np.random.random(size=current_model_count))
new_state_vector = self.apply_model(
prior[current_model_indices], transition_model, timestamp,
self.model_mappings[model_index], noise=True, **kwargs)
new_particle_state.state_vector[:, current_model_indices] = new_state_vector
new_particle_state.dynamic_model[current_model_indices] = new_dynamic_models
return new_particle_state
@staticmethod
def apply_model(prior, transition_model, timestamp, model_mapping, **kwargs):
        # Based on the given model mapping, create a new state vector that contains only
        # the required states
orig_ndim = prior.ndim
prior.state_vector = prior.state_vector[model_mapping, :]
new_state_vector = transition_model.function(
prior, time_interval=timestamp - prior.timestamp, **kwargs)
        # Insert zeros at the indices that were removed from the state vector, so the
        # result is compatible with the full state dimension
for j in range(orig_ndim):
if j not in model_mapping:
new_state_vector = np.insert(new_state_vector, j, 0, axis=0)
return new_state_vector
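# Illustrative sketch (not library code) of the two mechanisms above: model
# switching via inverse-CDF lookup on the cumulative rows of the transition
# matrix (the ``probabilities`` property), and re-expanding a reduced state
# vector by zero-insertion as in ``apply_model``. All values are assumed.
def _demo_model_switch_and_mapping():
    rng = np.random.default_rng(seed=3)
    transition_matrix = np.array([[0.9, 0.1],
                                  [0.2, 0.8]])        # assumed 2-model chain
    cum_probs = np.cumsum(transition_matrix, axis=1)  # rows are per-model CDFs
    # Particles currently in model 0 draw their next model index
    new_models = np.searchsorted(cum_probs[0], rng.random(size=5))
    # A model that only uses state index 0 is re-expanded to the full 2D state
    # space by inserting a zero row at the unmapped index
    model_mapping = (0,)
    new_state_vector = np.array([[1.0, 2.0, 3.0]])    # transitioned, reduced states
    for j in range(2):                                # orig_ndim == 2
        if j not in model_mapping:
            new_state_vector = np.insert(new_state_vector, j, 0, axis=0)
    return new_models, new_state_vector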
class RaoBlackwellisedMultiModelPredictor(MultiModelPredictor):
"""Rao-Blackwellised Multi Model Predictor class
An implementation of a Particle Filter predictor utilising multiple models, with per
particle model probabilities.
"""
@predict_lru_cache()
def predict(self, prior, timestamp=None, **kwargs):
"""Particle Filter prediction step
Parameters
----------
prior : :class:`~.RaoBlackwellisedParticleState`
A prior state object
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the prediction is performed
(the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
new_particle_state = Prediction.from_state(
prior,
state_vector=copy.copy(prior.state_vector),
parent=prior,
timestamp=timestamp)
        # Change the dynamic model of each particle randomly, according to that
        # particle's model probabilities
new_dynamic_models = np.argmax(
prior.model_probabilities.astype(float).cumsum(0) > np.random.rand(len(prior)),
axis=0)
for model_index, transition_model in enumerate(self.transition_models):
new_model_indices = new_dynamic_models == model_index
if np.count_nonzero(new_model_indices) == 0:
continue
new_state_vector = self.apply_model(
prior[new_model_indices], transition_model, timestamp,
self.model_mappings[model_index], noise=True, **kwargs)
new_particle_state.state_vector[:, new_model_indices] = new_state_vector
return new_particle_state
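# Illustrative sketch (not library code) of the per-particle model draw above:
# each column of ``model_probabilities`` is one particle's distribution over
# models, its cumulative sum is a CDF, and ``argmax`` of the first exceedance
# of a uniform draw performs inverse-CDF sampling. Values are assumed.
def _demo_rao_blackwellised_model_draw():
    rng = np.random.default_rng(seed=4)
    model_probabilities = np.array([[0.7, 0.2, 0.5],
                                    [0.3, 0.8, 0.5]])  # models x particles
    u = rng.random(model_probabilities.shape[1])       # one uniform per particle
    return np.argmax(model_probabilities.cumsum(axis=0) > u, axis=0)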
class BernoulliParticlePredictor(ParticlePredictor):
"""Bernoulli Particle Filter Predictor class
An implementation of a particle filter predictor utilising the Bernoulli
filter formulation that estimates the spatial distribution of a single
target and estimates its existence, as described in [2]_.
This should be used in conjunction with the
:class:`~.BernoulliParticleUpdater`.
References
----------
    .. [2] Ristic, Branko & Vo, Ba-Tuong & Vo, Ba-Ngu & Farina, Alfonso, "A
       Tutorial on Bernoulli Filters: Theory, Implementation and Applications",
       2013, IEEE Transactions on Signal Processing, 61(13), 3406-3430.
"""
birth_probability: float = Property(
default=0.01,
doc="Probability of target birth.")
    survival_probability: float = Property(
        default=0.98,
        doc="Probability of target survival.")
    birth_sampler: Sampler = Property(
        default=None,
        doc="Sampler object used for sampling birth particles. "
            "The current implementation assumes the :class:`~.DetectionSampler` is used.")
def predict(self, prior, timestamp=None, **kwargs):
"""Bernoulli Particle Filter prediction step
Parameters
----------
prior : :class:`~.BernoulliParticleState`
A prior state object
        timestamp : :class:`datetime.datetime`, optional
            A timestamp signifying when the prediction is performed
            (the default is `None`)
        Returns
        -------
        : :class:`~.ParticleStatePrediction`
            The predicted state and existence
        """
new_particle_state = copy.copy(prior)
nsurv_particles = new_particle_state.state_vector.shape[1]
# Calculate time interval
try:
time_interval = timestamp - new_particle_state.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
time_interval = None
# Sample from birth distribution
detections = self.get_detections(prior)
birth_state = self.birth_sampler.sample(detections, **kwargs)
birth_part = birth_state.state_vector
nbirth_particles = len(birth_state)
# Unite the surviving and birth particle sets in the prior
new_particle_state.state_vector = StateVectors(np.concatenate(
(new_particle_state.state_vector, birth_part), axis=1))
# Extend weights to match length of state_vector
new_log_weight_vector = np.concatenate(
(new_particle_state.log_weight, np.full(nbirth_particles, np.log(1/nbirth_particles))))
new_particle_state.log_weight = new_log_weight_vector - logsumexp(new_log_weight_vector)
untransitioned_state = Prediction.from_state(
new_particle_state,
parent=prior,
)
# Predict particles using the transition model
new_state_vector = self.transition_model.function(
new_particle_state,
noise=True,
time_interval=time_interval,
**kwargs)
# Estimate existence
predicted_existence = self.estimate_existence(
new_particle_state.existence_probability)
predicted_log_weights = self.predict_log_weights(
new_particle_state.existence_probability,
predicted_existence, nsurv_particles,
new_particle_state.log_weight)
# Create the prediction output
new_particle_state = Prediction.from_state(
new_particle_state,
state_vector=new_state_vector,
log_weight=predicted_log_weights,
existence_probability=predicted_existence,
parent=untransitioned_state,
timestamp=timestamp,
transition_model=self.transition_model,
prior=prior
)
return new_particle_state
def estimate_existence(self, existence_prior):
existence_estimate = self.birth_probability * (1 - existence_prior) \
+ self.survival_probability * existence_prior
return existence_estimate
def predict_log_weights(self, prior_existence, predicted_existence, surv_part_size,
prior_log_weights):
# Weight prediction function currently assumes that the chosen importance density is the
# transition density. This will need to change if implementing a different importance
# density or incorporating visibility information
surv_weights = np.log((self.survival_probability*prior_existence)/predicted_existence) \
+ prior_log_weights[:surv_part_size]
birth_weights = np.log((self.birth_probability*(1-prior_existence))/predicted_existence) \
+ prior_log_weights[surv_part_size:]
predicted_log_weights = np.concatenate((surv_weights, birth_weights))
# Normalise weights
predicted_log_weights -= logsumexp(predicted_log_weights)
return predicted_log_weights
@staticmethod
def get_detections(prior):
detections = OrderedSet()
if hasattr(prior, 'hypothesis'):
if prior.hypothesis is not None:
for hypothesis in prior.hypothesis:
detections |= {hypothesis.measurement}
return detections
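# Illustrative sketch (not library code) of ``estimate_existence`` and
# ``predict_log_weights`` above, using the class's default probabilities and
# an assumed prior existence of 0.5 with 4 surviving and 2 birth particles.
def _demo_bernoulli_existence_and_weights():
    p_b, p_s = 0.01, 0.98                              # class defaults
    q_prior = 0.5                                      # assumed prior existence
    q_pred = p_b * (1 - q_prior) + p_s * q_prior       # estimate_existence
    n_surv, n_birth = 4, 2
    log_w = np.full(n_surv + n_birth, np.log(1 / (n_surv + n_birth)))
    surv = np.log(p_s * q_prior / q_pred) + log_w[:n_surv]
    birth = np.log(p_b * (1 - q_prior) / q_pred) + log_w[n_surv:]
    log_w_pred = np.concatenate((surv, birth))
    log_w_pred -= logsumexp(log_w_pred)                # normalise in log space
    return q_pred, log_w_pred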
class SMCPHDBirthSchemeEnum(Enum):
"""SMC-PHD Birth scheme enumeration"""
EXPANSION = 'expansion' #: Expansion birth scheme
MIXTURE = 'mixture' #: Mixture birth scheme
class SMCPHDPredictor(Predictor):
"""Sequential Monte Carlo Probability Hypothesis Density (SMC-PHD) Predictor class
An implementation of a particle predictor that propagates only the first-order moment (i.e. the
Probability Hypothesis Density) of the multi-target state density based on [#phd]_.
.. note::
- It is assumed that the proposal distribution is the same as the dynamics
- Target "spawning" is not implemented
Parameters
----------
References
----------
    .. [#phd] Ba-Ngu Vo, S. Singh and A. Doucet, "Sequential Monte Carlo implementation of
       the PHD filter for multi-target tracking," in Proceedings of the Sixth International
       Conference on Information Fusion, Cairns, QLD, Australia, 2003, pp. 792-799,
       doi: 10.1109/ICIF.2003.177320
"""
    death_probability: Probability = Property(
        doc="The probability of death per unit time. This is used to calculate the probability "
            r"of survival as :math:`\exp(-\lambda \Delta t)`, where :math:`\lambda` is the "
            r"probability of death per unit time and :math:`\Delta t` is the time interval")
    birth_probability: Probability = Property(
        doc="Probability of target birth. In the current implementation, this is used to "
            "calculate the number of birth particles, as per the explanation under "
            ":py:attr:`~birth_scheme`")
    birth_rate: float = Property(
        doc="The expected number of new/born targets at each iteration. This is used to "
            "calculate the weight of the birth particles")
birth_sampler: ParticleSampler = Property(
doc="Sampler object used for sampling birth particles. The weight of the sampled birth "
"particles is ignored and calculated internally based on the :py:attr:`~birth_rate` "
"and number of particles")
    birth_func_num_samples_field: str = Property(
        default='num_samples',
        doc="The field name of the number of samples parameter for the birth sampler. This is "
            "required since the number of samples needed by the birth sampler may vary "
            "between iterations. Default is ``'num_samples'``")
birth_scheme: SMCPHDBirthSchemeEnum = Property(
default=SMCPHDBirthSchemeEnum.EXPANSION,
doc="The scheme for birth particles. Options are ``'expansion'`` | ``'mixture'``. Default "
"is ``'expansion'``.\n\n"
" - The ``'expansion'`` scheme follows the implementation of [#phd]_, meaning that "
"birth particles are appended to the list of surviving particles, where the number of "
"birth particles is computed as :math:`P_b N` where :math:`P_b` is the birth "
"probability and :math:`N` is the number of particles.\n"
" - The ``'mixture'`` scheme draws from a binomial distribution, with probability "
":math:`P_b`, for each particle to decide if it gets replaced by a birth particle. "
"The weights of the particles are then updated as a mixture of the survival and birth "
"probabilities."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
        # Ensure birth scheme is a valid SMCPHDBirthSchemeEnum
self.birth_scheme = SMCPHDBirthSchemeEnum(self.birth_scheme)
@predict_lru_cache()
def predict(self, prior, timestamp=None, random_state=None, **kwargs):
""" SMC-PHD prediction step
Parameters
----------
prior: :class:`~.ParticleState`
The prior state
        timestamp: :class:`datetime.datetime`, optional
            The time at which to predict the next state
            (the default is `None`)
        random_state: :class:`numpy.random.RandomState`, optional
            Random state used for reproducible sampling
            (the default is `None`)
Returns
-------
: :class:`~.ParticleStatePrediction`
The predicted state
"""
num_samples = len(prior)
log_prior_weights = prior.log_weight
time_interval = timestamp - prior.timestamp
# Predict surviving particles forward
pred_particles_sv = self.transition_model.function(prior,
time_interval=time_interval,
noise=True,
random_state=random_state,
**kwargs)
# Calculate probability of survival
log_prob_survive = -float(self.death_probability) * time_interval.total_seconds()
# Perform birth and update weights
if self.birth_scheme == SMCPHDBirthSchemeEnum.EXPANSION:
            # Expansion birth scheme, as described in [#phd] above
# Compute number of birth particles (J_k) as a fraction of the number of particles
num_birth = round(float(self.birth_probability) * num_samples)
# Sample birth particles
birth_particles = self.birth_sampler.sample(
params={self.birth_func_num_samples_field: num_birth},
timestamp=timestamp,
random_state=random_state,
**kwargs)
# Ensure birth weights are uniform and scaled by birth rate
birth_particles.log_weight = np.full((num_birth,), np.log(self.birth_rate / num_birth))
# Surviving particle weights
log_pred_weights = log_prob_survive + log_prior_weights
# Append birth particles to predicted ones
pred_particles_sv = StateVectors(
np.concatenate((pred_particles_sv, birth_particles.state_vector), axis=1))
log_pred_weights = np.concatenate((log_pred_weights, birth_particles.log_weight))
else:
rng = random_state if random_state is not None else np.random
# Flip a coin for each particle to decide if it gets replaced by a birth particle
birth_inds = np.flatnonzero(
rng.binomial(1, float(self.birth_probability), num_samples)
)
# Sample birth particles and replace in original state vector matrix
num_birth = len(birth_inds)
if num_birth > 0:
birth_particles = self.birth_sampler.sample(
params={self.birth_func_num_samples_field: num_birth},
timestamp=timestamp,
random_state=random_state,
**kwargs)
# Replace particles in the state vector matrix
pred_particles_sv[:, birth_inds] = birth_particles.state_vector
# Process weights
prob_survive = np.exp(log_prob_survive)
birth_weight = self.birth_rate / num_samples
log_pred_weights = np.log(prob_survive + birth_weight) + log_prior_weights
prediction = Prediction.from_state(prior, state_vector=pred_particles_sv,
log_weight=log_pred_weights,
timestamp=timestamp, particle_list=None,
transition_model=self.transition_model)
return prediction
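# Illustrative sketch (not library code) of the weight bookkeeping above for
# the 'expansion' scheme: survival scales existing weights by exp(-lambda*dt)
# in log space, and J_k = round(P_b * N) birth particles share a total weight
# of ``birth_rate``. All rates below are assumptions for the demo; the sum of
# the resulting linear weights approximates the expected number of targets.
def _demo_smcphd_expansion_weights():
    death_probability, birth_probability = 0.01, 0.1   # assumed rates
    birth_rate, dt, num_samples = 0.05, 1.0, 1000
    log_prior_weights = np.full(num_samples, np.log(1 / num_samples))
    log_prob_survive = -death_probability * dt         # log of exp(-lambda*dt)
    log_pred_weights = log_prob_survive + log_prior_weights
    num_birth = round(birth_probability * num_samples)
    birth_log_weights = np.full(num_birth, np.log(birth_rate / num_birth))
    log_pred_weights = np.concatenate((log_pred_weights, birth_log_weights))
    # Under the 'mixture' scheme the update would instead be
    # log(exp(log_prob_survive) + birth_rate / num_samples) + prior log-weights
    return np.exp(log_pred_weights).sum()              # approx. target count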