Source code for stonesoup.dataassociator.probability

# -*- coding: utf-8 -*-

import itertools

from .base import DataAssociator
from ..base import Property
from ..hypothesiser import Hypothesiser
from ..hypothesiser.probability import PDAHypothesiser
from ..types.detection import MissedDetection
from ..types.hypothesis import (
    SingleProbabilityHypothesis, ProbabilityJointHypothesis)
from ..types.multihypothesis import MultipleHypothesis
from ..types.numeric import Probability


class PDA(DataAssociator):
    """Probabilistic Data Association (PDA)

    Given a set of detections and a set of tracks, each track has a
    probability that it is associated to each specific detection.
    """

    hypothesiser: Hypothesiser = Property(
        doc="Generate a set of hypotheses for each prediction-detection pair")

    def associate(self, tracks, detections, timestamp, **kwargs):
        # Generate a set of hypotheses for each track on each detection
        hypotheses = self.generate_hypotheses(
            tracks, detections, timestamp, **kwargs)

        # Ensure association probabilities are normalised
        for track, hypothesis in hypotheses.items():
            hypothesis.normalise_probabilities(total_weight=1)

        return hypotheses
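
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module): a minimal example of
# wiring a PDA associator from a PDAHypothesiser. The predictor and updater
# objects, and the clutter density / detection probability values, are
# assumptions for demonstration rather than values defined here.
#
# from stonesoup.hypothesiser.probability import PDAHypothesiser
# from stonesoup.dataassociator.probability import PDA
#
# hypothesiser = PDAHypothesiser(
#     predictor=predictor,            # assumed Kalman-style predictor
#     updater=updater,                # assumed Kalman-style updater
#     clutter_spatial_density=0.125,  # assumed clutter density
#     prob_detect=0.9)                # assumed probability of detection
# data_associator = PDA(hypothesiser=hypothesiser)
#
# # Per-track MultipleHypothesis, with probabilities normalised to sum to 1
# hypotheses = data_associator.associate(tracks, detections, timestamp)
# ---------------------------------------------------------------------------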


class JPDA(DataAssociator):
    r"""Joint Probabilistic Data Association (JPDA)

    Given a set of Detections and a set of Tracks, each Detection has a
    probability that it is associated with each specific Track. Rather than
    associate specific Detections/Tracks, JPDA calculates the new state of a
    Track based on its possible association with ALL Detections. The new
    state is a Gaussian Mixture, reduced to a single Gaussian.

    If

    .. math::

          prob_{association(Detection, Track)} <
          \frac{prob_{association(MissedDetection, Track)}}{gate\ ratio}

    then Detection is assumed to be outside Track's gate, and the probability
    of association is dropped from the Gaussian Mixture. This calculation
    takes place in the function :meth:`enumerate_JPDA_hypotheses`.
    """

    hypothesiser: PDAHypothesiser = Property(
        doc="Generate a set of hypotheses for each prediction-detection pair")

    def associate(self, tracks, detections, timestamp, **kwargs):
        # Calculate MultipleHypothesis for each Track over all
        # available Detections
        hypotheses = self.generate_hypotheses(
            tracks, detections, timestamp, **kwargs)

        # enumerate the Joint Hypotheses of track/detection associations
        joint_hypotheses = \
            self.enumerate_JPDA_hypotheses(tracks, hypotheses)

        # Calculate MultiMeasurementHypothesis for each Track over all
        # available Detections with probabilities drawn from JointHypotheses
        new_hypotheses = dict()

        for track in tracks:

            single_measurement_hypotheses = list()

            # record the MissedDetection hypothesis for this track
            prob_misdetect = Probability.sum(
                joint_hypothesis.probability
                for joint_hypothesis in joint_hypotheses
                if not joint_hypothesis.hypotheses[track].measurement)
            single_measurement_hypotheses.append(
                SingleProbabilityHypothesis(
                    hypotheses[track][0].prediction,
                    MissedDetection(timestamp=timestamp),
                    measurement_prediction=hypotheses[track][0].measurement_prediction,
                    probability=prob_misdetect))

            # record hypothesis for any given Detection being associated with
            # this track
            for hypothesis in hypotheses[track]:
                if not hypothesis:
                    continue
                pro_detect_assoc = Probability.sum(
                    joint_hypothesis.probability
                    for joint_hypothesis in joint_hypotheses
                    if joint_hypothesis.hypotheses[track].measurement
                    is hypothesis.measurement)
                single_measurement_hypotheses.append(
                    SingleProbabilityHypothesis(
                        hypothesis.prediction,
                        hypothesis.measurement,
                        measurement_prediction=hypothesis.measurement_prediction,
                        probability=pro_detect_assoc))

            result = MultipleHypothesis(single_measurement_hypotheses, True, 1)

            new_hypotheses[track] = result

        return new_hypotheses

    @classmethod
    def enumerate_JPDA_hypotheses(cls, tracks, multihypths):

        joint_hypotheses = list()

        if not tracks:
            return joint_hypotheses

        # perform a simple level of gating - all track/detection pairs for
        # which the probability of association is a certain multiple less
        # than the probability of missed detection - detection is outside the
        # gating region, association is impossible
        possible_assoc = list()

        for track in tracks:
            track_possible_assoc = list()
            for hypothesis in multihypths[track]:
                # Always include missed detection (gate ratio < 1)
                track_possible_assoc.append(hypothesis)
            possible_assoc.append(track_possible_assoc)

        # enumerate all valid JPDA joint hypotheses
        enum_JPDA_hypotheses = (
            joint_hypothesis
            for joint_hypothesis in itertools.product(*possible_assoc)
            if cls.isvalid(joint_hypothesis))

        # turn the valid JPDA joint hypotheses into 'JointHypothesis'
        for joint_hypothesis in enum_JPDA_hypotheses:
            local_hypotheses = {}

            for track, hypothesis in zip(tracks, joint_hypothesis):
                local_hypotheses[track] = \
                    multihypths[track][hypothesis.measurement]

            joint_hypotheses.append(
                ProbabilityJointHypothesis(local_hypotheses))

        # normalize ProbabilityJointHypotheses relative to each other
        sum_probabilities = Probability.sum(
            hypothesis.probability for hypothesis in joint_hypotheses)
        for hypothesis in joint_hypotheses:
            hypothesis.probability /= sum_probabilities

        return joint_hypotheses

    @staticmethod
    def isvalid(joint_hypothesis):
        # 'joint_hypothesis' represents a valid joint hypothesis if
        # no measurement is repeated (ignoring missed detections)
        measurements = set()
        for hypothesis in joint_hypothesis:
            measurement = hypothesis.measurement
            if not measurement:
                pass
            elif measurement in measurements:
                return False
            else:
                measurements.add(measurement)

        return True
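
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module). With two tracks and
# two detections, itertools.product enumerates every combination of per-track
# hypotheses (missed detection, detection 1 or detection 2); isvalid() then
# discards combinations in which both tracks claim the same detection,
# leaving 7 of the 9 joint hypotheses, whose probabilities are normalised
# against each other. Construction mirrors PDA above; the hypothesiser
# parameters below are assumptions for demonstration rather than values
# defined here.
#
# from stonesoup.hypothesiser.probability import PDAHypothesiser
# from stonesoup.dataassociator.probability import JPDA
#
# hypothesiser = PDAHypothesiser(
#     predictor=predictor,            # assumed Kalman-style predictor
#     updater=updater,                # assumed Kalman-style updater
#     clutter_spatial_density=0.125,  # assumed clutter density
#     prob_detect=0.9)                # assumed probability of detection
# data_associator = JPDA(hypothesiser=hypothesiser)
#
# hypotheses = data_associator.associate({track_1, track_2},
#                                        detections, timestamp)
# # hypotheses[track_1] is a MultipleHypothesis over the missed detection and
# # every detection, with probabilities drawn from the joint hypotheses.
# ---------------------------------------------------------------------------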