Вопрос : он жалуется, что "попытался использовать Broadcast после его уничтожения" во втором t3.count (), что меня смущает. Если я правильно понимаю, мы вызываем persist на t3 и, таким образом, после первого t3.count (), t3 сохраняется в памяти. Если это так, t3 не нужно пересчитывать во втором t3.count (), и это должно быть безопасно уничтожить t2. Но, похоже, это не так.
- с искровым корпусом с искрой 2.4.0, я также получаю ту же ошибку.
* Но, что удивительно, intellij local maven scala проект (с Spark 2.4.5 и Spark 2.2.2) с использованием Case cache
/ persist
Я НЕ получить это исключение. В искре может быть проблема или может быть какая-то другая причина. *
ПРИМЕР 1: Без использования кеша / вызова сохраняемого вызова destroy
val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0)
println(t3.count())
t2.destroy()
println(t3.count())
Так как это не cache
или persist
ed, вы получите результат ниже Результат:
org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at BroadCastCheck.scala:20)
at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
Case2: с использованием кеша / вызова сохраняемого вызова destroy .
Вариант использования с cache
/ persist
: кадр данных t3 не будет пересчитан. следовательно, нет ошибки после destroy
val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0).cache // or persist as well
println(t3.count())
t2.destroy()
println(t3.count())
Результат:
5
5