Source code for stonesoup.architecture.generator

import copy
from datetime import datetime
import random

import networkx as nx
import numpy as np

from stonesoup.architecture import SensorNode, FusionNode, Edge, Edges, InformationArchitecture, \
    NetworkArchitecture
from stonesoup.architecture.edge import FusionQueue
from stonesoup.architecture.node import SensorFusionNode, RepeaterNode
from stonesoup.base import Base, Property
from stonesoup.feeder.track import Tracks2GaussianDetectionFeeder
from stonesoup.sensor.sensor import Sensor
from stonesoup.tracker import Tracker


class InformationArchitectureGenerator(Base):
    """
    Convenience class that can be used to generate one or multiple
    :class:`~.InformationArchitecture`s given a set of input parameters. The graph is generated
    randomly subject to the parameters such as `node_ratio` and `mean_degree`, rather than having
    to be user-defined.

    A single random edge list is drawn per call to :meth:`generate`. Each of the `n_archs`
    architectures is then built on that same edge list, using its own deep copies of
    `base_sensor` and `base_tracker`.
    """
    arch_type: str = Property(
        doc="Type of architecture to be modelled. Currently only 'hierarchical' and "
            "'decentralised' are supported.",
        default='decentralised')
    start_time: datetime = Property(
        doc="Start time of simulation to be passed to the Architecture class.",
        default_factory=datetime.now)
    node_ratio: tuple = Property(
        doc="Tuple containing the number of each type of node, in the order of (sensor nodes, "
            "sensor fusion nodes, fusion nodes).",
        default=None)
    mean_degree: float = Property(
        doc="Average (mean) degree of nodes in the network.",
        default=None)
    base_sensor: Sensor = Property(
        doc="Sensor class object that will be duplicated to create multiple sensors. Position of "
            "this sensor is used with 'sensor_max_distance' to calculate a position for "
            "duplicated sensors.",
        default=None)
    sensor_max_distance: tuple = Property(
        doc="Max distance each sensor can be from base_sensor.position. Should be a tuple of "
            "length equal to len(base_sensor.position_mapping)",
        default=None)
    base_tracker: Tracker = Property(
        doc="Tracker class object that will be duplicated to create multiple trackers. "
            "Should have detector=None.",
        default=None)
    iteration_limit: int = Property(
        doc="Limit for the number of iterations the generate_edgelist() method can make when "
            "attempting to build a suitable graph.",
        default=10000)
    allow_invalid_graph: bool = Property(
        doc="Bool where True allows invalid graphs to be returned without throwing an error. "
            "False by default",
        default=False)
    n_archs: int = Property(
        doc="How many architectures should be generated.",
        default=2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # node_ratio is ordered (sensor nodes, sensor fusion nodes, fusion nodes).
        self.n_nodes = sum(self.node_ratio)
        self.n_sensor_nodes = self.node_ratio[0]
        self.n_fusion_nodes = self.node_ratio[2]
        self.n_sensor_fusion_nodes = self.node_ratio[1]

        # Every edge contributes to the degree of two nodes, hence the factor of 0.5.
        self.n_edges = int(np.ceil(self.n_nodes * self.mean_degree * 0.5))

        # With no maximum distance given, every duplicated sensor sits at the base position.
        if self.sensor_max_distance is None:
            self.sensor_max_distance = tuple(np.zeros(len(self.base_sensor.position_mapping)))

        if self.arch_type not in ['decentralised', 'hierarchical']:
            raise ValueError('arch_type must be "decentralised" or "hierarchical"')

    def generate(self):
        """
        Generate one or more InformationArchitecture objects based on the generator's parameters.

        Returns
        -------
        list
            List of generated InformationArchitecture objects.

        Raises
        ------
        RuntimeError
            If no valid graph is found within `iteration_limit` attempts and
            `allow_invalid_graph` is False.
        ValueError
            If a decentralised graph is requested with more edges than there are distinct pairs
            of nodes available.
        """
        edgelist, node_labels = self._generate_edgelist()
        nodes = self._assign_nodes(node_labels)
        return [self._generate_architecture(arch_nodes, edgelist)
                for arch_nodes in nodes.values()]

    def _generate_architecture(self, nodes, edgelist):
        """Build an InformationArchitecture from a ``{label: node}`` mapping and an edge list of
        ``(source_label, target_label)`` tuples."""
        arch_edges = Edges([Edge((nodes[source], nodes[target])) for source, target in edgelist])
        return InformationArchitecture(arch_edges, self.start_time)

    def _copy_tracker(self):
        """Deep copy `base_tracker` and feed it from a new FusionQueue; return both."""
        tracker = copy.deepcopy(self.base_tracker)
        fusion_queue = FusionQueue()
        tracker.detector = Tracks2GaussianDetectionFeeder(fusion_queue)
        return tracker, fusion_queue

    def _random_sensor_position(self):
        """Return the base sensor position with each component offset uniformly at random by at
        most the matching entry of `sensor_max_distance`."""
        return np.array(
            [[p + random.uniform(-d, d)]
             for p, d in zip(self.base_sensor.position, self.sensor_max_distance)])

    def _assign_nodes(self, node_labels):
        """
        Create a node object for every label, once per architecture.

        The node type is chosen from the label prefix: ``f`` gives a FusionNode, ``sf`` a
        SensorFusionNode, ``s`` a SensorNode and anything else a RepeaterNode.

        Returns
        -------
        dict
            ``{architecture index: {label: node}}``
        """
        nodes = {architecture: {} for architecture in range(self.n_archs)}

        for label in node_labels:
            if label.startswith('f'):
                for architecture in range(self.n_archs):
                    tracker, fusion_queue = self._copy_tracker()
                    nodes[architecture][label] = FusionNode(
                        tracker=tracker, fusion_queue=fusion_queue, label=label, latency=0)

            elif label.startswith('sf'):
                # One position per label, so the sensor is co-located across architectures.
                pos = self._random_sensor_position()
                for architecture in range(self.n_archs):
                    sensor = copy.deepcopy(self.base_sensor)
                    sensor.position = pos
                    tracker, fusion_queue = self._copy_tracker()
                    nodes[architecture][label] = SensorFusionNode(
                        sensor=sensor, tracker=tracker, fusion_queue=fusion_queue,
                        label=label, latency=0)

            elif label.startswith('s'):
                pos = self._random_sensor_position()
                for architecture in range(self.n_archs):
                    sensor = copy.deepcopy(self.base_sensor)
                    sensor.position = pos
                    nodes[architecture][label] = SensorNode(
                        sensor=sensor, label=label, latency=0)

            else:
                for architecture in range(self.n_archs):
                    nodes[architecture][label] = RepeaterNode(label=label, latency=0)

        return nodes

    def _generate_edgelist(self):
        """
        Randomly draw edge lists until one passes the validity check.

        At most `iteration_limit` attempts are made. Every attempt starts from an empty edge
        list, so edges from a rejected attempt never leak into the next one.

        Returns
        -------
        tuple
            ``(edges, nodes)`` where `edges` is a list of ``(source_label, target_label)`` tuples
            and `nodes` is the list of node labels, ordered fusion, sensor fusion, sensor.

        Raises
        ------
        RuntimeError
            If no valid edge list was found and `allow_invalid_graph` is False.
        """
        nodes = ['f' + str(i) for i in range(self.n_fusion_nodes)] + \
                ['sf' + str(i) for i in range(self.n_sensor_fusion_nodes)] + \
                ['s' + str(i) for i in range(self.n_sensor_nodes)]

        if self.arch_type == 'hierarchical':
            draw_edges = self._draw_hierarchical_edges
        else:
            draw_edges = self._draw_decentralised_edges

        edges = []
        for _ in range(self.iteration_limit):
            edges = draw_edges(nodes)
            if self._fusion_nodes_have_input(edges):
                return edges, nodes

        if self.allow_invalid_graph:
            # Hand back the last attempt even though it failed the validity check.
            return edges, nodes

        raise RuntimeError(
            f"Unable to generate a valid {self.arch_type} graph within "
            f"{self.iteration_limit} iterations. Adjust node_ratio or mean_degree, or set "
            f"allow_invalid_graph=True.")

    def _draw_hierarchical_edges(self, nodes):
        """Draw one candidate edge list in which each node after the first has exactly one
        outgoing edge."""
        n_recipients = self.n_fusion_nodes + self.n_sensor_fusion_nodes
        edges = []
        for i, node in enumerate(nodes):
            if i == 0:
                continue
            if i == 1:
                # NOTE(review): this edge runs from the first node to the second, whereas every
                # other edge runs from a node to an earlier one - confirm this is intended.
                edge = (nodes[0], nodes[1])
            elif node.startswith('s') and not node.startswith('sf'):
                # Pure sensor nodes may report to any fusion-capable node.
                edge = (node, nodes[random.randint(0, n_recipients - 1)])
            else:
                # Fusion-capable nodes report to a node earlier in the list.
                edge = (node, nodes[random.randint(0, i - 1)])
            edges.append(edge)
        return edges

    def _draw_decentralised_edges(self, nodes):
        """
        Draw one candidate edge list containing `n_edges` edges.

        The first ``n_nodes - 1`` edges link each node to an earlier, non-sensor node. Any
        remaining edges are drawn at random between node pairs that are not yet linked in
        either direction, always targeting a non-sensor node.

        Raises
        ------
        ValueError
            If `n_edges` exceeds the number of distinct node pairs that can be linked.
        """
        n_recipients = len(nodes) - self.n_sensor_nodes
        last_recipient = n_recipients - 1

        # Distinct unordered pairs with a non-sensor target: pairs among the non-sensor nodes
        # plus every sensor/non-sensor pairing. Asking for more would make the redraw loop below
        # spin forever.
        max_edges = n_recipients * (n_recipients - 1) // 2 + self.n_sensor_nodes * n_recipients
        if self.n_edges > max_edges:
            raise ValueError(
                f"mean_degree requires {self.n_edges} edges but at most {max_edges} distinct "
                f"edges are possible for node_ratio {self.node_ratio}")

        edges = []
        for i in range(1, self.n_edges + 1):
            if i < self.n_nodes:
                source = nodes[i]
                target = nodes[random.randint(0, min(i - 1, last_recipient))]
            else:
                # Sentinel values force at least one draw.
                source = target = -1
                while source == target \
                        or (source, target) in edges \
                        or (target, source) in edges:
                    source = nodes[random.randint(0, len(nodes) - 1)]
                    target = nodes[random.randint(0, last_recipient)]
            edges.append((source, target))
        return edges

    def _fusion_nodes_have_input(self, edges):
        """Return True if every fusion node ('f' label) has at least one incoming edge."""
        graph = nx.DiGraph(edges)
        fusion_labels = ['f' + str(i) for i in range(self.n_fusion_nodes)]
        # A fusion node that appears in no edge is absent from the graph; treat it as invalid
        # rather than querying the degree of a missing node.
        return all(label in graph and graph.in_degree(label) > 0 for label in fusion_labels)
class NetworkArchitectureGenerator(InformationArchitectureGenerator):
    """
    Convenience class that can be used to generate one or multiple
    :class:`~.NetworkArchitecture`s given a set of input parameters. The graph is generated
    randomly subject to the parameters such as `node_ratio` and `mean_degree`, rather than having
    to be user-defined.

    Each edge of the randomly generated information graph is replaced by one or more two-hop
    routes, each passing through its own repeater node.
    """
    n_routes: tuple = Property(
        doc="Tuple containing a minimum and maximum value for the number of routes created in the "
            "network architecture to represent a single edge in the information architecture.",
        default=(1, 2))

    def generate(self):
        """
        Generate one or more NetworkArchitecture objects based on the generator's parameters.

        Returns
        -------
        list
            List of generated NetworkArchitecture objects.
        """
        info_edges, labels = self._generate_edgelist()
        network_edges, labels = self._add_network(info_edges, labels)
        nodes_by_arch = self._assign_nodes(labels)

        archs = []
        for arch_nodes in nodes_by_arch.values():
            archs.append(self._generate_architecture(arch_nodes, network_edges))
        return archs

    def _add_network(self, edgelist, nodes):
        """
        Expand every information edge into routes that pass through repeater nodes.

        Repeater labels ('r0', 'r1', ...) are appended to `nodes` in place, and that same list
        is returned alongside the new edge list.
        """
        network_edgelist = []
        repeater_index = 0

        for e in edgelist:
            source, target = e[0], e[1]

            # Choose number of routes between two information architecture nodes
            if len(self.n_routes) == 1:
                n = self.n_routes[0]
            else:
                n = random.randint(self.n_routes[0], self.n_routes[1])

            for _ in range(n):
                repeater = 'r' + str(repeater_index)
                repeater_index += 1
                network_edgelist.append((source, repeater))
                network_edgelist.append((repeater, target))
                nodes.append(repeater)

        return network_edgelist, nodes

    def _generate_architecture(self, nodes, edgelist):
        """Build a NetworkArchitecture from a ``{label: node}`` mapping and an edge list of
        ``(source_label, target_label)`` tuples."""
        arch_edges = Edges([Edge((nodes[t[0]], nodes[t[1]])) for t in edgelist])
        return NetworkArchitecture(arch_edges, self.start_time)