Как правильно написать собственный AccumulatorParam для этой задачи? - PullRequest
1 голос
/ 12 марта 2020

Контекст: Работа в Azure Databricks, Python язык программирования, среда Spark.

У меня rdd, и я создал операцию map.

rdd = sc.parallelize(my_collection)
mapper = rdd.map(lambda val: do_something(val))

Допустим, элементы в этом преобразователе имеют тип Foo. У меня есть глобальный объект типа Bar, который находится на узле драйвера и имеет внутреннюю коллекцию Foo объектов, которые необходимо заполнить с рабочих узлов (то есть элементов в mapper).

# This is what I want to do
bar_obj = Bar()

def add_to_bar(foo_obj):
    global bar_obj
    bar_obj.add_foo(foo_obj)

mapper.foreach(add_to_bar)

Из моего понимания Руководства по программированию RDD это не сработает из-за того, как в Spark работают замыкания. Вместо этого я должен использовать Accumulator для выполнения sh этого.

Я знаю, что мне как-то понадобится подкласс AccumulatorParam, но я не уверен относительно того, как этот класс выглядит, и как использовать его в этом случае.

Вот первый проход, который я сделал:

class FooAccumulator(AccumulatorParam):
  def zero(self, value):
    return value.bar
  def addInPlace(self, value1, value2):
    # bar is the parent Bar object for the value1 Foo instance
    value1.bar.add_foo(value2)
    return value1

Но я не уверен, как действовать дальше.

I Я также хотел бы добавить, что я попытался просто .collect() получить результаты от преобразователя, но это приводит к тому, что набор результатов превышает максимально допустимую память на узле драйвера (~ 4G, при повышении до 10G он работает, но в конце концов тайм-аут).

1 Ответ

0 голосов
/ 12 марта 2020

Не знаю, пробовали ли вы что-нибудь до сих пор? Я сам нашел этот кусок кода:

    from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

Так что, может быть, вы можете попробовать сделать что-то вроде этого:

from pyspark import AccumulatorParam

class FooAccumulator(AccumulatorParam):
    def zero(self, f):
        return []
    def addInPlace(self, f1, f2):
        f1.extend(f2)
        return acc1

accumulator = sc.accumulator("", FooAccumulator())

Я думаю, что этот поток может быть также полезно для вас.

...