Source code for stonesoup.serialise

# -*- coding: utf-8 -*-
"""Provides an ability to serialise Stone Soup objects into and from YAML.

Stone Soup utilises YAML_ for serialisation. The :doc:`stonesoup.base`
feature of components is exploited in order to store the data of the
components and data types.

.. _YAML: http://yaml.org/"""
import datetime
import warnings
from io import StringIO
from collections import OrderedDict, deque
from functools import lru_cache
from pathlib import Path
from importlib import import_module

import numpy as np
import ruamel.yaml
from ruamel.yaml.constructor import ConstructorError

from .base import Base, Property
from .types.angle import Angle
from .types.array import Matrix, StateVector
from .types.numeric import Probability
from .sensor.sensor import Sensor

__all__ = ['YAML']


class YAML:
    """Class for YAML serialisation.

    Wraps a :class:`ruamel.yaml.YAML` instance and registers representers
    (object to YAML) and constructors (YAML to object) for NumPy arrays and
    scalars, :class:`datetime.timedelta`, :class:`pathlib.Path`,
    :class:`collections.deque`, and the Stone Soup ``Probability``, ``Angle``,
    ``Matrix`` and declarative ``Base`` types.

    Parameters
    ----------
    typ : str, optional
        Loader/dumper type handed to :class:`ruamel.yaml.YAML` (wrapped in a
        single element list). Default is ``'rt'`` (round trip).
    """

    # Tag prefix is built from the top level package name of this module.
    tag_prefix = '!{}.'.format(__name__.split('.', 1)[0])

    def __init__(self, typ='rt'):
        self._yaml = ruamel.yaml.YAML(typ=[typ])
        self._yaml.default_flow_style = False

        # NumPy
        self._yaml.representer.add_multi_representer(
            np.ndarray, self.ndarray_to_yaml)
        self._yaml.constructor.add_constructor(
            "!numpy.ndarray", self.ndarray_from_yaml)
        self._yaml.representer.add_multi_representer(
            np.integer, self.numpy_int_to_yaml)
        self._yaml.representer.add_multi_representer(
            np.floating, self.numpy_float_to_yaml)

        # Datetime
        self._yaml.representer.add_representer(
            datetime.timedelta, self.timedelta_to_yaml)
        self._yaml.constructor.add_constructor(
            "!datetime.timedelta", self.timedelta_from_yaml)

        # Path
        self._yaml.representer.add_multi_representer(
            Path, self.path_to_yaml)
        self._yaml.constructor.add_constructor(
            "!pathlib.Path", self.path_from_yaml)

        # deque
        self._yaml.representer.add_representer(
            deque, self.deque_to_yaml)
        self._yaml.constructor.add_constructor(
            "!collections.deque", self.deque_from_yaml)

        # Probability
        self._yaml.representer.add_representer(
            Probability, self.probability_to_yaml)
        self._yaml.constructor.add_constructor(
            self.yaml_tag(Probability), self.probability_from_yaml)

        # Angle
        self._yaml.representer.add_multi_representer(
            Angle, self.angle_to_yaml)
        self._yaml.constructor.add_multi_constructor(
            '{}types.angle.'.format(self.tag_prefix), self.angle_from_yaml)

        # Array
        self._yaml.representer.add_multi_representer(
            Matrix, self.ndarray_to_yaml)
        self._yaml.constructor.add_multi_constructor(
            '{}types.array.'.format(self.tag_prefix), self.array_from_yaml)

        # Declarative classes
        # NOTE(review): this generic prefix also matches the angle/array tags
        # registered above; presumably prefix lookup is order sensitive, so
        # keep this registration last - confirm against ruamel.yaml.
        self._yaml.representer.add_multi_representer(
            Base, self.declarative_to_yaml)
        self._yaml.constructor.add_multi_constructor(
            self.tag_prefix, self.declarative_from_yaml)

    def dump(self, data, stream, **kwargs):
        """Serialise `data` to `stream`.

        Delegates to the wrapped ruamel ``dump``, forwarding any keyword
        arguments, and returns its result.
        """
        return self._yaml.dump(data, stream, **kwargs)

    def dumps(self, data, *args, **kwargs):
        """Return as a string."""
        stream = StringIO()
        self.dump(data, stream, *args, **kwargs)
        return stream.getvalue()

    def dump_all(self, documents, stream, **kwargs):
        """Serialise each of `documents` to `stream`.

        Delegates to the wrapped ruamel ``dump_all``, forwarding any keyword
        arguments, and returns its result.
        """
        return self._yaml.dump_all(documents, stream, **kwargs)

    def load(self, stream):
        """Load and return a single document from `stream`."""
        return self._yaml.load(stream)

    def load_all(self, stream):
        """Yield every document loaded from `stream`."""
        yield from self._yaml.load_all(stream)

    @classmethod
    def yaml_tag(cls, class_):
        """Return YAML tag for object.

        Constructed from module and class name."""
        return "!{}.{}".format(class_.__module__, class_.__qualname__)

    @classmethod
    def declarative_to_yaml(cls, representer, node):
        """Convert declarative class instances to YAML.

        Store as mapping of declared properties, skipping any which are the
        default value."""
        node_properties = OrderedDict(type(node).properties)

        # Special case of a sensor with a default platform
        if isinstance(node, Sensor) and node._has_internal_platform:
            node_properties['position'] = Property(StateVector)
            node_properties['orientation'] = Property(StateVector)

        values = OrderedDict()
        for name, property_ in node_properties.items():
            value = getattr(node, name)
            # Identity (not equality) check: only the default object itself
            # is treated as "unset" and left out of the output.
            if value is not property_.default:
                values[name] = value

        return representer.represent_omap(cls.yaml_tag(type(node)), values)

    @classmethod
    def declarative_from_yaml(cls, constructor, tag_suffix, node):
        """Convert YAML to declarative class instances.

        Raises
        ------
        ConstructorError
            If the class named by `tag_suffix` cannot be found, or if
            instantiating it with the loaded properties fails.
        """
        try:
            class_ = cls._get_class(tag_suffix)
        except ImportError as err:
            raise ConstructorError(
                "while constructing a Stone Soup component", node.start_mark,
                "unable to import component {!r}".format(tag_suffix),
                node.start_mark) from err

        # Must have deep construct here to ensure mutable sub-objects are
        # fully created.
        constructor.deep_construct = True
        # The omap constructor is iterated to completion before its first
        # item is used as the keyword arguments.
        properties = list(constructor.construct_yaml_omap(node))[0]
        try:
            return class_(**properties)
        except Exception as err:
            raise ConstructorError(
                "while constructing Stone Soup component", node.start_mark,
                str(err), node.start_mark) from err

    @classmethod
    @lru_cache(None)
    def _get_class(cls, tag_suffix):
        """Return the class for the YAML tag ending in `tag_suffix`.

        Known subclasses of ``Base`` are searched first for a matching tag.
        If none match, the module part of `tag_suffix` is imported relative to
        the top level package and the class looked up by name. Results are
        cached per tag suffix.

        Raises
        ------
        ImportError
            If `tag_suffix` has no module part, the module cannot be
            imported, or the module has no attribute with the class name.
        """
        tag = cls.tag_prefix + tag_suffix
        classes = [
            subclass for subclass in Base.subclasses
            if cls.yaml_tag(subclass) == tag]
        if len(classes) > 1:
            warnings.warn(
                "Multiple possible classes found for YAML tag {!r}".format(
                    tag), UserWarning)
        elif not classes:
            module_name, separator, class_name = tag_suffix.rpartition(".")
            if not separator:
                # No "module.Class" structure: report as an import failure so
                # callers handling ImportError see a consistent error rather
                # than a ValueError from unpacking.
                raise ImportError("Unable to find {!r}".format(tag))
            module = import_module("..{}".format(module_name), __name__)
            classes = [getattr(module, class_name, None)]
        if classes[0] is None:
            raise ImportError("Unable to find {!r}".format(tag))
        return classes[0]

    @classmethod
    def probability_to_yaml(cls, representer, node):
        """Convert Probability to a tagged YAML scalar of its string form."""
        return representer.represent_scalar(
            cls.yaml_tag(type(node)), str(node))

    @staticmethod
    def probability_from_yaml(constructor, node):
        """Convert YAML to Probability.

        A scalar starting with ``exp(`` has the text between that prefix and
        its final character passed as a log value; any other scalar is
        converted directly to a float."""
        string = constructor.construct_scalar(node)
        if string.startswith('exp('):
            return Probability(float(string[4:-1]), log_value=True)
        return Probability(float(string))

    @classmethod
    def angle_to_yaml(cls, representer, node):
        """Convert Angle types to a tagged YAML scalar of its string form."""
        return representer.represent_scalar(
            cls.yaml_tag(type(node)), str(node))

    @classmethod
    def angle_from_yaml(cls, constructor, tag_suffix, node):
        """Convert YAML to the angle class named by `tag_suffix`."""
        class_ = cls._get_class('types.angle.{}'.format(tag_suffix))
        return class_(float(constructor.construct_scalar(node)))

    def ndarray_to_yaml(self, representer, node):
        """Convert numpy.ndarray to YAML."""
        # If using "round trip" type, change flow style to make more readable
        if node.ndim > 1 and 'rt' in self._yaml.typ:
            array = [self._yaml.seq(row) for row in node.tolist()]
            for seq in array:
                seq.fa.set_flow_style()
        else:
            array = node.tolist()
        return representer.represent_sequence(self.yaml_tag(type(node)), array)

    @staticmethod
    def ndarray_from_yaml(constructor, node):
        """Convert YAML to numpy.ndarray."""
        return np.array(constructor.construct_sequence(node, deep=True))

    @classmethod
    def array_from_yaml(cls, constructor, tag_suffix, node):
        """Convert YAML to the array class named by `tag_suffix`."""
        class_ = cls._get_class('types.array.{}'.format(tag_suffix))
        return class_(constructor.construct_sequence(node, deep=True))

    @staticmethod
    def numpy_int_to_yaml(representer, node):
        """Convert numpy ints to YAML"""
        return representer.represent_int(node)

    @staticmethod
    def numpy_float_to_yaml(representer, node):
        """Convert numpy floats to YAML"""
        return representer.represent_float(node)

    @staticmethod
    def timedelta_to_yaml(representer, node):
        """Convert datetime.timedelta to YAML.

        Value is total number of seconds."""
        return representer.represent_scalar(
            "!datetime.timedelta", str(node.total_seconds()))

    @staticmethod
    def timedelta_from_yaml(constructor, node):
        """Convert YAML to datetime.timedelta.

        Value should be total number of seconds."""
        return datetime.timedelta(
            seconds=float(constructor.construct_scalar(node)))

    @staticmethod
    def path_to_yaml(representer, node):
        """Convert pathlib.Path to YAML.

        Value is the string form of the path."""
        return representer.represent_scalar(
            "!pathlib.Path", str(node))

    @staticmethod
    def path_from_yaml(constructor, node):
        """Convert YAML to pathlib.Path.

        Value should be the string form of the path."""
        return Path(constructor.construct_scalar(node))

    @staticmethod
    def deque_to_yaml(representer, node):
        """Convert collections.deque to YAML

        Stored as a two item sequence: the list of items, then ``maxlen``."""
        return representer.represent_sequence(
            "!collections.deque", (list(node), node.maxlen))

    @staticmethod
    def deque_from_yaml(constructor, node):
        """Convert YAML to collections.deque

        Expects a two item sequence: the items, then ``maxlen``."""
        iterable, maxlen = constructor.construct_sequence(node, deep=True)
        return deque(iterable, maxlen)