{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n# 6 - Data association - multi-target tracking tutorial\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tracking multiple targets through clutter\n\nAs we've seen, more often than not, the difficult part of state estimation concerns the ambiguous\nassociation of predicted states with measurements. This happens whenever there is more than one\ntarget under consideration, there are false alarms or clutter, targets can appear and disappear.\nThat is to say it happens everywhere.\n\nIn this tutorial we introduce **global nearest neighbour** data association, which\nattempts to find a globally-consistent collection of hypotheses such that some overall score of\ncorrect association is maximised.\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Background\nMake the assumption that each target generates, at most, one measurement, and that\none measurement is generated by, at most, a single target, or is a clutter point. Under these\nassumptions, global nearest neighbour will assign measurements to predicted measurements to\nminimise a total (global) cost which is a function of the sum of innovations. 
This is an example\nof an assignment problem in combinatorial optimisation.\n\nWith multiple targets to track, the :class:`~.NearestNeighbour` algorithm compiles a list of all\nhypotheses and selects pairings with higher scores first.\n\n*(diagram: nearest neighbour association of two tracks, blue and orange, with two detections)*\n\nIn the diagram above, the top detection is selected for association with the blue track,\nas this has the highest score/probability (\$0.5\$), and (as each measurement is associated\nat most once) the remaining detection must then be associated with the orange track, giving a net\nglobal score/probability of \$0.51\$.\n\nThe :class:`~.GlobalNearestNeighbour` evaluates all valid (distance-based) hypotheses\n(measurement-prediction pairs) and selects the subset with the\ngreatest net 'score' (the collection of hypothesis pairs which have a minimum sum of distances\noverall).\n\n*(diagram: global nearest neighbour association of the same tracks and detections)*\n\nIn the diagram above, the blue track is associated to the bottom detection even though the top\ndetection scores higher relative to it. This association leads to a global score/probability of\n\$0.6\$ - a better net score/probability than the \$0.51\$ returned by the nearest\nneighbour algorithm.\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A multi-target simulation\nWe start by simulating 2 targets moving in different directions across the 2D Cartesian plane.\nThey start at (0, 0) and (0, 20) and cross roughly half-way through their transit.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import numpy as np\nfrom datetime import datetime, timedelta\nstart_time = datetime.now()\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n    ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate ground truth\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], 
"source": [ "np.random.seed(1991)\n\ntruths = set()\n\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 20, -1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the ground truth\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.plotter import Plotterly\nplotter = Plotterly()\nplotter.plot_ground_truths(truths, [0, 2])\nplotter.fig" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Generate detections with clutter\nNext, generate detections with clutter just as in the previous tutorial. 
This time, we generate\nclutter about each state at each time-step.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\nmeasurement_model = LinearGaussian(\n    ndim_state=4,\n    mapping=(0, 2),\n    noise_covar=np.array([[0.75, 0],\n                          [0, 0.75]])\n    )\nall_measurements = []\n\nfor k in range(20):\n    measurement_set = set()\n\n    for truth in truths:\n        # Generate actual detection from the state with a 10% chance that no detection is received.\n        if np.random.rand() <= 0.9:\n            measurement = measurement_model.function(truth[k], noise=True)\n            measurement_set.add(TrueDetection(state_vector=measurement,\n                                              groundtruth_path=truth,\n                                              timestamp=truth[k].timestamp,\n                                              measurement_model=measurement_model))\n\n        # Generate clutter at this time-step, uniformly about the truth's x and y position\n        truth_x = truth[k].state_vector[0]\n        truth_y = truth[k].state_vector[2]\n        for _ in range(np.random.randint(10)):\n            x = uniform.rvs(truth_x - 10, 20)\n            y = uniform.rvs(truth_y - 10, 20)\n            measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=truth[k].timestamp,\n                                        measurement_model=measurement_model))\n    all_measurements.append(measurement_set)\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2])\nplotter.fig" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the Kalman predictor and updater\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As in the clutter tutorial, we will quantify 
predicted-measurement to measurement distance\nusing the Mahalanobis distance.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nhypothesiser = DistanceHypothesiser(predictor, updater, measure=Mahalanobis(), missed_distance=3)\n\n\nfrom stonesoup.dataassociator.neighbour import GlobalNearestNeighbour\ndata_associator = GlobalNearestNeighbour(hypothesiser)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the Kalman filters\n\nWe create 2 priors reflecting the targets' initial states.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.types.state import GaussianState\nprior1 = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\nprior2 = GaussianState([[0], [1], [20], [-1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Loop through the predict, hypothesise, associate and update steps.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.types.track import Track\ntracks = {Track([prior1]), Track([prior2])}\n\nfor n, measurements in enumerate(all_measurements):\n    # Calculate all hypothesis pairs and associate the elements in the best subset to the tracks.\n    hypotheses = data_associator.associate(tracks,\n                                           measurements,\n                                           start_time + timedelta(seconds=n))\n    for track in tracks:\n        hypothesis = hypotheses[track]\n        if hypothesis.measurement:\n            post = updater.update(hypothesis)\n            track.append(post)\n        else:  # When data associator says no detections are good enough, we'll keep the prediction\n            track.append(hypothesis.prediction)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the resulting tracks\n\n" ] }, 
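{ "cell_type": "markdown", "metadata": {}, "source": [ "### Aside: the underlying assignment problem\n\nThe 'best subset' selection performed by :class:`~.GlobalNearestNeighbour` is an instance of the\nlinear assignment problem. As a minimal sketch (not part of the Stone Soup pipeline above), SciPy's\n`linear_sum_assignment` can be applied to a small score matrix. The \$0.5\$ entry comes from the\nearlier diagrams; the remaining entries are illustrative assumptions chosen so the totals match the\n\$0.51\$ and \$0.6\$ figures quoted there.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from scipy.optimize import linear_sum_assignment\n\n# Hypothetical score matrix (rows: blue, orange tracks; columns: top, bottom detections).\nscores = np.array([[0.5, 0.4],\n                   [0.2, 0.01]])\n\n# Nearest neighbour would greedily pick blue -> top (0.5), forcing orange -> bottom,\n# for a net score of 0.51.\n\n# Global nearest neighbour instead maximises the total score over all valid assignments.\nrow, col = linear_sum_assignment(scores, maximize=True)\nprint(scores[row, col].sum())  # approximately 0.6: blue -> bottom, orange -> top" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The final cell plots the resulting tracks with their uncertainty ellipses.\n\n" ] }, 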
{ "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plotter.plot_tracks(tracks, [0, 2], uncertainty=True)\nplotter.fig" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.4" } }, "nbformat": 4, "nbformat_minor": 0 }