Исключительная структурированная потоковая передача foreachbatch продолжает добавлять отфильтрованные события при каждой записи. - PullRequest
0 голосов
/ 09 марта 2020

В моем приложении для структурированного потокового вещания у меня есть следующее требование:

Потоковые события из топологии Kafka c в микропакетном режиме. Фильтруйте события и затем агрегируйте на основе одного уникального идентификатора. Запишите агрегированные данные в виде списка в приемник. Я использую foreachbatch для записи и внутренне вызываю функцию Void2 для того же.

До записи все работает нормально, и проблема в том, что набор данных, который передается в каждом пакете, добавляется к данным предыдущего пакета.

Я использую режим вывода в качестве обновления, не могу использовать добавление, поскольку у меня не будет никакого контроля над временем события и, следовательно, я не смогу группировать по тому же.

Пример код ниже:

dataset.writeStream()
        .foreachBatch(sinkFunction)
        .options(config.getOptions())
        .outputMode("update")
        .trigger(Trigger.ProcessingTime(20 seconds))
        .start();

И в функции приемника выполните следующее:

dataset.toDF().select("value")
        .selectExpr("testserialize(value) as rows")
        .select("rows.*")
        .selectExpr(UNIQUE_KEY, DATA)
        .as(Encoders.bean(TEST.class))
        .collectAsList();

И запишите список в приемник.

Любая помощь приветствуется.

...