Я использую сериализатор kryo, и я объявил класс, который расширяет Serializable, и у него есть некоторые атрибуты и методы:
class MySerializable extends Serializable {
var arrStr: Array[String] = _
// .... more attributes
def getArrName(index: Byte): String = arrStr(index)
// .... more methods
}
Я создаю SparkSession
следующим образом:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MySerializable]))
val spark = SparkSession
.builder
.master(masterName)
.appName(name)
.config(conf)
.config("spark.ui.showConsoleProgress", "false")
.config("spark.default.parallelism", numPart)
.config("spark.hadoop.validateOutputSpecs", "false") //If set to true, validates the output specification (e.g. checking if the output directory already exists) used in saveAsHadoopFile and other variants.
.getOrCreate
Тогда проблема в том, что когда я пытаюсь использовать следующее, я получаю исключение нулевого указателя:
val myDF = oldDF.map {
case Row(index: Double) => {
Row(concat(delimiter + objectMySerializable.getArrName(index.toByte)))
}
}(schema)
Но вне вызова map
хорошо работает objectMySerializable.getArrName()
. Итак, я решил свою проблему, сделав следующие изменения:
val myArr = objectMySerializable.arrStr
val myDF = oldDF.map {
case Row(index: Double) => {
val textConcat = delimiter + myArr(index.toByte)
Row(textConcat)
}
}(schema)
И это работает! Но я хотел бы понять, почему я не могу использовать первый подход, а также, если я неправильно понимаю концепцию сериализации, потому что я думал, что это лучший способ для эффективной передачи данных по всем узлам, но так, как я мы пытались сделать это не сработало.