Как исправить ошибку при применении функции округления в столбцах набора данных (SparkException: задача не сериализуема) - PullRequest
1 голос
/ 12 апреля 2019

Я начинаю использовать Spark со Scala в записной книжке dataBricks, однако у меня появляется странная ошибка:

 SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.Column
 Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: t020101)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@1ccc6944)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
 ...

Код работает нормально, когда я выполняю функцию округления непосредственно над значениями:

 def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {

  summed.map{
       case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
       TimeUsageRow(working, sex, age, (primaryNeeds* 10).round / 10d, (work* 10).round / 10d, (other* 10).round / 10d)
     }
   }

 val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
 display(time_Usage_Round_DS)

Но, когда я пытаюсь выполнить вспомогательную функцию, я получаю ошибку, упомянутую выше:

 def timeUsageGroupedRound(summed: Dataset[TimeUsageRow]): Dataset[TimeUsageRow] = {

  def round1(d:Double):Double = (d * 10).round / 10d

  summed.map{
       case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
       TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))
     }
   }
 val time_Usage_Round_DS = timeUsageGroupedRound(time_Usage_Grouped_DS)
 display(time_Usage_Round_DS)

Кто-нибудь может объяснить, почему это происходит?Большое спасибо!

1 Ответ

0 голосов
/ 13 апреля 2019

Краткий ответ 1:

Переместите round1 из вашего класса в объект (возможно, используйте сопутствующий объект https://docs.scala -lang.org / tour / singleton-objects.html ).

Краткий ответ 2:

В качестве альтернативы, переместите все, что не Serializable, за пределы вашего класса (см. Длинный ответ) - хотя это может быть болезненным в зависимости от размера класса.

Длинный ответ:

Это интересный случай, который меня несколько раз сбивал с толку в прошлом. Во-первых, когда вы делаете .map на наборе данных / DataFrame, то, что происходит внутри, так это то, что все внутри карты - в вашем случае:

case TimeUsageRow(working, sex, age, primaryNeeds, work, other) => 
   TimeUsageRow(working, sex, age, round1(primaryNeeds), round1(work), round1(other))

упаковывается и отправляется от водителя к исполнителям. Из-за того, как Spark взаимодействует между драйвером и исполнителем, все, что вы отправляете, должно быть Serializable. Эта ошибка возникает из-за того, что при включении round1 он также перетаскивает в остальной класс вместе с ним, и если в классе есть что-то, что не является Serializable, то возникает эта ошибка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...