Мое понимание механизма распределения кода Spark по узлам, на которых он работает, является лишь поверхностным, и мне не удается успешно выполнить мой код в API-интерфейсе Spark mapPartitions
, когда я sh создаю экземпляр класса для каждого раздела, с аргументом.
Приведенный ниже код работал отлично, пока я не разработал класс MyWorkerClass
, чтобы требовать аргумент:
val result : DataFrame =
inputDF.as[Foo].mapPartitions(sparkIterator => {
// (1) initialize heavy class instance once per partition
val workerClassInstance = MyWorkerClass(bar)
// (2) provide an iterator using a function from that class instance
new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
}
Приведенный выше код работал прекрасно до того момента, когда я имел (или выбрал) добавить аргумент конструктора в мой класс MyWorkerClass
. Переданное значение аргумента в рабочем случае получается как null
вместо действительного значения bar
. Каким-то образом сериализация аргумента не работает должным образом.
Как бы вы go об этом?
Дополнительные мысли / комментарии
Я буду избегать добавление громоздкого кода CloseableIteratorForSparkMapPartitions
- он просто предоставляет дружественный Spark итератор и может даже не быть самой элегантной реализацией в этом.
Насколько я понимаю, аргумент конструктора не правильно передается рабочему Spark из-за того, как Spark захватывает состояние при сериализации содержимого для отправки на выполнение на рабочем Spark. Однако создание экземпляра класса позволяет легко загружать в этот класс ресурсы с большой нагрузкой, обычно доступные для функции, представленной в последней строке моего кода выше; И класс, похоже, был создан для каждого раздела. Что на самом деле является допустимым, если не ключевым вариантом использования для использования mapPartitions
вместо map
.
Это передача аргумента его экземпляру, что у меня возникают проблемы с выяснением, как включить или обойти , В моем случае этот аргумент является значением, известным только после запуска программы (даже если он всегда инвариантен в течение одного выполнения моей работы; на самом деле это аргумент программы). Мне нужно передать его для инициализации класса.
Я попытался решить эту проблему, предоставив функцию , которая создает экземпляр MyWorkerClass
с входным аргументом, а не непосредственно, как описано выше, но это не решило вопросы.
Симптом проблемы root не является исключением, а просто то, что значение bar
при создании экземпляра MyWorkerClass
будет просто null
вместо фактического значения bar
который известен в области кода, охватывающего фрагмент кода, который я включил выше!
* один связанный старый вопрос об обсуждении Spark здесь