В моем приложении для структурированного потокового вещания у меня есть следующее требование:
Потоковые события из топологии Kafka c в микропакетном режиме. Фильтруйте события и затем агрегируйте на основе одного уникального идентификатора. Запишите агрегированные данные в виде списка в приемник. Я использую foreachbatch для записи и внутренне вызываю функцию Void2 для того же.
До записи все работает нормально, и проблема в том, что набор данных, который передается в каждом пакете, добавляется к данным предыдущего пакета.
Я использую режим вывода в качестве обновления, не могу использовать добавление, поскольку у меня не будет никакого контроля над временем события и, следовательно, я не смогу группировать по тому же.
Пример код ниже:
dataset.writeStream()
.foreachBatch(sinkFunction)
.options(config.getOptions())
.outputMode("update")
.trigger(Trigger.ProcessingTime(20 seconds))
.start();
И в функции приемника выполните следующее:
dataset.toDF().select("value")
.selectExpr("testserialize(value) as rows")
.select("rows.*")
.selectExpr(UNIQUE_KEY, DATA)
.as(Encoders.bean(TEST.class))
.collectAsList();
И запишите список в приемник.
Любая помощь приветствуется.