01_KalmanFilterTutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 1 - An introduction to Stone Soup: using the Kalman filter\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook is designed to introduce some of the basic features of Stone Soup using a single\ntarget scenario and a Kalman filter as an example.\n\n## Background and notation\n\nLet $p(\\mathbf{x}_{k})$, the probability distribution over\n$\\mathbf{x}_{k} \\in \\mathbb{R}^n$, represent a hidden state at some discrete point\n$k$. The $k$ is most often interpreted as a timestep, but could be any sequential\nindex (hence we don't use $t$). The measurement is given by\n$\\mathbf{z}_{k} \\in \\mathbb{R}^m$.\n\nIn Stone Soup, objects derived from the :class:`~.State` class carry information on a variety of\nstates, whether they be hidden states, observations, ground truth. Minimally, a state will\ncomprise a :class:`~.StateVector` and a timestamp. These can be extended. For example the\n:class:`~.GaussianState` is parameterised by a mean state vector and a covariance matrix. (We'll\nsee this in action later.)\n\nOur goal is to infer $p(\\mathbf{x}_{k})$, given a sequence of measurements\n$\\mathbf{z}_{1}, ..., \\mathbf{z}_{k}$, (which we'll write as $\\mathbf{z}_{1:k}$). In\ngeneral $\\mathbf{z}$ can include clutter and false alarms. We'll defer those complications\nto later tutorials and assume, for the moment, that all measurements are generated by a target,\nand that detecting the target at each timestep is certain ($p_d = 1$ and\n$p_{fa} = 0$).\n\n### Prediction\n\nWe proceed under the Markovian assumption that $p(\\mathbf{x}_k) = \\int_{-\\infty} ^{\\infty}\np(\\mathbf{x}_k|\\mathbf{x}_{k-1}) p(\\mathbf{x}_{k-1}) d \\mathbf{x}_{k-1}$, meaning that the\ndistribution over the state of an object at time $k$ can be predicted entirely from its\nstate at previous time $k-1$. If our understanding of $p(\\mathbf{x}_{k-1})$ was\ninformed by a series of measurements up to and including timestep $k-1$, we can write\n\n\\begin{align}p(\\mathbf{x}_k|\\mathbf{z}_{1:k-1}) =\n \\int_{-\\infty}^{\\infty} p(\\mathbf{x}_k|\\mathbf{x}_{k-1})\n p(\\mathbf{x}_{k-1}|\\mathbf{z}_{1:k-1})d \\mathbf{x}_{k-1}\\end{align}\n\nThis is known as the *Chapman-Kolmogorov* equation. In Stone Soup we refer to this process as\n*prediction* and to an object that undertakes it as a :class:`~.Predictor`. A predictor requires\na *state transition model*, namely a function which undertakes\n$\\mathbf{x}_{k|k-1} = f(\\mathbf{x}_{k-1}, \\mathbf{w}_k)$, where $\\mathbf{w}_k$ is a\nnoise term. Stone Soup has transition models derived from the :class:`~.TransitionModel` class.\n\n### Update\n\nWe assume a sensor measurement is generated by some stochastic process represented by a function,\n$\\mathbf{z}_k = h(\\mathbf{x}_k, \\boldsymbol{\\nu}_k)$ where $\\boldsymbol{\\nu}_k$ is\nthe noise.\n\nThe goal of the update process is to generate the *posterior state estimate*, from the prediction\nand the measurement. It does this by way of Bayes' rule,\n\n\\begin{align}p(\\mathbf{x}_k | \\mathbf{z}_{1:k}) =\n \\frac{ p(\\mathbf{z}_{k} | \\mathbf{x}_k) p(\\mathbf{x}_k | \\mathbf{z}_{1:k-1})}\n {p(\\mathbf{z}_k)}\\end{align}\n\nwhere $p(\\mathbf{x}_k | \\mathbf{z}_{1:k-1})$ is the output of the prediction stage,\n$p(\\mathbf{z}_{k} | \\mathbf{x}_k)$ is known as the likelihood, and $p(\\mathbf{z}_k)$\nthe evidence. In Stone Soup, this calculation is undertaken by the :class:`~.Updater` class.\nUpdaters use a :class:`~.MeasurementModel` class which models the effect of $h(\\cdot)$.\n\n\n\nThis figure represents a single instance of this predict-update process. 
It shows a prior\ndistribution, at the bottom left, a prediction (dotted ellipse), and posterior (top right)\ncalculated after a measurement update. We then proceed recursively, the posterior distribution at\n$k$ becoming the prior for the next measurement timestep, and so on.\n\n"
]
},
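{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the recursion concrete before introducing any Stone Soup classes, the cell below is a\nminimal, hand-rolled sketch (not part of the library) of one predict-update cycle in a\none-dimensional linear-Gaussian case, where both the Chapman-Kolmogorov integral and Bayes'\nrule have closed forms: prediction adds the process-noise variance, and the update is a\nprecision-weighted combination of prediction and measurement. All numbers are illustrative.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative one-dimensional predict-update cycle (hand-rolled, not Stone Soup)\n# Prior: N(mean, var); transition: x_k = x_{k-1} + w_k; measurement: z_k = x_k + nu_k\nmean, var = 0.0, 1.0\nq = 0.5   # process-noise variance (illustrative)\nr = 1.0   # measurement-noise variance (illustrative)\nz = 1.2   # an illustrative received measurement\n\n# Prediction (Chapman-Kolmogorov): convolving Gaussians adds their variances\npred_mean, pred_var = mean, var + q\n\n# Update (Bayes' rule): posterior is a precision-weighted combination\ngain = pred_var / (pred_var + r)\npost_mean = pred_mean + gain * (z - pred_mean)\npost_var = (1 - gain) * pred_var\nprint(post_mean, post_var)"
]
},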
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A nearly-constant velocity example\n\nWe're going to set up a simple scenario in which a target moves at constant velocity with the\naddition of some random noise, (referred to as a *nearly constant velocity* model).\n\nAs is customary in Python scripts we begin with some imports. (These ones allow us access to\nmathematical and timing functions.)\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom datetime import datetime, timedelta"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Simulate a target\n\nWe consider a 2d Cartesian scheme where the state vector is\n$[x \\ \\dot{x} \\ y \\ \\dot{y}]^T$. That is, we'll model the target motion as a position\nand velocity component in each dimension. The units used are unimportant, but do need to be\nconsistent.\n\nTo start we'll create a simple truth path, sampling at 1\nsecond intervals. We'll do this by employing one of Stone Soup's native transition models.\n\nThese inputs are required:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\n\n# And the clock starts\nstart_time = datetime.now()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We note that it can sometimes be useful to fix our random number generator in order to probe a\nparticular example repeatedly.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"np.random.seed(1991)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The :class:`~.ConstantVelocity` class creates a one-dimensional constant velocity model with\nGaussian noise. For this simulation $\\mathbf{x}_k = F_k \\mathbf{x}_{k-1} + \\mathbf{w}_k$,\n$\\mathbf{w}_k \\sim \\mathcal{N}(0,Q)$, with\n\n\\begin{align}F_{k} &= \\begin{bmatrix}\n 1 & \\triangle t \\\\\n 0 & 1 \\\\\n \\end{bmatrix} \\\\\n Q_k &= \\begin{bmatrix}\n \\frac{\\triangle t^3}{3} & \\frac{\\triangle t^2}{2} \\\\\n \\frac{\\triangle t^2}{2} & \\triangle t \\\\\n \\end{bmatrix} q\\end{align}\n\nwhere $q$, the input parameter to :class:`~.ConstantVelocity`, is the magnitude of the\nnoise per $\\triangle t$-sized timestep.\n\n"
]
},
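{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, these matrices can be written out by hand. The sketch below assumes\n$\\triangle t = 1$ and $q = 0.05$ (the value we use for the simulation); further down we can\nconfirm that Stone Soup's transition model produces matching values.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Hand-built F and Q for a single ConstantVelocity dimension (illustrative check)\ndt, q = 1.0, 0.05\nF_1d = np.array([[1, dt],\n                 [0, 1]])\nQ_1d = q * np.array([[dt**3/3, dt**2/2],\n                     [dt**2/2, dt]])\nprint(F_1d)\nprint(Q_1d)"
]
},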
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The :class:`~.CombinedLinearGaussianTransitionModel` class takes a number\nof 1d models and combines them in a linear Gaussian model of arbitrary dimension, $D$.\n\n\\begin{align}F_{k}^{D} &= \\begin{bmatrix}\n F_k^{1} & & \\mathbf{0} \\\\\n & \\ddots & \\\\\n \\mathbf{0} & & F_k^d \\\\\n \\end{bmatrix}\\\\\n Q_{k}^{D} &= \\begin{bmatrix}\n Q_k^{1} & & \\mathbf{0} \\\\\n & \\ddots & \\\\\n \\mathbf{0} & & Q_k^d \\\\\n \\end{bmatrix}\\end{align}\n\nWe want a 2d simulation, so we'll do:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"q_x = 0.05\nq_y = 0.05\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(q_x),\n ConstantVelocity(q_y)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A 'truth path' is created starting at (0,0) moving to the NE at one distance unit per (time) step in each dimension.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\n\nnum_steps = 20\nfor k in range(1, num_steps + 1):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Thus the ground truth is generated and we can plot the result.\n\nStone Soup has an in-built plotting class which can be used to plot\nground truths, measurements and tracks in a consistent format. It can be accessed by importing\nthe class :class:`Plotter` from Stone Soup as below.\n\nNote that the mapping argument is [0, 2] because those are the x and y position indices from our state vector.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.plot_ground_truths(truth, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can check the $F_k$ and $Q_k$ matrices (generated over a 1s period).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model.matrix(time_interval=timedelta(seconds=1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model.covar(time_interval=timedelta(seconds=1))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"At this point you can play with the various parameters and see how it affects the simulated\noutput.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Simulate measurements\n\nWe'll use one of Stone Soup's measurement models in order to generate\nmeasurements from the ground truth. For the moment we assume a 'linear' sensor which detects the\nposition, but not velocity, of a target, such that\n$\\mathbf{z}_k = H_k \\mathbf{x}_k + \\boldsymbol{\\nu}_k$,\n$\\boldsymbol{\\nu}_k \\sim \\mathcal{N}(0,R)$, with\n\n\\begin{align}H_k &= \\begin{bmatrix}\n 1 & 0 & 0 & 0\\\\\n 0 & 0 & 1 & 0\\\\\n \\end{bmatrix}\\\\\n R &= \\begin{bmatrix}\n 1 & 0\\\\\n 0 & 1\\\\\n \\end{bmatrix} \\omega\\end{align}\n\nwhere $\\omega$ is set to 5 initially (but again, feel free to play around).\n\n"
]
},
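{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the correspondence concrete, here is a hand-rolled sketch (not Stone Soup code, using\nan illustrative state) of generating one such measurement: $H_k$ simply picks the position\nelements out of the state vector, and noise drawn from $R$ is added. Stone Soup builds the\nequivalent $H_k$ from a mapping of measured state indices, as we'll see below.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nH = np.array([[1, 0, 0, 0],\n              [0, 0, 1, 0]])\nR = 5 * np.eye(2)\n\nx_state = np.array([10.0, 1.0, 20.0, 1.0])  # [x, xdot, y, ydot], illustrative\nnu = np.random.multivariate_normal([0, 0], R)\nz = H @ x_state + nu  # measured (x, y) position plus noise\nprint(z)"
]
},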
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We're going to need a :class:`~.Detection` type to\nstore the detections, and a :class:`~.LinearGaussian` measurement model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.detection import Detection\nfrom stonesoup.models.measurement.linear import LinearGaussian"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The linear Gaussian measurement model is set up by indicating the number of dimensions in the\nstate vector and the dimensions that are measured (so specifying $H_k$) and the noise\ncovariance matrix $R$.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurement_model = LinearGaussian(\n ndim_state=4, # Number of state dimensions (position and velocity in 2D)\n mapping=(0, 2), # Mapping measurement vector index to state index\n noise_covar=np.array([[5, 0], # Covariance matrix for Gaussian PDF\n [0, 5]])\n )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check the output is as we expect\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurement_model.matrix()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurement_model.covar()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Generate the measurements\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurements = []\nfor state in truth:\n measurement = measurement_model.function(state, noise=True)\n measurements.append(Detection(measurement,\n timestamp=state.timestamp,\n measurement_model=measurement_model))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the result, again mapping the x and y position values\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_measurements(measurements, [0, 2])\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"At this stage you should have a moderately linear ground truth path (dotted line) with a series\nof simulated measurements overplotted (blue circles). Take a moment to fiddle with the numbers in\n$Q$ and $R$ to see what it does to the path and measurements.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Construct a Kalman filter\n\nWe're now ready to build a tracker. We'll use a Kalman filter as it's conceptually the simplest\nto start with. The Kalman filter is described extensively elsewhere [#]_, [#]_, so for the\nmoment we just assert that the prediction step proceeds as:\n\n\\begin{align}\\mathbf{x}_{k|k-1} &= F_{k}\\mathbf{x}_{k-1} + B_{k}\\mathbf{u}_{k}\\\\\n P_{k|k-1} &= F_{k}P_{k-1}F_{k}^T + Q_{k},\\end{align}\n\n$B_k \\mathbf{u}_k$ is a control term which we'll ignore for now (we assume we don't\ninfluence the target directly).\n\nThe update step is:\n\n\\begin{align}\\mathbf{x}_{k} &= \\mathbf{x}_{k|k-1} + K_k(\\mathbf{z}_k - H_{k}\\mathbf{x}_{k|k-1})\\\\\n P_{k} &= P_{k|k-1} - K_k H_{k} P_{k|k-1}\\\\\\end{align}\n\nwhere,\n\n\\begin{align}K_k &= P_{k|k-1} H_{k}^T S_k^{-1}\\\\\n S_k &= H_{k} P_{k|k-1} H_{k}^T + R_{k}\\end{align}\n\n$\\mathbf{z}_k - H_{k}\\mathbf{x}_{k|k-1}$ is known as the *innovation* and $S_k$ the\n*innovation covariance*; $K_k$ is the *Kalman gain*.\n\n"
]
},
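{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see the algebra in one place, here is a short pure-NumPy sketch of the equations above\n(control term omitted). It is purely illustrative; the Stone Soup classes introduced next wrap\nthe same calculation, along with the bookkeeping of states and timestamps.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\ndef kf_predict(x, P, F, Q):\n    # x_{k|k-1} = F x_{k-1};  P_{k|k-1} = F P_{k-1} F^T + Q\n    return F @ x, F @ P @ F.T + Q\n\ndef kf_update(x_pred, P_pred, z, H, R):\n    y = z - H @ x_pred                   # innovation\n    S = H @ P_pred @ H.T + R             # innovation covariance\n    K = P_pred @ H.T @ np.linalg.inv(S)  # Kalman gain\n    return x_pred + K @ y, P_pred - K @ H @ P_pred"
]
},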
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Constructing a predictor and updater in Stone Soup is simple. In a nice division of\nresponsibility, a :class:`~.Predictor` takes a :class:`~.TransitionModel` as input and\nan :class:`~.Updater` takes a :class:`~.MeasurementModel` as input. Note that for now we're using\nthe same models used to generate the ground truth and the simulated measurements. This won't\nusually be possible and it's an interesting exercise to explore what happens when these\nparameters are mismatched.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the Kalman filter\nNow we have the components, we can execute the Kalman filter estimator on the simulated data.\n\nIn order to start, we'll need to create the first prior estimate. We're going to use the\n:class:`~.GaussianState` we mentioned earlier. As the name suggests, this parameterises the state\nas $\\mathcal{N}(\\mathbf{x}_0, P_0)$. By happy chance the initial values are chosen to match\nthe truth quite well. You might want to manipulate these to see what happens.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import GaussianState\nprior = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this instance data association is done somewhat implicitly. There is one prediction and\none detection per timestep so no need to think too deeply. Stone Soup discourages such\n(undesirable) practice and requires that a :class:`~.Prediction` and :class:`~.Detection` are\nassociated explicitly. This is done by way of a :class:`~.Hypothesis`; the most simple\nis a :class:`~.SingleHypothesis` which associates a single predicted state with a single\ndetection. There is much more detail on how the :class:`~.Hypothesis` class is used in later\ntutorials.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.hypothesis import SingleHypothesis"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With this, we'll now loop through our measurements, predicting and updating at each timestep.\nUncontroversially, a Predictor has :meth:`predict` function and an Updater an :meth:`update` to\ndo this. Storing the information is facilitated by the top-level :class:`~.Track` class which\nholds a sequence of states.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.track import Track\ntrack = Track()\nfor measurement in measurements:\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement) # Group a prediction and measurement\n post = updater.update(hypothesis)\n track.append(post)\n prior = track[-1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track, including uncertainty ellipses\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], uncertainty=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Key points\n1. Stone Soup is built on a variety of types of :class:`~.State` object. These can be used to\n represent hidden states, observations, estimates, ground truth, and more.\n2. Bayesian recursion is undertaken by the successive applications of predict and update methods\n using a :class:`~.Predictor` and an :class:`~.Updater`. Explicit association of predicted\n states with measurements is necessary. Broadly speaking, predictors apply a\n :class:`~.TransitionModel`, data associators use a\n :class:`~.Hypothesiser` to associate a prediction with a measurement, and updaters use this\n association together with the :class:`~.MeasurementModel` to calculate the posterior state\n estimate.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Kalman 1960, A New Approach to Linear Filtering and Prediction Problems, Transactions of\n the ASME, Journal of Basic Engineering, 82 (series D), 35\n (https://pdfs.semanticscholar.org/bb55/c1c619c30f939fc792b049172926a4a0c0f7.pdf?_ga=2.51363242.2056055521.1592932441-1812916183.1592932441)\n.. [#] Anderson & Moore 1979, Optimal filtering,\n (http://users.cecs.anu.edu.au/~john/papers/BOOK/B02.PDF)\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

08_JPDATutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 8 - Joint probabilistic data association tutorial\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When we have multiple targets we're going to want to arrive at a globally-consistent collection\nof associations for PDA, in much the same way as we did for the global nearest neighbour\nassociator. This is the purpose of the *joint* probabilistic data association (JPDA) filter.\n\nSimilar to the PDA, the JPDA algorithm calculates hypothesis pairs for every measurement\nfor every track. The probability of a track-measurement hypothesis is calculated by the sum of\nnormalised conditional probabilities that every other track is associated to every other\nmeasurement (including missed detection). For example, with 3 tracks $(A, B, C)$ and 3\nmeasurements $(x, y, z)$ (including missed detection $None$), the probability of\ntrack $A$ being associated with measurement $x$ ($A \\to x$) is given by:\n\n\\begin{align}p(A \\to x) &= \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to y) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to None \\cap C \\to z) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to y \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to y \\cap C \\to z) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to z \\cap C \\to None) +\\\\\n &+ \\bar{p}(A \\to x \\cap B \\to z \\cap C \\to y)\\end{align}\n\nwhere $\\bar{p}(\\textit{multi-hypothesis})$ is the normalised probability of the\nmulti-hypothesis.\n\nThis is demonstrated for 2 tracks associating to 3 measurements in the diagrams below:\n\n\n\nWhere the probability (for example) of the orange track associating to the green measurement is\n$0.25$.\nThe probability of every possible association set is calculated. These probabilities are then\nnormalised.\n\n\n\nA track-measurement hypothesis weight is then recalculated as the sum of the probabilities of\nevery occurrence where that track associates to that measurement.\n\n\n\n\n"
]
},
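{
"cell_type": "markdown",
"metadata": {},
"source": [
"The enumeration above can be sketched in plain Python. The toy below is illustrative only: it\nis not how Stone Soup computes JPDA weights, and the single-track hypothesis weights are made\nup. Every joint event in which no two tracks share a measurement is weighted by the product of\nits single-track hypothesis weights, the event weights are normalised, and the marginal\nprobability of $A \\to x$ is the sum over events containing that pairing.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from itertools import product\n\ntracks = ['A', 'B', 'C']\nmeasurements = ['x', 'y', 'z']\n\n# Made-up, unnormalised single-track hypothesis weights; None is a missed detection\nw = {(t, m): 1.0 for t in tracks for m in measurements + [None]}\nw[('A', 'x')] = 2.0  # pretend measurement x lies close to track A\n\nevents = {}\nfor assignment in product(measurements + [None], repeat=len(tracks)):\n    used = [m for m in assignment if m is not None]\n    if len(used) != len(set(used)):\n        continue  # no two tracks may share a measurement\n    weight = 1.0\n    for t, m in zip(tracks, assignment):\n        weight *= w[(t, m)]\n    events[assignment] = weight\n\ntotal = sum(events.values())\np_A_x = sum(v for a, v in events.items() if a[0] == 'x') / total\nprint(p_A_x)"
]
},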
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Simulate ground truth\nAs with the multi-target data association tutorial, we simulate two targets moving in the\npositive x, y cartesian plane (intersecting approximately half-way through their transition).\nWe then add truth detections with clutter at each time-step.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime\nfrom datetime import timedelta\nimport numpy as np\nfrom scipy.stats import uniform\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\nnp.random.seed(1991)\n\ntruths = set()\n\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 20, -1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\n# Plot ground truth.\nfrom stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.ax.set_ylim(0, 25)\nplotter.plot_ground_truths(truths, [0, 2])\n\n# Generate measurements.\nall_measurements = []\n\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\nprob_detect = 0.9 # 90% chance of detection.\n\nfor k in range(20):\n measurement_set = set()\n\n for truth in truths:\n # Generate actual detection from the state with a 10% chance that no detection is received.\n if np.random.rand() <= prob_detect:\n measurement = measurement_model.function(truth[k], noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter at this time-step\n truth_x = truth[k].state_vector[0]\n truth_y = truth[k].state_vector[2]\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n all_measurements.append(measurement_set)\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2], color='g')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initial hypotheses are calculated (per track) in the same manner as the PDA.\nTherefore, in Stone Soup, the JPDA filter uses the :class:`~.PDAHypothesiser` to create these\nhypotheses.\nUnlike the :class:`~.PDA` data associator, in Stone Soup, the :class:`~.JPDA` associator takes\nthis collection of hypotheses and adjusts their weights according to the method described above,\nbefore returning key-value pairs of tracks and detections to be associated with them.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.probability import PDAHypothesiser\n# This doesn't need to be created again, but for the sake of visualising the process, it has been\n# added.\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.125,\n prob_detect=prob_detect)\n\nfrom stonesoup.dataassociator.probability import JPDA\ndata_associator = JPDA(hypothesiser=hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running the JPDA filter\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import GaussianState\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import StateVectors\nfrom stonesoup.functions import gm_reduce_single\nfrom stonesoup.types.update import GaussianStateUpdate\n\nprior1 = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\nprior2 = GaussianState([[0], [1], [20], [-1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\n\ntracks = {Track([prior1]), Track([prior2])}\n\nfor n, measurements in enumerate(all_measurements):\n hypotheses = data_associator.associate(tracks,\n measurements,\n start_time + timedelta(seconds=n))\n\n # Loop through each track, performing the association step with weights adjusted according to\n # JPDA.\n for track in tracks:\n track_hypotheses = hypotheses[track]\n\n posterior_states = []\n posterior_state_weights = []\n for hypothesis in track_hypotheses:\n if not hypothesis:\n posterior_states.append(hypothesis.prediction)\n else:\n posterior_state = updater.update(hypothesis)\n posterior_states.append(posterior_state)\n posterior_state_weights.append(hypothesis.probability)\n\n means = StateVectors([state.state_vector for state in posterior_states])\n covars = np.stack([state.covar for state in posterior_states], axis=2)\n weights = np.asarray(posterior_state_weights)\n\n # Reduce mixture of states to one posterior estimate Gaussian.\n post_mean, post_covar = gm_reduce_single(means, covars, weights)\n\n # Add a Gaussian state approximation to the track.\n track.append(GaussianStateUpdate(\n post_mean, post_covar,\n track_hypotheses,\n track_hypotheses[0].measurement.timestamp))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting tracks.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(tracks, [0, 2], uncertainty=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n1. Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control\nSystems Magazine\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

11_GMPHDTutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 11 - Gaussian mixture PHD tutorial\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Background\n\nPrevious tutorials have described the difficulties of state estimation when there are\nmultiple targets under consideration. The probability hypothesis density (PHD) filter has been proposed as a solution\nto this problem that is analogous to the Kalman Filter's solution in single-object\ntracking. Where the Kalman filter propagates the first order movement of the posterior\ndistribution of the target, the PHD filter creates a multiple target posterior\ndistribution and propagates its first-order statistical moment, or PHD. At each\ntime instance, the collections of targets and detections (including both measurements\nand false detections) are modelled as random finite sets. This means that the number\nof elements in each set is a random variable, and the elements themselves follow a\nprobability distribution. Note that this is different from previously discussed filters\nwhich have a constant or known number of objects.\n\nIn a GM-PHD filter, each object is assumed to follow a linear Gaussian model, just like\nwe saw in previous tutorials. However, the multiple objects need not have the same\ncovariance matrices, meaning that the multiple target posterior distribution will be\na **Gaussian mixture (GM)**.\n\n"
]
},
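{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an illustrative aside (not from the original text), a Poisson random finite set can be\nsampled in two steps: draw the number of elements from a Poisson distribution, then draw each\nelement independently from a spatial density. Repeated draws give sets of different sizes,\nwhich is the key departure from the fixed-size state vectors of earlier tutorials.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nrng = np.random.default_rng(1991)\nexpected_cardinality = 3  # illustrative expected number of elements\n\nfor draw in range(3):\n    n = rng.poisson(expected_cardinality)       # the set size is itself random...\n    points = rng.uniform(-30, 30, size=(n, 2))  # ...and each element follows a spatial density\n    print(f'draw {draw}: set of {n} elements, array shape {points.shape}')"
]
},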
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First we will recall some of the formulas that will be used in this filter.\n\nTransition Density: Given a state $p(\\mathbf{x}_{k-1})$ at time $k-1$, the probability density of\na transition to the state $p(\\mathbf{x}_k)$ at time $k$ is given by\n$f_{k\\vert k-1}(\\mathbf{x}_{k}\\vert \\mathbf{x}_{k-1})$\n\nLikelihood Function: Given a state $\\mathbf{x}_{k}$ at time $k$, the probability density of\nreceiving the detection $\\mathbf{z}_{k}$ is given by\n$g_{k}(\\mathbf{z}_{k}\\vert \\mathbf{x}_{k})$\n\nThe Posterior Density: The probability density of state $\\mathbf{x}_{k}$ given all the previous\nobservations is denoted by $p_{k}(\\mathbf{x}_{k}\\vert \\mathbf{z}_{1:k})$. Using an initial density\n$p_{0}(\\cdot)$, we can apply Bayes' recursion to show that the posterior density is actually\n\n\\begin{align}p_{k}(\\mathbf{x}_{k}\\vert \\mathbf{z}_{1:k}) = {{g_{k}(\\mathbf{z}_{k}\\vert \\mathbf{x}_{k})p_{k\\vert k-1}(\\mathbf{x}_{k}\\vert \\mathbf{z}_{1:k-1})} \\over {\\int g_{k}(\\mathbf{z}_{k}\\vert \\mathbf{x})p_{k\\vert k-1}(\\mathbf{x}\\vert \\mathbf{z}_{1:k-1})d\\mathbf{x}}}\\end{align}\n\n\nIt is important to notice here that the state at time $k$ can be derived wholly by\nthe state at time $k-1$.\n\nHere we also introduce the following notation:\n$p_{S,k}(\\zeta)$ is the probability that a target $S$ will exist at time $k$ given that\nits previous state was $\\zeta$\n\nSuppose we have the random finite set $\\mathbf{X}_{k} \\in \\chi$ corresponding to the set of\ntarget states at time $k$ and $\\mathbf{X}_k$ has probability distribution $P$. Integrating over\nevery region $S \\in \\chi$, we get a formula for the first order moment (also called the\nintensity) at time $k$, $v_{k}$\n\n\\begin{align}\\int \\left \\vert \\mathbf{X}_{k}\\cap S\\right \\vert P(d\\mathbf{X}_k)=\\int _{S}v_{k}(x)dx.\\end{align}\n\nThe set of targets spawned at time $k$ by a target whose previous state was $\\zeta$ is the\nrandom finite set $\\mathbf{B}_{k|k-1}$. This new set of targets has intensity denoted $\\beta_{k|k-1}$.\n\nThe intensity of the random finite set of births at time $k$ is given by $\\gamma_{k}$.\n\nThe intensity of the random finite set of clutter at time $k$ is given by $\\kappa_{k}$.\n\nThe probability that a state $x$ will be detected at time $k$ is given by $p_{D, k}(x)$.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Assumptions\nThe GM-PHD filter assumes that each target is independent of one another in both generated\nobservations and in evolution. Clutter is also assumed to be independent of the target\nmeasurements. Finally, we assume that the target locations at a given time step are\ndependent on the multi-target prior density, and their distributions are Poisson. Typically,\nthe target locations are also dependent on previous measurements, but that has been omitted\nin current GM-PHD algorithms.\n\n### Posterior Propagation Formula\nUnder the above assumptions, Vo and Ma [#]_ proved that the posterior intensity can be\npropagated in time using the PHD recursion as follows:\n$\\eqalignno{v _{ k\\vert k-1} (x) =&\\, \\int p_{S,k}(\\zeta)f_{k\\vert k-1} (x\\vert \\zeta)v_{k-1}(\\zeta)d\\zeta\\cr & +\\int \\beta_{k\\vert k-1} (x\\vert \\zeta)v_{k-1}(\\zeta)d\\zeta+\\gamma _{k}(x) & \\cr v_{k} (x) =&\\, \\left[ 1-p_{D,k}(x)\\right]v_{k\\vert k-1}(x)\\cr & +\\!\\!\\sum\\limits _{z\\in Z_{k}} \\!{{ p_{D,k}(x)g_{k}(z\\vert x)}v_{k\\vert k-1}(x) \\over { \\kappa _{k}(z)\\!+\\!\\int p_{D,k}(\\xi)g_{k}(z\\vert \\xi)v_{k\\vert k-1}(\\xi)}} . \\cr &&}$\n\nFor more information about the specific formulas for linear and non-linear Gaussian models,\nplease see Vo and Ma's full paper.\n\n"
]
},
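{
"cell_type": "markdown",
"metadata": {},
"source": [
"When the predicted intensity is a Gaussian mixture, the measurement-update line of the\nrecursion above has a simple closed form per component. The sketch below is illustrative only\n(scalar states, an identity measurement function and made-up numbers): for a single measurement\n$z$, each predicted weight is scaled by $p_{D,k}$ times that component's measurement\nlikelihood, then divided by the clutter intensity at $z$ plus the total weighted likelihood.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom scipy.stats import norm\n\np_D = 0.9       # probability of detection (illustrative)\nkappa_z = 0.01  # clutter intensity evaluated at z (illustrative)\nr = 0.1         # measurement-noise variance (illustrative)\nz = 1.0         # a single received measurement\n\n# Predicted mixture: (weight, mean, variance) per component, scalar for brevity\ncomponents = [(0.6, 0.8, 0.5), (0.4, -2.0, 0.5)]\n\nnumerators = [w * p_D * norm.pdf(z, loc=m, scale=np.sqrt(v + r))\n              for w, m, v in components]\ndenominator = kappa_z + sum(numerators)\nupdated_weights = [num / denominator for num in numerators]\nprint(updated_weights)"
]
},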
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A Ground-Based Multi-Target Simulation\nThis simulation will include several targets moving in different directions across the 2D\nCartesian plane. The start locations of each object are random. These start locations are\ncalled priors and are known to the filter, via the density $p_{0}(\\cdot)$ discussed above.\n\nAt each time step, new targets are created and some targets die according to defined\nprobabilities.\n\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We start with some imports as usual.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Imports for plotting\nfrom matplotlib import pyplot as plt\nplt.rcParams['figure.figsize'] = (14, 12)\nplt.style.use('seaborn-colorblind')\n# Other general imports\nimport numpy as np\nfrom datetime import datetime, timedelta\nstart_time = datetime.now()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truth\n\nAt the end of the tutorial we will plot the Gaussian mixtures. The ground truth Gaussian\nmixtures are stored in this list where each index refers to an instance in time and holds\nall ground truths at that time step.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"truths_by_time = []\n\n# Create transition model\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, ConstantVelocity\ntransition_model = CombinedLinearGaussianTransitionModel(\n (ConstantVelocity(0.3), ConstantVelocity(0.3)))\n\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nstart_time = datetime.now()\ntruths = set() # Truths across all time\ncurrent_truths = set() # Truths alive at current time\nstart_truths = set()\nnumber_steps = 20\ndeath_probability = 0.005\nbirth_probability = 0.2\n\n# Initialize 3 truths. This can be changed to any number of truths you wish.\ntruths_by_time.append([])\nfor i in range(3):\n x, y = initial_position = np.random.uniform(-30, 30, 2) # Range [-30, 30] for x and y\n x_vel, y_vel = (np.random.rand(2))*2 - 1 # Range [-1, 1] for x and y velocity\n state = GroundTruthState([x, x_vel, y, y_vel], timestamp=start_time)\n\n truth = GroundTruthPath([state])\n current_truths.add(truth)\n truths.add(truth)\n start_truths.add(truth)\n truths_by_time[0].append(state)\n\n# Simulate the ground truth over time\nfor k in range(number_steps):\n truths_by_time.append([])\n # Death\n for truth in current_truths.copy():\n if np.random.rand() <= death_probability:\n current_truths.remove(truth)\n # Update truths\n for truth in current_truths:\n updated_state = GroundTruthState(\n transition_model.function(truth[-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time + timedelta(seconds=k))\n truth.append(updated_state)\n truths_by_time[k].append(updated_state)\n # Birth\n for _ in range(np.random.poisson(birth_probability)):\n x, y = initial_position = np.random.rand(2) * [120, 120] # Range [0, 20] for x and y\n x_vel, y_vel = (np.random.rand(2))*2 - 1 # Range [-1, 1] for x and y velocity\n state = GroundTruthState([x, x_vel, y, y_vel], timestamp=start_time + timedelta(seconds=k))\n\n # Add to truth set for current and for all timestamps\n truth = GroundTruthPath([state])\n current_truths.add(truth)\n truths.add(truth)\n truths_by_time[k].append(state)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth\n\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.plot_ground_truths(truths, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate detections with clutter\nNext, generate detections with clutter just as in the previous tutorial. The clutter is\nassumed to be uniformly distributed across the entire field of view, here assumed to\nbe the space where $x \\in [-200, 200]$ and $y \\in [-200, 200]$.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Make the measurement model\nfrom stonesoup.models.measurement.linear import LinearGaussian\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\n\n# Generate detections and clutter\n\nfrom scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\n\nall_measurements = []\n# The probability detection and clutter rate play key roles in the posterior intensity.\n# They can be changed to see their effect.\nprobability_detection = 0.9\nclutter_rate = 3.0\n\nfor k in range(number_steps):\n measurement_set = set()\n timestamp = start_time + timedelta(seconds=k)\n\n for truth in truths:\n try:\n truth_state = truth[timestamp]\n except IndexError:\n # This truth not alive at this time. Skip this iteration of the for loop.\n continue\n\n # Generate actual detection from the state with a 10% chance that no detection is received.\n if np.random.rand() <= probability_detection:\n # Generate actual detection from the state\n measurement = measurement_model.function(truth_state, noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=truth_state.timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter at this time-step\n for _ in range(np.random.poisson(clutter_rate)):\n x = uniform.rvs(-200, 400)\n y = uniform.rvs(-200, 400)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=timestamp,\n measurement_model=measurement_model))\n\n all_measurements.append(measurement_set)\n\n\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2], color='g')\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create the Predicter and Updater\n\nThe updater is a :class:`~.PHDUpdater`, and since it uses the mixed Gaussian paths, it is a\nGM-PHD updater. For each individual track we use a :class:`~.KalmanUpdater`. Here we assume\nthat the measurement range and clutter spatial density are known to the filter. We\ninvite you to change these variables to mimic removing this assumption.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.kalman import KalmanUpdater\nkalman_updater = KalmanUpdater(measurement_model)\n\n# Area in which we look for target. Note that if a target appears outside of this area the \n# filter will not pick up on it.\nmeas_range = np.array([[-1, 1], [-1, 1]])*200 \nclutter_spatial_density = clutter_rate/np.prod(np.diff(meas_range))\n\nfrom stonesoup.updater.pointprocess import PHDUpdater\nupdater = PHDUpdater(\n kalman_updater,\n clutter_spatial_density=clutter_spatial_density,\n prob_detection=probability_detection,\n prob_survival=1-death_probability)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The GM-PHD filter quantifies the predicted-measurement distance, just as in previous\ntutorials. The metric for this is the Mahalanobis distance. We also require two\nobjects which together form the predictor and to generate hypotheses (or predictions)\nbased on the previous state. Recall that the GM-PHD propagates the first-order\nstatistical moment which is a Gaussian mixture. The predicted state at the next time\nstep is also a Gaussian mixture and can be determined solely by the propagated prior.\nDetermining this predicted Gaussian mixture is the job for the\n:class:`~.GaussianMixtureHypothesiser` class. We must also generate a prediction for each track\nin the simulation, and so use the :class:`~.DistanceHypothesiser` object as before.\n\n"
]
},
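{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a brief aside before constructing these objects: the Mahalanobis distance scales the\ninnovation by the innovation covariance, so it measures how many 'standard deviations' a\ndetection lies from a predicted measurement. A minimal sketch with illustrative numbers:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nz = np.array([1.0, 2.0])       # a detection\nz_pred = np.array([0.0, 0.0])  # the predicted measurement\nS = np.array([[2.0, 0.0],      # innovation covariance\n              [0.0, 2.0]])\n\ninnovation = z - z_pred\nd = np.sqrt(innovation @ np.linalg.inv(S) @ innovation)\nprint(d)  # compare against the missed_distance threshold used below"
]
},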
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\nkalman_predictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nbase_hypothesiser = DistanceHypothesiser(kalman_predictor, kalman_updater, Mahalanobis(), missed_distance=3)\n\nfrom stonesoup.hypothesiser.gaussianmixture import GaussianMixtureHypothesiser\nhypothesiser = GaussianMixtureHypothesiser(base_hypothesiser, order_by_detection=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The updater takes a list of hypotheses from the hypothesiser and transforms them into\npotential new states for our tracks. Each state is a :class:`~.TaggedWeightedGaussianState`\nobject and has a state vector, covariance, weight, tag, and timestamp. Some of the\nupdated states have a very low weight, indicating that they do not contribute much to\nthe Gaussian mixture. To ease the computational complexity, a :class:`~.GaussianMixtureReducer`\nis used to merge and prune many of the states based on provided thresholds. States whose\ndistance is less than the merging threshold will be combined, and states whose weight\nis less than the pruning threshold will be removed.\n\n"
]
},
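{
"cell_type": "markdown",
"metadata": {},
"source": [
"The idea can be sketched for scalar Gaussians as follows. This toy is illustrative only and is\nnot the :class:`~.GaussianMixtureReducer` implementation: negligible-weight components are\ndropped, and a pair of components closer than the merging threshold is combined moment-wise,\nthe merged weight being the sum.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Toy prune-and-merge over scalar (weight, mean, variance) components\ncomponents = [(0.5, 0.0, 1.0), (0.45, 0.2, 1.0), (1e-10, 40.0, 1.0)]\nprune_threshold = 1e-8\nmerge_threshold = 0.5\n\n# Prune: drop components with negligible weight\nsurvivors = [c for c in components if c[0] > prune_threshold]\n\n# Merge: combine the remaining pair moment-wise if their means are close\n(w1, m1, v1), (w2, m2, v2) = survivors\nif abs(m1 - m2) < merge_threshold:\n    w = w1 + w2\n    m = (w1*m1 + w2*m2) / w\n    v = (w1*(v1 + (m - m1)**2) + w2*(v2 + (m - m2)**2)) / w\n    survivors = [(w, m, v)]\nprint(survivors)"
]
},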
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.mixturereducer.gaussianmixture import GaussianMixtureReducer\n# Initialise a Gaussian Mixture reducer\nmerge_threshold = 5\nprune_threshold = 1E-8\n\nreducer = GaussianMixtureReducer(\n prune_threshold=prune_threshold,\n pruning=True,\n merge_threshold=merge_threshold,\n merging=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we initialize the Gaussian mixture at time k=0. In this implementation, the GM-PHD\ntracker knows the start state of the first 3 tracks that were created. After that it\nmust pick up on new tracks and discard old ones. It is not necessary to provide the \ntracker with these start states, you can simply define the `tracks` as an empty set.\n\nFeel free to change the `state_vector` from the actual truth state vector to something\nelse. This would mimic if the tracker was unsure about where the objects were originating.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import TaggedWeightedGaussianState\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import CovarianceMatrix\ncovar = CovarianceMatrix(np.diag([10, 5, 10, 5]))\n\ntracks = set()\nfor truth in start_truths:\n new_track = TaggedWeightedGaussianState(\n state_vector=truth.state_vector,\n covar=covar**2,\n weight=0.25,\n tag=TaggedWeightedGaussianState.BIRTH,\n timestamp=start_time)\n tracks.add(Track(new_track))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The hypothesier takes the current Gaussian mixture as a parameter. Here we will \ninitialize it to use later. \n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"reduced_states = set([track[-1] for track in tracks])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To ensure that new targets get represented in the filter, we must add a birth \ncomponent to the Gaussian mixture at every time step. The birth component's mean and \ncovariance must create a distribution that covers the entire state space, and its weight \nmust be equal to the expected number of births per timestep. For more information about \nthe birth component, see the algorithm provided in [#]_. If the state space is very \nlarge, it becomes inefficient to hold a component that covers it. Alternative \nimplementations (as well as more dicussion about the birth component) are discussed in \n[#]_.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"birth_covar = CovarianceMatrix(np.diag([1000, 2, 1000, 2]))\nbirth_component = TaggedWeightedGaussianState(\n state_vector=[0, 0, 0, 0],\n covar=birth_covar**2,\n weight=0.25,\n tag='birth',\n timestamp=start_time\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the Tracker\nNow that we have all of our components, we can create a tracker. At each time instance,\nthe tracker will go through four steps: hypothesise, update, reduce, and match. Let us \nbriefly recap these four steps. The 'hypothesise' step is similar to the 'prediction' \nstep in other filters. It uses the existing state space and the measurements to generate \na list of all hypotheses for this time step (remember, each hypothesis is a Gaussian \ncomponent). In the 'update' step, the filter combines the hypotheses into an updated \nGaussian mixture. The 'reduce' step helps limit the computational complexity by merging \nand pruning the updated Gaussian mixture. The filter returns this final set of states \nand then we perform a 'match' step where we use the states' tags to match them with an \nexisting track (or create a new track).\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"These lists will be used to plot the Gaussian mixtures later. They are not used in\nthe filter itself.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"all_gaussians = []\ntracks_by_time = []"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We need a threshold to compare state weights against. If the state has a high enough\nweight in the Gaussian mixture, we will add it to an existing track or make a new\ntrack for it. Lowering this value makes the filter more sensitive but may also\nincrease the number of false estimations. Increasing this value may increase the number\nof missed targets.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"state_threshold = 0.25\n\nfor n, measurements in enumerate(all_measurements):\n tracks_by_time.append([])\n all_gaussians.append([])\n\n # The hypothesiser takes in the current state of the Gaussian mixture. This is equal to the list of \n # reduced states from the previous iteration. If this is the first iteration, then we use the priors \n # defined above. \n current_state = reduced_states\n \n # At every time step we must add the birth component to the current state \n if measurements: \n time = list(measurements)[0].timestamp\n else: \n time = start_time + timedelta(seconds=n)\n birth_component.timestamp = time\n current_state.add(birth_component)\n\n # Generate the set of hypotheses\n hypotheses = hypothesiser.hypothesise(current_state,\n measurements,\n timestamp=time,\n # keep our hypotheses ordered by detection, not by track\n order_by_detection=True)\n\n # Turn the hypotheses into a GaussianMixture object holding a list of states\n updated_states = updater.update(hypotheses)\n\n # Prune and merge the updated states into a list of reduced states\n reduced_states = set(reducer.reduce(updated_states))\n\n # Add the reduced states to the track list. Each reduced state has a unique tag. If this tag matches the tag of a\n # state from a live track, we add the state to that track. Otherwise, we generate a new track if the reduced\n # state's weight is high enough (i.e. we are sufficiently certain that it is a new track).\n for reduced_state in reduced_states:\n # Add the reduced state to the list of Gaussians that we will plot later. Have a low threshold to eliminate some\n # clutter that would make the graph busy and hard to understand\n if reduced_state.weight > 0.05: all_gaussians[n].append(reduced_state)\n\n tag = reduced_state.tag\n # Here we check to see if the state has a sufficiently high weight to consider being added.\n if reduced_state.weight > state_threshold:\n # Check if the reduced state belongs to a live track\n for track in tracks:\n track_tags = [state.tag for state in track.states]\n\n if tag in track_tags:\n track.append(reduced_state)\n tracks_by_time[n].append(reduced_state)\n break\n else: # Execute if no \"break\" is hit; i.e. no track with matching tag\n # Make a new track out of the reduced state\n new_track = Track(reduced_state)\n tracks.add(new_track)\n tracks_by_time[n].append(reduced_state)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot the Tracks\nFirst, determine the x and y range for axes. We want to zoom in as much as possible\non the measurements and tracks while not losing any of the information. This section\nis not strictly necessary as we already set the field of view to be the range [-100, 100]\nfor both x and y. However, sometimes an object may leave the field of view. If you want\nto ignore objects that have left the field of view, comment out this section and define\nthe variables\n`x_min` = `y_min` = -100 and `x_max` = `y_max` = 100.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"x_min, x_max, y_min, y_max = 0, 0, 0, 0\n\n# Get bounds from the tracks\nfor track in tracks:\n for state in track:\n x_min = min([state.state_vector[0], x_min])\n x_max = max([state.state_vector[0], x_max])\n y_min = min([state.state_vector[2], y_min])\n y_max = max([state.state_vector[2], y_max])\n\n# Get bounds from measurements\nfor measurement_set in all_measurements:\n for measurement in measurement_set:\n if type(measurement) == TrueDetection:\n x_min = min([measurement.state_vector[0], x_min])\n x_max = max([measurement.state_vector[0], x_max])\n y_min = min([measurement.state_vector[1], y_min])\n y_max = max([measurement.state_vector[1], y_max])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can use the :class:`~.Plotter` class to draw the tracks. Note that if the birth\ncomponent it plotted you will see its uncertainty ellipse centered around $(0, 0)$.\nThis ellipse need not cover the entire state space, as long as the distribution does.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Plot the tracks\nplotter = Plotter()\nplotter.plot_ground_truths(truths, [0, 2])\nplotter.plot_measurements(all_measurements, [0, 2], color='g')\nplotter.plot_tracks(tracks, [0, 2], uncertainty=True)\nplotter.ax.set_xlim(x_min-5, x_max+5)\nplotter.ax.set_ylim(y_min-5, y_max+5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Examining the Gaussian Mixtures\nAt every time step, the above GM-PHD algorithm creates a Gaussian mixture, which is\na distribution over our target space. The following sections take a closer look at what\nthis Gaussian really looks like. Note that the figures below only include the reduced\nstates which have weight greater than 0.05. This decreases the overall number of states\nshown in the mixture and makes it easier to examine. But you can change this threshold\nparameter in the Run The Tracker section.\n\nFirst we define a function that will help generate the z values for the Gaussian\nmixture. This lets us plot it later. This function has been updated from the one\nfound `here `_ from \n`this `_ GPL-3.0 licensed repository.\n\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import multivariate_normal\nfrom mpl_toolkits.mplot3d import Axes3D\nfrom matplotlib import cm\n\ndef get_mixture_density(x, y, weights, means, sigmas):\n # We use the quantiles as a parameter in the multivariate_normal function. We don't need to pass in any quantiles,\n # but the last axis must have the components x and y\n quantiles = np.empty(x.shape + (2,)) # if x.shape is (m,n) then quantiles.shape is (m,n,2)\n quantiles[:, :, 0] = x\n quantiles[:, :, 1] = y\n\n # Go through each gaussian in the list and add its PDF to the mixture\n z = np.zeros(x.shape)\n for gaussian in range(len(weights)):\n z += weights[gaussian]*multivariate_normal.pdf(x=quantiles, mean=means[gaussian, :], cov=sigmas[gaussian])\n return z"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For each timestep, create a new figure with 2 subplots. The plot on the left will\nshow the 3D Gaussian Mixture density. The plot on the right will show a 2D\nbirds-eye-view of the space, including the ground truths, detections, clutter, and tracks.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from matplotlib import animation\nfrom matplotlib import pyplot as plt\nfrom matplotlib.lines import Line2D # Will be used when making the legend\n\n# This is the function that updates the figure we will be animating. As parameters we must\n# pass in the elements that will be changed, as well as the index i\ndef animate(i, sf, truths, tracks, measurements, clutter):\n # Set up the axes\n axL.clear()\n axR.set_title('Tracking Space at k='+str(i))\n axL.set_xlabel(\"x\")\n axL.set_ylabel(\"y\")\n axL.set_title('PDF of the Gaussian Mixture')\n axL.view_init(elev=30, azim=-80)\n axL.set_zlim(0, 0.3)\n\n # Initialize the variables\n weights = [] # weights of each Gaussian. This is analogous to the probability of its existence\n means = [] # means of each Gaussian. This is equal to the x and y of its state vector\n sigmas = [] # standard deviation of each Gaussian.\n\n # Fill the lists of weights, means, and standard deviations\n for state in all_gaussians[i]:\n weights.append(state.weight)\n means.append([state.state_vector[0], state.state_vector[2]])\n sigmas.append([state.covar[0][0], state.covar[1][1]])\n means = np.array(means)\n sigmas = np.array(sigmas)\n\n # Generate the z values over the space and plot on the left axis\n zarray[:, :, i] = get_mixture_density(x, y, weights, means, sigmas)\n sf = axL.plot_surface(x, y, zarray[:, :, i], cmap=cm.RdBu, linewidth=0, antialiased=False)\n\n # Make lists to hold the new ground truths, tracks, detections, and clutter\n new_truths, new_tracks, new_measurements, new_clutter = [], [], [], []\n for truth in truths_by_time[i]:\n new_truths.append([truth.state_vector[0], truth.state_vector[2]])\n for state in tracks_by_time[i]:\n new_tracks.append([state.state_vector[0], state.state_vector[2]])\n for measurement in all_measurements[i]:\n if isinstance(measurement, TrueDetection):\n new_measurements.append([measurement.state_vector[0], measurement.state_vector[1]])\n elif isinstance(measurement, Clutter):\n new_clutter.append([measurement.state_vector[0], measurement.state_vector[1]])\n\n # Plot the contents of these lists on the right axis\n if new_truths:\n truths.set_offsets(new_truths)\n if new_tracks:\n tracks.set_offsets(new_tracks)\n if new_measurements:\n measurements.set_offsets(new_measurements)\n if new_clutter:\n clutter.set_offsets(new_clutter)\n\n # Create a legend. The use of Line2D is purely for the visual in the legend\n data_types = [Line2D([0], [0], color='white', marker='o', markerfacecolor='blue', markersize=15,\n label='Ground Truth'),\n Line2D([0], [0], color='white', marker='o', markerfacecolor='orange', markersize=15,\n label='Clutter'),\n Line2D([0], [0], color='white', marker='o', markerfacecolor='green', markersize=15,\n label='Detection'),\n Line2D([0], [0], color='white', marker='o', markerfacecolor='red', markersize=15,\n label='Track')]\n axR.legend(handles=data_types, bbox_to_anchor=(1.0, 1), loc='upper left')\n\n return sf, truths, tracks, measurements, clutter\n\n# Set up the x, y, and z space for the 3D axis\nxx = np.linspace(x_min-5, x_max+5, 100)\nyy = np.linspace(y_min-5, y_max+5, 100)\nx, y = np.meshgrid(xx, yy)\nzarray = np.zeros((100, 100, number_steps))\n\n# Create the matplotlib figure and axes. Here we will have two axes being animated in sync.\n# `axL` will be the a 3D axis showing the Gaussian mixture\n# `axR` will be be a 2D axis showing the ground truth, detections, and updated tracks at \n# each time step. 
\nfig = plt.figure(figsize=(16, 8))\naxL = fig.add_subplot(121, projection='3d')\naxR = fig.add_subplot(122)\naxR.set_xlim(x_min-5, x_max+5)\naxR.set_ylim(y_min-5, y_max+5)\n\n# Add an initial surface to the left axis and scattered points on the right axis. Doing\n# this now means that in the animate() function we only have to update these variables\nsf = axL.plot_surface(x, y, zarray[:, :, 0], cmap=cm.RdBu, linewidth=0, antialiased=False)\ntruths = axR.scatter(x_min-10, y_min-10, c='blue', linewidth=6, zorder=0.5)\ntracks = axR.scatter(x_min-10, y_min-10, c='red', linewidth=4, zorder=1)\nmeasurements = axR.scatter(x_min-10, y_min-10, c='green', linewidth=4, zorder=0.5)\nclutter = axR.scatter(x_min-10, y_min-10, c='orange', linewidth=4, zorder=0.5)\n\n# Create and display the animation\nfrom matplotlib import rc\nanim = animation.FuncAnimation(fig, animate, frames=number_steps, interval=500,\n fargs=(sf, truths, tracks, measurements, clutter), blit=False)\nrc('animation', html='jshtml')\nanim"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] B. Vo and W. Ma, \"The Gaussian Mixture Probability Hypothesis Density Filter,\" in IEEE \n Transactions on Signal Processing, vol. 54, no. 11, pp. 4091-4104, Nov. 2006, doi: \n 10.1109/TSP.2006.881190\n\n.. [#] D. E. Clark, K. Panta and B. Vo, \"The GM-PHD Filter Multiple Target Tracker,\" 2006 9th \n International Conference on Information Fusion, 2006, pp. 1-8, doi: 10.1109/ICIF.2006.301809\n\n.. [#] B. Ristic, D. Clark, B. Vo and B. Vo, \"Adaptive Target Birth Intensity for PHD and CPHD \n Filters,\" in IEEE Transactions on Aerospace and Electronic Systems, vol. 48, no. 2, pp. \n 1656-1668, Apr 2012, doi: 10.1109/TAES.2012.6178085\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

02_ExtendedKalmanFilterTutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 2 - Non-linear models: extended Kalman filter\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As well as introducing various aspects of the Stone Soup framework, the previous tutorial\ndetailed the use of a Kalman filter. A significant problem in using the Kalman filter is that it\nrequires transition and sensor models to be linear-Gaussian. In practice, many models are not\nlike this and so alternatives are required. We examine the most commonly-used of such\nalternatives, the extended Kalman filter [#]_ (EKF), in this tutorial.\n\n## Background\n\nRecall that the Kalman filter makes the following linear assumptions of the predicted state and\npredicted measurement:\n\n\\begin{align}\\mathbf{x}_{k|k-1} &= F_{k} \\mathbf{x}_{k-1} + B_{k}\\mathbf{u}_{k}\\\\\n \\mathbf{z}_{k|k-1} &= H_{k} \\mathbf{x}_{k|k-1}\\end{align}\n\nThe EKF gets round the fact that $f(\\cdot)$ and $h(\\cdot)$ are not of this form by\nmaking linear approximations about $\\mathbf{x}_{k\u22121} = \\boldsymbol{\\mu}_{k\u22121}$ or\n$\\mathbf{x}_{k|k\u22121} = \\boldsymbol{\\mu}_{k|k\u22121}$. It does this by way of a Taylor expansion,\n\n\\begin{align}g(\\mathbf{x})\\rvert_{\\mathbf{x}=\\boldsymbol{\\mu}} \\approx \\sum\\limits_{|\\alpha| \\ge 0}\n \\frac{(\\mathbf{x} - \\boldsymbol{\\mu})^{\\alpha}}{\\alpha !}\n (\\mathcal{D}^{\\alpha} g)(\\boldsymbol{\\mu})\\end{align}\n\nThis is usually truncated after the first term, meaning that either\n$\\tilde{F}(\\mathbf{x}_{k-1}) \\approx J(f)\\rvert_{\\mathbf{x}=\\boldsymbol{\\mu}_{k-1}}$ or\n$\\tilde{H}(\\mathbf{x}_{k|k-1}) \\approx J(h)\\rvert_{\\mathbf{x}=\\boldsymbol{\\mu}_{k|k-1}}$\nor both, where $J(\\cdot)$ is the Jacobian matrix. The calculation of the covariances,\nincluding the innovation covariance, then proceeds in exactly the same way as in the Kalman\nfilter using these approximations,\n\n\\begin{align}\\mathbf{x}_{k|k-1} &= f_{k}(\\mathbf{x}_{k-1}) \\\\\n P_{k|k-1} &= \\tilde{F}_{k}P_{k-1}\\tilde{F}_{k}^T + Q_{k}\\\\\n \\mathbf{x}_{k} &= \\mathbf{x}_{k|k-1} +\n \\tilde{K}_k(\\mathbf{z}_k - h_k(\\mathbf{x}_{k|k-1}))\\\\\n P_{k} &= P_{k|k-1} - \\tilde{K}_k \\tilde{H}_{k} P_{k|k-1}\\\\\\end{align}\n\nwhere,\n\n\\begin{align}\\tilde{K}_k &= P_{k|k-1} \\tilde{H}_{k}^T \\tilde{S}_k^{-1}\\\\\n \\tilde{S}_k &= \\tilde{H}_{k} P_{k|k-1} \\tilde{H}_{k}^T + R_{k}\\end{align}\n\n(we omit the control term in this analysis but it also can be incorporated as a non-linear\napproximation of the same form.)\n\nStone Soup implements the EKF\nfor non-linear functions using a finite difference method to find $J(\\cdot)$\nin the appropriate places. We'll now see this in action.\n\n"
]
},
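{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the linearisation concrete, the sketch below approximates the Jacobian of a toy\nbearing-range function by central finite differences - the same idea Stone Soup applies to\nnon-linear models. The function `numerical_jacobian`, the toy `h` and the step size `eps` are\nillustrative assumptions, not the library's implementation.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# A minimal sketch of linearisation by central finite differences. Illustrative\n# only: this is not Stone Soup's API.\ndef numerical_jacobian(h, x, eps=1e-6):\n    m = np.atleast_1d(h(x)).shape[0]\n    J = np.zeros((m, x.size))\n    for j in range(x.size):\n        dx = np.zeros(x.size)\n        dx[j] = eps\n        J[:, j] = (h(x + dx) - h(x - dx)) / (2 * eps)\n    return J\n\n# Toy bearing-range measurement of a 2D position (sensor at the origin)\ndef h(x):\n    return np.array([np.arctan2(x[1], x[0]), np.hypot(x[0], x[1])])\n\nprint(numerical_jacobian(h, np.array([10.0, 5.0])))"
]
},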
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Nearly-constant velocity example\n\nWe're going to use the same target model as previously, but this time we use a non-linear sensor\nmodel.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom datetime import datetime, timedelta\nstart_time = datetime.now()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"np.random.seed(1991)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create a target\n\nAs in the previous tutorial, the target moves with near constant velocity NE from 0,0.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05),\n ConstantVelocity(0.05)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\n\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot this\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.plot_ground_truths(truth, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A bearing-range sensor\n\nWe're going to simulate a 2d bearing-range sensor. An example of this is available in Stone Soup\nvia the :class:`~.CartesianToBearingRange` class. As the name suggests, this takes the Cartesian\nstate input and returns a relative bearing and range to target. Specifically,\n\n\\begin{align}\\begin{bmatrix}\n \\theta\\\\\n r\\\\\n \\end{bmatrix}\n =\n \\begin{bmatrix}\n \\arctan\\left(\\frac{y-y_p}{x-x_p}\\right)\\\\\n \\sqrt{(x-x_p)^2 + (y-y_p)^2}\n \\end{bmatrix}\\end{align}\n\nwhere $x_p,y_p$ is the 2d Cartesian position of the sensor and $x,y$ that of the\ntarget. Note also that the arctan function has to resolve the quadrant ambiguity and so is\nimplemented as the :class:`~.numpy.arctan2`$(y,x)$ function in Python.\n\n"
]
},
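{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before building the Stone Soup model, we can sanity-check the formula directly. This is a\nplain NumPy sketch with an assumed sensor at the origin and a target at $(10, 10)$; it is\nindependent of the :class:`~.CartesianToBearingRange` model defined next.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nx_p, y_p = 0.0, 0.0  # assumed sensor position\nx, y = 10.0, 10.0    # assumed target position\n\ntheta = np.arctan2(y - y_p, x - x_p)      # bearing; arctan2 resolves the quadrant\nr = np.sqrt((x - x_p)**2 + (y - y_p)**2)  # range\nprint(np.degrees(theta), r)  # expect 45 degrees and roughly 14.14"
]
},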
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.measurement.nonlinear import CartesianToBearingRange\nsensor_x = 50 # Placing the sensor off-centre\nsensor_y = 0\n\nmeasurement_model = CartesianToBearingRange(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.diag([np.radians(0.2), 1]), # Covariance matrix. 0.2 degree variance in\n # bearing and 1 metre in range\n translation_offset=np.array([[sensor_x], [sensor_y]]) # Offset measurements to location of\n # sensor in cartesian.\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We create a set of detections using this sensor model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.detection import Detection\n\nmeasurements = []\nfor state in truth:\n measurement = measurement_model.function(state, noise=True)\n measurements.append(Detection(measurement, timestamp=state.timestamp,\n measurement_model=measurement_model))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the measurements. Where the model is nonlinear the plotting function uses the inverse\nfunction to get coordinates.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_measurements(measurements, [0, 2])\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up the extended Kalman filter elements\n\nAnalogously to the Kalman filter, we now create the predictor and updater. As is our custom\nthe predictor takes a transition model and the updater a measurement model. Note that if either\nof these models are linear then the extended predictor/updater defaults to its Kalman equivalent.\nIn fact the extended Kalman filter classes inherit nearly all of their functionality from the\nKalman classes. The only difference being that instead of returning a matrix, in the extended\nversion the :meth:`~.MeasurementModel.matrix()` function returns the Jacobian.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import ExtendedKalmanPredictor\npredictor = ExtendedKalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import ExtendedKalmanUpdater\nupdater = ExtendedKalmanUpdater(measurement_model)"
]
},
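{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can inspect the linearisation the EKF will use. The call below assumes the measurement\nmodel exposes a `jacobian()` method, as Stone Soup's non-linear models do (computed by finite\ndifferences); it is shown purely for inspection and plays no part in the tracking loop.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Evaluate the measurement Jacobian at the initial truth state.\n# Assumes measurement_model.jacobian() is available, as it is for Stone Soup's\n# non-linear measurement models.\nJ = measurement_model.jacobian(truth[0])\nprint(J)  # a 2x4 matrix: d(bearing, range)/d(x, vx, y, vy)"
]
},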
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the extended Kalman filter\nFirst, we'll create a prior state.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import GaussianState\nprior = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next iterate over hypotheses and place in a track.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.hypothesis import SingleHypothesis\nfrom stonesoup.types.track import Track\n\ntrack = Track()\nfor measurement in measurements:\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement) # Group a prediction and measurement\n post = updater.update(hypothesis)\n track.append(post)\n prior = track[-1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track complete with error ellipses at each estimate.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], uncertainty=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Key points\n1. Non-linear models can be incorporated into the tracking scheme through the use of the\n appropriate combination of :class:`~.Predictor`/:class:`~.TransitionModel` or\n :class:`~.Updater`/:class:`~.MeasurementModel`\n2. Because Stone Soup uses inheritance, the amount of engineering required to achieve this is\n minimal and the interface is the same. A user can apply the EKF components in exactly the same\n way as the KF.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first order approximations used by the EKF provide a simple way to handle non-linear tracking\nproblems. However, in highly non-linear systems these simplifications can lead to large errors in\nboth the posterior state mean and covariance. In instances where we have noisy transition, or\nperhaps unreliable measurement, this could lead to a sub-optimal performance or even divergence\nof the filter. In the next tutorial, we see how the **unscented Kalman filter** can begin to\naddresses these issues.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Anderson & Moore 2012, Optimal filtering,\n (http://users.cecs.anu.edu.au/~john/papers/BOOK/B02.PDF)\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

07_PDATutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 7 - Probabilistic data association tutorial\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Making an assignment between a single track and a single measurement can be problematic. In the\nprevious tutorials you may have encountered the phenomenon of *track seduction*. This occurs\nwhen clutter, or other track, points are mis-associated with a prediction. If this happens\nrepeatedly (as can be the case in high-clutter or low-$p_d$ situations) the track can\ndeviate significantly from the truth.\n\nRather than make a firm assignment at each time-step, we could work out the probability that each\nmeasurement should be assigned to a particular target. We could then propagate a measure of\nthese collective probabilities to mitigate the effect of track seduction.\n\nPictorially:\n\n- Calculate a posterior for each hypothesis;\n\n\n\n- Weight each posterior state according to the probability that its corresponding hypothesis\n was true (including the probability of missed-detection);\n\n\n\n- Merge the resulting estimate states in to a single posterior approximation.\n\n\n\nThis results in a more robust approximation to the posterior state covariances that incorporates\nnot only the uncertainty in state, but also in the association.\n\n"
]
},
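{
"cell_type": "markdown",
"metadata": {},
"source": [
"The merge step is moment-matching of a Gaussian mixture: the combined mean is the weighted\nmean of the component means, and the combined covariance adds a spread-of-the-means term. Below\nis a minimal NumPy sketch of the idea with made-up numbers; in the example later in this\ntutorial the equivalent reduction is performed by :func:`~.gm_reduce_single` on Stone Soup\ntypes.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Hypothetical posterior means, covariances and association probabilities\nmeans = [np.array([0.0, 1.0]), np.array([0.4, 0.9])]\ncovars = [np.eye(2), 2 * np.eye(2)]\nweights = np.array([0.7, 0.3])  # normalised to sum to one\n\n# Moment-matched mean: the weighted mean of the component means\nmean = sum(w * m for w, m in zip(weights, means))\n\n# Moment-matched covariance: weighted covariances plus the spread of the means\ncovar = sum(w * (P + np.outer(m - mean, m - mean))\n            for w, m, P in zip(weights, means, covars))\nprint(mean)\nprint(covar)"
]
},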
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A PDA filter example\n\n### Ground truth\n\nSo, as before, we'll first begin by simulating some ground truth.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nfrom datetime import datetime\nfrom datetime import timedelta\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\nnp.random.seed(1991)\n\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Add clutter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\n\nprob_detect = 0.9 # 90% chance of detection.\n\nall_measurements = []\nfor state in truth:\n measurement_set = set()\n\n # Generate detection.\n if np.random.rand() <= prob_detect:\n measurement = measurement_model.function(state, noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter.\n truth_x = state.state_vector[0]\n truth_y = state.state_vector[2]\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n all_measurements.append(measurement_set)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth and measurements with clutter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.ax.set_ylim(0, 25)\nplotter.plot_ground_truths(truth, [0, 2])\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the predictor and updater\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialise Probabilistic Data Associator\nThe :class:`~.PDAHypothesiser` and :class:`~.PDA` associator generate track predictions and\ncalculate probabilities for all prediction-detection pairs for a single prediction and multiple\ndetections.\nThe :class:`~.PDAHypothesiser` returns a collection of :class:`~.SingleProbabilityHypothesis`\ntypes. The :class:`~.PDA` takes these hypotheses and returns a dictionary of key-value pairings\nof each track and detection which it is to be associated with.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.probability import PDAHypothesiser\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.125,\n prob_detect=prob_detect)\n\nfrom stonesoup.dataassociator.probability import PDA\ndata_associator = PDA(hypothesiser=hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the PDA Filter\n\nWith these components, we can run the simulated data and clutter through the Kalman filter.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Create prior\nfrom stonesoup.types.state import GaussianState\nprior = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\n\n# Loop through the predict, hypothesise, associate and update steps.\nfrom stonesoup.types.track import Track\nfrom stonesoup.types.array import StateVectors # For storing state vectors during association\nfrom stonesoup.functions import gm_reduce_single # For merging states to get posterior estimate\nfrom stonesoup.types.update import GaussianStateUpdate # To store posterior estimate\n\ntrack = Track([prior])\nfor n, measurements in enumerate(all_measurements):\n hypotheses = data_associator.associate({track},\n measurements,\n start_time + timedelta(seconds=n))\n\n hypotheses = hypotheses[track]\n\n # Loop through each hypothesis, creating posterior states for each, and merge to calculate\n # approximation to actual posterior state mean and covariance.\n posterior_states = []\n posterior_state_weights = []\n for hypothesis in hypotheses:\n if not hypothesis:\n posterior_states.append(hypothesis.prediction)\n else:\n posterior_state = updater.update(hypothesis)\n posterior_states.append(posterior_state)\n posterior_state_weights.append(\n hypothesis.probability)\n\n means = StateVectors([state.state_vector for state in posterior_states])\n covars = np.stack([state.covar for state in posterior_states], axis=2)\n weights = np.asarray(posterior_state_weights)\n\n # Reduce mixture of states to one posterior estimate Gaussian.\n post_mean, post_covar = gm_reduce_single(means, covars, weights)\n\n # Add a Gaussian state approximation to the track.\n track.append(GaussianStateUpdate(\n post_mean, post_covar,\n hypotheses,\n hypotheses[0].measurement.timestamp))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], uncertainty=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n1. Bar-Shalom Y, Daum F, Huang F 2009, The Probabilistic Data Association Filter, IEEE Control\nSystems Magazine\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

06_DataAssociation-MultiTargetTutorial.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 6 - Data association - multi-target tracking tutorial\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tracking multiple targets through clutter\n\nAs we've seen, more often than not, the difficult part of state estimation concerns the ambiguous\nassociation of predicted states with measurements. This happens whenever there is more that one\ntarget under consideration, there are false alarms or clutter, targets can appear and disappear.\nThat is to say it happens everywhere.\n\nIn this tutorial we introduce **global nearest neighbour** data association, which\nattempts to find a globally-consistent collection of hypotheses such that some overall score of\ncorrect association is maximised.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Background\nMake the assumption that each target generates, at most, one measurement, and that\none measurement is generated by, at most, a single target, or is a clutter point. Under these\nassumptions, global nearest neighbour will assign measurements to predicted measurements to\nminimise a total (global) cost which is a function of the sum of innovations. This is an example\nof an assignment problem in combinatorial optimisation.\n\nWith multiple targets to track, the :class:`~.NearestNeighbour` algorithm compiles a list of all\nhypotheses and selects pairings with higher scores first.\n\n\n\nIn the diagram above, the top detection is selected for association with the blue track,\nas this has the highest score/probability ($0.5$), and (as each measurement is associated\nat most once) the remaining detection must then be associated with the orange track, giving a net\nglobal score/probability of $0.51$.\n\nThe :class:`~.GlobalNearestNeighbour` evaluates all valid (distance-based) hypotheses\n(measurement-prediction pairs) and selects the subset with the\ngreatest net 'score' (the collection of hypotheses pairs which have a minimum sum of distances\noverall).\n\n\n\nIn the diagram above, the blue track is associated to the bottom detection even though the top\ndetection scores higher relative to it. This association leads to a global score/probability of\n$0.6$ - a better net score/probability than the $0.51$ returned by the nearest\nneighbour algorithm.\n\n"
]
},
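{
"cell_type": "markdown",
"metadata": {},
"source": [
"The comparison above can be reproduced with a brute-force search over one-to-one assignments.\nThe score matrix below is an illustrative reconstruction: the individual pair scores are\nassumptions chosen to give the quoted totals of $0.51$ and $0.6$. Real associators use\nefficient assignment algorithms rather than enumerating permutations.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom itertools import permutations\n\n# Illustrative scores: rows are the blue and orange tracks, columns the top\n# and bottom detections (values assumed, chosen to match the totals above)\nscores = np.array([[0.5, 0.3],\n                   [0.3, 0.01]])\n\n# Nearest neighbour: take the single best pair first (0.5), forcing the other\n# track onto the remaining detection (0.01)\nnn_total = scores[0, 0] + scores[1, 1]\n\n# Global nearest neighbour: maximise the total over all one-to-one assignments\ngnn_total = max(sum(scores[i, j] for i, j in enumerate(assignment))\n                for assignment in permutations(range(2)))\nprint(nn_total, gnn_total)  # 0.51 0.6"
]
},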
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A multi-target simulation\nWe start by simulating 2 targets moving in different directions across the 2D Cartesian plane.\nThey start at (0, 0) and (0, 20) and cross roughly half-way through their transit.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom datetime import datetime, timedelta\nstart_time = datetime.now()\n\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truth\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"np.random.seed(1991)\n\ntruths = set()\n\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),\n ConstantVelocity(0.005)])\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)\n\ntruth = GroundTruthPath([GroundTruthState([0, 1, 20, -1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))\ntruths.add(truth)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.ax.set_ylim(0, 25)\nplotter.plot_ground_truths(truths, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate detections with clutter\nNext, generate detections with clutter just as in the previous tutorial. This time, we generate\nclutter about each state at each time-step.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import uniform\n\nfrom stonesoup.types.detection import TrueDetection\nfrom stonesoup.types.detection import Clutter\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\nmeasurement_model = LinearGaussian(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.array([[0.75, 0],\n [0, 0.75]])\n )\nall_measurements = []\n\nfor k in range(20):\n measurement_set = set()\n\n for truth in truths:\n # Generate actual detection from the state with a 10% chance that no detection is received.\n if np.random.rand() <= 0.9:\n measurement = measurement_model.function(truth[k], noise=True)\n measurement_set.add(TrueDetection(state_vector=measurement,\n groundtruth_path=truth,\n timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n\n # Generate clutter at this time-step\n truth_x = truth[k].state_vector[0]\n truth_y = truth[k].state_vector[2]\n for _ in range(np.random.randint(10)):\n x = uniform.rvs(truth_x - 10, 20)\n y = uniform.rvs(truth_y - 10, 20)\n measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=truth[k].timestamp,\n measurement_model=measurement_model))\n all_measurements.append(measurement_set)\n\n# Plot true detections and clutter.\nplotter.plot_measurements(all_measurements, [0, 2], color='g')\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the Kalman predictor and updater\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)\n\nfrom stonesoup.updater.kalman import KalmanUpdater\nupdater = KalmanUpdater(measurement_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As in the clutter tutorial, we will quantify predicted-measurement to measurement distance\nusing the Mahalanobis distance.\n\n"
]
},
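{
"cell_type": "markdown",
"metadata": {},
"source": [
"For a measurement $\\mathbf{z}$, predicted measurement $\\hat{\\mathbf{z}}$ and innovation\ncovariance $S$, the Mahalanobis distance is\n$\\sqrt{(\\mathbf{z}-\\hat{\\mathbf{z}})^T S^{-1}(\\mathbf{z}-\\hat{\\mathbf{z}})}$. A minimal NumPy\nsketch with made-up numbers, separate from the :class:`~.Mahalanobis` measure used next:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nz = np.array([1.0, 2.0])       # hypothetical measurement\nz_pred = np.array([0.5, 1.5])  # hypothetical predicted measurement\nS = np.array([[1.0, 0.2],\n              [0.2, 1.5]])     # hypothetical innovation covariance\n\nd = z - z_pred\nprint(np.sqrt(d @ np.linalg.solve(S, d)))"
]
},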
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nhypothesiser = DistanceHypothesiser(predictor, updater, measure=Mahalanobis(), missed_distance=3)\n\n\nfrom stonesoup.dataassociator.neighbour import GlobalNearestNeighbour\ndata_associator = GlobalNearestNeighbour(hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the Kalman filters\n\nWe create 2 priors reflecting the targets' initial states.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import GaussianState\nprior1 = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)\nprior2 = GaussianState([[0], [1], [20], [-1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp=start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Loop through the predict, hypothesise, associate and update steps.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.track import Track\ntracks = {Track([prior1]), Track([prior2])}\n\nfor n, measurements in enumerate(all_measurements):\n # Calculate all hypothesis pairs and associate the elements in the best subset to the tracks.\n hypotheses = data_associator.associate(tracks,\n measurements,\n start_time + timedelta(seconds=n))\n for track in tracks:\n hypothesis = hypotheses[track]\n if hypothesis.measurement:\n post = updater.update(hypothesis)\n track.append(post)\n else: # When data associator says no detections are good enough, we'll keep the prediction\n track.append(hypothesis.prediction)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting tracks\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(tracks, [0, 2], uncertainty=True)\nplotter.fig"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

04_ParticleFilter.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 4 - Sampling methods: particle filter\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the previous tutorials we encountered some shortcomings in describing distributions as\nGaussians, albeit with considerable flexibility in coping with the non-linear transforms.\n\nSampling methods offer an attractive alternative to such parametric methods in that there is\nno need for complicated though approximate covariance calculations. In this tutorial we look at a\nclass of *sequential Monte Carlo sampling* methods, and in particular, the *particle filter*.\n\nColloquially we can think of a particle filter as a series of point samples being recursed\nthrough the predict-update stages of a Bayesian filter. The diversity of samples compensates for\nthe lack of a covariance estimate, though often at the expense of increased computation\nrequirements.\n\n## Background\n\nIn more detail, we seek to approximate the posterior state estimate as a sum of samples, or\nparticles,\n\n\\begin{align}p(\\textbf{x}_{k}|\\textbf{z}_{1:k}) \\approx\n \\sum_{i} w_{k}^i \\delta (\\textbf{x}_{k} - \\textbf{x}_{k}^i)\\end{align}\n\nwhere $w_{k}^i$ are weights such that $\\sum\\limits_{i} w_{k}^i = 1$. This posterior\ncan be calculated, and subsequently maintained, by successive applications of the\nChapman-Kolmogorov equation and Bayes rule in an analogous manner to the Kalman family of\nfilters of previous tutorials. There is considerable flexibility in how to sample from these\nvarious distributions and the interested reader can refer to [#]_ for more detail.\n\nThe present tutorial focuses on a so-called *sequential importance resampling* filter. This is\nfacilitated by a number of Stone Soup classes. The weight-update equation is,\n\n\\begin{align}w^i_k = w^i_{k-1}\n \\frac{p(\\mathbf{z}_k|\\mathbf{x}^i_k) p(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1})}\n {q(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1},\\mathbf{z}^i_{1:k})}\\end{align}\n\nwhere $p(\\mathbf{z}_k | \\mathbf{x}^i_k)$ is the likelihood distribution (as defined by the\n:class:`~.MeasurementModel`) and $p(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1})$ is the transition\nprobability distribution (:class:`~.TransitionModel`). The $q(\\cdot)$ distribution -- the\nimportance density -- should approximate the posterior distribution, while still being easy to\nsample from.\n\nA common occurrence in such methods is that of *sample impoverishment*. After a few iterations,\nall but a small number of the particles will have negligible weight. This affects accuracy and\nwastes computation on particles with little effect on the estimate. Many resampling schemes\nexist and are designed to redistribute particles to areas where the posterior probability is\nhigher. In Stone Soup such resampling is accomplished by a :class:`~.Resampler`. More detail is\nprovided in the\nexample below.\n\n"
]
},
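{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the *bootstrap* special case the importance density is taken to be the transition\ndistribution itself, so the weight update collapses to multiplication by the likelihood. The\nfragment below sketches one predict-update cycle for a scalar state; the transition and\nlikelihood models and all numbers are stand-ins for illustration.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nrng = np.random.default_rng(1991)\nn = 1000\nparticles = rng.normal(0.0, 1.0, size=n)  # samples from an assumed prior\nweights = np.full(n, 1.0 / n)\n\n# Predict: push each particle through a stand-in transition model with noise\nparticles = 0.9 * particles + rng.normal(0.0, 0.1, size=n)\n\n# Update: multiply weights by the likelihood of an assumed measurement z\nz, sigma = 0.5, 0.2\nweights *= np.exp(-0.5 * ((z - particles) / sigma) ** 2)\nweights /= weights.sum()\n\n# Effective sample size reveals impoverishment (n is healthy, 1 is collapsed)\nprint(1.0 / np.sum(weights ** 2))"
]
},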
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Nearly-constant velocity example\nWe continue in the same vein as the previous tutorials.\n\n### Ground truth\nImport the necessary libraries\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nfrom datetime import datetime\nfrom datetime import timedelta\nstart_time = datetime.now()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"np.random.seed(1991)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialise Stone Soup ground-truth and transition models.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05),\n ConstantVelocity(0.05)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the truth path\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"for k in range(1, 21):\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=start_time+timedelta(seconds=k)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter\nplotter = Plotter()\nplotter.plot_ground_truths(truth, [0, 2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialise the bearing, range sensor using the appropriate measurement model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.measurement.nonlinear import CartesianToBearingRange\nfrom stonesoup.types.detection import Detection\n\nsensor_x = 50\nsensor_y = 0\n\nmeasurement_model = CartesianToBearingRange(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.diag([np.radians(0.2), 1]),\n translation_offset=np.array([[sensor_x], [sensor_y]])\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Populate the measurement array\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurements = []\nfor state in truth:\n measurement = measurement_model.function(state, noise=True)\n measurements.append(Detection(measurement, timestamp=state.timestamp,\n measurement_model=measurement_model))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot those measurements\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_measurements(measurements, [0, 2])\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up the particle filter\nAnalogously to the Kalman family, we create a :class:`~.ParticlePredictor` and a\n:class:`~.ParticleUpdater` which take responsibility for the predict and update steps\nrespectively. These require a :class:`~.TransitionModel` and :class:`~.MeasurementModel` as\nbefore.\nTo cope with sample sparsity we also include a resampler, in this instance\n:class:`~.SystematicResampler`, which is passed to the updater. It should be noted that there are\nmany resampling schemes, and almost as many choices as to when to undertake resampling. The\nsystematic resampler is described in [#]_, and in what follows below resampling is undertaken\nat each time-step.\n\n"
]
},
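{
"cell_type": "markdown",
"metadata": {},
"source": [
"Systematic resampling draws a single uniform offset and then steps through the cumulative\nweights at evenly spaced points, which is cheap and has low variance. Below is a stand-alone\nsketch of the scheme, not the :class:`~.SystematicResampler` implementation itself.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\ndef systematic_resample(weights, rng=None):\n    rng = rng or np.random.default_rng()\n    n = len(weights)\n    # One uniform draw, then n evenly spaced points in [0, 1)\n    positions = (rng.random() + np.arange(n)) / n\n    # Pick the particle whose cumulative-weight interval contains each point\n    return np.searchsorted(np.cumsum(weights), positions)\n\nweights = np.array([0.1, 0.2, 0.3, 0.4])\nprint(systematic_resample(weights))  # indices of the particles to keep"
]
},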
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.particle import ParticlePredictor\npredictor = ParticlePredictor(transition_model)\nfrom stonesoup.resampler.particle import SystematicResampler\nresampler = SystematicResampler()\nfrom stonesoup.updater.particle import ParticleUpdater\nupdater = ParticleUpdater(measurement_model, resampler)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialise a prior\nTo start we create a prior estimate. This is a set of :class:`~.Particle` and we sample from\nGaussian distribution (using the same parameters we had in the previous examples).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import multivariate_normal\n\nfrom stonesoup.types.particle import Particles\nfrom stonesoup.types.numeric import Probability # Similar to a float type\nfrom stonesoup.types.state import ParticleState\nfrom stonesoup.types.array import StateVectors\n\nnumber_particles = 1000\n\n# Sample from the prior Gaussian distribution\nsamples = multivariate_normal.rvs(np.array([0, 1, 0, 1]),\n np.diag([1.5, 0.5, 1.5, 0.5]),\n size=number_particles)\n\n# Create state vectors and weights for particles\nparticles = Particles(state_vector=StateVectors(samples.T),\n weight=np.array([Probability(1/number_particles)]*number_particles)\n )\n\n# Create prior particle state.\nprior = ParticleState(particles, timestamp=start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the tracker\nWe now run the predict and update steps, propagating the collection of particles and resampling\nwhen told to (at every step).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.hypothesis import SingleHypothesis\nfrom stonesoup.types.track import Track\n\ntrack = Track()\nfor measurement in measurements:\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement)\n post = updater.update(hypothesis)\n track.append(post)\n prior = track[-1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track with the sample points at each iteration.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], particle=True)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Key points\n1. Sampling methods offer an attractive alternative to Kalman-based filtering for recursive\n state estimation.\n2. The particle filter trades off a more subtle quantification of a non-Gaussian\n estimate against increased computational effort.\n3. Very often particle filters encounter sample impoverishment and require a resampling step.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Sanjeev Arulampalam M., Maskell S., Gordon N., Clapp T. 2002, Tutorial on Particle Filters\n for Online Nonlinear/Non-Gaussian Bayesian Tracking, IEEE transactions on signal\n processing, vol. 50, no. 2\n\n.. [#] Carpenter J., Clifford P., Fearnhead P. 1999, An improved particle filter for non-linear\n problems, IEE Proc., Radar Sonar Navigation, 146:2\u20137\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
}
},
"nbformat": 4,
"nbformat_minor": 0
}