«Исключение в потоке» dispatcher-event-l oop -0 »java .lang.OutOfMemoryError: Java пространство кучи» в коде Spark Scala - PullRequest
0 голосов
/ 29 февраля 2020
val data = spark.read
    .text(filePath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())



    val count = data.count()



    val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)



    val columns = header
    .replace("H|*|", "")
    .replace("|##|", "")
    .split("\\|\\*\\|")


    val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))



    var correctData = data.where('id > 1 && 'id < count-1).select("val")
    var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
    var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

    display(finalDF)

Эта часть кода дает ошибку:

Исключение в потоке "dispatcher-event-l oop -0" java .lang.OutOfMemoryError: Java пространство кучи

После нескольких часов отладки, в основном, детали:

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

, вызвавшей ошибку.

Я изменил деталь как

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         }).toList
  val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)

Но ошибка остается такой же. Что мне следует изменить, чтобы избежать этого?

Когда я запускал этот код, это спарк-кластер блоков данных, определенное задание выдает эту ошибку драйвера Spark:

Задание прервано из-за сбоя этапа: Задача с сериализацией 45: 0 было 792585456 байт, что превышает максимально допустимое значение: spark.rp c .message.maxSize (268435456 байт).

Я добавил эту часть кода:

spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)

, но бесполезно.

1 Ответ

2 голосов
/ 29 февраля 2020

Я предполагаю, что

var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")

- это проблема, потому что вы собираете (почти) все данные для драйвера, то есть для 1 единственной JVM.

Возможно, эта строка запускается , но последующие операции на dataString превысят пределы вашей памяти. Вы не должны собирать свои данные! Вместо этого работайте с распределенными «структурами данных», такими как Dataframe или RDD.

Я думаю, вы можете просто пропустить collect в приведенной выше строке

...