Source code for stonesoup.mixturereducer.gaussianmixture

import uuid

import numpy as np
from ordered_set import OrderedSet
from scipy.spatial import KDTree
from scipy.linalg import pinv

from ..base import Property
from .base import MixtureReducer
from ..types.state import TaggedWeightedGaussianState, WeightedGaussianState, GaussianState
from ..measures import SquaredMahalanobis
from operator import attrgetter


[docs] class GaussianMixtureReducer(MixtureReducer): """ Gaussian Mixture Reducer class: Reduces the number of components in a Gaussian mixture to increase computational efficiency. See [1] for details. Achieved in three ways: pruning, merging, and truncating. Pruning is the act of removing low weight components from the mixture that fall below a pruning threshold. Merging is the act of combining similar components in the mixture that fall with a distance threshold into a single component. Truncating is the act of removing low weight components from the mixture so that the number of components in the mixture stays below a given threshold. Truncating is performed after the pruning and merging. References ---------- [1] B.-N. Vo and W.-K. Ma, “The Gaussian Mixture Probability Hypothesis Density Filter,” Signal Processing,IEEE Transactions on, vol. 54, no. 11, pp. 4091–4104, 2006.. """ prune_threshold: float = Property(default=1e-9, doc='Mixture component weight ' 'threshold for pruning') merge_threshold: float = Property(default=16, doc='Squared Mahalanobis distance ' 'threshold for merging') max_number_components: int = Property(default=np.iinfo(np.int64).max, doc='Maximum number of components to keep ' 'in the Gaussian mixture') merging: bool = Property(default=True, doc='Flag for merging') pruning: bool = Property(default=True, doc='Flag for pruning components whose weight is below ' ':attr:`prune_threshold`') truncating: bool = Property(default=True, doc='Flag for truncating components, keeping a maximum ' 'of :attr:`max_number_components` components') kdtree_max_distance: float = Property( default=None, doc="This defines the max Euclidean search distance for a kd-tree, " "used as part of the merge process as a coarse gate. Default " "`None` where tree isn't used and all components are checked " "against the merge threshold.")
[docs] def reduce(self, components_list): """ Reduce the components of Gaussian Mixture :class:`list` through pruning, merging, and truncating Parameters ---------- components_list : :class:`~.list` The components of Gaussian Mixture Returns ------- :class:`~.list` Reduced components """ if len(components_list) > 0: if self.pruning: components_list = self.prune(components_list) if len(components_list) > 1 and self.merging: components_list = self.merge(components_list) if len(components_list) > self.max_number_components and self.truncating: components_list = self.truncate(components_list) return components_list
[docs] def prune(self, components_list): """ Pruning is the act of removing low weight components from the mixture that fall below a pruning threshold :attr:`prune_threshold`. Parameters ---------- components_list : :class:`~.list` The components of Gaussian Mixture to be pruned Returns ------- remaining_components : :class:`~.GaussianMixtureState` Components that remain after pruning """ # Prune low weight components pruned_weight_sum = 0 for component in components_list: if component.weight < self.prune_threshold: pruned_weight_sum += component.weight remaining_components = [component for component in components_list if component.weight >= self.prune_threshold] # Distribute pruned weights across remaining components for component in remaining_components: component.weight += \ pruned_weight_sum / len(remaining_components) return remaining_components
[docs] def merge_components(self, component_1, component_2): """ Merge two similar components Parameters ---------- component_1 : :class:`~.WeightedGaussianState` First component to be merged component_2 : :class:`~.WeightedGaussianState` Second component to be merged Returns ------- merged_component : :class:`~.WeightedGaussianState` Merged Gaussian component """ weight_sum = component_1.weight + component_2.weight w1 = component_1.weight / weight_sum w2 = component_2.weight / weight_sum merged_mean = component_1.mean*w1 + component_2.mean*w2 merged_covar = component_1.covar*w1 + component_2.covar*w2 mu1_minus_m2 = component_1.mean - component_2.mean merged_covar = merged_covar + \ mu1_minus_m2*mu1_minus_m2.T*w1*w2 if weight_sum > 1: weight_sum = 1 if isinstance(component_1, TaggedWeightedGaussianState): merged_component = TaggedWeightedGaussianState( state_vector=merged_mean, covar=merged_covar, weight=weight_sum, tag=component_1.tag, timestamp=component_1.timestamp ) elif isinstance(component_1, WeightedGaussianState): merged_component = WeightedGaussianState( state_vector=merged_mean, covar=merged_covar, weight=weight_sum, timestamp=component_1.timestamp ) return merged_component
[docs] def merge(self, components_list): """ Merging is the act of combining similar components in the mixture that fall with a distance threshold :attr:`merge_threshold` into a single component. Parameters ---------- components_list : :class:`~.list` Components of the Gaussian Mixture to be merged Returns ------- :class:`~.list` Merged components """ if self.kdtree_max_distance is not None: tree = KDTree( np.vstack([component.state_vector[:, 0] for component in components_list])) else: tree = None # Sort components by weight remaining_components = OrderedSet(sorted( components_list, key=attrgetter('weight'))) merged_components = [] final_merged_components = [] measure = SquaredMahalanobis(state_covar_inv_cache_size=None) while remaining_components: # Get highest weighted component best_component = remaining_components.pop() # If kdtree_max_distance set, use this as gate if tree: indexes = tree.query_ball_point( best_component.state_vector.ravel(), r=self.kdtree_max_distance) matched_components = {components_list[i] for i in indexes if components_list[i] in remaining_components} else: # Modifying list in loop, so copy used matched_components = remaining_components.copy() # Check for similar components against threshold for component in matched_components: # Calculate distance between component and best component distance = measure(state1=component, state2=best_component) # Merge if similar if distance < self.merge_threshold: remaining_components.remove(component) best_component = self.merge_components( best_component, component ) # Add potentially merged component to new mixture merged_components.append(best_component) if all(isinstance(component, TaggedWeightedGaussianState) for component in merged_components): # Check for duplicate tags components_tags = set(component.tag for component in merged_components) if len(components_tags) != len(merged_components): # There are duplicatze tags so assign # new tags to the lower weighted shared ones for shared_tag in components_tags: shared_components = sorted( (component for component in merged_components if component.tag == shared_tag), key=attrgetter('weight'), reverse=True) final_merged_components.append(shared_components[0]) for component in shared_components[1:]: # Assign a new uuid component.tag = str(uuid.uuid4()) final_merged_components.append(component) else: # No duplicates final_merged_components.extend(merged_components) else: # Just weighted components (no tags) final_merged_components.extend(merged_components) # Assign merged components to the mixture return final_merged_components
[docs] def truncate(self, components_list): """ Truncating is the act of removing low-weight components from the mixture so that the size of the mixture (number of components) stays within the given threshold :attr:`max_number_components`. Parameters ---------- components_list : :class:`~.list` Components of the Gaussian Mixture to be truncated Returns ------- :class:`~.list` The :attr:`max_number_components` components with the highest weights """ # Sort components by weight from highest to lowest all_components = sorted( components_list, key=attrgetter('weight'), reverse=True) # Make list of truncated components. This function is called only when # len(components_list) > self.max_number_components, so the next line # will never give an index error truncated_components = all_components[self.max_number_components:] truncated_weight_sum = sum([component.weight for component in truncated_components]) # Distribute truncated weights across remaining components remaining_components = all_components[:self.max_number_components] for component in remaining_components: component.weight += \ truncated_weight_sum / self.max_number_components return remaining_components
[docs] class CovarianceIntersection(MixtureReducer): r"""Class to perform Covariance Intersection of `n` states. The resulting state is given by its state vector :math:`\bf{x}` and covariance :math:`C`. .. math:: C^{-1} = \sum_{i=1}^{n} \omega_{i}C_{i}^{-1} .. math:: \mathbf{x} = C \sum_{i=1}^{n} \omega_{i}C_{i}^{-1}\mathbf{x}_{i} where :math:`\omega` are the weights associated with each state such that :math:`\sum_{i=1}^{n} \omega_i = 1`. """
[docs] @staticmethod def merge_components(*components, weights=None): r""" Merge similar Gaussian components using coveriance intersection Parameters ---------- *components : :class:`GaussianState` Gaussian states to be combined. weights : list[float], default: None The weight :math:`\omega` associated with each state. If left as `None` this will be set to equal weights across all states. Returns ------- :class:`GaussianState` Merged Gaussian component. """ if weights is None: weights = np.ones(len(components)) if len(weights) != len(components): raise IndexError weights = np.asarray(weights) / sum(weights) inv_covs = [weight*pinv(component.covar) for weight, component in zip(weights, components)] C = pinv(sum(inv_covs)) x = C @ sum(inv_cov @ component.state_vector for inv_cov, component in zip(inv_covs, components)) new_component = GaussianState.from_state(components[0], state_vector=x, covar=C) return new_component