PySpark аккумулятор, который вычисляет максимальное значение - PullRequest
0 голосов
/ 03 октября 2018

Что если нам нужно, чтобы значение аккумулятора было установлено как максимальное число из всех значений, возвращаемых всеми задачами / узлами?

Пример:

  • аккумуляторнаборы

    • node1: 5
    • наборы node2: 6
    • наборы node3: 4

As6 больше 4, конечное значение аккумулятора должно быть 6.

1 Ответ

0 голосов
/ 03 октября 2018

Вам нужно будет определить AccumulatorParam как этот:

from pyspark import AccumulatorParam

class MaxAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return initialValue
    def addInPlace(self, v1, v2):
        return max(v1, v2)

, который можно использовать, как показано ниже:

acc = spark.sparkContext.accumulator(float("-inf"), MaxAccumulatorParam())
rdd = sc.parallelize([5, 6, 4], 3)

acc.value
# -inf

rdd.foreach(lambda x: acc.add(x))
acc.value
# 6
...