{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n# 8 - Joint probabilistic data association tutorial\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When we have multiple targets we're going to want to arrive at a globally-consistent collection\nof associations for PDA, in much the same way as we did for the global nearest neighbour\nassociator. This is the purpose of the *joint* probabilistic data association (JPDA) filter.\n\nSimilar to the PDA, the JPDA algorithm calculates hypothesis pairs for every measurement\nfor every track. The probability of a track-measurement hypothesis is calculated by the sum of\nnormalised conditional probabilities that every other track is associated to every other\nmeasurement (including missed detection). For example, with 3 tracks $(A, B, C)$ and 3\nmeasurements $(x, y, z)$ (including missed detection $None$), the probability of\ntrack $A$ being associated with measurement $x$ ($A \\to x$) is given by:\n\n\\begin{align}p(A \\to x) &= \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to y) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to z) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to y \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to y \\cap C \\to z) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to z \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to z \\cap C \\to y)\\end{align}\n\nwhere $\\bar{p}(\\textit{multi-hypothesis})$ is the normalised probability of the\nmulti-hypothesis.\n\nThis is demonstrated for 2 tracks associating to 3 measurements in the diagrams below:\n\n \n\nWhere the probability (for example) of the orange track associating to the green measurement is\n$0.25$.\nThe probability of every possible association set is calculated. These probabilities are then\nnormalised.\n\n \n\nA track-measurement hypothesis weight is then recalculated as the sum of the probabilities of\nevery occurrence where that track associates to that measurement.\n\n \n\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simulate ground truth\nAs with the multi-target data association tutorial, we simulate two targets moving in the\npositive x, y Cartesian plane (intersecting approximately half-way through their transition).\nWe then add truth detections with clutter at each time-step.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from datetime import datetime\nfrom datetime import timedelta\nimport numpy as np\nfrom scipy.stats import uniform\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\nnp.random.seed(1991)\n\ntruths = set()\n\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 20, -1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\n# Plot ground truth.\nfrom stonesoup.plotter import Plotterly\nplotter = Plotterly()\nplotter.plot_ground_truths(truths, [0, 2])\n\n# Generate measurements.\nall_measurements = []\n\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\nprob_detect = 0.9 # 90% chance of detection.\n\nfor k in range(20):\n measurement_set = set()\n\n for truth in truths:\n # Generate actual detection from the state with a 10% chance that no detection is received.\n if np.random.rand() <= prob_detect:\n measurement = measurement_model.function(truth[k], noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter at this time-step\n truth_x = truth[k].state_vector\n truth_y = truth[k].state_vector\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n all_measurements.append(measurement_set)\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2])\nplotter.fig" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Initial hypotheses are calculated (per track) in the same manner as the PDA.\nTherefore, in Stone Soup, the JPDA filter uses the :class:~.PDAHypothesiser to create these\nhypotheses.\nUnlike the :class:~.PDA data associator, in Stone Soup, the :class:~.JPDA associator takes\nthis collection of hypotheses and adjusts their weights according to the method described above,\nbefore returning key-value pairs of tracks and detections to be associated with them.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.hypothesiser.probability import PDAHypothesiser\n# This doesn't need to be created again, but for the sake of visualising the process, it has been\n# added.\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.125,\n prob_detect=prob_detect)\n\nfrom stonesoup.dataassociator.probability import JPDA\ndata_associator = JPDA(hypothesiser=hypothesiser)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running the JPDA filter\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from stonesoup.types.state import GaussianState\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import StateVectors\nfrom stonesoup.functions import gm_reduce_single\nfrom stonesoup.types.update import GaussianStateUpdate\n\nprior1 = GaussianState([, , , ], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\nprior2 = GaussianState([, , , [-1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\n\ntracks = {Track([prior1]), Track([prior2])}\n\nfor n, measurements in enumerate(all_measurements):\n hypotheses = data_associator.associate(tracks,\n measurements,\n start_time + timedelta(seconds=n))\n\n # Loop through each track, performing the association step with weights adjusted according to\n # JPDA.\n for track in tracks:\n track_hypotheses = hypotheses[track]\n\n posterior_states = []\n posterior_state_weights = []\n for hypothesis in track_hypotheses:\n if not hypothesis:\n posterior_states.append(hypothesis.prediction)\n else:\n posterior_state = updater.update(hypothesis)\n posterior_states.append(posterior_state)\n posterior_state_weights.append(hypothesis.probability)\n\n means = StateVectors([state.state_vector for state in posterior_states])\n covars = np.stack([state.covar for state in posterior_states], axis=2)\n weights = np.asarray(posterior_state_weights)\n\n # Reduce mixture of states to one posterior estimate Gaussian.\n post_mean, post_covar = gm_reduce_single(means, covars, weights)\n\n # Add a Gaussian state approximation to the track.\n track.append(GaussianStateUpdate(\n post_mean, post_covar,\n track_hypotheses,\n track_hypotheses.measurement.timestamp))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the resulting tracks.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plotter.plot_tracks(tracks, [0, 2], uncertainty=True)\nplotter.fig" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References\n1. Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control\nSystems Magazine\n\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.4" } }, "nbformat": 4, "nbformat_minor": 0 }