Классификация Case Case в Spark - PullRequest
0 голосов
/ 06 июля 2018

В приложении Spark (Spark 2.1) я пытаюсь отправить класс case в качестве входного параметра функции, предназначенной для выполнения на исполнителях

object TestJob extends App {

  val appName = "TestJob"
  val out = "out"
  val p = Params("my-driver-string")

  val spark = SparkSession.builder()
    .appName(appName)
    .getOrCreate()
  import spark.implicits._

  (1 to 100).toDF.as[Int].flatMap(i => Dummy.process(i, p))
    .write
    .option("header", "true")
    .csv(out)
}

object Dummy {

  def process(i: Int, v:Params): Vector[String] = {
    Vector { if( i % 2 == 1) v + "_odd" else v + "_even" }
  }
}

case class Params(v: String)

Когда я запускаю его с master local [*], все идет хорошо, в то время как при работе в кластере состояние класса Params не сериализуется, и вывод приводит к Нулевой _even 1007 * нуль * _odd ...

Не могли бы вы помочь мне понять, что я делаю неправильно?

1 Ответ

0 голосов
/ 06 июля 2018

Поглядывая вокруг, я нашел этот пост, который дал мне решение: Переменная Spark возвращает NullPointerException при запуске в кластере Amazon EMR

В конце концов, проблема связана с расширить приложения

...