Spark Broadcast Variable - PullRequest
       3

Spark Broadcast Variable

3 голосов
/ 13 апреля 2020

Я попробовал следующий код

val t1 = sc.parallelize(0 until 10)
val t2 = sc.broadcast(2)
val t3 = t1.filter(_ % t2.value == 0).persist()
t3.count()
t2.destroy()
t3.count()

Он жалуется, что "попытался использовать Broadcast после его уничтожения" во второй t3.count(), что меня смущает. Если я правильно понимаю, мы вызываем persist на t3 и, таким образом, после первого t3.count(), t3 сохраняется в памяти. Если это так, t3 не нужно пересчитывать во втором t3.count(), и его можно безопасно уничтожить t2. Но, похоже, это не так. Интересно, что здесь происходит.

1 Ответ

2 голосов
/ 13 апреля 2020

Вопрос : он жалуется, что "попытался использовать Broadcast после его уничтожения" во втором t3.count (), что меня смущает. Если я правильно понимаю, мы вызываем persist на t3 и, таким образом, после первого t3.count (), t3 сохраняется в памяти. Если это так, t3 не нужно пересчитывать во втором t3.count (), и это должно быть безопасно уничтожить t2. Но, похоже, это не так.


  • с искровым корпусом с искрой 2.4.0, я также получаю ту же ошибку.

* Но, что удивительно, intellij local maven scala проект (с Spark 2.4.5 и Spark 2.2.2) с использованием Case cache / persist Я НЕ получить это исключение. В искре может быть проблема или может быть какая-то другая причина. *

ПРИМЕР 1: Без использования кеша / вызова сохраняемого вызова destroy

  val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0)
  println(t3.count())
  t2.destroy()
  println(t3.count())

Так как это не cache или persist ed, вы получите результат ниже Результат:

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at BroadCastCheck.scala:20) 
    at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)

Case2: с использованием кеша / вызова сохраняемого вызова destroy .
Вариант использования с cache / persist: кадр данных t3 не будет пересчитан. следовательно, нет ошибки после destroy

 val t1 = sc.parallelize(0 until 10)
  val t2 = sc.broadcast(2)
  val t3 = t1.filter(_ % t2.value == 0).cache // or persist as well  
  println(t3.count())
  t2.destroy()
  println(t3.count())

Результат:

5

5
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...