Функция Spark не сериализуема - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть класс:

class DataLoader {

  def rdd2RddTransform(
    ss: SparkSession,
    inputRDD: RDD[GenericRecord]): RDD[GenericRecord] = {

    inputRDD.asInstanceOf[RDD[TrainingData]]
            .map(reformatTrainingData)
  }

  private def reformatTrainingData: TrainingData => ReFormatedData
               = (trainingData: TrainingData) => {func implement}

}

Он работал очень хорошо, но выдал исключение: org.apache.spark.SparkException: Task not serializable после того, как я внес небольшое изменение в карту СДР:

inputRDD.asInstanceOf[RDD[TrainingData]].map(reformatTrainingData(_))

Я думал, что две функции должны быть одинаковыми, но, похоже, это не так.Почему они разные?

1 Ответ

0 голосов
/ 01 февраля 2019

Это потому, что методы и функции не вполне взаимозаменяемы в Scala.

Функции являются автономными объектами (то есть экземплярами таких классов, как Function1, Function2, Function3 ...), нометоды остаются привязанными к своему классу.Это может создать проблемы в Spark, если включающий класс не Serializable - когда Spark пытается сериализовать метод, он не может сериализовать связанный экземпляр класса.

Обратите внимание, что reformatTrainingData равен aметод, который возвращает функцию

Поэтому, когда вы вызываете что-то вроде:

rdd.map(reformatTrainingData)

, вы фактически вызываете метод no-arg reformatTrainingData и возвращаете автономный Function1 экземплярэто можно безопасно сериализовать.Вы также можете написать это как

private def reformatTrainingData(): TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData())

, чтобы подчеркнуть, что происходит вызов метода.

Когда вы переходите на reformatTrainingData(_), вы вместо этого используете частично примененный метод;когда Spark попытался сериализовать это, ему нужно включить и сериализовать включающий класс DataLoader, который не помечен как Serializable.

Та же проблема возникла бы, если бы reformatTrainingData был простым методомвведите TrainingData => ReFormatedData.

Если вы пометите DataLoader как extends Serializable, то любая из версий должна работать.

Было бы также хорошо сделать reformatTrainingData в val,потому что vals не тянет вмещающий класс при сериализации:

private val reformatTrainingData: TrainingData => ReFormatedData ...

rdd.map(reformatTrainingData)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...