Этот скрипт создает аккумулятор accumulate_dict
. Кажется, он работает хорошо, например: inc("foo")
, за которым следует inc("foo")
, обновляет аккумулятор до Accumulator<id=1, value={'foo': 2}>
. Но когда я запускаю последнюю строку, которая запускает его на rdd, происходит сбой с ошибкой: File "<stdin>", line 6, in addInPlace
TypeError: unhashable type: 'dict'
. PySpark пытается каким-то образом иметь аккумулятор sh? Как я могу обновить этот словарь, используя аккумулятор?
from pyspark import AccumulatorParam, SparkContext
rdd = sc.parallelize(["foo", "bar", "foo", "foo", "bar"])
class SAP(AccumulatorParam):
def zero(self, value):
return value.copy()
def addInPlace(self, v1, v2):
v3 = dict(v1)
v3[v2] = v3.get(v2, 0) + 1
return v3
accumulate_dict = sc.accumulator({}, SAP())
def inc(x):
global test
test += x
rdd.foreach(inc)
Ссылки:
Исходный код для pyspark.accumulators
Вопрос Stackoverflow