Source code for stonesoup.models.transition.graph

from collections import defaultdict
from typing import Optional, Union

import numpy as np
from scipy.linalg import block_diag
from scipy.stats import multivariate_normal

from stonesoup.base import Property
from stonesoup.functions.graph import normalise_re
from stonesoup.models.base import TimeVariantModel
from stonesoup.models.transition import TransitionModel
from stonesoup.models.transition.linear import LinearGaussianTransitionModel
from stonesoup.types.array import StateVector, StateVectors
from stonesoup.types.graph import RoadNetwork
from stonesoup.types.numeric import Probability
from stonesoup.types.state import State


[docs] class OptimalPathToDestinationTransitionModel(TransitionModel, TimeVariantModel): r""" Shortest path to destination transition model. A transition model that models a target travelling along the shortest path to a destination, on a given road network. The model is a generalised implementation of the transition model described in [#net]_. The state vector :math:`x_k` is assumed to take the form .. math:: x_k = \left[r_k, \cdots, e_k, d_k, s_k\right] where :math:`e_k` denotes the edge the target is currently on, :math:`r_k` is the distance travelled along the edge, :math:`d_k` is the destination node, and :math:`s_k` is the source node. The notation :math:`\cdots` denotes additional state variables thar are propagated using the selected `transition_model` along with `r_k` (e.g. velocity :math:`\dot{r}_k`). The transition model provides the ability to resample the destination node with a given `destination_resample_probability`. This is useful when the destination node is unknown and needs to be estimated (e.g. using a particle filter). The destination node is resampled from the set of possible destinations, given the current edge. To avoid searching the entire graph for possible destinations, the `possible_destinations` argument can be used to specify a list of possible destinations. References ---------- .. [#net] L. Vladimirov and S. Maskell, "A SMC Sampler for Joint Tracking and Destination Estimation from Noisy Data," 2020 IEEE 23rd International Conference on Information Fusion (FUSION), Rustenburg, South Africa, 2020, pp. 1-8, doi: 10.23919/FUSION45008.2020.9190463. """ transition_model: LinearGaussianTransitionModel = Property( doc=r"A base transition model that models the movement of the target along a given edge. " "This can be any model, with the restriction that the first state variable of the " "transition model must be the distance travelled along the edge (i.e. :math:`r`).") graph: RoadNetwork = Property( doc="The road network that the target is moving on.") destination_resample_probability: Probability = Property( default=0.1, doc="The probability of resampling the destination. This is useful when the destination " "node is unknown and needs to be estimated (e.g. using a particle filter). If None, " "or 0, then the destination is not resampled.") possible_destinations: list = Property( default=None, doc="The possible destinations that the target can travel to. Restricting the possible " "destinations can greatly speed up the destination sampling process, since the " "shortest path algorithm does not need to search the entire graph. If None, then all " "nodes in the graph are considered possible destinations.") seed: Optional[int] = Property( default=None, doc="Seed for random number generation.") @property def ndim_state(self): return self.transition_model.ndim + 3 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.seed is not None: self.random_state = np.random.RandomState(self.seed) else: self.random_state = np.random.mtrand._rand
[docs] def function(self, state, noise=False, **kwargs): num_particles = state.state_vector.shape[1] if isinstance(noise, bool) or noise is None: if noise: noise = self.rvs(num_samples=num_particles, **kwargs) else: noise = 0 # 1) CV (Position-Velocity) Propagation t_matrix = block_diag(*[self.transition_model.matrix(**kwargs), np.eye(3)]) new_state_vectors = t_matrix @ state.state_vector + noise # 2) SMC destination sampling # Get all valid destinations given the current edge if self.destination_resample_probability: edges = new_state_vectors[-3, :].astype(int) unique_edges = np.unique(edges) sources = list(np.unique(new_state_vectors[-1, :].astype(int))) s_paths = self.graph.shortest_path(sources, self.possible_destinations, path_type='edge') v_dest = defaultdict(set) for edge in unique_edges: # Filter paths that contain the edge filtered_paths = filter(lambda x: np.any(x[1] == edge), s_paths.items()) v_dest[edge] |= {dest for (_, dest), _ in filtered_paths} # Perform destination sampling resample_inds = np.flatnonzero( self.random_state.binomial(1, float(self.destination_resample_probability), num_particles) ) for i in resample_inds: dests = v_dest[edges[i]] # If no valid destinations exist for the current edge, keep the current # destination if dests: new_state_vectors[-2, i] = self.random_state.choice(list(dests)) # 3) Process edge change # The CV propagation may lead to r's which are either less that zero or more than the # length of the edge. This means that the range and edge identifier needs to be adjusted # to correctly place the particle. # Get shortcuts for faster accessing r = new_state_vectors[0, :] e = new_state_vectors[-3, :].astype(int) d = new_state_vectors[-2, :].astype(int) s = new_state_vectors[-1, :].astype(int) for i in range(num_particles): try: path = self.graph.shortest_path(s[i], d[i], path_type='edge')[(s[i], d[i])] r_i, e_i = normalise_re(r[i], e[i], path, self.graph) new_state_vectors[0, i] = r_i new_state_vectors[-3, i] = e_i except KeyError: continue return new_state_vectors
[docs] def rvs(self, num_samples: int = 1, random_state=None, **kwargs) ->\ Union[StateVector, StateVectors]: covar = self._covar(**kwargs) # If model has None-type covariance or contains None, it does not represent a Gaussian if covar is None or None in covar: raise ValueError("Cannot generate rvs from None-type covariance") random_state = random_state if random_state is not None else self.random_state noise = multivariate_normal.rvs( np.zeros(self.ndim), covar, num_samples, random_state=random_state) noise = np.atleast_2d(noise) noise = noise.T # numpy.rvs method squeezes 1-dimensional matrices to integers if num_samples == 1: return noise.view(StateVector) else: return noise.view(StateVectors)
[docs] def logpdf(self, state1: State, state2: State, **kwargs) -> Union[Probability, np.ndarray]: covar = self._covar(**kwargs) # If model has None-type covariance or contains None, it does not represent a Gaussian if covar is None or None in covar: raise ValueError("Cannot generate pdf from None-type covariance") # Calculate difference before to handle custom types (mean defaults to zero) # This is required as log pdf coverts arrays to floats likelihood = np.atleast_1d( multivariate_normal.logpdf((state1.state_vector - self.function(state2, **kwargs)).T, cov=covar, allow_singular=True)) if len(likelihood) == 1: likelihood = likelihood[0] return likelihood
[docs] def pdf(self, state1: State, state2: State, **kwargs) -> Union[Probability, np.ndarray]: return Probability.from_log_ufunc(self.logpdf(state1, state2, **kwargs))
def _covar(self, **kwargs) -> np.ndarray: """Model pseudo-covariance matrix calculation function (private) """ covar_list = [self.transition_model.covar(**kwargs), np.zeros((3, 3))] return block_diag(*covar_list)