Source code for stonesoup.updater.probability

# -*- coding: utf-8 -*-
import numpy as np
from copy import copy

from .kalman import ExtendedKalmanUpdater
from ..types.update import Update
from ..types.array import StateVectors
from ..functions import gm_reduce_single


[docs] class PDAUpdater(ExtendedKalmanUpdater): r"""An updater which undertakes probabilistic data association (PDA), as defined in [#]_. It differs slightly from the Kalman updater it inherits from in that instead of a single hypothesis object, the :meth:`update` method takes a hypotheses object returned by a :class:`~.PDA` (or similar) data associator. Functionally this is a list of single hypothesis objects which group tracks together with associated measuments and probabilities. The :class:`~.ExtendedKalmanUpdater` is used in order to inherit the ability to cope with (slight) non-linearities. Other inheritance structures should be trivial to implement. The update step proceeds as: .. math:: \mathbf{x}_{k|k} &= \mathbf{x}_{k|k-1} + K_k \mathbf{y}_k P_{k|k} &= \beta_0 P_{k|k-1} + (1 - \beta_0) P_{k|k} + \tilde{P} where :math:`K_k` and :math:`P_{k|k}` are the Kalman gain and posterior covariance respectively returned by the single-target Kalman update, :math:`\beta_0` is the probability of missed detection. In this instance :math:`\mathbf{y}_k` is the *combined* innovation, over :math:`m_k` detections: .. math:: \mathbf{y}_k = \Sigma_{i=1}^{m_k} \beta_i \mathbf{y}_{k,i}. The posterior covariance is composed of a term to account for the covariance due to missed detection, that due to the true detection, and a term (:math:`\tilde{P}`) which quantifies the effect of the measurement origin uncertainty. .. math:: \tilde{P} \triangleq K_k [ \Sigma_{i=1}^{m_k} \beta_i \mathbf{y}_{k,i}\mathbf{y}_{k,i}^T - \mathbf{y}_k \mathbf{y}_k^T ] K_k^T A method for updating via a Gaussian mixture reduction is also provided. In this latter case, each of the hypotheses, including that for a missed detection, is updated and then a weighted Gaussian reduction is used to resolve the hypotheses to a single Gaussian distribution. The reason this is equivalent to the innovation-based method is shown in [#]_. References ---------- .. [#] Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control Systems Magazine .. [#] https://gist.github.com/jmbarr/92dc83e28c04026136d4f8706a1159c1 """
[docs] def update(self, hypotheses, gm_method=False, **kwargs): r"""The update step. Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` The prediction-measurement association hypotheses. This hypotheses object carries tracks, associated sets of measurements for each track together with a probability measure which enumerates the likelihood of each track-measurement pair. (This is most likely output by the :class:`~.PDA` associator). In a single case (the missed detection hypothesis), the hypothesis will not have an associated measurement or measurement prediction. gm_method : bool Use the innovation-based update method if False (default), or the GM-reduction (True). **kwargs : various These are passed to :meth:`predict_measurement` Returns ------- : :class:`GaussianUpdate` The update, :math:`\mathbf{x}_{k|k}, P_{k|k}` """ if gm_method: posterior_mean, posterior_covariance = self._update_via_GM_reduction(hypotheses, **kwargs) else: posterior_mean, posterior_covariance = self._update_via_innovation(hypotheses, **kwargs) # Note that this does not check if all hypotheses are of the same type. # It also assumes that all measurements have the same timestamp (updates are # contemporaneous). return Update.from_state( hypotheses[0].prediction, posterior_mean, posterior_covariance, timestamp=hypotheses[0].measurement.timestamp, hypothesis=hypotheses)
def _update_via_GM_reduction(self, hypotheses, **kwargs): """This method delivers the same outcome as what's described above. It's slightly different, but potentially more intuitive. Here, each of the hypotheses, including missed detection, are updated and then a weighted Gaussian reduction is used to resolve the hypotheses to a single Gaussian distribution. The reason this is equivalent is shown in _[#] Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` As in :meth:`update` method **kwargs : various These are passed to :class:`~.ExtendedKalmanUpdater`:meth:`update` Returns ------- : :class:`~.StateVector` The mean of the reduced/single Gaussian : :class:`~.CovarianceMatrix` The covariance of the reduced/single Gaussian """ posterior_states = [] posterior_state_weights = [] for hypothesis in hypotheses: if not hypothesis: posterior_states.append(hypothesis.prediction) else: posterior_state = super().update(hypothesis, **kwargs) # Use the EKF update posterior_states.append(posterior_state) posterior_state_weights.append(hypothesis.probability) means = StateVectors([state.state_vector for state in posterior_states]) covars = np.stack([state.covar for state in posterior_states], axis=2) weights = np.asarray(posterior_state_weights) # Reduce mixture of states to one posterior estimate Gaussian. post_mean, post_covar = gm_reduce_single(means, covars, weights) return post_mean, post_covar def _update_via_innovation(self, hypotheses, **kwargs): """Of n hypotheses there should be 1 prediction (a missed detection hypothesis) and n-1 different measurement associations. The update proceeds as described above. Parameters ---------- hypotheses : :class:`~.MultipleHypothesis` As in :meth:`update` method **kwargs : various These are passed to :meth:`predict_measurement` Returns ------- : :class:`~.StateVector` The mean of the reduced/single Gaussian : :class:`~.CovarianceMatrix` The covariance of the reduced/single Gaussian """ for n, hypothesis in enumerate(hypotheses): # Check for the existence of an associated measurement. Because of the way the # hypothesis is constructed, you can do this: if not hypothesis: hypothesis.measurement_prediction = self.predict_measurement( hypothesis.prediction, **kwargs) innovation = hypothesis.measurement_prediction.state_vector - \ hypothesis.measurement_prediction.state_vector # is zero in this case posterior_covariance, kalman_gain = self._posterior_covariance(hypothesis) # Add the weighted prediction to the weighted posterior posterior_covariance = float(hypothesis.probability) * \ hypothesis.prediction.covar + (1 - float(hypothesis.probability)) * \ posterior_covariance posterior_mean = copy(hypothesis.prediction.state_vector) else: innovation = hypothesis.measurement.state_vector - \ hypothesis.measurement_prediction.state_vector # probably exists a less clunky way of doing this using exists() or overwritten += # All these floats should be redundant if/when the bug in Probability.__mult__() is # fixed. if n == 0: sum_of_innovations = float(hypothesis.probability) * innovation sum_of_weighted_cov = float(hypothesis.probability) * (innovation @ innovation.T) else: sum_of_innovations += float(hypothesis.probability) * innovation sum_of_weighted_cov += float(hypothesis.probability) * (innovation @ innovation.T) posterior_mean += kalman_gain @ sum_of_innovations posterior_covariance += \ kalman_gain @ (sum_of_weighted_cov - sum_of_innovations @ sum_of_innovations.T) \ @ kalman_gain.T return posterior_mean, posterior_covariance