import datetime
from typing import Set, Union
from itertools import combinations
from ..base import Property
from .base import Type
from .time import TimeRange, CompoundTimeRange
[docs]
class Association(Type):
"""Association type
An association between objects.
"""
# TODO: Should probably add a link to the associator that produced it
objects: Set = Property(doc="Set of objects being associated.")
[docs]
class AssociationPair(Association):
"""AssociationPair type
An :class:`~.Association` representing the association of two objects.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if len(self.objects) != 2:
raise ValueError("Only two objects can be associated in one "
"AssociationPair object.")
[docs]
class SingleTimeAssociation(Association):
"""SingleTimeAssociation type
An :class:`~.Association` representing the linking of objects at a single
time.
"""
timestamp: datetime.datetime = Property(
default=None,
doc="Timestamp of the association. Default is None.")
[docs]
class TimeRangeAssociation(Association):
"""TimeRangeAssociation type
An :class:`~.AssociationPair` representing the linking of objects over a
range of times.
"""
time_range: Union[CompoundTimeRange, TimeRange] = Property(
default=None, doc="Range of times that association exists over. Default is None.")
@property
def duration(self):
return self.time_range.duration.total_seconds()
[docs]
class AssociationSet(Type):
"""AssociationSet type
A set of :class:`~.Association` type objects representing multiple
independent associations. Contains functions for indexing into the
associations.
"""
associations: Set[Association] = Property(default=None, doc="Set of independent associations.")
def __init__(self, associations=None, *args, **kwargs):
super().__init__(associations, *args, **kwargs)
if self.associations is None:
self.associations = set()
if not all(isinstance(member, Association) for member in self.associations):
raise TypeError("Association set must contain only Association instances.")
self._simplify()
def __eq__(self, other):
return self.associations == other.associations
def add(self, association):
if association is None:
return
elif isinstance(association, Association):
self.associations.add(association)
elif isinstance(association, AssociationSet):
for component in association:
self.add(component)
else:
raise TypeError("Supplied parameter must be an Association or AssociationSet.")
self._simplify()
def _simplify(self):
"""Where multiple associations describe the same pair of objects, combine them into one.
This is only implemented for pairs with a time_range attribute - others will be skipped.
"""
to_remove = set()
for (assoc1, assoc2) in combinations(self.associations, 2):
if not (len(assoc1.objects) == 2 and len(assoc2.objects) == 2) or \
not (hasattr(assoc1, 'time_range') and hasattr(assoc2, 'time_range')):
continue
if assoc1.objects == assoc2.objects:
if isinstance(assoc1.time_range, CompoundTimeRange):
assoc1.time_range.add(assoc2.time_range)
to_remove.add(assoc2)
elif isinstance(assoc2.time_range, CompoundTimeRange):
assoc2.time_range.add(assoc1.time_range)
to_remove.add(assoc1)
else:
assoc1.time_range = CompoundTimeRange([assoc1.time_range, assoc2.time_range])
to_remove.add(assoc2)
for assoc in to_remove:
self.remove(assoc)
def remove(self, association):
if association is None:
return
elif isinstance(association, Association):
if association not in self.associations:
raise ValueError("Supplied parameter must be contained by this instance.")
self.associations.remove(association)
elif isinstance(association, AssociationSet):
for component in association:
self.remove(component)
else:
raise TypeError("Supplied parameter must be an Association or AssociationSet.")
@property
def key_times(self):
"""Returns all timestamps at which a component starts or ends, or where there is a
:class:`.~SingleTimeAssociation`."""
key_times = list(self.overall_time_range.key_times)
for association in self.associations:
if isinstance(association, SingleTimeAssociation):
key_times.append(association.timestamp)
return sorted(key_times)
@property
def overall_time_range(self):
"""Returns a :class:`~.CompoundTimeRange` covering all times at which at least
one association is active.
Note: :class:`~.SingleTimeAssociation` are not counted.
"""
overall_range = CompoundTimeRange()
for association in self.associations:
if hasattr(association, 'time_range'):
overall_range.add(association.time_range)
return overall_range
@property
def object_set(self):
"""Returns a set of all objects contained by this instance.
"""
object_set = set()
for assoc in self.associations:
for obj in assoc.objects:
object_set.add(obj)
return object_set
[docs]
def associations_at_timestamp(self, timestamp):
"""Return the associations that exist at a given timestamp.
Method will return a set of all the :class:`~.Association` type
objects which occur at the specified time stamp.
Parameters
----------
timestamp: datetime.datetime
Timestamp at which associations should be identified.
Returns
-------
: :class:`~.AssociationSet`
Associations which occur at specified timestamp.
"""
if not isinstance(timestamp, datetime.datetime):
raise TypeError("Supplied parameter must be a datetime.datetime object.")
ret_associations = set()
for association in self.associations:
# If the association is at a single time
if hasattr(association, "timestamp"):
if association.timestamp == timestamp:
ret_associations.add(association)
else:
if timestamp in association.time_range:
ret_associations.add(association)
return AssociationSet(ret_associations)
[docs]
def associations_including_objects(self, objects):
"""Return associations that include all the given objects.
Method will return the set of all the :class:`~.Association` type
objects which contain an association with the provided object.
Parameters
----------
objects: set of objects
A set of objects to look for in associations.
Returns
-------
: :class:`~.AssociationSet`
A set of associations containing every member of objects.
"""
# Ensure objects is iterable
if not isinstance(objects, list) and not isinstance(objects, set):
objects = {objects}
return AssociationSet({association
for association in self.associations
if all(object_ in association.objects
for object_ in objects)})
def __contains__(self, item):
return item in self.associations
def __iter__(self):
return iter(self.associations)
def __len__(self):
return len(self.associations)