Source code for stonesoup.smoother.lineargaussian

# -*- coding: utf-8 -*-
import copy

import numpy as np

from .base import Smoother
from ..types.multihypothesis import MultipleHypothesis
from ..types.state import GaussianState
from ..types.update import Update


[docs]class Backward(Smoother): """ Backwards component of a fixed-interval forward-backward smoother for a Linear Gaussian State Space Model. """ @staticmethod def _get_estimates(track): estimates = [] for state in track: # Check for multi-hypothesis if isinstance(state, Update) \ and isinstance(state.hypothesis, MultipleHypothesis): predictions = {hypothesis.prediction for hypothesis in state.hypothesis} if len(predictions) == 1: # One predictions, this is fine to use. estimates.append(predictions.pop()) else: # Multiple predictions, so can't process this. raise ValueError( "Track has MultipleHypotheses updates with multiple " "predictions.") elif isinstance(state, Update): estimates.append(state.hypothesis.prediction) else: # Prediction or other state # TODO: Should this be ignored and removed from filtered_track? estimates.append(state) return estimates
[docs] def track_smooth(self, filtered_track): """ Apply smoothing to a track of filtered estimates. Parameters ---------- filtered_track : :class:`~.Track` :class:`~.Track` object consisting a of :class:`~.GaussianStateUpdate` objects. Returns ------- smoothed_track : :class:`~.Track` :class:`~.Track` object containing smoothed :class:`~.GaussianState` objects. """ estimates = self._get_estimates(filtered_track) penultimate_index = len(filtered_track) - 2 smoothed_track = copy.deepcopy(filtered_track) # Iterating backwards from the penultimate state, to the first state. for t in range(penultimate_index, -1, -1): smoothed_track[t] = self.smooth(filtered_track[t], estimates[t+1], smoothed_track[t+1]) return smoothed_track
[docs] def smooth(self, filtered_state_t, predicted_state_tplus1, smoothed_state_tplus1): """ Deduce smoothed distribution :math:`p(x_{t} | y_{1:T})` from :math:`p(x_{t} | y_{1:t})`, :math:`p(x_{t+1} | y_{1:t})` and :math:`p(x_{t+1} | y_{1:T})`. Parameters ---------- filtered_state_t : :class:`GaussianState` Filtered state at time t. predicted_state_tplus1 : :class:`GaussianState` Prediction (at timestep t), of the state at time t+1. smoothed_state_tplus1 : :class:`GaussianState` Smoothed state at time t+1. Returns ------- : :class:`GaussianState` """ t = filtered_state_t.timestamp t_delta = smoothed_state_tplus1.timestamp - t A = self.transition_model.matrix(time_interval=t_delta) x = filtered_state_t.mean V = filtered_state_t.covar x_predict = predicted_state_tplus1.mean V_predict = predicted_state_tplus1.covar x_tplus1 = smoothed_state_tplus1.mean V_tplus1 = smoothed_state_tplus1.covar smoother_gain = V @ A.T @ np.linalg.inv(V_predict) x_smoothed = x + smoother_gain@(x_tplus1 - x_predict) V_smoothed = V + smoother_gain@(V_tplus1 - V_predict)@smoother_gain.T return GaussianState(x_smoothed, V_smoothed, timestamp=t)