Note
Go to the end to download the full example code. or to run this example in your browser via Binder
Monte Carlo Tree Search for Autonomous Source Term Estimation
This example demonstrates how to set up and run a Monte Carlo tree search (MCTS) sensor management algorithm for autonomous source term estimation (STE). More details about the problem of autonomous STE can be found in the Autonomous Source Term Estimation example, so this will not be replicated here. Instead, the focus is on MCTS and how to implement it this for the problem.
MCTS is a technique for solving MDP and POMDP problems by simultaneously constructing and evaluating a search tree. The process consists of 4 stages that are iterated until we reach some predefined computational budget. These processes are: Selection, Expansion, Simulation and Backpropagation. The purpose of the algorithm is to arrive at the optimal action policy by sequentially estimating the action value function, \(Q\), and returning the maximal argument to this at the end of the process.
The process is described as follows. Starting from the root node (current state or estimated state) the best child node is selected. The most common way, and the way implemented here, is to select this node according to the upper confidence bound (UCB) for trees. This is given by
where \(a\) is the action, \(h\) is the history (for POMDP problems a
history or belief is commonly used but in MDP problems this would be replaced with a
state), \(Q(h, a)\) is the current cumulative action value estimate,
\(N(h, a)\) is the number of visits or simulations of this node, \(N(h)\)
is the number of visits to the parent node and \(c\) is the exploration factor,
defined with exploration_factor
. The purpose of the UCB is to trade off
between exploitation of the most rewarding nodes in the tree and exploration of
those that have been visited as frequently.
Once the best child node has been selected, this becomes a parent node and a
new child node added according to the available set of unvisited actions. This
selection happens at random. This node is then simulated by predicting the
current state estimate in the parent node and updating this estimate with a
generated detection after applying the candidate action. This provides a
predicted future state which is used to calculate the action value of this
node. This is done by providing a reward_function
. Finally, this
reward is added to the current action value estimated in each node on the
search tree branch that was descended during selection. This creates a
tradeoff between future and immediate rewards during the next iteration
of the search process. Once a predefined computational budget has been
reached, which in this implementation is the niterations
attribute,
the best child to the root node in the tree is determined and returned from
the choose_actions()
. The user can select which criteria used to
select this best action by defining the best_child_policy
.
Further detail on MCTS can be found in the literature, including
variations and alternative POMDP approaches [1].
This example and MCTS implementation is based on the work by Glover et al [2].
The simulation shown here is similar to that work, with some modification
to reduce execution time. The reward implemented for this problem is the
Kullback-Leibler divegence (KLD) using ExpectedKLDivergence
and this will be used with the MCTSRolloutSensorManager
and
benchmarked against a myopic alternative with BruteForceSensorManager
.
Setup
First, some general packages used throughout the example are imported and random number generation is seeded for repeatability.
# General imports and environment setup
import numpy as np
from datetime import datetime, timedelta
np.random.seed(1991)
Generate ground truth
Here we generate the source term ground truth and import the
IsotropicPlume
measurement model to plot the resulting plume
from the ground truth.
from stonesoup.types.groundtruth import GroundTruthState
from stonesoup.models.measurement.gas import IsotropicPlume
start_time = datetime.now()
theta = GroundTruthState([30, # x
40, # y
1, # z
5, # Q
4, # u
np.radians(90), # phi
1, # ci
8], # cii
timestamp=start_time)
measurement_model = IsotropicPlume()
Plot the resulting plume from the ground truth source term. This uses
the Plotter
class from Stone Soup.
from stonesoup.plotter import Plotter
from stonesoup.types.state import StateVector
plotter = Plotter()
z = 0
n_values = 200
intensity = np.zeros([n_values, n_values])
pos_x = np.zeros([n_values])
pos_y = np.zeros([n_values])
for n_x, x in enumerate(np.linspace(0, 50, n_values)):
pos_x[n_x] = x
for n_y, y in enumerate(np.linspace(0, 50, n_values)):
pos_y[n_y] = y
pos = StateVector([x, y, z])
measurement_model.translation_offset = pos
intensity[n_x, n_y] = measurement_model.function(theta)
plotter.ax.set_xlim(left=0, right=50)
plotter.ax.set_ylim(bottom=0, top=50)
plotter.ax.set_box_aspect(1)
gas_distribution = plotter.ax.pcolor(pos_x, pos_y, intensity.T)
plotter.fig.colorbar(gas_distribution, label='Concentration')
/home/docs/checkouts/readthedocs.org/user_builds/stonesoup/checkouts/stable/docs/examples/sensormanagement/Monte_Carlo_Tree_Search_for_Autonomous_Source_Term_Estimation.py:123: DeprecationWarning:
Conversion of an array with ndim > 0 to a scalar is deprecated, and will error in future. Ensure you extract a single element from your array before performing this operation. (Deprecated NumPy 1.25.)
<matplotlib.colorbar.Colorbar object at 0x7f1f58b0df70>
Create sensors and platforms
The sensor used here is a GasIntensitySensor
that provides
point concentration measurements. The documentation for the sensor provides
insight into the various noise parameters used here.
The sensor alone is not controllable and therefore will be mounted onto an
actionable platform with the NStepDirectionalGridMovable
movement
controller, that allows movement in the \(xy\) plane in fixed step sizes. The
resulting trajectory from a controller of this type can be seen later in the
results.
The gas_sensorA and sensor_platformA is to be used with the benchmark algorithm and gas_sensorB and sensor_platformB is to be used with the MCTS sensor manager.
from stonesoup.types.state import State
from stonesoup.platform import FixedPlatform
from stonesoup.movable.grid import NStepDirectionalGridMovable
from stonesoup.sensor.gas import GasIntensitySensor
gas_sensorA = GasIntensitySensor(min_noise=1e-4,
missed_detection_probability=0.3,
sensing_threshold=5e-4)
sensor_platformA = FixedPlatform(
movement_controller=NStepDirectionalGridMovable(states=[State([[5], [15], [0.]],
timestamp=start_time)],
position_mapping=(0, 1, 2),
resolution=2,
n_steps=1,
step_size=1,
action_mapping=(0, 1),
action_space=np.array([[0, 50], [0, 50]])),
sensors=[gas_sensorA])
gas_sensorB = GasIntensitySensor(min_noise=1e-4,
missed_detection_probability=0.3,
sensing_threshold=5e-4)
sensor_platformB = FixedPlatform(
movement_controller=NStepDirectionalGridMovable(states=[State([[5], [15], [0.]],
timestamp=start_time)],
position_mapping=(0, 1, 2),
resolution=2,
n_steps=1,
step_size=1,
action_mapping=(0, 1),
action_space=np.array([[0, 50], [0, 50]])),
sensors=[gas_sensorB])
Create particle predictor and updater
Now the ParticlePredictor
and ParticleUpdater
are
constructed. The particle predictor will be created with a RandomWalk
motion model with 0 magnitude, meaning that the predictor will not change
the estimated source term.
The ParticleUpdater
is created with an effective sample size
resampling technique (ESSResampler
) and Markov Chain Monte Carlo
regularisation (MCMCRegulariser
). A constraint function is also
provided to the ParticleUpdater
and MCMCRegulariser
to prevent invalid states from being generated.
from stonesoup.resampler.particle import ESSResampler
from stonesoup.regulariser.particle import MCMCRegulariser
from stonesoup.predictor.particle import ParticlePredictor
from stonesoup.updater.particle import ParticleUpdater
from stonesoup.models.transition.linear import RandomWalk, CombinedGaussianTransitionModel
def constraint_function(particle_state):
logical_indx = ((particle_state.state_vector[3, :]<0) |
(particle_state.state_vector[4, :]<0) |
(particle_state.state_vector[6, :]<0) |
(particle_state.state_vector[7, :]<0))
return logical_indx
resampler = ESSResampler()
regulariser = MCMCRegulariser(constraint_func=constraint_function)
predictor = ParticlePredictor(CombinedGaussianTransitionModel([RandomWalk(0.0)]*8))
updater = ParticleUpdater(measurement_model,
resampler=resampler,
regulariser=regulariser,
constraint_func=constraint_function)
Create reward functions and sensor managers
Both the myopic benchmark and MCTS algorithms will be using the KLD
reward function (ExpectedKLDivergence
) but two separate
instances are created as the MCTS version will be required to return
tracks to store states in the search tree as it is constructed. Both
are created with a separate ParticleUpdater
that does
not perform resampling or regularisation.
The BruteForceSensorManager
is defined in the usual way,
but MCTSRolloutSensorManager
requires some more arguments.
niterations
defines the computational budget for MCTS,
exploration_factor
controls how exploratory the tree search
behaviour will be, discount_factor
discounts future rewards
calculated in the rollout to reflect increasing uncertainty of rewards
with increasing future depth, rollout_depth
controls the
rollout horizon and best_child_policy
determines how to select
the best child at the end of the MCTS process. Choices include maximum
action value ('max_cumulative_reward'
), average action value per
visit ('max_average_reward'
) and maximum number of visits
('max_visits'
).
from stonesoup.sensormanager.reward import ExpectedKLDivergence
from stonesoup.sensormanager import BruteForceSensorManager
from stonesoup.sensormanager.tree_search import MCTSRolloutSensorManager
reward_updater = ParticleUpdater(measurement_model=None)
# Myopic benchmark approach
reward_funcA = ExpectedKLDivergence(updater=reward_updater, measurement_noise=True)
sensormanagerA = BruteForceSensorManager(sensors={gas_sensorA},
platforms={sensor_platformA},
reward_function=reward_funcA)
# MCTS with rollout approach
reward_funcB = ExpectedKLDivergence(updater=reward_updater,
measurement_noise=True,
return_tracks=True)
sensormanagerB = MCTSRolloutSensorManager(sensors={gas_sensorB},
platforms={sensor_platformB},
reward_function=reward_funcB,
niterations=100,
exploration_factor=0.05,
discount_factor=0.9,
rollout_depth=5,
best_child_policy='max_cumulative_reward')
Create prior distribution
Each component of the source term will receive its own distribution. Source location received a uniform distribution across the environment, release rate received a Gamma distribution and the wind speed, direction and diffusivity parameters all receive normal distributions. The prior for both myopic and MCTS algorithms will be identical.
from stonesoup.types.state import StateVectors, ParticleState
from stonesoup.types.track import Track
import copy
n_parts = 10000
priorA = ParticleState(StateVectors([np.random.uniform(0, 50, n_parts),
np.random.uniform(0, 50, n_parts),
np.random.uniform(0, 5, n_parts),
np.random.gamma(2, 5, n_parts),
np.random.normal(theta.state_vector[4],
2,
n_parts),
np.random.normal(theta.state_vector[5],
np.radians(10),
n_parts),
np.random.normal(theta.state_vector[6],
0.1,
n_parts),
np.random.normal(theta.state_vector[7],
1,
n_parts)]),
weight=np.array([1/n_parts]*n_parts),
timestamp=start_time)
priorA.parent = priorA
priorB = copy.copy(priorA)
Run the myopic sensor manager
First the myopic sensor manager and associated filter is run over 100 iterations, or until it reaches a stopping criteria defined as the square root of the covariance trace being below a threshold value. It is important to note that this has no bearing on the performance of the estimation, just the convergence of the particle distribution.
from stonesoup.types.hypothesis import SingleHypothesis
n_iter = 100
stopping_criteria = 3
trackA = Track(priorA)
sensorA_x = [sensor_platformA.position[0]]
sensorA_y = [sensor_platformA.position[1]]
for n in range(n_iter):
time = (start_time + timedelta(seconds=n+1))
if n > 0:
chosen_actions = sensormanagerA.choose_actions({trackA}, time, n_steps=2, step_size=2,
action_mapping=(0, 1))
for sensor, actions in chosen_actions[0].items():
sensor.add_actions(actions)
sensor.act(time)
prediction = predictor.predict(priorA, timestamp=time)
theta.timestamp=time
detection = (gas_sensorA.measure({theta}, noise=True).pop())
hypothesis = SingleHypothesis(prediction, detection)
update = updater.update(hypothesis)
trackA.append(update)
priorA = trackA[-1]
sensorA_x.append(sensor_platformA.position[0])
sensorA_y.append(sensor_platformA.position[1])
if np.sqrt(np.trace(trackA.state.covar)) < stopping_criteria:
print('Converged in {} iterations'.format(n))
break
Plot the myopic result
The performance of the myopic benchmark approach is illustrated with an animation showing the sensor platform trajectory, detections along this trajectory and the resulting source estimate with the particle distributions.
from matplotlib import animation
import matplotlib
matplotlib.rcParams['animation.html'] = 'jshtml'
matplotlib.rcParams['animation.embed_limit'] = 40000000
# Plot ground truth plume
plotterA = Plotter()
plotterA.ax.set_xlim(left=0, right=50)
plotterA.ax.set_ylim(bottom=0, top=50)
plotterA.ax.set_box_aspect(1)
gas_distributionA = plotterA.ax.pcolor(pos_x, pos_y, intensity.T)
plotterA.fig.colorbar(gas_distributionA, label='Concentration')
plotterA.ax.set_title('Myopic KLD Sensor Management Result')
# Plot platform start location
plotterA.ax.plot(sensor_platformA.movement_controller.states[0].state_vector[0],
sensor_platformA.movement_controller.states[0].state_vector[1],
'go',
label='Start Location')
# Plot initial particles and store line object for setting later
partsA, = plotterA.ax.plot(trackA[0].state_vector[0],
trackA[0].state_vector[1],
'g.',
markersize=0.5,
linewidth=0,
label='Particles')
# Plot scatter of detection locations, intially with large marker for the legend
detectionsA = plotterA.ax.scatter(sensorA_x,
sensorA_y,
s=np.array([10]*len(sensorA_x)),
c='r',
linewidth=0,
label='Detections')
# Plot first sensor position for constructing trajectory, storing the line object for setting later
trajectoryA, = plotterA.ax.plot(sensor_platformA.movement_controller.states[0].state_vector[0],
sensor_platformA.movement_controller.states[0].state_vector[0],
'r-',
label='Sensor Path')
plotterA.ax.legend(loc='center left', bbox_to_anchor=(-0.5, 0.5))
# Reset the marker sizes after creating legend
detectionsA.set_sizes(np.array([0]*len(sensorA_x)))
def anim_funcA(i):
# Update particle line object according to latest track
partsA.set_data(trackA[i].state_vector[0], trackA[i].state_vector[1])
states = np.array([[sensorA_x[0]], [sensorA_y[0]], [0]])
detection_marker_sizes = np.zeros(len(trackA))
for n in range(i):
# Collect states upto timestep i
states = np.append(states, np.array([[sensorA_x[n+1]], [sensorA_y[n+1]], [0]]), axis=1)
# Collect detections upto timestep i
if hasattr(trackA[n+1], 'hypothesis'):
detection_marker_sizes[n+1] = (
trackA[n+1].hypothesis.measurement.state_vector[0][0]*3000)
# Update trajectory data and detection scatter marker sizes
trajectoryA.set_data(states[0], states[1])
detectionsA.set_sizes(detection_marker_sizes)
animation.FuncAnimation(plotterA.fig, anim_funcA, interval=100, frames=len(trackA))