# -*- coding: utf-8 -*-
from scipy.spatial import distance as dist
import uuid
from ..base import Property
from .base import MixtureReducer
from ..types.state import TaggedWeightedGaussianState, WeightedGaussianState
from operator import attrgetter
[docs]class GaussianMixtureReducer(MixtureReducer):
"""
Gaussian Mixture Reducer class:
Reduces the number of components in a Gaussian mixture to increase
computational efficiency. See [1] for details.
Achieved in two ways: pruning and merging.
Pruning is the act of removing low weight components from the mixture
that fall below a pruning threshold.
Merging is the act of combining similar components in the mixture
that fall with a distance threshold into a single component.
References
----------
[1] B.-N. Vo and W.-K. Ma, “The Gaussian Mixture Probability Hypothesis
Density Filter,” Signal Processing,IEEE Transactions on, vol. 54, no. 11,
pp. 4091–4104, 2006..
"""
prune_threshold: float = Property(default=1e-9, doc="Threshold for pruning.")
merge_threshold: float = Property(default=16, doc='Threshold for merging')
merging: bool = Property(default=True, doc='Flag for merging')
pruning: bool = Property(default=True, doc='Flag for pruning')
[docs] def reduce(self, components_list):
"""
Reduce the components of Gaussian Mixture :class:`list`
through pruning and merging
Parameters
----------
components_list : :class:`~.list`
The components of Gaussian Mixture
Returns
-------
:class:`~.list`
Reduced components
"""
if len(components_list) > 0:
if self.pruning:
components_list = self.prune(components_list)
if len(components_list) > 1 & self.merging:
components_list = self.merge(components_list)
return components_list
[docs] def prune(self, components_list):
"""
Pruning is the act of removing low weight components from the mixture
that fall below a pruning threshold :attr:`prune_threshold`.
Parameters
----------
components_list : :class:`~.list`
The components of Gaussian Mixture to be pruned
Returns
-------
remaining_components : :class:`~.GaussianMixtureState`
Components that remain after pruning
"""
# Prune low weight components
pruned_weight_sum = 0
for component in components_list:
if component.weight < self.prune_threshold:
pruned_weight_sum += component.weight
remaining_components = [component for component in components_list
if component.weight > self.prune_threshold]
# Distribute pruned weights across remaining components
for component in remaining_components:
component.weight += \
pruned_weight_sum / len(remaining_components)
return remaining_components
[docs] def merge_components(self, component_1, component_2):
"""
Merge two similar components
Parameters
----------
component_1 : :class:`~.WeightedGaussianState`
First component to be merged
component_2 : :class:`~.WeightedGaussianState`
Second component to be merged
Returns
-------
merged_component : :class:`~.WeightedGaussianState`
Merged Gaussian component
"""
weight_sum = component_1.weight+component_2.weight
w1 = component_1.weight / weight_sum
w2 = component_2.weight / weight_sum
merged_mean = component_1.mean*w1 + component_2.mean*w2
merged_covar = component_1.covar*w1 + component_2.covar*w2
mu1_minus_m2 = component_1.mean - component_2.mean
merged_covar = merged_covar + \
mu1_minus_m2*mu1_minus_m2.T*w1*w2
merged_weight = component_1.weight + component_2.weight
if merged_weight > 1:
merged_weight = 1
if isinstance(component_1, TaggedWeightedGaussianState):
merged_component = TaggedWeightedGaussianState(
state_vector=merged_mean,
covar=merged_covar,
weight=merged_weight,
tag=component_1.tag,
timestamp=component_1.timestamp
)
elif isinstance(component_1, WeightedGaussianState):
merged_component = WeightedGaussianState(
state_vector=merged_mean,
covar=merged_covar,
weight=merged_weight,
timestamp=component_1.timestamp
)
return merged_component
[docs] def merge(self, components_list):
"""
Merging is the act of combining similar components in the mixture
that fall with a distance threshold :attr:`merge_threshold` into
a single component.
Parameters
----------
components_list : :class:`~.list`
Components of the Gaussian Mixture to be merged
Returns
-------
:class:`~.list`
Merged components
"""
# Sort components by weight
remaining_components = sorted(
components_list, key=attrgetter('weight'))
merged_components = []
final_merged_components = []
while remaining_components:
# Get highest weighted component
best_component = remaining_components.pop()
# Check for similar components
# (modifying list in loop, so copy used)
for component in remaining_components.copy():
# Calculate distance between component and best component
distance = dist.mahalanobis(
best_component.mean, component.mean, best_component.covar)
# Merge if similar
if distance < self.merge_threshold:
remaining_components.remove(component)
best_component = self.merge_components(
best_component, component
)
# Add potentially merged component to new mixture
merged_components.append(best_component)
if all(isinstance(component, TaggedWeightedGaussianState)
for component in merged_components):
# Check for duplicate tags
components_tags = set(component.tag for component in merged_components)
if len(components_tags) != len(merged_components):
# There are duplicatze tags so assign
# new tags to the lower weighted shared ones
for shared_tag in components_tags:
shared_components = sorted(
(component for component in merged_components
if component.tag == shared_tag),
key=attrgetter('weight'))
final_merged_components.append(shared_components[0])
for component in shared_components[1:]:
# Assign a new uuid
component.tag = str(uuid.uuid4())
final_merged_components.append(component)
else:
# No duplicates
final_merged_components.extend(merged_components)
else:
# Just weighted components (no tags)
final_merged_components.extend(merged_components)
# Assign merged components to the mixture
return final_merged_components