Source code for stonesoup.reader.opensky

import datetime
from time import sleep

try:
    import requests
    from requests.compat import urljoin
except ImportError as error:
    raise ImportError(
        "Usage of opensky requires the dependency 'requests' is installed. ") from error


from .base import Reader, DetectionReader, GroundTruthReader
from ..base import Property
from ..buffered_generator import BufferedGenerator
from ..types.detection import Detection
from ..types.groundtruth import GroundTruthPath, GroundTruthState
from ..types.state import State


class _OpenSkyNetworkReader(Reader):
    """OpenSky Network reader

    This reader uses the `OpenSky Network <https://opensky-network.org/>`_ REST
    API to fetch air traffic control data.

    The state vector consists of longitude, latitude
    (in decimal degrees) and altitude (in meters).

    .. note::

        By using this reader, you are agreeing to `OpenSky Network's terms of
        use <https://opensky-network.org/about/terms-of-use>`_.

    """

    url = "https://opensky-network.org/"
    sources = {
        0: "ADS-B",
        1: "ASTERIX",
        2: "MLAT",
        3: "FLARM",
    }

    bbox: tuple[float, float, float, float] = Property(
        default=None,
        doc="Bounding box to filter data to (left, bottom, right, top). "
            "Default `None` which will include global data.")
    timestep: datetime.timedelta = Property(
        default=datetime.timedelta(seconds=15),
        doc="Time of each poll after reported time from OpenSky. "
            "Must be greater than 10 seconds. Default 15 seconds.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.timestep < datetime.timedelta(seconds=10):
            raise ValueError("'timestep' must be >= 10 seconds.")

    def data_gen(self):
        if self.bbox:
            params = {
             'lomin': self.bbox[0],
             'lamin': self.bbox[1],
             'lomax': self.bbox[2],
             'lamax': self.bbox[3],
             }
        else:
            params = {}  # Global

        url = urljoin(self.url, "api/states/all")
        time = None
        with requests.Session() as session:
            while True:
                response = session.get(url, params=params)
                response.raise_for_status()
                data = response.json()

                states_and_metadata = []
                for state in data['states']:
                    if state[8]:  # On ground
                        continue
                    # Must have position (lon, lat, geo-alt)
                    if not all(state[index] for index in (5, 6, 13)):
                        continue
                    timestamp = datetime.datetime.fromtimestamp(
                        state[3], datetime.timezone.utc).replace(tzinfo=None)
                    # Skip old detections
                    if time is not None and timestamp <= time:
                        continue

                    states_and_metadata.append((
                        State([[state[5]], [state[6]], [state[13]]], timestamp=timestamp),
                        {
                            'icao24': state[0],
                            'callsign': state[1],
                            'orign_country': state[2],
                            'sensors': state[12],
                            'squawk': state[14],
                            'spi': state[15],
                            'source': self.sources[state[16]],
                        }
                    ))
                time = datetime.datetime.fromtimestamp(
                    data['time'], datetime.timezone.utc).replace(tzinfo=None)
                yield time, states_and_metadata

                while time + self.timestep > datetime.datetime.now(
                        datetime.timezone.utc).replace(tzinfo=None):
                    sleep(0.1)


[docs] class OpenSkyNetworkDetectionReader(_OpenSkyNetworkReader, DetectionReader): """OpenSky Network detection reader This reader uses the `OpenSky Network <https://opensky-network.org/>`_ REST API to fetch air traffic control data. The detection state vector consists of longitude, latitude (in decimal degrees) and altitude (in meters). .. note:: By using this reader, you are agreeing to `OpenSky Network's terms of use <https://opensky-network.org/about/terms-of-use>`_. """
[docs] @BufferedGenerator.generator_method def detections_gen(self): for time, states_and_metadata in self.data_gen(): yield time, {Detection(state.state_vector, state.timestamp, metadata) for state, metadata in states_and_metadata}
[docs] class OpenSkyNetworkGroundTruthReader(_OpenSkyNetworkReader, GroundTruthReader): """OpenSky Network groundtruth reader This reader uses the `OpenSky Network <https://opensky-network.org/>`_ REST API to fetch air traffic control data. The groundtruth state vector consists of longitude, latitude (in decimal degrees) and altitude (in meters). Paths that are yielded are grouped based on the International Civil Aviation Organisation (ICAO) 24-bit address. .. note:: By using this reader, you are agreeing to `OpenSky Network's terms of use <https://opensky-network.org/about/terms-of-use>`_. """
[docs] @BufferedGenerator.generator_method def groundtruth_paths_gen(self): groundtruth_dict = {} for time, states_and_metadata in self.data_gen(): updated_paths = set() for state, metadata in states_and_metadata: path_id = metadata.get('icao24') if path_id is None: path = GroundTruthPath() else: if path_id not in groundtruth_dict: groundtruth_dict[path_id] = GroundTruthPath([], id=path_id) path = groundtruth_dict[path_id] path.append(GroundTruthState(state.state_vector, state.timestamp, metadata)) updated_paths.add(groundtruth_dict[path_id]) yield time, updated_paths