Несколько операций / агрегатов на одном и том же Dataframe / Dataset в Spark структурированной потоковой передаче - PullRequest
1 голос
/ 19 марта 2019

Я использую Spark 2.3.2.

Я получаю данные от Кафки.Я должен сделать несколько агрегаций на одних и тех же данных .Затем все результаты агрегации будут отправлены в той же базы данных (столбцы или таблицы могут быть изменены) .Например:

val kafkaSource = spark.readStream.option("kafka") ...
val agg1 = kafkaSource.groupBy().agg ...
val agg2 = kafkaSource.groupBy().mapgroupswithstate() ...
val agg3 = kafkaSource.groupBy().mapgroupswithstate() ...

Но когда я пытаюсь вызвать writeStream для каждого результата агрегации:

aggr1.writeStream().foreach().start()
aggr2.writeStream().foreach().start()
aggr3.writeStream().foreach().start()

Spark получает данные независимо в каждом writeStream.Этот способ эффективен?

Могу ли я сделать несколько агрегатов с одним writeStream?Если это возможно, этот способ эффективен?

Ответы [ 2 ]

1 голос
/ 18 апреля 2019

Каждая операция «writeestream» приводит к новому потоковому запросу. Каждый потоковый запрос будет читать из источника и выполнять весь план запроса. В отличие от DStream, опция кэширования / сохранения не доступна.

В spark 2.4 был представлен новый API forEachBatch для более эффективного решения подобных сценариев.

1 голос
/ 19 марта 2019

Кэширование может использоваться, чтобы избежать многократного чтения:

kafkaSource.writeStream.foreachBatch((df, id) => {
  df.persist()
  val agg1 = df.groupBy().agg ...
  val agg2 = df.groupBy().mapgroupswithstate() ...
  val agg3 = df.groupBy().mapgroupswithstate() ...
  df.unpersist()
}).start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...