{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Track Stitching Example\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\nTrack stitching considers a set of broken fragments of track (which we call tracklets) and aims\nto identify which fragments should be stitched (joined) together to form one track. This is done\nby considering the state of a tracked object and predicting its state at a future (or past) time.\nThis example generates a set of tracklets before applying track stitching. The figure below\nvisualizes the aim of track stitching: taking a set of tracklets (left, black) and producing a set\nof tracks (right, blue/red).\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*(Figure: a set of tracklets (left, black) stitched into a set of tracks (right, blue/red).)*\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitching Method\nConsider the following scenario: we have a number of track sections that are all disconnected\nfrom each other. We aim to stitch the track sections together into full tracks. We can use the\nknown states of tracklets at known times to predict where the tracked object would be at a\ndifferent time, and use this information to associate tracklets with each other. Methods of\nassociating tracklets are explained below.\n\n### Predicting forward\nFor a given track section, we consider the state at the end-point of the track, say state\n$x$ at the time that the observation was made, call this time $k$. We use the state of\nthe object to predict the state at time $k + \\delta k$. If the state at the start point of\nanother track section falls within an acceptable range of this prediction, we may associate the\ntracks and stitch them together. This method is used in the function `forward_predict`.\n\n### Predicting backward\nSimilarly to predicting forward, we can consider the state at the start point of a track section,\ncall this time $k$, and predict what the state would have been at time $k - \\delta k$.\nWe can then associate and stitch tracks together as before. This method is used in the function\n`backward_predict`.\n\n### Using both predictions\nWe can use both methods at the same time to calculate the probability that two track sections are\npart of the same track. The track stitcher in this example uses the `KalmanPredictor` to make\npredictions about which tracklets should be stitched into the same track.\n\n"
]
},
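{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the forward prediction concrete, here is a minimal, hypothetical sketch (not the Stone Soup implementation): a tracklet is reduced to a tuple `(start_time, end_time, start_position, velocity)` in one dimension, its end state is predicted forward under a constant-velocity assumption, and a candidate tracklet's start state is gated against that prediction. The tuple format and the `gate` value are illustrative assumptions.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hypothetical sketch of forward association, not the Stone Soup implementation.\n# A 'tracklet' here is (start_time, end_time, start_position, velocity) in 1D.\n\n\ndef predict_forward(position, velocity, delta_k):\n    # Constant-velocity prediction of position delta_k seconds ahead\n    return position + velocity * delta_k\n\n\ndef can_stitch(tracklet_a, tracklet_b, gate=50.0):\n    # Associate if b starts after a ends and a's forward-predicted position\n    # falls within 'gate' of b's start position\n    a_start, a_end, a_pos0, a_vel = tracklet_a\n    b_start, _b_end, b_pos0, _b_vel = tracklet_b\n    if b_start <= a_end:\n        return False\n    a_end_pos = a_pos0 + a_vel * (a_end - a_start)\n    predicted = predict_forward(a_end_pos, a_vel, b_start - a_end)\n    return abs(predicted - b_pos0) <= gate\n\n\nprint(can_stitch((0, 10, 50.0, 5.0), (20, 30, 150.0, 5.0)))  # True"
]
},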
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import Modules\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\nimport numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scenario Generation\n### Set Variables for Scenario Generation\nThe code below contains parameters used to generate input truth paths.\n\nThe `number_of_targets` is the total number of truth paths generated in the initial simulation.\n\nThe starting location of each truth path is defined in the region (-`range_value`, `range_value`)\nin all dimensions.\n\nEach truth object is split into a number of segments chosen randomly from the range\n(1, `max_segments`).\n\nThe minimum and maximum lengths of segments are set by `min_segment_length` and\n`max_segment_length`, respectively.\n\nSimilarly, the length of disjoint sections can be bounded by `min_disjoint_length` and\n`max_disjoint_length`.\n\nThe start time of each truth path is bounded between $t$ = 0 and $t$ =\n`max_track_start`.\n\nThe simulation will run for any number of spatial dimensions, given by `n_spacial_dimensions`.\n\nFinally, the transition model is chosen by setting `TM` to either \"CV\" or \"KTR\", as indicated in\nthe comments in the code below.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"start_time = datetime.now().replace(second=0, microsecond=0)\nnp.random.seed(100)\n\nnumber_of_targets = 10\nrange_value = 10000\nmax_segments = 10\nmax_segment_length = 125\nmin_segment_length = 60\nmax_disjoint_length = 250\nmin_disjoint_length = 125\nmax_track_start = 125\nn_spacial_dimensions = 3\nmeasurement_noise = 100\n\n# Set transition model:\n# ConstantVelocity = CV\n# KnownTurnRate = KTR\n\nTM = \"CV\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transition and Measurement Models\nThe code below sets the transition and measurement models. It also initialises the empty sets of\ntrack data that will be populated when the scenario is generated.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n    ConstantVelocity, KnownTurnRate\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\n# Initialise empty sets of track data\ntruths = set()\ntruthlets = set()\ntracklets = set()\nall_tracks = set()\n\n# Set transition model\nif TM == \"CV\":\n    transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(1)] *\n                                                             n_spacial_dimensions, seed=12)\nelif TM == \"KTR\":\n    transition_model = KnownTurnRate(turn_rate=np.radians(0.5), turn_noise_diff_coeffs=(0.1, 0.1))\n    if n_spacial_dimensions != 2:\n        print(\"KnownTurnRate model only works for 2 dimensions. Changing from {} \"\n              \"dimensions to 2D.\".format(n_spacial_dimensions))\n        n_spacial_dimensions = 2\nelse:\n    raise ValueError(\"Must assign 'CV' or 'KTR' to TM\")\n\n# Variable calculations for measurement model\nmeasurement_cov_array = np.zeros((n_spacial_dimensions, n_spacial_dimensions), int)\nnp.fill_diagonal(measurement_cov_array, measurement_noise)\n\n# Set measurement model\nmeasurement_model = LinearGaussian(ndim_state=2 * n_spacial_dimensions,\n                                   mapping=list(range(0, 2 * n_spacial_dimensions, 2)),\n                                   noise_covar=measurement_cov_array)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truths and truthlets\nHere we generate a set of ground truths. We then break the truths into alternating sections of\ntruthlets (sections of 'known' state data) and disjoint sections (sections of no data). Note that\nno 'truth' data is used in track stitching; in this tutorial it is only used for generating\ntracklets and for evaluating track stitching results.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import OrnsteinUhlenbeck\nfrom stonesoup.predictor.kalman import KalmanPredictor\nfrom stonesoup.updater.kalman import KalmanUpdater\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nfrom stonesoup.dataassociator.neighbour import GNNWith2DAssignment\nfrom stonesoup.deleter.error import CovarianceBasedDeleter\nfrom stonesoup.deleter.multi import CompositeDeleter\nfrom stonesoup.deleter.time import UpdateTimeStepsDeleter\nfrom stonesoup.initiator.simple import SimpleMeasurementInitiator\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.types.state import GaussianState\n\n# Parameters for tracker\npredictor = KalmanPredictor(transition_model)\nupdater = KalmanUpdater(measurement_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=30)\ndata_associator = GNNWith2DAssignment(hypothesiser)\ndeleter = CompositeDeleter([UpdateTimeStepsDeleter(50), CovarianceBasedDeleter(5000)])\ninitiator = SimpleMeasurementInitiator(\n    prior_state=GaussianState(np.zeros((2 * n_spacial_dimensions, 1), int),\n                              np.diag([1, 0] * n_spacial_dimensions)),\n    measurement_model=measurement_model)\n\n# Calculate start and end points for truthlets given the starting conditions\nfor i in range(number_of_targets):\n    # Set the starting state of this truth path\n    state_vector = [np.random.uniform(-range_value, range_value, 1),\n                    np.random.uniform(-2, 2, 1)] * n_spacial_dimensions\n\n    # Sets number of segments from range of random numbers\n    number_of_segments = int(np.random.choice(range(1, max_segments), 1))\n\n    # Set length of first truthlet segment\n    truthlet0_length = np.random.choice(range(max_track_start), 1)\n\n    # Set lengths of each of the truthlet segments\n    truthlet_lengths = np.random.choice(range(min_segment_length, max_segment_length),\n                                        number_of_segments)\n\n    # Set lengths of each disjoint section\n    disjoint_lengths = np.random.choice(range(min_disjoint_length, max_disjoint_length),\n                                        number_of_segments)\n\n    # Sum pairs of truthlets and disjoints, and set the start-point of the truth path\n    segment_pair_lengths = np.insert(truthlet_lengths + disjoint_lengths, 0, truthlet0_length,\n                                     axis=0)\n\n    # Cumulative sum of segments, giving the start point of each truth segment\n    truthlet_startpoints = np.cumsum(segment_pair_lengths)\n\n    # Sum truth segments length to start point, giving end point for each segment\n    truthlet_endpoints = truthlet_startpoints + np.append(truthlet_lengths, 0)\n\n    # Set start and end points for each segment\n    starts = truthlet_startpoints[:number_of_segments]\n    stops = truthlet_endpoints[:number_of_segments]\n    truth = GroundTruthPath([GroundTruthState(state_vector, timestamp=start_time)],\n                            id=i)\n    for k in range(1, np.max(stops)):\n        truth.append(GroundTruthState(\n            transition_model.function(truth[k - 1], noise=True,\n                                      time_interval=timedelta(seconds=1)),\n            timestamp=truth[k - 1].timestamp + timedelta(seconds=1)))\n    for j in range(number_of_segments):\n        truthlet = GroundTruthPath(truth[starts[j]:stops[j]], id=str(\"G::\" + str(truth.id) +\n                                                                     \"::S::\" + str(j) + \"::\"))\n        truthlets.add(truthlet)\n    truths.add(truth)\n\nprint(number_of_targets, \" targets required.\")\nprint(len(truths), \" truths have been generated.\")\nprint(len(truthlets), \" truthlets have been generated.\")"
]
},
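{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each truthlet id encodes its parent truth and segment index using the convention\n'G::<truth id>::S::<segment index>::'; the evaluation at the end of this example relies on this.\nAs a quick illustration, the convention can be parsed like so (an illustrative helper, not part\nof Stone Soup):\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative parse of the truthlet id convention 'G::<truth>::S::<segment>::'\ndef parse_truthlet_id(tid):\n    truth_id, segment_index = [int(s) for s in tid.split('::') if s.isdigit()]\n    return truth_id, segment_index\n\n\nprint(parse_truthlet_id('G::3::S::2::'))  # (3, 2)"
]
},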
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate a tracklet from each truthlet\nWe introduce measurement noise (as set in the variables section) and generate a set of tracklets\nfrom the set of truthlets.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.tracker.simple import MultiTargetTracker\nfrom stonesoup.types.detection import TrueDetection\n\n# Generate tracklets from truthlets calculated above\nfor n, truthlet in enumerate(truthlets):\n    measurementlet = []\n    for state in truthlet:\n        m = measurement_model.function(state, noise=True)\n        m0 = TrueDetection(m,\n                           timestamp=state.timestamp,\n                           measurement_model=measurement_model,\n                           groundtruth_path=truthlet)\n        measurementlet.append((state.timestamp, {m0}))\n    tracklet = MultiTargetTracker(initiator, deleter, measurementlet, data_associator, updater)\n    for _, t in tracklet:\n        all_tracks |= t\n\nprint(len(all_tracks), \" tracklets have been produced.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot the set of tracklets\nThe following plots present the tracks which have been generated, as well as, for reference, the\nground truths used to generate them. A 2D graph is plotted for each 2D plane in the N-D space.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter, Dimension\n\n# Plot graph for each 2D face in n-dimensional space\ndimensions_list = list(range(0, 2 * n_spacial_dimensions, 2))\ndim_pairs = [(a, b) for idx, a in enumerate(dimensions_list) for b in dimensions_list[idx + 1:]]\nfor pair in dim_pairs:\n    plotter = Plotter()\n    plotter.plot_ground_truths(truths, list(pair))\n    plotter.plot_tracks(all_tracks, list(pair))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Plot 3D graph if working in 3-dimensional space\nif n_spacial_dimensions == 3:\n    plotter = Plotter(Dimension.THREE)\n    plotter.plot_ground_truths(truths, [0, 2, 4])\n    plotter.plot_tracks(all_tracks, [0, 2, 4])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitcher Class\nThe cell below imports the track stitcher class. Its functions `forward_predict` and\n`backward_predict` perform the forward and backward predictions respectively (as described\nabove), calculating which pairs of tracks could feasibly be stitched together. If both forward\nand backward stitching are used, the predictions from the two methods are merged. The function\n`stitch` uses these predictions to pair and 'stitch' track sections together.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.stitcher import TrackStitcher"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Applying the Track Stitcher\nNow that we have a set of tracklets, we can apply track stitching to join tracklets together\ninto tracks. The code in the following cell applies this process using the class `TrackStitcher`\nand plots the stitched tracks.\n`TrackStitcher` has a `search_window` property that reduces compute time by filtering out track\nsegments that do not fall within a given time window. When stitching forward, the associator\nconsiders any track whose start point falls within the window $(t, t + w)$, where $w$ is the\n`search_window`. When stitching backward, the associator considers tracks whose end point falls\nwithin the window $(t - w, t)$.\n\n"
]
},
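{
"cell_type": "markdown",
"metadata": {},
"source": [
"The windowing logic can be sketched as follows (a hypothetical helper, not the `TrackStitcher`\ninternals): when stitching forward from a track ending at time $t$, only candidates starting\nwithin the window are considered.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Hypothetical sketch of the search-window filter, not the TrackStitcher internals\ndef in_forward_window(end_time, candidate_start, search_window):\n    # A candidate must start after this track ends, within the window\n    return end_time < candidate_start <= end_time + search_window\n\n\nt0 = datetime(2024, 1, 1, 12, 0)\nprint(in_forward_window(t0, t0 + timedelta(seconds=200), timedelta(seconds=500)))  # True\nprint(in_forward_window(t0, t0 + timedelta(seconds=800), timedelta(seconds=500)))  # False"
]
},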
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model = CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.001, 2e-2)] *\n                                                         n_spacial_dimensions, seed=12)\n\npredictor = KalmanPredictor(transition_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=300)\nstitcher = TrackStitcher(forward_hypothesiser=hypothesiser, search_window=timedelta(seconds=500))\n\nstitched_tracks, _ = stitcher.stitch(all_tracks, start_time)\n\nfor pair in dim_pairs:\n    plotter = Plotter()\n    plotter.plot_ground_truths(truths, list(pair))\n    plotter.plot_tracks(stitched_tracks, list(pair))\n\nif n_spacial_dimensions == 3:\n    plotter = Plotter(Dimension.THREE)\n    plotter.plot_ground_truths(truths, [0, 2, 4])\n    plotter.plot_tracks(stitched_tracks, [0, 2, 4])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Applying Metrics\nNow that we have stitched the tracklets into tracks, we can compare the tracks to the ground\ntruths that were used to generate the tracklets. We do this using metrics: below we apply a\nrange of SIAP (Single Integrated Air Picture) metrics, as well as a custom metric specialized\nfor track stitching.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### % of tracklets stitched to the correct previous tracklet\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def StitcherCorrectness(stitchedtracks):\n    stitchedtracks = list(stitchedtracks)\n    total, count = 0, 0\n    for track in stitchedtracks:\n        for j, state in enumerate(track):\n            if j == len(track) - 1:\n                continue\n            id1 = [int(s) for s in state.hypothesis.measurement.groundtruth_path.id.split('::')\n                   if s.isdigit()]\n            id2 = [int(s) for s in\n                   track[j + 1].hypothesis.measurement.groundtruth_path.id.split('::') if\n                   s.isdigit()]\n            if id1 != id2:\n                total += 1\n                if id1[0] == id2[0] and id1[1] == (id2[1] - 1):\n                    count += 1\n    # Guard against division by zero when no stitches were made\n    return 0 if total == 0 else count / total * 100\n\n\nprint(\"Tracklets stitched correctly: \", StitcherCorrectness(stitched_tracks), \"%\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SIAP Metrics\nThe following cell calculates and records a range of SIAP (Single Integrated Air Picture) metrics\nto assess the accuracy of the stitcher. The value of `association_threshold` should be adjusted\nto represent an acceptable association distance for the scenario being considered. For example, a\nthreshold of 50 metres may be acceptable when tracking a large ship, but not when tracking\nbiological cell movement.\n\nSIAP Ambiguity: important, as a value not equal to 1 suggests that the stitcher is not stitching\nwhole tracks together, or is stitching multiple tracks into one.\n\nSIAP Completeness: not a valuable metric for track stitching evaluation, as we are only tracking\nfractions of the true objects; the metric value is scaled by the ratio of truthlets to\ndisjoint sections.\n\nSIAP Rate of Track Number Change: an important metric for assessing track stitching. Any value\nabove zero shows that tracklets are being incorrectly stitched to tracklets from different truth\npaths.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\nfrom stonesoup.measures import Euclidean\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.dataassociator.tracktotrack import TrackToTruth\nfrom stonesoup.metricgenerator.manager import SimpleManager\nfrom stonesoup.metricgenerator.metrictables import SIAPTableGenerator\n\nsiap_generator = SIAPMetrics(position_measure=Euclidean((0, 2)),\n                             velocity_measure=Euclidean((1, 3)))\n\nassociator = TrackToTruth(association_threshold=30)\n\nmetric_manager = SimpleManager([siap_generator],\n                               associator=associator)\nmetric_manager.add_data(truths, set(all_tracks))\n\nplt.rcParams[\"figure.figsize\"] = (10, 8)\nmetrics = metric_manager.generate_metrics()\n\nsiap_averages = {metrics.get(metric) for metric in metrics\n                 if metric.startswith(\"SIAP\") and not metric.endswith(\" at times\")}\nsiap_time_based = {metrics.get(metric) for metric in metrics if metric.endswith(' at times')}\n\n_ = SIAPTableGenerator(siap_averages).compute_metric()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}