Source code for stonesoup.metricgenerator.pcrbmetric

from collections.abc import Sequence
from copy import copy

import numpy as np
from numpy.linalg import inv

from .base import MetricGenerator
from ..base import Property
from ..types.state import GaussianState
from ..types.groundtruth import GroundTruthState, GroundTruthPath
from ..types.array import StateVectors
from ..models.transition import TransitionModel
from ..models.measurement import MeasurementModel
from ..types.metric import TimeRangeMetric
from ..types.time import TimeRange


[docs] class PCRBMetric(MetricGenerator): """ Computes the Posterior Cramer-Rao Bound (PCRB) [1] for a given ground truth prior, using a Riccati recursion [2]. PCRB provides a MSE bound on the performance of unbiased filtering algorithms. Reference: [1] M. L. Hernandez, B. Ristic and A. Farina, "A performance bound for maneuvering target tracking using best-fitting Gaussian distributions," 2005 7th International Conference on Information Fusion, 2005, pp. 8 pp.-, doi: 10.1109/ICIF.2005.1591829. [2] P. Tichavsky, C. H. Muravchik and A. Nehorai, "Posterior Cramer-Rao bounds for discrete-time nonlinear filtering," in IEEE Transactions on Signal Processing, vol. 46, no. 5, pp. 1386-1396, May 1998, doi: 10.1109/78.668800. """ prior: GaussianState = Property(doc="The prior used to initiate the track") transition_model: TransitionModel = Property( doc="The transition model used to propagate the track's state") measurement_model: MeasurementModel = Property( doc="The measurement model that projects a track into measurement space (and vice versa") sensor_locations: StateVectors = Property( doc="The locations of the sensors (currently assuming sensors are static)") position_mapping: Sequence[int] = Property( default=None, doc="Mapping for position coordinates. Default `None`, which uses the measurement model" "mapping") velocity_mapping: Sequence[int] = Property( default=None, doc="Mapping for velocity coordinates. Default `None`, in which case velocity RMSE is not " "computed") irf: float = Property(doc="Information reduction factor. Default is 1", default=1.) truths_key: str = Property(doc="Key to access set of ground truths added to MetricManager", default='groundtruth_paths') generator_name: str = Property(doc="Unique identifier to use when accessing generated " "metrics from MultiManager", default='pcrb_generator') def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.position_mapping is None: self.position_mapping = self.measurement_model.mapping
[docs] def compute_metric(self, manager, **kwargs): groundtruth_paths = self._get_data(manager, self.truths_key) pcrb_metrics = [] # if groundtruth is a set of states not paths, make states into a path if isinstance(next(iter(groundtruth_paths)), GroundTruthState): groundtruth_paths = sorted(list(groundtruth_paths), key=lambda x: x.timestamp) groundtruth_paths = [GroundTruthPath(groundtruth_paths)] for gnd_path in groundtruth_paths: pcrb_metric = self._compute_pcrb_single(self.prior, self.transition_model, self.measurement_model, gnd_path, self.sensor_locations, self.irf, self.position_mapping, self.velocity_mapping) time_range = TimeRange(gnd_path.states[0].timestamp, gnd_path.timestamp) pcrb_metrics.append(TimeRangeMetric(title='PCRB Metrics', value=pcrb_metric, time_range=time_range, generator=self)) return pcrb_metrics
@classmethod def _compute_pcrb_single(cls, prior: GaussianState, transition_model: TransitionModel, measurement_model: MeasurementModel, groundtruth: GroundTruthPath, sensor_locations: StateVectors, irf_overall: float, position_mapping: Sequence[int], velocity_mapping: Sequence[int]): """ Compute the PCRB for a single Ground truth path Parameters ---------- prior: GaussianState The prior used to initiate the track transition_model: TransitionModel The transition model used to propagate the track's state measurement_model: MeasurementModel The measurement model that projects a track into measurement space (and vice versa) groundtruth: GroundTruthPath The ground truth path sensor_locations: StateVectors The locations of the sensors (currently assuming sensors are static) irf_overall: float Information reduction factor position_mapping: list of int Mapping for position coordinates. Default `None`, which uses the measurement model mapping velocity_mapping: list of int Mapping for velocity coordinates. Default `None`, in which case velocity RMSE is not computed Returns ------- dict A dictionary with keys: - `track`: The groundtruth track - `inverse_j`: A matrix of shape (`ndim_state`, `ndim_state`) - `position_RMSE`: The MSE bound on the positional RMSE - `velocity_RMSE`: The MSE bound on the velocity RMSE (only provided if \ `velocity_mapping` is not None) """ num_timesteps = len(groundtruth) num_sensors = sensor_locations.shape[1] ndim_state = transition_model.ndim_state # allocate memory inverse_j = np.zeros((num_timesteps, ndim_state, ndim_state)) pos_rmse = np.zeros(num_timesteps) vel_rmse = np.zeros(num_timesteps) irf = np.ones((num_sensors, num_timesteps))*irf_overall j = np.zeros((num_timesteps + 1, ndim_state, ndim_state)) # initialisation j[0] = inv(prior.covar) inverse_j[0] = inv(j[0]) # Compute RMSE pos_rmse[0] = cls._compute_pos_rmse(inverse_j[0], position_mapping) if velocity_mapping: vel_rmse[0] = cls._compute_vel_rmse(inverse_j[0], velocity_mapping) # Previous time prev_time = groundtruth.states[0].timestamp # run Riccati recursion for i, state in enumerate(groundtruth.states[1:], 1): # Current time curr_time = state.timestamp # Determine measurement contribution (total_j_z) - including clutter (if applicable) total_j_z = cls._calculate_j_z(state, sensor_locations, measurement_model, irf[:, i]) # Determine F and Q matrices dt = curr_time - prev_time f_matrix = transition_model.jacobian(state, time_interval=dt) q_matrix = transition_model.covar(time_interval=dt) # Determine: j[k] = inv(F j^_1 F^T + Q) + j_z (adjusted for the irf) j[i] = inv(f_matrix @ inverse_j[i - 1] @ f_matrix.T + q_matrix) + total_j_z # Determine the PCRB inverse_j[i] = inv(j[i]) # Determine the location RMSE pos_rmse[i] = cls._compute_pos_rmse(inverse_j[i], position_mapping) if velocity_mapping: vel_rmse[i] = cls._compute_vel_rmse(inverse_j[i], velocity_mapping) # Update previous time prev_time = curr_time metric = {'track': groundtruth, 'inverse_j': inverse_j, 'position_RMSE': pos_rmse} if velocity_mapping: metric['velocity_RMSE'] = vel_rmse return metric @staticmethod def _calculate_j_z(state, sensor_locations, measurement_model, irf): # allocate memory / initialisation overall_j_z = np.zeros((state.ndim, state.ndim)) # inverse of measurement covariance measurement_cov_inv = inv(measurement_model.covar()) num_sensors = sensor_locations.shape[1] for i in range(num_sensors): sensor_location = sensor_locations[:, i] measurement_model_cp = copy(measurement_model) if hasattr(measurement_model_cp, 'translation_offset'): measurement_model_cp.translation_offset = sensor_location h_matrix = measurement_model_cp.jacobian(state) j_z = h_matrix.T @ measurement_cov_inv @ h_matrix # increment overall_j_z += irf[i] * j_z return overall_j_z @staticmethod def _compute_pos_rmse(inverse_j, position_mapping): pos_mse = 0 for index in position_mapping: pos_mse += inverse_j[index, index] return np.sqrt(pos_mse) @staticmethod def _compute_vel_rmse(inverse_j, velocity_mapping): vel_mse = 0 for index in velocity_mapping: vel_mse += inverse_j[index, index] return np.sqrt(vel_mse)