Source code for stonesoup.updater.composite

from typing import Sequence

from .base import Updater
from ..base import Property
from ..types.hypothesis import CompositeHypothesis
from ..types.update import CompositeUpdate


[docs]class CompositeUpdater(Updater): """Composite updater type A composition of sub-updaters (:class:`~.Updater`). """ sub_updaters: Sequence[Updater] = Property( doc="Sequence of sub-updaters comprising the composite updater. Must not be empty.") def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not isinstance(self.sub_updaters, Sequence): raise ValueError(f"Sub-updaters must be defined as an ordered list, not " f"{type(self.sub_updaters)}") if len(self.sub_updaters) == 0: raise ValueError("Cannot create an empty composite updater") if any(not isinstance(sub_updater, Updater) for sub_updater in self.sub_updaters): raise ValueError("All sub-updaters must be a Updater type") @property def measurement_model(self): raise NotImplementedError("A composition of updaters has no defined measurement model")
[docs] def predict_measurement(self, *args, **kwargs): """To attain measurement predictions, the composite updater will use it's sub-updaters' `predict_measurement` methods and leave combining these to the :class:`~.CompositeHypothesis` type.""" raise NotImplementedError("A composite updater has no method to predict a measurement")
[docs] def update(self, hypothesis: CompositeHypothesis, **kwargs): r"""Given a hypothesised association between a composite predicted state or composite predicted measurement and a composite measurement, calculate the composite posterior state. Parameters ---------- hypothesis : :class:`~.CompositeHypothesis` the prediction-measurement association hypothesis. This hypothesis may carry a composite predicted measurement, or a composite predicted state. In the latter case a measurement prediction is calculated for each sub-state of the composite hypothesis, which will then create its own composite measurement prediction. **kwargs : various These are passed to the :meth:`predict_measurement` method of each sub-updater Returns ------- : :class:`~.CompositeUpdate` The posterior composite state update """ sub_updates = [] if not isinstance(hypothesis, CompositeHypothesis): raise ValueError("CompositeUpdater can only update with CompositeHypothesis types") if len(hypothesis) != len(self): raise ValueError(f"Mismatch in number of sub-hypotheses {len(hypothesis)} and number " f"of sub-updaters {len(self)}") for sub_updater, sub_hypothesis in zip(self.sub_updaters, hypothesis.sub_hypotheses): sub_pred = sub_hypothesis.prediction sub_meas_model = sub_hypothesis.measurement.measurement_model if sub_hypothesis.measurement_prediction is None: sub_hypothesis.measurement_prediction = \ sub_updater.predict_measurement(sub_pred, sub_meas_model) if sub_hypothesis: sub_update = sub_updater.update(sub_hypothesis, **kwargs) else: # append predictions where no detection is available for sub-state sub_update = sub_hypothesis.prediction sub_updates.append(sub_update) return CompositeUpdate(sub_states=sub_updates, hypothesis=hypothesis)
def __contains__(self, item): return self.sub_updaters.__contains__(item) def __getitem__(self, index): """Can be indexed as a list, or sliced, in which case a new composite updater will be created from the sub-list of sub-updaters.""" if isinstance(index, slice): return self.__class__(self.sub_updaters.__getitem__(index)) return self.sub_updaters.__getitem__(index) def __iter__(self): return self.sub_updaters.__iter__() def __len__(self): return self.sub_updaters.__len__()