from typing import Optional
import datetime
from collections.abc import Sequence, Collection
import numpy as np
from ordered_set import OrderedSet
from ..base import Property
from ..models.measurement import MeasurementModel
from ..models.transition import TransitionModel
from ..reader import GroundTruthReader
from ..types.detection import TrueDetection, Clutter
from ..types.groundtruth import GroundTruthPath, GroundTruthState
from ..types.numeric import Probability
from ..types.state import GaussianState, State
from ..types.array import StateVector
from .base import DetectionSimulator, GroundTruthSimulator
from stonesoup.buffered_generator import BufferedGenerator
[docs]
class SingleTargetGroundTruthSimulator(GroundTruthSimulator):
"""Target simulator that produces a single target"""
transition_model: TransitionModel = Property(
doc="Transition Model used as propagator for track.")
initial_state: State = Property(doc="Initial state to use to generate ground truth")
timestep: datetime.timedelta = Property(
default=datetime.timedelta(seconds=1),
doc="Time step between each state. Default one second.")
number_steps: int = Property(default=100, doc="Number of time steps to run for")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.index = 0
[docs]
@BufferedGenerator.generator_method
def groundtruth_paths_gen(self):
time = self.initial_state.timestamp or datetime.datetime.now()
gttrack = GroundTruthPath([
GroundTruthState(self.initial_state.state_vector, timestamp=time,
metadata={"index": self.index})])
yield time, {gttrack}
for _ in range(self.number_steps - 1):
time += self.timestep
# Move track forward
trans_state_vector = self.transition_model.function(
gttrack[-1], noise=True, time_interval=self.timestep)
gttrack.append(GroundTruthState(
trans_state_vector, timestamp=time,
metadata={"index": self.index}))
yield time, {gttrack}
[docs]
class SwitchOneTargetGroundTruthSimulator(SingleTargetGroundTruthSimulator):
"""Target simulator that produces a single target. This target switches
between multiple transition models based on a markov matrix
(:attr:`model_probs`)"""
transition_models: Sequence[TransitionModel] = Property(
doc="List of transition models to be used, ensure that they all have the same dimensions.")
model_probs: np.ndarray = Property(doc="A matrix of probabilities.\
The element in the ith row and the jth column is the probability of\
switching from the ith transition model in :attr:`transition_models`\
to the jth")
seed: Optional[int] = Property(default=None, doc="Seed for random number generation."
" Default None")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.seed is not None:
self.random_state = np.random.RandomState(self.seed)
else:
self.random_state = np.random.mtrand._rand
@property
def transition_model(self):
self.index = self.random_state.choice(range(0, len(self.transition_models)),
p=self.model_probs[self.index])
return self.transition_models[self.index]
[docs]
class MultiTargetGroundTruthSimulator(SingleTargetGroundTruthSimulator):
"""Target simulator that produces multiple targets.
Targets are created and destroyed randomly, as defined by the birth rate
and death probability."""
transition_model: TransitionModel = Property(
doc="Transition Model used as propagator for track.")
initial_state: GaussianState = Property(doc="Initial state to use to generate states")
birth_rate: float = Property(
default=1.0, doc="Rate at which tracks are born. Expected number of occurrences (λ) in "
"Poisson distribution. Default 1.0.")
death_probability: Probability = Property(
default=0.1, doc="Probability of track dying in each time step. Default 0.1.")
seed: Optional[int] = Property(default=None, doc="Seed for random number generation."
" Default None")
preexisting_states: Collection[StateVector] = Property(
default=list(), doc="State vectors at time 0 for "
"groundtruths which should exist at the start of simulation.")
initial_number_targets: int = Property(
default=0, doc="Initial number of targets to be "
"simulated. These simulated targets will be made in addition to those "
"defined by :attr:`preexisting_states`.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.seed is not None:
self.random_state = np.random.RandomState(self.seed)
else:
self.random_state = np.random.mtrand._rand
def _new_target(self, time, random_state, state_vector=None):
if state_vector is not None:
vector = state_vector
else:
vector = self.initial_state.state_vector + \
self.initial_state.covar @ \
random_state.randn(self.initial_state.ndim, 1)
gttrack = GroundTruthPath()
gttrack.append(GroundTruthState(
state_vector=vector,
timestamp=time,
metadata={"index": self.index})
)
return gttrack
[docs]
@BufferedGenerator.generator_method
def groundtruth_paths_gen(self, random_state=None):
time = self.initial_state.timestamp or datetime.datetime.now()
random_state = random_state if random_state is not None else self.random_state
number_steps_remaining = self.number_steps
if self.preexisting_states or self.initial_number_targets:
# Use preexisting_states to make some groundtruth paths
preexisting_paths = OrderedSet(
self._new_target(time, random_state, state) for state in self.preexisting_states)
# Simulate more groundtruth paths for the number of initial_simulated_states
initial_simulated_paths = OrderedSet(
self._new_target(time, random_state) for _ in range(self.initial_number_targets))
# Union the two sets
groundtruth_paths = preexisting_paths | initial_simulated_paths
number_steps_remaining -= 1
yield time, groundtruth_paths
time += self.timestep
else:
groundtruth_paths = OrderedSet()
for _ in range(number_steps_remaining):
# Random drop tracks
groundtruth_paths.difference_update(
gttrack
for gttrack in groundtruth_paths.copy()
if random_state.rand() <= self.death_probability)
# Move tracks forward
for gttrack in groundtruth_paths:
self.index = gttrack[-1].metadata.get("index")
trans_state_vector = self.transition_model.function(
gttrack[-1], noise=True, time_interval=self.timestep)
gttrack.append(GroundTruthState(
trans_state_vector, timestamp=time,
metadata={"index": self.index}))
# Random create
for _ in range(random_state.poisson(self.birth_rate)):
self.index = 0
gttrack = self._new_target(time, random_state)
groundtruth_paths.add(gttrack)
yield time, groundtruth_paths
time += self.timestep
[docs]
class SwitchMultiTargetGroundTruthSimulator(MultiTargetGroundTruthSimulator):
"""Functions identically to :class:`~.MultiTargetGroundTruthSimulator`,
but has the transition model switching ability from
:class:`.SwitchOneTargetGroundTruthSimulator`"""
transition_models: Sequence[TransitionModel] = Property(
doc="List of transition models to be used, ensure that they all have the same dimensions.")
model_probs: np.ndarray = Property(doc="A matrix of probabilities.\
The element in the ith row and the jth column is the probability of\
switching from the ith transition model in :attr:`transition_models`\
to the jth")
seed: Optional[int] = Property(default=None, doc="Seed for random number generation."
" Default None")
@property
def transition_model(self, random_state=None):
random_state = random_state if random_state is not None else self.random_state
self.index = random_state.choice(range(0, len(self.transition_models)),
p=self.model_probs[self.index])
return self.transition_models[self.index]
[docs]
class SimpleDetectionSimulator(DetectionSimulator):
"""A simple detection simulator.
Parameters
----------
groundtruth : GroundTruthReader
Source of ground truth tracks used to generate detections for.
measurement_model : MeasurementModel
Measurement model used in generating detections.
"""
groundtruth: GroundTruthReader = Property()
measurement_model: MeasurementModel = Property()
meas_range: np.ndarray = Property()
detection_probability: Probability = Property(default=0.9)
clutter_rate: float = Property(default=2.0)
seed: Optional[int] = Property(default=None, doc="Seed for random number generation."
" Default None")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.real_detections = set()
self.clutter_detections = set()
self.index = 0
if self.seed is not None:
self.random_state = np.random.RandomState(self.seed)
else:
self.random_state = np.random.mtrand._rand
@property
def clutter_spatial_density(self):
"""returns the clutter spatial density of the measurement space - num
clutter detections per unit volume per timestep"""
return self.clutter_rate/np.prod(np.diff(self.meas_range))
def __in_state_space(self, detection):
"""
Checks if a measurement is in the state space
"""
for dim in range(self.meas_range.ndim):
if not self.meas_range[dim][0] <= detection.state_vector[dim] \
<= self.meas_range[dim][-1]:
return False
return True
[docs]
@BufferedGenerator.generator_method
def detections_gen(self, random_state=None):
for time, tracks in self.groundtruth:
self.real_detections.clear()
self.clutter_detections.clear()
random_state = random_state if random_state is not None else self.random_state
for track in tracks:
self.index = track[-1].metadata.get("index")
if random_state.rand() < self.detection_probability:
detection = TrueDetection(
self.measurement_model.function(track[-1], noise=True),
timestamp=track[-1].timestamp,
groundtruth_path=track,
measurement_model=self.measurement_model)
detection.clutter = False
self.real_detections.add(detection)
# generate clutter
for _ in range(random_state.poisson(self.clutter_rate)):
detection = Clutter(
random_state.rand(self.measurement_model.ndim_meas, 1) *
np.diff(self.meas_range) + self.meas_range[:, :1],
timestamp=time,
measurement_model=self.measurement_model)
if self.__in_state_space(detection):
self.clutter_detections.add(detection)
yield time, self.real_detections | self.clutter_detections
[docs]
class SwitchDetectionSimulator(SimpleDetectionSimulator):
"""Functions identically as the :class:`SimpleDetectionSimulator`, but for
ground truth paths formed using multiple transition models it allows the
user to assign a detection probability to each transition models.
For example, if you wanted a higher detection probability when the
simulated object makes a turn"""
detection_probabilities: Sequence[Probability] = Property(
doc="List of probabilities that correspond to the detection probability of the simulated "
"object while undergoing each transition model")
@property
def detection_probability(self):
return self.detection_probabilities[self.index]
[docs]
class DummyGroundTruthSimulator(GroundTruthSimulator):
"""A Dummy Ground Truth Simulator which allows simulations to be built
where platform, rather than ground truth objects, motions are simulated.
It returns an empty set at each time step.
"""
times: Sequence[datetime.datetime] = Property(doc='list of times to return')
[docs]
@BufferedGenerator.generator_method
def groundtruth_paths_gen(self):
for time in self.times:
yield time, set()