Source code for stonesoup.smoother.kalman

# -*- coding: utf-8 -*-
import copy
from functools import partial

import numpy as np

from .base import Smoother
from ..base import Property
from ..types.prediction import Prediction, GaussianStatePrediction
from ..types.update import Update, GaussianStateUpdate
from ..models.base import LinearModel
from ..models.transition.base import TransitionModel
from ..models.transition.linear import LinearGaussianTransitionModel
from ..functions import gauss2sigma, unscented_transform


[docs]class KalmanSmoother(Smoother): r""" The linear-Gaussian or Rauch-Tung-Striebel smoother, colloquially the Kalman smoother [1]_. The transition model is therefore linear-Gaussian. No control model is currently implemented. TODO: Include a control model The smooth function undertakes the backward algorithm on a :class:`~.Track()` object. This is done by starting at the final index in the track, :math:`K` and proceeding from :math:`K \rightarrow 1` via: .. math:: \mathbf{x}_{k|k-1} &= F_{k} \mathbf{x}_{k-1} P_{k|k-1} &= F_{k} P_{k-1} F_{k}^T + Q_{k} G_k &= P_{k-1} F_{k}^T P_{k|k-1}^{-1} \mathbf{x}_{k-1}^s &= \mathbf{x}_{k-1} + G_k (\mathbf{x}_{k}^s - \mathbf{x}_{k|k-1}) P_{k-1}^s &= P_{k-1} + G_k (P_{k}^s - P_{k|k-1}) G_k^T where :math:`\mathbf{x}_{K}^s = \mathbf{x}_{K}` and :math:`P_K^s = P_K`. The predicted state vector and covariance are retrieved from the Track via predicted state or updated state via the links therein. Note that this means that the first two equations are not calculated, the results merely retrieved. This smoother is therefore strictly Kalman only in the backward portion. The prediction might have come by any number of means. If present, the transition model (providing :math:`F` and :math:`Q`) in the prediction is used. This allows for a dynamic transition model (i.e. one that changes with :math:`k`). Otherwise, the (static) transition model is used, defined on smoother initialisation. References .. [1] Särkä S. 2013, Bayesian filtering and smoothing, Cambridge University Press """ transition_model: LinearGaussianTransitionModel = Property( doc="The transition model. The :meth:`smooth` function will initially look for a " "transition model in the prediction. If that is not found then this one is used.") def _prediction(self, state): """ Return the predicted state, either from the prediction directly, or from the attached hypothesis if the queried state is an Update. If not a :class:`~.GaussianStatePrediction` or :class:`~.GaussianStateUpdate` a :class:`~.TypeError` is thrown. Parameters ---------- state : :class:`~.GaussianStatePrediction` or :class:`~.GaussianStateUpdate` Returns ------- : :class:`~.GaussianStatePrediction` The prediction associated with the prediction (i.e. itself), or the prediction from the hypothesis used to generate an update. """ if isinstance(state, GaussianStatePrediction): return state elif isinstance(state, GaussianStateUpdate): return state.hypothesis.prediction else: raise TypeError("States must be GaussianStatePredictions or GaussianStateUpdates.") def _transition_model(self, state): """ If it exists, return the transition model from the prediction associated with input state. If that doesn't exist then use the (static) transition model defined by the smoother. Parameters ---------- state : :class:`~.GaussianStatePrediction` or :class:`~.GaussianStateUpdate` Returns ------- : :class:`~.TransitionModel` The transition model to be associated with state """ # Is there a transition model linked to the prediction? if getattr(self._prediction(state), "transition_model", None) is not None: transition_model = self._prediction(state).transition_model else: # No? Return the class attribute transition_model = self.transition_model return transition_model def _transition_matrix(self, state, **kwargs): """ Return the transition matrix Parameters ---------- state : :class:`~.State` The input state (to check for a linked prediction) **kwargs These are passed to the :meth:`matrix()` function Returns ------- : :class:`numpy.ndarray` The transition matrix """ return self._transition_model(state).matrix(**kwargs) def _smooth_gain(self, state, prediction, **kwargs): """Calculate the smoothing gain Parameters ---------- state : :class:`~.State` The input state prediction : :class:`~.GaussianStatePrediction` The prediction (from the subsequent state) Returns ------- : Matrix The smoothing gain """ return state.covar @ self._transition_matrix(state, **kwargs).T @ np.linalg.inv( prediction.covar)
[docs] def smooth(self, track): """ Perform the backward recursion to smooth the track. Parameters ---------- track : :class:`~.Track` The input track. Returns ------- : :class:`~.Track` Smoothed track """ subsq_state = track[-1] smoothed_states = [subsq_state] for state in reversed(track[:-1]): # Delta t time_interval = subsq_state.timestamp - state.timestamp # Retrieve the prediction from the subsequent (k+1th) timestep accessed previously prediction = self._prediction(subsq_state) # The smoothing gain, mean and covariance ksmooth_gain = self._smooth_gain(state, prediction, time_interval=time_interval) smooth_mean = state.state_vector + ksmooth_gain @ (subsq_state.state_vector - prediction.state_vector) smooth_covar = state.covar + \ ksmooth_gain @ (subsq_state.covar - prediction.covar) @ ksmooth_gain.T # Create a new type called SmoothedState? if isinstance(state, Update): subsq_state = Update.from_state(state, smooth_mean, smooth_covar, timestamp=state.timestamp, hypothesis=state.hypothesis) elif isinstance(state, Prediction): subsq_state = Prediction.from_state(state, smooth_mean, smooth_covar, timestamp=state.timestamp) smoothed_states.insert(0, subsq_state) # Deep copy existing track, but avoid copying original states, as this would be super # expensive. This works by informing deepcopy that the smoothed states are the # replacement object for the original track states. smoothed_track = copy.deepcopy(track, {id(track.states): smoothed_states}) return smoothed_track
[docs]class ExtendedKalmanSmoother(KalmanSmoother): r""" The extended version of the Kalman smoother. The equations are modified slightly, analogously to the extended Kalman filter, .. math:: \mathbf{x}_{k|k-1} &= f_{k} (\mathbf{x}_{k-1}) F_k &\approx J_f (\mathbf{x}_{k-1}) where :math:`J_f (\mathbf{x}_{k-1})` is the Jacobian matrix evaluated at :math:`\mathbf{x}_{k-1}`. The rest of the calculation proceeds as with the Kalman smoother. In fact the first equation isn't calculated -- it's presumed to have been undertaken by a filter when building the track; similarly for the predicted covariance. In practice, the only difference between this and the Kalman smoother is in the use of the linearised transition matrix to calculate the smoothing gain. """ transition_model: TransitionModel = Property(doc="The transition model to be used.") def _transition_matrix(self, state, **kwargs): r"""Returns the transition matrix, a matrix if the model is linear, or approximated as Jacobian otherwise. Parameters ---------- state : :class:`~.State` :math:`\mathbf{x}_{k-1}` **kwargs : various, optional These are passed to :meth:`~.TransitionModel.matrix` or :meth:`~.TransitionModel.jacobian` Returns ------- : :class:`numpy.ndarray` The transition matrix, :math:`F_k`, if linear (i.e. :meth:`TransitionModel.matrix` exists, or :meth:`~.TransitionModel.jacobian` if not) """ if isinstance(self._transition_model(state), LinearModel): return self._transition_model(state).matrix(**kwargs) else: return self._transition_model(state).jacobian(state, **kwargs)
[docs]class UnscentedKalmanSmoother(KalmanSmoother): r"""The unscented version of the Kalman filter. As with the parent version of the Kalman smoother, the mean and covariance of the prediction are retrieved from the track. The unscented transform is used to calculate the smoothing gain. """ transition_model: TransitionModel = Property(doc="The transition model to be used.") alpha: float = Property( default=0.5, doc="Primary sigma point spread scaling parameter. Default is 0.5.") beta: float = Property( default=2, doc="Used to incorporate prior knowledge of the distribution. If the " "true distribution is Gaussian, the value of 2 is optimal. " "Default is 2") kappa: float = Property( default=0, doc="Secondary spread scaling parameter. Default is calculated as " "3-Ns") def _smooth_gain(self, state, prediction, time_interval, **kwargs): """Calculate the smoothing gain Parameters ---------- state : :class:`~.State` The input state prediction : :class:`~.GaussianStatePrediction` The prediction from the subsequent timestep Returns ------- : Matrix The smoothing gain """ # This ensures that the time interval is correctly applied. transition_function = partial( self._transition_model(state).function, time_interval=time_interval) # Get the sigma points from the mean and covariance. sigma_point_states, mean_weights, covar_weights = gauss2sigma( state, self.alpha, self.beta, self.kappa) # Use the unscented transform to return the cross-covariance _, _, cross_covar, _, _, _ = unscented_transform( sigma_point_states, mean_weights, covar_weights, transition_function) return cross_covar @ np.linalg.inv(prediction.covar)