Source code for stonesoup.simulator.simple

# -*- coding: utf-8 -*-
import datetime
from typing import Sequence

import numpy as np

from ..base import Property
from ..models.measurement import MeasurementModel
from ..models.transition import TransitionModel
from ..reader import GroundTruthReader
from ..types.detection import TrueDetection, Clutter
from ..types.groundtruth import GroundTruthPath, GroundTruthState
from ..types.numeric import Probability
from ..types.state import GaussianState, State
from .base import DetectionSimulator, GroundTruthSimulator
from stonesoup.buffered_generator import BufferedGenerator


class SingleTargetGroundTruthSimulator(GroundTruthSimulator):
    """Ground truth simulator that generates one path for one target.

    The path starts at :attr:`initial_state` and is extended by applying
    :attr:`transition_model` (with noise) once per :attr:`timestep` until
    :attr:`number_steps` states have been produced.
    """
    transition_model: TransitionModel = Property(
        doc="Transition Model used as propagator for track.")
    initial_state: State = Property(doc="Initial state to use to generate ground truth")
    timestep: datetime.timedelta = Property(
        default=datetime.timedelta(seconds=1),
        doc="Time step between each state. Default one second.")
    number_steps: int = Property(default=100, doc="Number of time steps to run for")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Value stored under the "index" metadata key of every generated state.
        self.index = 0

    @BufferedGenerator.generator_method
    def groundtruth_paths_gen(self):
        """Yield ``(time, {path})`` once per simulated step.

        The first timestamp is ``initial_state.timestamp`` or, when that is
        falsy, the current wall-clock time. The same path object is yielded
        every time, one state longer than on the previous step.
        """
        current_time = self.initial_state.timestamp or datetime.datetime.now()

        first_state = GroundTruthState(
            self.initial_state.state_vector,
            timestamp=current_time,
            metadata={"index": self.index})
        path = GroundTruthPath([first_state])
        yield current_time, {path}

        remaining_steps = self.number_steps - 1
        for _ in range(remaining_steps):
            current_time += self.timestep

            # Fetch the model first: metadata below must read self.index
            # only after transition_model has been accessed.
            model = self.transition_model
            next_vector = model.function(
                path[-1], noise=True, time_interval=self.timestep)
            next_state = GroundTruthState(
                next_vector,
                timestamp=current_time,
                metadata={"index": self.index})
            path.append(next_state)
            yield current_time, {path}
class SwitchOneTargetGroundTruthSimulator(SingleTargetGroundTruthSimulator):
    """Single-target simulator whose transition model changes over time.

    Every access to :attr:`transition_model` draws a new model index using the
    row of the Markov matrix :attr:`model_probs` that belongs to the index
    currently stored in ``self.index``.
    """
    transition_models: Sequence[TransitionModel] = Property(
        doc="List of transition models to be used, ensure that they all have the same dimensions.")
    model_probs: np.ndarray = Property(
        doc="A matrix of probabilities. "
            "The element in the ith row and the jth column is the probability of "
            "switching from the ith transition model in :attr:`transition_models` "
            "to the jth")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Index of the transition model currently in use.
        self.index = 0

    @property
    def transition_model(self):
        """Draw the next model index, store it in ``self.index``, return that model."""
        candidate_indices = range(len(self.transition_models))
        switch_probabilities = self.model_probs[self.index]
        self.index = np.random.choice(candidate_indices, p=switch_probabilities)
        return self.transition_models[self.index]
class MultiTargetGroundTruthSimulator(SingleTargetGroundTruthSimulator):
    """Target simulator that produces multiple targets.

    Targets are created and destroyed randomly, as defined by the birth rate
    and death probability.
    """
    transition_model: TransitionModel = Property(
        doc="Transition Model used as propagator for track.")

    initial_state: GaussianState = Property(doc="Initial state to use to generate states")
    birth_rate: float = Property(
        default=1.0,
        doc="Rate at which tracks are born. Expected number of occurrences (λ) in "
            "Poisson distribution. Default 1.0.")
    death_probability: Probability = Property(
        default=0.1,
        doc="Probability of track dying in each time step. Default 0.1.")

    @BufferedGenerator.generator_method
    def groundtruth_paths_gen(self):
        """Yield ``(time, paths)`` for each of :attr:`number_steps` steps.

        On every step, in this order: each live path is dropped with
        probability :attr:`death_probability`; surviving paths are moved
        forward with :attr:`transition_model`; a Poisson-distributed number
        of new paths (rate :attr:`birth_rate`) is started, each at a state
        drawn from the Gaussian described by :attr:`initial_state`.

        The same set object is yielded on every step and is mutated between
        steps.
        """
        groundtruth_paths = set()
        time = self.initial_state.timestamp or datetime.datetime.now()

        for _ in range(self.number_steps):
            # Random drop tracks (iterate a copy: the set is modified).
            groundtruth_paths.difference_update(
                gttrack
                for gttrack in groundtruth_paths.copy()
                if np.random.rand() <= self.death_probability)

            # Move tracks forward
            for gttrack in groundtruth_paths:
                # Restore this track's stored index before the model is
                # fetched; it is written back to metadata afterwards.
                self.index = gttrack[-1].metadata.get("index")
                trans_state_vector = self.transition_model.function(
                    gttrack[-1], noise=True, time_interval=self.timestep)
                gttrack.append(GroundTruthState(
                    trans_state_vector, timestamp=time,
                    metadata={"index": self.index}))

            # Random create
            for _ in range(np.random.poisson(self.birth_rate)):
                self.index = 0
                # Draw the offset from N(0, covar). Multiplying the covariance
                # matrix itself by standard normal samples would give samples
                # with covariance ``covar @ covar.T`` instead of ``covar``.
                offset = np.random.multivariate_normal(
                    np.zeros(self.initial_state.ndim),
                    self.initial_state.covar).reshape(-1, 1)
                gttrack = GroundTruthPath()
                gttrack.append(GroundTruthState(
                    self.initial_state.state_vector + offset,
                    timestamp=time,
                    metadata={"index": self.index}))
                groundtruth_paths.add(gttrack)

            yield time, groundtruth_paths
            time += self.timestep
class SwitchMultiTargetGroundTruthSimulator(MultiTargetGroundTruthSimulator):
    """Multi-target simulator with switching transition models.

    Behaves like :class:`~.MultiTargetGroundTruthSimulator`, except that
    :attr:`transition_model` is chosen at random on every access, in the same
    way as in :class:`.SwitchOneTargetGroundTruthSimulator`.
    """
    transition_models: Sequence[TransitionModel] = Property(
        doc="List of transition models to be used, ensure that they all have the same dimensions.")
    model_probs: np.ndarray = Property(
        doc="A matrix of probabilities. "
            "The element in the ith row and the jth column is the probability of "
            "switching from the ith transition model in :attr:`transition_models` "
            "to the jth")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Index of the transition model currently in use.
        self.index = 0

    @property
    def transition_model(self):
        """Draw the next model index, store it in ``self.index``, return that model."""
        candidate_indices = range(len(self.transition_models))
        switch_probabilities = self.model_probs[self.index]
        self.index = np.random.choice(candidate_indices, p=switch_probabilities)
        return self.transition_models[self.index]
class SimpleDetectionSimulator(DetectionSimulator):
    """A simple detection simulator.

    For every ground truth time step it produces a detection of each track
    with probability :attr:`detection_probability`, plus a Poisson-distributed
    number of clutter detections (rate :attr:`clutter_rate`) drawn uniformly
    within :attr:`meas_range`.

    Parameters
    ----------
    groundtruth : GroundTruthReader
        Source of ground truth tracks used to generate detections for.
    measurement_model : MeasurementModel
        Measurement model used in generating detections.
    """
    groundtruth: GroundTruthReader = Property()
    measurement_model: MeasurementModel = Property()
    # Used below as a 2-D array with one row per measurement dimension;
    # first column is the lower bound, last column the upper bound.
    meas_range: np.ndarray = Property()
    detection_probability: Probability = Property(default=0.9)
    clutter_rate: float = Property(default=2.0)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.real_detections = set()
        self.clutter_detections = set()
        # Copied from the "index" metadata of the track being processed.
        self.index = 0

    @property
    def clutter_spatial_density(self):
        """returns the clutter spatial density of the measurement space - num
        clutter detections per unit volume per timestep"""
        return self.clutter_rate/np.prod(np.diff(self.meas_range))

    def __in_state_space(self, detection):
        """Check whether a detection lies within :attr:`meas_range`.

        Every row of :attr:`meas_range` is checked against the matching
        element of ``detection.state_vector``; bounds are inclusive.
        """
        # Iterate over the rows of meas_range. ``meas_range.ndim`` is the
        # array rank, not the number of measurement dimensions.
        for dim, bounds in enumerate(self.meas_range):
            if not bounds[0] <= detection.state_vector[dim] <= bounds[-1]:
                return False
        return True

    @BufferedGenerator.generator_method
    def detections_gen(self):
        """Yield ``(time, detections)`` for every step of :attr:`groundtruth`.

        ``detections`` is a new set combining the true detections and the
        clutter generated for that step.
        """
        for time, tracks in self.groundtruth:
            self.real_detections.clear()
            self.clutter_detections.clear()

            for track in tracks:
                # Set before detection_probability is read, so that
                # SwitchDetectionSimulator can select a probability from it.
                self.index = track[-1].metadata.get("index")
                if np.random.rand() < self.detection_probability:
                    detection = TrueDetection(
                        self.measurement_model.function(track[-1], noise=True),
                        timestamp=track[-1].timestamp,
                        groundtruth_path=track)
                    detection.clutter = False
                    self.real_detections.add(detection)

            # generate clutter
            for _ in range(np.random.poisson(self.clutter_rate)):
                # Scale uniform [0, 1) samples to the width of each
                # dimension and shift them by its lower bound.
                detection = Clutter(
                    np.random.rand(self.measurement_model.ndim_meas, 1) *
                    np.diff(self.meas_range) + self.meas_range[:, :1],
                    timestamp=time)
                if self.__in_state_space(detection):
                    self.clutter_detections.add(detection)

            yield time, self.real_detections | self.clutter_detections
class SwitchDetectionSimulator(SimpleDetectionSimulator):
    """Detection simulator with one detection probability per transition model.

    Works like :class:`SimpleDetectionSimulator`, but for ground truth paths
    formed using multiple transition models the detection probability is
    looked up per model. This allows, for example, a higher detection
    probability while the simulated object makes a turn.
    """

    detection_probabilities: Sequence[Probability] = Property(
        doc="List of probabilities that correspond to the detection probability of the simulated "
            "object while undergoing each transition model")

    @property
    def detection_probability(self):
        """Return the entry of :attr:`detection_probabilities` at ``self.index``."""
        active_model_index = self.index
        return self.detection_probabilities[active_model_index]
class DummyGroundTruthSimulator(GroundTruthSimulator):
    """Ground truth simulator that never produces any ground truth.

    It lets simulations be built where platform motions, rather than ground
    truth objects, are simulated: an empty set is returned for each entry of
    :attr:`times`.
    """
    times: Sequence[datetime.datetime] = Property(doc='list of times to return')

    @BufferedGenerator.generator_method
    def groundtruth_paths_gen(self):
        """Yield ``(time, set())`` for every entry of :attr:`times`, in order."""
        yield from ((timestamp, set()) for timestamp in self.times)