Запись Spark-структурированного потокового вывода в тему Kafka - PullRequest
0 голосов
/ 26 июня 2018

У меня есть простое структурированное потоковое приложение, которое просто читает данные из одной темы Кафки и пишет в другую.

SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("test");

SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

Dataset<Row> dataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "start")
        .load();

StreamingQuery query = dataset
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("checkpointLocation", "checkpoint")
        .option("topic", "end")
        .start();

query.awaitTermination(20000);

По теме start необходимо обработать два сообщения. Этот код выполняется без исключений, однако сообщения на тему end не появляются. Что не так с этим примером?

1 Ответ

0 голосов
/ 27 июня 2018

Проблема в том, что сообщения уже были в потоке, а начальное смещение не было установлено как "самое раннее".

Dataset<Row> dataset = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", start.getTopicName())
        .option("startingOffsets", "earliest")
        .load();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...