{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Track Stitching Example\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\nTrack Stitching considers a set of broken fragments of track (which we call tracklets), and aims\nto identify which fragments should be stitched (joined) together to form one track. This is done\nby considering the state of a tracked object and predicting its state at a future (or past) time.\nThis example generates a set of ``tracklets`` and applies track stitching to them. The figure\nbelow visualises the aim of track stitching: taking a set of tracklets (left, black) and\nproducing a set of tracks (right, blue/red).\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitching Method\nIn a scenario where there are many sections of tracks that are all disconnected from each other,\nwe aim to stitch the track sections together into full tracks. We can use the known states of\ntracklets at known times to predict where the tracked object would be at a different time.\nWe can use this information to associate tracklets with each other using the following methods:\n\n### Predicting forward\nFor a given track section, consider the state at the end-point of the track $x$ at\nthe time $k$ that the observation was made. We use the state of the object to predict its\nstate at time $k + \\delta k$. If the state at the start point of\nanother track section falls within an acceptable range of this prediction, we associate the\ntracks and stitch them together. This method is used in the function :func:`forward_predict`.\n\n### Predicting backward\nSimilarly to predicting forward, we can consider the state at the start point of a track section\nat time $k$ and backwards-predict the state to time $k - \\delta k$. We can then\nassociate and stitch tracks together as before. This method is used in the function\n:func:`backward_predict`.\n\n### Using both predictions\nWe can use both methods simultaneously to calculate the probability that two track sections are\npart of the same track. The track stitcher in this example uses the :class:`~.KalmanPredictor`\nto make predictions about which tracklets should be stitched into the same track.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import Modules\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\nimport numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scenario Generation\nSet Variables for Scenario Generation\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nThe code below contains parameters used to generate input truth paths.\n\n``number_of_targets`` is the total number of truth paths generated in the initial simulation.\n\nThe starting location of each truth path is defined in the region (-``range_value``,\n``range_value``) in all dimensions.\n\nEach truth object is split into a number of segments chosen randomly from the range\n(1, ``max_segments``).\n\nThe start time of each truth path is bounded between $t$ = 0 and $t$ =\n``max_track_start``.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"start_time = datetime.now().replace(second=0, microsecond=0)\nnp.random.seed(100)\n\nnumber_of_targets = 10\nrange_value = 10000\nmax_segments = 10\nmax_segment_length = 125\nmin_segment_length = 60\nmax_disjoint_length = 250\nmin_disjoint_length = 125\nmax_track_start = 125\nn_spacial_dimensions = 3\nmeasurement_noise = 100\n\n# Set transition model:\n# ConstantVelocity = CV\n# KnownTurnRate = KTR\n\nTM = \"CV\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transition and Measurement Models\nThe code below sets transition and measurement models. It also checks that sets of track data are\nempty before the scenario is generated.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity, KnownTurnRate\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\n# Check all sets are empty\ntruths = set()\ntruthlets = set()\ntracklets = set()\nall_tracks = set()\n\n# Set transition model\nif TM == \"CV\":\n transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(1)] *\n n_spacial_dimensions, seed=12)\nelif TM == \"KTR\":\n transition_model = KnownTurnRate(turn_rate=np.radians(0.5), turn_noise_diff_coeffs=(0.1, 0.1))\n if n_spacial_dimensions != 2:\n print(\"KnownTurnRate model only works for 2 dimensions. Changing from {} \"\n \"dimensions to 2D.\".format(n_spacial_dimensions))\n n_spacial_dimensions = 2\nelse:\n raise TypeError(\"Must assign 'CV' or 'KTR' to TM\")\n\n# Variable calculations for measurement model\nmeasurement_cov_array = np.zeros((n_spacial_dimensions, n_spacial_dimensions), int)\nnp.fill_diagonal(measurement_cov_array, measurement_noise)\n\n# Set measurement model\nmeasurement_model = LinearGaussian(ndim_state=2 * n_spacial_dimensions,\n mapping=list(range(0, 2 * n_spacial_dimensions, 2)),\n noise_covar=measurement_cov_array)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truths and truthlets\nHere we generate a set of ground truths. We then break the truths into alternating sections of\ntruthlets (sections of 'known' state data) and disjoint sections (sections of no data). Note that\nno 'truth' data is used in track stitching - in this tutorial it is only used for generating\ntracklets and for evaluation of track stitching results.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import OrnsteinUhlenbeck\nfrom stonesoup.predictor.kalman import KalmanPredictor\nfrom stonesoup.updater.kalman import KalmanUpdater\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nfrom stonesoup.dataassociator.neighbour import GNNWith2DAssignment\nfrom stonesoup.deleter.error import CovarianceBasedDeleter\nfrom stonesoup.deleter.multi import CompositeDeleter\nfrom stonesoup.deleter.time import UpdateTimeStepsDeleter\nfrom stonesoup.initiator.simple import SimpleMeasurementInitiator\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.types.state import GaussianState\n\n# Parameters for tracker\npredictor = KalmanPredictor(transition_model)\nupdater = KalmanUpdater(measurement_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=30)\ndata_associator = GNNWith2DAssignment(hypothesiser)\ndeleter = CompositeDeleter([UpdateTimeStepsDeleter(50), CovarianceBasedDeleter(5000)])\ninitiator = SimpleMeasurementInitiator(\n prior_state=GaussianState(np.zeros((2 * n_spacial_dimensions, 1), int),\n np.diag([1, 0] * n_spacial_dimensions)),\n measurement_model=measurement_model)\nstate_vector = [np.random.uniform(-range_value, range_value, 1),\n np.random.uniform(-2, 2, 1)] * n_spacial_dimensions\n\n# Calculate start and end points for truthlets given the starting conditions\nfor i in range(number_of_targets):\n # Sets number of segments from range of random numbers\n number_of_segments = int(np.random.choice(range(1, max_segments), 1))\n\n # Set length of first truthlet segment\n truthlet0_length = np.random.choice(range(max_track_start), 1)\n\n # Set lengths of each of the truthlet segments\n truthlet_lengths = np.random.choice(range(min_segment_length, max_segment_length),\n number_of_segments)\n\n # Set lengths of each disjoint section\n disjoint_lengths = np.random.choice(range(min_disjoint_length, max_disjoint_length),\n number_of_segments)\n\n # Sum pairs of truthlets and disjoints, and set the start-point of the truth path\n segment_pair_lengths = np.insert(truthlet_lengths + disjoint_lengths, 0, truthlet0_length,\n axis=0)\n\n # Cumulative sum of segments, giving the start point of each truth segment\n truthlet_startpoints = np.cumsum(segment_pair_lengths)\n\n # Sum truth segments length to start point, giving end point for each segment\n truthlet_endpoints = truthlet_startpoints + np.append(truthlet_lengths, 0)\n\n # Set start and end points for each segment\n starts = truthlet_startpoints[:number_of_segments]\n stops = truthlet_endpoints[:number_of_segments]\n truth = GroundTruthPath([GroundTruthState(state_vector, timestamp=start_time)],\n id=i)\n for k in range(1, np.max(stops)):\n truth.append(GroundTruthState(\n transition_model.function(truth[k - 1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=truth[k - 1].timestamp + timedelta(seconds=1)))\n for j in range(number_of_segments):\n truthlet = GroundTruthPath(truth[starts[j]:stops[j]], id=str(\"G::\" + str(truth.id) +\n \"::S::\" + str(j) + \"::\"))\n truthlets.add(truthlet)\n truths.add(truth)\n\nprint(number_of_targets, \" targets required.\")\nprint(len(truths), \" truths have been generated.\")\nprint(len(truthlets), \" truthlets have been generated.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate a tracklet from each truthlet\nWe introduce measurement noise (as set in variables section) and generate a set of tracklets from\nthe set of truthlets.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.tracker.simple import MultiTargetTracker\nfrom stonesoup.types.detection import TrueDetection\n\n# Generate tracklets from truthlets calculated above\nfor n, truthlet in enumerate(truthlets):\n measurementlet = []\n for state in truthlet:\n m = measurement_model.function(state, noise=True)\n m0 = TrueDetection(m,\n timestamp=state.timestamp,\n measurement_model=measurement_model,\n groundtruth_path=truthlet)\n measurementlet.append((state.timestamp, {m0}))\n tracklet = MultiTargetTracker(initiator, deleter, measurementlet, data_associator, updater)\n for _, t in tracklet:\n all_tracks |= t\n\nprint(len(all_tracks), \" tracklets have been produced.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot the set of tracklets\nThe following plots present the generated tracks and the underlying ground truths.\nA 2D graph is plotted for each 2D plane in the N-D space.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter, Dimension\n\n# Plot graph for each 2D face in n-dimensional space\ndimensions_list = list(range(0, 2 * n_spacial_dimensions, 2))\ndim_pairs = [(a, b) for idx, a in enumerate(dimensions_list) for b in dimensions_list[idx + 1:]]\nfor pair in dim_pairs:\n plotter = Plotter()\n plotter.plot_ground_truths(truths, list(pair))\n plotter.plot_tracks(all_tracks, list(pair))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Plot 3D graph if working in 3-dimensional space\nif n_spacial_dimensions == 3:\n plotter = Plotter(Dimension.THREE)\n plotter.plot_ground_truths(truths, [0, 2, 4])\n plotter.plot_tracks(all_tracks, [0, 2, 4])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitcher Class\nThe cell below contains the track stitcher class. The functions :func:`forward_predict` and\n:func:`backward_predict` perform the forward and backward predictions respectively (as noted\nabove). If using forwards and backwards stitching, predictions from both methods are merged\ntogether. They calculate which pairs of tracks could possibly be stitched together. The function\n:func:`stitch` uses :func:`forward_predict` and :func:`backward_predict` to pair and 'stitch'\ntrack sections together.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.stitcher import TrackStitcher"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Applying the Track Stitcher\nNow that we have a set of tracklets, we can apply the Track Stitching method to stitch tracklets\ntogether into tracks. The code in the following cell applies this process using the\n:class:`~.TrackStitcher` and plots the stitched tracks. :class:`~.TrackStitcher` has a property\n'search_window' that reduces computation time by filtering out track segments that are outside a\ndefined time window. When forward stitching, the associator will consider any track that has a\nstart point that falls within the time window $(t, t + search\\_window)$. When backward\nstitching, the associator will consider tracks that have an endpoint within the time window\n$(t - search\\_window, t)$.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model = CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.001, 2e-2)] *\n n_spacial_dimensions, seed=12)\n\npredictor = KalmanPredictor(transition_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=300)\nstitcher = TrackStitcher(forward_hypothesiser=hypothesiser, search_window=timedelta(seconds=500))\n\nstitched_tracks, _ = stitcher.stitch(all_tracks, start_time)\n\nfor pair in dim_pairs:\n plotter = Plotter()\n plotter.plot_ground_truths(truths, list(pair))\n plotter.plot_tracks(stitched_tracks, list(pair))\n\nif n_spacial_dimensions == 3:\n plotter = Plotter(Dimension.THREE)\n plotter.plot_ground_truths(truths, [0, 2, 4])\n plotter.plot_tracks(stitched_tracks, [0, 2, 4])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Applying Metrics\nNow tracklets are stitched into tracks, we can compare the tracks to the ground truths. This can\nbe done by using SIAP metrics as well as a custom metric specialized for track stitching.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### % of tracklets stitched to the correct previous tracklet\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def stitcher_correctness(stitchedtracks):\n\n stitchedtracks = list(stitchedtracks)\n total, count = 0, 0\n\n for track in stitchedtracks: # loop through all stitched tracks\n\n for j, state in enumerate(track): # for every state of a stitched track\n\n if j == len(track) - 1:\n continue\n id1 = [int(s) for s in state.hypothesis.measurement.groundtruth_path.id.split('::')\n if s.isdigit()]\n id2 = [int(s) for s in\n track[j + 1].hypothesis.measurement.groundtruth_path.id.split('::') if\n s.isdigit()]\n if id1 != id2:\n total += 1\n if id1[0] == id2[0] and id1[1] == (id2[1] - 1):\n count += 1\n return count / total * 100\n\n\nprint(\"Tracklets stitched correctly: \", stitcher_correctness(stitched_tracks), \"%\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SIAP Metrics\nThe following cell calculates and records a range of SIAP (Single Integrated Air Picture) metrics\nto assess the accuracy of the stitcher. The value of math:`association_threshold` should be\nadjusted to represent the acceptable distance for association for the scenario that is being\nconsidered. For example, associating with a threshold of 50 metres may be acceptable if tracking a\nlarge ship, but not so useful for tracking biological cell movement.\n\nSIAP Ambiguity: Important as a value not equal to 1 suggests that the stitcher is not stitching\nwhole tracks together, or stitching multiple tracks into one.\n\nSIAP Completeness: Not a valuable metric for track stitching evaluation as we are only tracking\nfractions of the true objects - metric value is scaled by the ratio of truthlets to\ndisjoint sections.\n\nSIAP Rate of Track Number Change: Important metric for assessing track stitching. Any value above\nzero is showing that tracklets are being incorrectly stitched to tracklets from different truth\npaths.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.measures import Euclidean\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.dataassociator.tracktotrack import TrackToTruth\nfrom stonesoup.metricgenerator.manager import MultiManager\nfrom stonesoup.metricgenerator.metrictables import SIAPTableGenerator\n\nsiap_generator = SIAPMetrics(position_measure=Euclidean((0, 2)),\n velocity_measure=Euclidean((1, 3)),\n generator_name='SIAPs',\n tracks_key='tracks',\n truths_key='truths'\n )\n\nassociator = TrackToTruth(association_threshold=30)\n\n# create metric manager and add tracks and truths data to it\nmetric_manager = MultiManager([siap_generator],\n associator=associator)\nmetric_manager.add_data({'truths': truths, 'tracks': set(all_tracks)})\n\n# generate metrics and extract SIAP averages to display in SIAP table\nmetrics = metric_manager.generate_metrics()\nsiap_metrics = metrics['SIAPs']\nsiap_averages = {siap_metrics.get(metric) for metric in siap_metrics\n if metric.startswith(\"SIAP\") and not metric.endswith(\" at times\")}\n\n_ = SIAPTableGenerator(siap_averages).compute_metric()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}