В случае агрегации с отслеживанием состояния (произвольной) в структурированном потоке с foreachBatch, чтобы объединить обновление с дельта-таблицей, следует ли мне сохранять пакетный фрейм данных внутри foreachBatch перед загрузкой или нет?
Похоже, что сохранение не требуется так как я пишу в один приемник данных.
С другой стороны, у меня есть сильное чувство, что отсутствие сохранения вызовет повторное сканирование источника и дважды вызовет агрегацию.
Любые комментарии / мысли?
foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) ->
deltaTable.as("table").merge(batchDf.as("updates"), functions.expr("table.id=updates.id"))
.whenNotMatched().insertAll() // new session to be added
.whenMatched()
.updateAll()
.execute())