Source code for stonesoup.updater.pointprocess

from abc import abstractmethod

from scipy.stats import multivariate_normal
import numpy as np

from ..base import Base, Property
from .kalman import KalmanUpdater
from ..types.update import GaussianMixtureUpdate
from ..types.state import TaggedWeightedGaussianState
from ..types.numeric import Probability


[docs] class PointProcessUpdater(Base): r""" Base updater class for the implementation of any Gaussian Mixture (GM) point process derived multi target filters such as the Probability Hypothesis Density (PHD), Cardinalised Probability Hypothesis Density (CPHD) or Linear Complexity with Cumulants (LCC) filters """ updater: KalmanUpdater = Property( doc="Underlying updater used to perform the \ single target Kalman Update.") clutter_spatial_density: float = Property( default=1e-26, doc="Spatial density of the clutter process uniformly\ distributed across the state space.") normalisation: bool = Property( default=True, doc="Flag for normalisation") prob_detection: Probability = Property( default=1, doc="Probability of a target being detected at the current timestep") prob_survival: Probability = Property( default=1, doc="Probability of a target surviving until the next timestep")
[docs] def update(self, hypotheses): """ Updates the current components in a :class:`GaussianMixture` by applying the underlying \ :class:`KalmanUpdater` updater to each component \ with the supplied measurements. Parameters ========== hypotheses : list of :class:`MultipleHypothesis` Measurements obtained at time :math:`k+1` Returns ======= updated_components : :class:`GaussianMixtureUpdate` GaussianMixtureMultiTargetTracker with updated \ components at time :math:`k+1` """ updated_components = list() weight_sum_list = list() # Loop over all measurements for multi_hypothesis in hypotheses[:-1]: updated_measurement_components = list() # Initialise weight sum for measurement to clutter intensity weight_sum = 0 # For every valid single hypothesis, update that component with # measurements and calculate new weight for hypothesis in multi_hypothesis: measurement_prediction = \ self.updater.predict_measurement( hypothesis.prediction, hypothesis.measurement.measurement_model) measurement = hypothesis.measurement prediction = hypothesis.prediction # Calculate new weight and add to weight sum q = multivariate_normal.pdf( measurement.state_vector.flatten(), mean=measurement_prediction.mean.flatten(), cov=measurement_prediction.covar ) new_weight = self.prob_detection\ * prediction.weight * q * self.prob_survival weight_sum += new_weight # Perform single target Kalman Update temp_updated_component = self.updater.update(hypothesis) updated_component = TaggedWeightedGaussianState( tag=prediction.tag if prediction.tag != "birth" else None, weight=new_weight, state_vector=temp_updated_component.mean, covar=temp_updated_component.covar, timestamp=temp_updated_component.timestamp ) # Add updated component to mixture updated_measurement_components.append(updated_component) weight_sum_list.append(weight_sum) for component in updated_measurement_components: if self.normalisation: component.weight /= \ (weight_sum + self.clutter_spatial_density) updated_components.append(component) # Calculate the correction terms l1 = self._calculate_update_terms(weight_sum_list, hypotheses) for missed_detected_hypotheses in hypotheses[-1]: # Add all active components except birth component back into # mixture as miss detected components if missed_detected_hypotheses.prediction.tag != "birth": component = TaggedWeightedGaussianState( tag=missed_detected_hypotheses.prediction.tag, weight=missed_detected_hypotheses.prediction.weight * (1-self.prob_detection) * l1, state_vector=missed_detected_hypotheses.prediction.mean, covar=missed_detected_hypotheses.prediction.covar, timestamp=missed_detected_hypotheses.prediction.timestamp) updated_components.append(component) # Ensure that no component has a weight of 0. This avoids divide # by 0 bugs in the GaussianMixtureReducer for component in updated_components: if component.weight == 0: component.weight = np.finfo(float).eps # Return updated components return GaussianMixtureUpdate(hypothesis=hypotheses, components=updated_components)
@abstractmethod def _calculate_update_terms(self, updated_sum_list, hypotheses): raise NotImplementedError
[docs] class PHDUpdater(PointProcessUpdater): """ A implementation of the Gaussian Mixture Probability Hypothesis Density (GM-PHD) multi-target filter References ---------- [1] B.-N. Vo and W.-K. Ma, “The Gaussian Mixture Probability Hypothesis Density Filter,” Signal Processing,IEEE Transactions on, vol. 54, no. 11, pp. 4091–4104, 2006. https://ieeexplore.ieee.org/document/1710358. [2] D. E. Clark, K. Panta and B. Vo, "The GM-PHD Filter Multiple Target Tracker," 2006 9th International Conference on Information Fusion, 2006, pp. 1-8, doi: 10.1109/ICIF.2006.301809. https://ieeexplore.ieee.org/document/4086095. """ @staticmethod def _calculate_update_terms(updated_sum_list, hypotheses): return 1
[docs] class LCCUpdater(PointProcessUpdater): """ A implementation of the Gaussian Mixture Linear Complexity with Cumulants (GM-LCC) multi-target filter References ---------- [1] D. E. Clark and F. De Melo. “A Linear-Complexity Second-Order Multi-Object Filter via Factorial Cumulants”. In: 2018 21st International Conference on Information Fusion (FUSION). 2018. DOI: 10. 23919/ICIF.2018.8455331. https://ieeexplore.ieee.org/document/8455331.. """ mean_number_of_false_alarms: float = Property( default=1, doc="Mean number of false alarms (clutter) expected per timestep") variance_of_false_alarms: float = Property( default=1, doc="Variance on the number of false alarms (clutter) expected per timestep") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.second_order_cumulant = 0 def _calculate_update_terms(self, updated_sum_list, hypotheses): """ Calculate the higher order terms used in the LCC Filter """ # Get the predicted weight sum predicted_weight_sum =\ Probability.sum(hypothesis.prediction.weight for hypothesis in hypotheses[-1]) * self.prob_survival # Second order predicted cumulant c(2) predicted_c2 = self.second_order_cumulant * self.prob_survival**2 # Detected predicted weight mu_d detected_weight_sum = self.prob_detection*predicted_weight_sum # Detected predicted weight mu_phi misdetected_weight_sum = (1-self.prob_detection)*predicted_weight_sum # Calculate the alpha of the predicted Panjer process alpha_pred = ((predicted_weight_sum + self.mean_number_of_false_alarms)**2)\ / (predicted_c2+self.second_order_false_alarm_cumulant+1e-26) # Calculate l1 and l2 correction factors denominator = alpha_pred + detected_weight_sum \ + self.mean_number_of_false_alarms number_of_measurements = len(hypotheses)-1 numerator = alpha_pred + number_of_measurements l1 = numerator/denominator l2 = numerator/(denominator**2) # Calculate updated c(2) detected_c2 = \ Probability.sum([weight_sum/((weight_sum + self.clutter_spatial_density) ** 2) for weight_sum in updated_sum_list]) misdetected_c2 = (misdetected_weight_sum**2)*l2 self.second_order_cumulant = misdetected_c2 - detected_c2 # Return the l1 correction factor for miss detected weight update return np.float64(l1) @property def second_order_false_alarm_cumulant(self): return self.variance_of_false_alarms - self.mean_number_of_false_alarms