from abc import abstractmethod
from collections.abc import Iterator, Sequence
from itertools import product
import numpy as np
from ...base import Property
from ...sensormanager.action import ActionGenerator, Action
from ...types.state import StateVector
[docs]
class MovePositionAction(Action):
"""This is the base class for an action that changes the
position of a platform or sensor."""
[docs]
def act(self, current_time, timestamp, init_value):
return self.target_value
[docs]
class GridActionGenerator(ActionGenerator):
"""This is the base class for generators that generate actions in a grid like fashion."""
action_space: np.ndarray = Property(
default=None,
doc="The bounds of the action space that should not be exceeded. Of shape (ndim, 2) "
"where ndim is the length of the action_mapping. For example, "
":code:`np.array([[xmin, xmax], [ymin, ymax]])`."
)
action_mapping: Sequence[int] = Property(
default=(0, 1),
doc="The state dimensions that actions are applied to."
)
resolution: float = Property(
default=1,
doc="The size of each grid cell. Cells are assumed square."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.action_space is not None:
if len(self.action_space) != len(self.action_mapping):
raise ValueError(f"Dimensions of action_space {self.action_space.shape} "
f"are not compatible with action_mapping of length "
f"{len(self.action_mapping)}. action_space should be "
f"of shape (ndim, 2) where ndim is the length of the "
f"action_mapping.")
if (np.any(self.current_value[self.action_mapping, :] < self.action_space[:, [0]])
or np.any(self.current_value[self.action_mapping, :] > self.action_space[:, [1]])): # noqa: E501
raise ValueError(f"Initial platform location {self.current_value} is not within "
f"the bounds of the action space {self.action_space}.")
def __contains__(self, item):
return item in iter(self)
@abstractmethod
def __iter__(self) -> Iterator[MovePositionAction]:
raise NotImplementedError
[docs]
class NStepDirectionalGridActionGenerator(GridActionGenerator):
"""This is a grid action generator that enables movement by a number
of steps in the specified directions. Actions are applied symmetrically so
can move by a number of steps in positive and negative directions along
the specified dimensions."""
n_steps: int = Property(
default=1,
doc="The number of steps that can be moved in either direction "
"along specified dimensions"
)
step_size: int = Property(
default=1,
doc="The number of grid cells per step"
)
@property
def default_action(self):
return MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
def __iter__(self):
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
action_deltas = np.linspace(-1*self.n_steps*self.step_size*self.resolution,
self.n_steps*self.step_size*self.resolution,
2*self.n_steps+1)
for dim in self.action_mapping:
for n in action_deltas:
if n == 0:
continue
value = StateVector(np.zeros(len(self.current_value)))
value[dim] += n
target_value = self.current_value + value
if self.action_space is None or \
(np.all(target_value[self.action_mapping, :] >= self.action_space[:, [0]])
and np.all(target_value[self.action_mapping, :] <= self.action_space[:, [1]])): # noqa: E501
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=target_value)
[docs]
class SamplePositionActionGenerator(ActionGenerator):
"""Base action generator for sampling approaches to action generation. The action
generator requires the user to define a number of samples to generate
(:attr:`n_samples`) according to the defined sampling technique."""
action_space: np.ndarray = Property(
default=None,
doc="The bounds of the action space that should not be exceeded. Of shape (ndim, 2) "
"where ndim is the length of the action_mapping. For example, "
":code:`np.array([[xmin, xmax], [ymin, ymax]])`."
)
action_mapping: Sequence[int] = Property(
default=(0, 1),
doc="The state dimensions that actions are applied to."
)
n_samples: int = Property(
default=10,
doc="Number of samples to generate. This does not include the action "
"to remain at the current position, meaning :attr:`n_samples` +1 "
"actions will be generated."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.action_space is not None:
if len(self.action_space) != len(self.action_mapping):
raise ValueError(f"Dimensions of action_space {self.action_space.shape} "
f"are not compatible with action_mapping of length "
f"{len(self.action_mapping)}. action_space should be "
f"of shape (ndim, 2) where ndim is the length of the "
f"action_mapping.")
if (np.any(self.current_value[self.action_mapping, :] < self.action_space[:, [0]])
or np.any(self.current_value[self.action_mapping, :] > self.action_space[:, [1]])): # noqa: E501
raise ValueError(f"Initial platform location {self.current_value} is not within "
f"the bounds of the action space {self.action_space}.")
@property
def default_action(self):
return MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
def __contains__(self, item):
return item in iter(self)
@abstractmethod
def __iter__(self):
raise NotImplementedError
[docs]
class CircleSamplePositionActionGenerator(SamplePositionActionGenerator):
"""Action generator which samples candidate future positions uniformly within
a circle around the current position. Circle radius is defined by the user
with :attr:`maximum_travel`. This generator is only applicable to 2D position
actions."""
maximum_travel: float = Property(
default=1.0,
doc="Maximum possible travel distance. Specifies the radius of "
"sampling area."
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if len(self.action_mapping) != 2:
raise ValueError(f"Action mapping {self.action_mapping} does "
f"not have 2 dimensions. "
f":class:`~.CircleSamplePositionActionGenerator` "
f"is designed for 2D action generation only.")
def __iter__(self):
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
radius_angle_samples = np.random.uniform([0, 0], [1, 2*np.pi], (self.n_samples, 2))
sample_values = self.maximum_travel*np.sqrt(radius_angle_samples[:, 0]) *\
np.array([np.sin(radius_angle_samples[:, 1]), np.cos(radius_angle_samples[:, 1])])
values = np.zeros((self.current_value.shape[0], self.n_samples))
values[self.action_mapping,] = sample_values
target_values = self.current_value + values
if self.action_space is not None:
while (np.any(target_values[self.action_mapping, :] < self.action_space[:, [0]])
or np.any(target_values[self.action_mapping, :] > self.action_space[:, [1]])):
_, idx = np.where(np.logical_or(
target_values[self.action_mapping, :] > self.action_space[:, [1]],
target_values[self.action_mapping, :] < self.action_space[:, [0]]))
radius_angle_samples = np.random.uniform([0, 0], [1, 2*np.pi], (len(idx), 2))
sample_values = self.maximum_travel*np.sqrt(radius_angle_samples[:, 0]) *\
np.array([np.sin(radius_angle_samples[:, 1]),
np.cos(radius_angle_samples[:, 1])])
values = np.zeros((self.current_value.shape[0], len(idx)))
values[self.action_mapping,] = sample_values
target_values[:, idx] = self.current_value + values
for target_value in target_values.T:
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=StateVector(target_value))
[docs]
class MaxSpeedPositionActionGenerator(ActionGenerator):
"""Action generator which generates actions uniformly within a circle around the current
position. Circle radius is defined by the :attr:`max_speed` of the platform and the duration of
the action. This generator is only applicable to 2D position actions."""
action_space: np.ndarray = Property(
default=None,
doc="The bounds of the action space that should not be exceeded. Of shape (ndim, 2) "
"where ndim is the length of the action_mapping. For example, "
":code:`np.array([[xmin, xmax], [ymin, ymax]])`."
)
action_mapping: Sequence[int] = Property(
default=(0, 1),
doc="The state dimensions that actions are applied to."
)
max_speed: float = Property(
default=1,
doc="The maximum speed that the platform can move in m/s."
)
resolution: float = Property(
default=1,
doc="The interval in distance travelled for each action."
)
angle_resolution: float = Property(
default=np.pi/2,
doc="The interval in angle for each action."
)
epsilon = 1e-6
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if len(self.action_mapping) != 2:
raise ValueError(f"Action mapping {self.action_mapping} does not have 2 dimensions."
":class:`~.RangeAngleActionGenerator` is designed for 2D action "
"generation only.")
if self.action_space is not None:
if len(self.action_space) != len(self.action_mapping):
raise ValueError(f"Dimensions of action_space {self.action_space.shape} are not "
"compatible with action_mapping of length "
f"{len(self.action_mapping)}. action_space should be of shape "
"(ndim, 2) where ndim is the length of the action_mapping.")
if (np.any(self.current_value[self.action_mapping, :] < self.action_space[:, [0]])
or np.any(self.current_value[self.action_mapping, :] > self.action_space[:, [1]])): # noqa: E501
raise ValueError(f"Initial platform location {self.current_value} is not within "
f"the bounds of the action space {self.action_space}.")
@property
def duration(self):
return self.end_time - self.start_time
@property
def maximum_travel(self):
return self.duration.total_seconds() * self.max_speed
@property
def angle_deltas(self):
return np.arange(0, 2*np.pi, self.angle_resolution)
@property
def range_deltas(self):
return np.arange(0, self.maximum_travel + self.epsilon, self.resolution)
def __contains__(self, item):
if isinstance(item, MovePositionAction):
item = item.target_value
distance = np.sqrt(sum((item - self.current_value)**2))
return distance <= self.maximum_travel + self.epsilon
@property
def default_action(self):
return MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
def __iter__(self):
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=self.current_value)
for angle_, range_ in product(self.angle_deltas, self.range_deltas):
if range_ == 0:
continue
value = np.zeros(self.current_value.shape)
value[self.action_mapping,] = np.array([[range_*np.cos(angle_)],
[range_*np.sin(angle_)]])
target_value = self.current_value + value
if self.action_space is None or \
(np.all(target_value[self.action_mapping, :] >= self.action_space[:, [0]])
and np.all(target_value[self.action_mapping, :] <= self.action_space[:, [1]])): # noqa: E501
yield MovePositionAction(generator=self,
end_time=self.end_time,
target_value=target_value)
[docs]
def action_from_value(self, value=None) -> MovePositionAction:
"""
Given a value for the position, generates the action would achieve that value.
Parameters
----------
value: StateVector
Property value for which the action is required.
Returns
-------
MovePositionAction
Action which will achieve this position.
"""
if value not in self:
return None
return MovePositionAction(generator=self,
end_time=self.end_time,
target_value=value)