Я пытаюсь передать массив внутри функции mapPartitionsWithIndex , но это вызывает исключение NullPointerException. Вот мой код
var bestSolutions = bcWrapper(sc, (Array(): Array[BAT1], 3: Int)) //
rdd.mapPartitionsWithIndex { (index, iterator) =>
var li = iterator.toArray
var arr1 = arr.sortWith(_ < _).take(5)
val selected = (arr1, index)
bestSolutions.update(selected)
}
определение bcWrapper равно
case class bcWrapper[T: ClassTag](@transient sc: SparkContext,
@transient _v: T)
extends Serializable {
var broadcasted: Broadcast[T] = sc.broadcast(_v)
def update(v: T): Unit = {
try {
broadcasted.destroy()
} catch {
case e: Throwable =>
println("broadcast cannot be destroyed", e)
}
broadcasted = sc.broadcast(v)
}
def value: T = broadcasted.value
}
Код вызывает исключение при вызове update функции bcWrapper класс. Как я могу исправить эту проблему?