У меня есть простое структурированное потоковое приложение, которое просто читает данные из одной темы Кафки и пишет в другую.
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("test");
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
Dataset<Row> dataset = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "start")
.load();
StreamingQuery query = dataset
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "checkpoint")
.option("topic", "end")
.start();
query.awaitTermination(20000);
По теме start
необходимо обработать два сообщения. Этот код выполняется без исключений, однако сообщения на тему end
не появляются. Что не так с этим примером?