Выходите из медленной карты Spark по таймауту, но сохраняйте результаты до сих пор - PullRequest
0 голосов
/ 06 июля 2018

Я сопоставляю SpD RDD с очень дорогой функцией (потенциально десятки секунд на строку).

Возможно, что работа займет слишком много времени, и мне нужно будет прервать ее, чтобы освободить место для других заданий в нашем потоке данных.

Тем не менее, результаты, вычисленные до сих пор, все еще были бы полезны для меня, поэтому я не хочу отказываться от них, тем более, что они, возможно, уже потратили часы на вычисления.

Есть ли способ выйти из преобразования рано, по таймауту, , но сохранить частичные результаты, вычисленные до сих пор ?

1 Ответ

0 голосов
/ 06 июля 2018

Есть по крайней мере несколько способов сделать это, переключившись с map на связанное преобразование:

mapPartitions

mapPartitions дает нам доступ к итератору для каждого раздела, поэтому мы можем просто делать вид, что в нем нет элементов, если время ожидания истекло:

val data = sc.parallelize(1 to 100)

val timeout = 10000

val start = System.currentTimeMillis

data.repartition(10).mapPartitions { iter =>
  if (System.currentTimeMillis - start > timeout) Iterator.empty
  else iter.map(x => { Thread.sleep(500); x + 1 })
}.count

В зависимости от вашей среды вам может потребоваться настроить таймауты в этом примере оболочки с искрой, но он должен давать различное количество результатов в зависимости от того, когда именно вы запустите преобразование относительно установки start.

Обратите внимание, что должно быть значительно большее количество разделов, чем общее количество ядер-исполнителей, в противном случае все разделы начнутся немедленно, и пропустить нечего. Следовательно, для этого примера мы явно переделаем данные перед тем, как запустить mapPartitions. Это может потребоваться или не потребоваться в зависимости от размера ваших данных и количества выделенных ядер.

flatMap

Более тонкий подход заключается в использовании flatMap, который позволяет нам обрабатывать или пропускать каждую отдельную строку условно, через функцию, которая возвращает Option (и просто возвращает None, если время ожидания истекло);

// setup as before
data.flatMap{ x => if (System.currentTimeMillis - start > timeout) None
                   else Some({Thread.sleep(500); x + 1}) }.count

Этот подход не требует разбиения, но он сканирует все оставшиеся разделы даже после истечения времени ожидания (но не выполняет дорогостоящую обработку ни одной из строк).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...