Как сохранить агрегированные потоковые данные в Elasticsearch? - PullRequest
0 голосов
/ 22 апреля 2020

Я выполняю aws задания emr для чтения потоковых данных из kafka и загрузки в awsasticsearch. Ниже приведен код.

val input = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafkaBroker")   
            .option("subscribe", "topic")    
            .option("startingOffsets", "earliest") 
            .load()

val inputES = input.withWatermark("Time","10 minutes")
             .groupBy($"ID",window($"Time").cast("Timestamp"),"10 minutes")).count()
             .select($"ID",$"Timestamp",$"count")

val result = inputES.writeStream.outputMode("append").queryName("writeToES").format("org.elastcsearch.spark.sql")
.start(ID/Totalcount)

У меня возникает следующая ошибка

org. apache .spark. sql .AnalysisException: добавление режима вывода не поддерживается, когда есть являются потоковыми агрегатами потоковых DataFrames / DataSets без водяных знаков;

Как выполнить агрегацию перед загрузкой данных вasticsearch?

Примечание. Это прекрасно работает при записи в консоль. Я пытаюсь получить количество идентификаторов в течение 10 минут и написать в ES.

...