{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# 4 - Sampling methods: particle filter\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the previous tutorials we encountered some shortcomings in describing distributions as\nGaussians, albeit with considerable flexibility in coping with the non-linear transforms.\n\nSampling methods offer an attractive alternative to such parametric methods in that there is\nno need for complicated though approximate covariance calculations. In this tutorial we look at a\nclass of *sequential Monte Carlo sampling* methods, and in particular, the *particle filter*.\n\nColloquially we can think of a particle filter as a series of point samples being recursed\nthrough the predict-update stages of a Bayesian filter. The diversity of samples compensates for\nthe lack of a covariance estimate, though often at the expense of increased computation\nrequirements.\n\n## Background\n\nIn more detail, we seek to approximate the posterior state estimate as a sum of samples, or\nparticles,\n\n\\begin{align}p(\\textbf{x}_{k}|\\textbf{z}_{1:k}) \\approx\n \\sum_{i} w_{k}^i \\delta (\\textbf{x}_{k} - \\textbf{x}_{k}^i)\\end{align}\n\nwhere $w_{k}^i$ are weights such that $\\sum\\limits_{i} w_{k}^i = 1$. This posterior\ncan be calculated, and subsequently maintained, by successive applications of the\nChapman-Kolmogorov equation and Bayes rule in an analogous manner to the Kalman family of\nfilters of previous tutorials. There is considerable flexibility in how to sample from these\nvarious distributions and the interested reader can refer to [#]_ for more detail.\n\nThe present tutorial focuses on a so-called *sequential importance resampling* filter. This is\nfacilitated by a number of Stone Soup classes. The weight-update equation is,\n\n\\begin{align}w^i_k = w^i_{k-1}\n \\frac{p(\\mathbf{z}_k|\\mathbf{x}^i_k) p(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1})}\n {q(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1},\\mathbf{z}^i_{1:k})}\\end{align}\n\nwhere $p(\\mathbf{z}_k | \\mathbf{x}^i_k)$ is the likelihood distribution (as defined by the\n:class:`~.MeasurementModel`) and $p(\\mathbf{x}^i_k|\\mathbf{x}^1_{k-1})$ is the transition\nprobability distribution (:class:`~.TransitionModel`). The $q(\\cdot)$ distribution -- the\nimportance density -- should approximate the posterior distribution, while still being easy to\nsample from.\n\nA common occurrence in such methods is that of *sample impoverishment*. After a few iterations,\nall but a small number of the particles will have negligible weight. This affects accuracy and\nwastes computation on particles with little effect on the estimate. Many resampling schemes\nexist and are designed to redistribute particles to areas where the posterior probability is\nhigher. In Stone Soup such resampling is accomplished by a :class:`~.Resampler`. More detail is\nprovided in the\nexample below.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Nearly-constant velocity example\nWe continue in the same vein as the previous tutorials.\n\n### Ground truth\nImport the necessary libraries\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\nfrom datetime import datetime\nfrom datetime import timedelta\nstart_time = datetime.now().replace(microsecond=0)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"np.random.seed(1991)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialise Stone Soup ground-truth and transition models.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05),\n ConstantVelocity(0.05)])\ntimesteps = [start_time]\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the truth path\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"for k in range(1, 21):\n timesteps.append(start_time+timedelta(seconds=k))\n truth.append(GroundTruthState(\n transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),\n timestamp=timesteps[k]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the ground truth.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import AnimatedPlotterly\nplotter = AnimatedPlotterly(timesteps, tail_length=0.3)\nplotter.plot_ground_truths(truth, [0, 2])\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialise the bearing, range sensor using the appropriate measurement model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.measurement.nonlinear import CartesianToBearingRange\nfrom stonesoup.types.detection import Detection\n\nsensor_x = 50\nsensor_y = 0\n\nmeasurement_model = CartesianToBearingRange(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.diag([np.radians(0.2), 1]),\n translation_offset=np.array([[sensor_x], [sensor_y]])\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Populate the measurement array\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"measurements = []\nfor state in truth:\n measurement = measurement_model.function(state, noise=True)\n measurements.append(Detection(measurement, timestamp=state.timestamp,\n measurement_model=measurement_model))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot those measurements\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_measurements(measurements, [0, 2])\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up the particle filter\nAnalogously to the Kalman family, we create a :class:`~.ParticlePredictor` and a\n:class:`~.ParticleUpdater` which take responsibility for the predict and update steps\nrespectively. These require a :class:`~.TransitionModel` and :class:`~.MeasurementModel` as\nbefore.\nTo cope with sample sparsity we also include a resampler, in this instance\n:class:`~.SystematicResampler`, which is passed to the updater. It should be noted that there are\nmany resampling schemes, and almost as many choices as to when to undertake resampling. The\nsystematic resampler is described in [#]_, and in what follows below resampling is undertaken\nat each time-step.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Use of Effective Sample Size resampler (ESS)\nResampling removes particles with a low weight and duplicates particles with a high weight.\nA side-effect of this is that additional variance is added. Use of `~.SystematicResampler`\nat each time-step means that additional variance is being introduced when it may not necessarily\nbe required. To reduce the additional variance, it may be optimal to resample less frequently.\n\nThe Effective Sample Size resampler (`~.ESSResampler`) compares the variance of the unnormalised weights\nof the particles to a pre-specified threshold, and only resamples when the variance is greater than this threshold.\nThis threshold is often calculated by the ESS criterion (at time n) given by:\n\n\\begin{align}ESS = \\left(\\sum_{i=1}^{N} (W_{n}^i)^2\\right)^{-1}\\end{align}\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.particle import ParticlePredictor\npredictor = ParticlePredictor(transition_model)\nfrom stonesoup.resampler.particle import ESSResampler\nresampler = ESSResampler()\nfrom stonesoup.updater.particle import ParticleUpdater\nupdater = ParticleUpdater(measurement_model, resampler)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialise a prior\nTo start we create a prior estimate. This is a :class:`~.ParticleState` which describes\nthe state as a distribution of particles using :class:`~.StateVectors` and weights.\nThis is sampled from the Gaussian distribution (using the same parameters we\nhad in the previous examples).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from scipy.stats import multivariate_normal\n\nfrom stonesoup.types.numeric import Probability # Similar to a float type\nfrom stonesoup.types.state import ParticleState\nfrom stonesoup.types.array import StateVectors\n\nnumber_particles = 1000\n\n# Sample from the prior Gaussian distribution\nsamples = multivariate_normal.rvs(np.array([0, 1, 0, 1]),\n np.diag([1.5, 0.5, 1.5, 0.5]),\n size=number_particles)\n\n# Create prior particle state.\nprior = ParticleState(state_vector=StateVectors(samples.T),\n weight=np.array([Probability(1/number_particles)]*number_particles),\n timestamp=start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Run the tracker\nWe now run the predict and update steps, propagating the collection of particles and resampling\nwhen told to (at every step).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.hypothesis import SingleHypothesis\nfrom stonesoup.types.track import Track\n\ntrack = Track()\nfor measurement in measurements:\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement)\n post = updater.update(hypothesis)\n track.append(post)\n prior = track[-1]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot the resulting track with the sample points at each iteration. Can also change 'plot_history'\nto True if wanted.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter.plot_tracks(track, [0, 2], particle=True, plot_history=False)\nplotter.fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Key points\n1. Sampling methods offer an attractive alternative to Kalman-based filtering for recursive\n state estimation.\n2. The particle filter trades off a more subtle quantification of a non-Gaussian\n estimate against increased computational effort.\n3. Very often particle filters encounter sample impoverishment and require a resampling step.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Sanjeev Arulampalam M., Maskell S., Gordon N., Clapp T. 2002, Tutorial on Particle Filters\n for Online Nonlinear/Non-Gaussian Bayesian Tracking, IEEE transactions on signal\n processing, vol. 50, no. 2\n\n.. [#] Carpenter J., Clifford P., Fearnhead P. 1999, An improved particle filter for non-linear\n problems, IEE Proc., Radar Sonar Navigation, 146:2\u20137\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}