Source code for stonesoup.updater.kalman

import warnings

import numpy as np
import scipy.linalg as la
from functools import lru_cache

from ..base import Property
from .base import Updater
from ..types.array import CovarianceMatrix, StateVector
from ..types.prediction import MeasurementPrediction
from ..types.update import Update
from ..models.base import LinearModel
from ..models.measurement.linear import LinearGaussian
from ..models.measurement import MeasurementModel
from ..functions import gauss2sigma, unscented_transform, cubature_transform
from ..measures import Measure, Euclidean


[docs] class KalmanUpdater(Updater): r"""A class which embodies Kalman-type updaters; also a class which performs measurement update step as in the standard Kalman filter. The Kalman updaters assume :math:`h(\mathbf{x}) = H \mathbf{x}` with additive noise :math:`\sigma = \mathcal{N}(0,R)`. Daughter classes can overwrite to specify a more general measurement model :math:`h(\mathbf{x})`. :meth:`update` first calls :meth:`predict_measurement` function which proceeds by calculating the predicted measurement, innovation covariance and measurement cross-covariance, .. math:: \mathbf{z}_{k|k-1} &= H_k \mathbf{x}_{k|k-1} S_k &= H_k P_{k|k-1} H_k^T + R_k \Upsilon_k &= P_{k|k-1} H_k^T where :math:`P_{k|k-1}` is the predicted state covariance. :meth:`predict_measurement` returns a :class:`~.GaussianMeasurementPrediction`. The Kalman gain is then calculated as, .. math:: K_k = \Upsilon_k S_k^{-1} and the posterior state mean and covariance are, .. math:: \mathbf{x}_{k|k} &= \mathbf{x}_{k|k-1} + K_k (\mathbf{z}_k - H_k \mathbf{x}_{k|k-1}) P_{k|k} &= P_{k|k-1} - K_k S_k K_k^T These are returned as a :class:`~.GaussianStateUpdate` object. """ # TODO: at present this will throw an error if a measurement model is not # TODO: specified in either individual measurements or the Updater object measurement_model: LinearGaussian = Property( default=None, doc="A linear Gaussian measurement model. This need not be defined if " "a measurement model is provided in the measurement. If no model " "specified on construction, or in the measurement, then error " "will be thrown.") force_symmetric_covariance: bool = Property( default=False, doc="A flag to force the output covariance matrix to be symmetric by way of a simple " "geometric combination of the matrix and transpose. Default is False.") use_joseph_cov: bool = Property( default=False, doc="Bool dictating the method of covariance calculation. If use_joseph_cov is True then " "the Joseph form of the covariance equation is used.") def _measurement_matrix(self, predicted_state=None, measurement_model=None, **kwargs): r"""This is straightforward Kalman so just get the Matrix from the measurement model. Parameters ---------- predicted_state : :class:`~.GaussianState` The predicted state :math:`\mathbf{x}_{k|k-1}`, :math:`P_{k|k-1}` measurement_model : :class:`~.MeasurementModel` The measurement model. If omitted, the model in the updater object is used **kwargs : various Passed to :meth:`~.MeasurementModel.matrix` Returns ------- : :class:`numpy.ndarray` The measurement matrix, :math:`H_k` """ return self._check_measurement_model( measurement_model).matrix(**kwargs) def _measurement_cross_covariance(self, predicted_state, measurement_matrix): """ Return the measurement cross covariance matrix, :math:`P_{k~k-1} H_k^T` Parameters ---------- predicted_state : :class:`GaussianState` The predicted state which contains the covariance matrix :math:`P` as :attr:`.covar` attribute measurement_matrix : numpy.array The measurement matrix, :math:`H` Returns ------- : numpy.ndarray The measurement cross-covariance matrix """ return predicted_state.covar @ measurement_matrix.T def _innovation_covariance(self, m_cross_cov, meas_mat, meas_mod, measurement_noise, **kwargs): """Compute the innovation covariance Parameters ---------- m_cross_cov : numpy.ndarray The measurement cross covariance matrix meas_mat : numpy.ndarray Measurement matrix meas_mod : :class:~.MeasurementModel` Measurement model measurement_noise : bool Include measurement noise or not Returns ------- : numpy.ndarray The innovation covariance """ innov_covar = meas_mat @ m_cross_cov if measurement_noise: innov_covar += meas_mod.covar(**kwargs) return innov_covar def _posterior_mean(self, predicted_state, kalman_gain, measurement, measurement_prediction): r"""Compute the posterior mean, :math:`\mathbf{x}_{k|k} = \mathbf{x}_{k|k-1} + K_k \mathbf{y}_k`, where the innovation :math:`\mathbf{y}_k = \mathbf{z}_k - h(\mathbf{x}_{k|k-1}). Parameters ---------- predicted_state : :class:`State`, :class:`Prediction` The predicted state kalman_gain : numpy.ndarray Kalman gain measurement : :class:`Detection` The measurement measurement_prediction : :class:`MeasurementPrediction` Predicted measurement Returns ------- : :class:`StateVector` The posterior mean estimate """ post_mean = predicted_state.state_vector + \ kalman_gain @ (measurement.state_vector - measurement_prediction.state_vector) return post_mean.view(StateVector) def _posterior_covariance(self, hypothesis): """ Return the posterior covariance for a given hypothesis Parameters ---------- hypothesis: :class:`~.Hypothesis` A hypothesised association between state prediction and measurement. It returns the measurement prediction which in turn contains the measurement cross covariance, :math:`P_{k|k-1} H_k^T and the innovation covariance, :math:`S = H_k P_{k|k-1} H_k^T + R` Returns ------- : :class:`~.CovarianceMatrix` The posterior covariance matrix rendered via the Kalman update process. : numpy.ndarray The Kalman gain, :math:`K = P_{k|k-1} H_k^T S^{-1}` """ if self.use_joseph_cov: # Identity matrix id_matrix = np.identity(hypothesis.prediction.ndim) # Calculate Kalman gain kalman_gain = hypothesis.measurement_prediction.cross_covar @ \ np.linalg.inv(hypothesis.measurement_prediction.covar) measurement_model = self._check_measurement_model( hypothesis.measurement.measurement_model) # Calculate measurement matrix/jacobian matrix meas_matrix = self._measurement_matrix(hypothesis.prediction, measurement_model) # Calculate Prior covariance prior_covar = hypothesis.prediction.covar # Calculate measurement covariance meas_covar = measurement_model.covar() # Compute posterior covariance matrix I_KH = id_matrix - kalman_gain @ meas_matrix post_cov = I_KH @ prior_covar @ I_KH.T \ + kalman_gain @ meas_covar @ kalman_gain.T else: kalman_gain = hypothesis.measurement_prediction.cross_covar @ \ np.linalg.inv(hypothesis.measurement_prediction.covar) post_cov = hypothesis.prediction.covar - kalman_gain @ \ hypothesis.measurement_prediction.covar @ kalman_gain.T return post_cov.view(CovarianceMatrix), kalman_gain
[docs] @lru_cache() def predict_measurement(self, predicted_state, measurement_model=None, measurement_noise=True, **kwargs): r"""Predict the measurement implied by the predicted state mean Parameters ---------- predicted_state : :class:`~.GaussianState` The predicted state :math:`\mathbf{x}_{k|k-1}`, :math:`P_{k|k-1}` measurement_model : :class:`~.MeasurementModel` The measurement model. If omitted, the model in the updater object is used measurement_noise : bool Whether to include measurement noise :math:`R` with innovation covariance. Default `True` **kwargs : various These are passed to :meth:`~.MeasurementModel.function` and :meth:`~.MeasurementModel.matrix` Returns ------- : :class:`GaussianMeasurementPrediction` The measurement prediction, :math:`\mathbf{z}_{k|k-1}` """ # If a measurement model is not specified then use the one that's # native to the updater measurement_model = self._check_measurement_model(measurement_model) pred_meas = measurement_model.function(predicted_state, **kwargs) hh = self._measurement_matrix(predicted_state=predicted_state, measurement_model=measurement_model, **kwargs) # The measurement cross covariance and innovation covariance meas_cross_cov = self._measurement_cross_covariance(predicted_state, hh) innov_cov = self._innovation_covariance( meas_cross_cov, hh, measurement_model, measurement_noise, **kwargs) return MeasurementPrediction.from_state( predicted_state, pred_meas, innov_cov, cross_covar=meas_cross_cov)
[docs] def update(self, hypothesis, **kwargs): r"""The Kalman update method. Given a hypothesised association between a predicted state or predicted measurement and an actual measurement, calculate the posterior state. Parameters ---------- hypothesis : :class:`~.SingleHypothesis` the prediction-measurement association hypothesis. This hypothesis may carry a predicted measurement, or a predicted state. In the latter case a predicted measurement will be calculated. **kwargs : various These are passed to :meth:`predict_measurement` Returns ------- : :class:`~.GaussianStateUpdate` The posterior state Gaussian with mean :math:`\mathbf{x}_{k|k}` and covariance :math:`P_{x|x}` """ # Get the predicted state out of the hypothesis predicted_state = hypothesis.prediction # If there is no measurement prediction in the hypothesis then do the # measurement prediction (and attach it back to the hypothesis). if hypothesis.measurement_prediction is None: # Get the measurement model out of the measurement if it's there. # If not, use the one native to the updater (which might still be # none) measurement_model = hypothesis.measurement.measurement_model measurement_model = self._check_measurement_model( measurement_model) # Attach the measurement prediction to the hypothesis hypothesis.measurement_prediction = self.predict_measurement( predicted_state, measurement_model=measurement_model, **kwargs) # Kalman gain and posterior covariance posterior_covariance, kalman_gain = self._posterior_covariance(hypothesis) # Posterior mean posterior_mean = self._posterior_mean(predicted_state, kalman_gain, hypothesis.measurement, hypothesis.measurement_prediction) if self.force_symmetric_covariance: posterior_covariance = \ (posterior_covariance + posterior_covariance.T)/2 return Update.from_state( hypothesis.prediction, posterior_mean, posterior_covariance, timestamp=hypothesis.measurement.timestamp, hypothesis=hypothesis)
[docs] class ExtendedKalmanUpdater(KalmanUpdater): r"""The Extended Kalman Filter version of the Kalman Updater. Inherits most of the functionality from :class:`~.KalmanUpdater`. The difference is that the measurement model may now be non-linear, though must be differentiable to return the linearisation of :math:`h(\mathbf{x})` via the matrix :math:`H` accessible via :meth:`~.NonLinearModel.jacobian`. """ # TODO: Enforce the fact that this version of MeasurementModel must be # TODO: capable of executing :attr:`jacobian()` measurement_model: MeasurementModel = Property( default=None, doc="A measurement model. This need not be defined if a measurement " "model is provided in the measurement. If no model specified on " "construction, or in the measurement, then error will be thrown. " "Must be linear or capable or implement the " ":meth:`~.NonLinearModel.jacobian`.") def _measurement_matrix(self, predicted_state, measurement_model=None, linearisation_point=None, **kwargs): r"""Return the (via :meth:`NonLinearModel.jacobian`) measurement matrix Parameters ---------- predicted_state : :class:`~.State` The predicted state :math:`\mathbf{x}_{k|k-1}` measurement_model : :class:`~.MeasurementModel` The measurement model. If omitted, the model in the updater object is used **kwargs : various Passed to :meth:`~.MeasurementModel.matrix` if linear or :meth:`~.MeasurementModel.jacobian` if not Returns ------- : :class:`numpy.ndarray` The measurement matrix, :math:`H_k` """ measurement_model = self._check_measurement_model(measurement_model) if isinstance(measurement_model, LinearModel): return measurement_model.matrix(**kwargs) else: if linearisation_point is None: linearisation_point = predicted_state return measurement_model.jacobian(linearisation_point, **kwargs)
[docs] class UnscentedKalmanUpdater(KalmanUpdater): """The Unscented Kalman Filter version of the Kalman Updater. Inherits most of the functionality from :class:`~.KalmanUpdater`. In this case the :meth:`predict_measurement` function uses the :func:`unscented_transform` function to estimate a (Gaussian) predicted measurement. This is then updated via the standard Kalman update equations. """ # Can be non-linear and non-differentiable measurement_model: MeasurementModel = Property( default=None, doc="The measurement model to be used. This need not be defined if a " "measurement model is provided in the measurement. If no model " "specified on construction, or in the measurement, then error " "will be thrown.") alpha: float = Property( default=0.5, doc="Primary sigma point spread scaling parameter. Default is 0.5.") beta: float = Property( default=2, doc="Used to incorporate prior knowledge of the distribution. If the " "true distribution is Gaussian, the value of 2 is optimal. " "Default is 2") kappa: float = Property( default=None, doc="Secondary spread scaling parameter. Default is calculated as " "3-Ns")
[docs] @lru_cache() def predict_measurement(self, predicted_state, measurement_model=None, measurement_noise=True, **kwargs): """Unscented Kalman Filter measurement prediction step. Uses the unscented transform to estimate a Gauss-distributed predicted measurement. Parameters ---------- predicted_state : :class:`~.GaussianStatePrediction` A predicted state measurement_model : :class:`~.MeasurementModel`, optional The measurement model used to generate the measurement prediction. This should be used in cases where the measurement model is dependent on the received measurement (the default is `None`, in which case the updater will use the measurement model specified on initialisation) measurement_noise : bool Whether to include measurement noise :math:`R` with innovation covariance Returns ------- : :class:`~.GaussianMeasurementPrediction` The measurement prediction """ measurement_model = self._check_measurement_model(measurement_model) sigma_points, mean_weights, covar_weights = \ gauss2sigma(predicted_state, self.alpha, self.beta, self.kappa) covar_noise = measurement_model.covar(**kwargs) if measurement_noise else None meas_pred_mean, meas_pred_covar, cross_covar, *_ = \ unscented_transform(sigma_points, mean_weights, covar_weights, measurement_model.function, covar_noise=covar_noise) return MeasurementPrediction.from_state( predicted_state, meas_pred_mean, meas_pred_covar, cross_covar=cross_covar)
[docs] class SqrtKalmanUpdater(ExtendedKalmanUpdater): r"""The Square root version of the Kalman Updater. The input :class:`~.State` is a :class:`~.SqrtGaussianState` which means that the covariance of the predicted state is stored in square root form. This can be achieved by keeping :attr:`covar` attribute as :math:`L` where the 'full' covariance matrix :math:`P_{k|k-1} = L_{k|k-1} L^T_{k|k-1}` [Eq1]. In its basic form :math:`L` is the lower triangular matrix returned via Cholesky factorisation. There's no reason why other forms that satisfy Eq 1 above can't be used. References ---------- 1. Schmidt, S.F. 1970, Computational techniques in Kalman filtering, NATO advisory group for aerospace research and development, London 1970 2. Andrews, A. 1968, A square root formulation of the Kalman covariance equations, AIAA Journal, 6:6, 1165-1166 """ qr_method: bool = Property( default=False, doc="A switch to do the update via a QR decomposition, rather than using the (vector form " "of) the Potter method.") def _measurement_cross_covariance(self, predicted_state, measurement_matrix): """ Return the measurement cross covariance matrix, :math:`P_{k|k-1} H_k^T`. This differs slightly from its parent in that it the predicted state covariance (now a square root matrix) is transposed. Parameters ---------- predicted_state : :class:`SqrtGaussianState` The predicted state which contains the square root form of the covariance matrix :math:`W` as :attr:`.covar` attribute measurement_matrix : numpy.array The measurement matrix, :math:`H` Returns ------- : numpy.ndarray The measurement cross-covariance matrix """ return predicted_state.sqrt_covar.T @ measurement_matrix.T def _innovation_covariance(self, m_cross_cov, meas_mat, meas_mod, measurement_noise, **kwargs): """Compute the innovation covariance Parameters ---------- m_cross_cov : numpy.ndarray The measurement cross covariance matrix meas_mat : numpy.ndarray The measurement matrix. Not required in this instance. Ignored. meas_mod : :class:`~.MeasurementModel` Measurement model. The class attribute :attr:`sqrt_covar` indicates whether this is passed in square root form. If it doesn't exist then :attr:`covar` is assumed to exist and is used instead. measurement_noise : bool Include measurement noise or not Returns ------- : numpy.ndarray The innovation covariance """ innov_covar = m_cross_cov.T @ m_cross_cov if measurement_noise: # If the measurement covariance matrix is square root then square it try: meas_cov = meas_mod.sqrt_covar @ meas_mod.sqrt_covar.T except AttributeError: meas_cov = meas_mod.covar(**kwargs) innov_covar += meas_cov return innov_covar def _posterior_covariance(self, hypothesis): """ Return the posterior covariance for a given hypothesis. Hypothesis contains the predicted state covariance in square root form, the measurement prediction (which in turn contains the measurement cross covariance, :math:`P_{k|k-1} H_k^T and the innovation covariance, :math:`S = H_k P_{k|k-1} H_k^T + R`, not in square root form). The hypothesis or the updater contain the measurement noise matrix. The :attr:`sqrt_measurement_noise` flag indicates whether we should use the square root form of this matrix (True) or its full form (False). Parameters ---------- hypothesis: :class:`~.Hypothesis` A hypothesised association between state prediction and measurement Method ------ If the :attr:`qr_method` flag is set to True then the update proceeds via a QR decomposition which requires only one further matrix inversion (see [1]), rather than three plus a Cholesky factorisation, for the method set out in [2]. Returns ------- : numpy.array The posterior covariance matrix rendered via the Kalman update process in lower-triangular form. : numpy.array The Kalman gain, :math:`K = P_{k|k-1} H_k^T S^{-1}` """ # Do we already have a measurement model? measurement_model = \ self._check_measurement_model(hypothesis.measurement.measurement_model) # Square root of the noise covariance, account for the fact that it may be supplied in one # of two ways try: sqrt_noise_cov = measurement_model.sqrt_covar except AttributeError: sqrt_noise_cov = la.sqrtm(measurement_model.covar()) if self.qr_method: # The prior and noise covariances and the measurement matrix sqrt_prior_cov = hypothesis.prediction.sqrt_covar bigh = measurement_model.matrix() # Set up and execute the QR decomposition measdim = measurement_model.ndim_meas zeros = np.zeros((measurement_model.ndim_state, measdim)) biga = np.block([[sqrt_noise_cov, bigh@sqrt_prior_cov], [zeros, sqrt_prior_cov]]) _, upper = np.linalg.qr(biga.T) # Extract meaningful quantities atheta = upper.T sqrt_innov_cov = atheta[:measdim, :measdim] kalman_gain = atheta[measdim:, :measdim]@(np.linalg.inv(sqrt_innov_cov)) post_cov = atheta[measdim:, measdim:] else: # Kalman gain kalman_gain = \ hypothesis.prediction.sqrt_covar @ \ hypothesis.measurement_prediction.cross_covar @ \ np.linalg.inv(hypothesis.measurement_prediction.covar) # Square root of the innovation covariance sqrt_innov_cov = la.sqrtm(hypothesis.measurement_prediction.covar) # Posterior covariance post_cov = hypothesis.prediction.sqrt_covar @ \ (np.identity(hypothesis.prediction.ndim) - hypothesis.measurement_prediction.cross_covar @ np.linalg.inv(sqrt_innov_cov.T) @ np.linalg.inv(sqrt_innov_cov + sqrt_noise_cov) @ hypothesis.measurement_prediction.cross_covar.T) return post_cov, kalman_gain
[docs] class IteratedKalmanUpdater(ExtendedKalmanUpdater): r"""This version of the Kalman updater runs an iteration over the linearisation of the sensor function in order to refine the posterior state estimate. Specifically, .. math:: \mathbf{x}_{k,i+1} &= \mathbf{x}_{k|k-1} + K_i [\mathbf{z} - h(\mathbf{x}_{k,i}) - H_i (\mathbf{x}_{k|k-1} - \mathbf{x}_{k,i}) ] P_{k,i+1} &= (I - K_i H_i) P_{k|k-1} where, .. math:: H_i &= h^{\prime}(\mathbf{x}_{k,i}), K_i &= P_{k|k-1} H_i^T (H_i P_{k|k-1} H_i^T + R)^{-1} and .. math:: \mathbf{x}_{k,0} &= \mathbf{x}_{k|k-1} P_{k,0} &= P_{k|k-1} It inherits from the ExtendedKalmanUpdater as it uses the same linearisation of the sensor function via the :meth:`_measurement_matrix()` function. """ tolerance: float = Property( default=1e-6, doc="The value of the difference in the measure used as a stopping criterion.") measure: Measure = Property( default=Euclidean(), doc="The measure to use to test the iteration stopping criterion. Defaults to the " "Euclidean distance between current and prior posterior state estimate.") max_iterations: int = Property( default=1000, doc="Number of iterations before while loop is exited and a non-convergence warning is " "returned")
[docs] def update(self, hypothesis, **kwargs): r"""The iterated Kalman update method. Given a hypothesised association between a predicted state or predicted measurement and an actual measurement, calculate the posterior state. Parameters ---------- hypothesis : :class:`~.SingleHypothesis` the prediction-measurement association hypothesis. This hypothesis may carry a predicted measurement, or a predicted state. In the latter case a predicted measurement will be calculated. **kwargs : various These are passed to the measurement model function Returns ------- : :class:`~.GaussianStateUpdate` The posterior state Gaussian with mean :math:`\mathbf{x}_{k|k}` and covariance :math:`P_{k|k}` """ # Get the measurement model measurement_model = self._check_measurement_model(hypothesis.measurement.measurement_model) # The first iteration is just the application of the EKF post_state = super().update(hypothesis, **kwargs) # Now update the measurement prediction mean and loop iterations = 0 prev_state = None while iterations == 0 or self.measure(prev_state, post_state) > self.tolerance: if iterations > self.max_iterations: warnings.warn("Iterated Kalman update did not converge") break # These lines effectively bypass the predict_measurement function in update() # by attaching new linearised quantities to the measurement_prediction. Those # would otherwise be calculated (from the original prediction) by the update() method. hh = self._measurement_matrix(post_state, measurement_model=measurement_model) post_state.hypothesis.measurement_prediction.state_vector = \ measurement_model.function(post_state, noise=None) + \ hh@(hypothesis.prediction.state_vector - post_state.state_vector) cross_cov = self._measurement_cross_covariance(hypothesis.prediction, hh) post_state.hypothesis.measurement_prediction.cross_covar = cross_cov post_state.hypothesis.measurement_prediction.covar = \ self._innovation_covariance(cross_cov, hh, measurement_model, True) prev_state = post_state post_state = super().update(post_state.hypothesis, **kwargs) # increment counter iterations += 1 return post_state
[docs] class SchmidtKalmanUpdater(ExtendedKalmanUpdater): r"""A class which extends the standard Kalman filter to employ the Schmidt-Kalman version of the update. The key thing here is that the state vector is split into parameters to be estimated, and those which are merely 'considered'. The consider parameters are not updated, though their relative covariances are maintained through the process. The state vector, covariance and measurement matrix are defined as, .. math :: \mathbf{x}^T &\triangleq [\mathbf{s}^T \ \mathbf{p}^T] H &= [H_s \ H_p] .. math :: P &= \begin{bmatrix} P_{ss} & P_{sp} \\ P_{ps} & P_{pp} \end{bmatrix} where the consider parameters are denoted :math:`p` and those to be estimated :math:`s`. Note that though they are separated in the definition above, they may be interleaved in practice. The update proceeds as: .. math :: K_s &= (P_{ss,k|k-1} H_s^T + P_{sp,k|k-1} H_p^T) S^{-1}, \mathbf{s}_{k|k} &= \mathbf{s}_{k|k-1} + K_s (\mathbf{z} - H_s \mathbf{s}_{k|k-1} - H_p \mathbf{p}_{k|k-1}), \mathbf{p}_{k|k} &= \mathbf{p}_{k|k-1}, .. math :: P_{k|k} &= \begin{bmatrix} P_{ss,k|k-1} - K_s S K_s^T & P_{sp,k|k-1} - K_s H \begin{bmatrix} P_{sp,k|k-1} \\ P_{pp,k|k-1} \end{bmatrix} \\ P_{ps,k|k-1} - \begin{bmatrix} P_{sp,k|k-1} \\ P_{pp,k|k-1} \end{bmatrix}^T H^T K_s^T & P_{pp,k|k-1} \end{bmatrix} Note ---- Due to the excellent efficiency of NumPy's matrix algebra tools, the savings gained by extracting a sub-matrix over performing the calculation on full matrices, are relatively minor. This class therefore functions most effectively as a tutorial example of the Schmidt-Kalman updater. Efficiencies could be made by enforcing view operations rather than copies, using the square-root form or, most promisingly, by employing Cython. References ---------- [1] S. F. Schmidt, “Application of State-Space Methods to Navigation Problems,” Advances in Control Systems, Vol. 3, 1966, pp. 293–340 [2] Zanetti, R. & D’Souza, C. (2013). Recursive Implementations of the Schmidt-Kalman ‘Consider’ Filter. The Journal of the Astronautical Sciences. 60. 672-685. 10.1007/s40295-015-0068-7. """ consider: np.ndarray = Property(default=None, doc="The boolean vector of 'consider' parameters. True " "indicates considered, False are state parameters to be " "estimated. If undefined these default to all False, i.e." "the standard Kalman filter.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.consider is None: self.consider = np.zeros(self.measurement_model.ndim_state, dtype=bool) def _posterior_mean(self, predicted_state, kalman_gain, measurement, measurement_prediction): """Compute the posterior mean, :math:`s_{k|k} = s_{k|k-1} + K_s (z - H_s s_{k|k-1} - H_p p_{k|k-1})`, :math:`p_{k|k} = p_{k|k-1}. Parameters ---------- predicted_state : :class:`State`, :class:`Prediction` The predicted state kalman_gain : numpy.ndarray The reduced form of the Kalman gain, :math:`K_s` measurement : :class:`Detection` The measurement measurement_prediction : :class:`MeasurementPrediction` Predicted measurement Returns ------- : :class:`StateVector` The posterior mean estimate """ post_mean = predicted_state.state_vector.copy() post_mean[np.ix_(~self.consider)] += \ kalman_gain @ (measurement.state_vector - measurement_prediction.state_vector) return post_mean.view(StateVector) def _posterior_covariance(self, hypothesis): """ Return the posterior covariance for a given hypothesis Parameters ---------- hypothesis: :class:`~.Hypothesis` A hypothesised association between state prediction and measurement. It returns the measurement prediction which in turn contains the measurement cross covariance, :math:`P_{k|k-1} H_k^T and the innovation covariance, :math:`S = H_k P_{k|k-1} H_k^T + R` Returns ------- : :class:`~.CovarianceMatrix` The posterior covariance matrix rendered via the Schmidt-Kalman update process. : numpy.ndarray The reduced form of the Kalman gain, :math:`K_s = (P_{ss,k|k-1} H_{s,k}^T + P_{sp,k|k-1} H_{p,k}^T) S^{-1}` """ # Intermediate matrices P_p and H. pp = hypothesis.prediction.covar[np.tile(self.consider, (len(self.consider), 1))] pp = pp.reshape((len(self.consider), np.sum(self.consider))) hh = self._measurement_matrix(predicted_state=hypothesis.prediction) # First get the Kalman gain mcc = hypothesis.measurement_prediction.cross_covar kalman_gain = mcc[np.ix_(~self.consider)] @ \ np.linalg.inv(hypothesis.measurement_prediction.covar) # Then assemble the quadrants of the posterior covariance (easier to think of them as # quadrants even though they're actually submatrices who may appear in somewhat different # places.) post_cov = hypothesis.prediction.covar.copy() post_cov[np.ix_(~self.consider, ~self.consider)] -= \ kalman_gain @ hypothesis.measurement_prediction.covar @ kalman_gain.T khp = kalman_gain @ hh @ pp post_cov[np.ix_(~self.consider, self.consider)] -= khp post_cov[np.ix_(self.consider, ~self.consider)] -= khp.T return post_cov.view(CovarianceMatrix), kalman_gain
[docs] class CubatureKalmanUpdater(KalmanUpdater): """The cubature Kalman filter version of the Kalman updater. Inherits most of its functionality from :class:`~.KalmanUpdater`. The :meth:`predict_measurement` function uses the :func:`cubature_transform` function to estimate a (Gaussian) predicted measurement. This is then updated via the standard Kalman update equations. """ measurement_model: MeasurementModel = Property( default=None, doc="The measurement model to be used. This need not be defined if a " "measurement model is provided in the measurement. If no model " "specified on construction, or in the measurement, then error " "will be thrown.") alpha: float = Property( default=1.0, doc="Scaling parameter. Default is 1.0. Lower values select points closer to the mean and " "vice versa.")
[docs] @lru_cache() def predict_measurement(self, predicted_state, measurement_model=None, measurement_noise=True, **kwargs): """Cubature Kalman Filter measurement prediction step. Uses the cubature transform to estimate a Gauss-distributed predicted measurement. Parameters ---------- predicted_state : :class:`~.GaussianStatePrediction` A predicted state measurement_model : :class:`~.MeasurementModel`, optional The measurement model used to generate the measurement prediction. This should be used in cases where the measurement model is dependent on the received measurement (the default is `None`, in which case the updater will use the measurement model specified on initialisation) measurement_noise : bool Whether to include measurement noise :math:`R` with innovation covariance. Default `True` Returns ------- : :class:`~.GaussianMeasurementPrediction` The measurement prediction """ measurement_model = self._check_measurement_model(measurement_model) covar_noise = measurement_model.covar(**kwargs) if measurement_noise else None meas_pred_mean, meas_pred_covar, cross_covar, _ = \ cubature_transform(predicted_state, measurement_model.function, covar_noise=covar_noise, alpha=self.alpha) return MeasurementPrediction.from_state( predicted_state, meas_pred_mean, meas_pred_covar, cross_covar=cross_covar)