Сериализация Kryo в Spark: отправка атрибута объекта с помощью карты - PullRequest
0 голосов
/ 11 апреля 2020

Я использую сериализатор kryo, и я объявил класс, который расширяет Serializable, и у него есть некоторые атрибуты и методы:

class MySerializable extends Serializable {
  var arrStr: Array[String] = _
  // .... more attributes

  def getArrName(index: Byte): String = arrStr(index)
  // .... more methods
}

Я создаю SparkSession следующим образом:

    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[MySerializable]))

    val spark = SparkSession
      .builder
      .master(masterName)
      .appName(name)
      .config(conf)
      .config("spark.ui.showConsoleProgress", "false")
      .config("spark.default.parallelism", numPart)
      .config("spark.hadoop.validateOutputSpecs", "false") //If set to true, validates the output specification (e.g. checking if the output directory already exists) used in saveAsHadoopFile and other variants.
      .getOrCreate

Тогда проблема в том, что когда я пытаюсь использовать следующее, я получаю исключение нулевого указателя:

val myDF = oldDF.map {
  case Row(index: Double) => {
    Row(concat(delimiter + objectMySerializable.getArrName(index.toByte)))
  }
}(schema)

Но вне вызова map хорошо работает objectMySerializable.getArrName(). Итак, я решил свою проблему, сделав следующие изменения:

val myArr = objectMySerializable.arrStr
val myDF = oldDF.map {
  case Row(index: Double) => {
    val textConcat = delimiter + myArr(index.toByte)
    Row(textConcat)
  }
}(schema)

И это работает! Но я хотел бы понять, почему я не могу использовать первый подход, а также, если я неправильно понимаю концепцию сериализации, потому что я думал, что это лучший способ для эффективной передачи данных по всем узлам, но так, как я мы пытались сделать это не сработало.

...