В настоящее время я работаю над переносом таблицы кустов.Проблема, с которой я сталкиваюсь, заключается в том, что, когда я пытаюсь загрузить его в информационный фрейм в искре и записать обратно в Hive, возникает ошибка сериализации, как на скриншоте.Тем не менее, когда я пытаюсь выполнить преобразование Dataframe с ~ 60 столбцами, он работает нормально.У нас есть приблизительно 200 столбцов, которые нам нужны из 1800 столбцов в таблице.Я думаю, что это из-за размера таблицы, которая вызывает эту ошибку.
Я думаю, что ошибка сериализации вызвана «Spark версии 2.2.0» (используемой в наших кластерах разработки).Существует ошибка, связанная с «org.apache.spark.unsafe.types.UTF8String $ IntWrapper», которая вызывает эту ошибку в этой версии.В версиях Spark 2.2.1 и выше эта ошибка была устранена в соответствии с отчетом о проблеме.В качестве поворота к этому я попытался обернуть развернутую флягу с обновленными зависимостями.Я все еще сталкиваюсь с той же проблемой, потому что я думаю, что она все еще использует jar-файлы по умолчанию из среды разработки.Я был бы очень благодарен, если есть какая-либо альтернатива для его запуска.
Ссылка на ошибку apache = https://issues.apache.org/jira/browse/SPARK-21445
Ссылка на решение = https://github.com/apache/spark/pull/18660/commits/d2202903518b3dfa0f4a719a0b9cb5431088ed66
ВРешение проблемы они написали код в Java, я пишу в Scala.Я хочу знать, как импортировать классы "IntWrapper" и "LongWrapper" из библиотеки UTF8String и позволить программе использовать мои объявленные переменные.Для этого я написал классы, взяв класс в ссылке в качестве ссылки и используя KryoRegistrator в spark, чтобы вызвать эти классы к моей «сессии сеанса».Это правильно?bsTst3.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.sources.IsNotNull
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SerializableWritable
import java.io._
@SerialVersionUID(0L)
class IntWrapper extends Serializable{
@transient var value:Int=0
}
@SerialVersionUID(1L)
class LongWrapper extends Serializable{
@transient var value:Long=0
}
object bsTst3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("DrfApp")
.config("spark.kryo.registrator", "bsKryoRegistrator")
.config("spark.kryoserializer.buffer", "1024k")
.config("spark.kryoserializer.buffer.max", "1024m")
.enableHiveSupport().getOrCreate()
val bDRF = spark.sql("select * from global.table_partition1 limit 10")
import spark.implicits._
bDRF.write.saveAsTable("katukuri.sample3")
}
}
и это класс scala - это то, что я вызываю из основного объекта bsKryoRegistrator.scala
import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.io.NullWritable
class bsKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[Byte])
kryo.register(classOf[Short])
//kryo.register(classOf[Int])
//kryo.register(classOf[Long])
kryo.register(classOf[IntWrapper])
kryo.register(classOf[LongWrapper])
kryo.register(classOf[Float])
kryo.register(classOf[Double])
kryo.register(classOf[String])
kryo.register(classOf[Boolean])
kryo.register(classOf[Char])
kryo.register(classOf[Null])
kryo.register(classOf[Nothing])
//kryo.register(classOf[None])
kryo.register(classOf[NullWritable])
}
}
команда, которую я использую для запуска на своем кластере dev
spark2-submit --class bsTst3 --master yarn --queue root.default --deploy-mode client BSDrdTest3-0.0.1-SNAPSHOT.jar
ошибка, которую я получаю, которая похожа на ту, что в ссылке на ошибку
19/02/27 18:06:14 INFO spark.SparkContext: Starting job: saveAsTable at bsTst3.scala:37
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Registering RDD 6 (saveAsTable at bsTst3.scala:37)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Got job 0 (saveAsTable at bsTst3.scala:37) with 1 output partitions
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (saveAsTable at bsTst3.scala:37)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at saveAsTable at bsTst3.scala:37), which has no missing parents
19/02/27 18:06:14 INFO cluster.YarnScheduler: Cancelling stage 0
19/02/27 18:06:14 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (saveAsTable at bsTst3.scala:37) failed in Unknown s due to Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
Serialization stack:
- object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper@63ec9c06)
- field (class: org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, name: result$2, type: class org.apache.spark.unsafe.types.UTF8String$IntWrapper)