Невозможно запустить аккумулятор типа словарь на RDD в в Pyspark - PullRequest
0 голосов
/ 06 февраля 2020

Этот скрипт создает аккумулятор accumulate_dict. Кажется, он работает хорошо, например: inc("foo"), за которым следует inc("foo"), обновляет аккумулятор до Accumulator<id=1, value={'foo': 2}>. Но когда я запускаю последнюю строку, которая запускает его на rdd, происходит сбой с ошибкой: File "<stdin>", line 6, in addInPlace TypeError: unhashable type: 'dict'. PySpark пытается каким-то образом иметь аккумулятор sh? Как я могу обновить этот словарь, используя аккумулятор?

from pyspark import AccumulatorParam, SparkContext

rdd = sc.parallelize(["foo", "bar", "foo", "foo", "bar"])

class SAP(AccumulatorParam):
    def zero(self, value):
        return value.copy()
    def addInPlace(self, v1, v2):
        v3 = dict(v1)
        v3[v2] = v3.get(v2, 0) + 1
        return v3

accumulate_dict = sc.accumulator({}, SAP())
def inc(x):
    global test
    test += x

rdd.foreach(inc)

Ссылки:
Исходный код для pyspark.accumulators
Вопрос Stackoverflow

...