Source code for stonesoup.dataassociator.tracktotrack

from operator import attrgetter

from ordered_set import OrderedSet

from ..base import Property
from ..measures import Euclidean, EuclideanWeighted, Measure
from ..measures.base import TrackMeasure
from ..types.association import Association, AssociationSet, TimeRangeAssociation
from ..types.groundtruth import GroundTruthPath
from ..types.time import TimeRange
from ..types.track import Track
from ._assignment import multidimensional_deconfliction
from .base import TwoTrackToTrackAssociator
from .general import OneToOneAssociator


[docs] class TrackToTrackCounting(TwoTrackToTrackAssociator): """Track to track associator based on the Counting Technique Compares two sets of :class:`~.tracks`, each formed of a sequence of :class:`~.State` objects and returns an :class:`~.Association` object for each time at which the two :class:`~.State` within the :class:`~.tracks` are assessed to be associated. Uses an algorithm called the Counting Technique [1]_. Associations are triggered by track states being within a threshold distance for a given number of timestamps. Associations are terminated when either the two :class:`~.tracks` end or the two :class:`~.State` are separated by a distance greater than the threshold at the next time step. References ---------- .. [1] J. Å. Sagild, A. Gullikstad Hem and E. F. Brekke, "Counting Technique versus Single-Time Test for Track-to-Track Association," 2021 IEEE 24th International Conference on Information Fusion (FUSION), 2021, pp. 1-7 Note ---- Association is not prioritised based on historic associations or distance. If, at a specific time step, the :class:`~.State` of one of the :class:`~.tracks` is assessed as close to more than one track then an :class:`~.Association` object will be return for all possible association combinations. """ association_threshold: float = Property( doc="Threshold distance measure which states must be within for an " "association to be recorded") consec_pairs_confirm: int = Property( default=3, doc="Number of consecutive time instances which track pairs are " "required to be within a specified threshold in order for an " "association to be formed. Default is 3") consec_misses_end: int = Property( default=2, doc="Number of consecutive time instances which track pairs are " "required to exceed a specified threshold in order for an " "association to be ended. Default is 2") measure: Measure = Property( default=None, doc="Distance measure to use. Must use :class:`~.measures.EuclideanWeighted()` if " "`use_positional_only` set to True. Default is " ":class:`~.measures.EuclideanWeighted()` using :attr:`use_positional_only` " "and :attr:`pos_map`. Note if neither are provided this is equivalent to a " "standard Euclidean") pos_map: list = Property( default=None, doc="List of items specifying the mapping of the position components " "of the state space for :attr:`tracks_set_1`. " "Defaults to whole :class:`~.array.StateVector()`, but must be provided whenever " ":attr:`use_positional_only` is set to True") use_positional_only: bool = Property( default=True, doc="If `True`, the differences in velocity/acceleration values for each state are " "ignored in the calculation for the association threshold. Default is `True`" ) position_weighting: float = Property( default=0.6, doc="If :attr:`use_positional_only` is set to False, this decides how much to weight " "position components compared to others (such as velocity). " "Default is 0.6" ) one_to_one: bool = Property( default=False, doc="If True, it is ensured no two associations ever contain the same track " "at the same time" )
[docs] def associate_tracks(self, tracks_set_1: set[Track], tracks_set_2: set[Track]): """Associate two sets of tracks together. Parameters ---------- tracks_set_1 : set of :class:`~.Track` objects Tracks to associate to track set 2 tracks_set_2 : set of :class:`~.Track` objects Tracks to associate to track set 1 Returns ------- AssociationSet Contains a set of :class:`~.Association` objects """ if self.position_weighting > 1 or self.position_weighting < 0: raise ValueError("Position weighting must be between 0 and 1") if not self.pos_map and self.use_positional_only: raise ValueError("Must provide mapping of position components to pos_map") if not self.measure: state1 = next(iter(tracks_set_1))[0] total = len(state1.state_vector) if not self.pos_map: self.pos_map = [i for i in range(total)] pos_map_len = len(self.pos_map) if not self.use_positional_only and total - pos_map_len > 0: v_weight = (1 - self.position_weighting) / (total - pos_map_len) p_weight = self.position_weighting / pos_map_len else: p_weight = 1 / pos_map_len v_weight = 0 weights = [p_weight if i in self.pos_map else v_weight for i in range(total)] self.measure = EuclideanWeighted(weighting=weights) associations = set() for track2 in tracks_set_2: truth_timestamps = [state.timestamp for state in track2.states] for track1 in tracks_set_1: track1_states = sorted( (state for state in track1 if state.timestamp in truth_timestamps), key=attrgetter('timestamp')) track_timestamps = [state.timestamp for state in track1_states] track2_states = sorted( (state for state in track2 if state.timestamp in track_timestamps), key=attrgetter('timestamp')) if not (track1_states and track2_states): continue # At this point we should have two lists of states from # track1 and 2 only at the times that they both existed n_successful = 0 n_unsuccessful = 0 start_timestamp = None end_timestamp = None # Loop through every detection pair and form associations for state1, state2 in zip(track1_states, track2_states): distance = self.measure(state1, state2) if distance <= self.association_threshold: n_successful += 1 n_unsuccessful = 0 if n_successful == 1: first_timestamp = state1.timestamp if n_successful == self.consec_pairs_confirm: start_timestamp = first_timestamp else: n_successful = 0 n_unsuccessful += 1 if n_unsuccessful == 1: end_timestamp = state1.timestamp if n_unsuccessful >= self.consec_misses_end and \ start_timestamp: associations.add(TimeRangeAssociation( OrderedSet((track1, track2)), TimeRange(start_timestamp, end_timestamp))) start_timestamp = None # close any open associations if start_timestamp: end_timestamp = track1_states[-1].timestamp associations.add(TimeRangeAssociation( OrderedSet((track1, track2)), TimeRange(start_timestamp, end_timestamp))) if self.one_to_one: return multidimensional_deconfliction(AssociationSet(associations)) else: return AssociationSet(associations)
[docs] class TrackToTruth(TwoTrackToTrackAssociator): """Track to truth associator Compares two sets of :class:`~.Track`, each formed of a sequence of :class:`~.State` objects and returns an :class:`~.Association` object for each time at which two :class:`~.State` objects within the :class:`~.Track` are assessed to be associated. Tracks are considered to be associated with the Truth if the true :class:`~.State` is the closest to the track and within the specified distance for a specified number of time steps. Associations between Truth and Track if the Truth is no longer the 'closest' to the track or the distance exceeds the specified threshold for a specified number of consecutive time steps. Associates will be ended by consec_misses_end before any new associations are considered even if consec_pairs_confirm < consec_misses_end Note ---- Tracks can only be associated with one Truth (one-2-one relationship) at a given time step however a Truth track can be associated with multiple Tracks (one-2-many relationship). """ association_threshold: float = Property( doc="Threshold distance measure which states must be within for an " "association to be recorded") consec_pairs_confirm: int = Property( default=3, doc="Number of consecutive time instances which track-truth pairs are " "required to be within a specified threshold in order for an " "association to be formed. Default is 3") consec_misses_end: int = Property( default=2, doc="Number of consecutive time instances which track-truth pairs are " "required to exceed a specified threshold in order for an " "association to be ended. Default is 2") measure: Measure = Property( default=Euclidean(), doc="Distance measure to use. Default :class:`~.measures.Euclidean()`")
[docs] def associate_tracks(self, tracks_set: set[Track], truth_set: set[GroundTruthPath]): """Associate Tracks Method compares to sets of :class:`~.Track` objects and will determine associations between the two sets. Parameters ---------- tracks_set : set of :class:`~.Track` objects Tracks to associate to truth truth_set : set of :class:`~.GroundTruthPath` objects Truth to associate to tracks Returns ------- AssociationSet Contains a set of :class:`~.Association` objects """ associations = set() # Remove tracks and truths with zero length tracks_set = {track for track in tracks_set if len(track) > 0} truth_set = {truth for truth in truth_set if len(truth) > 0} for track in tracks_set: current_truth = None potential_truth = None n_potential_successes = 0 n_failures = 0 potential_start_timestep = None start_timestamp = None end_timestamp = None truth_state_iters = {truth: GroundTruthPath.last_timestamp_generator(truth) for truth in truth_set} truth_states = {truth: next(truth_state_iter) for truth, truth_state_iter in truth_state_iters.items()} for track_state in Track.last_timestamp_generator(track): min_dist = self.association_threshold min_truth = None for truth in truth_set: if truth[0].timestamp > track_state.timestamp \ or truth[-1].timestamp < track_state.timestamp: continue while truth_states[truth].timestamp < track_state.timestamp: truth_states[truth] = next(truth_state_iters[truth]) truth_state = truth_states[truth] if truth_state.timestamp != track_state.timestamp: continue distance = self.measure(track_state, truth_state) if distance < min_dist: min_dist = distance min_truth = truth # If there is not a truth track currently # considered to be associated to the track if not current_truth: # If no truth associated then there's nothing to consider if min_truth is None: n_potential_successes = 0 potential_truth = None potential_start_timestep = None # If the latest closest truth is not being assessed # as the likely truth make it so elif potential_truth is not min_truth: potential_truth = min_truth n_potential_successes = 1 potential_start_timestep = track_state.timestamp # Otherwise increase the number of times # this truth appears in a row else: n_potential_successes += 1 # If the threshold of continuous # similar matches has been made if n_potential_successes >= self.consec_pairs_confirm: current_truth = min_truth start_timestamp = potential_start_timestep end_timestamp = track_state.timestamp potential_start_timestep = None potential_truth = None n_potential_successes = 0 # Otherwise if there is a track currently # considered as the association else: # If the closest track this time is the same # update the end time (time of last association) if min_truth == current_truth: n_failures = 0 end_timestamp = track_state.timestamp # Otherwise record the failed match and how # many times it's been the same different # potential track in a row else: n_failures += 1 if min_truth and min_truth is potential_truth: n_potential_successes += 1 else: potential_truth = min_truth potential_start_timestep = track_state.timestamp n_potential_successes = 1 # If there have been enough failed matches # in a row end the association and record if n_failures >= self.consec_misses_end: associations.add(TimeRangeAssociation( OrderedSet((track, current_truth)), TimeRange(start_timestamp, end_timestamp))) # If the current potential association # is strong enough to confirm then do so if n_potential_successes >= self.consec_pairs_confirm: current_truth = potential_truth start_timestamp = potential_start_timestep end_timestamp = track_state.timestamp else: # Otherwise wait for a new # association to be good enough current_truth = None start_timestamp = None end_timestamp = None # Close any open associations when the track ends if current_truth: associations.add(TimeRangeAssociation( OrderedSet((track, current_truth)), TimeRange(start_timestamp, end_timestamp))) return AssociationSet(associations)
[docs] class TrackIDbased(TwoTrackToTrackAssociator): """Track ID based associator Compares a set of :class:`~.Track` objects to a set of :class:`~.GroundTruth` objects, each formed of a sequence of :class:`~.State` objects and returns an :class:`~.Association` object for each time at which two :class:`~.State` objects within the :class:`~.Track` and :class:`~.GroundTruthPath` are assessed to be associated. Tracks are considered to be associated with the Ground Truth if the ID of the Track is the same as the ID of the Ground Truth. """
[docs] def associate_tracks(self, tracks_set, truths_set): """Associate two sets of tracks together. Parameters ---------- tracks_set : list of :class:`~.Track` objects Tracks to associate to ground truths set truths_set: list of :class:`~.GroundTruthPath` objects Ground truths to associate to tracks set Returns ------- AssociationSet Contains a set of :class:`~.Association` objects """ associations = set() for track in tracks_set: for truth in truths_set: if track.id == truth.id: try: associations.add( TimeRangeAssociation(OrderedSet((track, truth)), TimeRange(max(track[0].timestamp, truth[0].timestamp), min(track[-1].timestamp, truth[-1].timestamp)))) except (TypeError, ValueError): # A timestamp is None, or non-overlapping timestamps (start > end) associations.add(Association(OrderedSet((track, truth)))) return AssociationSet(associations)
[docs] class OneToOneTrackAssociator(TwoTrackToTrackAssociator, OneToOneAssociator): """ Uses the :class:`~.OneToOneAssociator` to associate tracks together """ measure: TrackMeasure = Property()
[docs] def associate_tracks(self, *tracks_sets: set[Track]) -> AssociationSet: if len(tracks_sets) != 2: # Should have two sets of tracks raise ValueError("There should be two sources of tracks to compare") tracks_a, tracks_b = tracks_sets associated_tracks, _, _ = self.associate(tracks_a, tracks_b) return associated_tracks