Я выполняю aws задания emr для чтения потоковых данных из kafka и загрузки в awsasticsearch. Ниже приведен код.
val input = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafkaBroker")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.load()
val inputES = input.withWatermark("Time","10 minutes")
.groupBy($"ID",window($"Time").cast("Timestamp"),"10 minutes")).count()
.select($"ID",$"Timestamp",$"count")
val result = inputES.writeStream.outputMode("append").queryName("writeToES").format("org.elastcsearch.spark.sql")
.start(ID/Totalcount)
У меня возникает следующая ошибка
org. apache .spark. sql .AnalysisException: добавление режима вывода не поддерживается, когда есть являются потоковыми агрегатами потоковых DataFrames / DataSets без водяных знаков;
Как выполнить агрегацию перед загрузкой данных вasticsearch?
Примечание. Это прекрасно работает при записи в консоль. Я пытаюсь получить количество идентификаторов в течение 10 минут и написать в ES.