Apache Spark Scala - Hive insert into dataframe failing with "too large frame" error
0 votes
/ 27 November 2018

I am trying to insert into Hive using the code below, but for some reason it always fails. I have tried tuning the memory, but it did not help.

Error stack trace:

[Stage 4:=====================================================>(999 + 1) / 1000]18/11/27 09:59:44 WARN TaskSetManager: Lost task 364.0 in stage 4.0 (TID 1367, spark-node, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:159)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.shuffle.FetchFailedException: Too large frame: 5587345928
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:286)
... 8 more
Caused by: java.lang.IllegalArgumentException: Too large frame: 5587345928
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more

Here is my spark-submit:

spark-submit --class com.generic.MSSQLHiveIngestion --master yarn --num-executors 8 --executor-cores 2 --executor-memory 16G --driver-memory 8G --driver-cores 4 --conf spark.yarn.executor.memoryOverhead=1G data-ingestion.jar

Below is my pseudo-code:

import java.util.Properties
import org.apache.spark.sql.SparkSession

// create the Spark session first
val spark = SparkSession.builder()
  .appName("MSSQLIngestion")
  .master("yarn")
  .config("spark.sql.caseSensitive", "false")
  .config("spark.sql.shuffle.partitions", "1000")
  .config("spark.shuffle.spill", "true")
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .enableHiveSupport()
  .getOrCreate();

spark.sql("set hive.exec.parallel=true")

// Create a Properties() object to hold the parameters.
val connectionProperties = new Properties()
connectionProperties.setProperty("Driver", driverClass)
connectionProperties.setProperty("fetchSize", "100000")

// read data from JDBC server and construct a dataframe
val jdbcDF1 = spark.read.jdbc(url = jdbcUrl, table = "(select * from jdbcTable) e", properties = connectionProperties)

val jdbcDF = jdbcDF1.repartition(1000)

val count = jdbcDF.count()

println("red "+count+" records from sql server and started loading into hive")

// if count > 0 then insert the records into Hive
if (count > 0) {
  // create spark temporary table
  jdbcDF.createOrReplaceTempView("sparkTempTable")
  // insert into Hive external table
  spark.sql("insert into externalTable partition (hivePartitionCol) select * from sparkTempTable  distribute by  hivePartitionCol ")
}
println("completed the job for loading the data into hive")

spark.stop()

1 Answer

0 votes
/ 27 November 2018

This problem occurs when a single shuffle block is too large to transfer: the frame in your message, 5587345928 bytes (about 5.2 GB), exceeds the roughly 2 GB (Integer.MAX_VALUE bytes) limit of Spark's Netty frame decoder. That usually means the data is skewed, so one shuffle partition ends up far larger than the rest.

Can you try increasing the number of shuffle partitions?
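For example (a sketch; 2000 is just an illustrative value, not a tuned one, and spark.conf.set works here because spark.sql.shuffle.partitions is a runtime-settable SQL config):

// raise the shuffle partition count so each shuffle block stays well under 2 GB
spark.conf.set("spark.sql.shuffle.partitions", "2000")

// or repartition the DataFrame itself before writing
val jdbcDF = jdbcDF1.repartition(2000)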

.config("spark.shuffle.compress", true)

Or you can reduce the block size to lower shuffle memory usage.
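If a single partition stays oversized however many partitions you use, another option is to let big remote blocks stream to disk instead of arriving as one in-memory frame. A sketch, assuming Spark 2.4+ (on 2.2/2.3 the equivalent setting is spark.reducer.maxReqSizeShuffleToMem):

// fetch any shuffle block larger than 512 MB to disk, bypassing the ~2 GB frame limit
.config("spark.maxRemoteBlockSizeFetchToMem", "512m")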
