{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n# 7 - Probabilistic data association tutorial\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Making an assignment between a single track and a single measurement can be problematic. In the\nprevious tutorials you may have encountered the phenomenon of *track seduction*. This occurs\nwhen clutter, or other track, points are mis-associated with a prediction. If this happens\nrepeatedly (as can be the case in high-clutter or low-\$p_d\$ situations) the track can\ndeviate significantly from the truth.\n\nRather than make a firm assignment at each time-step, we could work out the probability that each\nmeasurement should be assigned to a particular target. We could then propagate a measure of\nthese collective probabilities to mitigate the effect of track seduction.\n\nPictorially:\n\n- Calculate a posterior for each hypothesis;\n\n \n\n- Weight each posterior state according to the probability that its corresponding hypothesis\n was true (including the probability of missed-detection);\n\n \n\n- Merge the resulting estimate states in to a single posterior approximation.\n\n \n\nThis results in a more robust approximation to the posterior state covariances that incorporates\nnot only the uncertainty in state, but also in the association.\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## A PDA filter example\n\n### Ground truth\n\nSo, as before, we'll first begin by simulating some ground truth.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import numpy as np\n\nfrom datetime import datetime\nfrom datetime import timedelta\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\nnp.random.seed(1991)\n\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add clutter.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\nprob_detect = 0.9 # 90% chance of detection.\n\nall_measurements = []\nfor state in truth:\n measurement_set = set()\n\n # Generate detection.\n if np.random.rand() <= prob_detect:\n measurement = measurement_model.function(state, noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter.\n truth_x = state.state_vector\n truth_y = state.state_vector\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n all_measurements.append(measurement_set)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the ground truth and measurements with clutter.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.ax.set_ylim(0, 25)\nplotter.plot_ground_truths(truth, [0, 2])\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the predictor and updater\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initialise Probabilistic Data Associator\nThe :class:`~.PDAHypothesiser` and :class:`~.PDA` associator generate track predictions and\ncalculate probabilities for all prediction-detection pairs for a single prediction and multiple\ndetections.\nThe :class:`~.PDAHypothesiser` returns a collection of :class:`~.SingleProbabilityHypothesis`\ntypes. The :class:`~.PDA` takes these hypotheses and returns a dictionary of key-value pairings\nof each track and detection which it is to be associated with.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.hypothesiser.probability import PDAHypothesiser\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.125,\n prob_detect=prob_detect)\n\nfrom stonesoup.dataassociator.probability import PDA\ndata_associator = PDA(hypothesiser=hypothesiser)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the PDA Filter\n\nWith these components, we can run the simulated data and clutter through the Kalman filter.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Create prior\nfrom stonesoup.types.state import GaussianState\nprior = GaussianState([, , , ], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\n\n# Loop through the predict, hypothesise, associate and update steps.\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import StateVectors # For storing state vectors during association\nfrom stonesoup.functions import gm_reduce_single # For merging states to get posterior estimate\nfrom stonesoup.types.update import GaussianStateUpdate # To store posterior estimate\n\ntrack = Track([prior])\nfor n, measurements in enumerate(all_measurements):\n hypotheses = data_associator.associate({track},\n measurements,\n start_time + timedelta(seconds=n))\n\n hypotheses = hypotheses[track]\n\n # Loop through each hypothesis, creating posterior states for each, and merge to calculate\n # approximation to actual posterior state mean and covariance.\n posterior_states = []\n posterior_state_weights = []\n for hypothesis in hypotheses:\n if not hypothesis:\n posterior_states.append(hypothesis.prediction)\n else:\n posterior_state = updater.update(hypothesis)\n posterior_states.append(posterior_state)\n posterior_state_weights.append(\n hypothesis.probability)\n\n means = StateVectors([state.state_vector for state in posterior_states])\n covars = np.stack([state.covar for state in posterior_states], axis=2)\n weights = np.asarray(posterior_state_weights)\n\n # Reduce mixture of states to one posterior estimate Gaussian.\n post_mean, post_covar = gm_reduce_single(means, covars, weights)\n\n # Add a Gaussian state approximation to the track.\n track.append(GaussianStateUpdate(\n post_mean, post_covar,\n hypotheses,\n hypotheses.measurement.timestamp))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the resulting track\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plotter.plot_tracks(track, [0, 2], uncertainty=True)\nplotter.fig" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References\n1. Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control\nSystems Magazine\n\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.9" } }, "nbformat": 4, "nbformat_minor": 0 }