Source code for stonesoup.dataassociator.tracktotrack

# -*- coding: utf-8 -*-
from operator import attrgetter

from .base import TrackToTrackAssociator
from ..base import Property
from ..measures import Measure, Euclidean
from ..types.association import AssociationSet, TimeRangeAssociation
from ..types.time import TimeRange


class TrackToTrack(TrackToTrackAssociator):
    """Track to track associator

    Compares two sets of :class:`~.tracks`, each formed of a sequence of
    :class:`~.State` objects and returns an :class:`~.Association` object for
    each time range over which the two :class:`~.State` within the
    :class:`~.tracks` are assessed to be associated.

    Associations are triggered by track states being within a threshold
    distance for :attr:`consec_pairs_confirm` consecutive common timestamps.
    Associations are terminated when either the two :class:`~.tracks` end or
    the two :class:`~.State` are separated by a distance greater than the
    threshold for :attr:`consec_misses_end` consecutive common timestamps.

    Note
    ----
    Association is not prioritised based on historic associations or distance.
    If, at a specific time step, the :class:`~.State` of one of the
    :class:`~.tracks` is assessed as close to more than one track then an
    :class:`~.Association` object will be returned for all possible
    association combinations.
    """

    association_threshold: float = Property(
        default=10,
        doc="Threshold distance measure which states must be within for an "
            "association to be recorded.Default is 10")
    consec_pairs_confirm: int = Property(
        default=3,
        doc="Number of consecutive time instances which track pairs are "
            "required to be within a specified threshold in order for an "
            "association to be formed. Default is 3")
    consec_misses_end: int = Property(
        default=2,
        doc="Number of consecutive time instances which track pairs are "
            "required to exceed a specified threshold in order for an "
            "association to be ended. Default is 2")
    measure: Measure = Property(
        default=Euclidean(),
        doc="Distance measure to use. Default :class:`~.measures.Euclidean()`")

    def associate_tracks(self, tracks_set_1, tracks_set_2):
        """Associate two sets of tracks together.

        Every track of ``tracks_set_1`` is compared with every track of
        ``tracks_set_2`` at the timestamps the pair have in common.

        Parameters
        ----------
        tracks_set_1 : list of :class:`~.Track` objects
            Tracks to associate to track set 2
        tracks_set_2 : list of :class:`~.Track` objects
            Tracks to associate to track set 1

        Returns
        -------
        AssociationSet
            Contains a set of :class:`~.TimeRangeAssociation` objects, one per
            ``(track1, track2)`` pair and time range over which the pair was
            assessed to be associated. The end of a range is the timestamp of
            the first miss of the run that terminated it, or the last common
            timestamp if the association was still open.
        """
        associations = set()

        for track2 in tracks_set_2:
            # Sets give O(1) membership tests in the filters below
            track2_timestamps = {state.timestamp for state in track2.states}

            for track1 in tracks_set_1:
                track1_states = sorted(
                    (state for state in track1
                     if state.timestamp in track2_timestamps),
                    key=attrgetter('timestamp'))
                track1_timestamps = {
                    state.timestamp for state in track1_states}
                track2_states = sorted(
                    (state for state in track2
                     if state.timestamp in track1_timestamps),
                    key=attrgetter('timestamp'))

                if not (track1_states and track2_states):
                    continue

                # At this point we should have two lists of states from
                # track1 and 2 only at the times that they both existed

                n_successful = 0
                n_unsuccessful = 0
                first_timestamp = None  # start of the current run of hits
                start_timestamp = None  # not None while an association is open
                end_timestamp = None

                # Loop through every state pair and form associations
                for state1, state2 in zip(track1_states, track2_states):
                    distance = self.measure(state1, state2)

                    if distance <= self.association_threshold:
                        n_successful += 1
                        n_unsuccessful = 0

                        if n_successful == 1:
                            first_timestamp = state1.timestamp
                        # Only open a new association if none is open: a
                        # short gap (fewer than consec_misses_end misses)
                        # must not move the start of an ongoing association
                        if n_successful == self.consec_pairs_confirm \
                                and start_timestamp is None:
                            start_timestamp = first_timestamp
                    else:
                        n_successful = 0
                        n_unsuccessful += 1

                        # Remember when the current run of misses began
                        if n_unsuccessful == 1:
                            end_timestamp = state1.timestamp
                        if n_unsuccessful >= self.consec_misses_end \
                                and start_timestamp is not None:
                            associations.add(TimeRangeAssociation(
                                (track1, track2),
                                TimeRange(start_timestamp, end_timestamp)))
                            start_timestamp = None

                # Close any open association at the last common timestamp
                if start_timestamp is not None:
                    end_timestamp = track1_states[-1].timestamp
                    associations.add(TimeRangeAssociation(
                        (track1, track2),
                        TimeRange(start_timestamp, end_timestamp)))

        return AssociationSet(associations)

class TrackToTruth(TrackToTrackAssociator):
    """Track to truth associator

    Compares two sets of :class:`~.tracks`, each formed of a sequence of
    :class:`~.State` objects and returns an :class:`~.Association` object for
    each time range over which the two :class:`~.State` within the
    :class:`~.tracks` are assessed to be associated.

    Tracks are considered to be associated with the Truth if the true
    :class:`~.State` is the closest to the track and within the specified
    distance for a specified number of time steps. Associations between Truth
    and Track are ended if the Truth is no longer the 'closest' to the track
    or the distance exceeds the specified threshold for a specified number of
    consecutive time steps. Associations will be ended by consec_misses_end
    before any new associations are considered even if
    consec_pairs_confirm < consec_misses_end

    Note
    ----
    Tracks can only be associated with one Truth (one-2-one relationship) at a
    given time step however a Truth track can be associated with multiple
    Tracks (one-2-many relationship).
    """

    association_threshold: float = Property(
        default=10,
        doc="Threshold distance measure which states must be within for an "
            "association to be recorded.Default is 10")
    consec_pairs_confirm: int = Property(
        default=3,
        doc="Number of consecutive time instances which track-truth pairs are "
            "required to be within a specified threshold in order for an "
            "association to be formed. Default is 3")
    consec_misses_end: int = Property(
        default=2,
        doc="Number of consecutive time instances which track-truth pairs are "
            "required to exceed a specified threshold in order for an "
            "association to be ended. Default is 2")
    measure: Measure = Property(
        default=Euclidean(),
        doc="Distance measure to use. Default :class:`~.measures.Euclidean()`")

    def associate_tracks(self, tracks_set, truth_set):
        """Associate Tracks

        Method compares two sets of :class:`~.Track` objects and will
        determine associations between the two sets.

        Parameters
        ----------
        tracks_set : list of :class:`~.Track` objects
            Tracks to associate to truth
        truth_set : list of :class:`~.Track` objects
            Truth to associate to tracks

        Returns
        -------
        AssociationSet
            Contains a set of :class:`~.TimeRangeAssociation` objects, one per
            ``(track, truth)`` pair and time range over which the truth was
            assessed to be associated to the track.
        """
        associations = set()

        for track in tracks_set:

            current_truth = None
            potential_truth = None
            n_potential_successes = 0
            n_failures = 0
            potential_start_timestep = None
            start_timestamp = None
            end_timestamp = None

            for track_state in track:

                # Find the closest truth within the threshold at this time
                min_dist = None
                min_truth = None

                for truth in truth_set:
                    try:
                        truth_state = truth[track_state.timestamp]
                    except IndexError:
                        # presumably the truth has no state at this
                        # timestamp; it is skipped for this time step
                        continue

                    distance = self.measure(track_state, truth_state)
                    # Compare against None explicitly: a distance of exactly
                    # 0 is a valid (best possible) minimum, not "unset"
                    if min_dist is None:
                        if distance < self.association_threshold:
                            min_dist = distance
                            min_truth = truth
                    elif distance < min_dist:
                        min_dist = distance
                        min_truth = truth

                # If there is not a truth track currently
                # considered to be associated to the track
                if current_truth is None:
                    # If no truth associated then there's nothing to consider
                    if min_truth is None:
                        n_potential_successes = 0
                        potential_truth = None
                        potential_start_timestep = None
                    # If the latest closest truth is not being assessed
                    # as the likely truth make it so
                    elif potential_truth is not min_truth:
                        potential_truth = min_truth
                        n_potential_successes = 1
                        potential_start_timestep = track_state.timestamp
                    # Otherwise increase the number of times
                    # this truth appears in a row
                    else:
                        n_potential_successes += 1

                    # If the threshold of continuous
                    # similar matches has been made
                    if n_potential_successes >= self.consec_pairs_confirm:
                        current_truth = min_truth
                        start_timestamp = potential_start_timestep
                        end_timestamp = track_state.timestamp
                        potential_start_timestep = None
                        potential_truth = None
                        n_potential_successes = 0

                # Otherwise if there is a track currently
                # considered as the association
                else:
                    # If the closest track this time is the same
                    # update the end time (time of last association)
                    if min_truth == current_truth:
                        n_failures = 0
                        end_timestamp = track_state.timestamp
                    # Otherwise record the failed match and how
                    # many times it's been the same different
                    # potential track in a row
                    else:
                        n_failures += 1
                        if min_truth is not None \
                                and min_truth is potential_truth:
                            n_potential_successes += 1
                        else:
                            potential_truth = min_truth
                            potential_start_timestep = track_state.timestamp
                            n_potential_successes = 1

                    # If there have been enough failed matches
                    # in a row end the association and record
                    if n_failures >= self.consec_misses_end:
                        associations.add(TimeRangeAssociation(
                            (track, current_truth),
                            TimeRange(start_timestamp, end_timestamp)))
                        # Misses are counted per association: start afresh so
                        # a stale count cannot end the next one prematurely
                        n_failures = 0

                        # If the current potential association
                        # is strong enough to confirm then do so
                        if n_potential_successes >= self.consec_pairs_confirm:
                            current_truth = potential_truth
                            start_timestamp = potential_start_timestep
                            end_timestamp = track_state.timestamp
                        else:
                            # Otherwise wait for a new
                            # association to be good enough
                            current_truth = None
                            start_timestamp = None
                            end_timestamp = None

            # Close any open associations when the track ends
            if current_truth is not None:
                associations.add(TimeRangeAssociation(
                    (track, current_truth),
                    TimeRange(start_timestamp, end_timestamp)))

        return AssociationSet(associations)