{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 7 - Probabilistic data association tutorial\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Making an assignment between a single track and a single measurement can be problematic. In the\nprevious tutorials you may have encountered the phenomenon of *track seduction*. This occurs\nwhen clutter, or other track, points are mis-associated with a prediction. If this happens\nrepeatedly (as can be the case in high-clutter or low-$p_d$ situations) the track can\ndeviate significantly from the truth.\n\nRather than make a firm assignment at each time-step, we could work out the probability that each\nmeasurement should be assigned to a particular target. We could then propagate a measure of\nthese collective probabilities to mitigate the effect of track seduction.\n\nPictorially:\n\n- Calculate a posterior for each hypothesis;\n\n\n\n- Weight each posterior state according to the probability that its corresponding hypothesis\n was true (including the probability of missed-detection);\n\n\n\n- Merge the resulting estimate states in to a single posterior approximation.\n\n\n\nThis results in a more robust approximation to the posterior state covariances that incorporates\nnot only the uncertainty in state, but also in the association.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A PDA filter example\n\n### Ground truth\n\nSo, as before, we'll first begin by simulating some ground truth.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nfrom datetime import datetime\nfrom datetime import timedelta\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\nnp.random.seed(1991)\n\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Add clutter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\nprob_detect = 0.9 # 90% chance of detection.\n\nall_measurements = []\nfor state in truth:\n measurement_set = set()\n\n # Generate detection.\n if np.random.rand() <= prob_detect:\n measurement = measurement_model.function(state, noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter.\n truth_x = state.state_vector[0]\n truth_y = state.state_vector[2]\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n all_measurements.append(measurement_set)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth and measurements with clutter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.ax.set_ylim(0, 25)\nplotter.plot_ground_truths(truth, [0, 2])\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the predictor and updater\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialise Probabilistic Data Associator\nThe :class:`~.PDAHypothesiser` and :class:`~.PDA` associator generate track predictions and\ncalculate probabilities for all prediction-detection pairs for a single prediction and multiple\ndetections.\nThe :class:`~.PDAHypothesiser` returns a collection of :class:`~.SingleProbabilityHypothesis`\ntypes. The :class:`~.PDA` takes these hypotheses and returns a dictionary of key-value pairings\nof each track and detection which it is to be associated with.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.probability import PDAHypothesiser\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.125,\n prob_detect=prob_detect)\n\nfrom stonesoup.dataassociator.probability import PDA\ndata_associator = PDA(hypothesiser=hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the PDA Filter\n\nWith these components, we can run the simulated data and clutter through the Kalman filter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Create prior\nfrom stonesoup.types.state import GaussianState\nprior = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\n\n# Loop through the predict, hypothesise, associate and update steps.\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import StateVectors # For storing state vectors during association\nfrom stonesoup.functions import gm_reduce_single # For merging states to get posterior estimate\nfrom stonesoup.types.update import GaussianStateUpdate # To store posterior estimate\n\ntrack = Track([prior])\nfor n, measurements in enumerate(all_measurements):\n hypotheses = data_associator.associate({track},\n measurements,\n start_time + timedelta(seconds=n))\n\n hypotheses = hypotheses[track]\n\n # Loop through each hypothesis, creating posterior states for each, and merge to calculate\n # approximation to actual posterior state mean and covariance.\n posterior_states = []\n posterior_state_weights = []\n for hypothesis in hypotheses:\n if not hypothesis:\n posterior_states.append(hypothesis.prediction)\n else:\n posterior_state = updater.update(hypothesis)\n posterior_states.append(posterior_state)\n posterior_state_weights.append(\n hypothesis.probability)\n\n means = StateVectors([state.state_vector for state in posterior_states])\n covars = np.stack([state.covar for state in posterior_states], axis=2)\n weights = np.asarray(posterior_state_weights)\n\n # Reduce mixture of states to one posterior estimate Gaussian.\n post_mean, post_covar = gm_reduce_single(means, covars, weights)\n\n # Add a Gaussian state approximation to the track.\n track.append(GaussianStateUpdate(\n post_mean, post_covar,\n hypotheses,\n hypotheses[0].measurement.timestamp))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], uncertainty=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n1. Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control\nSystems Magazine\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}