Source code for stonesoup.serialise

"""Provides an ability to serialise Stone Soup objects into and from YAML.

Stone Soup utilises YAML_ for serialisation. The :doc:`stonesoup.base`
feature of components is exploited in order to store the data of the
components and data types.

This module functions as a plug-in for ruamel.yaml_, specified by
:code:`typ='stonesoup'`, but for convenience it is recommended to
use :class:`~.stonesoup.serialise.YAML` which defaults with the plug-in
enabled.

It is also possible to extend the serialisation for other types with
Stone Soup, via `stonesoup.serialise.yaml` entry point, typically
expected to be used with :mod:`stonesoup.plugins`. The entry point
should point to a function which expects a single argument, a
:class:`ruamel.yaml.YAML` instance.

For example:

.. code-block:: python

    setup(
        ...
        entry_points={
            'stonesoup.plugins': 'my_plugin = my_package',
            'stonesoup.serialise.yaml': 'my_plugin = my_package:yaml_init_func}
        ...
    )


.. _YAML: http://yaml.org/
.. _ruamel.yaml: https://yaml.readthedocs.io/
"""
import datetime
import warnings
from io import StringIO
from collections import OrderedDict, deque
from functools import lru_cache
from pathlib import Path
from importlib import import_module
from importlib.metadata import entry_points

import numpy as np
import ruamel.yaml
from ruamel.yaml.constructor import ConstructorError

from .base import Base, Property
from .types.angle import Angle
from .types.array import Matrix, StateVector
from .types.numeric import Probability
from .sensor.sensor import Sensor

__all__ = ['YAML']
typ = 'stonesoup'


def init_typ(yaml):
    # Load additional custom serialisation
    eps = entry_points()
    try:
        entrypoints = eps['stonesoup.serialise.yaml']
    except KeyError:
        entrypoints = []
    for entry_point in entrypoints:
        try:
            entry_point.load()(yaml)
        except (ImportError, ModuleNotFoundError) as e:
            warnings.warn(f'Failed to load module. {e}')

    # NumPy
    yaml.representer.add_multi_representer(np.ndarray, ndarray_to_yaml)
    yaml.constructor.add_constructor("!numpy.ndarray", ndarray_from_yaml)
    yaml.representer.add_multi_representer(np.integer, yaml.representer.yaml_representers[int])
    yaml.representer.add_multi_representer(np.floating, npfloating_as_yaml)

    # Datetime
    yaml.representer.add_representer(datetime.timedelta, timedelta_to_yaml)
    yaml.constructor.add_constructor("!datetime.timedelta", timedelta_from_yaml)

    # Path
    yaml.representer.add_multi_representer(Path, path_to_yaml)
    yaml.constructor.add_constructor("!pathlib.Path", path_from_yaml)

    # deque
    yaml.representer.add_representer(deque, deque_to_yaml)
    yaml.constructor.add_constructor("!collections.deque", deque_from_yaml)
    # Probability
    yaml.representer.add_representer(Probability, probability_to_yaml)
    yaml.constructor.add_constructor(yaml_tag(Probability), probability_from_yaml)

    # Angle
    yaml.representer.add_multi_representer(Angle, angle_to_yaml)
    yaml.constructor.add_multi_constructor('!stonesoup.types.angle.', angle_from_yaml)

    # Array
    yaml.representer.add_multi_representer(Matrix, ndarray_to_yaml)
    yaml.constructor.add_multi_constructor('!stonesoup.types.array.', array_from_yaml)

    # Declarative classes
    yaml.representer.add_multi_representer(Base, declarative_to_yaml)
    yaml.constructor.add_multi_constructor('!stonesoup.', declarative_from_yaml)


[docs] class YAML(ruamel.yaml.YAML): """Class for YAML serialisation in Stone Soup.""" def __init__(self, **kwargs): typ = kwargs.pop('typ', ['rt']) if isinstance(typ, str): typ = [typ] typ.append('stonesoup') if kwargs.get('plug_ins') is None: kwargs['plug_ins'] = [] kwargs['plug_ins'].append('stonesoup.serialise') super().__init__(typ=typ, **kwargs) self.representer.default_flow_style = False self.representer.sort_base_mapping_type_on_output = False
[docs] def dumps(self, data, *args, **kwargs): """Return as a string.""" stream = StringIO() self.dump(data, stream, *args, **kwargs) return stream.getvalue()
def yaml_tag(class_): """Return YAML tag for object. Constructed from module and class name.""" return f"!{class_.__module__}.{class_.__qualname__}" def declarative_to_yaml(representer, node): """Convert declarative class instances to YAML. Store as mapping of declared properties, skipping any which are the default value.""" node_properties = OrderedDict(type(node).properties) # Special case of a sensor with a default platform if isinstance(node, Sensor) and node._has_internal_controller: node_properties['position'] = Property(StateVector) node_properties['orientation'] = Property(StateVector) return representer.represent_omap( yaml_tag(type(node)), OrderedDict((name, getattr(node, name)) for name, property_ in node_properties.items() if getattr(node, name) is not property_.default)) def declarative_from_yaml(constructor, tag_suffix, node): """Convert YAML to declarative class instances.""" try: class_ = get_class(f'!stonesoup.{tag_suffix}') except ImportError: raise ConstructorError( "while constructing a Stone Soup component", node.start_mark, f"unable to import component 'stonesoup.{tag_suffix}'", node.start_mark) # Must have deep construct here to ensure mutable sub-objects are fully created. constructor.deep_construct = True properties = [ data for data in constructor.construct_yaml_omap(node)][0] try: return class_(**properties) except Exception as e: raise ConstructorError("while constructing Stone Soup component", node.start_mark, str(e), node.start_mark) @lru_cache(None) def get_class(tag): classes = [ subclass for subclass in Base.subclasses if yaml_tag(subclass) == tag] if len(classes) > 1: warnings.warn( f"Multiple possible classes found for YAML tag {tag!r}", UserWarning) elif not classes: module_name, class_name = tag.lstrip('!').rsplit(".", 1) module = import_module(module_name) classes = [getattr(module, class_name, None)] if classes[0] is None: raise ImportError(f"Unable to find {tag!r}") return classes[0] def probability_to_yaml(representer, node): return representer.represent_scalar(yaml_tag(type(node)), str(node)) def probability_from_yaml(constructor, node): string = constructor.construct_scalar(node) if string.startswith('exp('): return Probability(float(string[4:-1]), log_value=True) else: return Probability(float(string)) def angle_to_yaml(representer, node): return representer.represent_scalar(yaml_tag(type(node)), str(node)) def angle_from_yaml(constructor, tag_suffix, node): class_ = get_class(f'!stonesoup.types.angle.{tag_suffix}') return class_(float(constructor.construct_scalar(node))) def npfloating_as_yaml(representer, node): """Convert np.floating to YAML.""" return representer.yaml_representers[float](representer, float(node)) def ndarray_to_yaml(representer, node): """Convert numpy.ndarray to YAML.""" # If using "round trip" type, change flow style to make more readable if node.ndim > 1 and 'rt' in representer.dumper.typ: array = [representer.dumper.seq(row) for row in node.tolist()] [seq.fa.set_flow_style() for seq in array] else: array = node.tolist() return representer.represent_sequence(yaml_tag(type(node)), array) def ndarray_from_yaml(constructor, node): """Convert YAML to numpy.ndarray.""" return np.array(constructor.construct_sequence(node, deep=True)) def array_from_yaml(constructor, tag_suffix, node): """Convert YAML to numpy.ndarray.""" class_ = get_class(f'!stonesoup.types.array.{tag_suffix}') return class_(constructor.construct_sequence(node, deep=True)) def timedelta_to_yaml(representer, node): """Convert datetime.timedelta to YAML. Value is total number of seconds.""" return representer.represent_scalar("!datetime.timedelta", str(node.total_seconds())) def timedelta_from_yaml(constructor, node): """Convert YAML to datetime.timedelta. Value should be total number of seconds.""" return datetime.timedelta(seconds=float(constructor.construct_scalar(node))) def path_to_yaml(representer, node): """Convert path to YAML. Value is total number of seconds.""" return representer.represent_scalar("!pathlib.Path", str(node)) def path_from_yaml(constructor, node): """Convert YAML to datetime.timedelta. Value should be total number of seconds.""" return Path(constructor.construct_scalar(node)) def deque_to_yaml(representer, node): """Convert collections.deque to YAML""" return representer.represent_sequence("!collections.deque", (list(node), node.maxlen)) def deque_from_yaml(constructor, node): """Convert YAML to collections.deque""" iterable, maxlen = constructor.construct_sequence(node, deep=True) return deque(iterable, maxlen)