Source code for stonesoup.types.graph

import itertools
from collections.abc import Iterable

import numpy as np

try:
    import geopandas
    import networkx as nx
    from shapely import LineString
except ImportError as error:
    raise ImportError(
        "Usage of the road networks classes requires that the optional package dependencies "
        "'geopandas' and 'networkx' are installed. This can be achieved by running "
        "'python -m pip install stonesoup[roadnet]'") from error


[docs] class RoadNetwork(nx.DiGraph): """Road network type A road network is a directed graph, where nodes represent intersections and edges represent roads. Nodes must have a ``'pos'`` attribute, which is a tuple of (x, y) coordinates. Edges must have a ``'weight'`` attribute, which is a float representing the weight of the edges (e.g. length). The underlying data structure is a :class:`networkx.DiGraph`, with some limitations on the allowed node and edge attributes, and some additional methods. .. note:: The current implementation is not optimised for continuous updates of the graph structure. More importantly, although possible, it is not advisable to interleave structure update calls (e.g. :meth:`add_edge`) with query calls (e.g. :meth:`shortest_path`). Instead, the intended use is to construct the network once and then perform repeated queries. Parameters ---------- """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._edge_list = [] self._short_paths_cache = {} @property def edge_list(self): if self._edge_list is None: self._edge_list = np.array(list(self.edges.keys())) return self._edge_list
[docs] def add_node(self, n, **attr): """Add a single node n and update node attributes. Parameters ---------- n : int A node identifier attr Node attributes to update. Must contain a ``'pos'`` attribute, that is a tuple of (x, y) coordinates. """ if not isinstance(n, int) or n <= 0: raise TypeError("Road network nodes must be positive integers") if "pos" not in attr: raise ValueError("Road network nodes must have a 'pos' attribute") super().add_node(n, **attr)
[docs] def add_nodes_from(self, nodes, **attr): """Add multiple nodes. Parameters ---------- nodes : Union[Sequence[int], Sequence[Tuple[int, Dict]]] A container of nodes to be added. Can be a list of node identifiers, or a list of tuples of (node identifier, node attributes dict). attr Update attributes for all nodes in nodes. Node attributes specified in nodes as a tuple take precedence over attributes specified via keyword arguments. """ for n in nodes: if isinstance(n, int): self.add_node(n, **attr) elif isinstance(n, tuple): self.add_node(n[0], **{**attr, **n[1]}) else: raise ValueError("Invalid node format. Elements of nodes must be integers, or " "tuples of (int, dict)")
[docs] def remove_node(self, n): """Remove node n. Parameters ---------- n : int A node identifier """ super().remove_node(n)
[docs] def remove_nodes_from(self, nodes): """Remove multiple nodes. Parameters ---------- nodes : Sequence[int] A sequence of node identifiers """ for n in nodes: self.remove_node(n)
[docs] def add_edge(self, u, v, **attr): """Add an edge between nodes u and v. Parameters ---------- u : int Start node identifier v : int End node identifier attr Edge attributes to update. Must contain a ``'weight'`` attribute, that is a float. """ if "weight" not in attr: raise ValueError("Road network edges must have a 'weight' attribute that is a float") super().add_edge(u, v, **attr) self._edge_list = None self._short_paths_cache = {}
[docs] def add_edges_from(self, ebunch, **kwargs): """Add multiple edges. Parameters ---------- ebunch : Sequence[Tuple[int, int, Dict]] An iterable of edges. Edges must be specified as tuples (u, v, d) where d is a dict of edge attributes (must contain a ``'weight'`` attribute that is a float). kwargs Update attributes for all edges in ebunch. Edge attributes specified in ebunch as a tuple take precedence over attributes specified via keyword arguments. """ for e in ebunch: u, v = e[0:2] attr = e[2] attr.update(kwargs) self.add_edge(u, v, **attr)
[docs] def remove_edge(self, u, v): """ Remove an edge between nodes u and v. Parameters ---------- u : int Start node identifier v : int End node identifier """ super().remove_edge(u, v) self._edge_list = None self._short_paths_cache = {}
[docs] def remove_edges_from(self, ebunch): """Remove multiple edges. Parameters ---------- ebunch : Sequence[Tuple[int, int]] An iterable of edges. Edges must be specified as tuples (u, v) where u and v are node identifiers. """ for e in ebunch: u, v = e self.remove_edge(u, v)
[docs] def update(self, edges=None, nodes=None): """ Not implemented Raises ------ NotImplementedError This method is not implemented for :class:`~.RoadNetwork`. """ raise NotImplementedError("RoadNetwork does not support update")
[docs] def clear(self): """Remove all nodes and edges from the graph.""" super().clear() self._edge_list = None self._short_paths_cache = {}
[docs] def clear_edges(self): """Remove all edges from the graph without altering nodes.""" super().clear_edges() self._edge_list = None self._short_paths_cache = {}
[docs] def to_gdf(self, **kwargs): """Convert the road network to a GeoDataFrame. The GeoDataFrame has the following columns: - 'geometry': LineString A LineString representing the edge geometry. - 'weight': float The edge weight. - 'from_node': int The node identifier of the start node. - 'to_node': int The node identifier of the end node. """ d = {'geometry': [], 'weight': [], 'from_node': [], 'to_node': []} for e in self.edges: d['geometry'].append( LineString([self.nodes[e[0]]['pos'], self.nodes[e[1]]['pos']]) ) d['weight'].append(self.edges[e]['weight']) d['from_node'].append(e[0]) d['to_node'].append(e[1]) gdf = geopandas.GeoDataFrame(d, **kwargs) return gdf
[docs] @classmethod def from_dict(cls, dct): """Create a RoadNetwork from a dictionary. Parameters ---------- dct : dict A dictionary with keys ``'nodes'`` and ``'edges'``. The value of ``'nodes'`` is a dictionary mapping node identifiers to a dictionary of node attributes. The value of ``'edges'`` is a dictionary mapping edge identifiers to edge attributes. Returns ------- RoadNetwork A road network object Examples -------- >>> dct = {'nodes': {0: {'pos': (0, 0)}, 1: {'pos': (1, 0)}, ... 2: {'pos': (0, 1)}, 3: {'pos': (1, 1)}}, ... 'edges': {(0, 1): {'weight': 1}, (0, 2): {'weight': 1}, ... (1, 3): {'weight': 1}, (2, 3): {'weight': 1}}} >>> net = RoadNetwork.from_dict(dct) """ # Create empty graph object net = RoadNetwork() # Add nodes to graph for node, attr in dct['nodes'].items(): net.add_node(node, **attr) # Add edges to graph for edge, attr in dct['edges'].items(): net.add_edge(*edge, **attr) return net
[docs] def shortest_path(self, source=None, target=None, weight='weight', method='dijkstra', path_type='node'): """Compute the shortest path(s) between source and target. This method is a wrapper around the NetworkX :func:`networkx.shortest_path` function, that adds some additional functionality: - While the NetworkX function only allows to compute the shortest path(s) between a single source and a single target, this method allows to compute the shortest path(s) between a source node (or a list of source nodes) and a target node (or a list of target nodes). - The method allows to specify the type of path to return. By default, the method returns a list of node identifiers (same as NetworkX). However, it is also possible to return a list of edges, or even both types in a single call. - When a path is computed, it is stored in a cached dictionary. When possible, the method will attempt to retrieve the shortest path(s) from this cache, to improve performance. Parameters ---------- source : int or Iterable[int] or None A node identifier, or an iterable of node identifiers. If `None`, the shortest path(s) from all nodes to the specified target node(s) are computed. target : int or Iterable[int] or None A node identifier, or an iterable of node identifiers. If `None`, the shortest path(s) to all nodes from the specified source node(s) are computed. weight : str The edge attribute to use as edge weights. Attribute values must be floats. Default is ``'weight'``. method : str The method to use to compute the shortest path(s). Must be one of ``'dijkstra'`` or ``'bellman-ford'``. Default is ``'dijkstra'``. path_type : str The type of path to return. Must be one of ``'node'``, ``'edge'`` or ``'both'``. If ``'node'``, each shortest path is a list of node identifiers. If ``'edge'``, each shortest path is a list of edges. If ``'both'``, then both types of paths are returned (see bellow). Default is ``'node'``. Returns ------- dict The shortest path(s) between source and target. The format depends on the value of `path_type`: - If ``path_type`` is ``'node'``, return a dict whose keys are tuples (source, target) and values are lists of node identifiers. - If ``path_type`` is ``'edge'``, return a dict whose keys are tuples (source, target) and values are lists of edge identifiers. - If ``path_type`` is ``'both'``, return a dict with keys ``'node'`` and ``'edge'``, where the values follow the same format as above. """ # Source and target must be lists, or None source = [source] if (source is not None and not isinstance(source, Iterable)) else source target = [target] if (target is not None and not isinstance(target, Iterable)) else target # First attempt to get the shortest path(s) from cache try: return self._get_shortest_path_from_cache(source, target, weight, method, path_type) except KeyError: pass # If retrieving the shortest path(s) from cache failed, proceed to compute them short_paths = self._compute_short_paths(source, target, weight, method) # Update cache self._update_short_paths_cache(short_paths, weight, method) # Return the requested path(s) return self._format_shortest_paths_output(short_paths, path_type)
def _get_shortest_path_from_cache(self, source=None, target=None, weight='weight', method='dijkstra', path_type='both'): short_paths = { 'node': dict(), 'edge': dict() } if source is None: source = self.nodes if target is None: target = self.nodes # Attempt to get the requested path(s) from cache try: for s, t in itertools.product(source, target): if s == t: continue if path_type == 'both' or path_type == 'node': short_paths['node'][(s, t)] = \ self._short_paths_cache[method][weight]['node'][(s, t)] if path_type == 'both' or path_type == 'edge': short_paths['edge'][(s, t)] = \ self._short_paths_cache[method][weight]['edge'][(s, t)] except KeyError as e: raise KeyError('Shortest path not found for the given source and target') from e # Return the requested path(s) return self._format_shortest_paths_output(short_paths, path_type) def _compute_short_paths(self, source, target, weight, method): paths = dict() if source is None and target is None: # Compute all shortest paths, from all nodes, to all nodes paths = nx.shortest_path(self, weight=weight, method=method) if not isinstance(paths, dict): # Fix for NetworkX 3.5+, where shortest_path returns an iterator of tuples # instead of a dictionary paths = {p[0]: p[1] for p in paths} elif source is None or target is None: # If source or target is None, compute the shortest paths from/to the given nodes iter_ = source if source is not None else target for x in iter_: extra_kwargs = {'source': x} if source is not None else {'target': x} x_paths = nx.shortest_path(self, weight=weight, method=method, **extra_kwargs) for y, node_path in x_paths.items(): if x == y: continue s = x if source is not None else y t = y if source is not None else x try: paths[s][t] = node_path except KeyError: paths[s] = {t: node_path} else: # Compute shortest paths from each source to each target for s, t in itertools.product(source, target): try: path = nx.shortest_path(self, source=s, target=t, weight=weight, method=method) except nx.NetworkXNoPath: continue try: paths[s][t] = path except KeyError: paths[s] = {t: path} # Convert computed paths to node and edge paths short_paths = { 'node': dict(), 'edge': dict() } edges_index = {edge: i for i, edge in enumerate(self.edges)} for s, dct in paths.items(): for t, node_path in dct.items(): if s == t: continue path_edges = zip(node_path, node_path[1:]) edge_path = [edges_index[edge] for edge in path_edges] short_paths['node'][(s, t)], short_paths['edge'][(s, t)] = (node_path, edge_path) return short_paths def _update_short_paths_cache(self, paths, weight='weight', method='dijkstra'): if method not in self._short_paths_cache: self._short_paths_cache[method] = dict() if weight not in self._short_paths_cache[method]: self._short_paths_cache[method][weight] = {'node': dict(), 'edge': dict()} self._short_paths_cache[method][weight]['node'].update(paths['node']) self._short_paths_cache[method][weight]['edge'].update(paths['edge']) @staticmethod def _format_shortest_paths_output(paths, path_type): # Return output if path_type == 'node': return paths['node'] elif path_type == 'edge': return paths['edge'] return paths