# Source code for stonesoup.dataassociator.neighbour

```
import itertools
import numpy as np
from scipy.optimize import linear_sum_assignment
from .base import DataAssociator
from ..base import Property
from ..hypothesiser import Hypothesiser
from ..types.hypothesis import SingleHypothesis, JointHypothesis, \
ProbabilityHypothesis
[docs]
class NearestNeighbour(DataAssociator):
"""Nearest Neighbour Associator
Scores and associates detections to a predicted state using the Nearest
Neighbour method.
"""
hypothesiser: Hypothesiser = Property(
doc="Generate a set of hypotheses for each prediction-detection pair")
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
# Generate a set of hypotheses for each track on each detection
hypotheses = self.generate_hypotheses(tracks, detections, timestamp, **kwargs)
# Only associate tracks with one or more hypotheses
associate_tracks = {track
for track, track_hypotheses in hypotheses.items()
if track_hypotheses}
associations = {}
associated_measurements = set()
while associate_tracks > associations.keys():
# Define a 'greedy' association
best_hypothesis = None
for track in associate_tracks - associations.keys():
for hypothesis in hypotheses[track]:
# A measurement may only be associated with a single track
if hypothesis.measurement in associated_measurements:
continue
# best_hypothesis is 'greater than' other
if (best_hypothesis is None
or hypothesis > best_hypothesis):
best_hypothesis = hypothesis
best_hypothesis_track = track
associations[best_hypothesis_track] = best_hypothesis
if best_hypothesis:
associated_measurements.add(best_hypothesis.measurement)
return associations
[docs]
class GlobalNearestNeighbour(DataAssociator):
"""Global Nearest Neighbour Associator
Scores and associates detections to a predicted state using the Global
Nearest Neighbour method, assuming a distance-based hypothesis score.
"""
hypothesiser: Hypothesiser = Property(
doc="Generate a set of hypotheses for each prediction-detection pair")
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
# Generate a set of hypotheses for each track on each detection
hypotheses = self.generate_hypotheses(tracks, detections, timestamp, **kwargs)
# Link hypotheses into a set of joint_hypotheses and evaluate
joint_hypotheses = self.enumerate_joint_hypotheses(hypotheses)
associations = max(joint_hypotheses)
return associations
[docs]
@staticmethod
def isvalid(joint_hypothesis):
"""Determine whether a joint_hypothesis is valid.
Check the set of hypotheses that define a joint hypothesis to ensure a
single detection is not associated to more than one track.
Parameters
----------
joint_hypothesis : :class:`JointHypothesis`
A set of hypotheses linking each prediction to a single detection
Returns
-------
bool
Whether joint_hypothesis is a valid set of hypotheses
"""
number_hypotheses = len(joint_hypothesis)
unique_hypotheses = len(
{hyp.measurement for hyp in joint_hypothesis if hyp})
number_null_hypotheses = sum(not hyp for hyp in joint_hypothesis)
# joint_hypothesis is invalid if one detection is assigned to more than
# one prediction. Multiple missed detections are valid.
if unique_hypotheses + number_null_hypotheses == number_hypotheses:
return True
else:
return False
[docs]
@classmethod
def enumerate_joint_hypotheses(cls, hypotheses):
"""Enumerate the possible joint hypotheses.
Create a list of all possible joint hypotheses from the individual
hypotheses and determine whether each is valid.
Parameters
----------
hypotheses : dict of :class:`~.Track`: :class:`~.Hypothesis`
A list of all hypotheses linking predictions to detections,
including missed detections
Returns
-------
joint_hypotheses : list of :class:`JointHypothesis`
A list of all valid joint hypotheses with a score on each
"""
# Create a list of dictionaries of valid track-hypothesis pairs
joint_hypotheses = [
JointHypothesis({
track: hypothesis
for track, hypothesis in zip(hypotheses, joint_hypothesis)})
for joint_hypothesis in itertools.product(*hypotheses.values())
if cls.isvalid(joint_hypothesis)]
return joint_hypotheses
[docs]
class GNNWith2DAssignment(DataAssociator):
"""Global Nearest Neighbour Associator
Associates detections to a predicted state using the
Global Nearest Neighbour method, utilising a 2D matrix of
distances and a "shortest path" assignment algorithm.
"""
hypothesiser: Hypothesiser = Property(
doc="Generate a set of hypotheses for each prediction-detection pair")
[docs]
def associate(self, tracks, detections, timestamp, **kwargs):
"""Associate a set of detections with predicted states.
Parameters
----------
tracks : set of :class:`Track`
Current tracked objects
detections : set of :class:`Detection`
Retrieved measurements
timestamp : datetime.datetime
Detection time to predict to
Returns
-------
dict
Key value pair of tracks with associated detection
"""
# Generate a set of hypotheses for each track on each detection
hypotheses = self.generate_hypotheses(tracks, detections, timestamp, **kwargs)
# Create dictionary for associations
associations = {}
# Extract detected tracks
detected_tracks = [track
for track, track_hypotheses in hypotheses.items()
if any(track_hypotheses)]
# Store associations for undetected/missed tracks
# NOTE: It is assumed that if a track is undetected/missed, then it
# will only have a single missed detection hypothesis
for track in hypotheses.keys() - set(detected_tracks):
if hypotheses[track]:
associations[track] = hypotheses[track][0]
# No need to perform data association if all tracks are missed
if not detected_tracks:
return associations
# Convert sets to indexable lists
detections = list(detections)
# Generate 2d array "matrix" of hypotheses mapping track to detection
hypothesis_matrix = np.empty(
(len(detected_tracks), len(detections) + len(detected_tracks)),
SingleHypothesis)
for i, track in enumerate(detected_tracks):
row = np.empty(
(hypothesis_matrix.shape[1]), SingleHypothesis)
for hypothesis in hypotheses[track]:
if not hypothesis:
row[len(detections) + i] = hypothesis
else:
row[detections.index(hypothesis.measurement)] = hypothesis
hypothesis_matrix[i] = row
# Determine type of hypothesis used, probability or distance
# Probability is maximise problem, distance is minimise problem
# Mixed hypotheses cannot be computed at this time
hypothesis_types = {
isinstance(hypothesis, ProbabilityHypothesis)
for row in hypothesis_matrix for hypothesis in row
if hypothesis is not None}
if len(hypothesis_types) > 1:
raise RuntimeError(
"2d assignment does not support mixed hypothesis types")
probability_flag = hypothesis_types.pop()
# Generate 2d array "matrix" of distances
# Use probabilities instead for probability based hypotheses
distance_matrix = np.empty(hypothesis_matrix.shape)
for x in range(hypothesis_matrix.shape[0]):
for y in range(hypothesis_matrix.shape[1]):
if hypothesis_matrix[x][y] is None:
distance_matrix[x][y] = -np.inf if probability_flag \
else np.inf
else:
if probability_flag:
distance_matrix[x][y] = \
hypothesis_matrix[x][y].probability
else:
distance_matrix[x][y] = \
hypothesis_matrix[x][y].distance
# Use "shortest path" assignment algorithm on distance matrix
# to assign tracks to nearest detection
# Maximise flag = true for probability instance
# (converts minimisation problem to maximisation problem)
try:
row4col, col4row = linear_sum_assignment(distance_matrix, probability_flag)
except ValueError:
raise RuntimeError("Assignment was not feasible")
# Generate dict of key/value pairs
for j, track in enumerate(detected_tracks):
associations[track] = hypothesis_matrix[j][col4row[j]]
return associations
```