{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Classification Using Hidden Markov Model\nThis notebook demonstrates use of the forward algorithm, in the context of a hidden Markov\nmodel, to classify multiple targets.\n\nWe will attempt to classify 3 targets in an undefined region.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"All Stone Soup imports will be given in order of usage.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\n\nimport numpy as np\n\nfrom stonesoup.models.transition.categorical import MarkovianTransitionModel\nfrom stonesoup.types.groundtruth import CategoricalGroundTruthState\nfrom stonesoup.types.groundtruth import GroundTruthPath"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Ground Truth\nThe targets may take one of two discrete hidden classes: 'bike' and 'car'.\nA target may transition from one class to another (this could be considered as a\nperson switching from riding a bike to driving a car and vice versa).\nThis behaviour is modelled in the transition matrix of the\n:class:`~.MarkovianTransitionModel`. This transition matrix describes a Markov process, whereby\nit is assumed that the state of a target depends wholly on its previous state, and nothing\nelse.\n\nA :class:`~.CategoricalState` class is used to store information on the classification/category\nof the targets. The state vector defines a categorical distribution over the 2 possible\nclasses, whereby each component defines the probability that a target is of the corresponding\nclass. For example, the state vector (0.2, 0.8), with category names ('bike', 'car'),\nindicates that a target has a 20% probability of being class 'bike' and an 80% probability of\nbeing class 'car'.\nIt does not make sense for a true target to be a distribution over the possible classes, and\ntherefore the true categorical states will have binary state vectors indicating a specific class\n(i.e. a '1' at one state vector index, and '0's elsewhere). This can be read as stating that\nthere is a 100% probability that the target is of a particular class. We specify that noise\nshould be applied when calling our transition model, in order to sample the resultant\ndistribution and receive this binary vector.\nThe :class:`~.CategoricalGroundTruthState` class inherits directly from the base\n:class:`~.CategoricalState` class.\n\nThe category and timings for one of the ground truth paths will be printed.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_matrix = np.array([[0.8, 0.2],  # P(bike | bike), P(car | bike)\n                              [0.4, 0.6]])  # P(bike | car), P(car | car)\ncategory_transition = MarkovianTransitionModel(transition_matrix=transition_matrix)\n\nstart = datetime.now()\n\nhidden_classes = ['bike', 'car']\n\n# Generating ground truth\nground_truths = list()\nfor i in range(1, 4):  # 3 targets\n    state_vector = np.zeros(2)  # create a vector with 2 zeroes\n    state_vector[np.random.choice(2, 1, p=[1 / 2, 1 / 2])] = 1  # pick a random class out of the 2\n    ground_truth_state = CategoricalGroundTruthState(state_vector,\n                                                     timestamp=start,\n                                                     categories=hidden_classes)\n\n    ground_truth = GroundTruthPath([ground_truth_state], id=f\"GT{i}\")\n\n    for _ in range(10):\n        new_vector = category_transition.function(ground_truth[-1],\n                                                  noise=True,\n                                                  time_interval=timedelta(seconds=1))\n        new_state = CategoricalGroundTruthState(\n            new_vector,\n            timestamp=ground_truth[-1].timestamp + timedelta(seconds=1),\n            categories=hidden_classes\n        )\n\n        ground_truth.append(new_state)\n    ground_truths.append(ground_truth)\n\nfor states in np.vstack(ground_truths).T:\n    print(f\"{states[0].timestamp:%H:%M:%S}\", end=\"\")\n    for state in states:\n        print(f\" -- {state.category}\", end=\"\")\n    print()"
]
},
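The role of the transition matrix can be sanity-checked outside Stone Soup with a few lines of plain NumPy. This sketch assumes the row-stochastic convention used above (rows index the current class), applies the transpose to push a state vector one step forward, and then samples to obtain a binary ground-truth vector; all variable names are illustrative:

```python
import numpy as np

rng = np.random.default_rng(1)

# Rows index the current class, columns the next class, so each row sums to 1
F = np.array([[0.8, 0.2],   # P(bike | bike), P(car | bike)
              [0.4, 0.6]])  # P(bike | car), P(car | car)
assert np.allclose(F.sum(axis=1), 1.0)

state = np.array([1.0, 0.0])  # certainly a 'bike'
for _ in range(10):
    probs = F.T @ state          # distribution over the next class
    probs = probs / probs.sum()  # renormalise (a no-op for binary states)
    state = np.eye(2)[rng.choice(2, p=probs)]  # sample a binary 'truth' vector

print(state)
```

Sampling at every step mirrors passing `noise=True` to the transition model's `function`.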
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Measurement\nUsing a Hidden Markov model, it is assumed the true class of a target cannot be directly\nobserved (hence 'hidden'), and instead observations that are dependent on this class are taken.\nIn this instance, observations of the targets' sizes are taken ('small', 'medium' or 'large').\nThe relationship between true class and observed size is modelled by the `emission matrix` of the\n:class:`~.MarkovianMeasurementModel`, which is used by the :class:`~.HMMSensor` to\nprovide :class:`~.CategoricalDetection` types.\nWe will model this such that a 'bike' has a very small chance of being observed as a 'large'\ntarget.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.measurement.categorical import MarkovianMeasurementModel\nfrom stonesoup.sensor.categorical import HMMSensor\n\nE = np.array([[0.8, 0.1],    # P(small | bike), P(small | car)\n              [0.19, 0.3],   # P(medium | bike), P(medium | car)\n              [0.01, 0.6]])  # P(large | bike), P(large | car)\n\nmodel = MarkovianMeasurementModel(emission_matrix=E,\n                                  measurement_categories=['small', 'medium', 'large'])\n\neo = HMMSensor(measurement_model=model)\n\n# Generating measurements\nmeasurements = list()\nfor index, states in enumerate(np.vstack(ground_truths).T):\n    if index == 5:\n        measurements_at_time = set()  # Give tracker chance to use prediction instead\n    else:\n        measurements_at_time = eo.measure(states)\n    timestamp = next(iter(states)).timestamp\n    measurements.append((timestamp, measurements_at_time))\n\n    print(f\"{timestamp:%H:%M:%S} -- {[meas.category for meas in measurements_at_time]}\")"
]
},
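The emission step itself is easy to emulate: multiplying the emission matrix by a binary class vector picks out that class's column, giving a distribution over sizes from which a measurement category can be sampled. A hedged plain-NumPy sketch, not the `HMMSensor` API:

```python
import numpy as np

rng = np.random.default_rng(7)

E = np.array([[0.8, 0.1],    # P(small | bike),  P(small | car)
              [0.19, 0.3],   # P(medium | bike), P(medium | car)
              [0.01, 0.6]])  # P(large | bike),  P(large | car)
assert np.allclose(E.sum(axis=0), 1.0)  # each column is a distribution

true_class = np.array([0.0, 1.0])   # the target is a 'car'
measurement_probs = E @ true_class  # distribution over sizes: [0.1, 0.3, 0.6]
size = ['small', 'medium', 'large'][rng.choice(3, p=measurement_probs)]
print(size)
```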
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tracking Components\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Predictor\nA :class:`~.HMMPredictor` specifically uses :class:`~.MarkovianTransitionModel` types to\npredict.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.categorical import HMMPredictor\n\n# It would be cheating to use the same transition model as in ground truth generation!\ntransition_matrix = np.array([[0.81, 0.19],  # P(bike | bike), P(car | bike)\n                              [0.39, 0.61]])  # P(bike | car), P(car | car)\ncategory_transition = MarkovianTransitionModel(transition_matrix=transition_matrix)\n\npredictor = HMMPredictor(category_transition)"
]
},
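Conceptually the categorical prediction is a single matrix product: the prior distribution pushed through the transposed transition matrix and renormalised. A minimal hand calculation under the same row-stochastic convention (not the `HMMPredictor` API):

```python
import numpy as np

F = np.array([[0.81, 0.19],   # row-stochastic transition matrix, as above
              [0.39, 0.61]])

prior = np.array([0.5, 0.5])  # equal probability of 'bike' and 'car'
prediction = F.T @ prior      # push the prior one step forward
prediction = prediction / prediction.sum()
print(prediction)  # approximately [0.6, 0.4]
```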
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Updater\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.categorical import HMMUpdater\n\nupdater = HMMUpdater()"
]
},
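The corresponding update is one step of the forward algorithm: weight the predicted distribution element-wise by the emission probabilities of the observed category, then renormalise. A hedged NumPy sketch (not the `HMMUpdater` API; the numbers follow the matrices above):

```python
import numpy as np

E = np.array([[0.8, 0.1],    # P(size | class); columns sum to 1
              [0.19, 0.3],
              [0.01, 0.6]])

predicted = np.array([0.6, 0.4])  # predicted P(bike), P(car)
observed = 2                      # index of the observed category, 'large'

posterior = E[observed] * predicted      # likelihood weighting
posterior = posterior / posterior.sum()  # renormalise
print(posterior.round(3))  # -> [0.024 0.976]
```

Observing 'large' shifts nearly all the probability mass onto 'car', as the emission matrix intends.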
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Hypothesiser\nA :class:`~.HMMHypothesiser` is used for calculating categorical hypotheses.\nIt utilises the :class:`~.ObservationAccuracy` measure: a multi-dimensional extension of an\n'accuracy' score, essentially providing a measure of the similarity between two categorical\ndistributions.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.categorical import HMMHypothesiser\n\nhypothesiser = HMMHypothesiser(predictor=predictor, updater=updater)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Data Associator\nWe will use a standard :class:`~.GNNWith2DAssignment` data associator.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.dataassociator.neighbour import GNNWith2DAssignment\n\ndata_associator = GNNWith2DAssignment(hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Prior\nAs we are tracking in a categorical state space, we should initiate with a categorical state for\nthe prior. Equal probability is given to both of the possible hidden classes that a target\nmight take (the category names are also provided here).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.types.state import CategoricalState\n\nprior = CategoricalState([1 / 2, 1 / 2], categories=hidden_classes)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initiator\nFor each unassociated detection, a new track will be initiated. In this instance we use a\n:class:`~.SimpleCategoricalMeasurementInitiator`, which specifically handles categorical state\npriors.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.initiator.categorical import SimpleCategoricalMeasurementInitiator\n\ninitiator = SimpleCategoricalMeasurementInitiator(prior_state=prior, updater=updater)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deleter\nWe can use a standard :class:`~.UpdateTimeStepsDeleter`.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.deleter.time import UpdateTimeStepsDeleter\n\ndeleter = UpdateTimeStepsDeleter(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tracker\nWe can use a standard :class:`~.MultiTargetTracker`.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.tracker.simple import MultiTargetTracker\n\ntracker = MultiTargetTracker(initiator, deleter, measurements, data_associator, updater)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tracking\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tracks = set()\nfor time, ctracks in tracker:\n    tracks.update(ctracks)\n\nprint(f\"Number of tracks: {len(tracks)}\")\nfor track in tracks:\n    certainty = track.state_vector[np.argmax(track.state_vector)][0] * 100\n    print(f\"id: {track.id} -- category: {track.category} -- certainty: {certainty}%\")\n    for state in track:\n        _time = state.timestamp.strftime('%H:%M')\n        _type = str(type(state)).replace(\"class 'stonesoup.types.\", \"\").strip(\"<>'. \")\n        state_string = f\"{_time} -- {_type} -- {state.category}\"\n        try:\n            meas_string = f\"associated measurement: {state.hypothesis.measurement.category}\"\n        except AttributeError:\n            pass\n        else:\n            state_string += f\" -- {meas_string}\"\n        print(state_string)\n    print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Metric\nDetermining tracking accuracy.\nIn calculating how many targets were classified correctly, only tracks with the highest\nclassification certainty are considered. In the situation where probabilities are equal, a\nrandom classification is chosen.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"excess_tracks = len(tracks) - len(ground_truths)  # target value = 0\nsorted_tracks = sorted(tracks,\n                       key=lambda track: track.state_vector[np.argmax(track.state_vector)][0],\n                       reverse=True)\nbest_tracks = sorted_tracks[:3]\ntrue_classifications = [ground_truth.category for ground_truth in ground_truths]\ntrack_classifications = [track.category for track in best_tracks]\n\nnum_correct_classifications = 0  # target value = num ground truths\nfor true_classification in true_classifications:\n    for i in range(len(track_classifications)):\n        if track_classifications[i] == true_classification:\n            num_correct_classifications += 1\n            del track_classifications[i]\n            break\n\nprint(f\"Excess tracks: {excess_tracks}\")\nprint(f\"No. correct classifications: {num_correct_classifications}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Plotting\nPlotting the probability that each one of our targets and tracks is a 'bike' will help to\nvisualise this 2-hidden-class problem.\n\nDotted lines indicate ground truth probabilities; solid lines indicate track probabilities.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n\n\ndef plot(path, style):\n    times = list()\n    probs = list()\n    for state in path:\n        times.append(state.timestamp)\n        probs.append(state.state_vector[0])\n    plt.plot(times, probs, linestyle=style)\n\n\nfor truth in ground_truths:\n    plot(truth, '--')\nfor track in tracks:\n    plot(track, '-')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Track Stitching Example\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\nTrack Stitching considers a set of broken fragments of track (which we call tracklets), and aims\nto identify which fragments should be stitched (joined) together to form one track. This is done\nby considering the state of a tracked object and predicting its state at a future (or past) time.\nThis example generates a set of ``tracklets`` and applies track stitching to them. The figure\nbelow visualises the aim of track stitching: taking a set of tracklets (left, black) and\nproducing a set of tracks (right, blue/red).\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(Figure: a set of tracklets, left, in black, stitched into a set of tracks, right, in blue/red.)\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitching Method\nIn a scenario where there are many sections of tracks that are all disconnected from each other,\nwe aim to stitch the track sections together into full tracks. We can use the known states of\ntracklets at known times to predict where the tracked object would be at a different time.\nWe can use this information to associate tracklets with each other using the following methods:\n\n### Predicting forward\nFor a given track section, consider the state at the end-point of the track $x$ at\nthe time $k$ that the observation was made. We use the state of the object to predict its\nstate at time $k + \\delta k$. If the state at the start point of\nanother track section falls within an acceptable range of this prediction, we associate the\ntracks and stitch them together. This method is used in the function :func:`forward_predict`.\n\n### Predicting backward\nSimilarly to predicting forward, we can consider the state at the start point of a track section\nat time $k$ and backwards-predict the state to time $k - \\delta k$. We can then\nassociate and stitch tracks together as before. This method is used in the function\n:func:`backward_predict`.\n\n### Using both predictions\nWe can use both methods simultaneously to calculate the probability that two track sections are\npart of the same track. The track stitcher in this example uses the :class:`~.KalmanPredictor`\nto make predictions about which tracklets should be stitched into the same track.\n\n"
]
},
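The forward-prediction idea above can be illustrated with a one-dimensional constant-velocity hand calculation: propagate the end state of one tracklet forward by $\delta k$ and check whether another tracklet's start state falls inside an acceptance gate. All values, and the gate itself, are illustrative:

```python
# End state of tracklet A: position, velocity and time
x_end, v_end, t_end = 100.0, 5.0, 40.0
# Start state of tracklet B
x_start, t_start = 152.0, 50.0

dt = t_start - t_end         # delta k = 10 s
x_pred = x_end + v_end * dt  # constant-velocity forward prediction -> 150.0
gate = 10.0                  # illustrative acceptance threshold
associate = abs(x_start - x_pred) < gate
print(associate)  # -> True
```

The Stone Soup stitcher replaces this scalar gate with a full Kalman prediction and a statistical distance hypothesis, but the association logic is the same.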
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import Modules\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime, timedelta\nimport numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scenario Generation\n\n### Set Variables for Scenario Generation\nThe code below contains parameters used to generate input truth paths.\n\n``number_of_targets`` is the total number of truth paths generated in the initial simulation.\n\nThe starting location of each truth path is defined in the region (-``range_value``,\n``range_value``) in all dimensions.\n\nEach truth object is split into a number of segments chosen randomly from the range\n(1, ``max_segments``).\n\nThe start time of each truth path is bounded between $t$ = 0 and $t$ =\n``max_track_start``.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"start_time = datetime.now().replace(second=0, microsecond=0)\nnp.random.seed(100)\n\nnumber_of_targets = 10\nrange_value = 10000\nmax_segments = 10\nmax_segment_length = 125\nmin_segment_length = 60\nmax_disjoint_length = 250\nmin_disjoint_length = 125\nmax_track_start = 125\nn_spacial_dimensions = 3\nmeasurement_noise = 100\n\n# Set transition model:\n# ConstantVelocity = CV\n# KnownTurnRate = KTR\n\nTM = \"CV\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transition and Measurement Models\nThe code below sets transition and measurement models. It also checks that sets of track data are\nempty before the scenario is generated.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n    ConstantVelocity, KnownTurnRate\nfrom stonesoup.models.measurement.linear import LinearGaussian\n\n# Initialise empty sets to hold the track data\ntruths = set()\ntruthlets = set()\ntracklets = set()\nall_tracks = set()\n\n# Set transition model\nif TM == \"CV\":\n    transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(1)] *\n                                                             n_spacial_dimensions, seed=12)\nelif TM == \"KTR\":\n    transition_model = KnownTurnRate(turn_rate=np.radians(0.5), turn_noise_diff_coeffs=(0.1, 0.1))\n    if n_spacial_dimensions != 2:\n        print(\"KnownTurnRate model only works for 2 dimensions. Changing from {} \"\n              \"dimensions to 2D.\".format(n_spacial_dimensions))\n        n_spacial_dimensions = 2\nelse:\n    raise TypeError(\"Must assign 'CV' or 'KTR' to TM\")\n\n# Variable calculations for measurement model\nmeasurement_cov_array = np.zeros((n_spacial_dimensions, n_spacial_dimensions), int)\nnp.fill_diagonal(measurement_cov_array, measurement_noise)\n\n# Set measurement model\nmeasurement_model = LinearGaussian(ndim_state=2 * n_spacial_dimensions,\n                                   mapping=list(range(0, 2 * n_spacial_dimensions, 2)),\n                                   noise_covar=measurement_cov_array)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate ground truths and truthlets\nHere we generate a set of ground truths. We then break the truths into alternating sections of\ntruthlets (sections of 'known' state data) and disjoint sections (sections of no data). Note that\nno 'truth' data is used in track stitching - in this tutorial it is only used for generating\ntracklets and for evaluation of track stitching results.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import OrnsteinUhlenbeck\nfrom stonesoup.predictor.kalman import KalmanPredictor\nfrom stonesoup.updater.kalman import KalmanUpdater\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nfrom stonesoup.dataassociator.neighbour import GNNWith2DAssignment\nfrom stonesoup.deleter.error import CovarianceBasedDeleter\nfrom stonesoup.deleter.multi import CompositeDeleter\nfrom stonesoup.deleter.time import UpdateTimeStepsDeleter\nfrom stonesoup.initiator.simple import SimpleMeasurementInitiator\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.types.state import GaussianState\n\n# Parameters for tracker\npredictor = KalmanPredictor(transition_model)\nupdater = KalmanUpdater(measurement_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=30)\ndata_associator = GNNWith2DAssignment(hypothesiser)\ndeleter = CompositeDeleter([UpdateTimeStepsDeleter(50), CovarianceBasedDeleter(5000)])\ninitiator = SimpleMeasurementInitiator(\n    prior_state=GaussianState(np.zeros((2 * n_spacial_dimensions, 1), int),\n                              np.diag([1, 0] * n_spacial_dimensions)),\n    measurement_model=measurement_model)\n\n# Calculate start and end points for truthlets given the starting conditions\nfor i in range(number_of_targets):\n    # Set a random starting state (position and velocity) for each target\n    state_vector = [np.random.uniform(-range_value, range_value, 1),\n                    np.random.uniform(-2, 2, 1)] * n_spacial_dimensions\n\n    # Sets number of segments from range of random numbers\n    number_of_segments = int(np.random.choice(range(1, max_segments), 1))\n\n    # Set length of first truthlet segment\n    truthlet0_length = np.random.choice(range(max_track_start), 1)\n\n    # Set lengths of each of the truthlet segments\n    truthlet_lengths = np.random.choice(range(min_segment_length, max_segment_length),\n                                        number_of_segments)\n\n    # Set lengths of each disjoint section\n    disjoint_lengths = np.random.choice(range(min_disjoint_length, max_disjoint_length),\n                                        number_of_segments)\n\n    # Sum pairs of truthlets and disjoints, and set the start-point of the truth path\n    segment_pair_lengths = np.insert(truthlet_lengths + disjoint_lengths, 0, truthlet0_length,\n                                     axis=0)\n\n    # Cumulative sum of segments, giving the start point of each truth segment\n    truthlet_startpoints = np.cumsum(segment_pair_lengths)\n\n    # Sum truth segment lengths to start points, giving the end point for each segment\n    truthlet_endpoints = truthlet_startpoints + np.append(truthlet_lengths, 0)\n\n    # Set start and end points for each segment\n    starts = truthlet_startpoints[:number_of_segments]\n    stops = truthlet_endpoints[:number_of_segments]\n    truth = GroundTruthPath([GroundTruthState(state_vector, timestamp=start_time)],\n                            id=i)\n    for k in range(1, np.max(stops)):\n        truth.append(GroundTruthState(\n            transition_model.function(truth[k - 1], noise=True, time_interval=timedelta(seconds=1)),\n            timestamp=truth[k - 1].timestamp + timedelta(seconds=1)))\n    for j in range(number_of_segments):\n        truthlet = GroundTruthPath(truth[starts[j]:stops[j]], id=str(\"G::\" + str(truth.id) +\n                                                                     \"::S::\" + str(j) + \"::\"))\n        truthlets.add(truthlet)\n    truths.add(truth)\n\nprint(number_of_targets, \" targets required.\")\nprint(len(truths), \" truths have been generated.\")\nprint(len(truthlets), \" truthlets have been generated.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Generate a tracklet from each truthlet\nWe introduce measurement noise (as set in variables section) and generate a set of tracklets from\nthe set of truthlets.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.tracker.simple import MultiTargetTracker\nfrom stonesoup.types.detection import TrueDetection\n\n# Generate tracklets from truthlets calculated above\nfor n, truthlet in enumerate(truthlets):\n    measurementlet = []\n    for state in truthlet:\n        m = measurement_model.function(state, noise=True)\n        m0 = TrueDetection(m,\n                           timestamp=state.timestamp,\n                           measurement_model=measurement_model,\n                           groundtruth_path=truthlet)\n        measurementlet.append((state.timestamp, {m0}))\n    tracklet = MultiTargetTracker(initiator, deleter, measurementlet, data_associator, updater)\n    for _, t in tracklet:\n        all_tracks |= t\n\nprint(len(all_tracks), \" tracklets have been produced.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Plot the set of tracklets\nThe following plots present the generated tracks and the underlying ground truths.\nA 2D graph is plotted for each 2D plane in the N-D space.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter, Dimension\n\n# Plot graph for each 2D face in n-dimensional space\ndimensions_list = list(range(0, 2 * n_spacial_dimensions, 2))\ndim_pairs = [(a, b) for idx, a in enumerate(dimensions_list) for b in dimensions_list[idx + 1:]]\nfor pair in dim_pairs:\n plotter = Plotter()\n plotter.plot_ground_truths(truths, list(pair))\n plotter.plot_tracks(all_tracks, list(pair))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Plot 3D graph if working in 3-dimensional space\nif n_spacial_dimensions == 3:\n plotter = Plotter(Dimension.THREE)\n plotter.plot_ground_truths(truths, [0, 2, 4])\n plotter.plot_tracks(all_tracks, [0, 2, 4])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Track Stitcher Class\nThe cell below imports the track stitcher class. Its functions :func:`forward_predict` and\n:func:`backward_predict` perform the forward and backward predictions respectively (as noted\nabove); they calculate which pairs of tracks could possibly be stitched together. If using\nforwards and backwards stitching, predictions from both methods are merged together. The function\n:func:`stitch` uses :func:`forward_predict` and :func:`backward_predict` to pair and 'stitch'\ntrack sections together.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.stitcher import TrackStitcher"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Applying the Track Stitcher\nNow that we have a set of tracklets, we can apply the Track Stitching method to stitch tracklets\ntogether into tracks. The code in the following cell applies this process using the\n:class:`~.TrackStitcher` and plots the stitched tracks. :class:`~.TrackStitcher` has a property\n'search_window' that reduces computation time by filtering out track segments that are outside a\ndefined time window. When forward stitching, the associator will consider any track that has a\nstart point that falls within the time window $(t, t + search\\_window)$. When backward\nstitching, the associator will consider tracks that have an endpoint within the time window\n$(t - search\\_window, t)$.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model = CombinedLinearGaussianTransitionModel([OrnsteinUhlenbeck(0.001, 2e-2)] *\n n_spacial_dimensions, seed=12)\n\npredictor = KalmanPredictor(transition_model)\nhypothesiser = DistanceHypothesiser(predictor, updater, Mahalanobis(), missed_distance=300)\nstitcher = TrackStitcher(forward_hypothesiser=hypothesiser, search_window=timedelta(seconds=500))\n\nstitched_tracks, _ = stitcher.stitch(all_tracks, start_time)\n\nfor pair in dim_pairs:\n plotter = Plotter()\n plotter.plot_ground_truths(truths, list(pair))\n plotter.plot_tracks(stitched_tracks, list(pair))\n\nif n_spacial_dimensions == 3:\n plotter = Plotter(Dimension.THREE)\n plotter.plot_ground_truths(truths, [0, 2, 4])\n plotter.plot_tracks(stitched_tracks, [0, 2, 4])"
]
},
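The `search_window` filtering can be pictured as a plain time-window test applied before hypothesising. A sketch of the forward-stitching case, with hypothetical timestamps:

```python
from datetime import datetime, timedelta

search_window = timedelta(seconds=500)
t_end = datetime(2024, 1, 1, 12, 0, 0)  # end time of the current tracklet

# Hypothetical start times of three candidate tracklets
candidate_starts = [t_end + timedelta(seconds=s) for s in (100, 400, 800)]

# Forward stitching: only consider tracklets starting within (t, t + search_window)
candidates = [t for t in candidate_starts if t_end < t <= t_end + search_window]
print(len(candidates))  # -> 2
```

Only the tracklets starting 100 s and 400 s after the end point survive the filter, so the hypothesiser has fewer pairs to score.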
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Applying Metrics\nNow that tracklets are stitched into tracks, we can compare the tracks to the ground truths. This\ncan be done by using SIAP metrics, as well as a custom metric specialised for track stitching.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### % of tracklets stitched to the correct previous tracklet\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def stitcher_correctness(stitchedtracks):\n    stitchedtracks = list(stitchedtracks)\n    total, count = 0, 0\n\n    for track in stitchedtracks:  # loop through all stitched tracks\n        for j, state in enumerate(track):  # for every state of a stitched track\n            if j == len(track) - 1:\n                continue\n            id1 = [int(s) for s in state.hypothesis.measurement.groundtruth_path.id.split('::')\n                   if s.isdigit()]\n            id2 = [int(s) for s in\n                   track[j + 1].hypothesis.measurement.groundtruth_path.id.split('::') if\n                   s.isdigit()]\n            if id1 != id2:\n                total += 1\n                if id1[0] == id2[0] and id1[1] == (id2[1] - 1):\n                    count += 1\n    return count / total * 100\n\n\nprint(\"Tracklets stitched correctly: \", stitcher_correctness(stitched_tracks), \"%\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### SIAP Metrics\nThe following cell calculates and records a range of SIAP (Single Integrated Air Picture) metrics\nto assess the accuracy of the stitcher. The value of ``association_threshold`` should be\nadjusted to represent an acceptable association distance for the scenario that is being\nconsidered. For example, a threshold of 50 metres may be acceptable if tracking a\nlarge ship, but not so useful for tracking biological cell movement.\n\nSIAP Ambiguity: Important, as a value not equal to 1 suggests that the stitcher is not stitching\nwhole tracks together, or is stitching multiple tracks into one.\n\nSIAP Completeness: Not a valuable metric for track stitching evaluation, as we are only tracking\nfractions of the true objects - the metric value is scaled by the ratio of truthlets to\ndisjoint sections.\n\nSIAP Rate of Track Number Change: An important metric for assessing track stitching. Any value\nabove zero shows that tracklets are being incorrectly stitched to tracklets from different truth\npaths.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.measures import Euclidean\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.dataassociator.tracktotrack import TrackToTruth\nfrom stonesoup.metricgenerator.manager import MultiManager\nfrom stonesoup.metricgenerator.metrictables import SIAPTableGenerator\n\nsiap_generator = SIAPMetrics(position_measure=Euclidean((0, 2)),\n velocity_measure=Euclidean((1, 3)),\n generator_name='SIAPs',\n tracks_key='tracks',\n truths_key='truths'\n )\n\nassociator = TrackToTruth(association_threshold=30)\n\n# create metric manager and add tracks and truths data to it\nmetric_manager = MultiManager([siap_generator],\n associator=associator)\nmetric_manager.add_data({'truths': truths, 'tracks': set(all_tracks)})\n\n# generate metrics and extract SIAP averages to display in SIAP table\nmetrics = metric_manager.generate_metrics()\nsiap_metrics = metrics['SIAPs']\nsiap_averages = {siap_metrics.get(metric) for metric in siap_metrics\n if metric.startswith(\"SIAP\") and not metric.endswith(\" at times\")}\n\n_ = SIAPTableGenerator(siap_averages).compute_metric()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Multi-Sensor Fusion: Covariance Intersection Using Tracks as Measurements\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Background\nThe Covariance Intersection Algorithm from Julier and Uhlmann [#]_ is a popular algorithm for\ntrack-to-track fusion in target tracking systems. This approach is highly appealing due to its\nrobustness, simple structure, and applicability to any tracking system that uses Gaussians as the\nbasis for tracking. Generalisations to non-Gaussian systems have been proposed based on the\nexponential mixture density structure of the algorithm. The approach is based on a simple rule\ncalled the Chernoff Fusion Rule. However, due to the non-Bayesian formulation of the rule, it\ncannot be integrated straightforwardly into multi-target tracking algorithms which are based on\nBayesian formulations.\n\nA new Bayesian formulation for covariance intersection was recently proposed which allows for the\nintegration of the approach into multi-target tracking algorithms. [#]_ The new formulation\nrecasts the fusion rule as a Bayesian update rule that calculates a normalisation constant which\nenables integration into different multi-target tracking algorithms.\n\nIn this example we demonstrate the approach with different multi-target trackers in a\nmulti-platform scenario where the sensors output estimated target tracks instead of raw\nmeasurements. In real life situations, such sensors make multi-target tracking more accessible to\nnew researchers because the researchers don't have to know about or implement target filtering\nand/or tracking algorithms on their own. However, when there are multiple sensors measuring the\nsame target space and they all produce estimated tracks, as demonstrated in this example, it is\nnot immediately clear how to combine this information into a single set of tracks. 
This is where\ncovariance intersection comes in.\n\nThe concept of covariance intersection relies on the aforementioned Chernoff fusion rule [#]_ :\n\n\\begin{align}p_{\\omega}(x_{k}) = \\frac{p_{1}(x_{k})^{\\omega}p_{2}(x_{k})^{1-\\omega}}{\\int p_{1}(x)^{\\omega}p_{2}(x)^{1-\\omega}dx}\\end{align}\n\n\nIn situations where $p_1(x)$ and $p_2(x)$ are multivariate Gaussian distributions,\nthis formula is equal to the Covariance Intersection Algorithm from Julier and Uhlmann. In the\nCovariance Intersection Algorithm, the weighting parameter, $\\omega \\in [0, 1]$, is chosen\nusing an optimization algorithm. In this example, we have set it to $0.5$ for simplicity.\n\nWe also introduce the following identity. Given two Gaussians, $N(x ; a, A)$ and\n$N(x ; b, B)$ with the same dimension, we have:\n\n\\begin{align}\\mathcal{N}(x ; a, A)^{\\omega} \\, \\mathcal{N}(x ; b, B)^{1-\\omega} \\propto \\mathcal{N}(a ; b, V) \\mathcal{N}(x ; d, D)\\end{align}\n\n\nwhere\n\n\\begin{align}D &= \\left(\\omega A^{-1}+(1-\\omega) B^{-1}\\right)^{-1} \\\\\n d &= D\\left(\\omega A^{-1} {a}+(1-\\omega ) B^{-1} {b}\\right) \\\\\n V &= A/(1-\\omega )+ B/\\omega\\end{align}\n\n\nThis example considers the Gaussian mixture probability hypothesis density (GM-PHD) algorithm\nas the tracker for the track-to-track fusion. A table in the original example (not reproduced\nhere) compares the updated mean, updated covariance, innovation, Kalman gain, and innovation\ncovariance used in the regular GM-PHD and the GM-PHD Covariance Intersector algorithm.\n\nThe specifics for implementing the Covariance Intersection Algorithm in several popular\nmulti-target tracking algorithms were expanded upon recently by Clark et al [#]_. The work\nincludes a discussion of Stone Soup and is used as the basis for this example.\n\n"
]
},
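{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick numerical sanity check (an illustrative aside, not part of the original example),\nthe fused covariance $D$ and fused mean $d$ above can be computed directly with NumPy. The\nfunction name `fuse_covariance_intersection` and the toy values below are our own choices for\nthis sketch.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\ndef fuse_covariance_intersection(a, A, b, B, omega=0.5):\n    # D = (omega*A^-1 + (1 - omega)*B^-1)^-1\n    A_inv, B_inv = np.linalg.inv(A), np.linalg.inv(B)\n    D = np.linalg.inv(omega*A_inv + (1 - omega)*B_inv)\n    # d = D*(omega*A^-1*a + (1 - omega)*B^-1*b)\n    d = D @ (omega*A_inv @ a + (1 - omega)*B_inv @ b)\n    return d, D\n\n# Two toy 2D Gaussian estimates of the same target state\na, A = np.array([0., 0.]), np.diag([4., 4.])\nb, B = np.array([2., 0.]), np.diag([1., 1.])\nd, D = fuse_covariance_intersection(a, A, b, B)\nprint(d)  # fused mean lies between a and b, weighted towards the tighter estimate b\nprint(D)"
]
},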
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The rest of this example will continue as follows:\n * Create a simulator for the ground truth\n * Create 2 radar simulators, one on the ground and one that is airborne\n * Make a JPDA tracker for the first radar, and a Gaussian mixture linear complexity with\n cumulants (GM-LCC) tracker for the second. These will mimic the situation where the radar\n sensors output tracks instead of raw measurements.\n * Create a GM-PHD tracker that will perform measurement fusion, using all measurements from\n both radars. This is created to compare with the covariance intersection method.\n * Create a GM-PHD tracker that will perform track fusion via covariance intersection using\n the :class:`ChernoffUpdater` class.\n * Create a metric manager to generate metrics for each of the four trackers\n * Set up the detection feeders. The track fusion tracker will also use the\n :class:`Tracks2GaussianDetectionFeeder` class.\n * Run the simulation\n * Plot the resulting tracks and the metrics over time\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from copy import deepcopy\nimport numpy as np\nfrom datetime import datetime\n\nstart_time = datetime.now()\nnum_steps = 50"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1: Create a Ground Truth Simulator\nWe will simulate the paths of two targets using the :class:`~.MultiTargetGroundTruthSimulator`.\nWe can dictate the starting states of the two targets using the `preexisting_states` parameter.\nThe targets start at [-100, -200, 500] and [0, 300, 500] respectively. Their initial velocities\nare [4, 0.5, 0] and [5, -0.5, 0] and they move according to a constant velocity transition model\nwith noise.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel,\\\n ConstantVelocity\ntruth_transition_model = CombinedLinearGaussianTransitionModel(\n (ConstantVelocity(0.5), ConstantVelocity(0.5), ConstantVelocity(0.5)))\n\nfrom stonesoup.simulator.simple import MultiTargetGroundTruthSimulator\nfrom stonesoup.types.state import GaussianState\ngt_simulator = MultiTargetGroundTruthSimulator(\n transition_model=truth_transition_model,\n initial_state=GaussianState([0, 0, 0, 0, 500, 0], np.diag([100, 1, 100, 1, 100, 1]),\n timestamp=start_time),\n birth_rate=0,\n death_probability=0,\n number_steps=num_steps,\n preexisting_states=[[-100, 4, -200, 0.5, 500, 0], [0, 5, 300, -0.5, 500, 0]]\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2: Create Two Radars and a Detection Simulation\nThe two radars can share the same clutter model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.clutter.clutter import ClutterModel\nclutter_model = ClutterModel(\n clutter_rate=2.0,\n distribution=np.random.default_rng().uniform,\n dist_params=((-600.0, 600.0), (-600.0, 600.0), (250.0, 750.0))\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first radar will be airborne, at an altitude of approximately 3000 m. It makes detections\nwith an elevation, bearing, and range measurement model. By setting the `max_range` to 3500, we\ncan ensure that it does not make detections of the other radar (which will be far away on the\nground). We will later do a similar thing with the second radar. This mimics a real-life scenario\nwhere each radar is outside the field-of-view of the other.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.sensor.radar.radar import RadarElevationBearingRange\nfrom stonesoup.types.array import CovarianceMatrix\nfrom stonesoup.types.array import StateVector\nfrom stonesoup.platform.base import MovingPlatform\nfrom stonesoup.types.state import State\n\nradar1 = RadarElevationBearingRange(\n ndim_state=6,\n position_mapping=(0, 2, 4),\n noise_covar=CovarianceMatrix(np.diag([np.deg2rad(0.005), np.deg2rad(0.005), 0.05])),\n mounting_offset=StateVector([10, 0, 0]),\n clutter_model=clutter_model,\n max_range=3500\n)\n\n# Mount the radar onto a moving platform. The platform starts at [-250, 50, 3000]\n# with velocity [1, 5, 0] and moves according to a constant velocity model with low noise\nsensor1_initial_loc = StateVector([[-250], [1], [50], [5], [3000], [0]])\ninitial_state = State(sensor1_initial_loc, start_time)\nsensor1_transition_model = CombinedLinearGaussianTransitionModel(\n [ConstantVelocity(0.3), ConstantVelocity(0.3), ConstantVelocity(0.3)])\nsensor1_platform = MovingPlatform(\n states=initial_state,\n position_mapping=(0, 2, 4),\n velocity_mapping=(1, 3, 5),\n transition_model=sensor1_transition_model,\n sensors=[radar1]\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The second radar will be stationary on the ground at the point [2000, 50, 0]. This radar also\nmeasures in 3D using bearing, range, and elevation.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"radar2_noise_covar = CovarianceMatrix(np.diag([np.deg2rad(0.005), np.deg2rad(0.005), 0.05]))\nradar2 = RadarElevationBearingRange(\n ndim_state=6,\n position_mapping=(0, 2, 4),\n noise_covar=radar2_noise_covar,\n clutter_model=clutter_model,\n max_range=3000\n)\n\n# Make a platform and mount the radar\nfrom stonesoup.platform.base import FixedPlatform\nsensor2_platform = FixedPlatform(\n State([2000, 0, 50, 0, 0, 0]),\n position_mapping=[0, 2, 4],\n sensors=[radar2]\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can pass the platforms into a detection simulator. At each timestep, the simulator will\nreturn the detections from the `sensor1_platform`, then the detections from the\n`sensor2_platform`.\n\nAs we'll be using the same simulation and detections in multiple trackers and for plotting,\n:func:`itertools.tee()` is used to create independent iterators for use in different\ncomponents.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from itertools import tee\nfrom stonesoup.feeder.multi import MultiDataFeeder\nfrom stonesoup.simulator.platform import PlatformDetectionSimulator\n\ngt_sims = tee(gt_simulator, 2)\nradar1_simulator = PlatformDetectionSimulator(\n groundtruth=gt_sims[0],\n platforms=[sensor1_platform]\n)\nradar1_plotting, radar1_simulator, s1_detector = tee(radar1_simulator, 3)\n\nradar2_simulator = PlatformDetectionSimulator(\n groundtruth=gt_sims[1],\n platforms=[sensor2_platform]\n)\nradar2_plotting, radar2_simulator, s2_detector = tee(radar2_simulator, 3)\n\ns1s2_detector = MultiDataFeeder([radar1_simulator, radar2_simulator])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's briefly visualize the truths and measurements before we move on. Note that the final\nsimulation will not have the same truths because the ground truth generator is randomized. But\nthis gives an idea of what it will look like. The detections from the first sensor (airborne)\nwill be plotted in blue, and the detections from the second sensor in red. The clutter from\nboth sensors is plotted in yellow. The sensor locations will be plotted as green Xs.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter, Dimension\n\n# Lists to hold the detections from each sensor and the path of the airborne radar\ns1_detections = set()\ns2_detections = set()\nradar1_path = []\ntruths = set()\n\n# Iterate over the time steps, extracting the detections, truths, and airborne sensor path\nfor (time, s1ds), (_, s2ds) in zip(radar1_plotting, radar2_plotting):\n s1_detections.update(s1ds)\n s2_detections.update(s2ds)\n radar1_path.append(sensor1_platform.position)\n truths.update(gt_simulator.groundtruth_paths)\n\n# Plot the truths and detections\nplotter = Plotter(dimension=Dimension.THREE)\nplotter.plot_ground_truths(truths, [0, 2, 4])\nplotter.plot_measurements(s1_detections, [0, 2, 4], color='blue')\nplotter.plot_measurements(s2_detections, [0, 2, 4], color='red')\n\n# Plot the radar positions\nplotter.ax.plot(*zip(*radar1_path), marker='x', color='green')\nplotter.ax.plot(2000, 50, 0, marker='x', color='green')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3: Make Trackers for the Radars\nThe airborne radar will be tracking using a JPDA tracker, and the stationary one will use a\nGM-LCC. These trackers will not be given the platform detection simulation objects as\nparameters; we will feed the measurements in later to ensure that the same measurements are\nused in the fusion trackers.\nTo start, we can calculate the clutter spatial density.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"clutter_area = np.prod(np.diff(clutter_model.dist_params))\nclutter_spatial_density = clutter_model.clutter_rate/clutter_area"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### JPDA Tracker\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.probability import PDAHypothesiser\nfrom stonesoup.updater.kalman import ExtendedKalmanUpdater\nfrom stonesoup.predictor.kalman import ExtendedKalmanPredictor\nfrom stonesoup.dataassociator.probability import JPDA\nfrom stonesoup.deleter.error import CovarianceBasedDeleter\nfrom stonesoup.initiator.simple import MultiMeasurementInitiator\nfrom stonesoup.tracker.simple import MultiTargetMixtureTracker\n\n# Updater\njpda_updater = ExtendedKalmanUpdater(measurement_model=None)\n\n# Data Associator\npredictor = ExtendedKalmanPredictor(truth_transition_model)\nhypothesiser = PDAHypothesiser(\n predictor=predictor,\n updater=jpda_updater,\n clutter_spatial_density=clutter_spatial_density,\n prob_detect=0.9\n)\ndata_associator = JPDA(hypothesiser=hypothesiser)\n\n# Deleter\ncovariance_limit_for_delete = 500\ndeleter = CovarianceBasedDeleter(covar_trace_thresh=covariance_limit_for_delete)\n\n# Initiator\ns_prior_state = GaussianState([0, 0, 0, 0, 500, 0], np.diag([0, 50, 0, 50, 0, 50]))\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nhypothesiser = DistanceHypothesiser(\n predictor,\n jpda_updater,\n measure=Mahalanobis(),\n missed_distance=3\n)\n\nfrom stonesoup.dataassociator.neighbour import GNNWith2DAssignment\ninitiator_associator = GNNWith2DAssignment(hypothesiser)\ninitiator_deleter = CovarianceBasedDeleter(covar_trace_thresh=500)\ninitiator = MultiMeasurementInitiator(\n prior_state=s_prior_state,\n measurement_model=None,\n deleter=initiator_deleter,\n data_associator=initiator_associator,\n updater=jpda_updater,\n min_points=2\n)\n\njpda_tracker = MultiTargetMixtureTracker(\n initiator=initiator,\n deleter=deleter,\n detector=s1_detector,\n data_associator=data_associator,\n updater=jpda_updater\n)\njpda_tracker, s1_tracker = tee(jpda_tracker, 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### GM-LCC Tracker\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.pointprocess import LCCUpdater\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\nfrom stonesoup.hypothesiser.gaussianmixture import GaussianMixtureHypothesiser\nfrom stonesoup.mixturereducer.gaussianmixture import GaussianMixtureReducer\nfrom stonesoup.types.state import TaggedWeightedGaussianState\nfrom stonesoup.tracker.pointprocess import PointProcessMultiTargetTracker\n\n# Updater\nkalman_updater = ExtendedKalmanUpdater(measurement_model=None)\nupdater = LCCUpdater(\n updater=kalman_updater,\n clutter_spatial_density=clutter_spatial_density,\n normalisation=True,\n prob_detection=0.9,\n prob_survival=0.9,\n mean_number_of_false_alarms=clutter_model.clutter_rate,\n variance_of_false_alarms=100\n)\n\n# Hypothesiser\nkalman_predictor = ExtendedKalmanPredictor(truth_transition_model)\nbase_hypothesiser = DistanceHypothesiser(\n predictor=kalman_predictor,\n updater=kalman_updater,\n measure=Mahalanobis(),\n missed_distance=15,\n include_all=False\n)\nhypothesiser = GaussianMixtureHypothesiser(\n base_hypothesiser,\n order_by_detection=True\n)\n\n# Reducer\nreducer = GaussianMixtureReducer(\n prune_threshold=1E-3,\n pruning=True,\n merge_threshold=200,\n merging=True\n)\n\n# Birth component\nbirth_covar = CovarianceMatrix(np.diag([10000, 10, 10000, 10, 10000, 10]))\nbirth_component = TaggedWeightedGaussianState(\n state_vector=[0, 0, 0, 0, 500, 0],\n covar=birth_covar**2,\n weight=0.5,\n tag=TaggedWeightedGaussianState.BIRTH,\n timestamp=start_time\n)\n\n# Tracker\ngmlcc_tracker = PointProcessMultiTargetTracker(\n detector=s2_detector,\n hypothesiser=deepcopy(hypothesiser),\n updater=deepcopy(updater),\n reducer=deepcopy(reducer),\n birth_component=deepcopy(birth_component),\n extraction_threshold=0.90,\n)\ngmlcc_tracker, s2_tracker = tee(gmlcc_tracker, 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4: Make GM-PHD Tracker For Measurement Fusion\nThis tracker can use many of the same elements as the GM-LCC one.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.pointprocess import PHDUpdater\n\nupdater = PHDUpdater(\n kalman_updater,\n clutter_spatial_density=clutter_spatial_density,\n prob_detection=0.9,\n prob_survival=0.9\n)\n\nmeas_fusion_tracker = PointProcessMultiTargetTracker(\n detector=s1s2_detector,\n hypothesiser=deepcopy(hypothesiser),\n updater=deepcopy(updater),\n reducer=deepcopy(reducer),\n birth_component=deepcopy(birth_component),\n extraction_threshold=0.90,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5: Define a GM-PHD Tracker for Track Fusion\nTrack fusion using covariance intersection is implemented in Stone Soup using the\n:class:`ChernoffUpdater` class. For use in a GM-PHD, we insert the :class:`ChernoffUpdater` as\nthe base updater, instead of a typical :class:`~.KalmanUpdater`. The `clutter_spatial_density`\nparameter now refers to the estimated intensity of false tracks. Since the previous tracker will\n(hopefully) have ignored some of the clutter, we can use a smaller intensity than in the previous\ntrackers. The `omega` parameter is also adjustable. We will set it to 0.5 for now.\n\nThe remaining tracker parameters have been kept the same as the measurement fusion tracker except\nwhere noted. This will ensure a fair comparison of the results.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.chernoff import ChernoffUpdater\nfrom stonesoup.measures import Euclidean\n\n# Updater\nch_updater = ChernoffUpdater(measurement_model=None)\nupdater = PHDUpdater(\n ch_updater,\n clutter_spatial_density=1E-15,\n prob_detection=0.9,\n prob_survival=0.9\n)\n\n\n# Hypothesiser\n# The states being used as measurements are in Cartesian space. We will use Euclidean distance in\n# the :class:`~.DistanceHypothesiser`, meaning that we need a bigger missed distance than the\n# previous hypothesiser which used the Mahalanobis distance.\nkalman_predictor = ExtendedKalmanPredictor(truth_transition_model)\nbase_hypothesiser = DistanceHypothesiser(\n kalman_predictor,\n ch_updater,\n Euclidean(),\n missed_distance=300,\n include_all=False\n)\nhypothesiser = GaussianMixtureHypothesiser(base_hypothesiser, order_by_detection=True)\n\n# Reducer\n# The states tend to have low weights when they are first initialized using this method, so we will\n# keep the pruning threshold low.\nch_reducer = GaussianMixtureReducer(\n prune_threshold=1E-10,\n pruning=True,\n merge_threshold=200,\n merging=True\n)\n\n# Birth component\nbirth_covar = CovarianceMatrix(np.diag([100000, 100, 100000, 100, 100000, 100]))\nch_birth_component = TaggedWeightedGaussianState(\n state_vector=[0, 0, 0, 0, 500, 0],\n covar=birth_covar**2,\n weight=0.5,\n tag=TaggedWeightedGaussianState.BIRTH,\n timestamp=start_time\n)\n\n# Make tracker\nfrom stonesoup.feeder.track import Tracks2GaussianDetectionFeeder\n\ntrack_fusion_tracker = PointProcessMultiTargetTracker(\n detector=Tracks2GaussianDetectionFeeder(MultiDataFeeder([s1_tracker, s2_tracker])),\n hypothesiser=hypothesiser,\n updater=updater,\n reducer=deepcopy(ch_reducer),\n birth_component=deepcopy(ch_birth_component),\n extraction_threshold=0.90,\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6: Make Metric Manager\nWe will generate metrics for each of the four trackers for comparison.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.metricgenerator.basicmetrics import BasicMetrics\nfrom stonesoup.metricgenerator.ospametric import OSPAMetric\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.metricgenerator.uncertaintymetric import SumofCovarianceNormsMetric\n\nmetric_generators = []\n\ntrack_labels = ['jpda', 'gmlcc', 'meas_fusion', 'track_fusion']\nlabels = ['Airborne Radar (JPDAF)', 'Ground Radar (GM-LCC)', 'Measurement Fusion (GM-PHD)',\n          'Covariance Intersection (GM-PHD)']\n\nfor track_label, label in zip(track_labels, labels):\n    metric_generators.extend([\n        BasicMetrics(generator_name=f'basic {label}',\n                     tracks_key=f'{track_label}_tracks',\n                     truths_key='truths'),\n        OSPAMetric(c=10, p=1, measure=Euclidean([0, 2, 4]),\n                   generator_name=f'OSPA {label}',\n                   tracks_key=f'{track_label}_tracks',\n                   truths_key='truths'),\n        SIAPMetrics(position_measure=Euclidean(), velocity_measure=Euclidean(),\n                    generator_name=f'SIAP {label}',\n                    tracks_key=f'{track_label}_tracks',\n                    truths_key='truths'),\n        SumofCovarianceNormsMetric(generator_name=f'Uncertainty {label}',\n                                   tracks_key=f'{track_label}_tracks')\n    ])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.dataassociator.tracktotrack import TrackToTruth\nassociator = TrackToTruth(association_threshold=30)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.metricgenerator.manager import MultiManager\n\nmetric_manager = MultiManager(metric_generators, associator=associator)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7: Set Up the Detection Feeders\nAs one final step before running the simulation, we review the detection feeders. The\n:func:`itertools.tee()` calls made earlier ensure that the two radar trackers and the\nmeasurement fusion tracker all receive the same measurements at each timestep.\n\nThe track fusion tracker will also use the :class:`~.Tracks2GaussianDetectionFeeder` class to\nfeed the tracks as measurements. At each time step, the resultant live tracks from the JPDA and\nGM-LCC trackers will be put into a :class:`~.Tracks2GaussianDetectionFeeder`. The feeder will\ntake the most recent state from each track and turn it into a :class:`~.GaussianDetection`\nobject. The set of detection objects will be returned and passed into the tracker.\n\n"
]
},
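{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following toy sketch (plain Python, not the actual Stone Soup implementation) illustrates\nthe feeder's behaviour: take the most recent state of each track and treat its mean and\ncovariance as a Gaussian detection. The names `track_a`, `track_b` and `detections` are\ninvented for this illustration.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Each toy track is a time-ordered list of (mean, covariance) states, standing in\n# for the live Track objects produced by the JPDA and GM-LCC trackers\ntrack_a = [(np.zeros(6), np.eye(6)), (np.ones(6), 0.5*np.eye(6))]\ntrack_b = [(np.full(6, 2.0), 2*np.eye(6))]\n\n# The feeder takes the most recent state from each track and turns it into a\n# Gaussian 'detection': the state mean, with the state covariance as its noise\ndetections = [track[-1] for track in (track_a, track_b)]\nprint(len(detections))"
]
},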
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8: Run Simulation\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"jpda_tracks, gmlcc_tracks = set(), set()\nmeas_fusion_tracks, track_fusion_tracks = set(), set()\n\nmeas_fusion_tracker_iter = iter(meas_fusion_tracker)\ntrack_fusion_tracker_iter = iter(track_fusion_tracker)\n\nfor t in range(num_steps):\n\n # Run JPDA tracker from sensor 1\n _, sensor1_tracks = next(jpda_tracker)\n jpda_tracks.update(sensor1_tracks)\n\n # Run GM-LCC tracker from sensor 2\n _, sensor2_tracks = next(gmlcc_tracker)\n gmlcc_tracks.update(sensor2_tracks)\n\n # Run the GM-PHD for measurement fusion. This one gets called twice, once for each set of\n # detections. This ensures there is only one detection per target.\n # Run the GM-PHD for track fusion. Similar to the measurement fusion, this tracker gets run\n # twice, once for each set of tracks.\n for _ in (0, 1):\n _, tracks = next(meas_fusion_tracker_iter)\n meas_fusion_tracks.update(tracks)\n _, tracks = next(track_fusion_tracker_iter)\n track_fusion_tracks.update(tracks)\n\ndetections = s1_detections | s2_detections\nmetric_manager.add_data({'truths': truths, 'detections': detections}, overwrite=False)\n\n# Remove tracks that have just one state in them as they were probably from clutter\njpda_tracks = {track for track in jpda_tracks if len(track) > 1}\ngmlcc_tracks = {track for track in gmlcc_tracks if len(track) > 1}\nmeas_fusion_tracks = {track for track in meas_fusion_tracks if len(track) > 1}\ntrack_fusion_tracks = {track for track in track_fusion_tracks if len(track) > 1}\n\n# Add track data to the metric manager\nmetric_manager.add_data({'jpda_tracks': jpda_tracks,\n 'gmlcc_tracks': gmlcc_tracks,\n 'meas_fusion_tracks': meas_fusion_tracks,\n 'track_fusion_tracks': track_fusion_tracks\n }, overwrite=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 9: Plot the Results\nNext, we will plot all of the resulting tracks and measurements. This will be done in two plots.\nThe first plot will show all of the data, and the second plot will show a closer view of one\nresultant track.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter1, plotter2 = Plotter(), Plotter()\nfor plotter in [plotter1, plotter2]:\n plotter.plot_ground_truths(truths, [0, 2],\n color='black')\n plotter.plot_measurements(s1_detections, [0, 2], color='orange', marker='*',\n measurements_label='Measurements - Airborne Radar')\n plotter.plot_measurements(s2_detections, [0, 2], color='blue', marker='*',\n measurements_label='Measurements - Ground Radar')\n plotter.plot_tracks(jpda_tracks, [0, 2], color='red',\n track_label='Tracks - Airborne Radar (JPDAF)')\n plotter.plot_tracks(gmlcc_tracks, [0, 2], color='purple',\n track_label='Tracks - Ground Radar (GM-LCC)')\n plotter.plot_tracks(meas_fusion_tracks, [0, 2], color='green',\n track_label='Tracks - Measurement Fusion (GM-PHD)')\n plotter.plot_tracks(track_fusion_tracks, [0, 2], color='pink',\n track_label='Tracks - Covariance Intersection (GM-PHD)')\n\n # Format the legend a bit. Set the position outside of the plot, and\n # swap the order of the clutter and ground radar measurements\n pos = plotter.ax.get_position()\n plotter.ax.set_position([pos.x0, pos.y0, pos.width * 0.7, pos.height])\n k = list(plotter.legend_dict.keys())\n k[2], k[3] = k[3], k[2]\n v = list(plotter.legend_dict.values())\n v[2], v[3] = v[3], v[2]\n plotter.ax.legend(handles=v, labels=k, loc='lower center', bbox_to_anchor=(0.5, -0.5))\n\nplotter1.fig.show()\n\ntrack = sorted(track_fusion_tracks, key=len)[-1] # Longest track\nx_min = min([state.state_vector[0] for state in track])\nx_max = max([state.state_vector[0] for state in track])\ny_min = min([state.state_vector[2] for state in track])\ny_max = max([state.state_vector[2] for state in track])\n\nplotter2.ax.set_xlim(x_min-50, x_max+50)\nplotter2.ax.set_ylim(y_min-50, y_max+50)\n\nplotter2.fig.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we will plot the metrics. First, we call a function to calculate\nthe metrics.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"metrics = metric_manager.generate_metrics()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can plot them. The SIAP and OSPA metrics can be done together in a loop. The\nTrack-To-Truth ratio needs to be done separately so that it can be calculated at each\ntimestep.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import MetricPlotter\n\nfig = MetricPlotter()\nfig.plot_metrics(metrics, color=['blue', 'orange', 'green', 'red'], linestyle='--')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot Track to Truth Ratio\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\nfig, ax = plt.subplots()\ntimes = metric_manager.list_timestamps(metric_manager.generators[2])\n\n# Iterate through the trackers. For each one, go through the list of all timesteps\n# and calculate the ratio at that time\nfor track_label, label in zip(track_labels, labels):\n    ratios = []\n    for time in times:\n        num_tracks = SIAPMetrics.num_tracks_at_time(\n            metric_manager.states_sets[f'{track_label}_tracks'], timestamp=time)\n        num_truths = SIAPMetrics.num_truths_at_time(\n            metric_manager.states_sets['truths'], timestamp=time)\n        ratios.append(num_tracks / num_truths)\n    ax.plot(ratios, linewidth=2, label=label, linestyle='--')\n\nax.set_title('Track-to-Truth Ratio')\nax.set_ylabel('Ratio')\nax.set_xlabel('Time $(s)$')\nax.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Julier, S. J. and Uhlmann, J. K., \u201cGeneral decentralized data fusion\n with covariance intersection,\u201d Handbook of multisensor data fusion:\n theory and practice, pp. 319\u2013344, 2009.\n\n.. [#] Clark, D. E. and Campbell, M. A., \u201cIntegrating covariance intersection\n into Bayesian multi-target tracking filters,\u201d preprint on TechRxiv.\n submitted to IEEE Transactions on Aerospace and Electronic Systems.\n\n.. [#] Hurley, M. B., \u201cAn information theoretic justification for covariance\n intersection and its generalization,\u201d in Proceedings of the Fifth\n International Conference on Information Fusion. FUSION 2002.(IEEE Cat.\n No. 02EX5997), vol. 1. IEEE, 2002, pp. 505\u2013511\n\n.. [#] Clark, D. and Hunter, E. and Balaji, B. and O'Rourke, S., \u201cCentralized multi-sensor\n multi-target data fusion with tracks as measurements,\u201d to be submitted to SPIE Defense and\n Security Symposium 2023.\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
MTT_3D_Platform.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Multi-Target Tracking in 3D Using Platform Simulation\nIn the Stone Soup library, simulations can be set up and run using special\n:class:`~.FixedPlatform` and :class:`~.Sensor` objects. Simulated data can be preferable to\nreal data as the user has more control over the tracking scenario and real\ndata can be difficult or costly to acquire.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the Sensor\nWe will begin by importing many relevant packages for the simulation.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from datetime import datetime\nfrom datetime import timedelta\nimport numpy as np\nimport random\n\n\n# Stone Soup imports:\nfrom stonesoup.types.state import State, GaussianState\nfrom stonesoup.types.array import StateVector, CovarianceMatrix\nfrom stonesoup.models.transition.linear import (\n CombinedLinearGaussianTransitionModel, ConstantVelocity)\nfrom stonesoup.models.measurement.nonlinear import \\\n CartesianToElevationBearingRange\nfrom stonesoup.deleter.time import UpdateTimeStepsDeleter\nfrom stonesoup.tracker.simple import MultiTargetMixtureTracker\nfrom matplotlib import pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We set the start time to be the moment when we begin the simulation; for\nsimulations, the actual time doesn't matter, only the time delta between the\nstart and the point in question. We also set a random seed to ensure a\nstandard outcome. At the end, you can try changing this value to see how the\nstochastic nature of the simulation and tracker can produce very different\ntracking scenarios with the same parameters.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"start_time = datetime.now()\nnp.random.seed(783)\nrandom.seed(783)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create the Stationary Platform\nNext, we will create a platform that will hold our radar sensor. In this\ncase, the platform is stationary and located at the point (0, 0, 0), though\nin general it need not be.\n\nDefine the initial platform position, in this case the origin\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"platform_state_vector = StateVector([[0], [0], [0]])\nposition_mapping = (0, 1, 2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create the initial state (position, time). Notice that the time is set to\nthe simulation start time defined earlier\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"platform_state = State(platform_state_vector, start_time)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create our fixed platform\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.platform.base import FixedPlatform\nplatform = FixedPlatform(\n states=platform_state,\n position_mapping=position_mapping\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a Sensor\nNow that our sensor platform has been created, we can create a sensor to\nattach to it. In this case, we will be using a radar that takes measurements\nof range, bearing, and elevation of the targets.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.sensor.radar.radar import RadarElevationBearingRange\nfrom stonesoup.models.clutter import ClutterModel"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we create a covariance matrix giving a suitable measurement accuracy\nfor the radar sensor. This radar measures range with an accuracy of +/- 25 m,\nelevation with an accuracy of +/- 0.15 degrees, and bearing with an accuracy\nof +/- 0.15 degrees.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"noise_covar = CovarianceMatrix(np.array(np.diag([np.deg2rad(0.15)**2,\n np.deg2rad(0.15)**2,\n 25**2])))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The radar needs to be informed of where x, y, and z are in the target state\nspace. In Stone Soup the states are often of the form [x, vx, y, vy, z, vz].\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"radar_mapping = (0, 2, 4)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A newer feature of the Stone Soup platform simulations is the ability to\ngenerate clutter directly from the sensors using the :class:`~.ClutterModel`\nclass. Using the clutter models, we can simulate realistic clutter\noriginating from the measurement model. Clutter is defined in the Cartesian\nplane and converted to the correct measurement types according to the\nsensor. We will now add a clutter model to the radar sensor. This clutter\nmodel will use a uniform distribution over the defined ranges in each\ndimension.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"params = ((-10000, 10000), # clutter min x and max x\n (-10000, 10000), # clutter min y and max y\n (8000, 10000)) # clutter min z and max z\nclutter_model = ClutterModel(\n clutter_rate=0.5,\n distribution=np.random.default_rng().uniform,\n dist_params=params\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, instantiate the radar and attach it to the stationary platform we defined above.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"radar = RadarElevationBearingRange(\n ndim_state=6,\n position_mapping=radar_mapping,\n noise_covar=noise_covar,\n clutter_model=clutter_model\n)\n\nplatform.add_sensor(radar)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create the Simulation\nFor this example, we wish to have a simulation of multiple airborne targets.\nWe will use the :class:`~.MultiTargetGroundTruthSimulator` class to simulate\nthe target paths, and then the :class:`~.PlatformDetectionSimulator` class\nto handle the radar simulation.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set a constant velocity transition model for the targets\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"transition_model = CombinedLinearGaussianTransitionModel(\n [ConstantVelocity(0.5), ConstantVelocity(0.5), ConstantVelocity(0.1)])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the Gaussian State from which new targets are sampled on\ninitialisation\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"initial_target_state = GaussianState(\n state_vector=StateVector([[0], [0], [0], [0], [9000], [0]]),\n covar=CovarianceMatrix(np.diag([2000, 50, 2000, 50, 100, 1]))\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And create the truth simulator for the targets\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.simulator.simple import MultiTargetGroundTruthSimulator\ngroundtruth_sim = MultiTargetGroundTruthSimulator(\n transition_model=transition_model, # target transition model\n initial_state=initial_target_state, # add our initial state for targets\n timestep=timedelta(seconds=1), # time between measurements\n number_steps=120, # 2 minutes\n birth_rate=0.05, # 5% chance of a new target being born every second\n death_probability=0.05 # 5% chance of a target being killed\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With our truth data generated and our sensor platform placed, we can now\nconstruct a simulator to generate measurements of the targets from each\nof the sensors in the simulation; in this case, just the stationary radar.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.simulator.platform import PlatformDetectionSimulator\n\nsim = PlatformDetectionSimulator(\n groundtruth=groundtruth_sim,\n platforms=[platform]\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set Up the Tracking Algorithm\nFor this example, we will be using the JPDA algorithm to perform \"soft\"\nassociations of the measurements to the targets. This is necessary as we\nhave multiple airborne targets whose paths may intersect - a \"hard\" or\n\"greedy\" association algorithm such as the GNN may have issues in these\ncases.\n\n"
]
},
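{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for the difference, here is a conceptual sketch of \"soft\" association\n(illustrative only, not Stone Soup's :class:`~.JPDA` implementation, and ignoring the\nclutter and missed-detection terms that full JPDA includes). A \"hard\" associator would\npick the single most likely measurement; a soft update instead weights each measurement\nby its normalised association probability.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nfrom scipy.stats import multivariate_normal\n\n# Illustrative only: one track prediction and two nearby measurements\nprediction = np.array([0.0, 0.0])\ninnovation_covar = np.eye(2)\nmeasurements = [np.array([0.5, 0.2]), np.array([0.7, -0.1])]\n\n# Gaussian likelihood of each measurement given the prediction\nlikelihoods = np.array([multivariate_normal.pdf(m, prediction, innovation_covar)\n for m in measurements])\n\n# Soft association probabilities: normalised likelihoods\nweights = likelihoods / likelihoods.sum()\nprint(weights)"
]
},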
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, we create a Kalman predictor using the transition model from the\ntarget simulation. In real situations, you may not know the actual\ntransition model.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.predictor.kalman import KalmanPredictor\npredictor = KalmanPredictor(transition_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we define a measurement model for the Kalman updater. Here we have\naltered the noise covariance matrix slightly to make it harder for the\ntracker.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"meas_covar = np.diag([np.deg2rad(0.5), np.deg2rad(0.15), 25])\nmeas_covar_trk = CovarianceMatrix(1.0*np.power(meas_covar, 2))\nmeas_model = CartesianToElevationBearingRange(\n ndim_state=6,\n mapping=(0, 2, 4),\n noise_covar=meas_covar_trk\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the measurement model, we make a Kalman updater which we will pass\ninto our JPDA tracker.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.updater.kalman import ExtendedKalmanUpdater\nupdater = ExtendedKalmanUpdater(measurement_model=meas_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The hypothesiser will assume that there is a 95% chance to measure any given\ntarget at any given timestep. In real life, this probability is based on the\nSNR of the target signals. The clutter spatial density of the hypothesiser\ncan be changed to check what happens when there is a mismatch between the\nestimated clutter rate and actual clutter rate.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.hypothesiser.probability import PDAHypothesiser\nPd = 0.95 # 95%\nhypothesiser = PDAHypothesiser(predictor=predictor,\n updater=updater,\n clutter_spatial_density=0.5,\n prob_detect=Pd)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the hypothesiser, we can make a data associator. Other MTT algorithms\nmay use different association algorithms (like GNN).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.dataassociator.probability import JPDA\ndata_associator = JPDA(hypothesiser=hypothesiser)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We implement a simple deleter algorithm to delete tracks if no measurements\nhave fallen within the JPDA gating region in 3 time steps.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"deleter = UpdateTimeStepsDeleter(time_steps_since_update=3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will now set up a track initiator. In real life, targets may enter the\nmeasurement zone at any time during the collection period, and may leave at\nany point as well. To distinguish new targets from random clutter, we use a\ntrack initiator. This specific algorithm is a multi-measurement initiator;\nit utilises features of the tracker to initiate and hold tracks temporarily\nwithin the initiator itself, releasing them to the tracker once enough\ndetections have been associated with them to determine that they are\n\"sure\" tracks. In this case, the tracks are released after 3 appropriate\ndetections in a row.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.initiator.simple import MultiMeasurementInitiator\nfrom stonesoup.dataassociator.neighbour import NearestNeighbour\n\nmin_detections = 3 # number of detections required to begin a track\ninitiator_prior_state = GaussianState(\n state_vector=np.array([[0], [0], [0], [0], [0], [0]]),\n covar=np.diag([0, 10, 0, 10, 0, 10])**2\n)\n\ninitiator_meas_model = CartesianToElevationBearingRange(\n ndim_state=6,\n mapping=np.array([0, 2, 4]),\n noise_covar=noise_covar\n)\n\ninitiator = MultiMeasurementInitiator(\n prior_state=initiator_prior_state,\n measurement_model=initiator_meas_model,\n deleter=deleter,\n data_associator=NearestNeighbour(hypothesiser),\n updater=updater,\n min_points=min_detections,\n updates_only=True\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we are ready to create a JPDA multi-target tracker.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"JPDA_tracker = MultiTargetMixtureTracker(\n initiator=initiator,\n deleter=deleter,\n detector=sim,\n data_associator=data_associator,\n updater=updater\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the Simulation and Tracker\nSince the JPDA tracker holds the simulation variables, we can easily iterate\nthrough the tracker. Each time it will update the groundtruth simulation,\ngenerate detections using our fixed platform and radar, and run the tracking\nalgorithm.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Create lists to hold the information we want to plot later\ntracks_plot = set()\ntracks_id = set()\ngroundtruth_plot = set()\ndetections_plot = set()\n\n# Run the simulation and tracker\nfor time, ctracks in JPDA_tracker:\n print(time) # allows us to see the progress of the tracking simulation\n\n for track in ctracks:\n tracks_plot.add(track)\n for truth in groundtruth_sim.current[1]:\n groundtruth_plot.add(truth)\n for detection in sim.detections:\n detections_plot.add(detection)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Plot the Results\nNow that all of the relevant information has been extracted, the results can\nbe plotted using the 3D plotting functionality provided by the\n:class:`~.Plotter` class.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import Plotter, Dimension\nplotter = Plotter(Dimension.THREE)\nplotter.plot_ground_truths(groundtruth_plot, [0, 2, 4])\nplotter.plot_measurements(detections_plot, [0, 2, 4])\nplotter.plot_tracks(tracks_plot, [0, 2, 4], uncertainty=False, err_freq=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will also make a plot without measurements/clutter to better see the\ntracks.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plotter2 = Plotter(Dimension.THREE)\nplotter2.plot_ground_truths(groundtruth_plot, [0, 2, 4])\nplotter2.plot_tracks(tracks_plot, [0, 2, 4], uncertainty=True, err_freq=5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Metrics\nTo analyse the tracker performance, we will use the OSPA, SIAP, and\nuncertainty metrics. For each of these metrics, we make a generator\nobject which gets put into a metric manager.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# OSPA metric\nfrom stonesoup.metricgenerator.ospametric import OSPAMetric\nospa_generator = OSPAMetric(c=40, p=1,\n generator_name='OSPA metrics',\n tracks_key='tracks',\n truths_key='truths'\n )\n\n# SIAP metrics\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.measures import Euclidean\nSIAPpos_measure = Euclidean(mapping=np.array([0, 2]))\nSIAPvel_measure = Euclidean(mapping=np.array([1, 3]))\nsiap_generator = SIAPMetrics(\n position_measure=SIAPpos_measure,\n velocity_measure=SIAPvel_measure,\n generator_name='SIAP metrics',\n tracks_key='tracks',\n truths_key='truths'\n)\n\n# Uncertainty metric\nfrom stonesoup.metricgenerator.uncertaintymetric import \\\n SumofCovarianceNormsMetric\nuncertainty_generator = SumofCovarianceNormsMetric(generator_name='Uncertainty metric',\n tracks_key='tracks')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The metric manager requires us to define an associator. Here we want to\ncompare the track estimates with the ground truth.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.dataassociator.tracktotrack import TrackToTruth\nassociator = TrackToTruth(association_threshold=30)\n\nfrom stonesoup.metricgenerator.manager import MultiManager\nmetric_manager = MultiManager(\n [ospa_generator, siap_generator, uncertainty_generator],\n associator=associator\n)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we saved the groundtruth and tracks before, we can easily add them\nto the metric manager now, and then tell it to generate the metrics.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"metric_manager.add_data({'truths': groundtruth_plot, 'tracks': tracks_plot})\nmetrics = metric_manager.generate_metrics()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The first metric we will look at is the OSPA metric.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.plotter import MetricPlotter\n\nfig1 = MetricPlotter()\nfig1.plot_metrics(metrics, generator_names=['OSPA metrics'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next are the SIAP metrics. Specifically, we will look at the position and\nvelocity accuracy.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fig2 = MetricPlotter()\nfig2.plot_metrics(metrics, metric_names=['SIAP Position Accuracy at times',\n 'SIAP Velocity Accuracy at times'])\nfig2.set_fig_title('SIAP metrics')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we will examine a general uncertainty metric. This is calculated as\nthe sum of the norms of the covariance matrices of each estimated state.\nSince the sum is not normalized for the number of estimated states, it is\nmost important to look at the trends of this graph rather than the values.\n\n"
]
},
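{
"cell_type": "markdown",
"metadata": {},
"source": [
"Conceptually, the quantity plotted below can be thought of as a sum of matrix norms\nover the estimated state covariances. A minimal sketch of that idea (illustrative\nonly, assuming a Frobenius norm; the actual computation is handled by\n:class:`~.SumofCovarianceNormsMetric`):\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Illustrative only: two example covariance matrices from estimated states\ncovars = [np.diag([4.0, 1.0]), np.diag([2.0, 2.0])]\n\n# Sum of matrix norms (Frobenius norm assumed here for illustration)\nuncertainty = sum(np.linalg.norm(c) for c in covars)\nprint(uncertainty)"
]
},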
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fig3 = MetricPlotter()\nfig3.plot_metrics(metrics, generator_names=['Uncertainty metric'])\nfig3.set_ax_title(['Track uncertainty over time'])"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}PK yzXTðO) ) ParticleFlow.ipynb{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Gromov Particle Flow Filter\nThis example looks at utilising the Generalized Gromov method for stochastic particle flow filters.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The Filter\n\nWe use the simple exact formula solution [#]_ to the equation:\n\n\\begin{align}\\frac{\\partial p}{\\partial \\lambda} =\n -div(fp)+\\frac{1}{2}div\\left[Q(x)\\frac{\\partial p}{\\partial x}\\right]\\end{align}\n\nwhere $p(x, \u03bb)$ is the conditional probability density of $x$ as a function of\n$\u03bb\\in [0, 1]$, $f$ is the drift function and $Q$ is the diffusion covariance.\n\nUnder the assumption that the prior density and likelihood are both multivariate Gaussian\ndensities, and that $Q$ is a symmetric positive semi-definite matrix independent of\n$x$, we have the simple exact formula:\n\n\\begin{align}Q = [P^{-1}+\\lambda H^{T}R^{-1}H]^{-1}H^{T}R^{-1}H[P^{-1}+\\lambda H^{T}R^{-1}H]^{-1}\\end{align}\n\nwhere $Q$ can be guaranteed to be positive, semi-definite by taking\n\n\\begin{align}Q \\rightarrow \\frac{Q + Q^{T}}{2}\\end{align}\n\nUsing $Q$, we can generate random samples of the diffusion for use in the stochastic flow\nof particles.\n\n"
]
},
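{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough illustration of the formula above (not part of the Stone Soup implementation,\nwhich handles this internally in :class:`~.GromovFlowParticleUpdater`), the diffusion\ncovariance $Q$ at a given $\\lambda$ could be computed with NumPy as follows, for an\nassumed prior covariance $P$, measurement matrix $H$ and measurement noise covariance\n$R$ chosen purely for this sketch:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\n# Assumed example values, for illustration only\nP = np.diag([2.0, 1.0]) # prior covariance\nH = np.array([[1.0, 0.0]]) # linear(ised) measurement matrix\nR = np.array([[0.5]]) # measurement noise covariance\nlam = 0.5 # flow parameter lambda in [0, 1]\n\n# Q = [P^-1 + lam H^T R^-1 H]^-1 H^T R^-1 H [P^-1 + lam H^T R^-1 H]^-1\nM = np.linalg.inv(np.linalg.inv(P) + lam * H.T @ np.linalg.inv(R) @ H)\nQ = M @ H.T @ np.linalg.inv(R) @ H @ M\n\n# Enforce symmetry, guaranteeing Q is positive semi-definite\nQ = (Q + Q.T) / 2\nprint(Q)"
]
},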
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Comparison\nWe compare the bootstrap Stone Soup particle filter, the Gromov particle flow filter, and the\nGromov particle flow filter with parallel EKF covariance computation ([#]_ using algorithm 2\nwith Gromov flow).\n\nNote that with particle flow, resampling isn't required and fewer particles are needed,\nas it doesn't suffer from the same degeneracy issues as the bootstrap particle filter.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### One time-step\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import datetime\nimport numpy as np\nfrom scipy.stats import multivariate_normal\n\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\nfrom stonesoup.models.measurement.nonlinear import CartesianToBearingRange\nfrom stonesoup.types.detection import Detection\nfrom stonesoup.predictor.particle import ParticlePredictor, ParticleFlowKalmanPredictor\nfrom stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n ConstantVelocity\nfrom stonesoup.updater.particle import ParticleUpdater, GromovFlowParticleUpdater, \\\n GromovFlowKalmanParticleUpdater\nfrom stonesoup.types.particle import Particle\nfrom stonesoup.types.numeric import Probability\nfrom stonesoup.types.state import ParticleState\n\nnp.random.seed(2020)\n\nstart_time = datetime.datetime(2020, 1, 1)\ntruth = GroundTruthPath([\n GroundTruthState([4, 4, 4, 4], timestamp=start_time + datetime.timedelta(seconds=1))\n])\n\nmeasurement_model = CartesianToBearingRange(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=np.diag([np.radians(0.5), 1])\n)\nmeasurement = Detection(measurement_model.function(truth.state, noise=True),\n timestamp=truth.state.timestamp,\n measurement_model=measurement_model)\n\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05),\n ConstantVelocity(0.05)])\n\np_predictor = ParticlePredictor(transition_model)\npfk_predictor = ParticleFlowKalmanPredictor(transition_model) # By default, parallels EKF\npredictors = [p_predictor, p_predictor, pfk_predictor]\n\np_updater = ParticleUpdater(measurement_model)\nf_updater = GromovFlowParticleUpdater(measurement_model)\npfk_updater = GromovFlowKalmanParticleUpdater(measurement_model) # By default, parallels EKF\nupdaters = [p_updater, f_updater, pfk_updater]\n\nnumber_particles = 1000\nsamples = multivariate_normal.rvs(np.array([0, 1, 0, 1]),\n np.diag([1.5, 0.5, 1.5, 0.5]),\n size=number_particles)\n# Note weights are not used in particle flow, so their value won't affect it.\nweight = Probability(1/number_particles)\nparticles = [\n Particle(sample.reshape(-1, 1), weight=weight) for sample in samples]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the filters\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from matplotlib import pyplot as plt\nfrom stonesoup.types.hypothesis import SingleHypothesis\n\nfig = plt.figure(figsize=(10, 6))\nax = fig.add_subplot(1, 1, 1)\nax.axis('equal')\n\nfilters = ['Particle', 'Particle Flow', 'Parallel EKF']\nparticle_counts = [1000, 50, 50]\ncolours = ['blue', 'green', 'red']\nhandles, labels = [], []\n\nfor predictor, updater, colour, filter, particle_count \\\n in zip(predictors, updaters, colours, filters, particle_counts):\n prior = ParticleState(None, particle_list=particles[:particle_count], timestamp=start_time)\n\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement)\n post = updater.update(hypothesis)\n\n handles.append(ax.scatter(post.state_vector[0, :], post.state_vector[2, :], color=colour, s=2))\n\n labels.append(filter)\n\nax.scatter(*truth.state_vector[[0, 2]], color='black')\nax.legend(handles=handles, labels=labels)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Multiple time-steps\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])\nfor k in range(1, 21):\n truth.append(\n GroundTruthState(transition_model.function(truth[k-1],\n noise=True,\n time_interval=datetime.timedelta(seconds=1)),\n timestamp=start_time+datetime.timedelta(seconds=k))\n )\n\nmeasurements = []\nfor state in truth:\n measurement = measurement_model.function(state, noise=True)\n measurements.append(Detection(measurement, timestamp=state.timestamp,\n measurement_model=measurement_model))\n\n\nnumber_particles = 1000\nsamples = multivariate_normal.rvs(np.array([0, 1, 0, 1]),\n np.diag([1.5, 0.5, 1.5, 0.5]),\n size=number_particles)\nweight = Probability(1/number_particles)\nparticles = [Particle(sample.reshape(-1, 1), weight=weight)\n for sample in samples]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.resampler.particle import SystematicResampler\nfrom stonesoup.types.track import Track\nfrom stonesoup.dataassociator.tracktotrack import TrackToTruth\nfrom stonesoup.metricgenerator.manager import MultiManager\nfrom stonesoup.metricgenerator.tracktotruthmetrics import SIAPMetrics\nfrom stonesoup.plotter import Plotter\nfrom stonesoup.measures import Euclidean\n\nupdaters[0].resampler = SystematicResampler() # Allow particle filter to re-sample\n\npa = dict()\nsiap_gens = [SIAPMetrics(position_measure=Euclidean((0, 2)), velocity_measure=Euclidean((1, 3)),\n generator_name=filter, tracks_key=f'tracks_{filter}', truths_key='truth')\n for filter in filters]\n\nmetric_manager = MultiManager(siap_gens, associator=TrackToTruth(association_threshold=np.inf))\nmetric_manager.add_data({'truth': {truth}})\n\nfor predictor, updater, colour, filter, particle_count, generators \\\n in zip(predictors, updaters, colours, filters, particle_counts, siap_gens):\n track = Track()\n prior = ParticleState(None, particle_list=particles[:particle_count], timestamp=start_time)\n\n for measurement in measurements:\n prediction = predictor.predict(prior, timestamp=measurement.timestamp)\n hypothesis = SingleHypothesis(prediction, measurement)\n post = updater.update(hypothesis)\n track.append(post)\n prior = track[-1]\n\n plotter = Plotter()\n plotter.plot_ground_truths(truth, [0, 2])\n plotter.plot_measurements(measurements, [0, 2])\n plotter.plot_tracks(track, [0, 2], particle=True, color=colour)\n plotter.ax.set_title(filter)\n plotter.ax.set_xlim(0, 30)\n plotter.ax.set_ylim(0, 30)\n\n metric_manager.add_data({f'tracks_{filter}': {track}}, overwrite=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Positional Accuracy\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"metrics = metric_manager.generate_metrics()\n\nfrom stonesoup.plotter import MetricPlotter\n\nfig2 = MetricPlotter()\nfig2.plot_metrics(metrics, metric_names=['SIAP Position Accuracy at times'])\nfig2.axes[0].set_ylim(-2, 6)\nfig2.axes[0].set_title('Positional Accuracy')\nfig2.fig.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n.. [#] Fred Daum, Jim Huang & Arjang Noushin 2017, Generalized Gromov method for stochastic\n particle flow filters\n.. [#] Tao Ding & Mark J. Coates 2012, Implementation of the Daum-Huang exact-flow particle\n filter\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}PK !zzX+t,2 ,2 bearing_only_example.ipynb{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Bearings-only tracking example\n\nNon-linear bearings-only target tracking is the complex problem of estimating a\ntarget's state using only bearing measurements from a sensor.\nFrom bearing-only measurements we can estimate the parameters of the target\nmotion (range and course). The problem is non-linear because of the non-linear\nrelationship between the measurements and the target state vector.\n\nIn this short tutorial we show how we can run a bearings-only simulation inside the Stone Soup framework.\nWe simulate a radar placed on top of a moving platform collecting measurements,\nthen use the extended Kalman filter (:class:`~.ExtendedKalmanPredictor` and\n:class:`~.ExtendedKalmanUpdater`) to track the target. In this example we employ a\ndistance-based data associator to match the detections from the sensor with the track hypotheses.\n"
]
},
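{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see where the non-linearity comes from: the bearing of a target at Cartesian\nposition (x, y) relative to a sensor at the origin is arctan2(y, x), which is\nnon-linear in the state. A minimal sketch of this (illustrative only, independent\nof the Stone Soup classes used below):\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\n\ndef bearing(x, y):\n # Bearing from a sensor at the origin to a target at (x, y)\n return np.arctan2(y, x)\n\n# Equal steps in x produce unequal steps in bearing: the mapping is non-linear\nfor x in (10.0, 20.0, 30.0):\n print(x, bearing(x, 50.0))"
]
},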
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Layout\nThe layout of this example follows:\n\n1) Create the moving platform and the :class:`~.RadarBearing` detector;\n2) Generate the target ground truth paths;\n3) Set up the simulation for generating detections from the ground truth paths;\n4) Run the simulation and create the plots\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# some general imports\nimport numpy as np\nfrom matplotlib import pyplot as plt\n\nfrom datetime import datetime\nfrom datetime import timedelta\n\n# Load Stone Soup materials\nfrom stonesoup.types.state import State, GaussianState\nfrom stonesoup.types.array import StateVector, CovarianceMatrix\nfrom stonesoup.models.transition.linear import (CombinedLinearGaussianTransitionModel, ConstantVelocity)\nfrom stonesoup.models.measurement.nonlinear import Cartesian2DToBearing\n\n# Load the filter components\nfrom stonesoup.updater.kalman import ExtendedKalmanUpdater\nfrom stonesoup.predictor.kalman import ExtendedKalmanPredictor\nfrom stonesoup.deleter.time import UpdateTimeStepsDeleter\nfrom stonesoup.tracker.simple import SingleTargetTracker\n\n# set a random seed and start of the simulation\nnp.random.seed(2001)\nstart_time = datetime.now()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1) Create the moving platform and the bearing-only radar\nFirst, we create the initial state of the platform, including the origin point and the\nCartesian (x, y) movement direction. Then, we create a transition model (in 2D Cartesian\ncoordinates) for the platform.\nAt this point, we can set up the radar, which receives only the bearing measurements from the\ntargets, using the :class:`~.RadarBearing` sensor.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Import the platform to place the sensor\nfrom stonesoup.platform.base import MovingPlatform\n\n# Define the platform location, place it in the origin, and define its Cartesian movements.\n# In addition specify the position and velocity mapping. This is done in 2D Cartesian coordinates.\n\nplatform_state_vector = StateVector([[0], [-5], [0], [-7]])\nposition_mapping = (0, 2)\nvelocity_mapping = (1, 3)\n\n# Create the initial state (position and time)\nplatform_state = State(platform_state_vector, start_time)\n\n# Create a platform transition model, let's assume it is moving with constant velocity\nplatform_transition_model = CombinedLinearGaussianTransitionModel([\n ConstantVelocity(0.0), ConstantVelocity(0.0)])\n\n# We can instantiate the platform's initial state, position and velocity mapping, and\n# the transition model using the :class:`~.MovingPlatform` platform class.\nplatform = MovingPlatform(states=platform_state,\n position_mapping=position_mapping,\n velocity_mapping=velocity_mapping,\n transition_model=platform_transition_model)\n\n# At this stage, we need to create the sensor, so let's import the RadarBearing.\n# This sensor only provides the bearing measurements from the target detections;\n# the range is not specified.\nfrom stonesoup.sensor.radar.radar import RadarBearing\n\n# Configure the radar noise; since we are using just a single dimension we need to specify only the\n# noise associated with the bearing dimension. We assume a bearing accuracy of +/- 0.025 degrees for\n# each measurement\nnoise_covar = CovarianceMatrix(np.array(np.diag([np.deg2rad(0.025) ** 2])))\n\n# This radar needs to be informed of the x and y mapping of the target space.\nradar_mapping = (0, 2)\n\n# Instantiate the radar\nradar = RadarBearing(ndim_state=4,\n position_mapping=radar_mapping,\n noise_covar=noise_covar)\n\n# As presented in the other examples we have to place the sensor on the platform.\nplatform.add_sensor(radar)\n# At this point we can also check the offset rotation or the mounting of the radar with respect to\n# the platform as shown in other tutorials."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2) Generate the ground truth target movements\nWe now build a ground truth simulator of a single target with a transition model\nand a known initial state.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Load the single target ground truth simulator\nfrom stonesoup.simulator.simple import SingleTargetGroundTruthSimulator\n\n# Instantiate the transition model\ntransition_model = CombinedLinearGaussianTransitionModel([\n ConstantVelocity(1.0), ConstantVelocity(1.0)])\n\n# Define the initial target state\n# We use a Gaussian state to specify the initial\n# 2D Cartesian position and velocity, and the accuracy\n# using a covariance matrix.\ninitial_target_state = GaussianState([50, 0, 50, 0],\n np.diag([1, 1, 1, 1]) ** 2,\n timestamp=start_time)\n\n# Set up the ground truth simulation\ngroundtruth_simulation = SingleTargetGroundTruthSimulator(\n transition_model=transition_model,\n initial_state=initial_target_state,\n timestep=timedelta(seconds=1),\n number_steps=100\n)"
]
},
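The :class:`~.ConstantVelocity` components above propagate each position/velocity pair with a simple linear transition matrix. As a rough, noise-free sketch of one prediction step over a one-second interval (an illustration, not Stone Soup's internal code):

```python
import numpy as np

dt = 1.0  # time step in seconds

# Transition matrix for one Cartesian dimension, state = [position, velocity]
f_1d = np.array([[1.0, dt],
                 [0.0, 1.0]])

# Block-diagonal combination for the 4D state [x, vx, y, vy]
f = np.kron(np.eye(2), f_1d)

state = np.array([50.0, 1.0, 50.0, 2.0])  # x, vx, y, vy
predicted = f @ state  # each position advances by velocity * dt
print(predicted)
```

The process noise terms (the `1.0` arguments above) additionally inflate the covariance at each step; the sketch shows only the deterministic part.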
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3) Set up the detection simulation that generates the bearing measurements\nWe now define the measurement model and the detection simulation, then use these components to run the example.\nThe measurement model is :class:`~.Cartesian2DToBearing`.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Define the measurement model using a Cartesian to bearing\nmeas_model = Cartesian2DToBearing(\n ndim_state=4,\n mapping=(0, 2),\n noise_covar=noise_covar)\n\n# Import the PlatformDetectionSimulator\nfrom stonesoup.simulator.platform import PlatformDetectionSimulator\n\nsim = PlatformDetectionSimulator(groundtruth=groundtruth_simulation,\n platforms=[platform])"
]
},
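Conceptually, the bearing measurement reduces the target's 2D position relative to the sensor to a single angle. A hand-rolled sketch for illustration only (not the actual :class:`~.Cartesian2DToBearing` implementation, and note that bearing conventions vary):

```python
import numpy as np

def bearing_measurement(target_xy, sensor_xy, noise_std=0.0):
    """Bearing (radians) from sensor to target, measured from the x-axis here."""
    dx = target_xy[0] - sensor_xy[0]
    dy = target_xy[1] - sensor_xy[1]
    bearing = np.arctan2(dy, dx)
    if noise_std:
        bearing += np.random.normal(scale=noise_std)  # additive Gaussian noise
    return bearing

# A target diagonal to the sensor gives a 45 degree bearing
b = bearing_measurement((10.0, 10.0), (0.0, 0.0))
print(np.degrees(b))
```

Because the measurement is a single angle, range is unobservable from one detection alone; the sensor platform's motion is what makes the target state observable over time.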
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4) Set up the tracker\nWe now instantiate the filter components, starting with an Extended Kalman predictor.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"predictor = ExtendedKalmanPredictor(transition_model)\n\n# Create an Extended Kalman Updater\nupdater = ExtendedKalmanUpdater(measurement_model=None)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Given the complexity of bearing-only tracking, we feed the\nsame initial state to both the ground truth simulation and the tracker,\nsince Stone Soup does not currently have a bearing-only initiator.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Instantiate the single point initiator\nfrom stonesoup.initiator.simple import SinglePointInitiator\ninitiator = SinglePointInitiator(\n prior_state=initial_target_state,\n measurement_model=meas_model)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Add the hypothesiser components. We use a distance-based hypothesiser with a Mahalanobis\ndistance measure to perform the data association between the detections and the tracks.\nSince we are considering a single-target case, a simple nearest neighbour data associator is sufficient.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Load the hypothesiser and data associator\nfrom stonesoup.hypothesiser.distance import DistanceHypothesiser\nfrom stonesoup.measures import Mahalanobis\n\nhypothesiser = DistanceHypothesiser(predictor, updater,\n measure=Mahalanobis(),\n missed_distance=5)\n\nfrom stonesoup.dataassociator.neighbour import NearestNeighbour\n\ndata_associator = NearestNeighbour(hypothesiser)\n\n# Instantiate the time based deleter\ndeleter = UpdateTimeStepsDeleter(time_steps_since_update=3)\n\n# Build the Kalman tracker\nkalman_tracker = SingleTargetTracker(\n initiator=initiator,\n deleter=deleter,\n detector=sim,\n data_associator=data_associator,\n updater=updater\n)"
]
},
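The Mahalanobis measure scales the residual between a detection and a predicted measurement by the innovation covariance, and hypotheses whose distance exceeds ``missed_distance`` (5 above) are treated as missed detections. A minimal sketch of the distance itself, with illustrative one-dimensional values:

```python
import numpy as np

def mahalanobis(residual, covariance):
    """Mahalanobis distance of a measurement residual given its covariance."""
    return float(np.sqrt(residual.T @ np.linalg.inv(covariance) @ residual))

innovation = np.array([[0.3]])         # bearing residual (radians)
innovation_covar = np.array([[0.01]])  # innovation covariance

d = mahalanobis(innovation, innovation_covar)
print(d)  # inside the gate of 5, so this detection would be associated
```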
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5) Run the simulation and create the plots\nWe now have everything needed to run the simulation: the tracker, the sensor\ndetections and the platform movements.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"kalman_tracks = {}  # Store for plotting later\ngroundtruth_paths = {}  # Store for plotting later\n\n# Loop over the tracker output, storing track and ground truth locations\nfor time, ctracks in kalman_tracker:\n    for track in ctracks:\n        loc = (track.state_vector[0], track.state_vector[2])\n        if track not in kalman_tracks:\n            kalman_tracks[track] = []\n        kalman_tracks[track].append(loc)\n\n    for truth in groundtruth_simulation.current[1]:\n        loc = (truth.state_vector[0], truth.state_vector[2])\n        if truth not in groundtruth_paths:\n            groundtruth_paths[truth] = []\n        groundtruth_paths[truth].append(loc)\n\nfrom stonesoup.plotter import AnimationPlotter\n\nplotter = AnimationPlotter(legend_kwargs=dict(loc='upper left'))\nplotter.plot_ground_truths(groundtruth_paths, (0, 2))\nplotter.plot_tracks(kalman_tracks, (0, 2))\nplotter.plot_ground_truths(platform, (0, 2), truths_label=\"Sensor Platform\")\nplotter.run()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This concludes this short tutorial on how to set up and run a simple single-target\nsimulation using bearings-only measurements obtained by a moving sensor, tracked with an Extended Kalman filter.\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n\n[1] Xiangdong Lin, Thiagalingam Kirubarajan, Yaakov Bar-Shalom, Simon Maskell,\n\"Comparison of EKF, pseudomeasurement, and particle filters for a bearing-only target tracking problem\",\nin Signal and Data Processing of Small Targets 2002, vol. 4728, pp. 240-250. doi:10.1117/12.478508\n\n[2] Vincent J. Aidala, \"Kalman Filter Behavior in Bearing-Only Tracking Applications\",\nIEEE Transactions on Aerospace and Electronic Systems, vol. AES-15, January 1979\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Custom_Pandas_Dataloader.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# Use of Custom Readers that support Pandas DataFrames\nThis is a demonstration of using customised readers that\nsupport data contained within Pandas DataFrames, rather than\nloading directly from a .csv file using :class:`~.CSVGroundTruthReader` or\n:class:`~.CSVDetectionReader`.\n\nThe benefit is that this allows us to use the versatile data loading\ncapabilities of pandas to read from many different data source types\nas needed, including .csv, JSON, XML, Parquet, HDF5, .txt, .zip and more.\nThe resulting DataFrame can then simply be fed into the defined\n`DataFrameGroundTruthReader` or `DataFrameDetectionReader` for further processing\nin Stone Soup as required.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Software dependencies\nBefore beginning this example, you need to ensure that pandas is installed;\npandas is a fast, powerful and flexible open-source data analysis tool for Python.\nThe easiest way to install pandas (if it is not already installed) is to run pip install\nfrom a terminal window within the desired environment:\n\n.. code::\n\n    pip install pandas\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The main dependencies and imports for this example are included below:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import numpy as np\nimport pandas as pd\n\nfrom math import modf\n\nfrom stonesoup.base import Property\nfrom stonesoup.buffered_generator import BufferedGenerator\nfrom stonesoup.reader.base import GroundTruthReader, DetectionReader, Reader\nfrom stonesoup.types.detection import Detection\nfrom stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState\n\nfrom typing import Sequence, Collection\n\nfrom datetime import datetime, timedelta\nfrom dateutil.parser import parse"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### DataFrame Reader\nSimilarly to Stone Soup's :class:`~._CSVFrameReader`, we'll define a `_DataFrameReader`\nclass that inherits from the base :class:`~.Reader` class to read a DataFrame containing\nstate vector fields, a time field, and additional metadata fields (all other columns\nby default). The only difference between this class and the :class:`~._CSVFrameReader`\nclass is that we have no path attribute (the DataFrame is already loaded in memory).\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class _DataFrameReader(Reader):\n    state_vector_fields: Sequence[str] = Property(\n        doc='List of column names to be used in state vector')\n    time_field: str = Property(\n        doc='Name of column to be used as time field')\n    time_field_format: str = Property(\n        default=None, doc='Optional datetime format')\n    timestamp: bool = Property(\n        default=False, doc='Treat time field as a timestamp from epoch')\n    metadata_fields: Collection[str] = Property(\n        default=None, doc='List of columns to be saved as metadata, default all')\n\n    def _get_metadata(self, row):\n        if self.metadata_fields is None:\n            local_metadata = dict(row)\n            for key in list(local_metadata):\n                if key == self.time_field or key in self.state_vector_fields:\n                    del local_metadata[key]\n        else:\n            local_metadata = {field: row[field]\n                              for field in self.metadata_fields\n                              if field in row}\n        return local_metadata\n\n    def _get_time(self, row):\n        if self.time_field_format is not None:\n            time_field_value = datetime.strptime(row[self.time_field], self.time_field_format)\n        elif self.timestamp:\n            fractional, timestamp = modf(float(row[self.time_field]))\n            time_field_value = datetime.utcfromtimestamp(int(timestamp))\n            time_field_value += timedelta(microseconds=fractional * 1E6)\n        else:\n            time_field_value = row[self.time_field]\n\n            if not isinstance(time_field_value, datetime):\n                time_field_value = parse(time_field_value, ignoretz=True)\n\n        return time_field_value"
]
},
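When ``timestamp`` is set, ``_get_time`` splits the float epoch value with ``math.modf`` so that the fractional seconds survive the integer conversion. That step can be sketched in isolation (assuming a UTC epoch value):

```python
from datetime import datetime, timedelta
from math import modf

def epoch_to_datetime(value):
    """Convert float seconds-since-epoch to a datetime, preserving microseconds."""
    fractional, whole = modf(float(value))
    return datetime.utcfromtimestamp(int(whole)) + timedelta(microseconds=fractional * 1e6)

t = epoch_to_datetime(1609459200.250)  # 2021-01-01 00:00:00.250000 UTC
print(t)
```

Converting the whole and fractional parts separately avoids losing sub-second precision when the whole-second value is truncated to an integer.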
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### DataFrame Ground Truth Reader\nWith the help of our `_DataFrameReader` class, we can now define a custom\n`DataFrameGroundTruthReader`. This is similar to :class:`~.CSVGroundTruthReader` and\ninherits from the base `GroundTruthReader` class. A key difference is that we\ninclude an instance attribute for the dataframe containing our data.\n\nWe also define a custom generator function (groundtruth_paths_gen) that uses the decorator\n`@BufferedGenerator.generator_method`. The generator needs to yield a time and the set of\nground truth paths updated at that time, like so:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class DataFrameGroundTruthReader(GroundTruthReader, _DataFrameReader):\n    \"\"\"A custom reader for pandas DataFrames containing truth data.\n\n    The DataFrame must have columns containing all fields needed to generate the\n    ground truth state. States with the same ID will be put into\n    a :class:`~.GroundTruthPath` in sequence, and all paths that are updated at the same time\n    are yielded together; as such, the data is assumed to be in time order.\n\n    Parameters\n    ----------\n    \"\"\"\n    dataframe: pd.DataFrame = Property(doc=\"DataFrame containing the ground truth data.\")\n    path_id_field: str = Property(doc='Name of column to be used as path ID')\n\n    @BufferedGenerator.generator_method\n    def groundtruth_paths_gen(self):\n        \"\"\"Generator method providing each batch of ground truth paths.\"\"\"\n        groundtruth_dict = {}\n        updated_paths = set()\n        previous_time = None\n        for row in self.dataframe.to_dict(orient=\"records\"):\n\n            time = self._get_time(row)\n            if previous_time is not None and previous_time != time:\n                yield previous_time, updated_paths\n                updated_paths = set()\n            previous_time = time\n\n            state = GroundTruthState(np.array([[row[col_name]] for col_name\n                                               in self.state_vector_fields],\n                                              dtype=np.float64),\n                                     timestamp=time,\n                                     metadata=self._get_metadata(row))\n\n            id_ = row[self.path_id_field]\n            if id_ not in groundtruth_dict:\n                groundtruth_dict[id_] = GroundTruthPath(id=id_)\n            groundtruth_path = groundtruth_dict[id_]\n            groundtruth_path.append(state)\n            updated_paths.add(groundtruth_path)\n\n        # Yield remaining\n        yield previous_time, updated_paths"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With our `DataFrameGroundTruthReader` defined, we can test it on a simple example. Let's\ndo a basic 3D simulation to create an example dataframe, from which we can test our class:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \\\n    ConstantVelocity\n\nq_x = 0.05\nq_y = 0.05\nq_z = 0.05\nstart_time = datetime.now()\ntransition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(q_x),\n                                                          ConstantVelocity(q_y),\n                                                          ConstantVelocity(q_z)])\ntruth = GroundTruthPath([GroundTruthState([0, 1, 0, 1, 0, 1], timestamp=start_time)])\n\ntimes = []\nx, y, z = [], [], []\nvel_x, vel_y, vel_z = [], [], []\n\nnum_steps = 25\nfor k in range(1, num_steps + 1):\n\n    time = start_time + timedelta(seconds=k)\n\n    next_state = GroundTruthState(\n        transition_model.function(truth[k-1], noise=True,\n                                  time_interval=timedelta(seconds=1)),\n        timestamp=time)\n    truth.append(next_state)\n\n    times.append(time)\n    x.append(next_state.state_vector[0])\n    vel_x.append(next_state.state_vector[1])\n    y.append(next_state.state_vector[2])\n    vel_y.append(next_state.state_vector[3])\n    z.append(next_state.state_vector[4])\n    vel_z.append(next_state.state_vector[5])\n\ntruth_df = pd.DataFrame({'time': times,\n                         'x': x,\n                         'y': y,\n                         'z': z,\n                         'vel_x': vel_x,\n                         'vel_y': vel_y,\n                         'vel_z': vel_z,\n                         'track_id': 0})\n\ntruth_df.head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the process above is just an example of producing a simple dataframe to use,\nand is not generally something we would need to do (since we already have the GroundTruthPath).\nThe dataframe above is just used to show the workings of our newly defined\n`DataFrameGroundTruthReader`. In practice, we can use any dataframe containing\nour Cartesian positions or longitude and latitude co-ordinates. If we\nare using longitude and latitude inputs, we would also need to transform these\nusing :class:`~.LongLatToUTMConverter` or equivalent.\n\nWe can now initialise our DataFrameGroundTruthReader using this example DataFrame like so:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# read ground truth data from pandas dataframe\nground_truth_reader = DataFrameGroundTruthReader(\n dataframe=truth_df,\n state_vector_fields=['x', 'vel_x', 'y', 'vel_y', 'z', 'vel_z'],\n time_field='time',\n path_id_field='track_id')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's demonstrate the ground truth reader generating output for one iteration:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"next(iter(ground_truth_reader))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another benefit of this ground truth reader is that we now have convenient access to the original\ndataframe, using the .dataframe attribute, like so:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ground_truth_reader.dataframe.head(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### DataFrame Detection Reader\nSimilarly to our `DataFrameGroundTruthReader`, we can develop a custom `DataFrameDetectionReader`\nthat reads DataFrames containing detections by subclassing Stone Soup's\n`DetectionReader` class, along with our custom `_DataFrameReader` class above.\nAgain, this closely resembles the existing `CSVDetectionReader` class within the Stone Soup\nlibrary, except that we include an instance attribute 'dataframe' and modify our detections_gen\nfunction to work with dataframes rather than .csv files. This can be seen below:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class DataFrameDetectionReader(DetectionReader, _DataFrameReader):\n    \"\"\"A custom detection reader for DataFrames containing detections.\n\n    The DataFrame must have columns with the appropriate fields needed to generate\n    the detections. Detections at the same time are yielded together; as such, the data\n    is assumed to be in time order.\n\n    Parameters\n    ----------\n    \"\"\"\n    dataframe: pd.DataFrame = Property(doc=\"DataFrame containing the detection data.\")\n\n    @BufferedGenerator.generator_method\n    def detections_gen(self):\n        detections = set()\n        previous_time = None\n        for row in self.dataframe.to_dict(orient=\"records\"):\n\n            time = self._get_time(row)\n            if previous_time is not None and previous_time != time:\n                yield previous_time, detections\n                detections = set()\n            previous_time = time\n\n            detections.add(Detection(\n                np.array([[row[col_name]] for col_name in self.state_vector_fields],\n                         dtype=np.float64),\n                timestamp=time,\n                metadata=self._get_metadata(row)))\n\n        # Yield remaining\n        yield previous_time, detections"
]
},
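Both readers share the same batching pattern: accumulate states while the timestamp is unchanged, yield the batch when the time changes, and yield whatever remains at the end. Stripped of the Stone Soup types, the logic reduces to this simplified sketch (which, like the readers, assumes the rows are already in time order):

```python
def group_by_time(rows):
    """Yield (time, batch) for consecutive rows sharing the same 'time' value."""
    batch = []
    previous_time = None
    for row in rows:
        time = row['time']
        if previous_time is not None and previous_time != time:
            yield previous_time, batch
            batch = []
        previous_time = time
        batch.append(row)
    if batch:
        yield previous_time, batch  # yield the remaining batch

rows = [{'time': 1, 'x': 0.0}, {'time': 1, 'x': 5.0}, {'time': 2, 'x': 9.0}]
groups = list(group_by_time(rows))
print(groups)
```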
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can instantiate this using our example DataFrame above like so:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"detection_reader = DataFrameDetectionReader(\n dataframe=truth_df,\n state_vector_fields=['x', 'vel_x', 'y', 'vel_y', 'z', 'vel_z'],\n time_field='time')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Following this, we can now perform any desired follow-up task, such as simulation or tracking,\nas covered in the other Stone Soup examples, tutorials and demonstrations. As discussed\npreviously, a key benefit of using a custom DataFrame reader like this is that we can\nread any type of data supported by the pandas library, which gives us a huge range of\noptions. This strategy also saves us the overhead of manually writing a custom Stone\nSoup Reader class for each data format.\n\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}