Получите, сколько строк было обработано во время искрового фильтра - PullRequest
0 голосов
/ 31 октября 2018

У меня есть базовая операция - отфильтровать фрейм данных / rdd.

val sc = ...
val seqRDD = sc.parallelize(1 to 10)

val filteredData = seqRDD.filter(v => v < 10)
... count / other actions...

Я хочу получить статистику в реальном времени о том, сколько строк было обработано уже во время этого фильтра. Или сколько процентов фильтра было завершено.
Как я могу получить статистику в реальном времени? Поиск в Интернете, не могу найти правильное решение ...
Спасибо.

Ответы [ 2 ]

0 голосов
/ 01 ноября 2018

Принимая пример сценария.

Допустим, у меня есть файл и в нем есть пустые строки. Я хочу посчитать эти пустые строки и отфильтровать их.

Пример файла:

Hello World

Это образец файлов

Использование аккумулятора для вычисления пустых строк.

Код:

val accum = sc.accumulator(0,"testAccum")

scala> rddFile.filter{ x =>
 | if(x == "")
 | {
 | accum += 1
 | }
 | x != ""
 | }


accum.value

Пожалуйста, сначала соберите rdd, полученный в результате фильтра, а затем напечатайте значение «аккумулировать»

Будет напечатано количество пустых строк, доступных в файле. если вы хотите узнать, сколько обработанных строк, просто снимите условие if.

0 голосов
/ 01 ноября 2018

Вы можете использовать аккумуляторы Spark для таких целей. Аккумуляторы предоставляют возможность обновлять значения на рабочих узлах и накапливать эти значения в Spark Driver. Они не предоставляют достоверную статистику в реальном времени, но они определенно могут дать вам хорошее представление о прогрессе.

В этом случае вы можете создать аккумулятор, который просто подсчитывает записи, проходящие через ваш фильтр:

// create accumulator on Driver
val counter = sc.longAccumulator("filtered")

// use it in function sent to Worker
val filteredData = seqRDD.filter { v =>
  counter.add(1)
  v < 10
}

// perform some action on filteredData...

// use accumulator's value method to get value in Driver at any given moment.
println(counter.value)

В приведенном выше примере мы «читаем» аккумулятор только после того, как закончим. Но вы можете получить доступ к counter.value из других потоков, пока действие на filteredData еще выполняется, и получить «частичные» результаты. Вот простой пример, который показывает это:

// partitioning the data - otherwise accumulator would 
// probably only get updated for all data at once:
val seqRDD = sc.parallelize(1 to 1000, 20)

// create accumulator
val counter = sc.longAccumulator("filtered")

// schedule TimerTask to print current value of accumulator every 50 milis:
val t = new java.util.Timer()
t.schedule(new java.util.TimerTask {
  def run() = println(counter.value)
}, 10L, 50L)

// apply filter
val filteredData = seqRDD.filter { v =>
  counter.add(1)
  Thread.sleep(5)
  v < 10
}

// perform action:
filteredData.collect()

t.cancel()

Это печатает counter.value во время выполнения действия, и вывод выглядит примерно так: 0 0 0 0 0 0 0 200 200 200 200 200 400 400 400 400 400 450 600 600 600 600 600 700 800 800 800 800 900

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...