Контекст: Работа в Azure Databricks, Python язык программирования, среда Spark.
У меня rdd
, и я создал операцию map
.
rdd = sc.parallelize(my_collection)
mapper = rdd.map(lambda val: do_something(val))
Допустим, элементы в этом преобразователе имеют тип Foo
. У меня есть глобальный объект типа Bar
, который находится на узле драйвера и имеет внутреннюю коллекцию Foo
объектов, которые необходимо заполнить с рабочих узлов (то есть элементов в mapper
).
# This is what I want to do
bar_obj = Bar()
def add_to_bar(foo_obj):
global bar_obj
bar_obj.add_foo(foo_obj)
mapper.foreach(add_to_bar)
Из моего понимания Руководства по программированию RDD это не сработает из-за того, как в Spark работают замыкания. Вместо этого я должен использовать Accumulator
для выполнения sh этого.
Я знаю, что мне как-то понадобится подкласс AccumulatorParam
, но я не уверен относительно того, как этот класс выглядит, и как использовать его в этом случае.
Вот первый проход, который я сделал:
class FooAccumulator(AccumulatorParam):
def zero(self, value):
return value.bar
def addInPlace(self, value1, value2):
# bar is the parent Bar object for the value1 Foo instance
value1.bar.add_foo(value2)
return value1
Но я не уверен, как действовать дальше.
I Я также хотел бы добавить, что я попытался просто .collect()
получить результаты от преобразователя, но это приводит к тому, что набор результатов превышает максимально допустимую память на узле драйвера (~ 4G, при повышении до 10G он работает, но в конце концов тайм-аут).