Spark сериализует значение переменной как ноль вместо ее реального значения - PullRequest
0 голосов
/ 30 апреля 2020

Мое понимание механизма распределения кода Spark по узлам, на которых он работает, является лишь поверхностным, и мне не удается успешно выполнить мой код в API-интерфейсе Spark mapPartitions, когда я sh создаю экземпляр класса для каждого раздела, с аргументом.

Приведенный ниже код работал отлично, пока я не разработал класс MyWorkerClass, чтобы требовать аргумент:

  val result : DataFrame =
    inputDF.as[Foo].mapPartitions(sparkIterator => {

      // (1) initialize heavy class instance once per partition
      val workerClassInstance = MyWorkerClass(bar)

      // (2) provide an iterator using a function from that class instance
      new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
    }

Приведенный выше код работал прекрасно до того момента, когда я имел (или выбрал) добавить аргумент конструктора в мой класс MyWorkerClass. Переданное значение аргумента в рабочем случае получается как null вместо действительного значения bar. Каким-то образом сериализация аргумента не работает должным образом.

Как бы вы go об этом?


Дополнительные мысли / комментарии

Я буду избегать добавление громоздкого кода CloseableIteratorForSparkMapPartitions - он просто предоставляет дружественный Spark итератор и может даже не быть самой элегантной реализацией в этом.

Насколько я понимаю, аргумент конструктора не правильно передается рабочему Spark из-за того, как Spark захватывает состояние при сериализации содержимого для отправки на выполнение на рабочем Spark. Однако создание экземпляра класса позволяет легко загружать в этот класс ресурсы с большой нагрузкой, обычно доступные для функции, представленной в последней строке моего кода выше; И класс, похоже, был создан для каждого раздела. Что на самом деле является допустимым, если не ключевым вариантом использования для использования mapPartitions вместо map.

Это передача аргумента его экземпляру, что у меня возникают проблемы с выяснением, как включить или обойти , В моем случае этот аргумент является значением, известным только после запуска программы (даже если он всегда инвариантен в течение одного выполнения моей работы; на самом деле это аргумент программы). Мне нужно передать его для инициализации класса.

Я попытался решить эту проблему, предоставив функцию , которая создает экземпляр MyWorkerClass с входным аргументом, а не непосредственно, как описано выше, но это не решило вопросы.

Симптом проблемы root не является исключением, а просто то, что значение bar при создании экземпляра MyWorkerClass будет просто null вместо фактического значения bar который известен в области кода, охватывающего фрагмент кода, который я включил выше!

* один связанный старый вопрос об обсуждении Spark здесь

...