Переменные вещания базы данных Azure не сериализуются - PullRequest
0 голосов
/ 26 апреля 2018

Итак, я пытаюсь создать чрезвычайно простую записную книжку spark, используя блоки данных Azure, и хотел бы использовать простой вызов карты RDD.

Это просто для того, чтобы возиться, поэтому пример немного надуманный, но я не могу получить значение для работы в вызове карты RDD, если оно не является значением статической константы

Я пытался использовать широковещательную переменную

Вот простой пример использования int, который я транслирую, а затем пытаюсь использовать в карте RDD

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

Вот еще один пример, где я использую простой сериализуемый одноэлементный объект с полем int, которое я транслирую, а затем пытаюсь использовать в карте RDD

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()

И, наконец, List[int] с одним элементом, который я транслирую, а затем пытаюсь использовать в карте RDD

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()

Однако ВСЕ приведенные выше примеры завершаются с этой ошибкой. Который, как вы можете видеть, указывает на проблему с сериализуемым значением карты RDD. Я не вижу проблемы, и значение int должно быть сериализуемым, используя все приведенные выше примеры, я думаю

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:379)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:378)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
    at org.apache.spark.rdd.RDD.map(RDD.scala:378)

Если я, однако, сделаю значение в карте RDD обычным значением типа int, как это

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()

Все работает нормально, и я вижу мой простой DataFrame, показанный как ожидалось

enter image description here

Есть идеи у кого-нибудь?

Ответы [ 2 ]

0 голосов
/ 02 мая 2018

Таким образом, ответ состоял в том, что вы не должны захватывать контент Spark в val, а затем использовать его для трансляции. Так что это рабочий код

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = spark.sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

Спасибо @nadim Bahadoor за этот ответ

0 голосов
/ 28 апреля 2018

Из вашего кода я бы предположил, что вы используете Spark 2+. Возможно, нет необходимости опускаться до уровня RDD и вместо этого работать с DataFrames.

В приведенном ниже коде показано, как объединить два кадра данных и явно транслировать первый.

import sparkSession.implicits._
import org.apache.spark.sql.functions._

val data = Seq(1, 2, 3, 4, 5)
val dataDF = data.toDF("id")

val largeDataDF = Seq((0, "Apple"), (1, "Pear"), (2, "Banana")).toDF("id", "value")
val df = largeDataDF.join(broadcast(dataDF), Seq("id"))

df.show()

Как правило, небольшие DataFrames являются идеальными кандидатами для вещания в качестве оптимизации, благодаря чему они отправляются всем исполнителям. spark.sql.autoBroadcastJoinThreshold - это конфигурация, которая ограничивает размер фреймов данных, подходящих для трансляции. Дополнительные сведения можно найти в официальной документации Spark

Также обратите внимание, что с DataFrames у вас есть доступ к удобному методу объяснение . С его помощью вы можете увидеть физический план, и он может быть полезен для отладки.

Выполнение объяснения () в нашем примере подтвердит, что Spark выполняет BroadcastHashJoin оптимизацию.

df.explain()

== Physical Plan ==
*Project [id#11, value#12]
+- *BroadcastHashJoin [id#11], [id#3], Inner, BuildRight
:- LocalTableScan [id#11, value#12]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#3]  

Если вам нужна дополнительная помощь с DataFrames, я предоставлю обширный список примеров на http://allaboutscala.com/big-data/spark/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...