import copy
import numpy as np
from ._utils import predict_lru_cache
from .kalman import KalmanPredictor
from ..base import Property
from ..kernel import Kernel, QuadraticKernel
from ..types.prediction import Prediction
from ..types.state import State
from ..types.update import KernelParticleStateUpdate
[docs]
class AdaptiveKernelKalmanPredictor(KalmanPredictor):
r"""An implementation of the adaptive kernel Kalman filter (AKKF) predictor. Here, the AKKF
draws inspiration from the concepts of kernel mean embeddings (KME) and Kalman Filter to
address tracking problems in nonlinear systems.
In the state space, at time :math:`k`, the prior state
particles are generated by passing the proposal particles at time :math:`k-1`, i.e.,
:math:`\tilde{\mathbf{x}}_{k-1}^{\{i=1:M\}}`, through the motion model as
.. math::
\mathbf{x}_k^{\{i\}} = \mathtt{f}\left(\tilde{\mathbf{x}}_{k-1}^{\{i\}},
\mathbf{u}_{k}^{\{i\}} \right).
In the kernel space, :math:`{\mathbf{x}}_{k}^{\{i=1:M_{\text{A}}\}}` are mapped as feature
mappings :math:`\Phi_k`.
Then, the predictive kernel weight vector :math:`\mathbf{w}^{-}_{k}`, and covariance matrix
:math:`{S}_{k}^{-}`, are calculated as
.. math::
\mathbf{w}^{-}_{k} &= \Gamma_{k} \mathbf{w}^{+}_{k-1}\\
{S}_{k}^{-} &= \Gamma_{k} {S}^{+}_{k-1} \Gamma_{k} ^{\mathrm{T}} +V_{k}.
Here, :math:`\mathbf{w}^{+}_{k-1}` and :math:`{S}_{k-1}^{+}` are the posterior kernel weight
mean vector and covariance matrix at time :math:`k-1`, respectively.
The transition matrix :math:`\Gamma_{k}` represents the change of sample representation, and
:math:`{V}_{k}` represents the finite matrix representation of the transition residual matrix.
"""
kernel: Kernel = Property(
default=None,
doc="Default is None. If None, the default :class:`~QuadraticKernel` is used.")
lambda_predictor: float = Property(
default=1e-3,
doc=r":math:`\lambda_{\tilde{K}}`. Regularisation parameter used to stabilise the inverse "
r"Gram matrix. Range is :math:`\left[10^{-4}, 10^{-2}\right]`")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.kernel is None:
self.kernel = QuadraticKernel()
[docs]
@predict_lru_cache()
def predict(self, prior, timestamp=None, proposal=None, **kwargs):
r"""The adaptive kernel version of the predict step
Parameters
----------
prior : :class:`~.KernelParticleState`
Prior state, :math:`\mathbf{x}_{k-1}`
timestamp : :class:`datetime.datetime`
Time to transit to (:math:`k`)
proposal : :class:`~.KernelParticleState`
Proposal state, :math:`\mathbf{x}_{k-1}`
**kwargs : various, optional
These are passed to :meth:`~.TransitionModel.covar`
Returns
-------
: :class:`~.KernelParticleStatePrediction`
The predicted state :math:`\mathbf{x}_{k|k-1}` and the predicted
state covariance :math:`P_{k|k-1}`
"""
if proposal is None:
if isinstance(prior, KernelParticleStateUpdate):
proposal = State(state_vector=prior.proposal)
else:
proposal = copy.copy(prior)
# Get the prediction interval
predict_over_interval = self._predict_over_interval(prior, timestamp)
new_state_vector = self.transition_model.function(
proposal,
time_interval=predict_over_interval,
**kwargs)
k_tilde_tilde = self.kernel(proposal)
k_tilde_nontilde = self.kernel(proposal, prior)
I = np.identity(len(prior)) # noqa: E741
inv_val = np.linalg.pinv(k_tilde_tilde + self.lambda_predictor * I)
kernel_t = inv_val @ k_tilde_nontilde
prediction_weights = kernel_t @ prior.weight
new_val = inv_val @ k_tilde_tilde - I
v = new_val@new_val.T / len(prior)
prediction_covariance = kernel_t @ prior.kernel_covar @ kernel_t.T + v
return Prediction.from_state(prior,
state_vector=new_state_vector,
weight=prediction_weights,
kernel_covar=prediction_covariance,
timestamp=timestamp,
transition_model=self.transition_model)