import copy
import datetime
import warnings
from collections.abc import Iterable, Callable
from itertools import pairwise
from typing import Union
import numpy as np
from ..types.array import StateVectors
from ..types.state import StateMutableSequence, State
[docs]
def time_range(start_time: datetime.datetime, end_time: datetime.datetime,
timestep: datetime.timedelta = datetime.timedelta(seconds=1)) \
-> Iterable[datetime.datetime]:
"""
Produces a range of datetime object between ``start_time`` (inclusive) and ``end_time``
(inclusive)
Parameters
----------
start_time: datetime.datetime time range start (inclusive)
end_time: datetime.datetime time range end (inclusive)
timestep: datetime.timedelta default value is 1 second
Returns
-------
Generator[datetime.datetime]
"""
duration = end_time - start_time
n_time_steps = duration / timestep
for x in range(int(n_time_steps) + 1):
yield start_time + x * timestep
[docs]
def interpolate_state_mutable_sequence(sms: StateMutableSequence,
times: Union[datetime.datetime, list[datetime.datetime]],
) -> Union[StateMutableSequence, State]:
"""
This function performs linear interpolation on a :class:`~.StateMutableSequence`. The function
has two slightly different forms:
If an individual :class:`~datetime.datetime` is inputted for the variable ``times`` then a
:class:`~.State` is returned corresponding to ``times``.
If a list of :class:`~datetime.datetime` is inputted for the variable ``times`` then a
:class:`~.StateMutableSequence` is returned with the states in the sequence corresponding to
``times``.
When interpolating the previous state is used to create the interpolated state. This means
properties from that previous state are also copied but will not be interpolated
e.g. covariance.
Parameters
----------
sms: StateMutableSequence
A :class:`~.StateMutableSequence` that should be interpolated
times: Union[datetime.datetime, list[datetime.datetime]]
a time, or a list of times for ``sms`` to be interpolated to.
Returns
-------
Union[StateMutableSequence, State]
If a single time is provided then a single state is returned. If a list of times is
provided then a :class:`~.StateMutableSequence` with the same type as ``sms`` is returned
Note
----
This function does **not** extrapolate. Times outside the range of the time range of ``sms``
are discarded and warning is given. If all ``times`` values are outside the time range of
``sms`` then an ``IndexError`` is raised.
Unique states for each time are required for interpolation. If there are multiple states with
the same time in ``sms`` the later state in the sequence is used.
For :class:`~.Track` inputs the *metadatas* is removed as it can't be interpolated.
"""
# If single time is used, insert time into list and run again.
# A StateMutableSequence is produced by the inner function call.
# The single state is taken from that StateMutableSequence
if isinstance(times, datetime.datetime):
new_sms = interpolate_state_mutable_sequence(sms, [times])
return new_sms.state
# Track metadata removed and no interpolation can be performed on the metadata
new_sms = copy.copy(sms)
if hasattr(new_sms, "metadatas"):
new_sms.metadatas = list()
# This step ensure unique states for each timestamp. The last state for a timestamp is used
# with earlier states not being used.
time_state_dict = {state.timestamp: state
for state in sms}
# Filter times if required
max_state_time = sms[-1].timestamp
min_state_time = sms[0].timestamp
if max(times) > max_state_time or min(times) < min_state_time:
new_times = [time
for time in times
if min_state_time <= time <= max_state_time]
if len(new_times) == 0:
raise IndexError(f"All times are outside of the state mutable sequence's time range "
f"({min_state_time} -> {max_state_time})")
removed_times = set(times).difference(new_times)
warnings.warn(f"Trying to interpolate states which are outside the time range "
f"({min_state_time} -> {max_state_time}) of the state mutable sequence. The "
f"following times aren't included in the output {removed_times}")
times = new_times
# Find times that require interpolation
times_to_interpolate = sorted(set(times).difference(time_state_dict.keys()))
if len(times_to_interpolate) > 0:
# Only interpolate if required
state_vectors = StateVectors([state.state_vector for state in time_state_dict.values()])
# Needed for states with angles present
state_vectors = state_vectors.astype(float)
state_timestamps = [time.timestamp() for time in time_state_dict.keys()]
interp_timestamps = [time.timestamp() for time in times_to_interpolate]
interp_output = np.empty((sms.state.ndim, len(times_to_interpolate)))
for element_index in range(sms.state.ndim):
interp_output[element_index, :] = np.interp(x=interp_timestamps,
xp=state_timestamps,
fp=state_vectors[element_index, :])
retrieve_previous_state_fun = _get_previous_state(sms)
for state_index, time in enumerate(times_to_interpolate):
original_state_before = retrieve_previous_state_fun(time)
time_state_dict[time] = original_state_before.from_state(
state=original_state_before,
timestamp=time,
state_vector=interp_output[:, state_index])
new_sms.states = [time_state_dict[time] for time in times]
return new_sms
def _get_previous_state(sms: StateMutableSequence) -> Callable[[datetime.datetime], State]:
"""This function produces a function which will return the state before a time in ``sms``.
Parameters
----------
sms: StateMutableSequence
A :class:`~.StateMutableSequence` to provide the states.
Returns
-------
Function
This function takes a :class:`datetime.datetime` and will return the State before that
time. If this inner function is called multiple times, the time must not decrease.
"""
state_iter = iter(pairwise(sms.states))
state_before, state_after = next(state_iter)
def inner_fun(t: datetime.datetime) -> State:
nonlocal state_before, state_after
while state_after.timestamp < t:
state_before, state_after = next(state_iter)
return state_before
return inner_fun