from abc import ABC
import copy
import datetime
from collections.abc import Mapping, Sequence
import numpy as np
from ..base import Base, Property
from ..dataassociator import DataAssociator
from ..measures import Euclidean, KLDivergence
from ..platform import Platform
from ..predictor import Predictor
from ..predictor.kalman import KalmanPredictor
from ..predictor.particle import ParticlePredictor
from ..resampler.particle import SystematicResampler
from ..sensor.sensor import Sensor
from ..sensormanager.action import Action, Actionable
from ..types.detection import TrueDetection
from ..types.groundtruth import GroundTruthState
from ..types.hypothesis import SingleHypothesis
from ..types.prediction import Prediction
from ..types.shape import AreaOfInterest
from ..types.state import State
from ..types.track import Track
from ..updater import Updater
from ..updater.kalman import ExtendedKalmanUpdater
from ..updater.particle import ParticleUpdater
[docs]
class RewardFunction(Base, ABC):
"""
The reward function base class.
A reward function is a callable used by a sensor manager to determine the best choice of
action(s) for a sensor or group of sensors to take. For a given configuration of sensors
and actions the reward function calculates a metric to evaluate how useful that choice
of actions would be with a particular objective or objectives in mind.
The sensor manager algorithm compares this metric for different possible configurations
and chooses the appropriate sensing configuration to use at that time step.
"""
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime.datetime, *args, **kwargs):
"""
A method which returns a reward metric based on information about the state of the
system, sensors and possible actions they can take. This requires a mapping of
sensors to action(s) to be evaluated by reward function, a set of tracks at given
time and the time at which the actions would be carried out until.
Returns
-------
: float
Calculated metric
"""
raise NotImplementedError
[docs]
class AdditiveRewardFunction(RewardFunction):
"""Additive reward function
Elementwise addition of corresponding reward functions.
"""
reward_function_list: Sequence[RewardFunction] = Property(doc="List of reward functions")
weights: list = Property(default=None, doc="Weight for each reward function.")
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime.datetime, *args, **kwargs):
if self.weights is None:
self.weights = [1] * len(self.reward_function_list)
if len(self.reward_function_list) != len(self.weights):
raise IndexError
return np.sum([reward_function(config, tracks, metric_time, *args, **kwargs) * weight
for reward_function, weight in
zip(self.reward_function_list, self.weights)])
[docs]
class MultiplicativeRewardFunction(RewardFunction):
"""Multiplicative reward function
Elementwise multiplication of corresponding reward functions.
"""
reward_function_list: Sequence[RewardFunction] = Property(doc="List of reward functions")
weights: list = Property(default=None, doc="Weight for each reward function.")
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime.datetime, *args, **kwargs):
if self.weights is None:
self.weights = [1] * len(self.reward_function_list)
if len(self.reward_function_list) != len(self.weights):
raise IndexError
return np.prod([reward_function(config, tracks, metric_time, *args, **kwargs) * weight
for reward_function, weight in
zip(self.reward_function_list, self.weights)])
[docs]
class UncertaintyRewardFunction(RewardFunction):
"""A reward function which calculates the potential reduction in the uncertainty of track
estimates if a particular action is taken by a sensor or group of sensors.
Given a configuration of sensors and actions, a metric is calculated for the potential
reduction in the uncertainty of the tracks that would occur if the sensing configuration
were used to make an observation. A larger value indicates a greater reduction in
uncertainty.
"""
predictor: KalmanPredictor = Property(doc="Predictor used to predict the track to a new state")
updater: ExtendedKalmanUpdater = Property(doc="Updater used to update "
"the track to the new state.")
method_sum: bool = Property(default=True, doc="Determines method of calculating reward."
"Default calculates sum across all targets."
"Otherwise calculates mean of all targets.")
return_tracks: bool = Property(default=False,
doc="A flag for allowing the predicted track, "
"used to calculate the reward, to be "
"returned.")
measurement_noise: bool = Property(default=False,
doc="Decide whether or not to apply measurement model "
"noise to the predicted measurements for sensor "
"management.")
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime.datetime, *args, **kwargs):
"""
For a given configuration of sensors and actions this reward function calculates the
potential uncertainty reduction of each track by
computing the difference between the covariance matrix norms of the prediction
and the posterior assuming a predicted measurement corresponding to that prediction.
This requires a mapping of sensors to action(s)
to be evaluated by reward function, a set of tracks at given time and the time at which
the actions would be carried out until.
The metric returned is the total potential reduction in uncertainty across all tracks.
Returns
-------
: float
Metric of uncertainty for given configuration
"""
# Reward value
config_metric = 0
predicted_sensors = set()
memo = {}
# For each sensor/platform in the configuration
for actionable, actions in config.items():
predicted_actionable = copy.deepcopy(actionable, memo)
predicted_actionable.add_actions(actions)
predicted_actionable.act(metric_time, noise=False)
if isinstance(actionable, Sensor):
predicted_sensors.add(predicted_actionable) # checks if it's a sensor
# Create dictionary of predictions for the tracks in the configuration
predicted_tracks = set()
for track in tracks:
predicted_track = copy.copy(track)
predicted_track.append(self.predictor.predict(predicted_track, timestamp=metric_time))
predicted_tracks.add(predicted_track)
for sensor in predicted_sensors:
ground_truth_states = dict(
(GroundTruthState(predicted_track.mean,
timestamp=predicted_track.timestamp,
metadata=predicted_track.metadata),
predicted_track)
for predicted_track in predicted_tracks)
detections_set = sensor.measure(
set(ground_truth_states.keys()), noise=self.measurement_noise)
# Assumes one detection per track
detections = {
ground_truth_states[detection.groundtruth_path]: detection
for detection in detections_set
if isinstance(detection, TrueDetection)}
for predicted_track, detection in detections.items():
# Generate hypothesis based on prediction/previous update and detection
hypothesis = SingleHypothesis(predicted_track.state, detection)
# Do the update based on this hypothesis and store covariance matrix
update = self.updater.update(hypothesis)
previous_cov_norm = np.linalg.norm(predicted_track.covar)
update_cov_norm = np.linalg.norm(update.covar)
# Replace prediction with update
predicted_track.append(update)
# Calculate metric for the track observation and add to the metric
# for the configuration
metric = previous_cov_norm - update_cov_norm
config_metric += metric
if self.method_sum is False and len(detections) != 0:
config_metric /= len(detections)
# Return value of configuration metric
if self.return_tracks:
return config_metric, predicted_tracks
else:
return config_metric
[docs]
class ExpectedKLDivergence(RewardFunction):
"""A reward function that implements the Kullback-Leibler divergence
for quantifying relative information gain between actions taken by
a sensor or group of sensors.
From a configuration of sensors and actions, an expected measurement is
generated based on the predicted distribution and an action being taken.
An update is generated based on this measurement. The Kullback-Leibler
divergence is then calculated between the predicted and updated target
distribution that resulted from the measurement. A larger divergence
between these distributions equates to more information gained from
the action and resulting measurement from that action.
"""
predictor: Predictor = Property(default=None,
doc="Predictor used to predict the track to a "
"new state. This reward function is only "
"compatible with :class:`~.ParticlePredictor` "
"types.")
updater: Updater = Property(default=None,
doc="Updater used to update the track to the new state. "
"This reward function is only compatible with "
":class:`~.ParticleUpdater` types.")
method_sum: bool = Property(default=True,
doc="Determines method of calculating reward."
"Default calculates sum across all targets."
"Otherwise calculates mean of all targets.")
data_associator: DataAssociator = Property(default=None,
doc="Data associator for associating "
"detections to tracks when "
"multiple sensors are managed.")
return_tracks: bool = Property(default=False,
doc="A flag for allowing the predicted track, "
"used to calculate the reward, to be "
"returned.")
measurement_noise: bool = Property(default=False,
doc="Decide whether or not to apply measurement model "
"noise to the predicted measurements for sensor "
"management.")
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.KLD = KLDivergence()
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime.datetime, *args, **kwargs):
"""
For a given configuration of sensors and actions this reward function
calculates the expected Kullback-Leibler divergence of each track. It is
calculated between the prediction and the posterior assuming an expected update
based on a predicted measurement.
This requires a mapping of sensors to action(s) to be evaluated by the
reward function, a set of tracks at given time and the time at which
the actions would be carried out until.
The metric returned is the total expected Kullback-Leibler
divergence across all tracks.
Returns
-------
: float
Kullback-Leibler divergence for given configuration
: Set[Track] (if defined)
Set of tracks that have been predicted and updated in reward
calculation if :attr:`return_tracks` is `True`
"""
# Reward value
kld = 0.
memo = {}
predicted_sensors = set()
# For each actionable in the configuration
for actionable, actions in config.items():
# Don't currently have an Actionable base for platforms hence either Platform or Sensor
if isinstance(actionable, Platform) or isinstance(actionable, Actionable):
predicted_actionable = copy.deepcopy(actionable, memo)
predicted_actionable.add_actions(actions)
predicted_actionable.act(metric_time)
if isinstance(actionable, Sensor):
predicted_sensors.add(predicted_actionable) # checks if its a sensor
elif isinstance(actionable, Platform):
predicted_sensors.update(predicted_actionable.sensors)
# Create dictionary of predictions for the tracks in the configuration
predicted_tracks = set()
for track in tracks:
predicted_track = copy.copy(track)
if self.predictor:
predicted_track.append(self.predictor.predict(track[-1],
timestamp=metric_time))
else:
predicted_track.append(Prediction.from_state(track[-1],
timestamp=metric_time))
predicted_tracks.add(predicted_track)
sensor_detections = self._generate_detections(predicted_tracks,
predicted_sensors,
timestamp=metric_time)
det_count = 0
for sensor, detections in sensor_detections.items():
for predicted_track, detection_set in detections.items():
det_count += len(detection_set)
for n, detection in enumerate(detection_set):
# Generate hypothesis based on prediction/previous update and detection
hypothesis = SingleHypothesis(predicted_track.state, detection)
# Do the update based on this hypothesis and store covariance matrix
update = self.updater.update(hypothesis)
kld += self.KLD(predicted_track[-1], update)
if not isinstance(self, MultiUpdateExpectedKLDivergence):
predicted_track.append(update)
if self.method_sum is False and det_count != 0:
kld /= det_count
# Return value of configuration metric
if self.return_tracks:
return kld, predicted_tracks
else:
return kld
def _generate_detections(self, predicted_tracks, sensors, timestamp=None):
all_detections = {}
for sensor in sensors:
detections = {}
ground_truth_states = dict(
(GroundTruthState(predicted_track.mean,
timestamp=predicted_track.timestamp,
metadata=predicted_track.metadata),
predicted_track)
for predicted_track in predicted_tracks)
detections_set = sensor.measure(
set(ground_truth_states.keys()), noise=self.measurement_noise)
# Assumes one detection per track
detections = {
ground_truth_states[detection.groundtruth_path]: {detection}
for detection in detections_set
if isinstance(detection, TrueDetection)}
if self.data_associator:
tmp_hypotheses = self.data_associator.associate(
predicted_tracks,
{det for dets in detections.values() for det in dets},
timestamp)
detections = {predicted_track: {hypothesis.measurement}
for predicted_track, hypothesis in tmp_hypotheses.items()
if hypothesis}
all_detections.update({sensor: detections})
return all_detections
[docs]
class MultiUpdateExpectedKLDivergence(ExpectedKLDivergence):
"""A reward function that implements the Kullback-Leibler divergence
for quantifying relative information gain between actions taken by
a sensor or group of sensors.
From a configuration of sensors and actions, multiple expected measurements per
track are generated based on the predicted distribution and an action being taken.
The measurements are generated by resampling the particle state down to a
subsample with length specified by the user. Updates are generated for each of
these measurements and the Kullback-Leibler divergence calculated for each
of them.
"""
predictor: ParticlePredictor = Property(default=None,
doc="Predictor used to predict the track to a "
"new state. This reward function is only "
"compatible with :class:`~.ParticlePredictor` "
"types.")
updater: ParticleUpdater = Property(default=None,
doc="Updater used to update the track to the new state. "
"This reward function is only compatible with "
":class:`~.ParticleUpdater` types.")
updates_per_track: int = Property(default=2,
doc="Number of measurements to generate from each "
"track prediction. This should be > 1.")
measurement_noise: bool = Property(default=True)
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.KLD = KLDivergence()
if self.predictor is not None and not isinstance(self.predictor, ParticlePredictor):
raise NotImplementedError('Only ParticlePredictor types are currently compatible '
'with this reward function')
if self.updater is not None and not isinstance(self.updater, ParticleUpdater):
raise NotImplementedError('Only ParticleUpdater types are currently compatible '
'with this reward function')
if self.updates_per_track < 2:
raise ValueError(f'updates_per_track = {self.updates_per_track}. This reward '
f'function only accepts >= 2')
def _generate_detections(self, predicted_tracks, sensors, timestamp=None):
detections = {}
all_detections = {}
resampler = SystematicResampler()
for sensor in sensors:
for predicted_track in predicted_tracks:
measurement_sources = resampler.resample(predicted_track[-1],
nparts=self.updates_per_track)
tmp_detections = set()
for state in measurement_sources.state_vector:
tmp_detections.update(
sensor.measure({GroundTruthState(state,
timestamp=timestamp,
metadata=predicted_track.metadata)},
noise=self.measurement_noise))
detections.update({predicted_track: tmp_detections})
all_detections.update({sensor: detections})
return all_detections
[docs]
class FOVInteractionRewardFunction(RewardFunction):
"""
A reward function for the FOV interaction scenario.
This function rewards the sensor for keeping the target in its FOV while
penalising it for entering the target's FOV.
"""
predictor: KalmanPredictor = Property(
doc="The predictor used to predict the track to a new state.")
updater: ExtendedKalmanUpdater = Property(
doc="The updater used to update the track to the new state.")
sensor_fov_radius: float = Property(
default=20.0, doc="The radius of the sensor platform's field of view.")
target_fov_radius: float = Property(
default=10.0, doc="The assumed radius of the target's field of view.")
sensor_mapping: list[int] = Property(
default=(0, 1),
doc="The mapping of sensor platform coordinates. Used to calculate the distance between"
"the sensor and target in the reward function.")
target_mapping: list[int] = Property(
default=(0, 2),
doc="The mapping of target coordinates. Used to calculate the distance between the"
"sensor and target in the reward function.")
fov_scale: float = Property(default=1.0, doc="")
track_weight: float = Property(
default=2.0,
doc="The weight of the reward for keeping the target in the sensor's FOV.")
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime, *args, **kwargs) -> float:
"""
Calculate the reward for a given sensor and predicted target state.
Parameters
----------
sensor : Sensor
The sensor platform.
predicted_state : State
The predicted state of the target.
Returns
-------
float
The calculated reward.
"""
measure = Euclidean(self.sensor_mapping, self.target_mapping)
predicted_sensors = set()
memo = {}
# For each sensor/platform in the configuration
for actionable, actions in config.items():
predicted_actionable = copy.deepcopy(actionable, memo)
predicted_actionable.add_actions(actions)
predicted_actionable.act(metric_time, noise=False)
if isinstance(actionable, Sensor):
predicted_sensors.add(predicted_actionable) # checks if it's a sensor
# Create dictionary of predictions for the tracks in the configuration
predicted_tracks = set()
# This loops are not currently used for multiple tracks but are left in for future
# compatibility with multiple targets
for track in tracks:
predicted_track = copy.copy(track)
predicted_track.append(self.predictor.predict(predicted_track, timestamp=metric_time))
predicted_tracks.add(predicted_track)
no_tracks = int(len(predicted_tracks))
# This loop is not currently used for multiple sensors but is left in for future
# compatibility with multiple sensors
for sensor in predicted_sensors:
sensor_pos = sensor.position if isinstance(sensor.position, State) else State(sensor.position) # noqa: E501
total_reward = 0
# This loop is not currently used for multiple tracks but is left in for future
# compatibility with multiple targets
for target in predicted_tracks:
target_pos = target
distance = measure(sensor_pos, target_pos)
tracking_reward = (self.track_weight * no_tracks
if distance <= self.sensor_fov_radius * self.fov_scale
else -no_tracks)
# Penalty for entering the target's FOV
lack_of_stealth_penalty = (-no_tracks if distance <= self.target_fov_radius else 0.0)
# Combine the reward and penalty
total_reward += tracking_reward + lack_of_stealth_penalty
return total_reward
[docs]
class AOIRewardFunction2D(RewardFunction):
"""
A reward function which enables the use of different reward functions,
depending on the :class:`~.AreaOfInterest` the target is located in.
This function takes thresholds for how interested the sensor manager is in a particular area
(e.g. how important is achieving good tracking performance),
and how accessible an area is (e.g. how much risk is there for a sensor operating in that
area),
with mappings to a particular reward function to use when that
threshold is met.
The :class:`~.AdditiveRewardFunction` is used to combine the interest
and access reward functions if both thresholds are met. If no thresholds are met,
the default reward function is used.
"""
interest_thresholds: Mapping[int, RewardFunction] = Property(default=None,
doc="Mapping of interest "
"thresholds to reward functions")
access_thresholds: Mapping[int, RewardFunction] = Property(default=None,
doc="Mapping of access "
"thresholds to reward functions")
default_reward: RewardFunction = Property(doc="Default reward function")
areas: Sequence[AreaOfInterest] = Property(doc="List of areas")
target_mapping: tuple[int, int] = Property(doc="Position mapping for the target")
[docs]
def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: set[Track],
metric_time: datetime, *args, **kwargs):
reward_func = self.default_reward
for track in tracks:
track_x = track.state_vector[self.target_mapping[0]]
track_y = track.state_vector[self.target_mapping[1]]
for area in self.areas:
if area.xmin < track_x < area.xmax and area.ymin < track_y < area.ymax:
interest_reward = None
if self.interest_thresholds is not None:
for threshold, reward in self.interest_thresholds.items():
if threshold <= area.interest:
interest_reward = reward
access_reward = None
if self.access_thresholds is not None:
for threshold, reward in self.access_thresholds.items():
if threshold <= area.access:
access_reward = reward
if interest_reward and access_reward:
reward_func = AdditiveRewardFunction([interest_reward, access_reward])
elif interest_reward:
reward_func = interest_reward
elif access_reward:
reward_func = access_reward
else:
reward_func = self.default_reward
return reward_func(config, tracks, metric_time, *args, **kwargs)