Source code for stonesoup.serialise

"""Provides an ability to serialise Stone Soup objects into and from YAML.

Stone Soup utilises YAML_ for serialisation. The :doc:`stonesoup.base`
feature of components is exploited in order to store the data of the
components and data types.

This module functions as a plug-in for ruamel.yaml_, specified by
:code:`typ='stonesoup'`, but for convenience it is recommended to
use :class:`~.stonesoup.serialise.YAML` which defaults with the plug-in
enabled.

It is also possible to extend the serialisation for other types with
Stone Soup, via the `stonesoup.serialise.yaml` entry point, which is
typically expected to be used with :mod:`stonesoup.plugins`. The entry
point should refer to a function that accepts a single argument, a
:class:`ruamel.yaml.YAML` instance.

For example:

.. code-block:: python

    setup(
        ...
        entry_points={
            'stonesoup.plugins': 'my_plugin = my_package',
            'stonesoup.serialise.yaml': 'my_plugin = my_package:yaml_init_func'},
        ...
    )


.. _YAML: http://yaml.org/
.. _ruamel.yaml: https://yaml.readthedocs.io/
"""
import datetime
import warnings
from io import StringIO
from collections import OrderedDict, deque
from functools import lru_cache
from pathlib import Path
from importlib import import_module

import numpy as np
import pkg_resources
import ruamel.yaml
from ruamel.yaml.constructor import ConstructorError

from .base import Base, Property
from .types.angle import Angle
from .types.array import Matrix, StateVector
from .types.numeric import Probability
from .sensor.sensor import Sensor

__all__ = ['YAML']
typ = 'stonesoup'


def init_typ(yaml):
    """Register Stone Soup representers and constructors on ``yaml``.

    Functions advertised under the ``stonesoup.serialise.yaml`` entry point
    are loaded and called with ``yaml`` first; a failed import is reported
    as a warning rather than raised. The built-in handlers for NumPy,
    ``datetime.timedelta``, ``pathlib.Path``, ``collections.deque`` and the
    Stone Soup types are registered afterwards.

    Parameters
    ----------
    yaml
        The YAML instance whose ``representer`` and ``constructor`` are
        extended.
    """
    # Externally provided serialisation extensions.
    for plugin in pkg_resources.iter_entry_points('stonesoup.serialise.yaml'):
        try:
            register = plugin.load()
            register(yaml)
        except ImportError as error:  # also covers ModuleNotFoundError
            warnings.warn(f'Failed to load module. {error}')

    representer = yaml.representer
    constructor = yaml.constructor

    # NumPy arrays and scalar types (scalars reuse the plain int/float representers).
    representer.add_multi_representer(np.ndarray, ndarray_to_yaml)
    constructor.add_constructor("!numpy.ndarray", ndarray_from_yaml)
    representer.add_multi_representer(np.integer, representer.yaml_representers[int])
    representer.add_multi_representer(np.floating, representer.yaml_representers[float])

    # datetime.timedelta
    representer.add_representer(datetime.timedelta, timedelta_to_yaml)
    constructor.add_constructor("!datetime.timedelta", timedelta_from_yaml)

    # pathlib.Path and its subclasses
    representer.add_multi_representer(Path, path_to_yaml)
    constructor.add_constructor("!pathlib.Path", path_from_yaml)

    # collections.deque
    representer.add_representer(deque, deque_to_yaml)
    constructor.add_constructor("!collections.deque", deque_from_yaml)

    # Probability
    representer.add_representer(Probability, probability_to_yaml)
    constructor.add_constructor(yaml_tag(Probability), probability_from_yaml)

    # NOTE(review): the specific '!stonesoup.types.*' prefixes below are
    # registered before the generic '!stonesoup.' prefix; presumably prefix
    # matching depends on registration order, so keep this ordering.

    # Angle types
    representer.add_multi_representer(Angle, angle_to_yaml)
    constructor.add_multi_constructor('!stonesoup.types.angle.', angle_from_yaml)

    # Array types
    representer.add_multi_representer(Matrix, ndarray_to_yaml)
    constructor.add_multi_constructor('!stonesoup.types.array.', array_from_yaml)

    # Declarative (Base-derived) classes
    representer.add_multi_representer(Base, declarative_to_yaml)
    constructor.add_multi_constructor('!stonesoup.', declarative_from_yaml)


class YAML(ruamel.yaml.YAML):
    """Class for YAML serialisation in Stone Soup.

    A :class:`ruamel.yaml.YAML` subclass that always adds the ``'stonesoup'``
    type and the ``'stonesoup.serialise'`` plug-in to whatever the caller
    requested.

    Parameters
    ----------
    **kwargs
        Passed on to :class:`ruamel.yaml.YAML`. ``typ`` may be a string or a
        sequence of strings and defaults to ``['rt']``; ``plug_ins`` may be
        ``None`` or a sequence of plug-in names.
    """

    def __init__(self, **kwargs):
        typ = kwargs.pop('typ', None)
        if typ is None:
            typ = ['rt']
        elif isinstance(typ, str):
            typ = [typ]
        else:
            # Copy so that a list supplied by the caller is never mutated.
            typ = list(typ)
        typ.append('stonesoup')

        # Same for the plug-in list: extend a copy, not the caller's object.
        plug_ins = list(kwargs.pop('plug_ins', None) or [])
        plug_ins.append('stonesoup.serialise')

        super().__init__(typ=typ, plug_ins=plug_ins, **kwargs)
        self.representer.default_flow_style = False
        self.representer.sort_base_mapping_type_on_output = False

    def dumps(self, data, *args, **kwargs):
        """Return as a string.

        ``data`` and any extra arguments are passed to :meth:`dump`, which
        writes into an in-memory text buffer whose contents are returned.
        """
        stream = StringIO()
        self.dump(data, stream, *args, **kwargs)
        return stream.getvalue()
def yaml_tag(class_):
    """Return YAML tag for object.

    Constructed from module and class name, i.e.
    ``!<module>.<qualified class name>``.
    """
    return f"!{class_.__module__}.{class_.__qualname__}"


def declarative_to_yaml(representer, node):
    """Convert declarative class instances to YAML.

    Store as mapping of declared properties, skipping any which are the
    default value (compared by identity with the property's default).
    """
    node_properties = OrderedDict(type(node).properties)
    # Special case of a sensor with a default platform
    if isinstance(node, Sensor) and node._has_internal_controller:
        node_properties['position'] = Property(StateVector)
        node_properties['orientation'] = Property(StateVector)
    return representer.represent_omap(
        yaml_tag(type(node)),
        OrderedDict((name, getattr(node, name))
                    for name, property_ in node_properties.items()
                    if getattr(node, name) is not property_.default))


def declarative_from_yaml(constructor, tag_suffix, node):
    """Convert YAML to declarative class instances.

    Raises
    ------
    ConstructorError
        If the class for the tag cannot be found or imported, or if creating
        the instance from the loaded properties fails. The underlying
        exception is chained as the cause.
    """
    try:
        class_ = get_class(f'!stonesoup.{tag_suffix}')
    except ImportError as e:
        raise ConstructorError(
            "while constructing a Stone Soup component", node.start_mark,
            f"unable to import component 'stonesoup.{tag_suffix}'",
            node.start_mark) from e
    # Must have deep construct here to ensure mutable sub-objects are fully created.
    constructor.deep_construct = True
    # Iterate to completion before taking the first item, as the original did;
    # presumably the mapping is only fully populated once iteration finishes.
    properties = list(constructor.construct_yaml_omap(node))[0]
    try:
        return class_(**properties)
    except Exception as e:
        raise ConstructorError("while constructing Stone Soup component", node.start_mark,
                               str(e), node.start_mark) from e


@lru_cache(None)
def get_class(tag):
    """Return the class corresponding to a YAML tag.

    Known subclasses of ``Base`` are searched first; if none matches, the
    module named in the tag is imported and the class looked up on it.
    A warning is issued if more than one subclass matches, and the first
    match is returned. Results are cached per tag.

    Raises
    ------
    ImportError
        If the module cannot be imported or has no attribute of that name.
    """
    classes = [
        subclass for subclass in Base.subclasses if yaml_tag(subclass) == tag]
    if len(classes) > 1:
        warnings.warn(
            f"Multiple possible classes found for YAML tag {tag!r}", UserWarning)
    elif not classes:
        module_name, class_name = tag.lstrip('!').rsplit(".", 1)
        module = import_module(module_name)
        classes = [getattr(module, class_name, None)]
    if classes[0] is None:
        raise ImportError(f"Unable to find {tag!r}")
    return classes[0]


def probability_to_yaml(representer, node):
    """Convert Probability to YAML, stored as its string form."""
    return representer.represent_scalar(yaml_tag(type(node)), str(node))


def probability_from_yaml(constructor, node):
    """Convert YAML to Probability.

    A value of the form ``exp(<float>)`` is read as a log value; anything
    else is read as a plain float.
    """
    string = constructor.construct_scalar(node)
    if string.startswith('exp('):
        # Strip the leading 'exp(' and the trailing ')'.
        return Probability(float(string[4:-1]), log_value=True)
    else:
        return Probability(float(string))


def angle_to_yaml(representer, node):
    """Convert angle types to YAML, stored as their string form."""
    return representer.represent_scalar(yaml_tag(type(node)), str(node))


def angle_from_yaml(constructor, tag_suffix, node):
    """Convert YAML to the angle class named by the tag, from a float value."""
    class_ = get_class(f'!stonesoup.types.angle.{tag_suffix}')
    return class_(float(constructor.construct_scalar(node)))


def ndarray_to_yaml(representer, node):
    """Convert numpy.ndarray to YAML."""
    # If using "round trip" type, change flow style to make more readable
    if node.ndim > 1 and 'rt' in representer.dumper.typ:
        array = [representer.dumper.seq(row) for row in node.tolist()]
        for seq in array:
            seq.fa.set_flow_style()
    else:
        array = node.tolist()
    return representer.represent_sequence(yaml_tag(type(node)), array)


def ndarray_from_yaml(constructor, node):
    """Convert YAML to numpy.ndarray."""
    return np.array(constructor.construct_sequence(node, deep=True))


def array_from_yaml(constructor, tag_suffix, node):
    """Convert YAML to the Stone Soup array class named by the tag."""
    class_ = get_class(f'!stonesoup.types.array.{tag_suffix}')
    return class_(constructor.construct_sequence(node, deep=True))


def timedelta_to_yaml(representer, node):
    """Convert datetime.timedelta to YAML.

    Value is total number of seconds."""
    return representer.represent_scalar("!datetime.timedelta", str(node.total_seconds()))


def timedelta_from_yaml(constructor, node):
    """Convert YAML to datetime.timedelta.

    Value should be total number of seconds."""
    return datetime.timedelta(seconds=float(constructor.construct_scalar(node)))


def path_to_yaml(representer, node):
    """Convert pathlib.Path to YAML.

    Value is the string form of the path."""
    return representer.represent_scalar("!pathlib.Path", str(node))


def path_from_yaml(constructor, node):
    """Convert YAML to pathlib.Path.

    Value should be the string form of the path."""
    return Path(constructor.construct_scalar(node))


def deque_to_yaml(representer, node):
    """Convert collections.deque to YAML.

    Stored as a two item sequence: the list of elements, then ``maxlen``."""
    return representer.represent_sequence("!collections.deque", (list(node), node.maxlen))


def deque_from_yaml(constructor, node):
    """Convert YAML to collections.deque.

    Expects a two item sequence: the elements, then ``maxlen``."""
    iterable, maxlen = constructor.construct_sequence(node, deep=True)
    return deque(iterable, maxlen)