Spark Structured Streaming foreachBatch и UPSERT (объединение): сохранять или не сохранять? - PullRequest
1 голос
/ 19 января 2020

В случае агрегации с отслеживанием состояния (произвольной) в структурированном потоке с foreachBatch, чтобы объединить обновление с дельта-таблицей, следует ли мне сохранять пакетный фрейм данных внутри foreachBatch перед загрузкой или нет?

Похоже, что сохранение не требуется так как я пишу в один приемник данных.

С другой стороны, у меня есть сильное чувство, что отсутствие сохранения вызовет повторное сканирование источника и дважды вызовет агрегацию.

Любые комментарии / мысли?

foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDf, batchId) -> 
        deltaTable.as("table").merge(batchDf.as("updates"), functions.expr("table.id=updates.id"))
        .whenNotMatched().insertAll()           // new session to be added
        .whenMatched()
        .updateAll()
        .execute())

1 Ответ

0 голосов
/ 19 января 2020

Позвольте мне процитировать страницу ниже:

Чтобы избежать повторных вычислений, вы должны кэшировать выходной DataFrame / Dataset, записать его в несколько мест, а затем разархивировать.

Я не знаю, если вы уже посетили эту страницу, но, похоже, вы правы, что в вашем случае упорствовать не нужно. Это важно для нескольких мест.

Источник: https://docs.databricks.com/spark/latest/structured-streaming/foreach.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...