Проблема в том, что imap
ссылается на глобальную переменную, которая не существует (присвоение в test
создает только локальную переменную в этой функции). Эта простая программа (без Spark) выдает ту же ошибку по той же причине:
def foo():
blah = 1
def bar():
global blah
print(blah)
bar()
if __name__ == '__main__':
foo()
Назначение acc
на уровне модуля работает:
if __name__ == '__main__':
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf).getOrCreate()
rdds = sc.parallelize([Row(user="spark", item="book"), Row(user="spark", item="goods"),
Row(user="hadoop", item="book"), Row(user="python", item="duck")])
acc = sc.accumulator(0)
print("accumulator: {}".format(acc))
def imap(row):
global acc
acc += 1
return row
rdds.map(imap).foreach(print)
print(acc.value)
Добавление global acc
оператор для test
является альтернативой, если вам нужно сохранить функцию test
.