Source code for stonesoup.platform.base
# -*- coding: utf-8 -*-
import datetime
import weakref
from abc import abstractmethod, ABC
from functools import lru_cache
from math import cos, sin
from typing import Sequence, MutableSequence, Optional, TYPE_CHECKING
from scipy.linalg import expm
import numpy as np
from ..functions import cart2sphere, cart2pol, rotz
from ..types.array import StateVector
from ..base import Property
from ..types.state import State, StateMutableSequence
from ..models.transition import TransitionModel
if TYPE_CHECKING: # pragma: no cover
from ..sensor.base import BaseSensor
[docs]class Platform(StateMutableSequence, ABC):
"""A platform that can carry a number of different sensors.
The location of platform mounted sensors will be maintained relative to
the sensor position. Platforms move within a 2 or 3 dimensional
rectangular cartesian space.
A simple platform is considered to always be aligned with its principle
velocity. It does not take into account issues such as bank angle or body
deformation (e.g. flex).
.. note:: This class abstract and not intended to be instantiated. To get the behaviour of
this class use a subclass which gives movement
behaviours. Currently these are :class:`~.FixedPlatform` and
:class:`~.MovingPlatform`
"""
states: Sequence[State] = Property(
doc="A list of States which enables the platform's history to be "
"accessed in simulators and for plotting. Initiated as a "
"state, for a static platform, this would usually contain its "
"position coordinates in the form ``[x, y, z]``. For a moving "
"platform it would contain position and velocity interleaved: "
"``[x, vx, y, vy, z, vz]``")
position_mapping: Sequence[int] = Property(
doc="Mapping between platform position and state vector. For a "
"position-only 3d platform this might be ``[0, 1, 2]``. For a "
"position and velocity platform: ``[0, 2, 4]``")
rotation_offsets: MutableSequence[StateVector] = Property(
default=None, readonly=True,
doc="A list of StateVectors containing the sensor rotation "
"offsets from the platform's primary axis (defined as the "
"direction of motion). Defaults to a zero vector with the "
"same length as the Platform's :attr:`position_mapping`")
mounting_offsets: MutableSequence[StateVector] = Property(
default=None, readonly=True,
doc="A list of StateVectors containing the sensor translation "
"offsets from the platform's reference point. Defaults to "
"a zero vector with the same length as the Platform's "
":attr:`position_mapping`")
sensors: MutableSequence['BaseSensor'] = Property(
default=None, readonly=True,
doc="A list of N mounted sensors. Defaults to an empty list")
velocity_mapping: Sequence[int] = Property(
default=None,
doc="Mapping between platform velocity and state dims. If not "
"set, it will default to ``[m+1 for m in position_mapping]``")
# TODO: Determine where a platform coordinate frame should be maintained
def __init__(self, *args, **kwargs):
"""
Ensure that the platform location and the sensor locations are
consistent at initialisation.
"""
super().__init__(*args, **kwargs)
# Set values to defaults if not provided
if self.sensors is None:
self._property_sensors = []
if self.velocity_mapping is None:
self.velocity_mapping = [p + 1 for p in self.position_mapping]
if self.mounting_offsets is None:
self._property_mounting_offsets = [StateVector([0] * self.ndim)] * len(self.sensors)
if self.rotation_offsets is None:
self._property_rotation_offsets = [StateVector([0] * 3)] * len(self.sensors)
if len(self.sensors) != len(self.mounting_offsets):
raise ValueError(
"Number of sensors associated with the platform does not "
"match the number of sensor mounting offsets specified")
if len(self.sensors) != len(self.rotation_offsets):
raise ValueError(
"Number of sensors associated with the platform does not "
"match the number of sensor rotation offsets specified")
# Store the platform weakref in each of the child sensors
for sensor in self.sensors:
sensor.platform_system = weakref.ref(self)
@property
def position(self) -> StateVector:
"""Return the position of the platform.
Extracted from the state vector of the platform using the platform's
:attr:`position_mapping`. This property is settable for fixed platforms, but not for
movable ones, where the position must be set by moving the model with a transition model.
"""
return self.state_vector[self.position_mapping, :]
@position.setter
def position(self, value: StateVector) -> None:
self._set_position(value)
@staticmethod
def _tuple_or_none(value):
return None if value is None else tuple(value)
@sensors.getter
def sensors(self):
return self._tuple_or_none(self._property_sensors)
@mounting_offsets.getter
def mounting_offsets(self):
return self._tuple_or_none(self._property_mounting_offsets)
@rotation_offsets.getter
def rotation_offsets(self):
return self._tuple_or_none(self._property_rotation_offsets)
@property
def ndim(self) -> int:
"""Convenience property to return the number of dimensions in which the platform operates.
Given by the length of the :attr:`position_mapping`
"""
return len(self.position_mapping)
@property
@abstractmethod
def orientation(self) -> StateVector:
"""Return the orientation of the platform.
Implementation is case dependent and left to the Fixed/Moving subclasses
"""
raise NotImplementedError
@property
@abstractmethod
def velocity(self) -> StateVector:
"""Return the velocity of the platform.
Implementation is case dependent and left to the Fixed/Moving subclasses
"""
raise NotImplementedError
[docs] @abstractmethod
def is_moving(self) -> bool:
"""Return the ``True`` if the platform is moving, ``False`` otherwise.
"""
raise NotImplementedError
[docs] @abstractmethod
def move(self, timestamp: datetime.datetime, **kwargs) -> None:
"""Update the platform position using the :attr:`transition_model`.
Parameters
----------
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the end of the maneuver \
(the default is ``None``)
Notes
-----
This methods updates the value of :attr:`position`.
Any provided ``kwargs`` are forwarded to the :attr:`transition_model`.
If :attr:`transition_model` or ``timestamp`` is ``None``, the method has
no effect, but will return successfully.
"""
raise NotImplementedError
@abstractmethod
def _set_position(self, value: StateVector) -> None:
raise NotImplementedError
[docs] def add_sensor(self, sensor: "BaseSensor", mounting_offset: Optional[StateVector] = None,
rotation_offset: Optional[StateVector] = None) -> None:
""" Add a sensor to the platform
Parameters
----------
sensor : :class:`~.BaseSensor`
The sensor object to add
mounting_offset : :class:`~.StateVector`, optional
A StateVector with the mounting offset of the new sensor. If not supplied, defaults to
a zero vector
rotation_offset : :class:`~.StateVector`, optional
A StateVector with the rotation offset of the new sensor. If not supplied, defaults to
a zero vector.
"""
self._property_sensors.append(sensor)
sensor.platform_system = weakref.ref(self)
if mounting_offset is None:
mounting_offset = StateVector([0] * self.ndim)
if rotation_offset is None:
rotation_offset = StateVector([0] * 3)
self._property_mounting_offsets.append(mounting_offset)
self._property_rotation_offsets.append(rotation_offset)
[docs] def remove_sensor(self, sensor: "BaseSensor") -> None:
""" Remove a sensor from the platform
Parameters
----------
sensor : :class:`~.BaseSensor`
The sensor object to remove
"""
self.pop_sensor(self._property_sensors.index(sensor))
[docs] def pop_sensor(self, index: int):
""" Remove a sensor from the platform by index
Parameters
----------
index : int
The index of the sensor to remove
"""
self._property_sensors.pop(index)
self._property_mounting_offsets.pop(index)
self._property_rotation_offsets.pop(index)
[docs] def get_sensor_position(self, sensor: "BaseSensor") -> StateVector:
"""Return the position of the given sensor, which should be already attached to the
platform. If the sensor is not attached to the platform, raises a :class:`ValueError`.
Parameters
----------
sensor : :class:`~.BaseSensor`
The sensor for which to return the position.
Returns
-------
: :class:`StateVector`
The position of the sensor, taking into account the platform position and orientation
and the mounting offset of the sensor.
"""
i = self.sensors.index(sensor)
if self.is_moving:
offset = self._get_rotated_offset(i)
else:
offset = self.mounting_offsets[i]
new_sensor_pos = self.position + offset
return new_sensor_pos
[docs] def get_sensor_orientation(self, sensor: "BaseSensor") -> StateVector:
"""Return the orientation of the given sensor, which should be already attached to the
platform. If the sensor is not attached to the platform, raises a :class:`ValueError`.
Parameters
----------
sensor : :class:`~.BaseSensor`
The sensor for which to return the orientation.
Returns
-------
: :class:`StateVector`
The orientation of the sensor, taking into account the platform orientation
and the rotation offset of the sensor.
"""
# TODO handle roll?
i = self.sensors.index(sensor)
offset = self.rotation_offsets[i]
return self.orientation + offset
def _get_rotated_offset(self, i: int) -> np.ndarray:
""" Determine the sensor mounting offset for the platforms relative
orientation.
Parameters
----------
i : :class:`int`
Integer reference to the sensor index
Returns
-------
: :class:`np.ndarray`
Sensor mounting offset rotated relative to platform motion
"""
vel = self.velocity
rot = _get_rotation_matrix(vel)
return rot @ self.mounting_offsets[i]
[docs]class FixedPlatform(Platform):
"""Fixed platform base class
A platform represents a random object defined as a :class:`~.State`
with fixed (but settable) position and orientation.
.. note:: Position and orientation are a read/write properties in this class.
"""
orientation: StateVector = Property(
default=None,
doc='A fixed orientation of the static platform. Defaults to the zero vector')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.orientation is None:
self.orientation = StateVector([0, 0, 0])
def _set_position(self, value: StateVector) -> None:
self.state_vector[self.position_mapping, :] = value
@property
def velocity(self) -> StateVector:
"""Return the velocity of the platform.
For a fixed platform this is always a zero vector of length :attr:`ndim`.
"""
return StateVector([0] * self.ndim)
@property
def is_moving(self) -> bool:
return False
[docs] def move(self, timestamp: datetime.datetime, **kwargs) -> None:
"""For a fixed platform this method has no effect other than to update the timestamp."""
# TODO Is this a sensible implementation?
# Return without moving static platforms
self.state.timestamp = timestamp
[docs]class MovingPlatform(Platform):
"""Moving platform base class
A platform represents a random object defined as a :class:`~.State`
that moves according to a given :class:`~.TransitionModel`.
.. note:: Position and orientation are a read only properties in this class.
"""
transition_model: TransitionModel = Property(doc="Transition model")
@property
def velocity(self) -> StateVector:
"""Return the velocity of the platform.
Extracted from the state vector of the platform using the platform's
:attr:`velocity_mapping`. If the state vector is too short and does not contain the
elements specified in the :attr:`velocity_mapping` this raises an :class:`AttributeError`
"""
try:
return self.state_vector[self.velocity_mapping, :]
except IndexError:
raise AttributeError('Velocity is not defined for this platform')
@property
def orientation(self) -> StateVector:
"""Return the orientation of the platform.
This is defined as a 3x1 StateVector of angles (rad), specifying the sensor orientation in
terms of the counter-clockwise rotation around each Cartesian axis in the order
:math:`x,y,z`. The rotation angles are positive if the rotation is in the counter-clockwise
direction when viewed by an observer looking along the respective rotation axis,
towards the origin.
The orientation of this platform is defined as along the direction of its velocity, with
roll always set to zero (as this is the angle the platform is rotated about the velocity
axis, which is not defined in this approximation).
Notes
-----
A non-moving platform (``self.is_moving == False``) does not have a defined orientation in
this approximations and so raises an :class:`AttributeError`
"""
if not self.is_moving:
raise NotImplementedError('Orientation of a zero-velocity moving platform is not'
'defined')
velocity = self.velocity
if self.ndim == 3:
_, bearing, elevation = cart2sphere(*velocity.flat)
return StateVector([0, elevation, bearing])
elif self.ndim == 2:
_, bearing = cart2pol(*velocity.flat)
return StateVector([0, 0, bearing])
else:
raise NotImplementedError('Orientation of a moving platform is only implemented for 2'
'and 3 dimensions')
@property
def is_moving(self) -> bool:
"""Return the ``True`` if the platform is moving, ``False`` otherwise.
Equivalent (for this class) to ``all(v == 0 for v in self.velocity)``
"""
# Note: a platform without a transition model can be given a velocity as part of it's
# StateVector. It just won't move
# This inconsistency is handled in the move logic
return np.any(self.velocity != 0)
def _set_position(self, value: StateVector):
# The logic below is this: if a moving platform is being built from (say) input
# real-world data then its transition model would not be set, and so it would be fine to
# set its position. However, if the transition model is set, then setting the position is
# both unexpected and may cause odd effects, so is forbidden
if self.transition_model is None:
self.state_vector[self.position_mapping, :] = value
else:
raise AttributeError('Cannot set the position of a moving platform with a '
'transition model')
[docs] def move(self, timestamp=None, **kwargs) -> None:
"""Propagate the platform position using the :attr:`transition_model`.
Parameters
----------
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying when the end of the maneuver \
(the default is ``None``)
Notes
-----
This methods updates the value of :attr:`position`.
Any provided ``kwargs`` are forwarded to the :attr:`transition_model`.
If :attr:`transition_model` or ``timestamp`` is ``None``, the method has
no effect, but will return successfully.
"""
if self.state.timestamp is None:
self.state.timestamp = timestamp
return
# Compute time_interval
try:
time_interval = timestamp - self.state.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
return
if self.transition_model is None:
raise AttributeError('Platform without a transition model cannot be moved')
self.states.append(State(
state_vector=self.transition_model.function(
state=self.state,
noise=True,
timestamp=timestamp,
time_interval=time_interval,
**kwargs),
timestamp=timestamp))
[docs]class MultiTransitionMovingPlatform(MovingPlatform):
"""Moving platform with multiple transition models
A list of transition models are given with corresponding transition times, dictating the
movement behaviour of the platform for given durations.
"""
transition_models: Sequence[TransitionModel] = Property(doc="List of transition models")
transition_times: Sequence[datetime.timedelta] = Property(doc="Durations for each listed "
"transition model")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if len(self.transition_models) != len(self.transition_times):
raise AttributeError('transition_models and transition_times must be same length')
self.transition_index = 0
self.current_interval = self.transition_times[0]
@property
def transition_model(self):
return self.transition_models[self.transition_index]
[docs] def move(self, timestamp=None, **kwargs) -> None:
"""Propagate the platform position using the :attr:`transition_model`.
Parameters
----------
timestamp: :class:`datetime.datetime`, optional
A timestamp signifying the end of the maneuver (the default is ``None``)
Notes
-----
This methods updates the value of :attr:`position`.
Any provided ``kwargs`` are forwarded to the :attr:`transition_model`.
If :attr:`transition_model` or ``timestamp`` is ``None``, the method has
no effect, but will return successfully.
This method updates :attr:`transition_model`, :attr:`transition_index` and
:attr:`current_interval`:
If the timestamp provided gives a time delta greater than :attr:`current_interval` the
:attr:`transition_model` is called for the rest of its corresponding duration, and the move
method is called again on the next transition model (by incrementing
:attr:`transition_index`) in :attr:`transition_models` with the residue time delta.
If the time delta is less than :attr:`current_interval` the :attr:`transition_model` is
called for that duration and :attr:`current_interval` is reduced accordingly.
"""
if self.state.timestamp is None:
self.state.timestamp = timestamp
return
try:
time_interval = timestamp - self.state.timestamp
except TypeError:
# TypeError: (timestamp or prior.timestamp) is None
return
temp_state = self.state
while time_interval != 0:
if time_interval >= self.current_interval:
temp_state = State(
state_vector=self.transition_model.function(
state=temp_state,
noise=True,
time_interval=self.current_interval,
**kwargs),
timestamp=timestamp
)
time_interval -= self.current_interval
self.transition_index = (self.transition_index + 1) % len(self.transition_models)
self.current_interval = self.transition_times[self.transition_index]
else:
temp_state = State(
state_vector=self.transition_model.function(
state=temp_state,
noise=True,
time_interval=time_interval,
**kwargs),
timestamp=timestamp
)
self.current_interval -= time_interval
time_interval = 0
self.states.append(temp_state)
def _get_rotation_matrix(vel: StateVector) -> np.ndarray:
""" Generates a rotation matrix which can be used to determine the
corrected sensor offsets.
In the 2d case this returns the following rotation matrix
[cos[theta] -sin[theta]]
[cos[theta] sin[theta]]
In the 2d case this will be a 3x3 matrix which rotates around the Z axis
followed by a rotation about the new Y axis.
Parameters
----------
vel : StateVector
Dx1 vector denoting platform velocity in D dimensions
Returns
-------
np.array
DxD rotation matrix
"""
if len(vel) == 3:
return _rot3d(vel)
elif len(vel) == 2:
theta = _get_angle(vel, np.array([[1, 0]]))
if vel[1] < 0:
theta *= -1
return np.array([[cos(theta), -sin(theta)],
[sin(theta), cos(theta)]])
else:
raise NotImplementedError
def _get_angle(vec: StateVector, axis: np.ndarray) -> float:
""" Returns the angle between a pair of vectors. Used to determine the
angle of rotation required between relative rectangular cartesian
coordinate frame of reference and platform inertial frame of reference.
Parameters
----------
vec : StateVector
1xD array denoting platform velocity
axis : np.ndarray
Dx1 array denoting sensor offset relative to platform
Returns
-------
Angle : float
Angle, in radians, between the two vectors
"""
vel_norm = vec / np.linalg.norm(vec)
axis_norm = axis / np.linalg.norm(axis)
return np.arccos(np.clip(np.dot(axis_norm, vel_norm), -1.0, 1.0))
def _rot3d(vec: np.ndarray) -> np.ndarray:
"""
This approach determines the platforms attitude based upon its velocity
component. It does not take into account potential platform roll, nor
are the components calculated to account for physical artifacts such as
platform trim (e.g. aircraft yaw whilst flying forwards).
The process determines the yaw (x-y) and pitch (z to x-y plane) angles.
The rotation matrix for a rotation by yaw around the Z-axis is then
calculated, the rotated Y axis is then determined and used to calculate the
rotation matrix which takes into account the platform pitch
Parameters
----------
vec: StateVector
platform velocity
Returns
-------
np.ndarray
3x3 rotation matrix
"""
return _rot3d_tuple(tuple(vec.flat))
@lru_cache(maxsize=128)
def _rot3d_tuple(vec: tuple) -> np.ndarray:
""" Private method. Should not be called directly, only from `_rot3d`
Params and returns as :func:`~_rot3d`
This wrapped method takes a tuple rather than a state vector. This allows caching, which
is important as the new sensor approach means `_rot3d` is called on each call to get_position,
and becomes a significant performance hit.
"""
# TODO handle platform roll
yaw = np.arctan2(vec[1], vec[0])
pitch = np.arctan2(vec[2],
np.sqrt(vec[0] ** 2 + vec[1] ** 2)) * -1
rot_z = rotz(yaw)
# Modify to correct for new y axis
y_axis = np.array([0, 1, 0])
rot_y = expm(np.cross(np.eye(3), np.dot(rot_z, y_axis) * pitch))
return np.dot(rot_y, rot_z)