Source code for stonesoup.writer.kafka

import json

try:
    from confluent_kafka import Producer
except ImportError as error:  # pragma: no cover
    raise ImportError(
        "Kafka Writers require the dependency 'confluent-kafka' to be installed."
    ) from error

from .base import Writer
from ..base import Property


class KafkaWriter(Writer):
    """A simple Kafka writer that writes data to a Kafka topic.

    Each call to :meth:`write` serialises the supplied data to a JSON string
    and hands it to a ``confluent_kafka.Producer`` built from
    :attr:`kafka_config`.

    Parameters
    ----------
    """
    # The configuration is handed to ``Producer`` in ``__init__``, so the doc
    # text refers to the producer (it previously said "consumer").
    kafka_config: dict[str, str] = Property(
        doc="Configuration properties for the underlying kafka producer. See the "
            "`confluent-kafka documentation <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration>`_ "  # noqa
            "for more details.")

    def __init__(self, *args, **kwargs):
        """Initialise the writer and create the underlying Kafka producer.

        All positional and keyword arguments are forwarded unchanged to the
        parent class initialiser; the producer is then constructed from
        :attr:`kafka_config`.
        """
        super().__init__(*args, **kwargs)
        # One producer per writer instance, reused by every write() call.
        self._producer = Producer(self.kafka_config)

    def write(self, topic: str, data, flush: bool = True) -> None:
        """Serialise `data` as JSON and produce it to `topic`.

        Parameters
        ----------
        topic : str
            Name of the Kafka topic the message is produced to.
        data
            Object to send. It is passed to :func:`json.dumps`, so it must be
            JSON-serialisable.
        flush : bool, optional
            If true (the default), ``flush()`` is called on the producer right
            after the message has been handed over. Pass ``False`` to skip the
            flush, e.g. when issuing many writes in a row.

        Raises
        ------
        TypeError
            If `data` contains an object :func:`json.dumps` cannot serialise.
        """
        as_json = json.dumps(data)
        self._producer.produce(topic, as_json)
        if flush:
            self._producer.flush()