Итак, я пытаюсь создать чрезвычайно простую записную книжку spark, используя блоки данных Azure, и хотел бы использовать простой вызов карты RDD.
Это просто для того, чтобы возиться, поэтому пример немного надуманный, но я не могу получить значение для работы в вызове карты RDD, если оно не является значением статической константы
Я пытался использовать широковещательную переменную
Вот простой пример использования int, который я транслирую, а затем пытаюсь использовать в карте RDD
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
Вот еще один пример, где я использую простой сериализуемый одноэлементный объект с полем int, которое я транслирую, а затем пытаюсь использовать в карте RDD
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()
И, наконец, List[int]
с одним элементом, который я транслирую, а затем пытаюсь использовать в карте RDD
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()
Однако ВСЕ приведенные выше примеры завершаются с этой ошибкой. Который, как вы можете видеть, указывает на проблему с сериализуемым значением карты RDD. Я не вижу проблемы, и значение int должно быть сериализуемым, используя все приведенные выше примеры, я думаю
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:379)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:378)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.map(RDD.scala:378)
Если я, однако, сделаю значение в карте RDD обычным значением типа int, как это
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()
Все работает нормально, и я вижу мой простой DataFrame, показанный как ожидалось

Есть идеи у кого-нибудь?