Возврат результатов от исполнителя к водителю - PullRequest
0 голосов
/ 27 мая 2020

У меня есть искровое приложение, которое в основном принимает большой набор данных, выполняет над ним некоторые вычисления и, наконец, выполняет некоторые операции ввода-вывода для хранения его в базе данных. Все эти этапы выполняются исполнителями, и драйвер получает (собирает) логическое значение от каждой задачи, представляющее состояние успеха / сбоя этой задачи (например, вычисление или ввод-вывод может завершиться ошибкой для некоторых элементов).

Например, после является чрезмерно упрощенным происхождением (в реальной реализации есть несколько этапов переразбиения и вычислений):

readSomeDataset()
  .repartition()
  .mapPartition { // do some calculation }
  .mapPartition { // do some IO }
  .collect()

Проблема: Основываясь на результате вычислений, я хотел бы сделать что-нибудь еще с драйвером (например, опубликовать сообщение о том, что «вычисление выполнено успешно»). Это нужно сделать один раз для всего набора данных, а не для отдельного раздела, и, следовательно, это должно происходить в драйвере.

Однако ввод-вывод исполнителей занимает много времени, и я не хочу ждать для этого нужно завершить sh перед публикацией.

Есть ли у исполнителей способ отправить «сообщение» обратно драйверу во время обработки задач?

(Что-то вроде На ум приходят аккумуляторы, но, как известно, их можно будет использовать только после того, как завершится финальное действие над исполнителями)

1 Ответ

0 голосов
/ 27 мая 2020

Spark - это ленивый фреймворк, и для его выполнения требуется полное задание (от чтения до записи), он не может выполнять только часть.

Чтобы внести эти изменения без повторной обработки, вы можете кэшировать фреймы данных, чтобы восстановить как можно быстрее, примерно так.

val calculatedDF = readSomeDataset()
  .repartition()
  .mapPartition { // do some calculation }
  .cache() // or persist if can't fit in memory of the executors

if (caculatedDF.map(checkEackAreOK).reduce(_ && _).head) { // a condition to see if the calculations are ok and an action to launch it
  println("correct calculation")
  calculatedDF
    .mapPartition { // do some IO }
    .collect()
} else {
  println("incorrect calculation")
}
...