Source code for stonesoup.models.measurement.gas

from typing import Sequence, Union

from math import sqrt
import numpy as np
from scipy.special import erf

from ...base import Property
from ...types.numeric import Probability

from ...types.array import StateVector, CovarianceMatrix, StateVectors
from ..base import GaussianModel
from .base import MeasurementModel
from ...types.state import State


[docs] class IsotropicPlume(GaussianModel, MeasurementModel): r"""This is a class implementing the isotropic plume model for approximating the resulting plume from gas release. Mathematical formulation of the algorithm can be seen in [1]_ and [2]_. The model assumes isotropic diffusivity and mean wind velocity, source strength and turbulent conditions. The model calculates the concentration level at a given location based on the provided source term. The model employs a sensing threshold for deciding if gas has been detected or if the reading is just sensor noise and turbulent conditions are accounted for using a missed detection probability. The source term if formed according to the following: .. math:: \mathbf{S} = \left[\begin{array}{c} x \\ y \\ z \\ Q \\ u \\ \phi \\ \zeta_1 \\ \zeta_2 \end{array}\right], where :math:`x, y` and :math:`z` are the source position in 3D Cartesian space, :math:`Q` is the emission rate/strength in g/s, :math:`u` is the wind speed in m/s, :math:`\phi` is the wind direction in radians, :math:`\zeta_1` is the diffusivity of the gas in the environment and :math:`\zeta_2` is the lifetime of the gas. The concentration is calculated according to .. math:: \begin{multline} \mathcal{M}(\vec{x}_k, \Theta_k) = \frac{Q}{4\pi\Vert\vec{x}_k-\vec{p}^s\Vert} \cdot\text{exp}\left[\frac{-\Vert\vec{x}_k-\vec{p}^s\Vert}{\lambda}\right]\cdot\\ \text{exp}\left[\frac{-(x_k-x)u\cos\phi}{2\zeta_1}\right] \cdot\text{exp}\left[\frac{-(y_k-y)u\sin\phi}{2\zeta_1}\right], \end{multline} where :math:`\vec{x}_k` is the position of the sensor in 3D Cartesian space (:math:`[x_k\quad y_k\quad z_k]^\intercal`), :math:`\vec{p}^s` is the source location (:math:`[x\quad y\quad z]^\intercal`) and .. math:: \lambda = \sqrt{\frac{\zeta_1\zeta_2}{1+\frac{(u^2\zeta_2)}{4\zeta_1}}}. References ---------- .. [1] Vergassola, Massima & Villermaux, Emmanuel & Shraiman, Boris I. "'Infotaxis' as a strategy for searching without gradients", Nature, vol. 445, 406-409, 2007 .. [2] Hutchinson, Michael & Liu, Cunjia & Chen, Wen-Hua, "Source term estimation of a hazardous airborne release using an unmanned aerial vehicle", Journal of Field Robotics, Vol. 36, 797-917, 2019 """ ndim_state: int = Property( default=8, doc="Number of state dimensions" ) mapping: Sequence[int] = Property( default=tuple(range(0, 8)), doc="Mapping between measurement and state dims" ) min_noise: float = Property( default=1e-4, doc="Minimum sensor noise" ) standard_deviation_percentage: float = Property( default=0.5, doc="Standard deviation as a percentage of the concentration level" ) translation_offset: StateVector = Property( default=None, doc="A 3x1 array specifying the Cartesian origin offset in terms of :math:`x,y,z` " "coordinates.") missed_detection_probability: Probability = Property( default=0.1, doc="The probability that the detection has detection has been affected by turbulence." ) sensing_threshold: float = Property( default=0.1, doc="Measurement threshold. Should be set high enough to minimise false detections." ) def __init__(self, *args, **kwargs): """ Ensure that the translation offset is initiated """ super().__init__(*args, **kwargs) # Set values to defaults if not provided if self.translation_offset is None: self.translation_offset = StateVector([0] * 3)
[docs] def covar(self, **kwargs) -> CovarianceMatrix: raise NotImplementedError('Covariance for IsotropicPlume is dependant on the ' 'measurement as well as standard deviation!')
@property def ndim_meas(self) -> int: return 1
[docs] def function(self, state: State, noise: Union[bool, np.ndarray] = False, **kwargs) -> Union[ StateVector, StateVectors]: r"""Model function :math:`h(\vec{x}_t,\vec{v}_t)` Parameters ---------- state: :class:`~.StateVector` An input source term state vector noise: :class:`numpy.ndarray` or bool An externally generated random process noise sample (the default is `False`, in which case no noise will be added if 'True', the output of :meth:`~.Model.rvs` is added). If `False`, then the model also does not consider the :attr:`sensing_threshold` and :attr:`missed_detection_probability` Returns ------- : :class:`numpy.ndarray` of shape (1, 1) The model function evaluated with the provided source term """ x, y, z, Q, u, phi, ci, cii = state.state_vector[self.mapping, :].view(np.ndarray) px, py, pz = self.translation_offset lambda_ = np.sqrt((ci * cii)/(1 + (u**2 * cii)/(4 * ci))) abs_dist = np.linalg.norm(state.state_vector[self.mapping[:3], :] - self.translation_offset, axis=0) # prevent divide by zero when converging on the source location abs_dist[abs_dist < 0.1] = 0.1 C = Q / (4 * np.pi * ci * abs_dist) * np.exp( (-(px - x) * u * np.cos(phi) / (2 * ci)) + (-(py - y) * u * np.sin(phi) / (2 * ci)) + (-1 * abs_dist / lambda_)) C = np.atleast_2d(C) if noise: C += self.rvs(state=C.view(StateVectors), num_samples=state.state_vector.shape[1], **kwargs) # measurement thresholding C[C < self.sensing_threshold] = 0 # missed detections flag = np.random.uniform(size=state.state_vector.shape[1]) \ > (1 - self.missed_detection_probability) C[:, flag] = 0 return C.view(StateVectors)
[docs] def logpdf(self, state1: State, state2: State, **kwargs) -> Union[float, np.ndarray]: r"""Model log pdf/likelihood evaluation function Evaluates the log pdf/likelihood of ``state1``, given the state ``state2`` which is passed to :meth:`function()`. This function implements the likelihood functions from :meth:`~.pdf` that have been converted to the log space. Parameters ---------- state1 : :class:`~.State` state2 : :class:`~.State` Returns ------- : float or :class:`~.numpy.ndarray` The log likelihood of ``state1``, given ``state2`` """ p_m = self.missed_detection_probability nd_sigma = self.sensing_threshold pred_meas = self.function(state2, **kwargs) if state1.state_vector[0] <= self.sensing_threshold: pdf = p_m + ((1-p_m) * 1/2 * (1+erf((self.sensing_threshold - pred_meas) / (nd_sigma * sqrt(2))))) likelihood = np.atleast_1d(np.log(pdf)).view(np.ndarray) else: d_sigma = self.standard_deviation_percentage * pred_meas + self.min_noise with np.errstate(divide="ignore"): likelihood = np.atleast_1d(np.log(1/(d_sigma*np.sqrt(2*np.pi)) * np.exp((-(state1.state_vector-pred_meas) ** 2)/(2*d_sigma**2)))).view(np.ndarray) if len(likelihood) == 1: likelihood = likelihood[0] return likelihood
[docs] def pdf(self, state1: State, state2: State, **kwargs) -> Union[Probability, np.ndarray]: r"""Model pdf/likelihood evaluation function Evaluates the pdf/likelihood of ``state1``, given the state ``state2`` which is passed to :meth:`function()`. This function implements the following likelihood function, adapted from (12) in [2]_, removing the background sensor noise term. .. math:: p(z_k|\Theta_k) = \frac{1}{\sigma_d\sqrt{2\pi}}\text{exp} \left(-\frac{(z_k-\hat{z}_k)^2}{2\sigma_d}\right), where :math:`z_k` = ``state1``, :math:`\Theta_k` = ``state2``, :math:`\hat{z}_k` = :meth:`~.Model.function` on ``state2`` and :math:`\sigma_d` is the measurement standard deviation assuming the measurement arose from a true gas detection. This is given by .. math:: \sigma_d = \sigma_{\text{percentage}} \cdot \hat{z} + \nu_{\text{min}}, where :math:`\sigma_{\text{percentage}}` = :attr:`standard_deviation_percentage` and :math:`\nu_{\text{min}}` = :attr:`noise`. In the event that a measurement is below the sensor threshold or missed, a different likelihood function is used. This is given by .. math:: p(z_k|\Theta_k) = (P_m) + \left((1-P_m)\cdot\frac{1}{2}\left[1+\text{erf} \left(\frac{z_{\text{thr}}-\hat{z}_k}{\sigma_m\sqrt{2}}\right)\right]\right), where :math:`P_m` = :attr:`missed_detection_probability`, :math:`\sigma_m` is the missed detection standard deviation which is implemented as equal to :attr:`sensing_threshold` and :math:`\text{erf}()` is the error function. Parameters ---------- state1 : :class:`~.State` state2 : :class:`~.State` Returns ------- : :class:`~.Probability` The likelihood of ``state1``, given ``state2`` """ return super().pdf(state1, state2, **kwargs)
[docs] def rvs(self, state: Union[StateVector, StateVectors], num_samples: int = 1, random_state=None, **kwargs) -> Union[StateVector, StateVectors]: r"""Model noise/sample generation function Generates noise samples from the model. For this noise, the magnitude of sensor noise depends on the measurement. Thus, the noise term is given by .. math:: \nu_k = \mathcal{N}\left(0,(\sigma_{\text{percentage}}\cdot z_k)^2\right). Parameters ---------- state: :class:`~.StateVector` or :class:`~.StateVectors` The measured state (concentration for this model) used to scale the noise term. num_samples: scalar, optional The number of samples to be generated (the default is 1). Returns ------- noise : 2-D array of shape (:attr:`ndim`, ``num_samples``) A set of Np samples, generated from the model's noise distribution. """ random_state = random_state if random_state is not None else self.random_state generator = np.random.RandomState(random_state) noise = generator.normal(np.zeros(self.ndim_meas), np.ravel(state*self.standard_deviation_percentage), num_samples) noise = np.atleast_2d(noise) if num_samples == 1: return noise.view(StateVector) else: return noise.view(StateVectors)