Почему этот код Spark работает в локальном режиме, а не в режиме кластера? - PullRequest
1 голос
/ 01 ноября 2019

Итак, у меня есть что-то вроде этого. Обратите внимание, что baseTrait (признак) здесь является сериализуемым, и поэтому thisClass (класс Object) также должен быть сериализуемым.

object thisClass extends baseTrait {
  private var someVar = null 

  def someFunc: RDD[...] {
    ...
    // assigned some string value or an empty string value (not null anymore)
    someVar = ... 
    ...
    if (someVar != "")
      someRDD.filter(x => aFunc(x, someVar))
    else
      ...
  }

В режиме кластера, когда я вызываю функцию someFunc (который является статическим методом, поскольку thisClass является классом Object) Я получаю исключение нулевого указателя, которое, я думаю, связано с тем, что someVar не сериализуется должным образом. Потому что, когда я делаю это, он отлично работает в кластерном режиме.

if (someVar != "") {
  val someVar_ = someVar
  someRDD.filter(x => aFunc(x, someVar_))
}

Есть идеи, что пошло не так в исходном коде, когда thisClass в первую очередь сериализуем?

Я предполагаю, что можно использовать переменную сериализуемого класса из другого класса, но если вы попытаетесь сделать это внутри этого класса, у вас могут возникнуть проблемы, так как в этом случае у вас будет среда выполнения, пытающаяся сериализовать тот же классоткуда закрытие вызывается. Что ты думаешь?

1 Ответ

3 голосов
/ 01 ноября 2019

В этом случае у вас нет проблем с сериализацией.

По сути, в кластерном режиме происходит то, что thisClass.someFunc фактически никогда не выполняется в JVM удаленного исполнителя. На исполнителе создается экземпляр thisClass, а someVar назначается null. Затем, пока объект thisClass находится в этом состоянии, платформа spark выполняет вашу лямбда-функцию непосредственно над записями, доступными в разделе данных этого исполнителя.

Чтобы избежать этого, нужно переместить присвоениеsomeVar в тело объекта thisClass. При этом значение будет присвоено someVar сразу после создания объекта. Имейте в виду, что этот код будет выполняться на каждом исполнителе в кластере.

Если это невозможно, другой вариант будет сопоставить ваш RDD[T] с RDD[(T, String)], гдестрока someVar для каждой записи, и тогда ваш фильтр может быть что-то вроде .filter(x => aFunc(x._1, x._2)). Этот метод будет использовать больше памяти, так как у вас будет много копий значения someVar.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...