Вы можете использовать аккумуляторы Spark для таких целей. Аккумуляторы предоставляют возможность обновлять значения на рабочих узлах и накапливать эти значения в Spark Driver. Они не предоставляют достоверную статистику в реальном времени, но они определенно могут дать вам хорошее представление о прогрессе.
В этом случае вы можете создать аккумулятор, который просто подсчитывает записи, проходящие через ваш фильтр:
// create accumulator on Driver
val counter = sc.longAccumulator("filtered")
// use it in function sent to Worker
val filteredData = seqRDD.filter { v =>
counter.add(1)
v < 10
}
// perform some action on filteredData...
// use accumulator's value method to get value in Driver at any given moment.
println(counter.value)
В приведенном выше примере мы «читаем» аккумулятор только после того, как закончим. Но вы можете получить доступ к counter.value
из других потоков, пока действие на filteredData
еще выполняется, и получить «частичные» результаты. Вот простой пример, который показывает это:
// partitioning the data - otherwise accumulator would
// probably only get updated for all data at once:
val seqRDD = sc.parallelize(1 to 1000, 20)
// create accumulator
val counter = sc.longAccumulator("filtered")
// schedule TimerTask to print current value of accumulator every 50 milis:
val t = new java.util.Timer()
t.schedule(new java.util.TimerTask {
def run() = println(counter.value)
}, 10L, 50L)
// apply filter
val filteredData = seqRDD.filter { v =>
counter.add(1)
Thread.sleep(5)
v < 10
}
// perform action:
filteredData.collect()
t.cancel()
Это печатает counter.value
во время выполнения действия, и вывод выглядит примерно так:
0
0
0
0
0
0
0
200
200
200
200
200
400
400
400
400
400
450
600
600
600
600
600
700
800
800
800
800
900