Source code for stonesoup.hypothesiser.probability

from functools import lru_cache

from scipy.stats import chi2
from scipy.linalg import det
from scipy.special import gamma
import numpy as np

from .base import Hypothesiser
from ..base import Property
from ..functions import batch_multivariate_normal_logpdf
from ..measures import SquaredMahalanobis
from ..types.detection import MissedDetection
from ..types.hypothesis import SingleProbabilityHypothesis
from ..types.multihypothesis import MultipleHypothesis
from ..types.numeric import Probability
from ..predictor import Predictor
from ..updater import Updater


[docs] class PDAHypothesiser(Hypothesiser): """Hypothesiser based on Probabilistic Data Association (PDA) Generate track predictions at detection times and calculate probabilities for all prediction-detection pairs for single prediction and multiple detections. """ predictor: Predictor = Property(doc="Predict tracks to detection times") updater: Updater = Property(doc="Updater used to get measurement prediction") clutter_spatial_density: float = Property( default=None, doc="Spatial density of clutter - tied to probability of false detection. Default is None " "where the clutter spatial density is calculated based on assumption that " "all but one measurement within the validation region of the track are clutter.") prob_detect: Probability = Property( default=Probability(0.85), doc="Target Detection Probability") prob_gate: Probability = Property( default=Probability(0.95), doc="Gate Probability - prob. gate contains true measurement " "if detected") include_all: bool = Property( default=False, doc="If `True`, hypotheses outside probability gates will be returned. This requires " "that the clutter spatial density is also provided, as it may not be possible to" "estimate this. Default `False`") normalise: bool = Property( default=True, doc="If `True`, hypotheses are normlised to total weight of 1. Default `True`") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.include_all and self.clutter_spatial_density is None: raise ValueError("Must provide clutter spatial density if including all hypotheses")
[docs] def hypothesise(self, track, detections, timestamp, **kwargs): r"""Evaluate and return all track association hypotheses. For a given track and a set of N detections, return a MultipleHypothesis with N+1 detections (first detection is a 'MissedDetection'), each with an associated probability. Probabilities are assumed to be exhaustive (sum to 1) and mutually exclusive (two detections cannot be the correct association at the same time). Detection 0: missed detection, none of the detections are associated with the track. Detection :math:`i, i \in {1...N}`: detection i is associated with the track. The probabilities for these detections are calculated as follow: .. math:: \beta_i(k) = \begin{cases} \frac{\mathcal{L}_{i}(k)}{1-P_{D}P_{G}+\sum_{j=1}^{m(k)} \mathcal{L}_{j}(k)}, \quad i=1,...,m(k) \\ \frac{1-P_{D}P_{G}}{1-P_{D}P_{G}+\sum_{j=1}^{m(k)} \mathcal{L}_{j}(k)}, \quad i=0 \end{cases} where .. math:: \mathcal{L}_{i}(k) = \frac{\mathcal{N}[z_{i}(k);\hat{z}(k|k-1), S(k)]P_{D}}{\lambda} :math:`\lambda` is the clutter density :math:`P_{D}` is the detection probability :math:`P_{G}` is the gate probability :math:`\mathcal{N}[z_{i}(k);\hat{z}(k|k-1),S(k)]` is the likelihood ratio of the measurement :math:`z_{i}(k)` originating from the track target rather than the clutter. NOTE: Since all probabilities have the same denominator and are normalized later, the denominator can be discarded. References: [1] "The Probabilistic Data Association Filter: Estimation in the Presence of Measurement Origin Uncertainty" - https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=5338565 [2] "Robotics 2 Data Association" (Lecture notes) - http://ais.informatik.uni-freiburg.de/teaching/ws11/robotics2/pdfs/rob2-20-dataassociation.pdf Parameters ---------- track : Track The track object to hypothesise on detections : set of :class:`~.Detection` The available detections timestamp : datetime.datetime A timestamp used when evaluating the state and measurement predictions. Note that if a given detection has a non empty timestamp, then prediction will be performed according to the timestamp of the detection. Returns ------- : :class:`~.MultipleHypothesis` A container of :class:`~.SingleProbabilityHypothesis` objects """ detections = list(detections) hypotheses = list() validated_measurements = 0 measure = SquaredMahalanobis(state_covar_inv_cache_size=None) # Common state & measurement prediction prediction = self.predictor.predict(track, timestamp=timestamp, **kwargs) # Missed detection hypothesis probability = Probability(1 - self.prob_detect*self.prob_gate) hypotheses.append( SingleProbabilityHypothesis( prediction, MissedDetection(timestamp=timestamp), probability )) predictions = [ self.predictor.predict(track, timestamp=detection.timestamp, **kwargs) for detection in detections ] measurement_predictions = [ self.updater.predict_measurement( predictions[n], detection.measurement_model, **kwargs ) for n, detection in enumerate(detections) ] logpdfs = batch_multivariate_normal_logpdf( [detection.state_vector for detection in detections], measurement_predictions, ) for n, detection in enumerate(detections): prediction = predictions[n] measurement_prediction = measurement_predictions[n] log_prob = logpdfs[n] probability = Probability(log_prob, log_value=True) if measure(measurement_prediction, detection) \ <= self._gate_threshold(self.prob_gate, measurement_prediction.ndim): validated_measurements += 1 valid_measurement = True else: # Will be gated out unless include_all is set valid_measurement = False if self.include_all or valid_measurement: probability *= self.prob_detect if self.clutter_spatial_density is not None: probability /= self.clutter_spatial_density # True detection hypothesis hypotheses.append( SingleProbabilityHypothesis( prediction, detection, probability, measurement_prediction)) if self.clutter_spatial_density is None: for hypothesis in hypotheses[1:]: # Skip missed detection hypothesis.probability *= self._validation_region_volume( self.prob_gate, hypothesis.measurement_prediction) / validated_measurements return MultipleHypothesis(hypotheses, normalise=self.normalise)
@classmethod @lru_cache() def _validation_region_volume(cls, prob_gate, meas_pred): n = meas_pred.ndim gate_threshold = cls._gate_threshold(prob_gate, n) c_z = np.pi**(n/2) / gamma(n/2 + 1) return c_z * gate_threshold**(n/2) * np.sqrt(det(meas_pred.covar)) @staticmethod @lru_cache() def _gate_threshold(prob_gate, n): return chi2.ppf(float(prob_gate), n)