import datetime
import inspect
from abc import ABC, abstractmethod
from collections.abc import Sequence, Iterator
from typing import Any
from stonesoup.base import Base, Property
class Action(Base):
    """Base class for an action taken by a sensor or platform on one of its attributes.

    An action belongs to an attribute declared as an :class:`~.ActionableProperty` and
    describes how that attribute is modified up to :attr:`end_time`. Concrete subclasses
    implement :meth:`act`.

    Two actions compare equal when the right-hand operand is an instance of the left-hand
    operand's type and every declared property compares equal. The hash is built from the
    same property values.
    """

    generator: Any = Property(
        default=None,
        readonly=True,
        doc="Action generator that created the action.")
    end_time: datetime.datetime = Property(
        readonly=True,
        doc="Time at which modification of the attribute ends.")
    target_value: Any = Property(doc="Target value.")

    def act(self, current_time, timestamp, init_value, **kwargs):
        """Return the attribute modified.

        The base implementation always raises; subclasses provide the behaviour.

        Parameters
        ----------
        current_time: datetime.datetime
            Current time
        timestamp: datetime.datetime
            Modification of attribute ends at this time stamp
        init_value: Any
            Current value of the modifiable attribute
        **kwargs
            Additional keyword arguments, unused by the base class.

        Returns
        -------
        Any
            The new value of the attribute

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError()

    def __eq__(self, other):
        """Compare property-by-property against another action of a compatible type."""
        if not isinstance(other, type(self)):
            return False
        # Only the properties declared on this instance's own type are compared.
        for name in type(self).properties:
            if not (getattr(self, name) == getattr(other, name)):
                return False
        return True

    def __hash__(self):
        """Hash the tuple of all declared property values, in declaration order."""
        values = [getattr(self, name) for name in type(self).properties]
        return hash(tuple(values))
class ActionGenerator(Base):
    """The base class for an action generator.

    A generator is tied to a single attribute of an owner and to a start/end time. Subclasses
    must implement membership testing (``item in generator``) and iteration over the
    :class:`~.Action` objects they offer.
    """

    owner: object = Property(doc="Actionable object that has the attribute to be modified.")
    attribute: str = Property(doc="The name of the attribute to be modified.")
    start_time: datetime.datetime = Property(doc="Start time of action.")
    end_time: datetime.datetime = Property(doc="End time of action.")
    resolution: float = Property(default=None, doc="Resolution of action space")

    @abstractmethod
    def __contains__(self, item):
        """Membership test for `item`; must be implemented by subclasses."""
        raise NotImplementedError()

    @abstractmethod
    def __iter__(self) -> Iterator[Action]:
        """Iterate over the generator's actions; must be implemented by subclasses."""
        raise NotImplementedError()

    @property
    def current_value(self):
        """Return the current value of the owner's attribute."""
        target = self.owner
        attribute_name = self.attribute
        return getattr(target, attribute_name)

    @property
    def default_action(self):
        """The default action to modify the property if there is no given action.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses are expected to override it.
        """
        raise NotImplementedError()
class RealNumberActionGenerator(ActionGenerator):
    """Action generator where action is a choice of a real number.

    Declares three abstract read-only properties that every concrete subclass must supply.
    """

    @property
    @abstractmethod
    def initial_value(self):
        """Abstract property; presumably the value the action space starts from."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def min(self):
        """Abstract property; presumably the smallest selectable value."""
        raise NotImplementedError()

    @property
    @abstractmethod
    def max(self):
        """Abstract property; presumably the largest selectable value."""
        raise NotImplementedError()
class ActionableProperty(Property):
    """Property that is modified via an :class:`~.Action` with defined, non-equal start and end
    times.

    Parameters
    ----------
    generator_cls
        Class instantiated to produce the action generator for this property.
    generator_kwargs_mapping : mapping, optional
        Extra keyword arguments for `generator_cls`. :class:`~.Actionable` reads each value as
        the name of an attribute on the owning object and passes that attribute's value under
        the corresponding key. Defaults to an empty dictionary.
    cls, default, doc, readonly
        Forwarded unchanged to :class:`~.Property`.
    """

    def __init__(self, generator_cls, generator_kwargs_mapping=None,
                 cls=None, *, default=inspect.Parameter.empty,
                 doc=None, readonly=False):
        super().__init__(cls=cls, default=default, doc=doc, readonly=readonly)
        self.generator_cls = generator_cls
        # A fresh dict per property avoids sharing one mutable default between instances.
        self.generator_kwargs_mapping = (
            dict() if generator_kwargs_mapping is None else generator_kwargs_mapping)
class Actionable(Base, ABC):
    """Base Actionable type.

    Contains the core methods of an actionable sensor/platform type: listing the action
    generators of its actionable properties, scheduling actions, and carrying them out.

    Notes
    -----
    An Actionable is required to have a `timestamp` attribute, in order to validate actions and
    act. This is an abstract base class, and not intended for direct use. Attaining a timestamp
    is left to the inheriting type.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._generator_kwargs = {}
        # Maps actionable property name -> the action currently scheduled for it.
        self.scheduled_actions = {}

    @property
    def _actionable_properties(self):
        """Dictionary of all name - property pairs where the property is an
        :class:`~.ActionableProperty` (i.e. it is modified via action)."""
        actionable = {}
        for prop_name, prop in type(self).properties.items():
            if isinstance(prop, ActionableProperty):
                actionable[prop_name] = prop
        return actionable

    def _build_generator(self, name, property_, start_time, end_time):
        """Instantiate the generator class of `property_` for the attribute called `name`.

        Each entry of the property's ``generator_kwargs_mapping`` is resolved by reading the
        named attribute from this object and passing it under the mapped keyword.
        """
        extra_kwargs = {}
        for keyword, attribute_name in property_.generator_kwargs_mapping.items():
            extra_kwargs[keyword] = getattr(self, attribute_name)
        return property_.generator_cls(
            owner=self,
            attribute=name,
            start_time=start_time,
            end_time=end_time,
            **extra_kwargs)

    def _default_action(self, name, property_, timestamp):
        """Returns the default action of the action generator associated with the property
        (assumes the property is an :class:`~.ActionableProperty`).

        The generator is created with the current `timestamp` attribute of this object as its
        start time and the given `timestamp` as its end time.
        """
        generator = self._build_generator(name, property_, self.timestamp, timestamp)
        return generator.default_action

    def actions(self, timestamp: datetime.datetime, start_timestamp: datetime.datetime = None
                ) -> set[ActionGenerator]:
        """Method to return a set of action generators available up to a provided timestamp.

        A generator is returned for each actionable property that the sensor has. If the
        current timestamp does not validate, it is first set to `timestamp`.

        Parameters
        ----------
        timestamp: datetime.datetime
            Time of action finish.
        start_timestamp: datetime.datetime, optional
            Time of action start. Defaults to this object's `timestamp` attribute.

        Returns
        -------
        : set of :class:`~.ActionGenerator`
            Set of action generators, that describe the bounds of each action space.
        """
        if not self.validate_timestamp():
            self.timestamp = timestamp

        if start_timestamp is None:
            start_timestamp = self.timestamp

        return {self._build_generator(name, property_, start_timestamp, timestamp)
                for name, property_ in self._actionable_properties.items()}

    def add_actions(self, actions: Sequence[Action]) -> bool:
        """Add actions to the sensor

        For every actionable property, the first action in `actions` whose generator targets
        that property replaces whatever was scheduled for it. Actions that match no actionable
        property are ignored.

        Parameters
        ----------
        actions: sequence of :class:`~.Action`
            Sequence of actions that will be executed in order

        Returns
        -------
        bool
            Return True if actions accepted. False if rejected.
            Returns neither if timestamp is invalid.

        Raises
        ------
        ValueError
            If any action ends before the current timestamp, or if more actions are supplied
            than there are actionable properties.

        Notes
        -----
        Base class returns True
        """
        if not self.validate_timestamp():
            # Deliberately returns None (neither True nor False) without scheduling anything.
            return

        for candidate in actions:
            if candidate.end_time < self.timestamp:
                raise ValueError("Cannot schedule an action that ends before the current time.")

        if len(actions) > len(self._actionable_properties):
            raise ValueError("Cannot schedule more actions than there are actionable properties.")

        for name in self._actionable_properties:
            # First matching action wins; later ones for the same attribute are not used.
            chosen = next(
                (candidate for candidate in actions if candidate.generator.attribute == name),
                None)
            if chosen is not None:
                self.scheduled_actions[name] = chosen

        return True

    def act(self, timestamp: datetime.datetime, **kwargs):
        """Carry out actions up to a timestamp.

        Each actionable property is advanced with its scheduled action, or with the default
        action of its generator when nothing is scheduled. A scheduled action that ends at or
        before `timestamp` is removed from the schedule. If the current timestamp does not
        validate, it is set to `timestamp` and nothing else happens.

        Parameters
        ----------
        timestamp: datetime.datetime
            Carry out actions up to this timestamp.
        **kwargs
            Forwarded to every :meth:`Action.act` call.
        """
        if not self.validate_timestamp():
            self.timestamp = timestamp
            return

        for name, property_ in self._actionable_properties.items():
            current = getattr(self, name)

            if name not in self.scheduled_actions:
                # Nothing scheduled: fall back to the generator's default action.
                fallback = self._default_action(name, property_, timestamp)
                setattr(self, name, fallback.act(self.timestamp, timestamp, current, **kwargs))
                continue

            scheduled = self.scheduled_actions[name]
            finish = scheduled.end_time

            if finish < timestamp:
                # Complete the scheduled action, drop it, then run the default action for
                # the remainder of the interval.
                interim = scheduled.act(self.timestamp, finish, current, **kwargs)
                self.scheduled_actions.pop(name)
                # NOTE(review): the default generator is created with
                # start_time=self.timestamp (not `finish`) and while the owner's attribute
                # still holds its pre-action value - confirm this is intended.
                fallback = self._default_action(name, property_, timestamp)
                setattr(self, name, fallback.act(finish, timestamp, interim, **kwargs))
                continue

            completes_now = finish == timestamp
            # Carry out the scheduled action up to `timestamp`.
            setattr(self, name, scheduled.act(self.timestamp, timestamp, current, **kwargs))
            if completes_now:
                # Action finished exactly at `timestamp`: remove it from the schedule.
                self.scheduled_actions.pop(name)

        self.timestamp = timestamp

    @abstractmethod
    def validate_timestamp(self) -> bool:
        """Method to validate the timestamp of the actionable.

        Returns
        -------
        bool
            True if timestamp is valid, False otherwise.
        """
        raise NotImplementedError