from .base import DataAssociator
from ..base import Property
from ..hypothesiser import Hypothesiser
from ..hypothesiser.probability import PDAHypothesiser
from ..types.detection import MissedDetection
from ..types.hypothesis import (
SingleProbabilityHypothesis, ProbabilityJointHypothesis)
from ..types.multihypothesis import MultipleHypothesis
from ..types.numeric import Probability
import itertools
import numpy as np
[docs]
class PDA(DataAssociator):
"""Probabilistic Data Association (PDA)
Given a set of detections and a set of tracks, each track has a
probability that it is associated to each specific detection.
"""
hypothesiser: Hypothesiser = Property(
doc="Generate a set of hypotheses for each prediction-detection pair")
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
# Generate a set of hypotheses for each track on each detection
hypotheses = self.generate_hypotheses(tracks, detections, timestamp, **kwargs)
# Ensure association probabilities are normalised
for track, hypothesis in hypotheses.items():
hypothesis.normalise_probabilities(total_weight=1)
return hypotheses
[docs]
class JPDA(DataAssociator):
r"""Joint Probabilistic Data Association (JPDA)
Given a set of Detections and a set of Tracks, each Detection has a
probability that it is associated with each specific Track. Rather than
associate specific Detections/Tracks, JPDA calculates the new state of a
Track based on its possible association with ALL Detections. The new
state is a Gaussian Mixture, reduced to a single Gaussian.
If
.. math::
prob_{association(Detection, Track)} <
\frac{prob_{association(MissedDetection, Track)}}{gate\ ratio}
then Detection is assumed to be outside Track's gate, and the probability
of association is dropped from the Gaussian Mixture. This calculation
takes place in the function :meth:`enumerate_JPDA_hypotheses`.
"""
hypothesiser: PDAHypothesiser = Property(
doc="Generate a set of hypotheses for each prediction-detection pair")
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
# Calculate MultipleHypothesis for each Track over all
# available Detections
hypotheses = self.generate_hypotheses(tracks, detections, timestamp, **kwargs)
# enumerate the Joint Hypotheses of track/detection associations
joint_hypotheses = \
self.enumerate_JPDA_hypotheses(tracks, hypotheses)
# Calculate MultiMeasurementHypothesis for each Track over all
# available Detections with probabilities drawn from JointHypotheses
new_hypotheses = dict()
for track in tracks:
single_measurement_hypotheses = list()
# record the MissedDetection hypothesis for this track
prob_misdetect = Probability.sum(
joint_hypothesis.probability
for joint_hypothesis in joint_hypotheses
if not joint_hypothesis.hypotheses[track].measurement)
single_measurement_hypotheses.append(
SingleProbabilityHypothesis(
hypotheses[track][0].prediction,
MissedDetection(timestamp=timestamp),
measurement_prediction=hypotheses[track][0].measurement_prediction,
probability=prob_misdetect))
# record hypothesis for any given Detection being associated with
# this track
for hypothesis in hypotheses[track]:
if not hypothesis:
continue
pro_detect_assoc = Probability.sum(
joint_hypothesis.probability
for joint_hypothesis in joint_hypotheses
if joint_hypothesis.hypotheses[track].measurement is hypothesis.measurement)
single_measurement_hypotheses.append(
SingleProbabilityHypothesis(
hypothesis.prediction,
hypothesis.measurement,
measurement_prediction=hypothesis.measurement_prediction,
probability=pro_detect_assoc))
result = MultipleHypothesis(single_measurement_hypotheses, True, 1)
new_hypotheses[track] = result
return new_hypotheses
@classmethod
def enumerate_JPDA_hypotheses(cls, tracks, multihypths):
joint_hypotheses = list()
if not tracks:
return joint_hypotheses
# perform a simple level of gating - all track/detection pairs for
# which the probability of association is a certain multiple less
# than the probability of missed detection - detection is outside the
# gating region, association is impossible
possible_assoc = list()
for track in tracks:
track_possible_assoc = list()
for hypothesis in multihypths[track]:
# Always include missed detection (gate ratio < 1)
track_possible_assoc.append(hypothesis)
possible_assoc.append(track_possible_assoc)
# enumerate all valid JPDA joint hypotheses
enum_JPDA_hypotheses = (
joint_hypothesis
for joint_hypothesis in itertools.product(*possible_assoc)
if cls.isvalid(joint_hypothesis))
# turn the valid JPDA joint hypotheses into 'JointHypothesis'
for joint_hypothesis in enum_JPDA_hypotheses:
local_hypotheses = {}
for track, hypothesis in zip(tracks, joint_hypothesis):
local_hypotheses[track] = \
multihypths[track][hypothesis.measurement]
joint_hypotheses.append(
ProbabilityJointHypothesis(local_hypotheses))
# normalize ProbabilityJointHypotheses relative to each other
sum_probabilities = Probability.sum(hypothesis.probability
for hypothesis in joint_hypotheses)
for hypothesis in joint_hypotheses:
hypothesis.probability /= sum_probabilities
return joint_hypotheses
@staticmethod
def isvalid(joint_hypothesis):
# 'joint_hypothesis' represents a valid joint hypothesis if
# no measurement is repeated (ignoring missed detections)
measurements = set()
for hypothesis in joint_hypothesis:
measurement = hypothesis.measurement
if not measurement:
pass
elif measurement in measurements:
return False
else:
measurements.add(measurement)
return True
[docs]
class JPDAwithLBP(JPDA):
""" Joint Probabilistic Data Association with Loopy Belief Propagation
This is a faster alternative of the standard :class:`~.JPDA` algorithm, which makes use of
Loopy Belief Propagation (LBP) to efficiently approximately compute the marginal association
probabilities of tracks to measurements. See Williams and Lau (2014) for further details.
Reference
----------
Jason L. Williams and Rosalyn A. Lau, Approximate evaluation of marginal association
probabilities with belief propagation, IEEE Transactions on Aerospace and Electronic Systems,
vol 50(4), pp. 2942-2959, 2014.
"""
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
"""Associate tracks and detections
Parameters
----------
tracks : set of :class:`stonesoup.types.track.Track`
Tracks which detections will be associated to.
detections : set of :class:`stonesoup.types.detection.Detection`
Detections to be associated to tracks.
timestamp : :class:`datetime.datetime`
Timestamp to be used for missed detections and to predict to.
Returns
-------
: mapping of :class:`stonesoup.types.track.Track` : :class:`stonesoup.types.hypothesis.Hypothesis`
Mapping of track to Hypothesis
""" # noqa: E501
# Calculate MultipleHypothesis for each Track over all available Detections
hypotheses = {
track: self.hypothesiser.hypothesise(track, detections, timestamp)
for track in tracks}
if not hypotheses or not detections: # No tracks or no detections
return hypotheses
else:
return self._compute_multi_hypotheses(tracks, detections, hypotheses, timestamp)
@staticmethod
def _calc_likelihood_matrix(tracks, detections, hypotheses):
""" Compute the likelihood matrix (i.e. single target association weights)
Parameters
----------
tracks: list of :class:`stonesoup.types.track.Track`
Current tracked objects
detections : list of :class:`stonesoup.types.detection.Detection`
Retrieved measurements
hypotheses: dict
Key value pairs of tracks with associated detections
Returns
-------
:class:`numpy.ndarray`
An indicator matrix of shape (num_tracks, num_detections + 1) indicating the possible
(aka. valid) associations between tracks and detections. The first column corresponds
to the null hypothesis (hence contains all ones).
:class:`numpy.ndarray`
A matrix of shape (num_tracks, num_detections + 1) containing the unnormalised
likelihoods for all combinations of tracks and detections. The first column corresponds
to the null hypothesis.
"""
# Construct validation and likelihood matrices
# Both matrices have shape (num_tracks, num_detections + 1), where the first column
# corresponds to the null hypothesis.
num_tracks, num_detections = len(tracks), len(detections)
likelihood_matrix = np.zeros((num_tracks, num_detections + 1))
for i, track in enumerate(tracks):
for hyp in hypotheses[track]:
if not hyp:
likelihood_matrix[i, 0] = hyp.weight
else:
j = next(d_i for d_i, detection in enumerate(detections)
if hyp.measurement is detection)
likelihood_matrix[i, j + 1] = hyp.weight
# change the normalisation of the likelihood matrix to have the no measurement
# association hypothesis normalised to unity
likelihood_matrix /= likelihood_matrix[:, [0]]
return likelihood_matrix.astype(float)
@staticmethod
def _loopy_belief_propagation(likelihood_matrix, n_iterations, delta):
"""
Perform loopy belief propagation (Williams and Lau, 2014) to determine the approximate
marginal association probabilities (of tracks to measurements). This requires:
1. likelihood_matrix = single target association weights
2. n_iterations = number of iterations between convergence checks
3. delta = deviation tolerance(of approximate weights from true weights)
"""
# number of tracks
num_tracks = likelihood_matrix.shape[0]
# number of measurements/detections
num_measurements = likelihood_matrix.shape[1] - 1
# initialise
iteration: int = 0
alpha: float = 1.0
d: float = 0.0
# allocate memory
nu = np.ones((num_tracks, num_measurements))
nu_tilde = np.zeros((num_tracks, num_measurements))
assoc_prob_matrix = np.zeros((num_tracks, num_measurements + 1))
# determine W_star
w_star: float = np.max(np.sum(likelihood_matrix[:, 1:], axis=1))
# loopy belief propagation
while iteration == 0 or (alpha * d) / (1 - alpha) >= 0.5 * np.log10(1 + delta):
for k in range(1, n_iterations + 1):
# increment the number of iterations
iteration += 1
# calculate L-R message
val = likelihood_matrix[:, 1:] * nu
# Minus val to remove j = j'
s = 1 + np.sum(val, axis=1, keepdims=True) - val
mu = likelihood_matrix[:, 1:] / s
# save values for convergence check
if k == n_iterations:
nu_tilde = nu.copy()
# calculate R-L messages
nu = 1 / (1 + np.sum(mu, axis=0, keepdims=True) - mu)
# check for convergence
d = np.max(np.abs(np.log10(nu / nu_tilde)))
# determine alpha
if d > 0:
alpha = np.log10((1 + w_star*d) / (1 + w_star))
alpha /= np.log10(d)
else:
alpha = 0.0
# if w_star has a very large value, alpha = 1 which causes division by zero in the
# convergence check therefore, set alpha to be a nominal value just short of unity
if alpha == 1:
alpha = (1 - 1e-10)
# calculate marginal probabilities (beliefs)
s = 1 + np.sum(likelihood_matrix[:, 1:] * nu, axis=1, keepdims=True)
assoc_prob_matrix[:, :1] = 1 / s
assoc_prob_matrix[:, 1:] = (likelihood_matrix[:, 1:] * nu) / s
# return the matrix of marginal association probabilities
return assoc_prob_matrix.astype(float)
@classmethod
def _compute_multi_hypotheses(cls, tracks, detections, hypotheses, time):
# Tracks and detections must be in a list, so we can keep track of their order
track_list = list(tracks)
detection_list = list(detections)
# calculate the single target association weights
likelihood_matrix = cls._calc_likelihood_matrix(track_list, detection_list, hypotheses)
# Run Loopy Belief Propagation to determine the marginal association probability matrix
n_iterations: int = 1
delta: float = 0.001
assoc_prob_matrix = cls._loopy_belief_propagation(likelihood_matrix, n_iterations, delta)
# Calculate MultiMeasurementHypothesis for each Track over all
# available Detections with probabilities drawn from the association matrix
new_hypotheses = dict()
for i, track in enumerate(track_list):
single_measurement_hypotheses = list()
# Null measurement hypothesis
null_hypothesis = next((hyp for hyp in hypotheses[track] if not hyp), None)
prob_misdetect = Probability(assoc_prob_matrix[i, 0])
single_measurement_hypotheses.append(
SingleProbabilityHypothesis(
null_hypothesis.prediction,
MissedDetection(timestamp=time),
measurement_prediction=null_hypothesis.measurement_prediction,
probability=prob_misdetect))
# True hypotheses
for hypothesis in hypotheses[track]:
if not hypothesis:
continue
# Get the detection index
j = next(d_i + 1 for d_i, detection in enumerate(detection_list)
if hypothesis.measurement is detection)
pro_detect_assoc = Probability(assoc_prob_matrix[i, j])
single_measurement_hypotheses.append(
SingleProbabilityHypothesis(
hypothesis.prediction,
hypothesis.measurement,
measurement_prediction=hypothesis.measurement_prediction,
probability=pro_detect_assoc))
new_hypotheses[track] = MultipleHypothesis(single_measurement_hypotheses, True, 1)
return new_hypotheses